io 演进
- bio(blocking I/O), 阻塞io, 面向流的处理数据, 一直等待系统io的完成
- nio(non-blocking I/O), 非阻塞io, 面向块(缓冲区)处理数据, 轮询所有IO操作的状态改变, 底层是epoll 实现
- aio(Asynchronous I/O), 异步io, 面向块(缓冲区)处理数据, IO操作的状态改变后系统自动通知, 底层是epoll 实现
nio 构成
Selector(选择器), Channel(通道),Buffer(缓冲区)
- 一个线程一个 Selector
- 一个Selector 多个 channel
- 一个 Selector 一个Buffer
- Buffer 就是一个内存块 , 底层是有一个数组
Selector
Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。
这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求
Channel
- 所有IO都是在 Channel(通道)流通的.
- Channel 可以读也可以写
- Channel 是基于缓冲区Buffer来读写
Buffer
- 一个固定数据量的指定基本类型的数据容器。
- 缓冲区还具有位置和界限.
- 某个时刻能是是读或者写
example
服务端
public class EchoServer {
private static final String POISON_PILL = "POISON_PILL";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 将java nio事件转成epoll事件,然后维护fd与事件对应关系,将fd放到变更数据中
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select(); // 阻塞
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip(); // 读写模式转换
client.write(buffer);
buffer.clear();
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
public static Process start() throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return builder.start();
}
}
客户端
public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;
public static EchoClient start() {
if (instance == null)
instance = new EchoClient();
return instance;
}
public static void stop() throws IOException {
client.close();
buffer = null;
}
private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
}