介绍
Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
快速入门
官方 4.x 用户指南- 推荐版本
Reference (4.1.101.Final)API文档
发送接收时间戳
请查看案例
处理基于流的传输
简单来说,网络传输如果不进行如何处理可能会导致读取的不正确。
第一个解决方案
简单来说,设置一个等待多少长度的数据到达,但是这会带来一个问题就是实际使用中传输的数据长度是可变的。
public class ClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
// 当ChannelHandler添加到Channel时调用
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// 分配一个4字节的缓冲区
buf = ctx.alloc().buffer(4);
}
// 当ChannelHandler从Channel移除时调用
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
// 释放缓冲区
buf.release();
buf = null;
}
}
第二种解决方案
幸运的是,Netty 提供了一个可扩展的类,可以帮助您编写开箱即用的第一个类:
public class Decoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 判断可读字节是否小于4
if (in.readableBytes() < 4) {
return;
}
// 将可读字节读取4个字节,添加到out中
out.add(in.readBytes(4));
}
}
ByteToMessageDecoder是一个ChannelInboundHandler可以轻松处理碎片问题的实现。
ByteToMessageDecoderdecode()每当收到新数据时,都会使用内部维护的累积缓冲区调用该方法。
decode()out当累积缓冲区中没有足够的数据时可以决定不添加任何内容。当收到更多数据时ByteToMessageDecoder将再次调用。decode()
如果decode()添加一个对象out,则表示解码器成功解码了一条消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多条消息。ByteToMessageDecoder将继续调用该decode()方法,直到它没有添加任何内容out。
然后在设置ChannelHandler中添加
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Decoder(), new ClientHandler());
}
});
此外,Netty 提供了开箱即用的解码器,使您能够非常轻松地实现大多数协议,并帮助您避免最终实现单一的、不可维护的处理程序。更详细的示例请参考以下包:
io.netty.example.factorial对于二进制协议,以及
io.netty.example.telnet用于基于文本行的协议。
用 POJO 代替ByteBuf
如果需要使用 POJO 而不是ByteBuf,那么就需要写对应的编码器和解码器
Object编码器和解码器
只需要在连接时传入(编码器,解码器,处理器)如下
ch.pipeline().addLast(new ObjectEncoder(),new ObjectDecoder(String.class),new Handler());
就可以直接发送消息
// 向客户端发送消息
ChannelFuture f = ctx.writeAndFlush("消息");
// 添加关闭监听器
f.addListener(ChannelFutureListener.CLOSE);
而接收数据也是直接接收
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("接收消息事件");
System.out.println("服务端的消息:" + msg);
ctx.close();
}
Object编码器
public class ObjectEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 将消息对象转换为字节数组,并将其写入到ByteBuf中
ByteBuf encoded = ctx.alloc().buffer();
encoded.writeBytes(new ObjectMapper().writeValueAsBytes(msg));
// 将ByteBuf写入到ChannelPipeline中
ctx.write(encoded, promise);
}
}
Object解码器
public class ObjectDecoder extends ByteToMessageDecoder {
public ObjectDecoder(Class<?> clazz) {
this.clazz = clazz;
}
private Class<?> clazz;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
// 判断输入流中是否有数据
if (in.readableBytes() <= 0) {
return;
}
// 创建一个字节数组,用于存放输入流中的数据
byte[] bytes = new byte[in.readableBytes()];
// 读取输入流中的数据到字节数组中
in.readBytes(bytes);
// 使用ObjectMapper将字节数组转换成指定类型的对象,并添加到输出流中
out.add(new ObjectMapper().readValue(bytes, clazz));
}
}
案例
发送接收时间戳
/**
* 服务端处理器
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功前事件
* @param ctx
* @throws Exception
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("注册事件");
}
/**
* 连接成功后事件
*
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("连接成功事件");
// 创建一个4字节的缓冲区
final ByteBuf time = ctx.alloc().buffer(4);
// 将当前时间戳写入缓冲区
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
// 将缓冲区写入并刷新
ChannelFuture channelFuture = ctx.writeAndFlush(time);
// 添加一个监听器,当操作完成时,关闭连接
channelFuture.addListener((ChannelFutureListener) future -> {
assert channelFuture == future;
ctx.close();
});
}
/**
* 关闭连接前事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭连接事件");
}
/**
* 关闭连接后事件
* @param ctx
* @throws Exception
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("未注册事件");
}
/**
* 连接异常处理
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常信息
cause.printStackTrace();
// 关闭连接
ctx.close();
}
}
/**
* 客户端处理器
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 接收消息事件
*
* @param ctx
* @param msg 接收的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
try {
// 读取消息中的时间戳
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println("来自服务端的时间" + new Date(currentTimeMillis));
// 关闭连接
ctx.close();
} finally {
// 释放缓冲区
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常信息
cause.printStackTrace();
// 关闭连接
ctx.close();
}
}
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run() throws Exception {
// 创建两个EventLoopGroup,一个用于接受客户端连接,另一个用于处理网络操作
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap实例,用于启动和绑定服务器
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 为每一个客户端连接创建一个ServerHandler实例
ch.pipeline().addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,启动服务器
ChannelFuture f = b.bind(port).sync();
// 等待服务器关闭
f.channel().closeFuture().sync();
} finally {
// 优雅关闭EventLoopGroup,释放所有资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
// 如果用户提供了命令行参数,则使用该参数作为端口号
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
// 创建并启动服务器
new Server(port).run();
}
}
public class Client {
public static void main(String[] args) throws Exception {
// 设置服务器地址和端口
String host = "127.0.0.1";
int port = 8080;
// 创建EventLoopGroup,用于处理网络操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建Bootstrap实例,可以轻松引导Channel以供客户端使用
Bootstrap bootstrap = new Bootstrap();
// 设置EventLoopGroup
bootstrap.group(workerGroup);
// 设置Channel类型
bootstrap.channel(NioSocketChannel.class);
// 设置SO_KEEPALIVE选项
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
// 设置ChannelHandler
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
// 关闭连接
future.channel().closeFuture().sync();
} finally {
// 优雅关闭EventLoopGroup
workerGroup.shutdownGracefully();
}
}
}