Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import os
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0005 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0006 
0007 from .base_messenger import BaseMessenger
0008 
0009 # logger
0010 _logger = core_utils.setup_logger("k8s_messenger")
0011 
0012 
0013 # Messenger for generic Kubernetes clusters
class K8sMessenger(BaseMessenger):
    """Messenger that stores the logs of Kubernetes pods when workers terminate.

    On construction the Kubernetes namespace for ``self.queueName`` is looked
    up through ``PandaQueuesDictK8s``, a ``k8s_Client`` is created for that
    namespace, and the pod information returned by the client is cached in
    ``self._all_pods_list`` for later use by :meth:`post_processing`.
    """

    def __init__(self, **kwargs):
        """Set up the Kubernetes client and cache the current pod information.

        Args:
            **kwargs: forwarded unchanged to ``BaseMessenger.__init__``.
                NOTE(review): ``queueName``, ``logDir`` and ``k8s_config_file``
                are presumably set on the instance by the base class from
                these arguments -- confirm against ``BaseMessenger``.

        Raises:
            AttributeError: if ``logDir`` is not available on the instance
                after the base class has been initialised.
        """
        BaseMessenger.__init__(self, **kwargs)
        # logDir is mandatory: post_processing writes the pod logs there.
        try:
            self.logDir
        except AttributeError:
            _logger.error("K8sMessenger: Missing attribute logDir")
            raise

        # retrieve the k8s namespace from CRIC
        self.panda_queues_dict = PandaQueuesDictK8s()
        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)

        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)
        # Pod information is fetched once here and reused by post_processing.
        self._all_pods_list = self.k8s_client.get_pods_info()

    def post_processing(self, workspec, jobspec_list, map_type):
        """
        Do the following in post_processing, i.e. when workers terminate (finished/failed/cancelled)
        - Fetch logs of the pod from k8s
        - Store or upload logs

        The logs of every pod matching ``workspec.batchID`` are concatenated
        into ``<logDir>/gridK8S.<workerID>.<batchID>.out``.

        Args:
            workspec: worker specification; ``workerID`` and ``batchID`` are read.
            jobspec_list: not used by this implementation.
            map_type: not used by this implementation.

        Returns:
            True when the log file was written; None when no pod information
            is cached or when any exception occurred (the exception is
            reported through ``core_utils.dump_error_message``).
        """
        # get logger
        tmp_log = core_utils.make_logger(_logger, f"queueName={self.queueName} workerID={workspec.workerID}", method_name="post_processing")
        tmp_log.debug("start")

        if self._all_pods_list is None:
            tmp_log.error("No pod information")
            tmp_log.debug("done")
            return None

        try:
            # fetch and store logs
            job_id = workspec.batchID
            pods_list = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=job_id)
            pod_name_list = [pods_info["name"] for pods_info in pods_list]
            outlog_filename = os.path.join(self.logDir, f"gridK8S.{workspec.workerID}.{workspec.batchID}.out")
            # Explicit UTF-8 so that non-ASCII characters in pod logs cannot
            # fail the write under a restrictive locale encoding.
            with open(outlog_filename, "w", encoding="utf-8", errors="replace") as f:
                for pod_name in pod_name_list:
                    current_log_str = self.k8s_client.get_pod_logs(pod_name)
                    if current_log_str is None:
                        # Nothing to write for this pod; keep collecting the
                        # others instead of failing on f.write(None).
                        tmp_log.debug(f"no log content for pod {pod_name}; skipped")
                        continue
                    f.write(current_log_str)
            # TODO: upload logs (not implemented yet)
            # return
            tmp_log.debug("done")
            return True
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return None