Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:03

0001 import datetime
0002 import json
0003 import re
0004 import sys
0005 from typing import Dict
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009 
0010 from pandaserver.config import panda_config
0011 from pandaserver.srvcore import CoreUtils
0012 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule
0013 from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time
0014 
0015 
0016 # Module class to define metrics related methods
class MetricsModule(BaseModule):
    """DB-proxy module providing metrics-related methods (job/task metrics, queuing periods, statistics)."""

    # constructor
    def __init__(self, log_stream: LogWrapper):
        super().__init__(log_stream)
0021 
0022     # set job or task metrics
0023     def set_workload_metrics(self, jedi_task_id: int, panda_id: int | None, metrics: dict, use_commit: bool = True) -> bool:
0024         """
0025         Set job or task metrics
0026 
0027         :param jedi_task_id: jediTaskID
0028         :param panda_id: PandaID. None to set task
0029         :param metrics: metrics data
0030         :param use_commit: use commit
0031         :return: True if success
0032         """
0033         comment = " /* DBProxy.set_workload_metrics */"
0034         if panda_id is not None:
0035             tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
0036         else:
0037             tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0038         tmp_log.debug("start")
0039         try:
0040             if panda_id is not None:
0041                 table_name = "Job_Metrics"
0042                 var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id}
0043             else:
0044                 table_name = "Task_Metrics"
0045                 var_map = {":jediTaskID": jedi_task_id}
0046             # check if data is already there
0047             sql_check = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID "
0048             if panda_id is not None:
0049                 sql_check += "AND PandaID=:PandaID "
0050             # insert data
0051             sql_insert = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} "
0052             if panda_id is not None:
0053                 sql_insert += "(jediTaskID,PandaID,creationTime,modificationTime,data) VALUES(:jediTaskID,:PandaID,CURRENT_DATE,CURRENT_DATE,:data) "
0054             else:
0055                 sql_insert += "(jediTaskID,creationTime,modificationTime,data) VALUES(:jediTaskID,CURRENT_DATE,CURRENT_DATE,:data) "
0056             # update data
0057             sql_update = f"UPDATE {panda_config.schemaPANDA}.{table_name} SET modificationTime=CURRENT_DATE,data=:data WHERE jediTaskID=:jediTaskID "
0058             if panda_id is not None:
0059                 sql_update += "AND PandaID=:PandaID "
0060             # start transaction
0061             if use_commit:
0062                 self.conn.begin()
0063             # check if data is already there
0064             self.cur.execute(sql_check + comment, var_map)
0065             # read data
0066             tmp_data = None
0067             for (clob_data,) in self.cur:
0068                 try:
0069                     tmp_data = clob_data.read()
0070                 except AttributeError:
0071                     tmp_data = str(clob_data)
0072                 break
0073             if not tmp_data:
0074                 # insert new data
0075                 var_map[":data"] = json.dumps(metrics, cls=CoreUtils.NonJsonObjectEncoder)
0076                 self.cur.execute(sql_insert + comment, var_map)
0077                 tmp_log.debug("inserted")
0078             else:
0079                 # update existing data
0080                 tmp_data = json.loads(tmp_data, object_hook=CoreUtils.as_python_object)
0081                 tmp_data.update(metrics)
0082                 var_map[":data"] = json.dumps(tmp_data, cls=CoreUtils.NonJsonObjectEncoder)
0083                 self.cur.execute(sql_update + comment, var_map)
0084                 tmp_log.debug("updated")
0085             if use_commit:
0086                 # commit
0087                 if not self._commit():
0088                     raise RuntimeError("Commit error")
0089             tmp_log.debug("done")
0090             return True
0091         except Exception:
0092             # roll back
0093             self._rollback()
0094             # error
0095             self.dump_error_message(tmp_log)
0096             return False
0097 
0098     # get job or task metrics
0099     def get_workload_metrics(self, jedi_task_id: int, panda_id: int = None) -> tuple[bool, dict | None]:
0100         """
0101         Get job metrics or task metrics
0102 
0103         :param jedi_task_id: jediTaskID
0104         :param panda_id: PandaID. None to get task metrics
0105         :return: (False, None) if failed, otherwise (True, metrics)
0106         """
0107         comment = " /* DBProxy.get_workload_metrics */"
0108         if panda_id is not None:
0109             tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}")
0110         else:
0111             tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0112         tmp_log.debug("start")
0113         try:
0114             if panda_id is not None:
0115                 table_name = "Job_Metrics"
0116                 var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id}
0117             else:
0118                 table_name = "Task_Metrics"
0119                 var_map = {":jediTaskID": jedi_task_id}
0120             # get data
0121             sql_get = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID "
0122             if panda_id is not None:
0123                 sql_get += "AND PandaID=:PandaID "
0124             self.cur.execute(sql_get + comment, var_map)
0125             # read data
0126             metrics = None
0127             for (clob_data,) in self.cur:
0128                 try:
0129                     metrics = clob_data.read()
0130                 except AttributeError:
0131                     metrics = str(clob_data)
0132                 break
0133             if metrics is not None:
0134                 metrics = json.loads(metrics, object_hook=CoreUtils.as_python_object)
0135                 tmp_log.debug(f"got {sys.getsizeof(metrics)} bytes")
0136             else:
0137                 tmp_log.debug("no data")
0138             return True, metrics
0139         except Exception:
0140             # error
0141             self.dump_error_message(tmp_log)
0142             return False, None
0143 
0144     # get jobs' metrics in a task
0145     def get_jobs_metrics_in_task(self, jedi_task_id: int) -> tuple[bool, list | None]:
0146         """
0147         Get metrics of jobs in a task
0148 
0149         :param jedi_task_id: jediTaskID
0150         :return: (False, None) if failed, otherwise (True, list of [PandaID, metrics])
0151         """
0152         comment = " /* DBProxy.get_jobs_metrics_in_task */"
0153         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
0154         tmp_log.debug("start")
0155         try:
0156             var_map = {":jediTaskID": jedi_task_id}
0157             # get data
0158             sql_get = f"SELECT PandaID,data FROM {panda_config.schemaPANDA}.Job_Metrics WHERE jediTaskID=:jediTaskID "
0159             self.cur.execute(sql_get + comment, var_map)
0160             # read data
0161             metrics_list = []
0162             for panda_id, clob_data in self.cur:
0163                 try:
0164                     tmp_data = clob_data.read()
0165                 except AttributeError:
0166                     tmp_data = str(clob_data)
0167                 metrics_list.append([panda_id, json.loads(tmp_data, object_hook=CoreUtils.as_python_object)])
0168             tmp_log.debug(f"got metrics for {len(metrics_list)} jobs")
0169             return True, metrics_list
0170         except Exception:
0171             # error
0172             self.dump_error_message(tmp_log)
0173             return False, None
0174 
    # update task queued and activated times
    def update_task_queued_activated_times(self, jedi_task_id: int) -> None:
        """
        Update task queued time depending on the number of inputs to be processed. Update queued and activated times if they were None and inputs become available.
        Set None if no input is available and record the queuing duration to task metrics.

        No transaction is begun or committed here; database changes presumably rely on
        the caller's transaction handling — TODO confirm.

        :param jedi_task_id: task's jediTaskID
        """
        comment = " /* DBProxy.update_task_queued_activated_times */"
        tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id}")
        tmp_log.debug("start")
        # check if the task is queued
        sql_check = (
            f"SELECT status,oldStatus,queuedTime,activatedTime,currentPriority,gshare FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        )
        var_map = {":jediTaskID": jedi_task_id}
        self.cur.execute(sql_check + comment, var_map)
        res = self.cur.fetchone()
        if not res:
            # not found
            tmp_log.debug("task not found")
            return
        # currentPriority and gshare are selected but not used in this method
        (task_status, task_old_status, queued_time, activated_time, current_priority, global_share) = res
        has_input = False
        # statuses in which a task may have inputs being processed; "pending" counts
        # when the previous status (oldStatus) was one of them
        active_status_list = ("ready", "running", "scouting", "scouted")
        if task_status in active_status_list or (task_old_status in active_status_list and task_status == "pending"):
            # check if the task has unprocessed inputs
            # NOTE(review): the "nFilesToBeUSed" casing is inconsistent but harmless — SQL identifiers are case-insensitive
            sql_input = (
                f"SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL "
                "AND nFilesToBeUsed>0 AND nFilesToBeUSed>nFilesUsed "
            )
            var_map = {":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
            self.cur.execute(sql_input + comment, var_map)
            res = self.cur.fetchone()
            has_input = res is not None
            # set queued time if it was None and inputs are available
            if has_input:
                if queued_time is None:
                    # update queued time; the "IS NULL" guard makes this a no-op if
                    # another session set it in the meantime
                    tmp_log.debug(f"set queued time when task is {task_status}")
                    sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL "
                    var_map = {":jediTaskID": jedi_task_id}
                    self.cur.execute(sql_update + comment, var_map)
                    n_row = self.cur.rowcount
                    # only when queuedTime was actually set here is activatedTime also set
                    if n_row > 0 and activated_time is None:
                        # set activated time
                        tmp_log.debug(f"set activated time when task is {task_status}")
                        sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET activatedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND activatedTime IS NULL "
                        var_map = {":jediTaskID": jedi_task_id}
                        self.cur.execute(sql_update + comment, var_map)
                else:
                    tmp_log.debug(f"keep current queued time {queued_time.strftime('%Y-%m-%d %H:%M:%S')}")
        # record queuing duration since the task has no more input to process
        if queued_time is not None and not has_input:
            # unset queued time
            tmp_log.debug(f"unset queued time and record duration when task is {task_status}")
            sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=NULL WHERE jediTaskID=:jediTaskID AND queuedTime IS NOT NULL "
            var_map = {":jediTaskID": jedi_task_id}
            self.cur.execute(sql_update + comment, var_map)
            # get task metrics dict
            tmp_success, task_metrics = self.get_workload_metrics(jedi_task_id, None)
            if not tmp_success:
                err_str = f"failed to get task metrics for jediTaskId={jedi_task_id}"
                tmp_log.error(err_str)
                return
            # new metrics record when nothing stored yet
            if not task_metrics:
                task_metrics = {}
            # add duration; the list is capped to avoid unbounded growth of the JSON blob
            task_metrics.setdefault("queuingPeriods", [])
            if len(task_metrics["queuingPeriods"]) < 10000:
                task_metrics["queuingPeriods"].append({"start": queued_time, "end": naive_utcnow(), "status": task_status})
                # use_commit=False: presumably the enclosing caller commits — confirm
                tmp_success = self.set_workload_metrics(jedi_task_id, None, task_metrics, False)
                if not tmp_success:
                    err_str = f"failed to update task metrics for jediTaskId={jedi_task_id}"
                    tmp_log.error(err_str)
                    return
            else:
                tmp_log.debug("skipped since queuing period list is too long")
        tmp_log.debug(f"done in {task_status}")
0255 
    # record job queuing period
    def record_job_queuing_period(self, panda_id: int, job_spec: JobSpec = None) -> bool | None:
        """
        Record queuing period in job metrics. Skip if job.jobMetrics doesn't contain task queued time

        :param panda_id: Job's PandaID
        :param job_spec: job spec. None to get it from the database
        :return: True if success. False if failed. None if skipped
        """
        comment = " /* DBProxy.record_job_queuing_period */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
        tmp_log.debug(f"start with job spec: {job_spec is None}")
        # get task queued time
        if job_spec is None:
            # fetch the relevant job attributes from the active jobs table
            # NOTE(review): jobs no longer in jobsActive4 are reported as "job not found" — confirm this is intended
            sql_check = f"SELECT jediTaskID,jobStatus,specialHandling FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID "
            var_map = {":PandaID": panda_id}
            self.cur.execute(sql_check + comment, var_map)
            res = self.cur.fetchone()
            if not res:
                # not found
                tmp_log.debug("job not found")
                return None
            jedi_task_id, job_status, tmp_str = res
        else:
            # take the attributes directly from the given job spec
            jedi_task_id = job_spec.jediTaskID
            job_status = job_spec.jobStatus
            tmp_str = job_spec.specialHandling
        # extract the task queued time embedded in the specialHandling string
        task_queued_time = get_task_queued_time(tmp_str)
        # record queuing duration
        if jedi_task_id and task_queued_time:
            tmp_log.debug(f"to record queuing period")
            # get job metrics dict
            tmp_success, job_metrics = self.get_workload_metrics(jedi_task_id, panda_id)
            if not tmp_success:
                err_str = "Failed to get job metrics "
                tmp_log.error(err_str)
                return False
            # new metrics record when nothing stored yet
            if not job_metrics:
                job_metrics = {}
            # add duration; recorded at most once per job — an existing entry is never overwritten
            if "queuingPeriod" in job_metrics:
                tmp_log.debug("skipped since queuing period already exists")
                return None
            else:
                job_metrics["queuingPeriod"] = {
                    "start": task_queued_time,
                    "end": naive_utcnow(),
                    "status": job_status,
                }
                # use_commit=False: presumably the caller owns the transaction — confirm
                tmp_success = self.set_workload_metrics(jedi_task_id, panda_id, job_metrics, False)
                if not tmp_success:
                    err_str = "Failed to update job metrics"
                    tmp_log.error(err_str)
                    return False
            tmp_log.debug(f"done in {job_status}")
            return True
        else:
            tmp_log.debug(f"skipped as jediTaskID={jedi_task_id} taskQueuedTime={task_queued_time}")
            return None
0316 
    # unset task active period
    def unset_task_activated_time(self, jedi_task_id: int, task_status: str = None) -> bool | None:
        """
        Unset activated time and record active period in task metrics

        :param jedi_task_id: task's JediTaskID
        :param task_status: task status. None to get it from the database
        :return: True if success. False if failed. None if skipped
        """
        # statuses in which the task's active period is considered over
        frozen_status_list = ("done", "finished", "failed", "broken", "paused", "exhausted")
        # quick exit without touching the database when the given status is clearly not frozen
        if task_status is not None and task_status not in frozen_status_list:
            return None
        # NOTE(review): the SQL tag says "record_task_active_period" while the method is
        # named unset_task_activated_time — kept as-is in case logs are parsed on this tag
        comment = " /* DBProxy.record_task_active_period */"
        tmp_log = self.create_tagged_logger(comment, f"JediTaskID={jedi_task_id}")
        tmp_log.debug(f"start")
        # get activated time
        sql_check = f"SELECT status,activatedTime FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID "
        var_map = {":jediTaskID": jedi_task_id}
        self.cur.execute(sql_check + comment, var_map)
        res = self.cur.fetchone()
        if not res:
            # not found
            tmp_log.debug("task not found")
            return None
        # the status read from the database overrides the one given by the caller
        task_status, activated_time = res
        # record active duration
        if task_status in frozen_status_list and activated_time:
            tmp_log.debug(f"to record active period when task is {task_status}")
            # get task metrics dict
            tmp_success, task_metrics = self.get_workload_metrics(jedi_task_id, None)
            if not tmp_success:
                err_str = "Failed to get task metrics "
                tmp_log.error(err_str)
                return False
            # new metrics record when nothing stored yet
            if not task_metrics:
                task_metrics = {}
            # add duration
            task_metrics.setdefault("activePeriod", [])
            task_metrics["activePeriod"].append(
                {
                    "start": activated_time,
                    "end": naive_utcnow(),
                    "status": task_status,
                }
            )
            # use_commit=False: presumably the caller owns the transaction — confirm
            tmp_success = self.set_workload_metrics(jedi_task_id, None, task_metrics, False)
            if not tmp_success:
                err_str = "Failed to update task metrics"
                tmp_log.error(err_str)
                return False
            # unset activated time
            tmp_log.debug(f"unset activated time")
            sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET activatedTime=NULL WHERE jediTaskID=:jediTaskID AND activatedTime IS NOT NULL "
            var_map = {":jediTaskID": jedi_task_id}
            self.cur.execute(sql_update + comment, var_map)
        tmp_log.debug(f"done in {task_status}")
        return True
0375 
0376     # check failure count due to corrupted files
0377     def checkFailureCountWithCorruptedFiles(self, jediTaskID, pandaID):
0378         comment = " /* DBProxy.checkFailureCountWithCorruptedFiles */"
0379         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID} pandaID={pandaID}")
0380         # sql to failure counts
0381         sqlBD = "SELECT f2.lfn,COUNT(*) FROM ATLAS_PANDA.filesTable4 f1, ATLAS_PANDA.filesTable4 f2 "
0382         sqlBD += "WHERE f1.PandaID=:PandaID AND f1.type=:type AND f1.status=:status "
0383         sqlBD += "AND f2.lfn=f1.lfn AND f2.type=:type AND f2.status=:status AND f2.jediTaskID=:jediTaskID "
0384         sqlBD += "GROUP BY f2.lfn "
0385         varMap = {}
0386         varMap[":jediTaskID"] = jediTaskID
0387         varMap[":PandaID"] = pandaID
0388         varMap[":status"] = "corrupted"
0389         varMap[":type"] = "zipinput"
0390         self.cur.execute(sqlBD + comment, varMap)
0391         resBD = self.cur.fetchall()
0392         tooMany = False
0393         for lfn, nFailed in resBD:
0394             tmp_log.debug(f"{nFailed} failures with {lfn}")
0395             if nFailed >= 3:
0396                 tooMany = True
0397         tmp_log.debug(f"too many failures : {tooMany}")
0398         return tooMany
0399 
0400     # calculate failure metrics, such as single failure rate and failed HEPScore, for a task
0401     def get_task_failure_metrics(self, task_id, use_commit=True):
0402         comment = " /* JediDBProxy.get_task_failure_metrics */"
0403         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
0404         tmp_log.debug("start")
0405         try:
0406             # start transaction
0407             if use_commit:
0408                 self.conn.begin()
0409             # sql to get metrics
0410             sql = (
0411                 f"SELECT SUM(is_finished),SUM(is_failed),SUM(HS06SEC*is_finished),SUM(HS06SEC*is_failed) "
0412                 f"FROM ("
0413                 f"SELECT PandaID, HS06SEC, CASE WHEN jobStatus='finished' THEN 1 ELSE 0 END is_finished, "
0414                 f"CASE WHEN jobStatus='failed' THEN 1 ELSE 0 END is_failed "
0415                 f"FROM {panda_config.schemaPANDA}.jobsArchived4 "
0416                 f"WHERE jediTaskID=:jediTaskID AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
0417                 f"UNION "
0418                 f"SELECT PandaID, HS06SEC, CASE WHEN jobStatus='finished' THEN 1 ELSE 0 END is_finished, "
0419                 f"CASE WHEN jobStatus='failed' THEN 1 ELSE 0 END is_failed "
0420                 f"FROM {panda_config.schemaPANDAARCH}.jobsArchived "
0421                 f"WHERE jediTaskID=:jediTaskID AND prodSourceLabel IN (:prodSourceLabel1,:prodSourceLabel2) "
0422                 f")"
0423             )
0424             var_map = {
0425                 ":jediTaskID": task_id,
0426                 ":prodSourceLabel1": "managed",
0427                 ":prodSourceLabel2": "user",
0428             }
0429             self.cur.execute(sql + comment, var_map)
0430             num_finished, num_failed, good_hep_score_sec, bad_hep_score_sec = self.cur.fetchone()
0431             ret_dict = {
0432                 "num_failed": num_failed,
0433                 "single_failure_rate": (
0434                     round(num_failed / (num_finished + num_failed), 3)
0435                     if num_finished is not None and num_failed is not None and num_finished + num_failed
0436                     else None
0437                 ),
0438                 "failed_hep_score_hour": int(bad_hep_score_sec / 60 / 60) if bad_hep_score_sec is not None else None,
0439                 "failed_hep_score_ratio": (
0440                     round(bad_hep_score_sec / (good_hep_score_sec + bad_hep_score_sec), 3)
0441                     if good_hep_score_sec is not None and bad_hep_score_sec is not None and good_hep_score_sec + bad_hep_score_sec
0442                     else None
0443                 ),
0444             }
0445             # commit
0446             if use_commit:
0447                 if not self._commit():
0448                     raise RuntimeError("Commit error")
0449             # return
0450             tmp_log.debug(f"got {ret_dict}")
0451             return ret_dict
0452         except Exception:
0453             # roll back
0454             if use_commit:
0455                 self._rollback()
0456             # error
0457             self.dump_error_message(tmp_log)
0458             return None
0459 
0460     # get averaged disk IO
0461     def getAvgDiskIO_JEDI(self):
0462         comment = " /* JediDBProxy.getAvgDiskIO_JEDI */"
0463         tmp_log = self.create_tagged_logger(comment)
0464         tmp_log.debug("start")
0465         try:
0466             # sql
0467             sql = f"SELECT sum(prorated_diskio_avg * njobs) / sum(njobs), computingSite FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS "
0468             sql += "WHERE jobStatus=:jobStatus GROUP BY computingSite "
0469             var_map = dict()
0470             var_map[":jobStatus"] = "running"
0471             # begin transaction
0472             self.conn.begin()
0473             self.cur.execute(sql + comment, var_map)
0474             resFL = self.cur.fetchall()
0475             # commit
0476             if not self._commit():
0477                 raise RuntimeError("Commit error")
0478             ret_map = dict()
0479             for avg, computing_site in resFL:
0480                 if avg:
0481                     avg = float(avg)
0482                 ret_map[computing_site] = avg
0483             tmp_log.debug("done")
0484             return ret_map
0485         except Exception:
0486             # roll back
0487             self._rollback()
0488             # error
0489             self.dump_error_message(tmp_log)
0490             return {}
0491 
0492     # get carbon footprint for a task, the level has to be 'regional' or 'global'. If misspelled, it defaults to 'global'
0493     def get_task_carbon_footprint(self, jedi_task_id, level):
0494         comment = " /* JediDBProxy.get_task_carbon_footprint */"
0495         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={jedi_task_id} n_files={level}")
0496         tmp_log.debug("start")
0497 
0498         if level == "regional":
0499             gco2_column = "GCO2_REGIONAL"
0500         else:
0501             gco2_column = "GCO2_GLOBAL"
0502 
0503         try:
0504             sql = (
0505                 "SELECT jobstatus, SUM(sum_gco2) FROM ( "
0506                 "  SELECT jobstatus, SUM({gco2_column}) sum_gco2 FROM {active_schema}.jobsarchived4 "
0507                 "  WHERE jeditaskid =:jeditaskid "
0508                 "  GROUP BY jobstatus "
0509                 "  UNION "
0510                 "  SELECT jobstatus, SUM({gco2_column}) sum_gco2 FROM {archive_schema}.jobsarchived "
0511                 "  WHERE jeditaskid =:jeditaskid "
0512                 "  GROUP BY jobstatus)"
0513                 "GROUP BY jobstatus".format(gco2_column=gco2_column, active_schema=panda_config.schemaJEDI, archive_schema=panda_config.schemaPANDAARCH)
0514             )
0515             var_map = {":jeditaskid": jedi_task_id}
0516 
0517             # start transaction
0518             self.conn.begin()
0519             self.cur.execute(sql + comment, var_map)
0520             results = self.cur.fetchall()
0521 
0522             footprint = {"total": 0}
0523             data = False
0524             for job_status, g_co2 in results:
0525                 if not g_co2:
0526                     g_co2 = 0
0527                 else:
0528                     data = True
0529                 footprint[job_status] = g_co2
0530                 footprint["total"] += g_co2
0531 
0532             # commit
0533             if not self._commit():
0534                 raise RuntimeError("Commit error")
0535 
0536             tmp_log.debug(f"done: {footprint}")
0537 
0538             if not data:
0539                 return None
0540 
0541             return footprint
0542         except Exception:
0543             # roll back
0544             self._rollback()
0545             # error
0546             self.dump_error_message(tmp_log)
0547             return None
0548 
0549     # get job statistics with work queue
0550     def getJobStatisticsWithWorkQueue_JEDI(self, vo, prodSourceLabel, minPriority=None, cloud=None):
0551         comment = " /* DBProxy.getJobStatisticsWithWorkQueue_JEDI */"
0552         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} cloud={cloud}")
0553         tmpLog.debug(f"start minPriority={minPriority}")
0554         sql0 = "SELECT computingSite,cloud,jobStatus,workQueue_ID,COUNT(*) FROM %s "
0555         sql0 += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
0556         if cloud is not None:
0557             sql0 += "AND cloud=:cloud "
0558         tmpPrioMap = {}
0559         if minPriority is not None:
0560             sql0 += "AND currentPriority>=:minPriority "
0561             tmpPrioMap[":minPriority"] = minPriority
0562         sql0 += "GROUP BY computingSite,cloud,prodSourceLabel,jobStatus,workQueue_ID "
0563         sqlMV = sql0
0564         sqlMV = re.sub("COUNT\(\*\)", "SUM(num_of_jobs)", sqlMV)
0565         sqlMV = re.sub("SELECT ", "SELECT /*+ RESULT_CACHE */ ", sqlMV)
0566         tables = [f"{panda_config.schemaPANDA}.jobsActive4", f"{panda_config.schemaPANDA}.jobsDefined4"]
0567         if minPriority is not None:
0568             # read the number of running jobs with prio<=MIN
0569             tables.append(f"{panda_config.schemaPANDA}.jobsActive4")
0570             sqlMVforRun = re.sub("currentPriority>=", "currentPriority<=", sqlMV)
0571         varMap = {}
0572         varMap[":vo"] = vo
0573         varMap[":prodSourceLabel"] = prodSourceLabel
0574         if cloud is not None:
0575             varMap[":cloud"] = cloud
0576         for tmpPrio in tmpPrioMap.keys():
0577             varMap[tmpPrio] = tmpPrioMap[tmpPrio]
0578         returnMap = {}
0579         try:
0580             iActive = 0
0581             for table in tables:
0582                 # start transaction
0583                 self.conn.begin()
0584                 # select
0585                 self.cur.arraysize = 10000
0586                 useRunning = None
0587                 if table == f"{panda_config.schemaPANDA}.jobsActive4":
0588                     mvTableName = f"{panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS"
0589                     # first count non-running and then running if minPriority is specified
0590                     if minPriority is not None:
0591                         if iActive == 0:
0592                             useRunning = False
0593                         else:
0594                             useRunning = True
0595                         iActive += 1
0596                     if useRunning in [None, False]:
0597                         sqlExeTmp = (sqlMV + comment) % mvTableName
0598                     else:
0599                         sqlExeTmp = (sqlMVforRun + comment) % mvTableName
0600                 else:
0601                     sqlExeTmp = (sql0 + comment) % table
0602                 self.cur.execute(sqlExeTmp, varMap)
0603                 res = self.cur.fetchall()
0604                 # commit
0605                 if not self._commit():
0606                     raise RuntimeError("Commit error")
0607                 # create map
0608                 for computingSite, cloud, jobStatus, workQueue_ID, nCount in res:
0609                     # count the number of non-running with prio>=MIN
0610                     if useRunning is True and jobStatus != "running":
0611                         continue
0612                     # count the number of running with prio<=MIN
0613                     if useRunning is False and jobStatus == "running":
0614                         continue
0615                     # add site
0616                     if computingSite not in returnMap:
0617                         returnMap[computingSite] = {}
0618                     # add workQueue
0619                     if workQueue_ID not in returnMap[computingSite]:
0620                         returnMap[computingSite][workQueue_ID] = {}
0621                     # add jobstatus
0622                     if jobStatus not in returnMap[computingSite][workQueue_ID]:
0623                         returnMap[computingSite][workQueue_ID][jobStatus] = 0
0624                     # add
0625                     returnMap[computingSite][workQueue_ID][jobStatus] += nCount
0626             # return
0627             tmpLog.debug("done")
0628             return True, returnMap
0629         except Exception:
0630             # roll back
0631             self._rollback()
0632             # error
0633             self.dump_error_message(tmpLog)
0634             return False, {}
0635 
0636     # get core statistics with VO and prodSourceLabel
0637     def get_core_statistics(self, vo: str, prod_source_label: str) -> [bool, dict]:
0638         comment = " /* DBProxy.get_core_statistics */"
0639         tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prod_source_label}")
0640         tmpLog.debug("start")
0641         sql0 = f"SELECT /*+ RESULT_CACHE */ computingSite,jobStatus,SUM(num_of_cores) FROM {panda_config.schemaPANDA}.MV_JOBSACTIVE4_STATS "
0642         sql0 += "WHERE vo=:vo AND prodSourceLabel=:prodSourceLabel "
0643         sql0 += "GROUP BY computingSite,cloud,prodSourceLabel,jobStatus "
0644         var_map = {":vo": vo, ":prodSourceLabel": prod_source_label}
0645         return_map = {}
0646         try:
0647             self.conn.begin()
0648             # select
0649             self.cur.arraysize = 10000
0650             self.cur.execute(sql0 + comment, var_map)
0651             res = self.cur.fetchall()
0652             # commit
0653             if not self._commit():
0654                 raise RuntimeError("Commit error")
0655             # create map
0656             for computing_site, job_status, n_core in res:
0657                 # add site
0658                 return_map.setdefault(computing_site, {})
0659                 # add status
0660                 return_map[computing_site].setdefault(job_status, 0)
0661                 # add num cores
0662                 return_map[computing_site][job_status] += n_core
0663             # return
0664             tmpLog.debug("done")
0665             return True, return_map
0666         except Exception:
0667             # roll back
0668             self._rollback()
0669             # error
0670             self.dump_error_message(tmpLog)
0671             return False, {}
0672 
0673     # get job statistics by global share
0674     def getJobStatisticsByGlobalShare(self, vo, exclude_rwq):
0675         """
0676         :param vo: Virtual Organization
0677         :param exclude_rwq: True/False. Indicates whether we want to indicate special workqueues from the statistics
0678         """
0679         comment = " /* DBProxy.getJobStatisticsByGlobalShare */"
0680         tmpLog = self.create_tagged_logger(comment, f" vo={vo}")
0681         tmpLog.debug("start")
0682 
0683         # define the var map of query parameters
0684         var_map = {":vo": vo}
0685 
0686         # sql to query on pre-cached job statistics tables (JOBS_SHARE_STATS and JOBSDEFINED_SHARE_STATS)
0687         sql_jt = """
0688                SELECT /*+ RESULT_CACHE */ computingSite, jobStatus, gShare, SUM(njobs) FROM %s
0689                WHERE vo=:vo
0690                """
0691 
0692         if exclude_rwq:
0693             sql_jt += f"""
0694                AND workqueue_id NOT IN
0695                (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource')
0696                """
0697 
0698         sql_jt += """
0699                GROUP BY computingSite, jobStatus, gshare
0700                """
0701 
0702         tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0703 
0704         return_map = {}
0705         try:
0706             for table in tables:
0707                 self.cur.arraysize = 10000
0708                 sql_exe = (sql_jt + comment) % table
0709                 self.cur.execute(sql_exe, var_map)
0710                 res = self.cur.fetchall()
0711 
0712                 # create map
0713                 for panda_site, status, gshare, n_count in res:
0714                     # add site
0715                     return_map.setdefault(panda_site, {})
0716                     # add global share
0717                     return_map[panda_site].setdefault(gshare, {})
0718                     # add job status
0719                     return_map[panda_site][gshare].setdefault(status, 0)
0720                     # increase count
0721                     return_map[panda_site][gshare][status] += n_count
0722 
0723             tmpLog.debug("done")
0724             return True, return_map
0725         except Exception:
0726             self.dump_error_message(tmpLog)
0727             return False, {}
0728 
0729     def getJobStatisticsByResourceType(self, workqueue):
0730         """
0731         This function will return the job statistics for a particular workqueue, broken down by resource type
0732         (SCORE, MCORE, etc.)
0733         :param workqueue: workqueue object
0734         """
0735         comment = " /* DBProxy.getJobStatisticsByResourceType */"
0736         tmpLog = self.create_tagged_logger(comment, f"workqueue={workqueue}")
0737         tmpLog.debug("start")
0738 
0739         # define the var map of query parameters
0740         var_map = {":vo": workqueue.VO}
0741 
0742         # sql to query on pre-cached job statistics tables (JOBS_SHARE_STATS and JOBSDEFINED_SHARE_STATS)
0743         sql_jt = "SELECT /*+ RESULT_CACHE */ jobstatus, resource_type, SUM(njobs) FROM %s WHERE vo=:vo "
0744 
0745         if workqueue.is_global_share:
0746             sql_jt += "AND gshare=:gshare "
0747             sql_jt += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource') "
0748             var_map[":gshare"] = workqueue.queue_name
0749         else:
0750             sql_jt += "AND workqueue_id=:workqueue_id "
0751             var_map[":workqueue_id"] = workqueue.queue_id
0752 
0753         sql_jt += "GROUP BY jobstatus, resource_type "
0754 
0755         tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0756 
0757         return_map = {}
0758         try:
0759             for table in tables:
0760                 self.cur.arraysize = 10000
0761                 sql_exe = (sql_jt + comment) % table
0762                 self.cur.execute(sql_exe, var_map)
0763                 res = self.cur.fetchall()
0764 
0765                 # create map
0766                 for status, resource_type, n_count in res:
0767                     return_map.setdefault(status, {})
0768                     return_map[status][resource_type] = n_count
0769 
0770             tmpLog.debug("done")
0771             return True, return_map
0772         except Exception:
0773             self.dump_error_message(tmpLog)
0774             return False, {}
0775 
0776     def getJobStatisticsByResourceTypeSite(self, workqueue):
0777         """
0778         This function will return the job statistics per site for a particular workqueue, broken down by resource type
0779         (SCORE, MCORE, etc.)
0780         :param workqueue: workqueue object
0781         """
0782         comment = " /* DBProxy.getJobStatisticsByResourceTypeSite */"
0783         tmpLog = self.create_tagged_logger(comment, f"workqueue={workqueue}")
0784         tmpLog.debug("start")
0785 
0786         # define the var map of query parameters
0787         var_map = {":vo": workqueue.VO}
0788 
0789         # sql to query on pre-cached job statistics tables (JOBS_SHARE_STATS and JOBSDEFINED_SHARE_STATS)
0790         sql_jt = "SELECT /*+ RESULT_CACHE */ jobstatus, resource_type, computingSite, SUM(njobs) FROM %s WHERE vo=:vo "
0791 
0792         if workqueue.is_global_share:
0793             sql_jt += "AND gshare=:gshare "
0794             sql_jt += f"AND workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaPANDA}.jedi_work_queue WHERE queue_function = 'Resource') "
0795             var_map[":gshare"] = workqueue.queue_name
0796         else:
0797             sql_jt += "AND workqueue_id=:workqueue_id "
0798             var_map[":workqueue_id"] = workqueue.queue_id
0799 
0800         sql_jt += "GROUP BY jobstatus, resource_type, computingSite "
0801 
0802         tables = [f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS", f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"]
0803 
0804         return_map = {}
0805         try:
0806             for table in tables:
0807                 self.cur.arraysize = 10000
0808                 sql_exe = (sql_jt + comment) % table
0809                 self.cur.execute(sql_exe, var_map)
0810                 res = self.cur.fetchall()
0811 
0812                 # create map
0813                 for status, resource_type, computingSite, n_count in res:
0814                     return_map.setdefault(computingSite, {})
0815                     return_map[computingSite].setdefault(resource_type, {})
0816                     return_map[computingSite][resource_type][status] = n_count
0817 
0818             tmpLog.debug("done")
0819             return True, return_map
0820         except Exception:
0821             self.dump_error_message(tmpLog)
0822             return False, {}
0823 
0824     # gets statistics on the number of jobs with a specific status for each nucleus at each site
0825     def get_num_jobs_with_status_by_nucleus(self, vo: str, job_status: str) -> [bool, Dict[str, Dict[str, int]]]:
0826         """
0827         This function will return the number of jobs with a specific status for each nucleus at each site.
0828 
0829         :param vo: Virtual Organization
0830         :param job_status: Job status
0831         :return: True/False and Dictionary with the number of jobs with a specific status for each nucleus at each site
0832         """
0833         comment = " /* DBProxy.get_num_jobs_with_status_by_nucleus */"
0834         tmp_log = self.create_tagged_logger(comment, f" vo={vo} status={job_status}")
0835         tmp_log.debug("start")
0836 
0837         # define the var map of query parameters
0838         var_map = {":vo": vo, ":job_status": job_status}
0839 
0840         # sql to query on pre-cached job statistics table
0841         sql_jt = """
0842                SELECT /*+ RESULT_CACHE */ computingSite, nucleus, SUM(njobs) FROM %s
0843                WHERE vo=:vo AND jobStatus=:job_status GROUP BY computingSite, nucleus
0844                """
0845 
0846         if job_status in ["transferring", "running", "activated" "holding"]:
0847             table = f"{panda_config.schemaPANDA}.JOBS_SHARE_STATS"
0848         else:
0849             table = f"{panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS"
0850 
0851         return_map = {}
0852         try:
0853             self.cur.arraysize = 10000
0854             sql_exe = (sql_jt + comment) % table
0855             self.cur.execute(sql_exe, var_map)
0856             res = self.cur.fetchall()
0857 
0858             # create a dict
0859             for panda_site, nucleus, n_jobs in res:
0860                 if n_jobs:
0861                     # add site
0862                     return_map.setdefault(panda_site, {})
0863                     # add stat per nucleus
0864                     return_map[panda_site][nucleus] = n_jobs
0865             tmp_log.debug("done")
0866             return True, return_map
0867         except Exception:
0868             self.dump_error_message(tmp_log)
0869             return False, {}
0870 
0871 
# get metrics module
def get_metrics_module(base_mod) -> MetricsModule:
    """Return the composite metrics module registered on the given base module."""
    composite = base_mod.get_composite_module("metrics")
    return composite