# File indexing completed on 2026-04-10 08:39:03
0001 import datetime
0002 import json
0003 import re
0004 import sys
0005 from typing import Dict
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore import CoreUtils
0012 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
0013 from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time
0014
0015
0016
class MetricsModule(BaseModule):
    """DB proxy module with methods to read and write workload (job/task) metrics
    stored as JSON, and to collect job statistics from the PanDA/JEDI tables."""

    def __init__(self, log_stream: LogWrapper):
        # BaseModule provides the DB connection/cursor and commit/rollback/logging helpers
        super().__init__(log_stream)
0021
0022
0023 def set_workload_metrics(self, jedi_task_id: int, panda_id: int | None, metrics: dict, use_commit: bool = True) -> bool:
0024 """
0025 Set job or task metrics
0026
0027 :param jedi_task_id: jediTaskID
0028 :param panda_id: PandaID. None to set task
0029 :param metrics: metrics data
0030 :param use_commit: use commit
0031 :return: True if success
0032 """
0033 comment = " /* DBProxy.set_workload_metrics */"
0034 if panda_id is not None:
0035 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
0036 else:
0037 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0038 tmp_log.debug("start")
0039 try:
0040 if panda_id is not None:
0041 table_name = "Job_Metrics"
0042 var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id}
0043 else:
0044 table_name = "Task_Metrics"
0045 var_map = {":jediTaskID": jedi_task_id}
0046
0047 sql_check = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID "
0048 if panda_id is not None:
0049 sql_check += "AND PandaID=:PandaID "
0050
0051 sql_insert = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} "
0052 if panda_id is not None:
0053 sql_insert += "(jediTaskID,PandaID,creationTime,modificationTime,data) VALUES(:jediTaskID,:PandaID,CURRENT_DATE,CURRENT_DATE,:data) "
0054 else:
0055 sql_insert += "(jediTaskID,creationTime,modificationTime,data) VALUES(:jediTaskID,CURRENT_DATE,CURRENT_DATE,:data) "
0056
0057 sql_update = f"UPDATE {panda_config.schemaPANDA}.{table_name} SET modificationTime=CURRENT_DATE,data=:data WHERE jediTaskID=:jediTaskID "
0058 if panda_id is not None:
0059 sql_update += "AND PandaID=:PandaID "
0060
0061 if use_commit:
0062 self.conn.begin()
0063
0064 self.cur.execute(sql_check + comment, var_map)
0065
0066 tmp_data = None
0067 for (clob_data,) in self.cur:
0068 try:
0069 tmp_data = clob_data.read()
0070 except AttributeError:
0071 tmp_data = str(clob_data)
0072 break
0073 if not tmp_data:
0074
0075 var_map[":data"] = json.dumps(metrics, cls=CoreUtils.NonJsonObjectEncoder)
0076 self.cur.execute(sql_insert + comment, var_map)
0077 tmp_log.debug("inserted")
0078 else:
0079
0080 tmp_data = json.loads(tmp_data, object_hook=CoreUtils.as_python_object)
0081 tmp_data.update(metrics)
0082 var_map[":data"] = json.dumps(tmp_data, cls=CoreUtils.NonJsonObjectEncoder)
0083 self.cur.execute(sql_update + comment, var_map)
0084 tmp_log.debug("updated")
0085 if use_commit:
0086
0087 if not self._commit():
0088 raise RuntimeError("Commit error")
0089 tmp_log.debug("done")
0090 return True
0091 except Exception:
0092
0093 self._rollback()
0094
0095 self.dump_error_message(tmp_log)
0096 return False
0097
0098
0099 def get_workload_metrics(self, jedi_task_id: int, panda_id: int = None) -> tuple[bool, dict | None]:
0100 """
0101 Get job metrics or task metrics
0102
0103 :param jedi_task_id: jediTaskID
0104 :param panda_id: PandaID. None to get task metrics
0105 :return: (False, None) if failed, otherwise (True, metrics)
0106 """
0107 comment = " /* DBProxy.get_workload_metrics */"
0108 if panda_id is not None:
0109 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
0110 else:
0111 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0112 tmp_log.debug("start")
0113 try:
0114 if panda_id is not None:
0115 table_name = "Job_Metrics"
0116 var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id}
0117 else:
0118 table_name = "Task_Metrics"
0119 var_map = {":jediTaskID": jedi_task_id}
0120
0121 sql_get = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID "
0122 if panda_id is not None:
0123 sql_get += "AND PandaID=:PandaID "
0124 self.cur.execute(sql_get + comment, var_map)
0125
0126 metrics = None
0127 for (clob_data,) in self.cur:
0128 try:
0129 metrics = clob_data.read()
0130 except AttributeError:
0131 metrics = str(clob_data)
0132 break
0133 if metrics is not None:
0134 metrics = json.loads(metrics, object_hook=CoreUtils.as_python_object)
0135 tmp_log.debug(f"got {sys.getsizeof(metrics)} bytes")
0136 else:
0137 tmp_log.debug("no data")
0138 return True, metrics
0139 except Exception:
0140
0141 self.dump_error_message(tmp_log)
0142 return False, None
0143
0144
0145 def get_jobs_metrics_in_task(self, jedi_task_id: int) -> tuple[bool, list | None]:
0146 """
0147 Get metrics of jobs in a task
0148
0149 :param jedi_task_id: jediTaskID
0150 :return: (False, None) if failed, otherwise (True, list of [PandaID, metrics])
0151 """
0152 comment = " /* DBProxy.get_jobs_metrics_in_task */"
0153 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0154 tmp_log.debug("start")
0155 try:
0156 var_map = {":jediTaskID": jedi_task_id}
0157
0158 sql_get = f"SELECT PandaID,data FROM {panda_config.schemaPANDA}.Job_Metrics WHERE jediTaskID=:jediTaskID "
0159 self.cur.execute(sql_get + comment, var_map)
0160
0161 metrics_list = []
0162 for panda_id, clob_data in self.cur:
0163 try:
0164 tmp_data = clob_data.read()
0165 except AttributeError:
0166 tmp_data = str(clob_data)
0167 metrics_list.append([panda_id, json.loads(tmp_data, object_hook=CoreUtils.as_python_object)])
0168 tmp_log.debug(f"got metrics for {len(metrics_list)} jobs")
0169 return True, metrics_list
0170 except Exception:
0171
0172 self.dump_error_message(tmp_log)
0173 return False, None
0174
0175
    def update_task_queued_activated_times(self, jedi_task_id: int) -> None:
        """
        Update task queued time depending on the number of inputs to be processed. Update queued and activated times if they were None and inputs become available.
        Set None if no input is available and record the queuing duration to task metrics.

        :param jedi_task_id: task's jediTaskID
        """
        comment = " /* DBProxy.update_task_queued_activated_times */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
        tmp_log.debug("start")

        # read the current status and timing attributes of the task
        sql_check = (
            f"SELECT status,oldStatus,queuedTime,activatedTime,currentPriority,gshare FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        )
        var_map = {":jediTaskID": jedi_task_id}
        self.cur.execute(sql_check + comment, var_map)
        res = self.cur.fetchone()
        if not res:
            # nothing to do for an unknown task
            tmp_log.debug("task not found")
            return
        (task_status, task_old_status, queued_time, activated_time, current_priority, global_share) = res
        has_input = False
        # only check inputs for tasks that are actively processing, or that went pending from an active status
        active_status_list = ("ready", "running", "scouting", "scouted")
        if task_status in active_status_list or (task_old_status in active_status_list and task_status == "pending"):
            # look for a master input dataset with files still to be processed
            # NOTE: "nFilesToBeUSed" casing is inconsistent with "nFilesToBeUsed" but Oracle identifiers are case-insensitive
            sql_input = (
                f"SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL "
                "AND nFilesToBeUsed>0 AND nFilesToBeUSed>nFilesUsed "
            )
            var_map = {":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
            self.cur.execute(sql_input + comment, var_map)
            res = self.cur.fetchone()
            has_input = res is not None

        if has_input:
            if queued_time is None:
                # inputs became available: start the queuing clock
                tmp_log.debug(f"set queued time when task is {task_status}")
                sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL "
                var_map = {":jediTaskID": jedi_task_id}
                self.cur.execute(sql_update + comment, var_map)
                n_row = self.cur.rowcount
                # set activated time only when the queued time was actually set by this statement
                if n_row > 0 and activated_time is None:
                    tmp_log.debug(f"set activated time when task is {task_status}")
                    sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET activatedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND activatedTime IS NULL "
                    var_map = {":jediTaskID": jedi_task_id}
                    self.cur.execute(sql_update + comment, var_map)
            else:
                # clock already running: leave it untouched
                tmp_log.debug(f"keep current queued time {queued_time.strftime('%Y-%m-%d %H:%M:%S')}")

        if queued_time is not None and not has_input:
            # no input left (or task no longer active): stop the clock and record the queuing period
            tmp_log.debug(f"unset queued time and record duration when task is {task_status}")
            sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=NULL WHERE jediTaskID=:jediTaskID AND queuedTime IS NOT NULL "
            var_map = {":jediTaskID": jedi_task_id}
            self.cur.execute(sql_update + comment, var_map)
            # append the finished queuing period to the task metrics
            tmp_success, task_metrics = self.get_workload_metrics(jedi_task_id, None)
            if not tmp_success:
                err_str = f"failed to get task metrics for jediTaskId={jedi_task_id}"
                tmp_log.error(err_str)
                return
            # create an empty dict when the task has no metrics yet
            if not task_metrics:
                task_metrics = {}

            task_metrics.setdefault("queuingPeriods", [])
            # cap the list so the metrics blob cannot grow without bound
            if len(task_metrics["queuingPeriods"]) < 10000:
                task_metrics["queuingPeriods"].append({"start": queued_time, "end": naive_utcnow(), "status": task_status})
                # no commit here: the caller owns the transaction
                tmp_success = self.set_workload_metrics(jedi_task_id, None, task_metrics, False)
                if not tmp_success:
                    err_str = f"failed to update task metrics for jediTaskId={jedi_task_id}"
                    tmp_log.error(err_str)
                    return
            else:
                tmp_log.debug("skipped since queuing period list is too long")
        tmp_log.debug(f"done in {task_status}")
0255
0256
0257 def record_job_queuing_period(self, panda_id: int, job_spec: JobSpec = None) -> bool | None:
0258 """
0259 Record queuing period in job metrics. Skip if job.jobMetrics doesn't contain task queued time
0260
0261 :param panda_id: Job's PandaID
0262 :param job_spec: job spec. None to get it from the database
0263 :return: True if success. False if failed. None if skipped
0264 """
0265 comment = " /* DBProxy.record_job_queuing_period */"
0266 tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
0267 tmp_log.debug(f"start with job spec: {job_spec is None}")
0268
0269 if job_spec is None:
0270 sql_check = f"SELECT jediTaskID,jobStatus,specialHandling FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
0271 var_map = {":PandaID": panda_id}
0272 self.cur.execute(sql_check + comment, var_map)
0273 res = self.cur.fetchone()
0274 if not res:
0275
0276 tmp_log.debug("job not found")
0277 return None
0278 jedi_task_id, job_status, tmp_str = res
0279 else:
0280 jedi_task_id = job_spec.jediTaskID
0281 job_status = job_spec.jobStatus
0282 tmp_str = job_spec.specialHandling
0283 task_queued_time = get_task_queued_time(tmp_str)
0284
0285 if jedi_task_id and task_queued_time:
0286 tmp_log.debug(f"to record queuing period")
0287
0288 tmp_success, job_metrics = self.get_workload_metrics(jedi_task_id, panda_id)
0289 if not tmp_success:
0290 err_str = "Failed to get job metrics "
0291 tmp_log.error(err_str)
0292 return False
0293
0294 if not job_metrics:
0295 job_metrics = {}
0296
0297 if "queuingPeriod" in job_metrics:
0298 tmp_log.debug("skipped since queuing period already exists")
0299 return None
0300 else:
0301 job_metrics["queuingPeriod"] = {
0302 "start": task_queued_time,
0303 "end": naive_utcnow(),
0304 "status": job_status,
0305 }
0306 tmp_success = self.set_workload_metrics(jedi_task_id, panda_id, job_metrics, False)
0307 if not tmp_success:
0308 err_str = "Failed to update job metrics"
0309 tmp_log.error(err_str)
0310 return False
0311 tmp_log.debug(f"done in {job_status}")
0312 return True
0313 else:
0314 tmp_log.debug(f"skipped as jediTaskID={jedi_task_id} taskQueuedTime={task_queued_time}")
0315 return None
0316
0317
0318 def unset_task_activated_time(self, jedi_task_id: int, task_status: str = None) -> bool | None:
0319 """
0320 Unset activated time and record active period in task metrics
0321
0322 :param jedi_task_id: task's JediTaskID
0323 :param task_status: task status. None to get it from the database
0324 :return: True if success. False if failed. None if skipped
0325 """
0326 frozen_status_list = ("done", "finished", "failed", "broken", "paused", "exhausted")
0327 if task_status is not None and task_status not in frozen_status_list:
0328 return None
0329 comment = " /* DBProxy.record_task_active_period */"
0330 tmp_log = self.create_tagged_logger(comment, f"JediTaskID={jedi_task_id}")
0331 tmp_log.debug(f"start")
0332
0333 sql_check = f"SELECT status,activatedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
0334 var_map = {":jediTaskID": jedi_task_id}
0335 self.cur.execute(sql_check + comment, var_map)
0336 res = self.cur.fetchone()
0337 if not res:
0338
0339 tmp_log.debug("task not found")
0340 return None
0341 task_status, activated_time = res
0342
0343 if task_status in frozen_status_list and activated_time:
0344 tmp_log.debug(f"to record active period when task is {task_status}")
0345
0346 tmp_success, task_metrics = self.get_workload_metrics(jedi_task_id, None)
0347 if not tmp_success:
0348 err_str = "Failed to get task metrics "
0349 tmp_log.error(err_str)
0350 return False
0351
0352 if not task_metrics:
0353 task_metrics = {}
0354
0355 task_metrics.setdefault("activePeriod", [])
0356 task_metrics["activePeriod"].append(
0357 {
0358 "start": activated_time,
0359 "end": naive_utcnow(),
0360 "status": task_status,
0361 }
0362 )
0363 tmp_success = self.set_workload_metrics(jedi_task_id, None, task_metrics, False)
0364 if not tmp_success:
0365 err_str = "Failed to update task metrics"
0366 tmp_log.error(err_str)
0367 return False
0368
0369 tmp_log.debug(f"unset activated time")
0370 sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET activatedTime=NULL WHERE jediTaskID=:jediTaskID AND activatedTime IS NOT NULL "
0371 var_map = {":jediTaskID": jedi_task_id}
0372 self.cur.execute(sql_update + comment, var_map)
0373 tmp_log.debug(f"done in {task_status}")
0374 return True
0375
0376
0377 def checkFailureCountWithCorruptedFiles(self, jediTaskID, pandaID):
0378 comment = " /* DBProxy.checkFailureCountWithCorruptedFiles */"
0379 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
0380
0381 sqlBD = "SELECT f2.lfn,COUNT(*) FROM ATLAS_PANDA.filesTable4 f1, ATLAS_PANDA.filesTable4 f2 "
0382 sqlBD += "WHERE f1.PandaID=:PandaID AND f1.type=:type AND f1.status=:status "
0383 sqlBD += "AND f2.lfn=f1.lfn AND f2.type=:type AND f2.status=:status AND f2.jediTaskID=:jediTaskID "
0384 sqlBD += "GROUP BY f2.lfn "
0385 varMap = {}
0386 varMap[":jediTaskID"] = jediTaskID
0387 varMap[":PandaID"] = pandaID
0388 varMap[":status"] = "corrupted"
0389 varMap[":type"] = "zipinput"
0390 self.cur.execute(sqlBD + comment, varMap)
0391 resBD = self.cur.fetchall()
0392 tooMany = False
0393 for lfn, nFailed in resBD:
0394 tmp_log.debug(f"{nFailed} failures with {lfn}")
0395 if nFailed >= 3:
0396 tooMany = True
0397 tmp_log.debug(f"too many failures : {tooMany}")
0398 return tooMany
0399
0400
0401 def get_task_failure_metrics(self, task_id, use_commit=True):
0402 comment = " /* JediDBProxy.get_task_failure_metrics */"
0403 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
0404 tmp_log.debug("start")
0405 try:
0406
0407 if use_commit:
0408 self.conn.begin()
0409
0410 sql = (
0411 f"SELECT SUM(is_finished),SUM(is_failed),SUM(HS06SEC*is_finished),SUM(HS06SEC*is_failed) "
0412 f"FROM ("
0413 f"SELECT PandaID, HS06SEC, CASE WHEN jobStatus='finished' THEN 1 ELSE 0 END is_finished, "
0414 f"CASE WHEN jobStatus='failed' THEN 1 ELSE 0 END is_failed "
0415 f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0416 f"WHERE jediTaskID=:jediTaskID AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
0417 f"UNION "
0418 f"SELECT PandaID, HS06SEC, CASE WHEN jobStatus='finished' THEN 1 ELSE 0 END is_finished, "
0419 f"CASE WHEN jobStatus='failed' THEN 1 ELSE 0 END is_failed "
0420 f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0421 f"WHERE jediTaskID=:jediTaskID AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
0422 f")"
0423 )
0424 var_map = {
0425 ":jediTaskID": task_id,
0426 ":prodSourceLabel1": "managed",
0427 ":prodSourceLabel2": "user",
0428 }
0429 self.cur.execute(sql + comment, var_map)
0430 num_finished, num_failed, good_hep_score_sec, bad_hep_score_sec = self.cur.fetchone()
0431 ret_dict = {
0432 "num_failed": num_failed,
0433 "single_failure_rate": (
0434 round(num_failed / (num_finished + num_failed), 3)
0435 if num_finished is not None and num_failed is not None and num_finished + num_failed
0436 else None
0437 ),
0438 "failed_hep_score_hour": int(bad_hep_score_sec / 60 / 60) if bad_hep_score_sec is not None else None,
0439 "failed_hep_score_ratio": (
0440 round(bad_hep_score_sec / (good_hep_score_sec + bad_hep_score_sec), 3)
0441 if good_hep_score_sec is not None and bad_hep_score_sec is not None and good_hep_score_sec + bad_hep_score_sec
0442 else None
0443 ),
0444 }
0445
0446 if use_commit:
0447 if not self._commit():
0448 raise RuntimeError("Commit error")
0449
0450 tmp_log.debug(f"got {ret_dict}")
0451 return ret_dict
0452 except Exception:
0453
0454 if use_commit:
0455 self._rollback()
0456
0457 self.dump_error_message(tmp_log)
0458 return None
0459
0460
0461 def getAvgDiskIO_JEDI(self):
0462 comment = " /* JediDBProxy.getAvgDiskIO_JEDI */"
0463 tmp_log = self.create_tagged_logger(comment)
0464 tmp_log.debug("start")
0465 try:
0466
0467 sql = f"SELECT sum(prorated_diskio_avg * njobs) / sum(njobs), computingSite FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
0468 sql += "WHERE jobStatus=:jobStatus GROUP BY computingSite "
0469 var_map = dict()
0470 var_map[":jobStatus"] = "running"
0471
0472 self.conn.begin()
0473 self.cur.execute(sql + comment, var_map)
0474 resFL = self.cur.fetchall()
0475
0476 if not self._commit():
0477 raise RuntimeError("Commit error")
0478 ret_map = dict()
0479 for avg, computing_site in resFL:
0480 if avg:
0481 avg = float(avg)
0482 ret_map[computing_site] = avg
0483 tmp_log.debug("done")
0484 return ret_map
0485 except Exception:
0486
0487 self._rollback()
0488
0489 self.dump_error_message(tmp_log)
0490 return {}
0491
0492
0493 def get_task_carbon_footprint(self, jedi_task_id, level):
0494 comment = " /* JediDBProxy.get_task_carbon_footprint */"
0495 tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} n_files={level}")
0496 tmp_log.debug("start")
0497
0498 if level == "regional":
0499 gco2_column = "GCO2_REGIONAL"
0500 else:
0501 gco2_column = "GCO2_GLOBAL"
0502
0503 try:
0504 sql = (
0505 "SELECT jobstatus, SUM(sum_gco2) FROM ( "
0506 " SELECT jobstatus, SUM({gco2_column}) sum_gco2 FROM {active_schema}.jobsarchived4 "
0507 " WHERE jeditaskid =:jeditaskid "
0508 " GROUP BY jobstatus "
0509 " UNION "
0510 " SELECT jobstatus, SUM({gco2_column}) sum_gco2 FROM {archive_schema}.jobsarchived "
0511 " WHERE jeditaskid =:jeditaskid "
0512 " GROUP BY jobstatus)"
0513 "GROUP BY jobstatus".format(gco2_column=gco2_column, active_schema=panda_config.schemaJEDI, archive_schema=panda_config.schemaPANDAARCH)
0514 )
0515 var_map = {":jeditaskid": jedi_task_id}
0516
0517
0518 self.conn.begin()
0519 self.cur.execute(sql + comment, var_map)
0520 results = self.cur.fetchall()
0521
0522 footprint = {"total": 0}
0523 data = False
0524 for job_status, g_co2 in results:
0525 if not g_co2:
0526 g_co2 = 0
0527 else:
0528 data = True
0529 footprint[job_status] = g_co2
0530 footprint["total"] += g_co2
0531
0532
0533 if not self._commit():
0534 raise RuntimeError("Commit error")
0535
0536 tmp_log.debug(f"done: {footprint}")
0537
0538 if not data:
0539 return None
0540
0541 return footprint
0542 except Exception:
0543
0544 self._rollback()
0545
0546 self.dump_error_message(tmp_log)
0547 return None
0548
0549
0550 def getJobStatisticsWithWorkQueue_JEDI(self, vo, prodSourceLabel, minPriority=None, cloud=None):
0551 comment = " /* DBProxy.getJobStatisticsWithWorkQueue_JEDI */"
0552 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} cloud={cloud}")
0553 tmpLog.debug(f"start minPriority={minPriority}")
0554 sql0 = "SELECT computingSite,cloud,jobStatus,workQueue_ID,COUNT(*) FROM %s "
0555 sql0 += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
0556 if cloud is not None:
0557 sql0 += "AND cloud=:cloud "
0558 tmpPrioMap = {}
0559 if minPriority is not None:
0560 sql0 += "AND currentPriority>=:minPriority "
0561 tmpPrioMap[":minPriority"] = minPriority
0562 sql0 += "GROUP BY computingSite,cloud,prodSourceLabel,jobStatus,workQueue_ID "
0563 sqlMV = sql0
0564 sqlMV = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sqlMV)
0565 sqlMV = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sqlMV)
0566 tables = [f"{panda_config.schemaPANDA}.jobsActive4", f"{panda_config.schemaPANDA}.jobsDefined4"]
0567 if minPriority is not None:
0568
0569 tables.append(f"{panda_config.schemaPANDA}.jobsActive4")
0570 sqlMVforRun = re.sub("currentPriority>=", "currentPriority<=", sqlMV)
0571 varMap = {}
0572 varMap[":vo"] = vo
0573 varMap[":prodSourceLabel"] = prodSourceLabel
0574 if cloud is not None:
0575 varMap[":cloud"] = cloud
0576 for tmpPrio in tmpPrioMap.keys():
0577 varMap[tmpPrio] = tmpPrioMap[tmpPrio]
0578 returnMap = {}
0579 try:
0580 iActive = 0
0581 for table in tables:
0582
0583 self.conn.begin()
0584
0585 self.cur.arraysize = 10000
0586 useRunning = None
0587 if table == f"{panda_config.schemaPANDA}.jobsActive4":
0588 mvTableName = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
0589
0590 if minPriority is not None:
0591 if iActive == 0:
0592 useRunning = False
0593 else:
0594 useRunning = True
0595 iActive += 1
0596 if useRunning in [None, False]:
0597 sqlExeTmp = (sqlMV + comment) % mvTableName
0598 else:
0599 sqlExeTmp = (sqlMVforRun + comment) % mvTableName
0600 else:
0601 sqlExeTmp = (sql0 + comment) % table
0602 self.cur.execute(sqlExeTmp, varMap)
0603 res = self.cur.fetchall()
0604
0605 if not self._commit():
0606 raise RuntimeError("Commit error")
0607
0608 for computingSite, cloud, jobStatus, workQueue_ID, nCount in res:
0609
0610 if useRunning is True and jobStatus != "running":
0611 continue
0612
0613 if useRunning is False and jobStatus == "running":
0614 continue
0615
0616 if computingSite not in returnMap:
0617 returnMap[computingSite] = {}
0618
0619 if workQueue_ID not in returnMap[computingSite]:
0620 returnMap[computingSite][workQueue_ID] = {}
0621
0622 if jobStatus not in returnMap[computingSite][workQueue_ID]:
0623 returnMap[computingSite][workQueue_ID][jobStatus] = 0
0624
0625 returnMap[computingSite][workQueue_ID][jobStatus] += nCount
0626
0627 tmpLog.debug("done")
0628 return True, returnMap
0629 except Exception:
0630
0631 self._rollback()
0632
0633 self.dump_error_message(tmpLog)
0634 return False, {}
0635
0636
0637 def get_core_statistics(self, vo: str, prod_source_label: str) -> [bool, dict]:
0638 comment = " /* DBProxy.get_core_statistics */"
0639 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prod_source_label}")
0640 tmpLog.debug("start")
0641 sql0 = f"SELECT /*+ RESULT_CACHE */ computingSite,jobStatus,SUM(num_of_cores) FROM {panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS "
0642 sql0 += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
0643 sql0 += "GROUP BY computingSite,cloud,prodSourceLabel,jobStatus "
0644 var_map = {":vo": vo, ":prodSourceLabel": prod_source_label}
0645 return_map = {}
0646 try:
0647 self.conn.begin()
0648
0649 self.cur.arraysize = 10000
0650 self.cur.execute(sql0 + comment, var_map)
0651 res = self.cur.fetchall()
0652
0653 if not self._commit():
0654 raise RuntimeError("Commit error")
0655
0656 for computing_site, job_status, n_core in res:
0657
0658 return_map.setdefault(computing_site, {})
0659
0660 return_map[computing_site].setdefault(job_status, 0)
0661
0662 return_map[computing_site][job_status] += n_core
0663
0664 tmpLog.debug("done")
0665 return True, return_map
0666 except Exception:
0667
0668 self._rollback()
0669
0670 self.dump_error_message(tmpLog)
0671 return False, {}
0672
0673
0674 def getJobStatisticsByGlobalShare(self, vo, exclude_rwq):
0675 """
0676 :param vo: Virtual Organization
0677 :param exclude_rwq: True/False. Indicates whether we want to indicate special workqueues from the statistics
0678 """
0679 comment = " /* DBProxy.getJobStatisticsByGlobalShare */"
0680 tmpLog = self.create_tagged_logger(comment, f" vo={vo}")
0681 tmpLog.debug("start")
0682
0683
0684 var_map = {":vo": vo}
0685
0686
0687 sql_jt = """
0688 SELECT /*+ RESULT_CACHE */ computingSite, jobStatus, gShare, SUM(njobs) FROM %s
0689 WHERE vo=:vo
0690 """
0691
0692 if exclude_rwq:
0693 sql_jt += f"""
0694 AND workqueue_id NOT IN
0695 (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource')
0696 """
0697
0698 sql_jt += """
0699 GROUP BY computingSite, jobStatus, gshare
0700 """
0701
0702 tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0703
0704 return_map = {}
0705 try:
0706 for table in tables:
0707 self.cur.arraysize = 10000
0708 sql_exe = (sql_jt + comment) % table
0709 self.cur.execute(sql_exe, var_map)
0710 res = self.cur.fetchall()
0711
0712
0713 for panda_site, status, gshare, n_count in res:
0714
0715 return_map.setdefault(panda_site, {})
0716
0717 return_map[panda_site].setdefault(gshare, {})
0718
0719 return_map[panda_site][gshare].setdefault(status, 0)
0720
0721 return_map[panda_site][gshare][status] += n_count
0722
0723 tmpLog.debug("done")
0724 return True, return_map
0725 except Exception:
0726 self.dump_error_message(tmpLog)
0727 return False, {}
0728
0729 def getJobStatisticsByResourceType(self, workqueue):
0730 """
0731 This function will return the job statistics for a particular workqueue, broken down by resource type
0732 (SCORE, MCORE, etc.)
0733 :param workqueue: workqueue object
0734 """
0735 comment = " /* DBProxy.getJobStatisticsByResourceType */"
0736 tmpLog = self.create_tagged_logger(comment, f"workqueue={workqueue}")
0737 tmpLog.debug("start")
0738
0739
0740 var_map = {":vo": workqueue.VO}
0741
0742
0743 sql_jt = "SELECT /*+ RESULT_CACHE */ jobstatus, resource_type, SUM(njobs) FROM %s WHERE vo=:vo "
0744
0745 if workqueue.is_global_share:
0746 sql_jt += "AND gshare=:gshare "
0747 sql_jt += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource') "
0748 var_map[":gshare"] = workqueue.queue_name
0749 else:
0750 sql_jt += "AND workqueue_id=:workqueue_id "
0751 var_map[":workqueue_id"] = workqueue.queue_id
0752
0753 sql_jt += "GROUP BY jobstatus, resource_type "
0754
0755 tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0756
0757 return_map = {}
0758 try:
0759 for table in tables:
0760 self.cur.arraysize = 10000
0761 sql_exe = (sql_jt + comment) % table
0762 self.cur.execute(sql_exe, var_map)
0763 res = self.cur.fetchall()
0764
0765
0766 for status, resource_type, n_count in res:
0767 return_map.setdefault(status, {})
0768 return_map[status][resource_type] = n_count
0769
0770 tmpLog.debug("done")
0771 return True, return_map
0772 except Exception:
0773 self.dump_error_message(tmpLog)
0774 return False, {}
0775
0776 def getJobStatisticsByResourceTypeSite(self, workqueue):
0777 """
0778 This function will return the job statistics per site for a particular workqueue, broken down by resource type
0779 (SCORE, MCORE, etc.)
0780 :param workqueue: workqueue object
0781 """
0782 comment = " /* DBProxy.getJobStatisticsByResourceTypeSite */"
0783 tmpLog = self.create_tagged_logger(comment, f"workqueue={workqueue}")
0784 tmpLog.debug("start")
0785
0786
0787 var_map = {":vo": workqueue.VO}
0788
0789
0790 sql_jt = "SELECT /*+ RESULT_CACHE */ jobstatus, resource_type, computingSite, SUM(njobs) FROM %s WHERE vo=:vo "
0791
0792 if workqueue.is_global_share:
0793 sql_jt += "AND gshare=:gshare "
0794 sql_jt += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource') "
0795 var_map[":gshare"] = workqueue.queue_name
0796 else:
0797 sql_jt += "AND workqueue_id=:workqueue_id "
0798 var_map[":workqueue_id"] = workqueue.queue_id
0799
0800 sql_jt += "GROUP BY jobstatus, resource_type, computingSite "
0801
0802 tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0803
0804 return_map = {}
0805 try:
0806 for table in tables:
0807 self.cur.arraysize = 10000
0808 sql_exe = (sql_jt + comment) % table
0809 self.cur.execute(sql_exe, var_map)
0810 res = self.cur.fetchall()
0811
0812
0813 for status, resource_type, computingSite, n_count in res:
0814 return_map.setdefault(computingSite, {})
0815 return_map[computingSite].setdefault(resource_type, {})
0816 return_map[computingSite][resource_type][status] = n_count
0817
0818 tmpLog.debug("done")
0819 return True, return_map
0820 except Exception:
0821 self.dump_error_message(tmpLog)
0822 return False, {}
0823
0824
0825 def get_num_jobs_with_status_by_nucleus(self, vo: str, job_status: str) -> [bool, Dict[str, Dict[str, int]]]:
0826 """
0827 This function will return the number of jobs with a specific status for each nucleus at each site.
0828
0829 :param vo: Virtual Organization
0830 :param job_status: Job status
0831 :return: True/False and Dictionary with the number of jobs with a specific status for each nucleus at each site
0832 """
0833 comment = " /* DBProxy.get_num_jobs_with_status_by_nucleus */"
0834 tmp_log = self.create_tagged_logger(comment, f" vo={vo} status={job_status}")
0835 tmp_log.debug("start")
0836
0837
0838 var_map = {":vo": vo, ":job_status": job_status}
0839
0840
0841 sql_jt = """
0842 SELECT /*+ RESULT_CACHE */ computingSite, nucleus, SUM(njobs) FROM %s
0843 WHERE vo=:vo AND jobStatus=:job_status GROUP BY computingSite, nucleus
0844 """
0845
0846 if job_status in ["transferring", "running", "activated" "holding"]:
0847 table = f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS"
0848 else:
0849 table = f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"
0850
0851 return_map = {}
0852 try:
0853 self.cur.arraysize = 10000
0854 sql_exe = (sql_jt + comment) % table
0855 self.cur.execute(sql_exe, var_map)
0856 res = self.cur.fetchall()
0857
0858
0859 for panda_site, nucleus, n_jobs in res:
0860 if n_jobs:
0861
0862 return_map.setdefault(panda_site, {})
0863
0864 return_map[panda_site][nucleus] = n_jobs
0865 tmp_log.debug("done")
0866 return True, return_map
0867 except Exception:
0868 self.dump_error_message(tmp_log)
0869 return False, {}
0870
0871
0872
def get_metrics_module(base_mod) -> MetricsModule:
    """Return the MetricsModule composite registered under "metrics" on the given base module."""
    return base_mod.get_composite_module("metrics")