- Close output stream in StreamForwarder to prevent lost data,

existing bug but made worse by larger pipe
This commit is contained in:
zzz
2011-09-19 23:37:49 +00:00
parent f186076fb0
commit 6630c29071
2 changed files with 62 additions and 17 deletions

View File

@@ -24,11 +24,9 @@ import net.i2p.util.Log;
*
*/
public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
private Log _log;
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
List<I2PSocket> sockList, Runnable onTimeout) {
super(s, i2ps, slock, initialI2PData, sockList, onTimeout);
_log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelHTTPClientRunner.class);
}
@Override
@@ -37,10 +35,22 @@ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
return new HTTPResponseOutputStream(raw);
}
/**
* Why is this overridden?
* Why flush in super but not here?
* Why do things in different order than in super?
*/
@Override
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
try {
i2pin.close();
} catch (IOException ioe) {
// ignore
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unable to close the i2p socket input stream: " + i2pin, ioe);
}
try {
i2pout.close();
} catch (IOException ioe) {
// ignore
@@ -49,14 +59,28 @@ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
}
try {
in.close();
} catch (IOException ioe) {
// ignore
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unable to close the browser input stream: " + in, ioe);
}
try {
out.close();
} catch (IOException ioe) {
// ignore
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unable to close the browser output stream: " + out, ioe);
}
i2ps.close();
s.close();
try {
i2ps.close();
} catch (IOException ioe) {
// ignore
}
try {
s.close();
} catch (IOException ioe) {
// ignore
}
t1.join(30*1000);
t2.join(30*1000);
}

View File

@@ -21,7 +21,7 @@ import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
private final Log _log;
protected final Log _log;
private static volatile long __runnerId;
private final long _runnerId;
@@ -89,7 +89,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
this.onTimeout = onTimeout;
lastActivityOn = -1;
startedOn = Clock.getInstance().now();
_log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelRunner.class);
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.INFO))
_log.info("I2PTunnelRunner started");
_runnerId = ++__runnerId;
@@ -203,22 +203,23 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
if (s != null)
s.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Could not close java socket", ex);
if (_log.shouldLog(Log.WARN))
_log.warn("Could not close java socket", ex);
}
if (i2ps != null) {
try {
i2ps.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Could not close I2PSocket", ex);
if (_log.shouldLog(Log.WARN))
_log.warn("Could not close I2PSocket", ex);
}
i2ps.setSocketErrorListener(null);
}
}
}
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
try {
out.flush();
} catch (IOException ioe) {
@@ -229,12 +230,28 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} catch (IOException ioe) {
// ignore
}
in.close();
i2pin.close();
try {
in.close();
} catch (IOException ioe) {
// ignore
}
try {
i2pin.close();
} catch (IOException ioe) {
// ignore
}
// ok, yeah, there's a race here in theory, if data comes in after flushing and before
// closing, but its better than before...
s.close();
i2ps.close();
try {
s.close();
} catch (IOException ioe) {
// ignore
}
try {
i2ps.close();
} catch (IOException ioe) {
// ignore
}
t1.join(30*1000);
t2.join(30*1000);
}
@@ -349,7 +366,11 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
_log.warn(direction + ": Error closing input stream", ex);
}
try {
out.flush();
// Thread must close() before exiting for a PipedOutputStream,
// or else input end gives up and we have data loss.
// http://techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/
//out.flush();
out.close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error flushing to close", ioe);