Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import itertools
0002 import re
0003 
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009 
0010 # logger
0011 _logger = core_utils.setup_logger("file_syncer")
0012 
0013 
# file syncer
class FileSyncer(AgentBase):
    """Agent that periodically runs file-syncer plugins.

    Plugins come from two places: the ``file_syncer.pluginConfigs`` entry of the
    harvester configuration (loaded once in the constructor) and the
    ``file_syncer`` list of each non-offline queue configuration (reloaded on
    every cycle of :meth:`run`).
    """

    # matches ${name} placeholders in queue-level plugin parameters
    _PLACEHOLDER_RE = re.compile(r"\$\{([a-zA-Z\d_.]+)\}")
    # prefix of placeholders resolved from the "common" block of a queue config
    _COMMON_PREFIX = "common."
    # fallback used when file_syncer.sleepTime is not configured
    _DEFAULT_SLEEP_TIME = 10

    # constructor
    def __init__(self, queue_config_mapper, single_mode=False):
        """Set up the plugin factory and DB proxy, then load all plugin cores.

        :param queue_config_mapper: object whose ``get_all_queues()`` returns a
            mapping of queue name to queue configuration
        :param single_mode: forwarded unchanged to ``AgentBase.__init__``
        """
        AgentBase.__init__(self, single_mode)
        self.queue_config_mapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        self.dbProxy = DBProxy()
        # plugin cores
        self.exe_cores = []
        self.queue_exe_cores = []
        # get plugin from harvester config
        self.get_cores_from_harvester_config()
        # update plugin cores from queue config
        self.update_cores_from_queue_config()

    # get list
    def get_list(self, data):
        """Return ``data`` unchanged if it is a list, otherwise wrap it in one."""
        if isinstance(data, list):
            return data
        return [data]

    # get sleep time
    def _get_sleep_time(self):
        """Return ``file_syncer.sleepTime`` from the harvester config, or the default."""
        section = getattr(harvester_config, "file_syncer", None)
        return getattr(section, "sleepTime", self._DEFAULT_SLEEP_TIME)

    # resolve one placeholder
    def _lookup_placeholder(self, name, queue_name, queue_config):
        """Return the value for placeholder ``name``, or None when it is unknown.

        Supported names are ``harvesterID``, ``queueName`` and ``common.<key>``,
        the latter being looked up in ``queue_config.common``.
        """
        if name == "harvesterID":
            return harvester_config.master.harvester_id
        if name == "queueName":
            return queue_name
        if name.startswith(self._COMMON_PREFIX):
            # values from common blocks; strip only the leading prefix
            attr = name[len(self._COMMON_PREFIX):]
            common = getattr(queue_config, "common", None)
            if common and attr in common:
                return common[attr]
        return None

    # replace placeholders
    def _expand_placeholders(self, text, queue_name, queue_config):
        """Return ``text`` with every resolvable ``${...}`` placeholder substituted.

        Placeholders that cannot be resolved are left in the string untouched.
        """

        def substitute(match):
            resolved = self._lookup_placeholder(match.group(1), queue_name, queue_config)
            if resolved is None:
                return match.group(0)
            # resolved values are not guaranteed to be strings
            return str(resolved)

        return self._PLACEHOLDER_RE.sub(substitute, text)

    # get plugin cores from harvester config
    def get_cores_from_harvester_config(self):
        """Instantiate one plugin per setup listed in ``file_syncer.pluginConfigs``.

        Each entry provides ``module``, ``name`` and a ``configs`` mapping of
        setup name to extra parameters. Failures are logged and do not prevent
        the remaining setups from being loaded.
        """
        # direct and merged plugin configuration in json
        section = getattr(harvester_config, "file_syncer", None)
        plugin_configs = getattr(section, "pluginConfigs", None) or []
        # from plugin_configs
        for pc in plugin_configs:
            try:
                setup_maps = pc["configs"]
                for setup_name, setup_map in setup_maps.items():
                    # reset so the error message never shows a stale or unbound value
                    plugin_params = None
                    try:
                        plugin_params = {"module": pc["module"], "name": pc["name"], "setup_name": setup_name}
                        plugin_params.update(setup_map)
                        exe_core = self.pluginFactory.get_plugin(plugin_params)
                        self.exe_cores.append(exe_core)
                    except Exception:
                        shown = plugin_params if plugin_params is not None else setup_name
                        _logger.error(f"failed to launch file_syncer in pluginConfigs for {shown}")
                        core_utils.dump_error_message(_logger)
            except Exception:
                _logger.error(f"failed to parse pluginConfigs {pc}")
                core_utils.dump_error_message(_logger)

    # update plugin cores from queue config
    def update_cores_from_queue_config(self):
        """Rebuild ``self.queue_exe_cores`` from the ``file_syncer`` list of each queue.

        Queues that are offline or have no ``file_syncer`` list are skipped.
        String parameters may contain ``${harvesterID}``, ``${queueName}`` or
        ``${common.<key>}`` placeholders, which are expanded before the plugin is
        created. Failures are logged per setup and do not stop the others.
        """
        # build into a local list and swap at the end so the attribute is never half-filled
        queue_exe_cores = []
        for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items():
            if queue_config.queueStatus == "offline":
                continue
            setups = getattr(queue_config, "file_syncer", None)
            if not isinstance(setups, list):
                continue
            for cm_setup in setups:
                # reset so the error message never shows a stale or unbound value
                plugin_params = None
                try:
                    plugin_params = {"module": cm_setup["module"], "name": cm_setup["name"], "setup_name": queue_name, "queueName": queue_name}
                    for k, v in cm_setup.items():
                        if k in ("module", "name"):
                            # already taken verbatim above
                            continue
                        if isinstance(v, str) and "$" in v:
                            # replace placeholders
                            plugin_params[k] = self._expand_placeholders(v, queue_name, queue_config)
                        else:
                            # fill in
                            plugin_params[k] = v
                    exe_core = self.pluginFactory.get_plugin(plugin_params)
                    queue_exe_cores.append(exe_core)
                except Exception:
                    shown = plugin_params if plugin_params is not None else cm_setup
                    _logger.error(f"failed to launch plugin for queue={queue_name} and {shown}")
                    core_utils.dump_error_message(_logger)
        self.queue_exe_cores = queue_exe_cores

    # main loop
    def run(self):
        """Refresh queue plugins and execute them repeatedly until terminated."""
        while True:
            # update plugin cores from queue config
            self.update_cores_from_queue_config()
            # execute
            self.execute()  # this is the main run
            # check if being terminated
            if self.terminated(self._get_sleep_time(), randomize=False):
                return

    # main
    def execute(self):
        """Run one check/update pass over every loaded plugin.

        Nothing is done unless the "file_syncer" process lock is obtained from
        the DB proxy. For each plugin, ``check()`` is called and ``update()``
        only runs when the check result is truthy; ``update()`` is expected to
        return a ``(status, output)`` pair. Exceptions raised by a plugin are
        logged and do not stop the remaining plugins.
        """
        # get lock
        locked = self.dbProxy.get_process_lock("file_syncer", self.get_pid(), self._get_sleep_time())
        if not locked:
            return
        # loop over all plugins
        for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
            # do nothing
            if exe_core is None:
                continue
            # make logger
            file_syncer_name = exe_core.setup_name
            mainLog = self.make_logger(_logger, f"{exe_core.__class__.__name__} {file_syncer_name}", method_name="execute")
            try:
                # check freshness
                mainLog.debug("check")
                to_update = exe_core.check()
                if not to_update:
                    mainLog.debug("no need to update, skip")
                else:
                    # update if necessary
                    mainLog.debug("updating")
                    tmpStat, tmpOut = exe_core.update()
                    if not tmpStat:
                        mainLog.error(f"failed : {tmpOut}")
                        continue
                    mainLog.debug("updated")
            except Exception:
                core_utils.dump_error_message(mainLog)
            mainLog.debug("done")