Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:04

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Alexey Anisenkov, anisyonk@cern.ch, 2019
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0011 
0012 import json
0013 import os
0014 import time
0015 import traceback
0016 
0017 from pilot.api.es_data import StageOutESClient
0018 from pilot.common.exception import PilotException, StageOutFailure
0019 
0020 from pilot.common.errorcodes import ErrorCodes
0021 from pilot.eventservice.esprocess.esprocess import ESProcess
0022 from pilot.info.filespec import FileSpec
0023 from pilot.info import infosys
0024 from pilot.util.container import execute
0025 
0026 from .baseexecutor import BaseExecutor
0027 
0028 import logging
logger = logging.getLogger(__name__)

# Shared ErrorCodes instance; update_failed_event_ranges reads error code constants from it.
errors = ErrorCodes()

# Generic Executor with one process to manage EventService.
# (Kept as a comment: a string literal placed after the imports is not a module
# docstring and never reaches __doc__.)
0036 
0037 
class GenericExecutor(BaseExecutor):
    """
    Event Service executor that manages a single ESProcess.

    The executor does the following:
    - hands event ranges to the ESProcess through the get_event_ranges hook;
    - collects the per-event-range messages it reports through the
      handle_out_message hook;
    - periodically tars the outputs of finished event ranges into one file,
      stages that file out, and reports the event range statuses via
      ``self.update_events`` (presumably provided by BaseExecutor; it is not
      defined in this class).
    """

    def __init__(self, **kwargs):
        super(GenericExecutor, self).__init__(**kwargs)
        self.setName("GenericExecutor")

        # Messages of finished event ranges still waiting to be tarred and staged out.
        self.__queued_out_messages = []
        # time.time() of the last stage-out attempt; None until the first one.
        self.__last_stageout_time = None
        # Every message received, kept so clean() can remove the premerge output files.
        self.__all_out_messages = []

        self.proc = None
        self.exit_code = None

    def is_payload_started(self):
        """Return whether the ESProcess reports its payload as started (False before run() creates it)."""
        return self.proc.is_payload_started() if self.proc else False

    def get_pid(self):
        """Return the pid of the ESProcess, or None if it has not been created yet."""
        return self.proc.pid if self.proc else None

    def get_exit_code(self):
        """Return the exit code recorded by run(); None while it has not finished."""
        return self.exit_code

    def update_finished_event_ranges(self, out_messagess, output_file, fsize, checksum, storage_id):
        """
        Report event ranges as finished after their outputs were staged out in one file.

        All ranges are sent in a single update, with a 'zipFile' record that
        describes the staged-out file. job.nevents is increased by the number of ranges.

        :param out_messagess: messages from the payload (dicts with at least an 'id' key).
        :param output_file: path of the staged-out file; only its basename is reported.
        :param fsize: file size of the staged-out file.
        :param checksum: dict of checksum name -> value (e.g. adler32) of the file;
                         each entry becomes a key of the 'zipFile' record.
        :param storage_id: id of the storage the file was written to.
        """
        if not out_messagess:
            return

        event_ranges = [{"eventRangeID": out_msg['id'], "eventStatus": 'finished'} for out_msg in out_messagess]
        zip_file = {"numEvents": len(event_ranges),
                    "objstoreID": storage_id,
                    "lfn": os.path.basename(output_file),
                    "fsize": fsize,
                    "pathConvention": 1000}
        # Each checksum entry is stored as its own key of the zipFile record.
        zip_file.update(checksum)
        event_range_status = {"zipFile": zip_file, "eventRanges": event_ranges}
        event_range_message = {'version': 1, 'eventRanges': json.dumps([event_range_status])}
        self.update_events(event_range_message)

        job = self.get_job()
        job.nevents += len(event_ranges)

    def update_failed_event_ranges(self, out_messagess):
        """
        Report event ranges as failed, all in a single update.

        A message status of 'failed' or 'fatal' is kept as is; any other status
        is reported as 'failed'.

        :param out_messagess: messages from the payload (dicts with 'id' and 'status' keys).
        """
        if not out_messagess:
            return

        event_ranges = []
        for message in out_messagess:
            status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed'
            # ToBeFixed errorCode: the actual payload error is not propagated yet.
            event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status})
        # Send once after the loop. Sending inside the loop re-reported every
        # earlier range again for each later message.
        event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)}
        self.update_events(event_range_message)

    def handle_out_message(self, message):
        """
        Hook called by ESProcess for every output or error message.

        Failed/fatal messages are reported immediately. All other messages are
        queued for the next tar + stage-out round.

        :param message: a dict of the parsed message.
                        For 'finished' event ranges it is {'id': <id>, 'status': 'finished', 'output': <output>,
                                                           'cpu': <cpu>, 'wall': <wall>, 'message': <full message>}.
                        For 'failed' event ranges it is {'id': <id>, 'status': 'failed', 'message': <full message>}.
        """
        logger.info("Handling out message: %s" % message)

        self.__all_out_messages.append(message)

        if message['status'] in ['failed', 'fatal']:
            self.update_failed_event_ranges([message])
        else:
            self.__queued_out_messages.append(message)

    def tarzip_output_es(self):
        """
        Tar the outputs of all queued messages into a single file.

        Outputs that cannot be added are re-queued with an incremented 'retries'
        counter. They are discarded once they have already been retried 3 times.

        :return: (out_messages, output_file), where out_messages are the messages whose
                 outputs were added to output_file. Returns (None, None) when nothing was
                 queued, or when tarring failed unexpectedly; in the latter case every
                 popped message is put back in the queue.
        """
        out_messages = []
        while self.__queued_out_messages:
            out_messages.append(self.__queued_out_messages.pop())

        if not out_messages:
            return None, None

        output_file = "EventService_premerge_%s.tar" % out_messages[0]['id']

        ret_messages = []
        # Queued back only after the loop, so the exception path below re-queues
        # each message exactly once.
        retry_messages = []
        try:
            # 'tar -r' appends to an existing archive. A file with this name can be
            # left over from an earlier round whose stage-out failed, so start fresh
            # to avoid duplicate members.
            if os.path.exists(output_file):
                os.remove(output_file)
            for out_msg in out_messages:
                command = "tar -rf " + output_file + " --directory=%s %s" % (os.path.dirname(out_msg['output']), os.path.basename(out_msg['output']))
                exit_code, stdout, stderr = execute(command)
                if exit_code == 0:
                    ret_messages.append(out_msg)
                    continue

                logger.error("Failed to add event output to tar/zip file: out_message: "
                             "%s, exit_code: %s, stdout: %s, stderr: %s" % (out_msg, exit_code, stdout, stderr))
                retries = out_msg.get('retries', 0)
                if retries >= 3:
                    logger.error("Discard out messages because it has been retried more than 3 times: %s" % out_msg)
                else:
                    out_msg['retries'] = retries + 1
                    retry_messages.append(out_msg)
        except Exception as e:
            logger.error("Failed to tar/zip event ranges: %s" % str(e))
            self.__queued_out_messages += out_messages
            return None, None

        self.__queued_out_messages += retry_messages
        return ret_messages, output_file

    def stageout_es_real(self, output_file):
        """
        Stage out an event service output file, with an optional failover.

        The file is first transferred using the 'es_events'/'pw' activities. If
        that raises an error other than MISSINGOUTPUTFILE, a single attempt is made
        to a different storage chosen for the 'es_failover'/'pw' activities.

        :param output_file: output file name.
        :return: (ddmendpoint, storage_id, filesize, checksum) of the transferred file.
        :raises StageOutFailure/PilotException: if neither transfer succeeded, or if the
                 primary transfer reported no error but the file is not 'transferred'.
        """
        job = self.get_job()
        logger.info('prepare to stage-out eventservice files')

        error = None
        file_data = {'scope': 'transient',
                     'lfn': os.path.basename(output_file),
                     }
        file_spec = FileSpec(filetype='output', **file_data)
        xdata = [file_spec]
        kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job)

        try_failover = False
        activity = ['es_events', 'pw']  ## FIX ME LATER: replace `pw` with `write_lan` once AGIS is updated (acopytools)

        try:
            client = StageOutESClient(job.infosys, logger=logger)
            try_failover = True

            client.prepare_destinations(xdata, activity)  ## IF ES job should be allowed to write only at `es_events` astorages, then fix activity names here
            client.transfer(xdata, activity=activity, **kwargs)
        except PilotException as exc:
            # Python 3 unbinds the 'as' target when the handler ends, so keep the
            # exception in 'error' for the checks below. Binding 'as error' directly
            # made the later 'if error:' raise UnboundLocalError.
            error = exc
            logger.error(error.get_detail())
        except Exception as e:
            logger.error(traceback.format_exc())
            error = StageOutFailure("stageOut failed with error=%s" % e)

        logger.info('Summary of transferred files:')
        logger.info(" -- lfn=%s, status_code=%s, status=%s" % (file_spec.lfn, file_spec.status_code, file_spec.status))

        if error:
            logger.error('Failed to stage-out eventservice file(%s): error=%s' % (output_file, error.get_detail()))
        elif file_spec.status != 'transferred':
            msg = 'Failed to stage-out ES file(%s): logic corrupted: unknown internal error, fspec=%s' % (output_file, file_spec)
            logger.error(msg)
            raise StageOutFailure(msg)

        failover_storage_activity = ['es_failover', 'pw']

        if try_failover and error and error.get_error_code() not in [ErrorCodes.MISSINGOUTPUTFILE]:  ## try to failover to other storage

            xdata2 = [FileSpec(filetype='output', **file_data)]

            try:
                client.prepare_destinations(xdata2, failover_storage_activity)
                if xdata2[0].ddmendpoint != xdata[0].ddmendpoint:  ## skip transfer to same output storage
                    msg = 'Will try to failover ES transfer to astorage with activity=%s, rse=%s' % (failover_storage_activity, xdata2[0].ddmendpoint)
                    logger.info(msg)
                    # NOTE(review): the destination comes from the failover activity, but the
                    # transfer itself uses the primary `activity` -- confirm this is intended.
                    client.transfer(xdata2, activity=activity, **kwargs)

                    logger.info('Summary of transferred files (failover transfer):')
                    logger.info(" -- lfn=%s, status_code=%s, status=%s" % (xdata2[0].lfn, xdata2[0].status_code, xdata2[0].status))

            except PilotException as e:
                if e.get_error_code() == ErrorCodes.NOSTORAGE:
                    logger.info('Failover ES storage is not defined for activity=%s .. skipped' % failover_storage_activity)
                else:
                    logger.error('Transfer to failover storage=%s failed .. skipped, error=%s' % (xdata2[0].ddmendpoint, e.get_detail()))
            except Exception:
                logger.error('Failover ES stageout failed .. skipped')
                logger.error(traceback.format_exc())

            if xdata2[0].status == 'transferred':
                error = None
                file_spec = xdata2[0]

        if error:
            raise error

        storage_id = infosys.get_storage_id(file_spec.ddmendpoint)

        return file_spec.ddmendpoint, storage_id, file_spec.filesize, file_spec.checksum

    def stageout_es(self, force=False):
        """
        Tar and stage out the queued event service outputs, then report them.

        Without force, this runs at most once per
        job.infosys.queuedata.es_stageout_gap seconds. If the stage-out fails, the
        messages are put back in the queue, or, with force=True, reported as failed.

        :param force: stage out regardless of the time gap, and report failures
                      instead of re-queueing.
        """
        job = self.get_job()
        if not self.__queued_out_messages:
            return
        due = (force or self.__last_stageout_time is None
               or time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap)
        if not due:
            return

        out_messagess, output_file = self.tarzip_output_es()
        logger.info("tar/zip event ranges: %s, output_file: %s" % (out_messagess, output_file))

        if not out_messagess:
            return

        self.__last_stageout_time = time.time()
        try:
            logger.info("Staging output file: %s" % output_file)
            storage, storage_id, fsize, checksum = self.stageout_es_real(output_file)
            logger.info("Staged output file (%s) to storage: %s storage_id: %s" % (output_file, storage, storage_id))

            self.update_finished_event_ranges(out_messagess, output_file, fsize, checksum, storage_id)
        except Exception as e:
            logger.error("Failed to stage out file(%s): %s, %s" % (output_file, str(e), traceback.format_exc()))

            if force:
                self.update_failed_event_ranges(out_messagess)
            else:
                logger.info("Failed to stageout, adding messages back to the queued messages")
                self.__queued_out_messages += out_messagess

    def clean(self):
        """
        Clean temp produced files and stop everything.

        Steps:
        - remove the 'output' files of all non-failed/fatal messages;
        - reset the message bookkeeping;
        - stop the ESProcess and wait until it is no longer alive;
        - stop the communicator.
        """
        for msg in self.__all_out_messages:
            # Outputs of failed/fatal event ranges are left untouched.
            if msg['status'] in ['failed', 'fatal'] or 'output' not in msg:
                continue
            try:
                logger.info("Removing es premerge file: %s" % msg['output'])
                os.remove(msg['output'])
            except Exception as e:
                logger.error("Failed to remove file(%s): %s" % (msg['output'], str(e)))
        self.__queued_out_messages = []
        self.__last_stageout_time = None
        self.__all_out_messages = []

        if self.proc:
            self.proc.stop()
            while self.proc.is_alive():
                time.sleep(0.1)

        self.stop_communicator()

    def run(self):
        """
        Initialize and run ESProcess.

        Steps:
        - obtain the payload;
        - start an ESProcess wired to this executor's hooks;
        - stage out queued outputs every ~5 s while it is alive (or until stop is set);
        - do a final forced stage-out and clean up;
        - record the process exit code. Any exception cleans up and sets exit_code to -1.
        """
        try:
            logger.info("starting ES GenericExecutor with thread ident: %s" % (self.ident))
            if self.is_set_payload():
                payload = self.get_payload()
            elif self.is_retrieve_payload():
                payload = self.retrieve_payload()
            else:
                logger.error("Payload is not set but is_retrieve_payload is also not set. No payloads.")
                # Nothing to run. Previously this fell through and failed later with a
                # confusing NameError on 'payload'; the handler below still cleans up.
                raise RuntimeError("no payload available")

            logger.info("payload: %s" % payload)

            logger.info("Starting ESProcess")
            proc = ESProcess(payload)
            self.proc = proc
            logger.info("ESProcess initialized")

            proc.set_get_event_ranges_hook(self.get_event_ranges)
            proc.set_handle_out_message_hook(self.handle_out_message)

            logger.info('ESProcess starts to run')
            proc.start()
            logger.info('ESProcess started to run')

            # No long() needed: Python 2 ints promote to long automatically on overflow.
            iteration = 0
            while proc.is_alive():
                iteration += 1
                if self.is_stop():
                    logger.info('Stop is set. breaking -- stop process pid=%s' % proc.pid)
                    proc.stop()
                    break
                self.stageout_es()

                exit_code = proc.poll()
                if iteration % 60 == 0:
                    logger.info('running: iteration=%d pid=%s exit_code=%s' % (iteration, proc.pid, exit_code))
                time.sleep(5)

            # Wait for the process to actually exit (e.g. after proc.stop()).
            while proc.is_alive():
                time.sleep(1)
            logger.info("ESProcess finished")

            self.stageout_es(force=True)
            self.clean()

            self.exit_code = proc.poll()

        except Exception as e:
            logger.error('Execute payload failed: %s, %s' % (str(e), traceback.format_exc()))
            self.clean()
            self.exit_code = -1
        logger.info('ES generic executor finished')