Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Miha Muskinja, miha.muskinja@cern.ch, 2020
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0010 
0011 import json
0012 import os
0013 import time
0014 import traceback
0015 
0016 from pilot.common.errorcodes import ErrorCodes
0017 from pilot.eventservice.esprocess.esprocess import ESProcess
0018 from pilot.info.filespec import FileSpec
0019 from pilot.util.filehandling import calculate_checksum, move
0020 
0021 from .baseexecutor import BaseExecutor
0022 
0023 import logging
0024 logger = logging.getLogger(__name__)
0025 
0026 errors = ErrorCodes()
0027 
0028 """
0029 Raythena Executor with one process to manage EventService
0030 """
0031 
0032 
class RaythenaExecutor(BaseExecutor):
    """
    Executor that runs one ESProcess and reports its event range results.

    Messages emitted by the ESProcess arrive through handle_out_message():
    failed/fatal ranges are reported immediately, all other ranges are queued
    and reported in batches by stageout_es(), which run() calls periodically.
    """

    def __init__(self, **kwargs):
        """
        Initialize the executor state.

        :param kwargs: keyword arguments forwarded unchanged to BaseExecutor.
        """
        super(RaythenaExecutor, self).__init__(**kwargs)
        self.setName("RaythenaExecutor")

        # Non-failed messages waiting to be reported by stageout_es().
        self.__queued_out_messages = []
        # NOTE(review): this is only ever assigned None in this class, so the
        # es_stageout_gap throttle in stageout_es() never takes effect.
        self.__last_stageout_time = None
        # Every message received by handle_out_message(), in arrival order.
        self.__all_out_messages = []

        # ESProcess instance created by run(); None until then.
        self.proc = None
        # Exit code of the ESProcess; -1 if run() failed with an exception.
        self.exit_code = None

    def is_payload_started(self):
        """
        Tell whether the payload has started.

        :return: the ESProcess' own answer, or False if no process exists yet.
        """
        if not self.proc:
            return False
        return self.proc.is_payload_started()

    def get_pid(self):
        """
        Return the pid of the ESProcess.

        :return: pid, or None if no process exists yet.
        """
        if not self.proc:
            return None
        return self.proc.pid

    def get_exit_code(self):
        """
        Return the exit code recorded by run().

        :return: exit code (None until run() has finished).
        """
        return self.exit_code

    def create_file_spec(self, pfn):
        """
        Build an output FileSpec describing the file at the given path.

        The spec uses scope 'transient', the base name of the path as lfn, and
        the checksum and size computed from the file on disk.

        :param pfn: physical file name (string).
        :return: FileSpec object with filetype 'output'.
        """
        file_data = {
            'scope': 'transient',
            'lfn': os.path.basename(pfn),
            'checksum': calculate_checksum(pfn),
            'filesize': os.path.getsize(pfn),
        }
        return FileSpec(filetype='output', **file_data)

    def move_output(self, pfn):
        """
        Move output file from given PFN path to PILOT_OUTPUT_DIR if set.

        A failed move is logged as a warning and otherwise ignored.

        :param pfn: physical file name (string).
        :return:
        """
        outputdir = os.environ.get('PILOT_OUTPUT_DIR', None)
        if not outputdir:
            return

        try:
            move(pfn, outputdir)
        except Exception as e:
            # Deliberately best effort: a failed move must not stop reporting.
            logger.warning('failed to move output: %s', e)

    def update_finished_event_ranges(self, out_messagess):
        """
        Report finished event ranges and add them to the job's event count.

        For each message a file spec is created for its output file, the
        output is moved to PILOT_OUTPUT_DIR (if set), and a single status
        update covering all ranges is sent with update_events().

        :param out_messagess: list of message dicts; each must provide the keys
                              'id' (event range id) and 'output' (output file path).
        """
        logger.info("update_finished_event_ranges:")

        if not out_messagess:
            return

        event_ranges = []
        for out_msg in out_messagess:
            output = out_msg['output']
            fspec = self.create_file_spec(output)
            event_range_status = {
                "eventRangeID": out_msg['id'],
                "eventStatus": 'finished',
                "pfn": output,
                "fsize": fspec.filesize,
            }
            # Add one entry per checksum type carried by the file spec.
            for checksum_key in fspec.checksum:
                event_range_status[checksum_key] = fspec.checksum[checksum_key]
            event_ranges.append(event_range_status)

            # Move the output to a common area if necessary. This is done after
            # the file spec is built, since that reads the file at its original path.
            self.move_output(output)

        event_ranges_status = {"esOutput": {"numEvents": len(event_ranges)}, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_ranges_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report failed event ranges.

        A status other than 'failed' or 'fatal' is reported as 'failed'.

        :param out_messagess: list of message dicts; each must provide the keys
                              'id' (event range id) and 'status'.
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ('failed', 'fatal') else 'failed'
            # ToBeFixed errorCode
            event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE,
                                 "eventRangeID": message['id'],
                                 "eventStatus": status})

        # Send one update after the loop. Sending inside the loop re-reported
        # all earlier ranges each time a further message was processed.
        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Handle ES output or error messages hook function for tests.

        Failed and fatal messages are reported right away; all other messages
        are queued until the next stageout_es() call.

        :param message: a dict of parsed message.
                        For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
                                                           'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """
        logger.info("Handling out message: %s", message)

        self.__all_out_messages.append(message)

        if message['status'] in ('failed', 'fatal'):
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def stageout_es(self, force=False):
        """
        Stage out event service outputs.

        Drains the queue of non-failed messages and reports them with
        update_finished_event_ranges().

        :param force: if True, report queued messages regardless of the time
                      elapsed since the last stage-out.
        """
        job = self.get_job()
        # logger.info("job.infosys.queuedata.es_stageout_gap: %s" % job.infosys.queuedata.es_stageout_gap)
        if not self.__queued_out_messages:
            return

        # NOTE(review): __last_stageout_time is never set to a timestamp in this
        # class, so this check always passes and the gap is never applied.
        # Confirm whether throttling is wanted before recording the time here.
        due = (force
               or self.__last_stageout_time is None
               or time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap)
        if not due:
            return

        # Pop one message at a time rather than copy-and-clear, so a message
        # appended while draining is not dropped (presumably the out-message
        # hook runs outside this thread -- confirm). Popping from the end
        # hands the messages over newest first.
        out_messages = []
        while self.__queued_out_messages:
            out_messages.append(self.__queued_out_messages.pop())
        self.update_finished_event_ranges(out_messages)

    def clean(self):
        """
        Reset the message bookkeeping, stop the ESProcess and the communicator.

        Blocks until the ESProcess is no longer alive.
        """
        logger.info("shutting down...")

        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        Starts the ESProcess for the payload, reports queued outputs while it
        is alive, then does a final forced stage-out and cleans up. The exit
        code of the process is stored in self.exit_code; any exception results
        in cleanup and an exit code of -1.
        """
        try:
            logger.info("starting ES RaythenaExecutor with thread ident: %s", self.ident)
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # Fail explicitly rather than through an incidental NameError on
                # the unbound 'payload'; the handler below still cleans up and
                # sets exit_code to -1.
                raise RuntimeError("no payload is set and no payload can be retrieved")

            logger.info("payload: %s", payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload, waiting_time=999999)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            exit_code = None
            # A plain int is enough on both Python 2 and 3 (Python 2 promotes
            # int to long automatically when needed).
            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s', proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                # With the 5 s sleep below this logs a heartbeat about every 5 minutes.
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
                time.sleep(5)

            # After a stop request the process may still be shutting down.
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            # Report whatever is still queued before clean() empties the queue.
            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s', e, traceback.format_exc())
            self.clean()
            self.exit_code = -1
        logger.info('ES raythena executor finished')