File indexing completed on 2026-04-19 08:00:04
0001 import datetime
0002 import re
0003 import subprocess
0004 import tempfile
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008
0009
0010 baseLogger = core_utils.setup_logger("lsf_submitter")  # module-wide logger; submit_workers passes it to make_logger to get a per-worker logger
0011
0012
0013
class LSFSubmitter(PluginBase):
    """Submitter plugin that hands workers to the LSF batch system via ``bsub``.

    For every worker a batch script is rendered from ``self.template`` into
    the worker's access point and piped into ``bsub`` on standard input.

    Attributes read by this class (``templateFile``, ``nCorePerNode`` and the
    optional ``nGpuPerNode``) are not assigned here; presumably
    ``PluginBase.__init__`` sets them from the keyword arguments -- confirm
    against the plugin base class.
    """

    def __init__(self, **kwarg):
        """Set defaults, run the base-class initialiser and load the template.

        Args:
            **kwarg: forwarded unchanged to ``PluginBase.__init__``.
        """
        # Defaults are assigned before the base initialiser runs, so the base
        # class is free to overwrite them.
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # Read the batch script template once; the context manager guarantees
        # the handle is released even if reading fails.
        with open(self.templateFile) as template_file:
            self.template = template_file.read()

    def submit_workers(self, workspec_list):
        """Submit every worker in ``workspec_list`` to LSF.

        Args:
            workspec_list: iterable of worker specs; each must provide
                ``workerID``, ``get_access_point()``, ``accessPoint`` and the
                sizing attributes used by ``make_batch_script``.

        Returns:
            list: one ``(succeeded, error_message)`` tuple per worker, in
            input order. ``error_message`` is ``""`` on success.
        """
        return [self._submit_worker(workSpec) for workSpec in workspec_list]

    def _submit_worker(self, workSpec):
        """Submit a single worker and return its ``(succeeded, message)`` tuple."""
        tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
        batchFile = self.make_batch_script(workSpec)
        comStr = "bsub -L /bin/sh"
        tmpLog.debug(f"submit with {comStr} and LSF options file {batchFile}")
        # The script is fed to bsub on stdin; close the handle as soon as the
        # child has finished instead of leaking one descriptor per worker.
        with open(batchFile, "r") as script_stdin:
            p = subprocess.Popen(
                comStr.split(),
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=script_stdin,
            )
            stdOut, stdErr = p.communicate()
        retCode = p.returncode
        tmpLog.debug(f"retCode={retCode}")
        tmpLog.debug(f"stdOut={stdOut}")
        tmpLog.debug(f"stdErr={stdErr}")

        if retCode != 0:
            # The pipes are binary, so both streams are bytes and must be
            # decoded before they can be joined with a str separator.
            errStr = stdOut.decode("utf-8", errors="replace") + " " + stdErr.decode("utf-8", errors="replace")
            tmpLog.error(errStr)
            return False, errStr

        # The job ID is taken from the second whitespace-separated token of
        # bsub's output, with every non-digit character removed.
        tokens = stdOut.split()
        if len(tokens) < 2:
            errStr = f"failed to extract batchID from bsub output: {stdOut.decode('utf-8', errors='replace')}"
            tmpLog.error(errStr)
            return False, errStr
        batchID = tokens[1].decode("utf-8", errors="replace")
        result = re.sub("[^0-9]", "", batchID)
        tmpLog.debug(f"strip out non-numberic charactors from {batchID} - result {result}")
        workSpec.batchID = result
        tmpLog.debug(f"batchID={workSpec.batchID}")

        if self.uploadLog:
            if self.logBaseURL is None:
                baseDir = workSpec.get_access_point()
            else:
                baseDir = self.logBaseURL
            outName, errName = self.get_log_file_names(batchFile, workSpec.batchID)
            if outName is not None:
                workSpec.set_log_file("stdout", f"{baseDir}/{outName}")
            if errName is not None:
                workSpec.set_log_file("stderr", f"{baseDir}/{errName}")
        return True, ""

    def make_batch_script(self, workspec):
        """Render the batch script for ``workspec`` into its access point.

        The template is formatted with the placeholders ``nCorePerNode``,
        ``nNode``, ``accessPoint`` and ``workerID``.

        Args:
            workspec: worker spec providing ``nJobs``, ``nCore``,
                ``accessPoint``, ``workerID`` and ``get_access_point()``.

        Returns:
            str: path of the generated ``*_submit.sh`` file. The file is not
            deleted by this method.
        """
        # GPU-based sizing applies only when nGpuPerNode is configured and
        # positive; every other case falls back to core-based sizing so that
        # numnodes is always bound.
        gpus_per_node = int(getattr(self, "nGpuPerNode", 0) or 0)
        if gpus_per_node > 0:
            # Round up so a partially filled node is still requested, and
            # never ask for fewer than one node.
            numnodes = int(workspec.nJobs / gpus_per_node)
            if numnodes <= 0:
                numnodes = 1
            elif workspec.nJobs % gpus_per_node != 0:
                numnodes += 1
        else:
            # NOTE(review): true division yields a float (e.g. 2.0), which is
            # what ends up in the rendered script -- confirm the template and
            # LSF accept that, or switch to integer division.
            numnodes = workspec.nCore / self.nCorePerNode

        # delete=False keeps the script on disk after the handle is closed so
        # that bsub can read it later.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()
        ) as tmpFile:
            tmpFile.write(
                self.template.format(
                    nCorePerNode=self.nCorePerNode,
                    nNode=numnodes,
                    accessPoint=workspec.accessPoint,
                    workerID=workspec.workerID,
                )
            )
        return tmpFile.name

    @staticmethod
    def _option_value(items, flag):
        """Return the token that follows ``flag`` in ``items``, or ``None``."""
        if flag not in items:
            return None
        position = items.index(flag) + 1
        return items[position] if position < len(items) else None

    def get_log_file_names(self, batch_script, batch_id):
        """Extract the stdout/stderr file names declared in a batch script.

        Only lines starting with ``#BSUB`` are inspected. The value following
        ``-o`` is used for stdout and the value following ``-e`` for stderr,
        with every ``%J`` replaced by ``batch_id``. When a flag occurs on
        several lines the last occurrence wins.

        Args:
            batch_script: path of the batch script to scan.
            batch_id: job ID string substituted for ``%J``.

        Returns:
            tuple: ``(stdout_name, stderr_name)``; an element is ``None`` when
            the corresponding option is absent.
        """
        stdOut = None
        stdErr = None
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#BSUB"):
                    continue
                items = line.split()
                # Both options are checked independently so that a single
                # directive line carrying -o and -e is handled correctly.
                out_name = self._option_value(items, "-o")
                if out_name is not None:
                    stdOut = out_name.replace("%J", batch_id)
                err_name = self._option_value(items, "-e")
                if err_name is not None:
                    stdErr = err_name.replace("%J", batch_id)
        return stdOut, stdErr