File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import os
0012 import threading
0013
0014 from pilot.common.pluginfactory import PluginFactory
0015 from pilot.control.job import create_job
0016 from pilot.eventservice.communicationmanager.communicationmanager import CommunicationManager
0017 import logging
0018 logger = logging.getLogger(__name__)
0019
0020 """
0021 Base Executor with one process to manage EventService
0022 """
0023
0024
class BaseExecutor(threading.Thread, PluginFactory):
    """
    Base class for EventService executors.

    The executor is a thread that owns a CommunicationManager (created in
    start()) and uses it to fetch jobs and event ranges and to report event
    and job updates. Subclasses must override run().
    """

    def __init__(self, **kwargs):
        """
        Initialise the executor thread.

        :param kwargs: arbitrary keyword arguments; each one is stored as an
                       attribute of the same name and may override the
                       queue/payload/args defaults set here.
        """
        super(BaseExecutor, self).__init__()
        # Thread.setName() is deprecated; assigning the name property is equivalent.
        self.name = "BaseExecutor"

        # Created before anything that can raise, so that __del__ -> stop()
        # always finds the event even if construction fails part-way.
        self.__stop = threading.Event()

        self.queue = None
        self.payload = None
        self.args = None
        for key, value in kwargs.items():
            setattr(self, key, value)

        # Locally buffered event ranges that were fetched but not yet handed out.
        self.__event_ranges = []
        self.__is_set_payload = False
        self.__is_retrieve_payload = False

        # Created lazily in start().
        self.communication_manager = None

        # Payload process handle; only its .pid attribute is used in this class.
        self.proc = None

    def get_pid(self):
        """
        Return the pid of the payload process.

        :return: self.proc.pid, or None when no process has been set.
        """
        return self.proc.pid if self.proc else None

    def __del__(self):
        """
        Best-effort cleanup: set the stop flag and stop the communication manager.
        """
        self.stop()
        # getattr: the attribute does not exist if __init__ raised before assigning it.
        manager = getattr(self, 'communication_manager', None)
        if manager:
            manager.stop()

    def is_payload_started(self):
        """
        Tell whether the payload has started.

        :return: always False in this base class.
        """
        return False

    def start(self):
        """
        Start the communication manager and then the executor thread.

        The manager is created and started first so that run(), which executes
        in the new thread, never observes self.communication_manager as None.
        """
        self.communication_manager = CommunicationManager()
        self.communication_manager.start()
        super(BaseExecutor, self).start()

    def stop(self):
        """
        Set the stop flag (no-op if it is already set).
        """
        if not self.is_stop():
            self.__stop.set()

    def is_stop(self):
        """
        Tell whether the stop flag has been set.

        :return: True if stop() has been called, False otherwise.
        """
        return self.__stop.is_set()

    def stop_communicator(self):
        """
        Ask the communication manager to stop and block until it is no longer alive.
        """
        # Local import keeps this block self-contained (the module does not import time).
        import time

        logger.info("Stopping communication manager")
        manager = self.communication_manager
        if manager:
            while manager.is_alive():
                if not manager.is_stop():
                    manager.stop()
                # Short pause between polls instead of spinning at full CPU.
                time.sleep(0.1)
        logger.info("Communication manager stopped")

    def set_payload(self, payload):
        """
        Store the payload and change into the job work directory, if any.

        :param payload: payload object; get_job() reads its 'job' entry.
        """
        self.payload = payload
        self.__is_set_payload = True
        job = self.get_job()
        if job and job.workdir:
            # NOTE: this changes the working directory of the whole process.
            os.chdir(job.workdir)

    def is_set_payload(self):
        """
        Tell whether set_payload() has been called.

        :return: True if a payload was set, False otherwise.
        """
        return self.__is_set_payload

    def set_retrieve_payload(self):
        """
        Set the retrieve-payload flag reported by is_retrieve_payload().
        """
        self.__is_retrieve_payload = True

    def is_retrieve_payload(self):
        """
        Tell whether set_retrieve_payload() has been called.

        :return: True if the flag is set, False otherwise.
        """
        return self.__is_retrieve_payload

    def retrieve_payload(self):
        """
        Fetch one job through the communication manager and build its payload.

        :return: dict with keys 'executable', 'workdir' and 'job', or None if
                 no job was received.
        """
        # Lazy logger arguments: unlike "%s" % obj, this cannot fail when obj is a tuple.
        logger.info("Retrieving payload: %s", self.args)
        jobs = self.communication_manager.get_jobs(njobs=1, args=self.args)
        logger.info("Received jobs: %s", jobs)
        if not jobs:
            return None

        job = create_job(jobs[0], queue=self.queue)

        # The user-specific module is selected by the PILOT_USER environment
        # variable (default 'atlas') and provides get_payload_command().
        pilot_user = os.environ.get('PILOT_USER', 'atlas').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
        cmd = user.get_payload_command(job)
        logger.info("payload execution command: %s", cmd)

        payload = {'executable': cmd,
                   'workdir': job.workdir,
                   'job': job}
        logger.info("Retrieved payload: %s", payload)
        return payload

    def get_payload(self):
        """
        Return the payload stored by set_payload().

        :return: the payload, or None if set_payload() has not been called.
        """
        if self.__is_set_payload:
            return self.payload
        return None

    def get_job(self):
        """
        Return the job stored in the current payload.

        :return: self.payload['job'], or None if there is no payload or it has
                 no 'job' entry.
        """
        return self.payload.get('job') if self.payload else None

    def get_event_ranges(self, num_event_ranges=1, queue_factor=2):
        """
        Hand out event ranges, refilling the local buffer when it runs low.

        When fewer than num_event_ranges are buffered, num_event_ranges *
        queue_factor ranges are requested from the communication manager.
        queue_factor is forced to 1 when the PILOT_ES_EXECUTOR_TYPE
        environment variable equals 'raythena'.

        :param num_event_ranges: maximum number of event ranges to return.
        :param queue_factor: multiplier applied to the size of the refill request.
        :return: list of at most num_event_ranges event ranges (possibly empty).
        """
        if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena':
            old_queue_factor = queue_factor
            queue_factor = 1
            logger.info("raythena - Changing queue_factor from %s to %s", old_queue_factor, queue_factor)
        logger.info("Getting event ranges: (num_ranges: %s) (queue_factor: %s)", num_event_ranges, queue_factor)
        if len(self.__event_ranges) < num_event_ranges:
            fetched = self.communication_manager.get_event_ranges(num_event_ranges=num_event_ranges * queue_factor,
                                                                  job=self.get_job())
            # Guard against a None/empty reply instead of failing on iteration.
            if fetched:
                self.__event_ranges.extend(fetched)

        # Ranges are popped one at a time on purpose: each pop is a single list
        # operation, so a range is never handed out twice.
        ret = []
        for _ in range(num_event_ranges):
            if not self.__event_ranges:
                break
            ret.append(self.__event_ranges.pop(0))
        logger.info("Received event ranges(num:%s): %s", len(ret), ret)
        return ret

    def update_events(self, messages):
        """
        Forward event range updates to the communication manager.

        :param messages: update messages, passed through unchanged.
        :return: whatever the communication manager's update_events() returns.
        """
        logger.info("Updating event ranges: %s", messages)
        ret = self.communication_manager.update_events(messages)
        logger.info("Updated event ranges status: %s", ret)
        return ret

    def update_jobs(self, jobs):
        """
        Forward job updates to the communication manager.

        :param jobs: job updates, passed through unchanged.
        :return: whatever the communication manager's update_jobs() returns.
        """
        logger.info("Updating jobs: %s", jobs)
        ret = self.communication_manager.update_jobs(jobs)
        logger.info("Updated jobs status: %s", ret)
        return ret

    def run(self):
        """
        Main run process; must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
0156 raise NotImplementedError()