Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:03

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Tobias Wegner, tobias.wegner@cern.ch, 2017
0011 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0012 # - Wen Guan, wen.guan@cern.ch, 2018
0013 
0014 import time
0015 import os
0016 import signal
0017 from subprocess import PIPE
0018 
0019 from pilot.common.errorcodes import ErrorCodes
0020 from pilot.control.job import send_state
0021 from pilot.util.auxiliary import set_pilot_state, show_memory_usage
0022 # from pilot.util.config import config
0023 from pilot.util.container import execute
0024 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_WITH_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED, \
0025     UTILITY_AFTER_PAYLOAD_FINISHED, PILOT_PRE_SETUP, PILOT_POST_SETUP, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, \
0026     UTILITY_AFTER_PAYLOAD_STARTED2, UTILITY_AFTER_PAYLOAD_FINISHED2
0027 from pilot.util.filehandling import write_file
0028 from pilot.util.processes import kill_processes
0029 from pilot.util.timing import add_to_pilot_timing
0030 from pilot.common.exception import PilotException
0031 
0032 import logging
0033 logger = logging.getLogger(__name__)
0034 
0035 errors = ErrorCodes()
0036 
0037 
0038 class Executor(object):
0039     def __init__(self, args, job, out, err, traces):
0040         self.__args = args
0041         self.__job = job
0042         self.__out = out  # payload stdout file object
0043         self.__err = err  # payload stderr file object
0044         self.__traces = traces
0045         self.__preprocess_stdout_name = ''
0046         self.__preprocess_stderr_name = ''
0047         self.__coprocess_stdout_name = 'coprocess_stdout.txt'
0048         self.__coprocess_stderr_name = 'coprocess_stderr.txt'
0049         self.__postprocess_stdout_name = ''
0050         self.__postprocess_stderr_name = ''
0051 
0052     def get_job(self):
0053         """
0054         Get the job object.
0055         :return: job object.
0056         """
0057         return self.__job
0058 
0059     def pre_setup(self, job):
0060         """
0061         Functions to run pre setup
0062         :param job: job object
0063         """
0064         # write time stamps to pilot timing file
0065         add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, time.time(), self.__args)
0066 
0067     def post_setup(self, job):
0068         """
0069         Functions to run post setup
0070         :param job: job object
0071         """
0072         # write time stamps to pilot timing file
0073         add_to_pilot_timing(job.jobid, PILOT_POST_SETUP, time.time(), self.__args)
0074 
0075     def utility_before_payload(self, job):
0076         """
0077         Prepare commands/utilities to run before payload.
0078         These commands will be executed later (as eg the payload command setup is unknown at this point, which is
0079         needed for the preprocessing. Preprocessing is prepared here).
0080 
0081         REFACTOR
0082 
0083         :param job: job object.
0084         """
0085         cmd = ""
0086 
0087         # get the payload command from the user specific code
0088         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0089         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0090 
0091         # should we run any additional commands? (e.g. special monitoring commands)
0092         cmd_dictionary = user.get_utility_commands(order=UTILITY_BEFORE_PAYLOAD, job=job)
0093         if cmd_dictionary:
0094             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0095             logger.info('utility command (\'%s\') to be executed before the payload: %s', cmd_dictionary.get('label', 'utility'), cmd)
0096 
0097         return cmd
0098 
0099     def utility_with_payload(self, job):
0100         """
0101         Functions to run with payload.
0102 
0103         REFACTOR
0104 
0105         :param job: job object.
0106         """
0107         cmd = ""
0108 
0109         # get the payload command from the user specific code
0110         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0111         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0112 
0113         # should any additional commands be prepended to the payload execution string?
0114         cmd_dictionary = user.get_utility_commands(order=UTILITY_WITH_PAYLOAD, job=job)
0115         if cmd_dictionary:
0116             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0117             logger.info('utility command (\'%s\') to be executed with the payload: %s', cmd_dictionary.get('label', 'utility'), cmd)
0118 
0119         return cmd
0120 
0121     def get_utility_command(self, order=None):
0122         """
0123         Return the command for the requested utility command (will be downloaded if necessary).
0124         Note: the utility itself is defined in the user common code and is defined according to the order,
0125         e.g. UTILITY_AFTER_PAYLOAD_STARTED means a co-process (see ATLAS user code).
0126 
0127         :param order: order constant (const).
0128         :return: command to be executed (string).
0129         """
0130 
0131         cmd = ""
0132 
0133         # get the payload command from the user specific code
0134         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0135         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0136 
0137         # should any additional commands be executed after the payload?
0138         cmd_dictionary = user.get_utility_commands(order=order, job=self.__job)
0139         if cmd_dictionary:
0140             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0141             logger.info('utility command (\'%s\') to be executed after the payload: %s', cmd_dictionary.get('label', 'utility'), cmd)
0142 
0143         return cmd
0144 
0145     def utility_after_payload_started(self, job):
0146         """
0147         Functions to run after payload started
0148         :param job: job object
0149         """
0150 
0151         # get the payload command from the user specific code
0152         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0153         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0154 
0155         # should any additional commands be executed after the payload?
0156         cmd_dictionary = user.get_utility_commands(order=UTILITY_AFTER_PAYLOAD_STARTED, job=job)
0157         if cmd_dictionary:
0158             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0159             logger.info('utility command to be executed after the payload: %s', cmd)
0160 
0161             # how should this command be executed?
0162             utilitycommand = user.get_utility_command_setup(cmd_dictionary.get('command'), job)
0163             if not utilitycommand:
0164                 logger.warning('empty utility command - nothing to run')
0165                 return
0166             try:
0167                 proc1 = execute(utilitycommand, workdir=job.workdir, returnproc=True, usecontainer=False,
0168                                 stdout=PIPE, stderr=PIPE, cwd=job.workdir, job=job)
0169             except Exception as error:
0170                 logger.error('could not execute: %s', error)
0171             else:
0172                 # store process handle in job object, and keep track on how many times the command has been launched
0173                 # also store the full command in case it needs to be restarted later (by the job_monitor() thread)
0174                 job.utilities[cmd_dictionary.get('command')] = [proc1, 1, utilitycommand]
0175 
0176     def utility_after_payload_started_new(self, job):
0177         """
0178         Functions to run after payload started
0179 
0180         REFACTOR
0181 
0182         :param job: job object
0183         """
0184         cmd = ""
0185 
0186         # get the payload command from the user specific code
0187         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0188         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0189 
0190         # should any additional commands be executed after the payload?
0191         cmd_dictionary = user.get_utility_commands(order=UTILITY_AFTER_PAYLOAD_STARTED, job=job)
0192         if cmd_dictionary:
0193             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0194             logger.info('utility command to be executed after the payload: %s', cmd)
0195 
0196         return cmd
0197 
0198 #            # how should this command be executed?
0199 #            utilitycommand = user.get_utility_command_setup(cmd_dictionary.get('command'), job)
0200 #            if not utilitycommand:
0201 #                logger.warning('empty utility command - nothing to run')
0202 #                return
0203 #            try:
0204 #                proc = execute(utilitycommand, workdir=job.workdir, returnproc=True, usecontainer=False,
0205 #                               stdout=PIPE, stderr=PIPE, cwd=job.workdir, job=job)
0206 #            except Exception as error:
0207 #                logger.error('could not execute: %s', error)
0208 #            else:
0209 #                # store process handle in job object, and keep track on how many times the command has been launched
0210 #                # also store the full command in case it needs to be restarted later (by the job_monitor() thread)
0211 #                job.utilities[cmd_dictionary.get('command')] = [proc, 1, utilitycommand]
0212 
0213     def utility_after_payload_finished(self, job, order):
0214         """
0215         Prepare commands/utilities to run after payload has finished.
0216 
0217         This command will be executed later.
0218 
0219         The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2
0220 
0221         :param job: job object.
0222         :param order: constant used for utility selection (constant).
0223         :return: command (string), label (string).
0224         """
0225 
0226         cmd = ""
0227 
0228         # get the payload command from the user specific code
0229         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0230         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0231 
0232         # should any additional commands be prepended to the payload execution string?
0233         cmd_dictionary = user.get_utility_commands(order=order, job=job)
0234         if cmd_dictionary:
0235             cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
0236             logger.info('utility command (\'%s\') to be executed after the payload has finished: %s', cmd_dictionary.get('label', 'utility'), cmd)
0237 
0238         return cmd, cmd_dictionary.get('label'), cmd_dictionary.get('ignore_failure')
0239 
0240     def execute_utility_command(self, cmd, job, label):
0241         """
0242         Execute a utility command (e.g. pre/postprocess commands; label=preprocess etc).
0243 
0244         :param cmd: full command to be executed (string).
0245         :param job: job object.
0246         :param label: command label (string).
0247         :return: exit code (int).
0248         """
0249 
0250         exit_code, stdout, stderr = execute(cmd, workdir=job.workdir, cwd=job.workdir, usecontainer=False)
0251         if exit_code:
0252             ignored_exit_codes = [160, 161, 162]
0253             logger.warning('command returned non-zero exit code: %s (exit code = %d) - see utility logs for details', cmd, exit_code)
0254             if label == 'preprocess':
0255                 err = errors.PREPROCESSFAILURE
0256             elif label == 'postprocess':
0257                 err = errors.POSTPROCESSFAILURE
0258             else:
0259                 err = 0  # ie ignore
0260                 exit_code = 0
0261             if err and exit_code not in ignored_exit_codes:  # ignore no-more-data-points exit codes
0262                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(err)
0263             if exit_code in ignored_exit_codes:
0264                 job.transexitcode = exit_code
0265 
0266         # write output to log files
0267         self.write_utility_output(job.workdir, label, stdout, stderr)
0268 
0269         return exit_code
0270 
0271     def write_utility_output(self, workdir, step, stdout, stderr):
0272         """
0273         Write the utility command output to stdout, stderr files to the job.workdir for the current step.
0274         -> <step>_stdout.txt, <step>_stderr.txt
0275         Example of step: preprocess, postprocess.
0276 
0277         :param workdir: job workdir (string).
0278         :param step: utility step (string).
0279         :param stdout: command stdout (string).
0280         :param stderr: command stderr (string).
0281         :return:
0282         """
0283 
0284         # dump to file
0285         try:
0286             name_stdout = step + '_stdout.txt'
0287             name_stderr = step + '_stderr.txt'
0288             if step == 'preprocess':
0289                 self.__preprocess_stdout_name = name_stdout
0290                 self.__preprocess_stderr_name = name_stderr
0291             elif step == 'postprocess':
0292                 self.__postprocess_stdout_name = name_stdout
0293                 self.__postprocess_stderr_name = name_stderr
0294             name = os.path.join(workdir, step + '_stdout.txt')
0295             write_file(name, stdout, unique=True)
0296         except PilotException as error:
0297             logger.warning('failed to write utility stdout to file: %s, %s', error, stdout)
0298         else:
0299             logger.debug('wrote %s', name)
0300 
0301         try:
0302             name = os.path.join(workdir, step + '_stderr.txt')
0303             write_file(name, stderr, unique=True)
0304         except PilotException as error:
0305             logger.warning('failed to write utility stderr to file: %s, %s', error, stderr)
0306         else:
0307             logger.debug('wrote %s', name)
0308 
0309     def pre_payload(self, job):
0310         """
0311         Calls to functions to run before payload.
0312         E.g. write time stamps to timing file.
0313 
0314         :param job: job object.
0315         """
0316         # write time stamps to pilot timing file
0317         add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, time.time(), self.__args)
0318 
0319     def post_payload(self, job):
0320         """
0321         Calls to functions to run after payload.
0322         E.g. write time stamps to timing file.
0323 
0324         :param job: job object
0325         """
0326         # write time stamps to pilot timing file
0327         add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, time.time(), self.__args)
0328 
0329     def run_command(self, cmd, label=None):
0330         """
0331         Execute the given command and return the process id.
0332 
0333         :param cmd: command (string).
0334         :return: process id (int).
0335         """
0336 
0337         if label:
0338             logger.info('\n\n%s:\n\n%s\n', label, cmd)
0339         if label == 'coprocess':
0340             try:
0341                 out = open(os.path.join(self.__job.workdir, self.__coprocess_stdout_name), 'wb')
0342                 err = open(os.path.join(self.__job.workdir, self.__coprocess_stderr_name), 'wb')
0343             except Exception as error:
0344                 logger.warning('failed to open coprocess stdout/err: %s', error)
0345                 out = None
0346                 err = None
0347         else:
0348             out = None
0349             err = None
0350         try:
0351             proc = execute(cmd, workdir=self.__job.workdir, returnproc=True, stdout=out, stderr=err,
0352                            usecontainer=False, cwd=self.__job.workdir, job=self.__job)
0353         except Exception as error:
0354             logger.error('could not execute: %s', error)
0355             return None
0356         if isinstance(proc, tuple) and not proc[0]:
0357             logger.error('failed to execute command')
0358             return None
0359 
0360         logger.info('started %s -- pid=%s executable=%s', label, proc.pid, cmd)
0361 
0362         return proc
0363 
0364     def run_payload(self, job, cmd, out, err):
0365         """
0366         Setup and execute the main payload process.
0367 
0368         REFACTOR using run_command()
0369 
0370         :param job: job object.
0371         :param out: (currently not used; deprecated)
0372         :param err: (currently not used; deprecated)
0373         :return: proc (subprocess returned by Popen())
0374         """
0375 
0376         # main payload process steps
0377 
0378         # add time for PILOT_PRE_PAYLOAD
0379         self.pre_payload(job)
0380 
0381         logger.info("\n\npayload execution command:\n\n%s\n", cmd)
0382         try:
0383             proc = execute(cmd, workdir=job.workdir, returnproc=True,
0384                            usecontainer=True, stdout=out, stderr=err, cwd=job.workdir, job=job)
0385         except Exception as error:
0386             logger.error('could not execute: %s', error)
0387             return None
0388         if isinstance(proc, tuple) and not proc[0]:
0389             logger.error('failed to execute payload')
0390             return None
0391 
0392         logger.info('started -- pid=%s executable=%s', proc.pid, cmd)
0393         job.pid = proc.pid
0394         job.pgrp = os.getpgid(job.pid)
0395         set_pilot_state(job=job, state="running")
0396 
0397         #_cmd = self.utility_with_payload(job)
0398 
0399         self.utility_after_payload_started(job)
0400 
0401         return proc
0402 
0403     def extract_setup(self, cmd):
0404         """
0405         Extract the setup from the payload command (cmd).
0406         E.g. extract the full setup from the payload command will be prepended to the pre/postprocess command.
0407 
0408         :param cmd: payload command (string).
0409         :return: updated secondary command (string).
0410         """
0411 
0412         def cut_str_from(_cmd, _str):
0413             """
0414             Cut the string from the position of the given _cmd
0415             """
0416             return _cmd[:_cmd.find(_str)]
0417 
0418         def cut_str_from_last_semicolon(_cmd):
0419             """
0420             Cut the string from the last semicolon
0421             NOTE: this will not work if jobParams also contain ;
0422             """
0423             # remove any trailing spaces and ;-signs
0424             _cmd = _cmd.strip()
0425             _cmd = _cmd[:-1] if _cmd.endswith(';') else _cmd
0426             last_bit = _cmd.split(';')[-1]
0427             return _cmd.replace(last_bit.strip(), '')
0428 
0429         if '/' in self.__job.transformation:  # e.g. http://pandaserver.cern.ch:25080/trf/user/runHPO-00-00-01
0430             trfname = self.__job.transformation[self.__job.transformation.rfind('/') + 1:]  # runHPO-00-00-01
0431             _trf = './' + trfname
0432         else:
0433             trfname = self.__job.transformation
0434             _trf = './' + self.__job.transformation
0435 
0436         if _trf in cmd:
0437             setup = cut_str_from(cmd, _trf)
0438         elif trfname in cmd:
0439             setup = cut_str_from(cmd, trfname)
0440         else:
0441             setup = cut_str_from_last_semicolon(cmd)
0442 
0443         return setup
0444 
0445     def wait_graceful(self, args, proc):
0446         """
0447         Wait for payload process to finish.
0448 
0449         :param args: Pilot arguments object.
0450         :param proc: Process id (int).
0451         :return: exit code (int).
0452         """
0453 
0454         breaker = False
0455         exit_code = None
0456         try:
0457             iteration = long(0)  # Python 2, do not use 0L since it will create a syntax error in spite of the try # noqa: F821
0458         except Exception:
0459             iteration = 0  # Python 3, long doesn't exist
0460         while True:
0461             time.sleep(0.1)
0462 
0463             iteration += 1
0464             for _ in range(60):  # Python 2/3
0465                 if args.graceful_stop.is_set():
0466                     breaker = True
0467                     logger.info('breaking -- sending SIGTERM pid=%s', proc.pid)
0468                     os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
0469                     break
0470                 exit_code = proc.poll()
0471                 if exit_code is not None:
0472                     break
0473                 time.sleep(1)
0474             if breaker:
0475                 logger.info('breaking -- sleep 3s before sending SIGKILL pid=%s', proc.pid)
0476                 time.sleep(3)
0477                 proc.kill()
0478                 break
0479 
0480             exit_code = proc.poll()
0481 
0482             if iteration % 10 == 0:
0483                 logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
0484             if exit_code is not None:
0485                 break
0486             else:
0487                 continue
0488 
0489         return exit_code
0490 
0491     def get_payload_command(self, job):
0492         """
0493         Return the payload command string.
0494 
0495         :param job: job object.
0496         :return: command (string).
0497         """
0498 
0499         cmd = ""
0500         # for testing looping job: cmd = user.get_payload_command(job) + ';sleep 240'
0501         try:
0502             pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0503             user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user],
0504                               0)  # Python 2/3
0505             cmd = user.get_payload_command(job)  #+ 'sleep 1000'  # to test looping jobs
0506         except PilotException as error:
0507             self.post_setup(job)
0508             import traceback
0509             logger.error(traceback.format_exc())
0510             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())
0511             self.__traces.pilot['error_code'] = job.piloterrorcodes[0]
0512             logger.fatal(
0513                 'could not define payload command (traces error set to: %d)', self.__traces.pilot['error_code'])
0514 
0515         return cmd
0516 
0517     def run_preprocess(self, job):
0518         """
0519         Run any preprocess payloads.
0520 
0521         :param job: job object.
0522         :return:
0523         """
0524 
0525         exit_code = 0
0526 
0527         try:
0528             # note: this might update the jobparams
0529             cmd_before_payload = self.utility_before_payload(job)
0530         except Exception as error:
0531             logger.error(error)
0532             raise error
0533 
0534         if cmd_before_payload:
0535             cmd_before_payload = job.setup + cmd_before_payload
0536             logger.info("\n\npreprocess execution command:\n\n%s\n", cmd_before_payload)
0537             exit_code = self.execute_utility_command(cmd_before_payload, job, 'preprocess')
0538             if exit_code == 160:
0539                 logger.warning('no more HP points - time to abort processing loop')
0540             elif exit_code == 161:
0541                 logger.warning('no more HP points but at least one point was processed - time to abort processing loop')
0542             elif exit_code == 162:
0543                 logger.warning('loop count reached the limit - time to abort processing loop')
0544             elif exit_code:
0545                 # set error code
0546                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PREPROCESSFAILURE)
0547                 logger.fatal('cannot continue since preprocess failed: exit_code=%d', exit_code)
0548             else:
0549                 # in case the preprocess produced a command, chmod it
0550                 path = os.path.join(job.workdir, job.containeroptions.get('containerExec', 'does_not_exist'))
0551                 if os.path.exists(path):
0552                     logger.debug('chmod 0o755: %s', path)
0553                     os.chmod(path, 0o755)
0554 
0555         return exit_code
0556 
0557     def run(self):  # noqa: C901
0558         """
0559         Run all payload processes (including pre- and post-processes, and utilities).
0560         In the case of HPO jobs, this function will loop over all processes until the preprocess returns a special
0561         exit code.
0562         :return:
0563         """
0564 
0565         # get the payload command from the user specific code
0566         self.pre_setup(self.__job)
0567 
0568         cmd = self.get_payload_command(self.__job)
0569         # extract the setup in case the preprocess command needs it
0570         self.__job.setup = self.extract_setup(cmd)
0571         self.post_setup(self.__job)
0572 
0573         # a loop is needed for HPO jobs
0574         # abort when nothing more to run, or when the preprocess returns a special exit code
0575         iteration = 0
0576         while True:
0577 
0578             logger.info('payload iteration loop #%d', iteration + 1)
0579             os.environ['PILOT_EXEC_ITERATION_COUNT'] = '%s' % iteration
0580             show_memory_usage()
0581 
0582             # first run the preprocess (if necessary) - note: this might update jobparams -> must update cmd
0583             jobparams_pre = self.__job.jobparams
0584             exit_code = self.run_preprocess(self.__job)
0585             jobparams_post = self.__job.jobparams
0586             if exit_code:
0587                 if exit_code >= 160 and exit_code <= 162:
0588                     exit_code = 0
0589                     # wipe the output file list since there won't be any new files
0590                     # any output files from previous iterations, should have been transferred already
0591                     logger.debug('reset outdata since further output should not be expected after preprocess exit')
0592                     self.__job.outdata = []
0593                 break
0594             if jobparams_pre != jobparams_post:
0595                 logger.debug('jobparams were updated by utility_before_payload()')
0596                 # must update cmd
0597                 cmd = cmd.replace(jobparams_pre, jobparams_post)
0598 
0599             # now run the main payload, when it finishes, run the postprocess (if necessary)
0600             # note: no need to run any main payload in HPO Horovod jobs on Kubernetes
0601             if os.environ.get('HARVESTER_HOROVOD', '') == '':
0602 
0603                 #exit_code, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
0604                 #logger.debug('[before payload start] stdout=%s', _stdout)
0605                 #logger.debug('[before payload start] stderr=%s', _stderr)
0606 
0607                 proc = self.run_payload(self.__job, cmd, self.__out, self.__err)
0608             else:
0609                 proc = None
0610 
0611             proc_co = None
0612             if proc is None:
0613                 # run the post-process command even if there was no main payload
0614                 if os.environ.get('HARVESTER_HOROVOD', '') != '':
0615                     logger.info('No need to execute any main payload')
0616                     exit_code = self.run_utility_after_payload_finished(exit_code, True, UTILITY_AFTER_PAYLOAD_FINISHED2)
0617                     self.post_payload(self.__job)
0618                 else:
0619                     break
0620             else:
0621                 # the process is now running, update the server
0622                 # test 'tobekilled' from here to try payload kill
0623                 send_state(self.__job, self.__args, self.__job.state)
0624 
0625                 # note: when sending a state change to the server, the server might respond with 'tobekilled'
0626                 if self.__job.state == 'failed':
0627                     logger.warning('job state is \'failed\' - abort payload and run()')
0628                     kill_processes(proc.pid)
0629                     break
0630 
0631                 # allow for a secondary command to be started after the payload (e.g. a coprocess)
0632                 utility_cmd = self.get_utility_command(order=UTILITY_AFTER_PAYLOAD_STARTED2)
0633                 if utility_cmd:
0634                     logger.debug('starting utility command: %s', utility_cmd)
0635                     label = 'coprocess' if 'coprocess' in utility_cmd else None
0636                     proc_co = self.run_command(utility_cmd, label=label)
0637 
0638                 logger.info('will wait for graceful exit')
0639                 exit_code = self.wait_graceful(self.__args, proc)
0640                 # reset error if Raythena decided to kill payload (no error)
0641                 if errors.KILLPAYLOAD in self.__job.piloterrorcodes:
0642                     logger.debug('ignoring KILLPAYLOAD error')
0643                     self.__job.piloterrorcodes, self.__job.piloterrordiags = errors.remove_error_code(errors.KILLPAYLOAD,
0644                                                                                                       pilot_error_codes=self.__job.piloterrorcodes,
0645                                                                                                       pilot_error_diags=self.__job.piloterrordiags)
0646                     exit_code = 0
0647                     state = 'finished'
0648                 else:
0649                     state = 'finished' if exit_code == 0 else 'failed'
0650                 set_pilot_state(job=self.__job, state=state)
0651                 logger.info('\n\nfinished pid=%s exit_code=%s state=%s\n', proc.pid, exit_code, self.__job.state)
0652 
0653                 #exit_code, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
0654                 #logger.debug('[after payload finish] stdout=%s', _stdout)
0655                 #logger.debug('[after payload finish] stderr=%s', _stderr)
0656 
0657                 # stop the utility command (e.g. a coprocess if necessary
0658                 if proc_co:
0659                     logger.debug('stopping utility command: %s', utility_cmd)
0660                     kill_processes(proc_co.pid)
0661 
0662                 if exit_code is None:
0663                     logger.warning('detected unset exit_code from wait_graceful - reset to -1')
0664                     exit_code = -1
0665 
0666                 for order in [UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2]:
0667                     exit_code = self.run_utility_after_payload_finished(exit_code, state, order)
0668 
0669                 self.post_payload(self.__job)
0670 
0671                 # stop any running utilities
0672                 if self.__job.utilities != {}:
0673                     self.stop_utilities()
0674 
0675             if self.__job.is_hpo and state != 'failed':
0676                 # in case there are more hyper-parameter points, move away the previous log files
0677                 #self.rename_log_files(iteration)
0678                 iteration += 1
0679             else:
0680                 break
0681 
0682         return exit_code
0683 
0684     def run_utility_after_payload_finished(self, exit_code, state, order):
0685         """
0686         Run utility command after the main payload has finished.
0687         In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache).
0688 
0689         The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2
0690 
0691         :param exit_code: transform exit code (int).
0692         :param state: payload state; finished/failed (string).
0693         :param order: constant used for utility selection (constant).
0694         :return: exit code (int).
0695         """
0696 
0697         _exit_code = 0
0698         try:
0699             cmd_after_payload, label, ignore_failure = self.utility_after_payload_finished(self.__job, order)
0700         except Exception as error:
0701             logger.error(error)
0702             ignore_failure = False
0703         else:
0704             if cmd_after_payload and self.__job.postprocess and state != 'failed':
0705                 cmd_after_payload = self.__job.setup + cmd_after_payload
0706                 logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload)
0707                 _exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label)
0708             elif cmd_after_payload:
0709                 logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload)
0710                 _exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label)
0711 
0712         # only set a new non-zero exit code if exit_code was not already set and ignore_failure is False
0713         # (e.g. any Xcache failure should be ignored to prevent job from failing since exit_code might get updated)
0714         if _exit_code and not exit_code and not ignore_failure:
0715             exit_code = _exit_code
0716 
0717         return exit_code
0718 
0719     def stop_utilities(self):
0720         """
0721         Stop any running utilities.
0722 
0723         :return:
0724         """
0725 
0726         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0727 
0728         for utcmd in list(self.__job.utilities.keys()):  # Python 2/3
0729             utproc = self.__job.utilities[utcmd][0]
0730             if utproc:
0731                 user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0732                 sig = user.get_utility_command_kill_signal(utcmd)
0733                 logger.info("stopping process \'%s\' with signal %d", utcmd, sig)
0734                 try:
0735                     os.killpg(os.getpgid(utproc.pid), sig)
0736                 except Exception as error:
0737                     logger.warning('exception caught: %s (ignoring)', error)
0738 
0739                 user.post_utility_command_action(utcmd, self.__job)
0740 
0741     def rename_log_files(self, iteration):
0742         """
0743 
0744         :param iteration:
0745         :return:
0746         """
0747 
0748         names = [self.__preprocess_stdout_name, self.__preprocess_stderr_name,
0749                  self.__postprocess_stdout_name, self.__postprocess_stderr_name]
0750         for name in names:
0751             if os.path.exists(name):
0752                 os.rename(name, name + '%d' % iteration)
0753             else:
0754                 logger.warning('cannot rename %s since it does not exist', name)