File indexing completed on 2026-04-10 08:39:01
0001 import copy
0002 import datetime
0003 import functools
0004 import json
0005 import os
0006 import socket
0007 import sys
0008 import traceback
0009 from zlib import adler32
0010
0011 import numpy as np
0012 from pandacommon.pandalogger import logger_utils
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from pandacommon.pandautils.PandaUtils import naive_utcnow
0015 from pandacommon.pandautils.thread_utils import GenericThread
0016 from scipy import stats
0017
0018 from pandaserver.config import panda_config
0019
0020
# module-level logger for this metric collector
main_logger = PandaLogger().getLogger("metric_collector")


# when True, fetch metrics but skip locking and DB writes (see main())
DRY_RUN = False


# metrics to process: (metric name = FetchData method name,
#                      key type for the Metrics table ("site"/"gshare"/"both"/"neither"),
#                      lock period passed to lockProcess_PANDA — presumably minutes; TODO confirm)
metric_list = [
    ("gshare_preference", "gshare", 5),
    ("analy_pmerge_jobs_wait_time", "site", 30),
    ("analy_site_eval", "site", 30),
    ("users_jobs_stats", "both", 5),
    ("analy_user_eval", "neither", 10),
]


# map of site class value (see analy_site_eval) to rank label used in analy_user_eval
class_value_rank_map = {1: "A_sites", 0: "B_sites", -1: "C_sites"}
0037
0038
def get_now_time_str():
    """
    Return the current time formatted as "%Y-%m-%d %H:%M:%S" so that it can
    be stored in the DB
    """
    return naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")
0046
0047
def conf_interval_upper(n, mean, stdev, cl=0.95):
    """
    Estimate the upper bound of the confidence interval at confidence level
    cl, using a Student's t distribution with n-1 degrees of freedom.
    The result is capped at 999999.
    """
    cap = 999999
    upper_bound = stats.t.ppf(cl, (n - 1), loc=mean, scale=stdev)
    return min(upper_bound, cap)
0056
0057
def weighted_stats(values, weights):
    """
    Compute the sum of weights, the weighted mean and the weighted standard
    deviation of the given values
    """
    total_weight = np.sum(weights)
    w_mean = np.average(values, weights=weights)
    w_variance = np.average((values - w_mean) ** 2, weights=weights)
    return total_weight, w_mean, np.sqrt(w_variance)
0067
0068
0069
def get_site_strr_stats(tbuf, time_window=21600, cutoff=300):
    """
    Compute the slot-to-running rate (started slots per hour) of each site
    from jobs that started inside a recent time window.

    :param tbuf: taskBuffer-like object used to query the DB
    :param time_window: window length in seconds (default 6 hours)
    :param cutoff: seconds before now excluded from the window
    :return: (True, {site: rate}) on success, (False, {}) on failure
    """
    tmp_log = logger_utils.make_logger(main_logger, "get_site_strr_stats")

    now = naive_utcnow()
    window_end = now - datetime.timedelta(seconds=cutoff)
    window_start = now - datetime.timedelta(seconds=time_window)

    # truncate both window edges down to 10-minute boundaries
    window_end = window_end.replace(minute=window_end.minute // 10 * 10, second=0, microsecond=0)
    window_start = window_start.replace(minute=window_start.minute // 10 * 10, second=0, microsecond=0)
    real_interval_hours = (window_end - window_start).total_seconds() / 3600

    var_map = {
        ":startTimeMin": window_start,
        ":startTimeMax": window_end,
    }

    sql_jt = """
        SELECT computingSite, coreCount, COUNT(*) FROM %s
        WHERE vo='atlas'
        AND startTime IS NOT NULL AND startTime>=:startTimeMin AND startTime<:startTimeMax
        AND jobStatus IN ('running', 'holding', 'transferring', 'finished', 'cancelled')
        """
    sql_jt += """
        GROUP BY computingSite, coreCount
        """

    rate_map = {}
    try:
        for table in ("ATLAS_PANDA.jobsActive4", "ATLAS_PANDA.jobsArchived4"):
            res = tbuf.querySQL(sql_jt % table, var_map)

            for panda_site, core_count, n_count in res:
                # every site seen in the result gets an entry, even with rate 0
                rate_map.setdefault(panda_site, 0)
                # skip rows without a usable core count (None or 0)
                if not core_count:
                    continue
                if real_interval_hours > 0:
                    rate_map[panda_site] += n_count * core_count / real_interval_hours

        return True, rate_map
    except Exception as e:
        tmp_log.error(f"Exception {e.__class__.__name__}: {e}")
        return False, {}
0126
0127
0128
def fill_leaf_shares(key, val, the_list):
    """
    Recursively walk a share-name tree and append the names of leaf shares
    (nodes whose value is None) to the_list
    """
    if val is not None:
        # internal node: descend into every child
        for child_name, child_val in val.items():
            fill_leaf_shares(child_name, child_val, the_list)
    else:
        # leaf node
        the_list.append(key)
0135
0136
class MetricsDB(object):
    """
    Proxy to access the metrics table in DB
    """

    def __init__(self, tbuf):
        # taskBuffer-like object providing querySQL / executemanySQL / querySQLS
        self.tbuf = tbuf

    def _decor(method):
        # NOTE(review): this decorator is never applied to any method of this
        # class and would silently swallow every exception if it were; kept
        # as-is to avoid changing the class interface — TODO confirm it can
        # be removed
        def _decorator(_method, *args, **kwargs):
            @functools.wraps(_method)
            def _wrapped_method(self, *args, **kwargs):
                try:
                    _method(self, *args, **kwargs)
                except Exception:
                    pass

            return _wrapped_method

        return _decorator(method)

    def update(self, metric, key_type, entity_dict):
        """
        Upsert rows of one metric into ATLAS_PANDA.Metrics.

        :param metric: metric name
        :param key_type: "site", "gshare", "both" or "neither"; decides how
                         entity_dict keys map onto the (computingSite, gshare)
                         columns (the literal string "NULL" is stored for an
                         unused key column)
        :param entity_dict: {entity: value} map, or a single JSON-serializable
                            value when key_type is "neither"
        """
        tmp_log = logger_utils.make_logger(main_logger, "MetricsDB.update")

        # try UPDATE first; rows that do not exist yet are INSERTed below
        sql_update = (
            """UPDATE ATLAS_PANDA.Metrics SET """
            """value_json = :patch_value_json , """
            """timestamp = :timestamp """
            """WHERE computingSite=:site AND gshare=:gshare AND metric=:metric """
        )
        sql_insert = """INSERT INTO ATLAS_PANDA.Metrics """ """VALUES ( """ """:site, :gshare, :metric, :patch_value_json, :timestamp """ """) """

        now_time = naive_utcnow()

        varMap_template = {
            ":site": None,
            ":gshare": None,
            ":metric": metric,
            ":timestamp": now_time,
            ":patch_value_json": None,
        }

        varMap_list = []
        if key_type == "neither":
            # the whole entity_dict is stored as a single row keyed by metric only
            try:
                patch_value_json = json.dumps(entity_dict)
            except Exception:
                # FIX: original logged str(v), which is undefined in this branch
                tmp_log.error(traceback.format_exc() + " " + str(entity_dict))
                return

            varMap = varMap_template.copy()
            varMap.update(
                {
                    ":site": "NULL",
                    ":gshare": "NULL",
                    ":patch_value_json": patch_value_json,
                }
            )

            varMap_list.append(varMap)
        else:
            # one row per entity
            for entity, v in entity_dict.items():
                try:
                    patch_value_json = json.dumps(v)
                except Exception:
                    tmp_log.error(traceback.format_exc() + " " + str(v))
                    return

                varMap = varMap_template.copy()
                varMap[":patch_value_json"] = patch_value_json

                if key_type == "site":
                    varMap.update(
                        {
                            ":site": entity,
                            ":gshare": "NULL",
                        }
                    )
                elif key_type == "gshare":
                    varMap.update(
                        {
                            ":site": "NULL",
                            ":gshare": entity,
                        }
                    )
                elif key_type == "both":
                    # entity is a (computingSite, gshare) tuple
                    varMap.update(
                        {
                            ":site": entity[0],
                            ":gshare": entity[1],
                        }
                    )

                varMap_list.append(varMap)

        n_row = self.tbuf.executemanySQL(sql_update, varMap_list)

        # FIX: executemanySQL may return None on failure; original compared
        # None < int which raises TypeError
        if n_row is None or n_row < len(varMap_list):
            # some rows did not exist yet; insert them one by one
            try:
                tmp_log.debug(f"only {n_row}/{len(varMap_list)} rows updated for metric={metric} ; trying insert")
                for varMap in varMap_list:
                    self.tbuf.querySQLS(sql_insert, varMap)
                tmp_log.debug(f"inserted for metric={metric}")
            except Exception:
                tmp_log.warning(f"failed to insert for metric={metric}")
        else:
            tmp_log.debug(f"updated for metric={metric}")

    def get_metrics(self, metric, key_type=None, fresher_than_minutes_ago=120):
        """
        Fetch recent values of one metric from ATLAS_PANDA.Metrics.

        :param metric: metric name
        :param key_type: how the returned map is keyed ("site", "gshare",
                         "both", "neither"); when None it is looked up from
                         metric_list (defaulting to "both")
        :param fresher_than_minutes_ago: ignore rows older than this
        :return: dict keyed per key_type (the bare value dict for "neither"),
                 or None when the query fails
        """
        tmp_log = logger_utils.make_logger(main_logger, "MetricsDB.get_metrics")

        sql_query = (
            """SELECT computingSite, gshare, value_json """ """FROM ATLAS_PANDA.Metrics """ """WHERE metric = :metric """ """AND timestamp >= :min_timestamp """
        )

        now_time = naive_utcnow()

        varMap = {
            ":metric": metric,
            ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }

        res = self.tbuf.querySQL(sql_query, varMap)
        if res is None:
            tmp_log.warning(f"failed to query metric={metric}")
            return

        if key_type is None:
            # FIX: original assigned the looked-up value to "key", which was
            # immediately overwritten inside the loop below; the intent is to
            # derive the key type declared for this metric in metric_list
            key_type = {x[0]: x[1] for x in metric_list}.get(metric, "both")

        ret_map = {}
        for computingSite, gshare, value_json in res:
            key = (computingSite, gshare)
            if key_type == "site":
                key = computingSite
            elif key_type == "gshare":
                key = gshare
            elif key_type == "neither":
                key = None
            if value_json is None:
                tmp_log.warning("missing data for " + str((computingSite, gshare)))
                continue
            try:
                value_dict = json.loads(value_json)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str((computingSite, gshare)) + str(value_json))
                continue
            else:
                if key is None:
                    # "neither": the single row is the whole result
                    ret_map = copy.deepcopy(value_dict)
                else:
                    ret_map[key] = value_dict

        return ret_map
0303
0304
class FetchData(object):
    """
    methods to fetch or evaluate data values to store
    """

    def __init__(self, tbuf):
        # taskBuffer-like object used to query the DB
        self.tbuf = tbuf

        # cached gshare status; filled lazily by gshare_preference()
        self.gshare_status = None
0314
0315 def analy_pmerge_jobs_wait_time(self):
0316 tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0317
0318 sql_get_jobs_archived4 = (
0319 "SELECT pandaID, computingSite "
0320 "FROM ATLAS_PANDA.jobsArchived4 "
0321 "WHERE prodSourceLabel='user' "
0322 "AND gshare='User Analysis' "
0323 "AND (processingType='pmerge' OR prodUserName='gangarbt') "
0324 "AND modificationTime>:modificationTime "
0325 )
0326 sql_get_jobs_active4 = (
0327 "SELECT pandaID, computingSite "
0328 "FROM ATLAS_PANDA.jobsActive4 "
0329 "WHERE prodSourceLabel='user' "
0330 "AND gshare='User Analysis' "
0331 "AND jobStatus IN ('running', 'holding', 'merging', 'transferring', 'finished', 'failed', 'closed', 'cancelled') "
0332 "AND (processingType='pmerge' OR prodUserName='gangarbt') "
0333 "AND modificationTime>:modificationTime "
0334 )
0335 sql_get_latest_job_mtime_status = (
0336 "SELECT jobStatus, MIN(modificationTime) " "FROM ATLAS_PANDA.jobs_StatusLog " "WHERE pandaID=:pandaID " "GROUP BY jobStatus "
0337 )
0338 sql_get_site_workflow = "SELECT /* use_json_type */ scj.data.workflow " "FROM ATLAS_PANDA.schedconfig_json scj " "WHERE scj.panda_queue=:computingSite "
0339 sql_get_long_queuing_job_wait_time_template = (
0340 "SELECT COUNT(*), AVG(CURRENT_DATE-creationtime) "
0341 "FROM ATLAS_PANDA.jobsActive4 "
0342 "WHERE prodSourceLabel='user' "
0343 "AND gshare='User Analysis' "
0344 "AND jobStatus IN {q_status_list_str} "
0345 "AND (processingType='pmerge') "
0346 "AND computingSite=:computingSite "
0347 "AND (CURRENT_DATE-creationtime)>:w_mean "
0348 )
0349 try:
0350
0351 tmp_site_dict = dict()
0352
0353 now_time = naive_utcnow()
0354
0355 varMap = {
0356 ":modificationTime": now_time - datetime.timedelta(days=4),
0357 }
0358 archived4_jobs_list = self.tbuf.querySQL(sql_get_jobs_archived4, varMap)
0359 active4_jobs_list = self.tbuf.querySQL(sql_get_jobs_active4, varMap)
0360 all_jobs_set = set()
0361 all_jobs_set.update(archived4_jobs_list)
0362 all_jobs_set.update(active4_jobs_list)
0363 n_tot_jobs = len(all_jobs_set)
0364 tmp_log.debug(f"got total {n_tot_jobs} jobs")
0365
0366 cc = 0
0367 for pandaID, site in all_jobs_set:
0368 if not site:
0369 continue
0370 varMap = {":pandaID": pandaID}
0371 status_mtime_list = self.tbuf.querySQL(sql_get_latest_job_mtime_status, varMap)
0372 status_mtime_dict = dict(status_mtime_list)
0373 if "activated" not in status_mtime_dict or "running" not in status_mtime_dict:
0374 continue
0375 wait_time = status_mtime_dict["running"] - status_mtime_dict["activated"]
0376 wait_time_sec = wait_time.total_seconds()
0377 if wait_time_sec < 0:
0378 tmp_log.warning(f"job {pandaID} has negative wait time")
0379 continue
0380 run_age_sec = int((now_time - status_mtime_dict["running"]).total_seconds())
0381 if run_age_sec < 0:
0382 tmp_log.warning(f"job {pandaID} has negative run age")
0383 continue
0384 tmp_site_dict.setdefault(site, {"wait_time": [], "run_age": []})
0385 tmp_site_dict[site]["wait_time"].append(wait_time_sec)
0386 tmp_site_dict[site]["run_age"].append(run_age_sec)
0387
0388 if cc > 0 and cc % 5000 == 0:
0389 tmp_log.debug(f"... queried {cc:9d} jobs ...")
0390 cc += 1
0391 tmp_log.debug(f"queried {cc} jobs")
0392
0393 site_dict = dict()
0394 for site, data_dict in tmp_site_dict.items():
0395 site_dict.setdefault(site, {})
0396 n_jobs = len(data_dict["wait_time"])
0397
0398 mean = np.nan
0399 stdev = np.nan
0400 median = np.nan
0401 cl95upp = np.nan
0402 sum_of_weights = np.nan
0403 w_mean = np.nan
0404 w_stdev = np.nan
0405 w_cl95upp = np.nan
0406 long_q_n = np.nan
0407 long_q_mean = np.nan
0408
0409 if n_jobs > 0:
0410 wait_time_array = np.array(data_dict["wait_time"])
0411 run_age_array = np.array(data_dict["run_age"])
0412
0413 mean = np.mean(wait_time_array)
0414 stdev = np.std(wait_time_array)
0415 median = np.median(wait_time_array)
0416 cl95upp = conf_interval_upper(n=n_jobs, mean=mean, stdev=stdev, cl=0.95)
0417
0418 weight_array = np.exp2(-run_age_array / (12 * 60 * 60))
0419 sum_of_weights, w_mean, w_stdev = weighted_stats(wait_time_array, weight_array)
0420 w_cl95upp = conf_interval_upper(n=sum_of_weights + 1, mean=w_mean, stdev=w_stdev, cl=0.95)
0421
0422 if w_mean:
0423 q_status_list_str = "('activated', 'sent')"
0424 varMap = {
0425 ":computingSite": site,
0426 }
0427 (site_workflow,) = self.tbuf.querySQL(sql_get_site_workflow, varMap)[0]
0428 if site_workflow and site_workflow.startswith("push"):
0429 q_status_list_str = "('activated', 'sent', 'starting')"
0430 varMap = {
0431 ":computingSite": site,
0432 ":w_mean": w_mean / (24 * 60 * 60),
0433 }
0434 sql_get_long_queuing_job_wait_time = sql_get_long_queuing_job_wait_time_template.format(q_status_list_str=q_status_list_str)
0435 (long_q_n, long_q_mean_day) = self.tbuf.querySQL(sql_get_long_queuing_job_wait_time, varMap)[0]
0436 if long_q_mean_day:
0437 long_q_mean = long_q_mean_day * (24 * 60 * 60)
0438 else:
0439 long_q_mean = w_mean
0440 long_q_n = 0
0441
0442 site_dict[site].update(
0443 {
0444 "n": n_jobs,
0445 "mean": mean,
0446 "stdev": stdev,
0447 "med": median,
0448 "cl95upp": cl95upp,
0449 "sum_of_weights": sum_of_weights,
0450 "w_mean": w_mean,
0451 "w_stdev": w_stdev,
0452 "w_cl95upp": w_cl95upp,
0453 "long_q_n": long_q_n,
0454 "long_q_mean": long_q_mean,
0455 }
0456 )
0457
0458 tmp_log.debug(
0459 (
0460 "site={site}, n={n}, "
0461 "mean={mean:.3f}, stdev={stdev:.3f}, med={med:.3f}, cl95upp={cl95upp:.3f}, "
0462 "sum_of_weights={sum_of_weights:.3f}, "
0463 "w_mean={w_mean:.3f}, w_stdev={w_stdev:.3f}, w_cl95upp={w_cl95upp:.3f}, "
0464 "long_q_n={long_q_n}, long_q_mean={long_q_mean:.3f} "
0465 ).format(site=site, **site_dict[site])
0466 )
0467
0468 for key in site_dict[site]:
0469 if np.isnan(site_dict[site][key]):
0470 site_dict[site][key] = None
0471
0472 return site_dict
0473 except Exception:
0474 tmp_log.error(traceback.format_exc())
0475
0476 def gshare_preference(self):
0477 tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0478 try:
0479
0480 if self.gshare_status is None:
0481 self.gshare_status = self.tbuf.getGShareStatus()
0482 share_name_tree_dict = self.tbuf.get_tree_of_gshare_names()
0483
0484 gshare_dict = dict()
0485 total_hs = sum([leaf["target"] for leaf in self.gshare_status])
0486
0487 for idx, leaf in enumerate(self.gshare_status):
0488 rank = idx + 1
0489 gshare = leaf["name"]
0490 gshare_dict[gshare] = {
0491 "gshare": gshare,
0492 "rank": rank,
0493 "queuing_hs": leaf["queuing"],
0494 "running_hs": leaf["running"],
0495 "target_hs": leaf["target"],
0496 "usage_perc": leaf["running"] / leaf["target"] if leaf["target"] > 0 else 999999,
0497 "queue_perc": leaf["queuing"] / leaf["target"] if leaf["target"] > 0 else 999999,
0498 "norm_usage_perc": leaf["running"] / total_hs if total_hs > 0 else 999999,
0499 "norm_queue_perc": leaf["queuing"] / total_hs if total_hs > 0 else 999999,
0500 "norm_target_perc": leaf["target"] / total_hs if total_hs > 0 else 999999,
0501 "proj_target_hs": 0,
0502 "eqiv_target_hs": 0,
0503 "eqiv_usage_perc": 0,
0504 }
0505 tmp_log.debug("rank={rank}, gshare={gshare}, usage={usage_perc:.3%}, queue={queue_perc:.3%} ".format(**gshare_dict[gshare]))
0506
0507 tmp_L1_leaves_map = {}
0508 l1_share_dict = {}
0509 for l1_share, val in share_name_tree_dict.items():
0510 tmp_L1_leaves_map.setdefault(l1_share, [])
0511 fill_leaf_shares(l1_share, val, tmp_L1_leaves_map[l1_share])
0512 for l1_share, leaves_list in tmp_L1_leaves_map.items():
0513 l1_share_name = f"L1 {l1_share}"
0514 l1_share_dict.setdefault(
0515 l1_share_name,
0516 {
0517 "gshare": l1_share_name,
0518 "rank": -1,
0519 "queuing_hs": 0,
0520 "running_hs": 0,
0521 "target_hs": 0,
0522 "usage_perc": 0,
0523 "queue_perc": 0,
0524 },
0525 )
0526 val_dict = l1_share_dict[l1_share_name]
0527 for leaf in leaves_list:
0528 if leaf in gshare_dict:
0529 for field in ["queuing_hs", "running_hs", "target_hs"]:
0530 val_dict.setdefault(field, 0)
0531 val_dict[field] += gshare_dict[leaf].get(field, 0)
0532 val_dict["usage_perc"] = val_dict["running_hs"] / val_dict["target_hs"] if val_dict["target_hs"] > 0 else 999999
0533 val_dict["queue_perc"] = val_dict["queuing_hs"] / val_dict["target_hs"] if val_dict["target_hs"] > 0 else 999999
0534 val_dict["norm_usage_perc"] = val_dict["running_hs"] / total_hs if total_hs > 0 else 999999
0535 val_dict["norm_queue_perc"] = val_dict["queuing_hs"] / total_hs if total_hs > 0 else 999999
0536 val_dict["norm_target_perc"] = val_dict["target_hs"] / total_hs
0537 val_dict["proj_target_hs"] = min(
0538 val_dict["target_hs"],
0539 max(val_dict["running_hs"], val_dict["queuing_hs"] / 2),
0540 )
0541 proj_total_hs = sum([v["proj_target_hs"] for v in l1_share_dict.values()])
0542 idle_total_hs = total_hs - proj_total_hs
0543 idle_l1_share_list = []
0544 idle_threshold_perc = 0.85
0545 for l1_share_name, val_dict in l1_share_dict.items():
0546 if val_dict["usage_perc"] < idle_threshold_perc and val_dict["proj_target_hs"] < val_dict["target_hs"]:
0547
0548 idle_l1_share_list.append(l1_share_name)
0549 busy_total_hs = sum([v["target_hs"] for k, v in l1_share_dict.items() if k not in idle_l1_share_list]) + 1
0550 for l1_share_name, val_dict in l1_share_dict.items():
0551 if l1_share_name in idle_l1_share_list:
0552
0553 val_dict["eqiv_target_hs"] = val_dict["target_hs"]
0554 val_dict["eqiv_usage_perc"] = val_dict["usage_perc"]
0555 else:
0556
0557 val_dict["eqiv_target_hs"] = val_dict["target_hs"] + idle_total_hs * val_dict["target_hs"] / busy_total_hs
0558 val_dict["eqiv_usage_perc"] = val_dict["running_hs"] / val_dict["eqiv_target_hs"]
0559 tmp_log.debug(
0560 (
0561 "share={gshare}, usage={usage_perc:.3%}, queue={queue_perc:.3%}, "
0562 "target_hs={target_hs:.0f}, eqiv_target_hs={eqiv_target_hs:.0f}, "
0563 "norm_target_perc={norm_target_perc:.3%}, eqiv_usage_perc={eqiv_usage_perc:.3%}, "
0564 "norm_usage={norm_usage_perc:.3%}, norm_queue={norm_queue_perc:.3%} "
0565 ).format(**val_dict)
0566 )
0567 gshare_dict.update(l1_share_dict)
0568
0569 return gshare_dict
0570 except Exception:
0571 tmp_log.error(traceback.format_exc())
0572
0573 def analy_site_eval(self):
0574 tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0575 try:
0576
0577 site_dict = dict()
0578 class_A_set = set()
0579 class_B_set = set()
0580 class_C_set = set()
0581
0582 res = self.tbuf.querySQL(
0583 ("SELECT /* use_json_type */ scj.panda_queue, scj.data.resource_type " "FROM ATLAS_PANDA.schedconfig_json scj "),
0584 {},
0585 )
0586 site_resource_type_map = {site: resource_type for site, resource_type in res}
0587
0588 mdb = MetricsDB(self.tbuf)
0589
0590 apjwt_dict = mdb.get_metrics("analy_pmerge_jobs_wait_time", "site")
0591
0592
0593 ranking_wait_time_list = []
0594 for site, v in apjwt_dict.items():
0595 if site_resource_type_map.get(site) != "GRID":
0596 continue
0597 try:
0598 if v["w_cl95upp"] is not None and v["long_q_mean"] is not None:
0599 ranking_wait_time = np.maximum(v["w_cl95upp"], v["long_q_mean"])
0600 ranking_wait_time_list.append(ranking_wait_time)
0601 else:
0602 tmp_log.warning(
0603 ("site={site} none value, skipped : w_cl95upp={w_cl95upp} long_q_mean={long_q_mean} ").format(
0604 w_cl95upp=v["w_cl95upp"], long_q_mean=v["long_q_mean"]
0605 )
0606 )
0607 continue
0608 except KeyError:
0609 continue
0610 first_one_third_wait_time = np.nanquantile(np.array(ranking_wait_time_list), 0.333)
0611 last_one_third_wait_time = np.nanquantile(np.array(ranking_wait_time_list), 0.667)
0612 tmp_log.debug(f"GRID n_sites= {len(ranking_wait_time_list)} wait time PR33={first_one_third_wait_time:.3f} PR67={last_one_third_wait_time:.3f}")
0613
0614 tmp_st, site_6h_strr_map = get_site_strr_stats(self.tbuf, time_window=60 * 60 * 6)
0615 if not tmp_st:
0616 tmp_log.error("failed to get 6h-slot-to-running-rate")
0617 tmp_st, site_1d_strr_map = get_site_strr_stats(self.tbuf, time_window=60 * 60 * 24)
0618 if not tmp_st:
0619 tmp_log.error("failed to get 1d-slot-to-running-rate")
0620
0621 for site in apjwt_dict:
0622
0623
0624 v = apjwt_dict[site]
0625
0626 try:
0627 if v["w_cl95upp"] is not None and v["long_q_mean"] is not None:
0628 v["ranking_wait_time"] = np.maximum(v["w_cl95upp"], v["long_q_mean"])
0629
0630 else:
0631 tmp_log.warning(f"site={site} none value, skipped : w_cl95upp={v['w_cl95upp']} long_q_mean={v['long_q_mean']} ")
0632 continue
0633 except KeyError as e:
0634 tmp_log.warning(f"site={site} misses value, skipped : {e} ")
0635 continue
0636
0637 site_dict[site] = dict()
0638
0639 site_dict[site]["resource_type"] = site_resource_type_map.get(site)
0640
0641 site_6h_strr = site_6h_strr_map.get(site, 0)
0642 site_1d_strr = site_1d_strr_map.get(site, 0)
0643 site_dict[site]["strr_6h"] = site_6h_strr
0644 site_dict[site]["strr_1d"] = site_1d_strr
0645
0646 if (
0647 v["ranking_wait_time"] <= max(first_one_third_wait_time, 3600)
0648 or (v["w_cl95upp"] <= max(first_one_third_wait_time, 3600) and v["long_q_n"] <= 3)
0649 ) and site_1d_strr > 0:
0650
0651 site_dict[site]["class"] = 1
0652 class_A_set.add(site)
0653 elif v["ranking_wait_time"] > max(last_one_third_wait_time, 10800):
0654
0655 site_dict[site]["class"] = -1
0656 class_C_set.add(site)
0657 else:
0658
0659 site_dict[site]["class"] = 0
0660 class_B_set.add(site)
0661
0662 tmp_log.debug(
0663 ("site={site}, class={class}, type={resource_type}, strr_6h={strr_6h:.3f}, strr_1d={strr_1d:.3f} ").format(site=site, **site_dict[site])
0664 )
0665
0666 for key in site_dict[site]:
0667 _val = site_dict[site][key]
0668 if not isinstance(_val, str) and np.isnan(_val):
0669 site_dict[site][key] = None
0670
0671 tmp_log.debug(
0672 ("class_A ({}) : {} ; class_B ({}) : {} ; class_C ({}) : {}").format(
0673 len(class_A_set),
0674 ",".join(sorted(list(class_A_set))),
0675 len(class_B_set),
0676 ",".join(sorted(list(class_B_set))),
0677 len(class_C_set),
0678 ",".join(sorted(list(class_C_set))),
0679 )
0680 )
0681
0682 return site_dict
0683 except Exception:
0684 tmp_log.error(traceback.format_exc())
0685
0686 def users_jobs_stats(self):
0687 prod_source_label = "user"
0688 tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0689 tmp_log.debug("start")
0690 try:
0691
0692 site_gshare_dict = dict()
0693
0694 jobsStatsPerUser = {}
0695 varMap = {}
0696 varMap[":prodSourceLabel"] = prod_source_label
0697 varMap[":pmerge"] = "pmerge"
0698 varMap[":gangarbt"] = "gangarbt"
0699 sqlJ = (
0700 "SELECT COUNT(*),SUM(coreCount),prodUserName,jobStatus,gshare,computingSite "
0701 "FROM ATLAS_PANDA.jobsActive4 "
0702 "WHERE prodSourceLabel=:prodSourceLabel "
0703 "AND processingType<>:pmerge "
0704 "AND prodUserName<>:gangarbt "
0705 "GROUP BY prodUserName,jobStatus,gshare,computingSite "
0706 )
0707
0708 res = self.tbuf.querySQL(sqlJ, varMap)
0709 if res is None:
0710 tmp_log.debug(f"got {res} ")
0711 else:
0712 tmp_log.debug(f"total {len(res)} ")
0713
0714 for cnt, n_slots, prodUserName, jobStatus, gshare, computingSite in res:
0715
0716 jobsStatsPerUser.setdefault(computingSite, {})
0717 jobsStatsPerUser[computingSite].setdefault(gshare, {})
0718 jobsStatsPerUser[computingSite][gshare].setdefault(
0719 prodUserName,
0720 {
0721 "nDefined": 0,
0722 "nAssigned": 0,
0723 "nActivated": 0,
0724 "nStarting": 0,
0725 "nQueue": 0,
0726 "nRunning": 0,
0727 "slotsDefined": 0,
0728 "slotsAssigned": 0,
0729 "slotsActivated": 0,
0730 "slotsStarting": 0,
0731 "slotsQueue": 0,
0732 "slotsRunning": 0,
0733 },
0734 )
0735 jobsStatsPerUser[computingSite][gshare].setdefault(
0736 "_total",
0737 {
0738 "nDefined": 0,
0739 "nAssigned": 0,
0740 "nActivated": 0,
0741 "nStarting": 0,
0742 "nQueue": 0,
0743 "nRunning": 0,
0744 "slotsDefined": 0,
0745 "slotsAssigned": 0,
0746 "slotsActivated": 0,
0747 "slotsStarting": 0,
0748 "slotsQueue": 0,
0749 "slotsRunning": 0,
0750 },
0751 )
0752
0753 if jobStatus in ["defined", "assigned", "activated", "starting"]:
0754 status_name = f"n{jobStatus.capitalize()}"
0755 slots_status_name = f"slots{jobStatus.capitalize()}"
0756 jobsStatsPerUser[computingSite][gshare][prodUserName][status_name] += cnt
0757 jobsStatsPerUser[computingSite][gshare][prodUserName]["nQueue"] += cnt
0758 jobsStatsPerUser[computingSite][gshare]["_total"][status_name] += cnt
0759 jobsStatsPerUser[computingSite][gshare]["_total"]["nQueue"] += cnt
0760 jobsStatsPerUser[computingSite][gshare][prodUserName][slots_status_name] += n_slots
0761 jobsStatsPerUser[computingSite][gshare][prodUserName]["slotsQueue"] += n_slots
0762 jobsStatsPerUser[computingSite][gshare]["_total"][slots_status_name] += n_slots
0763 jobsStatsPerUser[computingSite][gshare]["_total"]["slotsQueue"] += n_slots
0764 elif jobStatus in ["running"]:
0765 jobsStatsPerUser[computingSite][gshare][prodUserName]["nRunning"] += cnt
0766 jobsStatsPerUser[computingSite][gshare]["_total"]["nRunning"] += cnt
0767 jobsStatsPerUser[computingSite][gshare][prodUserName]["slotsRunning"] += n_slots
0768 jobsStatsPerUser[computingSite][gshare]["_total"]["slotsRunning"] += n_slots
0769
0770 for computingSite in jobsStatsPerUser:
0771 g_dict = jobsStatsPerUser[computingSite]
0772 for gshare in g_dict:
0773 data_dict = g_dict[gshare]
0774 site_gshare_dict[(computingSite, gshare)] = data_dict
0775 tmp_log.debug(f"site={computingSite}, gshare={gshare}, stats={data_dict}")
0776
0777 tmp_log.debug("done")
0778 return site_gshare_dict
0779 except Exception:
0780 tmp_log.error(traceback.format_exc())
0781
    def analy_user_eval(self):
        """
        Evaluate the analysis workload of each user on A/B/C-class GRID sites.

        Combines the analy_site_eval and users_jobs_stats metrics: per-user
        queued/running counters are summed per site-class rank, then the
        remaining slot allowance against the configured thresholds
        (USER_USAGE_THRESHOLD_A/B) is computed.

        :return: {user: {"user", "A_sites"/"B_sites"/"C_sites" counters,
                 "rem_slots_A/B", "rem_slot_ratio_A/B"}}, or None on error
        """
        tmp_log = logger_utils.make_logger(main_logger, "FetchData")
        try:
            # initialize
            user_dict = dict()
            # proxy to the metrics table
            mdb = MetricsDB(self.tbuf)
            # site classes evaluated within the last 2 hours
            ase_dict = mdb.get_metrics("analy_site_eval", "site", fresher_than_minutes_ago=120)
            # per-(site, gshare) user job stats fresher than 15 minutes
            ujs_dict = mdb.get_metrics("users_jobs_stats", fresher_than_minutes_ago=15)
            for (site, gshare), usage_dict in ujs_dict.items():
                # only analysis gshares count
                if gshare not in ["User Analysis", "Express Analysis"]:
                    continue
                # site evaluation is required to rank this site's usage
                try:
                    site_eval_dict = ase_dict[site]
                except KeyError:
                    tmp_log.warning(f"analy_site_eval missed site={site} gshare={gshare}, skipped ")
                    continue
                # class value 1/0/-1 maps to A_sites/B_sites/C_sites
                try:
                    site_class_value = site_eval_dict["class"]
                except KeyError:
                    tmp_log.warning(f"analy_site_eval class missed for site={site} gshare={gshare}, skipped ")
                    continue
                else:
                    site_class_rank = class_value_rank_map[site_class_value]
                # only GRID sites contribute to user usage
                site_resource_type = site_eval_dict.get("resource_type")
                if site_resource_type != "GRID":
                    continue
                # accumulate counters per user and per class rank
                for user, v in usage_dict.items():
                    user_dict.setdefault(user, {"user": user})
                    for _rank in class_value_rank_map.values():
                        user_dict[user].setdefault(
                            _rank,
                            {
                                "nQueue": 0,
                                "nRunning": 0,
                                "slotsQueue": 0,
                                "slotsRunning": 0,
                            },
                        )

                    user_dict[user][site_class_rank]["nQueue"] += v["nQueue"]
                    user_dict[user][site_class_rank]["nRunning"] += v["nRunning"]
                    user_dict[user][site_class_rank]["slotsQueue"] += v["slotsQueue"]
                    user_dict[user][site_class_rank]["slotsRunning"] += v["slotsRunning"]
            # slot thresholds from config; defaults: 1000 on A sites, 8000 on A+B
            threshold_A = self.tbuf.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_A")
            if threshold_A is None:
                threshold_A = 1000
            threshold_B = self.tbuf.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_B")
            if threshold_B is None:
                threshold_B = 8000
            # NOTE(review): a configured threshold of 0 would raise
            # ZeroDivisionError below — TODO confirm config never sets 0
            # iterate over a deep copy since scalar keys are added to user_dict
            for user, d in copy.deepcopy(user_dict).items():
                run_slots_A = d["A_sites"]["slotsRunning"]
                run_slots_AB = d["A_sites"]["slotsRunning"] + d["B_sites"]["slotsRunning"]
                # remaining slots before the user hits each threshold (floored at 0)
                rem_slots_A = max(threshold_A - run_slots_A, 0)
                rem_slots_B = max(threshold_B - run_slots_AB, 0)
                user_dict[user]["rem_slots_A"] = rem_slots_A
                user_dict[user]["rem_slots_B"] = rem_slots_B
                user_dict[user]["rem_slot_ratio_A"] = rem_slots_A / threshold_A
                user_dict[user]["rem_slot_ratio_B"] = rem_slots_B / threshold_B

            tmp_log.debug(f"{user_dict}")

            return user_dict
        except Exception:
            tmp_log.error(traceback.format_exc())
0859
0860
0861
def main(tbuf=None, **kwargs):
    """
    Fetch every metric listed in metric_list and store it in the Metrics table.

    :param tbuf: optional taskBuffer instance; when None a private one is
                 initialized here and cleaned up before returning
    """
    # unique requester id for taskBuffer connection bookkeeping
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    # instantiate a task buffer when none was passed in
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    # process identifier used for the DB-side process lock
    my_pid = os.getpid()
    my_full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{my_pid}"

    if DRY_RUN:
        # dry run: fetch the metrics but take no locks and write nothing
        fetcher = FetchData(taskBuffer)

        for metric_name, key_type, period in metric_list:
            main_logger.debug(f"(dry-run) start {metric_name}")
            # each metric name is the name of a FetchData method
            the_method = getattr(fetcher, metric_name)
            fetched_data = the_method()
            if fetched_data is None:
                main_logger.warning(f"(dry-run) {metric_name} got no valid data")
                continue
            main_logger.debug(f"(dry-run) done {metric_name}")
    else:
        # real run: fetch and store each metric under a process lock
        mdb = MetricsDB(taskBuffer)
        fetcher = FetchData(taskBuffer)

        for metric_name, key_type, period in metric_list:
            # lock component name: metric name truncated to 16 chars plus an
            # adler32 checksum in hex to keep it unique
            lock_component_name = f"pandaMetr.{metric_name:.16}.{adler32(metric_name.encode('utf-8')):0x}"
            # ensure only one process handles this metric per period
            got_lock = taskBuffer.lockProcess_PANDA(component=lock_component_name, pid=my_full_pid, time_limit=period)
            if got_lock:
                main_logger.debug(f"got lock of {metric_name}")
            else:
                main_logger.debug(f"{metric_name} locked by other process; skipped...")
                continue
            main_logger.debug(f"start {metric_name}")
            # each metric name is the name of a FetchData method
            the_method = getattr(fetcher, metric_name)
            fetched_data = the_method()
            if fetched_data is None:
                main_logger.warning(f"{metric_name} got no valid data")
                continue
            mdb.update(metric=metric_name, key_type=key_type, entity_dict=fetched_data)
            main_logger.debug(f"done {metric_name}")

    # stop the task buffer if it was created inside this function
    if tbuf is None:
        taskBuffer.cleanup(requester=requester_id)
0924
0925
# run the collector when executed as a script
if __name__ == "__main__":
    main()