File indexing completed on 2026-04-20 07:58:57
0001 import datetime
0002 import os
0003 import signal
0004 import smtplib
0005 import socket
0006 import subprocess
0007 import time
0008 from email.mime.text import MIMEText
0009
0010 from pandalogger import logger_config
0011
0012 from pandaharvester.harvesterbody.agent_base import AgentBase
0013 from pandaharvester.harvesterconfig import harvester_config
0014 from pandaharvester.harvestercore import core_utils
0015
# Directory that holds the daemon log files; the watcher tails files under it.
logDir = logger_config.daemon["logdir"]

# The watcher lock file lives in $PANDA_LOCK_DIR when that variable is set,
# otherwise next to the log files. The directory must be taken from the
# environment value, not from the variable's name.
if "PANDA_LOCK_DIR" in os.environ:
    lockFileName = os.path.join(os.environ["PANDA_LOCK_DIR"], "watcher.lock")
else:
    lockFileName = os.path.join(logDir, "watcher.lock")


# module-level logger shared by all Watcher instances
_logger = core_utils.setup_logger("watcher")
0024
0025
0026
class Watcher(AgentBase):
    """Agent that tails harvester log files and reacts when logging looks stalled or slow.

    For every configured log file the timestamp of the newest line and the time
    span covered by the last ``harvester_config.watcher.nMessages`` lines are
    read with ``tail``. When the oldest of the "newest line" timestamps is older
    than ``maxStalled`` seconds, or the longest span exceeds ``maxDuration``
    seconds, the actions listed in ``harvester_config.watcher.actions``
    (``email``, ``kill``, ``terminate``) are carried out.
    """

    def __init__(self, single_mode=False):
        """Initialize the agent and remember when it was created.

        :param single_mode: forwarded unchanged to ``AgentBase.__init__``.
        """
        AgentBase.__init__(self, single_mode)
        # creation time; execute() uses it to skip checks right after start-up
        self.startTime = core_utils.naive_utcnow()

    def run(self):
        """Call execute() in a loop until ``self.terminated(...)`` returns a true value."""
        while True:
            # one check cycle
            self.execute()
            # sleepTime is handed to terminated(); leave the loop once it reports true
            if self.terminated(harvester_config.watcher.sleepTime, randomize=False):
                return

    @staticmethod
    def _parse_timestamp(line):
        """Parse the timestamp held in the first 23 bytes of a log line.

        :param line: raw bytes of a log line as returned by the ``tail`` subprocess.
        :return: ``datetime.datetime`` parsed with ``%Y-%m-%d %H:%M:%S,%f``.
        :raises ValueError: if the prefix does not match the expected layout.
        """
        return datetime.datetime.strptime(str(line[:23], "utf-8"), "%Y-%m-%d %H:%M:%S,%f")

    def _probe_log_file(self, logFilePath, mainLog):
        """Read the newest timestamp of a log file and the time span of its last lines.

        :param logFilePath: path of the log file to inspect.
        :param mainLog: logger used to report a failure of the duration probe.
        :return: tuple ``(tmpLastTime, tmpLogDuration)``; each element is ``None``
                 when it could not be determined.
        """
        # timestamp of the very last line
        try:
            p = subprocess.Popen(["tail", "-1", logFilePath], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            line = p.communicate()[0]
            tmpLastTime = self._parse_timestamp(line)
        except Exception:
            tmpLastTime = None
        # time taken to produce the last nMessages lines
        tmpLogDuration = None
        try:
            # the pipeline runs through a shell; logFilePath is built from configuration values
            p = subprocess.Popen(
                f"tail -{harvester_config.watcher.nMessages} {logFilePath} | head -1",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=True,
            )
            line = p.communicate()[0]
            firstTime = self._parse_timestamp(line)
            if tmpLastTime is not None:
                tmpLogDuration = tmpLastTime - firstTime
        except Exception as e:
            mainLog.warning(f"Skip with error {e.__class__.__name__}: {e}")
        return tmpLastTime, tmpLogDuration

    def _send_alarm_email(self, mainLog):
        """Send the alarm email described by the ``harvester_config.watcher.mail*`` settings.

        When both ``mailUser`` and ``mailPassword`` are non-empty they are decrypted
        with the key found in the environment variable named by ``passphraseEnv``;
        if that variable is missing an error is logged and no email is sent.

        :param mainLog: logger used for progress and error messages.
        """
        mailUser = None
        mailPass = None
        if harvester_config.watcher.mailUser != "" and harvester_config.watcher.mailPassword != "":
            envName = harvester_config.watcher.passphraseEnv
            if envName not in os.environ:
                mainLog.error(f"{envName} is undefined in etc/sysconfig/panda_harvester")
                return
            key = os.environ[envName]
            mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser)
            mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword)
        # message
        msgBody = f"harvester {harvester_config.master.harvester_id} "
        msgBody += f"is having a problem on {socket.getfqdn()} "
        msgBody += f"at {core_utils.naive_utcnow()} (UTC)"
        message = MIMEText(msgBody)
        message["Subject"] = "Harvester Alarm"
        message["From"] = harvester_config.watcher.mailFrom
        message["To"] = harvester_config.watcher.mailTo
        # send; the context manager issues QUIT and closes the socket even if a step raises
        mainLog.debug(f"sending email to {harvester_config.watcher.mailTo}")
        with smtplib.SMTP(harvester_config.watcher.mailServer, harvester_config.watcher.mailPort) as server:
            if getattr(harvester_config.watcher, "mailUseSSL", None) is True:
                server.starttls()
            if mailUser is not None and mailPass is not None:
                server.login(mailUser, mailPass)
            server.ehlo()
            server.sendmail(harvester_config.watcher.mailFrom, harvester_config.watcher.mailTo.split(","), message.as_string())

    def execute(self):
        """Run one check of the log files and take the configured actions if needed.

        Returns immediately when not in single mode and fewer than
        ``checkInterval`` seconds have passed since the agent was created.
        Otherwise the check runs inside ``core_utils.get_file_lock``; an
        ``IOError`` raised there is logged as "locked or too early" and any other
        exception is dumped to the log. Nothing is returned.
        """
        # avoid checking too early after start-up
        if not self.singleMode and core_utils.naive_utcnow() - self.startTime < datetime.timedelta(
            seconds=harvester_config.watcher.checkInterval
        ):
            return
        mainLog = core_utils.make_logger(_logger, f"id={self.get_pid()}", method_name="execute")
        mainLog.debug("start")
        # get file lock
        try:
            with core_utils.get_file_lock(lockFileName, harvester_config.watcher.checkInterval):
                try:
                    logFileNameList = harvester_config.watcher.logFileNameList.split(",")
                except Exception:
                    # fall back to a default log name if the configured list cannot be read
                    logFileNameList = ["panda-db_proxy.log"]
                lastTime = None
                logDuration = None
                lastTimeName = None
                logDurationName = None
                actionsList = harvester_config.watcher.actions.split(",")
                for logFileName in logFileNameList:
                    logFilePath = os.path.join(logDir, logFileName)
                    timeNow = core_utils.naive_utcnow()
                    if not os.path.exists(logFilePath):
                        continue
                    tmpLastTime, tmpLogDuration = self._probe_log_file(logFilePath, mainLog)
                    tmpMsg = f"log={logFileName} : last message at {tmpLastTime}. "
                    if tmpLogDuration is not None:
                        tmpMsg += f"{harvester_config.watcher.nMessages} messages took {tmpLogDuration.total_seconds()} sec"
                    mainLog.debug(tmpMsg)
                    # keep the log whose newest message is the oldest of all logs
                    if tmpLastTime is not None and (lastTime is None or lastTime > tmpLastTime):
                        lastTime = tmpLastTime
                        lastTimeName = logFileName
                    # keep the log whose last nMessages lines span the longest time
                    if tmpLogDuration is not None and (logDuration is None or logDuration < tmpLogDuration):
                        logDuration = tmpLogDuration
                        logDurationName = logFileName
                # check timestamps; a threshold of 0 or less disables the corresponding check
                doAction = False
                if (
                    harvester_config.watcher.maxStalled > 0
                    and lastTime is not None
                    and timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled)
                ):
                    mainLog.warning(f"last log message is too old in {lastTimeName}. seems to be stalled")
                    doAction = True
                elif (
                    harvester_config.watcher.maxDuration > 0
                    and logDuration is not None
                    and logDuration.total_seconds() > harvester_config.watcher.maxDuration
                ):
                    mainLog.warning(f"slow message generation in {logDurationName}. seems to be a performance issue")
                    doAction = True
                # take action
                if doAction:
                    # email
                    # NOTE(review): an exception raised while emailing propagates to the
                    # outer handler, so the kill/terminate step below is then skipped
                    # -- confirm this is intended.
                    if "email" in actionsList:
                        self._send_alarm_email(mainLog)
                    # kill
                    if "kill" in actionsList:
                        # send SIGUSR2 to this process's own process group first,
                        # wait 60 seconds, then send SIGKILL to the same group
                        mainLog.debug("sending SIGUSR2")
                        os.killpg(os.getpgrp(), signal.SIGUSR2)
                        time.sleep(60)
                        mainLog.debug("sending SIGKILL")
                        os.killpg(os.getpgrp(), signal.SIGKILL)
                    elif "terminate" in actionsList:
                        mainLog.debug("sending SIGTERM")
                        os.killpg(os.getpgrp(), signal.SIGTERM)
                else:
                    mainLog.debug(f"No action needed for {logFileName}")
        except IOError:
            mainLog.debug("skip as locked by another thread or too early to check")
        except Exception:
            core_utils.dump_error_message(mainLog)
        mainLog.debug("done")