Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 from act.atlas.aCTDBPanda import aCTDBPanda
0002 from pandaharvester.harvesterconfig import harvester_config
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.plugin_base import PluginBase
0005 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0008 
# json for job report: value of jobReportFile taken from the
# payload_interaction part of the harvester configuration
# NOTE(review): not referenced in the visible part of this module - confirm it is still needed
jsonJobReport = harvester_config.payload_interaction.jobReportFile

# logger: module-level base logger from which per-call loggers are derived
# through core_utils.make_logger
baseLogger = core_utils.setup_logger("act_monitor")
0014 
0015 
0016 # monitor for aCT plugin
class ACTMonitor(PluginBase):
    """Monitor plugin that derives harvester worker status from the aCT DB."""

    # constructor
    def __init__(self, **kwarg):
        """Initialise the plugin and try to open the aCT DB connection.

        A connection failure is logged and leaves ``self.actDB`` set to
        ``None`` instead of raising, so the plugin can still be created.
        """
        PluginBase.__init__(self, **kwarg)

        # Set up aCT DB connection
        self.log = core_utils.make_logger(baseLogger, "aCT monitor", method_name="__init__")
        try:
            self.actDB = aCTDBPanda(self.log)
        except Exception as e:
            self.log.error(f"Could not connect to aCT database: {e}")
            self.actDB = None

    # check workers
    def check_workers(self, workspec_list):
        """Look up each worker in aCT and translate its state to a worker status.

        :param workspec_list: worker specs to check; ``batchID`` is used as
            the aCT job id and ``computingSite`` to look up the queue config.
        :return: ``(True, retList)`` where ``retList`` holds one
            ``(status, error message)`` tuple per entry of ``workspec_list``,
            in the same order.

        Side effects on each worker spec found in aCT: ``nativeStatus`` is
        set, ``computingElement`` and the per-PandaID ``node`` work attribute
        are filled when aCT provides them, and a supplemental error is set
        for failed true-pilot jobs.
        """
        retList = []

        # loop invariants
        queueconfigmapper = QueueConfigMapper()
        columns = ["actpandastatus", "pandastatus", "computingElement", "node", "error"]
        submitted_states = ("waiting", "sent", "starting")
        truepilot_finished_states = ("done", "donecancelled")
        ng_finished_states = ("done", "donefailed", "donecancelled", "transferring", "tovalidate")

        for workSpec in workspec_list:
            # make logger
            tmpLog = core_utils.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            queueconfig = queueconfigmapper.get_queue(workSpec.computingSite)

            if self.actDB is None:
                # no DB connection (already reported in __init__):
                # send back current status without logging an error per worker
                retList.append((workSpec.status, ""))
                continue

            try:
                tmpLog.debug(f"Querying aCT for id {workSpec.batchID}")
                actjobs = self.actDB.getJobs(f"id={workSpec.batchID}", columns)
            except Exception as e:
                tmpLog.error(f"Failed to query aCT DB: {e}")
                # send back current status
                retList.append((workSpec.status, ""))
                continue

            if not actjobs:
                tmpLog.error(f"Job with id {workSpec.batchID} not found in aCT")
                # a job unknown to aCT is reported as failed
                retList.append((WorkSpec.ST_failed, "Job not found in aCT"))
                continue

            actjob = actjobs[0]
            actstatus = actjob["actpandastatus"]
            workSpec.nativeStatus = actstatus

            # anything not matched below is treated as running
            newStatus = WorkSpec.ST_running
            errorMsg = ""
            if actstatus in submitted_states:
                newStatus = WorkSpec.ST_submitted

            # Handle post running states
            if queueconfig.truePilot:
                # True pilot: keep in running until really done
                if actstatus in truepilot_finished_states:
                    newStatus = WorkSpec.ST_finished
                elif actstatus == "donefailed":
                    # set failed here with workspec sup error
                    errorMsg = actjob["error"] or "Unknown error"
                    error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
                    workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg)
                    newStatus = WorkSpec.ST_failed
                    tmpLog.info(f"ID {workSpec.batchID} failed with error {errorMsg}")
            elif actstatus in ng_finished_states:
                # NG mode: all post processing is now done in the stager
                newStatus = WorkSpec.ST_finished

            if newStatus != workSpec.status:
                tmpLog.info(f"ID {workSpec.batchID} updated status {workSpec.status} -> {newStatus} ({actstatus})")
            else:
                tmpLog.debug(f"batchStatus {actstatus} -> workerStatus {newStatus}")

            if actjob["computingElement"]:
                workSpec.computingElement = actjob["computingElement"]
            if actjob["node"]:
                # best effort: a worker without a usable job spec only yields a warning.
                # Exception (not BaseException) so KeyboardInterrupt/SystemExit still propagate.
                try:
                    pandaid = workSpec.get_jobspec_list()[0].PandaID
                    workSpec.set_work_attributes({pandaid: {"node": actjob["node"]}})
                except Exception:
                    tmpLog.warning(f"Could not extract panda ID for worker {workSpec.batchID}")

            retList.append((newStatus, errorMsg))

        return True, retList