Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import multiprocessing
0002 import re
0003 import subprocess
0004 import traceback
0005 
0006 import psutil
0007 
0008 from pandaharvester.harvesterbody.agent_base import AgentBase
0009 from pandaharvester.harvesterbody.cred_manager import CredManager
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0013 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0014 from pandaharvester.harvestercore.service_metrics_spec import ServiceMetricSpec
0015 
# module-level logger shared by every function and method of this module,
# created through the project helper core_utils.setup_logger
_logger = core_utils.setup_logger("service_monitor")
0018 
0019 
def round_floats(value):
    """Return *value* rounded to two decimals if it is a float.

    Anything that is not a float (ints, None, strings, ...) is returned as-is.
    """
    if not isinstance(value, float):
        return value
    return round(value, 2)
0023 
0024 
# class to monitor the service, e.g. memory usage
class ServiceMonitor(AgentBase):
    """
    Agent that periodically samples resource usage of the harvester service.

    Every cycle of run() collects memory/CPU usage of the master process tree, the disk usage
    of the configured volumes and the certificate lifetimes reported by the credential manager,
    and stores them through DBProxy.insert_service_metrics.
    """

    # constructor
    def __init__(self, pid_file, single_mode=False):
        """
        :param pid_file: path of the file holding the master pid; when None,
            harvester_config.service_monitor.pidfile is used if it is configured
        :param single_mode: passed through to AgentBase
        """
        AgentBase.__init__(self, single_mode)
        self.db_proxy = DBProxy()

        if pid_file is not None:
            self.pid_file = pid_file
        else:
            try:
                self.pid_file = harvester_config.service_monitor.pidfile
            except Exception:
                self.pid_file = None

        self.pid = self.get_master_pid()
        # NOTE: psutil.Process(None) refers to the current process, so an unreadable
        # pid file makes the monitor sample the process it runs in
        self.master_process = psutil.Process(self.pid)
        self.children = self.master_process.children(recursive=True)

        self.cpu_count = multiprocessing.cpu_count()
        self.queue_config_mapper = QueueConfigMapper()
        self.cred_manager = CredManager(self.queue_config_mapper, single_mode=True)

    def get_master_pid(self):
        """
        Gets the master pid from the lock file
        :return: the pid as int, or None if the file cannot be read or does not start with an integer
        """
        try:
            # the with-statement closes the file even when int() fails on bad content
            with open(self.pid_file, "r") as fh:
                pid = int(fh.readline())
        except Exception:
            _logger.error(f'Could not read pidfile "{self.pid_file}"')
            pid = None

        return pid

    def refresh_children_list(self, children):
        """
        Merge a freshly listed set of child processes with the stored one.

        For pids that were already known, the stored psutil.Process object is reused so that
        cpu_percent(), which measures relative to the previous call on the same object, keeps its
        reference point. New pids use the freshly listed objects; pids that disappeared are dropped.
        :param children: psutil.Process objects currently in the process tree
        :return: the refreshed list, which is also stored in self.children
        """
        stored_by_pid = {child.pid: child for child in self.children}
        children_refreshed = [stored_by_pid.get(child.pid, child) for child in children]

        self.children = children_refreshed

        return children_refreshed

    def get_memory_n_cpu(self):
        """
        Sum memory and CPU usage of the whole process tree starting from the master pid.

        Children that exit while they are being sampled are skipped instead of invalidating the
        whole measurement.
        :return: tuple (rss_mib, memory_pc, cpu_pc) with the resident set size in MiB, the memory
            usage in percent and the CPU usage in percent normalized by the number of CPUs;
            all three are None if the master process could not be sampled
        """
        try:
            master_process = self.master_process
            rss = master_process.memory_info().rss
            memory_pc = master_process.memory_percent()
            cpu_pc = master_process.cpu_percent()

            children = self.refresh_children_list(master_process.children(recursive=True))
            for child in children:
                # sample the child completely before adding, so a child vanishing mid-way
                # does not leave a partial contribution
                try:
                    child_rss = child.memory_info().rss
                    child_memory_pc = child.memory_percent()
                    child_cpu_pc = child.cpu_percent()
                except psutil.NoSuchProcess:
                    # the child exited after being listed; it no longer uses resources
                    continue
                rss += child_rss
                memory_pc += child_memory_pc
                cpu_pc += child_cpu_pc

            # convert bytes to MiB
            rss_mib = rss / float(2**20)
            # normalize cpu percentage by cpu count
            cpu_pc = cpu_pc * 1.0 / self.cpu_count
        except Exception:
            _logger.error(f"Excepted with: {traceback.format_exc()}")
            rss_mib = None
            memory_pc = None
            cpu_pc = None

        return rss_mib, memory_pc, cpu_pc

    def volume_use(self, volume_name):
        """
        Get the used capacity of a volume as reported by ``df``.

        :param volume_name: volume path relative to "/" (e.g. "data" for /data)
        :return: used capacity in percent as float, or None if it could not be determined
        """
        # pass the argv as a list so volume names are never re-split on whitespace
        command = ["df", "-Pkh", "/" + volume_name]
        try:
            output = subprocess.run(command, stdout=subprocess.PIPE).stdout.decode("utf-8")
        except OSError:
            _logger.error(f"Could not run df for volume {volume_name}: {traceback.format_exc()}")
            return None

        used_amount = None
        for line in output.split("\n"):
            # plain substring match: the volume name must not be interpreted as a regex
            if volume_name not in line:
                continue
            match = re.search(r"(\d+)%", line)
            if match:
                # if several lines match, the last one wins
                used_amount = match.group(1)

        if used_amount is None:
            _logger.error(f"Could not determine used amount for volume {volume_name}")
            return None

        return float(used_amount)

    def cert_validities(self):
        """
        Get the credential lifetimes reported by the credential manager.

        :return: the mapping returned by CredManager.execute_monit(), or an empty dict if it
            failed or returned nothing
        """
        try:
            cert_validities = self.cred_manager.execute_monit()
        except Exception:
            _logger.error(f"Could not extract certificate validities: {traceback.format_exc()}")
            return {}
        # guard against plugins returning None so callers can always use .items()
        return cert_validities or {}

    # main loop
    def run(self):
        """
        Main loop: collect the service metrics, store them in the database, then wait
        harvester_config.service_monitor.sleepTime seconds (default 300) until terminated.
        """
        while True:
            _logger.debug("Running service monitor")

            service_metrics = {}

            # get memory usage
            rss_mib, memory_pc, cpu_pc = self.get_memory_n_cpu()
            service_metrics["rss_mib"] = round_floats(rss_mib)
            service_metrics["memory_pc"] = round_floats(memory_pc)
            service_metrics["cpu_pc"] = round_floats(cpu_pc)
            _logger.debug(f"Memory usage: {service_metrics['rss_mib']} MiB/{service_metrics['memory_pc']}%, CPU usage: {service_metrics['cpu_pc']}")

            # get volume usage
            try:
                volumes = harvester_config.service_monitor.disk_volumes.split(",")
            except Exception:
                volumes = []
            # tolerate "a, b" style lists and empty entries (e.g. a trailing comma)
            volumes = [volume.strip() for volume in volumes if volume.strip()]
            for volume in volumes:
                volume_use = self.volume_use(volume)
                service_metrics[f"volume_{volume}_pc"] = round_floats(volume_use)
                _logger.debug(f"Disk usage of {volume}: {service_metrics[f'volume_{volume}_pc']} %")

            # get certificate lifetimes. Not all plugins have implemented it
            _logger.debug("Getting cert lifetimes")
            # only numeric lifetimes are rounded; anything else is passed through instead of
            # raising and killing the monitor loop
            service_metrics["cert_lifetime"] = {
                cert: round(value) if isinstance(value, (int, float)) else value for (cert, value) in self.cert_validities().items()
            }

            _logger.debug(f"Got cert validities: {service_metrics['cert_lifetime']}")

            service_metrics_spec = ServiceMetricSpec(service_metrics)
            self.db_proxy.insert_service_metrics(service_metrics_spec)

            # check if being terminated
            try:
                sleep_time = harvester_config.service_monitor.sleepTime
            except Exception:
                sleep_time = 300

            if self.terminated(sleep_time, randomize=False):
                return