# File indexing completed on 2026-04-20 07:58:59
0001 import datetime
0002 import hashlib
0003 import os
0004 import os.path
0005 import string
0006 import sys
0007 import threading
0008 import time
0009 import uuid
0010 import zipfile
0011
0012
0013 import requests.packages.urllib3
0014 from globus_sdk import (
0015 NativeAppAuthClient,
0016 RefreshTokenAuthorizer,
0017 TransferClient,
0018 TransferData,
0019 )
0020
0021 from pandaharvester.harvestermisc import globus_utils
0022
0023 try:
0024 requests.packages.urllib3.disable_warnings()
0025 except BaseException:
0026 pass
0027 from pandaharvester.harvesterconfig import harvester_config
0028 from pandaharvester.harvestercore import core_utils
0029 from pandaharvester.harvestercore.plugin_base import PluginBase
0030 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0031 from pandaharvester.harvestermover import mover_utils
0032
0033
# prefix of the provisional ("dummy") transfer IDs assigned to file groups
# before a real Globus transfer task has been submitted
dummy_transfer_id_base = "dummy_id_for_in"

# lock guarding updates of the module-level uID counter below
uLock = threading.Lock()

# round-robin instance counter; incremented under uLock in __init__ and
# wrapped at harvester_config.preparator.nThreads
uID = 0

# module-level logger for this plugin
_logger = core_utils.setup_logger("go_bulk_preparator")
0043
0044
def validate_transferid(transferid):
    """Return True if *transferid* looks like a Globus task ID.

    A valid ID consists solely of hexadecimal digits and dashes (the
    provisional dummy IDs contain other characters and therefore fail).
    """
    for ch in transferid:
        if ch != "-" and ch not in string.hexdigits:
            return False
    return True
0048
0049
def dump(obj):
    """Debug helper: print every attribute of *obj* as 'obj.<name> = <value>'."""
    for name in dir(obj):
        if not hasattr(obj, name):
            # dir() can list names whose lookup raises; skip those
            continue
        value = getattr(obj, name)
        print(f"obj.{name} = {value}")
0054
0055
0056
0057
class GlobusBulkPreparator(PluginBase):
    """Stage-in plugin that pools input files and moves them in bulk Globus transfers.

    trigger_preparation() attaches a job's input files to a per-site
    provisional ("dummy") transfer group. check_stage_in_status() later
    submits a real Globus transfer task once enough files are pooled
    (>= 10 files, or the group is older than 10 minutes), re-keys the
    group to the Globus task ID, and tracks the task until it finishes.
    """

    # instance counter; only used to tag log messages for readability
    next_id = 0

    def __init__(self, **kwarg):
        """Initialize the plugin and build a Globus TransferClient from cached secrets.

        On failure self.tc is left as None; the work methods check for that
        and bail out with an error.
        """
        PluginBase.__init__(self, **kwarg)
        # make logger
        tmpLog = self.make_logger(_logger, f"ThreadID={threading.current_thread().ident}", method_name="GlobusBulkPreparator __init__ {} ")
        tmpLog.debug("__init__ start")
        self.thread_id = threading.current_thread().ident
        self.id = GlobusBulkPreparator.next_id
        GlobusBulkPreparator.next_id += 1
        with uLock:
            global uID
            # placeholder id; replaced with a per-site id on first use, once
            # jobspec.computingSite is known
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_XXXX"
            uID += 1
            uID %= harvester_config.preparator.nThreads
        # create the Globus Transfer client from the secrets cached in the DB
        try:
            self.tc = None
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]
                self.refresh_token = c_data.data["privateKey"]
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # best-effort init; errors are logged and self.tc stays None
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug(f"self.id = {self.id}")
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        tmpLog.debug("__init__ finish")

    def get_dummy_transfer_id(self):
        """Return the provisional transfer-group ID used before a real task exists."""
        return self.dummy_transfer_id

    def set_dummy_transfer_id_testing(self, dummy_transfer_id):
        """Override the provisional transfer-group ID (testing hook only)."""
        self.dummy_transfer_id = dummy_transfer_id

    def set_FileSpec_status(self, jobspec, status):
        """Set *status* on every input FileSpec attached to *jobspec*."""
        for fileSpec in jobspec.inFiles:
            fileSpec.status = status

    def check_stage_in_status(self, jobspec):
        """Check (and, when ready, submit) the bulk stage-in transfer for *jobspec*.

        Returns (True, '') when all input files are staged, (False, msg) on a
        fatal error, and (None, msg) while the transfer is pending/running or
        another thread holds the group lock.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="check_stage_in_status")
        tmpLog.debug("start")
        # the computing site is needed to pick the right queue configuration
        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # replace the placeholder dummy id with the per-site one on first use
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
        tmpRetVal = (True, "")
        # bug fix: msgStr could be referenced unbound on some lock-release
        # failure paths; initialize it up front
        msgStr = ""
        have_db_lock = False
        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        # bail out if the Globus client could not be created in __init__
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        transferID = None
        # groups of input files not yet staged
        groups = jobspec.get_groups_of_input_files(skip_ready=True)
        tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")
        # first pass: groups still keyed by a provisional (non-Globus) id
        for dummy_transferID in groups:
            # a valid Globus task id means the transfer was already submitted
            if validate_transferid(dummy_transferID):
                continue
            # take the DB lock so only one thread submits this group
            tmpLog.debug(
                f"attempt to set DB lock for self.id - {self.id} self.dummy_transfer_id - {self.dummy_transfer_id}, dummy_transferID - {dummy_transferID}"
            )
            have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120)
            tmpLog.debug(f" DB lock result - {have_db_lock}")
            if not have_db_lock:
                msgStr = "escape since locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr
            # refresh the group info now that we hold the lock
            tmpLog.debug("self.dbInterface.refresh_file_group_info(jobspec)")
            self.dbInterface.refresh_file_group_info(jobspec)
            tmpLog.debug("after self.dbInterface.refresh_file_group_info(jobspec)")
            tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
            groups = jobspec.get_groups_of_input_files(skip_ready=True)
            tmpLog.debug(f"after db lock and refresh - jobspec.get_groups_of_input_files(skip_ready=True) = : {groups}")
            if dummy_transferID in groups:
                groupUpdateTime = groups[dummy_transferID]["groupUpdateTime"]
                fileSpecs_allgroups = self.dbInterface.get_files_with_group_id(dummy_transferID)
                msgStr = "dummy_transferID = {0} self.dbInterface.get_files_with_group_id(dummy_transferID) number of files = {1}".format(
                    dummy_transferID, len(fileSpecs_allgroups)
                )
                tmpLog.debug(msgStr)
                fileSpecs = jobspec.get_input_file_specs(dummy_transferID, skip_ready=True)
                msgStr = "dummy_transferID = {0} jobspec.get_input_file_specs(dummy_transferID,skip_ready=True) number of files = {1}".format(
                    dummy_transferID, len(fileSpecs)
                )
                tmpLog.debug(msgStr)
                # submit when enough files are pooled or the group has aged out
                if len(fileSpecs) >= 10 or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
                    tmpLog.debug("prepare to transfer files")
                    # endpoints and base paths come from the queue configuration
                    self.Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
                    self.srcEndpoint = queueConfig.preparator["srcEndpoint"]
                    self.Globus_dstPath = self.basePath
                    self.dstEndpoint = queueConfig.preparator["dstEndpoint"]
                    # check that both endpoints are activated before building the transfer
                    errMsg = None
                    try:
                        tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
                        tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
                        if tmpStatsrc and tmpStatdst:
                            errStr = "source Endpoint and destination Endpoint activated"
                            tmpLog.debug(errStr)
                        else:
                            errMsg = ""
                            if not tmpStatsrc:
                                errMsg += " source Endpoint not activated "
                            if not tmpStatdst:
                                errMsg += " destination Endpoint not activated "
                            # release the DB lock before bailing out
                            tmpLog.debug(
                                f"attempt to release DB lock for self.id - {self.id} self.dummy_transfer_id - {self.dummy_transfer_id}, dummy_transferID - {dummy_transferID}"
                            )
                            have_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not have_db_lock:
                                errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                            tmpRetVal = (None, errMsg)
                            return tmpRetVal
                        # build the bulk transfer document
                        tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, sync_level="exists")
                        tmpLog.debug(f"size of tdata[DATA] - {len(tdata['DATA'])}")
                    except BaseException:
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        tmpLog.debug(
                            f"attempt to release DB lock for self.id - {self.id} self.dummy_transfer_id - {self.dummy_transfer_id}, dummy_transferID - {dummy_transferID}"
                        )
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if not release_db_lock:
                            # bug fix: previously referenced the undefined
                            # attribute self.dummy_transferID (AttributeError)
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                        tmpRetVal = (errStat, errMsg)
                        return tmpRetVal
                    # add the files to the transfer document
                    ifile = 0
                    for fileSpec in fileSpecs:
                        # only log the first 25 files to keep the log readable
                        if ifile < 25:
                            msgStr = f"fileSpec.lfn - {fileSpec.lfn} fileSpec.scope - {fileSpec.scope}"
                            tmpLog.debug(msgStr)
                        if ifile == 25:
                            # bug fix: dropped a no-op .format() call on this literal
                            msgStr = "printed first 25 files skipping the rest"
                            tmpLog.debug(msgStr)
                        # rucio-style deterministic path from the md5 of "scope:lfn";
                        # the former python2 hashing branch was dead code (the file
                        # is f-string-only, hence python3) and has been removed
                        scope = "panda"
                        if fileSpec.scope is not None:
                            scope = fileSpec.scope
                        digest = hashlib.md5()
                        digest.update(bytes(f"{scope}:{fileSpec.lfn}", "utf-8"))
                        hash_hex = digest.hexdigest()
                        correctedscope = "/".join(scope.split("."))
                        srcURL = f"{self.Globus_srcPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                        dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                        if ifile < 25:
                            tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
                        tdata.add_item(srcURL, dstURL)
                        ifile += 1
                    # submit the transfer to Globus
                    tmpLog.debug(f"Number of files to transfer - {len(tdata['DATA'])}")
                    try:
                        transfer_result = self.tc.submit_transfer(tdata)
                        tmpLog.debug(str(transfer_result))
                        if transfer_result["code"] == "Accepted":
                            # re-key the file group from the dummy id to the real task id
                            transferID = transfer_result["task_id"]
                            tmpLog.debug(f"successfully submitted id={transferID}")
                            self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                            msgStr = f"submitted transfer with ID={transferID}"
                            tmpLog.debug(msgStr)
                        else:
                            # submission rejected - release the lock and report
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if release_db_lock:
                                tmpLog.debug(f"Released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                                have_db_lock = False
                            else:
                                errMsg = f"Could not release DB lock for {dummy_transferID}"
                                tmpLog.error(errMsg)
                            tmpRetVal = (None, transfer_result["message"])
                            return tmpRetVal
                    except Exception:
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if release_db_lock:
                            tmpLog.debug(f"Released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            have_db_lock = False
                        else:
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                        return errStat, errMsg
                else:
                    # not enough files pooled yet - release the lock and wait
                    msgStr = "wait until enough files are pooled"
                    tmpLog.debug(msgStr)
                    tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                    if release_db_lock:
                        tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        have_db_lock = False
                    else:
                        msgStr += f" - Could not release DB lock for {dummy_transferID}"
                        tmpLog.error(msgStr)
                    return None, msgStr
            # release the lock if we still hold it (e.g. the group was
            # re-keyed/removed on refresh, or submission succeeded above)
            if have_db_lock:
                tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                if release_db_lock:
                    tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    have_db_lock = False
                else:
                    msgStr += f" - Could not release DB lock for {dummy_transferID}"
                    tmpLog.error(msgStr)
                    return None, msgStr
        # second pass: check the state of the real Globus transfer tasks
        tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
        groups = jobspec.get_groups_of_input_files(skip_ready=True)
        tmpLog.debug(f"Number of transfer groups (skip_ready)- {len(groups)}")
        tmpLog.debug(f"transfer groups any state (skip_ready)- {groups}")
        tmpLog.debug("groups = jobspec.get_groups_of_input_files()")
        groups = jobspec.get_groups_of_input_files()
        tmpLog.debug(f"Number of transfer groups - {len(groups)}")
        tmpLog.debug(f"transfer groups any state - {groups}")
        tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
        groups = jobspec.get_groups_of_input_files(skip_ready=True)
        if len(groups) == 0:
            # nothing left to stage in
            tmpLog.debug("jobspec.get_groups_of_input_files(skip_ready=True) returned no files ")
            tmpLog.debug("check_stage_in_status return status - True ")
            return True, ""
        for transferID in groups:
            if validate_transferid(transferID):
                # ask Globus for the status of the transfer task
                tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, self.tc, transferID)
                if not tmpStat:
                    errStr = f"failed to get transfer task; tc = {str(self.tc)}; transferID = {str(transferID)}"
                    tmpLog.error(errStr)
                    return None, errStr
                if transferID not in transferTasks:
                    errStr = f"transfer task ID - {transferID} is missing"
                    tmpLog.error(errStr)
                    return None, errStr
                # succeeded: mark all input files finished
                if transferTasks[transferID]["status"] == "SUCCEEDED":
                    tmpLog.debug(f"transfer task {transferID} succeeded")
                    self.set_FileSpec_status(jobspec, "finished")
                    return True, ""
                # failed: mark all input files failed
                if transferTasks[transferID]["status"] == "FAILED":
                    errStr = f"transfer task {transferID} failed"
                    tmpLog.error(errStr)
                    self.set_FileSpec_status(jobspec, "failed")
                    return False, errStr
                # any other state: still in progress
                tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
                tmpLog.debug(tmpStr)
                return None, tmpStr
        tmpLog.debug("End of loop over transfers groups - ending check_stage_in_status function")
        return None, "no valid transfer id found"

    def trigger_preparation(self, jobspec):
        """Register the job's input files under the per-site dummy transfer group.

        No Globus transfer is submitted here; check_stage_in_status() submits
        it later once enough files have been pooled. Returns (True, '') on
        success, (False, msg) on error.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="trigger_preparation")
        tmpLog.debug("start")
        tmpRetVal = (True, "")
        # the computing site is needed for the per-site dummy transfer id
        if jobspec.computingSite is None:
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # bail out if the Globus client could not be created in __init__
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        # replace the placeholder dummy id with the per-site one on first use
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
        # attach every not-yet-ready input file to the dummy group as "pending"
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        lfns = list(inFiles.keys())
        tmpLog.debug(f"number of lfns - {len(lfns)} type(lfns) - {type(lfns)}")
        jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        # bug fix: log messages said "tranfer" and "first 25" while slicing [:10]
        if len(lfns) < 10:
            msgStr = f"jobspec.set_groups_to_files - self.dummy_transfer_id - {self.dummy_transfer_id}, lfns - {lfns}, groupStatus - pending"
        else:
            tmp_lfns = lfns[:10]
            msgStr = f"jobspec.set_groups_to_files - self.dummy_transfer_id - {self.dummy_transfer_id}, lfns (first 10) - {tmp_lfns}, groupStatus - pending"
        tmpLog.debug(msgStr)
        # persist the group in the DB
        fileSpec_list = jobspec.get_input_file_specs(self.dummy_transfer_id, skip_ready=True)
        tmpLog.debug(f"call jobspec.get_input_file_specs({self.dummy_transfer_id}, skip_ready=True) num files returned = {len(fileSpec_list)}")
        tmpLog.debug(
            "call self.dbInterface.set_file_group(jobspec.get_input_file_specs(self.dummy_transfer_id,skip_ready=True),self.dummy_transfer_id,pending)"
        )
        tmpStat = self.dbInterface.set_file_group(fileSpec_list, self.dummy_transfer_id, "pending")
        msgStr = "called self.dbInterface.set_file_group(jobspec.get_input_file_specs(self.dummy_transfer_id,skip_ready=True),self.dummy_transfer_id,pending) return Status {}".format(
            tmpStat
        )
        tmpLog.debug(msgStr)
        return True, ""

    def make_label(self, jobspec):
        """Build a human-readable Globus transfer label for *jobspec*."""
        return f"IN-{jobspec.computingSite}-{jobspec.PandaID}"

    def resolve_input_paths(self, jobspec):
        """Compute and store the local access path of every input file.

        Paths are built from the plugin base path plus the file's scope and
        LFN, then written back into *jobspec*. Returns (True, '').
        """
        inFiles = jobspec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0460