Warning, file /pilot2/pilot/eventservice/workexecutor/workexecutor.py was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import time
0013
0014 from pilot.common import exception
0015 from pilot.common.pluginfactory import PluginFactory
0016
0017 import logging
0018 logger = logging.getLogger(__name__)
0019
0020 """
0021 Main class to manage the event service work.
0022 """
0023
0024
class WorkExecutor(PluginFactory):
    """
    Main class to manage the event service work.

    The class selects an executor plugin from ``args['executor_type']``, loads it
    with the inherited ``get_plugin()`` and forwards all work related calls
    (start, stop, event range handling, ...) to that plugin.
    """

    # Known executor types and the dotted path of the plugin class serving them.
    _PLUGIN_CLASSES = {
        'hpo': 'pilot.eventservice.workexecutor.plugins.hpoexecutor.HPOExecutor',
        'raythena': 'pilot.eventservice.workexecutor.plugins.raythenaexecutor.RaythenaExecutor',
        'generic': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor',
        'base': 'pilot.eventservice.workexecutor.plugins.baseexecutor.BaseExecutor',
        'nl': 'pilot.eventservice.workexecutor.plugins.nlexecutor.NLExecutor',
        'boinc': 'pilot.eventservice.workexecutor.plugins.boincexecutor.BOINCExecutor',
        'hammercloud': 'pilot.eventservice.workexecutor.plugins.hammercloudexecutor.HammerCloudExecutor',
        'mpi': 'pilot.eventservice.workexecutor.plugins.mpiexecutor.MPIExecutor',
    }

    # Executor type used when none is configured or the configured one is unknown.
    _DEFAULT_EXECUTOR_TYPE = 'generic'

    def __init__(self, args=None):
        """
        Initialize the work executor; no plugin is loaded until start() is called.

        :param args: optional mapping of executor arguments; the key 'executor_type'
                     selects the plugin and the whole mapping is handed to the plugin.
        """
        super(WorkExecutor, self).__init__()
        self.payload = None
        self.plugin = None
        self.is_retrieve_payload = False
        self.args = args
        self.pid = None

    def _get_plugin_or_raise(self):
        """
        Return the loaded plugin.

        :return: the executor plugin stored by start().
        :raises SetupFailure: if no plugin has been loaded.
        """
        if not self.plugin:
            raise exception.SetupFailure("No available executor plugin.")
        return self.plugin

    def get_pid(self):
        """
        Return the pid reported by the plugin.

        :return: result of the plugin's get_pid(), or None if no plugin is loaded.
        """
        return self.plugin.get_pid() if self.plugin else None

    def set_payload(self, payload):
        """
        Store the payload that start() will pass on to the plugin.

        :param payload: payload object (its structure is defined by the plugin).
        """
        self.payload = payload

    def set_retrieve_payload(self):
        """
        Switch to retrieval mode: start() will call the plugin's
        set_retrieve_payload() instead of passing on a locally assigned payload.
        """
        self.is_retrieve_payload = True

    def set_retrieve_paylaod(self):
        """
        Misspelled alias of set_retrieve_payload(), kept for existing callers.
        """
        self.set_retrieve_payload()

    def get_payload(self):
        """
        Return the currently stored payload.

        :return: payload object, or None if none has been assigned.
        """
        return self.payload

    def get_plugin_confs(self):
        """
        Build the plugin configuration for the configured executor type.

        A missing or unrecognised 'executor_type' falls back to the generic
        executor, so the returned dictionary always contains a 'class' entry.

        :return: dictionary with the keys 'class' (dotted path of the plugin class)
                 and 'args' (the arguments given to the constructor).
        """
        executor_type = None
        if self.args and 'executor_type' in self.args:
            executor_type = self.args['executor_type']

        plugin_class = self._PLUGIN_CLASSES.get(executor_type)
        if plugin_class is None:
            if executor_type is not None:
                logger.warning("Unknown executor type: %s, using the %s executor",
                               executor_type, self._DEFAULT_EXECUTOR_TYPE)
            plugin_class = self._PLUGIN_CLASSES[self._DEFAULT_EXECUTOR_TYPE]

        return {'class': plugin_class, 'args': self.args}

    def start(self):
        """
        Load the executor plugin, set up its payload, start it and wait until the
        payload has started or the plugin is no longer alive.

        :raises SetupFailure: if no plugin could be loaded, or if retrieval mode is
                              off and no payload has been assigned.
        """
        plugin_confs = self.get_plugin_confs()
        logger.info("Plugin confs: %s", plugin_confs)
        self.plugin = self.get_plugin(plugin_confs)
        if not self.plugin:
            raise exception.SetupFailure("No available executor plugin.")
        logger.info("WorkExecutor started with plugin: %s", self.plugin)

        if self.is_retrieve_payload:
            # NOTE(review): the stored payload is replaced by whatever the plugin's
            # set_retrieve_payload() returns - confirm that this is intended.
            self.payload = self.plugin.set_retrieve_payload()
        else:
            payload = self.get_payload()
            if not payload:
                raise exception.SetupFailure("Payload is not assigned.")
            self.plugin.set_payload(payload)

        logger.info("Starting plugin: %s", self.plugin)
        self.plugin.start()
        logger.info("Waiting for payload to start")
        # Poll once per second; the loop also ends when the plugin stops being alive.
        while self.plugin.is_alive():
            if self.plugin.is_payload_started():
                logger.info("Payload started with pid: %s", self.get_pid())
                break
            time.sleep(1)

    def stop(self):
        """
        Stop the plugin.

        :return: result of the plugin's stop().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._get_plugin_or_raise().stop()

    def is_alive(self):
        """
        Report whether the plugin is alive.

        :return: result of the plugin's is_alive().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._get_plugin_or_raise().is_alive()

    def get_exit_code(self):
        """
        Return the exit code reported by the plugin.

        :return: result of the plugin's get_exit_code().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._get_plugin_or_raise().get_exit_code()

    def get_event_ranges(self):
        """
        Ask the plugin for event ranges.

        :return: result of the plugin's get_event_ranges().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._get_plugin_or_raise().get_event_ranges()

    def update_events(self, messages):
        """
        Forward event update messages to the plugin.

        :param messages: messages to pass to the plugin's update_events().
        :return: result of the plugin's update_events().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._get_plugin_or_raise().update_events(messages)