Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:03

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0010 
0011 
0012 import os
0013 import time
0014 
0015 from pilot.common import exception
0016 from pilot.control.payloads import generic
0017 from pilot.eventservice.workexecutor.workexecutor import WorkExecutor
0018 
0019 import logging
0020 logger = logging.getLogger(__name__)
0021 
0022 
class Executor(generic.Executor):
    """
    Event service payload executor.

    Specialises the generic payload executor so that the payload is launched through an
    event service WorkExecutor.
    """

    def __init__(self, args, job, out, err, traces):
        """
        Initialise the executor by forwarding every argument to the generic executor.

        :param args: passed through to the base class.
        :param job: passed through to the base class.
        :param out: passed through to the base class.
        :param err: passed through to the base class.
        :param traces: passed through to the base class.
        """
        super(Executor, self).__init__(args, job, out, err, traces)

    def run_payload(self, job, cmd, out, err):
        """
        Prepare the job and launch its payload through an event service WorkExecutor.

        The setup and utility hooks are run first, then the payload command is requested from
        the user specific module selected by the PILOT_USER environment variable (default
        'atlas'). On a successful start, job.pid is set from the work executor and, when that
        pid is truthy, job.pgrp is set to its process group id.

        :param job: job object.
        :param cmd: (unused in ES mode)
        :param out: stdout file object.
        :param err: stderr file object.
        :return: the started work executor, or None if the payload command could not be defined
                 or anything failed while starting the executor.
        """

        self.pre_setup(job)

        # resolve the user specific module that knows how to build the payload command
        pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
        user_module = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3

        self.post_setup(job)
        self.utility_before_payload(job)
        self.utility_with_payload(job)

        try:
            payload_command = user_module.get_payload_command(job)
        except exception.PilotException:
            logger.fatal('could not define payload command')
            return None

        logger.info("payload execution command: %s", payload_command)

        try:
            payload = {
                'executable': payload_command,
                'workdir': job.workdir,
                'output_file': out,
                'error_file': err,
                'job': job,
            }
            logger.debug("payload: %s", payload)

            logger.info("Starting EventService WorkExecutor")
            work_executor = WorkExecutor(args=self.get_executor_type())
            work_executor.set_payload(payload)
            work_executor.start()
            logger.info("EventService WorkExecutor started")

            logger.info("ESProcess started with pid: %s", work_executor.get_pid())
            job.pid = work_executor.get_pid()
            # the process group can only be looked up when a pid was actually reported
            if job.pid:
                job.pgrp = os.getpgid(job.pid)

            self.utility_after_payload_started(job)
        except Exception as exc:
            # any failure during start-up is logged and reported to the caller as None
            logger.error('could not execute: %s', str(exc))
            return None
        else:
            return work_executor

    def get_executor_type(self):
        """
        Get the executor type.

        The type is read from the PILOT_ES_EXECUTOR_TYPE environment variable and falls back to
        'generic' (normal event service) when the variable is not set. Other values, e.g.
        'raythena', can be selected through that variable.

        :return: executor type dictionary with the single key 'executor_type'.
        """

        # NOTE: an 'hpo' type chosen from job.is_hpo existed here as commented-out code;
        # only the environment variable is consulted.
        selected_type = os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic')
        return {'executor_type': selected_type}

    def wait_graceful(self, args, proc):
        """
        Wait for the work executor to finish and return its exit code.

        While the executor is alive, the graceful_stop event is polled every two seconds. If it
        gets set, the executor is asked to stop and the method then waits (without time limit)
        until the executor is no longer alive. A progress message is logged roughly every five
        minutes while waiting for the event.

        :param args: object holding the graceful_stop event.
        :param proc: executor object providing is_alive(), stop() and get_exit_code().
        :return: exit code reported by the executor.
        """

        poll_interval = 2  # seconds between liveness checks
        report_interval = 300  # seconds between "still running" messages (5 minutes)

        last_report = time.time()
        while proc.is_alive():
            if args.graceful_stop.is_set():
                logger.debug("Graceful stop is set, stopping work executor")
                proc.stop()
                break
            if time.time() > last_report + report_interval:
                logger.info("Process is still running")
                last_report = time.time()
            time.sleep(poll_interval)

        # after a stop request (or a normal end) wait until the executor has really gone away
        while proc.is_alive():
            time.sleep(poll_interval)

        return proc.get_exit_code()