File indexing completed on 2026-04-11 08:41:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import os
0013 import time
0014
0015 from pilot.common import exception
0016 from pilot.control.payloads import generic
0017 from pilot.eventservice.workexecutor.workexecutor import WorkExecutor
0018
0019 import logging
0020 logger = logging.getLogger(__name__)
0021
0022
0023 class Executor(generic.Executor):
0024 def __init__(self, args, job, out, err, traces):
0025 super(Executor, self).__init__(args, job, out, err, traces)
0026
0027 def run_payload(self, job, cmd, out, err):
0028 """
0029 (add description)
0030
0031 :param job: job object.
0032 :param cmd: (unused in ES mode)
0033 :param out: stdout file object.
0034 :param err: stderr file object.
0035 :return:
0036 """
0037
0038 self.pre_setup(job)
0039
0040
0041 pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
0042 user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
0043
0044 self.post_setup(job)
0045
0046 self.utility_before_payload(job)
0047
0048 self.utility_with_payload(job)
0049
0050 try:
0051 executable = user.get_payload_command(job)
0052 except exception.PilotException:
0053 logger.fatal('could not define payload command')
0054 return None
0055
0056 logger.info("payload execution command: %s", executable)
0057
0058 try:
0059 payload = {'executable': executable, 'workdir': job.workdir, 'output_file': out, 'error_file': err, 'job': job}
0060 logger.debug("payload: %s", payload)
0061
0062 logger.info("Starting EventService WorkExecutor")
0063 executor_type = self.get_executor_type()
0064 executor = WorkExecutor(args=executor_type)
0065 executor.set_payload(payload)
0066 executor.start()
0067 logger.info("EventService WorkExecutor started")
0068
0069 logger.info("ESProcess started with pid: %s", executor.get_pid())
0070 job.pid = executor.get_pid()
0071 if job.pid:
0072 job.pgrp = os.getpgid(job.pid)
0073
0074 self.utility_after_payload_started(job)
0075 except Exception as error:
0076 logger.error('could not execute: %s', str(error))
0077 return None
0078
0079 return executor
0080
0081 def get_executor_type(self):
0082 """
0083 Get the executor type.
0084 This is usually the 'generic' type, which means normal event service. It can also be 'raythena' if specified
0085 in the Pilot options.
0086
0087 :return: executor type dictionary.
0088 """
0089
0090
0091
0092 return {'executor_type': os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic')}
0093
0094 def wait_graceful(self, args, proc):
0095 """
0096 (add description)
0097
0098 :param args:
0099 :param proc:
0100 :return:
0101 """
0102
0103 t_1 = time.time()
0104 while proc.is_alive():
0105 if args.graceful_stop.is_set():
0106 logger.debug("Graceful stop is set, stopping work executor")
0107 proc.stop()
0108 break
0109 if time.time() > t_1 + 300:
0110 logger.info("Process is still running")
0111 t_1 = time.time()
0112 time.sleep(2)
0113
0114 while proc.is_alive():
0115 time.sleep(2)
0116 exit_code = proc.get_exit_code()
0117 return exit_code