Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import hashlib
0002 import os
0003 import os.path
0004 import sys
0005 import zipfile
0006 
0007 # TO BE REMOVED for python2.7
0008 import requests.packages.urllib3
0009 from globus_sdk import (
0010     NativeAppAuthClient,
0011     RefreshTokenAuthorizer,
0012     TransferClient,
0013     TransferData,
0014 )
0015 
# best-effort: silence urllib3 insecure-request warnings; ignore failures on
# requests builds that do not vendor urllib3 under requests.packages
try:
    requests.packages.urllib3.disable_warnings()
except BaseException:
    pass
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestermisc import globus_utils
from pandaharvester.harvestermover import mover_utils

# logger
_logger = core_utils.setup_logger("go_stager")
0028 
0029 
def dump(obj):
    """Print every readable attribute of *obj* as ``obj.<name> = <value>``.

    Debug helper: attributes listed by dir() but not actually readable
    (hasattr returns False) are skipped silently.
    """
    for name in dir(obj):
        if not hasattr(obj, name):
            continue
        value = getattr(obj, name)
        print(f"obj.{name} = {value}")
0034 
0035 
# plugin for stager with the Globus Transfer service
class GlobusStager(PluginBase):
    """Stage-out plugin that moves job output files via the Globus Transfer service.

    The constructor fetches a (client_id, refresh_token) pair from the PanDA
    server through the harvester cache entry "globus_secret" and builds a
    Globus TransferClient (self.tc).  If that fails, self.tc stays None and
    trigger_stage_out fails fast.
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        # create Globus Transfer Client
        tmpLog = self.make_logger(_logger, method_name="GlobusStager __init__ ")
        try:
            self.tc = None
            # need to get client_id and refresh_token from PanDA server via harvester cache mechanism
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]  # client_id
                self.refresh_token = c_data.data["privateKey"]  # refresh_token
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                # no credentials available; leave the client unset so later calls fail fast
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except Exception:
            # narrowed from BaseException so KeyboardInterrupt/SystemExit still propagate
            core_utils.dump_error_message(tmpLog)

    # set FileSpec.status
    def set_FileSpec_status(self, jobspec, status):
        """Set the status of every output FileSpec of *jobspec* to *status*."""
        # loop over all output files
        for fileSpec in jobspec.outFiles:
            fileSpec.status = status

    # check status
    def check_stage_out_status(self, jobspec):
        """Check the state of the Globus transfer task associated with *jobspec*.

        :param jobspec: job specification whose transfer task (found by label)
                        is inspected
        :return: (True, '') when the task SUCCEEDED,
                 (False, msg) on fatal error or task FAILED,
                 (None, msg) on temporary error or while the task is in progress
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # get label
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # get transfer task
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        # return a temporary error when failed to get task
        if not tmpStat:
            errStr = "failed to get transfer task"
            tmpLog.error(errStr)
            return None, errStr
        # return a fatal error when task is missing # FIXME retry instead?
        if label not in transferTasks:
            errStr = "transfer task is missing"
            tmpLog.error(errStr)
            return False, errStr
        # succeeded
        transferID = transferTasks[label]["task_id"]
        if transferTasks[label]["status"] == "SUCCEEDED":
            tmpLog.debug(f"transfer task {transferID} succeeded")
            self.set_FileSpec_status(jobspec, "finished")
            return True, ""
        # failed
        if transferTasks[label]["status"] == "FAILED":
            errStr = f"transfer task {transferID} failed"
            tmpLog.error(errStr)
            self.set_FileSpec_status(jobspec, "failed")
            return False, errStr
        # another status: still queued/active, try again later
        tmpStr = f"transfer task {transferID} status: {transferTasks[label]['status']}"
        tmpLog.debug(tmpStr)
        return None, ""

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Submit a single Globus transfer task covering all output files of *jobspec*.

        Skips submission when a task with the same label is already queued.
        On success the transfer task id is recorded in each output FileSpec's
        fileAttributes["transferID"] and in the file group of the jobspec.

        :return: (True, '') on success, (False, msg) on fatal error,
                 (errStat, msg) from the Globus exception handler on SDK errors
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        # default return
        tmpRetVal = (True, "")
        # check that jobspec.computingSite is defined
        if jobspec.computingSite is None:
            # not found
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # test we have a Globus Transfer Client
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        # get label
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # get transfer tasks
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer tasks"
            tmpLog.error(errStr)
            return False, errStr
        # check if already queued
        if label in transferTasks:
            tmpLog.debug(f"skip since already queued with {str(transferTasks[label])}")
            return True, ""
        # set the Globus destination Endpoint id and path will get them from Agis eventually
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        # self.Globus_srcPath = queueConfig.stager['Globus_srcPath']
        self.srcEndpoint = queueConfig.stager["srcEndpoint"]
        self.Globus_srcPath = self.basePath
        self.Globus_dstPath = queueConfig.stager["Globus_dstPath"]
        self.dstEndpoint = queueConfig.stager["dstEndpoint"]
        # Test the endpoints and create the transfer data class
        errMsg = None
        try:
            # Test endpoints for activation
            tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
            tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
            if tmpStatsrc and tmpStatdst:
                errStr = "source Endpoint and destination Endpoint activated"
                tmpLog.debug(errStr)
            else:
                errMsg = ""
                if not tmpStatsrc:
                    errMsg += " source Endpoint not activated "
                if not tmpStatdst:
                    errMsg += " destination Endpoint not activated "
                tmpLog.error(errMsg)
                tmpRetVal = (False, errMsg)
                return tmpRetVal
            # both endpoints activated now prepare to transfer data
            tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, label=label, sync_level="checksum")
        except Exception:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            tmpRetVal = (errStat, errMsg)
            return tmpRetVal
        # loop over all files
        fileAttrs = jobspec.get_output_file_attributes()
        lfns = []
        for fileSpec in jobspec.outFiles:
            scope = fileAttrs[fileSpec.lfn]["scope"]
            # md5 of "scope:lfn" picks the two rucio-style subdirectory levels.
            # FIX: hashlib needs bytes; the original passed a str, which raises
            # TypeError on Python 3.  Also avoid shadowing the builtin `hash`.
            hash_hex = hashlib.md5(f"{scope}:{fileSpec.lfn}".encode("utf-8")).hexdigest()
            correctedscope = "/".join(scope.split("."))
            srcURL = fileSpec.path
            dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
            tmpLog.debug(f"src={srcURL} dst={dstURL}")
            # add files to transfer object - tdata
            if os.access(srcURL, os.R_OK):
                tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
                tdata.add_item(srcURL, dstURL)
                lfns.append(fileSpec.lfn)
            else:
                errMsg = f"source file {srcURL} does not exist"
                tmpLog.error(errMsg)
                tmpRetVal = (False, errMsg)
                return tmpRetVal
        # submit transfer
        try:
            transfer_result = self.tc.submit_transfer(tdata)
            # check status code and message
            tmpLog.debug(str(transfer_result))
            if transfer_result["code"] == "Accepted":
                # succeeded
                # set transfer ID which are used for later lookup
                transferID = transfer_result["task_id"]
                tmpLog.debug(f"successfully submitted id={transferID}")
                jobspec.set_groups_to_files({transferID: {"lfns": lfns, "groupStatus": "active"}})
                # record the transfer ID on every output file
                for fileSpec in jobspec.outFiles:
                    if fileSpec.fileAttributes is None:
                        fileSpec.fileAttributes = {}
                    # FIX: this assignment was nested inside the "is None"
                    # branch, so files that already had fileAttributes never
                    # got their transferID recorded
                    fileSpec.fileAttributes["transferID"] = transferID
            else:
                tmpRetVal = (False, transfer_result["message"])
        except Exception:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            if errMsg is None:
                errtype, errvalue = sys.exc_info()[:2]
                errMsg = f"{errtype.__name__} {errvalue}"
            tmpRetVal = (errStat, errMsg)
        # return
        tmpLog.debug("done")
        return tmpRetVal

    # zip output files
    def zip_output(self, jobspec):
        """Zip the associated files of each output FileSpec into one archive per file.

        Each FileSpec's path is updated to point at the new zip and its fsize
        is set from the archive on disk.

        :return: (True, '') on success, (False, msg) on failure
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        tmpLog.debug("start")
        try:
            for fileSpec in jobspec.outFiles:
                if self.zipDir == "${SRCDIR}":
                    # the same directory as src
                    zipDir = os.path.dirname(next(iter(fileSpec.associatedFiles)).path)
                else:
                    zipDir = self.zipDir
                zipPath = os.path.join(zipDir, fileSpec.lfn)
                # remove zip file just in case a stale one is left over
                try:
                    os.remove(zipPath)
                except OSError:
                    pass
                # make zip file (store only, no compression)
                with zipfile.ZipFile(zipPath, "w", zipfile.ZIP_STORED) as zf:
                    for assFileSpec in fileSpec.associatedFiles:
                        zf.write(assFileSpec.path, os.path.basename(assFileSpec.path))
                # set path
                fileSpec.path = zipPath
                # get size
                statInfo = os.stat(zipPath)
                fileSpec.fsize = statInfo.st_size
        except Exception:
            errMsg = core_utils.dump_error_message(tmpLog)
            return False, f"failed to zip with {errMsg}"
        tmpLog.debug("done")
        return True, ""

    # make label for transfer task
    def make_label(self, jobspec):
        """Return the unique Globus task label for *jobspec* (OUT-<site>-<PandaID>)."""
        return f"OUT-{jobspec.computingSite}-{jobspec.PandaID}"

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set the local path of each input file of *jobspec* under self.basePath."""
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0272         return True, ""