同步阻塞 IO(又称 BIO),是一种传统的 IO 模型。用户进程发起 IO 操作后,必须等待 IO 操作完成用户进程才可以做其他事情。此时系统资源并未得到充分利用,不适合大量 IO 场景的业务。
JDK 提供 Socket、ServerSocket 以及字符/节操作相关(如 OutputStream、BufferedWriter 等)的 API 支持 BIO。若一个客户端发起请求,服务端 ServerSocket 都会创建一个客户端 Socket 实例与之通信。由于 ServerSocket 的 accept 方法只能接受一个连接且连接过程中是阻塞的,所以服务端要想连接多个客户端就必须开启多个线程。这样就会带来一个问题:随着客户端增多,服务端线程数量也会增多,很可能导致线程堆栈溢出等问题。
同步非阻塞 IO,用户进程发起 IO 操作后就可以返回做其它事情,但是用户进程时不时地需要轮询 IO 操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的 CPU 资源浪费。
从操作系统层面理解:应用进程发起 recvform 系统调用后,进程并没有被阻塞,内核马上返回给进程,如果数据还没有准备好,就返回一个 Error。进程在返回之后,可以继续去抢占CPU资源,做一些别的工作。然后在适当的时间在发起一次recvform系统调用,重复上述过程。需要注意的是从内核空间拷贝数据到用户空间的时候,应用进程也是阻塞的。
JDK 1.4 提供新的关于 I/O 的 API(如 Channel、Selector、Buffer 等),即 new I/O,个别资料也会把 Selector(多路复用)称为 NIO 的一种实现,详细参考面的分析。
参考:New I/O APIs:https://docs.oracle.com/javase/1.5.0/docs/guide/nio/index.html
异步非阻塞 IO(AIO),采用“订阅-通知”模式:应用程序向操作系统注册 IO 监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的回调函数。
JDK 1.7 提供 AIO 实现 API(如 AsynchronousServerSocketChannel、AsynchronousSocketChannel、AsynchronousChannel、CompletionHandler 等)。
BIO 中一个客户端连接会消耗服务端一个线程,为了解决这个问题,JDK 提供了多路复用模式,服务器实现模式为一个线程处理多个连接请求,即客户端发送的连接请求都会注册到多路复用器(Selector)上,多路复用器轮询到连接有 IO 请求就进行处理。这样就减少了服务端线程的使用,降低了因为线程的切换引起的不必要额资源浪费和多余的开销。根据个人的理解,如下图所示:
Selector(选择器)是 Channel 的多路复用器,它可以同时监控多个 Channel 的 IO 情况,允许单个线程来操作多个 Channel,从而管理多个网络连接。
可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。
通道将缓存区(Buffer)的数据移入或移出到各种 I/O 源,如文件、Socket 等。Channel 是双向的,既可以通过 Channel 读数据,也可以写数据,但必须使用 Buffer 作为缓冲区来操作数据。网络编程中,重要的有两个通道类:
SelectionKey 表示 SelectableChannel 在 Selector 中注册的标识。每个 Channel 向 Selector 注册时都将会创建一个 SelectionKey。
SelectionKey 是 SelectableChannel 与 Selector 关系连接者,同时维护了 Channel 事件。
服务端:
public class NIOServer {
public static void main(String[] args) throws IOException {
// 1. 创建选择器
Selector selector = Selector.open();
// 2. 创建 ServerSocketChannel,并监听端口 8888,同时设置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//3. 把 serverSocketChannel 注册到 selector,只关心事件为 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 4. 轮询获取“准备就绪”的注册过的操作
while (true){
if(selector.select(1000) == 0) {
System.out.println("服务器等待了1秒,无连接"); //没有事件发生
continue;
}
// selector.selectedKeys() 返回关注事件的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove(); // 将其删除掉,表示我们已经处理了这个事件
if(key.isAcceptable()) { // 如果是 OP_ACCEPT, 有新的客户端连接
SocketChannel socketChannel = serverSocketChannel.accept(); //该该客户端生成一个 SocketChannel
socketChannel.configureBlocking(false);
// 将socketChannel 注册到selector, 关注事件为 OP_READ, 同时关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
} if(key.isReadable()) { //发生 OP_READ
// 通过 key 反向获取到对应 channel,从客户端接收消息并发消息给客户端
SocketChannel clientChannel = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment(); // 获取到该channel关联的buffer
int read = clientChannel.read(buffer);
if(read == -1) {
key.cancel();
} else {
// 接收客户端消息
System.out.println("form 客户端 " + new String(buffer.array()));
// 服务端接收消息后,给客户端发送给客户端
Scanner scanner = new Scanner(System.in);
String string = scanner.nextLine();
ByteBuffer writeBuffer = Charset.forName("utf-8").encode(string);
clientChannel.write(writeBuffer);
if (writeBuffer.hasRemaining()) {
// 如果不能一次性发完只需要触发 write 事件去发
key.attach(writeBuffer);
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
}
}
} else if(key.isWritable() && key.isValid()) {
// 可写
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
SocketChannel clientChannel = (SocketChannel) key.channel();
byteBuffer.flip();
clientChannel.write(byteBuffer);
if (!byteBuffer.hasRemaining()) {
// 如果已完,则只无须关注 write 事件
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
} // while
} // while
}
}
客户端:
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8888);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(new Date().toString().getBytes());
buffer.flip();
socketChannel.write(buffer);
socketChannel.close();
}
}
/**
* nfds:需要监控的文件描述符的最大值加1。这个值通常设为所有文件描述符中的最大值加1,以确保select能够正确地监控所有需要的文件描述符
* readfds:等待读事件的文件描述符集合,如果不关心读事件可以传 NULL
* writefds:等待写事件的文件描述符集合,如果不关心读事件可以传 NULL
* exceptfds:如果内核等待相应的文件描述符发生异常,则将失败的文件描述符设置进 exceptfds 中,如果不关心则传 NULL
* timeout:超时时间,单位是毫秒(ms)
*/
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
一个线程通过调用 select 函数监控多个文件描述符,一旦描述符就绪(可读、可写、或发生异常),就能通知程序进行相应的读写操作。select 函数返回后,可通过遍历 fd_set 数据结构找到就绪的描述符。缺点如下:
/**
* fds:poll 函数监视的结构列表,每一个元素包含三部分内容:文件描述符、监视的事件集合、就绪的事件集合
* nfds:fds数组的长度
* timeout:超时时间,单位是毫秒(ms)
*/
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; // 文件描述符
short events; // 该文件描述符监控的事件
short revents; // 该文件描述符上已经就绪的事件
};
poll 函数重新设计了新的数据结构,程序可显示指定文件描述符的长度,并添加到 pollfd 链表中。相对于 select 而言,只是去掉了单个进程能打开的文件描述符的限制。
epoll 重新设计了数据结构(红黑树和就绪队列),不仅没有单个进程能打开的文件描述符的限制,而且能通过异步 IO 事件唤醒的方式通知客户端进程就绪事件,不需要遍历就能找到就绪的文件描述符。另外在内核中保存了一份文件描述符集合,不需要用户每次传入,只需要告诉内核修改的部分即可,省去了数据拷贝的动作。详细如下图所示:
(侵删)
JDK Selector 是 JVM 层面的 API,底层依赖操作系统的系统调用支持,所以在不同操作系统环境中,提供了不同的实现::
Reactor 模式是一种设计思想,具体描述如下:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.
关键知识:基于事件驱动的模式,有多个输入源(可理解为客户端),一个服务处理器(Service Handler),多个 请求处理器(Request Handlers)。
Java NIO 的多路复用机制就是对 Reactor 模式的支持。基础的 Reactor 模式如下图(单 Reactor 单线程版本):
其中 acceptor 只处理连接请求,其他 Handler 处理读写请求。另外,根据不同的使用场景延伸出以下模式: