# File indexing completed on 2026-04-20 07:59:00
0001 import datetime
0002 import uuid
0003
0004 from pandaharvester.harvestercore import core_utils
0005
0006 from .base_stager import BaseStager
0007
0008
0009 dummy_transfer_id = "dummy_id_for_out"
0010
0011
0012 baseLogger = core_utils.setup_logger("dummy_bulk_stager")
0013
0014
0015
0016
class DummyBulkStager(BaseStager):
    """Dummy stager that handles output files in bulk.

    Output files are first pooled under the shared ``dummy_transfer_id``
    group by :meth:`trigger_stage_out`.  When enough files are pooled (or
    the group is old enough), :meth:`check_stage_out_status` assigns them a
    real transfer ID and marks the group as running; on a later call, once
    the dummy group is gone, all files are marked finished.  No actual data
    movement is performed.
    """

    # constructor
    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)

    # check status
    def check_stage_out_status(self, jobspec):
        """Check bulk stage-out status for one job.

        :param jobspec: job specification whose output files are checked
        :return: (True, '') when all output files are finished;
                 (None, message) while files are still being pooled or
                 another thread holds the bundling lock
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # files still grouped under the dummy ID have not been bundled
        # into a real transfer yet
        groups = jobspec.get_groups_of_output_files()
        if dummy_transfer_id in groups:
            # take the shared lock so only one thread bundles files at a time
            locked = self.dbInterface.get_object_lock(dummy_transfer_id, lock_interval=120)
            if not locked:
                msgStr = "escape since locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr
            # NOTE(fix): release the lock even when a DB call raises; the
            # previous code leaked the lock on exception, blocking other
            # threads until lock_interval expired
            try:
                # another thread may have updated the group while we waited
                self.dbInterface.refresh_file_group_info(jobspec)
                groups = jobspec.get_groups_of_output_files()
                if dummy_transfer_id in groups:
                    groupUpdateTime = groups[dummy_transfer_id]["groupUpdateTime"]
                    fileSpecs = self.dbInterface.get_files_with_group_id(dummy_transfer_id)
                    # submit once 10 files are pooled, or the group has been
                    # idle for more than 10 minutes
                    if len(fileSpecs) >= 10 or groupUpdateTime < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
                        # replace the dummy group ID with a real transfer ID
                        transferID = str(uuid.uuid4())
                        self.dbInterface.set_file_group(fileSpecs, transferID, "running")
                        msgStr = f"submitted transfer with ID={transferID}"
                        tmpLog.debug(msgStr)
                    else:
                        msgStr = "wait until enough files are pooled"
                        tmpLog.debug(msgStr)
                    return None, msgStr
            finally:
                self.dbInterface.release_object_lock(dummy_transfer_id)
        # no dummy group left: the bulk transfer is done, mark files finished
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        tmpLog.debug("all finished")
        return True, ""

    # trigger stage-out
    def trigger_stage_out(self, jobspec):
        """Pool the job's output files under the dummy transfer ID.

        :param jobspec: job specification with output files to stage out
        :return: (True, '') always
        """
        lfns = [fileSpec.lfn for fileSpec in jobspec.get_output_file_specs(skip_done=True)]
        jobspec.set_groups_to_files({dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        return True, ""

    # zip output files
    def zip_output(self, jobspec):
        """Zip the job's output files using the base-class helper."""
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)