File indexing completed on 2026-04-20 07:58:58
0001 import functools
0002
0003 import rpyc
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006
0007 from .ssh_tunnel_pool import sshTunnelPool
0008
0009
# Module logger shared by every RpcHerder method.
_logger = core_utils.setup_logger("rpc_herder")

# Process-wide rpyc defaults. This mutates rpyc's global DEFAULT_CONFIG at
# import time, so it affects every rpyc connection made in this process, not
# only the ones opened by RpcHerder.
# NOTE(review): allow_pickle lets peers exchange pickled objects; unpickling
# can execute arbitrary code, so this assumes the remote end is fully trusted.
# sync_request_timeout bounds how long a blocking remote call waits for its
# reply.
rpyc.core.protocol.DEFAULT_CONFIG.update(
    {
        "allow_pickle": True,
        "sync_request_timeout": 1800,
    }
)
0016
0017
0018
class RpcHerder(PluginBase):
    """Plugin proxy that forwards harvester plugin calls to a remote rpyc service.

    On construction, an SSH tunnel is requested from ``sshTunnelPool`` and an
    rpyc connection is opened through it. Each public plugin method below calls
    the same-named method on ``self.conn.root``. It passes
    ``self.original_config`` as the first argument, followed by its own
    arguments.

    Settings read from the instance. They are presumably populated from the
    plugin configuration by ``PluginBase.__init__``; confirm.

    - ``remoteHost``, ``remoteBindPort``, ``numTunnels``: no default. If one is
      missing, the connection attempt in ``__init__`` fails and is logged.
    - ``remotePort``, ``jumpPort``: default 22.
    - ``sshUserName``, ``sshPassword``, ``privateKey``, ``passPhrase``,
      ``jumpHost``: default None.
    - ``bareFunctions``: names of methods to run on ``self.bare_impl`` instead
      of remotely. Defaults to an empty list.
    """

    # Decorator applied to the forwarding methods below. It is defined in the
    # class body (and takes no ``self``) so it is usable at class-creation time.
    def require_alive(func):
        """Wrap a plugin method with bare-function dispatch and a liveness check.

        - If the method's name is listed in ``self.bareFunctions``, call the
          same-named method on ``self.bare_impl`` instead. ``bare_impl`` is not
          assigned anywhere in this class; presumably the code that builds the
          herder sets it. Confirm.
        - Else, if ``__init__`` managed to connect, run the method and copy its
          result into a local object with ``rpyc.utils.classic.obtain``.
        - Otherwise, log a warning and return None.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.bareFunctions is not None and func.__name__ in self.bareFunctions:
                return getattr(self.bare_impl, func.__name__)(*args, **kwargs)
            elif self.is_connected:
                retVal = func(self, *args, **kwargs)
                # Copy the result by value so callers do not hold remote proxies.
                return rpyc.utils.classic.obtain(retVal)
            else:
                tmpLog = core_utils.make_logger(_logger, method_name=func.__name__)
                tmpLog.warning(f"instance not alive; method {func.__name__} returns None")
                return None

        return wrapper

    def __init__(self, **kwarg):
        """Fill in default SSH/tunnel settings and try to connect once.

        A failed connection is logged rather than raised. The instance is then
        left with ``is_connected = False``, and every decorated method that is
        not a bare function returns None.
        """
        tmpLog = core_utils.make_logger(_logger, method_name="__init__")
        PluginBase.__init__(self, **kwarg)
        # Provide defaults for optional settings the configuration did not set.
        self.sshUserName = getattr(self, "sshUserName", None)
        self.sshPassword = getattr(self, "sshPassword", None)
        self.privateKey = getattr(self, "privateKey", None)
        self.passPhrase = getattr(self, "passPhrase", None)
        self.jumpHost = getattr(self, "jumpHost", None)
        self.jumpPort = getattr(self, "jumpPort", 22)
        self.remotePort = getattr(self, "remotePort", 22)
        self.bareFunctions = getattr(self, "bareFunctions", [])

        # Connection state is decided only here. No later call re-checks it or
        # resets it if the connection drops.
        self.is_connected = False
        try:
            self._get_connection()
        except Exception as e:
            tmpLog.error(f"failed to get connection ; {e.__class__.__name__}: {e}")
        else:
            self.is_connected = True

    def _get_connection(self):
        """Set up SSH tunnels to the remote host and open ``self.conn`` through one of them.

        Exceptions from the tunnel pool or from ``rpyc.connect`` propagate to
        the caller.
        """
        tmpLog = core_utils.make_logger(_logger, method_name="_get_connection")
        tmpLog.debug("start")
        sshTunnelPool.make_tunnel_server(
            self.remoteHost,
            self.remotePort,
            self.remoteBindPort,
            self.numTunnels,
            ssh_username=self.sshUserName,
            ssh_password=self.sshPassword,
            private_key=self.privateKey,
            pass_phrase=self.passPhrase,
            jump_host=self.jumpHost,
            jump_port=self.jumpPort,
        )
        # The third element (tunnel core object) is not needed here.
        tunnelHost, tunnelPort, _ = sshTunnelPool.get_tunnel(self.remoteHost, self.remotePort)
        # NOTE(review): these flags let the peer read, set and delete any
        # attribute of objects passed across the connection. This assumes the
        # remote end is fully trusted.
        self.conn = rpyc.connect(tunnelHost, tunnelPort, config={"allow_all_attrs": True, "allow_setattr": True, "allow_delattr": True})
        tmpLog.debug(f"connected successfully to {tunnelHost}:{tunnelPort}")

    def _call_remote(self, method_name, *args, token=None):
        """Call ``method_name`` on the remote service and return its result.

        This is the shared body of every forwarding method below.

        :param method_name: name of the method on ``self.conn.root``. It is also
            used as the logger's method name.
        :param args: positional arguments passed after ``self.original_config``.
        :param token: optional logger token (e.g. ``workerID=...``). When None,
            the logger is created without a token.
        :return: the remote result, or None if the call raised. The exception is
            logged via ``core_utils.dump_error_message``.
        """
        if token is None:
            tmpLog = core_utils.make_logger(_logger, method_name=method_name)
        else:
            tmpLog = core_utils.make_logger(_logger, token, method_name=method_name)
        tmpLog.debug("start")
        try:
            ret = getattr(self.conn.root, method_name)(self.original_config, *args)
        except Exception:
            core_utils.dump_error_message(tmpLog)
            ret = None
        else:
            tmpLog.debug("done")
        return ret

    # Forwarding methods. Each one calls the same-named remote method through
    # _call_remote. Methods taking a single workspec (except kill_worker and
    # sweep_worker) tag their log lines with the worker ID.

    @require_alive
    def submit_workers(self, workspec_list):
        """Forward ``submit_workers`` to the remote service."""
        return self._call_remote("submit_workers", workspec_list)

    @require_alive
    def check_workers(self, workspec_list):
        """Forward ``check_workers`` to the remote service."""
        return self._call_remote("check_workers", workspec_list)

    @require_alive
    def kill_worker(self, workspec):
        """Forward ``kill_worker`` to the remote service."""
        return self._call_remote("kill_worker", workspec)

    @require_alive
    def sweep_worker(self, workspec):
        """Forward ``sweep_worker`` to the remote service."""
        return self._call_remote("sweep_worker", workspec)

    @require_alive
    def setup_access_points(self, workspec_list):
        """Forward ``setup_access_points`` to the remote service."""
        return self._call_remote("setup_access_points", workspec_list)

    @require_alive
    def feed_jobs(self, workspec, jobspec_list):
        """Forward ``feed_jobs`` to the remote service."""
        return self._call_remote("feed_jobs", workspec, jobspec_list, token=f"workerID={workspec.workerID}")

    @require_alive
    def job_requested(self, workspec):
        """Forward ``job_requested`` to the remote service."""
        return self._call_remote("job_requested", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def kill_requested(self, workspec):
        """Forward ``kill_requested`` to the remote service."""
        return self._call_remote("kill_requested", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def is_alive(self, workspec, worker_heartbeat_limit):
        """Forward ``is_alive`` to the remote service."""
        return self._call_remote("is_alive", workspec, worker_heartbeat_limit, token=f"workerID={workspec.workerID}")

    @require_alive
    def get_work_attributes(self, workspec):
        """Forward ``get_work_attributes`` to the remote service."""
        return self._call_remote("get_work_attributes", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def get_files_to_stage_out(self, workspec):
        """Forward ``get_files_to_stage_out`` to the remote service."""
        return self._call_remote("get_files_to_stage_out", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def feed_events(self, workspec, events_dict):
        """Forward ``feed_events`` to the remote service."""
        return self._call_remote("feed_events", workspec, events_dict, token=f"workerID={workspec.workerID}")

    @require_alive
    def events_to_update(self, workspec):
        """Forward ``events_to_update`` to the remote service."""
        return self._call_remote("events_to_update", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def events_requested(self, workspec):
        """Forward ``events_requested`` to the remote service."""
        return self._call_remote("events_requested", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def get_panda_ids(self, workspec):
        """Forward ``get_panda_ids`` to the remote service."""
        return self._call_remote("get_panda_ids", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def post_processing(self, workspec, jobspec_list, map_type):
        """Forward ``post_processing`` to the remote service."""
        return self._call_remote("post_processing", workspec, jobspec_list, map_type, token=f"workerID={workspec.workerID}")

    @require_alive
    def acknowledge_events_files(self, workspec):
        """Forward ``acknowledge_events_files`` to the remote service."""
        return self._call_remote("acknowledge_events_files", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def clean_up(self, workspec):
        """Forward ``clean_up`` to the remote service."""
        return self._call_remote("clean_up", workspec, token=f"workerID={workspec.workerID}")

    @require_alive
    def check_stage_out_status(self, jobspec):
        """Forward ``check_stage_out_status`` to the remote service."""
        return self._call_remote("check_stage_out_status", jobspec)

    @require_alive
    def trigger_stage_out(self, jobspec):
        """Forward ``trigger_stage_out`` to the remote service."""
        return self._call_remote("trigger_stage_out", jobspec)

    @require_alive
    def zip_output(self, jobspec):
        """Forward ``zip_output`` to the remote service."""
        return self._call_remote("zip_output", jobspec)

    @require_alive
    def check_stage_in_status(self, jobspec):
        """Forward ``check_stage_in_status`` to the remote service."""
        return self._call_remote("check_stage_in_status", jobspec)

    @require_alive
    def trigger_preparation(self, jobspec):
        """Forward ``trigger_preparation`` to the remote service."""
        return self._call_remote("trigger_preparation", jobspec)

    @require_alive
    def resolve_input_paths(self, jobspec):
        """Forward ``resolve_input_paths`` to the remote service."""
        return self._call_remote("resolve_input_paths", jobspec)