Compare commits

...

1 Commits

Author SHA1 Message Date
zzz
45c3fa842d Part. tunnel RED test
Change BW est. to exponential moving average instead of 40 ms bucket
Also use BW est. for tunnel.participatingBandwidthOut stat
Comment out linear moving average code previously used for stat
Reduce RED threshold from 120% to 95% of limit
For testing only, not to be merged
2021-03-10 07:21:44 -05:00
3 changed files with 188 additions and 3 deletions

View File

@@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
@@ -24,6 +25,8 @@ public class FIFOBandwidthRefiller implements Runnable {
private final Log _log;
private final I2PAppContext _context;
private final FIFOBandwidthLimiter _limiter;
private final BandwidthEstimator _partBWE;
/** how many KBps do we want to allow? */
private int _inboundKBytesPerSecond;
/** how many KBps do we want to allow? */
@@ -88,6 +91,7 @@ public class FIFOBandwidthRefiller implements Runnable {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
reinitialize();
_partBWE = new SimpleBandwidthEstimator(context);
_isRunning = true;
}
@@ -100,6 +104,7 @@ public class FIFOBandwidthRefiller implements Runnable {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList<Request>(2);
int i = 0;
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
@@ -107,8 +112,9 @@ public class FIFOBandwidthRefiller implements Runnable {
now = _limiter.now();
_lastCheckConfigTime = now;
}
if ((++i & 0x3f) == 0)
updateParticipating(now);
updateParticipating(now);
boolean updated = updateQueues(buffer, now);
if (updated) {
_lastRefillTime = now;
@@ -326,7 +332,8 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
void incrementParticipatingMessageBytes(int size) {
_currentParticipating.addAndGet(size);
//_currentParticipating.addAndGet(size);
_partBWE.addSample(size);
}
/**
@@ -336,14 +343,18 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
int getCurrentParticipatingBandwidth() {
return (int) (_partBWE.getBandwidthEstimate() * 1000f);
/*
_updateLock.readLock().lock();
try {
return locked_getCurrentParticipatingBandwidth();
} finally {
_updateLock.readLock().unlock();
}
*/
}
/*
private int locked_getCurrentParticipatingBandwidth() {
int current = _currentParticipating.get();
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
@@ -355,6 +366,7 @@ public class FIFOBandwidthRefiller implements Runnable {
return 0;
return (int) bw;
}
*/
/**
* Run once every replenish period
@@ -362,14 +374,18 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
private void updateParticipating(long now) {
_context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth());
/*
_updateLock.writeLock().lock();
try {
locked_updateParticipating(now);
} finally {
_updateLock.writeLock().unlock();
}
*/
}
/*
private void locked_updateParticipating(long now) {
long elapsed = now - _lastPartUpdateTime;
if (elapsed <= 0) {
@@ -400,4 +416,5 @@ public class FIFOBandwidthRefiller implements Runnable {
DataHelper.formatSize(bw) + " Bps");
}
}
*/
}

View File

@@ -0,0 +1,168 @@
package net.i2p.router.transport;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
* A Westwood+ bandwidth estimator with
* a first stage anti-aliasing low pass filter based on RTT,
* and the time-varying Westwood filter based on inter-arrival time.
*
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
* Casetti et al
* (Westwood)
*
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
* Grieco and Mascolo
* (Westwood+)
*
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
*
* @since 0.9.49 adapted from streaming
*/
class SimpleBandwidthEstimator implements BandwidthEstimator {
private final I2PAppContext _context;
private final Log _log;
private long _tAck;
// bw_est, bw_ns_est
private float _bKFiltered, _bK_ns_est;
// bk
private int _acked;
// As in kernel tcp_westwood.c
// Should probably match ConnectionOptions.TCP_ALPHA
private static final int DECAY_FACTOR = 8;
private static final int WESTWOOD_RTT_MIN = 500;
SimpleBandwidthEstimator(I2PAppContext ctx) {
_log = ctx.logManager().getLog(SimpleBandwidthEstimator.class);
_context = ctx;
// assume we're about to send something
_tAck = ctx.clock().now();
_acked = -1;
}
/**
* Records an arriving ack.
* @param acked how many bytes were acked with this ack
*/
public void addSample(int acked) {
long now = _context.clock().now();
addSample(acked, now);
}
private synchronized void addSample(int acked, long now) {
if (_acked < 0) {
// first sample
// use time since constructed as the RTT
// getRTT() would return zero here.
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
float bkdt = ((float) acked) / deltaT;
_bKFiltered = bkdt;
_bK_ns_est = bkdt;
_acked = 0;
_tAck = now;
if (_log.shouldDebug())
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
} else {
_acked += acked;
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
computeBWE(now, (int) deltaT);
}
}
/**
* @return the current bandwidth estimate in bytes/ms.
*/
public float getBandwidthEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
return computeBWE(now, (int) deltaT);
return _bKFiltered;
}
}
private synchronized float computeBWE(final long now, final int rtt) {
if (_acked < 0)
return 0.0f; // nothing ever sampled
updateBK(now, _acked, rtt);
_acked = 0;
return _bKFiltered;
}
/**
* Optimized version of updateBK with packets == 0
*/
private void decay() {
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param packets number of bytes acked
* @param rtt current rtt
*/
private void updateBK(long time, int packets, int rtt) {
long deltaT = time - _tAck;
if (rtt < WESTWOOD_RTT_MIN)
rtt = WESTWOOD_RTT_MIN;
if (deltaT > 2 * rtt) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
decay();
}
deltaT -= numrtts * rtt;
if (_log.shouldDebug())
_log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
float bkdt;
if (packets > 0) {
// As in kernel tcp_westwood.c
bkdt = ((float) packets) / deltaT;
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
} else {
bkdt = 0;
decay();
}
_tAck = time;
if (_log.shouldDebug())
_log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
" bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
/**
* As in kernel tcp_westwood.c
*/
private static float westwood_do_filter(float a, float b) {
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
}
@Override
public synchronized String toString() {
return "SBE[" +
" _bKFiltered " + _bKFiltered +
" _tAck " + _tAck + "; " +
DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
"Bps]";
}
}

View File

@@ -812,7 +812,7 @@ public class TunnelDispatcher implements Service {
// start dropping at 120% of the limit,
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
float maxBps = maxKBps * share * (1024f * 1.20f);
float maxBps = maxKBps * share * (1024f * 0.95f);
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;