Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /pilot2/pilot/eventservice/workexecutor/workexecutor.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2019
0010 
0011 
0012 import time
0013 
0014 from pilot.common import exception
0015 from pilot.common.pluginfactory import PluginFactory
0016 
0017 import logging
0018 logger = logging.getLogger(__name__)
0019 
0020 """
0021 Main class to manage the event service work.
0022 """
0023 
0024 
class WorkExecutor(PluginFactory):
    """
    Front end that selects an executor plugin and forwards event service calls to it.

    The plugin class is chosen from ``args['executor_type']`` and is loaded in
    ``start()`` through ``self.get_plugin()``, which this class does not define
    itself (it is expected to come from PluginFactory).
    """

    # Dotted class path of the executor plugin for each known 'executor_type'.
    _PLUGIN_CLASSES = {
        'hpo': 'pilot.eventservice.workexecutor.plugins.hpoexecutor.HPOExecutor',
        'raythena': 'pilot.eventservice.workexecutor.plugins.raythenaexecutor.RaythenaExecutor',
        'generic': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor',
        'base': 'pilot.eventservice.workexecutor.plugins.baseexecutor.BaseExecutor',
        'nl': 'pilot.eventservice.workexecutor.plugins.nlexecutor.NLExecutor',  # network-less
        'boinc': 'pilot.eventservice.workexecutor.plugins.boincexecutor.BOINCExecutor',
        # hammercloud test: refine normal simul to ES
        'hammercloud': 'pilot.eventservice.workexecutor.plugins.hammercloudexecutor.HammerCloudExecutor',
        'mpi': 'pilot.eventservice.workexecutor.plugins.mpiexecutor.MPIExecutor',
    }

    # Plugin used when no 'executor_type' is given at all.
    _DEFAULT_PLUGIN_CLASS = 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor'

    def __init__(self, args=None):
        """
        Initialize the executor without loading any plugin yet.

        :param args: dict-like executor arguments; the optional key 'executor_type'
                     selects the plugin. The whole object is handed on to the plugin
                     configuration under the key 'args'.
        """
        super(WorkExecutor, self).__init__()
        self.payload = None
        self.plugin = None
        self.is_retrieve_payload = False
        self.args = args
        self.pid = None

    def _require_plugin(self):
        """
        Return the loaded plugin.

        :return: the plugin object stored by start().
        :raises SetupFailure: if no plugin has been loaded.
        """
        if not self.plugin:
            raise exception.SetupFailure("No available executor plugin.")
        return self.plugin

    def get_pid(self):
        """
        Return the pid reported by the plugin.

        :return: result of the plugin's get_pid(), or None if no plugin is loaded.
        """
        return self.plugin.get_pid() if self.plugin else None

    def set_payload(self, payload):
        """
        Store the payload that start() will hand to the plugin.

        :param payload: payload object (passed on unchanged).
        """
        self.payload = payload

    def set_retrieve_paylaod(self):
        """
        Request that start() asks the plugin to retrieve the payload itself.

        The misspelled name is kept so that existing callers keep working;
        set_retrieve_payload() is the correctly spelled equivalent.
        """
        self.is_retrieve_payload = True

    def set_retrieve_payload(self):
        """
        Correctly spelled equivalent of set_retrieve_paylaod().
        """
        self.set_retrieve_paylaod()

    def get_payload(self):
        """
        Return the currently stored payload.

        :return: the payload, or None if none has been set.
        """
        return self.payload

    def get_plugin_confs(self):
        """
        Build the plugin configuration dictionary.

        If 'executor_type' is absent (or args is empty), the generic executor is
        selected. If 'executor_type' is present but not a known type, the returned
        dictionary has no 'class' entry.

        :return: dict with 'args' (always) and 'class' (dotted plugin class path, when known).
        """
        plugin_confs = {}
        if self.args and 'executor_type' in self.args:
            plugin_class = self._PLUGIN_CLASSES.get(self.args['executor_type'])
            if plugin_class:
                plugin_confs['class'] = plugin_class
        else:
            plugin_confs['class'] = self._DEFAULT_PLUGIN_CLASS

        plugin_confs['args'] = self.args
        return plugin_confs

    def start(self):
        """
        Load the plugin, give it its payload, start it and wait for the payload to start.

        The method returns when the plugin reports that the payload has started,
        or when the plugin is no longer alive, whichever comes first.

        :raises SetupFailure: if no plugin class is configured, the plugin cannot be
                              loaded, or no payload has been assigned.
        """
        plugin_confs = self.get_plugin_confs()
        logger.info("Plugin confs: %s", plugin_confs)
        if 'class' not in plugin_confs:
            # unrecognised executor_type: fail with a clear message instead of
            # handing an incomplete configuration to the plugin loader
            raise exception.SetupFailure("No executor plugin class configured: %s" % plugin_confs)

        self.plugin = self.get_plugin(plugin_confs)
        logger.info("WorkExecutor started with plugin: %s", self.plugin)
        if not self.plugin:
            raise exception.SetupFailure("No available executor plugin.")

        if self.is_retrieve_payload:
            # NOTE(review): the stored payload becomes whatever the plugin's
            # set_retrieve_payload() returns - confirm this is intended
            self.payload = self.plugin.set_retrieve_payload()
        else:
            payload = self.get_payload()
            if not payload:
                raise exception.SetupFailure("Payload is not assigned.")
            self.plugin.set_payload(payload)

        logger.info("Starting plugin: %s", self.plugin)
        self.plugin.start()
        logger.info("Waiting for payload to start")
        # poll once per second; stop waiting if the plugin dies before the payload starts
        while self.plugin.is_alive():
            if self.plugin.is_payload_started():
                logger.info("Payload started with pid: %s", self.get_pid())
                break
            time.sleep(1)

    def stop(self):
        """
        Stop the plugin.

        :return: result of the plugin's stop().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._require_plugin().stop()

    def is_alive(self):
        """
        Tell whether the plugin is alive.

        :return: result of the plugin's is_alive().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._require_plugin().is_alive()

    def get_exit_code(self):
        """
        Return the exit code reported by the plugin.

        :return: result of the plugin's get_exit_code().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._require_plugin().get_exit_code()

    def get_event_ranges(self):
        """
        Ask the plugin for event ranges.

        :return: result of the plugin's get_event_ranges().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._require_plugin().get_event_ranges()

    def update_events(self, messages):
        """
        Forward event update messages to the plugin.

        :param messages: messages object (passed on unchanged).
        :return: result of the plugin's update_events().
        :raises SetupFailure: if no plugin has been loaded.
        """
        return self._require_plugin().update_events(messages)