File indexing completed on 2026-04-20 07:58:59
0001 import re
0002 import subprocess
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007
0008
# Module-level logger for this plugin; check_workers passes it to
# self.make_logger to build a per-worker logger.
baseLogger = core_utils.setup_logger("pbs_monitor")
0010
0011
0012
class PBSMonitor(PluginBase):
    """Monitor plugin that polls ``qstat`` to update the status of PBS batch workers."""

    def __init__(self, **kwarg):
        """Initialise the plugin; all keyword arguments are forwarded to PluginBase."""
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _map_batch_status(batchStatus):
        """Translate a qstat job-state token into a WorkSpec status constant.

        Any token not listed below is treated as a failure.
        """
        if batchStatus in ("R", "E"):
            return WorkSpec.ST_running
        if batchStatus in ("C", "H"):
            # NOTE(review): "H" is mapped to finished exactly as before; confirm
            # this is intended if "H" means "held" on the target PBS flavour.
            return WorkSpec.ST_finished
        if batchStatus == "CANCELLED":
            return WorkSpec.ST_cancelled
        if batchStatus in ("Q", "W", "S"):
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    def check_workers(self, workspec_list):
        """Query ``qstat`` for each worker's batch job and derive its new status.

        :param workspec_list: workers to check; each must expose ``workerID``,
            ``batchID`` and ``status``.
        :return: ``(True, retList)`` where ``retList`` holds exactly one
            ``(newStatus, errStr)`` tuple per worker, in input order.
            ``newStatus`` falls back to the worker's current status when the
            qstat output cannot be interpreted. ``errStr`` is the matched qstat
            line, the combined stdout/stderr when qstat fails, or ``""``.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            batchID = str(workSpec.batchID)
            comStr = f"qstat {batchID}"
            tmpLog.debug(f"check with {comStr}")
            # Pass an explicit argv list so the batch ID is one argument, never shell-parsed.
            p = subprocess.Popen(["qstat", batchID], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            newStatus = workSpec.status

            rawOut, rawErr = p.communicate()
            # Without text mode communicate() returns bytes; decode explicitly so the
            # str operations below work and undecodable bytes cannot raise.
            stdOut = rawOut.decode("utf-8", errors="replace")
            stdErr = rawErr.decode("utf-8", errors="replace")
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            errStr = ""
            if retCode == 0:
                # Escape the ID (it usually contains '.') and require whitespace or a
                # line boundary on both sides, so e.g. "123.srv" does not match "1123.srv".
                idPattern = re.compile(r"(?<!\S)" + re.escape(batchID) + r"(?!\S)")
                for tmpLine in stdOut.splitlines():
                    if idPattern.search(tmpLine) is None:
                        continue
                    fields = tmpLine.split()
                    if len(fields) < 2:
                        # No state column on this line; keep looking instead of crashing.
                        tmpLog.error(f"cannot parse qstat line: {tmpLine}")
                        continue
                    errStr = tmpLine
                    # The state is read from the second-to-last column.
                    batchStatus = fields[-2]
                    newStatus = self._map_batch_status(batchStatus)
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    break
            else:
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                if "Unknown Job Id Error" in errStr:
                    tmpLog.info("Mark job as finished.")
                    newStatus = WorkSpec.ST_finished
            retList.append((newStatus, errStr))
        return True, retList