Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import datetime
0002 import threading
0003 import uuid
0004 
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 
# lock guarding the shared counter used to hand out dummy transfer IDs
uLock = threading.Lock()

# shared counter whose current value is appended to the dummy transfer ID base
uID = 0

# module-level logger
_logger = core_utils.setup_logger("dummy_bulk_preparator")

# common prefix of the dummy (placeholder) transfer identifiers
dummy_transfer_id_base = "dummy_id_for_in"
0020 
0021 
0022 # dummy plugin for preparator with bulk transfers. For JobSpec and DBInterface methods, see
0023 # https://github.com/PanDAWMS/panda-harvester/wiki/Utilities#file-grouping-for-file-transfers
class DummyBulkPreparator(PluginBase):
    """Dummy preparator plugin illustrating bulk (grouped) stage-in transfers.

    Input files are first attached to a dummy transfer ID. Once enough files
    are pooled under that ID, or the group is old enough,
    ``check_stage_in_status`` replaces the dummy ID with a real transfer ID.
    """

    # constructor
    def __init__(self, **kwarg):
        """Initialize the plugin and pick a dummy transfer ID for this instance.

        The ID is built from ``dummy_transfer_id_base`` and the module-level
        counter ``uID``, which is advanced under ``uLock`` and wrapped at
        ``harvester_config.preparator.nThreads``.
        """
        PluginBase.__init__(self, **kwarg)
        with uLock:
            global uID
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{uID}"
            uID += 1
            uID %= harvester_config.preparator.nThreads

    # trigger preparation
    def trigger_preparation(self, jobspec):
        """Attach the not-yet-ready input files of the job to the dummy transfer ID.

        The dummy ID is replaced with a real one in ``check_stage_in_status()``.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (always True here) and error dialog
        :rtype: (bool, string)
        """
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        # materialize the LFNs (the dict keys) as a real list; a dict keys view
        # has no append() and must not be handed over as a mutable list
        lfns = list(inFiles)
        jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        return True, ""

    # check status
    def check_stage_in_status(self, jobspec):
        """Check the stage-in status of the job, submitting a bulk transfer when due.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code and message. The code is None when the
                 check has to be retried later (lock held by another thread, or
                 files still attached to the dummy transfer ID) and True once
                 the groups have been marked as done.
        :rtype: (bool or None, string)
        """
        # get groups of input files except ones already in ready state
        groups = jobspec.get_groups_of_input_files(skip_ready=True)
        # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests
        if self.dummy_transfer_id in groups:
            # lock for 120 sec
            locked = self.dbInterface.get_object_lock(self.dummy_transfer_id, lock_interval=120)
            if not locked:
                # escape since locked by another thread
                msgStr = "escape since locked by another thread"
                return None, msgStr
            try:
                # refresh group information since that could have been updated by another thread
                # before getting the lock
                self.dbInterface.refresh_file_group_info(jobspec)
                # get transfer groups again with refreshed info
                groups = jobspec.get_groups_of_input_files(skip_ready=True)
                # the dummy transfer ID is still there
                if self.dummy_transfer_id in groups:
                    groupUpdateTime = groups[self.dummy_transfer_id]["groupUpdateTime"]
                    # get files with the dummy transfer ID across jobs
                    fileSpecs = self.dbInterface.get_files_with_group_id(self.dummy_transfer_id)
                    # submit transfer if there are more than 10 files or the group was made before
                    # more than 10 min. those thresholds may be config params.
                    timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=10)
                    if len(fileSpecs) >= 10 or groupUpdateTime < timeLimit:
                        # submit transfer and get a real transfer ID
                        # ...
                        transferID = str(uuid.uuid4())
                        # set the real transfer ID
                        self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                        msgStr = f"real transfer submitted with ID={transferID}"
                    else:
                        msgStr = f"wait until enough files are pooled with {self.dummy_transfer_id}"
                    # return None to retry later
                    return None, msgStr
            finally:
                # release the lock on every exit path, including exceptions, so a failure
                # does not keep the dummy transfer ID locked until the lock interval expires
                self.dbInterface.release_object_lock(self.dummy_transfer_id)
        # check transfer with real transfer IDs
        # ...
        # then update transfer status if successful
        for transferID in groups:
            jobspec.update_group_status_in_files(transferID, "done")
        return True, ""

    # resolve input file paths
    def resolve_input_paths(self, jobspec):
        """Set input file paths to jobspec.get_input_file_attributes[LFN]['path'] for the job.
        New input file attributes need to be set to jobspec using jobspec.set_input_file_paths()
        after setting the file paths.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # get input files
        inFiles = jobspec.get_input_file_attributes()
        # set path to each file
        for inLFN, inFile in inFiles.items():
            inFile["path"] = f"dummypath/{inLFN}"
        # set
        jobspec.set_input_file_paths(inFiles)
        return True, ""