在网络通信中,当连接已经建立成功,服务端和客户端都会拥有一个Socket实例,每个Socket实例都有一个InputStream和OutputStream,并通过这两个对象来交换数据。同时我们也知道网络I/O都是以字节流传输的,当创建Socket对象时,操作系统会为InputStream和OutputStream分别分配一定大小的缓存区,数据的写入都是通过这个缓存区完成的。写入端将数据写到SendQ队列中,当队列填满时,数据将被转移到另一端的InputStream的RecvQ队列中,如果这时RecvQ已经满了,那么OutputStream的write方法将会阻塞直到RecvQ队列有足够的空间容纳SendQ发送的数据。值的注意的是,这个缓存区的大小及写入端的速度和读取端的速度非常影响这个连接的数据传输效率,由于可能发生阻塞,所以网络I/O与磁盘I/O不同的是数据的写入和读取还要有一个协调的过程,如果两边同时传送数据可能会产生死锁。使用NIO可以解决该问题。
死锁产生的根本原因是 客户端和服务端都需要对方释放资源,例如在缓存队列中写入数据。当缓存队列满时,写操作会被阻塞,但同时双方都可能需要对方的缓存队列空间,这样就形成了资源争用的情况
BIO即阻塞I/O,不管是磁盘I/O还是网络I/O,数据载写入OutputStream或者从InputStream读取时都可能被阻塞,一旦被阻塞,线程将会失去CPU的使用权,这在当前大规模访问量和有性能要求的情况下时不能被接受的。虽然当前的网络I/O有一些解决方案,如一个客户端一个处理线程,出现阻塞时只能是一个线程阻塞而不会影响其他线程工作,还有为了减少系统线程的开销,采用线程池的办法来减少线程创建和回收的成本。单如果当前需要大量的HTTP长连接的情况,线程池可能无法创建那么多线程来保持连接。所以此时我们需要一种新的I/O操作方式。
磁盘IO场景:
在磁盘 I/O 中,阻塞通常发生在数据的读取和写入过程中。假设一个应用程序需要从磁盘读取大量数据:
- 读取阻塞: 当应用程序发起读取请求时,系统会将请求发送给磁盘驱动器,然后等待磁盘驱动器将数据加载到内存中。在这个过程中,应用程序的线程会被阻塞,直到读取操作完成。如果读取的数据量较大,阻塞时间可能会显著增加。
- 写入阻塞: 类似地,当应用程序发起写入请求时,系统会将数据传输到磁盘,然后等待写入操作完成。写入操作的阻塞时间取决于数据的大小和磁盘的性能。如果写入的数据量很大,应用程序可能会长时间地被阻塞。
网络IO场景:
在网络 I/O 中,BIO 模型同样存在阻塞问题。考虑一个基于阻塞 I/O 的服务器应用程序,它接受客户端连接并处理数据:
- 接受连接阻塞: 当服务器调用accept函数等待客户端连接时,如果没有客户端连接进来,该调用会一直阻塞。在这段时间内,服务器的线程无法执行其他任务,造成资源浪费。
- 读取数据阻塞: 在已经建立连接的情况下,当服务器调用read函数等待接收客户端发送的数据时,如果没有数据到达,该调用会一直阻塞。服务器线程被迫等待,可能会导致性能下降。
- 写入数据阻塞: 类似地,当服务器调用write函数将数据发送到客户端时,如果客户端接收缓冲区已满,写入操作也会阻塞。这可能使得服务器线程长时间处于等待状态。
Buffer类
在 Java NIO 中,Buffer 类是一个抽象类,表示一个数据缓冲区,用于在通道(Channel)和原始数据之间进行数据传输。主要的 Buffer 子类包括 ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer 和 DoubleBuffer,它们分别对应不同的基本数据类型。
public abstract class Buffer {
//mark 属性和 reset() 方法一起使用,用于在缓冲区中设置一个标记(mark)位置,并在之后通过 reset() 方法将当前位置重置为这个标记位置。mark 属性的作用是用于记录一个位置,以便后续能够回到该位置,方便重新处理或采取其他措施。
private int mark = -1;
//当前位置的索引
private int position = 0;
//限制位置的索引,即缓冲区中数据的有效长度
private int limit;
//缓存区的容量
private int capacity;
//内存段的代理对象
final MemorySegmentProxy segment;
//该方法用于设置新的索引位置
public Buffer position(int newPosition) {
//如果新的索引位置大于数据的有效长度,或小于0,抛出异常
if (newPosition > limit | newPosition < 0)
throw createPositionException(newPosition);
//如果新的索引位置在mark标记之后,那么标记置为-1
if (mark > newPosition) mark = -1;
//然后将当前索引位置设置为新的索引位置
position = newPosition;
return this;
}
//设置新的有效数据长度
public Buffer limit(int newLimit) {
//如果新的有效数据长度大于缓冲区最大容量或小于0,抛出异常
if (newLimit > capacity | newLimit < 0)
throw createLimitException(newLimit);
//将有效数据长度设置为新的有效数据长度
limit = newLimit;
//如果当前位置所以大于新的有效数据长度,那么当前位置索引同样设置为有效数据长度
if (position > newLimit) position = newLimit;
//mark如果大于新的有效数据长度,则直接设置为无效
if (mark > newLimit) mark = -1;
return this;
}
//用于标记当前索引位置
public Buffer mark() {
mark = position;
return this;
}
//将当前索引位置重置为mark标记所在的位置
public Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
//清空缓存区数据,实际上就是重置了mark、position和limit
public Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
//flip() 方法可以将缓冲区从写模式切换到读模式。调用 flip() 后,位置被设置为0,限制被设置为之前的位置,用于准备读取缓冲区中的数据。
public Buffer flip() {
//limit有效数据长度为当前索引所在位置
limit = position;
//postion设置为0,表示从0开始读取
position = 0;
mark = -1;
return this;
}
//将缓冲区的位置设置为 0,限制保持不变,用于重新读取缓冲区中的数据,类似于 flip() 但不改变限制。
public Buffer rewind() {
position = 0;
mark = -1;
return this;
}
//获取buffer中剩余的有效容量
public final int remaining() {
int rem = limit - position;
return rem > 0 ? rem : 0;
}
}
Channel类
Java NIO 中,Channel 是一个接口,它提供了用于读取和写入数据的统一的 API。Channel 接口是 NIO 用于与 I/O 设备(如文件、套接字、选择器等)交互的核心部分。Channel 接口的实现类可以包括文件通道、套接字通道等。
public interface Channel extends Closeable {
//判断通道是否打开
public boolean isOpen();
//关闭通道
public void close() throws IOException;
}
Seletionkey类
SelectionKey 类是 Java NIO 中的关键类,它用于表示注册到 Selector 上的通道和对应的事件。SelectionKey 对象是 Selector 与通道之间的桥梁,它包含了与通道相关的一些信息,以及通道所感兴趣的事件,以便在选择器上进行有效的事件选择。
选择键则是一种将通道和选择器进行关联的机制。
public abstract class SelectionKey {
//SelectableChannel 是 Java NIO 中表示支持非阻塞模式的通道的抽象类,该返回返回一个通道
public abstract SelectableChannel channel();
//该方法返回一个选择器
public abstract Selector selector();
//获取通道所感兴趣的操作集合(事件集合),返回一个位掩码,用于表示关注的事件
public abstract int interestOps();
//读操作的位掩码,用于表示通道已经准备好进行读操作。
public static final int OP_READ = 1 << 0;
//写操作的位掩码,用于表示通道已经准备好进行写操作。
public static final int OP_WRITE = 1 << 2;
//连接操作的位掩码, 用于表示连接已经建立
public static final int OP_CONNECT = 1 << 3;
//accept操作的位掩码, 用于表示通道已经准备好接受新的连接。
public static final int OP_ACCEPT = 1 << 4;
//返回通道现在是否可读
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}
//通道现在是否可写
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}
//通道是否已经连接就绪
public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}
//通道是否可以接受新的连接
public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}
//将指定的对象附加到此键
public final Object attach(Object ob) {
return ATTACHMENT.getAndSet(this, ob);
}
}
Selector类
Selector 是 Java NIO 中的一个关键类,用于实现非阻塞 I/O 操作的多路复用。通过 Selector,可以在单个线程上同时监控多个通道的事件,从而实现高效的事件驱动编程模型。
public abstract class Selector implements Closeable {
//返回新创建的选择器实例
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
//判断selector是否已经打开
public abstract boolean isOpen();
//SelectorProvider 是一个抽象类,用于提供 Selector 和 Channel 的创建。每个 Selector 都与一个 SelectorProvider 实例关联,而SelectorProvider 实例的具体实现是由具体的操作系统提供的。
public abstract SelectorProvider provider();
//获取选择器上所有的键集
public abstract Set<SelectionKey> keys();
//关闭选择器
public abstract void close() throws IOException;
// 阻塞,直到至少有一个通道在选择器上准备好进行 I/O 操作,或者调用线程被中断。返回已经准备就绪的通道的数量。
public abstract int select() throws IOException;
//非阻塞地检查是否有通道准备好进行 I/O 操作。返回已经准备就绪的通道的数量。
public abstract int selectNow() throws IOException;
//唤醒因为调用 select 或 selectNow 方法而处于阻塞状态的线程。返回调用 wakeup 方法的选择器。
public abstract Selector wakeup();
}
关键类就是Channel和Selector,它们是NIO中的两个核心概念。Channel 通过 register 方法可以注册到 Selector 中,以实现非阻塞 I/O 操作。Channel 和 Buffer 之间通过 read 和 write 方法进行数据的传输。数据首先被写入到缓冲区,然后从缓冲区读取到通道或从通道读取到缓冲区。Channel 可以通过 register 方法注册到 Selector 上,注册时需要指定感兴趣的事件,例如读、写等。SelectionKey 对象表示了一个通道在一个选择器上的注册信息,它与通道和选择器之间建立了关联。SelectionKey 包含了通道、选择器、感兴趣的操作集合(事件集合)、附件等信息。
下面看看NIO是如何工作的:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
public static void main(String[] args) {
try (
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open() //创建一个通道
) {
//将通道绑定到一个指定的端口,ServerSocketChannel使用选择器(Selector)来管理多个通道,可以在一个线程中处理多个通道的连接请求。
serverSocketChannel.bind(new InetSocketAddress(8080));
//将通道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//创建一个选择器
Selector selector = Selector.open();
//将通道注册到选择器中,状态是等待接受客户端连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO Server started on port 8080");
while (true) {
//自旋获取准备好的通道
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
//获取所有的键集
Set<SelectionKey> selectedKeys = selector.selectedKeys();
//迭代键集
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//处理相应事件
if (key.isAcceptable()) {
handleAcceptEvent(key, selector);
} else if (key.isReadable()) {
handleReadEvent(key);
}
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void handleAcceptEvent(SelectionKey key, Selector selector) throws IOException {
//获取键所关联的通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//准备接受连接
SocketChannel clientChannel = serverSocketChannel.accept();
//设置通道模式为非阻塞
clientChannel.configureBlocking(false);
//通道注册到选择器上,状态为可以读取数据
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Accepted connection from " + clientChannel);
}
private static void handleReadEvent(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
//分配缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//通道读取
int bytesRead = clientChannel.read(buffer);
if (bytesRead == -1) {
System.out.println("Connection closed by client: " + clientChannel);
clientChannel.close();
} else if (bytesRead > 0) {
buffer.flip();
clientChannel.write(buffer);
}
}
}
上面代码用NIO实现了一个简单的服务器
调用Selector的静态工厂方法创建一个选择器(这个静态工厂是操作系统底层实现的)。然后创建一个服务端Channel,绑定到一个Socket对象,并把这个通道注册到选择器上,把这个通道设置为非阻塞模式,然后就可以调用Selector的selectedKeys方法检查已经注册在这个选择器上的所有通信信道是否有需要事件发生,从而可以读取通信的数据,而这里读取的数据是Buffer,这个Buffer是我们可以控制的缓冲区。在上面这段程序中,将server的监听连接请求和事件吹了放在一个线程中,但是在事件应用中,我们通常会放在两个线程中,一个线程专门监听客户端的连接请求,而且是以阻塞的方式执行的;另一个线程专门负责处理请求,这个专门处理请求的线程才会真正采用NIO的方式。
下图展示量NIO的工作方式,Selector可以监听一组通信信道上的I/O状态,前提是这个Selector已经注册到这些通信通道中,选择器Selector可以调用select()方法检查已经注册的通信信道上I/O是否已经准备好,如果没有至少一个信道I/O状态发生变化,那么select方法会阻塞等待或在超时时间后返回0。如果有多个信道有数据,那么将会把这些数据分配到对应的数据Buffer中。所以关键的地方是,有一个线程来处理所有连接的数据交互,每个链接的数据交互不是阻塞方式的,所以可以同时处理大量的连接请求。
NIO模式下客户端和服务端通信流程对比如下:
1.服务端初始化:
服务端创建一个 ServerSocketChannel,并绑定到一个特定的端口。将 ServerSocketChannel 设置为非阻塞模式,并注册到一个 Selector 上,以监听连接事件。
2.客户端初始化
客户端创建一个 SocketChannel,连接到服务端的地址。将 SocketChannel 设置为非阻塞模式,并注册到一个 Selector 上,以监听连接事件。
3.事件循环
服务端和客户端都进入一个事件循环,不断地检查发生的事件。在服务端,可能会检查 OP_ACCEPT 事件,表示有新的连接请求。在客户端,可能会检查 OP_CONNECT 事件,表示连接已建立。(这里如果一直没有客户端访问程序陷入空转并不会阻塞,这样也就避免了内核切换)
4.处理连接事件
在服务端,当有新的连接请求到达时,通过 ServerSocketChannel.accept() 接受连接,并将新的 SocketChannel 注册到 Selector 上监听读事件。
5.客户端处理连接事件
在客户端,当连接建立完成时,通过 SocketChannel.finishConnect() 完成连接,然后注册到 Selector 上监听读事件
6.处理读事件
在服务端和客户端,当通道可读时,从通道中读取数据
这样,通过 NIO,服务端和客户端可以通过非阻塞的方式处理多个连接,并在一个事件循环中实现数据的读取和写入。在实际应用中,可能需要结合多线程、线程池等机制,以更好地处理多个连接的并发处理。
通过前面分析源码,我们也大致知道了Buffer的工作方式,Buffer可以简单理解为一组基本数据类型的元素列表,它通过介个变量来保持这个数据当前的位置状态,也就是有四个索引:
我们通过ByteBuffer.allocate(11)
方法创建了一个11个byte的数组缓冲区,初始状态如下所示,position为0,capacity和limit都是数组默认长度。
当我们写入五个数据后位置如下所示。
调用flip方法,数组切换状态为读状态:
此时底层操作系统可以从缓存区中正确读取这5个字节数据并发送出去了,在下一次写数据之前我们再调用一下clear方法,缓冲区又回回到默认位置。mark标记就是记录当前position的前一个位置,我们调用reset时,position会恢复到mark位置。
通过Channel获取的I/O数据首先要经过操作系统的Socket缓冲区缓冲区再讲数据复制到Buffer中,从操作系统缓冲区到用户缓冲区比较消耗性能,Buffer提供了另外一种直接操作操作系统缓冲区的方法,即ByteBuffer.allocateDirector(size);,该方法返回与底层存储空间关联的缓冲区,它通过Native代码操作非JVM堆的内存空间,每次创建和释放的时候都会调用一次System.gc()。
我们知道当我们通过IO读取磁盘数据时,需要使用操作系统的系统调用方法,而这会涉及用户态到内核态的切换,操作系统需要把数据读取到内核态空间,然后将数据送到用户态,这是十分重的一个操作,NIO提供了两个优化方法:
减少数据从内核到用户空间中的复制,数据直接在哪和空间中移动,下图首先是传统数据访问方式,然后是tansferXXX方式:
它将文件按照一定大小块映射为内存区域,当程序访问这个内存区域时,直接操作这个文件数据,这种方式就直接跳过了数据从内核空间向用户空间复制。