当前位置: 首页 > news >正文

[Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)

[Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)

在现代微服务架构中,经常需要在不同服务之间进行高效、可靠的通信。本文将介绍如何使用 Spring Boot 结合 Netty 实现一个 TCP 长连接客户端,并通过 RESTful 接口转发请求到后台 TCP 服务。这种架构在物联网、实时通信等领域非常常见。

一.架构设计

以下是系统的架构设计图:

RESTful
Response
Async
HTTP Client
Spring Boot Controller
Netty Client Service
TCP Connection Pool
Backend TCP Server
  • HTTP Client:外部客户端通过 RESTful 接口发送请求。
  • Spring Boot Controller:接收 HTTP 请求并转发到 Netty 客户端服务。
  • Netty Client Service:管理 TCP 长连接,发送请求并接收响应。
  • TCP Connection Pool:管理多个 TCP 连接,提高性能和可靠性。
  • Backend TCP Server:后台 TCP 服务,处理实际的业务逻辑。

二.项目搭建与依赖配置

1. 创建项目并添加依赖

创建一个 Spring Boot 项目,并添加以下依赖:

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.94.Final</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

2. 配置参数

application.yml 文件中配置 TCP 服务器的地址、端口以及其他参数:

tcp:server:host: 127.0.0.1port: 9000client:worker-threads: 4connect-timeout: 3000heartbeat-interval: 30000max-frame-length: 65535

三.核心组件实现

3.1 Netty 客户端启动器

NettyClientBootstrap 负责初始化 Netty 客户端并建立长连接:

@Component
@Slf4j
public class NettyClientBootstrap {@Value("${tcp.server.host}")private String host;@Value("${tcp.server.port}")private int port;private volatile Channel channel;private Bootstrap bootstrap;@PostConstructpublic void init() throws InterruptedException {EventLoopGroup workerGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 协议处理pipeline.addLast(new LengthFieldPrepender(4));pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));// 业务处理器pipeline.addLast(new ClientHandler());}});connect();}private void connect() throws InterruptedException {ChannelFuture future = bootstrap.connect(host, port).sync();channel = future.channel();channel.closeFuture().addListener(f -> {log.warn("Connection lost, reconnecting...");reconnect();});}private void reconnect() {bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {channel = future.channel();log.info("Reconnected successfully");} else {log.error("Reconnect failed, retrying in 5s");channel.eventLoop().schedule(this::reconnect, 5, TimeUnit.SECONDS);}});}public Channel getChannel() {if (!channel.isActive()) {throw new IllegalStateException("Channel is inactive");}return channel;}
}

3.2 业务处理器

ClientHandler 负责处理 TCP 通信中的请求和响应:

@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<String> {private static final ConcurrentMap<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {// 响应格式示例:REQ_ID|RESPONSE_DATAString[] parts = msg.split("\\|", 2);if (parts.length == 2) {CompletableFuture<String> future = pendingRequests.remove(parts[0]);if (future != null) {future.complete(parts[1]);}}}public static CompletableFuture<String> sendRequest(Channel channel, String message) {String reqId = UUID.randomUUID().toString();CompletableFuture<String> future = new CompletableFuture<>();pendingRequests.put(reqId, future);String protocolMsg = reqId + "|" + message;channel.writeAndFlush(protocolMsg).addListener(f -> {if (!f.isSuccess()) {future.completeExceptionally(f.cause());pendingRequests.remove(reqId);}});// 设置超时channel.eventLoop().schedule(() -> {if (pendingRequests.remove(reqId) != null) {future.completeExceptionally(new TimeoutException("Request timeout"));}}, 5, TimeUnit.SECONDS);return future;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("Channel error", cause);ctx.close();}
}

3.3 服务层封装

TcpClientService 提供对外的服务接口,封装了 TCP 通信逻辑:

@Service
@RequiredArgsConstructor
public class TcpClientService {private final NettyClientBootstrap clientBootstrap;public String sendMessage(String message) throws Exception {Channel channel = clientBootstrap.getChannel();return ClientHandler.sendRequest(channel, message).get(5, TimeUnit.SECONDS);}
}

3.4 RESTful 接口层

TcpController 提供 RESTful 接口,接收外部请求并转发到 TCP 服务:

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class TcpController {private final TcpClientService tcpClientService;@PostMapping("/send")public ResponseEntity<?> sendCommand(@RequestBody String payload) {try {String response = tcpClientService.sendMessage(payload);return ResponseEntity.ok(response);} catch (TimeoutException e) {return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT).body("Backend service timeout");} catch (Exception e) {return ResponseEntity.status(HttpStatus.BAD_GATEWAY).body("Communication error: " + e.getMessage());}}
}

3.5 心跳保活机制

HeartbeatScheduler 定期发送心跳消息,保持 TCP 连接活跃:

@Component
@RequiredArgsConstructor
public class HeartbeatScheduler {private final TcpClientService tcpClientService;@Scheduled(fixedRateString = "${tcp.client.heartbeat-interval}")public void heartbeat() {try {tcpClientService.sendMessage("HEARTBEAT");} catch (Exception e) {log.error("Heartbeat failed", e);}}
}

四.关键功能说明

1. 长连接管理

  • 自动重连机制:断线后每 5 秒自动重试。
  • Channel 状态监控:实时监控连接状态,确保连接可用。
  • 异常自动恢复:捕获异常并尝试恢复连接。

2. 协议设计

协议格式如下:

+----------------+-------------------+
| 32字节UUID     | 实际消息内容(UTF8)|
+----------------+-------------------+

3. 异步处理流程

异步处理流程如下:

Controller Service Handler TCP Server 发送请求 生成请求ID 发送协议消息 返回响应 完成Future 返回响应 Controller Service Handler TCP Server

五.测试方法

1. 启动模拟 TCP 服务端

使用 Python 快速搭建一个测试 TCP 服务端:

# 使用Python快速搭建测试服务
import socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 9000))
sock.listen(1)while True:conn, addr = sock.accept()while True:data = conn.recv(1024)if not data: breakreq_id, payload = data.decode().split('|', 1)conn.send(f"{req_id}|ECHO:{payload}".encode())conn.close()

2. 发送测试请求

通过 curl 发送测试请求:

curl -X POST -H "Content-Type: text/plain" \
-d "Hello Netty" \
http://localhost:8080/api/send

六.生产级优化建议

1. 连接池扩展

实现多 Channel 管理,提高性能和可靠性:

public class ConnectionPool {private final BlockingQueue<Channel> pool = new LinkedBlockingQueue<>(10);public Channel getChannel() {Channel channel = pool.poll();if (channel == null || !channel.isActive()) {channel = createNewChannel();}return channel;}private Channel createNewChannel() {// 创建新连接逻辑}
}

2. 监控指标

添加监控指标,便于实时监控系统状态:

@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {return registry -> {Gauge.builder("tcp.active.connections", () -> clientBootstrap.getActiveCount()).register(registry);};
}

3. SSL 加密支持

为 TCP 连接添加 SSL 加密支持:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {SSLEngine engine = sslContext.newEngine(ch.alloc());pipeline.addLast(new SslHandler(engine));// 其他处理器...}
});

4. 流量控制

添加流量控制机制,防止服务过载:

// 在Handler中添加限流器
private final RateLimiter rateLimiter = RateLimiter.create(1000); // QPS=1000public static CompletableFuture<String> sendRequest(...) {if (!rateLimiter.tryAcquire()) {throw new RateLimitExceededException();}// 原有逻辑...
}

总结

本文介绍了一个基于 Spring Boot 和 Netty 的 TCP 长连接客户端实现方案,支持通过 RESTful 接口转发请求到后台 TCP 服务。该方案具备以下核心功能:

  • 基于 Netty 的 TCP 长连接管理
  • 异步请求/响应匹配机制
  • 自动重连和心跳保活
  • RESTful 接口集成
  • 完善的超时和异常处理

你可以根据实际需求调整协议格式、连接池参数和安全策略。建议配合 APM 工具(如 SkyWalking)进行链路监控,以进一步优化系统性能和稳定性。

希望本文对你有所帮助!如果有任何问题或建议,欢迎在评论区留言。

http://www.xdnf.cn/news/461179.html

相关文章:

  • 【Linux】动静态库的使用
  • 人工智能100问☞第23问:卷积神经网络(CNN)为何擅长图像处理?
  • 双系统重装ubuntu
  • Newton 迭代
  • 【ORB-SLAM3】CreateNewKeyFrame()函数阅读
  • OpenCV CUDA模块中矩阵操作------矩阵元素求和
  • vue3.0的name属性插件——vite-plugin-vue-setup-extend
  • Spring框架的事务管理
  • 2025全网首发:ComfyUI整合GPT-Image-1完全指南 - 8步实现AI图像创作革命
  • 各类开发教程资料推荐,Java / python /golang /js等
  • ARP Detection MAC-Address Static
  • Uniapp开发鸿蒙购物项目教程之样式选择器
  • Gitee DevSecOps:军工软件研发的智能化变革引擎
  • 使用itextsharp5.0版本来合并多个pdf文件并保留书签目录结构
  • 人体肢体工作识别-一步几个脚印从头设计数字生命——仙盟创梦IDE
  • 产品创新怎么算
  • MySQL主从复制与读写分离
  • 模糊综合评价模型建立
  • Leetcode刷题 | Day63_图论08_拓扑排序
  • Ubuntu 20.04 LTS 中部署 网页 + Node.js 应用 + Nginx 跨域配置 的详细步骤
  • x-file-storage
  • AI数字人融合VR全景:开启未来营销与交互新篇章
  • 每日算法 - 【Swift 算法】Two Sum 问题:从暴力解法到最优解法的演进
  • C#数据类型
  • 新能源汽车制动系统建模全解析——从理论到工程应用
  • 【系统架构师】2025论文《WEB系统性能优化技术》
  • Added non-passive event listener to a scroll-blocking
  • 大语言模型 07 - 从0开始训练GPT 0.25B参数量 - MiniMind 实机训练 预训练 监督微调
  • 【Python 面向对象】
  • Android Development Roadmap