Java中的BIO和NIO

前言

网易的一位面试官问了我一下java的BIO、NIO和AIO之间的区别,感觉自己答的不够好,所以来补一补。

BIO

Blocking-IO,直译过来就是阻塞式的IO,一般配合多线程(线程池)使用。

服务端的代码

下面是服务器端的示例demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {

public static void main(String[] args) throws IOException {
// Thread Pool
ExecutorService executor = Executors.newFixedThreadPool(100);

ServerSocket serverSocket = new ServerSocket(8088);
while (true) {
Socket socket = serverSocket.accept();

// let thread pool to deal with
executor.submit(new ConnectIOnHandler(socket));
}
}
}

class ConnectIOnHandler extends Thread {
private Socket socket;
public final String END_MESSAGE = "bye";

public ConnectIOnHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
PrintStream out = new PrintStream(socket.getOutputStream());
BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
String line = buf.readLine();
if (line == null || line.length() == 0 || END_MESSAGE.equals(line)) {
break;
} else {
// server show!
System.out.println("get from client = " + line);

// send to the client!
out.println("get :" + line);
}
}
out.close();
buf.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

这里客户端省略了(直接用nc代替就完事儿了)。对于服务端代码的讲解:

  1. 调用内核socket方法,创建一个服务端socket,该方法返回一个数值,该数值被称为文件描述符(fd),这个fd指向这个serversocket。为了叙述方便假设这个fd=5。

  2. 调用内核bind(5,8088)方法绑定socket(fd=5),以及端口8088;

  3. 调用内核listen(5,backlog)方法开启监听,监听socket(fd=5);

  4. 调用accept(5,......)阻塞,等待客户端建立连接;

  5. 当有客户端建立连接时,内核accept方法返回该客户端socket的文件描述符(比如fd=6) ,Java代码通过accept 返回Socket,其实本质上就是一个fd=6,但是被java包装成了Socket。

  6. 调用内核recvfrom(6,....)方法,获取fd=6的客户端发送的数据,该方法是阻塞的;

  7. 调用内核write(1,....)方法,将获取到的数据输出(写入fd=1 ,文件描述符1表示输出);

注意!从jdk1.5开始,底层不再是accept系统调用了,而是改为了poll,但是上文为了循序渐进,仍然使用accept。

不同情况下的异常

这里顺手再插一句,如果用客户端去访问一个压根不存在的主机,那么经过一段时间后就会返回给你这个异常:

image-20200815234523714

而如果你去访问一台存在的主机,而该主机并没有监听指定的端口,则是会马上抛出这个异常:

image-20200815234644217

系统调用执行流程

根据之前在C语言讲socket的时候讲到过的,我们创建一个服务器,必须要走的三个步骤:

  1. 创建socket,java中封装了ServerSocke,对应的系统调用是socket
  2. 进行端口的绑定,java中也进行了封装,而且也封装进了ServerSocket,对应的系统调用是bind
  3. 进行监听。对应的系统调用是listen

上面的这三步走,不论你什么样的服务器,都是必须要走的。接下来的是BIO需要执行的流程:

  1. 阻塞在socket.accept中,在JDK1.4版本中对应的系统调用是accept,会阻塞住;而现在最新版的jdk则是用的poll,然后当有新的连接到来的时候立马执行一下accept
  2. 一旦接受到新的请求,就开启一个线程去执行,对应的是clone
  3. 新的线程开起来之后,阻塞在recvfrom(即recv),这个不论是哪个版本的jdk,都会进行阻塞。

###为什么要使用多线程?

如果是单线程,当一个连接进来,我们顺着流程走,会发现一直在recv这个系统调用中阻塞,无法继续下去,自然也无法建立别的连接。

所以需要使用多线程来解决这个问题,而我上面的例子中用到了线程池,在并发量比较低的情况下,性能还是非常棒的。而且每个线程的逻辑都非常的简单,而且因为用到了线程池技术,本身就可以防止超大量的并发请求同时进来击垮系统。

那么这个的缺点在哪里呢?最主要是因为线程开销会比较大:

  • 我们一般会分配给一个线程栈起码512K,当有一千并发的时候,JVM就需要用到0.5G的内存。
  • 线程切换的成本高。表现在CPU的sy偏高。

可以看到,完全就是因为多线程的存在导致了这部分的开销,从而导致其性能不佳。

如果你的需求只是满足上千级别的并发量,那上面这个BIO是完全可行的,高效且简单。但是现在要求的是超高并发量,就催生出了一种新的IO模型——NIO

NIO

首先,这里需要搞清楚,有两个地方有NIO,一个是在操作系统中,意味着Non-Blocking IO;还有一个是java的nio包,java中的nio的意思是New IO(当然New IO底层实现用到了Non-Blocking IO这项技术,但是更多的依赖了IO多路复用器)。在本文中,为了分辨,下面用大写的NIO来代表操作系统的,而小写的nio代表java中用到的。

我们上面也提到了,就是因为多线程的存在,导致了BIO的性能低下;而多线程的出现是为了解决acceptrecv阻塞的,也就是根本原因在于阻塞,那我们让accept和recv不阻塞不就完了吗。那这个阻塞不阻塞谁唆了算呢?操作系统。也就是操作系统发展了,我们的程序才能实现非阻塞。

通过man 2 socket,可以看到里面有这么一段:

1
2
3
4
5
6
int socket(int domain, int type, int protocol);    // 这段是说明,注意中间那个type
......
Since Linux 2.6.27, the type argument serves a second purpose: in addition to specifying a socket type,it may include the bitwise OR of any of the following values, to modify the behavior of socket():

SOCK_NONBLOCK Set the O_NONBLOCK file status flag on the new open file description. Using this flag
saves extra calls to fcntl(2) to achieve the same result.

翻译一下就是从Linux内核的2.6.27开始,中间的参数可以是SOCK_NONBLOCK,或者是通过fcntl来改变socket是否是阻塞的,也就是可以让socket变成非阻塞的。具体落实到系统调用是这样子的:fcntl(15, F_SETFL, O_RDWR|O_NONBLOCK) = 0 其中的15是server socket的fd。

那么NIO的过程如下:

  1. 创建socket,这个socket被特地声明了是NONBLOCK的
  2. 进行bind
  3. 进行listen
  4. 执行accept,函数不论如何都会立马返回,如果返回值是-1,则说明此时没有新的连接进来。如果有新连接,则返回对应的fd。
  5. 对每个已经连接的socket进行操作。

具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws IOException {
LinkedList<SocketChannel> clients = new LinkedList<>();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(9898));
channel.configureBlocking(false);

while (true) {
SocketChannel client = channel.accept();
if (client == null) {
System.out.println("null");
} else {
client.configureBlocking(false);
System.out.println("port = " + client.socket().getPort());
clients.add(client);
}

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);

for (SocketChannel socketChannel : clients) {
int result = socketChannel.read(byteBuffer);
if (result > 0) {
// only result >0 means hava data
byteBuffer.flip();
byte[] data = new byte[byteBuffer.limit()];
byteBuffer.get(data);
System.out.println("data = " + new String(data));
byteBuffer.clear();
}
}
}
}

上面这段程序不论是第8行的accept也好,还是之后的循环中第20行的read读取也好,都不会有阻塞,因为有代码设置了它们是非阻塞的(底层就是通过socket的type参数)。所以会疯狂执行循环,值得注意的是如果没有连接,那可能循环会比较快,所以考虑加一个Thread.sleep()。

优势与劣势

优势:

  • 上面没有任何关于开启线程的代码,也就是上面的代码完全是一个线程完成的。它只用了一个线程,完成了之前需要成千上万个线程才能完成的任务。在并发量很大的时候,就可以有效避免C10K(client 10000)的问题的出现。

劣势:

  • 假设有一万个连接,那么在一次while中,我的for循环一共有一万次,然后每一次我都使用了read,也就是底层需要去使用read这个系统调用来进行操作。
  • 实际追踪一下就可以知道,只要clients中有N个连接(不管这些socket是不是有数据),底层就需要调用N次的read系统调用,这显然是非常耗时间的。但是其实实际中只有几个连接需要真正去遍历的,绝大部分我们都浪费了。

IO多路复用器

NIO的缺点就是你每次都需要对每一个连接都使用read系统调用,这是瓶颈所在。所以有了IO多路复用器,多路复用的意思,就是我所有的连接,都复用到一个系统调用上。

select和poll

多路复用器就是——select poll和epoll。注意,它们本质上还是同步的,同步的意思就是多路复用器只会告诉你,哪些是fd可以读了,但是真正的读取,还是程序自己去读取的。

所以select和poll,只需要用户态调用一次系统调用,然后让内核帮你去找到所有可读可写的fds。这里的遍历是发生在内核中的,没有用户态到内核态的变化,所以效率很高。

但是它也是有缺点的,因为你每次都需要准备好所有的fds,也就是每次都需要把这些fds传递到内核空间中,那解决办法不是呼之欲出了么:把fds放到内核中就行了。第二个缺点是每次你调用select和poll,都需要遍历全部的fd,那有没有办法不要全部遍历呢。为了解决这些缺点,epoll就闪亮登场了。

epoll

epoll就是为了解决上面的poll的缺点,所以首先,epoll会在内核开辟空间(具体来说就是一颗红黑树,也叫做等待队列),然后只要fd还活着,就会被放到这个等待队列(树)里。然后再开辟一块返回区(也叫做就绪队列),一旦fd有数据了,那么就复制一份扔到返回区里,程序只需要关注返回区就可以了。

这么做的好处是,机器可以充分利用多CPU,一颗CPU把fd搬运到返回区,另外一颗CPU把返回区的fd返还给用户,两者互不干扰,并行执行。

针对需要遍历所有的fd问题,使用的是中断的方法。当数据从IO进入到内存,操作系统产生中断,通过监听这个事件来达成监听数据,避免遍历所有的fd。

总体执行流程是:

  1. 创建socket,假设socket的文件描述符是3。
  2. 进行bind
  3. 进行listen

上面这三步是谁都省略不了的。

  1. 调用epoll_create,在内核中开辟一个空间,并且返回对应的epoll描述符,只会执行一次。假设内核空间的epoll文件描述符是7。
  2. epoll_ctl(7,ADD,3,accept) ,这个调用把之前创建的socket放到了内核空间的等待队列中,同时向内核的中断程序注册一个回调函数,监听fd=3的accept事件,一旦事件发生,就只需要把这个fd复制到就绪队列中去即可。当之后TCP连接建立,会产生中断,然后找到对应的事件,就会把fd放到就绪队列中去。
  3. 调用epoll_wait,获取就绪队列的信息,如果就绪队列中什么都没有,那就需要等待了(当然你也可以设置不等待)。

不多废话,下面上一段代码,然后根据代码来查看底层是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class Server {
private ServerSocketChannel server;
private Selector selector;
int port = 9797;

private void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));

// selector 底层可能是select,可能是poll,可能是epoll
// 如果是select和poll,就是开辟了空间而已
// 如果是epoll,相当于执行了epoll_create
selector = Selector.open();

// 如果是select和poll,则相当于是在jvm中开辟了空间,并且把server对应的fd放了进去
// 如果是epoll,则相当于epoll_ctl,即把server对应的fd放到了内核空间
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}

}

private void start() throws IOException {
initServer();
System.out.println("服务器成功启动");
while (true) {
Set<SelectionKey> keys = selector.keys();
System.out.println("keys size = " + keys.size());


// 这里的selector.select()方法,如果是select和poll,就是把刚刚register的那些文件描述符传入进去
// 如果是epoll,则会调用epoll_wait
while (selector.select(500) > 0) {
// 进入循环的条件,是程序去询问内核有没有需要处理的fd,内核告诉你有
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
}else{
// 其它暂时不关心
}
}
}
}
}

private void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel client = channel.accept();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(4096);

// 同理如果是poll那么就在jvm里面开辟空间并且把client的fd放进去,如果是epoll则放入到内核空间中
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("---------------------");
System.out.println(client.getRemoteAddress().toString());
System.out.println("---------------------");
} catch (IOException e) {
e.printStackTrace();
}

}

private void readHandler(SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(4096);

// 同理如果是poll那么就在jvm里面开辟空间并且把client的fd放进去,如果是epoll则放入到内核空间中
channel.read(buffer);
buffer.flip();

String receiveData = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.println("receiveData:" + receiveData);

// 把读到的数据绑定到key中
key.attach("server message echo:" + receiveData);

} catch (IOException e) {
e.printStackTrace();
}

}

public static void main(String[] args) throws IOException {
new Server().start();
}
}

首先运行这段代码,然后观察服务器究竟是怎么运行的,我把主要的调用给抄下来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 14   // 创建socket,此时的socket fd = 14
bind(14, {sa_family=AF_INET, sin_port=htons(9797), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 // 进行绑定
listen(14, 50) = 0 // 监听
epoll_create(256) = 17 // 创建了epoll 返回了一个17作为epoll的fd
epoll_ctl(17, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=15384323235067723790}}) = 0 // 把14放入到内核中

// 接下来根据我的代码,epoll_wait是不阻塞的,500毫秒就返回,但是epoll_wait是可以阻塞的

epoll_wait(17, [], 8192, 500) = 0 // 查看17的对应的fds
// 不停重复中.....

// 此时突然来了一个连接
accept(14, {sa_family=AF_INET, sin_port=htons(55175), sin_addr=inet_addr("183.144.74.56")}, [16]) = 18 // 来了一个18的连接
epoll_ctl(17, EPOLL_CTL_ADD, 18, {EPOLLIN, {u32=18, u64=13809169882207485970}}) = 0 // 把18放到内核中

// 接下来的所有都是这样的重复而已

不难看出epoll的逻辑是很简单的,首先就是通过epoll_create创建一个epoll,对应的就是去Linux内核中开辟空间;然后通过epoll_ctl把对应的需要”监听”的fd移动到内核中的等待队列中(只会移动一次),然后就调用epoll_wait,这个可以是阻塞的(因为只要有可以读取的,或者是有新连接就不会阻塞了),也可以是不阻塞的。然后系统会返回给你对应的fd的集合,你去里面进行处理就好了。

优势与劣势

优势很明显,单线程,且系统会返回给你对应的fd,你只需要根据自己的需要进行读写即可,而且java封装也很好,屏蔽了底层到底是epoll还是poll。

缺点就是如果epoll_wait返回之后,有大量的操作,可能会使两个epoll_wait的间隔时间越来越久,效率会不太好。

epoll在成熟软件中的应用

Nginx

打开Nginx之后,发现一共启动了9个线程,然后最主要的那个是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
epoll_create(25600)                     = 8
eventfd2(0, 0) = 9
epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLET, {u32=10449952, u64=10449952}}) = 0
socketpair(PF_LOCAL, SOCK_STREAM, 0, [10, 11]) = 0
epoll_ctl(8, EPOLL_CTL_ADD, 10, {EPOLLIN|EPOLLRDHUP|EPOLLET, {u32=10449952, u64=10449952}}) = 0
close(11) = 0
epoll_wait(8, [{EPOLLIN|EPOLLHUP|EPOLLRDHUP, {u32=10449952, u64=10449952}}], 1, 5000) = 1
close(10) = 0
mmap(NULL, 12292096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f6de4dfc000
mmap(NULL, 4919296, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f6de494b000
mmap(NULL, 4919296, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f6de449a000
epoll_ctl(8, EPOLL_CTL_ADD, 6, {EPOLLIN|EPOLLRDHUP, {u32=3839868944, u64=140109968031760}}) = 0
close(3) = 0
epoll_ctl(8, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLRDHUP, {u32=3839869184, u64=140109968032000}}) = 0
epoll_wait(8, // ←此处不缺内容

可以看到用的就是epoll这一套,但是是阻塞的epoll_wait

redis

Redis稍微不同,因为它虽然是单线程,但是还是需要去处理诸如LRU之类的东西,所以它不可能像nginx一样阻塞,所以它的调用是这样的:

1
2
3
4
5
// 下面那段疯狂重复
epoll_wait(3, [], 10128, 100) = 0
open("/proc/24872/stat", O_RDONLY) = 5
read(5, "24872 (redis-server) R 24870 248"..., 4096) = 329
close(5) = 0

redis 3.0.6


连接Redis的系统调用:

1
2
3
4
5
6
epoll_wait(3, [{EPOLLIN, {u32=4, u64=4}}], 10128, 100) = 1
accept(4, {sa_family=AF_INET, sin_port=htons(34170), sin_addr=inet_addr("127.0.0.1")}, [16]) = 5
fcntl(5, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(5, F_SETFL, O_RDWR|O_NONBLOCK) = 0
setsockopt(5, SOL_TCP, TCP_NODELAY, [1], 4) = 0
epoll_ctl(3, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5}}) = 0

可以看到epoll_wait返回了1,然后Redis就调用了accept去获取了连接的客户端,此时fd是5(记住这个5,接下来全是它哦)

然后把fd=5搬动到了内核空间中。


然后我输入了auth,看看redis是怎么处理的:

1
2
3
4
5
6
epoll_wait(3, [{EPOLLIN, {u32=5, u64=5}}], 10128, 100) = 1
read(5, "*1\r\n$4\r\nauth\r\n", 16384) = 14
epoll_ctl(3, EPOLL_CTL_MOD, 5, {EPOLLIN|EPOLLOUT, {u32=5, u64=5}}) = 0
epoll_wait(3, [{EPOLLOUT, {u32=5, u64=5}}], 10128, 96) = 1
write(5, "-ERR wrong number of arguments f"..., 51) = 51
epoll_ctl(3, EPOLL_CTL_MOD, 5, {EPOLLIN, {u32=5, u64=5}}) = 0

首先照理是wait,然后发现返回了1,那么就美滋滋从fd=5的文件描述符中读取数据,读取到了我发送的auth(我记得前面后面那些东西是Redis帮你加的),然后它在下一个wait中,发送给了我错误信息(error) ERR wrong number of arguments for 'auth' command


然后是我要退出了,那么在命令行里输入exit,看看Redis的做法:

1
2
3
4
epoll_wait(3, [{EPOLLIN, {u32=5, u64=5}}], 10128, 100) = 1
read(5, "", 16384) = 0
epoll_ctl(3, EPOLL_CTL_DEL, 5, 0x7ffd37552ad0) = 0
close(5)

很简单,就是直接通过ctl把5这个文件描述符直接干掉了。

redis 6.0.6

emmm 好像是启动的时候多了两个线程?然后好像并没有变化…..

image-20200817014236682

总结

从最开始的BIO(如果要复现,请使用JDK1.4)到NIO,再到多路复用,其实这一步一步的进步,都是内核的进步,java只不过在上面封装了一层罢了。

复习完这部分,更让人深刻意识到操作系统和计算机组成原理的重要啊,毕竟java只是在linux内核进步的基础上,做了封装。