File indexing completed on 2026-04-20 07:58:59
0001 import re
0002 import subprocess
0003 from shlex import quote, split
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008
0009
# Module-level logger for this plugin; check_workers passes it to make_logger
# to build a per-worker logger.
baseLogger = core_utils.setup_logger("lsf_monitor")
0011
0012
0013
class LSFMonitor(PluginBase):
    """Monitor plugin that checks workers by querying LSF with ``bjobs``."""

    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``PluginBase``."""
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _to_worker_status(batch_status):
        """Translate an LSF job state string into a ``WorkSpec`` status."""
        if batch_status == "RUN":
            return WorkSpec.ST_running
        if batch_status == "DONE":
            return WorkSpec.ST_finished
        if batch_status in ("PEND", "PROV", "WAIT"):
            return WorkSpec.ST_submitted
        # Every other state reported by bjobs is treated as a failure.
        return WorkSpec.ST_failed

    def check_workers(self, workspec_list):
        """Query ``bjobs`` once per worker and derive each worker's status.

        Args:
            workspec_list: iterable of work specs; ``workerID``, ``batchID``
                and ``status`` are read from each element.

        Returns:
            A tuple ``(True, retList)`` where ``retList`` holds one
            ``(newStatus, errStr)`` tuple per element of ``workspec_list``,
            in the same order. ``newStatus`` stays at the worker's current
            status when nothing could be determined from the ``bjobs`` output.

        Raises:
            OSError: propagated from ``subprocess.Popen`` when ``bjobs``
                cannot be executed.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            batchID = str(workSpec.batchID)
            # Pass an argv list so the batch ID is always exactly one argument.
            command = ["bjobs", "-a", "-noheader", "-o", "jobid:10 stat:10", batchID]

            # Text mode: communicate() returns str, so the output can be split
            # into lines and concatenated without bytes/str mismatches.
            p = subprocess.Popen(
                command,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                errors="replace",
            )
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"len(stdOut) = {len(stdOut)} stdOut={stdOut}")
            tmpLog.debug(f"len(stdErr) = {len(stdErr)} stdErr={stdErr}")
            tmpLog.debug(f"retCode={retCode}")

            # Keep the current status unless the output says otherwise.
            newStatus = workSpec.status
            errStr = ""
            if retCode == 0:
                # Parse whichever stream carries more output; stdout wins ties.
                response = stdOut if len(stdOut) >= len(stdErr) else stdErr
                for tmpLine in response.splitlines():
                    matched = batchID in tmpLine
                    tmpLog.debug(f"tmpLine = {tmpLine} tmpMatch = {matched}")
                    if not matched:
                        continue
                    errStr = tmpLine
                    if "is not found" in tmpLine:
                        batchStatus = f"Job {batchID} is not found"
                        newStatus = WorkSpec.ST_failed
                    else:
                        fields = tmpLine.split()
                        if len(fields) < 2:
                            # No state column on this line; try the next one.
                            tmpLog.error(f"cannot parse bjobs output line: {tmpLine}")
                            continue
                        # Output format is "jobid stat": the state is column two.
                        batchStatus = fields[1]
                        newStatus = self._to_worker_status(batchStatus)
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    break
            else:
                # bjobs itself failed; report everything it printed.
                errStr = f"{stdOut} {stdErr}"
                tmpLog.error(errStr)
                if "Unknown Job Id Error" in errStr:
                    tmpLog.info("Mark job as finished.")
                    newStatus = WorkSpec.ST_finished
            retList.append((newStatus, errStr))
        return True, retList