File indexing completed on 2026-04-19 08:00:04
0001 import os
0002 import traceback
0003 from concurrent.futures import ThreadPoolExecutor
0004
0005 from pandacommon.pandautils.net_utils import replace_hostname_in_url_randomly
0006
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.plugin_base import PluginBase
0010 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0011 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0012 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0013 from pandaharvester.harvestersubmitter import submitter_common
0014
0015 base_logger = core_utils.setup_logger("k8s_submitter")
0016
0017
0018
class K8sSubmitter(PluginBase):
    """Harvester submitter plugin that launches each worker as a Kubernetes job."""

    # constructor
    def __init__(self, **kwarg):
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)

        self.panda_queues_dict = PandaQueuesDictK8s()

        # retrieve the k8s namespace configured for this queue
        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)

        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)

        # update or create the pilot starter config map
        self.k8s_client.create_or_patch_configmap_starter()

        # CRIC harvester parameters that are allowed to be forwarded to the pilot
        self.allowed_cric_attrs = ("pilot_url",)

        # number of processes used to submit workers in parallel; default to 1
        try:
            self.nProcesses
        except AttributeError:
            self.nProcesses = 1
        else:
            if (not self.nProcesses) or (self.nProcesses < 1):
                self.nProcesses = 1

        # x509 proxy secret path: fall back to the environment when not configured
        try:
            self.proxySecretPath
        except AttributeError:
            if os.getenv("PROXY_SECRET_PATH"):
                self.proxySecretPath = os.getenv("PROXY_SECRET_PATH")

        # analysis x509 proxy secret path: kept in its own attribute.
        # BUGFIX: the previous code assigned the analysis value to
        # self.proxySecretPath, clobbering the production proxy path.
        try:
            self.proxySecretPathAnalysis
        except AttributeError:
            if os.getenv("PROXY_SECRET_PATH_ANAL"):
                self.proxySecretPathAnalysis = os.getenv("PROXY_SECRET_PATH_ANAL")

        # PanDA token settings (None when not configured)
        self.pandaTokenPath = getattr(self, "tokenDir", None)
        self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
        self.pandaTokenKeyFilename = getattr(self, "pandaTokenKeyFilename", None)

    def _choose_proxy(self):
        """
        Choose the proxy based on the queue configuration.

        :return: the configured proxy secret path, or None when no proxy is
                 configured (instead of raising AttributeError when the
                 attribute was never set)
        """
        cert = getattr(self, "proxySecretPath", None)
        return cert if cert else None

    def submit_k8s_worker(self, work_spec):
        """
        Submit a single worker as a Kubernetes job.

        :param work_spec: worker specification to submit
        :return: tuple (True, "") on success, (False, error message) on failure
        """
        tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="submit_k8s_worker")

        # harvester-side queue configuration
        _queueConfigMapper = QueueConfigMapper()
        harvester_queue_config = _queueConfigMapper.get_queue(self.queueName)

        # set the stdout log file served through the panda cache
        log_file_name = f"{harvester_config.master.harvester_id}_{work_spec.workerID}_gz.out"
        log_server = replace_hostname_in_url_randomly(harvester_config.pandacon.pandaCacheURL)
        logs_frontend_r = log_server + harvester_config.pandacon.pandaCacheURL_R_path
        work_spec.set_log_file("stdout", f"{logs_frontend_r}/{log_file_name}")

        yaml_content = self.k8s_client.read_yaml_file(self.k8s_yaml_file)
        try:
            # queue information from CRIC
            this_panda_queue_dict = self.panda_queues_dict.get(self.queueName, dict())

            # choose the appropriate proxy; without one the job cannot run
            cert = self._choose_proxy()
            if not cert:
                err_str = "No proxy specified in proxySecretPath. Not submitted"
                return False, err_str

            # only a missing "maxtime" field is expected here, so catch KeyError
            # specifically instead of the previous blanket Exception
            try:
                max_time = this_panda_queue_dict["maxtime"]
            except KeyError:
                tmp_log.warning(f"Could not retrieve maxtime field for queue {self.queueName}")
                max_time = None

            # keep only the CRIC harvester parameters that are explicitly allowed
            harvester_params = self.panda_queues_dict.get_harvester_params(self.queueName)
            associated_params_dict = {key: val for key, val in harvester_params.items() if key in self.allowed_cric_attrs}

            pilot_url = associated_params_dict.get("pilot_url")
            pilot_version = str(this_panda_queue_dict.get("pilot_version", "current"))
            python_version = str(this_panda_queue_dict.get("python_version", "2"))

            # resolve the pilot options (prod source label, pilot type, pilot url)
            prod_source_label_tmp = harvester_queue_config.get_source_label(work_spec.jobType)
            pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType, pilot_url, pilot_version, prod_source_label_tmp)
            if pilot_opt_dict is None:
                prod_source_label = prod_source_label_tmp
                pilot_type = work_spec.pilotType
                pilot_url_str = f"--piloturl {pilot_url}" if pilot_url else ""
            else:
                prod_source_label = pilot_opt_dict["prod_source_label"]
                pilot_type = pilot_opt_dict["pilot_type_opt"]
                pilot_url_str = pilot_opt_dict["pilot_url_str"]

            pilot_python_option = submitter_common.get_python_version_option(python_version, prod_source_label)
            host_image = self.panda_queues_dict.get_k8s_host_image(self.queueName)

            # submit the job to the cluster
            rsp, yaml_content_final = self.k8s_client.create_job_from_yaml(
                yaml_content,
                work_spec,
                prod_source_label,
                pilot_type,
                pilot_url_str,
                pilot_python_option,
                pilot_version,
                host_image,
                cert,
                self.pandaTokenPath,
                self.pandaTokenFilename,
                self.pandaTokenKeyFilename,
                max_time=max_time,
            )

        except Exception as _e:
            tmp_log.error(traceback.format_exc())
            err_str = f"Failed to create a JOB; {_e}"
            tmp_return_value = (False, err_str)
        else:
            # NOTE(review): the batch ID is read back from yaml_content, which
            # presumably is filled in/mutated by create_job_from_yaml — confirm
            work_spec.batchID = yaml_content["metadata"]["name"]
            tmp_log.debug(f"Created worker {work_spec.workerID} with batchID={work_spec.batchID}")
            tmp_return_value = (True, "")

        return tmp_return_value

    def submit_workers(self, workspec_list):
        """
        Submit a list of workers in parallel using a thread pool.

        :param workspec_list: list of worker specifications to submit
        :return: list of (bool, message) tuples, one per worker, in input order
        """
        tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="submit_workers")

        n_workers = len(workspec_list)
        tmp_log.debug(f"start, n_workers={n_workers}")

        ret_list = []
        if not workspec_list:
            tmp_log.debug("empty workspec_list")
            return ret_list

        # fan out the submissions; map preserves the input order of the results
        with ThreadPoolExecutor(self.nProcesses) as thread_pool:
            ret_val_list = thread_pool.map(self.submit_k8s_worker, workspec_list)
            tmp_log.debug(f"{n_workers} workers submitted")

        ret_list = list(ret_val_list)

        tmp_log.debug("done")

        return ret_list