Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import os
0002 import uuid
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007 
0008 # module-level base logger; submit_workers passes it to make_logger to obtain its per-call logger
0009 baseLogger = core_utils.setup_logger("dummy_submitter")
0010 
0011 
0012 # dummy submitter
class DummySubmitter(PluginBase):
    """Dummy submitter plugin.

    It does not contact any scheduling system: every worker is given a
    generated batch ID, placeholder queue/CE names and log URLs derived from
    ``logBaseURL``, and a ``status.txt`` file holding the submitted status is
    written into the worker's access point.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set the default log base URL, then run the PluginBase constructor.

        :param kwarg: keyword arguments forwarded unchanged to PluginBase
        """
        # NOTE(review): the default is assigned before PluginBase.__init__ runs,
        # presumably so that a logBaseURL given in kwarg can override it -
        # confirm against PluginBase.
        self.logBaseURL = "http://localhost/test"
        PluginBase.__init__(self, **kwarg)

    # submit workers
    def submit_workers(self, workspec_list):
        """Submit workers to a scheduling system like batch systems and computing elements.

        This method takes a list of WorkSpecs as input argument, and returns a list of tuples.
        Each tuple is composed of a return code and a dialog message.
        The return code could be True (for success), False (for permanent failures), or None
        (for temporary failures). If the return code is None, submission is retried
        maxSubmissionAttempts times at most, which is defined for each queue in queue_config.json.
        The Nth tuple in the returned list corresponds to the submission status and dialog message
        for the Nth worker in the given WorkSpec list.
        A unique identifier is set to WorkSpec.batchID when submission is successful,
        so that workers can be identified in the scheduling system. It would be useful to set
        other attributes like queueName (batch queue name), computingElement (CE's host name),
        and nodeID (identifier of the node where the worker is running).

        This dummy implementation always reports success: each returned tuple is ``(True, "")``.

        :param workspec_list: a list of work specs instances
        :return: A list of tuples. Each tuple is composed of submission status (True for success,
        False for permanent failures, None for temporary failures) and dialog message
        :rtype: [(bool, string),]
        """
        tmpLog = self.make_logger(baseLogger, method_name="submit_workers")
        tmpLog.debug(f"start nWorkers={len(workspec_list)}")
        retList = []
        for workSpec in workspec_list:
            # fake identifiers in place of a real submission
            workSpec.batchID = f"batch_ID_{uuid.uuid4().hex}"
            workSpec.queueName = "batch_queue_name"
            workSpec.computingElement = "CE_name"
            # all log URLs share the same prefix and differ only in the extension
            logUrlPrefix = f"{self.logBaseURL}/{workSpec.batchID}"
            workSpec.set_log_file("batch_log", f"{logUrlPrefix}.log")
            workSpec.set_log_file("stdout", f"{logUrlPrefix}.out")
            workSpec.set_log_file("stderr", f"{logUrlPrefix}.err")
            # fetch the job list once instead of once per use
            jobSpecList = workSpec.get_jobspec_list()
            if jobSpecList is not None:
                tmpLog.debug(f"aggregated nCore={workSpec.nCore} minRamCount={workSpec.minRamCount} maxDiskCount={workSpec.maxDiskCount}")
                tmpLog.debug(f"max maxWalltime={workSpec.maxWalltime}")
                for jobSpec in jobSpecList:
                    tmpLog.debug(f"PandaID={jobSpec.PandaID} nCore={jobSpec.jobParams['coreCount']} RAM={jobSpec.jobParams['minRamCount']}")
                    # using batchLog URL as pilot ID
                    jobSpec.set_one_attribute("pilotID", workSpec.workAttributes["batchLog"])
                # NOTE(review): this loop reads the jobspec_list attribute directly rather than
                # going through get_jobspec_list(); presumably both yield the same list - confirm.
                for job in workSpec.jobspec_list:
                    tmpLog.debug(" ".join([job.jobParams["transformation"], job.jobParams["jobPars"]]))
            # record the submitted status in the access point; the context manager
            # closes the file even if the write fails
            statusPath = os.path.join(workSpec.accessPoint, "status.txt")
            with open(statusPath, "w") as statusFile:
                statusFile.write(WorkSpec.ST_submitted)
            retList.append((True, ""))
        tmpLog.debug("done")
        return retList