介绍
对Socket-客户端和服务端通信改造
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
public class ServerSocket {
public static void main(String[] args) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(Boolean.FALSE);
server.bind(new InetSocketAddress(8888));
Selector selector = Selector.open();
SelectionKey selectionKey = server.register(selector, SelectionKey.OP_ACCEPT, null);
Worker worker = new Worker("worker-0");
while(true){
selector.select();
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()){
SelectionKey key = selectionKeyIterator.next();
selectionKeyIterator.remove();
if(key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel client = serverSocketChannel.accept();
client.configureBlocking(Boolean.FALSE);
System.out.println("注册前");
worker.register(client);
System.out.println("注册后");
}
}
}
}
static class Worker implements Runnable{
private Selector selector;
private String name;
private Thread thread;
private Boolean initd = Boolean.FALSE;
private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel socketChannel) throws IOException {
runnables.add(() -> {
SelectionKey selectionKey = null;
try {
selectionKey = socketChannel.register( this.selector, 0, ByteBuffer.allocate(16));
selectionKey.interestOps(SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
if(!initd){
this.selector = Selector.open();
this.thread = new Thread(this);
this.thread.setName(this.name);
this.thread.start();
this.initd = Boolean.TRUE;
}
}
@Override
public void run() {
while (true){
try {
Iterator<Runnable> iterator = runnables.iterator();
while(iterator.hasNext()){
System.out.println("注册");
Runnable runnable = iterator.next();
iterator.remove();
runnable.run();
}
this.selector.select();
Iterator<SelectionKey> selectionKeyIterator = this.selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()){
SelectionKey selectionKey = selectionKeyIterator.next();
selectionKeyIterator.remove();
if(selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
socketChannel.read(buffer);
buffer.flip();
if(buffer.limit() == buffer.capacity()){
ByteBuffer bufferNew = ByteBuffer.allocate( buffer.capacity() * 2);
bufferNew.put(buffer);
selectionKey.attach(bufferNew);
}else{
System.out.println("work-读数据");
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
selectionKey.attach(ByteBuffer.allocate(16));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class ClientSocket {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8888));
System.out.println(socketChannel.isConnected());
Scanner scanner = new Scanner(System.in);
while(true){
String content = scanner.next();
if("read".equals(content)){
socketChannel.write(StandardCharsets.UTF_8.encode(content));
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
}
}
}