Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 
0010 import io
0011 import json
0012 import logging
0013 import os
0014 import re
0015 import subprocess
0016 import time
0017 import threading
0018 import traceback
0019 
0020 try:
0021     import Queue as queue  # noqa: N813
0022 except Exception:
0023     import queue  # Python 3
0024 
0025 from pilot.common.exception import PilotException, MessageFailure, SetupFailure, RunPayloadFailure, UnknownException
0026 from pilot.eventservice.esprocess.esmessage import MessageThread
0027 from pilot.util.container import containerise_executable
0028 from pilot.util.processes import kill_child_processes
0029 
0030 logger = logging.getLogger(__name__)
0031 
0032 """
0033 Main process to handle event service.
0034 It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when
0035 it's running. The process will handle the logic of Event service independently.
0036 """
0037 
0038 
0039 class ESProcess(threading.Thread):
0040     """
0041     Main EventService Process.
0042     """
0043     def __init__(self, payload, waiting_time=30 * 60):
0044         """
0045         Init ESProcess.
0046 
0047         :param payload: a dict of {'executable': <cmd string>, 'output_file': <filename or without it>, 'error_file': <filename or without it>}
0048         """
0049         threading.Thread.__init__(self, name='esprocess')
0050 
0051         self.__message_queue = queue.Queue()
0052         self.__payload = payload
0053 
0054         self.__message_thread = None
0055         self.__process = None
0056 
0057         self.get_event_ranges_hook = None
0058         self.handle_out_message_hook = None
0059 
0060         self.__monitor_log_time = None
0061         self.is_no_more_events = False
0062         self.__no_more_event_time = None
0063         self.__waiting_time = waiting_time
0064         self.__stop = threading.Event()
0065         self.__stop_time = 180
0066         self.pid = None
0067         self.__is_payload_started = False
0068 
0069         self.__ret_code = None
0070         self.setName("ESProcess")
0071         self.corecount = 1
0072 
0073         self.event_ranges_cache = []
0074 
0075     def __del__(self):
0076         if self.__message_thread:
0077             self.__message_thread.stop()
0078 
0079     def is_payload_started(self):
0080         return self.__is_payload_started
0081 
0082     def stop(self, delay=1800):
0083         if not self.__stop.is_set():
0084             self.__stop.set()
0085             self.__stop_set_time = time.time()
0086             self.__stop_delay = delay
0087             event_ranges = "No more events"
0088             self.send_event_ranges_to_payload(event_ranges)
0089 
0090     def init_message_thread(self, socketname=None, context='local'):
0091         """
0092         init message thread.
0093 
0094         :param socket_name: name of the socket between current process and payload.
0095         :param context: name of the context between current process and payload, default is 'local'.
0096 
0097         :raises MessageFailure: when failed to init message thread.
0098         """
0099 
0100         logger.info("start to init message thread")
0101         try:
0102             self.__message_thread = MessageThread(self.__message_queue, socketname, context)
0103             self.__message_thread.start()
0104         except PilotException as e:
0105             logger.error("Failed to start message thread: %s" % e.get_detail())
0106             self.__ret_code = -1
0107         except Exception as e:
0108             logger.error("Failed to start message thread: %s" % str(e))
0109             self.__ret_code = -1
0110             raise MessageFailure(e)
0111         logger.info("finished to init message thread")
0112 
0113     def stop_message_thread(self):
0114         """
0115         Stop message thread
0116         """
0117         logger.info("Stopping message thread")
0118         if self.__message_thread:
0119             while self.__message_thread.is_alive():
0120                 if not self.__message_thread.is_stopped():
0121                     self.__message_thread.stop()
0122         logger.info("Message thread stopped")
0123 
0124     def init_yampl_socket(self, executable):
0125         socket_name = self.__message_thread.get_yampl_socket_name()
0126         if "PILOT_EVENTRANGECHANNEL" in executable:
0127             executable = "export PILOT_EVENTRANGECHANNEL=\"%s\"; " % (socket_name) + executable
0128         elif "--preExec" not in executable:
0129             executable += " --preExec \'from AthenaMP.AthenaMPFlags import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\"\'" % (socket_name)
0130         else:
0131             if "import jobproperties as jps" in executable:
0132                 executable = executable.replace("import jobproperties as jps;",
0133                                                 "import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\";" % (socket_name))
0134             else:
0135                 if "--preExec " in executable:
0136                     new_str = "--preExec \'from AthenaMP.AthenaMPFlags import jobproperties as jps;jps.AthenaMPFlags.EventRangeChannel=\"%s\"\' " % socket_name
0137                     executable = executable.replace("--preExec ", new_str)
0138                 else:
0139                     logger.warn("--preExec has an unknown format - expected \'--preExec \"\' or \"--preExec \'\", got: %s" % (executable))
0140 
0141         return executable
0142 
0143     def init_payload_process(self):
0144         """
0145         init payload process.
0146 
0147         :raise SetupFailure: when failed to init payload process.
0148         """
0149 
0150         logger.info("start to init payload process")
0151         try:
0152             try:
0153                 workdir = self.get_workdir()
0154             except Exception as e:
0155                 raise e
0156 
0157             executable = self.get_executable(workdir)
0158             output_file_fd = self.get_file(workdir, file_label='output_file', file_name='ES_payload_output.txt')
0159             error_file_fd = self.get_file(workdir, file_label='error_file', file_name='ES_payload_error.txt')
0160 
0161             # containerise executable if required
0162             if 'job' in self.__payload and self.__payload['job']:
0163                 try:
0164                     executable, diagnostics = containerise_executable(executable, job=self.__payload['job'], workdir=workdir)
0165                     if diagnostics:
0166                         msg = 'containerisation of executable failed: %s' % diagnostics
0167                         logger.warning(msg)
0168                         raise SetupFailure(msg)
0169                 except Exception as e:
0170                     msg = 'exception caught while preparing container command: %s' % e
0171                     logger.warning(msg)
0172                     raise SetupFailure(msg)
0173             else:
0174                 logger.warning('could not containerise executable')
0175 
0176             # get the process
0177             self.__process = subprocess.Popen(executable, stdout=output_file_fd, stderr=error_file_fd, shell=True)
0178             self.pid = self.__process.pid
0179             self.__payload['job'].pid = self.pid
0180             self.__is_payload_started = True
0181             logger.debug("Started new processs (executable: %s, stdout: %s, stderr: %s, pid: %s)" % (executable,
0182                                                                                                      output_file_fd,
0183                                                                                                      error_file_fd,
0184                                                                                                      self.__process.pid))
0185             if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].corecount:
0186                 self.corecount = int(self.__payload['job'].corecount)
0187         except PilotException as e:
0188             logger.error("Failed to start payload process: %s, %s" % (e.get_detail(), traceback.format_exc()))
0189             self.__ret_code = -1
0190         except Exception as e:
0191             logger.error("Failed to start payload process: %s, %s" % (str(e), traceback.format_exc()))
0192             self.__ret_code = -1
0193             raise SetupFailure(e)
0194         logger.info("finished initializing payload process")
0195 
0196     def get_file(self, workdir, file_label='output_file', file_name='ES_payload_output.txt'):
0197         """
0198         Return the requested file.
0199 
0200         :param file_label:
0201         :param workdir:
0202         :return:
0203         """
0204 
0205         try:
0206             file_type = file  # Python 2
0207         except NameError:
0208             file_type = io.IOBase  # Python 3
0209 
0210         if file_label in self.__payload:
0211             if isinstance(self.__payload[file_label], file_type):
0212                 _file_fd = self.__payload[file_label]
0213             else:
0214                 _file = self.__payload[file_label] if '/' in self.__payload[file_label] else os.path.join(workdir, self.__payload[file_label])
0215                 _file_fd = open(_file, 'w')
0216         else:
0217             _file = os.path.join(workdir, file_name)
0218             _file_fd = open(_file, 'w')
0219 
0220         return _file_fd
0221 
0222     def get_workdir(self):
0223         """
0224         Return the workdir.
0225         If the workdir is set but is not a directory, return None.
0226 
0227         :return: workdir (string or None).
0228         :raises SetupFailure: in case workdir is not a directory.
0229         """
0230 
0231         workdir = ''
0232         if 'workdir' in self.__payload:
0233             workdir = self.__payload['workdir']
0234             if not os.path.exists(workdir):
0235                 os.makedirs(workdir)
0236             elif not os.path.isdir(workdir):
0237                 raise SetupFailure('workdir exists but is not a directory')
0238         return workdir
0239 
0240     def get_executable(self, workdir):
0241         """
0242         Return the executable string.
0243 
0244         :param workdir: work directory (string).
0245         :return: executable (string).
0246         """
0247         executable = self.__payload['executable']
0248         executable = self.init_yampl_socket(executable)
0249         return 'cd %s; %s' % (workdir, executable)
0250 
0251     def set_get_event_ranges_hook(self, hook):
0252         """
0253         set get_event_ranges hook.
0254 
0255         :param hook: a hook method to get event ranges.
0256         """
0257 
0258         self.get_event_ranges_hook = hook
0259 
0260     def get_get_event_ranges_hook(self):
0261         """
0262         get get_event_ranges hook.
0263 
0264         :returns: The hook method to get event ranges.
0265         """
0266 
0267         return self.get_event_ranges_hook
0268 
0269     def set_handle_out_message_hook(self, hook):
0270         """
0271         set handle_out_message hook.
0272 
0273         :param hook: a hook method to handle payload output and error messages.
0274         """
0275 
0276         self.handle_out_message_hook = hook
0277 
0278     def get_handle_out_message_hook(self):
0279         """
0280         get handle_out_message hook.
0281 
0282         :returns: The hook method to handle payload output and error messages.
0283         """
0284 
0285         return self.handle_out_message_hook
0286 
0287     def init(self):
0288         """
0289         initialize message thread and payload process.
0290         """
0291 
0292         try:
0293             self.init_message_thread()
0294             self.init_payload_process()
0295         except Exception as e:
0296             # TODO: raise exceptions
0297             self.__ret_code = -1
0298             self.stop()
0299             raise e
0300 
0301     def monitor(self):
0302         """
0303         Monitor whether a process is dead.
0304 
0305         raises: MessageFailure: when the message thread is dead or exited.
0306                 RunPayloadFailure: when the payload process is dead or exited.
0307         """
0308 
0309         if self.__no_more_event_time and time.time() - self.__no_more_event_time > self.__waiting_time:
0310             self.__ret_code = -1
0311             raise Exception('Too long time (%s seconds) since "No more events" is injected' %
0312                             (time.time() - self.__no_more_event_time))
0313 
0314         if self.__monitor_log_time is None or self.__monitor_log_time < time.time() - 10 * 60:
0315             self.__monitor_log_time = time.time()
0316             logger.info('monitor is checking dead process.')
0317 
0318         if self.__message_thread is None:
0319             raise MessageFailure("Message thread has not started.")
0320         if not self.__message_thread.is_alive():
0321             raise MessageFailure("Message thread is not alive.")
0322 
0323         if self.__process is None:
0324             raise RunPayloadFailure("Payload process has not started.")
0325         if self.__process.poll() is not None:
0326             if self.is_no_more_events:
0327                 logger.info("Payload finished with no more events")
0328             else:
0329                 self.__ret_code = self.__process.poll()
0330                 raise RunPayloadFailure("Payload process is not alive: %s" % self.__process.poll())
0331 
0332         if self.__stop.is_set() and time.time() > self.__stop_set_time + self.__stop_delay:
0333             logger.info("Stop has been set for %s seconds, which is more than the stop wait time. Will terminate" % self.__stop_delay)
0334             self.terminate()
0335 
0336     def has_running_children(self):
0337         """
0338         Check whether it has running children
0339 
0340         :return: True if there are alive children, otherwise False
0341         """
0342         if self.__message_thread and self.__message_thread.is_alive():
0343             return True
0344         if self.__process and self.__process.poll() is None:
0345             return True
0346         return False
0347 
0348     def is_payload_running(self):
0349         """
0350         Check whether the payload is still running
0351 
0352         :return: True if the payload is running, otherwise False
0353         """
0354         if self.__process and self.__process.poll() is None:
0355             return True
0356         return False
0357 
0358     def get_event_range_to_payload(self):
0359         """
0360         Get one event range to be sent to payload
0361         """
0362         logger.debug("Number of cached event ranges: %s" % len(self.event_ranges_cache))
0363         if not self.event_ranges_cache:
0364             event_ranges = self.get_event_ranges()
0365             if event_ranges:
0366                 self.event_ranges_cache.extend(event_ranges)
0367 
0368         if self.event_ranges_cache:
0369             event_range = self.event_ranges_cache.pop(0)
0370             return event_range
0371         else:
0372             return []
0373 
0374     def get_event_ranges(self, num_ranges=None):
0375         """
0376         Calling get_event_ranges hook to get event ranges.
0377 
0378         :param num_ranges: number of event ranges to get.
0379 
0380         :raises: SetupFailure: If get_event_ranges_hook is not set.
0381                  MessageFailure: when failed to get event ranges.
0382         """
0383         if not num_ranges:
0384             num_ranges = self.corecount
0385 
0386         logger.debug('getting event ranges(num_ranges=%s)' % num_ranges)
0387         if not self.get_event_ranges_hook:
0388             raise SetupFailure("get_event_ranges_hook is not set")
0389 
0390         try:
0391             logger.debug('calling get_event_ranges hook(%s) to get event ranges.' % self.get_event_ranges_hook)
0392             event_ranges = self.get_event_ranges_hook(num_ranges)
0393             logger.debug('got event ranges: %s' % event_ranges)
0394             return event_ranges
0395         except Exception as e:
0396             raise MessageFailure("Failed to get event ranges: %s" % e)
0397 
0398     def send_event_ranges_to_payload(self, event_ranges):
0399         """
0400         Send event ranges to payload through message thread.
0401 
0402         :param event_ranges: list of event ranges.
0403         """
0404 
0405         msg = None
0406         if "No more events" in event_ranges:
0407             msg = event_ranges
0408             self.is_no_more_events = True
0409             self.__no_more_event_time = time.time()
0410         else:
0411             if type(event_ranges) is not list:
0412                 event_ranges = [event_ranges]
0413             msg = json.dumps(event_ranges)
0414         logger.debug('send event ranges to payload: %s' % msg)
0415         self.__message_thread.send(msg)
0416 
0417     def parse_out_message(self, message):
0418         """
0419         Parse output or error messages from payload.
0420 
0421         :param message: The message string received from payload.
0422 
0423         :returns: a dict {'id': <id>, 'status': <status>, 'output': <output if produced>, 'cpu': <cpu>, 'wall': <wall>, 'message': <full message>}
0424         :raises: PilotExecption: when a PilotException is caught.
0425                  UnknownException: when other unknown exception is caught.
0426         """
0427 
0428         logger.debug('parsing message: %s' % message)
0429         try:
0430             if message.startswith("/"):
0431                 parts = message.split(",")
0432                 ret = {'output': parts[0]}
0433                 parts = parts[1:]
0434                 for part in parts:
0435                     name, value = part.split(":")
0436                     name = name.lower()
0437                     ret[name] = value
0438                 ret['status'] = 'finished'
0439                 return ret
0440             elif message.startswith('ERR'):
0441                 if "ERR_ATHENAMP_PARSE" in message:
0442                     pattern = re.compile(r"(ERR\_[A-Z\_]+)\ (.+)\:\ ?(.+)")
0443                     found = re.findall(pattern, message)
0444                     event_range = found[0][1]
0445                     if "eventRangeID" in event_range:
0446                         pattern = re.compile(r"eventRangeID\'\:\ ?.?\'([0-9\-]+)")
0447                         found = re.findall(pattern, event_range)
0448                         event_range_id = found[0]
0449                         ret = {'id': event_range_id, 'status': 'failed', 'message': message}
0450                         return ret
0451                     else:
0452                         raise Exception("Failed to parse %s" % message)
0453                 else:
0454                     pattern = re.compile(r"(ERR\_[A-Z\_]+)\ ([0-9\-]+)\:\ ?(.+)")
0455                     found = re.findall(pattern, message)
0456                     event_range_id = found[0][1]
0457                     ret = {'id': event_range_id, 'status': 'failed', 'message': message}
0458                     return ret
0459             else:
0460                 raise UnknownException("Unknown message %s" % message)
0461         except PilotException as e:
0462             raise e
0463         except Exception as e:
0464             raise UnknownException(e)
0465 
0466     def handle_out_message(self, message):
0467         """
0468         Handle output or error messages from payload.
0469         Messages from payload will be parsed and the handle_out_message hook is called.
0470 
0471         :param message: The message string received from payload.
0472 
0473         :raises: SetupFailure: when handle_out_message_hook is not set.
0474                  RunPayloadFailure: when failed to handle an output or error message.
0475         """
0476 
0477         logger.debug('handling out message: %s' % message)
0478         if not self.handle_out_message_hook:
0479             raise SetupFailure("handle_out_message_hook is not set")
0480 
0481         try:
0482             message_status = self.parse_out_message(message)
0483             logger.debug('parsed out message: %s' % message_status)
0484             logger.debug('calling handle_out_message hook(%s) to handle parsed message.' % self.handle_out_message_hook)
0485             self.handle_out_message_hook(message_status)
0486         except Exception as e:
0487             raise RunPayloadFailure("Failed to handle out message: %s" % e)
0488 
0489     def handle_messages(self):
0490         """
0491         Monitor the message queue to get output or error messages from payload and response to different messages.
0492         """
0493 
0494         try:
0495             message = self.__message_queue.get(False)
0496         except queue.Empty:
0497             pass
0498         else:
0499             logger.debug('received message from payload: %s' % message)
0500             if "Ready for events" in message:
0501                 event_ranges = self.get_event_range_to_payload()
0502                 if not event_ranges:
0503                     event_ranges = "No more events"
0504                 self.send_event_ranges_to_payload(event_ranges)
0505             else:
0506                 self.handle_out_message(message)
0507 
0508     def poll(self):
0509         """
0510         poll whether the process is still running.
0511 
0512         :returns: None: still running.
0513                   0: finished successfully.
0514                   others: failed.
0515         """
0516         return self.__ret_code
0517 
0518     def terminate(self, time_to_wait=1):
0519         """
0520         Terminate running threads and processes.
0521 
0522         :param time_to_wait: integer, seconds to wait to force kill the payload process.
0523 
0524         :raises: PilotExecption: when a PilotException is caught.
0525                  UnknownException: when other unknown exception is caught.
0526         """
0527         logger.info('terminate running threads and processes.')
0528         try:
0529             self.stop()
0530             if self.__process:
0531                 if not self.__process.poll() is None:
0532                     if self.__process.poll() == 0:
0533                         logger.info("payload finished successfully.")
0534                     else:
0535                         logger.error("payload finished with error code: %s" % self.__process.poll())
0536                 else:
0537                     for i in range(time_to_wait * 10):
0538                         if not self.__process.poll() is None:
0539                             break
0540                         time.sleep(1)
0541 
0542                     if not self.__process.poll() is None:
0543                         if self.__process.poll() == 0:
0544                             logger.info("payload finished successfully.")
0545                         else:
0546                             logger.error("payload finished with error code: %s" % self.__process.poll())
0547                     else:
0548                         logger.info('terminating payload process.')
0549                         pgid = os.getpgid(self.__process.pid)
0550                         logger.info('got process group id for pid %s: %s' % (self.__process.pid, pgid))
0551                         # logger.info('send SIGTERM to process group: %s' % pgid)
0552                         # os.killpg(pgid, signal.SIGTERM)
0553                         logger.info('send SIGTERM to process: %s' % self.__process.pid)
0554                         kill_child_processes(self.__process.pid)
0555                 self.__ret_code = self.__process.poll()
0556             else:
0557                 self.__ret_code = -1
0558         except Exception as e:
0559             logger.error('Exception caught when terminating ESProcess: %s' % e)
0560             self.__ret_code = -1
0561             self.stop()
0562             raise UnknownException(e)
0563 
0564     def kill(self):
0565         """
0566         Terminate running threads and processes.
0567 
0568         :param time_to_wait: integer, seconds to wait to force kill the payload process.
0569 
0570         :raises: PilotException: when a PilotException is caught.
0571                  UnknownException: when other unknown exception is caught.
0572         """
0573         logger.info('terminate running threads and processes.')
0574         try:
0575             self.stop()
0576             if self.__process:
0577                 if not self.__process.poll() is None:
0578                     if self.__process.poll() == 0:
0579                         logger.info("payload finished successfully.")
0580                     else:
0581                         logger.error("payload finished with error code: %s" % self.__process.poll())
0582                 else:
0583                     logger.info('killing payload process.')
0584                     pgid = os.getpgid(self.__process.pid)
0585                     logger.info('got process group id for pid %s: %s' % (self.__process.pid, pgid))
0586                     # logger.info('send SIGKILL to process group: %s' % pgid)
0587                     # os.killpg(pgid, signal.SIGKILL)
0588                     logger.info('send SIGKILL to process: %s' % self.__process.pid)
0589                     kill_child_processes(self.__process.pid)
0590         except Exception as e:
0591             logger.error('Exception caught when terminating ESProcess: %s' % e)
0592             self.stop()
0593             raise UnknownException(e)
0594 
0595     def clean(self):
0596         """
0597         Clean left resources
0598         """
0599         self.terminate()
0600 
0601     def run(self):
0602         """
0603         Main run loops: monitor message thread and payload process.
0604                         handle messages from payload and response messages with injecting new event ranges or process outputs.
0605 
0606         :raises: PilotExecption: when a PilotException is caught.
0607                  UnknownException: when other unknown exception is caught.
0608         """
0609 
0610         logger.info('start esprocess with thread ident: %s' % (self.ident))
0611         logger.debug('initializing')
0612         self.init()
0613         logger.debug('initialization finished.')
0614 
0615         logger.info('starts to main loop')
0616         while self.is_payload_running():
0617             try:
0618                 self.monitor()
0619                 self.handle_messages()
0620                 time.sleep(0.01)
0621             except PilotException as e:
0622                 logger.error('PilotException caught in the main loop: %s, %s' % (e.get_detail(), traceback.format_exc()))
0623                 # TODO: define output message exception. If caught 3 output message exception, terminate
0624                 self.stop()
0625             except Exception as e:
0626                 logger.error('Exception caught in the main loop: %s, %s' % (e, traceback.format_exc()))
0627                 # TODO: catch and raise exceptions
0628                 # if catching dead process exception, terminate.
0629                 self.stop()
0630                 break
0631         self.clean()
0632         self.stop_message_thread()
0633         logger.debug('main loop finished')