* HTTPResponseOutputStream

- More caching
      - Stats cleanup
      - Max header length check
      - Catch OOM
      - Initializer cleanup
      - Javadoc
This commit is contained in:
zzz
2010-10-02 14:43:40 +00:00
parent 4c31c70298
commit 4456048e79
2 changed files with 44 additions and 25 deletions

View File

@@ -24,6 +24,9 @@ import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
* This does the transparent gzip decompression on the client side.
* Extended in I2PTunnelHTTPServer to do the compression on the server side.
*
* Simple stream for delivering an HTTP response to * Simple stream for delivering an HTTP response to
* the client, trivially filtered to make sure "Connection: close" * the client, trivially filtered to make sure "Connection: close"
* is always in the response. Perhaps add transparent handling of the * is always in the response. Perhaps add transparent handling of the
@@ -33,29 +36,27 @@ import net.i2p.util.Log;
* *
*/ */
class HTTPResponseOutputStream extends FilterOutputStream { class HTTPResponseOutputStream extends FilterOutputStream {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private ByteCache _cache;
protected ByteArray _headerBuffer; protected ByteArray _headerBuffer;
private boolean _headerWritten; private boolean _headerWritten;
private byte _buf1[]; private final byte _buf1[];
protected boolean _gzip; protected boolean _gzip;
private long _dataWritten; private long _dataWritten;
private InternalGZIPInputStream _in; private InternalGZIPInputStream _in;
private static final int CACHE_SIZE = 8*1024; private static final int CACHE_SIZE = 8*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
// OOM DOS prevention
private static final int MAX_HEADER_SIZE = 64*1024;
public HTTPResponseOutputStream(OutputStream raw) { public HTTPResponseOutputStream(OutputStream raw) {
super(raw); super(raw);
_context = I2PAppContext.getGlobalContext(); _context = I2PAppContext.getGlobalContext();
_context.statManager().createRateStat("i2ptunnel.httpCompressionRatio", "ratio of compressed size to decompressed size after transfer", "I2PTunnel", new long[] { 60*1000, 30*60*1000 }); _context.statManager().createRateStat("i2ptunnel.httpCompressionRatio", "ratio of compressed size to decompressed size after transfer", "I2PTunnel", new long[] { 60*60*1000 });
_context.statManager().createRateStat("i2ptunnel.httpCompressed", "compressed size transferred", "I2PTunnel", new long[] { 60*1000, 30*60*1000 }); _context.statManager().createRateStat("i2ptunnel.httpCompressed", "compressed size transferred", "I2PTunnel", new long[] { 60*60*1000 });
_context.statManager().createRateStat("i2ptunnel.httpExpanded", "size transferred after expansion", "I2PTunnel", new long[] { 60*1000, 30*60*1000 }); _context.statManager().createRateStat("i2ptunnel.httpExpanded", "size transferred after expansion", "I2PTunnel", new long[] { 60*60*1000 });
_log = _context.logManager().getLog(getClass()); _log = _context.logManager().getLog(getClass());
_cache = ByteCache.getInstance(8, CACHE_SIZE);
_headerBuffer = _cache.acquire(); _headerBuffer = _cache.acquire();
_headerWritten = false;
_gzip = false;
_dataWritten = 0;
_buf1 = new byte[1]; _buf1 = new byte[1];
} }
@@ -96,14 +97,20 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} }
} }
/** grow (and free) the buffer as necessary */ /**
private void ensureCapacity() { * grow (and free) the buffer as necessary
* @throws IOException if the headers are too big
*/
private void ensureCapacity() throws IOException {
if (_headerBuffer.getValid() >= MAX_HEADER_SIZE)
throw new IOException("Max header size exceeded: " + MAX_HEADER_SIZE);
if (_headerBuffer.getValid() + 1 >= _headerBuffer.getData().length) { if (_headerBuffer.getValid() + 1 >= _headerBuffer.getData().length) {
int newSize = (int)(_headerBuffer.getData().length * 1.5); int newSize = (int)(_headerBuffer.getData().length * 1.5);
ByteArray newBuf = new ByteArray(new byte[newSize]); ByteArray newBuf = new ByteArray(new byte[newSize]);
System.arraycopy(_headerBuffer.getData(), 0, newBuf.getData(), 0, _headerBuffer.getValid()); System.arraycopy(_headerBuffer.getData(), 0, newBuf.getData(), 0, _headerBuffer.getValid());
newBuf.setValid(_headerBuffer.getValid()); newBuf.setValid(_headerBuffer.getValid());
newBuf.setOffset(0); newBuf.setOffset(0);
// if we changed the ByteArray size, don't put it back in the cache
if (_headerBuffer.getData().length == CACHE_SIZE) if (_headerBuffer.getData().length == CACHE_SIZE)
_cache.release(_headerBuffer); _cache.release(_headerBuffer);
_headerBuffer = newBuf; _headerBuffer = newBuf;
@@ -219,7 +226,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
//out.flush(); //out.flush();
PipedInputStream pi = new PipedInputStream(); PipedInputStream pi = new PipedInputStream();
PipedOutputStream po = new PipedOutputStream(pi); PipedOutputStream po = new PipedOutputStream(pi);
new I2PAppThread(new Pusher(pi, out), "HTTP decompresser").start(); new I2PAppThread(new Pusher(pi, out), "HTTP decompressor").start();
out = po; out = po;
} }
@@ -231,13 +238,13 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_out = out; _out = out;
} }
public void run() { public void run() {
OutputStream to = null;
_in = null; _in = null;
long start = System.currentTimeMillis();
long written = 0; long written = 0;
ByteArray ba = null;
try { try {
_in = new InternalGZIPInputStream(_inRaw); _in = new InternalGZIPInputStream(_inRaw);
byte buf[] = new byte[8192]; ba = _cache.acquire();
byte buf[] = ba.getData();
int read = -1; int read = -1;
while ( (read = _in.read(buf)) != -1) { while ( (read = _in.read(buf)) != -1) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@@ -251,6 +258,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} catch (IOException ioe) { } catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Error decompressing: " + written + ", " + (_in != null ? _in.getTotalRead() + "/" + _in.getTotalExpanded() : ""), ioe); _log.warn("Error decompressing: " + written + ", " + (_in != null ? _in.getTotalRead() + "/" + _in.getTotalExpanded() : ""), ioe);
} catch (OutOfMemoryError oom) {
_log.error("OOM in HTTP Decompressor", oom);
} finally { } finally {
if (_log.shouldLog(Log.WARN) && (_in != null)) if (_log.shouldLog(Log.WARN) && (_in != null))
_log.warn("After decompression, written=" + written + _log.warn("After decompression, written=" + written +
@@ -259,23 +268,26 @@ class HTTPResponseOutputStream extends FilterOutputStream {
+ ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining() + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ ", finished=" + _in.getFinished() + ", finished=" + _in.getFinished()
: "")); : ""));
if (ba != null)
_cache.release(ba);
if (_out != null) try { if (_out != null) try {
_out.close(); _out.close();
} catch (IOException ioe) {} } catch (IOException ioe) {}
} }
long end = System.currentTimeMillis();
double compressed = (_in != null ? _in.getTotalRead() : 0); double compressed = (_in != null ? _in.getTotalRead() : 0);
double expanded = (_in != null ? _in.getTotalExpanded() : 0); double expanded = (_in != null ? _in.getTotalExpanded() : 0);
double ratio = 0; if (compressed > 0 && expanded > 0) {
if (expanded > 0) // only update the stats if we did something
ratio = compressed/expanded; double ratio = compressed/expanded;
_context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0);
_context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), end-start); _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0);
_context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, end-start); _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0);
_context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, end-start); }
} }
} }
/** just a wrapper to provide stats for debugging */
private static class InternalGZIPInputStream extends GZIPInputStream { private static class InternalGZIPInputStream extends GZIPInputStream {
public InternalGZIPInputStream(InputStream in) throws IOException { public InternalGZIPInputStream(InputStream in) throws IOException {
super(in); super(in);
@@ -294,6 +306,12 @@ class HTTPResponseOutputStream extends FilterOutputStream {
return 0; return 0;
} }
} }
/**
* From Inflater javadoc:
* Returns the total number of bytes remaining in the input buffer. This can be used to find out
* what bytes still remain in the input buffer after decompression has finished.
*/
public long getRemaining() { public long getRemaining() {
try { try {
return super.inf.getRemaining(); return super.inf.getRemaining();

View File

@@ -290,6 +290,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
} }
} }
/** just a wrapper to provide stats for debugging */
private static class InternalGZIPOutputStream extends GZIPOutputStream { private static class InternalGZIPOutputStream extends GZIPOutputStream {
public InternalGZIPOutputStream(OutputStream target) throws IOException { public InternalGZIPOutputStream(OutputStream target) throws IOException {
super(target); super(target);