Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0010 
0011 import os
0012 import threading
0013 
0014 from pilot.common.pluginfactory import PluginFactory
0015 from pilot.control.job import create_job
0016 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationManager
0017 import logging
0018 logger = logging.getLogger(__name__)
0019 
0020 """
0021 Base Executor with one process to manage EventService
0022 """
0023 
0024 
class BaseExecutor(threading.Thread, PluginFactory):
    """
    Base class for event service executors.

    The executor is a thread that owns a CommunicationManager (created in start())
    and offers helpers to obtain a payload, fetch event ranges and report event/job
    updates through that manager. Subclasses must implement run().
    """

    # seconds to pause between liveness checks in stop_communicator()
    _COMMUNICATOR_POLL_INTERVAL = 0.1

    def __init__(self, **kwargs):
        """
        Initialize the executor.

        :param kwargs: every key/value pair is set as an attribute on the instance.
                       Note that 'communication_manager' and 'proc' are reset to None
                       after the kwargs have been applied.
        """
        super(BaseExecutor, self).__init__()
        # Thread.setName() is deprecated since Python 3.10; the name property works on Python 2/3
        self.name = "BaseExecutor"
        self.queue = None    # forwarded to create_job() in retrieve_payload()
        self.payload = None  # payload object; get_job() reads its 'job' entry

        self.args = None     # forwarded to CommunicationManager.get_jobs() in retrieve_payload()
        for key in kwargs:
            setattr(self, key, kwargs[key])

        self.__stop = threading.Event()

        self.__event_ranges = []  # local buffer of event ranges not yet handed out
        self.__is_set_payload = False
        self.__is_retrieve_payload = False

        self.communication_manager = None  # created in start()

        self.proc = None  # never assigned in this class; get_pid() reads its 'pid' attribute

    def get_pid(self):
        """
        Return the pid of self.proc.

        :return: self.proc.pid, or None if self.proc is not set.
        """
        return self.proc.pid if self.proc else None

    def __del__(self):
        """
        Set the stop flag and stop the communication manager (if any) on deletion.
        """
        self.stop()
        if self.communication_manager:
            self.communication_manager.stop()

    def is_payload_started(self):
        """
        Has the payload started?

        :return: always False in this base class (presumably overridden by subclasses - to be confirmed).
        """
        return False

    def start(self):
        """
        Start the communication manager and then the executor thread.

        The communication manager is created and started *before* the executor thread,
        so that run() never observes self.communication_manager as None.

        :raises: whatever threading.Thread.start() raises; in that case the newly
                 created communication manager is asked to stop again.
        """
        manager = CommunicationManager()
        manager.start()
        previous_manager = self.communication_manager
        self.communication_manager = manager
        try:
            super(BaseExecutor, self).start()
        except Exception:
            # the executor thread did not start: do not leave the new manager running
            self.communication_manager = previous_manager
            manager.stop()
            raise

    def stop(self):
        """
        Set the stop flag of the executor (no-op if it is already set).
        """
        if not self.is_stop():
            self.__stop.set()

    def is_stop(self):
        """
        Is the stop flag set?

        :return: Boolean.
        """
        return self.__stop.is_set()

    def stop_communicator(self):
        """
        Ask the communication manager to stop and wait until it is no longer alive.

        The loop pauses briefly between liveness checks instead of spinning.
        """
        import time  # local import: only needed here

        logger.info("Stopping communication manager")
        manager = self.communication_manager
        if manager:
            while manager.is_alive():
                if not manager.is_stop():
                    manager.stop()
                time.sleep(self._COMMUNICATOR_POLL_INTERVAL)
        logger.info("Communication manager stopped")

    def set_payload(self, payload):
        """
        Store the payload and change directory to the work directory of its job (if any).

        :param payload: payload object; a mapping with a 'job' entry is expected by get_job().
        """
        self.payload = payload
        self.__is_set_payload = True
        job = self.get_job()
        if job and job.workdir:
            os.chdir(job.workdir)

    def is_set_payload(self):
        """
        Has a payload been set with set_payload()?

        :return: Boolean.
        """
        return self.__is_set_payload

    def set_retrieve_payload(self):
        """
        Flag that the payload is to be retrieved (see is_retrieve_payload()).
        """
        self.__is_retrieve_payload = True

    def is_retrieve_payload(self):
        """
        Has set_retrieve_payload() been called?

        :return: Boolean.
        """
        return self.__is_retrieve_payload

    def retrieve_payload(self):
        """
        Get one job from the communication manager and build a payload from it.

        The payload command is obtained from the 'common' module of the user package
        selected by the PILOT_USER environment variable (default 'atlas').

        :return: dictionary with the keys 'executable', 'workdir' and 'job', or None if no job was received.
        """
        logger.info("Retrieving payload: %s", self.args)
        jobs = self.communication_manager.get_jobs(njobs=1, args=self.args)
        logger.info("Received jobs: %s", jobs)
        if not jobs:
            return None

        job = create_job(jobs[0], queue=self.queue)

        # get the payload command from the user specific code
        pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        cmd = user.get_payload_command(job)
        logger.info("payload execution command: %s", cmd)

        payload = {'executable': cmd,
                   'workdir': job.workdir,
                   'job': job}
        logger.info("Retrieved payload: %s", payload)
        return payload

    def get_payload(self):
        """
        Return the payload stored with set_payload().

        :return: the payload, or None if set_payload() has not been called.
        """
        if self.__is_set_payload:
            return self.payload
        return None

    def get_job(self):
        """
        Return the job stored in the payload.

        :return: self.payload['job'], or None if there is no payload or it has no 'job' entry.
        """
        return self.payload['job'] if self.payload and 'job' in self.payload else None

    def get_event_ranges(self, num_event_ranges=1, queue_factor=2):
        """
        Return up to num_event_ranges event ranges.

        Event ranges are served from a local buffer. When the buffer holds fewer than
        num_event_ranges entries, num_event_ranges * queue_factor ranges are requested
        from the communication manager first. If the environment variable
        PILOT_ES_EXECUTOR_TYPE is 'raythena', queue_factor is forced to 1.

        :param num_event_ranges: number of event ranges to return (int).
        :param queue_factor: multiplier applied to the number of ranges requested from the manager (int).
        :return: list of event ranges (may be shorter than requested, or empty).
        """
        if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena':
            old_queue_factor = queue_factor
            queue_factor = 1
            logger.info("raythena - Changing queue_factor from %s to %s", old_queue_factor, queue_factor)
        logger.info("Getting event ranges: (num_ranges: %s) (queue_factor: %s)", num_event_ranges, queue_factor)
        if len(self.__event_ranges) < num_event_ranges:
            fetched = self.communication_manager.get_event_ranges(num_event_ranges=num_event_ranges * queue_factor,
                                                                  job=self.get_job())
            # NOTE(review): a None/empty reply is tolerated here - confirm what the manager returns on failure
            if fetched:
                self.__event_ranges.extend(fetched)

        # hand out the oldest buffered ranges first; a negative request yields an empty list
        count = max(num_event_ranges, 0)
        ret = self.__event_ranges[:count]
        del self.__event_ranges[:count]
        logger.info("Received event ranges(num:%s): %s", len(ret), ret)
        return ret

    def update_events(self, messages):
        """
        Forward event range updates to the communication manager.

        :param messages: update messages, passed on unchanged.
        :return: the value returned by CommunicationManager.update_events().
        """
        logger.info("Updating event ranges: %s", messages)
        ret = self.communication_manager.update_events(messages)
        logger.info("Updated event ranges status: %s", ret)
        return ret

    def update_jobs(self, jobs):
        """
        Forward job updates to the communication manager.

        :param jobs: job updates, passed on unchanged.
        :return: the value returned by CommunicationManager.update_jobs().
        """
        logger.info("Updating jobs: %s", jobs)
        ret = self.communication_manager.update_jobs(jobs)
        logger.info("Updated jobs status: %s", ret)
        return ret

    def run(self):
        """
        Main run process; must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()