File indexing completed on 2026-04-19 08:00:04
0001 import os
0002 import stat
0003 import subprocess
0004 import tempfile
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008
0009
# Module-level logger for this plugin. submit_workers passes it to
# self.make_logger to obtain a logger tagged with each worker's workerID.
baseLogger = core_utils.setup_logger("cobalt_submitter")
0011
0012
0013
class CobaltSubmitter(PluginBase):
    """Submit harvester workers as Cobalt batch jobs via ``qsub``.

    This class reads ``templateFile``, ``nCore`` and ``nCorePerNode`` from
    ``self`` without setting them, so they are presumably populated by
    ``PluginBase.__init__`` from the plugin configuration (confirm against
    PluginBase). ``uploadLog`` and ``logBaseURL`` get defaults here and may be
    overridden the same way.
    """

    def __init__(self, **kwarg):
        # Defaults assigned before the base init so the config can override them.
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)

        # Load the batch-script template once. ``with`` guarantees the handle
        # is closed even if reading fails.
        with open(self.templateFile) as tmpFile:
            self.template = tmpFile.read()

    def submit_workers(self, workspec_list):
        """Submit one Cobalt job per work spec.

        :param workspec_list: work specs to submit. On each one this sets
            ``nCore``, sets ``batchID`` on success, and records log-file
            locations when ``uploadLog`` is enabled.
        :return: a list with one ``(ok, message)`` tuple per work spec, in
            input order. ``message`` is empty on success and describes the
            failure otherwise.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")

            workSpec.nCore = self.nCore
            batchFile = self.make_batch_script(workSpec)

            # Pass an argument list rather than a split string, so a
            # batch-file path containing whitespace stays a single argument.
            tmpLog.debug(f"submit with {batchFile}")
            p = subprocess.Popen(
                ["qsub", batchFile],
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")

            if retCode != 0:
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            # The batch ID is taken to be the last whitespace-separated token
            # of qsub's stdout. Guard against empty output: otherwise an
            # IndexError would abort submission of every remaining worker.
            outTokens = stdOut.split()
            if not outTokens:
                errStr = f"qsub returned 0 but printed no batch ID; stderr={stdErr}"
                tmpLog.error(errStr)
                retList.append((False, errStr))
                continue

            workSpec.batchID = outTokens[-1]
            tmpLog.debug(f"batchID={workSpec.batchID}")

            if self.uploadLog:
                self._set_log_files(workSpec, batchFile)
            retList.append((True, ""))
        return retList

    def _set_log_files(self, workSpec, batchFile):
        """Record batch-log/stdout/stderr locations on ``workSpec``."""
        # Log locations are prefixed with logBaseURL when it is configured,
        # and with the worker's access point otherwise.
        if self.logBaseURL is None:
            baseDir = workSpec.get_access_point()
        else:
            baseDir = self.logBaseURL
        batchLog, outLog, errLog = self.get_log_file_names(batchFile, workSpec.batchID)
        if batchLog is not None:
            # Previously "{0}/{0}" repeated baseDir and dropped the file name.
            workSpec.set_log_file("batch_log", f"{baseDir}/{batchLog}")
        if outLog is not None:
            workSpec.set_log_file("stdout", f"{baseDir}/{outLog}")
        if errLog is not None:
            workSpec.set_log_file("stderr", f"{baseDir}/{errLog}")

    def make_batch_script(self, workspec):
        """Render the template into a new script in the worker's access point.

        The template is formatted with ``nNode``, ``accessPoint`` and
        ``workerID``. The file is created with ``delete=False``, so it
        persists after this call.

        :param workspec: the work spec being submitted. Its ``nCore`` must
            already be set.
        :return: the path of the generated script.
        """
        # Render before creating the file: a bad template then raises without
        # leaving an open handle or an empty temporary file behind.
        # NOTE(review): int() truncates, so nCore values that are not a
        # multiple of nCorePerNode request fewer nodes. Confirm this is intended.
        content = self.template.format(
            nNode=int(workspec.nCore / self.nCorePerNode),
            accessPoint=workspec.accessPoint,
            workerID=workspec.workerID,
        )
        with tempfile.NamedTemporaryFile(
            mode="w+t", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()
        ) as tmpFile:
            tmpFile.write(content)
            scriptPath = tmpFile.name

        # On top of the current mode, add owner-execute, group read/write and
        # world read.
        st = os.stat(scriptPath)
        os.chmod(scriptPath, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
        return scriptPath

    def get_log_file_names(self, batch_script, batch_id):
        """Extract log-file names from the ``#COBALT`` directives of a script.

        For lines starting with ``#COBALT``, the last token is taken as the
        file name for ``--debuglog``, ``-o`` or ``-e`` (checked in that
        order). Any ``$COBALT_JOBID`` in the name is replaced with
        ``batch_id``.

        :return: a ``(batchLog, stdOut, stdErr)`` tuple. Each entry is
            ``None`` when no matching directive was found.
        """
        batchLog = None
        stdOut = None
        stdErr = None
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#COBALT"):
                    continue
                items = line.split()
                if "--debuglog" in items:
                    batchLog = items[-1].replace("$COBALT_JOBID", batch_id)
                elif "-o" in items:
                    stdOut = items[-1].replace("$COBALT_JOBID", batch_id)
                elif "-e" in items:
                    stdErr = items[-1].replace("$COBALT_JOBID", batch_id)
        return batchLog, stdOut, stdErr