Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:04

0001 import os
0002 import random
0003 
0004 import radical.utils
0005 import saga
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec as ws
0009 
0010 # setup base logger
0011 baseLogger = core_utils.setup_logger("saga_submitter")
0012 
0013 # SAGA submitter
0014 
0015 
class SAGASubmitter(PluginBase):
    """Harvester submitter plugin that launches workers through a SAGA job service.

    Plugin configuration (injected by PluginBase via **kwarg) is expected to
    provide at least:
      - adaptor:     SAGA adaptor URL used to create the job service
                     (can be extended to support remote execution)
      - localqueue:  batch queue name to submit into
      - projectname: allocation/project to charge (optional; skipped if falsy)
    """

    # constructor
    # constructor define job service with particular adaptor (can be extended to support remote execution)
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used")

    def workers_list(self):
        """Return a list of (job, state) tuples for every job known to the service.

        :return: list of (saga.job.Job, job state) pairs
        """
        job_service = saga.job.Service(self.adaptor)
        workers = []
        for j in job_service.jobs():
            # BUGFIX: the original called self.job_service.get_job(j), but no such
            # attribute exists -- the service handle is the local variable above.
            worker = job_service.get_job(j)
            workers.append((worker, worker.state))
        job_service.close()
        return workers

    def _get_executable(self, list_of_pandajobs):
        """
        Prepare command line to launch payload.

        TODO: In general will migrate to specific worker maker
        :param list_of_pandajobs - list of job objects, which should be used:
        :return: list of command strings to be joined into the executable script
        """
        executable_arr = ["module load python"]
        for pj in list_of_pandajobs:
            executable_arr.append("aprun -d 16 -n 1 " + pj.jobParams["transformation"] + " " + pj.jobParams["jobPars"])
        return executable_arr

    def _state_change_cb(self, src_obj, fire_on, value):
        """SAGA callback fired when the monitored job changes state.

        Mirrors the new state into the attached work spec and into the
        status.txt file consumed by the dummy monitor.

        :param src_obj: SAGA object that fired the callback
        :param fire_on: metric the callback was registered on
        :param value: new SAGA job state
        :return: True, so that the callback stays registered
        """
        tmpLog = self.make_logger(baseLogger, method_name="_state_change_cb")

        self._workSpec.set_status(self.status_translator(value))
        self._workSpec.force_update("status")
        try:
            tmpLog.debug(f"Created time: {src_obj.created}")
            tmpLog.debug(f"src obj: {src_obj}")
        except Exception:
            # best-effort debug info only; the SAGA object may not expose
            # these attributes in every state (was BaseException, which
            # would also swallow KeyboardInterrupt/SystemExit)
            tmpLog.debug("FAILED")
        tmpLog.info(f"Worker with BatchID={self._workSpec.batchID} workerID={self._workSpec.workerID} change state to: {self._workSpec.status}")

        # for compatibility with dummy monitor; 'with' guarantees the handle
        # is closed even if the write fails
        with open(os.path.join(self._workSpec.accessPoint, "status.txt"), "w") as f:
            f.write(self._workSpec.status)

        return True

    def _execute(self, work_spec):
        """Build a SAGA job description for one worker and submit it.

        On success the worker's batchID and log-file locations are recorded
        on the work spec.

        :param work_spec: work spec of the worker to submit
        :return: 0 on success, -1 on SAGA failure (work spec set to ST_failed)
        """
        tmpLog = self.make_logger(baseLogger, method_name="_execute")

        job_service = saga.job.Service(self.adaptor)

        # sagadateformat_str = 'Tue Nov  7 11:31:10 2017'
        # sagadateformat_str = '%a %b %d %H:%M:%S %Y'
        try:
            os.chdir(work_spec.accessPoint)
            tmpLog.info(f"Walltime: {work_spec.maxWalltime} sec. {work_spec.maxWalltime / 60} min.")
            tmpLog.info(f"Cores: {work_spec.nCore}")
            tmpLog.debug(f"Worker directory: {work_spec.accessPoint}")
            jd = saga.job.Description()
            if self.projectname:
                jd.project = self.projectname
            # launching job at HPC

            # integer minutes: batch systems expect a whole number
            # (plain '/' would yield a float under Python 3)
            jd.wall_time_limit = work_spec.maxWalltime // 60
            if work_spec.workParams in (None, "NULL"):
                # no template supplied -- build the command list from the job specs
                jd.executable = "\n".join(self._get_executable(work_spec.jobspec_list))
            else:
                tmpLog.debug(f"Work params (executable template): \n{work_spec.workParams}")
                # fill the {work_dir} placeholder in the configured template
                jd.executable = work_spec.workParams.format(work_dir=work_spec.accessPoint)

            tmpLog.debug(f"Command to be launched: \n{jd.executable}")
            jd.total_cpu_count = work_spec.nCore  # one node with 16 cores for one job
            jd.queue = self.localqueue
            jd.working_directory = work_spec.accessPoint  # working directory of task
            # random suffix keeps stdout/stderr of resubmitted workers apart
            uq_prefix = f"{random.randint(0, 10000000):07}"
            jd.output = os.path.join(work_spec.accessPoint, f"MPI_pilot_stdout_{uq_prefix}")
            jd.error = os.path.join(work_spec.accessPoint, f"MPI_pilot_stderr_{uq_prefix}")
            work_spec.set_log_file("stdout", jd.output)
            work_spec.set_log_file("stderr", jd.error)

            # Create a new job from the job description. The initial state of
            # the job is 'New'.
            task = job_service.create_job(jd)

            self._workSpec = work_spec
            task.run()
            work_spec.batchID = task.id.split("-")[1][1:-1]  # SAGA have own representation, but real batch id easy to extract
            tmpLog.info(f"Worker ID={work_spec.workerID} with BatchID={work_spec.batchID} submitted")
            tmpLog.debug(f"SAGA status: {task.state}")

            # for compatibility with dummy monitor; 'with' guarantees the
            # handle is closed even if the write fails
            with open(os.path.join(work_spec.accessPoint, "status.txt"), "w") as f:
                f.write(self.status_translator(task.state))

            job_service.close()
            return 0

        except saga.SagaException as ex:
            # Catch all saga exceptions
            tmpLog.error(f"An exception occurred: ({ex.type}) {str(ex)} ")
            # Trace back the exception. That can be helpful for debugging.
            tmpLog.error(f"\n*** Backtrace:\n {ex.traceback}")
            work_spec.status = work_spec.ST_failed
            return -1

    @staticmethod
    def status_translator(saga_status):
        """Map a SAGA job state onto the harvester WorkSpec state.

        :param saga_status: one of the saga.job state constants
        :return: the matching WorkSpec ST_* value, or None for any state
                 not listed below (e.g. NEW/SUSPENDED)
        """
        if saga_status == saga.job.PENDING:
            return ws.ST_submitted
        if saga_status == saga.job.RUNNING:
            return ws.ST_running
        if saga_status == saga.job.DONE:
            return ws.ST_finished
        if saga_status == saga.job.FAILED:
            return ws.ST_failed
        if saga_status == saga.job.CANCELED:
            return ws.ST_cancelled

    # submit workers
    def submit_workers(self, work_specs):
        """Submit a batch of workers, one SAGA job each.

        :param work_specs: list of work specs to submit
        :return: list of (success flag, error message) tuples, parallel to
                 work_specs
        """
        tmpLog = self.make_logger(baseLogger, method_name="submit_workers")
        tmpLog.debug(f"start nWorkers={len(work_specs)}")
        retList = []

        for workSpec in work_specs:
            res = self._execute(workSpec)
            if res == 0:
                retList.append((True, ""))
            else:
                retList.append((False, "Failed to submit worker. Check logs"))

        tmpLog.debug("done")

        return retList
0157         return retList