From 597662d0dcf2486ea37b57fa7de885e881f35833 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 5 Mar 2014 16:32:04 +0000 Subject: [PATCH] * Transports: Don't send a duplicate store of our RI at start of a connection (ticket #1187) * NTCP: - Lower send priority of the RI at exchange - Bob will now send his RI even if he doesn't have Alice's - Send RI again sooner on long-lived connections --- history.txt | 8 +++ .../src/net/i2p/router/OutNetMessage.java | 2 +- .../src/net/i2p/router/RouterVersion.java | 2 +- .../i2p/router/transport/TransportImpl.java | 7 ++- .../router/transport/ntcp/NTCPConnection.java | 59 ++++++++++++------- .../router/transport/ntcp/NTCPTransport.java | 26 +++++++- .../transport/udp/EstablishmentManager.java | 6 +- .../transport/udp/OutboundEstablishState.java | 24 +++++++- 8 files changed, 103 insertions(+), 31 deletions(-) diff --git a/history.txt b/history.txt index d289b7afc..03ca9b73f 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,11 @@ +2014-03-05 zzz + * Transports: Don't send a duplicate store of our RI at + start of a connection (ticket #1187) + * NTCP: + - Lower send priority of the RI at exchange + - Bob will now send his RI even if he doesn't have Alice's + - Send RI again sooner on long-lived connections + 2014-03-05 str4d * Console: Updated website URLs in readme.html files * susimail: Removed remaining Jetty dependencies in susimail diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index f1aa5519e..6c1635a4d 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -69,13 +69,13 @@ public class OutNetMessage implements CDPQEntry { public static final int PRIORITY_EXPLORATORY = 455; /** may be adjusted +/- 25 for outbound traffic */ public static final int PRIORITY_MY_DATA = 425; - public static final int PRIORITY_MY_NETDB_STORE_LOW = 300; public static final int PRIORITY_HIS_BUILD_REQUEST = 300; public static final int PRIORITY_BUILD_REPLY = 300; public static final int PRIORITY_NETDB_REPLY = 300; public static final int PRIORITY_HIS_NETDB_STORE = 200; public static final int PRIORITY_NETDB_FLOOD = 200; public static final int PRIORITY_PARTICIPATING = 200; + public static final int PRIORITY_MY_NETDB_STORE_LOW = 150; public static final int PRIORITY_NETDB_EXPLORE = 100; public static final int PRIORITY_NETDB_HARVEST = 100; public static final int PRIORITY_LOWEST = 100; diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e36cd3b14..7f4edb975 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 12; + public final static long BUILD = 13; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 068f65cb0..3c55dc4e5 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -233,6 +233,11 @@ public abstract class TransportImpl implements Transport { * @param allowRequeue true if we should try other transports if available */ protected void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) { + if (msg.getTarget() == null) { + // Probably injected by the transport. + // Bail out now as it will NPE in a dozen places below. + return; + } boolean log = false; if (sendSuccessful) msg.timestamp("afterSend(successful)"); @@ -244,7 +249,7 @@ public abstract class TransportImpl implements Transport { if (msToSend > 1500) { if (_log.shouldLog(Log.INFO)) - _log.warn(getStyle() + " afterSend slow: " + (sendSuccessful ? "success " : "FAIL ") + _log.info(getStyle() + " afterSend slow: " + (sendSuccessful ? "success " : "FAIL ") + msg.getMessageSize() + " byte " + msg.getMessageType() + ' ' + msg.getMessageId() + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend + " ms"); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 1aab41d4a..0ab8a2e04 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -36,6 +36,7 @@ import net.i2p.util.ConcurrentHashSet; import net.i2p.util.HexDump; import net.i2p.util.Log; import net.i2p.util.SystemVersion; +import net.i2p.util.VersionComparator; /** * Coordinate the connection to a single peer. @@ -143,7 +144,8 @@ class NTCPConnection { private static final int META_FREQUENCY = 45*60*1000; /** how often we send our routerinfo unsolicited */ - private static final int INFO_FREQUENCY = 90*60*1000; + private static final int INFO_FREQUENCY = 50*60*1000; + /** * Why this is 16K, and where it is documented, good question? * We claim we can do 32K datagrams so this is a problem. @@ -156,7 +158,8 @@ class NTCPConnection { /** 2 bytes for length and 4 for CRC */ public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4); - private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW; + private static final int INFO_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW; + private static final String FIXED_RI_VERSION = "0.9.12"; /** * Create an inbound connected (though not established) NTCP connection @@ -271,6 +274,7 @@ class NTCPConnection { _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", toClose.getUptime()); toClose.close(); } + enqueueInfoMessage(); } private synchronized NTCPConnection locked_finishInboundEstablishment( @@ -301,7 +305,9 @@ class NTCPConnection { } public long getMessagesSent() { return _messagesWritten; } + public long getMessagesReceived() { return _messagesRead; } + public long getOutboundQueueSize() { int queued; synchronized(_outbound) { @@ -487,25 +493,36 @@ class NTCPConnection { } } + /** + * Inject a DatabaseStoreMessage with our RouterInfo + */ public void enqueueInfoMessage() { - RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash()); - if (target != null) { - DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); - dsm.setEntry(_context.router().getRouterInfo()); - OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, PRIORITY, target); - infoMsg.beginSend(); - _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1); - send(infoMsg); - - // See comment below - //enqueueFloodfillMessage(target); - } else { - if (_isInbound) { - // ok, we shouldn't have enqueued it yet, as we havent received their info - } else { - // how did we make an outbound connection to someone we don't know about? - } - } + int priority = INFO_PRIORITY; + //if (!_isInbound) { + // Workaround for bug at Bob's end. + // This probably isn't helpful because Bob puts the store on the job queue. + // Prior to 0.9.12, Bob would only send his RI if he had our RI after + // the first received message, so make sure it is first in our queue. + // As of 0.9.12 this is fixed and Bob will always send his RI. + // RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash()); + // if (target != null) { + // String v = target.getOption("router.version"); + // if (v == null || VersionComparator.comp(v, FIXED_RI_VERSION) < 0) { + // priority = OutNetMessage.PRIORITY_HIGHEST; + // } + // } else { + // priority = OutNetMessage.PRIORITY_HIGHEST; + // } + //} + if (_log.shouldLog(Log.INFO)) + _log.info("SENDING INFO message pri. " + priority + ": " + toString()); + DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); + dsm.setEntry(_context.router().getRouterInfo()); + // We are injecting directly, so we can use a null target. + OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, priority, null); + infoMsg.beginSend(); + //_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1); + send(infoMsg); } //private static final int PEERS_TO_FLOOD = 3; @@ -1474,8 +1491,6 @@ class NTCPConnection { if (read != null) { _transport.messageReceived(read, _remotePeer, null, timeToRecv, _size); - if (_messagesRead <= 0) - enqueueInfoMessage(); _lastReceiveTime = System.currentTimeMillis(); _messagesRead++; } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index d3bc7d30f..77b133201 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -27,6 +27,8 @@ import net.i2p.data.Hash; import net.i2p.data.RouterAddress; import net.i2p.data.RouterIdentity; import net.i2p.data.RouterInfo; +import net.i2p.data.i2np.DatabaseStoreMessage; +import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.CommSystemFacade; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; @@ -142,8 +144,8 @@ public class NTCPTransport extends TransportImpl { //_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", RATES); - _context.statManager().createRateStat("ntcp.floodInfoMessageEnqueued", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", RATES); + //_context.statManager().createRateStat("ntcp.floodInfoMessageEnqueued", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidDH", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidHXY", "", "ntcp", RATES); _context.statManager().createRateStat("ntcp.invalidHXxorBIH", "", "ntcp", RATES); @@ -241,8 +243,26 @@ public class NTCPTransport extends TransportImpl { return; } if (isNew) { - con.enqueueInfoMessage(); // enqueues a netDb store of our own info con.send(msg); // doesn't do anything yet, just enqueues it + // As of 0.9.12, don't send our info if the first message is + // doing the same (common when connecting to a floodfill). + // Also, put the info message after whatever we are trying to send + // (it's a priority queue anyway and the info is low priority) + // Prior to 0.9.12, Bob would not send his RI unless he had ours, + // but that's fixed in 0.9.12. + boolean shouldSkipInfo = false; + I2NPMessage m = msg.getMessage(); + if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) { + DatabaseStoreMessage dsm = (DatabaseStoreMessage) m; + if (dsm.getKey().equals(_context.routerHash())) { + shouldSkipInfo = true; + } + } + if (!shouldSkipInfo) { + con.enqueueInfoMessage(); + } else if (_log.shouldLog(Log.INFO)) { + _log.info("SKIPPING INFO message: " + con); + } try { SocketChannel channel = SocketChannel.open(); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 47d25f971..b471a3c97 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -792,7 +792,11 @@ class EstablishmentManager { _transport.setIP(remote.calculateHash(), state.getSentIP()); _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); - sendOurInfo(peer, false); + if (!state.isFirstMessageOurDSM()) { + sendOurInfo(peer, false); + } else if (_log.shouldLog(Log.INFO)) { + _log.info("Skipping publish: " + state); + } OutNetMessage msg; while ((msg = state.getNextQueuedMessage()) != null) { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index d81674bb0..8f3aa8d8a 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -9,6 +9,8 @@ import net.i2p.data.DataHelper; import net.i2p.data.RouterIdentity; import net.i2p.data.SessionKey; import net.i2p.data.Signature; +import net.i2p.data.i2np.DatabaseStoreMessage; +import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.transport.crypto.DHSessionKeyBuilder; @@ -56,6 +58,7 @@ class OutboundEstablishState { private final Queue _queuedMessages; private OutboundState _currentState; private long _introductionNonce; + private boolean _isFirstMessageOurDSM; // intro private final UDPAddress _remoteAddress; private boolean _complete; @@ -151,12 +154,29 @@ class OutboundEstablishState { * Queue a message to be sent after the session is established. */ public void addMessage(OutNetMessage msg) { + if (_queuedMessages.isEmpty()) { + I2NPMessage m = msg.getMessage(); + if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) { + DatabaseStoreMessage dsm = (DatabaseStoreMessage) m; + if (dsm.getKey().equals(_context.routerHash())) { + _isFirstMessageOurDSM = true; + } + } + } // chance of a duplicate here in a race, that's ok if (!_queuedMessages.contains(msg)) _queuedMessages.offer(msg); else if (_log.shouldLog(Log.WARN)) _log.warn("attempt to add duplicate msg to queue: " + msg); } + + /** + * Is the first message queued our own DatabaseStoreMessage? + * @since 0.9.12 + */ + public boolean isFirstMessageOurDSM() { + return _isFirstMessageOurDSM; + } /** @return null if none */ public OutNetMessage getNextQueuedMessage() { @@ -260,8 +280,8 @@ class OutboundEstablishState { return false; } if (_receivedSignature != null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Session created already validated"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Session created already validated"); return true; }