netty底层调用了nio相关功能,但是由于netty代码量较大,将nio层层封装,导致没接触过nio开发的程序员无法快速理解netty底层究竟怎样调用nio,又对nio实现了什么封装。本篇文章尽可能(水平有限,只能做到"尽可能")在不依赖netty库的前提下模拟netty对nio的调用,尽量把每一行代码在netty中的相应位置阐释出来。
未完待续,部分注解后期补充。
接收服务端消息并打印
package com.example.demo;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
class DemoApplicationTests3 {
public static void main(String[] args) throws Throwable {
Set<SelectionKey> selectedKeys = new HashSet<>();
SelectorProvider provider = SelectorProvider.provider();
AbstractSelector unwrappedSelector = provider.openSelector();
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(SocketUtils.socketAddress("127.0.0.1", 9004), 200);
SelectionKey key = serverSocketChannel.register(unwrappedSelector, 0);
key.interestOps(16);
/**
* io.netty.channel.nio.NioEventLoop#openSelector()
*/
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeys);
while (true) {
int select = unwrappedSelector.select();
System.out.println("asdaaa:" + select);
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();
handleInput(next, unwrappedSelector);
}
System.out.println("over");
}
}
private static void handleInput(SelectionKey key, Selector selector) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else {
// 读到0字节,忽略
}
}
}
}
private static void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
客户端,负责链接服务端并发送消息
package com.example.demo;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
class DemoApplicationTests2 {
private static Thread thread = new Thread(() -> {
try {
while (true) {
Thread.sleep(1000);
doWrite(DemoApplicationTests2.socketChannel, "wjwljwjlj");
System.out.println("send msg");
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
private static SocketChannel socketChannel;
public static void main(String[] args) throws Throwable {
Set<SelectionKey> selectedKeys = new HashSet<>();
SelectorProvider provider = SelectorProvider.provider();
AbstractSelector unwrappedSelector = provider.openSelector();
SocketChannel socketChannel = provider.openSocketChannel();
socketChannel.configureBlocking(false);
socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
/**
* io.netty.channel.nio.NioEventLoop#openSelector()
*/
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeys);
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 9004))) {
socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
doWrite(socketChannel, "currentTime");
} else {
socketChannel.register(unwrappedSelector, SelectionKey.OP_CONNECT);
}
DemoApplicationTests2.socketChannel = socketChannel;
while (true) {
int select = unwrappedSelector.select();
System.out.println("asdaaa:" + select);
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
handleInput(next, unwrappedSelector);
}
System.out.println("over");
if (!DemoApplicationTests2.thread.isAlive()) {
DemoApplicationTests2.thread.start();
}
}
}
private static void handleInput(SelectionKey key, Selector selector) throws IOException {
if (key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc, "connected");
} else
System.exit(1);// 连接失败,进程退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
// this.stop = true;
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
private static void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}