# File indexing completed on 2026-04-20 07:58:56
0001 import datetime
0002 import json
0003 import os
0004 import re
0005 import shutil
0006 from concurrent.futures import ThreadPoolExecutor
0007
0008 import requests
0009 import requests.exceptions
0010
0011 from pandaharvester.harvesterbody.agent_base import AgentBase
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0015
0016
# module-level logger shared by all Cacher instances
_logger = core_utils.setup_logger("cacher")
0018
0019
0020
class Cacher(AgentBase):
    """Agent that periodically pulls external information (local files,
    HTTP(S) endpoints, PanDA server/cache data) and stores it in the
    harvester cache table, optionally dumping each entry to a JSON file.
    """

    def __init__(self, communicator, single_mode=False):
        """Set up DB access and keep the communicator used for the
        panda_cache: and panda_server: data sources.

        :param communicator: object providing get_key_pair() and the
            server-side methods named in panda_server: URLs
        :param single_mode: passed through to AgentBase
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator

    def run(self):
        """Main agent loop: refresh the cache, then sleep until terminated."""
        while True:
            self.execute()
            # terminated() sleeps for sleepTime and returns True once the
            # agent has been asked to stop
            if self.terminated(harvester_config.cacher.sleepTime, randomize=False):
                return

    def execute(self, force_update=False, skip_lock=False, n_threads=0):
        """Refresh all cache entries configured in harvester_config.cacher.data.

        Each config entry is "mainKey|subKey|URL[|dumpFile]"; entries with
        fewer than three fields are skipped.

        :param force_update: refresh entries even when still within
            refreshInterval
        :param skip_lock: run even without acquiring the process lock
        :param n_threads: if > 0, refresh entries concurrently with this
            many threads
        """
        mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="execute")
        # only one harvester process refreshes the cache at a time
        locked = self.dbProxy.get_process_lock("cacher", self.get_pid(), harvester_config.cacher.sleepTime)
        if not (locked or skip_lock):
            return
        mainLog.debug("getting information")
        timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=harvester_config.cacher.refreshInterval)
        nItems = 4
        itemsList = []
        for tmpStr in harvester_config.cacher.data:
            tmpItems = tmpStr.split("|")
            if len(tmpItems) < 3:
                continue
            # pad the optional dumpFile field with None and drop extras
            tmpItems += [None] * (nItems - len(tmpItems))
            itemsList.append(tmpItems[:nItems])

        def _refresh_cache(inputs):
            # refresh a single cache entry; logs the outcome, returns nothing
            mainKey, subKey, infoURL, dumpFile = inputs
            if subKey == "":
                subKey = None
            # skip entries refreshed within refreshInterval unless forced
            lastUpdateTime = self.dbProxy.get_cache_last_update_time(mainKey, subKey)
            if (not force_update) and lastUpdateTime is not None and lastUpdateTime > timeLimit:
                return
            tmpStat, newInfo = self.get_data(infoURL, mainLog)
            if not tmpStat:
                mainLog.error(f"failed to get info for key={mainKey} subKey={subKey}")
                return
            mainLog.debug(f"got data for key={mainKey} subKey={subKey} from {infoURL}")
            tmpStat = self.dbProxy.refresh_cache(mainKey, subKey, newInfo)
            if not tmpStat:
                mainLog.error(f"failed to refresh key={mainKey} subKey={subKey} due to a DB error")
                return
            mainLog.debug(f"refreshed key={mainKey} subKey={subKey}")
            if dumpFile is not None:
                # dump quasi-atomically: write a temp file, then move into place
                try:
                    tmpFileName = dumpFile + ".tmp"
                    with open(tmpFileName, "w") as tmpFile:
                        json.dump(newInfo, tmpFile)
                    shutil.move(tmpFileName, dumpFile)
                except Exception:
                    core_utils.dump_error_message(mainLog)

        if n_threads:
            mainLog.debug(f"refresh cache with {n_threads} threads")
            with ThreadPoolExecutor(n_threads) as thread_pool:
                thread_pool.map(_refresh_cache, itemsList)
        else:
            mainLog.debug("refresh cache")
            for inputs in itemsList:
                _refresh_cache(inputs)
        mainLog.debug("done")

    def get_data(self, info_url, tmp_log):
        """Fetch data from info_url.

        Supported protocols: file:, http:, https:, panda_cache:,
        panda_server:. A ${VAR} placeholder in info_url is replaced by the
        value of the environment variable VAR; NOTE the variable value
        replaces the WHOLE URL, not just the placeholder.

        :param info_url: source URL (see protocols above)
        :param tmp_log: logger for error reporting
        :return: (success flag, data) — data is None on failure
        """
        match = re.search(r"\$\{*([^\}]+)\}*", info_url)
        if match:
            var_name = match.group(1)
            if var_name not in os.environ:
                tmp_log.error(f"undefined environment variable: {var_name}")
            else:
                info_url = os.environ[var_name]
        if info_url.startswith("file:"):
            retVal = self._get_from_file(info_url, tmp_log)
        elif info_url.startswith("http:"):
            retVal = self._get_via_http(info_url, tmp_log)
        elif info_url.startswith("https:"):
            retVal = self._get_via_https(info_url, tmp_log)
        elif info_url.startswith("panda_cache:"):
            retVal = self._get_key_pair(info_url, tmp_log)
        elif info_url.startswith("panda_server:"):
            retVal = self._get_from_panda_server(info_url, tmp_log)
        else:
            tmp_log.error(f"unsupported protocol for {info_url}")
            retVal = None
        return retVal is not None, retVal

    @staticmethod
    def _parse_json_response(res, info_url, tmp_log):
        # shared HTTP(S) response handling: return parsed JSON on 200, else None
        if res.status_code == 200:
            try:
                return res.json()
            except Exception:
                errMsg = f"corrupted json from {info_url} : {res.text}"
                tmp_log.error(errMsg)
        else:
            errMsg = f"failed to get {info_url} with StatusCode={res.status_code} {res.text}"
            tmp_log.error(errMsg)
        return None

    @staticmethod
    def _get_from_file(info_url, tmp_log):
        # read a local file; keep the parsed JSON when possible, else raw text
        try:
            with open(info_url.split(":")[-1], "r") as infoFile:
                data = infoFile.read()
            try:
                return json.loads(data)
            except Exception:
                # not JSON; the raw string is still valid cache content
                return data
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return None

    def _get_via_http(self, info_url, tmp_log):
        # plain HTTP GET with a 60 s timeout
        try:
            res = requests.get(info_url, timeout=60)
            return self._parse_json_response(res, info_url, tmp_log)
        except requests.exceptions.ReadTimeout:
            tmp_log.error(f"read timeout when getting data from {info_url}")
        except Exception:
            core_utils.dump_error_message(tmp_log)
        return None

    def _get_via_https(self, info_url, tmp_log):
        # HTTPS GET with harvester credentials; falls back to an
        # unauthenticated request on SSL errors
        try:
            try:
                cert_file = harvester_config.pandacon.cert_file
                key_file = harvester_config.pandacon.key_file
                ca_cert = harvester_config.pandacon.ca_cert
                # ca_cert False disables verification and client certs entirely
                cert = None if ca_cert is False else (cert_file, key_file)
                res = requests.get(info_url, cert=cert, verify=ca_cert, timeout=60)
            except requests.exceptions.SSLError:
                # retry without the client certificate
                res = requests.get(info_url, timeout=60)
            # parse inside the try so a corrupted response cannot propagate
            # out of get_data (the original https branch could raise here)
            return self._parse_json_response(res, info_url, tmp_log)
        except requests.exceptions.ReadTimeout:
            tmp_log.error(f"read timeout when getting data from {info_url}")
        except Exception:
            core_utils.dump_error_message(tmp_log)
        return None

    def _get_key_pair(self, info_url, tmp_log):
        # "panda_cache:public&private" -> fetch a key pair via the communicator
        try:
            publicKey, privateKey = info_url.split(":")[-1].split("&")
            retVal, outStr = self.communicator.get_key_pair(publicKey, privateKey)
            if retVal is None:
                tmp_log.error(outStr)
            return retVal
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return None

    def _get_from_panda_server(self, info_url, tmp_log):
        # "panda_server:method_name" -> call that method on the communicator
        try:
            method_name = info_url.split(":")[-1]
            method_function = getattr(self.communicator, method_name)
            retVal, outStr = method_function()
            if not retVal:
                tmp_log.error(outStr)
            # retVal is returned as-is: a non-None falsy value still counts
            # as success upstream, matching the original behavior
            return retVal
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return None

    def set_single_mode(self, single_mode):
        """Override the single-mode flag set at construction time."""
        self.singleMode = single_mode