Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:02

0001 import os
0002 import random
0003 import subprocess
0004 import tempfile
0005 import threading
0006 import time
0007 import uuid
0008 
0009 import pexpect
0010 
0011 from pandaharvester.harvestercore import core_utils
0012 
0013 pexpect_spawn = pexpect.spawnu
0014 
0015 # logger
0016 baseLogger = core_utils.setup_logger("ssh_master_pool")
0017 
0018 
0019 # Pool of SSH control masters
0020 class SshMasterPool(object):
0021     # constructor
0022     def __init__(self):
0023         self.lock = threading.Lock()
0024         self.pool = dict()
0025         self.params = dict()
0026 
0027     # make a dict key
0028     def make_dict_key(self, host, port):
0029         return f"{host}:{port}"
0030 
0031     # make a control master
0032     def make_control_master(
0033         self,
0034         remote_host,
0035         remote_port,
0036         num_masters=1,
0037         ssh_username=None,
0038         ssh_password=None,
0039         private_key=None,
0040         pass_phrase=None,
0041         jump_host=None,
0042         jump_port=None,
0043         login_timeout=60,
0044         reconnect=False,
0045         with_lock=True,
0046         sock_dir=None,
0047         connection_lifetime=None,
0048     ):
0049         dict_key = self.make_dict_key(remote_host, remote_port)
0050         if with_lock:
0051             self.lock.acquire()
0052         # make dicts
0053         if dict_key not in self.pool:
0054             self.pool[dict_key] = []
0055         # preserve parameters
0056         if not reconnect:
0057             self.params[dict_key] = {
0058                 "num_masters": num_masters,
0059                 "ssh_username": ssh_username,
0060                 "ssh_password": ssh_password,
0061                 "private_key": private_key,
0062                 "pass_phrase": pass_phrase,
0063                 "jump_host": jump_host,
0064                 "jump_port": jump_port,
0065                 "login_timeout": login_timeout,
0066                 "sock_dir": sock_dir,
0067                 "connection_lifetime": connection_lifetime,
0068             }
0069         else:
0070             num_masters = self.params[dict_key]["num_masters"]
0071             ssh_username = self.params[dict_key]["ssh_username"]
0072             ssh_password = self.params[dict_key]["ssh_password"]
0073             private_key = self.params[dict_key]["private_key"]
0074             pass_phrase = self.params[dict_key]["pass_phrase"]
0075             jump_host = self.params[dict_key]["jump_host"]
0076             jump_port = self.params[dict_key]["jump_port"]
0077             login_timeout = self.params[dict_key]["login_timeout"]
0078             sock_dir = self.params[dict_key]["sock_dir"]
0079             connection_lifetime = self.params[dict_key]["connection_lifetime"]
0080         # make a master
0081         for i in range(num_masters - len(self.pool[dict_key])):
0082             # make a socket file
0083             sock_file = os.path.join(sock_dir, f"sock_{remote_host}_{uuid.uuid4().hex}")
0084             com = "ssh -M -S {sock_file} "
0085             com += "-p {remote_port} {ssh_username}@{remote_host} "
0086             com += "-o ServerAliveInterval=120 -o ServerAliveCountMax=2 "
0087             if private_key is not None:
0088                 com += "-i {private_key} "
0089             if jump_host is not None and jump_port is not None:
0090                 com += '-o ProxyCommand="ssh -p {jump_port} {ssh_username}@{jump_host} -W %h:%p" '
0091             com = com.format(
0092                 remote_host=remote_host,
0093                 remote_port=remote_port,
0094                 ssh_username=ssh_username,
0095                 private_key=private_key,
0096                 jump_host=jump_host,
0097                 jump_port=jump_port,
0098                 sock_file=sock_file,
0099             )
0100             loginString = "login_to_be_confirmed_with " + uuid.uuid4().hex
0101             com += f"'echo {loginString}; bash"
0102             # list of expected strings
0103             expected_list = [
0104                 pexpect.EOF,
0105                 pexpect.TIMEOUT,
0106                 "(?i)are you sure you want to continue connecting",
0107                 "(?i)password:",
0108                 "(?i)enter passphrase for key.*",
0109                 loginString,
0110             ]
0111             c = pexpect_spawn(com, echo=False)
0112             baseLogger.debug("pexpect_spawn")
0113             c.logfile_read = baseLogger.handlers[0].stream
0114             isOK = False
0115             for iTry in range(3):
0116                 idx = c.expect(expected_list, timeout=login_timeout)
0117                 if idx == expected_list.index(loginString):
0118                     # succeeded
0119                     isOK = True
0120                     break
0121                 if idx == 1:
0122                     # timeout
0123                     baseLogger.error(f"timeout when making a master with com={com} out={c.buffer}")
0124                     c.close()
0125                     break
0126                 if idx == 2:
0127                     # new certificate
0128                     c.sendline("yes")
0129                     idx = c.expect(expected_list, timeout=login_timeout)
0130                 if idx == 1:
0131                     # timeout
0132                     baseLogger.error(f"timeout after accepting new cert with com={com} out={c.buffer}")
0133                     c.close()
0134                     break
0135                 if idx == 3:
0136                     # password prompt
0137                     c.sendline(ssh_password)
0138                 elif idx == 4:
0139                     # passphrase prompt
0140                     c.sendline(pass_phrase)
0141                 elif idx == 0:
0142                     baseLogger.error(f"something weired with com={com} out={c.buffer}")
0143                     c.close()
0144                     break
0145                 # exec to confirm login
0146                 c.sendline(f"echo {loginString}")
0147             if isOK:
0148                 conn_exp_time = (time.time() + connection_lifetime) if connection_lifetime is not None else None
0149                 self.pool[dict_key].append((sock_file, c, conn_exp_time))
0150         if with_lock:
0151             self.lock.release()
0152 
0153     # get a connection
0154     def get_connection(self, remote_host, remote_port, exec_string):
0155         baseLogger.debug("get_connection start")
0156         dict_key = self.make_dict_key(remote_host, remote_port)
0157         self.lock.acquire()
0158         active_masters = []
0159         someClosed = False
0160         for sock_file, child, conn_exp_time in list(self.pool[dict_key]):
0161             if child.isalive() and time.time() <= conn_exp_time:
0162                 active_masters.append((sock_file, child, conn_exp_time))
0163             else:
0164                 child.close()
0165                 self.pool[dict_key].remove((sock_file, child, conn_exp_time))
0166                 someClosed = True
0167                 if child.isalive():
0168                     baseLogger.debug("a connection process is dead")
0169                 else:
0170                     baseLogger.debug("a connection is expired")
0171         if someClosed:
0172             self.make_control_master(remote_host, remote_port, reconnect=True, with_lock=False)
0173             active_masters = [item for item in self.pool[dict_key] if os.path.exists(item[0])]
0174             baseLogger.debug(f"reconnected; now {len(active_masters)} active connections")
0175         if len(active_masters) > 0:
0176             sock_file, child, conn_exp_time = random.choice(active_masters)
0177             con = subprocess.Popen(
0178                 ["ssh", "dummy", "-S", sock_file, exec_string], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
0179             )
0180         else:
0181             con = None
0182         self.lock.release()
0183         return con
0184 
0185 
0186 # singleton
0187 sshMasterPool = SshMasterPool()
0188 del SshMasterPool