Netty网络聊天室及扩展序列化算法
一、前言
Netty是一个基于Java的高性能、事件驱动的网络应用框架,广泛应用于各种网络通信场景。本文将介绍如何使用Netty构建一个简单的网络聊天室,并扩展序列化算法来提高数据传输效率和灵活性。
二、Netty网络聊天室的实现
1. 项目结构
我们将使用Maven构建项目(需要在 pom.xml 中引入Netty依赖,例如 io.netty:netty-all),项目结构如下:
netty-chatroom/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── server/
│ │ │ │ ├── ChatServer.java
│ │ │ │ ├── ChatServerInitializer.java
│ │ │ │ ├── ChatServerHandler.java
│ │ │ ├── client/
│ │ │ │ ├── ChatClient.java
│ │ │ │ ├── ChatClientInitializer.java
│ │ │ │ ├── ChatClientHandler.java
│ │ ├── resources/
│ │ ├── log4j.properties
├── pom.xml
2. 服务器端实现
ChatServer.java
package server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point of the chat server: binds a Netty server socket on a fixed port
 * and blocks until that socket is closed.
 */
public class ChatServer {

    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel closes.
     * Both event loop groups are shut down on the way out, including on failure.
     *
     * @throws Exception if binding or waiting on the channel fails or is interrupted
     */
    public void start() throws Exception {
        NioEventLoopGroup acceptorGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChatServerInitializer());
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture bound = bootstrap.bind(port).sync();
            // Park the calling thread until the listening socket is closed.
            bound.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
            acceptorGroup.shutdownGracefully();
        }
    }

    /** Starts the server on port 8080. */
    public static void main(String[] args) throws Exception {
        ChatServer server = new ChatServer(8080);
        server.start();
    }
}
ChatServerInitializer.java
package server;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChatServerHandler());}
}
ChatServerHandler.java
package server;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {channels.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {channels.remove(ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {for (var channel : channels) {if (channel != ctx.channel()) {channel.writeAndFlush("[Client] " + ctx.channel().remoteAddress() + " says: " + msg + "\n");} else {channel.writeAndFlush("[You] " + msg + "\n");}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
3. 客户端实现
ChatClient.java
package client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

/**
 * Console chat client: connects to the server, sends every non-empty line typed
 * on standard input, and stays connected until the channel is closed.
 */
public class ChatClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects, forwards console input to the server, then blocks until the
     * channel is closed. The event loop group is always shut down on exit.
     *
     * @throws Exception if connecting, reading the console, or waiting fails
     */
    public void start() throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChatClientInitializer());
            ChannelFuture f = b.connect(host, port).sync();

            // Without this the client could only receive; nothing was ever written.
            forwardConsoleInput(f);

            // If stdin hit EOF (e.g. the client runs without a console), keep
            // the connection open and keep receiving until the channel closes.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Sends each non-empty console line to the connected channel until stdin
     * reaches EOF or the channel is found inactive.
     *
     * <p>{@code readLine()} blocks, so channel liveness is only re-checked after
     * a line has been entered.
     */
    private static void forwardConsoleInput(ChannelFuture connected) throws IOException {
        // System.in is deliberately not closed: it belongs to the JVM, not to this client.
        BufferedReader console =
                new BufferedReader(new InputStreamReader(System.in, Charset.defaultCharset()));
        String line;
        while (connected.channel().isActive() && (line = console.readLine()) != null) {
            // Blank lines are skipped rather than handed to the encoder as an empty message.
            if (!line.isEmpty()) {
                connected.channel().writeAndFlush(line);
            }
        }
    }

    /** Connects to localhost:8080. */
    public static void main(String[] args) throws Exception {
        new ChatClient("localhost", 8080).start();
    }
}
ChatClientInitializer.java
package client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChatClientHandler());}
}
ChatClientHandler.java
package client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
三、扩展序列化算法
为了提高数据传输效率,我们可以扩展Netty的序列化算法。Netty默认提供的序列化算法包括Java序列化、JSON、Protobuf等。下面介绍如何使用Protobuf进行序列化。
1. 配置Protobuf
首先,在 pom.xml 中添加Protobuf依赖:
<!-- Protobuf Java runtime -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.19.1</version>
</dependency>
2. 定义Protobuf消息
在 src/main/proto 目录下创建一个 chat.proto 文件:
syntax = "proto3";

package chat;

// Generate ChatMessage as a top-level Java class (chat.ChatMessage). Without
// this option protoc nests it inside an outer class named after the file
// (chat.Chat.ChatMessage), and `import chat.ChatMessage;` does not resolve.
option java_multiple_files = true;

// One chat message exchanged between client and server.
message ChatMessage {
  string from = 1;
  string to = 2;
  string content = 3;
}
编译Protobuf文件生成Java类:
# Both paths are relative, so run this from the directory that contains src/
# (presumably the project root). Generated Java sources go under src/main/java.
protoc --java_out=src/main/java src/main/proto/chat.proto
3. 修改服务器端处理器
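在服务器端,使用Protobuf进行消息的序列化和反序列化。注意:除了处理器之外,还需要把 ChatServerInitializer 和 ChatClientInitializer 中的 StringDecoder/StringEncoder 替换为 ProtobufVarint32FrameDecoder、ProtobufDecoder(ChatMessage.getDefaultInstance())、ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder(并在 pom.xml 中引入包含这些类的Netty模块),否则处理器不会收到 ChatMessage 对象: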
package server;import chat.ChatMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;public class ChatServerHandler extends SimpleChannelInboundHandler<ChatMessage> {private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {channels.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {channels.remove(ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatMessage msg) throws Exception {for (var channel : channels) {if (channel != ctx.channel()) {channel.writeAndFlush(msg.toBuilder().setContent("[Client] " + ctx.channel().remoteAddress() + " says: " + msg.getContent()).build());} else {channel.writeAndFlush(msg.toBuilder().setContent("[You] " + msg.getContent()).build());}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
4. 修改客户端处理器
在客户端,同样使用Protobuf进行消息的序列化和反序列化:
package client;import chat.ChatMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ChatClientHandler extends SimpleChannelInboundHandler<ChatMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatMessage msg) throws Exception {System.out.println(msg.getContent());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}