Socket-Worker模式

发布时间:2024年01月16日

介绍

Socket-客户端和服务端通信改造

服务端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Non-blocking NIO server split into two roles: the main thread only accepts
 * connections on port 8888, and a single {@link Worker} thread, with its own
 * selector, handles all read events of the accepted connections.
 */
public class ServerSocket {
    /**
     * Opens the listening channel and loops forever accepting clients, handing
     * each accepted channel to the worker.
     *
     * @param args unused
     * @throws IOException if the listening channel or its selector fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources so the channel and selector are released if the loop dies on an exception.
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(8888));
            server.register(selector, SelectionKey.OP_ACCEPT, null);

            Worker worker = new Worker("worker-0");
            while (true) {
                selector.select();
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey key = selectionKeyIterator.next();
                    // Selected keys are not removed automatically; remove to avoid re-processing.
                    selectionKeyIterator.remove();
                    if (!key.isValid() || !key.isAcceptable()) {
                        continue;
                    }
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel client = serverSocketChannel.accept();
                    if (client == null) {
                        // Non-blocking accept() returns null when no connection is pending.
                        continue;
                    }
                    client.configureBlocking(false);
                    System.out.println("注册前");
                    worker.register(client);
                    System.out.println("注册后");
                }
            }
        }
    }

    /**
     * Owns one selector and one thread. Channels are handed over through a
     * queue of registration tasks so that the actual
     * {@code SocketChannel.register} call runs on the worker thread itself.
     *
     * <p>{@link #register(SocketChannel)} is only invoked from the accepting
     * thread in this file; the lazy initialisation below is not guarded against
     * concurrent callers.
     */
    static class Worker implements Runnable {
        private Selector selector;
        private final String name;
        private Thread thread;
        private boolean initd = false;
        private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<>();

        public Worker(String name) {
            this.name = name;
        }

        /**
         * Queues {@code socketChannel} for OP_READ registration on this
         * worker's selector, starting the worker thread on first use.
         *
         * @param socketChannel an accepted channel already in non-blocking mode
         * @throws IOException if the worker's selector cannot be opened
         */
        public void register(SocketChannel socketChannel) throws IOException {
            if (!initd) {
                this.selector = Selector.open();
                this.thread = new Thread(this);
                this.thread.setName(this.name);
                this.thread.start();
                this.initd = true;
            }

            runnables.add(() -> {
                try {
                    socketChannel.register(this.selector, SelectionKey.OP_READ, ByteBuffer.allocate(16));
                } catch (ClosedChannelException e) {
                    // The client went away between accept and registration; nothing to register.
                    e.printStackTrace();
                }
            });

            // Without this the worker may stay parked in select() and never drain the
            // queue until some already-registered channel becomes readable. If the
            // worker is not selecting yet, wakeup() makes its next select() return at once.
            this.selector.wakeup();
        }

        /**
         * Worker loop: run pending registration tasks, wait for events, then
         * service every readable key.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    Runnable task;
                    while ((task = runnables.poll()) != null) {
                        System.out.println("注册");
                        task.run();
                    }
                    this.selector.select();
                    Iterator<SelectionKey> selectionKeyIterator = this.selector.selectedKeys().iterator();
                    while (selectionKeyIterator.hasNext()) {
                        SelectionKey selectionKey = selectionKeyIterator.next();
                        selectionKeyIterator.remove();
                        // isReadable() throws on a cancelled key, so check validity first.
                        if (selectionKey.isValid() && selectionKey.isReadable()) {
                            handleRead(selectionKey);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Reads from the key's channel into its attached buffer. A completely
         * full buffer is replaced by one of twice the capacity (keeping the
         * bytes) and nothing is printed yet; otherwise the accumulated bytes
         * are decoded as UTF-8, printed, and a fresh 16-byte buffer is attached.
         * End-of-stream or a read error closes the connection.
         */
        private void handleRead(SelectionKey selectionKey) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();

            int bytesRead;
            try {
                bytesRead = socketChannel.read(buffer);
            } catch (IOException e) {
                // e.g. connection reset: drop only this client, keep serving the others.
                e.printStackTrace();
                close(selectionKey, socketChannel);
                return;
            }

            buffer.flip();
            if (bytesRead == -1) {
                // Peer closed. Flush whatever was still buffered, then release the key;
                // otherwise the channel stays readable forever and the loop spins.
                if (buffer.hasRemaining()) {
                    System.out.println("work-读数据");
                    System.out.println(StandardCharsets.UTF_8.decode(buffer));
                }
                close(selectionKey, socketChannel);
                return;
            }
            if (!buffer.hasRemaining()) {
                // Nothing was read and nothing was pending: leave the buffer ready for the next read.
                buffer.clear();
                return;
            }

            if (buffer.limit() == buffer.capacity()) {
                // Buffer is full, more data may follow: grow and keep accumulating.
                ByteBuffer bufferNew = ByteBuffer.allocate(buffer.capacity() * 2);
                bufferNew.put(buffer);
                selectionKey.attach(bufferNew);
            } else {
                System.out.println("work-读数据");
                System.out.println(StandardCharsets.UTF_8.decode(buffer));
                selectionKey.attach(ByteBuffer.allocate(16));
            }
        }

        /** Cancels the key and closes its channel, reporting (not propagating) close failures. */
        private static void close(SelectionKey selectionKey, SocketChannel socketChannel) {
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * Console client: connects to localhost:8888 and, each time the token
 * {@code read} is typed on standard input, sends it to the server and prints
 * whatever the server sends back.
 */
public class ClientSocket {
    /**
     * Runs the interactive loop until standard input is exhausted or the
     * server closes the connection.
     *
     * @param args unused
     * @throws IOException if connecting, writing or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources: the socket and the scanner are released on every exit path.
        try (SocketChannel socketChannel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
            System.out.println(socketChannel.isConnected());

            // hasNext() ends the loop cleanly at end of input instead of letting
            // next() throw NoSuchElementException.
            while (scanner.hasNext()) {
                String content = scanner.next();
                if (!"read".equals(content)) {
                    continue;
                }

                ByteBuffer outgoing = StandardCharsets.UTF_8.encode(content);
                while (outgoing.hasRemaining()) {
                    socketChannel.write(outgoing);
                }

                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // The channel is in its default blocking mode, so this waits until the
                // peer sends at least one byte or closes the connection.
                int bytesRead = socketChannel.read(buffer);
                if (bytesRead == -1) {
                    // Server closed the connection; further writes would fail.
                    break;
                }
                buffer.flip();
                System.out.println(StandardCharsets.UTF_8.decode(buffer));
            }
        }
    }
}

文章来源:https://blog.csdn.net/sunboylife/article/details/135627747
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。