File indexing completed on 2026-04-10 08:39:00
0001 import copy
0002 import sys
0003 import traceback
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006
0007 from pandaserver.config import panda_config
0008 from pandaserver.dataservice.DataServiceUtils import select_scope
0009 from pandaserver.taskbuffer.NucleusSpec import NucleusSpec
0010 from pandaserver.taskbuffer.SiteSpec import SiteSpec
0011
# Logger shared by everything defined in this module
_logger = PandaLogger().getLogger("SiteMapper")


def _build_default_site():
    """Create the fallback site specification from the def_* values of panda_config."""
    site = SiteSpec()
    site.sitename = panda_config.def_sitename
    site.nickname = panda_config.def_nickname
    # input and output get their own dict so that changing one does not affect the other
    site.ddm_input = {"default": panda_config.def_ddm}
    site.ddm_output = {"default": panda_config.def_ddm}
    site.type = panda_config.def_type
    site.status = panda_config.def_status
    site.setokens_input = {}
    site.setokens_output = {}
    site.ddm_endpoints_input = {}
    site.ddm_endpoints_output = {}
    return site


# Site specification handed out by SiteMapper.getSite() for unknown site names
DEFAULT_SITE = _build_default_site()


# Name of the cloud that collects every production site
WORLD_CLOUD = "WORLD"
# Prefix of site names that refer to a nucleus/satellite rather than to a site
NUCLEUS_TAG = "nucleus:"
0029
0030
class SiteMapper:
    """In-memory view of sites, clouds, nuclei and satellites loaded through a task buffer.

    Attributes:
        siteSpecList: dict mapping a site name to its site specification. It also holds
            the child specifications generated for sites whose capability is "ucore".
        cloudSpec: dict mapping a cloud name (WORLD excluded) to ``{"sites": [names]}``.
        worldCloudSpec: ``{"sites": [names]}`` listing every production site.
        nuclei: dict mapping ``pandasite`` to the NucleusSpec of a nucleus.
        satellites: dict mapping ``pandasite`` to the NucleusSpec of a satellite.
        endpoint_to_sites_map: ``{"input": {...}, "output": {...}}`` where each inner dict
            maps an endpoint name to the list of site names that reference it.
        endpoint_detailed_status_summary: second value returned by
            ``taskBuffer.getSiteInfo()``; it is read as ``summary[activity][status]``.
    """

    def __init__(self, taskBuffer, verbose=False):
        """Load site and cloud information and build all lookup tables.

        Any exception raised while loading is logged and swallowed, so the constructor
        itself does not propagate loading errors.

        Args:
            taskBuffer: object providing ``load_resource_types()``, ``getSiteInfo()`` and
                ``get_cloud_list()``.
            verbose (bool): when true, dump the site and cloud tables to the debug log.
        """
        _logger.debug("__init__ SiteMapper")

        # Every attribute is created before anything can fail, so that the query
        # methods keep working on a partially loaded object.
        self.siteSpecList = {}
        self.cloudSpec = {}
        # WORLD always owns a "sites" list, even when the cloud list does not mention it
        self.worldCloudSpec = {"sites": []}
        self.nuclei = {}
        self.satellites = {}
        self.endpoint_to_sites_map = {"input": {}, "output": {}}
        self.endpoint_detailed_status_summary = {}

        try:
            # get resource types
            resource_types = taskBuffer.load_resource_types()

            # read the site specifications and the endpoint status summary
            site_spec_dictionary, self.endpoint_detailed_status_summary = taskBuffer.getSiteInfo()

            # create one entry per cloud; WORLD is kept in its own attribute
            for cloud_name in taskBuffer.get_cloud_list():
                if cloud_name != WORLD_CLOUD:
                    self.cloudSpec[cloud_name] = {"sites": []}

            # register the sites
            for site_name, site_spec in site_spec_dictionary.items():
                self._register_site(site_name, site_spec)

            # the endpoint map is built before child sites are added, so it only
            # contains the sites read from the task buffer
            self.make_endpoint_to_sites_map()

            # make child sites for unified sites
            self._expand_unified_sites(resource_types)

            # collect nuclei and satellites
            for site_spec in self.siteSpecList.values():
                self.collect_nuclei_and_satellites(site_spec)

            # fill the per-cloud and WORLD site lists
            self._group_sites_by_cloud()

            # dump the tables in verbose mode
            if verbose:
                self.dump_site_information()
                self.dump_cloud_information()

        except Exception:
            error_type, error_value, _ = sys.exc_info()
            _logger.error(f"__init__ SiteMapper : {error_type} {error_value}")
            _logger.error(traceback.format_exc())

        _logger.debug("__init__ SiteMapper done")

    def _register_site(self, site_name, site_spec):
        """Add one site specification to siteSpecList or merge it into an existing entry."""
        # the site information could not be read
        if not site_spec:
            _logger.error(f"Could not read site info for {site_name}")
            return

        inactive_states = ["offline", ""]
        already_active = site_name in self.siteSpecList and self.siteSpecList[site_name].status not in inactive_states

        if not already_active:
            # drop a previously registered offline/status-less entry, if any
            self.siteSpecList.pop(site_name, None)

            # a production site without a cloud is not registered
            if site_spec.runs_production() and not site_spec.cloud:
                _logger.error(f"Empty cloud for {site_name}")
            else:
                self.siteSpecList[site_name] = site_spec
            return

        # The site is already registered with an active status
        # NOTE(review): only reachable when the same name is yielded more than once
        registered = self.siteSpecList[site_name]

        # overwrite the status unless the registered one is already online
        if site_spec.status not in inactive_states:
            if registered.status != "online":
                registered.status = site_spec.status

        # take maxinputsize and memory from an online duplicate when they are larger or zero
        try:
            if site_spec.status in ["online"]:
                if registered.maxinputsize < site_spec.maxinputsize or site_spec.maxinputsize == 0:
                    registered.maxinputsize = site_spec.maxinputsize
                if (registered.memory != 0 and registered.memory < site_spec.memory) or site_spec.memory == 0:
                    registered.memory = site_spec.memory
        except Exception:
            error_type, error_value = sys.exc_info()[:2]
            _logger.error(f"{site_name} memory/inputsize failure : {error_type} {error_value}")

    def _expand_unified_sites(self, resource_types):
        """Register one child site per resource type for every site whose capability is "ucore"."""
        try:
            # iterate over a snapshot of the names since children are inserted on the fly
            for site_name in list(self.siteSpecList):
                site_spec = self.siteSpecList[site_name]
                if site_spec.capability != "ucore":
                    continue

                # add child sites
                for resource_spec in resource_types:
                    child_site_spec = self.get_child_site_spec(site_spec, resource_spec)
                    if child_site_spec:
                        self.siteSpecList[child_site_spec.sitename] = child_site_spec

                # set unified flag
                site_spec.is_unified = True
        except Exception:
            _logger.error(traceback.format_exc())

    def _group_sites_by_cloud(self):
        """Append every production site to the list of its cloud and to the WORLD list."""
        world_sites = self.worldCloudSpec["sites"]
        for site_spec in self.siteSpecList.values():
            # only production sites are grouped
            if not site_spec.runs_production():
                continue

            # add to the cloud of the site when that cloud is known
            if site_spec.cloud and site_spec.cloud in self.cloudSpec:
                cloud_sites = self.cloudSpec[site_spec.cloud]["sites"]
                if site_spec.sitename not in cloud_sites:
                    cloud_sites.append(site_spec.sitename)

            # add to WORLD cloud
            if site_spec.sitename not in world_sites:
                world_sites.append(site_spec.sitename)

    def get_child_site_spec(self, site_spec, resource_spec):
        """Derive the child site specification of a unified site for one resource type.

        Args:
            site_spec: specification of the unified site; it is shallow-copied, not modified.
            resource_spec: resource type providing ``resource_name``, ``mincore``,
                ``maxcore``, ``minrampercore`` and ``maxrampercore`` (each limit may be None).

        Returns:
            The child specification named ``<site name>/<resource name>``, or None when
            the site has fewer cores than the resource type requires.
        """
        core_count = max(1, site_spec.coreCount)
        min_core = resource_spec.mincore
        max_core = resource_spec.maxcore

        # make sure the site has enough cores for the resource type
        if min_core is not None and core_count < min_core:
            return None

        # shallow copy: attributes that are not reassigned below are shared with the parent
        child_site_spec = copy.copy(site_spec)
        child_site_spec.sitename = f"{site_spec.sitename}/{resource_spec.resource_name}"

        # clamp the core count to the limits that are defined; a missing limit does not constrain
        child_cores = core_count
        if max_core is not None:
            child_cores = min(child_cores, max_core)
        if min_core is not None:
            child_cores = max(child_cores, min_core)
        child_site_spec.coreCount = child_cores

        # lower memory limit
        scaled_minrss = site_spec.minrss * child_cores / core_count
        if resource_spec.minrampercore is not None:
            # NOTE(review): the "- cores + 1" offset is kept as in the original; its rationale is not visible here
            child_site_spec.minrss = max(
                child_cores * resource_spec.minrampercore - child_cores + 1,
                scaled_minrss,
            )
        else:
            child_site_spec.minrss = max(scaled_minrss, site_spec.minrss)

        # upper memory limit
        scaled_maxrss = site_spec.maxrss * child_cores / core_count
        if resource_spec.maxrampercore is not None:
            child_site_spec.maxrss = min(child_cores * resource_spec.maxrampercore, scaled_maxrss)
        else:
            child_site_spec.maxrss = min(scaled_maxrss, site_spec.maxrss)

        # remember the parent
        child_site_spec.unified_name = site_spec.sitename

        return child_site_spec

    def collect_nuclei_and_satellites(self, ret):
        """Register a production site under the nucleus or satellite of its ``pandasite``.

        Sites that do not run production, or whose role is neither "nucleus" nor
        "satellite", are ignored.

        Args:
            ret: site specification to register.
        """
        # only production sites
        if not ret.runs_production():
            return

        # pick the table that matches the role
        if ret.role == "nucleus":
            target = self.nuclei
        elif ret.role == "satellite":
            target = self.satellites
        else:
            return

        # first site of this pandasite: create the entry
        if ret.pandasite not in target:
            atom = NucleusSpec(ret.pandasite)
            atom.state = ret.pandasite_state

            # set bare nucleus mode
            mode = ret.bare_nucleus_mode()
            if mode:
                atom.set_bare_nucleus_mode(mode)

            # set secondary nucleus
            secondary = ret.secondary_nucleus()
            if secondary:
                atom.set_secondary_nucleus(secondary)

            # flag satellites
            if ret.role == "satellite":
                atom.set_satellite()

            # best effort: the default output endpoint is skipped when it cannot be determined
            try:
                atom.set_default_endpoint_out(ret.ddm_endpoints_output["default"].getDefaultWrite())
            except Exception:
                pass

            target[ret.pandasite] = atom

        # attach the site and its endpoints to the entry
        target[ret.pandasite].add(ret.sitename, ret.ddm_endpoints_output, ret.ddm_endpoints_input)

    def clean_site_name(self, site_name):
        """Translate a ``nucleus:<name>`` reference into one site name of that nucleus or satellite.

        Args:
            site_name: site name, possibly prefixed with the nucleus tag.

        Returns:
            The translated name when the reference matches a known nucleus or satellite,
            otherwise ``site_name`` unchanged (also when the translation fails, e.g. for None).
        """
        try:
            if site_name.startswith(NUCLEUS_TAG):
                nucleus_name = site_name.split(":")[-1]
                if nucleus_name in self.nuclei:
                    site_name = self.nuclei[nucleus_name].getOnePandaSite()
                elif nucleus_name in self.satellites:
                    site_name = self.satellites[nucleus_name].getOnePandaSite()
        except Exception:
            # deliberate best effort: any failure leaves the name as it was given
            pass

        return site_name

    def getSite(self, site_name):
        """Return the specification of a site, or the shared DEFAULT_SITE when it is unknown.

        Args:
            site_name: site name; ``nucleus:<name>`` references are translated first.
        """
        site_name = self.clean_site_name(site_name)
        return self.siteSpecList.get(site_name, DEFAULT_SITE)

    def checkSite(self, site_name):
        """Return True when the site (after nucleus translation) is registered."""
        return self.clean_site_name(site_name) in self.siteSpecList

    def resolveNucleus(self, site_name):
        """Translate a nucleus reference and map the literal "NULL" to None."""
        site_name = self.clean_site_name(site_name)
        return None if site_name == "NULL" else site_name

    def getCloud(self, cloud):
        """Return the ``{"sites": [...]}`` entry of a cloud; WORLD and unknown clouds get the WORLD entry."""
        return self.cloudSpec.get(cloud, self.worldCloudSpec)

    def checkCloud(self, cloud):
        """Return True when the cloud is registered or is the WORLD cloud."""
        return cloud in self.cloudSpec or cloud == WORLD_CLOUD

    def getCloudList(self):
        """Return the names of the registered clouds (WORLD is not included)."""
        return list(self.cloudSpec)

    def getDdmEndpoint(self, site_name, storage_token, prod_source_label, job_label):
        """Return the output DDM endpoint of a site for a storage token.

        Args:
            site_name: site name; ``nucleus:<name>`` references are accepted.
            storage_token: token looked up among the output tokens of the selected scope.
            prod_source_label: passed to ``select_scope``.
            job_label: passed to ``select_scope``.

        Returns:
            None for an unknown site; the endpoint mapped to the token when the token is
            defined for the output scope; otherwise the ``ddm_output`` entry of that scope.
        """
        # skip unknown sites
        if not self.checkSite(site_name):
            return None

        site_spec = self.getSite(site_name)
        # only the output scope is needed here
        _, scope_output = select_scope(site_spec, prod_source_label, job_label)

        output_tokens = site_spec.setokens_output[scope_output]
        if storage_token in output_tokens:
            return output_tokens[storage_token]

        # fall back to the default output of the scope
        return site_spec.ddm_output[scope_output]

    def getNucleus(self, site_name):
        """Return the nucleus or satellite entry registered under ``site_name``, or None.

        Nuclei take precedence over satellites.
        """
        if site_name in self.nuclei:
            return self.nuclei[site_name]
        return self.satellites.get(site_name)

    def dump_site_information(self):
        """Write every registered site specification to the debug log."""
        _logger.debug("========= site dump =========")
        for site_spec in self.siteSpecList.values():
            _logger.debug(f"Site->{str(site_spec)}")

    def dump_cloud_information(self):
        """Write the site list of every cloud, including WORLD, to the debug log."""
        for cloud_name, cloud_spec in self.cloudSpec.items():
            # collect the sites that are unknown or offline
            sites_with_issues = []
            sites_offline = []
            for site_name in cloud_spec["sites"]:
                if site_name not in self.siteSpecList:
                    # record the name itself rather than a one-element set
                    sites_with_issues.append(site_name)
                    continue

                if self.siteSpecList[site_name].status == "offline":
                    sites_offline.append(f"{site_name}")

            _logger.debug(f"========= {cloud_name} cloud dump =========")
            _logger.debug(f"Cloud:{cloud_name} has {cloud_spec['sites']}")
            if sites_offline:
                _logger.debug(f"Cloud:{cloud_name} has sites offline:{sites_offline}")
            if sites_with_issues:
                _logger.debug(f"Cloud:{cloud_name} has sites that don't exist:{sites_with_issues}")

        # dump the WORLD cloud
        _logger.debug("========= WORLD cloud dump =========")
        _logger.debug(f"Cloud:WORLD has {self.worldCloudSpec['sites']}")

    def _is_endpoint_readable(self, activity, endpoint_name):
        """Return False when the endpoint is listed under OFF or TEST for the given activity."""
        endpoints_by_status = self.endpoint_detailed_status_summary.get(activity, {})
        return all(endpoint_name not in endpoints_by_status.get(status, []) for status in ("OFF", "TEST"))

    def is_readable_remotely(self, endpoint_name: str) -> bool:
        """Check if the given endpoint is readable remotely over WAN
        Args:
            endpoint_name (str): Name of the endpoint to check
        Returns:
            bool: False if the endpoint is listed as OFF or TEST for "read_wan", True otherwise
        """
        return self._is_endpoint_readable("read_wan", endpoint_name)

    def is_readable_locally(self, endpoint_name: str) -> bool:
        """Check if the given endpoint is readable locally over LAN
        Args:
            endpoint_name (str): Name of the endpoint to check
        Returns:
            bool: False if the endpoint is listed as OFF or TEST for "read_lan", True otherwise
        """
        return self._is_endpoint_readable("read_lan", endpoint_name)

    def _map_site_endpoints(self, direction, site_name, endpoint_specs):
        """Record ``site_name`` under every endpoint found in ``endpoint_specs`` for one direction."""
        sites_by_endpoint = self.endpoint_to_sites_map[direction]
        for endpoint_spec in endpoint_specs.values():
            for endpoint in endpoint_spec.all.keys():
                site_names = sites_by_endpoint.setdefault(endpoint, [])
                # the same endpoint can show up in several entries of one site
                if site_name not in site_names:
                    site_names.append(site_name)

    def make_endpoint_to_sites_map(self) -> None:
        """Create a mapping from endpoints to sites

        Existing entries of ``endpoint_to_sites_map`` are kept; sites are only added.
        """
        for site_name, site_spec in self.siteSpecList.items():
            self._map_site_endpoints("input", site_name, site_spec.ddm_endpoints_input)
            self._map_site_endpoints("output", site_name, site_spec.ddm_endpoints_output)

    def get_sites_for_endpoint(self, endpoint_name: str, direction: str) -> list:
        """Get the list of sites associated with a given endpoint
        Args:
            endpoint_name (str): Name of the endpoint
            direction (str): 'input' or 'output' to specify the direction of data flow
        Returns:
            list: Site names associated with the endpoint; empty for an unknown endpoint or direction
        """
        return self.endpoint_to_sites_map.get(direction, {}).get(endpoint_name, [])