Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import os
0002 import time
0003 from datetime import datetime
0004 
0005 import radical.utils
0006 import saga
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0010 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0011 from pandaharvester.harvestersubmitter.saga_submitter import SAGASubmitter
0012 
# module-level logger for this plugin; every method derives its own contextual
# logger from it via self.make_logger(baseLogger, ...)
baseLogger = core_utils.setup_logger("saga_monitor")
0015 
0016 
0017 # monitor through SAGA
# monitor through SAGA
class SAGAMonitor(PluginBase):
    """Harvester monitor plugin that polls worker (batch job) states through SAGA.

    Attributes read by this class (presumably populated by PluginBase from the
    plugin configuration -- confirm against PluginBase):
      * ``adaptor``      - SAGA job service URL used to open the service and build job ids.
      * ``queueName``    - used by :meth:`deep_checkjob` to find the queue's resource plugin.
      * ``maxqueuetime`` - optional; PENDING workers queued longer than this many seconds
                           are cancelled.
    """

    # format of the created/started/finished timestamps reported by SAGA jobs
    _SAGA_DATE_FORMAT = "%a %b %d %H:%M:%S %Y"
    # retry policy for opening the SAGA job service; an instance attribute with the
    # same name (e.g. set from configuration) takes precedence over these defaults
    serviceRetryAttempts = 3
    serviceRetryDelay = 10

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        self.pluginFactory = PluginFactory()
        self.queue_config_mapper = QueueConfigMapper()
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used.")

    # check workers
    def check_workers(self, workspec_list):
        """Check status of workers. This method takes a list of WorkSpecs as input argument
        and returns a list of worker's statuses.

        Exactly one ``(status, diagnostic message)`` tuple is returned per input WorkSpec,
        in input order, so the caller can pair results with its own list. Workers without
        a batchID keep their current status.

        :param workspec_list: a list of work specs instances
        :return: A tuple of return code (True for success, False otherwise) and a list of
                 worker's statuses. If the SAGA job service cannot be opened, the return code
                 is False and the second element is an error message.
        :rtype: (bool, [(string, string),])
        """
        tmpLog = self.make_logger(baseLogger, method_name="check_workers")
        job_service, err_msg = self._open_job_service(tmpLog)
        if job_service is None:
            return False, err_msg
        retList = []
        try:
            for workSpec in workspec_list:
                retList.append(self._check_one_worker(job_service, workSpec))
        finally:
            # release the service even when a single worker check raises
            job_service.close()
        tmpLog.debug(f"Results: {retList}")
        return True, retList

    def _open_job_service(self, tmpLog):
        """Open ``saga.job.Service(self.adaptor)``, retrying on SagaException.

        :return: ``(service, "")`` on success, ``(None, error message)`` once all attempts failed
        """
        attempts = max(1, int(self.serviceRetryAttempts))
        last_error = ""
        for attempt in range(1, attempts + 1):
            try:
                return saga.job.Service(self.adaptor), ""
            except saga.SagaException as ex:
                last_error = f"failed to open SAGA job service [{self.adaptor}]: {ex.get_message()}"
                tmpLog.error(f"{last_error} (attempt {attempt}/{attempts})")
                if attempt < attempts:
                    time.sleep(self.serviceRetryDelay)
        return None, last_error

    def _check_one_worker(self, job_service, workSpec):
        """Refresh one WorkSpec from SAGA (falling back to deep_checkjob) and return (status, errStr)."""
        errStr = ""
        tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
        tmpLog.debug("SAGA monitor started")
        if not workSpec.batchID:
            tmpLog.debug(f"SAGA monitor found worker [{workSpec.workerID}] without batchID")
            # still report an entry so the result list stays aligned with the input list
            return workSpec.status, errStr
        saga_submission_id = f"[{self.adaptor}]-[{workSpec.batchID}]"
        try:
            worker = job_service.get_job(saga_submission_id)
            harvester_job_state, errStr = self._update_from_saga_job(worker, workSpec, tmpLog)
        except saga.SagaException as ex:
            tmpLog.info(f"An exception occured during retriving worker information {workSpec.batchID}")
            tmpLog.info(ex.get_message())
            # probably 'finished' is not the proper state in this case, 'undefined' looks a bit better;
            # SAGA could not tell us, so ask the resource-specific plugin instead
            harvester_job_state, errStr = self._apply_deep_check(workSpec)
            tmpLog.debug(f"Worker state set to: {workSpec.status} ({harvester_job_state})")
        # for compatibility with dummy monitor
        with open(os.path.join(workSpec.accessPoint, "status.txt"), "w") as f:
            f.write(workSpec.status)
        return harvester_job_state, errStr

    def _update_from_saga_job(self, worker, workSpec, tmpLog):
        """Copy state, timestamps and exit code of a SAGA job into the WorkSpec.

        May raise saga.SagaException (handled by the caller).

        :return: (harvester_job_state, errStr)
        """
        errStr = ""
        tmpLog.debug(f"SAGA State for submission with batchid: {workSpec.batchID} is: {worker.state}")
        harvester_job_state = SAGASubmitter.status_translator(worker.state)
        workSpec.nativeStatus = worker.state
        workSpec.set_status(harvester_job_state)
        tmpLog.debug(f"Worker state with batchid: {workSpec.batchID} is: {harvester_job_state} exit code: {worker.exit_code}")
        if worker.created:
            tmpLog.debug(f"Worker created (SAGA): {worker.created}")
            workSpec.submitTime = datetime.strptime(worker.created, self._SAGA_DATE_FORMAT)
        if worker.started:
            tmpLog.debug(f"Worker started (SAGA): {worker.started}")
            workSpec.startTime = datetime.strptime(worker.started, self._SAGA_DATE_FORMAT)
        if worker.finished:
            tmpLog.debug(f"Worker finished (SAGA): {worker.finished}")
            workSpec.endTime = datetime.strptime(worker.finished, self._SAGA_DATE_FORMAT)

        if workSpec.is_final_status():
            workSpec.nativeExitCode = worker.exit_code
            tmpLog.info(f"Worker in final status [{workSpec.status}] exit code: {workSpec.nativeExitCode}")
            if workSpec.nativeExitCode != 0:  # let's try to find exit code, exit message etc...
                tmpLog.info("Deep check to find exit code and exit status required")
                harvester_job_state, errStr = self._apply_deep_check(workSpec)
            tmpLog.info(
                f"Worker {workSpec.workerID} with BatchID={workSpec.batchID} finished with exit code {worker.exit_code} and state {worker.state}"
            )
            tmpLog.debug(f"Started: [{worker.started}] finished: [{worker.finished}]")

        if worker.state == saga.job.PENDING:
            if not workSpec.submitTime:
                # queue time cannot be computed without a submission time
                tmpLog.debug("Worker pending without submit time, queue time check skipped")
            else:
                queue_time = (datetime.now() - workSpec.submitTime).total_seconds()
                tmpLog.info(f"Worker queued for {queue_time} sec.")
                if hasattr(self, "maxqueuetime") and queue_time > self.maxqueuetime:
                    tmpLog.info(f"Queue time {queue_time} is longer than limit {self.maxqueuetime} worker will be canceled")
                    worker.cancel()
                    worker.wait()
                    workSpec.nativeExitCode = worker.exit_code
                    cur_time = datetime.now()
                    workSpec.startTime = cur_time
                    workSpec.endTime = cur_time
                    workSpec.set_pilot_closed()
                    workSpec.set_status(workSpec.ST_cancelled)
                    harvester_job_state = workSpec.ST_cancelled
                    tmpLog.info(f"Worker state: {harvester_job_state} worker exit code: {workSpec.nativeExitCode}")
                    # proper processing of jobs for worker will be required, to avoid 'fake' fails
        return harvester_job_state, errStr

    def _apply_deep_check(self, workSpec):
        """Run deep_checkjob for a worker and merge its results into the WorkSpec.

        Values the deep check could not determine (None) do not overwrite what is
        already known about the worker; an unknown state defaults to ST_finished.

        :return: (harvester_job_state, errStr)
        """
        harvester_job_state, exit_code, native_status, starttime, endtime, errStr = self.deep_checkjob(
            workSpec.batchID, workSpec.workerID
        )
        # deep_checkjob reports "no information" as None, so test falsiness rather than == ""
        if not harvester_job_state:
            harvester_job_state = workSpec.ST_finished
        if exit_code is not None:
            workSpec.nativeExitCode = exit_code
        if native_status is not None:
            workSpec.nativeStatus = native_status
        if not workSpec.startTime:
            workSpec.startTime = starttime
        if endtime:
            workSpec.endTime = endtime
        workSpec.set_status(harvester_job_state)
        return harvester_job_state, errStr

    def deep_checkjob(self, batchid, workerid):
        """
        Get job state, exit code and some more parameters, from resources depending sources

        The queue's ``resource`` plugin (if configured) is asked via ``get_batchjob_info``.
        Every value that could not be determined is returned as None (diagMessage as "").

        :param batchid: batch system id of the worker
        :param workerid: harvester worker id (used for logging only)
        :return harvester_job_state, nativeExitCode, nativeStatus, startTime, endTime, diagMessage
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workerid}", method_name="deep_checkjob")
        harvester_job_state = None
        nativeexitcode = None
        nativestatus = None
        diagmessage = ""
        starttime = None
        endtime = None
        # must be bound even when no resource plugin is available
        batchjob_info = None
        queue_config = self.queue_config_mapper.get_queue(self.queueName)
        if hasattr(queue_config, "resource"):
            resource_utils = self.pluginFactory.get_plugin(queue_config.resource)
        else:
            tmpLog.debug(f"Resource configuration missed for: {self.queueName}")
            resource_utils = None
        if resource_utils:
            batchjob_info = resource_utils.get_batchjob_info(batchid)
        if batchjob_info:
            tmpLog.info(f"Batch job info collected: {batchjob_info}")
            harvester_job_state = batchjob_info["status"]
            nativeexitcode = batchjob_info["nativeExitCode"]
            nativestatus = batchjob_info["nativeStatus"]
            diagmessage = batchjob_info["nativeExitMsg"]
            if batchjob_info["start_time"]:
                starttime = batchjob_info["start_time"]
            if batchjob_info["finish_time"]:
                endtime = batchjob_info["finish_time"]

        return harvester_job_state, nativeexitcode, nativestatus, starttime, endtime, diagmessage