File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import json
0011 import os
0012 import time
0013 import traceback
0014
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.eventservice.esprocess.esprocess import ESProcess
0017 from pilot.info.filespec import FileSpec
0018 from pilot.util.filehandling import calculate_checksum
0019
0020 from .baseexecutor import BaseExecutor
0021
0022 import logging
0023 logger = logging.getLogger(__name__)
0024
0025 errors = ErrorCodes()
0026
0027 """
0028 HPO Executor
0029 """
0030
0031
class HPOExecutor(BaseExecutor):
    """
    Work executor that runs a payload through ESProcess and reports event range results.

    Messages delivered by ESProcess arrive through handle_out_message():
    'failed'/'fatal' messages are reported immediately, all other messages are
    queued and reported in batches by stageout_es(), which run() calls
    periodically and once more (forced) after the process has ended.
    """

    def __init__(self, **kwargs):
        """
        Initialize the executor state.

        :param kwargs: keyword arguments forwarded unchanged to BaseExecutor.
        """
        super(HPOExecutor, self).__init__(**kwargs)
        self.setName("HPOExecutor")

        # Non-failure messages waiting to be reported by stageout_es().
        self.__queued_out_messages = []
        # NOTE(review): this attribute is only ever set to None in this class,
        # so the time-gap check in stageout_es() never throttles anything.
        self.__last_stageout_time = None
        # Every message passed to handle_out_message(), in arrival order.
        self.__all_out_messages = []

        self.proc = None  # ESProcess instance, assigned by run()
        self.exit_code = None  # assigned at the end of run(); -1 on failure

    def is_payload_started(self):
        """
        Tell whether the payload has started.

        :return: the value reported by the ESProcess, or False if no process exists yet.
        """
        return self.proc.is_payload_started() if self.proc else False

    def get_pid(self):
        """
        Return the pid of the ESProcess.

        :return: ``self.proc.pid``, or None if no process exists yet.
        """
        return self.proc.pid if self.proc else None

    def get_exit_code(self):
        """
        Return the exit code recorded by run().

        :return: exit code; None until run() has finished, -1 if run() failed.
        """
        return self.exit_code

    def create_file_spec(self, pfn):
        """
        Build an output FileSpec describing a local file.

        :param pfn: path of the file; it is read to compute its checksum and size.
        :return: FileSpec with scope 'transient', lfn set to the base name of pfn,
                 and the computed checksum and filesize.
        """
        file_data = {
            'scope': 'transient',
            'lfn': os.path.basename(pfn),
            'checksum': calculate_checksum(pfn),
            'filesize': os.path.getsize(pfn),
        }
        return FileSpec(filetype='output', **file_data)

    def update_finished_event_ranges(self, out_messagess):
        """
        Report a batch of finished event ranges and count them on the job.

        :param out_messagess: list of message dicts; each must provide 'id' and
                              'output' (path of the produced file).
        """

        logger.info("update_finished_event_ranges:")

        if not out_messagess:
            return

        event_ranges = []
        for out_msg in out_messagess:
            fspec = self.create_file_spec(out_msg['output'])
            event_range_status = {
                "eventRangeID": out_msg['id'],
                "eventStatus": 'finished',
                "pfn": out_msg['output'],
                "fsize": fspec.filesize,
            }
            # Copy each checksum entry of the file spec into the status record.
            for checksum_key in fspec.checksum:
                event_range_status[checksum_key] = fspec.checksum[checksum_key]
            event_ranges.append(event_range_status)

        event_ranges_status = {"esOutput": {"numEvents": len(event_ranges)}, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_ranges_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report a batch of failed event ranges.

        :param out_messagess: list of message dicts; each must provide 'id' and
                              'status'. Any status other than 'failed' or 'fatal'
                              is reported as 'failed'.
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed'
            event_ranges.append({
                "errorCode": errors.UNKNOWNPAYLOADFAILURE,
                "eventRangeID": message['id'],
                "eventStatus": status,
            })

        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Handle an ES output or error message (hook registered on ESProcess by run()).

        :param message: a dict of parsed message.
                        For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>,
                        'cpu': <cpu>, 'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """

        logger.info("Handling out message: %s", message)

        self.__all_out_messages.append(message)

        if message['status'] in ['failed', 'fatal']:
            # Failures are reported right away instead of being batched.
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def stageout_es(self, force=False):
        """
        Report all queued (non-failure) messages as finished event ranges.

        :param force: if True, report regardless of the time-gap condition.
        """

        job = self.get_job()

        if not self.__queued_out_messages:
            return

        # NOTE(review): __last_stageout_time is never given a timestamp in this
        # class, so the second condition is always true and the
        # es_stageout_gap comparison is never reached. Confirm whether
        # throttling is wanted before recording the time here.
        if force or self.__last_stageout_time is None or \
                (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap):
            out_messages = []
            # Drain one element at a time (from the end, so the batch is in
            # reverse arrival order) rather than swapping the list, leaving any
            # message appended meanwhile by handle_out_message() in the queue.
            while self.__queued_out_messages:
                out_messages.append(self.__queued_out_messages.pop())
            self.update_finished_event_ranges(out_messages)

    def clean(self):
        """
        Reset the message bookkeeping, stop the ESProcess and stop the communicator.

        Blocks until the ESProcess reports it is no longer alive.
        """

        logger.info("shutting down...")

        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        Obtains the payload, starts an ESProcess with the event-range and
        out-message hooks of this executor, then polls it every 5 seconds,
        reporting queued results on each pass, until the process ends or a stop
        is requested. Afterwards remaining results are reported, clean() is
        called and the process exit code is stored in ``self.exit_code``.
        On any exception clean() is called and ``self.exit_code`` is set to -1.
        """
        try:
            logger.info("starting ES HPOExecutor with thread ident: %s", self.ident)
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # Fail explicitly; previously execution continued and hit an
                # unbound ``payload`` (NameError) on the next statement.
                raise RuntimeError("no payload available to execute")

            logger.info("payload: %s", payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload, waiting_time=999999)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            exit_code = None
            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s', proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                # With a 5 second sleep per pass this logs roughly every 5 minutes.
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
                time.sleep(5)

            # After a stop request the process may still be shutting down.
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            # Report whatever is still queued before clean() empties the queue.
            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s', e, traceback.format_exc())
            self.clean()
            self.exit_code = -1
        logger.info('ES HPO executor finished')