Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0002 
0003 
0004 class PandaQueuesDictK8s(PandaQueuesDict):
0005     def get_k8s_scheduler_settings(self, panda_resource):
0006         # this is how the affinity settings are declared in CRIC
0007         key_affinity = "k8s.scheduler.use_score_mcore_affinity"
0008         key_anti_affinity = "k8s.scheduler.use_score_mcore_anti_affinity"
0009 
0010         params = self.get_harvester_params(panda_resource)
0011         ret_map = {}
0012 
0013         try:
0014             ret_map["use_affinity"] = params[key_affinity]
0015         except KeyError:
0016             # return default value
0017             ret_map["use_affinity"] = True
0018 
0019         try:
0020             ret_map["use_anti_affinity"] = params[key_anti_affinity]
0021         except KeyError:
0022             # return default value
0023             ret_map["use_anti_affinity"] = False
0024 
0025         # this is how the affinity settings are declared in CRIC
0026         key_priority_class = "k8s.scheduler.priorityClassName"
0027         key_priority_class_score = "k8s.scheduler.priorityClassName.score"
0028         key_priority_class_score_himem = "k8s.scheduler.priorityClassName.score_himem"
0029         key_priority_class_mcore = "k8s.scheduler.priorityClassName.mcore"
0030         key_priority_class_mcore_himem = "k8s.scheduler.priorityClassName.mcore_himem"
0031 
0032         ret_map["priority_class"] = params.get(key_priority_class, None)
0033         ret_map["priority_class_score"] = params.get(key_priority_class_score, None)
0034         ret_map["priority_class_score_himem"] = params.get(key_priority_class_score_himem, None)
0035         ret_map["priority_class_mcore"] = params.get(key_priority_class_mcore, None)
0036         ret_map["priority_class_mcore_himem"] = params.get(key_priority_class_mcore_himem, None)
0037 
0038         return ret_map
0039 
0040     def get_k8s_resource_settings(self, panda_resource):
0041         params = self.get_harvester_params(panda_resource)
0042         ret_map = {}
0043 
0044         # this is how the CPU parameters are declared in CRIC
0045         key_cpu_scheduling_ratio = "k8s.resources.requests.cpu_scheduling_ratio"
0046         ret_map["cpu_scheduling_ratio"] = params.get(key_cpu_scheduling_ratio, 90)
0047 
0048         # this is how the memory parameters are declared in CRIC
0049         key_memory_limit = "k8s.resources.limits.use_memory_limit"
0050         key_memory_limit_safety_factor = "k8s.resources.limits.memory_limit_safety_factor"
0051         key_memory_limit_min_offset = "k8s.resources.limits.memory_limit_min_offset"
0052         key_memory_scheduling_ratio = "k8s.resources.requests.memory_scheduling_ratio"
0053 
0054         ret_map["use_memory_limit"] = params.get(key_memory_limit, False)
0055         ret_map["memory_limit_safety_factor"] = params.get(key_memory_limit_safety_factor, 100)
0056         ret_map["memory_limit_min_offset"] = params.get(key_memory_limit_min_offset, 0)  # in MiB to be consistent with minRamCount
0057         ret_map["memory_scheduling_ratio"] = params.get(key_memory_scheduling_ratio, 100)
0058 
0059         # this is how the ephemeral storage parameters are declared in CRIC
0060         key_ephemeral_storage = "k8s.resources.use_ephemeral_storage_resource_specs"
0061         key_ephemeral_storage_resources_offset = "k8s.resources.ephemeral_storage_offset"
0062         key_ephemeral_storage_limit_safety_factor = "k8s.resources.limits.ephemeral_storage_limit_safety_factor"
0063 
0064         ret_map["use_ephemeral_storage"] = params.get(key_ephemeral_storage, True)  # use ephemeral storage unless explicitly disabled
0065         ret_map["ephemeral_storage_limit_safety_factor"] = params.get(key_ephemeral_storage_limit_safety_factor, 100)
0066         ret_map["ephemeral_storage_offset"] = params.get(key_ephemeral_storage_resources_offset, 0)  # should come in MiB
0067 
0068         # decide whether to kill on maxtime
0069         use_active_deadline_seconds = "k8s.use_active_deadline_seconds"
0070 
0071         ret_map["use_active_deadline_seconds"] = params.get(use_active_deadline_seconds, True)  # kill on max time
0072 
0073         return ret_map
0074 
0075     def get_k8s_namespace(self, panda_resource):
0076         default_namespace = "default"
0077 
0078         # 1. check if there is an associated CE and use the queue name as namespace
0079         panda_queue_dict = self.get(panda_resource, {})
0080         try:
0081             namespace = panda_queue_dict["queues"][0]["ce_queue_name"]
0082             return namespace
0083         except (KeyError, TypeError, IndexError, ValueError):
0084             pass
0085 
0086         # 2. alternatively, check if namespace defined in the associated parameter section
0087         key_namespace = "k8s.namespace"
0088         params = self.get_harvester_params(panda_resource)
0089 
0090         try:
0091             namespace = params[key_namespace]
0092         except KeyError:
0093             # return default value
0094             namespace = default_namespace
0095 
0096         return namespace
0097 
0098     def get_k8s_host_image(self, panda_resource):
0099         # check if host_image defined in the associated parameter section
0100         key_host_image = "k8s.host_image"
0101         params = self.get_harvester_params(panda_resource)
0102         host_image = params.get(key_host_image, None)
0103 
0104         return host_image
0105 
0106     def get_k8s_pilot_dir(self, panda_resource):
0107         # TODO: check if it can be replaced by an existing PQ field like tmpdir or wntmpdir
0108         # check if pilot_dir_mount defined in the associated parameter section
0109         key_pilot_dir = "k8s.volumes.pilot_dir_mount"
0110         params = self.get_harvester_params(panda_resource)
0111         pilot_dir = params.get(key_pilot_dir, "/pilotdir")
0112 
0113         return pilot_dir
0114 
0115     def get_k8s_annotations(self, panda_resource):
0116         # check if there are annotations to be specified
0117         key_safe_to_evict = "k8s.annotations.safe_to_evict"
0118 
0119         params = self.get_harvester_params(panda_resource)
0120         safe_to_evict = params.get(key_safe_to_evict, None)
0121 
0122         return safe_to_evict