File indexing completed on 2026-04-19 08:00:04
0001 import datetime
0002 import subprocess
0003 import tempfile
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007
0008
# Module-level logger for this plugin; submit_workers derives a per-worker
# logger from it via self.make_logger.
baseLogger = core_utils.setup_logger("pbs_submitter")
0010
0011
0012
class PBSSubmitter(PluginBase):
    """Submitter plugin that sends harvester workers to PBS via ``qsub``.

    Configuration attributes read by this class (presumably populated by
    ``PluginBase.__init__`` from the keyword arguments -- not visible here):
    ``templateFile``, ``nCorePerNode``, ``localQueue``, ``projectName``,
    optionally ``dynamicSizing``, and, when dynamic sizing is off, ``nCore``
    and ``maxWalltime``. ``uploadLog`` / ``logBaseURL`` control whether log
    file locations are recorded on each worker spec.
    """

    def __init__(self, **kwarg):
        # Defaults are set before PluginBase.__init__ so that, presumably,
        # matching keyword arguments can override them.
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # Read the batch-script template once; it is formatted per worker in
        # make_batch_script. ``with`` closes the file even if read() fails.
        with open(self.templateFile) as tmpFile:
            self.template = tmpFile.read()

    def submit_workers(self, workspec_list):
        """Submit every worker with ``qsub``.

        :param workspec_list: worker specs to submit.
        :return: list with one ``(success, error_message)`` tuple per worker,
            in the same order as ``workspec_list``. On success ``batchID`` is
            set on the worker spec and, when ``uploadLog`` is enabled, its
            stdout/stderr log locations as well.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")

            batchFile = self.make_batch_script(workSpec)

            comStr = f"qsub {batchFile}"
            tmpLog.debug(f"submit with {comStr}")
            try:
                # Argument list (no shell, no str.split) so a path containing
                # spaces stays one argument; universal_newlines=True decodes
                # the output to str so batch IDs and messages are text.
                p = subprocess.Popen(
                    ["qsub", batchFile],
                    shell=False,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
                stdOut, stdErr = p.communicate()
            except OSError as e:
                # e.g. qsub not found: fail this worker instead of aborting
                # the whole batch and losing results for the others.
                errStr = f"failed to run {comStr}: {e}"
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            outTokens = stdOut.split()
            if retCode != 0 or not outTokens:
                errStr = stdOut + " " + stdErr
                if retCode == 0:
                    # A zero exit code without output gives no batch ID to track.
                    errStr = "no batch ID in qsub output " + errStr
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            # The batch ID is taken as the last whitespace-separated token of
            # qsub's stdout.
            workSpec.batchID = outTokens[-1]
            tmpLog.debug(f"batchID={workSpec.batchID}")

            if self.uploadLog:
                if self.logBaseURL is None:
                    baseDir = workSpec.get_access_point()
                else:
                    baseDir = self.logBaseURL
                logOut, logErr = self.get_log_file_names(batchFile, workSpec.batchID)
                if logOut is not None:
                    workSpec.set_log_file("stdout", f"{baseDir}/{logOut}")
                if logErr is not None:
                    workSpec.set_log_file("stderr", f"{baseDir}/{logErr}")
            retList.append((True, ""))
        return retList

    def make_batch_script(self, workspec):
        """Render the batch-script template for *workspec* into a new file.

        With ``dynamicSizing`` enabled the walltime comes from the worker spec
        (and its existing ``nCore`` is used); otherwise the plugin's
        ``maxWalltime`` is used and ``workspec.nCore`` is overwritten with the
        plugin's ``nCore``.

        :param workspec: worker spec to build the script for.
        :return: path of the script, created in the worker's access point and
            left on disk (``delete=False``) for qsub to read.
        """
        if getattr(self, "dynamicSizing", None) is True:
            walltimeSec = workspec.maxWalltime
        else:
            workspec.nCore = self.nCore
            walltimeSec = self.maxWalltime
        maxWalltime = str(datetime.timedelta(seconds=walltimeSec))
        # Whole minutes; Python 3 "/" would produce a float here.
        yodaWallClockLimit = walltimeSec // 60
        # Node count must be an integer large enough to hold every core:
        # ceiling division (plain "/" gives e.g. "2.0" or "0.25").
        nNode = -(-workspec.nCore // self.nCorePerNode)
        # Format before creating the file so a bad template does not leave an
        # empty script behind.
        content = self.template.format(
            nCorePerNode=self.nCorePerNode,
            localQueue=self.localQueue,
            projectName=self.projectName,
            nNode=nNode,
            accessPoint=workspec.accessPoint,
            walltime=maxWalltime,
            yodaWallClockLimit=yodaWallClockLimit,
            workerID=workspec.workerID,
        )
        # Text mode: NamedTemporaryFile defaults to "w+b", which rejects str.
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()) as tmpFile:
            tmpFile.write(content)
        return tmpFile.name

    def get_log_file_names(self, batch_script, batch_id):
        """Extract stdout/stderr paths from ``#PBS`` directives in a script.

        A ``#PBS`` line containing ``-o`` sets stdout, one containing ``-e``
        sets stderr; the path is the last token on the line, with
        ``$PBS_JOBID`` replaced by *batch_id*.

        :param batch_script: path of the batch script to scan.
        :param batch_id: batch ID (str) substituted for ``$PBS_JOBID``.
        :return: ``(stdout_path, stderr_path)``; either is ``None`` when no
            matching directive is present.
        """
        stdOut = None
        stdErr = None
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#PBS"):
                    continue
                items = line.split()
                if "-o" in items:
                    stdOut = items[-1].replace("$PBS_JOBID", batch_id)
                elif "-e" in items:
                    stdErr = items[-1].replace("$PBS_JOBID", batch_id)
        return stdOut, stdErr