File indexing completed on 2026-04-20 07:58:59
0001 import datetime
0002 import threading
0003 import uuid
0004
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008
0009
# Prefix of the placeholder group ID under which a preparator instance pools
# input files; DummyBulkPreparator.__init__ appends "_<uID>" to it.
dummy_transfer_id_base = "dummy_id_for_in"


# Module-level logger for this plugin (not referenced by the code visible in
# this file).
_logger = core_utils.setup_logger("dummy_bulk_preparator")


# Guards reads and updates of uID in DummyBulkPreparator.__init__.
uLock = threading.Lock()


# Counter used to suffix dummy_transfer_id_base. DummyBulkPreparator.__init__
# advances it under uLock, wrapping modulo harvester_config.preparator.nThreads.
uID = 0
0020
0021
0022
0023
class DummyBulkPreparator(PluginBase):
    """Dummy stage-in preparator that pools input files into bulk "transfers".

    Each instance owns a placeholder group ID of the form
    ``dummy_id_for_in_<k>``, where k cycles through
    0 .. harvester_config.preparator.nThreads - 1.

    ``trigger_preparation`` tags a job's not-ready input files with that
    placeholder group. ``check_stage_in_status`` moves the pooled files to a
    freshly generated transfer ID once enough files have accumulated, or once
    the group is old enough. No data is actually moved: groups left after
    that are simply marked "done".
    """

    # Number of pooled files that triggers a bulk transfer.
    min_files_per_transfer = 10
    # If the placeholder group was last updated longer ago than this, a
    # transfer is triggered regardless of how many files are pooled.
    max_pooling_time = datetime.timedelta(minutes=10)
    # Passed as ``lock_interval`` to dbInterface.get_object_lock.
    # NOTE(review): units are whatever get_object_lock expects
    # (presumably seconds) — confirm.
    object_lock_interval = 120

    def __init__(self, **kwarg):
        """Initialize the plugin and pick this instance's placeholder group ID."""
        global uID
        PluginBase.__init__(self, **kwarg)
        # Hand out placeholder IDs round-robin, wrapping at the number of
        # preparator threads, presumably so concurrent preparators pool files
        # under different groups.
        with uLock:
            self.dummy_transfer_id = f"{dummy_transfer_id_base}_{uID}"
            uID = (uID + 1) % harvester_config.preparator.nThreads

    def trigger_preparation(self, jobspec):
        """Put the job's not-yet-ready input files into the placeholder group.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        inFiles = jobspec.get_input_file_attributes(skip_ready=True)
        # Materialize the LFNs as a plain list. ``inFiles.keys()`` is a view
        # object in Python 3, which has no ``append``.
        lfns = list(inFiles)
        jobspec.set_groups_to_files({self.dummy_transfer_id: {"lfns": lfns, "groupStatus": "pending"}})
        return True, ""

    def check_stage_in_status(self, jobspec):
        """Check stage-in status, submitting a bulk transfer once the pool is ready.

        Case 1: the job still has files in this instance's placeholder group.
        The group is locked and the job's group info is refreshed from the DB.
        If the files are still pooled, ``(None, message)`` is returned:

        * either the pool was moved to a new UUID-named group with status
          "running", or
        * it is still waiting for more files.

        ``(None, message)`` is also returned when another thread holds the
        lock.

        Case 2: otherwise, every group of the job's not-ready input files is
        marked "done" and ``(True, "")`` is returned.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: (True, "") when done; (None, message) to check again later
        :rtype: (bool or None, string)
        """
        groups = jobspec.get_groups_of_input_files(skip_ready=True)
        if self.dummy_transfer_id in groups:
            # Serialize decisions about the placeholder group across threads.
            locked = self.dbInterface.get_object_lock(self.dummy_transfer_id, lock_interval=self.object_lock_interval)
            if not locked:
                msgStr = "escape since locked by another thread"
                return None, msgStr
            try:
                # Re-read group info now that we hold the lock; the files may
                # have been regrouped in the meantime.
                self.dbInterface.refresh_file_group_info(jobspec)
                groups = jobspec.get_groups_of_input_files(skip_ready=True)
                if self.dummy_transfer_id in groups:
                    return None, self._submit_pooled_files(groups)
            finally:
                # Release on every path, including exceptions from DB calls.
                self.dbInterface.release_object_lock(self.dummy_transfer_id)
        # Dummy plugin: no real transfer to poll, so everything is done.
        for transferID in groups:
            jobspec.update_group_status_in_files(transferID, "done")
        return True, ""

    def _submit_pooled_files(self, groups):
        """Move pooled files to a new "running" group if the pool is ready; return a status message."""
        groupUpdateTime = groups[self.dummy_transfer_id]["groupUpdateTime"]
        fileSpecs = self.dbInterface.get_files_with_group_id(self.dummy_transfer_id)
        enoughFiles = len(fileSpecs) >= self.min_files_per_transfer
        poolTooOld = groupUpdateTime < core_utils.naive_utcnow() - self.max_pooling_time
        if enoughFiles or poolTooOld:
            transferID = str(uuid.uuid4())
            self.dbInterface.set_file_group(fileSpecs, transferID, "running")
            return f"real transfer submitted with ID={transferID}"
        return f"wait until enough files are pooled with {self.dummy_transfer_id}"

    def resolve_input_paths(self, jobspec):
        """Set input file paths to jobspec.get_input_file_attributes[LFN]['path'] for the job.
        New input file attributes need to be set to jobspec using jobspec.set_input_file_paths()
        after setting the file paths.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        inFiles = jobspec.get_input_file_attributes()
        # Dummy plugin: point every input file at a fake relative path.
        for inLFN, inFile in inFiles.items():
            inFile["path"] = f"dummypath/{inLFN}"
        jobspec.set_input_file_paths(inFiles)
        return True, ""
0107 return True, ""