# File indexing completed on 2026-04-10 08:39:07
import copy
import hashlib
import json
import os
import re
import ssl
import sys
import uuid
from http.client import HTTPSConnection
from urllib.parse import urlencode

from pandacommon.pandautils.thread_utils import GenericThread
from pandaserver.brokerage.SiteMapper import SiteMapper
from pandaserver.config import panda_config
from pandaserver.dataservice import DataServiceUtils
from pandaserver.dataservice.DataServiceUtils import select_scope
from pandaserver.taskbuffer.TaskBuffer import taskBuffer
from pandaserver.userinterface.Client import baseURLSSL
0017
0018
# Debug harness: fake a "finished" status report for one PanDA job.
# Initialise a task-buffer connection (a single DB connection is enough for
# this one-shot script) and a site mapper on top of it.
requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
taskBuffer.init(panda_config.dbhost, panda_config.dbpasswd, nDBConnection=1, requester=requester_id)

siteMapper = SiteMapper(taskBuffer)

# PandaID of the job to update, taken from the command line.
# NOTE(review): `id` shadows the builtin and stays a string here — peekJobs
# appears to accept it, but confirm whether an int conversion is expected.
id = sys.argv[1]
job = taskBuffer.peekJobs([id])[0]

# peekJobs yields None for an unknown PandaID — nothing to do in that case.
if job is None:
    print("got None")
    sys.exit(0)
0030
# Start of the PoolFileCatalog payload that accompanies the job update.
xml = """<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<!-- ATLAS file meta-data catalog -->
<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">
<POOLFILECATALOG>
"""

# Attempt number: optional second CLI argument, otherwise taken from the job.
att = sys.argv[2] if len(sys.argv) > 2 else job.attemptNr

# Without a computing site we cannot resolve any storage endpoints — bail out.
if job.computingSite in ("", None, "NULL"):
    print("computingSite is not yet defined")
    sys.exit(0)

# Site description and the input/output Rucio scopes for this job's label.
siteSpec = siteMapper.getSite(job.computingSite)
scope_input, scope_output = select_scope(siteSpec, job.prodSourceLabel, job.job_label)
0047
# Load the CRIC dump mapping DDM endpoint name -> endpoint description
# (protocol prefixes etc.) from the CVMFS-distributed JSON file.
with open("/cvmfs/atlas.cern.ch/repo/sw/local/etc/cric_ddmendpoints.json") as f:
    rseDict = json.load(f)

# FIX: dropped the dead module-level `hash = hashlib.md5()` — it shadowed the
# builtin and was unconditionally re-created per file inside the loop below.

iOut = 0  # count of reported output files (informational only)
outFileName = []  # lfns of outputs to advertise as "alt" in jobMetrics
fileDict = {}  # lfn -> {guid, fsize, adler32, surl} sent as the JSON payload
# Build one PFC <File> entry plus one fileDict record per output/log file.
# FIX: removed the permanently-disabled `if False:` debug branches — one of
# them called `copy.copy` (latent NameError: `copy` was never imported) and
# the other only ever appended to outFileName, which therefore stays empty.
for tmpFile in job.Files:
    if tmpFile.type not in ["output", "log"]:
        continue
    # Fabricate a fresh GUID for the pretend-produced file.
    tmpFile.GUID = str(uuid.uuid4())

    # Resolve the DDM endpoint the file is supposed to land on.
    distributedDDM = DataServiceUtils.getDistributedDestination(tmpFile.destinationDBlockToken)
    if distributedDDM is not None:
        # Distributed destination encoded directly in the block token.
        tmpSrcDDM = distributedDDM
    elif job.computingSite == tmpFile.destinationSE and tmpFile.destinationDBlockToken in siteSpec.setokens_output[scope_output]:
        # Local destination: translate the space token at the computing site.
        tmpSrcDDM = siteSpec.setokens_output[scope_output][tmpFile.destinationDBlockToken]
    elif tmpFile.lfn in outFileName:
        tmpSrcDDM = DataServiceUtils.getDestinationSE(tmpFile.destinationDBlockToken)
        if tmpSrcDDM is None:
            tmpSrcSite = siteMapper.getSite(tmpFile.destinationSE)
            # FIX: derive the scope from the destination site that was just
            # looked up — the original passed siteSpec (the computing site).
            tmp_scope_input, tmp_scope_output = select_scope(tmpSrcSite, job.prodSourceLabel, job.job_label)
            tmpSrcDDM = tmpSrcSite.ddm_output[tmp_scope_output]
    else:
        # Fall back to the computing site's default output endpoint.
        tmpSrcSite = siteMapper.getSite(job.computingSite)
        tmp_scope_input, tmp_scope_output = select_scope(tmpSrcSite, job.prodSourceLabel, job.job_label)
        tmpSrcDDM = tmpSrcSite.ddm_output[tmp_scope_output]

    # First write-protocol of the endpoint; strip any "token:...:" prefix and
    # a trailing slash, then append the endpoint's base path.
    srm, dummy, root = rseDict[tmpSrcDDM]["aprotocols"]["w"][0]
    srm = re.sub("^token:[^:]+:", "", srm)
    srm += root
    srm = re.sub("/$", "", srm)

    # Rucio-style deterministic path: scope dirs + md5("scope:lfn") prefix.
    # (Renamed from `hash`, which shadowed the builtin.)
    digest = hashlib.md5(f"{tmpFile.scope}:{tmpFile.lfn}".encode())
    hash_hex = digest.hexdigest()
    correctedscope = "/".join(tmpFile.scope.split("."))
    path = f"{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{tmpFile.lfn}"

    # Log files additionally advertise their endpoints in the PFC entry.
    strDDM = ""
    if tmpFile.type == "log":
        strDDM += f"<endpoint>{tmpSrcDDM}</endpoint>"
        strDDM += "<endpoint>CERN-PROD_LOGS</endpoint>"
    xml += f"""
<File ID="{tmpFile.GUID}">
<logical>
<lfn name="{tmpFile.lfn}"/>
</logical>
{strDDM}
<metadata att_name="surl" att_value="{srm}/{path}"/>
<metadata att_name="fsize" att_value="1273400000"/>
<metadata att_name="adler32" att_value="0d2a9dc9"/>
</File>"""
    fileDict[tmpFile.lfn] = {
        "guid": tmpFile.GUID,
        "fsize": 1234,
        "adler32": "0d2a9dc9",
        "surl": f"{srm}/{path}",
    }
    iOut += 1
0118
# Close the PoolFileCatalog document.
xml += """
</POOLFILECATALOG>
"""

# NOTE(review): the XML assembled above is immediately discarded — the server
# is actually sent the JSON rendering of fileDict.  This looks like a manual
# debug switch; comment out the next line to exercise the XML path instead.
xml = json.dumps(fileDict)
0124
# NOTE(review): dead store — this XML-style metadata payload is immediately
# overwritten by the JSON job-report template below.  Kept as the alternate
# payload this debug script can be switched to.
meta = """<?xml version="1.0" encoding="UTF-8" standalone="no" ?> <!-- ATLAS file meta-data catalog --> <!DOCTYPE POOLFILECATALOG SYSTEM "InMemory"> <POOLFILECATALOG> <META type="string" name="size" value="82484969"/> <META type="string" name="conditionsTag" value="COMCOND-BLKPA-006-11"/> <META type="string" name="beamType" value="collisions"/> <META type="string" name="fileType" value="aod"/> <META type="string" name="autoConfiguration" value="['everything']"/> <META type="string" name="dataset" value=""/> <META type="string" name="maxEvents" value="200"/> <META type="string" name="AMITag" value="r5475"/> <META type="string" name="postInclude" value="['EventOverlayJobTransforms/Rt_override_BLKPA-006-11.py', 'EventOverlayJobTransforms/muAlign_reco.py']"/> <META type="string" name="preExec" value="['from CaloRec.CaloCellFlags import jobproperties;jobproperties.CaloCellFlags.doLArHVCorr.set_Value_and_Lock(False);muonRecFlags.writeSDOs=True', 'TriggerFlags.AODEDMSet=AODSLIM;rec.Commissioning.set_Value_and_Lock(True);jobproperties.Beam.numberOfCollisions.set_Value_and_Lock(20.0)']"/> <META type="string" name="triggerConfig" value="MCRECO:DBF:TRIGGERDBMC:325,142,266"/> <META type="string" name="preInclude" value="['EventOverlayJobTransforms/UseOracle.py', 'EventOverlayJobTransforms/custom.py', 'EventOverlayJobTransforms/recotrfpre.py']"/> <META type="string" name="geometryVersion" value="ATLAS-GEO-20-00-01"/> <META type="string" name="events"/> <META type="string" name="postExec" value="['from IOVDbSvc.CondDB import conddb"/> """

# Truncated (deliberately open-ended) pilot job-report JSON; the per-file
# "subFiles" records and the closing braces are appended by the loop below.
meta = """{ "argValues": { "AMITag": "p1815", "athenaopts": [ "--preloadlib=/cvmfs/atlas.cern.ch/repo/sw/software/x86_64-slc6-gcc47-opt/19.1.4/sw/IntelSoftware/linux/x86_64/xe2013/composer_xe_2013.3.163/compiler/lib/intel64/libintlc.so.5:/cvmfs/atlas.cern.ch/repo/sw/software/x86_64-slc6-gcc47-opt/19.1.4/sw/IntelSoftware/linux/x86_64/xe2013/composer_xe_2013.3.163/compiler/lib/intel64/libimf.so" ], "digiSeedOffset1": 8, "digiSeedOffset2": 8, "inputAODFile": [ "AOD.01598144._000008.pool.root.3" ], "jobNumber": 108, "reductionConf": [ "SUSY2" ], "runNumber": 167775 }, "cmdLine": "'/cvmfs/atlas.cern.ch/repo/sw/software/x86_64-slc6-gcc47-opt/19.1.4/AtlasDerivation/19.1.4.4/InstallArea/share/bin/Reco_tf.py' '--inputAODFile=AOD.01598144._000008.pool.root.3' '--AMITag' 'p1815' '--digiSeedOffset1' '8' '--digiSeedOffset2' '8' '--jobNumber' '108' '--outputDAODFile' '04553369._000108.pool.root.1.panda.um' '--reductionConf' 'SUSY2' '--runNumber' '167775'", "created": "2015-01-10T02:51:03", "executor": [ { "asetup": null, "errMsg": "", "exeConfig": { "inputs": [ "AOD" ], "outputs": [ "DAOD_SUSY2" ], "script": "athena.py", "substep": "a2da" }, "logfileReport": { "countSummary": { "CRITICAL": 0, "DEBUG": 1, "ERROR": 0, "FATAL": 0, "IGNORED": 0, "INFO": 6166, "UNKNOWN": 2114, "VERBOSE": 1, "WARNING": 95 }, "details": {} }, "metaData": {}, "name": "AODtoDAOD", "rc": 0, "resource": { "cpuTime": 2478, "wallTime": 2649 }, "statusOK": true, "validation": true } ], "exitAcronym": "OK", "exitCode": 0, "exitMsg": "OK", "exitMsgExtra": "", "files": { "input": [ { "dataset": null, "nentries": 5000, "subFiles": [ { "file_guid": "727E5251-EE36-1F44-8EFB-C8FBA7D9BE52", "name": "AOD.01598144._000008.pool.root.3" } ], "type": "AOD" } ], "output": [ { "argName": "SUSY2", "dataset": null, "subFiles": [ """
0128
# One "subFiles" record per output file; each record carries a trailing
# " ," separator which is trimmed after the loop.
subfile_template = """{ "AODFixVersion": "", "beam_energy": [ 6500000.0 ], "beam_type": [ "collisions" ], "conditions_tag": "OFLCOND-RUN12-SDR-14", "file_guid": "%s", "file_size": 643794695, "file_type": "pool", "geometry": "ATLAS-R2-2015-01-01-00", "integrity": true, "lumi_block": [ 15 ], "name": "%s", "run_number": [ 222222 ] } ,"""
for outputFile in job.Files:
    if outputFile.type == "output":
        meta += subfile_template % (outputFile.GUID, outputFile.lfn)
# Drop the comma left dangling after the last record.
meta = meta[:-1]
# Close the subFiles array and the rest of the job-report document.
meta += """], "type": "aod" } ] }, "name": "Reco_tf", "reportVersion": "1.0.0", "resource": { "cpuUnit": "seconds", "externalsCpuTime": 24, "memUnit": "kB", "transformCpuTime": 50, "wallTime": 2782 } }"""
0138
# Payload for the updateJob call: a faked "finished" attempt with hand-picked
# error codes and resource-usage figures.
node = {
    "jobId": id,
    "state": "finished",
    # the script originally assigned 2 first and then overwrote it — 1099 wins
    "pilotErrorCode": 1099,
    "corruptedFiles": "4003029-1800232404-8696089-2-2.zip",
    "siteName": "BNL_ATLAS_test",
    "attemptNr": att,
    "jobMetrics": f"aaaaa=2 bbbb=3 alt:{','.join(outFileName)}",
    "cpuConsumptionTime": 12340,
    "maxRSS": 1,
    "maxVMEM": 2,
    "maxSWAP": 3,
    "maxPSS": 5 * 1024 * 1024,
    "avgRSS": 11,
    "avgVMEM": 12,
    "avgSWAP": 13,
    "avgPSS": 14,
    "rateWCHAR": 1400,
    "xml": xml,
}
url = f"{baseURLSSL}/updateJob"

# Split the URL into host and path for the raw HTTPS request.
# FIX: guard against a malformed base URL instead of crashing on `None.group`.
match = re.search("[^:/]+://([^/]+)(/.+)", url)
if match is None:
    print(f"invalid URL: {url}")
    sys.exit(1)
host = match.group(1)
path = match.group(2)

# Grid proxy: honour X509_USER_PROXY, else fall back to the conventional
# per-uid location under /tmp.
if "X509_USER_PROXY" in os.environ:
    certKey = os.environ["X509_USER_PROXY"]
else:
    certKey = f"/tmp/x509up_u{os.getuid()}"

rdata = urlencode(node)

# FIX: the key_file/cert_file arguments of HTTPSConnection are deprecated;
# build an SSL context carrying the proxy explicitly, and make sure the
# connection is closed even if the request fails.
context = ssl.create_default_context()
context.load_cert_chain(certfile=certKey, keyfile=certKey)
conn = HTTPSConnection(host, context=context)
try:
    conn.request("POST", path, rdata)
    resp = conn.getresponse()
    data = resp.read()
finally:
    conn.close()

print(data)