File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import logging
0011
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.util.auxiliary import set_pilot_state
0014 from pilot.util.processes import kill_processes
0015 from .utilities import get_memory_values
0016
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared ErrorCodes instance; provides the error constants (e.g. BADMEMORYMONITORJSON,
# PAYLOADEXCEEDMAXMEM) and add_error_code() used by memory_usage().
errors = ErrorCodes()
0019
0020
def allow_memory_usage_verifications():
    """
    Report whether memory usage verifications should be carried out.

    In this implementation the verifications are always enabled.

    :return: True (boolean).
    """

    verifications_enabled = True
    return verifications_enabled
0029
0030
def get_ucore_scale_factor(job):
    """
    Get the correction/scale factor for SCORE/4CORE/nCORE jobs on UCORE queues.

    The factor is the ratio job.corecount / job.infosys.queuedata.corecount.
    If either core count cannot be converted to a number, or is not a positive
    number (zero, negative or NaN), no meaningful ratio exists and 1 is used.

    :param job: job object.
    :return: scale factor (float, or the int 1 when no scaling applies).
    """

    try:
        job_corecount = float(job.corecount)
    except (ValueError, TypeError, OverflowError) as exc:
        logger.warning('exception caught: %s (job.corecount=%s)', exc, job.corecount)
        job_corecount = None

    try:
        schedconfig_corecount = float(job.infosys.queuedata.corecount)
    except (ValueError, TypeError, OverflowError) as exc:
        logger.warning('exception caught: %s (job.infosys.queuedata.corecount=%s)', exc,
                       job.infosys.queuedata.corecount)
        schedconfig_corecount = None

    # Both values are floats (or None) here, so the division cannot raise.
    # The positivity test rejects 0 (division by zero), negative counts and
    # NaN (NaN > 0 is False), all of which would give a meaningless scale.
    if (job_corecount is not None and schedconfig_corecount is not None
            and job_corecount > 0 and schedconfig_corecount > 0):
        scale = job_corecount / schedconfig_corecount
        logger.debug('scale: job_corecount / schedconfig_corecount=%f', scale)
    else:
        logger.debug('will use scale factor 1')
        scale = 1

    return scale
0063
0064
def memory_usage(job):
    """
    Perform memory usage verification.

    Reads the memory monitor summary for the job and compares its maxPSS value
    with twice the (UCORE-scaled) queuedata.maxrss limit. If the limit is
    exceeded, the pilot state of the job is set to "failed", the
    PAYLOADEXCEEDMAXMEM error code is added to the job and the processes of
    job.pid are killed.

    :param job: job object.
    :return: exit code (int), diagnostics (string). The exit code is only set
             (to errors.BADMEMORYMONITORJSON) when the memory monitor output
             cannot be read; an exceeded memory limit is reported through the
             job's pilot error codes and the returned diagnostics string.
    """

    exit_code = 0
    diagnostics = ""

    # Get the maxPSS value from the memory monitor summary
    summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor)
    if not summary_dictionary:
        exit_code = errors.BADMEMORYMONITORJSON
        diagnostics = "Memory monitor output could not be read"
        return exit_code, diagnostics

    # 'or {}' also covers a 'Max' entry that is present but empty/None
    maxdict = summary_dictionary.get('Max') or {}
    maxpss_int = maxdict.get('maxPSS', -1)
    if maxpss_int == -1:
        # no maxPSS value available - nothing to verify
        return exit_code, diagnostics

    maxrss = job.infosys.queuedata.maxrss

    # Check for an explicit zero before the truthiness test: the string "0"
    # is truthy and would otherwise be treated as a real (zero) limit.
    if maxrss == 0 or maxrss == "0":
        logger.info("queuedata.maxrss set to 0 (no memory checks will be done)")
        return exit_code, diagnostics
    if not maxrss:
        logger.warning("queuedata.maxrss is not set")
        return exit_code, diagnostics

    # correction for SCORE/4CORE/nCORE jobs on UCORE queues
    scale = get_ucore_scale_factor(job)
    try:
        # float() first: maxrss may be a string, and str * float raises TypeError,
        # which previously skipped the check. maxrss is presumably in MB; * 1024
        # gives kB, matching the 'kB' units in the diagnostics below (verify).
        maxrss_int = 2 * int(float(maxrss) * scale) * 1024
    except (ValueError, TypeError, OverflowError) as exc:
        logger.warning("unexpected value for maxRSS: %s", exc)
        return exit_code, diagnostics

    if maxrss_int > 0 and maxpss_int > 0:
        if maxpss_int > maxrss_int:
            diagnostics = "job has exceeded the memory limit %d kB > %d kB (2 * queuedata.maxrss)" % \
                          (maxpss_int, maxrss_int)
            logger.warning(diagnostics)

            # fail the job, record the error and kill the payload processes;
            # NOTE(review): exit_code stays 0 here - the failure is carried by
            # job.piloterrorcodes; confirm callers expect this
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXCEEDMAXMEM)
            kill_processes(job.pid)
        else:
            logger.info("max memory (maxPSS) used by the payload is within the allowed limit: "
                        "%d kB (2 * maxRSS = %d kB)", maxpss_int, maxrss_int)

    return exit_code, diagnostics