File indexing completed on 2026-04-11 08:41:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import json
0013 import os
0014 import time
0015 import traceback
0016
0017 from pilot.api.es_data import StageOutESClient
0018 from pilot.common.exception import PilotException, StageOutFailure
0019
0020 from pilot.common.errorcodes import ErrorCodes
0021 from pilot.eventservice.esprocess.esprocess import ESProcess
0022 from pilot.info.filespec import FileSpec
0023 from pilot.info import infosys
0024 from pilot.util.container import execute
0025
0026 from .baseexecutor import BaseExecutor
0027
0028 import logging
logger = logging.getLogger(__name__)

# shared error code definitions used when reporting failed event ranges
errors = ErrorCodes()

# Generic Executor with one process to manage EventService.
# (Kept as a comment: a string placed after the imports is not the module docstring,
# it is just a no-op expression statement.)
0036
0037
class GenericExecutor(BaseExecutor):
    """
    Generic executor that drives a single ESProcess to run an EventService payload.

    Output messages reported by the payload are queued, tarred into a premerge file
    and staged out (at most once per queuedata.es_stageout_gap seconds unless forced);
    the resulting event range statuses are reported through update_events().
    """

    def __init__(self, **kwargs):
        super(GenericExecutor, self).__init__(**kwargs)
        self.setName("GenericExecutor")

        # out messages of non-failed event ranges waiting to be tarred and staged out
        self.__queued_out_messages = []
        # time.time() of the last stage-out attempt (None before the first one)
        self.__last_stageout_time = None
        # every out message seen so far; clean() uses it to remove premerge output files
        self.__all_out_messages = []

        self.proc = None
        self.exit_code = None

    def is_payload_started(self):
        """
        Check whether the payload has been started.

        :return: result of ESProcess.is_payload_started(), or False if no ESProcess exists yet.
        """
        return self.proc.is_payload_started() if self.proc else False

    def get_pid(self):
        """
        :return: pid of the ESProcess, or None if it has not been created yet.
        """
        return self.proc.pid if self.proc else None

    def get_exit_code(self):
        """
        :return: exit code recorded by run() (None until run() has finished).
        """
        return self.exit_code

    def update_finished_event_ranges(self, out_messagess, output_file, fsize, checksum, storage_id):
        """
        Report the event ranges contained in a staged-out file as finished.

        One zipFile entry describing the staged-out file is sent together with the list
        of finished event ranges, and job.nevents is increased by the number of ranges.

        :param out_messagess: out messages (dicts with at least an 'id' key) whose outputs are in output_file.
        :param output_file: name of the staged-out file.
        :param fsize: file size.
        :param checksum: dict of checksum type -> value (e.g. adler32), copied into the zipFile entry.
        :param storage_id: the id of the storage the file was written to.
        """
        if not out_messagess:
            return

        event_ranges = [{"eventRangeID": out_msg['id'], "eventStatus": 'finished'} for out_msg in out_messagess]
        zip_file = {"numEvents": len(event_ranges),
                    "objstoreID": storage_id,
                    "lfn": os.path.basename(output_file),
                    "fsize": fsize,
                    "pathConvention": 1000}
        # add every available checksum type (e.g. adler32) to the file description
        zip_file.update(checksum)
        event_range_status = {"zipFile": zip_file, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_range_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report event ranges as failed.

        A message status of 'failed' or 'fatal' is kept; any other status is reported as 'failed'.
        Every range is reported with errors.UNKNOWNPAYLOADFAILURE as error code.

        :param out_messagess: out messages (dicts with 'id' and 'status' keys).
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed'
            event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status})
        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Handle an ES output or error message (hook registered on the ESProcess in run()).

        Failed/fatal event ranges are reported immediately; all other messages are queued
        for stage-out. Every message is remembered so that clean() can remove its output.

        :param message: a dict of parsed message.
            For 'finished' event ranges, it's {'id': <id>, 'status': 'finished', 'output': <output>, 'cpu': <cpu>,
            'wall': <wall>, 'message': <full message>}.
            For 'failed' event ranges, it's {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """
        logger.info("Handling out message: %s" % message)

        self.__all_out_messages.append(message)

        if message['status'] in ['failed', 'fatal']:
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def tarzip_output_es(self):
        """
        Tar the outputs of all queued out messages into a single premerge file.

        The queue is drained. An output that cannot be added to the tar file is re-queued
        with message['retries'] incremented, and dropped once it has been retried 3 times.
        If an unexpected exception occurs, every drained message is put back into the
        queue (exactly once).

        :return: (out messages whose output was added, tar file name), or (None, None) if the
                 queue was empty or tarring failed with an exception.
        """
        out_messages = []
        while self.__queued_out_messages:
            out_messages.append(self.__queued_out_messages.pop())
        if not out_messages:
            return None, None

        output_file = "EventService_premerge_%s.tar" % out_messages[0]['id']

        ret_messages = []
        # re-queued only after the loop, so that the exception path below cannot queue them twice
        retry_messages = []
        try:
            for out_msg in out_messages:
                command = "tar -rf " + output_file + " --directory=%s %s" % (os.path.dirname(out_msg['output']), os.path.basename(out_msg['output']))
                exit_code, stdout, stderr = execute(command)
                if exit_code == 0:
                    ret_messages.append(out_msg)
                    continue

                logger.error("Failed to add event output to tar/zip file: out_message: "
                             "%s, exit_code: %s, stdout: %s, stderr: %s" % (out_msg, exit_code, stdout, stderr))
                retries = out_msg.get('retries', 0)
                if retries >= 3:
                    logger.error("Discard out messages because it has been retried more than 3 times: %s" % out_msg)
                else:
                    out_msg['retries'] = retries + 1
                    retry_messages.append(out_msg)
        except Exception as e:
            logger.error("Failed to tar/zip event ranges: %s" % str(e))
            self.__queued_out_messages += out_messages
            return None, None

        self.__queued_out_messages += retry_messages
        return ret_messages, output_file

    def stageout_es_real(self, output_file):
        """
        Stage out an event service output file, trying the failover storage if the primary transfer fails.

        :param output_file: output file name.
        :return: (ddmendpoint, storage_id, file size, checksum) of the transferred file.
        :raises PilotException: (or its subclass StageOutFailure) from the primary transfer when neither
                                the primary nor the failover transfer succeeded.
        :raises StageOutFailure: if the primary transfer reported no error but the file is not transferred.
        """
        job = self.get_job()
        logger.info('prepare to stage-out eventservice files')

        error = None
        file_data = {'scope': 'transient',
                     'lfn': os.path.basename(output_file),
                     }
        file_spec = FileSpec(filetype='output', **file_data)
        xdata = [file_spec]
        kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job)

        try_failover = False
        activity = ['es_events', 'pw']

        try:
            client = StageOutESClient(job.infosys, logger=logger)
            try_failover = True

            client.prepare_destinations(xdata, activity)
            client.transfer(xdata, activity=activity, **kwargs)
        except PilotException as exc:
            # Python 3 unbinds the "as" target at the end of the except clause, so the
            # exception must be stored in another variable to be usable below.
            error = exc
            logger.error(error.get_detail())
        except Exception as exc:
            logger.error(traceback.format_exc())
            error = StageOutFailure("stageOut failed with error=%s" % exc)

        logger.info('Summary of transferred files:')
        logger.info(" -- lfn=%s, status_code=%s, status=%s" % (file_spec.lfn, file_spec.status_code, file_spec.status))

        if error:
            logger.error('Failed to stage-out eventservice file(%s): error=%s' % (output_file, error.get_detail()))
        elif file_spec.status != 'transferred':
            msg = 'Failed to stage-out ES file(%s): logic corrupted: unknown internal error, fspec=%s' % (output_file, file_spec)
            logger.error(msg)
            raise StageOutFailure(msg)

        failover_storage_activity = ['es_failover', 'pw']

        # try_failover is only True once the client has been created
        if try_failover and error and error.get_error_code() not in [ErrorCodes.MISSINGOUTPUTFILE]:

            xdata2 = [FileSpec(filetype='output', **file_data)]

            try:
                client.prepare_destinations(xdata2, failover_storage_activity)
                if xdata2[0].ddmendpoint != xdata[0].ddmendpoint:
                    msg = 'Will try to failover ES transfer to astorage with activity=%s, rse=%s' % (failover_storage_activity, xdata2[0].ddmendpoint)
                    logger.info(msg)
                    client.transfer(xdata2, activity=activity, **kwargs)

                    logger.info('Summary of transferred files (failover transfer):')
                    logger.info(" -- lfn=%s, status_code=%s, status=%s" % (xdata2[0].lfn, xdata2[0].status_code, xdata2[0].status))

            except PilotException as e:
                if e.get_error_code() == ErrorCodes.NOSTORAGE:
                    logger.info('Failover ES storage is not defined for activity=%s .. skipped' % failover_storage_activity)
                else:
                    logger.error('Transfer to failover storage=%s failed .. skipped, error=%s' % (xdata2[0].ddmendpoint, e.get_detail()))
            except Exception:
                logger.error('Failover ES stageout failed .. skipped')
                logger.error(traceback.format_exc())

            if xdata2[0].status == 'transferred':
                error = None
                file_spec = xdata2[0]

        if error:
            raise error

        storage_id = infosys.get_storage_id(file_spec.ddmendpoint)

        return file_spec.ddmendpoint, storage_id, file_spec.filesize, file_spec.checksum

    def stageout_es(self, force=False):
        """
        Tar and stage out the queued event service outputs.

        Nothing is done when the queue is empty or, unless force is set, when fewer than
        queuedata.es_stageout_gap seconds have passed since the last stage-out attempt.
        On success the event ranges are reported as finished; on failure they are reported
        as failed when force is set, otherwise they are re-queued for a later attempt.

        :param force: ignore the stage-out gap, and fail the event ranges if the stage-out fails.
        """
        job = self.get_job()
        if not self.__queued_out_messages:
            return
        if not force and self.__last_stageout_time is not None and \
                time.time() <= self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap:
            return

        out_messages, output_file = self.tarzip_output_es()
        logger.info("tar/zip event ranges: %s, output_file: %s" % (out_messages, output_file))
        if not out_messages:
            return

        self.__last_stageout_time = time.time()
        try:
            logger.info("Staging output file: %s" % output_file)
            storage, storage_id, fsize, checksum = self.stageout_es_real(output_file)
            logger.info("Staged output file (%s) to storage: %s storage_id: %s" % (output_file, storage, storage_id))

            self.update_finished_event_ranges(out_messages, output_file, fsize, checksum, storage_id)
        except Exception as e:
            logger.error("Failed to stage out file(%s): %s, %s" % (output_file, str(e), traceback.format_exc()))

            if force:
                self.update_failed_event_ranges(out_messages)
            else:
                logger.info("Failed to stageout, adding messages back to the queued messages")
                self.__queued_out_messages += out_messages

    def clean(self):
        """
        Clean temp produced files.

        Removes the output file of every non-failed out message, resets the internal queues,
        stops the ESProcess (waiting until it is no longer alive) and stops the communicator.
        """
        for msg in self.__all_out_messages:
            if msg['status'] in ['failed', 'fatal'] or 'output' not in msg:
                continue
            try:
                logger.info("Removing es premerge file: %s" % msg['output'])
                os.remove(msg['output'])
            except Exception as e:
                logger.error("Failed to remove file(%s): %s" % (msg['output'], str(e)))
        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        Obtains the payload, starts an ESProcess wired to get_event_ranges/handle_out_message,
        and calls stageout_es() every 5 seconds while the process is alive (stopping it if a
        stop is requested). Afterwards it does a forced stage-out, cleans up and records the
        process exit code. On any exception the executor is cleaned and exit_code is set to -1.
        """
        try:
            logger.info("starting ES GenericExecutor with thread ident: %s" % (self.ident))
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # fail explicitly instead of hitting a NameError on the unbound payload below
                raise RuntimeError("no payload available for GenericExecutor")

            logger.info("payload: %s" % payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s' % proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s' % (iteration, proc.pid, exit_code))
                time.sleep(5)

            # wait until the process has actually terminated (e.g. after a stop request)
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s' % (str(e), traceback.format_exc()))
            self.clean()
            self.exit_code = -1
        logger.info('ES generic executor finished')