关于 java.io.InputStreamReader 和 java.io.OutputStreamWriter 的部分笔记,这两个类完成了字节流和字符流之间的相互转化和读写操作,但是这两个类自身不负责任何操作,所有的操作均依赖于类StreamDecoder和StreamEncoder。本文演示代码段的执行环境基于JDK版本1.7 。
概述 InputStreamReader和OutputStreamWriter是字节流和字符流之间进行相互转化的一个桥梁和媒介。InputStreamReader负责将字节流转成字符流,其底层是通过StreamDecoder来完成所有操作的。OutputStreamWriter则负责将字符流转换成对应的字节流,其底层的实现则依赖于StreamEncoder。换言之,InputStreamReader和OutputStreamWriter自身并不需要做什么操作和处理,所有的操作和实现都是在底层的调用StreamDecoder和StreamEncoder中,InputStreamReader和OutputStreamWriter只是起到了一个封装和调用的作用。
继承关系 1 2 3 4 5 6 7 8 9 --java.lang.Object --java.io.Reader --java.io.InputStreamReader --java.lang.Object --java.io.Writer --java.io.OutputStreamWriter
实现接口
类名
实现接口
InputStreamReader
Closeable, AutoCloseable,Readable
OutputStreamWriter
Closeable, Flushable, Appendable,AutoCloseable
Constructor Summary 1 2 3 4 5 6 7 8 9 10 public InputStreamReader (InputStream in) { super (in); try { sd = StreamDecoder.forInputStreamReader(in, this , (String)null ); } catch (UnsupportedEncodingException e) { throw new Error(e); } }
初始化一个底层的StreamDecoder。在InputStreamReader中,所有的流读取和转换操作都是在流解码器(StreamDecoder)中完成的。所以在完成继承父类的构造函数后,即通过StreamDecoder 的方法forInputStreamReader来完成StreamDecoder的初始化工作。StreamDecoder的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private Charset cs;private CharsetDecoder decoder;private ByteBuffer bb;private InputStream in;private ReadableByteChannel ch;public static StreamDecoder forInputStreamReader (InputStream in, Object lock, String charsetName) throws UnsupportedEncodingException { String csn = charsetName; if (csn == null ) csn = Charset.defaultCharset().name(); try { if (Charset.isSupported(csn)) return new StreamDecoder(in, lock, Charset.forName(csn)); } catch (IllegalCharsetNameException x) { } throw new UnsupportedEncodingException(csn); } StreamDecoder(InputStream in, Object lock, Charset cs) { this (in, lock, cs.newDecoder() .onMalformedInput(CodingErrorAction.REPLACE) .onUnmappableCharacter(CodingErrorAction.REPLACE)); } StreamDecoder(InputStream in, Object lock, CharsetDecoder dec) { super (lock); this .cs = dec.charset(); this .decoder = dec; if (false && in instanceof FileInputStream) { ch = getChannel((FileInputStream)in); if (ch != null ) bb = ByteBuffer.allocateDirect(DEFAULT_BYTE_BUFFER_SIZE); } if (ch == null ) { this .in = in; this .ch = null ; bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE); } bb.flip(); }
StreamDecoder的路径信息如下:sun.nio.cs.StreamDecoder 。方法forInputStreamReader( InputStream in, Object lock, String charsetName)完成StreamDecoder的初始化工作。第11 ~ 13行代码负责指定一个编码字符集。第16行代码则创建了一个StreamDecoder,并返回给方法调用方。第22 ~ 27行代码中,根据传入的字符集创建一个字符集解码器,然后调用方法StreamDecoder (InputStream in, Object lock, CharsetDecoder dec)完成相应的初始化操作。由于StreamDecoder继承了Reader,所以第30行代码会首先调用父类的有参构造方法。第31 ~ 32行指定了字符集和字符集解码器。第40 ~ 44行代码则指定了最终调用的输入流和用到的字节缓冲数组的大小,初始化大小为8192 。第45行代码则表示缓冲数组目前为空,第一个可以写入的位置为0位置。
1 2 3 4 5 6 7 8 public InputStreamReader (InputStream in, String charsetName) throws UnsupportedEncodingException { super (in); if (charsetName == null ) throw new NullPointerException("charsetName" ); sd = StreamDecoder.forInputStreamReader(in, this , charsetName); }
初始化一个底层的StreamDecoder。这个方法中显示指定了需要使用的字符集编码。
1 2 3 4 5 6 public InputStreamReader (InputStream in, Charset cs) { super (in); if (cs == null ) throw new NullPointerException("charset" ); sd = StreamDecoder.forInputStreamReader(in, this , cs); }
初始化一个底层的StreamDecoder。这个方法中显示指定了需要使用的字符集编码。创建StreamDecoder调用的方法如下:
1 2 3 4 public static StreamDecoder forInputStreamReader (InputStream in, Object lock, Charset cs) { return new StreamDecoder(in, lock, cs); }
调用的构造方法和public InputStreamReader(InputStream in) 实际调用的方法一致。
1 2 3 4 5 6 public InputStreamReader (InputStream in, CharsetDecoder dec) { super (in); if (dec == null ) throw new NullPointerException("charset decoder" ); sd = StreamDecoder.forInputStreamReader(in, this , dec); }
初始化一个底层的StreamDecoder。这个方法中显示指定了需要使用的字符集解码器。CharsetDecoder可以将一个字节序列按照特定的字符集转换成一个16位的Unicode序列。创建StreamDecoder调用的方法如下:
1 2 3 4 5 public static StreamDecoder forInputStreamReader (InputStream in, Object lock, CharsetDecoder dec) { return new StreamDecoder(in, lock, dec); }
调用的构造方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 StreamDecoder(InputStream in, Object lock, CharsetDecoder dec) { super (lock); this .cs = dec.charset(); this .decoder = dec; if (false && in instanceof FileInputStream) { ch = getChannel((FileInputStream)in); if (ch != null ) bb = ByteBuffer.allocateDirect(DEFAULT_BYTE_BUFFER_SIZE); } if (ch == null ) { this .in = in; this .ch = null ; bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE); } bb.flip(); }
部分方法 public String getEncoding() 1 2 3 public String getEncoding () { return sd.getEncoding(); }
获取当前流使用的编码字符集信息,如果当前流已经被关闭,那么会返回null给接口调用方。具体实现位于StreamDecoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public String getEncoding () { if (isOpen()) return encodingName(); return null ; } private boolean isOpen () { return isOpen; } String encodingName () { return ((cs instanceof HistoricallyNamedCharset) ? ((HistoricallyNamedCharset)cs).historicalName() : cs.name()); }
在StreamDecoder的getEncoding()方法中,首先检查了当前输入流的开闭状态,如果当前输入流已经关闭,那么就返回null,否则就通过调用encodingName()来获取当前流采用的编码字符集信息。在encodingName()方法中,会判断当前编码字符集的种类,如果当前编码字符集是一个HistoricallyNamedCharset,那么就返回对应的historicalName,否则就返回对应的编码字符集名称。
public int read() 1 2 3 public int read () throws IOException { return sd.read(); }
从底层输入流中读取一个字节的内容并返回给调用方。具体实现位于StreamDecoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public int read () throws IOException { return read0(); } private boolean haveLeftoverChar = false ;private char leftoverChar;private int read0 () throws IOException { synchronized (lock) { if (haveLeftoverChar) { haveLeftoverChar = false ; return leftoverChar; } char cb[] = new char [2 ]; int n = read(cb, 0 , 2 ); switch (n) { case -1 : return -1 ; case 2 : leftoverChar = cb[1 ]; haveLeftoverChar = true ; case 1 : return cb[0 ]; default : assert false : n; return -1 ; } } }
StreamDecoder中的方法read()中所有的逻辑操作都位于方法read0()中。在StreamDecoder中,为了支持对由两个字符构成的字符进行编解码的处理,每次都需要从底层输入流中读取两个字符的数据。但是方法只会返回一个字符,所以需要通过haveLeftoverChar来标记是否存储了一个高位字符,如果存储了一个高位字符的话,则需要通过leftoverChar来返回存储的高位字符。
底层方法read0()完成数据的读取操作,且该方法内部整个方法体由synchronized关键字完成多线程环境下的锁操作。第11 ~ 15行代码中,如果存储了一个高位字符(haveLeftoverChar = true),那么就返回其存储的高位字符。第19行代码则从方法read(char cbuf[], int offset, int length)中读取两个字符。第20 ~ 32行则分情况向接口调用方返回读取到的字符信息。
public int read(char cbuf[], int offset, int length) 1 2 3 public int read (char cbuf[], int offset, int length) throws IOException { return sd.read(cbuf, offset, length); }
底层输入流中将读取的内容存储到入参数组cbuf中并返回实际读取到的数据长度给调用方。具体实现位于StreamDecoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private void ensureOpen () throws IOException { if (!isOpen) throw new IOException("Stream closed" ); } public int read (char cbuf[], int offset, int length) throws IOException { int off = offset; int len = length; synchronized (lock) { ensureOpen(); if ((off < 0 ) || (off > cbuf.length) || (len < 0 ) || ((off + len) > cbuf.length) || ((off + len) < 0 )) { throw new IndexOutOfBoundsException(); } if (len == 0 ) return 0 ; int n = 0 ; if (haveLeftoverChar) { cbuf[off] = leftoverChar; off++; len--; haveLeftoverChar = false ; n = 1 ; if ((len == 0 ) || !implReady()) return n; } if (len == 1 ) { int c = read0(); if (c == -1 ) return (n == 0 ) ? -1 : n; cbuf[off] = (char )c; return n + 1 ; } return n + implRead(cbuf, off, off + len); } }
该方法会读取入参length指定长度的数据内容并存储到入参cbuf中。方法体被synchronized包围保证了多线程环境的锁操作。第10行代码判断当前输入流的开闭状态,如果关闭则直接抛出一个流关闭异常给方法调用方。第11 ~ 14行代码负责参数的有效性校验,防止出现越界溢出的情况。第18行代码初始化一个计数器n,用来标记实际读取的字符数。第20 ~ 29行代码中,如果当前环境已经存储了一个高位字符,那么就取出存储的高位字符保存到cbuf中,这个字符会作为读取到到的第一个字符被返回。之后更新了off,len和n的值。如果len已经变为0,或者底层输入流中已经没有可读的数据内容,那么就认为读取结束,直接返回当前记录的实际读取的字符数内容。第31 ~ 38行代码中,如果剩余需要读取的字符长度为1,那么就调用上面提到的read0()方法,该方法会直接读取并返回一个字符内容。最后返回实际读取的字符数。第40行代码则继续从底层输入流中读取字符内容,并返回最终实际读取的字符长度。其调用的方法implRead() 处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private CharsetDecoder decoder;private ByteBuffer bb;int implRead (char [] cbuf, int off, int end) throws IOException { assert (end - off > 1 ); CharBuffer cb = CharBuffer.wrap(cbuf, off, end - off); if (cb.position() != 0 ) cb = cb.slice(); boolean eof = false ; for (;;) { CoderResult cr = decoder.decode(bb, cb, eof); if (cr.isUnderflow()) { if (eof) break ; if (!cb.hasRemaining()) break ; if ((cb.position() > 0 ) && !inReady()) break ; int n = readBytes(); if (n < 0 ) { eof = true ; if ((cb.position() == 0 ) && (!bb.hasRemaining())) break ; decoder.reset(); } continue ; } if (cr.isOverflow()) { assert cb.position() > 0 ; break ; } cr.throwException(); } if (eof) { decoder.reset(); } if (cb.position() == 0 ) { if (eof) return -1 ; assert false ; } return cb.position(); }
该方法用来把从底层输入流读取到的字节流转成字符流,然后再保存到数组cbuf中。第9行代码做了一个关于入参off和end的大小关系,避免越界溢出的情况发生,需要注意的是,这里的参数end和off一样,为下标值,而不是需要读取的长度值。第11 ~ 14行代码将入参cbuf封装到一个字符buffer中,对cbuf和字符buffer中任意一个的修改都会导致另外一个同时发生变更。第17 ~ 40行代码完成实际的字节数据转字符数据的操作,具体操作逻辑如下:
第18行代码将当前字节buffer中的数据转成字符数据,并存储到字符buffer中;
第19 ~ 21行代码中,如果字节buffer中的数据已经全部解码并存储到字符buffer中且已经读到了文件结束位置(eof = true),那么退出循环;
第22 ~ 23行代码中,如果字节buffer中的数据已经全部解码并存储到字符buffer中且字符buffer中已经没有空余空间来容纳新的字符数据,那么退出循环;
第24 ~ 25行代码中,如果字节buffer中的数据已经全部解码并存储到字符buffer中且底层输入流已经不再支持通过read方法获取数据,那么退出循环;
通过readBytes()方法从底层输入流中获取字节数据,数据会被存储在字节buffer中,并返回实际读取的数据长度;
如果通过readBytes()方法未读取到至少一个数据,那么设置eof = true,同时如果字节buffer中所有数据都已经读取完毕,那么退出循环;
继续循环,将字节buffer中的内容解码存储到字符buffer中;
第35 ~ 38行代码表示如果字符buffer中空间已满,那么退出循环。
最后返回字符buffer中实际读取的数据长度。其中涉及到的readBytes()方法的相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private ByteBuffer bb;private ReadableByteChannel ch;private InputStream in;private int readBytes () throws IOException { bb.compact(); try { if (ch != null ) { int n = ch.read(bb); if (n < 0 ) return n; } else { int lim = bb.limit(); int pos = bb.position(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0 ); assert rem > 0 ; int n = in.read(bb.array(), bb.arrayOffset() + pos, rem); if (n < 0 ) return n; if (n == 0 ) throw new IOException("Underlying input stream returned zero bytes" ); assert (n <= rem) : "n = " + n + ", rem = " + rem; bb.position(pos + n); } } finally { bb.flip(); } int rem = bb.remaining(); assert (rem != 0 ) : rem; return rem; }
该方法完成从底层输入流中读取字节数据到字节buffer中的操作处理。第6行代码会将字节buffer中未读取的数据全部移到buffer左端,这样右端会留出空间来容纳新数据。第8 ~ 12行代码通过ReadableByteChannel来获取数据,第14 ~ 27行代码则通过输入流来获取数据,其中,第15 ~ 19行代码完成可用空间的计算,第20行代码则会从底层输入流中读取数据并存储在字节buffer中。第26行代码则更新字节buffer中下一个可以新数据的位置值。第31行代码负责读写转换,会将position置为0,同时将limit移动到存储的最后一个字节数据的位置。最后返回字节buffer中包含的未读取的字节数据的长度。
public boolean ready() 1 2 3 public boolean ready () throws IOException { return sd.ready(); }
返回当前输入流是否可以通过read方法得到字符数据。具体实现位于StreamDecoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private ByteBuffer bb;private InputStream in;private ReadableByteChannel ch;public boolean ready () throws IOException { synchronized (lock) { ensureOpen(); return haveLeftoverChar || implReady(); } } boolean implReady () { return bb.hasRemaining() || inReady(); } private boolean inReady () { try { return (((in != null ) && (in.available() > 0 )) || (ch instanceof FileChannel)); } catch (IOException x) { return false ; } }
在StreamDecoder中,ready方法体被synchronized关键字包围,保证了多线程环境下一次只能被一个线程访问和操作。为了确认是否可以对外提供数据,首先会通过ensureOpen()判断当前流是否开启,只有在开启情况下才能对外提供数据。如果在之前的read方法调用过程中存储了高位字符,那么haveLeftoverChar的值就为true,可以继续通过read方法返回存储的高位字符,或者通过implReady()方法判断底层输入流中是否有未读取的数据流信息。
在implReady()方法中,如果字节buffer中有数据,那么认为当前流是可对外提供数据的,或者通过inReady()方法来判断底层输入流中是否有可读取的数据内容。
public void close() 1 2 3 public void close () throws IOException { sd.close(); }
关闭当前输入流。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private InputStream in;private ReadableByteChannel ch;public void close () throws IOException { synchronized (lock) { if (!isOpen) return ; implClose(); isOpen = false ; } } void implClose () throws IOException { if (ch != null ) ch.close(); else in.close(); }
关闭当前输入流,释放占用资源。
OutputStreamWriter Constructor Summary public OutputStreamWriter(OutputStream out, String charsetName) 1 2 3 4 5 6 7 8 public OutputStreamWriter (OutputStream out, String charsetName) throws UnsupportedEncodingException { super (out); if (charsetName == null ) throw new NullPointerException("charsetName" ); se = StreamEncoder.forOutputStreamWriter(out, this , charsetName); }
初始化一个底层的StreamEncoder。在OutputStreamWriter中,所有的流写入和转换操作都是在流编码器(StreamEncoder)中完成的。所以在完成继承父类的构造函数后,即通过StreamEncoder 的方法forOutputStreamWriter来完成StreamEncoder的初始化工作。StreamEncoder的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private Charset cs;private CharsetEncoder encoder;private ByteBuffer bb;private final OutputStream out;private WritableByteChannel ch;public static StreamEncoder forOutputStreamWriter (OutputStream out, Object lock, String charsetName) throws UnsupportedEncodingException { String csn = charsetName; if (csn == null ) csn = Charset.defaultCharset().name(); try { if (Charset.isSupported(csn)) return new StreamEncoder(out, lock, Charset.forName(csn)); } catch (IllegalCharsetNameException x) { } throw new UnsupportedEncodingException(csn); } private StreamEncoder (OutputStream out, Object lock, Charset cs) { this (out, lock, cs.newEncoder() .onMalformedInput(CodingErrorAction.REPLACE) .onUnmappableCharacter(CodingErrorAction.REPLACE)); } private StreamEncoder (OutputStream out, Object lock, CharsetEncoder enc) { super (lock); this .out = out; this .ch = null ; this .cs = enc.charset(); this .encoder = enc; if (false && out instanceof FileOutputStream) { ch = ((FileOutputStream)out).getChannel(); if (ch != null ) bb = ByteBuffer.allocateDirect(DEFAULT_BYTE_BUFFER_SIZE); } if (ch == null ) { bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE); } }
StreamEncoder的路径信息如下:sun.nio.cs.StreamEncoder 。方法forOutputStreamWriter( OutputStream out, Object lock, String charsetName)完成StreamEncoder的初始化工作。第10 ~ 11行代码负责指定一个编码字符集。第14行代码则创建了一个StreamEncoder,并返回给方法调用方。第22 ~ 24行代码中,根据传入的字符集创建一个字符集解码器,然后调用方法StreamEncoder(OutputStream out, Object lock, CharsetEncoder enc)完成相应的初始化操作。由于StreamEncoder继承了Writer,所以第30行代码会首先调用父类的有参构造方法。第29 ~ 32行指定了字符集和字符集解码器。第40 ~ 42行代码则指定了最终调用的输入流和用到的字节缓冲数组的大小,初始化大小为8192 。第45行代码则表示缓冲数组目前为空,第一个可以写入的位置为0位置。
public OutputStreamWriter(OutputStream out) 1 2 3 4 5 6 7 8 public OutputStreamWriter (OutputStream out) { super (out); try { se = StreamEncoder.forOutputStreamWriter(out, this , (String)null ); } catch (UnsupportedEncodingException e) { throw new Error(e); } }
初始化一个底层的StreamEncoder。这个方法中采用默认的字符集编码。
public OutputStreamWriter(OutputStream out, Charset cs) 1 2 3 4 5 6 public OutputStreamWriter (OutputStream out, Charset cs) { super (out); if (cs == null ) throw new NullPointerException("charset" ); se = StreamEncoder.forOutputStreamWriter(out, this , cs); }
初始化一个底层的StreamEncoder。这个方法中显示指定了需要使用的字符集。创建StreamEncoder调用的方法如下:
1 2 3 4 5 6 public static StreamEncoder forOutputStreamWriter (OutputStream out, Object lock, Charset cs) { return new StreamEncoder(out, lock, cs); }
public OutputStreamWriter(OutputStream out, CharsetEncoder enc) 1 2 3 4 5 6 public OutputStreamWriter (OutputStream out, CharsetEncoder enc) { super (out); if (enc == null ) throw new NullPointerException("charset encoder" ); se = StreamEncoder.forOutputStreamWriter(out, this , enc); }
初始化一个底层的StreamEncoder。这个方法中显示指定了需要使用的字符集编码器。创建StreamEncoder调用的方法如下:
1 2 3 4 5 6 public static StreamEncoder forOutputStreamWriter (OutputStream out, Object lock, CharsetEncoder enc) { return new StreamEncoder(out, lock, enc); }
部分方法 public String getEncoding() 1 2 3 public String getEncoding () { return se.getEncoding(); }
获取当前流使用的编码字符集信息,如果当前流已经被关闭,那么会返回null给接口调用方。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public String getEncoding () { if (isOpen()) return encodingName(); return null ; } private boolean isOpen () { return isOpen; } String encodingName () { return ((cs instanceof HistoricallyNamedCharset) ? ((HistoricallyNamedCharset)cs).historicalName() : cs.name()); }
在StreamEncoder的getEncoding()方法中,首先检查了当前输出流的开闭状态,如果当前输出流已经关闭,那么就返回null,否则就通过调用encodingName()来获取当前流采用的编码字符集信息。在encodingName()方法中,会判断当前编码字符集的种类,如果当前编码字符集是一个HistoricallyNamedCharset,那么就返回对应的historicalName,否则就返回对应的编码字符集名称。
void flushBuffer() 1 2 3 void flushBuffer () throws IOException { se.flushBuffer(); }
把输出缓冲区buffer的数据推送到底层字节输出流中。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void flushBuffer () throws IOException { synchronized (lock) { if (isOpen()) implFlushBuffer(); else throw new IOException("Stream closed" ); } } void implFlushBuffer () throws IOException { if (bb.position() > 0 ) writeBytes(); } private void writeBytes () throws IOException { bb.flip(); int lim = bb.limit(); int pos = bb.position(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0 ); if (rem > 0 ) { if (ch != null ) { if (ch.write(bb) != rem) assert false : rem; } else { out.write(bb.array(), bb.arrayOffset() + pos, rem); } } bb.clear(); }
在flushBuffer()方法中,整个方法体被synchronized包围,保证了多线程环境下的同时只有一个线程可以获得资源的访问控制。如果当前流是开启的,那么调用implFlushBuffer()方法完成数据向底层输出流的推送。在第11 ~ 13行代码中,此时字节bufferbb 处于写状态,所以一旦有内容写入,那么position满足大于0的条件,那么就调用writeBytes()方法完成具体的数据推送处理。
在writeBytes()方法中,第16行代码将字节buffer由写状态转变成读状态。第17 ~ 20行代码完成空间和起始位置的相关计算。第23 ~ 28行代码则会将字节buffer中的内容实际写入到对应的底层输出流中。最后将字节buffer的内容清空,完成所有操作。
public void write(int c) 1 2 3 public void write (int c) throws IOException { se.write(c); }
向底层输出流中写入一个字节的数据。因为在实际写入时,会首先将字符数据转成字节数据,然后将转换后的字节数据存储在一个字节buffer中,此时数据并没有真正的被写出去,所以需要通过调用 flushBuffer()或者close()方法将字节buffer的数据真正写入到某个输出流中,否则会发生数据丢失无法获取的情况。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private CharsetEncoder encoder;private ByteBuffer bb;public void write (int c) throws IOException { char cbuf[] = new char [1 ]; cbuf[0 ] = (char ) c; write(cbuf, 0 , 1 ); } private void ensureOpen () throws IOException { if (!isOpen) throw new IOException("Stream closed" ); } public void write (char cbuf[], int off, int len) throws IOException { synchronized (lock) { ensureOpen(); if ((off < 0 ) || (off > cbuf.length) || (len < 0 ) || ((off + len) > cbuf.length) || ((off + len) < 0 )) { throw new IndexOutOfBoundsException(); } else if (len == 0 ) { return ; } implWrite(cbuf, off, len); } } void implWrite (char cbuf[], int off, int len) throws IOException { CharBuffer cb = CharBuffer.wrap(cbuf, off, len); if (haveLeftoverChar) flushLeftoverChar(cb, false ); while (cb.hasRemaining()) { CoderResult cr = encoder.encode(cb, bb, false ); if (cr.isUnderflow()) { assert (cb.remaining() <= 1 ) : cb.remaining(); if (cb.remaining() == 1 ) { haveLeftoverChar = true ; leftoverChar = cb.get(); } break ; } if (cr.isOverflow()) { assert bb.position() > 0 ; writeBytes(); continue ; } cr.throwException(); } }
write(int c)最终依赖方法write(char cbuf[], int off, int len)完成字符数据向字节数据的转换以及写入操作。write(char cbuf[], int off, int len)底层直接依赖implWrite(char cbuf[], int off, int len)方法完成相关操作。第31 ~ 32行代码中,如果之前的写入操作中检测到了高位字符,那么就把保存下来的高位字符连同本次的字符数组中的内容一起写入到字节buffer中。第34 ~ 50行代码完成字符数据向字节数据的转换,将数据写入到底层的字节buffer中,最后将字节buffer中的数据写入到底层输出流中。第35行代码将字符数组中的内容转换成字节数据,并保存到字节buffer中。第36 ~ 43行代码如果最后字符数组中还留有一个字符,那么就标记高位字符并保存这个剩余的字符。第44 ~ 48行代码如果字节buffer没有空间来容纳新的字节数据,那么就将字节buffer中的内容输出到底层输出流中,清空字节buffer来容纳新的字节数据。下面的代码演示了如何处理高位字符的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private boolean haveLeftoverChar = false ;private char leftoverChar;private CharBuffer lcb = null ;private ByteBuffer bb;private void flushLeftoverChar (CharBuffer cb, boolean endOfInput) throws IOException { if (!haveLeftoverChar && !endOfInput) return ; if (lcb == null ) lcb = CharBuffer.allocate(2 ); else lcb.clear(); if (haveLeftoverChar) lcb.put(leftoverChar); if ((cb != null ) && cb.hasRemaining()) lcb.put(cb.get()); lcb.flip(); while (lcb.hasRemaining() || endOfInput) { CoderResult cr = encoder.encode(lcb, bb, endOfInput); if (cr.isUnderflow()) { if (lcb.hasRemaining()) { leftoverChar = lcb.get(); if (cb != null && cb.hasRemaining()) flushLeftoverChar(cb, endOfInput); return ; } break ; } if (cr.isOverflow()) { assert bb.position() > 0 ; writeBytes(); continue ; } cr.throwException(); } haveLeftoverChar = false ; }
第11 ~ 18行代码将保留的高位字符和入参字符buffer的内容都获取到,然后在第19行将字节buffer转成读模式。第21行代码将字符数据编码成字节数据并存入字节buffer中。如果仍有留余,继续调用自身直至数据全部换成完成。如果字节buffer没有空间容纳新数据,那么将字节buffer中的数据全部写入到底层输出流中清空字节buffer继续循环过程。
public void write(char cbuf[], int off, int len) 1 2 3 public void write (char cbuf[], int off, int len) throws IOException { se.write(cbuf, off, len); }
将一个字符数组cbuf中的内容写入到底层输出流中。
public void write(String str, int off, int len) 1 2 3 public void write (String str, int off, int len) throws IOException { se.write(str, off, len); }
将一个String字符串写入到底层输出流中。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 public void write (String str, int off, int len) throws IOException { if (len < 0 ) throw new IndexOutOfBoundsException(); char cbuf[] = new char [len]; str.getChars(off, off + len, cbuf, 0 ); write(cbuf, 0 , len); }
在方法中,会将String字符串先转成一个char数组,然后再调用上面提到的write(char cbuf[], int off, int len)将数据写入到底层输出流中。
public void flush() 1 2 3 public void flush () throws IOException { se.flush(); }
将底层字节buffer的数据输出到底层输出流中。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void flush () throws IOException { synchronized (lock) { ensureOpen(); implFlush(); } } void implFlushBuffer () throws IOException { if (bb.position() > 0 ) writeBytes(); } void implFlush () throws IOException { implFlushBuffer(); if (out != null ) out.flush(); }
在StreamEncoder的flush()方法,最终由方法implFlush()完成推送操作。在该方法中,首先调用方法implFlushBuffer()将底层字节buffer的内容输出到底层输出流中,输出完毕后关闭当前输出流,释放占用的资源。
public void close() 1 2 3 public void close () throws IOException { se.close(); }
关闭当前输入流。具体实现位于StreamEncoder中,相关代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public void close () throws IOException { synchronized (lock) { if (!isOpen) return ; implClose(); isOpen = false ; } } void implClose () throws IOException { flushLeftoverChar(null , true ); try { for (;;) { CoderResult cr = encoder.flush(bb); if (cr.isUnderflow()) break ; if (cr.isOverflow()) { assert bb.position() > 0 ; writeBytes(); continue ; } cr.throwException(); } if (bb.position() > 0 ) writeBytes(); if (ch != null ) ch.close(); else out.close(); } catch (IOException x) { encoder.reset(); throw x; } }
涉及基础知识点
StreamDecoder的其他构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static StreamDecoder forDecoder (ReadableByteChannel ch, CharsetDecoder dec, int minBufferCap) { return new StreamDecoder(ch, dec, minBufferCap); } StreamDecoder(ReadableByteChannel ch, CharsetDecoder dec, int mbc) { this .in = null ; this .ch = ch; this .decoder = dec; this .cs = dec.charset(); this .bb = ByteBuffer.allocate(mbc < 0 ? DEFAULT_BYTE_BUFFER_SIZE : (mbc < MIN_BYTE_BUFFER_SIZE ? MIN_BYTE_BUFFER_SIZE : mbc)); bb.flip(); }
StreamEncoder的其他构造方法:
1 2 3 4 5 6 7 8 9 10 11 public static StreamEncoder forEncoder (WritableByteChannel ch, CharsetEncoder enc, int minBufferCap) { return new StreamEncoder(ch, enc, minBufferCap); } private StreamEncoder (WritableByteChannel ch, CharsetEncoder enc, int mbc) { this .out = null ; this .ch = ch; this .cs = enc.charset(); this .encoder = enc; this .bb = ByteBuffer.allocate(mbc < 0 ? DEFAULT_BYTE_BUFFER_SIZE : mbc); }
参考文献
黄亿华. Java NIO学习笔记之二-图解ByteBuffer [E]
咕噜是个大胖子. JAVA基础知识之StreamDecoder流 [E]
咕噜是个大胖子. JAVA基础知识之StreamEncoder流 [E]
江湖人称小白哥. 通俗编程——白话NIO之Buffer [E]