Java I/O 17 - PipedReader & PipedWriter

  关于 java.io.PipedReader java.io.PipedWriter 的部分笔记,用来完成不同线程间的数据传送和通信。读写操作分别在不同的线程中向缓冲区buffer中写入数据、读取数据。本文演示代码段的执行环境基于JDK版本1.7

概述

  PipedReader和PipedWriter两个类可以同时使用用来满足线程间的数据传送和通信需求。读和写操作各自使用一个线程,向缓冲区buffer中写入字符内容,然后唤醒读线程读取内容,然后再唤醒写线程继续写入字符内容,然后再唤醒读线程读取内容,依次直到满足某个条件读取/写入操作结束。

继承关系

1
2
3
4
5
6
7
8
9
// PipedReader
--java.lang.Object
--java.io.Reader
--java.io.PipedReader

// PipedWriter
--java.lang.Object
--java.io.Writer
--java.io.PipedWriter

实现接口

类名 实现接口
PipedReader Closeable, AutoCloseable,Readable
PipedWriter Closeable, Flushable, AutoCloseable, Appendable

PipedReader

Constructor Summary

public StringReader(String s)

1
2
3
public PipedReader(PipedWriter src) throws IOException {
this(src, DEFAULT_PIPE_SIZE);
}

  初始化一个管道输入流。底层调用的是方法public PipedReader(PipedWriter src, int pipeSize),在初始化底层缓冲区buffer时,传入的是默认长度1024。

public PipedReader(PipedWriter src, int pipeSize)

1
2
3
4
public PipedReader(PipedWriter src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}

  初始化一个管道输入流。首先初始化管道缓冲区buffer,在初始化完成后,调用方法connect(PipedWriter src)完成输入、输出管道的互联。

public PipedReader()

1
2
3
public PipedReader() {
initPipe(DEFAULT_PIPE_SIZE);
}

  初始化一个管道输入流。由于入参中没有管道输出流,所以只需要初始化缓冲区buffer即可,buffer的长度为默认1024。

public PipedReader(int pipeSize)

1
2
3
public PipedReader(int pipeSize) {
initPipe(pipeSize);
}

  初始化一个管道输入流。由于入参中没有管道输出流,所以只需要初始化缓冲区buffer即可,buffer的长度为入参pipeSize指定的长度。

private void initPipe(int pipeSize)

1
2
3
4
5
6
private void initPipe(int pipeSize) {
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe size <= 0");
}
buffer = new char[pipeSize];
}

  初始化管道缓冲区buffer,长度由参数pipeSize确定。

部分方法

public void connect(PipedWriter src)

1
2
3
public void connect(PipedWriter src) throws IOException {
src.connect(this);
}

  建立一个reader端到writer端之间的管道连接。如果当前reader未曾与其他管道连接,那么该方法会建立一个有效的连接,否则会抛出一个IOException异常。具体实现在PipedWriter.connect(PipedReader snk)中。

synchronized void receive(int c)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
synchronized void receive(int c) throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}

writeSide = Thread.currentThread();
while (in == out) {
if ((readSide != null) && !readSide.isAlive()) {
throw new IOException("Pipe broken");
}
/* full: kick any waiting readers */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
if (in < 0) {
in = 0;
out = 0;
}
buffer[in++] = (char) c;
if (in >= buffer.length) {
in = 0;
}
}

  接收一个字符数据并存入到底层缓冲区buffer中。第2 ~ 3行代码判断当前输入/输出管道是否正在连接,第4 ~ 5行代码判断输入/输出管道是否关闭,第6 ~ 7行代码判断输出管道(PipedReader)是否正常。

  第10行代码将当前线程切换到输入线程(写线程),如果当前缓冲区buffer已满(in == out),那么唤醒读线程(输出线程),暂停写线程,让读线程运行并读取缓冲区buffer中的内容并释放缓冲区buffer空间。第23 ~ 26行代码中则将buffer清空,所有读写标记位都归零。第27行代码将入参字符写入到缓冲区buffer中。第28 ~ 30行代码,如果写标记到达缓冲区buffer尾部,那么将其置于buffer头部,继续写数据。

synchronized void receive(char c[], int off, int len)

1
2
3
4
5
synchronized void receive(char c[], int off, int len)  throws IOException {
while (--len >= 0) {
receive(c[off++]);
}
}

  接收一个字符数组的数据并存入到底层缓冲区buffer中。底层调用方法是receive(int c)。

synchronized void receivedLast()

1
2
3
4
synchronized void receivedLast() {
closedByWriter = true;
notifyAll();
}

  通知所有等待的线程(读线程)所有内容都已经写入到缓冲区buffer中,同时关闭输入线程(写线程),即PipedWriter。

public synchronized int read()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public synchronized int read()  throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}

readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++];
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}

  读一个字符内容并返回。第2 ~ 3行代码判断当前输入/输出管道是否正在连接,第4 ~ 5行代码判断输出管道是否关闭,第6 ~ 9行代码判断输入管道(PipedWriter)是否正常。

  第11行代码将当前线程切换到读线程。如果缓冲区buffer为空,那么检查写线程是否可用,如果写线程关闭则返回-1标识已读取到文件结束位置。重试最多两次检查写线程是否正常可用,唤醒所有线程,同时暂停当前读线程以便让写线程工作向缓冲区buffer中写入新的字符数据内容。

  之后读取新的内容并返回给方法调用方,如果已经读到了buffer尾部,那么将回到头部继续读取。如果已经读完了缓冲区中的所有内容,那么将in标记为-1标识当前缓冲区buffer中已经没有未读取的数据了。

public synchronized int read(char cbuf[], int off, int len)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public synchronized int read(char cbuf[], int off, int len)  throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}

if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
cbuf[off] = (char)c;
int rlen = 1;
while ((in >= 0) && (--len > 0)) {
cbuf[off + rlen] = buffer[out++];
rlen++;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}

  读取字符内容并保存到入参字符数组cbuf中。第2 ~ 3行代码判断当前输入/输出管道是否正在连接,第4 ~ 5行代码判断输出管道是否关闭,第6 ~ 9行代码判断输入管道(PipedWriter)是否正常。第11 ~ 16行代码完成入参的有效性校验,避免发生越界溢出。

  第19 ~ 22行代码中尝试性的从缓冲区buffer中读取一个字符内容,并判断是否为EOF状态。这个操作个人猜测是用来检查缓冲区buffer中是否有可供读取的内容的,如果没有可读取的字符,那么会直接切换到写线程向缓冲区buffer中写入新的字符数据。第23 ~ 35行代码则从缓冲区buffer中依次读取字符内容,并将读取到的内容保存到cbuf中。最后返回实际读取的字符长度。

public synchronized boolean ready()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized boolean ready() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
if (in < 0) {
return false;
} else {
return true;
}
}

  通知方法调用方是否可以通过read方法读取字符数据。第2 ~ 3行代码判断当前输入/输出管道是否正在连接,第4 ~ 5行代码判断输出管道是否关闭,第6 ~ 9行代码判断输入管道(PipedWriter)是否正常。如果in >= 0,那么则表示缓冲区buffer中已有未读取的字符内容,否则缓冲区buffer为空。

public void close()

1
2
3
4
public void close()  throws IOException {
in = -1;
closedByReader = true;
}

  关闭当前输出管道。

PipedWriter

Constructor Summary

public PipedWriter(PipedReader snk)

1
2
3
public PipedWriter(PipedReader snk)  throws IOException {
connect(snk);
}

  初始化一个管道输入流。该流用于向缓冲区buffer中写入字符数据内容。初始化过程中,当前流会和管道输出流建立一个连接。

public PipedWriter()

1
2
public PipedWriter() {
}

  初始化一个管道输入流。

部分方法

public synchronized void connect(PipedReader snk)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void connect(PipedReader snk) throws IOException {
if (snk == null) {
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
} else if (snk.closedByReader || closed) {
throw new IOException("Pipe closed");
}

sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}

  建立一个管道输出流和输入流的连接。第2 ~ 3行代码用来判断管道输出流(读管道)是否存在,第4 ~ 5行代码用来判断当前管道输入流是否已经和其他管道输出流建立了连接,或者入参管道输出流是否已经处于连接状态;第6 ~ 7行代码则检查管道流的开启状态。

  第10 ~ 13行代码用来建立并维系一个入参管道输出流和当前管道输入流的连接关系,并初始化管道输出流的读写标记位值。

public void write(int c)

1
2
3
4
5
6
public void write(int c)  throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
}
sink.receive(c);
}

  向缓冲区buf中写入一个字符数据。第2 ~ 4行代码完成管道状态的有效性校验。实际上调用管道输出流的方法PipedReader.receive(int c)来实现功能的。

public void write(char cbuf[], int off, int len)

1
2
3
4
5
6
7
8
public void write(char cbuf[], int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
} else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
}
sink.receive(cbuf, off, len);
}

  向缓冲区buf中写入入参cbuf中含有的字符数据。第2 ~ 6行代码完成管道状态和参数的有效性校验。底层通过调用方法PipedReader.receive(char c[], int off, int len)来实现功能。

public synchronized void flush()

1
2
3
4
5
6
7
8
9
10
public synchronized void flush() throws IOException {
if (sink != null) {
if (sink.closedByReader || closed) {
throw new IOException("Pipe closed");
}
synchronized (sink) {
sink.notifyAll();
}
}
}

  将缓冲区buffer中的数据推到目标输出位置上。这里实际上是唤醒所有的读线程,通过读线程去读取缓冲区buffer中的内容。

public void close()

1
2
3
4
5
6
public void close()  throws IOException {
closed = true;
if (sink != null) {
sink.receivedLast();
}
}

  关闭当前输出流。

涉及基础知识点

  1. NIL

参考文献

  1. NIL




------------- End of this article, thanks! -------------


  版权声明:本文由Nathan R. Lee创作和发表,采用署名(BY)-非商业性使用(NC)-相同方式共享(SA)国际许可协议进行许可,转载请注明作者及出处。
  本文作者为 Nathan R. Lee
  本文标题为 Java I/O 17 - PipedReader & PipedWriter
  本文链接为 https://marcuseddie.github.io/2018/java-PipedReader-PipedWriter.html