File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import os
0013 import time
0014 from subprocess import PIPE
0015 from glob import glob
0016
0017 from pilot.common.errorcodes import ErrorCodes
0018 from pilot.util.auxiliary import set_pilot_state, show_memory_usage
0019 from pilot.util.config import config
0020 from pilot.util.container import execute
0021 from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file
0022 from pilot.util.loopingjob import looping_job
0023 from pilot.util.math import convert_mb_to_b, human2bytes
0024 from pilot.util.parameters import convert_to_int, get_maximum_input_sizes
0025 from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes
0026 from pilot.util.workernode import get_local_disk_space, check_hz
0027
0028 import logging
0029 logger = logging.getLogger(__name__)
0030
0031 errors = ErrorCodes()
0032
0033
def job_monitor_tasks(job, mt, args):
    """
    Perform the tasks for the job monitoring.
    The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1
    minute).

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return: exit code (int), diagnostics (string).
    """

    # nothing to monitor unless the payload is actually running
    if job.state != 'running':
        return 0, ""

    now = int(time.time())

    # confirm the clock frequency, then update the CPU consumption time
    check_hz()
    try:
        cpuconsumptiontime = get_current_cpu_consumption_time(job.pid)
    except Exception as error:
        diagnostics = "Exception caught: %s" % error
        logger.warning(diagnostics)
        return get_exception_error_code(diagnostics), diagnostics

    job.cpuconsumptiontime = int(round(cpuconsumptiontime))
    job.cpuconversionfactor = 1.0
    logger.info('CPU consumption time for pid=%d: %f (rounded to %d)', job.pid, cpuconsumptiontime, job.cpuconsumptiontime)

    # report the number of cores used by the payload (via job metrics)
    set_number_used_cores(job)

    # memory usage verification
    exit_code, diagnostics = verify_memory_usage(now, mt, job)
    if exit_code != 0:
        return exit_code, diagnostics

    # display OOM scores for pilot and payload
    display_oom_info(job.pid)

    # check for a kill-payload instruction file (e.g. placed by Raythena)
    exit_code, diagnostics = should_abort_payload(now, mt)
    if exit_code != 0:
        return exit_code, diagnostics

    # proxy verification (optional)
    if args.verify_proxy:
        exit_code, diagnostics = verify_user_proxy(now, mt)
        if exit_code != 0:
            return exit_code, diagnostics

    # looping-job detection
    exit_code, diagnostics = verify_looping_job(now, mt, job)
    if exit_code != 0:
        return exit_code, diagnostics

    # disk space / work dir / output file size checks
    exit_code, diagnostics = verify_disk_usage(now, mt, job)
    if exit_code != 0:
        return exit_code, diagnostics

    # number of running (child) processes
    if job.pid:
        exit_code, diagnostics = verify_running_processes(now, mt, job.pid)
        if exit_code != 0:
            return exit_code, diagnostics

    # make sure any utility commands (e.g. memory monitor) are still alive
    if job.utilities != {}:
        utility_monitor(job)

    return 0, ""
0115
0116
def display_oom_info(payload_pid):
    """
    Display OOM process info.

    Logs the kernel OOM-killer scores for both the pilot process and the payload.

    :param payload_pid: payload pid (int).
    """

    pilot_score = get_score(os.getpid())
    payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN'
    logger.info('oom_score(pilot) = %s, oom_score(payload) = %s', pilot_score, payload_score)
0127
0128
def get_score(pid):
    """
    Get the OOM process score.

    Reads /proc/<pid>/oom_score; returns 'UNKNOWN' if the file cannot be read.

    :param pid: process id (int).
    :return: score (string).
    """

    try:
        score = '%s' % read_file('/proc/%d/oom_score' % pid)
    except Exception as error:
        logger.warning('caught exception reading oom_score: %s', error)
        return 'UNKNOWN'

    # strip a single trailing newline from the proc file content
    return score[:-1] if score.endswith('\n') else score
0147
0148
def get_exception_error_code(diagnostics):
    """
    Identify a suitable error code to a given exception.

    :param diagnostics: exception diagnostics (string).
    :return: exit_code
    """

    import traceback
    logger.warning(traceback.format_exc())

    # map known diagnostic substrings to their pilot error codes
    known_patterns = (
        ("Resource temporarily unavailable", errors.RESOURCEUNAVAILABLE),
        ("No such file or directory", errors.STATFILEPROBLEM),
        ("No such process", errors.NOSUCHPROCESS),
    )
    for pattern, code in known_patterns:
        if pattern in diagnostics:
            return code

    return errors.GENERALCPUCALCPROBLEM
0169
0170
def set_number_used_cores(job):
    """
    Set the number of cores used by the payload.
    The number of actual used cores is reported with job metrics (if set).

    :param job: job object.
    :return:
    """

    # delegate to the experiment-specific cpu module (selected via PILOT_USER)
    user = os.environ.get('PILOT_USER', 'generic').lower()
    cpu_module = __import__('pilot.user.%s.cpu' % user, globals(), locals(), [user], 0)
    cpu_module.set_core_counts(job)
0183
0184
def verify_memory_usage(current_time, mt, job):
    """
    Verify the memory usage (optional).
    Note: this function relies on a stand-alone memory monitor tool that may be executed by the Pilot.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    show_memory_usage()

    user = os.environ.get('PILOT_USER', 'generic').lower()
    memory = __import__('pilot.user.%s.memory' % user, globals(), locals(), [user], 0)

    if not memory.allow_memory_usage_verifications():
        return 0, ""

    # only verify at the configured interval
    interval = convert_to_int(config.Pilot.memory_usage_verification_time, default=60)
    if current_time - mt.get('ct_memory') <= interval:
        return 0, ""

    try:
        exit_code, _ = memory.memory_usage(job)
    except Exception as error:
        logger.warning('caught exception: %s', error)
        exit_code = -1

    if exit_code != 0:
        # parsing problems are non-fatal for the job
        logger.warning('ignoring failure to parse memory monitor output')
    else:
        # record the time of this successful verification
        mt.update('ct_memory')

    return 0, ""
0221
0222
def should_abort_payload(current_time, mt):
    """
    Should the pilot abort the payload?
    In the case of Raythena, the Driver is monitoring the time to end jobs and may decide
    that the pilot should abort the payload. Internally, this is achieved by letting the Actors
    know it's time to end, and they in turn contacts the pilot by placing a 'pilot_kill_payload' file
    in the run directory.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :return: exit code (int), error diagnostics (string).
    """

    # only start looking for the kill file after the configured grace period
    killing_time = convert_to_int(config.Pilot.kill_instruction_time, default=600)
    if current_time - mt.get('ct_kill') <= killing_time:
        return 0, ""

    kill_file = os.path.join(os.environ.get('PILOT_HOME'), config.Pilot.kill_instruction_filename)
    if os.path.exists(kill_file):
        logger.info('pilot encountered payload kill instruction file - will abort payload')
        return errors.KILLPAYLOAD, ""

    return 0, ""
0245
0246
def verify_user_proxy(current_time, mt):
    """
    Verify the user proxy.
    This function is called by the job_monitor_tasks() function.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :return: exit code (int), error diagnostics (string).
    """

    user = os.environ.get('PILOT_USER', 'generic').lower()
    userproxy = __import__('pilot.user.%s.proxy' % user, globals(), locals(), [user], 0)

    # is it time to verify the proxy?
    interval = convert_to_int(config.Pilot.proxy_verification_time, default=600)
    if current_time - mt.get('ct_proxy') > interval:
        exit_code, diagnostics = userproxy.verify_proxy(test=False)
        if exit_code != 0:
            return exit_code, diagnostics
        # proxy is fine - record the time of this check
        mt.update('ct_proxy')

    return 0, ""
0272
0273
def verify_looping_job(current_time, mt, job):
    """
    Verify that the job is not looping.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    if not job.looping_check:
        logger.debug('looping check not desired')
        return 0, ""

    # only run the (expensive) looping check at the configured interval
    interval = convert_to_int(config.Pilot.looping_verification_time, default=600)
    if current_time - mt.get('ct_looping') <= interval:
        return 0, ""

    try:
        exit_code, diagnostics = looping_job(job, mt)
    except Exception as error:
        diagnostics = 'exception caught in looping job algorithm: %s' % error
        logger.warning(diagnostics)
        # a missing module indicates a broken ("black hole") worker node
        code = errors.BLACKHOLE if "No module named" in diagnostics else errors.UNKNOWNEXCEPTION
        return code, diagnostics

    if exit_code != 0:
        return exit_code, diagnostics

    # record the time of this successful looping check
    mt.update('ct_looping')

    return 0, ""
0310
0311
def verify_disk_usage(current_time, mt, job):
    """
    Verify the disk usage.
    The function checks 1) payload stdout size, 2) local space, 3) work directory size, 4) output file sizes.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    # only verify at the configured interval
    interval = convert_to_int(config.Pilot.disk_space_verification_time, default=300)
    if current_time - mt.get('ct_diskspace') <= interval:
        return 0, ""

    # run the individual checks in order; abort on the first failure
    checks = (
        lambda: check_payload_stdout(job),
        lambda: check_local_space(initial=False),
        lambda: check_work_dir(job),
        lambda: check_output_file_sizes(job),
    )
    for check in checks:
        exit_code, diagnostics = check()
        if exit_code != 0:
            return exit_code, diagnostics

    # record the time of this successful disk space verification
    mt.update('ct_diskspace')

    return 0, ""
0351
0352
def verify_running_processes(current_time, mt, pid):
    """
    Verify the number of running processes.
    The function sets the environmental variable PILOT_MAXNPROC to the maximum number of found (child) processes
    corresponding to the main payload process id.
    The function does not return an error code (always returns exit code 0).

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param pid: payload process id (int).
    :return: exit code (int), error diagnostics (string).
    """

    nproc_env = 0

    # only check at the configured interval
    interval = convert_to_int(config.Pilot.process_verification_time, default=300)
    if current_time - mt.get('ct_process') > interval:
        nproc = get_number_of_child_processes(pid)
        try:
            nproc_env = int(os.environ.get('PILOT_MAXNPROC', 0))
        except Exception as error:
            logger.warning('failed to convert PILOT_MAXNPROC to int: %s', error)
        else:
            if nproc > nproc_env:
                # keep track of the maximum number of processes seen so far
                os.environ['PILOT_MAXNPROC'] = str(nproc)

        if nproc_env > 0:
            logger.info('maximum number of monitored processes: %d', nproc_env)

    return 0, ""
0385
0386
def utility_monitor(job):
    """
    Make sure that any utility commands are still running.
    In case a utility tool has crashed, this function may restart the process.
    The function is used by the job monitor thread.

    :param job: job object.
    :return:
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    usercommon = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)

    # loop over all running utilities (copy keys since the dict may be updated)
    for utcmd in list(job.utilities.keys()):

        utproc = job.utilities[utcmd][0]
        if not utproc.poll() is None:
            # poll() returning non-None means the subprocess has ended unexpectedly
            if job.state == 'finished' or job.state == 'failed' or job.state == 'stageout':
                logger.debug('no need to restart utility command since payload has finished running')
                continue

            utility_subprocess_launches = job.utilities[utcmd][1]
            if utility_subprocess_launches <= 5:
                # limited number of restart attempts
                logger.warning('detected crashed utility subprocess - will restart it')
                utility_command = job.utilities[utcmd][2]
                try:
                    proc1 = execute(utility_command, workdir=job.workdir, returnproc=True, usecontainer=False,
                                    stdout=PIPE, stderr=PIPE, cwd=job.workdir, queuedata=job.infosys.queuedata)
                except Exception as error:
                    logger.error('could not execute: %s', error)
                else:
                    # store the new process handle and bump the launch counter
                    job.utilities[utcmd] = [proc1, utility_subprocess_launches + 1, utility_command]
            else:
                logger.warning('detected crashed utility subprocess - too many restarts, will not restart %s again', utcmd)
        else:
            # subprocess is alive - verify that its output file exists
            # (the selector option adds a substring to the output file name)
            filename = usercommon.get_utility_command_output_filename(utcmd, selector=True)
            path = os.path.join(job.workdir, filename)
            if not os.path.exists(path):
                logger.warning('file: %s does not exist', path)

        # NOTE(review): pause between utility checks; the original listing's indentation was
        # lost, so the exact nesting level of this sleep should be confirmed against upstream
        time.sleep(10)
0435
0436
def get_local_size_limit_stdout(bytes=True):
    """
    Return a proper value for the local size limit for payload stdout (from config file).

    :param bytes: boolean (if True, convert kB to Bytes).
    :return: size limit (int).
    """

    default_limit_kb = 2097152  # fallback limit in kB

    try:
        limit = int(config.Pilot.local_size_limit_stdout)
    except Exception as error:
        limit = default_limit_kb
        logger.warning('bad value in config for local_size_limit_stdout: %s (will use value: %d kB)', error, limit)

    # convert from kB to B if requested
    return limit * 1024 if bytes else limit
0456
0457
def check_payload_stdout(job):
    """
    Check the size of the payload stdout.

    If a payload stdout/log file exceeds the local size limit, the job is failed
    (state set to failed, error code added, processes killed) and any lingering
    input files are removed from the work directory.

    :param job: job object.
    :return: exit code (int), diagnostics (string).
    """

    exit_code = 0
    diagnostics = ""

    # any log files in the work dir
    file_list = glob(os.path.join(job.workdir, 'log.*'))

    # multi-job payloads produce one stdout per sub-job
    n_jobs = job.jobparams.count("\n") + 1
    for _i in range(n_jobs):
        _stdout = config.Payload.payloadstdout
        if n_jobs > 1:
            _stdout = _stdout.replace(".txt", "_%d.txt" % (_i + 1))
        file_list.append(os.path.join(job.workdir, _stdout))

    # also any temporary stdout files in the payload workDir
    tmp_list = glob(os.path.join(job.workdir, 'workDir/tmp.stdout.*'))
    if tmp_list:
        file_list += tmp_list
    logger.debug('file list=%s', str(file_list))  # lazy logging args

    # hoisted: the limit does not change per file
    localsizelimit_stdout = get_local_size_limit_stdout()

    for filename in file_list:
        logger.debug('check_payload_stdout: filename=%s', filename)
        if "job.log.tgz" in filename:
            logger.info("skipping file size check of file (%s) since it is a special log file", filename)
            continue

        if not os.path.exists(filename):
            logger.info("skipping file size check of payload stdout file (%s) since it has not been created yet", filename)
            continue

        try:
            fsize = os.path.getsize(filename)
        except Exception as error:
            logger.warning("could not read file size of %s: %s", filename, error)
            continue

        if fsize > localsizelimit_stdout:
            exit_code = errors.STDOUTTOOBIG
            diagnostics = "Payload stdout file too big: %d B (larger than limit %d B)" % \
                          (fsize, localsizelimit_stdout)
            logger.warning(diagnostics)

            # fail the job and kill the payload
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
            kill_processes(job.pid)

            # remove any lingering input files from the work dir
            lfns, _ = job.get_lfns_and_guids()
            if lfns:
                # bug fix: do not assign the remove_files() return value to exit_code -
                # that clobbered the STDOUTTOOBIG error code returned to the caller
                remove_files(job.workdir, lfns)
        else:
            logger.info("payload log (%s) within allowed size limit (%d B): %d B", os.path.basename(filename), localsizelimit_stdout, fsize)

    return exit_code, diagnostics
0529
0530
def check_local_space(initial=True):
    """
    Do we have enough local disk space left to run the job?
    For the initial local space check, the Pilot will require 2 GB of free space, but during running
    this can be lowered to 1 GB.

    :param initial: True means a 2 GB limit, False means a 1 GB limit (optional Boolean)
    :return: pilot error code (0 if success, NOLOCALSPACE if failure)
    """

    ec = 0
    diagnostics = ""

    # measure remaining disk space in the current work directory
    cwd = os.getcwd()
    logger.debug('checking local space on %s', cwd)
    spaceleft = convert_mb_to_b(get_local_disk_space(cwd))

    # the limit depends on whether this is the initial check or a check during running
    limit_setting = config.Pilot.free_space_limit if initial else config.Pilot.free_space_limit_running
    free_space_limit = human2bytes(limit_setting)

    if spaceleft > free_space_limit:
        logger.info('sufficient remaining disk space (%d B)', spaceleft)
    else:
        diagnostics = 'too little space left on local disk to run job: %d B (need > %d B)' %\
                      (spaceleft, free_space_limit)
        ec = errors.NOLOCALSPACE
        logger.warning(diagnostics)

    return ec, diagnostics
0559
0560
def check_work_dir(job):
    """
    Check the size of the work directory.
    The function also updates the workdirsizes list in the job object.

    If the work directory exceeds the maximum allowed size, the job is failed
    (state set to failed, error code added, processes killed) and any lingering
    input files are removed from the work directory.

    :param job: job object.
    :return: exit code (int), error diagnostics (string)
    """

    exit_code = 0
    diagnostics = ""

    # guard clause replaces the original's duplicated os.path.exists(job.workdir)
    # checks (the inner else branch was effectively unreachable)
    if not os.path.exists(job.workdir):
        logger.warning('skipping size check of workdir since it has not been created yet')
        return exit_code, diagnostics

    maxwdirsize = get_max_allowed_work_dir_size(job.infosys.queuedata)
    workdirsize = get_disk_usage(job.workdir)

    if workdirsize > maxwdirsize:
        exit_code = errors.USERDIRTOOLARGE
        diagnostics = "work directory (%s) is too large: %d B (must be < %d B)" % \
                      (job.workdir, workdirsize, maxwdirsize)
        logger.fatal("%s", diagnostics)

        # dump a full directory listing for post-mortem debugging
        cmd = 'ls -altrR %s' % job.workdir
        _ec, stdout, stderr = execute(cmd, mute=True)
        logger.info("%s: %s", cmd + '\n', stdout)

        # fail the job and kill the payload
        set_pilot_state(job=job, state="failed")
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
        kill_processes(job.pid)

        # remove any lingering input files from the work dir (guids not needed)
        lfns, _ = job.get_lfns_and_guids()
        if lfns:
            remove_files(job.workdir, lfns)

        # re-measure after the cleanup so the stored value reflects reality
        workdirsize = get_disk_usage(job.workdir)
    else:
        logger.info("size of work directory %s: %d B (within %d B limit)", job.workdir, workdirsize, maxwdirsize)

    # store the measurement (max value is later reported with job metrics)
    if workdirsize > 0:
        job.add_workdir_size(workdirsize)

    return exit_code, diagnostics
0616
0617
def get_max_allowed_work_dir_size(queuedata):
    """
    Return the maximum allowed size of the work directory.

    The limit comes from the schedconfig maximum input size (plus a 10% grace
    margin); if that lookup fails, a fallback of max input size + local stdout
    size limit is used.

    :param queuedata: job.infosys.queuedata object.
    :return: max allowed work dir size in Bytes (int).
    """

    try:
        maxwdirsize = convert_mb_to_b(get_maximum_input_sizes())  # from schedconfig, given in MB
    except Exception as error:
        # bug fix: get_max_input_size() requires the queuedata argument
        max_input_size = get_max_input_size(queuedata)
        # bug fix: the config value is a string (kB) - convert before arithmetic,
        # otherwise the multiplication repeats the string instead of scaling it
        local_size_limit_stdout = convert_to_int(config.Pilot.local_size_limit_stdout, default=2097152) * 1024
        maxwdirsize = max_input_size + local_size_limit_stdout
        logger.info("work directory size check will use %d B as a max limit (maxinputsize [%d B] + local size limit for"
                    " stdout [%d B])", maxwdirsize, max_input_size, local_size_limit_stdout)
        logger.warning('conversion caught exception: %s', error)
    else:
        # add a 10% grace margin to the schedconfig limit
        margin = 10.0
        maxwdirsize = int(maxwdirsize * (1 + margin / 100.0))
        logger.info("work directory size check will use %d B as a max limit (10%% grace limit added)", maxwdirsize)

    return maxwdirsize
0641
0642
def get_max_input_size(queuedata, megabyte=False):
    """
    Return a proper maxinputsize value.

    Uses schedconfig's maxwdir when set; otherwise falls back to the pilot
    default of 14 GB.

    :param queuedata: job.infosys.queuedata object.
    :param megabyte: return results in MB (Boolean).
    :return: max input size (int).
    """

    # pilot defaults (14 GB)
    default_b = 14 * 1024 * 1024 * 1024
    default_mb = 14 * 1024
    fallback = default_mb if megabyte else default_b

    _maxinputsize = queuedata.maxwdir
    if _maxinputsize != "":
        try:
            # schedconfig value is in MB; scale to bytes unless MB was requested
            _maxinputsize = int(_maxinputsize) if megabyte else int(_maxinputsize) * 1024 * 1024
        except Exception as error:
            logger.warning("schedconfig.maxinputsize: %s", error)
            _maxinputsize = fallback
    else:
        _maxinputsize = fallback

    if megabyte:
        logger.info("max input size = %d MB (pilot default)", _maxinputsize)
    else:
        logger.info("Max input size = %d B (pilot default)", _maxinputsize)

    return _maxinputsize
0679
0680
def check_output_file_sizes(job):
    """
    Are the output files within the allowed size limits?

    :param job: job object.
    :return: exit code (int), error diagnostics (string)
    """

    exit_code = 0
    diagnostics = ""

    # hoisted: the limit is identical for every output file
    max_fsize = human2bytes(config.Pilot.maximum_output_file_size)

    # loop over all known output files
    for fspec in job.outdata:
        path = os.path.join(job.workdir, fspec.lfn)
        if not os.path.exists(path):
            logger.info('output file size check: skipping output file %s since it does not exist', path)
            continue

        fsize = get_local_file_size(path)
        if fsize is None:
            # bug fix: previously a None size fell into the error branch and was
            # formatted with %d, raising TypeError instead of reporting the problem
            logger.warning('output file size check: could not get size of %s', path)
        elif fsize < max_fsize:
            logger.info('output file %s is within allowed size limit (%d B < %d B)', path, fsize, max_fsize)
        else:
            exit_code = errors.OUTPUTFILETOOLARGE
            diagnostics = 'output file %s is not within allowed size limit (%d B > %d B)' % (path, fsize, max_fsize)
            logger.warning(diagnostics)

    return exit_code, diagnostics