File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import json
0012 import os
0013 import time
0014 import traceback
0015
0016 from pilot.common.errorcodes import ErrorCodes
0017 from pilot.eventservice.esprocess.esprocess import ESProcess
0018 from pilot.info.filespec import FileSpec
0019 from pilot.util.filehandling import calculate_checksum, move
0020
0021 from .baseexecutor import BaseExecutor
0022
0023 import logging
0024 logger = logging.getLogger(__name__)
0025
0026 errors = ErrorCodes()
0027
0028 """
0029 Raythena Executor with one process to manage EventService
0030 """
0031
0032
class RaythenaExecutor(BaseExecutor):
    """
    Raythena executor that manages the event service through one ESProcess.

    run() creates and starts the ESProcess, periodically reports the event
    ranges it finished via stageout_es(), and records the process exit code
    in self.exit_code once it is done.
    """

    def __init__(self, **kwargs):
        """
        Initialise the executor state.

        :param kwargs: keyword arguments passed on unchanged to BaseExecutor.
        """
        super(RaythenaExecutor, self).__init__(**kwargs)
        self.setName("RaythenaExecutor")

        # messages of finished event ranges that still have to be reported
        self.__queued_out_messages = []
        # time.time() of the last report made by stageout_es(); None before the first one
        self.__last_stageout_time = None
        # every message handed to handle_out_message(), finished or failed
        self.__all_out_messages = []

        # ESProcess instance, created in run()
        self.proc = None
        # exit code of the ESProcess (-1 if run() failed), set at the end of run()
        self.exit_code = None

    def is_payload_started(self):
        """
        Tell whether the payload has started.

        :return: the answer of the ESProcess, or False when no process exists yet.
        """
        return self.proc.is_payload_started() if self.proc else False

    def get_pid(self):
        """
        Return the pid reported by the ESProcess.

        :return: proc.pid, or None when no process exists yet.
        """
        return self.proc.pid if self.proc else None

    def get_exit_code(self):
        """
        Return the exit code recorded by run().

        :return: exit code, None as long as run() has not finished.
        """
        return self.exit_code

    def create_file_spec(self, pfn):
        """
        Build an output FileSpec describing the file at the given path.

        :param pfn: physical file name (string).
        :return: FileSpec with scope 'transient', the file's base name as lfn,
                 its checksum and its size.
        """
        file_data = {
            'scope': 'transient',
            'lfn': os.path.basename(pfn),
            'checksum': calculate_checksum(pfn),
            'filesize': os.path.getsize(pfn),
        }
        return FileSpec(filetype='output', **file_data)

    def move_output(self, pfn):
        """
        Move output file from given PFN path to PILOT_OUTPUT_DIR if set.

        A failing move is only logged as a warning; it is deliberately not
        propagated to the caller.

        :param pfn: physical file name (string).
        :return:
        """
        outputdir = os.environ.get('PILOT_OUTPUT_DIR', None)
        if not outputdir:
            return

        try:
            move(pfn, outputdir)
        except Exception as e:
            logger.warning('failed to move output: %s', e)

    def update_finished_event_ranges(self, out_messagess):
        """
        Report finished event ranges and add them to the job's event count.

        For every message a file spec is created for the output file (checksum
        and size), the output is moved with move_output(), and finally a single
        update with all event ranges is sent through update_events().

        :param out_messagess: messages from AthenaMP; each one needs the keys
                              'id' and 'output'.
        """
        logger.info("update_finished_event_ranges:")

        if not out_messagess:
            return

        event_ranges = []
        for out_msg in out_messagess:
            output = out_msg['output']
            fspec = self.create_file_spec(output)
            event_range_status = {
                "eventRangeID": out_msg['id'],
                "eventStatus": 'finished',
                "pfn": output,
                "fsize": fspec.filesize,
            }
            # copy every checksum entry of the file spec into the status record
            for checksum_key in fspec.checksum:
                event_range_status[checksum_key] = fspec.checksum[checksum_key]
            event_ranges.append(event_range_status)

            # the file spec has to be built first: the move may relocate the file
            self.move_output(output)

        event_ranges_status = {"esOutput": {"numEvents": len(event_ranges)}, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_ranges_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report failed event ranges.

        All given messages are reported in a single update. A status other than
        'failed' or 'fatal' is reported as 'failed'.

        :param out_messagess: messages from AthenaMP; each one needs the keys
                              'id' and 'status'.
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed'
            event_ranges.append({
                "errorCode": errors.UNKNOWNPAYLOADFAILURE,
                "eventRangeID": message['id'],
                "eventStatus": status,
            })

        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Handle ES output or error messages hook function for tests.

        Failed and fatal messages are reported immediately; all other messages
        are queued until the next stageout_es() call.

        :param message: a dict of parsed message.
            For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
                                               'wall': <wall>, 'message': <full message>}.
            For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """
        logger.info("Handling out message: %s", message)

        self.__all_out_messages.append(message)

        if message['status'] in ['failed', 'fatal']:
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def stageout_es(self, force=False):
        """
        Stage out event service outputs.

        Reports all queued finished event ranges. Unless forced, nothing is
        reported while less than queuedata.es_stageout_gap seconds have passed
        since the previous report.

        :param force: report the queued messages regardless of the time gap (boolean).
        """
        if not self.__queued_out_messages:
            return

        if not force and self.__last_stageout_time is not None:
            # a missing gap value is treated as "no throttling"
            gap = self.get_job().infosys.queuedata.es_stageout_gap or 0
            if time.time() <= self.__last_stageout_time + gap:
                return

        # Drain with pop() one item at a time rather than swapping the list:
        # handle_out_message() is a hook of the ESProcess and presumably runs
        # in another thread (TODO confirm), so messages may arrive meanwhile.
        out_messages = []
        while self.__queued_out_messages:
            out_messages.append(self.__queued_out_messages.pop())
        self.update_finished_event_ranges(out_messages)

        # remember when we reported, otherwise the gap check above never applies
        self.__last_stageout_time = time.time()

    def clean(self):
        """
        Reset the message bookkeeping, stop the ESProcess and the communicator.

        Blocks until the ESProcess is no longer alive.
        """
        logger.info("shutting down...")

        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        The payload is taken from get_payload() or retrieve_payload(). While
        the process is alive, queued outputs are reported every five seconds;
        a stop request stops the process. Afterwards the remaining outputs are
        reported, the executor is cleaned up and the exit code of the process
        is stored in self.exit_code. Any exception is logged and results in
        self.exit_code = -1.
        """
        try:
            logger.info("starting ES RaythenaExecutor with thread ident: %s", self.ident)
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # fail explicitly instead of running into an unbound 'payload' below
                raise RuntimeError("no payload available to execute")

            logger.info("payload: %s", payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload, waiting_time=999999)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            exit_code = None
            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s', proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                # with the 5 s sleep below this logs a heartbeat about every five minutes
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
                time.sleep(5)

            # after a stop request the process may need a moment to terminate
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            # report whatever is still queued before clean() empties the queue
            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s', e, traceback.format_exc())
            self.clean()
            self.exit_code = -1
        logger.info('ES raythena executor finished')