Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import json
0002 import os
0003 import re
0004 import urllib
0005 
0006 
class ARCParser:
    """Converts panda job description to ARC job description using AGIS info.

    Each ``set*`` method fills one or more entries of ``self.xrsl`` (a dict of
    xRSL fragments keyed by attribute name). ``parse()`` runs all of them and
    ``getXrsl()`` joins the fragments into the final job description string.
    """

    # Base URL (with transfer options) used to build rucio replica locations.
    # Hard-coded pilot rucio account - should change based on proxy.
    # Rucio does not expose mtime, so cache=invariant avoids re-downloading.
    _RUCIO_REPLICAS_URL = "rucio://rucio-lb-prod.cern.ch;rucioaccount=pilot;transferprotocol=gsiftp;cache=invariant/replicas"

    def __init__(self, jobdesc, pandaqueue, siteinfo, logurl, schedulerid, osmap, tmpdir, eventranges, log):
        """Store the job/site information needed to build the xRSL.

        Args:
            jobdesc: dict-like panda job description (must contain "PandaID").
            pandaqueue: panda queue name, stored as ``self.schedconfig``.
            siteinfo: dict with at least "corecount", "panda_resource",
                "copytools", "maxtime" and "is_cvmfs" ("type" is read later
                by ``setRTE``).
            logurl: value exported as GTAG in the job environment (if truthy).
            schedulerid: value exported as PANDA_JSID in the job environment.
            osmap: mapping of integer bucket id to object store endpoint.
            tmpdir: directory under which temporary input files are written.
            eventranges: JSON string of event ranges, or a falsy value.
            log: logger object providing ``info`` and ``debug``.
        """
        # Python 3 location of urlencode; imported locally so that the class
        # does not depend on ``urllib.parse`` having been loaded elsewhere.
        from urllib.parse import urlencode

        self.log = log
        # The URL-encoded job that is eventually passed to the pilot
        self.pandajob = urlencode(jobdesc)
        # The json of the job description
        self.jobdesc = jobdesc
        self.pandaid = self.jobdesc["PandaID"]
        self.xrsl = {}
        self.siteinfo = siteinfo
        self.ncores = siteinfo["corecount"]
        self.logurl = logurl
        self.schedulerid = schedulerid
        # APFMon URL read by setEnvironment(); not a constructor argument, so
        # APFMon variables are only exported if a caller sets this attribute.
        self.monitorurl = None

        self.defaults = {"memory": 2000, "cputime": 2 * 1440 * 60}
        self.sitename = siteinfo["panda_resource"]
        self.schedconfig = pandaqueue
        copytools = siteinfo["copytools"]
        self.truepilot = "mv" not in copytools or len(copytools) > 1
        self.osmap = osmap
        # Integer division keeps the value usable in %d formatting and lets
        # the zero check below catch every "no usable limit" value.
        self.maxwalltime = siteinfo["maxtime"] // 60
        # Set to 1 week if not specified
        if self.maxwalltime == 0:
            self.maxwalltime = 7 * 24 * 60

        self.tmpdir = tmpdir
        self.inputfiledir = os.path.join(self.tmpdir, "inputfiles")
        self.inputjobdir = os.path.join(self.inputfiledir, str(self.pandaid))
        self.eventranges = eventranges
        self.traces = []
        # Long job descriptions are shipped as a file instead of an argument
        self.longjob = len(self.pandajob) > 50000
        self.artes = None
        self.cvmfs = siteinfo["is_cvmfs"]

        # ES merge jobs need unique guids because pilot uses them as dict keys
        if not self.truepilot and self.jobdesc.get("eventServiceMerge") == "True":
            nguids = len(self.jobdesc["GUID"].split(","))
            dummyguids = ",".join(f"DUMMYGUID{i}" for i in range(nguids))
            # Rebuild the query string with the GUID field replaced and moved
            # to the end, instead of slicing the encoded string (which went
            # wrong when GUID was the first or the last field).
            fields = [(key, value) for key, value in self.jobdesc.items() if key != "GUID"]
            fields.append(("GUID", dummyguids))
            self.pandajob = urlencode(fields)

    def getNCores(self):
        """Set the core count from the job description and return it.

        Also writes the "count" xRSL entry and, for multi-core jobs, the
        "countpernode" entry.
        """
        # Unified panda queues: always use coreCount from job description
        try:
            self.ncores = int(self.jobdesc.get("coreCount", 1))
        except (TypeError, ValueError):  # corecount is NULL
            self.ncores = 1
        self.xrsl["count"] = "(count=%d)" % self.ncores

        # force single-node jobs for now
        if self.ncores > 1:
            # NOTE(review): this stores a runtimeenvironment under the
            # "countpernode" key, and the original site-specific check
            # (RAL-LCG2/TOKYO/FZK) re-assigned the very same value; behaviour
            # is preserved here - confirm whether a real countpernode was meant.
            self.xrsl["countpernode"] = "(runtimeenvironment = APPS/HEP/ATLAS-MULTICORE-1.0)"

        return self.ncores

    def setJobname(self):
        """Set the "jobname" entry from jobName, defaulting to "pandajob"."""
        jobname = self.jobdesc["jobName"] if "jobName" in self.jobdesc else "pandajob"
        self.xrsl["jobname"] = f'(jobname = "{jobname}")'

    def setDisk(self):
        """Set the "disk" entry (MB) from maxDiskCount plus input sizes."""
        if "maxDiskCount" in self.jobdesc:
            disk = int(self.jobdesc["maxDiskCount"])
        else:
            disk = 500

        # Add input file sizes (bytes -> MB, kept as an integer)
        if "fsize" in self.jobdesc:
            disk += sum(int(size) for size in self.jobdesc["fsize"].split(",")) // 1000000
        # Add safety factor
        disk += 2000
        self.log.debug(f"{self.pandaid}: disk space {disk}")
        self.xrsl["disk"] = "(disk = %d)" % disk

    def setTime(self):
        """Set the "time" entry (walltime and cputime, in minutes).

        The requested maxCpuCount (seconds) is doubled, clamped to at least
        60 minutes and at most ``self.maxwalltime`` (240 for BOINC sites);
        cputime is walltime multiplied by the number of cores.
        """
        default_cpucount = 2 * 24 * 3600
        if "maxCpuCount" in self.jobdesc:
            cpucount = int(self.jobdesc["maxCpuCount"])

            # hack for group production!!!
            if cpucount == 600:
                cpucount = 24 * 3600

            cpucount = 2 * cpucount
            self.log.info(f"{self.pandaid}: job maxCpuCount {cpucount}")
        else:
            cpucount = default_cpucount
            self.log.info(f"{self.pandaid}: Using default maxCpuCount {cpucount}")

        if cpucount == 0:
            cpucount = default_cpucount

        # shorten installation jobs
        if self.jobdesc.get("prodSourceLabel") == "install":
            cpucount = 12 * 3600

        if cpucount <= 0:
            cpucount = self.defaults["cputime"]

        walltime = cpucount // 60

        # JEDI analysis hack
        walltime = max(60, walltime)
        walltime = int(min(self.maxwalltime, walltime))
        if self.sitename.startswith("BOINC"):
            walltime = min(240, walltime)
        cputime = self.getNCores() * walltime
        self.log.info(f"{self.pandaid}: walltime: {walltime}, cputime: {cputime}")

        self.xrsl["time"] = "(walltime=%d)(cputime=%d)" % (walltime, cputime)

    def setMemory(self):
        """Set the "memory" entry (MB per core, rounded up to 500 MB units)."""
        if "minRamCount" in self.jobdesc:
            memory = int(self.jobdesc["minRamCount"])
        elif not self.sitename.startswith("ANALY"):
            memory = 4000
        else:
            memory = 2000

        if memory <= 0:
            memory = self.defaults["memory"]

        # fix until maxrrs in pandajob is better known
        memory = max(memory, 500)

        if "BOINC" in self.sitename:
            memory = 2400

        ncores = self.getNCores()
        if ncores > 1:
            # hack for 0 ramcount, defaulting to 4000, see above, fix to 2000/core
            if memory == 4000:
                memory = 2000
            else:
                memory //= ncores

        # fix memory to 500MB units; integer division is required here, true
        # division would merely add 499 instead of rounding up
        memory = (memory - 1) // 500 * 500 + 500

        self.xrsl["memory"] = "(memory = %d)" % memory

    def setRTE(self):
        """Set the "rtes" entry and ``self.artes``.

        True-pilot, analysis and cvmfs sites get fixed runtime environments
        and an empty ``self.artes``. Otherwise old-style RTE names are derived
        from swRelease/homepackage/cmtConfig; only the last one is requested
        in the xRSL while all of them are stored comma-separated in
        ``self.artes``.
        """
        self.artes = ""
        if self.truepilot:
            self.xrsl["rtes"] = "(runtimeenvironment = ENV/PROXY)(runtimeenvironment = APPS/HEP/ATLAS-SITE-LCG)"
            return
        if self.siteinfo["type"] == "analysis":
            # Require proxy for analysis
            self.xrsl["rtes"] = "(runtimeenvironment = ENV/PROXY)(runtimeenvironment = APPS/HEP/ATLAS-SITE)"
            return
        if self.cvmfs:
            # Normal sites with cvmfs
            self.xrsl["rtes"] = "(runtimeenvironment = APPS/HEP/ATLAS-SITE)"
            return

        # Old-style RTEs for special sites with no cvmfs
        cmtconfig = self.jobdesc["cmtConfig"].upper()
        # Project prefixes that lose their "ATLAS" part when "ATLAS-" is
        # stripped below and must get it back (order as in the original code).
        restored_prefixes = (
            ("PHYSICS-", "ATLASPHYSICS-"),
            ("PROD2-", "ATLASPROD2-"),
            ("PROD1-", "ATLASPROD1-"),
            ("DERIVATION-", "ATLASDERIVATION-"),
            ("P1HLT-", "ATLASP1HLT-"),
            ("TESTHLT-", "ATLASTESTHLT-"),
            ("CAFHLT-", "ATLASCAFHLT-"),
            ("21.0.13.1", "ATLASPRODUCTION-21.0.13.1"),
            ("21.0.20.1", "ATLASPRODUCTION-21.0.20.1"),
        )
        atlasrtes = []
        packages = self.jobdesc["swRelease"].split("\n")
        caches = self.jobdesc["homepackage"].split("\n")
        for package, cache in zip(packages, caches):
            is_analysis = cache.find("AnalysisTransforms") != -1
            if cache.find("Production") > 1 and not is_analysis:
                rte = package.split("-")[0].upper() + "-" + cache.split("/")[1]
            elif is_analysis:
                rte = package.upper()
                res = re.match("AnalysisTransforms-(.+)_(.+)", cache)
                if res is not None:
                    if res.group(1).find("AtlasProduction") != -1:
                        rte = "ATLAS-" + res.group(2)
                    else:
                        rte = "ATLAS-" + res.group(1).upper() + "-" + res.group(2)
            else:
                rte = cache.replace("Atlas", "Atlas-").replace("/", "-").upper()
            rte = rte.replace("ATLAS-", "")
            rte += "-" + cmtconfig

            if not is_analysis:
                for old, new in restored_prefixes:
                    rte = rte.replace(old, new)
            elif re.match(r"21\..+", rte) is not None:
                # Prefix only the leading release number; replacing every
                # "21" would also mangle later version components.
                rte = "OFFLINE-" + rte

            if rte.find("NULL") != -1:
                rte = "PYTHON-CVMFS-X86_64-SLC6-GCC47-OPT"

            atlasrtes.append(rte)

        # Only the last RTE is requested from the site
        self.xrsl["rtes"] = "".join(f"(runtimeenvironment = APPS/HEP/ATLAS-{rte})" for rte in atlasrtes[-1:])

        self.artes = ",".join(atlasrtes)

    def setExecutable(self):
        """Set the "executable" entry to the ARCpilot wrapper."""
        self.xrsl["executable"] = "(executable = ARCpilot)"

    def setArguments(self):
        """Set the "arguments" entry passed to the ARCpilot wrapper.

        The arguments are the RTE list, the URL-encoded job (or the literal
        "FILE" for long jobs, which are shipped as an input file) and the
        pilot command line.
        """
        if self.artes is None:
            self.setRTE()

        # Set options for NG/true pilot
        if self.truepilot:
            pargs = '"pilot3/pilot.py" "-h" "%s" "-s" "%s" "-f" "false" "-p" "25443" "-d" "{HOME}" "-w" "https://pandaserver.cern.ch"' % (
                self.schedconfig,
                self.sitename,
            )
        else:
            pargs = '"pilot3/pilot.py" "-h" "%s" "-s" "%s" "-F" "Nordugrid-ATLAS" "-d" "{HOME}" "-j" "false" "-f" "false" "-z" "true" "-b" "2" "-t" "false"' % (
                self.sitename,
                self.sitename,
            )

        pandajobarg = "FILE" if self.longjob else self.pandajob
        self.xrsl["arguments"] = f'(arguments = "{self.artes}" "{pandajobarg}" {pargs})'

    def setInputsES(self, inf):
        """Add event service merge inputs to ``inf`` (filename -> URL).

        Raises:
            Exception: if a file's prodDBlockToken is a bucket id that is not
                present in ``self.osmap``.
        """
        files = self.jobdesc["inFiles"].split(",")
        scopes = self.jobdesc["scopeIn"].split(",")
        tokens = self.jobdesc["prodDBlockToken"].split(",")
        for filename, scope, token in zip(files, scopes, tokens):
            if token == "None":
                # Rucio file
                lfn = "/".join([self._RUCIO_REPLICAS_URL, scope, filename])
            elif int(token) in self.osmap:
                lfn = "/".join([self.osmap[int(token)], filename])
            else:
                # TODO this exception is ignored by panda2arc
                raise Exception(f"No OS defined in AGIS for bucket id {token}")
            inf[filename] = lfn

    def setInputs(self):
        """Set the "inputfiles" entry.

        Side effects: for long jobs the URL-encoded job is written to
        ``<inputjobdir>/pandaJobData.out``; for event service jobs with event
        ranges a JSON file is written under ``self.tmpdir``.
        """
        if self.truepilot:
            x = '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-true")'
        elif self.eventranges:
            x = '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-es")'
        else:
            x = '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot")'

        if self.jobdesc.get("prodSourceLabel") == "rc_test":
            x += '(pilotcode.tar.gz "http://pandaserver.cern.ch:25080;cache=check/cache/pilot/pilotcode-rc.tar.gz")'
        else:
            x += '(pilotcode.tar.gz "http://pandaserver.cern.ch:25080;cache=check/cache/pilot/pilotcode-PICARD.tar.gz")'

        if self.eventranges:
            x += '(ARCpilot-test.tar.gz "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-es.tar.gz")'

        if self.longjob:
            # The job description is too long for an argument: ship it as a file
            os.makedirs(self.inputjobdir, exist_ok=True)
            tmpfile = self.inputjobdir + "/pandaJobData.out"
            with open(tmpfile, "w") as f:
                f.write(self.pandajob)
            x += f'(pandaJobData.out "{tmpfile}")'

        if not self.truepilot:
            x += f'(queuedata.pilot.json "http://pandaserver.cern.ch:25085;cache=check/cache/schedconfig/{self.schedconfig}.all.json")'
            if self.sitename.find("BEIJING") != -1:
                x += '(agis_ddmendpoints.json "/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json")'

        if "inFiles" in self.jobdesc and not self.truepilot:
            inf = {}
            if self.jobdesc.get("eventServiceMerge") == "True":
                self.setInputsES(inf)

            # Every input file is staged through rucio (this overrides any
            # entry added by setInputsES). Skipping direct I/O files is
            # disabled pending the new mover switch.
            filenames = self.jobdesc["inFiles"].split(",")
            scopes = self.jobdesc["scopeIn"].split(",")
            for filename, scope in zip(filenames, scopes):
                inf[filename] = "/".join([self._RUCIO_REPLICAS_URL, scope, filename])

            # some files are double: the dict keeps one entry per file name
            x += "".join(f'({name} "{url}")' for name, url in inf.items())

            if self.jobdesc.get("eventService") and self.eventranges:
                # Create tmp json file to upload with job
                tmpjsonfile = os.path.join(self.tmpdir, f"eventranges-{self.pandaid}.json")
                jsondata = json.loads(self.eventranges)
                with open(tmpjsonfile, "w") as f:
                    json.dump(jsondata, f)
                x += f'("eventranges.json" "{tmpjsonfile}")'

        self.xrsl["inputfiles"] = f"(inputfiles =  {x} )"

    def setLog(self):
        """Set the "log" entry: stdout named after logFile without ".tgz"."""
        logfile = self.jobdesc.get("logFile", "LOGFILE")
        stdout = logfile.replace(".tgz", "")
        self.xrsl["log"] = f'(stdout = "{stdout}")(join = yes)'

    def setGMLog(self):
        """Set the "gmlog" and "rerun" entries."""
        self.xrsl["gmlog"] = '(gmlog = "gmlog")'
        self.xrsl["rerun"] = '(rerun = "2")'

    def setOutputs(self):
        """Set the "outputs" entry (empty for true pilot jobs)."""
        if self.truepilot:
            self.xrsl["outputs"] = ""
            return

        outputs = [
            '("jobSmallFiles.tgz" "")',
            # dynamic outputs
            '("@output.list" "")',
            # needed for SCEAPI: generated output file list
            '("output.list" "")',
        ]
        self.xrsl["outputs"] = f"(outputfiles = {''.join(outputs)} )"

    def setPriority(self):
        """Set the "priority" entry by mapping currentPriority onto 0-100.

        Values up to 1000 map linearly onto 0-90, values up to 10000 onto
        90-100 and anything above to 100. An unparsable priority gives 50.
        Nothing is set when the job has no currentPriority; the entry is
        blanked for sites whose name contains "wuppertalprod".
        """
        if "currentPriority" not in self.jobdesc:
            return

        prio = 50
        try:
            rawprio = max(1, int(self.jobdesc["currentPriority"]))
        except (TypeError, ValueError):
            pass  # keep the default priority
        else:
            if rawprio <= 1000:
                prio = int(rawprio * 90 / 1000.0)
            elif rawprio <= 10000:
                prio = int(90 + (rawprio - 1000) / 900.0)
            else:
                prio = 100

        if "wuppertalprod" in self.sitename:
            self.xrsl["priority"] = ""
        else:
            self.xrsl["priority"] = "(priority = %d )" % prio

    def setEnvironment(self):
        """Set the "environment" entry from scheduler id, log URL and APFMon."""
        environment = {"PANDA_JSID": self.schedulerid}
        if self.logurl:
            environment["GTAG"] = self.logurl

        # Vars for APFMon; monitorurl is optional and may never have been set
        monitorurl = getattr(self, "monitorurl", None)
        if self.truepilot and monitorurl:
            environment["APFCID"] = self.pandaid
            environment["APFFID"] = self.schedulerid
            environment["APFMON"] = monitorurl
            environment["FACTORYQUEUE"] = self.sitename

        pairs = "".join(f'("{key}" "{value}")' for key, value in environment.items())
        self.xrsl["environment"] = f"(environment = {pairs})"

    def parse(self):
        """Fill ``self.xrsl`` by running every setter (setDisk is not used)."""
        self.setTime()
        self.setJobname()
        # self.setDisk() is intentionally not called
        self.setMemory()
        self.setRTE()
        self.setExecutable()
        self.setArguments()
        self.setInputs()
        self.setLog()
        self.setGMLog()
        self.setOutputs()
        self.setPriority()
        self.setEnvironment()

    def getXrsl(self):
        """Return the xRSL string: "&" followed by all entries, one per line."""
        return "&" + "\n".join(self.xrsl.values())