File indexing completed on 2026-04-20 07:58:56
0001 import itertools
0002 import re
0003
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009
0010
# Module-level logger for the file_syncer agent; also used as the base
# logger for the per-plugin loggers created in FileSyncer.execute().
_logger = core_utils.setup_logger("file_syncer")
0012
0013
0014
class FileSyncer(AgentBase):
    """Agent that periodically runs file-syncer plugins.

    Plugins ("exe cores") are collected from two sources:

    * ``harvester_config.file_syncer.pluginConfigs`` -- loaded once at
      construction time into ``self.exe_cores``;
    * the ``file_syncer`` list of every non-offline queue configuration --
      rebuilt on every cycle into ``self.queue_exe_cores``.

    On each cycle every plugin's ``check()`` is called and, when it returns a
    truthy value, its ``update()`` is called.
    """

    # Fallback used when the configuration does not define ``sleepTime``.
    _DEFAULT_SLEEP_TIME = 10

    # Matches placeholders such as ``${queueName}`` or ``${common.some_key}``.
    # Raw string: ``\$``, ``\{`` and ``\d`` are invalid escapes in a normal
    # string literal and trigger warnings on recent Python versions.
    _PLACEHOLDER_PATTERN = re.compile(r"\$\{([a-zA-Z\d_.]+)\}")

    def __init__(self, queue_config_mapper, single_mode=False):
        """Create the agent and load the initial set of plugins.

        :param queue_config_mapper: object providing ``get_all_queues()``,
            which returns a mapping of queue name to queue configuration.
        :param single_mode: forwarded unchanged to ``AgentBase.__init__``.
        """
        AgentBase.__init__(self, single_mode)
        self.queue_config_mapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        self.dbProxy = DBProxy()
        # plugins defined in the harvester configuration (loaded once)
        self.exe_cores = []
        # plugins defined in queue configurations (rebuilt on every cycle)
        self.queue_exe_cores = []
        self.get_cores_from_harvester_config()
        self.update_cores_from_queue_config()

    def get_list(self, data):
        """Return ``data`` itself if it is a list, otherwise ``[data]``."""
        if isinstance(data, list):
            return data
        return [data]

    def _get_sleep_time(self):
        """Return the configured ``sleepTime`` or the default when it is not set."""
        section = getattr(harvester_config, "file_syncer", None)
        return getattr(section, "sleepTime", self._DEFAULT_SLEEP_TIME)

    def get_cores_from_harvester_config(self):
        """Instantiate the plugins listed in ``file_syncer.pluginConfigs``.

        Each entry is expected to provide ``module``, ``name`` and a
        ``configs`` mapping of setup name to extra plugin parameters.
        A failure for one setup or one entry is logged and does not prevent
        the remaining ones from being loaded.
        """
        section = getattr(harvester_config, "file_syncer", None)
        plugin_configs = getattr(section, "pluginConfigs", [])

        for pc in plugin_configs:
            try:
                setup_maps = pc["configs"]
                for setup_name, setup_map in setup_maps.items():
                    # Reset per setup so the error log below never reports
                    # parameters left over from a previous iteration.
                    plugin_params = None
                    try:
                        plugin_params = {"module": pc["module"], "name": pc["name"], "setup_name": setup_name}
                        plugin_params.update(setup_map)
                        exe_core = self.pluginFactory.get_plugin(plugin_params)
                        self.exe_cores.append(exe_core)
                    except Exception:
                        _logger.error(f"failed to launch file_syncer in pluginConfigs for {plugin_params}")
                        core_utils.dump_error_message(_logger)
            except Exception:
                _logger.error(f"failed to parse pluginConfigs {pc}")
                core_utils.dump_error_message(_logger)

    def _resolve_placeholders(self, raw_value, queue_name, queue_config):
        """Return ``raw_value`` with every known ``${...}`` placeholder substituted.

        Recognised placeholders are ``${harvesterID}``, ``${queueName}`` and
        ``${common.<key>}`` (looked up in ``queue_config.common``).  Unknown
        placeholders, and those resolving to ``None``, are left untouched.
        """
        common_prefix = "common."
        resolved = raw_value
        for patt in self._PLACEHOLDER_PATTERN.findall(raw_value):
            tmp_val = None
            if patt == "harvesterID":
                tmp_val = harvester_config.master.harvester_id
            elif patt == "queueName":
                tmp_val = queue_name
            elif patt.startswith(common_prefix):
                # strip only the leading prefix, not later occurrences
                attr = patt[len(common_prefix):]
                common = getattr(queue_config, "common", None)
                if common and attr in common:
                    tmp_val = common[attr]
            if tmp_val is not None:
                # str() so that non-string values (e.g. numbers) can be
                # substituted instead of raising TypeError
                resolved = resolved.replace("${" + patt + "}", str(tmp_val))
        return resolved

    def update_cores_from_queue_config(self):
        """Rebuild ``self.queue_exe_cores`` from the current queue configurations.

        Queues that are offline, or whose ``file_syncer`` attribute is missing
        or not a list, are skipped.  String parameters containing ``$`` go
        through placeholder substitution.  A failure for one setup is logged
        and does not affect the other setups.
        """
        self.queue_exe_cores = []
        for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items():
            if queue_config.queueStatus == "offline":
                continue
            setups = getattr(queue_config, "file_syncer", None)
            if not isinstance(setups, list):
                continue
            for cm_setup in setups:
                # Reset per setup: if building the parameters fails, the
                # except block must not hit an unbound (or stale) variable.
                plugin_params = None
                try:
                    plugin_params = {"module": cm_setup["module"], "name": cm_setup["name"], "setup_name": queue_name, "queueName": queue_name}
                    # Every key, including "module" and "name", is copied and
                    # goes through the same substitution.
                    for k, v in cm_setup.items():
                        if isinstance(v, str) and "$" in v:
                            plugin_params[k] = self._resolve_placeholders(v, queue_name, queue_config)
                        else:
                            plugin_params[k] = v
                    exe_core = self.pluginFactory.get_plugin(plugin_params)
                    self.queue_exe_cores.append(exe_core)
                except Exception:
                    _logger.error(f"failed to launch plugin for queue={queue_name} and {plugin_params}")
                    core_utils.dump_error_message(_logger)

    def run(self):
        """Main loop: refresh queue plugins, execute them, then wait.

        The loop ends when ``self.terminated(...)`` returns a truthy value.
        """
        while True:
            # pick up changes in queue configurations
            self.update_cores_from_queue_config()
            self.execute()
            # NOTE(review): terminated() presumably waits up to the given
            # time and reports whether the agent should stop -- confirm
            # against AgentBase.
            if self.terminated(self._get_sleep_time(), randomize=False):
                return

    def execute(self):
        """Run ``check()`` and, if needed, ``update()`` on every loaded plugin.

        Nothing is done unless the ``file_syncer`` process lock is obtained.
        An exception raised by one plugin is logged and the next plugin is
        still processed.
        """
        # NOTE(review): the third argument is presumably the lock interval
        # in seconds -- confirm against DBProxy.get_process_lock.
        locked = self.dbProxy.get_process_lock("file_syncer", self.get_pid(), self._get_sleep_time())
        if not locked:
            return

        for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores):
            # entries may be None; skip them
            if exe_core is None:
                continue

            file_syncer_name = exe_core.setup_name
            mainLog = self.make_logger(_logger, f"{exe_core.__class__.__name__} {file_syncer_name}", method_name="execute")
            try:
                mainLog.debug("check")
                to_update = exe_core.check()
                if not to_update:
                    mainLog.debug("no need to update, skip")
                else:
                    mainLog.debug("updating")
                    tmpStat, tmpOut = exe_core.update()
                    if not tmpStat:
                        mainLog.error(f"failed : {tmpOut}")
                        continue
                    mainLog.debug("updated")
            except Exception:
                core_utils.dump_error_message(mainLog)
            mainLog.debug("done")