File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import os
0019 import time
0020
0021 from pilot.util.config import config
0022 from pilot.util.constants import PILOT_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_PRE_SETUP, \
0023 PILOT_POST_SETUP, PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, PILOT_PRE_STAGEOUT,\
0024 PILOT_POST_STAGEOUT, PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE, PILOT_END_TIME, PILOT_MULTIJOB_START_TIME
0025 from pilot.util.filehandling import read_json, write_json
0026
0027
0028 import logging
0029 logger = logging.getLogger(__name__)
0030
0031
def read_pilot_timing():
    """
    Load the pilot timing dictionary from the pilot timing file.

    The file location is built from the PILOT_HOME environment variable (empty string if unset) and
    config.Pilot.timing_file.

    :return: whatever read_json() returns for the timing file, or an empty dictionary if the file does not exist.
    """

    timing_path = os.path.join(os.environ.get('PILOT_HOME', ''), config.Pilot.timing_file)

    # nothing has been stored yet
    if not os.path.exists(timing_path):
        return {}

    return read_json(timing_path)
0046
0047
def write_pilot_timing(pilot_timing_dictionary):
    """
    Store the given pilot timing dictionary in the pilot timing file.

    The file location is built from the PILOT_HOME environment variable (empty string if unset) and
    config.Pilot.timing_file. The outcome is only reported in the log; nothing is returned.

    :param pilot_timing_dictionary: pilot timing dictionary to be written with write_json().
    :return:
    """

    path = os.path.join(os.environ.get('PILOT_HOME', ''), config.Pilot.timing_file)

    # a false return value from write_json() is treated as a failure
    if not write_json(path, pilot_timing_dictionary):
        logger.warning('failed to update pilot timing dictionary: %s' % path)
        return

    logger.debug('updated pilot timing dictionary: %s' % path)
0064
0065
def add_to_pilot_timing(job_id, timing_constant, time_measurement, args, store=False):
    """
    Record a time measurement for the given job id and timing constant in args.timing.

    An entry for job_id is created in args.timing if it does not exist yet. An already stored measurement
    for the same timing constant is overwritten.

    :param job_id: PanDA job id (string).
    :param timing_constant: timing constant (string).
    :param time_measurement: time measurement (float).
    :param args: pilot arguments (the timing dictionary is kept in args.timing).
    :param store: if True, write timing dictionary to file. False by default.
    :return:
    """

    # setdefault() covers both the empty dictionary and the unknown job id cases
    job_measurements = args.timing.setdefault(job_id, {})
    job_measurements[timing_constant] = time_measurement

    # update the file on request
    if store:
        write_pilot_timing(args.timing)
0088
0089
def get_initial_setup_time(job_id, args):
    """
    Return the time spent on the initial setup for the given job id.

    The value is the time difference between the PILOT_MULTIJOB_START_TIME and PILOT_PRE_GETJOB
    measurements, as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_MULTIJOB_START_TIME, PILOT_PRE_GETJOB

    return get_time_difference(job_id, begin, end, args)
0101
0102
def get_getjob_time(job_id, args):
    """
    Return the time spent on the getjob operation for the given job id.

    The value is the time difference between the PILOT_PRE_GETJOB and PILOT_POST_GETJOB measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_GETJOB, PILOT_POST_GETJOB

    return get_time_difference(job_id, begin, end, args)
0113
0114
def get_setup_time(job_id, args):
    """
    Return the time spent on the setup operation for the given job id.

    The value is the time difference between the PILOT_PRE_SETUP and PILOT_POST_SETUP measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_SETUP, PILOT_POST_SETUP

    return get_time_difference(job_id, begin, end, args)
0125
0126
def get_stagein_time(job_id, args):
    """
    Return the time spent on the stage-in operation for the given job id.

    The value is the time difference between the PILOT_PRE_STAGEIN and PILOT_POST_STAGEIN measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN

    return get_time_difference(job_id, begin, end, args)
0137
0138
def get_stageout_time(job_id, args):
    """
    Return the time spent on the stage-out operation for the given job id.

    The value is the time difference between the PILOT_PRE_STAGEOUT and PILOT_POST_STAGEOUT measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT

    return get_time_difference(job_id, begin, end, args)
0149
0150
def get_payload_execution_time(job_id, args):
    """
    Return the time spent on the payload execution for the given job id.

    The value is the time difference between the PILOT_PRE_PAYLOAD and PILOT_POST_PAYLOAD measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD

    return get_time_difference(job_id, begin, end, args)
0161
0162
def get_final_update_time(job_id, args):
    """
    Return the time spent on executing the final update for the given job id.

    The value is the time difference between the PILOT_PRE_FINAL_UPDATE and PILOT_POST_FINAL_UPDATE
    measurements, as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE

    return get_time_difference(job_id, begin, end, args)
0173
0174
def get_total_pilot_time(job_id, args):
    """
    Return the total pilot time for the given job id.

    The value is the time difference between the PILOT_START_TIME and PILOT_END_TIME measurements,
    as calculated by get_time_difference().

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_START_TIME, PILOT_END_TIME

    return get_time_difference(job_id, begin, end, args)
0186
0187
def get_postgetjob_time(job_id, args):
    """
    Return the PILOT_POST_GETJOB time measurement stored for the given job id.

    The measurement is looked up directly in args.timing. A warning is logged if the job id is known
    but no (non-zero) measurement could be found.

    :param job_id: PanDA job id (string), used as key in args.timing.
    :param args: pilot arguments.
    :return: post getjob time measurement. In case of failure, return None.
    """

    # unknown job id: nothing to extract (and nothing is logged)
    if job_id not in args.timing:
        return None

    job_measurements = args.timing.get(job_id, None)
    post_getjob = job_measurements.get(PILOT_POST_GETJOB, None) if job_measurements else None

    if not post_getjob:
        logger.warning('failed to extract time measurement %s from %s (no such key)' % (PILOT_POST_GETJOB, job_measurements))

    return post_getjob
0210
0211
def get_time_measurement(timing_constant, time_measurement_dictionary, timing_dictionary, job_id):
    """
    Return the time measurement for the given timing constant.

    The measurement is first looked up in the job specific time measurement dictionary. If it is missing (or zero),
    the lookup falls back to the '0' entry of the full timing dictionary for PILOT_START_TIME, and to the '1' entry
    for any other timing constant. A warning is logged if the fallback entry does not exist.

    :param timing_constant: timing constant (e.g. PILOT_MULTIJOB_START_TIME)
    :param time_measurement_dictionary: time measurement dictionary, extracted from pilot timing dictionary.
    :param timing_dictionary: full timing dictionary.
    :param job_id: PanDA job id (string). Currently not used by this function.
    :return: time measurement (float), or None if not found.
    """

    measurement = time_measurement_dictionary.get(timing_constant, None)
    if measurement:
        return measurement

    # not available for this job id, try the fallback entry instead
    fallback_key = '0' if timing_constant == PILOT_START_TIME else '1'
    fallback_measurements = timing_dictionary.get(fallback_key, None)
    if not fallback_measurements:
        logger.warning('failed to extract time measurement %s from %s (no such key)' % (timing_constant, time_measurement_dictionary))
        return measurement

    return fallback_measurements.get(timing_constant, None)
0234
0235
def get_time_since_start(args):
    """
    Return the time that has passed since the PILOT_START_TIME measurement was taken.

    The measurement is looked up under the '0' key of the timing dictionary via get_time_since().

    :param args: pilot arguments.
    :return: time in seconds (float), or 0 if the measurement is not available.
    """

    start_key = '0'

    return get_time_since(start_key, PILOT_START_TIME, args)
0245
0246
def get_time_since_multijob_start(args):
    """
    Return the time that has passed since the PILOT_MULTIJOB_START_TIME measurement was taken.

    The measurement is looked up under the '1' key of the timing dictionary via get_time_since().

    :param args: pilot arguments.
    :return: time in seconds (float), or 0 if the measurement is not available.
    """

    multijob_key = '1'

    return get_time_since(multijob_key, PILOT_MULTIJOB_START_TIME, args)
0256
0257
def get_time_since(job_id, timing_constant, args):
    """
    Return the time that has passed since the measurement of the given timing constant was taken.

    The measurement is extracted from args.timing with get_time_measurement() and subtracted from the current
    time.time() value. A warning is logged if the job id or its measurements cannot be found.

    :param job_id: PanDA job id (string).
    :param timing_constant: timing constant (string).
    :param args: pilot arguments.
    :return: time in seconds (float), or 0 if the measurement is not available.
    """

    elapsed = 0

    if job_id not in args.timing:
        logger.warning('job id %s not found in timing dictionary' % job_id)
    else:
        job_measurements = args.timing.get(job_id, None)
        if not job_measurements:
            logger.warning('failed to extract time measurement dictionary from %s' % str(args.timing))
        else:
            measurement = get_time_measurement(timing_constant, job_measurements, args.timing, job_id)
            if measurement:
                elapsed = time.time() - measurement

    return elapsed
0285
0286
def get_time_difference(job_id, timing_constant_1, timing_constant_2, args):
    """
    Return the absolute time difference between the measurements of the two given timing constants.

    The order of the timing constants does not matter since the result is never negative. Both measurements
    are extracted with get_time_measurement() from args.timing, which is structured as
        { job_id: { <timing_constant>: <time measurement in seconds since epoch>, .. }, .. }
    The difference is 0 if the job id is unknown, if it has no measurements, or if either measurement is missing.
    Warnings are logged for the first two cases. The value is converted to an int as the last step.

    :param job_id: PanDA job id (string), used as key in args.timing.
    :param timing_constant_1: first timing constant.
    :param timing_constant_2: second timing constant.
    :param args: pilot arguments.
    :return: time difference in seconds (int).
    """

    elapsed = 0

    if job_id not in args.timing:
        logger.warning('job id %s not found in timing dictionary' % job_id)
    else:
        job_measurements = args.timing.get(job_id, None)
        if not job_measurements:
            logger.warning('failed to extract time measurement dictionary from %s' % str(args.timing))
        else:
            first = get_time_measurement(timing_constant_1, job_measurements, args.timing, job_id)
            second = get_time_measurement(timing_constant_2, job_measurements, args.timing, job_id)
            if first and second:
                elapsed = second - first

    # the caller is only interested in the size of the interval
    elapsed = abs(elapsed)

    # report whole seconds
    try:
        elapsed = int(elapsed)
    except Exception as exc:
        logger.warning('failed to convert %s to int: %s (will reset to 0)' % (elapsed, exc))
        elapsed = 0

    return elapsed
0338
0339
def timing_report(job_id, args):
    """
    Log a summary of the timing measurements for the given job id and return the main values.

    The total setup time is the sum of the initial setup time and the payload setup time.

    :param job_id: job id (string).
    :param args: pilot arguments.
    :return: time_getjob, time_stagein, time_payload, time_stageout, time_total_setup (ints, in seconds).
    """

    # collect the measurements
    time_getjob = get_getjob_time(job_id, args)
    time_initial_setup = get_initial_setup_time(job_id, args)
    time_setup = get_setup_time(job_id, args)
    time_total_setup = time_initial_setup + time_setup
    time_stagein = get_stagein_time(job_id, args)
    time_payload = get_payload_execution_time(job_id, args)
    time_stageout = get_stageout_time(job_id, args)

    # one (message template, value) pair per line of the report
    report = (
        ('. get job = %d s', time_getjob),
        ('. initial setup = %d s', time_initial_setup),
        ('. payload setup = %d s', time_setup),
        ('. total setup = %d s', time_total_setup),
        ('. stage-in = %d s', time_stagein),
        ('. payload execution = %d s', time_payload),
        ('. stage-out = %d s', time_stageout),
    )

    frame = '.' * 30
    logger.info(frame)
    logger.info('. Timing measurements:')
    for template, seconds in report:
        logger.info(template % seconds)
    logger.info(frame)

    return time_getjob, time_stagein, time_payload, time_stageout, time_total_setup
0369
0370
def time_stamp():
    """
    Return the current local time as an ISO-8601 compliant date/time string.

    The format is YYYY-MM-DDTHH:MM:SS followed by the UTC offset of the local time zone (+HH:MM or -HH:MM).

    :return: time information (string).
    """

    # use a single snapshot of the local time for both the time string and the DST decision
    now = time.localtime()

    # time.timezone is the non-DST offset; time.altzone must be used while DST is in effect
    # (offsets are given in seconds WEST of UTC, i.e. with the opposite sign of the ISO-8601 offset)
    offset = time.altzone if (time.daylight and now.tm_isdst > 0) else time.timezone
    sign_str = '-' if offset > 0 else '+'

    # split the absolute offset so that neither hours nor minutes can become negative
    # (e.g. UTC+05:30 must be rendered as +05:30, not +05:-30)
    hours, minutes = divmod(abs(offset) // 60, 60)

    return str("%s%s%02d:%02d" % (time.strftime("%Y-%m-%dT%H:%M:%S", now), sign_str, hours, minutes))
0386
0387
def get_elapsed_real_time(t0=None):
    """
    Return a time stamp corresponding to the elapsed real time (since t0 if requested).

    The function uses element 4 of os.times() to get the current time stamp.
    If t0 is provided, the returned time stamp is relative to t0. t0 is assumed to be an os.times() tuple;
    any other value (None, an empty tuple, a non-tuple) is ignored and the absolute time stamp is returned.

    :param t0: os.times() tuple for the t0 time stamp.
    :return: time stamp (int).
    """

    _t0 = 0

    # isinstance() is needed here: on Python 3, os.times() returns os.times_result, which is a subclass of
    # tuple, so an exact type comparison would silently ignore a genuine os.times() value
    if t0 and isinstance(t0, tuple):
        try:
            _t0 = int(t0[4])
        except (IndexError, TypeError, ValueError, OverflowError) as e:
            logger.warning('unknown timing format for t0: %s' % e)
            _t0 = 0

    return int(os.times()[4]) - _t0
0409 return t - _t0