nio模拟netty

发布时间:2023年12月26日

netty底层调用了nio相关功能,但是由于netty代码量较大,将nio层层封装,导致没接触过nio开发的程序员无法快速理解netty底层究竟怎样调用nio,又对nio实现了什么封装。本篇文章尽可能(水平有限,只能做到"尽可能")在不依赖netty库的前提下模拟netty对nio的调用,尽量把每一行代码在netty中的相应位置阐释出来。

未完待续,部分注解后期补充。

接收服务端消息并打印

package com.example.demo;

import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

class DemoApplicationTests3 {

    public static void main(String[] args) throws Throwable {
        Set<SelectionKey> selectedKeys = new HashSet<>();

        SelectorProvider provider = SelectorProvider.provider();
        AbstractSelector unwrappedSelector = provider.openSelector();

        ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(SocketUtils.socketAddress("127.0.0.1", 9004), 200);
        SelectionKey key = serverSocketChannel.register(unwrappedSelector, 0);
        key.interestOps(16);

        /**
         * io.netty.channel.nio.NioEventLoop#openSelector()
         */
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");

        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
        PlatformDependent.putObject(
                unwrappedSelector, selectedKeysFieldOffset, selectedKeys);

        while (true) {
            int select = unwrappedSelector.select();
            System.out.println("asdaaa:" + select);
            Iterator<SelectionKey> iterator = selectedKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                iterator.remove();
                handleInput(next, unwrappedSelector);
            }
            System.out.println("over");
        }
    }


    private static void handleInput(SelectionKey key, Selector selector) throws IOException {

        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }

            }
        }
    }

    private static void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }

}

客户端,负责链接服务端并发送消息

package com.example.demo;

import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

class DemoApplicationTests2 {


    private static Thread thread = new Thread(() -> {
        try {
            while (true) {
                Thread.sleep(1000);
                doWrite(DemoApplicationTests2.socketChannel, "wjwljwjlj");
                System.out.println("send msg");
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    });
    private static SocketChannel socketChannel;

    public static void main(String[] args) throws Throwable {
        Set<SelectionKey> selectedKeys = new HashSet<>();

        SelectorProvider provider = SelectorProvider.provider();
        AbstractSelector unwrappedSelector = provider.openSelector();

        SocketChannel socketChannel = provider.openSocketChannel();
        socketChannel.configureBlocking(false);
        socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);

        /**
         * io.netty.channel.nio.NioEventLoop#openSelector()
         */
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");

        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
        PlatformDependent.putObject(
                unwrappedSelector, selectedKeysFieldOffset, selectedKeys);



        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 9004))) {
            socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
            doWrite(socketChannel, "currentTime");
        } else {
            socketChannel.register(unwrappedSelector, SelectionKey.OP_CONNECT);
        }


        DemoApplicationTests2.socketChannel = socketChannel;

        while (true) {

            int select = unwrappedSelector.select();
            System.out.println("asdaaa:" + select);
            Iterator<SelectionKey> iterator = selectedKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                handleInput(next, unwrappedSelector);
            }
            System.out.println("over");
            if (!DemoApplicationTests2.thread.isAlive()) {
                DemoApplicationTests2.thread.start();
            }
        }
    }


    private static void handleInput(SelectionKey key, Selector selector) throws IOException {
        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc, "connected");
                } else
                    System.exit(1);// 连接失败,进程退出
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
//                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
        }
    }

    private static void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }

}

文章来源:https://blog.csdn.net/m1f2c3/article/details/135217807
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。