Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import datetime
0002 import uuid
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 
0006 from .base_stager import BaseStager
0007 
# Placeholder transfer (group) ID. trigger_stage_out() attaches it to the output
# files of a job, and check_stage_out_status() later replaces it with a real
# transfer ID once the pooled files are submitted.
dummy_transfer_id = "dummy_id_for_out"

# Module-level base logger; each method derives its own per-job logger from it
# through make_logger().
baseLogger = core_utils.setup_logger("dummy_bulk_stager")
0013 
0014 
0015 # dummy plugin for stager with bulk transfers. For JobSpec and DBInterface methods, see
0016 # https://github.com/PanDAWMS/panda-harvester/wiki/Utilities#file-grouping-for-file-transfers
class DummyBulkStager(BaseStager):
    """Dummy stager plugin demonstrating bulk (pooled) stage-out.

    trigger_stage_out() tags the output files of a job with the placeholder
    group ID ``dummy_transfer_id``.  check_stage_out_status() then pools the
    files carrying that placeholder across jobs and, once enough files have
    accumulated or the group is old enough, moves them to a group with a
    freshly generated transfer ID.  No real transfer is performed: the
    transfer ID is a random UUID and files are simply marked as finished.
    """

    # Lifetime, in seconds, requested for the object lock that serializes
    # handling of the placeholder group.
    LOCK_INTERVAL_SEC = 120
    # A bulk transfer is submitted as soon as at least this many files are pooled.
    MIN_FILES_TO_SUBMIT = 10
    # A bulk transfer is submitted once the group was last updated more than
    # this many minutes ago, regardless of how many files are pooled.
    MAX_POOLING_MINUTES = 10

    # constructor
    def __init__(self, **kwarg):
        """Initialize the plugin by forwarding all keyword arguments to BaseStager."""
        BaseStager.__init__(self, **kwarg)

    # check status
    def check_stage_out_status(self, jobspec):
        """Check the stage-out status of the output files of a job.

        While the job still has files in the placeholder group, this method
        tries to submit the pooled files as one bulk transfer and always asks
        the caller to retry later.  Otherwise all remaining output files are
        marked as finished.

        :param jobspec: job specification whose output files are checked
        :return: ``(None, message)`` when the check must be retried later
                 (lock held elsewhere, transfer just submitted, or still
                 pooling files); ``(True, "")`` when all files are finished
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # get transfer groups
        groups = jobspec.get_groups_of_output_files()
        # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests
        if dummy_transfer_id in groups:
            locked = self.dbInterface.get_object_lock(dummy_transfer_id, lock_interval=self.LOCK_INTERVAL_SEC)
            if not locked:
                # escape since locked by another thread
                msgStr = "escape since locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr
            # The lock is held from here on; release it in ``finally`` so that an
            # exception raised by any DB call below does not leave it held until
            # the lock interval expires.
            try:
                # refresh group information since that could have been updated by another thread before getting the lock
                self.dbInterface.refresh_file_group_info(jobspec)
                # get transfer groups again with refreshed info
                groups = jobspec.get_groups_of_output_files()
                # the dummy transfer ID is still there
                if dummy_transfer_id in groups:
                    groupUpdateTime = groups[dummy_transfer_id]["groupUpdateTime"]
                    # get files with the dummy transfer ID across jobs
                    fileSpecs = self.dbInterface.get_files_with_group_id(dummy_transfer_id)
                    # submit the transfer if enough files are pooled (MIN_FILES_TO_SUBMIT or more)
                    # or the group was last updated more than MAX_POOLING_MINUTES ago
                    if len(fileSpecs) >= self.MIN_FILES_TO_SUBMIT or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(
                        minutes=self.MAX_POOLING_MINUTES
                    ):
                        # submit transfer and get a real transfer ID
                        # ...
                        transferID = str(uuid.uuid4())
                        # set the real transfer ID
                        self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                        msgStr = f"submitted transfer with ID={transferID}"
                    else:
                        msgStr = "wait until enough files are pooled"
                    tmpLog.debug(msgStr)
                    # return None to retry later
                    return None, msgStr
            finally:
                # release the lock on every path, including errors
                self.dbInterface.release_object_lock(dummy_transfer_id)
        # check transfer with real transfer IDs
        # ...
        # then set file status if successful
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        tmpLog.debug("all finished")
        return True, ""

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Put the not-yet-done output files of a job into the placeholder group.

        The placeholder group ID is replaced with a real transfer ID in
        check_stage_out_status().

        :param jobspec: job specification whose output files are to be staged out
        :return: always ``(True, "")``
        """
        lfns = [fileSpec.lfn for fileSpec in jobspec.get_output_file_specs(skip_done=True)]
        jobspec.set_groups_to_files({dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        return True, ""

    # zip output files
    def zip_output(self, jobspec):
        """Zip the output files of a job by delegating to simple_zip_output().

        :param jobspec: job specification whose output files are zipped
        :return: whatever simple_zip_output() returns
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)