Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import re
0002 import subprocess
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007 
# module-level logger named "pbs_monitor"; check_workers passes it to make_logger to build per-worker loggers
baseLogger = core_utils.setup_logger("pbs_monitor")
0010 
0011 
0012 # monitor for PBS batch system
class PBSMonitor(PluginBase):
    """Monitor plugin that checks worker status by running ``qstat`` on each worker's batch ID."""

    # constructor
    def __init__(self, **kwarg):
        """Pass all keyword arguments through to PluginBase."""
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _to_text(output):
        """Return subprocess output as ``str``: bytes are decoded, ``None`` becomes an empty string."""
        if output is None:
            return ""
        if isinstance(output, bytes):
            # undecodable bytes are replaced rather than raising, so a garbled line cannot abort the check
            return output.decode(errors="replace")
        return output

    @staticmethod
    def _map_batch_status(batchStatus):
        """Translate the status column taken from a qstat line into a WorkSpec status."""
        if batchStatus in ("R", "E"):
            return WorkSpec.ST_running
        # NOTE(review): "H" is reported as finished here; if it stands for a held job
        # in this PBS flavour, confirm that this mapping is intended
        if batchStatus in ("C", "H"):
            return WorkSpec.ST_finished
        if batchStatus in ("CANCELLED",):
            return WorkSpec.ST_cancelled
        if batchStatus in ("Q", "W", "S"):
            return WorkSpec.ST_submitted
        # anything unrecognised is treated as a failure
        return WorkSpec.ST_failed

    # check workers
    def check_workers(self, workspec_list):
        """Run ``qstat <batchID>`` for every worker and derive its new status.

        :param workspec_list: iterable of worker specs; ``workerID``, ``batchID`` and
                              ``status`` are read from each element
        :return: ``(True, retList)`` where ``retList`` holds one ``(newStatus, errStr)``
                 tuple per worker, in input order. ``errStr`` is the matching qstat line
                 on success, the combined stdout/stderr when qstat fails, or an empty
                 string when qstat succeeded but no line matched the batch ID.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
            # command
            comStr = f"qstat {workSpec.batchID}"
            # check
            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # keep the current status unless qstat gives a reason to change it
            newStatus = workSpec.status
            stdOut, stdErr = p.communicate()
            # pipes deliver bytes on Python 3; convert once so the string handling below works
            stdOut = self._to_text(stdOut)
            stdErr = self._to_text(stdErr)
            # check return code
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            errStr = ""
            if retCode == 0:
                # literal match on "<batchID> " so that characters such as "." in the
                # batch ID are not interpreted as regex metacharacters
                idToken = f"{workSpec.batchID} "
                # parse
                for tmpLine in stdOut.split("\n"):
                    if idToken not in tmpLine:
                        continue
                    columns = tmpLine.split()
                    if len(columns) < 2:
                        # too short to carry a status column
                        continue
                    errStr = tmpLine
                    # the status is read from the second-to-last column of the matching line
                    batchStatus = columns[-2]
                    newStatus = self._map_batch_status(batchStatus)
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    break
            else:
                # failed
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                # qstat reporting an unknown job ID is treated as the job having finished
                if "Unknown Job Id Error" in errStr:
                    tmpLog.info("Mark job as finished.")
                    newStatus = WorkSpec.ST_finished
            retList.append((newStatus, errStr))
        return True, retList