Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import os
0002 import traceback
0003 from concurrent.futures import ThreadPoolExecutor
0004 
0005 from pandacommon.pandautils.net_utils import replace_hostname_in_url_randomly
0006 
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.plugin_base import PluginBase
0010 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0011 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0012 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0013 from pandaharvester.harvestersubmitter import submitter_common
0014 
# module-level logger; handed to make_logger() by the K8sSubmitter methods below
0015 base_logger = core_utils.setup_logger("k8s_submitter")
0016 
0017 
0018 # submitter for K8S
class K8sSubmitter(PluginBase):
    """Submitter plugin that creates one Kubernetes job per Harvester worker.

    Attributes such as ``queueName``, ``k8s_config_file``, ``k8s_yaml_file``,
    ``nProcesses``, ``proxySecretPath``, ``proxySecretPathAnalysis``,
    ``tokenDir``, ``pandaTokenFilename`` and ``pandaTokenKeyFilename`` are read
    from ``self`` after ``PluginBase.__init__`` has run.
    NOTE(review): presumably PluginBase copies them from the queue
    configuration -- confirm against PluginBase.
    """

    def __init__(self, **kwarg):
        """Set up the k8s client, the starter configmap and credential paths.

        :param kwarg: plugin parameters, forwarded unchanged to ``PluginBase``.
        """
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)

        self.panda_queues_dict = PandaQueuesDictK8s()

        # retrieve the k8s namespace from CRIC
        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)

        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)

        # update or create the pilot starter executable
        self.k8s_client.create_or_patch_configmap_starter()

        # allowed associated parameters from CRIC
        self.allowed_cric_attrs = ("pilot_url",)

        # number of processes: fall back to 1 when missing, empty or < 1
        if not getattr(self, "nProcesses", None) or self.nProcesses < 1:
            self.nProcesses = 1

        # x509 proxy through k8s secrets: preferred way.
        # An explicitly configured value wins; the environment variable is
        # only a fallback. The attribute is always defined afterwards (None
        # when nothing is available) so later reads cannot raise.
        if not hasattr(self, "proxySecretPath"):
            self.proxySecretPath = os.getenv("PROXY_SECRET_PATH") or None

        # analysis x509 proxy through k8s secrets: on GU queues.
        # The fallback is stored in proxySecretPathAnalysis; it must not
        # overwrite proxySecretPath, which may hold an explicitly configured
        # proxy.
        if not hasattr(self, "proxySecretPathAnalysis"):
            self.proxySecretPathAnalysis = os.getenv("PROXY_SECRET_PATH_ANAL") or None

        # token for pilot-pandaserver communications
        self.pandaTokenPath = getattr(self, "tokenDir", None)
        self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
        self.pandaTokenKeyFilename = getattr(self, "pandaTokenKeyFilename", None)

    def _choose_proxy(self):
        """
        Choose the proxy based on the queue configuration

        :return: the value of ``proxySecretPath`` or ``None`` when it is
                 unset or empty.
        """
        # getattr keeps this safe even if the attribute was never defined
        return getattr(self, "proxySecretPath", None) or None

    def submit_k8s_worker(self, work_spec):
        """Create the Kubernetes job for a single worker.

        Sets the stdout log URL on ``work_spec`` and, on success, its
        ``batchID`` (taken from the job YAML metadata name).

        :param work_spec: worker specification to submit.
        :return: ``(True, "")`` on success, ``(False, <error message>)`` when
                 no proxy is configured or job creation raised.
        """
        tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="submit_k8s_worker")

        # get info from harvester queue config
        _queueConfigMapper = QueueConfigMapper()
        harvester_queue_config = _queueConfigMapper.get_queue(self.queueName)

        # set the stdout log file
        log_file_name = f"{harvester_config.master.harvester_id}_{work_spec.workerID}_gz.out"
        log_server = replace_hostname_in_url_randomly(harvester_config.pandacon.pandaCacheURL)
        logs_frontend_r = log_server + harvester_config.pandacon.pandaCacheURL_R_path
        work_spec.set_log_file("stdout", f"{logs_frontend_r}/{log_file_name}")

        try:
            # read the job template inside the guarded region so that a
            # broken/missing YAML file is reported as a failed submission
            # instead of aborting the whole thread-pool map
            yaml_content = self.k8s_client.read_yaml_file(self.k8s_yaml_file)

            # choose the appropriate proxy
            this_panda_queue_dict = self.panda_queues_dict.get(self.queueName, dict())
            cert = self._choose_proxy()
            if not cert:
                err_str = "No proxy specified in proxySecretPath. Not submitted"
                tmp_log.error(err_str)
                return False, err_str

            # get the walltime limit
            try:
                max_time = this_panda_queue_dict["maxtime"]
            except Exception:
                tmp_log.warning(f"Could not retrieve maxtime field for queue {self.queueName}")
                max_time = None

            # keep only the CRIC parameters this plugin is allowed to use
            associated_params_dict = {
                key: val for key, val in self.panda_queues_dict.get_harvester_params(self.queueName).items() if key in self.allowed_cric_attrs
            }

            pilot_url = associated_params_dict.get("pilot_url")
            pilot_version = str(this_panda_queue_dict.get("pilot_version", "current"))
            python_version = str(this_panda_queue_dict.get("python_version", "2"))

            prod_source_label_tmp = harvester_queue_config.get_source_label(work_spec.jobType)
            pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType, pilot_url, pilot_version, prod_source_label_tmp)
            if pilot_opt_dict is None:
                prod_source_label = prod_source_label_tmp
                pilot_type = work_spec.pilotType
                pilot_url_str = f"--piloturl {pilot_url}" if pilot_url else ""
            else:
                prod_source_label = pilot_opt_dict["prod_source_label"]
                pilot_type = pilot_opt_dict["pilot_type_opt"]
                pilot_url_str = pilot_opt_dict["pilot_url_str"]

            pilot_python_option = submitter_common.get_python_version_option(python_version, prod_source_label)
            host_image = self.panda_queues_dict.get_k8s_host_image(self.queueName)

            # submit the worker; the two-value unpacking is kept so that an
            # unexpected return shape is still treated as a failure
            _rsp, _yaml_content_final = self.k8s_client.create_job_from_yaml(
                yaml_content,
                work_spec,
                prod_source_label,
                pilot_type,
                pilot_url_str,
                pilot_python_option,
                pilot_version,
                host_image,
                cert,
                self.pandaTokenPath,
                self.pandaTokenFilename,
                self.pandaTokenKeyFilename,
                max_time=max_time,
            )

        except Exception as _e:
            tmp_log.error(traceback.format_exc())
            err_str = f"Failed to create a JOB; {_e}"
            tmp_return_value = (False, err_str)
        else:
            # NOTE(review): the name is read from yaml_content, not from the
            # returned YAML -- presumably create_job_from_yaml fills it in
            # place; confirm against k8s_Client.
            work_spec.batchID = yaml_content["metadata"]["name"]
            tmp_log.debug(f"Created worker {work_spec.workerID} with batchID={work_spec.batchID}")
            tmp_return_value = (True, "")

        return tmp_return_value

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit every worker in ``workspec_list`` using a thread pool.

        :param workspec_list: worker specifications to submit.
        :return: list with one ``(bool, str)`` result per worker, in the same
                 order as ``workspec_list``; empty list for empty input.
        """
        tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="submit_workers")

        n_workers = len(workspec_list)
        tmp_log.debug(f"start, n_workers={n_workers}")

        if not workspec_list:
            tmp_log.debug("empty workspec_list")
            return []

        with ThreadPoolExecutor(self.nProcesses) as thread_pool:
            # materialise inside the pool so the log below is only written
            # once every submission has actually finished
            ret_list = list(thread_pool.map(self.submit_k8s_worker, workspec_list))
        tmp_log.debug(f"{n_workers} workers submitted")

        tmp_log.debug("done")

        return ret_list