Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import datetime
0002 import subprocess
0003 import tempfile
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 
# module-level logger for this plugin; submit_workers() passes it to
# make_logger() to build a per-worker logger
baseLogger = core_utils.setup_logger("pbs_submitter")
0010 
0011 
# submitter for PBS batch system
class PBSSubmitter(PluginBase):
    """Submit workers to a PBS batch system with ``qsub``.

    For every worker a batch script is rendered from ``self.template`` and
    handed to ``qsub``; the last whitespace-separated token printed by
    ``qsub`` is stored as the worker's ``batchID``.

    The methods read ``templateFile``, ``nCore``, ``nCorePerNode``,
    ``maxWalltime``, ``localQueue`` and ``projectName`` (and optionally
    ``dynamicSizing``) from the instance; these are presumably populated by
    ``PluginBase.__init__`` from the keyword arguments -- confirm against
    PluginBase.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set defaults, run the base-class initializer and load the template.

        ``uploadLog`` and ``logBaseURL`` are assigned before calling
        ``PluginBase.__init__`` so that the base class may override them.
        """
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # template for batch script
        with open(self.templateFile) as tmpFile:
            self.template = tmpFile.read()

    @staticmethod
    def _to_text(data):
        """Return subprocess output as ``str`` (``None`` becomes ``""``)."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            # never let an odd byte in qsub output abort the submission loop
            return data.decode("utf-8", errors="replace")
        return data

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit each worker in ``workspec_list`` through ``qsub``.

        On success ``workSpec.batchID`` is set and, when ``self.uploadLog``
        is true, the stdout/stderr log locations are registered on the
        worker via ``set_log_file``.

        :param workspec_list: iterable of worker specs to submit
        :return: list with one ``(succeeded, error_message)`` tuple per
                 worker, in input order; ``error_message`` is ``""`` on
                 success
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
            # make batch script
            batchFile = self.make_batch_script(workSpec)
            # command (string form is kept only for the log message)
            comStr = f"qsub {batchFile}"
            # submit; argv is passed as a list so that a script path
            # containing whitespace is not split into several arguments
            tmpLog.debug(f"submit with {comStr}")
            p = subprocess.Popen(["qsub", batchFile], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # check return code; pipes deliver bytes, so decode before use
            rawOut, rawErr = p.communicate()
            stdOut = self._to_text(rawOut)
            stdErr = self._to_text(rawErr)
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            outItems = stdOut.split()
            if retCode == 0 and outItems:
                # extract batchID
                workSpec.batchID = outItems[-1]
                tmpLog.debug(f"batchID={workSpec.batchID}")
                # set log files
                if self.uploadLog:
                    if self.logBaseURL is None:
                        baseDir = workSpec.get_access_point()
                    else:
                        baseDir = self.logBaseURL
                    outName, errName = self.get_log_file_names(batchFile, workSpec.batchID)
                    if outName is not None:
                        workSpec.set_log_file("stdout", f"{baseDir}/{outName}")
                    if errName is not None:
                        workSpec.set_log_file("stderr", f"{baseDir}/{errName}")
                tmpRetVal = (True, "")
            elif retCode == 0:
                # qsub exited cleanly but printed nothing to take the
                # batchID from; report a failure instead of raising
                errStr = f"qsub returned no batchID {stdErr}"
                tmpLog.error(errStr)
                tmpRetVal = (False, errStr)
            else:
                # failed
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                tmpRetVal = (False, errStr)
            retList.append(tmpRetVal)
        return retList

    # make batch script
    def make_batch_script(self, workspec):
        """Render the batch-script template for ``workspec`` and save it.

        When ``self.dynamicSizing`` is exactly ``True`` the walltime comes
        from the worker; otherwise ``workspec.nCore`` is overwritten with
        ``self.nCore`` and the walltime comes from the plugin settings.

        The script is written to a new ``*_submit.sh`` file inside the
        directory returned by ``workspec.get_access_point()``; the file is
        not deleted on close.

        :param workspec: worker spec the script is generated for
        :return: path of the generated batch script
        """
        if hasattr(self, "dynamicSizing") and self.dynamicSizing is True:
            maxWalltime = str(datetime.timedelta(seconds=workspec.maxWalltime))
            # NOTE(review): true division, so this may be a float -- confirm
            # that the template accepts a fractional value here
            yodaWallClockLimit = workspec.maxWalltime / 60
        else:
            workspec.nCore = self.nCore
            maxWalltime = str(datetime.timedelta(seconds=self.maxWalltime))
            yodaWallClockLimit = self.maxWalltime / 60
        scriptBody = self.template.format(
            nCorePerNode=self.nCorePerNode,
            localQueue=self.localQueue,
            projectName=self.projectName,
            # integer division: a node count must be rendered as "2", not "2.0"
            nNode=workspec.nCore // self.nCorePerNode,
            accessPoint=workspec.accessPoint,
            walltime=maxWalltime,
            yodaWallClockLimit=yodaWallClockLimit,
            workerID=workspec.workerID,
        )
        # text mode is required: the default mode is binary and rejects str
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()
        ) as tmpFile:
            tmpFile.write(scriptBody)
        return tmpFile.name

    # get log file names
    def get_log_file_names(self, batch_script, batch_id):
        """Extract stdout/stderr file names from ``#PBS`` lines of a script.

        Only lines starting with ``#PBS`` are inspected. The last token of a
        line containing ``-o`` is taken as the stdout name, the last token
        of a line containing ``-e`` as the stderr name, and every
        ``$PBS_JOBID`` in either name is replaced by ``batch_id``. If
        several lines match, the last one wins.

        :param batch_script: path of the batch script to scan
        :param batch_id: batch job ID substituted for ``$PBS_JOBID``
        :return: ``(stdout_name, stderr_name)``; an entry is ``None`` when
                 the script has no matching directive
        """
        stdOut = None
        stdErr = None
        # str() guards against a non-string ID, which str.replace would reject
        jobID = str(batch_id)
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#PBS"):
                    continue
                items = line.split()
                if "-o" in items:
                    stdOut = items[-1].replace("$PBS_JOBID", jobID)
                elif "-e" in items:
                    stdErr = items[-1].replace("$PBS_JOBID", jobID)
        return stdOut, stdErr