用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本2
在版本1中,虽然系统能够满足基本需求,但随着连接数的增加和处理请求的复杂度上升,性能瓶颈逐渐显现。为了进一步提升系统的稳定性、并发处理能力以及资源的高效利用,版本2引入了三个重要功能:客户端连接池、服务器长连接以及服务器处理业务逻辑时引入的协程池,主要是为了更好的利用资源和提高系统的稳定性。这些功能的引入,使得系统在面对大规模连接和高并发请求时,能够更好地应对。
为了更好的验证两个版本的效果,文章末尾也增加了两个版本的压测效果
代码地址:https://github.com/karatttt/MyRPC
版本2新增特性
分别解释一下每个功能引入的原因和实际需求。
客户端连接池的引入
背景:
在之前的版本中,每次与服务器的通信都会创建和销毁连接,频繁的连接创建和销毁不仅浪费了系统资源,每一次都需要经过TCP的握手和挥手,同时也导致了连接响应时间的不稳定,我们希望引入一个连接池,更好的管理和复用连接。
// 原版本
// 实现Send方法
func (c *clientTransport) Send(ctx context.Context, reqBody interface{}, rspBody interface{}, opt *ClientTransportOption) error {// 获取连接// TODO 这里的连接后续可以优化从连接池获取conn, err := net.Dial("tcp", opt.Address)if err != nil {return err}defer conn.Close()
实现思路:
连接池和协程池有相似的地方也有不同的地方,首先他们都是池化机制,旨在重用已有资源(连接或协程),避免频繁创建和销毁资源的性能开销。都有空闲资源的回收、最大并发数的限制,以及对请求的排队和等待机制,同时都需要处理资源的生命周期管理(如超时、关闭等)。
但是他们的对象不同,一个是TCP连接,一个是系统的协程。我们可以借鉴协程池的实现来实现这个连接池。
这里还需要注意一个点。我们往往对于一个service的请求是同一目的 IP + 端口(如并发调用多次Hello方法,访问server也是同一个同一目的 IP + 端口,只是源端口不同),不同的方法通过协议数据中的方法名来进行service内的路由。而我们创建的这个连接池,是对于这个目的 IP + 端口的池化处理,针对这个目的 IP + 端口创建多个连接并复用,所以我们系统中对于不同的service应有不同的连接池,故做一个poolManager来统一管理。
PoolManager
先来看看这个poolManager,首先是getPoolManager,获取一个全局唯一的poolManager
type PoolManager struct {mu sync.RWMutexpools map[string]*ConnPool // key是目的ip加端口,v是实际连接池ctx context.Contextcancel context.CancelFuncsigChan chan os.Signal}// GetPoolManager 获取全局唯一的 PoolManager 实例func GetPoolManager() *PoolManager {poolManagerOnce.Do(func() {ctx, cancel := context.WithCancel(context.Background())globalPoolManager = &PoolManager{pools: make(map[string]*ConnPool),ctx: ctx,cancel: cancel,sigChan: make(chan os.Signal, 1),}// 启动信号处理go globalPoolManager.handleSignals()})return globalPoolManager}
- 首先这个struct持有一个pools,以及sigChan用于监听程序退出时,优雅关闭所有连接池,这个优雅关闭后续再看。
- poolManagerOnce.Do(func() 保证只有一个manager实例
- 我们下面看看如何创建一个连接池
// GetPool 获取指定地址的连接池,如果不存在则创建
func (pm *PoolManager) GetPool(addr string) *ConnPool {pm.mu.RLock()if pool, exists := pm.pools[addr]; exists {pm.mu.RUnlock()return pool}pm.mu.RUnlock()// 创建连接池pm.mu.Lock()defer pm.mu.Unlock()// 双重检查,防止其他goroutine已经创建if pool, exists := pm.pools[addr]; exists {return pool}pool := NewConnPool(addr, // 服务器地址1000, // 最大活跃连接数1000, // 最小空闲连接数60*time.Second, // 空闲连接超时时间60*time.Second, // 建立连接最大生命周期func(address string) (net.Conn, error) {return net.DialTimeout("tcp", address, 60*time.Second)},)pm.pools[addr] = poolreturn pool
}
// handleSignals 处理系统信号
func (pm *PoolManager) handleSignals() {// 注册信号signal.Notify(pm.sigChan, syscall.SIGINT, syscall.SIGTERM)// 等待信号sig := <-pm.sigChanfmt.Printf("\nReceived signal: %v\n", sig)// 创建一个带超时的上下文用于关闭shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)defer cancel()// 优雅关闭连接池fmt.Println("Shutting down connection pools...")if err := pm.Shutdown(shutdownCtx); err != nil {fmt.Printf("Error during shutdown: %v\n", err)}fmt.Println("Connection pools shut down successfully")
}
// Shutdown 优雅关闭所有连接池
func (pm *PoolManager) Shutdown(ctx context.Context) error {// 发送关闭信号pm.cancel()// 等待所有连接池关闭done := make(chan struct{})go func() {pm.mu.Lock()for _, pool := range pm.pools {pool.Close()}pm.pools = make(map[string]*ConnPool)pm.mu.Unlock()close(done)}()// 等待关闭完成或超时select {case <-done:return nilcase <-ctx.Done():return ctx.Err()}
}
- GetPool中,先获取读锁判断是否有pool已经创建,若没有获取写锁创建连接池
- handleSignals是创建manager时创建的一个协程监听系统信号,如果进程结束,则调用Shutdown,关闭所有的pool。
ConnectPool
获得了连接池后,我们看看如何获取连接,即get操作
func (p *ConnPool) Get() (net.Conn, error) {p.mu.Lock()defer p.mu.Unlock()// 如果连接池已关闭或正在关闭,拒绝新连接if p.closed || p.closing {return nil, ErrPoolClosed}// 设置等待超时var startWait time.Timeif p.maxWait > 0 {startWait = time.Now()}for {// 检查空闲连接if len(p.idleConns) > 0 {conn := p.idleConns[len(p.idleConns)-1]p.idleConns = p.idleConns[:len(p.idleConns)-1]atomic.AddUint64(&p.stats.Hits, 1)// 连接健康检查if !p.isHealthy(conn) {conn.conn.Close()atomic.AddInt32(&p.activeCount, -1)continue}// 重置超时设置conn.conn.SetDeadline(time.Time{})conn.conn.SetReadDeadline(time.Time{})conn.conn.SetWriteDeadline(time.Time{})conn.lastUsed = time.Now()atomic.AddInt32(&p.activeCount, 1)p.wg.Add(1)p.mu.Unlock()return &pooledConnWrapper{conn, p}, nil}// 检查是否可以创建新连接if int(atomic.LoadInt32(&p.activeCount)) < p.maxActive {atomic.AddInt32(&p.activeCount, 1)p.wg.Add(1)atomic.AddUint64(&p.stats.Misses, 1)p.mu.Unlock()conn, err := p.dialFunc(p.addr)if err != nil {atomic.AddInt32(&p.activeCount, -1)p.wg.Done()atomic.AddUint64(&p.stats.Errors, 1)p.cond.Signal()return nil, err}pooledConn := &PooledConn{conn: conn,createdAt: time.Now(),lastUsed: time.Now(),}return &pooledConnWrapper{pooledConn, p}, nil}// 等待连接释放if p.maxWait > 0 {atomic.AddUint64(&p.stats.Timeouts, 1)p.cond.Wait()// 检查是否超时if time.Since(startWait) >= p.maxWait {p.mu.Unlock()return nil, fmt.Errorf("连接池获取连接超时,等待时间: %v", time.Since(startWait))}} else {p.cond.Wait()}}
}
- 连接池的各个连接有两种状态,空闲连接,活跃连接(正在处理IO的连接),空闲连接通过Put方法归还连接,若超过最大空闲连接数,则该连接被close,活跃连接通过Get方法将其置为活跃状态,若超过最大活跃连接数,则需要排队等待连接池释放连接
- 获取到连接有几种可能,一种是有空闲连接直接获取返回,一种是无空闲连接,但是活跃连接数未达到最大阈值,则创建新连接,一种是无空闲连接且达到了最大活跃连接数,则通过cond.Wait等待,直到被唤醒的时候,在通过上面几种方式尝试获取
- 同样有三种情况无法调用get的时候立即创建新连接,一个就是连接池正在关闭或者已经关闭,一个是获取的空闲连接健康检测不通过(这里的检测就是简单的调用conn.Read读,如果读到数据或者返回err则说明不健康,这个连接仍在被使用),还有一个就是无空闲连接且达到了最大活跃连接数,cond.wait等待的时间超过了最大等待时间
- 接下来看看Put方法
func (p *ConnPool) Put(conn net.Conn) {pc, ok := conn.(*pooledConnWrapper)if !ok {conn.Close()return}p.mu.Lock()defer p.mu.Unlock()// 减少活跃计数atomic.AddInt32(&p.activeCount, -1)p.wg.Done()// 如果连接池正在关闭或已关闭,直接关闭连接if p.closing || p.closed {pc.conn.conn.Close()p.cond.Signal()return}// 检查连接是否健康if !p.isHealthy(pc.conn) {pc.conn.conn.Close()p.cond.Signal()return}// 检查是否超过最大空闲连接数if len(p.idleConns) >= p.maxIdle {pc.conn.conn.Close()p.cond.Signal()return}pc.conn.lastUsed = time.Now()p.idleConns = append(p.idleConns, pc.conn)p.cond.Signal()
}
- Put方法较为简单,当客户端处理完请求的时候,就 defer一下pool的Put方法归还连接,将该连接的状态置为空闲,如果超过了最大空闲数则close这个连接
- 最后看看close方法,连接池的优雅关闭
func (p *ConnPool) Shutdown(timeout time.Duration) error {p.mu.Lock()// 标记为正在关闭,不再接受新连接p.closing = truep.mu.Unlock()// 关闭所有空闲连接p.mu.Lock()for _, conn := range p.idleConns {conn.conn.Close()}p.idleConns = nilp.mu.Unlock()// 等待活跃连接完成或超时done := make(chan struct{})go func() {p.wg.Wait() // 等待所有活跃连接归还close(done)}()select {case <-done:// 所有活跃连接已完成p.mu.Lock()p.closed = truep.mu.Unlock()return nilcase <-time.After(timeout):// 超时,强制关闭p.mu.Lock()p.closed = truep.mu.Unlock()return fmt.Errorf("连接池关闭超时,仍有 %d 个活跃连接", atomic.LoadInt32(&p.activeCount))}
}func (p *ConnPool) Close() {// 默认给5秒超时if err := p.Shutdown(5 * time.Second); err != nil {fmt.Println("连接池关闭警告:", err)}
}
- 首先为什么需要优雅关闭? 如果没有任何协程监听信号(无 signal.Notify)当 SIGINT(Ctrl+C)或 SIGTERM(kill)发生时,进程会立即退出,所有协程(包括 main 和子协程)会被强制终止。操作系统回收所有资源包括socket连接,这就意味着所有未完成的 net.Conn 会被强制关闭,服务端会收到 RST(强制关闭连接,不进行四次挥手) 而非 FIN。我们希望这些正在进行的连接能够正常处理完再关闭,避免server收到不完整的数据从而引发其他意外发生
- 当然我们需要为这个等待活跃连接处理完设置一个超时时间,所以再Close中,调用shutdown方法并设置了5秒超时时间,shutdown中关闭所有空闲连接,并wg.wait等待活跃连接处理完毕
至此客户端的连接池做好了,但是需要考虑的是,版本1的server端的连接是一次性的,处理完业务逻辑返回后立即close连接。如果不改server的短连接为长连接,那么客户端的连接池则没有意义,即使是空闲的连接仍然会被server立即close掉,所以我们引入server端的长连接
服务器长连接的引入
背景:
在旧版本中,客户端与服务器之间的每次请求都需要进行连接和断开,频繁的建立和关闭连接增加了系统的负担。长连接机制能够让服务器保持与客户端的连接,在连接周期内不断发送和接收数据,减少了连接频繁创建的开销,提高了通信效率,尤其是在需要频繁请求的场景下。
技术思路:
使用for循环监听长连接,不断读取请求帧,并根据上下文状态决定是否关闭连接。同时使用超时机制来防止长时间未操作的连接占用资源。主要更改handleConnection这个方法,即listen到新连接后,go出去的一个协程处理这个方法
// handleConnection 处理单个连接
func (t *serverTransport) handleConnection(ctx context.Context, conn net.Conn) {// 设置连接超时idleTimeout := 30 * time.Secondif t.opts != nil && t.opts.IdleTimeout > 0 {idleTimeout = t.opts.IdleTimeout}// 设置读取超时conn.SetReadDeadline(time.Now().Add(idleTimeout))// 处理连接fmt.Printf("New connection from %s\n", conn.RemoteAddr())// 循环读取请求,即长连接for {select {// 1. 如果上下文被取消,则关闭连接case <-ctx.Done():fmt.Printf("Context cancelled, closing connection from %s\n", conn.RemoteAddr())returndefault:frame, err := codec.ReadFrame(conn)if err != nil {// 2. 如果读取帧失败,如客户端断开连接,则关闭连接if err == io.EOF {fmt.Printf("Client %s disconnected normally\n", conn.RemoteAddr())return}// 3. 如果连接超时,超过设置的idletime,关闭连接if e, ok := err.(net.Error); ok && e.Timeout() {fmt.Printf("Connection from %s timed out after %v\n", conn.RemoteAddr(), idleTimeout)return}// 4. 处理强制关闭的情况if strings.Contains(err.Error(), "forcibly closed") {fmt.Printf("Client %s forcibly closed the connection\n", conn.RemoteAddr())return}fmt.Printf("Read error from %s: %v\n", conn.RemoteAddr(), err)return}// 重置读取超时conn.SetReadDeadline(time.Now().Add(idleTimeout))// 使用协程池处理请求frameCopy := frame // 创建副本避免闭包问题err = t.pool.Submit(func() {// 处理请求response, err := t.ConnHandler.Handle(context.Background(), frameCopy)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)return}// 发送响应if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)}})if err != nil {fmt.Printf("Submit task to pool error for %s: %v\n", conn.RemoteAddr(), err)// 协程池提交失败,直接处理response, err := t.ConnHandler.Handle(ctx, frame)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)continue}if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)return}}}}
}
- 可见现在处理读取帧的逻辑变成了for循环,即长连接处理多次的请求,当遇到客户端连接断开或者长时间没有数据传输,则server关闭这个连接
服务端协程池的引入
背景:
在之前的版本中,服务器需要为每一个请求分配一个独立的协程处理,随着请求量的增加,创建大量的协程会导致资源浪费和上下文切换的开销。 通过引入协程池,可以限制并发协程的数量,避免系统因为过多的协程而出现性能瓶颈。协程池帮助复用已有的协程,减少了每个请求创建新协程的开销,提升了处理能力。
技术思路:
使用一个协程池管理并发任务,限制并发请求的数量。如果协程池中的协程数量已达到上限,新请求将被等待或者直接失败,避免过度的并发资源竞争。这里主要还是handleConnection这个方法
// handleConnection部分代码
// 使用协程池处理请求frameCopy := frame // 创建副本避免闭包问题err = t.pool.Submit(func() {// 处理请求response, err := t.ConnHandler.Handle(context.Background(), frameCopy)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)return}// 发送响应if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)}})
- 这里使用了t.pool.Submit(func() 来把业务处理逻辑交由协程池,这里协程池用的是"github.com/panjf2000/ants/v2" 里的ants.Pool
- 其实这里想和netty的多路复用模型做一个比对,可以更好的理解这种go的模型:
- Go 的
net
包:- 全局 netpoller 管理所有 socket(监听 + 连接)。实际上go的事件循环包括监听和连接事件,也就是epoll监听的事件循环中,包括监听连接事件(就绪则创建连接),和处理连接事件(就绪则读请求,这个事件是在我们调用conn.Read的时候注册到事件循环中,协程进入等待状态,当数据到达内核的时候则事件就绪,go的调度器会唤醒这个协程开始read数据),实际上和redis的单reactor单线程有点像
- 但是这里处理业务逻辑就不是单线程了, 我们这里用了协程池来处理业务逻辑
- Netty:
- 主 Reactor 负责
Accept
,从 Reactor 负责Read
/Write
。相对于go的模型做了职责的分类,处理业务逻辑同样是线程池来做,主从reactor多线程模型
- 主 Reactor 负责
压测
进行了以上的改进,我们试着写一个例子进行压测,和版本1进行比对
package mainimport ("MyRPC/core/client""MyRPC/pb""context""fmt""net/http"_ "net/http/pprof""runtime""sync""sync/atomic""time"
)var (success int64wg sync.WaitGroup
)func main() {// 启动 pprof 服务(用于可视化内存分析)go func() {fmt.Println("[pprof] listening on :6060")_ = http.ListenAndServe(":6060", nil)}()// 创建 RPC 客户端c := pb.NewHelloClientProxy(client.WithTarget("121.40.244.59:8001"))if c == nil {fmt.Println("Failed to create client")return}printMemStats("Before requests")// 20000并发// 客户端连接池是1000最大活跃连接数,1000最小空闲连接数const N = 20000start := time.Now()for i := 0; i < N; i++ {wg.Add(1)go func(i int) {defer wg.Done()rsp, err := c.Hello(context.Background(), &pb.HelloRequest{Msg: "world"})if err == nil && rsp != nil {atomic.AddInt64(&success, 1)} else {fmt.Printf("Request %d error: %v\n", i, err)}}(i)}wg.Wait()elapsed := time.Since(start)printMemStats("After requests")fmt.Println("\n------ Benchmark Summary ------")fmt.Printf("Total requests: %d\n", N)fmt.Printf("Success count: %d\n", success)fmt.Printf("Total time: %v\n", elapsed)fmt.Printf("Avg per call: %v\n", elapsed/time.Duration(N))// 休眠3stime.Sleep(3 * time.Second)
}// 打印内存状态
func printMemStats(label string) {var m runtime.MemStatsruntime.ReadMemStats(&m)fmt.Printf("\n=== %s ===\n", label)fmt.Printf("Alloc = %v KB\n", m.Alloc/1024)fmt.Printf("TotalAlloc = %v KB\n", m.TotalAlloc/1024)fmt.Printf("Sys = %v KB\n", m.Sys/1024)fmt.Printf("NumGC = %v\n", m.NumGC)
}
对于4核CPU的压测结果如下:
同样对于10000并发和30000并发,平均处理时间都在2至4ms左右,
而对于版本一,当并发量到达万级别时,已经处理不完了…笔者等了好长时间也没等到结果,tcp连接已经达到了机器极限,太多时间阻塞在连接的关闭和建立上
总结
该版本实际上与http1.1的特性有相似之处,同样是支持了长连接,但是同样会存在队头阻塞的情况,虽然是复用同一 TCP 连接发送多个请求,但是请求对于一个TCP的连接连接仍然是串行,必须等上一个请求完成,如果上一个请求耗时长就会阻塞了。所以后续版本可以考虑引入http2的多路复用的特性。同时也可以考虑实现rpc的异步发送和流式传输