File indexing completed on 2026-04-20 07:58:59
0001 import uuid
0002
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.plugin_base import PluginBase
0005
0006
0007 _logger = core_utils.setup_logger("dummy_preparator")
0008
0009
0010
0011 class DummyPreparator(PluginBase):
0012
0013 def __init__(self, **kwarg):
0014 PluginBase.__init__(self, **kwarg)
0015
0016
0017 def trigger_preparation(self, jobspec):
0018 """Trigger the stage-in procedure synchronously or asynchronously for the job.
0019 If the return code of this method is True, the job goes to the next step. If it is False,
0020 preparator immediately gives up the job. If it is None, the job is retried later.
0021 Input file attributes are available through jobspec.get_input_file_attributes(skip_ready=True)
0022 which gives a dictionary. The key of the dictionary is LFN of the input file
0023 and the value is a dictionary of file attributes. The attribute names are
0024 fsize, guid, checksum, scope, dataset, attemptNr, and endpoint. attemptNr shows how many times
0025 the file was tried so far. Grouping information such as transferID can be set to input files using
0026 jobspec.set_group_to_files(id_map) where id_map is
0027 {groupID:'lfns':[lfn1, ...], 'status':status}, and groupID and status are arbitrary strings.
0028
0029 :param jobspec: job specifications
0030 :type jobspec: JobSpec
0031 :return: A tuple of return code (True: success, False: fatal error, None: temporary error)
0032 and error dialog
0033 :rtype: (bool, string)
0034 """
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056 return True, ""
0057
0058
0059 def check_stage_in_status(self, jobspec):
0060 """Check status of the stage-in procedure.
0061 If the return code of this method is True, the job goes to the next step. If it is False,
0062 preparator immediately gives up the job. If it is None, the job is retried later.
0063 If preparation is done synchronously in trigger_preparation
0064 this method should always return True. Status of file group can be updated using
0065 jobspec.update_group_status_in_files(group_id, group_status) if necessary.
0066
0067 :param jobspec: job specifications
0068 :type jobspec: JobSpec
0069 :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
0070 None: on-going or temporary failure) and error dialog
0071 :rtype: (bool, string)
0072 """
0073
0074
0075
0076
0077
0078
0079
0080 return True, ""
0081
0082
0083 def resolve_input_paths(self, jobspec):
0084 """Set input file paths to jobspec.get_input_file_attributes[LFN]['path'] for the job.
0085 New input file attributes need to be set to jobspec using jobspec.set_input_file_paths()
0086 after setting the file paths.
0087
0088 :param jobspec: job specifications
0089 :type jobspec: JobSpec
0090 :return: A tuple of return code (True for success, False otherwise) and error dialog
0091 :rtype: (bool, string)
0092 """
0093
0094
0095 inFiles = jobspec.get_input_file_attributes()
0096
0097 for inLFN, inFile in inFiles.items():
0098 inFile["path"] = f"dummypath/{inLFN}"
0099
0100 jobspec.set_input_file_paths(inFiles)
0101 return True, ""