Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:14

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 from .services import Services
0011 from pilot.common.exception import NotDefined, NotSameLength, UnknownException
0012 from pilot.util.filehandling import get_table_from_file
0013 from pilot.util.math import mean, sum_square_dev, sum_dev, chi2, float_to_rounded_string
0014 
0015 import logging
0016 logger = logging.getLogger(__name__)
0017 
0018 
class Analytics(Services):
    """
    Analytics service class.

    Thin front-end around the Fit class: it stores the most recent fit object
    and exposes its slope, intersect and chi2, and it can read a table from
    file and fit two of its columns against each other.
    """

    _fit = None  # most recent Fit object, None until fit() has succeeded

    def __init__(self, **kwargs):
        """
        Init function.

        :param kwargs: currently unused.
        """

        self._fit = None

    def fit(self, x, y, model='linear'):
        """
        Fitting function.
        For a linear model: y(x) = slope * x + intersect

        The resulting fit object is stored on the instance so that slope(),
        intersect() and chi2() can be called afterwards.

        :param x: list of input data (list of floats or ints).
        :param y: list of input data (list of floats or ints).
        :param model: model name (string).
        :raises UnknownException: in case Fit() fails.
        :return: fit object (Fit).
        """

        try:
            self._fit = Fit(x=x, y=y, model=model)
        except Exception as error:
            # any failure inside Fit() is re-raised as an UnknownException
            raise UnknownException(error)

        return self._fit

    def slope(self):
        """
        Return the slope of a linear fit, y(x) = slope * x + intersect.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: slope (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.slope()

    def intersect(self):
        """
        Return the intersect of a linear fit, y(x) = slope * x + intersect.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: intersect (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.intersect()

    def chi2(self):
        """
        Return the chi2 of the fit.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: chi2 (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.chi2()

    def get_table(self, filename, header=None, separator="\t", convert_to_float=True):
        """
        Read a table from file, by forwarding all arguments to get_table_from_file().

        :param filename: full path to input file (string).
        :param header: header string.
        :param separator: separator character (char).
        :param convert_to_float: boolean, if True, all values will be converted to floats.
        :return: dictionary.
        """

        return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float)

    def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
        """
        Return a properly formatted job metrics string with analytics data.
        Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.

        The fit is only attempted when both columns have the same length and more
        than seven entries (counted after the optional tail removal). In all other
        cases, and when the fit fails or the fitted slope is zero/undefined, the
        returned strings are empty.

        :param filename: full path to memory monitor output (string).
        :param x_name: optional string, name selector for table column.
        :param y_name: optional string, name selector for table column.
        :param precision: optional precision for the fitted slope and chi2 strings, default 2.
        :param tails: should tails (first and last values) be used? (boolean).
        :return: {"slope": slope, "chi2": chi2} (float strings with desired precision).
        """

        slope_str = ""
        chi2_str = ""

        table = self.get_table(filename)
        if not table:
            return {"slope": slope_str, "chi2": chi2_str}

        # extract data to be fitted
        x, y = self.extract_from_table(table, x_name, y_name)

        # remove tails if desired
        # this is useful e.g. for memory monitor data where the first and last values
        # represent allocation and de-allocation, ie not interesting
        if not tails and len(x) > 7 and len(y) > 7:
            logger.debug('removing tails from data to be fitted')
            # drop the first five and the last two values
            x = x[5:-2]
            y = y[5:-2]

        if len(x) > 7 and len(x) == len(y):
            logger.info('fitting %s vs %s', y_name, x_name)
            try:
                fit = self.fit(x, y)
                fitted_slope = self.slope()
            except Exception as error:
                # best effort: a failed fit is only logged, empty strings are returned
                logger.warning('failed to fit data, x=%s, y=%s: %s', x, y, error)
            else:
                # note: a slope of exactly zero is treated like an undefined slope here
                if fitted_slope:
                    slope_str = float_to_rounded_string(fitted_slope, precision=precision)
                    chi2_str = float_to_rounded_string(fit.chi2(), precision=precision)
                    if slope_str != "":
                        logger.info('current memory leak: %s B/s (using %d data points, chi2=%s)',
                                    slope_str, len(x), chi2_str)
        else:
            logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>7)', x, y)

        return {"slope": slope_str, "chi2": chi2_str}

    def extract_from_table(self, table, x_name, y_name):
        """
        Extract the x and y columns from the given table.

        If y_name contains a '+'-sign, e.g. 'pss+swap', the first two '+'-separated
        column names are looked up and added element by element. Missing columns
        are returned as empty lists.

        :param table: dictionary with columns.
        :param x_name: column name to be extracted (string).
        :param y_name: column name to be extracted (may contain '+'-sign) (string).
        :return: x (list), y (list).
        """

        x = table.get(x_name, [])
        if '+' not in y_name:
            return x, table.get(y_name, [])

        try:
            y1_name, y2_name = y_name.split('+')[:2]
            y1_value = table.get(y1_name, [])
            y2_value = table.get(y2_name, [])
        except Exception as error:
            logger.warning('exception caught: %s', error)
            return [], []

        # create new list with added values (1,2,3) + (4,5,6) = (5,7,9)
        # (zip stops at the shorter of the two columns)
        y = [first + second for first, second in zip(y1_value, y2_value)]

        return x, y
0193 
0194 
class Fit(object):
    """
    Low-level fitting class.

    For the 'linear' model, all fit parameters are calculated in the constructor:
    slope = sum_dev(x, y) / sum_square_dev(x) and intersect = mean(y) - slope * mean(x).
    A parameter that cannot be calculated (e.g. the slope when sum_square_dev(x) is
    zero) is set to None; the chi2 is then None as well.
    """

    _model = 'linear'  # fitting model
    _x = None  # x values
    _y = None  # y values
    _xm = None  # x mean
    _ym = None  # y mean
    _ss = None  # sum of square deviations
    _ss2 = None  # sum of deviations
    _slope = None  # slope
    _intersect = None  # intersect
    _chi2 = None  # chi2

    def __init__(self, **kwargs):
        """
        Init function.

        :param kwargs: 'x' and 'y' (lists of input data), 'model' (string, default 'linear').
        :raises PilotException: NotImplementedError for unknown fitting model, NotDefined if input data not defined,
        NotSameLength if the x and y lists have different lengths.
        """

        # extract parameters
        self._model = kwargs.get('model', 'linear')
        self._x = kwargs.get('x', None)
        self._y = kwargs.get('y', None)

        if not self._x or not self._y:
            raise NotDefined('input data not defined')

        if len(self._x) != len(self._y):
            raise NotSameLength('input data (lists) have different lengths')

        # base calculations
        if self._model == 'linear':
            self._ss = sum_square_dev(self._x)
            self._ss2 = sum_dev(self._x, self._y)
            self.set_slope()
            self._xm = mean(self._x)
            self._ym = mean(self._y)
            # the intersect needs the slope and both means, the chi2 needs slope and intersect
            self.set_intersect()
            self.set_chi2()
        else:
            logger.warning("\'%s\' model is not implemented", self._model)
            raise NotImplementedError("'%s' model is not implemented" % self._model)

    def fit(self):
        """
        Return fitting object.

        :return: fitting object.
        """

        return self

    def value(self, t):
        """
        Return the value y(x=t) of a linear fit y(x) = slope * x + intersect.

        :param t: x value at which the fitted line should be evaluated (float or int).
        :raises TypeError: if the slope or the intersect is undefined (None).
        :return: y(t) (float).
        """

        return self._slope * t + self._intersect

    def set_chi2(self):
        """
        Calculate and set the chi2 value.

        The chi2 is calculated from the observed y values and the values expected
        from the fitted line. It is set to None if the fit parameters are undefined
        or if there is no data.

        :return:
        """

        # without a slope and an intersect there is no fitted line to compare with
        if self._slope is None or self._intersect is None:
            self._chi2 = None
            return

        y_observed = self._y
        y_expected = [self.value(x) for x in self._x]
        if y_observed and y_expected:
            self._chi2 = chi2(y_observed, y_expected)
        else:
            self._chi2 = None

    def chi2(self):
        """
        Return the chi2 value.

        :return: chi2 (float), or None if it could not be calculated.
        """

        return self._chi2

    def set_slope(self):
        """
        Calculate and set the slope of the linear fit.

        The slope is undefined (None) if either sum is missing or if the sum of
        square deviations is zero. A sum of deviations of zero is valid and gives
        a slope of zero.

        :return:
        """

        if self._ss2 is None or not self._ss:
            self._slope = None
        else:
            self._slope = self._ss2 / float(self._ss)

    def slope(self):
        """
        Return the slope value.

        :return: slope (float), or None if it could not be calculated.
        """

        return self._slope

    def set_intersect(self):
        """
        Calculate and set the intersect of the linear fit.

        The intersect is undefined (None) only if the slope or one of the means is
        missing; means or a slope with the value zero are valid input.

        :return:
        """

        if self._ym is None or self._slope is None or self._xm is None:
            self._intersect = None
        else:
            self._intersect = self._ym - self._slope * self._xm

    def intersect(self):
        """
        Return the intersect value.

        :return: intersect (float), or None if it could not be calculated.
        """

        return self._intersect