# File indexing completed on 2026-04-20 07:58:59
0001 """
0002 utilities routines associated with Kubernetes python client
0003
0004 """
0005
0006 import base64
0007 import copy
0008 import os
0009 from urllib.parse import urlparse
0010
0011 import yaml
0012 from kubernetes import client, config
0013 from kubernetes.client.rest import ApiException
0014
0015 from pandaharvester.harvesterconfig import harvester_config
0016 from pandaharvester.harvestercore import core_utils
0017 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0018 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0019 from pandaharvester.harvestersubmitter import submitter_common
0020
0021 base_logger = core_utils.setup_logger("k8s_utils")
0022
0023 CONFIG_DIR = "/scratch/jobconfig"
0024 EXEC_DIR = "/scratch/executables"
0025 GiB_TO_GB = 2**30 / 10.0**9
0026
0027
0028 DEF_COMMAND = ["/usr/bin/bash"]
0029 DEF_ARGS = ["-c", "cd; command -v python3 >/dev/null && python3 $EXEC_DIR/pilots_starter.py"]
0030 DEF_IMAGE = "atlasadc/atlas-grid-centos7"
0031
0032
0033 class k8s_Client(object):
0034 def __init__(self, namespace, config_file=None, queue_name=None):
0035 if not os.path.isfile(config_file):
0036 raise RuntimeError(f"Cannot find k8s config file: {config_file}")
0037 config.load_kube_config(config_file=config_file)
0038 self.corev1 = client.CoreV1Api()
0039 self.batchv1 = client.BatchV1Api()
0040 self.deletev1 = client.V1DeleteOptions(propagation_policy="Background")
0041
0042 self.panda_queues_dict = PandaQueuesDictK8s()
0043 self.namespace = namespace
0044 self.queue_name = queue_name
0045
0046 self.rt_mapper = ResourceTypeMapper()
0047
0048 def read_yaml_file(self, yaml_file):
0049 with open(yaml_file) as f:
0050 yaml_content = yaml.load(f, Loader=yaml.FullLoader)
0051
0052 return yaml_content
0053
0054 def create_job_from_yaml(
0055 self,
0056 yaml_content,
0057 work_spec,
0058 prod_source_label,
0059 pilot_type,
0060 pilot_url_str,
0061 pilot_python_option,
0062 pilot_version,
0063 host_image,
0064 cert,
0065 panda_token_path,
0066 panda_token_filename,
0067 panda_token_key_filename,
0068 max_time=None,
0069 ):
0070 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_job_from_yaml")
0071
0072
0073 submit_mode = "PULL"
0074
0075
0076 worker_id = None
0077 if work_spec.mapType != "NoJob":
0078 submit_mode = "PUSH"
0079 worker_id = str(work_spec.workerID)
0080 res = self.create_configmap(work_spec)
0081 if not res:
0082 return res, "Failed to create a configmap"
0083
0084
0085 queue_name = self.panda_queues_dict.get_panda_queue_name(work_spec.computingSite)
0086
0087
0088 worker_name = yaml_content["metadata"]["name"] + "-" + str(work_spec.workerID)
0089 yaml_content["metadata"]["name"] = worker_name
0090
0091
0092 yaml_content["spec"]["template"].setdefault("metadata", {})
0093 yaml_content["spec"]["template"]["metadata"].update(
0094 {"labels": {"resourceType": str(work_spec.resourceType), "prodSourceLabel": str(prod_source_label), "pq": str(work_spec.computingSite)}}
0095 )
0096
0097
0098 safe_to_evict = self.panda_queues_dict.get_k8s_annotations(work_spec.computingSite)
0099 if safe_to_evict is False:
0100 yaml_content["spec"]["template"]["metadata"].update({"annotations": {"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"}})
0101
0102
0103 yaml_containers = yaml_content["spec"]["template"]["spec"]["containers"]
0104 del yaml_containers[1 : len(yaml_containers)]
0105
0106 container_env = yaml_containers[0]
0107
0108 container_env.setdefault("resources", {})
0109
0110 if host_image:
0111 container_env["image"] = host_image
0112 elif "image" not in container_env:
0113 container_env["image"] = DEF_IMAGE
0114
0115 if "command" not in container_env:
0116 container_env["command"] = DEF_COMMAND
0117 container_env["args"] = DEF_ARGS
0118
0119
0120
0121
0122
0123 container_env.setdefault("resources", {})
0124 resource_settings = self.panda_queues_dict.get_k8s_resource_settings(work_spec.computingSite)
0125 pilot_dir = self.panda_queues_dict.get_k8s_pilot_dir(work_spec.computingSite)
0126
0127
0128 cpu_scheduling_ratio = resource_settings["cpu_scheduling_ratio"]
0129 if work_spec.nCore > 0:
0130
0131 container_env["resources"].setdefault("requests", {})
0132 if "cpu" not in container_env["resources"]["requests"]:
0133 container_env["resources"]["requests"]["cpu"] = str(work_spec.nCore * cpu_scheduling_ratio / 100.0)
0134
0135 container_env["resources"].setdefault("limits", {})
0136 if "cpu" not in container_env["resources"]["limits"]:
0137 container_env["resources"]["limits"]["cpu"] = str(work_spec.nCore)
0138
0139
0140 use_memory_limit = resource_settings["use_memory_limit"]
0141 memory_limit_safety_factor = resource_settings["memory_limit_safety_factor"]
0142 memory_limit_min_offset = resource_settings["memory_limit_min_offset"]
0143 memory_scheduling_ratio = resource_settings["memory_scheduling_ratio"]
0144
0145 if work_spec.minRamCount > 4:
0146
0147 container_env["resources"].setdefault("requests", {})
0148 if "memory" not in container_env["resources"]["requests"]:
0149 memory_request = str(work_spec.minRamCount * memory_scheduling_ratio / 100.0)
0150 container_env["resources"]["requests"]["memory"] = str(memory_request) + "Mi"
0151
0152
0153 if use_memory_limit:
0154 container_env["resources"].setdefault("limits", {})
0155 if "memory" not in container_env["resources"]["limits"]:
0156 mem_limit = max(work_spec.minRamCount + memory_limit_min_offset, work_spec.minRamCount * memory_limit_safety_factor / 100.0)
0157 container_env["resources"]["limits"]["memory"] = str(mem_limit) + "Mi"
0158
0159
0160 container_env.setdefault("env", [])
0161
0162
0163 use_ephemeral_storage = resource_settings["use_ephemeral_storage"]
0164 ephemeral_storage_offset_GiB = resource_settings["ephemeral_storage_offset"] / 1024
0165 ephemeral_storage_limit_safety_factor = resource_settings["ephemeral_storage_limit_safety_factor"]
0166
0167 if use_ephemeral_storage:
0168 maxwdir_prorated_GiB = self.panda_queues_dict.get_prorated_maxwdir_GiB(work_spec.computingSite, work_spec.nCore)
0169
0170 container_env["resources"].setdefault("requests", {})
0171 if "ephemeral-storage" not in container_env["resources"]["requests"]:
0172 eph_storage_request_GiB = maxwdir_prorated_GiB + ephemeral_storage_offset_GiB
0173 eph_storage_request_MiB = round(eph_storage_request_GiB * 1024, 2)
0174 container_env["resources"]["requests"]["ephemeral-storage"] = str(eph_storage_request_MiB) + "Mi"
0175 container_env["env"].append({"name": "storageRequestMiB", "value": str(eph_storage_request_MiB)})
0176
0177
0178 container_env["resources"].setdefault("limits", {})
0179 if "ephemeral-storage" not in container_env["resources"]["limits"]:
0180 eph_storage_limit_GiB = (maxwdir_prorated_GiB + ephemeral_storage_offset_GiB) * ephemeral_storage_limit_safety_factor / 100.0
0181 eph_storage_limit_MiB = round(eph_storage_limit_GiB * 1024, 2)
0182 container_env["resources"]["limits"]["ephemeral-storage"] = str(eph_storage_limit_MiB) + "Mi"
0183 container_env["env"].append({"name": "storageLimitMiB", "value": str(eph_storage_limit_MiB)})
0184
0185
0186 yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0187 yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0188 exists = list(filter(lambda vol: vol["name"] == "pilot-dir", yaml_volumes))
0189 if not exists:
0190 yaml_volumes.append({"name": "pilot-dir", "emptyDir": {}})
0191
0192 container_env.setdefault("volumeMounts", [])
0193 exists = list(filter(lambda vol_mount: vol_mount["name"] == "pilot-dir", container_env["volumeMounts"]))
0194 if not exists:
0195 container_env["volumeMounts"].append({"name": "pilot-dir", "mountPath": pilot_dir})
0196
0197
0198 parsed_url = urlparse(work_spec.workAttributes["stdOut"])
0199 log_server = f"{parsed_url.scheme}://{parsed_url.hostname}:{parsed_url.port}"
0200
0201 logs_frontend_w = log_server + harvester_config.pandacon.pandaCacheURL_W_path
0202 logs_frontend_r = log_server + harvester_config.pandacon.pandaCacheURL_R_path
0203
0204 container_env["env"].extend(
0205 [
0206 {"name": "computingSite", "value": work_spec.computingSite},
0207 {"name": "pandaQueueName", "value": queue_name},
0208 {"name": "resourceType", "value": work_spec.resourceType},
0209 {"name": "pilotType", "value": pilot_type},
0210 {"name": "pilotUrlOpt", "value": pilot_url_str},
0211 {"name": "pythonOption", "value": pilot_python_option},
0212 {"name": "pilotVersion", "value": pilot_version},
0213 {"name": "proxySecretPath", "value": cert},
0214 {"name": "PANDA_AUTH_ORIGIN", "value": "atlas.pilot"},
0215 {"name": "PANDA_AUTH_DIR", "value": panda_token_path},
0216 {"name": "PANDA_AUTH_TOKEN", "value": panda_token_filename},
0217 {"name": "PANDA_AUTH_TOKEN_KEY", "value": panda_token_key_filename},
0218 {"name": "workerID", "value": str(work_spec.workerID)},
0219 {"name": "logs_frontend_w", "value": logs_frontend_w},
0220 {"name": "logs_frontend_r", "value": logs_frontend_r},
0221 {"name": "PANDA_JSID", "value": "harvester-" + harvester_config.master.harvester_id},
0222 {"name": "HARVESTER_WORKER_ID", "value": str(work_spec.workerID)},
0223 {"name": "HARVESTER_ID", "value": harvester_config.master.harvester_id},
0224 {"name": "submit_mode", "value": submit_mode},
0225 {"name": "EXEC_DIR", "value": EXEC_DIR},
0226 {"name": "TMPDIR", "value": pilot_dir},
0227 {"name": "HOME", "value": pilot_dir},
0228 {"name": "PANDA_HOSTNAME", "valueFrom": {"fieldRef": {"apiVersion": "v1", "fieldPath": "spec.nodeName"}}},
0229 {"name": "K8S_JOB_ID", "value": worker_name},
0230 {"name": "prodSourceLabel", "value": submitter_common.get_joblabel(prod_source_label)},
0231 {"name": "jobType", "value": submitter_common.get_pilot_job_type(work_spec.jobType)},
0232 ]
0233 )
0234
0235
0236 yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0237 yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0238 yaml_volumes.append({"name": "pilots-starter", "configMap": {"name": "pilots-starter"}})
0239
0240 container_env.setdefault("volumeMounts", [])
0241 container_env["volumeMounts"].append({"name": "pilots-starter", "mountPath": EXEC_DIR})
0242
0243
0244 if submit_mode == "PUSH" and worker_id:
0245 yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0246 yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0247 yaml_volumes.append({"name": "job-config", "configMap": {"name": worker_id}})
0248
0249 container_env.setdefault("volumeMounts", [])
0250 container_env["volumeMounts"].append({"name": "job-config", "mountPath": CONFIG_DIR})
0251
0252
0253 scheduling_settings = self.panda_queues_dict.get_k8s_scheduler_settings(work_spec.computingSite)
0254
0255 use_affinity = scheduling_settings["use_affinity"]
0256 use_anti_affinity = scheduling_settings["use_anti_affinity"]
0257 if (use_affinity or use_anti_affinity) and "affinity" not in yaml_content["spec"]["template"]["spec"]:
0258 yaml_content = self.set_affinity(yaml_content, use_affinity, use_anti_affinity)
0259
0260
0261 priority_class = None
0262
0263 priority_class_key = f"priority_class_{work_spec.resourceType.lower()}"
0264 priority_class_specific = scheduling_settings.get(priority_class_key, None)
0265
0266 priority_class_key = "priority_class"
0267 priority_class_general = scheduling_settings.get(priority_class_key, None)
0268
0269 if priority_class_specific:
0270 priority_class = priority_class_specific
0271 elif priority_class_general:
0272 priority_class = priority_class_general
0273
0274 if priority_class and "priorityClassName" not in yaml_content["spec"]["template"]["spec"]:
0275 yaml_content["spec"]["template"]["spec"]["priorityClassName"] = priority_class
0276
0277
0278 use_active_deadline_seconds = resource_settings["use_active_deadline_seconds"]
0279 if "activeDeadlineSeconds" not in yaml_content["spec"]["template"]["spec"] and use_active_deadline_seconds:
0280 if not max_time:
0281 max_time = 4 * 24 * 23600
0282 yaml_content["spec"]["template"]["spec"]["activeDeadlineSeconds"] = max_time
0283
0284 tmp_log.debug(f"creating job {yaml_content}")
0285
0286 rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(5, 20))
0287 return rsp, yaml_content
0288
0289 def generate_ls_from_wsl(self, workspec_list=[]):
0290 """
0291 Generate the label selector string from a workspec list.
0292
0293 Args:
0294 workspec_list (list): A list of WorkSpec objects.
0295
0296 Returns:
0297 str: The generated label selector string.
0298 """
0299 if workspec_list:
0300 batch_ids_list = [workspec.batchID for workspec in workspec_list if workspec.batchID]
0301 batch_ids_concat = ",".join(batch_ids_list)
0302 return f"job-name in ({batch_ids_concat})"
0303
0304 return ""
0305
0306 def get_workers_info(self, workspec_list=[]):
0307 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_workers_info")
0308 tmp_log.debug("start")
0309
0310 label_selector = self.generate_ls_from_wsl(workspec_list)
0311
0312
0313 pods_dict = self.get_pods_info(label_selector)
0314 if pods_dict is None:
0315 tmp_log.error("Communication failure to cluster. Stopping")
0316 return None
0317
0318
0319 jobs_dict = self.get_jobs_info(label_selector)
0320
0321
0322 workers_dict = {}
0323 for worker in workspec_list:
0324 worker_info = {}
0325 batch_id = worker.batchID
0326 if pods_dict and batch_id in pods_dict:
0327 worker_info.update(pods_dict[batch_id])
0328 if jobs_dict and batch_id in jobs_dict:
0329 worker_info.update(jobs_dict[batch_id])
0330 workers_dict[batch_id] = worker_info
0331
0332 tmp_log.debug("done")
0333 return workers_dict
0334
0335 def get_pods_info(self, label_selector):
0336
0337
0338
0339
0340 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pods_info")
0341
0342 try:
0343 pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))
0344 except Exception as _e:
0345 tmp_log.error(f"Failed call to list_namespaced_pod with: {_e}")
0346 return None
0347
0348 pods_dict = {}
0349 for pod in pods.items:
0350 job_name = pod.metadata.labels["job-name"] if pod.metadata.labels and "job-name" in pod.metadata.labels else None
0351
0352
0353 pod_info = {
0354 "pod_name": pod.metadata.name,
0355 "pod_start_time": pod.status.start_time.replace(tzinfo=None) if pod.status.start_time else pod.status.start_time,
0356 "pod_status": pod.status.phase,
0357 "pod_status_conditions": pod.status.conditions,
0358 "pod_status_message": pod.status.message,
0359 "containers_state": [],
0360 "containers_exit_code": [],
0361 }
0362
0363
0364 if pod.status.container_statuses:
0365 for container_status in pod.status.container_statuses:
0366 if container_status.state:
0367 pod_info["containers_state"].append(container_status.state)
0368 exit_code = 0
0369 if container_status.state.terminated:
0370 exit_code = container_status.state.terminated.exit_code
0371 pod_info["containers_exit_code"].append(exit_code)
0372
0373 pods_dict[job_name] = pod_info
0374
0375 return pods_dict
0376
0377 def filter_pods_info(self, pods_list, job_name=None):
0378 if job_name:
0379 pods_list = [pod for pod in pods_list if pod["job_name"] == job_name]
0380 return pods_list
0381
0382 def get_jobs_info(self, label_selector):
0383 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_jobs_info")
0384
0385 jobs_dict = {}
0386
0387 try:
0388 jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))
0389
0390 for job in jobs.items:
0391 name = job.metadata.name
0392 status = None
0393 status_reason = None
0394 status_message = None
0395 n_pods_succeeded = 0
0396 n_pods_failed = 0
0397 if job.status.conditions:
0398 status = job.status.conditions[0].type
0399 status_reason = job.status.conditions[0].reason
0400 status_message = job.status.conditions[0].message
0401 n_pods_succeeded = job.status.succeeded
0402 n_pods_failed = job.status.failed
0403
0404 job_info = {
0405 "job_status": status,
0406 "job_status_reason": status_reason,
0407 "job_status_message": status_message,
0408 "n_pods_succeeded": n_pods_succeeded,
0409 "n_pods_failed": n_pods_failed,
0410 }
0411 jobs_dict[name] = job_info
0412 except Exception as _e:
0413 tmp_log.error(f"Failed call to list_namespaced_job with: {_e}")
0414
0415 return jobs_dict
0416
0417 def delete_pods(self, pod_name_list):
0418 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="delete_pods")
0419
0420 tmp_log.debug(f"Going to delete {len(pod_name_list)} PODs: {pod_name_list}")
0421
0422 ret_list = list()
0423
0424 for pod_name in pod_name_list:
0425 rsp = {"name": pod_name}
0426 try:
0427 self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
0428 except ApiException as _e:
0429 rsp["errMsg"] = "" if _e.status == 404 else _e.reason
0430 except Exception as _e:
0431 rsp["errMsg"] = _e.reason
0432 else:
0433 rsp["errMsg"] = ""
0434 ret_list.append(rsp)
0435
0436 tmp_log.debug(f"Done with: {ret_list}")
0437 return ret_list
0438
0439 def delete_job(self, job_name):
0440 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name} job_name={job_name}", method_name="delete_job")
0441 tmp_log.debug(f"Going to delete JOB {job_name}")
0442 try:
0443 self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
0444 tmp_log.debug(f"Deleted JOB {job_name}")
0445 except Exception as _e:
0446 tmp_log.error(f"Failed to delete JOB {job_name} with: {_e}")
0447
0448 def delete_config_map(self, config_map_name):
0449 self.corev1.delete_namespaced_config_map(
0450 name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20)
0451 )
0452
0453 def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):
0454 if not use_affinity and not use_anti_affinity:
0455
0456 return yaml_content
0457
0458 yaml_content["spec"]["template"]["spec"]["affinity"] = {}
0459 yaml_affinity = yaml_content["spec"]["template"]["spec"]["affinity"]
0460
0461 single_core_resource_types = self.rt_mapper.get_single_core_resource_types()
0462 multi_core_resource_types = self.rt_mapper.get_multi_core_resource_types()
0463 all_resource_types = self.rt_mapper.get_all_resource_types()
0464
0465
0466 anti_affinity_matrix = {}
0467 for tmp_type in single_core_resource_types:
0468 anti_affinity_matrix[tmp_type] = multi_core_resource_types
0469 for tmp_type in multi_core_resource_types:
0470 anti_affinity_matrix[tmp_type] = single_core_resource_types
0471
0472
0473 affinity_spec = {
0474 "preferredDuringSchedulingIgnoredDuringExecution": [
0475 {
0476 "weight": 100,
0477 "podAffinityTerm": {
0478 "labelSelector": {"matchExpressions": [{"key": "resourceType", "operator": "In", "values": all_resource_types}]},
0479 "topologyKey": "kubernetes.io/hostname",
0480 },
0481 }
0482 ]
0483 }
0484
0485 resource_type = yaml_content["spec"]["template"]["metadata"]["labels"]["resourceType"]
0486
0487 if use_affinity and resource_type in single_core_resource_types:
0488
0489 yaml_affinity["podAffinity"] = copy.deepcopy(affinity_spec)
0490
0491 if use_anti_affinity:
0492
0493
0494 yaml_affinity["podAntiAffinity"] = copy.deepcopy(affinity_spec)
0495 yaml_affinity["podAntiAffinity"]["preferredDuringSchedulingIgnoredDuringExecution"][0]["podAffinityTerm"]["labelSelector"]["matchExpressions"][0][
0496 "values"
0497 ] = anti_affinity_matrix[resource_type]
0498
0499 return yaml_content
0500
0501 def create_or_patch_secret(self, file_list, secret_name):
0502
0503
0504
0505 rsp = None
0506 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_or_patch_secret")
0507
0508 metadata = {"name": secret_name, "namespace": self.namespace}
0509 data = {}
0510 for file_name in file_list:
0511 filename = os.path.basename(file_name)
0512 with open(file_name, "rb") as f:
0513 content = f.read()
0514 data[filename] = base64.b64encode(content).decode()
0515 body = client.V1Secret(data=data, metadata=metadata)
0516 try:
0517 try:
0518 rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace, _request_timeout=(5, 20))
0519 tmp_log.debug("Patched secret")
0520 except ApiException as e:
0521 tmp_log.debug(f"Exception when patching secret: {e} . Try to create secret instead...")
0522 rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace, _request_timeout=(5, 20))
0523 tmp_log.debug("Created secret")
0524 except Exception as e:
0525 tmp_log.error(f"Exception when patching or creating secret: {e}.")
0526 return rsp
0527
0528 def create_configmap(self, work_spec):
0529
0530
0531 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_configmap")
0532
0533 try:
0534 worker_id = str(work_spec.workerID)
0535
0536
0537 access_point = work_spec.get_access_point()
0538 pjd = "pandaJobData.out"
0539 job_data_file = os.path.join(access_point, pjd)
0540 with open(job_data_file) as f:
0541 job_data_contents = f.read()
0542
0543 pfc = "PoolFileCatalog_H.xml"
0544 pool_file_catalog_file = os.path.join(access_point, pfc)
0545 with open(pool_file_catalog_file) as f:
0546 pool_file_catalog_contents = f.read()
0547
0548
0549 data = {pjd: job_data_contents, pfc: pool_file_catalog_contents}
0550
0551
0552 metadata = {"name": worker_id, "namespace": self.namespace}
0553 config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)
0554
0555
0556 api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
0557 tmp_log.debug(f"Created configmap for worker id: {worker_id}")
0558 return True
0559
0560 except Exception as e:
0561 tmp_log.error(f"Could not create configmap with: {e}")
0562 return False
0563
0564 def create_or_patch_configmap_starter(self):
0565
0566
0567 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_or_patch_configmap_starter")
0568
0569 try:
0570 fn = "pilots_starter.py"
0571 dirname = os.path.dirname(__file__)
0572 pilots_starter_file = os.path.join(dirname, f"../harvestercloud/{fn}")
0573 with open(pilots_starter_file) as f:
0574 pilots_starter_contents = f.read()
0575
0576 data = {fn: pilots_starter_contents}
0577 name = "pilots-starter"
0578
0579
0580 metadata = {"name": name, "namespace": self.namespace}
0581 config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)
0582
0583 try:
0584 api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace, _request_timeout=(5, 20))
0585 tmp_log.debug("Patched pilots-starter config_map")
0586 except ApiException as e:
0587 tmp_log.debug(f"Exception when patching pilots-starter config_map: {e} . Try to create it instead...")
0588 api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
0589 tmp_log.debug("Created pilots-starter config_map")
0590 return True
0591
0592 except Exception as e:
0593 tmp_log.error(f"Could not create configmap with: {e}")
0594 return False
0595
0596 def get_pod_logs(self, pod_name, previous=False):
0597 tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pod_logs")
0598 try:
0599 rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous, _request_timeout=(5, 20))
0600 tmp_log.debug(f"Log file retrieved for {pod_name}")
0601 except Exception as e:
0602 tmp_log.debug(f"Exception when getting logs for pod {pod_name} : {e}. Skipped")
0603 raise
0604 else:
0605 return rsp