Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import datetime
0002 import traceback
0003 from concurrent.futures import ThreadPoolExecutor
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0009 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0010 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0011 from pandaharvester.harvestermonitor.monitor_common import get_payload_errstr_from_ec
0012 
0013 base_logger = core_utils.setup_logger("k8s_monitor")
0014 
0015 BAD_CONTAINER_STATES = ["CreateContainerError", "CrashLoopBackOff", "FailedMount"]
0016 
0017 # monitor for K8S
0018 
0019 
0020 class K8sMonitor(PluginBase):
0021     # constructor
0022     def __init__(self, **kwarg):
0023         PluginBase.__init__(self, **kwarg)
0024 
0025         self.panda_queues_dict = PandaQueuesDictK8s()
0026 
0027         # retrieve the k8s namespace from CRIC
0028         namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)
0029 
0030         self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)
0031 
0032         try:
0033             self.nProcesses
0034         except AttributeError:
0035             self.nProcesses = 4
0036         try:
0037             self.cancelUnknown
0038         except AttributeError:
0039             self.cancelUnknown = False
0040         else:
0041             self.cancelUnknown = bool(self.cancelUnknown)
0042         try:
0043             self.podQueueTimeLimit
0044         except AttributeError:
0045             self.podQueueTimeLimit = 172800
0046         try:
0047             self.payloadType
0048         except AttributeError:
0049             self.payloadType = None
0050 
0051         self._all_workers_dict = []
0052 
0053     def check_pods_status(self, pods_status_list, pods_status_message_list, containers_state_list, containers_exit_code_list):
0054         sub_msg = ""
0055         exit_code = 0
0056 
0057         if "Unknown" in pods_status_list:
0058             if all(item == "Unknown" for item in pods_status_list):
0059                 new_status = None
0060             elif "Running" in pods_status_list:
0061                 new_status = WorkSpec.ST_running
0062             else:
0063                 new_status = WorkSpec.ST_idle
0064 
0065         else:
0066             # Pod in Pending status
0067             if all(item == "Pending" for item in pods_status_list):
0068                 new_status = WorkSpec.ST_submitted  # default is submitted, but consider certain cases
0069                 for item in containers_state_list:
0070                     if item.waiting and item.waiting.reason in BAD_CONTAINER_STATES:
0071                         new_status = WorkSpec.ST_failed  # change state to failed
0072 
0073             # Pod in Succeeded status
0074             elif "Succeeded" in pods_status_list:
0075                 if all((item.terminated is not None and item.terminated.reason == "Completed") for item in containers_state_list):
0076                     new_status = WorkSpec.ST_finished
0077                 else:
0078                     sub_message_list = []
0079                     for item in containers_state_list:
0080                         msg_str = ""
0081                         if item.terminated is None:
0082                             state = "UNKNOWN"
0083                             if item.running is not None:
0084                                 state = "running"
0085                             elif item.waiting is not None:
0086                                 state = "waiting"
0087                             msg_str = f"container not terminated yet ({state}) while pod Succeeded"
0088                         elif item.terminated.reason != "Completed":
0089                             msg_str = f"container terminated by k8s for reason {item.terminated.reason}"
0090                         sub_message_list.append(msg_str)
0091                     sub_msg = ";".join(sub_message_list)
0092                     new_status = WorkSpec.ST_cancelled
0093 
0094             # Pod in Running status
0095             elif "Running" in pods_status_list:
0096                 new_status = WorkSpec.ST_running
0097 
0098             # Pod in Failed status
0099             elif "Failed" in pods_status_list:
0100                 new_status = WorkSpec.ST_failed
0101 
0102                 for tmp_code in containers_exit_code_list:
0103                     if tmp_code != 0:
0104                         exit_code = tmp_code
0105 
0106                 try:
0107                     sub_msg = ";".join(pods_status_message_list)
0108                 except BaseException:
0109                     sub_msg = ""
0110             else:
0111                 new_status = WorkSpec.ST_idle
0112 
0113         return new_status, exit_code, sub_msg
0114 
0115     def check_job_status(self, job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed):
0116         new_status = None
0117         sub_msg = ""
0118 
0119         if n_pods_succeeded or job_status == "Complete":
0120             new_status = WorkSpec.ST_finished
0121             sub_msg = ""
0122         elif n_pods_failed or job_status == "Failed":
0123             new_status = WorkSpec.ST_failed
0124             sub_msg = job_status_message + job_status_reason
0125         # in principle the job information should only apply to final states, but consider other states in the future
0126         return new_status, sub_msg
0127 
0128     def check_a_worker(self, workspec):
0129         # set logger
0130         tmp_log = self.make_logger(
0131             base_logger, f"queueName={self.queueName} workerID={workspec.workerID} batchID={workspec.batchID}", method_name="check_a_worker"
0132         )
0133 
0134         # initialization
0135         job_id = workspec.batchID
0136         error_message = ""
0137         time_now = core_utils.naive_utcnow()
0138         pods_status_list = []
0139         pods_status_message_list = []
0140         pods_name_to_delete_list = []
0141         job_status = ""
0142         job_status_reason = ""
0143         job_status_message = ""
0144         n_pods_succeeded = 0
0145         n_pods_failed = 0
0146         try:
0147             containers_state_list = []
0148             containers_exit_code_list = []
0149             pods_sup_diag_list = []
0150             if job_id in self._all_workers_dict:
0151                 worker_info = self._all_workers_dict[job_id]
0152 
0153                 # make list of status of the pods belonging to our job
0154                 if "pod_status" in worker_info and "containers_state" in worker_info and "pod_name" in worker_info:
0155                     pods_status_list.append(worker_info["pod_status"])
0156                     pods_status_message_list.append(worker_info["pod_status_message"])
0157                     containers_state_list.extend(worker_info["containers_state"])
0158                     containers_exit_code_list.extend(worker_info["containers_exit_code"])
0159                     pods_sup_diag_list.append(worker_info["pod_name"])
0160 
0161                 # get backup info about the job
0162                 if "job_status" in worker_info and "job_status_reason" in worker_info and "job_status_message" in worker_info:
0163                     job_status = worker_info["job_status"]
0164                     job_status_reason = worker_info["job_status_reason"]
0165                     job_status_message = worker_info["job_status_message"]
0166                     n_pods_succeeded = worker_info["n_pods_succeeded"]
0167                     n_pods_failed = worker_info["n_pods_failed"]
0168 
0169                 # make a list of pods that should be removed
0170                 # 1. pods being queued too long
0171                 if (
0172                     "pod_status" in worker_info
0173                     and worker_info["pod_status"] in ["Pending", "Unknown"]
0174                     and worker_info["pod_start_time"]
0175                     and time_now - worker_info["pod_start_time"] > datetime.timedelta(seconds=self.podQueueTimeLimit)
0176                 ):
0177                     pods_name_to_delete_list.append(worker_info["pod_name"])
0178                 # 2. pods with containers in bad states
0179                 if "pod_status" in worker_info and worker_info["pod_status"] in ["Pending", "Unknown"]:
0180                     for item in worker_info["containers_state"]:
0181                         if item.waiting and item.waiting.reason in BAD_CONTAINER_STATES:
0182                             pods_name_to_delete_list.append(worker_info["pod_name"])
0183 
0184         except Exception as _e:
0185             error_message = f"Failed to get status for id={job_id} ; {traceback.format_exc()}"
0186             tmp_log.error(error_message)
0187             new_status = None
0188         else:
0189             exit_code = 0
0190             # we didn't find neither the pod nor the job for the worker
0191             if not pods_status_list and not job_status:
0192                 # there were no pods found belonging to our job
0193                 error_message = f"JOB id={job_id} not found"
0194                 tmp_log.error(error_message)
0195                 tmp_log.info("Force to cancel the worker due to JOB not found")
0196                 new_status = WorkSpec.ST_cancelled
0197             # we found a pod for the worker, it has precedence over the job information
0198             elif pods_status_list:
0199                 # we found pods belonging to our job. Obtain the final status
0200                 tmp_log.debug(f"pods_status_list={pods_status_list}")
0201                 new_status, exit_code, sub_msg = self.check_pods_status(
0202                     pods_status_list, pods_status_message_list, containers_state_list, containers_exit_code_list
0203                 )
0204                 if sub_msg:
0205                     error_message += sub_msg
0206                 tmp_log.debug(f"new_status={new_status}, error_message={error_message}")
0207 
0208                 # Double check the job is not in failed or completed state and override the pod status
0209                 if new_status == WorkSpec.ST_running:
0210                     job_status, job_msg = self.check_job_status(job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed)
0211                     if job_status:
0212                         new_status = job_status
0213                         error_message += job_msg
0214 
0215             # we didn't find the pod, but there was still a job for the worker
0216             else:
0217                 new_status, sub_msg = self.check_job_status(job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed)
0218                 if sub_msg:
0219                     error_message += sub_msg
0220                 tmp_log.debug(f"new_status={new_status}, error_message={error_message}")
0221 
0222             # delete pods that have been queueing too long
0223             if pods_name_to_delete_list:
0224                 tmp_log.debug("Deleting pods queuing too long")
0225                 ret_list = self.k8s_client.delete_pods(pods_name_to_delete_list)
0226                 deleted_pods_list = []
0227                 for item in ret_list:
0228                     if item["errMsg"] == "":
0229                         deleted_pods_list.append(item["name"])
0230                 tmp_log.debug(f"Deleted pods queuing too long: {','.join(deleted_pods_list)}")
0231 
0232             # If there was an exit code and we have predefined message from e.g. the pilot wrapper
0233             sup_error_diag = ""
0234             if exit_code != 0:
0235                 sup_error_code = exit_code
0236                 sup_error_diag += get_payload_errstr_from_ec(self.payloadType, exit_code)
0237             else:
0238                 if error_message:
0239                     sup_error_code = WorkerErrors.k8s_message_pattern_handler.get_error_code(error_message)
0240                     if sup_error_code is None:
0241                         sup_error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
0242                 else:
0243                     sup_error_code = WorkerErrors.error_codes.get("SUCCEEDED")
0244 
0245             # Extend the base error message with the information found querying the job and pods
0246             sup_error_diag += error_message
0247 
0248             tmp_log.debug(f"setting sup_error_code={sup_error_code}, sup_error_diag={sup_error_diag}")
0249             workspec.set_supplemental_error(error_code=sup_error_code, error_diag=sup_error_diag)
0250 
0251         tmp_log.debug(f"returning new_status={new_status}, sup_error_diag={sup_error_diag}")
0252         return new_status, sup_error_diag
0253 
0254     def check_workers(self, workspec_list):
0255         tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="check_workers")
0256         tmp_log.debug("start")
0257 
0258         ret_list = []
0259         if not workspec_list:
0260             error_message = "empty workspec_list"
0261             tmp_log.debug(error_message)
0262             ret_list.append(("", error_message))
0263             return False, ret_list
0264 
0265         workers_info = self.k8s_client.get_workers_info(workspec_list=workspec_list)
0266         if workers_info is None:  # there was a communication issue to the K8S cluster
0267             tmp_log.debug("done without answer")
0268             return False, ret_list
0269 
0270         self._all_workers_dict = workers_info
0271 
0272         # resolve status requested workers
0273         with ThreadPoolExecutor(self.nProcesses) as thread_pool:
0274             ret_iterator = thread_pool.map(self.check_a_worker, workspec_list)
0275 
0276         ret_list = list(ret_iterator)
0277 
0278         tmp_log.debug("done")
0279         return True, ret_list