Compare commits

..

17 Commits

Author SHA1 Message Date
jrandom
610f1f7dd4 * 2004-12-01 0.4.2.1 released
2004-12-01  jrandom
    * Strip out any of the Accept-* HTTP header lines, and always make sure to
      include the forged User-agent header.
    * Adjust the default read timeout on the eepproxy to 60s, unless
      overridden.
    * Minor tweak on stream shutdown.
2004-12-01 22:31:55 +00:00
jrandom
516d0b4db8 2004-11-30 jrandom
* Render the burst rate fields on /config.jsp properly (thanks ugha!)
    * Build in a simple timeout to flush data queued into the I2PSocket but
      not yet flushed.
    * Don't explicitly flush after each SAM stream write, but leave it up to
      the [nonblocking] passive flush.
    * Don't whine about 10-99 connection events occurring in a second
    * Don't wait for completion of packets that will not be ACKed (duh)
    * Adjust the congestion window, even if the packet was resent (duh)
    * Make sure to wake up any blocking read()'s when the MessageInputStream
      is close()ed (duh)
    * Never wait more than the disconnect timeout for a write to complete
2004-11-30 23:41:51 +00:00
jrandom
df61ae5c6f duh. thanks clayboy :) 2004-11-30 00:10:20 +00:00
jrandom
9f6584b55e 2004-11-29 jrandom
* Minor fixes to avoid unnecessary errors on shutdown (thanks susi!)
2004-11-29 23:24:49 +00:00
jrandom
e4b41f5bb0 2004-11-29 jrandom
* Reduced contention for local client delivery
    * Drop the new code that munges the wrapper.config.  Instead, updates that
      need to change it will include their own wrapper.config in the
      i2pupdate.zip, overwriting the existing file.  If the file
      "wrapper.config.updated" is included, it is deleted at first opportunity
      and the router shut down, displaying a notice that the router must be
      started again cleanly to allow the changes to the wrapper.config to take
      effect.
    * Properly stop accept()ing I2PSocket connections if we close down the
      session (duh).
    * Make sure we cancel any outstanding Packets in flight when a connection
      is terminated (thanks susi!)
    * Split up the I2PTunnel closing a little further.
2004-11-29 22:27:39 +00:00
jrandom
8d0cea93e9 2004-11-29 jrandom
* Reduced contention for local client delivery
    * Drop the new code that munges the wrapper.config.  Instead, updates that
      need to change it will include their own wrapper.config in the
      i2pupdate.zip, overwriting the existing file.  If the file
      "wrapper.config.updated" is included, it is deleted at first opportunity
      and the router shut down, displaying a notice that the router must be
      started again cleanly to allow the changes to the wrapper.config to take
      effect.
    * Properly stop accept()ing I2PSocket connections if we close down the
      session (duh).
    * Make sure we cancel any outstanding Packets in flight when a connection
      is terminated (thanks susi!)
    * Split up the I2PTunnel closing a little further.
2004-11-29 21:57:14 +00:00
jrandom
d294d07919 added bdl.i2p 2004-11-29 20:55:38 +00:00
jrandom
153eea2bd5 you mean i'm supposed to *test* it? 2004-11-29 03:35:39 +00:00
jrandom
571e3c5c13 2004-11-28 jrandom
* Accept IP address detection changes with a 2-out-of-3 minimum.
    * As long as the router is up, keep retrying to bind the I2CP listener.
    * Decrease the java service wrapper ping frequency to once every 10
      minutes, rather than once every 5 seconds.
2004-11-29 02:09:27 +00:00
jrandom
a2d268f3d6 2004-11-28 jrandom
* Accept IP address detection changes with a 2-out-of-3 minimum.
    * As long as the router is up, keep retrying to bind the I2CP listener.
    * Decrease the java service wrapper ping frequency to once every 10
      minutes, rather than once every 5 seconds.
2004-11-29 01:58:38 +00:00
jrandom
02d456d7a0 added bacardi.i2p and guttersnipe.i2p 2004-11-28 22:47:01 +00:00
mpc
b3626ad86f should've tested it first 2004-11-28 05:11:39 +00:00
mpc
9b6eab451f partial raw handling 2004-11-28 05:10:29 +00:00
jrandom
72be9b5f04 2004-11-27 jrandom
* Some cleanup and bugfixes for the IP address detection code where we
      only consider connections that have actually sent and received messages
      recently as active, rather than the mere presence of a TCP socket as
      activity.
2004-11-27 21:02:06 +00:00
jrandom
35e94a7f65 added evil.i2p 2004-11-27 06:56:29 +00:00
jrandom
8e02586cc9 2004-11-27 jrandom
* Removed the I2PTunnel inactivity timeout thread, since the new streaming
      lib can do that (without an additional per-connection thread).
    * Close the I2PTunnel forwarder threads more aggressively
2004-11-27 05:17:06 +00:00
jrandom
0b5a640896 2004-11-27 jrandom
* Fix for a fast loop caused by a race in the new streaming library (thanks
      DrWoo, frontier, pwk_, and thetower!)
    * Minor updates to the SimpleTimer and Connection to help track down a
      high CPU usage problem (dumping debug info to stdout/wrapper.log if too
      many events/tasks fire in a second)
    * Minor fixes for races on client disconnects (causing NPEs)
2004-11-27 03:54:17 +00:00
36 changed files with 631 additions and 305 deletions

View File

@@ -184,13 +184,26 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna
* create the default options (using the default timeout, etc)
*
*/
private I2PSocketOptions getDefaultOptions() {
protected I2PSocketOptions getDefaultOptions() {
Properties defaultOpts = getTunnel().getClientOptions();
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
return opts;
}
/**
* create the default options (using the default timeout, etc)
*
*/
protected I2PSocketOptions getDefaultOptions(Properties overrides) {
Properties defaultOpts = getTunnel().getClientOptions();
defaultOpts.putAll(overrides);
I2PSocketOptions opts = sockMgr.buildOptions(defaultOpts);
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_CONNECT_TIMEOUT))
opts.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
return opts;
}
/**
* Create a new I2PSocket towards to the specified destination,

View File

@@ -14,10 +14,12 @@ import java.util.Date;
import java.util.List;
import java.util.StringTokenizer;
import java.util.HashMap;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.Clock;
@@ -142,6 +144,33 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
return proxy;
}
}
private static final int DEFAULT_READ_TIMEOUT = 60*1000;
/**
* create the default options (using the default timeout, etc)
*
*/
protected I2PSocketOptions getDefaultOptions() {
I2PSocketOptions opts = super.getDefaultOptions();
Properties defaultOpts = getTunnel().getClientOptions();
if (!defaultOpts.contains(I2PSocketOptions.PROP_READ_TIMEOUT))
opts.setReadTimeout(DEFAULT_READ_TIMEOUT);
return opts;
}
/**
* create the default options (using the default timeout, etc)
*
*/
protected I2PSocketOptions getDefaultOptions(Properties overrides) {
I2PSocketOptions opts = super.getDefaultOptions(overrides);
Properties defaultOpts = getTunnel().getClientOptions();
defaultOpts.putAll(overrides);
if (!defaultOpts.containsKey(I2PSocketOptions.PROP_READ_TIMEOUT))
opts.setConnectTimeout(DEFAULT_READ_TIMEOUT);
return opts;
}
private static long __requestId = 0;
protected void clientConnectionRun(Socket s) {
@@ -149,7 +178,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
String targetRequest = null;
boolean usingWWWProxy = false;
String currentProxy = null;
InactivityTimeoutThread timeoutThread = null;
long requestId = ++__requestId;
try {
out = s.getOutputStream();
@@ -295,7 +323,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix(requestId) + "Setting host = " + host);
} else if (line.startsWith("User-Agent: ")) {
line = "User-Agent: MYOB/6.66 (AN/ON)";
// always stripped, added back at the end
line = null;
continue;
} else if (line.startsWith("Accept")) {
// strip the accept-blah headers, as they vary dramatically from
// browser to browser
line = null;
continue;
} else if (line.startsWith("Referer: ")) {
// Shouldn't we be more specific, like accepting in-site referers ?
//line = "Referer: i2p";
@@ -313,6 +348,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
}
if (line.length() == 0) {
newRequest.append("User-Agent: MYOB/6.66 (AN/ON)");
newRequest.append("Connection: close\r\n\r\n");
break;
} else {
@@ -354,25 +390,26 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
return;
}
String remoteID;
I2PSocket i2ps = createI2PSocket(dest);
Properties opts = new Properties();
opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000);
// 1 == disconnect. see ConnectionOptions in the new streaming lib, which i
// dont want to hard link to here
opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1);
I2PSocket i2ps = createI2PSocket(dest, getDefaultOptions(opts));
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
I2PTunnelRunner runner = new I2PTunnelRunner(s, i2ps, sockLock, data, mySockets);
timeoutThread = new InactivityTimeoutThread(runner, out, targetRequest, usingWWWProxy, currentProxy, s, requestId);
timeoutThread.start();
} catch (SocketException ex) {
if (timeoutThread != null) timeoutThread.disable();
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
l.log(ex.getMessage());
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
closeSocket(s);
} catch (IOException ex) {
if (timeoutThread != null) timeoutThread.disable();
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
l.log(ex.getMessage());
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
closeSocket(s);
} catch (I2PException ex) {
if (timeoutThread != null) timeoutThread.disable();
_log.info("getPrefix(requestId) + Error trying to connect", ex);
l.log(ex.getMessage());
handleHTTPClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId);
@@ -380,91 +417,6 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable
}
}
private static final long INACTIVITY_TIMEOUT = 120 * 1000;
private static volatile long __timeoutId = 0;
private class InactivityTimeoutThread extends I2PThread {
private Socket s;
private I2PTunnelRunner _runner;
private OutputStream _out;
private String _targetRequest;
private boolean _useWWWProxy;
private String _currentProxy;
private long _requestId;
private boolean _disabled;
private Object _disableLock = new Object();
public InactivityTimeoutThread(I2PTunnelRunner runner, OutputStream out, String targetRequest,
boolean useWWWProxy, String currentProxy, Socket s, long requestId) {
this.s = s;
_runner = runner;
_out = out;
_targetRequest = targetRequest;
_useWWWProxy = useWWWProxy;
_currentProxy = currentProxy;
_disabled = false;
_requestId = requestId;
long timeoutId = ++__timeoutId;
setName("InactivityThread " + getPrefix(requestId) + timeoutId);
}
public void disable() {
_disabled = true;
synchronized (_disableLock) {
_disableLock.notifyAll();
}
}
public void run() {
while (!_disabled) {
if (_runner.isFinished()) {
if (_log.shouldLog(Log.INFO)) _log.info(getPrefix(_requestId) + "HTTP client request completed prior to timeout");
return;
}
if (_runner.getLastActivityOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
if (_runner.getStartedOn() < Clock.getInstance().now() - INACTIVITY_TIMEOUT) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix(_requestId) + "HTTP client request timed out (lastActivity: "
+ new Date(_runner.getLastActivityOn()) + ", startedOn: "
+ new Date(_runner.getStartedOn()) + ")");
timeout();
return;
} else {
// runner hasn't been going to long enough
}
} else {
// there has been activity in the period
}
synchronized (_disableLock) {
try {
_disableLock.wait(INACTIVITY_TIMEOUT);
} catch (InterruptedException ie) {
}
}
}
}
private void timeout() {
_log.info(getPrefix(_requestId) + "Inactivity timeout reached");
l.log("Inactivity timeout reached");
if (_out != null) {
try {
if (_runner.getLastActivityOn() > 0) {
// some data has been sent, so don't 404 it
} else {
writeErrorMessage(ERR_TIMEOUT, _out, _targetRequest, _useWWWProxy, _currentProxy);
}
} catch (IOException ioe) {
_log.warn(getPrefix(_requestId) + "Error writing out the 'timeout' message", ioe);
}
} else {
_log.warn(getPrefix(_requestId) + "Client disconnected before we could say we timed out");
}
closeSocket(s);
}
}
private final static String getHostName(String host) {
if (host == null) return null;
try {

View File

@@ -110,6 +110,9 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
//i2pout.flush();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Initial data " + (initialData != null ? initialData.length : 0)
+ " written, starting forwarders");
Thread t1 = new StreamForwarder(in, i2pout, "toI2P");
Thread t2 = new StreamForwarder(i2pin, out, "fromI2P");
synchronized (finishLock) {
@@ -117,11 +120,13 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
finishLock.wait();
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("At least one forwarder completed, closing and joining");
// now one connection is dead - kill the other as well.
s.close();
i2ps.close();
t1.join();
t2.join();
t1.join(30*1000);
t2.join(30*1000);
closedCleanly = true;
} catch (InterruptedException ex) {
if (_log.shouldLog(Log.ERROR))
@@ -221,7 +226,7 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
out.flush(); // make sure the data get though
}
}
out.flush();
//out.flush(); // close() flushes
} catch (SocketException ex) {
// this *will* occur when the other threads closes the socket
synchronized (finishLock) {
@@ -248,11 +253,16 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
+ from + " and " + to);
}
try {
out.close();
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing streams", ex);
_log.warn(direction + ": Error closing input stream", ex);
}
try {
out.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing output stream", ex);
}
synchronized (finishLock) {
finished = true;

View File

@@ -120,10 +120,15 @@ public class ConfigNetHelper {
private static String getBurstFactor(int numSeconds, String name) {
StringBuffer buf = new StringBuffer(256);
buf.append("<select name=\"").append(name).append("\">\n");
boolean found = false;
for (int i = 10; i <= 60; i += 10) {
buf.append("<option value=\"").append(i).append("\" ");
if ( (i == numSeconds) || (i == 60) )
if (i == numSeconds) {
buf.append("selected ");
found = true;
} else if ( (i == 60) && (!found) ) {
buf.append("selected ");
}
buf.append(">");
buf.append(i).append(" seconds</option>\n");
}

View File

@@ -38,9 +38,11 @@ extern "C" {
#include <stddef.h>
#include <stdint.h>
/*
* Lengths
*/
/* The maximum length a SAM command can be */
#define SAM_CMD_LEN 128
/* The maximum size of a single datagram packet */
@@ -49,18 +51,22 @@ extern "C" {
#define SAM_LOGMSG_LEN 256
/* The longest `name' arg for the naming lookup callback */
#define SAM_NAME_LEN 256
/* The max size of a single stream packet */
/* The maximum size of a single stream packet */
#define SAM_STREAM_PAYLOAD_MAX (32 * 1024)
/* The length of a base 64 public key - it's actually 516, but +1 for '\0' */
#define SAM_PUBKEY_LEN 517
/* A public key SAM command's length */
/* The maximum length of a SAM command with a public key */
#define SAM_PKCMD_LEN (SAM_PUBKEY_LEN + SAM_CMD_LEN)
/* The maximum size of a single raw packet */
#define SAM_RAW_PAYLOAD_MAX (32 * 1024)
/* The maximum length a SAM non-data reply can be */
#define SAM_REPLY_LEN 1024
/*
* Shorten some standard variable types
* Some LibSAM variable types
*/
typedef signed char schar_t;
typedef unsigned char uchar_t;
typedef unsigned int uint_t;
@@ -102,57 +108,62 @@ typedef enum { /* see sam_strerror() for detailed descriptions of these */
SAM_TOO_BIG
} samerr_t;
/*
* Public functions
*/
/* SAM controls - sessions */
extern sam_sess_t *sam_session_init(sam_sess_t *session);
extern void sam_session_free(sam_sess_t **session);
sam_sess_t *sam_session_init(sam_sess_t *session);
void sam_session_free(sam_sess_t **session);
/* SAM controls - connection */
extern bool sam_close(sam_sess_t *session);
extern samerr_t sam_connect(sam_sess_t *session, const char *samhost,
uint16_t samport, const char *destname, sam_conn_t style,
uint_t tunneldepth);
bool sam_close(sam_sess_t *session);
samerr_t sam_connect(sam_sess_t *session, const char *samhost,
uint16_t samport, const char *destname, sam_conn_t style,
uint_t tunneldepth);
/* SAM controls - utilities */
extern void sam_naming_lookup(sam_sess_t *session, const char *name);
extern bool sam_read_buffer(sam_sess_t *session);
extern const char *sam_strerror(samerr_t code);
void sam_naming_lookup(sam_sess_t *session, const char *name);
bool sam_read_buffer(sam_sess_t *session);
const char *sam_strerror(samerr_t code);
/* SAM controls - callbacks */
extern void (*sam_diedback)(sam_sess_t *session);
extern void (*sam_logback)(char *str);
extern void (*sam_namingback)(char *name, sam_pubkey_t pubkey,
samerr_t result);
void (*sam_diedback)(sam_sess_t *session);
void (*sam_logback)(char *str);
void (*sam_namingback)(char *name, sam_pubkey_t pubkey, samerr_t result);
/* Stream commands */
extern void sam_stream_close(sam_sess_t *session, sam_sid_t stream_id);
extern sam_sid_t sam_stream_connect(sam_sess_t *session,
const sam_pubkey_t dest);
extern samerr_t sam_stream_send(sam_sess_t *session, sam_sid_t stream_id,
const void *data, size_t size);
void sam_stream_close(sam_sess_t *session, sam_sid_t stream_id);
sam_sid_t sam_stream_connect(sam_sess_t *session, const sam_pubkey_t dest);
samerr_t sam_stream_send(sam_sess_t *session, sam_sid_t stream_id,
const void *data, size_t size);
/* Stream commands - callbacks */
extern void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id,
samerr_t reason);
extern void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
sam_pubkey_t dest);
extern void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id,
void *data, size_t size);
extern void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
samerr_t result);
void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id,
samerr_t reason);
void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
sam_pubkey_t dest);
void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id,
void *data, size_t size);
void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
samerr_t result);
/* Stream send queue (experimental) */
extern void sam_sendq_add(sam_sess_t *session, sam_sid_t stream_id,
sam_sendq_t **sendq, const void *data, size_t dsize);
extern void sam_sendq_flush(sam_sess_t *session, sam_sid_t stream_id,
sam_sendq_t **sendq);
void sam_sendq_add(sam_sess_t *session, sam_sid_t stream_id,
sam_sendq_t **sendq, const void *data, size_t dsize);
void sam_sendq_flush(sam_sess_t *session, sam_sid_t stream_id,
sam_sendq_t **sendq);
/* Datagram commands */
extern samerr_t sam_dgram_send(sam_sess_t *session, const sam_pubkey_t dest,
const void *data, size_t size);
samerr_t sam_dgram_send(sam_sess_t *session, const sam_pubkey_t dest,
const void *data, size_t size);
/* Datagram commands - callbacks */
extern void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest,
void *data, size_t size);
void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest, void *data,
size_t size);
/* Raw commands */
samerr_t sam_raw_send(sam_sess_t *session, const sam_pubkey_t dest,
const void *data, size_t size);
/* Raw commands - callbacks */
void (*sam_rawback)(sam_sess_t *session, void *data, size_t size);
#ifdef __cplusplus
}

View File

@@ -55,28 +55,40 @@ static ssize_t sam_write(sam_sess_t *session, const void *buf, size_t n);
* Callback functions
* Note: if you add a new callback be sure to check for non-NULL in sam_connect
*/
/* a peer closed the connection */
void (*sam_closeback)(sam_sess_t *session, sam_sid_t stream_id, samerr_t reason)
= NULL;
/* a peer connected to us */
void (*sam_connectback)(sam_sess_t *session, sam_sid_t stream_id,
sam_pubkey_t dest) = NULL;
/* a peer sent some stream data (`data' MUST be freed) */
void (*sam_databack)(sam_sess_t *session, sam_sid_t stream_id, void *data,
size_t size) = NULL;
/* a peer sent some datagram data (`data' MUST be freed) */
void (*sam_dgramback)(sam_sess_t *session, sam_pubkey_t dest, void *data,
size_t size) = NULL;
/* we lost the connection to the SAM host */
void (*sam_diedback)(sam_sess_t *session) = NULL;
/* logging callback */
void (*sam_logback)(char *str) = NULL;
/* naming lookup reply - `pubkey' will be NULL if `result' isn't SAM_OK */
void (*sam_namingback)(char *name, sam_pubkey_t pubkey, samerr_t result) = NULL;
/* our connection to a peer has completed */
void (*sam_statusback)(sam_sess_t *session, sam_sid_t stream_id,
samerr_t result) = NULL;
/* a peer sent some raw data (`data' MUST be freed) */
void (*sam_rawback)(sam_sess_t *session, void *data, size_t size) = NULL;
/*
* Closes the connection to the SAM host
*
@@ -155,7 +167,11 @@ samerr_t sam_connect(sam_sess_t *session, const char *samhost, uint16_t samport,
return SAM_CALLBACKS_UNSET;
}
} else if (style == SAM_RAW) {
abort(); /* not implemented yet */
if (sam_diedback == NULL || sam_logback == NULL
|| sam_namingback == NULL || sam_rawback == NULL) {
SAMLOGS("Please set callback functions before connecting");
return SAM_CALLBACKS_UNSET;
}
} else {
SAMLOGS("Unknown connection style");
return SAM_BAD_STYLE;
@@ -295,6 +311,7 @@ static void sam_parse(sam_sess_t *session, char *s)
#define SAM_NAMING_REPLY_OK "NAMING REPLY RESULT=OK"
#define SAM_NAMING_REPLY_IK "NAMING REPLY RESULT=INVALID_KEY"
#define SAM_NAMING_REPLY_KNF "NAMING REPLY RESULT=KEY_NOT_FOUND"
#define SAM_RAW_RECEIVED_REPLY "RAW RECEIVED"
#define SAM_STREAM_CLOSED_REPLY "STREAM CLOSED"
#define SAM_STREAM_CONNECTED_REPLY "STREAM CONNECTED"
#define SAM_STREAM_RECEIVED_REPLY "STREAM RECEIVED"
@@ -305,6 +322,10 @@ static void sam_parse(sam_sess_t *session, char *s)
#define SAM_STREAM_STATUS_REPLY_IK "STREAM STATUS RESULT=INVALID_KEY"
#define SAM_STREAM_STATUS_REPLY_TO "STREAM STATUS RESULT=TIMEOUT"
/*
* TODO: add raw parsing
*/
if (strncmp(s, SAM_DGRAM_RECEIVED_REPLY,
strlen(SAM_DGRAM_RECEIVED_REPLY)) == 0) {
char *p;
@@ -518,6 +539,42 @@ static void sam_parse(sam_sess_t *session, char *s)
return;
}
/*
* Sends data to a destination in a raw packet
*
* dest - base 64 destination of who we're sending to
* data - the data we're sending
* size - the size of the data
*
* Returns: SAM_OK on success
*/
samerr_t sam_raw_send(sam_sess_t *session, const sam_pubkey_t dest,
const void *data, size_t size)
{
assert(session != NULL);
char cmd[SAM_PKCMD_LEN];
if (size < 1 || size > SAM_RAW_PAYLOAD_MAX) {
#ifdef NO_Z_FORMAT
SAMLOG("Invalid data send size (%u bytes)", size);
#else
SAMLOG("Invalid data send size (%zu bytes)", size);
#endif
return SAM_TOO_BIG;
}
#ifdef NO_Z_FORMAT
snprintf(cmd, sizeof cmd, "RAW SEND DESTINATION=%s SIZE=%u\n",
dest, size);
#else
snprintf(cmd, sizeof cmd, "RAW SEND DESTINATION=%s SIZE=%zu\n",
dest, size);
#endif
sam_write(session, cmd, strlen(cmd));
sam_write(session, data, size);
return SAM_OK;
}
/*
* Reads and callbacks everything in the SAM network buffer until it is clear
*

View File

@@ -391,6 +391,8 @@ public class SAMStreamSession {
while (stillRunning) {
try {
i2ps = serverSocket.accept();
if (i2ps == null)
break;
_log.debug("New incoming connection");
@@ -467,6 +469,7 @@ public class SAMStreamSession {
}
try {
i2pSocketOS.write(data);
//i2pSocketOS.flush();
} catch (IOException e) {
_log.error("Error sending data through I2P socket", e);
return false;

View File

@@ -70,7 +70,7 @@ public class Connection {
private long _lifetimeDupMessageReceived;
public static final long MAX_RESEND_DELAY = 60*1000;
public static final long MIN_RESEND_DELAY = 20*1000;
public static final long MIN_RESEND_DELAY = 40*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
@@ -313,14 +313,36 @@ public class Connection {
return acked;
}
private long _occurredTime;
private long _occurredEventCount;
void eventOccurred() {
_chooser.getScheduler(this).eventOccurred(this);
long now = System.currentTimeMillis();
TaskScheduler sched = _chooser.getScheduler(this);
now = now - now % 1000;
if (_occurredTime == now) {
_occurredEventCount++;
} else {
_occurredTime = now;
if (_occurredEventCount > 100) {
_log.log(Log.CRIT, "More than 100 events (" + _occurredEventCount + ") in a second on "
+ toString() + ": scheduler = " + sched);
}
_occurredEventCount = 0;
}
sched.eventOccurred(this);
}
void resetReceived() {
_resetReceived = true;
_outputStream.streamErrorOccurred(new IOException("Reset received"));
_inputStream.streamErrorOccurred(new IOException("Reset received"));
MessageOutputStream mos = _outputStream;
MessageInputStream mis = _inputStream;
if (mos != null)
mos.streamErrorOccurred(new IOException("Reset received"));
if (mis != null)
mis.streamErrorOccurred(new IOException("Reset received"));
_connectionError = "Connection reset";
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
@@ -332,31 +354,18 @@ public class Connection {
disconnect(cleanDisconnect, true);
}
void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
if (!_connected) return;
_connected = false;
synchronized (_connectLock) { _connectLock.notifyAll(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting " + toString(), new Exception("discon"));
if (cleanDisconnect) {
if (cleanDisconnect && _connected) {
// send close packets and schedule stuff...
_outputStream.closeInternal();
_inputStream.close();
} else {
doClose();
boolean tagsCancelled = false;
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = (PacketLocal)iter.next();
if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
tagsCancelled = true;
pl.cancelled();
}
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
if (tagsCancelled)
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
if (_connected)
doClose();
killOutstandingPackets();
}
if (removeFromConMgr) {
if (!_disconnectScheduled) {
@@ -364,6 +373,7 @@ public class Connection {
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
_connected = false;
}
void disconnectComplete() {
@@ -376,10 +386,8 @@ public class Connection {
_outputStream.destroy();
_outputStream = null;
_outboundQueue = null;
_handler = null;
if (_receiver != null)
_receiver.destroy();
_receiver = null;
if (_activityTimer != null)
SimpleTimer.getInstance().addEvent(_activityTimer, 1);
_activityTimer = null;
@@ -393,6 +401,10 @@ public class Connection {
_connectionManager.removeConnection(this);
}
killOutstandingPackets();
}
private void killOutstandingPackets() {
boolean tagsCancelled = false;
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
@@ -406,7 +418,6 @@ public class Connection {
}
if (tagsCancelled)
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
private class DisconnectEvent implements SimpleTimer.TimedEvent {
@@ -416,6 +427,7 @@ public class Connection {
+ Connection.this.toString());
}
public void timeReached() {
killOutstandingPackets();
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect timer complete, drop the con "
+ Connection.this.toString());

View File

@@ -63,7 +63,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (doSend) {
PacketLocal packet = send(buf, off, size);
return packet;
//dont wait for non-acks
if ( (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
return packet;
else
return _dummyStatus;
} else {
return _dummyStatus;
}

View File

@@ -31,7 +31,14 @@ class ConnectionHandler {
_acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
}
public void setActive(boolean active) { _active = active; }
public void setActive(boolean active) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("setActive(" + active + ") called");
synchronized (_synQueue) {
_active = active;
_synQueue.notifyAll(); // so we break from the accept()
}
}
public boolean getActive() { return _active; }
public void receiveNewSyn(Packet packet) {
@@ -66,8 +73,17 @@ class ConnectionHandler {
while (true) {
if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
return null;
if (!_active)
if (!_active) {
// fail all the ones we had queued up
synchronized (_synQueue) {
for (int i = 0; i < _synQueue.size(); i++) {
Packet packet = (Packet)_synQueue.get(i);
sendReset(packet);
}
_synQueue.clear();
}
return null;
}
Packet syn = null;
synchronized (_synQueue) {

View File

@@ -157,7 +157,7 @@ public class ConnectionPacketHandler {
+ ") for " + con);
return true;
} else if (numResends > 0) {
//} else if (numResends > 0) {
// window sizes are shrunk on resend, not on ack
} else {
if (acked > 0) {
@@ -166,17 +166,19 @@ public class ConnectionPacketHandler {
// new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize();
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(newWindowSize);
if (shouldIncrement <= 0)
if (numResends <= 0) {
if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
// congestion avoidance
// we can't use newWindowSize += 1/newWindowSize, since we're
// integers, so lets use a random distribution instead
int shouldIncrement = _context.random().nextInt(newWindowSize);
if (shouldIncrement <= 0)
newWindowSize += 1;
} else {
// slow start
newWindowSize += 1;
} else {
// slow start
newWindowSize += 1;
}
}
if (_log.shouldLog(Log.DEBUG))

View File

@@ -272,6 +272,9 @@ public class MessageInputStream extends InputStream {
// at least one byte
while (_readyDataBlocks.size() <= 0) {
if (_locallyClosed)
throw new IOException("Already closed, you wanker");
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
if (_log.shouldLog(Log.INFO))
_log.info("read(...," + offset + ", " + length + ")[" + i
@@ -402,6 +405,7 @@ public class MessageInputStream extends InputStream {
ba.setData(null);
}
_locallyClosed = true;
_dataLock.notifyAll();
}
}

View File

@@ -8,6 +8,7 @@ import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* A stream that we can shove data into that fires off those bytes
@@ -26,6 +27,11 @@ public class MessageOutputStream extends OutputStream {
private long _written;
private int _writeTimeout;
private ByteCache _dataCache;
private Flusher _flusher;
private long _lastFlushed;
private long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */
private int _passiveFlushDelay;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@@ -41,6 +47,10 @@ public class MessageOutputStream extends OutputStream {
_written = 0;
_closed = false;
_writeTimeout = -1;
_passiveFlushDelay = 5*1000;
_flusher = new Flusher();
if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created");
}
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
@@ -51,10 +61,11 @@ public class MessageOutputStream extends OutputStream {
}
public void write(byte b[], int off, int len) throws IOException {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(b[], " + off + ", " + len + ")");
if (_log.shouldLog(Log.DEBUG))
_log.debug("write(b[], " + off + ", " + len + ") ");
int cur = off;
int remaining = len;
long begin = _context.clock().now();
while (remaining > 0) {
WriteStatus ws = null;
// we do any waiting outside the synchronized() block because we
@@ -70,6 +81,11 @@ public class MessageOutputStream extends OutputStream {
cur += remaining;
_written += remaining;
remaining = 0;
_lastBuffered = _context.clock().now();
if (_passiveFlushDelay > 0) {
// if it is already enqueued, this just pushes it further out
SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
}
} else {
// buffer whatever we can fit then flush,
// repeating until we've pushed all of the
@@ -79,13 +95,20 @@ public class MessageOutputStream extends OutputStream {
remaining -= toWrite;
cur += toWrite;
_valid = _buf.length;
if (_dataReceiver == null) {
throwAnyError();
return;
}
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
throwAnyError();
_lastFlushed = _context.clock().now();
}
}
if (ws != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws);
// ok, we've actually added a new packet - lets wait until
// its accepted into the queue before moving on (so that we
// dont fill our buffer instantly)
@@ -96,8 +119,14 @@ public class MessageOutputStream extends OutputStream {
else
throw new IOException("Write not accepted into the queue");
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Queued " + len + " without sending to the receiver");
}
}
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
throwAnyError();
}
@@ -106,6 +135,33 @@ public class MessageOutputStream extends OutputStream {
throwAnyError();
}
/**
* Flush data that has been enqued but not flushed after a certain
* period of inactivity
*/
private class Flusher implements SimpleTimer.TimedEvent {
public void timeReached() {
boolean sent = false;
WriteStatus ws = null;
synchronized (_dataLock) {
if ( (_valid > 0) && (_lastBuffered + _passiveFlushDelay > _context.clock().now()) ) {
if ( (_buf != null) && (_dataReceiver != null) ) {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
sent = true;
}
}
}
// ignore the ws
if (sent && _log.shouldLog(Log.DEBUG))
_log.debug("Passive flush of " + ws);
}
}
/**
* Flush the data already queued up, blocking until it has been
* delivered.
@@ -114,12 +170,18 @@ public class MessageOutputStream extends OutputStream {
* @throws InterruptedIOException if the write times out
*/
public void flush() throws IOException {
long begin = _context.clock().now();
WriteStatus ws = null;
synchronized (_dataLock) {
if (_buf == null) throw new IOException("closed (buffer went away)");
if (_dataReceiver == null) {
throwAnyError();
return;
}
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
@@ -129,6 +191,8 @@ public class MessageOutputStream extends OutputStream {
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
(_writeTimeout <= 0) ) )
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) )
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
else
ws.waitForCompletion(_writeTimeout);
if (_log.shouldLog(Log.DEBUG))
@@ -137,6 +201,10 @@ public class MessageOutputStream extends OutputStream {
throw new InterruptedIOException("Timed out during write");
else if (ws.writeFailed())
throw new IOException("Write failed");
long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("wtf, took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
throwAnyError();
}
@@ -164,7 +232,8 @@ public class MessageOutputStream extends OutputStream {
ByteArray ba = null;
synchronized (_dataLock) {
// flush any data, but don't wait for it
_dataReceiver.writeData(_buf, 0, _valid);
if (_dataReceiver != null)
_dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
@@ -173,6 +242,7 @@ public class MessageOutputStream extends OutputStream {
_buf = null;
_valid = 0;
}
_lastFlushed = _context.clock().now();
_dataLock.notifyAll();
}
if (ba != null) {
@@ -212,6 +282,7 @@ public class MessageOutputStream extends OutputStream {
_written += _valid;
_valid = 0;
_dataLock.notifyAll();
_lastFlushed = _context.clock().now();
}
if (blocking && ws != null) {
ws.waitForAccept(_writeTimeout);

View File

@@ -47,7 +47,8 @@ class SchedulerClosed extends SchedulerImpl {
}
public void eventOccurred(Connection con) {
long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now();
reschedule(timeLeft, con);
// noop. we do the timeout through the simpleTimer anyway
//long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now();
//reschedule(timeLeft, con);
}
}

View File

@@ -239,6 +239,8 @@
<copy file="build/routerconsole.jar" todir="pkg-temp/lib/" />
<copy file="build/i2ptunnel.war" todir="pkg-temp/webapps/" />
<copy file="build/routerconsole.war" todir="pkg-temp/webapps/" />
<copy file="installer/resources/wrapper.config" todir="pkg-temp/" />
<copy file="installer/resources/wrapper.config" tofile="pkg-temp/wrapper.config.updated" />
<copy file="history.txt" todir="pkg-temp/" />
<copy file="hosts.txt" todir="pkg-temp/" />
<mkdir dir="pkg-temp/eepsite" />

View File

@@ -194,6 +194,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
+ " / " + state.getNonce() + " found = " + found);
long timeToSend = afterRemovingSync - beforeSendingSync;
if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
}
if (found) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "

View File

@@ -72,6 +72,10 @@ public class SimpleTimer {
_log.log(Log.CRIT, msg, t);
}
private long _occurredTime;
private long _occurredEventCount;
private TimedEvent _recentEvents[] = new TimedEvent[5];
private class SimpleTimerRunner implements Runnable {
public void run() {
List eventsToFire = new ArrayList(1);
@@ -121,6 +125,9 @@ public class SimpleTimer {
}
}
long now = System.currentTimeMillis();
now = now - (now % 1000);
for (int i = 0; i < eventsToFire.size(); i++) {
TimedEvent evt = (TimedEvent)eventsToFire.get(i);
try {
@@ -128,7 +135,30 @@ public class SimpleTimer {
} catch (Throwable t) {
log("wtf, event borked: " + evt, t);
}
_recentEvents[4] = _recentEvents[3];
_recentEvents[3] = _recentEvents[2];
_recentEvents[2] = _recentEvents[1];
_recentEvents[1] = _recentEvents[0];
_recentEvents[0] = evt;
}
if (_occurredTime == now) {
_occurredEventCount += eventsToFire.size();
} else {
_occurredTime = now;
if (_occurredEventCount > 100) {
StringBuffer buf = new StringBuffer(256);
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
buf.append(") in a second! Last 5: \n");
for (int i = 0; i < _recentEvents.length; i++) {
if (_recentEvents[i] != null)
buf.append(_recentEvents[i]).append('\n');
}
_log.log(Log.CRIT, buf.toString());
}
_occurredEventCount = 0;
}
eventsToFire.clear();
}
}

View File

@@ -1,4 +1,69 @@
$Id: history.txt,v 1.82 2004/11/25 16:57:19 jrandom Exp $
$Id: history.txt,v 1.90 2004/11/30 18:41:52 jrandom Exp $
* 2004-12-01 0.4.2.1 released
2004-12-01 jrandom
* Strip out any of the Accept-* HTTP header lines, and always make sure to
include the forged User-agent header.
* Adjust the default read timeout on the eepproxy to 60s, unless
overridden.
* Minor tweak on stream shutdown.
2004-11-30 jrandom
* Render the burst rate fields on /config.jsp properly (thanks ugha!)
* Build in a simple timeout to flush data queued into the I2PSocket but
not yet flushed.
* Don't explicitly flush after each SAM stream write, but leave it up to
the [nonblocking] passive flush.
* Don't whine about 10-99 connection events occurring in a second
* Don't wait for completion of packets that will not be ACKed (duh)
* Adjust the congestion window, even if the packet was resent (duh)
* Make sure to wake up any blocking read()'s when the MessageInputStream
is close()ed (duh)
* Never wait more than the disconnect timeout for a write to complete
2004-11-29 jrandom
* Minor fixes to avoid unnecessary errors on shutdown (thanks susi!)
2004-11-29 jrandom
* Reduced contention for local client delivery
* Drop the new code that munges the wrapper.config. Instead, updates that
need to change it will include their own wrapper.config in the
i2pupdate.zip, overwriting the existing file. If the file
"wrapper.config.updated" is included, it is deleted at first opportunity
and the router shut down, displaying a notice that the router must be
started again cleanly to allow the changes to the wrapper.config to take
effect.
* Properly stop accept()ing I2PSocket connections if we close down the
session (duh).
* Make sure we cancel any outstanding Packets in flight when a connection
is terminated (thanks susi!)
* Split up the I2PTunnel closing a little further.
2004-11-28 jrandom
* Accept IP address detection changes with a 2-out-of-3 minimum.
* As long as the router is up, keep retrying to bind the I2CP listener.
* Decrease the java service wrapper ping frequency to once every 10
minutes, rather than once every 5 seconds.
2004-11-27 jrandom
* Some cleanup and bugfixes for the IP address detection code where we
only consider connections that have actually sent and received messages
recently as active, rather than the mere presence of a TCP socket as
activity.
2004-11-27 jrandom
* Removed the I2PTunnel inactivity timeout thread, since the new streaming
lib can do that (without an additional per-connection thread).
* Close the I2PTunnel forwarder threads more aggressively
2004-11-27 jrandom
* Fix for a fast loop caused by a race in the new streaming library (thanks
DrWoo, frontier, pwk_, and thetower!)
* Minor updates to the SimpleTimer and Connection to help track down a
high CPU usage problem (dumping debug info to stdout/wrapper.log if too
many events/tasks fire in a second)
* Minor fixes for races on client disconnects (causing NPEs)
* 2004-11-26 0.4.2 released

View File

@@ -1,6 +1,9 @@
; TC's hosts.txt guaranteed freshness
; $Id: hosts.txt,v 1.79 2004/11/22 22:58:46 jrandom Exp $
; $Id: hosts.txt,v 1.82 2004/11/28 17:47:01 jrandom Exp $
; changelog:
; (1.105) added bdl.i2p
; (1.104) added bacardi.i2p and guttersnipe.i2p
; (1.103) added evil.i2p
; (1.102) added bsdm.i2p
; (1.101) added eschaton.i2p
; (1.100) added blog.curiosity.i2p
@@ -228,4 +231,8 @@ slacker.i2p=BR~D1lQNF~NLjdrJILDBA4DaQZpDoQZhVFNAwBgUz3sEP2tHnuiS7-EO2jwXEPKHIVZN
blog.curiosity.i2p=GqgGNWvNcbEABjAxBNJmILDIVJq0GSpf3eyjQt5EQsJFPCA3fYFYUULh4WoPFEk3I4pOV0lPGlkFX6Q52nkIk~kW~YGsCjxDJJ~08m31KgPJ70-svh7Fj-D8Bs27VaG9nXEltCJfjzfHsNY6m5jCCf400-14jTEghAjguIwPbpCpJ351jrE36sSU861LhhKnCiTItIN6ig9LROIUOJnTvh4-vRHrHS~oQVesAUqZ0zMPRNYlC1GHgTvou8iKLXlPnq3u1ITfCQYjeC6lpNn3bnnJOfrzGsPW3Y70jG5fqTPRqCyef5j0gNa~ZewMAQvV~oKxpDFTixNx8pBRv913pRXTdZfxT8D-fpYbXP1O0GtaHTgPO7PYWFKHCAaxmleD7tzSpIyfl4LpO0Y1Be5eHAZh5Lh1nE~tqVJZUbcVqZzDPmrmT20oCDljsZ8kEIRb3mn3VpzN5wOikPTh8AHg1jfic9hGBv5Lqu5SBffSCQZi7znTB7peX3ZavGODotR2AAAA
eschaton.i2p=YTlIyW42pdmE~D8fc3eUSdyN9x~SsddHcAXi3hISTTovLO-CphmfAqyvAZ1GV2-xExVlK7u~WBT8w9NBeig48NsAXCkNuWvZS~hzw9AERj5HVe5Jwzvd78x6rsFLaYbAFeSngtH~Bxi~ZLfLtXbcD7aZsXh2bCSV8OwKPRmPiKsV48Bwi9BSJA84VQLtpnbg3MtSDAciLVVH2Prvu939CciLAkdaST6JWBDY553c~4RlaHkxR~uHsskO0sNnCp2ASGRR8zoaDTP9eKhy9vXcCJZ6EYKDmRGcb~0hpuB67lP47PJaiNCWNk5jI~VNdCUE2O9D-XBPA88qvDZVK0wZWTL3Hxh5uvqr-rKZ2Txlh-qZ1vispjv47VC88ht~mQi~PGbqZxvPIOe9LxemIjlBedDjcS675cyVm-stwx-8G8au-hE7QNhZ3XN8kRlNcCTtMJxX2QRXT6juCZVNXfnO~sCexp7inl7~Q~yCeBbOoWhQoOOsm7kHMNEGxOkpkxjxAAAA
bdsm.i2p=qOcqbBo4vxHbldI758R76ZJZu92zWefovF6IEEv~XKA-UkHNbY-y6v9w-9ts-tbZxpak8l1QyB6Mp1Rwy-sE1YxevjIPhV-UQZSsJHRJHVZasR3ULncY-g-dvzGLab9YGtkdfub2GMlCJzDm9F8sUvGnLxlZ28mj3~ZLC0vEt5BeCl5hDuT9CiAzWarcSC2M1PIOMxIUYz9K5ueanOZjmAkNet4igKM0XmfXL8c85VeiPO8jx9G0ZDARwObXxvNdPq5pDwOZSY-5f-crDWMV1KesdmqsKIgydZ5GSPzq5jKLJdY3bUsxViNoWzjIWKfbwm6Wgl3eb9wzLEXDP1m2WGWAetNCgwSbT30A2yhTEWFwEqSE0GvWKvlztyG3oGm8eKN-VymRwduORnpKYbALNZ2MG4ZtVhKklER4hrgYSLuuXIi7KhQ0WXrKjPFnnL2MmnpPsFYn0ZQz6ilYOSAYcV3rBsd83RAFPWodlCjt0TpnpvcFfYZqfKZtYQryzkdFAAAA
evil.i2p=NGNaN9qrpk2dC~OQlo-1yHT~Tvb6SvU8VH9SiBkwNZxuNVRKN~2ZtU50nkTklevB5V08GMT5tkpyZCyVwVgZNNl50uz25t~xP3Pte6GUxzBfBJ8n2GoFdlhn5jGHhmM6QI3nlDgLO7AaNdIQu8LxaoO4lln~fReKvY3sNRMAOJlBO0mHH1PSsemJmqkhl02VX6-3KllgZ99E7uVT-ap2i7Pf3wvLtBtoHryZXvYtKL9zV17tBfhmIVT~VSuhsGRVoy43eoep~fPNCE3s0jr4GLEeWfx2LF0WsvN2avii43DFB23BaZAKd7myLV~pmN9L7pOILnsrktO2MQXy2q2OriZs8dxN3w0yboE5iAfTt8IWtHfNLp8aGx2HzmF1TacrPxuJmg~C9mawTGtZ2aKMBr7vHtN1VuZnOdVi5hcC7tQ0YuwPtAkOB1iO2Wbh3csYtfusPoIqezP1O-iUndNCRg86u3PZ71jXhSq4mMZaNZGYiiqB7nXVkBi15nUbQRb8AAAA
bacardi.i2p=oZm0JRHiUFKwAzLz1wlNOK2h2fI8V6u1nUhgCpt1RcErs99QMPHqu4oR4cel5lsJbeg1X0GgHe72JYabsntjimjWs1zi0RzknddVvq0hMGnn9EA-9Atu9qViScXp42ddnYhIlMBNNswMp8AJ01jHkO3SUSDSVk-rF5wnrhpyN9BFyho8h18nHrr6S4jaGHsuLVage8ImwQRv~PYfr4hVULdiFn5HDRePdvgzKUD3o4Y7nFiQMXswP02ZhXXtE-rMGDs2TD15qYah6mkQWdZtRYFuKuCh7Myn35u2IHGyMs56zpnuVr~w~Uh7wydg26JShsAz4AyZhGy70eTFUC2dtEr7bUvQSE3V3xNKOeBuGYadipWB30xmGDX2kfaU5Efy45MLPBwTU-8f4owQBokgSD3jBEc4vV0DbJrGoNnA3zpAC0JwZ9rFLolQNzenRgkWYlO09PEkKPUyUMpUBGY~Wj7jcsGnt5uNskeHHpV4hsvdpboHnlPAcZKvN7rTCenoAAAA
guttersnipe.i2p=yS6ECPvCobRLKMXJg-rLZ5PF6dRcXliR2e9KEX~aUZMEsznpzE-lCs~FgGd7w9-~GW8ObvKiyt47S7SuCunjndl874xPhrQ0dFRUNEsdovLBr6s9v2xi7kpWC1yKLbT-Ti3p8WhNPZkGsJl6YEIc357j4FjNVFx2-b~V16G0lhjhY1a55nsCoU7xXisdqOaKZL-4Y7vEVHqZNgM6cAZpY8vHtui30J3oGmV-RcJG9ahYkorqleKtb2G0fwbUMakxHv5uzlHyAU~r7OmS7NCIfoyBgU3MrSiwkYiW5elQ294XV-1Bciy9DFnJh4KA0rLIA7GanvkQ-NwqoGLsXz08mpi12vVssDhdavXooBeZLg7gipFYzle3-B1rgGKZ~51~9Tou~XSn6jGJGgtqKV7fTmys4~AXNy7vCWrMKQaWh6M1HI69yV466bJWv6nVhwNmOmtk48EIC3ik2IXC1wjA6Rgz2AcSiR-UEyaBkv251Vu6H92HZJTrsTzuzVmZcYt0AAAA
bdl.i2p=IMzcrD4jQLBtyzV8TdL-jgbyDhRjMBS1BfI6xGuddMyJEoMnZ1ZWDUn3NwqdyEYRQD7zREgbsQgzpE7Gdmpp-vLrzba3W1mUR5Ddu13tgIiDrOWR38Omv7L~DTDimK9m0Y-HCJROVigRfxdZbsI6P37d77NArdpwLzFos6qyK2C40JQFIaNCPYGkf05DsXtHLPwTJXYVKVp0R2V7nGmI77BUdwDgt4zSotlyvmsX6U1mYqKwutwr~oxggdvgfoNbrGC0~xQCbfQtEOwFYwxc5oUeJlt4jjb1-C9HAb7r4LtJ3Daqr0bx1hXS5hADw68cfUHbEjbfrhJhBA0mMqgGMm~r~0II8R19EXprc91YY4d0QycR2Osdn9MVNXS01Ziy~JX-SfT8DTjj2ZZn7rpyjcq7rgVbspPLJOdNQNERiAm06yjosOrPjtl8mrWXxWcOcGlMO5ftoTvX2hFsfc-vmfN7S8IZEfUdJRpJxDytChNTUr~qMWuP0LicNv0xKtA3AAAA

View File

@@ -4,7 +4,7 @@
<info>
<appname>i2p</appname>
<appversion>0.4.2</appversion>
<appversion>0.4.2.1</appversion>
<authors>
<author name="I2P" email="support@i2p.net"/>
</authors>

View File

@@ -103,6 +103,9 @@ wrapper.jvm_exit.timeout=10
# give the OS 60s to clear all the old sockets / etc before restarting
wrapper.restart.delay=60
wrapper.ping.interval=600
wrapper.ping.timeout=605
# use the wrapper's internal timer thread. otherwise this would
# force a restart of the router during daylight savings time as well
# as any time that the OS clock changes

View File

@@ -36,7 +36,6 @@ import net.i2p.data.i2np.TunnelMessage;
import net.i2p.router.message.GarlicMessageHandler;
import net.i2p.router.message.TunnelMessageHandler;
import net.i2p.router.startup.StartupJob;
import net.i2p.router.startup.VerifyClasspath;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.FileUtil;
@@ -794,7 +793,7 @@ public class Router {
public static void main(String args[]) {
installUpdates();
verifyClasspath();
verifyWrapperConfig();
Router r = new Router();
r.runRouter();
}
@@ -820,10 +819,11 @@ public class Router {
}
}
private static void verifyClasspath() {
boolean updated = VerifyClasspath.updateClasspath();
if (updated) {
System.out.println("INFO: Classpath updated, but the service wrapper requires you to manually restart");
private static void verifyWrapperConfig() {
File cfgUpdated = new File("wrapper.config.updated");
if (cfgUpdated.exists()) {
cfgUpdated.delete();
System.out.println("INFO: Wrapper config updated, but the service wrapper requires you to manually restart");
System.out.println("INFO: Shutting down the router - please rerun it!");
System.exit(EXIT_HARD);
}

View File

@@ -15,8 +15,8 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.87 $ $Date: 2004/11/25 16:57:20 $";
public final static String VERSION = "0.4.2";
public final static String ID = "$Revision: 1.95 $ $Date: 2004/11/30 18:41:51 $";
public final static String VERSION = "0.4.2.1";
public final static long BUILD = 0;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);

View File

@@ -143,6 +143,8 @@ public class StatisticsManager implements Service {
//includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 });
//includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("client.sendAckTime", stats, new long[] { 60*60*1000 }, true);
includeRate("stream.con.sendDuplicateSize", stats, new long[] { 60*60*1000 });
includeRate("stream.con.receiveDuplicateSize", stats, new long[] { 60*60*1000 });
//includeRate("client.sendsPerFailure", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
//includeRate("client.timeoutCongestionTunnel", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
//includeRate("client.timeoutCongestionMessage", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);

View File

@@ -55,7 +55,7 @@ public class ClientConnectionRunner {
/** user's config */
private SessionConfig _config;
/** static mapping of MessageId to Payload, storing messages for retrieval */
private static Map _messages;
private Map _messages;
/** lease set request state, or null if there is no request pending on at the moment */
private LeaseRequestState _leaseRequest;
/** currently allocated leaseSet, or null if none is allocated */
@@ -227,7 +227,7 @@ public class ClientConnectionRunner {
}
void disconnectClient(String reason) {
_log.error("Disconnecting the client: " + reason, new Exception("Disconnecting!"));
_log.error("Disconnecting the client: " + reason);
DisconnectMessage msg = new DisconnectMessage();
msg.setReason(reason);
try {

View File

@@ -42,9 +42,6 @@ public class ClientListenerRunner implements Runnable {
public void setPort(int port) { _port = port; }
public int getPort() { return _port; }
/** max time to bind */
private final static int MAX_FAIL_DELAY = 5*60*1000;
/**
* Start up the socket listener, listens for connections, and
* fires those connections off via {@link #runConnection runConnection}.
@@ -55,7 +52,7 @@ public class ClientListenerRunner implements Runnable {
public void runServer() {
_running = true;
int curDelay = 0;
while ( (_running) && (curDelay < MAX_FAIL_DELAY) ) {
while (_running) {
try {
_log.info("Starting up listening for connections on port " + _port);
_socket = new ServerSocket(_port);

View File

@@ -145,20 +145,21 @@ public class ClientManager {
if (runner != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message " + msgId + " is targeting a local destination. distribute it as such");
runner.receiveMessage(toDest, fromDest, payload);
if (fromDest != null) {
ClientConnectionRunner sender = getRunner(fromDest);
if (sender != null) {
sender.updateMessageDeliveryStatus(msgId, true);
} else {
_log.log(Log.CRIT, "Um, wtf, we're sending a local message, but we can't find who sent it?", new Exception("wtf"));
}
ClientConnectionRunner sender = getRunner(fromDest);
if (sender == null) {
// sender went away
return;
}
_context.jobQueue().addJob(new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId));
} else {
// remote. w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message " + msgId + " is targeting a REMOTE destination! Added to the client message pool");
runner = getRunner(fromDest);
if (runner == null) {
// sender went away
return;
}
ClientMessage msg = new ClientMessage();
msg.setDestination(toDest);
msg.setPayload(payload);
@@ -170,6 +171,32 @@ public class ClientManager {
}
}
private class DistributeLocal extends JobImpl {
private Destination _toDest;
private ClientConnectionRunner _to;
private ClientConnectionRunner _from;
private Destination _fromDest;
private Payload _payload;
private MessageId _msgId;
public DistributeLocal(Destination toDest, ClientConnectionRunner to, ClientConnectionRunner from, Destination fromDest, Payload payload, MessageId id) {
super(_context);
_toDest = toDest;
_to = to;
_from = from;
_fromDest = fromDest;
_payload = payload;
_msgId = id;
}
public String getName() { return "Distribute local message"; }
public void runJob() {
_to.receiveMessage(_toDest, _fromDest, _payload);
if (_from != null) {
_from.updateMessageDeliveryStatus(_msgId, true);
}
}
}
/**
* Request that a particular client authorize the Leases contained in the

View File

@@ -1,87 +0,0 @@
package net.i2p.router.startup;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Properties;
import net.i2p.data.DataHelper;
/**
* Make sure that if there is a wrapper.config file, it includes
* all of the jar files necessary for the current build.
* HOLY CRAP THIS IS UGLY.
*
*/
public class VerifyClasspath {
private static final String NL = System.getProperty("line.separator");
private static final Set _jars = new HashSet();
static {
_jars.add("lib/ant.jar");
_jars.add("lib/heartbeat.jar");
_jars.add("lib/i2p.jar");
_jars.add("lib/i2ptunnel.jar");
_jars.add("lib/jasper-compiler.jar");
_jars.add("lib/jasper-runtime.jar");
_jars.add("lib/javax.servlet.jar");
_jars.add("lib/jnet.jar");
_jars.add("lib/mstreaming.jar");
_jars.add("lib/netmonitor.jar");
_jars.add("lib/org.mortbay.jetty.jar");
_jars.add("lib/router.jar");
_jars.add("lib/routerconsole.jar");
_jars.add("lib/sam.jar");
_jars.add("lib/wrapper.jar");
_jars.add("lib/xercesImpl.jar");
_jars.add("lib/xml-apis.jar");
_jars.add("lib/jbigi.jar");
_jars.add("lib/systray.jar");
_jars.add("lib/systray4j.jar");
_jars.add("lib/streaming.jar");
}
/**
* update the wrapper.config
*
* @return true if the classpath was updated and a restart is
* required, false otherwise.
*/
public static boolean updateClasspath() {
Properties p = new Properties();
File configFile = new File("wrapper.config");
Set needed = new HashSet(_jars);
try {
DataHelper.loadProps(p, configFile);
Set toAdd = new HashSet();
int entry = 1;
while (true) {
String value = p.getProperty("wrapper.java.classpath." + entry);
if (value == null) break;
needed.remove(value);
entry++;
}
if (needed.size() <= 0) {
// we have everything we need
return false;
} else {
// add on some new lines
FileWriter out = new FileWriter(configFile, true);
out.write(NL + "# Adding new libs as required by the update" + NL);
for (Iterator iter = needed.iterator(); iter.hasNext(); ) {
String name = (String)iter.next();
out.write("wrapper.java.classpath." + entry + "=" + name + NL);
}
out.close();
return true;
}
} catch (IOException ioe) {
ioe.printStackTrace();
return false;
}
}
}

View File

@@ -142,6 +142,7 @@ public class ConnectionBuilder {
con.setRemoteRouterIdentity(_actualPeer.getIdentity());
con.setRemoteAddress(_remoteAddress);
con.setAttemptedPeer(_target.getIdentity().getHash());
con.setShownAddress(_localIP);
if (_error == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Establishment successful! returning the con");

View File

@@ -30,6 +30,7 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene
}
public void messageReceived(I2NPMessageReader reader, I2NPMessage message, long msToRead, int size) {
_con.messageReceived();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Just received message " + message.getUniqueId() + " from "
+ _identHash.toBase64().substring(0,6)

View File

@@ -41,6 +41,8 @@ public class TCPAddress {
_host = null;
_port = -1;
_addr = null;
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown host [" + host + "] for port [" + port + "]", uhe);
}
}

View File

@@ -30,6 +30,7 @@ public class TCPConnection {
private RouterIdentity _ident;
private Hash _attemptedPeer;
private TCPAddress _remoteAddress;
private String _shownAddress;
private List _pendingMessages;
private InputStream _in;
private OutputStream _out;
@@ -40,6 +41,8 @@ public class TCPConnection {
private RateStat _sendRate;
private long _started;
private boolean _closed;
private long _lastRead;
private long _lastWrite;
public TCPConnection(RouterContext ctx) {
_context = ctx;
@@ -47,12 +50,15 @@ public class TCPConnection {
_pendingMessages = new ArrayList(4);
_ident = null;
_remoteAddress = null;
_shownAddress = null;
_in = null;
_out = null;
_socket = null;
_transport = null;
_started = -1;
_closed = false;
_lastRead = 0;
_lastWrite = 0;
_runner = new ConnectionRunner(_context, this);
_context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
_context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
@@ -71,6 +77,10 @@ public class TCPConnection {
public void setRemoteAddress(TCPAddress addr) { _remoteAddress = addr; }
/** Who we initially were trying to contact */
public void setAttemptedPeer(Hash peer) { _attemptedPeer = peer; }
/** What address the peer said we are reachable on */
public void setShownAddress(String ip) { _shownAddress = ip; }
/** What address the peer said we are reachable on */
public String getShownAddress() { return _shownAddress; }
/**
* Actually start processing the messages on the connection (and reading
@@ -359,6 +369,19 @@ public class TCPConnection {
boolean getIsClosed() { return _closed; }
RouterContext getRouterContext() { return _context; }
boolean getIsActive() {
if ( (_lastRead <= 0) || (_lastWrite <= 0) ) return false;
long recent = (_lastRead > _lastWrite ? _lastRead : _lastWrite);
long howLongAgo = _context.clock().now() - recent;
if (howLongAgo < 1*60*1000)
return true;
else
return false;
}
void messageReceived() {
_lastRead = _context.clock().now();
}
/**
* The message was sent.
*
@@ -370,5 +393,7 @@ public class TCPConnection {
_transport.afterSend(msg, ok, true, time);
if (ok)
_sendRate.addData(msg.getMessageSize(), msg.getLifetime());
if (ok)
_lastWrite = _context.clock().now();
}
}

View File

@@ -40,6 +40,7 @@ public class TCPConnectionEstablisher implements Runnable {
if (con != null) {
_transport.connectionEstablished(con);
} else {
if (!_context.router().isAlive()) return;
_transport.addConnectionErrorMessage(cb.getError());
Hash peer = info.getIdentity().getHash();
_context.profileManager().commErrorOccurred(peer);

View File

@@ -69,6 +69,10 @@ class TCPListener {
if (addr.getPort() > 0) {
if (_listener != null) {
if ( (_listener.getMyAddress().getPort() == addr.getPort()) &&
(_listener.getMyAddress().getHost() == null) ) {
_listener.getMyAddress().setHost(addr.getHost());
}
if (_log.shouldLog(Log.WARN))
_log.warn("Not starting another listener on " + addr
+ " while already listening on " + _listener.getMyAddress());
@@ -137,7 +141,7 @@ class TCPListener {
public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Beginning TCP listener");
_log.info("Beginning TCP listener on " + _myAddress);
int curDelay = 0;
while (_isRunning) {

View File

@@ -340,8 +340,10 @@ public class TCPTransport extends TransportImpl {
* @param address address that the remote host said was ours
*/
void ourAddressReceived(String address) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Address received [" + address + "] our address: [" + _myAddress + "]");
synchronized (_listener) { // no need to lock on the whole TCPTransport
if (allowAddressUpdate()) {
if (allowAddressUpdate(address)) {
int port = getPort();
TCPAddress addr = new TCPAddress(address, port);
if (addr.getPort() > 0) {
@@ -349,12 +351,17 @@ public class TCPTransport extends TransportImpl {
if (_myAddress != null) {
if (addr.getAddress().equals(_myAddress.getAddress())) {
// ignore, since there is no change
if (_log.shouldLog(Log.INFO))
_log.info("Not updating our local address, as it hasnt changed from " + address);
return;
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Update our local address to " + address);
updateAddress(addr);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Address received is NOT a valid address! [" + addr + "]");
}
} else {
if (_log.shouldLog(Log.ERROR))
@@ -363,6 +370,8 @@ public class TCPTransport extends TransportImpl {
} else {
// either we have explicitly specified our IP address, or
// we are already connected to some people.
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update");
}
}
}
@@ -451,9 +460,16 @@ public class TCPTransport extends TransportImpl {
*
*/
boolean allowAddress(TCPAddress address) {
if (address == null) return false;
if ( (address.getPort() <= 0) || (address.getPort() > 65535) )
if (address == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Address is null?!");
return false;
}
if ( (address.getPort() <= 0) || (address.getPort() > 65535) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Port is invalid? " + address.getPort());
return false;
}
if (!address.isPubliclyRoutable()) {
String allowLocal = _context.getProperty(LISTEN_ALLOW_LOCAL, "false");
if (Boolean.valueOf(allowLocal).booleanValue()) {
@@ -504,12 +520,39 @@ public class TCPTransport extends TransportImpl {
* have no fully established connections.
*
*/
private boolean allowAddressUpdate() {
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
if (addressSpecified)
return false;
private boolean allowAddressUpdate(String proposedAddress) {
int connectedPeers = countActivePeers();
return (connectedPeers == 0);
boolean addressSpecified = (null != _context.getProperty(LISTEN_ADDRESS));
if (addressSpecified) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update, sicne we have one specified (#cons=" + connectedPeers + ")");
return false;
}
if (connectedPeers < 3) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allowing address update, since the # of connected peers is " + connectedPeers);
return true;
} else if (connectedPeers == 3) {
// ok, now comes the vote:
// if we agree with the majority, allow the update
// otherwise, reject the update
int agreed = countActiveAgreeingPeers(proposedAddress);
if (agreed > 1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Most common address selected, allowing address update w/ # of connected peers is " + connectedPeers);
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Proposed address [" + proposedAddress + "] is only used by " + agreed
+ ", rejecting address update w/ # of connected peers is "
+ connectedPeers);
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not allowing address update, since the # of connected peers is " + connectedPeers);
return false;
}
}
/**
@@ -544,9 +587,44 @@ public class TCPTransport extends TransportImpl {
*
*/
public int countActivePeers() {
int numActive = 0;
int numInactive = 0;
synchronized (_connectionLock) {
return _connectionsByIdent.size();
if (_connectionsByIdent.size() <= 0) return 0;
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
TCPConnection con = (TCPConnection)iter.next();
if (con.getIsActive())
numActive++;
else
numInactive++;
}
}
if ( (numInactive > 0) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Inactive peers: " + numInactive + " active: " + numActive);
return numActive;
}
/**
* How many peers that we are connected to think we are reachable at the given
* address?
*
*/
public int countActiveAgreeingPeers(String address) {
int agreed = 0;
synchronized (_connectionLock) {
if (_connectionsByIdent.size() <= 0) return 0;
for (Iterator iter = _connectionsByIdent.values().iterator(); iter.hasNext(); ) {
TCPConnection con = (TCPConnection)iter.next();
if (con.getIsActive()) {
String shown = con.getShownAddress();
if ( (shown != null) && (shown.equals(address)) )
agreed++;
}
}
}
return agreed;
}
/**

View File

@@ -2,6 +2,7 @@ package net.i2p.router.tunnelmanager;
import java.io.IOException;
import java.io.Writer;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -152,7 +153,7 @@ class TunnelPool {
*
*/
public Set getManagedTunnelIds() {
if (!_isLive) return null;
if (!_isLive) return Collections.EMPTY_SET;
Set ids = new HashSet(64);
synchronized (_outboundTunnels) {
ids.addAll(_outboundTunnels.keySet());