? ? ? ? 选择器是什么?选择器和通道关系是什么?
? ? ? ? 简单的说,选择器的使用是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监听多个通道的IO(输入输出)状况。选择器和通道的关系是监控和被监控的关系。
? ? ? ? 选择器提供了独特的API方法,能够选出(select)所监控的通道已经发生了哪些IO事件,包括读写就绪的IO操作事件。
? ? ? ? 在NIO编程中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理成千上万甚至更多的通道。在极端情况狂(数万个连接),只用一个线程就可以处理所有的通道,这样会大量减少线程之间上下文切换的开销。
? ? ? ? 通道和选择器之间的关联通过register(注册)的方式完成。调用通道的register(Selector sel,int ops)方法,可以将通道实例注册到一个选择器中。register方法有两个参数:第一个参数指定通道注册到的选择器实例;第二个参数指定选择器要监控的IO事件类型。
? ? ? ? 可供选择监控的通道IO事件类型包括以下四种:????????
? ? ? ? (1) 可读:SelectionKey.OP_READ。
? ? ? ? (2) 可写:SelectionKey.OP_WRITE。
? ? ? ? (3) 连接:SelectionKey.OP_CONNECT。
? ? ? ? (4) 接收:SelectionKey.OP_ACCEPT。
? ? ? ? 以上事件类型常量定义在SelectionKey类中。如果选择器要监听通道的多种事件,可以用"按位或"运算符来实现。例如,同时监听可读和可写IO事件:
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE
? ? ? ? 什么是IO事件?
? ? ? ? 这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执行某个IO操作的条件。例如,某个SocketChannel传输通道如果完成了和对端的三次握手,就会发生“链接就绪”事件;某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接到来时,则会发生“接受就绪”事件等。
【说明】socket连接事件的核心原理和TCP连接的建立过程有关。
? ? ? ? 并不是所有的通道都是可以被选择器监控或选择的。例如,FileChannel就不能被选择器复用。判读一个通道能否被选择器监控或选择有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道),如果是,就可以被选择,否则不能被选择。
????????SelectableChannel类,提供了实现通道可选择性所需要的公共方法。Java NIO中所有网路连接socket通道都继承了SelectableChannel类,都是可选择的。
? ? ? ? 通道和选择器的监控关系注册成功后,就可以选择就绪事件,具体的选择工作可调用Selector的select()方法来完成。通过select()方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的那些感兴趣的IO事件。换句话说,一旦在通道中发生了某些IO事件,并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey(选择键)的集合中。
????????SelectionKey是什么呢?简单的说,SelectionKey就是那些被选择器选中的IO事件。前面讲到,一个IO事件发生(就绪装填达成)后,如果之前在选择器中注册过,就会被选择器选中,并放入SelectionKey中;如果之前没有注册过,那么即使发生了IO事件,也不会被选择器选中。SelectionKey和IO的关系可以简单地理解为SelectionKey就是被选中了的IO事件。
? ? ? ? 在实际编程时,SelectionKey的功能是很强大的。通过SelectionKey,不仅可以获得通道的IO事件类型(比如SelectionKey.OP_READ),还可以获得选择器实例。
? ? ? ? 选择器的使用主要有以下三步:
? ? ? ? (1)获取选择器实例。选择器实例是通过调用静态工厂方法open()来获取的。具体如下:
Selector selector = Selector.open();
? ? ? ? Selector的类方法open()的内部是向选择器SPI发出请求,通过默认的SelectorProvider(选择器提供者)对象获取一个新的选择器实例。Java中的SPI(服务提供者接口)是一种可以扩展的服务提供和发现机制。Java通过SPI的方式提供选择器的默认实现版本。也就是说,其他的服务提供者可以通过SPI的方式提供定制化版本的选择器的动态替换或者扩展。
? ? ? ? (2)将通道注册到选择器实例。要实现选择器管理通道,需要将通道注册到相应的选择器上,简单的示例代码如下:
//1.获取Selector选择器
Selector selector = Selector.open();
//2.获取通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
//3.设置为非阻塞
serverChannel.configureBlocking(false);
//4.绑定连接
InetSocketAddress address = new InetSocketAddress(8080);
serverSocket.bind(address);
//5.将通道注册到选择器上,并注册的IO事件为:"接收新连接"
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
? ? ? ? 上面通过调用通道的register()方法将 ServerSocketChannel 注册到一个选择器上。当然,在注册之前,需要准备好通道。
? ? ? ? 这里需要注意:注册到选择器的通道必须处于非阻塞模式下,否则将抛出异常。这意味着,FileChannel不能与选择器一起使用,因为FileChannel只有阻塞模式,不能切换到非阻塞模式;而socket相关的所有通道都可以。其次,一个通道并一定支持所有的四种IO事件。例如,服务器监听通道ServerSocketChannel 仅支持Accept(接收到新连接)IO事件,而传输通道SocketChannel则不同,它不支持Accept类型的IO时间。
? ? ? ? 如何判断通道支持哪些事件呢?可以在注册之前通过通道的validOps()方法来获取该通道支持的所有IO事件集合。
? ? ? ? (3)选出感兴趣的IO就绪时间(选择键集合)。通过Selector的select()方法,选出已经注册的、已经就绪的IO事件,并且保存到SelectionKey集合中。SelectionKey集合保存在选择器实例内部,其元素为SelectionKey类型实例。调用选择器的selectionKeys()方法,可以取得选择键集合。
? ? ? ? 接下来,迭代集合的每一个选择键,根据具体IO事件类型执行对应的业务操作。大致的处理流程如下:
public void test() throws IOException{
//1.获取Selector选择器
Selector selector = Selector.open();
//2.获取通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
//3.设置为非阻塞
serverChannel.configureBlocking(false);
//4.绑定连接
InetSocketAddress address = new InetSocketAddress(8080);
serverSocket.bind(address);
//5.将通道注册到选择器上,并注册的IO事件为:"接收新连接"
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("serverChannel 在监听");
//6.轮询感兴趣的IO就绪事件(选择键集合)
while(selector.select() > 0){
if(null == selector.selectedKeys()) continue;
//7.获取键集合
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
//8.获取单个的选择键,并处理
SelectionKey key = it.next();
if(null == key) continue;
//IO事件ServerSocketChannel 服务器监听通道有新连接
if(key.isAcceptable()){
//业务处理
}
//IO事件传输通道连接成功
else if(key.isConnectable()){
}
//IO事件传输通道可读
else if(key.isReadable()){
}
//IO事件传输通道可写
else if(key.isWritable()){
}
//处理完成后,移除选择键
it.remove();
}
}
}
? ? ? ? 处理完成后,需要将选择键从SelectionKey集合中移除,以防止下一次循环时被重复处理。SelectionKey集合不能添加元素,则将抛出异常。
? ? ? ? 用于选择就绪的IO事件select()方法有多个重载的实现版本,具体如下:
? ? ? ? 1、select():阻塞调用,直到至少有一个通道发生了注册的IO事件。
? ? ? ? 2、select(long timeout):和select()一样,但最长阻塞时间为timeout指定毫秒数。
? ? ? ? 3、selectNow():非阻塞,不管有没有IO事件都会立刻返回。
? ? ? ? select()方法的返回值是整数类型(int),表示发生了IO事件的数量,即从上一次select到这一次select之间有多少通道发生了IO事件,更加准备地说是发生了选择器感兴趣(注册过)的IO事件数。
? ? ? ? Discard服务器的功能很简单:仅读取客户端通道的输入数据,读取完成后直接关闭客户端通道,并且直接抛弃掉(Discard)读取到的数据。
public class NioDiscardServer {
public static void startServer() throws IOException{
//1.创建一个 Selector 选择器
Selector selector = Selector.open();
//2. 获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4.绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("服务器启动成功");
//5. 将通道注册到选择器上,并注册的IO事件为接收新连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 轮询感兴趣的IO就绪时间(选择键集合)
while(selector.select() > 0){
//7. 获取选择键集合
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while(selectedKeys.hasNext()){
//8. 获取单个的选择键,并处理
SelectionKey selectionKey = selectedKeys.next();
//9.判断key是具体的什么事件
if(selectionKey.isAcceptable()){
System.out.println("发生了 新连接到来事件 "+ selectionKey.channel());
//10. 若选择键的IO事件是"连接就绪"事件,就获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
//11. 切换非阻塞模式
socketChannel.configureBlocking(false);
//12. 将该通道注册到selector选择器上
SelectionKey channelSK = socketChannel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_CONNECT);
}
if(selectionKey.isWritable()){
System.out.println("发生了写就绪事件 " + selectionKey.channel());
}
if(selectionKey.isConnectable()){
System.out.println("发生了客户端 连接成功事件 " + selectionKey.channel());
}
if(selectionKey.isReadable()){
System.out.println("发生了读 就绪事件 " + selectionKey.channel());
//13. 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//14. 读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while((length = socketChannel.read(byteBuffer)) > 0){
byteBuffer.flip();
Logger.info(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
socketChannel.close();
}
selectedKeys.remove();
}
}
serverSocketChannel.close();
}
public static void main(String[] args) throws IOException {
startServer();
}
}
????????