forked from I2P_Developers/i2p.i2p
Tunnels: Fix RED
This builds on and replaces MR !23, which fixed the bandwidth calculation used for RED by changing from a 40 ms bucket to an exponential moving average. Here we fix the other part of RED: the dropping calculation.

Previously, it simply used the bandwidth calculation and started dropping once the bandwidth exceeded a threshold. The drop percentage rose linearly from 0 to 100% based on how far the bandwidth was above the threshold. This was far from what the RED paper specifies. Now we follow the RED paper (see the reference in the SyntheticREDQueue javadoc) and calculate an average queue size, using the exact same exponential moving average method used for bandwidth. Similar to CoDel, it also keeps a count of how long the size has been over the threshold and increases the drop probability with that count. The unadjusted drop probability rises from 0 to 2%, after which everything is dropped, as in the RED paper. The low and high thresholds are configured at 100 ms and 500 ms of queued data, respectively.

The queue is "synthetic" in that there is no actual queue. It only calculates how big the queue would be if it were a real queue being emptied at exactly the target rate. All actual queueing is done downstream in the transports and in UDP-Sender.

The goal, at the default 80% share, is to do most of the participating-traffic dropping here in RED rather than downstream in UDP-Sender, while fully utilizing the configured share bandwidth. If the router goes into high message delay mode, that means we are not dropping enough in RED. Above 80% share this probably doesn't work as well.

Tested for a few days in the live net. More tuning will probably be required, in particular to achieve the live-net goal of "protecting" the UDP-Sender queue and local client/router traffic by dropping more aggressively in RED. WIP, not for merging; see !23 for additional comments.
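For reference, the drop decision implemented in SyntheticREDQueue (full file in the diff below) reduces to roughly the following sketch. Names are simplified here for illustration only: avgQSize is the EWMA queue-size estimate in bytes, minth/maxth are the 100 ms and 500 ms thresholds converted to bytes at the configured rate, count is the RED counter since the last drop, and MAXP is 0.02 as in the RED paper.

    // Illustrative sketch only, not part of the patch.
    boolean accept(int sizeBytes, float factor) {
        if (avgQSize > maxth) {              // above the high threshold: drop everything
            count = 0;
            return false;
        }
        if (avgQSize > minth) {              // between thresholds: probabilistic early drop
            count++;
            // base probability rises linearly from 0 to MAXP (2%),
            // scaled by message size and the per-location factor
            float pb = (sizeBytes / 1024f) * factor * MAXP
                       * (avgQSize - minth) / (maxth - minth);
            // RED: raise the probability the longer we go without a drop
            float pa = pb / (1 - count * pb);
            if (random.nextFloat() < pa) {
                count = 0;
                return false;                // drop
            }
            count = -1;
        }
        return true;                         // accept; the queue-size and bandwidth estimators are then updated
    }

For example, at a 512 KB/s share rate the default constructor (bwBps / 10 and bwBps / 2) gives thresholds of about 52 KB (100 ms of traffic) and 262 KB (500 ms).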
@@ -7,7 +7,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import net.i2p.I2PAppContext;
import net.i2p.router.RouterContext;
import net.i2p.router.util.PQEntry;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -33,7 +33,7 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthLimiter {
private final Log _log;
private final I2PAppContext _context;
private final RouterContext _context;
private final List<SimpleRequest> _pendingInboundRequests;
private final List<SimpleRequest> _pendingOutboundRequests;
/** how many bytes we can consume for inbound transmission immediately */
@@ -84,7 +84,7 @@ public class FIFOBandwidthLimiter {
return System.currentTimeMillis();
}

public FIFOBandwidthLimiter(I2PAppContext context) {
public FIFOBandwidthLimiter(RouterContext context) {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
_context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
@@ -192,8 +192,8 @@ public class FIFOBandwidthLimiter {
* @param size bytes
* @since 0.8.12
*/
public void sentParticipatingMessage(int size) {
_refiller.incrementParticipatingMessageBytes(size);
public boolean sentParticipatingMessage(int size, float factor) {
return _refiller.incrementParticipatingMessageBytes(size, factor);
}

/**
@@ -5,8 +5,8 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.util.Log;

@@ -22,8 +22,10 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthRefiller implements Runnable {
private final Log _log;
private final I2PAppContext _context;
private final RouterContext _context;
private final FIFOBandwidthLimiter _limiter;
private final SyntheticREDQueue _partBWE;

/** how many KBps do we want to allow? */
private int _inboundKBytesPerSecond;
/** how many KBps do we want to allow? */
@@ -83,11 +85,18 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
private static final long REPLENISH_FREQUENCY = 40;

FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
FIFOBandwidthRefiller(RouterContext context, FIFOBandwidthLimiter limiter) {
_limiter = limiter;
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
_context.statManager().createRateStat("bwLimiter.participatingBandwidthQueue", "size in bytes", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
reinitialize();
// todo config changes
int maxKBps = Math.min(_inboundKBytesPerSecond, _outboundKBytesPerSecond);
// limit to 90% so it doesn't clog up at the transport bandwidth limiter
float share = Math.min((float) _context.router().getSharePercentage(), 0.90f);
float maxBps = maxKBps * share * 1024f * 0.95f;
_partBWE = new SyntheticREDQueue(context, (int) maxBps);
_isRunning = true;
}

@@ -100,6 +109,7 @@ public class FIFOBandwidthRefiller implements Runnable {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList<Request>(2);
byte i = 0;
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
@@ -107,8 +117,10 @@ public class FIFOBandwidthRefiller implements Runnable {
now = _limiter.now();
_lastCheckConfigTime = now;
}
// just for the stats
if ((++i) == 0)
updateParticipating(now);

updateParticipating(now);
boolean updated = updateQueues(buffer, now);
if (updated) {
_lastRefillTime = now;
@@ -325,8 +337,9 @@ public class FIFOBandwidthRefiller implements Runnable {
* @param size bytes
* @since 0.8.12
*/
void incrementParticipatingMessageBytes(int size) {
_currentParticipating.addAndGet(size);
boolean incrementParticipatingMessageBytes(int size, float factor) {
//_currentParticipating.addAndGet(size);
return _partBWE.offer(size, factor);
}

/**
@@ -336,14 +349,18 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
int getCurrentParticipatingBandwidth() {
return (int) (_partBWE.getBandwidthEstimate() * 1000f);
/*
_updateLock.readLock().lock();
try {
return locked_getCurrentParticipatingBandwidth();
} finally {
_updateLock.readLock().unlock();
}
*/
}

/*
private int locked_getCurrentParticipatingBandwidth() {
int current = _currentParticipating.get();
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
@@ -355,6 +372,7 @@ public class FIFOBandwidthRefiller implements Runnable {
return 0;
return (int) bw;
}
*/

/**
* Run once every replenish period
@@ -362,14 +380,19 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
private void updateParticipating(long now) {
_context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth());
_context.statManager().addRateData("bwLimiter.participatingBandwidthQueue", (long) _partBWE.getQueueSizeEstimate());
/*
_updateLock.writeLock().lock();
try {
locked_updateParticipating(now);
} finally {
_updateLock.writeLock().unlock();
}
*/
}

/*
private void locked_updateParticipating(long now) {
long elapsed = now - _lastPartUpdateTime;
if (elapsed <= 0) {
@@ -400,4 +423,5 @@ public class FIFOBandwidthRefiller implements Runnable {
DataHelper.formatSize(bw) + " Bps");
}
}
*/
}
router/java/src/net/i2p/router/transport/SyntheticREDQueue.java (new file, 359 lines)
@@ -0,0 +1,359 @@
package net.i2p.router.transport;

import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;

/**
* A "synthetic" queue in that it doesn't actually queue anything.
* Actual queueing is assumed to be "downstream" of this.
*
* Maintains an average estimated "queue size" assuming a constant output rate
* declared in the constructor. The queue size is measured in bytes.
*
* With offer(), will return true for "accepted" or false for "dropped",
* based on the RED algorithm which uses the current average queue size
* and the offered data size to calculate a drop probability.
* Bandwidth is not directly used in the RED algorithm, except to
* synthetically calculate an average queue size assuming the
* queue is being drained precisely at that rate, byte-by-byte
* (not per-packet).
*
* addSample() unconditionally accepts the packet.
*
* Also maintains a Westwood+ bandwidth estimator.
* The bandwidth and queue size estimates are only updated if the
* packet is "accepted".
*
* The average queue size is calculated in the same manner as the
* bandwidth, with an update every WESTWOOD_RTT_MIN ms.
* Both estimators use
* a first stage anti-aliasing low pass filter based on RTT,
* and the time-varying Westwood filter based on inter-arrival time.
*
* Ref: Random Early Detection Gateways for Congestion Avoidance
* Sally Floyd and Van Jacobson
*
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
* Casetti et al
* (Westwood)
*
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
* Grieco and Mascolo
* (Westwood+)
*
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
*
* @since 0.9.49 adapted from streaming
*/
class SyntheticREDQueue implements BandwidthEstimator {

private final I2PAppContext _context;
private final Log _log;

private long _tAck;
// bw_est, bw_ns_est
private float _bKFiltered, _bK_ns_est;
// bk
private int _acked;

// RED vars
// pkts since last dropped pkt
private int _count = -1;
// smoothed average queue size in bytes
private float _avgQSize;
// last sample queue size in bytes
private float _qSize;
// current interval newly queued in bytes, since the last updateQSize()
private int _newDataSize;
// last time _avgQSize calculated
private long _tQSize;
// min queue size threshold, in bytes, to start dropping
private final int _minth;
// max queue size, in bytes, before dropping everything
private final int _maxth;
// bandwidth in bytes per ms. The queue is drained at this rate.
private final float _bwBpms;
// As in RED paper
private static final float MAXP = 0.02f;

// As in kernel tcp_westwood.c
// Should probably match ConnectionOptions.TCP_ALPHA
private static final int DECAY_FACTOR = 8;
private static final int WESTWOOD_RTT_MIN = 500;

/**
* Default thresholds.
* Min: 100 ms of traffic; max: 500 ms.
*
* @param bwBps the output rate of the queue in Bps
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps) {
// the goal is to drop here rather than let the traffic
// get through to UDP-Sender CoDel and get dropped there,
// when we're at the default 80% share or below.
// That CoDel starts dropping when above 100 ms latency for 500 ms.
// let's try the same 100 ms of traffic here.
this(ctx, bwBps, bwBps / 10, bwBps / 2);
}

/**
* Specified queue size thresholds.
* offer() drops a 1024 byte packet at 2% probability just below maxThB,
* and at 100% probability above maxThB.
*
* @param bwBps the output rate of the queue in Bps
* @param minThB the minimum queue size to start dropping, in bytes
* @param maxThB the queue size at which to drop everything, in bytes
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps, int minThB, int maxThB) {
_log = ctx.logManager().getLog(SyntheticREDQueue.class);
_context = ctx;
// assume we're about to send something
_tAck = ctx.clock().now();
_acked = -1;
_minth = minThB;
_maxth = maxThB;
_bwBpms = bwBps / 1000f;
_tQSize = _tAck;
if (_log.shouldDebug())
_log.debug("Configured " + bwBps + " BPS, min: " + minThB + " B, max: " + maxThB + " B");
}

/**
* Unconditional, never drop.
* The queue size and bandwidth estimates will be updated.
*/
public void addSample(int size) {
offer(size, 0);
}

/**
* Should we drop this packet?
* If accepted, the queue size and bandwidth estimates will be updated.
*
* @param size how many bytes to be offered
* @param factor how to adjust the size for the drop probability calculation,
* or 1.0 for standard probability. 0 to prevent dropping.
* Does not affect bandwidth calculations.
* @return true for accepted, false for drop
*/
public boolean offer(int size, float factor) {
long now = _context.clock().now();
return addSample(size, factor, now);
}

private synchronized boolean addSample(int acked, float factor, long now) {
if (_acked < 0) {
// first sample
// use time since constructed as the RTT
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
float bkdt = ((float) acked) / deltaT;
_bKFiltered = bkdt;
_bK_ns_est = bkdt;
_acked = 0;
_tAck = now;
_tQSize = now;
_newDataSize = acked;
if (_log.shouldDebug())
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
return true;
} else {
// update queue size average if necessary
// the current sample is not included in the calculation
long deltaT = now - _tQSize;
if (deltaT > WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
if (factor > 0) {
// drop calculation
if (_avgQSize > _maxth) {
if (_log.shouldWarn())
_log.warn("drop bytes (qsize): " + acked + ' ' + this);
_count = 0;
return false;
}
if (_avgQSize > _minth) {
_count++;
float pb = (acked / 1024f) * factor * MAXP * (_avgQSize - _minth) / (_maxth - _minth);
float pa = pb / (1 - (_count * pb));
float rand = _context.random().nextFloat();
if (rand < pa) {
if (_log.shouldWarn())
_log.warn("drop bytes (prob): " + acked + " factor " + factor + " prob: " + pa + " deltaT: " + deltaT + ' ' + this);
_count = 0;
return false;
}
_count = -1;
}
}
// accepted
_newDataSize += acked;
_acked += acked;
// update bandwidth estimate if necessary
deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
computeBWE(now, (int) deltaT);
if (_log.shouldDebug())
_log.debug("accept bytes: " + acked + " factor " + factor + ' ' + this);
return true;
}
}

/**
* @return the current bandwidth estimate in bytes/ms.
*/
public float getBandwidthEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
return computeBWE(now, (int) deltaT);
return _bKFiltered;
}
}

/**
* @return the current queue size estimate in bytes.
*/
public float getQueueSizeEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tQSize;
if (deltaT >= WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
return _avgQSize;
}
}

private synchronized float computeBWE(final long now, final int rtt) {
if (_acked < 0)
return 0.0f; // nothing ever sampled
updateBK(now, _acked, rtt);
_acked = 0;
return _bKFiltered;
}

/**
* Optimized version of updateBK with packets == 0
*/
private void decay() {
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
}

private void decayQueue(int rtt) {
_qSize -= rtt * _bwBpms;
if (_qSize < 1)
_qSize = 0;
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
}

/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param packets number of bytes acked
* @param rtt current rtt
*/
private void updateBK(long time, int packets, int rtt) {
long deltaT = time - _tAck;
if (rtt < WESTWOOD_RTT_MIN)
rtt = WESTWOOD_RTT_MIN;
if (deltaT > 2 * rtt) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_bKFiltered <= 0)
break;
decay();
}
deltaT -= numrtts * rtt;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
float bkdt;
if (packets > 0) {
// As in kernel tcp_westwood.c
bkdt = ((float) packets) / deltaT;
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
} else {
bkdt = 0;
decay();
}
_tAck = time;
//if (_log.shouldDebug())
// _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
// " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
}


/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param deltaT at least WESTWOOD_RTT_MIN
*/
private void updateQSize(long time, long deltaT) {
long origDT = deltaT;
if (deltaT > 2 * WESTWOOD_RTT_MIN) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / WESTWOOD_RTT_MIN) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_avgQSize <= 0)
break;
decayQueue(WESTWOOD_RTT_MIN);
}
deltaT -= numrtts * WESTWOOD_RTT_MIN;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
int origNDS = _newDataSize;
float newQSize = _newDataSize;
if (_newDataSize > 0) {
newQSize -= deltaT * _bwBpms;
if (newQSize < 1)
newQSize = 0;
_qSize = westwood_do_filter(_qSize, newQSize);
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
_newDataSize = 0;
} else {
decayQueue((int) deltaT);
}
_tQSize = time;
if (_log.shouldDebug())
_log.debug("computeQS deltaT: " + origDT +
" newData: " + origNDS +
" newQsize: " + newQSize + " qSize: " + _qSize + ' ' + this);
}

/**
* As in kernel tcp_westwood.c
*/
private static float westwood_do_filter(float a, float b) {
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
}

@Override
public synchronized String toString() {
return "SREDQ[" +
//" _bKFiltered " + _bKFiltered +
//" _tAck " + _tAck + "; " +
//" _tQSize " + _tQSize +
' ' + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
"Bps, avg_qsize " +
DataHelper.formatSize2((long) _avgQSize, false) +
"B]";
}
}
@@ -52,7 +52,6 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());
@@ -79,8 +79,6 @@ class OutboundTunnelEndpoint {
//int kb = (size + 1023) / 1024;
//for (int i = 0; i < kb; i++)
// _config.incrementSentMessages();
if (!toUs)
_context.bandwidthLimiter().sentParticipatingMessage(size);
_outDistributor.distribute(msg, toRouter, toTunnel);
}
}
@@ -34,7 +34,15 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
// Hard to do this exactly, but we'll assume 2:1 batching
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
int size = Math.max(msg.getMessageSize(), 1024/2);
// first frag. overhead
int size = msg.getMessageSize() + 60;
if (size > 1024) {
// additional frag. overhead
size += 28 * (size / 1024);
} else if (size < 512) {
// 2:1 batching of small messages
size = 512;
}
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;
@@ -767,84 +767,30 @@ public class TunnelDispatcher implements Service {
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
if (length <= 0)
return false;
/****
Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
during a burst.
We now use the bandwidth limiter to track outbound participating bandwidth
over the last few seconds.
****/

/****
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
if (rs == null)
return false;
Rate r = rs.getRate(60*1000);
if (r == null)
return false;
// weight current period higher
long count = r.getLastEventCount() + (3 * r.getCurrentEventCount());
int bw = 0;
if (count > 0)
bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count);
else
bw = (int) r.getLifetimeAverageValue();

int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
if (bw < usedIn)
usedIn = bw;
if (usedIn <= 0)
return false;
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
if (bw < usedOut)
usedOut = bw;
if (usedOut <= 0)
return false;
int used = Math.min(usedIn, usedOut);
****/
int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
if (used <= 0)
return false;

int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
float share = (float) _context.router().getSharePercentage();

// start dropping at 120% of the limit,
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
float maxBps = maxKBps * share * (1024f * 1.20f);
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;
// increase the drop probability for OBEP,
// (except lower it for tunnel build messages type 21/22/23/24),
// and lower it for IBGW, for network efficiency
double len = length;
float factor;
if (loc == Location.OBEP) {
// we don't need to check for VTBRM/TBRM as that happens at tunnel creation
if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
len /= 1.5;
factor = 1 / 1.5f;
else
len *= 1.5;
factor = 1.5f;
} else if (loc == Location.IBGW) {
// we don't need to check for VTBM/TBM as that happens at tunnel creation
if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
len /= 1.5 * 1.5 * 1.5;
factor = 1 / (1.5f * 1.5f * 1.5f);
else
len /= 1.5;
factor = 1 / 1.5f;
} else {
factor = 1.0f;
}
// drop in proportion to size w.r.t. a standard 1024-byte message
// this is a little expensive but we want to adjust the curve between 0 and 1
// Most messages are 1024, only at the OBEP do we see other sizes
if ((int)len != 1024)
pctDrop = (float) Math.pow(pctDrop, 1024d / len);
float rand = _context.random().nextFloat();
boolean reject = rand <= pctDrop;
boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor);
if (reject) {
if (_log.shouldLog(Log.WARN)) {
int availBps = (int) (((maxKBps*1024)*share) - used);
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
+ used + " %Drop = " + pctDrop
_log.warn("Drop part. msg. factor=" + factor
+ ' ' + loc + ' ' + type + ' ' + length);
}
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1);
@@ -203,7 +203,6 @@ class TunnelParticipant {
TunnelDataMessage.MESSAGE_TYPE, 1024))
return;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
long oldId = msg.getUniqueId();
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);