Compare commits

..

1 Commits

Author SHA1 Message Date
zzz
45c3fa842d Part. tunnel RED test
Change BW est. to exponential moving average instead of 40 ms bucket
Also use BW est. for tunnel.participatingBandwidthOut stat
Comment out linear moving average code previously used for stat
Reduce RED threshold from 120% to 95% of limit
For testing only, not to be merged
2021-03-10 07:21:44 -05:00
16 changed files with 361 additions and 544 deletions

View File

@@ -6,7 +6,7 @@
request.setCharacterEncoding("UTF-8");
response.setHeader("X-Frame-Options", "SAMEORIGIN");
response.setHeader("Content-Security-Policy", "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self'; frame-ancestors 'self'; object-src 'none'; media-src 'none'");
response.setHeader("Content-Security-Policy", "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'none'; frame-ancestors 'self'; object-src 'none'; media-src 'none'");
response.setHeader("X-XSS-Protection", "1; mode=block");
response.setHeader("X-Content-Type-Options", "nosniff");
response.setHeader("Referrer-Policy", "no-referrer");

View File

@@ -14,8 +14,6 @@
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
<link href="/themes/console/images/favicon.ico" type="image/x-icon" rel="shortcut icon" />
<link href="<%=indexBean.getTheme()%>i2ptunnel.css?<%=net.i2p.CoreVersion.VERSION%>" rel="stylesheet" type="text/css" />
<script src="js/copy.js?<%=net.i2p.CoreVersion.VERSION%>" type="text/javascript"></script>
<noscript><style> .jsonly { display: none } </style></noscript>
</head><body id="tunnelListPage">
<div class="panel" id="overview"><h2><%=intl._t("Hidden Services Manager")%></h2><p>
<%=intl._t("These are the local services provided by your router.")%>
@@ -179,7 +177,7 @@
} // encName
%>
<tr>
<td class="tunnelDescription" colspan="2">
<td class="tunnelDescription" colspan="6">
<%
String descr = indexBean.getTunnelDescription(curServer);
if (descr != null && descr.length() > 0) {
@@ -193,38 +191,6 @@
} // descr
%>
</td>
<td class="tunnelPreview" colspan="1">
<%
if (("httpserver".equals(indexBean.getInternalType(curServer)) || ("httpbidirserver".equals(indexBean.getInternalType(curServer)))) && indexBean.getTunnelStatus(curServer) == IndexBean.RUNNING) {
if (name != null && !name.equals("") && name.endsWith(".i2p") ) {
%>
<textarea wrap="off" class="tunnelPreviewHostname" title="<%=intl._t("Share your site using the hostname")%>">http://<%=indexBean.getSpoofedHost(curServer)%>/?i2paddresshelper=<%=indexBean.getDestHashBase32(curServer)%></textarea>
<%
}
} else {
// needed to make the spacing look right
%>&nbsp;
<%
}
%>
</td>
<td class="tunnelPreview" colspan="1">
<%
if (("httpserver".equals(indexBean.getInternalType(curServer)) || ("httpbidirserver".equals(indexBean.getInternalType(curServer)))) && indexBean.getTunnelStatus(curServer) == IndexBean.RUNNING) {
if (name != null && !name.equals("") && name.endsWith(".i2p") ) {
%>
<button class="jsonly control tunnelHostnameCopy tunnelPreview" title="<%=intl._t("Copy the hostname to the clipboard")%>"><%=intl._t("Copy Hostname")%></button>
<%
}
} else {
// needed to make the spacing look right
%>&nbsp;
<%
}
%>
</td>
<td colspan="2">
</td>
</tr>
<%
} // for loop

View File

@@ -1,37 +0,0 @@
/* @license http://www.gnu.org/licenses/gpl-2.0.html GPL-2.0 */
/* see also licenses/LICENSE-GPLv2.txt */
function initCopyLink() {
var buttons = document.getElementsByClassName("tunnelHostnameCopy");
for (index = 0; index < buttons.length; index++) {
var button = buttons[index];
addClickHandler(button);
}
}
function addClickHandler(elem) {
elem.addEventListener("click", function() {
let prevElem = getPreviousHelper(elem).firstElementChild;
prevElem.select();
document.execCommand("copy");
alert("Copied the helper to the clipboard", prevElem.value);
});
}
document.addEventListener("DOMContentLoaded", function() {
initCopyLink();
}, true);
var getPreviousHelper = function (elem) {
var selector = ".tunnelPreview";
var parent = elem.parentElement
var sibling = parent.previousElementSibling;
while (sibling) {
if (sibling.matches(selector)) return sibling;
sibling = sibling.previousElementSibling;
}
return sibling
};
/* @license-end */

View File

@@ -583,23 +583,10 @@ html body#tunnelEditPage form div.panel table#serverTunnelEdit.tunnelConfig tbod
}
.tunnelPreview, .tunnelPort {
min-width: 15%;
width: 15%;
text-align: center;
}
.tunnelHostnameCopy {
margin-bottom: 1em !important;
margin-top: -1em !important;
}
.tunnelPreviewHostname {
margin-bottom: 1em !important;
margin-top: -1em !important;
width: 100%;
height: 1.5em;
overflow: hidden;
}
.tunnelLocation, .tunnelInterface {
width: 20%;
}

View File

@@ -3415,6 +3415,7 @@ h2 img {
h3 {
padding: 7px 5px 6px 7px;
margin: 12px 0 0 0;
border-radius: 0 2px 2px 0;
font-size: 11pt;
letter-spacing: 0.08em;

View File

@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,7 +34,6 @@ import net.i2p.data.router.RouterInfo;
import net.i2p.router.JobImpl;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.FileSuffixFilter;
import net.i2p.util.FileUtil;
import net.i2p.util.I2PThread;
@@ -146,6 +144,7 @@ public class PersistentDataStore extends TransientDataStore {
public DatabaseEntry remove(Hash key, boolean persist) {
if (persist) {
_writer.remove(key);
_context.jobQueue().addJob(new RemoveJob(key));
}
return super.remove(key);
}
@@ -169,6 +168,24 @@ public class PersistentDataStore extends TransientDataStore {
return rv;
}
private class RemoveJob extends JobImpl {
private final Hash _key;
public RemoveJob(Hash key) {
super(PersistentDataStore.this._context);
_key = key;
}
public String getName() { return "Delete RI file"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info("Removing key " + _key /* , getAddedBy() */);
try {
removeFile(_key, _dbDir);
} catch (IOException ioe) {
_log.error("Error removing key " + _key, ioe);
}
}
}
/** How many files to write every 10 minutes. Doesn't make sense to limit it,
* they just back up in the queue hogging memory.
*/
@@ -185,19 +202,16 @@ public class PersistentDataStore extends TransientDataStore {
*/
private class Writer implements Runnable, Flushable {
private final Map<Hash, DatabaseEntry>_keys;
private final Set<Hash> _keysToRemove;
private final Object _waitLock;
private volatile boolean _quit;
public Writer() {
_keys = new ConcurrentHashMap<Hash, DatabaseEntry>(64);
_keysToRemove = new ConcurrentHashSet<Hash>();
_waitLock = new Object();
}
public void queue(Hash key, DatabaseEntry data) {
int pending = _keys.size();
_keysToRemove.remove(key);
boolean exists = (null != _keys.put(key, data));
if (exists)
_context.statManager().addRateData("netDb.writeClobber", pending);
@@ -206,25 +220,6 @@ public class PersistentDataStore extends TransientDataStore {
public void remove(Hash key) {
_keys.remove(key);
_keysToRemove.add(key);
}
/*
* @since 0.9.50 was in separate RemoveJob
*/
private void removeQueued() {
if (_keysToRemove.isEmpty())
return;
for (Iterator<Hash> iter = _keysToRemove.iterator(); iter.hasNext(); ) {
Hash key = iter.next();
iter.remove();
try {
removeFile(key, _dbDir);
} catch (IOException ioe) {
if (_log.shouldWarn())
_log.warn("Error removing key " + key, ioe);
}
}
}
public void run() {
@@ -265,7 +260,6 @@ public class PersistentDataStore extends TransientDataStore {
if (count >= WRITE_LIMIT)
count = 0;
if (count == 0) {
removeQueued();
if (lastCount > 0) {
long time = _context.clock().now() - startTime;
if (_log.shouldLog(Log.INFO))

View File

@@ -136,8 +136,8 @@ class TransientDataStore implements DataStore {
_log.info("Almost clobbered an old router! " + key + ": [old published on " + new Date(ori.getPublished()) +
" new on " + new Date(ri.getPublished()) + ']');
} else if (ri.getPublished() == ori.getPublished()) {
if (_log.shouldDebug())
_log.debug("Duplicate " + key);
if (_log.shouldLog(Log.INFO))
_log.info("Duplicate " + key);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Updated the old router for " + key + ": [old published on " + new Date(ori.getPublished()) +

View File

@@ -7,7 +7,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.router.RouterContext;
import net.i2p.I2PAppContext;
import net.i2p.router.util.PQEntry;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -33,7 +33,7 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthLimiter {
private final Log _log;
private final RouterContext _context;
private final I2PAppContext _context;
private final List<SimpleRequest> _pendingInboundRequests;
private final List<SimpleRequest> _pendingOutboundRequests;
/** how many bytes we can consume for inbound transmission immediately */
@@ -84,7 +84,7 @@ public class FIFOBandwidthLimiter {
return System.currentTimeMillis();
}
public FIFOBandwidthLimiter(RouterContext context) {
public FIFOBandwidthLimiter(I2PAppContext context) {
_context = context;
_log = context.logManager().getLog(FIFOBandwidthLimiter.class);
_context.statManager().createRateStat("bwLimiter.pendingOutboundRequests", "How many outbound requests are ahead of the current one (ignoring ones with 0)?", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
@@ -192,8 +192,8 @@ public class FIFOBandwidthLimiter {
* @param size bytes
* @since 0.8.12
*/
public boolean sentParticipatingMessage(int size, float factor) {
return _refiller.incrementParticipatingMessageBytes(size, factor);
public void sentParticipatingMessage(int size) {
_refiller.incrementParticipatingMessageBytes(size);
}
/**

View File

@@ -5,9 +5,10 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
@@ -22,10 +23,9 @@ import net.i2p.util.Log;
*/
public class FIFOBandwidthRefiller implements Runnable {
private final Log _log;
private final RouterContext _context;
private final I2PAppContext _context;
private final FIFOBandwidthLimiter _limiter;
// This is only changed if the config changes
private volatile SyntheticREDQueue _partBWE;
private final BandwidthEstimator _partBWE;
/** how many KBps do we want to allow? */
private int _inboundKBytesPerSecond;
@@ -79,9 +79,6 @@ public class FIFOBandwidthRefiller implements Runnable {
* See util/DecayingBloomFilter and tunnel/BloomFilterIVValidator.
*/
public static final int MAX_OUTBOUND_BANDWIDTH = 16384;
private static final float MAX_SHARE_PERCENTAGE = 0.90f;
private static final float SHARE_LIMIT_FACTOR = 0.95f;
/**
* how often we replenish the queues.
@@ -89,12 +86,12 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
private static final long REPLENISH_FREQUENCY = 40;
FIFOBandwidthRefiller(RouterContext context, FIFOBandwidthLimiter limiter) {
FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
_limiter = limiter;
_context = context;
_log = context.logManager().getLog(FIFOBandwidthRefiller.class);
_context.statManager().createRateStat("bwLimiter.participatingBandwidthQueue", "size in bytes", "BandwidthLimiter", new long[] { 5*60*1000l, 60*60*1000l });
reinitialize();
_partBWE = new SimpleBandwidthEstimator(context);
_isRunning = true;
}
@@ -107,7 +104,7 @@ public class FIFOBandwidthRefiller implements Runnable {
// bootstrap 'em with nothing
_lastRefillTime = _limiter.now();
List<FIFOBandwidthLimiter.Request> buffer = new ArrayList<Request>(2);
byte i = 0;
int i = 0;
while (_isRunning) {
long now = _limiter.now();
if (now >= _lastCheckConfigTime + _configCheckPeriodMs) {
@@ -115,8 +112,7 @@ public class FIFOBandwidthRefiller implements Runnable {
now = _limiter.now();
_lastCheckConfigTime = now;
}
// just for the stats
if ((++i) == 0)
if ((++i & 0x3f) == 0)
updateParticipating(now);
boolean updated = updateQueues(buffer, now);
@@ -181,16 +177,6 @@ public class FIFOBandwidthRefiller implements Runnable {
return false;
}
}
/**
* In Bytes per second
*/
private int getShareBandwidth() {
int maxKBps = Math.min(_inboundKBytesPerSecond, _outboundKBytesPerSecond);
// limit to 90% so it doesn't clog up at the transport bandwidth limiter
float share = Math.min((float) _context.router().getSharePercentage(), MAX_SHARE_PERCENTAGE);
return (int) (maxKBps * share * 1024f * SHARE_LIMIT_FACTOR);
}
private void checkConfig() {
updateInboundRate();
@@ -199,13 +185,7 @@ public class FIFOBandwidthRefiller implements Runnable {
updateOutboundBurstRate();
updateInboundPeak();
updateOutboundPeak();
// if share bandwidth config changed, throw out the SyntheticREDQueue and make a new one
int maxBps = getShareBandwidth();
if (_partBWE == null || maxBps != _partBWE.getMaxBandwidth()) {
_partBWE = new SyntheticREDQueue(_context, maxBps);
}
// We are always limited for now
//_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0);
//_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0);
@@ -325,14 +305,35 @@ public class FIFOBandwidthRefiller implements Runnable {
int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; }
int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; }
/**
* Participating counter stuff below here
* TOTAL_TIME needs to be high enough to get a burst without dropping
* @since 0.8.12
*/
private static final int TOTAL_TIME = 4000;
private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY;
/** count in current replenish period */
private final AtomicInteger _currentParticipating = new AtomicInteger();
private long _lastPartUpdateTime;
private int _lastTotal;
/** the actual length of last total period as coalesced (nominally TOTAL_TIME) */
private long _lastTotalTime;
private int _lastIndex;
/** buffer of count per replenish period, last is at _lastIndex, older at higher indexes (wraps) */
private final int[] _counts = new int[PERIODS];
/** the actual length of the period (nominally REPLENISH_FREQUENCY) */
private final long[] _times = new long[PERIODS];
private final ReentrantReadWriteLock _updateLock = new ReentrantReadWriteLock(false);
/**
* We sent a message.
*
* @param size bytes
* @since 0.8.12
*/
boolean incrementParticipatingMessageBytes(int size, float factor) {
return _partBWE.offer(size, factor);
void incrementParticipatingMessageBytes(int size) {
//_currentParticipating.addAndGet(size);
_partBWE.addSample(size);
}
/**
@@ -343,8 +344,30 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
int getCurrentParticipatingBandwidth() {
return (int) (_partBWE.getBandwidthEstimate() * 1000f);
/*
_updateLock.readLock().lock();
try {
return locked_getCurrentParticipatingBandwidth();
} finally {
_updateLock.readLock().unlock();
}
*/
}
/*
private int locked_getCurrentParticipatingBandwidth() {
int current = _currentParticipating.get();
long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
if (totalTime <= 0)
return 0;
// 1000 for ms->seconds in denominator
long bw = 1000l * (current + _lastTotal) / totalTime;
if (bw > Integer.MAX_VALUE)
return 0;
return (int) bw;
}
*/
/**
* Run once every replenish period
*
@@ -352,6 +375,46 @@ public class FIFOBandwidthRefiller implements Runnable {
*/
private void updateParticipating(long now) {
_context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth());
_context.statManager().addRateData("bwLimiter.participatingBandwidthQueue", (long) _partBWE.getQueueSizeEstimate());
/*
_updateLock.writeLock().lock();
try {
locked_updateParticipating(now);
} finally {
_updateLock.writeLock().unlock();
}
*/
}
/*
private void locked_updateParticipating(long now) {
long elapsed = now - _lastPartUpdateTime;
if (elapsed <= 0) {
// glitch in the matrix
_lastPartUpdateTime = now;
return;
}
_lastPartUpdateTime = now;
if (--_lastIndex < 0)
_lastIndex = PERIODS - 1;
_counts[_lastIndex] = _currentParticipating.getAndSet(0);
_times[_lastIndex] = elapsed;
_lastTotal = 0;
_lastTotalTime = 0;
// add up total counts and times
for (int i = 0; i < PERIODS; i++) {
int idx = (_lastIndex + i) % PERIODS;
_lastTotal += _counts[idx];
_lastTotalTime += _times[idx];
if (_lastTotalTime >= TOTAL_TIME)
break;
}
if (_lastIndex == 0 && _lastTotalTime > 0) {
long bw = 1000l * _lastTotal / _lastTotalTime;
_context.statManager().addRateData("tunnel.participatingBandwidthOut", bw);
if (_lastTotal > 0 && _log.shouldLog(Log.INFO))
_log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " +
DataHelper.formatSize(bw) + " Bps");
}
}
*/
}

View File

@@ -0,0 +1,168 @@
package net.i2p.router.transport;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
* A Westwood+ bandwidth estimator with
* a first stage anti-aliasing low pass filter based on RTT,
* and the time-varying Westwood filter based on inter-arrival time.
*
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
* Casetti et al
* (Westwood)
*
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
* Grieco and Mascolo
* (Westwood+)
*
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
*
* @since 0.9.49 adapted from streaming
*/
class SimpleBandwidthEstimator implements BandwidthEstimator {
private final I2PAppContext _context;
private final Log _log;
private long _tAck;
// bw_est, bw_ns_est
private float _bKFiltered, _bK_ns_est;
// bk
private int _acked;
// As in kernel tcp_westwood.c
// Should probably match ConnectionOptions.TCP_ALPHA
private static final int DECAY_FACTOR = 8;
private static final int WESTWOOD_RTT_MIN = 500;
SimpleBandwidthEstimator(I2PAppContext ctx) {
_log = ctx.logManager().getLog(SimpleBandwidthEstimator.class);
_context = ctx;
// assume we're about to send something
_tAck = ctx.clock().now();
_acked = -1;
}
/**
* Records an arriving ack.
* @param acked how many bytes were acked with this ack
*/
public void addSample(int acked) {
long now = _context.clock().now();
addSample(acked, now);
}
private synchronized void addSample(int acked, long now) {
if (_acked < 0) {
// first sample
// use time since constructed as the RTT
// getRTT() would return zero here.
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
float bkdt = ((float) acked) / deltaT;
_bKFiltered = bkdt;
_bK_ns_est = bkdt;
_acked = 0;
_tAck = now;
if (_log.shouldDebug())
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
} else {
_acked += acked;
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
computeBWE(now, (int) deltaT);
}
}
/**
* @return the current bandwidth estimate in bytes/ms.
*/
public float getBandwidthEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
return computeBWE(now, (int) deltaT);
return _bKFiltered;
}
}
private synchronized float computeBWE(final long now, final int rtt) {
if (_acked < 0)
return 0.0f; // nothing ever sampled
updateBK(now, _acked, rtt);
_acked = 0;
return _bKFiltered;
}
/**
* Optimized version of updateBK with packets == 0
*/
private void decay() {
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param packets number of bytes acked
* @param rtt current rtt
*/
private void updateBK(long time, int packets, int rtt) {
long deltaT = time - _tAck;
if (rtt < WESTWOOD_RTT_MIN)
rtt = WESTWOOD_RTT_MIN;
if (deltaT > 2 * rtt) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
decay();
}
deltaT -= numrtts * rtt;
if (_log.shouldDebug())
_log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
float bkdt;
if (packets > 0) {
// As in kernel tcp_westwood.c
bkdt = ((float) packets) / deltaT;
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
} else {
bkdt = 0;
decay();
}
_tAck = time;
if (_log.shouldDebug())
_log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
" bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
/**
* As in kernel tcp_westwood.c
*/
private static float westwood_do_filter(float a, float b) {
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
}
@Override
public synchronized String toString() {
return "SBE[" +
" _bKFiltered " + _bKFiltered +
" _tAck " + _tAck + "; " +
DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
"Bps]";
}
}

View File

@@ -1,375 +0,0 @@
package net.i2p.router.transport;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.BandwidthEstimator;
import net.i2p.util.Log;
/**
* A "synthetic" queue in that it doesn't actually queue anything.
* Actual queueing is assumed to be "dowstream" of this.
*
* Maintains an average estimated "queue size" assuming a constant output rate
* declared in the constructor. The queue size is measured in bytes.
*
* With offer(), will return true for "accepted" or false for "dropped",
* based on the RED algorithm which uses the current average queue size
* and the offered data size to calculate a drop probability.
* Bandwidth is not directly used in the RED algorithm, except to
* synthetically calculate an average queue size assuming the
* queue is being drained precisely at that rate, byte-by-byte
* (not per-packet).
*
* addSample() unconditionally accepts the packet.
*
* Also maintains a Westwood+ bandwidth estimator.
* The bandwidth and queue size estimates are only updated if the
* packet is "accepted".
*
* The average queue size is calculated in the same manner as the
* bandwidth, with an update every WESTWOOD_RTT_MIN ms.
* Both estimators use
* a first stage anti-aliasing low pass filter based on RTT,
* and the time-varying Westwood filter based on inter-arrival time.
*
* Ref: Random Early Detection Gateways for Congestion Avoidance
* Sally Floyd and Van Jacobson
*
* Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
* Casetti et al
* (Westwood)
*
* Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
* Grieco and Mascolo
* (Westwood+)
*
* Adapted from: Linux kernel tcp_westwood.c (GPLv2)
*
* @since 0.9.49 adapted from streaming
*/
class SyntheticREDQueue implements BandwidthEstimator {
private final I2PAppContext _context;
private final Log _log;
private long _tAck;
// bw_est, bw_ns_est
private float _bKFiltered, _bK_ns_est;
// bk
private int _acked;
// RED vars
// pkts since last dropped pkt
private int _count = -1;
// smoothed average queue size in bytes
private float _avgQSize;
// last sample queue size in bytes
private float _qSize;
// current interval newly queued in bytes, since the last updateQSize()
private int _newDataSize;
// last time _avgQSize calculated
private long _tQSize;
// min queue size threshold, in bytes, to start dropping
private final int _minth;
// max queue size, in bytes, before dropping everything
private final int _maxth;
// bandwidth in bytes per second, as passed to the constructor.
private final int _bwBps;
// bandwidth in bytes per ms. The queue is drained at this rate.
private final float _bwBpms;
// As in RED paper
private static final float MAXP = 0.02f;
// As in kernel tcp_westwood.c
// Should probably match ConnectionOptions.TCP_ALPHA
private static final int DECAY_FACTOR = 8;
private static final int WESTWOOD_RTT_MIN = 500;
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_LOW_THRESHOLD = 13;
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_HIGH_THRESHOLD = 3;
/**
* Default thresholds.
* Min: 100 ms of traffic; max: 500 ms.
*
* @param bwKBps the output rate of the queue in Bps
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps) {
// the goal is to drop here rather than let the traffic
// get through to UDP-Sender CoDel and get dropped there,
// when we're at the default 80% share or below.
// That CoDel starts dropping when above 100 ms latency for 500 ms.
// let's try the same 100 ms of traffic here.
this(ctx, bwBps, bwBps / DEFAULT_LOW_THRESHOLD, bwBps / DEFAULT_HIGH_THRESHOLD);
}
/**
* Specified queue size thresholds.
* offer() drops a 1024 byte packet at 2% probability just lower than maxThKB,
* and at 100% probability higher than maxThKB.
*
* @param bwKBps the output rate of the queue in Bps
* @param minThKB the minimum queue size to start dropping in Bytes
* @param maxThKB the queue size to drop everything in Bytes
*/
SyntheticREDQueue(I2PAppContext ctx, int bwBps, int minThB, int maxThB) {
_log = ctx.logManager().getLog(SyntheticREDQueue.class);
_context = ctx;
// assume we're about to send something
_tAck = ctx.clock().now();
_acked = -1;
_minth = minThB;
_maxth = maxThB;
_bwBps = bwBps;
_bwBpms = bwBps / 1000f;
_tQSize = _tAck;
if (_log.shouldDebug())
_log.debug("Configured " + bwBps + " BPS, min: " + minThB + " B, max: " + maxThB + " B");
}
/**
*
* Nominal bandwidth limit in bytes per second, as passed to the constructor.
*
*/
public int getMaxBandwidth() {
return _bwBps;
}
/**
* Unconditional, never drop.
* The queue size and bandwidth estimates will be updated.
*/
public void addSample(int size) {
offer(size, 0);
}
/**
* Should we drop this packet?
* If accepted, the queue size and bandwidth estimates will be updated.
*
* @param size how many bytes to be offered
* @param factor how to adjust the size for the drop probability calculation,
* or 1.0 for standard probability. 0 to prevent dropping.
* Does not affect bandwidth calculations.
* @return true for accepted, false for drop
*/
public boolean offer(int size, float factor) {
long now = _context.clock().now();
return addSample(size, factor, now);
}
private synchronized boolean addSample(int acked, float factor, long now) {
if (_acked < 0) {
// first sample
// use time since constructed as the RTT
long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
float bkdt = ((float) acked) / deltaT;
_bKFiltered = bkdt;
_bK_ns_est = bkdt;
_acked = 0;
_tAck = now;
_tQSize = now;
_newDataSize = acked;
if (_log.shouldDebug())
_log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
return true;
} else {
// update queue size average if necessary
// the current sample is not included in the calculation
long deltaT = now - _tQSize;
if (deltaT > WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
if (factor > 0) {
// drop calculation
if (_avgQSize > _maxth) {
if (_log.shouldWarn())
_log.warn("drop bytes (qsize): " + acked + ' ' + this);
_count = 0;
return false;
}
if (_avgQSize > _minth) {
_count++;
float pb = (acked / 1024f) * factor * MAXP * (_avgQSize - _minth) / (_maxth - _minth);
float pa = pb / (1 - (_count * pb));
float rand = _context.random().nextFloat();
if (rand < pa) {
if (_log.shouldWarn())
_log.warn("drop bytes (prob): " + acked + " factor " + factor + " prob: " + pa + " deltaT: " + deltaT + ' ' + this);
_count = 0;
return false;
}
_count = -1;
}
}
// accepted
_newDataSize += acked;
_acked += acked;
// update bandwidth estimate if necessary
deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
computeBWE(now, (int) deltaT);
if (_log.shouldDebug())
_log.debug("accept bytes: " + acked + " factor " + factor + ' ' + this);
return true;
}
}
/**
* @return the current bandwidth estimate in bytes/ms.
*/
public float getBandwidthEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tAck;
if (deltaT >= WESTWOOD_RTT_MIN)
return computeBWE(now, (int) deltaT);
return _bKFiltered;
}
}
/**
* @return the current queue size estimate in bytes.
*/
public float getQueueSizeEstimate() {
long now = _context.clock().now();
// anti-aliasing filter
// As in kernel tcp_westwood.c
// and the Westwood+ paper
synchronized(this) {
long deltaT = now - _tQSize;
if (deltaT >= WESTWOOD_RTT_MIN)
updateQSize(now, deltaT);
return _avgQSize;
}
}
private synchronized float computeBWE(final long now, final int rtt) {
if (_acked < 0)
return 0.0f; // nothing ever sampled
updateBK(now, _acked, rtt);
_acked = 0;
return _bKFiltered;
}
/**
* Optimized version of updateBK with packets == 0
*/
private void decay() {
_bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
}
private void decayQueue(int rtt) {
_qSize -= rtt * _bwBpms;
if (_qSize < 1)
_qSize = 0;
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param packets number of bytes acked
* @param rtt current rtt
*/
private void updateBK(long time, int packets, int rtt) {
long deltaT = time - _tAck;
if (rtt < WESTWOOD_RTT_MIN)
rtt = WESTWOOD_RTT_MIN;
if (deltaT > 2 * rtt) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_bKFiltered <= 0)
break;
decay();
}
deltaT -= numrtts * rtt;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
float bkdt;
if (packets > 0) {
// As in kernel tcp_westwood.c
bkdt = ((float) packets) / deltaT;
_bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
_bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
} else {
bkdt = 0;
decay();
}
_tAck = time;
//if (_log.shouldDebug())
// _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
// " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
/**
* Here we insert virtual null samples if necessary as in Westwood,
* And use a very simple EWMA (exponential weighted moving average)
* time-varying filter, as in kernel tcp_westwood.c
*
* @param time the time of the measurement
* @param deltaT at least WESTWOOD_RTT_MIN
*/
private void updateQSize(long time, long deltaT) {
long origDT = deltaT;
if (deltaT > 2 * WESTWOOD_RTT_MIN) {
// Decay with virtual null samples as in the Westwood paper
int numrtts = Math.min((int) ((deltaT / WESTWOOD_RTT_MIN) - 1), 2 * DECAY_FACTOR);
for (int i = 0; i < numrtts; i++) {
if (_avgQSize <= 0)
break;
decayQueue(WESTWOOD_RTT_MIN);
}
deltaT -= numrtts * WESTWOOD_RTT_MIN;
//if (_log.shouldDebug())
// _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
}
int origNDS = _newDataSize;
float newQSize = _newDataSize;
if (_newDataSize > 0) {
newQSize -= deltaT * _bwBpms;
if (newQSize < 1)
newQSize = 0;
_qSize = westwood_do_filter(_qSize, newQSize);
_avgQSize = westwood_do_filter(_avgQSize, _qSize);
_newDataSize = 0;
} else {
decayQueue((int) deltaT);
}
_tQSize = time;
if (_log.shouldDebug())
_log.debug("computeQS deltaT: " + origDT +
" newData: " + origNDS +
" newQsize: " + newQSize + " qSize: " + _qSize + ' ' + this);
}
/**
* As in kernel tcp_westwood.c
*/
private static float westwood_do_filter(float a, float b) {
return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
}
@Override
public synchronized String toString() {
return "SREDQ[" +
//" _bKFiltered " + _bKFiltered +
//" _tAck " + _tAck + "; " +
//" _tQSize " + _tQSize +
' ' + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) +
"Bps, avg_qsize " +
DataHelper.formatSize2((long) _avgQSize, false) +
"B]";
}
}

View File

@@ -52,6 +52,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
//if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
// return -1;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
TunnelDataMessage msg = new TunnelDataMessage(_context);
msg.setData(encrypted);
msg.setTunnelId(_config.getSendTunnel());

View File

@@ -79,6 +79,8 @@ class OutboundTunnelEndpoint {
//int kb = (size + 1023) / 1024;
//for (int i = 0; i < kb; i++)
// _config.incrementSentMessages();
if (!toUs)
_context.bandwidthLimiter().sentParticipatingMessage(size);
_outDistributor.distribute(msg, toRouter, toTunnel);
}
}

View File

@@ -34,15 +34,7 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
// Hard to do this exactly, but we'll assume 2:1 batching
// for the purpose of estimating outgoing size.
// We assume that it's the outbound bandwidth that is the issue...
// first frag. overhead
int size = msg.getMessageSize() + 60;
if (size > 1024) {
// additional frag. overhead
size += 28 * (size / 1024);
} else if (size < 512) {
// 2:1 batching of small messages
size = 512;
}
int size = Math.max(msg.getMessageSize(), 1024/2);
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
// this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024;

View File

@@ -767,30 +767,84 @@ public class TunnelDispatcher implements Service {
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
if (length <= 0)
return false;
/****
Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
during a burst.
We now use the bandwidth limiter to track outbound participating bandwidth
over the last few seconds.
****/
/****
RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
if (rs == null)
return false;
Rate r = rs.getRate(60*1000);
if (r == null)
return false;
// weight current period higher
long count = r.getLastEventCount() + (3 * r.getCurrentEventCount());
int bw = 0;
if (count > 0)
bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count);
else
bw = (int) r.getLifetimeAverageValue();
int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
if (bw < usedIn)
usedIn = bw;
if (usedIn <= 0)
return false;
int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
if (bw < usedOut)
usedOut = bw;
if (usedOut <= 0)
return false;
int used = Math.min(usedIn, usedOut);
****/
int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
if (used <= 0)
return false;
int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
_context.bandwidthLimiter().getOutboundKBytesPerSecond());
float share = (float) _context.router().getSharePercentage();
// start dropping at 120% of the limit,
// as we rely on Throttle for long-term bandwidth control by rejecting tunnels
float maxBps = maxKBps * share * (1024f * 0.95f);
float pctDrop = (used - maxBps) / used;
if (pctDrop <= 0)
return false;
// increase the drop probability for OBEP,
// (except lower it for tunnel build messages type 21/22/23/24),
// and lower it for IBGW, for network efficiency
float factor;
double len = length;
if (loc == Location.OBEP) {
// we don't need to check for VTBRM/TBRM as that happens at tunnel creation
if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
factor = 1 / 1.5f;
len /= 1.5;
else
factor = 1.5f;
len *= 1.5;
} else if (loc == Location.IBGW) {
// we don't need to check for VTBM/TBM as that happens at tunnel creation
if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
factor = 1 / (1.5f * 1.5f * 1.5f);
len /= 1.5 * 1.5 * 1.5;
else
factor = 1 / 1.5f;
} else {
factor = 1.0f;
len /= 1.5;
}
boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor);
// drop in proportion to size w.r.t. a standard 1024-byte message
// this is a little expensive but we want to adjust the curve between 0 and 1
// Most messages are 1024, only at the OBEP do we see other sizes
if ((int)len != 1024)
pctDrop = (float) Math.pow(pctDrop, 1024d / len);
float rand = _context.random().nextFloat();
boolean reject = rand <= pctDrop;
if (reject) {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Drop part. msg. factor=" + factor
int availBps = (int) (((maxKBps*1024)*share) - used);
_log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
+ used + " %Drop = " + pctDrop
+ ' ' + loc + ' ' + type + ' ' + length);
}
_context.statManager().addRateData("tunnel.participatingMessageDropped", 1);

View File

@@ -203,6 +203,7 @@ class TunnelParticipant {
TunnelDataMessage.MESSAGE_TYPE, 1024))
return;
//_config.incrementSentMessages();
_context.bandwidthLimiter().sentParticipatingMessage(1024);
long oldId = msg.getUniqueId();
long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
_context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);