Merge branch 'ssu-low-latency-2' into 'master'

SSU low-latency changes pt1

See merge request i2p-hackers/i2p.i2p!37
This commit is contained in:
Zlatin Balevsky
2021-09-17 16:36:42 +00:00
2 changed files with 30 additions and 14 deletions

View File

@@ -78,7 +78,7 @@ class ACKSender implements Runnable {
// the unsentACKThreshold to figure out when to send an ACK instead of
// using the timer, so we can set the timeout/frequency higher
if (timeSinceACK < 2*1000)
return Math.max(rtt/2, ACK_FREQUENCY);
return Math.min(rtt/2, ACK_FREQUENCY);
else
return ACK_FREQUENCY;
}

View File

@@ -121,6 +121,7 @@ public class PeerState {
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private int _sendWindowBytesRemaining;
private final Object _sendWindowBytesRemainingLock = new Object();
private final BandwidthEstimator _bwEstimator;
// smoothed value, for display only
private int _receiveBps;
@@ -452,7 +453,7 @@ public class PeerState {
/** how many bytes can we send to the peer in the current second */
public int getSendWindowBytesRemaining() {
synchronized(_outboundMessages) {
synchronized(_sendWindowBytesRemainingLock) {
return _sendWindowBytesRemaining;
}
}
@@ -606,10 +607,11 @@ public class PeerState {
_context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections);
return false;
}
if (_sendWindowBytesRemaining <= fragmentOverhead())
final int sendRemaining = getSendWindowBytesRemaining();
if (sendRemaining <= fragmentOverhead())
return false;
int size = state.getSendSize(_sendWindowBytesRemaining);
int size = state.getSendSize(sendRemaining);
if (size > 0) {
if (messagePushCount == 0) {
_context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed);
@@ -617,7 +619,9 @@ public class PeerState {
_context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size());
_consecutiveRejections = 0;
}
_sendWindowBytesRemaining -= size;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining -= size;
}
_lastSendTime = now;
return true;
} else {
@@ -1053,14 +1057,18 @@ public class PeerState {
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
}
} else {
float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1));
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
}
}
}
} else {
@@ -1074,9 +1082,11 @@ public class PeerState {
_lastReceiveTime = _context.clock().now();
_lastSendFullyTime = _lastReceiveTime;
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
}
if (numSends < 2) {
// caller synchs
@@ -1458,7 +1468,7 @@ public class PeerState {
}
if (failedSize > 0) {
// restore the window
synchronized(this) {
synchronized(_sendWindowBytesRemainingLock) {
// this isn't exactly right, because some fragments may not have been sent at all,
// but that should be unlikely
_sendWindowBytesRemaining += failedSize;
@@ -1935,7 +1945,9 @@ public class PeerState {
if (continueFast) {
// RFC 5681 sec. 3.2 #4 increase cwnd
_sendWindowBytes += _mtu;
_sendWindowBytesRemaining += _mtu;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += _mtu;
}
if (_log.shouldDebug())
_log.debug("Continue FAST RTX, inflated window: " + this);
} else if (startFast) {
@@ -1945,7 +1957,9 @@ public class PeerState {
_slowStartThreshold = Math.max((int)(bwe * _rtt), 2 * _mtu);
// RFC 5681 sec. 3.2 #3 set cwnd
_sendWindowBytes = _slowStartThreshold + (3 * _mtu);
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining = _sendWindowBytes;
}
if (_log.shouldDebug())
_log.debug("Start of FAST RTX, inflated window: " + this);
}
@@ -1973,7 +1987,9 @@ public class PeerState {
synchronized(this) {
// RFC 5681 sec. 2.4 #6 deflate the window
_sendWindowBytes = _slowStartThreshold;
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining = _sendWindowBytes;
}
}
if (_log.shouldDebug())
_log.debug("End of FAST RTX, deflated window: " + this);