File indexing completed on 2026-04-19 08:00:04
0001 import os
0002 import subprocess
0003 import uuid
0004 from concurrent.futures import ProcessPoolExecutor as Pool
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009
0010
# Module-level logger shared by the worker function and the plugin class below.
baseLogger = core_utils.setup_logger("dummy_mcore_submitter")
0012
0013
0014
def submit_a_worker(workspec):
    """Simulate the submission of a single worker.

    Logs the worker's aggregated resources and its jobs (when a job list is
    attached), assigns a random batch ID plus placeholder queue and CE names,
    writes the submitted status to ``status.txt`` under the worker's access
    point, and runs ``sleep 3`` as a stand-in for a real submission command.

    Args:
        workspec: Worker specification to submit. It is modified in place.

    Returns:
        A pair ``((True, output), changed)`` where ``output`` is the bytes
        captured from the subprocess's stdout followed by its stderr, and
        ``changed`` is the value of ``workspec.get_changed_attributes()``.
    """
    tmp_log = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="submit_a_worker")
    # Clear the change tracker first; presumably so that only the attributes
    # set below are reported back to the caller -- confirm against WorkSpec.
    workspec.reset_changed_list()

    job_specs = workspec.get_jobspec_list()
    if job_specs is not None:
        tmp_log.debug(f"aggregated nCore={workspec.nCore} minRamCount={workspec.minRamCount} maxDiskCount={workspec.maxDiskCount}")
        tmp_log.debug(f"max maxWalltime={workspec.maxWalltime}")
        for job_spec in job_specs:
            tmp_log.debug(f"PandaID={job_spec.PandaID} nCore={job_spec.jobParams['coreCount']} RAM={job_spec.jobParams['minRamCount']}")
        # NOTE(review): this loop reads the attribute directly while the one
        # above uses the accessor; kept as-is because it is not visible here
        # whether both always refer to the same list.
        for job in workspec.jobspec_list:
            tmp_log.debug(" ".join([job.jobParams["transformation"], job.jobParams["jobPars"]]))

    workspec.batchID = f"batch_ID_{uuid.uuid4().hex}"
    workspec.queueName = "batch_queue_name"
    workspec.computingElement = "CE_name"

    # The context manager guarantees the handle is closed even if write() fails.
    with open(os.path.join(workspec.accessPoint, "status.txt"), "w") as status_file:
        status_file.write(WorkSpec.ST_submitted)

    # Placeholder for the real submission command.
    p = subprocess.Popen(["sleep", "3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_bytes, stderr_bytes = p.communicate()
    return (True, stdout_bytes + stderr_bytes), workspec.get_changed_attributes()
0035
0036
0037
class DummyMcoreSubmitter(PluginBase):
    """Dummy submitter plugin that submits workers in parallel processes."""

    def __init__(self, **kwarg):
        """Set the default log base URL, then run the ``PluginBase`` initializer."""
        # The default is assigned before delegating to the base class;
        # presumably so a keyword argument can override it -- confirm
        # against PluginBase.__init__.
        self.logBaseURL = "http://localhost/test"
        PluginBase.__init__(self, **kwarg)

    def submit_workers(self, workspec_list):
        """Submit every worker in ``workspec_list`` through a process pool.

        Each worker is passed to ``submit_a_worker`` in a pool process. The
        attribute dictionary it returns is applied to the corresponding entry
        of ``workspec_list``, and the batch log, stdout and stderr URLs are
        registered on that entry.

        Args:
            workspec_list: Worker specifications to submit.

        Returns:
            A list with the first element of each ``submit_a_worker`` result,
            in the same order as ``workspec_list``.
        """
        logger = self.make_logger(baseLogger, method_name="submit_workers")
        logger.debug(f"start nWorkers={len(workspec_list)}")
        with Pool() as pool:
            # map() yields results lazily, in input order; they are consumed
            # by the loop below once the pool has been shut down.
            outcomes = pool.map(submit_a_worker, workspec_list)

        # (log name, file extension) pairs registered for every worker.
        log_kinds = (("batch_log", "log"), ("stdout", "out"), ("stderr", "err"))

        statuses = []
        for spec, (status, changed_attrs) in zip(workspec_list, outcomes):
            spec.set_attributes_with_dict(changed_attrs)
            for log_name, extension in log_kinds:
                spec.set_log_file(log_name, f"{self.logBaseURL}/{spec.batchID}.{extension}")
            statuses.append(status)
        logger.debug("done")
        return statuses
0060 return retList