Java-NIO篇章(4)——Selector选择器详解

发布时间:2024年01月19日

Selector介绍

选择器(Selector)是什么呢?选择器和通道的关系又是什么?这里详细说明,假设不用选择器,那么一个客户端请求数据传输那就需要建立一个连接,为了避免线程阻塞,那么每个客户端开辟一个线程。而学过JVM的都知道,默认每开一个线程需要栈空间内存1MB大小。如果这时候有大量的客户端连接请求,那么这个内存占用是非常可怕的,而且开辟大量的线程将导致CPU频繁上下文切换,效率非常低。举个例子,我们的服务器就是一家餐厅,客户端就是顾客,餐厅为顾客服务,如果每来一个客人(客户端请求)我们就派一个服务员(线程)那么这样消耗是消耗不起的。最正常的逻辑是,餐厅只招聘一个服务员(一个线程),然后通过一个监控器(Selector)监控所有顾客的需求(监控IO事件),如果哪个顾客需要服务就喊一下(这个信号就是下面的IO事件),然后服务员就跑过去为他服务。这样虽然一个线程很累,但是只需要一个线程就可以处理大量的socket连接,参考Redis单线程模式设计就知道一个线程如果专心处理非阻塞不耗时的业务是非常非常快的。借用一张网图非常清楚地描述了Selector、Channel、Buffer三个核心组件的关系,如下图所示:
在这里插入图片描述

来一段专业的介绍:选择器的使命是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。 选择器提供了独特的API方法,能够选出(select)所监控的通道已经发生了哪些IO事件,包括读写就绪的IO操作事件。 一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。

先介绍什么是IO事件吧,这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执行某个IO操作的条件。 比方说某个SocketChannel传输通道,如果完成了和对端的三次握手过程,则会发生“连接就绪” (OP_CONNECT)的事件。再比方说某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接的到来时,则会发生“接收就绪”(OP_ACCEPT)的事件。还比方说,一个SocketChannel通道有数据可读,则会发生“读就绪”(OP_READ)事件;一个等待写入数据的SocketChannel通道,会发生写就绪(OP_WRITE)事件。这里注意,只有FileChannel文件通道不可用被选择器监控或选择的。其他的三个通道都可以被Selector监控。

通道和选择器之间的关联,通过register(注册)的方式完成。调用通道的Channel.register (Selector selector, int ops)方法,可以将通道实例注册到一个选择器中。 register方法有两个参数:第一个参数,指定通道注册到的选择器实例; 第二个参数,指定选择器要监控的IO事件类型。可供选择器监控的通道IO事件类型,包括以下四种:

  • 可读: SelectionKey.OP_READ
  • 可写:SelectionKey.OP_WRITE
  • 连接:SelectionKey.OP_CONNECT
  • 接收: SelectionKey.OP_ACCEPT

以上的事件类型常量定义在SelectionKey类中。如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现。例如,同时监控可读和可写IO事件:

//监控通道的多种事件,用“按位或”运算符来实现
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;

SelectionKey选择键

通道和选择器的监控关系, 本质是一种多对一的关联关系。 一个选择器Selector可以监控多个通道Channel,那么如何区分不同的Channel呢?很简单,给每个Channel取一个唯一的名字就行,这个名字就是SelectionKey,这样就可以维护不同的Channel了。Selector并不直接去管理Channel,而是直接管理SelectionKey,通过SelectionKey与Channel发生关系。一个Channel最多能向Selector注册一次,注册之后就形成了唯一的SelectionKey, 然后被Selector管理起来。 Selector有一个核心成员keys,专门用于管理注册上来的SelectionKey, Channel注册到Selector后所创建的那一个唯一的SelectionKey,添加在这个keys成员中,这是一个HashSet类型的集合。 除了成员keys之外, Selector还有一个核心成员selectedKeys,用于存放已经发生了IO事件的SelectionKey。怎么样?绕晕了吗?别慌,看下面的图:

在这里插入图片描述

SelectionKey是IO事件的记录者(或存储者) , SelectionKey 有三个核心成员,一个是关联的Channel通道,另外两个分别存储着自己关联的Channel上的感兴趣IO事件和已经发生的IO事件。Channel通道上可以发生多种IO事件,比如说读就绪事件、写就绪事件、新连接就绪事件,但是SelectionKey记录事件的成员却是一个整数类型。 这样问题就来了,一个整数如何记录多个事件呢?答案是,通过比特位来完成的。 具体的IO事件所占用的哪一个比特位,通过常量的方式定义在SelectionKey中, 如下:

//读取就绪事件,第 0 位
public static final int OP_READ = 1 << 0;
//写入就绪事件,第 2 位
public static final int OP_WRITE = 1 << 2;
//传输通道建立成功的 IO 事件,第 3 位
public static final int OP_CONNECT = 1 << 3;
//新连接就绪事件,第 4 位
public static final int OP_ACCEPT = 1 << 4;

通过SelectionKey的interestOps成员上相应的比特位,可以设置、查询关联的Channel所感兴趣的IO事件;通过SelectionKey的readyOps上相应的比特位,可以查询关联Channel所已经发生的IO事件。 对于interestOps成员上的比特位, 应用程序是可以设置的;但是对于readyOps上的比特位,应用程序只能查询,不能设置。因为,readyOps上的比特位是已经发生了的IO事件,只能由客户端被动触发,不能主动设置。readyOps发生的IO事件只能是Channel感兴趣的interestOps中的IO事件。通道和选择器的监控关系注册成功后, Selector就可以查询就绪事件。具体的查询操作,是通过调用选择器Selector的select( )系列方法来完成。通过select系列方法,可以不断地查询通道中所发生操作的就绪状态(或者IO事件) , 并且把这些发生了底层IO事件,转换成Java NIO中的IO事件,记录在的通道关联的SelectionKey的readyOps上。除此之外,发生了IO事件的SelectionKey,还会记录在Selector内部selectedKeys集合中。简单来说, 一旦在通道中发生了某些IO事件(就绪状态达成),这个事件就被记录在SelectionKey的readyOps上,并且这个SelectionKey被记录在Selector内部的selectedKeys集合中。(1) 通道必须在Selector注册过;(2) 所发生的事件必须是SelectionKey上interestOps成员记录的事件。

使用Selector选择器

使用选择器,主要有以下三步:

  • 获取选择器实例;
  • 将通道注册到选择器中;
  • 轮询感兴趣的IO就绪事件(选择键集合)。

第一步:获取选择器实例。选择器实例是通过调用静态工厂方法open()来获取的,具体如下:

//调用静态工厂方法 open()来获取 Selector 实例
Selector selector = Selector.open();

第二步:将通道注册到选择器实例。要实现选择器管理通道,需要将通道注册到相应的选择器上,简单的示例代码如下:

// 2.获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4.绑定连接
serverSocketChannel.bind(new InetSocketAddress(18899));
// 5.将通道注册到选择器上,并制定监听事件为:“接收连接”事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

这里需要注意:注册到选择器的通道,必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常。还需要注意:一个通道,并不一定要支持所有的四种IO事件。例如服务器监听通道ServerSocketChannel,仅仅支持Accept(接收到新连接) IO事件;而传输通道SocketChannel则不同,该类型通道仅不支持Accept类型的IO事件。

第三步:选出感兴趣的IO就绪事件(选择键集合)。通过Selector选择器的select()方法 ,选出已经注册的、已经就绪的IO事件,并且保存到SelectionKey选择键集合中。 SelectionKey集合保存在选择器实例内部,其元素为SelectionKey类型实例。调用选择器的selectedKeys()方法,可以取得选择键集合。

//轮询,选择感兴趣的 IO 就绪事件(选择键集合)
while (selector.select() > 0) {
    Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        	//根据具体的 IO 事件类型,执行对应的业务操作
        if(key.isAcceptable()) {
        	// IO 事件: ServerSocketChannel 服务器监听通道有新连接
        } else if (key.isConnectable()) {
        	// IO 事件:传输通道连接成功
        } else if (key.isReadable()) {
        	// IO 事件:传输通道可读
        } else if (key.isWritable()) {
        	// IO 事件:传输通道可写
        }
        //处理完成后,移除选择键
        keyIterator.remove();
    }
}

处理完成后,需要将选择键从这个SelectionKey集合中移除,防止下一次循环的时候,被重复的处理。 SelectionKeys集合不能添加元素。select()方法的返回值的是整数类型(int),表示发生了IO事件的数量。更准确地说,是从上一次select到这一次select之间,有多少通道发生了IO事件,更加准确地说,是指发生了选择器感兴趣(注册过)的IO事件数。

用于选择就绪的IO事件的select()方法,有多个重载的实现版本,具体如下:

  • select():阻塞调用,一直到至少有一个通道发生了注册的IO事件。
  • select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数。
  • selectNow():非阻塞,不管有没有IO事件,都会立刻返回。

常用的是select():阻塞调用,因为如果没有IO事件发生的话CPU就不用在那儿空旋了,这样大大减少了系统消耗。

客户端连接服务器并发送数据例子

下面将举例将上面介绍的三个核心组件以一个案例的形式综合运用,代码如下:

首先是服务端的代码:

public class SelectorServer {
    public static void main(String[] args) throws IOException {
        // 1.创建selector,管理多个channel
        Selector selector = Selector.open();
        // ServerSocketChannel 可以获取连接通道和套接字通道
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//开启非阻塞时连接,影响的只是 channel.accept();

        // 2. 建立selector与channel之间的联系(注册channel)
        // (事件有四种:accept 有连接请求时触发 、connect 客户端建立后触发的事件、read 可读事件、write 可写事件)
        // sscKey 代表了 ssc连接通道与selector的关联关系
        SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT); // 表示 ssc通道 只关注 accept 事件
        log.debug("sscKey:"+sscKey);

        ssc.bind(new InetSocketAddress(8080)); // 服务器程序的端口号,ip为本机ip
        while (true){
            //3. select 方法,发生了上述事件才会向下继续执行,否则阻塞
            // selector 在事假未被处理时会将事件重新加入,因此一个事件要么处理要么取消,不能置之不理
            selector.select(); // 如果没有事件则阻塞
            //4. 处理事件 , 获取所有发生的事件
            // 获取所有注册的channel的key,可以拿到key访问channel
            Set<SelectionKey> selectionKeys = selector.selectedKeys(); 
            Iterator<SelectionKey> iter = selectionKeys.iterator();
            while (iter.hasNext()){
                SelectionKey selectionKey = iter.next();
                iter.remove(); // 拿到了立即移除
                log.debug("Key:"+selectionKey);

                if(selectionKey.isAcceptable()){
                    // 如果是连接就绪事件,那就获取对应的ServerSocketChannel,然后在接受获得可以数据传输的SocketChannel
                    // 通过key获取channel
                    ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); 
                    SocketChannel sc = channel.accept(); // 前面设置了非阻塞,没有连接就返回null
                    sc.configureBlocking(false); //开启非阻塞读,影响的只是 channel.read(buffer);
                    SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, null);
                    //其实上面第二个参数已经绑定了感兴趣的IO事件,这行不写也行,或者下面这行保留,上面第二个参数给0就行
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("SocketChannel:"+sc);
                } else if (selectionKey.isReadable()) {
                    // 如果是可读事件,那么就读取内容
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    channel.read(buffer);
                    buffer.flip();
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);
                    String msg = charBuffer.toString();
                    System.out.println("客户端发来的信息:"+msg);
                }
            }
            ssc.close();
        }
    }
}

其次是客户端的代码:

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        sc.configureBlocking(false);
        System.out.println("waiting......");
        while(!sc.finishConnect()){
            // 没有连接完成时等待
            Thread.yield();
        }
        System.out.println("客户端连接成功!");
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put("Hello world".getBytes());

        // 发送到服务器
        byteBuffer.flip();
        sc.write(byteBuffer);
        sc.shutdownOutput();
        sc.close();
    }
}

在NIO中,服务器接收新连接的工作,是异步进行的。不像Java的OIO那样,服务器监听连接,是同步的、阻塞的。 NIO可以通过选择器(也可以说成:多路复用器),后续不断地轮询选择器的选择键集合,选择新到来的连接。 有了Linux底层的epoll支持,以及Java NIO Selector选择器等等应用层IO复用技术, Java程序从而可以实现IO通信的高TPS、高并发,使服务器具备并发数十万、数百万的连接能力。 Java的NIO技术非常适合用于高性能、高负载的网络服务器。鼎鼎大名的通信服务器中间件Netty,就是基于Java的NIO技术实现的 。

文章来源:https://blog.csdn.net/cj151525/article/details/135695467
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。