Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:03

0001 import datetime
0002 import json
0003 import os
0004 import random
0005 import re
0006 import time
0007 import traceback
0008 import uuid
0009 
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0012 
0013 from pandaserver.config import panda_config
0014 from pandaserver.srvcore import CoreUtils
0015 from pandaserver.taskbuffer import (
0016     EventServiceUtils,
0017     GlobalShares,
0018     JobUtils,
0019     PrioUtil,
0020     SiteSpec,
0021 )
0022 from pandaserver.taskbuffer.db_proxy_mods.base_module import (
0023     BaseModule,
0024     convert_dict_to_bind_vars,
0025     memoize,
0026 )
0027 from pandaserver.taskbuffer.DdmSpec import DdmSpec
0028 from pandaserver.taskbuffer.JediDatasetSpec import (
0029     INPUT_TYPES_var_map,
0030     INPUT_TYPES_var_str,
0031 )
0032 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0033 from pandaserver.taskbuffer.ResourceSpec import ResourceSpec, ResourceSpecMapper
0034 from pandaserver.taskbuffer.Utils import create_shards
0035 from pandaserver.taskbuffer.WorkQueueMapper import WorkQueueMapper
0036 
0037 
0038 # Module class to define methods related to fundamental entities like GlobalShares, Resource Types, CO2, etc
0039 class EntityModule(BaseModule):
0040     # constructor
0041     def __init__(self, log_stream: LogWrapper):
0042         super().__init__(log_stream)
0043         # global share variables
0044         self.tree = None  # Pointer to the root of the global shares tree
0045         self.leave_shares = None  # Pointer to the list with leave shares
0046         self.__t_update_shares = None  # Timestamp when the shares were last updated
0047         self.__hs_distribution = None  # HS06s distribution of sites
0048         self.__t_update_distribution = None  # Timestamp when the HS06s distribution was last updated
0049 
0050         # resource type mapper
0051         # if you want to use it, you need to call reload_resource_spec_mapper first
0052         self.resource_spec_mapper = None
0053         self.__t_update_resource_type_mapper = None
0054 
0055         # priority boost
0056         self.job_prio_boost_dict = None
0057         self.job_prio_boost_dict_update_time = None
0058 
0059     def __get_hs_leave_distribution(self, leave_shares):
0060         """
0061         Get the current HS06 distribution for running and queued jobs
0062         """
0063         comment = " /* DBProxy.get_hs_leave_distribution */"
0064 
0065         sql_hs_distribution = """
0066             SELECT gshare, jobstatus_grouped, SUM(HS)
0067             FROM
0068                 (SELECT gshare, HS,
0069                      CASE
0070                          WHEN jobstatus IN('activated') THEN 'queued'
0071                          WHEN jobstatus IN('sent', 'running') THEN 'executing'
0072                          ELSE 'ignore'
0073                      END jobstatus_grouped
0074                  FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) a
0075             GROUP BY gshare, jobstatus_grouped
0076             """
0077 
0078         self.cur.execute(sql_hs_distribution + comment)
0079         hs_distribution_raw = self.cur.fetchall()
0080 
0081         # get the hs distribution data into a dictionary structure
0082         hs_distribution_dict = {}
0083         hs_queued_total = 0
0084         hs_executing_total = 0
0085         hs_ignore_total = 0
0086         for hs_entry in hs_distribution_raw:
0087             gshare, status_group, hs = hs_entry
0088             if hs is None:
0089                 continue
0090             hs = float(hs)
0091             hs_distribution_dict.setdefault(
0092                 gshare,
0093                 {
0094                     GlobalShares.PLEDGED: 0,
0095                     GlobalShares.QUEUED: 0,
0096                     GlobalShares.EXECUTING: 0,
0097                 },
0098             )
0099             hs_distribution_dict[gshare][status_group] = hs
0100             # calculate totals
0101             if status_group == GlobalShares.QUEUED:
0102                 hs_queued_total += hs
0103             elif status_group == GlobalShares.EXECUTING:
0104                 hs_executing_total += hs
0105             else:
0106                 hs_ignore_total += hs
0107 
0108         # Calculate the ideal HS06 distribution based on shares.
0109         for share_node in leave_shares:
0110             share_name, share_value = share_node.name, share_node.value
0111             hs_pledged_share = hs_executing_total * share_value / 100.0
0112 
0113             hs_distribution_dict.setdefault(
0114                 share_name,
0115                 {
0116                     GlobalShares.PLEDGED: 0,
0117                     GlobalShares.QUEUED: 0,
0118                     GlobalShares.EXECUTING: 0,
0119                 },
0120             )
0121             # Pledged HS according to global share definitions
0122             hs_distribution_dict[share_name]["pledged"] = hs_pledged_share
0123         return hs_distribution_dict
0124 
0125     # retrieve global shares
0126     def get_shares(self, parents=""):
0127         comment = " /* DBProxy.get_shares */"
0128         tmp_log = self.create_tagged_logger(comment)
0129         tmp_log.debug("start")
0130 
0131         sql = """
0132                SELECT NAME, VALUE, PARENT, PRODSOURCELABEL, WORKINGGROUP, CAMPAIGN, PROCESSINGTYPE, TRANSPATH, RTYPE,
0133                VO, QUEUE_ID, THROTTLED
0134                FROM ATLAS_PANDA.GLOBAL_SHARES
0135                """
0136         var_map = None
0137 
0138         if parents == "":
0139             # Get all shares
0140             pass
0141         elif parents is None:
0142             # Get top level shares
0143             sql += "WHERE parent IS NULL"
0144 
0145         elif isinstance(parents, str):
0146             # Get the children of a specific share
0147             var_map = {":parent": parents}
0148             sql += "WHERE parent = :parent"
0149 
0150         elif type(parents) in (list, tuple):
0151             # Get the children of a list of shares
0152             var_map = {}
0153             parent_var_names_str, parent_var_map = get_sql_IN_bind_variables(parents, prefix=":parent")
0154             sql += f"WHERE parent IN ({parent_var_names_str})"
0155             var_map.update(parent_var_map)
0156 
0157         self.cur.execute(sql + comment, var_map)
0158         resList = self.cur.fetchall()
0159 
0160         tmp_log.debug("done")
0161         return resList
0162 
0163     def reload_shares(self, force=False):
0164         """
0165         Reloads the shares from the DB and recalculates distributions
0166         """
0167         comment = " /* DBProxy.reload_shares */"
0168         # Don't reload shares every time
0169         if (self.__t_update_shares is not None and self.__t_update_shares > datetime.datetime.now() - datetime.timedelta(hours=1)) or force:
0170             return
0171 
0172         tmp_log = self.create_tagged_logger(comment)
0173 
0174         # Root dummy node
0175         t_before = time.time()
0176         tree = GlobalShares.Share("root", 100, None, None, None, None, None, None, None, None, None, None)
0177         t_after = time.time()
0178         total = t_after - t_before
0179         tmp_log.debug(f"Root dummy tree took {total}s")
0180 
0181         # Get top level shares from DB
0182         t_before = time.time()
0183         shares_top_level = self.get_shares(parents=None)
0184         t_after = time.time()
0185         total = t_after - t_before
0186         tmp_log.debug(f"Getting shares took {total}s")
0187 
0188         # Load branches
0189         t_before = time.time()
0190         for (
0191             name,
0192             value,
0193             parent,
0194             prodsourcelabel,
0195             workinggroup,
0196             campaign,
0197             processingtype,
0198             transpath,
0199             rtype,
0200             vo,
0201             queue_id,
0202             throttled,
0203         ) in shares_top_level:
0204             share = GlobalShares.Share(
0205                 name,
0206                 value,
0207                 parent,
0208                 prodsourcelabel,
0209                 workinggroup,
0210                 campaign,
0211                 processingtype,
0212                 transpath,
0213                 rtype,
0214                 vo,
0215                 queue_id,
0216                 throttled,
0217             )
0218             tree.children.append(self.__load_branch(share))
0219         t_after = time.time()
0220         total = t_after - t_before
0221         tmp_log.debug(f"Loading the branches took {total}s")
0222 
0223         # Normalize the values in the database
0224         t_before = time.time()
0225         tree.normalize()
0226         t_after = time.time()
0227         total = t_after - t_before
0228         tmp_log.debug(f"Normalizing the values took {total}s")
0229 
0230         # get the leave shares (the ones not having more children)
0231         t_before = time.time()
0232         leave_shares = tree.get_leaves()
0233         t_after = time.time()
0234         total = t_after - t_before
0235         tmp_log.debug(f"Getting the leaves took {total}s")
0236 
0237         self.leave_shares = leave_shares
0238         self.__t_update_shares = datetime.datetime.now()
0239 
0240         # get the distribution of shares
0241         t_before = time.time()
0242         # Retrieve the current HS06 distribution of jobs from the database and then aggregate recursively up to the root
0243         hs_distribution = self.__get_hs_leave_distribution(leave_shares)
0244         tree.aggregate_hs_distribution(hs_distribution)
0245         t_after = time.time()
0246         total = t_after - t_before
0247         tmp_log.debug(f"Aggregating the hs distribution took {total}s")
0248 
0249         self.tree = tree
0250         self.__hs_distribution = hs_distribution
0251         self.__t_update_distribution = datetime.datetime.now()
0252         return
0253 
0254     def __reload_hs_distribution(self):
0255         """
0256         Reloads the HS distribution
0257         """
0258         comment = " /* DBProxy.__reload_hs_distribution */"
0259         tmp_log = self.create_tagged_logger(comment)
0260         tmp_log.debug(self.__t_update_distribution)
0261         tmp_log.debug(self.__hs_distribution)
0262         # Reload HS06s distribution every 10 seconds
0263         if self.__t_update_distribution is not None and self.__t_update_distribution > datetime.datetime.now() - datetime.timedelta(seconds=10):
0264             tmp_log.debug("release")
0265             return
0266 
0267         # Retrieve the current HS06 distribution of jobs from the database and then aggregate recursively up to the root
0268         tmp_log.debug("get dist")
0269         t_before = time.time()
0270         hs_distribution = self.__get_hs_leave_distribution(self.leave_shares)
0271         tmp_log.debug("aggr dist")
0272         self.tree.aggregate_hs_distribution(hs_distribution)
0273         t_after = time.time()
0274         total = t_after - t_before
0275         tmp_log.debug(f"Reloading the hs distribution took {total}s")
0276 
0277         self.__hs_distribution = hs_distribution
0278         self.__t_update_distribution = datetime.datetime.now()
0279 
0280         # log the distribution for debugging purposes
0281         tmp_log.debug(f"Current HS06 distribution is {hs_distribution}")
0282 
0283         return
0284 
0285     def get_sorted_leaves(self):
0286         """
0287         Re-loads the shares, then returns the leaves sorted by under usage
0288         """
0289         self.reload_shares()
0290         self.__reload_hs_distribution()
0291         return self.tree.sort_branch_by_current_hs_distribution(self.__hs_distribution)
0292 
0293     def get_tree_of_gshare_names(self):
0294         """
0295         get nested dict of gshare names implying the tree structure
0296         """
0297 
0298         def get_nested_gshare(share):
0299             val = None
0300             if not share.children:
0301                 # leaf
0302                 pass
0303             else:
0304                 # branch
0305                 val = {}
0306                 for child in share.children:
0307                     val[child.name] = get_nested_gshare(child)
0308             return val
0309 
0310         ret_dict = get_nested_gshare(self.tree)
0311         return ret_dict
0312 
0313     def __load_branch(self, share):
0314         """
0315         Recursively load a branch
0316         """
0317         node = GlobalShares.Share(
0318             share.name,
0319             share.value,
0320             share.parent,
0321             share.prodsourcelabel,
0322             share.workinggroup,
0323             share.campaign,
0324             share.processingtype,
0325             share.transpath,
0326             share.rtype,
0327             share.vo,
0328             share.queue_id,
0329             share.throttled,
0330         )
0331 
0332         children = self.get_shares(parents=share.name)
0333         if not children:
0334             return node
0335 
0336         for (
0337             name,
0338             value,
0339             parent,
0340             prodsourcelabel,
0341             workinggroup,
0342             campaign,
0343             processingtype,
0344             transpath,
0345             rtype,
0346             vo,
0347             queue_id,
0348             throttled,
0349         ) in children:
0350             child = GlobalShares.Share(
0351                 name,
0352                 value,
0353                 parent,
0354                 prodsourcelabel,
0355                 workinggroup,
0356                 campaign,
0357                 processingtype,
0358                 transpath,
0359                 rtype,
0360                 vo,
0361                 queue_id,
0362                 throttled,
0363             )
0364             node.children.append(self.__load_branch(child))
0365 
0366         return node
0367 
0368     def getGShareStatus(self):
0369         """
0370         Generates a list with sorted leave branches
0371         """
0372 
0373         comment = " /* DBProxy.getGShareStatus */"
0374         tmp_log = self.create_tagged_logger(comment)
0375         tmp_log.debug("start")
0376 
0377         self.reload_shares()
0378         self.__reload_hs_distribution()
0379         sorted_shares = self.tree.sort_branch_by_current_hs_distribution(self.__hs_distribution)
0380 
0381         sorted_shares_export = []
0382         for share in sorted_shares:
0383             sorted_shares_export.append(
0384                 {
0385                     "name": share.name,
0386                     "running": self.__hs_distribution[share.name]["executing"],
0387                     "target": self.__hs_distribution[share.name]["pledged"],
0388                     "queuing": self.__hs_distribution[share.name]["queued"],
0389                 }
0390             )
0391         return sorted_shares_export
0392 
0393     def is_valid_share(self, share_name):
0394         """
0395         Checks whether the share is a valid leave share
0396         """
0397         self.reload_shares()
0398         for share in self.leave_shares:
0399             if share_name == share.name:
0400                 # Share found
0401                 return True
0402 
0403         # Share not found
0404         return False
0405 
0406     def reassignShare(self, jedi_task_ids, gshare, reassign_running):
0407         """
0408         Will reassign all tasks and their jobs that have not yet completed to specified share
0409         @param jedi_task_ids: task ids
0410         @param gshare: dest share
0411         @param reassign_running: whether to reassign running jobs
0412         """
0413 
0414         comment = " /* DBProxy.reassignShare */"
0415         tmp_log = self.create_tagged_logger(comment)
0416         tmp_log.debug(f"Start reassigning {jedi_task_ids} to gshare={gshare} and reassign_running={reassign_running}")
0417 
0418         try:
0419             if not self.is_valid_share(gshare):
0420                 error_msg = f"Share {gshare} is not a leave share "
0421                 tmp_log.debug(error_msg)
0422                 ret_val = 1, error_msg
0423                 tmp_log.debug(error_msg)
0424                 return ret_val
0425 
0426             # begin transaction
0427             self.conn.begin()
0428 
0429             # update in shards of 100 task ids
0430             for shard in create_shards(jedi_task_ids, 100):
0431                 # Prepare the bindings
0432                 var_map = {}
0433                 shard_taskid_set = set()
0434                 i = 0
0435                 for _task_id in shard:
0436                     jedi_task_id = int(_task_id)
0437                     var_map[f":jtid{i}"] = jedi_task_id
0438                     i += 1
0439                     shard_taskid_set.add(jedi_task_id)
0440                 jtid_bindings = ",".join(f":jtid{i}" for i in range(len(shard_taskid_set)))
0441 
0442                 # select only tasks without lock
0443                 sql_tasks_not_locked = f"""
0444                        SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Tasks
0445                        WHERE jediTaskID IN ({jtid_bindings}) AND lockedBy IS NULL
0446                        """
0447 
0448                 self.cur.execute(sql_tasks_not_locked + comment, var_map)
0449                 res = self.cur.fetchall()
0450 
0451                 # Update the bindings and prepare var map
0452                 var_map = {":gshare": gshare}
0453                 good_taskid_set = set()
0454                 i = 0
0455                 for (_task_id,) in res:
0456                     jedi_task_id = int(_task_id)
0457                     var_map[f":jtid{i}"] = jedi_task_id
0458                     i += 1
0459                     good_taskid_set.add(jedi_task_id)
0460                 jtid_bindings = ",".join(f":jtid{i}" for i in range(len(good_taskid_set)))
0461                 locked_taskid_set = shard_taskid_set - good_taskid_set
0462                 if locked_taskid_set:
0463                     tmp_log.debug(f"skip locked tasks: {','.join([str(i) for i in locked_taskid_set])}")
0464 
0465                 # Skip if there are no tasks to update
0466                 if not jtid_bindings:
0467                     continue
0468 
0469                 # update the task
0470                 sql_task = f"""
0471                        UPDATE ATLAS_PANDA.jedi_tasks set gshare=:gshare
0472                        WHERE jeditaskid IN ({jtid_bindings})
0473                        """
0474 
0475                 self.cur.execute(sql_task + comment, var_map)
0476                 tmp_log.debug(f"""set tasks {",".join([str(i) for i in good_taskid_set])} to gshare={gshare}""")
0477 
0478                 var_map[":pending"] = "pending"
0479                 var_map[":defined"] = "defined"
0480                 var_map[":assigned"] = "assigned"
0481                 var_map[":waiting"] = "waiting"
0482                 var_map[":activated"] = "activated"
0483                 jobstatus = ":pending, :defined, :assigned, :waiting, :activated"
0484 
0485                 # add running status in case these jobs also need to be reassigned
0486                 if reassign_running:
0487                     jobstatus = f"{jobstatus}, :running, :starting"
0488                     var_map[":running"] = "running"
0489                     var_map[":starting"] = "starting"
0490 
0491                 # update the jobs
0492                 sql_jobs = """
0493                        UPDATE ATLAS_PANDA.{0} set gshare=:gshare
0494                        WHERE jeditaskid IN ({1})
0495                        AND jobstatus IN ({2})
0496                        """
0497 
0498                 for table in ["jobsactive4", "jobsdefined4"]:
0499                     self.cur.execute(
0500                         sql_jobs.format(table, jtid_bindings, jobstatus) + comment,
0501                         var_map,
0502                     )
0503 
0504             # commit
0505             if not self._commit():
0506                 raise RuntimeError("Commit error")
0507 
0508             tmp_log.debug("done")
0509             return 0, None
0510 
0511         except Exception:
0512             # roll back
0513             self._rollback()
0514             # dump error
0515             self.dump_error_message(tmp_log)
0516             return -1, None
0517 
0518     def reload_resource_spec_mapper(self):
0519         # update once per hour only
0520         if self.__t_update_resource_type_mapper and self.__t_update_resource_type_mapper > datetime.datetime.now() - datetime.timedelta(hours=1):
0521             return
0522 
0523         # get the resource types from the DB and make the ResourceSpecMapper object
0524         resource_types = self.load_resource_types(use_commit=False)
0525         if resource_types:
0526             self.resource_spec_mapper = ResourceSpecMapper(resource_types)
0527             self.__t_update_resource_type_mapper = datetime.datetime.now()
0528         return
0529 
0530     def load_resource_types(self, formatting="spec", use_commit=True):
0531         """
0532         Load the resource type table to memory
0533         """
0534         comment = " /* JediDBProxy.load_resource_types */"
0535         tmp_log = self.create_tagged_logger(comment)
0536         tmp_log.debug("start")
0537         try:
0538             sql = f"SELECT {ResourceSpec.column_names()} FROM {panda_config.schemaJEDI}.resource_types "
0539             self.cur.execute(sql + comment)
0540             resource_list = self.cur.fetchall()
0541             resource_spec_list = []
0542             for row in resource_list:
0543                 resource_name, mincore, maxcore, minrampercore, maxrampercore = row
0544                 if formatting == "dict":
0545                     resource_dict = {
0546                         "resource_name": resource_name,
0547                         "mincore": mincore,
0548                         "maxcore": maxcore,
0549                         "minrampercore": minrampercore,
0550                         "maxrampercore": maxrampercore,
0551                     }
0552                     resource_spec_list.append(resource_dict)
0553                 else:
0554                     resource_spec_list.append(
0555                         ResourceSpec(
0556                             resource_name,
0557                             mincore,
0558                             maxcore,
0559                             minrampercore,
0560                             maxrampercore,
0561                         )
0562                     )
0563             # commit for postgres to avoid idle transactions
0564             if use_commit:
0565                 if not self._commit():
0566                     raise RuntimeError("Commit error")
0567             tmp_log.debug("done")
0568             return resource_spec_list
0569         except Exception:
0570             if use_commit:
0571                 self._rollback()
0572             self.dump_error_message(tmp_log)
0573             return []
0574 
0575     def get_resource_type_task(self, task_spec):
0576         """
0577         Identify the resource type of the task based on the resource type map.
0578         Return the name of the resource type
0579         """
0580         comment = " /* JediDBProxy.get_resource_type_task */"
0581         tmp_log = self.create_tagged_logger(comment)
0582         tmp_log.debug("start")
0583 
0584         resource_map = self.load_resource_types()
0585 
0586         for resource_spec in resource_map:
0587             if resource_spec.match_task(task_spec):
0588                 tmp_log.debug(f"done. resource_type is {resource_spec.resource_name}")
0589                 return resource_spec.resource_name
0590 
0591         tmp_log.debug("done. resource_type is Undefined")
0592         return "Undefined"
0593 
0594     def reset_resource_type_task(self, jedi_task_id, use_commit=True):
0595         """
0596         Retrieve the relevant task parameters and reset the resource type
0597         """
0598         comment = " /* JediDBProxy.reset_resource_type */"
0599         tmp_log = self.create_tagged_logger(comment)
0600         tmp_log.debug("start")
0601 
0602         # 1. Get the task parameters
0603         var_map = {":jedi_task_id": jedi_task_id}
0604         sql = f"SELECT corecount, ramcount, baseramcount, ramunit FROM {panda_config.schemaJEDI}.jedi_tasks WHERE jeditaskid = :jedi_task_id "
0605         self.cur.execute(sql + comment, var_map)
0606         corecount, ramcount, baseramcount, ramunit = self.cur.fetchone()
0607         tmp_log.debug(
0608             "retrieved following values for jediTaskid {0}: corecount {1}, ramcount {2}, baseramcount {3}, ramunit {4}".format(
0609                 jedi_task_id, corecount, ramcount, baseramcount, ramunit
0610             )
0611         )
0612 
0613         # 2. Load the resource types and figure out the matching one
0614         resource_map = self.load_resource_types(use_commit=False)
0615         resource_name = "Undefined"
0616         for resource_spec in resource_map:
0617             if resource_spec.match_task_basic(corecount, ramcount, baseramcount, ramunit):
0618                 resource_name = resource_spec.resource_name
0619                 break
0620 
0621         tmp_log.debug(f"decided resource_type {resource_name} jediTaskid {jedi_task_id}")
0622 
0623         # 3. Update the task
0624         try:
0625             var_map = {":jedi_task_id": jedi_task_id, ":resource_type": resource_name}
0626             sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET resource_type = :resource_type WHERE jeditaskid = :jedi_task_id "
0627             tmp_log.debug("conn begin...")
0628             if use_commit:
0629                 self.conn.begin()
0630             tmp_log.debug("execute...")
0631             self.cur.execute(sql + comment, var_map)
0632             tmp_log.debug("commit...")
0633             if use_commit:
0634                 if not self._commit():
0635                     raise RuntimeError("Commit error")
0636                 tmp_log.debug("committed...")
0637         except Exception:
0638             # roll back
0639             if use_commit:
0640                 tmp_log.debug("rolling back...")
0641                 self._rollback()
0642             self.dump_error_message(tmp_log)
0643             tmp_log.error(f"{comment}: {sql} {var_map}")
0644             return False
0645 
0646         tmp_log.debug("done")
0647         return True
0648 
0649     def get_resource_type_job(self, job_spec):
0650         """
0651         Identify the resource type of the job based on the resource type map.
0652         Return the name of the resource type
0653         """
0654         comment = " /* JediDBProxy.get_resource_type_job */"
0655         tmp_log = self.create_tagged_logger(comment)
0656         tmp_log.debug("start")
0657 
0658         resource_map = self.load_resource_types()
0659         tmp_log.debug(
0660             "going to call match_job for pandaid {0} with minRamCount {1} (type{2}) and coreCount {3} (type{4})".format(
0661                 job_spec.PandaID,
0662                 job_spec.minRamCount,
0663                 type(job_spec.minRamCount),
0664                 job_spec.coreCount,
0665                 type(job_spec.coreCount),
0666             )
0667         )
0668         resource_type = JobUtils.get_resource_type_job(resource_map, job_spec)
0669         tmp_log.debug(f"done. resource_type is {resource_type}")
0670         return resource_type
0671 
0672     # get the resource type of a site
0673     def get_rtype_site(self, site):
0674         comment = " /* DBProxy.get_rtype_site */"
0675         tmp_log = self.create_tagged_logger(comment)
0676         tmp_log.debug("start")
0677         try:
0678             var_map = {":site": site}
0679             sql = "SELECT /* use_json_type */ scj.data.resource_type FROM ATLAS_PANDA.schedconfig_json scj WHERE panda_queue=:site "
0680 
0681             # start transaction
0682             self.conn.begin()
0683             self.cur.arraysize = 1
0684             self.cur.execute(sql + comment, var_map)
0685             rtype = self.cur.fetchone()[0]
0686             # commit
0687             if not self._commit():
0688                 raise RuntimeError("Commit error")
0689             tmp_log.debug(f"site {site} has rtype: {rtype} ")
0690             return rtype
0691         except Exception:
0692             # roll back
0693             self._rollback()
0694             # error
0695             self.dump_error_message(tmp_log)
0696             return None
0697 
0698     def compare_share_task(self, share, task):
0699         """
0700         Logic to compare the relevant fields of share and task.
0701         Return: False if some condition does NOT match. True if all conditions match.
0702         """
0703         if share.prodsourcelabel is not None and task.prodSourceLabel is not None and re.match(share.prodsourcelabel, task.prodSourceLabel) is None:
0704             return False
0705 
0706         working_group = task.workingGroup
0707         if working_group is None:
0708             working_group = " "
0709 
0710         if share.workinggroup is not None and working_group is not None and re.match(share.workinggroup, working_group) is None:
0711             return False
0712 
0713         if share.campaign is not None and task.campaign and re.match(share.campaign, task.campaign) is None:
0714             return False
0715 
0716         if share.processingtype is not None and task.processingType is not None and re.match(share.processingtype, task.processingType) is None:
0717             return False
0718 
0719         if share.transpath is not None and task.transPath is not None and re.match(share.transpath, task.transPath) is None:
0720             return False
0721 
0722         if share.rtype is not None and task.site is not None:
0723             try:
0724                 site = task.site.split(",")[0]  # if task assigned to more than one site, take the first one
0725                 rtype_site = self.get_rtype_site(site)
0726                 if rtype_site and re.match(share.rtype, rtype_site) is None:
0727                     return False
0728             except Exception:
0729                 return False
0730 
0731         return True
0732 
0733     def compare_share_job(self, share, job):
0734         """
0735         Logic to compare the relevant fields of share and job. It's basically the same as compare_share_task, but
0736         does not check for the campaign field, which is not part of the job
0737         """
0738 
0739         if share.prodsourcelabel is not None and re.match(share.prodsourcelabel, job.prodSourceLabel) is None:
0740             return False
0741 
0742         if share.workinggroup is not None and re.match(share.workinggroup, job.workingGroup) is None:
0743             return False
0744 
0745         if share.processingtype is not None and re.match(share.processingtype, job.processingType) is None:
0746             return False
0747 
0748         if share.rtype is not None and job.computingSite is not None:
0749             try:
0750                 site = job.computingSite.split(",")[0]
0751                 rtype_site = self.get_rtype_site(site)
0752                 if re.match(share.rtype, rtype_site) is None:
0753                     return False
0754             except Exception:
0755                 return False
0756 
0757         return True
0758 
0759     def get_share_for_task(self, task):
0760         """
0761         Return the share based on a task specification
0762         """
0763         self.reload_shares()
0764         selected_share_name = "Undefined"
0765 
0766         for share in self.leave_shares:
0767             if self.compare_share_task(share, task):
0768                 selected_share_name = share.name
0769                 break
0770 
0771         if selected_share_name == "Undefined":
0772             self._log_stream.warning(
0773                 "No share matching jediTaskId={0} (prodSourceLabel={1} workingGroup={2} campaign={3} transpath={4} site={5})".format(
0774                     task.jediTaskID,
0775                     task.prodSourceLabel,
0776                     task.workingGroup,
0777                     task.campaign,
0778                     task.transPath,
0779                     task.site,
0780                 )
0781             )
0782 
0783         return selected_share_name
0784 
0785     def get_share_for_job(self, job):
0786         """
0787         Return the share based on a job specification
0788         """
0789         # special case: esmerge jobs go to Express share
0790         if job.eventService == EventServiceUtils.esMergeJobFlagNumber:
0791             return "Express"
0792 
0793         self.reload_shares()
0794         selected_share_name = "Undefined"
0795 
0796         for share in self.leave_shares:
0797             if self.compare_share_job(share, job):
0798                 selected_share_name = share.name
0799                 break
0800 
0801         if selected_share_name == "Undefined":
0802             self._log_stream.warning(f"No share matching PandaID={job.PandaID} (prodSourceLabel={job.prodSourceLabel} workingGroup={job.workingGroup})")
0803 
0804         return selected_share_name
0805 
0806     # get dispatch sorting criteria
0807     def getSortingCriteria(self, site_name, max_jobs):
0808         comment = " /* DBProxy.getSortingCriteria */"
0809         tmp_log = self.create_tagged_logger(comment)
0810         # throw the dice to decide the algorithm
0811         random_number = random.randrange(100)
0812 
0813         sloppy_ratio = self.getConfigValue("jobdispatch", "SLOPPY_DISPATCH_RATIO")
0814         if not sloppy_ratio:
0815             sloppy_ratio = 10
0816 
0817         tmp_log.debug(f"random_number: {random_number} sloppy_ratio: {sloppy_ratio}")
0818 
0819         if random_number <= sloppy_ratio:
0820             # generate the age sorting
0821             tmp_log.debug(f"sorting by age")
0822             return self.getCriteriaByAge(site_name, max_jobs)
0823         else:
0824             # generate the global share sorting
0825             tmp_log.debug(f"sorting by gshare")
0826             return self.getCriteriaForGlobalShares(site_name, max_jobs)
0827 
0828     # get selection criteria for share of production activities
0829     def getCriteriaForGlobalShares(self, site_name, max_jobs):
0830         comment = " /* DBProxy.getCriteriaForGlobalShare */"
0831         tmp_log = self.create_tagged_logger(comment)
0832         # return for no criteria
0833         var_map = {}
0834         ret_empty = "", {}
0835 
0836         try:
0837             # Get the share leaves sorted by order of under-pledging
0838             tmp_log.debug(f"Going to call get sorted leaves")
0839             t_before = time.time()
0840             sorted_leaves = self.get_sorted_leaves()
0841             t_after = time.time()
0842             total = t_after - t_before
0843             tmp_log.debug(f"Sorting leaves took {total}s")
0844 
0845             i = 0
0846             tmp_list = []
0847             for leave in sorted_leaves:
0848                 var_map[f":leave{i}"] = leave.name
0849                 if leave.name == "Test":
0850                     # Test share will bypass others for the moment
0851                     tmp_list.append(f"WHEN gshare=:leave{i} THEN 0")
0852                 else:
0853                     tmp_list.append("WHEN gshare=:leave{0} THEN {0}".format(i))
0854                 i += 1
0855 
0856             # Only get max_jobs, to avoid getting all activated jobs from the table
0857             var_map[":njobs"] = max_jobs
0858 
0859             # We want to sort by global share, highest priority and lowest pandaid
0860             leave_bindings = " ".join(tmp_list)
0861             ret_sql = f"""
0862                       ORDER BY (CASE {leave_bindings} ELSE {len(sorted_leaves)} END), currentpriority desc, pandaid asc)
0863                       WHERE ROWNUM <= :njobs
0864                       """
0865 
0866             tmp_log.debug(f"ret_sql: {ret_sql}. var_map: {var_map}")
0867             return ret_sql, var_map
0868 
0869         except Exception:
0870             # roll back
0871             self._rollback()
0872             self.dump_error_message(tmp_log)
0873             return ret_empty
0874 
    # get selection criteria to dispatch the oldest jobs first, ignoring global shares
0876     def getCriteriaByAge(self, site_name, max_jobs):
0877         comment = " /* DBProxy.getCriteriaByAge */"
0878         tmp_log = self.create_tagged_logger(comment)
0879         # return for no criteria
0880         ret_sql = ""
0881         var_map = {}
0882         ret_empty = "", {}
0883 
0884         try:
0885             # Only get max_jobs, to avoid getting all activated jobs from the table
0886             var_map[":njobs"] = max_jobs
0887 
0888             # We want to ignore global share and just take the oldest pandaid
0889             ret_sql = """
0890                       ORDER BY pandaid asc)
0891                       WHERE ROWNUM <= :njobs
0892                       """
0893 
0894             tmp_log.debug(f"ret_sql: {ret_sql}. var_map: {var_map}")
0895             return ret_sql, var_map
0896 
0897         except Exception:
0898             # roll back
0899             self._rollback()
0900             self.dump_error_message(tmp_log)
0901             return ret_empty
0902 
0903     # set HS06sec
0904     def setHS06sec(self, pandaID, inActive=False):
0905         comment = " /* DBProxy.setHS06sec */"
0906         tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
0907         tmp_log.debug("start")
0908         hs06sec = None
0909 
0910         # sql to get job attributes
0911         sqlJ = "SELECT jediTaskID,startTime,endTime,actualCoreCount,coreCount,jobMetrics,computingSite "
0912         if inActive:
0913             sqlJ += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
0914         else:
0915             sqlJ += "FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
0916 
0917         # sql to update HS06sec
0918         if inActive:
0919             sqlU = "UPDATE ATLAS_PANDA.jobsActive4 "
0920         else:
0921             sqlU = "UPDATE ATLAS_PANDA.jobsArchived4 "
0922         sqlU += "SET hs06sec=:hs06sec WHERE PandaID=:PandaID "
0923 
0924         # get job attributes
0925         varMap = {}
0926         varMap[":PandaID"] = pandaID
0927         self.cur.execute(sqlJ + comment, varMap)
0928         resJ = self.cur.fetchone()
0929         if resJ is None:
0930             tmp_log.debug("skip since job not found")
0931         else:
0932             (
0933                 jediTaskID,
0934                 startTime,
0935                 endTime,
0936                 actualCoreCount,
0937                 defCoreCount,
0938                 jobMetrics,
0939                 computingSite,
0940             ) = resJ
0941             # get corePower
0942             corePower, tmpMsg = self.get_core_power(computingSite)
0943             if corePower is None:
0944                 tmp_log.debug(f"skip since corePower is undefined for site={computingSite}")
0945             else:
0946                 # get core count
0947                 coreCount = JobUtils.getCoreCount(actualCoreCount, defCoreCount, jobMetrics)
0948                 # get HS06sec
0949                 hs06sec = JobUtils.getHS06sec(startTime, endTime, corePower, coreCount)
0950                 if hs06sec is None:
0951                     tmp_log.debug("skip since HS06sec is None")
0952                 else:
0953                     # cap
0954                     hs06sec = int(hs06sec)
0955                     maxHS06sec = 999999999
0956                     if hs06sec > maxHS06sec:
0957                         hs06sec = maxHS06sec
0958                     # update HS06sec
0959                     varMap = {}
0960                     varMap[":PandaID"] = pandaID
0961                     varMap[":hs06sec"] = hs06sec
0962                     self.cur.execute(sqlU + comment, varMap)
0963                     tmp_log.debug(f"set HS06sec={hs06sec}")
0964         # return
0965         return hs06sec
0966 
0967     def convert_computingsite_to_region(self, computing_site):
0968         comment = " /* DBProxy.convert_computingsite_to_region */"
0969 
0970         var_map = {":panda_queue": computing_site}
0971         sql = "SELECT /* use_json_type */ scj.data.region FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:panda_queue"
0972         self.cur.arraysize = 100
0973         self.cur.execute(sql + comment, var_map)
0974         res_region = self.cur.fetchone()
0975         region = "GRID"  # when region is not defined, take average values
0976         if res_region:
0977             region = res_region[0]
0978 
0979         return region
0980 
0981     def get_co2_emissions_site(self, computing_site):
0982         comment = " /* DBProxy.get_co2_emissions_site */"
0983         region = self.convert_computingsite_to_region(computing_site)
0984         if not region:
0985             return None
0986 
0987         var_map = {":region": region}
0988         sql = "SELECT timestamp, region, value FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS WHERE region=:region"
0989         self.cur.execute(sql + comment, var_map)
0990         results = self.cur.fetchall()
0991         return results
0992 
0993     def get_co2_emissions_grid(self):
0994         comment = " /* DBProxy.get_co2_emissions_grid */"
0995 
0996         sql = "SELECT timestamp, region, value FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS WHERE region='GRID'"
0997         self.cur.execute(sql + comment)
0998         results = self.cur.fetchall()
0999         return results
1000 
1001     # set CO2 emissions
1002     def set_co2_emissions(self, panda_id, in_active=False):
1003         comment = " /* DBProxy.set_co2_emissions */"
1004         tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
1005         tmp_log.debug("start")
1006         gco2_regional, gco2_global = None, None
1007 
1008         # sql to get job attributes
1009         sql_read = "SELECT jediTaskID, startTime, endTime, actualCoreCount, coreCount, jobMetrics, computingSite "
1010         if in_active:
1011             sql_read += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
1012         else:
1013             sql_read += "FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
1014 
1015         # sql to update CO2 emissions
1016         if in_active:
1017             sql_update = "UPDATE ATLAS_PANDA.jobsActive4 "
1018         else:
1019             sql_update = "UPDATE ATLAS_PANDA.jobsArchived4 "
1020         sql_update += "SET gCO2_global=:gco2_global, gCO2_regional=:gco2_regional WHERE PandaID=:PandaID "
1021 
1022         # get job attributes
1023         var_map = {":PandaID": panda_id}
1024         self.cur.execute(sql_read + comment, var_map)
1025         res_read = self.cur.fetchone()
1026         if res_read is None:
1027             tmp_log.debug("skip since job not found")
1028         else:
1029             (
1030                 task_id,
1031                 start_time,
1032                 end_time,
1033                 actual_cores,
1034                 defined_cores,
1035                 job_metrics,
1036                 computing_site,
1037             ) = res_read
1038 
1039             # get core count
1040             core_count = JobUtils.getCoreCount(actual_cores, defined_cores, job_metrics)
1041 
1042             # get the queues watts per core value
1043             var_map = {":panda_queue": computing_site}
1044             sql_wpc = "SELECT /* use_json_type */ scj.data.coreenergy FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:panda_queue"
1045             self.cur.arraysize = 100
1046             self.cur.execute(sql_wpc + comment, var_map)
1047             res_wpc = self.cur.fetchone()
1048             try:
1049                 watts_per_core = float(res_wpc[0])
1050             except Exception:
1051                 watts_per_core = 10
1052             tmp_log.debug(f"using watts_per_core={watts_per_core} for computing_site={computing_site}")
1053 
1054             # get regional CO2 emissions
1055             co2_emissions = self.get_co2_emissions_site(computing_site)
1056             if not co2_emissions:
1057                 tmp_log.debug(f"skip since co2_emissions are undefined for site={computing_site}")
1058             else:
1059                 # get emitted CO2 for the job
1060                 gco2_regional = JobUtils.get_job_co2(start_time, end_time, core_count, co2_emissions, watts_per_core)
1061                 if gco2_regional is None:
1062                     tmp_log.debug("skip since the co2 emissions could not be calculated")
1063                 else:
1064                     max_gco2 = 999999999
1065                     gco2_regional = min(gco2_regional, max_gco2)
1066                     tmp_log.debug(f"set gco2_regional={gco2_regional}")
1067 
1068             # get globally averaged CO2 emissions
1069             co2_emissions = self.get_co2_emissions_grid()
1070             if not co2_emissions:
1071                 tmp_log.debug("skip since co2_emissions are undefined for the grid")
1072             else:
1073                 # get emitted CO2 for the job
1074                 gco2_global = JobUtils.get_job_co2(start_time, end_time, core_count, co2_emissions, watts_per_core)
1075                 if gco2_global is None:
1076                     tmp_log.debug("skip since the co2 emissions could not be calculated")
1077                 else:
1078                     max_gco2 = 999999999
1079                     gco2_global = min(gco2_global, max_gco2)
1080 
1081                     tmp_log.debug(f"set gco2_global={gco2_global}")
1082 
1083             var_map = {
1084                 ":PandaID": panda_id,
1085                 ":gco2_regional": gco2_regional,
1086                 ":gco2_global": gco2_global,
1087             }
1088             self.cur.execute(sql_update + comment, var_map)
1089 
1090         tmp_log.debug("done")
1091         # return
1092         return gco2_regional, gco2_global
1093 
1094     # get core power
1095     @memoize
1096     def get_core_power(self, site_id):
1097         comment = " /* DBProxy.get_core_power */"
1098         tmp_log = self.create_tagged_logger(comment, f"siteid={site_id}")
1099         tmp_log.debug("start")
1100 
1101         sqlS = "SELECT /* use_json_type */ scj.data.corepower FROM ATLAS_PANDA.schedconfig_json scj "
1102         sqlS += "WHERE panda_queue=:siteid "
1103 
1104         varMap = {":siteid": site_id}
1105 
1106         try:
1107             self.cur.arraysize = 100
1108             self.cur.execute(sqlS + comment, varMap)
1109             resS = self.cur.fetchone()
1110             core_power = None
1111             if resS is not None:
1112                 (core_power,) = resS
1113                 core_power = float(core_power)
1114             tmp_log.debug(f"got {core_power}")
1115             return core_power, None
1116 
1117         except Exception:
1118             # error
1119             self.dump_error_message(tmp_log)
1120             return None, "failed to get corePower"
1121 
1122     # convert ObjID to endpoint
1123     @memoize
1124     def convertObjIDtoEndPoint(self, srcFileName, objID):
1125         comment = " /* DBProxy.convertObjIDtoEndPoint */"
1126         tmp_log = self.create_tagged_logger(comment, f"ID={objID}")
1127         tmp_log.debug("start")
1128         try:
1129             for srcFile in srcFileName.split(","):
1130                 if not os.path.exists(srcFile):
1131                     continue
1132                 with open(srcFile) as f:
1133                     data = json.load(f)
1134                     for rseName in data:
1135                         rseData = data[rseName]
1136                         if objID in [rseData["id"], rseName]:
1137                             retMap = {
1138                                 "name": rseName,
1139                                 "is_deterministic": rseData["is_deterministic"],
1140                                 "type": rseData["type"],
1141                             }
1142                             tmp_log.debug(f"got {str(retMap)}")
1143                             return retMap
1144             tmp_log.debug("not found")
1145         except Exception:
1146             # error
1147             self.dump_error_message(tmp_log)
1148             return None
1149 
1150     def get_config_for_pq(self, pq_name):
1151         """
1152         Get the CRIC json configuration for a particular queue
1153         """
1154 
1155         comment = " /* DBProxy.get_config_for_pq */"
1156         tmp_log = self.create_tagged_logger(comment, pq_name)
1157         tmp_log.debug("start")
1158 
1159         var_map = {":pq": pq_name}
1160         sql_get_queue_config = """
1161         SELECT data FROM ATLAS_PANDA.SCHEDCONFIG_JSON
1162         WHERE panda_queue = :pq
1163         """
1164         tmp_v, pq_data = self.getClobObj(sql_get_queue_config + comment, var_map)
1165         if pq_data is None:
1166             tmp_log.error("Could not find queue configuration")
1167             return None
1168 
1169         try:
1170             pq_data_des = pq_data[0][0]
1171             if not isinstance(pq_data_des, dict):
1172                 pq_data_des = json.loads(pq_data_des)
1173         except Exception:
1174             tmp_log.error("Could not find queue configuration")
1175             return None
1176 
1177         tmp_log.debug("done")
1178         return pq_data_des
1179 
1180     def getQueuesInJSONSchedconfig(self):
1181         comment = " /* DBProxy.getQueuesInJSONSchedconfig */"
1182         tmp_log = self.create_tagged_logger(comment)
1183         tmp_log.debug("start")
1184         try:
1185             # sql to get workers
1186             sqlC = "SELECT /* use_json_type */ panda_queue FROM ATLAS_PANDA.schedconfig_json"
1187             # start transaction
1188             self.conn.begin()
1189             self.cur.execute(sqlC + comment)
1190             panda_queues = [row[0] for row in self.cur.fetchall()]
1191             # commit
1192             if not self._commit():
1193                 raise RuntimeError("Commit error")
1194             tmp_log.debug(f"got {len(panda_queues)} queues")
1195             return panda_queues
1196         except Exception:
1197             # roll back
1198             self._rollback()
1199             # error
1200             self.dump_error_message(tmp_log)
1201             return None
1202 
1203     # update queues
1204     def upsertQueuesInJSONSchedconfig(self, schedconfig_dump):
1205         comment = " /* DBProxy.upsertQueuesInJSONSchedconfig */"
1206         tmp_log = self.create_tagged_logger(comment)
1207         tmp_log.debug("start")
1208 
1209         if not schedconfig_dump:
1210             tmp_log.error("empty schedconfig dump")
1211             return "ERROR"
1212 
1213         try:
1214             existing_queues = self.getQueuesInJSONSchedconfig()
1215             if existing_queues is None:
1216                 tmp_log.error("Could not retrieve already existing queues")
1217                 return None
1218 
1219             # separate the queues to the ones we have to update (existing) and the ones we have to insert (new)
1220             var_map_insert = []
1221             var_map_update = []
1222             utc_now = naive_utcnow()
1223             for pq in schedconfig_dump:
1224                 data = json.dumps(schedconfig_dump[pq])
1225                 if not data:
1226                     tmp_log.error(f"no data for {pq}")
1227                     continue
1228 
1229                 if pq in existing_queues:
1230                     tmp_log.debug(f"pq {pq} present")
1231                     var_map_update.append({":pq": pq, ":data": data, ":last_update": utc_now})
1232                 else:
1233                     tmp_log.debug(f"pq {pq} is new")
1234                     var_map_insert.append({":pq": pq, ":data": data, ":last_update": utc_now})
1235 
1236             # start transaction
1237             self.conn.begin()
1238 
1239             # run the updates
1240             if var_map_update:
1241                 sql_update = """
1242                              UPDATE ATLAS_PANDA.SCHEDCONFIG_JSON SET data = :data, last_update = :last_update
1243                              WHERE panda_queue = :pq
1244                              """
1245                 tmp_log.debug("start updates")
1246                 self.cur.executemany(sql_update + comment, var_map_update)
1247                 tmp_log.debug("finished updates")
1248 
1249             # run the inserts
1250             if var_map_insert:
1251                 sql_insert = """
1252                              INSERT INTO ATLAS_PANDA.SCHEDCONFIG_JSON (panda_queue, data, last_update)
1253                              VALUES (:pq, :data, :last_update)
1254                              """
1255                 tmp_log.debug("start inserts")
1256                 self.cur.executemany(sql_insert + comment, var_map_insert)
1257                 tmp_log.debug("finished inserts")
1258 
1259             # delete inactive queues
1260             tmp_log.debug("Going to delete obsoleted queues")
1261             sql_delete = """
1262                          DELETE FROM ATLAS_PANDA.SCHEDCONFIG_JSON WHERE last_update < current_date - INTERVAL '7' DAY
1263                          """
1264             self.cur.execute(sql_delete + comment)
1265             tmp_log.debug("deleted old queues")
1266 
1267             if not self._commit():
1268                 raise RuntimeError("Commit error")
1269 
1270             tmp_log.debug("done")
1271             return "OK"
1272 
1273         except Exception:
1274             # roll back
1275             self._rollback()
1276             self.dump_error_message(tmp_log)
1277             return "ERROR"
1278 
    # reload the software tags of all queues
1280     def loadSWTags(self, sw_tags):
1281         comment = " /* DBProxy.loadSWTags */"
1282         tmp_log = self.create_tagged_logger(comment)
1283         tmp_log.debug("start")
1284 
1285         if not sw_tags:
1286             tmp_log.error("empty sw tag dump")
1287             return "ERROR"
1288 
1289         try:
1290             var_map_tags = []
1291 
1292             utc_now = naive_utcnow()
1293             for pq in sw_tags:
1294                 data = sw_tags[pq]
1295                 var_map_tags.append({":pq": pq, ":data": json.dumps(data), ":last_update": utc_now})
1296 
1297             # start transaction on SW_TAGS table
1298             # delete everything in the table to start every time from a clean table
1299             # cleaning and filling needs to be done within the same transaction
1300             self.conn.begin()
1301 
1302             sql_delete = "DELETE FROM ATLAS_PANDA.SW_TAGS"
1303             tmp_log.debug("start cleaning up SW_TAGS table")
1304             self.cur.execute(sql_delete + comment)
1305             tmp_log.debug("done cleaning up SW_TAGS table")
1306 
1307             sql_insert = "INSERT INTO ATLAS_PANDA.SW_TAGS (panda_queue, data, last_update) VALUES (:pq, :data, :last_update)"
1308             tmp_log.debug("start filling up SW_TAGS table")
1309             for shard in create_shards(var_map_tags, 100):  # insert in batches of 100 rows
1310                 self.cur.executemany(sql_insert + comment, shard)
1311             tmp_log.debug("done filling up table")
1312             if not self._commit():
1313                 raise RuntimeError("Commit error")
1314 
1315             tmp_log.debug("done")
1316             return "OK"
1317 
1318         except Exception:
1319             # roll back
1320             self._rollback()
1321             self.dump_error_message(tmp_log)
1322             return "ERROR"
1323 
1324     # get working group with production role
1325     def getWorkingGroup(self, fqans):
1326         for fqan in fqans:
1327             # check production role
1328             match = re.search("/[^/]+/([^/]+)/Role=production", fqan)
1329             if match is not None:
1330                 return match.group(1)
1331         return None
1332 
1333     # update site data
1334     def updateSiteData(self, hostID, pilotRequests, interval):
1335         comment = " /* DBProxy.updateSiteData */"
1336         tmp_log = self.create_tagged_logger(comment)
1337         tmp_log.debug("start")
1338 
1339         sqlDel = "DELETE FROM ATLAS_PANDAMETA.SiteData WHERE LASTMOD < :LASTMOD"
1340 
1341         sqlRst = (
1342             "UPDATE ATLAS_PANDAMETA.SiteData "
1343             "SET GETJOB = :GETJOB, UPDATEJOB = :UPDATEJOB, NOJOB = :NOJOB, "
1344             "GETJOBABS = :GETJOBABS, UPDATEJOBABS = :UPDATEJOBABS, NOJOBABS = :NOJOBABS "
1345             "WHERE HOURS = :HOURS AND LASTMOD < :LASTMOD"
1346         )
1347 
1348         sqlCh = "SELECT * FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE FOR UPDATE NOWAIT "
1349 
1350         sqlIn = (
1351             "INSERT INTO ATLAS_PANDAMETA.SiteData "
1352             "(SITE, FLAG, HOURS, GETJOB, UPDATEJOB, NOJOB, GETJOBABS, UPDATEJOBABS, NOJOBABS, "
1353             "LASTMOD, NSTART, FINISHED, FAILED, DEFINED, ASSIGNED, WAITING, ACTIVATED, HOLDING, RUNNING, TRANSFERRING) "
1354             "VALUES (:SITE, :FLAG, :HOURS, :GETJOB, :UPDATEJOB, :NOJOB, :GETJOBABS, :UPDATEJOBABS, :NOJOBABS, CURRENT_DATE, "
1355             "0, 0, 0, 0, 0, 0, 0, 0, 0, 0)"
1356         )
1357 
1358         sqlUp = (
1359             "UPDATE ATLAS_PANDAMETA.SiteData "
1360             "SET GETJOB = :GETJOB, UPDATEJOB = :UPDATEJOB, NOJOB = :NOJOB, "
1361             "GETJOBABS = :GETJOBABS, UPDATEJOBABS = :UPDATEJOBABS, NOJOBABS = :NOJOBABS, LASTMOD = CURRENT_DATE "
1362             "WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"
1363         )
1364 
1365         sqlAll = "SELECT GETJOB, UPDATEJOB, NOJOB, GETJOBABS, UPDATEJOBABS, NOJOBABS, FLAG FROM ATLAS_PANDAMETA.SiteData WHERE HOURS = :HOURS AND SITE = :SITE"
1366 
1367         try:
1368             timeNow = naive_utcnow()
1369             self.conn.begin()
1370             # delete old records
1371             varMap = {}
1372             varMap[":LASTMOD"] = timeNow - datetime.timedelta(hours=48)
1373             self.cur.execute(sqlDel + comment, varMap)
1374             # set 0 to old records
1375             varMap = {}
1376             varMap[":HOURS"] = interval
1377             varMap[":GETJOB"] = 0
1378             varMap[":UPDATEJOB"] = 0
1379             varMap[":NOJOB"] = 0
1380             varMap[":GETJOBABS"] = 0
1381             varMap[":UPDATEJOBABS"] = 0
1382             varMap[":NOJOBABS"] = 0
1383             varMap[":LASTMOD"] = timeNow - datetime.timedelta(hours=interval)
1384             self.cur.execute(sqlRst + comment, varMap)
1385             # commit
1386             if not self._commit():
1387                 raise RuntimeError("Commit error")
1388             # shuffle to avoid concatenation
1389             tmpSiteList = list(pilotRequests)
1390             random.shuffle(tmpSiteList)
1391             # loop over all sites
1392             for tmpSite in tmpSiteList:
1393                 tmpVal = pilotRequests[tmpSite]
1394                 # start transaction
1395                 self.conn.begin()
1396                 # check individual host info first
1397                 varMap = {}
1398                 varMap[":FLAG"] = hostID
1399                 varMap[":SITE"] = tmpSite
1400                 varMap[":HOURS"] = interval
1401                 self.cur.arraysize = 10
1402                 locked = True
1403                 try:
1404                     # lock individual row
1405                     self.cur.execute(sqlCh + comment, varMap)
1406                 except Exception as e:
1407                     # skip since it is being locked by another
1408                     tmp_log.debug(f"skip to update {str(varMap)} due to {str(e)}")
1409                     locked = False
1410                 if locked:
1411                     res = self.cur.fetchone()
1412                     # row exists or not
1413                     if res is None:
1414                         sql = sqlIn
1415                     else:
1416                         sql = sqlUp
1417 
1418                     # getJob, updateJob and noJob entries contain the number of slots/nodes that submitted the request
1419                     # getJobAbs, updateJobAbs and noJobAbs entries contain the absolute number of requests
1420                     if "getJob" in tmpVal:
1421                         varMap[":GETJOB"] = len(tmpVal["getJob"])
1422                         getJobAbs = 0
1423                         for node in tmpVal["getJob"]:
1424                             getJobAbs += tmpVal["getJob"][node]
1425                         varMap[":GETJOBABS"] = getJobAbs
1426                     else:
1427                         varMap[":GETJOB"] = 0
1428                         varMap[":GETJOBABS"] = 0
1429 
1430                     if "updateJob" in tmpVal:
1431                         varMap[":UPDATEJOB"] = len(tmpVal["updateJob"])
1432                         updateJobAbs = 0
1433                         for node in tmpVal["updateJob"]:
1434                             updateJobAbs += tmpVal["updateJob"][node]
1435                         varMap[":UPDATEJOBABS"] = updateJobAbs
1436                     else:
1437                         varMap[":UPDATEJOB"] = 0
1438                         varMap[":UPDATEJOBABS"] = 0
1439 
1440                     if "noJob" in tmpVal:
1441                         varMap[":NOJOB"] = len(tmpVal["noJob"])
1442                         noJobAbs = 0
1443                         for node in tmpVal["noJob"]:
1444                             noJobAbs += tmpVal["noJob"][node]
1445                         varMap[":NOJOBABS"] = noJobAbs
1446                     else:
1447                         varMap[":NOJOB"] = 0
1448                         varMap[":NOJOBABS"] = 0
1449 
1450                     # update
1451                     self.cur.execute(sql + comment, varMap)
1452 
1453                 # commit
1454                 if not self._commit():
1455                     raise RuntimeError("Commit error")
1456 
1457                 if locked:
1458                     # start transaction
1459                     self.conn.begin()
1460                     # get all info
1461                     sumExist = False
1462                     varMap = {}
1463                     varMap[":SITE"] = tmpSite
1464                     varMap[":HOURS"] = interval
1465                     self.cur.arraysize = 100
1466                     self.cur.execute(sqlAll + comment, varMap)
1467                     res = self.cur.fetchall()
1468                     # get total getJob/updateJob
1469                     varMap[":GETJOB"] = 0
1470                     varMap[":UPDATEJOB"] = 0
1471                     varMap[":NOJOB"] = 0
1472                     varMap[":GETJOBABS"] = 0
1473                     varMap[":UPDATEJOBABS"] = 0
1474                     varMap[":NOJOBABS"] = 0
1475                     nCol = 0
1476                     for (
1477                         tmpGetJob,
1478                         tmpUpdateJob,
1479                         tmpNoJob,
1480                         tmpGetJobAbs,
1481                         tmpUpdateJobAbs,
1482                         tmpNoJobAbs,
1483                         tmpFlag,
1484                     ) in res:
1485                         # don't use summed info
1486                         if tmpFlag == "production":
1487                             sumExist = True
1488                             continue
1489                         if tmpFlag == "analysis":
1490                             if tmpSite.startswith("ANALY_"):
1491                                 sumExist = True
1492                             continue
1493                         if tmpFlag in ["test"]:
1494                             continue
1495 
1496                         if tmpGetJob is None:
1497                             tmpGetJob = 0
1498                         if tmpUpdateJob is None:
1499                             tmpUpdateJob = 0
1500                         if tmpNoJob is None:
1501                             tmpNoJob = 0
1502                         if tmpGetJobAbs is None:
1503                             tmpGetJobAbs = 0
1504                         if tmpUpdateJobAbs is None:
1505                             tmpUpdateJobAbs = 0
1506                         if tmpNoJobAbs is None:
1507                             tmpNoJobAbs = 0
1508 
1509                         # sum
1510                         varMap[":GETJOB"] += tmpGetJob
1511                         varMap[":UPDATEJOB"] += tmpUpdateJob
1512                         varMap[":NOJOB"] += tmpNoJob
1513                         varMap[":GETJOBABS"] += tmpGetJobAbs
1514                         varMap[":UPDATEJOBABS"] += tmpUpdateJobAbs
1515                         varMap[":NOJOBABS"] += tmpNoJobAbs
1516                         nCol += 1
1517                     # get average
1518                     if nCol != 0:
1519                         if varMap[":GETJOB"] >= nCol:
1520                             varMap[":GETJOB"] /= nCol
1521                         if varMap[":UPDATEJOB"] >= nCol:
1522                             varMap[":UPDATEJOB"] /= nCol
1523                         if varMap[":NOJOB"] >= nCol:
1524                             varMap[":NOJOB"] /= nCol
1525                         if varMap[":GETJOBABS"] >= nCol:
1526                             varMap[":GETJOBABS"] /= nCol
1527                         if varMap[":UPDATEJOBABS"] >= nCol:
1528                             varMap[":UPDATEJOBABS"] /= nCol
1529                         if varMap[":NOJOBABS"] >= nCol:
1530                             varMap[":NOJOBABS"] /= nCol
1531 
1532                     if tmpSite.startswith("ANALY_"):
1533                         varMap[":FLAG"] = "analysis"
1534                     else:
1535                         varMap[":FLAG"] = "production"
1536                     # row exists or not
1537                     locked_sum = True
1538                     if sumExist:
1539                         sql = sqlUp
1540                     else:
1541                         sql = sqlIn
1542                         # lock the summary row
1543                         var_map = {k: varMap[k] for k in [":FLAG", ":SITE", ":HOURS"]}
1544                         try:
1545                             # lock it
1546                             self.cur.execute(sqlCh + comment, var_map)
1547                         except Exception as e:
1548                             # skip since it is being locked by another
1549                             tmp_log.debug(f"skip to update {str(var_map)} due to {str(e)}")
1550                             locked_sum = False
1551                     # update
1552                     if locked_sum:
1553                         self.cur.execute(sql + comment, varMap)
1554                         tmp_log.debug(
1555                             " %s hours=%s getJob=%s updateJob=%s, noJob=%s, getJobAbs=%s updateJobAbs=%s, noJobAbs=%s"
1556                             % (
1557                                 tmpSite,
1558                                 interval,
1559                                 varMap[":GETJOB"],
1560                                 varMap[":UPDATEJOB"],
1561                                 varMap[":NOJOB"],
1562                                 varMap[":GETJOBABS"],
1563                                 varMap[":UPDATEJOBABS"],
1564                                 varMap[":NOJOBABS"],
1565                             )
1566                         )
1567                         # commit
1568                     if not self._commit():
1569                         raise RuntimeError("Commit error")
1570             tmp_log.debug("done")
1571             return True
1572         except Exception:
1573             # roll back
1574             self._rollback()
1575             self.dump_error_message(tmp_log)
1576             return False
1577 
1578     # get site data
1579     def getCurrentSiteData(self):
1580         comment = " /* DBProxy.getCurrentSiteData */"
1581         tmp_log = self.create_tagged_logger(comment)
1582         sql = "SELECT SITE,getJob,updateJob,FLAG FROM ATLAS_PANDAMETA.SiteData WHERE FLAG IN (:FLAG1,:FLAG2) and HOURS=3"
1583         varMap = {}
1584         varMap[":FLAG1"] = "production"
1585         varMap[":FLAG2"] = "analysis"
1586         try:
1587             # set autocommit on
1588             self.conn.begin()
1589             # select
1590             self.cur.arraysize = 10000
1591             self.cur.execute(sql + comment, varMap)
1592             res = self.cur.fetchall()
1593             # commit
1594             if not self._commit():
1595                 raise RuntimeError("Commit error")
1596             ret = {}
1597             for site, getJob, updateJob, flag in res:
1598                 if site.startswith("ANALY_"):
1599                     if flag != "analysis":
1600                         continue
1601                 else:
1602                     if flag != "production":
1603                         continue
1604                 ret[site] = {"getJob": getJob, "updateJob": updateJob}
1605             return ret
1606         except Exception:
1607             self.dump_error_message(tmp_log)
1608             # roll back
1609             self._rollback()
1610             return {}
1611 
1612     # insert nRunning in site data
1613     def insertnRunningInSiteData(self):
1614         comment = " /* DBProxy.insertnRunningInSiteData */"
1615         tmp_log = self.create_tagged_logger(comment)
1616         tmp_log.debug("start")
1617 
1618         sqlDel = "DELETE FROM ATLAS_PANDAMETA.SiteData WHERE FLAG IN (:FLAG1, :FLAG2) AND LASTMOD < CURRENT_DATE - 1"
1619 
1620         sqlRun = (
1621             "SELECT COUNT(*), computingSite "
1622             "FROM ATLAS_PANDA.jobsActive4 "
1623             "WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) "
1624             "AND jobStatus = :jobStatus "
1625             "GROUP BY computingSite"
1626         )
1627 
1628         sqlCh = "SELECT COUNT(*) FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"
1629 
1630         sqlIn = (
1631             "INSERT INTO ATLAS_PANDAMETA.SiteData "
1632             "(SITE, FLAG, HOURS, GETJOB, UPDATEJOB, LASTMOD, "
1633             "NSTART, FINISHED, FAILED, DEFINED, ASSIGNED, WAITING, "
1634             "ACTIVATED, HOLDING, RUNNING, TRANSFERRING) "
1635             "VALUES (:SITE, :FLAG, :HOURS, 0, 0, CURRENT_DATE, "
1636             "0, 0, 0, 0, 0, 0, 0, 0, :RUNNING, 0)"
1637         )
1638 
1639         sqlUp = "UPDATE ATLAS_PANDAMETA.SiteData SET RUNNING = :RUNNING, LASTMOD = CURRENT_DATE WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"
1640 
1641         sqlMax = "SELECT SITE, MAX(RUNNING) FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG GROUP BY SITE"
1642 
1643         try:
1644             # use offset(1000)+minutes for :HOURS
1645             timeNow = naive_utcnow()
1646             nHours = 1000 + timeNow.hour * 60 + timeNow.minute
1647             # delete old records
1648             varMap = {}
1649             varMap[":FLAG1"] = "max"
1650             varMap[":FLAG2"] = "snapshot"
1651             self.conn.begin()
1652             self.cur.execute(sqlDel + comment, varMap)
1653             # commit
1654             if not self._commit():
1655                 raise RuntimeError("Commit error")
1656             # get nRunning
1657             varMap = {}
1658             varMap[":jobStatus"] = "running"
1659             varMap[":prodSourceLabel1"] = "user"
1660             varMap[":prodSourceLabel2"] = "panda"
1661             self.conn.begin()
1662             self.cur.arraysize = 10000
1663             self.cur.execute(sqlRun + comment, varMap)
1664             res = self.cur.fetchall()
1665             # commit
1666             if not self._commit():
1667                 raise RuntimeError("Commit error")
1668             # loop over all sites
1669             for nRunning, computingSite in res:
1670                 # only ANALY_ sites
1671                 if not computingSite.startswith("ANALY_"):
1672                     continue
1673                 # check if the row is already there
1674                 varMap = {}
1675                 varMap[":FLAG"] = "snapshot"
1676                 varMap[":SITE"] = computingSite
1677                 varMap[":HOURS"] = nHours
1678                 # start transaction
1679                 self.conn.begin()
1680                 self.cur.arraysize = 10
1681                 self.cur.execute(sqlCh + comment, varMap)
1682                 res = self.cur.fetchone()
1683                 # row exists or not
1684                 if res[0] == 0:
1685                     sql = sqlIn
1686                 else:
1687                     sql = sqlUp
1688                 # set current nRunning
1689                 varMap = {}
1690                 varMap[":FLAG"] = "snapshot"
1691                 varMap[":SITE"] = computingSite
1692                 varMap[":HOURS"] = nHours
1693                 varMap[":RUNNING"] = nRunning
1694                 # insert or update
1695                 self.cur.execute(sql + comment, varMap)
1696                 # commit
1697                 if not self._commit():
1698                     raise RuntimeError("Commit error")
1699             # get max nRunning
1700             varMap = {}
1701             varMap[":FLAG"] = "snapshot"
1702             self.conn.begin()
1703             self.cur.arraysize = 10000
1704             self.cur.execute(sqlMax + comment, varMap)
1705             res = self.cur.fetchall()
1706             # commit
1707             if not self._commit():
1708                 raise RuntimeError("Commit error")
1709             # loop over all sites
1710             for computingSite, maxnRunning in res:
1711                 # start transaction
1712                 self.conn.begin()
1713                 # check if the row is already there
1714                 varMap = {}
1715                 varMap[":FLAG"] = "max"
1716                 varMap[":SITE"] = computingSite
1717                 varMap[":HOURS"] = 0
1718                 self.cur.arraysize = 10
1719                 self.cur.execute(sqlCh + comment, varMap)
1720                 res = self.cur.fetchone()
1721                 # row exists or not
1722                 if res[0] == 0:
1723                     sql = sqlIn
1724                 else:
1725                     sql = sqlUp
1726                 # set max nRunning
1727                 varMap = {}
1728                 varMap[":FLAG"] = "max"
1729                 varMap[":SITE"] = computingSite
1730                 varMap[":HOURS"] = 0
1731                 varMap[":RUNNING"] = maxnRunning
1732                 self.cur.execute(sql + comment, varMap)
1733                 # commit
1734                 if not self._commit():
1735                     raise RuntimeError("Commit error")
1736             tmp_log.debug("done")
1737             return True
1738         except Exception:
1739             # roll back
1740             self._rollback()
1741             self.dump_error_message(tmp_log)
1742             return False
1743 
1744     # get site info
1745     def getSiteInfo(self):
1746         comment = " /* DBProxy.getSiteInfo */"
1747         tmp_log = self.create_tagged_logger(comment)
1748         tmp_log.debug("start")
1749         try:
1750             # get DDM endpoints
1751             pandaEndpointMap, endpoint_detailed_status_summary = self.getDdmEndpoints()
1752 
1753             # sql to get site spec
1754             sql = """
1755                    SELECT /* use_json_type */ panda_queue, data, b.site_name, c.role
1756                    FROM (ATLAS_PANDA.schedconfig_json a
1757                    LEFT JOIN ATLAS_PANDA.panda_site b ON a.panda_queue = b.panda_site_name)
1758                    LEFT JOIN ATLAS_PANDA.site c ON b.site_name = c.site_name
1759                    WHERE panda_queue IS NOT NULL
1760                    """
1761             self.cur.arraysize = 10000
1762             # self.cur.execute(sql+comment)
1763             # resList = self.cur.fetchall()
1764             ret, resList = self.getClobObj(sql, {})
1765             if not resList:
1766                 tmp_log.error("Empty site list!")
1767 
1768             # set autocommit on
1769             self.conn.begin()
1770             # sql to get num slots
1771             sqlSL = "SELECT pandaQueueName, gshare, resourcetype, numslots FROM ATLAS_PANDA.Harvester_Slots "
1772             sqlSL += "WHERE (expirationTime IS NULL OR expirationTime>CURRENT_DATE) "
1773 
1774             num_slots_by_site = {}
1775             self.cur.execute(sqlSL + comment)
1776             resSL = self.cur.fetchall()
1777 
1778             for sl_queuename, sl_gshare, sl_resourcetype, sl_numslots in resSL:
1779                 if sl_numslots < 0:
1780                     continue
1781                 num_slots_by_site.setdefault(sl_queuename, {})
1782                 num_slots_by_site[sl_queuename].setdefault(sl_gshare, {})
1783                 num_slots_by_site[sl_queuename][sl_gshare][sl_resourcetype] = sl_numslots
1784 
1785             retList = {}
1786             if resList is not None:
1787                 # loop over all results
1788                 for res in resList:
1789                     try:  # don't let a problem with one queue break the whole map
1790                         # change None to ''
1791                         resTmp = []
1792                         for tmpItem in res:
1793                             if tmpItem is None:
1794                                 tmpItem = ""
1795                             resTmp.append(tmpItem)
1796 
1797                         siteid, queue_data_json, pandasite, role = resTmp
1798                         try:
1799                             if isinstance(queue_data_json, dict):
1800                                 queue_data = queue_data_json
1801                             else:
1802                                 queue_data = json.loads(queue_data_json)
1803                         except Exception:
1804                             tmp_log.error(f"loading json for queue {siteid} excepted. json was: {queue_data_json}")
1805                             continue
1806 
1807                         # skip invalid siteid
1808                         if siteid in [None, "", "ALL"] or not queue_data:
1809                             if siteid != "ALL":  # skip noisy error message for ALL
1810                                 tmp_log.error(f"siteid {siteid} had no queue_data {queue_data}")
1811                             continue
1812 
1813                         tmp_log.debug(f"processing queue {siteid}")
1814 
1815                         # instantiate SiteSpec
1816                         ret = SiteSpec.SiteSpec()
1817                         ret.sitename = siteid
1818                         ret.pandasite = pandasite
1819                         ret.role = role
1820 
1821                         ret.type = queue_data.get("type", "production")
1822                         ret.nickname = queue_data.get("nickname")
1823                         try:
1824                             ret.ddm = queue_data.get("ddm", "").split(",")[0]
1825                         except AttributeError:
1826                             ret.ddm = ""
1827                         try:
1828                             ret.cloud = queue_data.get("cloud", "").split(",")[0]
1829                         except AttributeError:
1830                             ret.cloud = ""
1831                         ret.memory = queue_data.get("memory")
1832                         ret.maxrss = queue_data.get("maxrss")
1833                         ret.minrss = queue_data.get("minrss")
1834                         ret.maxtime = queue_data.get("maxtime")
1835                         ret.status = queue_data.get("status")
1836                         ret.space = queue_data.get("space")
1837                         ret.maxinputsize = queue_data.get("maxinputsize")
1838                         ret.comment = queue_data.get("comment_")
1839                         ret.statusmodtime = queue_data.get("lastmod")
1840                         ret.catchall = queue_data.get("catchall")
1841                         ret.tier = queue_data.get("tier")
1842                         ret.jobseed = queue_data.get("jobseed")
1843                         ret.capability = queue_data.get("capability")
1844                         ret.workflow = queue_data.get("workflow")
1845                         ret.maxDiskio = queue_data.get("maxdiskio")
1846                         ret.pandasite_state = "ACTIVE"
1847                         ret.fairsharePolicy = queue_data.get("fairsharepolicy")
1848                         ret.defaulttoken = queue_data.get("defaulttoken")
1849 
1850                         ret.direct_access_lan = queue_data.get("direct_access_lan") is True
1851                         ret.direct_access_wan = queue_data.get("direct_access_wan") is True
1852 
1853                         ret.iscvmfs = queue_data.get("is_cvmfs") is True
1854 
1855                         if queue_data.get("corepower") is None:
1856                             ret.corepower = 0
1857                         else:
1858                             ret.corepower = queue_data.get("corepower")
1859 
1860                         ret.wnconnectivity = queue_data.get("wnconnectivity")
1861                         if ret.wnconnectivity == "":
1862                             ret.wnconnectivity = None
1863 
1864                         # maxwdir
1865                         try:
1866                             if queue_data.get("maxwdir") is None:
1867                                 ret.maxwdir = 0
1868                             else:
1869                                 ret.maxwdir = int(queue_data["maxwdir"])
1870                         except Exception:
1871                             if ret.maxinputsize in [0, None]:
1872                                 ret.maxwdir = 0
1873                             else:
1874                                 try:
1875                                     ret.maxwdir = ret.maxinputsize + 2000
1876                                 except Exception:
1877                                     ret.maxwdir = 16336
1878 
1879                         # mintime
1880                         if queue_data.get("mintime") is not None:
1881                             ret.mintime = queue_data["mintime"]
1882                         else:
1883                             ret.mintime = 0
1884 
1885                         # reliability
1886                         ret.reliabilityLevel = None
1887 
1888                         # pledged CPUs
1889                         ret.pledgedCPU = 0
1890                         if queue_data.get("pledgedcpu") not in ["", None]:
1891                             try:
1892                                 ret.pledgedCPU = int(queue_data["pledgedcpu"])
1893                             except Exception:
1894                                 pass
1895 
1896                         # core count
1897                         ret.coreCount = 0
1898                         if queue_data.get("corecount") not in ["", None]:
1899                             try:
1900                                 ret.coreCount = int(queue_data["corecount"])
1901                             except Exception:
1902                                 pass
1903 
1904                         # convert releases to list
1905                         ret.releases = []
1906                         if queue_data.get("releases"):
1907                             ret.releases = queue_data["releases"]
1908 
1909                         # convert validatedreleases to list
1910                         ret.validatedreleases = []
1911                         if queue_data.get("validatedreleases"):
1912                             for tmpRel in queue_data["validatedreleases"].split("|"):
1913                                 # remove white space
1914                                 tmpRel = tmpRel.strip()
1915                                 if tmpRel != "":
1916                                     ret.validatedreleases.append(tmpRel)
1917 
1918                         # limit of the number of transferring jobs
1919                         ret.transferringlimit = 0
1920                         if queue_data.get("transferringlimit") not in ["", None]:
1921                             try:
1922                                 ret.transferringlimit = int(queue_data["transferringlimit"])
1923                             except Exception:
1924                                 pass
1925 
1926                         # FAX
1927                         ret.allowfax = False
1928                         try:
1929                             if queue_data.get("catchall") is not None and "allowfax" in queue_data["catchall"]:
1930                                 ret.allowfax = True
1931                             if queue_data.get("allowfax") is True:
1932                                 ret.allowfax = True
1933                         except Exception:
1934                             pass
1935 
1936                         # DDM endpoints
1937                         ret.ddm_endpoints_input = {}
1938                         ret.ddm_endpoints_output = {}
1939                         if siteid in pandaEndpointMap:
1940                             for scope in pandaEndpointMap[siteid]:
1941                                 if "input" in pandaEndpointMap[siteid][scope]:
1942                                     ret.ddm_endpoints_input[scope] = pandaEndpointMap[siteid][scope]["input"]
1943                                 if "output" in pandaEndpointMap[siteid][scope]:
1944                                     ret.ddm_endpoints_output[scope] = pandaEndpointMap[siteid][scope]["output"]
1945                         else:
1946                             # empty
1947                             ret.ddm_endpoints_input["default"] = DdmSpec()
1948                             ret.ddm_endpoints_output["default"] = DdmSpec()
1949 
1950                         # initialize dictionary fields
1951                         ret.setokens_input = {}
1952                         ret.setokens_output = {}
1953                         ret.ddm_input = {}
1954                         for scope in ret.ddm_endpoints_input:
1955                             # mapping between token and endpoints
1956                             ret.setokens_input[scope] = ret.ddm_endpoints_input[scope].getTokenMap("input")
1957                             # set DDM to the default endpoint
1958                             ret.ddm_input[scope] = ret.ddm_endpoints_input[scope].getDefaultRead()
1959 
1960                         ret.ddm_output = {}
1961                         for scope in ret.ddm_endpoints_output:
1962                             # mapping between token and endpoints
1963                             ret.setokens_output[scope] = ret.ddm_endpoints_output[scope].getTokenMap("output")
1964                             # set DDM to the default endpoint
1965                             ret.ddm_output[scope] = ret.ddm_endpoints_output[scope].getDefaultWrite()
1966 
1967                         # object stores
1968                         try:
1969                             ret.objectstores = queue_data["objectstores"]
1970                         except Exception:
1971                             ret.objectstores = []
1972 
1973                         # default unified flag
1974                         ret.is_unified = False
1975 
1976                         # num slots
1977                         ret.num_slots_map = num_slots_by_site.get(siteid, {})
1978 
1979                         # scale with core count
1980                         if ret.use_per_core_attr() and ret.coreCount > 0:
1981                             if ret.maxrss:
1982                                 ret.maxrss = ret.maxrss * ret.coreCount
1983                             if ret.minrss:
1984                                 ret.minrss = ret.minrss * ret.coreCount
1985                             if ret.maxwdir:
1986                                 ret.maxwdir = ret.maxwdir * ret.coreCount
1987                             if ret.maxinputsize:
1988                                 ret.maxinputsize = ret.maxinputsize
1989 
1990                         # extra parameters for the queue
1991                         ret.extra_queue_params = queue_data.get("params", {})
1992 
1993                         # append
1994                         retList[ret.nickname] = ret
1995                     except Exception:
1996                         tmp_log.error(f"exception in queue: {traceback.format_exc()}")
1997                         continue
1998             # commit
1999             if not self._commit():
2000                 raise RuntimeError("Commit error")
2001             tmp_log.debug("done")
2002             return retList, endpoint_detailed_status_summary
2003         except Exception as e:
2004             # roll back
2005             self._rollback()
2006             # error
2007             self.dump_error_message(tmp_log)
2008             return {}, {}
2009 
2010     def getDdmEndpoints(self):
2011         """
2012         get list of ddm input endpoints
2013         """
2014         comment = " /* DBProxy.getDdmEndpoints */"
2015         tmp_log = self.create_tagged_logger(comment)
2016         tmp_log.debug(f"start")
2017 
2018         # get all ddm endpoints
2019         sql_ddm = "SELECT * FROM ATLAS_PANDA.ddm_endpoint "
2020         self.cur.arraysize = 10000
2021         self.cur.execute(f"{sql_ddm}{comment}")
2022         results_ddm = self.cur.fetchall()
2023 
2024         # extract the column names from the query
2025         column_names = [i[0].lower() for i in self.cur.description]
2026 
2027         # save the endpoints into a dictionary
2028         endpoint_dict = {}
2029         detailed_status_summary = {}
2030         for ddm_endpoint_row in results_ddm:
2031             tmp_endpoint = {}
2032             # unzip the ddm_endpoint row into a dictionary
2033             for column_name, column_val in zip(column_names, ddm_endpoint_row):
2034                 tmp_endpoint[column_name] = column_val
2035 
2036             ddm_endpoint_name = tmp_endpoint["ddm_endpoint_name"]
2037             try:
2038                 tmp_detailed_status = tmp_endpoint["detailed_status"]
2039                 if not isinstance(tmp_detailed_status, dict):
2040                     tmp_detailed_status = json.loads(tmp_detailed_status)
2041                 # make a summary of detailed status of all endpoints
2042                 if tmp_detailed_status:
2043                     for tmp_activity, tmp_status in tmp_detailed_status.items():
2044                         detailed_status_summary.setdefault(tmp_activity, {})
2045                         detailed_status_summary[tmp_activity].setdefault(tmp_status, [])
2046                         detailed_status_summary[tmp_activity][tmp_status].append(ddm_endpoint_name)
2047                 tmp_endpoint["detailed_status"] = tmp_detailed_status
2048             except Exception as e:
2049                 tmp_log.error(f"exception when decoding detailed_status for {ddm_endpoint_name}: {str(e)}")
2050                 tmp_endpoint["detailed_status"] = {}
2051             endpoint_dict[ddm_endpoint_name] = tmp_endpoint
2052 
2053         # get relationship between panda sites and ddm endpoints
2054         sql_panda_ddm = """
2055                SELECT pdr.panda_site_name, pdr.ddm_endpoint_name, pdr.is_local, de.ddm_spacetoken_name,
2056                       de.is_tape, pdr.default_read, pdr.default_write, pdr.roles, pdr.order_read, pdr.order_write,
2057                       nvl(pdr.scope, 'default') as scope, de.blacklisted_read
2058                FROM ATLAS_PANDA.panda_ddm_relation pdr, ATLAS_PANDA.ddm_endpoint de
2059                WHERE pdr.ddm_endpoint_name = de.ddm_endpoint_name
2060                """
2061         if self.backend == "mysql":
2062             sql_panda_ddm = """
2063                SELECT pdr.panda_site_name, pdr.ddm_endpoint_name, pdr.is_local, de.ddm_spacetoken_name,
2064                       de.is_tape, pdr.default_read, pdr.default_write, pdr.roles, pdr.order_read, pdr.order_write,
2065                       ifnull(pdr.scope, 'default') as scope, de.blacklisted
2066                FROM ATLAS_PANDA.panda_ddm_relation pdr, ATLAS_PANDA.ddm_endpoint de
2067                WHERE pdr.ddm_endpoint_name = de.ddm_endpoint_name
2068                """
2069 
2070         self.cur.execute(f"{sql_panda_ddm}{comment}")
2071         results_panda_ddm = self.cur.fetchall()
2072         column_names = [i[0].lower() for i in self.cur.description]
2073 
2074         # save the panda ddm relations into a dictionary
2075         panda_endpoint_map = {}
2076         for panda_ddm_row in results_panda_ddm:
2077             tmp_relation = {}
2078             for column_name, column_val in zip(column_names, panda_ddm_row):
2079                 # Default unavailable endpoint space to 0
2080                 if column_name.startswith("space_") and column_val is None:
2081                     column_val = 0
2082                 tmp_relation[column_name] = column_val
2083 
2084             # add the relations to the panda endpoint map
2085             panda_site_name = tmp_relation["panda_site_name"]
2086             scope = tmp_relation["scope"]
2087             panda_endpoint_map.setdefault(panda_site_name, {})
2088             panda_endpoint_map[panda_site_name].setdefault(scope, {})
2089 
2090             if panda_site_name not in panda_endpoint_map:
2091                 panda_endpoint_map[panda_site_name] = {
2092                     "input": DdmSpec(),
2093                     "output": DdmSpec(),
2094                 }
2095             if "read_lan" in tmp_relation["roles"] and tmp_relation["blacklisted_read"] != "Y":
2096                 panda_endpoint_map[panda_site_name][scope].setdefault("input", DdmSpec())
2097                 panda_endpoint_map[panda_site_name][scope]["input"].add(tmp_relation, endpoint_dict)
2098             if "write_lan" in tmp_relation["roles"]:
2099                 panda_endpoint_map[panda_site_name][scope].setdefault("output", DdmSpec())
2100                 panda_endpoint_map[panda_site_name][scope]["output"].add(tmp_relation, endpoint_dict)
2101 
2102         tmp_log.debug(f"done")
2103         return panda_endpoint_map, detailed_status_summary
2104 
2105     def get_cloud_list(self):
2106         """
2107         Get a list of distinct cloud names from the database.
2108         """
2109         comment = " /* DBProxy.get_cloud_list */"
2110         tmp_log = self.create_tagged_logger(comment)
2111         tmp_log.debug("start")
2112         try:
2113             with self.conn:
2114                 sql = (
2115                     f"SELECT /* use_json_type */ DISTINCT sj.data.cloud AS cloud "
2116                     f"FROM {panda_config.schemaPANDA}.schedconfig_json sj "
2117                     f"UNION "
2118                     f"SELECT 'WORLD' AS cloud "
2119                     f"FROM dual "
2120                     f"ORDER BY cloud"
2121                 )
2122                 self.cur.arraysize = 100
2123                 self.cur.execute(sql + comment)
2124                 results = self.cur.fetchall()
2125                 clouds = [result[0] for result in results]
2126 
2127             tmp_log.debug("done")
2128             return clouds
2129         except Exception:
2130             self.dump_error_message(tmp_log)
2131             self._rollback()
2132             return []
2133 
2134     # get users and groups to boost job priorities
2135     def get_dict_to_boost_job_prio(self, vo):
2136         comment = " /* DBProxy.get_dict_to_boost_job_prio */"
2137         tmp_log = self.create_tagged_logger(comment)
2138 
2139         if self.job_prio_boost_dict_update_time and datetime.datetime.now(datetime.timezone.utc).replace(
2140             tzinfo=None
2141         ) - self.job_prio_boost_dict_update_time < datetime.timedelta(minutes=15):
2142             return self.job_prio_boost_dict
2143         try:
2144             self.job_prio_boost_dict_update_time = naive_utcnow()
2145             self.job_prio_boost_dict = {}
2146             # get configs
2147             tmp_log = self.create_tagged_logger(comment)
2148             # get dicts
2149             res_dicts = self.getConfigValue("dbproxy", "USER_JOB_PRIO_BOOST_DICTS", "pandaserver")
2150             # parse list
2151             if res_dicts:
2152                 for tmp_item in res_dicts:
2153                     try:
2154                         tmp_name = tmp_item["name"]
2155                         tmp_type = tmp_item["type"]
2156                         tmp_prio = tmp_item["prio"]
2157                         tmp_expire = tmp_item.get("expire", None)
2158                         # check expiration
2159                         if tmp_expire:
2160                             tmp_expire = datetime.datetime.strptime(tmp_expire, "%Y%m%d")
2161                             if tmp_expire < naive_utcnow():
2162                                 continue
2163                         self.job_prio_boost_dict.setdefault(tmp_type, {})
2164                         self.job_prio_boost_dict[tmp_type][tmp_name] = int(tmp_prio)
2165                     except Exception as e:
2166                         tmp_log.error(str(e))
2167             tmp_log.debug(f"got {self.job_prio_boost_dict}")
2168             return self.job_prio_boost_dict
2169         except Exception:
2170             # roll back
2171             self._rollback()
2172             # error
2173             self.dump_error_message(tmp_log)
2174             return {}
2175 
2176     # set user secret
2177     def set_user_secret(self, owner, key, value):
2178         comment = " /* DBProxy.set_user_secret */"
2179         tmp_log = self.create_tagged_logger(comment, f"owner={owner} key={key}")
2180         try:
2181             # sql to check data
2182             sqlC = "SELECT data FROM ATLAS_PANDA.Secrets WHERE owner=:owner "
2183             # sql to insert dummy
2184             sqlI = "INSERT INTO ATLAS_PANDA.Secrets (owner, updated_at) " "VALUES(:owner,CURRENT_TIMESTAMP) "
2185             # sql to update data
2186             sqlU = "UPDATE ATLAS_PANDA.Secrets SET updated_at=CURRENT_TIMESTAMP,data=:data " "WHERE owner=:owner "
2187             # start transaction
2188             self.conn.begin()
2189             # check
2190             varMap = {}
2191             varMap[":owner"] = owner
2192             tmpS, tmpR = self.getClobObj(sqlC, varMap, use_commit=False)
2193             if not tmpR:
2194                 # insert dummy for new entry
2195                 self.cur.execute(sqlI + comment, varMap)
2196                 data = {}
2197             else:
2198                 data = json.loads(tmpR[0][0])
2199             # update
2200             if key is None:
2201                 # delete all
2202                 data = {}
2203             elif value is None:
2204                 # delete key
2205                 if key in data:
2206                     del data[key]
2207                 else:
2208                     file_key = f"___file___:{key}"
2209                     if file_key in data:
2210                         del data[file_key]
2211             else:
2212                 data[key] = value
2213             varMap = {}
2214             varMap[":owner"] = owner
2215             varMap[":data"] = json.dumps(data)
2216             self.cur.execute(sqlU + comment, varMap)
2217             # commit
2218             if not self._commit():
2219                 raise RuntimeError("Commit error")
2220             tmp_log.debug("done")
2221             return True, "OK"
2222         except Exception:
2223             # roll back
2224             self._rollback()
2225             # error
2226             self.dump_error_message(tmp_log)
2227             return False, "database error"
2228 
2229     # get user secrets
2230     def get_user_secrets(self, owner, keys=None, get_json=False, use_commit=True):
2231         comment = " /* DBProxy.get_user_secrets */"
2232         tmp_log = self.create_tagged_logger(comment, f"owner={owner} keys={keys}")
2233         try:
2234             # sql to get data
2235             sqlC = "SELECT data FROM ATLAS_PANDA.Secrets WHERE owner=:owner "
2236             # check
2237             varMap = {}
2238             varMap[":owner"] = owner
2239             tmpS, tmpR = self.getClobObj(sqlC, varMap, use_commit=use_commit)
2240             if not tmpR:
2241                 data = {}
2242                 if not get_json:
2243                     data = json.dumps({})
2244             else:
2245                 data = tmpR[0][0]
2246                 # return only interesting keys
2247                 if keys:
2248                     keys = set(keys.split(","))
2249                     data = json.loads(data)
2250                     for k in list(data):
2251                         if k not in keys:
2252                             data.pop(k)
2253                     if not get_json:
2254                         data = json.dumps(data)
2255                 else:
2256                     if get_json:
2257                         data = json.loads(data)
2258             tmp_log.debug(f"got data with length={len(data)}")
2259             return True, data
2260         except Exception:
2261             # error
2262             self.dump_error_message(tmp_log)
2263             return False, "database error"
2264 
2265     def configurator_write_sites(self, site_list):
2266         """
2267         Cache the CRIC site information in the PanDA database
2268         """
2269         comment = " /* DBProxy.configurator_write_sites */"
2270         tmp_log = self.create_tagged_logger(comment)
2271         tmp_log.debug("start")
2272 
2273         try:
2274             # begin transaction
2275             self.conn.begin()
2276 
2277             # get existing sites
2278             tmp_log.debug("getting existing sites")
2279             sql_get = "SELECT site_name FROM ATLAS_PANDA.site"
2280             self.cur.execute(sql_get + comment)
2281             results = self.cur.fetchall()
2282             site_name_list = list(map(lambda result: result[0], results))
2283             tmp_log.debug("finished getting existing sites")
2284 
2285             # see which sites need an update and which need to be inserted new
2286             var_map_insert = []
2287             var_map_update = []
2288             for site in site_list:
2289                 if site["site_name"] in site_name_list:
2290                     var_map_update.append(convert_dict_to_bind_vars(site))
2291                 else:
2292                     var_map_insert.append(convert_dict_to_bind_vars(site))
2293 
2294             tmp_log.debug("Updating sites")
2295             sql_update = "UPDATE ATLAS_PANDA.site set role=:role, tier_level=:tier_level WHERE site_name=:site_name"
2296             for shard in create_shards(var_map_update, 100):
2297                 self.cur.executemany(sql_update + comment, shard)
2298 
2299             tmp_log.debug("Inserting sites")
2300             sql_insert = "INSERT INTO ATLAS_PANDA.site (site_name, role, tier_level) " "VALUES(:site_name, :role, :tier_level)"
2301             for shard in create_shards(var_map_insert, 100):
2302                 self.cur.executemany(sql_insert + comment, shard)
2303 
2304             # commit
2305             if not self._commit():
2306                 raise RuntimeError("Commit error")
2307 
2308             tmp_log.debug("Done")
2309             return 0, None
2310 
2311         except Exception:
2312             self._rollback()
2313             self.dump_error_message(tmp_log)
2314             return -1, None
2315 
2316     def configurator_write_panda_sites(self, panda_site_list):
2317         comment = " /* DBProxy.configurator_write_panda_sites */"
2318         tmp_log = self.create_tagged_logger(comment)
2319         tmp_log.debug("start")
2320 
2321         try:
2322             # begin transaction
2323             self.conn.begin()
2324 
2325             # get existing panda sites
2326             tmp_log.debug("getting existing panda sites")
2327             sql_get = "SELECT panda_site_name FROM ATLAS_PANDA.panda_site"
2328             self.cur.execute(sql_get + comment)
2329             results = self.cur.fetchall()
2330             panda_site_name_list = list(map(lambda result: result[0], results))
2331             tmp_log.debug("finished getting existing panda sites")
2332 
2333             # see which sites need an update and which need to be inserted new
2334             var_map_insert = []
2335             var_map_update = []
2336             for panda_site in panda_site_list:
2337                 if panda_site["panda_site_name"] in panda_site_name_list:
2338                     var_map_update.append(convert_dict_to_bind_vars(panda_site))
2339                 else:
2340                     var_map_insert.append(convert_dict_to_bind_vars(panda_site))
2341 
2342             tmp_log.debug("Updating panda sites")
2343             sql_update = "UPDATE ATLAS_PANDA.panda_site set site_name=:site_name WHERE panda_site_name=:panda_site_name "
2344             for shard in create_shards(var_map_update, 100):
2345                 self.cur.executemany(sql_update + comment, shard)
2346 
2347             tmp_log.debug("Inserting panda sites")
2348             sql_insert = "INSERT INTO ATLAS_PANDA.panda_site (panda_site_name, site_name) " "VALUES(:panda_site_name, :site_name)"
2349             for shard in create_shards(var_map_insert, 100):
2350                 self.cur.executemany(sql_insert + comment, shard)
2351 
2352             # commit
2353             if not self._commit():
2354                 raise RuntimeError("Commit error")
2355 
2356             tmp_log.debug("Done")
2357             return 0, None
2358 
2359         except Exception:
2360             self._rollback()
2361             self.dump_error_message(tmp_log)
2362             return -1, None
2363 
2364     def configurator_write_ddm_endpoints(self, ddm_endpoint_list):
2365         comment = " /* DBProxy.configurator_write_ddm_endpoints */"
2366         tmp_log = self.create_tagged_logger(comment)
2367         tmp_log.debug("start")
2368 
2369         try:
2370             # begin transaction
2371             self.conn.begin()
2372 
2373             # get existing ddm endpoints
2374             tmp_log.debug("getting existing ddm endpoints")
2375             sql_get = "SELECT ddm_endpoint_name FROM ATLAS_PANDA.ddm_endpoint"
2376             self.cur.execute(sql_get + comment)
2377             results = self.cur.fetchall()
2378             ddm_endpoint_name_list = list(map(lambda result: result[0], results))
2379             tmp_log.debug("finished getting existing ddm endpoints")
2380 
2381             # see which sites need an update and which need to be inserted new
2382             var_map_insert = []
2383             var_map_update = []
2384             for ddm_endpoint in ddm_endpoint_list:
2385                 if ddm_endpoint["ddm_endpoint_name"] in ddm_endpoint_name_list:
2386                     var_map_update.append(convert_dict_to_bind_vars(ddm_endpoint))
2387                 else:
2388                     var_map_insert.append(convert_dict_to_bind_vars(ddm_endpoint))
2389 
2390             tmp_log.debug("Updating ddm endpoints")
2391             sql_update = (
2392                 "UPDATE ATLAS_PANDA.ddm_endpoint set "
2393                 "site_name=:site_name, ddm_spacetoken_name=:ddm_spacetoken_name, type=:type, is_tape=:is_tape, "
2394                 "blacklisted=:blacklisted, blacklisted_write=:blacklisted_write, blacklisted_read=:blacklisted_read, detailed_status=:detailed_status, "
2395                 "space_used=:space_used, space_free=:space_free, space_total=:space_total, space_expired=:space_expired, space_timestamp=:space_timestamp "
2396                 "WHERE ddm_endpoint_name=:ddm_endpoint_name"
2397             )
2398             for shard in create_shards(var_map_update, 100):
2399                 self.cur.executemany(sql_update + comment, shard)
2400 
2401             tmp_log.debug("Inserting ddm endpoints")
2402             sql_insert = (
2403                 "INSERT INTO ATLAS_PANDA.ddm_endpoint (ddm_endpoint_name, site_name, ddm_spacetoken_name, type, is_tape, "
2404                 "blacklisted, blacklisted_write, blacklisted_read,  detailed_status, "
2405                 "space_used, space_free, space_total, space_expired, space_timestamp) "
2406                 "VALUES(:ddm_endpoint_name, :site_name, :ddm_spacetoken_name, :type, :is_tape, "
2407                 ":blacklisted, :blacklisted_write, :blacklisted_read, :detailed_status, "
2408                 ":space_used, :space_free, :space_total, :space_expired, :space_timestamp)"
2409             )
2410             for shard in create_shards(var_map_insert, 100):
2411                 self.cur.executemany(sql_insert + comment, shard)
2412 
2413             # commit
2414             if not self._commit():
2415                 raise RuntimeError("Commit error")
2416 
2417             tmp_log.debug("Done")
2418             return 0, None
2419 
2420         except Exception:
2421             self._rollback()
2422             self.dump_error_message(tmp_log)
2423             return -1, None
2424 
2425     def configurator_write_panda_ddm_relations(self, relation_list):
2426         comment = " /* DBProxy.configurator_write_panda_ddm_relations */"
2427         tmp_log = self.create_tagged_logger(comment)
2428         tmp_log.debug("start")
2429 
2430         try:
2431             # begin transaction
2432             self.conn.begin()
2433 
2434             # Reset the relations. Important to do this inside the transaction
2435             tmp_log.debug("Deleting existing panda ddm relations")
2436             sql_delete = "DELETE FROM ATLAS_PANDA.panda_ddm_relation"
2437             self.cur.execute(sql_delete + comment)
2438 
2439             var_map_insert = []
2440             for relation in relation_list:
2441                 var_map_insert.append(convert_dict_to_bind_vars(relation))
2442 
2443             tmp_log.debug("Inserting panda ddm relations")
2444             sql_insert = (
2445                 "INSERT INTO ATLAS_PANDA.panda_ddm_relation (panda_site_name, ddm_endpoint_name, roles, "
2446                 "is_local, order_read, order_write, default_read, default_write, scope) "
2447                 "VALUES(:panda_site_name, :ddm_endpoint_name, :roles, "
2448                 ":is_local, :order_read, :order_write, :default_read, :default_write, :scope)"
2449             )
2450             for shard in create_shards(var_map_insert, 100):
2451                 self.cur.executemany(sql_insert + comment, shard)
2452 
2453             # commit
2454             if not self._commit():
2455                 raise RuntimeError("Commit error")
2456 
2457             tmp_log.debug("Done")
2458             return 0, None
2459 
2460         except Exception:
2461             self._rollback()
2462             self.dump_error_message(tmp_log)
2463             return -1, None
2464 
2465     def configurator_read_sites(self):
2466         comment = " /* DBProxy.configurator_read_sites */"
2467         tmp_log = self.create_tagged_logger(comment)
2468         tmp_log.debug("start")
2469 
2470         try:
2471             tmp_log.debug("getting existing panda sites")
2472             sql_get = "SELECT site_name FROM ATLAS_PANDA.site"
2473             self.cur.execute(sql_get + comment)
2474             results = self.cur.fetchall()
2475             site_names = set(map(lambda result: result[0], results))
2476             tmp_log.debug("finished getting site names in configurator")
2477 
2478             tmp_log.debug("Done")
2479             return site_names
2480 
2481         except Exception:
2482             self.dump_error_message(tmp_log)
2483             return set()
2484 
2485     def configurator_read_panda_sites(self):
2486         comment = " /* DBProxy.configurator_read_sites */"
2487         tmp_log = self.create_tagged_logger(comment)
2488         tmp_log.debug("start")
2489 
2490         try:
2491             tmp_log.debug("getting existing panda sites")
2492             sql_get = "SELECT panda_site_name FROM ATLAS_PANDA.panda_site"
2493             self.cur.execute(sql_get + comment)
2494             results = self.cur.fetchall()
2495             panda_site_names = set(map(lambda result: result[0], results))
2496             tmp_log.debug("finished getting panda site names in configurator")
2497 
2498             tmp_log.debug("Done")
2499             return panda_site_names
2500 
2501         except Exception:
2502             self.dump_error_message(tmp_log)
2503             return set()
2504 
2505     def configurator_read_ddm_endpoints(self):
2506         comment = " /* DBProxy.configurator_read_ddm_endpoints */"
2507         tmp_log = self.create_tagged_logger(comment)
2508         tmp_log.debug("start")
2509 
2510         try:
2511             tmp_log.debug("getting existing ddm endpoints")
2512             sql_get = "SELECT ddm_endpoint_name FROM ATLAS_PANDA.ddm_endpoint"
2513             self.cur.execute(sql_get + comment)
2514             results = self.cur.fetchall()
2515             ddm_endpoint_names = set(map(lambda result: result[0], results))
2516             tmp_log.debug("finished getting ddm endpoint names in configurator")
2517 
2518             tmp_log.debug("Done")
2519             return ddm_endpoint_names
2520 
2521         except Exception:
2522             self.dump_error_message(tmp_log)
2523             return set()
2524 
2525     def configurator_read_cric_sites(self):
2526         comment = " /* DBProxy.configurator_read_cric_sites */"
2527         tmp_log = self.create_tagged_logger(comment)
2528         tmp_log.debug("start")
2529 
2530         try:
2531             tmp_log.debug("getting existing CRIC sites")
2532             sql_get = "SELECT /* use_json_type */ distinct scj.data.atlas_site FROM ATLAS_PANDA.schedconfig_json scj"
2533             self.cur.arraysize = 1000
2534             self.cur.execute(sql_get + comment)
2535             results = self.cur.fetchall()
2536             site_names = set(map(lambda result: result[0], results))
2537             tmp_log.debug("finished getting CRIC sites")
2538 
2539             tmp_log.debug("Done")
2540             return site_names
2541 
2542         except Exception:
2543             self.dump_error_message(tmp_log)
2544             return set()
2545 
2546     def configurator_read_cric_panda_sites(self):
2547         comment = " /* DBProxy.configurator_read_cric_panda_sites */"
2548         tmp_log = self.create_tagged_logger(comment)
2549         tmp_log.debug("start")
2550 
2551         try:
2552             tmp_log.debug("getting existing CRIC panda queues")
2553             sql_get = "SELECT panda_queue FROM ATLAS_PANDA.schedconfig_json"
2554             self.cur.execute(sql_get + comment)
2555             results = self.cur.fetchall()
2556             panda_site_names = set(map(lambda result: result[0], results))
2557             tmp_log.debug("finished getting CRIC panda queues")
2558 
2559             tmp_log.debug("Done")
2560             return panda_site_names
2561 
2562         except Exception:
2563             self._rollback()
2564             self.dump_error_message(tmp_log)
2565             return set()
2566 
2567     def configurator_delete_sites(self, sites_to_delete):
2568         """
2569         Delete sites and all dependent entries (panda_sites, ddm_endpoints, panda_ddm_relations).
2570         Deletion of dependent entries is done through cascade definition in models
2571         """
2572         comment = " /* DBProxy.configurator_delete_sites */"
2573         tmp_log = self.create_tagged_logger(comment)
2574         tmp_log.debug("start")
2575 
2576         if not sites_to_delete:
2577             tmp_log.debug("nothing to delete")
2578             return
2579 
2580         var_map_list = list(map(lambda site_name: {":site_name": site_name}, sites_to_delete))
2581 
2582         try:
2583             # begin transaction
2584             self.conn.begin()
2585             tmp_log.debug(f"deleting sites: {sites_to_delete}")
2586             sql_update = "DELETE FROM ATLAS_PANDA.site WHERE site_name=:site_name"
2587             self.cur.executemany(sql_update + comment, var_map_list)
2588             tmp_log.debug("done deleting sites")
2589 
2590             # commit
2591             if not self._commit():
2592                 raise RuntimeError("Commit error")
2593 
2594             tmp_log.debug("Done")
2595             return 0, None
2596 
2597         except Exception:
2598             self._rollback()
2599             self.dump_error_message(tmp_log)
2600             return -1, None
2601 
2602     def configurator_delete_panda_sites(self, panda_sites_to_delete):
2603         """
2604         Delete PanDA sites and dependent entries in panda_ddm_relations
2605         """
2606         comment = " /* DBProxy.configurator_delete_panda_sites */"
2607         tmp_log = self.create_tagged_logger(comment)
2608         tmp_log.debug("start")
2609 
2610         if not panda_sites_to_delete:
2611             tmp_log.debug("nothing to delete")
2612             return
2613 
2614         var_map_list = list(
2615             map(
2616                 lambda panda_site_name: {":panda_site_name": panda_site_name},
2617                 panda_sites_to_delete,
2618             )
2619         )
2620 
2621         try:
2622             # begin transaction
2623             self.conn.begin()
2624             tmp_log.debug(f"deleting panda sites: {panda_sites_to_delete}")
2625             sql_update = "DELETE FROM ATLAS_PANDA.panda_site WHERE panda_site_name=:panda_site_name"
2626             self.cur.executemany(sql_update + comment, var_map_list)
2627             tmp_log.debug("done deleting panda sites")
2628 
2629             # commit
2630             if not self._commit():
2631                 raise RuntimeError("Commit error")
2632 
2633             tmp_log.debug("Done")
2634             return 0, None
2635 
2636         except Exception:
2637             self._rollback()
2638             self.dump_error_message(tmp_log)
2639             return -1, None
2640 
2641     def configurator_delete_ddm_endpoints(self, ddm_endpoints_to_delete):
2642         """
2643         Delete DDM endpoints dependent entries in panda_ddm_relations
2644         """
2645         comment = " /* DBProxy.configurator_delete_ddm_endpoints */"
2646         tmp_log = self.create_tagged_logger(comment)
2647         tmp_log.debug("start")
2648 
2649         if not ddm_endpoints_to_delete:
2650             tmp_log.debug("nothing to delete")
2651             return
2652 
2653         var_map_list = list(
2654             map(
2655                 lambda ddm_endpoint_name: {":ddm_endpoint_name": ddm_endpoint_name},
2656                 ddm_endpoints_to_delete,
2657             )
2658         )
2659 
2660         try:
2661             # begin transaction
2662             self.conn.begin()
2663             tmp_log.debug(f"deleting ddm endpoints: {ddm_endpoints_to_delete}")
2664             sql_update = "DELETE FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:ddm_endpoint_name"
2665             self.cur.executemany(sql_update + comment, var_map_list)
2666             tmp_log.debug("done deleting ddm endpoints")
2667 
2668             # commit
2669             if not self._commit():
2670                 raise RuntimeError("Commit error")
2671 
2672             tmp_log.debug("Done")
2673             return 0, None
2674 
2675         except Exception:
2676             self._rollback()
2677             self.dump_error_message(tmp_log)
2678             return -1, None
2679 
2680     def carbon_write_region_emissions(self, emissions):
2681         comment = " /* DBProxy.carbon_write_regional_emissions */"
2682         tmp_log = self.create_tagged_logger(comment)
2683         tmp_log.debug("start")
2684 
2685         try:
2686             # begin transaction
2687             self.conn.begin()
2688 
2689             tmp_log.debug("Deleting old entries")
2690             sql_delete = "DELETE FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS " "WHERE timestamp < sysdate - interval '10' day"
2691             self.cur.execute(sql_delete + comment)
2692 
2693             tmp_log.debug("Inserting emissions by region")
2694 
2695             sql_insert = (
2696                 "INSERT /*+ ignore_row_on_dupkey_index (emissions(region, timestamp)) */ "
2697                 "INTO ATLAS_PANDA.CARBON_REGION_EMISSIONS emissions (REGION, TIMESTAMP, VALUE) "
2698                 "VALUES(:region, :timestamp, :value)"
2699             )
2700             for shard in create_shards(emissions, 100):
2701                 self.cur.executemany(sql_insert + comment, shard)
2702 
2703             # commit
2704             if not self._commit():
2705                 raise RuntimeError("Commit error")
2706 
2707             tmp_log.debug("Done")
2708             return 0, None
2709 
2710         except Exception:
2711             self._rollback()
2712             self.dump_error_message(tmp_log)
2713             return -1, None
2714 
2715     def carbon_aggregate_emissions(self):
2716         comment = " /* DBProxy.carbon_aggregate_emissions */"
2717         tmp_log = self.create_tagged_logger(comment)
2718         tmp_log.debug("start")
2719 
2720         try:
2721             # begin transaction
2722             self.conn.begin()
2723 
2724             # get the percentage each region is contributing to grid computing power
2725             sql_stat = (
2726                 "WITH tmp_total(total_hs) AS "
2727                 "(SELECT sum(hs) "
2728                 "FROM ATLAS_PANDA.jobs_share_stats) "
2729                 "SELECT scj.data.region, sum(jss.hs)/tmp_total.total_hs "
2730                 "FROM ATLAS_PANDA.jobs_share_stats jss, ATLAS_PANDA.schedconfig_json scj, tmp_total "
2731                 "WHERE jss.computingsite = scj.panda_queue "
2732                 "AND scj.data.region IS NOT NULL AND scj.data.region != 'GRID'"
2733                 "GROUP BY scj.data.region, tmp_total.total_hs "
2734             )
2735 
2736             region_dic = {}
2737             self.cur.arraysize = 1000
2738             stats_raw = self.cur.execute(sql_stat + comment)
2739             for entry in stats_raw:
2740                 region, per_cent = entry
2741                 region_dic.setdefault(region, {"emissions": 0, "per_cent": 0})
2742                 region_dic[region]["per_cent"] = per_cent
2743 
2744             # get the last emission values for each region
2745             sql_last = (
2746                 "WITH top_ts(timestamp, region) AS "
2747                 "(SELECT max(timestamp), region "
2748                 "FROM atlas_panda.carbon_region_emissions "
2749                 "GROUP BY region) "
2750                 "SELECT cre.region, cre.value, cre.timestamp "
2751                 "FROM atlas_panda.carbon_region_emissions cre, top_ts "
2752                 "WHERE cre.timestamp = top_ts.timestamp AND cre.region = top_ts.region"
2753             )
2754 
2755             last_emission_values = self.cur.execute(sql_last + comment)
2756             for entry in last_emission_values:
2757                 region, value, ts = entry
2758                 region_dic.setdefault(region, {"emissions": 0, "per_cent": 0})
2759                 region_dic[region]["emissions"] = value
2760 
2761             # calculate the grid average emissions
2762             average_emissions = 0
2763             for region in region_dic:
2764                 tmp_log.debug(f"Region {region} with per_cent {region_dic[region]['per_cent']} and emissions {region_dic[region]['emissions']}")
2765                 try:
2766                     average_emissions = average_emissions + region_dic[region]["per_cent"] * region_dic[region]["emissions"]
2767                 except Exception:
2768                     tmp_log.debug(f"Skipped Region {region} with per_cent {region_dic[region]['per_cent']} and emissions {region_dic[region]['emissions']}")
2769 
2770             # store the average emissions
2771             tmp_log.debug(f"The grid co2 emissions were averaged to {average_emissions}")
2772             utc_now = naive_utcnow()
2773             var_map = {
2774                 ":region": "GRID",
2775                 ":timestamp": utc_now,
2776                 ":value": average_emissions,
2777             }
2778 
2779             sql_insert = "INSERT INTO ATLAS_PANDA.carbon_region_emissions (region, timestamp, value) " "VALUES (:region, :timestamp, :value)"
2780             self.cur.execute(sql_insert + comment, var_map)
2781 
2782             # commit
2783             if not self._commit():
2784                 raise RuntimeError("Commit error")
2785 
2786             tmp_log.debug("Done")
2787             return 0, None
2788 
2789         except Exception:
2790             self._rollback()
2791             self.dump_error_message(tmp_log)
2792             return -1, None
2793 
2794     # check quota
2795     def checkQuota(self, dn):
2796         comment = " /* DBProxy.checkQuota */"
2797         tmp_log = self.create_tagged_logger(comment, f"dn={dn}")
2798         tmp_log.debug(f"start")
2799         try:
2800             # set autocommit on
2801             self.conn.begin()
2802             # select
2803             name = CoreUtils.clean_user_id(dn)
2804             sql = "SELECT cpua1, cpua7, cpua30, quotaa1, quotaa7, quotaa30 FROM ATLAS_PANDAMETA.users WHERE name=:name"
2805             varMap = {}
2806             varMap[":name"] = name
2807             self.cur.arraysize = 10
2808             self.cur.execute(sql + comment, varMap)
2809             res = self.cur.fetchall()
2810             # commit
2811             if not self._commit():
2812                 raise RuntimeError("Commit error")
2813             weight = 0.0
2814             if res is not None and len(res) != 0:
2815                 item = res[0]
2816                 # cpu and quota
2817                 cpu1 = item[0]
2818                 cpu7 = item[1]
2819                 cpu30 = item[2]
2820                 if item[3] in [0, None]:
2821                     quota1 = 0
2822                 else:
2823                     quota1 = item[3] * 3600
2824                 if item[4] in [0, None]:
2825                     quota7 = 0
2826                 else:
2827                     quota7 = item[4] * 3600
2828                 if item[5] in [0, None]:
2829                     quota30 = 0
2830                 else:
2831                     quota30 = item[5] * 3600
2832                 # CPU usage
2833                 if cpu1 is None:
2834                     cpu1 = 0.0
2835                 # weight
2836                 if quota1 > 0:
2837                     weight = float(cpu1) / float(quota1)
2838                 # not exceeded the limit
2839                 weight = 0.0
2840                 tmp_log.debug(f"Weight:{weight} Quota:{quota1} CPU:{cpu1}")
2841             else:
2842                 tmp_log.debug(f"cannot found")
2843             return weight
2844         except Exception:
2845             self.dump_error_message(tmp_log)
2846             # roll back
2847             self._rollback()
2848             return 0.0
2849 
2850     # check if superuser
2851     def isSuperUser(self, userName):
2852         comment = " /* DBProxy.isSuperUser */"
2853         tmp_log = self.create_tagged_logger(comment, f"userName={userName}")
2854         tmp_log.debug("start")
2855         try:
2856             isSU = False
2857             isSG = False
2858             # start transaction
2859             self.conn.begin()
2860             # check gridpref
2861             name = CoreUtils.clean_user_id(userName)
2862             sql = "SELECT gridpref FROM ATLAS_PANDAMETA.users WHERE name=:name"
2863             varMap = {}
2864             varMap[":name"] = name
2865             self.cur.arraysize = 10
2866             self.cur.execute(sql + comment, varMap)
2867             res = self.cur.fetchone()
2868             # commit
2869             if not self._commit():
2870                 raise RuntimeError("Commit error")
2871             # check if s in gridpref
2872             if res is not None:
2873                 (gridpref,) = res
2874                 if gridpref is not None:
2875                     if PrioUtil.PERMISSION_SUPER_USER in gridpref:
2876                         isSU = True
2877                     if PrioUtil.PERMISSION_SUPER_GROUP in gridpref:
2878                         isSG = True
2879             tmp_log.debug(f"done with superUser={isSU} superGroup={isSG}")
2880             return isSU, isSG
2881         except Exception:
2882             # roll back
2883             self._rollback()
2884             # error
2885             self.dump_error_message(tmp_log)
2886             return False, False
2887 
2888     # get serialize JobID and status
2889     def getUserParameter(self, dn, jobID, jobsetID):
2890         comment = " /* DBProxy.getUserParameter */"
2891         tmp_log = self.create_tagged_logger(comment, f"dn={dn} jobID={jobID} jobsetID={jobsetID}")
2892         try:
2893             # set initial values
2894             retStatus = True
2895             if jobsetID == -1:
2896                 # generate new jobsetID
2897                 retJobsetID = jobID
2898                 # new jobID = 1 + new jobsetID
2899                 retJobID = retJobsetID + 1
2900             elif jobsetID in ["NULL", None, 0]:
2901                 # no jobsetID
2902                 retJobsetID = None
2903                 retJobID = jobID
2904             else:
2905                 # user specified jobsetID
2906                 retJobsetID = jobsetID
2907                 retJobID = jobID
2908             # set autocommit on
2909             self.conn.begin()
2910             # select
2911             name = CoreUtils.clean_user_id(dn)
2912             sql = "SELECT jobid,status FROM ATLAS_PANDAMETA.users WHERE name=:name "
2913             sql += "FOR UPDATE "
2914             sqlAdd = "INSERT INTO ATLAS_PANDAMETA.users "
2915             sqlAdd += "(ID,NAME,LASTMOD,FIRSTJOB,LATESTJOB,CACHETIME,NCURRENT,JOBID) "
2916             sqlAdd += "VALUES(ATLAS_PANDAMETA.USERS_ID_SEQ.nextval,:name,"
2917             sqlAdd += "CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,0,1) "
2918             varMap = {}
2919             varMap[":name"] = name
2920             self.cur.execute(sql + comment, varMap)
2921             self.cur.arraysize = 10
2922             res = self.cur.fetchall()
2923             # insert if no record
2924             if res is None or len(res) == 0:
2925                 try:
2926                     self.cur.execute(sqlAdd + comment, varMap)
2927                     retI = self.cur.rowcount
2928                     tmp_log.debug(f"inserted new row with {retI}")
2929                     # emulate DB response
2930                     res = [[1, ""]]
2931                 except Exception:
2932                     self.dump_error_message(tmp_log)
2933             if res is not None and len(res) != 0:
2934                 item = res[0]
2935                 # JobID in DB
2936                 dbJobID = item[0]
2937                 # check status
2938                 if item[1] in ["disabled"]:
2939                     retStatus = False
2940                 # use larger JobID
2941                 if dbJobID >= int(retJobID) or (jobsetID == -1 and dbJobID >= int(retJobsetID)):
2942                     if jobsetID == -1:
2943                         # generate new jobsetID = 1 + exsiting jobID
2944                         retJobsetID = dbJobID + 1
2945                         # new jobID = 1 + new jobsetID
2946                         retJobID = retJobsetID + 1
2947                     else:
2948                         # new jobID = 1 + existing jobID
2949                         retJobID = dbJobID + 1
2950                 # update DB
2951                 varMap = {}
2952                 varMap[":name"] = name
2953                 varMap[":jobid"] = retJobID
2954                 sql = "UPDATE ATLAS_PANDAMETA.users SET jobid=:jobid WHERE name=:name"
2955                 self.cur.execute(sql + comment, varMap)
2956                 tmp_log.debug(f"set JobID={retJobID}")
2957             # commit
2958             if not self._commit():
2959                 raise RuntimeError("Commit error")
2960             tmp_log.debug(f"return JobID={retJobID} JobsetID={retJobsetID} Status={retStatus}")
2961             return retJobID, retJobsetID, retStatus
2962         except Exception:
2963             self.dump_error_message(tmp_log)
2964             # roll back
2965             self._rollback()
2966             return retJobID, retJobsetID, retStatus
2967 
2968     # check ban user
2969     def checkBanUser(self, dn, sourceLabel, jediCheck=False):
2970         comment = " /* DBProxy.checkBanUser */"
2971         try:
2972             methodName = "checkBanUser"
2973             # set initial values
2974             retStatus = True
2975             name = CoreUtils.clean_user_id(dn)
2976             tmp_log = self.create_tagged_logger(comment, f"name={name}")
2977             tmp_log.debug(f"start dn={dn} label={sourceLabel} jediCheck={jediCheck}")
2978             # set autocommit on
2979             self.conn.begin()
2980             # select
2981             sql = "SELECT status,dn FROM ATLAS_PANDAMETA.users WHERE name=:name"
2982             varMap = {}
2983             varMap[":name"] = name
2984             self.cur.execute(sql + comment, varMap)
2985             self.cur.arraysize = 10
2986             res = self.cur.fetchone()
2987             if res is not None:
2988                 # check status
2989                 tmpStatus, dnInDB = res
2990                 if tmpStatus in ["disabled"]:
2991                     retStatus = False
2992                 elif jediCheck and (dnInDB in ["", None] or dnInDB != dn):
2993                     # add DN
2994                     sqlUp = "UPDATE ATLAS_PANDAMETA.users SET dn=:dn WHERE name=:name "
2995                     varMap = {}
2996                     varMap[":name"] = name
2997                     varMap[":dn"] = dn
2998                     self.cur.execute(sqlUp + comment, varMap)
2999                     retI = self.cur.rowcount
3000                     tmp_log.debug(f"update DN with Status={retI}")
3001                     if retI != 1:
3002                         retStatus = 1
3003             else:
3004                 # new user
3005                 if jediCheck:
3006                     name = CoreUtils.clean_user_id(dn)
3007                     sqlAdd = "INSERT INTO ATLAS_PANDAMETA.users "
3008                     sqlAdd += "(ID,NAME,DN,LASTMOD,FIRSTJOB,LATESTJOB,CACHETIME,NCURRENT,JOBID) "
3009                     sqlAdd += "VALUES(ATLAS_PANDAMETA.USERS_ID_SEQ.nextval,:name,:dn,"
3010                     sqlAdd += "CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,0,1) "
3011                     varMap = {}
3012                     varMap[":name"] = name
3013                     varMap[":dn"] = dn
3014                     self.cur.execute(sqlAdd + comment, varMap)
3015                     retI = self.cur.rowcount
3016                     tmp_log.debug(f"inserted new row with Status={retI}")
3017                     if retI != 1:
3018                         retStatus = 2
3019             # commit
3020             if not self._commit():
3021                 raise RuntimeError("Commit error")
3022             tmp_log.debug(f"done with Status={retStatus}")
3023             return retStatus
3024         except Exception:
3025             # roll back
3026             self._rollback()
3027             # error
3028             self.dump_error_message(tmp_log)
3029             return retStatus
3030 
3031     # get email address for a user
3032     def getEmailAddr(self, name, withDN=False, withUpTime=False):
3033         comment = " /* DBProxy.getEmailAddr */"
3034         tmp_log = self.create_tagged_logger(comment)
3035         tmp_log.debug(f"get email for {name}")
3036         # sql
3037         if withDN:
3038             failedRet = "", "", None
3039             sql = "SELECT email,dn,location FROM ATLAS_PANDAMETA.users WHERE name=:name"
3040         elif withUpTime:
3041             failedRet = "", None
3042             sql = "SELECT email,location FROM ATLAS_PANDAMETA.users WHERE name=:name"
3043         else:
3044             failedRet = ""
3045             sql = "SELECT email FROM ATLAS_PANDAMETA.users WHERE name=:name"
3046         try:
3047             # set autocommit on
3048             self.conn.begin()
3049             # select
3050             varMap = {}
3051             varMap[":name"] = name
3052             self.cur.execute(sql + comment, varMap)
3053             self.cur.arraysize = 10
3054             res = self.cur.fetchall()
3055             # commit
3056             if not self._commit():
3057                 raise RuntimeError("Commit error")
3058             if res is not None and len(res) != 0:
3059                 if withDN or withUpTime:
3060                     if withDN:
3061                         email, dn, upTime = res[0]
3062                     else:
3063                         email, upTime = res[0]
3064                     # convert time
3065                     try:
3066                         upTime = datetime.datetime.strptime(upTime, "%Y-%m-%d %H:%M:%S")
3067                     except Exception:
3068                         upTime = None
3069                     if withDN:
3070                         return email, dn, upTime
3071                     else:
3072                         return email, upTime
3073                 else:
3074                     return res[0][0]
3075             # return empty string
3076             return failedRet
3077         except Exception:
3078             self.dump_error_message(tmp_log)
3079             # roll back
3080             self._rollback()
3081             return failedRet
3082 
3083     # set email address for a user
3084     def setEmailAddr(self, userName, emailAddr):
3085         comment = " /* DBProxy.setEmailAddr */"
3086         tmp_log = self.create_tagged_logger(comment)
3087         tmp_log.debug(f"{userName} to {emailAddr}")
3088         # sql
3089         sql = "UPDATE ATLAS_PANDAMETA.users SET email=:email,location=:uptime WHERE name=:name "
3090         try:
3091             # set autocommit on
3092             self.conn.begin()
3093             # set
3094             varMap = {}
3095             varMap[":name"] = userName
3096             varMap[":email"] = emailAddr
3097             varMap[":uptime"] = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")
3098             self.cur.execute(sql + comment, varMap)
3099             # commit
3100             if not self._commit():
3101                 raise RuntimeError("Commit error")
3102             return True
3103         except Exception:
3104             # roll back
3105             self._rollback()
3106             # error
3107             self.dump_error_message(tmp_log)
3108             return False
3109 
3110     # get ban users
3111     def get_ban_users(self):
3112         comment = " /* DBProxy.get_ban_user */"
3113         tmp_log = self.create_tagged_logger(comment)
3114         tmp_log.debug("start")
3115         # sql
3116         sql = "SELECT name FROM ATLAS_PANDAMETA.users WHERE status=:status "
3117         try:
3118             # set autocommit on
3119             self.conn.begin()
3120             varMap = {}
3121             varMap[":status"] = "disabled"
3122             self.cur.execute(sql + comment, varMap)
3123             self.cur.arraysize = 10
3124             res = self.cur.fetchall()
3125             # commit
3126             if not self._commit():
3127                 raise RuntimeError("Commit error")
3128             retVal = {name: False for name, in res}
3129             tmp_log.debug(f"got {retVal}")
3130             return True, retVal
3131         except Exception:
3132             # roll back
3133             self._rollback()
3134             # error
3135             self.dump_error_message(tmp_log)
3136             return False, None
3137 
3138     # register token key
3139     def register_token_key(self, client_name: str, lifetime: int) -> bool:
3140         """
3141         Register token key for a client with a lifetime and delete expired tokens
3142 
3143         :param client_name: client name who owns the token key
3144         :param lifetime: lifetime of the token key in hours
3145 
3146         :return: True if succeeded. False otherwise
3147         """
3148         comment = " /* DBProxy.register_token_key */"
3149         tmp_log = self.create_tagged_logger(comment, f"client_name={client_name}")
3150         tmp_log.debug("start")
3151         try:
3152             # set autocommit on
3153             self.conn.begin()
3154             time_now = naive_utcnow()
3155             expire_at = time_now + datetime.timedelta(hours=lifetime)
3156             # check if a new key was registered recently
3157             sql = f"SELECT 1 FROM {panda_config.schemaMETA}.proxykey WHERE dn=:dn AND expires>:limit "
3158             var_map = {":dn": client_name, ":limit": expire_at - datetime.timedelta(hours=1)}
3159             self.cur.execute(sql + comment, var_map)
3160             res = self.cur.fetchone()
3161             if res:
3162                 tmp_log.debug("skip as a new key was registered recently")
3163             else:
3164                 # get max ID
3165                 max_id = None
3166                 sql = "SELECT MAX(ID) FROM ATLAS_PANDAMETA.proxykey "
3167                 self.cur.execute(sql + comment, {})
3168                 res = self.cur.fetchone()
3169                 if res:
3170                     (max_id,) = res
3171                 if max_id is None:
3172                     max_id = 0
3173                 max_id += 1
3174                 max_id %= 10000000
3175                 # register a key
3176                 sql = (
3177                     f"INSERT INTO {panda_config.schemaMETA}.proxykey (ID,DN,CREDNAME,CREATED,EXPIRES,ORIGIN,MYPROXY) "
3178                     "VALUES(:id,:dn,:credname,:created,:expires,:origin,:myproxy) "
3179                 )
3180                 var_map = {
3181                     ":id": max_id,
3182                     ":dn": client_name,
3183                     ":credname": str(uuid.uuid4()),
3184                     ":created": time_now,
3185                     ":expires": expire_at,
3186                     ":origin": "panda",
3187                     ":myproxy": "NA",
3188                 }
3189                 try:
3190                     self.cur.execute(sql + comment, var_map)
3191                     tmp_log.debug(f"registered a new key with id={max_id}")
3192                 except Exception as e:
3193                     # ignore ID duplication error
3194                     tmp_log.debug(f"ignoring registration failure with {str(e)}")
3195             # delete obsolete keys
3196             sql = "DELETE FROM ATLAS_PANDAMETA.proxykey WHERE expires<:limit "
3197             var_map = {":limit": time_now}
3198             self.cur.execute(sql + comment, var_map)
3199             # commit
3200             if not self._commit():
3201                 raise RuntimeError("Commit error")
3202             # return True
3203             tmp_log.debug("done")
3204             return True
3205         except Exception:
3206             # roll back
3207             self._rollback()
3208             # dump error
3209             self.dump_error_message(tmp_log)
3210             return False
3211 
3212     # Configurator function: inserts data into the network matrix
3213     def insertNetworkMatrixData(self, data):
3214         comment = " /* DBProxy.insertNetworkMatrixData */"
3215         tmp_log = self.create_tagged_logger(comment)
3216         tmp_log.debug("start")
3217 
3218         # For performance reasons we will insert the data into a temporary table
3219         # and then merge this data into the permanent table.
3220 
3221         sql_insert = """
3222         INSERT INTO ATLAS_PANDA.network_matrix_kv_temp (src, dst, key, value, ts)
3223         VALUES (:src, :dst, :key, :value, :ts)
3224         """
3225 
3226         if self.backend == "postgres":
3227             sql_merge = (
3228                 "INSERT INTO ATLAS_PANDA.network_matrix_kv "
3229                 "(src, dst, key, value, ts) "
3230                 "SELECT  src, dst, key, value, ts FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP "
3231                 "ON CONFLICT (src, dst, key) "
3232                 "DO UPDATE SET value=EXCLUDED.value, ts=EXCLUDED.ts "
3233             )
3234         else:
3235             sql_merge = """
3236             MERGE /*+ FULL(nm_kv) */ INTO ATLAS_PANDA.network_matrix_kv nm_kv USING
3237                 (SELECT  src, dst, key, value, ts FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP) input
3238                 ON (nm_kv.src = input.src AND nm_kv.dst= input.dst AND nm_kv.key = input.key)
3239             WHEN NOT MATCHED THEN
3240                 INSERT (src, dst, key, value, ts)
3241                 VALUES (input.src, input.dst, input.key, input.value, input.ts)
3242             WHEN MATCHED THEN
3243                 UPDATE SET nm_kv.value = input.value, nm_kv.ts = input.ts
3244             """
3245         try:
3246             self.conn.begin()
3247             for shard in create_shards(data, 100):
3248                 time1 = time.time()
3249                 var_maps = []
3250                 for entry in shard:
3251                     var_map = {
3252                         ":src": entry[0],
3253                         ":dst": entry[1],
3254                         ":key": entry[2],
3255                         ":value": entry[3],
3256                         ":ts": entry[4],
3257                     }
3258                     var_maps.append(var_map)
3259 
3260                 time2 = time.time()
3261                 self.cur.executemany(sql_insert + comment, var_maps)
3262                 time3 = time.time()
3263                 tmp_log.debug(f"Processing a shard took: {time2 - time1}s of data preparation and {time3 - time2}s of insertion = {time3 - time1}")
3264 
3265             time4 = time.time()
3266             self.cur.execute(sql_merge + comment)
3267             time5 = time.time()
3268             tmp_log.debug(f"Final merge took: {time5 - time4}s")
3269             if self.backend == "postgres":
3270                 # cleanup since ON CONFLICT DO UPDATE doesn't work with duplicated entries
3271                 self.cur.execute("DELETE FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP " + comment)
3272             # commit
3273             if not self._commit():
3274                 raise RuntimeError("Commit error")
3275 
3276         except Exception:
3277             # roll back
3278             self._rollback()
3279             # error
3280             self.dump_error_message(tmp_log)
3281             return None, ""
3282 
3283     # Configurator function: delete old network data
3284     def deleteOldNetworkData(self):
3285         comment = " /* DBProxy.deleteOldNetworkData */"
3286         tmp_log = self.create_tagged_logger(comment)
3287         tmp_log.debug("start")
3288 
3289         # delete any data older than a week
3290         sql_delete = """
3291         DELETE FROM ATLAS_PANDA.network_matrix_kv
3292         WHERE ts < (current_date - 7)
3293         """
3294         try:
3295             self.conn.begin()
3296             time1 = time.time()
3297             self.cur.execute(sql_delete + comment)
3298             time2 = time.time()
3299             tmp_log.debug(f"Deletion of old network data took: {time2 - time1}s")
3300 
3301             # commit
3302             if not self._commit():
3303                 raise RuntimeError("Commit error")
3304 
3305         except Exception:
3306             # roll back
3307             self._rollback()
3308             # error
3309             self.dump_error_message(tmp_log)
3310             return None, ""
3311 
3312     def ups_get_queues(self):
3313         """
3314         Identify unified pilot streaming (ups) queues: served in pull (late binding) model
3315         :return: list of panda queues
3316         """
3317         comment = " /* DBProxy.ups_get_queues */"
3318         tmp_log = self.create_tagged_logger(comment)
3319         tmp_log.debug("start")
3320 
3321         ups_queues = []
3322         sql = f"""
3323               SELECT /* use_json_type */ scj.panda_queue FROM {panda_config.schemaPANDA}.schedconfig_json scj
3324               WHERE scj.data.capability='ucore' AND scj.data.workflow = 'pull_ups'
3325               """
3326 
3327         self.cur.execute(sql + comment)
3328         res = self.cur.fetchall()
3329         for (ups_queue,) in res:
3330             ups_queues.append(ups_queue)
3331 
3332         tmp_log.debug("done")
3333         return ups_queues
3334 
3335     # calculate RW for tasks
3336     def calculateTaskRW_JEDI(self, jediTaskID):
3337         comment = " /* JediDBProxy.calculateTaskRW_JEDI */"
3338         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3339         tmpLog.debug("start")
3340         try:
3341             # sql to get RW
3342             sql = "SELECT ROUND(SUM((nFiles-nFilesFinished-nFilesFailed-nFilesOnHold)*walltime)/24/3600) "
3343             sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
3344             sql += "WHERE tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3345             sql += "AND tabT.jediTaskID=:jediTaskID "
3346             varMap = {}
3347             varMap[":jediTaskID"] = jediTaskID
3348             # begin transaction
3349             self.conn.begin()
3350             # get
3351             self.cur.execute(sql + comment, varMap)
3352             resRT = self.cur.fetchone()
3353             # commit
3354             if not self._commit():
3355                 raise RuntimeError("Commit error")
3356             # locked by another
3357             if resRT is None:
3358                 retVal = None
3359             else:
3360                 retVal = resRT[0]
3361             tmpLog.debug(f"RW={retVal}")
3362             # return
3363             tmpLog.debug("done")
3364             return retVal
3365         except Exception:
3366             # roll back
3367             self._rollback()
3368             # error
3369             self.dump_error_message(tmpLog)
3370             return None
3371 
3372     # calculate RW with a priority
3373     def calculateRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
3374         comment = " /* JediDBProxy.calculateRWwithPrio_JEDI */"
3375         if workQueue is None:
3376             tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={None} prio={priority}")
3377         else:
3378             tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} prio={priority}")
3379         tmpLog.debug("start")
3380         try:
3381             # sql to get RW
3382             varMap = {}
3383             varMap[":vo"] = vo
3384             varMap[":prodSourceLabel"] = prodSourceLabel
3385             if priority is not None:
3386                 varMap[":priority"] = priority
3387             sql = "SELECT tabT.jediTaskID,tabT.cloud,tabD.datasetID,nFiles-nFilesFinished-nFilesFailed,walltime "
3388             sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3389             sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3390             sql += "AND tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3391             sql += "AND (nFiles-nFilesFinished-nFilesFailed)>0 "
3392             sql += "AND tabT.vo=:vo AND prodSourceLabel=:prodSourceLabel "
3393 
3394             if priority is not None:
3395                 sql += "AND currentPriority>=:priority "
3396 
3397             if workQueue is not None:
3398                 if workQueue.is_global_share:
3399                     sql += "AND gshare=:wq_name "
3400                     sql += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
3401                     varMap[":wq_name"] = workQueue.queue_name
3402                 else:
3403                     sql += "AND workQueue_ID=:wq_id "
3404                     varMap[":wq_id"] = workQueue.queue_id
3405 
3406             sql += "AND tabT.status IN (:status1,:status2,:status3,:status4) "
3407             sql += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
3408             varMap.update(INPUT_TYPES_var_map)
3409             varMap[":status1"] = "ready"
3410             varMap[":status2"] = "scouting"
3411             varMap[":status3"] = "running"
3412             varMap[":status4"] = "pending"
3413             sql += "AND tabT.cloud IS NOT NULL "
3414             # begin transaction
3415             self.conn.begin()
3416             # set cloud
3417             self.cur.execute(sql + comment, varMap)
3418             resList = self.cur.fetchall()
3419             # commit
3420             if not self._commit():
3421                 raise RuntimeError("Commit error")
3422             # loop over all tasks
3423             retMap = {}
3424             sqlF = "SELECT fsize,startEvent,endEvent,nEvents "
3425             sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
3426             sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rownum<=1"
3427             for jediTaskID, cloud, datasetID, nRem, walltime in resList:
3428                 # get effective size
3429                 varMap = {}
3430                 varMap[":jediTaskID"] = jediTaskID
3431                 varMap[":datasetID"] = datasetID
3432                 # begin transaction
3433                 self.conn.begin()
3434                 # get file
3435                 self.cur.execute(sqlF + comment, varMap)
3436                 resFile = self.cur.fetchone()
3437                 # commit
3438                 if not self._commit():
3439                     raise RuntimeError("Commit error")
3440                 if resFile is not None:
3441                     # calculate RW using effective size
3442                     fsize, startEvent, endEvent, nEvents = resFile
3443                     effectiveFsize = CoreUtils.getEffectiveFileSize(fsize, startEvent, endEvent, nEvents)
3444                     tmpRW = nRem * effectiveFsize * walltime
3445                     if cloud not in retMap:
3446                         retMap[cloud] = 0
3447                     retMap[cloud] += tmpRW
3448             for cloudName, rwValue in retMap.items():
3449                 retMap[cloudName] = int(rwValue / 24 / 3600)
3450             tmpLog.debug(f"RW={str(retMap)}")
3451             # return
3452             tmpLog.debug("done")
3453             return retMap
3454         except Exception:
3455             # roll back
3456             self._rollback()
3457             # error
3458             self.dump_error_message(tmpLog)
3459             return None
3460 
3461     # calculate WORLD RW with a priority
3462     def calculateWorldRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
3463         comment = " /* JediDBProxy.calculateWorldRWwithPrio_JEDI */"
3464         if workQueue is None:
3465             tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={None} prio={priority}")
3466         else:
3467             tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} prio={priority}")
3468         tmpLog.debug("start")
3469         try:
3470             # sql to get RW
3471             varMap = {}
3472             varMap[":vo"] = vo
3473             varMap[":prodSourceLabel"] = prodSourceLabel
3474             varMap[":worldCloud"] = JediTaskSpec.worldCloudName
3475             if priority is not None:
3476                 varMap[":priority"] = priority
3477             sql = "SELECT tabT.nucleus,SUM((nEvents-nEventsUsed)*(CASE WHEN cpuTime IS NULL THEN 300 ELSE cpuTime END)) "
3478             sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3479             sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3480             sql += "AND tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3481             sql += "AND (nFiles-nFilesFinished-nFilesFailed)>0 "
3482             sql += "AND tabT.vo=:vo AND prodSourceLabel=:prodSourceLabel "
3483             sql += "AND tabT.cloud=:worldCloud "
3484 
3485             if priority is not None:
3486                 sql += "AND currentPriority>=:priority "
3487 
3488             if workQueue is not None:
3489                 if workQueue.is_global_share:
3490                     sql += "AND gshare=:wq_name "
3491                     sql += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
3492                     varMap[":wq_name"] = workQueue.queue_name
3493                 else:
3494                     sql += "AND workQueue_ID=:wq_id "
3495                     varMap[":wq_id"] = workQueue.queue_id
3496 
3497             sql += "AND tabT.status IN (:status1,:status2,:status3,:status4) "
3498             sql += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
3499             varMap.update(INPUT_TYPES_var_map)
3500             varMap[":status1"] = "ready"
3501             varMap[":status2"] = "scouting"
3502             varMap[":status3"] = "running"
3503             varMap[":status4"] = "pending"
3504             sql += "GROUP BY tabT.nucleus "
3505             # begin transaction
3506             self.conn.begin()
3507             # set cloud
3508             self.cur.execute(sql + comment, varMap)
3509             resList = self.cur.fetchall()
3510             # commit
3511             if not self._commit():
3512                 raise RuntimeError("Commit error")
3513             # loop over all nuclei
3514             retMap = {}
3515             for nucleus, worldRW in resList:
3516                 retMap[nucleus] = worldRW
3517             tmpLog.debug(f"RW={str(retMap)}")
3518             # return
3519             tmpLog.debug("done")
3520             return retMap
3521         except Exception:
3522             # roll back
3523             self._rollback()
3524             # error
3525             self.dump_error_message(tmpLog)
3526             return None
3527 
3528     # calculate WORLD RW for tasks
3529     def calculateTaskWorldRW_JEDI(self, jediTaskID):
3530         comment = " /* JediDBProxy.calculateTaskWorldRW_JEDI */"
3531         tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3532         tmpLog.debug("start")
3533         try:
3534             # sql to get RW
3535             sql = (
3536                 "SELECT (nEvents-nEventsUsed)*(CASE "
3537                 "WHEN cpuTime IS NULL THEN 300 "
3538                 "WHEN cpuTimeUnit='mHS06sPerEvent' OR cpuTimeUnit='mHS06sPerEventFixed' THEN cpuTime/1000 "
3539                 "ELSE cpuTime END) "
3540             )
3541             sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
3542             sql += "WHERE tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3543             sql += "AND tabT.jediTaskID=:jediTaskID "
3544             varMap = {}
3545             varMap[":jediTaskID"] = jediTaskID
3546             # begin transaction
3547             self.conn.begin()
3548             # get
3549             self.cur.execute(sql + comment, varMap)
3550             resRT = self.cur.fetchone()
3551             # commit
3552             if not self._commit():
3553                 raise RuntimeError("Commit error")
3554             # locked by another
3555             if resRT is None:
3556                 retVal = None
3557             else:
3558                 retVal = resRT[0]
3559             tmpLog.debug(f"RW={retVal}")
3560             # return
3561             tmpLog.debug("done")
3562             return retVal
3563         except Exception:
3564             # roll back
3565             self._rollback()
3566             # error
3567             self.dump_error_message(tmpLog)
3568             return None
3569 
3570     def load_sw_map(self):
3571         comment = " /* JediDBProxy.load_sw_map */"
3572         tmp_log = self.create_tagged_logger(comment)
3573         tmp_log.debug("start")
3574 
3575         sw_map = {}
3576 
3577         try:
3578             # sql to get size
3579             sql = f"SELECT PANDA_QUEUE, DATA FROM {panda_config.schemaPANDA}.SW_TAGS"
3580             self.cur.execute(sql + comment)
3581             results = self.cur.fetchall()
3582             for panda_queue, data in results:
3583                 sw_map[panda_queue] = json.loads(data)
3584 
3585             tmp_log.debug("done")
3586             return sw_map
3587 
3588         except Exception:
3589             self._rollback()
3590             self.dump_error_message(tmp_log)
3591             return None
3592 
3593     def getNetworkMetrics(self, dst, keyList):
3594         """
3595         Get the network metrics from a source to all possible destinations
3596         :param dst: destination site
3597         :param keyList: activity keys.
3598         :return: returns a dictionary with network values in the style
3599         {
3600             <dest>: {<key>: <value>, <key>: <value>},
3601             <dest>: {<key>: <value>, <key>: <value>},
3602             ...
3603         }
3604         """
3605         comment = " /* JediDBProxy.getNetworkMetrics */"
3606         tmpLog = self.create_tagged_logger(comment)
3607         tmpLog.debug("start")
3608 
3609         latest_validity = naive_utcnow() - datetime.timedelta(minutes=60)
3610 
3611         varMap = {":dst": dst, ":latest_validity": latest_validity}
3612 
3613         key_var_names_str, key_var_map = get_sql_IN_bind_variables(keyList, prefix=":key")
3614 
3615         sql = f"""
3616         SELECT src, key, value, ts FROM {panda_config.schemaJEDI}.network_matrix_kv
3617         WHERE dst = :dst AND key IN ({key_var_names_str})
3618         AND ts > :latest_validity
3619         """
3620 
3621         varMap.update(key_var_map)
3622 
3623         self.cur.execute(sql + comment, varMap)
3624         resList = self.cur.fetchall()
3625 
3626         networkMap = {}
3627         total = {}
3628         for res in resList:
3629             src, key, value, ts = res
3630             networkMap.setdefault(src, {})
3631             networkMap[src][key] = value
3632             total.setdefault(key, 0)
3633             try:
3634                 total[key] += value
3635             except Exception:
3636                 pass
3637         networkMap["total"] = total
3638         tmpLog.debug(f"network map to nucleus {dst} is: {networkMap}")
3639 
3640         return networkMap
3641 
3642     def getBackloggedNuclei(self):
3643         """
3644         Return a list of nuclei, which has built up transfer backlog. We will consider a nucleus as backlogged,
3645          when it has over 2000 output transfers queued and there are more than 3 sites with queues over
3646         """
3647 
3648         comment = " /* JediDBProxy.getBackloggedNuclei */"
3649         tmpLog = self.create_tagged_logger(comment)
3650         tmpLog.debug("start")
3651 
3652         latest_validity = naive_utcnow() - datetime.timedelta(minutes=60)
3653 
3654         nqueued_cap = self.getConfigValue("taskbrokerage", "NQUEUED_NUC_CAP", "jedi")
3655         if nqueued_cap is None:
3656             nqueued_cap = 2000
3657 
3658         varMap = {":latest_validity": latest_validity, ":nqueued_cap": nqueued_cap}
3659 
3660         sql = f"""
3661               SELECT dst
3662               FROM {panda_config.schemaJEDI}.network_matrix_kv
3663               WHERE key = 'Production Output_queued'
3664               AND ts > :latest_validity
3665               GROUP BY dst
3666               HAVING SUM(value) > :nqueued_cap
3667         """
3668 
3669         self.cur.execute(sql + comment, varMap)
3670         try:
3671             backlogged_nuclei = [entry[0] for entry in self.cur.fetchall()]
3672         except IndexError:
3673             backlogged_nuclei = []
3674 
3675         tmpLog.debug(f"Nuclei with a long backlog are: {backlogged_nuclei}")
3676 
3677         return backlogged_nuclei
3678 
3679     def getPandaSiteToOutputStorageSiteMapping(self):
3680         """
3681         Get a  mapping of panda sites to their storage site. We consider the storage site of the default ddm endpoint
3682         :return: dictionary with panda_site_name keys and site_name values
3683         """
3684         comment = " /* JediDBProxy.getPandaSiteToOutputStorageSiteMapping */"
3685         tmpLog = self.create_tagged_logger(comment)
3686         tmpLog.debug("start")
3687 
3688         sql = """
3689         SELECT pdr.panda_site_name, de.site_name, nvl(pdr.scope, 'default')
3690         FROM atlas_panda.panda_ddm_relation pdr, atlas_panda.ddm_endpoint de
3691         WHERE pdr.default_write = 'Y'
3692         AND pdr.ddm_endpoint_name = de.ddm_endpoint_name
3693         """
3694 
3695         self.cur.execute(sql + comment)
3696         resList = self.cur.fetchall()
3697         mapping = {}
3698 
3699         for res in resList:
3700             pandaSiteName, siteName, scope = res
3701             mapping.setdefault(pandaSiteName, {})
3702             mapping[pandaSiteName][scope] = siteName
3703 
3704         # tmpLog.debug('panda site to ATLAS site mapping is: {0}'.format(mapping))
3705 
3706         tmpLog.debug("done")
3707         return mapping
3708 
3709     def get_active_gshare_rtypes(self, vo):
3710         """
3711         Gets the active gshare/resource wq combinations.  Active means they have at least 1 job in (assigned, activate, starting, running, ...)
3712         :param vo: Virtual Organization
3713         """
3714         comment = " /* DBProxy.get_active_gshare_rtypes */"
3715         tmp_log = self.create_tagged_logger(comment, f"vo={vo}")
3716         tmp_log.debug("start")
3717 
3718         # define the var map of query parameters
3719         var_map = {":vo": vo}
3720 
3721         # sql to query on pre-cached job statistics tables, creating a single result set with active gshares and resource workqueues
3722         sql_get_active_combinations = f"""
3723             WITH gshare_results AS ( 
3724             SELECT /*+ RESULT_CACHE */ gshare AS name, resource_type 
3725             FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS 
3726             WHERE vo=:vo 
3727             UNION 
3728             SELECT /*+ RESULT_CACHE */ gshare AS name, resource_type 
3729             FROM {panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS 
3730             WHERE vo=:vo 
3731             ), wq_results AS ( 
3732             SELECT jwq.QUEUE_NAME AS name, jss.resource_type 
3733             FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS jss 
3734             JOIN {panda_config.schemaPANDA}.JEDI_WORK_QUEUE jwq ON jss.WORKQUEUE_ID = jwq.QUEUE_ID 
3735             WHERE jwq.QUEUE_FUNCTION = 'Resource' AND jss.vo=:vo AND jwq.vo=:vo 
3736             UNION 
3737             SELECT jwq.QUEUE_NAME AS name, jss.resource_type 
3738             FROM {panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS jss 
3739             JOIN {panda_config.schemaPANDA}.JEDI_WORK_QUEUE jwq ON jss.WORKQUEUE_ID = jwq.QUEUE_ID 
3740             WHERE jwq.QUEUE_FUNCTION = 'Resource' AND jss.vo=:vo AND jwq.vo=:vo 
3741             ) 
3742             SELECT name, resource_type FROM gshare_results 
3743             UNION 
3744             SELECT name, resource_type FROM wq_results 
3745             GROUP BY name, resource_type
3746         """
3747 
3748         return_map = {}
3749         try:
3750             self.cur.arraysize = 1000
3751             self.cur.execute(f"{sql_get_active_combinations} {comment}", var_map)
3752             res = self.cur.fetchall()
3753 
3754             # create map
3755             for name, resource_type in res:
3756                 return_map.setdefault(name, [])
3757                 return_map[name].append(resource_type)
3758 
3759             tmp_log.debug("done")
3760             return return_map
3761         except Exception:
3762             self.dump_error_message(tmp_log)
3763             return {}
3764 
3765     # get work queue map
3766     def getWorkQueueMap(self):
3767         self.refreshWorkQueueMap()
3768         return self.workQueueMap
3769 
3770     # refresh work queue map
3771     def refreshWorkQueueMap(self):
3772         # avoid frequent lookup
3773         if self.updateTimeForWorkQueue is not None and (naive_utcnow() - self.updateTimeForWorkQueue) < datetime.timedelta(minutes=10):
3774             return
3775         comment = " /* JediDBProxy.refreshWorkQueueMap */"
3776         tmpLog = self.create_tagged_logger(comment)
3777 
3778         leave_shares = self.get_sorted_leaves()
3779 
3780         if self.workQueueMap is None:
3781             self.workQueueMap = WorkQueueMapper()
3782         # SQL
3783         sql = self.workQueueMap.getSqlQuery()
3784         try:
3785             # start transaction
3786             self.conn.begin()
3787             self.cur.arraysize = 1000
3788             self.cur.execute(sql + comment)
3789             res = self.cur.fetchall()
3790             if not self._commit():
3791                 raise RuntimeError("Commit error")
3792             # make map
3793             self.workQueueMap.makeMap(res, leave_shares)
3794             tmpLog.debug("done")
3795             self.updateTimeForWorkQueue = naive_utcnow()
3796             return True
3797         except Exception:
3798             # roll back
3799             self._rollback()
3800             # error
3801             self.dump_error_message(tmpLog)
3802             return False
3803 
3804 
3805 # get entity module
def get_entity_module(base_mod) -> EntityModule:
    """
    Get the entity module through a base module.

    :param base_mod: object providing get_composite_module
    :return: the result of base_mod.get_composite_module("entity")
    """
    entity_module = base_mod.get_composite_module("entity")
    return entity_module