Netty入门与实战教程

发布时间:2023年12月24日

1. 引言

????????Netty作为一个高性能、异步事件驱动的网络编程框架,为开发者提供了强大的工具和丰富的功能,使得处理网络通信变得更加简单和高效。本文将深入介绍 Netty 的基础知识和核心概念,帮助读者快速上手并理解如何利用 Netty 构建强大的网络应用。

什么是netty?

????????Netty是一个基于Java的高性能、异步事件驱动的网络应用程序框架,专注于快速开发可维护、高性能、可扩展的网络服务器和客户端。它提供了一组易于使用的API,用于处理网络通信,使开发人员能够轻松地构建各种类型的网络应用程序。

官网:Netty: Home

Netty的主要特点包括:

  1. 异步事件驱动:采用异步、基于事件驱动的模型,实现高效的并发处理。
  2. 高性能:通过精心设计和优化,提供了出色的性能和吞吐量。
  3. 易于使用:提供简洁、直观的API和丰富的文档,使开发人员能够轻松上手。
  4. 可扩展性:支持用户自定义的扩展和定制,适应各种不同的应用场景。
  5. 多种协议支持:支持多种主流协议,如TCP、UDP、HTTP、WebSocket等。

????????Netty被广泛应用于构建各种类型的网络应用,包括但不限于即时通讯服务器、游戏服务器、分布式系统、物联网应用、HTTP代理服务器等。其灵活性和强大的功能使得开发者可以
????????

2. Netty 基础概念

核心组件

Netty 是一个基于 NIO 的网络应用框架,它提供了一组强大的工具和组件,用于快速、简单地构建高性能、可扩展的网络应用程序。以下是 Netty 的核心组件和关键概念:

1. Channel(通道)

Channel 是网络操作的基本载体。它类似于 Java NIO 中的 Socket,代表了一个开放的连接,可以执行读取和写入数据等操作。Netty 的 Channel 是双向的,可以进行读和写操作。

Channel channel = ...; // 获取一个 Channel 实例

2. EventLoop(事件循环)

EventLoop 是处理所有 Channel 上的事件的线程,负责处理传入的事件并执行相应的操作,如接收和发送数据、处理连接等。一个 EventLoop 可以绑定多个 Channel。

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个 EventLoopGroup

3. ByteBuf(字节缓冲区)

ByteBuf 是 Netty 提供的字节容器,优化了对字节数据的操作。与 JDK 的 ByteBuffer 不同,ByteBuf 支持更灵活的动态缓冲区,提供了更多的操作接口。

ByteBuf buffer = Unpooled.buffer(); // 创建一个 ByteBuf 实例

4. Handler(处理器)

Handler 是 Netty 中的关键组件,用于处理入站和出站数据、触发事件以及实现业务逻辑。有多种类型的 Handler,如 ChannelHandler、ChannelInitializer 等。

ChannelHandler handler = new MyHandler(); // 创建一个自定义的 Handler

以上这些组件构成了 Netty 框架的核心,允许开发者以高效且可靠的方式构建各种网络应用程序。 Netty 的强大之处在于它提供了灵活且易于使用的 API,使得网络编程变得更加简单和高效。

3. Netty 应用示例

如何构建服务器、客户端

????????使用 Netty 可以构建高性能的网络应用程序,它提供了一套强大的异步事件驱动的网络编程框架。下面是一个简单的示例,演示了如何创建一个简单的 Echo 服务器和客户端。

pom文件中引入依赖

<dependencies>
    <!-- 其他依赖项 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.68.Final</version>
    </dependency>
</dependencies>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                });

            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    // 运行启动netty server端
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer(port).run();
    }
    
    private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.write(msg); // 回显收到的数据
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush(); // 将未决消息冲刷到远程节点,并关闭该Channel
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new EchoClientHandler());
                    }
                });

            Channel channel = bootstrap.connect(host, port).sync().channel();
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    // 运行启动netty 客户端
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        new EchoClient(host, port).run();
    }

    private static class EchoClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.writeAndFlush("Hello, Netty!"); // 客户端连接建立时发送消息
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("Received: " + msg); // 打印收到的消息
            ctx.close(); // 收到消息后关闭连接
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

分别运行代码中的main方法,可以看到控制台可以输出消息。

Netty通讯示例

服务器端代码

package com.memory.blog.test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyServer {

    public static void main(String[] args) throws Exception {

        //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            //给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


/**
 * 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
 */
class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端代码

package com.memory.blog.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            //加入处理器
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }
}

 class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

能看到控制台输出下面内容

好了,netty的简单使用就介绍到这里,后续会针对netty编解码、粘包拆包、线程模型在进行深入介绍和分享。

文章来源:https://blog.csdn.net/EverythingAtOnce/article/details/135166509
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。