Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 
0010 # This module contains implementations of job monitoring tasks
0011 
0012 import os
0013 import time
0014 from subprocess import PIPE
0015 from glob import glob
0016 
0017 from pilot.common.errorcodes import ErrorCodes
0018 from pilot.util.auxiliary import set_pilot_state, show_memory_usage
0019 from pilot.util.config import config
0020 from pilot.util.container import execute
0021 from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file
0022 from pilot.util.loopingjob import looping_job
0023 from pilot.util.math import convert_mb_to_b, human2bytes
0024 from pilot.util.parameters import convert_to_int, get_maximum_input_sizes
0025 from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes
0026 from pilot.util.workernode import get_local_disk_space, check_hz
0027 
0028 import logging
0029 logger = logging.getLogger(__name__)
0030 
0031 errors = ErrorCodes()
0032 
0033 
def job_monitor_tasks(job, mt, args):
    """
    Perform the tasks for the job monitoring.
    The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1
    minute).

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return: exit code (int), diagnostics (string).
    """

    now = int(time.time())

    # timing and resource checks only make sense while the payload is running
    # (avoids an update after the job has finished)
    if job.state == 'running':
        # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU)
        check_hz()
        try:
            cpu_time = get_current_cpu_consumption_time(job.pid)
        except Exception as error:
            diagnostics = "Exception caught: %s" % error
            logger.warning(diagnostics)
            return get_exception_error_code(diagnostics), diagnostics
        job.cpuconsumptiontime = int(round(cpu_time))
        job.cpuconversionfactor = 1.0
        logger.info('CPU consumption time for pid=%d: %f (rounded to %d)', job.pid, cpu_time, job.cpuconsumptiontime)

        # check how many cores the payload is using
        set_number_used_cores(job)

        # check memory usage (optional) for jobs in running state
        exit_code, diagnostics = verify_memory_usage(now, mt, job)
        if exit_code:
            return exit_code, diagnostics

        # display OOM process info
        display_oom_info(job.pid)

    # remaining checks run regardless of job state, in the original order;
    # the first non-zero exit code aborts the monitoring pass
    checks = [lambda: should_abort_payload(now, mt)]
    if args.verify_proxy:
        # should the proxy be verified?
        checks.append(lambda: verify_user_proxy(now, mt))
    checks.append(lambda: verify_looping_job(now, mt, job))
    checks.append(lambda: verify_disk_usage(now, mt, job))
    if job.pid:
        # is it time to verify the number of running processes?
        checks.append(lambda: verify_running_processes(now, mt, job.pid))

    for check in checks:
        exit_code, diagnostics = check()
        if exit_code:
            return exit_code, diagnostics

    # make sure that any utility commands are still running
    if job.utilities:
        utility_monitor(job)

    return 0, ""
0115 
0116 
def display_oom_info(payload_pid):
    """
    Log the OOM scores of the pilot and the payload processes.

    :param payload_pid: payload pid (int).
    """

    score_payload = get_score(payload_pid) if payload_pid else 'UNKNOWN'
    score_pilot = get_score(os.getpid())
    logger.info('oom_score(pilot) = %s, oom_score(payload) = %s', score_pilot, score_payload)
0127 
0128 
def get_score(pid):
    """
    Return the OOM score for a given process, read from /proc.

    :param pid: process id (int).
    :return: score (string), 'UNKNOWN' on failure.
    """

    try:
        score = '%s' % read_file('/proc/%d/oom_score' % pid)
    except Exception as error:
        logger.warning('caught exception reading oom_score: %s', error)
        return 'UNKNOWN'

    # strip a single trailing newline, if present
    return score[:-1] if score.endswith('\n') else score
0147 
0148 
def get_exception_error_code(diagnostics):
    """
    Map exception diagnostics to a suitable pilot error code.

    :param diagnostics: exception diagnostics (string).
    :return: exit_code
    """

    import traceback
    logger.warning(traceback.format_exc())

    # the first matching substring determines the error code
    known_errors = (
        ("Resource temporarily unavailable", errors.RESOURCEUNAVAILABLE),
        ("No such file or directory", errors.STATFILEPROBLEM),
        ("No such process", errors.NOSUCHPROCESS),
    )
    for substring, code in known_errors:
        if substring in diagnostics:
            return code

    return errors.GENERALCPUCALCPROBLEM
0169 
0170 
def set_number_used_cores(job):
    """
    Set the number of cores used by the payload.
    The number of actual used cores is reported with job metrics (if set).

    :param job: job object.
    :return:
    """

    user = os.environ.get('PILOT_USER', 'generic').lower()
    cpu_module = __import__('pilot.user.%s.cpu' % user, globals(), locals(), [user], 0)  # Python 2/3
    cpu_module.set_core_counts(job)
0183 
0184 
def verify_memory_usage(current_time, mt, job):
    """
    Verify the memory usage (optional).
    Note: this function relies on a stand-alone memory monitor tool that may be executed by the Pilot.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    show_memory_usage()

    user = os.environ.get('PILOT_USER', 'generic').lower()
    memory = __import__('pilot.user.%s.memory' % user, globals(), locals(), [user], 0)  # Python 2/3

    if not memory.allow_memory_usage_verifications():
        return 0, ""

    # only run the check at the configured interval
    interval = convert_to_int(config.Pilot.memory_usage_verification_time, default=60)
    if current_time - mt.get('ct_memory') <= interval:
        return 0, ""

    # is the used memory within the allowed limit?
    try:
        ec, _ = memory.memory_usage(job)
    except Exception as error:
        logger.warning('caught exception: %s', error)
        ec = -1
    if ec != 0:
        # a parse failure of the memory monitor output is not fatal for the job
        logger.warning('ignoring failure to parse memory monitor output')
    else:
        # record when this check last ran successfully
        mt.update('ct_memory')

    return 0, ""
0221 
0222 
def should_abort_payload(current_time, mt):
    """
    Should the pilot abort the payload?
    In the case of Raythena, the Driver is monitoring the time to end jobs and may decide
    that the pilot should abort the payload. Internally, this is achieved by letting the Actors
    know it's time to end, and they in turn contacts the pilot by placing a 'pilot_kill_payload' file
    in the run directory.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :return: exit code (int), error diagnostics (string).
    """

    # is it time to look for the kill instruction file?
    killing_time = convert_to_int(config.Pilot.kill_instruction_time, default=600)
    if current_time - mt.get('ct_kill') > killing_time:
        # fall back to the current directory if PILOT_HOME is unset - os.path.join()
        # would otherwise raise a TypeError on None
        path = os.path.join(os.environ.get('PILOT_HOME', '.'), config.Pilot.kill_instruction_filename)
        if os.path.exists(path):
            logger.info('pilot encountered payload kill instruction file - will abort payload')
            return errors.KILLPAYLOAD, ""  # note, this is not an error

    return 0, ""
0245 
0246 
def verify_user_proxy(current_time, mt):
    """
    Verify the user proxy.
    This function is called by the job_monitor_tasks() function.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :return: exit code (int), error diagnostics (string).
    """

    user = os.environ.get('PILOT_USER', 'generic').lower()
    proxy_module = __import__('pilot.user.%s.proxy' % user, globals(), locals(), [user], 0)  # Python 2/3

    # is it time to verify the proxy?
    interval = convert_to_int(config.Pilot.proxy_verification_time, default=600)
    if current_time - mt.get('ct_proxy') > interval:
        # is the proxy still valid?
        exit_code, diagnostics = proxy_module.verify_proxy(test=False)  # use test=True to test expired proxy
        if exit_code != 0:
            return exit_code, diagnostics
        # remember when the proxy was last verified
        mt.update('ct_proxy')

    return 0, ""
0272 
0273 
def verify_looping_job(current_time, mt, job):
    """
    Verify that the job is not looping.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    # only perform looping job check if desired
    if not job.looping_check:
        logger.debug('looping check not desired')
        return 0, ""

    # only run the check at the configured interval
    interval = convert_to_int(config.Pilot.looping_verification_time, default=600)
    if current_time - mt.get('ct_looping') <= interval:
        return 0, ""

    # is the job looping?
    try:
        exit_code, diagnostics = looping_job(job, mt)
    except Exception as error:
        diagnostics = 'exception caught in looping job algorithm: %s' % error
        logger.warning(diagnostics)
        # a missing module suggests a broken (black hole) worker node
        code = errors.BLACKHOLE if "No module named" in diagnostics else errors.UNKNOWNEXCEPTION
        return code, diagnostics
    if exit_code != 0:
        return exit_code, diagnostics

    # remember when the looping check last ran
    mt.update('ct_looping')

    return 0, ""
0310 
0311 
def verify_disk_usage(current_time, mt, job):
    """
    Verify the disk usage.
    The function checks 1) payload stdout size, 2) local space, 3) work directory size, 4) output file sizes.

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param job: job object.
    :return: exit code (int), error diagnostics (string).
    """

    # only run the checks at the configured interval
    interval = convert_to_int(config.Pilot.disk_space_verification_time, default=300)
    if current_time - mt.get('ct_diskspace') <= interval:
        return 0, ""

    # perform the individual checks in order; the first failure aborts the pass
    disk_checks = (
        lambda: check_payload_stdout(job),
        lambda: check_local_space(initial=False),
        lambda: check_work_dir(job),
        lambda: check_output_file_sizes(job),
    )
    for disk_check in disk_checks:
        exit_code, diagnostics = disk_check()
        if exit_code != 0:
            return exit_code, diagnostics

    # remember when the disk space was last checked
    mt.update('ct_diskspace')

    return 0, ""
0351 
0352 
def verify_running_processes(current_time, mt, pid):
    """
    Verify the number of running processes.
    The function sets the environmental variable PILOT_MAXNPROC to the maximum number of found (child) processes
    corresponding to the main payload process id.
    The function does not return an error code (always returns exit code 0).

    :param current_time: current time at the start of the monitoring loop (int).
    :param mt: measured time object.
    :param pid: payload process id (int).
    :return: exit code (int), error diagnostics (string).
    """

    max_nproc = 0

    interval = convert_to_int(config.Pilot.process_verification_time, default=300)
    if current_time - mt.get('ct_process') > interval:
        # time to check the number of processes
        nproc = get_number_of_child_processes(pid)
        try:
            max_nproc = int(os.environ.get('PILOT_MAXNPROC', 0))
        except Exception as error:
            logger.warning('failed to convert PILOT_MAXNPROC to int: %s', error)
        else:
            if nproc > max_nproc:
                # remember the largest number of processes seen so far
                os.environ['PILOT_MAXNPROC'] = str(nproc)

        if max_nproc > 0:
            logger.info('maximum number of monitored processes: %d', max_nproc)

    return 0, ""
0385 
0386 
def utility_monitor(job):
    """
    Make sure that any utility commands are still running.
    In case a utility tool has crashed, this function may restart the process.
    The function is used by the job monitor thread.

    :param job: job object.
    :return:
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    usercommon = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3

    # loop over all utilities
    # NOTE: job.utilities[utcmd] is a list: [process handle, number of launches, command string]
    for utcmd in list(job.utilities.keys()):  # E.g. utcmd = MemoryMonitor, Python 2/3

        # make sure the subprocess is still running
        utproc = job.utilities[utcmd][0]
        if not utproc.poll() is None:
            # poll() returned a value, i.e. the utility subprocess has exited
            if job.state == 'finished' or job.state == 'failed' or job.state == 'stageout':
                logger.debug('no need to restart utility command since payload has finished running')
                continue

            # if poll() returns anything but None it means that the subprocess has ended - which it
            # should not have done by itself
            utility_subprocess_launches = job.utilities[utcmd][1]
            if utility_subprocess_launches <= 5:
                # cap restarts to avoid an endless crash/restart cycle
                logger.warning('detected crashed utility subprocess - will restart it')
                utility_command = job.utilities[utcmd][2]

                try:
                    proc1 = execute(utility_command, workdir=job.workdir, returnproc=True, usecontainer=False,
                                    stdout=PIPE, stderr=PIPE, cwd=job.workdir, queuedata=job.infosys.queuedata)
                except Exception as error:
                    logger.error('could not execute: %s', error)
                else:
                    # store process handle in job object, and keep track on how many times the
                    # command has been launched
                    job.utilities[utcmd] = [proc1, utility_subprocess_launches + 1, utility_command]
            else:
                logger.warning('detected crashed utility subprocess - too many restarts, will not restart %s again', utcmd)
        else:  # check the utility output (the selector option adds a substring to the output file name)
            filename = usercommon.get_utility_command_output_filename(utcmd, selector=True)
            path = os.path.join(job.workdir, filename)
            if not os.path.exists(path):
                logger.warning('file: %s does not exist', path)

            # throttle the output check while the utility is still running
            time.sleep(10)
0435 
0436 
def get_local_size_limit_stdout(bytes=True):
    """
    Return a proper value for the local size limit for payload stdout (from config file).

    Note: the parameter name shadows the builtin 'bytes'; kept for backward compatibility.

    :param bytes: boolean (if True, convert kB to Bytes).
    :return: size limit (int).
    """

    default_limit_kb = 2097152
    try:
        limit = int(config.Pilot.local_size_limit_stdout)
    except Exception as error:
        limit = default_limit_kb
        logger.warning('bad value in config for local_size_limit_stdout: %s (will use value: %d kB)', error, limit)

    # convert from kB to B when requested
    return limit * 1024 if bytes else limit
0456 
0457 
def check_payload_stdout(job):
    """
    Check the size of the payload stdout.

    If any monitored stdout file exceeds the local size limit, the job is failed and
    killed, and any remaining input files are removed from the work directory.

    :param job: job object.
    :return: exit code (int), diagnostics (string).
    """

    exit_code = 0
    diagnostics = ""

    # get list of log files
    file_list = glob(os.path.join(job.workdir, 'log.*'))

    # is this a multi-trf job?
    n_jobs = job.jobparams.count("\n") + 1
    for _i in range(n_jobs):
        # get name of payload stdout file created by the pilot
        _stdout = config.Payload.payloadstdout
        if n_jobs > 1:
            _stdout = _stdout.replace(".txt", "_%d.txt" % (_i + 1))

        # add the primary stdout file to the fileList
        file_list.append(os.path.join(job.workdir, _stdout))

    tmp_list = glob(os.path.join(job.workdir, 'workDir/tmp.stdout.*'))
    if tmp_list:
        file_list += tmp_list
    logger.debug('file list=%s', str(file_list))

    # now loop over all files and check each individually (any large enough file will fail the job)
    for filename in file_list:

        logger.debug('check_payload_stdout: filename=%s', filename)
        if "job.log.tgz" in filename:
            logger.info("skipping file size check of file (%s) since it is a special log file", filename)
            continue

        if not os.path.exists(filename):
            logger.info("skipping file size check of payload stdout file (%s) since it has not been created yet", filename)
            continue

        try:
            # get file size in bytes
            fsize = os.path.getsize(filename)
        except Exception as error:
            logger.warning("could not read file size of %s: %s", filename, error)
            continue

        # is the file too big?
        localsizelimit_stdout = get_local_size_limit_stdout()
        if fsize > localsizelimit_stdout:
            exit_code = errors.STDOUTTOOBIG
            diagnostics = "Payload stdout file too big: %d B (larger than limit %d B)" % \
                          (fsize, localsizelimit_stdout)
            logger.warning(diagnostics)

            # kill the job
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
            kill_processes(job.pid)

            # remove any lingering input files from the work dir
            lfns, _ = job.get_lfns_and_guids()
            if lfns:
                # bug fix: do not overwrite exit_code with the remove_files() return value -
                # the STDOUTTOOBIG error must be propagated to the caller even when the
                # file removal succeeds
                ec_remove = remove_files(job.workdir, lfns)
                if ec_remove != 0:
                    logger.warning('failed to remove lingering input files from the work dir')
        else:
            logger.info("payload log (%s) within allowed size limit (%d B): %d B", os.path.basename(filename), localsizelimit_stdout, fsize)

    return exit_code, diagnostics
0529 
0530 
def check_local_space(initial=True):
    """
    Do we have enough local disk space left to run the job?
    For the initial local space check, the Pilot will require 2 GB of free space, but during running
    this can be lowered to 1 GB.

    :param initial: True means a 2 GB limit, False means a 1 GB limit (optional Boolean)
    :return: pilot error code (0 if success, NOLOCALSPACE if failure)
    """

    # is there enough local space to run a job?
    workdir = os.getcwd()
    logger.debug('checking local space on %s', workdir)
    remaining = convert_mb_to_b(get_local_disk_space(workdir))  # B (diskspace is in MB)
    if initial:
        limit = human2bytes(config.Pilot.free_space_limit)
    else:
        limit = human2bytes(config.Pilot.free_space_limit_running)

    if remaining > limit:
        logger.info('sufficient remaining disk space (%d B)', remaining)
        return 0, ""

    diagnostics = 'too little space left on local disk to run job: %d B (need > %d B)' %\
                  (remaining, limit)
    logger.warning(diagnostics)
    return errors.NOLOCALSPACE, diagnostics
0559 
0560 
def check_work_dir(job):
    """
    Check the size of the work directory.
    The function also updates the workdirsizes list in the job object.

    If the work directory exceeds the maximum allowed size, the job is failed and killed,
    and any remaining input files are removed from the work directory.

    :param job: job object.
    :return: exit code (int), error diagnostics (string)
    """

    exit_code = 0
    diagnostics = ""

    # cleanup: the original code tested os.path.exists(job.workdir) twice in a row;
    # a single guard clause is sufficient
    if not os.path.exists(job.workdir):
        logger.warning('skipping size check of workdir since it has not been created yet')
        return exit_code, diagnostics

    # get the limit of the workdir
    maxwdirsize = get_max_allowed_work_dir_size(job.infosys.queuedata)
    workdirsize = get_disk_usage(job.workdir)

    # is user dir within allowed size limit?
    if workdirsize > maxwdirsize:
        exit_code = errors.USERDIRTOOLARGE
        diagnostics = "work directory (%s) is too large: %d B (must be < %d B)" % \
                      (job.workdir, workdirsize, maxwdirsize)
        logger.fatal("%s", diagnostics)

        # dump the directory content to the log for debugging
        cmd = 'ls -altrR %s' % job.workdir
        _ec, stdout, stderr = execute(cmd, mute=True)
        logger.info("%s: %s", cmd + '\n', stdout)

        # kill the job
        set_pilot_state(job=job, state="failed")
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
        kill_processes(job.pid)

        # remove any lingering input files from the work dir
        lfns, _ = job.get_lfns_and_guids()
        if lfns:
            remove_files(job.workdir, lfns)

            # remeasure the size of the workdir at this point since the value is stored below
            workdirsize = get_disk_usage(job.workdir)
    else:
        logger.info("size of work directory %s: %d B (within %d B limit)", job.workdir, workdirsize, maxwdirsize)

    # Store the measured disk space (the max value will later be sent with the job metrics)
    if workdirsize > 0:
        job.add_workdir_size(workdirsize)

    return exit_code, diagnostics
0616 
0617 
def get_max_allowed_work_dir_size(queuedata):
    """
    Return the maximum allowed size of the work directory.

    :param queuedata: job.infosys.queuedata object.
    :return: max allowed work dir size in Bytes (int).
    """

    try:
        maxwdirsize = convert_mb_to_b(get_maximum_input_sizes())  # from MB to B, e.g. 16336 MB -> 17,129,537,536 B
    except Exception as error:
        # fall back to the schedconfig maxinputsize plus the stdout size limit
        # (bug fix: get_max_input_size() requires the queuedata object - calling it
        # without arguments raised a TypeError whenever this fallback was taken)
        max_input_size = get_max_input_size(queuedata)
        maxwdirsize = max_input_size + config.Pilot.local_size_limit_stdout * 1024
        logger.info("work directory size check will use %d B as a max limit (maxinputsize [%d B] + local size limit for"
                    " stdout [%d B])", maxwdirsize, max_input_size, config.Pilot.local_size_limit_stdout * 1024)
        logger.warning('conversion caught exception: %s', error)
    else:
        # grace margin, as discussed in https://its.cern.ch/jira/browse/ATLASPANDA-482
        margin = 10.0  # percent, read later from somewhere
        maxwdirsize = int(maxwdirsize * (1 + margin / 100.0))
        logger.info("work directory size check will use %d B as a max limit (10%% grace limit added)", maxwdirsize)

    return maxwdirsize
0641 
0642 
def get_max_input_size(queuedata, megabyte=False):
    """
    Return a proper maxinputsize value.

    :param queuedata: job.infosys.queuedata object.
    :param megabyte: return results in MB (Boolean).
    :return: max input size (int).
    """

    # pilot defaults, used when schedconfig does not provide a usable value
    default_mb = 14 * 1024  # 14336 MB (pilot default)
    default_b = 14 * 1024 * 1024 * 1024  # 14 GB, 14336 MB (pilot default)
    fallback = default_mb if megabyte else default_b

    raw_value = queuedata.maxwdir  # normally 14336+2000 MB
    if raw_value != "":
        try:
            # convert to MB int or B int depending on the requested unit
            size = int(raw_value) if megabyte else int(raw_value) * 1024 * 1024  # MB -> B
        except Exception as error:
            logger.warning("schedconfig.maxinputsize: %s", error)
            size = fallback
    else:
        size = fallback

    if megabyte:
        logger.info("max input size = %d MB (pilot default)", size)
    else:
        logger.info("Max input size = %d B (pilot default)", size)

    return size
0679 
0680 
def check_output_file_sizes(job):
    """
    Are the output files within the allowed size limits?

    :param job: job object.
    :return: exit code (int), error diagnostics (string)
    """

    exit_code = 0
    diagnostics = ""

    # loop over all known output files
    for fspec in job.outdata:
        path = os.path.join(job.workdir, fspec.lfn)
        if not os.path.exists(path):
            logger.info('output file size check: skipping output file %s since it does not exist', path)
            continue

        # get the current file size
        fsize = get_local_file_size(path)
        max_fsize = human2bytes(config.Pilot.maximum_output_file_size)
        if fsize is None:
            # bug fix: a failed size lookup previously fell into the error branch and
            # crashed the %d formatting with None; skip the check instead
            logger.warning('could not measure size of output file %s - skipping size check', path)
        elif fsize < max_fsize:
            # note: a 0 B file is within the limit (previously it was mislabeled as too large)
            logger.info('output file %s is within allowed size limit (%d B < %d B)', path, fsize, max_fsize)
        else:
            exit_code = errors.OUTPUTFILETOOLARGE
            diagnostics = 'output file %s is not within allowed size limit (%d B > %d B)' % (path, fsize, max_fsize)
            logger.warning(diagnostics)

    return exit_code, diagnostics