Compare commits

...

2 Commits

Author SHA1 Message Date
zzz
0348905f34 cleanups, remove commented-out linear average bw code
repleace magic numbers with statics
create new SREDQ on config change (untested)
change default thresholds from 100-500 ms to 77-333 ms
2021-03-15 09:04:49 -04:00
zzz
aa4a0419ba Tunnels: Fix RED
This builds on and replaces MR !23 which fixed the bw calc used for RED by
changing from a 40 ms bucket to an exponential moving average.

Here we fix the other part of RED which is the dropping calculation.
Previously, it simply used the bw calc to start dropping if the
bw was higher than a threshold. The drop percentage rose from
0 to 100%, linearly, based on how far the bandwidth was
above the threshold. This was far, far from the RED paper.

Now, we follow the RED paper (see ref. in SyntheticREDQueue javadoc)
to calculate an average queue size, using the exact same
exponential moving average method used for bandwidth.
Similar to CoDel, it also includes a count of how long
the size is over the threshold, and increases the drop probability
with the count.
The unadjusted drop probability rises from 0 to 2% and then
everything is dropped, as in the RED paper.
The low and high thresholds are configured at 100 ms and 500 ms
of queued data, respectively.

The queue is "synthetic" in that there's not actually a queue.
It only calculates how big the queue would be if it were
a real queue and were being emptied at exactly the
target rate.
All actual queueing is done downstream in the transports
and in UDP-Sender.

The goals are, for an 80% default share, to do most of the
part. traffic dropping here in RED, not downstream in UDP-Sender,
while fully utilizing the configured share bandwidth.
If the router goes into high message delay mode, that means
we are not dropping enough in RED.
Above 80% share this probably doesn't work as well.

Tested for a few days in the live net.
There will probably be more tuning required, in particular
to achieve the goal in live net of "protecting" the UDP-Sender
queue and local client/router traffic by dropping
more aggressively in RED.

WIP, not for merging, see !23 for additional comments.
2021-03-13 07:26:04 -05:00
8 changed files with 434 additions and 155 deletions

View File

@@ -7,7 +7,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.router.RouterContext;
import net.i2p.router.util.PQEntry;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -33,7 +33,7 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthLimiter {
private final Log _log;
private final I2PAppContext _context;
private final RouterContext _context;
private final List<SimpleRequest> _pendingInboundRequests;
private final List<SimpleRequest> _pendingOutboundRequests;
/** how many bytes we can consume for inbound transmission immediately */
@@ -84,7 +84,7 @@ public class FIFOBandwidthLimiter {
return System.currentTimeMillis();
}
public FIFOBandwidthLimiter(I2PAppContext context) {
public FIFOBandwidthLimiter(RouterContext context) {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
_context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
@@ -192,8 +192,8 @@ public class FIFOBandwidthLimiter {
* @param size bytes
* @since 0.8.12
*/
public void sentParticipatingMessage(int size) {
_refiller.incrementParticipatingMessageBytes(size);
public boolean sentParticipatingMessage(int size, float factor) {
return _refiller.incrementParticipatingMessageBytes(size, factor);
}
/**

View File

@@ -5,8 +5,8 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.util.Log;
@@ -22,8 +22,11 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthRefiller implements Runnable {
private final Log _log;
private final I2PAppContext _context;
private final RouterContext _context;
private final FIFOBandwidthLimiter _limiter;
// This is only changed if the config changes
private volatile SyntheticREDQueue _partBWE;
/** how many KBps do we want to allow? */
private int _inboundKBytesPerSecond;
/** how many KBps do we want to allow? */
@@ -76,6 +79,9 @@ public class FIFOBandwidthRefiller implements Runnable {
* See util/DecayingBloomFilter and tunnel/BloomFilterIVValidator.
*/
public static final int MAX_OUTBOUND_BANDWIDTH = 16384;
private static final float MAX_SHARE_PERCENTAGE = 0.90f;
private static final float SHARE_LIMIT_FACTOR = 0.95f;
/**
* how often we replenish the queues.
@@ -83,10 +89,11 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
private static final long REPLENISH_FREQUENCY = 40;
FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
FIFOBandwidthRefiller(RouterContext context, FIFOBandwidthLimiter limiter) {
_limiter = limiter;
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
_context.statManager().createRateStat("bwLimiter.participatingBandwidthQueue", "size in bytes", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
reinitialize();
_isRunning = true;
}
@@ -100,6 +107,7 @@ public class FIFOBandwidthRefiller implements Runnable {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList<Request>(2);
byte i = 0;
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
@@ -107,8 +115,10 @@ public class FIFOBandwidthRefiller implements Runnable {
now = _limiter.now();
_lastCheckConfigTime = now;
}
// just for the stats
if ((++i) == 0)
updateParticipating(now);
updateParticipating(now);
boolean updated = updateQueues(buffer, now);
if (updated) {
_lastRefillTime = now;
@@ -171,6 +181,16 @@ public class FIFOBandwidthRefiller implements Runnable {
return false;
}
}
/**
* In Bytes per second
*/
private int getShareBandwidth() {
int maxKBps = Math.min(_inboundKBytesPerSecond, _outboundKBytesPerSecond);
// limit to 90% so it doesn't clog up at the transport bandwidth limiter
float share = Math.min((float) _context.router().getSharePercentage(), MAX_SHARE_PERCENTAGE);
return (int) (maxKBps * share * 1024f * SHARE_LIMIT_FACTOR);
}
private void checkConfig() {
updateInboundRate();
@@ -179,7 +199,13 @@ public class FIFOBandwidthRefiller implements Runnable {
updateOutboundBurstRate();
updateInboundPeak();
updateOutboundPeak();
// if share bandwidth config changed, throw out the SyntheticREDQueue and make a new one
int maxBps = getShareBandwidth();
if (_partBWE == null || maxBps != _partBWE.getMaxBandwidth()) {
_partBWE = new SyntheticREDQueue(_context, maxBps);
}
// We are always limited for now
//_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0);
//_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0);
@@ -299,34 +325,14 @@ public class FIFOBandwidthRefiller implements Runnable {
int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; }
int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; }
/**
* Participating counter stuff below here
* TOTAL_TIME needs to be high enough to get a burst without dropping
* @since 0.8.12
*/
private static final int TOTAL_TIME = 4000;
private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY;
/** count in current replenish period */
private final AtomicInteger _currentParticipating = new AtomicInteger();
private long _lastPartUpdateTime;
private int _lastTotal;
/** the actual length of last total period as coalesced (nominally TOTAL_TIME) */
private long _lastTotalTime;
private int _lastIndex;
/** buffer of count per replenish period, last is at _lastIndex, older at higher indexes (wraps) */
private final int[] _counts = new int[PERIODS];
/** the actual length of the period (nominally REPLENISH_FREQUENCY) */
private final long[] _times = new long[PERIODS];
private final ReentrantReadWriteLock _updateLock = new ReentrantReadWriteLock(false);
/**
* We sent a message.
*
* @param size bytes
* @since 0.8.12
*/
void incrementParticipatingMessageBytes(int size) {
_currentParticipating.addAndGet(size);
boolean incrementParticipatingMessageBytes(int size, float factor) {
return _partBWE.offer(size, factor);
}
/**
@@ -336,24 +342,7 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
int getCurrentParticipatingBandwidth() {
_updateLock.readLock().lock();
try {
return locked_getCurrentParticipatingBandwidth();
} finally {
_updateLock.readLock().unlock();
}
}
private int locked_getCurrentParticipatingBandwidth() {
int current = _currentParticipating.get();
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
if (totalTime <= 0)
return 0;
// 1000 for ms->seconds in denominator
long bw = 1000l * (current + _lastTotal) / totalTime;
if (bw > Integer.MAX_VALUE)
return 0;
return (int) bw;
return (int) (_partBWE.getBandwidthEstimate() * 1000f);
}
/**
@@ -362,42 +351,7 @@ public class FIFOBandwidthRefiller implements Runnable {
* @since 0.8.12
*/
private void updateParticipating(long now) {
_updateLock.writeLock().lock();
try {
locked_updateParticipating(now);
} finally {
_updateLock.writeLock().unlock();
}
}
private void locked_updateParticipating(long now) {
long elapsed = now - _lastPartUpdateTime;
if (elapsed <= 0) {
// glitch in the matrix
_lastPartUpdateTime = now;
return;
}
_lastPartUpdateTime = now;
if (--_lastIndex < 0)
_lastIndex = PERIODS - 1;
_counts[_lastIndex] = _currentParticipating.getAndSet(0);
_times[_lastIndex] = elapsed;
_lastTotal = 0;
_lastTotalTime = 0;
// add up total counts and times
for (int i = 0; i < PERIODS; i++) {
int idx = (_lastIndex + i) % PERIODS;
_lastTotal += _counts[idx];
_lastTotalTime += _times[idx];
if (_lastTotalTime >= TOTAL_TIME)
break;
}
if (_lastIndex == 0 && _lastTotalTime > 0) {
long bw = 1000l * _lastTotal / _lastTotalTime;
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bw);
if (_lastTotal > 0 && _log.shouldLog(Log.INFO))
_log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " +
DataHelper.formatSize(bw) + " Bps");
}
_context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth());
_context.statManager().addRateData("bwLimiter.participatingBandwidthQueue", (long) _partBWE.getQueueSizeEstimate());
}
}

View File

@@ -0,0 +1,375 @@
package net.i2p.router.transport;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
* A "synthetic" queue in that it doesn't actually queue anything.
* Actual queueing is assumed to be "dowstream" of this.
*
* Maintains an average estimated "queue size" assuming a constant output rate
* declared in the constructor. The queue size is measured in bytes.
*
* With offer(), will return true for "accepted" or false for "dropped",
* based on the RED algorithm which uses the current average queue size
* and the offered data size to calculate a drop probability.
* Bandwidth is not directly used in the RED algorithm, except to
* synthetically calculate an average queue size assuming the
* queue is being drained precisely at that rate, byte-by-byte
* (not per-packet).
*
* addSample() unconditionally accepts the packet.
*
* Also maintains a Westwood+ bandwidth estimator.
* The bandwidth and queue size estimates are only updated if the
* packet is "accepted".
*
* The average queue size is calculated in the same manner as the
* bandwidth, with an update every WESTWOOD_RTT_MIN ms.
* Both estimators use
* a first stage anti-aliasing low pass filter based on RTT,
* and the time-varying Westwood filter based on inter-arrival time.
*
* Ref: Random Early Detection Gateways for Congestion Avoidance
* Sally Floyd and Van Jacobson
*
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
* Casetti et al
* (Westwood)
*
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
* Grieco and Mascolo
* (Westwood+)
*
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
*
* @since 0.9.49 adapted from streaming
*/
class SyntheticREDQueue implements BandwidthEstimator {
private final I2PAppContext _context;
private final Log _log;
private long _tAck;
// bw_est, bw_ns_est
private float _bKFiltered, _bK_ns_est;
// bk
private int _acked;
// RED vars
// pkts since last dropped pkt
private int _count = -1;
// smoothed average queue size in bytes
private float _avgQSize;
// last sample queue size in bytes
private float _qSize;
// current interval newly queued in bytes, since the last updateQSize()
private int _newDataSize;
// last time _avgQSize calculated
private long _tQSize;
// min queue size threshold, in bytes, to start dropping
private final int _minth;
// max queue size, in bytes, before dropping everything
private final int _maxth;
// bandwidth in bytes per second, as passed to the constructor.
private final int _bwBps;
// bandwidth in bytes per ms. The queue is drained at this rate.
private final float _bwBpms;
// As in RED paper
private static final float MAXP = 0.02f;
// As in kernel tcp_westwood.c
// Should probably match ConnectionOptions.TCP_ALPHA
private static final int DECAY_FACTOR = 8;
private static final int WESTWOOD_RTT_MIN = 500;
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_LOW_THRESHOLD = 13;
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_HIGH_THRESHOLD = 3;
/**
* Default thresholds.
* Min: 100 ms of traffic; max: 500 ms.
*
* @param bwKBps the output rate of the queue in Bps
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps) {
// the goal is to drop here rather than let the traffic
// get through to UDP-Sender CoDel and get dropped there,
// when we're at the default 80% share or below.
// That CoDel starts dropping when above 100 ms latency for 500 ms.
// let's try the same 100 ms of traffic here.
this(ctx, bwBps, bwBps / DEFAULT_LOW_THRESHOLD, bwBps / DEFAULT_HIGH_THRESHOLD);
}
/**
* Specified queue size thresholds.
* offer() drops a 1024 byte packet at 2% probability just lower than maxThKB,
* and at 100% probability higher than maxThKB.
*
* @param bwKBps the output rate of the queue in Bps
* @param minThKB the minimum queue size to start dropping in Bytes
* @param maxThKB the queue size to drop everything in Bytes
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps, int minThB, int maxThB) {
_log = ctx.logManager().getLog(SyntheticREDQueue.class);
_context = ctx;
// assume we're about to send something
_tAck = ctx.clock().now();
_acked = -1;
_minth = minThB;
_maxth = maxThB;
_bwBps = bwBps;
_bwBpms = bwBps / 1000f;
_tQSize = _tAck;
if (_log.shouldDebug())
_log.debug("Configured " + bwBps + " BPS, min: " + minThB + " B, max: " + maxThB + " B");
}
/**
*
* Nominal bandwidth limit in bytes per second, as passed to the constructor.
*
*/
public int getMaxBandwidth() {
return _bwBps;
}
/**
* Unconditional, never drop.
* The queue size and bandwidth estimates will be updated.
*/
public void addSample(int size) {
offer(size, 0);
}
/**
* Should we drop this packet?
* If accepted, the queue size and bandwidth estimates will be updated.
*
* @param size how many bytes to be offered
* @param factor how to adjust the size for the drop probability calculation,
* or 1.0 for standard probability. 0 to prevent dropping.
* Does not affect bandwidth calculations.
* @return true for accepted, false for drop
*/
public boolean offer(int size, float factor) {
long now = _context.clock().now();
return addSample(size, factor, now);
}
private synchronized boolean addSample(int acked, float factor, long now) {
if (_acked < 0) {
// first sample
// use time since constructed as the RTT
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
float bkdt = ((float) acked) / deltaT;
_bKFiltered = bkdt;
_bK_ns_est = bkdt;
_acked = 0;
_tAck = now;
_tQSize = now;
_newDataSize = acked;
if (_log.shouldDebug())
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
return true;
} else {
// update queue size average if necessary
// the current sample is not included in the calculation
long deltaT = now - _tQSize;
if (deltaT > WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
if (factor > 0) {
// drop calculation
if (_avgQSize > _maxth) {
if (_log.shouldWarn())
_log.warn("drop bytes (qsize): " + acked + ' ' + this);
_count = 0;
return false;
}
if (_avgQSize > _minth) {
_count++;
float pb = (acked / 1024f) * factor * MAXP * (_avgQSize - _minth) / (_maxth - _minth);
float pa = pb / (1 - (_count * pb));
float rand = _context.random().nextFloat();
if (rand < pa) {
if (_log.shouldWarn())
_log.warn("drop bytes (prob): " + acked + " factor " + factor + " prob: " + pa + " deltaT: " + deltaT + ' ' + this);
_count = 0;
return false;
}
_count = -1;
}
}
// accepted
_newDataSize += acked;
_acked += acked;
// update bandwidth estimate if necessary
deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
computeBWE(now, (int) deltaT);
if (_log.shouldDebug())
_log.debug("accept bytes: " + acked + " factor " + factor + ' ' + this);
return true;
}
}
/**
* @return the current bandwidth estimate in bytes/ms.
*/
public float getBandwidthEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
return computeBWE(now, (int) deltaT);
return _bKFiltered;
}
}
/**
* @return the current queue size estimate in bytes.
*/
public float getQueueSizeEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tQSize;
if (deltaT >= WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
return _avgQSize;
}
}
private synchronized float computeBWE(final long now, final int rtt) {
if (_acked < 0)
return 0.0f; // nothing ever sampled
updateBK(now, _acked, rtt);
_acked = 0;
return _bKFiltered;
}
/**
* Optimized version of updateBK with packets == 0
*/
private void decay() {
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
}
private void decayQueue(int rtt) {
_qSize -= rtt * _bwBpms;
if (_qSize < 1)
_qSize = 0;
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param packets number of bytes acked
* @param rtt current rtt
*/
private void updateBK(long time, int packets, int rtt) {
long deltaT = time - _tAck;
if (rtt < WESTWOOD_RTT_MIN)
rtt = WESTWOOD_RTT_MIN;
if (deltaT > 2 * rtt) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_bKFiltered <= 0)
break;
decay();
}
deltaT -= numrtts * rtt;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
float bkdt;
if (packets > 0) {
// As in kernel tcp_westwood.c
bkdt = ((float) packets) / deltaT;
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
} else {
bkdt = 0;
decay();
}
_tAck = time;
//if (_log.shouldDebug())
// _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
// " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param deltaT at least WESTWOOD_RTT_MIN
*/
private void updateQSize(long time, long deltaT) {
long origDT = deltaT;
if (deltaT > 2 * WESTWOOD_RTT_MIN) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / WESTWOOD_RTT_MIN) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_avgQSize <= 0)
break;
decayQueue(WESTWOOD_RTT_MIN);
}
deltaT -= numrtts * WESTWOOD_RTT_MIN;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
int origNDS = _newDataSize;
float newQSize = _newDataSize;
if (_newDataSize > 0) {
newQSize -= deltaT * _bwBpms;
if (newQSize < 1)
newQSize = 0;
_qSize = westwood_do_filter(_qSize, newQSize);
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
_newDataSize = 0;
} else {
decayQueue((int) deltaT);
}
_tQSize = time;
if (_log.shouldDebug())
_log.debug("computeQS deltaT: " + origDT +
" newData: " + origNDS +
" newQsize: " + newQSize + " qSize: " + _qSize + ' ' + this);
}
/**
* As in kernel tcp_westwood.c
*/
private static float westwood_do_filter(float a, float b) {
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
}
@Override
public synchronized String toString() {
return "SREDQ[" +
//" _bKFiltered " + _bKFiltered +
//" _tAck " + _tAck + "; " +
//" _tQSize " + _tQSize +
' ' + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
"Bps, avg_qsize " +
DataHelper.formatSize2((long) _avgQSize, false) +
"B]";
}
}

View File

@@ -52,7 +52,6 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());

View File

@@ -79,8 +79,6 @@ class OutboundTunnelEndpoint {
//int kb = (size + 1023) / 1024;
//for (int i = 0; i < kb; i++)
// _config.incrementSentMessages();
if (!toUs)
_context.bandwidthLimiter().sentParticipatingMessage(size);
_outDistributor.distribute(msg, toRouter, toTunnel);
}
}

View File

@@ -34,7 +34,15 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
// Hard to do this exactly, but we'll assume 2:1 batching
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
int size = Math.max(msg.getMessageSize(), 1024/2);
// first frag. overhead
int size = msg.getMessageSize() + 60;
if (size > 1024) {
// additional frag. overhead
size += 28 * (size / 1024);
} else if (size < 512) {
// 2:1 batching of small messages
size = 512;
}
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;

View File

@@ -767,84 +767,30 @@ public class TunnelDispatcher implements Service {
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
if (length <= 0)
return false;
/****
Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
during a burst.
We now use the bandwidth limiter to track outbound participating bandwidth
over the last few seconds.
****/
/****
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
if (rs == null)
return false;
Rate r = rs.getRate(60*1000);
if (r == null)
return false;
// weight current period higher
long count = r.getLastEventCount() + (3 * r.getCurrentEventCount());
int bw = 0;
if (count > 0)
bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count);
else
bw = (int) r.getLifetimeAverageValue();
int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
if (bw < usedIn)
usedIn = bw;
if (usedIn <= 0)
return false;
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
if (bw < usedOut)
usedOut = bw;
if (usedOut <= 0)
return false;
int used = Math.min(usedIn, usedOut);
****/
int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
if (used <= 0)
return false;
int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
float share = (float) _context.router().getSharePercentage();
// start dropping at 120% of the limit,
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
float maxBps = maxKBps * share * (1024f * 1.20f);
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;
// increase the drop probability for OBEP,
// (except lower it for tunnel build messages type 21/22/23/24),
// and lower it for IBGW, for network efficiency
double len = length;
float factor;
if (loc == Location.OBEP) {
// we don't need to check for VTBRM/TBRM as that happens at tunnel creation
if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
len /= 1.5;
factor = 1 / 1.5f;
else
len *= 1.5;
factor = 1.5f;
} else if (loc == Location.IBGW) {
// we don't need to check for VTBM/TBM as that happens at tunnel creation
if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
len /= 1.5 * 1.5 * 1.5;
factor = 1 / (1.5f * 1.5f * 1.5f);
else
len /= 1.5;
factor = 1 / 1.5f;
} else {
factor = 1.0f;
}
// drop in proportion to size w.r.t. a standard 1024-byte message
// this is a little expensive but we want to adjust the curve between 0 and 1
// Most messages are 1024, only at the OBEP do we see other sizes
if ((int)len != 1024)
pctDrop = (float) Math.pow(pctDrop, 1024d / len);
float rand = _context.random().nextFloat();
boolean reject = rand <= pctDrop;
boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor);
if (reject) {
if (_log.shouldLog(Log.WARN)) {
int availBps = (int) (((maxKBps*1024)*share) - used);
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
+ used + " %Drop = " + pctDrop
_log.warn("Drop part. msg. factor=" + factor
+ ' ' + loc + ' ' + type + ' ' + length);
}
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1);

View File

@@ -203,7 +203,6 @@ class TunnelParticipant {
TunnelDataMessage.MESSAGE_TYPE, 1024))
return;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
long oldId = msg.getUniqueId();
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);