Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 from pandaharvester.harvestercore import core_utils
0002 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0003 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0004 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0005 
# module-level logger; handed to make_logger() by the K8sSweeper methods below
base_logger = core_utils.setup_logger("k8s_sweeper")
0008 
0009 
0010 # sweeper for K8S
class K8sSweeper(BaseSweeper):
    """Sweeper plugin that deletes the Kubernetes objects created for workers.

    For each worker it removes the configmap (when the worker is not of
    map type "NoJob") and the job identified by the worker's batch ID,
    using a ``k8s_Client`` bound to the queue's namespace.
    """

    # constructor
    def __init__(self, **kwarg):
        """Initialise the base sweeper and build the k8s client for this queue.

        Reads ``self.queueName`` and ``self.k8s_config_file``, which are expected
        to be available once ``BaseSweeper.__init__`` has run.
        """
        BaseSweeper.__init__(self, **kwarg)

        # look up the k8s namespace of this queue (from CRIC, per the original note)
        self.panda_queues_dict = PandaQueuesDictK8s()
        self.k8s_client = k8s_Client(
            self.panda_queues_dict.get_k8s_namespace(self.queueName),
            queue_name=self.queueName,
            config_file=self.k8s_config_file,
        )

    # kill workers
    def kill_workers(self, work_spec_list):
        """Delete the k8s job (and configmap, if any) of every given worker.

        :param work_spec_list: iterable of worker specs; ``batchID``,
            ``workerID`` and ``mapType`` are read from each one.
        :return: list with one ``(success, error_message)`` tuple per worker,
            in input order. Only a failed job deletion yields ``False``; a
            failed configmap deletion is logged and otherwise ignored.
        """
        tmp_log = self.make_logger(base_logger, method_name="kill_workers")

        outcomes = []
        for spec in work_spec_list:
            job_name = spec.batchID
            config_map_name = str(spec.workerID)

            # workers that were never submitted have no batch ID: nothing to clean
            if not job_name:
                outcomes.append((True, ""))
                continue

            # only push-mode workers (map type other than "NoJob") own a configmap
            if spec.mapType == "NoJob":
                tmp_log.debug(f"No pandajob/configmap associated to worker {spec.workerID}")
            else:
                try:
                    self.k8s_client.delete_config_map(config_map_name)
                    tmp_log.debug(f"Deleted configmap {config_map_name}")
                except Exception as exc:
                    # best effort: report the problem but still go on to delete the job
                    tmp_log.error(f"Failed to delete a CONFIGMAP with id={config_map_name} ; {exc}")

            # the job deletion decides the outcome reported for this worker
            try:
                self.k8s_client.delete_job(job_name)
                tmp_log.debug(f"Deleted JOB {job_name}")
                outcome = (True, "")
            except Exception as exc:
                message = f"Failed to delete a JOB with id={job_name} ; {exc}"
                tmp_log.error(message)
                outcome = (False, message)

            outcomes.append(outcome)

        return outcomes

    def sweep_worker(self, work_spec):
        """Clean up after a single worker; currently there is nothing to do.

        :param work_spec: worker spec; only ``workerID`` is read, for the logger.
        :return: always ``(True, "")``.
        """
        tmp_log = self.make_logger(base_logger, f"workerID={work_spec.workerID}", method_name="sweep_worker")

        # NOTE: retrieving the pod log (k8s_client.retrieve_pod_log(batchID)) and
        # uploading it to the panda cache was sketched here but is not enabled

        # nothing to do
        return (True, "")