Transports: Periodically send RI to connected peers (Gitlab ticket #356)

This commit is contained in:
zzz
2022-06-03 08:15:01 -04:00
parent 4dd5e7737b
commit 266dc09384
6 changed files with 50 additions and 8 deletions

View File

@@ -1,3 +1,11 @@
2022-06-03 zzz
* SSU: Possible fix for rare HMAC NPE
* Transports: Periodically send RI to connected peers
(Gitlab ticket #356)
2022-06-02 zzz
* SSU2: Relay partial implementation
2022-06-01 zzz 2022-06-01 zzz
* SSU2: Enable peer test * SSU2: Enable peer test

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Git"; public final static String ID = "Git";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2; public final static long BUILD = 3;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@@ -93,6 +93,7 @@ class EventPumper implements Runnable {
private static final long MIN_EXPIRE_IDLE_TIME = 120*1000l; private static final long MIN_EXPIRE_IDLE_TIME = 120*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 11*60*1000l; private static final long MAX_EXPIRE_IDLE_TIME = 11*60*1000l;
private static final long MAY_DISCON_TIMEOUT = 10*1000; private static final long MAY_DISCON_TIMEOUT = 10*1000;
private static final long RI_STORE_INTERVAL = 29*60*1000;
/** /**
* Do we use direct buffers for reading? Default false. * Do we use direct buffers for reading? Default false.
@@ -312,6 +313,18 @@ class EventPumper implements Runnable {
if (_log.shouldInfo()) if (_log.shouldInfo())
_log.info("Failsafe or expire close for " + con); _log.info("Failsafe or expire close for " + con);
failsafeCloses++; failsafeCloses++;
} else {
// periodically send our RI
long estab = con.getEstablishedOn();
if (estab > 0) {
long uptime = now - estab;
if (uptime >= RI_STORE_INTERVAL) {
long mod = uptime % RI_STORE_INTERVAL;
if (mod < FAILSAFE_ITERATION_FREQ) {
con.sendOurRouterInfo(false);
}
}
}
} }
} catch (CancelledKeyException cke) { } catch (CancelledKeyException cke) {
// cancelled while updating the interest ops. ah well // cancelled while updating the interest ops. ah well

View File

@@ -345,6 +345,15 @@ public class NTCPConnection implements Closeable {
return _context.clock().now() -_establishedOn; return _context.clock().now() -_establishedOn;
} }
/**
* @since 0.9.55
*/
public long getEstablishedOn() {
if (!isEstablished())
return 0;
return _establishedOn;
}
public int getMessagesSent() { return _messagesWritten.get(); } public int getMessagesSent() { return _messagesWritten.get(); }
public int getMessagesReceived() { return _messagesRead.get(); } public int getMessagesReceived() { return _messagesRead.get(); }
@@ -806,9 +815,9 @@ public class NTCPConnection implements Closeable {
/** /**
* NTCP2 only * NTCP2 only
* *
* @since 0.9.36 * @since 0.9.36, pkg private since 0.9.55 for EventPumper
*/ */
private void sendOurRouterInfo(boolean shouldFlood) { void sendOurRouterInfo(boolean shouldFlood) {
RouterInfo ri = _context.router().getRouterInfo(); RouterInfo ri = _context.router().getRouterInfo();
if (ri == null) if (ri == null)
return; return;
@@ -1386,7 +1395,7 @@ public class NTCPConnection implements Closeable {
_context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", toClose.getUptime()); _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", toClose.getUptime());
toClose.close(); toClose.close();
} }
enqueueInfoMessage(); //enqueueInfoMessage();
} }
/** /**

View File

@@ -1141,9 +1141,9 @@ class EstablishmentManager {
/** /**
* A database store message with our router info * A database store message with our router info
* @return non-null * @return non-null
* @since 0.9.24 split from sendOurInfo() * @since 0.9.24 split from sendOurInfo(), public since 0.9.55 for UDPTransport
*/ */
private DatabaseStoreMessage getOurInfo() { public DatabaseStoreMessage getOurInfo() {
DatabaseStoreMessage m = new DatabaseStoreMessage(_context); DatabaseStoreMessage m = new DatabaseStoreMessage(_context);
m.setEntry(_context.router().getRouterInfo()); m.setEntry(_context.router().getRouterInfo());
m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);

View File

@@ -3504,6 +3504,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final long EXPIRE_INCREMENT = 15*1000; private static final long EXPIRE_INCREMENT = 15*1000;
private static final long EXPIRE_DECREMENT = 45*1000; private static final long EXPIRE_DECREMENT = 45*1000;
private static final long MAY_DISCON_TIMEOUT = 10*1000; private static final long MAY_DISCON_TIMEOUT = 10*1000;
private static final long RI_STORE_INTERVAL = 29*60*1000;
public ExpirePeerEvent() { public ExpirePeerEvent() {
super(_context.simpleTimer2()); super(_context.simpleTimer2());
@@ -3540,6 +3541,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
int currentListenPort = getListenPort(false); int currentListenPort = getListenPort(false);
boolean pingOneOnly = shouldPingFirewall && getExternalPort(false) == currentListenPort; boolean pingOneOnly = shouldPingFirewall && getExternalPort(false) == currentListenPort;
boolean shortLoop = shouldPingFirewall || !haveCap || _context.netDb().floodfillEnabled(); boolean shortLoop = shouldPingFirewall || !haveCap || _context.netDb().floodfillEnabled();
long loopTime = shortLoop ? SHORT_LOOP_TIME : LONG_LOOP_TIME;
_lastLoopShort = shortLoop; _lastLoopShort = shortLoop;
_expireBuffer.clear(); _expireBuffer.clear();
_runCount++; _runCount++;
@@ -3585,7 +3587,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// session, so ping all of them. Otherwise only one. // session, so ping all of them. Otherwise only one.
if (pingOneOnly) if (pingOneOnly)
shouldPingFirewall = false; shouldPingFirewall = false;
} } else {
// periodically send our RI
long uptime = now - peer.getKeyEstablishedTime();
if (uptime >= RI_STORE_INTERVAL) {
long mod = uptime % RI_STORE_INTERVAL;
if (mod < loopTime) {
DatabaseStoreMessage dsm = _establisher.getOurInfo();
send(dsm, peer);
}
}
}
} }
if (!_expireBuffer.isEmpty()) { if (!_expireBuffer.isEmpty()) {
@@ -3603,7 +3615,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
} }
if (_alive) if (_alive)
schedule(shortLoop ? SHORT_LOOP_TIME : LONG_LOOP_TIME); schedule(loopTime);
} }
public void add(PeerState peer) { public void add(PeerState peer) {