Compare commits

...

2 Commits

Author SHA1 Message Date
zzz
8584f2f4cc Reduce UDP max priority so the msgs are still droppable 2022-01-21 10:24:09 -05:00
zzz
827bf31576 WIP: Transport outbound queue fixes
to prevent part. tunnel traffic clogging
up queues at the bandwidth limiter, and ensuring
local traffic is prioritized over part. tunnel traffic

- Part. tunnel RED param tweaks:
  - double drop %
  - reduce time constant
- UDP:
  * Pass message priority through to the packets
  * Change UDP-Sender queue from CoDel to CoDelPriority
- NTCP:
  * Change NTCP-Connection queues from Priority to CoDelPriority,
    same time constants as UDP-Sender
- Bandwidth limiter log tweaks
2022-01-21 09:45:59 -05:00
7 changed files with 107 additions and 26 deletions

View File

@@ -281,10 +281,34 @@ public class FIFOBandwidthLimiter {
StringBuilder getStatus() { StringBuilder getStatus() {
StringBuilder rv = new StringBuilder(128); StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound).append(' '); rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound);
rv.append("Max: ").append(_maxInbound).append('/').append(_maxOutbound).append(' '); rv.append(" Max: ").append(_maxInbound).append('/').append(_maxOutbound);
rv.append("Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst).append(' '); rv.append(" Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst);
rv.append("Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst).append(' '); rv.append(" Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst);
return rv;
}
/**
* @since 0.9.53
*/
private StringBuilder getInboundStatus() {
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableInbound);
rv.append(" Max: ").append(_maxInbound);
rv.append(" Burst: ").append(_unavailableInboundBurst);
rv.append(" Burst max: ").append(_maxInboundBurst);
return rv;
}
/**
* @since 0.9.53
*/
private StringBuilder getOutboundStatus() {
StringBuilder rv = new StringBuilder(128);
rv.append("Available: ").append(_availableOutbound);
rv.append(" Max: ").append(_maxOutbound);
rv.append(" Burst: ").append(_unavailableOutboundBurst);
rv.append(" Burst max: ").append(_maxOutboundBurst);
return rv; return rv;
} }
@@ -450,8 +474,8 @@ public class FIFOBandwidthLimiter {
} else { } else {
// no bandwidth available // no bandwidth available
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Still denying the " + _pendingInboundRequests.size() _log.debug("Denying " + _pendingInboundRequests.size()
+ " pending inbound requests (status: " + getStatus().toString() + " pending inbound requests (status: " + getInboundStatus()
+ ", longest waited " + locked_getLongestInboundWait() + ')'); + ", longest waited " + locked_getLongestInboundWait() + ')');
} }
} }
@@ -497,6 +521,7 @@ public class FIFOBandwidthLimiter {
/** /**
* There are no limits, so just give every inbound request whatever they want * There are no limits, so just give every inbound request whatever they want
* *
* @param satisfied out param, list of requests that were completely satisfied
*/ */
private final void locked_satisfyInboundUnlimited(List<Request> satisfied) { private final void locked_satisfyInboundUnlimited(List<Request> satisfied) {
while (!_pendingInboundRequests.isEmpty()) { while (!_pendingInboundRequests.isEmpty()) {
@@ -520,7 +545,7 @@ public class FIFOBandwidthLimiter {
* bandwidth as we can to those who have used what we have given them and are waiting * bandwidth as we can to those who have used what we have given them and are waiting
* for more (giving priority to the first ones who requested it) * for more (giving priority to the first ones who requested it)
* *
* @return list of requests that were completely satisfied * @param satisfied out param, list of requests that were completely satisfied
*/ */
private final void locked_satisfyInboundAvailable(List<Request> satisfied) { private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingInboundRequests.size(); i++) { for (int i = 0; i < _pendingInboundRequests.size(); i++) {
@@ -600,8 +625,8 @@ public class FIFOBandwidthLimiter {
} else { } else {
// no bandwidth available // no bandwidth available
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Still denying the " + _pendingOutboundRequests.size() _log.info("Denying " + _pendingOutboundRequests.size()
+ " pending outbound requests (status: " + getStatus().toString() + " pending outbound requests (status: " + getOutboundStatus()
+ ", longest waited " + locked_getLongestOutboundWait() + ')'); + ", longest waited " + locked_getLongestOutboundWait() + ')');
} }
} }
@@ -618,6 +643,7 @@ public class FIFOBandwidthLimiter {
/** /**
* There are no limits, so just give every outbound request whatever they want * There are no limits, so just give every outbound request whatever they want
* *
* @param satisfied out param, list of requests that were completely satisfied
*/ */
private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) { private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) {
while (!_pendingOutboundRequests.isEmpty()) { while (!_pendingOutboundRequests.isEmpty()) {
@@ -642,7 +668,7 @@ public class FIFOBandwidthLimiter {
* bandwidth as we can to those who have used what we have given them and are waiting * bandwidth as we can to those who have used what we have given them and are waiting
* for more (giving priority to the first ones who requested it) * for more (giving priority to the first ones who requested it)
* *
* @return list of requests that were completely satisfied * @param satisfied out param, list of requests that were completely satisfied
*/ */
private final void locked_satisfyOutboundAvailable(List<Request> satisfied) { private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
for (int i = 0; i < _pendingOutboundRequests.size(); i++) { for (int i = 0; i < _pendingOutboundRequests.size(); i++) {

View File

@@ -77,8 +77,8 @@ class SyntheticREDQueue implements BandwidthEstimator {
private final int _bwBps; private final int _bwBps;
// bandwidth in bytes per ms. The queue is drained at this rate. // bandwidth in bytes per ms. The queue is drained at this rate.
private final float _bwBpms; private final float _bwBpms;
// As in RED paper // Twice the RED paper value
private static final float MAXP = 0.02f; private static final float MAXP = 2 * 0.02f;
// As in kernel tcp_westwood.c // As in kernel tcp_westwood.c
private static final int DECAY_FACTOR = 8; private static final int DECAY_FACTOR = 8;
@@ -86,7 +86,7 @@ class SyntheticREDQueue implements BandwidthEstimator {
// denominator of time, 1/x seconds of traffic in the queue // denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_LOW_THRESHOLD = 13; private static final int DEFAULT_LOW_THRESHOLD = 13;
// denominator of time, 1/x seconds of traffic in the queue // denominator of time, 1/x seconds of traffic in the queue
private static final int DEFAULT_HIGH_THRESHOLD = 3; private static final int DEFAULT_HIGH_THRESHOLD = 5;
/** /**
* Default thresholds. * Default thresholds.

View File

@@ -44,7 +44,8 @@ import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.transport.FIFOBandwidthLimiter.Request; import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
import net.i2p.router.transport.ntcp.NTCP2Payload.Block; import net.i2p.router.transport.ntcp.NTCP2Payload.Block;
import net.i2p.router.util.PriBlockingQueue; //import net.i2p.router.util.PriBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump; import net.i2p.util.HexDump;
@@ -93,8 +94,8 @@ public class NTCPConnection implements Closeable {
/** /**
* pending unprepared OutNetMessage instances * pending unprepared OutNetMessage instances
*/ */
//private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound; private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
private final PriBlockingQueue<OutNetMessage> _outbound; //private final PriBlockingQueue<OutNetMessage> _outbound;
/** /**
* current prepared OutNetMessages, or empty - synchronize on _writeLock * current prepared OutNetMessages, or empty - synchronize on _writeLock
*/ */
@@ -160,6 +161,9 @@ public class NTCPConnection implements Closeable {
private static final AtomicLong __connID = new AtomicLong(); private static final AtomicLong __connID = new AtomicLong();
private final long _connID = __connID.incrementAndGet(); private final long _connID = __connID.incrementAndGet();
private static final int CODEL_TARGET = 100;
private static final int CODEL_INTERVAL = 500;
//// NTCP2 things //// NTCP2 things
/** See spec. Max Noise payload 65535, /** See spec. Max Noise payload 65535,
@@ -266,8 +270,8 @@ public class NTCPConnection implements Closeable {
_writeBufs = new ConcurrentLinkedQueue<ByteBuffer>(); _writeBufs = new ConcurrentLinkedQueue<ByteBuffer>();
_bwInRequests = new ConcurrentHashSet<Request>(2); _bwInRequests = new ConcurrentHashSet<Request>(2);
_bwOutRequests = new ConcurrentHashSet<Request>(8); _bwOutRequests = new ConcurrentHashSet<Request>(8);
//_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); _outbound = new CoDelPriorityBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32, CODEL_TARGET, CODEL_INTERVAL);
_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32); //_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
_currentOutbound = new ArrayList<OutNetMessage>(1); _currentOutbound = new ArrayList<OutNetMessage>(1);
_isInbound = isIn; _isInbound = isIn;
_inboundListener = new InboundListener(); _inboundListener = new InboundListener();
@@ -536,6 +540,7 @@ public class NTCPConnection implements Closeable {
_sendSipIV = null; _sendSipIV = null;
} }
} }
for (OutNetMessage msg : pending) { for (OutNetMessage msg : pending) {
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
} }
@@ -589,6 +594,7 @@ public class NTCPConnection implements Closeable {
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet + ", currentOut set? " + currentOutboundSet
+ ", id: " + seq + ", id: " + seq
+ ", bw reqs: " + _bwOutRequests.size()
+ ", writeBufs: " + writeBufs + " on " + toString()); + ", writeBufs: " + writeBufs + " on " + toString());
} catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException } catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException
} }
@@ -1004,8 +1010,11 @@ public class NTCPConnection implements Closeable {
removeOBRequest(req); removeOBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment(); ByteBuffer buf = (ByteBuffer)req.attachment();
if (!_closed.get()) { if (!_closed.get()) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (_context.clock().now()-req.getRequestTime())); long delay = _context.clock().now() - req.getRequestTime();
_context.statManager().addRateData("ntcp.throttledWriteComplete", delay);
write(buf); write(buf);
if (_log.shouldInfo())
_log.info("Write " + buf.remaining() + " after delay: " + delay + " on " + NTCPConnection.this);
} }
} }
} }

View File

@@ -500,7 +500,7 @@ class OutboundMessageState implements CDPQEntry {
* @since 0.9.3 * @since 0.9.3
*/ */
public int getPriority() { public int getPriority() {
return _message != null ? _message.getPriority() : 1000; return _message != null ? _message.getPriority() : PacketBuilder.PRIORITY_HIGH;
} }
@Override @Override

View File

@@ -18,6 +18,7 @@ import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.data.Signature; import net.i2p.data.Signature;
import net.i2p.data.router.RouterAddress; import net.i2p.data.router.RouterAddress;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil; import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses; import net.i2p.util.Addresses;
@@ -182,6 +183,9 @@ class PacketBuilder {
private static final byte DATA_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_DATA << 4; private static final byte DATA_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_DATA << 4;
private static final byte PEER_TEST_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_TEST << 4; private static final byte PEER_TEST_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_TEST << 4;
private static final byte SESSION_DESTROY_FLAG_BYTE = (byte) (UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4); private static final byte SESSION_DESTROY_FLAG_BYTE = (byte) (UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4);
/* Higher than all other OutNetMessage priorities, but still droppable */
static final int PRIORITY_HIGH = 600;
/** /**
* No state, all methods are thread-safe. * No state, all methods are thread-safe.
@@ -318,9 +322,13 @@ class PacketBuilder {
// calculate data size // calculate data size
int numFragments = fragments.size(); int numFragments = fragments.size();
int dataSize = 0; int dataSize = 0;
int priority = 0;
for (int i = 0; i < numFragments; i++) { for (int i = 0; i < numFragments; i++) {
Fragment frag = fragments.get(i); Fragment frag = fragments.get(i);
OutboundMessageState state = frag.state; OutboundMessageState state = frag.state;
int pri = state.getPriority();
if (pri > priority)
priority = pri;
int fragment = frag.num; int fragment = frag.num;
int sz = state.fragmentSize(fragment); int sz = state.fragmentSize(fragment);
dataSize += sz; dataSize += sz;
@@ -569,6 +577,7 @@ class PacketBuilder {
} }
} }
packet.setPriority(priority);
return packet; return packet;
} }
@@ -681,6 +690,7 @@ class PacketBuilder {
pkt.setLength(off); pkt.setLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
packet.setPriority((fullACKCount > 0 || partialACKCount > 0) ? PRIORITY_HIGH : OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -775,6 +785,7 @@ class PacketBuilder {
setTo(packet, to, state.getSentPort()); setTo(packet, to, state.getSentPort());
SimpleByteCache.release(iv); SimpleByteCache.release(iv);
packet.setMessageType(TYPE_CREAT); packet.setMessageType(TYPE_CREAT);
packet.setPriority(PRIORITY_HIGH);
return packet; return packet;
} }
@@ -840,6 +851,7 @@ class PacketBuilder {
authenticate(packet, state.getIntroKey(), state.getIntroKey()); authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, port); setTo(packet, to, port);
packet.setMessageType(TYPE_SREQ); packet.setMessageType(TYPE_SREQ);
packet.setPriority(PRIORITY_HIGH);
return packet; return packet;
} }
@@ -944,6 +956,7 @@ class PacketBuilder {
authenticate(packet, state.getCipherKey(), state.getMACKey()); authenticate(packet, state.getCipherKey(), state.getMACKey());
setTo(packet, to, state.getSentPort()); setTo(packet, to, state.getSentPort());
packet.setMessageType(TYPE_CONF); packet.setMessageType(TYPE_CONF);
packet.setPriority(PRIORITY_HIGH);
return packet; return packet;
} }
@@ -1040,6 +1053,7 @@ class PacketBuilder {
pkt.setLength(off); pkt.setLength(off);
authenticate(packet, cipherKey, macKey); authenticate(packet, cipherKey, macKey);
setTo(packet, addr, port); setTo(packet, addr, port);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1084,6 +1098,7 @@ class PacketBuilder {
authenticate(packet, toCipherKey, toMACKey); authenticate(packet, toCipherKey, toMACKey);
setTo(packet, toIP, toPort); setTo(packet, toIP, toPort);
packet.setMessageType(TYPE_TFA); packet.setMessageType(TYPE_TFA);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1135,6 +1150,7 @@ class PacketBuilder {
authenticate(packet, aliceCipherKey, aliceMACKey); authenticate(packet, aliceCipherKey, aliceMACKey);
setTo(packet, aliceIP, alicePort); setTo(packet, aliceIP, alicePort);
packet.setMessageType(TYPE_TTA); packet.setMessageType(TYPE_TTA);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1172,6 +1188,7 @@ class PacketBuilder {
authenticate(packet, charlieCipherKey, charlieMACKey); authenticate(packet, charlieCipherKey, charlieMACKey);
setTo(packet, charlieIP, charliePort); setTo(packet, charlieIP, charliePort);
packet.setMessageType(TYPE_TBC); packet.setMessageType(TYPE_TBC);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1209,6 +1226,7 @@ class PacketBuilder {
authenticate(packet, bobCipherKey, bobMACKey); authenticate(packet, bobCipherKey, bobMACKey);
setTo(packet, bobIP, bobPort); setTo(packet, bobIP, bobPort);
packet.setMessageType(TYPE_TCB); packet.setMessageType(TYPE_TCB);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1372,6 +1390,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey); authenticate(packet, cipherKey, macKey);
setTo(packet, introHost, introPort); setTo(packet, introHost, introPort);
packet.setMessageType(TYPE_RREQ); packet.setMessageType(TYPE_RREQ);
packet.setPriority(PRIORITY_HIGH);
return packet; return packet;
} }
@@ -1405,6 +1424,7 @@ class PacketBuilder {
authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey()); authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort()); setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
packet.setMessageType(TYPE_INTRO); packet.setMessageType(TYPE_INTRO);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1451,6 +1471,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey); authenticate(packet, cipherKey, macKey);
setTo(packet, aliceAddr, alice.getPort()); setTo(packet, aliceAddr, alice.getPort());
packet.setMessageType(TYPE_RESP); packet.setMessageType(TYPE_RESP);
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
return packet; return packet;
} }
@@ -1469,6 +1490,7 @@ class PacketBuilder {
setTo(packet, to, port); setTo(packet, to, port);
packet.setMessageType(TYPE_PUNCH); packet.setMessageType(TYPE_PUNCH);
packet.setPriority(PRIORITY_HIGH);
return packet; return packet;
} }

View File

@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry; import net.i2p.router.util.CDPQEntry;
import net.i2p.util.TryCache; import net.i2p.util.TryCache;
import net.i2p.util.Addresses; import net.i2p.util.Addresses;
import net.i2p.util.Log; import net.i2p.util.Log;
@@ -23,10 +23,10 @@ import net.i2p.util.SystemVersion;
* of object instances to allow rapid reuse. * of object instances to allow rapid reuse.
* *
*/ */
class UDPPacket implements CDQEntry { class UDPPacket implements CDPQEntry {
private RouterContext _context; private RouterContext _context;
private final DatagramPacket _packet; private final DatagramPacket _packet;
private volatile short _priority; private int _priority;
private volatile long _initializeTime; private volatile long _initializeTime;
//private volatile long _expiration; //private volatile long _expiration;
private final byte[] _data; private final byte[] _data;
@@ -46,6 +46,7 @@ class UDPPacket implements CDQEntry {
private int _validateCount; private int _validateCount;
// private boolean _isInbound; // private boolean _isInbound;
private FIFOBandwidthLimiter.Request _bandwidthRequest; private FIFOBandwidthLimiter.Request _bandwidthRequest;
private long _seqNum;
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> { private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
static RouterContext context; static RouterContext context;
@@ -168,6 +169,20 @@ class UDPPacket implements CDQEntry {
_receivedTime = 0; _receivedTime = 0;
_fragmentCount = 0; _fragmentCount = 0;
} }
/**
* CDPQEntry
* @since 0.9.53
*/
public void setSeqNum(long num) { _seqNum = num; }
/**
* CDPQEntry
* @since 0.9.53
*/
public long getSeqNum() { return _seqNum; }
/**** /****
public void writeData(byte src[], int offset, int len) { public void writeData(byte src[], int offset, int len) {
@@ -180,7 +195,13 @@ class UDPPacket implements CDQEntry {
/** */ /** */
public synchronized DatagramPacket getPacket() { verifyNotReleased(); return _packet; } public synchronized DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
public synchronized short getPriority() { verifyNotReleased(); return _priority; } public int getPriority() { return _priority; }
/**
* @since 0.9.53
*/
public void setPriority(int pri) { _priority = pri; }
//public long getExpiration() { verifyNotReleased(); return _expiration; } //public long getExpiration() { verifyNotReleased(); return _expiration; }
public synchronized long getBegin() { verifyNotReleased(); return _initializeTime; } public synchronized long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; } public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; }
@@ -394,6 +415,7 @@ class UDPPacket implements CDQEntry {
buf.append(" byte pkt with "); buf.append(" byte pkt with ");
buf.append(Addresses.toString(_packet.getAddress().getAddress(), _packet.getPort())); buf.append(Addresses.toString(_packet.getAddress().getAddress(), _packet.getPort()));
//buf.append(" id=").append(System.identityHashCode(this)); //buf.append(" id=").append(System.identityHashCode(this));
buf.append(" priority=").append(_priority);
if (_messageType >= 0) if (_messageType >= 0)
buf.append(" msgType=").append(_messageType); buf.append(" msgType=").append(_messageType);
if (_markedType >= 0) if (_markedType >= 0)

View File

@@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue; //import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SystemVersion; import net.i2p.util.SystemVersion;
@@ -46,7 +47,8 @@ class UDPSender {
_log = ctx.logManager().getLog(UDPSender.class); _log = ctx.logManager().getLog(UDPSender.class);
long maxMemory = SystemVersion.getMaxMemory(); long maxMemory = SystemVersion.getMaxMemory();
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024))); int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL); //_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_outboundQueue = new CoDelPriorityBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_socket = socket; _socket = socket;
_runner = new Runner(); _runner = new Runner();
_name = name; _name = name;