File indexing completed on 2026-04-19 08:00:04
0001 import os
0002 import random
0003
0004 import radical.utils
0005 import saga
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec as ws
0009
0010
0011 baseLogger = core_utils.setup_logger("saga_submitter")
0012
0013
0014
0015
class SAGASubmitter(PluginBase):
    """Harvester submitter plugin that launches workers through a SAGA job service.

    Plugin configuration (injected by PluginBase via **kwarg) is expected to
    provide at least:
      - adaptor:     SAGA adaptor URL used to create the job service
      - localqueue:  batch queue name passed to the job description
      - projectname: (optional) allocation/project to charge the job to
    """

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used")

    def workers_list(self):
        """Return [(job, state), ...] for every job known to the SAGA service."""
        job_service = saga.job.Service(self.adaptor)
        try:
            workers = []
            for job_id in job_service.jobs():
                # BUG FIX: original called self.job_service.get_job(j), but the
                # instance has no job_service attribute -- the local service
                # created above must be used.
                job = job_service.get_job(job_id)
                workers.append((job, job.state))
        finally:
            # always release the service, even if a SAGA call above raises
            job_service.close()
        return workers

    def _get_executable(self, list_of_pandajobs):
        """
        Prepare command lines to launch the payloads.

        TODO: In general will migrate to specific worker maker
        :param list_of_pandajobs: list of job objects, which should be used
        :return: list of command-line strings to be executed
        """
        # first line loads the python environment module; one aprun per panda job
        executable_arr = ["module load python"]
        for pj in list_of_pandajobs:
            executable_arr.append("aprun -d 16 -n 1 " + pj.jobParams["transformation"] + " " + pj.jobParams["jobPars"])
        return executable_arr

    def _state_change_cb(self, src_obj, fire_on, value):
        """SAGA callback fired on job state changes; mirrors the state into the WorkSpec.

        Also persists the translated status to <accessPoint>/status.txt so the
        monitor can pick it up. Returns True to keep the callback registered.
        """
        tmpLog = self.make_logger(baseLogger, method_name="_state_change_cb")

        self._workSpec.set_status(self.status_translator(value))
        self._workSpec.force_update("status")
        try:
            tmpLog.debug(f"Created time: {src_obj.created}")
            tmpLog.debug(f"src obj: {src_obj}")
        except Exception:
            # FIX: was BaseException, which would also swallow KeyboardInterrupt/SystemExit
            tmpLog.debug("FAILED")
        tmpLog.info(f"Worker with BatchID={self._workSpec.batchID} workerID={self._workSpec.workerID} change state to: {self._workSpec.status}")

        # persist current status for out-of-band monitoring
        with open(os.path.join(self._workSpec.accessPoint, "status.txt"), "w") as f:
            f.write(self._workSpec.status)

        return True

    def _execute(self, work_spec):
        """Build a SAGA job description for work_spec and submit it.

        :param work_spec: WorkSpec to submit; batchID and log files are filled in
        :return: 0 on successful submission, -1 on SAGA failure
        """
        tmpLog = self.make_logger(baseLogger, method_name="_execute")

        job_service = saga.job.Service(self.adaptor)

        try:
            os.chdir(work_spec.accessPoint)
            tmpLog.info(f"Walltime: {work_spec.maxWalltime} sec. {work_spec.maxWalltime / 60} min.")
            tmpLog.info(f"Cores: {work_spec.nCore}")
            tmpLog.debug(f"Worker directory: {work_spec.accessPoint}")
            jd = saga.job.Description()
            if self.projectname:
                jd.project = self.projectname

            # SAGA expects wall time in whole minutes; int() avoids passing a
            # float under Python 3 division (maxWalltime is in seconds)
            jd.wall_time_limit = int(work_spec.maxWalltime / 60)
            if work_spec.workParams in (None, "NULL"):
                # no template provided: build the command list from the panda jobs
                jd.executable = "\n".join(self._get_executable(work_spec.jobspec_list))
            else:
                tmpLog.debug(f"Work params (executable template): \n{work_spec.workParams}")
                exe_str = work_spec.workParams
                exe_str = exe_str.format(work_dir=work_spec.accessPoint)
                jd.executable = exe_str

            tmpLog.debug(f"Command to be launched: \n{jd.executable}")
            jd.total_cpu_count = work_spec.nCore
            jd.queue = self.localqueue
            jd.working_directory = work_spec.accessPoint
            # random suffix to keep stdout/stderr names unique across resubmissions
            uq_prefix = f"{random.randint(0, 10000000):07}"
            jd.output = os.path.join(work_spec.accessPoint, f"MPI_pilot_stdout_{uq_prefix}")
            jd.error = os.path.join(work_spec.accessPoint, f"MPI_pilot_stderr_{uq_prefix}")
            work_spec.set_log_file("stdout", jd.output)
            work_spec.set_log_file("stderr", jd.error)

            task = job_service.create_job(jd)

            self._workSpec = work_spec
            task.run()
            # SAGA job id looks like "[adaptor]-[12345]"; extract the bare batch id
            work_spec.batchID = task.id.split("-")[1][1:-1]
            tmpLog.info(f"Worker ID={work_spec.workerID} with BatchID={work_spec.batchID} submitted")
            tmpLog.debug(f"SAGA status: {task.state}")

            # persist initial status for out-of-band monitoring
            with open(os.path.join(work_spec.accessPoint, "status.txt"), "w") as f:
                f.write(self.status_translator(task.state))

            return 0

        except saga.SagaException as ex:
            tmpLog.error(f"An exception occurred: ({ex.type}) {str(ex)} ")
            tmpLog.error(f"\n*** Backtrace:\n {ex.traceback}")
            work_spec.status = work_spec.ST_failed
            return -1
        finally:
            # FIX: original leaked the job service on the exception path
            job_service.close()

    @staticmethod
    def status_translator(saga_status):
        """Map a SAGA job state onto the corresponding WorkSpec status.

        Returns None for SAGA states with no defined mapping (e.g. NEW).
        """
        translation = {
            saga.job.PENDING: ws.ST_submitted,
            saga.job.RUNNING: ws.ST_running,
            saga.job.DONE: ws.ST_finished,
            saga.job.FAILED: ws.ST_failed,
            saga.job.CANCELED: ws.ST_cancelled,
        }
        return translation.get(saga_status)

    def submit_workers(self, work_specs):
        """Submit a batch of workers, one SAGA job each.

        :param work_specs: list of WorkSpec objects to submit
        :return: list of (bool, str) tuples, one per worker, in input order
        """
        tmpLog = self.make_logger(baseLogger, method_name="submit_workers")
        tmpLog.debug(f"start nWorkers={len(work_specs)}")
        retList = []

        for workSpec in work_specs:
            res = self._execute(workSpec)
            if res == 0:
                retList.append((True, ""))
            else:
                retList.append((False, "Failed to submit worker. Check logs"))

        tmpLog.debug("done")

        return retList
0157 return retList