Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import datetime
0002 import hashlib
0003 import os
0004 import os.path
0005 import string
0006 import sys
0007 import tarfile
0008 import threading
0009 import time
0010 import uuid
0011 
0012 # TO BE REMOVED for python2.7
0013 import requests.packages.urllib3
0014 from globus_sdk import (
0015     NativeAppAuthClient,
0016     RefreshTokenAuthorizer,
0017     TransferClient,
0018     TransferData,
0019 )
0020 
0021 try:
0022     requests.packages.urllib3.disable_warnings()
0023 except BaseException:
0024     pass
0025 from pandaharvester.harvesterconfig import harvester_config
0026 from pandaharvester.harvestercore import core_utils
0027 from pandaharvester.harvestercore.plugin_base import PluginBase
0028 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0029 from pandaharvester.harvestermisc import globus_utils
0030 from pandaharvester.harvestermover import mover_utils
0031 from pandaharvester.harvesterstager.base_stager import BaseStager
0032 
# Define dummy transfer identifier
# Placeholder transfer-group ID used until a real Globus task ID exists;
# "_XXXX" is appended at construction and later replaced with the computing site name.
dummy_transfer_id_base = "dummy_id_for_out"
# lock to get a unique ID
# Serializes updates to the module-level counter uID across stager threads.
uLock = threading.Lock()

# number to get a unique ID
# Counter cycled modulo harvester_config.stager.nThreads (see GlobusBulkStager.__init__).
uID = 0

# logger
_logger = core_utils.setup_logger("go_bulk_stager")
0043 
0044 
def dump(obj):
    """Print every readable attribute of *obj* as ``obj.<name> = <value>`` (debug helper)."""
    for name in dir(obj):
        # hasattr() guards against properties that raise on access
        if not hasattr(obj, name):
            continue
        value = getattr(obj, name)
        print(f"obj.{name} = {value}")
0050 
def validate_transferid(transferid):
    """Return True when *transferid*, with dashes removed, is made up entirely of hex digits.

    Used to distinguish real Globus task IDs (UUID-like) from dummy group IDs.
    Note: an empty or all-dash string vacuously validates as True.
    """
    stripped = transferid.replace("-", "")
    for ch in stripped:
        if ch not in string.hexdigits:
            return False
    return True
0055 
0056 # Globus plugin for stager with bulk transfers. For JobSpec and DBInterface methods, see
0057 # https://github.com/PanDAWMS/panda-harvester/wiki/Utilities#file-grouping-for-file-transfers
class GlobusBulkStager(BaseStager):
    """Globus plugin for stage-out with bulk transfers.

    Output files are first grouped under a dummy transfer ID; once enough
    files are pooled (>= 10, or the group is older than 10 minutes) a single
    Globus transfer is submitted and the dummy ID is replaced by the real
    Globus task ID, which is then polled for completion.
    For JobSpec and DBInterface methods, see
    https://github.com/PanDAWMS/panda-harvester/wiki/Utilities#file-grouping-for-file-transfers
    """

    # class-level counter giving each instance a distinct self.id
    next_id = 0
    # constructor

    def __init__(self, **kwarg):
        """Set up per-instance state and create the Globus Transfer Client.

        The client credentials (client_id / refresh_token) are fetched from
        the PanDA server via the harvester cache mechanism ("globus_secret").
        On any failure self.tc is left as None; the stage-out methods check
        for that and return an error.
        """
        PluginBase.__init__(self, **kwarg)
        # make logger
        tmpLog = self.make_logger(_logger, f"ThreadID={threading.current_thread().ident}", method_name="GlobusBulkStager __init__ ")
        tmpLog.debug("start")
        self.EventServicejob = False
        self.pathConvention = None
        self.id = GlobusBulkStager.next_id
        self.changeFileStatusOnSuccess = True
        GlobusBulkStager.next_id += 1
        with uLock:
            global uID
            # self.dummy_transfer_id = '{0}_{1}_{2}'.format(dummy_transfer_id_base,self.id,int(round(time.time() * 1000)))
            # "_XXXX" is a placeholder; replaced with the computing site name on first use
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_XXXX"
            # NOTE(review): uID is incremented but not otherwise used here — presumably kept
            # for thread-slot bookkeeping; confirm before removing.
            uID += 1
            uID %= harvester_config.stager.nThreads
        # create Globus Transfer Client
        try:
            self.tc = None
            # need to get client_id and refresh_token from PanDA server via harvester cache mechanism
            tmpLog.debug("about to call dbInterface.get_cache(globus_secret)")
            c_data = self.dbInterface.get_cache("globus_secret")
            if (c_data is not None) and c_data.data["StatusCode"] == 0:
                tmpLog.debug("Got the globus_secrets from PanDA")
                self.client_id = c_data.data["publicKey"]  # client_id
                self.refresh_token = c_data.data["privateKey"]  # refresh_token
                tmpStat, self.tc = globus_utils.create_globus_transfer_client(tmpLog, self.client_id, self.refresh_token)
                if not tmpStat:
                    self.tc = None
                    errStr = "failed to create Globus Transfer Client"
                    tmpLog.error(errStr)
            else:
                self.client_id = None
                self.refresh_token = None
                self.tc = None
                errStr = "failed to get Globus Client ID and Refresh Token"
                tmpLog.error(errStr)
        except BaseException:
            # deliberately broad: any failure here just leaves self.tc = None
            core_utils.dump_error_message(tmpLog)
        tmpLog.debug("__init__ finish")

    # get dummy_transfer_id

    def get_dummy_transfer_id(self):
        """Return the current dummy transfer-group ID."""
        return self.dummy_transfer_id

    # set dummy_transfer_id for testing
    def set_dummy_transfer_id_testing(self, dummy_transfer_id):
        """Override the dummy transfer-group ID (test hook only)."""
        self.dummy_transfer_id = dummy_transfer_id

    # set FileSpec.objstoreID
    def set_FileSpec_objstoreID(self, jobspec, objstoreID, pathConvention):
        """Assign objstoreID and pathConvention to every output FileSpec of *jobspec*."""
        # loop over all output files
        for fileSpec in jobspec.outFiles:
            fileSpec.objstoreID = objstoreID
            fileSpec.pathConvention = pathConvention

    # set FileSpec.status
    def set_FileSpec_status(self, jobspec, status):
        """Set *status* on every output FileSpec of *jobspec*."""
        # loop over all output files
        for fileSpec in jobspec.outFiles:
            fileSpec.status = status

    # check status
    def check_stage_out_status(self, jobspec):
        """Check (and, when a group is ripe, submit) the bulk stage-out for *jobspec*.

        Returns the harvester convention (status, message):
          True  - transfer finished, file statuses updated
          False - permanent failure
          None  - in progress / retry later (also used when another thread holds the lock)

        Groups still carrying a dummy transfer ID are guarded by a DB object
        lock so that only one thread submits the Globus transfer; the exact
        order of lock acquisition/release on every return path is significant.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # default return
        tmpRetVal = (True, "")
        # check that jobspec.computingSite is defined
        if jobspec.computingSite is None:
            # not found
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # show the dummy transfer id and set to a value with the PandaID if needed.
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id  from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
        # set flag if have db lock
        have_db_lock = False
        # get the queueConfig and corresponding objStoreID_ES
        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        # check queueConfig stager section to see if jobtype is set
        if "jobtype" in queueConfig.stager:
            if queueConfig.stager["jobtype"] == "EventService":
                self.EventServicejob = True
                tmpLog.debug("Setting job type to EventService")
            # guard against old parameter in queue config
            if queueConfig.stager["jobtype"] == "Yoda":
                self.EventServicejob = True
                tmpLog.debug("Setting job type to EventService")
        # set the location of the files in fileSpec.objstoreID
        # see file /cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json
        self.objstoreID = int(queueConfig.stager["objStoreID_ES"])
        if self.EventServicejob:
            self.pathConvention = int(queueConfig.stager["pathConvention"])
            tmpLog.debug(f"EventService Job - PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID} pathConvention ={self.pathConvention}")
        else:
            self.pathConvention = None
            tmpLog.debug(f"PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID}")
        # test we have a Globus Transfer Client
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        # set transferID to None
        transferID = None
        # get the scope of the log files
        # NOTE(review): outfileattrib is fetched but never used below — confirm before removing.
        outfileattrib = jobspec.get_output_file_attributes()
        # get transfer groups
        groups = jobspec.get_groups_of_output_files()
        tmpLog.debug(f"jobspec.get_groups_of_output_files() = : {groups}")
        # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests
        for dummy_transferID in groups:
            # hex/UUID-like IDs are real Globus task IDs, handled in the second loop below
            if validate_transferid(dummy_transferID):
                continue
            # lock for 120 sec
            tmpLog.debug(f"attempt to set DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
            have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120)
            if not have_db_lock:
                # escape since locked by another thread
                msgStr = "escape since locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr
            # refresh group information since that could have been updated by another thread before getting the lock
            tmpLog.debug("self.dbInterface.refresh_file_group_info(jobspec)")
            self.dbInterface.refresh_file_group_info(jobspec)
            # get transfer groups again with refreshed info
            tmpLog.debug("After db refresh call groups=jobspec.get_groups_of_output_files()")
            groups = jobspec.get_groups_of_output_files()
            tmpLog.debug(f"jobspec.get_groups_of_output_files() = : {groups}")
            # the dummy transfer ID is still there
            if dummy_transferID in groups:
                groupUpdateTime = groups[dummy_transferID]["groupUpdateTime"]
                # get files with the dummy transfer ID across jobs
                fileSpecs = self.dbInterface.get_files_with_group_id(dummy_transferID)
                # submit transfer if there are more than 10 files or the group was made before more than 10 min
                msgStr = f"dummy_transferID = {dummy_transferID}  number of files = {len(fileSpecs)}"
                tmpLog.debug(msgStr)
                if len(fileSpecs) >= 10 or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
                    tmpLog.debug("prepare to transfer files")
                    # submit transfer and get a real transfer ID
                    # set the Globus destination Endpoint id and path will get them from Agis eventually
                    # self.Globus_srcPath = queueConfig.stager['Globus_srcPath']
                    self.srcEndpoint = queueConfig.stager["srcEndpoint"]
                    self.Globus_srcPath = self.basePath
                    self.Globus_dstPath = queueConfig.stager["Globus_dstPath"]
                    self.dstEndpoint = queueConfig.stager["dstEndpoint"]
                    # Test the endpoints and create the transfer data class
                    errMsg = None
                    try:
                        # Test endpoints for activation
                        tmpStatsrc, srcStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.srcEndpoint)
                        tmpStatdst, dstStr = globus_utils.check_endpoint_activation(tmpLog, self.tc, self.dstEndpoint)
                        if tmpStatsrc and tmpStatdst:
                            errStr = "source Endpoint and destination Endpoint activated"
                            tmpLog.debug(errStr)
                        else:
                            errMsg = ""
                            if not tmpStatsrc:
                                errMsg += " source Endpoint not activated "
                            if not tmpStatdst:
                                errMsg += " destination Endpoint not activated "
                            # release process lock
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            # NOTE(review): this path assigns the release result to self.have_db_lock
                            # (an attribute) while every other path uses the local release_db_lock —
                            # looks unintentional; confirm before changing.
                            self.have_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not self.have_db_lock:
                                errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                            tmpRetVal = (None, errMsg)
                            return tmpRetVal
                        # both endpoints activated now prepare to transfer data
                        tdata = None
                        # sync_level="checksum": Globus re-transfers only files whose checksum differs
                        tdata = TransferData(self.tc, self.srcEndpoint, self.dstEndpoint, sync_level="checksum")
                    except BaseException:
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        # release process lock
                        tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if not release_db_lock:
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                        tmpLog.error(errMsg)
                        tmpRetVal = (errStat, errMsg)
                        return tmpRetVal
                    # loop over all files
                    ifile = 0
                    for fileSpec in fileSpecs:
                        # protect against blank lfn's
                        if not fileSpec.lfn:
                            msgStr = "fileSpec.lfn is empty"
                            tmpLog.debug(msgStr)
                            continue
                        logfile = False
                        scope = "panda"
                        if fileSpec.scope is not None:
                            scope = fileSpec.scope
                        # NOTE: the scope of Raythena output files should not be "transient",
                        # so the special-case handling that used to be here was removed.
                        # only print to log file first 25 files
                        if ifile < 25:
                            msgStr = f"fileSpec.lfn - {fileSpec.lfn} fileSpec.scope - {fileSpec.scope}"
                            tmpLog.debug(msgStr)
                        if ifile == 25:
                            # NOTE(review): the .format() call here is a no-op leftover (no placeholders).
                            msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope)
                            tmpLog.debug(msgStr)
                        # md5 of "scope:lfn" determines the two-level hash directories of the destination path
                        # (NOTE(review): local name `hash` shadows the builtin)
                        hash = hashlib.md5()
                        # NOTE(review): the Python-2 branch below is dead on Python 3 runtimes.
                        if sys.version_info.major == 2:
                            hash.update(f"{scope}:{fileSpec.lfn}")
                        if sys.version_info.major == 3:
                            hash_string = f"{scope}:{fileSpec.lfn}"
                            hash.update(bytes(hash_string, "utf-8"))
                        hash_hex = hash.hexdigest()
                        # dotted scope becomes nested directories, e.g. "user.x" -> "user/x"
                        correctedscope = "/".join(scope.split("."))
                        srcURL = fileSpec.path
                        dstURL = f"{self.Globus_dstPath}/{correctedscope}/{hash_hex[0:2]}/{hash_hex[2:4]}/{fileSpec.lfn}"
                        if logfile:
                            tmpLog.debug(f"src={srcURL} dst={dstURL}")
                        if ifile < 25:
                            tmpLog.debug(f"src={srcURL} dst={dstURL}")
                        # add files to transfer object - tdata
                        if os.access(srcURL, os.R_OK):
                            if ifile < 25:
                                tmpLog.debug(f"tdata.add_item({srcURL},{dstURL})")
                            tdata.add_item(srcURL, dstURL)
                        else:
                            errMsg = f"source file {srcURL} does not exist"
                            # release process lock
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not release_db_lock:
                                errMsg += f" - Could not release DB lock for {dummy_transferID}"
                            tmpLog.error(errMsg)
                            tmpRetVal = (False, errMsg)
                            return tmpRetVal
                        ifile += 1
                    # submit transfer
                    tmpLog.debug(f"Number of files to transfer - {len(tdata['DATA'])}")
                    try:
                        transfer_result = self.tc.submit_transfer(tdata)
                        # check status code and message
                        tmpLog.debug(str(transfer_result))
                        if transfer_result["code"] == "Accepted":
                            # succeeded
                            # set transfer ID which are used for later lookup
                            transferID = transfer_result["task_id"]
                            tmpLog.debug(f"successfully submitted id={transferID}")
                            # set status for files
                            self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                            msgStr = f"submitted transfer with ID={transferID}"
                            tmpLog.debug(msgStr)
                        else:
                            # release process lock
                            tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                            release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                            if not release_db_lock:
                                errMsg = f"Could not release DB lock for {dummy_transferID}"
                                tmpLog.error(errMsg)
                            tmpRetVal = (None, transfer_result["message"])
                            return tmpRetVal
                    except Exception as e:
                        # NOTE(review): bound exception `e` is unused; details come from
                        # globus_utils.handle_globus_exception instead.
                        errStat, errMsg = globus_utils.handle_globus_exception(tmpLog)
                        # release process lock
                        tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                        release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                        if not release_db_lock:
                            errMsg += f" - Could not release DB lock for {dummy_transferID}"
                        tmpLog.error(errMsg)
                        return errStat, errMsg
                else:
                    msgStr = "wait until enough files are pooled"
                    tmpLog.debug(msgStr)
                # release the lock
                tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                if release_db_lock:
                    tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    have_db_lock = False
                else:
                    msgStr += f" - Could not release DB lock for {dummy_transferID}"
                    tmpLog.error(msgStr)
                # return None to retry later
                return None, msgStr
            # release the db lock if needed
            if have_db_lock:
                tmpLog.debug(f"attempt to release DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                release_db_lock = self.dbInterface.release_object_lock(dummy_transferID)
                if release_db_lock:
                    tmpLog.debug(f"released DB lock for self.id - {self.id} dummy_transferID - {dummy_transferID}")
                    have_db_lock = False
                else:
                    msgStr += f" - Could not release DB lock for {dummy_transferID}"
                    tmpLog.error(msgStr)
                    return None, msgStr
        # check transfer with real transfer IDs
        # get transfer groups
        tmpLog.debug("groups = jobspec.get_groups_of_output_files()")
        groups = jobspec.get_groups_of_output_files()
        tmpLog.debug(f"Number of transfer groups - {len(groups)}")
        tmpLog.debug(f"transfer groups any state - {groups}")
        if len(groups) == 0:
            tmpLog.debug("jobspec.get_groups_of_output_files(skip_done=True) returned no files ")
            tmpLog.debug("check_stage_out_status return status - True ")
            return True, ""

        for transferID in groups:
            # allow only valid UUID
            if validate_transferid(transferID):
                # get transfer task
                tmpStat, transferTasks = globus_utils.get_transfer_task_by_id(tmpLog, self.tc, transferID)
                # return a temporary error when failed to get task
                if not tmpStat:
                    errStr = f"failed to get transfer task; tc = {str(self.tc)}; transferID = {str(transferID)}"
                    tmpLog.error(errStr)
                    return None, errStr
                # return a temporary error when task is missing
                if transferID not in transferTasks:
                    errStr = f"transfer task ID - {transferID} is missing"
                    tmpLog.error(errStr)
                    return None, errStr
                # succeeded in finding a transfer task by tranferID
                if transferTasks[transferID]["status"] == "SUCCEEDED":
                    tmpLog.debug(f"transfer task {transferID} succeeded")
                    self.set_FileSpec_objstoreID(jobspec, self.objstoreID, self.pathConvention)
                    if self.changeFileStatusOnSuccess:
                        self.set_FileSpec_status(jobspec, "finished")
                    return True, ""
                # failed
                if transferTasks[transferID]["status"] == "FAILED":
                    errStr = f"transfer task {transferID} failed"
                    tmpLog.error(errStr)
                    self.set_FileSpec_status(jobspec, "failed")
                    return False, errStr
                # another status
                tmpStr = f"transfer task {transferID} status: {transferTasks[transferID]['status']}"
                tmpLog.debug(tmpStr)
                return None, ""
        # end of loop over transfer groups
        tmpLog.debug("End of loop over transfers groups - ending check_stage_out_status function")
        return None, "no valid transfer id found"

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Register this job's output files under the (dummy) transfer group.

        No transfer is submitted here; files are marked "pending" under the
        dummy transfer ID and check_stage_out_status() later submits the bulk
        Globus transfer. Always returns (True, "") unless computingSite is
        undefined or the Globus client is unavailable.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}  ThreadID={threading.current_thread().ident}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        # default return
        tmpRetVal = (True, "")
        # check that jobspec.computingSite is defined
        if jobspec.computingSite is None:
            # not found
            tmpLog.error("jobspec.computingSite is not defined")
            return False, "jobspec.computingSite is not defined"
        else:
            tmpLog.debug(f"jobspec.computingSite : {jobspec.computingSite}")
        # test we have a Globus Transfer Client
        if not self.tc:
            errStr = "failed to get Globus Transfer Client"
            tmpLog.error(errStr)
            return False, errStr
        # show the dummy transfer id and set to a value with the PandaID if needed.
        tmpLog.debug(f"self.dummy_transfer_id = {self.dummy_transfer_id}")
        if self.dummy_transfer_id == f"{dummy_transfer_id_base}_XXXX":
            old_dummy_transfer_id = self.dummy_transfer_id
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{jobspec.computingSite}"
            tmpLog.debug(f"Change self.dummy_transfer_id  from {old_dummy_transfer_id} to {self.dummy_transfer_id}")
        # set the dummy transfer ID which will be replaced with a real ID in check_stage_out_status()
        lfns = []
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            # test if fileSpec.lfn is not empty
            if not fileSpec.lfn:
                msgStr = "fileSpec.lfn is empty"
            else:
                msgStr = f"fileSpec.lfn is {fileSpec.lfn}"
                lfns.append(fileSpec.lfn)
            tmpLog.debug(msgStr)
        jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        msgStr = f"jobspec.set_groups_to_files - self.dummy_tranfer_id - {self.dummy_transfer_id}, lfns - {lfns}, groupStatus - pending"
        tmpLog.debug(msgStr)
        tmpLog.debug("call self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True),self.dummy_transfer_id,pending)")
        # NOTE(review): tmpStat is not checked — a set_file_group failure is silently ignored.
        tmpStat = self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True), self.dummy_transfer_id, "pending")
        tmpLog.debug("called self.dbInterface.set_file_group(jobspec.get_output_file_specs(skip_done=True),self.dummy_transfer_id,pending)")
        return True, ""

    # use tar despite name for  output files
    def zip_output(self, jobspec):
        """Bundle output files (tar despite the name) via BaseStager.simple_zip_output."""
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID} ThreadID={threading.current_thread().ident}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)

    # make label for transfer task
    def make_label(self, jobspec):
        """Build the human-readable Globus task label for *jobspec*."""
        return f"OUT-{jobspec.computingSite}-{jobspec.PandaID}"

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set the local path of each input file under self.basePath; returns (True, "")."""
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = mover_utils.construct_file_path(self.basePath, inFile["scope"], inLFN)
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0470         return True, ""