Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /pilot2/pilot/user/atlas/jobmetrics.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 import os
0011 import re
0012 import logging
0013 
0014 from pilot.api import analytics
0015 from pilot.util.jobmetrics import get_job_metrics_entry
0016 from pilot.util.filehandling import find_last_line
0017 
0018 from .cpu import get_core_count
0019 from .common import get_db_info, get_resimevents
0020 from .utilities import get_memory_monitor_output_filename
0021 
0022 logger = logging.getLogger(__name__)
0023 
0024 
def get_job_metrics_string(job):
    """
    Build the job metrics string from the given job object.

    Collects the actual core count, event counts, database info,
    resimulated event count and (for final states) the maximum payload
    work directory size, then appends analytics data and the latest
    event number.

    :param job: job object.
    :return: job metrics (string).
    """

    job_metrics = ""

    # report core count (will also set corecount in job object)
    corecount = get_core_count(job)
    logger.debug('job definition core count: %d', corecount)

    # report number of actual used cores and add it to the list of measured core counts
    if job.actualcorecount:
        job_metrics += get_job_metrics_entry("actualCoreCount", job.actualcorecount)

    # report number of events
    if job.nevents > 0:
        job_metrics += get_job_metrics_entry("nEvents", job.nevents)
    if job.neventsw > 0:
        job_metrics += get_job_metrics_entry("nEventsW", job.neventsw)

    # add metadata from job report
    if job.metadata:
        job.dbtime, job.dbdata = get_db_info(job.metadata)
        job.resimevents = get_resimevents(job.metadata)
    if job.dbtime and job.dbtime != "":
        job_metrics += get_job_metrics_entry("dbTime", job.dbtime)
    if job.dbdata and job.dbdata != "":
        job_metrics += get_job_metrics_entry("dbData", job.dbdata)
    if job.resimevents is not None:
        job_metrics += get_job_metrics_entry("resimevents", job.resimevents)

    # get the max disk space used by the payload (at the end of a job)
    if job.state in ("finished", "failed", "holding"):
        max_space = job.get_max_workdir_size()

        # a plain comparison with 0 is valid for both Python 2 int/long
        # and Python 3 int, so no explicit long() fallback is needed
        if max_space > 0:
            job_metrics += get_job_metrics_entry("workDirSize", max_space)
        else:
            logger.info("will not add max space = %d B to job metrics", max_space)

    # get analytics data
    job_metrics = add_analytics_data(job_metrics, job.workdir, job.state)

    # extract event number from file and add to job metrics if it exists
    job_metrics = add_event_number(job_metrics, job.workdir)

    return job_metrics
0084 
0085 
def add_analytics_data(job_metrics, workdir, state):
    """
    Add the memory leak + chi2 analytics data to the job metrics.

    Fits the memory monitor output found in the work directory and, when
    available, appends the leak slope and chi2 entries. If the memory
    monitor output file does not exist, the job metrics are returned
    unchanged.

    :param job_metrics: job metrics (string).
    :param workdir: work directory (string).
    :param state: job state (string).
    :return: updated job metrics (string).
    """

    path = os.path.join(workdir, get_memory_monitor_output_filename())
    if os.path.exists(path):
        client = analytics.Analytics()
        # do not include tails on final update
        tails = state not in ("finished", "failed", "holding")
        data = client.get_fitted_data(path, tails=tails)
        slope = data.get("slope", "")
        chi2 = data.get("chi2", "")
        if slope != "":
            job_metrics += get_job_metrics_entry("leak", slope)
        if chi2 != "":
            job_metrics += get_job_metrics_entry("chi2", chi2)

    return job_metrics
0110 
0111 
def add_event_number(job_metrics, workdir):
    """
    Extract event number from file and add to job metrics if it exists

    Reads the last line of the event loop heartbeat file in the work
    directory and, when an event number can be parsed from it, appends
    an eventnumber entry to the job metrics.

    :param job_metrics: job metrics (string).
    :param workdir: work directory (string).
    :return: updated job metrics (string).
    """

    path = os.path.join(workdir, 'eventLoopHeartBeat.txt')
    if not os.path.exists(path):
        logger.debug('file %s does not exist (skip for now)', path)
        return job_metrics

    last_line = find_last_line(path)
    if last_line:
        event_number = get_number_in_string(last_line)
        if event_number:
            job_metrics += get_job_metrics_entry("eventnumber", event_number)

    return job_metrics
0132 
0133 
def get_job_metrics(job):
    """
    Return a properly formatted job metrics string.
    The format of the job metrics string is defined by the server. It will be reported to the server during updateJob.

    Example of job metrics:
    Number of events read | Number of events written | vmPeak maximum | vmPeak average | RSS average | ..
    Format: nEvents=<int> nEventsW=<int> vmPeakMax=<int> vmPeakMean=<int> RSSMean=<int> hs06=<float> shutdownTime=<int>
            cpuFactor=<float> cpuLimit=<float> diskLimit=<float> jobStart=<int> memLimit=<int> runLimit=<float>

    :param job: job object.
    :return: job metrics (string).
    """

    # get job metrics string
    job_metrics = get_job_metrics_string(job)

    # correct for potential initial and trailing space
    job_metrics = job_metrics.strip()

    if job_metrics != "":
        logger.debug('job metrics=\"%s\"', job_metrics)
    else:
        logger.debug("no job metrics (all values are zero)")

    # is job_metrics within allowed size?
    max_length = 500  # maximum field size accepted by the server
    if len(job_metrics) > max_length:
        logger.warning("job_metrics out of size (%d)", len(job_metrics))

        # try to reduce the field size and remove the last entry which might be cut
        job_metrics = " ".join(job_metrics[:max_length].split(" ")[:-1])
        logger.warning("job_metrics has been reduced to: %s", job_metrics)

    return job_metrics
0169 
0170 
def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'):
    """
    Extract a number from the given string.

    E.g. file eventLoopHeartBeat.txt contains
        done processing event #20166959, run #276689 22807 events read so far  <<<===
    This function will return 20166959 as an int.

    :param line: line from a file (string).
    :param pattern: reg ex pattern (raw string).
    :return: extracted number (int) or None when no number is found.
    """

    found = re.search(pattern, line)
    if not found:
        return None

    try:
        return int(found.group(1))
    except (TypeError, ValueError):
        # captured group could not be converted to an integer
        return None