File indexing completed on 2026-04-20 07:59:00
0001 import datetime
0002 import hashlib
0003 import os
0004 import os.path
0005 import string
0006 import sys
0007 import tarfile
0008 import threading
0009 import time
0010 import uuid
0011
0012
0013 import requests.packages.urllib3
0014 from globus_sdk import (
0015 NativeAppAuthClient,
0016 RefreshTokenAuthorizer,
0017 TransferClient,
0018 TransferData,
0019 )
0020
# Best-effort: silence urllib3's InsecureRequestWarning chatter that would
# otherwise flood the logs. Deliberately swallows any failure (e.g. a
# requests build without the vendored urllib3 package) — the stager works
# fine either way, just noisier.
try:
    requests.packages.urllib3.disable_warnings()
except BaseException:
    pass
0025 from pandaharvester.harvesterconfig import harvester_config
0026 from pandaharvester.harvestercore import core_utils
0027 from pandaharvester.harvestercore.plugin_base import PluginBase
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030 from pandaharvester.harvestermover import mover_utils
0031 from pandaharvester.harvesterstager.base_stager import BaseStager
0032
0033
# Prefix for the placeholder ("dummy") transfer-group ID that files are parked
# under while they wait to be bundled into a real Globus transfer task; the
# computing-site name is appended to it at runtime.
dummy_transfer_id_base = "dummy_id_for_out"

# Protects the module-level uID counter against concurrent stager threads.
uLock = threading.Lock()

# Round-robin counter bumped once per GlobusBulkStager instantiation,
# wrapped modulo harvester_config.stager.nThreads (see __init__).
uID = 0

# Module-wide logger for this plugin.
_logger = core_utils.setup_logger("go_bulk_stager")
0043
0044
def dump(obj):
    """Debug helper: print every attribute of *obj* as ``obj.<name> = <value>``."""
    readable = (name for name in dir(obj) if hasattr(obj, name))
    for name in readable:
        value = getattr(obj, name)
        print(f"obj.{name} = {value}")
0049
0050
def validate_transferid(transferid):
    """Return True when *transferid* (hyphens ignored) is made up purely of
    hex digits, i.e. looks like a real Globus task ID rather than one of the
    ``dummy_id_for_out_*`` placeholders."""
    stripped = transferid.replace("-", "")
    return set(stripped) <= set(string.hexdigits)
0054
0055
0056
0057
class GlobusBulkStager(BaseStager):
    """Harvester stager plugin that moves job output files in bulk via Globus.

    Output files are first parked under a per-site placeholder ("dummy")
    transfer-group ID by trigger_stage_out(); check_stage_out_status() later
    pools them and, once enough files have accumulated (or the group is old
    enough), submits a single Globus transfer task and tracks it to
    completion.  Cross-thread coordination is done through DB object locks
    keyed on the dummy transfer ID.
    """

    # Class-level counter used to give each plugin instance a unique self.id.
    next_id = 0

    def __init__(self, **kwarg):
        """Set up instance state and build the Globus TransferClient from the
        client ID / refresh token cached in the Harvester DB ("globus_secret").
        On any failure self.tc is left as None and later calls bail out."""
        PluginBase.__init__(self, **kwarg)

        tmpLog = self.make_logger(_logger, f"ThreadID={threading.current_thread().ident}", method_name="GlobusBulkStager __init__ ")
        tmpLog.debug("start")
        self.EventServicejob = False
        self.pathConvention = None
        self.id = GlobusBulkStager.next_id
        self.changeFileStatusOnSuccess = True
        GlobusBulkStager.next_id += 1
        with uLock:
            global uID
            # Placeholder until the computing site is known (see
            # check_stage_out_status / trigger_stage_out, which replace _XXXX
            # with the site name on first use).
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_XXXX"
            # Round-robin over the configured number of stager threads.
            uID += 1
            uID %= harvester_config.stager.nThreads

        try:
            self.tc = None
            # Fetch the Globus credentials that PanDA cached for us.
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]
                self.refresh_token = c_data.data["privateKey"]
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # Keep the plugin constructible even if credential setup blows up;
            # self.tc stays None and the public methods return errors instead.
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug("__init__ finish")

    def get_dummy_transfer_id(self):
        """Return the current placeholder transfer-group ID."""
        return self.dummy_transfer_id

    def set_dummy_transfer_id_testing(self, dummy_transfer_id):
        """Override the placeholder transfer-group ID (test hook only)."""
        self.dummy_transfer_id = dummy_transfer_id

    def set_FileSpec_objstoreID(self, jobspec, objstoreID, pathConvention):
        """Stamp every output FileSpec of *jobspec* with the given object-store
        ID and path convention."""
        for fileSpec in jobspec.outFiles:
            fileSpec.objstoreID = objstoreID
            fileSpec.pathConvention = pathConvention

    def set_FileSpec_status(self, jobspec, status):
        """Set *status* (e.g. "finished"/"failed") on every output FileSpec."""
        for fileSpec in jobspec.outFiles:
            fileSpec.status = status

    def check_stage_out_status(self, jobspec):
        """Advance the bulk stage-out state machine for *jobspec*.

        Two phases:
          1. For placeholder ("dummy") groups: under a DB object lock, pool the
             group's files and, once >= 10 files or the group is older than 10
             minutes, submit one Globus transfer task for all of them.
          2. For real (hex) transfer IDs: poll the Globus task and map its
             status to the stager return convention.

        Returns (True, "") on success, (False, msg) on fatal error, and
        (None, msg) to be retried later.
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="check_stage_out_status")
        tmpLog.debug("start")

        tmpRetVal = (True, "")

        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

        # First use: bind the placeholder group ID to this computing site.
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id from {old_dummy_transfer_id} to {self.dummy_transfer_id}")

        have_db_lock = False

        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)

        # Both "EventService" and "Yoda" job types are treated as Event Service.
        if "jobtype" in queueConfig.stager:
            if queueConfig.stager["jobtype"] == "EventService":
                self.EventServicejob = True
                tmpLog.debug("Setting job type to EventService")

            if queueConfig.stager["jobtype"] == "Yoda":
                self.EventServicejob = True
                tmpLog.debug("Setting job type to EventService")

        # NOTE(review): objStoreID_ES is read unconditionally, even for
        # non-EventService jobs — confirm every queue config defines it.
        self.objstoreID = int(queueConfig.stager["objStoreID_ES"])
        if self.EventServicejob:
            self.pathConvention = int(queueConfig.stager["pathConvention"])
            tmpLog.debug(f"EventService Job - PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID} pathConvention ={self.pathConvention}")
        else:
            self.pathConvention = None
            tmpLog.debug(f"PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID}")

        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr

        transferID = None

        outfileattrib = jobspec.get_output_file_attributes()

        groups = jobspec.get_groups_of_output_files()
        tmpLog.debug(f"jobspec.get_groups_of_output_files() = : {groups}")

        # ---- Phase 1: handle placeholder (non-hex) groups ----
        for dummy_transferID in groups:
            # Real hex task IDs are handled in phase 2 below.
            if validate_transferid(dummy_transferID):
                continue

            # Serialize against other stager threads working on the same group.
            tmpLog.debug(f"attempt to set DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
            have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120)
            if not have_db_lock:
                msgStr = "escape since locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr

            # Re-read group info from the DB now that we hold the lock —
            # another thread may have submitted the transfer already.
            tmpLog.debug("self.dbInterface.refresh_file_group_info(jobspec)")
            self.dbInterface.refresh_file_group_info(jobspec)

            tmpLog.debug("After db refresh call groups=jobspec.get_groups_of_output_files()")
            groups = jobspec.get_groups_of_output_files()
            tmpLog.debug(f"jobspec.get_groups_of_output_files() = : {groups}")

            if dummy_transferID in groups:
                groupUpdateTime = groups[dummy_transferID]["groupUpdateTime"]

                fileSpecs = self.dbInterface.get_files_with_group_id(dummy_transferID)

                msgStr = f"dummy_transferID = {dummy_transferID} number of files = {len(fileSpecs)}"
                tmpLog.debug(msgStr)
                # Submit when the batch is big enough or has waited 10+ minutes.
                if len(fileSpecs) >= 10 or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
                    tmpLog.debug("prepare to transfer files")

                    self.srcEndpoint = queueConfig.stager["srcEndpoint"]
                    self.Globus_srcPath = self.basePath
                    self.Globus_dstPath = queueConfig.stager["Globus_dstPath"]
                    self.dstEndpoint = queueConfig.stager["dstEndpoint"]

                    errMsg = None
                    try:
                        # Both endpoints must be activated before submission.
                        tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
                        tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
                        if tmpStatsrc and tmpStatdst:
                            errStr = "source Endpoint and destination Endpoint activated"
                            tmpLog.debug(errStr)
                        else:
                            errMsg = ""
                            if not tmpStatsrc:
                                errMsg += " source Endpoint not activated "
                            if not tmpStatdst:
                                errMsg += " destination Endpoint not activated "
                            # Release the lock before bailing out for retry.
                            # NOTE(review): this writes self.have_db_lock while
                            # every other path uses the local have_db_lock /
                            # release_db_lock — looks like a typo; confirm.
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            self.have_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not self.have_db_lock:
                                errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                            tmpRetVal = (None, errMsg)
                            return tmpRetVal

                        # Checksum sync level: re-transfer only on mismatch.
                        tdata = None
                        tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, sync_level="checksum")
                    except BaseException:
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        # Release the lock before propagating the error state.
                        tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if not release_db_lock:
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                        tmpLog.error(errMsg)
                        tmpRetVal = (errStat, errMsg)
                        return tmpRetVal

                    # Build the transfer item list; log only the first 25 files.
                    ifile = 0
                    for fileSpec in fileSpecs:
                        if not fileSpec.lfn:
                            msgStr = "fileSpec.lfn is empty"
                            tmpLog.debug(msgStr)
                            continue
                        logfile = False
                        scope = "panda"
                        if fileSpec.scope is not None:
                            scope = fileSpec.scope

                        if ifile < 25:
                            msgStr = f"fileSpec.lfn - {fileSpec.lfn} fileSpec.scope - {fileSpec.scope}"
                            tmpLog.debug(msgStr)
                        if ifile == 25:
                            # NOTE(review): the .format() call has no
                            # placeholders to fill — it is a no-op.
                            msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope)
                            tmpLog.debug(msgStr)
                        # Rucio-style deterministic path: md5("scope:lfn")
                        # picks the two hashed directory levels.
                        hash = hashlib.md5()
                        if sys.version_info.major == 2:
                            hash.update(f"{scope}:{fileSpec.lfn}")
                        if sys.version_info.major == 3:
                            hash_string = f"{scope}:{fileSpec.lfn}"
                            hash.update(bytes(hash_string, "utf-8"))
                        hash_hex = hash.hexdigest()
                        correctedscope = "/".join(scope.split("."))
                        srcURL = fileSpec.path
                        dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                        if logfile:
                            tmpLog.debug(f"src={srcURL} dst={dstURL}")
                        if ifile < 25:
                            tmpLog.debug(f"src={srcURL} dst={dstURL}")
                        # Only queue files that are actually readable locally;
                        # a missing source file fails the whole group.
                        if os.access(srcURL, os.R_OK):
                            if ifile < 25:
                                tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
                            tdata.add_item(srcURL, dstURL)
                        else:
                            errMsg = f"source file {srcURL} does not exist"
                            # Release the lock before returning the failure.
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not release_db_lock:
                                errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                            tmpRetVal = (False, errMsg)
                            return tmpRetVal
                        ifile += 1

                    # Submit one bulk transfer task for the whole group.
                    tmpLog.debug(f"Number of files to transfer - {len(tdata['DATA'])}")
                    try:
                        transfer_result = self.tc.submit_transfer(tdata)

                        tmpLog.debug(str(transfer_result))
                        if transfer_result["code"] == "Accepted":
                            # Re-key the file group from the dummy ID to the
                            # real Globus task ID and mark it running.
                            transferID = transfer_result["task_id"]
                            tmpLog.debug(f"successfully submitted id={transferID}")

                            self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                            msgStr = f"submitted transfer with ID={transferID}"
                            tmpLog.debug(msgStr)
                        else:
                            # Submission rejected: release the lock and retry later.
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not release_db_lock:
                                errMsg = f"Could not release DB lock for {dummy_transferID}"
                                tmpLog.error(errMsg)
                            tmpRetVal = (None, transfer_result["message"])
                            return tmpRetVal
                    except Exception as e:
                        # NOTE(review): 'e' is unused; the Globus helper
                        # extracts the error details from the live exception.
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if not release_db_lock:
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                        return errStat, errMsg
                else:
                    msgStr = "wait until enough files are pooled"
                    tmpLog.debug(msgStr)
                # Done with this group (submitted or still pooling): release
                # the lock and return None so the caller polls again later.
                tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                if release_db_lock:
                    tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    have_db_lock = False
                else:
                    msgStr += f" - Could not release DB lock for {dummy_transferID}"
                    tmpLog.error(msgStr)

                return None, msgStr

            # Group disappeared after the DB refresh (another thread handled
            # it); just release the lock and retry later.
            if have_db_lock:
                tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                if release_db_lock:
                    tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    have_db_lock = False
                else:
                    msgStr += f" - Could not release DB lock for {dummy_transferID}"
                    tmpLog.error(msgStr)
                return None, msgStr

        # ---- Phase 2: poll real Globus transfer tasks ----
        tmpLog.debug("groups = jobspec.get_groups_of_output_files()")
        groups = jobspec.get_groups_of_output_files()
        tmpLog.debug(f"Number of transfer groups - {len(groups)}")
        tmpLog.debug(f"transfer groups any state - {groups}")
        if len(groups) == 0:
            tmpLog.debug("jobspec.get_groups_of_output_files(skip_done=True) returned no files ")
            tmpLog.debug("check_stage_out_status return status - True ")
            return True, ""

        for transferID in groups:
            # Only hex IDs correspond to submitted Globus tasks.
            if validate_transferid(transferID):
                tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, self.tc, transferID)

                if not tmpStat:
                    errStr = f"failed to get transfer task; tc = {str(self.tc)}; transferID = {str(transferID)}"
                    tmpLog.error(errStr)
                    return None, errStr

                if transferID not in transferTasks:
                    errStr = f"transfer task ID - {transferID} is missing"
                    tmpLog.error(errStr)
                    return None, errStr

                # Map the Globus task status to the stager return convention.
                if transferTasks[transferID]["status"] == "SUCCEEDED":
                    tmpLog.debug(f"transfer task {transferID} succeeded")
                    self.set_FileSpec_objstoreID(jobspec, self.objstoreID, self.pathConvention)
                    if self.changeFileStatusOnSuccess:
                        self.set_FileSpec_status(jobspec, "finished")
                    return True, ""

                if transferTasks[transferID]["status"] == "FAILED":
                    errStr = f"transfer task {transferID} failed"
                    tmpLog.error(errStr)
                    self.set_FileSpec_status(jobspec, "failed")
                    return False, errStr

                # Any other status (ACTIVE, ...): keep polling.
                tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
                tmpLog.debug(tmpStr)
                return None, ""

        tmpLog.debug("End of loop over transfers groups - ending check_stage_out_status function")
        return None, "no valid transfer id found"

    def trigger_stage_out(self, jobspec):
        """Park all not-yet-done output files of *jobspec* under the per-site
        dummy transfer group with status "pending"; the actual Globus
        submission happens later in check_stage_out_status().

        Returns (True, "") on success, (False, msg) on configuration errors.
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        tmpRetVal = (True, "")

        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")

        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr

        # First use: bind the placeholder group ID to this computing site.
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id from {old_dummy_transfer_id} to {self.dummy_transfer_id}")

        # Collect the LFNs of the remaining output files (empty LFNs skipped).
        lfns = []
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            if not fileSpec.lfn:
                msgStr = "fileSpec.lfn is empty"
            else:
                msgStr = f"fileSpec.lfn is {fileSpec.lfn}"
                lfns.append(fileSpec.lfn)
            tmpLog.debug(msgStr)
        # Record the grouping both on the jobspec and in the DB.
        jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        msgStr = f"jobspec.set_groups_to_files - self.dummy_tranfer_id - {self.dummy_transfer_id}, lfns - {lfns}, groupStatus - pending"
        tmpLog.debug(msgStr)
        tmpLog.debug("call self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True),self.dummy_transfer_id,pending)")
        tmpStat = self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True), self.dummy_transfer_id, "pending")
        tmpLog.debug("called self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True),self.dummy_transfer_id,pending)")
        return True, ""

    def zip_output(self, jobspec):
        """Zip output files using the shared BaseStager implementation."""
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)

    def make_label(self, jobspec):
        """Build the human-readable Globus transfer label for *jobspec*."""
        return f"OUT-{jobspec.computingSite}-{jobspec.PandaID}"

    def resolve_input_paths(self, jobspec):
        """Fill in the local path of every input file under self.basePath and
        store the result back on *jobspec*. Always returns (True, "")."""
        inFiles = jobspec.get_input_file_attributes()
        # Derive each local path from (basePath, scope, LFN).
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)

        jobspec.set_input_file_paths(inFiles)
        return True, ""