From 77f0dd653af996cd5fba0a77ab281f3a079ea6a9 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 29 Dec 2012 13:40:55 +0000 Subject: [PATCH] - Fix handling of duplicate participating tunnel IDs - Reduce chance of generating duplicate IDs for our tunnels (ticket #812) - Stat cleanup - Comment out effectively unused countProactiveDrops() --- history.txt | 5 + .../src/net/i2p/router/RouterVersion.java | 2 +- .../net/i2p/router/TunnelManagerFacade.java | 2 +- .../dummy/DummyTunnelManagerFacade.java | 1 + .../net/i2p/router/tunnel/HopProcessor.java | 7 +- .../i2p/router/tunnel/TunnelDispatcher.java | 139 +++++++++++++++--- .../i2p/router/tunnel/pool/BuildExecutor.java | 11 +- .../i2p/router/tunnel/pool/BuildHandler.java | 129 ++++++++++------ .../router/tunnel/pool/BuildRequestor.java | 21 ++- .../router/tunnel/pool/TunnelPoolManager.java | 4 + 10 files changed, 234 insertions(+), 87 deletions(-) diff --git a/history.txt b/history.txt index dd7fb33989..001bc78df6 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2012-12-29 zzz + * i2psnark: Redirect after post + * Javadocs: Fix javax links + * Tunnels: Handle duplicate tunnel IDs (ticket #812) + 2012-12-26 zzz * I2CP: - Prep for delivery of detailed failure codes to the client (ticket #788) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b422..0725033fa6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/TunnelManagerFacade.java b/router/java/src/net/i2p/router/TunnelManagerFacade.java index 5e00d7010d..2a35d0c970 100644 --- a/router/java/src/net/i2p/router/TunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/TunnelManagerFacade.java @@ -27,7 +27,7 @@ public interface TunnelManagerFacade extends Service { * Retrieve the information related to a particular tunnel * * @param id the tunnelId as seen at the gateway - * + * @deprecated unused */ TunnelInfo getTunnelInfo(TunnelId id); diff --git a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java index 221f2d9e93..5de3db1a35 100644 --- a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java @@ -29,6 +29,7 @@ import net.i2p.router.tunnel.pool.TunnelPool; */ public class DummyTunnelManagerFacade implements TunnelManagerFacade { + /** @deprecated unused */ public TunnelInfo getTunnelInfo(TunnelId id) { return null; } public TunnelInfo selectInboundTunnel() { return null; } public TunnelInfo selectInboundTunnel(Hash destination) { return null; } diff --git a/router/java/src/net/i2p/router/tunnel/HopProcessor.java b/router/java/src/net/i2p/router/tunnel/HopProcessor.java index 386f96d798..e0350668e8 100644 --- a/router/java/src/net/i2p/router/tunnel/HopProcessor.java +++ b/router/java/src/net/i2p/router/tunnel/HopProcessor.java @@ -61,11 +61,12 @@ class HopProcessor { public boolean process(byte orig[], int offset, int length, Hash prev) { // prev is null on gateways if (prev != null) { - if (_config.getReceiveFrom() == null) + if (_config.getReceiveFrom() == null) { _config.setReceiveFrom(prev); - if (!_config.getReceiveFrom().equals(prev)) { + } else if (!_config.getReceiveFrom().equals(prev)) { + // shouldn't happen now that we have good dup ID detection in BuildHandler if (_log.shouldLog(Log.ERROR)) - _log.error("Invalid previous peer - attempted hostile loop? from " + prev + _log.error("Attempted mid-tunnel injection from " + prev + ", expected " + _config.getReceiveFrom()); return false; } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index e2a6561afe..c009e9fb38 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -71,13 +71,14 @@ public class TunnelDispatcher implements Service { private final RouterContext _context; private final Log _log; /** us */ - private final Map _outboundGateways; - private final Map _outboundEndpoints; + private final ConcurrentHashMap _outboundGateways; + private final ConcurrentHashMap _outboundEndpoints; /** regular participant or IBEP of our own tunnel */ - private final Map _participants; + private final ConcurrentHashMap _participants; /** regular IBGW or our own zero-hop inbound */ - private final Map _inboundGateways; - private final Map _participatingConfig; + private final ConcurrentHashMap _inboundGateways; + /** anything we did not create - IBGW, OBEP, or middle hop */ + private final ConcurrentHashMap _participatingConfig; /** what is the date/time on which the last non-locally-created tunnel expires? */ private long _lastParticipatingExpiration; private BloomFilterIVValidator _validator; @@ -85,6 +86,7 @@ public class TunnelDispatcher implements Service { /** what is the date/time we last deliberately dropped a tunnel? **/ //private long _lastDropTime; private final TunnelGatewayPumper _pumper; + private final Object _joinParticipantLock = new Object(); /** for shouldDropParticipatingMessage() */ enum Location {OBEP, PARTICIPANT, IBGW} @@ -233,93 +235,121 @@ public class TunnelDispatcher implements Service { /** * We are the outbound gateway - we created this tunnel + * + * @return success; false if Tunnel ID is a duplicate */ - public void joinOutbound(TunnelCreatorConfig cfg) { + public boolean joinOutbound(TunnelCreatorConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Outbound built successfully: " + cfg); + TunnelGateway gw; if (cfg.getLength() > 1) { TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg); TunnelGateway.Sender sender = new OutboundSender(_context, cfg); TunnelGateway.Receiver receiver = new OutboundReceiver(_context, cfg); //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); - TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); - TunnelId outId = cfg.getConfig(0).getSendTunnel(); - _outboundGateways.put(outId, gw); + gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); + } else { + gw = new TunnelGatewayZeroHop(_context, cfg); + } + TunnelId outId = cfg.getConfig(0).getSendTunnel(); + if (_outboundGateways.putIfAbsent(outId, gw) != null) + return false; + if (cfg.getLength() > 1) { _context.statManager().addRateData("tunnel.joinOutboundGateway", 1); _context.messageHistory().tunnelJoined("outbound", cfg); } else { - TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); - TunnelId outId = cfg.getConfig(0).getSendTunnel(); - _outboundGateways.put(outId, gw); _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1); _context.messageHistory().tunnelJoined("outboundZeroHop", cfg); } + return true; } /** * We are the inbound endpoint - we created this tunnel + * + * @return success; false if Tunnel ID is a duplicate */ - public void joinInbound(TunnelCreatorConfig cfg) { + public boolean joinInbound(TunnelCreatorConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Inbound built successfully: " + cfg); if (cfg.getLength() > 1) { TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator)); TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); - _participants.put(recvId, participant); + if (_participants.putIfAbsent(recvId, participant) != null) + return false; _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1); _context.messageHistory().tunnelJoined("inboundEndpoint", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId recvId = cfg.getConfig(0).getReceiveTunnel(); - _inboundGateways.put(recvId, gw); + if (_inboundGateways.putIfAbsent(recvId, gw) != null) + return false; _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1); _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg); } + return true; } /** * We are a participant in this tunnel, but not as the endpoint or gateway * + * @return success; false if Tunnel ID is a duplicate */ - public void joinParticipant(HopConfig cfg) { + public boolean joinParticipant(HopConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Joining as participant: " + cfg); TunnelId recvId = cfg.getReceiveTunnel(); TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator)); - _participants.put(recvId, participant); - _participatingConfig.put(recvId, cfg); + synchronized (_joinParticipantLock) { + if (_participatingConfig.putIfAbsent(recvId, cfg) != null) + return false; + if (_participants.putIfAbsent(recvId, participant) != null) { + _participatingConfig.remove(recvId); + return false; + } + } _context.messageHistory().tunnelJoined("participant", cfg); _context.statManager().addRateData("tunnel.joinParticipant", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); _leaveJob.add(cfg); + return true; } /** * We are the outbound endpoint in this tunnel, and did not create it * + * @return success; false if Tunnel ID is a duplicate */ - public void joinOutboundEndpoint(HopConfig cfg) { + public boolean joinOutboundEndpoint(HopConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Joining as OBEP: " + cfg); TunnelId recvId = cfg.getReceiveTunnel(); OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator)); - _outboundEndpoints.put(recvId, endpoint); - _participatingConfig.put(recvId, cfg); + synchronized (_joinParticipantLock) { + if (_participatingConfig.putIfAbsent(recvId, cfg) != null) + return false; + if (_outboundEndpoints.putIfAbsent(recvId, endpoint) != null) { + _participatingConfig.remove(recvId); + return false; + } + } _context.messageHistory().tunnelJoined("outboundEndpoint", cfg); _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); _leaveJob.add(cfg); + return true; } /** * We are the inbound gateway in this tunnel, and did not create it * + * @return success; false if Tunnel ID is a duplicate */ - public void joinInboundGateway(HopConfig cfg) { + public boolean joinInboundGateway(HopConfig cfg) { if (_log.shouldLog(Log.INFO)) _log.info("Joining as IBGW: " + cfg); TunnelGateway.QueuePreprocessor preproc = createPreprocessor(cfg); @@ -328,20 +358,77 @@ public class TunnelDispatcher implements Service { //TunnelGateway gw = new TunnelGateway(_context, preproc, sender, receiver); TunnelGateway gw = new ThrottledPumpedTunnelGateway(_context, preproc, sender, receiver, _pumper, cfg); TunnelId recvId = cfg.getReceiveTunnel(); - _inboundGateways.put(recvId, gw); - _participatingConfig.put(recvId, cfg); + synchronized (_joinParticipantLock) { + if (_participatingConfig.putIfAbsent(recvId, cfg) != null) + return false; + if (_inboundGateways.putIfAbsent(recvId, gw) != null) { + _participatingConfig.remove(recvId); + return false; + } + } _context.messageHistory().tunnelJoined("inboundGateway", cfg); _context.statManager().addRateData("tunnel.joinInboundGateway", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); _leaveJob.add(cfg); + return true; } public int getParticipatingCount() { return _participatingConfig.size(); } + /** + * Get a new random send tunnel ID that isn't a dup. + * Note that we do not keep track of IDs for pending builds so this + * does not fully prevent joinOutbound() from failing later. + * @since 0.9.5 + */ + public long getNewOBGWID() { + long rv; + TunnelId tid; + do { + rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE); + tid = new TunnelId(rv); + } while (_outboundGateways.containsKey(tid)); + return rv; + } + + /** + * Get a new random receive tunnel ID that isn't a dup. + * Not for zero hop tunnels. + * Note that we do not keep track of IDs for pending builds so this + * does not fully prevent joinInbound() from failing later. + * @since 0.9.5 + */ + public long getNewIBEPID() { + long rv; + TunnelId tid; + do { + rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE); + tid = new TunnelId(rv); + } while (_participants.containsKey(tid)); + return rv; + } + + /** + * Get a new random receive tunnel ID that isn't a dup. + * For zero hop tunnels only. + * Note that we do not keep track of IDs for pending builds so this + * does not fully prevent joinInbound() from failing later. + * @since 0.9.5 + */ + public long getNewIBZeroHopID() { + long rv; + TunnelId tid; + do { + rv = _context.random().nextLong(TunnelId.MAX_ID_VALUE); + tid = new TunnelId(rv); + } while (_inboundGateways.containsKey(tid)); + return rv; + } + /******* may be used for congestion control later... public int getParticipatingInboundGatewayCount() { return _inboundGateways.size(); @@ -598,6 +685,10 @@ public class TunnelDispatcher implements Service { // _context.statManager().addRateData("tunnel.dispatchOutboundTime", dispatchTime, dispatchTime); } + /** + * Only for console TunnelRenderer. + * @return a copy + */ public List listParticipatingTunnels() { return new ArrayList(_participatingConfig.values()); } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java index 019dad51cb..3ed3c18ebb 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -466,13 +466,10 @@ class BuildExecutor implements Runnable { void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) { long beforeBuild = System.currentTimeMillis(); if (cfg.getLength() > 1) { - // should we allow an ID of 0? - cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - if (addToBuilding(cfg)) { - _log.error("Dup reply ID: " + cfg.getReplyMessageId()); - // fail - return; - } + do { + // should we allow an ID of 0? + cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + } while (addToBuilding(cfg)); // if a dup, go araound again } BuildRequestor.request(_context, pool, cfg, this); long buildTime = System.currentTimeMillis() - beforeBuild; diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index ba0b0e56b3..87cf05cf3a 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -93,6 +93,9 @@ class BuildHandler implements Runnable { _context.statManager().createRequiredRateStat("tunnel.decryptRequestTime", "Time to decrypt a build request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.rejectTimeout", "Reject tunnel count (unknown next hop)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.rejectTimeout2", "Reject tunnel count (can't contact next hop)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRequiredRateStat("tunnel.rejectDupID", "Part. tunnel dup ID", "Tunnels", new long[] { 24*60*60*1000 }); + _context.statManager().createRequiredRateStat("tunnel.ownDupID", "Our tunnel dup. ID", "Tunnels", new long[] { 24*60*60*1000 }); + _context.statManager().createRequiredRateStat("tunnel.rejectHostile", "Reject malicious tunnel", "Tunnels", new long[] { 24*60*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.rejectOverloaded", "Delay to process rejected request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRequiredRateStat("tunnel.acceptLoad", "Delay to process accepted request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 }); @@ -189,7 +192,7 @@ class BuildHandler implements Runnable { if (_log.shouldLog(Log.WARN)) _log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId() + ", since we received it a long time ago: " + (now - state.recvTime)); - _context.statManager().addRateData("tunnel.dropLoadDelay", now - state.recvTime, 0); + _context.statManager().addRateData("tunnel.dropLoadDelay", now - state.recvTime); _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow")); return; } @@ -212,7 +215,7 @@ class BuildHandler implements Runnable { // cannot handle - not pending... took too long? if (_log.shouldLog(Log.WARN)) _log.warn("The reply " + replyMessageId + " did not match any pending tunnels"); - _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0); + _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1); } else { handleReply(state.msg, cfg, System.currentTimeMillis()-state.recvTime); } @@ -258,9 +261,9 @@ class BuildHandler implements Runnable { else if (_log.shouldLog(Log.WARN)) _log.warn("Failed detecting bwTier, null routerInfo for: " + peer); // Record that a peer of the given tier agreed or rejected if (howBad == 0) { - _context.statManager().addRateData("tunnel.tierAgree" + bwTier, 1, 0); + _context.statManager().addRateData("tunnel.tierAgree" + bwTier, 1); } else { - _context.statManager().addRateData("tunnel.tierReject" + bwTier, 1, 0); + _context.statManager().addRateData("tunnel.tierReject" + bwTier, 1); } if (_log.shouldLog(Log.INFO)) _log.info(msg.getUniqueId() + ": Peer " + peer + " replied with status " + howBad); @@ -272,17 +275,17 @@ class BuildHandler implements Runnable { allAgree = false; switch (howBad) { case TunnelHistory.TUNNEL_REJECT_BANDWIDTH: - _context.statManager().addRateData("tunnel.receiveRejectionBandwidth", 1, 0); + _context.statManager().addRateData("tunnel.receiveRejectionBandwidth", 1); break; case TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD: - _context.statManager().addRateData("tunnel.receiveRejectionTransient", 1, 0); + _context.statManager().addRateData("tunnel.receiveRejectionTransient", 1); break; case TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT: - _context.statManager().addRateData("tunnel.receiveRejectionProbabalistic", 1, 0); + _context.statManager().addRateData("tunnel.receiveRejectionProbabalistic", 1); break; case TunnelHistory.TUNNEL_REJECT_CRIT: default: - _context.statManager().addRateData("tunnel.receiveRejectionCritical", 1, 0); + _context.statManager().addRateData("tunnel.receiveRejectionCritical", 1); } // penalize peer based on their bitchiness level _context.profileManager().tunnelRejected(peer, rtt, howBad); @@ -292,10 +295,20 @@ class BuildHandler implements Runnable { if (allAgree) { // wikked, completely build + boolean success; if (cfg.isInbound()) - _context.tunnelDispatcher().joinInbound(cfg); + success = _context.tunnelDispatcher().joinInbound(cfg); else - _context.tunnelDispatcher().joinOutbound(cfg); + success = _context.tunnelDispatcher().joinOutbound(cfg); + if (!success) { + // This will happen very rarely. We check for dups when + // creating the config, but we don't track IDs for builds in progress. + _context.statManager().addRateData("tunnel.ownDupID", 1); + _exec.buildComplete(cfg, cfg.getTunnelPool()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup ID for our own tunnel " + cfg); + return; + } cfg.getTunnelPool().addTunnel(cfg); // self.self.self.foo! // call buildComplete() after addTunnel() so we don't try another build. _exec.buildComplete(cfg, cfg.getTunnelPool()); @@ -305,21 +318,21 @@ class BuildHandler implements Runnable { cfg.setExpireJob(expireJob); _context.jobQueue().addJob(expireJob); if (cfg.getDestination() == null) - _context.statManager().addRateData("tunnel.buildExploratorySuccess", rtt, rtt); + _context.statManager().addRateData("tunnel.buildExploratorySuccess", rtt); else - _context.statManager().addRateData("tunnel.buildClientSuccess", rtt, rtt); + _context.statManager().addRateData("tunnel.buildClientSuccess", rtt); } else { // someone is no fun _exec.buildComplete(cfg, cfg.getTunnelPool()); if (cfg.getDestination() == null) - _context.statManager().addRateData("tunnel.buildExploratoryReject", rtt, rtt); + _context.statManager().addRateData("tunnel.buildExploratoryReject", rtt); else - _context.statManager().addRateData("tunnel.buildClientReject", rtt, rtt); + _context.statManager().addRateData("tunnel.buildClientReject", rtt); } } else { if (_log.shouldLog(Log.WARN)) _log.warn(msg.getUniqueId() + ": Tunnel reply could not be decrypted for tunnel " + cfg); - _context.statManager().addRateData("tunnel.corruptBuildReply", 1, 0); + _context.statManager().addRateData("tunnel.corruptBuildReply", 1); // don't leak _exec.buildComplete(cfg, cfg.getTunnelPool()); } @@ -337,7 +350,7 @@ class BuildHandler implements Runnable { if (_log.shouldLog(Log.WARN)) _log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId() + ", since we received it a long time ago: " + timeSinceReceived); - _context.statManager().addRateData("tunnel.dropLoadDelay", timeSinceReceived, 0); + _context.statManager().addRateData("tunnel.dropLoadDelay", timeSinceReceived); return -1; } // ok, this is not our own tunnel, so we need to do some heavy lifting @@ -346,7 +359,7 @@ class BuildHandler implements Runnable { long beforeDecrypt = System.currentTimeMillis(); BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey()); long decryptTime = System.currentTimeMillis() - beforeDecrypt; - _context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime); + _context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime); if (decryptTime > 500 && _log.shouldLog(Log.WARN)) _log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago"); if (req == null) { @@ -416,11 +429,11 @@ class BuildHandler implements Runnable { RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(_nextPeer); if (ri != null) { handleReq(ri, _state, _req, _nextPeer); - getContext().statManager().addRateData("tunnel.buildLookupSuccess", 1, 0); + getContext().statManager().addRateData("tunnel.buildLookupSuccess", 1); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Deferred successfully, but we couldnt find " + _nextPeer); - getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0); + getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0); } } } @@ -437,8 +450,8 @@ class BuildHandler implements Runnable { } public String getName() { return "Timeout looking for next peer for tunnel join"; } public void runJob() { - getContext().statManager().addRateData("tunnel.rejectTimeout", 1, 0); - getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0, 0); + getContext().statManager().addRateData("tunnel.rejectTimeout", 1); + getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0); // logging commented out so class can be static //if (_log.shouldLog(Log.WARN)) // _log.warn("Request " + _state.msg.getUniqueId() @@ -455,6 +468,7 @@ class BuildHandler implements Runnable { * If we are dropping lots of requests before even trying to handle them, * I suppose you could call us "overloaded" */ +/**** unused, see handleReq() below private final static int MAX_PROACTIVE_DROPS = 240; private int countProactiveDrops() { @@ -465,6 +479,7 @@ class BuildHandler implements Runnable { dropped += countEvents("tunnel.dropLoadDelay", 60*1000); return dropped; } + private int countEvents(String stat, long period) { RateStat rs = _context.statManager().getRate(stat); if (rs != null) { @@ -474,6 +489,7 @@ class BuildHandler implements Runnable { } return 0; } +****/ /** * Actually process the request and send the reply. @@ -491,6 +507,7 @@ class BuildHandler implements Runnable { // Loop checks if ((!isOutEnd) && _context.routerHash().equals(nextPeer)) { + _context.statManager().addRateData("tunnel.rejectHostile", 1); // We are 2 hops in a row? Drop it without a reply. // No way to recognize if we are every other hop, but see below _log.error("Dropping build request, we the next hop"); @@ -502,6 +519,7 @@ class BuildHandler implements Runnable { if (from == null) from = state.from.calculateHash(); if (_context.routerHash().equals(from)) { + _context.statManager().addRateData("tunnel.rejectHostile", 1); _log.error("Dropping build request, we are the previous hop"); return; } @@ -513,6 +531,7 @@ class BuildHandler implements Runnable { // Previous and next hop the same? Don't help somebody be evil. Drop it without a reply. // A-B-C-A is not preventable if (nextPeer.equals(from)) { + _context.statManager().addRateData("tunnel.rejectHostile", 1); _log.error("Dropping build request with the same previous and next hop"); return; } @@ -525,30 +544,35 @@ class BuildHandler implements Runnable { int ourSlot = -1; int response = _context.throttle().acceptTunnelRequest(); - if (_context.tunnelManager().getTunnelInfo(new TunnelId(ourId)) != null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Already participating in a tunnel with the given Id (" + ourId + "), so gotta reject"); - if (response == 0) - response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT; - } + + // This only checked OUR tunnels, so the log message was wrong. + // Now checked by TunnelDispatcher.joinXXX() + // and returned as success value, checked below. + //if (_context.tunnelManager().getTunnelInfo(new TunnelId(ourId)) != null) { + // if (_log.shouldLog(Log.ERROR)) + // _log.error("Already participating in a tunnel with the given Id (" + ourId + "), so gotta reject"); + // if (response == 0) + // response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT; + //} //if ( (response == 0) && (_context.random().nextInt(50) <= 1) ) // response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT; - int proactiveDrops = countProactiveDrops(); long recvDelay = _context.clock().now()-state.recvTime; if (response == 0) { + // unused + //int proactiveDrops = countProactiveDrops(); float pDrop = ((float) recvDelay) / (float) (BuildRequestor.REQUEST_TIMEOUT*3); pDrop = (float)Math.pow(pDrop, 16); if (_context.random().nextFloat() < pDrop) { // || (proactiveDrops > MAX_PROACTIVE_DROPS) ) ) { - _context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay, proactiveDrops); + _context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay); _context.throttle().setTunnelStatus(_x("Rejecting tunnels: Request overload")); - if (true || (proactiveDrops < MAX_PROACTIVE_DROPS*2)) + //if (true || (proactiveDrops < MAX_PROACTIVE_DROPS*2)) response = TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; - else - response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH; + //else + // response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH; } else { - _context.statManager().addRateData("tunnel.acceptLoad", recvDelay, recvDelay); + _context.statManager().addRateData("tunnel.acceptLoad", recvDelay); } } @@ -603,7 +627,7 @@ class BuildHandler implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId - + " after " + recvDelay + "/" + proactiveDrops + " with " + response + + " after " + recvDelay + " with " + response + " from " + (state.fromHash != null ? state.fromHash : state.from != null ? state.from.calculateHash() : "tunnel")); @@ -635,18 +659,29 @@ class BuildHandler implements Runnable { cfg.setSendTunnelId(DataHelper.toLong(4, nextId)); } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Joining " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + "/" + recvDelay + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant")); - // now "actually" join + boolean success; if (isOutEnd) - _context.tunnelDispatcher().joinOutboundEndpoint(cfg); + success = _context.tunnelDispatcher().joinOutboundEndpoint(cfg); else if (isInGW) - _context.tunnelDispatcher().joinInboundGateway(cfg); + success = _context.tunnelDispatcher().joinInboundGateway(cfg); else - _context.tunnelDispatcher().joinParticipant(cfg); - } else { - _context.statManager().addRateData("tunnel.reject." + response, 1, 1); + success = _context.tunnelDispatcher().joinParticipant(cfg); + if (success) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Joining " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + "/" + recvDelay + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant")); + } else { + // Dup Tunnel ID. This can definitely happen (birthday paradox). + // Probability in 11 minutes (per hop type): + // 0.1% for 2900 tunnels; 1% for 9300 tunnels + response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH; + _context.statManager().addRateData("tunnel.rejectDupID", 1); + if (_log.shouldLog(Log.WARN)) + _log.warn("DUP ID failure " + state.msg.getUniqueId() + "/" + cfg.getReceiveTunnel() + " as " + (isOutEnd ? "outbound endpoint" : isInGW ? "inbound gw" : "participant")); + } + } + if (response != 0) { + _context.statManager().addRateData("tunnel.reject." + response, 1); _context.messageHistory().tunnelRejected(state.fromHash, new TunnelId(ourId), nextPeer, "rejecting for " + response + ": " + state.msg.getUniqueId() + "/" + ourId + "/" + req.readNextTunnelId() + " delay " + @@ -662,7 +697,9 @@ class BuildHandler implements Runnable { (! _context.routerHash().equals(nextPeer)) && (! _context.commSystem().haveOutboundCapacity(81)) && (! _context.commSystem().isEstablished(nextPeer))) { - _context.statManager().addRateData("tunnel.dropConnLimits", 1, 0); + _context.statManager().addRateData("tunnel.dropConnLimits", 1); + if (_log.shouldLog(Log.WARN)) + _log.warn("Not sending rejection due to conn limits"); return; } @@ -767,7 +804,7 @@ class BuildHandler implements Runnable { // we are the IBEP but we already gave up? if (_log.shouldLog(Log.WARN)) _log.warn("Dropping the reply " + reqId + ", as we used to be building that"); - _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0); + _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1); } else { int sz = _inboundBuildMessages.size(); // Can probably remove this check, since CoDel is in use @@ -799,7 +836,7 @@ class BuildHandler implements Runnable { _exec.repoll(); } else { _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load")); - _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz); + _context.statManager().addRateData("tunnel.dropLoadBacklog", sz); } //} } @@ -935,7 +972,7 @@ class BuildHandler implements Runnable { public void runJob() { getContext().tunnelDispatcher().remove(_cfg); - getContext().statManager().addRateData("tunnel.rejectTimeout2", 1, 0); + getContext().statManager().addRateData("tunnel.rejectTimeout2", 1); Log log = getContext().logManager().getLog(BuildHandler.class); if (log.shouldLog(Log.WARN)) log.warn("Timeout contacting next hop for " + _cfg); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java index 31f807c7c0..ffd7550658 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java @@ -65,13 +65,24 @@ abstract class BuildRequestor { /** new style requests need to fill in the tunnel IDs before hand */ private static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) { - for (int i = 0; i < cfg.getLength(); i++) { - if ( (!cfg.isInbound()) && (i == 0) ) { + int len = cfg.getLength(); + boolean isIB = cfg.isInbound(); + for (int i = 0; i < len; i++) { + if ( (!isIB) && (i == 0) ) { // outbound gateway (us) doesn't receive on a tunnel id - if (cfg.getLength() <= 1) // zero hop, pretend to have a send id - cfg.getConfig(i).setSendTunnelId(DataHelper.toLong(4, ctx.random().nextLong(TunnelId.MAX_ID_VALUE))); + if (len <= 1) { // zero hop, pretend to have a send id + long id = ctx.tunnelDispatcher().getNewOBGWID(); + cfg.getConfig(i).setSendTunnelId(DataHelper.toLong(4, id)); + } } else { - cfg.getConfig(i).setReceiveTunnelId(DataHelper.toLong(4, ctx.random().nextLong(TunnelId.MAX_ID_VALUE))); + long id; + if (isIB && len == 1) + id = ctx.tunnelDispatcher().getNewIBZeroHopID(); + else if (isIB && i == len - 1) + id = ctx.tunnelDispatcher().getNewIBEPID(); + else + id = ctx.random().nextLong(TunnelId.MAX_ID_VALUE); + cfg.getConfig(i).setReceiveTunnelId(DataHelper.toLong(4, id)); } if (i > 0) diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index a066338734..464a5791ca 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -251,6 +251,10 @@ public class TunnelPoolManager implements TunnelManagerFacade { return null; } + /** + * Expensive (iterates through all tunnels of all pools) and unnecessary. + * @deprecated unused + */ public TunnelInfo getTunnelInfo(TunnelId id) { TunnelInfo info = null; for (TunnelPool pool : _clientInboundPools.values()) {