Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import datetime
0002 import json
0003 import os
0004 
0005 from act.atlas.aCTDBPanda import aCTDBPanda
0006 
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010 
0011 from .base_messenger import BaseMessenger
0012 
# json file name for payload outputs / event status dump (from harvester config)
jsonOutputsFileName = harvester_config.payload_interaction.eventStatusDumpJsonFile

# json file name in which the payload reports processed events to update
jsonEventsUpdateFileName = harvester_config.payload_interaction.updateEventsFile

# suffix appended to a json file once it has been read but not yet acknowledged
suffixReadJson = ".read"

# module-level logger; per-method child loggers are derived from it below
baseLogger = core_utils.setup_logger("act_messenger")
0024 
0025 
class ACTMessenger(BaseMessenger):
    """Mechanism for passing information about completed jobs back to harvester."""

    def __init__(self, **kwarg):
        BaseMessenger.__init__(self, **kwarg)

        # Set up aCT DB connection
        self.log = core_utils.make_logger(baseLogger, "aCT messenger", method_name="__init__")
        self.actDB = aCTDBPanda(self.log)

    def get_access_point(self, workspec, panda_id):
        """Return the access point directory for one job.

        For multi-job workers each job gets its own subdirectory named after
        its PanDA ID; otherwise the worker-level access point is used.
        """
        if workspec.mapType == WorkSpec.MT_MultiJobs:
            return os.path.join(workspec.get_access_point(), str(panda_id))
        return workspec.get_access_point()

    def post_processing(self, workspec, jobspec_list, map_type):
        """Now done in stager"""
        return True

    def get_work_attributes(self, workspec):
        """Get info from the job to pass back to harvester.

        Just return existing attributes; attributes are added to the workspec
        for finished jobs in post_processing.
        """
        return workspec.workAttributes

    def events_requested(self, workspec):
        """Used to tell harvester that the worker requests events.

        Not yet implemented: dynamic event fetching is not supported yet.
        """
        return {}

    def feed_events(self, workspec, events_dict):
        """
        Harvester has an event range to pass to job
        events_dict is {pandaid: [{eventrange1}, {eventrange2}, ..]}
        """
        # get logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="feed_events")
        retVal = True
        if workspec.mapType in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiWorkers]:
            # insert the event range into aCT DB and mark the job ready to go
            for pandaid, eventranges in events_dict.items():
                desc = {"eventranges": json.dumps(eventranges), "actpandastatus": "sent", "pandastatus": "sent", "arcjobid": None}
                tmpLog.info(f"Inserting {len(eventranges)} events for job {pandaid}")
                try:
                    self.actDB.updateJob(pandaid, desc)
                except Exception:
                    # log the full traceback but keep feeding the remaining jobs
                    core_utils.dump_error_message(tmpLog)
                    retVal = False
        elif workspec.mapType == WorkSpec.MT_MultiJobs:
            # TOBEFIXED: multi-job event feeding not implemented yet
            pass
        tmpLog.debug("done")
        return retVal

    def events_to_update(self, workspec):
        """Report events processed for harvester to update.

        Returns {pandaID (int): event update dict} gathered from the per-job
        json files under each job's access point.
        """
        # get logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="events_to_update")
        retDict = dict()
        for pandaID in workspec.pandaid_list:
            # look for the json just under the access point
            accessPoint = self.get_access_point(workspec, pandaID)

            jsonFilePath = os.path.join(accessPoint, jsonEventsUpdateFileName)
            readJsonPath = jsonFilePath + suffixReadJson
            # first look for json.read which is not yet acknowledged
            tmpLog.debug(f"looking for event update file {readJsonPath}")
            if not os.path.exists(readJsonPath):
                tmpLog.debug(f"looking for event update file {jsonFilePath}")
                if not os.path.exists(jsonFilePath):
                    # not found
                    tmpLog.debug("not found")
                    continue
                try:
                    # rename to prevent from being overwritten
                    os.rename(jsonFilePath, readJsonPath)
                except Exception:
                    tmpLog.error("failed to rename json")
                    continue
            # load json
            nData = 0
            try:
                with open(readJsonPath) as jsonFile:
                    tmpOrigDict = json.load(jsonFile)
                    # change the key from str to int (json object keys are always strings)
                    for tmpPandaID, tmpDict in tmpOrigDict.items():
                        retDict[int(tmpPandaID)] = tmpDict
                        nData += len(tmpDict)
            except Exception as x:
                tmpLog.error(f"failed to load json: {str(x)}")
            # delete empty file
            if nData == 0:
                try:
                    os.remove(readJsonPath)
                except Exception:
                    pass
            tmpLog.debug(f"got {nData} events for PandaID={pandaID}")
        return retDict

    def acknowledge_events_files(self, workspec):
        """Acknowledge that events were picked up by harvester.

        The already-read json files are renamed with a timestamp suffix so
        they are kept on disk but no longer picked up by events_to_update.
        """
        # get logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="acknowledge_events_files")
        # remove request file
        for pandaID in workspec.pandaid_list:
            accessPoint = self.get_access_point(workspec, pandaID)
            for baseFileName in (jsonEventsUpdateFileName, jsonOutputsFileName):
                try:
                    jsonFilePath = os.path.join(accessPoint, baseFileName) + suffixReadJson
                    jsonFilePath_rename = jsonFilePath + "." + str(core_utils.naive_utcnow())
                    os.rename(jsonFilePath, jsonFilePath_rename)
                except Exception:
                    # best effort: the file may simply not exist for this job
                    pass
        tmpLog.debug("done")
        return

    # The remaining methods do not apply to ARC

    def feed_jobs(self, workspec, jobspec_list):
        """Pass job to worker. No-op for Grid"""
        return True

    def get_files_to_stage_out(self, workspec):
        """Not required in Grid case"""
        return {}

    def job_requested(self, workspec):
        """Used in pull model to say that worker is ready for a job"""
        return False

    # NOTE(review): an earlier directory-creating implementation of
    # setup_access_points was silently shadowed by this definition (Python
    # keeps only the last method of a given name) and also referenced an
    # undefined name `_logger`; that dead code has been removed. The
    # effective (no-op) behavior is unchanged.
    def setup_access_points(self, workspec_list):
        """Access is through CE so nothing to set up here"""
        pass

    def get_panda_ids(self, workspec):
        """For pull model, get panda IDs assigned to jobs"""
        return []

    def kill_requested(self, workspec):
        """Worker wants to kill itself (?)"""
        return False

    def is_alive(self, workspec, time_limit):
        """Check if worker is alive, not for Grid"""
        return True
0212 
0213 
def test():
    """Placeholder entry point for standalone testing of this module."""
    return None


if __name__ == "__main__":
    test()