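[Netty Series] Protobuf Encoding and Decoding: Client and Server
The following is a complete, working Netty + Protobuf example that includes both a server and a client implementation: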
Step 1: Define the Protobuf protocol
Create the file message.proto:
syntax = "proto3";

option java_package = "com.example.netty.protobuf";
option java_outer_classname = "MessageProto";

message Message {
  int32 id = 1;
  string content = 2;
  MessageType type = 3;

  enum MessageType {
    REQUEST = 0;
    RESPONSE = 1;
  }
}
Generate the Java classes:
protoc --java_out=./src/main/java message.proto
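To make the field numbers in message.proto concrete, the sketch below hand-encodes a Message according to the proto3 wire format, where each field starts with a tag byte computed as (field number << 3) | wire type. The class WireFormatSketch and its methods are illustrative names only and are not part of the generated MessageProto code; the varint helper also assumes non-negative values. In real code the generated builder should do this work.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-rolled encoder for the Message type; illustrative only, not generated code. */
final class WireFormatSketch {

    // Writes a non-negative int as a base-128 varint:
    // 7 payload bits per byte, high bit set when more bytes follow.
    static void writeVarint32(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes id (field 1), content (field 2) and type (field 3).
    // typeNumber is the enum number: 0 = REQUEST, 1 = RESPONSE.
    static byte[] encode(int id, String content, int typeNumber) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (id != 0) {                        // proto3 omits default values
            out.write((1 << 3) | 0);          // tag 0x08: field 1, varint
            writeVarint32(out, id);
        }
        if (!content.isEmpty()) {
            byte[] utf8 = content.getBytes(StandardCharsets.UTF_8);
            out.write((2 << 3) | 2);          // tag 0x12: field 2, length-delimited
            writeVarint32(out, utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        if (typeNumber != 0) {
            out.write((3 << 3) | 0);          // tag 0x18: field 3, varint
            writeVarint32(out, typeNumber);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(1, "Hi", 0)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "08 01 12 02 48 69"
    }
}
```

Because REQUEST has the number 0, a request message carries no bytes for the type field at all, while a RESPONSE adds the two bytes 18 01.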
Step 2: Server implementation
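// Server.java
import com.example.netty.protobuf.MessageProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles channel I/O
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Decoders (inbound)
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
                            // Encoders (outbound)
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            // Business logic handler
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8888).sync();
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

// ServerHandler.java
import com.example.netty.protobuf.MessageProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
        System.out.println("Server received: " + msg.getContent());
        // Build the response, echoing the request id
        MessageProto.Message response = MessageProto.Message.newBuilder()
                .setId(msg.getId())
                .setContent("Response to: " + msg.getContent())
                .setType(MessageProto.Message.MessageType.RESPONSE)
                .build();
        ctx.writeAndFlush(response);
    }
}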
Step 3: Client implementation
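// Client.java
import com.example.netty.protobuf.MessageProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Same codec chain as the server
                            pipeline.addLast(new ProtobufVarint32FrameDecoder());
                            pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
                            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("localhost", 8888).sync();
            // Send a message
            MessageProto.Message request = MessageProto.Message.newBuilder()
                    .setId(1)
                    .setContent("Hello Server")
                    .setType(MessageProto.Message.MessageType.REQUEST)
                    .build();
            f.channel().writeAndFlush(request);
            // Block until the connection is closed
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

// ClientHandler.java
import com.example.netty.protobuf.MessageProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
        System.out.println("Client received: " + msg.getContent());
    }
}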
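Key points
- Codec chain configuration:
  - ProtobufVarint32FrameDecoder: handles TCP sticky and split packets by reading a varint32 length prefix and emitting one complete frame at a time
  - ProtobufDecoder: converts each frame's bytes into a Protobuf object
  - ProtobufVarint32LengthFieldPrepender: prepends the varint32 length prefix to outgoing messages
  - ProtobufEncoder: serializes a Protobuf object into bytes
- Message flow:
  Client sends: Protobuf object → encode → bytes → network transfer
  Server receives: bytes → decode → Protobuf object → business logic
- Performance notes:
  - Reuse the shared Protobuf prototype instance (getDefaultInstance()) when constructing the decoder
  - Buffer handling through Unpooled is taken care of by Netty's codecs in this example, so no manual allocation is needed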
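The framing step in the codec chain above can be illustrated without Netty. The following is a minimal plain-Java sketch of the idea behind ProtobufVarint32LengthFieldPrepender and ProtobufVarint32FrameDecoder: each payload is prefixed with its length as a varint, and the receiver uses that prefix to cut the byte stream back into messages. The class Varint32FramingSketch and its methods are hypothetical names, and the sketch simply stops at incomplete or malformed data, whereas a real decoder keeps the leftover bytes for the next read and rejects malformed prefixes.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of varint32 length-prefixed framing; not Netty code. */
final class Varint32FramingSketch {

    // Sender side: prepend the payload length as a varint.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Receiver side: return the payload of every complete frame in the stream.
    // Stops at the first incomplete frame (more bytes would be needed).
    static List<byte[]> split(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (pos < stream.length) {
            int len = 0;
            int shift = 0;
            int p = pos;
            boolean prefixComplete = false;
            while (p < stream.length && shift < 35) { // a varint32 has at most 5 bytes
                int b = stream[p++] & 0xFF;
                len |= (b & 0x7F) << shift;
                shift += 7;
                if ((b & 0x80) == 0) {
                    prefixComplete = true;
                    break;
                }
            }
            if (!prefixComplete || len < 0 || stream.length - p < len) {
                break; // incomplete prefix or payload (or a malformed length)
            }
            frames.add(Arrays.copyOfRange(stream, p, p + len));
            pos = p + len;
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = frame("Hello".getBytes());
        byte[] second = frame("Server".getBytes());
        // Simulate two messages arriving glued together in one TCP read.
        byte[] stream = Arrays.copyOf(first, first.length + second.length);
        System.arraycopy(second, 0, stream, first.length, second.length);
        for (byte[] payload : split(stream)) {
            System.out.println(new String(payload));
        }
    }
}
```

Payloads shorter than 128 bytes cost only one prefix byte, which is why this scheme suits small messages like the ones in this example.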
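Running the test
- Start the server first
- Then start the client
- Observe the console output (the first line appears on the server console, the second on the client console):
Server received: Hello Server
Client received: Response to: Hello Server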
Extension suggestions
- Add SSL encryption:
pipeline.addFirst(new SslHandler(sslEngine));
- Use LengthFieldBasedFrameDecoder in place of the Protobuf-specific frame decoder
- Combine with ByteToMessageCodec to handle composite messages
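For the LengthFieldBasedFrameDecoder suggestion, the usual difference from the varint32 scheme is a fixed-width length field. The plain-Java sketch below shows that layout with a 4-byte big-endian length prefix, the kind of frame that Netty's LengthFieldPrepender(4) writes and LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4) parses. The class FixedLengthFramingSketch, its methods, and the 1 MiB cap are assumptions chosen for illustration, not values from the example above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of fixed 4-byte length-prefixed framing; not Netty code. */
final class FixedLengthFramingSketch {

    // Hypothetical upper bound that protects against corrupt or hostile length fields.
    static final int MAX_FRAME_LENGTH = 1 << 20;

    // Sender side: 4-byte big-endian length followed by the payload.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Receiver side: consume every complete frame from the buffer.
    // On return, the buffer's position sits at the start of any incomplete frame.
    static List<byte[]> split(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int len = in.getInt();
            if (len < 0 || len > MAX_FRAME_LENGTH) {
                throw new IllegalStateException("bad frame length: " + len);
            }
            if (in.remaining() < len) {
                in.reset(); // wait for more bytes; leave the prefix unread
                break;
            }
            byte[] payload = new byte[len];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] framed = frame("Hello Server".getBytes());
        System.out.println(framed.length); // prints 16: 4-byte prefix + 12-byte payload
        System.out.println(new String(split(ByteBuffer.wrap(framed)).get(0)));
    }
}
```

Switching framing schemes changes the bytes on the wire, so the client and the server pipelines would have to be changed together.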
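This example shows a complete integration of Netty and Protobuf for efficient transfer of binary data. In a production environment, features such as a heartbeat mechanism and a reconnection strategy can be added as needed.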