File indexing completed on 2026-04-20 07:58:58
0001 import os
0002
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0005 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0006
0007 from .base_messenger import BaseMessenger
0008
0009
# Module-level logger named "k8s_messenger"; per-call loggers in this module
# are derived from it via core_utils.make_logger.
_logger = core_utils.setup_logger("k8s_messenger")
0011
0012
0013
class K8sMessenger(BaseMessenger):
    """Messenger plugin that saves Kubernetes pod logs for terminated workers.

    On construction the instance resolves the k8s namespace of its queue,
    creates a k8s client and takes a one-time snapshot of pod information.
    ``post_processing`` later uses that snapshot to locate the pods that
    belong to a worker and dumps their logs into ``logDir``.
    """

    def __init__(self, **kwargs):
        """Initialise the base messenger and the Kubernetes client.

        ``logDir``, ``queueName`` and ``k8s_config_file`` are not assigned in
        this class; they are expected to be present after the base-class
        initialiser has run (presumably from plugin configuration -- verify
        against BaseMessenger).

        Raises:
            AttributeError: re-raised when ``logDir`` is missing.
        """
        BaseMessenger.__init__(self, **kwargs)

        # Fail early, with a hint on stdout, if the log directory was not configured.
        try:
            getattr(self, "logDir")
        except AttributeError:
            print("K8sMessenger: Missing attribute logDir")
            raise

        self.panda_queues_dict = PandaQueuesDictK8s()
        k8s_namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)

        self.k8s_client = k8s_Client(
            namespace=k8s_namespace,
            queue_name=self.queueName,
            config_file=self.k8s_config_file,
        )
        # Snapshot taken once here; post_processing does not refresh it.
        self._all_pods_list = self.k8s_client.get_pods_info()

    def post_processing(self, workspec, jobspec_list, map_type):
        """Write the logs of a terminated worker's pods to a local file.

        The pods are selected from the snapshot taken at construction by
        filtering on ``job_name=workspec.batchID``. Their logs are written,
        in list order, to ``<logDir>/gridK8S.<workerID>.<batchID>.out``.

        Args:
            workspec: worker spec; ``workerID`` and ``batchID`` are read.
            jobspec_list: unused here.
            map_type: unused here.

        Returns:
            True when the log file was written; None when no pod information
            is available or when any exception occurred (the error is dumped
            to the logger instead of being propagated).
        """
        tmp_log = core_utils.make_logger(
            _logger,
            f"queueName={self.queueName} workerID={workspec.workerID}",
            method_name="post_processing",
        )
        tmp_log.debug("start")

        # Nothing can be looked up without the pod snapshot.
        if self._all_pods_list is None:
            tmp_log.error("No pod information")
            tmp_log.debug("done")
            return None

        try:
            batch_id = workspec.batchID
            matching_pods = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=batch_id)
            # Resolve every pod name before the output file is created.
            pod_names = [pod["name"] for pod in matching_pods]
            out_path = os.path.join(self.logDir, f"gridK8S.{workspec.workerID}.{workspec.batchID}.out")

            with open(out_path, "w") as out_file:
                for name in pod_names:
                    out_file.write(self.k8s_client.get_pod_logs(name))

            tmp_log.debug("done")
            return True
        except Exception:
            # Best effort: report the failure through the logger and signal it with None.
            core_utils.dump_error_message(tmp_log)
            return None