import datetime
import functools
import json
import os
import socket
import sys
import traceback
from zlib import adler32

from pandacommon.pandalogger import logger_utils
from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.PandaUtils import naive_utcnow
from pandacommon.pandautils.thread_utils import GenericThread

from pandaserver.config import panda_config
from pandaserver.daemons.scripts.metric_collector import MetricsDB


# logger
main_logger = PandaLogger().getLogger("task_evaluator")


# when True, fetch and log metrics but do not take locks or write to the DB
DRY_RUN = False


# list of (metric name, lock time limit for the process lock) to evaluate
metric_list = [
    ("analy_task_eval", 10),
]
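
# Overall flow: main() takes a per-metric process lock, cleans up Task_Evaluation rows
# of terminated tasks, evaluates active user analysis tasks with
# FetchData.analy_task_eval(), and upserts the results into ATLAS_PANDA.Task_Evaluation
# via TaskEvaluationDB.update()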


class TaskEvaluationDB(object):
    """
    Proxy to access the task_evaluation table in DB
    """

    def __init__(self, tbuf):
        self.tbuf = tbuf

    # helper decorator that wraps a method in try/except, silently swallowing any
    # exception and discarding the return value; currently not applied to any method here
    def _decor(method):
        def _decorator(_method, *args, **kwargs):
            @functools.wraps(_method)
            def _wrapped_method(self, *args, **kwargs):
                try:
                    _method(self, *args, **kwargs)
                except Exception:
                    pass

            return _wrapped_method

        return _decorator(method)

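    # update() upserts one row per task for the given metric: rows whose
    # (jediTaskID, metric) already exist are updated in bulk, the rest are inserted;
    # value_json stores the JSON-serialized evaluation dict, timestamp the update time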
    def update(self, metric, entity_dict):
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.update")
        tmp_log.debug(f"start metric={metric}")

        sql_query_taskid = (
            "SELECT jediTaskID "
            "FROM ATLAS_PANDA.Task_Evaluation "
            "WHERE metric = :metric "
        )
        sql_update = (
            "UPDATE ATLAS_PANDA.Task_Evaluation SET "
            "value_json = :patch_value_json, "
            "timestamp = :timestamp "
            "WHERE jediTaskID=:taskID AND metric=:metric "
        )
        sql_insert = (
            "INSERT INTO ATLAS_PANDA.Task_Evaluation "
            "VALUES ( "
            ":taskID, :metric, :patch_value_json, :timestamp "
            ") "
        )

        now_time = naive_utcnow()

        # taskIDs already having a row for this metric
        res = self.tbuf.querySQL(sql_query_taskid, {":metric": metric})
        existing_taskID_list = [taskID for (taskID,) in res]

        varMap_template = {
            ":taskID": None,
            ":metric": metric,
            ":timestamp": now_time,
            ":patch_value_json": None,
        }

        # split the entities into rows to update and rows to insert
        update_varMap_list = []
        insert_varMap_list = []
        for taskID, v in entity_dict.items():
            try:
                patch_value_json = json.dumps(v)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str(v))
                return

            varMap = varMap_template.copy()
            varMap[":taskID"] = taskID
            varMap[":patch_value_json"] = patch_value_json

            if taskID in existing_taskID_list:
                update_varMap_list.append(varMap)
            else:
                insert_varMap_list.append(varMap)

        # bulk update
        n_row = self.tbuf.executemanySQL(sql_update, update_varMap_list)
        if n_row < len(update_varMap_list):
            tmp_log.warning(f"only {n_row}/{len(update_varMap_list)} rows updated for metric={metric}")
        else:
            tmp_log.debug(f"updated {len(update_varMap_list)} rows for metric={metric}")

        # bulk insert
        n_row = self.tbuf.executemanySQL(sql_insert, insert_varMap_list)
        if n_row is None:
            # bulk insert failed; fall back to inserting row by row
            n_row = 0
            for varMap in insert_varMap_list:
                res = self.tbuf.querySQL(sql_insert, varMap)
                try:
                    n_row += res
                except TypeError:
                    pass
        if n_row < len(insert_varMap_list):
            tmp_log.warning(f"only {n_row}/{len(insert_varMap_list)} rows inserted for metric={metric}")
        else:
            tmp_log.debug(f"inserted {len(insert_varMap_list)} rows for metric={metric}")

        tmp_log.debug(f"done metric={metric}")

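    # get_metrics() returns {jediTaskID: value_dict} for rows of the given metric
    # whose timestamp is newer than fresher_than_minutes_ago minutes ago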
    def get_metrics(self, metric, fresher_than_minutes_ago=120):
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.get_metrics")
        tmp_log.debug(f"start metric={metric}")

        sql_query = (
            "SELECT jediTaskID, value_json "
            "FROM ATLAS_PANDA.Task_Evaluation "
            "WHERE metric = :metric "
            "AND timestamp >= :min_timestamp "
        )

        now_time = naive_utcnow()

        varMap = {
            ":metric": metric,
            ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }

        res = self.tbuf.querySQL(sql_query, varMap)
        if res is None:
            tmp_log.warning(f"failed to query metric={metric}")
            return

        ret_map = {}
        for taskID, value_json in res:
            try:
                value_dict = json.loads(value_json)
            except Exception:
                tmp_log.error(traceback.format_exc() + " " + str(taskID) + str(value_json))
            else:
                ret_map[taskID] = value_dict

        return ret_map

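    # clean_up() deletes rows of the given metric that are older than
    # fresher_than_minutes_ago minutes and belong to tasks already in a terminal status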
    def clean_up(self, metric, fresher_than_minutes_ago=120):
        tmp_log = logger_utils.make_logger(main_logger, "TaskEvaluationDB.clean_up")
        tmp_log.debug(f"start metric={metric}")

        sql_delete_terminated_tasks = (
            "DELETE "
            "FROM ATLAS_PANDA.Task_Evaluation te "
            "WHERE te.jediTaskID IN ( "
            "SELECT jt.jediTaskID "
            "FROM ATLAS_PANDA.JEDI_Tasks jt, ATLAS_PANDA.Task_Evaluation tez "
            "WHERE tez.jediTaskID = jt.jediTaskID "
            "AND jt.status IN ('done', 'finished', 'failed', 'broken', 'aborted', 'exhausted') "
            ") "
            "AND te.metric = :metric "
            "AND te.timestamp <= :max_timestamp "
        )

        now_time = naive_utcnow()

        varMap = {
            ":metric": metric,
            ":max_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
        }

        n_row = self.tbuf.querySQL(sql_delete_terminated_tasks, varMap)
        tmp_log.debug(f"cleaned up {n_row} rows for metric={metric}")


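# Each method of FetchData is named after a metric in metric_list; main() looks the
# method up with getattr(fetcher, metric_name) and stores its return value per task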
class FetchData(object):
    """
    methods to fetch or evaluate data values to store
    """

    def __init__(self, tbuf):
        self.tbuf = tbuf

        self.gshare_status = None

    def analy_task_eval(self):
        tmp_log = logger_utils.make_logger(main_logger, "FetchData")

        sql_get_active_tasks = (
            "SELECT jt.jediTaskID, jt.userName, jt.gshare "
            "FROM ATLAS_PANDA.JEDI_Tasks jt, ATLAS_PANDA.JEDI_AUX_Status_MinTaskID asm "
            "WHERE jt.taskType = 'anal' AND jt.prodSourceLabel = 'user' "
            "AND jt.status=asm.status AND jt.jediTaskID >= asm.min_jediTaskID "
            "AND jt.status IN ('scouting', 'scouted', 'running', 'pending', 'throttled') "
            "AND jt.userName NOT IN ('gangarbt') "
            "AND jt.modificationTime >= CURRENT_DATE - 30 "
        )
        sql_get_task_dsinfo = (
            "SELECT ds.jediTaskID, SUM(ds.nFiles), SUM(ds.nFilesFinished), SUM(ds.nFilesFailed) "
            "FROM ATLAS_PANDA.JEDI_Datasets ds "
            "WHERE ds.jediTaskID = :taskID "
            "AND ds.type IN ('input', 'pseudo_input') "
            "AND ds.masterID IS NULL "
            "GROUP BY ds.jediTaskID "
        )
        sql_get_task_n_jobs = (
            "SELECT COUNT(*) FROM ( "
            "SELECT DISTINCT c.PandaID "
            "FROM ATLAS_PANDA.JEDI_Datasets ds, ATLAS_PANDA.JEDI_Dataset_Contents c "
            "WHERE c.jediTaskID=ds.jediTaskID AND c.datasetID=ds.datasetID "
            "AND ds.jediTaskID=:taskID AND ds.masterID IS NULL "
            "AND ds.type IN ('input', 'pseudo_input') "
            ") "
        )
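        # Task classes used below: 2 = S (Express Analysis or boosted), 1 = A (default),
        # 0 = B, -1 = C; the S/A/B/C labels follow the summary debug message at the end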
        try:
            task_dict = dict()

            now_time = naive_utcnow()

            mdb = MetricsDB(self.tbuf)

            # per-user evaluation metrics (analy_user_eval) from MetricsDB
            ue_dict = mdb.get_metrics("analy_user_eval", "neither", fresher_than_minutes_ago=20)

            # get active user analysis tasks
            varMap = {}
            active_tasks_list = self.tbuf.querySQL(sql_get_active_tasks, varMap)
            taskID_list = [task[0] for task in active_tasks_list]
            n_tot_tasks = len(active_tasks_list)
            tmp_log.debug(f"got total {n_tot_tasks} tasks")

            # counters of evaluated tasks per class
            cc = 0
            n_tasks_dict = {
                2: 0,
                1: 0,
                0: 0,
                -1: 0,
            }

            for taskID, user, gshare in active_tasks_list:
                # default class and per-task counters
                task_class = 1
                n_files_total = 0
                n_files_finished = 0
                n_files_failed = 0
                n_files_remaining = 0
                pct_finished = 0
                pct_failed = 0
                n_jobs = 0
                n_jobs_remaining = 0

                # input dataset file counts of the task
                varMap = {":taskID": taskID}
                dsinfo_list = self.tbuf.querySQL(sql_get_task_dsinfo, varMap)
                dsinfo_dict = {
                    tup[0]: {
                        "nFiles": tup[1],
                        "nFilesFinished": tup[2],
                        "nFilesFailed": tup[3],
                    }
                    for tup in dsinfo_list
                }

                # number of jobs of the task
                tmp_res = self.tbuf.querySQL(sql_get_task_n_jobs, varMap)
                for obj in tmp_res:
                    (n_jobs,) = obj
                    dsinfo_dict.setdefault(taskID, {})
                    dsinfo_dict[taskID]["nJobs"] = n_jobs
                    break

                # compute progress and remaining work
                ds_info = dsinfo_dict.get(taskID)
                if ds_info is not None:
                    n_files_total = ds_info.get("nFiles", 0)
                    n_files_finished = ds_info.get("nFilesFinished", 0)
                    n_files_failed = ds_info.get("nFilesFailed", 0)
                    n_files_remaining = max(n_files_total - n_files_finished - n_files_failed, 0)
                    n_jobs = ds_info.get("nJobs", 0)
                    if n_files_total > 0:
                        pct_finished = n_files_finished * 100 / n_files_total
                        pct_failed = n_files_failed * 100 / n_files_total
                        n_jobs_remaining = int(n_jobs * n_files_remaining / n_files_total)

                if gshare == "Express Analysis":
                    # express analysis tasks are always classed S
                    task_class = 2
                else:
                    # thresholds to boost nearly finished tasks
                    progress_to_boost_A = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A", default=90)
                    progress_to_boost_B = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B", default=95)
                    max_rem_jobs_to_boost_A = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_A", default=500)
                    max_rem_jobs_to_boost_B = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_B", default=200)

                    # downgrade the class according to the user's remaining slots
                    usage_dict = ue_dict.get(user)
                    if usage_dict is None:
                        continue
                    if usage_dict["rem_slots_A"] <= 0:
                        if usage_dict["rem_slots_B"] <= 0:
                            task_class = -1
                        else:
                            task_class = 0

                    # boost nearly finished tasks with few remaining jobs
                    if task_class == 1 and pct_finished >= progress_to_boost_A and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_A:
                        task_class = 2
                    elif task_class == 0 and pct_finished >= progress_to_boost_B and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_B:
                        task_class = 2

                task_dict[taskID] = {
                    "task_id": taskID,
                    "user": user,
                    "gshare": gshare,
                    "n_files_total": n_files_total,
                    "n_files_finished": n_files_finished,
                    "n_files_failed": n_files_failed,
                    "pct_finished": pct_finished,
                    "pct_failed": pct_failed,
                    "class": task_class,
                }

                cc += 1
                if cc % 5000 == 0:
                    tmp_log.debug(f"evaluated {cc:6d} tasks")
                n_tasks_dict[task_class] += 1
            tmp_log.debug(f"evaluated {cc:6d} tasks in total (S:{n_tasks_dict[2]}, A:{n_tasks_dict[1]}, B:{n_tasks_dict[0]}, C:{n_tasks_dict[-1]})")

            tmp_log.debug("done")
            return task_dict
        except Exception:
            tmp_log.error(traceback.format_exc())


def main(tbuf=None, **kwargs):
    # instantiate a taskBuffer if none is given
    if tbuf is None:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    # identifier of this process, used for the process lock
    my_pid = os.getpid()
    my_full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{my_pid}"

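    # In dry-run mode metrics are only fetched and logged; in a real run a process lock
    # is taken per metric, old rows are cleaned up, and results are written to the DB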
    if DRY_RUN:
        # dry run: fetch data and log, without DB lock or update
        fetcher = FetchData(taskBuffer)

        for metric_name, period in metric_list:
            main_logger.debug(f"(dry-run) start {metric_name}")

            # fetch the metric data
            the_method = getattr(fetcher, metric_name)
            fetched_data = the_method()
            if fetched_data is None:
                main_logger.warning(f"(dry-run) {metric_name} got no valid data")
                continue
            main_logger.debug(f"(dry-run) done {metric_name}")
    else:
        # real run
        tedb = TaskEvaluationDB(taskBuffer)
        fetcher = FetchData(taskBuffer)

        for metric_name, period in metric_list:
            # take a process lock for this metric
            lock_component_name = f"pandaTaskEval.{metric_name:.30}.{adler32(metric_name.encode('utf-8')):0x}"

            got_lock = taskBuffer.lockProcess_PANDA(component=lock_component_name, pid=my_full_pid, time_limit=period)
            if got_lock:
                main_logger.debug(f"got lock of {metric_name}")
            else:
                main_logger.debug(f"{metric_name} locked by other process; skipped...")
                continue
            main_logger.debug(f"start {metric_name}")

            # clean up rows of terminated tasks
            tedb.clean_up(metric=metric_name, fresher_than_minutes_ago=120)
            main_logger.debug(f"cleaned up {metric_name}")

            # fetch the metric data and update the DB
            the_method = getattr(fetcher, metric_name)
            fetched_data = the_method()
            if fetched_data is None:
                main_logger.warning(f"{metric_name} got no valid data")
                continue
            tedb.update(metric=metric_name, entity_dict=fetched_data)
            main_logger.debug(f"done {metric_name}")

    # stop the taskBuffer if it was initialized in this script
    if tbuf is None:
        taskBuffer.cleanup(requester=requester_id)


if __name__ == "__main__":
    main()