File indexing completed on 2026-04-20 07:59:00
0001 import hashlib
0002 import sys
0003
0004 import requests
0005
0006
0007 import requests.packages.urllib3
0008
try:
    # Best-effort call: ask the urllib3 copy bundled with requests to stop
    # emitting its warnings. Any failure here is deliberately ignored so the
    # module can still be imported.
    requests.packages.urllib3.disable_warnings()
except Exception:
    # Catch Exception rather than BaseException so that KeyboardInterrupt
    # and SystemExit raised during import are not silently swallowed.
    pass
0013 from pandaharvester.harvesterconfig import harvester_config
0014 from pandaharvester.harvestercore import core_utils
0015
0016 from .base_stager import BaseStager
0017
0018
# Module-level logger for this plugin; every FtsStager method passes it to
# self.make_logger() to obtain a per-call logger tagged with the PandaID.
baseLogger = core_utils.setup_logger("fts_stager")
0020
0021
0022
class FtsStager(BaseStager):
    """Stager plugin that ships job output files through an FTS REST server.

    The methods read the following instance attributes, none of which are
    assigned in this class (presumably they are injected through the keyword
    arguments handled by ``BaseStager`` -- confirm against the plugin
    configuration):

    ``ftsServer``, ``ftsLookupTimeout``, ``ca_cert``, ``srcEndpointOut``,
    ``dstEndpointOut``, ``srcEndpointLog``, ``dstEndpointLog``,
    ``srcEndpointES``, ``dstEndpointES`` and ``esObjStoreID``.
    """

    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``BaseStager.__init__``."""
        BaseStager.__init__(self, **kwarg)

    def check_stage_out_status(self, jobspec):
        """Poll the FTS server for the transfers attached to a job's outputs.

        Each distinct ``transferID`` found in the output files'
        ``fileAttributes`` is looked up at most once per call. A file whose
        transfer is ``DONE`` gets status ``"finished"``; one whose transfer
        is ``FAILED`` or ``CANCELED`` gets status ``"failed"``; any other
        state leaves the file status untouched.

        Args:
            jobspec: job specification providing ``PandaID`` and ``outFiles``.

        Returns:
            ``(True, "")`` when every lookup succeeded, otherwise
            ``(False, message)`` where ``message`` describes the first
            lookup failure encountered.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")

        allChecked = True
        oneErrMsg = None
        # Cache of transferID -> job state (or None when the lookup failed).
        transferStatus = {}
        for fileSpec in jobspec.outFiles:
            transferID = fileSpec.fileAttributes["transferID"]
            if transferID not in transferStatus:
                errMsg = None
                try:
                    url = f"{self.ftsServer}/jobs/{transferID}"
                    res = requests.get(
                        url,
                        timeout=self.ftsLookupTimeout,
                        verify=self.ca_cert,
                        cert=(harvester_config.pandacon.cert_file, harvester_config.pandacon.key_file),
                    )
                    if res.status_code == 200:
                        transferData = res.json()
                        transferStatus[transferID] = transferData["job_state"]
                        tmpLog.debug(f"got {transferStatus[transferID]} for {transferID}")
                    else:
                        errMsg = f"StatusCode={res.status_code} {res.text}"
                except Exception as exc:
                    # Exception (not BaseException) so KeyboardInterrupt and
                    # SystemExit are not turned into a lookup failure.
                    errMsg = f"{type(exc).__name__} {exc}"

                if errMsg is not None:
                    allChecked = False
                    tmpLog.error(f"failed to get status for {transferID} with {errMsg}")
                    # Record a placeholder so the same transfer is not
                    # queried again for the remaining files in this call.
                    transferStatus[transferID] = None
                    # Only the first error message is reported to the caller.
                    if oneErrMsg is None:
                        oneErrMsg = errMsg

            # Translate the transfer state into a file status.
            state = transferStatus[transferID]
            if state == "DONE":
                fileSpec.status = "finished"
            elif state in ("FAILED", "CANCELED"):
                fileSpec.status = "failed"

        if allChecked:
            return True, ""
        return False, oneErrMsg

    def trigger_stage_out(self, jobspec):
        """Submit one FTS job covering the transferable output files of a job.

        Files with a ``zipFileID`` are skipped, as are files whose
        ``fileType`` is not ``es_output``, ``output`` or ``log``, and log
        files when ``srcEndpointLog`` is ``None``. For ``output`` and ``log``
        files the destination path is built from the scope (dots replaced by
        slashes), the first two byte pairs of ``md5("<scope>:<lfn>")`` in
        hex, and the LFN. ``es_output`` files reuse ``fileSpec.path`` on both
        ends and get ``objstoreID`` set to ``esObjStoreID``.

        Args:
            jobspec: job specification providing ``PandaID``, ``outFiles``
                and ``get_output_file_attributes()``.

        Returns:
            ``(True, "")`` on successful submission or when there was nothing
            to submit, otherwise ``(False, message)`` describing the failure.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        tmpRetVal = (True, "")

        # Collect one source/destination pair per transferable file.
        files = []
        fileAttrs = jobspec.get_output_file_attributes()
        for fileSpec in jobspec.outFiles:
            # Skip files that have been zipped into another file.
            if fileSpec.zipFileID is not None:
                continue

            if fileSpec.fileType == "es_output":
                srcURL = self.srcEndpointES + fileSpec.path
                dstURL = self.dstEndpointES + fileSpec.path
                fileSpec.objstoreID = self.esObjStoreID
            else:
                scope = fileAttrs[fileSpec.lfn]["scope"]
                # hashlib only accepts bytes on Python 3, so the
                # "<scope>:<lfn>" string must be encoded before hashing.
                hash_hex = hashlib.md5(f"{scope}:{fileSpec.lfn}".encode("utf-8")).hexdigest()
                correctedscope = "/".join(scope.split("."))
                if fileSpec.fileType == "output":
                    srcURL = self.srcEndpointOut + fileSpec.path
                    dstURL = f"{self.dstEndpointOut}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                elif fileSpec.fileType == "log":
                    # No log transfer when no log endpoint is configured.
                    if self.srcEndpointLog is None:
                        continue
                    srcURL = self.srcEndpointLog + fileSpec.path
                    dstURL = f"{self.dstEndpointLog}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                else:
                    continue

            tmpLog.debug(f"src={srcURL} dst={dstURL}")
            files.append(
                {
                    "sources": [srcURL],
                    "destinations": [dstURL],
                }
            )

        if files:
            errMsg = None
            # Built outside the try block because the error log below reads it.
            url = f"{self.ftsServer}/jobs"
            try:
                res = requests.post(
                    url,
                    json={"Files": files},
                    timeout=self.ftsLookupTimeout,
                    verify=self.ca_cert,
                    cert=(harvester_config.pandacon.cert_file, harvester_config.pandacon.key_file),
                )
                if res.status_code == 200:
                    transferData = res.json()
                    transferID = transferData["job_id"]
                    tmpLog.debug(f"successfully submitted id={transferID}")
                    # The ID is stored on every output file of the job,
                    # including those skipped above, because
                    # check_stage_out_status reads "transferID" from each one.
                    for fileSpec in jobspec.outFiles:
                        if fileSpec.fileAttributes is None:
                            fileSpec.fileAttributes = {}
                        fileSpec.fileAttributes["transferID"] = transferID
                else:
                    errMsg = f"StatusCode={res.status_code} {res.text}"
            except Exception as exc:
                # Exception (not BaseException) so KeyboardInterrupt and
                # SystemExit are not reported as a submission failure.
                errMsg = f"{type(exc).__name__} {exc}"

            if errMsg is not None:
                tmpLog.error(f"failed to submit transfer to {url} with {errMsg}")
                tmpRetVal = (False, errMsg)

        tmpLog.debug("done")
        return tmpRetVal

    def zip_output(self, jobspec):
        """Zip the job's output by delegating to ``simple_zip_output``.

        Args:
            jobspec: job specification providing ``PandaID``.

        Returns:
            Whatever ``self.simple_zip_output(jobspec, tmpLog)`` returns.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)