Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import hashlib
0002 import sys
0003 
0004 import requests
0005 
0006 # TO BE REMOVED for python2.7
0007 import requests.packages.urllib3
0008 
# Silence urllib3's HTTP warnings for the requests made by this plugin. This is
# best effort: if it fails the warnings simply stay enabled, so the error must
# not prevent the module from loading. Only Exception is caught so that
# KeyboardInterrupt/SystemExit raised during import still propagate.
try:
    requests.packages.urllib3.disable_warnings()
except Exception:
    pass
0013 from pandaharvester.harvesterconfig import harvester_config
0014 from pandaharvester.harvestercore import core_utils
0015 
0016 from .base_stager import BaseStager
0017 
# module-level logger for this plugin; each method derives a per-job logger
# from it via self.make_logger(baseLogger, f"PandaID=...", method_name=...)
baseLogger = core_utils.setup_logger("fts_stager")
0020 
0021 
0022 # plugin for stager with FTS
class FtsStager(BaseStager):
    """Stager plugin that moves job output files with an FTS server.

    trigger_stage_out submits one FTS job covering all transferable output files
    (POST ``{ftsServer}/jobs``) and records the returned job ID on every output
    file as ``fileAttributes["transferID"]``. check_stage_out_status then polls
    ``{ftsServer}/jobs/{transferID}`` and maps the FTS job state onto the file
    status.

    Attributes read by this class (presumably populated from the plugin
    configuration via BaseStager/``kwarg`` -- confirm): ftsServer,
    ftsLookupTimeout, ca_cert, srcEndpointES, dstEndpointES, esObjStoreID,
    srcEndpointOut, dstEndpointOut, srcEndpointLog, dstEndpointLog.
    """

    # FTS job states after which the files count as transferred. FTS3 reports a
    # successful job as FINISHED; DONE is kept for backward compatibility.
    # NOTE(review): FINISHEDDIRTY (job done, some files failed) is not mapped, so
    # its files keep their current status -- confirm the intended handling.
    _FINISHED_STATES = ("FINISHED", "DONE")
    # FTS job states after which the files count as failed.
    _FAILED_STATES = ("FAILED", "CANCELED")

    # constructor
    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)

    @staticmethod
    def _client_cert():
        """Return the (cert_file, key_file) pair used to authenticate to FTS."""
        return (harvester_config.pandacon.cert_file, harvester_config.pandacon.key_file)

    @staticmethod
    def _describe_exception(exc):
        """Format an exception as '<ExceptionType> <message>' for error reporting."""
        return f"{type(exc).__name__} {exc}"

    @staticmethod
    def _hashed_destination(endpoint, scope, lfn):
        """Build a hashed destination URL for an output/log file.

        The URL is ``{endpoint}/{scope with '.' replaced by '/'}/{h[0:2]}/{h[2:4]}/{lfn}``,
        where ``h`` is the hex MD5 digest of ``"{scope}:{lfn}"``.
        """
        # hashlib only accepts bytes in Python 3; hashing the str raises TypeError
        digest = hashlib.md5(f"{scope}:{lfn}".encode("utf-8")).hexdigest()
        scopePath = "/".join(scope.split("."))
        return f"{endpoint}/{scopePath}/{digest[0:2]}/{digest[2:4]}/{lfn}"

    def _get_job_state(self, transferID):
        """Look up the state of one FTS job.

        :param transferID: FTS job ID returned at submission time.
        :return: ``(job_state, None)`` on success; ``(None, errMsg)`` when the
            request raises, answers with a non-200 status, or lacks ``job_state``.
        """
        try:
            url = f"{self.ftsServer}/jobs/{transferID}"
            res = requests.get(url, timeout=self.ftsLookupTimeout, verify=self.ca_cert, cert=self._client_cert())
            if res.status_code != 200:
                return None, f"StatusCode={res.status_code} {res.text}"
            return res.json()["job_state"], None
        except Exception as exc:
            return None, self._describe_exception(exc)

    def _submit_transfer(self, url, files):
        """POST one FTS job containing ``files`` to ``url``.

        :param url: FTS job submission URL.
        :param files: list of ``{"sources": [...], "destinations": [...]}`` dicts.
        :return: ``(job_id, None)`` on success; ``(None, errMsg)`` otherwise.
        """
        try:
            res = requests.post(
                url,
                # FTS3 REST reads the transfer list from the lowercase "files" key
                json={"files": files},
                timeout=self.ftsLookupTimeout,
                verify=self.ca_cert,
                cert=self._client_cert(),
            )
            if res.status_code != 200:
                return None, f"StatusCode={res.status_code} {res.text}"
            return res.json()["job_id"], None
        except Exception as exc:
            return None, self._describe_exception(exc)

    # check status
    def check_stage_out_status(self, jobspec):
        """Update output file statuses from the state of their FTS jobs.

        Each distinct ``fileAttributes["transferID"]`` is looked up only once per
        call. Files whose FTS job state is in _FINISHED_STATES become
        ``finished``, those in _FAILED_STATES become ``failed``, and any other
        state (or a failed lookup) leaves the file status untouched.

        :param jobspec: job whose ``outFiles`` were tagged by trigger_stage_out.
        :return: ``(True, "")`` if every lookup succeeded, otherwise
            ``(False, <first error message>)``.
        """
        # NOTE(review): a transient lookup failure is reported as (False, ...);
        # confirm the caller treats False as retryable rather than fatal.
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        allChecked = True
        oneErrMsg = None
        # transfer ID -> FTS job state; None records a failed lookup so it is not repeated
        transferStatus = {}
        for fileSpec in jobspec.outFiles:
            transferID = fileSpec.fileAttributes["transferID"]
            if transferID not in transferStatus:
                jobState, errMsg = self._get_job_state(transferID)
                if errMsg is None:
                    tmpLog.debug(f"got {jobState} for {transferID}")
                else:
                    allChecked = False
                    tmpLog.error(f"failed to get status for {transferID} with {errMsg}")
                    # keep only the first error message for the return value
                    if oneErrMsg is None:
                        oneErrMsg = errMsg
                transferStatus[transferID] = jobState
            jobState = transferStatus[transferID]
            if jobState in self._FINISHED_STATES:
                fileSpec.status = "finished"
            elif jobState in self._FAILED_STATES:
                fileSpec.status = "failed"
        if allChecked:
            return True, ""
        return False, oneErrMsg

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Submit one FTS job that transfers the job's output files.

        The following files are left out of the FTS job:
        - files inside a zip (``zipFileID`` set),
        - files of an unknown type,
        - log files when ``srcEndpointLog`` is None.

        ``es_output`` files keep their path on the destination endpoint and get
        ``objstoreID`` set. ``output``/``log`` files go to a hashed path built
        from their scope and LFN. On successful submission the FTS job ID is
        stored as ``fileAttributes["transferID"]`` on every output file.

        :param jobspec: job whose ``outFiles`` are staged out.
        :return: ``(True, "")`` on success or when there is nothing to submit;
            ``(False, errMsg)`` when the submission fails.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        tmpRetVal = (True, "")
        files = []
        fileAttrs = jobspec.get_output_file_attributes()
        for fileSpec in jobspec.outFiles:
            # skip zipped files
            if fileSpec.zipFileID is not None:
                continue
            if fileSpec.fileType == "es_output":
                srcURL = self.srcEndpointES + fileSpec.path
                dstURL = self.dstEndpointES + fileSpec.path
                # set OS ID
                fileSpec.objstoreID = self.esObjStoreID
            elif fileSpec.fileType == "output":
                srcURL = self.srcEndpointOut + fileSpec.path
                dstURL = self._hashed_destination(self.dstEndpointOut, fileAttrs[fileSpec.lfn]["scope"], fileSpec.lfn)
            elif fileSpec.fileType == "log":
                # skip if no endpoint
                if self.srcEndpointLog is None:
                    continue
                srcURL = self.srcEndpointLog + fileSpec.path
                dstURL = self._hashed_destination(self.dstEndpointLog, fileAttrs[fileSpec.lfn]["scope"], fileSpec.lfn)
            else:
                continue
            tmpLog.debug(f"src={srcURL} dst={dstURL}")
            files.append({"sources": [srcURL], "destinations": [dstURL]})
        # submit
        if files:
            url = f"{self.ftsServer}/jobs"
            transferID, errMsg = self._submit_transfer(url, files)
            if errMsg is None:
                tmpLog.debug(f"successfully submitted id={transferID}")
                # tag every output file, including ones not in the FTS job, because
                # check_stage_out_status reads fileAttributes["transferID"] for each of them
                for fileSpec in jobspec.outFiles:
                    if fileSpec.fileAttributes is None:
                        fileSpec.fileAttributes = {}
                    fileSpec.fileAttributes["transferID"] = transferID
            else:
                tmpLog.error(f"failed to submit transfer to {url} with {errMsg}")
                tmpRetVal = (False, errMsg)
        tmpLog.debug("done")
        return tmpRetVal

    # zip output files
    def zip_output(self, jobspec):
        """Zip the job's output files using the base class's simple_zip_output."""
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)