Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2020
0009 
0010 import os
0011 import time
0012 from re import search
0013 
0014 # from pilot.info import infosys
0015 from .setup import get_asetup
0016 from pilot.util.auxiliary import is_python3
0017 from pilot.util.container import execute
0018 from pilot.util.filehandling import read_json, copy, write_json, remove
0019 from pilot.util.parameters import convert_to_int
0020 from pilot.util.processes import is_process_running
0021 
0022 import logging
0023 logger = logging.getLogger(__name__)
0024 
0025 
0026 def get_benchmark_setup(job):
0027     """
0028     Return the proper setup for the benchmark command.
0029 
0030     :param job: job object.
0031     :return: setup string for the benchmark command.
0032     """
0033 
0034     return ''
0035 
0036 
0037 def get_prefetcher_setup(job):
0038     """
0039     Return the proper setup for the Prefetcher.
0040     Prefetcher is a tool used with the Event Streaming Service.
0041 
0042     :param job: job object.
0043     :return: setup string for the Prefetcher command.
0044     """
0045 
0046     # add code here ..
0047 
0048     return ''
0049 
0050 
0051 def get_network_monitor_setup(setup, job):
0052     """
0053     Return the proper setup for the network monitor.
0054     The network monitor is currently setup together with the payload and is start before it. The payload setup should
0055     therefore be provided. The network monitor setup is prepended to it.
0056 
0057     :param setup: payload setup string.
0058     :param job: job object.
0059     :return: network monitor setup string.
0060     """
0061 
0062     return ''
0063 
0064 
0065 def get_memory_monitor_summary_filename(selector=None):
0066     """
0067     Return the name for the memory monitor summary file.
0068 
0069     :param selector: special conditions flag (boolean).
0070     :return: File name (string).
0071     """
0072 
0073     name = "memory_monitor_summary.json"
0074     if selector:
0075         name += '_snapshot'
0076 
0077     return name
0078 
0079 
0080 def get_memory_monitor_output_filename(suffix='txt'):
0081     """
0082     Return the filename of the memory monitor text output file.
0083 
0084     :return: File name (string).
0085     """
0086 
0087     return "memory_monitor_output.%s" % suffix
0088 
0089 
0090 def get_memory_monitor_setup(pid, pgrp, jobid, workdir, command, setup="", use_container=True, transformation="", outdata=None, dump_ps=False):
0091     """
0092     Return the proper setup for the memory monitor.
0093     If the payload release is provided, the memory monitor can be setup with the same release. Until early 2018, the
0094     memory monitor was still located in the release area. After many problems with the memory monitor, it was decided
0095     to use a fixed version for the setup. Currently, release 21.0.22 is used.
0096 
0097     :param pid: job process id (int).
0098     :param pgrp: process group id (int).
0099     :param jobid: job id (int).
0100     :param workdir: job work directory (string).
0101     :param command: payload command (string).
0102     :param setup: optional setup in case asetup can not be used, which uses infosys (string).
0103     :param use_container: optional boolean.
0104     :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
0105     :param outdata: optional list of output fspec objects (list).
0106     :param dump_ps: should ps output be dumped when identifying prmon process? (Boolean).
0107     :return: job work directory (string), pid for process inside container (int).
0108     """
0109 
0110     # try to get the pid from a pid.txt file which might be created by a container_script
0111     pid = get_proper_pid(pid, pgrp, jobid, command=command, transformation=transformation, outdata=outdata, use_container=use_container, dump_ps=dump_ps)
0112     if pid == -1:
0113         logger.warning('process id was not identified before payload finished - will not launch memory monitor')
0114         return "", pid
0115 
0116     if not setup:
0117         setup = get_asetup(asetup=False)
0118         setup += 'lsetup prmon;'
0119     if not setup.endswith(';'):
0120         setup += ';'
0121 
0122     cmd = "prmon"
0123     interval = 60
0124     options = " --pid %d --filename %s --json-summary %s --interval %d" %\
0125               (pid, get_memory_monitor_output_filename(), get_memory_monitor_summary_filename(), interval)
0126     cmd = "cd " + workdir + ";" + setup + cmd + options
0127 
0128     return cmd, pid
0129 
0130 
0131 def get_memory_monitor_setup_old(pid, pgrp, jobid, workdir, command, setup="", use_container=True, transformation="", outdata=None, dump_ps=False):
0132     """
0133     Return the proper setup for the memory monitor.
0134     If the payload release is provided, the memory monitor can be setup with the same release. Until early 2018, the
0135     memory monitor was still located in the release area. After many problems with the memory monitor, it was decided
0136     to use a fixed version for the setup. Currently, release 21.0.22 is used.
0137 
0138     :param pid: job process id (int).
0139     :param pgrp: process group id (int).
0140     :param jobid: job id (int).
0141     :param workdir: job work directory (string).
0142     :param command: payload command (string).
0143     :param setup: optional setup in case asetup can not be used, which uses infosys (string).
0144     :param use_container: optional boolean.
0145     :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
0146     :param outdata: optional list of output fspec objects (list).
0147     :param dump_ps: should ps output be dumped when identifying prmon process? (Boolean).
0148     :return: job work directory (string), pid for process inside container (int).
0149     """
0150 
0151     # try to get the pid from a pid.txt file which might be created by a container_script
0152     pid = get_proper_pid(pid, pgrp, jobid, command=command, transformation=transformation, outdata=outdata, use_container=use_container, dump_ps=dump_ps)
0153     if pid == -1:
0154         logger.warning('process id was not identified before payload finished - will not launch memory monitor')
0155         return "", pid
0156 
0157     release = "22.0.1"
0158     platform = "x86_64-centos7-gcc8-opt"
0159     if not setup:
0160         setup = get_asetup() + " Athena," + release + " --platform " + platform
0161     interval = 60
0162     if not setup.endswith(';'):
0163         setup += ';'
0164     # Decide which version of the memory monitor should be used
0165     cmd = "%swhich prmon" % setup
0166     exit_code, stdout, stderr = execute(cmd)
0167     if stdout and "Command not found" not in stdout:
0168         _cmd = "prmon "
0169     else:
0170         logger.warning('failed to find prmon, defaulting to old memory monitor: %d, %s' % (exit_code, stderr))
0171         _cmd = "MemoryMonitor "
0172         setup = setup.replace(release, "21.0.22")
0173         setup = setup.replace(platform, "x86_64-slc6-gcc62-opt")
0174 
0175     options = "--pid %d --filename %s --json-summary %s --interval %d" %\
0176               (pid, get_memory_monitor_output_filename(), get_memory_monitor_summary_filename(), interval)
0177     _cmd = "cd " + workdir + ";" + setup + _cmd + options
0178 
0179     return _cmd, pid
0180 
0181 
0182 def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", use_container=True, dump_ps=False):
0183     """
0184     Return a pid from the proper source to be used with the memory monitor.
0185     The given pid comes from Popen(), but in the case containers are used, the pid should instead come from a ps aux
0186     lookup.
0187     If the main process has finished before the proper pid has been identified (it will take time if the payload is
0188     running inside a container), then this function will abort and return -1. The called should handle this and not
0189     launch the memory monitor as it is not needed any longer.
0190 
0191     :param pid: process id (int).
0192     :param pgrp: process group id (int).
0193     :param jobid: job id (int).
0194     :param command: payload command (string).
0195     :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
0196     :param outdata: list of output fspec object (list).
0197     :param use_container: optional boolean.
0198     :return: pid (int).
0199     """
0200 
0201     if not use_container:
0202         return pid
0203 
0204     # abort if main process has finished already
0205     if not is_process_running(pid):
0206         return -1
0207 
0208     #_cmd = get_trf_command(command, transformation=transformation)
0209     # get ps info using group id
0210     ps = get_ps_info(pgrp)
0211     if dump_ps:
0212         logger.debug('ps:\n%s' % ps)
0213     #logger.debug('attempting to identify pid for Singularity (v.3) runtime parent process')
0214     #_pid = get_pid_for_command(ps, command="Singularity runtime parent")
0215     #if _pid:
0216     #    logger.debug('discovered pid=%d for process \"%s\"' % (_pid, _cmd))
0217     #    return _pid
0218 
0219     i = 0
0220     imax = 120
0221     while i < imax:
0222         # abort if main process has finished already
0223         if not is_process_running(pid):
0224             return -1
0225 
0226         ps = get_ps_info(pgrp)
0227         #logger.debug('ps:\n%s' % ps)
0228 
0229         # lookup the process id using ps aux
0230         logger.debug('attempting to identify pid from job id')
0231         _pid = get_pid_for_jobid(ps, jobid)
0232         if _pid:
0233             logger.debug('discovered pid=%d for job id %s' % (_pid, jobid))
0234             break
0235 
0236         #logger.debug('attempting to identify pid from transform name and its output')
0237         #_pid = get_pid_for_trf(ps, transformation, outdata) if outdata else None
0238         #if _pid:
0239         #    logger.debug('discovered pid=%d for transform name \"%s\"' % (_pid, transformation))
0240         #    break
0241 
0242         logger.warning('payload pid has not yet been identified (#%d/#%d)' % (i + 1, imax))
0243 
0244         # wait until the payload has launched
0245         time.sleep(5)
0246         i += 1
0247 
0248     if _pid:
0249         pid = _pid
0250 
0251     logger.info('will use pid=%d for memory monitor' % pid)
0252 
0253     return pid
0254 
0255 
0256 def get_ps_info(pgrp, whoami=None, options='axfo pid,user,args'):
0257     """
0258     Return ps info for the given user.
0259 
0260     :param pgrp: process group id (int).
0261     :param whoami: user name (string).
0262     :return: ps aux for given user (string).
0263     """
0264 
0265     if not whoami:
0266         whoami = os.getuid()
0267 
0268     cmd = "ps -u %s %s" % (whoami, options)
0269     #cmd = "ps %s | grep %s" % (options, whoami)
0270     #cmd = "ps %s | grep %s | awk -v p=%s '$1 == p {print $5}" % (options, whoami, pgrp)
0271     #cmd = "ps %s | awk -v p=%s '$1 == p {print $5}" % (options, pgrp)
0272     exit_code, stdout, stderr = execute(cmd)
0273 
0274     return stdout
0275 
0276 
0277 def get_pid_for_jobid(ps, jobid):
0278     """
0279     Return the process id for the ps entry that contains the job id.
0280 
0281     :param ps: ps command output (string).
0282     :param jobid: PanDA job id (int).
0283     :return: pid (int) or None if no such process.
0284     """
0285 
0286     pid = None
0287 
0288     for line in ps.split('\n'):
0289         if jobid in line and 'xrootd' not in line:
0290             # extract pid
0291             _pid = search(r'(\d+) ', line)
0292             try:
0293                 pid = int(_pid.group(1))
0294             except Exception as e:
0295                 logger.warning('pid has wrong type: %s' % e)
0296             else:
0297                 logger.debug('extracted pid=%d from ps output' % pid)
0298             break
0299 
0300     return pid
0301 
0302 
0303 def get_pid_for_trf(ps, transformation, outdata):
0304     """
0305     Return the process id for the given command and user.
0306     Note: function returns 0 in case pid could not be found.
0307 
0308     :param ps: ps command output (string).
0309     :param transformation: transformation name, e.g. Sim_tf.py (String).
0310     :param outdata: fspec objects (list).
0311     :return: pid (int) or None if no such process.
0312     """
0313 
0314     pid = None
0315     candidates = []
0316 
0317     # in the case of user analysis job, the transformation will contain a URL which should be stripped
0318     if "/" in transformation:
0319         transformation = transformation.split('/')[-1]
0320     logger.debug('using transformation name: %s' % transformation)
0321     for line in ps.split('\n'):
0322         if transformation in line:
0323             candidates.append(line)
0324             break
0325 
0326     if candidates:
0327         for line in candidates:
0328             for fspec in outdata:
0329                 if fspec.lfn in line:
0330                     # extract pid
0331                     _pid = search(r'(\d+) ', line)
0332                     try:
0333                         pid = int(_pid.group(1))
0334                     except Exception as e:
0335                         logger.warning('pid has wrong type: %s' % e)
0336                     else:
0337                         logger.debug('extracted pid=%d from ps output' % pid)
0338                     break
0339             if pid:
0340                 break
0341     else:
0342         logger.debug('pid not found in ps output for trf=%s' % transformation)
0343 
0344     return pid
0345 
0346 
0347 def get_pid_for_command(ps, command="python pilot2/pilot.py"):
0348     """
0349     Return the process id for the given command and user.
0350     The function returns 0 in case pid could not be found.
0351     If no command is specified, the function looks for the "python pilot2/pilot.py" command in the ps output.
0352 
0353     :param ps: ps command output (string).
0354     :param command: command string expected to be in ps output (string).
0355     :return: pid (int) or None if no such process.
0356     """
0357 
0358     pid = None
0359     found = None
0360 
0361     for line in ps.split('\n'):
0362         if command in line:
0363             found = line
0364             break
0365     if found:
0366         # extract pid
0367         _pid = search(r'(\d+) ', found)
0368         try:
0369             pid = int(_pid.group(1))
0370         except Exception as e:
0371             logger.warning('pid has wrong type: %s' % e)
0372         else:
0373             logger.debug('extracted pid=%d from ps output: %s' % (pid, found))
0374     else:
0375         logger.debug('command not found in ps output: %s' % command)
0376 
0377     return pid
0378 
0379 
0380 def get_trf_command(command, transformation=""):
0381     """
0382     Return the last command in the full payload command string.
0383     Note: this function returns the last command in job.command which is only set for containers.
0384 
0385     :param command: full payload command (string).
0386     :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
0387     :return: trf command (string).
0388     """
0389 
0390     payload_command = ""
0391     if command:
0392         if not transformation:
0393             payload_command = command.split(';')[-2]
0394         else:
0395             if transformation in command:
0396                 payload_command = command[command.find(transformation):]
0397 
0398         # clean-up the command, remove '-signs and any trailing ;
0399         payload_command = payload_command.strip()
0400         payload_command = payload_command.replace("'", "")
0401         payload_command = payload_command.rstrip(";")
0402 
0403     return payload_command
0404 
0405 
0406 def get_memory_monitor_info_path(workdir, allowtxtfile=False):
0407     """
0408     Find the proper path to the utility info file
0409     Priority order:
0410        1. JSON summary file from workdir
0411        2. JSON summary file from pilot initdir
0412        3. Text output file from workdir (if allowtxtfile is True)
0413 
0414     :param workdir: relevant work directory (string).
0415     :param allowtxtfile: boolean attribute to allow for reading the raw memory monitor output.
0416     :return: path (string).
0417     """
0418 
0419     pilot_initdir = os.environ.get('PILOT_HOME', '')
0420     path = os.path.join(workdir, get_memory_monitor_summary_filename())
0421     init_path = os.path.join(pilot_initdir, get_memory_monitor_summary_filename())
0422 
0423     if not os.path.exists(path):
0424         if os.path.exists(init_path):
0425             path = init_path
0426         else:
0427             logger.info("neither %s, nor %s exist" % (path, init_path))
0428             path = ""
0429 
0430         if path == "" and allowtxtfile:
0431             path = os.path.join(workdir, get_memory_monitor_output_filename())
0432             if not os.path.exists(path):
0433                 logger.warning("file does not exist either: %s" % (path))
0434 
0435     return path
0436 
0437 
0438 def get_memory_monitor_info(workdir, allowtxtfile=False, name=""):  # noqa: C901
0439     """
0440     Add the utility info to the node structure if available.
0441 
0442     :param workdir: relevant work directory (string).
0443     :param allowtxtfile: boolean attribute to allow for reading the raw memory monitor output.
0444     :param name: name of memory monitor (string).
0445     :return: node structure (dictionary).
0446     """
0447 
0448     node = {}
0449 
0450     # Get the values from the memory monitor file (json if it exists, otherwise the preliminary txt file)
0451     # Note that only the final json file will contain the totRBYTES, etc
0452     try:
0453         summary_dictionary = get_memory_values(workdir, name=name)
0454     except Exception as e:
0455         logger.warning('failed to get memory values from memory monitor tool: %s' % e)
0456         summary_dictionary = {}
0457     else:
0458         logger.debug("summary_dictionary=%s" % str(summary_dictionary))
0459 
0460     # Fill the node dictionary
0461     if summary_dictionary and summary_dictionary != {}:
0462         # first determine which memory monitor version was running (MemoryMonitor or prmon)
0463         if 'maxRSS' in summary_dictionary['Max']:
0464             version = 'MemoryMonitor'
0465         elif 'rss' in summary_dictionary['Max']:
0466             version = 'prmon'
0467         else:
0468             version = 'unknown'
0469         if version == 'MemoryMonitor':
0470             try:
0471                 node['maxRSS'] = summary_dictionary['Max']['maxRSS']
0472                 node['maxVMEM'] = summary_dictionary['Max']['maxVMEM']
0473                 node['maxSWAP'] = summary_dictionary['Max']['maxSwap']
0474                 node['maxPSS'] = summary_dictionary['Max']['maxPSS']
0475                 node['avgRSS'] = summary_dictionary['Avg']['avgRSS']
0476                 node['avgVMEM'] = summary_dictionary['Avg']['avgVMEM']
0477                 node['avgSWAP'] = summary_dictionary['Avg']['avgSwap']
0478                 node['avgPSS'] = summary_dictionary['Avg']['avgPSS']
0479             except Exception as e:
0480                 logger.warning("exception caught while parsing memory monitor file: %s" % e)
0481                 logger.warning("will add -1 values for the memory info")
0482                 node['maxRSS'] = -1
0483                 node['maxVMEM'] = -1
0484                 node['maxSWAP'] = -1
0485                 node['maxPSS'] = -1
0486                 node['avgRSS'] = -1
0487                 node['avgVMEM'] = -1
0488                 node['avgSWAP'] = -1
0489                 node['avgPSS'] = -1
0490             else:
0491                 logger.info("extracted standard info from memory monitor json")
0492             try:
0493                 node['totRCHAR'] = summary_dictionary['Max']['totRCHAR']
0494                 node['totWCHAR'] = summary_dictionary['Max']['totWCHAR']
0495                 node['totRBYTES'] = summary_dictionary['Max']['totRBYTES']
0496                 node['totWBYTES'] = summary_dictionary['Max']['totWBYTES']
0497                 node['rateRCHAR'] = summary_dictionary['Avg']['rateRCHAR']
0498                 node['rateWCHAR'] = summary_dictionary['Avg']['rateWCHAR']
0499                 node['rateRBYTES'] = summary_dictionary['Avg']['rateRBYTES']
0500                 node['rateWBYTES'] = summary_dictionary['Avg']['rateWBYTES']
0501             except Exception:
0502                 logger.warning("standard memory fields were not found in memory monitor json (or json doesn't exist yet)")
0503             else:
0504                 logger.info("extracted standard memory fields from memory monitor json")
0505         elif version == 'prmon':
0506             try:
0507                 node['maxRSS'] = int(summary_dictionary['Max']['rss'])
0508                 node['maxVMEM'] = int(summary_dictionary['Max']['vmem'])
0509                 node['maxSWAP'] = int(summary_dictionary['Max']['swap'])
0510                 node['maxPSS'] = int(summary_dictionary['Max']['pss'])
0511                 node['avgRSS'] = summary_dictionary['Avg']['rss']
0512                 node['avgVMEM'] = summary_dictionary['Avg']['vmem']
0513                 node['avgSWAP'] = summary_dictionary['Avg']['swap']
0514                 node['avgPSS'] = summary_dictionary['Avg']['pss']
0515             except Exception as e:
0516                 logger.warning("exception caught while parsing prmon file: %s" % e)
0517                 logger.warning("will add -1 values for the memory info")
0518                 node['maxRSS'] = -1
0519                 node['maxVMEM'] = -1
0520                 node['maxSWAP'] = -1
0521                 node['maxPSS'] = -1
0522                 node['avgRSS'] = -1
0523                 node['avgVMEM'] = -1
0524                 node['avgSWAP'] = -1
0525                 node['avgPSS'] = -1
0526             else:
0527                 logger.info("extracted standard info from prmon json")
0528             try:
0529                 node['totRCHAR'] = int(summary_dictionary['Max']['rchar'])
0530                 node['totWCHAR'] = int(summary_dictionary['Max']['wchar'])
0531                 node['totRBYTES'] = int(summary_dictionary['Max']['read_bytes'])
0532                 node['totWBYTES'] = int(summary_dictionary['Max']['write_bytes'])
0533                 node['rateRCHAR'] = summary_dictionary['Avg']['rchar']
0534                 node['rateWCHAR'] = summary_dictionary['Avg']['wchar']
0535                 node['rateRBYTES'] = summary_dictionary['Avg']['read_bytes']
0536                 node['rateWBYTES'] = summary_dictionary['Avg']['write_bytes']
0537             except Exception:
0538                 logger.warning("standard memory fields were not found in prmon json (or json doesn't exist yet)")
0539             else:
0540                 logger.info("extracted standard memory fields from prmon json")
0541         else:
0542             logger.warning('unknown memory monitor version')
0543     else:
0544         logger.info("memory summary dictionary not yet available")
0545 
0546     return node
0547 
0548 
0549 def get_max_memory_monitor_value(value, maxvalue, totalvalue):  # noqa: C90
0550     """
0551     Return the max and total value (used by memory monitoring).
0552     Return an error code, 1, in case of value error.
0553 
0554     :param value: value to be tested (integer).
0555     :param maxvalue: current maximum value (integer).
0556     :param totalvalue: total value (integer).
0557     :return: exit code, maximum and total value (tuple of integers).
0558     """
0559 
0560     ec = 0
0561     try:
0562         value_int = int(value)
0563     except Exception as e:
0564         logger.warning("exception caught: %s" % e)
0565         ec = 1
0566     else:
0567         totalvalue += value_int
0568         if value_int > maxvalue:
0569             maxvalue = value_int
0570 
0571     return ec, maxvalue, totalvalue
0572 
0573 
0574 def convert_unicode_string(unicode_string):
0575     """
0576     Convert a unicode string into str.
0577 
0578     :param unicode_string:
0579     :return: string.
0580     """
0581 
0582     if unicode_string is not None:
0583         return str(unicode_string)
0584     return None
0585 
0586 
0587 def get_average_summary_dictionary_prmon(path):
0588     """
0589     Loop over the memory monitor output file and create the averaged summary dictionary.
0590 
0591     prmon keys:
0592     'Time', 'nprocs', 'nthreads', 'pss', 'rchar', 'read_bytes', 'rss', 'rx_bytes',
0593     'rx_packets', 'stime', 'swap', 'tx_bytes', 'tx_packets', 'utime', 'vmem', 'wchar',
0594     'write_bytes', 'wtime'
0595 
0596     The function uses the first line in the output file to define the dictionary keys used
0597     later in the function. This means that any change in the format such as new columns
0598     will be handled automatically.
0599 
0600     :param path: path to memory monitor txt output file (string).
0601     :return: summary dictionary.
0602     """
0603 
0604     summary_dictionary = {}
0605 
0606     # get the raw memory monitor output, convert to dictionary
0607     dictionary = convert_text_file_to_dictionary(path)
0608 
0609     if dictionary:
0610         # Calculate averages and store all values
0611         summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}}
0612 
0613         def filter_value(value):
0614             """ Inline function used to remove any string or None values from data. """
0615             if type(value) == str or value is None:
0616                 return False
0617             else:
0618                 return True
0619 
0620         keys = ['vmem', 'pss', 'rss', 'swap']
0621         values = {}
0622         for key in keys:
0623             value_list = list(filter(filter_value, dictionary.get(key, 0)))  # Python 2/3
0624             n = len(value_list)
0625             average = int(float(sum(value_list)) / float(n)) if n > 0 else 0
0626             maximum = max(value_list)
0627             values[key] = {'avg': average, 'max': maximum}
0628 
0629         summary_dictionary["Max"] = {"maxVMEM": values['vmem'].get('max'), "maxPSS": values['pss'].get('max'),
0630                                      "maxRSS": values['rss'].get('max'), "maxSwap": values['swap'].get('max')}
0631         summary_dictionary["Avg"] = {"avgVMEM": values['vmem'].get('avg'), "avgPSS": values['pss'].get('avg'),
0632                                      "avgRSS": values['rss'].get('avg'), "avgSwap": values['swap'].get('avg')}
0633 
0634         # add the last of the rchar, .., values
0635         keys = ['rchar', 'wchar', 'read_bytes', 'write_bytes', 'nprocs']
0636         # warning: should read_bytes/write_bytes be reported as rbytes/wbytes?
0637         for key in keys:
0638             value = get_last_value(dictionary.get(key, None))
0639             if value:
0640                 summary_dictionary["Other"][key] = value
0641 
0642     return summary_dictionary
0643 
0644 
0645 def get_metadata_dict_from_txt(path, storejson=False, jobid=None):
0646     """
0647     Convert memory monitor text output to json, store it, and return a selection as a dictionary.
0648 
0649     :param path:
0650     :param storejson: store dictionary on disk if True (boolean).
0651     :param jobid: job id (string).
0652     :return: prmon metadata (dictionary).
0653     """
0654 
0655     # get the raw memory monitor output, convert to dictionary
0656     dictionary = convert_text_file_to_dictionary(path)
0657 
0658     if dictionary and storejson:
0659         # add metadata
0660         dictionary['type'] = 'MemoryMonitorData'
0661         dictionary['pandaid'] = jobid
0662 
0663         path = os.path.join(os.path.dirname(path), get_memory_monitor_output_filename(suffix='json'))
0664         logger.debug('writing prmon dictionary to: %s' % path)
0665         write_json(path, dictionary)
0666     else:
0667         logger.debug('nothing to write (no prmon dictionary)')
0668 
0669     # filter dictionary?
0670     # ..
0671 
0672     return dictionary
0673 
0674 
0675 def convert_text_file_to_dictionary(path):
0676     """
0677     Convert row-column text file to dictionary.
0678     User first row identifiers as dictionary keys.
0679     Note: file must follow the convention:
0680         NAME1   NAME2   ..
0681         value1  value2  ..
0682         ..      ..      ..
0683 
0684     :param path: path to file (string).
0685     :return: dictionary.
0686     """
0687 
0688     summary_keys = []  # to keep track of content
0689     header_locked = False
0690     dictionary = {}
0691 
0692     with open(path) as f:
0693         for line in f:
0694             line = convert_unicode_string(line)
0695             if line != "":
0696                 try:
0697                     # Remove empty entries from list (caused by multiple \t)
0698                     _l = line.replace('\n', '')
0699                     if is_python3():
0700                         _l = [_f for _f in _l.split('\t') if _f]  # Python 3
0701                     else:
0702                         _l = filter(None, _l.split('\t'))  # Python 2
0703 
0704                     # define dictionary keys
0705                     if type(_l[0]) == str and not header_locked:
0706                         summary_keys = _l
0707                         for key in _l:
0708                             dictionary[key] = []
0709                         header_locked = True
0710                     else:  # sort the memory measurements in the correct columns
0711                         for i, key in enumerate(_l):
0712                             # for key in _l:
0713                             key_entry = summary_keys[i]  # e.g. Time
0714                             value = convert_to_int(key)
0715                             dictionary[key_entry].append(value)
0716                 except Exception:
0717                     logger.warning("unexpected format of utility output: %s" % line)
0718 
0719     return dictionary
0720 
0721 
0722 def get_last_value(value_list):
0723     value = None
0724     if value_list:
0725         value = value_list[-1]
0726     return value
0727 
0728 
0729 def get_average_summary_dictionary(path):
0730     """
0731     Loop over the memory monitor output file and create the averaged summary dictionary.
0732 
0733     :param path: path to memory monitor txt output file (string).
0734     :return: summary dictionary.
0735     """
0736 
0737     maxvmem = -1
0738     maxrss = -1
0739     maxpss = -1
0740     maxswap = -1
0741     avgvmem = 0
0742     avgrss = 0
0743     avgpss = 0
0744     avgswap = 0
0745     totalvmem = 0
0746     totalrss = 0
0747     totalpss = 0
0748     totalswap = 0
0749     n = 0
0750     summary_dictionary = {}
0751 
0752     rchar = None
0753     wchar = None
0754     rbytes = None
0755     wbytes = None
0756 
0757     first = True
0758     with open(path) as f:
0759         for line in f:
0760             # Skip the first line
0761             if first:
0762                 first = False
0763                 continue
0764             line = convert_unicode_string(line)
0765             if line != "":
0766                 try:
0767                     # Remove empty entries from list (caused by multiple \t)
0768                     if is_python3():
0769                         _l = [_f for _f in line.split('\t') if _f]  # Python 3
0770                     else:
0771                         _l = filter(None, line.split('\t'))  # Python 2
0772                     # _time = _l[0]  # 'Time' not user
0773                     vmem = _l[1]
0774                     pss = _l[2]
0775                     rss = _l[3]
0776                     swap = _l[4]
0777                     # note: the last rchar etc values will be reported
0778                     if len(_l) == 9:
0779                         rchar = int(_l[5])
0780                         wchar = int(_l[6])
0781                         rbytes = int(_l[7])
0782                         wbytes = int(_l[8])
0783                     else:
0784                         rchar = None
0785                         wchar = None
0786                         rbytes = None
0787                         wbytes = None
0788                 except Exception:
0789                     logger.warning("unexpected format of utility output: %s (expected format: Time, VMEM,"
0790                                    " PSS, RSS, Swap [, RCHAR, WCHAR, RBYTES, WBYTES])" % (line))
0791                 else:
0792                     # Convert to int
0793                     ec1, maxvmem, totalvmem = get_max_memory_monitor_value(vmem, maxvmem, totalvmem)
0794                     ec2, maxpss, totalpss = get_max_memory_monitor_value(pss, maxpss, totalpss)
0795                     ec3, maxrss, totalrss = get_max_memory_monitor_value(rss, maxrss, totalrss)
0796                     ec4, maxswap, totalswap = get_max_memory_monitor_value(swap, maxswap, totalswap)
0797                     if ec1 or ec2 or ec3 or ec4:
0798                         logger.warning("will skip this row of numbers due to value exception: %s" % (line))
0799                     else:
0800                         n += 1
0801 
0802         # Calculate averages and store all values
0803         summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}}
0804         summary_dictionary["Max"] = {"maxVMEM": maxvmem, "maxPSS": maxpss, "maxRSS": maxrss, "maxSwap": maxswap}
0805         if rchar:
0806             summary_dictionary["Other"]["rchar"] = rchar
0807         if wchar:
0808             summary_dictionary["Other"]["wchar"] = wchar
0809         if rbytes:
0810             summary_dictionary["Other"]["rbytes"] = rbytes
0811         if wbytes:
0812             summary_dictionary["Other"]["wbytes"] = wbytes
0813         if n > 0:
0814             avgvmem = int(float(totalvmem) / float(n))
0815             avgpss = int(float(totalpss) / float(n))
0816             avgrss = int(float(totalrss) / float(n))
0817             avgswap = int(float(totalswap) / float(n))
0818         summary_dictionary["Avg"] = {"avgVMEM": avgvmem, "avgPSS": avgpss, "avgRSS": avgrss, "avgSwap": avgswap}
0819 
0820     return summary_dictionary
0821 
0822 
0823 def get_memory_values(workdir, name=""):
0824     """
0825     Find the values in the memory monitor output file.
0826 
0827     In case the summary JSON file has not yet been produced, create a summary dictionary with the same format
0828     using the output text file (produced by the memory monitor and which is updated once per minute).
0829 
0830     FORMAT:
0831        {"Max":{"maxVMEM":40058624,"maxPSS":10340177,"maxRSS":16342012,"maxSwap":16235568},
0832         "Avg":{"avgVMEM":19384236,"avgPSS":5023500,"avgRSS":6501489,"avgSwap":5964997},
0833         "Other":{"rchar":NN,"wchar":NN,"rbytes":NN,"wbytes":NN}}
0834 
0835     :param workdir: relevant work directory (string).
0836     :param name: name of memory monitor (string).
0837     :return: memory values dictionary.
0838     """
0839 
0840     summary_dictionary = {}
0841 
0842     # Get the path to the proper memory info file (priority ordered)
0843     path = get_memory_monitor_info_path(workdir, allowtxtfile=True)
0844     if os.path.exists(path):
0845         logger.info("using path: %s (trf name=%s)" % (path, name))
0846 
0847         # Does a JSON summary file exist? If so, there's no need to calculate maximums and averages in the pilot
0848         if path.lower().endswith('json'):
0849             # Read the dictionary from the JSON file
0850             summary_dictionary = read_json(path)
0851         else:
0852             # Loop over the output file, line by line, and look for the maximum PSS value
0853             if name == "prmon":
0854                 summary_dictionary = get_average_summary_dictionary_prmon(path)
0855             else:
0856                 summary_dictionary = get_average_summary_dictionary(path)
0857             logger.debug('summary_dictionary=%s (trf name=%s)' % (str(summary_dictionary), name))
0858     else:
0859         if path == "":
0860             logger.warning("filename not set for memory monitor output")
0861         else:
0862             # Normally this means that the memory output file has not been produced yet
0863             pass
0864 
0865     return summary_dictionary
0866 
0867 
0868 def post_memory_monitor_action(job):
0869     """
0870     Perform post action items for memory monitor.
0871 
0872     :param job: job object.
0873     :return:
0874     """
0875 
0876     nap = 3
0877     path1 = os.path.join(job.workdir, get_memory_monitor_summary_filename())
0878     path2 = os.environ.get('PILOT_HOME')
0879     i = 0
0880     maxretry = 20
0881     while i <= maxretry:
0882         if os.path.exists(path1):
0883             break
0884         logger.info("taking a short nap (%d s) to allow the memory monitor to finish writing to the summary file (#%d/#%d)"
0885                     % (nap, i, maxretry))
0886         time.sleep(nap)
0887         i += 1
0888 
0889     try:
0890         copy(path1, path2)
0891     except Exception as e:
0892         logger.warning('failed to copy memory monitor output: %s' % e)
0893 
0894 
0895 def precleanup():
0896     """
0897     Pre-cleanup at the beginning of the job to remove any pre-existing files from previous jobs in the main work dir.
0898 
0899     :return:
0900     """
0901 
0902     logger.debug('performing pre-cleanup of potentially pre-existing files from earlier job in main work dir')
0903     path = os.path.join(os.environ.get('PILOT_HOME'), get_memory_monitor_summary_filename())
0904     if os.path.exists(path):
0905         logger.info('removing no longer needed file: %s' % path)
0906         remove(path)