File indexing completed on 2026-04-20 07:58:58
0001 import datetime
0002 import json
0003 import os
0004
0005 from act.atlas.aCTDBPanda import aCTDBPanda
0006
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010
0011 from .base_messenger import BaseMessenger
0012
0013
# File names, taken from harvester's payload_interaction configuration, that
# this messenger looks for inside a job's access point.
jsonOutputsFileName = harvester_config.payload_interaction.eventStatusDumpJsonFile  # config: eventStatusDumpJsonFile
jsonEventsUpdateFileName = harvester_config.payload_interaction.updateEventsFile  # config: updateEventsFile

# Suffix appended to a JSON file once it has been claimed for reading.
suffixReadJson = ".read"

# Module-wide logger; the messenger's methods derive per-call loggers from it.
baseLogger = core_utils.setup_logger("act_messenger")
0024
0025
class ACTMessenger(BaseMessenger):
    """Mechanism for passing information about completed jobs back to harvester.

    Event ranges received from harvester are written to the aCT database via
    ``aCTDBPanda``. Event updates are read from JSON files in each job's
    access point. Several hooks (job feeding, pull-model queries, access-point
    setup) are no-ops because workers are reached through the CE.
    """

    def __init__(self, **kwarg):
        BaseMessenger.__init__(self, **kwarg)

        # Logger and aCT DB handle shared by the methods below.
        self.log = core_utils.make_logger(baseLogger, "aCT messenger", method_name="__init__")
        self.actDB = aCTDBPanda(self.log)

    def get_access_point(self, workspec, panda_id):
        """Return the directory used for job ``panda_id`` of ``workspec``.

        Multi-job workers (``WorkSpec.MT_MultiJobs``) use one sub-directory per
        PanDA ID below the worker's access point. Every other map type uses the
        worker's access point directly.
        """
        accessPoint = workspec.get_access_point()
        if workspec.mapType == WorkSpec.MT_MultiJobs:
            accessPoint = os.path.join(accessPoint, str(panda_id))
        return accessPoint

    def post_processing(self, workspec, jobspec_list, map_type):
        """Now done in stager"""
        return True

    def get_work_attributes(self, workspec):
        """Get info from the job to pass back to harvester.

        Returns ``workspec.workAttributes`` unchanged.
        """
        return workspec.workAttributes

    def events_requested(self, workspec):
        """Used to tell harvester that the worker requests events.

        Always returns an empty dict, i.e. no event requests are reported.
        """
        return {}

    def feed_events(self, workspec, events_dict):
        """
        Harvester has an event range to pass to job
        events_dict is {pandaid: [{eventrange1}, {eventrange2}, ..]}

        For one-to-one and multi-worker map types, each job's row in the aCT
        DB is updated with the JSON-encoded event ranges and its statuses are
        set to "sent".

        :return: False if any DB update raised, True otherwise
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="feed_events")
        retVal = True
        if workspec.mapType in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiWorkers]:
            for pandaid, eventranges in events_dict.items():
                # arcjobid is reset to None; presumably this lets aCT (re)submit
                # the job with the new ranges -- TODO confirm against aCT.
                desc = {"eventranges": json.dumps(eventranges), "actpandastatus": "sent", "pandastatus": "sent", "arcjobid": None}
                tmpLog.info(f"Inserting {len(eventranges)} events for job {pandaid}")
                try:
                    self.actDB.updateJob(pandaid, desc)
                except Exception:
                    # Keep going with the other jobs but report overall failure.
                    core_utils.dump_error_message(tmpLog)
                    retVal = False
        elif workspec.mapType == WorkSpec.MT_MultiJobs:
            # Not implemented for multi-job workers: the events are ignored
            # and True is still returned.
            pass
        tmpLog.debug("done")
        return retVal

    def events_to_update(self, workspec):
        """Report events processed for harvester to update.

        For every PanDA ID of ``workspec``, the event-update JSON file in the
        job's access point is claimed by renaming it with ``suffixReadJson``
        and then parsed. An already-claimed ``.read`` file left by an earlier
        call is re-read as is. A claimed file that yields no events (empty or
        unparsable) is deleted. Otherwise it is kept for
        ``acknowledge_events_files``.

        :param workspec: worker whose jobs are inspected
        :return: dict mapping int PanDA ID to the value stored under that ID
                 in the JSON file
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="events_to_update")

        retDict = dict()
        for pandaID in workspec.pandaid_list:
            accessPoint = self.get_access_point(workspec, pandaID)
            jsonFilePath = os.path.join(accessPoint, jsonEventsUpdateFileName)
            readJsonPath = jsonFilePath + suffixReadJson

            tmpLog.debug(f"looking for event update file {readJsonPath}")
            if not os.path.exists(readJsonPath):
                tmpLog.debug(f"looking for event update file {jsonFilePath}")
                if not os.path.exists(jsonFilePath):
                    tmpLog.debug("not found")
                    continue
                try:
                    # Claim the file. Later calls re-read the .read copy until
                    # it is acknowledged or found to be empty.
                    os.rename(jsonFilePath, readJsonPath)
                except OSError:
                    tmpLog.error("failed to rename json")
                    continue

            nData = 0
            try:
                with open(readJsonPath) as jsonFile:
                    tmpOrigDict = json.load(jsonFile)
                # JSON object keys are strings; harvester expects int PanDA IDs.
                for tmpPandaID, tmpDict in tmpOrigDict.items():
                    retDict[int(tmpPandaID)] = tmpDict
                    nData += len(tmpDict)
            except Exception as x:
                tmpLog.error(f"failed to load json: {str(x)}")

            if nData == 0:
                # Nothing usable in the file: drop it so it is not re-read on
                # every call (best effort).
                try:
                    os.remove(readJsonPath)
                except OSError:
                    pass
            tmpLog.debug(f"got {nData} events for PandaID={pandaID}")
        return retDict

    def acknowledge_events_files(self, workspec):
        """Acknowledge that events were picked up by harvester.

        For every PanDA ID of ``workspec``, the claimed (``.read``) event-update
        and outputs JSON files are renamed by appending ``.`` plus the current
        naive UTC timestamp, so they are not picked up again. A missing file is
        expected and skipped silently. Any other failure is logged and
        skipped (best effort).
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="acknowledge_events_files")

        for pandaID in workspec.pandaid_list:
            accessPoint = self.get_access_point(workspec, pandaID)
            for fileName in (jsonEventsUpdateFileName, jsonOutputsFileName):
                try:
                    readJsonPath = os.path.join(accessPoint, fileName) + suffixReadJson
                    os.rename(readJsonPath, readJsonPath + "." + str(core_utils.naive_utcnow()))
                except FileNotFoundError:
                    # Nothing was claimed for this file; that is normal.
                    pass
                except Exception:
                    # A .read file left behind would be re-reported by
                    # events_to_update, so make the failure visible.
                    tmpLog.warning(f"failed to rename {fileName}{suffixReadJson} for PandaID={pandaID}")
        tmpLog.debug("done")

    def feed_jobs(self, workspec, jobspec_list):
        """Pass job to worker. No-op for Grid"""
        return True

    def get_files_to_stage_out(self, workspec):
        """Not required in Grid case"""
        return {}

    def job_requested(self, workspec):
        """Used in pull model to say that worker is ready for a job"""
        return False

    def setup_access_points(self, workspec_list):
        """Access is through CE so nothing to set up here.

        Returns None; no directories are created.
        """
        pass

    def get_panda_ids(self, workspec):
        """For pull model, get panda IDs assigned to jobs"""
        return []

    def kill_requested(self, workspec):
        """Worker wants to kill itself (?)"""
        return False

    def is_alive(self, workspec, time_limit):
        """Check if worker is alive, not for Grid"""
        return True
0212
0213
def test():
    """Placeholder for manual testing of this module; intentionally does nothing."""
    return None
0216
0217
if "__main__" == __name__:
    # Running this module directly only invokes the test() stub.
    test()