File indexing completed on 2026-04-20 07:58:58
0001 import json
0002 import types
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006
0007 from .ssh_master_pool import sshMasterPool
0008
0009
# Module-level logger; Method and DirectSshHerder derive their per-call loggers from it.
_logger = core_utils.setup_logger("direct_ssh_herder")
0011
0012
0013
def is_mutable(obj):
    """Tell whether ``obj`` is a value that ``update_object`` merges in place.

    Lists, dicts and any object exposing a ``__dict__`` attribute count as
    mutable; everything else is reported as not mutable.
    """
    if isinstance(obj, list):
        return True
    if isinstance(obj, dict):
        return True
    return hasattr(obj, "__dict__")
0016
0017
0018
def _container_kind(obj):
    """Classify ``obj`` by the in-place merge strategy that applies to it.

    Returns ``"list"``, ``"dict"`` or ``"object"`` (anything exposing a
    ``__dict__``), checked in that order, or ``None`` when the value can only
    be replaced wholesale.
    """
    if isinstance(obj, list):
        return "list"
    if isinstance(obj, dict):
        return "dict"
    if hasattr(obj, "__dict__"):
        return "object"
    return None


def _sync_slot(container, key, new_value):
    """Bring ``container[key]`` in line with ``new_value``.

    The existing value is merged in place (keeping its identity) only when it
    is a container of the same kind as ``new_value``. Otherwise the slot is
    overwritten; recursing into mismatched kinds would raise, e.g. when a dict
    has been replaced by ``None`` or a list by a dict.
    """
    old_value = container[key]
    kind = _container_kind(old_value)
    if kind is not None and kind == _container_kind(new_value):
        update_object(old_value, new_value)
    else:
        container[key] = new_value


def update_object(old_obj, new_obj):
    """Copy the state of ``new_obj`` into ``old_obj`` without rebinding it.

    Only slots that already exist in ``old_obj`` are touched: list indices,
    dict keys or instance attributes that are missing from ``new_obj`` are
    left alone, and entries that exist only in ``new_obj`` are not added.
    Nested lists, dicts and objects are merged recursively so references held
    elsewhere to the old containers stay valid; a nested value whose kind
    differs from its replacement is overwritten instead of merged.

    The attributes ``isNew`` and ``new_status`` of objects are never updated.

    Args:
        old_obj: list, dict or object with ``__dict__`` to update in place.
            Any other value is left untouched.
        new_obj: value of the same kind carrying the new state.
    """
    if isinstance(old_obj, list):
        for index in range(len(old_obj)):
            try:
                new_item = new_obj[index]
            except IndexError:
                # new list is shorter: keep the old tail as it is
                continue
            _sync_slot(old_obj, index, new_item)
    elif isinstance(old_obj, dict):
        for key in old_obj:
            try:
                new_item = new_obj[key]
            except KeyError:
                continue
            # overwriting an existing key does not resize the dict, so this
            # is safe while iterating
            _sync_slot(old_obj, key, new_item)
    elif hasattr(old_obj, "__dict__"):
        old_attrs = old_obj.__dict__
        for name in old_attrs:
            try:
                new_item = new_obj.__dict__[name]
            except KeyError:
                continue
            if name in ["isNew", "new_status"]:
                # these attributes keep the local value
                continue
            _sync_slot(old_attrs, name, new_item)
0056
0057
0058
class Method(object):
    """Callable stand-in for one plugin function executed through ``conn``.

    Calling the instance sends the plugin configuration, the function name and
    the pickled arguments as JSON to ``conn`` and reads the reply from its
    standard output.
    """

    def __init__(self, plugin_config, function_name, conn):
        """Store what is needed to issue the call later.

        Args:
            plugin_config: plugin configuration sent along with every call;
                it is passed to ``json.dumps``.
            function_name: name of the function to invoke; also used as the
                logger's method name.
            conn: object offering ``communicate(input=...)`` and
                ``returncode``, or ``None`` when no connection is available.
        """
        self.plugin_config, self.function_name, self.conn = plugin_config, function_name, conn

    def __call__(self, *args, **kwargs):
        """Run the function through the connection and mirror its effects locally.

        Returns:
            The unpickled return value from the reply, or ``None`` when there
            is no connection or the process ended with a non-zero return code.

        Raises:
            The exception object unpickled from the reply when the reply
            carries an ``"exception"`` entry.
        """
        log = core_utils.make_logger(_logger, method_name=self.function_name)
        log.debug("start")

        if self.conn is None:
            log.warning(f"connection is not alive; method {self.function_name} returns None")
            return None

        request = {
            "plugin_config": self.plugin_config,
            "function_name": self.function_name,
            "args": core_utils.pickle_to_text(args),
            "kwargs": core_utils.pickle_to_text(kwargs),
        }
        payload = json.dumps(request).encode("latin_1")
        stdout, _stderr = self.conn.communicate(input=payload)

        if self.conn.returncode != 0:
            log.error(f"execution failed with {self.conn.returncode}; method={self.function_name} returns None")
            return None

        # NOTE(review): everything below unpickles text produced by the other
        # end of the connection - confirm that side is trusted.
        reply = json.loads(stdout)
        if "exception" in reply:
            remote_message = core_utils.unpickle_from_text(str(reply["dialog"]))
            log.error("Exception from remote : " + remote_message)
            raise core_utils.unpickle_from_text(str(reply["exception"]))

        # Copy changes made to the arguments on the other side back into the
        # caller's own objects, since the call worked on pickled copies.
        remote_args = core_utils.unpickle_from_text(str(reply["args"]))
        for local_arg, remote_arg in zip(args, remote_args):
            update_object(local_arg, remote_arg)

        remote_kwargs = core_utils.unpickle_from_text(str(reply["kwargs"]))
        for name, local_value in kwargs.items():
            update_object(local_value, remote_kwargs[name])

        return core_utils.unpickle_from_text(str(reply["return"]))
0099
0100
0101
class DirectSshHerder(PluginBase):
    """Plugin that routes method calls of ``bare_impl`` through an SSH connection.

    Attribute lookups that fail on the herder itself fall through to
    ``bare_impl``; its bound methods are wrapped in ``Method`` objects unless
    their names are listed in ``bareFunctions``.
    """

    def __init__(self, **kwarg):
        """Fill in defaults for the SSH settings and try to open a connection.

        Settings already present on the instance after ``PluginBase.__init__``
        are kept; the others get the defaults listed below. A failure to
        obtain the connection is logged and does not abort construction.
        """
        log = core_utils.make_logger(_logger, method_name="__init__")
        PluginBase.__init__(self, **kwarg)
        self.bare_impl = None

        # (attribute name, value used when the attribute is not set yet)
        defaults = (
            ("sshUserName", None),
            ("sshPassword", None),
            ("privateKey", None),
            ("passPhrase", None),
            ("jumpHost", None),
            ("jumpPort", 22),
            ("remoteHost", None),
            ("remotePort", 22),
            ("bareFunctions", list()),
            ("sockDir", "/tmp"),
            ("numMasters", 1),
            ("execStr", ""),
            ("connectionLifetime", None),
        )
        for name, fallback in defaults:
            setattr(self, name, getattr(self, name, fallback))

        try:
            self._get_connection()
        except Exception:
            core_utils.dump_error_message(log)
            log.error("failed to get connection")

    def __getattr__(self, item):
        """Resolve attributes that normal lookup did not find.

        Lookup order: the instance ``__dict__``; then, for names listed in
        ``bareFunctions``, the attribute of ``bare_impl`` as is; then any
        other attribute of ``bare_impl``, where bound methods are returned as
        ``Method`` objects tied to a connection from ``_get_connection``.

        Raises:
            AttributeError: when ``bare_impl`` has no such attribute either.
        """
        own = self.__dict__
        if item in own:
            return own[item]

        # names listed in bareFunctions bypass the Method wrapper
        direct_names = own.get("bareFunctions")
        if direct_names is not None and item in direct_names:
            return getattr(object.__getattribute__(self, "bare_impl"), item)

        # object.__getattribute__ avoids re-entering __getattr__ if bare_impl is not set
        impl = object.__getattribute__(self, "bare_impl")
        if not hasattr(impl, item):
            raise AttributeError(item)

        if isinstance(getattr(impl, item), types.MethodType):
            conn = self._get_connection()
            return Method(self.original_config, item, conn)
        return getattr(impl, item)

    def _get_connection(self):
        """Set up the control master(s) and fetch a connection from the pool.

        Returns:
            Whatever ``sshMasterPool.get_connection`` returns for the
            configured host, port and ``execStr``; ``None`` is logged as a
            connection failure and returned unchanged.
        """
        log = core_utils.make_logger(_logger, method_name="_get_connection")
        log.debug("start")

        host, port, n_masters = self.remoteHost, self.remotePort, self.numMasters
        master_options = {
            "ssh_username": self.sshUserName,
            "ssh_password": self.sshPassword,
            "private_key": self.privateKey,
            "pass_phrase": self.passPhrase,
            "jump_host": self.jumpHost,
            "jump_port": self.jumpPort,
            "sock_dir": self.sockDir,
            "connection_lifetime": self.connectionLifetime,
        }
        sshMasterPool.make_control_master(host, port, n_masters, **master_options)

        conn = sshMasterPool.get_connection(host, port, self.execStr)
        if conn is None:
            log.error("failed to connect")
        else:
            log.debug("connected successfully")
        return conn