Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import copy
0002 import datetime
0003 import functools
0004 import json
0005 import os
0006 import socket
0007 import sys
0008 import traceback
0009 from zlib import adler32
0010 
0011 import numpy as np
0012 from pandacommon.pandalogger import logger_utils
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 from pandacommon.pandautils.PandaUtils import naive_utcnow
0015 from pandacommon.pandautils.thread_utils import GenericThread
0016 from scipy import stats
0017 
0018 from pandaserver.config import panda_config
0019 
# logger
main_logger = PandaLogger().getLogger("metric_collector")

# dry run flag (not referenced in this chunk; presumably checked by the running agent — TODO confirm)
DRY_RUN = False

# list of metrics in FetchData to fetch data and update to DB. Format: (metric, key_type, period_minutes)
# metric: name of the FetchData method that computes the values
# key_type: how the values are keyed in the Metrics table:
#   "site" (computingSite), "gshare", "both" ((site, gshare) tuple), or "neither" (single global row)
# period_minutes: refresh period of the metric
metric_list = [
    ("gshare_preference", "gshare", 5),
    ("analy_pmerge_jobs_wait_time", "site", 30),
    ("analy_site_eval", "site", 30),
    ("users_jobs_stats", "both", 5),
    ("analy_user_eval", "neither", 10),
]

# constant maps
# map from site class value (see FetchData.analy_site_eval: 1=A, 0=B, -1=C) to rank label
class_value_rank_map = {1: "A_sites", 0: "B_sites", -1: "C_sites"}
0037 
0038 
def get_now_time_str():
    """
    Return the current (naive UTC) time as a string suitable for DB storage
    """
    return naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")
0046 
0047 
def conf_interval_upper(n, mean, stdev, cl=0.95):
    """
    Estimate the upper bound of the one-sided confidence interval at
    confidence level cl, using Student's t distribution with n-1 degrees
    of freedom; the result is capped at 999999
    """
    cap = 999999
    upper = stats.t.ppf(cl, n - 1, loc=mean, scale=stdev)
    return min(upper, cap)
0056 
0057 
def weighted_stats(values, weights):
    """
    Compute weighted statistics of a sample.

    :return: tuple of (sum of weights, weighted mean, weighted standard deviation)
    """
    total_weight = np.sum(weights)
    w_mean = np.average(values, weights=weights)
    w_variance = np.average(np.square(values - w_mean), weights=weights)
    w_stdev = np.sqrt(w_variance)
    return total_weight, w_mean, w_stdev
0067 
0068 
# get site slot to-running rate statistics
def get_site_strr_stats(tbuf, time_window=21600, cutoff=300):
    """
    Get the slot to-running rate per site: cores started to run per hour,
    summed over the jobsActive4 and jobsArchived4 tables.

    :param tbuf: task buffer proxy used to query the DB
    :param time_window: int, look-back window in seconds for job startTime
                        (NOTE: seconds, not hours — timedelta uses seconds)
    :param cutoff: int, seconds before now excluded from the window
    :return: tuple (True, {site: to_running_rate}) on success, (False, {}) on failure
    """
    # log
    tmp_log = logger_utils.make_logger(main_logger, "get_site_strr_stats")
    # timestamps
    current_time = naive_utcnow()
    starttime_max = current_time - datetime.timedelta(seconds=cutoff)
    starttime_min = current_time - datetime.timedelta(seconds=time_window)
    # round both ends down to 10-minute marks so consecutive runs see stable windows
    starttime_max_rounded = starttime_max.replace(minute=starttime_max.minute // 10 * 10, second=0, microsecond=0)
    starttime_min_rounded = starttime_min.replace(minute=starttime_min.minute // 10 * 10, second=0, microsecond=0)
    real_interval_hours = (starttime_max_rounded - starttime_min_rounded).total_seconds() / 3600
    # define the var map of query parameters
    var_map = {
        ":startTimeMin": starttime_min_rounded,
        ":startTimeMax": starttime_max_rounded,
    }
    # sql to query on jobs-tables (jobsActive4 and jobsArchived4); %s is the table name
    sql_jt = """
           SELECT computingSite, coreCount, COUNT(*) FROM %s
           WHERE vo='atlas'
           AND startTime IS NOT NULL AND startTime>=:startTimeMin AND startTime<:startTimeMax
           AND jobStatus IN ('running', 'holding', 'transferring', 'finished', 'cancelled')
           """
    sql_jt += """
           GROUP BY computingSite, coreCount
           """
    # job tables
    tables = [
        "ATLAS_PANDA.jobsActive4",
        "ATLAS_PANDA.jobsArchived4",
    ]
    # aggregate to-running rate per site
    return_map = {}
    try:
        for table in tables:
            sql_exe = sql_jt % table
            res = tbuf.querySQL(sql_exe, var_map)
            # create map
            for panda_site, core_count, n_count in res:
                # make sure the site appears even when all its coreCount rows are null
                return_map.setdefault(panda_site, 0)
                # skip null or zero coreCount (contributes no slots)
                if not core_count:
                    continue
                # increase to-running rate
                to_running_rate = n_count * core_count / real_interval_hours if real_interval_hours > 0 else 0
                return_map[panda_site] += to_running_rate
        return True, return_map
    except Exception as e:
        tmp_log.error(f"Exception {e.__class__.__name__}: {e}")
        return False, {}
0126 
0127 
# get each share and its leaf share
def fill_leaf_shares(key, val, the_list):
    """
    Recursively append names of leaf shares to the_list.

    A node whose val is None is a leaf; otherwise val is a dict of child nodes.
    """
    if val is not None:
        for child_key, child_val in val.items():
            fill_leaf_shares(child_key, child_val, the_list)
    else:
        the_list.append(key)
0135 
0136 
class MetricsDB(object):
    """
    Proxy to access the metrics table in DB
    """

    def __init__(self, tbuf):
        # task buffer proxy used for all DB access
        self.tbuf = tbuf

    def _decor(method):
        # decorator factory applied immediately to the given method
        # NOTE(review): currently unused in this class; the wrapper silently
        # swallows every exception and discards the return value — kept as-is
        # in case it is referenced elsewhere
        def _decorator(_method, *args, **kwargs):
            @functools.wraps(_method)
            def _wrapped_method(self, *args, **kwargs):
                try:
                    _method(self, *args, **kwargs)
                except Exception as exc:
                    pass

            return _wrapped_method

        return _decorator(method)

    def update(self, metric, key_type, entity_dict):
        """
        Upsert values of a metric into ATLAS_PANDA.Metrics.

        :param metric: str, metric name (first element of metric_list entries)
        :param key_type: str, one of "site", "gshare", "both", "neither";
                         decides how entity_dict keys map to the
                         computingSite/gshare columns
        :param entity_dict: for "neither", the whole dict is stored as one
                            row with NULL site/gshare; otherwise a dict
                            {entity: value} stored as one row per entity
        """
        tmp_log = logger_utils.make_logger(main_logger, "MetricsDB.update")
        # sql
        sql_update = (
            """UPDATE ATLAS_PANDA.Metrics SET """
            """value_json = :patch_value_json , """
            """timestamp = :timestamp """
            """WHERE computingSite=:site AND gshare=:gshare AND metric=:metric """
        )
        sql_insert = """INSERT INTO ATLAS_PANDA.Metrics """ """VALUES ( """ """:site, :gshare, :metric, :patch_value_json, :timestamp """ """) """
        # now
        now_time = naive_utcnow()
        # var map template
        varMap_template = {
            ":site": None,
            ":gshare": None,
            ":metric": metric,
            ":timestamp": now_time,
            ":patch_value_json": None,
        }
        # make var map list
        varMap_list = []
        if key_type == "neither":
            # single global row: serialize the whole dict
            try:
                patch_value_json = json.dumps(entity_dict)
            except Exception:
                # FIX: was str(v), which is undefined in this branch (NameError)
                tmp_log.error(traceback.format_exc() + " " + str(entity_dict))
                return
            # initialize varMap
            varMap = varMap_template.copy()
            varMap.update(
                {
                    ":site": "NULL",
                    ":gshare": "NULL",
                    ":patch_value_json": patch_value_json,
                }
            )
            # append to the list
            varMap_list.append(varMap)
        else:
            for entity, v in entity_dict.items():
                # values to json string
                try:
                    patch_value_json = json.dumps(v)
                except Exception:
                    tmp_log.error(traceback.format_exc() + " " + str(v))
                    return
                # initialize varMap
                varMap = varMap_template.copy()
                varMap[":patch_value_json"] = patch_value_json
                # update varMap according to key_type
                if key_type == "site":
                    varMap.update(
                        {
                            ":site": entity,
                            ":gshare": "NULL",
                        }
                    )
                elif key_type == "gshare":
                    varMap.update(
                        {
                            ":site": "NULL",
                            ":gshare": entity,
                        }
                    )
                elif key_type == "both":
                    # entity is a (site, gshare) tuple
                    varMap.update(
                        {
                            ":site": entity[0],
                            ":gshare": entity[1],
                        }
                    )
                # append to the list
                varMap_list.append(varMap)
        # update
        n_row = self.tbuf.executemanySQL(sql_update, varMap_list)
        # fall back to insert when not all rows were updated (or update failed);
        # guard against n_row being None to avoid a TypeError on comparison
        if n_row is None or n_row < len(varMap_list):
            try:
                tmp_log.debug(f"only {n_row}/{len(varMap_list)} rows updated for metric={metric} ; trying insert")
                for varMap in varMap_list:
                    self.tbuf.querySQLS(sql_insert, varMap)
                tmp_log.debug(f"inserted for metric={metric}")
            except Exception:
                tmp_log.warning(f"failed to insert for metric={metric}")
        else:
            tmp_log.debug(f"updated for metric={metric}")

    def get_metrics(self, metric, key_type=None, fresher_than_minutes_ago=120):
        """
        Fetch stored values of a metric, no older than fresher_than_minutes_ago.

        :param metric: str, metric name
        :param key_type: str or None; when None, the key type registered in
                         metric_list is used (falling back to "both")
        :param fresher_than_minutes_ago: int, maximum age of rows in minutes
        :return: dict keyed according to key_type ("neither" returns the single
                 stored dict itself); None when the query fails
        """
        tmp_log = logger_utils.make_logger(main_logger, "MetricsDB.get_metrics")
        # sql
        sql_query = (
            """SELECT computingSite, gshare, value_json """ """FROM ATLAS_PANDA.Metrics """ """WHERE metric = :metric """ """AND timestamp >= :min_timestamp """
        )
        # now
        now_time = naive_utcnow()
        # var map
        varMap = {
            ":metric": metric,
            ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }
        # query
        res = self.tbuf.querySQL(sql_query, varMap)
        if res is None:
            tmp_log.warning(f"failed to query metric={metric}")
            return
        # key type default
        if key_type is None:
            # FIX: was assigned to "key" (immediately overwritten in the loop),
            # so the default from metric_list was never applied
            key_type = {x[0]: x[1] for x in metric_list}.get(metric, "both")
        # return map
        ret_map = {}
        for computingSite, gshare, value_json in res:
            key = (computingSite, gshare)
            if key_type == "site":
                key = computingSite
            elif key_type == "gshare":
                key = gshare
            elif key_type == "neither":
                key = None
            if value_json is None:
                tmp_log.warning("missing data for " + str((computingSite, gshare)))
                continue
            try:
                value_dict = json.loads(value_json)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str((computingSite, gshare)) + str(value_json))
                continue
            else:
                if key is None:
                    # "neither": the single stored dict is the whole return value
                    ret_map = copy.deepcopy(value_dict)
                else:
                    ret_map[key] = value_dict
        # return
        return ret_map
0304 
0305 class FetchData(object):
0306     """
0307     methods to fetch or evaluate data values to store
0308     """
0309 
    def __init__(self, tbuf):
        # task buffer proxy used for all DB queries
        self.tbuf = tbuf
        # cached gshare status; fetched lazily on first use in gshare_preference()
        self.gshare_status = None
0314 
    def analy_pmerge_jobs_wait_time(self):
        """
        Compute per-site wait-time statistics of user-analysis pmerge jobs
        (and gangarbt jobs) modified within the last 4 days.

        Wait time is the difference between the earliest "running" and the
        earliest "activated" timestamps in the jobs status log. Weighted
        stats use weights that halve every 12 hours of run age. Also counts
        jobs currently queuing longer than the weighted mean.

        :return: dict {site: {n, mean, stdev, med, cl95upp, sum_of_weights,
                 w_mean, w_stdev, w_cl95upp, long_q_n, long_q_mean}} in
                 seconds, with NaN turned into None; None on exception
        """
        tmp_log = logger_utils.make_logger(main_logger, "FetchData")
        # sql
        sql_get_jobs_archived4 = (
            "SELECT pandaID, computingSite "
            "FROM ATLAS_PANDA.jobsArchived4 "
            "WHERE prodSourceLabel='user' "
            "AND gshare='User Analysis' "
            "AND (processingType='pmerge' OR prodUserName='gangarbt') "
            "AND modificationTime>:modificationTime "
        )
        sql_get_jobs_active4 = (
            "SELECT pandaID, computingSite "
            "FROM ATLAS_PANDA.jobsActive4 "
            "WHERE prodSourceLabel='user' "
            "AND gshare='User Analysis' "
            "AND jobStatus IN ('running', 'holding', 'merging', 'transferring', 'finished', 'failed', 'closed', 'cancelled') "
            "AND (processingType='pmerge' OR prodUserName='gangarbt') "
            "AND modificationTime>:modificationTime "
        )
        # earliest modificationTime per job status from the status log
        sql_get_latest_job_mtime_status = (
            "SELECT jobStatus, MIN(modificationTime) " "FROM ATLAS_PANDA.jobs_StatusLog " "WHERE pandaID=:pandaID " "GROUP BY jobStatus "
        )
        sql_get_site_workflow = "SELECT /* use_json_type */ scj.data.workflow " "FROM ATLAS_PANDA.schedconfig_json scj " "WHERE scj.panda_queue=:computingSite "
        # NOTE: CURRENT_DATE-creationtime yields days (Oracle date arithmetic)
        sql_get_long_queuing_job_wait_time_template = (
            "SELECT COUNT(*), AVG(CURRENT_DATE-creationtime) "
            "FROM ATLAS_PANDA.jobsActive4 "
            "WHERE prodSourceLabel='user' "
            "AND gshare='User Analysis' "
            "AND jobStatus IN {q_status_list_str} "
            "AND (processingType='pmerge') "
            "AND computingSite=:computingSite "
            "AND (CURRENT_DATE-creationtime)>:w_mean "
        )
        try:
            # initialize
            tmp_site_dict = dict()
            # now time
            now_time = naive_utcnow()
            # get user jobs modified within the last 4 days
            varMap = {
                ":modificationTime": now_time - datetime.timedelta(days=4),
            }
            archived4_jobs_list = self.tbuf.querySQL(sql_get_jobs_archived4, varMap)
            active4_jobs_list = self.tbuf.querySQL(sql_get_jobs_active4, varMap)
            # merge both tables; the set removes jobs present in both
            all_jobs_set = set()
            all_jobs_set.update(archived4_jobs_list)
            all_jobs_set.update(active4_jobs_list)
            n_tot_jobs = len(all_jobs_set)
            tmp_log.debug(f"got total {n_tot_jobs} jobs")
            # loop over jobs to get modificationTime when activated and running
            cc = 0
            for pandaID, site in all_jobs_set:
                if not site:
                    continue
                varMap = {":pandaID": pandaID}
                status_mtime_list = self.tbuf.querySQL(sql_get_latest_job_mtime_status, varMap)
                status_mtime_dict = dict(status_mtime_list)
                if "activated" not in status_mtime_dict or "running" not in status_mtime_dict:
                    continue
                # wait time: activated -> running
                wait_time = status_mtime_dict["running"] - status_mtime_dict["activated"]
                wait_time_sec = wait_time.total_seconds()
                if wait_time_sec < 0:
                    tmp_log.warning(f"job {pandaID} has negative wait time")
                    continue
                # run age: how long ago the job started running
                run_age_sec = int((now_time - status_mtime_dict["running"]).total_seconds())
                if run_age_sec < 0:
                    tmp_log.warning(f"job {pandaID} has negative run age")
                    continue
                tmp_site_dict.setdefault(site, {"wait_time": [], "run_age": []})
                tmp_site_dict[site]["wait_time"].append(wait_time_sec)
                tmp_site_dict[site]["run_age"].append(run_age_sec)
                # log message every 5000 jobs
                if cc > 0 and cc % 5000 == 0:
                    tmp_log.debug(f"... queried {cc:9d} jobs ...")
                cc += 1
            tmp_log.debug(f"queried {cc} jobs")
            # evaluate stats
            site_dict = dict()
            for site, data_dict in tmp_site_dict.items():
                site_dict.setdefault(site, {})
                n_jobs = len(data_dict["wait_time"])
                # init with nan
                mean = np.nan
                stdev = np.nan
                median = np.nan
                cl95upp = np.nan
                sum_of_weights = np.nan
                w_mean = np.nan
                w_stdev = np.nan
                w_cl95upp = np.nan
                long_q_n = np.nan
                long_q_mean = np.nan
                # fill the stats values
                if n_jobs > 0:
                    wait_time_array = np.array(data_dict["wait_time"])
                    run_age_array = np.array(data_dict["run_age"])
                    # plain stats
                    mean = np.mean(wait_time_array)
                    stdev = np.std(wait_time_array)
                    median = np.median(wait_time_array)
                    cl95upp = conf_interval_upper(n=n_jobs, mean=mean, stdev=stdev, cl=0.95)
                    # weighted by run age (weight halves every 12 hours)
                    weight_array = np.exp2(-run_age_array / (12 * 60 * 60))
                    sum_of_weights, w_mean, w_stdev = weighted_stats(wait_time_array, weight_array)
                    w_cl95upp = conf_interval_upper(n=sum_of_weights + 1, mean=w_mean, stdev=w_stdev, cl=0.95)
                    # current long queuing jobs (queuing longer than the weighted mean)
                    if w_mean:
                        q_status_list_str = "('activated', 'sent')"
                        varMap = {
                            ":computingSite": site,
                        }
                        (site_workflow,) = self.tbuf.querySQL(sql_get_site_workflow, varMap)[0]
                        # push queues also count jobs in "starting"
                        if site_workflow and site_workflow.startswith("push"):
                            q_status_list_str = "('activated', 'sent', 'starting')"
                        varMap = {
                            ":computingSite": site,
                            # convert seconds to days for the SQL date comparison
                            ":w_mean": w_mean / (24 * 60 * 60),
                        }
                        sql_get_long_queuing_job_wait_time = sql_get_long_queuing_job_wait_time_template.format(q_status_list_str=q_status_list_str)
                        (long_q_n, long_q_mean_day) = self.tbuf.querySQL(sql_get_long_queuing_job_wait_time, varMap)[0]
                        if long_q_mean_day:
                            # convert days back to seconds
                            long_q_mean = long_q_mean_day * (24 * 60 * 60)
                        else:
                            long_q_mean = w_mean
                            long_q_n = 0
                # update
                site_dict[site].update(
                    {
                        "n": n_jobs,
                        "mean": mean,
                        "stdev": stdev,
                        "med": median,
                        "cl95upp": cl95upp,
                        "sum_of_weights": sum_of_weights,
                        "w_mean": w_mean,
                        "w_stdev": w_stdev,
                        "w_cl95upp": w_cl95upp,
                        "long_q_n": long_q_n,
                        "long_q_mean": long_q_mean,
                    }
                )
                # log
                tmp_log.debug(
                    (
                        "site={site}, n={n}, "
                        "mean={mean:.3f}, stdev={stdev:.3f}, med={med:.3f}, cl95upp={cl95upp:.3f}, "
                        "sum_of_weights={sum_of_weights:.3f}, "
                        "w_mean={w_mean:.3f}, w_stdev={w_stdev:.3f}, w_cl95upp={w_cl95upp:.3f}, "
                        "long_q_n={long_q_n}, long_q_mean={long_q_mean:.3f} "
                    ).format(site=site, **site_dict[site])
                )
                # turn nan into None (JSON cannot store NaN)
                for key in site_dict[site]:
                    if np.isnan(site_dict[site][key]):
                        site_dict[site][key] = None
            # return
            return site_dict
        except Exception:
            tmp_log.error(traceback.format_exc())
    def gshare_preference(self):
        """
        Compute usage and queue statistics for every leaf gshare and for each
        L1 (top-level) share, including projected and "equivalent" targets
        that redistribute capacity from idle L1 shares to busy ones.

        :return: dict {gshare_name: stats_dict}; L1 shares are keyed as
                 "L1 <name>"; None on exception
        """
        tmp_log = logger_utils.make_logger(main_logger, "FetchData")
        try:
            # get share and hs info; gshare_status is cached on the instance
            if self.gshare_status is None:
                self.gshare_status = self.tbuf.getGShareStatus()
            share_name_tree_dict = self.tbuf.get_tree_of_gshare_names()
            # initialize
            gshare_dict = dict()
            total_hs = sum([leaf["target"] for leaf in self.gshare_status])
            # rank and data; 999999 acts as an "infinite" percentage when the denominator is zero
            for idx, leaf in enumerate(self.gshare_status):
                rank = idx + 1
                gshare = leaf["name"]
                gshare_dict[gshare] = {
                    "gshare": gshare,
                    "rank": rank,
                    "queuing_hs": leaf["queuing"],
                    "running_hs": leaf["running"],
                    "target_hs": leaf["target"],
                    "usage_perc": leaf["running"] / leaf["target"] if leaf["target"] > 0 else 999999,
                    "queue_perc": leaf["queuing"] / leaf["target"] if leaf["target"] > 0 else 999999,
                    "norm_usage_perc": leaf["running"] / total_hs if total_hs > 0 else 999999,
                    "norm_queue_perc": leaf["queuing"] / total_hs if total_hs > 0 else 999999,
                    "norm_target_perc": leaf["target"] / total_hs if total_hs > 0 else 999999,
                    "proj_target_hs": 0,
                    "eqiv_target_hs": 0,
                    "eqiv_usage_perc": 0,
                }
                tmp_log.debug("rank={rank}, gshare={gshare}, usage={usage_perc:.3%}, queue={queue_perc:.3%} ".format(**gshare_dict[gshare]))
            # add L1 shares: collect the leaves under each top-level share
            tmp_L1_leaves_map = {}
            l1_share_dict = {}
            for l1_share, val in share_name_tree_dict.items():
                tmp_L1_leaves_map.setdefault(l1_share, [])
                fill_leaf_shares(l1_share, val, tmp_L1_leaves_map[l1_share])
            for l1_share, leaves_list in tmp_L1_leaves_map.items():
                l1_share_name = f"L1 {l1_share}"
                l1_share_dict.setdefault(
                    l1_share_name,
                    {
                        "gshare": l1_share_name,
                        "rank": -1,
                        "queuing_hs": 0,
                        "running_hs": 0,
                        "target_hs": 0,
                        "usage_perc": 0,
                        "queue_perc": 0,
                    },
                )
                val_dict = l1_share_dict[l1_share_name]
                # aggregate HS values of the leaves under this L1 share
                for leaf in leaves_list:
                    if leaf in gshare_dict:
                        for field in ["queuing_hs", "running_hs", "target_hs"]:
                            val_dict.setdefault(field, 0)
                            val_dict[field] += gshare_dict[leaf].get(field, 0)
                val_dict["usage_perc"] = val_dict["running_hs"] / val_dict["target_hs"] if val_dict["target_hs"] > 0 else 999999
                val_dict["queue_perc"] = val_dict["queuing_hs"] / val_dict["target_hs"] if val_dict["target_hs"] > 0 else 999999
                val_dict["norm_usage_perc"] = val_dict["running_hs"] / total_hs if total_hs > 0 else 999999
                val_dict["norm_queue_perc"] = val_dict["queuing_hs"] / total_hs if total_hs > 0 else 999999
                val_dict["norm_target_perc"] = val_dict["target_hs"] / total_hs
                # projected target: capped by target, at least running or half of queuing
                val_dict["proj_target_hs"] = min(
                    val_dict["target_hs"],
                    max(val_dict["running_hs"], val_dict["queuing_hs"] / 2),
                )
            proj_total_hs = sum([v["proj_target_hs"] for v in l1_share_dict.values()])
            idle_total_hs = total_hs - proj_total_hs
            idle_l1_share_list = []
            idle_threshold_perc = 0.85
            for l1_share_name, val_dict in l1_share_dict.items():
                if val_dict["usage_perc"] < idle_threshold_perc and val_dict["proj_target_hs"] < val_dict["target_hs"]:
                    # idle l1 shares
                    idle_l1_share_list.append(l1_share_name)
            # +1 avoids division by zero when every L1 share is idle
            busy_total_hs = sum([v["target_hs"] for k, v in l1_share_dict.items() if k not in idle_l1_share_list]) + 1
            for l1_share_name, val_dict in l1_share_dict.items():
                if l1_share_name in idle_l1_share_list:
                    # idle l1 shares keep their nominal target
                    val_dict["eqiv_target_hs"] = val_dict["target_hs"]
                    val_dict["eqiv_usage_perc"] = val_dict["usage_perc"]
                else:
                    # busy l1 shares absorb idle capacity proportionally to their target
                    val_dict["eqiv_target_hs"] = val_dict["target_hs"] + idle_total_hs * val_dict["target_hs"] / busy_total_hs
                    val_dict["eqiv_usage_perc"] = val_dict["running_hs"] / val_dict["eqiv_target_hs"]
                tmp_log.debug(
                    (
                        "share={gshare}, usage={usage_perc:.3%}, queue={queue_perc:.3%}, "
                        "target_hs={target_hs:.0f}, eqiv_target_hs={eqiv_target_hs:.0f}, "
                        "norm_target_perc={norm_target_perc:.3%}, eqiv_usage_perc={eqiv_usage_perc:.3%}, "
                        "norm_usage={norm_usage_perc:.3%}, norm_queue={norm_queue_perc:.3%} "
                    ).format(**val_dict)
                )
            gshare_dict.update(l1_share_dict)
            # return
            return gshare_dict
        except Exception:
            tmp_log.error(traceback.format_exc())
0572 
0573     def analy_site_eval(self):
0574         tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0575         try:
0576             # initialize
0577             site_dict = dict()
0578             class_A_set = set()
0579             class_B_set = set()
0580             class_C_set = set()
0581             # get resource_type of sites (GRID, hpc, cloud, ...) from schedconfig
0582             res = self.tbuf.querySQL(
0583                 ("SELECT /* use_json_type */ scj.panda_queue, scj.data.resource_type " "FROM ATLAS_PANDA.schedconfig_json scj "),
0584                 {},
0585             )
0586             site_resource_type_map = {site: resource_type for site, resource_type in res}
0587             # MetricsDB
0588             mdb = MetricsDB(self.tbuf)
0589             # get analysis jobs wait time stats
0590             apjwt_dict = mdb.get_metrics("analy_pmerge_jobs_wait_time", "site")
0591             # evaluate derived values from stats
0592             # max of w_cl95upp and long_q_mean for ranking. Only consider GRID sites
0593             ranking_wait_time_list = []
0594             for site, v in apjwt_dict.items():
0595                 if site_resource_type_map.get(site) != "GRID":
0596                     continue
0597                 try:
0598                     if v["w_cl95upp"] is not None and v["long_q_mean"] is not None:
0599                         ranking_wait_time = np.maximum(v["w_cl95upp"], v["long_q_mean"])
0600                         ranking_wait_time_list.append(ranking_wait_time)
0601                     else:
0602                         tmp_log.warning(
0603                             ("site={site} none value, skipped : w_cl95upp={w_cl95upp} long_q_mean={long_q_mean} ").format(
0604                                 w_cl95upp=v["w_cl95upp"], long_q_mean=v["long_q_mean"]
0605                             )
0606                         )
0607                         continue
0608                 except KeyError:
0609                     continue
0610             first_one_third_wait_time = np.nanquantile(np.array(ranking_wait_time_list), 0.333)
0611             last_one_third_wait_time = np.nanquantile(np.array(ranking_wait_time_list), 0.667)
0612             tmp_log.debug(f"GRID n_sites= {len(ranking_wait_time_list)} wait time PR33={first_one_third_wait_time:.3f} PR67={last_one_third_wait_time:.3f}")
0613             # get to-running rate of sites
0614             tmp_st, site_6h_strr_map = get_site_strr_stats(self.tbuf, time_window=60 * 60 * 6)
0615             if not tmp_st:
0616                 tmp_log.error("failed to get 6h-slot-to-running-rate")
0617             tmp_st, site_1d_strr_map = get_site_strr_stats(self.tbuf, time_window=60 * 60 * 24)
0618             if not tmp_st:
0619                 tmp_log.error("failed to get 1d-slot-to-running-rate")
0620             # for each site
0621             for site in apjwt_dict:
0622                 # from wait time stats
0623                 # TODO: to consider failure rate, site fullness, etc.
0624                 v = apjwt_dict[site]
0625                 # evaluate derived values
0626                 try:
0627                     if v["w_cl95upp"] is not None and v["long_q_mean"] is not None:
0628                         v["ranking_wait_time"] = np.maximum(v["w_cl95upp"], v["long_q_mean"])
0629                         # v['is_slowing_down'] = (v['long_q_mean'] > v['w_cl95upp'] and v['long_q_n'] >= 3)
0630                     else:
0631                         tmp_log.warning(f"site={site} none value, skipped : w_cl95upp={v['w_cl95upp']} long_q_mean={v['long_q_mean']} ")
0632                         continue
0633                 except KeyError as e:
0634                     tmp_log.warning(f"site={site} misses value, skipped : {e} ")
0635                     continue
0636                 # initialize
0637                 site_dict[site] = dict()
0638                 # resource type of the site
0639                 site_dict[site]["resource_type"] = site_resource_type_map.get(site)
0640                 # to-running rate
0641                 site_6h_strr = site_6h_strr_map.get(site, 0)
0642                 site_1d_strr = site_1d_strr_map.get(site, 0)
0643                 site_dict[site]["strr_6h"] = site_6h_strr
0644                 site_dict[site]["strr_1d"] = site_1d_strr
0645                 # classify
0646                 if (
0647                     v["ranking_wait_time"] <= max(first_one_third_wait_time, 3600)
0648                     or (v["w_cl95upp"] <= max(first_one_third_wait_time, 3600) and v["long_q_n"] <= 3)
0649                 ) and site_1d_strr > 0:
0650                     # class A (1)
0651                     site_dict[site]["class"] = 1
0652                     class_A_set.add(site)
0653                 elif v["ranking_wait_time"] > max(last_one_third_wait_time, 10800):
0654                     # class C (-1)
0655                     site_dict[site]["class"] = -1
0656                     class_C_set.add(site)
0657                 else:
0658                     # class B (0)
0659                     site_dict[site]["class"] = 0
0660                     class_B_set.add(site)
0661                 # log
0662                 tmp_log.debug(
0663                     ("site={site}, class={class}, type={resource_type}, strr_6h={strr_6h:.3f}, strr_1d={strr_1d:.3f} ").format(site=site, **site_dict[site])
0664                 )
0665                 # turn nan into None
0666                 for key in site_dict[site]:
0667                     _val = site_dict[site][key]
0668                     if not isinstance(_val, str) and np.isnan(_val):
0669                         site_dict[site][key] = None
0670             # log
0671             tmp_log.debug(
0672                 ("class_A ({}) : {} ; class_B ({}) : {} ; class_C ({}) : {}").format(
0673                     len(class_A_set),
0674                     ",".join(sorted(list(class_A_set))),
0675                     len(class_B_set),
0676                     ",".join(sorted(list(class_B_set))),
0677                     len(class_C_set),
0678                     ",".join(sorted(list(class_C_set))),
0679                 )
0680             )
0681             # return
0682             return site_dict
0683         except Exception:
0684             tmp_log.error(traceback.format_exc())
0685 
0686     def users_jobs_stats(self):
0687         prod_source_label = "user"
0688         tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0689         tmp_log.debug("start")
0690         try:
0691             # initialize
0692             site_gshare_dict = dict()
0693             # get users jobs stats
0694             jobsStatsPerUser = {}
0695             varMap = {}
0696             varMap[":prodSourceLabel"] = prod_source_label
0697             varMap[":pmerge"] = "pmerge"
0698             varMap[":gangarbt"] = "gangarbt"
0699             sqlJ = (
0700                 "SELECT COUNT(*),SUM(coreCount),prodUserName,jobStatus,gshare,computingSite "
0701                 "FROM ATLAS_PANDA.jobsActive4 "
0702                 "WHERE prodSourceLabel=:prodSourceLabel "
0703                 "AND processingType<>:pmerge "
0704                 "AND prodUserName<>:gangarbt "
0705                 "GROUP BY prodUserName,jobStatus,gshare,computingSite "
0706             )
0707             # result
0708             res = self.tbuf.querySQL(sqlJ, varMap)
0709             if res is None:
0710                 tmp_log.debug(f"got {res} ")
0711             else:
0712                 tmp_log.debug(f"total {len(res)} ")
0713                 # make map
0714                 for cnt, n_slots, prodUserName, jobStatus, gshare, computingSite in res:
0715                     # append to PerUser map
0716                     jobsStatsPerUser.setdefault(computingSite, {})
0717                     jobsStatsPerUser[computingSite].setdefault(gshare, {})
0718                     jobsStatsPerUser[computingSite][gshare].setdefault(
0719                         prodUserName,
0720                         {
0721                             "nDefined": 0,
0722                             "nAssigned": 0,
0723                             "nActivated": 0,
0724                             "nStarting": 0,
0725                             "nQueue": 0,
0726                             "nRunning": 0,
0727                             "slotsDefined": 0,
0728                             "slotsAssigned": 0,
0729                             "slotsActivated": 0,
0730                             "slotsStarting": 0,
0731                             "slotsQueue": 0,
0732                             "slotsRunning": 0,
0733                         },
0734                     )
0735                     jobsStatsPerUser[computingSite][gshare].setdefault(
0736                         "_total",
0737                         {
0738                             "nDefined": 0,
0739                             "nAssigned": 0,
0740                             "nActivated": 0,
0741                             "nStarting": 0,
0742                             "nQueue": 0,
0743                             "nRunning": 0,
0744                             "slotsDefined": 0,
0745                             "slotsAssigned": 0,
0746                             "slotsActivated": 0,
0747                             "slotsStarting": 0,
0748                             "slotsQueue": 0,
0749                             "slotsRunning": 0,
0750                         },
0751                     )
0752                     # count # of running/done and activated
0753                     if jobStatus in ["defined", "assigned", "activated", "starting"]:
0754                         status_name = f"n{jobStatus.capitalize()}"
0755                         slots_status_name = f"slots{jobStatus.capitalize()}"
0756                         jobsStatsPerUser[computingSite][gshare][prodUserName][status_name] += cnt
0757                         jobsStatsPerUser[computingSite][gshare][prodUserName]["nQueue"] += cnt
0758                         jobsStatsPerUser[computingSite][gshare]["_total"][status_name] += cnt
0759                         jobsStatsPerUser[computingSite][gshare]["_total"]["nQueue"] += cnt
0760                         jobsStatsPerUser[computingSite][gshare][prodUserName][slots_status_name] += n_slots
0761                         jobsStatsPerUser[computingSite][gshare][prodUserName]["slotsQueue"] += n_slots
0762                         jobsStatsPerUser[computingSite][gshare]["_total"][slots_status_name] += n_slots
0763                         jobsStatsPerUser[computingSite][gshare]["_total"]["slotsQueue"] += n_slots
0764                     elif jobStatus in ["running"]:
0765                         jobsStatsPerUser[computingSite][gshare][prodUserName]["nRunning"] += cnt
0766                         jobsStatsPerUser[computingSite][gshare]["_total"]["nRunning"] += cnt
0767                         jobsStatsPerUser[computingSite][gshare][prodUserName]["slotsRunning"] += n_slots
0768                         jobsStatsPerUser[computingSite][gshare]["_total"]["slotsRunning"] += n_slots
0769             # fill
0770             for computingSite in jobsStatsPerUser:
0771                 g_dict = jobsStatsPerUser[computingSite]
0772                 for gshare in g_dict:
0773                     data_dict = g_dict[gshare]
0774                     site_gshare_dict[(computingSite, gshare)] = data_dict
0775                     tmp_log.debug(f"site={computingSite}, gshare={gshare}, stats={data_dict}")
0776             # done
0777             tmp_log.debug("done")
0778             return site_gshare_dict
0779         except Exception:
0780             tmp_log.error(traceback.format_exc())
0781 
0782     def analy_user_eval(self):
0783         tmp_log = logger_utils.make_logger(main_logger, "FetchData")
0784         try:
0785             # initialize
0786             user_dict = dict()
0787             # MetricsDB
0788             mdb = MetricsDB(self.tbuf)
0789             # get analysis site classification evalutation
0790             ase_dict = mdb.get_metrics("analy_site_eval", "site", fresher_than_minutes_ago=120)
0791             # get users jobs stats
0792             ujs_dict = mdb.get_metrics("users_jobs_stats", fresher_than_minutes_ago=15)
0793             # for each site x gshare
0794             for (site, gshare), usage_dict in ujs_dict.items():
0795                 # count only User Analysis & Express Analysis
0796                 if gshare not in ["User Analysis", "Express Analysis"]:
0797                     continue
0798                 # get site evaluation data
0799                 try:
0800                     site_eval_dict = ase_dict[site]
0801                 except KeyError:
0802                     tmp_log.warning(f"analy_site_eval missed site={site} gshare={gshare}, skipped ")
0803                     continue
0804                 # get site class value
0805                 try:
0806                     site_class_value = site_eval_dict["class"]
0807                 except KeyError:
0808                     tmp_log.warning(f"analy_site_eval class missed for site={site} gshare={gshare}, skipped ")
0809                     continue
0810                 else:
0811                     site_class_rank = class_value_rank_map[site_class_value]
0812                 # site resource_type
0813                 site_resource_type = site_eval_dict.get("resource_type")
0814                 # skip non-GRID sites
0815                 if site_resource_type != "GRID":
0816                     continue
0817                 # evaluate usage of GRID sites for each user
0818                 for user, v in usage_dict.items():
0819                     # initialize
0820                     user_dict.setdefault(user, {"user": user})
0821                     for _rank in class_value_rank_map.values():
0822                         user_dict[user].setdefault(
0823                             _rank,
0824                             {
0825                                 "nQueue": 0,
0826                                 "nRunning": 0,
0827                                 "slotsQueue": 0,
0828                                 "slotsRunning": 0,
0829                             },
0830                         )
0831                     # fill nQ & nR at each site class of each user
0832                     user_dict[user][site_class_rank]["nQueue"] += v["nQueue"]
0833                     user_dict[user][site_class_rank]["nRunning"] += v["nRunning"]
0834                     user_dict[user][site_class_rank]["slotsQueue"] += v["slotsQueue"]
0835                     user_dict[user][site_class_rank]["slotsRunning"] += v["slotsRunning"]
0836             # evaluate derived values for each user
0837             threshold_A = self.tbuf.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_A")
0838             if threshold_A is None:
0839                 threshold_A = 1000
0840             threshold_B = self.tbuf.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_B")
0841             if threshold_B is None:
0842                 threshold_B = 8000
0843             for user, d in copy.deepcopy(user_dict).items():
0844                 run_slots_A = d["A_sites"]["slotsRunning"]
0845                 run_slots_AB = d["A_sites"]["slotsRunning"] + d["B_sites"]["slotsRunning"]
0846                 # remaining allowed slots
0847                 rem_slots_A = max(threshold_A - run_slots_A, 0)
0848                 rem_slots_B = max(threshold_B - run_slots_AB, 0)
0849                 user_dict[user]["rem_slots_A"] = rem_slots_A
0850                 user_dict[user]["rem_slots_B"] = rem_slots_B
0851                 user_dict[user]["rem_slot_ratio_A"] = rem_slots_A / threshold_A
0852                 user_dict[user]["rem_slot_ratio_B"] = rem_slots_B / threshold_B
0853             # log
0854             tmp_log.debug(f"{user_dict}")
0855             # return
0856             return user_dict
0857         except Exception:
0858             tmp_log.error(traceback.format_exc())
0859 
0860 
0861 # main
def main(tbuf=None, **kwargs):
    """Fetch every configured metric and store the results in the metrics DB.

    When *tbuf* is None a private TaskBuffer connection is created and cleaned
    up at the end; otherwise the supplied task buffer is reused. With DRY_RUN
    set, metrics are fetched without taking locks or writing to the DB.
    """
    requester = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    # obtain a task buffer, creating our own connection when none is supplied
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer as task_buffer

        task_buffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester,
        )
    else:
        task_buffer = tbuf
    # identifier of this process (host-pgrp-pid), used to own the DB lock
    host_name = socket.getfqdn().split(".")[0]
    my_full_pid = f"{host_name}-{os.getpgrp()}-{os.getpid()}"
    # go
    if DRY_RUN:
        # dry run: ignore locks and never write to the DB
        fetcher = FetchData(task_buffer)
        for metric_name, key_type, period in metric_list:
            main_logger.debug(f"(dry-run) start {metric_name}")
            # fetch data only
            fetched_data = getattr(fetcher, metric_name)()
            if fetched_data is None:
                main_logger.warning(f"(dry-run) {metric_name} got no valid data")
                continue
            main_logger.debug(f"(dry-run) done {metric_name}")
    else:
        # real run: take a per-metric DB lock, fetch, then update the DB
        mdb = MetricsDB(task_buffer)
        fetcher = FetchData(task_buffer)
        for metric_name, key_type, period in metric_list:
            # lock component name: truncated metric name plus adler32 checksum
            lock_component_name = f"pandaMetr.{metric_name:.16}.{adler32(metric_name.encode('utf-8')):0x}"
            # only one process may refresh a metric within its period
            if not task_buffer.lockProcess_PANDA(component=lock_component_name, pid=my_full_pid, time_limit=period):
                main_logger.debug(f"{metric_name} locked by other process; skipped...")
                continue
            main_logger.debug(f"got lock of {metric_name}")
            main_logger.debug(f"start {metric_name}")
            # fetch data and update DB
            fetched_data = getattr(fetcher, metric_name)()
            if fetched_data is None:
                main_logger.warning(f"{metric_name} got no valid data")
                continue
            mdb.update(metric=metric_name, key_type=key_type, entity_dict=fetched_data)
            main_logger.debug(f"done {metric_name}")
    # stop taskBuffer if created inside this script
    if tbuf is None:
        task_buffer.cleanup(requester=requester)
0923 
0924 
0925 # run
# script entry point: run main() only when executed directly, not on import
if __name__ == "__main__":
    main()