import sys
import threading
import traceback
from datetime import datetime, timedelta, timezone

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandaserver.config import panda_config
from pandaserver.configurator import aux

# the wildcard import supplies shared helpers and constants used below
# (e.g. json, GB and the NWS field keys such as FILES, DONE and MBPS)
from pandaserver.configurator.aux import *

_logger = PandaLogger().getLogger("configurator")


WRITE_LAN = "write_lan"
READ_LAN = "read_lan"
DEFAULT = "default"

class Configurator(threading.Thread):
    def __init__(self, taskBuffer, log_stream=None):
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        if log_stream:
            self.log_stream = log_stream
        else:
            self.log_stream = _logger

        self.CRIC_URL_SITES = getattr(panda_config, "CRIC_URL_SITES", "https://atlas-cric.cern.ch/api/atlas/site/query/?json")
        self.CRIC_URL_DDMENDPOINTS = getattr(panda_config, "CRIC_URL_DDMENDPOINTS", "https://atlas-cric.cern.ch/api/atlas/ddmendpoint/query/?json")
        self.CRIC_URL_SCHEDCONFIG = getattr(panda_config, "CRIC_URL_SCHEDCONFIG", "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json")
        self.CRIC_URL_DDMBLACKLIST = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json&activity=write_wan&fstate=OFF",
        )
        self.CRIC_URL_DDMBLACKLIST_READ = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST_READ",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json&activity=read_wan&fstate=OFF",
        )
        self.CRIC_URL_DDMBLACKLIST_FULL = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST_FULL",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json",
        )
        self.RUCIO_RSE_USAGE = getattr(panda_config, "RUCIO_RSE_USAGE", "https://rucio-hadoop.cern.ch/dumps/rse_usage/current.json")
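
        # All of the above sources can be overridden in panda_config; a hypothetical
        # example (the URL is illustrative, not a real endpoint):
        #   panda_config.CRIC_URL_SITES = "https://my-cric.example.org/api/atlas/site/query/?json"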

    def retrieve_data(self):
        self.log_stream.debug("Getting site dump...")
        self.site_dump = aux.get_dump(self.CRIC_URL_SITES)
        if not self.site_dump:
            self.log_stream.error("The site dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting DDM endpoints dump...")
        self.endpoint_dump = aux.get_dump(self.CRIC_URL_DDMENDPOINTS)
        if not self.endpoint_dump:
            self.log_stream.error("The endpoint dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Parsing endpoints...")
        self.endpoint_token_dict = self.parse_endpoints()
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting schedconfig dump...")
        self.schedconfig_dump = aux.get_dump(self.CRIC_URL_SCHEDCONFIG)
        if not self.schedconfig_dump:
            self.log_stream.error("The schedconfig dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting ddmblacklist dump...")
        try:
            if self.CRIC_URL_DDMBLACKLIST:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST)
                if dump is None:
                    self.log_stream.error("The blacklisted endpoint dump could not be retrieved (None)")
                    return False
                self.blacklisted_endpoints = list(dump)
                self.blacklisted_endpoints_write = self.blacklisted_endpoints
                self.log_stream.debug(f"ddmblacklist(write) retrieved {len(self.blacklisted_endpoints)} endpoints")
            else:
                self.blacklisted_endpoints = []
                self.blacklisted_endpoints_write = []
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(write); defaulting to empty. Error: {e}")
            self.blacklisted_endpoints = []
            self.blacklisted_endpoints_write = []

        self.log_stream.debug(f"Blacklisted endpoints {self.blacklisted_endpoints}")
        self.log_stream.debug(f"Blacklisted endpoints write {self.blacklisted_endpoints_write}")
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting ddmblacklist read dump...")
        try:
            if self.CRIC_URL_DDMBLACKLIST_READ:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST_READ)
                if dump is None:
                    self.log_stream.warning("ddmblacklist(read) returned None; defaulting to empty")
                    self.blacklisted_endpoints_read = []
                else:
                    self.blacklisted_endpoints_read = list(dump)
                    self.log_stream.debug(f"ddmblacklist(read) retrieved {len(self.blacklisted_endpoints_read)} endpoints")
            else:
                self.blacklisted_endpoints_read = []
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(read); defaulting to empty. Error: {e}")
            self.blacklisted_endpoints_read = []

        self.log_stream.debug(f"Blacklisted endpoints read {self.blacklisted_endpoints_read}")
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting DDM detailed status...")
        try:
            if self.CRIC_URL_DDMBLACKLIST_FULL:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST_FULL)
                if dump is None:
                    self.log_stream.warning("ddmblacklist(full) returned None; defaulting to empty")
                    self.ddm_detailed_exclusions = {}
                else:
                    self.ddm_detailed_exclusions = dump if isinstance(dump, dict) else (dict(dump) if hasattr(dump, "items") else {})
                    self.log_stream.debug(f"ddmblacklist(full) retrieved {len(self.ddm_detailed_exclusions)} entries")
            else:
                self.ddm_detailed_exclusions = {}
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(full); defaulting to empty. Error: {e}")
            self.ddm_detailed_exclusions = {}

        self.log_stream.debug(f"Blacklisted endpoints full {self.ddm_detailed_exclusions}")
        self.log_stream.debug("Done")

        if self.RUCIO_RSE_USAGE:
            self.log_stream.debug("Getting Rucio RSE usage dump...")
            self.rse_usage = aux.get_dump(self.RUCIO_RSE_USAGE)
            if not self.rse_usage:
                self.log_stream.error("The RSE usage dump was not retrieved correctly")
                return False
            self.log_stream.debug("Done")
        else:
            self.rse_usage = {}
        return True
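
    # Illustrative state after a successful retrieve_data() call (hypothetical names
    # and values, assuming the usual CRIC/Rucio JSON layouts):
    #   self.blacklisted_endpoints / _write -> ["SITE-A_DATADISK", ...]
    #   self.blacklisted_endpoints_read     -> ["SITE-B_DATADISK", ...]
    #   self.ddm_detailed_exclusions        -> {"SITE-A_DATADISK": {"write_wan": {"status": {"value": "OFF"}}}}
    #   self.rse_usage                      -> {"SITE-A_DATADISK": {"storage": {"used": ..., "free": ..., "updated_at": "2026-01-01 00:00:00"}}}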

    def get_site_info(self, site):
        """
        Gets the relevant information from a site
        """
        state = site["state"]
        tier_level = site["tier_level"]

        if "Nucleus" in site["datapolicies"]:
            role = "nucleus"
        else:
            role = "satellite"

        return role, state, tier_level

    def parse_endpoints(self):
        """
        Puts the relevant information from endpoint_dump into a more usable format
        """
        endpoint_token_dict = {}
        for endpoint, endpoint_config in self.endpoint_dump.items():
            if endpoint_config["state"] == "ACTIVE":
                endpoint_token_dict[endpoint] = {
                    "token": endpoint_config["token"],
                    "site_name": endpoint_config["site"],
                    "type": endpoint_config["type"],
                    "is_tape": "Y" if endpoint_config["is_tape"] else "N",
                }
            else:
                self.log_stream.debug(f"parse_endpoints: skipped endpoint {endpoint} (type: {endpoint_config['type']}, state: {endpoint_config['state']})")

        return endpoint_token_dict
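
    # Example of the mapping parse_endpoints() returns (hypothetical endpoint and token):
    #   {"SITE-A_DATADISK": {"token": "ATLASDATADISK", "site_name": "SITE-A",
    #                        "type": "DATADISK", "is_tape": "N"}}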

    def process_site_dumps(self):
        """
        Parses the CRIC site and endpoint dumps and prepares a format loadable to the DB
        """
        sites_list = []
        included_sites = []
        ddm_endpoints_list = []
        panda_sites_list = []

        try:
            panda_ddm_relation_list = self.get_panda_ddm_relations()
        except Exception:
            self.log_stream.error(f"get_panda_ddm_relations excepted with {traceback.format_exc()}")
            panda_ddm_relation_list = []

        for site_name, site_config in self.site_dump.items():
            (site_role, site_state, tier_level) = self.get_site_info(site_config)
            if site_state == "ACTIVE" and site_name not in included_sites:
                sites_list.append(
                    {
                        "site_name": site_name,
                        "role": site_role,
                        "tier_level": tier_level,
                    }
                )
                included_sites.append(site_name)
            else:
                self.log_stream.debug(f"process_site_dumps: skipped site {site_name} (state: {site_state})")

            for ddm_endpoint_name in site_config["ddmendpoints"]:
                try:
                    ddm_spacetoken_name = self.endpoint_token_dict[ddm_endpoint_name]["token"]
                    ddm_endpoint_type = self.endpoint_token_dict[ddm_endpoint_name]["type"]
                    ddm_endpoint_is_tape = self.endpoint_token_dict[ddm_endpoint_name]["is_tape"]
                    if ddm_endpoint_name in self.blacklisted_endpoints:
                        ddm_endpoint_blacklisted_write = "Y"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is blacklisted for write")
                    else:
                        ddm_endpoint_blacklisted_write = "N"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is NOT blacklisted for write")

                    if ddm_endpoint_name in self.blacklisted_endpoints_read:
                        ddm_endpoint_blacklisted_read = "Y"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is blacklisted for read")
                    else:
                        ddm_endpoint_blacklisted_read = "N"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is NOT blacklisted for read")

                    detailed_status = {}
                    if ddm_endpoint_name in self.ddm_detailed_exclusions:
                        for activity in self.ddm_detailed_exclusions[ddm_endpoint_name]:
                            detailed_status[activity] = self.ddm_detailed_exclusions[ddm_endpoint_name][activity]["status"]["value"]
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has detailed exclusions {detailed_status}")
                except KeyError:
                    continue

                try:
                    space_used = self.rse_usage[ddm_endpoint_name]["storage"]["used"] / GB
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has used space {space_used}GB")
                    space_free = self.rse_usage[ddm_endpoint_name]["storage"]["free"] / GB
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has free space {space_free}GB")
                    space_total = space_used + space_free
                    space_timestamp = datetime.strptime(
                        self.rse_usage[ddm_endpoint_name]["storage"]["updated_at"],
                        "%Y-%m-%d %H:%M:%S",
                    )
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has space timestamp {space_timestamp}")
                except (KeyError, ValueError):
                    space_used, space_free, space_total, space_timestamp = (None, None, None, None)
                    self.log_stream.warning(f"process_site_dumps: no rse storage usage information for {ddm_endpoint_name}")

                try:
                    space_expired = self.rse_usage[ddm_endpoint_name]["expired"]["used"] / GB
                except KeyError:
                    space_expired = 0
                    self.log_stream.warning(f"process_site_dumps: no rse EXPIRED usage information for {ddm_endpoint_name}")

                ddm_spacetoken_state = site_config["ddmendpoints"][ddm_endpoint_name]["state"]
                if ddm_spacetoken_state == "ACTIVE":
                    ddm_endpoints_list.append(
                        {
                            "ddm_endpoint_name": ddm_endpoint_name,
                            "site_name": site_name,
                            "ddm_spacetoken_name": ddm_spacetoken_name,
                            "type": ddm_endpoint_type,
                            "is_tape": ddm_endpoint_is_tape,
                            "blacklisted": ddm_endpoint_blacklisted_write,
                            "blacklisted_write": ddm_endpoint_blacklisted_write,
                            "blacklisted_read": ddm_endpoint_blacklisted_read,
                            "detailed_status": json.dumps(detailed_status),
                            "space_used": space_used,
                            "space_free": space_free,
                            "space_total": space_total,
                            "space_expired": space_expired,
                            "space_timestamp": space_timestamp,
                        }
                    )
                    self.log_stream.debug(f"process_site_dumps: added DDM endpoint {ddm_endpoint_name}")
                else:
                    self.log_stream.debug(f"process_site_dumps: skipped DDM endpoint {ddm_endpoint_name} because of state {ddm_spacetoken_state}")

            for panda_resource in site_config["presources"]:
                for panda_site in site_config["presources"][panda_resource]:
                    panda_site_state = site_config["presources"][panda_resource][panda_site]["state"]
                    if panda_site_state != "ACTIVE":
                        self.log_stream.debug(f"process_site_dumps: skipped PanDA site {panda_site} (state: {panda_site_state})")
                        continue
                    panda_sites_list.append({"panda_site_name": panda_site, "site_name": site_name})

        return sites_list, panda_sites_list, ddm_endpoints_list, panda_ddm_relation_list
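
    # Sketch of the site_dump nesting walked above (hypothetical names; only the
    # fields that process_site_dumps() reads are shown):
    #   "SITE-A": {
    #       "state": "ACTIVE", "tier_level": 1, "datapolicies": [...],
    #       "ddmendpoints": {"SITE-A_DATADISK": {"state": "ACTIVE"}},
    #       "presources": {"SITE-A": {"SITE-A_UCORE": {"state": "ACTIVE"}}},
    #   }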

    def parse_role(self, role):
        """
        Traditionally roles have been "read_lan" or "write_lan". We consider them the default roles.
        If you want to override the role for specific jobs in CRIC, you can define e.g. "read_lan_analysis". Here we
        strip the role down to the "analysis" tag and return role "read_lan" with scope "analysis".

        Examples:
            "read_lan_analysis" --> "read_lan" and "analysis"
            "read_lan" --> "read_lan" and "default"
        """
        if role == READ_LAN or role == WRITE_LAN:
            return role, DEFAULT

        elif role.startswith(READ_LAN):
            return READ_LAN, role.replace(f"{READ_LAN}_", "")

        elif role.startswith(WRITE_LAN):
            return WRITE_LAN, role.replace(f"{WRITE_LAN}_", "")

        else:
            return role, DEFAULT

    def get_panda_ddm_relations(self):
        """
        Gets the DDM endpoints assigned to a panda queue, based on the CRIC "astorages" field of the panda queue definition
        """
        relation_list = []

        for long_panda_site_name in self.schedconfig_dump:
            panda_site_name = self.schedconfig_dump[long_panda_site_name]["panda_resource"]
            cpu_site_name = self.schedconfig_dump[long_panda_site_name]["atlas_site"]
            dict_ddm_endpoint = {}

            if self.schedconfig_dump[long_panda_site_name]["astorages"]:
                astorages = self.schedconfig_dump[long_panda_site_name]["astorages"]

                # iterate the storages to establish their order of preference per scope
                order_read = {DEFAULT: 1}
                order_write = {DEFAULT: 1}
                for role in astorages:
                    # we only care about the LAN read and write roles
                    if not role.startswith(READ_LAN) and not role.startswith(WRITE_LAN):
                        continue

                    for ddm_endpoint_name in astorages[role]:
                        role_clean, scope = self.parse_role(role)
                        order_read.setdefault(scope, 1)
                        order_write.setdefault(scope, 1)

                        dict_ddm_endpoint.setdefault(ddm_endpoint_name, {})
                        dict_ddm_endpoint[ddm_endpoint_name].setdefault(scope, {"order_write": None, "order_read": None, "role": []})

                        if role.startswith(WRITE_LAN):
                            dict_ddm_endpoint[ddm_endpoint_name][scope]["order_write"] = order_write[scope]
                            order_write[scope] += 1
                        elif role.startswith(READ_LAN):
                            dict_ddm_endpoint[ddm_endpoint_name][scope]["order_read"] = order_read[scope]
                            order_read[scope] += 1

                        dict_ddm_endpoint[ddm_endpoint_name][scope]["role"].append(role_clean)

                for ddm_endpoint_name, ddm_endpoint_values in dict_ddm_endpoint.items():
                    for scope, scope_values in ddm_endpoint_values.items():
                        roles = scope_values["role"]
                        order_write = scope_values["order_write"]
                        order_read = scope_values["order_read"]

                        try:
                            storage_site_name = self.endpoint_token_dict[ddm_endpoint_name]["site_name"]
                        except KeyError:
                            self.log_stream.warning(
                                f"Skipped {long_panda_site_name}, because primary associated DDM endpoint {ddm_endpoint_name} not found (e.g. DISABLED)"
                            )
                            continue

                        # is the storage local to the queue's CPU site?
                        if storage_site_name == cpu_site_name:
                            is_local = "Y"
                        else:
                            is_local = "N"

                        # the first storage of each scope is the default one
                        if order_read == 1:
                            default_read = "Y"
                        else:
                            default_read = "N"
                        if order_write == 1:
                            default_write = "Y"
                        else:
                            default_write = "N"

                        if ddm_endpoint_name in self.endpoint_token_dict:
                            relation_list.append(
                                {
                                    "panda_site_name": panda_site_name,
                                    "ddm_endpoint_name": ddm_endpoint_name,
                                    "roles": ",".join(roles),
                                    "default_read": default_read,
                                    "default_write": default_write,
                                    "is_local": is_local,
                                    "order_read": order_read,
                                    "order_write": order_write,
                                    "scope": scope,
                                }
                            )

        return relation_list
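
    # Worked example of the astorages parsing above (hypothetical queue):
    #   astorages = {"read_lan": ["SITE-A_DATADISK"],
    #                "read_lan_analysis": ["SITE-A_SCRATCHDISK"],
    #                "write_lan": ["SITE-A_SCRATCHDISK"]}
    # yields one relation per (endpoint, scope):
    #   SITE-A_DATADISK / default:     roles "read_lan",  order_read 1  -> default_read "Y"
    #   SITE-A_SCRATCHDISK / analysis: roles "read_lan",  order_read 1  -> default_read "Y"
    #   SITE-A_SCRATCHDISK / default:  roles "write_lan", order_write 1 -> default_write "Y"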

    def consistency_check(self):
        """
        Point out sites, panda sites and DDM endpoints that are missing in one of the sources
        """
        # sites
        CRIC_sites = {site_name for site_name, site_config in self.site_dump.items() if site_config["state"] == "ACTIVE"}
        self.log_stream.debug(f"Sites in CRIC {CRIC_sites}")
        configurator_sites = self.taskBuffer.configurator_read_sites()
        self.log_stream.debug(f"Sites in Configurator {configurator_sites}")
        schedconfig_sites = self.taskBuffer.configurator_read_cric_sites()
        self.log_stream.debug(f"Sites in Schedconfig {schedconfig_sites}")

        all_sites = sorted(filter(None, CRIC_sites | configurator_sites | schedconfig_sites))

        for site in all_sites:
            missing = []
            if site not in CRIC_sites:
                missing.append("CRIC")
            if site not in configurator_sites:
                missing.append("Configurator")
            if site not in schedconfig_sites:
                missing.append("Schedconfig")
            if missing:
                self.log_stream.warning(f"SITE inconsistency: {site} was not found in {missing}")

        # PanDA sites
        CRIC_panda_sites = {self.schedconfig_dump[long_panda_site_name]["panda_resource"] for long_panda_site_name in self.schedconfig_dump}
        self.log_stream.debug(f"PanDA sites in CRIC {CRIC_panda_sites}")
        configurator_panda_sites = self.taskBuffer.configurator_read_panda_sites()
        self.log_stream.debug(f"PanDA sites in Configurator {configurator_panda_sites}")
        schedconfig_panda_sites = self.taskBuffer.configurator_read_cric_panda_sites()
        self.log_stream.debug(f"PanDA sites in Schedconfig {schedconfig_panda_sites}")

        all_panda_sites = sorted(CRIC_panda_sites | configurator_panda_sites | schedconfig_panda_sites)

        for site in all_panda_sites:
            missing = []
            if site not in CRIC_panda_sites:
                missing.append("CRIC")
            if site not in configurator_panda_sites:
                missing.append("Configurator")
            if site not in schedconfig_panda_sites:
                missing.append("Schedconfig")
            if missing:
                self.log_stream.warning(f"PanDA SITE inconsistency: {site} was not found in {missing}")

        # DDM endpoints
        CRIC_ddm_endpoints = set(self.endpoint_token_dict)
        self.log_stream.debug(f"DDM endpoints in CRIC {CRIC_ddm_endpoints}")
        configurator_ddm_endpoints = self.taskBuffer.configurator_read_ddm_endpoints()
        self.log_stream.debug(f"DDM endpoints in Configurator {configurator_ddm_endpoints}")

        all_ddm_endpoints = sorted(CRIC_ddm_endpoints | configurator_ddm_endpoints)

        for site in all_ddm_endpoints:
            missing = []
            if site not in CRIC_ddm_endpoints:
                missing.append("CRIC")
            if site not in configurator_ddm_endpoints:
                missing.append("Configurator")
            if missing:
                self.log_stream.warning(f"DDM ENDPOINT inconsistency: {site} was not found in {missing}")

        self.cleanup_configurator(
            CRIC_sites,
            CRIC_panda_sites,
            CRIC_ddm_endpoints,
            configurator_sites,
            configurator_panda_sites,
            configurator_ddm_endpoints,
        )

    def cleanup_configurator(
        self,
        CRIC_sites,
        CRIC_panda_sites,
        CRIC_ddm_endpoints,
        configurator_sites,
        configurator_panda_sites,
        configurator_ddm_endpoints,
    ):
        """
        Cleans up information from configurator that is not in CRIC
        """
        if not CRIC_sites or not CRIC_panda_sites or not CRIC_ddm_endpoints:
            # an empty CRIC set would wipe the corresponding configurator table, so bail out
            self.log_stream.warning("Exiting cleanup because one of CRIC sets was empty")
            return

        # sites
        sites_to_delete = configurator_sites - CRIC_sites
        self.taskBuffer.configurator_delete_sites(sites_to_delete)

        # PanDA sites
        panda_sites_to_delete = configurator_panda_sites - CRIC_panda_sites
        self.taskBuffer.configurator_delete_panda_sites(panda_sites_to_delete)

        # DDM endpoints
        ddm_endpoints_to_delete = configurator_ddm_endpoints - CRIC_ddm_endpoints
        self.taskBuffer.configurator_delete_ddm_endpoints(ddm_endpoints_to_delete)

    def run(self):
        """
        Principal function
        """
        # attributes that retrieve_data() must have filled before run() is called:
        # site_dump, endpoint_dump, schedconfig_dump, blacklisted_endpoints,
        # blacklisted_endpoints_read, rse_usage
        if self.schedconfig_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SCHEDCONFIG}")
            return False

        if self.endpoint_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_DDMENDPOINTS}")
            return False

        if self.site_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SITES}")
            return False

        # process the dumps into DB-loadable lists
        (
            sites_list,
            panda_sites_list,
            ddm_endpoints_list,
            panda_ddm_relation_list,
        ) = self.process_site_dumps()

        # persist the information to the PanDA DB
        self.taskBuffer.configurator_write_sites(sites_list)
        self.taskBuffer.configurator_write_panda_sites(panda_sites_list)
        self.taskBuffer.configurator_write_ddm_endpoints(ddm_endpoints_list)
        self.taskBuffer.configurator_write_panda_ddm_relations(panda_ddm_relation_list)

        # point out inconsistencies between the sources and clean up stale entries
        self.consistency_check()

        return True
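

# Minimal driving sketch for the Configurator (assumes an initialized TaskBuffer
# instance; how and where the thread is scheduled is deployment-specific):
#
#   configurator = Configurator(task_buffer)
#   if configurator.retrieve_data():
#       configurator.start()
#       configurator.join()

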
class NetworkConfigurator(threading.Thread):
    def __init__(self, taskBuffer, log_stream=None):
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        if log_stream:
            self.log_stream = log_stream
        else:
            self.log_stream = _logger

        self.NWS_URL = getattr(panda_config, "NWS_URL", "https://atlas-rucio-network-metrics.cern.ch/metrics.json")
        self.CRIC_URL_CM = getattr(panda_config, "CRIC_URL_CM", "https://atlas-cric.cern.ch/api/core/sitematrix/query/?json&json_pretty=0")

    def retrieve_data(self):
        self.log_stream.debug("Getting NWS dump...")
        self.nws_dump = aux.get_dump(self.NWS_URL)
        if not self.nws_dump:
            self.log_stream.error("Could not retrieve the NWS data")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting CRIC cost matrix dump...")
        self.CRIC_cm_dump = aux.get_dump(self.CRIC_URL_CM)
        if not self.CRIC_cm_dump:
            self.log_stream.error("Could not retrieve the cost matrix data")
            return False
        self.log_stream.debug("Done")

        return True

    def process_nws_dump(self):
        """
        Gets the second generation NWS information dump, filters out irrelevant information
        and prepares it for insertion into the PanDA DB
        """
        data = []
        sites_list = self.taskBuffer.configurator_read_sites()
        # metrics with an unresolved site are mapped to the pseudo-site UNKNOWN
        sites_list.add("UNKNOWN")

        # ignore metrics older than 30 minutes
        latest_validity = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=30)

        for src_dst in self.nws_dump:
            try:
                source, destination = src_dst.split(":")
                skip_sites = []

                # filter out entries with sites not recognized by configurator
                if source not in sites_list:
                    skip_sites.append(source)
                if destination not in sites_list:
                    skip_sites.append(destination)
                if skip_sites:
                    self.log_stream.warning(f"Could not find site(s) {skip_sites} in configurator sites")
                    continue
            except ValueError:
                self.log_stream.error(f"Json wrongly formatted. Expected key with format src:dst, but found key {src_dst}")
                continue

            # transferred files
            try:
                done = self.nws_dump[src_dst][FILES][DONE]
                for activity in [PROD_INPUT, PROD_OUTPUT, EXPRESS]:
                    if activity not in done:
                        continue
                    try:
                        updated_at = datetime.strptime(done[activity][TIMESTAMP], "%Y-%m-%dT%H:%M:%S")
                        if updated_at > latest_validity:
                            done_1h = done[activity][H1]
                            done_6h = done[activity][H6]
                            data.append(
                                (
                                    source,
                                    destination,
                                    activity + "_done_1h",
                                    done_1h,
                                    updated_at,
                                )
                            )
                            data.append(
                                (
                                    source,
                                    destination,
                                    activity + "_done_6h",
                                    done_6h,
                                    updated_at,
                                )
                            )
                    except (KeyError, ValueError):
                        self.log_stream.debug(f"Entry {done} ({source}->{destination}) key {activity} does not follow standards")
                        continue
            except KeyError:
                pass

            # queued files
            try:
                queued = self.nws_dump[src_dst][FILES][QUEUED]
                for activity in [PROD_INPUT, PROD_OUTPUT, EXPRESS]:
                    if activity not in queued:
                        continue
                    try:
                        updated_at = datetime.strptime(queued[activity][TIMESTAMP], "%Y-%m-%dT%H:%M:%S")
                        if updated_at > latest_validity:
                            nqueued = queued[activity][LATEST]
                            data.append(
                                (
                                    source,
                                    destination,
                                    activity + "_queued",
                                    nqueued,
                                    updated_at,
                                )
                            )
                    except (KeyError, ValueError):
                        self.log_stream.error(f"Entry {queued} ({source}->{destination}) key {activity} does not follow standards")
                        continue
            except KeyError:
                pass

            # throughput
            try:
                mbps = self.nws_dump[src_dst][MBPS]
                for system in mbps:
                    try:
                        updated_at = datetime.strptime(mbps[system][TIMESTAMP], "%Y-%m-%dT%H:%M:%S")
                    except (KeyError, ValueError):
                        # skip the system, otherwise updated_at would be unbound or stale below
                        self.log_stream.debug(f"Entry {mbps} has wrong timestamp for system {system}")
                        continue
                    if updated_at > latest_validity:
                        for duration in [H1, D1, W1]:
                            try:
                                mbps_entry = mbps[system][duration]
                                data.append(
                                    (
                                        source,
                                        destination,
                                        f"{system}_mbps_{duration}",
                                        mbps_entry,
                                        updated_at,
                                    )
                                )
                            except KeyError:
                                self.log_stream.debug(
                                    f"Entry {mbps} ({source}->{destination}) system {system} duration {duration} not available or wrongly formatted"
                                )
                                self.log_stream.debug(sys.exc_info())
            except KeyError:
                pass

        return data
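
    # Illustrative nws_dump entry consumed above (keys written via the aux constants,
    # since their literal values live in pandaserver.configurator.aux; values hypothetical):
    #   "SITE-A:SITE-B": {
    #       FILES: {DONE:   {PROD_INPUT: {H1: 10, H6: 60, TIMESTAMP: "2026-01-01T00:00:00"}},
    #               QUEUED: {PROD_INPUT: {LATEST: 5, TIMESTAMP: "2026-01-01T00:00:00"}}},
    #       MBPS: {"<system>": {H1: ..., D1: ..., W1: ..., TIMESTAMP: "2026-01-01T00:00:00"}},
    #   }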

    def process_CRIC_cm_dump(self):
        """
        Gets the CRIC cost matrix information dump, filters out irrelevant information
        and prepares it for insertion into the PanDA DB
        """
        data = []
        sites_list = self.taskBuffer.configurator_read_sites()

        for entry in self.CRIC_cm_dump:
            self.log_stream.debug(f"Processing CRIC CM entry {entry}")

            try:
                src = entry["src"]
                dst = entry["dst"]
                closeness = entry["closeness"]
                ts = datetime.now()

                # skip entries with sites not recognized by configurator
                skip_sites = []
                if src not in sites_list:
                    skip_sites.append(src)
                if dst not in sites_list:
                    skip_sites.append(dst)
                if skip_sites:
                    self.log_stream.warning(f"Could not find site(s) {skip_sites} in configurator sites")
                    continue

                # skip entries with an empty source or destination
                if not src or not dst:
                    continue

                data.append((src, dst, "AGIS_closeness", closeness, ts))
            except KeyError:
                self.log_stream.warning(f"CRIC CM entry {entry} does not contain one or more of the keys src/dst/closeness")
                continue

        return data
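
    # Illustrative cost-matrix entry consumed above (hypothetical values):
    #   {"src": "SITE-A", "dst": "SITE-B", "closeness": 10}
    # which becomes the tuple ("SITE-A", "SITE-B", "AGIS_closeness", 10, <now>).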

    def run(self):
        """
        Principal function
        """
        # process the NWS dump
        data_nws = self.process_nws_dump()
        if not data_nws:
            self.log_stream.error("Could not retrieve any data from the NWS!")

        # process the CRIC cost matrix dump
        data_CRIC_cm = self.process_CRIC_cm_dump()
        if not data_CRIC_cm:
            self.log_stream.error("Could not retrieve any data from the CRIC Cost Matrix!")

        data_combined = data_nws + data_CRIC_cm
        if data_combined:
            # insert the new data points
            self.taskBuffer.insertNetworkMatrixData(data_combined)
            # clean up old data points
            self.taskBuffer.deleteOldNetworkData()
            return True
        else:
            return False


class SchedconfigJsonDumper(threading.Thread):
    """
    Downloads the CRIC schedconfig dump and stores it in the DB, one row per queue
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        Initialization and configuration
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        if log_stream:
            self.log_stream = log_stream
        else:
            self.log_stream = _logger

        self.CRIC_URL_SCHEDCONFIG = getattr(panda_config, "CRIC_URL_SCHEDCONFIG", "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json")

        self.log_stream.debug("Getting schedconfig dump...")
        self.schedconfig_dump = aux.get_dump(self.CRIC_URL_SCHEDCONFIG)
        self.log_stream.debug("Done")

    def run(self):
        """
        Principal function
        """
        if self.schedconfig_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SCHEDCONFIG}")
            return False

        return self.taskBuffer.upsertQueuesInJSONSchedconfig(self.schedconfig_dump)
class SWTagsDumper(threading.Thread):
    """
    Downloads the CRIC tags dump, flattens it out and stores it in the DB, one row per queue
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        Initialization and configuration
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        if log_stream:
            self.log_stream = log_stream
        else:
            self.log_stream = _logger

        self.CRIC_URL_TAGS = getattr(panda_config, "CRIC_URL_TAGS", "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json&preset=tags")

        self.log_stream.debug("Getting tags dump...")
        self.tags_dump = aux.get_dump(self.CRIC_URL_TAGS)
        self.log_stream.debug("Done")

    def run(self):
        """
        Principal function
        """
        if self.tags_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_TAGS}")
            return False

        return self.taskBuffer.loadSWTags(self.tags_dump)
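

# Minimal manual-run sketch (assumptions: a configured PanDA server environment,
# the usual taskBuffer singleton, and DB credentials in panda_config; the init
# call below reflects the common PanDA script pattern and may differ per release):
#
#   if __name__ == "__main__":
#       from pandaserver.taskbuffer.TaskBuffer import taskBuffer
#       taskBuffer.init(panda_config.dbhost, panda_config.dbpasswd, nDBConnection=1)
#       configurator = Configurator(taskBuffer)
#       if configurator.retrieve_data():
#           configurator.run()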