Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 #!/usr/bin/env python
0002 
0003 import json
0004 import os
0005 import random
0006 import re
0007 import time
0008 
0009 from kubernetes import client, config, watch
0010 
0011 config.load_kube_config(config_file=os.environ.get("KUBECONFIG"))
0012 corev1 = client.CoreV1Api()
0013 scheduler_name = "atlas_scheduler"
0014 
0015 
0016 def node_allocatable_map(node_status_allocatable):
0017     cpu_str = node_status_allocatable["cpu"]
0018     memory_str = node_status_allocatable["memory"]
0019     mCpu = int(cpu_str) * 1000
0020     _m = re.match("^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$", memory_str)
0021     if _m is None:
0022         print("No memory allocatable in node")
0023         memoryKB = 0
0024     elif "M" in _m.group(2):
0025         memoryKB = int(float(_m.group(1)) * 2**10)
0026     elif "G" in _m.group(2):
0027         memoryKB = int(float(_m.group(1)) * 2**20)
0028     else:
0029         memoryKB = int(float(_m.group(1)))
0030     return {"mCpu": mCpu, "memoryKB": memoryKB}
0031 
0032 
0033 def get_mcpu(containers):
0034     mcpu_req = 0
0035     for c in containers:
0036         if hasattr(c.resources, "requests"):
0037             mcpu_req_str = c.resources.requests["cpu"]
0038         elif hasattr(c.resources, "limits"):
0039             mcpu_req_str = c.resources.limits["cpu"]
0040         else:
0041             mcpu_req_str = ""
0042         _m = re.match("^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$", mcpu_req_str)
0043         if _m is None:
0044             print("No cpu reources requests or limits specified")
0045             mcpu_req += 999
0046         elif _m.group(2) == "":
0047             mcpu_req += int(float(_m.group(1)) * 1000)
0048         elif _m.group(2) == "m":
0049             mcpu_req += int(float(_m.group(1)))
0050         else:
0051             print("Invalid cpu reources requests or limits specified")
0052             mcpu_req += 999
0053     return mcpu_req
0054 
0055 
0056 def get_allocated_resources(namespace="default"):
0057     node_allocated_resources_map = {}
0058     ret = corev1.list_namespaced_pod(namespace=namespace, field_selector="status.phase!=Succeeded,status.phase!=Failed")
0059     for i in ret.items:
0060         # pod_info = {}
0061         # pod_info['name'] = i.metadata.name
0062         # pod_info['status'] = i.status.phase
0063         # pod_info['status_reason'] = i.status.conditions[0].reason if i.status.conditions else None
0064         # pod_info['status_message'] = i.status.conditions[0].message if i.status.conditions else None
0065         nodeName = getattr(i.spec, "node_name", None)
0066         if nodeName is None:
0067             continue
0068         node_allocated_resources_map.setdefault(nodeName, {})
0069         node_allocated_resources_map[nodeName].setdefault("mCpu", 0)
0070         node_allocated_resources_map[nodeName]["mCpu"] += get_mcpu(i.spec.containers)
0071     return node_allocated_resources_map
0072 
0073 
0074 def nodes_available():
0075     allocated_resources_map = get_allocated_resources()
0076     ready_nodes = []
0077     for node in corev1.list_node().items:
0078         node_name = node.metadata.name
0079         for status in node.status.conditions:
0080             if status.status == "True" and status.type == "Ready":
0081                 node_allocatable_dict = node_allocatable_map(node.status.allocatable)
0082                 mcpu_available = node_allocatable_dict["mCpu"] - allocated_resources_map.get(node_name, {"mCpu": 0})["mCpu"]
0083                 ready_nodes.append({"name": node_name, "mCpu": mcpu_available})
0084     ready_nodes = sorted(ready_nodes, key=(lambda x: x["mCpu"]))
0085     return ready_nodes
0086 
0087 
0088 def scheduler(name, node, namespace="default"):
0089     target = client.V1ObjectReference()
0090     target.kind = "Node"
0091     target.apiVersion = "corev1"
0092     target.name = node
0093     print("target", target)
0094     meta = client.V1ObjectMeta()
0095     meta.name = name
0096     body = client.V1Binding(metadata=meta, target=target)
0097     return corev1.create_namespaced_binding(namespace=namespace, body=body)
0098 
0099 
0100 def main():
0101     w = watch.Watch()
0102     while True:
0103         for event in w.stream(corev1.list_namespaced_pod, "default", timeout_seconds=30):
0104             pod = event["object"]
0105             if pod.status.phase == "Pending" and not pod.spec.node_name and pod.spec.scheduler_name == scheduler_name:
0106                 for node_info in nodes_available():
0107                     pod_mcpu_req = get_mcpu(pod.spec.containers)
0108                     node_mcpu_free = node_info["mCpu"]
0109                     to_bind = pod_mcpu_req <= node_mcpu_free
0110                     print(f"Node {node_info['name']} has {node_mcpu_free} mcpu ; pod requests {pod_mcpu_req} mcpu ; to_bind: {to_bind}")
0111                     if to_bind:
0112                         try:
0113                             print("Scheduling " + pod.metadata.name)
0114                             res = scheduler(pod.metadata.name, node_info["name"])
0115                         except ValueError as e:
0116                             print("ValueError (maybe harmless):", e)
0117                         except client.rest.ApiException as e:
0118                             print(json.loads(e.body)["message"])
0119                         finally:
0120                             break
0121             time.sleep(2**-4)
0122 
0123 
0124 if __name__ == "__main__":
0125     main()