Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import datetime
0002 import os
0003 import signal
0004 import smtplib
0005 import socket
0006 import subprocess
0007 import time
0008 from email.mime.text import MIMEText
0009 
0010 from pandalogger import logger_config
0011 
0012 from pandaharvester.harvesterbody.agent_base import AgentBase
0013 from pandaharvester.harvesterconfig import harvester_config
0014 from pandaharvester.harvestercore import core_utils
0015 
logDir = logger_config.daemon["logdir"]
# directory holding the watcher lock file: the value of $PANDA_LOCK_DIR when it is
# set to a non-empty path, otherwise the daemon log directory
lockFileName = os.path.join(os.environ.get("PANDA_LOCK_DIR") or logDir, "watcher.lock")

# logger
_logger = core_utils.setup_logger("watcher")
0024 
0025 
0026 # watching the system
class Watcher(AgentBase):
    """Agent that watches harvester log files and raises an alarm when they look unhealthy.

    Every cycle it reads timestamps from the tail of each log file listed in
    ``harvester_config.watcher.logFileNameList`` (relative to ``logDir``). It
    runs the actions listed in ``watcher.actions`` (``email``, plus ``kill`` or
    ``terminate``) when either of these holds:

    * the last message of a log is older than ``watcher.maxStalled`` seconds;
    * the last ``watcher.nMessages`` messages of a log span more than
      ``watcher.maxDuration`` seconds.

    A threshold <= 0 disables the corresponding check.

    NOTE(review): log timestamps are compared with ``core_utils.naive_utcnow()``,
    so the log files are assumed to be written in UTC.
    """

    # constructor
    def __init__(self, single_mode=False):
        AgentBase.__init__(self, single_mode)
        # agent start time; outside single mode execute() skips checks during the first checkInterval seconds
        self.startTime = core_utils.naive_utcnow()

    # main loop
    def run(self):
        """Call execute() repeatedly until self.terminated() reports termination.

        ``watcher.sleepTime`` is passed to terminated() (presumably the wait between cycles).
        """
        while True:
            self.execute()
            if self.terminated(harvester_config.watcher.sleepTime, randomize=False):
                return

    # main
    def execute(self):
        """Run one watch cycle.

        Outside single mode, nothing happens until ``watcher.checkInterval``
        seconds have passed since the agent was created. The log check runs
        inside ``core_utils.get_file_lock(lockFileName, checkInterval)``. An
        IOError from acquiring that lock is logged as "locked by another thread
        or too early to check" and the cycle is skipped. Any other error is
        dumped to the log and swallowed.
        """
        # avoid too early check
        if not self.singleMode and core_utils.naive_utcnow() - self.startTime < datetime.timedelta(seconds=harvester_config.watcher.checkInterval):
            return
        mainLog = core_utils.make_logger(_logger, f"id={self.get_pid()}", method_name="execute")
        mainLog.debug("start")
        try:
            with core_utils.get_file_lock(lockFileName, harvester_config.watcher.checkInterval):
                # Handle errors of the check itself here: IOError is OSError, which also covers
                # socket and smtplib errors, and they must not be mistaken for lock contention below.
                try:
                    self._check_logs(mainLog)
                except Exception:
                    core_utils.dump_error_message(mainLog)
        except IOError:
            mainLog.debug("skip as locked by another thread or too early to check")
        except Exception:
            core_utils.dump_error_message(mainLog)
        mainLog.debug("done")

    def _check_logs(self, main_log):
        """Inspect the configured log files and take the configured actions if one looks unhealthy.

        The per-file results are folded into the oldest last-message time and
        the longest ``nMessages`` duration seen so far. The thresholds are
        checked after each file. Those aggregates can only get worse as more
        files are read, so the actions are taken at most once per call.

        :param main_log: logger for progress and alarm messages
        """
        try:
            logFileNameList = harvester_config.watcher.logFileNameList.split(",")
        except Exception:
            logFileNameList = ["panda-db_proxy.log"]
        actionsList = harvester_config.watcher.actions.split(",")
        lastTime = None
        logDuration = None
        lastTimeName = None
        logDurationName = None
        for logFileName in logFileNameList:
            logFilePath = os.path.join(logDir, logFileName)
            timeNow = core_utils.naive_utcnow()
            if os.path.exists(logFilePath):
                tmpLastTime, tmpLogDuration = self._inspect_log_file(logFilePath, main_log)
                tmpMsg = f"log={logFileName} : last message at {tmpLastTime}. "
                if tmpLogDuration is not None:
                    tmpMsg += f"{harvester_config.watcher.nMessages} messages took {tmpLogDuration.total_seconds()} sec"
                main_log.debug(tmpMsg)
                # keep the oldest last-message time across files
                if tmpLastTime is not None and (lastTime is None or lastTime > tmpLastTime):
                    lastTime = tmpLastTime
                    lastTimeName = logFileName
                # keep the longest duration across files
                if tmpLogDuration is not None and (logDuration is None or logDuration < tmpLogDuration):
                    logDuration = tmpLogDuration
                    logDurationName = logFileName
            # check timestamp
            doAction = False
            if (
                harvester_config.watcher.maxStalled > 0
                and lastTime is not None
                and timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled)
            ):
                main_log.warning(f"last log message is too old in {lastTimeName}. seems to be stalled")
                doAction = True
            elif (
                harvester_config.watcher.maxDuration > 0
                and logDuration is not None
                and logDuration.total_seconds() > harvester_config.watcher.maxDuration
            ):
                main_log.warning(f"slow message generation in {logDurationName}. seems to be a performance issue")
                doAction = True
            if doAction:
                self._take_actions(actionsList, main_log)
                # the aggregates would trigger again for every remaining file; alarm only once
                break
            main_log.debug(f"No action needed for {logFileName}")

    def _inspect_log_file(self, log_file_path, main_log):
        """Read timing information from the tail of one log file.

        :param log_file_path: path of an existing log file
        :param main_log: logger used to report a failure to compute the duration
        :return: tuple ``(lastTime, duration)``. ``lastTime`` is the timestamp of
                 the last line, or None if it cannot be read or parsed.
                 ``duration`` is the timedelta between the first and the last of
                 the last ``watcher.nMessages`` lines, or None.
        """
        # timestamp of the last line
        try:
            lastTime = self._parse_log_time(self._tail_lines(log_file_path, 1)[-1])
        except Exception:
            lastTime = None
        # processing time for the last nMessages messages
        duration = None
        try:
            firstTime = self._parse_log_time(self._tail_lines(log_file_path, harvester_config.watcher.nMessages)[0])
            if lastTime is not None:
                duration = lastTime - firstTime
        except Exception as e:
            main_log.warning(f"Skip with error {e.__class__.__name__}: {e}")
        return lastTime, duration

    @staticmethod
    def _tail_lines(file_path, n_lines):
        """Return the last ``n_lines`` lines of ``file_path`` as a list of bytes, using ``tail``."""
        # argument list rather than a shell string, so the path is never interpreted by a shell
        proc = subprocess.run(["tail", "-n", str(n_lines), file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return proc.stdout.splitlines()

    @staticmethod
    def _parse_log_time(line):
        """Parse the ``%Y-%m-%d %H:%M:%S,%f`` timestamp held in the first 23 bytes of a log line."""
        return datetime.datetime.strptime(str(line[:23], "utf-8"), "%Y-%m-%d %H:%M:%S,%f")

    def _take_actions(self, actions_list, main_log):
        """Run the configured alarm actions.

        :param actions_list: action names from ``watcher.actions``. ``email`` sends a mail.
                             ``kill`` sends SIGUSR2, waits 60 s, then sends SIGKILL.
                             Otherwise ``terminate`` sends SIGTERM. Signals go to
                             this process's own process group.
        :param main_log: logger for progress messages
        """
        if "email" in actions_list:
            try:
                self._send_alarm_email(main_log)
            except Exception:
                # a mail problem must not prevent the kill/terminate actions below
                core_utils.dump_error_message(main_log)
        if "kill" in actions_list:
            # send USR2 first
            main_log.debug("sending SIGUSR2")
            os.killpg(os.getpgrp(), signal.SIGUSR2)
            time.sleep(60)
            main_log.debug("sending SIGKILL")
            os.killpg(os.getpgrp(), signal.SIGKILL)
        elif "terminate" in actions_list:
            main_log.debug("sending SIGTERM")
            os.killpg(os.getpgrp(), signal.SIGTERM)

    def _send_alarm_email(self, main_log):
        """Email a "Harvester Alarm" message to the comma-separated ``watcher.mailTo`` list.

        When both ``watcher.mailUser`` and ``watcher.mailPassword`` are non-empty,
        they are decrypted with the key stored in the environment variable named
        by ``watcher.passphraseEnv`` and used to log in to the SMTP server. If
        that variable is undefined, an error is logged and no email is sent.
        STARTTLS is used only when ``watcher.mailUseSSL`` exists and is True.

        :param main_log: logger for progress and error messages
        """
        mailUser = None
        mailPass = None
        if harvester_config.watcher.mailUser != "" and harvester_config.watcher.mailPassword != "":
            # get pass phrase
            envName = harvester_config.watcher.passphraseEnv
            if envName not in os.environ:
                main_log.error(f"{envName} is undefined in etc/sysconfig/panda_harvester")
                return
            key = os.environ[envName]
            mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser)
            mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword)
        # message
        msgBody = f"harvester {harvester_config.master.harvester_id} "
        msgBody += f"is having a problem on {socket.getfqdn()} "
        msgBody += f"at {core_utils.naive_utcnow()} (UTC)"
        message = MIMEText(msgBody)
        message["Subject"] = "Harvester Alarm"
        message["From"] = harvester_config.watcher.mailFrom
        message["To"] = harvester_config.watcher.mailTo
        # send email
        main_log.debug(f"sending email to {harvester_config.watcher.mailTo}")
        # the context manager issues QUIT and closes the connection even if a step below raises
        with smtplib.SMTP(harvester_config.watcher.mailServer, harvester_config.watcher.mailPort) as server:
            if hasattr(harvester_config.watcher, "mailUseSSL") and harvester_config.watcher.mailUseSSL is True:
                server.starttls()
            if mailUser is not None and mailPass is not None:
                server.login(mailUser, mailPass)
            server.ehlo()
            server.sendmail(harvester_config.watcher.mailFrom, harvester_config.watcher.mailTo.split(","), message.as_string())