根据黑马程序员Netty教程学习所做笔记

Channel由java.nio.channels 包定义的。Channel 表示IO 源与目标打开的连接

Channel 不同于传统的“流”。Channel 本身不能直接访问数据,Channel 只能与Buffer 进行交互

Channel就像是铁轨,Buffer就像是车厢,车厢中保存着数据

Channel继承结构图

FileChannel

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读

  • 通过 FileOutputStream 获取的 channel 只能写

  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

    r w rw

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

1
int readBytes = channel.read(buffer);

写入

写入的正确姿势如下, SocketChannel

1
2
3
4
5
6
7
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式

while(buffer.hasRemaining()) {
channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

位置

获取当前位置

1
long pos = channel.position();

设置当前位置

1
2
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

大小

使用 size 方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘

可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

数据传输

transferTo()

我们使用Filechannel提供的**transferTo()**方法进行文件的拷贝。

  • **transferTo()**方法底层采用了操作系统的零拷贝

零拷贝(Zero Copy)指的是避免在数据传输过程中多次复制数据而导致的性能损耗的技术。

在传统 I/O 模型中,数据从磁盘读取到内核缓冲区后,又被复制到用户空间的应用程序缓冲区,然后再被复制到网络套接字的缓冲区,最终才能发送出去。这种复制操作会消耗很多 CPU 资源和时间,降低系统的吞吐量。而使用零拷贝技术可以避免这些复制操作,直接将数据从内核缓冲区传输到网络套接字缓冲区,减少了数据复制的次数,提高了数据传输的效率。

  • 传统的拷贝磁盘=>内核缓冲=>用户应用缓冲=>网络socket缓冲=>发送
  • 零拷贝内核缓冲=>网络socket缓冲

Java 的 NIO(New IO)库中的 FileChannel 类就是利用了零拷贝技术来提高文件读写的效率。FileChannel 提供了两个方法:transferTo 和 transferFrom

它们可以将数据从 FileChannel 直接传输到其他 Channel 或者从其他 Channel 直接传输到 FileChannel,避免了中间的数据复制过程。此外,FileChannel 还支持内存映射文件(MappedByteBuffer),可以将文件直接映射到内存中,让应用程序对文件进行读写操作时可以像操作普通的内存一样高效地进行,也是一种典型的零拷贝技术应用。

使用了Gather Copy零拷贝

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
static void testTransform2() {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("./result/to.txt").getChannel()
) {
// 效率高, 带有transferTo的方法, 底层都会采用操作系统的零拷贝进行优化
from.transferTo(0,from.size(),to);
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}

需要注意的是,transferTo(long position, long count, WritableByteChannel target )方法虽然非常好用,并且利用了零拷贝技术,效率也非常高,但是对于一次传输的内容还是有大小限制的

在 Unix/Linux 系统上,FileChanneltransferTo()transferFrom() 方法使用了 sendfile() 系统调用,该系统调用可以直接从文件描述符中将数据传输到网络套接字上,而不需要将数据复制到用户空间中。因此,在这种情况下,FileChannel 可以处理非常大的文件。

在 Windows 系统上,FileChanneltransferTo()transferFrom() 方法使用了 TransmitFile()WSASend() 等系统调用,这些系统调用也能够直接将数据从文件传输到套接字上,避免了数据复制的开销。但是需要注意的是,在 Windows 系统上,每次传输的数据量不能超过 2GB

我们通过查看transferTo()方法的源码也不难发现,虽然count参数是一个long 类型的变量,但是在实际执行过程中源码中有这样的一行代码

int icount = (int)Math.min(count, Integer.MAX_VALUE);

也就是说实际上count的传输的过程中,最大值就是Integer.MAX_VALUE=0x7fffffff=231-1 = 2147483647B =>2GB

ByteBuffer每次传输的单位是Byte,因此计算的时候采用的是Byte而不是Bit ,故理论上transferTo()方法的最大传输大小为2GB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public long transferTo(long position, long count,
WritableByteChannel target)
throws IOException
{
ensureOpen();
if (!target.isOpen())
throw new ClosedChannelException();
if (!readable)
throw new NonReadableChannelException();
if (target instanceof FileChannelImpl &&
!((FileChannelImpl)target).writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
long sz = size();
if (position > sz)
return 0;
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
icount = (int)(sz - position);

long n;

// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;

// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;

// Slow path for untrusted targets
return transferToArbitraryChannel(position, icount, target);
}

接下来我们通过实际情况来进行最大的传输测试

首先编写代码在bigData.txt中写入2GB的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void writeBigFile() {
// 1k
ByteBuffer allocate = ByteBuffer.allocate(1024);
byte[] bytes = new byte[1024];
Arrays.fill(bytes, (byte) 97);
allocate.put(bytes);
allocate.flip();
try (
FileChannel from = new FileOutputStream("./data/bigData.txt").getChannel();
) {
//1k * 2048 * 1024 =>2GB
for (int j = 0; j < 1024; j++) {
// 一次2MB
for (int i = 0; i < 2048; i++) {
from.write(allocate);
allocate.position(0);
}
}

} catch (Exception e) {
log.error(e.getMessage());
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
static void testTransferSize() {
try (
FileChannel from = new FileInputStream("./data/bigData.txt").getChannel();
FileChannel to = new FileOutputStream("./data/bigTo.txt").getChannel()
) {
// 效率高, 带有transferTo的方法, 底层都会采用操作系统的零拷贝进行优化
long l = from.transferTo(0, from.size(), to);
System.out.println(l);
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}

打印结果 :

2147483647

不出所料,本次传输了2147483647个字节的数据。

接着我们调整bigData.txt的大小为4GB

执行传输方法,打印结果为

2147483647,查看资源管理器,为2G

需要注意的是,这里的 2GB 是指单次传输的数据量上限,并不代表 FileChannel 能够处理的最大文件大小。

FileChannel 可以处理非常大的文件,其最大文件大小取决于所使用的文件系统和操作系统的限制。

在现代操作系统和文件系统中,通常可以支持数百 GB 或甚至更大的文件。

那么既然单词传输会有大小限制,我们只需要把文件分片多次传输即可

已知public long transferTo(long position, long count, WritableByteChannel target) 方法每次最大传输2GB数据,同时返回每次传输的字节数,我们通过返回值即可完成大文件的分片传输

对之前的代码稍作修改即可

1
2
3
4
5
long size = from.size();
// left 表示剩余多少字节
for (long left = size; left > 0; ) {
left -= from.transferTo(size-left,left, to);
}

运行结果如下

当然也不只是可以传输文本类型的文件,对于图像文件也可以直接通过transfer()方法传输

1
2
3
4
5
6
7
8
9
10
11
12
13
static void transferImg() {
try (
FileChannel from = new FileInputStream("./img/img1.jpg").getChannel();
FileChannel to = new FileOutputStream("./img/img1_img.jpg").getChannel()
) {
// 效率高, 带有transferTo的方法, 底层都会采用操作系统的零拷贝进行优化
long l = from.transferTo(0, from.size(), to);
System.out.println(l);
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}

Path

jdk7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
1
2
3
4
5
6
7
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt

Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了 d:\1.txt

Path source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了 d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
  • . 代表了当前路径
  • .. 代表了上一级路径

例如目录结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
D:.
├─.idea
├─data
├─img
└─src
├─main
│ ├─java
│ │ └─com
│ │ └─dhx
│ │ ├─c1
│ │ ├─c2
│ │ └─util
│ └─resources
└─test
└─java

代码

1
2
3
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径

会输出

1
2
d:\data\projects\a\..\b
d:\data\projects\b

Files

检查文件是否存在

1
2
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

创建一级目录

1
2
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录用

1
2
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷贝文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,会抛异常 FileAlreadyExistsException

如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制

1
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移动文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性

删除文件

1
2
3
Path target = Paths.get("helloword/target.txt");

Files.delete(target);
  • 如果文件不存在,会抛异常 NoSuchFileException

删除目录

1
2
3
Path target = Paths.get("helloword/d1");

Files.delete(target);
  • 如果目录还有内容,会抛异常 DirectoryNotEmptyException

遍历目录文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void fileCount() throws IOException {
AtomicLong fileCnt = new AtomicLong(0);
AtomicLong dirCnt = new AtomicLong(0);
Files.walkFileTree(Paths.get("D:\\j2ee_project"), new SimpleFileVisitor<>() {
// 遍历之前
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
log.info("进入目录: {}",dir);
dirCnt.incrementAndGet();
return super.preVisitDirectory(dir,attrs);
}

// 遍历
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
log.info(String.valueOf(file));
fileCnt.incrementAndGet();
return super.visitFile(file,attrs);
}
});
log.info("文件数量: {}",fileCnt);
log.info("目录数量: {}",dirCnt);
}

统计 jar 的数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void jarCount() throws IOException {
AtomicLong fileCnt = new AtomicLong(0);
AtomicLong jarCount = new AtomicLong(0);
Files.walkFileTree(Paths.get("D:\\j2ee_project"), new SimpleFileVisitor<>() {
// 遍历之前
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
log.info("进入目录: {}",dir);
return super.preVisitDirectory(dir,attrs);
}
// 遍历
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
log.info(String.valueOf(file));
fileCnt.incrementAndGet();
if(file.toString().endsWith(".jar")){
jarCount.incrementAndGet();
}
return super.visitFile(file,attrs);
}
});
log.info("文件数量: {}",fileCnt);
log.info("jar数量: {}",jarCount);
}

删除多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});

删除是危险操作,确保要递归删除的文件夹没有重要内容

拷贝多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";

Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
// 是目录
if (Files.isDirectory(path)) {
Files.createDirectory(Paths.get(targetName));
}
// 是普通文件
else if (Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);

网络编程

非阻塞 && 阻塞

阻塞

服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(16); //16B
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8888));

List<SocketChannel> channels = new ArrayList<>();
while(true){
// 3. accept 与客户端建立连接 , socketChannel用来与客户端进行通信
log.debug("connecting....");
SocketChannel sc = ssc.accept(); // 默认是一个阻塞方法,会让线程暂停
log.debug("connected....");
channels.add(sc);
for (SocketChannel channel : channels) {
log.debug("before read....");
channel.read(buffer); // 也会让线程阻塞
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read....");
}
}
}

客户端

1
2
3
4
5
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8888));
System.out.println("waiting......");
}

对于上面的代码,我们需要注意的是accept()方法以及channel.read()方法都会导致当前的线程阻塞,

阻塞模式下,相关方法都会导致线程暂停执行

  • ServerSocketChannel.accept() 会在没有连接建立时让线程暂停
  • SocketChannel.read() 会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置

单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

但多线程下,又会出现新的问题,如果我们为每个用户的情况都创建一个线程,那么由于计算机执行的特性,多个线程必定会引发大量的线程上下文切换的问题,导致效率执行较低。

如果我们使用线程池技术,比如ThreadPollExecutor,虽然可以在一定程度上减少线程的上下文切换以及重复的创建与销毁问题,但是当请求数量过多的时候或者是线程池任务较少的时候,还是会出现问题。

接下来通过IDEA在DEBUG模式下进行评估表达式来验证ServerSocketChannel.accept()的阻塞

首先运行server.java , 然后在debug模式下启动client.java

我们在client程序执行完socket连接之后,

通过评估表达式插入代码并且执行

我们插入代码给server发送信息

查看server的控制台输出情况

可以看到原本**阻塞的serve**r在接收到客户端发送消息之后执行了之后的代码并且进入到了下一轮的循环。

非阻塞

非阻塞模式下,相关方法都会不会让线程暂停!!

  • ServerSocketChannel.accept() 在没有连接建立时,会返回 null,继续运行
  • SocketChannel.read() 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept()
  • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去

但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,会浪费CPU资源

数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务器端代码添加 ssc.configureBlocking(false);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws IOException, InterruptedException {
ByteBuffer buffer = ByteBuffer.allocate(16); //16B

// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8888));

List<SocketChannel> channels = new ArrayList<>();
while(true){
// 3. accept 与客户端建立连接 , socketChannel用来与客户端进行通信
SocketChannel sc = ssc.accept(); // 默认是一个阻塞方法,会让线程暂停
if(sc!=null){
log.debug("connected....{}",sc);
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
int read = channel.read(buffer);// 也会让线程阻塞
if(read>0){
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read....");
}
}
}
}

关于非阻塞CPU占用

开启server.java并且发送数据

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

多路复用(Multiplexing,又称“多任务”[来源请求])是一个通信计算机网络领域的专业术语,在没有歧义的情况下,“多路复用”也可被称为“复用”。多路复用通常表示在一个信道传输多路信号数据流的过程和技术。因为多路复用能够将多个低速信道集成到一个高速信道进行传输,从而有效地利用了高速信道。通过使用多路复用,通信运营商可以避免维护多条线路,从而有效地节约运营成本[1]

–维基百科

如果学习过计算机网络的相关内容,那么应该会听说过TCP多路复用

  • TCP是工作在传输层的协议,TCP在源主机当中从不同的套接字中收集数据块,并为每一个数据块封装上首部信息(用于分解)从而生成报文段,然后将此报文段传递到网络层。称为多路复用(multiplexing)

多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用

如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用 功,而 Selector 能够保证

  • 有可连接事件时才去连接
  • 有可读事件才去读取
  • 有可写事件才去写入
    • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

Selector(选择器)

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1,阻塞直到绑定事件发生

1
int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

1
int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

1
int count = selector.selectNow();

select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

处理 accept 事件

客户端代码为

1
2
3
4
5
6
7
8
9
10
11
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务器端代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

处理 read 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
log.debug("select count: {}", count);

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}

开启两个客户端,修改一下发送文字,输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 |world |
+--------+-------------------------------------------------+----------------+

注意, 对于 selectedKeys 集合, 我们在每次使用key来进行操作的时候 , 都需要进行 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

⚠️ 不处理边界的问题

比如在UTF-8的编码下 , 一个汉字需要3个字节 , 假设我们的server读取信息的时候采用的byteBuffer只有4个Byte , 客户端此时发送了一个 你好 , 此时接收端收到了正确的数据 , 但是却无法进行正确的编码 , 此时就可能会出现问题

比如下面的代码

1
2
3
4
5
6
7
8
9
10
11
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
// 如果是正常断开, 那么read == -1
if (read == -1) {
key.cancel();
} else {
buffer.flip(); // 切换读写模式
// debugRead(buffer);
System.out.println(StandardCharsets.UTF_8.decode(buffer).toString());
}

客户端

1
2
3
4
5
6
7
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8888));
System.out.println("........");
sc.write(StandardCharsets.UTF_8.encode("你好"));
sc.close();
}

输出

1
2
你�
��

处理消息的边界

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式
TCP 报文段的首部格式

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public static void main(String[] args) throws IOException {
// 1. 创建 selector, 管理多个 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的联系(注册)
// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
selector.select();
// 4. 处理事件, selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
iter.remove();
log.debug("key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 将一个 byteBuffer 作为附件关联到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
// 获取 selectionKey 上关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}


static void split(ByteBuffer source) {
source.flip(); // 切换成读模式
for (int i = 0; i < source.limit(); i++) {
// length表示本次循环需要读取的长度 len = l-r+1
if(source.get(i)=='\n'){
// 此时找到了一句完整的话
int pos = source.position();
int length = i+1-pos;
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 注意切换成写模式 , 需要把前面的读取过的字符给压到前面 , 然后再进行写入 , 否则空间不够
}

客户端

1
2
3
4
5
6
7
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

关于ByteBuffer 大小分配

每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用

因此要为每个 channel 维护一个独立的 ByteBuffer

ByteBuffer 不能太大,比如一个 ByteBuffer 1 Mb 的话,加入我们的服务需要支持百万连接就要 1Tb 内存,因此设计大小可变的 ByteBuffer 是十分必要的

  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

处理 write 事件

非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class WriteServer {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

多线程优化

目前都是基本上都是多核 cpu,设计时要充分利用上CPU的性能

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?

分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
  • Boss线程负责Accepted
  • Worker线程负责 write / read

server初始代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
Thread.currentThread().setName("BOSS thread");

ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.bind(new InetSocketAddress(8080));
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT); // care Accept Event

while(true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
}
}

}
}

根据上面的思路 , 我们定义一个Boss Selector , 通过Boss 去 对应多个Worker 的Selector

定义Worker类如下

  • thread 表示执行任务的线程

  • selector 表示当前的worker专属的selector

  • name为worker标识

  • queue用来存储任务

    这里的任务由于在server的代码中 , 是通过Boss 的线程来把 事件类型等信息去注册到Worker的 selector中 , 而work类实际在执行的时候又会使用其专有的线程 , 这里queue的作用是 两个线程之间的通信来保证 worker在执行的时候 , selector已经是初始化完毕了(否则出现NPE)

  • isRegistered 是标识是否已经register的变量 , 保证我们的线程以及selector只会被初始化一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static class Worker implements Runnable {
private Thread thread;

private volatile Selector selector;

private String name;

/* transfer data between theads*/
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

private volatile boolean isRegistered = false;

public Worker(String name) {
this.name = name;
}

/* init thread and selector (just execute once) */
public void register(SocketChannel sc) throws IOException {
if (!isRegistered) {
thread = new Thread(this, this.name);
selector = Selector.open();
isRegistered = true;
thread.start();
}
/* add a task to queue but not execute at once */
queue.add(() -> {
try {
/* register to socketChannel (solve the multiThread problem)*/
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
selector.wakeup();// wake up selector
}

@Override
public void run() {
while (true) {
try {
selector.select();
Runnable task = queue.poll();
/* notice : run is not like Thread,run a task won't create a new thread */
if (task != null) {
task.run();/* run task ( sc.register() )*/
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// just care the read Event
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
/* temporarily ignoring details */
SocketChannel sc = (SocketChannel) key.channel();
log.debug("read...{}", sc.getRemoteAddress());
sc.read(buffer);
buffer.flip();
debugAll(buffer);
}
key.cancel();
}

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

Boss中的代码

  • 定义了 Worker数组 , 提高效率 , 与上面的思路相对应
  • 使用Round-robin轮询算法, 使得worker能够交替工作
  • AtomicInteger 是java中提供的原子操作类 , 其内部实现保证了我们对这个变量操作的原子性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
Thread.currentThread().setName("BOSS thread");

ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.bind(new InetSocketAddress(8080));
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT); // care Accept Event

/* 1. create a fixed number of workers*/
Worker[] workers = new Worker[3];
for (int i = 0; i < workers.length; i++) {
workers[i]=new Worker("worker-"+i);
}

AtomicInteger index = new AtomicInteger(0);

while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
/* 2. associate workers' selector*/
log.debug("before register...{}", sc.getRemoteAddress());
/* round-robin algorithm*/
workers[index.getAndIncrement()%workers.length].register(sc); // run in BOSS thread
log.debug("after register...{}", sc.getRemoteAddress());
}
}

}
}

client代码

1
2
3
4
5
6
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress( 8080));
sc.write(StandardCharsets.UTF_8.encode("12345678456t3465456910\nabcdef1111"));
sc.close();
}

执行server , 多次运行client( idea 中右键debug)

测试结果

  • 可以看到worker交替完成了读取工作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
21:54:33.753 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27866
21:54:33.758 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27866
21:54:33.760 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27866
21:54:33.761 [worker-0] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27866
21:54:33.770 [worker-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465|
+--------+-------------------------------------------------+----------------+
21:54:48.832 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27885
21:54:48.832 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27885
21:54:48.833 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27885
21:54:48.833 [worker-1] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27885
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465|
+--------+-------------------------------------------------+----------------+
21:55:00.726 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27893
21:55:00.726 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27893
21:55:00.727 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27893
21:55:00.727 [worker-2] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27893
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465|
+--------+-------------------------------------------------+----------------+

优化问题

零拷贝

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

image-20230626103834686

内部工作流程:

image-20230626104450253
  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,使用read系统调用,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间不会使用 cpu

    DMA ( Direct Memory Access ) 是计算机设备中的硬件 , 可以脱离CPU独自完成IO任务

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,底层使用write系统调用 , 这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,再次从用户态切换至内核态,调用write系统调用,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

注意每一次使用系统调用都需要操作系统从 用户态切换到内核态。

操作系统在两种状态之间的切换是比较耗费时间的。

可以看到在这个过程中,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

DirectByteBuf
  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存 ( 实际是 byte[] )
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

大部分步骤与优化前相同,java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
linux 2.1 sendFile

java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

这个过程中

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次
linux 2.4

这个过程中

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正零次拷贝,而是在不会拷贝重复数据到 jvm 内存中

零拷贝优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

先来看看 AsynchronousFileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class AIODemo {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});

} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}

输出

1
2
3
4
5
6
7
8
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程 Thread-5
  • 主线程并没有 IO 操作阻塞

守护线程

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

网络 AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}

参考内容