forked from I2P_Developers/i2p.i2p
Compare commits
17 Commits
i2p_0_4_2
...
i2p_0_4_2_
Author | SHA1 | Date | |
---|---|---|---|
![]() |
610f1f7dd4 | ||
![]() |
516d0b4db8 | ||
![]() |
df61ae5c6f | ||
![]() |
9f6584b55e | ||
![]() |
e4b41f5bb0 | ||
![]() |
8d0cea93e9 | ||
![]() |
d294d07919 | ||
![]() |
153eea2bd5 | ||
![]() |
571e3c5c13 | ||
![]() |
a2d268f3d6 | ||
![]() |
02d456d7a0 | ||
![]() |
b3626ad86f | ||
![]() |
9b6eab451f | ||
![]() |
72be9b5f04 | ||
![]() |
35e94a7f65 | ||
![]() |
8e02586cc9 | ||
![]() |
0b5a640896 |
@@ -184,13 +184,26 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
|
||||
* create the default options (using the default timeout, etc)
|
||||
*
|
||||
*/
|
||||
private I2PSocketOptions getDefaultOptions() {
|
||||
protected I2PSocketOptions getDefaultOptions() {
|
||||
Properties defaultOpts = getTunnel().getClientOptions();
|
||||
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
|
||||
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
|
||||
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
|
||||
return opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* create the default options (using the default timeout, etc)
|
||||
*
|
||||
*/
|
||||
protected I2PSocketOptions getDefaultOptions(Properties overrides) {
|
||||
Properties defaultOpts = getTunnel().getClientOptions();
|
||||
defaultOpts.putAll(overrides);
|
||||
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
|
||||
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
|
||||
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
|
||||
return opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new I2PSocket towards to the specified destination,
|
||||
|
@@ -14,10 +14,12 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.I2PException;
|
||||
import net.i2p.client.streaming.I2PSocket;
|
||||
import net.i2p.client.streaming.I2PSocketOptions;
|
||||
import net.i2p.data.DataFormatException;
|
||||
import net.i2p.data.Destination;
|
||||
import net.i2p.util.Clock;
|
||||
@@ -142,6 +144,33 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
return proxy;
|
||||
}
|
||||
}
|
||||
|
||||
private static final int DEFAULT_READ_TIMEOUT = 60*1000;
|
||||
|
||||
/**
|
||||
* create the default options (using the default timeout, etc)
|
||||
*
|
||||
*/
|
||||
protected I2PSocketOptions getDefaultOptions() {
|
||||
I2PSocketOptions opts = super.getDefaultOptions();
|
||||
Properties defaultOpts = getTunnel().getClientOptions();
|
||||
if (!defaultOpts.contains(I2PSocketOptions.PROP_READ_TIMEOUT))
|
||||
opts.setReadTimeout(DEFAULT_READ_TIMEOUT);
|
||||
return opts;
|
||||
}
|
||||
|
||||
/**
|
||||
* create the default options (using the default timeout, etc)
|
||||
*
|
||||
*/
|
||||
protected I2PSocketOptions getDefaultOptions(Properties overrides) {
|
||||
I2PSocketOptions opts = super.getDefaultOptions(overrides);
|
||||
Properties defaultOpts = getTunnel().getClientOptions();
|
||||
defaultOpts.putAll(overrides);
|
||||
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_READ_TIMEOUT))
|
||||
opts.setConnectTimeout(DEFAULT_READ_TIMEOUT);
|
||||
return opts;
|
||||
}
|
||||
|
||||
private static long __requestId = 0;
|
||||
protected void clientConnectionRun(Socket s) {
|
||||
@@ -149,7 +178,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
String targetRequest = null;
|
||||
boolean usingWWWProxy = false;
|
||||
String currentProxy = null;
|
||||
InactivityTimeoutThread timeoutThread = null;
|
||||
long requestId = ++__requestId;
|
||||
try {
|
||||
out = s.getOutputStream();
|
||||
@@ -295,7 +323,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix(requestId) + "Setting host = " + host);
|
||||
} else if (line.startsWith("User-Agent: ")) {
|
||||
line = "User-Agent: MYOB/6.66 (AN/ON)";
|
||||
// always stripped, added back at the end
|
||||
line = null;
|
||||
continue;
|
||||
} else if (line.startsWith("Accept")) {
|
||||
// strip the accept-blah headers, as they vary dramatically from
|
||||
// browser to browser
|
||||
line = null;
|
||||
continue;
|
||||
} else if (line.startsWith("Referer: ")) {
|
||||
// Shouldn't we be more specific, like accepting in-site referers ?
|
||||
//line = "Referer: i2p";
|
||||
@@ -313,6 +348,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
}
|
||||
|
||||
if (line.length() == 0) {
|
||||
newRequest.append("User-Agent: MYOB/6.66 (AN/ON)");
|
||||
newRequest.append("Connection: close\r\n\r\n");
|
||||
break;
|
||||
} else {
|
||||
@@ -354,25 +390,26 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
return;
|
||||
}
|
||||
String remoteID;
|
||||
I2PSocket i2ps = createI2PSocket(dest);
|
||||
|
||||
Properties opts = new Properties();
|
||||
opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000);
|
||||
// 1 == disconnect. see ConnectionOptions in the new streaming lib, which i
|
||||
// dont want to hard link to here
|
||||
opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1);
|
||||
I2PSocket i2ps = createI2PSocket(dest, getDefaultOptions(opts));
|
||||
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
|
||||
I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets);
|
||||
timeoutThread = new InactivityTimeoutThread(runner, out, targetRequest, usingWWWProxy, currentProxy, s, requestId);
|
||||
timeoutThread.start();
|
||||
} catch (SocketException ex) {
|
||||
if (timeoutThread != null) timeoutThread.disable();
|
||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||
l.log(ex.getMessage());
|
||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
closeSocket(s);
|
||||
} catch (IOException ex) {
|
||||
if (timeoutThread != null) timeoutThread.disable();
|
||||
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
|
||||
l.log(ex.getMessage());
|
||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
closeSocket(s);
|
||||
} catch (I2PException ex) {
|
||||
if (timeoutThread != null) timeoutThread.disable();
|
||||
_log.info("getPrefix(requestId) + Error trying to connect", ex);
|
||||
l.log(ex.getMessage());
|
||||
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
|
||||
@@ -380,91 +417,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
|
||||
}
|
||||
}
|
||||
|
||||
private static final long INACTIVITY_TIMEOUT = 120 * 1000;
|
||||
private static volatile long __timeoutId = 0;
|
||||
|
||||
private class InactivityTimeoutThread extends I2PThread {
|
||||
|
||||
private Socket s;
|
||||
private I2PTunnelRunner _runner;
|
||||
private OutputStream _out;
|
||||
private String _targetRequest;
|
||||
private boolean _useWWWProxy;
|
||||
private String _currentProxy;
|
||||
private long _requestId;
|
||||
private boolean _disabled;
|
||||
private Object _disableLock = new Object();
|
||||
|
||||
public InactivityTimeoutThread(I2PTunnelRunner runner, OutputStream out, String targetRequest,
|
||||
boolean useWWWProxy, String currentProxy, Socket s, long requestId) {
|
||||
this.s = s;
|
||||
_runner = runner;
|
||||
_out = out;
|
||||
_targetRequest = targetRequest;
|
||||
_useWWWProxy = useWWWProxy;
|
||||
_currentProxy = currentProxy;
|
||||
_disabled = false;
|
||||
_requestId = requestId;
|
||||
long timeoutId = ++__timeoutId;
|
||||
setName("InactivityThread " + getPrefix(requestId) + timeoutId);
|
||||
}
|
||||
|
||||
public void disable() {
|
||||
_disabled = true;
|
||||
synchronized (_disableLock) {
|
||||
_disableLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (!_disabled) {
|
||||
if (_runner.isFinished()) {
|
||||
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix(_requestId) + "HTTP client request completed prior to timeout");
|
||||
return;
|
||||
}
|
||||
if (_runner.getLastActivityOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
|
||||
if (_runner.getStartedOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(getPrefix(_requestId) + "HTTP client request timed out (lastActivity: "
|
||||
+ new Date(_runner.getLastActivityOn()) + ", startedOn: "
|
||||
+ new Date(_runner.getStartedOn()) + ")");
|
||||
timeout();
|
||||
return;
|
||||
} else {
|
||||
// runner hasn't been going to long enough
|
||||
}
|
||||
} else {
|
||||
// there has been activity in the period
|
||||
}
|
||||
synchronized (_disableLock) {
|
||||
try {
|
||||
_disableLock.wait(INACTIVITY_TIMEOUT);
|
||||
} catch (InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void timeout() {
|
||||
_log.info(getPrefix(_requestId) + "Inactivity timeout reached");
|
||||
l.log("Inactivity timeout reached");
|
||||
if (_out != null) {
|
||||
try {
|
||||
if (_runner.getLastActivityOn() > 0) {
|
||||
// some data has been sent, so don't 404 it
|
||||
} else {
|
||||
writeErrorMessage(ERR_TIMEOUT, _out, _targetRequest, _useWWWProxy, _currentProxy);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
_log.warn(getPrefix(_requestId) + "Error writing out the 'timeout' message", ioe);
|
||||
}
|
||||
} else {
|
||||
_log.warn(getPrefix(_requestId) + "Client disconnected before we could say we timed out");
|
||||
}
|
||||
closeSocket(s);
|
||||
}
|
||||
}
|
||||
|
||||
private final static String getHostName(String host) {
|
||||
if (host == null) return null;
|
||||
try {
|
||||
|
@@ -110,6 +110,9 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
|
||||
//i2pout.flush();
|
||||
}
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Initial data " + (initialData != null ? initialData.length : 0)
|
||||
+ " written, starting forwarders");
|
||||
Thread t1 = new StreamForwarder(in, i2pout, "toI2P");
|
||||
Thread t2 = new StreamForwarder(i2pin, out, "fromI2P");
|
||||
synchronized (finishLock) {
|
||||
@@ -117,11 +120,13 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
|
||||
finishLock.wait();
|
||||
}
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("At least one forwarder completed, closing and joining");
|
||||
// now one connection is dead - kill the other as well.
|
||||
s.close();
|
||||
i2ps.close();
|
||||
t1.join();
|
||||
t2.join();
|
||||
t1.join(30*1000);
|
||||
t2.join(30*1000);
|
||||
closedCleanly = true;
|
||||
} catch (InterruptedException ex) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
@@ -221,7 +226,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
|
||||
out.flush(); // make sure the data get though
|
||||
}
|
||||
}
|
||||
out.flush();
|
||||
//out.flush(); // close() flushes
|
||||
} catch (SocketException ex) {
|
||||
// this *will* occur when the other threads closes the socket
|
||||
synchronized (finishLock) {
|
||||
@@ -248,11 +253,16 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
|
||||
+ from + " and " + to);
|
||||
}
|
||||
try {
|
||||
out.close();
|
||||
in.close();
|
||||
} catch (IOException ex) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(direction + ": Error closing streams", ex);
|
||||
_log.warn(direction + ": Error closing input stream", ex);
|
||||
}
|
||||
try {
|
||||
out.close();
|
||||
} catch (IOException ex) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(direction + ": Error closing output stream", ex);
|
||||
}
|
||||
synchronized (finishLock) {
|
||||
finished = true;
|
||||
|
@@ -120,10 +120,15 @@ public class ConfigNetHelper {
|
||||
private static String getBurstFactor(int numSeconds, String name) {
|
||||
StringBuffer buf = new StringBuffer(256);
|
||||
buf.append("<select name=\"").append(name).append("\">\n");
|
||||
boolean found = false;
|
||||
for (int i = 10; i <= 60; i += 10) {
|
||||
buf.append("<option value=\"").append(i).append("\" ");
|
||||
if ( (i == numSeconds) || (i == 60) )
|
||||
if (i == numSeconds) {
|
||||
buf.append("selected ");
|
||||
found = true;
|
||||
} else if ( (i == 60) && (!found) ) {
|
||||
buf.append("selected ");
|
||||
}
|
||||
buf.append(">");
|
||||
buf.append(i).append(" seconds</option>\n");
|
||||
}
|
||||
|
@@ -38,9 +38,11 @@ extern "C" {
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
|
||||
|
||||
/*
|
||||
* Lengths
|
||||
*/
|
||||
|
||||
/* The maximum length a SAM command can be */
|
||||
#define SAM_CMD_LEN 128
|
||||
/* The maximum size of a single datagram packet */
|
||||
@@ -49,18 +51,22 @@ extern "C" {
|
||||
#define SAM_LOGMSG_LEN 256
|
||||
/* The longest `name' arg for the naming lookup callback */
|
||||
#define SAM_NAME_LEN 256
|
||||
/* The max size of a single stream packet */
|
||||
/* The maximum size of a single stream packet */
|
||||
#define SAM_STREAM_PAYLOAD_MAX (32 * 1024)
|
||||
/* The length of a base 64 public key - it's actually 516, but +1 for '\0' */
|
||||
#define SAM_PUBKEY_LEN 517
|
||||
/* A public key SAM command's length */
|
||||
/* The maximum length of a SAM command with a public key */
|
||||
#define SAM_PKCMD_LEN (SAM_PUBKEY_LEN + SAM_CMD_LEN)
|
||||
/* The maximum size of a single raw packet */
|
||||
#define SAM_RAW_PAYLOAD_MAX (32 * 1024)
|
||||
/* The maximum length a SAM non-data reply can be */
|
||||
#define SAM_REPLY_LEN 1024
|
||||
|
||||
|
||||
/*
|
||||
* Shorten some standard variable types
|
||||
* Some LibSAM variable types
|
||||
*/
|
||||
|
||||
typedef signed char schar_t;
|
||||
typedef unsigned char uchar_t;
|
||||
typedef unsigned int uint_t;
|
||||
@@ -102,57 +108,62 @@ typedef enum { /* see sam_strerror() for detailed descriptions of these */
|
||||
SAM_TOO_BIG
|
||||
} samerr_t;
|
||||
|
||||
|
||||
/*
|
||||
* Public functions
|
||||
*/
|
||||
|
||||
/* SAM controls - sessions */
|
||||
extern sam_sess_t *sam_session_init(sam_sess_t *session);
|
||||
extern void sam_session_free(sam_sess_t **session);
|
||||
sam_sess_t *sam_session_init(sam_sess_t *session);
|
||||
void sam_session_free(sam_sess_t **session);
|
||||
/* SAM controls - connection */
|
||||
extern bool sam_close(sam_sess_t *session);
|
||||
extern samerr_t sam_connect(sam_sess_t *session, const char *samhost,
|
||||
uint16_t samport, const char *destname, sam_conn_t style,
|
||||
uint_t tunneldepth);
|
||||
bool sam_close(sam_sess_t *session);
|
||||
samerr_t sam_connect(sam_sess_t *session, const char *samhost,
|
||||
uint16_t samport, const char *destname, sam_conn_t style,
|
||||
uint_t tunneldepth);
|
||||
/* SAM controls - utilities */
|
||||
extern void sam_naming_lookup(sam_sess_t *session, const char *name);
|
||||
extern bool sam_read_buffer(sam_sess_t *session);
|
||||
extern const char *sam_strerror(samerr_t code);
|
||||
void sam_naming_lookup(sam_sess_t *session, const char *name);
|
||||
bool sam_read_buffer(sam_sess_t *session);
|
||||
const char *sam_strerror(samerr_t code);
|
||||
/* SAM controls - callbacks */
|
||||
extern void (*sam_diedback)(sam_sess_t *session);
|
||||
extern void (*sam_logback)(char *str);
|
||||
extern void (*sam_namingback)(char *name, sam_pubkey_t pubkey,
|
||||
samerr_t result);
|
||||
void (*sam_diedback)(sam_sess_t *session);
|
||||
void (*sam_logback)(char *str);
|
||||
void (*sam_namingback)(char *name, sam_pubkey_t pubkey, samerr_t result);
|
||||
|
||||
/* Stream commands */
|
||||
extern void sam_stream_close(sam_sess_t *session, sam_sid_t stream_id);
|
||||
extern sam_sid_t sam_stream_connect(sam_sess_t *session,
|
||||
const sam_pubkey_t dest);
|
||||
extern samerr_t sam_stream_send(sam_sess_t *session, sam_sid_t stream_id,
|
||||
const void *data, size_t size);
|
||||
void sam_stream_close(sam_sess_t *session, sam_sid_t stream_id);
|
||||
sam_sid_t sam_stream_connect(sam_sess_t *session, const sam_pubkey_t dest);
|
||||
samerr_t sam_stream_send(sam_sess_t *session, sam_sid_t stream_id,
|
||||
const void *data, size_t size);
|
||||
/* Stream commands - callbacks */
|
||||
extern void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
samerr_t reason);
|
||||
extern void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_pubkey_t dest);
|
||||
extern void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
void *data, size_t size);
|
||||
extern void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
samerr_t result);
|
||||
void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
samerr_t reason);
|
||||
void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_pubkey_t dest);
|
||||
void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
void *data, size_t size);
|
||||
void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
samerr_t result);
|
||||
|
||||
/* Stream send queue (experimental) */
|
||||
extern void sam_sendq_add(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_sendq_t **sendq, const void *data, size_t dsize);
|
||||
extern void sam_sendq_flush(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_sendq_t **sendq);
|
||||
void sam_sendq_add(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_sendq_t **sendq, const void *data, size_t dsize);
|
||||
void sam_sendq_flush(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_sendq_t **sendq);
|
||||
|
||||
/* Datagram commands */
|
||||
extern samerr_t sam_dgram_send(sam_sess_t *session, const sam_pubkey_t dest,
|
||||
const void *data, size_t size);
|
||||
|
||||
samerr_t sam_dgram_send(sam_sess_t *session, const sam_pubkey_t dest,
|
||||
const void *data, size_t size);
|
||||
/* Datagram commands - callbacks */
|
||||
extern void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest,
|
||||
void *data, size_t size);
|
||||
void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest, void *data,
|
||||
size_t size);
|
||||
|
||||
/* Raw commands */
|
||||
samerr_t sam_raw_send(sam_sess_t *session, const sam_pubkey_t dest,
|
||||
const void *data, size_t size);
|
||||
/* Raw commands - callbacks */
|
||||
void (*sam_rawback)(sam_sess_t *session, void *data, size_t size);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
@@ -55,28 +55,40 @@ static ssize_t sam_write(sam_sess_t *session, const void *buf, size_t n);
|
||||
* Callback functions
|
||||
* Note: if you add a new callback be sure to check for non-NULL in sam_connect
|
||||
*/
|
||||
|
||||
/* a peer closed the connection */
|
||||
void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id, samerr_t reason)
|
||||
= NULL;
|
||||
|
||||
/* a peer connected to us */
|
||||
void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
sam_pubkey_t dest) = NULL;
|
||||
|
||||
/* a peer sent some stream data (`data' MUST be freed) */
|
||||
void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id, void *data,
|
||||
size_t size) = NULL;
|
||||
|
||||
/* a peer sent some datagram data (`data' MUST be freed) */
|
||||
void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest, void *data,
|
||||
size_t size) = NULL;
|
||||
|
||||
/* we lost the connection to the SAM host */
|
||||
void (*sam_diedback)(sam_sess_t *session) = NULL;
|
||||
|
||||
/* logging callback */
|
||||
void (*sam_logback)(char *str) = NULL;
|
||||
|
||||
/* naming lookup reply - `pubkey' will be NULL if `result' isn't SAM_OK */
|
||||
void (*sam_namingback)(char *name, sam_pubkey_t pubkey, samerr_t result) = NULL;
|
||||
|
||||
/* our connection to a peer has completed */
|
||||
void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
|
||||
samerr_t result) = NULL;
|
||||
|
||||
/* a peer sent some raw data (`data' MUST be freed) */
|
||||
void (*sam_rawback)(sam_sess_t *session, void *data, size_t size) = NULL;
|
||||
|
||||
|
||||
/*
|
||||
* Closes the connection to the SAM host
|
||||
*
|
||||
@@ -155,7 +167,11 @@ samerr_t sam_connect(sam_sess_t *session, const char *samhost, uint16_t samport,
|
||||
return SAM_CALLBACKS_UNSET;
|
||||
}
|
||||
} else if (style == SAM_RAW) {
|
||||
abort(); /* not implemented yet */
|
||||
if (sam_diedback == NULL || sam_logback == NULL
|
||||
|| sam_namingback == NULL || sam_rawback == NULL) {
|
||||
SAMLOGS("Please set callback functions before connecting");
|
||||
return SAM_CALLBACKS_UNSET;
|
||||
}
|
||||
} else {
|
||||
SAMLOGS("Unknown connection style");
|
||||
return SAM_BAD_STYLE;
|
||||
@@ -295,6 +311,7 @@ static void sam_parse(sam_sess_t *session, char *s)
|
||||
#define SAM_NAMING_REPLY_OK "NAMING REPLY RESULT=OK"
|
||||
#define SAM_NAMING_REPLY_IK "NAMING REPLY RESULT=INVALID_KEY"
|
||||
#define SAM_NAMING_REPLY_KNF "NAMING REPLY RESULT=KEY_NOT_FOUND"
|
||||
#define SAM_RAW_RECEIVED_REPLY "RAW RECEIVED"
|
||||
#define SAM_STREAM_CLOSED_REPLY "STREAM CLOSED"
|
||||
#define SAM_STREAM_CONNECTED_REPLY "STREAM CONNECTED"
|
||||
#define SAM_STREAM_RECEIVED_REPLY "STREAM RECEIVED"
|
||||
@@ -305,6 +322,10 @@ static void sam_parse(sam_sess_t *session, char *s)
|
||||
#define SAM_STREAM_STATUS_REPLY_IK "STREAM STATUS RESULT=INVALID_KEY"
|
||||
#define SAM_STREAM_STATUS_REPLY_TO "STREAM STATUS RESULT=TIMEOUT"
|
||||
|
||||
/*
|
||||
* TODO: add raw parsing
|
||||
*/
|
||||
|
||||
if (strncmp(s, SAM_DGRAM_RECEIVED_REPLY,
|
||||
strlen(SAM_DGRAM_RECEIVED_REPLY)) == 0) {
|
||||
char *p;
|
||||
@@ -518,6 +539,42 @@ static void sam_parse(sam_sess_t *session, char *s)
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Sends data to a destination in a raw packet
|
||||
*
|
||||
* dest - base 64 destination of who we're sending to
|
||||
* data - the data we're sending
|
||||
* size - the size of the data
|
||||
*
|
||||
* Returns: SAM_OK on success
|
||||
*/
|
||||
samerr_t sam_raw_send(sam_sess_t *session, const sam_pubkey_t dest,
|
||||
const void *data, size_t size)
|
||||
{
|
||||
assert(session != NULL);
|
||||
char cmd[SAM_PKCMD_LEN];
|
||||
|
||||
if (size < 1 || size > SAM_RAW_PAYLOAD_MAX) {
|
||||
#ifdef NO_Z_FORMAT
|
||||
SAMLOG("Invalid data send size (%u bytes)", size);
|
||||
#else
|
||||
SAMLOG("Invalid data send size (%zu bytes)", size);
|
||||
#endif
|
||||
return SAM_TOO_BIG;
|
||||
}
|
||||
#ifdef NO_Z_FORMAT
|
||||
snprintf(cmd, sizeof cmd, "RAW SEND DESTINATION=%s SIZE=%u\n",
|
||||
dest, size);
|
||||
#else
|
||||
snprintf(cmd, sizeof cmd, "RAW SEND DESTINATION=%s SIZE=%zu\n",
|
||||
dest, size);
|
||||
#endif
|
||||
sam_write(session, cmd, strlen(cmd));
|
||||
sam_write(session, data, size);
|
||||
|
||||
return SAM_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reads and callbacks everything in the SAM network buffer until it is clear
|
||||
*
|
||||
|
@@ -391,6 +391,8 @@ public class SAMStreamSession {
|
||||
while (stillRunning) {
|
||||
try {
|
||||
i2ps = serverSocket.accept();
|
||||
if (i2ps == null)
|
||||
break;
|
||||
|
||||
_log.debug("New incoming connection");
|
||||
|
||||
@@ -467,6 +469,7 @@ public class SAMStreamSession {
|
||||
}
|
||||
try {
|
||||
i2pSocketOS.write(data);
|
||||
//i2pSocketOS.flush();
|
||||
} catch (IOException e) {
|
||||
_log.error("Error sending data through I2P socket", e);
|
||||
return false;
|
||||
|
@@ -70,7 +70,7 @@ public class Connection {
|
||||
private long _lifetimeDupMessageReceived;
|
||||
|
||||
public static final long MAX_RESEND_DELAY = 60*1000;
|
||||
public static final long MIN_RESEND_DELAY = 20*1000;
|
||||
public static final long MIN_RESEND_DELAY = 40*1000;
|
||||
|
||||
/** wait up to 5 minutes after disconnection so we can ack/close packets */
|
||||
public static int DISCONNECT_TIMEOUT = 5*60*1000;
|
||||
@@ -313,14 +313,36 @@ public class Connection {
|
||||
return acked;
|
||||
}
|
||||
|
||||
private long _occurredTime;
|
||||
private long _occurredEventCount;
|
||||
void eventOccurred() {
|
||||
_chooser.getScheduler(this).eventOccurred(this);
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
TaskScheduler sched = _chooser.getScheduler(this);
|
||||
|
||||
now = now - now % 1000;
|
||||
if (_occurredTime == now) {
|
||||
_occurredEventCount++;
|
||||
} else {
|
||||
_occurredTime = now;
|
||||
if (_occurredEventCount > 100) {
|
||||
_log.log(Log.CRIT, "More than 100 events (" + _occurredEventCount + ") in a second on "
|
||||
+ toString() + ": scheduler = " + sched);
|
||||
}
|
||||
_occurredEventCount = 0;
|
||||
}
|
||||
|
||||
sched.eventOccurred(this);
|
||||
}
|
||||
|
||||
void resetReceived() {
|
||||
_resetReceived = true;
|
||||
_outputStream.streamErrorOccurred(new IOException("Reset received"));
|
||||
_inputStream.streamErrorOccurred(new IOException("Reset received"));
|
||||
MessageOutputStream mos = _outputStream;
|
||||
MessageInputStream mis = _inputStream;
|
||||
if (mos != null)
|
||||
mos.streamErrorOccurred(new IOException("Reset received"));
|
||||
if (mis != null)
|
||||
mis.streamErrorOccurred(new IOException("Reset received"));
|
||||
_connectionError = "Connection reset";
|
||||
synchronized (_connectLock) { _connectLock.notifyAll(); }
|
||||
}
|
||||
@@ -332,31 +354,18 @@ public class Connection {
|
||||
disconnect(cleanDisconnect, true);
|
||||
}
|
||||
void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
|
||||
if (!_connected) return;
|
||||
_connected = false;
|
||||
synchronized (_connectLock) { _connectLock.notifyAll(); }
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Disconnecting " + toString(), new Exception("discon"));
|
||||
|
||||
if (cleanDisconnect) {
|
||||
if (cleanDisconnect && _connected) {
|
||||
// send close packets and schedule stuff...
|
||||
_outputStream.closeInternal();
|
||||
_inputStream.close();
|
||||
} else {
|
||||
doClose();
|
||||
boolean tagsCancelled = false;
|
||||
synchronized (_outboundPackets) {
|
||||
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
||||
PacketLocal pl = (PacketLocal)iter.next();
|
||||
if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
|
||||
tagsCancelled = true;
|
||||
pl.cancelled();
|
||||
}
|
||||
_outboundPackets.clear();
|
||||
_outboundPackets.notifyAll();
|
||||
}
|
||||
if (tagsCancelled)
|
||||
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
||||
if (_connected)
|
||||
doClose();
|
||||
killOutstandingPackets();
|
||||
}
|
||||
if (removeFromConMgr) {
|
||||
if (!_disconnectScheduled) {
|
||||
@@ -364,6 +373,7 @@ public class Connection {
|
||||
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
|
||||
}
|
||||
}
|
||||
_connected = false;
|
||||
}
|
||||
|
||||
void disconnectComplete() {
|
||||
@@ -376,10 +386,8 @@ public class Connection {
|
||||
_outputStream.destroy();
|
||||
_outputStream = null;
|
||||
_outboundQueue = null;
|
||||
_handler = null;
|
||||
if (_receiver != null)
|
||||
_receiver.destroy();
|
||||
_receiver = null;
|
||||
if (_activityTimer != null)
|
||||
SimpleTimer.getInstance().addEvent(_activityTimer, 1);
|
||||
_activityTimer = null;
|
||||
@@ -393,6 +401,10 @@ public class Connection {
|
||||
_connectionManager.removeConnection(this);
|
||||
}
|
||||
|
||||
killOutstandingPackets();
|
||||
}
|
||||
|
||||
private void killOutstandingPackets() {
|
||||
boolean tagsCancelled = false;
|
||||
synchronized (_outboundPackets) {
|
||||
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
|
||||
@@ -406,7 +418,6 @@ public class Connection {
|
||||
}
|
||||
if (tagsCancelled)
|
||||
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
|
||||
|
||||
}
|
||||
|
||||
private class DisconnectEvent implements SimpleTimer.TimedEvent {
|
||||
@@ -416,6 +427,7 @@ public class Connection {
|
||||
+ Connection.this.toString());
|
||||
}
|
||||
public void timeReached() {
|
||||
killOutstandingPackets();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Connection disconnect timer complete, drop the con "
|
||||
+ Connection.this.toString());
|
||||
|
@@ -63,7 +63,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
|
||||
|
||||
if (doSend) {
|
||||
PacketLocal packet = send(buf, off, size);
|
||||
return packet;
|
||||
//dont wait for non-acks
|
||||
if ( (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
|
||||
return packet;
|
||||
else
|
||||
return _dummyStatus;
|
||||
} else {
|
||||
return _dummyStatus;
|
||||
}
|
||||
|
@@ -31,7 +31,14 @@ class ConnectionHandler {
|
||||
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
|
||||
}
|
||||
|
||||
public void setActive(boolean active) { _active = active; }
|
||||
public void setActive(boolean active) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("setActive(" + active + ") called");
|
||||
synchronized (_synQueue) {
|
||||
_active = active;
|
||||
_synQueue.notifyAll(); // so we break from the accept()
|
||||
}
|
||||
}
|
||||
public boolean getActive() { return _active; }
|
||||
|
||||
public void receiveNewSyn(Packet packet) {
|
||||
@@ -66,8 +73,17 @@ class ConnectionHandler {
|
||||
while (true) {
|
||||
if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
|
||||
return null;
|
||||
if (!_active)
|
||||
if (!_active) {
|
||||
// fail all the ones we had queued up
|
||||
synchronized (_synQueue) {
|
||||
for (int i = 0; i < _synQueue.size(); i++) {
|
||||
Packet packet = (Packet)_synQueue.get(i);
|
||||
sendReset(packet);
|
||||
}
|
||||
_synQueue.clear();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
Packet syn = null;
|
||||
synchronized (_synQueue) {
|
||||
|
@@ -157,7 +157,7 @@ public class ConnectionPacketHandler {
|
||||
+ ") for " + con);
|
||||
|
||||
return true;
|
||||
} else if (numResends > 0) {
|
||||
//} else if (numResends > 0) {
|
||||
// window sizes are shrunk on resend, not on ack
|
||||
} else {
|
||||
if (acked > 0) {
|
||||
@@ -166,17 +166,19 @@ public class ConnectionPacketHandler {
|
||||
// new packet that ack'ed uncongested data, or an empty ack
|
||||
int newWindowSize = con.getOptions().getWindowSize();
|
||||
|
||||
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
|
||||
// congestion avoidance
|
||||
|
||||
// we can't use newWindowSize += 1/newWindowSize, since we're
|
||||
// integers, so lets use a random distribution instead
|
||||
int shouldIncrement = _context.random().nextInt(newWindowSize);
|
||||
if (shouldIncrement <= 0)
|
||||
if (numResends <= 0) {
|
||||
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
|
||||
// congestion avoidance
|
||||
|
||||
// we can't use newWindowSize += 1/newWindowSize, since we're
|
||||
// integers, so lets use a random distribution instead
|
||||
int shouldIncrement = _context.random().nextInt(newWindowSize);
|
||||
if (shouldIncrement <= 0)
|
||||
newWindowSize += 1;
|
||||
} else {
|
||||
// slow start
|
||||
newWindowSize += 1;
|
||||
} else {
|
||||
// slow start
|
||||
newWindowSize += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
|
@@ -272,6 +272,9 @@ public class MessageInputStream extends InputStream {
|
||||
// at least one byte
|
||||
|
||||
while (_readyDataBlocks.size() <= 0) {
|
||||
if (_locallyClosed)
|
||||
throw new IOException("Already closed, you wanker");
|
||||
|
||||
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("read(...," + offset + ", " + length + ")[" + i
|
||||
@@ -402,6 +405,7 @@ public class MessageInputStream extends InputStream {
|
||||
ba.setData(null);
|
||||
}
|
||||
_locallyClosed = true;
|
||||
_dataLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -8,6 +8,7 @@ import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.util.ByteCache;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer;
|
||||
|
||||
/**
|
||||
* A stream that we can shove data into that fires off those bytes
|
||||
@@ -26,6 +27,11 @@ public class MessageOutputStream extends OutputStream {
|
||||
private long _written;
|
||||
private int _writeTimeout;
|
||||
private ByteCache _dataCache;
|
||||
private Flusher _flusher;
|
||||
private long _lastFlushed;
|
||||
private long _lastBuffered;
|
||||
/** if we enqueue data but don't flush it in this period, flush it passively */
|
||||
private int _passiveFlushDelay;
|
||||
|
||||
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
|
||||
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
|
||||
@@ -41,6 +47,10 @@ public class MessageOutputStream extends OutputStream {
|
||||
_written = 0;
|
||||
_closed = false;
|
||||
_writeTimeout = -1;
|
||||
_passiveFlushDelay = 5*1000;
|
||||
_flusher = new Flusher();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("MessageOutputStream created");
|
||||
}
|
||||
|
||||
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
|
||||
@@ -51,10 +61,11 @@ public class MessageOutputStream extends OutputStream {
|
||||
}
|
||||
|
||||
public void write(byte b[], int off, int len) throws IOException {
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("write(b[], " + off + ", " + len + ")");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("write(b[], " + off + ", " + len + ") ");
|
||||
int cur = off;
|
||||
int remaining = len;
|
||||
long begin = _context.clock().now();
|
||||
while (remaining > 0) {
|
||||
WriteStatus ws = null;
|
||||
// we do any waiting outside the synchronized() block because we
|
||||
@@ -70,6 +81,11 @@ public class MessageOutputStream extends OutputStream {
|
||||
cur += remaining;
|
||||
_written += remaining;
|
||||
remaining = 0;
|
||||
_lastBuffered = _context.clock().now();
|
||||
if (_passiveFlushDelay > 0) {
|
||||
// if it is already enqueued, this just pushes it further out
|
||||
SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
|
||||
}
|
||||
} else {
|
||||
// buffer whatever we can fit then flush,
|
||||
// repeating until we've pushed all of the
|
||||
@@ -79,13 +95,20 @@ public class MessageOutputStream extends OutputStream {
|
||||
remaining -= toWrite;
|
||||
cur += toWrite;
|
||||
_valid = _buf.length;
|
||||
if (_dataReceiver == null) {
|
||||
throwAnyError();
|
||||
return;
|
||||
}
|
||||
ws = _dataReceiver.writeData(_buf, 0, _valid);
|
||||
_written += _valid;
|
||||
_valid = 0;
|
||||
throwAnyError();
|
||||
_lastFlushed = _context.clock().now();
|
||||
}
|
||||
}
|
||||
if (ws != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws);
|
||||
// ok, we've actually added a new packet - lets wait until
|
||||
// its accepted into the queue before moving on (so that we
|
||||
// dont fill our buffer instantly)
|
||||
@@ -96,8 +119,14 @@ public class MessageOutputStream extends OutputStream {
|
||||
else
|
||||
throw new IOException("Write not accepted into the queue");
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Queued " + len + " without sending to the receiver");
|
||||
}
|
||||
}
|
||||
long elapsed = _context.clock().now() - begin;
|
||||
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
|
||||
throwAnyError();
|
||||
}
|
||||
|
||||
@@ -106,6 +135,33 @@ public class MessageOutputStream extends OutputStream {
|
||||
throwAnyError();
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush data that has been enqued but not flushed after a certain
|
||||
* period of inactivity
|
||||
*/
|
||||
private class Flusher implements SimpleTimer.TimedEvent {
|
||||
public void timeReached() {
|
||||
boolean sent = false;
|
||||
WriteStatus ws = null;
|
||||
synchronized (_dataLock) {
|
||||
if ( (_valid > 0) && (_lastBuffered + _passiveFlushDelay > _context.clock().now()) ) {
|
||||
if ( (_buf != null) && (_dataReceiver != null) ) {
|
||||
ws = _dataReceiver.writeData(_buf, 0, _valid);
|
||||
_written += _valid;
|
||||
_valid = 0;
|
||||
_lastFlushed = _context.clock().now();
|
||||
_dataLock.notifyAll();
|
||||
sent = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// ignore the ws
|
||||
if (sent && _log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Passive flush of " + ws);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the data already queued up, blocking until it has been
|
||||
* delivered.
|
||||
@@ -114,12 +170,18 @@ public class MessageOutputStream extends OutputStream {
|
||||
* @throws InterruptedIOException if the write times out
|
||||
*/
|
||||
public void flush() throws IOException {
|
||||
long begin = _context.clock().now();
|
||||
WriteStatus ws = null;
|
||||
synchronized (_dataLock) {
|
||||
if (_buf == null) throw new IOException("closed (buffer went away)");
|
||||
if (_dataReceiver == null) {
|
||||
throwAnyError();
|
||||
return;
|
||||
}
|
||||
ws = _dataReceiver.writeData(_buf, 0, _valid);
|
||||
_written += _valid;
|
||||
_valid = 0;
|
||||
_lastFlushed = _context.clock().now();
|
||||
_dataLock.notifyAll();
|
||||
}
|
||||
|
||||
@@ -129,6 +191,8 @@ public class MessageOutputStream extends OutputStream {
|
||||
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
|
||||
(_writeTimeout <= 0) ) )
|
||||
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
|
||||
else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) )
|
||||
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
|
||||
else
|
||||
ws.waitForCompletion(_writeTimeout);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@@ -137,6 +201,10 @@ public class MessageOutputStream extends OutputStream {
|
||||
throw new InterruptedIOException("Timed out during write");
|
||||
else if (ws.writeFailed())
|
||||
throw new IOException("Write failed");
|
||||
|
||||
long elapsed = _context.clock().now() - begin;
|
||||
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("wtf, took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
|
||||
throwAnyError();
|
||||
}
|
||||
|
||||
@@ -164,7 +232,8 @@ public class MessageOutputStream extends OutputStream {
|
||||
ByteArray ba = null;
|
||||
synchronized (_dataLock) {
|
||||
// flush any data, but don't wait for it
|
||||
_dataReceiver.writeData(_buf, 0, _valid);
|
||||
if (_dataReceiver != null)
|
||||
_dataReceiver.writeData(_buf, 0, _valid);
|
||||
_written += _valid;
|
||||
_valid = 0;
|
||||
|
||||
@@ -173,6 +242,7 @@ public class MessageOutputStream extends OutputStream {
|
||||
_buf = null;
|
||||
_valid = 0;
|
||||
}
|
||||
_lastFlushed = _context.clock().now();
|
||||
_dataLock.notifyAll();
|
||||
}
|
||||
if (ba != null) {
|
||||
@@ -212,6 +282,7 @@ public class MessageOutputStream extends OutputStream {
|
||||
_written += _valid;
|
||||
_valid = 0;
|
||||
_dataLock.notifyAll();
|
||||
_lastFlushed = _context.clock().now();
|
||||
}
|
||||
if (blocking && ws != null) {
|
||||
ws.waitForAccept(_writeTimeout);
|
||||
|
@@ -47,7 +47,8 @@ class SchedulerClosed extends SchedulerImpl {
|
||||
}
|
||||
|
||||
public void eventOccurred(Connection con) {
|
||||
long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now();
|
||||
reschedule(timeLeft, con);
|
||||
// noop. we do the timeout through the simpleTimer anyway
|
||||
//long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now();
|
||||
//reschedule(timeLeft, con);
|
||||
}
|
||||
}
|
||||
|
@@ -239,6 +239,8 @@
|
||||
<copy file="build/routerconsole.jar" todir="pkg-temp/lib/" />
|
||||
<copy file="build/i2ptunnel.war" todir="pkg-temp/webapps/" />
|
||||
<copy file="build/routerconsole.war" todir="pkg-temp/webapps/" />
|
||||
<copy file="installer/resources/wrapper.config" todir="pkg-temp/" />
|
||||
<copy file="installer/resources/wrapper.config" tofile="pkg-temp/wrapper.config.updated" />
|
||||
<copy file="history.txt" todir="pkg-temp/" />
|
||||
<copy file="hosts.txt" todir="pkg-temp/" />
|
||||
<mkdir dir="pkg-temp/eepsite" />
|
||||
|
@@ -194,6 +194,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
|
||||
+ " / " + state.getNonce() + " found = " + found);
|
||||
|
||||
long timeToSend = afterRemovingSync - beforeSendingSync;
|
||||
if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
|
||||
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
|
||||
}
|
||||
|
||||
if (found) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
|
||||
|
@@ -72,6 +72,10 @@ public class SimpleTimer {
|
||||
_log.log(Log.CRIT, msg, t);
|
||||
}
|
||||
|
||||
private long _occurredTime;
|
||||
private long _occurredEventCount;
|
||||
private TimedEvent _recentEvents[] = new TimedEvent[5];
|
||||
|
||||
private class SimpleTimerRunner implements Runnable {
|
||||
public void run() {
|
||||
List eventsToFire = new ArrayList(1);
|
||||
@@ -121,6 +125,9 @@ public class SimpleTimer {
|
||||
}
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
now = now - (now % 1000);
|
||||
|
||||
for (int i = 0; i < eventsToFire.size(); i++) {
|
||||
TimedEvent evt = (TimedEvent)eventsToFire.get(i);
|
||||
try {
|
||||
@@ -128,7 +135,30 @@ public class SimpleTimer {
|
||||
} catch (Throwable t) {
|
||||
log("wtf, event borked: " + evt, t);
|
||||
}
|
||||
_recentEvents[4] = _recentEvents[3];
|
||||
_recentEvents[3] = _recentEvents[2];
|
||||
_recentEvents[2] = _recentEvents[1];
|
||||
_recentEvents[1] = _recentEvents[0];
|
||||
_recentEvents[0] = evt;
|
||||
}
|
||||
|
||||
if (_occurredTime == now) {
|
||||
_occurredEventCount += eventsToFire.size();
|
||||
} else {
|
||||
_occurredTime = now;
|
||||
if (_occurredEventCount > 100) {
|
||||
StringBuffer buf = new StringBuffer(256);
|
||||
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
|
||||
buf.append(") in a second! Last 5: \n");
|
||||
for (int i = 0; i < _recentEvents.length; i++) {
|
||||
if (_recentEvents[i] != null)
|
||||
buf.append(_recentEvents[i]).append('\n');
|
||||
}
|
||||
_log.log(Log.CRIT, buf.toString());
|
||||
}
|
||||
_occurredEventCount = 0;
|
||||
}
|
||||
|
||||
eventsToFire.clear();
|
||||
}
|
||||
}
|
||||
|
67
history.txt
67
history.txt
@@ -1,4 +1,69 @@
|
||||
$Id: history.txt,v 1.82 2004/11/25 16:57:19 jrandom Exp $
|
||||
$Id: history.txt,v 1.90 2004/11/30 18:41:52 jrandom Exp $
|
||||
|
||||
* 2004-12-01 0.4.2.1 released
|
||||
|
||||
2004-12-01 jrandom
|
||||
* Strip out any of the Accept-* HTTP header lines, and always make sure to
|
||||
include the forged User-agent header.
|
||||
* Adjust the default read timeout on the eepproxy to 60s, unless
|
||||
overridden.
|
||||
* Minor tweak on stream shutdown.
|
||||
|
||||
2004-11-30 jrandom
|
||||
* Render the burst rate fields on /config.jsp properly (thanks ugha!)
|
||||
* Build in a simple timeout to flush data queued into the I2PSocket but
|
||||
not yet flushed.
|
||||
* Don't explicitly flush after each SAM stream write, but leave it up to
|
||||
the [nonblocking] passive flush.
|
||||
* Don't whine about 10-99 connection events occurring in a second
|
||||
* Don't wait for completion of packets that will not be ACKed (duh)
|
||||
* Adjust the congestion window, even if the packet was resent (duh)
|
||||
* Make sure to wake up any blocking read()'s when the MessageInputStream
|
||||
is close()ed (duh)
|
||||
* Never wait more than the disconnect timeout for a write to complete
|
||||
|
||||
2004-11-29 jrandom
|
||||
* Minor fixes to avoid unnecessary errors on shutdown (thanks susi!)
|
||||
|
||||
2004-11-29 jrandom
|
||||
* Reduced contention for local client delivery
|
||||
* Drop the new code that munges the wrapper.config. Instead, updates that
|
||||
need to change it will include their own wrapper.config in the
|
||||
i2pupdate.zip, overwriting the existing file. If the file
|
||||
"wrapper.config.updated" is included, it is deleted at first opportunity
|
||||
and the router shut down, displaying a notice that the router must be
|
||||
started again cleanly to allow the changes to the wrapper.config to take
|
||||
effect.
|
||||
* Properly stop accept()ing I2PSocket connections if we close down the
|
||||
session (duh).
|
||||
* Make sure we cancel any outstanding Packets in flight when a connection
|
||||
is terminated (thanks susi!)
|
||||
* Split up the I2PTunnel closing a little further.
|
||||
|
||||
2004-11-28 jrandom
|
||||
* Accept IP address detection changes with a 2-out-of-3 minimum.
|
||||
* As long as the router is up, keep retrying to bind the I2CP listener.
|
||||
* Decrease the java service wrapper ping frequency to once every 10
|
||||
minutes, rather than once every 5 seconds.
|
||||
|
||||
2004-11-27 jrandom
|
||||
* Some cleanup and bugfixes for the IP address detection code where we
|
||||
only consider connections that have actually sent and received messages
|
||||
recently as active, rather than the mere presence of a TCP socket as
|
||||
activity.
|
||||
|
||||
2004-11-27 jrandom
|
||||
* Removed the I2PTunnel inactivity timeout thread, since the new streaming
|
||||
lib can do that (without an additional per-connection thread).
|
||||
* Close the I2PTunnel forwarder threads more aggressively
|
||||
|
||||
2004-11-27 jrandom
|
||||
* Fix for a fast loop caused by a race in the new streaming library (thanks
|
||||
DrWoo, frontier, pwk_, and thetower!)
|
||||
* Minor updates to the SimpleTimer and Connection to help track down a
|
||||
high CPU usage problem (dumping debug info to stdout/wrapper.log if too
|
||||
many events/tasks fire in a second)
|
||||
* Minor fixes for races on client disconnects (causing NPEs)
|
||||
|
||||
* 2004-11-26 0.4.2 released
|
||||
|
||||
|
@@ -1,6 +1,9 @@
|
||||
; TC's hosts.txt guaranteed freshness
|
||||
; $Id: hosts.txt,v 1.79 2004/11/22 22:58:46 jrandom Exp $
|
||||
; $Id: hosts.txt,v 1.82 2004/11/28 17:47:01 jrandom Exp $
|
||||
; changelog:
|
||||
; (1.105) added bdl.i2p
|
||||
; (1.104) added bacardi.i2p and guttersnipe.i2p
|
||||
; (1.103) added evil.i2p
|
||||
; (1.102) added bsdm.i2p
|
||||
; (1.101) added eschaton.i2p
|
||||
; (1.100) added blog.curiosity.i2p
|
||||
@@ -228,4 +231,8 @@ slacker.i2p=BR~D1lQNF~NLjdrJILDBA4DaQZpDoQZhVFNAwBgUz3sEP2tHnuiS7-EO2jwXEPKHIVZN
|
||||
blog.curiosity.i2p=GqgGNWvNcbEABjAxBNJmILDIVJq0GSpf3eyjQt5EQsJFPCA3fYFYUULh4WoPFEk3I4pOV0lPGlkFX6Q52nkIk~kW~YGsCjxDJJ~08m31KgPJ70-svh7Fj-D8Bs27VaG9nXEltCJfjzfHsNY6m5jCCf400-14jTEghAjguIwPbpCpJ351jrE36sSU861LhhKnCiTItIN6ig9LROIUOJnTvh4-vRHrHS~oQVesAUqZ0zMPRNYlC1GHgTvou8iKLXlPnq3u1ITfCQYjeC6lpNn3bnnJOfrzGsPW3Y70jG5fqTPRqCyef5j0gNa~ZewMAQvV~oKxpDFTixNx8pBRv913pRXTdZfxT8D-fpYbXP1O0GtaHTgPO7PYWFKHCAaxmleD7tzSpIyfl4LpO0Y1Be5eHAZh5Lh1nE~tqVJZUbcVqZzDPmrmT20oCDljsZ8kEIRb3mn3VpzN5wOikPTh8AHg1jfic9hGBv5Lqu5SBffSCQZi7znTB7peX3ZavGODotR2AAAA
|
||||
eschaton.i2p=YTlIyW42pdmE~D8fc3eUSdyN9x~SsddHcAXi3hISTTovLO-CphmfAqyvAZ1GV2-xExVlK7u~WBT8w9NBeig48NsAXCkNuWvZS~hzw9AERj5HVe5Jwzvd78x6rsFLaYbAFeSngtH~Bxi~ZLfLtXbcD7aZsXh2bCSV8OwKPRmPiKsV48Bwi9BSJA84VQLtpnbg3MtSDAciLVVH2Prvu939CciLAkdaST6JWBDY553c~4RlaHkxR~uHsskO0sNnCp2ASGRR8zoaDTP9eKhy9vXcCJZ6EYKDmRGcb~0hpuB67lP47PJaiNCWNk5jI~VNdCUE2O9D-XBPA88qvDZVK0wZWTL3Hxh5uvqr-rKZ2Txlh-qZ1vispjv47VC88ht~mQi~PGbqZxvPIOe9LxemIjlBedDjcS675cyVm-stwx-8G8au-hE7QNhZ3XN8kRlNcCTtMJxX2QRXT6juCZVNXfnO~sCexp7inl7~Q~yCeBbOoWhQoOOsm7kHMNEGxOkpkxjxAAAA
|
||||
bdsm.i2p=qOcqbBo4vxHbldI758R76ZJZu92zWefovF6IEEv~XKA-UkHNbY-y6v9w-9ts-tbZxpak8l1QyB6Mp1Rwy-sE1YxevjIPhV-UQZSsJHRJHVZasR3ULncY-g-dvzGLab9YGtkdfub2GMlCJzDm9F8sUvGnLxlZ28mj3~ZLC0vEt5BeCl5hDuT9CiAzWarcSC2M1PIOMxIUYz9K5ueanOZjmAkNet4igKM0XmfXL8c85VeiPO8jx9G0ZDARwObXxvNdPq5pDwOZSY-5f-crDWMV1KesdmqsKIgydZ5GSPzq5jKLJdY3bUsxViNoWzjIWKfbwm6Wgl3eb9wzLEXDP1m2WGWAetNCgwSbT30A2yhTEWFwEqSE0GvWKvlztyG3oGm8eKN-VymRwduORnpKYbALNZ2MG4ZtVhKklER4hrgYSLuuXIi7KhQ0WXrKjPFnnL2MmnpPsFYn0ZQz6ilYOSAYcV3rBsd83RAFPWodlCjt0TpnpvcFfYZqfKZtYQryzkdFAAAA
|
||||
evil.i2p=NGNaN9qrpk2dC~OQlo-1yHT~Tvb6SvU8VH9SiBkwNZxuNVRKN~2ZtU50nkTklevB5V08GMT5tkpyZCyVwVgZNNl50uz25t~xP3Pte6GUxzBfBJ8n2GoFdlhn5jGHhmM6QI3nlDgLO7AaNdIQu8LxaoO4lln~fReKvY3sNRMAOJlBO0mHH1PSsemJmqkhl02VX6-3KllgZ99E7uVT-ap2i7Pf3wvLtBtoHryZXvYtKL9zV17tBfhmIVT~VSuhsGRVoy43eoep~fPNCE3s0jr4GLEeWfx2LF0WsvN2avii43DFB23BaZAKd7myLV~pmN9L7pOILnsrktO2MQXy2q2OriZs8dxN3w0yboE5iAfTt8IWtHfNLp8aGx2HzmF1TacrPxuJmg~C9mawTGtZ2aKMBr7vHtN1VuZnOdVi5hcC7tQ0YuwPtAkOB1iO2Wbh3csYtfusPoIqezP1O-iUndNCRg86u3PZ71jXhSq4mMZaNZGYiiqB7nXVkBi15nUbQRb8AAAA
|
||||
bacardi.i2p=oZm0JRHiUFKwAzLz1wlNOK2h2fI8V6u1nUhgCpt1RcErs99QMPHqu4oR4cel5lsJbeg1X0GgHe72JYabsntjimjWs1zi0RzknddVvq0hMGnn9EA-9Atu9qViScXp42ddnYhIlMBNNswMp8AJ01jHkO3SUSDSVk-rF5wnrhpyN9BFyho8h18nHrr6S4jaGHsuLVage8ImwQRv~PYfr4hVULdiFn5HDRePdvgzKUD3o4Y7nFiQMXswP02ZhXXtE-rMGDs2TD15qYah6mkQWdZtRYFuKuCh7Myn35u2IHGyMs56zpnuVr~w~Uh7wydg26JShsAz4AyZhGy70eTFUC2dtEr7bUvQSE3V3xNKOeBuGYadipWB30xmGDX2kfaU5Efy45MLPBwTU-8f4owQBokgSD3jBEc4vV0DbJrGoNnA3zpAC0JwZ9rFLolQNzenRgkWYlO09PEkKPUyUMpUBGY~Wj7jcsGnt5uNskeHHpV4hsvdpboHnlPAcZKvN7rTCenoAAAA
|
||||
guttersnipe.i2p=yS6ECPvCobRLKMXJg-rLZ5PF6dRcXliR2e9KEX~aUZMEsznpzE-lCs~FgGd7w9-~GW8ObvKiyt47S7SuCunjndl874xPhrQ0dFRUNEsdovLBr6s9v2xi7kpWC1yKLbT-Ti3p8WhNPZkGsJl6YEIc357j4FjNVFx2-b~V16G0lhjhY1a55nsCoU7xXisdqOaKZL-4Y7vEVHqZNgM6cAZpY8vHtui30J3oGmV-RcJG9ahYkorqleKtb2G0fwbUMakxHv5uzlHyAU~r7OmS7NCIfoyBgU3MrSiwkYiW5elQ294XV-1Bciy9DFnJh4KA0rLIA7GanvkQ-NwqoGLsXz08mpi12vVssDhdavXooBeZLg7gipFYzle3-B1rgGKZ~51~9Tou~XSn6jGJGgtqKV7fTmys4~AXNy7vCWrMKQaWh6M1HI69yV466bJWv6nVhwNmOmtk48EIC3ik2IXC1wjA6Rgz2AcSiR-UEyaBkv251Vu6H92HZJTrsTzuzVmZcYt0AAAA
|
||||
bdl.i2p=IMzcrD4jQLBtyzV8TdL-jgbyDhRjMBS1BfI6xGuddMyJEoMnZ1ZWDUn3NwqdyEYRQD7zREgbsQgzpE7Gdmpp-vLrzba3W1mUR5Ddu13tgIiDrOWR38Omv7L~DTDimK9m0Y-HCJROVigRfxdZbsI6P37d77NArdpwLzFos6qyK2C40JQFIaNCPYGkf05DsXtHLPwTJXYVKVp0R2V7nGmI77BUdwDgt4zSotlyvmsX6U1mYqKwutwr~oxggdvgfoNbrGC0~xQCbfQtEOwFYwxc5oUeJlt4jjb1-C9HAb7r4LtJ3Daqr0bx1hXS5hADw68cfUHbEjbfrhJhBA0mMqgGMm~r~0II8R19EXprc91YY4d0QycR2Osdn9MVNXS01Ziy~JX-SfT8DTjj2ZZn7rpyjcq7rgVbspPLJOdNQNERiAm06yjosOrPjtl8mrWXxWcOcGlMO5ftoTvX2hFsfc-vmfN7S8IZEfUdJRpJxDytChNTUr~qMWuP0LicNv0xKtA3AAAA
|
||||
|
||||
|
@@ -4,7 +4,7 @@
|
||||
|
||||
<info>
|
||||
<appname>i2p</appname>
|
||||
<appversion>0.4.2</appversion>
|
||||
<appversion>0.4.2.1</appversion>
|
||||
<authors>
|
||||
<author name="I2P" email="support@i2p.net"/>
|
||||
</authors>
|
||||
|
@@ -103,6 +103,9 @@ wrapper.jvm_exit.timeout=10
|
||||
# give the OS 60s to clear all the old sockets / etc before restarting
|
||||
wrapper.restart.delay=60
|
||||
|
||||
wrapper.ping.interval=600
|
||||
wrapper.ping.timeout=605
|
||||
|
||||
# use the wrapper's internal timer thread. otherwise this would
|
||||
# force a restart of the router during daylight savings time as well
|
||||
# as any time that the OS clock changes
|
||||
|
@@ -36,7 +36,6 @@ import net.i2p.data.i2np.TunnelMessage;
|
||||
import net.i2p.router.message.GarlicMessageHandler;
|
||||
import net.i2p.router.message.TunnelMessageHandler;
|
||||
import net.i2p.router.startup.StartupJob;
|
||||
import net.i2p.router.startup.VerifyClasspath;
|
||||
import net.i2p.stat.Rate;
|
||||
import net.i2p.stat.RateStat;
|
||||
import net.i2p.util.FileUtil;
|
||||
@@ -794,7 +793,7 @@ public class Router {
|
||||
|
||||
public static void main(String args[]) {
|
||||
installUpdates();
|
||||
verifyClasspath();
|
||||
verifyWrapperConfig();
|
||||
Router r = new Router();
|
||||
r.runRouter();
|
||||
}
|
||||
@@ -820,10 +819,11 @@ public class Router {
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyClasspath() {
|
||||
boolean updated = VerifyClasspath.updateClasspath();
|
||||
if (updated) {
|
||||
System.out.println("INFO: Classpath updated, but the service wrapper requires you to manually restart");
|
||||
private static void verifyWrapperConfig() {
|
||||
File cfgUpdated = new File("wrapper.config.updated");
|
||||
if (cfgUpdated.exists()) {
|
||||
cfgUpdated.delete();
|
||||
System.out.println("INFO: Wrapper config updated, but the service wrapper requires you to manually restart");
|
||||
System.out.println("INFO: Shutting down the router - please rerun it!");
|
||||
System.exit(EXIT_HARD);
|
||||
}
|
||||
|
@@ -15,8 +15,8 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.87 $ $Date: 2004/11/25 16:57:20 $";
|
||||
public final static String VERSION = "0.4.2";
|
||||
public final static String ID = "$Revision: 1.95 $ $Date: 2004/11/30 18:41:51 $";
|
||||
public final static String VERSION = "0.4.2.1";
|
||||
public final static long BUILD = 0;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION);
|
||||
|
@@ -143,6 +143,8 @@ public class StatisticsManager implements Service {
|
||||
//includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||
//includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
|
||||
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000 }, true);
|
||||
includeRate("stream.con.sendDuplicateSize", stats, new long[] { 60*60*1000 });
|
||||
includeRate("stream.con.receiveDuplicateSize", stats, new long[] { 60*60*1000 });
|
||||
//includeRate("client.sendsPerFailure", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||
//includeRate("client.timeoutCongestionTunnel", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||
//includeRate("client.timeoutCongestionMessage", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
|
||||
|
@@ -55,7 +55,7 @@ public class ClientConnectionRunner {
|
||||
/** user's config */
|
||||
private SessionConfig _config;
|
||||
/** static mapping of MessageId to Payload, storing messages for retrieval */
|
||||
private static Map _messages;
|
||||
private Map _messages;
|
||||
/** lease set request state, or null if there is no request pending on at the moment */
|
||||
private LeaseRequestState _leaseRequest;
|
||||
/** currently allocated leaseSet, or null if none is allocated */
|
||||
@@ -227,7 +227,7 @@ public class ClientConnectionRunner {
|
||||
}
|
||||
|
||||
void disconnectClient(String reason) {
|
||||
_log.error("Disconnecting the client: " + reason, new Exception("Disconnecting!"));
|
||||
_log.error("Disconnecting the client: " + reason);
|
||||
DisconnectMessage msg = new DisconnectMessage();
|
||||
msg.setReason(reason);
|
||||
try {
|
||||
|
@@ -42,9 +42,6 @@ public class ClientListenerRunner implements Runnable {
|
||||
public void setPort(int port) { _port = port; }
|
||||
public int getPort() { return _port; }
|
||||
|
||||
/** max time to bind */
|
||||
private final static int MAX_FAIL_DELAY = 5*60*1000;
|
||||
|
||||
/**
|
||||
* Start up the socket listener, listens for connections, and
|
||||
* fires those connections off via {@link #runConnection runConnection}.
|
||||
@@ -55,7 +52,7 @@ public class ClientListenerRunner implements Runnable {
|
||||
public void runServer() {
|
||||
_running = true;
|
||||
int curDelay = 0;
|
||||
while ( (_running) && (curDelay < MAX_FAIL_DELAY) ) {
|
||||
while (_running) {
|
||||
try {
|
||||
_log.info("Starting up listening for connections on port " + _port);
|
||||
_socket = new ServerSocket(_port);
|
||||
|
@@ -145,20 +145,21 @@ public class ClientManager {
|
||||
if (runner != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Message " + msgId + " is targeting a local destination. distribute it as such");
|
||||
runner.receiveMessage(toDest, fromDest, payload);
|
||||
if (fromDest != null) {
|
||||
ClientConnectionRunner sender = getRunner(fromDest);
|
||||
if (sender != null) {
|
||||
sender.updateMessageDeliveryStatus(msgId, true);
|
||||
} else {
|
||||
_log.log(Log.CRIT, "Um, wtf, we're sending a local message, but we can't find who sent it?", new Exception("wtf"));
|
||||
}
|
||||
ClientConnectionRunner sender = getRunner(fromDest);
|
||||
if (sender == null) {
|
||||
// sender went away
|
||||
return;
|
||||
}
|
||||
_context.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId));
|
||||
} else {
|
||||
// remote. w00t
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool");
|
||||
runner = getRunner(fromDest);
|
||||
if (runner == null) {
|
||||
// sender went away
|
||||
return;
|
||||
}
|
||||
ClientMessage msg = new ClientMessage();
|
||||
msg.setDestination(toDest);
|
||||
msg.setPayload(payload);
|
||||
@@ -170,6 +171,32 @@ public class ClientManager {
|
||||
}
|
||||
}
|
||||
|
||||
private class DistributeLocal extends JobImpl {
|
||||
private Destination _toDest;
|
||||
private ClientConnectionRunner _to;
|
||||
private ClientConnectionRunner _from;
|
||||
private Destination _fromDest;
|
||||
private Payload _payload;
|
||||
private MessageId _msgId;
|
||||
|
||||
public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) {
|
||||
super(_context);
|
||||
_toDest = toDest;
|
||||
_to = to;
|
||||
_from = from;
|
||||
_fromDest = fromDest;
|
||||
_payload = payload;
|
||||
_msgId = id;
|
||||
}
|
||||
public String getName() { return "Distribute local message"; }
|
||||
public void runJob() {
|
||||
_to.receiveMessage(_toDest, _fromDest, _payload);
|
||||
if (_from != null) {
|
||||
_from.updateMessageDeliveryStatus(_msgId, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Request that a particular client authorize the Leases contained in the
|
||||
|
@@ -1,87 +0,0 @@
|
||||
package net.i2p.router.startup;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.Properties;
|
||||
|
||||
import net.i2p.data.DataHelper;
|
||||
|
||||
/**
|
||||
* Make sure that if there is a wrapper.config file, it includes
|
||||
* all of the jar files necessary for the current build.
|
||||
* HOLY CRAP THIS IS UGLY.
|
||||
*
|
||||
*/
|
||||
public class VerifyClasspath {
|
||||
private static final String NL = System.getProperty("line.separator");
|
||||
private static final Set _jars = new HashSet();
|
||||
|
||||
static {
|
||||
_jars.add("lib/ant.jar");
|
||||
_jars.add("lib/heartbeat.jar");
|
||||
_jars.add("lib/i2p.jar");
|
||||
_jars.add("lib/i2ptunnel.jar");
|
||||
_jars.add("lib/jasper-compiler.jar");
|
||||
_jars.add("lib/jasper-runtime.jar");
|
||||
_jars.add("lib/javax.servlet.jar");
|
||||
_jars.add("lib/jnet.jar");
|
||||
_jars.add("lib/mstreaming.jar");
|
||||
_jars.add("lib/netmonitor.jar");
|
||||
_jars.add("lib/org.mortbay.jetty.jar");
|
||||
_jars.add("lib/router.jar");
|
||||
_jars.add("lib/routerconsole.jar");
|
||||
_jars.add("lib/sam.jar");
|
||||
_jars.add("lib/wrapper.jar");
|
||||
_jars.add("lib/xercesImpl.jar");
|
||||
_jars.add("lib/xml-apis.jar");
|
||||
_jars.add("lib/jbigi.jar");
|
||||
_jars.add("lib/systray.jar");
|
||||
_jars.add("lib/systray4j.jar");
|
||||
_jars.add("lib/streaming.jar");
|
||||
}
|
||||
|
||||
/**
|
||||
* update the wrapper.config
|
||||
*
|
||||
* @return true if the classpath was updated and a restart is
|
||||
* required, false otherwise.
|
||||
*/
|
||||
public static boolean updateClasspath() {
|
||||
Properties p = new Properties();
|
||||
File configFile = new File("wrapper.config");
|
||||
Set needed = new HashSet(_jars);
|
||||
try {
|
||||
DataHelper.loadProps(p, configFile);
|
||||
Set toAdd = new HashSet();
|
||||
int entry = 1;
|
||||
while (true) {
|
||||
String value = p.getProperty("wrapper.java.classpath." + entry);
|
||||
if (value == null) break;
|
||||
needed.remove(value);
|
||||
entry++;
|
||||
}
|
||||
if (needed.size() <= 0) {
|
||||
// we have everything we need
|
||||
return false;
|
||||
} else {
|
||||
// add on some new lines
|
||||
FileWriter out = new FileWriter(configFile, true);
|
||||
out.write(NL + "# Adding new libs as required by the update" + NL);
|
||||
for (Iterator iter = needed.iterator(); iter.hasNext(); ) {
|
||||
String name = (String)iter.next();
|
||||
out.write("wrapper.java.classpath." + entry + "=" + name + NL);
|
||||
}
|
||||
out.close();
|
||||
return true;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@@ -142,6 +142,7 @@ public class ConnectionBuilder {
|
||||
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
|
||||
con.setRemoteAddress(_remoteAddress);
|
||||
con.setAttemptedPeer(_target.getIdentity().getHash());
|
||||
con.setShownAddress(_localIP);
|
||||
if (_error == null) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Establishment successful! returning the con");
|
||||
|
@@ -30,6 +30,7 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene
|
||||
}
|
||||
|
||||
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) {
|
||||
_con.messageReceived();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Just received message " + message.getUniqueId() + " from "
|
||||
+ _identHash.toBase64().substring(0,6)
|
||||
|
@@ -41,6 +41,8 @@ public class TCPAddress {
|
||||
_host = null;
|
||||
_port = -1;
|
||||
_addr = null;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Unknown host [" + host + "] for port [" + port + "]", uhe);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -30,6 +30,7 @@ public class TCPConnection {
|
||||
private RouterIdentity _ident;
|
||||
private Hash _attemptedPeer;
|
||||
private TCPAddress _remoteAddress;
|
||||
private String _shownAddress;
|
||||
private List _pendingMessages;
|
||||
private InputStream _in;
|
||||
private OutputStream _out;
|
||||
@@ -40,6 +41,8 @@ public class TCPConnection {
|
||||
private RateStat _sendRate;
|
||||
private long _started;
|
||||
private boolean _closed;
|
||||
private long _lastRead;
|
||||
private long _lastWrite;
|
||||
|
||||
public TCPConnection(RouterContext ctx) {
|
||||
_context = ctx;
|
||||
@@ -47,12 +50,15 @@ public class TCPConnection {
|
||||
_pendingMessages = new ArrayList(4);
|
||||
_ident = null;
|
||||
_remoteAddress = null;
|
||||
_shownAddress = null;
|
||||
_in = null;
|
||||
_out = null;
|
||||
_socket = null;
|
||||
_transport = null;
|
||||
_started = -1;
|
||||
_closed = false;
|
||||
_lastRead = 0;
|
||||
_lastWrite = 0;
|
||||
_runner = new ConnectionRunner(_context, this);
|
||||
_context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
|
||||
_context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
|
||||
@@ -71,6 +77,10 @@ public class TCPConnection {
|
||||
public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; }
|
||||
/** Who we initially were trying to contact */
|
||||
public void setAttemptedPeer(Hash peer) { _attemptedPeer = peer; }
|
||||
/** What address the peer said we are reachable on */
|
||||
public void setShownAddress(String ip) { _shownAddress = ip; }
|
||||
/** What address the peer said we are reachable on */
|
||||
public String getShownAddress() { return _shownAddress; }
|
||||
|
||||
/**
|
||||
* Actually start processing the messages on the connection (and reading
|
||||
@@ -359,6 +369,19 @@ public class TCPConnection {
|
||||
boolean getIsClosed() { return _closed; }
|
||||
RouterContext getRouterContext() { return _context; }
|
||||
|
||||
boolean getIsActive() {
|
||||
if ( (_lastRead <= 0) || (_lastWrite <= 0) ) return false;
|
||||
long recent = (_lastRead > _lastWrite ? _lastRead : _lastWrite);
|
||||
long howLongAgo = _context.clock().now() - recent;
|
||||
if (howLongAgo < 1*60*1000)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
void messageReceived() {
|
||||
_lastRead = _context.clock().now();
|
||||
}
|
||||
|
||||
/**
|
||||
* The message was sent.
|
||||
*
|
||||
@@ -370,5 +393,7 @@ public class TCPConnection {
|
||||
_transport.afterSend(msg, ok, true, time);
|
||||
if (ok)
|
||||
_sendRate.addData(msg.getMessageSize(), msg.getLifetime());
|
||||
if (ok)
|
||||
_lastWrite = _context.clock().now();
|
||||
}
|
||||
}
|
||||
|
@@ -40,6 +40,7 @@ public class TCPConnectionEstablisher implements Runnable {
|
||||
if (con != null) {
|
||||
_transport.connectionEstablished(con);
|
||||
} else {
|
||||
if (!_context.router().isAlive()) return;
|
||||
_transport.addConnectionErrorMessage(cb.getError());
|
||||
Hash peer = info.getIdentity().getHash();
|
||||
_context.profileManager().commErrorOccurred(peer);
|
||||
|
@@ -69,6 +69,10 @@ class TCPListener {
|
||||
|
||||
if (addr.getPort() > 0) {
|
||||
if (_listener != null) {
|
||||
if ( (_listener.getMyAddress().getPort() == addr.getPort()) &&
|
||||
(_listener.getMyAddress().getHost() == null) ) {
|
||||
_listener.getMyAddress().setHost(addr.getHost());
|
||||
}
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Not starting another listener on " + addr
|
||||
+ " while already listening on " + _listener.getMyAddress());
|
||||
@@ -137,7 +141,7 @@ class TCPListener {
|
||||
|
||||
public void run() {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Beginning TCP listener");
|
||||
_log.info("Beginning TCP listener on " + _myAddress);
|
||||
|
||||
int curDelay = 0;
|
||||
while (_isRunning) {
|
||||
|
@@ -340,8 +340,10 @@ public class TCPTransport extends TransportImpl {
|
||||
* @param address address that the remote host said was ours
|
||||
*/
|
||||
void ourAddressReceived(String address) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Address received [" + address + "] our address: [" + _myAddress + "]");
|
||||
synchronized (_listener) { // no need to lock on the whole TCPTransport
|
||||
if (allowAddressUpdate()) {
|
||||
if (allowAddressUpdate(address)) {
|
||||
int port = getPort();
|
||||
TCPAddress addr = new TCPAddress(address, port);
|
||||
if (addr.getPort() > 0) {
|
||||
@@ -349,12 +351,17 @@ public class TCPTransport extends TransportImpl {
|
||||
if (_myAddress != null) {
|
||||
if (addr.getAddress().equals(_myAddress.getAddress())) {
|
||||
// ignore, since there is no change
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Not updating our local address, as it hasnt changed from " + address);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Update our local address to " + address);
|
||||
updateAddress(addr);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Address received is NOT a valid address! [" + addr + "]");
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
@@ -363,6 +370,8 @@ public class TCPTransport extends TransportImpl {
|
||||
} else {
|
||||
// either we have explicitly specified our IP address, or
|
||||
// we are already connected to some people.
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Not allowing address update");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -451,9 +460,16 @@ public class TCPTransport extends TransportImpl {
|
||||
*
|
||||
*/
|
||||
boolean allowAddress(TCPAddress address) {
|
||||
if (address == null) return false;
|
||||
if ( (address.getPort() <= 0) || (address.getPort() > 65535) )
|
||||
if (address == null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Address is null?!");
|
||||
return false;
|
||||
}
|
||||
if ( (address.getPort() <= 0) || (address.getPort() > 65535) ) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Port is invalid? " + address.getPort());
|
||||
return false;
|
||||
}
|
||||
if (!address.isPubliclyRoutable()) {
|
||||
String allowLocal = _context.getProperty(LISTEN_ALLOW_LOCAL, "false");
|
||||
if (Boolean.valueOf(allowLocal).booleanValue()) {
|
||||
@@ -504,12 +520,39 @@ public class TCPTransport extends TransportImpl {
|
||||
* have no fully established connections.
|
||||
*
|
||||
*/
|
||||
private boolean allowAddressUpdate() {
|
||||
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
|
||||
if (addressSpecified)
|
||||
return false;
|
||||
private boolean allowAddressUpdate(String proposedAddress) {
|
||||
int connectedPeers = countActivePeers();
|
||||
return (connectedPeers == 0);
|
||||
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
|
||||
if (addressSpecified) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Not allowing address update, sicne we have one specified (#cons=" + connectedPeers + ")");
|
||||
return false;
|
||||
}
|
||||
if (connectedPeers < 3) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Allowing address update, since the # of connected peers is " + connectedPeers);
|
||||
return true;
|
||||
} else if (connectedPeers == 3) {
|
||||
// ok, now comes the vote:
|
||||
// if we agree with the majority, allow the update
|
||||
// otherwise, reject the update
|
||||
int agreed = countActiveAgreeingPeers(proposedAddress);
|
||||
if (agreed > 1) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Most common address selected, allowing address update w/ # of connected peers is " + connectedPeers);
|
||||
return true;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Proposed address [" + proposedAddress + "] is only used by " + agreed
|
||||
+ ", rejecting address update w/ # of connected peers is "
|
||||
+ connectedPeers);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Not allowing address update, since the # of connected peers is " + connectedPeers);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -544,9 +587,44 @@ public class TCPTransport extends TransportImpl {
|
||||
*
|
||||
*/
|
||||
public int countActivePeers() {
|
||||
int numActive = 0;
|
||||
int numInactive = 0;
|
||||
synchronized (_connectionLock) {
|
||||
return _connectionsByIdent.size();
|
||||
if (_connectionsByIdent.size() <= 0) return 0;
|
||||
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
|
||||
TCPConnection con = (TCPConnection)iter.next();
|
||||
if (con.getIsActive())
|
||||
numActive++;
|
||||
else
|
||||
numInactive++;
|
||||
}
|
||||
}
|
||||
if ( (numInactive > 0) && (_log.shouldLog(Log.DEBUG)) )
|
||||
_log.debug("Inactive peers: " + numInactive + " active: " + numActive);
|
||||
|
||||
return numActive;
|
||||
}
|
||||
|
||||
/**
|
||||
* How many peers that we are connected to think we are reachable at the given
|
||||
* address?
|
||||
*
|
||||
*/
|
||||
public int countActiveAgreeingPeers(String address) {
|
||||
int agreed = 0;
|
||||
synchronized (_connectionLock) {
|
||||
if (_connectionsByIdent.size() <= 0) return 0;
|
||||
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
|
||||
TCPConnection con = (TCPConnection)iter.next();
|
||||
if (con.getIsActive()) {
|
||||
String shown = con.getShownAddress();
|
||||
if ( (shown != null) && (shown.equals(address)) )
|
||||
agreed++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return agreed;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -2,6 +2,7 @@ package net.i2p.router.tunnelmanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@@ -152,7 +153,7 @@ class TunnelPool {
|
||||
*
|
||||
*/
|
||||
public Set getManagedTunnelIds() {
|
||||
if (!_isLive) return null;
|
||||
if (!_isLive) return Collections.EMPTY_SET;
|
||||
Set ids = new HashSet(64);
|
||||
synchronized (_outboundTunnels) {
|
||||
ids.addAll(_outboundTunnels.keySet());
|
||||
|
Reference in New Issue
Block a user