# File indexing completed on 2026-04-10 08:39:02
0001 import base64
0002 import json
0003 import re
0004 from urllib.parse import urlencode
0005
0006 from pandaserver.dataservice import DataServiceUtils
0007 from pandaserver.taskbuffer import EventServiceUtils
0008
0009
0010 TimeOutToken = "TimeOut"
0011
0012
0013
0014 SC_Success = 0
0015
0016 SC_TimeOut = 10
0017
0018 SC_NoJobs = 20
0019
0020 SC_Failed = 30
0021
0022 SC_Invalid = 50
0023
0024 SC_Role = 60
0025
0026 SC_Perms = 70
0027
0028 SC_MissKey = 80
0029
0030 SC_ProxyError = 90
0031
0032
0033
class Response:
    """Response payload sent back to the pilot.

    Wraps a flat key/value dictionary (``self.data``) and serializes it
    either as URL-encoded form data or as JSON.
    """

    def __init__(self, statusCode, errorDialog=None):
        """Initialize with a status code and an optional error message.

        :param statusCode: one of the SC_* constants
        :param errorDialog: optional human-readable error text
        """
        self.data = {"StatusCode": statusCode}
        if errorDialog is not None:
            self.data["errorDialog"] = errorDialog

    def encode(self, acceptJson=False):
        """Serialize the payload.

        :param acceptJson: when True return ``{"type": "json", "content": <json str>}``,
                           otherwise return a URL-encoded string
        """
        if not acceptJson:
            return urlencode(self.data)
        else:
            return {"type": "json", "content": json.dumps(self.data)}

    def appendNode(self, name, value):
        """Set an arbitrary key/value pair in the payload."""
        self.data[name] = value

    def appendJob(self, job, siteMapperCache=None):
        """Populate the payload with all fields the pilot needs to run *job*.

        :param job: a JobSpec-like object with Files and task attributes
        :param siteMapperCache: optional cache wrapper yielding a SiteMapper,
                                used to resolve nucleus names and DDM endpoints;
                                when None, siteSpec stays None and endpoint
                                resolution degrades to empty strings
        """
        # Flag event-service merge jobs up front; it changes how the
        # per-input prodDBlockToken and writeToFile fields are filled below.
        if EventServiceUtils.isEventServiceMerge(job):
            isEventServiceMerge = True
        else:
            isEventServiceMerge = False

        self.data["PandaID"] = job.PandaID

        self.data["prodSourceLabel"] = job.prodSourceLabel

        self.data["swRelease"] = job.AtlasRelease

        self.data["homepackage"] = job.homepackage

        self.data["transformation"] = job.transformation

        self.data["jobName"] = job.jobName

        self.data["jobDefinitionID"] = job.jobDefinitionID

        self.data["cloud"] = job.cloud

        # Accumulators for the per-file, comma-separated payload fields.
        # Some are built with a leading separator check ("a,b"), others with a
        # trailing comma that is stripped with [:-1] at assignment time.
        strIFiles = ""
        strOFiles = ""
        strDispatch = ""
        strDisToken = ""
        strDisTokenForOutput = ""
        strDestination = ""
        strRealDataset = ""
        strRealDatasetIn = ""
        strProdDBlock = ""
        strDestToken = ""
        strProdToken = ""
        strProdTokenForOutput = ""
        strGUID = ""
        strFSize = ""
        strCheckSum = ""
        strFileDestinationSE = ""
        strScopeIn = ""
        strScopeOut = ""
        strScopeLog = ""
        logFile = ""
        logGUID = ""
        ddmEndPointIn = []
        ddmEndPointOut = []
        noOutput = []
        siteSpec = None
        inDsLfnMap = {}
        inLFNset = set()
        if siteMapperCache is not None:
            siteMapper = siteMapperCache.get_object()
            siteSpec = siteMapper.getSite(job.computingSite)
            # Best-effort nucleus resolution for the job and its files;
            # failures are deliberately ignored.
            try:
                job.destinationSE = siteMapper.resolveNucleus(job.destinationSE)
                for tmpFile in job.Files:
                    tmpFile.destinationSE = siteMapper.resolveNucleus(tmpFile.destinationSE)
            except Exception:
                pass
            siteMapperCache.release_object()
        for file in job.Files:
            if file.type == "input":
                # skip duplicate LFNs; each input is listed once
                if file.lfn in inLFNset:
                    pass
                else:
                    inLFNset.add(file.lfn)
                    if strIFiles != "":
                        strIFiles += ","
                    strIFiles += file.lfn
                    if strDispatch != "":
                        strDispatch += ","
                    strDispatch += file.dispatchDBlock
                    if strDisToken != "":
                        strDisToken += ","
                    strDisToken += file.dispatchDBlockToken
                    strProdDBlock += f"{file.prodDBlock},"
                    if not isEventServiceMerge:
                        strProdToken += f"{file.prodDBlockToken},"
                    else:
                        # for ES merge jobs the token comes from job metadata
                        strProdToken += f"{job.metadata[1][file.lfn]},"
                    if strGUID != "":
                        strGUID += ","
                    strGUID += file.GUID
                    strRealDatasetIn += f"{file.dataset},"
                    strFSize += f"{file.fsize},"
                    # prefer checksum; fall back to md5sum when unset
                    if file.checksum not in ["", "NULL", None]:
                        strCheckSum += f"{file.checksum},"
                    else:
                        strCheckSum += f"{file.md5sum},"
                    strScopeIn += f"{file.scope},"
                    ddmEndPointIn.append(
                        self.getDdmEndpoint(
                            siteSpec,
                            file.dispatchDBlockToken,
                            "input",
                            job.prodSourceLabel,
                            job.job_label,
                        )
                    )
                    if file.dataset not in inDsLfnMap:
                        inDsLfnMap[file.dataset] = []
                    inDsLfnMap[file.dataset].append(file.lfn)
            if file.type == "output" or file.type == "log":
                if strOFiles != "":
                    strOFiles += ","
                strOFiles += file.lfn
                if strDestination != "":
                    strDestination += ","
                strDestination += file.destinationDBlock
                if strRealDataset != "":
                    strRealDataset += ","
                strRealDataset += file.dataset
                strFileDestinationSE += f"{file.destinationSE},"
                if file.type == "log":
                    logFile = file.lfn
                    logGUID = file.GUID
                    strScopeLog = file.scope
                else:
                    strScopeOut += f"{file.scope},"
                if strDestToken != "":
                    strDestToken += ","
                # only the first token is used; "ddd:" is normalized to "dst:"
                strDestToken += re.sub("^ddd:", "dst:", file.destinationDBlockToken.split(",")[0])
                strDisTokenForOutput += f"{file.dispatchDBlockToken},"
                strProdTokenForOutput += f"{file.prodDBlockToken},"
                ddmEndPointOut.append(
                    self.getDdmEndpoint(
                        siteSpec,
                        file.destinationDBlockToken.split(",")[0],
                        "output",
                        job.prodSourceLabel,
                        job.job_label,
                    )
                )
                if file.isAllowedNoOutput():
                    noOutput.append(file.lfn)

        self.data["inFiles"] = strIFiles

        self.data["dispatchDblock"] = strDispatch

        self.data["dispatchDBlockToken"] = strDisToken

        self.data["dispatchDBlockTokenForOut"] = strDisTokenForOutput[:-1]

        self.data["outFiles"] = strOFiles

        self.data["destinationDblock"] = strDestination

        self.data["destinationDBlockToken"] = strDestToken

        self.data["prodDBlocks"] = strProdDBlock[:-1]

        self.data["prodDBlockToken"] = strProdToken[:-1]

        self.data["realDatasets"] = strRealDataset

        self.data["realDatasetsIn"] = strRealDatasetIn[:-1]

        self.data["fileDestinationSE"] = strFileDestinationSE[:-1]

        self.data["logFile"] = logFile

        self.data["logGUID"] = logGUID

        # job parameters; multi-step execution may add extra keys (ppSteps)
        self.data["jobPars"], ppSteps = job.extractMultiStepExec()
        if ppSteps is not None:
            self.data.update(ppSteps)
        if job.to_encode_job_params():
            self.data["jobPars"] = base64.b64encode(self.data["jobPars"].encode()).decode()

        self.data["attemptNr"] = job.attemptNr

        self.data["GUID"] = strGUID

        self.data["checksum"] = strCheckSum[:-1]

        self.data["fsize"] = strFSize[:-1]

        self.data["scopeIn"] = strScopeIn[:-1]
        self.data["scopeOut"] = strScopeOut[:-1]
        self.data["scopeLog"] = strScopeLog

        # joins fail with TypeError if an endpoint ended up as None; fall back
        # to an empty string in that case
        try:
            self.data["ddmEndPointIn"] = ",".join(ddmEndPointIn)
        except TypeError:
            self.data["ddmEndPointIn"] = ""
        try:
            self.data["ddmEndPointOut"] = ",".join(ddmEndPointOut)
        except TypeError:
            self.data["ddmEndPointOut"] = ""

        self.data["destinationSE"] = job.destinationSE

        self.data["prodUserID"] = job.prodUserID

        self.data["maxCpuCount"] = job.maxCpuCount

        self.data["minRamCount"] = job.minRamCount

        self.data["maxDiskCount"] = job.maxDiskCount

        # cmtConfig is suppressed for multi-step jobs that don't define it
        if ppSteps is None or job.cmtConfig not in ["NULL", None]:
            self.data["cmtConfig"] = job.cmtConfig
        else:
            self.data["cmtConfig"] = ""

        self.data["processingType"] = job.processingType

        self.data["transferType"] = job.transferType

        self.data["sourceSite"] = job.sourceSite

        self.data["currentPriority"] = job.currentPriority

        # JEDI-managed jobs report the JEDI task ID
        if job.lockedby == "jedi":
            self.data["taskID"] = job.jediTaskID
        else:
            self.data["taskID"] = job.taskID

        # default to a single core when unset
        if job.coreCount in ["NULL", None]:
            self.data["coreCount"] = 1
        else:
            self.data["coreCount"] = job.coreCount

        self.data["jobsetID"] = job.jobsetID

        self.data["reqID"] = job.reqID

        self.data["nucleus"] = job.nucleus

        self.data["maxWalltime"] = job.maxWalltime

        self.data["resource_type"] = job.resource_type

        if job.is_no_looping_check():
            self.data["loopingCheck"] = False

        if job.is_debug_mode():
            self.data["debug"] = "True"

        # event-service flavors: job cloning takes precedence, then
        # (jumbo) event service, then fine-grained processing
        if EventServiceUtils.isJobCloningJob(job):
            self.data["cloneJob"] = EventServiceUtils.getJobCloningType(job)
        elif EventServiceUtils.isEventServiceJob(job) or EventServiceUtils.isJumboJob(job):
            self.data["eventService"] = "True"

            self.data["prodDBlockTokenForOutput"] = strProdTokenForOutput[:-1]
        elif EventServiceUtils.is_fine_grained_job(job):
            self.data["eventService"] = "True"

        if isEventServiceMerge:
            self.data["eventServiceMerge"] = "True"

            # map each output to its inputs: "inputFor_<out>:<in1>,<in2>^..."
            writeToFileStr = ""
            try:
                for outputName in job.metadata[0]:
                    inputList = job.metadata[0][outputName]
                    writeToFileStr += f"inputFor_{outputName}:"
                    for tmpInput in inputList:
                        writeToFileStr += f"{tmpInput},"
                    writeToFileStr = writeToFileStr[:-1]
                    writeToFileStr += "^"
                writeToFileStr = writeToFileStr[:-1]
            except Exception:
                pass
            self.data["writeToFile"] = writeToFileStr
        elif job.writeInputToFile():
            try:
                # map each input dataset to its LFNs: "tmpin_<ds>:<lfn1>,...^..."
                writeToFileStr = ""
                for inDS in inDsLfnMap:
                    inputList = inDsLfnMap[inDS]
                    inDS = re.sub("/$", "", inDS)
                    inDS = inDS.split(":")[-1]
                    writeToFileStr += f"tmpin_{inDS}:"
                    writeToFileStr += ",".join(inputList)
                    writeToFileStr += "^"
                writeToFileStr = writeToFileStr[:-1]
                self.data["writeToFile"] = writeToFileStr
            except Exception:
                pass

        # jumbo/co-jumbo jobs carry dataset placeholders in jobPars that are
        # expanded here to the concrete LFN lists
        if EventServiceUtils.isJumboJob(job) or EventServiceUtils.isCoJumboJob(job):
            try:
                for inDS in inDsLfnMap:
                    inputList = inDsLfnMap[inDS]
                    inDS = re.sub("/$", "", inDS)
                    inDS = inDS.split(":")[-1]
                    srcStr = f"tmpin__cnt_{inDS}"
                    dstStr = ",".join(inputList)
                    self.data["jobPars"] = self.data["jobPars"].replace(srcStr, dstStr)
            except Exception:
                pass

        if noOutput != []:
            self.data["allowNoOutput"] = ",".join(noOutput)

        if job.getAltStgOut() is not None:
            self.data["altStageOut"] = job.getAltStgOut()

        if job.putLogToOS():
            self.data["putLogToOS"] = "True"

        if job.noExecStrCnv():
            self.data["noExecStrCnv"] = "True"

        if job.inFilePosEvtNum():
            self.data["inFilePosEvtNum"] = "True"

        if job.usePrefetcher():
            self.data["usePrefetcher"] = "True"

        if job.container_name not in ["NULL", None]:
            self.data["container_name"] = job.container_name

        self.data["ioIntensity"] = job.get_task_attribute("ioIntensity")
        self.data["ioIntensityUnit"] = job.get_task_attribute("ioIntensityUnit")

        if job.is_hpo_workflow():
            self.data["isHPO"] = "True"

        if siteSpec is not None:
            scope_input, scope_output = DataServiceUtils.select_scope(siteSpec, job.prodSourceLabel, job.job_label)
            if siteSpec.use_vp(scope_input):
                self.data["useVP"] = "True"

        if job.is_on_site_merging():
            self.data["onSiteMerging"] = "True"

    def setProxyKey(self, proxyKey):
        """Copy proxy credential fields into the payload (empty when missing)."""
        names = ["credname", "myproxy"]
        for name in names:
            if name in proxyKey:
                self.data[name] = proxyKey[name]
            else:
                self.data[name] = ""

    def setPandaProxySecretKey(self, secretKey):
        """Set the PanDA proxy secret key in the payload."""
        self.data["pandaProxySecretKey"] = secretKey

    def getDdmEndpoint(self, siteSpec, spaceToken, mode, prodSourceLabel, job_label):
        """Resolve the DDM endpoint for a space token at a site.

        :param siteSpec: site specification or None
        :param spaceToken: space token string to resolve
        :param mode: "input" or "output"
        :return: endpoint name, or "" when siteSpec is None / mode is invalid
        """
        # BUGFIX: guard must run before select_scope — the previous order
        # raised on siteSpec=None (which appendJob passes whenever no
        # siteMapperCache is supplied) instead of returning "".
        if siteSpec is None or mode not in ["input", "output"]:
            return ""
        scope_input, scope_output = DataServiceUtils.select_scope(siteSpec, prodSourceLabel, job_label)

        if mode == "input":
            connected_endpoints = siteSpec.ddm_endpoints_input.get(scope_input)
        elif mode == "output":
            connected_endpoints = siteSpec.ddm_endpoints_output.get(scope_output)

        # explicit destination encoded in the space token
        endPoint = DataServiceUtils.getDestinationSE(spaceToken)
        if endPoint and connected_endpoints and connected_endpoints.isAssociated(endPoint):
            return endPoint

        # distributed-destination form of the space token
        endPoint = DataServiceUtils.getDistributedDestination(spaceToken)
        if endPoint and connected_endpoints and connected_endpoints.isAssociated(endPoint):
            return endPoint

        if mode == "input":
            setokens = siteSpec.setokens_input.get(scope_input, [])
            ddm = siteSpec.ddm_input.get(scope_input)
        elif mode == "output":
            setokens = siteSpec.setokens_output.get(scope_output, [])
            ddm = siteSpec.ddm_output.get(scope_output)
        # direct space-token mapping at the site
        if spaceToken in setokens:
            return setokens[spaceToken]

        # fall back to the site's default DDM endpoint
        if not ddm:
            ddm = ""

        return ddm
0429
0430
0431
def isSecure(req):
    """Return True when the request carries an SSL client DN."""
    return "SSL_CLIENT_S_DN" in req.subprocess_env
0436
0437
0438
def getUserDN(req):
    """Return the client's DN, or the literal string "None" when unavailable."""
    dn = "None"
    try:
        dn = req.subprocess_env["SSL_CLIENT_S_DN"]
    except Exception:
        # missing attribute or key -> keep the "None" marker
        pass
    return dn