File indexing completed on 2026-04-11 08:41:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 import time
0015 import os
0016 import signal
0017 from subprocess import PIPE
0018
0019 from pilot.common.errorcodes import ErrorCodes
0020 from pilot.control.job import send_state
0021 from pilot.util.auxiliary import set_pilot_state, show_memory_usage
0022
0023 from pilot.util.container import execute
0024 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_WITH_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED, \
0025 UTILITY_AFTER_PAYLOAD_FINISHED, PILOT_PRE_SETUP, PILOT_POST_SETUP, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, \
0026 UTILITY_AFTER_PAYLOAD_STARTED2, UTILITY_AFTER_PAYLOAD_FINISHED2
0027 from pilot.util.filehandling import write_file
0028 from pilot.util.processes import kill_processes
0029 from pilot.util.timing import add_to_pilot_timing
0030 from pilot.common.exception import PilotException
0031
0032 import logging
# module-level logger used throughout this module
logger = logging.getLogger(__name__)

# shared ErrorCodes instance: provides the pilot error code constants (e.g. PREPROCESSFAILURE, KILLPAYLOAD)
# and the add_error_code()/remove_error_code() helpers used by the Executor methods
errors = ErrorCodes()
0036
0037
0038 class Executor(object):
def __init__(self, args, job, out, err, traces):
    """
    Store the objects the executor works with and initialize the utility log file names.

    :param args: pilot arguments object.
    :param job: job object.
    :param out: payload stdout handle (handed on to run_payload()).
    :param err: payload stderr handle (handed on to run_payload()).
    :param traces: traces object (its pilot dictionary receives the error code on payload command failure).
    """
    # objects supplied by the caller
    self.__args, self.__job, self.__traces = args, job, traces
    self.__out, self.__err = out, err

    # pre/postprocess log file names are only known once write_utility_output() has run for that step
    self.__preprocess_stdout_name = self.__preprocess_stderr_name = ''
    self.__postprocess_stdout_name = self.__postprocess_stderr_name = ''

    # the coprocess always writes to fixed file names in the job work directory
    self.__coprocess_stdout_name = 'coprocess_stdout.txt'
    self.__coprocess_stderr_name = 'coprocess_stderr.txt'
0051
def get_job(self):
    """
    Return the job object this executor was created for.

    :return: job object.
    """
    current_job = self.__job
    return current_job
0058
def pre_setup(self, job):
    """
    Record the PILOT_PRE_SETUP time stamp for the job via add_to_pilot_timing().

    :param job: job object (its jobid is passed to add_to_pilot_timing()).
    """
    timestamp = time.time()
    add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, timestamp, self.__args)
0066
def post_setup(self, job):
    """
    Record the PILOT_POST_SETUP time stamp for the job via add_to_pilot_timing().

    :param job: job object (its jobid is passed to add_to_pilot_timing()).
    """
    timestamp = time.time()
    add_to_pilot_timing(job.jobid, PILOT_POST_SETUP, timestamp, self.__args)
0074
def utility_before_payload(self, job):
    """
    Return the utility command (if any) to be executed before the payload, e.g. the preprocess.

    The command is only prepared here and executed later (see run_preprocess()), since the payload setup that
    has to be prepended to it is not known at this point.

    REFACTOR

    :param job: job object.
    :return: command string "<command> <args>", or an empty string if the user code defines no such utility.
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=UTILITY_BEFORE_PAYLOAD, job=job)
    if not cmd_dictionary:
        return ""

    full_command = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
    logger.info('utility command (\'%s\') to be executed before the payload: %s',
                cmd_dictionary.get('label', 'utility'), full_command)
    return full_command
0098
def utility_with_payload(self, job):
    """
    Return the utility command (if any) that the user code defines to run together with the payload.

    REFACTOR

    :param job: job object.
    :return: command string "<command> <args>", or an empty string if the user code defines no such utility.
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=UTILITY_WITH_PAYLOAD, job=job)
    if not cmd_dictionary:
        return ""

    full_command = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
    logger.info('utility command (\'%s\') to be executed with the payload: %s',
                cmd_dictionary.get('label', 'utility'), full_command)
    return full_command
0120
def get_utility_command(self, order=None):
    """
    Return the command for the utility that the user code defines for the given order constant.

    The utility itself is defined in the user specific code (pilot.user.<PILOT_USER>.common) and selected by
    the order; e.g. run() uses UTILITY_AFTER_PAYLOAD_STARTED2 to start a secondary command such as a coprocess.
    Nothing is downloaded or executed here.

    :param order: order constant (const).
    :return: command to be executed "<command> <args>" (string), or an empty string if none is defined.
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=order, job=self.__job)
    if not cmd_dictionary:
        return ""

    full_command = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
    logger.info('utility command (\'%s\') to be executed after the payload: %s',
                cmd_dictionary.get('label', 'utility'), full_command)
    return full_command
0144
def utility_after_payload_started(self, job):
    """
    Launch the utility (if any) that the user code defines for UTILITY_AFTER_PAYLOAD_STARTED.

    The user code decides how the utility is executed (get_utility_command_setup()). On a successful launch the
    process handle is stored in job.utilities[<command>] as [process, 1, full command].

    :param job: job object.
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=UTILITY_AFTER_PAYLOAD_STARTED, job=job)
    if not cmd_dictionary:
        return

    utility_name = cmd_dictionary.get('command')
    logger.info('utility command to be executed after the payload: %s',
                '%s %s' % (utility_name, cmd_dictionary.get('args')))

    # ask the user code how the utility should actually be executed
    utilitycommand = user.get_utility_command_setup(utility_name, job)
    if not utilitycommand:
        logger.warning('empty utility command - nothing to run')
        return

    # NOTE(review): stdout/stderr are PIPEs that nothing in this method reads; a chatty utility could block once
    # the pipe buffer fills - confirm a reader exists elsewhere (e.g. in the user code / job monitor)
    try:
        utility_proc = execute(utilitycommand, workdir=job.workdir, returnproc=True, usecontainer=False,
                               stdout=PIPE, stderr=PIPE, cwd=job.workdir, job=job)
    except Exception as error:
        logger.error('could not execute: %s', error)
        return

    # keep the process handle and the full command (second element starts at 1 - presumably a launch
    # counter; confirm against the code that restarts utilities)
    job.utilities[utility_name] = [utility_proc, 1, utilitycommand]
0175
def utility_after_payload_started_new(self, job):
    """
    Return, without running it, the command the user code defines for UTILITY_AFTER_PAYLOAD_STARTED.

    REFACTOR

    :param job: job object.
    :return: command string "<command> <args>", or an empty string if none is defined.
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=UTILITY_AFTER_PAYLOAD_STARTED, job=job)
    if not cmd_dictionary:
        return ""

    full_command = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
    logger.info('utility command to be executed after the payload: %s', full_command)
    return full_command
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211
0212
def utility_after_payload_finished(self, job, order):
    """
    Prepare the utility command (if any) to run after the payload has finished.

    The command is only prepared here; it is executed later by the caller
    (run_utility_after_payload_finished()).

    The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2

    :param job: job object.
    :param order: constant used for utility selection (constant).
    :return: command (string, empty if no utility is defined), label (string or None),
             ignore_failure (value of the 'ignore_failure' key, or None).
    """
    # load the user specific code: pilot.user.<PILOT_USER>.common
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    cmd_dictionary = user.get_utility_commands(order=order, job=job)
    if not cmd_dictionary:
        # nothing defined for this order; the user code may return a falsy value such as None, which has
        # no .get() - return the same values an empty dictionary would have produced
        return "", None, None

    cmd = '%s %s' % (cmd_dictionary.get('command'), cmd_dictionary.get('args'))
    logger.info('utility command (\'%s\') to be executed after the payload has finished: %s',
                cmd_dictionary.get('label', 'utility'), cmd)

    return cmd, cmd_dictionary.get('label'), cmd_dictionary.get('ignore_failure')
0239
def execute_utility_command(self, cmd, job, label):
    """
    Run a utility command (e.g. a pre/postprocess command; label=preprocess etc) outside of the container.

    For labels 'preprocess' and 'postprocess' a non-zero exit code records PREPROCESSFAILURE/POSTPROCESSFAILURE
    on the job, except for the special exit codes 160-162, which are stored as job.transexitcode instead.
    For any other label a failure is reported as exit code 0. The command output is always written to
    <label>_stdout.txt / <label>_stderr.txt in the job work directory.

    :param cmd: full command to be executed (string).
    :param job: job object.
    :param label: command label (string).
    :return: exit code (int).
    """
    exit_code, stdout, stderr = execute(cmd, workdir=job.workdir, cwd=job.workdir, usecontainer=False)

    if exit_code:
        logger.warning('command returned non-zero exit code: %s (exit code = %d) - see utility logs for details', cmd, exit_code)

        if label == 'preprocess':
            failure_code = errors.PREPROCESSFAILURE
        elif label == 'postprocess':
            failure_code = errors.POSTPROCESSFAILURE
        else:
            # failures of other utilities are not propagated
            failure_code = 0
            exit_code = 0

        if exit_code in (160, 161, 162):
            # special (HPO loop control) exit codes are handed on as the transform exit code, not as an error
            job.transexitcode = exit_code
        elif failure_code:
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(failure_code)

    self.write_utility_output(job.workdir, label, stdout, stderr)

    return exit_code
0269 return exit_code
0270
def write_utility_output(self, workdir, step, stdout, stderr):
    """
    Write the stdout/stderr of a utility command to <step>_stdout.txt / <step>_stderr.txt in workdir.

    Example of step: preprocess, postprocess. For these two steps the file names are also remembered on the
    executor. Write failures (PilotException) are logged as warnings and otherwise ignored.

    :param workdir: job workdir (string).
    :param step: utility step (string).
    :param stdout: command stdout (string).
    :param stderr: command stderr (string).
    :return:
    """
    name_stdout = step + '_stdout.txt'
    name_stderr = step + '_stderr.txt'

    # remember the file names of the pre/postprocess output
    if step == 'preprocess':
        self.__preprocess_stdout_name, self.__preprocess_stderr_name = name_stdout, name_stderr
    elif step == 'postprocess':
        self.__postprocess_stdout_name, self.__postprocess_stderr_name = name_stdout, name_stderr

    outputs = (
        (name_stdout, stdout, 'failed to write utility stdout to file: %s, %s'),
        (name_stderr, stderr, 'failed to write utility stderr to file: %s, %s'),
    )
    for file_name, content, failure_message in outputs:
        path = os.path.join(workdir, file_name)
        try:
            write_file(path, content, unique=True)
        except PilotException as error:
            logger.warning(failure_message, error, content)
        else:
            logger.debug('wrote %s', path)
0307 logger.debug('wrote %s', name)
0308
def pre_payload(self, job):
    """
    Record the PILOT_PRE_PAYLOAD time stamp for the job via add_to_pilot_timing().

    :param job: job object (its jobid is passed to add_to_pilot_timing()).
    """
    timestamp = time.time()
    add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, timestamp, self.__args)
0318
def post_payload(self, job):
    """
    Record the PILOT_POST_PAYLOAD time stamp for the job via add_to_pilot_timing().

    :param job: job object (its jobid is passed to add_to_pilot_timing()).
    """
    timestamp = time.time()
    add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, timestamp, self.__args)
0328
def run_command(self, cmd, label=None):
    """
    Start the given command in the job work directory (outside of the container) and return its process.

    For label 'coprocess', stdout/stderr are redirected to coprocess_stdout.txt / coprocess_stderr.txt in the
    job work directory; otherwise stdout=None, stderr=None are passed to execute().

    :param cmd: command (string).
    :param label: optional label used in the log; 'coprocess' also enables output redirection (string).
    :return: process object returned by execute(), or None if the command could not be started.
    """
    if label:
        logger.info('\n\n%s:\n\n%s\n', label, cmd)

    out = None
    err = None
    if label == 'coprocess':
        try:
            out = open(os.path.join(self.__job.workdir, self.__coprocess_stdout_name), 'wb')
            err = open(os.path.join(self.__job.workdir, self.__coprocess_stderr_name), 'wb')
        except Exception as error:
            logger.warning('failed to open coprocess stdout/err: %s', error)
            # do not leak the stdout handle when only the stderr file could not be opened
            if out:
                out.close()
            out = None
            err = None

    try:
        proc = execute(cmd, workdir=self.__job.workdir, returnproc=True, stdout=out, stderr=err,
                       usecontainer=False, cwd=self.__job.workdir, job=self.__job)
    except Exception as error:
        logger.error('could not execute: %s', error)
        return None
    finally:
        # assumes execute() starts the child synchronously (returnproc=True returns the process), so the child
        # already holds its own copies of the descriptors and the parent's handles can be closed here
        for handle in (out, err):
            if handle:
                handle.close()

    if isinstance(proc, tuple) and not proc[0]:
        logger.error('failed to execute command')
        return None

    logger.info('started %s -- pid=%s executable=%s', label, proc.pid, cmd)

    return proc
0362 return proc
0363
def run_payload(self, job, cmd, out, err):
    """
    Set up and start the main payload process (in the container) and launch the after-payload-started utility.

    On success the job gets pid/pgrp set and the pilot state 'running'.

    REFACTOR using run_command()

    :param job: job object.
    :param cmd: payload command (string).
    :param out: handed to execute() as stdout.
    :param err: handed to execute() as stderr.
    :return: proc (subprocess returned by Popen()), or None if the payload could not be started.
    """
    self.pre_payload(job)

    logger.info("\n\npayload execution command:\n\n%s\n", cmd)
    try:
        payload_proc = execute(cmd, workdir=job.workdir, returnproc=True,
                               usecontainer=True, stdout=out, stderr=err, cwd=job.workdir, job=job)
    except Exception as error:
        logger.error('could not execute: %s', error)
        return None

    # execute() signals a failed launch with a tuple whose first element is falsy
    launch_failed = isinstance(payload_proc, tuple) and not payload_proc[0]
    if launch_failed:
        logger.error('failed to execute payload')
        return None

    payload_pid = payload_proc.pid
    logger.info('started -- pid=%s executable=%s', payload_pid, cmd)
    job.pid = payload_pid
    job.pgrp = os.getpgid(payload_pid)
    set_pilot_state(job=job, state="running")

    # start any utility that should run alongside the payload
    self.utility_after_payload_started(job)

    return payload_proc
0402
def extract_setup(self, cmd):
    """
    Extract the setup from the payload command (cmd), i.e. everything that precedes the transform.

    The result is prepended to the pre/postprocess commands. The transform is searched for as './<trfname>'
    and then as '<trfname>', where <trfname> is the last path component of job.transformation. If neither is
    found, the last ';'-separated command (and a trailing ';') is removed instead.

    :param cmd: payload command (string).
    :return: setup (string).
    """

    def cut_str_from(_cmd, _str):
        """
        Return the part of _cmd before the first occurrence of _str (_str must occur in _cmd).
        """
        return _cmd[:_cmd.find(_str)]

    def cut_str_from_last_semicolon(_cmd):
        """
        Remove the last ';'-separated command (and a trailing ';', if any) from _cmd.
        NOTE: this will not work if jobParams also contain ;
        """
        _cmd = _cmd.strip()
        _cmd = _cmd[:-1] if _cmd.endswith(';') else _cmd
        last_bit = _cmd.split(';')[-1].strip()
        if not last_bit:
            return _cmd
        # remove only the last occurrence: str.replace() would also strip identical text earlier in the
        # command (e.g. 'run; run' -> '; '); last_bit contains no ';', so its last occurrence is in the
        # final segment
        index = _cmd.rfind(last_bit)
        return _cmd[:index] + _cmd[index + len(last_bit):]

    # the transform name is the last path component (the whole string when there is no '/')
    trfname = self.__job.transformation.rsplit('/', 1)[-1]
    _trf = './' + trfname

    if _trf in cmd:
        setup = cut_str_from(cmd, _trf)
    elif trfname in cmd:
        setup = cut_str_from(cmd, trfname)
    else:
        setup = cut_str_from_last_semicolon(cmd)

    return setup
0444
def wait_graceful(self, args, proc):
    """
    Wait for the payload process to finish, or stop it when a graceful stop is requested.

    The process is polled about once per second. If args.graceful_stop is set while waiting, SIGTERM is sent to
    the process group of the payload, and 3 s later proc.kill() is called.

    :param args: Pilot arguments object (provides the graceful_stop event).
    :param proc: process object (with pid, poll() and kill(), e.g. subprocess.Popen).
    :return: exit code (int), or None if the process was stopped before an exit code was seen.
    """

    breaker = False
    exit_code = None
    iteration = 0
    while True:
        time.sleep(0.1)

        iteration += 1
        for _ in range(60):
            if args.graceful_stop.is_set():
                breaker = True
                logger.info('breaking -- sending SIGTERM pid=%s', proc.pid)
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
                except OSError as error:
                    # the process may have exited since the last poll() (e.g. ProcessLookupError)
                    logger.warning('failed to send SIGTERM to pid=%s: %s', proc.pid, error)
                break
            exit_code = proc.poll()
            if exit_code is not None:
                break
            time.sleep(1)
        if breaker:
            logger.info('breaking -- sleep 3s before sending SIGKILL pid=%s', proc.pid)
            time.sleep(3)
            try:
                proc.kill()
            except OSError as error:
                logger.warning('failed to send SIGKILL to pid=%s: %s', proc.pid, error)
            break

        exit_code = proc.poll()

        if iteration % 10 == 0:
            logger.info('running: iteration=%d pid=%s exit_code=%s', iteration, proc.pid, exit_code)
        if exit_code is not None:
            break

    return exit_code
0490
def get_payload_command(self, job):
    """
    Return the payload command string from the user specific code.

    If the user code raises a PilotException, the post-setup time stamp is recorded, the error code is added to
    the job and copied to the traces, and an empty string is returned.

    :param job: job object.
    :return: command (string).
    """
    try:
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user],
                          0)
        return user.get_payload_command(job)
    except PilotException as error:
        self.post_setup(job)
        import traceback
        logger.error(traceback.format_exc())
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())
        self.__traces.pilot['error_code'] = job.piloterrorcodes[0]
        logger.fatal(
            'could not define payload command (traces error set to: %d)', self.__traces.pilot['error_code'])
        return ""
0516
def run_preprocess(self, job):
    """
    Run the preprocess command (if any), prefixed with the payload setup.

    Exit codes 160-162 are special (the HPO processing loop should end) and only produce a warning here;
    any other non-zero exit code records PREPROCESSFAILURE on the job.

    :param job: job object.
    :return: exit code of the preprocess (int); 0 if there is nothing to run.
    """
    try:
        cmd_before_payload = self.utility_before_payload(job)
    except Exception as error:
        logger.error(error)
        raise error

    if not cmd_before_payload:
        return 0

    cmd_before_payload = job.setup + cmd_before_payload
    logger.info("\n\npreprocess execution command:\n\n%s\n", cmd_before_payload)
    exit_code = self.execute_utility_command(cmd_before_payload, job, 'preprocess')

    loop_end_messages = {
        160: 'no more HP points - time to abort processing loop',
        161: 'no more HP points but at least one point was processed - time to abort processing loop',
        162: 'loop count reached the limit - time to abort processing loop',
    }
    if exit_code in loop_end_messages:
        logger.warning(loop_end_messages[exit_code])
    elif exit_code:
        # NOTE(review): execute_utility_command() already adds PREPROCESSFAILURE for this case - confirm that
        # add_error_code() tolerates the duplicate
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PREPROCESSFAILURE)
        logger.fatal('cannot continue since preprocess failed: exit_code=%d', exit_code)
    else:
        # make the file named by containerOptions['containerExec'] (if present in the workdir) executable
        path = os.path.join(job.workdir, job.containeroptions.get('containerExec', 'does_not_exist'))
        if os.path.exists(path):
            logger.debug('chmod 0o755: %s', path)
            os.chmod(path, 0o755)

    return exit_code
0556
def run(self):
    """
    Run all payload processes (including pre- and post-processes, and utilities).

    In the case of HPO jobs, this function will loop over all processes until the preprocess returns one of the
    special exit codes 160-162 (or the payload fails).

    :return: exit code (int).
    """

    self.pre_setup(self.__job)

    cmd = self.get_payload_command(self.__job)
    # the setup part of the payload command is prepended to the pre/postprocess commands
    self.__job.setup = self.extract_setup(cmd)
    self.post_setup(self.__job)

    iteration = 0
    while True:

        logger.info('payload iteration loop #%d', iteration + 1)
        os.environ['PILOT_EXEC_ITERATION_COUNT'] = '%s' % iteration
        show_memory_usage()

        # the preprocess may update the job parameters, in which case the payload command must follow
        jobparams_pre = self.__job.jobparams
        exit_code = self.run_preprocess(self.__job)
        jobparams_post = self.__job.jobparams
        if exit_code:
            if 160 <= exit_code <= 162:
                # special preprocess exit codes end the processing loop; they are not a failure
                exit_code = 0
                logger.debug('reset outdata since further output should not be expected after preprocess exit')
                self.__job.outdata = []
            break
        if jobparams_pre != jobparams_post:
            logger.debug('jobparams were updated by utility_before_payload()')
            # NOTE(review): if jobparams_pre is empty, str.replace() would insert jobparams_post between every
            # character of cmd - confirm jobparams is never empty here
            cmd = cmd.replace(jobparams_pre, jobparams_post)

        # in Harvester Horovod mode there is no main payload to execute
        horovod_mode = os.environ.get('HARVESTER_HOROVOD', '') != ''
        proc = None if horovod_mode else self.run_payload(self.__job, cmd, self.__out, self.__err)

        if proc is None:
            if not horovod_mode:
                # the payload could not be started
                break
            logger.info('No need to execute any main payload')
            # there is no payload that could have failed; 'state' must also be defined for the HPO check below
            state = 'finished'
            exit_code = self.run_utility_after_payload_finished(exit_code, state, UTILITY_AFTER_PAYLOAD_FINISHED2)
            self.post_payload(self.__job)
        else:
            # the payload is running - update the server (its response may change the job state)
            send_state(self.__job, self.__args, self.__job.state)

            if self.__job.state == 'failed':
                logger.warning('job state is \'failed\' - abort payload and run()')
                kill_processes(proc.pid)
                break

            # optional secondary command (e.g. a coprocess) running alongside the payload
            proc_co = None
            utility_cmd = self.get_utility_command(order=UTILITY_AFTER_PAYLOAD_STARTED2)
            if utility_cmd:
                logger.debug('starting utility command: %s', utility_cmd)
                label = 'coprocess' if 'coprocess' in utility_cmd else None
                proc_co = self.run_command(utility_cmd, label=label)

            logger.info('will wait for graceful exit')
            exit_code = self.wait_graceful(self.__args, proc)

            # a KILLPAYLOAD error is not treated as a payload failure
            if errors.KILLPAYLOAD in self.__job.piloterrorcodes:
                logger.debug('ignoring KILLPAYLOAD error')
                self.__job.piloterrorcodes, self.__job.piloterrordiags = errors.remove_error_code(errors.KILLPAYLOAD,
                                                                                                   pilot_error_codes=self.__job.piloterrorcodes,
                                                                                                   pilot_error_diags=self.__job.piloterrordiags)
                exit_code = 0
                state = 'finished'
            else:
                state = 'finished' if exit_code == 0 else 'failed'
            set_pilot_state(job=self.__job, state=state)
            logger.info('\n\nfinished pid=%s exit_code=%s state=%s\n', proc.pid, exit_code, self.__job.state)

            # stop the secondary command, if one was started
            if proc_co:
                logger.debug('stopping utility command: %s', utility_cmd)
                kill_processes(proc_co.pid)

            if exit_code is None:
                logger.warning('detected unset exit_code from wait_graceful - reset to -1')
                exit_code = -1

            for order in [UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2]:
                exit_code = self.run_utility_after_payload_finished(exit_code, state, order)

            self.post_payload(self.__job)

            # stop any running utilities
            if self.__job.utilities != {}:
                self.stop_utilities()

        if self.__job.is_hpo and state != 'failed':
            iteration += 1
        else:
            break

    return exit_code
0683
def run_utility_after_payload_finished(self, exit_code, state, order):
    """
    Run the utility command (if any) defined for the given order after the main payload has finished.

    In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache).
    The payload setup is prepended to the command only when job.postprocess is set and state is not 'failed'.
    The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2

    :param exit_code: transform exit code (int).
    :param state: payload state; finished/failed (string).
    :param order: constant used for utility selection (constant).
    :return: exit code (int): the utility's exit code replaces a zero exit_code unless the
             utility's failure should be ignored; otherwise exit_code is returned unchanged.
    """
    utility_exit_code = 0
    try:
        cmd_after_payload, label, ignore_failure = self.utility_after_payload_finished(self.__job, order)
    except Exception as error:
        logger.error(error)
        ignore_failure = False
    else:
        if cmd_after_payload:
            if self.__job.postprocess and state != 'failed':
                cmd_after_payload = self.__job.setup + cmd_after_payload
            logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload)
            utility_exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label)

    propagate = utility_exit_code and not exit_code and not ignore_failure
    return utility_exit_code if propagate else exit_code
0718
def stop_utilities(self):
    """
    Stop any running utilities registered in job.utilities.

    Each utility with a process handle gets the kill signal chosen by the user code sent to its process group
    (errors are logged and ignored), after which the user code's post_utility_command_action() is called.

    :return:
    """
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()

    # iterate over a snapshot of the keys, but look each entry up freshly
    for utcmd in list(self.__job.utilities):
        utproc = self.__job.utilities[utcmd][0]
        if not utproc:
            continue

        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
        sig = user.get_utility_command_kill_signal(utcmd)
        logger.info("stopping process \'%s\' with signal %d", utcmd, sig)
        try:
            os.killpg(os.getpgid(utproc.pid), sig)
        except Exception as error:
            logger.warning('exception caught: %s (ignoring)', error)

        user.post_utility_command_action(utcmd, self.__job)
0740
def rename_log_files(self, iteration):
    """
    Rename the pre/postprocess stdout/stderr files by appending the iteration number to their names
    (e.g. preprocess_stdout.txt -> preprocess_stdout.txt2).

    The stored names are bare file names set by write_utility_output(), which writes the files into the job work
    directory; they are therefore resolved relative to job.workdir rather than the current directory. Names that
    were never set (empty) are skipped.

    :param iteration: iteration number appended to the file names (int).
    :return:
    """
    names = [self.__preprocess_stdout_name, self.__preprocess_stderr_name,
             self.__postprocess_stdout_name, self.__postprocess_stderr_name]
    for name in names:
        if not name:
            # no output recorded for this step; joining '' would otherwise yield the workdir itself
            continue
        # NOTE(review): write_file(..., unique=True) might pick a different file name if the file already
        # exists - confirm the stored name always matches the file on disk
        path = os.path.join(self.__job.workdir, name)
        if os.path.exists(path):
            os.rename(path, path + '%d' % iteration)
        else:
            logger.warning('cannot rename %s since it does not exist', path)
0754 logger.warning('cannot rename %s since it does not exist', name)