Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 from pandaharvester.harvesterconfig import harvester_config
0002 from pandaharvester.harvestercore.plugin_base import PluginBase
0003 
0004 
0005 # get payload interaction attributes from harvester config
0006 def get_payload_interaction_attr(attr, default=None):
0007     return getattr(harvester_config.payload_interaction, attr, default)
0008 
0009 
0010 # base messenger
0011 class BaseMessenger(PluginBase):
0012     # constructor
0013     def __init__(self, **kwarg):
0014         self._load_default_attrs()
0015         PluginBase.__init__(self, **kwarg)
0016 
0017     # load default messenger attributes
0018     def _load_default_attrs(self):
0019         # json for worker attributes
0020         self.jsonAttrsFileName = get_payload_interaction_attr("workerAttributesFile")
0021         # json for job report
0022         self.jsonJobReport = get_payload_interaction_attr("jobReportFile")
0023         # json for outputs
0024         self.jsonOutputsFileName = get_payload_interaction_attr("eventStatusDumpJsonFile")
0025         # xml for outputs
0026         self.xmlOutputsBaseFileName = get_payload_interaction_attr("eventStatusDumpXmlFile")
0027         # json for job request
0028         self.jsonJobRequestFileName = get_payload_interaction_attr("jobRequestFile")
0029         # json for job spec
0030         self.jobSpecFileName = get_payload_interaction_attr("jobSpecFile", "pandaJobData.out")
0031         # json for event request
0032         self.jsonEventsRequestFileName = get_payload_interaction_attr("eventRequestFile")
0033         # json to feed events
0034         self.jsonEventsFeedFileName = get_payload_interaction_attr("eventRangesFile")
0035         # json to update events
0036         self.jsonEventsUpdateFileName = get_payload_interaction_attr("updateEventsFile")
0037         # PFC for input files
0038         self.xmlPoolCatalogFileName = get_payload_interaction_attr("xmlPoolCatalogFile")
0039         # json to get PandaIDs
0040         self.pandaIDsFile = get_payload_interaction_attr("pandaIDsFile")
0041         # json to kill worker itself
0042         self.killWorkerFile = get_payload_interaction_attr("killWorkerFile", "kill_worker.json")
0043         # json for heartbeats from the worker
0044         self.heartbeatFile = get_payload_interaction_attr("heartbeatFile", "worker_heartbeat.json")
0045         # task specific persistent dir
0046         self.taskWorkBaseDir = get_payload_interaction_attr("taskWorkBaseDir", "/tmp/workdir")
0047         # task-level work state file
0048         self.taskWorkStateFile = get_payload_interaction_attr("taskWorkStateFile", "state.json")
0049 
0050     # get access point
0051     def get_access_point(self, workspec, panda_id):
0052         pass
0053 
0054     # get attributes of a worker which should be propagated to job(s).
0055     #  * the worker needs to put a json under the access point
0056     def get_work_attributes(self, workspec):
0057         return dict()
0058 
0059     # get files to stage-out.
0060     #  * the worker needs to put a json under the access point
0061     def get_files_to_stage_out(self, workspec):
0062         return dict()
0063 
0064     # check if job is requested.
0065     # * the worker needs to put a json under the access point
0066     def job_requested(self, workspec):
0067         return 0
0068 
0069     # feed jobs
0070     # * worker_jobspec.json is put under the access point
0071     def feed_jobs(self, workspec, jobspec_list):
0072         return False
0073 
0074     # request events.
0075     # * the worker needs to put a json under the access point
0076     def events_requested(self, workspec):
0077         return dict()
0078 
0079     # feed events
0080     # * worker_events.json is put under the access point
0081     def feed_events(self, workspec, events_dict):
0082         return False
0083 
0084     # update events.
0085     # * the worker needs to put a json under the access point
0086     def events_to_update(self, workspec):
0087         return dict()
0088 
0089     # acknowledge events and files
0090     # * delete json.read files
0091     def acknowledge_events_files(self, workspec):
0092         pass
0093 
0094     # setup access points
0095     def setup_access_points(self, workspec_list):
0096         pass
0097 
0098     # filter for log.tar.gz
0099     def filter_log_tgz(self, name):
0100         return False
0101 
0102     # post-processing (archiving log files and collecting job metrics)
0103     def post_processing(self, workspec, jobspec_list, map_type):
0104         return True
0105 
0106     # get PandaIDs for pull model
0107     def get_panda_ids(self, workspec):
0108         return list()
0109 
0110     # check if requested to kill the worker itself
0111     def kill_requested(self, workspec):
0112         return False
0113 
0114     # check if the worker is alive
0115     def is_alive(self, workspec, time_limit):
0116         return None
0117 
0118     # clean up. Called by sweeper agent to clean up stuff made by messenger for the worker
0119     def clean_up(self, workspec):
0120         return (None, "skipped")