Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 """
0002 site specification
0003 
0004 """
0005 
0006 import re
0007 from typing import Any
0008 
# Keys that may be looked up in a site's catchall field or extra queue parameters.
# Each recognised key maps to the name actually used for the lookup; at present
# every key maps to itself.
catchall_keys = dict(
    (name, name)
    for name in (
        "useJumboJobs",
        "gpu",
        "grandly_unified",
        "nSimEvents",
        "minEventsForJumbo",
        "maxDiskPerCore",
        "use_only_local_data",
        "disableReassign",
        "jobChunkSize",
        "bareNucleus",
        "secondaryNucleus",
        "allowed_processing",
        "excluded_processing",
        "per_core_attr",
        "allow_no_pilot",
    )
)
0029 
0030 
0031 class SiteSpec(object):
0032     # attributes
0033     _attributes = (
0034         "sitename",
0035         "nickname",
0036         "dq2url",
0037         "cloud",
0038         "ddm",
0039         "ddm_input",
0040         "ddm_output",
0041         "type",
0042         "releases",
0043         "memory",
0044         "maxtime",
0045         "status",
0046         "setokens_input",
0047         "setokens_output",
0048         "defaulttoken",
0049         "validatedreleases",
0050         "maxinputsize",
0051         "comment",
0052         "statusmodtime",
0053         "pledgedCPU",
0054         "coreCount",
0055         "reliabilityLevel",
0056         "iscvmfs",
0057         "transferringlimit",
0058         "maxwdir",
0059         "fairsharePolicy",
0060         "mintime",
0061         "allowfax",
0062         "pandasite",
0063         "corepower",
0064         "wnconnectivity",
0065         "catchall",
0066         "role",
0067         "pandasite_state",
0068         "ddm_endpoints_input",
0069         "ddm_endpoints_output",
0070         "maxrss",
0071         "minrss",
0072         "direct_access_lan",
0073         "direct_access_wan",
0074         "tier",
0075         "objectstores",
0076         "is_unified",
0077         "unified_name",
0078         "jobseed",
0079         "capability",
0080         "num_slots_map",
0081         "workflow",
0082         "maxDiskio",
0083         "extra_queue_params",
0084     )
0085 
0086     # constructor
0087     def __init__(self):
0088         # install attributes
0089         for attr in self._attributes:
0090             setattr(self, attr, None)
0091 
0092     # serialize
0093     def __str__(self):
0094         str = ""
0095         for attr in self._attributes:
0096             str += f"{attr}:{getattr(self, attr)} "
0097         return str
0098 
0099     # check if direct IO is used when tasks allow it
0100     def isDirectIO(self):
0101         if self.direct_access_lan is True:
0102             return True
0103         return False
0104 
0105     # check what type of jobs are allowed
0106     def getJobSeed(self):
0107         tmpVal = self.jobseed
0108         if tmpVal is None:
0109             return "std"
0110         return tmpVal
0111 
0112     # get value from catchall
0113     def getValueFromCatchall(self, key):
0114         # check if the key is valid
0115         if key not in catchall_keys:
0116             return None
0117         key = catchall_keys[key]
0118         # first get the value if the key is defined as an extra queue parameter
0119         has_value, value = self.get_extra_queue_param(key)
0120         if has_value:
0121             return value
0122         # next get the value if the key is defined in the catchall field
0123         if self.catchall is None:
0124             return None
0125         for tmpItem in self.catchall.split(","):
0126             tmpMatch = re.search(f"^{key}=(.+)", tmpItem)
0127             if tmpMatch is not None:
0128                 return tmpMatch.group(1)
0129         return None
0130 
0131     # has value in catchall
0132     def hasValueInCatchall(self, key):
0133         # check if the key is valid
0134         if key not in catchall_keys:
0135             return False
0136         key = catchall_keys[key]
0137         # first check if the key is defined as an extra queue parameter
0138         has_value, _ = self.get_extra_queue_param(key)
0139         if has_value:
0140             return True
0141         # next check if the key is defined in the catchall field
0142         if self.catchall is None:
0143             return False
0144         for tmpItem in self.catchall.split(","):
0145             tmpMatch = re.search(f"^{key}(=|)*", tmpItem)
0146             if tmpMatch is not None:
0147                 return True
0148         return False
0149 
0150     # get extra queue paramaeter
0151     def get_extra_queue_param(self, name: str) -> tuple[bool, None | Any]:
0152         """
0153         Get an extra queue parameter by name.
0154         Arguments:
0155             name: The name of the extra queue parameter to retrieve.
0156         Returns:
0157             A tuple containing a boolean indicating whether the parameter exists and its value (or None if it does not exist).
0158         """
0159         if not self.extra_queue_params or name not in self.extra_queue_params:
0160             return False, None
0161         return True, self.extra_queue_params[name]
0162 
0163     # allow WAN input access
0164     def allowWanInputAccess(self):
0165         return self.direct_access_lan is True and self.direct_access_wan is True
0166 
0167     # use jumbo jobs
0168     def useJumboJobs(self):
0169         return self.hasValueInCatchall("useJumboJobs")
0170 
0171     # GPU
0172     def isGPU(self):
0173         return self.hasValueInCatchall("gpu")
0174 
0175     def is_grandly_unified(self):
0176         if self.hasValueInCatchall("grandly_unified") or self.type == "unified":
0177             return True
0178         return False
0179 
0180     def runs_production(self):
0181         if self.type == "production" or self.is_grandly_unified():
0182             return True
0183         return False
0184 
0185     def runs_analysis(self):
0186         if self.type == "analysis" or self.is_grandly_unified():
0187             return True
0188         return False
0189 
0190     # get unified name
0191     def get_unified_name(self):
0192         if self.unified_name is None:
0193             return self.sitename
0194         return self.unified_name
0195 
0196     # get number of simulated events for dynamic number of events
0197     def get_n_sim_events(self):
0198         tmpVal = self.getValueFromCatchall("nSimEvents")
0199         if tmpVal is None:
0200             return None
0201         return int(tmpVal)
0202 
0203     # get minimum of remaining events for jumbo jobs
0204     def getMinEventsForJumbo(self):
0205         tmpVal = self.getValueFromCatchall("minEventsForJumbo")
0206         if tmpVal is None:
0207             return None
0208         return int(tmpVal)
0209 
0210     # check if opportunistic
0211     def is_opportunistic(self):
0212         return self.pledgedCPU == -1
0213 
0214     # get number of jobs for standby
0215     def getNumStandby(self, sw_id, resource_type):
0216         numMap = self.num_slots_map
0217         # neither gshare or workqueue is definied
0218         if sw_id not in numMap:
0219             if None in numMap:
0220                 sw_id = None
0221             else:
0222                 return None
0223         # give the total if resource type is undefined
0224         if resource_type is None:
0225             return sum(numMap[sw_id].values())
0226         # give the number for the resource type
0227         if resource_type in numMap[sw_id]:
0228             return numMap[sw_id][resource_type]
0229         elif None in numMap[sw_id]:
0230             return numMap[sw_id][None]
0231         return None
0232 
0233     # get max disk per core
0234     def get_max_disk_per_core(self):
0235         tmpVal = self.getValueFromCatchall("maxDiskPerCore")
0236         try:
0237             return int(tmpVal)
0238         except Exception:
0239             pass
0240         return None
0241 
0242     # use local data only
0243     def use_only_local_data(self):
0244         return self.hasValueInCatchall("use_only_local_data")
0245 
0246     # check if use VP
0247     def use_vp(self, scope):
0248         # use default scope if missing
0249         if scope not in self.ddm_endpoints_input:
0250             scope = "default"
0251         # check if VP_DISK is associated
0252         if scope in self.ddm_endpoints_input and [i for i in self.ddm_endpoints_input[scope].getAllEndPoints() if i.endswith("_VP_DISK")]:
0253             return True
0254         return False
0255 
0256     # check if always uses direct IO
0257     def always_use_direct_io(self):
0258         return self.maxinputsize == -1
0259 
0260     # disable reassign
0261     def disable_reassign(self):
0262         if self.hasValueInCatchall("disableReassign"):
0263             return True
0264         self.status == "paused"
0265 
0266     # get job chunk size
0267     def get_job_chunk_size(self):
0268         try:
0269             return int(self.getValueFromCatchall("jobChunkSize"))
0270         except Exception:
0271             return None
0272 
0273     # get WN connectivity
0274     def get_wn_connectivity(self):
0275         if self.wnconnectivity is None:
0276             return None
0277         items = self.wnconnectivity.split("#")
0278         if not items or not items[0]:
0279             return None
0280         else:
0281             return items[0]
0282 
0283     # get IP stack
0284     def get_ipstack(self):
0285         if self.wnconnectivity is None:
0286             return None
0287         items = self.wnconnectivity.split("#")
0288         if len(items) == 2 and items[-1]:
0289             return items[-1]
0290         else:
0291             return None
0292 
0293     # get bare nucleus mode
0294     def bare_nucleus_mode(self):
0295         mode = self.getValueFromCatchall("bareNucleus")
0296         if mode in ["only", "allow"]:
0297             return mode
0298         return None
0299 
0300     # get secondary nucleus
0301     def secondary_nucleus(self):
0302         n = self.getValueFromCatchall("secondaryNucleus")
0303         if n:
0304             return n
0305         return None
0306 
0307     # get allowed processing types
0308     def get_allowed_processing_types(self):
0309         """
0310         Get allowed processing types for processing type-based job brokerage to access only tasks with specific processing types.
0311         They are defined in the catchall field as a pipe-separated list with the key "allowed_processing".
0312         """
0313         n = self.getValueFromCatchall("allowed_processing")
0314         if n:
0315             return n.split("|")
0316         return None
0317 
0318     # get excluded process types
0319     def get_excluded_processing_types(self):
0320         """
0321         Get excluded processing types for processing type-based job brokerage to exclude tasks with specific processing types.
0322         They are defined in the catchall field as a pipe-separated list with the key "excluded_processing".
0323         """
0324         n = self.getValueFromCatchall("excluded_processing")
0325         if n:
0326             return n.split("|")
0327         return None
0328 
0329     # use per-core attributes
0330     def use_per_core_attr(self):
0331         return self.hasValueInCatchall("per_core_attr")