Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import datetime
0002 import hashlib
0003 import os
0004 import os.path
0005 import string
0006 import sys
0007 import threading
0008 import time
0009 import uuid
0010 import zipfile
0011 
0012 # TO BE REMOVED for python2.7
0013 import requests.packages.urllib3
0014 from globus_sdk import (
0015     NativeAppAuthClient,
0016     RefreshTokenAuthorizer,
0017     TransferClient,
0018     TransferData,
0019 )
0020 
0021 from pandaharvester.harvestermisc import globus_utils
0022 
# Silence urllib3 insecure-request warnings. Best-effort: the
# requests.packages.urllib3 alias is not present in every requests build,
# so any failure here is deliberately ignored.
try:
    requests.packages.urllib3.disable_warnings()
except BaseException:
    pass
0027 from pandaharvester.harvesterconfig import harvester_config
0028 from pandaharvester.harvestercore import core_utils
0029 from pandaharvester.harvestercore.plugin_base import PluginBase
0030 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0031 from pandaharvester.harvestermover import mover_utils
0032 
# Define dummy transfer identifier
# Placeholder transfer-group ID prefix; a per-site suffix is appended later and
# the placeholder is eventually replaced by a real Globus task ID.
dummy_transfer_id_base = "dummy_id_for_in"
# lock to get a unique ID
uLock = threading.Lock()

# number to get a unique ID
# Incremented (mod nThreads) under uLock in each constructor.
uID = 0

# logger
_logger = core_utils.setup_logger("go_bulk_preparator")
0043 
0044 
def validate_transferid(transferid):
    """Return True if transferid looks like a real Globus transfer (task) ID.

    Real task IDs are UUID-like hex strings (dashes allowed). Placeholder
    dummy IDs such as "dummy_id_for_in_<site>" contain non-hex characters
    and are rejected. Empty or dash-only values are also rejected, since
    they cannot identify a transfer task.

    :param transferid: candidate transfer ID string
    :return: True when every non-dash character is a hex digit and at least
             one such character exists, False otherwise
    """
    if not transferid:
        # empty input can never be a usable transfer ID
        return False
    stripped = transferid.replace("-", "")
    # require at least one hex digit so "----" does not validate
    return bool(stripped) and all(c in string.hexdigits for c in stripped)
0048 
0049 
def dump(obj):
    """Print every readable attribute of *obj* as ``obj.<name> = <value>``."""
    for name in dir(obj):
        try:
            value = getattr(obj, name)
        except AttributeError:
            # attribute listed by dir() but not actually readable; skip it
            continue
        print(f"obj.{name} = {value}")
0054 
0055 
0056 # Globus plugin for stager with bulk transfers. For JobSpec and DBInterface methods, see
0057 # https://github.com/PanDAWMS/panda-harvester/wiki/Utilities#file-grouping-for-file-transfers
0058 class GlobusBulkPreparator(PluginBase):
0059     next_id = 0
0060     # constructor
0061 
    def __init__(self, **kwarg):
        """Initialize the preparator plugin.

        Assigns a unique instance ID, sets the dummy transfer-group ID
        placeholder, and builds a Globus TransferClient from credentials
        cached on the PanDA server (fetched through the harvester DB cache).
        On any failure self.tc is left as None and the error is logged.

        :param kwarg: plugin configuration attributes forwarded to PluginBase
        """
        PluginBase.__init__(self, **kwarg)
        # make logger
        tmpLog = self.make_logger(_logger, f"ThreadID={threading.current_thread().ident}", method_name="GlobusBulkPreparator __init__ {} ")
        tmpLog.debug("__init__ start")
        self.thread_id = threading.current_thread().ident
        # per-instance sequential ID taken from the class counter
        self.id = GlobusBulkPreparator.next_id
        GlobusBulkPreparator.next_id += 1
        with uLock:
            global uID
            # placeholder; the "_XXXX" suffix is replaced with the computing
            # site name in trigger_preparation()/check_stage_in_status()
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_XXXX"
            uID += 1
            uID %= harvester_config.preparator.nThreads
        # create Globus Transfer Client
        try:
            self.tc = None
            # need to get client_id and refresh_token from PanDA server via harvester cache mechanism
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]  # client_id
                self.refresh_token = c_data.data["privateKey"]  # refresh_token
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    # client creation failed; leave tc unset so callers bail out
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                # no usable secrets in the cache; run without a client
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # constructor must not raise; log the error and continue with tc=None
            core_utils.dump_error_message(tmpLog)
        # tmp debugging
        tmpLog.debug(f"self.id = {self.id}")
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        # tmp debugging
        tmpLog.debug("__init__ finish")
0103 
0104     # get dummy_transfer_id
0105 
0106     def get_dummy_transfer_id(self):
0107         return self.dummy_transfer_id
0108 
0109     # set dummy_transfer_id for testing
0110     def set_dummy_transfer_id_testing(self, dummy_transfer_id):
0111         self.dummy_transfer_id = dummy_transfer_id
0112 
0113     # set FileSpec.status
0114     def set_FileSpec_status(self, jobspec, status):
0115         # loop over all input files
0116         for fileSpec in jobspec.inFiles:
0117             fileSpec.status = status
0118 
0119     # check status
0120     def check_stage_in_status(self, jobspec):
0121         # make logger
0122         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="check_stage_in_status")
0123         tmpLog.debug("start")
0124         # check that jobspec.computingSite is defined
0125         if jobspec.computingSite is None:
0126             # not found
0127             tmpLog.error("jobspec.computingSite is not defined")
0128             return False, "jobspec.computingSite is not defined"
0129         else:
0130             tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
0131         # show the dummy transfer id and set to a value with the jobspec.computingSite if needed.
0132         tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
0133         if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
0134             old_dummy_transfer_id = self.dummy_transfer_id
0135             self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
0136             tmpLog.debug(f"Change self.dummy_transfer_id  from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
0137 
0138         # default return
0139         tmpRetVal = (True, "")
0140         # set flag if have db lock
0141         have_db_lock = False
0142         queueConfigMapper = QueueConfigMapper()
0143         queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
0144         # test we have a Globus Transfer Client
0145         if not self.tc:
0146             errStr = "failed to get Globus Transfer Client"
0147             tmpLog.error(errStr)
0148             return False, errStr
0149         # set transferID to None
0150         transferID = None
0151         # get transfer groups
0152         groups = jobspec.get_groups_of_input_files(skip_ready=True)
0153         tmpLog.debug(f"jobspec.get_groups_of_input_files() = : {groups}")
0154         # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests
0155         for dummy_transferID in groups:
0156             # skip if valid transfer ID not dummy one
0157             if validate_transferid(dummy_transferID):
0158                 continue
0159             # lock for 120 sec
0160             tmpLog.debug(
0161                 f"attempt to set DB lock for self.id - {self.id} self.dummy_transfer_id - {self.dummy_transfer_id}, dummy_transferID - {dummy_transferID}"
0162             )
0163             have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120)
0164             tmpLog.debug(f" DB lock result - {have_db_lock}")
0165             if not have_db_lock:
0166                 # escape since locked by another thread
0167                 msgStr = "escape since locked by another thread"
0168                 tmpLog.debug(msgStr)
0169                 return None, msgStr
0170             # refresh group information since that could have been updated by another thread before getting the lock
0171             tmpLog.debug("self.dbInterface.refresh_file_group_info(jobspec)")
0172             self.dbInterface.refresh_file_group_info(jobspec)
0173             tmpLog.debug("after self.dbInterface.refresh_file_group_info(jobspec)")
0174             # get transfer groups again with refreshed info
0175             tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
0176             groups = jobspec.get_groups_of_input_files(skip_ready=True)
0177             tmpLog.debug(f"after db lock and refresh - jobspec.get_groups_of_input_files(skip_ready=True) = : {groups}")
0178             # the dummy transfer ID is still there
0179             if dummy_transferID in groups:
0180                 groupUpdateTime = groups[dummy_transferID]["groupUpdateTime"]
0181                 # get files with the dummy transfer ID across jobs
0182                 fileSpecs_allgroups = self.dbInterface.get_files_with_group_id(dummy_transferID)
0183                 msgStr = "dummy_transferID = {0} self.dbInterface.get_files_with_group_id(dummy_transferID)  number of files = {1}".format(
0184                     dummy_transferID, len(fileSpecs_allgroups)
0185                 )
0186                 tmpLog.debug(msgStr)
0187                 fileSpecs = jobspec.get_input_file_specs(dummy_transferID, skip_ready=True)
0188                 msgStr = "dummy_transferID = {0} jobspec.get_input_file_specs(dummy_transferID,skip_ready=True)  number of files = {1}".format(
0189                     dummy_transferID, len(fileSpecs)
0190                 )
0191                 tmpLog.debug(msgStr)
0192                 # submit transfer if there are more than 10 files or the group was made before more than 10 min
0193                 if len(fileSpecs) >= 10 or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
0194                     tmpLog.debug("prepare to transfer files")
0195                     # submit transfer and get a real transfer ID
0196                     # set the Globus destination Endpoint id and path will get them from Agis eventually
0197                     self.Globus_srcPath = queueConfig.preparator["Globus_srcPath"]
0198                     self.srcEndpoint = queueConfig.preparator["srcEndpoint"]
0199                     self.Globus_dstPath = self.basePath
0200                     # self.Globus_dstPath = queueConfig.preparator['Globus_dstPath']
0201                     self.dstEndpoint = queueConfig.preparator["dstEndpoint"]
0202                     # Test the endpoints and create the transfer data class
0203                     errMsg = None
0204                     try:
0205                         # Test endpoints for activation
0206                         tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
0207                         tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
0208                         if tmpStatsrc and tmpStatdst:
0209                             errStr = "source Endpoint and destination Endpoint activated"
0210                             tmpLog.debug(errStr)
0211                         else:
0212                             errMsg = ""
0213                             if not tmpStatsrc:
0214                                 errMsg += " source Endpoint not activated "
0215                             if not tmpStatdst:
0216                                 errMsg += " destination Endpoint not activated "
0217                             # release process lock
0218                             tmpLog.debug(
0219                                 "attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, dummy_transferID - {2}".format(
0220                                     self.id, self.dummy_transfer_id, dummy_transferID
0221                                 )
0222                             )
0223                             have_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0224                             if not have_db_lock:
0225                                 errMsg += f" - Could not release DB lock for {dummy_transferID}"
0226                             tmpLog.error(errMsg)
0227                             tmpRetVal = (None, errMsg)
0228                             return tmpRetVal
0229                         # both endpoints activated now prepare to transfer data
0230                         tdata = None
0231                         tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, sync_level="exists")
0232                         #                                             sync_level="checksum")
0233                         tmpLog.debug(f"size of tdata[DATA] - {len(tdata['DATA'])}")
0234 
0235                     except BaseException:
0236                         errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
0237                         # release process lock
0238                         tmpLog.debug(
0239                             "attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, dummy_transferID - {2}".format(
0240                                 self.id, self.dummy_transfer_id, dummy_transferID
0241                             )
0242                         )
0243                         release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0244                         if not release_db_lock:
0245                             errMsg += f" - Could not release DB lock for {self.dummy_transferID}"
0246                         tmpLog.error(errMsg)
0247                         tmpRetVal = (errStat, errMsg)
0248                         return tmpRetVal
0249                     # loop over all files
0250                     ifile = 0
0251                     for fileSpec in fileSpecs:
0252                         # only print to log file first 25 files
0253                         if ifile < 25:
0254                             msgStr = f"fileSpec.lfn - {fileSpec.lfn} fileSpec.scope - {fileSpec.scope}"
0255                             tmpLog.debug(msgStr)
0256                         if ifile == 25:
0257                             msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope)
0258                             tmpLog.debug(msgStr)
0259                         # end debug log file test
0260                         scope = "panda"
0261                         if fileSpec.scope is not None:
0262                             scope = fileSpec.scope
0263                         hash = hashlib.md5()
0264                         if sys.version_info.major == 2:
0265                             hash.update(f"{scope}:{fileSpec.lfn}")
0266                         if sys.version_info.major == 3:
0267                             hash_string = f"{scope}:{fileSpec.lfn}"
0268                             hash.update(bytes(hash_string, "utf-8"))
0269                         hash_hex = hash.hexdigest()
0270                         correctedscope = "/".join(scope.split("."))
0271                         # srcURL = fileSpec.path
0272                         srcURL = f"{self.Globus_srcPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
0273                         dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
0274                         # add files to transfer object - tdata
0275                         if ifile < 25:
0276                             tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
0277                         tdata.add_item(srcURL, dstURL)
0278                         ifile += 1
0279                     # submit transfer
0280                     tmpLog.debug(f"Number of files to transfer - {len(tdata['DATA'])}")
0281                     try:
0282                         transfer_result = self.tc.submit_transfer(tdata)
0283                         # check status code and message
0284                         tmpLog.debug(str(transfer_result))
0285                         if transfer_result["code"] == "Accepted":
0286                             # succeeded
0287                             # set transfer ID which are used for later lookup
0288                             transferID = transfer_result["task_id"]
0289                             tmpLog.debug(f"successfully submitted id={transferID}")
0290                             # set status for files
0291                             self.dbInterface.set_file_group(fileSpecs, transferID, "running")
0292                             msgStr = f"submitted transfer with ID={transferID}"
0293                             tmpLog.debug(msgStr)
0294                         else:
0295                             # release process lock
0296                             tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0297                             release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0298                             if release_db_lock:
0299                                 tmpLog.debug(f"Released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0300                                 have_db_lock = False
0301                             else:
0302                                 errMsg = f"Could not release DB lock for {dummy_transferID}"
0303                                 tmpLog.error(errMsg)
0304                             tmpRetVal = (None, transfer_result["message"])
0305                             return tmpRetVal
0306                     except Exception as e:
0307                         errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
0308                         # release process lock
0309                         tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0310                         release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0311                         if release_db_lock:
0312                             tmpLog.debug(f"Released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0313                             have_db_lock = False
0314                         else:
0315                             errMsg += f" - Could not release DB lock for {dummy_transferID}"
0316                         tmpLog.error(errMsg)
0317                         return errStat, errMsg
0318                 else:
0319                     msgStr = "wait until enough files are pooled"
0320                     tmpLog.debug(msgStr)
0321                 # release the lock
0322                 tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0323                 release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0324                 if release_db_lock:
0325                     tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0326                     have_db_lock = False
0327                 else:
0328                     msgStr += f" - Could not release DB lock for {dummy_transferID}"
0329                     tmpLog.error(msgStr)
0330                 # return None to retry later
0331                 return None, msgStr
0332             # release the db lock if needed
0333             if have_db_lock:
0334                 tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0335                 release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
0336                 if release_db_lock:
0337                     tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
0338                     have_db_lock = False
0339                 else:
0340                     msgStr += f" - Could not release DB lock for {dummy_transferID}"
0341                     tmpLog.error(msgStr)
0342                     return None, msgStr
0343         # check transfer with real transfer IDs
0344         # get transfer groups
0345         tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
0346         groups = jobspec.get_groups_of_input_files(skip_ready=True)
0347         tmpLog.debug(f"Number of transfer groups (skip_ready)- {len(groups)}")
0348         tmpLog.debug(f"transfer groups any state (skip_ready)- {groups}")
0349         tmpLog.debug("groups = jobspec.get_groups_of_input_files()")
0350         groups = jobspec.get_groups_of_input_files()
0351         tmpLog.debug(f"Number of transfer groups - {len(groups)}")
0352         tmpLog.debug(f"transfer groups any state - {groups}")
0353         tmpLog.debug("groups = jobspec.get_groups_of_input_files(skip_ready=True)")
0354         groups = jobspec.get_groups_of_input_files(skip_ready=True)
0355         if len(groups) == 0:
0356             tmpLog.debug("jobspec.get_groups_of_input_files(skip_ready=True) returned no files ")
0357             tmpLog.debug("check_stage_in_status return status - True ")
0358             return True, ""
0359         for transferID in groups:
0360             # allow only valid UUID
0361             if validate_transferid(transferID):
0362                 # get transfer task
0363                 tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, self.tc, transferID)
0364                 # return a temporary error when failed to get task
0365                 if not tmpStat:
0366                     errStr = f"failed to get transfer task; tc = {str(self.tc)}; transferID = {str(transferID)}"
0367                     tmpLog.error(errStr)
0368                     return None, errStr
0369                 # return a temporary error when task is missing
0370                 if transferID not in transferTasks:
0371                     errStr = f"transfer task ID - {transferID} is missing"
0372                     tmpLog.error(errStr)
0373                     return None, errStr
0374                 # succeeded in finding a transfer task by tranferID
0375                 if transferTasks[transferID]["status"] == "SUCCEEDED":
0376                     tmpLog.debug(f"transfer task {transferID} succeeded")
0377                     self.set_FileSpec_status(jobspec, "finished")
0378                     return True, ""
0379                 # failed
0380                 if transferTasks[transferID]["status"] == "FAILED":
0381                     errStr = f"transfer task {transferID} failed"
0382                     tmpLog.error(errStr)
0383                     self.set_FileSpec_status(jobspec, "failed")
0384                     return False, errStr
0385                 # another status
0386                 tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
0387                 tmpLog.debug(tmpStr)
0388                 return None, tmpStr
0389         # end of loop over transfer groups
0390         tmpLog.debug("End of loop over transfers groups - ending check_stage_in_status function")
0391         return None, "no valid transfer id found"
0392 
0393     # trigger preparation
0394 
0395     def trigger_preparation(self, jobspec):
0396         # make logger
0397         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="trigger_preparation")
0398         tmpLog.debug("start")
0399         # default return
0400         tmpRetVal = (True, "")
0401         # check that jobspec.computingSite is defined
0402         if jobspec.computingSite is None:
0403             # not found
0404             tmpLog.error("jobspec.computingSite is not defined")
0405             return False, "jobspec.computingSite is not defined"
0406         else:
0407             tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
0408         # test we have a Globus Transfer Client
0409         if not self.tc:
0410             errStr = "failed to get Globus Transfer Client"
0411             tmpLog.error(errStr)
0412             return False, errStr
0413         # show the dummy transfer id and set to a value with the computingSite if needed.
0414         tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
0415         if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
0416             old_dummy_transfer_id = self.dummy_transfer_id
0417             self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
0418             tmpLog.debug(f"Change self.dummy_transfer_id  from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
0419         # set the dummy transfer ID which will be replaced with a real ID in check_stage_in_status()
0420         inFiles = jobspec.get_input_file_attributes(skip_ready=True)
0421         lfns = list(inFiles.keys())
0422         # for inLFN in inFiles.keys():
0423         #    lfns.append(inLFN)
0424         tmpLog.debug(f"number of lfns - {len(lfns)} type(lfns) - {type(lfns)}")
0425         jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
0426         if len(lfns) < 10:
0427             msgStr = f"jobspec.set_groups_to_files - self.dummy_tranfer_id - {self.dummy_transfer_id}, lfns - {lfns}, groupStatus - pending"
0428         else:
0429             tmp_lfns = lfns[:10]
0430             msgStr = f"jobspec.set_groups_to_files - self.dummy_tranfer_id - {self.dummy_transfer_id}, lfns (first 25) - {tmp_lfns}, groupStatus - pending"
0431         tmpLog.debug(msgStr)
0432         fileSpec_list = jobspec.get_input_file_specs(self.dummy_transfer_id, skip_ready=True)
0433         tmpLog.debug(f"call jobspec.get_input_file_specs({self.dummy_transfer_id}, skip_ready=True) num files returned = {len(fileSpec_list)}")
0434         tmpLog.debug(
0435             "call self.dbInterface.set_file_group(jobspec.get_input_file_specs(self.dummy_transfer_id,skip_ready=True),self.dummy_transfer_id,pending)"
0436         )
0437         tmpStat = self.dbInterface.set_file_group(fileSpec_list, self.dummy_transfer_id, "pending")
0438         msgStr = "called self.dbInterface.set_file_group(jobspec.get_input_file_specs(self.dummy_transfer_id,skip_ready=True),self.dummy_transfer_id,pending) return Status {}".format(
0439             tmpStat
0440         )
0441         tmpLog.debug(msgStr)
0442         return True, ""
0443 
0444     # make label for transfer task
0445 
0446     def make_label(self, jobspec):
0447         return f"IN-{jobspec.computingSite}-{jobspec.PandaID}"
0448 
0449     # resolve input file paths
0450     def resolve_input_paths(self, jobspec):
0451         # get input files
0452         inFiles = jobspec.get_input_file_attributes()
0453         # set path to each file
0454         for inLFN, inFile in inFiles.items():
0455             inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
0456         # set
0457         jobspec.set_input_file_paths(inFiles)
0458         return True, ""
0459 
0460     # Globus specific commands