File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import time
0012 from re import search
0013
0014
0015 from .setup import get_asetup
0016 from pilot.util.auxiliary import is_python3
0017 from pilot.util.container import execute
0018 from pilot.util.filehandling import read_json, copy, write_json, remove
0019 from pilot.util.parameters import convert_to_int
0020 from pilot.util.processes import is_process_running
0021
0022 import logging
0023 logger = logging.getLogger(__name__)
0024
0025
def get_benchmark_setup(job):
    """
    Return the setup string for the benchmark command.

    No benchmark-specific setup is needed at the moment, so the result is always empty.

    :param job: job object (not used).
    :return: setup string for the benchmark command (string, currently always '').
    """

    benchmark_setup = ''
    return benchmark_setup
0035
0036
def get_prefetcher_setup(job):
    """
    Return the setup string for the Prefetcher.

    The Prefetcher is a tool used with the Event Streaming Service. No specific setup is
    currently required, so an empty string is always returned.

    :param job: job object (not used).
    :return: setup string for the Prefetcher command (string, currently always '').
    """

    prefetcher_setup = ''
    return prefetcher_setup
0049
0050
def get_network_monitor_setup(setup, job):
    """
    Return the setup string for the network monitor.

    The network monitor is meant to be set up together with the payload and started before it,
    which is why the payload setup is passed in. No network monitor setup is currently produced,
    so an empty string is always returned.

    :param setup: payload setup string (not used).
    :param job: job object (not used).
    :return: network monitor setup string (string, currently always '').
    """

    network_monitor_setup = ''
    return network_monitor_setup
0063
0064
def get_memory_monitor_summary_filename(selector=None):
    """
    Return the name of the memory monitor JSON summary file.

    :param selector: special conditions flag; when truthy, '_snapshot' is appended to the name
                     (after the '.json' extension).
    :return: file name (string).
    """

    base_name = "memory_monitor_summary.json"
    return base_name + '_snapshot' if selector else base_name
0078
0079
def get_memory_monitor_output_filename(suffix='txt'):
    """
    Return the filename of the memory monitor output file.

    :param suffix: file extension without the leading dot (string, default 'txt').
    :return: file name (string).
    """

    filename = "memory_monitor_output.%s" % suffix
    return filename
0088
0089
def get_memory_monitor_setup(pid, pgrp, jobid, workdir, command, setup="", use_container=True, transformation="",
                             outdata=None, dump_ps=False):
    """
    Return the command that launches the memory monitor (prmon) and the pid it should monitor.

    The pid is resolved with get_proper_pid() (inside a container this is the payload pid found in
    the ps output). If the payload finished before the pid could be identified, an empty command and
    pid -1 are returned and the memory monitor should not be launched.

    If no setup is given, get_asetup(asetup=False) followed by 'lsetup prmon;' is used. The setup is
    always terminated with ';'. prmon samples every 60 s.

    :param pid: job process id (int).
    :param pgrp: process group id (int).
    :param jobid: job id (int).
    :param workdir: job work directory (string).
    :param command: payload command (string).
    :param setup: optional setup in case asetup can not be used, which uses infosys (string).
    :param use_container: optional boolean.
    :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
    :param outdata: optional list of output fspec objects (list).
    :param dump_ps: should ps output be dumped when identifying prmon process? (Boolean).
    :return: memory monitor command (string, '' if pid is -1), pid to monitor (int).
    """

    monitored_pid = get_proper_pid(pid, pgrp, jobid, command=command, transformation=transformation,
                                   outdata=outdata, use_container=use_container, dump_ps=dump_ps)
    if monitored_pid == -1:
        logger.warning('process id was not identified before payload finished - will not launch memory monitor')
        return "", monitored_pid

    if not setup:
        setup = get_asetup(asetup=False) + 'lsetup prmon;'
    if not setup.endswith(';'):
        setup += ';'

    interval = 60
    options = " --pid %d --filename %s --json-summary %s --interval %d" % \
        (monitored_pid, get_memory_monitor_output_filename(), get_memory_monitor_summary_filename(), interval)
    monitor_command = "cd " + workdir + ";" + setup + "prmon" + options

    return monitor_command, monitored_pid
0129
0130
def get_memory_monitor_setup_old(pid, pgrp, jobid, workdir, command, setup="", use_container=True, transformation="",
                                 outdata=None, dump_ps=False):
    """
    Legacy variant: return the memory monitor command built on an Athena release setup, and the pid to monitor.

    The pid is resolved with get_proper_pid(); if the payload finished first, ('', -1) is returned.
    Without an explicit setup, asetup of Athena 22.0.1 on x86_64-centos7-gcc8-opt is used. If
    'which prmon' does not find prmon with that setup, the old MemoryMonitor is used instead and the
    release/platform in the setup are replaced with 21.0.22 / x86_64-slc6-gcc62-opt. The monitor
    samples every 60 s.

    :param pid: job process id (int).
    :param pgrp: process group id (int).
    :param jobid: job id (int).
    :param workdir: job work directory (string).
    :param command: payload command (string).
    :param setup: optional setup in case asetup can not be used, which uses infosys (string).
    :param use_container: optional boolean.
    :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
    :param outdata: optional list of output fspec objects (list).
    :param dump_ps: should ps output be dumped when identifying prmon process? (Boolean).
    :return: memory monitor command (string, '' if pid is -1), pid to monitor (int).
    """

    monitored_pid = get_proper_pid(pid, pgrp, jobid, command=command, transformation=transformation,
                                   outdata=outdata, use_container=use_container, dump_ps=dump_ps)
    if monitored_pid == -1:
        logger.warning('process id was not identified before payload finished - will not launch memory monitor')
        return "", monitored_pid

    release = "22.0.1"
    platform = "x86_64-centos7-gcc8-opt"
    interval = 60
    if not setup:
        setup = get_asetup() + " Athena," + release + " --platform " + platform
    if not setup.endswith(';'):
        setup += ';'

    # probe whether prmon is available with this setup
    exit_code, stdout, stderr = execute("%swhich prmon" % setup)
    prmon_available = stdout and "Command not found" not in stdout
    if prmon_available:
        monitor = "prmon "
    else:
        logger.warning('failed to find prmon, defaulting to old memory monitor: %d, %s' % (exit_code, stderr))
        monitor = "MemoryMonitor "
        setup = setup.replace(release, "21.0.22").replace(platform, "x86_64-slc6-gcc62-opt")

    options = "--pid %d --filename %s --json-summary %s --interval %d" % \
        (monitored_pid, get_memory_monitor_output_filename(), get_memory_monitor_summary_filename(), interval)

    return "cd " + workdir + ";" + setup + monitor + options, monitored_pid
0180
0181
def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", use_container=True, dump_ps=False):
    """
    Return a pid from the proper source to be used with the memory monitor.

    The given pid comes from Popen(). When containers are used, the pid should instead be that of the
    ps entry whose command line contains the job id. The lookup is retried every 5 s, at most 120 times.
    If the main process finishes before the proper pid has been identified, -1 is returned. The caller
    should then not launch the memory monitor, since it is no longer needed. If the lookup never
    succeeds, the given pid is returned unchanged.

    :param pid: process id (int).
    :param pgrp: process group id (int).
    :param jobid: job id (int).
    :param command: payload command (string). Currently not used.
    :param transformation: optional name of transformation, e.g. Sim_tf.py (string). Currently not used.
    :param outdata: list of output fspec object (list). Currently not used.
    :param use_container: optional boolean; if False the given pid is returned immediately.
    :param dump_ps: if True, log the ps output once before the lookup starts (boolean).
    :return: pid (int), or -1 if the payload process finished before the pid was identified.
    """

    if not use_container:
        return pid

    if not is_process_running(pid):
        return -1

    # only run the extra ps command when its output is actually going to be logged
    if dump_ps:
        logger.debug('ps:\n%s' % get_ps_info(pgrp))

    _pid = None
    imax = 120
    for i in range(imax):
        if not is_process_running(pid):
            return -1

        ps = get_ps_info(pgrp)
        logger.debug('attempting to identify pid from job id')
        _pid = get_pid_for_jobid(ps, jobid)
        if _pid:
            logger.debug('discovered pid=%d for job id %s' % (_pid, jobid))
            break

        logger.warning('payload pid has not yet been identified (#%d/#%d)' % (i + 1, imax))
        time.sleep(5)

    if _pid:
        pid = _pid

    logger.info('will use pid=%d for memory monitor' % pid)

    return pid
0254
0255
def get_ps_info(pgrp, whoami=None, options='axfo pid,user,args'):
    """
    Return the ps output for the given user.

    :param pgrp: process group id (int). Currently not used to filter the output.
    :param whoami: user passed to 'ps -u'; defaults to the uid of the current process (string or int).
    :param options: ps options appended after the user selection (string).
    :return: stdout of the ps command (string).
    """

    user = whoami if whoami else os.getuid()
    _exit_code, stdout, _stderr = execute("ps -u %s %s" % (user, options))
    return stdout
0275
0276
def get_pid_for_jobid(ps, jobid):
    """
    Return the process id for the ps entry that contains the job id.

    Lines that mention xrootd are ignored. The pid is the first number followed by a space on the
    matching line; with the default get_ps_info() options the pid is the first column.

    :param ps: ps command output (string).
    :param jobid: PanDA job id (int or string).
    :return: pid (int) or None if no such process.
    """

    pid = None
    if not ps or jobid is None:
        return pid

    # the job id is documented as an int, but 'in' on a str needs a str operand
    jobid = str(jobid)
    if not jobid:
        # an empty job id would match every line
        return pid

    for line in ps.split('\n'):
        if jobid not in line or 'xrootd' in line:
            continue

        match = search(r'(\d+) ', line)
        if not match:
            logger.warning('pid has wrong type: no pid found in ps line: %s' % line)
            continue

        pid = int(match.group(1))
        logger.debug('extracted pid=%d from ps output' % pid)
        break

    return pid
0301
0302
def get_pid_for_trf(ps, transformation, outdata):
    """
    Return the process id of the transformation process that writes one of the given output files.

    Every ps line that contains the transformation name is a candidate. The first candidate, in ps
    order, that also contains the LFN of one of the output files is selected. Its pid is the first
    number followed by a space on that line. If the transformation is a URL (user analysis jobs),
    only its last path component is used.

    :param ps: ps command output (string).
    :param transformation: transformation name, e.g. Sim_tf.py (string).
    :param outdata: fspec objects with an 'lfn' attribute (list); None is treated as empty.
    :return: pid (int) or None if no such process.
    """

    pid = None
    if not ps or not transformation:
        logger.debug('pid not found in ps output for trf=%s' % transformation)
        return pid

    if "/" in transformation:
        transformation = transformation.split('/')[-1]
    logger.debug('using transformation name: %s' % transformation)

    # consider every line that mentions the transformation, not only the first one: a line may
    # mention the trf name without carrying the output file names
    candidates = [line for line in ps.split('\n') if transformation in line]
    lfns = [fspec.lfn for fspec in (outdata or [])]

    for line in candidates:
        if not any(lfn in line for lfn in lfns):
            continue

        match = search(r'(\d+) ', line)
        if match:
            pid = int(match.group(1))
            logger.debug('extracted pid=%d from ps output' % pid)
            break
        logger.warning('pid has wrong type: no pid found in ps line: %s' % line)

    if pid is None:
        logger.debug('pid not found in ps output for trf=%s' % transformation)

    return pid
0345
0346
def get_pid_for_command(ps, command="python pilot2/pilot.py"):
    """
    Return the process id of the first ps entry that contains the given command.

    If no command is specified, the "python pilot2/pilot.py" command is looked for. The pid is the
    first number followed by a space on the matching line.

    :param ps: ps command output (string).
    :param command: command string expected to be in ps output (string).
    :return: pid (int), or None if the command is not found or no pid could be extracted.
    """

    matching_line = next((entry for entry in ps.split('\n') if command in entry), None)
    if not matching_line:
        logger.debug('command not found in ps output: %s' % command)
        return None

    pid = None
    match = search(r'(\d+) ', matching_line)
    try:
        pid = int(match.group(1))
    except Exception as e:
        logger.warning('pid has wrong type: %s' % e)
    else:
        logger.debug('extracted pid=%d from ps output: %s' % (pid, matching_line))

    return pid
0378
0379
def get_trf_command(command, transformation=""):
    """
    Return the last command in the full payload command string.

    Note: this function returns the last command in job.command, which is only set for containers.
    Without a transformation, the command is split on ';' and the second-to-last part is taken. This
    assumes the command ends with ';'. A command without any ';' is returned as a whole. With a
    transformation, everything from its first occurrence onwards is returned, or '' if it does not
    occur. Surrounding whitespace, single quotes and trailing ';' are removed.

    :param command: full payload command (string).
    :param transformation: optional name of transformation, e.g. Sim_tf.py (string).
    :return: trf command (string).
    """

    payload_command = ""
    if command:
        if not transformation:
            parts = command.split(';')
            # a command without ';' yields a single part; indexing [-2] would raise IndexError
            payload_command = parts[-2] if len(parts) > 1 else parts[0]
        elif transformation in command:
            payload_command = command[command.find(transformation):]

    payload_command = payload_command.strip()
    payload_command = payload_command.replace("'", "")
    payload_command = payload_command.rstrip(";")

    return payload_command
0404
0405
def get_memory_monitor_info_path(workdir, allowtxtfile=False):
    """
    Find the proper path to the utility info file.

    Priority order:
    1. JSON summary file in workdir
    2. JSON summary file in the pilot init dir ($PILOT_HOME, or the current directory if unset)
    3. Text output file in workdir (only if allowtxtfile is True; returned even if it does not exist)

    :param workdir: relevant work directory (string).
    :param allowtxtfile: boolean attribute to allow for reading the raw memory monitor output.
    :return: path (string), or '' if no JSON summary exists and allowtxtfile is False.
    """

    summary_name = get_memory_monitor_summary_filename()
    workdir_path = os.path.join(workdir, summary_name)
    init_path = os.path.join(os.environ.get('PILOT_HOME', ''), summary_name)

    if os.path.exists(workdir_path):
        return workdir_path
    if os.path.exists(init_path):
        return init_path

    logger.info("neither %s, nor %s exist" % (workdir_path, init_path))
    if not allowtxtfile:
        return ""

    text_path = os.path.join(workdir, get_memory_monitor_output_filename())
    if not os.path.exists(text_path):
        logger.warning("file does not exist either: %s" % (text_path))

    return text_path
0436
0437
def _copy_summary_fields(node, summary_dictionary, fields):
    """
    Copy values from the memory monitor summary into the node structure.

    :param node: node structure to update (dictionary).
    :param summary_dictionary: memory monitor summary with 'Max' and 'Avg' sections (dictionary).
    :param fields: (node key, summary section, summary key, convert to int) tuples (list).
    :raises: any exception from a missing key or a failed int() conversion; values copied before
             the failure remain in node.
    """

    for node_key, section, summary_key, as_int in fields:
        value = summary_dictionary[section][summary_key]
        node[node_key] = int(value) if as_int else value


# (node key, summary section, summary key, convert to int) for the MemoryMonitor summary format
_MEMORYMONITOR_MEMORY_FIELDS = [
    ('maxRSS', 'Max', 'maxRSS', False),
    ('maxVMEM', 'Max', 'maxVMEM', False),
    ('maxSWAP', 'Max', 'maxSwap', False),
    ('maxPSS', 'Max', 'maxPSS', False),
    ('avgRSS', 'Avg', 'avgRSS', False),
    ('avgVMEM', 'Avg', 'avgVMEM', False),
    ('avgSWAP', 'Avg', 'avgSwap', False),
    ('avgPSS', 'Avg', 'avgPSS', False),
]
_MEMORYMONITOR_IO_FIELDS = [
    ('totRCHAR', 'Max', 'totRCHAR', False),
    ('totWCHAR', 'Max', 'totWCHAR', False),
    ('totRBYTES', 'Max', 'totRBYTES', False),
    ('totWBYTES', 'Max', 'totWBYTES', False),
    ('rateRCHAR', 'Avg', 'rateRCHAR', False),
    ('rateWCHAR', 'Avg', 'rateWCHAR', False),
    ('rateRBYTES', 'Avg', 'rateRBYTES', False),
    ('rateWBYTES', 'Avg', 'rateWBYTES', False),
]
# same for the prmon summary format; the 'Max' values are converted to int
_PRMON_MEMORY_FIELDS = [
    ('maxRSS', 'Max', 'rss', True),
    ('maxVMEM', 'Max', 'vmem', True),
    ('maxSWAP', 'Max', 'swap', True),
    ('maxPSS', 'Max', 'pss', True),
    ('avgRSS', 'Avg', 'rss', False),
    ('avgVMEM', 'Avg', 'vmem', False),
    ('avgSWAP', 'Avg', 'swap', False),
    ('avgPSS', 'Avg', 'pss', False),
]
_PRMON_IO_FIELDS = [
    ('totRCHAR', 'Max', 'rchar', True),
    ('totWCHAR', 'Max', 'wchar', True),
    ('totRBYTES', 'Max', 'read_bytes', True),
    ('totWBYTES', 'Max', 'write_bytes', True),
    ('rateRCHAR', 'Avg', 'rchar', False),
    ('rateWCHAR', 'Avg', 'wchar', False),
    ('rateRBYTES', 'Avg', 'read_bytes', False),
    ('rateWBYTES', 'Avg', 'write_bytes', False),
]
# memory monitor version -> (name used in log messages, memory fields, I/O fields)
_MEMORY_MONITOR_FORMATS = {
    'MemoryMonitor': ('memory monitor', _MEMORYMONITOR_MEMORY_FIELDS, _MEMORYMONITOR_IO_FIELDS),
    'prmon': ('prmon', _PRMON_MEMORY_FIELDS, _PRMON_IO_FIELDS),
}


def get_memory_monitor_info(workdir, allowtxtfile=False, name=""):
    """
    Add the utility info to the node structure if available.

    The summary is obtained with get_memory_values(). Its format is detected from the 'Max' section:
    'maxRSS' means MemoryMonitor and 'rss' means prmon. Memory fields (maxRSS, maxVMEM, maxSWAP,
    maxPSS, avgRSS, avgVMEM, avgSWAP, avgPSS) are all set to -1 if any of them cannot be read. I/O
    fields (totRCHAR, totWCHAR, totRBYTES, totWBYTES, rateRCHAR, rateWCHAR, rateRBYTES, rateWBYTES)
    are added on a best-effort basis.

    :param workdir: relevant work directory (string).
    :param allowtxtfile: boolean attribute to allow for reading the raw memory monitor output (currently not used).
    :param name: name of memory monitor (string).
    :return: node structure (dictionary); empty if no summary is available or its format is unknown.
    """

    node = {}

    try:
        summary_dictionary = get_memory_values(workdir, name=name)
    except Exception as e:
        logger.warning('failed to get memory values from memory monitor tool: %s' % e)
        summary_dictionary = {}
    else:
        logger.debug("summary_dictionary=%s" % str(summary_dictionary))

    if not summary_dictionary:
        logger.info("memory summary dictionary not yet available")
        return node

    # a summary without a proper 'Max' section (unexpected JSON content) would otherwise raise a KeyError
    max_section = summary_dictionary.get('Max') if isinstance(summary_dictionary, dict) else None
    if not isinstance(max_section, dict):
        max_section = {}

    if 'maxRSS' in max_section:
        version = 'MemoryMonitor'
    elif 'rss' in max_section:
        version = 'prmon'
    else:
        logger.warning('unknown memory monitor version')
        return node

    label, memory_fields, io_fields = _MEMORY_MONITOR_FORMATS[version]

    try:
        _copy_summary_fields(node, summary_dictionary, memory_fields)
    except Exception as e:
        logger.warning("exception caught while parsing %s file: %s" % (label, e))
        logger.warning("will add -1 values for the memory info")
        for node_key, _section, _summary_key, _as_int in memory_fields:
            node[node_key] = -1
    else:
        logger.info("extracted standard info from %s json" % label)

    try:
        _copy_summary_fields(node, summary_dictionary, io_fields)
    except Exception:
        logger.warning("standard memory fields were not found in %s json (or json doesn't exist yet)" % label)
    else:
        logger.info("extracted standard memory fields from %s json" % label)

    return node
0547
0548
def get_max_memory_monitor_value(value, maxvalue, totalvalue):
    """
    Fold one sample into a running maximum and total (used by memory monitoring).

    If the value cannot be converted to int, a warning is logged and the maximum and total
    are returned unchanged together with error code 1.

    :param value: value to be tested (integer or numeric string).
    :param maxvalue: current maximum value (integer).
    :param totalvalue: total value (integer).
    :return: exit code (0 or 1), maximum and total value (tuple of integers).
    """

    try:
        sample = int(value)
    except Exception as e:
        logger.warning("exception caught: %s" % e)
        return 1, maxvalue, totalvalue

    new_total = totalvalue + sample
    new_max = sample if sample > maxvalue else maxvalue
    return 0, new_max, new_total
0572
0573
def convert_unicode_string(unicode_string):
    """
    Convert a unicode string into str.

    :param unicode_string: value to convert (unicode string, or None).
    :return: str(unicode_string), or None if the input is None.
    """

    return None if unicode_string is None else str(unicode_string)
0585
0586
def get_average_summary_dictionary_prmon(path):
    """
    Loop over the prmon output file and create the averaged summary dictionary.

    prmon keys:
    'Time', 'nprocs', 'nthreads', 'pss', 'rchar', 'read_bytes', 'rss', 'rx_bytes',
    'rx_packets', 'stime', 'swap', 'tx_bytes', 'tx_packets', 'utime', 'vmem', 'wchar',
    'write_bytes', 'wtime'

    The first line in the output file defines the dictionary keys (see convert_text_file_to_dictionary()),
    so changes in the format such as new columns are handled automatically.
    For vmem, pss, rss and swap, the maximum and the integer average over all non-string, non-None values
    are reported; a missing or empty column yields 0 for both. For rchar, wchar, read_bytes, write_bytes
    and nprocs, the last value is reported under "Other" (omitted if missing or falsy).

    :param path: path to memory monitor txt output file (string).
    :return: summary dictionary with "Max", "Avg" and "Other" sections, or {} if the file yielded no data.
    """

    summary_dictionary = {}

    dictionary = convert_text_file_to_dictionary(path)
    if not dictionary:
        return summary_dictionary

    summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}}

    def filter_value(value):
        """ Inline function used to remove any string or None values from data. """
        return not (isinstance(value, str) or value is None)

    values = {}
    for key in ['vmem', 'pss', 'rss', 'swap']:
        # a missing column behaves like an empty one; filter() on a non-iterable and max([]) would raise
        value_list = list(filter(filter_value, dictionary.get(key) or []))
        n = len(value_list)
        average = int(float(sum(value_list)) / float(n)) if n > 0 else 0
        maximum = max(value_list) if value_list else 0
        values[key] = {'avg': average, 'max': maximum}

    summary_dictionary["Max"] = {"maxVMEM": values['vmem'].get('max'), "maxPSS": values['pss'].get('max'),
                                 "maxRSS": values['rss'].get('max'), "maxSwap": values['swap'].get('max')}
    summary_dictionary["Avg"] = {"avgVMEM": values['vmem'].get('avg'), "avgPSS": values['pss'].get('avg'),
                                 "avgRSS": values['rss'].get('avg'), "avgSwap": values['swap'].get('avg')}

    for key in ['rchar', 'wchar', 'read_bytes', 'write_bytes', 'nprocs']:
        value = get_last_value(dictionary.get(key, None))
        if value:
            summary_dictionary["Other"][key] = value

    return summary_dictionary
0643
0644
def get_metadata_dict_from_txt(path, storejson=False, jobid=None):
    """
    Convert memory monitor text output to a dictionary, optionally store it as JSON, and return it.

    When storejson is True and the conversion produced data, the keys 'type' ('MemoryMonitorData')
    and 'pandaid' (jobid) are added. The dictionary is then written to memory_monitor_output.json
    in the directory of the text file. The two added keys are also part of the returned dictionary.

    :param path: path to the memory monitor text output file (string).
    :param storejson: store dictionary on disk if True (boolean).
    :param jobid: job id (string).
    :return: prmon metadata (dictionary).
    """

    dictionary = convert_text_file_to_dictionary(path)

    if not (dictionary and storejson):
        logger.debug('nothing to write (no prmon dictionary)')
        return dictionary

    dictionary['type'] = 'MemoryMonitorData'
    dictionary['pandaid'] = jobid

    json_path = os.path.join(os.path.dirname(path), get_memory_monitor_output_filename(suffix='json'))
    logger.debug('writing prmon dictionary to: %s' % json_path)
    write_json(json_path, dictionary)

    return dictionary
0673
0674
def convert_text_file_to_dictionary(path):
    """
    Convert a tab-separated row-column text file to a dictionary of columns.

    Use the first row's identifiers as dictionary keys; every following row appends its values,
    converted with convert_to_int(), to the list of the corresponding column.
    Note: file must follow the convention:
      NAME1 NAME2 ..
      value1 value2 ..
      .. .. ..
    Rows that cannot be processed (e.g. blank lines, or rows with more columns than the header)
    are logged as warnings. Values appended from such a row before the failure are kept.

    :param path: path to file (string).
    :return: dictionary mapping column name to a list of values.
    """

    columns = {}
    header = []
    have_header = False

    with open(path) as f:
        for raw_line in f:
            line = convert_unicode_string(raw_line)
            if line == "":
                continue
            try:
                fields = [field for field in line.replace('\n', '').split('\t') if field]

                # fields[0] is evaluated first on purpose: an empty row raises IndexError and is logged
                if type(fields[0]) == str and not have_header:
                    header = fields
                    for column_name in fields:
                        columns[column_name] = []
                    have_header = True
                else:
                    for index, field in enumerate(fields):
                        columns[header[index]].append(convert_to_int(field))
            except Exception:
                logger.warning("unexpected format of utility output: %s" % line)

    return columns
0720
0721
def get_last_value(value_list):
    """
    Return the last element of a list.

    :param value_list: values (list, possibly empty or None).
    :return: last element, or None if value_list is empty or None.
    """

    return value_list[-1] if value_list else None
0727
0728
def get_average_summary_dictionary(path):
    """
    Loop over the memory monitor output file and create the averaged summary dictionary.

    The first line (header) is skipped. Every following tab-separated row is expected to contain
    Time, VMEM, PSS, RSS, Swap and, when it has 9 columns, RCHAR, WCHAR, RBYTES, WBYTES.
    Maxima and averages only include rows whose VMEM, PSS, RSS and Swap values all convert to int.
    RCHAR, WCHAR, RBYTES and WBYTES reflect the last values read; a row without these columns resets
    them, and they are reported under "Other" only if non-zero/non-None.

    :param path: path to memory monitor txt output file (string).
    :return: summary dictionary with "Max" (-1 if no valid row), "Avg" (0 if no valid row) and "Other".
    """

    maxvmem = -1
    maxrss = -1
    maxpss = -1
    maxswap = -1
    avgvmem = 0
    avgrss = 0
    avgpss = 0
    avgswap = 0
    totalvmem = 0
    totalrss = 0
    totalpss = 0
    totalswap = 0
    n = 0
    summary_dictionary = {}

    rchar = None
    wchar = None
    rbytes = None
    wbytes = None

    first = True
    with open(path) as f:
        for line in f:

            # skip the header line
            if first:
                first = False
                continue
            line = convert_unicode_string(line)
            if line != "":
                try:
                    if is_python3():
                        _l = [_f for _f in line.split('\t') if _f]
                    else:
                        _l = filter(None, line.split('\t'))

                    # column 0 is the time stamp
                    vmem = _l[1]
                    pss = _l[2]
                    rss = _l[3]
                    swap = _l[4]

                    if len(_l) == 9:
                        rchar = int(_l[5])
                        wchar = int(_l[6])
                        rbytes = int(_l[7])
                        wbytes = int(_l[8])
                    else:
                        rchar = None
                        wchar = None
                        rbytes = None
                        wbytes = None
                except Exception:
                    logger.warning("unexpected format of utility output: %s (expected format: Time, VMEM,"
                                   " PSS, RSS, Swap [, RCHAR, WCHAR, RBYTES, WBYTES])" % (line))
                else:
                    # evaluate all four values first and commit only when every one converted; otherwise a
                    # skipped row would still add to the totals/maxima while being excluded from n
                    ec1, _maxvmem, _totalvmem = get_max_memory_monitor_value(vmem, maxvmem, totalvmem)
                    ec2, _maxpss, _totalpss = get_max_memory_monitor_value(pss, maxpss, totalpss)
                    ec3, _maxrss, _totalrss = get_max_memory_monitor_value(rss, maxrss, totalrss)
                    ec4, _maxswap, _totalswap = get_max_memory_monitor_value(swap, maxswap, totalswap)
                    if ec1 or ec2 or ec3 or ec4:
                        logger.warning("will skip this row of numbers due to value exception: %s" % (line))
                    else:
                        maxvmem, totalvmem = _maxvmem, _totalvmem
                        maxpss, totalpss = _maxpss, _totalpss
                        maxrss, totalrss = _maxrss, _totalrss
                        maxswap, totalswap = _maxswap, _totalswap
                        n += 1

    summary_dictionary = {"Max": {}, "Avg": {}, "Other": {}}
    summary_dictionary["Max"] = {"maxVMEM": maxvmem, "maxPSS": maxpss, "maxRSS": maxrss, "maxSwap": maxswap}
    if rchar:
        summary_dictionary["Other"]["rchar"] = rchar
    if wchar:
        summary_dictionary["Other"]["wchar"] = wchar
    if rbytes:
        summary_dictionary["Other"]["rbytes"] = rbytes
    if wbytes:
        summary_dictionary["Other"]["wbytes"] = wbytes
    if n > 0:
        avgvmem = int(float(totalvmem) / float(n))
        avgpss = int(float(totalpss) / float(n))
        avgrss = int(float(totalrss) / float(n))
        avgswap = int(float(totalswap) / float(n))
    summary_dictionary["Avg"] = {"avgVMEM": avgvmem, "avgPSS": avgpss, "avgRSS": avgrss, "avgSwap": avgswap}

    return summary_dictionary
0821
0822
def get_memory_values(workdir, name=""):
    """
    Find the values in the memory monitor output file.

    If the summary JSON file exists it is read directly. Otherwise a summary dictionary with the same
    format is built from the text output file, which the memory monitor updates once per minute. The
    prmon parser is used when name is "prmon".

    FORMAT:
    {"Max":{"maxVMEM":40058624,"maxPSS":10340177,"maxRSS":16342012,"maxSwap":16235568},
     "Avg":{"avgVMEM":19384236,"avgPSS":5023500,"avgRSS":6501489,"avgSwap":5964997},
     "Other":{"rchar":NN,"wchar":NN,"rbytes":NN,"wbytes":NN}}

    :param workdir: relevant work directory (string).
    :param name: name of memory monitor (string).
    :return: memory values dictionary ({} if no output file exists yet).
    """

    summary_dictionary = {}

    path = get_memory_monitor_info_path(workdir, allowtxtfile=True)
    if not os.path.exists(path):
        # a non-empty but missing path normally means the output file has not been produced yet
        if path == "":
            logger.warning("filename not set for memory monitor output")
        return summary_dictionary

    logger.info("using path: %s (trf name=%s)" % (path, name))

    if path.lower().endswith('json'):
        summary_dictionary = read_json(path)
    else:
        parser = get_average_summary_dictionary_prmon if name == "prmon" else get_average_summary_dictionary
        summary_dictionary = parser(path)
        logger.debug('summary_dictionary=%s (trf name=%s)' % (str(summary_dictionary), name))

    return summary_dictionary
0866
0867
def post_memory_monitor_action(job):
    """
    Perform post action items for the memory monitor.

    Wait for the summary file in the job work directory to appear, checking up to maxretry + 1 times
    with a 3 s nap in between. Then copy it to $PILOT_HOME. The copy is attempted even if the file
    never appeared. Any exception raised by copy() is logged as a warning.

    :param job: job object (its workdir attribute is used).
    :return:
    """

    nap = 3
    maxretry = 20
    summary_path = os.path.join(job.workdir, get_memory_monitor_summary_filename())
    destination = os.environ.get('PILOT_HOME')

    for attempt in range(maxretry + 1):
        if os.path.exists(summary_path):
            break
        logger.info("taking a short nap (%d s) to allow the memory monitor to finish writing to the summary file (#%d/#%d)"
                    % (nap, attempt, maxretry))
        time.sleep(nap)

    try:
        copy(summary_path, destination)
    except Exception as e:
        logger.warning('failed to copy memory monitor output: %s' % e)
0893
0894
def precleanup():
    """
    Pre-cleanup at the beginning of the job to remove any pre-existing files from previous jobs in the main work dir.

    Currently this removes a leftover memory monitor summary file from $PILOT_HOME. If PILOT_HOME is
    not set, a warning is logged and nothing is removed, instead of failing in os.path.join().

    :return:
    """

    logger.debug('performing pre-cleanup of potentially pre-existing files from earlier job in main work dir')
    pilot_home = os.environ.get('PILOT_HOME')
    if not pilot_home:
        logger.warning('PILOT_HOME is not set - skipping pre-cleanup')
        return

    path = os.path.join(pilot_home, get_memory_monitor_summary_filename())
    if os.path.exists(path):
        logger.info('removing no longer needed file: %s' % path)
        remove(path)
0906 remove(path)