Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import os
0002 import stat
0003 import subprocess
0004 import tempfile
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 
# logger
# Module-level base logger named "cobalt_submitter"; submit_workers passes it
# to make_logger() to build a per-worker logger.
baseLogger = core_utils.setup_logger("cobalt_submitter")
0011 
0012 
0013 # submitter for Cobalt batch system
class CobaltSubmitter(PluginBase):
    """Submitter plugin that hands workers to the Cobalt batch system via ``qsub``.

    The methods read ``self.templateFile``, ``self.nCore`` and
    ``self.nCorePerNode``, none of which are assigned in this class;
    presumably ``PluginBase.__init__`` sets them from the keyword
    arguments -- TODO confirm against plugin_base.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set defaults, run the base-class initializer and load the batch script template.

        :param kwarg: keyword arguments forwarded unchanged to ``PluginBase.__init__``.
        """
        # Defaults are assigned before the base initializer runs
        # (presumably so that kwargs can override them -- TODO confirm).
        self.uploadLog = False
        self.logBaseURL = None
        PluginBase.__init__(self, **kwarg)
        # template for batch script; ``with`` closes the file even if read() raises
        with open(self.templateFile) as tmpFile:
            self.template = tmpFile.read()

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit one Cobalt job per worker spec.

        For each spec: sets ``nCore`` from ``self.nCore``, renders a batch
        script, runs ``qsub`` on it and, on a zero return code, stores the last
        whitespace-separated token of qsub's stdout as ``batchID``.  When
        ``self.uploadLog`` is true, log file locations are registered on the
        spec as well.

        :param workspec_list: iterable of worker specs to submit.
        :return: list with one ``(succeeded, message)`` tuple per spec, in input
                 order; ``message`` is ``""`` on success and qsub's stdout and
                 stderr joined by a space on failure.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="submit_workers")
            # set nCore
            workSpec.nCore = self.nCore
            # make batch script
            batchFile = self.make_batch_script(workSpec)
            # submit; the command is an argv list so a script path containing
            # whitespace is passed to qsub as a single argument
            tmpLog.debug(f"submit with {batchFile}")
            p = subprocess.Popen(["qsub", batchFile], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            # check return code
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            if retCode == 0:
                # extract batchID
                workSpec.batchID = stdOut.split()[-1]
                tmpLog.debug(f"batchID={workSpec.batchID}")
                # set log files
                if self.uploadLog:
                    if self.logBaseURL is None:
                        baseDir = workSpec.get_access_point()
                    else:
                        baseDir = self.logBaseURL
                    # distinct names so the process stdout/stderr are not overwritten
                    batchLog, outLog, errLog = self.get_log_file_names(batchFile, workSpec.batchID)
                    if batchLog is not None:
                        # was "{0}/{0}".format(...), which repeated baseDir and dropped the file name
                        workSpec.set_log_file("batch_log", f"{baseDir}/{batchLog}")
                    if outLog is not None:
                        workSpec.set_log_file("stdout", f"{baseDir}/{outLog}")
                    if errLog is not None:
                        workSpec.set_log_file("stderr", f"{baseDir}/{errLog}")
                tmpRetVal = (True, "")
            else:
                # failed
                errStr = stdOut + " " + stdErr
                tmpLog.error(errStr)
                tmpRetVal = (False, errStr)
            retList.append(tmpRetVal)
        return retList

    # make batch script
    def make_batch_script(self, workspec):
        """Render the batch script template into a new file in the worker's access point.

        The template is filled with ``nNode`` (``nCore / nCorePerNode``
        truncated to an int), ``accessPoint`` and ``workerID``.

        :param workspec: worker spec providing ``nCore``, ``accessPoint``,
                         ``workerID`` and ``get_access_point()``.
        :return: path of the generated ``*_submit.sh`` file.
        """
        # delete=False keeps the script on disk after the handle is closed
        with tempfile.NamedTemporaryFile(mode="w+t", delete=False, suffix="_submit.sh", dir=workspec.get_access_point()) as tmpFile:
            tmpFile.write(
                self.template.format(
                    nNode=int(workspec.nCore / self.nCorePerNode),
                    accessPoint=workspec.accessPoint,
                    workerID=workspec.workerID,
                )
            )

        # set execution bit on the temp file (plus group read/write and world read)
        st = os.stat(tmpFile.name)
        os.chmod(tmpFile.name, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)

        return tmpFile.name

    # get log file names
    def get_log_file_names(self, batch_script, batch_id):
        """Read log file names from the ``#COBALT`` directive lines of a batch script.

        Only lines starting with ``#COBALT`` are inspected.  On each such line
        the last whitespace-separated token is taken as the file name, with
        ``$COBALT_JOBID`` replaced by ``batch_id``; the line is classified by the
        first of ``--debuglog``, ``-o``, ``-e`` found among its tokens.  A later
        matching line overrides an earlier one.

        :param batch_script: path of the batch script to scan.
        :param batch_id: job ID substituted for ``$COBALT_JOBID``.
        :return: tuple ``(batchLog, stdOut, stdErr)``; an entry is ``None`` when
                 no matching directive was found.
        """
        batchLog = None
        stdOut = None
        stdErr = None
        with open(batch_script) as f:
            for line in f:
                if not line.startswith("#COBALT"):
                    continue
                items = line.split()
                if "--debuglog" in items:
                    batchLog = items[-1].replace("$COBALT_JOBID", batch_id)
                elif "-o" in items:
                    stdOut = items[-1].replace("$COBALT_JOBID", batch_id)
                elif "-e" in items:
                    stdErr = items[-1].replace("$COBALT_JOBID", batch_id)
        return batchLog, stdOut, stdErr