Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import functools
0003 import json
0004 import os
0005 import socket
0006 import sys
0007 import traceback
0008 from zlib import adler32
0009 
0010 from pandacommon.pandalogger import logger_utils
0011 from pandacommon.pandalogger.PandaLogger import PandaLogger
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 from pandacommon.pandautils.thread_utils import GenericThread
0014 
0015 from pandaserver.config import panda_config
0016 from pandaserver.daemons.scripts.metric_collector import MetricsDB
0017 
0018 # logger
0019 main_logger = PandaLogger().getLogger("task_evaluator")
0020 
0021 # dry run
0022 DRY_RUN = False
0023 
0024 # list of metrics in FetchData to fetch data and update to DB. Format: (metric, period_minutes)
0025 metric_list = [
0026     ("analy_task_eval", 10),
0027 ]
0028 
0029 # constant maps
0030 # class_value_rank_map = {1: 'A_sites', 0: 'B_sites', -1: 'C_sites'}
0031 
0032 
class TaskEvaluationDB(object):
    """
    Proxy to access the task_evaluation table in DB
    """

    def __init__(self, tbuf):
        """
        Keep the task buffer object used to run all SQL statements.

        :param tbuf: object providing querySQL() and executemanySQL()
        """
        self.tbuf = tbuf

    def _decor(method):
        """
        Wrap a method so that an exception raised inside it is logged instead of propagated.

        Intended to be applied as a decorator inside the class body. The wrapped
        method returns whatever the original returns, or None when it raised.

        :param method: function taking self as the first argument
        :return: the wrapped function
        """

        @functools.wraps(method)
        def _wrapped_method(self, *args, **kwargs):
            try:
                # hand the result back to the caller; without this every wrapped method would return None
                return method(self, *args, **kwargs)
            except Exception:
                # best effort: keep the caller alive, but leave a trace of the failure
                main_logger.error(f"{method.__name__} failed: {traceback.format_exc()}")
                return None

        return _wrapped_method

    def update(self, metric, entity_dict):
        """
        Write the evaluation values of one metric to the table.

        Tasks already having a row for the metric are updated, the others are inserted.
        If any value cannot be serialized to JSON, nothing is written at all.

        :param metric: metric name
        :param entity_dict: map of jediTaskID to a JSON-serializable value
        """
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.update")
        tmp_log.debug(f"start metric={metric}")
        # sql
        sql_query_taskid = """SELECT jediTaskID """ """FROM ATLAS_PANDA.Task_Evaluation """ """WHERE metric = :metric """
        sql_update = (
            """UPDATE ATLAS_PANDA.Task_Evaluation SET """
            """value_json = :patch_value_json, """
            """timestamp = :timestamp """
            """WHERE jediTaskID=:taskID AND metric=:metric """
        )
        sql_insert = """INSERT INTO ATLAS_PANDA.Task_Evaluation """ """VALUES ( """ """:taskID, :metric, :patch_value_json, :timestamp """ """) """
        # now
        now_time = naive_utcnow()
        # get existing taskIDs
        res = self.tbuf.querySQL(sql_query_taskid, {":metric": metric})
        if res is None:
            # without knowing the existing rows it is impossible to tell updates from inserts
            tmp_log.warning(f"failed to query existing tasks of metric={metric} ; skipped")
            return
        # a set, since membership is tested once per task below
        existing_task_ids = {taskID for (taskID,) in res}
        # make var map lists
        update_varMap_list = []
        insert_varMap_list = []
        for taskID, v in entity_dict.items():
            # values to json string
            try:
                patch_value_json = json.dumps(v)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str(v))
                return
            varMap = {
                ":taskID": taskID,
                ":metric": metric,
                ":timestamp": now_time,
                ":patch_value_json": patch_value_json,
            }
            # append to the list
            if taskID in existing_task_ids:
                update_varMap_list.append(varMap)
            else:
                insert_varMap_list.append(varMap)
        # update
        n_row = 0
        if update_varMap_list:
            n_row = self.tbuf.executemanySQL(sql_update, update_varMap_list)
            if n_row is None:
                # None cannot be compared with an int; count it as nothing updated
                tmp_log.warning(f"failed to update rows for metric={metric}")
                n_row = 0
        if n_row < len(update_varMap_list):
            tmp_log.warning(f"only {n_row}/{len(update_varMap_list)} rows updated for metric={metric}")
        else:
            tmp_log.debug(f"updated {len(update_varMap_list)} rows for metric={metric}")
        # insert
        n_row = 0
        if insert_varMap_list:
            n_row = self.tbuf.executemanySQL(sql_insert, insert_varMap_list)
            if n_row is None:
                # try to insert one by one
                n_row = 0
                for varMap in insert_varMap_list:
                    res = self.tbuf.querySQL(sql_insert, varMap)
                    try:
                        n_row += res
                    except TypeError:
                        # this single insert gave no row count; do not count it
                        pass
        if n_row < len(insert_varMap_list):
            tmp_log.warning(f"only {n_row}/{len(insert_varMap_list)} rows inserted for metric={metric}")
        else:
            tmp_log.debug(f"inserted {len(insert_varMap_list)} rows for metric={metric}")
        # done
        tmp_log.debug(f"done metric={metric}")

    def get_metrics(self, metric, fresher_than_minutes_ago=120):
        """
        Read the stored values of one metric which are recent enough.

        :param metric: metric name
        :param fresher_than_minutes_ago: only rows with timestamp within this many minutes from now are returned
        :return: map of jediTaskID to the decoded JSON value, or None if the query failed.
                 Rows whose JSON cannot be decoded are logged and left out
        """
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.get_metrics")
        tmp_log.debug(f"start metric={metric}")
        # sql
        sql_query = (
            """SELECT jediTaskID, value_json """ """FROM ATLAS_PANDA.Task_Evaluation """ """WHERE metric = :metric """ """AND timestamp >= :min_timestamp """
        )
        # var map
        varMap = {
            ":metric": metric,
            ":min_timestamp": naive_utcnow() - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }
        # query
        res = self.tbuf.querySQL(sql_query, varMap)
        if res is None:
            tmp_log.warning(f"failed to query metric={metric}")
            return None
        # return map
        ret_map = {}
        for taskID, value_json in res:
            try:
                ret_map[taskID] = json.loads(value_json)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str(taskID) + str(value_json))
        # return
        return ret_map

    def clean_up(self, metric, fresher_than_minutes_ago=120):
        """
        Delete rows of one metric which belong to tasks in a final status and are old enough.

        :param metric: metric name
        :param fresher_than_minutes_ago: rows with timestamp within this many minutes from now are kept
        """
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.clean_up")
        tmp_log.debug(f"start metric={metric}")
        # sql
        sql_delete_terminated_tasks = (
            "DELETE "
            "FROM ATLAS_PANDA.Task_Evaluation te "
            "WHERE te.jediTaskID IN ( "
            "SELECT jt.jediTaskID "
            "FROM ATLAS_PANDA.JEDI_Tasks jt, ATLAS_PANDA.Task_Evaluation tez "
            "WHERE tez.jediTaskID = jt.jediTaskID "
            "AND jt.status IN ('done', 'finished', 'failed', 'broken', 'aborted', 'exhausted') "
            ") "
            "AND te.metric = :metric "
            "AND te.timestamp <= :max_timestamp "
        )
        # var map
        varMap = {
            ":metric": metric,
            ":max_timestamp": naive_utcnow() - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }
        # clean up
        n_row = self.tbuf.querySQL(sql_delete_terminated_tasks, varMap)
        tmp_log.debug(f"cleaned up {n_row} rows for metric={metric}")
0179 
class FetchData(object):
    """
    methods to fetch or evaluate data values to store
    """

    def __init__(self, tbuf):
        """
        Keep the task buffer object used for SQL queries and config lookups.

        :param tbuf: object providing querySQL() and getConfigValue()
        """
        self.tbuf = tbuf
        # initialize stored data
        self.gshare_status = None

    @staticmethod
    def _compute_progress(ds_info):
        """
        Derive progress figures of a task from its aggregated dataset counters.

        :param ds_info: dict which may hold nFiles, nFilesFinished, nFilesFailed and nJobs;
                        missing or None values count as 0 (SUM() over NULLs gives NULL)
        :return: dict with n_files_total, n_files_finished, n_files_failed, n_files_remaining,
                 pct_finished, pct_failed and n_jobs_remaining
        """
        n_files_total = ds_info.get("nFiles") or 0
        n_files_finished = ds_info.get("nFilesFinished") or 0
        n_files_failed = ds_info.get("nFilesFailed") or 0
        n_jobs = ds_info.get("nJobs") or 0
        n_files_remaining = max(n_files_total - n_files_finished - n_files_failed, 0)
        pct_finished = 0
        pct_failed = 0
        n_jobs_remaining = 0
        if n_files_total > 0:
            pct_finished = n_files_finished * 100 / n_files_total
            pct_failed = n_files_failed * 100 / n_files_total
            # scale the number of jobs by the fraction of files still to be processed
            n_jobs_remaining = int(n_jobs * n_files_remaining / n_files_total)
        return {
            "n_files_total": n_files_total,
            "n_files_finished": n_files_finished,
            "n_files_failed": n_files_failed,
            "n_files_remaining": n_files_remaining,
            "pct_finished": pct_finished,
            "pct_failed": pct_failed,
            "n_jobs_remaining": n_jobs_remaining,
        }

    @staticmethod
    def _classify_task(
        gshare,
        usage_dict,
        pct_finished,
        n_jobs_remaining,
        progress_to_boost_A=90,
        progress_to_boost_B=95,
        max_rem_jobs_to_boost_A=500,
        max_rem_jobs_to_boost_B=200,
    ):
        """
        Decide the class of a task: 2 (S), 1 (A), 0 (B) or -1 (C).

        :param gshare: global share of the task
        :param usage_dict: evaluation of the task owner with keys rem_slots_A and rem_slots_B, or None if unknown
        :param pct_finished: percentage of finished input files
        :param n_jobs_remaining: estimated number of jobs still to run
        :param progress_to_boost_A: minimum pct_finished for a class A task to be boosted
        :param progress_to_boost_B: minimum pct_finished for a class B task to be boosted
        :param max_rem_jobs_to_boost_A: maximum n_jobs_remaining for a class A task to be boosted
        :param max_rem_jobs_to_boost_B: maximum n_jobs_remaining for a class B task to be boosted
        :return: the class, or None when the task cannot be classified for lack of the owner evaluation
        """
        if gshare == "Express Analysis":
            # Express Analysis tasks always in class S
            return 2
        if usage_dict is None:
            return None
        # check usage of the user
        task_class = 1
        if usage_dict["rem_slots_A"] <= 0:
            task_class = -1 if usage_dict["rem_slots_B"] <= 0 else 0
        # boost for nearly done tasks
        if task_class == 1 and pct_finished >= progress_to_boost_A and 0 < n_jobs_remaining <= max_rem_jobs_to_boost_A:
            # almost done A-tasks, to boost
            return 2
        if task_class == 0 and pct_finished >= progress_to_boost_B and 0 < n_jobs_remaining <= max_rem_jobs_to_boost_B:
            # almost done B-tasks, to boost
            return 2
        return task_class

    def analy_task_eval(self):
        """
        Evaluate the class of every active user analysis task.

        :return: map of jediTaskID to a dict with task_id, user, gshare, n_files_total, n_files_finished,
                 n_files_failed, pct_finished, pct_failed and class; None if the evaluation failed.
                 Tasks of users without evaluation are left out, unless they are in Express Analysis
        """
        tmp_log = logger_utils.make_logger(main_logger, "FetchData")
        # sql
        sql_get_active_tasks = (
            "SELECT jt.jediTaskID, jt.userName, jt.gshare "
            "FROM ATLAS_PANDA.JEDI_Tasks jt, ATLAS_PANDA.JEDI_AUX_Status_MinTaskID asm "
            "WHERE jt.taskType = 'anal' AND jt.prodSourceLabel = 'user' "
            "AND jt.status=asm.status AND jt.jediTaskID >= asm.min_jediTaskID "
            "AND jt.status IN ('scouting', 'scouted', 'running', 'pending', 'throttled') "
            "AND jt.userName NOT IN ('gangarbt') "
            "AND jt.modificationTime >= CURRENT_DATE - 30 "
        )
        sql_get_task_dsinfo = (
            "SELECT ds.jediTaskID, SUM(ds.nFiles), SUM(ds.nFilesFinished), SUM(ds.nFilesFailed) "
            "FROM ATLAS_PANDA.JEDI_Datasets ds "
            "WHERE ds.jediTaskID = :taskID "
            "AND ds.type IN ('input', 'pseudo_input') "
            "AND ds.masterID IS NULL "
            "GROUP BY ds.jediTaskID "
        )
        sql_get_task_n_jobs = (
            "SELECT COUNT(*) FROM ( "
            "SELECT DISTINCT c.PandaID "
            "FROM ATLAS_PANDA.JEDI_Datasets ds, ATLAS_PANDA.JEDI_Dataset_Contents c "
            "WHERE c.jediTaskID=ds.jediTaskID AND c.datasetID=ds.datasetID "
            "AND ds.jediTaskID=:taskID AND ds.masterID IS NULL "
            "AND ds.type IN ('input', 'pseudo_input') "
            ") "
        )
        try:
            # initialize
            task_dict = dict()
            # MetricsDB
            mdb = MetricsDB(self.tbuf)
            # get user evaluation
            ue_dict = mdb.get_metrics("analy_user_eval", "neither", fresher_than_minutes_ago=20)
            if ue_dict is None:
                # NOTE(review): assumes MetricsDB.get_metrics returns None on failure - confirm
                tmp_log.warning("got no user evaluation; only Express Analysis tasks can be classified")
                ue_dict = {}
            # get active tasks
            active_tasks_list = self.tbuf.querySQL(sql_get_active_tasks, {})
            if active_tasks_list is None:
                tmp_log.warning("failed to query active tasks")
                return None
            n_tot_tasks = len(active_tasks_list)
            tmp_log.debug(f"got total {n_tot_tasks} tasks")
            # parameters; the same for all tasks, so looked up once rather than for every task
            thresholds = {
                "progress_to_boost_A": self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A", default=90),
                "progress_to_boost_B": self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B", default=95),
                "max_rem_jobs_to_boost_A": self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_A", default=500),
                "max_rem_jobs_to_boost_B": self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_B", default=200),
            }
            # counter
            cc = 0
            n_tasks_dict = {
                2: 0,
                1: 0,
                0: 0,
                -1: 0,
            }
            # loop over tasks
            for taskID, user, gshare in active_tasks_list:
                varMap = {":taskID": taskID}
                # get dataset info of each task
                ds_info = {}
                for _, n_files, n_files_finished, n_files_failed in self.tbuf.querySQL(sql_get_task_dsinfo, varMap) or ():
                    ds_info = {
                        "nFiles": n_files,
                        "nFilesFinished": n_files_finished,
                        "nFilesFailed": n_files_failed,
                    }
                # get n jobs of each task; only the first row matters
                for (n_jobs,) in self.tbuf.querySQL(sql_get_task_n_jobs, varMap) or ():
                    ds_info["nJobs"] = n_jobs
                    break
                # get task proceeding progress
                progress = self._compute_progress(ds_info)
                # classify
                task_class = self._classify_task(
                    gshare,
                    ue_dict.get(user),
                    progress["pct_finished"],
                    progress["n_jobs_remaining"],
                    **thresholds,
                )
                if task_class is None:
                    # no evaluation of the user; skip the task
                    continue
                # fill in task class
                task_dict[taskID] = {
                    "task_id": taskID,
                    "user": user,
                    "gshare": gshare,
                    "n_files_total": progress["n_files_total"],
                    "n_files_finished": progress["n_files_finished"],
                    "n_files_failed": progress["n_files_failed"],
                    "pct_finished": progress["pct_finished"],
                    "pct_failed": progress["pct_failed"],
                    "class": task_class,
                }
                # counter
                cc += 1
                if cc % 5000 == 0:
                    tmp_log.debug(f"evaluated {cc:6d} tasks")
                n_tasks_dict[task_class] += 1
            tmp_log.debug(f"evaluated {cc:6d} tasks in total (S:{n_tasks_dict[2]}, A:{n_tasks_dict[1]}, B:{n_tasks_dict[0]}, C:{n_tasks_dict[-1]})")
            # return
            tmp_log.debug("done")
            return task_dict
        except Exception:
            # the caller treats None as "no valid data"
            tmp_log.error(traceback.format_exc())
            return None
0335 
0336 
0337 # main
def main(tbuf=None, **kwargs):
    """
    Run all metrics in metric_list once.

    In dry run the data are only fetched. Otherwise, for each metric whose lock can be
    taken, old rows of terminated tasks are cleaned up and the fetched data are written to DB.

    :param tbuf: task buffer object to use; if None, one is initialized here and cleaned up at the end
    :param kwargs: accepted but not used
    """
    # instantiate TB
    requester_id = None
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf
    try:
        # pid
        my_pid = os.getpid()
        my_full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{my_pid}"
        # go
        fetcher = FetchData(taskBuffer)
        if DRY_RUN:
            # dry run, regardless of lock, not update DB
            # loop over all fetch data methods to run
            for metric_name, _period in metric_list:
                main_logger.debug(f"(dry-run) start {metric_name}")
                # fetch data only
                the_method = getattr(fetcher, metric_name)
                fetched_data = the_method()
                if fetched_data is None:
                    main_logger.warning(f"(dry-run) {metric_name} got no valid data")
                    continue
                main_logger.debug(f"(dry-run) done {metric_name}")
        else:
            # real run, will update DB
            # instantiate
            tedb = TaskEvaluationDB(taskBuffer)
            # loop over all fetch data methods to run and update to DB
            for metric_name, period in metric_list:
                # metric lock; the name is truncated to 30 characters, with a checksum of the full name appended
                lock_component_name = f"pandaTaskEval.{metric_name:.30}.{adler32(metric_name.encode('utf-8')):0x}"
                # try to get lock
                got_lock = taskBuffer.lockProcess_PANDA(component=lock_component_name, pid=my_full_pid, time_limit=period)
                if got_lock:
                    main_logger.debug(f"got lock of {metric_name}")
                else:
                    main_logger.debug(f"{metric_name} locked by other process; skipped...")
                    continue
                main_logger.debug(f"start {metric_name}")
                # clean up
                tedb.clean_up(metric=metric_name, fresher_than_minutes_ago=120)
                main_logger.debug(f"cleaned up {metric_name}")
                # fetch data and update DB
                the_method = getattr(fetcher, metric_name)
                fetched_data = the_method()
                if fetched_data is None:
                    main_logger.warning(f"{metric_name} got no valid data")
                    continue
                tedb.update(metric=metric_name, entity_dict=fetched_data)
                main_logger.debug(f"done {metric_name}")
    finally:
        # stop taskBuffer if created inside this script, even when a metric raised
        if tbuf is None:
            taskBuffer.cleanup(requester=requester_id)
0401 
0402 
0403 # run
0404 if __name__ == "__main__":
0405     main()