Compare commits

...

2 Commits

Author SHA1 Message Date
zzz
04c0c22ab8 Fix for InternalSockets 2023-10-14 14:36:18 -04:00
zzz
9f04bb04b4 Draft: Do not merge: I2PTunnel: Implement HTTP persistence for the browser-to-client proxy socket
(persistence phase 1)

Warnings:
- WIP
- Lightly tested, requires much more
- Requires further cleanup
- Not for 2.4.0
- Will not provide any measurable performance benefit
- Indenting in HTTPClient intentionally wrong to make it easier to review
- Contains some unrelated cleanup that will make this more difficult to review

Previous Work
-------------

In 2005 I attempted to implement persistence and pipelining and created a patch.
Pipelining is very difficult, and it was not enabled by default in browsers anyway.
Today, pipelining has been abandoned by the browsers because support in
proxies is very poor.
See https://developer.mozilla.org/en-US/docs/Web/HTTP/Connection_management_in_HTTP_1.x

The 2005 development had a fatal flaw. It attempted to manage persistence and pipelining end-to-end.
The browser, client proxy, server proxy, and client would all have to support persistence
for keepalive to work.
But RFC 2616 defines persistence as a hop-by-hop property
(what it calls "messages" are hop-by-hop, while the "entity" is end-to-end).

The current code is not completely compliant with the hop-by-hop specification,
and the 2005 patch made it much worse.

The good news is that by implementing persistence for each hop separately,
it is RFC-compliant, and it does not require simultaneous support at all points.
This allows a gradual implementation for each of the three hops.

Browser -- Browser -- Client -- I2P   -- Server -- Server -- Web
           Socket     Proxy     Socket   Proxy     Socket    Server

Proposed Development Plan
-------------------------

Phase 1: Browser Socket
------------------------

This is the simplest.
However, it requires significant changes in the HTTP Client Proxy.
Browser-to-client-proxy sockets will be persistent.
Each I2P socket will be unique for a single request, as now.

There is almost no performance savings for phase 1,
because localhost-to-localhost sockets are cheap.
There will be some savings in that we currently start two
threads for every socket.
However, it gives us experience with persistent connections.
Also, it is necessary for phase 2.
Without persistent browser sockets, if we tried to implement
phase 2 only, we would have to maintain a "pool" of idle I2P sockets,
indexed by server destination:port,
and grab one to use for the next request.

So it's by far easier to implement Phase 2 if we do Phase 1 first,
and require keepalive on the browser socket to use keepalive on
the I2P socket.
All browsers support keepalive to proxies.

Phase 2: I2P Socket
------------------------

With Phase 1 implemented, we can bind the I2P socket to the browser socket.
As long as the browser continues making requests to the same destination,
we will reuse the I2P socket as well.

When a request for a different destination:port comes in, we will close
the I2P socket for the old destination:port, and open a new one.

Thus we maintain a 1:1 mapping of I2PTunnelHTTPClient, HTTPResponseOutputStream,
I2PTunnelHTTPClientRunner, browser socket, and I2P socket.

This is all made possible by abandoning the attempt to support pipelining.

This phase provides significant bandwidth savings and, perhaps, some latency savings also.
Bandwidth is reduced because the streaming SYN is very large.
Latency savings are small because (unlike for TCP) streaming supports 0-RTT delivery of data,
we don't have to wait for a 3-way handshake.

Phase 3: Server Socket
------------------------

This is probably not necessary.
There is almost no performance savings for phase 3.
We would have to maintain a "pool" of idle server sockets,
indexed by server IP:port,
and grab one to use for the next request.

The only case were there would be performance improvement would be for
the outproxy, or for people running web servers remotely (not on localhost).

I2P Proxy Specification Changes
--------------------------------

This will be required to support persistence on I2P sockets that contain
x-i2p-gzip responses.

The current i2ptunnel proxy behavior is documented very briefly
at http://i2p-projekt.i2p/en/docs/api/i2ptunnel

Specification changes are only required for phase 2, and only for
responses currently encoded with x-i2p-gzip (i.e., responses
not gzip encoded by the server).

We could split phase 2 into 2A and 2B, where we only implement
x-i2p-gzchunked in 2B, and let x-i2p-gzip responses
close the I2P socket.

Assumptions
-------------

- All current I2P and i2pd client and server proxies,
  and all maintained clients such as eepget, currently inject
  Connection: close, as required by RFC 2616 (unless sending HTTP/1.0 in the request), to indicate that
  they do not support keepalive on that hop.

Current Java Behavior
--------------------------------

- The client proxy injects an X-Accept-Encoding header containing x-i2p-gzip to the request header.
  The actual current value is:
  "x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0"
- The server proxy looks for X-Accept-Encoding header containing x-i2p-gzip" in the request headers (this is the current standard)
- The server proxy looks for Accept-Encoding header containing x-i2p-gzip" in the request headers (this hasn't been done in 20 years)
- The server proxy strips the X-Accept-Encoding header from the request
- The server proxy passes through the Accept-Encoding header in the request
- If the client supports x-i2p-gzip (as specified in the request headers),
  and the response Content-Length is either not present or above a minimum size,
  and the response Content-Type is either not present or is not a known compressed mime type (image, audio, video, gzip, etc.),
  and there is no Content-Encoding header in the response,
  THEN the server proxy injects a "Content-Encoding: x-i2p-gzip" header to the response,
  and gzips the payload, including any chunking sent by the server.
- If the client proxy sees a "Content-Encoding: x-i2p-gzip" header to the response,
  it removes the header, and gunzips the response.
- We also inject Proxy-Connection: close in some places, although that is apparently wrong:
  see http://jdebp.info/FGA/web-proxy-connection-header.html

Problems with current specification and Java implementation
----------------------------------------------------------------

Most of this violates RFC 2616.
While it works in practice, the violations aren't well-documented and that makes
the code less maintainable.
In some cases, we're relying on browsers to handle our protocol violations,
which may not always be true.

- Does not dechunk/chunk per-hop; passes through chunking end-to-end
- Passes Transfer-Encoding header through end-to-end
- Uses Content-Encoding, not Transfer-Encoding, to specify the per-hop encoding
- Prohibits x-i2p gzipping when Content-Encoding is set (but we probably don't want to do that anyway)
- Gzips the server-sent chunking, rather than dechunk-gzip-rechunk and dechunk-gunzip-rechunk
- Because the chunking, if any, is inside the gzip rather than outside, there's no easy way to
  find the end of the data, which prohibits I2P socket keepalive on those sockets.
  This violates the requirement that all Transfer-Encoding other than "identity" is chunked.
- The elaborate X-Accept-Encoding header with all the q values is pointless;
  we just look for x-i2p-gzip and then strip the header out.
- Spec says Content-Length must not be sent if Transfer-Encoding is present,
  but we do. Spec says ignore Content-Length if Transfer-Encoding is present,
  so it works for us.

Changes to support I2P Socket Persistence
--------------------------------------------

Requirements:
-----------------

- Backward compatible with all previous versions of I2P and i2pd client and server proxies,
  and I2P eepget, or at least for several years back.
- Any new encoding name must not contain the string "x-i2p-gzip" because current I2P proxies
  search for that string in the headers
- Compatible with browsers and servers
- Compatible with the current outproxies
- Do not leak any i2p-specific headers out the outproxies
- Get closer to RFC 2616 standards
- Don't gunzip if the Content-Encoding is gzip, or we screw up download of
  gz/tgz files (see eepget)

While the standard "Transfer-Encoding: gzip" may be used for the response,
we still need a way for the client proxy to indicate support to the server proxy.
We can't use "gzip" in X-Accept-Encoding because it's already in there.
For symmetry, better to use x-i2p-gzchunked in the response, to match the request.
We just specify that the format is identical to gzip.

Proposal:
----------

- Client proxy sends: X-Accept-Encoding: x-i2p-gzip, x-i2p-gzchunked
- If server proxy sees any Transfer-Encoding except for chunked, pass it through as we do now,
  and disable x-i2p-gzchunked
- If we're going to use x-i2p-gzchunked, and the server sends Transfer-Encoding: chunked,
  then server proxy dechunks first (don't double-chunk)
- Server proxy sends: "Transfer-Encoding: gzip" or x-i2p-gzchunked?
- Both proxies continue to send chunking through end-to-end, except
  for x-i2p-gzchunked, in which case the server proxy dechunks if necessary, gzips, and chunks.
- The client proxy dechunks, gunzips, and chunks if there's no Content-Length.
- Alternatively, the client proxy could replace Transfer-Encoding: x-i2p-gzchunked with gzip
  and pass it through to the client to do the gzipping, which isn't standard, but
  would save in-java CPU where gzipping is less efficient than in the browser.
- Phase 2 follows phase 1 by at least one release

Unchanged
----------

- Don't support keepalive after client-side errors. All error pages will continue
  to send Connection: close.
- Don't support keepalive for internal sockets.
- Don't add keepalive support to eepget. Eepget will continue to send Connection: close.
- Continue passing through Transfer-Encoding and chunking end-to-end in most cases,
  even though it violates the spec.
- Continue supporting x-i2p-gzip for backward compatibility
- Probably don't ever do the server side (phase 3)

Future I2P socket keepalive:
------------------------------

This will allow future development on I2P socket keepalive.
Dechunk (or just spy on if we're passing it through) at the client proxy,
so we know where the end is and can return the socket for the next request.
2023-10-13 14:24:14 -04:00
4 changed files with 392 additions and 90 deletions

View File

@@ -37,7 +37,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private boolean _headerWritten;
private final byte _buf1[];
protected boolean _gzip;
protected long _dataExpected;
protected long _dataExpected = -1;
protected boolean _keepAlive;
/** lower-case, trimmed */
protected String _contentType;
/** lower-case, trimmed */
@@ -61,6 +62,44 @@ class HTTPResponseOutputStream extends FilterOutputStream {
_buf1 = new byte[1];
}
/**
* Optionally keep browser socket alive and call callback when we're done.
* BROWSER-SIDE ONLY for now.
*
* @param allowKeepAlive we may, but are not required to, keep the browser-side socket alive
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
* @since 0.9.61
*/
public HTTPResponseOutputStream(OutputStream raw, boolean allowKeepAlive, boolean isHead) {
this(raw);
_keepAlive = allowKeepAlive;
if (isHead)
_dataExpected = 0;
if (_log.shouldInfo())
_log.info("Before headers: keepalive? " + _keepAlive);
}
/**
* Should we keep the input stream alive when done?
*
* @return false always, not yet supported
* @since 0.9.61
*/
public boolean getKeepAliveIn() {
return false;
}
/**
* Should we keep the output stream alive when done?
* Only supported for the browser socket side.
* I2P socket on server side not supported yet.
*
* @since 0.9.61
*/
public boolean getKeepAliveOut() {
return _keepAlive;
}
@Override
public void write(int c) throws IOException {
_buf1[0] = (byte)c;
@@ -142,10 +181,9 @@ class HTTPResponseOutputStream extends FilterOutputStream {
/** ok, received, now munge & write it */
private void writeHeader() throws IOException {
String responseLine = null;
boolean connectionSent = false;
boolean proxyConnectionSent = false;
boolean chunked = false;
int lastEnd = -1;
byte[] data = _headerBuffer.getData();
@@ -153,11 +191,26 @@ class HTTPResponseOutputStream extends FilterOutputStream {
for (int i = 0; i < valid; i++) {
if (data[i] == NL) {
if (lastEnd == -1) {
responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
String responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
responseLine = filterResponseLine(responseLine);
responseLine = (responseLine.trim() + "\r\n");
if (_log.shouldInfo())
_log.info("Response: " + responseLine.trim());
// Persistent conn requires HTTP/1.1
if (_keepAlive && !responseLine.startsWith("HTTP/1.1 "))
_keepAlive = false;
// force zero datalen for 1xx, 204, 304 (RFC 2616 sec. 4.4)
// so that these don't prevent keepalive
int sp = responseLine.indexOf(" ");
if (sp > 0) {
String s = responseLine.substring(sp + 1);
if (s.startsWith("1") || s.startsWith("204") || s.startsWith("304"))
_dataExpected = 0;
} else {
// no status?
_keepAlive = false;
}
out.write(DataHelper.getUTF8(responseLine));
} else {
for (int j = lastEnd+1; j < i; j++) {
@@ -182,12 +235,18 @@ class HTTPResponseOutputStream extends FilterOutputStream {
// pass through for websocket
out.write(DataHelper.getASCII("Connection: " + val + "\r\n"));
proxyConnectionSent = true;
// Disable persistence
_keepAlive = false;
} else {
out.write(CONNECTION_CLOSE);
// Strip to allow persistence, replace to disallow
if (!_keepAlive)
out.write(CONNECTION_CLOSE);
}
connectionSent = true;
} else if ("proxy-connection".equals(lcKey)) {
out.write(PROXY_CONNECTION_CLOSE);
// Strip to allow persistence, replace to disallow
if (!_keepAlive)
out.write(PROXY_CONNECTION_CLOSE);
proxyConnectionSent = true;
} else if ("content-encoding".equals(lcKey) && "x-i2p-gzip".equals(val.toLowerCase(Locale.US))) {
_gzip = true;
@@ -206,6 +265,9 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} else if ("content-encoding".equals(lcKey)) {
// save for compress decision on server side
_contentEncoding = val.toLowerCase(Locale.US);
} else if ("transfer-encoding".equals(lcKey) && val.toLowerCase(Locale.US).contains("chunked")) {
// save for keepalive decision on client side
chunked = true;
} else if ("set-cookie".equals(lcKey)) {
String lcVal = val.toLowerCase(Locale.US);
if (lcVal.contains("domain=b32.i2p") ||
@@ -228,21 +290,48 @@ class HTTPResponseOutputStream extends FilterOutputStream {
lastEnd = i;
}
}
// Now make the final keepalive decision
if (_keepAlive) {
// we need one but not both
if ((chunked && _dataExpected >= 0) ||
(!chunked && _dataExpected < 0))
_keepAlive = false;
}
if (!connectionSent)
if (!connectionSent && !_keepAlive)
out.write(CONNECTION_CLOSE);
if (!proxyConnectionSent)
if (!proxyConnectionSent && !_keepAlive)
out.write(PROXY_CONNECTION_CLOSE);
finishHeaders();
boolean shouldCompress = shouldCompress();
if (_log.shouldInfo())
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress);
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress + " keepalive? " + _keepAlive);
if (data.length == CACHE_SIZE)
_cache.release(_headerBuffer);
_headerBuffer = null;
/*****
// TODO
// Setup the keepalive streams
// Until we have keepalive for the i2p socket, the client side
// does not need to do this, we just wait for the socket to close.
// Until we have keepalive for the server socket, the server side
// does not need to do this, we just wait for the socket to close.
if (_keepAlive) {
if (_dataExpected > 0) {
// content-length
// filter output stream to count the data
} else {
// chunked
// filter output stream to look for the end
}
}
****/
if (shouldCompress) {
beginProcessing();
}
@@ -257,7 +346,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
@Override
public void close() throws IOException {
if (_log.shouldInfo())
_log.info("Closing " + out + " compressed? " + shouldCompress(), new Exception("I did it"));
_log.info("Closing " + out + " compressed? " + shouldCompress() + " keepalive? " + _keepAlive, new Exception("I did it"));
synchronized(this) {
// synch with changing out field below
super.close();

View File

@@ -40,6 +40,7 @@ import net.i2p.i2ptunnel.localServer.LocalHTTPServer;
import net.i2p.util.ConvertToHash;
import net.i2p.util.DNSOverHTTPS;
import net.i2p.util.EventDispatcher;
import net.i2p.util.InternalSocket;
import net.i2p.util.Log;
import net.i2p.util.PortMapper;
@@ -99,6 +100,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
private static final String PROP_UA_I2P = "httpclient.userAgent.i2p";
private static final String PROP_UA_CLEARNET = "httpclient.userAgent.outproxy";
// how long to wait for another request on the same socket
// Firefox timeout appears to be about 114 seconds, so it will close before we do.
private static final int BROWSER_KEEPALIVE_TIMEOUT = 2*60*1000;
/**
* These are backups if the xxx.ht error page is missing.
*/
@@ -405,11 +410,28 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
String currentProxy = null;
long requestId = __requestId.incrementAndGet();
boolean shout = false;
boolean isConnect = false;
boolean isHead = false;
I2PSocket i2ps = null;
try {
s.setSoTimeout(INITIAL_SO_TIMEOUT);
out = s.getOutputStream();
InputReader reader = new InputReader(s.getInputStream());
int requestCount = 0;
// Will be set to false for non-GET/HEAD, non-HTTP/1.1,
// Connection: close, Proxy-Connection: close,
// or after analysis of the response headers in HTTPResponseOutputStream,
// or on errors in I2PTunnelRunner.
// Do not set keepalive for internal sockets.
boolean keepalive = !(s instanceof InternalSocket);
// indent -------------------------------------------------------------
do {
// indent -------------------------------------------------------------
if (requestCount > 0) {
if (_log.shouldInfo())
_log.info("Keepalive, awaiting request #" + requestCount);
s.setSoTimeout(BROWSER_KEEPALIVE_TIMEOUT);
}
String line, method = null, protocol = null, host = null, destination = null;
String hostLowerCase = null;
StringBuilder newRequest = new StringBuilder();
@@ -431,9 +453,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
String lowercaseLine = line.toLowerCase(Locale.US);
if(method == null) { // first line (GET /base64/realaddr)
if(method == null) {
// first line GET/POST/etc.
if(_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix(requestId) + "First line [" + line + "]");
_log.debug(getPrefix(requestId) + "req #" + requestCount + " first line [" + line + "]");
}
String[] params = DataHelper.split(line, " ", 3);
@@ -481,12 +504,19 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
****/
}
method = params[0];
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
method = params[0].toUpperCase(Locale.US);
if (method.equals("HEAD")) {
isHead = true;
} else if (method.equals("CONNECT")) {
// this makes things easier later, by spoofing a
// protocol so the URI parser find the host and port
// For in-net outproxy, will be fixed up below
request = "https://" + request + '/';
isConnect = true;
keepalive = false;
} else if (!method.equals("GET")) {
// POST, PUT, ...
keepalive = false;
}
// Now use the Java URI parser
@@ -568,6 +598,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
String protocolVersion = params[2];
if (!protocolVersion.equals("HTTP/1.1"))
keepalive = false;
protocol = requestURI.getScheme();
host = requestURI.getHost();
@@ -650,8 +682,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
break;
}
******/
} else if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT")) {
} else if ("https".equals(protocol) || isConnect) {
remotePort = 443;
} else {
remotePort = 80;
@@ -815,19 +846,21 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
host = getHostName(addressHelper);
}
// now strip everything but path and query from URI
targetRequest = requestURI.toASCIIString();
String newURI = requestURI.getRawPath();
if(query != null) {
newURI += '?' + query;
}
try {
requestURI = new URI(newURI);
} catch(URISyntaxException use) {
// shouldnt happen
_log.warn(request, use);
method = null;
break;
if (!isConnect) {
// now strip everything but path and query from URI
String newURI = requestURI.getRawPath();
if(query != null) {
newURI += '?' + query;
}
try {
requestURI = new URI(newURI);
} catch(URISyntaxException use) {
// shouldnt happen
_log.warn(request, use);
method = null;
break;
}
}
// end of (host endsWith(".i2p"))
@@ -853,8 +886,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
int rPort = requestURI.getPort();
if (rPort > 0)
remotePort = rPort;
else if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT"))
else if ("https".equals(protocol) || isConnect)
remotePort = 443;
else
remotePort = 80;
@@ -873,8 +905,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Before selecting outproxy for " + host);
}
if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT"))
if ("https".equals(protocol) || isConnect)
currentProxy = selectSSLProxy(hostLowerCase);
else
currentProxy = selectProxy(hostLowerCase);
@@ -930,16 +961,22 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
break;
}
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
if (isConnect) {
// fix up the change to requestURI above to get back to the original host:port
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
if (usingInternalOutproxy || usingWWWProxy)
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
else
line = method + ' ' + host + ':' + remotePort + ' ' + protocolVersion;
} else {
line = method + ' ' + requestURI.toASCIIString() + ' ' + protocolVersion;
}
if(_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix(requestId) + "REQ : \"" + request + "\"");
_log.debug(getPrefix(requestId) + "REQURI: \"" + requestURI + "\"");
_log.debug(getPrefix(requestId) + "NEWREQ: \"" + line + "\"");
_log.debug(getPrefix(requestId) + "HOST : \"" + host + "\"");
_log.debug(getPrefix(requestId) + "RPORT : \"" + remotePort + "\"");
_log.debug(getPrefix(requestId) + "DEST : \"" + destination + "\"");
}
@@ -950,11 +987,16 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if (lowercaseLine.contains("upgrade")) {
// pass through for websocket
preserveConnectionHeader = true;
keepalive = false;
} else {
if (lowercaseLine.contains("close"))
keepalive = false;
continue;
}
} else if (lowercaseLine.startsWith("keep-alive: ") ||
lowercaseLine.startsWith("proxy-connection: ")) {
if (lowercaseLine.contains("close"))
keepalive = false;
continue;
} else if (lowercaseLine.startsWith("host: ") && !usingWWWProxy && !usingInternalOutproxy) {
// Note that we only pass the original Host: line through to the outproxy
@@ -1062,8 +1104,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if(ok != null) {
gzip = Boolean.parseBoolean(ok);
}
if(gzip && !usingInternalServer &&
!method.toUpperCase(Locale.US).equals("CONNECT")) {
if(gzip && !usingInternalServer && !isConnect) {
// according to rfc2616 s14.3, this *should* force identity, even if
// an explicit q=0 for gzip doesn't. tested against orion.i2p, and it
// seems to work.
@@ -1072,7 +1113,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if (!usingInternalOutproxy)
newRequest.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n");
}
if(!shout && !method.toUpperCase(Locale.US).equals("CONNECT")) {
if(!shout && !isConnect) {
if(!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_USER_AGENT))) {
// let's not advertise to external sites that we are from I2P
String ua;
@@ -1124,7 +1165,12 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
if(method == null || (destination == null && !usingInternalOutproxy)) {
//l.log("No HTTP method found in the request.");
if (requestCount > 0) {
// SocketTimeout, normal to get here for persistent connections,
// because DataHelper.readLine() returns null on EOF
return;
}
_log.debug("No HTTP method found in the request.");
try {
if (protocol != null && "http".equals(protocol.toLowerCase(Locale.US))) {
out.write(getErrorPage("denied", ERR_REQUEST_DENIED).getBytes("UTF-8"));
@@ -1143,6 +1189,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
// Authorization
// Yes, this is sent and checked for every request on a persistent connection
AuthResult result = authorize(s, requestId, method, authorization);
if (result != AuthResult.AUTH_GOOD) {
if(_log.shouldLog(Log.WARN)) {
@@ -1184,7 +1231,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId);
byte[] data;
byte[] response;
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
if (isConnect) {
data = null;
response = SUCCESS_RESPONSE.getBytes("UTF-8");
} else {
@@ -1329,7 +1376,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
// as of 0.9.35, allowInternalSSL defaults to true, and overridden to true unless PROP_SSL_SET is set
if (method.toUpperCase(Locale.US).equals("CONNECT") &&
if (isConnect &&
!usingWWWProxy &&
getTunnel().getClientOptions().getProperty(PROP_SSL_SET) != null &&
!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_INTERNAL_SSL, "true"))) {
@@ -1401,10 +1448,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if (remotePort > 0)
sktOpts.setPort(remotePort);
i2ps = createI2PSocket(clientDest, sktOpts);
boolean isConnect = method.toUpperCase(Locale.US).equals("CONNECT");
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy,
currentProxy, requestId, hostLowerCase, isConnect);
I2PTunnelRunner t;
I2PTunnelHTTPClientRunner hrunner = null;
if (isConnect) {
byte[] data;
byte[] response;
@@ -1418,15 +1465,31 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout);
} else {
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout);
hrunner = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout, keepalive, isHead);
t = hrunner;
}
if (usingWWWProxy) {
t.setSuccessCallback(new OnProxySuccess(currentProxy, hostLowerCase, isConnect));
}
// we are called from an unlimited thread pool, so run inline
//t.start();
String name = Thread.currentThread().getName();
t.run();
// I2PTunnelHTTPClientRunner spins off the browser-to-i2p thread and keeps
// the i2p-to-socket copier in-line. So we won't get here until the i2p socket is closed.
Thread.currentThread().setName(name);
// check if whatever was in the response does not allow keepalive
if (keepalive && hrunner != null && !hrunner.getKeepAliveSocket())
break;
// go around again
requestCount++;
// indent -------------------------------------------------------------
} while (keepalive);
// indent -------------------------------------------------------------
} catch(IOException ex) {
// This is normal for keepalive when the browser closed the socket,
// or a SocketTimeoutException if we gave up first
if(_log.shouldLog(Log.INFO)) {
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
}

View File

@@ -23,53 +23,103 @@ import net.i2p.client.streaming.I2PSocket;
* Warning - not maintained as a stable API for external use.
*/
public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
private HTTPResponseOutputStream _hout;
private final boolean _isHead;
/**
* Does NOT start itself. Caller must call start().
*
* @deprecated use other constructor
*/
@Deprecated
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
List<I2PSocket> sockList, FailCallback onFail) {
super(s, i2ps, slock, initialI2PData, null, sockList, onFail);
_isHead = false;
}
/**
* Does NOT start itself. Caller must call start().
*
* @param allowKeepAliveSocket we may, but are not required to, keep the browser-side socket alive
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
* @since 0.9.61
*/
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
List<I2PSocket> sockList, FailCallback onFail,
boolean allowKeepAliveSocket, boolean isHead) {
super(s, i2ps, slock, initialI2PData, null, sockList, onFail, false, allowKeepAliveSocket);
_isHead = isHead;
}
/**
* Only call once!
*
* @return an HTTPResponseOutputStream
* @throws IllegalStateException if called again
*/
@Override
protected OutputStream getSocketOut() throws IOException {
if (_hout != null)
throw new IllegalStateException("already called");
OutputStream raw = super.getSocketOut();
return new HTTPResponseOutputStream(raw);
_hout = new HTTPResponseOutputStream(raw, super.getKeepAliveSocket(), _isHead);
return _hout;
}
/**
* Should we keep the local browser/server socket open when done?
* @since 0.9.61
*/
@Override
boolean getKeepAliveSocket() {
return _hout != null && _hout.getKeepAliveOut() && super.getKeepAliveSocket();
}
/**
* Why is this overridden?
* Why flush in super but not here?
* Why do things in different order than in super?
*
* @param out may be null
* @param in may be null
* @param i2pout may be null
* @param i2pin may be null
* @param t1 may be null
* @param t2 may be null, ignored, we only join t1
*/
@Override
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
boolean keepalive = getKeepAliveSocket();
if (_log.shouldInfo())
_log.info("Closing HTTPClientRunner keepalive? " + keepalive, new Exception("I did it"));
if (i2pin != null) { try {
i2pin.close();
} catch (IOException ioe) {} }
if (i2pout != null) { try {
i2pout.close();
} catch (IOException ioe) {} }
if (in != null) { try {
in.close();
} catch (IOException ioe) {} }
if (!keepalive) {
if (in != null) { try {
in.close();
} catch (IOException ioe) {} }
}
if (out != null) { try {
out.close();
if (keepalive)
out.flush();
else
out.close();
} catch (IOException ioe) {} }
try {
i2ps.close();
} catch (IOException ioe) {}
try {
s.close();
} catch (IOException ioe) {}
if (!keepalive) {
try {
s.close();
} catch (IOException ioe) {}
}
if (t1 != null)
t1.join(30*1000);
// t2 = fromI2P now run inline
//t2.join(30*1000);
}
}

View File

@@ -27,7 +27,10 @@ import net.i2p.util.InternalSocket;
import net.i2p.util.Log;
/**
* A thread that starts two more threads, one to forward traffic in each direction.
* A thread that starts one more thread if keepAliveSocket is false,
* to forward traffic in each direction.
* When keepAliveSocket is true, we do not expect additional data and do not
* need a forwarding thread from the socket to I2P.
*
* Warning - not maintained as a stable API for external use.
*/
@@ -52,8 +55,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private volatile boolean finished;
private final byte[] initialI2PData;
private final byte[] initialSocketData;
/** when the last data was sent/received (or -1 if never) */
private long lastActivityOn;
/** when the runner started up */
private final long startedOn;
private final List<I2PSocket> sockList;
@@ -65,6 +66,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private long totalSent;
// does not include initialSocketData
private long totalReceived;
// not final, may be changed by extending classes
protected volatile boolean _keepAliveI2P, _keepAliveSocket;
/**
* For use in new constructor
@@ -166,6 +169,26 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false);
}
/**
* With keepAlive args. Does NOT start itself. Caller must call start().
*
* @param slock the socket lock, non-null
* @param initialI2PData may be null
* @param initialSocketData may be null
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
* Will synchronize on slock when removing.
* @param onFail May be null. If non-null and no data (except initial data) was received,
* it will be run before closing s.
* @param keepAliveI2P do not close the I2P socket at the end
* @param keepAliveSocket do not close the local socket at the end
* @since 0.9.61
*/
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, FailCallback onFail,
boolean keepAliveI2P, boolean keepAliveSocket) {
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, keepAliveI2P, keepAliveSocket, false);
}
/**
* Base constructor
*
@@ -182,6 +205,30 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
FailCallback onFail, boolean shouldStart) {
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false, false, shouldStart);
}
/**
* Base constructor with keepAlive args
*
* @param slock the socket lock, non-null
* @param initialI2PData may be null
* @param initialSocketData may be null
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
* Will synchronize on slock when removing.
* @param onTimeout May be null. If non-null and no data (except initial data) was received,
* it will be run before closing s.
* @param onFail Trumps onTimeout
* @param shouldStart should thread be started in constructor (bad, false recommended)
* @param keepAliveI2P do not close the I2P socket at the end
* @param keepAliveSocket do not close the local socket at the end
* @since 0.9.61
*/
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
FailCallback onFail,
boolean keepAliveI2P, boolean keepAliveSocket,
boolean shouldStart) {
this.sockList = sockList;
this.s = s;
this.i2ps = i2ps;
@@ -190,9 +237,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
this.initialSocketData = initialSocketData;
this.onTimeout = onTimeout;
_onFail = onFail;
lastActivityOn = -1;
startedOn = Clock.getInstance().now();
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
_keepAliveI2P = keepAliveI2P;
_keepAliveSocket = keepAliveSocket;
if (_log.shouldLog(Log.INFO))
_log.info("I2PTunnelRunner started");
_runnerId = __runnerId.incrementAndGet();
@@ -221,15 +269,9 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
*/
@Deprecated
public long getLastActivityOn() {
return lastActivityOn;
return -1L;
}
/****
private void updateActivity() {
lastActivityOn = Clock.getInstance().now();
}
****/
/**
* When this runner started up transferring data
*
@@ -251,6 +293,22 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
protected InputStream getSocketIn() throws IOException { return s.getInputStream(); }
protected OutputStream getSocketOut() throws IOException { return s.getOutputStream(); }
/**
* Should we keep the I2P socket open when done?
* @since 0.9.61
*/
boolean getKeepAliveI2P() {
return _keepAliveI2P;
}
/**
* Should we keep the local browser/server socket open when done?
* @since 0.9.61
*/
boolean getKeepAliveSocket() {
return _keepAliveSocket;
}
private static final byte[] POST = { 'P', 'O', 'S', 'T', ' ' };
private static final byte[] PUT = { 'P', 'U', 'T', ' ' };
@@ -266,7 +324,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
StreamForwarder toI2P = null;
StreamForwarder fromI2P = null;
try {
in = getSocketIn();
out = getSocketOut(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE);
// unimplemented in streaming
//i2ps.setSocketErrorListener(this);
@@ -299,15 +356,25 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// this does not increment totalReceived
out.write(initialSocketData);
}
if (_log.shouldLog(Log.DEBUG))
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Initial data " + (initialI2PData != null ? initialI2PData.length : 0)
+ " written to I2P, " + (initialSocketData != null ? initialSocketData.length : 0)
+ " written to the socket, starting forwarders");
if (!(s instanceof InternalSocket))
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
toI2P = new StreamForwarder(in, i2pout, true, null);
}
if (_keepAliveSocket) {
// standard GET or HEAD, no data, do not thread a forwarder
// because we don't need it and
// we don't want it to swallow the next request
} else {
in = getSocketIn();
// InternalSocket already has buffering
if (!(s instanceof InternalSocket))
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
toI2P = new StreamForwarder(in, i2pout, true, null);
toI2P.start();
}
fromI2P = new StreamForwarder(i2pin, out, false, _onSuccess);
toI2P.start();
// We are already a thread, so run the second one inline
//fromI2P.start();
fromI2P.run();
@@ -330,7 +397,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// HTTPClient never sets initialSocketData.
if (_onFail != null) {
Exception e = fromI2P.getFailure();
if (e == null)
if (e == null && toI2P != null)
e = toI2P.getFailure();
_onFail.onFail(e);
} else {
@@ -339,7 +406,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} else {
// Detect a reset on one side, and propagate to the other
Exception e1 = fromI2P.getFailure();
Exception e2 = toI2P.getFailure();
Exception e2 = toI2P != null ? toI2P.getFailure() : null;
Throwable c1 = e1 != null ? e1.getCause() : null;
Throwable c2 = e2 != null ? e2.getCause() : null;
if (c1 != null && c1 instanceof I2PSocketException) {
@@ -365,11 +432,17 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} catch (InterruptedException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Interrupted", ex);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (SSLException she) {
_log.error("SSL error", she);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (IOException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error forwarding", ex);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (IllegalStateException ise) {
// JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp)
//java.nio.channels.NotYetConnectedException
@@ -384,9 +457,13 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// at net.i2p.i2ptunnel.I2PTunnelRunner.run(I2PTunnelRunner.java:167)
if (_log.shouldLog(Log.WARN))
_log.warn("gnu?", ise);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (RuntimeException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("Internal error", e);
_keepAliveI2P = false;
_keepAliveSocket = false;
} finally {
removeRef();
if (i2pReset) {
@@ -401,6 +478,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
try {
i2ps.close();
} catch (IOException ioe) {}
_keepAliveI2P = false;
_keepAliveSocket = false;
} else if (sockReset) {
if (_log.shouldWarn())
_log.warn("Got socket reset, resetting I2P socket");
@@ -410,6 +489,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
try {
s.close();
} catch (IOException ioe) {}
_keepAliveI2P = false;
_keepAliveSocket = false;
} else {
// now one connection is dead - kill the other as well, after making sure we flush
try {
@@ -420,12 +501,16 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
}
/**
* Warning - overridden in I2PTunnelHTTPClientRunner.
* Here we ignore keepalive and always close both sides.
* The HTTP flavor handles keepalive.
*
* @param out may be null
* @param in may be null
* @param i2pout may be null
* @param i2pin may be null
* @param t1 may be null
* @param t2 may be null
* @param t2 may be null, ignored, we only join t1
*/
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
@@ -451,13 +536,13 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} catch (IOException ioe) {}
if (t1 != null)
t1.join(30*1000);
// t2 = fromI2P now run inline
//t2.join(30*1000);
}
/**
* Deprecated, unimplemented in streaming, never called.
* @deprecated unused
*/
@Deprecated
public void errorOccurred() {
synchronized (finishLock) {
finished = true;
@@ -510,12 +595,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
+ from + " and " + to);
}
// boo, hiss! shouldn't need this - the streaming lib should be configurable, but
// somehow the inactivity timer is sometimes failing to get triggered properly
//i2ps.setReadTimeout(2*60*1000);
ByteArray ba = _cache.acquire();
byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE];
byte[] buffer = ba.getData();
try {
int len;
while ((len = in.read(buffer)) != -1) {
@@ -583,15 +664,28 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
_failure = ex;
} finally {
_cache.release(ba);
if (_log.shouldLog(Log.INFO)) {
_log.info(direction + ": done forwarding between "
+ from + " and " + to);
boolean keepAliveFrom, keepAliveTo;
if (_toI2P) {
keepAliveFrom = _keepAliveSocket;
keepAliveTo = _keepAliveI2P;
} else {
keepAliveFrom = _keepAliveI2P;
keepAliveTo = _keepAliveSocket;
}
try {
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing input stream", ex);
if (_log.shouldLog(Log.INFO)) {
_log.info(direction + ": done forwarding from "
+ from + " to " + to
+ " keepalive from? " + keepAliveFrom
+ " keepalive to? " + keepAliveTo
+ " bytes: " + (_toI2P ? totalSent : totalReceived));
}
if (!keepAliveFrom) {
try {
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing input stream", ex);
}
}
try {
// Thread must close() before exiting for a PipedOutputStream,
@@ -601,10 +695,16 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// DON'T close if we have a timeout job and we haven't received anything,
// or else the timeout job can't write the error message to the stream.
// close() above will close it after the timeout job is run.
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0))
out.close();
else if (_log.shouldLog(Log.INFO))
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0)) {
if (keepAliveTo)
out.flush();
else
out.close();
} else if (_log.shouldInfo()) {
_log.info(direction + ": not closing so we can write the error message");
if (keepAliveTo)
out.flush();
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": Error flushing to close", ioe);