当前位置: 首页 > news >正文

Netty集群方案详解与实战(Zookeeper + Redis + RabbitMQ)

一、背景

二、Netty 单体架构的优缺点

        优点

        缺点

三、Netty 集群架构的优缺点

        优点

        缺点

四、适用场景对比

五、Netty单体架构代码实现

六、Netty集群架构方案实现

        方案一、Nginx负载均衡实现集群(较为简单)

                Nginx配置

                前端连接方式

        方案二、Nacos+Gateway(结合SpringCloud生态)

                Netty服务

                gateway网关服务

                前端连接方式

        方案三、Zookeeper + Redis + RabbitMQ方案实现

                redis自动分配端口

                Zookeeper实现Netty服务的注册、在线人数

                Chat-Web服务根据人数最少策略拿到Netty地址

                RabbitMQ实现Netty服务对消息的监听消费

                Chat-Web服务监听Zookeeper节点清理Redis与RabbitMQ残留数据

七、结语


 一、背景

Netty 是一个基于 Java NIO 的高性能网络应用框架,广泛应用于高并发、低延迟的通信场景(如游戏服务器、即时通讯、RPC 框架等)。

单体架构:单台服务器运行一个 Netty 实例,处理所有客户端请求,适合轻量级应用或开发测试阶段,资源集中但存在单点风险。

集群架构:多台服务器协同工作,通过负载均衡、分布式通信等技术共同处理请求,提升性能和可靠性,通过横向扩展解决性能瓶颈,适合高并发、高可用性要求的场景,但需处理分布式复杂性。

二、Netty 单体架构的优缺点

优点

简单易用:无需考虑分布式协调、数据分片等问题,开发逻辑直接(如直接操作 Channel 和 EventLoop)。部署方便,一台服务器即可运行,适合快速验证业务逻辑。

低延迟通信:所有请求在同一进程内处理,避免网络传输和序列化开销,适合对延迟敏感的场景(如实时游戏)。

资源集中管理:共享线程池、缓存等资源,减少重复创建开销。调试方便,可直接通过日志或调试工具定位问题。

成本低:无需额外负载均衡器或分布式中间件,硬件和运维成本较低。

缺点

单点故障风险:服务器宕机或网络中断会导致整个服务不可用,缺乏容灾能力。

性能瓶颈:单台服务器的 CPU、内存、网络带宽有限,无法支撑超大规模并发(如百万级连接)。

扩展性差:垂直扩展(升级硬件)成本高,且受物理限制;水平扩展(增加服务器)需重构为集群架构。

维护困难:随着业务增长,单体代码可能变得臃肿,模块间耦合度高,难以维护和迭代。

三、Netty 集群架构的优缺点

优点

高可用性:通过多节点部署和心跳检测,实现故障自动转移(如使用 ZooKeeper 或 etcd 管理节点状态)。单节点故障不影响整体服务,适合金融、电商等对稳定性要求高的场景。

弹性扩展:水平扩展方便,通过增加服务器即可提升处理能力(如支持千万级连接)。结合负载均衡(如 Nginx、LVS)或服务发现(如 Consul)动态分配流量。

负载均衡:请求均匀分发到多个节点,避免单节点过载,提升资源利用率。支持根据业务优先级或用户特征进行智能路由(如灰度发布)。

数据一致性支持:结合分布式缓存(如 Redis)或数据库分片,解决多节点数据同步问题。适合需要强一致性的场景(如订单处理、支付系统)。

缺点

复杂性增加:需处理分布式事务、序列化、网络分区(脑裂)等问题,开发难度显著提升。需要引入中间件(如 Kafka、RocketMQ)或框架(如 Spring Cloud)协调节点间通信。

性能开销:节点间通信需经过网络传输和序列化/反序列化,增加延迟(如 gRPC 的 Protobuf 编码)。负载均衡器可能成为瓶颈(如 Nginx 性能不足时需升级或分片)。

运维成本高:需监控多节点状态、日志聚合(如 ELK)、分布式追踪(如 SkyWalking)等。部署和升级需考虑滚动重启、数据迁移等操作,流程复杂。

一致性挑战:分布式环境下难以保证强一致性,需权衡 CAP 理论(如采用最终一致性模型)。需设计幂等、重试、补偿机制应对网络异常。

四、适用场景对比

 

选择单体:若业务规模小、对延迟敏感且无需高可用,单体 Netty 是简单高效的选择。

选择集群:若需支撑高并发、高可用或未来扩展,集群架构是必然趋势,但需投入更多资源解决分布式问题。

五、Netty单体架构代码实现

请参考:【Netty实战】基于Netty+WebSocket的IM通信后台服务代码详解-CSDN博客

六、Netty集群架构方案实现

方案一、Nginx负载均衡实现集群(较为简单)

Nginx配置
http {upstream netty_cluster {server 192.168.1.101:875; # 节点1server 192.168.1.102:875; # 节点2ip_hash; # 基于客户端IP的会话保持}server {listen 80;location /ws {proxy_pass http://netty_cluster;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}}
}
前端连接方式
const socket = new WebSocket("ws://your-nginx-ip:875/ws");

由于单节点不可能有全部的channel信息,后续的会话转发可参考方案三中的RabbitMQ实现

方案二、Nacos+Gateway(结合SpringCloud生态)

Netty服务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project><dependencies><!-- Netty核心 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version></dependency><!-- Nacos服务发现 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
</project>

application.yml 

server:port: 875 # Netty服务端口spring:application:name: netty-servicecloud:nacos:discovery:server-addr: nacos-server:8848namespace: prodephemeral: truenetty:websocket:path: /ws

启动类 

@SpringBootApplication
@EnableDiscoveryClient
public class NettyServerApplication {public static void main(String[] args) {SpringApplication.run(NettyServerApplication.class, args);}@Beanpublic ApplicationRunner nettyStarter() {return args -> {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new WSServerInitializer()).bind(875).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}};}
}
gateway网关服务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project><dependencies><!-- Spring Cloud Gateway --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency><!-- Nacos服务发现 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency></dependencies>
</project>

application.yml

server:port: 8080spring:application:name: api-gatewaycloud:gateway:discovery:locator:enabled: trueroutes:- id: netty-ws-routeuri: lb://netty-servicepredicates:- Path=/ws/**filters:- StripPrefix=1- name: RequestRateLimiterargs:redis-rate-limiter.replenishRate: 100redis-rate-limiter.burstCapacity: 200nacos:discovery:server-addr: nacos-server:8848
前端连接方式
// 通过Gateway连接
const socket = new WebSocket("ws://your-gateway-ip/ws");

由于单节点不可能有全部的channel信息,后续的会话转发可参考方案三中的RabbitMQ实现

方案三、Zookeeper + Redis + RabbitMQ方案实现

redis自动分配端口

其实这里也可以将端口与在线人数放在Redis中,改成zookeeper方案可以不需要在中断连接后,监听并且清理在线人数和端口,因为netty与zk建立的临时节点,中断连接后,会自动删除该临时节点。

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.3</version>
</dependency>
/*** Jedis 连接池工具类*/
public class JedisPoolUtils {private static final JedisPool jedisPool;static {//配置连接池JedisPoolConfig poolConfig = new JedisPoolConfig();//最大连接数poolConfig.setMaxTotal(10);//最大空闲连接poolConfig.setMaxIdle(10);//最小空闲连接poolConfig.setMinIdle(5);//最长等待时间,mspoolConfig.setMaxWaitMillis(1500);//创建连接池对象jedisPool = new JedisPool(poolConfig,"127.0.0.1",6379,1000,"root");}public static Jedis getJedis(){return jedisPool.getResource();}}
// 动态分配端口
public static Integer selectPort(Integer port) {String portKey = "netty_port";Jedis jedis = JedisPoolUtils.getJedis();Map<String, String> portMap = jedis.hgetAll(portKey);System.out.println(portMap);// 由于map中的key都应该是整数类型的port,所以先转换成整数后,再比对,否则string类型的比对会有问题List<Integer> portList = portMap.entrySet().stream().map(entry -> Integer.valueOf(entry.getKey())).collect(Collectors.toList());// step1: 编码到此处先运行测试看一下结果System.out.println(portList);Integer nettyPort = null;if (portList == null || portList.isEmpty()) {// step2: 编码到此处先运行测试看一下结果jedis.hset(portKey, port+"", initOnlineCounts);nettyPort = port;} else {// 循环portList,获得最大值,并且累加10Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);Integer maxPort = maxInteger.get().intValue();Integer currentPort = maxPort + 10;jedis.hset(portKey, currentPort+"", initOnlineCounts);nettyPort = currentPort;}// step3: 编码到此处先运行测试看一下最终结果return nettyPort;}// 删除端口分配关系
public static void removePort(Integer port) {String portKey = "netty_port";Jedis jedis = JedisPoolUtils.getJedis();jedis.hdel(portKey, port+"");}

 这样就可以在启动类中自动分配端口

public static void main(String[] args) throws Exception {// 定义主从线程组// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了EventLoopGroup bossGroup = new NioEventLoopGroup();// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务EventLoopGroup workerGroup = new NioEventLoopGroup();// Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动Integer nettyPort = selectPort(875);try {// 构建Netty服务器ServerBootstrap server = new ServerBootstrap();     // 服务的启动类server.group(bossGroup, workerGroup)                // 把主从线程池组放入到启动类中.channel(NioServerSocketChannel.class)      // 设置Nio的双向通道.childHandler(new WSServerInitializer());   // 设置处理器,用于处理workerGroup// 启动server,并且绑定分配的端口号,同时启动方式为"同步"ChannelFuture channelFuture = server.bind(nettyPort).sync();// 监听关闭的channelchannelFuture.channel().closeFuture().sync();} finally {// 优雅的关闭线程池组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();// 移除现有的redis与netty的端口关系removePort(nettyPort);}}
Zookeeper实现Netty服务的注册、在线人数
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.5.0</version>
</dependency>
/*** Zookeeper 配置类*/
public class CuratorConfig {private static String host = "127.0.0.1:3191";                 // 单机/集群的ip:port地址private static Integer connectionTimeoutMs = 30 * 1000;        // 连接超时时间private static Integer sessionTimeoutMs = 3 * 1000;            // 会话超时时间private static Integer sleepMsBetweenRetry = 2 * 1000;         // 每次重试的间隔时间private static Integer maxRetries = 3;                         // 最大重试次数private static String namespace = "IM";                 // 命名空间(root根节点名称)// curator客户端private static CuratorFramework client;static {// 声明重试策略RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 声明初始化客户端client = CuratorFrameworkFactory.builder().connectString(host).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(backoffRetry).namespace(namespace).build();client.start();     // 启动curator客户端}public static CuratorFramework getClient() {return client;}}
/*** Netty服务节点类*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class NettyServerNode {private String ip;                   // IP地址private Integer port;                // 服务端口private Integer onlineCounts = 0;    // 在线人数}
/*** Zookeeper注册工具类 - 用于注册Netty服务节点和管理在线人数统计*/
public class ZookeeperRegister {/*** 注册Netty服务到Zookeeper* @param nodeName 节点名称(如服务名称)* @param ip Netty服务IP地址* @param port Netty服务端口号* @throws Exception 可能抛出的异常*/public static void registerNettyServer(String nodeName,String ip,Integer port) throws Exception {// 获取Zookeeper客户端连接CuratorFramework zkClient = CuratorConfig.getClient();String path = "/" + nodeName;// 检查父节点是否存在,不存在则创建持久化节点Stat stat = zkClient.checkExists().forPath(path);if (stat == null) {zkClient.create().creatingParentsIfNeeded()  // 自动创建父节点.withMode(CreateMode.PERSISTENT)  // 持久化节点.forPath(path);} else {System.out.println(stat.toString());}// 创建临时顺序节点存储Netty服务信息(EPHEMERAL_SEQUENTIAL表示临时顺序节点)NettyServerNode serverNode = new NettyServerNode();serverNode.setIp(ip);serverNode.setPort(port);String nodeJson = JsonUtils.objectToJson(serverNode);  // 对象转JSONzkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)  // 临时顺序节点.forPath(path + "/im-", nodeJson.getBytes());  // 节点路径格式:/nodeName/im-0000000001}/*** 获取本机IP地址* @return 本机IP地址* @throws Exception 可能抛出的异常*/public static String getLocalIp() throws Exception {InetAddress addr = InetAddress.getLocalHost();String ip = addr.getHostAddress();System.out.println("本机IP地址:" + ip);return ip;}/*** 增加在线人数统计* @param serverNode Netty服务节点信息* @throws Exception 可能抛出的异常*/public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, 1);  // 增加1个在线人数}/*** 减少在线人数统计* @param serverNode Netty服务节点信息* @throws Exception 可能抛出的异常*/public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, -1);  // 减少1个在线人数}/*** 处理在线人数的增减操作(核心方法)* @param serverNode Netty服务节点信息* @param counts 变化量(+1表示增加,-1表示减少)* @throws Exception 可能抛出的异常*/public static void dealOnlineCounts(NettyServerNode serverNode,Integer counts) throws Exception {// 获取Zookeeper客户端连接CuratorFramework zkClient = CuratorConfig.getClient();// 创建分布式读写锁(防止并发修改问题)InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient, "/rw-locks");readWriteLock.writeLock().acquire();  // 获取写锁try {String path = "/server-list";// 获取所有子节点List<String> list = zkClient.getChildren().forPath(path);// 遍历所有节点for (String node : list) {String pendingNodePath = path + "/" + node;// 获取节点数据String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));// 反序列化为NettyServerNode对象NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);// 匹配IP和端口的服务节点if (pendingNode.getIp().equals(serverNode.getIp()) &&(pendingNode.getPort().intValue() == serverNode.getPort().intValue())) {// 更新在线人数pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);String nodeJson = JsonUtils.objectToJson(pendingNode);// 写回ZookeeperzkClient.setData().forPath(pendingNodePath, nodeJson.getBytes());}}} finally {readWriteLock.writeLock().release();  // 释放写锁}}
}

 然后启动服务时将节点注册到Zookeeper上

public static void main(String[] args) throws Exception {// 定义主从线程组// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了EventLoopGroup bossGroup = new NioEventLoopGroup();// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务EventLoopGroup workerGroup = new NioEventLoopGroup();// Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动Integer nettyPort = selectPort(875);// 注册当前netty服务到zookeeper中ZookeeperRegister.registerNettyServer("server-list",ZookeeperRegister.getLocalIp(),nettyPort);try {// 构建Netty服务器ServerBootstrap server = new ServerBootstrap();     // 服务的启动类server.group(bossGroup, workerGroup)                // 把主从线程池组放入到启动类中.channel(NioServerSocketChannel.class)      // 设置Nio的双向通道.childHandler(new WSServerInitializer());   // 设置处理器,用于处理workerGroup// 启动server,并且绑定自动分配的端口号,同时启动方式为"同步"ChannelFuture channelFuture = server.bind(nettyPort).sync();// 监听关闭的channelchannelFuture.channel().closeFuture().sync();} finally {// 优雅的关闭线程池组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();// 移除现有的redis与netty的端口关系removePort(nettyPort);}}
Chat-Web服务根据人数最少策略拿到Netty地址
<!-- zookeeper -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.2</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.3.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
zookeeper:curator:host: 127.0.0.1:3191connectionTimeoutMs: 30000sessionTimeoutMs: 3000sleepMsBetweenRetry: 2000maxRetries: 3namespace: itzixi-im
@Component
@ConfigurationProperties(prefix = "zookeeper.curator")
@Data
public class CuratorConfig extends BaseInfoProperties {private String host;                    // 单机/集群的ip:port地址private Integer connectionTimeoutMs;    // 连接超时时间private Integer sessionTimeoutMs;         // 会话超时时间private Integer sleepMsBetweenRetry;    // 每次重试的间隔时间private Integer maxRetries;             // 最大重试次数private String namespace;               // 命名空间(root根节点名称)public static final String path = "/server-list";@Bean("curatorClient")public CuratorFramework curatorClient() {// 三秒后重连一次,只连一次//RetryPolicy retryOneTime = new RetryOneTime(3000);// 每3秒重连一次,重连3次//RetryPolicy retryNTimes = new RetryNTimes(3, 3000);// 每3秒重连一次,总等待时间超过10秒则停止重连//RetryPolicy retryPolicy = new RetryUntilElapsed(10 * 1000, 3000);// 随着重试次数的增加,重试的间隔时间也会增加(推荐)RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 声明初始化客户端CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(backoffRetry).namespace(namespace).build();client.start();     // 启动curator客户端return client;}
@Resource(name = "curatorClient")private CuratorFramework zkClient;@PostMapping("getNettyOnlineInfo")public GraceJSONResult getNettyOnlineInfo() throws Exception {// 从zookeeper中获得当前已经注册的netty 服务列表String path = "/server-list";List<String> list = zkClient.getChildren().forPath(path);List<NettyServerNode> serverNodeList = new ArrayList<>();for (String node:list) {// System.out.println(node);String nodeValue = new String(zkClient.getData().forPath(path + "/" + node));// System.out.println(nodeValue);NettyServerNode serverNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);serverNodeList.add(serverNode);}// 计算当前哪个zk的node是最少人数连接,获得[ip:port]并且返回给前端Optional<NettyServerNode> minNodeOptional = serverNodeList.stream().min(Comparator.comparing(nettyServerNode -> nettyServerNode.getOnlineCounts()));NettyServerNode minNode = minNodeOptional.get();return Result.ok(minNode);}

这样前端就可以根据调用此接口获得的Netty节点进行连接

RabbitMQ实现Netty服务对消息的监听消费

我们这里将使用RabbitMQ的topic消息队列将消息广播到所有Netty服务,各个Netty服务进行查找要发送的用户的channel,最终会有一台找到了并且进行发送或者都没找到存储到数据库。

当然我们也可以用Redis实现,只需要将用户ID与Netty服务的节点进行绑定,当发送消息时去Redis找到要发送的用户channel所在的节点,使用RabbitMQ发送到对应节点的队列即可,可以不用广播到所有Netty节点了。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
/*** RabbitMQ连接工具类 - 提供RabbitMQ连接池管理和消息收发功能*/
public class RabbitMQConnectUtils {// 连接池集合,用于复用连接private final List<Connection> connections = new ArrayList<>();// 连接池最大连接数限制private final int maxConnection = 20;// RabbitMQ服务器配置private final String host = "127.0.0.1";private final int port = 5682;private final String username = "root";private final String password = "1234";private final String virtualHost = "IM";  // 虚拟主机// RabbitMQ连接工厂public ConnectionFactory factory;/*** 获取RabbitMQ连接工厂* @return ConnectionFactory实例*/public ConnectionFactory getRabbitMqConnection() {return getFactory();}/*** 获取连接工厂(单例模式)* @return 初始化好的ConnectionFactory*/public ConnectionFactory getFactory() {initFactory();return factory;}/*** 初始化连接工厂配置*/private void initFactory() {try {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host);  // 设置主机地址factory.setPort(port);  // 设置端口factory.setUsername(username);  // 设置用户名factory.setPassword(password);  // 设置密码factory.setVirtualHost(virtualHost);  // 设置虚拟主机}} catch (Exception e) {e.printStackTrace();}}/*** 发送消息到RabbitMQ* @param message 消息内容* @param exchange 交换机名称* @param routingKey 路由键* @throws Exception 可能抛出的异常*/public void sendMsg(String message, String exchange, String routingKey) throws Exception {// 从连接池获取连接Connection connection = getConnection();// 创建通道Channel channel = connection.createChannel();// 发布消息(消息持久化)channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));// 关闭通道channel.close();// 归还连接到连接池setConnection(connection);}/*** 从指定队列获取单条消息* @param queue 队列名称* @param autoAck 是否自动确认* @return GetResponse对象,包含消息内容* @throws Exception 可能抛出的异常*/public GetResponse basicGet(String queue, boolean autoAck) throws Exception {GetResponse getResponse = null;// 从连接池获取连接Connection connection = getConnection();// 创建通道Channel channel = connection.createChannel();// 获取消息getResponse = channel.basicGet(queue, autoAck);// 关闭通道channel.close();// 归还连接到连接池setConnection(connection);return getResponse;}/*** 从连接池获取连接* @return RabbitMQ连接* @throws Exception 可能抛出的异常*/public Connection getConnection() throws Exception {return getAndSetConnection(true, null);}/*** 归还连接到连接池* @param connection 要归还的连接* @throws Exception 可能抛出的异常*/public void setConnection(Connection connection) throws Exception {getAndSetConnection(false, connection);}/*** 监听指定交换机的队列消息(FANOUT模式)* @param fanout_exchange 交换机名称* @param queueName 队列名称* @throws Exception 可能抛出的异常*/public void listen(String fanout_exchange, String queueName) throws Exception {// 获取连接Connection connection = getConnection();// 创建通道Channel channel = connection.createChannel();// 声明FANOUT类型交换机(持久化)channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true, false, false, null);// 声明队列(持久化,非排他,非自动删除)channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机(FANOUT模式不需要路由键)channel.queueBind(queueName, fanout_exchange, "");// 创建消费者Consumer consumer = new DefaultConsumer(channel){/*** 消息处理回调方法* @param consumerTag 消费者标签* @param envelope 消息信封(包含交换机和路由信息)* @param properties 消息属性* @param body 消息体*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 解析消息内容String msg = new String(body);System.out.println("body = " + msg);String exchange = envelope.getExchange();System.out.println("exchange = " + exchange);// 处理fanout_exchange类型的消息if (exchange.equalsIgnoreCase("fanout_exchange")) {// 反序列化消息内容DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);String senderId = dataContent.getChatMsg().getSenderId();String receiverId = dataContent.getChatMsg().getReceiverId();// 1. 发送消息给接收者(支持多设备)List<io.netty.channel.Channel> receiverChannels =UserChannelSession.getMultiChannels(receiverId);UserChannelSession.sendToTarget(receiverChannels, dataContent);// 2. 同步消息给发送者的其他设备(排除当前设备)String currentChannelId = dataContent.getExtend();List<io.netty.channel.Channel> senderChannels =UserChannelSession.getMyOtherChannels(senderId, currentChannelId);UserChannelSession.sendToTarget(senderChannels, dataContent);}}};// 开始消费消息(自动确认模式)channel.basicConsume(queueName, true, consumer);}/*** 连接池核心管理方法(线程安全)* @param isGet true表示获取连接,false表示归还连接* @param connection 要归还的连接(isGet为false时有效)* @return 获取到的连接(isGet为true时有效)* @throws Exception 可能抛出的异常*/private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {// 确保连接工厂已初始化getRabbitMqConnection();if (isGet) {// 获取连接逻辑if (connections.isEmpty()) {// 连接池为空,创建新连接return factory.newConnection();}// 从连接池取出第一个连接Connection newConnection = connections.get(0);connections.remove(0);// 检查连接是否有效if (newConnection.isOpen()) {return newConnection;} else {// 连接已关闭,创建新连接return factory.newConnection();}} else {// 归还连接逻辑if (connections.size() < maxConnection) {// 连接池未满,回收连接connections.add(connection);}// 连接池已满,不回收(连接会被自动关闭)return null;}}
}

修改ChatHandler信息处理类,消息不再在此类中处理,而是发给RabbitMQ

/*** ChatHandler类*/
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 用于记录和管理所有客户端的channel组public static ChannelGroup clients =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {// 获得客户端传输过来的消息String content = msg.text();System.out.println("接受到的数据:" + content);// 1. 获取客户端发来的消息并且解析DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);ChatMsg chatMsg = dataContent.getChatMsg();String msgText = chatMsg.getMsg();String receiverId = chatMsg.getReceiverId();String senderId = chatMsg.getSenderId();// 判断是否黑名单 start// 如果双方只要有一方是黑名单,则终止发送Result result = OkHttpUtil.get("http://127.0.0.1:1000/friendship/isBlack?friendId1st=" + receiverId+ "&friendId2nd=" + senderId);boolean isBlack = (Boolean)result.getData();System.out.println("当前的黑名单关系为: " + isBlack);if (isBlack) {return;}// 判断是否黑名单 end// 时间校准,以服务器的时间为准chatMsg.setChatTime(LocalDateTime.now());Integer msgType = chatMsg.getMsgType();// 获取channelChannel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();String currentChannelIdShort = currentChannel.id().asShortText();// 2. 判断消息类型,根据不同的类型来处理不同的业务if (msgType == MsgTypeEnum.CONNECT_INIT.type) {// 当websocket初次open的时候,初始化channel,把channel和用户userid关联起来UserChannelSession.putMultiChannels(senderId, currentChannel);UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);NettyServerNode minNode = dataContent.getServerNode();// System.out.println(minNode);// 初次连接后,该节点下的在线人数累加ZookeeperRegister.incrementOnlineCounts(minNode);// 获得ip+端口,在redis中设置关系,以便在前端设备断线后减少在线人数Jedis jedis = JedisPoolUtils.getJedis();jedis.set(senderId, JsonUtils.objectToJson(minNode));} else if (msgType == MsgTypeEnum.WORDS.type|| msgType == MsgTypeEnum.IMAGE.type|| msgType == MsgTypeEnum.VIDEO.type|| msgType == MsgTypeEnum.VOICE.type) {// 此处为mq异步解耦,保存信息到数据库,数据库无法获得信息的主键id,// 所以此处可以用snowflake直接生成唯一的主键idSnowflake snowflake = new Snowflake(new IdWorkerConfigBean());String sid = snowflake.nextId();System.out.println("sid = " + sid);String iid = IdWorker.getIdStr();System.out.println("iid = " + iid);chatMsg.setMsgId(sid);// 此处receiverId所对应的channel为空// 发送消息// List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);// if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {// receiverChannels为空,表示用户离线/断线状态,消息不需要发送,后续可以存储到数据库// chatMsg.setIsReceiverOnLine(false);// } else {//     chatMsg.setIsReceiverOnLine(true);if (msgType == MsgTypeEnum.VOICE.type) {chatMsg.setIsRead(false);}dataContent.setChatMsg(chatMsg);String chatTimeFormat = LocalDateUtils.format(chatMsg.getChatTime(),LocalDateUtils.DATETIME_PATTERN_2);dataContent.setChatTime(chatTimeFormat);// UserChannelSession.sendToTarget(receiverChannels, dataContent);// 通过RabbitMQ发送消息MessagePublisher.sendMsgToOtherNettyServer(JsonUtils.objectToJson(dataContent));                                                                                                                                                // 当receiverChannels为空不为空的时候,同账户多端设备接受消息// for (Channel c : receiverChannels) {//     Channel findChannel = clients.find(c.id());//     if (findChannel != null) {////         // if (msgType == MsgTypeEnum.VOICE.type) {//         //     chatMsg.setIsRead(false);//         // }//         // dataContent.setChatMsg(chatMsg);//         // String chatTimeFormat = LocalDateUtils//         //         .format(chatMsg.getChatTime(),//         //                 LocalDateUtils.DATETIME_PATTERN_2);//         // dataContent.setChatTime(chatTimeFormat);//         // 发送消息给在线的用户//         findChannel.writeAndFlush(//                 new TextWebSocketFrame(//                         JsonUtils.objectToJson(dataContent)));//     }//// }// }// TODO: 消息持久化到数据库(通过MQ异步处理或者其他方式)}// 此处也不需要了,都在mq的监听中完成// dataContent.setChatMsg(chatMsg);// String chatTimeFormat = LocalDateUtils//         .format(chatMsg.getChatTime(),//                 LocalDateUtils.DATETIME_PATTERN_2);// dataContent.setChatTime(chatTimeFormat);// dataContent.setExtend(currentChannelId);//// List<Channel> myOtherChannels = UserChannelSession//                 .getMyOtherChannels(senderId, currentChannelId);// UserChannelSession.sendToMyOthers(myOtherChannels, dataContent);// for (Channel c : myOtherChannels) {//     Channel findChannel = clients.find(c.id());//     if (findChannel != null) {//         // dataContent.setChatMsg(chatMsg);//         // String chatTimeFormat = LocalDateUtils//         //         .format(chatMsg.getChatTime(),//         //                 LocalDateUtils.DATETIME_PATTERN_2);//         // dataContent.setChatTime(chatTimeFormat);//         // 同步消息给在线的其他设备端//         findChannel.writeAndFlush(//                 new TextWebSocketFrame(//                         JsonUtils.objectToJson(dataContent)));//     }// }// currentChannel.writeAndFlush(new TextWebSocketFrame(currentChannelId));// clients.writeAndFlush(new TextWebSocketFrame(currentChannelId));// 调试输出当前会话状态UserChannelSession.outputMulti();}/*** 客户端连接到服务端之后(打开链接)* @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("客户端建立连接,channel对应的长id为:" + currentChannelId);// 获得客户端的channel,并且存入到ChannelGroup中进行管理(作为一个客户端群组)clients.add(currentChannel);}/*** 关闭连接,移除channel* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("客户端关闭连接,channel对应的长id为:" + currentChannelId);// 移除多余的会话String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);UserChannelSession.removeUselessChannels(userId, currentChannelId);clients.remove(currentChannel);// zk中在线人数累减Jedis jedis = JedisPoolUtils.getJedis();NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),NettyServerNode.class);ZookeeperRegister.decrementOnlineCounts(minNode);}/*** 发生异常并且捕获,移除channel* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("发生异常捕获,channel对应的长id为:" + currentChannelId);// 发生异常之后关闭连接(关闭channel)ctx.channel().close();// 随后从ChannelGroup中移除对应的channelclients.remove(currentChannel);// 移除多余的会话String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);UserChannelSession.removeUselessChannels(userId, currentChannelId);// zk中在线人数累减Jedis jedis = JedisPoolUtils.getJedis();NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),NettyServerNode.class);ZookeeperRegister.decrementOnlineCounts(minNode);}}public class RabbitMQConnectUtils {private final List<Connection> connections = new ArrayList<>();private final int maxConnection = 20;private final String host = "127.0.0.1";private final int port = 5682;private final String username = "root";private final String password = "1234";private final String virtualHost = "IM";public ConnectionFactory factory;public ConnectionFactory getRabbitMqConnection() {return getFactory();}public ConnectionFactory getFactory() {initFactory();return factory;}private void initFactory() {try {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(virtualHost);}} catch (Exception e) {e.printStackTrace();}}public void sendMsg(String message, String exchange, String routingKey) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));channel.close();setConnection(connection);}public GetResponse basicGet(String queue, boolean autoAck) throws Exception {GetResponse getResponse = null;Connection connection = getConnection();Channel channel = connection.createChannel();getResponse = channel.basicGet(queue, autoAck);channel.close();setConnection(connection);return getResponse;}public Connection getConnection() throws Exception {return getAndSetConnection(true, null);}public void setConnection(Connection connection) throws Exception {getAndSetConnection(false, connection);}public void listen(String fanout_exchange, String queueName) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();// FANOUT 发布订阅模式(广播模式)channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true, false, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, fanout_exchange, "");Consumer consumer = new DefaultConsumer(channel){/*** 重写消息配送方法* @param consumerTag 消息的标签(标识)* @param envelope  信封(一些信息,比如交换机路由等等信息)* @param properties 配置信息* @param body 收到的消息数据* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);System.out.println("body = " + msg);String exchange = envelope.getExchange();System.out.println("exchange = " + exchange);if (exchange.equalsIgnoreCase("fanout_exchange")) {DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);String senderId = dataContent.getChatMsg().getSenderId();String receiverId = dataContent.getChatMsg().getReceiverId();// 广播至集群的其他节点并且发送给用户聊天信息List<io.netty.channel.Channel> receiverChannels =UserChannelSession.getMultiChannels(receiverId);UserChannelSession.sendToTarget(receiverChannels, dataContent);// 广播至集群的其他节点并且同步给自己其他设备聊天信息String currentChannelId = dataContent.getExtend();List<io.netty.channel.Channel> senderChannels =UserChannelSession.getMyOtherChannels(senderId, currentChannelId);UserChannelSession.sendToTarget(senderChannels, dataContent);}}};/*** queue: 监听的队列名* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知* callback: 回调函数,处理监听到的消息*/channel.basicConsume(queueName, true, consumer);}private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {getRabbitMqConnection();if (isGet) {if (connections.isEmpty()) {return factory.newConnection();}Connection newConnection = connections.get(0);connections.remove(0);if (newConnection.isOpen()) {return newConnection;} else {return factory.newConnection();}} else {if (connections.size() < maxConnection) {connections.add(connection);}return null;}}
}
public class MessagePublisher {public static void sendMsgToOtherNettyServer(String msg) throws Exception {RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();String fanout_exchange = "fanout_exchange";connectUtils.sendMsg(msg, fanout_exchange, "");}
}
Chat-Web服务监听Zookeeper节点清理Redis与RabbitMQ残留数据
spring: rabbitmq:host: 127.0.0.1port: 5682username: rootpassword: 1234virtual-host: wechat-dev

删除队列只可以使用RabbitAdminRabbitTemplate无法删除

/*** RabbitAdmin的配置类*/
@Configuration
public class RabbitAdminConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private Integer port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;/*** 构建RabbitMQ的连接工厂* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setPassword(username);connectionFactory.setUsername(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/*** 构建RabbitAdmin* @param connectionFactory* @return*/@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryPolicy;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;/*** Zookeeper Curator 客户端配置类* 功能:* 1. 初始化Curator客户端连接* 2. 监听Zookeeper节点变化* 3. 处理节点删除时的清理工作(Redis/RabbitMQ)*/
@Slf4j
@Component
@ConfigurationProperties(prefix = "zookeeper.curator") // 从配置文件中读取前缀为zookeeper.curator的属性
@Data // Lombok注解,自动生成getter/setter
public class CuratorConfig extends BaseInfoProperties {// Zookeeper连接配置private String host;                    // Zookeeper服务器地址(格式:ip:port)private Integer connectionTimeoutMs;    // 连接超时时间(毫秒)private Integer sessionTimeoutMs;       // 会话超时时间(毫秒)private Integer sleepMsBetweenRetry;    // 重试间隔时间(毫秒)private Integer maxRetries;             // 最大重试次数private String namespace;               // 命名空间(相当于根节点)// 监听的Zookeeper路径public static final String path = "/server-list";// Redis和RabbitMQ操作模板@Autowiredprivate RedisTemplate redisTemplate;@Resourceprivate RabbitAdmin rabbitAdmin;/*** 创建CuratorFramework客户端Bean* @return 配置好的Curator客户端实例*/@Bean("curatorClient")public CuratorFramework curatorClient() {// 使用指数退避策略进行重试(推荐)// 参数:初始重试间隔时间,最大重试次数RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 构建Curator客户端CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host)               // Zookeeper服务器地址.connectionTimeoutMs(connectionTimeoutMs) // 连接超时时间.sessionTimeoutMs(sessionTimeoutMs)       // 会话超时时间.retryPolicy(backoffRetry)         // 重试策略.namespace(namespace)              // 命名空间隔离.build();client.start();     // 启动客户端// 注册节点监听器add(path, client);return client;}/*** 注册Zookeeper节点监听器* @param path 监听的节点路径* @param client Curator客户端实例*/public void add(String path, CuratorFramework client) {// 创建节点缓存CuratorCache curatorCache = CuratorCache.build(client, path);// 添加监听器curatorCache.listenable().addListener((type, oldData, data) -> {// type: 事件类型(NODE_CREATED, NODE_CHANGED, NODE_DELETED)// oldData: 事件发生前的节点数据// data: 事件发生后的节点数据switch (type.name()) {case "NODE_CREATED":log.info("(子)节点创建");break;case "NODE_CHANGED":log.info("(子)节点数据变更");break;case "NODE_DELETED":log.info("(子)节点删除");// 反序列化被删除节点的数据NettyServerNode oldNode = JsonUtils.jsonToPojo(new String(oldData.getData()), NettyServerNode.class);log.info("被删除节点路径: {}, 节点值: {}", oldData.getPath(), oldNode);// 1. 清理Redis中的相关数据String oldPort = oldNode.getPort() + "";String portKey = "netty_port";redis.hdel(portKey, oldPort); // 删除Redis中存储的端口信息// 2. 删除RabbitMQ中对应的队列String queueName = "netty_queue_" + oldPort;rabbitAdmin.deleteQueue(queueName); // 删除RabbitMQ队列break;default:log.info("未处理的事件类型: {}", type);break;}});curatorCache.start(); // 启动监听}
}

七、结语

在分布式系统日益普及的今天,Netty 作为高性能网络通信框架,其单体架构与集群架构的选择需紧密结合业务需求、团队能力和资源投入进行权衡。

单体架构 以 简单、低延迟、低成本 为核心优势,适合快速验证、轻量级应用或资源受限的场景。然而,其 单点故障风险 和 性能天花板 决定了它难以支撑大规模并发或高可用性要求,长期来看可能成为业务增长的瓶颈。

集群架构 通过 分布式扩展、容灾能力和负载均衡 解决了单体架构的痛点,是支撑高并发、高稳定性系统的关键方案。但随之而来的是 复杂性提升、性能开销增加 以及 运维成本高企 等挑战,需要团队具备分布式系统设计、监控治理和故障恢复的成熟经验。

实践建议

初期优先单体:在业务初期或内部工具开发中,优先选择单体架构以快速迭代,降低开发成本。

渐进式迁移:当并发量接近单机极限(如 10K+ 连接)或可用性要求提升时,通过服务拆分、网关层抽象或消息队列(如 Kafka)逐步向集群过渡。

技术选型平衡:集群架构中需合理选择负载均衡策略(如轮询、最少连接)、序列化协议(如 Protobuf、JSON)和一致性模型(如最终一致性),避免过度设计。

关注可观测性:集群环境下需加强日志聚合(ELK)、分布式追踪(SkyWalking)和链路压测,确保问题可定位、性能可优化。

最终目标:无论选择单体还是集群,均应以 业务价值 为导向,避免为“分布式而分布式”。在技术复杂性与业务需求间找到平衡点,才能构建出既高效又稳定的网络通信系统。

上述三种方案可大致实现Netty集群,如果有更高性能的方案或者疑问欢迎评论区留言讨论! 

http://www.xdnf.cn/news/1146835.html

相关文章:

  • 如何用Python并发下载?深入解析concurrent.futures 与期物机制
  • 【密码学】1. 引言
  • 目标框的位置以及大小的分布
  • 题解:CF1617C Paprika and Permutation
  • VMC850立式加工中心Y轴传动机械结构设计cad【7张】三维图+设计说明书
  • DTW算法解决时序问题的解析实践
  • JavaSE -- 数组详细讲解(数组介绍,Arrays常用方法,二维数组创建)
  • Spring中的设计模式
  • pom.xml文件中的${}变量从哪里传值
  • 基于Qwen2.5-3B-Instruct的LoRA微调与推理实战指南
  • js中的微任务和宏任务的理解
  • 读书笔记:《动手做AI Agent》
  • Android性能优化之UI渲染优化
  • LP-MSPM0G3507学习--05中断及管脚中断
  • CMake指令:常见内置命令行工具( CMake -E )
  • math.h函数
  • CCF编程能力等级认证GESP—C++3级—20250628
  • 20250718-3-Kubernetes 应用程序生命周期管理-Pod对象:存在意义_笔记
  • MyBatis-Flex代码生成
  • jvm分析篇---1、先认识下dump文件
  • b-up:Enzo_Mi:深度学习基础知识
  • 【C语言进阶】题目练习(2)
  • 【51】MFC入门到精通——MFC串口助手(一)---初级版(初始化、串口设置、修改参数、打开/关闭、状态显示),附源码
  • 机器学习基础:线性回归算法详解(原理+代码+实战)
  • Proto文件从入门到精通——现代分布式系统通信的基石(含实战案例)
  • 数据库模型异常问题深度解析:冗余与操作异常
  • 柴油机活塞cad【4张】三维图+设计说明书
  • 小架构step系列18:工具
  • 《每日AI-人工智能-编程日报》--2025年7月18日
  • 【洛谷P1417】烹调方案 题解