Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:05

0001 import random
0002 from math import log1p
0003 from typing import List, Optional, Tuple
0004 
0005 #########################
0006 # Pilot related functions
0007 #########################
0008 
0009 # Map "pilotType" (defined in harvester) to prodSourceLabel, the pilotType option (defined in pilot, -i option),
0010 # the pilot URL (pilot option --piloturl) and the pilot debug flag, for pilot 2 and pilot 3
0011 
0012 
def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version="", prod_source_label="ANY", prod_rc_permille=0):
    """
    Build the pilot wrapper options for a given harvester pilot type.

    Args:
        pilot_type (str): Pilot type; "RC", "ALRB", "PT" and "PR" are known.
            Any other value gets the "PR" option set.
        pilot_url (str): If truthy, overrides the pilot URL for every pilot type.
        pilot_version (str): Pilot version; a value starting with "3" selects the
            pilot 3 tarballs, anything else (including None or "") the pilot 2 ones.
        prod_source_label (str): prodSourceLabel used for "PR" pilots.
        prod_rc_permille (int|float): Per-mille chance (clamped to [0, 1000]) that a
            "PR" pilot without explicit pilot_url runs the RC pilot tarball.

    Returns:
        dict: A fresh dict with keys "prod_source_label", "pilot_type_opt",
            "pilot_url_str" and "pilot_debug_str".
    """
    # tolerate None / non-string versions instead of failing on .startswith
    is_pilot3 = str(pilot_version or "").startswith("3")
    pilot_major = "3" if is_pilot3 else "2"
    dev_url_base = f"--piloturl http://cern.ch/atlas-panda-pilot/pilot{pilot_major}"
    # basic map; rebuilt on every call, so the returned dict is never shared
    pt_psl_map = {
        "RC": {
            "prod_source_label": "rc_test2",
            "pilot_type_opt": "RC",
            "pilot_url_str": f"{dev_url_base}-dev.tar.gz",
            "pilot_debug_str": "-d",
        },
        "ALRB": {
            "prod_source_label": "rc_alrb",
            "pilot_type_opt": "ALRB",
            "pilot_url_str": "",
            "pilot_debug_str": "",
        },
        "PT": {
            "prod_source_label": "ptest",
            "pilot_type_opt": "PR",
            "pilot_url_str": f"{dev_url_base}-dev2.tar.gz",
            "pilot_debug_str": "-d",
        },
        "PR": {
            "prod_source_label": prod_source_label,
            "pilot_type_opt": "PR",
            "pilot_url_str": "",
            "pilot_debug_str": "",
        },
    }
    # unknown pilot types fall back to the PR options
    pilot_opt_dict = pt_psl_map.get(pilot_type, pt_psl_map["PR"])
    if pilot_url:
        # overwrite with specified pilot_url
        pilot_opt_dict["pilot_url_str"] = f"--piloturl {pilot_url}"
    elif pilot_type == "PR":
        # randomization of pilot url for PR (managed, user) pilots: run some portion
        # with the RC version (not RC dev) of the pilot
        prod_rc_pilot_url_str = "--piloturl http://pandaserver.cern.ch:25085/cache/pilot/pilot3-rc.tar.gz"
        prod_rc_prob = min(max(prod_rc_permille / 1000.0, 0), 1)
        # random.random() is in [0, 1): probability 0 never hits, probability 1 always hits
        if random.random() < prod_rc_prob:
            pilot_opt_dict["pilot_url_str"] = prod_rc_pilot_url_str
    return pilot_opt_dict
0065 
0066 
0067 # Get the pilot wrapper option that selects the pilot's python version: "--pythonversion 3" when the version starts with "3", otherwise no option
0068 
0069 
def get_python_version_option(python_version, prod_source_label):
    """
    Return the wrapper flag that selects python 3 for the pilot.

    Yields "--pythonversion 3" when python_version starts with "3" and an empty
    string otherwise. prod_source_label is accepted but not used.
    """
    wants_python3 = python_version.startswith("3")
    return "--pythonversion 3" if wants_python3 else ""
0075 
0076 
0077 # get pilot joblabel (-j) option, support unified dispatch
def get_joblabel(prod_source_label):
    """
    Return the pilot job label (-j option) for a prodSourceLabel.

    "managed" and "user" are both mapped to "unified" (unified dispatch of
    production and analysis); every other label is passed through unchanged.
    """
    unified_labels = ("managed", "user")
    return "unified" if prod_source_label in unified_labels else prod_source_label
0084 
0085 
0086 # get pilot job type (--job-type) option, support unified dispatch
def get_pilot_job_type(job_type):
    """Return the pilot --job-type value; it is "unified" whatever job_type is given."""
    unified_job_type = "unified"
    return unified_job_type
0089 
0090 
0091 # Parse resource type from string for Unified PanDA Queue
def get_resource_type(resource_type_name, is_unified_queue, all_resource_types, is_pilot_option=False):
    """
    Resolve the resource type string for a unified PanDA queue.

    The name is converted with str() first. An empty string is returned when the
    queue is not unified or the name is not among all_resource_types; otherwise
    the name itself, or "--resource-type <name>" when is_pilot_option is set.
    """
    name = str(resource_type_name)
    # all_resource_types is only inspected for unified queues
    if not is_unified_queue or name not in set(all_resource_types):
        return ""
    if is_pilot_option:
        return f"--resource-type {name}"
    return name
0104 
0105 
0106 #############################
0107 # CE stats related functions
0108 #############################
0109 
0110 
0111 # Compute weight of each CE according to worker stats; return tuple(total score, CE weight dict, CE throughput dict, target number of queuing workers)
def get_ce_weighting(
    ce_endpoint_list: Optional[List] = None,
    worker_ce_all_tuple: Optional[Tuple] = None,
    is_slave_queue: bool = False,
    fairshare_percent: int = 50,
) -> Tuple:
    """
    Compute the weighting of each CE based on worker statistics and throughput.

    Args:
        ce_endpoint_list (list): List of CE endpoints to consider; None means no CE.
        worker_ce_all_tuple (tuple): Required despite the None default. A 5-tuple of:
            - worker_limits_dict (dict): Worker limits; "nQueueLimitWorker" is read.
            - worker_ce_stats_dict (dict): Per-CE current counts; "submitted" and
              "running" are read.
            - worker_ce_backend_throughput_dict (dict): Per-CE historical counts;
              "submitted", "running" and "finished" are read.
            - time_window (int): Time window of the historical counts in seconds.
            - n_new_workers (int): Number of new workers to consider.
        is_slave_queue (bool): If True, the queue limit in the target is capped by the
            total number of currently submitted workers.
        fairshare_percent (int): Clamped to [0, 100]. Share of the base weight given
            equally to every CE; up to 50 the throughput term gets the same share and
            the slot estimate gets the rest, above 50 throughput gets the remainder.

    Returns:
        tuple: A tuple containing:
            - total_score (float): 1000 times the number of CEs.
            - ce_weight_dict (dict): CE endpoint -> weight, scaled to sum to total_score
              (unscaled if all weights are zero).
            - ce_thruput_dict (dict): CE endpoint -> throughput score scaled to one day.
            - target_Q (float): Target number of queuing workers.

    Raises:
        TypeError: If worker_ce_all_tuple is None.
        ZeroDivisionError: If time_window is 0 while ce_endpoint_list is non-empty.
    """
    if worker_ce_all_tuple is None:
        raise TypeError("worker_ce_all_tuple is required and must be a 5-tuple, got None")
    if ce_endpoint_list is None:
        ce_endpoint_list = []
    multiplier = 1000.0
    worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers = worker_ce_all_tuple
    N = float(len(ce_endpoint_list))
    Q = float(worker_limits_dict["nQueueLimitWorker"])
    n_new_workers = float(n_new_workers)

    # overall throughput score over every CE present in the history
    Q_good_init = float(
        sum(worker_ce_backend_throughput_dict[_ce][_st] for _st in ("submitted", "running", "finished") for _ce in worker_ce_backend_throughput_dict)
    )
    Q_good_fin = float(sum(worker_ce_backend_throughput_dict[_ce][_st] for _st in ("submitted",) for _ce in worker_ce_backend_throughput_dict))
    thruput_avg = log1p(Q_good_init) - log1p(Q_good_fin)

    # target number of queuing
    target_Q = Q + n_new_workers
    if is_slave_queue:
        # take total number of current queuing if slave queue
        total_Q = sum(float(worker_ce_stats_dict[_k]["submitted"]) for _k in worker_ce_stats_dict)
        target_Q = min(total_Q, Q) + n_new_workers

    def _get_thruput(_ce_endpoint):
        # throughput score of one CE; 0 for a CE without history
        if _ce_endpoint not in worker_ce_backend_throughput_dict:
            q_good_init = 0.0
            q_good_fin = 0.0
        else:
            _hist = worker_ce_backend_throughput_dict[_ce_endpoint]
            q_good_init = float(sum(_hist[_st] for _st in ("submitted", "running", "finished")))
            q_good_fin = float(sum(_hist[_st] for _st in ("submitted",)))
        return log1p(q_good_init) - log1p(q_good_fin)

    def _get_nslots(_ce_endpoint):
        # estimated number of slots behind the CE by historical running/finished workers and current running workers
        hrf = 0
        cr = 0
        if _ce_endpoint in worker_ce_backend_throughput_dict:
            hrf = sum(worker_ce_backend_throughput_dict[_ce_endpoint][_st] for _st in ("running", "finished"))
        if _ce_endpoint in worker_ce_stats_dict:
            cr = worker_ce_stats_dict[_ce_endpoint].get("running", 0)
        return (hrf + cr) / 2.0

    # per-CE quantities are deterministic, so compute each of them only once
    nslots_by_ce = {_ce: _get_nslots(_ce) for _ce in ce_endpoint_list}
    total_nslots = sum(nslots_by_ce[_ce] for _ce in ce_endpoint_list)
    if total_nslots == 0:
        total_nslots = 1
    thruput_by_ce = {_ce: _get_thruput(_ce) for _ce in ce_endpoint_list}

    # coefficients of the three terms of the base weight
    if fairshare_percent < 0:
        fairshare_percent = 0
    elif fairshare_percent > 100:
        fairshare_percent = 100
    fair_share_coeff = float(fairshare_percent) / 100.0
    nslots_coeff = 0.0
    if fair_share_coeff > 0.5:
        thruput_coeff = 1.0 - fair_share_coeff
    else:
        thruput_coeff = fair_share_coeff
        nslots_coeff = 1.0 - thruput_coeff - fair_share_coeff

    def _get_adj_ratio(thruput, nslots):
        # base weight of one CE: throughput share + equal share + slot share
        try:
            return thruput_coeff * thruput / thruput_avg + fair_share_coeff * (1 / N) + nslots_coeff * nslots / total_nslots
        except ZeroDivisionError:
            # no throughput recorded at all: fall back to an equal share
            if thruput == 0.0:
                return 1 / N
            raise

    base_ratio_by_ce = {_ce: _get_adj_ratio(thruput_by_ce[_ce], nslots_by_ce[_ce]) for _ce in ce_endpoint_list}
    ce_base_weight_sum = sum(base_ratio_by_ce[_ce] for _ce in ce_endpoint_list)

    def _get_init_weight(_ce_endpoint):
        if _ce_endpoint in worker_ce_stats_dict:
            q = float(worker_ce_stats_dict[_ce_endpoint]["submitted"])
            # a missing "running" count is treated as 0, as in _get_nslots
            r = float(worker_ce_stats_dict[_ce_endpoint].get("running", 0))
            if q > Q:
                # already queuing more than the limit: never choose this CE
                return 0.0
        else:
            q = 0.0
            r = 0.0
        if ce_base_weight_sum:
            ce_base_weight_normalized = base_ratio_by_ce[_ce_endpoint] / ce_base_weight_sum
        else:
            # every base weight is zero: share equally instead of dividing by zero
            ce_base_weight_normalized = 1 / N
        # target number of queuing of the CE
        q_expected = target_Q * ce_base_weight_normalized
        # weight by difference, kept strictly positive
        ret = max((q_expected - q), 2**-10)
        if r == 0:
            # Penalty for dead CE (no running worker)
            ret = ret / (1 + log1p(q) ** 2)
        return ret

    init_weight_by_ce = {_ce: _get_init_weight(_ce) for _ce in ce_endpoint_list}
    sum_of_weights = sum(init_weight_by_ce[_ce] for _ce in ce_endpoint_list)
    total_score = multiplier * N
    # scale weights so that they sum to total_score; leave them as they are if all are zero
    regulator = total_score / sum_of_weights if sum_of_weights else 1.0
    ce_weight_dict = {_ce: _w * regulator for _ce, _w in init_weight_by_ce.items()}
    ce_thruput_dict = {_ce: _t * 86400.0 / time_window for _ce, _t in thruput_by_ce.items()}
    return total_score, ce_weight_dict, ce_thruput_dict, target_Q
0247 
0248 
0249 # Choose a CE according to weighting
def choose_ce(weighting):
    """
    Pick a CE at random, with probability proportional to its weight.

    weighting is the 4-tuple (total_score, ce_weight_dict, ce_thruput_dict,
    target_Q); only the first two items are used. CEs with zero weight are never
    picked. Returns the chosen CE, or None when no CE can be chosen.
    """
    total_score, ce_weight_dict, _ce_thruput_dict, _target_Q = weighting
    threshold = random.random() * total_score
    running_total = 0.0
    last_candidate = None
    for candidate, weight in ce_weight_dict.items():
        if weight != 0.0:
            last_candidate = candidate
            running_total += weight
            if running_total >= threshold:
                return candidate
    # threshold not reached (e.g. weights sum to less than total_score):
    # settle for the last CE seen, provided its weight is positive
    last_weight = ce_weight_dict.get(last_candidate, -1)
    return last_candidate if last_weight > 0.0 else None
0266 
0267 
0268 # Get better string to display the statistics and weighting of CEs
def get_ce_stats_weighting_display(ce_list, worker_ce_all_tuple, ce_weighting, fairshare_percent=None):
    """
    Format worker limits plus per-CE statistics and weighting as one display string.

    worker_ce_all_tuple is the 5-tuple (worker_limits_dict, worker_ce_stats_dict,
    worker_ce_backend_throughput_dict, time_window, n_new_workers) and ce_weighting
    the 4-tuple (total_score, ce_weight_dict, ce_thruput_dict, target_Q). A CE
    missing from a statistics dict is shown with zero counts. The result is the
    general part, " ; ", then the per-CE parts joined by " , ".
    """
    limits, stats_now, stats_hist, time_window, n_new_workers = worker_ce_all_tuple
    _total_score, weight_by_ce, thruput_by_ce, target_Q = ce_weighting
    # counts used for a CE without any statistics
    zero_now = {"submitted": 0, "running": 0}
    zero_hist = {"submitted": 0, "running": 0, "finished": 0}

    general_template = (
        "maxWorkers={maxWorkers} "
        "nQueueLimitWorker={nQueueLimitWorker} "
        "nNewWorkers={nNewWorkers} "
        "target_Q={target_Q} "
        "hist_timeWindow={history_time_window} "
        "fairshare_perc={fairshare_percent} "
    )
    general_part = general_template.format(
        maxWorkers=int(limits.get("maxWorkers")),
        nQueueLimitWorker=int(limits.get("nQueueLimitWorker")),
        nNewWorkers=int(n_new_workers),
        target_Q=int(target_Q),
        history_time_window=int(time_window),
        fairshare_percent=fairshare_percent,
    )

    ce_template = (
        '"{_ce}": '
        "now_S={submitted_now} "
        "now_R={running_now} "
        "hist_S={submitted_history} "
        "hist_R={running_history} "
        "hist_F={finished_history} "
        "T={thruput_score:.02f} "
        "W={weight_score:.03f} "
    )
    ce_parts = []
    for endpoint in ce_list:
        now = stats_now.get(endpoint, zero_now)
        hist = stats_hist.get(endpoint, zero_hist)
        ce_parts.append(
            ce_template.format(
                _ce=endpoint,
                submitted_now=int(now.get("submitted")),
                running_now=int(now.get("running")),
                submitted_history=int(hist.get("submitted")),
                running_history=int(hist.get("running")),
                finished_history=int(hist.get("finished")),
                thruput_score=thruput_by_ce.get(endpoint),
                weight_score=weight_by_ce.get(endpoint),
            )
        )
    return general_part + " ; " + " , ".join(ce_parts)