Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import datetime
0002 import json
0003 import os
0004 import re
0005 import shutil
0006 from concurrent.futures import ThreadPoolExecutor
0007 
0008 import requests
0009 import requests.exceptions
0010 
0011 from pandaharvester.harvesterbody.agent_base import AgentBase
0012 from pandaharvester.harvesterconfig import harvester_config
0013 from pandaharvester.harvestercore import core_utils
0014 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0015 
# module-level logger of the cacher agent; per-call loggers are derived from it via make_logger
_logger = core_utils.setup_logger("cacher")
0018 
0019 
# cache information
class Cacher(AgentBase):
    """Agent that periodically refreshes cached information stored through the DB proxy.

    Each entry of ``harvester_config.cacher.data`` is a ``|``-separated string
    ``mainKey|subKey|infoURL[|dumpFile]``. An entry is refreshed when forced, or when its
    last update is older than ``harvester_config.cacher.refreshInterval`` minutes. On refresh:
    the data is fetched from ``infoURL`` (see ``get_data``), written with
    ``dbProxy.refresh_cache``, and, if ``dumpFile`` is given, also dumped there as JSON.
    """

    # constructor
    def __init__(self, communicator, single_mode=False):
        """
        :param communicator: object used for the ``panda_cache:`` and ``panda_server:`` sources
        :param single_mode: forwarded to AgentBase
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator

    # main loop
    def run(self):
        """Call execute() repeatedly until self.terminated() reports termination.

        Termination is checked after every cycle with ``harvester_config.cacher.sleepTime``.
        """
        while True:
            # execute
            self.execute()
            # check if being terminated
            if self.terminated(harvester_config.cacher.sleepTime, randomize=False):
                return

    # main
    def execute(self, force_update=False, skip_lock=False, n_threads=0):
        """Refresh all configured cache entries that are due.

        :param force_update: refresh entries even if they were updated recently
        :param skip_lock: proceed even if the "cacher" process lock was not obtained
        :param n_threads: number of worker threads; 0 refreshes the entries sequentially
        """
        mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="execute")
        # get lock
        locked = self.dbProxy.get_process_lock("cacher", self.get_pid(), harvester_config.cacher.sleepTime)
        if not locked and not skip_lock:
            return
        mainLog.debug("getting information")
        timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=harvester_config.cacher.refreshInterval)
        # parse "mainKey|subKey|infoURL[|dumpFile]" entries; a missing dumpFile is padded with None
        nItems = 4
        itemsList = []
        for tmpStr in harvester_config.cacher.data:
            tmpItems = tmpStr.split("|")
            if len(tmpItems) < 3:
                continue
            tmpItems += [None] * (nItems - len(tmpItems))
            itemsList.append(tmpItems[:nItems])

        # refresh a single cache entry
        def _refresh_one(mainKey, subKey, infoURL, dumpFile):
            if subKey == "":
                subKey = None
            # check last update time
            lastUpdateTime = self.dbProxy.get_cache_last_update_time(mainKey, subKey)
            if (not force_update) and lastUpdateTime is not None and lastUpdateTime > timeLimit:
                return
            # get information
            tmpStat, newInfo = self.get_data(infoURL, mainLog)
            if not tmpStat:
                mainLog.error(f"failed to get info for key={mainKey} subKey={subKey}")
                return
            mainLog.debug(f"got data for key={mainKey} subKey={subKey} from {infoURL}")
            # update
            if not self.dbProxy.refresh_cache(mainKey, subKey, newInfo):
                mainLog.error(f"failed to refresh key={mainKey} subKey={subKey} due to a DB error")
                return
            mainLog.debug(f"refreshed key={mainKey} subKey={subKey}")
            # an empty dumpFile field (e.g. "key|sub|url|") means "no dump", like a missing one
            if not dumpFile:
                return
            try:
                # write a sibling temporary file first, then move it over dumpFile,
                # so that dumpFile is not left half-written
                tmpFileName = dumpFile + ".tmp"
                with open(tmpFileName, "w") as tmpFile:
                    json.dump(newInfo, tmpFile)
                shutil.move(tmpFileName, dumpFile)
            except Exception:
                core_utils.dump_error_message(mainLog)

        # refresh cache function
        def _refresh_cache(inputs):
            # isolate failures: an unexpected exception for one entry is logged and does not stop
            # the other entries (inside the thread pool it would otherwise be dropped silently)
            try:
                _refresh_one(*inputs)
            except Exception:
                core_utils.dump_error_message(mainLog)

        # loop over all items
        if n_threads:
            mainLog.debug(f"refresh cache with {n_threads} threads")
            with ThreadPoolExecutor(n_threads) as thread_pool:
                # consume the results so that nothing raised in a worker goes unnoticed
                list(thread_pool.map(_refresh_cache, itemsList))
        else:
            mainLog.debug("refresh cache")
            for inputs in itemsList:
                _refresh_cache(inputs)
        mainLog.debug("done")

    # get new data
    def get_data(self, info_url, tmp_log):
        """Fetch data from info_url.

        Supported sources:
          * ``file:<path>`` -- file content, JSON-decoded when possible (raw text otherwise)
          * ``http:...`` / ``https:...`` -- JSON body of a 200 response; https first tries the
            pandacon certificate and falls back to a plain request on SSLError
          * ``panda_cache:<publicKey>&<privateKey>`` -- result of communicator.get_key_pair()
          * ``panda_server:<method>`` -- first element returned by communicator.<method>()

        If info_url references ``$VAR`` or ``${VAR}`` and VAR is defined, info_url is
        replaced by the value of that environment variable.

        :param info_url: description of the data source
        :param tmp_log: logger used for error reporting
        :return: tuple (retStat, retVal); retStat is True iff retVal is not None
        """
        retVal = None
        info_url = self._resolve_env_variable(info_url, tmp_log)
        if info_url.startswith("file:"):
            try:
                # split on the first colon only so that paths containing ":" are kept intact
                with open(info_url.split(":", 1)[1], "r") as infoFile:
                    retVal = infoFile.read()
                try:
                    retVal = json.loads(retVal)
                except ValueError:
                    # best effort: keep the raw text if the file is not valid JSON
                    pass
            except Exception:
                core_utils.dump_error_message(tmp_log)
        elif info_url.startswith("http:"):
            try:
                res = requests.get(info_url, timeout=60)
            except requests.exceptions.ReadTimeout:
                tmp_log.error(f"read timeout when getting data from {info_url}")
            except Exception:
                core_utils.dump_error_message(tmp_log)
            else:
                retVal = self._decode_json_response(res, info_url, tmp_log)
        elif info_url.startswith("https:"):
            try:
                try:
                    # try with pandacon certificate
                    cert_file = harvester_config.pandacon.cert_file
                    key_file = harvester_config.pandacon.key_file
                    ca_cert = harvester_config.pandacon.ca_cert
                    cert = None if ca_cert is False else (cert_file, key_file)
                    res = requests.get(info_url, cert=cert, verify=ca_cert, timeout=60)
                except requests.exceptions.SSLError:
                    # try without certificate
                    res = requests.get(info_url, timeout=60)
            except requests.exceptions.ReadTimeout:
                tmp_log.error(f"read timeout when getting data from {info_url}")
            except Exception:
                core_utils.dump_error_message(tmp_log)
            else:
                retVal = self._decode_json_response(res, info_url, tmp_log)
        elif info_url.startswith("panda_cache:"):
            try:
                publicKey, privateKey = info_url.split(":", 1)[1].split("&")
                retVal, outStr = self.communicator.get_key_pair(publicKey, privateKey)
                if retVal is None:
                    tmp_log.error(outStr)
            except Exception:
                core_utils.dump_error_message(tmp_log)
        elif info_url.startswith("panda_server:"):
            try:
                method_name = info_url.split(":", 1)[1]
                method_function = getattr(self.communicator, method_name)
                retVal, outStr = method_function()
                # NOTE(review): a falsy but non-None retVal (e.g. {} or False) is logged as an error
                # yet still reported as success below -- confirm the communicator's return contract
                if not retVal:
                    tmp_log.error(outStr)
            except Exception:
                core_utils.dump_error_message(tmp_log)
        else:
            tmp_log.error(f"unsupported protocol for {info_url}")
        return retVal is not None, retVal

    @staticmethod
    def _resolve_env_variable(info_url, tmp_log):
        """Return the value of the environment variable referenced by info_url ($VAR or ${VAR}).

        info_url is returned unchanged when it references no variable, or when the variable is
        undefined (the latter is logged as an error).
        """
        match = re.search(r"\$\{*([^\}]+)\}*", info_url)
        if match is None:
            return info_url
        var_name = match.group(1)
        if var_name not in os.environ:
            tmp_log.error(f"undefined environment variable: {var_name}")
            return info_url
        # the whole info_url is replaced by the variable's value, not only the matched part
        return os.environ[var_name]

    @staticmethod
    def _decode_json_response(res, info_url, tmp_log):
        """Return the JSON body of a 200 response, or None after logging why it is unusable."""
        if res.status_code != 200:
            tmp_log.error(f"failed to get {info_url} with StatusCode={res.status_code} {res.text}")
            return None
        try:
            return res.json()
        except Exception:
            tmp_log.error(f"corrupted json from {info_url} : {res.text}")
            return None

    # set single mode
    def set_single_mode(self, single_mode):
        """Set the single-mode flag (stored as self.singleMode)."""
        self.singleMode = single_mode