forked from I2P_Developers/i2p.i2p
- Fix handling of duplicate participating tunnel IDs
- Reduce chance of generating duplicate IDs for our tunnels (ticket #812) - Stat cleanup - Comment out effectively unused countProactiveDrops()
This commit is contained in:
@@ -1,3 +1,8 @@
|
||||
2012-12-29 zzz
|
||||
* i2psnark: Redirect after post
|
||||
* Javadocs: Fix javax links
|
||||
* Tunnels: Handle duplicate tunnel IDs (ticket #812)
|
||||
|
||||
2012-12-26 zzz
|
||||
* I2CP:
|
||||
- Prep for delivery of detailed failure codes to the client (ticket #788)
|
||||
|
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 2;
|
||||
public final static long BUILD = 3;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
@@ -27,7 +27,7 @@ public interface TunnelManagerFacade extends Service {
|
||||
* Retrieve the information related to a particular tunnel
|
||||
*
|
||||
* @param id the tunnelId as seen at the gateway
|
||||
*
|
||||
* @deprecated unused
|
||||
*/
|
||||
TunnelInfo getTunnelInfo(TunnelId id);
|
||||
|
||||
|
@@ -29,6 +29,7 @@ import net.i2p.router.tunnel.pool.TunnelPool;
|
||||
*/
|
||||
public class DummyTunnelManagerFacade implements TunnelManagerFacade {
|
||||
|
||||
/** @deprecated unused */
|
||||
public TunnelInfo getTunnelInfo(TunnelId id) { return null; }
|
||||
public TunnelInfo selectInboundTunnel() { return null; }
|
||||
public TunnelInfo selectInboundTunnel(Hash destination) { return null; }
|
||||
|
@@ -61,11 +61,12 @@ class HopProcessor {
|
||||
public boolean process(byte orig[], int offset, int length, Hash prev) {
|
||||
// prev is null on gateways
|
||||
if (prev != null) {
|
||||
if (_config.getReceiveFrom() == null)
|
||||
if (_config.getReceiveFrom() == null) {
|
||||
_config.setReceiveFrom(prev);
|
||||
if (!_config.getReceiveFrom().equals(prev)) {
|
||||
} else if (!_config.getReceiveFrom().equals(prev)) {
|
||||
// shouldn't happen now that we have good dup ID detection in BuildHandler
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Invalid previous peer - attempted hostile loop? from " + prev
|
||||
_log.error("Attempted mid-tunnel injection from " + prev
|
||||
+ ", expected " + _config.getReceiveFrom());
|
||||
return false;
|
||||
}
|
||||
|
@@ -71,13 +71,14 @@ public class TunnelDispatcher implements Service {
|
||||
private final RouterContext _context;
|
||||
private final Log _log;
|
||||
/** us */
|
||||
private final Map<TunnelId, TunnelGateway> _outboundGateways;
|
||||
private final Map<TunnelId, OutboundTunnelEndpoint> _outboundEndpoints;
|
||||
private final ConcurrentHashMap<TunnelId, TunnelGateway> _outboundGateways;
|
||||
private final ConcurrentHashMap<TunnelId, OutboundTunnelEndpoint> _outboundEndpoints;
|
||||
/** regular participant or IBEP of our own tunnel */
|
||||
private final Map<TunnelId, TunnelParticipant> _participants;
|
||||
private final ConcurrentHashMap<TunnelId, TunnelParticipant> _participants;
|
||||
/** regular IBGW or our own zero-hop inbound */
|
||||
private final Map<TunnelId, TunnelGateway> _inboundGateways;
|
||||
private final Map<TunnelId, HopConfig> _participatingConfig;
|
||||
private final ConcurrentHashMap<TunnelId, TunnelGateway> _inboundGateways;
|
||||
/** anything we did not create - IBGW, OBEP, or middle hop */
|
||||
private final ConcurrentHashMap<TunnelId, HopConfig> _participatingConfig;
|
||||
/** what is the date/time on which the last non-locally-created tunnel expires? */
|
||||
private long _lastParticipatingExpiration;
|
||||
private BloomFilterIVValidator _validator;
|
||||
@@ -85,6 +86,7 @@ public class TunnelDispatcher implements Service {
|
||||
/** what is the date/time we last deliberately dropped a tunnel? **/
|
||||
//private long _lastDropTime;
|
||||
private final TunnelGatewayPumper _pumper;
|
||||
private final Object _joinParticipantLock = new Object();
|
||||
|
||||
/** for shouldDropParticipatingMessage() */
|
||||
enum Location {OBEP, PARTICIPANT, IBGW}
|
||||
@@ -233,93 +235,121 @@ public class TunnelDispatcher implements Service {
|
||||
|
||||
/**
|
||||
* We are the outbound gateway - we created this tunnel
|
||||
*
|
||||
* @return success; false if Tunnel ID is a duplicate
|
||||
*/
|
||||
public void joinOutbound(TunnelCreatorConfig cfg) {
|
||||
public boolean joinOutbound(TunnelCreatorConfig cfg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Outbound built successfully: " + cfg);
|
||||
TunnelGateway gw;
|
||||
if (cfg.getLength() > 1) {
|
||||
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
|
||||
TunnelGateway.Sender sender = new OutboundSender(_context, cfg);
|
||||
TunnelGateway.Receiver receiver = new OutboundReceiver(_context, cfg);
|
||||
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
|
||||
TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
|
||||
TunnelId outId = cfg.getConfig(0).getSendTunnel();
|
||||
_outboundGateways.put(outId, gw);
|
||||
gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
|
||||
} else {
|
||||
gw = new TunnelGatewayZeroHop(_context, cfg);
|
||||
}
|
||||
TunnelId outId = cfg.getConfig(0).getSendTunnel();
|
||||
if (_outboundGateways.putIfAbsent(outId, gw) != null)
|
||||
return false;
|
||||
if (cfg.getLength() > 1) {
|
||||
_context.statManager().addRateData("tunnel.joinOutboundGateway", 1);
|
||||
_context.messageHistory().tunnelJoined("outbound", cfg);
|
||||
} else {
|
||||
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
|
||||
TunnelId outId = cfg.getConfig(0).getSendTunnel();
|
||||
_outboundGateways.put(outId, gw);
|
||||
_context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1);
|
||||
_context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the inbound endpoint - we created this tunnel
|
||||
*
|
||||
* @return success; false if Tunnel ID is a duplicate
|
||||
*/
|
||||
public void joinInbound(TunnelCreatorConfig cfg) {
|
||||
public boolean joinInbound(TunnelCreatorConfig cfg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Inbound built successfully: " + cfg);
|
||||
|
||||
if (cfg.getLength() > 1) {
|
||||
TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator));
|
||||
TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
|
||||
_participants.put(recvId, participant);
|
||||
if (_participants.putIfAbsent(recvId, participant) != null)
|
||||
return false;
|
||||
_context.statManager().addRateData("tunnel.joinInboundEndpoint", 1);
|
||||
_context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
|
||||
} else {
|
||||
TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
|
||||
TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
|
||||
_inboundGateways.put(recvId, gw);
|
||||
if (_inboundGateways.putIfAbsent(recvId, gw) != null)
|
||||
return false;
|
||||
_context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1);
|
||||
_context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are a participant in this tunnel, but not as the endpoint or gateway
|
||||
*
|
||||
* @return success; false if Tunnel ID is a duplicate
|
||||
*/
|
||||
public void joinParticipant(HopConfig cfg) {
|
||||
public boolean joinParticipant(HopConfig cfg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Joining as participant: " + cfg);
|
||||
TunnelId recvId = cfg.getReceiveTunnel();
|
||||
TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator));
|
||||
_participants.put(recvId, participant);
|
||||
_participatingConfig.put(recvId, cfg);
|
||||
synchronized (_joinParticipantLock) {
|
||||
if (_participatingConfig.putIfAbsent(recvId, cfg) != null)
|
||||
return false;
|
||||
if (_participants.putIfAbsent(recvId, participant) != null) {
|
||||
_participatingConfig.remove(recvId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_context.messageHistory().tunnelJoined("participant", cfg);
|
||||
_context.statManager().addRateData("tunnel.joinParticipant", 1);
|
||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||
_lastParticipatingExpiration = cfg.getExpiration();
|
||||
_leaveJob.add(cfg);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the outbound endpoint in this tunnel, and did not create it
|
||||
*
|
||||
* @return success; false if Tunnel ID is a duplicate
|
||||
*/
|
||||
public void joinOutboundEndpoint(HopConfig cfg) {
|
||||
public boolean joinOutboundEndpoint(HopConfig cfg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Joining as OBEP: " + cfg);
|
||||
TunnelId recvId = cfg.getReceiveTunnel();
|
||||
OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator));
|
||||
_outboundEndpoints.put(recvId, endpoint);
|
||||
_participatingConfig.put(recvId, cfg);
|
||||
synchronized (_joinParticipantLock) {
|
||||
if (_participatingConfig.putIfAbsent(recvId, cfg) != null)
|
||||
return false;
|
||||
if (_outboundEndpoints.putIfAbsent(recvId, endpoint) != null) {
|
||||
_participatingConfig.remove(recvId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
|
||||
_context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1);
|
||||
|
||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||
_lastParticipatingExpiration = cfg.getExpiration();
|
||||
_leaveJob.add(cfg);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We are the inbound gateway in this tunnel, and did not create it
|
||||
*
|
||||
* @return success; false if Tunnel ID is a duplicate
|
||||
*/
|
||||
public void joinInboundGateway(HopConfig cfg) {
|
||||
public boolean joinInboundGateway(HopConfig cfg) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Joining as IBGW: " + cfg);
|
||||
TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg);
|
||||
@@ -328,20 +358,77 @@ public class TunnelDispatcher implements Service {
|
||||
//TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver);
|
||||
TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg);
|
||||
TunnelId recvId = cfg.getReceiveTunnel();
|
||||
_inboundGateways.put(recvId, gw);
|
||||
_participatingConfig.put(recvId, cfg);
|
||||
synchronized (_joinParticipantLock) {
|
||||
if (_participatingConfig.putIfAbsent(recvId, cfg) != null)
|
||||
return false;
|
||||
if (_inboundGateways.putIfAbsent(recvId, gw) != null) {
|
||||
_participatingConfig.remove(recvId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_context.messageHistory().tunnelJoined("inboundGateway", cfg);
|
||||
_context.statManager().addRateData("tunnel.joinInboundGateway", 1);
|
||||
|
||||
if (cfg.getExpiration() > _lastParticipatingExpiration)
|
||||
_lastParticipatingExpiration = cfg.getExpiration();
|
||||
_leaveJob.add(cfg);
|
||||
return true;
|
||||
}
|
||||
|
||||
public int getParticipatingCount() {
|
||||
return _participatingConfig.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new random send tunnel ID that isn't a dup.
|
||||
* Note that we do not keep track of IDs for pending builds so this
|
||||
* does not fully prevent joinOutbound() from failing later.
|
||||
* @since 0.9.5
|
||||
*/
|
||||
public long getNewOBGWID() {
|
||||
long rv;
|
||||
TunnelId tid;
|
||||
do {
|
||||
rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE);
|
||||
tid = new TunnelId(rv);
|
||||
} while (_outboundGateways.containsKey(tid));
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new random receive tunnel ID that isn't a dup.
|
||||
* Not for zero hop tunnels.
|
||||
* Note that we do not keep track of IDs for pending builds so this
|
||||
* does not fully prevent joinInbound() from failing later.
|
||||
* @since 0.9.5
|
||||
*/
|
||||
public long getNewIBEPID() {
|
||||
long rv;
|
||||
TunnelId tid;
|
||||
do {
|
||||
rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE);
|
||||
tid = new TunnelId(rv);
|
||||
} while (_participants.containsKey(tid));
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new random receive tunnel ID that isn't a dup.
|
||||
* For zero hop tunnels only.
|
||||
* Note that we do not keep track of IDs for pending builds so this
|
||||
* does not fully prevent joinInbound() from failing later.
|
||||
* @since 0.9.5
|
||||
*/
|
||||
public long getNewIBZeroHopID() {
|
||||
long rv;
|
||||
TunnelId tid;
|
||||
do {
|
||||
rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE);
|
||||
tid = new TunnelId(rv);
|
||||
} while (_inboundGateways.containsKey(tid));
|
||||
return rv;
|
||||
}
|
||||
|
||||
/******* may be used for congestion control later...
|
||||
public int getParticipatingInboundGatewayCount() {
|
||||
return _inboundGateways.size();
|
||||
@@ -598,6 +685,10 @@ public class TunnelDispatcher implements Service {
|
||||
// _context.statManager().addRateData("tunnel.dispatchOutboundTime", dispatchTime, dispatchTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Only for console TunnelRenderer.
|
||||
* @return a copy
|
||||
*/
|
||||
public List<HopConfig> listParticipatingTunnels() {
|
||||
return new ArrayList(_participatingConfig.values());
|
||||
}
|
||||
|
@@ -466,13 +466,10 @@ class BuildExecutor implements Runnable {
|
||||
void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) {
|
||||
long beforeBuild = System.currentTimeMillis();
|
||||
if (cfg.getLength() > 1) {
|
||||
// should we allow an ID of 0?
|
||||
cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
|
||||
if (addToBuilding(cfg)) {
|
||||
_log.error("Dup reply ID: " + cfg.getReplyMessageId());
|
||||
// fail
|
||||
return;
|
||||
}
|
||||
do {
|
||||
// should we allow an ID of 0?
|
||||
cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
|
||||
} while (addToBuilding(cfg)); // if a dup, go araound again
|
||||
}
|
||||
BuildRequestor.request(_context, pool, cfg, this);
|
||||
long buildTime = System.currentTimeMillis() - beforeBuild;
|
||||
|
@@ -93,6 +93,9 @@ class BuildHandler implements Runnable {
|
||||
_context.statManager().createRequiredRateStat("tunnel.decryptRequestTime", "Time to decrypt a build request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectTimeout", "Reject tunnel count (unknown next hop)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectTimeout2", "Reject tunnel count (can't contact next hop)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectDupID", "Part. tunnel dup ID", "Tunnels", new long[] { 24*60*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.ownDupID", "Our tunnel dup. ID", "Tunnels", new long[] { 24*60*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectHostile", "Reject malicious tunnel", "Tunnels", new long[] { 24*60*60*1000 });
|
||||
|
||||
_context.statManager().createRequiredRateStat("tunnel.rejectOverloaded", "Delay to process rejected request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRequiredRateStat("tunnel.acceptLoad", "Delay to process accepted request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
|
||||
@@ -189,7 +192,7 @@ class BuildHandler implements Runnable {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
|
||||
+ ", since we received it a long time ago: " + (now - state.recvTime));
|
||||
_context.statManager().addRateData("tunnel.dropLoadDelay", now - state.recvTime, 0);
|
||||
_context.statManager().addRateData("tunnel.dropLoadDelay", now - state.recvTime);
|
||||
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
|
||||
return;
|
||||
}
|
||||
@@ -212,7 +215,7 @@ class BuildHandler implements Runnable {
|
||||
// cannot handle - not pending... took too long?
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("The reply " + replyMessageId + " did not match any pending tunnels");
|
||||
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1);
|
||||
} else {
|
||||
handleReply(state.msg, cfg, System.currentTimeMillis()-state.recvTime);
|
||||
}
|
||||
@@ -258,9 +261,9 @@ class BuildHandler implements Runnable {
|
||||
else if (_log.shouldLog(Log.WARN)) _log.warn("Failed detecting bwTier, null routerInfo for: " + peer);
|
||||
// Record that a peer of the given tier agreed or rejected
|
||||
if (howBad == 0) {
|
||||
_context.statManager().addRateData("tunnel.tierAgree" + bwTier, 1, 0);
|
||||
_context.statManager().addRateData("tunnel.tierAgree" + bwTier, 1);
|
||||
} else {
|
||||
_context.statManager().addRateData("tunnel.tierReject" + bwTier, 1, 0);
|
||||
_context.statManager().addRateData("tunnel.tierReject" + bwTier, 1);
|
||||
}
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(msg.getUniqueId() + ": Peer " + peer + " replied with status " + howBad);
|
||||
@@ -272,17 +275,17 @@ class BuildHandler implements Runnable {
|
||||
allAgree = false;
|
||||
switch (howBad) {
|
||||
case TunnelHistory.TUNNEL_REJECT_BANDWIDTH:
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionBandwidth", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionBandwidth", 1);
|
||||
break;
|
||||
case TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD:
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionTransient", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionTransient", 1);
|
||||
break;
|
||||
case TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT:
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionProbabalistic", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionProbabalistic", 1);
|
||||
break;
|
||||
case TunnelHistory.TUNNEL_REJECT_CRIT:
|
||||
default:
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionCritical", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.receiveRejectionCritical", 1);
|
||||
}
|
||||
// penalize peer based on their bitchiness level
|
||||
_context.profileManager().tunnelRejected(peer, rtt, howBad);
|
||||
@@ -292,10 +295,20 @@ class BuildHandler implements Runnable {
|
||||
|
||||
if (allAgree) {
|
||||
// wikked, completely build
|
||||
boolean success;
|
||||
if (cfg.isInbound())
|
||||
_context.tunnelDispatcher().joinInbound(cfg);
|
||||
success = _context.tunnelDispatcher().joinInbound(cfg);
|
||||
else
|
||||
_context.tunnelDispatcher().joinOutbound(cfg);
|
||||
success = _context.tunnelDispatcher().joinOutbound(cfg);
|
||||
if (!success) {
|
||||
// This will happen very rarely. We check for dups when
|
||||
// creating the config, but we don't track IDs for builds in progress.
|
||||
_context.statManager().addRateData("tunnel.ownDupID", 1);
|
||||
_exec.buildComplete(cfg, cfg.getTunnelPool());
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dup ID for our own tunnel " + cfg);
|
||||
return;
|
||||
}
|
||||
cfg.getTunnelPool().addTunnel(cfg); // self.self.self.foo!
|
||||
// call buildComplete() after addTunnel() so we don't try another build.
|
||||
_exec.buildComplete(cfg, cfg.getTunnelPool());
|
||||
@@ -305,21 +318,21 @@ class BuildHandler implements Runnable {
|
||||
cfg.setExpireJob(expireJob);
|
||||
_context.jobQueue().addJob(expireJob);
|
||||
if (cfg.getDestination() == null)
|
||||
_context.statManager().addRateData("tunnel.buildExploratorySuccess", rtt, rtt);
|
||||
_context.statManager().addRateData("tunnel.buildExploratorySuccess", rtt);
|
||||
else
|
||||
_context.statManager().addRateData("tunnel.buildClientSuccess", rtt, rtt);
|
||||
_context.statManager().addRateData("tunnel.buildClientSuccess", rtt);
|
||||
} else {
|
||||
// someone is no fun
|
||||
_exec.buildComplete(cfg, cfg.getTunnelPool());
|
||||
if (cfg.getDestination() == null)
|
||||
_context.statManager().addRateData("tunnel.buildExploratoryReject", rtt, rtt);
|
||||
_context.statManager().addRateData("tunnel.buildExploratoryReject", rtt);
|
||||
else
|
||||
_context.statManager().addRateData("tunnel.buildClientReject", rtt, rtt);
|
||||
_context.statManager().addRateData("tunnel.buildClientReject", rtt);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(msg.getUniqueId() + ": Tunnel reply could not be decrypted for tunnel " + cfg);
|
||||
_context.statManager().addRateData("tunnel.corruptBuildReply", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.corruptBuildReply", 1);
|
||||
// don't leak
|
||||
_exec.buildComplete(cfg, cfg.getTunnelPool());
|
||||
}
|
||||
@@ -337,7 +350,7 @@ class BuildHandler implements Runnable {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
|
||||
+ ", since we received it a long time ago: " + timeSinceReceived);
|
||||
_context.statManager().addRateData("tunnel.dropLoadDelay", timeSinceReceived, 0);
|
||||
_context.statManager().addRateData("tunnel.dropLoadDelay", timeSinceReceived);
|
||||
return -1;
|
||||
}
|
||||
// ok, this is not our own tunnel, so we need to do some heavy lifting
|
||||
@@ -346,7 +359,7 @@ class BuildHandler implements Runnable {
|
||||
long beforeDecrypt = System.currentTimeMillis();
|
||||
BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
|
||||
long decryptTime = System.currentTimeMillis() - beforeDecrypt;
|
||||
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime);
|
||||
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime);
|
||||
if (decryptTime > 500 && _log.shouldLog(Log.WARN))
|
||||
_log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
|
||||
if (req == null) {
|
||||
@@ -416,11 +429,11 @@ class BuildHandler implements Runnable {
|
||||
RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(_nextPeer);
|
||||
if (ri != null) {
|
||||
handleReq(ri, _state, _req, _nextPeer);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 1, 0);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 1);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Deferred successfully, but we couldnt find " + _nextPeer);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -437,8 +450,8 @@ class BuildHandler implements Runnable {
|
||||
}
|
||||
public String getName() { return "Timeout looking for next peer for tunnel join"; }
|
||||
public void runJob() {
|
||||
getContext().statManager().addRateData("tunnel.rejectTimeout", 1, 0);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0);
|
||||
getContext().statManager().addRateData("tunnel.rejectTimeout", 1);
|
||||
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0);
|
||||
// logging commented out so class can be static
|
||||
//if (_log.shouldLog(Log.WARN))
|
||||
// _log.warn("Request " + _state.msg.getUniqueId()
|
||||
@@ -455,6 +468,7 @@ class BuildHandler implements Runnable {
|
||||
* If we are dropping lots of requests before even trying to handle them,
|
||||
* I suppose you could call us "overloaded"
|
||||
*/
|
||||
/**** unused, see handleReq() below
|
||||
private final static int MAX_PROACTIVE_DROPS = 240;
|
||||
|
||||
private int countProactiveDrops() {
|
||||
@@ -465,6 +479,7 @@ class BuildHandler implements Runnable {
|
||||
dropped += countEvents("tunnel.dropLoadDelay", 60*1000);
|
||||
return dropped;
|
||||
}
|
||||
|
||||
private int countEvents(String stat, long period) {
|
||||
RateStat rs = _context.statManager().getRate(stat);
|
||||
if (rs != null) {
|
||||
@@ -474,6 +489,7 @@ class BuildHandler implements Runnable {
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
****/
|
||||
|
||||
/**
|
||||
* Actually process the request and send the reply.
|
||||
@@ -491,6 +507,7 @@ class BuildHandler implements Runnable {
|
||||
|
||||
// Loop checks
|
||||
if ((!isOutEnd) && _context.routerHash().equals(nextPeer)) {
|
||||
_context.statManager().addRateData("tunnel.rejectHostile", 1);
|
||||
// We are 2 hops in a row? Drop it without a reply.
|
||||
// No way to recognize if we are every other hop, but see below
|
||||
_log.error("Dropping build request, we the next hop");
|
||||
@@ -502,6 +519,7 @@ class BuildHandler implements Runnable {
|
||||
if (from == null)
|
||||
from = state.from.calculateHash();
|
||||
if (_context.routerHash().equals(from)) {
|
||||
_context.statManager().addRateData("tunnel.rejectHostile", 1);
|
||||
_log.error("Dropping build request, we are the previous hop");
|
||||
return;
|
||||
}
|
||||
@@ -513,6 +531,7 @@ class BuildHandler implements Runnable {
|
||||
// Previous and next hop the same? Don't help somebody be evil. Drop it without a reply.
|
||||
// A-B-C-A is not preventable
|
||||
if (nextPeer.equals(from)) {
|
||||
_context.statManager().addRateData("tunnel.rejectHostile", 1);
|
||||
_log.error("Dropping build request with the same previous and next hop");
|
||||
return;
|
||||
}
|
||||
@@ -525,30 +544,35 @@ class BuildHandler implements Runnable {
|
||||
int ourSlot = -1;
|
||||
|
||||
int response = _context.throttle().acceptTunnelRequest();
|
||||
if (_context.tunnelManager().getTunnelInfo(new TunnelId(ourId)) != null) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Already participating in a tunnel with the given Id (" + ourId + "), so gotta reject");
|
||||
if (response == 0)
|
||||
response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
|
||||
}
|
||||
|
||||
// This only checked OUR tunnels, so the log message was wrong.
|
||||
// Now checked by TunnelDispatcher.joinXXX()
|
||||
// and returned as success value, checked below.
|
||||
//if (_context.tunnelManager().getTunnelInfo(new TunnelId(ourId)) != null) {
|
||||
// if (_log.shouldLog(Log.ERROR))
|
||||
// _log.error("Already participating in a tunnel with the given Id (" + ourId + "), so gotta reject");
|
||||
// if (response == 0)
|
||||
// response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
|
||||
//}
|
||||
|
||||
//if ( (response == 0) && (_context.random().nextInt(50) <= 1) )
|
||||
// response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
|
||||
|
||||
int proactiveDrops = countProactiveDrops();
|
||||
long recvDelay = _context.clock().now()-state.recvTime;
|
||||
if (response == 0) {
|
||||
// unused
|
||||
//int proactiveDrops = countProactiveDrops();
|
||||
float pDrop = ((float) recvDelay) / (float) (BuildRequestor.REQUEST_TIMEOUT*3);
|
||||
pDrop = (float)Math.pow(pDrop, 16);
|
||||
if (_context.random().nextFloat() < pDrop) { // || (proactiveDrops > MAX_PROACTIVE_DROPS) ) ) {
|
||||
_context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay, proactiveDrops);
|
||||
_context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay);
|
||||
_context.throttle().setTunnelStatus(_x("Rejecting tunnels: Request overload"));
|
||||
if (true || (proactiveDrops < MAX_PROACTIVE_DROPS*2))
|
||||
//if (true || (proactiveDrops < MAX_PROACTIVE_DROPS*2))
|
||||
response = TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
|
||||
else
|
||||
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
|
||||
//else
|
||||
// response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
|
||||
} else {
|
||||
_context.statManager().addRateData("tunnel.acceptLoad", recvDelay, recvDelay);
|
||||
_context.statManager().addRateData("tunnel.acceptLoad", recvDelay);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -603,7 +627,7 @@ class BuildHandler implements Runnable {
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId
|
||||
+ " after " + recvDelay + "/" + proactiveDrops + " with " + response
|
||||
+ " after " + recvDelay + " with " + response
|
||||
+ " from " + (state.fromHash != null ? state.fromHash :
|
||||
state.from != null ? state.from.calculateHash() : "tunnel"));
|
||||
|
||||
@@ -635,18 +659,29 @@ class BuildHandler implements Runnable {
|
||||
cfg.setSendTunnelId(DataHelper.toLong(4, nextId));
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Joining " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + "/" + recvDelay + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant"));
|
||||
|
||||
// now "actually" join
|
||||
boolean success;
|
||||
if (isOutEnd)
|
||||
_context.tunnelDispatcher().joinOutboundEndpoint(cfg);
|
||||
success = _context.tunnelDispatcher().joinOutboundEndpoint(cfg);
|
||||
else if (isInGW)
|
||||
_context.tunnelDispatcher().joinInboundGateway(cfg);
|
||||
success = _context.tunnelDispatcher().joinInboundGateway(cfg);
|
||||
else
|
||||
_context.tunnelDispatcher().joinParticipant(cfg);
|
||||
} else {
|
||||
_context.statManager().addRateData("tunnel.reject." + response, 1, 1);
|
||||
success = _context.tunnelDispatcher().joinParticipant(cfg);
|
||||
if (success) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Joining " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + "/" + recvDelay + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant"));
|
||||
} else {
|
||||
// Dup Tunnel ID. This can definitely happen (birthday paradox).
|
||||
// Probability in 11 minutes (per hop type):
|
||||
// 0.1% for 2900 tunnels; 1% for 9300 tunnels
|
||||
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
|
||||
_context.statManager().addRateData("tunnel.rejectDupID", 1);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("DUP ID failure " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant"));
|
||||
}
|
||||
}
|
||||
if (response != 0) {
|
||||
_context.statManager().addRateData("tunnel.reject." + response, 1);
|
||||
_context.messageHistory().tunnelRejected(state.fromHash, new TunnelId(ourId), nextPeer,
|
||||
"rejecting for " + response + ": " +
|
||||
state.msg.getUniqueId() + "/" + ourId + "/" + req.readNextTunnelId() + " delay " +
|
||||
@@ -662,7 +697,9 @@ class BuildHandler implements Runnable {
|
||||
(! _context.routerHash().equals(nextPeer)) &&
|
||||
(! _context.commSystem().haveOutboundCapacity(81)) &&
|
||||
(! _context.commSystem().isEstablished(nextPeer))) {
|
||||
_context.statManager().addRateData("tunnel.dropConnLimits", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.dropConnLimits", 1);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Not sending rejection due to conn limits");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -767,7 +804,7 @@ class BuildHandler implements Runnable {
|
||||
// we are the IBEP but we already gave up?
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dropping the reply " + reqId + ", as we used to be building that");
|
||||
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
|
||||
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1);
|
||||
} else {
|
||||
int sz = _inboundBuildMessages.size();
|
||||
// Can probably remove this check, since CoDel is in use
|
||||
@@ -799,7 +836,7 @@ class BuildHandler implements Runnable {
|
||||
_exec.repoll();
|
||||
} else {
|
||||
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
|
||||
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
|
||||
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz);
|
||||
}
|
||||
//}
|
||||
}
|
||||
@@ -935,7 +972,7 @@ class BuildHandler implements Runnable {
|
||||
|
||||
public void runJob() {
|
||||
getContext().tunnelDispatcher().remove(_cfg);
|
||||
getContext().statManager().addRateData("tunnel.rejectTimeout2", 1, 0);
|
||||
getContext().statManager().addRateData("tunnel.rejectTimeout2", 1);
|
||||
Log log = getContext().logManager().getLog(BuildHandler.class);
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Timeout contacting next hop for " + _cfg);
|
||||
|
@@ -65,13 +65,24 @@ abstract class BuildRequestor {
|
||||
|
||||
/** new style requests need to fill in the tunnel IDs before hand */
|
||||
private static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) {
|
||||
for (int i = 0; i < cfg.getLength(); i++) {
|
||||
if ( (!cfg.isInbound()) && (i == 0) ) {
|
||||
int len = cfg.getLength();
|
||||
boolean isIB = cfg.isInbound();
|
||||
for (int i = 0; i < len; i++) {
|
||||
if ( (!isIB) && (i == 0) ) {
|
||||
// outbound gateway (us) doesn't receive on a tunnel id
|
||||
if (cfg.getLength() <= 1) // zero hop, pretend to have a send id
|
||||
cfg.getConfig(i).setSendTunnelId(DataHelper.toLong(4, ctx.random().nextLong(TunnelId.MAX_ID_VALUE)));
|
||||
if (len <= 1) { // zero hop, pretend to have a send id
|
||||
long id = ctx.tunnelDispatcher().getNewOBGWID();
|
||||
cfg.getConfig(i).setSendTunnelId(DataHelper.toLong(4, id));
|
||||
}
|
||||
} else {
|
||||
cfg.getConfig(i).setReceiveTunnelId(DataHelper.toLong(4, ctx.random().nextLong(TunnelId.MAX_ID_VALUE)));
|
||||
long id;
|
||||
if (isIB && len == 1)
|
||||
id = ctx.tunnelDispatcher().getNewIBZeroHopID();
|
||||
else if (isIB && i == len - 1)
|
||||
id = ctx.tunnelDispatcher().getNewIBEPID();
|
||||
else
|
||||
id = ctx.random().nextLong(TunnelId.MAX_ID_VALUE);
|
||||
cfg.getConfig(i).setReceiveTunnelId(DataHelper.toLong(4, id));
|
||||
}
|
||||
|
||||
if (i > 0)
|
||||
|
@@ -251,6 +251,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expensive (iterates through all tunnels of all pools) and unnecessary.
|
||||
* @deprecated unused
|
||||
*/
|
||||
public TunnelInfo getTunnelInfo(TunnelId id) {
|
||||
TunnelInfo info = null;
|
||||
for (TunnelPool pool : _clientInboundPools.values()) {
|
||||
|
Reference in New Issue
Block a user