File indexing completed on 2026-04-19 08:00:02
0001 import os
0002 import random
0003 import subprocess
0004 import tempfile
0005 import threading
0006 import time
0007 import uuid
0008
0009 import pexpect
0010
0011 from pandaharvester.harvestercore import core_utils
0012
# Module-wide logger; it is also used as the sink for the raw output of the
# spawned ssh master processes.
baseLogger = core_utils.setup_logger("ssh_master_pool")

# Spawner used for every ssh master process (the unicode/text-mode variant
# of pexpect's spawn class).
pexpect_spawn = pexpect.spawnu
0017
0018
0019
class SshMasterPool(object):
    """Pool of ``ssh -M`` master processes, grouped by ``"host:port"``.

    Every master is a pexpect child started with ``ssh -M -S <sock_file>``.
    ``get_connection`` picks one of the masters of a host at random and
    starts an ``ssh -S <sock_file>`` subprocess that goes through its socket.
    """

    def __init__(self):
        # Serializes every access to ``pool`` and ``params``.
        self.lock = threading.Lock()
        # "host:port" -> list of (sock_file, pexpect child, expiry time or None)
        self.pool = dict()
        # "host:port" -> settings of the last non-reconnect call, reused
        # verbatim when masters have to be re-created
        self.params = dict()

    def make_dict_key(self, host, port):
        """Return the ``"host:port"`` key used in ``pool`` and ``params``."""
        return f"{host}:{port}"

    def make_control_master(
        self,
        remote_host,
        remote_port,
        num_masters=1,
        ssh_username=None,
        ssh_password=None,
        private_key=None,
        pass_phrase=None,
        jump_host=None,
        jump_port=None,
        login_timeout=60,
        reconnect=False,
        with_lock=True,
        sock_dir=None,
        connection_lifetime=None,
    ):
        """Start masters for a host until ``num_masters`` of them are pooled.

        Args:
            remote_host: host to connect to.
            remote_port: ssh port on ``remote_host``.
            num_masters: wanted number of pooled masters for this host.
            ssh_username: login name (also used on the jump host).
            ssh_password: sent when a password prompt shows up.
            private_key: identity file passed with ``-i`` when not None.
            pass_phrase: sent when a key passphrase prompt shows up.
            jump_host: with ``jump_port``, adds a ``ProxyCommand`` hop.
            jump_port: ssh port on ``jump_host``.
            login_timeout: timeout in seconds of each expect round.
            reconnect: when True, every setting above except host/port is
                replaced by the values stored by the last call made with
                ``reconnect=False`` for the same host/port.
            with_lock: when False the caller must already hold ``self.lock``.
            sock_dir: directory in which the control sockets are created.
            connection_lifetime: seconds after which a master is considered
                expired; None means it never expires.

        Raises:
            KeyError: ``reconnect=True`` but no settings are stored for the
                host/port.
        """
        dict_key = self.make_dict_key(remote_host, remote_port)
        if with_lock:
            self.lock.acquire()
        # try/finally so that a failure below can never leave the lock held
        try:
            if dict_key not in self.pool:
                self.pool[dict_key] = []

            if not reconnect:
                # remember the settings for later reconnects
                self.params[dict_key] = {
                    "num_masters": num_masters,
                    "ssh_username": ssh_username,
                    "ssh_password": ssh_password,
                    "private_key": private_key,
                    "pass_phrase": pass_phrase,
                    "jump_host": jump_host,
                    "jump_port": jump_port,
                    "login_timeout": login_timeout,
                    "sock_dir": sock_dir,
                    "connection_lifetime": connection_lifetime,
                }
            else:
                stored = self.params[dict_key]
                num_masters = stored["num_masters"]
                ssh_username = stored["ssh_username"]
                ssh_password = stored["ssh_password"]
                private_key = stored["private_key"]
                pass_phrase = stored["pass_phrase"]
                jump_host = stored["jump_host"]
                jump_port = stored["jump_port"]
                login_timeout = stored["login_timeout"]
                sock_dir = stored["sock_dir"]
                connection_lifetime = stored["connection_lifetime"]

            # only create as many masters as are missing
            for _ in range(num_masters - len(self.pool[dict_key])):
                sock_file = os.path.join(sock_dir, f"sock_{remote_host}_{uuid.uuid4().hex}")
                com = "ssh -M -S {sock_file} "
                com += "-p {remote_port} {ssh_username}@{remote_host} "
                com += "-o ServerAliveInterval=120 -o ServerAliveCountMax=2 "
                if private_key is not None:
                    com += "-i {private_key} "
                if jump_host is not None and jump_port is not None:
                    com += '-o ProxyCommand="ssh -p {jump_port} {ssh_username}@{jump_host} -W %h:%p" '
                com = com.format(
                    remote_host=remote_host,
                    remote_port=remote_port,
                    ssh_username=ssh_username,
                    private_key=private_key,
                    jump_host=jump_host,
                    jump_port=jump_port,
                    sock_file=sock_file,
                )
                # unique marker echoed by the remote side to prove the login
                loginString = "login_to_be_confirmed_with " + uuid.uuid4().hex
                # remote command: print the marker, then keep a shell open
                # (the closing quote was missing; the parsed argv is the same)
                com += f"'echo {loginString}; bash'"

                c = self._spawn_master(com, loginString, ssh_password, pass_phrase, login_timeout)
                if c is not None:
                    conn_exp_time = (time.time() + connection_lifetime) if connection_lifetime is not None else None
                    self.pool[dict_key].append((sock_file, c, conn_exp_time))
        finally:
            if with_lock:
                self.lock.release()

    def _spawn_master(self, com, login_string, ssh_password, pass_phrase, login_timeout):
        """Spawn ``com`` and drive the login dialogue; return the child or None.

        The child is returned once ``login_string`` shows up in its output.
        In every other case the child is closed and None is returned.
        """
        idx_eof, idx_timeout, idx_new_cert, idx_password, idx_passphrase, idx_login = range(6)
        # order must match the idx_* constants above
        expected_list = [
            pexpect.EOF,
            pexpect.TIMEOUT,
            "(?i)are you sure you want to continue connecting",
            "(?i)password:",
            "(?i)enter passphrase for key.*",
            login_string,
        ]
        c = pexpect_spawn(com, echo=False)
        baseLogger.debug("pexpect_spawn")
        # copy the child's output into the logger's first handler, if it has one
        handlers = baseLogger.handlers
        stream = getattr(handlers[0], "stream", None) if handlers else None
        if stream is not None:
            c.logfile_read = stream

        for _ in range(3):
            idx = c.expect(expected_list, timeout=login_timeout)
            if idx == idx_login:
                return c
            if idx == idx_timeout:
                baseLogger.error(f"timeout when making a master with com={com} out={c.buffer}")
                break
            if idx == idx_new_cert:
                # unknown host key: accept it and look at what comes next
                c.sendline("yes")
                idx = c.expect(expected_list, timeout=login_timeout)
                if idx == idx_login:
                    return c
                if idx == idx_timeout:
                    baseLogger.error(f"timeout after accepting new cert with com={com} out={c.buffer}")
                    break
            if idx == idx_password:
                c.sendline(ssh_password)
            elif idx == idx_passphrase:
                c.sendline(pass_phrase)
            elif idx == idx_eof:
                baseLogger.error(f"something weired with com={com} out={c.buffer}")
                break
            # ask the remote shell to print the marker again to confirm login
            c.sendline(f"echo {login_string}")

        # login was never confirmed: do not leak the child process
        c.close()
        return None

    def get_connection(self, remote_host, remote_port, exec_string):
        """Run ``exec_string`` through one of the pooled masters of a host.

        Dead or expired masters are closed and removed first; if any was
        removed, the pool is topped up again with the stored settings.

        Returns:
            A ``subprocess.Popen`` of ``ssh dummy -S <sock_file> <exec_string>``
            with stdin/stdout/stderr pipes, or None when no master is usable.

        Raises:
            KeyError: no pool exists for the host/port.
        """
        baseLogger.debug("get_connection start")
        dict_key = self.make_dict_key(remote_host, remote_port)
        # "with" releases the lock on every exit path, including errors
        with self.lock:
            masters = self.pool[dict_key]
            active_masters = []
            someClosed = False
            for item in list(masters):
                sock_file, child, conn_exp_time = item
                # probe before close(): afterwards the child is never alive
                alive = child.isalive()
                # an expiry time of None means the master never expires
                expired = conn_exp_time is not None and time.time() > conn_exp_time
                if alive and not expired:
                    active_masters.append(item)
                    continue
                child.close()
                masters.remove(item)
                someClosed = True
                if not alive:
                    baseLogger.debug("a connection process is dead")
                else:
                    baseLogger.debug("a connection is expired")
            if someClosed:
                # the lock is already held here, hence with_lock=False
                self.make_control_master(remote_host, remote_port, reconnect=True, with_lock=False)
                active_masters = [item for item in self.pool[dict_key] if os.path.exists(item[0])]
                baseLogger.debug(f"reconnected; now {len(active_masters)} active connections")
            if not active_masters:
                return None
            sock_file = random.choice(active_masters)[0]
            return subprocess.Popen(
                ["ssh", "dummy", "-S", sock_file, exec_string],
                shell=False,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
0184
0185
0186
# The one shared pool instance of this module.
sshMasterPool = SshMasterPool()
# Remove the class name from the module namespace; only the instance above
# stays reachable under a module-level name.
del SshMasterPool