Java I/O 04 - BufferedInputStream & BufferedOutputStream

  关于 java.io.BufferedInputStream java.io.BufferedOutputStream 的部分笔记,这两个类配套完成带缓存处理的字节输入、输出流操作。本文演示代码段的执行环境基于JDK版本1.7

概述

  BufferedInputStream和BufferedOutputStream两个类完成带缓存处理的字节输入、输出流操作。BufferedInputStream继承的InputStream负责从外部资源将数据移动到Java程序中 ,BufferedOutputStream继承的OutputStream负责从Java程序中将数据移动到外部目标 。这两个类分别继承了FilterInputStream和FilterOutputStream这两个类,内部引用了InputStream和OutputStream,在初始化实例BufferedInputStream和BufferedOutputStream时分别需要传入一个InputStream和OutputStream。BufferedInputStream和BufferedOutputStream两个类在设计时借鉴了装饰器模式的设计思想。BufferedInputStream和BufferedOutputStream在读写数据时,从数据源一次读入多个字节的数据存储在内存缓存中,待内存缓存写满后再一次性将读入的数据写入到其他目的位置,然后再重新填充新的输入流数据到内存缓存中,直至整个输入流的数据全部读完。通过内存缓存作为临时存储空间向磁盘读写数据的处理方式避免了很多对磁盘IO的操作,而读写内存相对于读写磁盘来说,速度更快,由此带来的好处就是在读写大容量数据时通过带缓存的读取比不带缓存的读取效率更高,会节省很多时间。

继承关系

1
2
3
4
5
6
7
8
9
10
11
// BufferedInputStream
--java.lang.Object
--java.io.InputStream
--java.io.FilterInputStream
--java.io.BufferedInputStream

// BufferedOutputStream
--java.lang.Object
--java.io.OutputStream
--java.io.FilterOutputStream
--java.io.BufferedOutputStream

实现接口

类名 实现接口
BufferedInputStream Closeable, AutoCloseable
BufferedOutputStream Closeable, Flushable, AutoCloseable

部分方法

Fields

1
2
3
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class, byte[].class, "buf");

  用来做缓存内容的原子更新,保证多线程下对缓存数组访问的安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//当前缓存区的存储内容的字节数
protected int count;

//当前缓存区已经读到的位置。
//仅BufferedInputStream持有
protected int pos;

//当前缓存区已经被标记的位置,通过mark()方法会将pos的值保存在markpos里,
//通过reset()方法会将markpos的值赋给pos,这样就实现缓存区数据的重读操作。
//仅BufferedInputStream持有
protected int markpos = -1;

//它的值表示在调用mark方法后reset方法前最多允许读取的字节数,如果超过这个限制,
//系统将不会再维护markpos位置值。也就是说,将不会再重新读到markpos之后的数据内容,
//同时在调用reset方法时会报:Resetting to invalid mark异常。
//这个变量的存在可以用来限制buffer数组的长度,使之不会无限制的增长导致过多的内存占用。
//仅BufferedInputStream持有
protected int marklimit;

Constructor Summary

public BufferedInputStream(InputStream in)

1
2
3
public BufferedInputStream(InputStream in) {
this(in, defaultBufferSize);
}

public BufferedInputStream(InputStream in, int size)

1
2
3
4
5
6
7
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}

public BufferedOutputStream(OutputStream out)

1
2
3
public BufferedOutputStream(OutputStream out) {
this(out, 8192);
}

public BufferedOutputStream(OutputStream out, int size)

1
2
3
4
5
6
7
public BufferedOutputStream(OutputStream out, int size) {
super(out);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}

private void fill()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) /* no room left in buffer */
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else { /* grow buffer */
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}

  用来负责向缓冲区数组中填充新的原始流数据。这里有几个比较重要的判断:

  1. 通过 if (pos >= count) 判断判断是否读完缓冲区数组中的数据(这个判断在read方法中);
  2. 通过 if (markpos < 0) 判断判断输入流有没有被标记为需要重读的数据;
  3. 通过 if (pos >= buffer.length) 判断判断缓冲区数组中有没有多余的空间来容纳新数据;

第2行代码负责获取缓冲区数组的信息,第3 ~ 4行代码的判断用来处理用户未设置mark位置的场景。在这种场景下,不存在需要重读的数据,所以直接将pos位置的归零。第5 ~ 29行代码则考虑当前缓存数据中存在需要重读的数据,且pos已经超过了缓冲数组存储数据的有效长度,当前缓冲数组中已经没有空闲位置来容纳新读入的数据。第6 ~ 10行代码处理在markpos位置之前存在不需要重读可以被丢弃的额数,那么这种场景下只需要将自markpos位置之后的数据全部左移至缓冲区数组最左端即可,这样便释放了缓冲区数组的空闲存储位置来存放新读入的数据。第11 ~ 13行代码则处理markpos在缓冲区数组最左端且需要保存的数据满满的充斥着缓冲区,同时缓冲区数组存储数据的有效长度大于marklimit的场景,这种场景下由于数组长度(同时也是需要重读的数据内容长度)已经超过了允许重读读取的最大长度,所以这种情况下markpos已经不再起作用了,所以直接废弃现有markpos位置,重新归零。第15 ~ 29行则处理markpos在缓冲区数组最左端且需要保存的数据满满的充斥着缓冲区,同时缓冲区数组存储数据的有效长度不大于marklimit的场景,这种场景下markpos依旧有效,但是没有新位置容纳新读入的数据。所以需要执行扩容操作,将现有缓冲区数组的数据复制到一个新的长度更大的数组中。第31行代码则向缓冲区数组读入新的数据,最后更新缓冲区数组存储数据的有效长度。

public synchronized int read()

1
2
3
4
5
6
7
8
public synchronized int read() throws IOException {
if (pos >= count) {
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}

  获取数据的API。如果下一个读取位置比缓存数组中实际存储内容的有效长度大(pos >= count),那么就认为当前缓存中的内容已经被读完了,需要重新向缓存中写入新的数据,所以接下来会通过fill()方法来执行新数据的写入操作。如果写入操作执行后,下一个读取位置依旧比实际存储内容的有效长度大,那么会认为已经没有新的数据被写入了。最后会返回下一个位置的数据。但是在返回前需要与0XFF(1111 1111)执行一次与操作。因为缓存数组的类型是byte,而返回的类型是32位比特长度的int类型,在行5代码中,会返回-1标识数据已经读完。所以如果缓存数组中含有内容值-1时,需要确认-1的含义是什么。通过与0XFF相与,返回的内容中高24比特位都会被填充为0,所以即可辨别返回数据的含义。

public synchronized void write(int b)

1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized void write(int b) throws IOException {
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte)b;
}

private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}

  用于向外部数据写数据。如果当前缓冲区数组已经存满了内容,那么就调用flushBuffer()方法将缓冲区数组中的内容写入到输出流中,同时清空缓冲区数组。

public synchronized void flush()

1
2
3
4
public synchronized void flush() throws IOException {
flushBuffer();
out.flush();
}

  用户主动触发向输出流写数据的操作,而不用等到缓冲区数组被写满后再执行。

public synchronized int read(byte b[], int off, int len)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}

private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
/* If the requested length is at least as large as the buffer, and
if there is no mark/reset activity, do not bother to copy the
bytes into the local buffer. In this way buffered streams will
cascade harmlessly. */
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) return -1;
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}

  这个方法和前面谈论过的read()目的都是一样的,用于从原始输入流中读取数据。但是read()方法一次只会读取一个字节的数据到缓存区数组中,而这个方法会一次尽量读取多个字节的数据并返回,并且在没有读够指定长度的数据之前是不会结束操作的,除非数据流已经完全被读完。第5 ~ 9行负责做边界校验。第12 ~ 23行则每次分别从原始数据流中尽可能多的读数据到指定数组b中。第13行会得到从原始数据流中读取到的字节数量。第14 ~ 15行表示从原始数据流中读到的字节数是0,说明原始数据流中已经没有可读的数据了。第16行负责统计到当前循环已经得到的字节数。第17 ~ 18行表示已经获取到了指定数量的字节数,可以结束操作并返回了。第20 ~ 22行和第14 ~ 15行的含义相似,都表示当前原始数据流中已经没有新的可读数据了。

public synchronized void write(byte b[], int off, int len)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
/* If the request length exceeds the size of the output buffer,
flush the output buffer and then write the data directly.
In this way buffered streams will cascade harmlessly. */
flushBuffer();
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
flushBuffer();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}

  该方法会一次把多个字节的数据写入到外部流中。如果需写入数据的入参长度len大于缓冲区数组的长度,那么直接把缓冲区数组的内容写入到外部流中,同时把入参中的多个字节的数据也直接写入到外部流中。如果需写入数据的入参长度len大于缓冲区数组的空闲存储容量,那么把缓冲区数组的内容写入到外部流中。如果上述两种场景都不满足,那么就意味着当前缓冲区数组的空闲容量可以容纳全部需要写入的数据量,那么就把需要写入的数据存放到缓冲区数组中,同时更新缓冲区数组的长度。

public synchronized long skip(long n)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public synchronized long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;

if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos <0)
return getInIfOpen().skip(n);

// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}

long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}

  首先判断当前缓冲区数组是否有待读取的数据内容。如果没有的话就需要从数据源读取数据填充到缓冲区数组中,然后再次判断是否有待读取的数据内容。如果还是没有的话说明数据已经全部读完,所以第17行代码直接返回0。如果当前缓冲区数据中还有待读取的数据,那么判断入参n和剩余待读取数据长度的大小关系,并将pos移动到计算后的结果位置上。

public synchronized int available()

1
2
3
4
5
6
7
public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}

  返回剩余可读的字节数。

public synchronized void mark(int readlimit) and reset()

1
2
3
4
5
6
7
8
9
10
11
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}

public synchronized void reset() throws IOException {
getBufIfOpen(); // Cause exception if closed
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}

  mark()方法用来设置需要重读数据的起始pos位置,同时限制了可以重读的数据的长度。也就是说,一旦调用了方法之后,就只能读取从markpos位置开始,长度为readlimit的内容。

  reset()方法会将当前pos位置重新归为重读数据的起始位置,reset()方法调用之后再次读取时便会读到markpos位置起始的数据。

public void close()

1
2
3
4
5
6
7
8
9
10
11
12
13
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}

  关掉输入流,释放读取数据占用的资源。包括缓冲区数组和原始输入流对象,都要被关闭掉。在BufferedInputStream中,其他的几个主要方法都采用了synchronized关键字来修饰方法声明,以此来实现多线程下的数据安全访问。但是close()方法的声明中却没有采用该关键字,因为如果采用了synchronized关键字,会导致当前线程正在执行read方法时,如果想结束一个消耗巨大的读取操作却因为此时read()方法还没执行完,所以close方法必须等到read结束后才能执行,这会导致资源不能及时释放以致系统负载剧增。 但是该方法通过借助bufUpdater.compareAndSet()方法来保证多线程下的可以安全的释放输入流的相关资源。

涉及基础知识点

  1. AtomicReferenceFieldUpdater:

    AtomicReferenceFieldUpdater是一个基于反射的工具类,可以用来完成数据的原子更新。在本文中,通过使用AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class, “buf”)来生成一个BufferedInputStream类的byte数组类型的名字为buf的原子修改器,然后调用其compareAndSet()对buf字段进行修改。

    参考文献

  2. 阿里中间件团队博客. BufferedInputStream 源码学习笔记 [E]
  3. skywang12345. java io系列12之 BufferedInputStream(缓冲输入流)的认知、源码和示例 [E]
  4. zhihu. java的io缓冲区buffered,他们高效在哪里,底层还是一个一个字节读到底哪里高效了? [E]
  5. [中] 林信良. Java SE 7技術手冊[M]. 台北:碁峰, 2011.
  6. chen_sg. Java IO流學習總結三:緩衝流-BufferedInputStream、BufferedOutputStream [E]
  7. 时间的朋友. Java原子属性更新器AtomicReferenceFieldUpdater的使用 [E]
  8. tydhot. BufferedInputStream的read()方法源码解析 [E]
  9. 一mimo一. InputStream mark()方法readlimit参数真实含义 [E]
  10. DiaoCow. BufferedInputStream实现原理分析 [E]
  11. zhhphappy. BufferedInputStream 源码学习笔记 [E]
  12. stackoverflow. what is the use of mark read limit in bufferedinputstream [E]



------------- End of this article, thanks! -------------


  版权声明:本文由N.C.Lee创作和发表,采用署名(BY)-非商业性使用(NC)-相同方式共享(SA)国际许可协议进行许可,转载请注明作者及出处。
  本文作者为 N.C.Lee
  本文标题为 Java I/O 04 - BufferedInputStream & BufferedOutputStream
  本文链接为 https://marcuseddie.github.io/2018/Java-source-BufferedInput-OutputStream.html