File indexing completed on 2026-04-10 08:39:06
0001 """
0002 site specification
0003
0004 """
0005
0006 import re
0007 from typing import Any
0008
# Whitelist of keys that may be looked up in a site's catchall string or in
# its extra queue parameters. Each key maps to itself, so the lookup helpers
# can both validate a requested key (membership test) and resolve the name
# to search for (item access).
catchall_keys = {
    name: name
    for name in (
        "useJumboJobs",
        "gpu",
        "grandly_unified",
        "nSimEvents",
        "minEventsForJumbo",
        "maxDiskPerCore",
        "use_only_local_data",
        "disableReassign",
        "jobChunkSize",
        "bareNucleus",
        "secondaryNucleus",
        "allowed_processing",
        "excluded_processing",
        "per_core_attr",
        "allow_no_pilot",
    )
}
0029
0030
0031 class SiteSpec(object):
0032
0033 _attributes = (
0034 "sitename",
0035 "nickname",
0036 "dq2url",
0037 "cloud",
0038 "ddm",
0039 "ddm_input",
0040 "ddm_output",
0041 "type",
0042 "releases",
0043 "memory",
0044 "maxtime",
0045 "status",
0046 "setokens_input",
0047 "setokens_output",
0048 "defaulttoken",
0049 "validatedreleases",
0050 "maxinputsize",
0051 "comment",
0052 "statusmodtime",
0053 "pledgedCPU",
0054 "coreCount",
0055 "reliabilityLevel",
0056 "iscvmfs",
0057 "transferringlimit",
0058 "maxwdir",
0059 "fairsharePolicy",
0060 "mintime",
0061 "allowfax",
0062 "pandasite",
0063 "corepower",
0064 "wnconnectivity",
0065 "catchall",
0066 "role",
0067 "pandasite_state",
0068 "ddm_endpoints_input",
0069 "ddm_endpoints_output",
0070 "maxrss",
0071 "minrss",
0072 "direct_access_lan",
0073 "direct_access_wan",
0074 "tier",
0075 "objectstores",
0076 "is_unified",
0077 "unified_name",
0078 "jobseed",
0079 "capability",
0080 "num_slots_map",
0081 "workflow",
0082 "maxDiskio",
0083 "extra_queue_params",
0084 )
0085
0086
0087 def __init__(self):
0088
0089 for attr in self._attributes:
0090 setattr(self, attr, None)
0091
0092
0093 def __str__(self):
0094 str = ""
0095 for attr in self._attributes:
0096 str += f"{attr}:{getattr(self, attr)} "
0097 return str
0098
0099
0100 def isDirectIO(self):
0101 if self.direct_access_lan is True:
0102 return True
0103 return False
0104
0105
0106 def getJobSeed(self):
0107 tmpVal = self.jobseed
0108 if tmpVal is None:
0109 return "std"
0110 return tmpVal
0111
0112
0113 def getValueFromCatchall(self, key):
0114
0115 if key not in catchall_keys:
0116 return None
0117 key = catchall_keys[key]
0118
0119 has_value, value = self.get_extra_queue_param(key)
0120 if has_value:
0121 return value
0122
0123 if self.catchall is None:
0124 return None
0125 for tmpItem in self.catchall.split(","):
0126 tmpMatch = re.search(f"^{key}=(.+)", tmpItem)
0127 if tmpMatch is not None:
0128 return tmpMatch.group(1)
0129 return None
0130
0131
0132 def hasValueInCatchall(self, key):
0133
0134 if key not in catchall_keys:
0135 return False
0136 key = catchall_keys[key]
0137
0138 has_value, _ = self.get_extra_queue_param(key)
0139 if has_value:
0140 return True
0141
0142 if self.catchall is None:
0143 return False
0144 for tmpItem in self.catchall.split(","):
0145 tmpMatch = re.search(f"^{key}(=|)*", tmpItem)
0146 if tmpMatch is not None:
0147 return True
0148 return False
0149
0150
0151 def get_extra_queue_param(self, name: str) -> tuple[bool, None | Any]:
0152 """
0153 Get an extra queue parameter by name.
0154 Arguments:
0155 name: The name of the extra queue parameter to retrieve.
0156 Returns:
0157 A tuple containing a boolean indicating whether the parameter exists and its value (or None if it does not exist).
0158 """
0159 if not self.extra_queue_params or name not in self.extra_queue_params:
0160 return False, None
0161 return True, self.extra_queue_params[name]
0162
0163
0164 def allowWanInputAccess(self):
0165 return self.direct_access_lan is True and self.direct_access_wan is True
0166
0167
0168 def useJumboJobs(self):
0169 return self.hasValueInCatchall("useJumboJobs")
0170
0171
0172 def isGPU(self):
0173 return self.hasValueInCatchall("gpu")
0174
0175 def is_grandly_unified(self):
0176 if self.hasValueInCatchall("grandly_unified") or self.type == "unified":
0177 return True
0178 return False
0179
0180 def runs_production(self):
0181 if self.type == "production" or self.is_grandly_unified():
0182 return True
0183 return False
0184
0185 def runs_analysis(self):
0186 if self.type == "analysis" or self.is_grandly_unified():
0187 return True
0188 return False
0189
0190
0191 def get_unified_name(self):
0192 if self.unified_name is None:
0193 return self.sitename
0194 return self.unified_name
0195
0196
0197 def get_n_sim_events(self):
0198 tmpVal = self.getValueFromCatchall("nSimEvents")
0199 if tmpVal is None:
0200 return None
0201 return int(tmpVal)
0202
0203
0204 def getMinEventsForJumbo(self):
0205 tmpVal = self.getValueFromCatchall("minEventsForJumbo")
0206 if tmpVal is None:
0207 return None
0208 return int(tmpVal)
0209
0210
0211 def is_opportunistic(self):
0212 return self.pledgedCPU == -1
0213
0214
0215 def getNumStandby(self, sw_id, resource_type):
0216 numMap = self.num_slots_map
0217
0218 if sw_id not in numMap:
0219 if None in numMap:
0220 sw_id = None
0221 else:
0222 return None
0223
0224 if resource_type is None:
0225 return sum(numMap[sw_id].values())
0226
0227 if resource_type in numMap[sw_id]:
0228 return numMap[sw_id][resource_type]
0229 elif None in numMap[sw_id]:
0230 return numMap[sw_id][None]
0231 return None
0232
0233
0234 def get_max_disk_per_core(self):
0235 tmpVal = self.getValueFromCatchall("maxDiskPerCore")
0236 try:
0237 return int(tmpVal)
0238 except Exception:
0239 pass
0240 return None
0241
0242
0243 def use_only_local_data(self):
0244 return self.hasValueInCatchall("use_only_local_data")
0245
0246
0247 def use_vp(self, scope):
0248
0249 if scope not in self.ddm_endpoints_input:
0250 scope = "default"
0251
0252 if scope in self.ddm_endpoints_input and [i for i in self.ddm_endpoints_input[scope].getAllEndPoints() if i.endswith("_VP_DISK")]:
0253 return True
0254 return False
0255
0256
0257 def always_use_direct_io(self):
0258 return self.maxinputsize == -1
0259
0260
0261 def disable_reassign(self):
0262 if self.hasValueInCatchall("disableReassign"):
0263 return True
0264 self.status == "paused"
0265
0266
0267 def get_job_chunk_size(self):
0268 try:
0269 return int(self.getValueFromCatchall("jobChunkSize"))
0270 except Exception:
0271 return None
0272
0273
0274 def get_wn_connectivity(self):
0275 if self.wnconnectivity is None:
0276 return None
0277 items = self.wnconnectivity.split("#")
0278 if not items or not items[0]:
0279 return None
0280 else:
0281 return items[0]
0282
0283
0284 def get_ipstack(self):
0285 if self.wnconnectivity is None:
0286 return None
0287 items = self.wnconnectivity.split("#")
0288 if len(items) == 2 and items[-1]:
0289 return items[-1]
0290 else:
0291 return None
0292
0293
0294 def bare_nucleus_mode(self):
0295 mode = self.getValueFromCatchall("bareNucleus")
0296 if mode in ["only", "allow"]:
0297 return mode
0298 return None
0299
0300
0301 def secondary_nucleus(self):
0302 n = self.getValueFromCatchall("secondaryNucleus")
0303 if n:
0304 return n
0305 return None
0306
0307
0308 def get_allowed_processing_types(self):
0309 """
0310 Get allowed processing types for processing type-based job brokerage to access only tasks with specific processing types.
0311 They are defined in the catchall field as a pipe-separated list with the key "allowed_processing".
0312 """
0313 n = self.getValueFromCatchall("allowed_processing")
0314 if n:
0315 return n.split("|")
0316 return None
0317
0318
0319 def get_excluded_processing_types(self):
0320 """
0321 Get excluded processing types for processing type-based job brokerage to exclude tasks with specific processing types.
0322 They are defined in the catchall field as a pipe-separated list with the key "excluded_processing".
0323 """
0324 n = self.getValueFromCatchall("excluded_processing")
0325 if n:
0326 return n.split("|")
0327 return None
0328
0329
0330 def use_per_core_attr(self):
0331 return self.hasValueInCatchall("per_core_attr")