当前位置: 首页 > backend >正文

用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本2

在版本1中,虽然系统能够满足基本需求,但随着连接数的增加和处理请求的复杂度上升,性能瓶颈逐渐显现。为了进一步提升系统的稳定性、并发处理能力以及资源的高效利用,版本2引入了三个重要功能:客户端连接池、服务器长连接以及服务器处理业务逻辑时引入的协程池,主要是为了更好的利用资源和提高系统的稳定性。这些功能的引入,使得系统在面对大规模连接和高并发请求时,能够更好地应对。

为了更好的验证两个版本的效果,文章末尾也增加了两个版本的压测效果
代码地址:https://github.com/karatttt/MyRPC

版本2新增特性

分别解释一下每个功能引入的原因和实际需求。

客户端连接池的引入

背景:

在之前的版本中,每次与服务器的通信都会创建和销毁连接,频繁的连接创建和销毁不仅浪费了系统资源,每一次都需要经过TCP的握手和挥手,同时也导致了连接响应时间的不稳定,我们希望引入一个连接池,更好的管理和复用连接。

// 原版本
// 实现Send方法
func (c *clientTransport) Send(ctx context.Context, reqBody interface{}, rspBody interface{}, opt *ClientTransportOption) error {// 获取连接// TODO 这里的连接后续可以优化从连接池获取conn, err := net.Dial("tcp", opt.Address)if err != nil {return err}defer conn.Close()
实现思路:

连接池和协程池有相似的地方也有不同的地方,首先他们都是池化机制,旨在重用已有资源(连接或协程),避免频繁创建和销毁资源的性能开销。都有空闲资源的回收、最大并发数的限制,以及对请求的排队和等待机制,同时都需要处理资源的生命周期管理(如超时、关闭等)。

但是他们的对象不同,一个是TCP连接,一个是系统的协程。我们可以借鉴协程池的实现来实现这个连接池。

这里还需要注意一个点。我们往往对于一个service的请求是同一目的 IP + 端口(如并发调用多次Hello方法,访问server也是同一个同一目的 IP + 端口,只是源端口不同),不同的方法通过协议数据中的方法名来进行service内的路由。而我们创建的这个连接池,是对于这个目的 IP + 端口的池化处理,针对这个目的 IP + 端口创建多个连接并复用,所以我们系统中对于不同的service应有不同的连接池,故做一个poolManager来统一管理。

PoolManager
先来看看这个poolManager,首先是getPoolManager,获取一个全局唯一的poolManager

type PoolManager struct {mu       sync.RWMutexpools    map[string]*ConnPool // key是目的ip加端口,v是实际连接池ctx      context.Contextcancel   context.CancelFuncsigChan  chan os.Signal}// GetPoolManager 获取全局唯一的 PoolManager 实例func GetPoolManager() *PoolManager {poolManagerOnce.Do(func() {ctx, cancel := context.WithCancel(context.Background())globalPoolManager = &PoolManager{pools:    make(map[string]*ConnPool),ctx:      ctx,cancel:   cancel,sigChan:  make(chan os.Signal, 1),}// 启动信号处理go globalPoolManager.handleSignals()})return globalPoolManager}
  • 首先这个struct持有一个pools,以及sigChan用于监听程序退出时,优雅关闭所有连接池,这个优雅关闭后续再看。
  • poolManagerOnce.Do(func() 保证只有一个manager实例
  • 我们下面看看如何创建一个连接池
// GetPool 获取指定地址的连接池,如果不存在则创建
func (pm *PoolManager) GetPool(addr string) *ConnPool {pm.mu.RLock()if pool, exists := pm.pools[addr]; exists {pm.mu.RUnlock()return pool}pm.mu.RUnlock()// 创建连接池pm.mu.Lock()defer pm.mu.Unlock()// 双重检查,防止其他goroutine已经创建if pool, exists := pm.pools[addr]; exists {return pool}pool := NewConnPool(addr,           // 服务器地址1000,             // 最大活跃连接数1000,              // 最小空闲连接数60*time.Second, // 空闲连接超时时间60*time.Second, // 建立连接最大生命周期func(address string) (net.Conn, error) {return net.DialTimeout("tcp", address, 60*time.Second)},)pm.pools[addr] = poolreturn pool
}
// handleSignals 处理系统信号
func (pm *PoolManager) handleSignals() {// 注册信号signal.Notify(pm.sigChan, syscall.SIGINT, syscall.SIGTERM)// 等待信号sig := <-pm.sigChanfmt.Printf("\nReceived signal: %v\n", sig)// 创建一个带超时的上下文用于关闭shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)defer cancel()// 优雅关闭连接池fmt.Println("Shutting down connection pools...")if err := pm.Shutdown(shutdownCtx); err != nil {fmt.Printf("Error during shutdown: %v\n", err)}fmt.Println("Connection pools shut down successfully")
}
// Shutdown 优雅关闭所有连接池
func (pm *PoolManager) Shutdown(ctx context.Context) error {// 发送关闭信号pm.cancel()// 等待所有连接池关闭done := make(chan struct{})go func() {pm.mu.Lock()for _, pool := range pm.pools {pool.Close()}pm.pools = make(map[string]*ConnPool)pm.mu.Unlock()close(done)}()// 等待关闭完成或超时select {case <-done:return nilcase <-ctx.Done():return ctx.Err()}
}
  • GetPool中,先获取读锁判断是否有pool已经创建,若没有获取写锁创建连接池
  • handleSignals是创建manager时创建的一个协程监听系统信号,如果进程结束,则调用Shutdown,关闭所有的pool。

ConnectPool
获得了连接池后,我们看看如何获取连接,即get操作


func (p *ConnPool) Get() (net.Conn, error) {p.mu.Lock()defer p.mu.Unlock()// 如果连接池已关闭或正在关闭,拒绝新连接if p.closed || p.closing {return nil, ErrPoolClosed}// 设置等待超时var startWait time.Timeif p.maxWait > 0 {startWait = time.Now()}for {// 检查空闲连接if len(p.idleConns) > 0 {conn := p.idleConns[len(p.idleConns)-1]p.idleConns = p.idleConns[:len(p.idleConns)-1]atomic.AddUint64(&p.stats.Hits, 1)// 连接健康检查if !p.isHealthy(conn) {conn.conn.Close()atomic.AddInt32(&p.activeCount, -1)continue}// 重置超时设置conn.conn.SetDeadline(time.Time{})conn.conn.SetReadDeadline(time.Time{})conn.conn.SetWriteDeadline(time.Time{})conn.lastUsed = time.Now()atomic.AddInt32(&p.activeCount, 1)p.wg.Add(1)p.mu.Unlock()return &pooledConnWrapper{conn, p}, nil}// 检查是否可以创建新连接if int(atomic.LoadInt32(&p.activeCount)) < p.maxActive {atomic.AddInt32(&p.activeCount, 1)p.wg.Add(1)atomic.AddUint64(&p.stats.Misses, 1)p.mu.Unlock()conn, err := p.dialFunc(p.addr)if err != nil {atomic.AddInt32(&p.activeCount, -1)p.wg.Done()atomic.AddUint64(&p.stats.Errors, 1)p.cond.Signal()return nil, err}pooledConn := &PooledConn{conn:      conn,createdAt: time.Now(),lastUsed:  time.Now(),}return &pooledConnWrapper{pooledConn, p}, nil}// 等待连接释放if p.maxWait > 0 {atomic.AddUint64(&p.stats.Timeouts, 1)p.cond.Wait()// 检查是否超时if time.Since(startWait) >= p.maxWait {p.mu.Unlock()return nil, fmt.Errorf("连接池获取连接超时,等待时间: %v", time.Since(startWait))}} else {p.cond.Wait()}}
}
  • 连接池的各个连接有两种状态,空闲连接,活跃连接(正在处理IO的连接),空闲连接通过Put方法归还连接,若超过最大空闲连接数,则该连接被close,活跃连接通过Get方法将其置为活跃状态,若超过最大活跃连接数,则需要排队等待连接池释放连接
  • 获取到连接有几种可能,一种是有空闲连接直接获取返回,一种是无空闲连接,但是活跃连接数未达到最大阈值,则创建新连接,一种是无空闲连接且达到了最大活跃连接数,则通过cond.Wait等待,直到被唤醒的时候,在通过上面几种方式尝试获取
  • 同样有三种情况无法调用get的时候立即创建新连接,一个就是连接池正在关闭或者已经关闭,一个是获取的空闲连接健康检测不通过(这里的检测就是简单的调用conn.Read读,如果读到数据或者返回err则说明不健康,这个连接仍在被使用),还有一个就是无空闲连接且达到了最大活跃连接数,cond.wait等待的时间超过了最大等待时间
  • 接下来看看Put方法

func (p *ConnPool) Put(conn net.Conn) {pc, ok := conn.(*pooledConnWrapper)if !ok {conn.Close()return}p.mu.Lock()defer p.mu.Unlock()// 减少活跃计数atomic.AddInt32(&p.activeCount, -1)p.wg.Done()// 如果连接池正在关闭或已关闭,直接关闭连接if p.closing || p.closed {pc.conn.conn.Close()p.cond.Signal()return}// 检查连接是否健康if !p.isHealthy(pc.conn) {pc.conn.conn.Close()p.cond.Signal()return}// 检查是否超过最大空闲连接数if len(p.idleConns) >= p.maxIdle {pc.conn.conn.Close()p.cond.Signal()return}pc.conn.lastUsed = time.Now()p.idleConns = append(p.idleConns, pc.conn)p.cond.Signal()
}
  • Put方法较为简单,当客户端处理完请求的时候,就 defer一下pool的Put方法归还连接,将该连接的状态置为空闲,如果超过了最大空闲数则close这个连接
  • 最后看看close方法,连接池的优雅关闭
func (p *ConnPool) Shutdown(timeout time.Duration) error {p.mu.Lock()// 标记为正在关闭,不再接受新连接p.closing = truep.mu.Unlock()// 关闭所有空闲连接p.mu.Lock()for _, conn := range p.idleConns {conn.conn.Close()}p.idleConns = nilp.mu.Unlock()// 等待活跃连接完成或超时done := make(chan struct{})go func() {p.wg.Wait() // 等待所有活跃连接归还close(done)}()select {case <-done:// 所有活跃连接已完成p.mu.Lock()p.closed = truep.mu.Unlock()return nilcase <-time.After(timeout):// 超时,强制关闭p.mu.Lock()p.closed = truep.mu.Unlock()return fmt.Errorf("连接池关闭超时,仍有 %d 个活跃连接", atomic.LoadInt32(&p.activeCount))}
}func (p *ConnPool) Close() {// 默认给5秒超时if err := p.Shutdown(5 * time.Second); err != nil {fmt.Println("连接池关闭警告:", err)}
}
  • 首先为什么需要优雅关闭? 如果没有任何协程监听信号(无 signal.Notify)当 SIGINT(Ctrl+C)或 SIGTERM(kill)发生时,进程会立即退出,所有协程(包括 main 和子协程)会被强制终止。操作系统回收所有资源包括socket连接,这就意味着所有未完成的 net.Conn 会被强制关闭,服务端会收到 RST(强制关闭连接,不进行四次挥手) 而非 FIN。我们希望这些正在进行的连接能够正常处理完再关闭,避免server收到不完整的数据从而引发其他意外发生
  • 当然我们需要为这个等待活跃连接处理完设置一个超时时间,所以再Close中,调用shutdown方法并设置了5秒超时时间,shutdown中关闭所有空闲连接,并wg.wait等待活跃连接处理完毕

至此客户端的连接池做好了,但是需要考虑的是,版本1的server端的连接是一次性的,处理完业务逻辑返回后立即close连接。如果不改server的短连接为长连接,那么客户端的连接池则没有意义,即使是空闲的连接仍然会被server立即close掉,所以我们引入server端的长连接

服务器长连接的引入

背景:

在旧版本中,客户端与服务器之间的每次请求都需要进行连接和断开,频繁的建立和关闭连接增加了系统的负担。长连接机制能够让服务器保持与客户端的连接,在连接周期内不断发送和接收数据,减少了连接频繁创建的开销,提高了通信效率,尤其是在需要频繁请求的场景下。

技术思路:

使用for循环监听长连接,不断读取请求帧,并根据上下文状态决定是否关闭连接。同时使用超时机制来防止长时间未操作的连接占用资源。主要更改handleConnection这个方法,即listen到新连接后,go出去的一个协程处理这个方法


// handleConnection 处理单个连接
func (t *serverTransport) handleConnection(ctx context.Context, conn net.Conn) {// 设置连接超时idleTimeout := 30 * time.Secondif t.opts != nil && t.opts.IdleTimeout > 0 {idleTimeout = t.opts.IdleTimeout}// 设置读取超时conn.SetReadDeadline(time.Now().Add(idleTimeout))// 处理连接fmt.Printf("New connection from %s\n", conn.RemoteAddr())// 循环读取请求,即长连接for {select {// 1. 如果上下文被取消,则关闭连接case <-ctx.Done():fmt.Printf("Context cancelled, closing connection from %s\n", conn.RemoteAddr())returndefault:frame, err := codec.ReadFrame(conn)if err != nil {// 2. 如果读取帧失败,如客户端断开连接,则关闭连接if err == io.EOF {fmt.Printf("Client %s disconnected normally\n", conn.RemoteAddr())return}// 3. 如果连接超时,超过设置的idletime,关闭连接if e, ok := err.(net.Error); ok && e.Timeout() {fmt.Printf("Connection from %s timed out after %v\n", conn.RemoteAddr(), idleTimeout)return}// 4. 处理强制关闭的情况if strings.Contains(err.Error(), "forcibly closed") {fmt.Printf("Client %s forcibly closed the connection\n", conn.RemoteAddr())return}fmt.Printf("Read error from %s: %v\n", conn.RemoteAddr(), err)return}// 重置读取超时conn.SetReadDeadline(time.Now().Add(idleTimeout))// 使用协程池处理请求frameCopy := frame // 创建副本避免闭包问题err = t.pool.Submit(func() {// 处理请求response, err := t.ConnHandler.Handle(context.Background(), frameCopy)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)return}// 发送响应if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)}})if err != nil {fmt.Printf("Submit task to pool error for %s: %v\n", conn.RemoteAddr(), err)// 协程池提交失败,直接处理response, err := t.ConnHandler.Handle(ctx, frame)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)continue}if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)return}}}}
}
  • 可见现在处理读取帧的逻辑变成了for循环,即长连接处理多次的请求,当遇到客户端连接断开或者长时间没有数据传输,则server关闭这个连接

服务端协程池的引入

背景:

在之前的版本中,服务器需要为每一个请求分配一个独立的协程处理,随着请求量的增加,创建大量的协程会导致资源浪费和上下文切换的开销。 通过引入协程池,可以限制并发协程的数量,避免系统因为过多的协程而出现性能瓶颈。协程池帮助复用已有的协程,减少了每个请求创建新协程的开销,提升了处理能力。

技术思路:

使用一个协程池管理并发任务,限制并发请求的数量。如果协程池中的协程数量已达到上限,新请求将被等待或者直接失败,避免过度的并发资源竞争。这里主要还是handleConnection这个方法

// handleConnection部分代码
// 使用协程池处理请求frameCopy := frame // 创建副本避免闭包问题err = t.pool.Submit(func() {// 处理请求response, err := t.ConnHandler.Handle(context.Background(), frameCopy)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)return}// 发送响应if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)}})
  • 这里使用了t.pool.Submit(func() 来把业务处理逻辑交由协程池,这里协程池用的是"github.com/panjf2000/ants/v2" 里的ants.Pool
  • 其实这里想和netty的多路复用模型做一个比对,可以更好的理解这种go的模型:
  1. Go 的 net
    • 全局 netpoller 管理所有 socket(监听 + 连接)。实际上go的事件循环包括监听和连接事件,也就是epoll监听的事件循环中,包括监听连接事件(就绪则创建连接),和处理连接事件(就绪则读请求,这个事件是在我们调用conn.Read的时候注册到事件循环中,协程进入等待状态,当数据到达内核的时候则事件就绪,go的调度器会唤醒这个协程开始read数据),实际上和redis的单reactor单线程有点像
    • 但是这里处理业务逻辑就不是单线程了, 我们这里用了协程池来处理业务逻辑
  2. Netty
    • 主 Reactor 负责 Accept,从 Reactor 负责 Read/Write。相对于go的模型做了职责的分类,处理业务逻辑同样是线程池来做,主从reactor多线程模型
      ![[Pasted image 20250510184039.png]]

压测

进行了以上的改进,我们试着写一个例子进行压测,和版本1进行比对

package mainimport ("MyRPC/core/client""MyRPC/pb""context""fmt""net/http"_ "net/http/pprof""runtime""sync""sync/atomic""time"
)var (success int64wg      sync.WaitGroup
)func main() {// 启动 pprof 服务(用于可视化内存分析)go func() {fmt.Println("[pprof] listening on :6060")_ = http.ListenAndServe(":6060", nil)}()// 创建 RPC 客户端c := pb.NewHelloClientProxy(client.WithTarget("121.40.244.59:8001"))if c == nil {fmt.Println("Failed to create client")return}printMemStats("Before requests")// 20000并发// 客户端连接池是1000最大活跃连接数,1000最小空闲连接数const N = 20000start := time.Now()for i := 0; i < N; i++ {wg.Add(1)go func(i int) {defer wg.Done()rsp, err := c.Hello(context.Background(), &pb.HelloRequest{Msg: "world"})if err == nil && rsp != nil {atomic.AddInt64(&success, 1)} else {fmt.Printf("Request %d error: %v\n", i, err)}}(i)}wg.Wait()elapsed := time.Since(start)printMemStats("After requests")fmt.Println("\n------ Benchmark Summary ------")fmt.Printf("Total requests: %d\n", N)fmt.Printf("Success count:  %d\n", success)fmt.Printf("Total time:     %v\n", elapsed)fmt.Printf("Avg per call:   %v\n", elapsed/time.Duration(N))// 休眠3stime.Sleep(3 * time.Second)
}// 打印内存状态
func printMemStats(label string) {var m runtime.MemStatsruntime.ReadMemStats(&m)fmt.Printf("\n=== %s ===\n", label)fmt.Printf("Alloc = %v KB\n", m.Alloc/1024)fmt.Printf("TotalAlloc = %v KB\n", m.TotalAlloc/1024)fmt.Printf("Sys = %v KB\n", m.Sys/1024)fmt.Printf("NumGC = %v\n", m.NumGC)
}

对于4核CPU的压测结果如下:
![[Pasted image 20250510184147.png]]

同样对于10000并发和30000并发,平均处理时间都在2至4ms左右,
而对于版本一,当并发量到达万级别时,已经处理不完了…笔者等了好长时间也没等到结果,tcp连接已经达到了机器极限,太多时间阻塞在连接的关闭和建立上

总结

该版本实际上与http1.1的特性有相似之处,同样是支持了长连接,但是同样会存在队头阻塞的情况,虽然是复用同一 TCP 连接发送多个请求,但是请求对于一个TCP的连接连接仍然是串行,必须等上一个请求完成,如果上一个请求耗时长就会阻塞了。所以后续版本可以考虑引入http2的多路复用的特性。同时也可以考虑实现rpc的异步发送和流式传输

http://www.xdnf.cn/news/5209.html

相关文章:

  • 实验四:网络编程
  • localStorage和sessionStorage
  • Day28 -js开发01 -JS三个实例:文件上传 登录验证 购物商城 ---逻辑漏洞复现 及 判断js的payload思路
  • [Linux网络_71] NAT技术 | 正反代理 | 网络协议总结 | 五种IO模型
  • 好用的播放器推荐
  • 蓝桥杯嵌入式第十一届省赛真题
  • Python企业级OCR实战开发:从基础识别到智能应用
  • 健康养生:开启活力生活的密码
  • JGL066生活垃圾滚筒筛分选机实验装置
  • MAD-TD: MODEL-AUGMENTED DATA STABILIZES HIGH UPDATE RATIO RL
  • Ubuntu22.04安装显卡驱动/卸载显卡驱动
  • JDBC工具类的三个版本
  • Windows系统Jenkins企业级实战
  • Redis经典面试题
  • 数据库实验10
  • 【经验总结】Ubuntu 22.04.5 LTS 将内核从5.15.0-140 升级到6.8.0-60后纽曼无线网卡无法使用解决措施
  • C++ 命令模式详解
  • R 语言科研绘图 --- 桑基图-汇总
  • Python网络爬虫:从入门到实践
  • uniapp-商城-51-后台 商家信息(logo处理)
  • 33号远征队 - SDKDump
  • Spring 必会之微服务篇(2)
  • 前端进化论·JavaScript 篇 · 数据类型
  • [学习]RTKLib详解:sbas.c与rtcm.c
  • Linux 阻塞和非阻塞 I/O 简明指南
  • [架构之美]linux常见故障问题解决方案(十九)
  • 数据结构与算法分析实验11 实现顺序查找表
  • vue注册用户使用v-model实现数据双向绑定
  • 202536 | KafKa生产者分区写入策略+消费者分区分配策略
  • 0.环境初始化