File indexing completed on 2026-04-20 07:58:56
0001
0002
0003 import json
0004 import os
0005 import random
0006 import re
0007 import time
0008
0009 from kubernetes import client, config, watch
0010
# Authenticate against the cluster with the kubeconfig file named by the
# KUBECONFIG environment variable (None is passed when it is unset).
config.load_kube_config(config_file=os.getenv("KUBECONFIG"))

# Shared CoreV1 API client used for the pod, node and binding calls below.
corev1 = client.CoreV1Api()

# main() only handles pods whose spec.scheduler_name equals this value.
scheduler_name = "atlas_scheduler"
0014
0015
# Byte multipliers for the suffixes Kubernetes accepts on a memory quantity:
# "m" (milli), decimal SI (k, M, G, T, P, E) and binary (Ki, Mi, Gi, ...).
_MEMORY_SUFFIX_BYTES = {
    "": 1,
    "m": 10**-3,
    "k": 10**3,
    "M": 10**6,
    "G": 10**9,
    "T": 10**12,
    "P": 10**15,
    "E": 10**18,
    "Ki": 2**10,
    "Mi": 2**20,
    "Gi": 2**30,
    "Ti": 2**40,
    "Pi": 2**50,
    "Ei": 2**60,
}

# <number><suffix> or <number>e<exponent>; the suffix alternative is tried
# first, so "1E" is one exa-byte while "1E3" is scientific notation.
_MEMORY_QUANTITY_RE = re.compile(
    r"^([+-]?(?:[0-9]+\.?[0-9]*|\.[0-9]+))"
    r"(?:(Ki|Mi|Gi|Ti|Pi|Ei|[mkMGTPE])|[eE]([+-]?[0-9]+))?$"
)


def _cpu_quantity_to_millicores(cpu_str):
    """Convert a CPU quantity such as "4", "0.5" or "3920m" to millicores."""
    text = str(cpu_str).strip()
    if text.endswith("m"):
        # Already expressed in millicores.
        return int(float(text[:-1]))
    # round() instead of int(): float error must not turn 1.005 into 1004.
    return round(float(text) * 1000)


def _memory_quantity_to_kib(memory_str):
    """Convert a memory quantity to whole KiB, or None if it cannot be parsed."""
    parsed = _MEMORY_QUANTITY_RE.match(str(memory_str).strip())
    if parsed is None:
        return None
    number, suffix, exponent = parsed.groups()
    if exponent is not None:
        size_bytes = float(number) * 10 ** int(exponent)
    else:
        size_bytes = float(number) * _MEMORY_SUFFIX_BYTES[suffix or ""]
    return int(size_bytes // 1024)


def node_allocatable_map(node_status_allocatable):
    """Summarise a node's allocatable CPU and memory as integers.

    Args:
        node_status_allocatable: mapping with a "cpu" quantity string and,
            optionally, a "memory" quantity string (e.g. "3920m", "16384Ki").

    Returns:
        ``{"mCpu": <millicores>, "memoryKB": <KiB>}``. ``memoryKB`` is 0, and
        a message is printed, when the memory value is missing or unparseable.

    Raises:
        KeyError: if "cpu" is missing.
        ValueError: if the CPU value is neither a plain number nor an "m"
            (milli) quantity.
    """
    millicores = _cpu_quantity_to_millicores(node_status_allocatable["cpu"])
    memory_kib = _memory_quantity_to_kib(node_status_allocatable.get("memory", ""))
    if memory_kib is None:
        print("No memory allocatable in node")
        memory_kib = 0
    return {"mCpu": millicores, "memoryKB": memory_kib}
0031
0032
# Same pattern the parser has always used: a number followed by an optional
# suffix. Only "" (cores) and "m" (millicores) are accepted as CPU suffixes.
_CPU_REQUEST_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")

# Millicores charged for a container whose CPU need cannot be determined.
_UNKNOWN_CPU_MILLICORES = 999


def _container_cpu_quantity(container):
    """Return the container's CPU request, else its CPU limit, else ""."""
    resources = getattr(container, "resources", None)
    for field in ("requests", "limits"):
        # The attribute can exist but be None, and the mapping may carry no
        # "cpu" entry at all (e.g. a memory-only request); treat both as
        # "not specified" and fall through to the next source.
        values = getattr(resources, field, None) or {}
        quantity = values.get("cpu")
        if quantity:
            return str(quantity)
    return ""


def get_mcpu(containers):
    """Return the total CPU, in millicores, needed by *containers*.

    For each container the CPU request is used, falling back to the CPU
    limit. A container with no usable value, or with a suffix other than
    "m", is charged 999 millicores and a message is printed.

    Args:
        containers: iterable of objects exposing ``resources.requests`` and
            ``resources.limits`` mappings (either may be None or absent).

    Returns:
        The summed millicores as an int; 0 for an empty or None input.
    """
    mcpu_req = 0
    for container in containers or ():
        parsed = _CPU_REQUEST_RE.match(_container_cpu_quantity(container))
        if parsed is None:
            print("No cpu reources requests or limits specified")
            mcpu_req += _UNKNOWN_CPU_MILLICORES
            continue
        number, suffix = parsed.groups()
        if suffix == "":
            # Whole or fractional cores; round() avoids float truncation
            # such as 1.005 * 1000 == 1004.9999999999999.
            mcpu_req += round(float(number) * 1000)
        elif suffix == "m":
            mcpu_req += int(float(number))
        else:
            print("Invalid cpu reources requests or limits specified")
            mcpu_req += _UNKNOWN_CPU_MILLICORES
    return mcpu_req
0054
0055
def get_allocated_resources(namespace="default"):
    """Sum the CPU already claimed on each node by pods in *namespace*.

    Pods whose phase is Succeeded or Failed are excluded by the field
    selector, and pods without an assigned node are skipped.

    Args:
        namespace: namespace whose pods are inspected.

    Returns:
        A dict mapping node name to ``{"mCpu": <millicores>}``; nodes with
        no counted pods do not appear.
    """
    active_pods = corev1.list_namespaced_pod(
        namespace=namespace,
        field_selector="status.phase!=Succeeded,status.phase!=Failed",
    )
    usage_by_node = {}
    for pod in active_pods.items:
        assigned_node = getattr(pod.spec, "node_name", None)
        if assigned_node is None:
            # Not bound to a node yet, so it consumes nothing anywhere.
            continue
        node_usage = usage_by_node.setdefault(assigned_node, {"mCpu": 0})
        node_usage["mCpu"] += get_mcpu(pod.spec.containers)
    return usage_by_node
0072
0073
def nodes_available():
    """List Ready nodes with their unclaimed CPU, least free CPU first.

    Free CPU is the node's allocatable millicores minus the millicores that
    get_allocated_resources() attributes to it, so the value can be negative.

    Returns:
        A list of ``{"name": <node name>, "mCpu": <free millicores>}`` dicts
        sorted ascending by ``"mCpu"``.
    """
    allocated_resources_map = get_allocated_resources()
    ready_nodes = []
    for node in corev1.list_node().items:
        node_name = node.metadata.name
        # conditions is optional on the node status and may be None;
        # treat that as "no conditions" instead of raising TypeError.
        for condition in node.status.conditions or []:
            if condition.type == "Ready" and condition.status == "True":
                allocatable = node_allocatable_map(node.status.allocatable)
                used_mcpu = allocated_resources_map.get(node_name, {"mCpu": 0})["mCpu"]
                ready_nodes.append({"name": node_name, "mCpu": allocatable["mCpu"] - used_mcpu})
    ready_nodes.sort(key=lambda entry: entry["mCpu"])
    return ready_nodes
0086
0087
def scheduler(name, node, namespace="default"):
    """Bind pod *name* to *node* by creating a Binding in *namespace*.

    Args:
        name: name of the pod to bind.
        node: name of the target node.
        namespace: namespace the pod lives in.

    Returns:
        Whatever ``corev1.create_namespaced_binding`` returns.

    Raises:
        Exceptions from the API call propagate to the caller; main() handles
        ValueError and client.rest.ApiException.
    """
    # The model's field is api_version (serialised as apiVersion); assigning
    # to an attribute named apiVersion is not part of the model and is not
    # sent. Nodes belong to the core API group, whose version string is "v1".
    target = client.V1ObjectReference(kind="Node", api_version="v1", name=node)
    print("target", target)
    meta = client.V1ObjectMeta(name=name)
    body = client.V1Binding(metadata=meta, target=target)
    return corev1.create_namespaced_binding(namespace=namespace, body=body)
0098
0099
def _api_error_message(exc):
    """Return the "message" field of an API error body, else str(exc)."""
    try:
        return json.loads(exc.body)["message"]
    except (TypeError, ValueError, KeyError):
        # Body missing, not JSON, or JSON without a "message" field.
        return str(exc)


def main():
    """Watch the "default" namespace and bind pending pods to nodes, forever.

    A pod is handled when it is Pending, has no node assigned and names this
    scheduler in ``spec.scheduler_name``. Candidate nodes come from
    nodes_available() (least free CPU first); the pod is bound to the first
    node whose free CPU covers the pod's request, and at most one binding
    attempt is made per event. A pod that fits nowhere is left pending.
    """
    w = watch.Watch()
    while True:
        for event in w.stream(corev1.list_namespaced_pod, "default", timeout_seconds=30):
            pod = event["object"]
            wanted = (
                pod.status.phase == "Pending"
                and not pod.spec.node_name
                and pod.spec.scheduler_name == scheduler_name
            )
            if not wanted:
                continue

            # The pod's request does not depend on the node: compute it once.
            pod_mcpu_req = get_mcpu(pod.spec.containers)
            for node_info in nodes_available():
                node_mcpu_free = node_info["mCpu"]
                to_bind = pod_mcpu_req <= node_mcpu_free
                print(f"Node {node_info['name']} has {node_mcpu_free} mcpu ; pod requests {pod_mcpu_req} mcpu ; to_bind: {to_bind}")
                if not to_bind:
                    continue
                try:
                    print("Scheduling " + pod.metadata.name)
                    scheduler(pod.metadata.name, node_info["name"])
                except ValueError as err:
                    # NOTE(review): presumably raised by the client after the
                    # request was sent (response handling) - confirm.
                    print("ValueError (maybe harmless):", err)
                except client.rest.ApiException as err:
                    print(_api_error_message(err))
                # One attempt per event, whether it succeeded or hit one of
                # the handled errors. A plain break (not `finally: break`)
                # lets unexpected exceptions and Ctrl-C propagate instead of
                # being silently discarded.
                break
        # NOTE(review): the file's indentation was lost; this pause is assumed
        # to sit at the while level, i.e. between watch restarts - confirm.
        time.sleep(2**-4)
0122
0123
# Start the scheduling loop only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()