Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import copy
0002 import sys
0003 import traceback
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 
0007 from pandaserver.config import panda_config
0008 from pandaserver.dataservice.DataServiceUtils import select_scope
0009 from pandaserver.taskbuffer.NucleusSpec import NucleusSpec
0010 from pandaserver.taskbuffer.SiteSpec import SiteSpec
0011 
_logger = PandaLogger().getLogger("SiteMapper")


def _build_default_site():
    """Build the placeholder SiteSpec that SiteMapper.getSite returns for unknown sites.

    Name, nickname, type, status and the "default" DDM entries come from the
    ``def_*`` settings of ``panda_config``; the storage-token and DDM-endpoint
    maps start out empty.
    """
    placeholder = SiteSpec()
    placeholder.sitename = panda_config.def_sitename
    placeholder.nickname = panda_config.def_nickname
    placeholder.ddm_input = {"default": panda_config.def_ddm}
    placeholder.ddm_output = {"default": panda_config.def_ddm}
    placeholder.type = panda_config.def_type
    placeholder.status = panda_config.def_status
    placeholder.setokens_input = {}
    placeholder.setokens_output = {}
    placeholder.ddm_endpoints_input = {}
    placeholder.ddm_endpoints_output = {}
    return placeholder


DEFAULT_SITE = _build_default_site()

# constants
WORLD_CLOUD = "WORLD"  # pseudo-cloud collecting every production site
NUCLEUS_TAG = "nucleus:"  # prefix of site names that refer to a nucleus/satellite
0029 
0030 
class SiteMapper:
    """Snapshot of PanDA sites (queues), clouds, nuclei and satellites.

    All information is read once from the task buffer in ``__init__``; the
    remaining methods are lookups over that snapshot. ``getSite`` falls back to
    the module-level ``DEFAULT_SITE`` for unknown site names.
    """

    def __init__(self, taskBuffer, verbose=False):
        """Load sites, clouds, nuclei and satellites from the task buffer.

        Args:
            taskBuffer: object providing ``load_resource_types()``, ``getSiteInfo()``
                and ``get_cloud_list()``.
            verbose (bool): if True, also dump every site spec to the debug log.

        Errors are logged rather than raised, so construction always succeeds,
        possibly with only part of the information loaded.
        """
        _logger.debug("__init__ SiteMapper")

        # Initialise every attribute before reading anything, so that the accessors
        # keep working (on empty data) even if loading fails part-way through.
        self.siteSpecList = {}
        self.cloudSpec = {}  # in reality this is a dictionary of clouds, not a "spec" object
        # WORLD always has a site list, even if it is missing from the cloud list;
        # otherwise grouping sites by cloud would fail with a KeyError
        self.worldCloudSpec = {"sites": []}
        self.nuclei = {}
        self.satellites = {}
        self.endpoint_to_sites_map = {"input": {}, "output": {}}
        self.endpoint_detailed_status_summary = {}

        try:
            # get resource types
            resource_types = taskBuffer.load_resource_types()

            # read sites information from database in the format
            # {'PANDA_QUEUE_1': < pandaserver.taskbuffer.SiteSpec.SiteSpec object1 >,
            #  'PANDA_QUEUE_2': < pandaserver.taskbuffer.SiteSpec.SiteSpec object2 >, ...}
            site_spec_dictionary, self.endpoint_detailed_status_summary = taskBuffer.getSiteInfo()

            self._init_clouds(taskBuffer.get_cloud_list())
            self._load_sites(site_spec_dictionary)

            # make map from endpoint to sites; child queues are created afterwards
            # and therefore are not part of this map
            self.make_endpoint_to_sites_map()

            # make virtual queues from unified queues
            self._make_child_sites(resource_types)

            # collect nuclei and satellites for main and child sites
            for tmp_site_spec in self.siteSpecList.values():
                self.collect_nuclei_and_satellites(tmp_site_spec)

            self._group_sites_by_cloud()

            # dump site information in verbose mode and cloud information
            if verbose:
                self.dump_site_information()
            self.dump_cloud_information()

        except Exception:
            error_type, error_value, _ = sys.exc_info()
            _logger.error(f"__init__ SiteMapper : {error_type} {error_value}")
            _logger.error(traceback.format_exc())

        _logger.debug("__init__ SiteMapper done")

    def _init_clouds(self, clouds):
        """Create an empty site list for each cloud name; WORLD is kept in ``worldCloudSpec``."""
        for cloud_name in clouds:
            if cloud_name == WORLD_CLOUD:
                self.worldCloudSpec = {"sites": []}
            else:
                self.cloudSpec[cloud_name] = {"sites": []}

    def _load_sites(self, site_spec_dictionary):
        """Fill ``siteSpecList`` from the {queue name: SiteSpec} mapping read from the DB.

        Entries without site info are skipped, as are production queues without a cloud.
        """
        # Each queue name occurs once (dictionary keys are unique), so there are no
        # duplicate entries to merge.
        for site_name, site_spec in site_spec_dictionary.items():
            if not site_spec:
                # we can't find the site info for the queue
                _logger.error(f"Could not read site info for {site_name}")
            elif site_spec.runs_production() and not site_spec.cloud:
                # don't use site for production when cloud is undefined
                _logger.error(f"Empty cloud for {site_name}")
            else:
                self.siteSpecList[site_name] = site_spec

    def _make_child_sites(self, resource_types):
        """Add one virtual child queue per compatible resource type for every unified ("ucore") queue.

        A failure on one queue is logged and does not prevent child queues from
        being created for the other queues.
        """
        # iterate over a snapshot of the names because child queues are inserted into siteSpecList
        for site_name in list(self.siteSpecList):
            try:
                site_spec = self.siteSpecList[site_name]
                if site_spec.capability != "ucore":
                    continue
                for resource_spec in resource_types:
                    child_site_spec = self.get_child_site_spec(site_spec, resource_spec)
                    if child_site_spec:
                        self.siteSpecList[child_site_spec.sitename] = child_site_spec
                # set unified flag
                site_spec.is_unified = True
            except Exception:
                _logger.error(f"failed to make child queues for {site_name}")
                _logger.error(traceback.format_exc())

    def _group_sites_by_cloud(self):
        """Append every production site to its cloud (when that cloud is known) and to WORLD."""
        world_sites = self.worldCloudSpec["sites"]
        for tmp_site_spec in self.siteSpecList.values():
            # choose only prod or grandly unified sites
            if not tmp_site_spec.runs_production():
                continue

            # append prod site to cloud structure
            if tmp_site_spec.cloud and tmp_site_spec.cloud in self.cloudSpec:
                cloud_sites = self.cloudSpec[tmp_site_spec.cloud]["sites"]
                if tmp_site_spec.sitename not in cloud_sites:
                    cloud_sites.append(tmp_site_spec.sitename)

            # add to WORLD cloud
            if tmp_site_spec.sitename not in world_sites:
                world_sites.append(tmp_site_spec.sitename)

    def get_child_site_spec(self, site_spec, resource_spec):
        """Build the virtual child queue of a unified queue for one resource type.

        Args:
            site_spec: the unified (parent) queue.
            resource_spec: the resource type. ``mincore``/``maxcore`` may be None,
                meaning that bound is not applied; when ``minrampercore``/
                ``maxrampercore`` is None only the parent's minrss/maxrss are used.

        Returns:
            A shallow copy of ``site_spec`` named ``"<sitename>/<resource_name>"``
            with ``coreCount``, ``minrss``, ``maxrss`` and ``unified_name``
            overwritten, or None if the parent has fewer cores than ``mincore``.
        """
        core_count = max(1, site_spec.coreCount)
        min_core = resource_spec.mincore
        max_core = resource_spec.maxcore

        # make sure our queue is compatible with the resource type
        if min_core is not None and core_count < min_core:
            return None

        # copy the site spec for the child site and later overwrite relevant fields
        child_site_spec = copy.copy(site_spec)
        child_site_spec.sitename = f"{site_spec.sitename}/{resource_spec.resource_name}"

        # clamp the core count to [mincore, maxcore]; a None bound is skipped
        # (max(x, None) would raise TypeError)
        child_core_count = core_count if max_core is None else min(core_count, max_core)
        if min_core is not None:
            child_core_count = max(child_core_count, min_core)
        child_site_spec.coreCount = child_core_count

        # parent's memory limits scaled by the child's share of the parent's cores
        scaled_minrss = site_spec.minrss * child_core_count / core_count
        scaled_maxrss = site_spec.maxrss * child_core_count / core_count

        # calculate the minRSS for the child queue
        if resource_spec.minrampercore is not None:
            # NOTE(review): the "- coreCount + 1" offset is inherited; its rationale is not visible here
            child_site_spec.minrss = max(
                child_core_count * resource_spec.minrampercore - child_core_count + 1,
                scaled_minrss,
            )
        else:
            child_site_spec.minrss = max(scaled_minrss, site_spec.minrss)

        # calculate the maxRSS for the child queue
        if resource_spec.maxrampercore is not None:
            child_site_spec.maxrss = min(child_core_count * resource_spec.maxrampercore, scaled_maxrss)
        else:
            child_site_spec.maxrss = min(scaled_maxrss, site_spec.maxrss)

        # set unified name
        child_site_spec.unified_name = site_spec.sitename

        return child_site_spec

    def collect_nuclei_and_satellites(self, ret):
        """Register a production site under its nucleus or satellite, keyed by ``ret.pandasite``.

        Sites that do not run production, or whose role is neither "nucleus" nor
        "satellite", are ignored. The NucleusSpec for a pandasite is created from
        the first site seen with that pandasite; every matching site is then added to it.
        """
        # only consider production sites
        if not ret.runs_production():
            return

        # target will point to the nuclei or satellites attribute in the SiteMapper object
        if ret.role == "nucleus":
            target = self.nuclei
        elif ret.role == "satellite":
            target = self.satellites
        else:
            return

        if ret.pandasite not in target:
            atom = NucleusSpec(ret.pandasite)
            atom.state = ret.pandasite_state

            # set if there is a bare nucleus mode (only, allow)
            mode = ret.bare_nucleus_mode()
            if mode:
                atom.set_bare_nucleus_mode(mode)

            # set if this is a secondary nucleus
            secondary = ret.secondary_nucleus()
            if secondary:
                atom.set_secondary_nucleus(secondary)

            # set if this is a satellite
            if ret.role == "satellite":
                atom.set_satellite()

            # set the default endpoint out; best effort (e.g. the site may have no
            # "default" output endpoint), the nucleus is registered either way
            try:
                atom.set_default_endpoint_out(ret.ddm_endpoints_output["default"].getDefaultWrite())
            except Exception:
                pass

            # add the atom to the dictionary of nuclei/satellites in the SiteMapper object
            target[ret.pandasite] = atom

        # add the site name and ddm endpoints
        target[ret.pandasite].add(ret.sitename, ret.ddm_endpoints_output, ret.ddm_endpoints_input)

    def clean_site_name(self, site_name):
        """Resolve ``"nucleus:NAME"`` to one PanDA site of nucleus/satellite NAME.

        Any other name, an unknown NAME, or any error during resolution (e.g. a
        non-string ``site_name``) returns ``site_name`` unchanged.
        """
        try:
            if site_name.startswith(NUCLEUS_TAG):
                tmp_name = site_name.split(":")[-1]
                if tmp_name in self.nuclei:
                    site_name = self.nuclei[tmp_name].getOnePandaSite()
                elif tmp_name in self.satellites:
                    site_name = self.satellites[tmp_name].getOnePandaSite()
        except Exception:
            pass

        return site_name

    # accessor for site
    def getSite(self, site_name):
        """Return the SiteSpec for ``site_name`` (after nucleus resolution), or DEFAULT_SITE if unknown."""
        site_name = self.clean_site_name(site_name)
        return self.siteSpecList.get(site_name, DEFAULT_SITE)

    # check if site exists
    def checkSite(self, site_name):
        """Return True if ``site_name`` (after nucleus resolution) is a known site."""
        site_name = self.clean_site_name(site_name)
        return site_name in self.siteSpecList

    # resolve nucleus
    def resolveNucleus(self, site_name):
        """Resolve a ``"nucleus:NAME"`` reference to a site name; the string "NULL" maps to None."""
        site_name = self.clean_site_name(site_name)
        if site_name == "NULL":
            site_name = None
        return site_name

    # accessor for cloud
    def getCloud(self, cloud):
        """Return the ``{"sites": [...]}`` dict of ``cloud``; WORLD and unknown clouds give the WORLD cloud."""
        # WORLD is never stored in cloudSpec, so it also takes the fallback
        return self.cloudSpec.get(cloud, self.worldCloudSpec)

    # accessor for cloud
    def checkCloud(self, cloud):
        """Return True if ``cloud`` is a known cloud or the WORLD pseudo-cloud."""
        return cloud in self.cloudSpec or cloud == WORLD_CLOUD

    # accessor for cloud list
    def getCloudList(self):
        """Return the names of the known clouds (WORLD excluded)."""
        return list(self.cloudSpec)

    # get DDM endpoint
    def getDdmEndpoint(self, site_name, storage_token, prod_source_label, job_label):
        """Return the output DDM endpoint of a site for a storage token.

        The output scope is chosen by ``select_scope`` from the job labels. If the
        storage token has no endpoint in that scope, the site's ``ddm_output`` of
        the scope is returned. Returns None if the site is unknown.
        """
        # Skip if site doesn't exist
        if not self.checkSite(site_name):
            return None

        # Get the site spec and the output scope (the input scope is not needed)
        site_spec = self.getSite(site_name)
        _, scope_output = select_scope(site_spec, prod_source_label, job_label)

        if storage_token in site_spec.setokens_output[scope_output]:
            return site_spec.setokens_output[scope_output][storage_token]

        return site_spec.ddm_output[scope_output]

    # get nucleus
    def getNucleus(self, site_name):
        """Return the NucleusSpec registered for ``site_name`` as nucleus or satellite, else None."""
        if site_name in self.nuclei:
            return self.nuclei[site_name]
        if site_name in self.satellites:
            return self.satellites[site_name]
        return None

    def dump_site_information(self):
        """Log every site spec at debug level."""
        _logger.debug("========= site dump =========")
        for tmp_site_spec in self.siteSpecList.values():
            _logger.debug(f"Site->{str(tmp_site_spec)}")

    def dump_cloud_information(self):
        """Log the sites of each cloud, flagging offline and unknown sites, then the WORLD cloud."""
        for cloud_name_tmp, cloud_spec_tmp in self.cloudSpec.items():
            # Generate lists of sites with special cases (offline or not existing)
            sites_with_issues = []
            sites_offline = []
            for site_name_tmp in cloud_spec_tmp["sites"]:
                if site_name_tmp not in self.siteSpecList:
                    # append the name itself (previously a one-element set was appended)
                    sites_with_issues.append(site_name_tmp)
                elif self.siteSpecList[site_name_tmp].status == "offline":
                    sites_offline.append(site_name_tmp)

            _logger.debug(f"========= {cloud_name_tmp} cloud dump =========")
            _logger.debug(f"Cloud:{cloud_name_tmp} has {cloud_spec_tmp['sites']}")
            if sites_offline:
                _logger.debug(f"Cloud:{cloud_name_tmp} has sites offline:{sites_offline}")
            if sites_with_issues:
                _logger.debug(f"Cloud:{cloud_name_tmp} has sites that don't exist:{sites_with_issues}")

        # dump the WORLD cloud sites
        _logger.debug("========= WORLD cloud dump =========")
        _logger.debug(f"Cloud:WORLD has {self.worldCloudSpec['sites']}")

    def _is_endpoint_not_blocked(self, activity: str, endpoint_name: str) -> bool:
        """Return False if the endpoint is listed as OFF or TEST for ``activity`` in the status summary."""
        endpoints_with_status = self.endpoint_detailed_status_summary.get(activity, {})
        bad_endpoints = endpoints_with_status.get("OFF", []) + endpoints_with_status.get("TEST", [])
        return endpoint_name not in bad_endpoints

    def is_readable_remotely(self, endpoint_name: str) -> bool:
        """Check if the given endpoint is readable remotely over WAN
        Args:
            endpoint_name (str): Name of the endpoint to check
        Returns:
            bool: True if the endpoint is readable, False otherwise
        """
        return self._is_endpoint_not_blocked("read_wan", endpoint_name)

    def is_readable_locally(self, endpoint_name: str) -> bool:
        """Check if the given endpoint is readable locally over LAN
        Args:
            endpoint_name (str): Name of the endpoint to check
        Returns:
            bool: True if the endpoint is readable, False otherwise
        """
        return self._is_endpoint_not_blocked("read_lan", endpoint_name)

    def make_endpoint_to_sites_map(self) -> None:
        """Create a mapping from endpoints to the sites using them, per direction ("input"/"output").

        Each site is listed at most once per endpoint, in the order sites are visited.
        """
        for site_name, site_spec in self.siteSpecList.items():
            for direction, endpoint_specs in (
                ("input", site_spec.ddm_endpoints_input),
                ("output", site_spec.ddm_endpoints_output),
            ):
                direction_map = self.endpoint_to_sites_map[direction]
                for endpoint_spec in endpoint_specs.values():
                    for endpoint in endpoint_spec.all.keys():
                        sites = direction_map.setdefault(endpoint, [])
                        if site_name not in sites:
                            sites.append(site_name)

    def get_sites_for_endpoint(self, endpoint_name: str, direction: str) -> list:
        """Get the sites associated with a given endpoint
        Args:
            endpoint_name (str): Name of the endpoint
            direction (str): 'input' or 'output' to specify the direction of data flow
        Returns:
            list: Names of the sites associated with the endpoint; empty if the endpoint
            or direction is unknown
        """
        return self.endpoint_to_sites_map.get(direction, {}).get(endpoint_name, [])