File indexing completed on 2026-04-20 07:59:00
0001 import hashlib
0002 import os
0003 import os.path
0004 import sys
0005 import zipfile
0006
0007
0008 import requests.packages.urllib3
0009 from globus_sdk import (
0010 NativeAppAuthClient,
0011 RefreshTokenAuthorizer,
0012 TransferClient,
0013 TransferData,
0014 )
0015
0016 try:
0017 requests.packages.urllib3.disable_warnings()
0018 except BaseException:
0019 pass
0020 from pandaharvester.harvesterconfig import harvester_config
0021 from pandaharvester.harvestercore import core_utils
0022 from pandaharvester.harvestercore.plugin_base import PluginBase
0023 from pandaharvester.harvestermisc import globus_utils
0024 from pandaharvester.harvestermover import mover_utils
0025
0026
# module-level logger shared by all GlobusStager instances
_logger = core_utils.setup_logger("go_stager")
0028
0029
def dump(obj):
    """Print every attribute of *obj*, one ``obj.<name> = <value>`` line each.

    Debugging helper: names reported by dir() that are not actually
    readable on *obj* are skipped.
    """
    for name in dir(obj):
        if not hasattr(obj, name):
            continue
        print(f"obj.{name} = {getattr(obj, name)}")
0034
0035
0036
class GlobusStager(PluginBase):
    """Stage-out plugin that ships job output files via the Globus Transfer service.

    The Globus client credentials (client id + refresh token) are pulled from
    the PanDA cache entry ``globus_secret``; the per-queue endpoints and the
    destination path come from the queue configuration's ``stager`` section
    (``srcEndpoint``, ``dstEndpoint``, ``Globus_dstPath``).
    """

    def __init__(self, **kwarg):
        """Initialize the plugin and create the Globus TransferClient.

        On any failure ``self.tc`` is left as None; trigger_stage_out()
        re-checks it before attempting a transfer.
        """
        PluginBase.__init__(self, **kwarg)
        # make logger
        tmpLog = self.make_logger(_logger, method_name="GlobusStager __init__ ")
        try:
            self.tc = None
            # fetch the Globus client id / refresh token cached by PanDA
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]
                self.refresh_token = c_data.data["privateKey"]
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # log but keep the plugin constructible; every transfer path
            # checks self.tc before use
            core_utils.dump_error_message(tmpLog)

    def set_FileSpec_status(self, jobspec, status):
        """Set the status of every output FileSpec of *jobspec* to *status*."""
        for fileSpec in jobspec.outFiles:
            fileSpec.status = status

    def check_stage_out_status(self, jobspec):
        """Poll the Globus transfer task associated with *jobspec*.

        :param jobspec: job specification whose transfer task (identified by
            the label from make_label()) is checked
        :return: (True, '') when the task SUCCEEDED (output files marked
            'finished'); (False, error message) when the task FAILED or is
            missing; (None, message) when the task list could not be
            retrieved or the task is still in progress
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # the label uniquely ties a Globus task to this job's stage-out
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # query Globus for transfer tasks with this label
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer task"
            tmpLog.error(errStr)
            # None -> retry later
            return None, errStr
        if label not in transferTasks:
            errStr = "transfer task is missing"
            tmpLog.error(errStr)
            return False, errStr
        transferID = transferTasks[label]["task_id"]
        if transferTasks[label]["status"] == "SUCCEEDED":
            tmpLog.debug(f"transfer task {transferID} succeeded")
            self.set_FileSpec_status(jobspec, "finished")
            return True, ""
        if transferTasks[label]["status"] == "FAILED":
            errStr = f"transfer task {transferID} failed"
            tmpLog.error(errStr)
            self.set_FileSpec_status(jobspec, "failed")
            return False, errStr
        # any other status (e.g. ACTIVE) -> still in progress
        tmpStr = f"transfer task {transferID} status: {transferTasks[label]['status']}"
        tmpLog.debug(tmpStr)
        return None, ""

    def trigger_stage_out(self, jobspec):
        """Submit a Globus transfer for all output files of *jobspec*.

        Skips submission if a task with the same label is already queued.
        Records the Globus task id on the job's file group and in each
        output FileSpec's ``fileAttributes["transferID"]``.

        :param jobspec: job specification with output files to stage out
        :return: (True, '') on success or when already queued;
            (False, error message) on failure; otherwise the status/message
            pair produced by globus_utils.handle_globus_exception()
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        # default return
        tmpRetVal = (True, "")
        # the computing site is needed to look up the queue configuration
        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # bail out if the TransferClient could not be created in __init__
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        label = self.make_label(jobspec)
        tmpLog.debug(f"label={label}")
        # get existing transfer tasks for this label
        tmpStat, transferTasks = globus_utils.get_transfer_tasks(tmpLog, self.tc, label)
        if not tmpStat:
            errStr = "failed to get transfer tasks"
            tmpLog.error(errStr)
            return False, errStr
        # skip if a transfer with this label is already queued
        if label in transferTasks:
            tmpLog.debug(f"skip since already queued with {str(transferTasks[label])}")
            return True, ""
        # local import (presumably to avoid a circular import at module load
        # time -- TODO confirm)
        from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

        # endpoints and destination path come from the queue configuration
        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        self.srcEndpoint = queueConfig.stager["srcEndpoint"]
        self.Globus_srcPath = self.basePath
        self.Globus_dstPath = queueConfig.stager["Globus_dstPath"]
        self.dstEndpoint = queueConfig.stager["dstEndpoint"]
        errMsg = None
        try:
            # both endpoints must be activated before a transfer can run
            tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
            tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
            if tmpStatsrc and tmpStatdst:
                errStr = "source Endpoint and destination Endpoint activated"
                tmpLog.debug(errStr)
            else:
                errMsg = ""
                if not tmpStatsrc:
                    errMsg += " source Endpoint not activated "
                if not tmpStatdst:
                    errMsg += " destination Endpoint not activated "
                tmpLog.error(errMsg)
                tmpRetVal = (False, errMsg)
                return tmpRetVal
            # checksum sync level: skip files already identical at destination
            tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, label=label, sync_level="checksum")
        except BaseException:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            tmpRetVal = (errStat, errMsg)
            return tmpRetVal
        # add each output file to the transfer
        fileAttrs = jobspec.get_output_file_attributes()
        lfns = []
        for fileSpec in jobspec.outFiles:
            scope = fileAttrs[fileSpec.lfn]["scope"]
            # deterministic destination path: two 2-hex-digit subdirectories
            # taken from md5("scope:lfn").
            # FIX: md5().update() requires bytes on Python 3 -- encode the
            # string, otherwise every stage-out raised TypeError here.
            hash_obj = hashlib.md5()
            hash_obj.update(f"{scope}:{fileSpec.lfn}".encode("utf-8"))
            hash_hex = hash_obj.hexdigest()
            correctedscope = "/".join(scope.split("."))
            srcURL = fileSpec.path
            dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
            tmpLog.debug(f"src={srcURL} dst={dstURL}")
            # the source file must exist and be readable
            if os.access(srcURL, os.R_OK):
                tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
                tdata.add_item(srcURL, dstURL)
                lfns.append(fileSpec.lfn)
            else:
                errMsg = f"source file {srcURL} does not exist"
                tmpLog.error(errMsg)
                tmpRetVal = (False, errMsg)
                return tmpRetVal
        # submit the transfer
        try:
            transfer_result = self.tc.submit_transfer(tdata)
            tmpLog.debug(str(transfer_result))
            if transfer_result["code"] == "Accepted":
                # record the task id on the file group and on each FileSpec
                transferID = transfer_result["task_id"]
                tmpLog.debug(f"successfully submitted id={transferID}")
                jobspec.set_groups_to_files({transferID: {"lfns": lfns, "groupStatus": "active"}})
                for fileSpec in jobspec.outFiles:
                    if fileSpec.fileAttributes is None:
                        fileSpec.fileAttributes = {}
                    fileSpec.fileAttributes["transferID"] = transferID
            else:
                tmpRetVal = (False, transfer_result["message"])
        except Exception:
            errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
            if errMsg is None:
                errtype, errvalue = sys.exc_info()[:2]
                errMsg = f"{errtype.__name__} {errvalue}"
            tmpRetVal = (errStat, errMsg)
        tmpLog.debug("done")
        return tmpRetVal

    def zip_output(self, jobspec):
        """Zip the associated files of each output FileSpec into one archive.

        The archive (uncompressed, ZIP_STORED) is written either next to the
        first associated file (when self.zipDir is the literal "${SRCDIR}")
        or into the configured self.zipDir, and the FileSpec's path/fsize are
        updated to point at it.

        :return: (True, '') on success, (False, error message) on failure
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        tmpLog.debug("start")
        try:
            for fileSpec in jobspec.outFiles:
                if self.zipDir == "${SRCDIR}":
                    # the zip goes into the same directory as the source files
                    zipDir = os.path.dirname(next(iter(fileSpec.associatedFiles)).path)
                else:
                    zipDir = self.zipDir
                zipPath = os.path.join(zipDir, fileSpec.lfn)
                # remove a stale archive from a previous attempt, if any
                try:
                    os.remove(zipPath)
                except BaseException:
                    pass
                # stored (uncompressed) zip of all associated files
                with zipfile.ZipFile(zipPath, "w", zipfile.ZIP_STORED) as zf:
                    for assFileSpec in fileSpec.associatedFiles:
                        zf.write(assFileSpec.path, os.path.basename(assFileSpec.path))
                # point the FileSpec at the archive and record its size
                fileSpec.path = zipPath
                statInfo = os.stat(zipPath)
                fileSpec.fsize = statInfo.st_size
        except BaseException:
            errMsg = core_utils.dump_error_message(tmpLog)
            return False, f"failed to zip with {errMsg}"
        tmpLog.debug("done")
        return True, ""

    def make_label(self, jobspec):
        """Return the Globus task label identifying this job's stage-out."""
        return f"OUT-{jobspec.computingSite}-{jobspec.PandaID}"

    def resolve_input_paths(self, jobspec):
        """Set the local path of each input file under self.basePath.

        :return: (True, '') always
        """
        # get input file attributes keyed by LFN
        inFiles = jobspec.get_input_file_attributes()
        # construct a local path for each input file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # write the paths back to the job spec
        jobspec.set_input_file_paths(inFiles)
        return True, ""