Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import errno
0002 import json
0003 import os
0004 import random
0005 import re
0006 import socket
0007 import tempfile
0008 import threading
0009 from concurrent.futures import ThreadPoolExecutor
0010 from math import ceil
0011 
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.plugin_base import PluginBase
0015 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0016 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0017 from pandaharvester.harvestermessenger.base_messenger import BaseMessenger
0018 from pandaharvester.harvestermisc.htcondor_utils import (
0019     CondorJobSubmit,
0020     get_job_id_tuple_from_batchid,
0021 )
0022 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0023 from pandaharvester.harvestermisc.token_utils import endpoint_to_filename
0024 from pandaharvester.harvestersubmitter import submitter_common
0025 
0026 # logger
0027 baseLogger = core_utils.setup_logger("htcondor_submitter")
0028 
0029 # base messenger instance
0030 base_messenger = BaseMessenger()
0031 
0032 
0033 def _condor_macro_replace(string, **kwarg):
0034     """
0035     Replace condor Macro from SDF file, return string
0036     """
0037     new_string = string
0038     macro_map = {
0039         "\$\(Cluster\)": str(kwarg["ClusterId"]),
0040         "\$\(Process\)": str(kwarg["ProcId"]),
0041     }
0042     for k, v in macro_map.items():
0043         new_string = re.sub(k, v, new_string)
0044     return new_string
0045 
0046 
0047 def submit_bag_of_workers(data_list):
0048     """
0049     submit a bag of workers
0050     """
0051     # make logger
0052     tmpLog = core_utils.make_logger(baseLogger, method_name="submit_bag_of_workers")
0053     # keep order of workers in data_list
0054     workerIDs_list = [data["workspec"].workerID for data in data_list]
0055     # initialization
0056     worker_retval_map = {}
0057     worker_data_map = {}
0058     host_jdl_list_workerid_map = {}
0059     # go
0060     for data in data_list:
0061         workspec = data["workspec"]
0062         workerID = workspec.workerID
0063         worker_data_map[workerID] = data
0064         to_submit = data["to_submit"]
0065         # no need to submit bad worker
0066         if not to_submit:
0067             errStr = f"{workerID} not submitted due to incomplete data of the worker"
0068             tmpLog.warning(errStr)
0069             tmpRetVal = (None, errStr)
0070             # return tmpRetVal, workspec.get_changed_attributes()
0071             worker_retval_map[workerID] = (tmpRetVal, workspec.get_changed_attributes())
0072         # attributes
0073         try:
0074             use_spool = data["use_spool"]
0075         except KeyError:
0076             errStr = f"{workerID} not submitted due to incomplete data of the worker"
0077             tmpLog.warning(errStr)
0078             tmpRetVal = (None, errStr)
0079             # return tmpRetVal, workspec.get_changed_attributes()
0080             worker_retval_map[workerID] = (tmpRetVal, workspec.get_changed_attributes())
0081         else:
0082             workspec.reset_changed_list()
0083             # fill in host_jdl_list_workerid_map
0084             a_jdl, placeholder_map = make_a_jdl(**data)
0085             val = (workspec, a_jdl, placeholder_map)
0086             try:
0087                 host_jdl_list_workerid_map[workspec.submissionHost].append(val)
0088             except KeyError:
0089                 host_jdl_list_workerid_map[workspec.submissionHost] = [val]
0090     # loop over submissionHost
0091     for host, val_list in host_jdl_list_workerid_map.items():
0092         # make jdl string of workers
0093         jdl_list = [val[1] for val in val_list]
0094         # condor job submit object
0095         tmpLog.debug(f"submitting to submissionHost={host}")
0096         # submit
0097         try:
0098             condor_job_submit = CondorJobSubmit(id=host)
0099             batchIDs_list, ret_err_str = condor_job_submit.submit(jdl_list, use_spool=use_spool)
0100         except Exception as e:
0101             batchIDs_list = None
0102             ret_err_str = f"Exception {e.__class__.__name__}: {e}"
0103         # result
0104         if batchIDs_list:
0105             # submitted
0106             n_workers = len(val_list)
0107             tmpLog.debug(f"submitted {n_workers} workers to submissionHost={host}")
0108             for val_i in range(n_workers):
0109                 val = val_list[val_i]
0110                 workspec = val[0]
0111                 placeholder_map = val[2]
0112                 # got batchID
0113                 workspec.batchID = batchIDs_list[val_i]
0114                 tmpLog.debug(f"workerID={workspec.workerID} submissionHost={workspec.submissionHost} batchID={workspec.batchID}")
0115                 # get worker data
0116                 data = worker_data_map[workspec.workerID]
0117                 # set computingElement
0118                 ce_info_dict = data["ce_info_dict"]
0119                 workspec.computingElement = ce_info_dict.get("ce_endpoint", "")
0120                 # set log
0121                 batch_log_dict = data["batch_log_dict"]
0122                 (clusterid, procid) = get_job_id_tuple_from_batchid(workspec.batchID)
0123                 batch_log = _condor_macro_replace(batch_log_dict["batch_log"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
0124                 batch_stdout = _condor_macro_replace(batch_log_dict["batch_stdout"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
0125                 batch_stderr = _condor_macro_replace(batch_log_dict["batch_stderr"], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
0126                 try:
0127                     batch_jdl = f"{batch_stderr[:-4]}.jdl"
0128                 except Exception:
0129                     batch_jdl = None
0130                 workspec.set_log_file("batch_log", batch_log)
0131                 workspec.set_log_file("stdout", batch_stdout)
0132                 workspec.set_log_file("stderr", batch_stderr)
0133                 workspec.set_log_file("jdl", batch_jdl)
0134                 if not workspec.get_jobspec_list():
0135                     tmpLog.debug(f"No jobspec associated in the worker of workerID={workspec.workerID}")
0136                 else:
0137                     for jobSpec in workspec.get_jobspec_list():
0138                         # using batchLog and stdOut URL as pilotID and pilotLog
0139                         jobSpec.set_one_attribute("pilotID", workspec.workAttributes["stdOut"])
0140                         jobSpec.set_one_attribute("pilotLog", workspec.workAttributes["batchLog"])
0141                 tmpLog.debug(f"Done set_log_file after submission of workerID={workspec.workerID}")
0142                 tmpRetVal = (True, "")
0143                 worker_retval_map[workspec.workerID] = (tmpRetVal, workspec.get_changed_attributes())
0144         else:
0145             # failed
0146             tmpLog.debug(f"failed to submit workers to submissionHost={host} ; {ret_err_str}")
0147             for val in val_list:
0148                 workspec = val[0]
0149                 errStr = f"submission failed: {ret_err_str}"
0150                 tmpLog.error(errStr)
0151                 tmpRetVal = (None, errStr)
0152                 worker_retval_map[workspec.workerID] = (tmpRetVal, workspec.get_changed_attributes())
0153     # make return list
0154     retValList = [worker_retval_map[w_id] for w_id in workerIDs_list]
0155     return retValList
0156 
0157 
0158 def make_a_jdl(
0159     workspec,
0160     template,
0161     n_node,
0162     n_core_per_node,
0163     log_dir,
0164     panda_queue_name,
0165     executable_file,
0166     x509_user_proxy,
0167     log_subdir=None,
0168     ce_info_dict=dict(),
0169     batch_log_dict=dict(),
0170     pilot_url=None,
0171     pilot_args="",
0172     special_par="",
0173     harvester_queue_config=None,
0174     is_unified_queue=False,
0175     pilot_version="unknown",
0176     python_version="unknown",
0177     prod_rc_permille=0,
0178     token_dir=None,
0179     panda_token_filename=None,
0180     panda_token_dir=None,
0181     panda_token_key_path=None,
0182     is_gpu_resource=False,
0183     n_core_factor=1,
0184     custom_submit_attr_dict=None,
0185     cric_panda_site=None,
0186     **kwarg,
0187 ):
0188     """
0189     make a condor jdl for a worker
0190     """
0191     # make logger
0192     tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID} resourceType={workspec.resourceType}", method_name="make_a_jdl")
0193     # Note: In workspec, unit of minRamCount and of maxDiskCount are both MB.
0194     #       In HTCondor SDF, unit of request_memory is MB, and request_disk is KB.
0195     n_core_total = workspec.nCore if workspec.nCore else n_core_per_node
0196     request_ram = max(workspec.minRamCount, 1 * n_core_total) if workspec.minRamCount else 1 * n_core_total
0197     request_disk = workspec.maxDiskCount * 1024 if workspec.maxDiskCount else 1
0198     request_walltime = workspec.maxWalltime if workspec.maxWalltime else 0
0199     io_intensity = workspec.ioIntensity if workspec.ioIntensity else 0
0200     ce_info_dict = ce_info_dict.copy()
0201     batch_log_dict = batch_log_dict.copy()
0202     custom_submit_attr_dict = dict() if custom_submit_attr_dict is None else custom_submit_attr_dict.copy()
0203     # possible override by CRIC special_par
0204     if special_par:
0205         special_par_attr_list = [
0206             "queue",
0207             "maxWallTime",
0208             "xcount",
0209         ]
0210         _match_special_par_dict = {attr: re.search(f"\\({attr}=([^)]+)\\)", special_par) for attr in special_par_attr_list}
0211         for attr, _match in _match_special_par_dict.items():
0212             if not _match:
0213                 continue
0214             elif attr == "queue":
0215                 ce_info_dict["ce_queue_name"] = str(_match.group(1))
0216             elif attr == "maxWallTime":
0217                 request_walltime = int(_match.group(1))
0218             elif attr == "xcount":
0219                 n_core_total = int(_match.group(1))
0220             tmpLog.debug(f"job attributes override by CRIC special_par: {attr}={str(_match.group(1))}")
0221     # derived job attributes
0222     n_core_total_factor = n_core_total * n_core_factor
0223     if n_node is None:
0224         n_node = ceil(n_core_total / n_core_per_node)
0225     request_ram_factor = request_ram * n_core_factor
0226     request_ram_bytes = request_ram * 2**20
0227     request_ram_bytes_factor = request_ram * 2**20 * n_core_factor
0228     request_ram_per_core = ceil(request_ram / n_core_total)
0229     request_ram_bytes_per_core = ceil(request_ram_bytes / n_core_total)
0230     request_cputime = request_walltime * n_core_total
0231     request_walltime_minute = ceil(request_walltime / 60)
0232     request_cputime_minute = ceil(request_cputime / 60)
0233     # decide prodSourceLabel
0234     pilot_opt_dict = submitter_common.get_complicated_pilot_options(
0235         pilot_type=workspec.pilotType,
0236         pilot_url=pilot_url,
0237         pilot_version=pilot_version,
0238         prod_source_label=harvester_queue_config.get_source_label(workspec.jobType),
0239         prod_rc_permille=prod_rc_permille,
0240     )
0241     prod_source_label = pilot_opt_dict["prod_source_label"]
0242     pilot_type_opt = pilot_opt_dict["pilot_type_opt"]
0243     pilot_url_str = pilot_opt_dict["pilot_url_str"]
0244     pilot_debug_str = pilot_opt_dict["pilot_debug_str"]
0245     tmpLog.debug(f"pilot options: {pilot_opt_dict}")
0246     # get token filename according to CE
0247     token_filename = None
0248     if token_dir is not None and ce_info_dict.get("ce_endpoint"):
0249         token_filename = endpoint_to_filename(ce_info_dict["ce_endpoint"])
0250     token_path = None
0251     if token_dir is not None and token_filename is not None:
0252         token_path = os.path.join(token_dir, token_filename)
0253     else:
0254         tmpLog.warning(f"token_path is None: site={panda_queue_name}, token_dir={token_dir} , token_filename={token_filename}")
0255     # get pilot-pandaserver token
0256     panda_token_path = None
0257     if panda_token_dir is not None and panda_token_filename is not None:
0258         panda_token_path = os.path.join(panda_token_dir, panda_token_filename)
0259     else:
0260         # tmpLog.warning(f"panda_token_path is None: panda_token_dir={panda_token_dir} , panda_token_filename={panda_token_filename}")
0261         pass
0262     # get panda token key
0263     panda_token_key_filename = None
0264     if panda_token_key_path is not None:
0265         panda_token_key_filename = os.path.basename(panda_token_key_path)
0266     # custom submit attributes (+key1 = value1 ; +key2 = value2 in JDL)
0267     custom_submit_attr_str_list = []
0268     for attr_key, attr_value in custom_submit_attr_dict.items():
0269         custom_submit_attr_str_list.append(f"+{attr_key} = {attr_value}")
0270     custom_submit_attr_str = "\n".join(custom_submit_attr_str_list)
0271     # open tmpfile as submit description file
0272     tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sdf", dir=workspec.get_access_point())
0273 
0274     # instance of resource type mapper
0275     rt_mapper = ResourceTypeMapper()
0276     all_resource_types = rt_mapper.get_all_resource_types()
0277 
0278     # jobspec filename
0279     jobspec_filename = harvester_queue_config.messenger.get("jobSpecFileName", base_messenger.jobSpecFileName)
0280 
0281     # placeholder map
0282     placeholder_map = {
0283         "sdfPath": tmpFile.name,
0284         "executableFile": executable_file,
0285         "jobSpecFileName": jobspec_filename,
0286         "nCorePerNode": n_core_per_node,
0287         "nCoreTotal": n_core_total_factor,
0288         "nNode": n_node,
0289         "nCoreFactor": n_core_factor,
0290         "requestRam": request_ram_factor,
0291         "requestRamBytes": request_ram_bytes_factor,
0292         "requestRamPerCore": request_ram_per_core,
0293         "requestRamBytesPerCore": request_ram_bytes_per_core,
0294         "requestDisk": request_disk,
0295         "requestWalltime": request_walltime,
0296         "requestWalltimeMinute": request_walltime_minute,
0297         "requestCputime": request_cputime,
0298         "requestCputimeMinute": request_cputime_minute,
0299         "accessPoint": workspec.accessPoint,
0300         "harvesterID": harvester_config.master.harvester_id,
0301         "workerID": workspec.workerID,
0302         "computingSite": workspec.computingSite,
0303         "pandaQueueName": panda_queue_name,
0304         "x509UserProxy": x509_user_proxy,
0305         "ceEndpoint": ce_info_dict.get("ce_endpoint", ""),
0306         "ceHostname": ce_info_dict.get("ce_hostname", ""),
0307         "ceFlavour": ce_info_dict.get("ce_flavour", ""),
0308         "ceJobmanager": ce_info_dict.get("ce_jobmanager", ""),
0309         "ceQueueName": ce_info_dict.get("ce_queue_name", ""),
0310         "ceVersion": ce_info_dict.get("ce_version", ""),
0311         "logDir": log_dir,
0312         "logSubdir": log_subdir,
0313         "prodSourceLabel": prod_source_label,
0314         "jobType": workspec.jobType,
0315         "resourceType": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types),
0316         "pilotResourceTypeOption": submitter_common.get_resource_type(workspec.resourceType, is_unified_queue, all_resource_types, is_pilot_option=True),
0317         "ioIntensity": io_intensity,
0318         "pilotType": pilot_type_opt,
0319         "pilotUrlOption": pilot_url_str,
0320         "pilotVersion": pilot_version,
0321         "pilotPythonOption": submitter_common.get_python_version_option(python_version, prod_source_label),
0322         "pilotDebugOption": pilot_debug_str,
0323         "pilotArgs": pilot_args,
0324         "submissionHost": workspec.submissionHost,
0325         "submissionHostShort": workspec.submissionHost.split(".")[0],
0326         "ceARCGridType": ce_info_dict.get("ce_grid_type", "arc"),
0327         "tokenDir": token_dir,
0328         "tokenFilename": token_filename,
0329         "tokenPath": token_path,
0330         "pandaTokenFilename": panda_token_filename,
0331         "pandaTokenPath": panda_token_path,
0332         "pandaTokenKeyFilename": panda_token_key_filename,
0333         "pandaTokenKeyPath": panda_token_key_path,
0334         "pilotJobLabel": submitter_common.get_joblabel(prod_source_label),
0335         "pilotJobType": submitter_common.get_pilot_job_type(workspec.jobType),
0336         "requestGpus": 1 if is_gpu_resource else 0,
0337         "requireGpus": is_gpu_resource,
0338         "customSubmitAttributes": custom_submit_attr_str,
0339         "cricPandaSite": cric_panda_site,
0340     }
0341 
0342     gtag = batch_log_dict.get("gtag", "fake_GTAG_string").format(**placeholder_map)
0343     placeholder_map["gtag"] = gtag
0344 
0345     # fill in template string
0346     jdl_str = template.format(**placeholder_map)
0347     # save jdl to submit description file
0348     tmpFile.write(jdl_str)
0349     tmpFile.close()
0350     tmpLog.debug(f"saved sdf at {tmpFile.name}")
0351     tmpLog.debug("done")
0352     return jdl_str, placeholder_map
0353 
0354 
0355 def parse_batch_job_filename(value_str, file_dir, batchID, guess=False):
0356     """
0357     parse log, stdout, stderr filename
0358     """
0359     _filename = os.path.basename(value_str)
0360     if guess:
0361         # guess file name before files really created; possibly containing condor macros
0362         return _filename
0363     else:
0364         _sanitized_list = re.sub("\{(\w+)\}|\[(\w+)\]|\((\w+)\)|#(\w+)#|\$", "", _filename).split(".")
0365         _prefix = _sanitized_list[0]
0366         _suffix = _sanitized_list[-1] if len(_sanitized_list) > 1 else ""
0367 
0368         for _f in os.listdir(file_dir):
0369             if re.match(f"{_prefix}(.*)\\.{batchID}\\.(.*)\\.{_suffix}", _f):
0370                 return _f
0371         return None
0372 
0373 
0374 # submitter for HTCondor batch system
0375 class HTCondorSubmitter(PluginBase):
0376     # constructor
0377     def __init__(self, **kwarg):
0378         tmpLog = core_utils.make_logger(baseLogger, method_name="__init__")
0379         self.logBaseURL = None
0380         if hasattr(self, "useFQDN") and self.useFQDN:
0381             self.hostname = socket.getfqdn()
0382         else:
0383             self.hostname = socket.gethostname().split(".")[0]
0384         PluginBase.__init__(self, **kwarg)
0385         # extra plugin configs
0386         extra_plugin_configs = {}
0387         try:
0388             extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"]
0389         except AttributeError:
0390             pass
0391         except KeyError:
0392             pass
0393         # number of processes
0394         self.nProcesses = getattr(self, "nProcesses", 1)
0395         if (not self.nProcesses) or (self.nProcesses < 1):
0396             self.nProcesses = 1
0397         # number of nodes
0398         self.nNode = getattr(self, "nNode", None)
0399         # number of cores per node
0400         self.nCorePerNode = getattr(self, "nCorePerNode", None)
0401         # ncore factor
0402         self.nCoreFactor = getattr(self, "nCoreFactor", 1)
0403         if type(self.nCoreFactor) in [dict]:
0404             # self.nCoreFactor is a dict for ucore
0405             # self.nCoreFactor = self.nCoreFactor
0406             pass
0407         else:
0408             self.nCoreFactor = int(self.nCoreFactor)
0409             if (not self.nCoreFactor) or (self.nCoreFactor < 1):
0410                 self.nCoreFactor = 1
0411         # executable file
0412         self.executableFile = getattr(self, "executableFile", None)
0413         # condor log directory
0414         self.logDir = getattr(self, "logDir", os.getenv("TMPDIR") or "/tmp")
0415         if self.logDir and ("$hostname" in self.logDir or "${hostname}" in self.logDir):
0416             self.logDir = self.logDir.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0417             try:
0418                 if not os.path.exists(self.logDir):
0419                     os.mkdir(self.logDir)
0420             except Exception as ex:
0421                 tmpLog.debug(f"Failed to create logDir({self.logDir}): {str(ex)}")
0422         # log base url
0423         self.logBaseURL = getattr(self, "logBaseURL", None)
0424         if self.logBaseURL and ("$hostname" in self.logBaseURL or "${hostname}" in self.logBaseURL):
0425             self.logBaseURL = self.logBaseURL.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0426         if self.logBaseURL and "${harvester_id}" in self.logBaseURL:
0427             self.logBaseURL = self.logBaseURL.replace("${harvester_id}", harvester_config.master.harvester_id)
0428         # Default x509 proxy for a queue
0429         self.x509UserProxy = getattr(self, "x509UserProxy", os.getenv("X509_USER_PROXY"))
0430         # x509 proxy for analysis jobs in grandly unified queues
0431         self.x509UserProxyAnalysis = getattr(self, "x509UserProxyAnalysis", os.getenv("X509_USER_PROXY_ANAL"))
0432         # Default token directory for a queue
0433         self.tokenDir = getattr(self, "tokenDir", None)
0434         # token directory for analysis jobs in grandly unified queues
0435         self.tokenDirAnalysis = getattr(self, "tokenDirAnalysis", None)
0436         # pilot-pandaserver token
0437         self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
0438         self.pandaTokenDir = getattr(self, "pandaTokenDir", None)
0439         self.pandaTokenKeyPath = getattr(self, "pandaTokenKeyPath", None)
0440         # CRIC
0441         self.useCRIC = getattr(self, "useCRIC", getattr(self, "useAtlasCRIC", getattr(self, "useAtlasAGIS", False)))
0442         # Grid CE, requiring CRIC
0443         self.useCRICGridCE = getattr(self, "useCRICGridCE", getattr(self, "useAtlasGridCE", False))
0444         self.useCRIC = self.useCRIC or self.useCRICGridCE
0445         # sdf template
0446         self.templateFile = getattr(self, "templateFile", None)
0447         # sdf template directories of CEs; ignored if templateFile is set
0448         self.CEtemplateDir = getattr(self, "CEtemplateDir", "")
0449         # remote condor schedd and pool name (collector)
0450         self.condorSchedd = getattr(self, "condorSchedd", None)
0451         if self.condorSchedd is not None and ("$hostname" in self.condorSchedd or "${hostname}" in self.condorSchedd):
0452             self.condorSchedd = self.condorSchedd.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0453         self.condorPool = getattr(self, "condorPool", None)
0454         if self.condorPool is not None and ("$hostname" in self.condorPool or "${hostname}" in self.condorPool):
0455             self.condorPool = self.condorPool.replace("$hostname", self.hostname).replace("${hostname}", self.hostname)
0456         # json config file of remote condor host: schedd/pool and weighting. If set, condorSchedd and condorPool are overwritten
0457         self.condorHostConfig = getattr(self, "condorHostConfig", False)
0458         if self.condorHostConfig:
0459             try:
0460                 self.condorSchedd = []
0461                 self.condorPool = []
0462                 self.condorHostWeight = []
0463                 with open(self.condorHostConfig, "r") as f:
0464                     condor_host_config_map = json.load(f)
0465                     for _schedd, _cm in condor_host_config_map.items():
0466                         _pool = _cm["pool"]
0467                         _weight = int(_cm["weight"])
0468                         self.condorSchedd.append(_schedd)
0469                         self.condorPool.append(_pool)
0470                         self.condorHostWeight.append(_weight)
0471             except Exception as e:
0472                 tmpLog.error(f"error when parsing condorHostConfig json file; {e.__class__.__name__}: {e}")
0473                 raise
0474         else:
0475             if isinstance(self.condorSchedd, list):
0476                 self.condorHostWeight = [1] * len(self.condorSchedd)
0477             else:
0478                 self.condorHostWeight = [1]
0479         # condor spool mechanism. If False, need shared FS across remote schedd
0480         self.useSpool = getattr(self, "useSpool", False)
0481         # number of workers less than this number will be bulkily submitted in only one schedd
0482         self.minBulkToRandomizedSchedd = getattr(self, "minBulkToRandomizedSchedd", 20)
0483         # try to use analysis credentials first
0484         self.useAnalysisCredentials = getattr(self, "useAnalysisCredentials", False)
0485         # probability permille to randomly run PR pilot with RC pilot url
0486         self.rcPilotRandomWeightPermille = getattr(self, "rcPilotRandomWeightPermille", 0)
0487         # submission to ARC CE's with nordugrid (gridftp) or arc (REST) grid type
0488         self.submit_arc_grid_type = "arc"
0489         if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid":
0490             self.submit_arc_grid_type = "nordugrid"
0491         # record of information of CE statistics
0492         self.ceStatsLock = threading.Lock()
0493         self.ceStats = dict()
0494         # allowed associated parameters and parameter prefixes from CRIC
0495         self._allowed_cric_attrs = [
0496             "ce_fairshare_percent",
0497             "pilot_url",
0498             "pilot_args",
0499         ]
0500         self._allowed_cric_attr_prefixes = [
0501             "jdl.plusattr.",
0502         ]
0503 
0504     # get CE statistics of a site
0505     def get_ce_statistics(self, site_name, queue_config, n_new_workers, time_window=21600):
0506         if site_name in self.ceStats:
0507             return self.ceStats[site_name]
0508         with self.ceStatsLock:
0509             if site_name in self.ceStats:
0510                 return self.ceStats[site_name]
0511             else:
0512                 worker_limits_dict, _ = self.dbInterface.get_worker_limits(self.queueName, queue_config)
0513                 worker_ce_stats_dict = self.dbInterface.get_worker_ce_stats(self.queueName)
0514                 worker_ce_backend_throughput_dict = self.dbInterface.get_worker_ce_backend_throughput(self.queueName, time_window=time_window)
0515                 return (worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers)
0516 
0517     # submit workers
0518     def submit_workers(self, workspec_list):
0519         tmpLog = self.make_logger(baseLogger, f"site={self.queueName}", method_name="submit_workers")
0520 
0521         nWorkers = len(workspec_list)
0522         tmpLog.debug(f"start nWorkers={nWorkers}")
0523 
0524         # whether to submit any worker
0525         to_submit_any = True
0526 
0527         # get log subdirectory name from timestamp
0528         timeNow = core_utils.naive_utcnow()
0529         log_subdir = timeNow.strftime("%y-%m-%d_%H")
0530         log_subdir_path = os.path.join(self.logDir, log_subdir)
0531         if self.condorSchedd is None or not self.useSpool:
0532             try:
0533                 os.mkdir(log_subdir_path)
0534             except OSError as e:
0535                 if e.errno != errno.EEXIST:
0536                     raise
0537                 else:
0538                     pass
0539 
0540         # get info from harvester queue config
0541         _queueConfigMapper = QueueConfigMapper()
0542         harvester_queue_config = _queueConfigMapper.get_queue(self.queueName)
0543 
0544         # associated parameters dict
0545         associated_params_dict = {}
0546 
0547         is_grandly_unified_queue = False
0548         # get queue info from CRIC by cacher in db
0549         if self.useCRIC:
0550             panda_queues_dict = PandaQueuesDict()
0551             panda_queues_dict_last_refresh = core_utils.naive_utcfromtimestamp(panda_queues_dict.last_refresh_ts)
0552             tmpLog.debug(f"PandaQueuesDict last refresh at {panda_queues_dict_last_refresh}")
0553             panda_queue_name = panda_queues_dict.get_panda_queue_name(self.queueName)
0554             this_panda_queue_dict = panda_queues_dict.get(self.queueName, dict())
0555             is_grandly_unified_queue = panda_queues_dict.is_grandly_unified_queue(self.queueName)
0556             # tmpLog.debug('panda_queues_name and queue_info: {0}, {1}'.format(self.queueName, panda_queues_dict[self.queueName]))
0557             # associated params on CRIC
0558             for key, val in panda_queues_dict.get_harvester_params(self.queueName).items():
0559                 if not isinstance(key, str):
0560                     continue
0561                 if key in self._allowed_cric_attrs or any([key.startswith(the_prefix) for the_prefix in self._allowed_cric_attr_prefixes]):
0562                     if isinstance(val, str):
0563                         # sanitized list the value
0564                         val = re.sub(r"[;$~`]*", "", val)
0565                     associated_params_dict[key] = val
0566         else:
0567             panda_queues_dict = dict()
0568             panda_queue_name = self.queueName
0569             this_panda_queue_dict = dict()
0570 
0571         # get default information from queue info
0572         n_core_per_node_from_queue = this_panda_queue_dict.get("corecount", 1) if this_panda_queue_dict.get("corecount", 1) else 1
0573         is_unified_queue = this_panda_queue_dict.get("capability", "") == "ucore"
0574         pilot_url = associated_params_dict.get("pilot_url")
0575         pilot_args = associated_params_dict.get("pilot_args", "")
0576         pilot_version = str(this_panda_queue_dict.get("pilot_version", "current"))
0577         python_version = str(this_panda_queue_dict.get("python_version", "3"))
0578         is_gpu_resource = this_panda_queue_dict.get("resource_type", "") == "gpu"
0579         ce_fairshare_percent = associated_params_dict.get("ce_fairshare_percent", 50)
0580         cric_panda_site = this_panda_queue_dict.get("panda_site")
0581         custom_submit_attr_dict = {}
0582         for k, v in associated_params_dict.items():
0583             # fill custom submit attributes for adding to JDL
0584             try:
0585                 the_prefix = "jdl.plusattr."
0586                 if k.startswith(the_prefix):
0587                     attr_key = k[len(the_prefix) :]
0588                     attr_value = str(v)
0589                     if not re.fullmatch(r"[a-zA-Z_0-9][a-zA-Z_0-9.\-]*", attr_key):
0590                         # skip invalid key
0591                         tmpLog.warning(f'custom submit attribute "{k}: {v}" has invalid key; skipped')
0592                         continue
0593                     if not re.fullmatch(r'[a-zA-Z_0-9.\-,=&|()!?" ]+', attr_value):
0594                         # skip invalid value
0595                         tmpLog.warning(f'custom submit attribute "{k}: {v}" has invalid value; skipped')
0596                         continue
0597                     custom_submit_attr_dict[attr_key] = attr_value
0598             except Exception as e:
0599                 tmpLog.warning(f'Got {e} with custom submit attributes "{k}: {v}"; skipped')
0600                 continue
0601 
0602         # get override requirements from queue configured
0603         n_node = getattr(self, "nNode", None)
0604         n_core_per_node = getattr(self, "nCorePerNode", n_core_per_node_from_queue)
0605         if not n_core_per_node:
0606             n_core_per_node = n_core_per_node_from_queue
0607 
0608         # deal with Condor schedd and central managers; make a random list the choose
0609         n_bulks = ceil(nWorkers / self.minBulkToRandomizedSchedd)
0610         if isinstance(self.condorSchedd, list) and len(self.condorSchedd) > 0:
0611             orig_list = []
0612             if isinstance(self.condorPool, list) and len(self.condorPool) > 0:
0613                 for _schedd, _pool, _weight in zip(self.condorSchedd, self.condorPool, self.condorHostWeight):
0614                     orig_list.extend([(_schedd, _pool)] * _weight)
0615             else:
0616                 for _schedd, _weight in zip(self.condorSchedd, self.condorHostWeight):
0617                     orig_list.extend([(_schedd, self.condorPool)] * _weight)
0618             if n_bulks < len(orig_list):
0619                 schedd_pool_choice_list = random.sample(orig_list, n_bulks)
0620             else:
0621                 schedd_pool_choice_list = orig_list
0622         else:
0623             schedd_pool_choice_list = [(self.condorSchedd, self.condorPool)]
0624 
0625         # deal with CE
0626         special_par = ""
0627         ce_weighting = None
0628         if self.useCRICGridCE:
0629             # If CRIC Grid CE mode used
0630             tmpLog.debug("Using CRIC Grid CE mode...")
0631             queues_from_queue_list = this_panda_queue_dict.get("queues", [])
0632             special_par = this_panda_queue_dict.get("special_par", "")
0633             ce_auxiliary_dict = {}
0634             for _queue_dict in queues_from_queue_list:
0635                 if not (
0636                     _queue_dict.get("ce_endpoint")
0637                     and str(_queue_dict.get("ce_state", "")).upper() == "ACTIVE"
0638                     and str(_queue_dict.get("ce_flavour", "")).lower() in set(["arc-ce", "cream-ce", "htcondor-ce"])
0639                 ):
0640                     continue
0641                 ce_info_dict = _queue_dict.copy()
0642                 # ignore protocol prefix in ce_endpoint for cream and condor CE
0643                 # check protocol prefix for ARC CE (gridftp or REST)
0644                 _match_ce_endpoint = re.match("^(\w+)://(\w+)", ce_info_dict.get("ce_endpoint", ""))
0645                 ce_endpoint_prefix = ""
0646                 if _match_ce_endpoint:
0647                     ce_endpoint_prefix = _match_ce_endpoint.group(1)
0648                 ce_endpoint_from_queue = re.sub("^\w+://", "", ce_info_dict.get("ce_endpoint", ""))
0649                 ce_flavour_str = str(ce_info_dict.get("ce_flavour", "")).lower()
0650                 ce_version_str = str(ce_info_dict.get("ce_version", "")).lower()
0651                 # grid type of htcondor grid universe to use; empty string as default
0652                 ce_info_dict["ce_grid_type"] = ""
0653                 if ce_flavour_str == "arc-ce":
0654                     ce_info_dict["ce_grid_type"] = self.submit_arc_grid_type
0655                 ce_info_dict["ce_hostname"] = re.sub(":\w*", "", ce_endpoint_from_queue)
0656                 if ce_info_dict["ce_grid_type"] == "arc":
0657                     default_port = None
0658                     if ce_info_dict["ce_hostname"] == ce_endpoint_from_queue:
0659                         # default port
0660                         default_port = 443
0661                     else:
0662                         # change port 2811 to 443
0663                         ce_endpoint_from_queue = re.sub(r":2811$", ":443", ce_endpoint_from_queue)
0664                     ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}{f':{default_port}' if default_port is not None else ''}"
0665                 else:
0666                     if ce_info_dict["ce_hostname"] == ce_endpoint_from_queue:
0667                         # add default port to ce_endpoint if missing
0668                         default_port_map = {
0669                             "cream-ce": 8443,
0670                             "arc-ce": 2811,
0671                             "htcondor-ce": 9619,
0672                         }
0673                         if ce_flavour_str in default_port_map:
0674                             default_port = default_port_map[ce_flavour_str]
0675                             ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}:{default_port}"
0676                     if ce_flavour_str == "arc-ce":
0677                         ce_info_dict["ce_endpoint"] = f"{ce_endpoint_from_queue}"
0678                 tmpLog.debug(f'Got pilot version: "{pilot_version}"; CE endpoint: "{ce_endpoint_from_queue}", flavour: "{ce_flavour_str}"')
0679                 ce_endpoint = ce_info_dict.get("ce_endpoint")
0680                 if ce_endpoint in ce_auxiliary_dict and str(ce_info_dict.get("ce_queue_name", "")).lower() == "default":
0681                     pass
0682                 else:
0683                     ce_auxiliary_dict[ce_endpoint] = ce_info_dict
0684             # qualified CEs from CRIC info
0685             n_qualified_ce = len(ce_auxiliary_dict)
0686             if n_qualified_ce > 0:
0687                 # Get CE weighting
0688                 tmpLog.debug("Get CE weighting")
0689                 worker_ce_all_tuple = self.get_ce_statistics(self.queueName, harvester_queue_config, nWorkers)
0690                 is_slave_queue = harvester_queue_config.runMode == "slave"
0691                 ce_weighting = submitter_common.get_ce_weighting(
0692                     ce_endpoint_list=list(ce_auxiliary_dict.keys()),
0693                     worker_ce_all_tuple=worker_ce_all_tuple,
0694                     is_slave_queue=is_slave_queue,
0695                     fairshare_percent=ce_fairshare_percent,
0696                 )
0697                 stats_weighting_display_str = submitter_common.get_ce_stats_weighting_display(
0698                     ce_auxiliary_dict.keys(), worker_ce_all_tuple, ce_weighting, ce_fairshare_percent
0699                 )
0700                 tmpLog.debug(f"CE stats and weighting: {stats_weighting_display_str}")
0701             else:
0702                 tmpLog.error("No valid CE endpoint found")
0703                 to_submit_any = False
0704 
0705         def _handle_one_worker(workspec, to_submit=to_submit_any):
0706             # make logger
0707             tmpLog = core_utils.make_logger(
0708                 baseLogger, f"site={self.queueName} workerID={workspec.workerID}, resourceType={workspec.resourceType}", method_name="_handle_one_worker"
0709             )
0710 
0711             def _choose_credential():
0712                 """
0713                 Choose the credential based on the job type
0714                 """
0715                 proxy = self.x509UserProxy
0716                 token_dir = self.tokenDir
0717                 if self.useAnalysisCredentials:
0718                     if self.x509UserProxyAnalysis:
0719                         tmpLog.debug("Taking analysis proxy")
0720                         proxy = self.x509UserProxyAnalysis
0721                     if self.tokenDirAnalysis:
0722                         tmpLog.debug("Taking analysis token_dir")
0723                         token_dir = self.tokenDirAnalysis
0724                 else:
0725                     tmpLog.debug("Taking default proxy")
0726                     if self.tokenDir:
0727                         tmpLog.debug("Taking default token_dir")
0728                 return proxy, token_dir
0729 
0730             def get_core_factor(workspec):
0731                 try:
0732                     if type(self.nCoreFactor) in [dict]:
0733                         if workspec.jobType in self.nCoreFactor:
0734                             job_type = workspec.jobType
0735                         else:
0736                             job_type = "Any"
0737                         if is_unified_queue:
0738                             resource_type = workspec.resourceType
0739                         else:
0740                             resource_type = "Undefined"
0741                         n_core_factor = self.nCoreFactor.get(job_type, {}).get(resource_type, 1)
0742                         return int(n_core_factor)
0743                     else:
0744                         return int(self.nCoreFactor)
0745                 except Exception as ex:
0746                     tmpLog.warning(f"Failed to get core factor: {ex}")
0747                 return 1
0748 
0749             # initialize
0750             ce_info_dict = dict()
0751             batch_log_dict = dict()
0752             data = {
0753                 "workspec": workspec,
0754                 "to_submit": to_submit,
0755             }
0756             if to_submit:
0757                 sdf_template_file = None
0758                 if self.useCRICGridCE:
0759                     # choose a CE
0760                     tmpLog.info("choose a CE...")
0761                     ce_chosen = submitter_common.choose_ce(ce_weighting)
0762                     try:
0763                         ce_info_dict = ce_auxiliary_dict[ce_chosen].copy()
0764                     except KeyError:
0765                         tmpLog.info("Problem choosing CE with weighting. Choose an arbitrary CE endpoint")
0766                         ce_info_dict = random.choice(list(ce_auxiliary_dict.values())).copy()
0767                     ce_flavour_str = str(ce_info_dict.get("ce_flavour", "")).lower()
0768                     tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict['ce_endpoint']}\", flavour: \"{ce_flavour_str}\"")
0769                     if self.templateFile:
0770                         sdf_template_file = self.templateFile
0771                     elif os.path.isdir(self.CEtemplateDir) and ce_flavour_str:
0772                         sdf_suffix_str = ""
0773                         if ce_info_dict["ce_grid_type"] and ce_info_dict["ce_grid_type"] != "arc":
0774                             sdf_suffix_str = f"_{ce_info_dict['ce_grid_type']}"
0775                         sdf_template_filename = f"{ce_flavour_str}{sdf_suffix_str}.sdf"
0776                         sdf_template_file = os.path.join(self.CEtemplateDir, sdf_template_filename)
0777                 else:
0778                     if self.templateFile:
0779                         sdf_template_file = self.templateFile
0780                     try:
0781                         # Manually define site condor schedd as ceHostname and central manager as ceEndpoint
0782                         if self.ceHostname and isinstance(self.ceHostname, list) and len(self.ceHostname) > 0:
0783                             if isinstance(self.ceEndpoint, list) and len(self.ceEndpoint) > 0:
0784                                 ce_info_dict["ce_hostname"], ce_info_dict["ce_endpoint"] = random.choice(list(zip(self.ceHostname, self.ceEndpoint)))
0785                             else:
0786                                 ce_info_dict["ce_hostname"] = random.choice(self.ceHostname)
0787                                 ce_info_dict["ce_endpoint"] = self.ceEndpoint
0788                         else:
0789                             ce_info_dict["ce_hostname"] = self.ceHostname
0790                             ce_info_dict["ce_endpoint"] = self.ceEndpoint
0791                     except AttributeError:
0792                         pass
0793                     tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict.get('ce_endpoint')}\"")
0794                     try:
0795                         # Manually define ceQueueName
0796                         if self.ceQueueName:
0797                             ce_info_dict["ce_queue_name"] = self.ceQueueName
0798                     except AttributeError:
0799                         pass
0800                 # template for batch script
0801                 try:
0802                     tmpFile = open(sdf_template_file)
0803                     sdf_template_raw = tmpFile.read()
0804                     tmpFile.close()
0805                 except AttributeError:
0806                     tmpLog.error("No valid templateFile found. Maybe templateFile, CEtemplateDir invalid, or no valid CE found")
0807                     to_submit = False
0808                     return data
0809                 else:
0810                     # get batch_log, stdout, stderr filename, and remove commented lines
0811                     sdf_template_str_list = []
0812                     for _line in sdf_template_raw.split("\n"):
0813                         if _line.startswith("#"):
0814                             continue
0815                         sdf_template_str_list.append(_line)
0816                         _match_batch_log = re.match("log = (.+)", _line)
0817                         _match_stdout = re.match("output = (.+)", _line)
0818                         _match_stderr = re.match("error = (.+)", _line)
0819                         if _match_batch_log:
0820                             batch_log_value = _match_batch_log.group(1)
0821                             continue
0822                         if _match_stdout:
0823                             stdout_value = _match_stdout.group(1)
0824                             continue
0825                         if _match_stderr:
0826                             stderr_value = _match_stderr.group(1)
0827                             continue
0828                     sdf_template = "\n".join(sdf_template_str_list)
0829                     # Choose from Condor schedd and central managers
0830                     condor_schedd, condor_pool = random.choice(schedd_pool_choice_list)
0831                     # set submissionHost
0832                     if not condor_schedd and not condor_pool:
0833                         workspec.submissionHost = "LOCAL"
0834                     else:
0835                         workspec.submissionHost = f"{condor_schedd},{condor_pool}"
0836                     tmpLog.debug(f"set submissionHost={workspec.submissionHost}")
0837                     # Log Base URL
0838                     if self.logBaseURL and "[ScheddHostname]" in self.logBaseURL:
0839                         schedd_hostname = re.sub(
0840                             r"(?:[a-zA-Z0-9_.\-]*@)?([a-zA-Z0-9.\-]+)(?::[0-9]+)?",
0841                             lambda matchobj: matchobj.group(1) if matchobj.group(1) else "",
0842                             condor_schedd,
0843                         )
0844                         log_base_url = re.sub(r"\[ScheddHostname\]", schedd_hostname, self.logBaseURL)
0845                     else:
0846                         log_base_url = self.logBaseURL
0847                     # URLs for log files
0848                     if not (log_base_url is None):
0849                         if workspec.batchID:
0850                             batchID = workspec.batchID
0851                             guess = False
0852                         else:
0853                             batchID = ""
0854                             guess = True
0855                         batch_log_filename = parse_batch_job_filename(value_str=batch_log_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0856                         stdout_path_file_name = parse_batch_job_filename(value_str=stdout_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0857                         stderr_path_filename = parse_batch_job_filename(value_str=stderr_value, file_dir=log_subdir_path, batchID=batchID, guess=guess)
0858                         batch_log = f"{log_base_url}/{log_subdir}/{batch_log_filename}"
0859                         batch_stdout = f"{log_base_url}/{log_subdir}/{stdout_path_file_name}"
0860                         batch_stderr = f"{log_base_url}/{log_subdir}/{stderr_path_filename}"
0861                         workspec.set_log_file("batch_log", batch_log)
0862                         workspec.set_log_file("stdout", batch_stdout)
0863                         workspec.set_log_file("stderr", batch_stderr)
0864                         batch_log_dict["batch_log"] = batch_log
0865                         batch_log_dict["batch_stdout"] = batch_stdout
0866                         batch_log_dict["batch_stderr"] = batch_stderr
0867                         batch_log_dict["gtag"] = workspec.workAttributes["stdOut"]
0868                         tmpLog.debug("Done set_log_file before submission")
0869                     tmpLog.debug("Done jobspec attribute setting")
0870 
0871                 # choose the x509 certificate based on the queue configuration
0872                 proxy, token_dir = _choose_credential()
0873 
0874                 # set data dict
0875                 data.update(
0876                     {
0877                         "workspec": workspec,
0878                         "to_submit": to_submit,
0879                         "template": sdf_template,
0880                         "executable_file": self.executableFile,
0881                         "log_dir": self.logDir,
0882                         "log_subdir": log_subdir,
0883                         "n_node": n_node,
0884                         "n_core_per_node": n_core_per_node,
0885                         "n_core_factor": get_core_factor(workspec),
0886                         "panda_queue_name": panda_queue_name,
0887                         "x509_user_proxy": proxy,
0888                         "ce_info_dict": ce_info_dict,
0889                         "batch_log_dict": batch_log_dict,
0890                         "special_par": special_par,
0891                         "harvester_queue_config": harvester_queue_config,
0892                         "is_unified_queue": is_unified_queue,
0893                         "condor_schedd": condor_schedd,
0894                         "condor_pool": condor_pool,
0895                         "use_spool": self.useSpool,
0896                         "pilot_url": pilot_url,
0897                         "pilot_args": pilot_args,
0898                         "pilot_version": pilot_version,
0899                         "python_version": python_version,
0900                         "token_dir": token_dir,
0901                         "panda_token_filename": self.pandaTokenFilename,
0902                         "panda_token_dir": self.pandaTokenDir,
0903                         "panda_token_key_path": self.pandaTokenKeyPath,
0904                         "is_unified_dispatch": True,
0905                         "prod_rc_permille": self.rcPilotRandomWeightPermille,
0906                         "is_gpu_resource": is_gpu_resource,
0907                         "custom_submit_attr_dict": custom_submit_attr_dict,
0908                         "cric_panda_site": cric_panda_site,
0909                     }
0910                 )
0911             return data
0912 
0913         def _propagate_attributes(workspec, tmpVal):
0914             # make logger
0915             tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="_propagate_attributes")
0916             (retVal, tmpDict) = tmpVal
0917             workspec.set_attributes_with_dict(tmpDict)
0918             tmpLog.debug("Done workspec attributes propagation")
0919             return retVal
0920 
0921         tmpLog.debug("finished preparing worker attributes")
0922 
0923         # map(_handle_one_worker, workspec_list)
0924         with ThreadPoolExecutor(self.nProcesses * 4) as thread_pool:
0925             dataIterator = thread_pool.map(_handle_one_worker, workspec_list)
0926         tmpLog.debug(f"{nWorkers} workers handled")
0927 
0928         # submit
0929         retValList = submit_bag_of_workers(list(dataIterator))
0930         tmpLog.debug(f"{nWorkers} workers submitted")
0931 
0932         # propagate changed attributes
0933         with ThreadPoolExecutor(self.nProcesses) as thread_pool:
0934             retIterator = thread_pool.map(lambda _wv_tuple: _propagate_attributes(*_wv_tuple), zip(workspec_list, retValList))
0935 
0936         retList = list(retIterator)
0937         tmpLog.debug("done")
0938 
0939         return retList