File indexing completed on 2026-04-20 07:58:57
0001 import multiprocessing
0002 import re
0003 import subprocess
0004 import traceback
0005
0006 import psutil
0007
0008 from pandaharvester.harvesterbody.agent_base import AgentBase
0009 from pandaharvester.harvesterbody.cred_manager import CredManager
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0013 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0014 from pandaharvester.harvestercore.service_metrics_spec import ServiceMetricSpec
0015
0016
# Module-level logger; every log call in this file goes through it.
_logger = core_utils.setup_logger("service_monitor")
0018
0019
def round_floats(value):
    """Round a float metric to two decimal places.

    :param value: metric value; may be a float, an int, None or anything else
    :return: ``round(value, 2)`` when ``value`` is a float, otherwise ``value`` unchanged
    """
    if not isinstance(value, float):
        return value
    return round(value, 2)
0023
0024
0025
class ServiceMonitor(AgentBase):
    """Agent that periodically records resource metrics of the harvester service.

    Every cycle it measures memory/CPU usage of the master process tree, the
    disk usage of the configured volumes and the certificate lifetimes, stores
    them via ``db_proxy.insert_service_metrics`` and then sleeps.
    """

    # Upper bound (seconds) for one ``df`` call, so an unresponsive mount cannot
    # block the monitoring loop forever.
    _DF_TIMEOUT = 60

    def __init__(self, pid_file, single_mode=False):
        """
        :param pid_file: path of the file holding the master pid; when None,
            ``harvester_config.service_monitor.pidfile`` is used if configured
        :param single_mode: forwarded to AgentBase
        """
        AgentBase.__init__(self, single_mode)
        self.db_proxy = DBProxy()

        if pid_file is not None:
            self.pid_file = pid_file
        else:
            try:
                self.pid_file = harvester_config.service_monitor.pidfile
            except Exception:
                self.pid_file = None

        self.pid = self.get_master_pid()
        # NOTE: psutil.Process(None) refers to the calling process, so if the pid
        # file could not be read the monitor measures its own process tree.
        self.master_process = psutil.Process(self.pid)
        # Process objects are kept between cycles because psutil's cpu_percent()
        # reports usage since the previous call on the *same* object.
        self.children = self.master_process.children(recursive=True)

        self.cpu_count = multiprocessing.cpu_count()
        self.queue_config_mapper = QueueConfigMapper()
        self.cred_manager = CredManager(self.queue_config_mapper, single_mode=True)

    def get_master_pid(self):
        """
        Read the master pid from the pid (lock) file.

        :return: the pid as int, or None if the file is unset, unreadable or malformed
        """
        try:
            # context manager closes the file even when int() fails
            with open(self.pid_file, "r") as pid_fh:
                return int(pid_fh.readline())
        except (OSError, TypeError, ValueError):
            # TypeError: pid_file is None; ValueError: empty/non-numeric content
            _logger.error(f'Could not read pidfile "{self.pid_file}"')
            return None

    def refresh_children_list(self, children):
        """
        Replace the stored child list with ``children``, reusing the previously
        stored Process object for every pid that is still present.

        Reusing the stored objects keeps psutil's per-object cpu_percent()
        baseline, so long-lived children report CPU usage over the whole cycle
        instead of the meaningless 0.0 of a first call.

        :param children: current list of psutil.Process children
        :return: the refreshed list, which is also stored in ``self.children``
        """
        stored_by_pid = {stored.pid: stored for stored in self.children}
        children_refreshed = [stored_by_pid.get(child.pid, child) for child in children]
        self.children = children_refreshed
        return children_refreshed

    def get_memory_n_cpu(self):
        """
        Sum memory and CPU usage over the whole process tree of the master pid.

        Children that exit between being listed and being measured are skipped
        rather than invalidating the whole measurement.

        :return: tuple ``(rss_mib, memory_pc, cpu_pc)``: resident memory in MiB,
            memory percentage, and CPU percentage divided by the number of CPUs;
            all three are None if the measurement fails
        """
        try:
            master_process = self.master_process
            rss = master_process.memory_info().rss
            memory_pc = master_process.memory_percent()
            cpu_pc = master_process.cpu_percent()

            children = self.refresh_children_list(master_process.children(recursive=True))
            for child in children:
                try:
                    # read all three first so a vanishing child adds nothing partial
                    child_rss = child.memory_info().rss
                    child_memory_pc = child.memory_percent()
                    child_cpu_pc = child.cpu_percent()
                except psutil.NoSuchProcess:
                    # child exited after children() listed it (covers zombies too)
                    continue
                rss += child_rss
                memory_pc += child_memory_pc
                cpu_pc += child_cpu_pc

            rss_mib = rss / float(2**20)
            cpu_pc = cpu_pc / self.cpu_count
        except Exception:
            _logger.error(f"Excepted with: {traceback.format_exc()}")
            rss_mib = None
            memory_pc = None
            cpu_pc = None

        return rss_mib, memory_pc, cpu_pc

    def volume_use(self, volume_name):
        """
        Return the used-space percentage of the filesystem holding ``/<volume_name>``.

        Runs ``df -Pkh /<volume_name>``. In POSIX output mode (-P) the header is
        followed by exactly one, unwrapped line for the filesystem, whose
        capacity column has the form ``NN%``.

        :param volume_name: path relative to ``/`` (e.g. ``data`` for ``/data``)
        :return: used percentage as float, or None if it could not be determined
        """
        # argument list (no string splitting) so names with spaces stay one argument
        command = ["df", "-Pkh", "/" + volume_name]
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=self._DF_TIMEOUT,
            )
        except (OSError, subprocess.SubprocessError):
            _logger.error(f"Could not run {command} for volume {volume_name}: {traceback.format_exc()}")
            return None

        output = completed.stdout.decode("utf-8", errors="replace")
        # skip the column header; it contains "Use%"/"Capacity" but no number
        for line in output.splitlines()[1:]:
            match = re.search(r"(\d+)%", line)
            if match:
                return float(match.group(1))

        error_text = completed.stderr.decode("utf-8", errors="replace").strip()
        _logger.error(f"Could not determine used amount for volume {volume_name}: {error_text}")
        return None

    def cert_validities(self):
        """
        Get certificate lifetimes from the credential manager.

        :return: the mapping returned by ``CredManager.execute_monit()`` (run()
            expects certificate -> numeric lifetime), or an empty dict on failure
        """
        try:
            return self.cred_manager.execute_monit()
        except Exception:
            _logger.error(f"Could not extract certificate validities: {traceback.format_exc()}")
            return {}

    def _collect_service_metrics(self):
        """Build the metrics dict (process tree, disk volumes, certificates) for one cycle."""
        service_metrics = {}

        rss_mib, memory_pc, cpu_pc = self.get_memory_n_cpu()
        service_metrics["rss_mib"] = round_floats(rss_mib)
        service_metrics["memory_pc"] = round_floats(memory_pc)
        service_metrics["cpu_pc"] = round_floats(cpu_pc)
        _logger.debug(f"Memory usage: {service_metrics['rss_mib']} MiB/{service_metrics['memory_pc']}%, CPU usage: {service_metrics['cpu_pc']}")

        try:
            volumes = harvester_config.service_monitor.disk_volumes.split(",")
        except Exception:
            volumes = []
        for raw_volume in volumes:
            # tolerate "a, b" style lists and empty entries such as a trailing comma
            volume = raw_volume.strip()
            if not volume:
                continue
            volume_use = self.volume_use(volume)
            service_metrics[f"volume_{volume}_pc"] = round_floats(volume_use)
            _logger.debug(f"Disk usage of {volume}: {service_metrics[f'volume_{volume}_pc']} %")

        _logger.debug("Getting cert lifetimes")
        service_metrics["cert_lifetime"] = {cert: round(value) for (cert, value) in self.cert_validities().items()}
        _logger.debug(f"Got cert validities: {service_metrics['cert_lifetime']}")

        return service_metrics

    def run(self):
        """
        Main loop: collect and store the service metrics, then sleep
        ``service_monitor.sleepTime`` seconds (default 300) until terminated.
        """
        while True:
            _logger.debug("Running service monitor")

            try:
                service_metrics = self._collect_service_metrics()
                service_metrics_spec = ServiceMetricSpec(service_metrics)
                self.db_proxy.insert_service_metrics(service_metrics_spec)
            except Exception:
                # a failure in one cycle must not end the monitoring loop
                _logger.error(f"Excepted with: {traceback.format_exc()}")

            try:
                sleep_time = harvester_config.service_monitor.sleepTime
            except Exception:
                sleep_time = 300

            if self.terminated(sleep_time, randomize=False):
                return