File indexing completed on 2026-04-20 07:58:58
0001 from pandaharvester.harvesterconfig import harvester_config
0002 from pandaharvester.harvestercore.plugin_base import PluginBase
0003
0004
0005
def get_payload_interaction_attr(attr, default=None):
    """Look up one option in the ``payload_interaction`` config section.

    Args:
        attr: Name of the option to read from
            ``harvester_config.payload_interaction``.
        default: Value returned when the section has no attribute
            called ``attr``.

    Returns:
        The configured value, or ``default`` when the option is absent.
    """
    section = harvester_config.payload_interaction
    return getattr(section, attr, default)
0008
0009
0010
class BaseMessenger(PluginBase):
    """Base class for messenger plugins.

    Every hook defined here is a neutral stub returning an empty
    container, ``False``, ``0`` or ``None``.  Presumably concrete
    messengers override the hooks they support -- confirm against the
    subclasses.
    """

    def __init__(self, **kwarg):
        """Set the configured defaults, then initialise the plugin base.

        The defaults are assigned before ``PluginBase.__init__`` receives
        ``kwarg``; presumably this lets per-plugin parameters take
        precedence over the defaults -- confirm against ``PluginBase``.
        """
        self._load_default_attrs()
        PluginBase.__init__(self, **kwarg)

    def _load_default_attrs(self):
        """Populate the file/directory name attributes from the config.

        Each value is read from the ``payload_interaction`` config section
        through ``get_payload_interaction_attr``; options missing from the
        section fall back to the value given in the table below.
        """
        # (instance attribute, config option, fallback when option is absent)
        defaults = (
            ("jsonAttrsFileName", "workerAttributesFile", None),
            ("jsonJobReport", "jobReportFile", None),
            ("jsonOutputsFileName", "eventStatusDumpJsonFile", None),
            ("xmlOutputsBaseFileName", "eventStatusDumpXmlFile", None),
            ("jsonJobRequestFileName", "jobRequestFile", None),
            ("jobSpecFileName", "jobSpecFile", "pandaJobData.out"),
            ("jsonEventsRequestFileName", "eventRequestFile", None),
            ("jsonEventsFeedFileName", "eventRangesFile", None),
            ("jsonEventsUpdateFileName", "updateEventsFile", None),
            ("xmlPoolCatalogFileName", "xmlPoolCatalogFile", None),
            ("pandaIDsFile", "pandaIDsFile", None),
            ("killWorkerFile", "killWorkerFile", "kill_worker.json"),
            ("heartbeatFile", "heartbeatFile", "worker_heartbeat.json"),
            ("taskWorkBaseDir", "taskWorkBaseDir", "/tmp/workdir"),
            ("taskWorkStateFile", "taskWorkStateFile", "state.json"),
        )
        for attr_name, option, fallback in defaults:
            setattr(self, attr_name, get_payload_interaction_attr(option, fallback))

    def get_access_point(self, workspec, panda_id):
        """Return the access point for a worker/job pair.

        The base implementation ignores both arguments and returns ``None``.
        """
        return None

    def get_work_attributes(self, workspec):
        """Return attributes associated with ``workspec``.

        The base implementation ignores ``workspec`` and returns an empty
        dict.
        """
        return {}

    def get_files_to_stage_out(self, workspec):
        """Return the files to stage out for ``workspec``.

        The base implementation ignores ``workspec`` and returns an empty
        dict.
        """
        return {}

    def job_requested(self, workspec):
        """Report whether the worker requested jobs.

        The base implementation ignores ``workspec`` and returns ``0``.
        """
        return 0

    def feed_jobs(self, workspec, jobspec_list):
        """Feed jobs to the worker.

        The base implementation ignores its arguments and returns ``False``.
        """
        return False

    def events_requested(self, workspec):
        """Return the events requested by the worker.

        The base implementation ignores ``workspec`` and returns an empty
        dict.
        """
        return {}

    def feed_events(self, workspec, events_dict):
        """Feed events to the worker.

        The base implementation ignores its arguments and returns ``False``.
        """
        return False

    def events_to_update(self, workspec):
        """Return the events to be updated for the worker.

        The base implementation ignores ``workspec`` and returns an empty
        dict.
        """
        return {}

    def acknowledge_events_files(self, workspec):
        """Acknowledge events and files for the worker.

        The base implementation does nothing and returns ``None``.
        """
        return None

    def setup_access_points(self, workspec_list):
        """Set up access points for the given workers.

        The base implementation does nothing and returns ``None``.
        """
        return None

    def filter_log_tgz(self, name):
        """Filter hook applied to a log tarball member name.

        The base implementation ignores ``name`` and returns ``False``.
        """
        return False

    def post_processing(self, workspec, jobspec_list, map_type):
        """Run post-processing for the worker.

        The base implementation ignores its arguments and returns ``True``.
        """
        return True

    def get_panda_ids(self, workspec):
        """Return the PandaIDs associated with the worker.

        The base implementation ignores ``workspec`` and returns an empty
        list.
        """
        return []

    def kill_requested(self, workspec):
        """Report whether the worker asked to be killed.

        The base implementation ignores ``workspec`` and returns ``False``.
        """
        return False

    def is_alive(self, workspec, time_limit):
        """Report whether the worker is alive.

        The base implementation ignores its arguments and returns ``None``.
        """
        return None

    def clean_up(self, workspec):
        """Clean up after the worker.

        The base implementation ignores ``workspec`` and returns the tuple
        ``(None, "skipped")``.
        """
        return None, "skipped"