Warning: file /pilot2/pilot/user/atlas/jobmetrics.py was not indexed,
or was modified since the last indexing (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import re
0012 import logging
0013
0014 from pilot.api import analytics
0015 from pilot.util.jobmetrics import get_job_metrics_entry
0016 from pilot.util.filehandling import find_last_line
0017
0018 from .cpu import get_core_count
0019 from .common import get_db_info, get_resimevents
0020 from .utilities import get_memory_monitor_output_filename
0021
0022 logger = logging.getLogger(__name__)
0023
0024
def get_job_metrics_string(job):
    """
    Get the job metrics string.

    Concatenates the individual job metrics entries (actual core count, numbers of
    events read/written, database time/data and resimulated events taken from the
    job report metadata, work directory size, memory analytics and the event number
    from the event loop heartbeat file) into a single string.

    Side effect: when job.metadata is set, job.dbtime, job.dbdata and
    job.resimevents are (re)populated from it.

    :param job: job object.
    :return: job metrics (string).
    """

    job_metrics = ""

    # The job definition core count is only logged here, not added to the metrics.
    # Use %s rather than %d: the value is not guaranteed to be an int (e.g. None),
    # and a %d mismatch makes the logging module report a formatting error instead.
    corecount = get_core_count(job)
    logger.debug('job definition core count: %s', corecount)

    if job.actualcorecount:
        job_metrics += get_job_metrics_entry("actualCoreCount", job.actualcorecount)

    # number of events read and written
    if job.nevents > 0:
        job_metrics += get_job_metrics_entry("nEvents", job.nevents)
    if job.neventsw > 0:
        job_metrics += get_job_metrics_entry("nEventsW", job.neventsw)

    # refresh the job report derived fields when metadata is available
    if job.metadata:
        job.dbtime, job.dbdata = get_db_info(job.metadata)
        job.resimevents = get_resimevents(job.metadata)

    # a truthy value can never be "", so no separate empty-string check is needed
    if job.dbtime:
        job_metrics += get_job_metrics_entry("dbTime", job.dbtime)
    if job.dbdata:
        job_metrics += get_job_metrics_entry("dbData", job.dbdata)
    if job.resimevents is not None:
        job_metrics += get_job_metrics_entry("resimevents", job.resimevents)

    # the maximum work directory size is only reported in these job states
    if job.state in ("finished", "failed", "holding"):
        max_space = job.get_max_workdir_size()

        # a plain 0 is sufficient: the comparison works for int (and Python 2 long)
        if max_space > 0:
            job_metrics += get_job_metrics_entry("workDirSize", max_space)
        else:
            logger.info("will not add max space = %d B to job metrics", max_space)

    # memory leak / chi2 analytics from the memory monitor output
    job_metrics = add_analytics_data(job_metrics, job.workdir, job.state)

    # event number from the event loop heartbeat file, if present
    job_metrics = add_event_number(job_metrics, job.workdir)

    return job_metrics
0084
0085
def add_analytics_data(job_metrics, workdir, state):
    """
    Add the memory leak+chi2 analytics data to the job metrics.

    If the memory monitor output file exists in the work directory, it is passed
    to the analytics client for fitting. The fitted "slope" is reported as the
    "leak" entry and "chi2" as the "chi2" entry. Missing values, or a missing
    file, leave the metrics unchanged.

    :param job_metrics: job metrics (string).
    :param workdir: work directory (string).
    :param state: job state (string).
    :return: updated job metrics (string).
    """

    path = os.path.join(workdir, get_memory_monitor_output_filename())
    if not os.path.exists(path):
        return job_metrics

    client = analytics.Analytics()

    # tails is False in the finished/failed/holding states and True otherwise
    tails = state not in ("finished", "failed", "holding")

    # guard in case the client returns None/empty instead of a dictionary
    data = client.get_fitted_data(path, tails=tails) or {}
    slope = data.get("slope", "")
    chi2 = data.get("chi2", "")
    if slope != "":
        job_metrics += get_job_metrics_entry("leak", slope)
    if chi2 != "":
        job_metrics += get_job_metrics_entry("chi2", chi2)

    return job_metrics
0110
0111
def add_event_number(job_metrics, workdir):
    """
    Extract the event number from the heartbeat file and add it to the job metrics.

    Reads the last line of eventLoopHeartBeat.txt in the work directory. If an
    event number can be parsed from it, an "eventnumber" entry is appended.

    :param job_metrics: job metrics (string).
    :param workdir: work directory (string).
    :return: updated job metrics (string).
    """

    path = os.path.join(workdir, 'eventLoopHeartBeat.txt')
    if not os.path.exists(path):
        logger.debug('file %s does not exist (skip for now)', path)
        return job_metrics

    last_line = find_last_line(path)
    if last_line:
        event_number = get_number_in_string(last_line)
        # get_number_in_string returns None when nothing matched; compare against
        # None so that a genuine event number 0 is still reported
        if event_number is not None:
            job_metrics += get_job_metrics_entry("eventnumber", event_number)

    return job_metrics
0132
0133
def get_job_metrics(job, max_length=500):
    """
    Return a properly formatted job metrics string.
    The format of the job metrics string is defined by the server. It will be reported to the server during updateJob.

    Example of job metrics:
    Number of events read | Number of events written | vmPeak maximum | vmPeak average | RSS average | ..
    Format: nEvents=<int> nEventsW=<int> vmPeakMax=<int> vmPeakMean=<int> RSSMean=<int> hs06=<float> shutdownTime=<int>
            cpuFactor=<float> cpuLimit=<float> diskLimit=<float> jobStart=<int> memLimit=<int> runLimit=<float>

    If the string is longer than max_length, it is cut back to the longest prefix
    of whole space-separated entries that fits. A partial entry is never sent, so
    the result is empty if the first entry alone is too long.

    :param job: job object.
    :param max_length: maximum allowed length of the job metrics string (int, default 500).
    :return: job metrics (string).
    """

    job_metrics = get_job_metrics_string(job).strip()

    if job_metrics != "":
        logger.debug('job metrics="%s"', job_metrics)
    else:
        logger.debug("no job metrics (all values are zero)")

    if len(job_metrics) > max_length:
        logger.warning("job_metrics out of size (%d)", len(job_metrics))

        # Cut at the last space at or before index max_length. Searching up to
        # max_length + 1 keeps a complete entry that ends exactly at the limit.
        cut = job_metrics.rfind(" ", 0, max_length + 1)
        job_metrics = job_metrics[:cut].rstrip() if cut != -1 else ""
        logger.warning("job_metrics has been reduced to: %s", job_metrics)

    return job_metrics
0169
0170
def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'):
    """
    Return the integer captured by the first group of a regex search in a line.

    With the default pattern, a heartbeat line such as
        " done processing event #20166959, run #276689 22807 events read so far <<<==="
    yields 20166959. The default pattern requires a space before "done".

    :param line: line from a file (string).
    :param pattern: reg ex pattern whose first group captures the number (raw string).
    :return: extracted number (int), or None if there is no match or the group
             cannot be converted to an int.
    """

    found = re.search(pattern, line)
    if not found:
        return None

    try:
        return int(found.group(1))
    except (TypeError, ValueError):
        # e.g. an optional group that did not participate in the match (None)
        return None