netty

netty

介绍

Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
file

快速入门

官方 4.x 用户指南- 推荐版本
Reference (4.1.101.Final)API文档

发送接收时间戳

请查看案例

处理基于流的传输

简单来说,网络传输如果不进行如何处理可能会导致读取的不正确。
file

第一个解决方案

简单来说,设置一个等待多少长度的数据到达,但是这会带来一个问题就是实际使用中传输的数据长度是可变的。

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    // 当ChannelHandler添加到Channel时调用
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 分配一个4字节的缓冲区
        buf = ctx.alloc().buffer(4);
    }

    // 当ChannelHandler从Channel移除时调用
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // 释放缓冲区
        buf.release();
        buf = null;
    }
}

第二种解决方案

幸运的是,Netty 提供了一个可扩展的类,可以帮助您编写开箱即用的第一个类:

public class Decoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 判断可读字节是否小于4
        if (in.readableBytes() < 4) {
            return;
        }

        // 将可读字节读取4个字节,添加到out中
        out.add(in.readBytes(4));
    }
}

ByteToMessageDecoder是一个ChannelInboundHandler可以轻松处理碎片问题的实现。
ByteToMessageDecoderdecode()每当收到新数据时,都会使用内部维护的累积缓冲区调用该方法。
decode()out当累积缓冲区中没有足够的数据时可以决定不添加任何内容。当收到更多数据时ByteToMessageDecoder将再次调用。decode()
如果decode()添加一个对象out,则表示解码器成功解码了一条消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多条消息。ByteToMessageDecoder将继续调用该decode()方法,直到它没有添加任何内容out。
然后在设置ChannelHandler中添加

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new Decoder(), new ClientHandler());
    }
});

此外,Netty 提供了开箱即用的解码器,使您能够非常轻松地实现大多数协议,并帮助您避免最终实现单一的、不可维护的处理程序。更详细的示例请参考以下包:

io.netty.example.factorial对于二进制协议,以及
io.netty.example.telnet用于基于文本行的协议。

用 POJO 代替ByteBuf

如果需要使用 POJO 而不是ByteBuf,那么就需要写对应的编码器和解码器

Object编码器和解码器

只需要在连接时传入(编码器,解码器,处理器)如下

ch.pipeline().addLast(new ObjectEncoder(),new ObjectDecoder(String.class),new Handler());

就可以直接发送消息

// 向客户端发送消息
ChannelFuture f = ctx.writeAndFlush("消息");
// 添加关闭监听器
f.addListener(ChannelFutureListener.CLOSE);

而接收数据也是直接接收

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println("接收消息事件");
    System.out.println("服务端的消息:" + msg);
    ctx.close();
}

Object编码器

public class ObjectEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 将消息对象转换为字节数组,并将其写入到ByteBuf中
        ByteBuf encoded = ctx.alloc().buffer();
        encoded.writeBytes(new ObjectMapper().writeValueAsBytes(msg));
        // 将ByteBuf写入到ChannelPipeline中
        ctx.write(encoded, promise);
    }
}

Object解码器

public class ObjectDecoder extends ByteToMessageDecoder {
    public ObjectDecoder(Class<?> clazz) {
        this.clazz = clazz;
    }

    private Class<?> clazz;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
        // 判断输入流中是否有数据
        if (in.readableBytes() <= 0) {
            return;
        }

        // 创建一个字节数组,用于存放输入流中的数据
        byte[] bytes = new byte[in.readableBytes()];
        // 读取输入流中的数据到字节数组中
        in.readBytes(bytes);

        // 使用ObjectMapper将字节数组转换成指定类型的对象,并添加到输出流中
        out.add(new ObjectMapper().readValue(bytes, clazz));
    }
}

案例

发送接收时间戳
/**
 * 服务端处理器
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 连接成功前事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("注册事件");
    }

    /**
     * 连接成功后事件
     *
     * @param ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("连接成功事件");
        // 创建一个4字节的缓冲区
        final ByteBuf time = ctx.alloc().buffer(4);
        // 将当前时间戳写入缓冲区
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        // 将缓冲区写入并刷新
        ChannelFuture channelFuture = ctx.writeAndFlush(time);
        // 添加一个监听器,当操作完成时,关闭连接
        channelFuture.addListener((ChannelFutureListener) future -> {
            assert channelFuture == future;
            ctx.close();
        });
    }

    /**
     * 关闭连接前事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("关闭连接事件");
    }

    /**
     * 关闭连接后事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("未注册事件");
    }

    /**
     * 连接异常处理
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 打印异常信息
        cause.printStackTrace();
        // 关闭连接
        ctx.close();
    }
}
/**
 * 客户端处理器
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 接收消息事件
     *
     * @param ctx
     * @param msg 接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            // 读取消息中的时间戳
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println("来自服务端的时间" + new Date(currentTimeMillis));
            // 关闭连接
            ctx.close();
        } finally {
            // 释放缓冲区
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 打印异常信息
        cause.printStackTrace();
        // 关闭连接
        ctx.close();
    }
}
public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // 创建两个EventLoopGroup,一个用于接受客户端连接,另一个用于处理网络操作
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap实例,用于启动和绑定服务器
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 为每一个客户端连接创建一个ServerHandler实例
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,启动服务器
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅关闭EventLoopGroup,释放所有资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        // 如果用户提供了命令行参数,则使用该参数作为端口号
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        // 创建并启动服务器
        new Server(port).run();
    }
}
public class Client {
    public static void main(String[] args) throws Exception {
        // 设置服务器地址和端口
        String host = "127.0.0.1";
        int port = 8080;
        // 创建EventLoopGroup,用于处理网络操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建Bootstrap实例,可以轻松引导Channel以供客户端使用
            Bootstrap bootstrap = new Bootstrap();
            // 设置EventLoopGroup
            bootstrap.group(workerGroup);
            // 设置Channel类型
            bootstrap.channel(NioSocketChannel.class);
            // 设置SO_KEEPALIVE选项
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            // 设置ChannelHandler
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ClientHandler());
                }
            });

            // 连接服务器
            ChannelFuture future = bootstrap.connect(host, port).sync();

            // 关闭连接
            future.channel().closeFuture().sync();
        } finally {
            // 优雅关闭EventLoopGroup
            workerGroup.shutdownGracefully();
        }
    }
}