Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import json
0002 import socket
0003 import time
0004 import urllib.parse
0005 
0006 import arc
0007 from act.atlas.aCTDBPanda import aCTDBPanda
0008 from act.common.aCTConfig import aCTConfigARC
0009 from act.common.aCTProxy import aCTProxy
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.plugin_base import PluginBase
0013 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0014 
0015 # logger
0016 baseLogger = core_utils.setup_logger("act_submitter")
0017 
0018 # submitter for aCT
0019 
0020 
class ACTSubmitter(PluginBase):
    """Harvester submitter plugin that delegates worker submission to aCT.

    Rather than talking to a batch system directly, each worker is inserted
    as a row into the aCT (ARC Control Tower) panda jobs table via
    aCTDBPanda; aCT then performs the actual grid submission. Workers map
    one-to-one onto aCT jobs.
    """

    # constructor
    def __init__(self, **kwarg):
        """Open the aCT DB connection and map VOMS roles to aCT proxy ids.

        Reads the proxy files configured under harvester_config.credmanager,
        extracts each proxy's identity DN with the ARC bindings, and resolves
        the corresponding proxyid from the aCT proxies table.

        Raises:
            Exception: if no proxy with the expected DN and role attribute
                is found in the aCT proxies table.
        """
        PluginBase.__init__(self, **kwarg)

        self.hostname = socket.getfqdn()
        # Set up aCT DB connection
        self.log = core_utils.make_logger(baseLogger, "aCT submitter", method_name="__init__")
        self.actDB = aCTDBPanda(self.log)
        # Credential dictionary role: proxy file
        # Pairs the role parsed from each VOMS attribute (text after "=",
        # e.g. "production" from ".../Role=production") with the matching
        # entry of outCertFile. Assumes the two credmanager lists are
        # parallel and of equal length — TODO confirm against config.
        self.certs = dict(zip([r.split("=")[1] for r in list(harvester_config.credmanager.voms)], list(harvester_config.credmanager.outCertFile)))
        # Map of role to aCT proxyid
        self.proxymap = {}

        # Get proxy info
        # TODO: better to send aCT the proxy file and let it handle it
        for role, proxy in self.certs.items():
            # Load the proxy file through ARC to extract its identity DN;
            # SkipCredentials avoids ARC trying to validate full credentials.
            cred_type = arc.initializeCredentialsType(arc.initializeCredentialsType.SkipCredentials)
            uc = arc.UserConfig(cred_type)
            uc.ProxyPath(str(proxy))
            cred = arc.Credential(uc)
            dn = cred.GetIdentityName()

            # Resolve the id aCT assigned to this DN + role attribute
            actp = aCTProxy(self.log)
            attr = "/atlas/Role=" + role
            proxyid = actp.getProxyId(dn, attr)
            if not proxyid:
                raise Exception(f"Proxy with DN {dn} and attribute {attr} was not found in proxies table")

            self.proxymap[role] = proxyid

    # submit workers

    def submit_workers(self, workspec_list):
        """Insert one aCT job per worker and record aCT ids back on the worker.

        For each WorkSpec: build the aCT job description (push mode when the
        worker carries a JobSpec, pull mode otherwise), insert it into the
        aCT DB, and on success store the batch id, submission host, native
        status and log-file URLs on the WorkSpec.

        :param workspec_list: list of WorkSpec instances to submit
        :return: list of (bool success, str errMsg) tuples, one per worker,
                 in the same order as workspec_list
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = core_utils.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")

            queueconfigmapper = QueueConfigMapper()
            queueconfig = queueconfigmapper.get_queue(workSpec.computingSite)
            prodSourceLabel = queueconfig.get_source_label(workSpec.jobType)

            # If jobSpec is defined we are in push mode, if not pull mode
            # Both assume one to one worker to job mapping
            jobSpec = workSpec.get_jobspec_list()
            if jobSpec:
                jobSpec = jobSpec[0]
                tmpLog.debug(f"JobSpec: {jobSpec.values_map()}")
                # Unified queues: take prodsourcelabel from job
                prodSourceLabel = jobSpec.jobParams.get("prodSourceLabel", prodSourceLabel)

            # desc holds the column values for the aCT pandajobs table row
            desc = {}
            # If we need to prefetch events, set aCT status waiting.
            # feed_events in act_messenger will fill events and release the job
            if queueconfig.prefetchEvents:
                desc["pandastatus"] = "waiting"
                desc["actpandastatus"] = "waiting"
                desc["arcjobid"] = -1  # dummy id to prevent submission
            else:
                desc["pandastatus"] = "sent"
                desc["actpandastatus"] = "sent"
            desc["siteName"] = workSpec.computingSite
            # user/panda labels run with the "pilot" role proxy, anything
            # else with the "production" role proxy (ids resolved in __init__)
            desc["proxyid"] = self.proxymap["pilot" if prodSourceLabel in ["user", "panda"] else "production"]
            desc["prodSourceLabel"] = prodSourceLabel
            # sendhb=0: presumably tells aCT not to send heartbeats for this
            # job (harvester owns the communication with panda) — TODO confirm
            desc["sendhb"] = 0
            # Metadata lets aCT find its way back to this harvester worker
            metadata = {
                "harvesteraccesspoint": workSpec.get_access_point(),
                "schedulerid": f"harvester-{harvester_config.master.harvester_id}",
                "harvesterid": harvester_config.master.harvester_id,
                "harvesterworkerid": workSpec.workerID,
            }
            desc["metadata"] = json.dumps(metadata)

            if jobSpec:
                # push mode: aCT takes the url-encoded job description (like it gets from panda server)
                pandaid = jobSpec.PandaID
                actjobdesc = urllib.parse.urlencode(jobSpec.jobParams)
            else:
                # pull mode: set pandaid (to workerid), prodsourcelabel, resource type and requirements
                pandaid = workSpec.workerID
                actjobdesc = "&".join(
                    [
                        f"PandaID={pandaid}",
                        f"prodSourceLabel={prodSourceLabel}",
                        f"resourceType={workSpec.resourceType}",
                        f"minRamCount={workSpec.minRamCount}",
                        f"coreCount={workSpec.nCore}",
                        f"logFile={pandaid}.pilot.log",
                    ]
                )

            tmpLog.info(f"Inserting job {pandaid} into aCT DB: {str(desc)}")
            try:
                # insertJob returns the auto-increment id under the
                # "LAST_INSERT_ID()" key; that becomes the worker's batch id
                batchid = self.actDB.insertJob(pandaid, actjobdesc, desc)["LAST_INSERT_ID()"]
            except Exception as e:
                result = (False, f"Failed to insert job into aCT DB: {str(e)}")
            else:
                tmpLog.info(f"aCT batch id {batchid}")
                workSpec.batchID = str(batchid)
                workSpec.submissionHost = self.hostname
                workSpec.nativeStatus = desc["actpandastatus"]
                # Set log files in workSpec
                # URLs follow the queue's logBaseURL/<date>/<site>/<pandaid> layout
                today = time.strftime("%Y-%m-%d", time.gmtime())
                logurl = "/".join([queueconfig.submitter.get("logBaseURL"), today, workSpec.computingSite, str(pandaid)])
                workSpec.set_log_file("batch_log", f"{logurl}.log")
                workSpec.set_log_file("stdout", f"{logurl}.out")
                workSpec.set_log_file("stderr", f"{logurl}.err")
                workSpec.set_log_file("jdl", f"{logurl}.jdl")
                result = (True, "")
            retList.append(result)

        return retList