File indexing completed on 2026-04-19 08:00:04
0001 import errno
0002 import json
0003 import os
0004 import random
0005 import re
0006 import socket
0007 import tempfile
0008 import threading
0009 from concurrent.futures import ThreadPoolExecutor
0010 from math import ceil
0011
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.plugin_base import PluginBase
0015 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0016 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0017 from pandaharvester.harvestermessenger.base_messenger import BaseMessenger
0018 from pandaharvester.harvestermisc.htcondor_utils import (
0019 CondorJobSubmit,
0020 get_job_id_tuple_from_batchid,
0021 )
0022 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0023 from pandaharvester.harvestermisc.token_utils import endpoint_to_filename
0024 from pandaharvester.harvestersubmitter import submitter_common
0025
0026
# logger for this submitter module
baseLogger = core_utils.setup_logger("htcondor_submitter")


# module-level messenger instance; used only to supply the default
# jobSpecFileName attribute when rendering a JDL (see make_a_jdl)
base_messenger = BaseMessenger()
0031
0032
0033 def _condor_macro_replace(string, **kwarg):
0034 """
0035 Replace condor Macro from SDF file, return string
0036 """
0037 new_string = string
0038 macro_map = {
0039 "\$\(Cluster\)": str(kwarg["ClusterId"]),
0040 "\$\(Process\)": str(kwarg["ProcId"]),
0041 }
0042 for k, v in macro_map.items():
0043 new_string = re.sub(k, v, new_string)
0044 return new_string
0045
0046
def submit_bag_of_workers(data_list):
    """
    Submit a bag of workers.

    Each element of data_list is a dict with at least "workspec" and
    "to_submit"; complete entries also carry "use_spool" plus the keyword
    arguments of make_a_jdl. Workers are grouped by submission host and
    submitted in bulk, one condor submit per host.

    :param data_list: list of per-worker data dicts
    :return: list of ((status, errStr), changed_attributes) tuples, in the
             same order as data_list
    """

    tmpLog = core_utils.make_logger(baseLogger, method_name="submit_bag_of_workers")

    # keep the original worker order so results can be returned in order
    workerIDs_list = [data["workspec"].workerID for data in data_list]

    worker_retval_map = {}
    worker_data_map = {}
    # map of submission host -> list of (workspec, jdl, placeholder_map)
    host_jdl_list_workerid_map = {}

    # make a JDL for each worker and group the workers by submission host
    for data in data_list:
        workspec = data["workspec"]
        workerID = workspec.workerID
        worker_data_map[workerID] = data
        to_submit = data["to_submit"]

        # no need to submit a bad worker
        if not to_submit:
            errStr = f"{workerID} not submitted due to incomplete data of the worker"
            tmpLog.warning(errStr)
            tmpRetVal = (None, errStr)

            worker_retval_map[workerID] = (tmpRetVal, workspec.get_changed_attributes())

        try:
            use_spool = data["use_spool"]
        except KeyError:
            errStr = f"{workerID} not submitted due to incomplete data of the worker"
            tmpLog.warning(errStr)
            tmpRetVal = (None, errStr)

            worker_retval_map[workerID] = (tmpRetVal, workspec.get_changed_attributes())
        else:
            workspec.reset_changed_list()

            # render the JDL of this worker
            a_jdl, placeholder_map = make_a_jdl(**data)
            val = (workspec, a_jdl, placeholder_map)
            try:
                host_jdl_list_workerid_map[workspec.submissionHost].append(val)
            except KeyError:
                host_jdl_list_workerid_map[workspec.submissionHost] = [val]

    # bulk-submit per submission host
    for host, val_list in host_jdl_list_workerid_map.items():

        jdl_list = [val[1] for val in val_list]

        tmpLog.debug(f"submitting to submissionHost={host}")

        # NOTE(review): use_spool here is the leftover value from the last
        # iteration of the loop above, i.e. a single flag is applied to the
        # whole bulk — confirm all workers of a bag share the same use_spool
        try:
            condor_job_submit = CondorJobSubmit(id=host)
            batchIDs_list, ret_err_str = condor_job_submit.submit(jdl_list, use_spool=use_spool)
        except Exception as e:
            batchIDs_list = None
            ret_err_str = f"Exception {e.__class__.__name__}: {e}"

        if batchIDs_list:
            # submission succeeded: record batch IDs and log-file locations
            n_workers = len(val_list)
            tmpLog.debug(f"submitted {n_workers} workers to submissionHost={host}")
            for val_i in range(n_workers):
                val = val_list[val_i]
                workspec = val[0]
                placeholder_map = val[2]

                workspec.batchID = batchIDs_list[val_i]
                tmpLog.debug(f"workerID={workspec.workerID} submissionHost={workspec.submissionHost} batchID={workspec.batchID}")

                data = worker_data_map[workspec.workerID]

                ce_info_dict = data["ce_info_dict"]
                workspec.computingElement = ce_info_dict.get("ce_endpoint", "")

                # expand condor $(Cluster)/$(Process) macros and {placeholders}
                # in the batch log paths now that the batch ID is known
                batch_log_dict = data["batch_log_dict"]
                (clusterid, procid) = get_job_id_tuple_from_batchid(workspec.batchID)
                batch_log = _condor_macro_replace(batch_log_dict["batch_log"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
                batch_stdout = _condor_macro_replace(batch_log_dict["batch_stdout"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
                batch_stderr = _condor_macro_replace(batch_log_dict["batch_stderr"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
                # derive the jdl path from the stderr path by swapping the extension
                try:
                    batch_jdl = f"{batch_stderr[:-4]}.jdl"
                except Exception:
                    batch_jdl = None
                workspec.set_log_file("batch_log", batch_log)
                workspec.set_log_file("stdout", batch_stdout)
                workspec.set_log_file("stderr", batch_stderr)
                workspec.set_log_file("jdl", batch_jdl)
                if not workspec.get_jobspec_list():
                    tmpLog.debug(f"No jobspec associated in the worker of workerID={workspec.workerID}")
                else:
                    for jobSpec in workspec.get_jobspec_list():
                        # propagate pilot log locations to the job attributes
                        jobSpec.set_one_attribute("pilotID", workspec.workAttributes["stdOut"])
                        jobSpec.set_one_attribute("pilotLog", workspec.workAttributes["batchLog"])
                tmpLog.debug(f"Done set_log_file after submission of workerID={workspec.workerID}")
                tmpRetVal = (True, "")
                worker_retval_map[workspec.workerID] = (tmpRetVal, workspec.get_changed_attributes())
        else:
            # submission failed: mark every worker of this host bulk as failed
            tmpLog.debug(f"failed to submit workers to submissionHost={host} ; {ret_err_str}")
            for val in val_list:
                workspec = val[0]
                errStr = f"submission failed: {ret_err_str}"
                tmpLog.error(errStr)
                tmpRetVal = (None, errStr)
                worker_retval_map[workspec.workerID] = (tmpRetVal, workspec.get_changed_attributes())

    # return values in the original order of data_list
    retValList = [worker_retval_map[w_id] for w_id in workerIDs_list]
    return retValList
0156
0157
def make_a_jdl(
    workspec,
    template,
    n_node,
    n_core_per_node,
    log_dir,
    panda_queue_name,
    executable_file,
    x509_user_proxy,
    log_subdir=None,
    ce_info_dict=None,
    batch_log_dict=None,
    pilot_url=None,
    pilot_args="",
    special_par="",
    harvester_queue_config=None,
    is_unified_queue=False,
    pilot_version="unknown",
    python_version="unknown",
    prod_rc_permille=0,
    token_dir=None,
    panda_token_filename=None,
    panda_token_dir=None,
    panda_token_key_path=None,
    is_gpu_resource=False,
    n_core_factor=1,
    custom_submit_attr_dict=None,
    cric_panda_site=None,
    **kwarg,
):
    """
    Make a condor JDL (submit description file) for a worker.

    Builds a placeholder map from the worker spec, resource requirements, CE
    information, credentials and log locations, renders the JDL template with
    it, writes the result to a temporary *_submit.sdf file under the worker
    access point, and returns the JDL string together with the placeholder map.

    Note: ce_info_dict, batch_log_dict and custom_submit_attr_dict default to
    None instead of mutable {} (bug-prone mutable default arguments); they are
    copied before use, so the caller's dicts are never modified.

    :param workspec: work spec of the worker
    :param template: JDL template string with {placeholder} fields
    :param n_node: number of nodes; computed from the core counts when None
    :param n_core_per_node: number of cores per node
    :param log_dir: base directory of batch logs
    :param panda_queue_name: name of the PanDA queue
    :param executable_file: path of the executable (pilot wrapper)
    :param x509_user_proxy: path of the X509 proxy file
    :return: (jdl_str, placeholder_map)
    """

    tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID} resourceType={workspec.resourceType}", method_name="make_a_jdl")

    # resource requirements, with fallbacks when the workspec has no values
    n_core_total = workspec.nCore if workspec.nCore else n_core_per_node
    request_ram = max(workspec.minRamCount, 1 * n_core_total) if workspec.minRamCount else 1 * n_core_total
    request_disk = workspec.maxDiskCount * 1024 if workspec.maxDiskCount else 1
    request_walltime = workspec.maxWalltime if workspec.maxWalltime else 0
    io_intensity = workspec.ioIntensity if workspec.ioIntensity else 0
    # work on copies so the caller's dicts are never modified
    ce_info_dict = dict() if ce_info_dict is None else ce_info_dict.copy()
    batch_log_dict = dict() if batch_log_dict is None else batch_log_dict.copy()
    custom_submit_attr_dict = dict() if custom_submit_attr_dict is None else custom_submit_attr_dict.copy()

    # attribute overrides from the CRIC special_par string, e.g. "(queue=q)(xcount=8)"
    if special_par:
        special_par_attr_list = [
            "queue",
            "maxWallTime",
            "xcount",
        ]
        _match_special_par_dict = {attr: re.search(f"\\({attr}=([^)]+)\\)", special_par) for attr in special_par_attr_list}
        for attr, _match in _match_special_par_dict.items():
            if not _match:
                continue
            elif attr == "queue":
                ce_info_dict["ce_queue_name"] = str(_match.group(1))
            elif attr == "maxWallTime":
                request_walltime = int(_match.group(1))
            elif attr == "xcount":
                n_core_total = int(_match.group(1))
            tmpLog.debug(f"job attributes override by CRIC special_par: {attr}={str(_match.group(1))}")

    # derived quantities for the placeholder map
    n_core_total_factor = n_core_total * n_core_factor
    if n_node is None:
        n_node = ceil(n_core_total / n_core_per_node)
    request_ram_factor = request_ram * n_core_factor
    request_ram_bytes = request_ram * 2**20
    request_ram_bytes_factor = request_ram * 2**20 * n_core_factor
    request_ram_per_core = ceil(request_ram / n_core_total)
    request_ram_bytes_per_core = ceil(request_ram_bytes / n_core_total)
    request_cputime = request_walltime * n_core_total
    request_walltime_minute = ceil(request_walltime / 60)
    request_cputime_minute = ceil(request_cputime / 60)

    # pilot type / URL / debug options for this worker
    pilot_opt_dict = submitter_common.get_complicated_pilot_options(
        pilot_type=workspec.pilotType,
        pilot_url=pilot_url,
        pilot_version=pilot_version,
        prod_source_label=harvester_queue_config.get_source_label(workspec.jobType),
        prod_rc_permille=prod_rc_permille,
    )
    prod_source_label = pilot_opt_dict["prod_source_label"]
    pilot_type_opt = pilot_opt_dict["pilot_type_opt"]
    pilot_url_str = pilot_opt_dict["pilot_url_str"]
    pilot_debug_str = pilot_opt_dict["pilot_debug_str"]
    tmpLog.debug(f"pilot options: {pilot_opt_dict}")

    # path of the CE access token, derived from the CE endpoint
    token_filename = None
    if token_dir is not None and ce_info_dict.get("ce_endpoint"):
        token_filename = endpoint_to_filename(ce_info_dict["ce_endpoint"])
    token_path = None
    if token_dir is not None and token_filename is not None:
        token_path = os.path.join(token_dir, token_filename)
    else:
        tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}")

    # path of the PanDA auth token, if configured
    panda_token_path = None
    if panda_token_dir is not None and panda_token_filename is not None:
        panda_token_path = os.path.join(panda_token_dir, panda_token_filename)

    panda_token_key_filename = None
    if panda_token_key_path is not None:
        panda_token_key_filename = os.path.basename(panda_token_key_path)

    # custom "+attr = value" JDL lines coming from CRIC
    custom_submit_attr_str_list = []
    for attr_key, attr_value in custom_submit_attr_dict.items():
        custom_submit_attr_str_list.append(f"+{attr_key} = {attr_value}")
    custom_submit_attr_str = "\n".join(custom_submit_attr_str_list)

    # the sdf file is kept (delete=False) under the worker access point for debugging
    tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sdf", dir=workspec.get_access_point())

    rt_mapper = ResourceTypeMapper()
    all_resource_types = rt_mapper.get_all_resource_types()

    # job spec filename; queue config overrides the messenger default
    jobspec_filename = harvester_queue_config.messenger.get("jobSpecFileName", base_messenger.jobSpecFileName)

    # values substituted into the {placeholder} fields of the JDL template
    placeholder_map = {
        "sdfPath": tmpFile.name,
        "executableFile": executable_file,
        "jobSpecFileName": jobspec_filename,
        "nCorePerNode": n_core_per_node,
        "nCoreTotal": n_core_total_factor,
        "nNode": n_node,
        "nCoreFactor": n_core_factor,
        "requestRam": request_ram_factor,
        "requestRamBytes": request_ram_bytes_factor,
        "requestRamPerCore": request_ram_per_core,
        "requestRamBytesPerCore": request_ram_bytes_per_core,
        "requestDisk": request_disk,
        "requestWalltime": request_walltime,
        "requestWalltimeMinute": request_walltime_minute,
        "requestCputime": request_cputime,
        "requestCputimeMinute": request_cputime_minute,
        "accessPoint": workspec.accessPoint,
        "harvesterID": harvester_config.master.harvester_id,
        "workerID": workspec.workerID,
        "computingSite": workspec.computingSite,
        "pandaQueueName": panda_queue_name,
        "x509UserProxy": x509_user_proxy,
        "ceEndpoint": ce_info_dict.get("ce_endpoint", ""),
        "ceHostname": ce_info_dict.get("ce_hostname", ""),
        "ceFlavour": ce_info_dict.get("ce_flavour", ""),
        "ceJobmanager": ce_info_dict.get("ce_jobmanager", ""),
        "ceQueueName": ce_info_dict.get("ce_queue_name", ""),
        "ceVersion": ce_info_dict.get("ce_version", ""),
        "logDir": log_dir,
        "logSubdir": log_subdir,
        "prodSourceLabel": prod_source_label,
        "jobType": workspec.jobType,
        "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types),
        "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types, is_pilot_option=True),
        "ioIntensity": io_intensity,
        "pilotType": pilot_type_opt,
        "pilotUrlOption": pilot_url_str,
        "pilotVersion": pilot_version,
        "pilotPythonOption": submitter_common.get_python_version_option(python_version, prod_source_label),
        "pilotDebugOption": pilot_debug_str,
        "pilotArgs": pilot_args,
        "submissionHost": workspec.submissionHost,
        "submissionHostShort": workspec.submissionHost.split(".")[0],
        "ceARCGridType": ce_info_dict.get("ce_grid_type", "arc"),
        "tokenDir": token_dir,
        "tokenFilename": token_filename,
        "tokenPath": token_path,
        "pandaTokenFilename": panda_token_filename,
        "pandaTokenPath": panda_token_path,
        "pandaTokenKeyFilename": panda_token_key_filename,
        "pandaTokenKeyPath": panda_token_key_path,
        "pilotJobLabel": submitter_common.get_joblabel(prod_source_label),
        "pilotJobType": submitter_common.get_pilot_job_type(workspec.jobType),
        "requestGpus": 1 if is_gpu_resource else 0,
        "requireGpus": is_gpu_resource,
        "customSubmitAttributes": custom_submit_attr_str,
        "cricPandaSite": cric_panda_site,
    }

    # gtag may itself contain placeholders, so expand it before adding it to the map
    gtag = batch_log_dict.get("gtag", "fake_GTAG_string").format(**placeholder_map)
    placeholder_map["gtag"] = gtag

    jdl_str = template.format(**placeholder_map)

    # save the rendered JDL to the temporary sdf file
    tmpFile.write(jdl_str)
    tmpFile.close()
    tmpLog.debug(f"saved sdf at {tmpFile.name}")
    tmpLog.debug("done")
    return jdl_str, placeholder_map
0353
0354
def parse_batch_job_filename(value_str, file_dir, batchID, guess=False):
    """
    Parse the log, stdout or stderr filename of a batch job.

    :param value_str: log/output/error value taken from the JDL
    :param file_dir: directory where the actual log files live
    :param batchID: batch ID expected to appear in the real filename
    :param guess: when True, just return the basename of value_str without
                  searching file_dir
    :return: the matching filename in file_dir, or None when nothing matches
    """
    _filename = os.path.basename(value_str)
    if guess:
        # guess the log filename from the jdl value directly
        return _filename
    else:
        # strip placeholder tokens like {x}, [x], (x), #x# and $ before
        # splitting into prefix/suffix; raw string avoids invalid escape
        # sequences that non-raw "\{...\w..." would produce
        _sanitized_list = re.sub(r"\{(\w+)\}|\[(\w+)\]|\((\w+)\)|#(\w+)#|\$", "", _filename).split(".")
        _prefix = _sanitized_list[0]
        _suffix = _sanitized_list[-1] if len(_sanitized_list) > 1 else ""

        # find the real file whose name embeds the batch ID
        for _f in os.listdir(file_dir):
            if re.match(f"{_prefix}(.*)\\.{batchID}\\.(.*)\\.{_suffix}", _f):
                return _f
        return None
0372
0373
0374
0375 class HTCondorSubmitter(PluginBase):
0376
0377 def __init__(self, **kwarg):
0378 tmpLog = core_utils.make_logger(baseLogger, method_name="__init__")
0379 self.logBaseURL = None
0380 if hasattr(self, "useFQDN") and self.useFQDN:
0381 self.hostname = socket.getfqdn()
0382 else:
0383 self.hostname = socket.gethostname().split(".")[0]
0384 PluginBase.__init__(self, **kwarg)
0385
0386 extra_plugin_configs = {}
0387 try:
0388 extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"]
0389 except AttributeError:
0390 pass
0391 except KeyError:
0392 pass
0393
0394 self.nProcesses = getattr(self, "nProcesses", 1)
0395 if (not self.nProcesses) or (self.nProcesses < 1):
0396 self.nProcesses = 1
0397
0398 self.nNode = getattr(self, "nNode", None)
0399
0400 self.nCorePerNode = getattr(self, "nCorePerNode", None)
0401
0402 self.nCoreFactor = getattr(self, "nCoreFactor", 1)
0403 if type(self.nCoreFactor) in [dict]:
0404
0405
0406 pass
0407 else:
0408 self.nCoreFactor = int(self.nCoreFactor)
0409 if (not self.nCoreFactor) or (self.nCoreFactor < 1):
0410 self.nCoreFactor = 1
0411
0412 self.executableFile = getattr(self, "executableFile", None)
0413
0414 self.logDir = getattr(self, "logDir", os.getenv("TMPDIR") or "/tmp")
0415 if self.logDir and ("$hostname" in self.logDir or "${hostname}" in self.logDir):
0416 self.logDir = self.logDir.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0417 try:
0418 if not os.path.exists(self.logDir):
0419 os.mkdir(self.logDir)
0420 except Exception as ex:
0421 tmpLog.debug(f"Failed to create logDir({self.logDir}): {str(ex)}")
0422
0423 self.logBaseURL = getattr(self, "logBaseURL", None)
0424 if self.logBaseURL and ("$hostname" in self.logBaseURL or "${hostname}" in self.logBaseURL):
0425 self.logBaseURL = self.logBaseURL.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0426 if self.logBaseURL and "${harvester_id}" in self.logBaseURL:
0427 self.logBaseURL = self.logBaseURL.replace("${harvester_id}", harvester_config.master.harvester_id)
0428
0429 self.x509UserProxy = getattr(self, "x509UserProxy", os.getenv("X509_USER_PROXY"))
0430
0431 self.x509UserProxyAnalysis = getattr(self, "x509UserProxyAnalysis", os.getenv("X509_USER_PROXY_ANAL"))
0432
0433 self.tokenDir = getattr(self, "tokenDir", None)
0434
0435 self.tokenDirAnalysis = getattr(self, "tokenDirAnalysis", None)
0436
0437 self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
0438 self.pandaTokenDir = getattr(self, "pandaTokenDir", None)
0439 self.pandaTokenKeyPath = getattr(self, "pandaTokenKeyPath", None)
0440
0441 self.useCRIC = getattr(self, "useCRIC", getattr(self, "useAtlasCRIC", getattr(self, "useAtlasAGIS", False)))
0442
0443 self.useCRICGridCE = getattr(self, "useCRICGridCE", getattr(self, "useAtlasGridCE", False))
0444 self.useCRIC = self.useCRIC or self.useCRICGridCE
0445
0446 self.templateFile = getattr(self, "templateFile", None)
0447
0448 self.CEtemplateDir = getattr(self, "CEtemplateDir", "")
0449
0450 self.condorSchedd = getattr(self, "condorSchedd", None)
0451 if self.condorSchedd is not None and ("$hostname" in self.condorSchedd or "${hostname}" in self.condorSchedd):
0452 self.condorSchedd = self.condorSchedd.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0453 self.condorPool = getattr(self, "condorPool", None)
0454 if self.condorPool is not None and ("$hostname" in self.condorPool or "${hostname}" in self.condorPool):
0455 self.condorPool = self.condorPool.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0456
0457 self.condorHostConfig = getattr(self, "condorHostConfig", False)
0458 if self.condorHostConfig:
0459 try:
0460 self.condorSchedd = []
0461 self.condorPool = []
0462 self.condorHostWeight = []
0463 with open(self.condorHostConfig, "r") as f:
0464 condor_host_config_map = json.load(f)
0465 for _schedd, _cm in condor_host_config_map.items():
0466 _pool = _cm["pool"]
0467 _weight = int(_cm["weight"])
0468 self.condorSchedd.append(_schedd)
0469 self.condorPool.append(_pool)
0470 self.condorHostWeight.append(_weight)
0471 except Exception as e:
0472 tmpLog.error(f"error when parsing condorHostConfig json file; {e.__class__.__name__}: {e}")
0473 raise
0474 else:
0475 if isinstance(self.condorSchedd, list):
0476 self.condorHostWeight = [1] * len(self.condorSchedd)
0477 else:
0478 self.condorHostWeight = [1]
0479
0480 self.useSpool = getattr(self, "useSpool", False)
0481
0482 self.minBulkToRandomizedSchedd = getattr(self, "minBulkToRandomizedSchedd", 20)
0483
0484 self.useAnalysisCredentials = getattr(self, "useAnalysisCredentials", False)
0485
0486 self.rcPilotRandomWeightPermille = getattr(self, "rcPilotRandomWeightPermille", 0)
0487
0488 self.submit_arc_grid_type = "arc"
0489 if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid":
0490 self.submit_arc_grid_type = "nordugrid"
0491
0492 self.ceStatsLock = threading.Lock()
0493 self.ceStats = dict()
0494
0495 self._allowed_cric_attrs = [
0496 "ce_fairshare_percent",
0497 "pilot_url",
0498 "pilot_args",
0499 ]
0500 self._allowed_cric_attr_prefixes = [
0501 "jdl.plusattr.",
0502 ]
0503
0504
    def get_ce_statistics(self, site_name, queue_config, n_new_workers, time_window=21600):
        """
        Gather CE-related statistics of a site from the harvester DB.

        :param site_name: name of the site
        :param queue_config: queue configuration used for the worker limits
        :param n_new_workers: number of new workers about to be submitted
        :param time_window: look-back window in seconds for throughput stats
        :return: (worker_limits_dict, worker_ce_stats_dict,
                  worker_ce_backend_throughput_dict, time_window, n_new_workers)
        """
        # fast path: return cached statistics without taking the lock
        if site_name in self.ceStats:
            return self.ceStats[site_name]
        # double-checked locking: re-test under the lock before computing
        with self.ceStatsLock:
            if site_name in self.ceStats:
                return self.ceStats[site_name]
            else:
                worker_limits_dict, _ = self.dbInterface.get_worker_limits(self.queueName, queue_config)
                worker_ce_stats_dict = self.dbInterface.get_worker_ce_stats(self.queueName)
                worker_ce_backend_throughput_dict = self.dbInterface.get_worker_ce_backend_throughput(self.queueName, time_window=time_window)
                # NOTE(review): the computed tuple is returned but never stored
                # in self.ceStats, so the cache above never hits and the DB is
                # queried on every call — confirm whether caching was intended
                return (worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers)
0516
0517
0518 def submit_workers(self, workspec_list):
0519 tmpLog = self.make_logger(baseLogger, f"site={self.queueName}", method_name="submit_workers")
0520
0521 nWorkers = len(workspec_list)
0522 tmpLog.debug(f"start nWorkers={nWorkers}")
0523
0524
0525 to_submit_any = True
0526
0527
0528 timeNow = core_utils.naive_utcnow()
0529 log_subdir = timeNow.strftime("%y-%m-%d_%H")
0530 log_subdir_path = os.path.join(self.logDir, log_subdir)
0531 if self.condorSchedd is None or not self.useSpool:
0532 try:
0533 os.mkdir(log_subdir_path)
0534 except OSError as e:
0535 if e.errno != errno.EEXIST:
0536 raise
0537 else:
0538 pass
0539
0540
0541 _queueConfigMapper = QueueConfigMapper()
0542 harvester_queue_config = _queueConfigMapper.get_queue(self.queueName)
0543
0544
0545 associated_params_dict = {}
0546
0547 is_grandly_unified_queue = False
0548
0549 if self.useCRIC:
0550 panda_queues_dict = PandaQueuesDict()
0551 panda_queues_dict_last_refresh = core_utils.naive_utcfromtimestamp(panda_queues_dict.last_refresh_ts)
0552 tmpLog.debug(f"PandaQueuesDict last refresh at {panda_queues_dict_last_refresh}")
0553 panda_queue_name = panda_queues_dict.get_panda_queue_name(self.queueName)
0554 this_panda_queue_dict = panda_queues_dict.get(self.queueName, dict())
0555 is_grandly_unified_queue = panda_queues_dict.is_grandly_unified_queue(self.queueName)
0556
0557
0558 for key, val in panda_queues_dict.get_harvester_params(self.queueName).items():
0559 if not isinstance(key, str):
0560 continue
0561 if key in self._allowed_cric_attrs or any([key.startswith(the_prefix) for the_prefix in self._allowed_cric_attr_prefixes]):
0562 if isinstance(val, str):
0563
0564 val = re.sub(r"[;$~`]*", "", val)
0565 associated_params_dict[key] = val
0566 else:
0567 panda_queues_dict = dict()
0568 panda_queue_name = self.queueName
0569 this_panda_queue_dict = dict()
0570
0571
0572 n_core_per_node_from_queue = this_panda_queue_dict.get("corecount", 1) if this_panda_queue_dict.get("corecount", 1) else 1
0573 is_unified_queue = this_panda_queue_dict.get("capability", "") == "ucore"
0574 pilot_url = associated_params_dict.get("pilot_url")
0575 pilot_args = associated_params_dict.get("pilot_args", "")
0576 pilot_version = str(this_panda_queue_dict.get("pilot_version", "current"))
0577 python_version = str(this_panda_queue_dict.get("python_version", "3"))
0578 is_gpu_resource = this_panda_queue_dict.get("resource_type", "") == "gpu"
0579 ce_fairshare_percent = associated_params_dict.get("ce_fairshare_percent", 50)
0580 cric_panda_site = this_panda_queue_dict.get("panda_site")
0581 custom_submit_attr_dict = {}
0582 for k, v in associated_params_dict.items():
0583
0584 try:
0585 the_prefix = "jdl.plusattr."
0586 if k.startswith(the_prefix):
0587 attr_key = k[len(the_prefix) :]
0588 attr_value = str(v)
0589 if not re.fullmatch(r"[a-zA-Z_0-9][a-zA-Z_0-9.\-]*", attr_key):
0590
0591 tmpLog.warning(f'custom submit attribute "{k}: {v}" has invalid key; skipped')
0592 continue
0593 if not re.fullmatch(r'[a-zA-Z_0-9.\-,=&|()!?" ]+', attr_value):
0594
0595 tmpLog.warning(f'custom submit attribute "{k}: {v}" has invalid value; skipped')
0596 continue
0597 custom_submit_attr_dict[attr_key] = attr_value
0598 except Exception as e:
0599 tmpLog.warning(f'Got {e} with custom submit attributes "{k}: {v}"; skipped')
0600 continue
0601
0602
0603 n_node = getattr(self, "nNode", None)
0604 n_core_per_node = getattr(self, "nCorePerNode", n_core_per_node_from_queue)
0605 if not n_core_per_node:
0606 n_core_per_node = n_core_per_node_from_queue
0607
0608
0609 n_bulks = ceil(nWorkers / self.minBulkToRandomizedSchedd)
0610 if isinstance(self.condorSchedd, list) and len(self.condorSchedd) > 0:
0611 orig_list = []
0612 if isinstance(self.condorPool, list) and len(self.condorPool) > 0:
0613 for _schedd, _pool, _weight in zip(self.condorSchedd, self.condorPool, self.condorHostWeight):
0614 orig_list.extend([(_schedd, _pool)] * _weight)
0615 else:
0616 for _schedd, _weight in zip(self.condorSchedd, self.condorHostWeight):
0617 orig_list.extend([(_schedd, self.condorPool)] * _weight)
0618 if n_bulks < len(orig_list):
0619 schedd_pool_choice_list = random.sample(orig_list, n_bulks)
0620 else:
0621 schedd_pool_choice_list = orig_list
0622 else:
0623 schedd_pool_choice_list = [(self.condorSchedd, self.condorPool)]
0624
0625
0626 special_par = ""
0627 ce_weighting = None
0628 if self.useCRICGridCE:
0629
0630 tmpLog.debug("Using CRIC Grid CE mode...")
0631 queues_from_queue_list = this_panda_queue_dict.get("queues", [])
0632 special_par = this_panda_queue_dict.get("special_par", "")
0633 ce_auxiliary_dict = {}
0634 for _queue_dict in queues_from_queue_list:
0635 if not (
0636 _queue_dict.get("ce_endpoint")
0637 and str(_queue_dict.get("ce_state", "")).upper() == "ACTIVE"
0638 and str(_queue_dict.get("ce_flavour", "")).lower() in set(["arc-ce", "cream-ce", "htcondor-ce"])
0639 ):
0640 continue
0641 ce_info_dict = _queue_dict.copy()
0642
0643
0644 _match_ce_endpoint = re.match("^(\w+)://(\w+)", ce_info_dict.get("ce_endpoint", ""))
0645 ce_endpoint_prefix = ""
0646 if _match_ce_endpoint:
0647 ce_endpoint_prefix = _match_ce_endpoint.group(1)
0648 ce_endpoint_from_queue = re.sub("^\w+://", "", ce_info_dict.get("ce_endpoint", ""))
0649 ce_flavour_str = str(ce_info_dict.get("ce_flavour", "")).lower()
0650 ce_version_str = str(ce_info_dict.get("ce_version", "")).lower()
0651
0652 ce_info_dict["ce_grid_type"] = ""
0653 if ce_flavour_str == "arc-ce":
0654 ce_info_dict["ce_grid_type"] = self.submit_arc_grid_type
0655 ce_info_dict["ce_hostname"] = re.sub(":\w*", "", ce_endpoint_from_queue)
0656 if ce_info_dict["ce_grid_type"] == "arc":
0657 default_port = None
0658 if ce_info_dict["ce_hostname"] == ce_endpoint_from_queue:
0659
0660 default_port = 443
0661 else:
0662
0663 ce_endpoint_from_queue = re.sub(r":2811$", ":443", ce_endpoint_from_queue)
0664 ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}{f':{default_port}' if default_port is not None else ''}"
0665 else:
0666 if ce_info_dict["ce_hostname"] == ce_endpoint_from_queue:
0667
0668 default_port_map = {
0669 "cream-ce": 8443,
0670 "arc-ce": 2811,
0671 "htcondor-ce": 9619,
0672 }
0673 if ce_flavour_str in default_port_map:
0674 default_port = default_port_map[ce_flavour_str]
0675 ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}:{default_port}"
0676 if ce_flavour_str == "arc-ce":
0677 ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}"
0678 tmpLog.debug(f'Got pilot version: "{pilot_version}"; CE endpoint: "{ce_endpoint_from_queue}", flavour: "{ce_flavour_str}"')
0679 ce_endpoint = ce_info_dict.get("ce_endpoint")
0680 if ce_endpoint in ce_auxiliary_dict and str(ce_info_dict.get("ce_queue_name", "")).lower() == "default":
0681 pass
0682 else:
0683 ce_auxiliary_dict[ce_endpoint] = ce_info_dict
0684
0685 n_qualified_ce = len(ce_auxiliary_dict)
0686 if n_qualified_ce > 0:
0687
0688 tmpLog.debug("Get CE weighting")
0689 worker_ce_all_tuple = self.get_ce_statistics(self.queueName, harvester_queue_config, nWorkers)
0690 is_slave_queue = harvester_queue_config.runMode == "slave"
0691 ce_weighting = submitter_common.get_ce_weighting(
0692 ce_endpoint_list=list(ce_auxiliary_dict.keys()),
0693 worker_ce_all_tuple=worker_ce_all_tuple,
0694 is_slave_queue=is_slave_queue,
0695 fairshare_percent=ce_fairshare_percent,
0696 )
0697 stats_weighting_display_str = submitter_common.get_ce_stats_weighting_display(
0698 ce_auxiliary_dict.keys(), worker_ce_all_tuple, ce_weighting, ce_fairshare_percent
0699 )
0700 tmpLog.debug(f"CE stats and weighting: {stats_weighting_display_str}")
0701 else:
0702 tmpLog.error("No valid CE endpoint found")
0703 to_submit_any = False
0704
0705 def _handle_one_worker(workspec, to_submit=to_submit_any):
0706
0707 tmpLog = core_utils.make_logger(
0708 baseLogger, f"site={self.queueName} workerID={workspec.workerID}, resourceType={workspec.resourceType}", method_name="_handle_one_worker"
0709 )
0710
0711 def _choose_credential():
0712 """
0713 Choose the credential based on the job type
0714 """
0715 proxy = self.x509UserProxy
0716 token_dir = self.tokenDir
0717 if self.useAnalysisCredentials:
0718 if self.x509UserProxyAnalysis:
0719 tmpLog.debug("Taking analysis proxy")
0720 proxy = self.x509UserProxyAnalysis
0721 if self.tokenDirAnalysis:
0722 tmpLog.debug("Taking analysis token_dir")
0723 token_dir = self.tokenDirAnalysis
0724 else:
0725 tmpLog.debug("Taking default proxy")
0726 if self.tokenDir:
0727 tmpLog.debug("Taking default token_dir")
0728 return proxy, token_dir
0729
0730 def get_core_factor(workspec):
0731 try:
0732 if type(self.nCoreFactor) in [dict]:
0733 if workspec.jobType in self.nCoreFactor:
0734 job_type = workspec.jobType
0735 else:
0736 job_type = "Any"
0737 if is_unified_queue:
0738 resource_type = workspec.resourceType
0739 else:
0740 resource_type = "Undefined"
0741 n_core_factor = self.nCoreFactor.get(job_type, {}).get(resource_type, 1)
0742 return int(n_core_factor)
0743 else:
0744 return int(self.nCoreFactor)
0745 except Exception as ex:
0746 tmpLog.warning(f"Failed to get core factor: {ex}")
0747 return 1
0748
0749
0750 ce_info_dict = dict()
0751 batch_log_dict = dict()
0752 data = {
0753 "workspec": workspec,
0754 "to_submit": to_submit,
0755 }
0756 if to_submit:
0757 sdf_template_file = None
0758 if self.useCRICGridCE:
0759
0760 tmpLog.info("choose a CE...")
0761 ce_chosen = submitter_common.choose_ce(ce_weighting)
0762 try:
0763 ce_info_dict = ce_auxiliary_dict[ce_chosen].copy()
0764 except KeyError:
0765 tmpLog.info("Problem choosing CE with weighting. Choose an arbitrary CE endpoint")
0766 ce_info_dict = random.choice(list(ce_auxiliary_dict.values())).copy()
0767 ce_flavour_str = str(ce_info_dict.get("ce_flavour", "")).lower()
0768 tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict['ce_endpoint']}\", flavour: \"{ce_flavour_str}\"")
0769 if self.templateFile:
0770 sdf_template_file = self.templateFile
0771 elif os.path.isdir(self.CEtemplateDir) and ce_flavour_str:
0772 sdf_suffix_str = ""
0773 if ce_info_dict["ce_grid_type"] and ce_info_dict["ce_grid_type"] != "arc":
0774 sdf_suffix_str = f"_{ce_info_dict['ce_grid_type']}"
0775 sdf_template_filename = f"{ce_flavour_str}{sdf_suffix_str}.sdf"
0776 sdf_template_file = os.path.join(self.CEtemplateDir, sdf_template_filename)
0777 else:
0778 if self.templateFile:
0779 sdf_template_file = self.templateFile
0780 try:
0781
0782 if self.ceHostname and isinstance(self.ceHostname, list) and len(self.ceHostname) > 0:
0783 if isinstance(self.ceEndpoint, list) and len(self.ceEndpoint) > 0:
0784 ce_info_dict["ce_hostname"], ce_info_dict["ce_endpoint"] = random.choice(list(zip(self.ceHostname, self.ceEndpoint)))
0785 else:
0786 ce_info_dict["ce_hostname"] = random.choice(self.ceHostname)
0787 ce_info_dict["ce_endpoint"] = self.ceEndpoint
0788 else:
0789 ce_info_dict["ce_hostname"] = self.ceHostname
0790 ce_info_dict["ce_endpoint"] = self.ceEndpoint
0791 except AttributeError:
0792 pass
0793 tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict.get('ce_endpoint')}\"")
0794 try:
0795
0796 if self.ceQueueName:
0797 ce_info_dict["ce_queue_name"] = self.ceQueueName
0798 except AttributeError:
0799 pass
0800
0801 try:
0802 tmpFile = open(sdf_template_file)
0803 sdf_template_raw = tmpFile.read()
0804 tmpFile.close()
0805 except AttributeError:
0806 tmpLog.error("No valid templateFile found. Maybe templateFile, CEtemplateDir invalid, or no valid CE found")
0807 to_submit = False
0808 return data
0809 else:
0810
0811 sdf_template_str_list = []
0812 for _line in sdf_template_raw.split("\n"):
0813 if _line.startswith("#"):
0814 continue
0815 sdf_template_str_list.append(_line)
0816 _match_batch_log = re.match("log = (.+)", _line)
0817 _match_stdout = re.match("output = (.+)", _line)
0818 _match_stderr = re.match("error = (.+)", _line)
0819 if _match_batch_log:
0820 batch_log_value = _match_batch_log.group(1)
0821 continue
0822 if _match_stdout:
0823 stdout_value = _match_stdout.group(1)
0824 continue
0825 if _match_stderr:
0826 stderr_value = _match_stderr.group(1)
0827 continue
0828 sdf_template = "\n".join(sdf_template_str_list)
0829
0830 condor_schedd, condor_pool = random.choice(schedd_pool_choice_list)
0831
0832 if not condor_schedd and not condor_pool:
0833 workspec.submissionHost = "LOCAL"
0834 else:
0835 workspec.submissionHost = f"{condor_schedd},{condor_pool}"
0836 tmpLog.debug(f"set submissionHost={workspec.submissionHost}")
0837
0838 if self.logBaseURL and "[ScheddHostname]" in self.logBaseURL:
0839 schedd_hostname = re.sub(
0840 r"(?:[a-zA-Z0-9_.\-]*@)?([a-zA-Z0-9.\-]+)(?::[0-9]+)?",
0841 lambda matchobj: matchobj.group(1) if matchobj.group(1) else "",
0842 condor_schedd,
0843 )
0844 log_base_url = re.sub(r"\[ScheddHostname\]", schedd_hostname, self.logBaseURL)
0845 else:
0846 log_base_url = self.logBaseURL
0847
0848 if not (log_base_url is None):
0849 if workspec.batchID:
0850 batchID = workspec.batchID
0851 guess = False
0852 else:
0853 batchID = ""
0854 guess = True
0855 batch_log_filename = parse_batch_job_filename(value_str=batch_log_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0856 stdout_path_file_name = parse_batch_job_filename(value_str=stdout_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0857 stderr_path_filename = parse_batch_job_filename(value_str=stderr_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0858 batch_log = f"{log_base_url}/{log_subdir}/{batch_log_filename}"
0859 batch_stdout = f"{log_base_url}/{log_subdir}/{stdout_path_file_name}"
0860 batch_stderr = f"{log_base_url}/{log_subdir}/{stderr_path_filename}"
0861 workspec.set_log_file("batch_log", batch_log)
0862 workspec.set_log_file("stdout", batch_stdout)
0863 workspec.set_log_file("stderr", batch_stderr)
0864 batch_log_dict["batch_log"] = batch_log
0865 batch_log_dict["batch_stdout"] = batch_stdout
0866 batch_log_dict["batch_stderr"] = batch_stderr
0867 batch_log_dict["gtag"] = workspec.workAttributes["stdOut"]
0868 tmpLog.debug("Done set_log_file before submission")
0869 tmpLog.debug("Done jobspec attribute setting")
0870
0871
0872 proxy, token_dir = _choose_credential()
0873
0874
0875 data.update(
0876 {
0877 "workspec": workspec,
0878 "to_submit": to_submit,
0879 "template": sdf_template,
0880 "executable_file": self.executableFile,
0881 "log_dir": self.logDir,
0882 "log_subdir": log_subdir,
0883 "n_node": n_node,
0884 "n_core_per_node": n_core_per_node,
0885 "n_core_factor": get_core_factor(workspec),
0886 "panda_queue_name": panda_queue_name,
0887 "x509_user_proxy": proxy,
0888 "ce_info_dict": ce_info_dict,
0889 "batch_log_dict": batch_log_dict,
0890 "special_par": special_par,
0891 "harvester_queue_config": harvester_queue_config,
0892 "is_unified_queue": is_unified_queue,
0893 "condor_schedd": condor_schedd,
0894 "condor_pool": condor_pool,
0895 "use_spool": self.useSpool,
0896 "pilot_url": pilot_url,
0897 "pilot_args": pilot_args,
0898 "pilot_version": pilot_version,
0899 "python_version": python_version,
0900 "token_dir": token_dir,
0901 "panda_token_filename": self.pandaTokenFilename,
0902 "panda_token_dir": self.pandaTokenDir,
0903 "panda_token_key_path": self.pandaTokenKeyPath,
0904 "is_unified_dispatch": True,
0905 "prod_rc_permille": self.rcPilotRandomWeightPermille,
0906 "is_gpu_resource": is_gpu_resource,
0907 "custom_submit_attr_dict": custom_submit_attr_dict,
0908 "cric_panda_site": cric_panda_site,
0909 }
0910 )
0911 return data
0912
def _propagate_attributes(workspec, tmpVal):
    """
    Apply submission results back onto a workspec.

    tmpVal is a (retVal, tmpDict) pair: tmpDict is merged into the
    workspec's attributes, and retVal is handed back to the caller
    unchanged.
    """
    tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="_propagate_attributes")
    result, attribute_dict = tmpVal
    workspec.set_attributes_with_dict(attribute_dict)
    tmpLog.debug("Done workspec attributes propagation")
    return result
0920
0921 tmpLog.debug("finished preparing worker attributes")
0922
0923
0924 with ThreadPoolExecutor(self.nProcesses * 4) as thread_pool:
0925 dataIterator = thread_pool.map(_handle_one_worker, workspec_list)
0926 tmpLog.debug(f"{nWorkers} workers handled")
0927
0928
0929 retValList = submit_bag_of_workers(list(dataIterator))
0930 tmpLog.debug(f"{nWorkers} workers submitted")
0931
0932
0933 with ThreadPoolExecutor(self.nProcesses) as thread_pool:
0934 retIterator = thread_pool.map(lambda _wv_tuple: _propagate_attributes(*_wv_tuple), zip(workspec_list, retValList))
0935
0936 retList = list(retIterator)
0937 tmpLog.debug("done")
0938
0939 return retList