Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 """
Utility routines associated with the Kubernetes Python client
0003 
0004 """
0005 
0006 import base64
0007 import copy
0008 import os
0009 from urllib.parse import urlparse
0010 
0011 import yaml
0012 from kubernetes import client, config
0013 from kubernetes.client.rest import ApiException
0014 
0015 from pandaharvester.harvesterconfig import harvester_config
0016 from pandaharvester.harvestercore import core_utils
0017 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0018 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0019 from pandaharvester.harvestersubmitter import submitter_common
0020 
# module-level logger; the methods of k8s_Client derive their per-call loggers from it
base_logger = core_utils.setup_logger("k8s_utils")

# path inside the container where the per-worker job configuration configmap is mounted (push mode)
CONFIG_DIR = "/scratch/jobconfig"
# path inside the container where the pilots-starter configmap is mounted; also exported as $EXEC_DIR
EXEC_DIR = "/scratch/executables"
# conversion factor from GiB (2**30 bytes) to GB (10**9 bytes)
GiB_TO_GB = 2**30 / 10.0**9

# command and image defaults
# they are only applied when the job template does not define a command/image itself
DEF_COMMAND = ["/usr/bin/bash"]
DEF_ARGS = ["-c", "cd; command -v python3 >/dev/null && python3 $EXEC_DIR/pilots_starter.py"]
DEF_IMAGE = "atlasadc/atlas-grid-centos7"
0031 
0032 
0033 class k8s_Client(object):
0034     def __init__(self, namespace, config_file=None, queue_name=None):
0035         if not os.path.isfile(config_file):
0036             raise RuntimeError(f"Cannot find k8s config file: {config_file}")
0037         config.load_kube_config(config_file=config_file)
0038         self.corev1 = client.CoreV1Api()
0039         self.batchv1 = client.BatchV1Api()
0040         self.deletev1 = client.V1DeleteOptions(propagation_policy="Background")
0041 
0042         self.panda_queues_dict = PandaQueuesDictK8s()
0043         self.namespace = namespace
0044         self.queue_name = queue_name
0045 
0046         self.rt_mapper = ResourceTypeMapper()
0047 
0048     def read_yaml_file(self, yaml_file):
0049         with open(yaml_file) as f:
0050             yaml_content = yaml.load(f, Loader=yaml.FullLoader)
0051 
0052         return yaml_content
0053 
0054     def create_job_from_yaml(
0055         self,
0056         yaml_content,
0057         work_spec,
0058         prod_source_label,
0059         pilot_type,
0060         pilot_url_str,
0061         pilot_python_option,
0062         pilot_version,
0063         host_image,
0064         cert,
0065         panda_token_path,
0066         panda_token_filename,
0067         panda_token_key_filename,
0068         max_time=None,
0069     ):
0070         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_job_from_yaml")
0071 
0072         # consider PULL mode as default, unless specified
0073         submit_mode = "PULL"
0074 
0075         # create the configmap in push mode
0076         worker_id = None
0077         if work_spec.mapType != "NoJob":
0078             submit_mode = "PUSH"
0079             worker_id = str(work_spec.workerID)
0080             res = self.create_configmap(work_spec)
0081             if not res:  # if the configmap creation failed, don't submit a job because the pod creation will hang
0082                 return res, "Failed to create a configmap"
0083 
0084         # retrieve panda queue information
0085         queue_name = self.panda_queues_dict.get_panda_queue_name(work_spec.computingSite)
0086 
0087         # set the worker name
0088         worker_name = yaml_content["metadata"]["name"] + "-" + str(work_spec.workerID)  # this will be the batch id later on
0089         yaml_content["metadata"]["name"] = worker_name
0090 
0091         # set the resource type and other metadata to filter the pods
0092         yaml_content["spec"]["template"].setdefault("metadata", {})
0093         yaml_content["spec"]["template"]["metadata"].update(
0094             {"labels": {"resourceType": str(work_spec.resourceType), "prodSourceLabel": str(prod_source_label), "pq": str(work_spec.computingSite)}}
0095         )
0096 
0097         # this flag should be respected by the k8s autoscaler not relocate (kill) the job during a scale down
0098         safe_to_evict = self.panda_queues_dict.get_k8s_annotations(work_spec.computingSite)
0099         if safe_to_evict is False:
0100             yaml_content["spec"]["template"]["metadata"].update({"annotations": {"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"}})
0101 
0102         # fill the container details. we can only handle one container (take the first, delete the rest)
0103         yaml_containers = yaml_content["spec"]["template"]["spec"]["containers"]
0104         del yaml_containers[1 : len(yaml_containers)]
0105 
0106         container_env = yaml_containers[0]
0107 
0108         container_env.setdefault("resources", {})
0109         # set the container image
0110         if host_image:  # images defined in CRIC have absolute preference
0111             container_env["image"] = host_image
0112         elif "image" not in container_env:  # take default image only if not defined in yaml template
0113             container_env["image"] = DEF_IMAGE
0114 
0115         if "command" not in container_env:
0116             container_env["command"] = DEF_COMMAND
0117             container_env["args"] = DEF_ARGS
0118 
0119         # set the resources (CPU and memory) we need for the container
0120         # note that predefined values in the yaml template will NOT be overwritten
0121         # Be familiar with QoS classes: https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod
0122         # The CPU & memory settings will affect the QoS for the pod
0123         container_env.setdefault("resources", {})
0124         resource_settings = self.panda_queues_dict.get_k8s_resource_settings(work_spec.computingSite)
0125         pilot_dir = self.panda_queues_dict.get_k8s_pilot_dir(work_spec.computingSite)
0126 
0127         # CPU resources
0128         cpu_scheduling_ratio = resource_settings["cpu_scheduling_ratio"]
0129         if work_spec.nCore > 0:
0130             # CPU requests
0131             container_env["resources"].setdefault("requests", {})
0132             if "cpu" not in container_env["resources"]["requests"]:
0133                 container_env["resources"]["requests"]["cpu"] = str(work_spec.nCore * cpu_scheduling_ratio / 100.0)
0134             # CPU limits
0135             container_env["resources"].setdefault("limits", {})
0136             if "cpu" not in container_env["resources"]["limits"]:
0137                 container_env["resources"]["limits"]["cpu"] = str(work_spec.nCore)
0138 
0139         # Memory resources
0140         use_memory_limit = resource_settings["use_memory_limit"]
0141         memory_limit_safety_factor = resource_settings["memory_limit_safety_factor"]
0142         memory_limit_min_offset = resource_settings["memory_limit_min_offset"]
0143         memory_scheduling_ratio = resource_settings["memory_scheduling_ratio"]
0144 
0145         if work_spec.minRamCount > 4:  # K8S minimum memory limit = 4 MB
0146             # memory requests
0147             container_env["resources"].setdefault("requests", {})
0148             if "memory" not in container_env["resources"]["requests"]:
0149                 memory_request = str(work_spec.minRamCount * memory_scheduling_ratio / 100.0)
0150                 container_env["resources"]["requests"]["memory"] = str(memory_request) + "Mi"
0151             # memory limits: kubernetes is very aggressive killing jobs due to memory, hence making this field optional
0152             # and adding configuration possibilities to add a safety factor
0153             if use_memory_limit:
0154                 container_env["resources"].setdefault("limits", {})
0155                 if "memory" not in container_env["resources"]["limits"]:
0156                     mem_limit = max(work_spec.minRamCount + memory_limit_min_offset, work_spec.minRamCount * memory_limit_safety_factor / 100.0)
0157                     container_env["resources"]["limits"]["memory"] = str(mem_limit) + "Mi"
0158 
0159         # Environment variables
0160         container_env.setdefault("env", [])
0161 
0162         # Ephemeral storage resources
0163         use_ephemeral_storage = resource_settings["use_ephemeral_storage"]
0164         ephemeral_storage_offset_GiB = resource_settings["ephemeral_storage_offset"] / 1024
0165         ephemeral_storage_limit_safety_factor = resource_settings["ephemeral_storage_limit_safety_factor"]
0166 
0167         if use_ephemeral_storage:
0168             maxwdir_prorated_GiB = self.panda_queues_dict.get_prorated_maxwdir_GiB(work_spec.computingSite, work_spec.nCore)
0169             # ephemeral storage requests
0170             container_env["resources"].setdefault("requests", {})
0171             if "ephemeral-storage" not in container_env["resources"]["requests"]:
0172                 eph_storage_request_GiB = maxwdir_prorated_GiB + ephemeral_storage_offset_GiB
0173                 eph_storage_request_MiB = round(eph_storage_request_GiB * 1024, 2)
0174                 container_env["resources"]["requests"]["ephemeral-storage"] = str(eph_storage_request_MiB) + "Mi"
0175                 container_env["env"].append({"name": "storageRequestMiB", "value": str(eph_storage_request_MiB)})
0176 
0177             # ephemeral storage limits
0178             container_env["resources"].setdefault("limits", {})
0179             if "ephemeral-storage" not in container_env["resources"]["limits"]:
0180                 eph_storage_limit_GiB = (maxwdir_prorated_GiB + ephemeral_storage_offset_GiB) * ephemeral_storage_limit_safety_factor / 100.0
0181                 eph_storage_limit_MiB = round(eph_storage_limit_GiB * 1024, 2)
0182                 container_env["resources"]["limits"]["ephemeral-storage"] = str(eph_storage_limit_MiB) + "Mi"
0183                 container_env["env"].append({"name": "storageLimitMiB", "value": str(eph_storage_limit_MiB)})
0184 
0185             # add the ephemeral storage and mount it on pilot_dir
0186             yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0187             yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0188             exists = list(filter(lambda vol: vol["name"] == "pilot-dir", yaml_volumes))
0189             if not exists:
0190                 yaml_volumes.append({"name": "pilot-dir", "emptyDir": {}})
0191 
0192             container_env.setdefault("volumeMounts", [])
0193             exists = list(filter(lambda vol_mount: vol_mount["name"] == "pilot-dir", container_env["volumeMounts"]))
0194             if not exists:
0195                 container_env["volumeMounts"].append({"name": "pilot-dir", "mountPath": pilot_dir})
0196 
0197         # setting up the paths for writing and reading logs
0198         parsed_url = urlparse(work_spec.workAttributes["stdOut"])
0199         log_server = f"{parsed_url.scheme}://{parsed_url.hostname}:{parsed_url.port}"
0200 
0201         logs_frontend_w = log_server + harvester_config.pandacon.pandaCacheURL_W_path
0202         logs_frontend_r = log_server + harvester_config.pandacon.pandaCacheURL_R_path
0203 
0204         container_env["env"].extend(
0205             [
0206                 {"name": "computingSite", "value": work_spec.computingSite},
0207                 {"name": "pandaQueueName", "value": queue_name},
0208                 {"name": "resourceType", "value": work_spec.resourceType},
0209                 {"name": "pilotType", "value": pilot_type},
0210                 {"name": "pilotUrlOpt", "value": pilot_url_str},
0211                 {"name": "pythonOption", "value": pilot_python_option},
0212                 {"name": "pilotVersion", "value": pilot_version},
0213                 {"name": "proxySecretPath", "value": cert},
0214                 {"name": "PANDA_AUTH_ORIGIN", "value": "atlas.pilot"},
0215                 {"name": "PANDA_AUTH_DIR", "value": panda_token_path},
0216                 {"name": "PANDA_AUTH_TOKEN", "value": panda_token_filename},
0217                 {"name": "PANDA_AUTH_TOKEN_KEY", "value": panda_token_key_filename},
0218                 {"name": "workerID", "value": str(work_spec.workerID)},
0219                 {"name": "logs_frontend_w", "value": logs_frontend_w},
0220                 {"name": "logs_frontend_r", "value": logs_frontend_r},
0221                 {"name": "PANDA_JSID", "value": "harvester-" + harvester_config.master.harvester_id},
0222                 {"name": "HARVESTER_WORKER_ID", "value": str(work_spec.workerID)},
0223                 {"name": "HARVESTER_ID", "value": harvester_config.master.harvester_id},
0224                 {"name": "submit_mode", "value": submit_mode},
0225                 {"name": "EXEC_DIR", "value": EXEC_DIR},
0226                 {"name": "TMPDIR", "value": pilot_dir},
0227                 {"name": "HOME", "value": pilot_dir},
0228                 {"name": "PANDA_HOSTNAME", "valueFrom": {"fieldRef": {"apiVersion": "v1", "fieldPath": "spec.nodeName"}}},
0229                 {"name": "K8S_JOB_ID", "value": worker_name},
0230                 {"name": "prodSourceLabel", "value": submitter_common.get_joblabel(prod_source_label)},
0231                 {"name": "jobType", "value": submitter_common.get_pilot_job_type(work_spec.jobType)},
0232             ]
0233         )
0234 
0235         # add the pilots starter configmap
0236         yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0237         yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0238         yaml_volumes.append({"name": "pilots-starter", "configMap": {"name": "pilots-starter"}})
0239         # mount the volume to the filesystem
0240         container_env.setdefault("volumeMounts", [])
0241         container_env["volumeMounts"].append({"name": "pilots-starter", "mountPath": EXEC_DIR})
0242 
0243         # in push mode, add the configmap as a volume to the pod
0244         if submit_mode == "PUSH" and worker_id:
0245             yaml_content["spec"]["template"]["spec"].setdefault("volumes", [])
0246             yaml_volumes = yaml_content["spec"]["template"]["spec"]["volumes"]
0247             yaml_volumes.append({"name": "job-config", "configMap": {"name": worker_id}})
0248             # mount the volume to the filesystem
0249             container_env.setdefault("volumeMounts", [])
0250             container_env["volumeMounts"].append({"name": "job-config", "mountPath": CONFIG_DIR})
0251 
0252         # set the affinity
0253         scheduling_settings = self.panda_queues_dict.get_k8s_scheduler_settings(work_spec.computingSite)
0254 
0255         use_affinity = scheduling_settings["use_affinity"]
0256         use_anti_affinity = scheduling_settings["use_anti_affinity"]
0257         if (use_affinity or use_anti_affinity) and "affinity" not in yaml_content["spec"]["template"]["spec"]:
0258             yaml_content = self.set_affinity(yaml_content, use_affinity, use_anti_affinity)
0259 
0260         # set the priority classes. Specific priority classes have precedence over general priority classes
0261         priority_class = None
0262 
0263         priority_class_key = f"priority_class_{work_spec.resourceType.lower()}"
0264         priority_class_specific = scheduling_settings.get(priority_class_key, None)
0265 
0266         priority_class_key = "priority_class"
0267         priority_class_general = scheduling_settings.get(priority_class_key, None)
0268 
0269         if priority_class_specific:
0270             priority_class = priority_class_specific
0271         elif priority_class_general:
0272             priority_class = priority_class_general
0273 
0274         if priority_class and "priorityClassName" not in yaml_content["spec"]["template"]["spec"]:
0275             yaml_content["spec"]["template"]["spec"]["priorityClassName"] = priority_class
0276 
0277         # set max_time to avoid having a pod running forever
0278         use_active_deadline_seconds = resource_settings["use_active_deadline_seconds"]
0279         if "activeDeadlineSeconds" not in yaml_content["spec"]["template"]["spec"] and use_active_deadline_seconds:
0280             if not max_time:  # 4 days
0281                 max_time = 4 * 24 * 23600
0282             yaml_content["spec"]["template"]["spec"]["activeDeadlineSeconds"] = max_time
0283 
0284         tmp_log.debug(f"creating job {yaml_content}")
0285 
0286         rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(5, 20))
0287         return rsp, yaml_content
0288 
0289     def generate_ls_from_wsl(self, workspec_list=[]):
0290         """
0291         Generate the label selector string from a workspec list.
0292 
0293         Args:
0294             workspec_list (list): A list of WorkSpec objects.
0295 
0296         Returns:
0297             str: The generated label selector string.
0298         """
0299         if workspec_list:
0300             batch_ids_list = [workspec.batchID for workspec in workspec_list if workspec.batchID]
0301             batch_ids_concat = ",".join(batch_ids_list)
0302             return f"job-name in ({batch_ids_concat})"
0303 
0304         return ""
0305 
0306     def get_workers_info(self, workspec_list=[]):
0307         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_workers_info")
0308         tmp_log.debug("start")
0309 
0310         label_selector = self.generate_ls_from_wsl(workspec_list)
0311 
0312         # get detailed information for available pods
0313         pods_dict = self.get_pods_info(label_selector)
0314         if pods_dict is None:  # communication failure to the cluster
0315             tmp_log.error("Communication failure to cluster. Stopping")
0316             return None
0317 
0318         # complement pod information with coarse job information
0319         jobs_dict = self.get_jobs_info(label_selector)
0320 
0321         # combine the pod and job information
0322         workers_dict = {}
0323         for worker in workspec_list:
0324             worker_info = {}
0325             batch_id = worker.batchID  # batch ID is used as the job name
0326             if pods_dict and batch_id in pods_dict:
0327                 worker_info.update(pods_dict[batch_id])
0328             if jobs_dict and batch_id in jobs_dict:
0329                 worker_info.update(jobs_dict[batch_id])
0330             workers_dict[batch_id] = worker_info
0331 
0332         tmp_log.debug("done")
0333         return workers_dict
0334 
0335     def get_pods_info(self, label_selector):
0336         # Monitoring at pod level provides much more information than at job level
0337         # We use job information in case the pod has been deleted (e.g. in Google bulk exercises), because the job
0338         # should persist up the TTL.
0339 
0340         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pods_info")
0341 
0342         try:
0343             pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))
0344         except Exception as _e:
0345             tmp_log.error(f"Failed call to list_namespaced_pod with: {_e}")
0346             return None  # None needs to be treated differently than [] by the caller
0347 
0348         pods_dict = {}
0349         for pod in pods.items:
0350             job_name = pod.metadata.labels["job-name"] if pod.metadata.labels and "job-name" in pod.metadata.labels else None
0351 
0352             # pod information
0353             pod_info = {
0354                 "pod_name": pod.metadata.name,
0355                 "pod_start_time": pod.status.start_time.replace(tzinfo=None) if pod.status.start_time else pod.status.start_time,
0356                 "pod_status": pod.status.phase,
0357                 "pod_status_conditions": pod.status.conditions,
0358                 "pod_status_message": pod.status.message,
0359                 "containers_state": [],
0360                 "containers_exit_code": [],
0361             }
0362 
0363             # sub-container information
0364             if pod.status.container_statuses:
0365                 for container_status in pod.status.container_statuses:
0366                     if container_status.state:
0367                         pod_info["containers_state"].append(container_status.state)
0368                         exit_code = 0
0369                         if container_status.state.terminated:
0370                             exit_code = container_status.state.terminated.exit_code
0371                         pod_info["containers_exit_code"].append(exit_code)
0372 
0373             pods_dict[job_name] = pod_info
0374 
0375         return pods_dict
0376 
0377     def filter_pods_info(self, pods_list, job_name=None):
0378         if job_name:
0379             pods_list = [pod for pod in pods_list if pod["job_name"] == job_name]
0380         return pods_list
0381 
0382     def get_jobs_info(self, label_selector):
0383         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_jobs_info")
0384 
0385         jobs_dict = {}
0386 
0387         try:
0388             jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))
0389 
0390             for job in jobs.items:
0391                 name = job.metadata.name
0392                 status = None
0393                 status_reason = None
0394                 status_message = None
0395                 n_pods_succeeded = 0
0396                 n_pods_failed = 0
0397                 if job.status.conditions:  # is only set when a pod started running
0398                     status = job.status.conditions[0].type
0399                     status_reason = job.status.conditions[0].reason
0400                     status_message = job.status.conditions[0].message
0401                     n_pods_succeeded = job.status.succeeded
0402                     n_pods_failed = job.status.failed
0403 
0404                 job_info = {
0405                     "job_status": status,
0406                     "job_status_reason": status_reason,
0407                     "job_status_message": status_message,
0408                     "n_pods_succeeded": n_pods_succeeded,
0409                     "n_pods_failed": n_pods_failed,
0410                 }
0411                 jobs_dict[name] = job_info
0412         except Exception as _e:
0413             tmp_log.error(f"Failed call to list_namespaced_job with: {_e}")
0414 
0415         return jobs_dict
0416 
0417     def delete_pods(self, pod_name_list):
0418         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="delete_pods")
0419 
0420         tmp_log.debug(f"Going to delete {len(pod_name_list)} PODs: {pod_name_list}")
0421 
0422         ret_list = list()
0423 
0424         for pod_name in pod_name_list:
0425             rsp = {"name": pod_name}
0426             try:
0427                 self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
0428             except ApiException as _e:
0429                 rsp["errMsg"] = "" if _e.status == 404 else _e.reason
0430             except Exception as _e:
0431                 rsp["errMsg"] = _e.reason
0432             else:
0433                 rsp["errMsg"] = ""
0434             ret_list.append(rsp)
0435 
0436         tmp_log.debug(f"Done with: {ret_list}")
0437         return ret_list
0438 
0439     def delete_job(self, job_name):
0440         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name} job_name={job_name}", method_name="delete_job")
0441         tmp_log.debug(f"Going to delete JOB {job_name}")
0442         try:
0443             self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
0444             tmp_log.debug(f"Deleted JOB {job_name}")
0445         except Exception as _e:
0446             tmp_log.error(f"Failed to delete JOB {job_name} with: {_e}")
0447 
0448     def delete_config_map(self, config_map_name):
0449         self.corev1.delete_namespaced_config_map(
0450             name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20)
0451         )
0452 
0453     def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):
0454         if not use_affinity and not use_anti_affinity:
0455             # we are not supposed to use any affinity setting for this queue
0456             return yaml_content
0457 
0458         yaml_content["spec"]["template"]["spec"]["affinity"] = {}
0459         yaml_affinity = yaml_content["spec"]["template"]["spec"]["affinity"]
0460 
0461         single_core_resource_types = self.rt_mapper.get_single_core_resource_types()
0462         multi_core_resource_types = self.rt_mapper.get_multi_core_resource_types()
0463         all_resource_types = self.rt_mapper.get_all_resource_types()
0464 
0465         # create the anti-affinity matrix for higher single and multi core separation
0466         anti_affinity_matrix = {}
0467         for tmp_type in single_core_resource_types:
0468             anti_affinity_matrix[tmp_type] = multi_core_resource_types
0469         for tmp_type in multi_core_resource_types:
0470             anti_affinity_matrix[tmp_type] = single_core_resource_types
0471 
0472         # create the affinity spec
0473         affinity_spec = {
0474             "preferredDuringSchedulingIgnoredDuringExecution": [
0475                 {
0476                     "weight": 100,
0477                     "podAffinityTerm": {
0478                         "labelSelector": {"matchExpressions": [{"key": "resourceType", "operator": "In", "values": all_resource_types}]},
0479                         "topologyKey": "kubernetes.io/hostname",
0480                     },
0481                 }
0482             ]
0483         }
0484 
0485         resource_type = yaml_content["spec"]["template"]["metadata"]["labels"]["resourceType"]
0486 
0487         if use_affinity and resource_type in single_core_resource_types:
0488             # resource type SCORE* should attract each other instead of spreading across the nodes
0489             yaml_affinity["podAffinity"] = copy.deepcopy(affinity_spec)
0490 
0491         if use_anti_affinity:
0492             # SCORE* will repel MCORE* and viceversa. The main reasoning was to keep nodes for MCORE
0493             # This setting depends on the size of the node vs the MCORE job
0494             yaml_affinity["podAntiAffinity"] = copy.deepcopy(affinity_spec)
0495             yaml_affinity["podAntiAffinity"]["preferredDuringSchedulingIgnoredDuringExecution"][0]["podAffinityTerm"]["labelSelector"]["matchExpressions"][0][
0496                 "values"
0497             ] = anti_affinity_matrix[resource_type]
0498 
0499         return yaml_content
0500 
0501     def create_or_patch_secret(self, file_list, secret_name):
0502         # api_version = 'v1'
0503         # kind = 'Secret'
0504         # type='kubernetes.io/tls'
0505         rsp = None
0506         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_or_patch_secret")
0507 
0508         metadata = {"name": secret_name, "namespace": self.namespace}
0509         data = {}
0510         for file_name in file_list:
0511             filename = os.path.basename(file_name)
0512             with open(file_name, "rb") as f:
0513                 content = f.read()
0514             data[filename] = base64.b64encode(content).decode()
0515         body = client.V1Secret(data=data, metadata=metadata)
0516         try:
0517             try:
0518                 rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace, _request_timeout=(5, 20))
0519                 tmp_log.debug("Patched secret")
0520             except ApiException as e:
0521                 tmp_log.debug(f"Exception when patching secret: {e} . Try to create secret instead...")
0522                 rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace, _request_timeout=(5, 20))
0523                 tmp_log.debug("Created secret")
0524         except Exception as e:
0525             tmp_log.error(f"Exception when patching or creating secret: {e}.")
0526         return rsp
0527 
0528     def create_configmap(self, work_spec):
0529         # useful guide: https://matthewpalmer.net/kubernetes-app-developer/articles/ultimate-configmap-guide-kubernetes.html
0530 
0531         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_configmap")
0532 
0533         try:
0534             worker_id = str(work_spec.workerID)
0535 
0536             # Get the access point. The messenger should have dropped the input files for the pilot here
0537             access_point = work_spec.get_access_point()
0538             pjd = "pandaJobData.out"
0539             job_data_file = os.path.join(access_point, pjd)
0540             with open(job_data_file) as f:
0541                 job_data_contents = f.read()
0542 
0543             pfc = "PoolFileCatalog_H.xml"
0544             pool_file_catalog_file = os.path.join(access_point, pfc)
0545             with open(pool_file_catalog_file) as f:
0546                 pool_file_catalog_contents = f.read()
0547 
0548             # put the job data and PFC into a dictionary
0549             data = {pjd: job_data_contents, pfc: pool_file_catalog_contents}
0550 
0551             # instantiate the configmap object
0552             metadata = {"name": worker_id, "namespace": self.namespace}
0553             config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)
0554 
0555             # create the configmap object in K8s
0556             api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
0557             tmp_log.debug(f"Created configmap for worker id: {worker_id}")
0558             return True
0559 
0560         except Exception as e:
0561             tmp_log.error(f"Could not create configmap with: {e}")
0562             return False
0563 
0564     def create_or_patch_configmap_starter(self):
0565         # useful guide: https://matthewpalmer.net/kubernetes-app-developer/articles/ultimate-configmap-guide-kubernetes.html
0566 
0567         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_or_patch_configmap_starter")
0568 
0569         try:
0570             fn = "pilots_starter.py"
0571             dirname = os.path.dirname(__file__)
0572             pilots_starter_file = os.path.join(dirname, f"../harvestercloud/{fn}")
0573             with open(pilots_starter_file) as f:
0574                 pilots_starter_contents = f.read()
0575 
0576             data = {fn: pilots_starter_contents}
0577             name = "pilots-starter"
0578 
0579             # instantiate the configmap object
0580             metadata = {"name": name, "namespace": self.namespace}
0581             config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)
0582 
0583             try:
0584                 api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace, _request_timeout=(5, 20))
0585                 tmp_log.debug("Patched pilots-starter config_map")
0586             except ApiException as e:
0587                 tmp_log.debug(f"Exception when patching pilots-starter config_map: {e} . Try to create it instead...")
0588                 api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
0589                 tmp_log.debug("Created pilots-starter config_map")
0590             return True
0591 
0592         except Exception as e:
0593             tmp_log.error(f"Could not create configmap with: {e}")
0594             return False
0595 
0596     def get_pod_logs(self, pod_name, previous=False):
0597         tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pod_logs")
0598         try:
0599             rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous, _request_timeout=(5, 20))
0600             tmp_log.debug(f"Log file retrieved for {pod_name}")
0601         except Exception as e:
0602             tmp_log.debug(f"Exception when getting logs for pod {pod_name} : {e}. Skipped")
0603             raise
0604         else:
0605             return rsp