forked from I2P_Developers/i2p.i2p
Compare commits
2 Commits
configadva
...
test-red-2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
0348905f34 | ||
![]() |
aa4a0419ba |
@@ -7,7 +7,7 @@ import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.util.PQEntry;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
@@ -33,7 +33,7 @@ import net.i2p.util.Log;
|
||||
*/
|
||||
public class FIFOBandwidthLimiter {
|
||||
private final Log _log;
|
||||
private final I2PAppContext _context;
|
||||
private final RouterContext _context;
|
||||
private final List<SimpleRequest> _pendingInboundRequests;
|
||||
private final List<SimpleRequest> _pendingOutboundRequests;
|
||||
/** how many bytes we can consume for inbound transmission immediately */
|
||||
@@ -84,7 +84,7 @@ public class FIFOBandwidthLimiter {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public FIFOBandwidthLimiter(I2PAppContext context) {
|
||||
public FIFOBandwidthLimiter(RouterContext context) {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
|
||||
_context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
|
||||
@@ -192,8 +192,8 @@ public class FIFOBandwidthLimiter {
|
||||
* @param size bytes
|
||||
* @since 0.8.12
|
||||
*/
|
||||
public void sentParticipatingMessage(int size) {
|
||||
_refiller.incrementParticipatingMessageBytes(size);
|
||||
public boolean sentParticipatingMessage(int size, float factor) {
|
||||
return _refiller.incrementParticipatingMessageBytes(size, factor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -5,8 +5,8 @@ import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
@@ -22,8 +22,11 @@ import net.i2p.util.Log;
|
||||
*/
|
||||
public class FIFOBandwidthRefiller implements Runnable {
|
||||
private final Log _log;
|
||||
private final I2PAppContext _context;
|
||||
private final RouterContext _context;
|
||||
private final FIFOBandwidthLimiter _limiter;
|
||||
// This is only changed if the config changes
|
||||
private volatile SyntheticREDQueue _partBWE;
|
||||
|
||||
/** how many KBps do we want to allow? */
|
||||
private int _inboundKBytesPerSecond;
|
||||
/** how many KBps do we want to allow? */
|
||||
@@ -76,6 +79,9 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
* See util/DecayingBloomFilter and tunnel/BloomFilterIVValidator.
|
||||
*/
|
||||
public static final int MAX_OUTBOUND_BANDWIDTH = 16384;
|
||||
|
||||
private static final float MAX_SHARE_PERCENTAGE = 0.90f;
|
||||
private static final float SHARE_LIMIT_FACTOR = 0.95f;
|
||||
|
||||
/**
|
||||
* how often we replenish the queues.
|
||||
@@ -83,10 +89,11 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
*/
|
||||
private static final long REPLENISH_FREQUENCY = 40;
|
||||
|
||||
FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
|
||||
FIFOBandwidthRefiller(RouterContext context, FIFOBandwidthLimiter limiter) {
|
||||
_limiter = limiter;
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
|
||||
_context.statManager().createRateStat("bwLimiter.participatingBandwidthQueue", "size in bytes", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
|
||||
reinitialize();
|
||||
_isRunning = true;
|
||||
}
|
||||
@@ -100,6 +107,7 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
// bootstrap 'em with nothing
|
||||
_lastRefillTime = _limiter.now();
|
||||
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList<Request>(2);
|
||||
byte i = 0;
|
||||
while (_isRunning) {
|
||||
long now = _limiter.now();
|
||||
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
|
||||
@@ -107,8 +115,10 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
now = _limiter.now();
|
||||
_lastCheckConfigTime = now;
|
||||
}
|
||||
// just for the stats
|
||||
if ((++i) == 0)
|
||||
updateParticipating(now);
|
||||
|
||||
updateParticipating(now);
|
||||
boolean updated = updateQueues(buffer, now);
|
||||
if (updated) {
|
||||
_lastRefillTime = now;
|
||||
@@ -171,6 +181,16 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In Bytes per second
|
||||
*/
|
||||
private int getShareBandwidth() {
|
||||
int maxKBps = Math.min(_inboundKBytesPerSecond, _outboundKBytesPerSecond);
|
||||
// limit to 90% so it doesn't clog up at the transport bandwidth limiter
|
||||
float share = Math.min((float) _context.router().getSharePercentage(), MAX_SHARE_PERCENTAGE);
|
||||
return (int) (maxKBps * share * 1024f * SHARE_LIMIT_FACTOR);
|
||||
}
|
||||
|
||||
private void checkConfig() {
|
||||
updateInboundRate();
|
||||
@@ -179,7 +199,13 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
updateOutboundBurstRate();
|
||||
updateInboundPeak();
|
||||
updateOutboundPeak();
|
||||
|
||||
|
||||
// if share bandwidth config changed, throw out the SyntheticREDQueue and make a new one
|
||||
int maxBps = getShareBandwidth();
|
||||
if (_partBWE == null || maxBps != _partBWE.getMaxBandwidth()) {
|
||||
_partBWE = new SyntheticREDQueue(_context, maxBps);
|
||||
}
|
||||
|
||||
// We are always limited for now
|
||||
//_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0);
|
||||
//_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0);
|
||||
@@ -299,34 +325,14 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; }
|
||||
int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; }
|
||||
|
||||
/**
|
||||
* Participating counter stuff below here
|
||||
* TOTAL_TIME needs to be high enough to get a burst without dropping
|
||||
* @since 0.8.12
|
||||
*/
|
||||
private static final int TOTAL_TIME = 4000;
|
||||
private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY;
|
||||
/** count in current replenish period */
|
||||
private final AtomicInteger _currentParticipating = new AtomicInteger();
|
||||
private long _lastPartUpdateTime;
|
||||
private int _lastTotal;
|
||||
/** the actual length of last total period as coalesced (nominally TOTAL_TIME) */
|
||||
private long _lastTotalTime;
|
||||
private int _lastIndex;
|
||||
/** buffer of count per replenish period, last is at _lastIndex, older at higher indexes (wraps) */
|
||||
private final int[] _counts = new int[PERIODS];
|
||||
/** the actual length of the period (nominally REPLENISH_FREQUENCY) */
|
||||
private final long[] _times = new long[PERIODS];
|
||||
private final ReentrantReadWriteLock _updateLock = new ReentrantReadWriteLock(false);
|
||||
|
||||
/**
|
||||
* We sent a message.
|
||||
*
|
||||
* @param size bytes
|
||||
* @since 0.8.12
|
||||
*/
|
||||
void incrementParticipatingMessageBytes(int size) {
|
||||
_currentParticipating.addAndGet(size);
|
||||
boolean incrementParticipatingMessageBytes(int size, float factor) {
|
||||
return _partBWE.offer(size, factor);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -336,24 +342,7 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
* @since 0.8.12
|
||||
*/
|
||||
int getCurrentParticipatingBandwidth() {
|
||||
_updateLock.readLock().lock();
|
||||
try {
|
||||
return locked_getCurrentParticipatingBandwidth();
|
||||
} finally {
|
||||
_updateLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private int locked_getCurrentParticipatingBandwidth() {
|
||||
int current = _currentParticipating.get();
|
||||
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
|
||||
if (totalTime <= 0)
|
||||
return 0;
|
||||
// 1000 for ms->seconds in denominator
|
||||
long bw = 1000l * (current + _lastTotal) / totalTime;
|
||||
if (bw > Integer.MAX_VALUE)
|
||||
return 0;
|
||||
return (int) bw;
|
||||
return (int) (_partBWE.getBandwidthEstimate() * 1000f);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -362,42 +351,7 @@ public class FIFOBandwidthRefiller implements Runnable {
|
||||
* @since 0.8.12
|
||||
*/
|
||||
private void updateParticipating(long now) {
|
||||
_updateLock.writeLock().lock();
|
||||
try {
|
||||
locked_updateParticipating(now);
|
||||
} finally {
|
||||
_updateLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void locked_updateParticipating(long now) {
|
||||
long elapsed = now - _lastPartUpdateTime;
|
||||
if (elapsed <= 0) {
|
||||
// glitch in the matrix
|
||||
_lastPartUpdateTime = now;
|
||||
return;
|
||||
}
|
||||
_lastPartUpdateTime = now;
|
||||
if (--_lastIndex < 0)
|
||||
_lastIndex = PERIODS - 1;
|
||||
_counts[_lastIndex] = _currentParticipating.getAndSet(0);
|
||||
_times[_lastIndex] = elapsed;
|
||||
_lastTotal = 0;
|
||||
_lastTotalTime = 0;
|
||||
// add up total counts and times
|
||||
for (int i = 0; i < PERIODS; i++) {
|
||||
int idx = (_lastIndex + i) % PERIODS;
|
||||
_lastTotal += _counts[idx];
|
||||
_lastTotalTime += _times[idx];
|
||||
if (_lastTotalTime >= TOTAL_TIME)
|
||||
break;
|
||||
}
|
||||
if (_lastIndex == 0 && _lastTotalTime > 0) {
|
||||
long bw = 1000l * _lastTotal / _lastTotalTime;
|
||||
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bw);
|
||||
if (_lastTotal > 0 && _log.shouldLog(Log.INFO))
|
||||
_log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " +
|
||||
DataHelper.formatSize(bw) + " Bps");
|
||||
}
|
||||
_context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth());
|
||||
_context.statManager().addRateData("bwLimiter.participatingBandwidthQueue", (long) _partBWE.getQueueSizeEstimate());
|
||||
}
|
||||
}
|
||||
|
375
router/java/src/net/i2p/router/transport/SyntheticREDQueue.java
Normal file
375
router/java/src/net/i2p/router/transport/SyntheticREDQueue.java
Normal file
@@ -0,0 +1,375 @@
|
||||
package net.i2p.router.transport;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.util.BandwidthEstimator;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* A "synthetic" queue in that it doesn't actually queue anything.
|
||||
* Actual queueing is assumed to be "dowstream" of this.
|
||||
*
|
||||
* Maintains an average estimated "queue size" assuming a constant output rate
|
||||
* declared in the constructor. The queue size is measured in bytes.
|
||||
*
|
||||
* With offer(), will return true for "accepted" or false for "dropped",
|
||||
* based on the RED algorithm which uses the current average queue size
|
||||
* and the offered data size to calculate a drop probability.
|
||||
* Bandwidth is not directly used in the RED algorithm, except to
|
||||
* synthetically calculate an average queue size assuming the
|
||||
* queue is being drained precisely at that rate, byte-by-byte
|
||||
* (not per-packet).
|
||||
*
|
||||
* addSample() unconditionally accepts the packet.
|
||||
*
|
||||
* Also maintains a Westwood+ bandwidth estimator.
|
||||
* The bandwidth and queue size estimates are only updated if the
|
||||
* packet is "accepted".
|
||||
*
|
||||
* The average queue size is calculated in the same manner as the
|
||||
* bandwidth, with an update every WESTWOOD_RTT_MIN ms.
|
||||
* Both estimators use
|
||||
* a first stage anti-aliasing low pass filter based on RTT,
|
||||
* and the time-varying Westwood filter based on inter-arrival time.
|
||||
*
|
||||
* Ref: Random Early Detection Gateways for Congestion Avoidance
|
||||
* Sally Floyd and Van Jacobson
|
||||
*
|
||||
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
|
||||
* Casetti et al
|
||||
* (Westwood)
|
||||
*
|
||||
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
|
||||
* Grieco and Mascolo
|
||||
* (Westwood+)
|
||||
*
|
||||
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
|
||||
*
|
||||
* @since 0.9.49 adapted from streaming
|
||||
*/
|
||||
class SyntheticREDQueue implements BandwidthEstimator {
|
||||
|
||||
private final I2PAppContext _context;
|
||||
private final Log _log;
|
||||
|
||||
private long _tAck;
|
||||
// bw_est, bw_ns_est
|
||||
private float _bKFiltered, _bK_ns_est;
|
||||
// bk
|
||||
private int _acked;
|
||||
|
||||
// RED vars
|
||||
// pkts since last dropped pkt
|
||||
private int _count = -1;
|
||||
// smoothed average queue size in bytes
|
||||
private float _avgQSize;
|
||||
// last sample queue size in bytes
|
||||
private float _qSize;
|
||||
// current interval newly queued in bytes, since the last updateQSize()
|
||||
private int _newDataSize;
|
||||
// last time _avgQSize calculated
|
||||
private long _tQSize;
|
||||
// min queue size threshold, in bytes, to start dropping
|
||||
private final int _minth;
|
||||
// max queue size, in bytes, before dropping everything
|
||||
private final int _maxth;
|
||||
// bandwidth in bytes per second, as passed to the constructor.
|
||||
private final int _bwBps;
|
||||
// bandwidth in bytes per ms. The queue is drained at this rate.
|
||||
private final float _bwBpms;
|
||||
// As in RED paper
|
||||
private static final float MAXP = 0.02f;
|
||||
|
||||
// As in kernel tcp_westwood.c
|
||||
// Should probably match ConnectionOptions.TCP_ALPHA
|
||||
private static final int DECAY_FACTOR = 8;
|
||||
private static final int WESTWOOD_RTT_MIN = 500;
|
||||
// denominator of time, 1/x seconds of traffic in the queue
|
||||
private static final int DEFAULT_LOW_THRESHOLD = 13;
|
||||
// denominator of time, 1/x seconds of traffic in the queue
|
||||
private static final int DEFAULT_HIGH_THRESHOLD = 3;
|
||||
|
||||
/**
|
||||
* Default thresholds.
|
||||
* Min: 100 ms of traffic; max: 500 ms.
|
||||
*
|
||||
* @param bwKBps the output rate of the queue in Bps
|
||||
*/
|
||||
SyntheticREDQueue(I2PAppContext ctx, int bwBps) {
|
||||
// the goal is to drop here rather than let the traffic
|
||||
// get through to UDP-Sender CoDel and get dropped there,
|
||||
// when we're at the default 80% share or below.
|
||||
// That CoDel starts dropping when above 100 ms latency for 500 ms.
|
||||
// let's try the same 100 ms of traffic here.
|
||||
this(ctx, bwBps, bwBps / DEFAULT_LOW_THRESHOLD, bwBps / DEFAULT_HIGH_THRESHOLD);
|
||||
}
|
||||
|
||||
/**
|
||||
* Specified queue size thresholds.
|
||||
* offer() drops a 1024 byte packet at 2% probability just lower than maxThKB,
|
||||
* and at 100% probability higher than maxThKB.
|
||||
*
|
||||
* @param bwKBps the output rate of the queue in Bps
|
||||
* @param minThKB the minimum queue size to start dropping in Bytes
|
||||
* @param maxThKB the queue size to drop everything in Bytes
|
||||
*/
|
||||
SyntheticREDQueue(I2PAppContext ctx, int bwBps, int minThB, int maxThB) {
|
||||
_log = ctx.logManager().getLog(SyntheticREDQueue.class);
|
||||
_context = ctx;
|
||||
// assume we're about to send something
|
||||
_tAck = ctx.clock().now();
|
||||
_acked = -1;
|
||||
_minth = minThB;
|
||||
_maxth = maxThB;
|
||||
_bwBps = bwBps;
|
||||
_bwBpms = bwBps / 1000f;
|
||||
_tQSize = _tAck;
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Configured " + bwBps + " BPS, min: " + minThB + " B, max: " + maxThB + " B");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Nominal bandwidth limit in bytes per second, as passed to the constructor.
|
||||
*
|
||||
*/
|
||||
public int getMaxBandwidth() {
|
||||
return _bwBps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unconditional, never drop.
|
||||
* The queue size and bandwidth estimates will be updated.
|
||||
*/
|
||||
public void addSample(int size) {
|
||||
offer(size, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Should we drop this packet?
|
||||
* If accepted, the queue size and bandwidth estimates will be updated.
|
||||
*
|
||||
* @param size how many bytes to be offered
|
||||
* @param factor how to adjust the size for the drop probability calculation,
|
||||
* or 1.0 for standard probability. 0 to prevent dropping.
|
||||
* Does not affect bandwidth calculations.
|
||||
* @return true for accepted, false for drop
|
||||
*/
|
||||
public boolean offer(int size, float factor) {
|
||||
long now = _context.clock().now();
|
||||
return addSample(size, factor, now);
|
||||
}
|
||||
|
||||
private synchronized boolean addSample(int acked, float factor, long now) {
|
||||
if (_acked < 0) {
|
||||
// first sample
|
||||
// use time since constructed as the RTT
|
||||
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
|
||||
float bkdt = ((float) acked) / deltaT;
|
||||
_bKFiltered = bkdt;
|
||||
_bK_ns_est = bkdt;
|
||||
_acked = 0;
|
||||
_tAck = now;
|
||||
_tQSize = now;
|
||||
_newDataSize = acked;
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
|
||||
return true;
|
||||
} else {
|
||||
// update queue size average if necessary
|
||||
// the current sample is not included in the calculation
|
||||
long deltaT = now - _tQSize;
|
||||
if (deltaT > WESTWOOD_RTT_MIN)
|
||||
updateQSize(now, deltaT);
|
||||
if (factor > 0) {
|
||||
// drop calculation
|
||||
if (_avgQSize > _maxth) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("drop bytes (qsize): " + acked + ' ' + this);
|
||||
_count = 0;
|
||||
return false;
|
||||
}
|
||||
if (_avgQSize > _minth) {
|
||||
_count++;
|
||||
float pb = (acked / 1024f) * factor * MAXP * (_avgQSize - _minth) / (_maxth - _minth);
|
||||
float pa = pb / (1 - (_count * pb));
|
||||
float rand = _context.random().nextFloat();
|
||||
if (rand < pa) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("drop bytes (prob): " + acked + " factor " + factor + " prob: " + pa + " deltaT: " + deltaT + ' ' + this);
|
||||
_count = 0;
|
||||
return false;
|
||||
}
|
||||
_count = -1;
|
||||
}
|
||||
}
|
||||
// accepted
|
||||
_newDataSize += acked;
|
||||
_acked += acked;
|
||||
// update bandwidth estimate if necessary
|
||||
deltaT = now - _tAck;
|
||||
if (deltaT >= WESTWOOD_RTT_MIN)
|
||||
computeBWE(now, (int) deltaT);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("accept bytes: " + acked + " factor " + factor + ' ' + this);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current bandwidth estimate in bytes/ms.
|
||||
*/
|
||||
public float getBandwidthEstimate() {
|
||||
long now = _context.clock().now();
|
||||
// anti-aliasing filter
|
||||
// As in kernel tcp_westwood.c
|
||||
// and the Westwood+ paper
|
||||
synchronized(this) {
|
||||
long deltaT = now - _tAck;
|
||||
if (deltaT >= WESTWOOD_RTT_MIN)
|
||||
return computeBWE(now, (int) deltaT);
|
||||
return _bKFiltered;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current queue size estimate in bytes.
|
||||
*/
|
||||
public float getQueueSizeEstimate() {
|
||||
long now = _context.clock().now();
|
||||
// anti-aliasing filter
|
||||
// As in kernel tcp_westwood.c
|
||||
// and the Westwood+ paper
|
||||
synchronized(this) {
|
||||
long deltaT = now - _tQSize;
|
||||
if (deltaT >= WESTWOOD_RTT_MIN)
|
||||
updateQSize(now, deltaT);
|
||||
return _avgQSize;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized float computeBWE(final long now, final int rtt) {
|
||||
if (_acked < 0)
|
||||
return 0.0f; // nothing ever sampled
|
||||
updateBK(now, _acked, rtt);
|
||||
_acked = 0;
|
||||
return _bKFiltered;
|
||||
}
|
||||
|
||||
/**
|
||||
* Optimized version of updateBK with packets == 0
|
||||
*/
|
||||
private void decay() {
|
||||
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
|
||||
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
|
||||
}
|
||||
|
||||
private void decayQueue(int rtt) {
|
||||
_qSize -= rtt * _bwBpms;
|
||||
if (_qSize < 1)
|
||||
_qSize = 0;
|
||||
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Here we insert virtual null samples if necessary as in Westwood,
|
||||
* And use a very simple EWMA (exponential weighted moving average)
|
||||
* time-varying filter, as in kernel tcp_westwood.c
|
||||
*
|
||||
* @param time the time of the measurement
|
||||
* @param packets number of bytes acked
|
||||
* @param rtt current rtt
|
||||
*/
|
||||
private void updateBK(long time, int packets, int rtt) {
|
||||
long deltaT = time - _tAck;
|
||||
if (rtt < WESTWOOD_RTT_MIN)
|
||||
rtt = WESTWOOD_RTT_MIN;
|
||||
if (deltaT > 2 * rtt) {
|
||||
// Decay with virtual null samples as in the Westwood paper
|
||||
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
|
||||
for (int i = 0; i < numrtts; i++) {
|
||||
if (_bKFiltered <= 0)
|
||||
break;
|
||||
decay();
|
||||
}
|
||||
deltaT -= numrtts * rtt;
|
||||
//if (_log.shouldDebug())
|
||||
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
|
||||
}
|
||||
float bkdt;
|
||||
if (packets > 0) {
|
||||
// As in kernel tcp_westwood.c
|
||||
bkdt = ((float) packets) / deltaT;
|
||||
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
|
||||
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
|
||||
} else {
|
||||
bkdt = 0;
|
||||
decay();
|
||||
}
|
||||
_tAck = time;
|
||||
//if (_log.shouldDebug())
|
||||
// _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
|
||||
// " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Here we insert virtual null samples if necessary as in Westwood,
|
||||
* And use a very simple EWMA (exponential weighted moving average)
|
||||
* time-varying filter, as in kernel tcp_westwood.c
|
||||
*
|
||||
* @param time the time of the measurement
|
||||
* @param deltaT at least WESTWOOD_RTT_MIN
|
||||
*/
|
||||
private void updateQSize(long time, long deltaT) {
|
||||
long origDT = deltaT;
|
||||
if (deltaT > 2 * WESTWOOD_RTT_MIN) {
|
||||
// Decay with virtual null samples as in the Westwood paper
|
||||
int numrtts = Math.min((int) ((deltaT / WESTWOOD_RTT_MIN) - 1), 2 * DECAY_FACTOR);
|
||||
for (int i = 0; i < numrtts; i++) {
|
||||
if (_avgQSize <= 0)
|
||||
break;
|
||||
decayQueue(WESTWOOD_RTT_MIN);
|
||||
}
|
||||
deltaT -= numrtts * WESTWOOD_RTT_MIN;
|
||||
//if (_log.shouldDebug())
|
||||
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
|
||||
}
|
||||
int origNDS = _newDataSize;
|
||||
float newQSize = _newDataSize;
|
||||
if (_newDataSize > 0) {
|
||||
newQSize -= deltaT * _bwBpms;
|
||||
if (newQSize < 1)
|
||||
newQSize = 0;
|
||||
_qSize = westwood_do_filter(_qSize, newQSize);
|
||||
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
|
||||
_newDataSize = 0;
|
||||
} else {
|
||||
decayQueue((int) deltaT);
|
||||
}
|
||||
_tQSize = time;
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("computeQS deltaT: " + origDT +
|
||||
" newData: " + origNDS +
|
||||
" newQsize: " + newQSize + " qSize: " + _qSize + ' ' + this);
|
||||
}
|
||||
|
||||
/**
|
||||
* As in kernel tcp_westwood.c
|
||||
*/
|
||||
private static float westwood_do_filter(float a, float b) {
|
||||
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
return "SREDQ[" +
|
||||
//" _bKFiltered " + _bKFiltered +
|
||||
//" _tAck " + _tAck + "; " +
|
||||
//" _tQSize " + _tQSize +
|
||||
' ' + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
|
||||
"Bps, avg_qsize " +
|
||||
DataHelper.formatSize2((long) _avgQSize, false) +
|
||||
"B]";
|
||||
}
|
||||
}
|
@@ -52,7 +52,6 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
|
||||
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
|
||||
// return -1;
|
||||
//_config.incrementSentMessages();
|
||||
_context.bandwidthLimiter().sentParticipatingMessage(1024);
|
||||
TunnelDataMessage msg = new TunnelDataMessage(_context);
|
||||
msg.setData(encrypted);
|
||||
msg.setTunnelId(_config.getSendTunnel());
|
||||
|
@@ -79,8 +79,6 @@ class OutboundTunnelEndpoint {
|
||||
//int kb = (size + 1023) / 1024;
|
||||
//for (int i = 0; i < kb; i++)
|
||||
// _config.incrementSentMessages();
|
||||
if (!toUs)
|
||||
_context.bandwidthLimiter().sentParticipatingMessage(size);
|
||||
_outDistributor.distribute(msg, toRouter, toTunnel);
|
||||
}
|
||||
}
|
||||
|
@@ -34,7 +34,15 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
|
||||
// Hard to do this exactly, but we'll assume 2:1 batching
|
||||
// for the purpose of estimating outgoing size.
|
||||
// We assume that it's the outbound bandwidth that is the issue...
|
||||
int size = Math.max(msg.getMessageSize(), 1024/2);
|
||||
// first frag. overhead
|
||||
int size = msg.getMessageSize() + 60;
|
||||
if (size > 1024) {
|
||||
// additional frag. overhead
|
||||
size += 28 * (size / 1024);
|
||||
} else if (size < 512) {
|
||||
// 2:1 batching of small messages
|
||||
size = 512;
|
||||
}
|
||||
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
|
||||
// this overstates the stat somewhat, but ok for now
|
||||
int kb = (size + 1023) / 1024;
|
||||
|
@@ -767,84 +767,30 @@ public class TunnelDispatcher implements Service {
|
||||
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
|
||||
if (length <= 0)
|
||||
return false;
|
||||
/****
|
||||
Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
|
||||
Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
|
||||
during a burst.
|
||||
We now use the bandwidth limiter to track outbound participating bandwidth
|
||||
over the last few seconds.
|
||||
****/
|
||||
|
||||
/****
|
||||
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
|
||||
if (rs == null)
|
||||
return false;
|
||||
Rate r = rs.getRate(60*1000);
|
||||
if (r == null)
|
||||
return false;
|
||||
// weight current period higher
|
||||
long count = r.getLastEventCount() + (3 * r.getCurrentEventCount());
|
||||
int bw = 0;
|
||||
if (count > 0)
|
||||
bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count);
|
||||
else
|
||||
bw = (int) r.getLifetimeAverageValue();
|
||||
|
||||
int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
|
||||
if (bw < usedIn)
|
||||
usedIn = bw;
|
||||
if (usedIn <= 0)
|
||||
return false;
|
||||
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
|
||||
if (bw < usedOut)
|
||||
usedOut = bw;
|
||||
if (usedOut <= 0)
|
||||
return false;
|
||||
int used = Math.min(usedIn, usedOut);
|
||||
****/
|
||||
int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
|
||||
if (used <= 0)
|
||||
return false;
|
||||
|
||||
int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
|
||||
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
|
||||
float share = (float) _context.router().getSharePercentage();
|
||||
|
||||
// start dropping at 120% of the limit,
|
||||
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
|
||||
float maxBps = maxKBps * share * (1024f * 1.20f);
|
||||
float pctDrop = (used - maxBps) / used;
|
||||
if (pctDrop <= 0)
|
||||
return false;
|
||||
// increase the drop probability for OBEP,
|
||||
// (except lower it for tunnel build messages type 21/22/23/24),
|
||||
// and lower it for IBGW, for network efficiency
|
||||
double len = length;
|
||||
float factor;
|
||||
if (loc == Location.OBEP) {
|
||||
// we don't need to check for VTBRM/TBRM as that happens at tunnel creation
|
||||
if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
|
||||
len /= 1.5;
|
||||
factor = 1 / 1.5f;
|
||||
else
|
||||
len *= 1.5;
|
||||
factor = 1.5f;
|
||||
} else if (loc == Location.IBGW) {
|
||||
// we don't need to check for VTBM/TBM as that happens at tunnel creation
|
||||
if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
|
||||
len /= 1.5 * 1.5 * 1.5;
|
||||
factor = 1 / (1.5f * 1.5f * 1.5f);
|
||||
else
|
||||
len /= 1.5;
|
||||
factor = 1 / 1.5f;
|
||||
} else {
|
||||
factor = 1.0f;
|
||||
}
|
||||
// drop in proportion to size w.r.t. a standard 1024-byte message
|
||||
// this is a little expensive but we want to adjust the curve between 0 and 1
|
||||
// Most messages are 1024, only at the OBEP do we see other sizes
|
||||
if ((int)len != 1024)
|
||||
pctDrop = (float) Math.pow(pctDrop, 1024d / len);
|
||||
float rand = _context.random().nextFloat();
|
||||
boolean reject = rand <= pctDrop;
|
||||
boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor);
|
||||
if (reject) {
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
int availBps = (int) (((maxKBps*1024)*share) - used);
|
||||
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
|
||||
+ used + " %Drop = " + pctDrop
|
||||
_log.warn("Drop part. msg. factor=" + factor
|
||||
+ ' ' + loc + ' ' + type + ' ' + length);
|
||||
}
|
||||
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1);
|
||||
|
@@ -203,7 +203,6 @@ class TunnelParticipant {
|
||||
TunnelDataMessage.MESSAGE_TYPE, 1024))
|
||||
return;
|
||||
//_config.incrementSentMessages();
|
||||
_context.bandwidthLimiter().sentParticipatingMessage(1024);
|
||||
long oldId = msg.getUniqueId();
|
||||
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
|
||||
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);
|
||||
|
Reference in New Issue
Block a user