当前位置: 首页 > ai >正文

小记Vert.x的Pipe都做了什么

注意: 本文内容于 2025-06-08 01:41:22 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:小记Vert.x的Pipe都做了什么。感谢您的关注与支持!

一、背景

最近我在思考一个问题。

在长连接的使用场景中,为了及时释放空闲资源,通常会配置空闲超时机制。

这种机制应用于单个连接(比如一个 TCP 或 HTTP 连接)时,自然没问题。然而,如果放在一整条通信链路中,链路上的各个节点分别配置了不同的空闲超时参数,会发生什么情况呢?

我在一次实施中就遇到了类似的情况:当请求在发送后一段时间(大约 1 分钟)再次发起时,系统就会报错。由于我负责的是链路最下游的部分,无法直接查看上游节点的配置,只能推测问题可能是由于链路中各节点的空闲超时设置不一致所致。最终,我尝试将我这边的 idleTimeout 参数调大,问题随之消失。

虽然问题得以解决,但具体成因仍然只是我的猜测,也没有权力知道全貌。因此为了验证这个猜想,我决定基于 Java 的 Vert.x 框架,模拟并分析这类链路中因空闲超时不一致而导致的问题。

首先了解TCP通信的三次握手、四次挥手。我在下面简单画一下。如何进行抓包可以参考TCP状态以及CLOSE_WAIT问题排查 - 言成言成啊

三次握手

主动建立方                                              被动建立方|                                                    || ------------------ SYN --------------------------> ||                                                    || <--------------- SYN + ACK ----------------------- ||                                                    || ------------------ ACK --------------------------> ||                                                    |
连接建立成功                                     连接建立成功

四次挥手

主动关闭方                                             被动关闭方|                                                    || ------------------ FIN --------------------------> ||                                                    || <------------------ ACK -------------------------- ||                                                    ||              等待服务器准备关闭                     ||                                                    || <----------------- FIN + ACK --------------------- ||                                                    || ------------------ ACK --------------------------> ||                                                    |
连接关闭成功                                     连接关闭成功

不过在实际使用时,主动关闭方会发送FIN+ACK给被动关闭方。这也是符合规范的。

二、实践

2.1 实现

本文代码meethigher/bug-test at vertx-network-disconnect

网络链路user --[a conn]-- proxyServer/proxyClient --[b conn]-- backendServer,我现在有三台机器,分别用来模拟链路中的三个角色。

  • backendServer: 192.168.1.223
    • 永不超时
  • proxyServer/proxyClient: 192.168.1.103
    • proxyServer: 永不超时
    • proxyClient: 5秒空闲超时
  • user: 192.168.1.105
    • 永不超时
    • 随便使用局域网一台设备即可,只需要有telnet命令。执行telnet 192.168.1.103 8080,观察5秒之后,连接是否会被断开

backendServer源码

NetServer backendServer = vertx.createNetServer();
backendServer.connectHandler(socket -> socket.write(String.valueOf(System.currentTimeMillis()))).listen(8888).onFailure(e -> System.exit(1));

下面记录使用Vertx中NetSocket的两种api来双向传输数据,以及超时导致的问题。

2.1.1 handler…write

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.223").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.handler(b::write);b.handler(a::write);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接保持

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,但是并没有向user发送,所以a连接仍然保持。

2.1.2 pipeTo

proxyServer/proxyClient关键代码

NetServer proxyServer = vertx.createNetServer();
NetClient proxyClient = vertx.createNetClient(new NetClientOptions().setIdleTimeoutUnit(TimeUnit.SECONDS).setIdleTimeout(5));
proxyServer.connectHandler(a -> {a.pause();proxyClient.connect(8888, "192.168.1.103").onFailure(e -> System.exit(1)).onSuccess(b -> {b.pause();a.pipeTo(b);b.pipeTo(a);a.resume();b.resume();});
}).listen(8080).onFailure(e -> System.exit(1));

现象:b连接断开,a连接也断开

tcp抓包日志截图如下,会发现proxyServer/proxyClientbackendServer发送了FIN,所以b连接断开,也向user发送FIN,所以a连接也断开。

2.2 思考

2.2.1 handler…write与pipeTo的区别

为啥handler..writepipeTo的结果不同呢?这就需要跟一下pipeTo源码了。

原因在于pipeTo内部给源头连接注册了endHandlerexceptionHandler,当监听到如上事件时,会默认将对端连接也进行end()

由于io.vertx.core.streams.Pipe的实现类io.vertx.core.streams.impl.PipeImpl逻辑不复杂,跟别的模块代码也并没有强耦合,因此我们可以自己复制一份DiyPipe出来,以供自己调试。

那么pipeTo到底做了哪些东西呢?这个可以将其使用handler..write来实现出来。a.pipeTo(b).onComplete(completion)就相当于如下代码

a.resume();
a.handler(buf -> {b.write(buf);if (b.writeQueueFull()) {a.pause();b.drainHandler(t -> a.resume());}
});
a.endHandler(v -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(completion);
});
a.exceptionHandler(e -> {a.handler(null);a.endHandler(null);a.exceptionHandler(null);b.end().onComplete(v -> completion.handle(Future.failedFuture(e)));
});

在此也提一个插曲,之前发现了一个tcp反向代理的bug

  • TCP反向代理在反代HTTP短连接服务时,出现io.netty.channel.StacklessClosedChannelException · Issue #6 · meethigher/tcp-reverse-proxy
  • meethigher/bug-test at vertx-tcp-proxy-closed

这个问题其实挺傻逼的,用了pipeTo这个api,连接的生命周期已经双向绑定了,而我又进行了再次绑定,进而导致关了又关的问题。

2.2.2 endHandler()/closeHandler()区别

在Vertx中,end和close主要用于区分半关闭和全关闭的状态。

以NetSocket为例,end底层调用了close,因此调用end()和调用close()的作用是一致的。

但是endHandler()和closeHandler()是严格不一样的。可以通过源码查看对应的触发时机,明显是endHandler()会比closeHandler()触发更靠前。

2.3 Promise用法示例

常规使用示例

import io.vertx.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;public class PromiseUsage {private static final Vertx vertx = Vertx.vertx();private static final Logger log = LoggerFactory.getLogger(PromiseUsage.class);public static Future<String> getFuture() {// Promise用法final Promise<String> promise = Promise.promise();vertx.setTimer(5000, id -> {if (ThreadLocalRandom.current().nextBoolean()) {promise.complete("succeed");} else {promise.fail("failed");}});return promise.future();}public static void main(String[] args) {Handler<AsyncResult<String>> completion = ar -> {if (ar.succeeded()) {log.info("test succeed");} else {log.error("test failed", ar.cause());}};getFuture().onComplete(completion);getFuture().onComplete(v -> {completion.handle(Future.failedFuture(new RuntimeException("hh")));});for (int i = 0; i < 10; i++) {getFuture().onComplete(ar -> {if (ar.succeeded()) {log.info("future completed");} else {log.error("future failed");}});}LockSupport.park();}
}
http://www.xdnf.cn/news/12852.html

相关文章:

  • 《深入理解 Nacos 集群与 Raft 协议》系列三:日志对比机制:Raft 如何防止数据丢失与错误选主
  • 讲述我的plc自学之路 第十三章
  • 遍历 Map 类型集合的方法汇总
  • 第1篇:BLE 是什么?与经典蓝牙有何区别?
  • 【第三十九周】ViLT
  • 《高等数学》(同济大学·第7版)第三章第二节“洛必达法则“详解
  • C语言编程习题Day1
  • 曼昆《经济学原理》第九版 第七章消费者、生产者与市场效率
  • 解决Vscode JDK插件源码缺失问题
  • 手搓transformer
  • 【数据结构与算法】从广度优先搜索到Dijkstra算法解决单源最短路问题
  • springboot3.5整合Spring Security6.5默认密码没有打印输出控制台排查过程
  • DeepSeek 终章:破局之路,未来已来
  • 图像超分辨率
  • 爱抚宠物小程序源代码+lw+ppt
  • 数据库学习(三)——MySQL锁
  • for循环应用
  • 【西门子杯工业嵌入式-6-ADC采样基础】
  • 详细叙述一下Spring如何创建bean
  • Python训练营打卡DAY48
  • 华为IP(8)(OSPF开放最短路径优先)
  • 树状数组学习笔记
  • 振动力学:无阻尼多自由度系统(受迫振动)
  • SQL进阶之旅 Day 21:临时表与内存表应用
  • Spring MVC请求处理流程和DispatcherServlet机制解析
  • 【Go语言基础【18】】Map基础
  • 2025-04-28-堆、栈及其应用分析
  • 算法专题七:分治
  • 【CATIA的二次开发23】抽象对象Document涉及文档激活控制的方法
  • serv00 ssh登录保活脚本-邮件通知版