Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import argparse
0002 import logging
0003 import os
0004 
0005 import daemon
0006 import daemon.pidfile
0007 import rpyc
0008 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0009 
0010 # rpyc configuration
0011 rpyc.core.protocol.DEFAULT_CONFIG["allow_pickle"] = True
0012 rpyc.core.protocol.DEFAULT_CONFIG["sync_request_timeout"] = 1800
0013 
0014 
0015 # logger setup
0016 def setupLogger(logger, pid=None, to_file=None):
0017     if to_file is not None:
0018         hdlr = logging.FileHandler(to_file)
0019     else:
0020         hdlr = logging.StreamHandler()
0021 
0022     def emit_decorator(fn):
0023         def func(*args):
0024             formatter = logging.Formatter(f"%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s")
0025             hdlr.setFormatter(formatter)
0026             return fn(*args)
0027 
0028         return func
0029 
0030     hdlr.emit = emit_decorator(hdlr.emit)
0031     logger.addHandler(hdlr)
0032 
0033 
0034 # RPC bot running on remote node
0035 class RpcBot(rpyc.Service):
0036     # initialization action
0037     def on_connect(self, conn):
0038         self.pluginFactory = PluginFactory(no_db=True)
0039 
0040     ######################
0041     # submitter section
0042 
0043     # submit workers
0044     def exposed_submit_workers(self, plugin_config, workspec_list):
0045         core = self.pluginFactory.get_plugin(plugin_config)
0046         return core.submit_workers(workspec_list)
0047 
0048     ######################
0049     # monitor section
0050 
0051     # check workers
0052     def exposed_check_workers(self, plugin_config, workspec_list):
0053         core = self.pluginFactory.get_plugin(plugin_config)
0054         return core.check_workers(workspec_list)
0055 
0056     ######################
0057     # sweeper section
0058 
0059     # kill worker
0060     def exposed_kill_worker(self, plugin_config, workspec):
0061         core = self.pluginFactory.get_plugin(plugin_config)
0062         return core.kill_worker(workspec)
0063 
0064     # kill workers
0065     def exposed_kill_workers(self, plugin_config, workspec_list):
0066         core = self.pluginFactory.get_plugin(plugin_config)
0067         return core.kill_workers(workspec_list)
0068 
0069     # cleanup for a worker
0070     def exposed_sweep_worker(self, plugin_config, workspec):
0071         core = self.pluginFactory.get_plugin(plugin_config)
0072         return core.sweep_worker(workspec)
0073 
0074     ######################
0075     # messenger section
0076 
0077     # setup access points
0078     def exposed_setup_access_points(self, plugin_config, workspec_list):
0079         core = self.pluginFactory.get_plugin(plugin_config)
0080         return core.setup_access_points(workspec_list)
0081 
0082     # feed jobs
0083     def exposed_feed_jobs(self, plugin_config, workspec, jobspec_list):
0084         core = self.pluginFactory.get_plugin(plugin_config)
0085         return core.feed_jobs(workspec, jobspec_list)
0086 
0087     # request job
0088     def exposed_job_requested(self, plugin_config, workspec):
0089         core = self.pluginFactory.get_plugin(plugin_config)
0090         return core.job_requested(workspec)
0091 
0092     # request kill
0093     def exposed_kill_requested(self, plugin_config, workspec):
0094         core = self.pluginFactory.get_plugin(plugin_config)
0095         return core.kill_requested(workspec)
0096 
0097     # is alive
0098     def exposed_is_alive(self, plugin_config, workspec, worker_heartbeat_limit):
0099         core = self.pluginFactory.get_plugin(plugin_config)
0100         return core.is_alive(workspec, worker_heartbeat_limit)
0101 
0102     # get work attributes
0103     def exposed_get_work_attributes(self, plugin_config, workspec):
0104         core = self.pluginFactory.get_plugin(plugin_config)
0105         return core.get_work_attributes(workspec)
0106 
0107     # get output files
0108     def exposed_get_files_to_stage_out(self, plugin_config, workspec):
0109         core = self.pluginFactory.get_plugin(plugin_config)
0110         return core.get_files_to_stage_out(workspec)
0111 
0112     # feed events
0113     def exposed_feed_events(self, plugin_config, workspec, events_dict):
0114         core = self.pluginFactory.get_plugin(plugin_config)
0115         return core.feed_events(workspec, events_dict)
0116 
0117     # get events
0118     def exposed_events_to_update(self, plugin_config, workspec):
0119         core = self.pluginFactory.get_plugin(plugin_config)
0120         return core.events_to_update(workspec)
0121 
0122     # request events
0123     def exposed_events_requested(self, plugin_config, workspec):
0124         core = self.pluginFactory.get_plugin(plugin_config)
0125         return core.events_requested(workspec)
0126 
0127     # get PandaIDs
0128     def exposed_get_panda_ids(self, plugin_config, workspec):
0129         core = self.pluginFactory.get_plugin(plugin_config)
0130         return core.get_panda_ids(workspec)
0131 
0132     # post processing
0133     def exposed_post_processing(self, plugin_config, workspec, jobspec_list, map_type):
0134         core = self.pluginFactory.get_plugin(plugin_config)
0135         return core.post_processing(workspec, jobspec_list, map_type)
0136 
0137     # send ACK
0138     def exposed_acknowledge_events_files(self, plugin_config, workspec):
0139         core = self.pluginFactory.get_plugin(plugin_config)
0140         return core.acknowledge_events_files(workspec)
0141 
0142     # clean up
0143     def exposed_clean_up(self, plugin_config, workspec):
0144         core = self.pluginFactory.get_plugin(plugin_config)
0145         return core.clean_up(workspec)
0146 
0147     ######################
0148     # stager section
0149 
0150     # check stage out status
0151     def exposed_check_stage_out_status(self, plugin_config, jobspec):
0152         core = self.pluginFactory.get_plugin(plugin_config)
0153         return core.check_stage_out_status(jobspec)
0154 
0155     # trigger stage out
0156     def exposed_trigger_stage_out(self, plugin_config, jobspec):
0157         core = self.pluginFactory.get_plugin(plugin_config)
0158         return core.trigger_stage_out(jobspec)
0159 
0160     # zip output files
0161     def exposed_zip_output(self, plugin_config, jobspec):
0162         core = self.pluginFactory.get_plugin(plugin_config)
0163         return core.zip_output(jobspec)
0164 
0165     ######################
0166     # preparator section
0167 
0168     # check stage in status
0169     def exposed_check_stage_in_status(self, plugin_config, jobspec):
0170         core = self.pluginFactory.get_plugin(plugin_config)
0171         return core.check_stage_in_status(jobspec)
0172 
0173     # trigger preparation
0174     def exposed_trigger_preparation(self, plugin_config, jobspec):
0175         core = self.pluginFactory.get_plugin(plugin_config)
0176         return core.trigger_preparation(jobspec)
0177 
0178     # resolve input file paths
0179     def exposed_resolve_input_paths(self, plugin_config, jobspec):
0180         core = self.pluginFactory.get_plugin(plugin_config)
0181         return core.resolve_input_paths(jobspec)
0182 
0183 
0184 # main body
0185 def main():
0186     # arguments
0187     parser = argparse.ArgumentParser()
0188     parser.add_argument("--pid", action="store", dest="pid", default="/var/tmp/harvester_rpc.pid", help="pid filename")
0189     parser.add_argument("--port", dest="port", type=int, default=18861, help="the TCP port to bind to")
0190     parser.add_argument("--backlog", dest="backlog", type=int, default=10, help="backlog for the port")
0191     parser.add_argument("--stdout", action="store", dest="stdout", default="/var/tmp/harvester_rpc.out", help="stdout filename")
0192     parser.add_argument("--stderr", action="store", dest="stderr", default="/var/tmp/harvester_rpc.err", help="stderr filename")
0193     options = parser.parse_args()
0194     # logger
0195     _logger = logging.getLogger("rpc_bot")
0196     setupLogger(_logger, pid=os.getpid())
0197     # make daemon context
0198     outfile = open(options.stdout, "a+")
0199     errfile = open(options.stderr, "a+")
0200     dc = daemon.DaemonContext(pidfile=daemon.pidfile.PIDLockFile(options.pid), stdout=outfile, stderr=errfile)
0201     # run thread server
0202     with dc:
0203         from rpyc.utils.server import ThreadPoolServer
0204 
0205         t = ThreadPoolServer(RpcBot, port=options.port, backlog=options.backlog, logger=_logger, protocol_config={"allow_all_attrs": True})
0206         t.start()
0207     # finalize
0208     outfile.close()
0209     errfile.close()
0210 
0211 
0212 if __name__ == "__main__":
0213     main()