经过前两节的nio 学习,我们开始进入第三节了真不知道自己会坚持多长时间。
附上前两节的链接
前言
大家对搬砖都很熟悉吧;小绿和小蓝是搬砖工,小绿比小蓝早一点开始搬砖,小绿搬砖的方式:一块砖一块砖搬,一直勤勤恳恳;小蓝的性格比较懒,一开始和小绿一起搬砖的方式 一样,但是发现这样搬砖很累,于是小蓝就想着怎么减轻自己的活然后又能干多一点,于是想到了一个好方法:将砖放到小推车里然后运输到目的地,这样小蓝干多的活不仅多了,看起来也不是那么累了。
这里的小推车就是我们的缓冲区而小蓝我们的高级搬砖工就是今天的主题FileChannel
概念
注释原文 :A channel for reading, writing, mapping, and manipulating a file.
翻译:用于读、写、映射、操作一个文件的通道。
相比java io 的读和写以及操作文件,它多了一个功能 就是映射,什么是映射呢,我们下一个专题再说。
详细的概念
FileChannel 是一个连接一个文件的字节通道,它可以提供position() 来查询当前读或写的位置以及position(long)来设置当前读或者写的位置。文件自身包含了可读可写的可变长度字节序列,当前的字节序列长度可以通过size() 方法 获取。当写的字节数超过当前大小时 ,文件的size 增加;当调用 truncate() 方法时,文件的大小减小。文件还可能具有一些相关的元数据,如访问权限、内容类型和最后修改时间;该类不定义用于元数据访问的方法。
如何生成FileChannel
FileChannel##open(Path path, OpenOption... options)
Path 是一个可以定位某个文件在文件系统中位置的对象,依赖于文件系统。Path 类是从JDK7开始有的,大家可以去了解下,这里就不做过多解释。
OpenOption 表示 如何打开或者创建一个文件,这是一个接口我们来看看它的主要的实现枚举StandardOpenOption:代码清单4-1:
//只读,与WRITE 和Append 不能在一起使用,否则会抛出异常 READ, // 打开后可写,从文件开始位置写,可能会覆盖文件中以前的数据 WRITE, // 从文件结尾追加写 APPEND, // 当和WRITE 选项并存的时候,该文件的长度将被清为零,与READ选项并存的 时候该选项将被忽略 TRUNCATE_EXISTING, // 创建一个不存在的文件 //与CREATE_NEW 并存时 该选项配置将被忽略 CREATE, //创建新的文件,文件存在则失败 //相对于其他系统操作检查文件存在和创建文件是原子性的 CREATE_NEW
上面有三个 枚举没有列举出来这里用不到。
演示一波 代码清单4-2:
public class FileChannelOpenStudy { public static final String JAVA_NIO = "java NIO"; static String createAndWriteAndReadPath = "createAndWriteAndReadPath.txt"; public static void main(String[] args) { FileChannel readChannel = null; FileChannel createAndWriteChannel = null; FileChannel appendChannel = null; // FileChannel dataSyncChannel = null; try{ //create and write createAndWriteChannel = FileChannel.open(Paths.get(createAndWriteAndReadPath), StandardOpenOption.CREATE, StandardOpenOption.WRITE); ByteBuffer writeBuffer = ByteBuffer.allocate(6); writeBuffer.put("hello,".getBytes(Charset.forName("UTF-8"))); //切换 读写模式 writeBuffer.flip(); int writed1 = createAndWriteChannel.write(writeBuffer); System.out.println("createAndWriteChannel write in " + writed1 + " bytes"); readChannel = FileChannel.open(Paths.get(createAndWriteAndReadPath), StandardOpenOption.READ); ByteBuffer readBuffer = ByteBuffer.allocate(writed1); int readed1 = readChannel.read(readBuffer); readBuffer.flip(); System.out.println("readChannel read " + readed1 + " bytes:" + new String(readBytesFrromBuffer(readBuffer))); createAndWriteChannel.close(); appendChannel = FileChannel.open(Paths.get(createAndWriteAndReadPath), StandardOpenOption.APPEND); ByteBuffer appendBuffer = ByteBuffer.allocate(JAVA_NIO.length()); appendBuffer.put(JAVA_NIO.getBytes("UTF-8")); appendBuffer.flip(); int writed2 = appendChannel.write(appendBuffer); System.out.println("appendChannel writed in " + writed2 + " bytes"); ByteBuffer readBuffer1 = ByteBuffer.allocate(writed2); int readed2 = readChannel.read(readBuffer1, readed1); readBuffer1.flip(); System.out.println("readChannel readed " + readed2 + " bytes:" + new String(readBytesFrromBuffer(readBuffer1))); appendChannel.close(); } catch (IOException e) { e.printStackTrace(); } finally { } } private static byte[] readBytesFrromBuffer(ByteBuffer byteBuffer) { byte[] result = new byte[byteBuffer.limit()]; byteBuffer.get(result); return result; } }
结果如下:
createAndWriteChannel write in 6 bytes readChannel read 6 bytes:hello, appendChannel writed in 8 bytes readChannel readed 8 bytes:java NIO
FileInputStream##getChannel()
调用FileInputStream 的getChannel 返回一个与这个文件输入流关联的唯一的文件通道,记住是唯一的,如果已创建与之关联的文件通道则直接返回已创建的。
该方法返回的FileChannel 是只读的,调用read会抛出异常
来个demo 极度舒适一下
public class FileInputStreamGetChannel { static String filePath = "F:\\idea_two\\demo\\createAndWriteAndReadPath.txt"; public static void main(String[] args) { try { FileInputStream fileInputStream = new FileInputStream(filePath); FileChannel channel = fileInputStream.getChannel(); ByteBuffer readBuffer = ByteBuffer.allocate((int) channel.size()); int readed = channel.read(readBuffer); readBuffer.flip(); System.out.println(" fileInputStream get Channel read from " + readed + " bytes:" + new String(readBytesFrromBuffer(readBuffer))); //这里关闭的时候会先判断文件通道的parent 属性是否为空,如果不为空则会调用parent属性的关闭,这里的parent 就是FileInputStream对象 channel.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { } } private static byte[] readBytesFrromBuffer(ByteBuffer byteBuffer) { byte[] result = new byte[byteBuffer.limit()]; byteBuffer.get(result); return result; } }
看看结果再来极度舒适一下
fileInputStream get Channel read from 14 bytes:hello,java NIO
相对于普通文件读写的一些特殊方法
read 方法 重载系列
撸一 撸 代码
代码清单5-1
public class FileChannelReadTest { //该文件内容自己脑补 static String testFilePath = "testFileChannelReadTest.txt"; public static void main(String[] args) { Path path = Paths.get(testFilePath); try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { //read ByteBuffer read1 = ByteBuffer.allocate(4); int readed1 = fileChannel.read(read1, 0); System.out.println("read ByteBuffer " + readed1 + " bytes,內容:" + new String(read1.array())); ByteBuffer[] byteBuffersRead = new ByteBuffer[3]; IntStream.rangeClosed(0, 2).forEach(i -> { byteBuffersRead[i] = ByteBuffer.allocate(5); }); System.out.println("alfter read fileChannel position:" + fileChannel.position()); //很关键 要不然又从头开始读,如下面的结果A fileChannel.position(readed1); //scatter read 分散读取 从channnel 中分散地读取到buffer数组中 long readed2 = fileChannel.read(byteBuffersRead, 0, byteBuffersRead.length); IntStream.rangeClosed(0, 2).forEach(i -> { byteBuffersRead[i].flip(); }); System.out.println(String.format("read ByteBuffers %d bytes,content:%s\n%s\n%s",readed2, new String(readBytesFrromBuffer(byteBuffersRead[0])), new String(readBytesFrromBuffer(byteBuffersRead[1])), new String(readBytesFrromBuffer(byteBuffersRead[2])))); } catch (IOException e) { } } private static byte[] readBytesFrromBuffer(ByteBuffer byteBuffer) { byte[] result = new byte[byteBuffer.limit()]; byteBuffer.get(result); return result; } }
结果A(错误结果)
read ByteBuffer 4 bytes,內容:hell read ByteBuffers 15 bytes,content:helln iha0n iha1n
结果C(正确结果)
read ByteBuffer 4 bytes,內容:hell alfter read fileChannel position:0 read ByteBuffers 15 bytes,content:niha0 niha1 niha2
write 方法重载系列
参数返回值类型及说明说明
秀一波操作
代码清单5-2
public class FileChannelWriteTest { static String testFilePath = "testFileChannelReadTest.txt"; public static void main(String[] args) { Path path = Paths.get(testFilePath); //使用FileChannel #open 方法 创建 FileChannel 比较推荐这种 try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { ByteBuffer write1 = ByteBuffer.allocate(8); write1.put("hell".getBytes(Charset.forName("UTF-8"))); //缓冲区切换读写模式 详见第一节内容 write1.flip(); //写入 int writed1 = fileChannel.write(write1); System.out.println("write ByteBuffer " + writed1 + " bytes"); ByteBuffer[] byteBuffers = new ByteBuffer[3]; IntStream.rangeClosed(0, 2).forEach(i -> { byteBuffers[i] = ByteBuffer.allocate(8); byteBuffers[i].put(("niha" + i).getBytes()); //切换读写模式 一定别忘了 byteBuffers[i].flip(); }); //聚合写入 将多个缓冲写入到一个通道里 long writed2 = fileChannel.write(byteBuffers, 0, byteBuffers.length); System.out.println(" write ByteBuffers "+writed2 +" bytes"); } catch (IOException e) { } } }
撸的结果:
write ByteBuffer 4 bytes write ByteBuffers 15 bytes
force(boolean metaData)
强制此通道文件的任何更新写入到包含它的存储设备。我们调用上面的write 方法只是将数据写入到 系统缓存中,然后后由pdflush线程异步刷新到硬盘上 ,当然是根据过期脏页的内存占用工作内存的百分比、脏页占工作内存的百分比决定是否开启pdflush线程。调用该方法可以确保我们写入的数据保存到磁盘上,这样不会再突然断电的时候丢失关键的信息。
?该方法包含一个参数metaData 布尔型,表示是否将文件的元数据信息一起刷新到磁盘,文件的元数据信息包括文件的访问权限、文件的最后更新时间等,与操作系统有关;如果metaData 为true 则会多一个io 操作,可以通过设置该值来限制io 操作的数量。
用法: 每次调用write 完就调用该方法则会降低应用的响应时间以及吞吐率
- 如果写入的数据不希望在断电的情况丢失,则可以每次写每次force 一下,牺牲性能保证写入数据的完整性。
- 如果对写入的数据在断电时可以丢失极小部分数据,可以采用异步刷盘的策略,有个定时任务定时调用force 而保证数据只会在断电时候丢失几秒的数据,这在许多应用中都是可以接受的
话不多说先撸个代码为敬
及时写及时刷 演示代码: 代码清单5-3
public class FileChannelForceInTime { static String testFilePath = "testFileChannelForceInTime.txt"; static byte[] imageBytes; final static String PICTUREE_PATH = "meimei.jpg"; static { try { imageBytes = Files.readAllBytes(Paths.get(PICTUREE_PATH)); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { try { Path path = Paths.get(testFilePath); FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND); //每次写都调用force ByteBuffer imageBytesBuffer = ByteBuffer.allocateDirect(imageBytes.length); imageBytesBuffer.put(imageBytes); long now = System.currentTimeMillis(); for (int i = 0; i <= 9; i++) { imageBytesBuffer.flip(); fileChannel.write(imageBytesBuffer); fileChannel.force(false); //这里为了和异步刷盘的例子保持一致,因为异步刷盘有个定时任务所以 可能时间需要长点,所以这里模拟一下 Thread.sleep( 1000); } System.out.println("write in time force cost " + ((System.currentTimeMillis() - now)-10 * 1000) + "ms"); fileChannel.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
输出结果
write in time force cost 922ms
定时任务异步刷盘 代码演示:代码清单5-4
public class FileChannelTimelyForce { static String testFilePath = "testFileChannelTimelyForce.txt"; static byte[] imageBytes; final static String PICTUREE_PATH = "meimei.jpg"; static { try { imageBytes = Files.readAllBytes(Paths.get(PICTUREE_PATH)); } catch (IOException e) { e.printStackTrace(); } } private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); public static void main(String[] args) { try { Path path = Paths.get(testFilePath);; FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND); executorService.scheduleAtFixedRate(() -> { try { if (fileChannel.isOpen()) { fileChannel.force(false); } } catch (IOException e) { } }, 1, 1, TimeUnit.SECONDS); long now = System.currentTimeMillis(); ByteBuffer imageBytesBuffer = ByteBuffer.allocateDirect(imageBytes.length); imageBytesBuffer.put(imageBytes); for (int i = 0; i <= 9; i++) { imageBytesBuffer.flip(); fileChannel.write(imageBytesBuffer); //异步刷盘有s个定时任务所以 可能时间需要长点,所以这里模拟一下 Thread.sleep(1000); } System.out.println("fileChannnel write timely foce cost " + (System.currentTimeMillis() - now - 10 * 1000) + "ms"); fileChannel.close(); } catch (IOException e) { } catch (InterruptedException e) { }finally { executorService.shutdown(); } } }
运行结果如下:
fileChannnel write timely foce cost 127ms
两种用法的耗时也是非常明显的,可以根据大家的场景去使用,当然有更好的方法大家也可以在下面评论下,共同进步。
后记
下一节我还会讨论FileChannel,会涉及到该类的一些高级用法以及一些底层原理,还有Kafka消息中间件是怎么用的,可能需要一段时间。