根据黑马程序员Netty教程 学习所做笔记
Channel由java.nio.channels 包定义的。Channel 表示IO 源与目标打开的连接 。
Channel 不同于传统的“流”。Channel 本身不能直接访问数据,Channel 只能与Buffer 进行交互
Channel就像是铁轨,Buffer就像是车厢,车厢中保存着数据
FileChannel
获取
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
通过 FileInputStream 获取的 channel 只能读
通过 FileOutputStream 获取的 channel 只能写
通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式 决定
r w rw
读取
会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾
1 int readBytes = channel.read(buffer);
写入
写入的正确姿势如下, SocketChannel
1 2 3 4 5 6 7 ByteBuffer buffer = ...;buffer.put(...); buffer.flip(); while (buffer.hasRemaining()) { channel.write(buffer); }
在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel
关闭
channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法
位置
获取当前位置
1 long pos = channel.position();
设置当前位置
1 2 long newPos = ...;channel.position(newPos);
设置当前位置时,如果设置为文件的末尾
这时读取会返回 -1
这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
大小
使用 size 方法获取文件的大小
强制写入
操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘 。
可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
数据传输
transferTo()
我们使用Filechannel提供的**transferTo()**方法进行文件的拷贝。
**transferTo()**方法底层采用了操作系统的零拷贝
零拷贝(Zero Copy)指的是 避免在数据传输过程中多次复制数据而导致的性能损耗 的技术。
在传统 I/O 模型中,数据从磁盘读取到内核缓冲区后,又被复制到用户空间的应用程序缓冲区,然后再被复制到网络套接字的缓冲区,最终才能发送出去。这种复制操作会消耗很多 CPU 资源和时间,降低系统的吞吐量。而使用零拷贝技术可以避免这些复制操作,直接将数据从内核缓冲区传输到网络套接字缓冲区,减少了数据复制的次数,提高了数据传输的效率。
传统的拷贝磁盘=>内核缓冲=>用户应用缓冲=>网络socket缓冲=>发送
零拷贝内核缓冲=>网络socket缓冲
Java 的 NIO(New IO)库中的 FileChannel 类就是利用了零拷贝技术来提高文件读写的效率。FileChannel 提供了两个方法:transferTo 和 transferFrom ,
它们可以将数据从 FileChannel 直接传输到其他 Channel 或者从其他 Channel 直接传输到 FileChannel,避免了中间的数据复制过程。此外,FileChannel 还支持内存映射文件(MappedByteBuffer ),可以将文件直接映射到内存中,让应用程序对文件进行读写操作时可以像操作普通的内存一样高效地进行,也是一种典型的零拷贝技术应用。
使用了Gather Copy零拷贝
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 static void testTransform2 () { try ( FileChannel from = new FileInputStream ("data.txt" ).getChannel(); FileChannel to = new FileOutputStream ("./result/to.txt" ).getChannel() ) { from.transferTo(0 ,from.size(),to); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); } }
需要注意的是,transferTo(long position, long count, WritableByteChannel target )
方法虽然非常好用,并且利用了零拷贝技术,效率也非常高,但是对于一次传输的内容还是有大小限制的
在 Unix/Linux 系统上,FileChannel
的 transferTo()
和 transferFrom()
方法使用了 sendfile()
系统调用,该系统调用可以直接从文件描述符中将数据传输到网络套接字上,而不需要将数据复制到用户空间中。因此,在这种情况下,FileChannel
可以处理非常大的文件。
在 Windows 系统上,FileChannel
的 transferTo()
和 transferFrom()
方法使用了 TransmitFile()
或 WSASend()
等系统调用,这些系统调用也能够直接将数据从文件传输到套接字上,避免了数据复制的开销。但是需要注意的是,在 Windows 系统上,每次传输的数据量不能超过 2GB 。
我们通过查看transferTo()方法的源码也不难发现,虽然count参数是一个long 类型的变量,但是在实际执行过程中源码中有这样的一行代码
int icount = (int)Math.min(count, Integer.MAX_VALUE);
也就是说实际上count的传输的过程中,最大值就是Integer.MAX_VALUE=0x7fffffff=231 -1 = 2147483647B =>2GB
ByteBuffer每次传输的单位是Byte,因此计算的时候采用的是Byte而不是Bit ,故理论上transferTo()
方法的最大传输大小为2GB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public long transferTo (long position, long count, WritableByteChannel target) throws IOException { ensureOpen(); if (!target.isOpen()) throw new ClosedChannelException (); if (!readable) throw new NonReadableChannelException (); if (target instanceof FileChannelImpl && !((FileChannelImpl)target).writable) throw new NonWritableChannelException (); if ((position < 0 ) || (count < 0 )) throw new IllegalArgumentException (); long sz = size(); if (position > sz) return 0 ; int icount = (int )Math.min(count, Integer.MAX_VALUE); if ((sz - position) < icount) icount = (int )(sz - position); long n; if ((n = transferToDirectly(position, icount, target)) >= 0 ) return n; if ((n = transferToTrustedChannel(position, icount, target)) >= 0 ) return n; return transferToArbitraryChannel(position, icount, target); }
接下来我们通过实际情况来进行最大的传输测试
首先编写代码在bigData.txt中写入2GB的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static void writeBigFile () { ByteBuffer allocate = ByteBuffer.allocate(1024 ); byte [] bytes = new byte [1024 ]; Arrays.fill(bytes, (byte ) 97 ); allocate.put(bytes); allocate.flip(); try ( FileChannel from = new FileOutputStream ("./data/bigData.txt" ).getChannel(); ) { for (int j = 0 ; j < 1024 ; j++) { for (int i = 0 ; i < 2048 ; i++) { from.write(allocate); allocate.position(0 ); } } } catch (Exception e) { log.error(e.getMessage()); } }
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 static void testTransferSize () { try ( FileChannel from = new FileInputStream ("./data/bigData.txt" ).getChannel(); FileChannel to = new FileOutputStream ("./data/bigTo.txt" ).getChannel() ) { long l = from.transferTo(0 , from.size(), to); System.out.println(l); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); } }
打印结果 :
2147483647
不出所料,本次传输了2147483647 个字节的数据。
接着我们调整bigData.txt的大小为4GB
执行传输方法,打印结果为
2147483647
,查看资源管理器,为2G
需要注意的是,这里的 2GB 是指单次传输的数据量上限 ,并不代表 FileChannel
能够处理的最大文件大小。
FileChannel
可以处理非常大的文件,其最大文件大小取决于所使用的文件系统和操作系统的限制。
在现代操作系统和文件系统中,通常可以支持数百 GB 或甚至更大的文件。
那么既然单词传输会有大小限制,我们只需要把文件分片多次传输即可
已知public long transferTo(long position, long count, WritableByteChannel target)
方法每次最大传输2GB数据,同时返回每次传输的字节数,我们通过返回值即可完成大文件的分片传输
对之前的代码稍作修改即可
1 2 3 4 5 long size = from.size();for (long left = size; left > 0 ; ) { left -= from.transferTo(size-left,left, to); }
运行结果如下
当然也不只是可以传输文本类型的文件,对于图像文件也可以直接通过transfer()方法传输
1 2 3 4 5 6 7 8 9 10 11 12 13 static void transferImg () { try ( FileChannel from = new FileInputStream ("./img/img1.jpg" ).getChannel(); FileChannel to = new FileOutputStream ("./img/img1_img.jpg" ).getChannel() ) { long l = from.transferTo(0 , from.size(), to); System.out.println(l); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); } }
Path
jdk7 引入了 Path 和 Paths 类
Path 用来表示文件路径
Paths 是工具类,用来获取 Path 实例
1 2 3 4 5 6 7 Path source = Paths.get("1.txt" ); Path source = Paths.get("d:\\1.txt" ); Path source = Paths.get("d:/1.txt" ); Path projects = Paths.get("d:\\data" , "projects" );
例如目录结构如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 D:. ├─.idea ├─data ├─img └─src ├─main │ ├─java │ │ └─com │ │ └─dhx │ │ ├─c1 │ │ ├─c2 │ │ └─util │ └─resources └─test └─java
代码
1 2 3 Path path = Paths.get("d:\\data\\projects\\a\\..\\b" );System.out.println(path); System.out.println(path.normalize());
会输出
1 2 d:\data\projects\a\..\b d:\data\projects\b
Files
检查文件是否存在
1 2 Path path = Paths.get("helloword/data.txt" );System.out.println(Files.exists(path));
创建一级目录
1 2 Path path = Paths.get("helloword/d1" );Files.createDirectory(path);
如果目录已存在,会抛异常 FileAlreadyExistsException
不能一次创建多级目录,否则会抛异常 NoSuchFileException
创建多级目录用
1 2 Path path = Paths.get("helloword/d1/d2" );Files.createDirectories(path);
拷贝文件
1 2 3 4 Path source = Paths.get("helloword/data.txt" );Path target = Paths.get("helloword/target.txt" );Files.copy(source, target);
如果文件已存在,会抛异常 FileAlreadyExistsException
如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制
1 Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
移动文件
1 2 3 4 Path source = Paths.get("helloword/data.txt" );Path target = Paths.get("helloword/data.txt" );Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性
删除文件
1 2 3 Path target = Paths.get("helloword/target.txt" );Files.delete(target);
如果文件不存在,会抛异常 NoSuchFileException
删除目录
1 2 3 Path target = Paths.get("helloword/d1" );Files.delete(target);
如果目录还有内容,会抛异常 DirectoryNotEmptyException
遍历目录文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static void fileCount () throws IOException { AtomicLong fileCnt = new AtomicLong (0 ); AtomicLong dirCnt = new AtomicLong (0 ); Files.walkFileTree(Paths.get("D:\\j2ee_project" ), new SimpleFileVisitor <>() { @Override public FileVisitResult preVisitDirectory (Path dir, BasicFileAttributes attrs) throws IOException { log.info("进入目录: {}" ,dir); dirCnt.incrementAndGet(); return super .preVisitDirectory(dir,attrs); } @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { log.info(String.valueOf(file)); fileCnt.incrementAndGet(); return super .visitFile(file,attrs); } }); log.info("文件数量: {}" ,fileCnt); log.info("目录数量: {}" ,dirCnt); }
统计 jar 的数目
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static void jarCount () throws IOException { AtomicLong fileCnt = new AtomicLong (0 ); AtomicLong jarCount = new AtomicLong (0 ); Files.walkFileTree(Paths.get("D:\\j2ee_project" ), new SimpleFileVisitor <>() { @Override public FileVisitResult preVisitDirectory (Path dir, BasicFileAttributes attrs) throws IOException { log.info("进入目录: {}" ,dir); return super .preVisitDirectory(dir,attrs); } @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { log.info(String.valueOf(file)); fileCnt.incrementAndGet(); if (file.toString().endsWith(".jar" )){ jarCount.incrementAndGet(); } return super .visitFile(file,attrs); } }); log.info("文件数量: {}" ,fileCnt); log.info("jar数量: {}" ,jarCount); }
删除多级目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Path path = Paths.get("d:\\a" );Files.walkFileTree(path, new SimpleFileVisitor <Path>(){ @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return super .visitFile(file, attrs); } @Override public FileVisitResult postVisitDirectory (Path dir, IOException exc) throws IOException { Files.delete(dir); return super .postVisitDirectory(dir, exc); } });
删除是危险操作,确保要递归删除的文件夹没有重要内容
拷贝多级目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 long start = System.currentTimeMillis();String source = "D:\\Snipaste-1.16.2-x64" ;String target = "D:\\Snipaste-1.16.2-x64aaa" ;Files.walk(Paths.get(source)).forEach(path -> { try { String targetName = path.toString().replace(source, target); if (Files.isDirectory(path)) { Files.createDirectory(Paths.get(targetName)); } else if (Files.isRegularFile(path)) { Files.copy(path, Paths.get(targetName)); } } catch (IOException e) { e.printStackTrace(); } }); long end = System.currentTimeMillis();System.out.println(end - start);
网络编程
非阻塞 && 阻塞
阻塞
服务器端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(16 ); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress (8888 )); List<SocketChannel> channels = new ArrayList <>(); while (true ){ log.debug("connecting...." ); SocketChannel sc = ssc.accept(); log.debug("connected...." ); channels.add(sc); for (SocketChannel channel : channels) { log.debug("before read...." ); channel.read(buffer); buffer.flip(); debugRead(buffer); buffer.clear(); log.debug("after read...." ); } } }
客户端
1 2 3 4 5 public static void main (String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress ("localhost" ,8888 )); System.out.println("waiting......" ); }
对于上面的代码,我们需要注意的是accept()方法以及channel.read()方法都会导致当前的线程阻塞,
阻塞模式下,相关方法都会导致线程暂停执行
ServerSocketChannel.accept()
会在没有连接建立时让线程暂停
SocketChannel.read()
会在没有数据可读时让线程暂停
阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
但多线程下,又会出现新的问题,如果我们为每个用户的情况都创建一个线程,那么由于计算机执行的特性,多个线程必定会引发大量的线程上下文切换的问题,导致效率执行较低。
如果我们使用线程池技术,比如ThreadPollExecutor
,虽然可以在一定程度上减少线程的上下文切换以及重复的创建与销毁问题,但是当请求数量过多的时候或者是线程池任务较少的时候,还是会出现问题。
接下来通过IDEA在DEBUG模式下进行评估表达式来验证ServerSocketChannel.accept()
的阻塞
首先运行server.java , 然后在debug模式下启动client.java
我们在client程序执行完socket连接之后,
通过评估表达式插入代码并且执行
我们插入代码给server发送信息
查看server的控制台输出情况
可以看到原本**阻塞的serve**r在接收到客户端发送消息之后执行了之后的代码并且进入到了下一轮的循环。
非阻塞
非阻塞模式下,相关方法都会不会让线程暂停!!
在 ServerSocketChannel.accept()
在没有连接建立时,会返回 null,继续运行
SocketChannel.read()
在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept()
写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,会浪费CPU资源
数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)
服务器端代码添加 ssc.configureBlocking(false);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void main (String[] args) throws IOException, InterruptedException { ByteBuffer buffer = ByteBuffer.allocate(16 ); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); ssc.bind(new InetSocketAddress (8888 )); List<SocketChannel> channels = new ArrayList <>(); while (true ){ SocketChannel sc = ssc.accept(); if (sc!=null ){ log.debug("connected....{}" ,sc); sc.configureBlocking(false ); channels.add(sc); } for (SocketChannel channel : channels) { int read = channel.read(buffer); if (read>0 ){ buffer.flip(); debugRead(buffer); buffer.clear(); log.debug("after read...." ); } } } }
关于非阻塞CPU占用
开启server.java并且发送数据
多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
多路复用 (Multiplexing,又称“多任务 ”[来源请求] )是一个通信 和计算机网络 领域的专业术语 ,在没有歧义的情况下,“多路复用”也可被称为“复用”。多路复用通常表示在一个信道 上传输 多路信号 或数据流 的过程和技术。因为多路复用能够将多个低速信道集成到一个高速信道进行传输,从而有效地利用了高速信道。通过使用多路复用,通信运营商 可以避免维护多条线路,从而有效地节约运营成本[1] 。
–维基百科
如果学习过计算机网络的相关内容,那么应该会听说过TCP多路复用
TCP是工作在传输层的协议,TCP在源主机当中从不同的套接字中收集数据块,并为每一个数据块 封装上首部信息 (用于分解)从而生成报文段 ,然后将此报文段传递到网络层。称为多路复用(multiplexing) 。
多路复用仅针对网络 IO 、普通文件 IO 没法利用多路复用
如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用 功,而 Selector 能够保证
有可连接事件时才去连接
有可读事件才去读取
有可写事件才去写入
限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
Selector(选择器)
好处
一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
让这个线程能够被充分利用
节约了线程的数量
减少了线程上下文切换
创建
1 Selector selector = Selector.open();
绑定 Channel 事件
也称之为注册事件,绑定的事件 selector 才会关心
1 2 channel.configureBlocking(false ); SelectionKey key = channel.register(selector, 绑定事件);
channel 必须工作在非阻塞模式
FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
绑定的事件类型可以有
connect - 客户端连接成功时触发
accept - 服务器端成功接受连接时触发
read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
监听 Channel 事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件
方法1,阻塞直到绑定事件发生
1 int count = selector.select();
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
1 int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
1 int count = selector.selectNow();
select 何时不阻塞
事件发生时
客户端发起连接请求,会触发 accept 事件
客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
channel 可写,会触发 write 事件
在 linux 下 nio bug 发生时
调用 selector.wakeup()
调用 selector.close()
selector 所在线程 interrupt
处理 accept 事件
客户端代码为
1 2 3 4 5 6 7 8 9 10 11 public class Client { public static void main (String[] args) { try (Socket socket = new Socket ("localhost" , 8080 )) { System.out.println(socket); socket.getOutputStream().write("world" .getBytes()); System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }
服务器端代码为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Slf4j public class ChannelDemo6 { public static void main (String[] args) { try (ServerSocketChannel channel = ServerSocketChannel.open()) { channel.bind(new InetSocketAddress (8080 )); System.out.println(channel); Selector selector = Selector.open(); channel.configureBlocking(false ); channel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int count = selector.select(); log.debug("select count: {}" , count); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { ServerSocketChannel c = (ServerSocketChannel) key.channel(); SocketChannel sc = c.accept(); log.debug("{}" , sc); } iter.remove(); } } } catch (IOException e) { e.printStackTrace(); } } }
事件发生后能否不处理
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发
处理 read 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 try (ServerSocketChannel channel = ServerSocketChannel.open()) { channel.bind(new InetSocketAddress (8080 )); System.out.println(channel); Selector selector = Selector.open(); channel.configureBlocking(false ); channel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { int count = selector.select(); log.debug("select count: {}" , count); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { ServerSocketChannel c = (ServerSocketChannel) key.channel(); SocketChannel sc = c.accept(); sc.configureBlocking(false ); sc.register(selector, SelectionKey.OP_READ); log.debug("连接已建立: {}" , sc); } else if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(128 ); int read = sc.read(buffer); if (read == -1 ) { key.cancel(); sc.close(); } else { buffer.flip(); debug(buffer); } } iter.remove(); } } } catch (IOException e) { e.printStackTrace(); }
开启两个客户端,修改一下发送文字,输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080] 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367] 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378] 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 6f 72 6c 64 |world | +--------+-------------------------------------------------+----------------+
注意, 对于 selectedKeys 集合, 我们在每次使用key来进行操作的时候 , 都需要进行 iter.remove()
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如
第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
cancel 的作用
cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
⚠️ 不处理边界的问题
比如在UTF-8的编码下 , 一个汉字需要3个字节 , 假设我们的server读取信息的时候采用的byteBuffer只有4个Byte , 客户端此时发送了一个 你好 , 此时接收端收到了正确的数据 , 但是却无法进行正确的编码 , 此时就可能会出现问题
比如下面的代码
1 2 3 4 5 6 7 8 9 10 11 SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(4 );int read = channel.read(buffer);if (read == -1 ) { key.cancel(); } else { buffer.flip(); System.out.println(StandardCharsets.UTF_8.decode(buffer).toString()); }
客户端
1 2 3 4 5 6 7 public static void main (String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress ("127.0.0.1" ,8888 )); System.out.println("........" ); sc.write(StandardCharsets.UTF_8.encode("你好" )); sc.close(); }
输出
处理消息的边界
一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
另一种思路是按分隔符拆分,缺点是效率低
TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
Http 1.1 是 TLV 格式
Http 2.0 是 LTV 格式
server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public static void main (String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); SelectionKey sscKey = ssc.register(selector, 0 , null ); sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug("sscKey:{}" , sscKey); ssc.bind(new InetSocketAddress (8080 )); while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); log.debug("key: {}" , key); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); ByteBuffer buffer = ByteBuffer.allocate(16 ); SelectionKey scKey = sc.register(selector, 0 , buffer); scKey.interestOps(SelectionKey.OP_READ); log.debug("{}" , sc); log.debug("scKey:{}" , scKey); } else if (key.isReadable()) { try { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); if (read == -1 ) { key.cancel(); } else { split(buffer); if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 ); buffer.flip(); newBuffer.put(buffer); key.attach(newBuffer); } } } catch (IOException e) { e.printStackTrace(); key.cancel(); } } } } } static void split (ByteBuffer source) { source.flip(); for (int i = 0 ; i < source.limit(); i++) { if (source.get(i)=='\n' ){ int pos = source.position(); int length = i+1 -pos; ByteBuffer target = ByteBuffer.allocate(length); for (int j = 0 ; j < length; j++) { target.put(source.get()); } debugAll(target); } } source.compact(); }
客户端
1 2 3 4 5 6 7 SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress ("localhost" , 8080 )); SocketAddress address = sc.getLocalAddress();sc.write(Charset.defaultCharset().encode("0123\n456789abcdef" )); sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n" )); System.in.read();
关于ByteBuffer 大小分配
每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用
因此要为每个 channel 维护一个独立的 ByteBuffer
ByteBuffer 不能太大,比如一个 ByteBuffer 1 Mb 的话,加入我们的服务需要支持百万连接就要 1Tb 内存,因此设计大小可变的 ByteBuffer 是十分必要的
处理 write 事件
非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值 (代表实际写入字节数)
用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
如果不取消,会每次可写均会触发 write 事件
server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class WriteServer { public static void main (String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); ssc.bind(new InetSocketAddress (8080 )); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false ); SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ); StringBuilder sb = new StringBuilder (); for (int i = 0 ; i < 3000000 ; i++) { sb.append("a" ); } ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); int write = sc.write(buffer); System.out.println("实际写入字节:" + write); if (buffer.hasRemaining()) { sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE); sckey.attach(buffer); } } else if (key.isWritable()) { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int write = sc.write(buffer); System.out.println("实际写入字节:" + write); if (!buffer.hasRemaining()) { key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); key.attach(null ); } } } } } }
client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class WriteClient { public static void main (String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false ); sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); sc.connect(new InetSocketAddress ("localhost" , 8080 )); int count = 0 ; while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isConnectable()) { System.out.println(sc.finishConnect()); } else if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 ); count += sc.read(buffer); buffer.clear(); System.out.println(count); } } } } }
只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注
多线程优化
目前都是基本上都是多核 cpu,设计时要充分利用上CPU的性能
前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?
分两组选择器
单线程配一个选择器,专门处理 accept 事件
创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
Boss线程负责Accepted
Worker线程负责 write / read
server初始代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); Thread.currentThread().setName("BOSS thread" ); ssc.configureBlocking(false ); Selector boss = Selector.open(); ssc.bind(new InetSocketAddress (8080 )); SelectionKey bossKey = ssc.register(boss, 0 , null ); bossKey.interestOps(SelectionKey.OP_ACCEPT); while (true ){ boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false ); } } } }
根据上面的思路 , 我们定义一个Boss Selector , 通过Boss 去 对应多个Worker 的Selector
定义Worker类如下
thread 表示执行任务的线程
selector 表示当前的worker专属的selector
name为worker标识
queue用来存储任务
这里的任务由于在server的代码中 , 是通过Boss 的线程来把 事件类型等信息去注册到Worker的 selector中 , 而work类实际在执行的时候又会使用其专有的线程 , 这里queue的作用是 两个线程之间的通信来保证 worker在执行的时候 , selector已经是初始化完毕了(否则出现NPE )
isRegistered 是标识是否已经register的变量 , 保证我们的线程以及selector只会被初始化一次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 static class Worker implements Runnable { private Thread thread; private volatile Selector selector; private String name; private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue <>(); private volatile boolean isRegistered = false ; public Worker (String name) { this .name = name; } public void register (SocketChannel sc) throws IOException { if (!isRegistered) { thread = new Thread (this , this .name); selector = Selector.open(); isRegistered = true ; thread.start(); } queue.add(() -> { try { sc.register(selector, SelectionKey.OP_READ, null ); } catch (ClosedChannelException e) { throw new RuntimeException (e); } }); selector.wakeup(); } @Override public void run () { while (true ) { try { selector.select(); Runnable task = queue.poll(); if (task != null ) { task.run(); } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16 ); SocketChannel sc = (SocketChannel) key.channel(); log.debug("read...{}" , sc.getRemoteAddress()); sc.read(buffer); buffer.flip(); debugAll(buffer); } key.cancel(); } } catch (IOException e) { throw new RuntimeException (e); } } }
Boss中的代码
定义了 Worker数组 , 提高效率 , 与上面的思路相对应
使用Round-robin轮询算法, 使得worker能够交替工作
AtomicInteger 是java中提供的原子操作类 , 其内部实现保证了我们对这个变量操作的原子性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public static void main (String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); Thread.currentThread().setName("BOSS thread" ); ssc.configureBlocking(false ); Selector boss = Selector.open(); ssc.bind(new InetSocketAddress (8080 )); SelectionKey bossKey = ssc.register(boss, 0 , null ); bossKey.interestOps(SelectionKey.OP_ACCEPT); Worker[] workers = new Worker [3 ]; for (int i = 0 ; i < workers.length; i++) { workers[i]=new Worker ("worker-" +i); } AtomicInteger index = new AtomicInteger (0 ); while (true ) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false ); log.debug("connected...{}" , sc.getRemoteAddress()); log.debug("before register...{}" , sc.getRemoteAddress()); workers[index.getAndIncrement()%workers.length].register(sc); log.debug("after register...{}" , sc.getRemoteAddress()); } } } }
client代码
1 2 3 4 5 6 public static void main (String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress ( 8080 )); sc.write(StandardCharsets.UTF_8.encode("12345678456t3465456910\nabcdef1111" )); sc.close(); }
执行server , 多次运行client( idea 中右键debug)
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 21:54:33.753 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27866 21:54:33.758 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27866 21:54:33.760 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27866 21:54:33.761 [worker-0] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27866 21:54:33.770 [worker-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [16] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465| +--------+-------------------------------------------------+----------------+ 21:54:48.832 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27885 21:54:48.832 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27885 21:54:48.833 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27885 21:54:48.833 [worker-1] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27885 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [16] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465| +--------+-------------------------------------------------+----------------+ 21:55:00.726 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - connected.../192.168.159.1:27893 21:55:00.726 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - before register.../192.168.159.1:27893 21:55:00.727 [BOSS thread] DEBUG com.dhx.server.MultiThreadServer - after register.../192.168.159.1:27893 21:55:00.727 [worker-2] DEBUG com.dhx.server.MultiThreadServer - read.../192.168.159.1:27893 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [16] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 31 32 33 34 35 36 37 38 34 35 36 74 33 34 36 35 |12345678456t3465| +--------+-------------------------------------------------+----------------+
优化问题
零拷贝
传统 IO 问题
传统的 IO 将一个文件通过 socket 写出
内部工作流程:
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态 切换至内核态 ,使用read系统调用,将数据读入内核缓冲区 。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间不会使用 cpu
DMA ( Direct Memory Access ) 是计算机设备中的硬件 , 可以脱离CPU独自完成IO任务
从内核态 切换回用户态 ,将数据从内核缓冲区 读入用户缓冲区 (即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
调用 write 方法,底层使用write系统调用 , 这时将数据从用户缓冲区 (byte[] buf)写入 socket 缓冲区 ,cpu 会参与拷贝
接下来要向网卡写数据,再次从用户态 切换至内核态 ,调用write系统调用,使用 DMA 将 socket 缓冲区 的数据写入网卡,不会使用 cpu
注意每一次使用系统调用都需要操作系统从 用户态切换到内核态。
操作系统在两种状态之间的切换是比较耗费时间的。
可以看到在这个过程中,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制 ,底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了 3 次,这个操作比较重量级
数据拷贝了共 4 次
NIO 优化
DirectByteBuf
ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存 ( 实际是 byte[] )
ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
大部分步骤与优化前相同,java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
通过专门线程访问引用队列,根据虚引用释放堆外内存
减少了一次数据拷贝,用户态与内核态的切换次数没有减少
linux 2.1 sendFile
java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
java 调用 transferTo 方法后,要从 java 程序的用户态 切换至内核态 ,使用 DMA将数据读入内核缓冲区 ,不会使用 cpu
数据从内核缓冲区 传输到 socket 缓冲区 ,cpu 会参与拷贝
最后使用 DMA 将 socket 缓冲区 的数据写入网卡,不会使用 cpu
这个过程中
只发生了一次用户态与内核态的切换
数据拷贝了 3 次
linux 2.4
这个过程中
java 调用 transferTo 方法后,要从 java 程序的用户态 切换至内核态 ,使用 DMA将数据读入内核缓冲区 ,不会使用 cpu
只会将一些 offset 和 length 信息拷入 socket 缓冲区 ,几乎无消耗
使用 DMA 将 内核缓冲区 的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝 】,并不是真正零次拷贝,而是在不会拷贝重复数据到 jvm 内存中
零拷贝优点
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享
零拷贝适合小文件传输
AIO
AIO 用来解决数据复制阶段的阻塞问题
同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
Windows 系统通过 IOCP 实现了真正的异步 IO
Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
文件 AIO
先来看看 AsynchronousFileChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j public class AIODemo { public static void main (String[] args) throws IOException { try { AsynchronousFileChannel s = AsynchronousFileChannel.open( Paths.get("1.txt" ), StandardOpenOption.READ); ByteBuffer buffer = ByteBuffer.allocate(2 ); log.debug("begin..." ); s.read(buffer, 0 , null , new CompletionHandler <Integer, ByteBuffer>() { @Override public void completed (Integer result, ByteBuffer attachment) { log.debug("read completed...{}" , result); buffer.flip(); debug(buffer); } @Override public void failed (Throwable exc, ByteBuffer attachment) { log.debug("read failed..." ); } }); } catch (IOException e) { e.printStackTrace(); } log.debug("do other things..." ); System.in.read(); } }
输出
1 2 3 4 5 6 7 8 13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin... 13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things... 13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 0d |a. | +--------+-------------------------------------------------+----------------+
可以看到
响应文件读取成功的是另一个线程 Thread-5
主线程并没有 IO 操作阻塞
守护线程
默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read()
以避免守护线程意外结束
网络 AIO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public class AioServer { public static void main (String[] args) throws IOException { AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open(); ssc.bind(new InetSocketAddress (8080 )); ssc.accept(null , new AcceptHandler (ssc)); System.in.read(); } private static void closeChannel (AsynchronousSocketChannel sc) { try { System.out.printf("[%s] %s close\n" , Thread.currentThread().getName(), sc.getRemoteAddress()); sc.close(); } catch (IOException e) { e.printStackTrace(); } } private static class ReadHandler implements CompletionHandler <Integer, ByteBuffer> { private final AsynchronousSocketChannel sc; public ReadHandler (AsynchronousSocketChannel sc) { this .sc = sc; } @Override public void completed (Integer result, ByteBuffer attachment) { try { if (result == -1 ) { closeChannel(sc); return ; } System.out.printf("[%s] %s read\n" , Thread.currentThread().getName(), sc.getRemoteAddress()); attachment.flip(); System.out.println(Charset.defaultCharset().decode(attachment)); attachment.clear(); sc.read(attachment, attachment, this ); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { closeChannel(sc); exc.printStackTrace(); } } private static class WriteHandler implements CompletionHandler <Integer, ByteBuffer> { private final AsynchronousSocketChannel sc; private WriteHandler (AsynchronousSocketChannel sc) { this .sc = sc; } @Override public void completed (Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()) { sc.write(attachment); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); closeChannel(sc); } } private static class AcceptHandler implements CompletionHandler <AsynchronousSocketChannel, Object> { private final AsynchronousServerSocketChannel ssc; public AcceptHandler (AsynchronousServerSocketChannel ssc) { this .ssc = ssc; } @Override public void completed (AsynchronousSocketChannel sc, Object attachment) { try { System.out.printf("[%s] %s connected\n" , Thread.currentThread().getName(), sc.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); } ByteBuffer buffer = ByteBuffer.allocate(16 ); sc.read(buffer, buffer, new ReadHandler (sc)); sc.write(Charset.defaultCharset().encode("server hello!" ), ByteBuffer.allocate(16 ), new WriteHandler (sc)); ssc.accept(null , this ); } @Override public void failed (Throwable exc, Object attachment) { exc.printStackTrace(); } } }
参考内容