Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import json
0002 import os.path
0003 import re
0004 import subprocess
0005 from pprint import pprint
0006 
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010 
0011 # module-level logger; check_workers passes it to make_logger to build per-worker loggers
0012 baseLogger = core_utils.setup_logger("cobalt_monitor")
0013 
0014 
0015 # qstat output
0016 # JobID  User     WallTime  Nodes  State   Location
0017 # ===================================================
0018 # 77734  fcurtis  06:00:00  64     queued  None
0019 
0020 
0021 # monitor for Cobalt batch system
class CobaltMonitor(PluginBase):
    """Monitor plugin that checks worker status against the Cobalt batch system.

    Each worker is probed with ``qstat <batchID>``. When qstat reports the job,
    the State column is mapped to a worker status. When qstat exits with code 1
    and prints nothing, the job is taken to have left the queue and the final
    status is read from ``cobalt.log`` in the worker's access point.
    """

    # Ordered (substring, worker status) pairs matched against the qstat State
    # column; the first substring found in the state wins.
    _QSTAT_STATE_MAP = (
        ("running", WorkSpec.ST_running),
        ("queued", WorkSpec.ST_submitted),
        ("user_hold", WorkSpec.ST_submitted),
        ("starting", WorkSpec.ST_running),
        ("killing", WorkSpec.ST_failed),
        ("exiting", WorkSpec.ST_running),
        ("maxrun_hold", WorkSpec.ST_submitted),
    )

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    # check workers
    def check_workers(self, workspec_list):
        """Check the batch status of every worker in ``workspec_list``.

        :param workspec_list: iterable of worker specs; ``batchID``, ``workerID``,
            ``status`` and ``get_access_point()`` are read from each one.
        :return: ``(True, retList)`` where ``retList`` holds exactly one
            ``(new status, error string)`` tuple per worker, in input order.
        :raises Exception: if qstat succeeds but its output cannot be parsed.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
            # query the batch system for this job
            comStr = f"qstat {workSpec.batchID}"
            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            oldStatus = workSpec.status
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode= {retCode}")
            tmpLog.debug(f"stdOut = {stdOut}")
            tmpLog.debug(f"stdErr = {stdErr}")

            if retCode == 0:
                # batch job is still known to qstat and has a state
                newStatus, errStr = self._status_from_qstat(workSpec, stdOut, stdErr)
            elif retCode == 1 and not stdOut.strip() and not stdErr.strip():
                # exit code 1 and stdOut/stdErr has no content means job exited
                # need to look at cobalt log to determine exit status
                tmpLog.debug("job has already exited, checking cobalt log for exit status")
                newStatus, errStr = self._status_from_cobalt_log(workSpec, tmpLog)
            else:
                # Unexpected qstat failure. An entry must still be appended so the
                # result list stays aligned with workspec_list.
                # NOTE(review): assumes that returning the current status leaves the
                # worker unchanged for the caller -- confirm against the monitor agent.
                errStr = f"qstat failed with return code {retCode}; stdout: {stdOut} stderr: {stdErr}"
                tmpLog.error(errStr)
                newStatus = oldStatus

            retList.append((newStatus, errStr))
            tmpLog.debug(f"batchStatus {oldStatus} -> workerStatus {newStatus}")
            tmpLog.debug(f"errStr: {errStr}")

        return True, retList

    # translate qstat output into a worker status
    def _status_from_qstat(self, workSpec, stdOut, stdErr):
        """Map the output of a successful qstat call to ``(status, error string)``.

        Expected output layout (third line carries the job record):

            JobID   User    WallTime  Nodes  State   Location
            ===================================================
            124559  hdshin  06:00:00  64     queued  None

        :raises Exception: if the job record is missing or the state is unknown.
        """
        lines = stdOut.split("\n")
        parts = lines[2].split() if len(lines) > 2 else []
        if len(parts) < 5:
            raise Exception(f"failed to parse qstat stdout: {stdOut}\n stderr: {stdErr}")
        batchid = parts[0]
        state = parts[4]

        if int(batchid) != int(workSpec.batchID):
            return WorkSpec.ST_failed, f"qstat returned status for wrong batch id {batchid} != {workSpec.batchID}"

        for keyword, status in self._QSTAT_STATE_MAP:
            if keyword in state:
                return status, ""
        raise Exception(f'failed to parse job state "{state}" qstat stdout: {stdOut}\n stderr: {stdErr}')

    # determine final status from the cobalt log
    def _status_from_cobalt_log(self, workSpec, tmpLog):
        """Derive ``(status, error string)`` for an exited job from its cobalt.log.

        The log is scanned up to the first "task completed normally" line, whose
        exit code decides between finished and failed. Wall-time and cancellation
        messages seen before that line are collected into the error string.
        """
        errStr = ""
        cobalt_logfile = os.path.join(workSpec.get_access_point(), "cobalt.log")
        if not os.path.exists(cobalt_logfile):
            tmpLog.debug(" cobalt log file does not exist")
            errStr += f" cobalt log file {cobalt_logfile} does not exist "
            return WorkSpec.ST_failed, errStr

        return_code = None
        job_cancelled = False
        # context manager guarantees the log file handle is closed
        with open(cobalt_logfile) as log_file:
            for line in log_file:
                # looking for line like this:
                # Thu Aug 24 19:01:20 2017 +0000 (UTC) Info: task completed normally with an exit code of 0; initiating job cleanup and removal
                if "task completed normally" in line:
                    start_index = line.find("exit code of ") + len("exit code of ")
                    end_index = line.find(";", start_index)
                    str_return_code = line[start_index:end_index]
                    # a literal "None" exit code is treated as a failure (-1)
                    return_code = -1 if "None" in str_return_code else int(str_return_code)
                    break
                elif "maximum execution time exceeded" in line:
                    errStr += " batch job exceeded wall clock time "
                elif "user delete requested" in line:
                    errStr += " job was cancelled "
                    job_cancelled = True

        if return_code == 0:
            tmpLog.debug("job finished normally")
            return WorkSpec.ST_finished, errStr

        if return_code is None:
            if job_cancelled:
                tmpLog.debug("job was cancelled")
                errStr += " job cancelled "
                return WorkSpec.ST_cancelled, errStr
            tmpLog.debug("job has no exit code, failing job")
            errStr += f" exit code not found in cobalt log file {cobalt_logfile} "
            return WorkSpec.ST_failed, errStr

        tmpLog.debug(f" non zero exit code {return_code} from batch job id {workSpec.batchID}")
        errStr += f" non-zero exit code {return_code} from batch job id {workSpec.batchID} "
        return WorkSpec.ST_failed, errStr