File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import time
0003 from datetime import datetime
0004
0005 import radical.utils
0006 import saga
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0010 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0011 from pandaharvester.harvestersubmitter.saga_submitter import SAGASubmitter
0012
0013
# Module-level logger for this plugin; the methods below derive per-call loggers from it via self.make_logger.
baseLogger = core_utils.setup_logger("saga_monitor")
0015
0016
0017
class SAGAMonitor(PluginBase):
    """Harvester monitor plugin that polls worker state through a SAGA job service.

    ``self.adaptor`` (the SAGA service URL), ``self.queueName`` and the optional
    ``self.maxqueuetime`` are read but never assigned in this class; presumably
    PluginBase populates them from the plugin configuration -- TODO confirm.
    """

    # How many times check_workers tries to open the SAGA job service before it
    # reports failure, and the pause (seconds) between attempts. Instance
    # attributes of the same name take precedence over these class defaults.
    serviceRetries = 3
    serviceRetryDelay = 10

    # Format of the timestamp strings SAGA reports in job.created/started/finished.
    _SAGA_DATE_FORMAT = "%a %b %d %H:%M:%S %Y"

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        self.pluginFactory = PluginFactory()
        self.queue_config_mapper = QueueConfigMapper()
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used.")

    def check_workers(self, workspec_list):
        """Check status of workers. This method takes a list of WorkSpecs as input argument
        and returns a list of worker's statuses.

        :param workspec_list: a list of work specs instances
        :return: A tuple of return code (True for success, False otherwise) and a list of worker's
            statuses: one ``(status, diagnostic message)`` pair per element of ``workspec_list``,
            in the same order. When the SAGA job service cannot be opened the return code is
            False and the second element is an error message.
        :rtype: (bool, [(string, string),])
        """
        tmpLog = self.make_logger(baseLogger, method_name="check_workers")
        job_service, errMsg = self._open_job_service(tmpLog)
        if job_service is None:
            return False, errMsg
        retList = []
        try:
            for workSpec in workspec_list:
                retList.append(self._check_worker(job_service, workSpec))
        finally:
            # Release the service even if checking one of the workers raised.
            job_service.close()
        tmpLog.debug(f"Results: {retList}")
        return True, retList

    def _open_job_service(self, tmpLog):
        """Create a SAGA job service for ``self.adaptor``, retrying on ``saga.SagaException``.

        :return: ``(service, "")`` on success, ``(None, error message)`` once all attempts failed.
        """
        attempts = max(1, int(self.serviceRetries))
        errMsg = ""
        for attempt in range(1, attempts + 1):
            try:
                return saga.job.Service(self.adaptor), ""
            except saga.SagaException as ex:
                errMsg = f"failed to create SAGA job service for [{self.adaptor}]: {ex.get_message()}"
                tmpLog.warning(f"{errMsg} (attempt {attempt}/{attempts})")
            if attempt < attempts:
                time.sleep(self.serviceRetryDelay)
        tmpLog.error(errMsg)
        return None, errMsg

    def _check_worker(self, job_service, workSpec):
        """Refresh one worker and return its ``(harvester status, diagnostic message)`` pair."""
        tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
        tmpLog.debug("SAGA monitor started")
        if not workSpec.batchID:
            tmpLog.debug(f"SAGA monitor found worker [{workSpec.workerID}] without batchID")
            # Still report an entry (the unchanged status) so the result list stays
            # aligned one-to-one with workspec_list.
            return workSpec.status, ""

        saga_submission_id = f"[{self.adaptor}]-[{workSpec.batchID}]"
        try:
            worker = job_service.get_job(saga_submission_id)
            harvester_job_state, errStr = self._refresh_from_saga(worker, workSpec, tmpLog)
        except saga.SagaException as ex:
            tmpLog.info(f"An exception occured during retriving worker information {workSpec.batchID}")
            tmpLog.info(ex.get_message())
            # SAGA could not tell us anything; fall back to the resource-specific deep
            # check, defaulting to "finished" when it cannot determine a state either.
            harvester_job_state, errStr = self._apply_deep_check(workSpec, workSpec.ST_finished)
            tmpLog.debug(f"Worker state set to: {workSpec.status} ({harvester_job_state})")

        self._write_status_file(workSpec, tmpLog)
        return harvester_job_state, errStr

    def _refresh_from_saga(self, worker, workSpec, tmpLog):
        """Copy state, timestamps and exit code of a SAGA job onto ``workSpec``.

        Also cancels workers that stayed PENDING longer than ``self.maxqueuetime`` (if set).
        May raise ``saga.SagaException`` from the SAGA job accessors.

        :return: ``(harvester status, diagnostic message)``
        """
        errStr = ""
        tmpLog.debug(f"SAGA State for submission with batchid: {workSpec.batchID} is: {worker.state}")
        harvester_job_state = SAGASubmitter.status_translator(worker.state)
        workSpec.nativeStatus = worker.state
        workSpec.set_status(harvester_job_state)
        tmpLog.debug(f"Worker state with batchid: {workSpec.batchID} is: {harvester_job_state} exit code: {worker.exit_code}")

        for label, value, attr in (
            ("created", worker.created, "submitTime"),
            ("started", worker.started, "startTime"),
            ("finished", worker.finished, "endTime"),
        ):
            if not value:
                continue
            tmpLog.debug(f"Worker {label} (SAGA): {value}")
            parsed = self._parse_saga_time(value, tmpLog)
            if parsed is not None:
                setattr(workSpec, attr, parsed)

        if workSpec.is_final_status():
            workSpec.nativeExitCode = worker.exit_code
            tmpLog.info(f"Worker in final status [{workSpec.status}] exit code: {workSpec.nativeExitCode}")
            if workSpec.nativeExitCode != 0:
                tmpLog.info("Deep check to find exit code and exit status required")
                # Keep the SAGA-derived state if the deep check cannot determine one.
                harvester_job_state, errStr = self._apply_deep_check(workSpec, harvester_job_state)
            tmpLog.info(
                f"Worker {workSpec.workerID} with BatchID={workSpec.batchID} finished with exit code {worker.exit_code} and state {worker.state}"
            )
            tmpLog.debug(f"Started: [{worker.started}] finished: [{worker.finished}]")

        if worker.state == saga.job.PENDING:
            if workSpec.submitTime is None:
                # Without a submit time the queue-time limit cannot be evaluated.
                tmpLog.debug("Worker submit time unknown, queue time limit not checked")
            else:
                queue_time = (datetime.now() - workSpec.submitTime).total_seconds()
                tmpLog.info(f"Worker queued for {queue_time} sec.")
                maxqueuetime = getattr(self, "maxqueuetime", None)
                if maxqueuetime is not None and queue_time > maxqueuetime:
                    tmpLog.info(f"Queue time {queue_time} is longer than limit {maxqueuetime} worker will be canceled")
                    worker.cancel()
                    worker.wait()
                    workSpec.nativeExitCode = worker.exit_code
                    cur_time = datetime.now()
                    workSpec.startTime = cur_time
                    workSpec.endTime = cur_time
                    workSpec.set_pilot_closed()
                    workSpec.set_status(workSpec.ST_cancelled)
                    harvester_job_state = workSpec.ST_cancelled
                    tmpLog.info(f"Worker state: {harvester_job_state} worker exit code: {workSpec.nativeExitCode}")

        return harvester_job_state, errStr

    def _apply_deep_check(self, workSpec, fallback_state):
        """Run deep_checkjob for ``workSpec`` and merge its results into the spec.

        Values the deep check could not determine (None) do not overwrite what is already
        on the spec; an empty/None state is replaced by ``fallback_state``.

        :return: ``(harvester status that was set, diagnostic message)``
        """
        state, exit_code, native_status, starttime, endtime, errStr = self.deep_checkjob(workSpec.batchID, workSpec.workerID)
        if not state:
            state = fallback_state
        if exit_code is not None:
            workSpec.nativeExitCode = exit_code
        if native_status is not None:
            workSpec.nativeStatus = native_status
        if not workSpec.startTime:
            workSpec.startTime = starttime
        if endtime:
            workSpec.endTime = endtime
        workSpec.set_status(state)
        return state, errStr or ""

    def _parse_saga_time(self, value, tmpLog):
        """Parse a SAGA timestamp string; log and return None if it does not match the expected format."""
        try:
            return datetime.strptime(value, self._SAGA_DATE_FORMAT)
        except (TypeError, ValueError) as ex:
            tmpLog.warning(f"Cannot parse SAGA timestamp {value!r}: {ex}")
            return None

    def _write_status_file(self, workSpec, tmpLog):
        """Write the worker's current status into ``<accessPoint>/status.txt`` (best effort)."""
        status_path = os.path.join(workSpec.accessPoint, "status.txt")
        try:
            with open(status_path, "w") as status_file:
                status_file.write(workSpec.status)
        except OSError as ex:
            # A missing/unwritable access point must not abort checking the remaining workers.
            tmpLog.warning(f"Failed to write {status_path}: {ex}")

    def deep_checkjob(self, batchid, workerid):
        """
        Get job state, exit code and some more parameters from the resource-specific plugin
        configured for this queue (queue_config.resource).

        :param batchid: batch system ID of the worker, passed to the resource plugin
        :param workerid: harvester worker ID, used only for log context
        :return harvester_job_state, nativeExitCode, nativeStatus, startTime, endTime, diagMessage;
            every value except diagMessage (which defaults to "") is None when no information is available
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workerid}", method_name="deep_checkjob")
        harvester_job_state = None
        nativeexitcode = None
        nativestatus = None
        diagmessage = ""
        starttime = None
        endtime = None
        queue_config = self.queue_config_mapper.get_queue(self.queueName)
        resource = getattr(queue_config, "resource", None)
        if resource:
            resource_utils = self.pluginFactory.get_plugin(resource)
        else:
            tmpLog.debug(f"Resource configuration missed for: {self.queueName}")
            resource_utils = None
        if resource_utils:
            batchjob_info = resource_utils.get_batchjob_info(batchid)
            if batchjob_info:
                tmpLog.info(f"Batch job info collected: {batchjob_info}")
                # .get(): a plugin omitting a field must not abort the whole monitor cycle.
                harvester_job_state = batchjob_info.get("status")
                nativeexitcode = batchjob_info.get("nativeExitCode")
                nativestatus = batchjob_info.get("nativeStatus")
                diagmessage = batchjob_info.get("nativeExitMsg") or ""
                starttime = batchjob_info.get("start_time") or None
                endtime = batchjob_info.get("finish_time") or None

        return harvester_job_state, nativeexitcode, nativestatus, starttime, endtime, diagmessage
0168 return harvester_job_state, nativeexitcode, nativestatus, starttime, endtime, diagmessage