Halo
发布于 2022-05-23 / 141 阅读 / 0 评论 / 0 点赞

nio

io 演进

  1. bio(blocking I/O), 阻塞io, 面向流的处理数据, 一直等待系统io的完成
  2. nio(non-blocking I/O), 非阻塞io, 面向块(缓冲区)处理数据, 轮询所有IO操作的状态改变, 底层是epoll 实现
  3. aio(Asynchronous I/O), 异步io, 面向块(缓冲区)处理数据, IO操作的状态改变后系统自动通知, 底层是epoll 实现

nio 构成

Selector(选择器), Channel(通道),Buffer(缓冲区)

  • 一个线程一个 Selector
  • 一个Selector 多个 channel
  • 一个 Selector 一个Buffer
  • Buffer 就是一个内存块 , 底层是有一个数组

Selector

Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。
这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求

Channel

  • 所有IO都是在 Channel(通道)流通的.
  • Channel 可以读也可以写
  • Channel 是基于缓冲区Buffer来读写

Buffer

  • 一个固定数据量的指定基本类型的数据容器。
  • 缓冲区还具有位置和界限.
  • 某个时刻能是是读或者写

example

服务端

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);  // 将java nio事件转成epoll事件,然后维护fd与事件对应关系,将fd放到变更数据中
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select(); // 阻塞
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();  // 读写模式转换
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

客户端

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

评论