Compare commits
2 Commits
i2p-2.7.0-
...
http-persi
Author | SHA1 | Date | |
---|---|---|---|
![]() |
04c0c22ab8 | ||
![]() |
9f04bb04b4 |
@@ -37,7 +37,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
private boolean _headerWritten;
|
||||
private final byte _buf1[];
|
||||
protected boolean _gzip;
|
||||
protected long _dataExpected;
|
||||
protected long _dataExpected = -1;
|
||||
protected boolean _keepAlive;
|
||||
/** lower-case, trimmed */
|
||||
protected String _contentType;
|
||||
/** lower-case, trimmed */
|
||||
@@ -61,6 +62,44 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
_buf1 = new byte[1];
|
||||
}
|
||||
|
||||
/**
|
||||
* Optionally keep browser socket alive and call callback when we're done.
|
||||
* BROWSER-SIDE ONLY for now.
|
||||
*
|
||||
* @param allowKeepAlive we may, but are not required to, keep the browser-side socket alive
|
||||
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
|
||||
* @since 0.9.61
|
||||
*/
|
||||
public HTTPResponseOutputStream(OutputStream raw, boolean allowKeepAlive, boolean isHead) {
|
||||
this(raw);
|
||||
_keepAlive = allowKeepAlive;
|
||||
if (isHead)
|
||||
_dataExpected = 0;
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Before headers: keepalive? " + _keepAlive);
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we keep the input stream alive when done?
|
||||
*
|
||||
* @return false always, not yet supported
|
||||
* @since 0.9.61
|
||||
*/
|
||||
public boolean getKeepAliveIn() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we keep the output stream alive when done?
|
||||
* Only supported for the browser socket side.
|
||||
* I2P socket on server side not supported yet.
|
||||
*
|
||||
* @since 0.9.61
|
||||
*/
|
||||
public boolean getKeepAliveOut() {
|
||||
return _keepAlive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int c) throws IOException {
|
||||
_buf1[0] = (byte)c;
|
||||
@@ -142,10 +181,9 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
|
||||
/** ok, received, now munge & write it */
|
||||
private void writeHeader() throws IOException {
|
||||
String responseLine = null;
|
||||
|
||||
boolean connectionSent = false;
|
||||
boolean proxyConnectionSent = false;
|
||||
boolean chunked = false;
|
||||
|
||||
int lastEnd = -1;
|
||||
byte[] data = _headerBuffer.getData();
|
||||
@@ -153,11 +191,26 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
for (int i = 0; i < valid; i++) {
|
||||
if (data[i] == NL) {
|
||||
if (lastEnd == -1) {
|
||||
responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
|
||||
String responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
|
||||
responseLine = filterResponseLine(responseLine);
|
||||
responseLine = (responseLine.trim() + "\r\n");
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Response: " + responseLine.trim());
|
||||
// Persistent conn requires HTTP/1.1
|
||||
if (_keepAlive && !responseLine.startsWith("HTTP/1.1 "))
|
||||
_keepAlive = false;
|
||||
// force zero datalen for 1xx, 204, 304 (RFC 2616 sec. 4.4)
|
||||
// so that these don't prevent keepalive
|
||||
int sp = responseLine.indexOf(" ");
|
||||
if (sp > 0) {
|
||||
String s = responseLine.substring(sp + 1);
|
||||
if (s.startsWith("1") || s.startsWith("204") || s.startsWith("304"))
|
||||
_dataExpected = 0;
|
||||
} else {
|
||||
// no status?
|
||||
_keepAlive = false;
|
||||
}
|
||||
|
||||
out.write(DataHelper.getUTF8(responseLine));
|
||||
} else {
|
||||
for (int j = lastEnd+1; j < i; j++) {
|
||||
@@ -182,12 +235,18 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
// pass through for websocket
|
||||
out.write(DataHelper.getASCII("Connection: " + val + "\r\n"));
|
||||
proxyConnectionSent = true;
|
||||
// Disable persistence
|
||||
_keepAlive = false;
|
||||
} else {
|
||||
out.write(CONNECTION_CLOSE);
|
||||
// Strip to allow persistence, replace to disallow
|
||||
if (!_keepAlive)
|
||||
out.write(CONNECTION_CLOSE);
|
||||
}
|
||||
connectionSent = true;
|
||||
} else if ("proxy-connection".equals(lcKey)) {
|
||||
out.write(PROXY_CONNECTION_CLOSE);
|
||||
// Strip to allow persistence, replace to disallow
|
||||
if (!_keepAlive)
|
||||
out.write(PROXY_CONNECTION_CLOSE);
|
||||
proxyConnectionSent = true;
|
||||
} else if ("content-encoding".equals(lcKey) && "x-i2p-gzip".equals(val.toLowerCase(Locale.US))) {
|
||||
_gzip = true;
|
||||
@@ -206,6 +265,9 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
} else if ("content-encoding".equals(lcKey)) {
|
||||
// save for compress decision on server side
|
||||
_contentEncoding = val.toLowerCase(Locale.US);
|
||||
} else if ("transfer-encoding".equals(lcKey) && val.toLowerCase(Locale.US).contains("chunked")) {
|
||||
// save for keepalive decision on client side
|
||||
chunked = true;
|
||||
} else if ("set-cookie".equals(lcKey)) {
|
||||
String lcVal = val.toLowerCase(Locale.US);
|
||||
if (lcVal.contains("domain=b32.i2p") ||
|
||||
@@ -228,21 +290,48 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
lastEnd = i;
|
||||
}
|
||||
}
|
||||
|
||||
// Now make the final keepalive decision
|
||||
if (_keepAlive) {
|
||||
// we need one but not both
|
||||
if ((chunked && _dataExpected >= 0) ||
|
||||
(!chunked && _dataExpected < 0))
|
||||
_keepAlive = false;
|
||||
}
|
||||
|
||||
if (!connectionSent)
|
||||
if (!connectionSent && !_keepAlive)
|
||||
out.write(CONNECTION_CLOSE);
|
||||
if (!proxyConnectionSent)
|
||||
if (!proxyConnectionSent && !_keepAlive)
|
||||
out.write(PROXY_CONNECTION_CLOSE);
|
||||
|
||||
finishHeaders();
|
||||
|
||||
boolean shouldCompress = shouldCompress();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress);
|
||||
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress + " keepalive? " + _keepAlive);
|
||||
|
||||
if (data.length == CACHE_SIZE)
|
||||
_cache.release(_headerBuffer);
|
||||
_headerBuffer = null;
|
||||
|
||||
/*****
|
||||
// TODO
|
||||
// Setup the keepalive streams
|
||||
// Until we have keepalive for the i2p socket, the client side
|
||||
// does not need to do this, we just wait for the socket to close.
|
||||
// Until we have keepalive for the server socket, the server side
|
||||
// does not need to do this, we just wait for the socket to close.
|
||||
if (_keepAlive) {
|
||||
if (_dataExpected > 0) {
|
||||
// content-length
|
||||
// filter output stream to count the data
|
||||
} else {
|
||||
// chunked
|
||||
// filter output stream to look for the end
|
||||
}
|
||||
}
|
||||
****/
|
||||
|
||||
if (shouldCompress) {
|
||||
beginProcessing();
|
||||
}
|
||||
@@ -257,7 +346,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Closing " + out + " compressed? " + shouldCompress(), new Exception("I did it"));
|
||||
_log.info("Closing " + out + " compressed? " + shouldCompress() + " keepalive? " + _keepAlive, new Exception("I did it"));
|
||||
synchronized(this) {
|
||||
// synch with changing out field below
|
||||
super.close();
|
||||
|
@@ -40,6 +40,7 @@ import net.i2p.i2ptunnel.localServer.LocalHTTPServer;
|
||||
import net.i2p.util.ConvertToHash;
|
||||
import net.i2p.util.DNSOverHTTPS;
|
||||
import net.i2p.util.EventDispatcher;
|
||||
import net.i2p.util.InternalSocket;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.PortMapper;
|
||||
|
||||
@@ -99,6 +100,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
private static final String PROP_UA_I2P = "httpclient.userAgent.i2p";
|
||||
private static final String PROP_UA_CLEARNET = "httpclient.userAgent.outproxy";
|
||||
|
||||
// how long to wait for another request on the same socket
|
||||
// Firefox timeout appears to be about 114 seconds, so it will close before we do.
|
||||
private static final int BROWSER_KEEPALIVE_TIMEOUT = 2*60*1000;
|
||||
|
||||
/**
|
||||
* These are backups if the xxx.ht error page is missing.
|
||||
*/
|
||||
@@ -405,11 +410,28 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
String currentProxy = null;
|
||||
long requestId = __requestId.incrementAndGet();
|
||||
boolean shout = false;
|
||||
boolean isConnect = false;
|
||||
boolean isHead = false;
|
||||
I2PSocket i2ps = null;
|
||||
try {
|
||||
s.setSoTimeout(INITIAL_SO_TIMEOUT);
|
||||
out = s.getOutputStream();
|
||||
InputReader reader = new InputReader(s.getInputStream());
|
||||
int requestCount = 0;
|
||||
// Will be set to false for non-GET/HEAD, non-HTTP/1.1,
|
||||
// Connection: close, Proxy-Connection: close,
|
||||
// or after analysis of the response headers in HTTPResponseOutputStream,
|
||||
// or on errors in I2PTunnelRunner.
|
||||
// Do not set keepalive for internal sockets.
|
||||
boolean keepalive = !(s instanceof InternalSocket);
|
||||
// indent -------------------------------------------------------------
|
||||
do {
|
||||
// indent -------------------------------------------------------------
|
||||
if (requestCount > 0) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Keepalive, awaiting request #" + requestCount);
|
||||
s.setSoTimeout(BROWSER_KEEPALIVE_TIMEOUT);
|
||||
}
|
||||
String line, method = null, protocol = null, host = null, destination = null;
|
||||
String hostLowerCase = null;
|
||||
StringBuilder newRequest = new StringBuilder();
|
||||
@@ -431,9 +453,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
|
||||
String lowercaseLine = line.toLowerCase(Locale.US);
|
||||
|
||||
if(method == null) { // first line (GET /base64/realaddr)
|
||||
if(method == null) {
|
||||
// first line GET/POST/etc.
|
||||
if(_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(getPrefix(requestId) + "First line [" + line + "]");
|
||||
_log.debug(getPrefix(requestId) + "req #" + requestCount + " first line [" + line + "]");
|
||||
}
|
||||
|
||||
String[] params = DataHelper.split(line, " ", 3);
|
||||
@@ -481,12 +504,19 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
****/
|
||||
}
|
||||
|
||||
method = params[0];
|
||||
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
method = params[0].toUpperCase(Locale.US);
|
||||
if (method.equals("HEAD")) {
|
||||
isHead = true;
|
||||
} else if (method.equals("CONNECT")) {
|
||||
// this makes things easier later, by spoofing a
|
||||
// protocol so the URI parser find the host and port
|
||||
// For in-net outproxy, will be fixed up below
|
||||
request = "https://" + request + '/';
|
||||
isConnect = true;
|
||||
keepalive = false;
|
||||
} else if (!method.equals("GET")) {
|
||||
// POST, PUT, ...
|
||||
keepalive = false;
|
||||
}
|
||||
|
||||
// Now use the Java URI parser
|
||||
@@ -568,6 +598,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
}
|
||||
|
||||
String protocolVersion = params[2];
|
||||
if (!protocolVersion.equals("HTTP/1.1"))
|
||||
keepalive = false;
|
||||
|
||||
protocol = requestURI.getScheme();
|
||||
host = requestURI.getHost();
|
||||
@@ -650,8 +682,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
break;
|
||||
}
|
||||
******/
|
||||
} else if ("https".equals(protocol) ||
|
||||
method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
} else if ("https".equals(protocol) || isConnect) {
|
||||
remotePort = 443;
|
||||
} else {
|
||||
remotePort = 80;
|
||||
@@ -815,19 +846,21 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
host = getHostName(addressHelper);
|
||||
}
|
||||
|
||||
// now strip everything but path and query from URI
|
||||
targetRequest = requestURI.toASCIIString();
|
||||
String newURI = requestURI.getRawPath();
|
||||
if(query != null) {
|
||||
newURI += '?' + query;
|
||||
}
|
||||
try {
|
||||
requestURI = new URI(newURI);
|
||||
} catch(URISyntaxException use) {
|
||||
// shouldnt happen
|
||||
_log.warn(request, use);
|
||||
method = null;
|
||||
break;
|
||||
if (!isConnect) {
|
||||
// now strip everything but path and query from URI
|
||||
String newURI = requestURI.getRawPath();
|
||||
if(query != null) {
|
||||
newURI += '?' + query;
|
||||
}
|
||||
try {
|
||||
requestURI = new URI(newURI);
|
||||
} catch(URISyntaxException use) {
|
||||
// shouldnt happen
|
||||
_log.warn(request, use);
|
||||
method = null;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// end of (host endsWith(".i2p"))
|
||||
@@ -853,8 +886,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
int rPort = requestURI.getPort();
|
||||
if (rPort > 0)
|
||||
remotePort = rPort;
|
||||
else if ("https".equals(protocol) ||
|
||||
method.toUpperCase(Locale.US).equals("CONNECT"))
|
||||
else if ("https".equals(protocol) || isConnect)
|
||||
remotePort = 443;
|
||||
else
|
||||
remotePort = 80;
|
||||
@@ -873,8 +905,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
if(_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("Before selecting outproxy for " + host);
|
||||
}
|
||||
if ("https".equals(protocol) ||
|
||||
method.toUpperCase(Locale.US).equals("CONNECT"))
|
||||
if ("https".equals(protocol) || isConnect)
|
||||
currentProxy = selectSSLProxy(hostLowerCase);
|
||||
else
|
||||
currentProxy = selectProxy(hostLowerCase);
|
||||
@@ -930,16 +961,22 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
break;
|
||||
}
|
||||
|
||||
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
if (isConnect) {
|
||||
// fix up the change to requestURI above to get back to the original host:port
|
||||
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
|
||||
if (usingInternalOutproxy || usingWWWProxy)
|
||||
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
|
||||
else
|
||||
line = method + ' ' + host + ':' + remotePort + ' ' + protocolVersion;
|
||||
} else {
|
||||
line = method + ' ' + requestURI.toASCIIString() + ' ' + protocolVersion;
|
||||
}
|
||||
|
||||
if(_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(getPrefix(requestId) + "REQ : \"" + request + "\"");
|
||||
_log.debug(getPrefix(requestId) + "REQURI: \"" + requestURI + "\"");
|
||||
_log.debug(getPrefix(requestId) + "NEWREQ: \"" + line + "\"");
|
||||
_log.debug(getPrefix(requestId) + "HOST : \"" + host + "\"");
|
||||
_log.debug(getPrefix(requestId) + "RPORT : \"" + remotePort + "\"");
|
||||
_log.debug(getPrefix(requestId) + "DEST : \"" + destination + "\"");
|
||||
}
|
||||
|
||||
@@ -950,11 +987,16 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
if (lowercaseLine.contains("upgrade")) {
|
||||
// pass through for websocket
|
||||
preserveConnectionHeader = true;
|
||||
keepalive = false;
|
||||
} else {
|
||||
if (lowercaseLine.contains("close"))
|
||||
keepalive = false;
|
||||
continue;
|
||||
}
|
||||
} else if (lowercaseLine.startsWith("keep-alive: ") ||
|
||||
lowercaseLine.startsWith("proxy-connection: ")) {
|
||||
if (lowercaseLine.contains("close"))
|
||||
keepalive = false;
|
||||
continue;
|
||||
} else if (lowercaseLine.startsWith("host: ") && !usingWWWProxy && !usingInternalOutproxy) {
|
||||
// Note that we only pass the original Host: line through to the outproxy
|
||||
@@ -1062,8 +1104,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
if(ok != null) {
|
||||
gzip = Boolean.parseBoolean(ok);
|
||||
}
|
||||
if(gzip && !usingInternalServer &&
|
||||
!method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
if(gzip && !usingInternalServer && !isConnect) {
|
||||
// according to rfc2616 s14.3, this *should* force identity, even if
|
||||
// an explicit q=0 for gzip doesn't. tested against orion.i2p, and it
|
||||
// seems to work.
|
||||
@@ -1072,7 +1113,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
if (!usingInternalOutproxy)
|
||||
newRequest.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n");
|
||||
}
|
||||
if(!shout && !method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
if(!shout && !isConnect) {
|
||||
if(!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_USER_AGENT))) {
|
||||
// let's not advertise to external sites that we are from I2P
|
||||
String ua;
|
||||
@@ -1124,7 +1165,12 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
}
|
||||
|
||||
if(method == null || (destination == null && !usingInternalOutproxy)) {
|
||||
//l.log("No HTTP method found in the request.");
|
||||
if (requestCount > 0) {
|
||||
// SocketTimeout, normal to get here for persistent connections,
|
||||
// because DataHelper.readLine() returns null on EOF
|
||||
return;
|
||||
}
|
||||
_log.debug("No HTTP method found in the request.");
|
||||
try {
|
||||
if (protocol != null && "http".equals(protocol.toLowerCase(Locale.US))) {
|
||||
out.write(getErrorPage("denied", ERR_REQUEST_DENIED).getBytes("UTF-8"));
|
||||
@@ -1143,6 +1189,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
}
|
||||
|
||||
// Authorization
|
||||
// Yes, this is sent and checked for every request on a persistent connection
|
||||
AuthResult result = authorize(s, requestId, method, authorization);
|
||||
if (result != AuthResult.AUTH_GOOD) {
|
||||
if(_log.shouldLog(Log.WARN)) {
|
||||
@@ -1184,7 +1231,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
byte[] data;
|
||||
byte[] response;
|
||||
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
|
||||
if (isConnect) {
|
||||
data = null;
|
||||
response = SUCCESS_RESPONSE.getBytes("UTF-8");
|
||||
} else {
|
||||
@@ -1329,7 +1376,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
}
|
||||
|
||||
// as of 0.9.35, allowInternalSSL defaults to true, and overridden to true unless PROP_SSL_SET is set
|
||||
if (method.toUpperCase(Locale.US).equals("CONNECT") &&
|
||||
if (isConnect &&
|
||||
!usingWWWProxy &&
|
||||
getTunnel().getClientOptions().getProperty(PROP_SSL_SET) != null &&
|
||||
!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_INTERNAL_SSL, "true"))) {
|
||||
@@ -1401,10 +1448,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
if (remotePort > 0)
|
||||
sktOpts.setPort(remotePort);
|
||||
i2ps = createI2PSocket(clientDest, sktOpts);
|
||||
boolean isConnect = method.toUpperCase(Locale.US).equals("CONNECT");
|
||||
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy,
|
||||
currentProxy, requestId, hostLowerCase, isConnect);
|
||||
I2PTunnelRunner t;
|
||||
I2PTunnelHTTPClientRunner hrunner = null;
|
||||
if (isConnect) {
|
||||
byte[] data;
|
||||
byte[] response;
|
||||
@@ -1418,15 +1465,31 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
|
||||
t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout);
|
||||
} else {
|
||||
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
|
||||
t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout);
|
||||
hrunner = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout, keepalive, isHead);
|
||||
t = hrunner;
|
||||
}
|
||||
if (usingWWWProxy) {
|
||||
t.setSuccessCallback(new OnProxySuccess(currentProxy, hostLowerCase, isConnect));
|
||||
}
|
||||
// we are called from an unlimited thread pool, so run inline
|
||||
//t.start();
|
||||
String name = Thread.currentThread().getName();
|
||||
t.run();
|
||||
// I2PTunnelHTTPClientRunner spins off the browser-to-i2p thread and keeps
|
||||
// the i2p-to-socket copier in-line. So we won't get here until the i2p socket is closed.
|
||||
Thread.currentThread().setName(name);
|
||||
|
||||
// check if whatever was in the response does not allow keepalive
|
||||
if (keepalive && hrunner != null && !hrunner.getKeepAliveSocket())
|
||||
break;
|
||||
// go around again
|
||||
requestCount++;
|
||||
// indent -------------------------------------------------------------
|
||||
} while (keepalive);
|
||||
// indent -------------------------------------------------------------
|
||||
} catch(IOException ex) {
|
||||
// This is normal for keepalive when the browser closed the socket,
|
||||
// or a SocketTimeoutException if we gave up first
|
||||
if(_log.shouldLog(Log.INFO)) {
|
||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||
}
|
||||
|
@@ -23,53 +23,103 @@ import net.i2p.client.streaming.I2PSocket;
|
||||
* Warning - not maintained as a stable API for external use.
|
||||
*/
|
||||
public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
|
||||
private HTTPResponseOutputStream _hout;
|
||||
private final boolean _isHead;
|
||||
|
||||
/**
|
||||
* Does NOT start itself. Caller must call start().
|
||||
*
|
||||
* @deprecated use other constructor
|
||||
*/
|
||||
@Deprecated
|
||||
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
|
||||
List<I2PSocket> sockList, FailCallback onFail) {
|
||||
super(s, i2ps, slock, initialI2PData, null, sockList, onFail);
|
||||
_isHead = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does NOT start itself. Caller must call start().
|
||||
*
|
||||
* @param allowKeepAliveSocket we may, but are not required to, keep the browser-side socket alive
|
||||
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
|
||||
* @since 0.9.61
|
||||
*/
|
||||
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
|
||||
List<I2PSocket> sockList, FailCallback onFail,
|
||||
boolean allowKeepAliveSocket, boolean isHead) {
|
||||
super(s, i2ps, slock, initialI2PData, null, sockList, onFail, false, allowKeepAliveSocket);
|
||||
_isHead = isHead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only call once!
|
||||
*
|
||||
* @return an HTTPResponseOutputStream
|
||||
* @throws IllegalStateException if called again
|
||||
*/
|
||||
@Override
|
||||
protected OutputStream getSocketOut() throws IOException {
|
||||
if (_hout != null)
|
||||
throw new IllegalStateException("already called");
|
||||
OutputStream raw = super.getSocketOut();
|
||||
return new HTTPResponseOutputStream(raw);
|
||||
_hout = new HTTPResponseOutputStream(raw, super.getKeepAliveSocket(), _isHead);
|
||||
return _hout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we keep the local browser/server socket open when done?
|
||||
* @since 0.9.61
|
||||
*/
|
||||
@Override
|
||||
boolean getKeepAliveSocket() {
|
||||
return _hout != null && _hout.getKeepAliveOut() && super.getKeepAliveSocket();
|
||||
}
|
||||
|
||||
/**
|
||||
* Why is this overridden?
|
||||
* Why flush in super but not here?
|
||||
* Why do things in different order than in super?
|
||||
*
|
||||
* @param out may be null
|
||||
* @param in may be null
|
||||
* @param i2pout may be null
|
||||
* @param i2pin may be null
|
||||
* @param t1 may be null
|
||||
* @param t2 may be null, ignored, we only join t1
|
||||
*/
|
||||
@Override
|
||||
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
|
||||
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
|
||||
boolean keepalive = getKeepAliveSocket();
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Closing HTTPClientRunner keepalive? " + keepalive, new Exception("I did it"));
|
||||
if (i2pin != null) { try {
|
||||
i2pin.close();
|
||||
} catch (IOException ioe) {} }
|
||||
if (i2pout != null) { try {
|
||||
i2pout.close();
|
||||
} catch (IOException ioe) {} }
|
||||
if (in != null) { try {
|
||||
in.close();
|
||||
} catch (IOException ioe) {} }
|
||||
if (!keepalive) {
|
||||
if (in != null) { try {
|
||||
in.close();
|
||||
} catch (IOException ioe) {} }
|
||||
}
|
||||
if (out != null) { try {
|
||||
out.close();
|
||||
if (keepalive)
|
||||
out.flush();
|
||||
else
|
||||
out.close();
|
||||
} catch (IOException ioe) {} }
|
||||
try {
|
||||
i2ps.close();
|
||||
} catch (IOException ioe) {}
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException ioe) {}
|
||||
if (!keepalive) {
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException ioe) {}
|
||||
}
|
||||
if (t1 != null)
|
||||
t1.join(30*1000);
|
||||
// t2 = fromI2P now run inline
|
||||
//t2.join(30*1000);
|
||||
}
|
||||
}
|
||||
|
@@ -27,7 +27,10 @@ import net.i2p.util.InternalSocket;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* A thread that starts two more threads, one to forward traffic in each direction.
|
||||
* A thread that starts one more thread if keepAliveSocket is false,
|
||||
* to forward traffic in each direction.
|
||||
* When keepAliveSocket is true, we do not expect additional data and do not
|
||||
* need a forwarding thread from the socket to I2P.
|
||||
*
|
||||
* Warning - not maintained as a stable API for external use.
|
||||
*/
|
||||
@@ -52,8 +55,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
private volatile boolean finished;
|
||||
private final byte[] initialI2PData;
|
||||
private final byte[] initialSocketData;
|
||||
/** when the last data was sent/received (or -1 if never) */
|
||||
private long lastActivityOn;
|
||||
/** when the runner started up */
|
||||
private final long startedOn;
|
||||
private final List<I2PSocket> sockList;
|
||||
@@ -65,6 +66,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
private long totalSent;
|
||||
// does not include initialSocketData
|
||||
private long totalReceived;
|
||||
// not final, may be changed by extending classes
|
||||
protected volatile boolean _keepAliveI2P, _keepAliveSocket;
|
||||
|
||||
/**
|
||||
* For use in new constructor
|
||||
@@ -166,6 +169,26 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* With keepAlive args. Does NOT start itself. Caller must call start().
|
||||
*
|
||||
* @param slock the socket lock, non-null
|
||||
* @param initialI2PData may be null
|
||||
* @param initialSocketData may be null
|
||||
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
|
||||
* Will synchronize on slock when removing.
|
||||
* @param onFail May be null. If non-null and no data (except initial data) was received,
|
||||
* it will be run before closing s.
|
||||
* @param keepAliveI2P do not close the I2P socket at the end
|
||||
* @param keepAliveSocket do not close the local socket at the end
|
||||
* @since 0.9.61
|
||||
*/
|
||||
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
|
||||
byte[] initialSocketData, List<I2PSocket> sockList, FailCallback onFail,
|
||||
boolean keepAliveI2P, boolean keepAliveSocket) {
|
||||
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, keepAliveI2P, keepAliveSocket, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Base constructor
|
||||
*
|
||||
@@ -182,6 +205,30 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
|
||||
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
|
||||
FailCallback onFail, boolean shouldStart) {
|
||||
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false, false, shouldStart);
|
||||
}
|
||||
|
||||
/**
|
||||
* Base constructor with keepAlive args
|
||||
*
|
||||
* @param slock the socket lock, non-null
|
||||
* @param initialI2PData may be null
|
||||
* @param initialSocketData may be null
|
||||
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
|
||||
* Will synchronize on slock when removing.
|
||||
* @param onTimeout May be null. If non-null and no data (except initial data) was received,
|
||||
* it will be run before closing s.
|
||||
* @param onFail Trumps onTimeout
|
||||
* @param shouldStart should thread be started in constructor (bad, false recommended)
|
||||
* @param keepAliveI2P do not close the I2P socket at the end
|
||||
* @param keepAliveSocket do not close the local socket at the end
|
||||
* @since 0.9.61
|
||||
*/
|
||||
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
|
||||
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
|
||||
FailCallback onFail,
|
||||
boolean keepAliveI2P, boolean keepAliveSocket,
|
||||
boolean shouldStart) {
|
||||
this.sockList = sockList;
|
||||
this.s = s;
|
||||
this.i2ps = i2ps;
|
||||
@@ -190,9 +237,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
this.initialSocketData = initialSocketData;
|
||||
this.onTimeout = onTimeout;
|
||||
_onFail = onFail;
|
||||
lastActivityOn = -1;
|
||||
startedOn = Clock.getInstance().now();
|
||||
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
|
||||
_keepAliveI2P = keepAliveI2P;
|
||||
_keepAliveSocket = keepAliveSocket;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("I2PTunnelRunner started");
|
||||
_runnerId = __runnerId.incrementAndGet();
|
||||
@@ -221,15 +269,9 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
*/
|
||||
@Deprecated
|
||||
public long getLastActivityOn() {
|
||||
return lastActivityOn;
|
||||
return -1L;
|
||||
}
|
||||
|
||||
/****
|
||||
private void updateActivity() {
|
||||
lastActivityOn = Clock.getInstance().now();
|
||||
}
|
||||
****/
|
||||
|
||||
/**
|
||||
* When this runner started up transferring data
|
||||
*
|
||||
@@ -251,6 +293,22 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
|
||||
protected InputStream getSocketIn() throws IOException { return s.getInputStream(); }
|
||||
protected OutputStream getSocketOut() throws IOException { return s.getOutputStream(); }
|
||||
|
||||
/**
|
||||
* Should we keep the I2P socket open when done?
|
||||
* @since 0.9.61
|
||||
*/
|
||||
boolean getKeepAliveI2P() {
|
||||
return _keepAliveI2P;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we keep the local browser/server socket open when done?
|
||||
* @since 0.9.61
|
||||
*/
|
||||
boolean getKeepAliveSocket() {
|
||||
return _keepAliveSocket;
|
||||
}
|
||||
|
||||
private static final byte[] POST = { 'P', 'O', 'S', 'T', ' ' };
|
||||
private static final byte[] PUT = { 'P', 'U', 'T', ' ' };
|
||||
@@ -266,7 +324,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
StreamForwarder toI2P = null;
|
||||
StreamForwarder fromI2P = null;
|
||||
try {
|
||||
in = getSocketIn();
|
||||
out = getSocketOut(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE);
|
||||
// unimplemented in streaming
|
||||
//i2ps.setSocketErrorListener(this);
|
||||
@@ -299,15 +356,25 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
// this does not increment totalReceived
|
||||
out.write(initialSocketData);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug("Initial data " + (initialI2PData != null ? initialI2PData.length : 0)
|
||||
+ " written to I2P, " + (initialSocketData != null ? initialSocketData.length : 0)
|
||||
+ " written to the socket, starting forwarders");
|
||||
if (!(s instanceof InternalSocket))
|
||||
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
|
||||
toI2P = new StreamForwarder(in, i2pout, true, null);
|
||||
|
||||
}
|
||||
if (_keepAliveSocket) {
|
||||
// standard GET or HEAD, no data, do not thread a forwarder
|
||||
// because we don't need it and
|
||||
// we don't want it to swallow the next request
|
||||
} else {
|
||||
in = getSocketIn();
|
||||
// InternalSocket already has buffering
|
||||
if (!(s instanceof InternalSocket))
|
||||
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
|
||||
toI2P = new StreamForwarder(in, i2pout, true, null);
|
||||
toI2P.start();
|
||||
}
|
||||
fromI2P = new StreamForwarder(i2pin, out, false, _onSuccess);
|
||||
toI2P.start();
|
||||
// We are already a thread, so run the second one inline
|
||||
//fromI2P.start();
|
||||
fromI2P.run();
|
||||
@@ -330,7 +397,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
// HTTPClient never sets initialSocketData.
|
||||
if (_onFail != null) {
|
||||
Exception e = fromI2P.getFailure();
|
||||
if (e == null)
|
||||
if (e == null && toI2P != null)
|
||||
e = toI2P.getFailure();
|
||||
_onFail.onFail(e);
|
||||
} else {
|
||||
@@ -339,7 +406,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
} else {
|
||||
// Detect a reset on one side, and propagate to the other
|
||||
Exception e1 = fromI2P.getFailure();
|
||||
Exception e2 = toI2P.getFailure();
|
||||
Exception e2 = toI2P != null ? toI2P.getFailure() : null;
|
||||
Throwable c1 = e1 != null ? e1.getCause() : null;
|
||||
Throwable c2 = e2 != null ? e2.getCause() : null;
|
||||
if (c1 != null && c1 instanceof I2PSocketException) {
|
||||
@@ -365,11 +432,17 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
} catch (InterruptedException ex) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Interrupted", ex);
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} catch (SSLException she) {
|
||||
_log.error("SSL error", she);
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} catch (IOException ex) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Error forwarding", ex);
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} catch (IllegalStateException ise) {
|
||||
// JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp)
|
||||
//java.nio.channels.NotYetConnectedException
|
||||
@@ -384,9 +457,13 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
// at net.i2p.i2ptunnel.I2PTunnelRunner.run(I2PTunnelRunner.java:167)
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("gnu?", ise);
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} catch (RuntimeException e) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Internal error", e);
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} finally {
|
||||
removeRef();
|
||||
if (i2pReset) {
|
||||
@@ -401,6 +478,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
try {
|
||||
i2ps.close();
|
||||
} catch (IOException ioe) {}
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} else if (sockReset) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Got socket reset, resetting I2P socket");
|
||||
@@ -410,6 +489,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException ioe) {}
|
||||
_keepAliveI2P = false;
|
||||
_keepAliveSocket = false;
|
||||
} else {
|
||||
// now one connection is dead - kill the other as well, after making sure we flush
|
||||
try {
|
||||
@@ -420,12 +501,16 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
}
|
||||
|
||||
/**
|
||||
* Warning - overridden in I2PTunnelHTTPClientRunner.
|
||||
* Here we ignore keepalive and always close both sides.
|
||||
* The HTTP flavor handles keepalive.
|
||||
*
|
||||
* @param out may be null
|
||||
* @param in may be null
|
||||
* @param i2pout may be null
|
||||
* @param i2pin may be null
|
||||
* @param t1 may be null
|
||||
* @param t2 may be null
|
||||
* @param t2 may be null, ignored, we only join t1
|
||||
*/
|
||||
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
|
||||
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
|
||||
@@ -451,13 +536,13 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
} catch (IOException ioe) {}
|
||||
if (t1 != null)
|
||||
t1.join(30*1000);
|
||||
// t2 = fromI2P now run inline
|
||||
//t2.join(30*1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deprecated, unimplemented in streaming, never called.
|
||||
* @deprecated unused
|
||||
*/
|
||||
@Deprecated
|
||||
public void errorOccurred() {
|
||||
synchronized (finishLock) {
|
||||
finished = true;
|
||||
@@ -510,12 +595,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
+ from + " and " + to);
|
||||
}
|
||||
|
||||
// boo, hiss! shouldn't need this - the streaming lib should be configurable, but
|
||||
// somehow the inactivity timer is sometimes failing to get triggered properly
|
||||
//i2ps.setReadTimeout(2*60*1000);
|
||||
|
||||
ByteArray ba = _cache.acquire();
|
||||
byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE];
|
||||
byte[] buffer = ba.getData();
|
||||
try {
|
||||
int len;
|
||||
while ((len = in.read(buffer)) != -1) {
|
||||
@@ -583,15 +664,28 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
_failure = ex;
|
||||
} finally {
|
||||
_cache.release(ba);
|
||||
if (_log.shouldLog(Log.INFO)) {
|
||||
_log.info(direction + ": done forwarding between "
|
||||
+ from + " and " + to);
|
||||
boolean keepAliveFrom, keepAliveTo;
|
||||
if (_toI2P) {
|
||||
keepAliveFrom = _keepAliveSocket;
|
||||
keepAliveTo = _keepAliveI2P;
|
||||
} else {
|
||||
keepAliveFrom = _keepAliveI2P;
|
||||
keepAliveTo = _keepAliveSocket;
|
||||
}
|
||||
try {
|
||||
in.close();
|
||||
} catch (IOException ex) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(direction + ": Error closing input stream", ex);
|
||||
if (_log.shouldLog(Log.INFO)) {
|
||||
_log.info(direction + ": done forwarding from "
|
||||
+ from + " to " + to
|
||||
+ " keepalive from? " + keepAliveFrom
|
||||
+ " keepalive to? " + keepAliveTo
|
||||
+ " bytes: " + (_toI2P ? totalSent : totalReceived));
|
||||
}
|
||||
if (!keepAliveFrom) {
|
||||
try {
|
||||
in.close();
|
||||
} catch (IOException ex) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(direction + ": Error closing input stream", ex);
|
||||
}
|
||||
}
|
||||
try {
|
||||
// Thread must close() before exiting for a PipedOutputStream,
|
||||
@@ -601,10 +695,16 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
// DON'T close if we have a timeout job and we haven't received anything,
|
||||
// or else the timeout job can't write the error message to the stream.
|
||||
// close() above will close it after the timeout job is run.
|
||||
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0))
|
||||
out.close();
|
||||
else if (_log.shouldLog(Log.INFO))
|
||||
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0)) {
|
||||
if (keepAliveTo)
|
||||
out.flush();
|
||||
else
|
||||
out.close();
|
||||
} else if (_log.shouldInfo()) {
|
||||
_log.info(direction + ": not closing so we can write the error message");
|
||||
if (keepAliveTo)
|
||||
out.flush();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(direction + ": Error flushing to close", ioe);
|
||||
|
Reference in New Issue
Block a user