Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import uuid
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.plugin_base import PluginBase
0005 
0006 # logger
0007 _logger = core_utils.setup_logger("dummy_preparator")
0008 
0009 
0010 # dummy plugin for preparator
0011 class DummyPreparator(PluginBase):
0012     # constructor
0013     def __init__(self, **kwarg):
0014         PluginBase.__init__(self, **kwarg)
0015 
0016     # trigger preparation
0017     def trigger_preparation(self, jobspec):
0018         """Trigger the stage-in procedure synchronously or asynchronously for the job.
0019         If the return code of this method is True, the job goes to the next step. If it is False,
0020         preparator immediately gives up the job. If it is None, the job is retried later.
0021         Input file attributes are available through jobspec.get_input_file_attributes(skip_ready=True)
0022         which gives a dictionary. The key of the dictionary is LFN of the input file
0023         and the value is a dictionary of file attributes. The attribute names are
0024         fsize, guid, checksum, scope, dataset, attemptNr, and endpoint. attemptNr shows how many times
0025         the file was tried so far. Grouping information such as transferID can be set to input files using
0026         jobspec.set_group_to_files(id_map) where id_map is
0027         {groupID:'lfns':[lfn1, ...], 'status':status}, and groupID and status are arbitrary strings.
0028 
0029         :param jobspec: job specifications
0030         :type jobspec: JobSpec
0031         :return: A tuple of return code (True: success, False: fatal error, None: temporary error)
0032                  and error dialog
0033         :rtype: (bool, string)
0034         """
0035         # -- make log
0036         # tmpLog = self.make_logger(_logger, 'PandaID={0}'.format(jobspec.PandaID),
0037         #                           method_name='trigger_preparation')
0038         # tmpLog.debug('start')
0039         #
0040         # Here is an example to access cached data
0041         # c_data = self.dbInterface.get_cache('panda_queues.json')
0042         # tmpLog.debug(len(c_data.data))
0043         #
0044         # Here is an example with file grouping :
0045         # -- get input files while skipping files already in ready state
0046         # inFiles = jobspec.get_input_file_attributes(skip_ready=True)
0047         # tmpLog.debug('inputs={0}'.format(str(inFiles)))
0048         # lfns = []
0049         # for inLFN in inFiles.keys():
0050         #     lfns.append(inLFN)
0051         # -- one transfer ID for all input files
0052         # transferID = str(uuid.uuid4())
0053         # -- set transfer ID which are used for later lookup
0054         # jobspec.set_groups_to_files({transferID: {'lfns': lfns, 'groupStatus': 'active'}})
0055         # tmpLog.debug('done')
0056         return True, ""
0057 
0058     # check status
0059     def check_stage_in_status(self, jobspec):
0060         """Check status of the stage-in procedure.
0061         If the return code of this method is True, the job goes to the next step. If it is False,
0062         preparator immediately gives up the job. If it is None, the job is retried later.
0063         If preparation is done synchronously in trigger_preparation
0064         this method should always return True. Status of file group can be updated using
0065         jobspec.update_group_status_in_files(group_id, group_status) if necessary.
0066 
0067         :param jobspec: job specifications
0068         :type jobspec: JobSpec
0069         :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
0070                  None: on-going or temporary failure) and error dialog
0071         :rtype: (bool, string)
0072         """
0073         #
0074         # Here is an example with file grouping :
0075         # get groups of input files except ones already in ready state
0076         # transferGroups = jobspec.get_groups_of_input_files(skip_ready=True)
0077         # -- update transfer status
0078         # for transferID, transferInfo in transferGroups.items():
0079         #    jobspec.update_group_status_in_files(transferID, 'done')
0080         return True, ""
0081 
0082     # resolve input file paths
0083     def resolve_input_paths(self, jobspec):
0084         """Set input file paths to jobspec.get_input_file_attributes[LFN]['path'] for the job.
0085         New input file attributes need to be set to jobspec using jobspec.set_input_file_paths()
0086         after setting the file paths.
0087 
0088         :param jobspec: job specifications
0089         :type jobspec: JobSpec
0090         :return: A tuple of return code (True for success, False otherwise) and error dialog
0091         :rtype: (bool, string)
0092         """
0093         # Here is an example to set file paths
0094         # -- get input files
0095         inFiles = jobspec.get_input_file_attributes()
0096         # -- set path to each file
0097         for inLFN, inFile in inFiles.items():
0098             inFile["path"] = f"dummypath/{inLFN}"
0099         # -- set
0100         jobspec.set_input_file_paths(inFiles)
0101         return True, ""