File indexing completed on 2026-04-19 08:00:04
0001 import os
0002 import uuid
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007
0008
0009 baseLogger = core_utils.setup_logger("dummy_submitter")
0010
0011
0012
0013 class DummySubmitter(PluginBase):
0014
0015 def __init__(self, **kwarg):
0016 self.logBaseURL = "http://localhost/test"
0017 PluginBase.__init__(self, **kwarg)
0018
0019
0020 def submit_workers(self, workspec_list):
0021 """Submit workers to a scheduling system like batch systems and computing elements.
0022 This method takes a list of WorkSpecs as input argument, and returns a list of tuples.
0023 Each tuple is composed of a return code and a dialog message.
0024 The return code could be True (for success), False (for permanent failures), or None (for temporary failures).
0025 If the return code is None, submission is retried maxSubmissionAttempts times at most which is defined
0026 for each queue in queue_config.json.
0027 Nth tuple in the returned list corresponds to submission status and dialog message for Nth worker
0028 in the given WorkSpec list.
0029 A unique identifier is set to WorkSpec.batchID when submission is successful,
0030 so that they can be identified in the scheduling system. It would be useful to set other attributes
0031 like queueName (batch queue name), computingElement (CE's host name), and nodeID (identifier of the node
0032 where the worker is running).
0033
0034 :param workspec_list: a list of work specs instances
0035 :return: A list of tuples. Each tuple is composed of submission status (True for success,
0036 False for permanent failures, None for temporary failures) and dialog message
0037 :rtype: [(bool, string),]
0038 """
0039 tmpLog = self.make_logger(baseLogger, method_name="submit_workers")
0040 tmpLog.debug(f"start nWorkers={len(workspec_list)}")
0041 retList = []
0042 for workSpec in workspec_list:
0043 workSpec.batchID = f"batch_ID_{uuid.uuid4().hex}"
0044 workSpec.queueName = "batch_queue_name"
0045 workSpec.computingElement = "CE_name"
0046 workSpec.set_log_file("batch_log", f"{self.logBaseURL}/{workSpec.batchID}.log")
0047 workSpec.set_log_file("stdout", f"{self.logBaseURL}/{workSpec.batchID}.out")
0048 workSpec.set_log_file("stderr", f"{self.logBaseURL}/{workSpec.batchID}.err")
0049 if workSpec.get_jobspec_list() is not None:
0050 tmpLog.debug(f"aggregated nCore={workSpec.nCore} minRamCount={workSpec.minRamCount} maxDiskCount={workSpec.maxDiskCount}")
0051 tmpLog.debug(f"max maxWalltime={workSpec.maxWalltime}")
0052 for jobSpec in workSpec.get_jobspec_list():
0053 tmpLog.debug(f"PandaID={jobSpec.PandaID} nCore={jobSpec.jobParams['coreCount']} RAM={jobSpec.jobParams['minRamCount']}")
0054
0055 jobSpec.set_one_attribute("pilotID", workSpec.workAttributes["batchLog"])
0056 for job in workSpec.jobspec_list:
0057 tmpLog.debug(" ".join([job.jobParams["transformation"], job.jobParams["jobPars"]]))
0058 f = open(os.path.join(workSpec.accessPoint, "status.txt"), "w")
0059 f.write(WorkSpec.ST_submitted)
0060 f.close()
0061 retList.append((True, ""))
0062 tmpLog.debug("done")
0063 return retList