Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 import base64
0002 import json
0003 import re
0004 from urllib.parse import urlencode
0005 
0006 from pandaserver.dataservice import DataServiceUtils
0007 from pandaserver.taskbuffer import EventServiceUtils
0008 
# constants
# token value returned to the pilot when a request times out
TimeOutToken = "TimeOut"

# status codes returned in the "StatusCode" field of a Response
# succeeded
SC_Success = 0
# timeout
SC_TimeOut = 10
# no available jobs
SC_NoJobs = 20
# failed
SC_Failed = 30
# invalid token
SC_Invalid = 50
# invalid role
SC_Role = 60
# permission denied
SC_Perms = 70
# key missing
SC_MissKey = 80
# failure of proxy retrieval
SC_ProxyError = 90
0031 
0032 
# response
class Response:
    """Dispatcher response sent back to a pilot.

    Wraps a flat ``data`` dict holding "StatusCode" plus an optional payload,
    which is serialized either URL-encoded or as JSON by :meth:`encode`.
    """

    # constructor
    def __init__(self, statusCode, errorDialog=None):
        """Create the payload with the mandatory status code.

        :param statusCode: one of the module-level SC_* status codes
        :param errorDialog: optional human-readable error message
        """
        # create data object
        self.data = {"StatusCode": statusCode}
        if errorDialog is not None:
            self.data["errorDialog"] = errorDialog

    # URL encode
    def encode(self, acceptJson=False):
        """Serialize the payload.

        :param acceptJson: when True return a ``{"type": "json", "content": ...}``
                           wrapper; otherwise a URL-encoded string
        """
        if not acceptJson:
            return urlencode(self.data)
        else:
            return {"type": "json", "content": json.dumps(self.data)}

    # append Node
    def appendNode(self, name, value):
        """Set a single key/value pair in the payload."""
        self.data[name] = value

    # append job
    def appendJob(self, job, siteMapperCache=None):
        """Flatten a job spec into the payload dict for the pilot.

        File-level attributes are accumulated into comma-separated strings
        (the pilot-facing wire format). NOTE(review): two accumulation styles
        coexist below — "separator before append" for some fields and
        "append with trailing comma then strip [:-1]" for others; both yield
        the same comma-joined result and are kept as-is.

        :param job: job spec object (project type) whose attributes are copied
        :param siteMapperCache: optional cached site mapper used to resolve
                                nucleus destinations and DDM endpoints
        """
        # event service merge
        if EventServiceUtils.isEventServiceMerge(job):
            isEventServiceMerge = True
        else:
            isEventServiceMerge = False
        # PandaID
        self.data["PandaID"] = job.PandaID
        # prodSourceLabel
        self.data["prodSourceLabel"] = job.prodSourceLabel
        # swRelease
        self.data["swRelease"] = job.AtlasRelease
        # homepackage
        self.data["homepackage"] = job.homepackage
        # transformation
        self.data["transformation"] = job.transformation
        # job name
        self.data["jobName"] = job.jobName
        # job definition ID
        self.data["jobDefinitionID"] = job.jobDefinitionID
        # cloud
        self.data["cloud"] = job.cloud
        # files
        strIFiles = ""
        strOFiles = ""
        strDispatch = ""
        strDisToken = ""
        strDisTokenForOutput = ""
        strDestination = ""
        strRealDataset = ""
        strRealDatasetIn = ""
        strProdDBlock = ""
        strDestToken = ""
        strProdToken = ""
        strProdTokenForOutput = ""
        strGUID = ""
        strFSize = ""
        strCheckSum = ""
        strFileDestinationSE = ""
        strScopeIn = ""
        strScopeOut = ""
        strScopeLog = ""
        logFile = ""
        logGUID = ""
        ddmEndPointIn = []
        ddmEndPointOut = []
        noOutput = []
        siteSpec = None
        inDsLfnMap = {}
        inLFNset = set()
        if siteMapperCache is not None:
            siteMapper = siteMapperCache.get_object()
            siteSpec = siteMapper.getSite(job.computingSite)
            # resolve destSE
            try:
                job.destinationSE = siteMapper.resolveNucleus(job.destinationSE)
                for tmpFile in job.Files:
                    tmpFile.destinationSE = siteMapper.resolveNucleus(tmpFile.destinationSE)
            except Exception:
                pass
            siteMapperCache.release_object()
        for file in job.Files:
            if file.type == "input":
                # skip duplicate input LFNs
                if file.lfn in inLFNset:
                    pass
                else:
                    inLFNset.add(file.lfn)
                    if strIFiles != "":
                        strIFiles += ","
                    strIFiles += file.lfn
                    if strDispatch != "":
                        strDispatch += ","
                    strDispatch += file.dispatchDBlock
                    if strDisToken != "":
                        strDisToken += ","
                    strDisToken += file.dispatchDBlockToken
                    strProdDBlock += f"{file.prodDBlock},"
                    if not isEventServiceMerge:
                        strProdToken += f"{file.prodDBlockToken},"
                    else:
                        # for ES merge the token comes from the job metadata map
                        strProdToken += f"{job.metadata[1][file.lfn]},"
                    if strGUID != "":
                        strGUID += ","
                    strGUID += file.GUID
                    strRealDatasetIn += f"{file.dataset},"
                    strFSize += f"{file.fsize},"
                    # fall back to md5sum when no checksum is set
                    if file.checksum not in ["", "NULL", None]:
                        strCheckSum += f"{file.checksum},"
                    else:
                        strCheckSum += f"{file.md5sum},"
                    strScopeIn += f"{file.scope},"
                    ddmEndPointIn.append(
                        self.getDdmEndpoint(
                            siteSpec,
                            file.dispatchDBlockToken,
                            "input",
                            job.prodSourceLabel,
                            job.job_label,
                        )
                    )
                    if file.dataset not in inDsLfnMap:
                        inDsLfnMap[file.dataset] = []
                    inDsLfnMap[file.dataset].append(file.lfn)
            if file.type == "output" or file.type == "log":
                if strOFiles != "":
                    strOFiles += ","
                strOFiles += file.lfn
                if strDestination != "":
                    strDestination += ","
                strDestination += file.destinationDBlock
                if strRealDataset != "":
                    strRealDataset += ","
                strRealDataset += file.dataset
                strFileDestinationSE += f"{file.destinationSE},"
                if file.type == "log":
                    logFile = file.lfn
                    logGUID = file.GUID
                    strScopeLog = file.scope
                else:
                    strScopeOut += f"{file.scope},"
                if strDestToken != "":
                    strDestToken += ","
                # normalize distributed-destination prefix ddd: -> dst:
                strDestToken += re.sub("^ddd:", "dst:", file.destinationDBlockToken.split(",")[0])
                strDisTokenForOutput += f"{file.dispatchDBlockToken},"
                strProdTokenForOutput += f"{file.prodDBlockToken},"
                ddmEndPointOut.append(
                    self.getDdmEndpoint(
                        siteSpec,
                        file.destinationDBlockToken.split(",")[0],
                        "output",
                        job.prodSourceLabel,
                        job.job_label,
                    )
                )
                if file.isAllowedNoOutput():
                    noOutput.append(file.lfn)
        # inFiles
        self.data["inFiles"] = strIFiles
        # dispatch DBlock
        self.data["dispatchDblock"] = strDispatch
        # dispatch DBlock space token
        self.data["dispatchDBlockToken"] = strDisToken
        # dispatch DBlock space token for output
        self.data["dispatchDBlockTokenForOut"] = strDisTokenForOutput[:-1]
        # outFiles
        self.data["outFiles"] = strOFiles
        # destination DBlock
        self.data["destinationDblock"] = strDestination
        # destination DBlock space token
        self.data["destinationDBlockToken"] = strDestToken
        # prod DBlocks
        self.data["prodDBlocks"] = strProdDBlock[:-1]
        # prod DBlock space token
        self.data["prodDBlockToken"] = strProdToken[:-1]
        # real output datasets
        self.data["realDatasets"] = strRealDataset
        # real input datasets
        self.data["realDatasetsIn"] = strRealDatasetIn[:-1]
        # file's destinationSE
        self.data["fileDestinationSE"] = strFileDestinationSE[:-1]
        # log filename
        self.data["logFile"] = logFile
        # log GUID
        self.data["logGUID"] = logGUID
        # jobPars
        self.data["jobPars"], ppSteps = job.extractMultiStepExec()
        if ppSteps is not None:
            self.data.update(ppSteps)
        if job.to_encode_job_params():
            self.data["jobPars"] = base64.b64encode(self.data["jobPars"].encode()).decode()
        # attempt number
        self.data["attemptNr"] = job.attemptNr
        # GUIDs
        self.data["GUID"] = strGUID
        # checksum
        self.data["checksum"] = strCheckSum[:-1]
        # fsize
        self.data["fsize"] = strFSize[:-1]
        # scope
        self.data["scopeIn"] = strScopeIn[:-1]
        self.data["scopeOut"] = strScopeOut[:-1]
        self.data["scopeLog"] = strScopeLog
        # DDM endpoints
        try:
            self.data["ddmEndPointIn"] = ",".join(ddmEndPointIn)
        except TypeError:
            self.data["ddmEndPointIn"] = ""
        try:
            self.data["ddmEndPointOut"] = ",".join(ddmEndPointOut)
        except TypeError:
            self.data["ddmEndPointOut"] = ""
        # destinationSE
        self.data["destinationSE"] = job.destinationSE
        # user ID
        self.data["prodUserID"] = job.prodUserID
        # CPU count
        self.data["maxCpuCount"] = job.maxCpuCount
        # RAM count
        self.data["minRamCount"] = job.minRamCount
        # disk count
        self.data["maxDiskCount"] = job.maxDiskCount
        # cmtconfig
        if ppSteps is None or job.cmtConfig not in ["NULL", None]:
            self.data["cmtConfig"] = job.cmtConfig
        else:
            self.data["cmtConfig"] = ""
        # processingType
        self.data["processingType"] = job.processingType
        # transferType
        self.data["transferType"] = job.transferType
        # sourceSite
        self.data["sourceSite"] = job.sourceSite
        # current priority
        self.data["currentPriority"] = job.currentPriority
        # taskID: JEDI-managed jobs report the JEDI task ID instead
        if job.lockedby == "jedi":
            self.data["taskID"] = job.jediTaskID
        else:
            self.data["taskID"] = job.taskID
        # core count, defaulting to a single core when unset
        if job.coreCount in ["NULL", None]:
            self.data["coreCount"] = 1
        else:
            self.data["coreCount"] = job.coreCount
        # jobsetID
        self.data["jobsetID"] = job.jobsetID
        # requestID
        self.data["reqID"] = job.reqID
        # nucleus
        self.data["nucleus"] = job.nucleus
        # walltime
        self.data["maxWalltime"] = job.maxWalltime
        # resource type
        self.data["resource_type"] = job.resource_type
        # looping check
        if job.is_no_looping_check():
            self.data["loopingCheck"] = False
        # debug mode
        if job.is_debug_mode():
            self.data["debug"] = "True"
        # event service or job cloning or fine-grained
        if EventServiceUtils.isJobCloningJob(job):
            self.data["cloneJob"] = EventServiceUtils.getJobCloningType(job)
        elif EventServiceUtils.isEventServiceJob(job) or EventServiceUtils.isJumboJob(job):
            self.data["eventService"] = "True"
            # prod DBlock space token for pre-merging output
            self.data["prodDBlockTokenForOutput"] = strProdTokenForOutput[:-1]
        elif EventServiceUtils.is_fine_grained_job(job):
            self.data["eventService"] = "True"
        # event service merge
        if isEventServiceMerge:
            self.data["eventServiceMerge"] = "True"
            # write to file for ES merge: "inputFor_<out>:<in1>,<in2>^..." format
            writeToFileStr = ""
            try:
                for outputName in job.metadata[0]:
                    inputList = job.metadata[0][outputName]
                    writeToFileStr += f"inputFor_{outputName}:"
                    for tmpInput in inputList:
                        writeToFileStr += f"{tmpInput},"
                    writeToFileStr = writeToFileStr[:-1]
                    writeToFileStr += "^"
                writeToFileStr = writeToFileStr[:-1]
            except Exception:
                # best-effort: malformed metadata leaves a partial string
                pass
            self.data["writeToFile"] = writeToFileStr
        elif job.writeInputToFile():
            try:
                # write input to file: "tmpin_<ds>:<in1>,<in2>^..." format
                writeToFileStr = ""
                for inDS in inDsLfnMap:
                    inputList = inDsLfnMap[inDS]
                    inDS = re.sub("/$", "", inDS)
                    inDS = inDS.split(":")[-1]
                    writeToFileStr += f"tmpin_{inDS}:"
                    writeToFileStr += ",".join(inputList)
                    writeToFileStr += "^"
                writeToFileStr = writeToFileStr[:-1]
                self.data["writeToFile"] = writeToFileStr
            except Exception:
                pass
        # replace placeholder in jobPars for jumbo jobs
        if EventServiceUtils.isJumboJob(job) or EventServiceUtils.isCoJumboJob(job):
            try:
                for inDS in inDsLfnMap:
                    inputList = inDsLfnMap[inDS]
                    inDS = re.sub("/$", "", inDS)
                    inDS = inDS.split(":")[-1]
                    srcStr = f"tmpin__cnt_{inDS}"
                    dstStr = ",".join(inputList)
                    self.data["jobPars"] = self.data["jobPars"].replace(srcStr, dstStr)
            except Exception:
                pass
        # no output
        if noOutput != []:
            self.data["allowNoOutput"] = ",".join(noOutput)
        # alternative stage-out
        if job.getAltStgOut() is not None:
            self.data["altStageOut"] = job.getAltStgOut()
        # log to OS
        if job.putLogToOS():
            self.data["putLogToOS"] = "True"
        # suppress execute string conversion
        if job.noExecStrCnv():
            self.data["noExecStrCnv"] = "True"
        # in-file positional event number
        if job.inFilePosEvtNum():
            self.data["inFilePosEvtNum"] = "True"
        # use prefetcher
        if job.usePrefetcher():
            self.data["usePrefetcher"] = "True"
        # image name
        if job.container_name not in ["NULL", None]:
            self.data["container_name"] = job.container_name
        # IO
        self.data["ioIntensity"] = job.get_task_attribute("ioIntensity")
        self.data["ioIntensityUnit"] = job.get_task_attribute("ioIntensityUnit")
        # HPO
        if job.is_hpo_workflow():
            self.data["isHPO"] = "True"
        # VP
        if siteSpec is not None:
            scope_input, scope_output = DataServiceUtils.select_scope(siteSpec, job.prodSourceLabel, job.job_label)
            if siteSpec.use_vp(scope_input):
                self.data["useVP"] = "True"
        # on-site merging
        if job.is_on_site_merging():
            self.data["onSiteMerging"] = "True"

    # set proxy key
    def setProxyKey(self, proxyKey):
        """Copy credname/myproxy from proxyKey into the payload ("" if absent)."""
        names = ["credname", "myproxy"]
        for name in names:
            if name in proxyKey:
                self.data[name] = proxyKey[name]
            else:
                self.data[name] = ""

    # set secret key for panda proxy
    def setPandaProxySecretKey(self, secretKey):
        self.data["pandaProxySecretKey"] = secretKey

    # get ddm endpoint
    def getDdmEndpoint(self, siteSpec, spaceToken, mode, prodSourceLabel, job_label):
        """Resolve the DDM endpoint name for a space token.

        :param siteSpec: site spec object, or None when no site mapper is available
        :param spaceToken: space token string to resolve
        :param mode: "input" or "output"; anything else yields ""
        :param prodSourceLabel: production source label, forwarded to select_scope
        :param job_label: job label, forwarded to select_scope
        :return: endpoint name, or "" when it cannot be resolved

        FIX: the guard now runs BEFORE select_scope — previously a None
        siteSpec (the default in appendJob when siteMapperCache is not given)
        was passed into select_scope and crashed instead of returning "".
        """
        if siteSpec is None or mode not in ["input", "output"]:
            return ""
        scope_input, scope_output = DataServiceUtils.select_scope(siteSpec, prodSourceLabel, job_label)

        if mode == "input":
            connected_endpoints = siteSpec.ddm_endpoints_input.get(scope_input)
        elif mode == "output":
            connected_endpoints = siteSpec.ddm_endpoints_output.get(scope_output)

        # explicit destination embedded in the token wins when associated
        endPoint = DataServiceUtils.getDestinationSE(spaceToken)
        if endPoint and connected_endpoints and connected_endpoints.isAssociated(endPoint):
            return endPoint

        # then a distributed destination, again only when associated
        endPoint = DataServiceUtils.getDistributedDestination(spaceToken)
        if endPoint and connected_endpoints and connected_endpoints.isAssociated(endPoint):
            return endPoint

        # fall back to the site's token -> endpoint map
        if mode == "input":
            setokens = siteSpec.setokens_input.get(scope_input, [])
            ddm = siteSpec.ddm_input.get(scope_input)
        elif mode == "output":
            setokens = siteSpec.setokens_output.get(scope_output, [])
            ddm = siteSpec.ddm_output.get(scope_output)
        if spaceToken in setokens:
            return setokens[spaceToken]

        # Protection against misconfigured sites
        if not ddm:
            ddm = ""

        return ddm
0429 
0430 
# check if secure connection
def isSecure(req):
    """Return True when the request arrived over SSL (client DN present)."""
    return "SSL_CLIENT_S_DN" in req.subprocess_env
0436 
0437 
# get user DN
def getUserDN(req):
    """Return the client's SSL DN, or the string "None" when unavailable.

    Any failure (missing env attribute, missing key) falls back to "None".
    """
    try:
        dn = req.subprocess_env["SSL_CLIENT_S_DN"]
    except Exception:
        dn = "None"
    return dn