Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import sys
0002 import threading
0003 import traceback
0004 from datetime import datetime, timedelta, timezone
0005 
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 
0008 from pandaserver.config import panda_config
0009 from pandaserver.configurator import aux
0010 from pandaserver.configurator.aux import *
0011 
# Module-level logger; the configurator classes fall back to it when no log_stream is passed in
_logger = PandaLogger().getLogger("configurator")

# Definitions of roles
# WRITE_LAN and READ_LAN are the base role names matched against the CRIC astorages roles;
# DEFAULT is the scope returned by parse_role for a role that carries no scope suffix
WRITE_LAN = "write_lan"
READ_LAN = "read_lan"
DEFAULT = "default"
0018 
0019 
class Configurator(threading.Thread):
    """
    Mirrors the CRIC topology (sites, PanDA queues, DDM endpoints and their relations)
    together with the Rucio storage usage into the PanDA DB through the taskBuffer interface.

    retrieve_data() downloads the dumps and stores them on the instance;
    run() processes the stored dumps, persists them and runs the consistency check.
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        :param taskBuffer: object providing the configurator_* methods used to read, write and delete DB entries
        :param log_stream: logger to use; the module-level logger is taken when it is omitted or falsy
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        self.log_stream = log_stream or _logger

        # Source URLs: a value defined in panda_config takes precedence over the built-in default
        self.CRIC_URL_SITES = getattr(
            panda_config,
            "CRIC_URL_SITES",
            "https://atlas-cric.cern.ch/api/atlas/site/query/?json",
        )
        self.CRIC_URL_DDMENDPOINTS = getattr(
            panda_config,
            "CRIC_URL_DDMENDPOINTS",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpoint/query/?json",
        )
        self.CRIC_URL_SCHEDCONFIG = getattr(
            panda_config,
            "CRIC_URL_SCHEDCONFIG",
            "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json",
        )
        self.CRIC_URL_DDMBLACKLIST = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json&activity=write_wan&fstate=OFF",
        )
        self.CRIC_URL_DDMBLACKLIST_READ = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST_READ",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json&activity=read_wan&fstate=OFF",
        )
        self.CRIC_URL_DDMBLACKLIST_FULL = getattr(
            panda_config,
            "CRIC_URL_DDMBLACKLIST_FULL",
            "https://atlas-cric.cern.ch/api/atlas/ddmendpointstatus/query/?json",
        )
        self.RUCIO_RSE_USAGE = getattr(
            panda_config,
            "RUCIO_RSE_USAGE",
            "https://rucio-hadoop.cern.ch/dumps/rse_usage/current.json",
        )

        # Placeholders for the data filled in by retrieve_data(). Defining them here lets run()
        # detect a missing download through its "is None" checks instead of failing with AttributeError.
        self.site_dump = None
        self.endpoint_dump = None
        self.schedconfig_dump = None
        self.endpoint_token_dict = {}
        self.blacklisted_endpoints = []
        self.blacklisted_endpoints_write = []
        self.blacklisted_endpoints_read = []
        self.ddm_detailed_exclusions = {}
        self.rse_usage = {}

    def retrieve_data(self):
        """
        Download all dumps and store them as attributes of the instance.

        The site, DDM endpoint, schedconfig and (when a URL is configured) RSE usage dumps are
        mandatory: an empty or missing answer stops the retrieval. The write blacklist is only a
        failure when the download returns None; the read blacklist and the detailed status
        default to empty containers on any problem.

        :return: True when every mandatory dump was retrieved, False otherwise
        """
        self.log_stream.debug("Getting site dump...")
        self.site_dump = aux.get_dump(self.CRIC_URL_SITES)
        if not self.site_dump:
            self.log_stream.error("The site dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting DDM endpoints dump...")
        self.endpoint_dump = aux.get_dump(self.CRIC_URL_DDMENDPOINTS)
        if not self.endpoint_dump:
            self.log_stream.error("The endpoint dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Parsing endpoints...")
        self.endpoint_token_dict = self.parse_endpoints()
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting schedconfig dump...")
        self.schedconfig_dump = aux.get_dump(self.CRIC_URL_SCHEDCONFIG)
        if not self.schedconfig_dump:
            self.log_stream.error("The schedconfig dump was not retrieved correctly")
            return False
        self.log_stream.debug("Done")

        # --- WRITE ---
        self.log_stream.debug("Getting ddmblacklist dump...")
        try:
            if self.CRIC_URL_DDMBLACKLIST:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST)
                if dump is None:
                    # True failure (e.g., fetch/parse problem)
                    self.log_stream.error("The blacklisted endpoint dump could not be retrieved (None)")
                    return False
                # An empty answer is valid and means that nothing is blacklisted;
                # for a dict the list holds its keys
                self.blacklisted_endpoints = list(dump or [])
                self.blacklisted_endpoints_write = self.blacklisted_endpoints
                self.log_stream.debug(f"ddmblacklist(write) retrieved {len(self.blacklisted_endpoints)} endpoints")
            else:
                self.blacklisted_endpoints = []
                self.blacklisted_endpoints_write = []
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(write); defaulting to empty. Error: {e}")
            self.blacklisted_endpoints = []
            self.blacklisted_endpoints_write = []

        self.log_stream.debug(f"Blacklisted endpoints {self.blacklisted_endpoints}")
        self.log_stream.debug(f"Blacklisted endpoints write {self.blacklisted_endpoints_write}")
        self.log_stream.debug("Done")

        # --- READ ---
        self.log_stream.debug("Getting ddmblacklist read dump...")
        try:
            if self.CRIC_URL_DDMBLACKLIST_READ:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST_READ)
                # Accept an empty answer as valid; None is tolerated too, but reported
                if dump is None:
                    self.log_stream.warning("ddmblacklist(read) returned None; defaulting to empty")
                    self.blacklisted_endpoints_read = []
                else:
                    self.blacklisted_endpoints_read = list(dump or [])
                self.log_stream.debug(f"ddmblacklist(read) retrieved {len(self.blacklisted_endpoints_read)} endpoints")
            else:
                self.blacklisted_endpoints_read = []
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(read); defaulting to empty. Error: {e}")
            self.blacklisted_endpoints_read = []

        self.log_stream.debug(f"Blacklisted endpoints read {self.blacklisted_endpoints_read}")
        self.log_stream.debug("Done")

        # --- FULL / DETAILED ---
        self.log_stream.debug("Getting DDM detailed status...")
        try:
            if self.CRIC_URL_DDMBLACKLIST_FULL:
                dump = aux.get_dump(self.CRIC_URL_DDMBLACKLIST_FULL)
                if dump is None:
                    self.log_stream.warning("ddmblacklist(full) returned None; defaulting to empty")
                    detailed_exclusions = {}
                elif isinstance(dump, dict):
                    # Expected case; an empty dict is valid
                    detailed_exclusions = dump
                else:
                    # Be tolerant but safe: coerce mapping-like answers, ignore anything else
                    detailed_exclusions = dict(dump) if hasattr(dump, "items") else {}
                self.ddm_detailed_exclusions = detailed_exclusions
                self.log_stream.debug(f"ddmblacklist(full) retrieved {len(self.ddm_detailed_exclusions)} entries")
            else:
                self.ddm_detailed_exclusions = {}
        except Exception as e:
            self.log_stream.warning(f"Failed to retrieve ddmblacklist(full); defaulting to empty. Error: {e}")
            self.ddm_detailed_exclusions = {}

        self.log_stream.debug(f"Blacklisted endpoints full {self.ddm_detailed_exclusions}")
        self.log_stream.debug("Done")

        if self.RUCIO_RSE_USAGE:
            self.log_stream.debug("Getting Rucio RSE usage dump...")
            self.rse_usage = aux.get_dump(self.RUCIO_RSE_USAGE)
            if not self.rse_usage:
                self.log_stream.error("The RSE usage dump was not retrieved correctly")
                return False
            self.log_stream.debug("Done")
        else:
            self.rse_usage = {}
        return True

    def get_site_info(self, site):
        """
        Gets the relevant information from a site

        :param site: site entry of the site dump; its "state", "tier_level" and "datapolicies" fields are read
        :return: tuple (role, state, tier_level), where role is "nucleus" when "Nucleus" is among
                 the data policies and "satellite" otherwise
        """
        role = "nucleus" if "Nucleus" in site["datapolicies"] else "satellite"
        return role, site["state"], site["tier_level"]

    def parse_endpoints(self):
        """
        Puts the relevant information from endpoint_dump into a more usable format

        :return: dictionary keyed by endpoint name with the token, site name, type and tape flag ("Y"/"N")
                 of every endpoint in ACTIVE state; endpoints in any other state are only logged
        """
        endpoint_token_dict = {}
        for endpoint, endpoint_config in self.endpoint_dump.items():
            # Filter out inactive endpoints
            if endpoint_config["state"] != "ACTIVE":
                self.log_stream.debug(f"parse_endpoints: skipped endpoint {endpoint} (type: {endpoint_config['type']}, state: {endpoint_config['state']})")
                continue

            endpoint_token_dict[endpoint] = {
                "token": endpoint_config["token"],
                "site_name": endpoint_config["site"],
                "type": endpoint_config["type"],
                "is_tape": "Y" if endpoint_config["is_tape"] else "N",
            }

        return endpoint_token_dict

    def process_site_dumps(self):
        """
        Parses the CRIC site and endpoint dumps and prepares a format loadable to the DB

        :return: tuple (sites_list, panda_sites_list, ddm_endpoints_list, panda_ddm_relation_list),
                 each element being a list of dictionaries
        """
        # Variables that will contain only the relevant information
        sites_list = []
        included_sites = set()
        ddm_endpoints_list = []
        panda_sites_list = []

        # New relationship information based on astorages field in CRIC.
        # Used to fill atlas_panda.panda_ddm_relation table
        try:
            panda_ddm_relation_list = self.get_panda_ddm_relations()
        except Exception:
            # Temporary protection to prevent issues.
            # format_exc() returns the traceback text; print_exc() would return None
            self.log_stream.error(f"get_panda_ddm_relations excepted with {traceback.format_exc()}")
            panda_ddm_relation_list = []

        # Iterate the site dump
        for site_name, site_config in self.site_dump.items():
            # Add the site info to a list
            site_role, site_state, tier_level = self.get_site_info(site_config)
            if site_state == "ACTIVE" and site_name not in included_sites:
                sites_list.append(
                    {
                        "site_name": site_name,
                        "role": site_role,
                        "tier_level": tier_level,
                    }
                )
                included_sites.add(site_name)
            else:
                self.log_stream.debug(f"process_site_dumps: skipped site {site_name} (state: {site_state})")

            # Get the DDM endpoints for the site we are inspecting
            for ddm_endpoint_name in site_config["ddmendpoints"]:
                try:
                    endpoint_info = self.endpoint_token_dict[ddm_endpoint_name]
                    ddm_spacetoken_name = endpoint_info["token"]
                    ddm_endpoint_type = endpoint_info["type"]
                    ddm_endpoint_is_tape = endpoint_info["is_tape"]

                    if ddm_endpoint_name in self.blacklisted_endpoints:
                        ddm_endpoint_blacklisted_write = "Y"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is blacklisted for write")
                    else:
                        ddm_endpoint_blacklisted_write = "N"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is NOT blacklisted for write")

                    if ddm_endpoint_name in self.blacklisted_endpoints_read:
                        ddm_endpoint_blacklisted_read = "Y"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is blacklisted for read")
                    else:
                        ddm_endpoint_blacklisted_read = "N"
                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} is NOT blacklisted for read")

                    detailed_status = {}
                    if ddm_endpoint_name in self.ddm_detailed_exclusions:
                        endpoint_exclusions = self.ddm_detailed_exclusions[ddm_endpoint_name]
                        for activity in endpoint_exclusions:
                            detailed_status[activity] = endpoint_exclusions[activity]["status"]["value"]

                        self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has detailed exclusions {detailed_status}")
                except KeyError:
                    # The endpoint was not kept by parse_endpoints (not ACTIVE) or an expected field is missing
                    self.log_stream.debug(f"process_site_dumps: skipped DDM endpoint {ddm_endpoint_name} because of missing information")
                    continue

                # Get the storage space. Missing, null or badly formatted values are all handled
                # as "no usage information" rather than aborting the whole run.
                try:
                    storage_usage = self.rse_usage[ddm_endpoint_name]["storage"]
                    space_used = storage_usage["used"] / GB
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has used space {space_used}GB")
                    space_free = storage_usage["free"] / GB
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has free space {space_free}GB")
                    space_total = space_used + space_free
                    space_timestamp = datetime.strptime(storage_usage["updated_at"], "%Y-%m-%d %H:%M:%S")
                    self.log_stream.debug(f"process_site_dumps: endpoint {ddm_endpoint_name} has space timestamp {space_timestamp}")
                except (KeyError, ValueError, TypeError):
                    space_used = space_free = space_total = space_timestamp = None
                    self.log_stream.warning(f"process_site_dumps: no rse storage usage information for {ddm_endpoint_name}")

                # Get the Expired space
                try:
                    space_expired = self.rse_usage[ddm_endpoint_name]["expired"]["used"] / GB
                except (KeyError, TypeError):
                    space_expired = 0
                    self.log_stream.warning(f"process_site_dumps: no rse EXPIRED usage information for {ddm_endpoint_name}")

                ddm_spacetoken_state = site_config["ddmendpoints"][ddm_endpoint_name]["state"]
                if ddm_spacetoken_state == "ACTIVE":
                    ddm_endpoints_list.append(
                        {
                            "ddm_endpoint_name": ddm_endpoint_name,
                            "site_name": site_name,
                            "ddm_spacetoken_name": ddm_spacetoken_name,
                            "type": ddm_endpoint_type,
                            "is_tape": ddm_endpoint_is_tape,
                            # "blacklisted" carries the write flag, same as "blacklisted_write"
                            "blacklisted": ddm_endpoint_blacklisted_write,
                            "blacklisted_write": ddm_endpoint_blacklisted_write,
                            "blacklisted_read": ddm_endpoint_blacklisted_read,
                            "detailed_status": json.dumps(detailed_status),
                            "space_used": space_used,
                            "space_free": space_free,
                            "space_total": space_total,
                            "space_expired": space_expired,
                            "space_timestamp": space_timestamp,
                        }
                    )
                    self.log_stream.debug(f"process_site_dumps: added DDM endpoint {ddm_endpoint_name}")
                else:
                    self.log_stream.debug(f"process_site_dumps: skipped DDM endpoint {ddm_endpoint_name} because of state {ddm_spacetoken_state}")

            # Get the PanDA resources
            for resource_panda_sites in site_config["presources"].values():
                for panda_site_name, panda_site_config in resource_panda_sites.items():
                    panda_site_state = panda_site_config["state"]
                    if panda_site_state != "ACTIVE":
                        self.log_stream.debug(f"process_site_dumps: skipped PanDA site {panda_site_name} (state: {panda_site_state})")
                        continue
                    panda_sites_list.append({"panda_site_name": panda_site_name, "site_name": site_name})

        return sites_list, panda_sites_list, ddm_endpoints_list, panda_ddm_relation_list

    def parse_role(self, role):
        """
        Traditionally roles have been "read_lan" or "write_lan". We will consider them the default roles.
        If you want to overwrite the role for specific jobs in CRIC, you can define e.g. "read_lan_analysis". Here we
        want to strip the role to the "analysis" tag and return role "read_lan" and scope "analysis"

        Examples:
            "read_lan_analysis" --> "read_lan" and "analysis"
            "read_lan" --> "read_lan" and "default"

        :param role: role name as found in CRIC
        :return: tuple (clean role, scope)
        """
        # default roles: "read_lan" or "write_lan"
        if role in (READ_LAN, WRITE_LAN):
            return role, DEFAULT

        # special roles, e.g. "read_lan_analysis" or "write_lan_analysis"
        for base_role in (READ_LAN, WRITE_LAN):
            if role.startswith(base_role):
                prefix = f"{base_role}_"
                # Strip only the leading "<base role>_" marker, never a later occurrence inside the scope
                scope = role[len(prefix) :] if role.startswith(prefix) else role
                return base_role, scope

        # roles we are currently not expecting
        return role, DEFAULT

    def get_panda_ddm_relations(self):
        """
        Gets the DDM endpoints assigned to a panda queue, based on the CRIC astorages field of the panda queue definition

        :return: list of dictionaries, one per (panda queue, DDM endpoint, scope) combination
        """
        relation_list = []

        # iterate on panda queues
        for long_panda_site_name, queue_config in self.schedconfig_dump.items():
            panda_site_name = queue_config["panda_resource"]
            cpu_site_name = queue_config["atlas_site"]

            # get the astorages field
            astorages = queue_config["astorages"]
            if not astorages:
                continue

            # iterate the storages to establish their roles and orders.
            # The counters hold, per scope, the next order to hand out; they restart at 1 for each queue.
            dict_ddm_endpoint = {}
            next_order_read = {DEFAULT: 1}
            next_order_write = {DEFAULT: 1}
            for role in astorages:
                is_write_role = role.startswith(WRITE_LAN)
                is_read_role = role.startswith(READ_LAN)
                # ignore old roles (pr, pw) that we are not supposed to use
                if not is_read_role and not is_write_role:
                    continue

                # role and scope only depend on the role name, so resolve them once per role
                role_clean, scope = self.parse_role(role)
                next_order_read.setdefault(scope, 1)
                next_order_write.setdefault(scope, 1)

                for ddm_endpoint_name in astorages[role]:
                    # initialize DDM endpoint and scope if it does not exist
                    scope_entry = dict_ddm_endpoint.setdefault(ddm_endpoint_name, {}).setdefault(
                        scope, {"order_write": None, "order_read": None, "role": []}
                    )

                    # set the read/write order and increment the respective counter
                    if is_write_role:
                        scope_entry["order_write"] = next_order_write[scope]
                        next_order_write[scope] += 1
                    elif is_read_role:
                        scope_entry["order_read"] = next_order_read[scope]
                        next_order_read[scope] += 1
                    # append the roles
                    scope_entry["role"].append(role_clean)

            for ddm_endpoint_name, ddm_endpoint_values in dict_ddm_endpoint.items():
                for scope, scope_values in ddm_endpoint_values.items():
                    # unpack the values
                    roles = scope_values["role"]
                    order_write = scope_values["order_write"]
                    order_read = scope_values["order_read"]

                    # figure out the ATLAS site the DDM endpoint belongs to;
                    # endpoints unknown to endpoint_token_dict are not valid and get no relation
                    try:
                        storage_site_name = self.endpoint_token_dict[ddm_endpoint_name]["site_name"]
                    except KeyError:
                        self.log_stream.warning(
                            f"Skipped {long_panda_site_name}, because primary associated DDM endpoint {ddm_endpoint_name} not found (e.g.DISABLED)"
                        )
                        continue

                    relation_list.append(
                        {
                            "panda_site_name": panda_site_name,
                            "ddm_endpoint_name": ddm_endpoint_name,
                            "roles": ",".join(roles),
                            # endpoints with order 1 will be considered default
                            "default_read": "Y" if order_read == 1 else "N",
                            "default_write": "Y" if order_write == 1 else "N",
                            # the storage is local when it belongs to the same ATLAS site as the cpu
                            "is_local": "Y" if storage_site_name == cpu_site_name else "N",
                            "order_read": order_read,
                            "order_write": order_write,
                            "scope": scope,
                        }
                    )

        return relation_list

    def _warn_missing(self, label, names, sources):
        """
        Logs a warning for each name that is absent from at least one source

        :param label: text that starts the warning message, e.g. "SITE"
        :param names: iterable with the names to check
        :param sources: sequence of (source name, collection of names) pairs
        """
        for name in names:
            missing = [source_name for source_name, members in sources if name not in members]
            if missing:
                self.log_stream.warning(f"{label} inconsistency: {name} was not found in {missing}")

    def consistency_check(self):
        """
        Point out sites, panda sites and DDM endpoints that are missing in one of the sources
        and then hand the collected sets to cleanup_configurator
        """
        # Check for site inconsistencies
        CRIC_sites = {site_name for site_name, site_config in self.site_dump.items() if site_config["state"] == "ACTIVE"}
        self.log_stream.debug(f"Sites in CRIC {CRIC_sites}")
        configurator_sites = self.taskBuffer.configurator_read_sites()
        self.log_stream.debug(f"Sites in Configurator {configurator_sites}")
        schedconfig_sites = self.taskBuffer.configurator_read_cric_sites()
        self.log_stream.debug(f"Sites in Schedconfig {schedconfig_sites}")

        # empty/None names are dropped before sorting
        all_sites = sorted(filter(None, CRIC_sites | configurator_sites | schedconfig_sites))
        self._warn_missing(
            "SITE",
            all_sites,
            (("CRIC", CRIC_sites), ("Configurator", configurator_sites), ("Schedconfig", schedconfig_sites)),
        )

        # Check for panda-site inconsistencies
        CRIC_panda_sites = {queue_config["panda_resource"] for queue_config in self.schedconfig_dump.values()}
        self.log_stream.debug(f"PanDA sites in CRIC {CRIC_panda_sites}")
        configurator_panda_sites = self.taskBuffer.configurator_read_panda_sites()
        self.log_stream.debug(f"PanDA sites in Configurator {configurator_panda_sites}")
        schedconfig_panda_sites = self.taskBuffer.configurator_read_cric_panda_sites()
        self.log_stream.debug(f"PanDA sites in Schedconfig {schedconfig_panda_sites}")

        all_panda_sites = sorted(CRIC_panda_sites | configurator_panda_sites | schedconfig_panda_sites)
        self._warn_missing(
            "PanDA SITE",
            all_panda_sites,
            (("CRIC", CRIC_panda_sites), ("Configurator", configurator_panda_sites), ("Schedconfig", schedconfig_panda_sites)),
        )

        # Check for DDM endpoint inconsistencies
        CRIC_ddm_endpoints = set(self.endpoint_token_dict)
        self.log_stream.debug(f"DDM endpoints in CRIC {CRIC_ddm_endpoints}")
        configurator_ddm_endpoints = self.taskBuffer.configurator_read_ddm_endpoints()
        self.log_stream.debug(f"DDM endpoints in Configurator {configurator_ddm_endpoints}")

        all_ddm_endpoints = sorted(CRIC_ddm_endpoints | configurator_ddm_endpoints)
        self._warn_missing(
            "DDM ENDPOINT",
            all_ddm_endpoints,
            (("CRIC", CRIC_ddm_endpoints), ("Configurator", configurator_ddm_endpoints)),
        )

        self.cleanup_configurator(
            CRIC_sites,
            CRIC_panda_sites,
            CRIC_ddm_endpoints,
            configurator_sites,
            configurator_panda_sites,
            configurator_ddm_endpoints,
        )

    def cleanup_configurator(
        self,
        CRIC_sites,
        CRIC_panda_sites,
        CRIC_ddm_endpoints,
        configurator_sites,
        configurator_panda_sites,
        configurator_ddm_endpoints,
    ):
        """
        Cleans up information from configurator that is not in CRIC

        Nothing is deleted when any of the CRIC sets is empty: an empty set would otherwise
        remove every corresponding entry known to the configurator.

        :param CRIC_sites: set of site names in CRIC
        :param CRIC_panda_sites: set of PanDA site names in CRIC
        :param CRIC_ddm_endpoints: set of DDM endpoint names in CRIC
        :param configurator_sites: set of site names known to the configurator
        :param configurator_panda_sites: set of PanDA site names known to the configurator
        :param configurator_ddm_endpoints: set of DDM endpoint names known to the configurator
        """
        if not CRIC_sites or not CRIC_panda_sites or not CRIC_ddm_endpoints:
            self.log_stream.warning("Exiting cleanup because one of CRIC sets was empty")
            return

        # Clean up sites
        sites_to_delete = configurator_sites - CRIC_sites
        self.taskBuffer.configurator_delete_sites(sites_to_delete)

        # Clean up panda sites
        panda_sites_to_delete = configurator_panda_sites - CRIC_panda_sites
        self.taskBuffer.configurator_delete_panda_sites(panda_sites_to_delete)

        # Clean up DDM endpoints
        ddm_endpoints_to_delete = configurator_ddm_endpoints - CRIC_ddm_endpoints
        self.taskBuffer.configurator_delete_ddm_endpoints(ddm_endpoints_to_delete)

    def run(self):
        """
        Principal function

        Works on the attributes filled in by retrieve_data(): site_dump, endpoint_dump,
        schedconfig_dump, blacklisted_endpoints, blacklisted_endpoints_read and rse_usage.

        :return: True when the information was processed and persisted, False when a required dump is missing
        """
        if self.schedconfig_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SCHEDCONFIG}")
            return False

        if self.endpoint_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_DDMENDPOINTS}")
            return False

        if self.site_dump is None:
            self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SITES}")
            return False

        # Get pre-processed CRIC dumps
        sites_list, panda_sites_list, ddm_endpoints_list, panda_ddm_relation_list = self.process_site_dumps()

        # Persist the information to the PanDA DB
        self.taskBuffer.configurator_write_sites(sites_list)
        self.taskBuffer.configurator_write_panda_sites(panda_sites_list)
        self.taskBuffer.configurator_write_ddm_endpoints(ddm_endpoints_list)
        self.taskBuffer.configurator_write_panda_ddm_relations(panda_ddm_relation_list)

        # Do a data quality check
        self.consistency_check()

        return True
0609 
0610 
class NetworkConfigurator(threading.Thread):
    """
    Processes the NWS (Network Weather Service) dump and the CRIC cost matrix dump
    and stores the resulting network metrics in the PanDA DB through the task buffer.
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        Initialization and configuration

        :param taskBuffer: object providing the DB access methods used by this class
        :param log_stream: logger to use; falls back to the module level logger when not given
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        if log_stream:
            self.log_stream = log_stream
        else:
            self.log_stream = _logger

        # Source URLs can be overridden through panda_config
        if hasattr(panda_config, "NWS_URL"):
            self.NWS_URL = panda_config.NWS_URL
        else:
            self.NWS_URL = "https://atlas-rucio-network-metrics.cern.ch/metrics.json"

        if hasattr(panda_config, "CRIC_URL_CM"):
            self.CRIC_URL_CM = panda_config.CRIC_URL_CM
        else:
            self.CRIC_URL_CM = "https://atlas-cric.cern.ch/api/core/sitematrix/query/?json&json_pretty=0"

    def retrieve_data(self):
        """
        Downloads the NWS dump and the CRIC cost matrix dump and keeps them as
        self.nws_dump and self.CRIC_cm_dump

        :return: True if both dumps were retrieved, False as soon as one of them comes back empty
        """
        self.log_stream.debug("Getting NWS dump...")
        self.nws_dump = aux.get_dump(self.NWS_URL)
        if not self.nws_dump:
            self.log_stream.error("Could not retrieve the NWS data")
            return False
        self.log_stream.debug("Done")

        self.log_stream.debug("Getting CRIC cost matrix dump...")
        self.CRIC_cm_dump = aux.get_dump(self.CRIC_URL_CM)
        if not self.CRIC_cm_dump:
            self.log_stream.error("Could not retrieve the cost matrix data")
            return False
        self.log_stream.debug("Done")

        return True

    def process_nws_dump(self):
        """
        Gets the second generation NWS information dump, filters out irrelevant information
        and prepares it for insertion into the PanDA DB

        :return: list of tuples (source, destination, metric name, value, timestamp)
        """

        timestamp_format = "%Y-%m-%dT%H:%M:%S"
        activities = [PROD_INPUT, PROD_OUTPUT, EXPRESS]

        data = []
        sites_list = self.taskBuffer.configurator_read_sites()
        # source or destination shown as UNKNOWN before the rule is processed
        sites_list.add("UNKNOWN")

        # Ignore outdated values
        latest_validity = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=30)

        for src_dst in self.nws_dump:
            try:
                source, destination = src_dst.split(":")
            except ValueError:
                self.log_stream.error(f"Json wrongly formatted. Expected key with format src:dst, but found key {src_dst}")
                continue

            # Skip entries with sites not recognized by configurator
            skip_sites = [site for site in (source, destination) if site not in sites_list]
            if skip_sites:
                self.log_stream.warning(f"Could not find site(s) {skip_sites} in configurator sites")
                continue

            # Transferred files
            try:
                done = self.nws_dump[src_dst][FILES][DONE]
            except KeyError:
                # the pair has no information about transferred files
                done = {}
            for activity in activities:
                if activity not in done:
                    continue
                try:
                    updated_at = datetime.strptime(done[activity][TIMESTAMP], timestamp_format)
                    if updated_at > latest_validity:
                        done_1h = done[activity][H1]
                        done_6h = done[activity][H6]
                        data.append((source, destination, activity + "_done_1h", done_1h, updated_at))
                        data.append((source, destination, activity + "_done_6h", done_6h, updated_at))
                except (KeyError, ValueError):
                    self.log_stream.debug(f"Entry {done} ({source}->{destination}) key {activity} does not follow standards")
                    continue

            # Queued files
            try:
                queued = self.nws_dump[src_dst][FILES][QUEUED]
            except KeyError:
                # the pair has no information about queued files
                queued = {}
            for activity in activities:
                if activity not in queued:
                    continue
                try:
                    updated_at = datetime.strptime(queued[activity][TIMESTAMP], timestamp_format)
                    if updated_at > latest_validity:
                        nqueued = queued[activity][LATEST]
                        data.append((source, destination, activity + "_queued", nqueued, updated_at))
                except (KeyError, ValueError):
                    self.log_stream.error(f"Entry {queued} ({source}->{destination}) key {activity} does not follow standards")
                    continue

            # MBps for Rucio, FAX, PerfSonar
            try:
                mbps = self.nws_dump[src_dst][MBPS]
            except KeyError:
                # the pair has no throughput information
                mbps = {}
            for system in mbps:
                try:
                    updated_at = datetime.strptime(mbps[system][TIMESTAMP], timestamp_format)
                except (KeyError, ValueError):
                    # Without a valid timestamp the entry can not be checked for freshness. Skip only this
                    # system: falling through would reuse the timestamp of a previously parsed entry
                    # (or fail on an unbound variable if there was none)
                    self.log_stream.debug(f"Entry {mbps} has wrong timestamp for system {system}")
                    continue

                if updated_at <= latest_validity:
                    continue

                for duration in [H1, D1, W1]:
                    try:
                        mbps_entry = mbps[system][duration]
                        data.append((source, destination, f"{system}_mbps_{duration}", mbps_entry, updated_at))
                    except KeyError:
                        self.log_stream.debug(
                            f"Entry {mbps} ({source}->{destination}) system {system} duration {duration} not available or wrongly formatted"
                        )
                        self.log_stream.debug(sys.exc_info())

        return data

    def process_CRIC_cm_dump(self):
        """
        Gets the CRIC CM information dump, filters out irrelevant information
        and prepares it for insertion into the PanDA DB

        :return: list of tuples (src, dst, "AGIS_closeness", closeness, timestamp)
        """
        data = []
        sites_list = self.taskBuffer.configurator_read_sites()

        for entry in self.CRIC_cm_dump:
            self.log_stream.debug(f"Processing CRIC CM entry {entry}")

            try:
                src = entry["src"]
                dst = entry["dst"]
                closeness = entry["closeness"]
                # NOTE(review): naive local time, whereas the NWS validity cut uses naive UTC - confirm which one the DB expects
                ts = datetime.now()

                # filter out sites that are not in CRIC
                skip_sites = []
                if src not in sites_list:
                    skip_sites.append(src)
                if dst not in sites_list:
                    skip_sites.append(dst)
                if skip_sites:
                    self.log_stream.warning(f"Could not find site(s) {skip_sites} in configurator sites")
                    continue

                # Skip broken entries (protection against errors in CRIC)
                if not src or not dst:
                    continue

                # Prepare data for bulk upserts
                data.append((src, dst, "AGIS_closeness", closeness, ts))

            except KeyError:
                self.log_stream.warning(f"CRIC CM entry {entry} does not contain one or more of the keys src/dst/closeness")
                continue

        return data

    def run(self):
        """
        Principal function: processes both dumps, inserts the combined data and cleans up old data

        :return: True if there was data to insert, False if both sources produced nothing
        """

        # Process and store the NWS information (NWS=Network Weather Service)
        data_nws = self.process_nws_dump()
        if not data_nws:
            self.log_stream.error("Could not retrieve any data from the NWS!")

        # Process and store the CRIC connectivity information
        data_CRIC_cm = self.process_CRIC_cm_dump()
        if not data_CRIC_cm:
            self.log_stream.error("Could not retrieve any data from the CRIC Cost Matrix!")

        data_combined = data_nws + data_CRIC_cm
        if data_combined:
            # Insert the new data
            self.taskBuffer.insertNetworkMatrixData(data_combined)
            # Do some cleanup of old data
            self.taskBuffer.deleteOldNetworkData()
            return True
        else:
            return False
0835 
0836 
class SchedconfigJsonDumper(threading.Thread):
    """
    Fetches the schedconfig dump from CRIC and hands it to the task buffer,
    which stores it in the DB with one row per queue
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        Set up the DB interface, the logger and the source URL, then download the dump

        :param taskBuffer: object providing the DB access methods
        :param log_stream: logger to use; the module level logger is used when not given
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        self.log_stream = log_stream if log_stream else _logger

        # The URL configured in panda_config takes precedence over the built-in default
        self.CRIC_URL_SCHEDCONFIG = getattr(
            panda_config,
            "CRIC_URL_SCHEDCONFIG",
            "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json",
        )

        # The download happens at construction time; run() only works on the stored result
        self.log_stream.debug("Getting schedconfig dump...")
        self.schedconfig_dump = aux.get_dump(self.CRIC_URL_SCHEDCONFIG)
        self.log_stream.debug("Done")

    def run(self):
        """
        Upsert the downloaded schedconfig dump into the DB

        :return: the result of the task buffer upsert, or False if the dump is None
        """
        dump = self.schedconfig_dump
        if dump is not None:
            return self.taskBuffer.upsertQueuesInJSONSchedconfig(dump)

        self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_SCHEDCONFIG}")
        return False
0872 
0873 
class SWTagsDumper(threading.Thread):
    """
    Fetches the tags dump from CRIC and passes it to the task buffer, which
    flattens it out and stores it in the DB, one row per queue
    """

    def __init__(self, taskBuffer, log_stream=None):
        """
        Configure the DB interface, the logger and the source URL, then download the dump

        :param taskBuffer: object providing the DB access methods
        :param log_stream: logger to use; the module level logger is used when not given
        """
        threading.Thread.__init__(self)

        self.taskBuffer = taskBuffer
        self.log_stream = log_stream or _logger

        # Prefer the URL from panda_config, otherwise use the built-in default
        default_url = "https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json&preset=tags"
        self.CRIC_URL_TAGS = getattr(panda_config, "CRIC_URL_TAGS", default_url)

        # The dump is retrieved once, at construction time
        logger = self.log_stream
        logger.debug("Getting tags dump...")
        self.tags_dump = aux.get_dump(self.CRIC_URL_TAGS)
        logger.debug("Done")

    def run(self):
        """
        Load the downloaded tags dump into the DB

        :return: the result of the task buffer load, or False if the dump is None
        """
        tags = self.tags_dump
        if tags is not None:
            return self.taskBuffer.loadSWTags(tags)

        self.log_stream.error(f"SKIPPING RUN. Failed to download {self.CRIC_URL_TAGS}")
        return False