# File indexing completed on 2026-04-10 08:39:03
0001 import datetime
0002 import json
0003 import os
0004 import random
0005 import re
0006 import time
0007 import traceback
0008 import uuid
0009
0010 from pandacommon.pandalogger.LogWrapper import LogWrapper
0011 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0012
0013 from pandaserver.config import panda_config
0014 from pandaserver.srvcore import CoreUtils
0015 from pandaserver.taskbuffer import (
0016 EventServiceUtils,
0017 GlobalShares,
0018 JobUtils,
0019 PrioUtil,
0020 SiteSpec,
0021 )
0022 from pandaserver.taskbuffer.db_proxy_mods.base_module import (
0023 BaseModule,
0024 convert_dict_to_bind_vars,
0025 memoize,
0026 )
0027 from pandaserver.taskbuffer.DdmSpec import DdmSpec
0028 from pandaserver.taskbuffer.JediDatasetSpec import (
0029 INPUT_TYPES_var_map,
0030 INPUT_TYPES_var_str,
0031 )
0032 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0033 from pandaserver.taskbuffer.ResourceSpec import ResourceSpec, ResourceSpecMapper
0034 from pandaserver.taskbuffer.Utils import create_shards
0035 from pandaserver.taskbuffer.WorkQueueMapper import WorkQueueMapper
0036
0037
0038
0039 class EntityModule(BaseModule):
0040
0041 def __init__(self, log_stream: LogWrapper):
0042 super().__init__(log_stream)
0043
0044 self.tree = None
0045 self.leave_shares = None
0046 self.__t_update_shares = None
0047 self.__hs_distribution = None
0048 self.__t_update_distribution = None
0049
0050
0051
0052 self.resource_spec_mapper = None
0053 self.__t_update_resource_type_mapper = None
0054
0055
0056 self.job_prio_boost_dict = None
0057 self.job_prio_boost_dict_update_time = None
0058
0059 def __get_hs_leave_distribution(self, leave_shares):
0060 """
0061 Get the current HS06 distribution for running and queued jobs
0062 """
0063 comment = " /* DBProxy.get_hs_leave_distribution */"
0064
0065 sql_hs_distribution = """
0066 SELECT gshare, jobstatus_grouped, SUM(HS)
0067 FROM
0068 (SELECT gshare, HS,
0069 CASE
0070 WHEN jobstatus IN('activated') THEN 'queued'
0071 WHEN jobstatus IN('sent', 'running') THEN 'executing'
0072 ELSE 'ignore'
0073 END jobstatus_grouped
0074 FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) a
0075 GROUP BY gshare, jobstatus_grouped
0076 """
0077
0078 self.cur.execute(sql_hs_distribution + comment)
0079 hs_distribution_raw = self.cur.fetchall()
0080
0081
0082 hs_distribution_dict = {}
0083 hs_queued_total = 0
0084 hs_executing_total = 0
0085 hs_ignore_total = 0
0086 for hs_entry in hs_distribution_raw:
0087 gshare, status_group, hs = hs_entry
0088 if hs is None:
0089 continue
0090 hs = float(hs)
0091 hs_distribution_dict.setdefault(
0092 gshare,
0093 {
0094 GlobalShares.PLEDGED: 0,
0095 GlobalShares.QUEUED: 0,
0096 GlobalShares.EXECUTING: 0,
0097 },
0098 )
0099 hs_distribution_dict[gshare][status_group] = hs
0100
0101 if status_group == GlobalShares.QUEUED:
0102 hs_queued_total += hs
0103 elif status_group == GlobalShares.EXECUTING:
0104 hs_executing_total += hs
0105 else:
0106 hs_ignore_total += hs
0107
0108
0109 for share_node in leave_shares:
0110 share_name, share_value = share_node.name, share_node.value
0111 hs_pledged_share = hs_executing_total * share_value / 100.0
0112
0113 hs_distribution_dict.setdefault(
0114 share_name,
0115 {
0116 GlobalShares.PLEDGED: 0,
0117 GlobalShares.QUEUED: 0,
0118 GlobalShares.EXECUTING: 0,
0119 },
0120 )
0121
0122 hs_distribution_dict[share_name]["pledged"] = hs_pledged_share
0123 return hs_distribution_dict
0124
0125
0126 def get_shares(self, parents=""):
0127 comment = " /* DBProxy.get_shares */"
0128 tmp_log = self.create_tagged_logger(comment)
0129 tmp_log.debug("start")
0130
0131 sql = """
0132 SELECT NAME, VALUE, PARENT, PRODSOURCELABEL, WORKINGGROUP, CAMPAIGN, PROCESSINGTYPE, TRANSPATH, RTYPE,
0133 VO, QUEUE_ID, THROTTLED
0134 FROM ATLAS_PANDA.GLOBAL_SHARES
0135 """
0136 var_map = None
0137
0138 if parents == "":
0139
0140 pass
0141 elif parents is None:
0142
0143 sql += "WHERE parent IS NULL"
0144
0145 elif isinstance(parents, str):
0146
0147 var_map = {":parent": parents}
0148 sql += "WHERE parent = :parent"
0149
0150 elif type(parents) in (list, tuple):
0151
0152 var_map = {}
0153 parent_var_names_str, parent_var_map = get_sql_IN_bind_variables(parents, prefix=":parent")
0154 sql += f"WHERE parent IN ({parent_var_names_str})"
0155 var_map.update(parent_var_map)
0156
0157 self.cur.execute(sql + comment, var_map)
0158 resList = self.cur.fetchall()
0159
0160 tmp_log.debug("done")
0161 return resList
0162
0163 def reload_shares(self, force=False):
0164 """
0165 Reloads the shares from the DB and recalculates distributions
0166 """
0167 comment = " /* DBProxy.reload_shares */"
0168
0169 if (self.__t_update_shares is not None and self.__t_update_shares > datetime.datetime.now() - datetime.timedelta(hours=1)) or force:
0170 return
0171
0172 tmp_log = self.create_tagged_logger(comment)
0173
0174
0175 t_before = time.time()
0176 tree = GlobalShares.Share("root", 100, None, None, None, None, None, None, None, None, None, None)
0177 t_after = time.time()
0178 total = t_after - t_before
0179 tmp_log.debug(f"Root dummy tree took {total}s")
0180
0181
0182 t_before = time.time()
0183 shares_top_level = self.get_shares(parents=None)
0184 t_after = time.time()
0185 total = t_after - t_before
0186 tmp_log.debug(f"Getting shares took {total}s")
0187
0188
0189 t_before = time.time()
0190 for (
0191 name,
0192 value,
0193 parent,
0194 prodsourcelabel,
0195 workinggroup,
0196 campaign,
0197 processingtype,
0198 transpath,
0199 rtype,
0200 vo,
0201 queue_id,
0202 throttled,
0203 ) in shares_top_level:
0204 share = GlobalShares.Share(
0205 name,
0206 value,
0207 parent,
0208 prodsourcelabel,
0209 workinggroup,
0210 campaign,
0211 processingtype,
0212 transpath,
0213 rtype,
0214 vo,
0215 queue_id,
0216 throttled,
0217 )
0218 tree.children.append(self.__load_branch(share))
0219 t_after = time.time()
0220 total = t_after - t_before
0221 tmp_log.debug(f"Loading the branches took {total}s")
0222
0223
0224 t_before = time.time()
0225 tree.normalize()
0226 t_after = time.time()
0227 total = t_after - t_before
0228 tmp_log.debug(f"Normalizing the values took {total}s")
0229
0230
0231 t_before = time.time()
0232 leave_shares = tree.get_leaves()
0233 t_after = time.time()
0234 total = t_after - t_before
0235 tmp_log.debug(f"Getting the leaves took {total}s")
0236
0237 self.leave_shares = leave_shares
0238 self.__t_update_shares = datetime.datetime.now()
0239
0240
0241 t_before = time.time()
0242
0243 hs_distribution = self.__get_hs_leave_distribution(leave_shares)
0244 tree.aggregate_hs_distribution(hs_distribution)
0245 t_after = time.time()
0246 total = t_after - t_before
0247 tmp_log.debug(f"Aggregating the hs distribution took {total}s")
0248
0249 self.tree = tree
0250 self.__hs_distribution = hs_distribution
0251 self.__t_update_distribution = datetime.datetime.now()
0252 return
0253
0254 def __reload_hs_distribution(self):
0255 """
0256 Reloads the HS distribution
0257 """
0258 comment = " /* DBProxy.__reload_hs_distribution */"
0259 tmp_log = self.create_tagged_logger(comment)
0260 tmp_log.debug(self.__t_update_distribution)
0261 tmp_log.debug(self.__hs_distribution)
0262
0263 if self.__t_update_distribution is not None and self.__t_update_distribution > datetime.datetime.now() - datetime.timedelta(seconds=10):
0264 tmp_log.debug("release")
0265 return
0266
0267
0268 tmp_log.debug("get dist")
0269 t_before = time.time()
0270 hs_distribution = self.__get_hs_leave_distribution(self.leave_shares)
0271 tmp_log.debug("aggr dist")
0272 self.tree.aggregate_hs_distribution(hs_distribution)
0273 t_after = time.time()
0274 total = t_after - t_before
0275 tmp_log.debug(f"Reloading the hs distribution took {total}s")
0276
0277 self.__hs_distribution = hs_distribution
0278 self.__t_update_distribution = datetime.datetime.now()
0279
0280
0281 tmp_log.debug(f"Current HS06 distribution is {hs_distribution}")
0282
0283 return
0284
0285 def get_sorted_leaves(self):
0286 """
0287 Re-loads the shares, then returns the leaves sorted by under usage
0288 """
0289 self.reload_shares()
0290 self.__reload_hs_distribution()
0291 return self.tree.sort_branch_by_current_hs_distribution(self.__hs_distribution)
0292
0293 def get_tree_of_gshare_names(self):
0294 """
0295 get nested dict of gshare names implying the tree structure
0296 """
0297
0298 def get_nested_gshare(share):
0299 val = None
0300 if not share.children:
0301
0302 pass
0303 else:
0304
0305 val = {}
0306 for child in share.children:
0307 val[child.name] = get_nested_gshare(child)
0308 return val
0309
0310 ret_dict = get_nested_gshare(self.tree)
0311 return ret_dict
0312
0313 def __load_branch(self, share):
0314 """
0315 Recursively load a branch
0316 """
0317 node = GlobalShares.Share(
0318 share.name,
0319 share.value,
0320 share.parent,
0321 share.prodsourcelabel,
0322 share.workinggroup,
0323 share.campaign,
0324 share.processingtype,
0325 share.transpath,
0326 share.rtype,
0327 share.vo,
0328 share.queue_id,
0329 share.throttled,
0330 )
0331
0332 children = self.get_shares(parents=share.name)
0333 if not children:
0334 return node
0335
0336 for (
0337 name,
0338 value,
0339 parent,
0340 prodsourcelabel,
0341 workinggroup,
0342 campaign,
0343 processingtype,
0344 transpath,
0345 rtype,
0346 vo,
0347 queue_id,
0348 throttled,
0349 ) in children:
0350 child = GlobalShares.Share(
0351 name,
0352 value,
0353 parent,
0354 prodsourcelabel,
0355 workinggroup,
0356 campaign,
0357 processingtype,
0358 transpath,
0359 rtype,
0360 vo,
0361 queue_id,
0362 throttled,
0363 )
0364 node.children.append(self.__load_branch(child))
0365
0366 return node
0367
0368 def getGShareStatus(self):
0369 """
0370 Generates a list with sorted leave branches
0371 """
0372
0373 comment = " /* DBProxy.getGShareStatus */"
0374 tmp_log = self.create_tagged_logger(comment)
0375 tmp_log.debug("start")
0376
0377 self.reload_shares()
0378 self.__reload_hs_distribution()
0379 sorted_shares = self.tree.sort_branch_by_current_hs_distribution(self.__hs_distribution)
0380
0381 sorted_shares_export = []
0382 for share in sorted_shares:
0383 sorted_shares_export.append(
0384 {
0385 "name": share.name,
0386 "running": self.__hs_distribution[share.name]["executing"],
0387 "target": self.__hs_distribution[share.name]["pledged"],
0388 "queuing": self.__hs_distribution[share.name]["queued"],
0389 }
0390 )
0391 return sorted_shares_export
0392
0393 def is_valid_share(self, share_name):
0394 """
0395 Checks whether the share is a valid leave share
0396 """
0397 self.reload_shares()
0398 for share in self.leave_shares:
0399 if share_name == share.name:
0400
0401 return True
0402
0403
0404 return False
0405
    def reassignShare(self, jedi_task_ids, gshare, reassign_running):
        """
        Will reassign all tasks and their jobs that have not yet completed to specified share
        @param jedi_task_ids: task ids
        @param gshare: dest share
        @param reassign_running: whether to reassign running jobs
        """

        comment = " /* DBProxy.reassignShare */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug(f"Start reassigning {jedi_task_ids} to gshare={gshare} and reassign_running={reassign_running}")

        try:
            # refuse destinations that are not leave shares
            if not self.is_valid_share(gshare):
                error_msg = f"Share {gshare} is not a leave share "
                tmp_log.debug(error_msg)
                ret_val = 1, error_msg
                tmp_log.debug(error_msg)
                return ret_val

            self.conn.begin()

            # process the task ids in shards of 100 to keep the IN clauses bounded
            for shard in create_shards(jedi_task_ids, 100):
                # build bind variables for all task ids of this shard
                var_map = {}
                shard_taskid_set = set()
                i = 0
                for _task_id in shard:
                    jedi_task_id = int(_task_id)
                    var_map[f":jtid{i}"] = jedi_task_id
                    i += 1
                    shard_taskid_set.add(jedi_task_id)
                # NOTE(review): the binding list is sized from the de-duplicated set; if a shard
                # contained duplicate ids, some bound variables would stay unreferenced — confirm
                # that callers always pass unique task ids
                jtid_bindings = ",".join(f":jtid{i}" for i in range(len(shard_taskid_set)))

                # only tasks that are not currently locked can be reassigned
                sql_tasks_not_locked = f"""
                SELECT jediTaskID FROM ATLAS_PANDA.JEDI_Tasks
                WHERE jediTaskID IN ({jtid_bindings}) AND lockedBy IS NULL
                """

                self.cur.execute(sql_tasks_not_locked + comment, var_map)
                res = self.cur.fetchall()

                # rebuild the bind variables with only the unlocked tasks
                var_map = {":gshare": gshare}
                good_taskid_set = set()
                i = 0
                for (_task_id,) in res:
                    jedi_task_id = int(_task_id)
                    var_map[f":jtid{i}"] = jedi_task_id
                    i += 1
                    good_taskid_set.add(jedi_task_id)
                jtid_bindings = ",".join(f":jtid{i}" for i in range(len(good_taskid_set)))
                locked_taskid_set = shard_taskid_set - good_taskid_set
                if locked_taskid_set:
                    tmp_log.debug(f"skip locked tasks: {','.join([str(i) for i in locked_taskid_set])}")

                # nothing left to update in this shard
                if not jtid_bindings:
                    continue

                # move the tasks themselves to the new gshare
                sql_task = f"""
                UPDATE ATLAS_PANDA.jedi_tasks set gshare=:gshare
                WHERE jeditaskid IN ({jtid_bindings})
                """

                self.cur.execute(sql_task + comment, var_map)
                tmp_log.debug(f"""set tasks {",".join([str(i) for i in good_taskid_set])} to gshare={gshare}""")

                # job states whose jobs follow their task to the new gshare
                var_map[":pending"] = "pending"
                var_map[":defined"] = "defined"
                var_map[":assigned"] = "assigned"
                var_map[":waiting"] = "waiting"
                var_map[":activated"] = "activated"
                jobstatus = ":pending, :defined, :assigned, :waiting, :activated"

                # optionally also move jobs that have already started
                if reassign_running:
                    jobstatus = f"{jobstatus}, :running, :starting"
                    var_map[":running"] = "running"
                    var_map[":starting"] = "starting"

                sql_jobs = """
                UPDATE ATLAS_PANDA.{0} set gshare=:gshare
                WHERE jeditaskid IN ({1})
                AND jobstatus IN ({2})
                """

                # apply the update to both the active and the defined job tables
                for table in ["jobsactive4", "jobsdefined4"]:
                    self.cur.execute(
                        sql_jobs.format(table, jtid_bindings, jobstatus) + comment,
                        var_map,
                    )

            # single commit for all shards
            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug("done")
            return 0, None

        except Exception:
            # roll back everything on any failure
            self._rollback()

            self.dump_error_message(tmp_log)
            return -1, None
0517
0518 def reload_resource_spec_mapper(self):
0519
0520 if self.__t_update_resource_type_mapper and self.__t_update_resource_type_mapper > datetime.datetime.now() - datetime.timedelta(hours=1):
0521 return
0522
0523
0524 resource_types = self.load_resource_types(use_commit=False)
0525 if resource_types:
0526 self.resource_spec_mapper = ResourceSpecMapper(resource_types)
0527 self.__t_update_resource_type_mapper = datetime.datetime.now()
0528 return
0529
0530 def load_resource_types(self, formatting="spec", use_commit=True):
0531 """
0532 Load the resource type table to memory
0533 """
0534 comment = " /* JediDBProxy.load_resource_types */"
0535 tmp_log = self.create_tagged_logger(comment)
0536 tmp_log.debug("start")
0537 try:
0538 sql = f"SELECT {ResourceSpec.column_names()} FROM {panda_config.schemaJEDI}.resource_types "
0539 self.cur.execute(sql + comment)
0540 resource_list = self.cur.fetchall()
0541 resource_spec_list = []
0542 for row in resource_list:
0543 resource_name, mincore, maxcore, minrampercore, maxrampercore = row
0544 if formatting == "dict":
0545 resource_dict = {
0546 "resource_name": resource_name,
0547 "mincore": mincore,
0548 "maxcore": maxcore,
0549 "minrampercore": minrampercore,
0550 "maxrampercore": maxrampercore,
0551 }
0552 resource_spec_list.append(resource_dict)
0553 else:
0554 resource_spec_list.append(
0555 ResourceSpec(
0556 resource_name,
0557 mincore,
0558 maxcore,
0559 minrampercore,
0560 maxrampercore,
0561 )
0562 )
0563
0564 if use_commit:
0565 if not self._commit():
0566 raise RuntimeError("Commit error")
0567 tmp_log.debug("done")
0568 return resource_spec_list
0569 except Exception:
0570 if use_commit:
0571 self._rollback()
0572 self.dump_error_message(tmp_log)
0573 return []
0574
0575 def get_resource_type_task(self, task_spec):
0576 """
0577 Identify the resource type of the task based on the resource type map.
0578 Return the name of the resource type
0579 """
0580 comment = " /* JediDBProxy.get_resource_type_task */"
0581 tmp_log = self.create_tagged_logger(comment)
0582 tmp_log.debug("start")
0583
0584 resource_map = self.load_resource_types()
0585
0586 for resource_spec in resource_map:
0587 if resource_spec.match_task(task_spec):
0588 tmp_log.debug(f"done. resource_type is {resource_spec.resource_name}")
0589 return resource_spec.resource_name
0590
0591 tmp_log.debug("done. resource_type is Undefined")
0592 return "Undefined"
0593
    def reset_resource_type_task(self, jedi_task_id, use_commit=True):
        """
        Retrieve the relevant task parameters and reset the resource type

        :param jedi_task_id: task identifier
        :param use_commit: when True, wrap the update in its own transaction
        :return: True on success, False when the update failed
        """
        comment = " /* JediDBProxy.reset_resource_type */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        # read the resource-type-relevant parameters of the task
        var_map = {":jedi_task_id": jedi_task_id}
        sql = f"SELECT corecount, ramcount, baseramcount, ramunit FROM {panda_config.schemaJEDI}.jedi_tasks WHERE jeditaskid = :jedi_task_id "
        self.cur.execute(sql + comment, var_map)
        corecount, ramcount, baseramcount, ramunit = self.cur.fetchone()
        tmp_log.debug(
            "retrieved following values for jediTaskid {0}: corecount {1}, ramcount {2}, baseramcount {3}, ramunit {4}".format(
                jedi_task_id, corecount, ramcount, baseramcount, ramunit
            )
        )

        # match the parameters against the resource type definitions; first match wins
        resource_map = self.load_resource_types(use_commit=False)
        resource_name = "Undefined"
        for resource_spec in resource_map:
            if resource_spec.match_task_basic(corecount, ramcount, baseramcount, ramunit):
                resource_name = resource_spec.resource_name
                break

        tmp_log.debug(f"decided resource_type {resource_name} jediTaskid {jedi_task_id}")

        # write the decided resource type back to the task
        try:
            var_map = {":jedi_task_id": jedi_task_id, ":resource_type": resource_name}
            sql = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET resource_type = :resource_type WHERE jeditaskid = :jedi_task_id "
            tmp_log.debug("conn begin...")
            if use_commit:
                self.conn.begin()
            tmp_log.debug("execute...")
            self.cur.execute(sql + comment, var_map)
            tmp_log.debug("commit...")
            if use_commit:
                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug("committed...")
        except Exception:
            # only roll back a transaction we opened ourselves
            if use_commit:
                tmp_log.debug("rolling back...")
                self._rollback()
            self.dump_error_message(tmp_log)
            tmp_log.error(f"{comment}: {sql} {var_map}")
            return False

        tmp_log.debug("done")
        return True
0648
0649 def get_resource_type_job(self, job_spec):
0650 """
0651 Identify the resource type of the job based on the resource type map.
0652 Return the name of the resource type
0653 """
0654 comment = " /* JediDBProxy.get_resource_type_job */"
0655 tmp_log = self.create_tagged_logger(comment)
0656 tmp_log.debug("start")
0657
0658 resource_map = self.load_resource_types()
0659 tmp_log.debug(
0660 "going to call match_job for pandaid {0} with minRamCount {1} (type{2}) and coreCount {3} (type{4})".format(
0661 job_spec.PandaID,
0662 job_spec.minRamCount,
0663 type(job_spec.minRamCount),
0664 job_spec.coreCount,
0665 type(job_spec.coreCount),
0666 )
0667 )
0668 resource_type = JobUtils.get_resource_type_job(resource_map, job_spec)
0669 tmp_log.debug(f"done. resource_type is {resource_type}")
0670 return resource_type
0671
0672
0673 def get_rtype_site(self, site):
0674 comment = " /* DBProxy.get_rtype_site */"
0675 tmp_log = self.create_tagged_logger(comment)
0676 tmp_log.debug("start")
0677 try:
0678 var_map = {":site": site}
0679 sql = "SELECT /* use_json_type */ scj.data.resource_type FROM ATLAS_PANDA.schedconfig_json scj WHERE panda_queue=:site "
0680
0681
0682 self.conn.begin()
0683 self.cur.arraysize = 1
0684 self.cur.execute(sql + comment, var_map)
0685 rtype = self.cur.fetchone()[0]
0686
0687 if not self._commit():
0688 raise RuntimeError("Commit error")
0689 tmp_log.debug(f"site {site} has rtype: {rtype} ")
0690 return rtype
0691 except Exception:
0692
0693 self._rollback()
0694
0695 self.dump_error_message(tmp_log)
0696 return None
0697
0698 def compare_share_task(self, share, task):
0699 """
0700 Logic to compare the relevant fields of share and task.
0701 Return: False if some condition does NOT match. True if all conditions match.
0702 """
0703 if share.prodsourcelabel is not None and task.prodSourceLabel is not None and re.match(share.prodsourcelabel, task.prodSourceLabel) is None:
0704 return False
0705
0706 working_group = task.workingGroup
0707 if working_group is None:
0708 working_group = " "
0709
0710 if share.workinggroup is not None and working_group is not None and re.match(share.workinggroup, working_group) is None:
0711 return False
0712
0713 if share.campaign is not None and task.campaign and re.match(share.campaign, task.campaign) is None:
0714 return False
0715
0716 if share.processingtype is not None and task.processingType is not None and re.match(share.processingtype, task.processingType) is None:
0717 return False
0718
0719 if share.transpath is not None and task.transPath is not None and re.match(share.transpath, task.transPath) is None:
0720 return False
0721
0722 if share.rtype is not None and task.site is not None:
0723 try:
0724 site = task.site.split(",")[0]
0725 rtype_site = self.get_rtype_site(site)
0726 if rtype_site and re.match(share.rtype, rtype_site) is None:
0727 return False
0728 except Exception:
0729 return False
0730
0731 return True
0732
0733 def compare_share_job(self, share, job):
0734 """
0735 Logic to compare the relevant fields of share and job. It's basically the same as compare_share_task, but
0736 does not check for the campaign field, which is not part of the job
0737 """
0738
0739 if share.prodsourcelabel is not None and re.match(share.prodsourcelabel, job.prodSourceLabel) is None:
0740 return False
0741
0742 if share.workinggroup is not None and re.match(share.workinggroup, job.workingGroup) is None:
0743 return False
0744
0745 if share.processingtype is not None and re.match(share.processingtype, job.processingType) is None:
0746 return False
0747
0748 if share.rtype is not None and job.computingSite is not None:
0749 try:
0750 site = job.computingSite.split(",")[0]
0751 rtype_site = self.get_rtype_site(site)
0752 if re.match(share.rtype, rtype_site) is None:
0753 return False
0754 except Exception:
0755 return False
0756
0757 return True
0758
0759 def get_share_for_task(self, task):
0760 """
0761 Return the share based on a task specification
0762 """
0763 self.reload_shares()
0764 selected_share_name = "Undefined"
0765
0766 for share in self.leave_shares:
0767 if self.compare_share_task(share, task):
0768 selected_share_name = share.name
0769 break
0770
0771 if selected_share_name == "Undefined":
0772 self._log_stream.warning(
0773 "No share matching jediTaskId={0} (prodSourceLabel={1} workingGroup={2} campaign={3} transpath={4} site={5})".format(
0774 task.jediTaskID,
0775 task.prodSourceLabel,
0776 task.workingGroup,
0777 task.campaign,
0778 task.transPath,
0779 task.site,
0780 )
0781 )
0782
0783 return selected_share_name
0784
0785 def get_share_for_job(self, job):
0786 """
0787 Return the share based on a job specification
0788 """
0789
0790 if job.eventService == EventServiceUtils.esMergeJobFlagNumber:
0791 return "Express"
0792
0793 self.reload_shares()
0794 selected_share_name = "Undefined"
0795
0796 for share in self.leave_shares:
0797 if self.compare_share_job(share, job):
0798 selected_share_name = share.name
0799 break
0800
0801 if selected_share_name == "Undefined":
0802 self._log_stream.warning(f"No share matching PandaID={job.PandaID} (prodSourceLabel={job.prodSourceLabel} workingGroup={job.workingGroup})")
0803
0804 return selected_share_name
0805
0806
0807 def getSortingCriteria(self, site_name, max_jobs):
0808 comment = " /* DBProxy.getSortingCriteria */"
0809 tmp_log = self.create_tagged_logger(comment)
0810
0811 random_number = random.randrange(100)
0812
0813 sloppy_ratio = self.getConfigValue("jobdispatch", "SLOPPY_DISPATCH_RATIO")
0814 if not sloppy_ratio:
0815 sloppy_ratio = 10
0816
0817 tmp_log.debug(f"random_number: {random_number} sloppy_ratio: {sloppy_ratio}")
0818
0819 if random_number <= sloppy_ratio:
0820
0821 tmp_log.debug(f"sorting by age")
0822 return self.getCriteriaByAge(site_name, max_jobs)
0823 else:
0824
0825 tmp_log.debug(f"sorting by gshare")
0826 return self.getCriteriaForGlobalShares(site_name, max_jobs)
0827
0828
0829 def getCriteriaForGlobalShares(self, site_name, max_jobs):
0830 comment = " /* DBProxy.getCriteriaForGlobalShare */"
0831 tmp_log = self.create_tagged_logger(comment)
0832
0833 var_map = {}
0834 ret_empty = "", {}
0835
0836 try:
0837
0838 tmp_log.debug(f"Going to call get sorted leaves")
0839 t_before = time.time()
0840 sorted_leaves = self.get_sorted_leaves()
0841 t_after = time.time()
0842 total = t_after - t_before
0843 tmp_log.debug(f"Sorting leaves took {total}s")
0844
0845 i = 0
0846 tmp_list = []
0847 for leave in sorted_leaves:
0848 var_map[f":leave{i}"] = leave.name
0849 if leave.name == "Test":
0850
0851 tmp_list.append(f"WHEN gshare=:leave{i} THEN 0")
0852 else:
0853 tmp_list.append("WHEN gshare=:leave{0} THEN {0}".format(i))
0854 i += 1
0855
0856
0857 var_map[":njobs"] = max_jobs
0858
0859
0860 leave_bindings = " ".join(tmp_list)
0861 ret_sql = f"""
0862 ORDER BY (CASE {leave_bindings} ELSE {len(sorted_leaves)} END), currentpriority desc, pandaid asc)
0863 WHERE ROWNUM <= :njobs
0864 """
0865
0866 tmp_log.debug(f"ret_sql: {ret_sql}. var_map: {var_map}")
0867 return ret_sql, var_map
0868
0869 except Exception:
0870
0871 self._rollback()
0872 self.dump_error_message(tmp_log)
0873 return ret_empty
0874
0875
0876 def getCriteriaByAge(self, site_name, max_jobs):
0877 comment = " /* DBProxy.getCriteriaByAge */"
0878 tmp_log = self.create_tagged_logger(comment)
0879
0880 ret_sql = ""
0881 var_map = {}
0882 ret_empty = "", {}
0883
0884 try:
0885
0886 var_map[":njobs"] = max_jobs
0887
0888
0889 ret_sql = """
0890 ORDER BY pandaid asc)
0891 WHERE ROWNUM <= :njobs
0892 """
0893
0894 tmp_log.debug(f"ret_sql: {ret_sql}. var_map: {var_map}")
0895 return ret_sql, var_map
0896
0897 except Exception:
0898
0899 self._rollback()
0900 self.dump_error_message(tmp_log)
0901 return ret_empty
0902
0903
    def setHS06sec(self, pandaID, inActive=False):
        """
        Compute and store the HS06sec consumption of a job.

        :param pandaID: job identifier
        :param inActive: read/update jobsActive4 when True, jobsArchived4 otherwise
        :return: the HS06sec value that was set, or None when it could not be computed
        """
        comment = " /* DBProxy.setHS06sec */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={pandaID}")
        tmp_log.debug("start")
        hs06sec = None

        # sql to read the job attributes needed for the calculation
        sqlJ = "SELECT jediTaskID,startTime,endTime,actualCoreCount,coreCount,jobMetrics,computingSite "
        if inActive:
            sqlJ += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
        else:
            sqlJ += "FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "

        # sql to write back the computed HS06sec
        if inActive:
            sqlU = "UPDATE ATLAS_PANDA.jobsActive4 "
        else:
            sqlU = "UPDATE ATLAS_PANDA.jobsArchived4 "
        sqlU += "SET hs06sec=:hs06sec WHERE PandaID=:PandaID "

        # get the job attributes
        varMap = {}
        varMap[":PandaID"] = pandaID
        self.cur.execute(sqlJ + comment, varMap)
        resJ = self.cur.fetchone()
        if resJ is None:
            tmp_log.debug("skip since job not found")
        else:
            (
                jediTaskID,
                startTime,
                endTime,
                actualCoreCount,
                defCoreCount,
                jobMetrics,
                computingSite,
            ) = resJ
            # per-core HS06 power of the site (memoized lookup)
            corePower, tmpMsg = self.get_core_power(computingSite)
            if corePower is None:
                tmp_log.debug(f"skip since corePower is undefined for site={computingSite}")
            else:
                # resolve the effective core count via the shared JobUtils helper
                coreCount = JobUtils.getCoreCount(actualCoreCount, defCoreCount, jobMetrics)
                # derive HS06sec from the job timing, core power and core count
                hs06sec = JobUtils.getHS06sec(startTime, endTime, corePower, coreCount)
                if hs06sec is None:
                    tmp_log.debug("skip since HS06sec is None")
                else:
                    # cap the value so it fits the DB column
                    hs06sec = int(hs06sec)
                    maxHS06sec = 999999999
                    if hs06sec > maxHS06sec:
                        hs06sec = maxHS06sec
                    # write the value back to the job record
                    varMap = {}
                    varMap[":PandaID"] = pandaID
                    varMap[":hs06sec"] = hs06sec
                    self.cur.execute(sqlU + comment, varMap)
                    tmp_log.debug(f"set HS06sec={hs06sec}")

        return hs06sec
0966
0967 def convert_computingsite_to_region(self, computing_site):
0968 comment = " /* DBProxy.convert_computingsite_to_region */"
0969
0970 var_map = {":panda_queue": computing_site}
0971 sql = "SELECT /* use_json_type */ scj.data.region FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:panda_queue"
0972 self.cur.arraysize = 100
0973 self.cur.execute(sql + comment, var_map)
0974 res_region = self.cur.fetchone()
0975 region = "GRID"
0976 if res_region:
0977 region = res_region[0]
0978
0979 return region
0980
0981 def get_co2_emissions_site(self, computing_site):
0982 comment = " /* DBProxy.get_co2_emissions_site */"
0983 region = self.convert_computingsite_to_region(computing_site)
0984 if not region:
0985 return None
0986
0987 var_map = {":region": region}
0988 sql = "SELECT timestamp, region, value FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS WHERE region=:region"
0989 self.cur.execute(sql + comment, var_map)
0990 results = self.cur.fetchall()
0991 return results
0992
0993 def get_co2_emissions_grid(self):
0994 comment = " /* DBProxy.get_co2_emissions_grid */"
0995
0996 sql = "SELECT timestamp, region, value FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS WHERE region='GRID'"
0997 self.cur.execute(sql + comment)
0998 results = self.cur.fetchall()
0999 return results
1000
1001
    def set_co2_emissions(self, panda_id, in_active=False):
        """
        Compute and store the regional and global gCO2 consumption of a job.

        :param panda_id: job identifier
        :param in_active: read/update jobsActive4 when True, jobsArchived4 otherwise
        :return: tuple (gco2_regional, gco2_global); entries are None when they could not be computed
        """
        comment = " /* DBProxy.set_co2_emissions */"
        tmp_log = self.create_tagged_logger(comment, f"PandaID={panda_id}")
        tmp_log.debug("start")
        gco2_regional, gco2_global = None, None

        # sql to read the job attributes needed for the calculation
        sql_read = "SELECT jediTaskID, startTime, endTime, actualCoreCount, coreCount, jobMetrics, computingSite "
        if in_active:
            sql_read += "FROM ATLAS_PANDA.jobsActive4 WHERE PandaID=:PandaID "
        else:
            sql_read += "FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "

        # sql to write back the computed CO2 values
        if in_active:
            sql_update = "UPDATE ATLAS_PANDA.jobsActive4 "
        else:
            sql_update = "UPDATE ATLAS_PANDA.jobsArchived4 "
        sql_update += "SET gCO2_global=:gco2_global, gCO2_regional=:gco2_regional WHERE PandaID=:PandaID "

        # get the job attributes
        var_map = {":PandaID": panda_id}
        self.cur.execute(sql_read + comment, var_map)
        res_read = self.cur.fetchone()
        if res_read is None:
            tmp_log.debug("skip since job not found")
        else:
            (
                task_id,
                start_time,
                end_time,
                actual_cores,
                defined_cores,
                job_metrics,
                computing_site,
            ) = res_read

            # resolve the effective core count via the shared JobUtils helper
            core_count = JobUtils.getCoreCount(actual_cores, defined_cores, job_metrics)

            # read the per-core energy figure from the queue configuration, defaulting to 10
            var_map = {":panda_queue": computing_site}
            sql_wpc = "SELECT /* use_json_type */ scj.data.coreenergy FROM ATLAS_PANDA.schedconfig_json scj WHERE scj.panda_queue=:panda_queue"
            self.cur.arraysize = 100
            self.cur.execute(sql_wpc + comment, var_map)
            res_wpc = self.cur.fetchone()
            try:
                watts_per_core = float(res_wpc[0])
            except Exception:
                watts_per_core = 10
            tmp_log.debug(f"using watts_per_core={watts_per_core} for computing_site={computing_site}")

            # regional emissions, based on the region of the computing site
            co2_emissions = self.get_co2_emissions_site(computing_site)
            if not co2_emissions:
                tmp_log.debug(f"skip since co2_emissions are undefined for site={computing_site}")
            else:
                gco2_regional = JobUtils.get_job_co2(start_time, end_time, core_count, co2_emissions, watts_per_core)
                if gco2_regional is None:
                    tmp_log.debug("skip since the co2 emissions could not be calculated")
                else:
                    # cap the value so it fits the DB column
                    max_gco2 = 999999999
                    gco2_regional = min(gco2_regional, max_gco2)
                    tmp_log.debug(f"set gco2_regional={gco2_regional}")

            # global emissions, based on the grid-wide emission records
            co2_emissions = self.get_co2_emissions_grid()
            if not co2_emissions:
                tmp_log.debug("skip since co2_emissions are undefined for the grid")
            else:
                gco2_global = JobUtils.get_job_co2(start_time, end_time, core_count, co2_emissions, watts_per_core)
                if gco2_global is None:
                    tmp_log.debug("skip since the co2 emissions could not be calculated")
                else:
                    # cap the value so it fits the DB column
                    max_gco2 = 999999999
                    gco2_global = min(gco2_global, max_gco2)

                    tmp_log.debug(f"set gco2_global={gco2_global}")

            # update the job record (values may be None when a calculation was skipped)
            var_map = {
                ":PandaID": panda_id,
                ":gco2_regional": gco2_regional,
                ":gco2_global": gco2_global,
            }
            self.cur.execute(sql_update + comment, var_map)

        tmp_log.debug("done")

        return gco2_regional, gco2_global
1093
1094
1095 @memoize
1096 def get_core_power(self, site_id):
1097 comment = " /* DBProxy.get_core_power */"
1098 tmp_log = self.create_tagged_logger(comment, f"siteid={site_id}")
1099 tmp_log.debug("start")
1100
1101 sqlS = "SELECT /* use_json_type */ scj.data.corepower FROM ATLAS_PANDA.schedconfig_json scj "
1102 sqlS += "WHERE panda_queue=:siteid "
1103
1104 varMap = {":siteid": site_id}
1105
1106 try:
1107 self.cur.arraysize = 100
1108 self.cur.execute(sqlS + comment, varMap)
1109 resS = self.cur.fetchone()
1110 core_power = None
1111 if resS is not None:
1112 (core_power,) = resS
1113 core_power = float(core_power)
1114 tmp_log.debug(f"got {core_power}")
1115 return core_power, None
1116
1117 except Exception:
1118
1119 self.dump_error_message(tmp_log)
1120 return None, "failed to get corePower"
1121
1122
1123 @memoize
1124 def convertObjIDtoEndPoint(self, srcFileName, objID):
1125 comment = " /* DBProxy.convertObjIDtoEndPoint */"
1126 tmp_log = self.create_tagged_logger(comment, f"ID={objID}")
1127 tmp_log.debug("start")
1128 try:
1129 for srcFile in srcFileName.split(","):
1130 if not os.path.exists(srcFile):
1131 continue
1132 with open(srcFile) as f:
1133 data = json.load(f)
1134 for rseName in data:
1135 rseData = data[rseName]
1136 if objID in [rseData["id"], rseName]:
1137 retMap = {
1138 "name": rseName,
1139 "is_deterministic": rseData["is_deterministic"],
1140 "type": rseData["type"],
1141 }
1142 tmp_log.debug(f"got {str(retMap)}")
1143 return retMap
1144 tmp_log.debug("not found")
1145 except Exception:
1146
1147 self.dump_error_message(tmp_log)
1148 return None
1149
1150 def get_config_for_pq(self, pq_name):
1151 """
1152 Get the CRIC json configuration for a particular queue
1153 """
1154
1155 comment = " /* DBProxy.get_config_for_pq */"
1156 tmp_log = self.create_tagged_logger(comment, pq_name)
1157 tmp_log.debug("start")
1158
1159 var_map = {":pq": pq_name}
1160 sql_get_queue_config = """
1161 SELECT data FROM ATLAS_PANDA.SCHEDCONFIG_JSON
1162 WHERE panda_queue = :pq
1163 """
1164 tmp_v, pq_data = self.getClobObj(sql_get_queue_config + comment, var_map)
1165 if pq_data is None:
1166 tmp_log.error("Could not find queue configuration")
1167 return None
1168
1169 try:
1170 pq_data_des = pq_data[0][0]
1171 if not isinstance(pq_data_des, dict):
1172 pq_data_des = json.loads(pq_data_des)
1173 except Exception:
1174 tmp_log.error("Could not find queue configuration")
1175 return None
1176
1177 tmp_log.debug("done")
1178 return pq_data_des
1179
1180 def getQueuesInJSONSchedconfig(self):
1181 comment = " /* DBProxy.getQueuesInJSONSchedconfig */"
1182 tmp_log = self.create_tagged_logger(comment)
1183 tmp_log.debug("start")
1184 try:
1185
1186 sqlC = "SELECT /* use_json_type */ panda_queue FROM ATLAS_PANDA.schedconfig_json"
1187
1188 self.conn.begin()
1189 self.cur.execute(sqlC + comment)
1190 panda_queues = [row[0] for row in self.cur.fetchall()]
1191
1192 if not self._commit():
1193 raise RuntimeError("Commit error")
1194 tmp_log.debug(f"got {len(panda_queues)} queues")
1195 return panda_queues
1196 except Exception:
1197
1198 self._rollback()
1199
1200 self.dump_error_message(tmp_log)
1201 return None
1202
1203
1204 def upsertQueuesInJSONSchedconfig(self, schedconfig_dump):
1205 comment = " /* DBProxy.upsertQueuesInJSONSchedconfig */"
1206 tmp_log = self.create_tagged_logger(comment)
1207 tmp_log.debug("start")
1208
1209 if not schedconfig_dump:
1210 tmp_log.error("empty schedconfig dump")
1211 return "ERROR"
1212
1213 try:
1214 existing_queues = self.getQueuesInJSONSchedconfig()
1215 if existing_queues is None:
1216 tmp_log.error("Could not retrieve already existing queues")
1217 return None
1218
1219
1220 var_map_insert = []
1221 var_map_update = []
1222 utc_now = naive_utcnow()
1223 for pq in schedconfig_dump:
1224 data = json.dumps(schedconfig_dump[pq])
1225 if not data:
1226 tmp_log.error(f"no data for {pq}")
1227 continue
1228
1229 if pq in existing_queues:
1230 tmp_log.debug(f"pq {pq} present")
1231 var_map_update.append({":pq": pq, ":data": data, ":last_update": utc_now})
1232 else:
1233 tmp_log.debug(f"pq {pq} is new")
1234 var_map_insert.append({":pq": pq, ":data": data, ":last_update": utc_now})
1235
1236
1237 self.conn.begin()
1238
1239
1240 if var_map_update:
1241 sql_update = """
1242 UPDATE ATLAS_PANDA.SCHEDCONFIG_JSON SET data = :data, last_update = :last_update
1243 WHERE panda_queue = :pq
1244 """
1245 tmp_log.debug("start updates")
1246 self.cur.executemany(sql_update + comment, var_map_update)
1247 tmp_log.debug("finished updates")
1248
1249
1250 if var_map_insert:
1251 sql_insert = """
1252 INSERT INTO ATLAS_PANDA.SCHEDCONFIG_JSON (panda_queue, data, last_update)
1253 VALUES (:pq, :data, :last_update)
1254 """
1255 tmp_log.debug("start inserts")
1256 self.cur.executemany(sql_insert + comment, var_map_insert)
1257 tmp_log.debug("finished inserts")
1258
1259
1260 tmp_log.debug("Going to delete obsoleted queues")
1261 sql_delete = """
1262 DELETE FROM ATLAS_PANDA.SCHEDCONFIG_JSON WHERE last_update < current_date - INTERVAL '7' DAY
1263 """
1264 self.cur.execute(sql_delete + comment)
1265 tmp_log.debug("deleted old queues")
1266
1267 if not self._commit():
1268 raise RuntimeError("Commit error")
1269
1270 tmp_log.debug("done")
1271 return "OK"
1272
1273 except Exception:
1274
1275 self._rollback()
1276 self.dump_error_message(tmp_log)
1277 return "ERROR"
1278
1279
1280 def loadSWTags(self, sw_tags):
1281 comment = " /* DBProxy.loadSWTags */"
1282 tmp_log = self.create_tagged_logger(comment)
1283 tmp_log.debug("start")
1284
1285 if not sw_tags:
1286 tmp_log.error("empty sw tag dump")
1287 return "ERROR"
1288
1289 try:
1290 var_map_tags = []
1291
1292 utc_now = naive_utcnow()
1293 for pq in sw_tags:
1294 data = sw_tags[pq]
1295 var_map_tags.append({":pq": pq, ":data": json.dumps(data), ":last_update": utc_now})
1296
1297
1298
1299
1300 self.conn.begin()
1301
1302 sql_delete = "DELETE FROM ATLAS_PANDA.SW_TAGS"
1303 tmp_log.debug("start cleaning up SW_TAGS table")
1304 self.cur.execute(sql_delete + comment)
1305 tmp_log.debug("done cleaning up SW_TAGS table")
1306
1307 sql_insert = "INSERT INTO ATLAS_PANDA.SW_TAGS (panda_queue, data, last_update) VALUES (:pq, :data, :last_update)"
1308 tmp_log.debug("start filling up SW_TAGS table")
1309 for shard in create_shards(var_map_tags, 100):
1310 self.cur.executemany(sql_insert + comment, shard)
1311 tmp_log.debug("done filling up table")
1312 if not self._commit():
1313 raise RuntimeError("Commit error")
1314
1315 tmp_log.debug("done")
1316 return "OK"
1317
1318 except Exception:
1319
1320 self._rollback()
1321 self.dump_error_message(tmp_log)
1322 return "ERROR"
1323
1324
1325 def getWorkingGroup(self, fqans):
1326 for fqan in fqans:
1327
1328 match = re.search("/[^/]+/([^/]+)/Role=production", fqan)
1329 if match is not None:
1330 return match.group(1)
1331 return None
1332
1333
    def updateSiteData(self, hostID, pilotRequests, interval):
        """
        Update pilot request statistics (getJob/updateJob/noJob counts plus their
        per-node absolute sums) in the SiteData table for one time window, then
        rebuild the aggregated "production"/"analysis" summary row per site.

        :param hostID: identifier of the reporting server (stored in the FLAG column)
        :param pilotRequests: mapping site -> {"getJob"/"updateJob"/"noJob": {node: count}}
        :param interval: length of the time window in hours (stored in the HOURS column)
        :return: True on success, False on error
        """
        comment = " /* DBProxy.updateSiteData */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        # purge rows not modified for two days
        sqlDel = "DELETE FROM ATLAS_PANDAMETA.SiteData WHERE LASTMOD < :LASTMOD"

        # zero out counters of rows in this window that were not refreshed recently
        sqlRst = (
            "UPDATE ATLAS_PANDAMETA.SiteData "
            "SET GETJOB = :GETJOB, UPDATEJOB = :UPDATEJOB, NOJOB = :NOJOB, "
            "GETJOBABS = :GETJOBABS, UPDATEJOBABS = :UPDATEJOBABS, NOJOBABS = :NOJOBABS "
            "WHERE HOURS = :HOURS AND LASTMOD < :LASTMOD"
        )

        # row-lock probe; NOWAIT makes a concurrent writer cause a skip instead of a block
        sqlCh = "SELECT * FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE FOR UPDATE NOWAIT "

        # insert a fresh per-site row
        sqlIn = (
            "INSERT INTO ATLAS_PANDAMETA.SiteData "
            "(SITE, FLAG, HOURS, GETJOB, UPDATEJOB, NOJOB, GETJOBABS, UPDATEJOBABS, NOJOBABS, "
            "LASTMOD, NSTART, FINISHED, FAILED, DEFINED, ASSIGNED, WAITING, ACTIVATED, HOLDING, RUNNING, TRANSFERRING) "
            "VALUES (:SITE, :FLAG, :HOURS, :GETJOB, :UPDATEJOB, :NOJOB, :GETJOBABS, :UPDATEJOBABS, :NOJOBABS, CURRENT_DATE, "
            "0, 0, 0, 0, 0, 0, 0, 0, 0, 0)"
        )

        # update an existing per-site row
        sqlUp = (
            "UPDATE ATLAS_PANDAMETA.SiteData "
            "SET GETJOB = :GETJOB, UPDATEJOB = :UPDATEJOB, NOJOB = :NOJOB, "
            "GETJOBABS = :GETJOBABS, UPDATEJOBABS = :UPDATEJOBABS, NOJOBABS = :NOJOBABS, LASTMOD = CURRENT_DATE "
            "WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"
        )

        # read all rows of one site/window to build the aggregated summary
        sqlAll = "SELECT GETJOB, UPDATEJOB, NOJOB, GETJOBABS, UPDATEJOBABS, NOJOBABS, FLAG FROM ATLAS_PANDAMETA.SiteData WHERE HOURS = :HOURS AND SITE = :SITE"

        try:
            timeNow = naive_utcnow()
            self.conn.begin()

            # cleanup: drop anything untouched for 48 hours
            varMap = {}
            varMap[":LASTMOD"] = timeNow - datetime.timedelta(hours=48)
            self.cur.execute(sqlDel + comment, varMap)

            # reset counters of stale rows for this window
            varMap = {}
            varMap[":HOURS"] = interval
            varMap[":GETJOB"] = 0
            varMap[":UPDATEJOB"] = 0
            varMap[":NOJOB"] = 0
            varMap[":GETJOBABS"] = 0
            varMap[":UPDATEJOBABS"] = 0
            varMap[":NOJOBABS"] = 0
            varMap[":LASTMOD"] = timeNow - datetime.timedelta(hours=interval)
            self.cur.execute(sqlRst + comment, varMap)

            if not self._commit():
                raise RuntimeError("Commit error")

            # shuffle the site list — presumably to reduce lock contention between
            # multiple reporting servers; confirm with the original authors
            tmpSiteList = list(pilotRequests)
            random.shuffle(tmpSiteList)

            for tmpSite in tmpSiteList:
                tmpVal = pilotRequests[tmpSite]

                self.conn.begin()

                # try to lock the per-host row; skip this site if another writer holds it
                varMap = {}
                varMap[":FLAG"] = hostID
                varMap[":SITE"] = tmpSite
                varMap[":HOURS"] = interval
                self.cur.arraysize = 10
                locked = True
                try:
                    self.cur.execute(sqlCh + comment, varMap)
                except Exception as e:
                    tmp_log.debug(f"skip to update {str(varMap)} due to {str(e)}")
                    locked = False
                if locked:
                    res = self.cur.fetchone()

                    # insert when the row does not exist yet, update otherwise
                    if res is None:
                        sql = sqlIn
                    else:
                        sql = sqlUp

                    # count of distinct nodes plus the per-node totals for each request type
                    if "getJob" in tmpVal:
                        varMap[":GETJOB"] = len(tmpVal["getJob"])
                        getJobAbs = 0
                        for node in tmpVal["getJob"]:
                            getJobAbs += tmpVal["getJob"][node]
                        varMap[":GETJOBABS"] = getJobAbs
                    else:
                        varMap[":GETJOB"] = 0
                        varMap[":GETJOBABS"] = 0

                    if "updateJob" in tmpVal:
                        varMap[":UPDATEJOB"] = len(tmpVal["updateJob"])
                        updateJobAbs = 0
                        for node in tmpVal["updateJob"]:
                            updateJobAbs += tmpVal["updateJob"][node]
                        varMap[":UPDATEJOBABS"] = updateJobAbs
                    else:
                        varMap[":UPDATEJOB"] = 0
                        varMap[":UPDATEJOBABS"] = 0

                    if "noJob" in tmpVal:
                        varMap[":NOJOB"] = len(tmpVal["noJob"])
                        noJobAbs = 0
                        for node in tmpVal["noJob"]:
                            noJobAbs += tmpVal["noJob"][node]
                        varMap[":NOJOBABS"] = noJobAbs
                    else:
                        varMap[":NOJOB"] = 0
                        varMap[":NOJOBABS"] = 0

                    # write the per-host row
                    self.cur.execute(sql + comment, varMap)

                # commit also when the lock failed, to close the transaction
                if not self._commit():
                    raise RuntimeError("Commit error")

                if locked:
                    # rebuild the aggregated summary row for this site/window
                    self.conn.begin()

                    sumExist = False
                    varMap = {}
                    varMap[":SITE"] = tmpSite
                    varMap[":HOURS"] = interval
                    self.cur.arraysize = 100
                    self.cur.execute(sqlAll + comment, varMap)
                    res = self.cur.fetchall()

                    varMap[":GETJOB"] = 0
                    varMap[":UPDATEJOB"] = 0
                    varMap[":NOJOB"] = 0
                    varMap[":GETJOBABS"] = 0
                    varMap[":UPDATEJOBABS"] = 0
                    varMap[":NOJOBABS"] = 0
                    nCol = 0
                    for (
                        tmpGetJob,
                        tmpUpdateJob,
                        tmpNoJob,
                        tmpGetJobAbs,
                        tmpUpdateJobAbs,
                        tmpNoJobAbs,
                        tmpFlag,
                    ) in res:
                        # skip the summary rows themselves, but remember they exist
                        if tmpFlag == "production":
                            sumExist = True
                            continue
                        if tmpFlag == "analysis":
                            if tmpSite.startswith("ANALY_"):
                                sumExist = True
                            continue
                        if tmpFlag in ["test"]:
                            continue

                        # treat missing counters as zero
                        if tmpGetJob is None:
                            tmpGetJob = 0
                        if tmpUpdateJob is None:
                            tmpUpdateJob = 0
                        if tmpNoJob is None:
                            tmpNoJob = 0
                        if tmpGetJobAbs is None:
                            tmpGetJobAbs = 0
                        if tmpUpdateJobAbs is None:
                            tmpUpdateJobAbs = 0
                        if tmpNoJobAbs is None:
                            tmpNoJobAbs = 0

                        # accumulate over the contributing reporting hosts
                        varMap[":GETJOB"] += tmpGetJob
                        varMap[":UPDATEJOB"] += tmpUpdateJob
                        varMap[":NOJOB"] += tmpNoJob
                        varMap[":GETJOBABS"] += tmpGetJobAbs
                        varMap[":UPDATEJOBABS"] += tmpUpdateJobAbs
                        varMap[":NOJOBABS"] += tmpNoJobAbs
                        nCol += 1

                    # average over the contributing hosts (only when the sum is at
                    # least nCol, which keeps small non-zero sums from rounding away)
                    if nCol != 0:
                        if varMap[":GETJOB"] >= nCol:
                            varMap[":GETJOB"] /= nCol
                        if varMap[":UPDATEJOB"] >= nCol:
                            varMap[":UPDATEJOB"] /= nCol
                        if varMap[":NOJOB"] >= nCol:
                            varMap[":NOJOB"] /= nCol
                        if varMap[":GETJOBABS"] >= nCol:
                            varMap[":GETJOBABS"] /= nCol
                        if varMap[":UPDATEJOBABS"] >= nCol:
                            varMap[":UPDATEJOBABS"] /= nCol
                        if varMap[":NOJOBABS"] >= nCol:
                            varMap[":NOJOBABS"] /= nCol

                    # the summary row is flagged "analysis" for ANALY_* sites
                    if tmpSite.startswith("ANALY_"):
                        varMap[":FLAG"] = "analysis"
                    else:
                        varMap[":FLAG"] = "production"

                    locked_sum = True
                    if sumExist:
                        sql = sqlUp
                    else:
                        sql = sqlIn

                    # lock the summary row before writing it
                    var_map = {k: varMap[k] for k in [":FLAG", ":SITE", ":HOURS"]}
                    try:
                        self.cur.execute(sqlCh + comment, var_map)
                    except Exception as e:
                        tmp_log.debug(f"skip to update {str(var_map)} due to {str(e)}")
                        locked_sum = False

                    if locked_sum:
                        self.cur.execute(sql + comment, varMap)
                        tmp_log.debug(
                            " %s hours=%s getJob=%s updateJob=%s, noJob=%s, getJobAbs=%s updateJobAbs=%s, noJobAbs=%s"
                            % (
                                tmpSite,
                                interval,
                                varMap[":GETJOB"],
                                varMap[":UPDATEJOB"],
                                varMap[":NOJOB"],
                                varMap[":GETJOBABS"],
                                varMap[":UPDATEJOBABS"],
                                varMap[":NOJOBABS"],
                            )
                        )

                    if not self._commit():
                        raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False
1577
1578
1579 def getCurrentSiteData(self):
1580 comment = " /* DBProxy.getCurrentSiteData */"
1581 tmp_log = self.create_tagged_logger(comment)
1582 sql = "SELECT SITE,getJob,updateJob,FLAG FROM ATLAS_PANDAMETA.SiteData WHERE FLAG IN (:FLAG1,:FLAG2) and HOURS=3"
1583 varMap = {}
1584 varMap[":FLAG1"] = "production"
1585 varMap[":FLAG2"] = "analysis"
1586 try:
1587
1588 self.conn.begin()
1589
1590 self.cur.arraysize = 10000
1591 self.cur.execute(sql + comment, varMap)
1592 res = self.cur.fetchall()
1593
1594 if not self._commit():
1595 raise RuntimeError("Commit error")
1596 ret = {}
1597 for site, getJob, updateJob, flag in res:
1598 if site.startswith("ANALY_"):
1599 if flag != "analysis":
1600 continue
1601 else:
1602 if flag != "production":
1603 continue
1604 ret[site] = {"getJob": getJob, "updateJob": updateJob}
1605 return ret
1606 except Exception:
1607 self.dump_error_message(tmp_log)
1608
1609 self._rollback()
1610 return {}
1611
1612
    def insertnRunningInSiteData(self):
        """
        Record the number of running analysis jobs per ANALY_* site in SiteData:
        one "snapshot" row per time-of-day bucket and one "max" row (HOURS=0)
        holding the per-site peak over all snapshots.

        :return: True on success, False on error
        """
        comment = " /* DBProxy.insertnRunningInSiteData */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        # delete snapshot/max rows older than one day
        sqlDel = "DELETE FROM ATLAS_PANDAMETA.SiteData WHERE FLAG IN (:FLAG1, :FLAG2) AND LASTMOD < CURRENT_DATE - 1"

        # count running user/panda jobs per site
        sqlRun = (
            "SELECT COUNT(*), computingSite "
            "FROM ATLAS_PANDA.jobsActive4 "
            "WHERE prodSourceLabel IN (:prodSourceLabel1, :prodSourceLabel2) "
            "AND jobStatus = :jobStatus "
            "GROUP BY computingSite"
        )

        # check whether a snapshot/max row already exists
        sqlCh = "SELECT COUNT(*) FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"

        # insert a new row carrying only the RUNNING counter
        sqlIn = (
            "INSERT INTO ATLAS_PANDAMETA.SiteData "
            "(SITE, FLAG, HOURS, GETJOB, UPDATEJOB, LASTMOD, "
            "NSTART, FINISHED, FAILED, DEFINED, ASSIGNED, WAITING, "
            "ACTIVATED, HOLDING, RUNNING, TRANSFERRING) "
            "VALUES (:SITE, :FLAG, :HOURS, 0, 0, CURRENT_DATE, "
            "0, 0, 0, 0, 0, 0, 0, 0, :RUNNING, 0)"
        )

        # refresh the RUNNING counter of an existing row
        sqlUp = "UPDATE ATLAS_PANDAMETA.SiteData SET RUNNING = :RUNNING, LASTMOD = CURRENT_DATE WHERE FLAG = :FLAG AND HOURS = :HOURS AND SITE = :SITE"

        # per-site maximum over all snapshot buckets
        sqlMax = "SELECT SITE, MAX(RUNNING) FROM ATLAS_PANDAMETA.SiteData WHERE FLAG = :FLAG GROUP BY SITE"

        try:
            timeNow = naive_utcnow()
            # time-of-day bucket encoded into HOURS; NOTE(review): the 1000 offset
            # presumably keeps snapshot buckets distinct from real interval rows — confirm
            nHours = 1000 + timeNow.hour * 60 + timeNow.minute

            # cleanup of old snapshot/max rows
            varMap = {}
            varMap[":FLAG1"] = "max"
            varMap[":FLAG2"] = "snapshot"
            self.conn.begin()
            self.cur.execute(sqlDel + comment, varMap)

            if not self._commit():
                raise RuntimeError("Commit error")

            # current number of running jobs per site
            varMap = {}
            varMap[":jobStatus"] = "running"
            varMap[":prodSourceLabel1"] = "user"
            varMap[":prodSourceLabel2"] = "panda"
            self.conn.begin()
            self.cur.arraysize = 10000
            self.cur.execute(sqlRun + comment, varMap)
            res = self.cur.fetchall()

            if not self._commit():
                raise RuntimeError("Commit error")

            for nRunning, computingSite in res:
                # only analysis sites are tracked
                if not computingSite.startswith("ANALY_"):
                    continue

                # check whether a snapshot row for this time bucket exists
                varMap = {}
                varMap[":FLAG"] = "snapshot"
                varMap[":SITE"] = computingSite
                varMap[":HOURS"] = nHours

                self.conn.begin()
                self.cur.arraysize = 10
                self.cur.execute(sqlCh + comment, varMap)
                # rebinding res here is safe: the for-loop iterator was captured at loop start
                res = self.cur.fetchone()

                if res[0] == 0:
                    sql = sqlIn
                else:
                    sql = sqlUp

                varMap = {}
                varMap[":FLAG"] = "snapshot"
                varMap[":SITE"] = computingSite
                varMap[":HOURS"] = nHours
                varMap[":RUNNING"] = nRunning

                self.cur.execute(sql + comment, varMap)

                if not self._commit():
                    raise RuntimeError("Commit error")

            # refresh the per-site maximum rows from the snapshots
            varMap = {}
            varMap[":FLAG"] = "snapshot"
            self.conn.begin()
            self.cur.arraysize = 10000
            self.cur.execute(sqlMax + comment, varMap)
            res = self.cur.fetchall()

            if not self._commit():
                raise RuntimeError("Commit error")

            for computingSite, maxnRunning in res:
                self.conn.begin()

                # check whether a max row exists (HOURS=0 is the max bucket)
                varMap = {}
                varMap[":FLAG"] = "max"
                varMap[":SITE"] = computingSite
                varMap[":HOURS"] = 0
                self.cur.arraysize = 10
                self.cur.execute(sqlCh + comment, varMap)
                res = self.cur.fetchone()

                if res[0] == 0:
                    sql = sqlIn
                else:
                    sql = sqlUp

                varMap = {}
                varMap[":FLAG"] = "max"
                varMap[":SITE"] = computingSite
                varMap[":HOURS"] = 0
                varMap[":RUNNING"] = maxnRunning
                self.cur.execute(sql + comment, varMap)

                if not self._commit():
                    raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True
        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return False
1743
1744
    def getSiteInfo(self):
        """
        Build SiteSpec objects for all queues defined in the JSON schedconfig,
        enriched with DDM endpoint information and harvester slot counts.

        :return: ({nickname: SiteSpec}, endpoint detailed-status summary);
                 a pair of empty dicts on error
        """
        comment = " /* DBProxy.getSiteInfo */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")
        try:
            # DDM endpoint mapping per PanDA site plus the endpoint status summary
            pandaEndpointMap, endpoint_detailed_status_summary = self.getDdmEndpoints()

            # queue configuration joined with its site name and role
            sql = """
            SELECT /* use_json_type */ panda_queue, data, b.site_name, c.role
            FROM (ATLAS_PANDA.schedconfig_json a
            LEFT JOIN ATLAS_PANDA.panda_site b ON a.panda_queue = b.panda_site_name)
            LEFT JOIN ATLAS_PANDA.site c ON b.site_name = c.site_name
            WHERE panda_queue IS NOT NULL
            """
            self.cur.arraysize = 10000

            ret, resList = self.getClobObj(sql, {})
            if not resList:
                tmp_log.error("Empty site list!")

            self.conn.begin()

            # harvester slot definitions that have not expired
            sqlSL = "SELECT pandaQueueName, gshare, resourcetype, numslots FROM ATLAS_PANDA.Harvester_Slots "
            sqlSL += "WHERE (expirationTime IS NULL OR expirationTime>CURRENT_DATE) "

            num_slots_by_site = {}
            self.cur.execute(sqlSL + comment)
            resSL = self.cur.fetchall()

            # num_slots_by_site: {queue: {gshare: {resource_type: num_slots}}}
            for sl_queuename, sl_gshare, sl_resourcetype, sl_numslots in resSL:
                if sl_numslots < 0:
                    continue
                num_slots_by_site.setdefault(sl_queuename, {})
                num_slots_by_site[sl_queuename].setdefault(sl_gshare, {})
                num_slots_by_site[sl_queuename][sl_gshare][sl_resourcetype] = sl_numslots

            retList = {}
            if resList is not None:
                # one SiteSpec per queue; failures are logged and skipped
                for res in resList:
                    try:
                        # replace NULL columns with empty strings
                        resTmp = []
                        for tmpItem in res:
                            if tmpItem is None:
                                tmpItem = ""
                            resTmp.append(tmpItem)

                        siteid, queue_data_json, pandasite, role = resTmp
                        # the JSON CLOB may come back already deserialized
                        try:
                            if isinstance(queue_data_json, dict):
                                queue_data = queue_data_json
                            else:
                                queue_data = json.loads(queue_data_json)
                        except Exception:
                            tmp_log.error(f"loading json for queue {siteid} excepted. json was: {queue_data_json}")
                            continue

                        # skip pseudo entries and queues without configuration
                        if siteid in [None, "", "ALL"] or not queue_data:
                            if siteid != "ALL":
                                tmp_log.error(f"siteid {siteid} had no queue_data {queue_data}")
                            continue

                        tmp_log.debug(f"processing queue {siteid}")

                        # populate the SiteSpec from the queue configuration
                        ret = SiteSpec.SiteSpec()
                        ret.sitename = siteid
                        ret.pandasite = pandasite
                        ret.role = role

                        ret.type = queue_data.get("type", "production")
                        ret.nickname = queue_data.get("nickname")
                        # ddm/cloud may be comma-separated lists; keep the first entry
                        try:
                            ret.ddm = queue_data.get("ddm", "").split(",")[0]
                        except AttributeError:
                            ret.ddm = ""
                        try:
                            ret.cloud = queue_data.get("cloud", "").split(",")[0]
                        except AttributeError:
                            ret.cloud = ""
                        ret.memory = queue_data.get("memory")
                        ret.maxrss = queue_data.get("maxrss")
                        ret.minrss = queue_data.get("minrss")
                        ret.maxtime = queue_data.get("maxtime")
                        ret.status = queue_data.get("status")
                        ret.space = queue_data.get("space")
                        ret.maxinputsize = queue_data.get("maxinputsize")
                        ret.comment = queue_data.get("comment_")
                        ret.statusmodtime = queue_data.get("lastmod")
                        ret.catchall = queue_data.get("catchall")
                        ret.tier = queue_data.get("tier")
                        ret.jobseed = queue_data.get("jobseed")
                        ret.capability = queue_data.get("capability")
                        ret.workflow = queue_data.get("workflow")
                        ret.maxDiskio = queue_data.get("maxdiskio")
                        ret.pandasite_state = "ACTIVE"
                        ret.fairsharePolicy = queue_data.get("fairsharepolicy")
                        ret.defaulttoken = queue_data.get("defaulttoken")

                        # boolean flags: only a literal JSON true counts
                        ret.direct_access_lan = queue_data.get("direct_access_lan") is True
                        ret.direct_access_wan = queue_data.get("direct_access_wan") is True

                        ret.iscvmfs = queue_data.get("is_cvmfs") is True

                        if queue_data.get("corepower") is None:
                            ret.corepower = 0
                        else:
                            ret.corepower = queue_data.get("corepower")

                        ret.wnconnectivity = queue_data.get("wnconnectivity")
                        if ret.wnconnectivity == "":
                            ret.wnconnectivity = None

                        # maxwdir: use the configured value; on failure fall back to
                        # maxinputsize + 2000, then to a fixed default
                        try:
                            if queue_data.get("maxwdir") is None:
                                ret.maxwdir = 0
                            else:
                                ret.maxwdir = int(queue_data["maxwdir"])
                        except Exception:
                            if ret.maxinputsize in [0, None]:
                                ret.maxwdir = 0
                            else:
                                try:
                                    ret.maxwdir = ret.maxinputsize + 2000
                                except Exception:
                                    ret.maxwdir = 16336

                        if queue_data.get("mintime") is not None:
                            ret.mintime = queue_data["mintime"]
                        else:
                            ret.mintime = 0

                        ret.reliabilityLevel = None

                        # numeric fields with safe int conversion
                        ret.pledgedCPU = 0
                        if queue_data.get("pledgedcpu") not in ["", None]:
                            try:
                                ret.pledgedCPU = int(queue_data["pledgedcpu"])
                            except Exception:
                                pass

                        ret.coreCount = 0
                        if queue_data.get("corecount") not in ["", None]:
                            try:
                                ret.coreCount = int(queue_data["corecount"])
                            except Exception:
                                pass

                        ret.releases = []
                        if queue_data.get("releases"):
                            ret.releases = queue_data["releases"]

                        # validated releases come as a |-separated string
                        ret.validatedreleases = []
                        if queue_data.get("validatedreleases"):
                            for tmpRel in queue_data["validatedreleases"].split("|"):
                                tmpRel = tmpRel.strip()
                                if tmpRel != "":
                                    ret.validatedreleases.append(tmpRel)

                        ret.transferringlimit = 0
                        if queue_data.get("transferringlimit") not in ["", None]:
                            try:
                                ret.transferringlimit = int(queue_data["transferringlimit"])
                            except Exception:
                                pass

                        # FAX is enabled either via catchall or an explicit flag
                        ret.allowfax = False
                        try:
                            if queue_data.get("catchall") is not None and "allowfax" in queue_data["catchall"]:
                                ret.allowfax = True
                            if queue_data.get("allowfax") is True:
                                ret.allowfax = True
                        except Exception:
                            pass

                        # DDM endpoints per scope; empty DdmSpec when the site has none
                        ret.ddm_endpoints_input = {}
                        ret.ddm_endpoints_output = {}
                        if siteid in pandaEndpointMap:
                            for scope in pandaEndpointMap[siteid]:
                                if "input" in pandaEndpointMap[siteid][scope]:
                                    ret.ddm_endpoints_input[scope] = pandaEndpointMap[siteid][scope]["input"]
                                if "output" in pandaEndpointMap[siteid][scope]:
                                    ret.ddm_endpoints_output[scope] = pandaEndpointMap[siteid][scope]["output"]
                        else:
                            ret.ddm_endpoints_input["default"] = DdmSpec()
                            ret.ddm_endpoints_output["default"] = DdmSpec()

                        # space-token maps and default read/write endpoints per scope
                        ret.setokens_input = {}
                        ret.setokens_output = {}
                        ret.ddm_input = {}
                        for scope in ret.ddm_endpoints_input:
                            ret.setokens_input[scope] = ret.ddm_endpoints_input[scope].getTokenMap("input")
                            ret.ddm_input[scope] = ret.ddm_endpoints_input[scope].getDefaultRead()

                        ret.ddm_output = {}
                        for scope in ret.ddm_endpoints_output:
                            ret.setokens_output[scope] = ret.ddm_endpoints_output[scope].getTokenMap("output")
                            ret.ddm_output[scope] = ret.ddm_endpoints_output[scope].getDefaultWrite()

                        try:
                            ret.objectstores = queue_data["objectstores"]
                        except Exception:
                            ret.objectstores = []

                        ret.is_unified = False

                        # harvester slot map for this queue
                        ret.num_slots_map = num_slots_by_site.get(siteid, {})

                        # scale per-core attributes to per-job totals when applicable
                        if ret.use_per_core_attr() and ret.coreCount > 0:
                            if ret.maxrss:
                                ret.maxrss = ret.maxrss * ret.coreCount
                            if ret.minrss:
                                ret.minrss = ret.minrss * ret.coreCount
                            if ret.maxwdir:
                                ret.maxwdir = ret.maxwdir * ret.coreCount
                            if ret.maxinputsize:
                                ret.maxinputsize = ret.maxinputsize

                        ret.extra_queue_params = queue_data.get("params", {})

                        # the result is keyed by queue nickname
                        retList[ret.nickname] = ret
                    except Exception:
                        tmp_log.error(f"exception in queue: {traceback.format_exc()}")
                        continue

            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return retList, endpoint_detailed_status_summary
        except Exception as e:
            self._rollback()
            self.dump_error_message(tmp_log)
            return {}, {}
2009
2010 def getDdmEndpoints(self):
2011 """
2012 get list of ddm input endpoints
2013 """
2014 comment = " /* DBProxy.getDdmEndpoints */"
2015 tmp_log = self.create_tagged_logger(comment)
2016 tmp_log.debug(f"start")
2017
2018
2019 sql_ddm = "SELECT * FROM ATLAS_PANDA.ddm_endpoint "
2020 self.cur.arraysize = 10000
2021 self.cur.execute(f"{sql_ddm}{comment}")
2022 results_ddm = self.cur.fetchall()
2023
2024
2025 column_names = [i[0].lower() for i in self.cur.description]
2026
2027
2028 endpoint_dict = {}
2029 detailed_status_summary = {}
2030 for ddm_endpoint_row in results_ddm:
2031 tmp_endpoint = {}
2032
2033 for column_name, column_val in zip(column_names, ddm_endpoint_row):
2034 tmp_endpoint[column_name] = column_val
2035
2036 ddm_endpoint_name = tmp_endpoint["ddm_endpoint_name"]
2037 try:
2038 tmp_detailed_status = tmp_endpoint["detailed_status"]
2039 if not isinstance(tmp_detailed_status, dict):
2040 tmp_detailed_status = json.loads(tmp_detailed_status)
2041
2042 if tmp_detailed_status:
2043 for tmp_activity, tmp_status in tmp_detailed_status.items():
2044 detailed_status_summary.setdefault(tmp_activity, {})
2045 detailed_status_summary[tmp_activity].setdefault(tmp_status, [])
2046 detailed_status_summary[tmp_activity][tmp_status].append(ddm_endpoint_name)
2047 tmp_endpoint["detailed_status"] = tmp_detailed_status
2048 except Exception as e:
2049 tmp_log.error(f"exception when decoding detailed_status for {ddm_endpoint_name}: {str(e)}")
2050 tmp_endpoint["detailed_status"] = {}
2051 endpoint_dict[ddm_endpoint_name] = tmp_endpoint
2052
2053
2054 sql_panda_ddm = """
2055 SELECT pdr.panda_site_name, pdr.ddm_endpoint_name, pdr.is_local, de.ddm_spacetoken_name,
2056 de.is_tape, pdr.default_read, pdr.default_write, pdr.roles, pdr.order_read, pdr.order_write,
2057 nvl(pdr.scope, 'default') as scope, de.blacklisted_read
2058 FROM ATLAS_PANDA.panda_ddm_relation pdr, ATLAS_PANDA.ddm_endpoint de
2059 WHERE pdr.ddm_endpoint_name = de.ddm_endpoint_name
2060 """
2061 if self.backend == "mysql":
2062 sql_panda_ddm = """
2063 SELECT pdr.panda_site_name, pdr.ddm_endpoint_name, pdr.is_local, de.ddm_spacetoken_name,
2064 de.is_tape, pdr.default_read, pdr.default_write, pdr.roles, pdr.order_read, pdr.order_write,
2065 ifnull(pdr.scope, 'default') as scope, de.blacklisted
2066 FROM ATLAS_PANDA.panda_ddm_relation pdr, ATLAS_PANDA.ddm_endpoint de
2067 WHERE pdr.ddm_endpoint_name = de.ddm_endpoint_name
2068 """
2069
2070 self.cur.execute(f"{sql_panda_ddm}{comment}")
2071 results_panda_ddm = self.cur.fetchall()
2072 column_names = [i[0].lower() for i in self.cur.description]
2073
2074
2075 panda_endpoint_map = {}
2076 for panda_ddm_row in results_panda_ddm:
2077 tmp_relation = {}
2078 for column_name, column_val in zip(column_names, panda_ddm_row):
2079
2080 if column_name.startswith("space_") and column_val is None:
2081 column_val = 0
2082 tmp_relation[column_name] = column_val
2083
2084
2085 panda_site_name = tmp_relation["panda_site_name"]
2086 scope = tmp_relation["scope"]
2087 panda_endpoint_map.setdefault(panda_site_name, {})
2088 panda_endpoint_map[panda_site_name].setdefault(scope, {})
2089
2090 if panda_site_name not in panda_endpoint_map:
2091 panda_endpoint_map[panda_site_name] = {
2092 "input": DdmSpec(),
2093 "output": DdmSpec(),
2094 }
2095 if "read_lan" in tmp_relation["roles"] and tmp_relation["blacklisted_read"] != "Y":
2096 panda_endpoint_map[panda_site_name][scope].setdefault("input", DdmSpec())
2097 panda_endpoint_map[panda_site_name][scope]["input"].add(tmp_relation, endpoint_dict)
2098 if "write_lan" in tmp_relation["roles"]:
2099 panda_endpoint_map[panda_site_name][scope].setdefault("output", DdmSpec())
2100 panda_endpoint_map[panda_site_name][scope]["output"].add(tmp_relation, endpoint_dict)
2101
2102 tmp_log.debug(f"done")
2103 return panda_endpoint_map, detailed_status_summary
2104
2105 def get_cloud_list(self):
2106 """
2107 Get a list of distinct cloud names from the database.
2108 """
2109 comment = " /* DBProxy.get_cloud_list */"
2110 tmp_log = self.create_tagged_logger(comment)
2111 tmp_log.debug("start")
2112 try:
2113 with self.conn:
2114 sql = (
2115 f"SELECT /* use_json_type */ DISTINCT sj.data.cloud AS cloud "
2116 f"FROM {panda_config.schemaPANDA}.schedconfig_json sj "
2117 f"UNION "
2118 f"SELECT 'WORLD' AS cloud "
2119 f"FROM dual "
2120 f"ORDER BY cloud"
2121 )
2122 self.cur.arraysize = 100
2123 self.cur.execute(sql + comment)
2124 results = self.cur.fetchall()
2125 clouds = [result[0] for result in results]
2126
2127 tmp_log.debug("done")
2128 return clouds
2129 except Exception:
2130 self.dump_error_message(tmp_log)
2131 self._rollback()
2132 return []
2133
2134
2135 def get_dict_to_boost_job_prio(self, vo):
2136 comment = " /* DBProxy.get_dict_to_boost_job_prio */"
2137 tmp_log = self.create_tagged_logger(comment)
2138
2139 if self.job_prio_boost_dict_update_time and datetime.datetime.now(datetime.timezone.utc).replace(
2140 tzinfo=None
2141 ) - self.job_prio_boost_dict_update_time < datetime.timedelta(minutes=15):
2142 return self.job_prio_boost_dict
2143 try:
2144 self.job_prio_boost_dict_update_time = naive_utcnow()
2145 self.job_prio_boost_dict = {}
2146
2147 tmp_log = self.create_tagged_logger(comment)
2148
2149 res_dicts = self.getConfigValue("dbproxy", "USER_JOB_PRIO_BOOST_DICTS", "pandaserver")
2150
2151 if res_dicts:
2152 for tmp_item in res_dicts:
2153 try:
2154 tmp_name = tmp_item["name"]
2155 tmp_type = tmp_item["type"]
2156 tmp_prio = tmp_item["prio"]
2157 tmp_expire = tmp_item.get("expire", None)
2158
2159 if tmp_expire:
2160 tmp_expire = datetime.datetime.strptime(tmp_expire, "%Y%m%d")
2161 if tmp_expire < naive_utcnow():
2162 continue
2163 self.job_prio_boost_dict.setdefault(tmp_type, {})
2164 self.job_prio_boost_dict[tmp_type][tmp_name] = int(tmp_prio)
2165 except Exception as e:
2166 tmp_log.error(str(e))
2167 tmp_log.debug(f"got {self.job_prio_boost_dict}")
2168 return self.job_prio_boost_dict
2169 except Exception:
2170
2171 self._rollback()
2172
2173 self.dump_error_message(tmp_log)
2174 return {}
2175
2176
    def set_user_secret(self, owner, key, value):
        """
        Create, update, or delete one entry of a user's secrets.

        Semantics: key=None wipes all secrets for the owner; value=None deletes
        the given key (or its "___file___:<key>" variant); otherwise key is set
        to value. A row is inserted on first use of an owner.

        :param owner: user name owning the secrets
        :param key: secret key, or None to reset everything
        :param value: secret value, or None to delete the key
        :return: (True, "OK") on success, (False, "database error") on failure
        """
        comment = " /* DBProxy.set_user_secret */"
        tmp_log = self.create_tagged_logger(comment, f"owner={owner} key={key}")
        try:
            # fetch current secrets blob
            sqlC = "SELECT data FROM ATLAS_PANDA.Secrets WHERE owner=:owner "
            # create an empty row for a first-time owner
            sqlI = "INSERT INTO ATLAS_PANDA.Secrets (owner, updated_at) " "VALUES(:owner,CURRENT_TIMESTAMP) "
            # store the (re)serialized blob
            sqlU = "UPDATE ATLAS_PANDA.Secrets SET updated_at=CURRENT_TIMESTAMP,data=:data " "WHERE owner=:owner "

            self.conn.begin()

            varMap = {}
            varMap[":owner"] = owner
            # data column is a CLOB, so go through getClobObj
            tmpS, tmpR = self.getClobObj(sqlC, varMap, use_commit=False)
            if not tmpR:
                # no row yet: insert one and start from an empty dict
                self.cur.execute(sqlI + comment, varMap)
                data = {}
            else:
                data = json.loads(tmpR[0][0])

            if key is None:
                # reset all secrets
                data = {}
            elif value is None:
                # delete one key; fall back to the file-backed variant of the key
                if key in data:
                    del data[key]
                else:
                    file_key = f"___file___:{key}"
                    if file_key in data:
                        del data[file_key]
            else:
                data[key] = value
            varMap = {}
            varMap[":owner"] = owner
            varMap[":data"] = json.dumps(data)
            self.cur.execute(sqlU + comment, varMap)

            if not self._commit():
                raise RuntimeError("Commit error")
            tmp_log.debug("done")
            return True, "OK"
        except Exception:
            # roll back
            self._rollback()
            # error
            self.dump_error_message(tmp_log)
            return False, "database error"
2228
2229
    def get_user_secrets(self, owner, keys=None, get_json=False, use_commit=True):
        """
        Fetch a user's secrets, optionally filtered to a comma-separated key list.

        :param owner: user name owning the secrets
        :param keys: comma-separated key names to keep, or None for all
        :param get_json: when True return a dict, otherwise a JSON string
        :param use_commit: passed through to getClobObj
        :return: (True, data) on success, (False, "database error") on failure
        """
        comment = " /* DBProxy.get_user_secrets */"
        tmp_log = self.create_tagged_logger(comment, f"owner={owner} keys={keys}")
        try:
            # data column is a CLOB, so go through getClobObj
            sqlC = "SELECT data FROM ATLAS_PANDA.Secrets WHERE owner=:owner "

            varMap = {}
            varMap[":owner"] = owner
            tmpS, tmpR = self.getClobObj(sqlC, varMap, use_commit=use_commit)
            if not tmpR:
                # unknown owner: empty secrets in the requested representation
                data = {}
                if not get_json:
                    data = json.dumps({})
            else:
                data = tmpR[0][0]
                # filter to the requested keys and re-serialize if needed
                if keys:
                    keys = set(keys.split(","))
                    data = json.loads(data)
                    for k in list(data):
                        if k not in keys:
                            data.pop(k)
                    if not get_json:
                        data = json.dumps(data)
                else:
                    if get_json:
                        data = json.loads(data)
            tmp_log.debug(f"got data with length={len(data)}")
            return True, data
        except Exception:
            # error
            self.dump_error_message(tmp_log)
            return False, "database error"
2264
2265 def configurator_write_sites(self, site_list):
2266 """
2267 Cache the CRIC site information in the PanDA database
2268 """
2269 comment = " /* DBProxy.configurator_write_sites */"
2270 tmp_log = self.create_tagged_logger(comment)
2271 tmp_log.debug("start")
2272
2273 try:
2274
2275 self.conn.begin()
2276
2277
2278 tmp_log.debug("getting existing sites")
2279 sql_get = "SELECT site_name FROM ATLAS_PANDA.site"
2280 self.cur.execute(sql_get + comment)
2281 results = self.cur.fetchall()
2282 site_name_list = list(map(lambda result: result[0], results))
2283 tmp_log.debug("finished getting existing sites")
2284
2285
2286 var_map_insert = []
2287 var_map_update = []
2288 for site in site_list:
2289 if site["site_name"] in site_name_list:
2290 var_map_update.append(convert_dict_to_bind_vars(site))
2291 else:
2292 var_map_insert.append(convert_dict_to_bind_vars(site))
2293
2294 tmp_log.debug("Updating sites")
2295 sql_update = "UPDATE ATLAS_PANDA.site set role=:role, tier_level=:tier_level WHERE site_name=:site_name"
2296 for shard in create_shards(var_map_update, 100):
2297 self.cur.executemany(sql_update + comment, shard)
2298
2299 tmp_log.debug("Inserting sites")
2300 sql_insert = "INSERT INTO ATLAS_PANDA.site (site_name, role, tier_level) " "VALUES(:site_name, :role, :tier_level)"
2301 for shard in create_shards(var_map_insert, 100):
2302 self.cur.executemany(sql_insert + comment, shard)
2303
2304
2305 if not self._commit():
2306 raise RuntimeError("Commit error")
2307
2308 tmp_log.debug("Done")
2309 return 0, None
2310
2311 except Exception:
2312 self._rollback()
2313 self.dump_error_message(tmp_log)
2314 return -1, None
2315
2316 def configurator_write_panda_sites(self, panda_site_list):
2317 comment = " /* DBProxy.configurator_write_panda_sites */"
2318 tmp_log = self.create_tagged_logger(comment)
2319 tmp_log.debug("start")
2320
2321 try:
2322
2323 self.conn.begin()
2324
2325
2326 tmp_log.debug("getting existing panda sites")
2327 sql_get = "SELECT panda_site_name FROM ATLAS_PANDA.panda_site"
2328 self.cur.execute(sql_get + comment)
2329 results = self.cur.fetchall()
2330 panda_site_name_list = list(map(lambda result: result[0], results))
2331 tmp_log.debug("finished getting existing panda sites")
2332
2333
2334 var_map_insert = []
2335 var_map_update = []
2336 for panda_site in panda_site_list:
2337 if panda_site["panda_site_name"] in panda_site_name_list:
2338 var_map_update.append(convert_dict_to_bind_vars(panda_site))
2339 else:
2340 var_map_insert.append(convert_dict_to_bind_vars(panda_site))
2341
2342 tmp_log.debug("Updating panda sites")
2343 sql_update = "UPDATE ATLAS_PANDA.panda_site set site_name=:site_name WHERE panda_site_name=:panda_site_name "
2344 for shard in create_shards(var_map_update, 100):
2345 self.cur.executemany(sql_update + comment, shard)
2346
2347 tmp_log.debug("Inserting panda sites")
2348 sql_insert = "INSERT INTO ATLAS_PANDA.panda_site (panda_site_name, site_name) " "VALUES(:panda_site_name, :site_name)"
2349 for shard in create_shards(var_map_insert, 100):
2350 self.cur.executemany(sql_insert + comment, shard)
2351
2352
2353 if not self._commit():
2354 raise RuntimeError("Commit error")
2355
2356 tmp_log.debug("Done")
2357 return 0, None
2358
2359 except Exception:
2360 self._rollback()
2361 self.dump_error_message(tmp_log)
2362 return -1, None
2363
    def configurator_write_ddm_endpoints(self, ddm_endpoint_list):
        """
        Cache the CRIC DDM endpoint information in the PanDA database.
        Known endpoints are updated in place; new ones are inserted.

        :param ddm_endpoint_list: list of endpoint dicts keyed by column name
        :return: (0, None) on success, (-1, None) on failure
        """
        comment = " /* DBProxy.configurator_write_ddm_endpoints */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        try:
            self.conn.begin()

            # names of the endpoints already cached in the DB
            tmp_log.debug("getting existing ddm endpoints")
            sql_get = "SELECT ddm_endpoint_name FROM ATLAS_PANDA.ddm_endpoint"
            self.cur.execute(sql_get + comment)
            results = self.cur.fetchall()
            ddm_endpoint_name_list = list(map(lambda result: result[0], results))
            tmp_log.debug("finished getting existing ddm endpoints")

            # split the input into updates (known endpoints) and inserts (new ones)
            var_map_insert = []
            var_map_update = []
            for ddm_endpoint in ddm_endpoint_list:
                if ddm_endpoint["ddm_endpoint_name"] in ddm_endpoint_name_list:
                    var_map_update.append(convert_dict_to_bind_vars(ddm_endpoint))
                else:
                    var_map_insert.append(convert_dict_to_bind_vars(ddm_endpoint))

            tmp_log.debug("Updating ddm endpoints")
            sql_update = (
                "UPDATE ATLAS_PANDA.ddm_endpoint set "
                "site_name=:site_name, ddm_spacetoken_name=:ddm_spacetoken_name, type=:type, is_tape=:is_tape, "
                "blacklisted=:blacklisted, blacklisted_write=:blacklisted_write, blacklisted_read=:blacklisted_read, detailed_status=:detailed_status, "
                "space_used=:space_used, space_free=:space_free, space_total=:space_total, space_expired=:space_expired, space_timestamp=:space_timestamp "
                "WHERE ddm_endpoint_name=:ddm_endpoint_name"
            )
            # executemany in shards of 100 rows to bound the bind-array size
            for shard in create_shards(var_map_update, 100):
                self.cur.executemany(sql_update + comment, shard)

            tmp_log.debug("Inserting ddm endpoints")
            sql_insert = (
                "INSERT INTO ATLAS_PANDA.ddm_endpoint (ddm_endpoint_name, site_name, ddm_spacetoken_name, type, is_tape, "
                "blacklisted, blacklisted_write, blacklisted_read, detailed_status, "
                "space_used, space_free, space_total, space_expired, space_timestamp) "
                "VALUES(:ddm_endpoint_name, :site_name, :ddm_spacetoken_name, :type, :is_tape, "
                ":blacklisted, :blacklisted_write, :blacklisted_read, :detailed_status, "
                ":space_used, :space_free, :space_total, :space_expired, :space_timestamp)"
            )
            for shard in create_shards(var_map_insert, 100):
                self.cur.executemany(sql_insert + comment, shard)

            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug("Done")
            return 0, None

        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return -1, None
2424
2425 def configurator_write_panda_ddm_relations(self, relation_list):
2426 comment = " /* DBProxy.configurator_write_panda_ddm_relations */"
2427 tmp_log = self.create_tagged_logger(comment)
2428 tmp_log.debug("start")
2429
2430 try:
2431
2432 self.conn.begin()
2433
2434
2435 tmp_log.debug("Deleting existing panda ddm relations")
2436 sql_delete = "DELETE FROM ATLAS_PANDA.panda_ddm_relation"
2437 self.cur.execute(sql_delete + comment)
2438
2439 var_map_insert = []
2440 for relation in relation_list:
2441 var_map_insert.append(convert_dict_to_bind_vars(relation))
2442
2443 tmp_log.debug("Inserting panda ddm relations")
2444 sql_insert = (
2445 "INSERT INTO ATLAS_PANDA.panda_ddm_relation (panda_site_name, ddm_endpoint_name, roles, "
2446 "is_local, order_read, order_write, default_read, default_write, scope) "
2447 "VALUES(:panda_site_name, :ddm_endpoint_name, :roles, "
2448 ":is_local, :order_read, :order_write, :default_read, :default_write, :scope)"
2449 )
2450 for shard in create_shards(var_map_insert, 100):
2451 self.cur.executemany(sql_insert + comment, shard)
2452
2453
2454 if not self._commit():
2455 raise RuntimeError("Commit error")
2456
2457 tmp_log.debug("Done")
2458 return 0, None
2459
2460 except Exception:
2461 self._rollback()
2462 self.dump_error_message(tmp_log)
2463 return -1, None
2464
2465 def configurator_read_sites(self):
2466 comment = " /* DBProxy.configurator_read_sites */"
2467 tmp_log = self.create_tagged_logger(comment)
2468 tmp_log.debug("start")
2469
2470 try:
2471 tmp_log.debug("getting existing panda sites")
2472 sql_get = "SELECT site_name FROM ATLAS_PANDA.site"
2473 self.cur.execute(sql_get + comment)
2474 results = self.cur.fetchall()
2475 site_names = set(map(lambda result: result[0], results))
2476 tmp_log.debug("finished getting site names in configurator")
2477
2478 tmp_log.debug("Done")
2479 return site_names
2480
2481 except Exception:
2482 self.dump_error_message(tmp_log)
2483 return set()
2484
2485 def configurator_read_panda_sites(self):
2486 comment = " /* DBProxy.configurator_read_sites */"
2487 tmp_log = self.create_tagged_logger(comment)
2488 tmp_log.debug("start")
2489
2490 try:
2491 tmp_log.debug("getting existing panda sites")
2492 sql_get = "SELECT panda_site_name FROM ATLAS_PANDA.panda_site"
2493 self.cur.execute(sql_get + comment)
2494 results = self.cur.fetchall()
2495 panda_site_names = set(map(lambda result: result[0], results))
2496 tmp_log.debug("finished getting panda site names in configurator")
2497
2498 tmp_log.debug("Done")
2499 return panda_site_names
2500
2501 except Exception:
2502 self.dump_error_message(tmp_log)
2503 return set()
2504
2505 def configurator_read_ddm_endpoints(self):
2506 comment = " /* DBProxy.configurator_read_ddm_endpoints */"
2507 tmp_log = self.create_tagged_logger(comment)
2508 tmp_log.debug("start")
2509
2510 try:
2511 tmp_log.debug("getting existing ddm endpoints")
2512 sql_get = "SELECT ddm_endpoint_name FROM ATLAS_PANDA.ddm_endpoint"
2513 self.cur.execute(sql_get + comment)
2514 results = self.cur.fetchall()
2515 ddm_endpoint_names = set(map(lambda result: result[0], results))
2516 tmp_log.debug("finished getting ddm endpoint names in configurator")
2517
2518 tmp_log.debug("Done")
2519 return ddm_endpoint_names
2520
2521 except Exception:
2522 self.dump_error_message(tmp_log)
2523 return set()
2524
2525 def configurator_read_cric_sites(self):
2526 comment = " /* DBProxy.configurator_read_cric_sites */"
2527 tmp_log = self.create_tagged_logger(comment)
2528 tmp_log.debug("start")
2529
2530 try:
2531 tmp_log.debug("getting existing CRIC sites")
2532 sql_get = "SELECT /* use_json_type */ distinct scj.data.atlas_site FROM ATLAS_PANDA.schedconfig_json scj"
2533 self.cur.arraysize = 1000
2534 self.cur.execute(sql_get + comment)
2535 results = self.cur.fetchall()
2536 site_names = set(map(lambda result: result[0], results))
2537 tmp_log.debug("finished getting CRIC sites")
2538
2539 tmp_log.debug("Done")
2540 return site_names
2541
2542 except Exception:
2543 self.dump_error_message(tmp_log)
2544 return set()
2545
2546 def configurator_read_cric_panda_sites(self):
2547 comment = " /* DBProxy.configurator_read_cric_panda_sites */"
2548 tmp_log = self.create_tagged_logger(comment)
2549 tmp_log.debug("start")
2550
2551 try:
2552 tmp_log.debug("getting existing CRIC panda queues")
2553 sql_get = "SELECT panda_queue FROM ATLAS_PANDA.schedconfig_json"
2554 self.cur.execute(sql_get + comment)
2555 results = self.cur.fetchall()
2556 panda_site_names = set(map(lambda result: result[0], results))
2557 tmp_log.debug("finished getting CRIC panda queues")
2558
2559 tmp_log.debug("Done")
2560 return panda_site_names
2561
2562 except Exception:
2563 self._rollback()
2564 self.dump_error_message(tmp_log)
2565 return set()
2566
2567 def configurator_delete_sites(self, sites_to_delete):
2568 """
2569 Delete sites and all dependent entries (panda_sites, ddm_endpoints, panda_ddm_relations).
2570 Deletion of dependent entries is done through cascade definition in models
2571 """
2572 comment = " /* DBProxy.configurator_delete_sites */"
2573 tmp_log = self.create_tagged_logger(comment)
2574 tmp_log.debug("start")
2575
2576 if not sites_to_delete:
2577 tmp_log.debug("nothing to delete")
2578 return
2579
2580 var_map_list = list(map(lambda site_name: {":site_name": site_name}, sites_to_delete))
2581
2582 try:
2583
2584 self.conn.begin()
2585 tmp_log.debug(f"deleting sites: {sites_to_delete}")
2586 sql_update = "DELETE FROM ATLAS_PANDA.site WHERE site_name=:site_name"
2587 self.cur.executemany(sql_update + comment, var_map_list)
2588 tmp_log.debug("done deleting sites")
2589
2590
2591 if not self._commit():
2592 raise RuntimeError("Commit error")
2593
2594 tmp_log.debug("Done")
2595 return 0, None
2596
2597 except Exception:
2598 self._rollback()
2599 self.dump_error_message(tmp_log)
2600 return -1, None
2601
2602 def configurator_delete_panda_sites(self, panda_sites_to_delete):
2603 """
2604 Delete PanDA sites and dependent entries in panda_ddm_relations
2605 """
2606 comment = " /* DBProxy.configurator_delete_panda_sites */"
2607 tmp_log = self.create_tagged_logger(comment)
2608 tmp_log.debug("start")
2609
2610 if not panda_sites_to_delete:
2611 tmp_log.debug("nothing to delete")
2612 return
2613
2614 var_map_list = list(
2615 map(
2616 lambda panda_site_name: {":panda_site_name": panda_site_name},
2617 panda_sites_to_delete,
2618 )
2619 )
2620
2621 try:
2622
2623 self.conn.begin()
2624 tmp_log.debug(f"deleting panda sites: {panda_sites_to_delete}")
2625 sql_update = "DELETE FROM ATLAS_PANDA.panda_site WHERE panda_site_name=:panda_site_name"
2626 self.cur.executemany(sql_update + comment, var_map_list)
2627 tmp_log.debug("done deleting panda sites")
2628
2629
2630 if not self._commit():
2631 raise RuntimeError("Commit error")
2632
2633 tmp_log.debug("Done")
2634 return 0, None
2635
2636 except Exception:
2637 self._rollback()
2638 self.dump_error_message(tmp_log)
2639 return -1, None
2640
2641 def configurator_delete_ddm_endpoints(self, ddm_endpoints_to_delete):
2642 """
2643 Delete DDM endpoints dependent entries in panda_ddm_relations
2644 """
2645 comment = " /* DBProxy.configurator_delete_ddm_endpoints */"
2646 tmp_log = self.create_tagged_logger(comment)
2647 tmp_log.debug("start")
2648
2649 if not ddm_endpoints_to_delete:
2650 tmp_log.debug("nothing to delete")
2651 return
2652
2653 var_map_list = list(
2654 map(
2655 lambda ddm_endpoint_name: {":ddm_endpoint_name": ddm_endpoint_name},
2656 ddm_endpoints_to_delete,
2657 )
2658 )
2659
2660 try:
2661
2662 self.conn.begin()
2663 tmp_log.debug(f"deleting ddm endpoints: {ddm_endpoints_to_delete}")
2664 sql_update = "DELETE FROM ATLAS_PANDA.ddm_endpoint WHERE ddm_endpoint_name=:ddm_endpoint_name"
2665 self.cur.executemany(sql_update + comment, var_map_list)
2666 tmp_log.debug("done deleting ddm endpoints")
2667
2668
2669 if not self._commit():
2670 raise RuntimeError("Commit error")
2671
2672 tmp_log.debug("Done")
2673 return 0, None
2674
2675 except Exception:
2676 self._rollback()
2677 self.dump_error_message(tmp_log)
2678 return -1, None
2679
    def carbon_write_region_emissions(self, emissions):
        """
        Store regional gCO2 emission measurements and prune entries older than 10 days.

        :param emissions: list of bind-variable dicts with :region, :timestamp, :value
        :return: (0, None) on success, (-1, None) on failure
        """
        comment = " /* DBProxy.carbon_write_regional_emissions */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        try:
            self.conn.begin()

            # keep only a 10-day sliding window of measurements
            tmp_log.debug("Deleting old entries")
            sql_delete = "DELETE FROM ATLAS_PANDA.CARBON_REGION_EMISSIONS " "WHERE timestamp < sysdate - interval '10' day"
            self.cur.execute(sql_delete + comment)

            tmp_log.debug("Inserting emissions by region")

            # the Oracle hint silently skips rows that would violate the
            # (region, timestamp) unique index, so re-runs are idempotent
            sql_insert = (
                "INSERT /*+ ignore_row_on_dupkey_index (emissions(region, timestamp)) */ "
                "INTO ATLAS_PANDA.CARBON_REGION_EMISSIONS emissions (REGION, TIMESTAMP, VALUE) "
                "VALUES(:region, :timestamp, :value)"
            )
            # insert in shards of 100 rows to bound the bind-array size
            for shard in create_shards(emissions, 100):
                self.cur.executemany(sql_insert + comment, shard)

            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug("Done")
            return 0, None

        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return -1, None
2714
    def carbon_aggregate_emissions(self):
        """
        Compute a grid-wide average gCO2 emission value and store it as a new entry
        for the pseudo-region 'GRID'. The average weighs each region's latest
        emission value by the region's share of the currently running HS06.

        :return: (0, None) on success, (-1, None) on failure
        """
        comment = " /* DBProxy.carbon_aggregate_emissions */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        try:
            self.conn.begin()

            # fraction of the total running HS06 contributed by each region
            sql_stat = (
                "WITH tmp_total(total_hs) AS "
                "(SELECT sum(hs) "
                "FROM ATLAS_PANDA.jobs_share_stats) "
                "SELECT scj.data.region, sum(jss.hs)/tmp_total.total_hs "
                "FROM ATLAS_PANDA.jobs_share_stats jss, ATLAS_PANDA.schedconfig_json scj, tmp_total "
                "WHERE jss.computingsite = scj.panda_queue "
                "AND scj.data.region IS NOT NULL AND scj.data.region != 'GRID'"
                "GROUP BY scj.data.region, tmp_total.total_hs "
            )

            # region -> {"emissions": latest value, "per_cent": HS06 fraction}
            region_dic = {}
            self.cur.arraysize = 1000
            stats_raw = self.cur.execute(sql_stat + comment)
            for entry in stats_raw:
                region, per_cent = entry
                region_dic.setdefault(region, {"emissions": 0, "per_cent": 0})
                region_dic[region]["per_cent"] = per_cent

            # latest emission value for each region
            sql_last = (
                "WITH top_ts(timestamp, region) AS "
                "(SELECT max(timestamp), region "
                "FROM atlas_panda.carbon_region_emissions "
                "GROUP BY region) "
                "SELECT cre.region, cre.value, cre.timestamp "
                "FROM atlas_panda.carbon_region_emissions cre, top_ts "
                "WHERE cre.timestamp = top_ts.timestamp AND cre.region = top_ts.region"
            )

            last_emission_values = self.cur.execute(sql_last + comment)
            for entry in last_emission_values:
                region, value, ts = entry
                region_dic.setdefault(region, {"emissions": 0, "per_cent": 0})
                region_dic[region]["emissions"] = value

            # weighted average; a region with non-numeric values is skipped
            average_emissions = 0
            for region in region_dic:
                tmp_log.debug(f"Region {region} with per_cent {region_dic[region]['per_cent']} and emissions {region_dic[region]['emissions']}")
                try:
                    average_emissions = average_emissions + region_dic[region]["per_cent"] * region_dic[region]["emissions"]
                except Exception:
                    tmp_log.debug(f"Skipped Region {region} with per_cent {region_dic[region]['per_cent']} and emissions {region_dic[region]['emissions']}")

            # store the average under the pseudo-region 'GRID'
            tmp_log.debug(f"The grid co2 emissions were averaged to {average_emissions}")
            utc_now = naive_utcnow()
            var_map = {
                ":region": "GRID",
                ":timestamp": utc_now,
                ":value": average_emissions,
            }

            sql_insert = "INSERT INTO ATLAS_PANDA.carbon_region_emissions (region, timestamp, value) " "VALUES (:region, :timestamp, :value)"
            self.cur.execute(sql_insert + comment, var_map)

            if not self._commit():
                raise RuntimeError("Commit error")

            tmp_log.debug("Done")
            return 0, None

        except Exception:
            self._rollback()
            self.dump_error_message(tmp_log)
            return -1, None
2793
2794
    def checkQuota(self, dn):
        """
        Compute a user's CPU-quota weight from the users table.

        :param dn: user DN
        :return: weight as a float; currently always 0.0 (see the in-line note)
        """
        comment = " /* DBProxy.checkQuota */"
        tmp_log = self.create_tagged_logger(comment, f"dn={dn}")
        tmp_log.debug(f"start")
        try:
            self.conn.begin()

            # look up CPU usage and quota columns for the compact user name
            name = CoreUtils.clean_user_id(dn)
            sql = "SELECT cpua1, cpua7, cpua30, quotaa1, quotaa7, quotaa30 FROM ATLAS_PANDAMETA.users WHERE name=:name"
            varMap = {}
            varMap[":name"] = name
            self.cur.arraysize = 10
            self.cur.execute(sql + comment, varMap)
            res = self.cur.fetchall()

            if not self._commit():
                raise RuntimeError("Commit error")
            weight = 0.0
            if res is not None and len(res) != 0:
                item = res[0]
                # CPU consumption over the last 1/7/30 days
                cpu1 = item[0]
                cpu7 = item[1]
                cpu30 = item[2]
                # quotas are stored in hours; convert to seconds (0/None = no quota)
                if item[3] in [0, None]:
                    quota1 = 0
                else:
                    quota1 = item[3] * 3600
                if item[4] in [0, None]:
                    quota7 = 0
                else:
                    quota7 = item[4] * 3600
                if item[5] in [0, None]:
                    quota30 = 0
                else:
                    quota30 = item[5] * 3600

                if cpu1 is None:
                    cpu1 = 0.0

                if quota1 > 0:
                    weight = float(cpu1) / float(quota1)
                    # NOTE(review): the computed ratio is immediately overwritten,
                    # so quota weighting is effectively disabled and the method
                    # always returns 0.0 — presumably intentional (quota turned
                    # off); confirm before relying on the computed ratio
                    weight = 0.0
                tmp_log.debug(f"Weight:{weight} Quota:{quota1} CPU:{cpu1}")
            else:
                tmp_log.debug(f"cannot found")
            return weight
        except Exception:
            self.dump_error_message(tmp_log)
            # roll back
            self._rollback()
            return 0.0
2849
2850
2851 def isSuperUser(self, userName):
2852 comment = " /* DBProxy.isSuperUser */"
2853 tmp_log = self.create_tagged_logger(comment, f"userName={userName}")
2854 tmp_log.debug("start")
2855 try:
2856 isSU = False
2857 isSG = False
2858
2859 self.conn.begin()
2860
2861 name = CoreUtils.clean_user_id(userName)
2862 sql = "SELECT gridpref FROM ATLAS_PANDAMETA.users WHERE name=:name"
2863 varMap = {}
2864 varMap[":name"] = name
2865 self.cur.arraysize = 10
2866 self.cur.execute(sql + comment, varMap)
2867 res = self.cur.fetchone()
2868
2869 if not self._commit():
2870 raise RuntimeError("Commit error")
2871
2872 if res is not None:
2873 (gridpref,) = res
2874 if gridpref is not None:
2875 if PrioUtil.PERMISSION_SUPER_USER in gridpref:
2876 isSU = True
2877 if PrioUtil.PERMISSION_SUPER_GROUP in gridpref:
2878 isSG = True
2879 tmp_log.debug(f"done with superUser={isSU} superGroup={isSG}")
2880 return isSU, isSG
2881 except Exception:
2882
2883 self._rollback()
2884
2885 self.dump_error_message(tmp_log)
2886 return False, False
2887
2888
2889 def getUserParameter(self, dn, jobID, jobsetID):
2890 comment = " /* DBProxy.getUserParameter */"
2891 tmp_log = self.create_tagged_logger(comment, f"dn={dn} jobID={jobID} jobsetID={jobsetID}")
2892 try:
2893
2894 retStatus = True
2895 if jobsetID == -1:
2896
2897 retJobsetID = jobID
2898
2899 retJobID = retJobsetID + 1
2900 elif jobsetID in ["NULL", None, 0]:
2901
2902 retJobsetID = None
2903 retJobID = jobID
2904 else:
2905
2906 retJobsetID = jobsetID
2907 retJobID = jobID
2908
2909 self.conn.begin()
2910
2911 name = CoreUtils.clean_user_id(dn)
2912 sql = "SELECT jobid,status FROM ATLAS_PANDAMETA.users WHERE name=:name "
2913 sql += "FOR UPDATE "
2914 sqlAdd = "INSERT INTO ATLAS_PANDAMETA.users "
2915 sqlAdd += "(ID,NAME,LASTMOD,FIRSTJOB,LATESTJOB,CACHETIME,NCURRENT,JOBID) "
2916 sqlAdd += "VALUES(ATLAS_PANDAMETA.USERS_ID_SEQ.nextval,:name,"
2917 sqlAdd += "CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,0,1) "
2918 varMap = {}
2919 varMap[":name"] = name
2920 self.cur.execute(sql + comment, varMap)
2921 self.cur.arraysize = 10
2922 res = self.cur.fetchall()
2923
2924 if res is None or len(res) == 0:
2925 try:
2926 self.cur.execute(sqlAdd + comment, varMap)
2927 retI = self.cur.rowcount
2928 tmp_log.debug(f"inserted new row with {retI}")
2929
2930 res = [[1, ""]]
2931 except Exception:
2932 self.dump_error_message(tmp_log)
2933 if res is not None and len(res) != 0:
2934 item = res[0]
2935
2936 dbJobID = item[0]
2937
2938 if item[1] in ["disabled"]:
2939 retStatus = False
2940
2941 if dbJobID >= int(retJobID) or (jobsetID == -1 and dbJobID >= int(retJobsetID)):
2942 if jobsetID == -1:
2943
2944 retJobsetID = dbJobID + 1
2945
2946 retJobID = retJobsetID + 1
2947 else:
2948
2949 retJobID = dbJobID + 1
2950
2951 varMap = {}
2952 varMap[":name"] = name
2953 varMap[":jobid"] = retJobID
2954 sql = "UPDATE ATLAS_PANDAMETA.users SET jobid=:jobid WHERE name=:name"
2955 self.cur.execute(sql + comment, varMap)
2956 tmp_log.debug(f"set JobID={retJobID}")
2957
2958 if not self._commit():
2959 raise RuntimeError("Commit error")
2960 tmp_log.debug(f"return JobID={retJobID} JobsetID={retJobsetID} Status={retStatus}")
2961 return retJobID, retJobsetID, retStatus
2962 except Exception:
2963 self.dump_error_message(tmp_log)
2964
2965 self._rollback()
2966 return retJobID, retJobsetID, retStatus
2967
2968
2969 def checkBanUser(self, dn, sourceLabel, jediCheck=False):
2970 comment = " /* DBProxy.checkBanUser */"
2971 try:
2972 methodName = "checkBanUser"
2973
2974 retStatus = True
2975 name = CoreUtils.clean_user_id(dn)
2976 tmp_log = self.create_tagged_logger(comment, f"name={name}")
2977 tmp_log.debug(f"start dn={dn} label={sourceLabel} jediCheck={jediCheck}")
2978
2979 self.conn.begin()
2980
2981 sql = "SELECT status,dn FROM ATLAS_PANDAMETA.users WHERE name=:name"
2982 varMap = {}
2983 varMap[":name"] = name
2984 self.cur.execute(sql + comment, varMap)
2985 self.cur.arraysize = 10
2986 res = self.cur.fetchone()
2987 if res is not None:
2988
2989 tmpStatus, dnInDB = res
2990 if tmpStatus in ["disabled"]:
2991 retStatus = False
2992 elif jediCheck and (dnInDB in ["", None] or dnInDB != dn):
2993
2994 sqlUp = "UPDATE ATLAS_PANDAMETA.users SET dn=:dn WHERE name=:name "
2995 varMap = {}
2996 varMap[":name"] = name
2997 varMap[":dn"] = dn
2998 self.cur.execute(sqlUp + comment, varMap)
2999 retI = self.cur.rowcount
3000 tmp_log.debug(f"update DN with Status={retI}")
3001 if retI != 1:
3002 retStatus = 1
3003 else:
3004
3005 if jediCheck:
3006 name = CoreUtils.clean_user_id(dn)
3007 sqlAdd = "INSERT INTO ATLAS_PANDAMETA.users "
3008 sqlAdd += "(ID,NAME,DN,LASTMOD,FIRSTJOB,LATESTJOB,CACHETIME,NCURRENT,JOBID) "
3009 sqlAdd += "VALUES(ATLAS_PANDAMETA.USERS_ID_SEQ.nextval,:name,:dn,"
3010 sqlAdd += "CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,CURRENT_DATE,0,1) "
3011 varMap = {}
3012 varMap[":name"] = name
3013 varMap[":dn"] = dn
3014 self.cur.execute(sqlAdd + comment, varMap)
3015 retI = self.cur.rowcount
3016 tmp_log.debug(f"inserted new row with Status={retI}")
3017 if retI != 1:
3018 retStatus = 2
3019
3020 if not self._commit():
3021 raise RuntimeError("Commit error")
3022 tmp_log.debug(f"done with Status={retStatus}")
3023 return retStatus
3024 except Exception:
3025
3026 self._rollback()
3027
3028 self.dump_error_message(tmp_log)
3029 return retStatus
3030
3031
3032 def getEmailAddr(self, name, withDN=False, withUpTime=False):
3033 comment = " /* DBProxy.getEmailAddr */"
3034 tmp_log = self.create_tagged_logger(comment)
3035 tmp_log.debug(f"get email for {name}")
3036
3037 if withDN:
3038 failedRet = "", "", None
3039 sql = "SELECT email,dn,location FROM ATLAS_PANDAMETA.users WHERE name=:name"
3040 elif withUpTime:
3041 failedRet = "", None
3042 sql = "SELECT email,location FROM ATLAS_PANDAMETA.users WHERE name=:name"
3043 else:
3044 failedRet = ""
3045 sql = "SELECT email FROM ATLAS_PANDAMETA.users WHERE name=:name"
3046 try:
3047
3048 self.conn.begin()
3049
3050 varMap = {}
3051 varMap[":name"] = name
3052 self.cur.execute(sql + comment, varMap)
3053 self.cur.arraysize = 10
3054 res = self.cur.fetchall()
3055
3056 if not self._commit():
3057 raise RuntimeError("Commit error")
3058 if res is not None and len(res) != 0:
3059 if withDN or withUpTime:
3060 if withDN:
3061 email, dn, upTime = res[0]
3062 else:
3063 email, upTime = res[0]
3064
3065 try:
3066 upTime = datetime.datetime.strptime(upTime, "%Y-%m-%d %H:%M:%S")
3067 except Exception:
3068 upTime = None
3069 if withDN:
3070 return email, dn, upTime
3071 else:
3072 return email, upTime
3073 else:
3074 return res[0][0]
3075
3076 return failedRet
3077 except Exception:
3078 self.dump_error_message(tmp_log)
3079
3080 self._rollback()
3081 return failedRet
3082
3083
3084 def setEmailAddr(self, userName, emailAddr):
3085 comment = " /* DBProxy.setEmailAddr */"
3086 tmp_log = self.create_tagged_logger(comment)
3087 tmp_log.debug(f"{userName} to {emailAddr}")
3088
3089 sql = "UPDATE ATLAS_PANDAMETA.users SET email=:email,location=:uptime WHERE name=:name "
3090 try:
3091
3092 self.conn.begin()
3093
3094 varMap = {}
3095 varMap[":name"] = userName
3096 varMap[":email"] = emailAddr
3097 varMap[":uptime"] = naive_utcnow().strftime("%Y-%m-%d %H:%M:%S")
3098 self.cur.execute(sql + comment, varMap)
3099
3100 if not self._commit():
3101 raise RuntimeError("Commit error")
3102 return True
3103 except Exception:
3104
3105 self._rollback()
3106
3107 self.dump_error_message(tmp_log)
3108 return False
3109
3110
3111 def get_ban_users(self):
3112 comment = " /* DBProxy.get_ban_user */"
3113 tmp_log = self.create_tagged_logger(comment)
3114 tmp_log.debug("start")
3115
3116 sql = "SELECT name FROM ATLAS_PANDAMETA.users WHERE status=:status "
3117 try:
3118
3119 self.conn.begin()
3120 varMap = {}
3121 varMap[":status"] = "disabled"
3122 self.cur.execute(sql + comment, varMap)
3123 self.cur.arraysize = 10
3124 res = self.cur.fetchall()
3125
3126 if not self._commit():
3127 raise RuntimeError("Commit error")
3128 retVal = {name: False for name, in res}
3129 tmp_log.debug(f"got {retVal}")
3130 return True, retVal
3131 except Exception:
3132
3133 self._rollback()
3134
3135 self.dump_error_message(tmp_log)
3136 return False, None
3137
3138
3139 def register_token_key(self, client_name: str, lifetime: int) -> bool:
3140 """
3141 Register token key for a client with a lifetime and delete expired tokens
3142
3143 :param client_name: client name who owns the token key
3144 :param lifetime: lifetime of the token key in hours
3145
3146 :return: True if succeeded. False otherwise
3147 """
3148 comment = " /* DBProxy.register_token_key */"
3149 tmp_log = self.create_tagged_logger(comment, f"client_name={client_name}")
3150 tmp_log.debug("start")
3151 try:
3152
3153 self.conn.begin()
3154 time_now = naive_utcnow()
3155 expire_at = time_now + datetime.timedelta(hours=lifetime)
3156
3157 sql = f"SELECT 1 FROM {panda_config.schemaMETA}.proxykey WHERE dn=:dn AND expires>:limit "
3158 var_map = {":dn": client_name, ":limit": expire_at - datetime.timedelta(hours=1)}
3159 self.cur.execute(sql + comment, var_map)
3160 res = self.cur.fetchone()
3161 if res:
3162 tmp_log.debug("skip as a new key was registered recently")
3163 else:
3164
3165 max_id = None
3166 sql = "SELECT MAX(ID) FROM ATLAS_PANDAMETA.proxykey "
3167 self.cur.execute(sql + comment, {})
3168 res = self.cur.fetchone()
3169 if res:
3170 (max_id,) = res
3171 if max_id is None:
3172 max_id = 0
3173 max_id += 1
3174 max_id %= 10000000
3175
3176 sql = (
3177 f"INSERT INTO {panda_config.schemaMETA}.proxykey (ID,DN,CREDNAME,CREATED,EXPIRES,ORIGIN,MYPROXY) "
3178 "VALUES(:id,:dn,:credname,:created,:expires,:origin,:myproxy) "
3179 )
3180 var_map = {
3181 ":id": max_id,
3182 ":dn": client_name,
3183 ":credname": str(uuid.uuid4()),
3184 ":created": time_now,
3185 ":expires": expire_at,
3186 ":origin": "panda",
3187 ":myproxy": "NA",
3188 }
3189 try:
3190 self.cur.execute(sql + comment, var_map)
3191 tmp_log.debug(f"registered a new key with id={max_id}")
3192 except Exception as e:
3193
3194 tmp_log.debug(f"ignoring registration failure with {str(e)}")
3195
3196 sql = "DELETE FROM ATLAS_PANDAMETA.proxykey WHERE expires<:limit "
3197 var_map = {":limit": time_now}
3198 self.cur.execute(sql + comment, var_map)
3199
3200 if not self._commit():
3201 raise RuntimeError("Commit error")
3202
3203 tmp_log.debug("done")
3204 return True
3205 except Exception:
3206
3207 self._rollback()
3208
3209 self.dump_error_message(tmp_log)
3210 return False
3211
3212
    def insertNetworkMatrixData(self, data):
        """
        Bulk-insert network matrix entries into a staging table, then merge them into the live table.

        :param data: iterable of (src, dst, key, value, ts) tuples
        :return: (None, "") on error; no explicit return value on success
        """
        comment = " /* DBProxy.insertNetworkMatrixData */"
        tmp_log = self.create_tagged_logger(comment)
        tmp_log.debug("start")

        # stage the rows into the temporary table first
        sql_insert = """
        INSERT INTO ATLAS_PANDA.network_matrix_kv_temp (src, dst, key, value, ts)
        VALUES (:src, :dst, :key, :value, :ts)
        """

        # upsert from the staging table into the live table; syntax differs per backend
        if self.backend == "postgres":
            sql_merge = (
                "INSERT INTO ATLAS_PANDA.network_matrix_kv "
                "(src, dst, key, value, ts) "
                "SELECT src, dst, key, value, ts FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP "
                "ON CONFLICT (src, dst, key) "
                "DO UPDATE SET value=EXCLUDED.value, ts=EXCLUDED.ts "
            )
        else:
            sql_merge = """
            MERGE /*+ FULL(nm_kv) */ INTO ATLAS_PANDA.network_matrix_kv nm_kv USING
            (SELECT src, dst, key, value, ts FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP) input
            ON (nm_kv.src = input.src AND nm_kv.dst= input.dst AND nm_kv.key = input.key)
            WHEN NOT MATCHED THEN
            INSERT (src, dst, key, value, ts)
            VALUES (input.src, input.dst, input.key, input.value, input.ts)
            WHEN MATCHED THEN
            UPDATE SET nm_kv.value = input.value, nm_kv.ts = input.ts
            """
        try:
            self.conn.begin()
            # load the entries in shards of 100 rows via executemany
            for shard in create_shards(data, 100):
                time1 = time.time()
                var_maps = []
                for entry in shard:
                    var_map = {
                        ":src": entry[0],
                        ":dst": entry[1],
                        ":key": entry[2],
                        ":value": entry[3],
                        ":ts": entry[4],
                    }
                    var_maps.append(var_map)

                time2 = time.time()
                self.cur.executemany(sql_insert + comment, var_maps)
                time3 = time.time()
                tmp_log.debug(f"Processing a shard took: {time2 - time1}s of data preparation and {time3 - time2}s of insertion = {time3 - time1}")

            # single merge after all shards are staged
            time4 = time.time()
            self.cur.execute(sql_merge + comment)
            time5 = time.time()
            tmp_log.debug(f"Final merge took: {time5 - time4}s")
            if self.backend == "postgres":
                # explicit cleanup of the staging table; presumably the Oracle path relies on a
                # temporary table cleared on commit — TODO confirm against the schema definition
                self.cur.execute("DELETE FROM ATLAS_PANDA.NETWORK_MATRIX_KV_TEMP " + comment)

            if not self._commit():
                raise RuntimeError("Commit error")

        except Exception:
            self._rollback()

            self.dump_error_message(tmp_log)
            return None, ""
3282
3283
3284 def deleteOldNetworkData(self):
3285 comment = " /* DBProxy.deleteOldNetworkData */"
3286 tmp_log = self.create_tagged_logger(comment)
3287 tmp_log.debug("start")
3288
3289
3290 sql_delete = """
3291 DELETE FROM ATLAS_PANDA.network_matrix_kv
3292 WHERE ts < (current_date - 7)
3293 """
3294 try:
3295 self.conn.begin()
3296 time1 = time.time()
3297 self.cur.execute(sql_delete + comment)
3298 time2 = time.time()
3299 tmp_log.debug(f"Deletion of old network data took: {time2 - time1}s")
3300
3301
3302 if not self._commit():
3303 raise RuntimeError("Commit error")
3304
3305 except Exception:
3306
3307 self._rollback()
3308
3309 self.dump_error_message(tmp_log)
3310 return None, ""
3311
3312 def ups_get_queues(self):
3313 """
3314 Identify unified pilot streaming (ups) queues: served in pull (late binding) model
3315 :return: list of panda queues
3316 """
3317 comment = " /* DBProxy.ups_get_queues */"
3318 tmp_log = self.create_tagged_logger(comment)
3319 tmp_log.debug("start")
3320
3321 ups_queues = []
3322 sql = f"""
3323 SELECT /* use_json_type */ scj.panda_queue FROM {panda_config.schemaPANDA}.schedconfig_json scj
3324 WHERE scj.data.capability='ucore' AND scj.data.workflow = 'pull_ups'
3325 """
3326
3327 self.cur.execute(sql + comment)
3328 res = self.cur.fetchall()
3329 for (ups_queue,) in res:
3330 ups_queues.append(ups_queue)
3331
3332 tmp_log.debug("done")
3333 return ups_queues
3334
3335
3336 def calculateTaskRW_JEDI(self, jediTaskID):
3337 comment = " /* JediDBProxy.calculateTaskRW_JEDI */"
3338 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3339 tmpLog.debug("start")
3340 try:
3341
3342 sql = "SELECT ROUND(SUM((nFiles-nFilesFinished-nFilesFailed-nFilesOnHold)*walltime)/24/3600) "
3343 sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
3344 sql += "WHERE tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3345 sql += "AND tabT.jediTaskID=:jediTaskID "
3346 varMap = {}
3347 varMap[":jediTaskID"] = jediTaskID
3348
3349 self.conn.begin()
3350
3351 self.cur.execute(sql + comment, varMap)
3352 resRT = self.cur.fetchone()
3353
3354 if not self._commit():
3355 raise RuntimeError("Commit error")
3356
3357 if resRT is None:
3358 retVal = None
3359 else:
3360 retVal = resRT[0]
3361 tmpLog.debug(f"RW={retVal}")
3362
3363 tmpLog.debug("done")
3364 return retVal
3365 except Exception:
3366
3367 self._rollback()
3368
3369 self.dump_error_message(tmpLog)
3370 return None
3371
3372
    def calculateRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
        """
        Calculate remaining work (RW) per cloud for active tasks above a priority.

        RW is accumulated per cloud as (remaining files) x (effective size of one sample
        file) x walltime, then divided by 24*3600 (seconds to days, assuming walltime is
        in seconds — TODO confirm against the schema).

        :param vo: Virtual Organization
        :param prodSourceLabel: production source label
        :param workQueue: work queue object (global share or resource queue), or None for all
        :param priority: minimum currentPriority, or None for no priority cut
        :return: {cloud: RW} dictionary, or None on failure
        """
        comment = " /* JediDBProxy.calculateRWwithPrio_JEDI */"
        if workQueue is None:
            tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={None} prio={priority}")
        else:
            tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} prio={priority}")
        tmpLog.debug("start")
        try:
            # select remaining-file counts and walltime of master datasets of active tasks
            varMap = {}
            varMap[":vo"] = vo
            varMap[":prodSourceLabel"] = prodSourceLabel
            if priority is not None:
                varMap[":priority"] = priority
            sql = "SELECT tabT.jediTaskID,tabT.cloud,tabD.datasetID,nFiles-nFilesFinished-nFilesFailed,walltime "
            sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
            sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
            sql += "AND tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
            sql += "AND (nFiles-nFilesFinished-nFilesFailed)>0 "
            sql += "AND tabT.vo=:vo AND prodSourceLabel=:prodSourceLabel "
            # optional priority cut
            if priority is not None:
                sql += "AND currentPriority>=:priority "
            # optional work queue constraint
            if workQueue is not None:
                if workQueue.is_global_share:
                    # global shares are matched by gshare name, excluding resource-type work queues
                    sql += "AND gshare=:wq_name "
                    sql += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
                    varMap[":wq_name"] = workQueue.queue_name
                else:
                    sql += "AND workQueue_ID=:wq_id "
                    varMap[":wq_id"] = workQueue.queue_id
            # only tasks in active states with input-type datasets and an assigned cloud
            sql += "AND tabT.status IN (:status1,:status2,:status3,:status4) "
            sql += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
            varMap.update(INPUT_TYPES_var_map)
            varMap[":status1"] = "ready"
            varMap[":status2"] = "scouting"
            varMap[":status3"] = "running"
            varMap[":status4"] = "pending"
            sql += "AND tabT.cloud IS NOT NULL "

            self.conn.begin()

            self.cur.execute(sql + comment, varMap)
            resList = self.cur.fetchall()

            if not self._commit():
                raise RuntimeError("Commit error")
            # sample one file per dataset (rownum<=1) to estimate the effective file size
            retMap = {}
            sqlF = "SELECT fsize,startEvent,endEvent,nEvents "
            sqlF += f"FROM {panda_config.schemaJEDI}.JEDI_Dataset_Contents "
            sqlF += "WHERE jediTaskID=:jediTaskID AND datasetID=:datasetID AND rownum<=1"
            for jediTaskID, cloud, datasetID, nRem, walltime in resList:
                # NOTE(review): one query per dataset (N+1 pattern) in its own transaction
                varMap = {}
                varMap[":jediTaskID"] = jediTaskID
                varMap[":datasetID"] = datasetID

                self.conn.begin()

                self.cur.execute(sqlF + comment, varMap)
                resFile = self.cur.fetchone()

                if not self._commit():
                    raise RuntimeError("Commit error")
                if resFile is not None:
                    # accumulate RW per cloud
                    fsize, startEvent, endEvent, nEvents = resFile
                    effectiveFsize = CoreUtils.getEffectiveFileSize(fsize, startEvent, endEvent, nEvents)
                    tmpRW = nRem * effectiveFsize * walltime
                    if cloud not in retMap:
                        retMap[cloud] = 0
                    retMap[cloud] += tmpRW
            # convert the accumulated product to days
            for cloudName, rwValue in retMap.items():
                retMap[cloudName] = int(rwValue / 24 / 3600)
            tmpLog.debug(f"RW={str(retMap)}")

            tmpLog.debug("done")
            return retMap
        except Exception:

            self._rollback()

            self.dump_error_message(tmpLog)
            return None
3460
3461
3462 def calculateWorldRWwithPrio_JEDI(self, vo, prodSourceLabel, workQueue, priority):
3463 comment = " /* JediDBProxy.calculateWorldRWwithPrio_JEDI */"
3464 if workQueue is None:
3465 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={None} prio={priority}")
3466 else:
3467 tmpLog = self.create_tagged_logger(comment, f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} prio={priority}")
3468 tmpLog.debug("start")
3469 try:
3470
3471 varMap = {}
3472 varMap[":vo"] = vo
3473 varMap[":prodSourceLabel"] = prodSourceLabel
3474 varMap[":worldCloud"] = JediTaskSpec.worldCloudName
3475 if priority is not None:
3476 varMap[":priority"] = priority
3477 sql = "SELECT tabT.nucleus,SUM((nEvents-nEventsUsed)*(CASE WHEN cpuTime IS NULL THEN 300 ELSE cpuTime END)) "
3478 sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD,{0}.JEDI_AUX_Status_MinTaskID tabA ".format(panda_config.schemaJEDI)
3479 sql += "WHERE tabT.status=tabA.status AND tabT.jediTaskID>=tabA.min_jediTaskID "
3480 sql += "AND tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3481 sql += "AND (nFiles-nFilesFinished-nFilesFailed)>0 "
3482 sql += "AND tabT.vo=:vo AND prodSourceLabel=:prodSourceLabel "
3483 sql += "AND tabT.cloud=:worldCloud "
3484
3485 if priority is not None:
3486 sql += "AND currentPriority>=:priority "
3487
3488 if workQueue is not None:
3489 if workQueue.is_global_share:
3490 sql += "AND gshare=:wq_name "
3491 sql += f"AND tabT.workqueue_id NOT IN (SELECT queue_id FROM {panda_config.schemaJEDI}.jedi_work_queue WHERE queue_function = 'Resource') "
3492 varMap[":wq_name"] = workQueue.queue_name
3493 else:
3494 sql += "AND workQueue_ID=:wq_id "
3495 varMap[":wq_id"] = workQueue.queue_id
3496
3497 sql += "AND tabT.status IN (:status1,:status2,:status3,:status4) "
3498 sql += f"AND tabD.type IN ({INPUT_TYPES_var_str}) "
3499 varMap.update(INPUT_TYPES_var_map)
3500 varMap[":status1"] = "ready"
3501 varMap[":status2"] = "scouting"
3502 varMap[":status3"] = "running"
3503 varMap[":status4"] = "pending"
3504 sql += "GROUP BY tabT.nucleus "
3505
3506 self.conn.begin()
3507
3508 self.cur.execute(sql + comment, varMap)
3509 resList = self.cur.fetchall()
3510
3511 if not self._commit():
3512 raise RuntimeError("Commit error")
3513
3514 retMap = {}
3515 for nucleus, worldRW in resList:
3516 retMap[nucleus] = worldRW
3517 tmpLog.debug(f"RW={str(retMap)}")
3518
3519 tmpLog.debug("done")
3520 return retMap
3521 except Exception:
3522
3523 self._rollback()
3524
3525 self.dump_error_message(tmpLog)
3526 return None
3527
3528
3529 def calculateTaskWorldRW_JEDI(self, jediTaskID):
3530 comment = " /* JediDBProxy.calculateTaskWorldRW_JEDI */"
3531 tmpLog = self.create_tagged_logger(comment, f"jediTaskID={jediTaskID}")
3532 tmpLog.debug("start")
3533 try:
3534
3535 sql = (
3536 "SELECT (nEvents-nEventsUsed)*(CASE "
3537 "WHEN cpuTime IS NULL THEN 300 "
3538 "WHEN cpuTimeUnit='mHS06sPerEvent' OR cpuTimeUnit='mHS06sPerEventFixed' THEN cpuTime/1000 "
3539 "ELSE cpuTime END) "
3540 )
3541 sql += "FROM {0}.JEDI_Tasks tabT,{0}.JEDI_Datasets tabD ".format(panda_config.schemaJEDI)
3542 sql += "WHERE tabT.jediTaskID=tabD.jediTaskID AND masterID IS NULL "
3543 sql += "AND tabT.jediTaskID=:jediTaskID "
3544 varMap = {}
3545 varMap[":jediTaskID"] = jediTaskID
3546
3547 self.conn.begin()
3548
3549 self.cur.execute(sql + comment, varMap)
3550 resRT = self.cur.fetchone()
3551
3552 if not self._commit():
3553 raise RuntimeError("Commit error")
3554
3555 if resRT is None:
3556 retVal = None
3557 else:
3558 retVal = resRT[0]
3559 tmpLog.debug(f"RW={retVal}")
3560
3561 tmpLog.debug("done")
3562 return retVal
3563 except Exception:
3564
3565 self._rollback()
3566
3567 self.dump_error_message(tmpLog)
3568 return None
3569
3570 def load_sw_map(self):
3571 comment = " /* JediDBProxy.load_sw_map */"
3572 tmp_log = self.create_tagged_logger(comment)
3573 tmp_log.debug("start")
3574
3575 sw_map = {}
3576
3577 try:
3578
3579 sql = f"SELECT PANDA_QUEUE, DATA FROM {panda_config.schemaPANDA}.SW_TAGS"
3580 self.cur.execute(sql + comment)
3581 results = self.cur.fetchall()
3582 for panda_queue, data in results:
3583 sw_map[panda_queue] = json.loads(data)
3584
3585 tmp_log.debug("done")
3586 return sw_map
3587
3588 except Exception:
3589 self._rollback()
3590 self.dump_error_message(tmp_log)
3591 return None
3592
3593 def getNetworkMetrics(self, dst, keyList):
3594 """
3595 Get the network metrics from a source to all possible destinations
3596 :param dst: destination site
3597 :param keyList: activity keys.
3598 :return: returns a dictionary with network values in the style
3599 {
3600 <dest>: {<key>: <value>, <key>: <value>},
3601 <dest>: {<key>: <value>, <key>: <value>},
3602 ...
3603 }
3604 """
3605 comment = " /* JediDBProxy.getNetworkMetrics */"
3606 tmpLog = self.create_tagged_logger(comment)
3607 tmpLog.debug("start")
3608
3609 latest_validity = naive_utcnow() - datetime.timedelta(minutes=60)
3610
3611 varMap = {":dst": dst, ":latest_validity": latest_validity}
3612
3613 key_var_names_str, key_var_map = get_sql_IN_bind_variables(keyList, prefix=":key")
3614
3615 sql = f"""
3616 SELECT src, key, value, ts FROM {panda_config.schemaJEDI}.network_matrix_kv
3617 WHERE dst = :dst AND key IN ({key_var_names_str})
3618 AND ts > :latest_validity
3619 """
3620
3621 varMap.update(key_var_map)
3622
3623 self.cur.execute(sql + comment, varMap)
3624 resList = self.cur.fetchall()
3625
3626 networkMap = {}
3627 total = {}
3628 for res in resList:
3629 src, key, value, ts = res
3630 networkMap.setdefault(src, {})
3631 networkMap[src][key] = value
3632 total.setdefault(key, 0)
3633 try:
3634 total[key] += value
3635 except Exception:
3636 pass
3637 networkMap["total"] = total
3638 tmpLog.debug(f"network map to nucleus {dst} is: {networkMap}")
3639
3640 return networkMap
3641
3642 def getBackloggedNuclei(self):
3643 """
3644 Return a list of nuclei, which has built up transfer backlog. We will consider a nucleus as backlogged,
3645 when it has over 2000 output transfers queued and there are more than 3 sites with queues over
3646 """
3647
3648 comment = " /* JediDBProxy.getBackloggedNuclei */"
3649 tmpLog = self.create_tagged_logger(comment)
3650 tmpLog.debug("start")
3651
3652 latest_validity = naive_utcnow() - datetime.timedelta(minutes=60)
3653
3654 nqueued_cap = self.getConfigValue("taskbrokerage", "NQUEUED_NUC_CAP", "jedi")
3655 if nqueued_cap is None:
3656 nqueued_cap = 2000
3657
3658 varMap = {":latest_validity": latest_validity, ":nqueued_cap": nqueued_cap}
3659
3660 sql = f"""
3661 SELECT dst
3662 FROM {panda_config.schemaJEDI}.network_matrix_kv
3663 WHERE key = 'Production Output_queued'
3664 AND ts > :latest_validity
3665 GROUP BY dst
3666 HAVING SUM(value) > :nqueued_cap
3667 """
3668
3669 self.cur.execute(sql + comment, varMap)
3670 try:
3671 backlogged_nuclei = [entry[0] for entry in self.cur.fetchall()]
3672 except IndexError:
3673 backlogged_nuclei = []
3674
3675 tmpLog.debug(f"Nuclei with a long backlog are: {backlogged_nuclei}")
3676
3677 return backlogged_nuclei
3678
3679 def getPandaSiteToOutputStorageSiteMapping(self):
3680 """
3681 Get a mapping of panda sites to their storage site. We consider the storage site of the default ddm endpoint
3682 :return: dictionary with panda_site_name keys and site_name values
3683 """
3684 comment = " /* JediDBProxy.getPandaSiteToOutputStorageSiteMapping */"
3685 tmpLog = self.create_tagged_logger(comment)
3686 tmpLog.debug("start")
3687
3688 sql = """
3689 SELECT pdr.panda_site_name, de.site_name, nvl(pdr.scope, 'default')
3690 FROM atlas_panda.panda_ddm_relation pdr, atlas_panda.ddm_endpoint de
3691 WHERE pdr.default_write = 'Y'
3692 AND pdr.ddm_endpoint_name = de.ddm_endpoint_name
3693 """
3694
3695 self.cur.execute(sql + comment)
3696 resList = self.cur.fetchall()
3697 mapping = {}
3698
3699 for res in resList:
3700 pandaSiteName, siteName, scope = res
3701 mapping.setdefault(pandaSiteName, {})
3702 mapping[pandaSiteName][scope] = siteName
3703
3704
3705
3706 tmpLog.debug("done")
3707 return mapping
3708
    def get_active_gshare_rtypes(self, vo):
        """
        Gets the active gshare/resource wq combinations. Active means they have at least 1 job in (assigned, activate, starting, running, ...)
        :param vo: Virtual Organization
        :return: dictionary mapping gshare/work-queue name to the list of active resource types;
                 empty dictionary on failure
        """
        comment = " /* DBProxy.get_active_gshare_rtypes */"
        tmp_log = self.create_tagged_logger(comment, f"vo={vo}")
        tmp_log.debug("start")

        var_map = {":vo": vo}

        # gshare_results: combinations read directly from the job share statistics tables
        # wq_results: combinations resolved through resource-type work queues
        sql_get_active_combinations = f"""
        WITH gshare_results AS (
            SELECT /*+ RESULT_CACHE */ gshare AS name, resource_type
            FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS
            WHERE vo=:vo
            UNION
            SELECT /*+ RESULT_CACHE */ gshare AS name, resource_type
            FROM {panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS
            WHERE vo=:vo
        ), wq_results AS (
            SELECT jwq.QUEUE_NAME AS name, jss.resource_type
            FROM {panda_config.schemaPANDA}.JOBS_SHARE_STATS jss
            JOIN {panda_config.schemaPANDA}.JEDI_WORK_QUEUE jwq ON jss.WORKQUEUE_ID = jwq.QUEUE_ID
            WHERE jwq.QUEUE_FUNCTION = 'Resource' AND jss.vo=:vo AND jwq.vo=:vo
            UNION
            SELECT jwq.QUEUE_NAME AS name, jss.resource_type
            FROM {panda_config.schemaPANDA}.JOBSDEFINED_SHARE_STATS jss
            JOIN {panda_config.schemaPANDA}.JEDI_WORK_QUEUE jwq ON jss.WORKQUEUE_ID = jwq.QUEUE_ID
            WHERE jwq.QUEUE_FUNCTION = 'Resource' AND jss.vo=:vo AND jwq.vo=:vo
        )
        SELECT name, resource_type FROM gshare_results
        UNION
        SELECT name, resource_type FROM wq_results
        GROUP BY name, resource_type
        """

        return_map = {}
        try:
            self.cur.arraysize = 1000
            self.cur.execute(f"{sql_get_active_combinations} {comment}", var_map)
            res = self.cur.fetchall()

            # group the resource types by gshare/work-queue name
            for name, resource_type in res:
                return_map.setdefault(name, [])
                return_map[name].append(resource_type)

            tmp_log.debug("done")
            return return_map
        except Exception:
            self.dump_error_message(tmp_log)
            return {}
3764
3765
    def getWorkQueueMap(self):
        """Return the work queue map, refreshing the cached copy first if it is stale."""
        self.refreshWorkQueueMap()
        return self.workQueueMap
3769
3770
3771 def refreshWorkQueueMap(self):
3772
3773 if self.updateTimeForWorkQueue is not None and (naive_utcnow() - self.updateTimeForWorkQueue) < datetime.timedelta(minutes=10):
3774 return
3775 comment = " /* JediDBProxy.refreshWorkQueueMap */"
3776 tmpLog = self.create_tagged_logger(comment)
3777
3778 leave_shares = self.get_sorted_leaves()
3779
3780 if self.workQueueMap is None:
3781 self.workQueueMap = WorkQueueMapper()
3782
3783 sql = self.workQueueMap.getSqlQuery()
3784 try:
3785
3786 self.conn.begin()
3787 self.cur.arraysize = 1000
3788 self.cur.execute(sql + comment)
3789 res = self.cur.fetchall()
3790 if not self._commit():
3791 raise RuntimeError("Commit error")
3792
3793 self.workQueueMap.makeMap(res, leave_shares)
3794 tmpLog.debug("done")
3795 self.updateTimeForWorkQueue = naive_utcnow()
3796 return True
3797 except Exception:
3798
3799 self._rollback()
3800
3801 self.dump_error_message(tmpLog)
3802 return False
3803
3804
3805
def get_entity_module(base_mod) -> EntityModule:
    """Return the 'entity' composite module registered on the given base module."""
    return base_mod.get_composite_module("entity")