Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018
0009 
# Note: Pilot 2 modules that need to record timing measurements can do so using the add_to_pilot_timing() function.
# When the timing measurements need to be retrieved, the high-level functions, e.g. get_getjob_time(), can be used.
0012 
# Structure of the pilot timing dictionary:
#     { job_id: { <timing_constant_1>: <time measurement in seconds since epoch>, .. } }
# job_id = 0 means timing information from the wrapper. Timing constants are defined in pilot.util.constants.
# Time measurements are time.time() values. The float value will be converted to an int as a last step.
0017 
0018 import os
0019 import time
0020 
0021 from pilot.util.config import config
0022 from pilot.util.constants import PILOT_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_PRE_SETUP, \
0023     PILOT_POST_SETUP, PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD, PILOT_PRE_STAGEOUT,\
0024     PILOT_POST_STAGEOUT, PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE, PILOT_END_TIME, PILOT_MULTIJOB_START_TIME
0025 from pilot.util.filehandling import read_json, write_json
0026 #from pilot.util.mpi import get_ranks_info
0027 
0028 import logging
0029 logger = logging.getLogger(__name__)
0030 
0031 
def read_pilot_timing():
    """
    Load the pilot timing dictionary from the timing file.

    The file name is taken from config.Pilot.timing_file and is looked up relative to the directory given by the
    PILOT_HOME environment variable (or relative to the current directory if PILOT_HOME is not set).

    :return: result of read_json() for the timing file, or an empty dictionary if the file does not exist.
    """

    timing_path = os.path.join(os.environ.get('PILOT_HOME', ''), config.Pilot.timing_file)

    # nothing has been stored yet
    if not os.path.exists(timing_path):
        return {}

    return read_json(timing_path)
0046 
0047 
def write_pilot_timing(pilot_timing_dictionary):
    """
    Store the given pilot timing dictionary in the timing file.

    The file name is taken from config.Pilot.timing_file and is placed in the directory given by the PILOT_HOME
    environment variable (or relative to the current directory if PILOT_HOME is not set).
    The outcome is only logged; nothing is returned.

    :param pilot_timing_dictionary: pilot timing dictionary to be written.
    :return:
    """

    file_name = config.Pilot.timing_file
    # NOTE: a per-rank suffix for the file name (based on get_ranks_info()) is currently disabled
    path = os.path.join(os.environ.get('PILOT_HOME', ''), file_name)

    written = write_json(path, pilot_timing_dictionary)
    if not written:
        logger.warning('failed to update pilot timing dictionary: %s' % path)
        return

    logger.debug('updated pilot timing dictionary: %s' % path)
0064 
0065 
def add_to_pilot_timing(job_id, timing_constant, time_measurement, args, store=False):
    """
    Add the given timing constant and measurement for job_id to the pilot timing dictionary (args.timing).

    An entry for job_id is created if it does not exist yet. An already existing measurement for the same timing
    constant is overwritten.

    :param job_id: PanDA job id (string).
    :param timing_constant: timing constant (string).
    :param time_measurement: time measurement (float).
    :param args: pilot arguments (the timing dictionary is kept in args.timing).
    :param store: if True, write timing dictionary to file. False by default.
    :return:
    """

    # get (or create) the measurements belonging to this job, then record the new value
    job_timings = args.timing.setdefault(job_id, {})
    job_timings[timing_constant] = time_measurement

    if not store:
        return

    # update the file
    write_pilot_timing(args.timing)
0088 
0089 
def get_initial_setup_time(job_id, args):
    """
    High level function that returns the time for the initial setup.
    The initial setup time is measured from PILOT_MULTIJOB_START_TIME to PILOT_PRE_GETJOB.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_MULTIJOB_START_TIME, PILOT_PRE_GETJOB
    return get_time_difference(job_id, begin, end, args)
0101 
0102 
def get_getjob_time(job_id, args):
    """
    High level function that returns the duration of the getjob operation for the given job_id.
    The duration is measured from PILOT_PRE_GETJOB to PILOT_POST_GETJOB.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_GETJOB, PILOT_POST_GETJOB
    return get_time_difference(job_id, begin, end, args)
0113 
0114 
def get_setup_time(job_id, args):
    """
    High level function that returns the duration of the setup operation for the given job_id.
    The duration is measured from PILOT_PRE_SETUP to PILOT_POST_SETUP.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_SETUP, PILOT_POST_SETUP
    return get_time_difference(job_id, begin, end, args)
0125 
0126 
def get_stagein_time(job_id, args):
    """
    High level function that returns the duration of the stage-in operation for the given job_id.
    The duration is measured from PILOT_PRE_STAGEIN to PILOT_POST_STAGEIN.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN
    return get_time_difference(job_id, begin, end, args)
0137 
0138 
def get_stageout_time(job_id, args):
    """
    High level function that returns the duration of the stage-out operation for the given job_id.
    The duration is measured from PILOT_PRE_STAGEOUT to PILOT_POST_STAGEOUT.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT
    return get_time_difference(job_id, begin, end, args)
0149 
0150 
def get_payload_execution_time(job_id, args):
    """
    High level function that returns the duration of the payload execution for the given job_id.
    The duration is measured from PILOT_PRE_PAYLOAD to PILOT_POST_PAYLOAD.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_PAYLOAD, PILOT_POST_PAYLOAD
    return get_time_difference(job_id, begin, end, args)
0161 
0162 
def get_final_update_time(job_id, args):
    """
    High level function that returns the time spent executing the final update for the given job_id.
    The duration is measured from PILOT_PRE_FINAL_UPDATE to PILOT_POST_FINAL_UPDATE.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_PRE_FINAL_UPDATE, PILOT_POST_FINAL_UPDATE
    return get_time_difference(job_id, begin, end, args)
0173 
0174 
def get_total_pilot_time(job_id, args):
    """
    High level function that returns the total pilot time for the given job_id.
    This means the wall time that has passed from the start of the pilot until after the last job update, i.e. the
    time difference between PILOT_START_TIME and PILOT_END_TIME.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: time in seconds (int).
    """

    begin, end = PILOT_START_TIME, PILOT_END_TIME
    return get_time_difference(job_id, begin, end, args)
0186 
0187 
def get_postgetjob_time(job_id, args):
    """
    Return the PILOT_POST_GETJOB time measurement stored for the given job_id.

    No warning is issued if job_id is not present in the timing dictionary at all; a warning is only logged when
    the job is known but the measurement could not be extracted.

    :param job_id: PanDA job id (string).
    :param args: pilot arguments.
    :return: post getjob time measurement as stored in args.timing. In case of failure, return None.
    """

    if job_id not in args.timing:
        return None

    # extract time measurements
    job_timings = args.timing.get(job_id, None)
    measurement = job_timings.get(PILOT_POST_GETJOB, None) if job_timings else None

    if not measurement:
        logger.warning('failed to extract time measurement %s from %s (no such key)' % (PILOT_POST_GETJOB, job_timings))

    return measurement
0210 
0211 
def get_time_measurement(timing_constant, time_measurement_dictionary, timing_dictionary, job_id):
    """
    Return a requested time measurement from the given time measurement dictionary.

    If the measurement is not available in time_measurement_dictionary, a fallback lookup is made in the full timing
    dictionary: entry '0' is used for PILOT_START_TIME and entry '1' for any other timing constant. A warning is
    logged only if that fallback entry is missing (or empty).

    :param timing_constant: timing constant (e.g. PILOT_MULTIJOB_START_TIME)
    :param time_measurement_dictionary: time measurement dictionary, extracted from pilot timing dictionary.
    :param timing_dictionary: full timing dictionary from pilot timing file.
    :param job_id: PanDA job id (string). Currently not used by this function.
    :return: time measurement (float), or a false value (e.g. None) if it could not be found.
    """

    measurement = time_measurement_dictionary.get(timing_constant, None)
    if measurement:
        return measurement

    # not stored for this job: fall back to the '0' or '1' entry of the full timing dictionary
    fallback_key = '1'
    if timing_constant == PILOT_START_TIME:
        fallback_key = '0'

    fallback_timings = timing_dictionary.get(fallback_key, None)
    if not fallback_timings:
        logger.warning('failed to extract time measurement %s from %s (no such key)' % (timing_constant, time_measurement_dictionary))
        return measurement

    return fallback_timings.get(timing_constant, None)
0234 
0235 
def get_time_since_start(args):
    """
    Return the amount of time that has passed since the pilot was launched.
    The PILOT_START_TIME measurement is looked up under the '0' entry of the timing dictionary.

    :param args: pilot arguments.
    :return: time in seconds, as returned by get_time_since().
    """

    wrapper_key = '0'
    return get_time_since(wrapper_key, PILOT_START_TIME, args)
0245 
0246 
def get_time_since_multijob_start(args):
    """
    Return the amount of time that has passed since the last multi job was launched.
    The PILOT_MULTIJOB_START_TIME measurement is looked up under the '1' entry of the timing dictionary.

    :param args: pilot arguments.
    :return: time in seconds, as returned by get_time_since().
    """

    multijob_key = '1'
    return get_time_since(multijob_key, PILOT_MULTIJOB_START_TIME, args)
0256 
0257 
def get_time_since(job_id, timing_constant, args):
    """
    Return the amount of time that has passed since the time measurement of timing_constant.

    :param job_id: PanDA job id (string), used as key in the timing dictionary (args.timing).
    :param timing_constant: timing constant whose measurement is used as the reference time.
    :param args: pilot arguments.
    :return: time in seconds (time.time() minus the measurement, i.e. a float), or 0 if the measurement could not
             be found.
    """

    if job_id not in args.timing:
        logger.warning('job id %s not found in timing dictionary' % job_id)
        return 0

    # extract time measurements
    job_timings = args.timing.get(job_id, None)
    if not job_timings:
        logger.warning('failed to extract time measurement dictionary from %s' % str(args.timing))
        return 0

    reference_time = get_time_measurement(timing_constant, job_timings, args.timing, job_id)
    if not reference_time:
        return 0

    return time.time() - reference_time
0285 
0286 
def get_time_difference(job_id, timing_constant_1, timing_constant_2, args):
    """
    Return the positive time difference between the measurements of the given timing constants.

    The order of the constants is not important since the absolute value of the difference is returned. The
    measurements are collected from the pilot timing dictionary (args.timing), using job_id as the key:
        { job_id: { <timing_constant_1>: <time measurement in seconds since epoch>, .. } }
    Time measurements are time.time() values. The difference is converted to an int as a last step.
    If the job id or either of the two measurements cannot be found, 0 is returned.

    :param job_id: PanDA job id (string).
    :param timing_constant_1: first timing constant.
    :param timing_constant_2: second timing constant.
    :param args: pilot arguments.
    :return: time difference in seconds (int).
    """

    elapsed = 0

    if job_id not in args.timing:
        logger.warning('job id %s not found in timing dictionary' % job_id)
    else:
        # extract time measurements
        job_timings = args.timing.get(job_id, None)
        if not job_timings:
            logger.warning('failed to extract time measurement dictionary from %s' % str(args.timing))
        else:
            first = get_time_measurement(timing_constant_1, job_timings, args.timing, job_id)
            second = get_time_measurement(timing_constant_2, job_timings, args.timing, job_id)
            # both measurements are needed, otherwise the difference stays at 0
            if first and second:
                elapsed = second - first

    # always return a positive number
    if elapsed < 0:
        elapsed = -elapsed

    # convert to int as a last step
    try:
        elapsed = int(elapsed)
    except Exception as e:
        logger.warning('failed to convert %s to int: %s (will reset to 0)' % (elapsed, e))
        elapsed = 0

    return elapsed
0338 
0339 
def timing_report(job_id, args):
    """
    Write a timing report to the log and return relevant timing measurements.

    The total setup time is the sum of the initial setup time and the payload setup time.

    :param job_id: job id (string).
    :param args: pilot arguments.
    :return: time_getjob, time_stagein, time_payload, time_stageout, time_total_setup (ints, in seconds).
    """

    # collect pilot timing data
    time_getjob = get_getjob_time(job_id, args)
    time_initial_setup = get_initial_setup_time(job_id, args)
    time_setup = get_setup_time(job_id, args)
    time_stagein = get_stagein_time(job_id, args)
    time_payload = get_payload_execution_time(job_id, args)
    time_stageout = get_stageout_time(job_id, args)
    time_total_setup = time_initial_setup + time_setup

    # report lines, in the order they are written to the log
    report = (
        ('. get job = %d s', time_getjob),
        ('. initial setup = %d s', time_initial_setup),
        ('. payload setup = %d s', time_setup),
        ('. total setup = %d s', time_total_setup),
        ('. stage-in = %d s', time_stagein),
        ('. payload execution = %d s', time_payload),
        ('. stage-out = %d s', time_stageout),
    )
    separator = '.' * 30

    logger.info(separator)
    logger.info('. Timing measurements:')
    for line_format, seconds in report:
        logger.info(line_format % seconds)
    logger.info(separator)

    return time_getjob, time_stagein, time_payload, time_stageout, time_total_setup
0369 
0370 
def time_stamp():
    """
    Return the current local time as an ISO-8601 compliant date/time string, including the UTC offset.

    Format: YYYY-MM-DDTHH:MM:SS+HH:MM (or -HH:MM for time zones west of UTC).
    The UTC offset takes daylight saving time into account when it is in effect, and the minutes field is always
    non-negative (also for time zones east of UTC with offsets that are not whole hours, e.g. +05:30).

    :return: time information (string).
    """

    now = time.localtime()

    # time.timezone and time.altzone are both expressed in seconds WEST of UTC;
    # altzone is the offset that applies while daylight saving time is in effect
    if time.daylight and now.tm_isdst > 0:
        seconds_west = time.altzone
    else:
        seconds_west = time.timezone

    # a positive value means west of UTC, i.e. local time is behind UTC
    sign_str = '-' if seconds_west > 0 else '+'

    # work on the absolute offset so that neither hours nor minutes can become negative
    hours, remainder = divmod(abs(seconds_west), 3600)
    minutes = remainder // 60

    return str("%s%s%02d:%02d" % (time.strftime("%Y-%m-%dT%H:%M:%S", now), sign_str, hours, minutes))
0386 
0387 
def get_elapsed_real_time(t0=None):
    """
    Return a time stamp corresponding to the elapsed real time (since t0 if requested).
    The function uses os.times() to get the current time stamp (element 4 of the os.times() result, truncated to
    an int).
    If t0 is provided, the returned time stamp is relative to t0. t0 is assumed to be an os.times() result, i.e.
    either a plain tuple or an os.times_result object (which is a tuple subclass). Any other t0 value is ignored.

    :param t0: os.times() tuple for the t0 time stamp.
    :return: time stamp (int).
    """

    _t0 = 0

    # use isinstance() rather than an exact type comparison: os.times() returns an os.times_result object on
    # Python 3, which is a subclass of tuple and would otherwise be silently ignored
    if t0 and isinstance(t0, tuple):
        try:
            _t0 = int(t0[4])
        except Exception as e:
            # best effort: fall back to an absolute time stamp if t0 is too short or not numeric
            logger.warning('unknown timing format for t0: %s' % e)
            _t0 = 0

    t = int(os.times()[4])

    return t - _t0