File indexing completed on 2026-04-20 07:58:59
0001 import datetime
0002 import traceback
0003 from concurrent.futures import ThreadPoolExecutor
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0009 from pandaharvester.harvestermisc.info_utils_k8s import PandaQueuesDictK8s
0010 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
0011 from pandaharvester.harvestermonitor.monitor_common import get_payload_errstr_from_ec
0012
# module-level logger shared by all K8sMonitor methods
base_logger = core_utils.setup_logger("k8s_monitor")

# Container "waiting" reasons treated as unrecoverable by K8sMonitor: a fully
# Pending worker with a container waiting for one of these reasons is marked
# failed, and Pending/Unknown pods in this situation are selected for deletion.
BAD_CONTAINER_STATES = [
    "CreateContainerError",
    "CrashLoopBackOff",
    "FailedMount",
]
0016
0017
0018
0019
class K8sMonitor(PluginBase):
    """Harvester monitor plugin deriving worker statuses from Kubernetes pod and job information.

    Optional configuration attributes (provided through ``PluginBase`` kwargs) and the
    defaults applied in ``__init__`` when they are absent:

    * ``nProcesses`` -- number of threads used by :meth:`check_workers` (default 4)
    * ``cancelUnknown`` -- coerced to ``bool`` (default False); not read elsewhere in this class
    * ``podQueueTimeLimit`` -- seconds a Pending/Unknown pod may exist before it is deleted (default 172800)
    * ``payloadType`` -- forwarded to ``get_payload_errstr_from_ec`` to describe exit codes (default None)

    ``queueName`` and ``k8s_config_file`` are read in ``__init__`` without defaults, so they
    must be provided by the plugin configuration.
    """

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

        self.panda_queues_dict = PandaQueuesDictK8s()

        # the namespace is looked up for this queue via PandaQueuesDictK8s
        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)

        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)

        # defaults for optional plugin parameters
        self.nProcesses = getattr(self, "nProcesses", 4)
        self.cancelUnknown = bool(getattr(self, "cancelUnknown", False))
        self.podQueueTimeLimit = getattr(self, "podQueueTimeLimit", 172800)
        self.payloadType = getattr(self, "payloadType", None)

        # batchID -> worker information; replaced by check_workers() on every cycle.
        # It is looked up by key in check_a_worker(), so it must be a mapping, not a list.
        self._all_workers_dict = {}

    def check_pods_status(self, pods_status_list, pods_status_message_list, containers_state_list, containers_exit_code_list):
        """Map the phases of a worker's pods (and their containers) to a WorkSpec status.

        :param pods_status_list: pod phase strings ("Pending", "Running", "Succeeded", "Failed", "Unknown")
        :param pods_status_message_list: pod status messages; ``None``/empty entries are ignored
        :param containers_state_list: container state objects exposing ``waiting``, ``running`` and
            ``terminated`` attributes
        :param containers_exit_code_list: container exit codes; ``0`` and ``None`` are ignored
        :return: tuple ``(new_status, exit_code, sub_msg)``.
            ``new_status`` is None when every pod is Unknown.
            ``exit_code`` is the last non-zero exit code when a pod Failed, otherwise 0.
            ``sub_msg`` is a ";"-separated diagnostic string (possibly empty).
        """
        sub_msg = ""
        exit_code = 0

        if "Unknown" in pods_status_list:
            if all(item == "Unknown" for item in pods_status_list):
                # nothing reliable to go on: status cannot be determined
                new_status = None
            elif "Running" in pods_status_list:
                new_status = WorkSpec.ST_running
            else:
                new_status = WorkSpec.ST_idle

        elif all(item == "Pending" for item in pods_status_list):
            new_status = WorkSpec.ST_submitted
            # a pending pod whose container waits for an unrecoverable reason is treated as failed
            if any(item.waiting and item.waiting.reason in BAD_CONTAINER_STATES for item in containers_state_list):
                new_status = WorkSpec.ST_failed

        elif "Succeeded" in pods_status_list:
            if all(item.terminated is not None and item.terminated.reason == "Completed" for item in containers_state_list):
                new_status = WorkSpec.ST_finished
            else:
                # the pod claims success but not every container completed normally: describe only the
                # offending containers (completed ones would just add empty ";" separated entries)
                sub_message_list = []
                for item in containers_state_list:
                    if item.terminated is None:
                        if item.running is not None:
                            state = "running"
                        elif item.waiting is not None:
                            state = "waiting"
                        else:
                            state = "UNKNOWN"
                        sub_message_list.append(f"container not terminated yet ({state}) while pod Succeeded")
                    elif item.terminated.reason != "Completed":
                        sub_message_list.append(f"container terminated by k8s for reason {item.terminated.reason}")
                sub_msg = ";".join(sub_message_list)
                new_status = WorkSpec.ST_cancelled

        elif "Running" in pods_status_list:
            new_status = WorkSpec.ST_running

        elif "Failed" in pods_status_list:
            new_status = WorkSpec.ST_failed
            # keep the last non-zero exit code reported by any container (None means "not reported")
            for tmp_code in containers_exit_code_list:
                if tmp_code:
                    exit_code = tmp_code
            # skip missing messages instead of losing all of them on a join error
            sub_msg = ";".join(str(msg) for msg in pods_status_message_list if msg)

        else:
            new_status = WorkSpec.ST_idle

        return new_status, exit_code, sub_msg

    def check_job_status(self, job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed):
        """Derive a WorkSpec status from the Kubernetes job information.

        :param job_status: job condition type (e.g. "Complete", "Failed") or "" when unknown
        :param job_status_reason: job condition reason (may be None/empty)
        :param job_status_message: job condition message (may be None/empty)
        :param n_pods_succeeded: number of succeeded pods reported for the job
        :param n_pods_failed: number of failed pods reported for the job
        :return: tuple ``(new_status, sub_msg)``.
            ``new_status`` is None when the job is neither finished nor failed.
            ``sub_msg`` is the job message followed by the reason when the job failed.
        """
        if n_pods_succeeded or job_status == "Complete":
            return WorkSpec.ST_finished, ""
        if n_pods_failed or job_status == "Failed":
            # guard against None message/reason, which would make plain "+" raise TypeError
            return WorkSpec.ST_failed, f"{job_status_message or ''}{job_status_reason or ''}"
        return None, ""

    def check_a_worker(self, workspec):
        """Determine the new status of a single worker and record its supplemental error.

        Processing steps:

        1. Read the worker's entry from ``self._all_workers_dict`` (filled by :meth:`check_workers`).
        2. Derive the status from pod information when available, otherwise from job information.
        3. Delete pods that have been Pending/Unknown for longer than ``podQueueTimeLimit``, or whose
           containers wait for one of ``BAD_CONTAINER_STATES``.
        4. Set the supplemental error code/diag on ``workspec``.

        :param workspec: the WorkSpec to check; its ``batchID`` is the lookup key
        :return: tuple ``(new_status, diag)``. If the worker information cannot be read,
            ``(None, error_message)`` is returned and ``workspec`` is left untouched.
        """
        tmp_log = self.make_logger(
            base_logger, f"queueName={self.queueName} workerID={workspec.workerID} batchID={workspec.batchID}", method_name="check_a_worker"
        )

        job_id = workspec.batchID
        error_message = ""
        time_now = core_utils.naive_utcnow()
        pods_status_list = []
        pods_status_message_list = []
        pods_name_to_delete_list = []
        containers_state_list = []
        containers_exit_code_list = []
        job_status = ""
        job_status_reason = ""
        job_status_message = ""
        n_pods_succeeded = 0
        n_pods_failed = 0

        try:
            if job_id in self._all_workers_dict:
                worker_info = self._all_workers_dict[job_id]

                # pod-level information
                if "pod_status" in worker_info and "containers_state" in worker_info and "pod_name" in worker_info:
                    pods_status_list.append(worker_info["pod_status"])
                    pods_status_message_list.append(worker_info.get("pod_status_message"))
                    containers_state_list.extend(worker_info["containers_state"])
                    containers_exit_code_list.extend(worker_info.get("containers_exit_code") or [])

                # job-level information: fallback when there is no pod, and refinement of a running pod
                if "job_status" in worker_info and "job_status_reason" in worker_info and "job_status_message" in worker_info:
                    job_status = worker_info["job_status"]
                    job_status_reason = worker_info["job_status_reason"]
                    job_status_message = worker_info["job_status_message"]
                    n_pods_succeeded = worker_info["n_pods_succeeded"]
                    n_pods_failed = worker_info["n_pods_failed"]

                # pods to delete: queued too long, or with a container waiting for an unrecoverable reason.
                # A pod is listed at most once even if both conditions hold (a second delete would fail).
                if worker_info.get("pod_status") in ("Pending", "Unknown"):
                    pod_start_time = worker_info.get("pod_start_time")
                    queued_too_long = bool(pod_start_time) and time_now - pod_start_time > datetime.timedelta(seconds=self.podQueueTimeLimit)
                    stuck_container = any(
                        item.waiting and item.waiting.reason in BAD_CONTAINER_STATES for item in worker_info.get("containers_state") or []
                    )
                    if queued_too_long or stuck_container:
                        pods_name_to_delete_list.append(worker_info["pod_name"])
        except Exception:
            error_message = f"Failed to get status for id={job_id} ; {traceback.format_exc()}"
            tmp_log.error(error_message)
            # status undetermined; always hand back a (status, diag) tuple so check_workers' results stay well-formed
            return None, error_message

        exit_code = 0
        if not pods_status_list and not job_status:
            # neither a pod nor a job was found for the worker
            error_message = f"JOB id={job_id} not found"
            tmp_log.error(error_message)
            tmp_log.info("Force to cancel the worker due to JOB not found")
            new_status = WorkSpec.ST_cancelled

        elif pods_status_list:
            tmp_log.debug(f"pods_status_list={pods_status_list}")
            new_status, exit_code, sub_msg = self.check_pods_status(
                pods_status_list, pods_status_message_list, containers_state_list, containers_exit_code_list
            )
            error_message += sub_msg
            tmp_log.debug(f"new_status={new_status}, error_message={error_message}")

            # when the pod looks running, let a finished/failed job override it
            if new_status == WorkSpec.ST_running:
                job_new_status, job_msg = self.check_job_status(job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed)
                if job_new_status:
                    new_status = job_new_status
                    error_message += job_msg

        else:
            # only job-level information is available
            new_status, sub_msg = self.check_job_status(job_status, job_status_reason, job_status_message, n_pods_succeeded, n_pods_failed)
            error_message += sub_msg
            tmp_log.debug(f"new_status={new_status}, error_message={error_message}")

        if pods_name_to_delete_list:
            tmp_log.debug("Deleting pods queuing too long")
            ret_list = self.k8s_client.delete_pods(pods_name_to_delete_list)
            deleted_pods_list = [item["name"] for item in ret_list if item["errMsg"] == ""]
            tmp_log.debug(f"Deleted pods queuing too long: {','.join(deleted_pods_list)}")

        # supplemental error: a payload exit code takes precedence over message-based classification
        sup_error_diag = ""
        if exit_code != 0:
            sup_error_code = exit_code
            sup_error_diag += get_payload_errstr_from_ec(self.payloadType, exit_code)
        elif error_message:
            sup_error_code = WorkerErrors.k8s_message_pattern_handler.get_error_code(error_message)
            if sup_error_code is None:
                sup_error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
        else:
            sup_error_code = WorkerErrors.error_codes.get("SUCCEEDED")
        sup_error_diag += error_message

        tmp_log.debug(f"setting sup_error_code={sup_error_code}, sup_error_diag={sup_error_diag}")
        workspec.set_supplemental_error(error_code=sup_error_code, error_diag=sup_error_diag)

        tmp_log.debug(f"returning new_status={new_status}, sup_error_diag={sup_error_diag}")
        return new_status, sup_error_diag

    def check_workers(self, workspec_list):
        """Check the status of a batch of workers.

        Fetches the k8s information for all workers in one call and caches it in
        ``self._all_workers_dict``. Then runs :meth:`check_a_worker` for each worker on a thread
        pool of ``nProcesses`` threads.

        :param workspec_list: list of WorkSpec objects to check
        :return: tuple ``(ok, ret_list)``. ``ret_list`` holds one ``(new_status, diag)`` per worker,
            in the same order as ``workspec_list``. ``ok`` is False when the list is empty or the
            k8s information could not be fetched.
        """
        tmp_log = self.make_logger(base_logger, f"queueName={self.queueName}", method_name="check_workers")
        tmp_log.debug("start")

        if not workspec_list:
            error_message = "empty workspec_list"
            tmp_log.debug(error_message)
            return False, [("", error_message)]

        workers_info = self.k8s_client.get_workers_info(workspec_list=workspec_list)
        if workers_info is None:
            tmp_log.debug("done without answer")
            return False, []

        # shared, read-only snapshot for the worker threads below
        self._all_workers_dict = workers_info

        # Executor.map yields results in input order
        with ThreadPoolExecutor(self.nProcesses) as thread_pool:
            ret_list = list(thread_pool.map(self.check_a_worker, workspec_list))

        tmp_log.debug("done")
        return True, ret_list