File indexing completed on 2026-04-20 07:58:58
0001 import argparse
0002 import logging
0003 import os
0004
0005 import daemon
0006 import daemon.pidfile
0007 import rpyc
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009
0010
# Module-level rpyc wire-protocol tuning, applied to every connection.
# allow_pickle lets pickled objects (workspec/jobspec instances) cross the
# RPC boundary.  NOTE(review): pickle over the wire is only safe on a
# trusted network — confirm the deployment assumes that.
rpyc.core.protocol.DEFAULT_CONFIG["allow_pickle"] = True
# Generous sync timeout (1800 s = 30 min) since plugin calls can be slow.
rpyc.core.protocol.DEFAULT_CONFIG["sync_request_timeout"] = 1800
0013
0014
0015
def setupLogger(logger, pid=None, to_file=None):
    """Attach a formatted handler to *logger*.

    :param logger: logging.Logger instance to configure.
    :param pid: process id interpolated into every record's format string.
    :param to_file: path for a FileHandler; when None, log to stderr via
        a StreamHandler.
    """
    if to_file is not None:
        hdlr = logging.FileHandler(to_file)
    else:
        hdlr = logging.StreamHandler()
    # pid is fixed for the lifetime of the process, so the formatter can be
    # built and installed once instead of being recreated on every emit()
    # call (the previous emit-decorator did exactly that for no benefit).
    formatter = logging.Formatter(f"%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
0032
0033
0034
class RpcBot(rpyc.Service):
    """RPC service exposing harvester plugin actions to remote callers.

    Each ``exposed_*`` method resolves the plugin described by
    *plugin_config* via the per-connection PluginFactory and delegates the
    call to the plugin method of the same name.  All exposed signatures are
    unchanged from the original flat implementation; the shared
    resolve-and-dispatch step lives in ``_invoke``.
    """

    def on_connect(self, conn):
        # One factory per connection; no_db=True keeps the bot free of any
        # database access.
        self.pluginFactory = PluginFactory(no_db=True)

    def _invoke(self, plugin_config, method_name, *args):
        # Resolve the plugin instance for this config and delegate.
        core = self.pluginFactory.get_plugin(plugin_config)
        return getattr(core, method_name)(*args)

    # --- submitter / monitor / sweeper ---

    def exposed_submit_workers(self, plugin_config, workspec_list):
        return self._invoke(plugin_config, "submit_workers", workspec_list)

    def exposed_check_workers(self, plugin_config, workspec_list):
        return self._invoke(plugin_config, "check_workers", workspec_list)

    def exposed_kill_worker(self, plugin_config, workspec):
        return self._invoke(plugin_config, "kill_worker", workspec)

    def exposed_kill_workers(self, plugin_config, workspec_list):
        return self._invoke(plugin_config, "kill_workers", workspec_list)

    def exposed_sweep_worker(self, plugin_config, workspec):
        return self._invoke(plugin_config, "sweep_worker", workspec)

    # --- messenger ---

    def exposed_setup_access_points(self, plugin_config, workspec_list):
        return self._invoke(plugin_config, "setup_access_points", workspec_list)

    def exposed_feed_jobs(self, plugin_config, workspec, jobspec_list):
        return self._invoke(plugin_config, "feed_jobs", workspec, jobspec_list)

    def exposed_job_requested(self, plugin_config, workspec):
        return self._invoke(plugin_config, "job_requested", workspec)

    def exposed_kill_requested(self, plugin_config, workspec):
        return self._invoke(plugin_config, "kill_requested", workspec)

    def exposed_is_alive(self, plugin_config, workspec, worker_heartbeat_limit):
        return self._invoke(plugin_config, "is_alive", workspec, worker_heartbeat_limit)

    def exposed_get_work_attributes(self, plugin_config, workspec):
        return self._invoke(plugin_config, "get_work_attributes", workspec)

    def exposed_get_files_to_stage_out(self, plugin_config, workspec):
        return self._invoke(plugin_config, "get_files_to_stage_out", workspec)

    def exposed_feed_events(self, plugin_config, workspec, events_dict):
        return self._invoke(plugin_config, "feed_events", workspec, events_dict)

    def exposed_events_to_update(self, plugin_config, workspec):
        return self._invoke(plugin_config, "events_to_update", workspec)

    def exposed_events_requested(self, plugin_config, workspec):
        return self._invoke(plugin_config, "events_requested", workspec)

    def exposed_get_panda_ids(self, plugin_config, workspec):
        return self._invoke(plugin_config, "get_panda_ids", workspec)

    def exposed_post_processing(self, plugin_config, workspec, jobspec_list, map_type):
        return self._invoke(plugin_config, "post_processing", workspec, jobspec_list, map_type)

    def exposed_acknowledge_events_files(self, plugin_config, workspec):
        return self._invoke(plugin_config, "acknowledge_events_files", workspec)

    def exposed_clean_up(self, plugin_config, workspec):
        return self._invoke(plugin_config, "clean_up", workspec)

    # --- stage-out ---

    def exposed_check_stage_out_status(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "check_stage_out_status", jobspec)

    def exposed_trigger_stage_out(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "trigger_stage_out", jobspec)

    def exposed_zip_output(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "zip_output", jobspec)

    # --- stage-in ---

    def exposed_check_stage_in_status(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "check_stage_in_status", jobspec)

    def exposed_trigger_preparation(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "trigger_preparation", jobspec)

    def exposed_resolve_input_paths(self, plugin_config, jobspec):
        return self._invoke(plugin_config, "resolve_input_paths", jobspec)
0182
0183
0184
def main():
    """Parse command-line options and run the RPC bot as a daemon.

    Starts a ThreadPoolServer hosting RpcBot inside a python-daemon
    DaemonContext, redirecting stdout/stderr to the configured files and
    guarding against concurrent starts with a PID lock file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pid", action="store", dest="pid", default="/var/tmp/harvester_rpc.pid", help="pid filename")
    parser.add_argument("--port", dest="port", type=int, default=18861, help="the TCP port to bind to")
    parser.add_argument("--backlog", dest="backlog", type=int, default=10, help="backlog for the port")
    parser.add_argument("--stdout", action="store", dest="stdout", default="/var/tmp/harvester_rpc.out", help="stdout filename")
    parser.add_argument("--stderr", action="store", dest="stderr", default="/var/tmp/harvester_rpc.err", help="stderr filename")
    options = parser.parse_args()

    _logger = logging.getLogger("rpc_bot")
    setupLogger(_logger, pid=os.getpid())

    # Keep the stdout/stderr redirect files open for the lifetime of the
    # daemon context; the with-statement guarantees they are closed even if
    # DaemonContext construction or the server raises (the previous code
    # leaked the handles on that path).
    with open(options.stdout, "a+") as outfile, open(options.stderr, "a+") as errfile:
        dc = daemon.DaemonContext(pidfile=daemon.pidfile.PIDLockFile(options.pid), stdout=outfile, stderr=errfile)

        with dc:
            # Imported lazily so the import happens in the daemonized process.
            from rpyc.utils.server import ThreadPoolServer

            t = ThreadPoolServer(RpcBot, port=options.port, backlog=options.backlog, logger=_logger, protocol_config={"allow_all_attrs": True})
            t.start()
0210
0211
# Script entry point: start the RPC daemon when executed directly.
if __name__ == "__main__":
    main()