Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0009 
0010 import json
0011 import os
0012 import time
0013 import traceback
0014 
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.eventservice.esprocess.esprocess import ESProcess
0017 from pilot.info.filespec import FileSpec
0018 from pilot.util.filehandling import calculate_checksum
0019 
0020 from .baseexecutor import BaseExecutor
0021 
0022 import logging
0023 logger = logging.getLogger(__name__)
0024 
0025 errors = ErrorCodes()
0026 
0027 """
0028 HPO Executor
0029 """
0030 
0031 
class HPOExecutor(BaseExecutor):
    """
    HPO executor.

    Runs the payload through an ESProcess, collects the output messages the
    process reports through handle_out_message() and turns them into event
    range updates.

    Helpers such as get_job(), update_events(), get_event_ranges(), is_stop()
    and stop_communicator() are not defined in this class; they are presumably
    provided by BaseExecutor.
    """

    def __init__(self, **kwargs):
        """
        Set up the executor state.

        :param kwargs: keyword arguments, forwarded unchanged to BaseExecutor.
        """
        super(HPOExecutor, self).__init__(**kwargs)
        self.setName("HPOExecutor")

        # Non-failed messages waiting to be reported by stageout_es().
        self.__queued_out_messages = []
        # Read by stageout_es() and reset by clean(); see the note in stageout_es().
        self.__last_stageout_time = None
        # All messages received by handle_out_message() since the last clean().
        self.__all_out_messages = []

        self.proc = None  # ESProcess instance, created in run()
        self.exit_code = None  # set at the end of run()

    def is_payload_started(self):
        """
        Tell whether the payload has started.

        :return: the answer of the ESProcess, or False when no process exists yet.
        """
        return self.proc.is_payload_started() if self.proc else False

    def get_pid(self):
        """
        Return the pid of the ESProcess.

        :return: proc.pid, or None when no process exists yet.
        """
        return self.proc.pid if self.proc else None

    def get_exit_code(self):
        """
        Return the exit code recorded by run().

        :return: exit code (None until run() has ended).
        """
        return self.exit_code

    def create_file_spec(self, pfn):
        """
        Build an output FileSpec for a local file.

        :param pfn: path of the file.
        :return: FileSpec with scope 'transient', the base name of pfn as lfn,
                 and the checksum and size of the file.
        """
        checksum = calculate_checksum(pfn)
        filesize = os.path.getsize(pfn)
        file_data = {'scope': 'transient',
                     'lfn': os.path.basename(pfn),
                     'checksum': checksum,
                     'filesize': filesize,
                     }
        return FileSpec(filetype='output', **file_data)

    def update_finished_event_ranges(self, out_messagess):
        """
        Report finished event ranges and add them to the job's event count.

        :param out_messagess: list of message dicts; each must carry 'id' and
                              'output' (path of the produced file).
        """

        logger.info("update_finished_event_ranges:")

        if not out_messagess:
            return

        event_ranges = []
        for out_msg in out_messagess:
            fspec = self.create_file_spec(out_msg['output'])
            event_range_status = {"eventRangeID": out_msg['id'], "eventStatus": 'finished', "pfn": out_msg['output'], "fsize": fspec.filesize}
            # Copy every checksum entry (keyed by checksum name) into the status dict.
            for checksum_key in fspec.checksum:
                event_range_status[checksum_key] = fspec.checksum[checksum_key]
            event_ranges.append(event_range_status)

        event_ranges_status = {"esOutput": {"numEvents": len(event_ranges)}, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_ranges_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report failed event ranges.

        All ranges are sent in a single update once the whole list has been
        built, so no range is reported more than once.

        :param out_messagess: list of message dicts; each must carry 'id' and
                              'status'. Any status other than 'failed' or
                              'fatal' is reported as 'failed'.
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed'
            # ToBeFixed errorCode
            event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status})

        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Handle ES output or error messages hook function for tests.

        Failed and fatal messages are reported immediately; all other messages
        are queued until the next stageout_es() call.

        :param message: a dict of parsed message.
                        For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
                                                           'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """

        logger.info("Handling out message: %s", message)

        self.__all_out_messages.append(message)

        if message['status'] in ['failed', 'fatal']:
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def stageout_es(self, force=False):
        """
        Stage out event service outputs.

        Drains the queue of pending messages and reports them as finished
        event ranges.

        NOTE(review): __last_stageout_time is never set to a timestamp in this
        class, so the es_stageout_gap comparison below is never reached and
        queued messages are reported on every call. Confirm whether the
        throttle is meant to be active before changing this.

        :param force: report queued messages regardless of the stage-out gap.
        """

        job = self.get_job()
        # logger.info("job.infosys.queuedata.es_stageout_gap: %s" % job.infosys.queuedata.es_stageout_gap)
        if not self.__queued_out_messages:
            return

        if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap):
            out_messages = []
            # Drain with pop() (newest first) instead of copy-and-clear; the
            # queue is filled from the ESProcess hook, presumably on another
            # thread, and pop() never drops a message appended meanwhile.
            while len(self.__queued_out_messages) > 0:
                out_messages.append(self.__queued_out_messages.pop())
            self.update_finished_event_ranges(out_messages)

    def clean(self):
        """
        Clean temp produced files

        Resets the message bookkeeping, stops the ESProcess (blocking until it
        is no longer alive) and stops the communicator.
        """

        logger.info("shutting down...")

        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        Obtains the payload, starts an ESProcess for it and polls the process
        every 5 seconds, reporting queued outputs on each pass, until the
        process ends or a stop is requested. On success exit_code is the
        process' poll() result; on any exception the executor is cleaned up
        and exit_code is set to -1.
        """
        try:
            logger.info("starting ES HPOExecutor with thread ident: %s", self.ident)
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # Fail explicitly instead of tripping over an unbound 'payload'
                # below; the handler at the bottom records exit code -1.
                raise RuntimeError("no payload available: payload is not set and is_retrieve_payload is not set")

            logger.info("payload: %s", payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload, waiting_time=999999)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            exit_code = None
            # A plain int is enough on both Python 2 (auto-promotes to long)
            # and Python 3 (unbounded).
            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s', proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                # One status line every 60 passes (about 5 minutes at 5 s per pass).
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
                time.sleep(5)

            # After a stop request the process may still be winding down.
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            # Report whatever is still queued before the state is cleared.
            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s', e, traceback.format_exc())
            self.clean()
            self.exit_code = -1
        logger.info('ES HPO executor finished')