Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import datetime
0002 import re
0003 import subprocess
0004 import tempfile
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 
# module-level base logger for this plugin; submit_workers passes it to
# self.make_logger() to obtain a per-worker logger
baseLogger = core_utils.setup_logger("lsf_submitter")
0011 
0012 
# submitter for LSF batch system
class LSFSubmitter(PluginBase):
    """Submitter plugin that hands workers to the LSF batch system via ``bsub``.

    For every worker a batch script is rendered from ``self.templateFile``
    into the worker's access point and piped to ``bsub`` on standard input.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set plugin defaults, run the base-class initialisation and load the template.

        ``uploadLog`` and ``logBaseURL`` are assigned before
        ``PluginBase.__init__`` is called; ``templateFile`` is read afterwards
        (presumably all three are supplied through ``kwarg`` / the queue
        configuration -- confirm against PluginBase).
        """
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # template for batch script; the context manager closes the file even
        # when reading fails
        with open(self.templateFile) as tmpFile:
            self.template = tmpFile.read()

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit each worker in ``workspec_list`` with ``bsub``.

        On success the numeric batch ID is stored in ``workSpec.batchID`` and,
        when ``self.uploadLog`` is set, the stdout/stderr log locations are
        registered on the worker.

        :param workspec_list: iterable of worker specs to submit
        :return: list with one ``(succeeded, error_message)`` tuple per worker,
                 in the same order as ``workspec_list``
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
            # make batch script
            batchFile = self.make_batch_script(workSpec)
            # command
            comStr = "bsub -L /bin/sh"
            # submit; the batch script is fed to bsub through stdin and the
            # handle is closed as soon as bsub has finished
            tmpLog.debug(f"submit with {comStr} and LSF options file {batchFile}")
            with open(batchFile, "r") as batchInput:
                p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=batchInput)
                rawOut, rawErr = p.communicate()
            # communicate() returns bytes because the pipes are binary; decode
            # once here so everything below works on str
            stdOut = rawOut.decode("utf-8", errors="replace")
            stdErr = rawErr.decode("utf-8", errors="replace")
            # check return code
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            tmpLog.debug(f"stdOut={stdOut}")
            tmpLog.debug(f"stdErr={stdErr}")
            if retCode != 0:
                # failed
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue
            # extract batchID: the second whitespace-separated token of the
            # bsub output with every non-digit character removed
            outTokens = stdOut.split()
            batchID = outTokens[1] if len(outTokens) > 1 else ""
            result = re.sub("[^0-9]", "", batchID)
            tmpLog.debug(f"strip out non-numberic charactors from {batchID} - result {result}")
            if not result:
                # without a batch ID the worker cannot be tracked, so report
                # the submission as failed instead of raising or storing ""
                errStr = f"failed to extract batchID from bsub output: {stdOut} {stdErr}"
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue
            workSpec.batchID = result
            tmpLog.debug(f"batchID={workSpec.batchID}")
            # set log files
            if self.uploadLog:
                if self.logBaseURL is None:
                    baseDir = workSpec.get_access_point()
                else:
                    baseDir = self.logBaseURL
                outName, errName = self.get_log_file_names(batchFile, workSpec.batchID)
                if outName is not None:
                    workSpec.set_log_file("stdout", f"{baseDir}/{outName}")
                if errName is not None:
                    workSpec.set_log_file("stderr", f"{baseDir}/{errName}")
            retList.append((True, ""))
        return retList

    # make batch script
    def make_batch_script(self, workspec):
        """Render the batch script for ``workspec`` into its access point.

        The node count is derived from ``workspec.nJobs`` and
        ``self.nGpuPerNode`` when a positive ``nGpuPerNode`` is configured,
        otherwise from ``workspec.nCore`` and ``self.nCorePerNode``. In both
        cases it is rounded up and is never smaller than one.

        NOTE: only nCorePerNode, nNode, accessPoint and workerID are passed to
        the template; walltime, queue and project placeholders are not filled.

        :param workspec: worker spec the script is made for
        :return: path of the generated script (the file is not deleted here)
        """
        # set number of nodes  - Note Ultimately will need to something more sophisticated
        # a missing, empty or non-positive nGpuPerNode selects core-based sizing
        nGpuPerNode = int(getattr(self, "nGpuPerNode", 0) or 0)
        if nGpuPerNode > 0:
            demand, perNode = workspec.nJobs, nGpuPerNode
        else:
            demand, perNode = workspec.nCore, self.nCorePerNode
        # -(-a // b) is ceiling division; it keeps the node count an integer
        # so the template is not rendered with a value such as "2.0"
        numnodes = max(int(-(-demand // perNode)), 1)

        # render first so that a template error does not leave an empty
        # script file behind in the access point
        scriptText = self.template.format(
            nCorePerNode=self.nCorePerNode,
            nNode=numnodes,
            accessPoint=workspec.accessPoint,
            workerID=workspec.workerID,
        )
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()) as tmpFile:
            tmpFile.write(scriptText)
        return tmpFile.name

    # get log file names
    def get_log_file_names(self, batch_script, batch_id):
        """Read the stdout/stderr file names from the ``#BSUB`` lines of a script.

        For every line starting with ``#BSUB`` the token following ``-o``
        (stdout) or ``-e`` (stderr) is taken and ``%J`` in it is replaced by
        ``batch_id``. Both options may appear on the same line. When an option
        occurs more than once the last occurrence wins.

        :param batch_script: path of the batch script to scan
        :param batch_id: batch ID string substituted for ``%J``
        :return: tuple ``(stdout_name, stderr_name)``; an entry is None when
                 the corresponding option is not present
        """
        stdOut = None
        stdErr = None
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#BSUB"):
                    continue
                items = line.split()
                # pair every token with its successor so that a flag at the
                # very end of the line (no value) is simply skipped
                for flag, value in zip(items, items[1:]):
                    if flag == "-o":
                        stdOut = value.replace("%J", batch_id)
                    elif flag == "-e":
                        stdErr = value.replace("%J", batch_id)
        return stdOut, stdErr