在现代开发中,处理大量并发请求时,创建和管理SSH连接池能显著提高系统的性能和响应速度。本文将讲解如何在Go中通过维护SSH连接池,减少重复的SSH连接创建,进而减少响应时延。
1. 初始化配置与连接池
在使用连接池前,必须确保每次执行程序时,连接池不会每次都初始化;因而,我们需要利用 go 语言中的 sync.Once
实现确保所有的初始化都仅仅执行一次。
同时,保证连接池的相关维护变量是全局而非局部。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| var ( globalConfig *config.Config sshPool *SSHConnectionPool once sync.Once )
func (elf *ElFinderConnector) ServeHTTP(rw http.ResponseWriter, req *http.Request) { decoder := form.NewDecoder() switch req.Method { case "GET": if err := req.ParseForm(); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } case "POST": err := req.ParseMultipartForm(32 << 20) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } default: http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) return }
once.Do(func() { globalConfig, err = config.LoadConfig(common.CONFIG_YML) if err != nil { http.Error(rw, "加载配置文件失败", http.StatusInternalServerError) return }
sshConfig := &ssh.ClientConfig{ User: globalConfig.Username, Auth: []ssh.AuthMethod{ ssh.Password(globalConfig.Password), }, HostKeyCallback: ssh.InsecureIgnoreHostKey(), Timeout: 5 * time.Second, } sshPool = NewSSHConnectionPool(globalConfig.Host+":"+globalConfig.Port, sshConfig, 100) })
elf.dispatch(rw, req) }
|
在上述代码中,sshPool = NewSSHConnectionPool(globalConfig.Host+":"+globalConfig.Port, sshConfig, 100)
这行代码用于初始化 100 大小的连接池,并存储到 sshPool
变量中,后续如果需要维护,仅仅读取该变量即可。
2. 连接池的设计
我们使用自定义的 SSHConnectionPool
结构体来管理SSH连接,避免频繁创建和关闭连接,这将极大地提升效率。连接池在需要时创建连接,空闲时将连接归还到池中以供后续使用。
2.1 结构体
1 2 3 4 5 6 7
| type SSHConnectionPool struct { mu sync.Mutex conns chan *ssh.Client maxSize int config *ssh.ClientConfig address string }
|
2.2 创建连接池
1 2 3 4 5 6 7 8 9
| func NewSSHConnectionPool(address string, config *ssh.ClientConfig, maxSize int) *SSHConnectionPool { return &SSHConnectionPool{ conns: make(chan *ssh.Client, maxSize), maxSize: maxSize, config: config, address: address, } }
|
2.3 从连接池获取连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (pool *SSHConnectionPool) GetConnection() (*ssh.Client, error) { pool.mu.Lock() defer pool.mu.Unlock()
select { case conn := <-pool.conns: fmt.Println("从连接池获取到现有连接") return conn, nil default: fmt.Println("连接池中没有可用连接,创建新的连接") conn, err := ssh.Dial("tcp", pool.address, pool.config) if err != nil { return nil, fmt.Errorf("SSH 连接失败: %w", err) } return conn, nil } }
|
其中,pool.mu.Lock()
这个 pool
连接池对象的内部锁,负责对该对象进行加锁,这样可以保证单一访问的独占性,同时:
1 2 3 4 5 6 7 8 9 10 11 12
| select { case conn := <-pool.conns: fmt.Println("从连接池获取到现有连接") return conn, nil default: fmt.Println("连接池中没有可用连接,创建新的连接") conn, err := ssh.Dial("tcp", pool.address, pool.config) if err != nil { return nil, fmt.Errorf("SSH 连接失败: %w", err) } return conn, nil }
|
上述代码利用通道机制,当检测到 case conn := <-pool.conns:
连接池中有现存的连接,则直接返回该连接,视为重用已有连接操作。
否则,才去新建SSH连接,当然,这里只适用于所有用户都使用同一SSH连接的情况。
每次连接完毕将资源归回池中
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (pool *SSHConnectionPool) ReturnConnection(conn *ssh.Client) { pool.mu.Lock() defer pool.mu.Unlock()
if len(pool.conns) < pool.maxSize { pool.conns <- conn fmt.Println("连接已归还到连接池中") } else { conn.Close() fmt.Println("连接池已满,关闭连接") } }
|
前面初始化时,为池分配了一个池的大小(100),当每次连接使用资源完毕后,应及时归还(利用通道机制)。如果发现满了,则及时关闭当前连接。
3. 使用连接池(具体业务逻辑)
在具体操作中,通过连接池获取一个SSH连接,然后使用该连接创建SFTP客户端,执行远程目录检查及其他命令操作。操作完毕后,连接会被归还到连接池中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| func (elf *ElFinderConnector) handleFetchUserDir() { startTime := time.Now() defer func() { totalDuration := time.Since(startTime) fmt.Printf("总执行时间: %v\n", totalDuration) }()
remoteDir := filepath.Join(globalConfig.RemoteDir, "user_"+elf.req.CTHRCode)
getConnStart := time.Now() if sshPool == nil { elf.res.Error = "SSH连接池未初始化" return }
conn, err := sshPool.GetConnection() if handleError(err, "SSH 连接失败", elf) { return } getConnDuration := time.Since(getConnStart) fmt.Printf("获取 SSH 连接耗时: %v\n", getConnDuration)
defer func() { if conn != nil { sshPool.ReturnConnection(conn) } }()
createSFTPStart := time.Now() sftpClient, err := sftp.NewClient(conn) if handleError(err, "创建 SFTP 客户端失败", elf) { return } createSFTPDuration := time.Since(createSFTPStart) fmt.Printf("创建 SFTP 客户端耗时: %v\n", createSFTPDuration)
defer sftpClient.Close()
ensureDirStart := time.Now() if err := ensureRemoteDir(sftpClient, remoteDir); err != nil { elf.res.Error = err.Error() return } ensureDirDuration := time.Since(ensureDirStart) fmt.Printf("确保远程目录存在耗时: %v\n", ensureDirDuration)
type CmdResult struct { Output string Err error } cmdResultChan := make(chan CmdResult)
executeCmdStart := time.Now() go func() { output, err := executeRemoteCommand(conn, "ls -l "+remoteDir) cmdResultChan <- CmdResult{Output: output, Err: err} }() result := <-cmdResultChan executeCmdDuration := time.Since(executeCmdStart) fmt.Printf("执行远程命令耗时: %v\n", executeCmdDuration)
if result.Err != nil { handleError(result.Err, "执行 ls -l 命令失败", elf) return }
elf.res.CmdOutput = result.Output }
|
其中,这段代码利用通道机制,将执行命令并获取结果的操作单独抽离出来。
1 2 3 4 5 6 7 8 9 10 11 12
| type CmdResult struct { Output string Err error } cmdResultChan := make(chan CmdResult)
executeCmdStart := time.Now() go func() { output, err := executeRemoteCommand(conn, "ls -l "+remoteDir) cmdResultChan <- CmdResult{Output: output, Err: err} }() result := <-cmdResultChan
|
注意,这段代码需要写在一起:
1 2 3 4 5 6
| conn, err := sshPool.GetConnection() if err != nil { elf.res.Error = err.Error() return } defer sshPool.ReturnConnection(conn)
|
用于确保获取连接的同时,无论如何都会在这段代码所在函数执行后,归还连接。而上述的功能,主要是打开一个连接,创建一个SSH客户端,然后执行命令 ls -l,然后返回结果,给前端展示。
3.1 确保远程目录存在
在操作远程服务器时,我们通过 SFTP 客户端来检查目录是否存在,必要时创建它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func ensureRemoteDir(client *sftp.Client, remoteDir string) error { _, err := client.Stat(remoteDir) if os.IsNotExist(err) { if err := client.Mkdir(remoteDir); err != nil { return fmt.Errorf("创建远程目录失败: %w", err) } fmt.Println("远程目录已创建: ", remoteDir) } else if err != nil { return fmt.Errorf("检查远程目录失败: %w", err) } else { fmt.Println("远程目录已存在: ", remoteDir) } return nil }
|
3.2 执行远程命令
我们使用 executeRemoteCommand 方法执行远程命令并返回输出结果。
1 2 3 4 5 6 7 8 9 10 11 12 13
| func executeRemoteCommand(conn *ssh.Client, command string) (string, error) { session, err := conn.NewSession() if err != nil { return "", fmt.Errorf("创建 SSH 会话失败: %w", err) } defer session.Close()
outputBytes, err := session.CombinedOutput(command) if err != nil { return "", fmt.Errorf("执行命令失败: %w", err) } return string(outputBytes), nil }
|
打开 Session
会话之后,也要记得无论如何需要关闭它,以免资源长时间的占用。并使用 session.CombinedOutput(command)
方法执行命令并以字节流的方式获取结果。
总结
本文实现了一个简单而高效的连接池设计,充分利用了 Go 的并发特性和资源管理优势。具体来说:
一次性初始化:通过 sync.Once 确保配置文件和连接池的初始化仅执行一次,避免了不必要的重复初始化,减少了开销。
连接池的设计:使用自定义的 SSHConnectionPool 结构体,管理 SSH 连接的创建、获取和归还,确保在高并发场景下能够高效重用连接。
错误处理机制:在获取连接和执行命令时,采用了健壮的错误处理机制,确保系统在出现问题时能够及时反馈并采取相应措施。
并发执行命令:通过 goroutine 和通道机制,实现了命令的并发执行,提高了系统处理请求的效率。