File indexing completed on 2026-04-10 08:39:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 from .services import Services
0011 from pilot.common.exception import NotDefined, NotSameLength, UnknownException
0012 from pilot.util.filehandling import get_table_from_file
0013 from pilot.util.math import mean, sum_square_dev, sum_dev, chi2, float_to_rounded_string
0014
0015 import logging
0016 logger = logging.getLogger(__name__)
0017
0018
class Analytics(Services):
    """
    Analytics service class.

    Wraps a Fit object and offers helpers for reading a table from file,
    extracting two columns from it and summarising a linear fit of them.
    """

    _fit = None

    def __init__(self, **kwargs):
        """
        Init function.

        :param kwargs: keyword arguments (not used by this class).
        """

        self._fit = None

    def fit(self, x, y, model='linear'):
        """
        Fitting function.
        For a linear model: y(x) = slope * x + intersect

        The created Fit object is stored on the instance so that slope(),
        intersect() and chi2() can be queried afterwards.

        :param x: list of input data (list of floats or ints).
        :param y: list of input data (list of floats or ints).
        :param model: model name (string).
        :raises UnknownException: in case Fit() fails.
        :return: the Fit object.
        """

        try:
            self._fit = Fit(x=x, y=y, model=model)
        except Exception as e:
            # any failure inside Fit is reported to the caller as an UnknownException
            raise UnknownException(e)

        return self._fit

    def slope(self):
        """
        Return the slope of a linear fit, y(x) = slope * x + intersect.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: slope (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.slope()

    def intersect(self):
        """
        Return the intersect of a linear fit, y(x) = slope * x + intersect.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: intersect (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.intersect()

    def chi2(self):
        """
        Return the chi2 of the fit.

        :raises NotDefined: exception thrown if fit is not defined.
        :return: chi2 (float).
        """

        if not self._fit:
            raise NotDefined('Fit has not been defined')

        return self._fit.chi2()

    def get_table(self, filename, header=None, separator="\t", convert_to_float=True):
        """
        Read a table from file by delegating to get_table_from_file().

        :param filename: full path to input file (string).
        :param header: header string.
        :param separator: separator character (char).
        :param convert_to_float: boolean, if True, all values will be converted to floats.
        :return: dictionary.
        """

        return get_table_from_file(filename, header=header, separator=separator, convert_to_float=convert_to_float)

    def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
        """
        Return a properly formatted job metrics string with analytics data.
        Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.

        Both returned values stay empty strings when the table is empty, when the
        data are too short or of unequal length, when the fit fails, or when the
        fitted slope is falsy (None or zero).

        :param filename: full path to memory monitor output (string).
        :param x_name: optional string, name selector for table column.
        :param y_name: optional string, name selector for table column.
        :param precision: optional precision for fitted slope parameter, default 2.
        :param tails: should tails (first and last values) be used? (boolean).
        :return: {"slope": slope, "chi2": chi2} (float strings with desired precision).
        """

        slope = ""
        # named chi2_str so that the imported chi2() helper is not shadowed
        chi2_str = ""
        table = self.get_table(filename)

        if table:
            x, y = self.extract_from_table(table, x_name, y_name)

            # drop the first five and the last two values when tails are not wanted
            if not tails and len(x) > 7 and len(y) > 7:
                logger.debug('removing tails from data to be fitted')
                x = x[5:-2]
                y = y[5:-2]

            if len(x) > 7 and len(y) > 7 and len(x) == len(y):
                logger.info('fitting %s vs %s', y_name, x_name)
                try:
                    fit = self.fit(x, y)
                    _slope = self.slope()
                except Exception as e:
                    logger.warning('failed to fit data, x=%s, y=%s: %s', str(x), str(y), e)
                else:
                    if _slope:
                        slope = float_to_rounded_string(fit.slope(), precision=precision)
                        chi2_str = float_to_rounded_string(fit.chi2(), precision=precision)
                        if slope != "":
                            logger.info('current memory leak: %s B/s (using %d data points, chi2=%s)',
                                        slope, len(x), chi2_str)
            else:
                # the message states the same threshold as the check above
                logger.warning('wrong length of table data, x=%s, y=%s (must be same and length>7)', str(x), str(y))

        return {"slope": slope, "chi2": chi2_str}

    def extract_from_table(self, table, x_name, y_name):
        """
        Extract the x and y columns from the given table.

        If y_name contains '+'-signs, the named columns are added element by
        element (e.g. 'pss+swap' gives pss[i] + swap[i]). The sum is truncated
        to the shortest column, so a missing column yields an empty y list.

        :param table: dictionary with columns.
        :param x_name: column name to be extracted (string).
        :param y_name: column name to be extracted (may contain '+'-sign) (string).
        :return: x (list), y (list).
        """

        x = table.get(x_name, [])
        if '+' not in y_name:
            return x, table.get(y_name, [])

        try:
            columns = [table.get(name, []) for name in y_name.split('+')]
        except Exception as error:
            logger.warning('exception caught: %s', error)
            return [], []

        # add the columns row by row; plain '+' is used (rather than sum()) so that
        # any value type supporting addition keeps working
        y = []
        for values in zip(*columns):
            total = values[0]
            for value in values[1:]:
                total = total + value
            y.append(total)

        return x, y
0193
0194
class Fit(object):
    """
    Low-level fitting class.

    Performs a linear fit, y(x) = slope * x + intersect, of the given data
    when the object is created and stores slope, intersect and chi2.
    """

    _model = 'linear'  # fitting model name
    _x = None  # input x values
    _y = None  # input y values
    _xm = None  # mean of x, as returned by mean()
    _ym = None  # mean of y, as returned by mean()
    _ss = None  # result of sum_square_dev(x)
    _ss2 = None  # result of sum_dev(x, y)
    _slope = None  # fitted slope, None if undefined
    _intersect = None  # fitted intersect, None if undefined
    _chi2 = None  # chi2 of the fit, None if undefined

    def __init__(self, **kwargs):
        """
        Init function.

        Recognised keyword arguments: x (list), y (list), model (string, default 'linear').

        :param kwargs: keyword arguments, see above.
        :raises NotDefined: if x or y is missing or empty.
        :raises NotSameLength: if x and y have different lengths.
        :raises NotImplementedError: for an unknown fitting model.
        """

        self._model = kwargs.get('model', 'linear')
        self._x = kwargs.get('x', None)
        self._y = kwargs.get('y', None)

        if not self._x or not self._y:
            raise NotDefined('input data not defined')

        if len(self._x) != len(self._y):
            raise NotSameLength('input data (lists) have different lengths')

        if self._model != 'linear':
            logger.warning("'%s' model is not implemented", self._model)
            raise NotImplementedError()

        # the slope needs the two sums, the intersect needs slope and means,
        # and chi2 needs both slope and intersect - keep this order
        self._ss = sum_square_dev(self._x)
        self._ss2 = sum_dev(self._x, self._y)
        self.set_slope()
        self._xm = mean(self._x)
        self._ym = mean(self._y)
        self.set_intersect()
        self.set_chi2()

    def fit(self):
        """
        Return fitting object.

        :return: fitting object.
        """

        return self

    def value(self, t):
        """
        Return the value y(x=t) of a linear fit y(x) = slope * x + intersect.

        Note: raises TypeError if slope or intersect is undefined (None).

        :param t: x value (float or int).
        :return: fitted y value (float).
        """

        return self._slope * t + self._intersect

    def set_chi2(self):
        """
        Calculate and set the chi2 value.

        chi2 is set to None when the fit itself is undefined (slope or
        intersect is None) or when there are no data to compare.

        :return:
        """

        if self._slope is None or self._intersect is None:
            # nothing to evaluate - avoid a TypeError from value()
            self._chi2 = None
            return

        y_observed = self._y
        y_expected = [self.value(x) for x in self._x]

        if y_observed and y_expected:
            self._chi2 = chi2(y_observed, y_expected)
        else:
            self._chi2 = None

    def chi2(self):
        """
        Return the chi2 value.

        :return: chi2 (float), or None if undefined.
        """

        return self._chi2

    def set_slope(self):
        """
        Calculate and set the slope of the linear fit.

        The slope is undefined (None) only when a sum is missing or when the
        denominator is zero. A zero numerator is a valid, flat fit and gives 0.0.

        :return:
        """

        if self._ss2 is None or not self._ss:
            self._slope = None
        else:
            self._slope = self._ss2 / float(self._ss)

    def slope(self):
        """
        Return the slope value.

        :return: slope (float), or None if undefined.
        """

        return self._slope

    def set_intersect(self):
        """
        Calculate and set the intersect of the linear fit.

        Only None marks a missing input; zero means or a zero slope are
        legitimate values and still produce an intersect.

        :return:
        """

        if self._ym is None or self._slope is None or self._xm is None:
            self._intersect = None
        else:
            self._intersect = self._ym - self._slope * self._xm

    def intersect(self):
        """
        Return the intersect value.

        :return: intersect (float), or None if undefined.
        """

        return self._intersect