File indexing completed on 2026-04-19 08:00:04
0001 import json
0002 import socket
0003 import time
0004 import urllib.parse
0005
0006 import arc
0007 from act.atlas.aCTDBPanda import aCTDBPanda
0008 from act.common.aCTConfig import aCTConfigARC
0009 from act.common.aCTProxy import aCTProxy
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0014
0015
# Module-level logger named "act_submitter"; the per-call loggers created in
# ACTSubmitter via core_utils.make_logger() are derived from it.
0016 baseLogger = core_utils.setup_logger("act_submitter")
0017
0018
0019
0020
class ACTSubmitter(PluginBase):
    """Harvester submitter plugin that submits workers by inserting jobs into the aCT DB.

    For every worker a row is inserted through ``aCTDBPanda.insertJob``; the
    returned insert id is stored on the worker as its batch id.
    """

    def __init__(self, **kwarg):
        """Set up the aCT DB handle and resolve the proxy id for each configured role.

        Raises:
            Exception: if a configured proxy (DN + ``/atlas/Role=<role>``
                attribute) is not found in the aCT proxies table.
        """
        PluginBase.__init__(self, **kwarg)

        self.hostname = socket.getfqdn()

        self.log = core_utils.make_logger(baseLogger, "aCT submitter", method_name="__init__")
        self.actDB = aCTDBPanda(self.log)

        # Map role -> proxy file. The role is the second "="-separated field of
        # each credmanager.voms entry, paired positionally with outCertFile.
        self.certs = dict(
            zip(
                [r.split("=")[1] for r in list(harvester_config.credmanager.voms)],
                list(harvester_config.credmanager.outCertFile),
            )
        )

        # Map role -> aCT proxy id, filled in below.
        self.proxymap = {}

        # One aCTProxy handle is enough for all lookups; no need to rebuild it per role.
        actp = aCTProxy(self.log)

        for role, proxy in self.certs.items():
            # Read the identity (DN) out of the proxy file via the ARC credential API.
            cred_type = arc.initializeCredentialsType(arc.initializeCredentialsType.SkipCredentials)
            uc = arc.UserConfig(cred_type)
            uc.ProxyPath(str(proxy))
            cred = arc.Credential(uc)
            dn = cred.GetIdentityName()

            attr = "/atlas/Role=" + role
            proxyid = actp.getProxyId(dn, attr)
            if not proxyid:
                raise Exception(f"Proxy with DN {dn} and attribute {attr} was not found in proxies table")

            self.proxymap[role] = proxyid

    def submit_workers(self, workspec_list):
        """Insert one aCT job per worker and record the submission on the worker.

        Args:
            workspec_list: iterable of worker specs to submit.

        Returns:
            A list with one ``(success, error_message)`` tuple per worker, in
            the same order as ``workspec_list``. ``error_message`` is ``""``
            on success.
        """
        retList = []

        # Loop-invariant: a single mapper serves every worker in this call.
        queueconfigmapper = QueueConfigMapper()

        for workSpec in workspec_list:
            tmpLog = core_utils.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")

            queueconfig = queueconfigmapper.get_queue(workSpec.computingSite)
            prodSourceLabel = queueconfig.get_source_label(workSpec.jobType)

            # Only the first job attached to the worker (if any) is used.
            job_specs = workSpec.get_jobspec_list()
            jobSpec = job_specs[0] if job_specs else None
            if jobSpec:
                tmpLog.debug(f"JobSpec: {jobSpec.values_map()}")
                # A label carried by the job itself overrides the queue default.
                prodSourceLabel = jobSpec.jobParams.get("prodSourceLabel", prodSourceLabel)

            # Pick the proxy by role: "user"/"panda" labels use the pilot role,
            # everything else the production role. A missing role is reported
            # as a failure of this worker only, so that results of workers
            # already inserted in this call are not lost to an exception.
            role = "pilot" if prodSourceLabel in ["user", "panda"] else "production"
            if role not in self.proxymap:
                errStr = f"No proxy id available for role {role}"
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            # Column values for the aCT DB row (key order kept stable).
            desc = {}
            if queueconfig.prefetchEvents:
                # NOTE(review): "waiting" with arcjobid -1 presumably holds the
                # job back until events are prefetched - confirm against aCT.
                desc["pandastatus"] = "waiting"
                desc["actpandastatus"] = "waiting"
                desc["arcjobid"] = -1
            else:
                desc["pandastatus"] = "sent"
                desc["actpandastatus"] = "sent"
            desc["siteName"] = workSpec.computingSite
            desc["proxyid"] = self.proxymap[role]
            desc["prodSourceLabel"] = prodSourceLabel
            desc["sendhb"] = 0
            metadata = {
                "harvesteraccesspoint": workSpec.get_access_point(),
                "schedulerid": f"harvester-{harvester_config.master.harvester_id}",
                "harvesterid": harvester_config.master.harvester_id,
                "harvesterworkerid": workSpec.workerID,
            }
            desc["metadata"] = json.dumps(metadata)

            if jobSpec:
                # A job is attached: pass its own id and full parameters.
                pandaid = jobSpec.PandaID
                actjobdesc = urllib.parse.urlencode(jobSpec.jobParams)
            else:
                # No job attached: use the worker id as the id and build a
                # minimal description from the worker's requirements.
                pandaid = workSpec.workerID
                actjobdesc = "&".join(
                    [
                        f"PandaID={pandaid}",
                        f"prodSourceLabel={prodSourceLabel}",
                        f"resourceType={workSpec.resourceType}",
                        f"minRamCount={workSpec.minRamCount}",
                        f"coreCount={workSpec.nCore}",
                        f"logFile={pandaid}.pilot.log",
                    ]
                )

            tmpLog.info(f"Inserting job {pandaid} into aCT DB: {str(desc)}")
            try:
                batchid = self.actDB.insertJob(pandaid, actjobdesc, desc)["LAST_INSERT_ID()"]
            except Exception as e:
                errStr = f"Failed to insert job into aCT DB: {str(e)}"
                # Log here as well; otherwise the failure is only visible to the caller.
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            tmpLog.info(f"aCT batch id {batchid}")
            workSpec.batchID = str(batchid)
            workSpec.submissionHost = self.hostname
            workSpec.nativeStatus = desc["actpandastatus"]

            # Log URLs follow <logBaseURL>/<UTC date>/<site>/<pandaid>.<ext>.
            # The job is already in the DB at this point, so a missing
            # logBaseURL must not raise: skip the URLs and still report success.
            logbase = queueconfig.submitter.get("logBaseURL")
            if logbase is None:
                tmpLog.warning("logBaseURL is not set in submitter config, log file URLs are not set")
            else:
                today = time.strftime("%Y-%m-%d", time.gmtime())
                logurl = "/".join([logbase, today, workSpec.computingSite, str(pandaid)])
                workSpec.set_log_file("batch_log", f"{logurl}.log")
                workSpec.set_log_file("stdout", f"{logurl}.out")
                workSpec.set_log_file("stderr", f"{logurl}.err")
                workSpec.set_log_file("jdl", f"{logurl}.jdl")

            retList.append((True, ""))

        return retList