# File indexing completed on 2026-04-20 07:58:59
import json
import os
import re
import urllib
import urllib.parse


0007 class ARCParser:
0008 """Converts panda job description to ARC job description using AGIS info"""
0009
0010 def __init__(self, jobdesc, pandaqueue, siteinfo, logurl, schedulerid, osmap, tmpdir, eventranges, log):
0011 self.log = log
0012
0013 self.pandajob = urllib.urlencode(jobdesc)
0014
0015 self.jobdesc = jobdesc
0016 self.pandaid = self.jobdesc["PandaID"]
0017 self.xrsl = {}
0018 self.siteinfo = siteinfo
0019 self.ncores = siteinfo["corecount"]
0020 self.logurl = logurl
0021 self.schedulerid = schedulerid
0022
0023 self.defaults = {}
0024 self.defaults["memory"] = 2000
0025 self.defaults["cputime"] = 2 * 1440 * 60
0026 self.sitename = siteinfo["panda_resource"]
0027 self.schedconfig = pandaqueue
0028 self.truepilot = "mv" not in siteinfo["copytools"] or len(siteinfo["copytools"]) > 1
0029 self.osmap = osmap
0030
0031 self.maxwalltime = siteinfo["maxtime"] / 60
0032 if self.maxwalltime == 0:
0033 self.maxwalltime = 7 * 24 * 60
0034
0035 self.tmpdir = tmpdir
0036 self.inputfiledir = os.path.join(self.tmpdir, "inputfiles")
0037 self.inputjobdir = os.path.join(self.inputfiledir, str(self.pandaid))
0038 self.eventranges = eventranges
0039 self.longjob = False
0040 self.traces = []
0041 if len(self.pandajob) > 50000:
0042 self.longjob = True
0043 self.artes = None
0044 self.cvmfs = siteinfo["is_cvmfs"]
0045
0046
0047 if not self.truepilot and self.jobdesc.get("eventServiceMerge") == "True":
0048 if self.pandajob.startswith("GUID"):
0049 esjobdesc = self.pandajob[self.pandajob.find("&") :]
0050 else:
0051 esjobdesc = self.pandajob[: self.pandajob.find("&GUID")] + self.pandajob[self.pandajob.find("&", self.pandajob.find("&GUID") + 5) :]
0052 esjobdesc += f"&GUID={'%2C'.join([('DUMMYGUID%i' % i) for i in range(len(self.jobdesc['GUID'].split(',')))])}"
0053 self.pandajob = esjobdesc
0054
0055 def getNCores(self):
0056
0057 try:
0058 self.ncores = int(self.jobdesc.get("coreCount", 1))
0059 except BaseException:
0060 self.ncores = 1
0061 self.xrsl["count"] = "(count=%d)" % self.ncores
0062
0063
0064 if self.ncores > 1:
0065 self.xrsl["countpernode"] = "(runtimeenvironment = APPS/HEP/ATLAS-MULTICORE-1.0)"
0066 if self.sitename.find("RAL-LCG2") < 0 and self.sitename.find("TOKYO") < 0 and self.sitename.find("FZK") < 0:
0067 self.xrsl["countpernode"] = "(runtimeenvironment = APPS/HEP/ATLAS-MULTICORE-1.0)"
0068
0069 return self.ncores
0070
0071 def setJobname(self):
0072 if "jobName" in self.jobdesc:
0073 jobname = self.jobdesc["jobName"]
0074 else:
0075 jobname = "pandajob"
0076 self.xrsl["jobname"] = f'(jobname = "{jobname}")'
0077
0078 def setDisk(self):
0079 if "maxDiskCount" in self.jobdesc:
0080 disk = int(self.jobdesc["maxDiskCount"])
0081 else:
0082 disk = 500
0083
0084
0085 if "fsize" in self.jobdesc:
0086 disk += sum([int(f) for f in self.jobdesc["fsize"].split(",")]) / 1000000
0087
0088 disk += 2000
0089 self.log.debug("%s: disk space %d" % (self.pandaid, disk))
0090 self.xrsl["disk"] = "(disk = %d)" % disk
0091
0092 def setTime(self):
0093 if "maxCpuCount" in self.jobdesc:
0094 cpucount = int(self.jobdesc["maxCpuCount"])
0095
0096
0097 if cpucount == 600:
0098 cpucount = 24 * 3600
0099
0100 cpucount = int(2 * cpucount)
0101 self.log.info(f"{self.pandaid}: job maxCpuCount {cpucount}")
0102 else:
0103 cpucount = 2 * 24 * 3600
0104 self.log.info(f"{self.pandaid}: Using default maxCpuCount {cpucount}")
0105
0106 if cpucount == 0:
0107 cpucount = 2 * 24 * 3600
0108
0109
0110 try:
0111 if self.jobdesc["prodSourceLabel"] == "install":
0112 cpucount = 12 * 3600
0113 except BaseException:
0114 pass
0115
0116 if int(cpucount) <= 0:
0117 cpucount = self.defaults["cputime"]
0118
0119 walltime = int(cpucount / 60)
0120
0121
0122 walltime = max(60, walltime)
0123 walltime = min(self.maxwalltime, walltime)
0124 if self.sitename.startswith("BOINC"):
0125 walltime = min(240, walltime)
0126 cputime = self.getNCores() * walltime
0127 self.log.info("%s: walltime: %d, cputime: %d" % (self.pandaid, walltime, cputime))
0128
0129 self.xrsl["time"] = "(walltime=%d)(cputime=%d)" % (walltime, cputime)
0130
0131 def setMemory(self):
0132 if "minRamCount" in self.jobdesc:
0133 memory = int(self.jobdesc["minRamCount"])
0134 elif not self.sitename.startswith("ANALY"):
0135 memory = 4000
0136 else:
0137 memory = 2000
0138
0139 if memory <= 0:
0140 memory = self.defaults["memory"]
0141
0142
0143 if memory <= 500:
0144 memory = 500
0145
0146 if "BOINC" in self.sitename:
0147 memory = 2400
0148
0149 if self.getNCores() > 1:
0150
0151 if memory == 4000:
0152 memory = 2000
0153 else:
0154 memory = memory / self.getNCores()
0155
0156
0157 memory = int(memory - 1) / 500 * 500 + 500
0158
0159 self.xrsl["memory"] = "(memory = %d)" % (memory)
0160
0161 def setRTE(self):
0162 self.artes = ""
0163 if self.truepilot:
0164 self.xrsl["rtes"] = "(runtimeenvironment = ENV/PROXY)(runtimeenvironment = APPS/HEP/ATLAS-SITE-LCG)"
0165 return
0166 if self.siteinfo["type"] == "analysis":
0167
0168 self.xrsl["rtes"] = "(runtimeenvironment = ENV/PROXY)(runtimeenvironment = APPS/HEP/ATLAS-SITE)"
0169 return
0170 if self.cvmfs:
0171
0172 self.xrsl["rtes"] = "(runtimeenvironment = APPS/HEP/ATLAS-SITE)"
0173 return
0174
0175
0176 atlasrtes = []
0177 for package, cache in zip(self.jobdesc["swRelease"].split("\n"), self.jobdesc["homepackage"].split("\n")):
0178 if cache.find("Production") > 1 and cache.find("AnalysisTransforms") < 0:
0179 rte = package.split("-")[0].upper() + "-" + cache.split("/")[1]
0180 elif cache.find("AnalysisTransforms") != -1:
0181 rte = package.upper()
0182 res = re.match("AnalysisTransforms-(.+)_(.+)", cache)
0183 if res is not None:
0184 if res.group(1).find("AtlasProduction") != -1:
0185 rte = "ATLAS-" + res.group(2)
0186 else:
0187 rte = "ATLAS-" + res.group(1).upper() + "-" + res.group(2)
0188 else:
0189 rte = cache.replace("Atlas", "Atlas-").replace("/", "-").upper()
0190 rte = str(rte)
0191 rte = rte.replace("ATLAS-", "")
0192 rte += "-" + self.jobdesc["cmtConfig"].upper()
0193
0194 if cache.find("AnalysisTransforms") < 0:
0195 rte = rte.replace("PHYSICS-", "ATLASPHYSICS-")
0196 rte = rte.replace("PROD2-", "ATLASPROD2-")
0197 rte = rte.replace("PROD1-", "ATLASPROD1-")
0198 rte = rte.replace("DERIVATION-", "ATLASDERIVATION-")
0199 rte = rte.replace("P1HLT-", "ATLASP1HLT-")
0200 rte = rte.replace("TESTHLT-", "ATLASTESTHLT-")
0201 rte = rte.replace("CAFHLT-", "ATLASCAFHLT-")
0202 rte = rte.replace("21.0.13.1", "ATLASPRODUCTION-21.0.13.1")
0203 rte = rte.replace("21.0.20.1", "ATLASPRODUCTION-21.0.20.1")
0204 if cache.find("AnalysisTransforms") != -1:
0205 res = re.match("(21\..+)", rte)
0206 if res is not None:
0207 rte = rte.replace("21", "OFFLINE-21")
0208
0209 if rte.find("NULL") != -1:
0210 rte = "PYTHON-CVMFS-X86_64-SLC6-GCC47-OPT"
0211
0212 atlasrtes.append(rte)
0213
0214 self.xrsl["rtes"] = ""
0215 for rte in atlasrtes[-1:]:
0216 self.xrsl["rtes"] += "(runtimeenvironment = APPS/HEP/ATLAS-" + rte + ")"
0217
0218 if self.siteinfo["type"] == "analysis":
0219 self.xrsl["rtes"] += "(runtimeenvironment = ENV/PROXY)"
0220
0221 self.artes = ",".join(atlasrtes)
0222
0223 def setExecutable(self):
0224 self.xrsl["executable"] = "(executable = ARCpilot)"
0225
0226 def setArguments(self):
0227 if self.artes is None:
0228 self.setRTE()
0229
0230
0231 if self.truepilot:
0232 pargs = '"pilot3/pilot.py" "-h" "%s" "-s" "%s" "-f" "false" "-p" "25443" "-d" "{HOME}" "-w" "https://pandaserver.cern.ch"' % (
0233 self.schedconfig,
0234 self.sitename,
0235 )
0236 else:
0237 pargs = '"pilot3/pilot.py" "-h" "%s" "-s" "%s" "-F" "Nordugrid-ATLAS" "-d" "{HOME}" "-j" "false" "-f" "false" "-z" "true" "-b" "2" "-t" "false"' % (
0238 self.sitename,
0239 self.sitename,
0240 )
0241
0242 pandajobarg = self.pandajob
0243 if self.longjob:
0244 pandajobarg = "FILE"
0245 self.xrsl["arguments"] = '(arguments = "' + self.artes + '" "' + pandajobarg + '" ' + pargs + ")"
0246
0247 def setInputsES(self, inf):
0248 for f, s, i in zip(self.jobdesc["inFiles"].split(","), self.jobdesc["scopeIn"].split(","), self.jobdesc["prodDBlockToken"].split(",")):
0249 if i == "None":
0250
0251 lfn = "/".join(["rucio://rucio-lb-prod.cern.ch;rucioaccount=pilot;transferprotocol=gsiftp;cache=invariant/replicas", s, f])
0252 elif int(i) in self.osmap:
0253 lfn = "/".join([self.osmap[int(i)], f])
0254 else:
0255
0256 raise Exception(f"No OS defined in AGIS for bucket id {i}")
0257 inf[f] = lfn
0258
0259 def setInputs(self):
0260 x = ""
0261 if self.truepilot:
0262 x += '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-true")'
0263 elif self.eventranges:
0264 x += '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-es")'
0265 else:
0266 x += '(ARCpilot "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot")'
0267
0268 if self.jobdesc["prodSourceLabel"] == "rc_test":
0269 x += '(pilotcode.tar.gz "http://pandaserver.cern.ch:25080;cache=check/cache/pilot/pilotcode-rc.tar.gz")'
0270 else:
0271 x += '(pilotcode.tar.gz "http://pandaserver.cern.ch:25080;cache=check/cache/pilot/pilotcode-PICARD.tar.gz")'
0272
0273 if self.eventranges:
0274 x += '(ARCpilot-test.tar.gz "http://aipanda404.cern.ch;cache=check/data/releases/ARCpilot-es.tar.gz")'
0275
0276 if self.longjob:
0277 try:
0278 os.makedirs(self.inputjobdir)
0279 except BaseException:
0280 pass
0281 tmpfile = self.inputjobdir + "/pandaJobData.out"
0282 with open(tmpfile, "w") as f:
0283 f.write(self.pandajob)
0284 x += f'(pandaJobData.out "{self.inputjobdir}/pandaJobData.out")'
0285
0286 if not self.truepilot:
0287 x += f'(queuedata.pilot.json "http://pandaserver.cern.ch:25085;cache=check/cache/schedconfig/{self.schedconfig}.all.json")'
0288 if self.sitename.find("BEIJING") != -1:
0289 x += '(agis_ddmendpoints.json "/cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json")'
0290
0291 if "inFiles" in self.jobdesc and not self.truepilot:
0292 inf = {}
0293 if self.jobdesc.get("eventServiceMerge") == "True":
0294 self.setInputsES(inf)
0295
0296 for filename, scope in zip(self.jobdesc["inFiles"].split(","), self.jobdesc["scopeIn"].split(",")):
0297
0298
0299
0300
0301
0302
0303
0304
0305
0306
0307 urloptions = ";rucioaccount=pilot;transferprotocol=gsiftp;cache=invariant"
0308 ruciourl = f"rucio://rucio-lb-prod.cern.ch{urloptions}/replicas"
0309 lfn = "/".join([ruciourl, scope, filename])
0310
0311 inf[filename] = lfn
0312
0313
0314 for k, v in inf.items():
0315 x += f'({k} "{v}")'
0316
0317 if self.jobdesc.get("eventService") and self.eventranges:
0318
0319 tmpjsonfile = os.path.join(self.tmpdir, "eventranges-%d.json" % self.pandaid)
0320 jsondata = json.loads(self.eventranges)
0321 with open(tmpjsonfile, "w") as f:
0322 json.dump(jsondata, f)
0323 x += f'("eventranges.json" "{tmpjsonfile}")'
0324
0325 self.xrsl["inputfiles"] = f"(inputfiles = {x} )"
0326
0327 def setLog(self):
0328 logfile = self.jobdesc.get("logFile", "LOGFILE")
0329 self.xrsl["log"] = f"(stdout = \"{logfile.replace('.tgz', '')}\")(join = yes)"
0330
0331 def setGMLog(self):
0332 self.xrsl["gmlog"] = '(gmlog = "gmlog")'
0333 self.xrsl["rerun"] = '(rerun = "2")'
0334
0335 def setOutputs(self):
0336
0337
0338 output = '("jobSmallFiles.tgz" "")'
0339 output += '("@output.list" "")'
0340
0341
0342 output += '("output.list" "")'
0343 self.xrsl["outputs"] = f"(outputfiles = {output} )"
0344
0345 if self.truepilot:
0346 self.xrsl["outputs"] = ""
0347
0348 def setPriority(self):
0349 if "currentPriority" in self.jobdesc:
0350 prio = 50
0351 try:
0352 prio = int(self.jobdesc["currentPriority"])
0353 if prio < 1:
0354 prio = 1
0355 if prio > 0 and prio < 1001:
0356 prio = prio * 90 / 1000.0
0357 prio = int(prio)
0358 if prio > 1000 and prio < 10001:
0359 prio = 90 + (prio - 1000) / 900.0
0360 prio = int(prio)
0361 if prio > 10000:
0362 prio = 100
0363 except BaseException:
0364 pass
0365 self.xrsl["priority"] = "(priority = %d )" % prio
0366 if "wuppertalprod" in self.sitename:
0367 self.xrsl["priority"] = ""
0368
0369 def setEnvironment(self):
0370 environment = {}
0371 environment["PANDA_JSID"] = self.schedulerid
0372 if self.logurl:
0373 environment["GTAG"] = self.logurl
0374
0375
0376 if self.truepilot and self.monitorurl:
0377 environment["APFCID"] = self.pandaid
0378 environment["APFFID"] = schedid
0379 environment["APFMON"] = self.monitorurl
0380 environment["FACTORYQUEUE"] = self.sitename
0381
0382 self.xrsl["environment"] = "(environment = %s)" % "".join(['("%s" "%s")' % (k, v) for (k, v) in environment.items()])
0383
0384 def parse(self):
0385 self.setTime()
0386 self.setJobname()
0387
0388 self.setMemory()
0389 self.setRTE()
0390 self.setExecutable()
0391 self.setArguments()
0392 self.setInputs()
0393 self.setLog()
0394 self.setGMLog()
0395 self.setOutputs()
0396 self.setPriority()
0397 self.setEnvironment()
0398
0399 def getXrsl(self):
0400 x = "&"
0401 x += "\n".join([val for val in self.xrsl.values()])
0402 return x