Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import functools
0002 
0003 import rpyc
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 
0007 from .ssh_tunnel_pool import sshTunnelPool
0008 
# module-level logger used as the base for the per-method loggers in this file
_logger = core_utils.setup_logger("rpc_herder")


# rpyc configuration
# These entries are written into rpyc's module-level DEFAULT_CONFIG dict at import time.
# NOTE(review): allow_pickle permits pickled objects to cross the connection; that is
# only safe when the remote peer is trusted -- confirm this holds for every deployment.
# NOTE(review): sync_request_timeout is presumably in seconds (1800 = 30 minutes) -- confirm.
rpyc.core.protocol.DEFAULT_CONFIG.update(
    {
        "allow_pickle": True,
        "sync_request_timeout": 1800,
    }
)
0016 
0017 
0018 # RPC herder
0019 class RpcHerder(PluginBase):
0020     # decorator
0021     def require_alive(func):
0022         @functools.wraps(func)
0023         def wrapper(self, *args, **kwargs):
0024             if self.bareFunctions is not None and func.__name__ in self.bareFunctions:
0025                 return getattr(self.bare_impl, func.__name__)(*args, **kwargs)
0026             elif self.is_connected:
0027                 retVal = func(self, *args, **kwargs)
0028                 return rpyc.utils.classic.obtain(retVal)
0029             else:
0030                 tmpLog = core_utils.make_logger(_logger, method_name=func.__name__)
0031                 tmpLog.warning(f"instance not alive; method {func.__name__} returns None")
0032                 return None
0033 
0034         return wrapper
0035 
0036     # constructor
0037     def __init__(self, **kwarg):
0038         tmpLog = core_utils.make_logger(_logger, method_name="__init__")
0039         PluginBase.__init__(self, **kwarg)
0040         self.sshUserName = getattr(self, "sshUserName", None)
0041         self.sshPassword = getattr(self, "sshPassword", None)
0042         self.privateKey = getattr(self, "privateKey", None)
0043         self.passPhrase = getattr(self, "passPhrase", None)
0044         self.jumpHost = getattr(self, "jumpHost", None)
0045         self.jumpPort = getattr(self, "jumpPort", 22)
0046         self.remotePort = getattr(self, "remotePort", 22)
0047         self.bareFunctions = getattr(self, "bareFunctions", list())
0048         # is connected only if ssh forwarding works
0049         self.is_connected = False
0050         try:
0051             self._get_connection()
0052         except Exception as e:
0053             tmpLog.error(f"failed to get connection ; {e.__class__.__name__}: {e}")
0054         else:
0055             self.is_connected = True
0056 
0057     # ssh and rpc connect
0058     def _get_connection(self):
0059         tmpLog = core_utils.make_logger(_logger, method_name="_get_connection")
0060         tmpLog.debug("start")
0061         sshTunnelPool.make_tunnel_server(
0062             self.remoteHost,
0063             self.remotePort,
0064             self.remoteBindPort,
0065             self.numTunnels,
0066             ssh_username=self.sshUserName,
0067             ssh_password=self.sshPassword,
0068             private_key=self.privateKey,
0069             pass_phrase=self.passPhrase,
0070             jump_host=self.jumpHost,
0071             jump_port=self.jumpPort,
0072         )
0073         tunnelHost, tunnelPort, tunnelCore = sshTunnelPool.get_tunnel(self.remoteHost, self.remotePort)
0074         self.conn = rpyc.connect(tunnelHost, tunnelPort, config={"allow_all_attrs": True, "allow_setattr": True, "allow_delattr": True})
0075         tmpLog.debug(f"connected successfully to {tunnelHost}:{tunnelPort}")
0076 
0077     ######################
0078     # submitter section
0079 
0080     # submit workers
0081     @require_alive
0082     def submit_workers(self, workspec_list):
0083         tmpLog = core_utils.make_logger(_logger, method_name="submit_workers")
0084         tmpLog.debug("start")
0085         try:
0086             ret = self.conn.root.submit_workers(self.original_config, workspec_list)
0087         except Exception:
0088             core_utils.dump_error_message(tmpLog)
0089             ret = None
0090         else:
0091             tmpLog.debug("done")
0092         return ret
0093 
0094     ######################
0095     # monitor section
0096 
0097     # check workers
0098     @require_alive
0099     def check_workers(self, workspec_list):
0100         tmpLog = core_utils.make_logger(_logger, method_name="check_workers")
0101         tmpLog.debug("start")
0102         try:
0103             ret = self.conn.root.check_workers(self.original_config, workspec_list)
0104         except Exception:
0105             core_utils.dump_error_message(tmpLog)
0106             ret = None
0107         else:
0108             tmpLog.debug("done")
0109         return ret
0110 
0111     ######################
0112     # sweeper section
0113 
0114     # kill worker
0115     @require_alive
0116     def kill_worker(self, workspec):
0117         tmpLog = core_utils.make_logger(_logger, method_name="kill_worker")
0118         tmpLog.debug("start")
0119         try:
0120             ret = self.conn.root.kill_worker(self.original_config, workspec)
0121         except Exception:
0122             core_utils.dump_error_message(tmpLog)
0123             ret = None
0124         else:
0125             tmpLog.debug("done")
0126         return ret
0127 
    # FIXME: this method cannot be enabled yet; otherwise the sweeper agent would see it even though the real plugin may not implement it yet
0129     # kill workers
0130     # @require_alive
0131     # def kill_workers(self, workspec_list):
0132     #     tmpLog = core_utils.make_logger(_logger, method_name='kill_workers')
0133     #     tmpLog.debug('start')
0134     #     try:
0135     #         ret = self.conn.root.kill_workers(self.original_config, workspec_list)
0136     #     except Exception:
0137     #         core_utils.dump_error_message(tmpLog)
0138     #         ret = None
0139     #     else:
0140     #         tmpLog.debug('done')
0141     #     return ret
0142 
0143     # cleanup for a worker
0144     @require_alive
0145     def sweep_worker(self, workspec):
0146         tmpLog = core_utils.make_logger(_logger, method_name="sweep_worker")
0147         tmpLog.debug("start")
0148         try:
0149             ret = self.conn.root.sweep_worker(self.original_config, workspec)
0150         except Exception:
0151             core_utils.dump_error_message(tmpLog)
0152             ret = None
0153         else:
0154             tmpLog.debug("done")
0155         return ret
0156 
0157     ######################
0158     # messenger section
0159 
0160     # setup access points
0161     @require_alive
0162     def setup_access_points(self, workspec_list):
0163         tmpLog = core_utils.make_logger(_logger, method_name="setup_access_points")
0164         tmpLog.debug("start")
0165         try:
0166             ret = self.conn.root.setup_access_points(self.original_config, workspec_list)
0167         except Exception:
0168             core_utils.dump_error_message(tmpLog)
0169             ret = None
0170         else:
0171             tmpLog.debug("done")
0172         return ret
0173 
0174     # feed jobs
0175     @require_alive
0176     def feed_jobs(self, workspec, jobspec_list):
0177         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_jobs")
0178         tmpLog.debug("start")
0179         try:
0180             ret = self.conn.root.feed_jobs(self.original_config, workspec, jobspec_list)
0181         except Exception:
0182             core_utils.dump_error_message(tmpLog)
0183             ret = None
0184         else:
0185             tmpLog.debug("done")
0186         return ret
0187 
0188     # request job
0189     @require_alive
0190     def job_requested(self, workspec):
0191         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="job_requested")
0192         tmpLog.debug("start")
0193         try:
0194             ret = self.conn.root.job_requested(self.original_config, workspec)
0195         except Exception:
0196             core_utils.dump_error_message(tmpLog)
0197             ret = None
0198         else:
0199             tmpLog.debug("done")
0200         return ret
0201 
0202     # request kill
0203     @require_alive
0204     def kill_requested(self, workspec):
0205         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="kill_requested")
0206         tmpLog.debug("start")
0207         try:
0208             ret = self.conn.root.kill_requested(self.original_config, workspec)
0209         except Exception:
0210             core_utils.dump_error_message(tmpLog)
0211             ret = None
0212         else:
0213             tmpLog.debug("done")
0214         return ret
0215 
0216     # is alive
0217     @require_alive
0218     def is_alive(self, workspec, worker_heartbeat_limit):
0219         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="is_alive")
0220         tmpLog.debug("start")
0221         try:
0222             ret = self.conn.root.is_alive(self.original_config, workspec, worker_heartbeat_limit)
0223         except Exception:
0224             core_utils.dump_error_message(tmpLog)
0225             ret = None
0226         else:
0227             tmpLog.debug("done")
0228         return ret
0229 
0230     # get work attributes
0231     @require_alive
0232     def get_work_attributes(self, workspec):
0233         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_work_attributes")
0234         tmpLog.debug("start")
0235         try:
0236             ret = self.conn.root.get_work_attributes(self.original_config, workspec)
0237         except Exception:
0238             core_utils.dump_error_message(tmpLog)
0239             ret = None
0240         else:
0241             tmpLog.debug("done")
0242         return ret
0243 
0244     # get output files
0245     @require_alive
0246     def get_files_to_stage_out(self, workspec):
0247         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_files_to_stage_out")
0248         tmpLog.debug("start")
0249         try:
0250             ret = self.conn.root.get_files_to_stage_out(self.original_config, workspec)
0251         except Exception:
0252             core_utils.dump_error_message(tmpLog)
0253             ret = None
0254         else:
0255             tmpLog.debug("done")
0256         return ret
0257 
0258     # feed events
0259     @require_alive
0260     def feed_events(self, workspec, events_dict):
0261         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_events")
0262         tmpLog.debug("start")
0263         try:
0264             ret = self.conn.root.feed_events(self.original_config, workspec, events_dict)
0265         except Exception:
0266             core_utils.dump_error_message(tmpLog)
0267             ret = None
0268         else:
0269             tmpLog.debug("done")
0270         return ret
0271 
0272     # get events
0273     @require_alive
0274     def events_to_update(self, workspec):
0275         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_to_update")
0276         tmpLog.debug("start")
0277         try:
0278             ret = self.conn.root.events_to_update(self.original_config, workspec)
0279         except Exception:
0280             core_utils.dump_error_message(tmpLog)
0281             ret = None
0282         else:
0283             tmpLog.debug("done")
0284         return ret
0285 
0286     # request events
0287     @require_alive
0288     def events_requested(self, workspec):
0289         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_requested")
0290         tmpLog.debug("start")
0291         try:
0292             ret = self.conn.root.events_requested(self.original_config, workspec)
0293         except Exception:
0294             core_utils.dump_error_message(tmpLog)
0295             ret = None
0296         else:
0297             tmpLog.debug("done")
0298         return ret
0299 
0300     # get PandaIDs
0301     @require_alive
0302     def get_panda_ids(self, workspec):
0303         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_panda_ids")
0304         tmpLog.debug("start")
0305         try:
0306             ret = self.conn.root.get_panda_ids(self.original_config, workspec)
0307         except Exception:
0308             core_utils.dump_error_message(tmpLog)
0309             ret = None
0310         else:
0311             tmpLog.debug("done")
0312         return ret
0313 
0314     # post processing
0315     @require_alive
0316     def post_processing(self, workspec, jobspec_list, map_type):
0317         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="post_processing")
0318         tmpLog.debug("start")
0319         try:
0320             ret = self.conn.root.post_processing(self.original_config, workspec, jobspec_list, map_type)
0321         except Exception:
0322             core_utils.dump_error_message(tmpLog)
0323             ret = None
0324         else:
0325             tmpLog.debug("done")
0326         return ret
0327 
0328     # send ACK
0329     @require_alive
0330     def acknowledge_events_files(self, workspec):
0331         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="acknowledge_events_files")
0332         tmpLog.debug("start")
0333         try:
0334             ret = self.conn.root.acknowledge_events_files(self.original_config, workspec)
0335         except Exception:
0336             core_utils.dump_error_message(tmpLog)
0337             ret = None
0338         else:
0339             tmpLog.debug("done")
0340         return ret
0341 
0342     # clean up
0343     @require_alive
0344     def clean_up(self, workspec):
0345         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="clean_up")
0346         tmpLog.debug("start")
0347         try:
0348             ret = self.conn.root.clean_up(self.original_config, workspec)
0349         except Exception:
0350             core_utils.dump_error_message(tmpLog)
0351             ret = None
0352         else:
0353             tmpLog.debug("done")
0354         return ret
0355 
0356     ######################
0357     # stager section
0358 
0359     # check stage out status
0360     @require_alive
0361     def check_stage_out_status(self, jobspec):
0362         tmpLog = core_utils.make_logger(_logger, method_name="check_stage_out_status")
0363         tmpLog.debug("start")
0364         try:
0365             ret = self.conn.root.check_stage_out_status(self.original_config, jobspec)
0366         except Exception:
0367             core_utils.dump_error_message(tmpLog)
0368             ret = None
0369         else:
0370             tmpLog.debug("done")
0371         return ret
0372 
0373     # trigger stage out
0374     @require_alive
0375     def trigger_stage_out(self, jobspec):
0376         tmpLog = core_utils.make_logger(_logger, method_name="trigger_stage_out")
0377         tmpLog.debug("start")
0378         try:
0379             ret = self.conn.root.trigger_stage_out(self.original_config, jobspec)
0380         except Exception:
0381             core_utils.dump_error_message(tmpLog)
0382             ret = None
0383         else:
0384             tmpLog.debug("done")
0385         return ret
0386 
0387     # zip output files
0388     @require_alive
0389     def zip_output(self, jobspec):
0390         tmpLog = core_utils.make_logger(_logger, method_name="zip_output")
0391         tmpLog.debug("start")
0392         try:
0393             ret = self.conn.root.zip_output(self.original_config, jobspec)
0394         except Exception:
0395             core_utils.dump_error_message(tmpLog)
0396             ret = None
0397         else:
0398             tmpLog.debug("done")
0399         return ret
0400 
0401     ######################
0402     # preparator section
0403 
0404     # check stage in status
0405     @require_alive
0406     def check_stage_in_status(self, jobspec):
0407         tmpLog = core_utils.make_logger(_logger, method_name="check_stage_in_status")
0408         tmpLog.debug("start")
0409         try:
0410             ret = self.conn.root.check_stage_in_status(self.original_config, jobspec)
0411         except Exception:
0412             core_utils.dump_error_message(tmpLog)
0413             ret = None
0414         else:
0415             tmpLog.debug("done")
0416         return ret
0417 
0418     # trigger preparation
0419     @require_alive
0420     def trigger_preparation(self, jobspec):
0421         tmpLog = core_utils.make_logger(_logger, method_name="trigger_preparation")
0422         tmpLog.debug("start")
0423         try:
0424             ret = self.conn.root.trigger_preparation(self.original_config, jobspec)
0425         except Exception:
0426             core_utils.dump_error_message(tmpLog)
0427             ret = None
0428         else:
0429             tmpLog.debug("done")
0430         return ret
0431 
0432     # resolve input file paths
0433     @require_alive
0434     def resolve_input_paths(self, jobspec):
0435         tmpLog = core_utils.make_logger(_logger, method_name="resolve_input_paths")
0436         tmpLog.debug("start")
0437         try:
0438             ret = self.conn.root.resolve_input_paths(self.original_config, jobspec)
0439         except Exception:
0440             core_utils.dump_error_message(tmpLog)
0441             ret = None
0442         else:
0443             tmpLog.debug("done")
0444         return ret