Java Nio 之高级搬运工(FileChannel)二

Java Nio 之高级搬运工(FileChannel)二

are you ok?

前言

前段时间同事分享了一篇文章给我:为什么Kafka速度这么快? ,这篇文章相信大家也都看了。这篇文章说Kafka 有个作弊的技能 :直接从文件某个位置处读取某个长度的字节直接发送给消费者,不需要读到应用程序里然后缓存在ByteBuffer 然后再往 客户端写;当时就对这项技术很着迷,上网搜了很多资料 很纳闷它是怎么实现的;上周在介绍FileChannel 的时候本来想只写一篇文章的,后来看到了它的map 、transeferTo以及TranseferFrom 方法就觉得一篇文章写不完,因为冗长的文章谁都不想看,所以另写一篇来研究一下 FileChannel 的高性能之处,以及介绍下Kafka是怎么使用的。

谈谈零拷贝

牢骚一下

Kafka 的高性能的重要点之一就在零拷贝上。零拷贝不是真的零拷贝,只不过是减少了拷贝的次数,为的不是减少DMA的拷贝次数,而是CPU 的拷贝次数,为啥呢?因为拷贝是个很简单的操作,占着CPU 的时间片简直就是高射炮打蚊子。

传统 Linux 服务器 传输数据 的流程

  • 1.应用程序调用系统方法read(),切换上下文:用户——内核,操作系统会先检查页面缓存里是否有要read 的内容,如果有则进行第二步,如果没有则需要让DMA 从指定磁盘位置上拷贝数据到内核缓冲区中,第一次拷贝由DMA 执行
  • 2.CPU 将数据从内核缓冲区拷贝到用户缓冲区,read 调用返回,切换上下文:内核——用户,第二次拷贝由CPU 执行
  • 3.应用程序调用系统write() 函数 ,切换上下文:用户——内核,CPU 将用户缓冲区的数据拷贝到socket 缓冲区,第三次拷贝由CPU执行
  • 4.write 调用返回,切换上下文:内核——用户,然后DMA 异步将socket 缓冲区的数据拷贝到协议引擎中
  • 总结一下,需要4次拷贝,其中有两次是需要CPU的执行,切换了4次上下文

零拷贝的两种实现方式

mmap + write 方式

何为mmap 呢—— 将一个文件或者其它对象映射进内存。映射到的这块内存区域在用户程序使用的内存空间 和 栈之间不在内核内存空间, 因此内核程序和用户程序都可以访问,如下草图:


Java Nio 之高级搬运工(FileChannel)二


mmap 、 munmap 和msync()函数

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  • addr 映射区的开始位置
  • length 映射区的长度
  • prot 期望的内存保护标志,可由如下几种方式组合:
  • PROT_EXEC 页内容可被执行
  • PROT_READ 页内容可被读取
  • PROT_WRITE 页内容可被写入
  • PROT_NONE 页内容不可以被访问
  • flags 影响内存区域的各种特性,可由以下几种方式组合:
  • MAP_FIXED 使用指定的起始位置,若是addr和length 重叠于现存的映射空间则重叠部分会丢失,不会对地址做出修正,不建议使用该选项。
  • MAP_SHARED 对该映射区域的更改会同步到文件里,而且允许其它映射该文件的进程共享该映射区域
  • MAP_PRIVATE 对映射区域的写入操作会产生一个映射文件的复制,即私有的“写入时复制”(copy on write)对此区域作的任何修改都不会写回原来的文件内容。
  • MAP_NORESERVE 不要为这个映射保留交换空间。当交换空间被保留,对映射区修改的可能会得到保证。当交换空间不被保留,同时内存不足,对映射区的修改会引起段违例信号
  • MAP_ANONYMOUS 匿名映射,不与任何文件关联
  • fd 被映射对象,若为匿名映射则为-1
  • offset 被映射对象的起始偏移
int munmap(void *addr, size_t length);
调用该函数可以解除 映射对象与addr 处开始的length 长度的内存空间的映射关系

addr mmap 函数返回的映射区域首地址

length 映射区域的长度

int msync ( void * addr , size_t len, int flags) ;
一般情况下 对映射空间的共享内容更改不会直接写到文件里,当然执行完 munmap 函数也可以,除了执行它,还可以执行 msync 函数来将修改的共享内容同步到文件

说说mmap + write 的 流程

  • 用户程序调用mmap函数,将 文件内容映射到内存映射区域。先由内存空间切换到内核空间,然后由内核空间切换到用户空间,完成两次上下文切换,DMA 将文件内容拷贝到内存映射区域
  • 调用write 函数,cpu将 内存映射区域的内容拷贝到 socket缓冲区,程序调用返回然后DMA 异步从socket 缓冲区拷贝到协议引擎的缓冲区
  • 发生 1次cpu 拷贝,2次DMA 拷贝

来看看Java 下的mmap

抽象类 MappedByteBuffer

定义

直接字节缓冲区,其内容是文件的内存映射区域。可由FileChannel#map 方法创建。该类通过增加对内存映射区域的特定操作扩展了ByteFuffer 类。映射字节缓冲区与它所映射的文件直到它自己被垃圾回收之前都是存在的。

tips

映射字节缓冲区的内容任何时候都可以被修改,例如,映射文件对应的区域被当前程序或者其他程序所更改。至于是否发生或者什么时候发生,都由操作系统来决定。

映射字节缓冲区的部分或者全部在任何时候都会变得不可访问,例如,映射的文件被截断了。尝试访问不可访问的映射字节的缓冲区的那一部分,将会有不友好的异常抛出。需要强烈的提醒,避免让当前程序或者其他程序对这个映射文件进行操作,除了读或者写它的内容。

方法

load() 该方法会尽最大可能将映射文件里的内容加载到物理内存中,可能会在加载的时候导致一些页面错误和IO操作。

isLoad() 返回 映射文件内容是否驻留在物理内存中。

force() 对映射内存区域的写入,并不会直接同步到文件中, 在解除映射关系的时候修改的内容才会同步到文件中。 调用该方法会将对映射区域的修改同步到磁盘,这就与上面的方法msync方法对应。

FileChannel # map 方法

方法签名

public abstract MappedByteBuffer map(MapMode mode,long position, long size) throws IOException;

参数小解

mode 为MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一个,分别表示 只读,可读可写和写时复制与上述 mmap 方法中flags参数对应

position 从文件的哪里开始映射,对应上述 mmap 方法 中的offset参数

size 从文件position处开始映射多少个字节

Java 语言实现 mmap+write

简述:将文件a.txt 中的0到14个字节发给服务端

package zym.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
 * file channel map study
 *
 * @author 24160
 */
public class FileChannelMapStudy {
 public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
 public static final int INT_BYTES_LENGTH = 4;
 public static void main(String[] args) {
 prepareEnviroment();
 try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
 long size = fileChannel.size();
 //将a.txt 文件映射到内存缓冲区,从0位置处映射,映射10个字节长度,该映射内存缓冲区只可读
 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
 //创建一个SocketChannel实例
 SocketChannel client = SocketChannel.open();
 //连接服务端
 client.connect(new InetSocketAddress("127.0.0.1", 8080));
 //写文件内容到服务端
 client.write(mappedByteBuffer);
 //读取文件内容 网络协议为 head + body 如6zengyi
 ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
 while (client.read(head) != 0) {}
 //切换读写模式
 head.flip();
 //读取body
 ByteBuffer body = ByteBuffer.allocate(head.getInt());
 while (client.read(body) != 0) {}
 //切换读写模式
 body.flip();
 System.out.println(String.format("发送字节成功,服务端返回:%s", new String(body.array())));
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 private static void prepareEnviroment() {
 try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
 //将a.txt 映射文件到内存映射区域,模式为可读可写
 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
 //放进去一个int 为10
 mappedByteBuffer.putInt(10);
 mappedByteBuffer.put("zengyiming".getBytes());
 //强制刷盘
 mappedByteBuffer.force();
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
}

服务端代码详见:
https://github.com/241600489/homeworks/blob/master/src/main/java/zym/netty/nio/NioServer.java

下面我们来看看kafka 是如何使用mmap,kafka AbstractIndex.scala 代码片段

 @volatile
 protected var mmap: MappedByteBuffer = {
 val newlyCreated = file.createNewFile()
 val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
 try {
 /* 如果是新创建则给file 预留分配空间 maxIndexSize 不超过50MB 单位为字节 */
 if(newlyCreated) {
 if(maxIndexSize < entrySize)
 throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
 raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
 }
 /* memory-map the file */
 /* 开始内存映射文件*/
 _length = raf.length()
 val idx = {
 if (writable)
 /*如果可写,则映射模式为可读可写*/
 raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
 else
 /*若可读,则映射模式为可读*/
 raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
 }
 /* set the position in the index for the next entry */
 /*为下一个条目 设置 buffer 中的position值*/
 if(newlyCreated)
 idx.position(0)
 else
 // if this is a pre-existing index, assume it is valid and set position to last entry
 //如果这是一个预先存在的索引,则假设它有效并将位置设置为最后一个条目
 idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
 idx
 } finally {
 CoreUtils.swallow(raf.close(), AbstractIndex)
 }
 }

kafka 的索引文件是映射到内存映射区域的,对消息偏移量的读写都是基于MappedByteBuffer 之上,当然牛逼的kafka 作者们 发明了一个简单且缓存命中友好的二叉查找算法,这个算法有机会和大家聊下。


分享到:


相關文章: