Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import re
0002 import subprocess
0003 from shlex import quote, split
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 
# module-level logger; passed as the base logger to self.make_logger() in
# LSFMonitor.check_workers to build per-worker loggers
baseLogger = core_utils.setup_logger("lsf_monitor")


# monitor for LSF batch system
class LSFMonitor(PluginBase):
    """Monitor plugin that checks worker status by querying LSF with ``bjobs``.

    For every worker, ``bjobs -a -noheader -o "jobid:10 stat:10" <batchID>`` is
    run and the LSF STAT column of the job's output line is translated into a
    harvester ``WorkSpec`` status.
    """

    # LSF STAT value -> harvester worker status. Any STAT not listed here is
    # treated as WorkSpec.ST_failed.
    _status_map = {
        "RUN": WorkSpec.ST_running,
        "DONE": WorkSpec.ST_finished,
        "PEND": WorkSpec.ST_submitted,
        "PROV": WorkSpec.ST_submitted,
        "WAIT": WorkSpec.ST_submitted,
    }

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _decode(data):
        """Return subprocess output as ``str``.

        Popen is used without text mode, so communicate() yields ``bytes``.
        Calling ``str()`` on bytes would give the ``b'...'`` repr with escaped
        newlines instead of the actual text, so decode explicitly.
        """
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode("utf-8", errors="replace")
        return str(data)

    def _parse_bjobs_lines(self, batch_id, lines, current_status, tmp_log):
        """Find the output line describing ``batch_id`` and map it to a worker status.

        :param batch_id: LSF job ID of the worker
        :param lines: decoded bjobs output lines (stdout first, then stderr)
        :param current_status: status returned unchanged if no line mentions the job
        :param tmp_log: logger for debug messages
        :return: tuple of (worker status, matching line or "" when nothing matched)
        """
        # Match the ID as a whole token. It is escaped so it is never interpreted
        # as a regex, and so that e.g. job 123 does not match a line for job 1234.
        id_pattern = re.compile(rf"(?<!\w){re.escape(str(batch_id))}(?!\w)")
        for line in lines:
            match = id_pattern.search(line)
            tmp_log.debug(f"tmpLine = {line} tmpMatch = {match}")
            if match is None:
                continue
            if "is not found" in line:
                batch_status = f"Job {batch_id} is not found"
                new_status = WorkSpec.ST_failed
            else:
                # "-o 'jobid:10 stat:10'" prints JOBID then STAT, so STAT is the second field
                fields = line.split()
                batch_status = fields[1] if len(fields) > 1 else ""
                new_status = self._status_map.get(batch_status, WorkSpec.ST_failed)
            tmp_log.debug(f"batchStatus {batch_status} -> workerStatus {new_status}")
            return new_status, line
        return current_status, ""

    # check workers
    def check_workers(self, workspec_list):
        """Query LSF for each worker and report its (possibly updated) status.

        :param workspec_list: WorkSpec objects to check; each one's ``batchID`` is queried
        :return: tuple ``(True, retList)``. ``retList`` holds one
            ``(newStatus, errStr)`` tuple per worker, in input order.
            ``newStatus`` stays ``workSpec.status`` when the output does not
            allow a decision.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
            # command
            comStr = f"bjobs -a -noheader -o {quote('jobid:10 stat:10')} {workSpec.batchID} "
            comStr_split = split(comStr)
            # run bjobs and collect both streams as text
            p = subprocess.Popen(comStr_split, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdOut, stdErr = p.communicate()
            stdOut = self._decode(stdOut)
            stdErr = self._decode(stdErr)
            retCode = p.returncode
            tmpLog.debug(f"len(stdOut) = {len(stdOut)} stdOut={stdOut}")
            tmpLog.debug(f"len(stdErr) = {len(stdErr)}  stdErr={stdErr}")
            tmpLog.debug(f"retCode={retCode}")
            newStatus = workSpec.status
            if retCode == 0:
                # The job line normally appears on stdout and "is not found" on
                # stderr, so scan both streams, stdout first.
                lines = stdOut.splitlines() + stdErr.splitlines()
                newStatus, errStr = self._parse_bjobs_lines(workSpec.batchID, lines, newStatus, tmpLog)
            else:
                # failed
                errStr = f"{stdOut} {stdErr}"
                tmpLog.error(errStr)
                # NOTE(review): this phrase looks like PBS/Torque wording rather than
                # LSF; kept for backward compatibility.
                if "Unknown Job Id Error" in errStr:
                    tmpLog.info("Mark job as finished.")
                    newStatus = WorkSpec.ST_finished
                elif "is not found" in errStr:
                    # NOTE(review): bjobs may report an unknown job with a non-zero exit
                    # code; treat it the same as the exit-code-0 "is not found" case.
                    newStatus = WorkSpec.ST_failed
            retList.append((newStatus, errStr))
        return True, retList