Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import os
0002 import subprocess
0003 import uuid
0004 from concurrent.futures import ProcessPoolExecutor as Pool
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 
0010 # setup base logger shared by the module-level worker function and the submitter class
0011 baseLogger = core_utils.setup_logger("dummy_mcore_submitter")
0012 
0013 
0014 # submit a worker using subprocess
0015 def submit_a_worker(workspec):
0016     tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="submit_a_worker")
0017     workspec.reset_changed_list()
0018     if workspec.get_jobspec_list() is not None:
0019         tmpLog.debug(f"aggregated nCore={workspec.nCore} minRamCount={workspec.minRamCount} maxDiskCount={workspec.maxDiskCount}")
0020         tmpLog.debug(f"max maxWalltime={workspec.maxWalltime}")
0021         for jobSpec in workspec.get_jobspec_list():
0022             tmpLog.debug(f"PandaID={jobSpec.PandaID} nCore={jobSpec.jobParams['coreCount']} RAM={jobSpec.jobParams['minRamCount']}")
0023         for job in workspec.jobspec_list:
0024             tmpLog.debug(" ".join([job.jobParams["transformation"], job.jobParams["jobPars"]]))
0025     workspec.batchID = f"batch_ID_{uuid.uuid4().hex}"
0026     workspec.queueName = "batch_queue_name"
0027     workspec.computingElement = "CE_name"
0028     f = open(os.path.join(workspec.accessPoint, "status.txt"), "w")
0029     f.write(WorkSpec.ST_submitted)
0030     f.close()
0031     # fake submission
0032     p = subprocess.Popen(["sleep", "3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0033     stdoutStr, stderrStr = p.communicate()
0034     return (True, stdoutStr + stderrStr), workspec.get_changed_attributes()
0035 
0036 
0037 # dummy submitter with multi-cores
0038 class DummyMcoreSubmitter(PluginBase):
0039     # constructor
0040     def __init__(self, **kwarg):
0041         self.logBaseURL = "http://localhost/test"
0042         PluginBase.__init__(self, **kwarg)
0043 
0044     # submit workers with multiple cores
0045     def submit_workers(self, workspec_list):
0046         tmpLog = self.make_logger(baseLogger, method_name="submit_workers")
0047         tmpLog.debug(f"start nWorkers={len(workspec_list)}")
0048         with Pool() as pool:
0049             retValList = pool.map(submit_a_worker, workspec_list)
0050         # propagate changed attributes
0051         retList = []
0052         for workSpec, tmpVal in zip(workspec_list, retValList):
0053             retVal, tmpDict = tmpVal
0054             workSpec.set_attributes_with_dict(tmpDict)
0055             workSpec.set_log_file("batch_log", f"{self.logBaseURL}/{workSpec.batchID}.log")
0056             workSpec.set_log_file("stdout", f"{self.logBaseURL}/{workSpec.batchID}.out")
0057             workSpec.set_log_file("stderr", f"{self.logBaseURL}/{workSpec.batchID}.err")
0058             retList.append(retVal)
0059         tmpLog.debug("done")
0060         return retList