File indexing completed on 2026-04-20 07:58:59
0001 import json
0002 import os.path
0003 import re
0004 import subprocess
0005 from pprint import pprint
0006
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010
0011
# Module-level base logger for this plugin; check_workers passes it to
# make_logger to build a logger tagged with each worker's ID.
baseLogger = core_utils.setup_logger("cobalt_monitor")
0013
0014
0015
0016
0017
0018
0019
0020
0021
class CobaltMonitor(PluginBase):
    """Monitor plugin that derives worker status from the Cobalt batch system.

    Each worker is looked up with ``qstat <batchID>``. While the job is still
    listed, the state column of the qstat output is mapped to a worker status.
    Once qstat no longer lists the job, the ``cobalt.log`` file in the
    worker's access point is scanned for the exit code.
    """

    # Ordered (substring, worker status) pairs applied to the qstat state
    # column; the first substring found in the state wins.
    _QSTAT_STATE_MAP = (
        ("running", WorkSpec.ST_running),
        ("queued", WorkSpec.ST_submitted),
        ("user_hold", WorkSpec.ST_submitted),
        ("starting", WorkSpec.ST_running),
        ("killing", WorkSpec.ST_failed),
        ("exiting", WorkSpec.ST_running),
        ("maxrun_hold", WorkSpec.ST_submitted),
    )

    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``PluginBase``."""
        super().__init__(**kwarg)

    def check_workers(self, workspec_list):
        """Check the batch status of every worker in *workspec_list*.

        Args:
            workspec_list: iterable of worker specs; ``workerID``,
                ``batchID``, ``status`` and ``get_access_point()`` are read.

        Returns:
            ``(True, retList)`` where ``retList`` holds exactly one
            ``(newStatus, errStr)`` tuple per worker, in input order.
            ``newStatus`` is ``None`` when qstat failed in an unexpected way
            and the status could not be determined.

        Raises:
            Exception: if qstat succeeds but its output cannot be parsed or
                reports a job state this plugin does not know.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            comStr = f"qstat {workSpec.batchID}"
            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            oldStatus = workSpec.status

            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode= {retCode}")
            tmpLog.debug(f"stdOut = {stdOut}")
            tmpLog.debug(f"stdErr = {stdErr}")

            if retCode == 0:
                newStatus, errStr = self._status_from_qstat(workSpec, stdOut, stdErr)
            elif retCode == 1 and not stdOut.strip() and not stdErr.strip():
                # qstat exits with 1 and prints nothing when the job is no
                # longer known to the scheduler.
                tmpLog.debug("job has already exited, checking cobalt log for exit status")
                newStatus, errStr = self._status_from_cobalt_log(workSpec, tmpLog)
            else:
                # Unexpected qstat failure. An entry is still appended so that
                # retList stays aligned with workspec_list.
                # NOTE(review): None is used as "status unknown" here; confirm
                # that the caller treats a None status as "try again later".
                newStatus = None
                errStr = f"qstat failed with retCode={retCode} stdout: {stdOut} stderr: {stdErr}"
                tmpLog.error(errStr)

            retList.append((newStatus, errStr))
            tmpLog.debug(f"batchStatus {oldStatus} -> workerStatus {newStatus}")
            tmpLog.debug(f"errStr: {errStr}")

        return True, retList

    def _status_from_qstat(self, workSpec, stdOut, stdErr):
        """Map the output of a successful qstat call to ``(status, errStr)``."""
        # The job record is expected on the third output line, with the batch
        # id in the first column and the state in the fifth.
        lines = stdOut.split("\n")
        parts = lines[2].split() if len(lines) > 2 else []
        if len(parts) < 5:
            raise Exception(f"failed to parse qstat stdout: {stdOut}\n stderr: {stdErr}")
        batchid, state = parts[0], parts[4]

        if int(batchid) != int(workSpec.batchID):
            return WorkSpec.ST_failed, f"qstat returned status for wrong batch id {batchid} != {workSpec.batchID}"

        for keyword, status in self._QSTAT_STATE_MAP:
            if keyword in state:
                return status, ""
        raise Exception(f'failed to parse job state "{state}" qstat stdout: {stdOut}\n stderr: {stdErr}')

    def _status_from_cobalt_log(self, workSpec, tmpLog):
        """Derive ``(status, errStr)`` for an exited job from its cobalt.log."""
        cobalt_logfile = os.path.join(workSpec.get_access_point(), "cobalt.log")
        if not os.path.exists(cobalt_logfile):
            tmpLog.debug(" cobalt log file does not exist")
            return WorkSpec.ST_failed, f" cobalt log file {cobalt_logfile} does not exist "

        return_code, job_cancelled, errStr = self._scan_cobalt_log(cobalt_logfile)

        if return_code == 0:
            tmpLog.debug("job finished normally")
            return WorkSpec.ST_finished, errStr

        if return_code is None:
            if job_cancelled:
                tmpLog.debug("job was cancelled")
                return WorkSpec.ST_cancelled, errStr + " job cancelled "
            tmpLog.debug("job has no exit code, failing job")
            return WorkSpec.ST_failed, errStr + f" exit code not found in cobalt log file {cobalt_logfile} "

        tmpLog.debug(f" non zero exit code {return_code} from batch job id {workSpec.batchID}")
        return WorkSpec.ST_failed, errStr + f" non-zero exit code {return_code} from batch job id {workSpec.batchID} "

    @staticmethod
    def _scan_cobalt_log(cobalt_logfile):
        """Scan a cobalt.log file for the exit code and cancellation markers.

        Returns:
            ``(return_code, job_cancelled, errStr)``. ``return_code`` is
            ``None`` when no "task completed normally" line was found, and
            ``-1`` when such a line carries an exit code that is not an
            integer (for example the text ``None``).
        """
        return_code = None
        job_cancelled = False
        errStr = ""
        # Context manager so the log file handle is always closed.
        with open(cobalt_logfile) as log_file:
            for line in log_file:
                if "task completed normally" in line:
                    # The exit code sits between "exit code of " and ";".
                    tail = line.partition("exit code of ")[2]
                    code_text = tail.split(";", 1)[0].strip()
                    try:
                        return_code = int(code_text)
                    except ValueError:
                        # "None" or any other non-integer text counts as a
                        # failed exit rather than aborting the whole check.
                        return_code = -1
                    break
                elif "maximum execution time exceeded" in line:
                    errStr += " batch job exceeded wall clock time "
                elif "user delete requested" in line:
                    errStr += " job was cancelled "
                    job_cancelled = True
        return return_code, job_cancelled, errStr
0143 return True, retList