Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import json
0002 import types
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 
0007 from .ssh_master_pool import sshMasterPool
0008 
0009 # logger
0010 _logger = core_utils.setup_logger("direct_ssh_herder")
0011 
0012 
0013 # check whether an object is a mutable container or instance that update_object can handle
def is_mutable(obj):
    """Tell whether obj is a list, a dict, or an object carrying a ``__dict__``.

    :param obj: any object
    :return: True for lists, dicts and objects with a ``__dict__``; False otherwise
    """
    # built-in containers first; anything else qualifies only through its instance dict
    if isinstance(obj, (list, dict)):
        return True
    return hasattr(obj, "__dict__")
0016 
0017 
0018 # recursively update an object in place with the changes found in a new object
def update_object(old_obj, new_obj):
    """Copy changes from new_obj into old_obj in place, recursing into nested containers.

    Only slots that already exist in old_obj are touched: list indices present in
    both lists, dict keys present in both dicts, and instance attributes present in
    both ``__dict__``s. Elements, keys or attributes that exist only in new_obj are
    not added. The attributes ``isNew`` and ``new_status`` are never overwritten.

    A nested value is merged in place only when the old and the new value are the
    same kind of container (list/list, dict/dict, or both carrying a ``__dict__``).
    Otherwise the new value replaces the old one in its parent slot, so a container
    that became e.g. ``None`` is propagated instead of raising.

    :param old_obj: object to update in place (list, dict, or object with ``__dict__``)
    :param new_obj: object holding the new state
    :return: None; if the two top-level objects are not the same kind of container
             nothing is changed, because the caller's reference cannot be rebound
    """
    if not _can_merge_in_place(old_obj, new_obj):
        return
    if isinstance(old_obj, list):
        old_items, new_items = old_obj, new_obj
        # only indices present in both lists
        slots = range(min(len(old_obj), len(new_obj)))
    elif isinstance(old_obj, dict):
        old_items, new_items = old_obj, new_obj
        # snapshot of the shared keys, taken before any value is modified
        slots = [key for key in old_obj if key in new_obj]
    else:
        old_items, new_items = old_obj.__dict__, new_obj.__dict__
        # skip attributes omitted in workspec pickling
        skipped = ("isNew", "new_status")
        slots = [name for name in old_items if name in new_items and name not in skipped]
    for slot in slots:
        old_value = old_items[slot]
        new_value = new_items[slot]
        if _can_merge_in_place(old_value, new_value):
            # keep the identity of the nested container and update its content
            update_object(old_value, new_value)
        else:
            old_items[slot] = new_value


def _can_merge_in_place(old_obj, new_obj):
    """Return True when both objects are the same kind of container update_object handles."""
    if isinstance(old_obj, list):
        return isinstance(new_obj, list)
    if isinstance(old_obj, dict):
        return isinstance(new_obj, dict)
    return hasattr(old_obj, "__dict__") and hasattr(new_obj, "__dict__")
0056 
0057 
0058 # function class
class Method(object):
    """Callable proxy that executes one named plugin method through a connection.

    Calling the instance serializes the plugin configuration, the function name and
    the pickled call arguments to JSON, sends them through ``conn.communicate`` and
    decodes the JSON reply read from stdout.
    """

    # constructor
    def __init__(self, plugin_config, function_name, conn):
        """Store what is needed to run the call later.

        :param plugin_config: plugin configuration sent along with every call
        :param function_name: name of the method to execute on the other side
        :param conn: object exposing ``communicate(input=...)`` and ``returncode``
                     (presumably a subprocess.Popen-like handle -- confirm), or None
        """
        self.plugin_config = plugin_config
        self.function_name = function_name
        self.conn = conn

    # execution
    def __call__(self, *args, **kwargs):
        """Run the method remotely and mirror argument changes back to the caller.

        :return: the unpickled return value of the remote call; None when there is
                 no connection or the command exits with a non-zero return code
        :raises: the unpickled exception when the reply contains an "exception" entry
        """
        tmpLog = core_utils.make_logger(_logger, method_name=self.function_name)
        tmpLog.debug("start")
        if self.conn is None:
            tmpLog.warning(f"connection is not alive; method {self.function_name} returns None")
            return None
        params = {
            "plugin_config": self.plugin_config,
            "function_name": self.function_name,
            "args": core_utils.pickle_to_text(args),
            "kwargs": core_utils.pickle_to_text(kwargs),
        }
        stdout, stderr = self.conn.communicate(input=json.dumps(params).encode("latin_1"))
        if self.conn.returncode != 0:
            tmpLog.error(f"execution failed with {self.conn.returncode}; method={self.function_name} returns None")
            # surface whatever the command wrote to stderr so the failure can be diagnosed
            if stderr:
                if isinstance(stderr, bytes):
                    stderr = stderr.decode("utf-8", errors="replace")
                tmpLog.error(f"stderr from remote : {stderr}")
            return None
        return_dict = json.loads(stdout)
        # NOTE(review): everything below unpickles data produced by the peer; unpickling
        # can execute arbitrary code, so this is only safe while the remote end is trusted
        if "exception" in return_dict:
            errMsg = core_utils.unpickle_from_text(str(return_dict["dialog"]))
            # f-string so that a non-str dialog cannot raise TypeError and hide the real exception
            tmpLog.error(f"Exception from remote : {errMsg}")
            raise core_utils.unpickle_from_text(str(return_dict["exception"]))
        # propagate changes in mutable args
        new_args = core_utils.unpickle_from_text(str(return_dict["args"]))
        for old_arg, new_arg in zip(args, new_args):
            update_object(old_arg, new_arg)
        new_kwargs = core_utils.unpickle_from_text(str(return_dict["kwargs"]))
        for key in kwargs:
            old_kwarg = kwargs[key]
            new_kwarg = new_kwargs[key]
            update_object(old_kwarg, new_kwarg)
        return core_utils.unpickle_from_text(str(return_dict["return"]))
0099 
0100 
0101 # Direct SSH herder
class DirectSshHerder(PluginBase):
    """Plugin front-end that forwards method calls of a bare plugin over SSH.

    Attribute lookups that fail on the instance are resolved in ``__getattr__``:
    names listed in ``bareFunctions`` are served by ``bare_impl`` directly, bound
    methods of ``bare_impl`` are wrapped in a ``Method`` bound to a connection from
    ``sshMasterPool``, and any other attribute of ``bare_impl`` is returned as is.
    """

    # constructor
    def __init__(self, **kwarg):
        """Initialise the base plugin, fill in default settings and try to connect.

        A failure to obtain the connection is logged and not re-raised.
        """
        tmpLog = core_utils.make_logger(_logger, method_name="__init__")
        PluginBase.__init__(self, **kwarg)
        self.bare_impl = None
        # each setting keeps a value that is already present on the instance
        # (presumably set by PluginBase from the configuration -- confirm),
        # otherwise it takes the fallback given here
        fallbacks = (
            ("sshUserName", None),
            ("sshPassword", None),
            ("privateKey", None),
            ("passPhrase", None),
            ("jumpHost", None),
            ("jumpPort", 22),
            ("remoteHost", None),
            ("remotePort", 22),
            ("bareFunctions", []),
            ("sockDir", "/tmp"),
            ("numMasters", 1),
            ("execStr", ""),
            ("connectionLifetime", None),
        )
        for attr_name, fallback in fallbacks:
            setattr(self, attr_name, getattr(self, attr_name, fallback))
        try:
            self._get_connection()
        except Exception:
            core_utils.dump_error_message(tmpLog)
            tmpLog.error("failed to get connection")

    # get attribute
    def __getattr__(self, item):
        """Resolve an attribute that normal lookup did not find.

        :param item: attribute name
        :return: the instance value, an attribute of ``bare_impl``, or a ``Method``
                 wrapper for bound methods of ``bare_impl``
        :raises AttributeError: when neither the instance nor ``bare_impl`` has item
        """
        own_attrs = self.__dict__
        try:
            return own_attrs[item]
        except KeyError:
            pass
        # bare functions
        bare_names = own_attrs.get("bareFunctions")
        if bare_names is not None and item in bare_names:
            return getattr(object.__getattribute__(self, "bare_impl"), item)
        # remote functions
        impl = object.__getattribute__(self, "bare_impl")
        if not hasattr(impl, item):
            # others
            raise AttributeError(item)
        if not isinstance(getattr(impl, item), types.MethodType):
            return getattr(impl, item)
        # the connection is requested before the configuration is read
        conn = self._get_connection()
        return Method(self.original_config, item, conn)

    # ssh connection
    def _get_connection(self):
        """Make sure control masters exist for the remote host and fetch a connection.

        :return: the object returned by ``sshMasterPool.get_connection``; None is
                 logged as a failed connection
        """
        tmpLog = core_utils.make_logger(_logger, method_name="_get_connection")
        tmpLog.debug("start")
        master_options = {
            "ssh_username": self.sshUserName,
            "ssh_password": self.sshPassword,
            "private_key": self.privateKey,
            "pass_phrase": self.passPhrase,
            "jump_host": self.jumpHost,
            "jump_port": self.jumpPort,
            "sock_dir": self.sockDir,
            "connection_lifetime": self.connectionLifetime,
        }
        sshMasterPool.make_control_master(self.remoteHost, self.remotePort, self.numMasters, **master_options)
        conn = sshMasterPool.get_connection(self.remoteHost, self.remotePort, self.execStr)
        if conn is None:
            tmpLog.error("failed to connect")
        else:
            tmpLog.debug("connected successfully")
        return conn
0163         if conn is not None:
0164             tmpLog.debug("connected successfully")
0165         else:
0166             tmpLog.error("failed to connect")
0167         return conn