File indexing completed on 2026-04-19 08:00:05
0001 import random
0002 from math import log1p
0003 from typing import List, Optional, Tuple
0004
0005
0006
0007
0008
0009
0010
0011
0012
def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version="", prod_source_label="ANY", prod_rc_permille=0):
    """
    Resolve the pilot submission options for a given pilot type.

    Args:
        pilot_type (str): One of "RC", "ALRB", "PT", "PR"; any other value
            falls back to the "PR" (production) options.
        pilot_url (str): Explicit pilot tarball URL; when set it overrides the
            per-type default URL option.
        pilot_version (str): Pilot version string; a leading "3" selects the
            pilot3 tarballs for the "RC" and "PT" types.
        prod_source_label (str): prodSourceLabel used for the "PR" type.
        prod_rc_permille (int): Per-mille probability (0-1000) that a "PR"
            pilot is diverted to the release-candidate pilot3 tarball.

    Returns:
        dict: With keys "prod_source_label", "pilot_type_opt",
            "pilot_url_str", and "pilot_debug_str".
    """
    # str.startswith already returns a bool; no conditional expression needed
    is_pilot3 = pilot_version.startswith("3")

    pt_psl_map = {
        "RC": {
            "prod_source_label": "rc_test2",
            "pilot_type_opt": "RC",
            "pilot_url_str": (
                "--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev.tar.gz"
                if is_pilot3
                else "--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz"
            ),
            "pilot_debug_str": "-d",
        },
        "ALRB": {
            "prod_source_label": "rc_alrb",
            "pilot_type_opt": "ALRB",
            "pilot_url_str": "",
            "pilot_debug_str": "",
        },
        "PT": {
            "prod_source_label": "ptest",
            "pilot_type_opt": "PR",
            "pilot_url_str": (
                "--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev2.tar.gz"
                if is_pilot3
                else "--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz"
            ),
            "pilot_debug_str": "-d",
        },
        "PR": {
            "prod_source_label": prod_source_label,
            "pilot_type_opt": "PR",
            "pilot_url_str": "",
            "pilot_debug_str": "",
        },
    }

    # unknown pilot types fall back to production ("PR") options
    pilot_opt_dict = pt_psl_map.get(pilot_type, pt_psl_map["PR"])
    if pilot_url:
        # an explicitly configured pilot URL always wins
        pilot_opt_dict["pilot_url_str"] = f"--piloturl {pilot_url}"
    elif pilot_type == "PR":
        # randomly divert a fraction (prod_rc_permille / 1000) of production
        # pilots to the release-candidate pilot tarball for canary testing
        prod_rc_pilot_url_str = "--piloturl http://pandaserver.cern.ch:25085/cache/pilot/pilot3-rc.tar.gz"
        prod_rc_prob = min(max(prod_rc_permille / 1000.0, 0), 1)  # clamp to [0, 1]
        if random.random() < prod_rc_prob:
            pilot_opt_dict["pilot_url_str"] = prod_rc_pilot_url_str

    return pilot_opt_dict
0065
0066
0067
0068
0069
def get_python_version_option(python_version, prod_source_label):
    """
    Return the pilot option selecting Python 3, or an empty string.

    Note: prod_source_label is accepted for interface compatibility but not used.
    """
    return "--pythonversion 3" if python_version.startswith("3") else ""
0075
0076
0077
def get_joblabel(prod_source_label):
    """Map a prodSourceLabel to a job label; managed/user both map to "unified"."""
    return "unified" if prod_source_label in ("managed", "user") else prod_source_label
0084
0085
0086
def get_pilot_job_type(job_type):
    """
    Return the pilot job type; currently always "unified" regardless of the
    incoming job_type (parameter kept for interface compatibility).
    """
    return "unified"
0089
0090
0091
def get_resource_type(resource_type_name, is_unified_queue, all_resource_types, is_pilot_option=False):
    """
    Return the resource type for a worker, or an empty string when not applicable.

    For unified queues with a recognized resource type, returns either the bare
    name or the pilot option form ("--resource-type <name>") depending on
    is_pilot_option; otherwise returns "".
    """
    resource_type_name = str(resource_type_name)
    # Only unified queues carry a resource type, and only known types are accepted.
    if not is_unified_queue or resource_type_name not in set(all_resource_types):
        return ""
    if is_pilot_option:
        return f"--resource-type {resource_type_name}"
    return resource_type_name
0104
0105
0106
0107
0108
0109
0110
0111
def get_ce_weighting(
    ce_endpoint_list: Optional[List] = None,
    worker_ce_all_tuple: Optional[Tuple] = None,
    is_slave_queue: bool = False,
    fairshare_percent: int = 50,
) -> Tuple:
    """
    Compute the weighting of each CE based on worker statistics and throughput.

    Args:
        ce_endpoint_list (list): List of CE endpoints to consider.
        worker_ce_all_tuple (tuple): A tuple containing:
            - worker_limits_dict (dict): Dictionary with worker limits.
            - worker_ce_stats_dict (dict): Dictionary with CE statistics.
            - worker_ce_backend_throughput_dict (dict): Dictionary with CE backend throughput.
            - time_window (int): Time window for statistics in seconds.
            - n_new_workers (int): Number of new workers to consider.
        is_slave_queue (bool): Whether the queue is a slave queue.
        fairshare_percent (int): Percentage of fair share to apply to the weighting.

    Returns:
        tuple: A tuple containing:
            - total_score (float): Total score for the weighting.
            - ce_weight_dict (dict): Dictionary with CE endpoints as keys and their weights as values.
            - ce_thruput_dict (dict): Dictionary with CE endpoints as keys and their throughput as values.
            - target_Q (float): Target number of queuing workers.
    """
    # Per-CE scale factor; total_score below is multiplier * number of CEs.
    multiplier = 1000.0
    if ce_endpoint_list is None:
        ce_endpoint_list = []
    n_ce = len(ce_endpoint_list)
    worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers = worker_ce_all_tuple
    N = float(n_ce)  # number of candidate CEs
    Q = float(worker_limits_dict["nQueueLimitWorker"])  # queuing-worker limit

    # Historical totals over ALL CEs: workers counted in any of the listed
    # states vs. those counted as submitted only.
    Q_good_init = float(
        sum(worker_ce_backend_throughput_dict[_ce][_st] for _st in ("submitted", "running", "finished") for _ce in worker_ce_backend_throughput_dict)
    )
    Q_good_fin = float(sum(worker_ce_backend_throughput_dict[_ce][_st] for _st in ("submitted",) for _ce in worker_ce_backend_throughput_dict))
    # Log-scaled overall throughput; log1p keeps zero counts well-defined.
    thruput_avg = log1p(Q_good_init) - log1p(Q_good_fin)
    n_new_workers = float(n_new_workers)
    # Target number of queuing workers.
    target_Q = Q + n_new_workers
    if is_slave_queue:
        # For slave queues, cap the target by what is currently submitted.
        total_Q = sum((float(worker_ce_stats_dict[_k]["submitted"]) for _k in worker_ce_stats_dict))
        target_Q = min(total_Q, Q) + n_new_workers

    def _get_thruput(_ce_endpoint):
        # Per-CE log-scaled throughput, same formula as thruput_avg above;
        # a CE with no history contributes 0.
        if _ce_endpoint not in worker_ce_backend_throughput_dict:
            q_good_init = 0.0
            q_good_fin = 0.0
        else:
            q_good_init = float(sum(worker_ce_backend_throughput_dict[_ce_endpoint][_st] for _st in ("submitted", "running", "finished")))
            q_good_fin = float(sum(worker_ce_backend_throughput_dict[_ce_endpoint][_st] for _st in ("submitted",)))
        thruput = log1p(q_good_init) - log1p(q_good_fin)
        return thruput

    def _get_nslots(_ce_endpoint):
        # Slot estimate for a CE: mean of historical (running + finished)
        # counts and the currently running count.
        hrf = 0  # historical running + finished
        cr = 0  # currently running
        if _ce_endpoint not in worker_ce_backend_throughput_dict:
            pass
        else:
            hrf = sum(worker_ce_backend_throughput_dict[_ce_endpoint][_st] for _st in ("running", "finished"))
        if _ce_endpoint not in worker_ce_stats_dict:
            pass
        else:
            cr = worker_ce_stats_dict[_ce_endpoint].get("running", 0)
        return (hrf + cr) / 2.0

    total_nslots = sum((_get_nslots(_ce) for _ce in ce_endpoint_list))
    if total_nslots == 0:
        # avoid division by zero in _get_adj_ratio's nslots term
        total_nslots = 1

    def _get_adj_ratio(thruput, nslots, fairshare_percent=fairshare_percent):
        # Blend three shares for one CE: throughput share, an equal fair
        # share (1/N), and a slot share. The three coefficients sum to 1.
        if fairshare_percent < 0:
            fairshare_percent = 0
        elif fairshare_percent > 100:
            fairshare_percent = 100
        fair_share_coeff = float(fairshare_percent) / 100.0
        thruput_coeff = 0.5  # overwritten below; kept as a default
        nslots_coeff = 0.0
        if fair_share_coeff > 0.5:
            thruput_coeff = 1.0 - fair_share_coeff
        else:
            thruput_coeff = fair_share_coeff
        # remainder (if any) goes to the slot-count term
        nslots_coeff = 1.0 - thruput_coeff - fair_share_coeff

        try:
            adj_ratio = thruput_coeff * thruput / thruput_avg + fair_share_coeff * (1 / N) + nslots_coeff * nslots / total_nslots
        except ZeroDivisionError:
            # thruput_avg == 0 with zero per-CE thruput: fall back to the
            # plain equal share; any other zero divisor is a real error.
            if thruput == 0.0:
                adj_ratio = 1 / N
            else:
                raise
        return adj_ratio

    ce_base_weight_sum = sum((_get_adj_ratio(_get_thruput(_ce), _get_nslots(_ce)) for _ce in ce_endpoint_list))

    def _get_init_weight(_ce_endpoint):
        # q: workers currently submitted (queuing) at this CE; r: running.
        if _ce_endpoint not in worker_ce_stats_dict:
            q = 0.0
            r = 0.0
        else:
            q = float(worker_ce_stats_dict[_ce_endpoint]["submitted"])
            r = float(worker_ce_stats_dict[_ce_endpoint]["running"])

        # CE already above the queuing limit gets zero weight.
        if _ce_endpoint in worker_ce_stats_dict and q > Q:
            return float(0)
        ce_base_weight_normalized = _get_adj_ratio(_get_thruput(_ce_endpoint), _get_nslots(_ce_endpoint)) / ce_base_weight_sum
        # expected queue depth for this CE given its normalized base weight
        q_expected = target_Q * ce_base_weight_normalized
        # deficit of queuing workers, floored at a tiny positive value so
        # every eligible CE keeps a nonzero chance
        ret = max((q_expected - q), 2**-10)

        # Penalize CEs that have queued workers but nothing running
        # (possibly unhealthy); penalty grows with the queue depth.
        if r == 0:
            ret = ret / (1 + log1p(q) ** 2)
        return ret

    init_weight_iterator = map(_get_init_weight, ce_endpoint_list)
    sum_of_weights = sum(init_weight_iterator)
    total_score = multiplier * N
    try:
        # regulator rescales raw weights so they sum to total_score
        regulator = total_score / sum_of_weights
    except ZeroDivisionError:
        regulator = 1.0
    ce_weight_dict = {_ce: _get_init_weight(_ce) * regulator for _ce in ce_endpoint_list}
    # throughput extrapolated from the stats time window to per-day
    ce_thruput_dict = {_ce: _get_thruput(_ce) * 86400.0 / time_window for _ce in ce_endpoint_list}
    return total_score, ce_weight_dict, ce_thruput_dict, target_Q
0247
0248
0249
def choose_ce(weighting):
    """
    Pick one CE endpoint at random, with probability proportional to its weight.

    Args:
        weighting (tuple): (total_score, ce_weight_dict, ce_thruput_dict, target_Q)
            as returned by get_ce_weighting.

    Returns:
        str or None: The chosen CE endpoint, or None when no CE has positive weight.
    """
    total_score, ce_weight_dict, ce_thruput_dict, target_Q = weighting
    threshold = random.random() * total_score
    accumulated = 0.0
    last_candidate = None
    for endpoint, weight in ce_weight_dict.items():
        if weight == 0.0:
            continue
        last_candidate = endpoint
        accumulated += weight
        if accumulated >= threshold:
            return endpoint
    # Fell off the end (weights summed below the threshold): return the last
    # endpoint with positive weight, if there was one.
    if ce_weight_dict.get(last_candidate, -1) > 0.0:
        return last_candidate
    else:
        return None
0266
0267
0268
def get_ce_stats_weighting_display(ce_list, worker_ce_all_tuple, ce_weighting, fairshare_percent=None):
    """
    Build a one-line human-readable summary of CE statistics and weighting.

    Args:
        ce_list (list): CE endpoints to include in the per-CE section.
        worker_ce_all_tuple (tuple): Same tuple as passed to get_ce_weighting.
        ce_weighting (tuple): Result of get_ce_weighting.
        fairshare_percent (int): Fair-share percentage, shown verbatim.

    Returns:
        str: General limits/targets followed by one segment per CE.
    """
    worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers = worker_ce_all_tuple
    total_score, ce_weight_dict, ce_thruput_dict, target_Q = ce_weighting
    # fallbacks for CEs without current / historical statistics
    default_now = {"submitted": 0, "running": 0}
    default_hist = {"submitted": 0, "running": 0, "finished": 0}
    general_str = (
        f"maxWorkers={int(worker_limits_dict.get('maxWorkers'))} "
        f"nQueueLimitWorker={int(worker_limits_dict.get('nQueueLimitWorker'))} "
        f"nNewWorkers={int(n_new_workers)} "
        f"target_Q={int(target_Q)} "
        f"hist_timeWindow={int(time_window)} "
        f"fairshare_perc={fairshare_percent} "
    )
    per_ce_strs = []
    for endpoint in ce_list:
        now = worker_ce_stats_dict.get(endpoint, default_now)
        hist = worker_ce_backend_throughput_dict.get(endpoint, default_hist)
        per_ce_strs.append(
            f'"{endpoint}": '
            f"now_S={int(now.get('submitted'))} "
            f"now_R={int(now.get('running'))} "
            f"hist_S={int(hist.get('submitted'))} "
            f"hist_R={int(hist.get('running'))} "
            f"hist_F={int(hist.get('finished'))} "
            f"T={ce_thruput_dict.get(endpoint):.02f} "
            f"W={ce_weight_dict.get(endpoint):.03f} "
        )
    return general_str + " ; " + " , ".join(per_ce_strs)