Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 import logging
0011 
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.util.auxiliary import set_pilot_state
0014 from pilot.util.processes import kill_processes
0015 from .utilities import get_memory_values
0016 
0017 logger = logging.getLogger(__name__)
0018 errors = ErrorCodes()
0019 
0020 
def allow_memory_usage_verifications():
    """
    Tell the caller whether memory usage verifications are to be performed.

    The answer is hard-coded: verifications are always allowed.

    :return: True (boolean).
    """

    verifications_allowed = True
    return verifications_allowed
0029 
0030 
def get_ucore_scale_factor(job):
    """
    Get the correction/scale factor for SCORE/4CORE/nCORE jobs on UCORE queues.

    The factor is job.corecount divided by job.infosys.queuedata.corecount. If either value is
    missing, zero or cannot be converted to a float, no correction is applied and the neutral
    factor 1 is returned.

    :param job: job object.
    :return: scale factor (float when calculated from the core counts, otherwise the int 1).
    """

    def to_float(value, label):
        """Convert a core count to float; log a warning and return None if that is not possible."""
        try:
            return float(value)
        except (ValueError, TypeError) as exc:
            logger.warning('exception caught: %s (%s=%s)', exc, label, str(value))
            return None

    job_corecount = to_float(job.corecount, 'job.corecount')
    schedconfig_corecount = to_float(job.infosys.queuedata.corecount, 'job.infosys.queuedata.corecount')

    # None and 0.0 are both falsy, so past this guard both operands are non-zero floats and
    # the division cannot raise (no ZeroDivisionError/TypeError handling is needed)
    if job_corecount and schedconfig_corecount:
        scale = job_corecount / schedconfig_corecount
        logger.debug('scale: job_corecount / schedconfig_corecount=%f', scale)
    else:
        logger.debug('will use scale factor 1')
        scale = 1

    return scale
0063 
0064 
def memory_usage(job):
    """
    Perform memory usage verification.

    The maxPSS value reported by the memory monitor is compared with twice the queue's maxrss,
    after maxrss has been scaled with the UCORE correction factor and multiplied by 1024 (to kB).
    If the limit is exceeded, the job is set to failed, the PAYLOADEXCEEDMAXMEM error is added
    to the job and the job processes are killed.

    Note that a non-zero exit code is only returned when the memory monitor output could not be
    read. When the memory limit is exceeded the exit code stays 0 and the diagnostics string
    describes the problem.

    :param job: job object
    :return: exit code (int), diagnostics (string).
    """

    exit_code = 0
    diagnostics = ""

    # Get the maxPSS value from the memory monitor
    summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor)

    if not summary_dictionary:
        exit_code = errors.BADMEMORYMONITORJSON
        diagnostics = "Memory monitor output could not be read"
        return exit_code, diagnostics

    maxdict = summary_dictionary.get('Max', {})
    maxpss_int = maxdict.get('maxPSS', -1)

    # Only proceed if values are set
    if maxpss_int == -1:
        return exit_code, diagnostics

    maxrss = job.infosys.queuedata.maxrss

    # None, empty strings and other empty values mean that no limit has been configured
    # (a numerical zero is handled separately below)
    if not maxrss and maxrss != 0:
        logger.warning("queuedata.maxrss is not set")
        return exit_code, diagnostics

    # Convert explicitly before scaling: maxrss may arrive as a string, and multiplying a string
    # with the scale factor would either raise TypeError (float) or repeat the string (int)
    try:
        maxrss_value = float(maxrss)
    except (ValueError, TypeError) as exc:
        logger.warning("unexpected value for maxRSS: %s", exc)
        return exit_code, diagnostics

    # Covers 0 as well as "0" (the string is truthy, so it has to be tested after the conversion)
    if maxrss_value == 0:
        logger.info("queuedata.maxrss set to 0 (no memory checks will be done)")
        return exit_code, diagnostics

    # correction for SCORE/4CORE/nCORE jobs on UCORE queues
    scale = get_ucore_scale_factor(job)
    try:
        maxrss_int = 2 * int(maxrss_value * scale) * 1024  # Convert to int and kB
    except (ValueError, TypeError, OverflowError) as exc:
        # int() raises ValueError for NaN and OverflowError for infinity
        logger.warning("unexpected value for maxRSS: %s", exc)
        return exit_code, diagnostics

    # Compare the maxRSS with the maxPSS from memory monitor
    if maxrss_int > 0 and maxpss_int > 0:
        if maxpss_int > maxrss_int:
            diagnostics = "job has exceeded the memory limit %d kB > %d kB (2 * queuedata.maxrss)" % \
                          (maxpss_int, maxrss_int)
            logger.warning(diagnostics)

            # NOTE(review): the legacy implementation created a MEMORYEXCEEDED lockfile at this
            # point so that the memory monitor would not be restarted after it had been killed;
            # nothing equivalent is done here - confirm that it is no longer needed

            # Kill the job
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXCEEDMAXMEM)
            kill_processes(job.pid)
        else:
            # both values are in kB (same unit as in the diagnostics message above)
            logger.info("max memory (maxPSS) used by the payload is within the allowed limit: "
                        "%d kB (2 * maxRSS = %d kB)", maxpss_int, maxrss_int)

    return exit_code, diagnostics