一文道破Java NIO

发布时间:2023年12月30日

一、常见的几种 Java IO 工作模式

1.1 同步阻塞 IO

同步阻塞 IO(又称 BIO),是一种传统的 IO 模型。用户进程发起 IO 操作后,必须等待 IO 操作完成用户进程才可以做其他事情。此时系统资源并未得到充分利用,不适合大量 IO 场景的业务。

在这里插入图片描述

JDK 提供 Socket、ServerSocket 以及字符/节操作相关(如 OutputStream、BufferedWriter 等)的 API 支持 BIO。若一个客户端发起请求,服务端 ServerSocket 都会创建一个客户端 Socket 实例与之通信。由于 ServerSocket 的 accept 方法只能接受一个连接且连接过程中是阻塞的,所以服务端要想连接多个客户端就必须开启多个线程。这样就会带来一个问题:随着客户端增多,服务端线程数量也会增多,很可能导致线程堆栈溢出等问题。

1.2 同步非阻塞 IO

同步非阻塞 IO,用户进程发起 IO 操作后就可以返回做其它事情,但是用户进程时不时地需要轮询 IO 操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的 CPU 资源浪费。

在这里插入图片描述

从操作系统层面理解:应用进程发起 recvform 系统调用后,进程并没有被阻塞,内核马上返回给进程,如果数据还没有准备好,就返回一个 Error。进程在返回之后,可以继续去抢占CPU资源,做一些别的工作。然后在适当的时间在发起一次recvform系统调用,重复上述过程。需要注意的是从内核空间拷贝数据到用户空间的时候,应用进程也是阻塞的。

JDK 1.4 提供新的关于 I/O 的 API(如 Channel、Selector、Buffer 等),即 new I/O,个别资料也会把 Selector(多路复用)称为 NIO 的一种实现,详细参考面的分析。

参考:New I/O APIs:https://docs.oracle.com/javase/1.5.0/docs/guide/nio/index.html

1.3 异步非阻塞 IO

异步非阻塞 IO(AIO),采用“订阅-通知”模式:应用程序向操作系统注册 IO 监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的回调函数。

在这里插入图片描述

JDK 1.7 提供 AIO 实现 API(如 AsynchronousServerSocketChannel、AsynchronousSocketChannel、AsynchronousChannel、CompletionHandler 等)。

二、Java NIO 多路复用详解

2.1 原理图

BIO 中一个客户端连接会消耗服务端一个线程,为了解决这个问题,JDK 提供了多路复用模式,服务器实现模式为一个线程处理多个连接请求,即客户端发送的连接请求都会注册到多路复用器(Selector)上,多路复用器轮询到连接有 IO 请求就进行处理。这样就减少了服务端线程的使用,降低了因为线程的切换引起的不必要额资源浪费和多余的开销。根据个人的理解,如下图所示:

在这里插入图片描述

  1. 首先服务端创建选择器(Selector)实例,同时创建服务端 ServerSocketChannel,并注册到选择器(Selector)上,产生 SelectionKey.OP_ACCEPT 事件,并等待客户端的连接。
  2. 客户端启动,创建客户端 SocketChannel 实例。
  3. 当服务端发现客户端触发连接,或者有客户端数据可读写时,服务端会为连接创建 SocketChannel 实例,借助 SelectionKey 上 attach 的 ByteBuffer 进行数据传输。
2.2 基础组件简介

在这里插入图片描述

Selector

Selector(选择器)是 Channel 的多路复用器,它可以同时监控多个 Channel 的 IO 情况,允许单个线程来操作多个 Channel,从而管理多个网络连接。

可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。

Channel

通道将缓存区(Buffer)的数据移入或移出到各种 I/O 源,如文件、Socket 等。Channel 是双向的,既可以通过 Channel 读数据,也可以写数据,但必须使用 Buffer 作为缓冲区来操作数据。网络编程中,重要的有两个通道类:

  • SocketChannel : 可读写 TCP Socket,数据必须编码到 ByteBuffer 中来完成读/写,每个 SocketChannel 都有一个对等端 Socket 对象相关联。
  • ServerSocketChannel:ServerSocketChannel 只有一个目的:接入入站连接。无法读、写、连接 ServerSocketChannel。它支持的唯一操作是接受一个新的入站连接。
SelectionKey

SelectionKey 表示 SelectableChannel 在 Selector 中注册的标识。每个 Channel 向 Selector 注册时都将会创建一个 SelectionKey。
SelectionKey 是 SelectableChannel 与 Selector 关系连接者,同时维护了 Channel 事件。

  • SelectionKey.OP_ACCEPT:SelectionKey 关联的 Channel 是否准备好接收 socket 连接
  • SelectionKey.OP_CONNECT:SelectionKey 关联的 Channel 是否支持 socket 连接操作
  • SelectionKey.OP_READ:SelectionKey 关联的 Channel 是否有数据可读
  • SelectionKey.OP_WRITE:SelectionKey 关联的 Channel 是否有数据可写
2.3 Java NIO 代码示例

服务端:

public class NIOServer {
  
  public static void main(String[] args) throws IOException {
    // 1. 创建选择器
    Selector selector = Selector.open();

    // 2. 创建 ServerSocketChannel,并监听端口 8888,同时设置非阻塞
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.socket().bind(new InetSocketAddress(8888));
    
    //3. 把 serverSocketChannel 注册到 selector,只关心事件为 OP_ACCEPT
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
    // 4. 轮询获取“准备就绪”的注册过的操作
    while (true){
      if(selector.select(1000) == 0) { 
        System.out.println("服务器等待了1秒,无连接");  //没有事件发生
        continue;
      }
      
      // selector.selectedKeys() 返回关注事件的集合
      Set<SelectionKey> selectionKeys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = selectionKeys.iterator();
      while(iterator.hasNext()){
        SelectionKey key = iterator.next();
        iterator.remove();     // 将其删除掉,表示我们已经处理了这个事件
        
        if(key.isAcceptable()) {      // 如果是 OP_ACCEPT, 有新的客户端连接
          
          SocketChannel socketChannel = serverSocketChannel.accept();  //该该客户端生成一个 SocketChannel
          socketChannel.configureBlocking(false);
          // 将socketChannel 注册到selector, 关注事件为 OP_READ, 同时关联一个Buffer
          socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
          
        } if(key.isReadable()) {     //发生 OP_READ
          
          // 通过 key 反向获取到对应 channel,从客户端接收消息并发消息给客户端
          SocketChannel clientChannel = (SocketChannel)key.channel();
          ByteBuffer buffer = (ByteBuffer)key.attachment(); // 获取到该channel关联的buffer
          int read = clientChannel.read(buffer);
          if(read == -1) {
            key.cancel();
          } else {
            // 接收客户端消息
            System.out.println("form 客户端 " + new String(buffer.array()));
            
            // 服务端接收消息后,给客户端发送给客户端
            Scanner scanner = new Scanner(System.in);
            String string = scanner.nextLine();
            ByteBuffer writeBuffer = Charset.forName("utf-8").encode(string);
            clientChannel.write(writeBuffer);
            if (writeBuffer.hasRemaining()) {
              // 如果不能一次性发完只需要触发 write 事件去发
              key.attach(writeBuffer);
              key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
            }
          }
        } else if(key.isWritable() && key.isValid()) {
          // 可写
          ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
          SocketChannel clientChannel = (SocketChannel) key.channel();
          byteBuffer.flip();
          clientChannel.write(byteBuffer);
          if (!byteBuffer.hasRemaining()) {
            // 如果已完,则只无须关注 write 事件
            key.attach(null);
            key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
          }
        }
      } // while
    } // while
  }
  
}

客户端:

public class NIOClient {
  
  public static void main(String[] args) throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8888);
    
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.put(new Date().toString().getBytes());
    buffer.flip();
    socketChannel.write(buffer);
    socketChannel.close();
  }
}
2.4 Linux 支持多路复用的系统调用函数
select 函数
/**
 * nfds:需要监控的文件描述符的最大值加1。这个值通常设为所有文件描述符中的最大值加1,以确保select能够正确地监控所有需要的文件描述符
 * readfds:等待读事件的文件描述符集合,如果不关心读事件可以传 NULL
 * writefds:等待写事件的文件描述符集合,如果不关心读事件可以传 NULL
 * exceptfds:如果内核等待相应的文件描述符发生异常,则将失败的文件描述符设置进 exceptfds 中,如果不关心则传 NULL
 * timeout:超时时间,单位是毫秒(ms)
 */
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

一个线程通过调用 select 函数监控多个文件描述符,一旦描述符就绪(可读、可写、或发生异常),就能通知程序进行相应的读写操作。select 函数返回后,可通过遍历 fd_set 数据结构找到就绪的描述符。缺点如下:

  • 单个进程所打开的文件描述符是有限制的,由 FD_SETSIZE 设置(fd_set 结构中指定)。
  • select 函数仅返回就绪文件描述符的个数,具体哪个就绪,还需要进行遍历
  • select 函数调用需要 fd_set 数组,需要拷贝一份到内核,耗费资源
poll 函数
/**
 * fds:poll 函数监视的结构列表,每一个元素包含三部分内容:文件描述符、监视的事件集合、就绪的事件集合
 * nfds:fds数组的长度
 * timeout:超时时间,单位是毫秒(ms)
 */
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
  int fd;        // 文件描述符
  short events;   // 该文件描述符监控的事件
  short revents;  // 该文件描述符上已经就绪的事件
};

poll 函数重新设计了新的数据结构,程序可显示指定文件描述符的长度,并添加到 pollfd 链表中。相对于 select 而言,只是去掉了单个进程能打开的文件描述符的限制。

epoll 函数

epoll 重新设计了数据结构(红黑树和就绪队列),不仅没有单个进程能打开的文件描述符的限制,而且能通过异步 IO 事件唤醒的方式通知客户端进程就绪事件,不需要遍历就能找到就绪的文件描述符。另外在内核中保存了一份文件描述符集合,不需要用户每次传入,只需要告诉内核修改的部分即可,省去了数据拷贝的动作。详细如下图所示:

在这里插入图片描述
(侵删)

JDK Selector 底层实现

JDK Selector 是 JVM 层面的 API,底层依赖操作系统的系统调用支持,所以在不同操作系统环境中,提供了不同的实现::

  • Windows : select
  • Unix : poll
  • Mac : kqueue
  • Linux : epoll

三、Reactor模式

Reactor 模式是一种设计思想,具体描述如下:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.
  
关键知识:基于事件驱动的模式,有多个输入源(可理解为客户端),一个服务处理器(Service Handler),多个 请求处理器(Request Handlers)。

Java NIO 的多路复用机制就是对 Reactor 模式的支持。基础的 Reactor 模式如下图(单 Reactor 单线程版本):

在这里插入图片描述

其中 acceptor 只处理连接请求,其他 Handler 处理读写请求。另外,根据不同的使用场景延伸出以下模式:

  • 单 Reactor 多线程:Handler 处理器(图中的 read、decode 等操作)的执行放入线程池,多线程进行业务处理。
  • **主从 Reactor 多线程:**Netty 默认实现。

四、Java NIO 知识图谱

在这里插入图片描述

文章来源:https://blog.csdn.net/zhuqiuhui/article/details/135303656
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。