Compare commits

...

2 Commits

Author SHA1 Message Date
zzz
8584f2f4cc Reduce UDP max priority so the msgs are still droppable 2022-01-21 10:24:09 -05:00
zzz
827bf31576 WIP: Transport outbound queue fixes
to prevent part. tunnel traffic clogging
up queues at the bandwidth limiter, and ensuring
local traffic is prioritized over part. tunnel traffic

- Part. tunnel RED param tweaks:
  - double drop %
  - reduce time constant
- UDP:
  * Pass message priority through to the packets
  * Change UDP-Sender queue from CoDel to CoDelPriority
- NTCP:
  * Change NTCP-Connection queues from Priority to CoDelPriority,
    same time constants as UDP-Sender
- Bandwidth limiter log tweaks
2022-01-21 09:45:59 -05:00
7 changed files with 107 additions and 26 deletions

View File

@@ -281,10 +281,34 @@ public class FIFOBandwidthLimiter {
StringBuilder getStatus() {
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound).append(' ');
rv.append("Max: ").append(_maxInbound).append('/').append(_maxOutbound).append(' ');
rv.append("Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst).append(' ');
rv.append("Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst).append(' ');
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound);
rv.append(" Max: ").append(_maxInbound).append('/').append(_maxOutbound);
rv.append(" Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst);
rv.append(" Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst);
return rv;
}
/**
* @since 0.9.53
*/
private StringBuilder getInboundStatus() {
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableInbound);
rv.append(" Max: ").append(_maxInbound);
rv.append(" Burst: ").append(_unavailableInboundBurst);
rv.append(" Burst max: ").append(_maxInboundBurst);
return rv;
}
/**
* @since 0.9.53
*/
private StringBuilder getOutboundStatus() {
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableOutbound);
rv.append(" Max: ").append(_maxOutbound);
rv.append(" Burst: ").append(_unavailableOutboundBurst);
rv.append(" Burst max: ").append(_maxOutboundBurst);
return rv;
}
@@ -450,8 +474,8 @@ public class FIFOBandwidthLimiter {
} else {
// no bandwidth available
if (_log.shouldLog(Log.DEBUG))
_log.debug("Still denying the " + _pendingInboundRequests.size()
+ " pending inbound requests (status: " + getStatus().toString()
_log.debug("Denying " + _pendingInboundRequests.size()
+ " pending inbound requests (status: " + getInboundStatus()
+ ", longest waited " + locked_getLongestInboundWait() + ')');
}
}
@@ -497,6 +521,7 @@ public class FIFOBandwidthLimiter {
/**
* There are no limits, so just give every inbound request whatever they want
*
* @param satisfied out param, list of requests that were completely satisfied
*/
private final void locked_satisfyInboundUnlimited(List<Request> satisfied) {
while (!_pendingInboundRequests.isEmpty()) {
@@ -520,7 +545,7 @@ public class FIFOBandwidthLimiter {
* bandwidth as we can to those who have used what we have given them and are waiting
* for more (giving priority to the first ones who requested it)
*
* @return list of requests that were completely satisfied
* @param satisfied out param, list of requests that were completely satisfied
*/
private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
@@ -600,8 +625,8 @@ public class FIFOBandwidthLimiter {
} else {
// no bandwidth available
if (_log.shouldLog(Log.INFO))
_log.info("Still denying the " + _pendingOutboundRequests.size()
+ " pending outbound requests (status: " + getStatus().toString()
_log.info("Denying " + _pendingOutboundRequests.size()
+ " pending outbound requests (status: " + getOutboundStatus()
+ ", longest waited " + locked_getLongestOutboundWait() + ')');
}
}
@@ -618,6 +643,7 @@ public class FIFOBandwidthLimiter {
/**
* There are no limits, so just give every outbound request whatever they want
*
* @param satisfied out param, list of requests that were completely satisfied
*/
private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) {
while (!_pendingOutboundRequests.isEmpty()) {
@@ -642,7 +668,7 @@ public class FIFOBandwidthLimiter {
* bandwidth as we can to those who have used what we have given them and are waiting
* for more (giving priority to the first ones who requested it)
*
* @return list of requests that were completely satisfied
* @param satisfied out param, list of requests that were completely satisfied
*/
private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {

View File

@@ -77,8 +77,8 @@ class SyntheticREDQueue implements BandwidthEstimator {
private final int _bwBps;
// bandwidth in bytes per ms. The queue is drained at this rate.
private final float _bwBpms;
// As in RED paper
private static final float MAXP = 0.02f;
// Twice the RED paper value
private static final float MAXP = 2 * 0.02f;
// As in kernel tcp_westwood.c
private static final int DECAY_FACTOR = 8;
@@ -86,7 +86,7 @@ class SyntheticREDQueue implements BandwidthEstimator {
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_LOW_THRESHOLD = 13;
// denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_HIGH_THRESHOLD = 3;
private static final int DEFAULT_HIGH_THRESHOLD = 5;
/**
* Default thresholds.

View File

@@ -44,7 +44,8 @@ import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.router.transport.ntcp.NTCP2Payload.Block;
import net.i2p.router.util.PriBlockingQueue;
//import net.i2p.router.util.PriBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.ByteCache;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump;
@@ -93,8 +94,8 @@ public class NTCPConnection implements Closeable {
/**
* pending unprepared OutNetMessage instances
*/
//private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
private final PriBlockingQueue<OutNetMessage> _outbound;
private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
//private final PriBlockingQueue<OutNetMessage> _outbound;
/**
* current prepared OutNetMessages, or empty - synchronize on _writeLock
*/
@@ -160,6 +161,9 @@ public class NTCPConnection implements Closeable {
private static final AtomicLong __connID = new AtomicLong();
private final long _connID = __connID.incrementAndGet();
private static final int CODEL_TARGET = 100;
private static final int CODEL_INTERVAL = 500;
//// NTCP2 things
/** See spec. Max Noise payload 65535,
@@ -266,8 +270,8 @@ public class NTCPConnection implements Closeable {
_writeBufs = new ConcurrentLinkedQueue<ByteBuffer>();
_bwInRequests = new ConcurrentHashSet<Request>(2);
_bwOutRequests = new ConcurrentHashSet<Request>(8);
//_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
_outbound = new CoDelPriorityBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32, CODEL_TARGET, CODEL_INTERVAL);
//_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
_currentOutbound = new ArrayList<OutNetMessage>(1);
_isInbound = isIn;
_inboundListener = new InboundListener();
@@ -536,6 +540,7 @@ public class NTCPConnection implements Closeable {
_sendSipIV = null;
}
}
for (OutNetMessage msg : pending) {
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
@@ -589,6 +594,7 @@ public class NTCPConnection implements Closeable {
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet
+ ", id: " + seq
+ ", bw reqs: " + _bwOutRequests.size()
+ ", writeBufs: " + writeBufs + " on " + toString());
} catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException
}
@@ -1004,8 +1010,11 @@ public class NTCPConnection implements Closeable {
removeOBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (!_closed.get()) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (_context.clock().now()-req.getRequestTime()));
long delay = _context.clock().now() - req.getRequestTime();
_context.statManager().addRateData("ntcp.throttledWriteComplete", delay);
write(buf);
if (_log.shouldInfo())
_log.info("Write " + buf.remaining() + " after delay: " + delay + " on " + NTCPConnection.this);
}
}
}

View File

@@ -500,7 +500,7 @@ class OutboundMessageState implements CDPQEntry {
* @since 0.9.3
*/
public int getPriority() {
return _message != null ? _message.getPriority() : 1000;
return _message != null ? _message.getPriority() : PacketBuilder.PRIORITY_HIGH;
}
@Override

View File

@@ -18,6 +18,7 @@ import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.router.RouterAddress;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
@@ -182,6 +183,9 @@ class PacketBuilder {
private static final byte DATA_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_DATA << 4;
private static final byte PEER_TEST_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_TEST << 4;
private static final byte SESSION_DESTROY_FLAG_BYTE = (byte) (UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4);
/* Higher than all other OutNetMessage priorities, but still droppable */
static final int PRIORITY_HIGH = 600;
/**
* No state, all methods are thread-safe.
@@ -318,9 +322,13 @@ class PacketBuilder {
// calculate data size
int numFragments = fragments.size();
int dataSize = 0;
int priority = 0;
for (int i = 0; i < numFragments; i++) {
Fragment frag = fragments.get(i);
OutboundMessageState state = frag.state;
int pri = state.getPriority();
if (pri > priority)
priority = pri;
int fragment = frag.num;
int sz = state.fragmentSize(fragment);
dataSize += sz;
@@ -569,6 +577,7 @@ class PacketBuilder {
}
}
packet.setPriority(priority);
return packet;
}
@@ -681,6 +690,7 @@ class PacketBuilder {
pkt.setLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
packet.setPriority((fullACKCount > 0 || partialACKCount > 0) ? PRIORITY_HIGH : OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -775,6 +785,7 @@ class PacketBuilder {
setTo(packet, to, state.getSentPort());
SimpleByteCache.release(iv);
packet.setMessageType(TYPE_CREAT);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -840,6 +851,7 @@ class PacketBuilder {
authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, port);
packet.setMessageType(TYPE_SREQ);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -944,6 +956,7 @@ class PacketBuilder {
authenticate(packet, state.getCipherKey(), state.getMACKey());
setTo(packet, to, state.getSentPort());
packet.setMessageType(TYPE_CONF);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -1040,6 +1053,7 @@ class PacketBuilder {
pkt.setLength(off);
authenticate(packet, cipherKey, macKey);
setTo(packet, addr, port);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1084,6 +1098,7 @@ class PacketBuilder {
authenticate(packet, toCipherKey, toMACKey);
setTo(packet, toIP, toPort);
packet.setMessageType(TYPE_TFA);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1135,6 +1150,7 @@ class PacketBuilder {
authenticate(packet, aliceCipherKey, aliceMACKey);
setTo(packet, aliceIP, alicePort);
packet.setMessageType(TYPE_TTA);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1172,6 +1188,7 @@ class PacketBuilder {
authenticate(packet, charlieCipherKey, charlieMACKey);
setTo(packet, charlieIP, charliePort);
packet.setMessageType(TYPE_TBC);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1209,6 +1226,7 @@ class PacketBuilder {
authenticate(packet, bobCipherKey, bobMACKey);
setTo(packet, bobIP, bobPort);
packet.setMessageType(TYPE_TCB);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1372,6 +1390,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey);
setTo(packet, introHost, introPort);
packet.setMessageType(TYPE_RREQ);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -1405,6 +1424,7 @@ class PacketBuilder {
authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
packet.setMessageType(TYPE_INTRO);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1451,6 +1471,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey);
setTo(packet, aliceAddr, alice.getPort());
packet.setMessageType(TYPE_RESP);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet;
}
@@ -1469,6 +1490,7 @@ class PacketBuilder {
setTo(packet, to, port);
packet.setMessageType(TYPE_PUNCH);
packet.setPriority(PRIORITY_HIGH);
return packet;
}

View File

@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.TryCache;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -23,10 +23,10 @@ import net.i2p.util.SystemVersion;
* of object instances to allow rapid reuse.
*
*/
class UDPPacket implements CDQEntry {
class UDPPacket implements CDPQEntry {
private RouterContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
private int _priority;
private volatile long _initializeTime;
//private volatile long _expiration;
private final byte[] _data;
@@ -46,6 +46,7 @@ class UDPPacket implements CDQEntry {
private int _validateCount;
// private boolean _isInbound;
private FIFOBandwidthLimiter.Request _bandwidthRequest;
private long _seqNum;
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
static RouterContext context;
@@ -168,6 +169,20 @@ class UDPPacket implements CDQEntry {
_receivedTime = 0;
_fragmentCount = 0;
}
/**
* CDPQEntry
* @since 0.9.53
*/
public void setSeqNum(long num) { _seqNum = num; }
/**
* CDPQEntry
* @since 0.9.53
*/
public long getSeqNum() { return _seqNum; }
/****
public void writeData(byte src[], int offset, int len) {
@@ -180,7 +195,13 @@ class UDPPacket implements CDQEntry {
/** */
public synchronized DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
public synchronized short getPriority() { verifyNotReleased(); return _priority; }
public int getPriority() { return _priority; }
/**
* @since 0.9.53
*/
public void setPriority(int pri) { _priority = pri; }
//public long getExpiration() { verifyNotReleased(); return _expiration; }
public synchronized long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; }
@@ -394,6 +415,7 @@ class UDPPacket implements CDQEntry {
buf.append(" byte pkt with ");
buf.append(Addresses.toString(_packet.getAddress().getAddress(), _packet.getPort()));
//buf.append(" id=").append(System.identityHashCode(this));
buf.append(" priority=").append(_priority);
if (_messageType >= 0)
buf.append(" msgType=").append(_messageType);
if (_markedType >= 0)

View File

@@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue;
//import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
@@ -46,7 +47,8 @@ class UDPSender {
_log = ctx.logManager().getLog(UDPSender.class);
long maxMemory = SystemVersion.getMaxMemory();
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
//_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_outboundQueue = new CoDelPriorityBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_socket = socket;
_runner = new Runner();
_name = name;