forked from I2P_Developers/i2p.i2p
Compare commits
2 Commits
jetty-mult
...
obq
Author | SHA1 | Date | |
---|---|---|---|
![]() |
8584f2f4cc | ||
![]() |
827bf31576 |
@@ -281,10 +281,34 @@ public class FIFOBandwidthLimiter {
|
||||
|
||||
StringBuilder getStatus() {
|
||||
StringBuilder rv = new StringBuilder(128);
|
||||
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound).append(' ');
|
||||
rv.append("Max: ").append(_maxInbound).append('/').append(_maxOutbound).append(' ');
|
||||
rv.append("Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst).append(' ');
|
||||
rv.append("Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst).append(' ');
|
||||
rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound);
|
||||
rv.append(" Max: ").append(_maxInbound).append('/').append(_maxOutbound);
|
||||
rv.append(" Burst: ").append(_unavailableInboundBurst).append('/').append(_unavailableOutboundBurst);
|
||||
rv.append(" Burst max: ").append(_maxInboundBurst).append('/').append(_maxOutboundBurst);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.9.53
|
||||
*/
|
||||
private StringBuilder getInboundStatus() {
|
||||
StringBuilder rv = new StringBuilder(128);
|
||||
rv.append("Available: ").append(_availableInbound);
|
||||
rv.append(" Max: ").append(_maxInbound);
|
||||
rv.append(" Burst: ").append(_unavailableInboundBurst);
|
||||
rv.append(" Burst max: ").append(_maxInboundBurst);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.9.53
|
||||
*/
|
||||
private StringBuilder getOutboundStatus() {
|
||||
StringBuilder rv = new StringBuilder(128);
|
||||
rv.append("Available: ").append(_availableOutbound);
|
||||
rv.append(" Max: ").append(_maxOutbound);
|
||||
rv.append(" Burst: ").append(_unavailableOutboundBurst);
|
||||
rv.append(" Burst max: ").append(_maxOutboundBurst);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@@ -450,8 +474,8 @@ public class FIFOBandwidthLimiter {
|
||||
} else {
|
||||
// no bandwidth available
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Still denying the " + _pendingInboundRequests.size()
|
||||
+ " pending inbound requests (status: " + getStatus().toString()
|
||||
_log.debug("Denying " + _pendingInboundRequests.size()
|
||||
+ " pending inbound requests (status: " + getInboundStatus()
|
||||
+ ", longest waited " + locked_getLongestInboundWait() + ')');
|
||||
}
|
||||
}
|
||||
@@ -497,6 +521,7 @@ public class FIFOBandwidthLimiter {
|
||||
/**
|
||||
* There are no limits, so just give every inbound request whatever they want
|
||||
*
|
||||
* @param satisfied out param, list of requests that were completely satisfied
|
||||
*/
|
||||
private final void locked_satisfyInboundUnlimited(List<Request> satisfied) {
|
||||
while (!_pendingInboundRequests.isEmpty()) {
|
||||
@@ -520,7 +545,7 @@ public class FIFOBandwidthLimiter {
|
||||
* bandwidth as we can to those who have used what we have given them and are waiting
|
||||
* for more (giving priority to the first ones who requested it)
|
||||
*
|
||||
* @return list of requests that were completely satisfied
|
||||
* @param satisfied out param, list of requests that were completely satisfied
|
||||
*/
|
||||
private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
|
||||
for (int i = 0; i < _pendingInboundRequests.size(); i++) {
|
||||
@@ -600,8 +625,8 @@ public class FIFOBandwidthLimiter {
|
||||
} else {
|
||||
// no bandwidth available
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Still denying the " + _pendingOutboundRequests.size()
|
||||
+ " pending outbound requests (status: " + getStatus().toString()
|
||||
_log.info("Denying " + _pendingOutboundRequests.size()
|
||||
+ " pending outbound requests (status: " + getOutboundStatus()
|
||||
+ ", longest waited " + locked_getLongestOutboundWait() + ')');
|
||||
}
|
||||
}
|
||||
@@ -618,6 +643,7 @@ public class FIFOBandwidthLimiter {
|
||||
/**
|
||||
* There are no limits, so just give every outbound request whatever they want
|
||||
*
|
||||
* @param satisfied out param, list of requests that were completely satisfied
|
||||
*/
|
||||
private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) {
|
||||
while (!_pendingOutboundRequests.isEmpty()) {
|
||||
@@ -642,7 +668,7 @@ public class FIFOBandwidthLimiter {
|
||||
* bandwidth as we can to those who have used what we have given them and are waiting
|
||||
* for more (giving priority to the first ones who requested it)
|
||||
*
|
||||
* @return list of requests that were completely satisfied
|
||||
* @param satisfied out param, list of requests that were completely satisfied
|
||||
*/
|
||||
private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
|
||||
for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
|
||||
|
@@ -77,8 +77,8 @@ class SyntheticREDQueue implements BandwidthEstimator {
|
||||
private final int _bwBps;
|
||||
// bandwidth in bytes per ms. The queue is drained at this rate.
|
||||
private final float _bwBpms;
|
||||
// As in RED paper
|
||||
private static final float MAXP = 0.02f;
|
||||
// Twice the RED paper value
|
||||
private static final float MAXP = 2 * 0.02f;
|
||||
|
||||
// As in kernel tcp_westwood.c
|
||||
private static final int DECAY_FACTOR = 8;
|
||||
@@ -86,7 +86,7 @@ class SyntheticREDQueue implements BandwidthEstimator {
|
||||
// denominator of time, 1/x seconds of traffic in the queue
|
||||
private static final int DEFAULT_LOW_THRESHOLD = 13;
|
||||
// denominator of time, 1/x seconds of traffic in the queue
|
||||
private static final int DEFAULT_HIGH_THRESHOLD = 3;
|
||||
private static final int DEFAULT_HIGH_THRESHOLD = 5;
|
||||
|
||||
/**
|
||||
* Default thresholds.
|
||||
|
@@ -44,7 +44,8 @@ import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
|
||||
import net.i2p.router.transport.ntcp.NTCP2Payload.Block;
|
||||
import net.i2p.router.util.PriBlockingQueue;
|
||||
//import net.i2p.router.util.PriBlockingQueue;
|
||||
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
||||
import net.i2p.util.ByteCache;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.HexDump;
|
||||
@@ -93,8 +94,8 @@ public class NTCPConnection implements Closeable {
|
||||
/**
|
||||
* pending unprepared OutNetMessage instances
|
||||
*/
|
||||
//private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
|
||||
private final PriBlockingQueue<OutNetMessage> _outbound;
|
||||
private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
|
||||
//private final PriBlockingQueue<OutNetMessage> _outbound;
|
||||
/**
|
||||
* current prepared OutNetMessages, or empty - synchronize on _writeLock
|
||||
*/
|
||||
@@ -160,6 +161,9 @@ public class NTCPConnection implements Closeable {
|
||||
private static final AtomicLong __connID = new AtomicLong();
|
||||
private final long _connID = __connID.incrementAndGet();
|
||||
|
||||
private static final int CODEL_TARGET = 100;
|
||||
private static final int CODEL_INTERVAL = 500;
|
||||
|
||||
//// NTCP2 things
|
||||
|
||||
/** See spec. Max Noise payload 65535,
|
||||
@@ -266,8 +270,8 @@ public class NTCPConnection implements Closeable {
|
||||
_writeBufs = new ConcurrentLinkedQueue<ByteBuffer>();
|
||||
_bwInRequests = new ConcurrentHashSet<Request>(2);
|
||||
_bwOutRequests = new ConcurrentHashSet<Request>(8);
|
||||
//_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
|
||||
_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
|
||||
_outbound = new CoDelPriorityBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32, CODEL_TARGET, CODEL_INTERVAL);
|
||||
//_outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
|
||||
_currentOutbound = new ArrayList<OutNetMessage>(1);
|
||||
_isInbound = isIn;
|
||||
_inboundListener = new InboundListener();
|
||||
@@ -536,6 +540,7 @@ public class NTCPConnection implements Closeable {
|
||||
_sendSipIV = null;
|
||||
}
|
||||
}
|
||||
|
||||
for (OutNetMessage msg : pending) {
|
||||
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
|
||||
}
|
||||
@@ -589,6 +594,7 @@ public class NTCPConnection implements Closeable {
|
||||
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
|
||||
+ ", currentOut set? " + currentOutboundSet
|
||||
+ ", id: " + seq
|
||||
+ ", bw reqs: " + _bwOutRequests.size()
|
||||
+ ", writeBufs: " + writeBufs + " on " + toString());
|
||||
} catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException
|
||||
}
|
||||
@@ -1004,8 +1010,11 @@ public class NTCPConnection implements Closeable {
|
||||
removeOBRequest(req);
|
||||
ByteBuffer buf = (ByteBuffer)req.attachment();
|
||||
if (!_closed.get()) {
|
||||
_context.statManager().addRateData("ntcp.throttledWriteComplete", (_context.clock().now()-req.getRequestTime()));
|
||||
long delay = _context.clock().now() - req.getRequestTime();
|
||||
_context.statManager().addRateData("ntcp.throttledWriteComplete", delay);
|
||||
write(buf);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Write " + buf.remaining() + " after delay: " + delay + " on " + NTCPConnection.this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -500,7 +500,7 @@ class OutboundMessageState implements CDPQEntry {
|
||||
* @since 0.9.3
|
||||
*/
|
||||
public int getPriority() {
|
||||
return _message != null ? _message.getPriority() : 1000;
|
||||
return _message != null ? _message.getPriority() : PacketBuilder.PRIORITY_HIGH;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -18,6 +18,7 @@ import net.i2p.data.router.RouterIdentity;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.data.Signature;
|
||||
import net.i2p.data.router.RouterAddress;
|
||||
import net.i2p.router.OutNetMessage;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.TransportUtil;
|
||||
import net.i2p.util.Addresses;
|
||||
@@ -182,6 +183,9 @@ class PacketBuilder {
|
||||
private static final byte DATA_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_DATA << 4;
|
||||
private static final byte PEER_TEST_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_TEST << 4;
|
||||
private static final byte SESSION_DESTROY_FLAG_BYTE = (byte) (UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4);
|
||||
|
||||
/* Higher than all other OutNetMessage priorities, but still droppable */
|
||||
static final int PRIORITY_HIGH = 600;
|
||||
|
||||
/**
|
||||
* No state, all methods are thread-safe.
|
||||
@@ -318,9 +322,13 @@ class PacketBuilder {
|
||||
// calculate data size
|
||||
int numFragments = fragments.size();
|
||||
int dataSize = 0;
|
||||
int priority = 0;
|
||||
for (int i = 0; i < numFragments; i++) {
|
||||
Fragment frag = fragments.get(i);
|
||||
OutboundMessageState state = frag.state;
|
||||
int pri = state.getPriority();
|
||||
if (pri > priority)
|
||||
priority = pri;
|
||||
int fragment = frag.num;
|
||||
int sz = state.fragmentSize(fragment);
|
||||
dataSize += sz;
|
||||
@@ -569,6 +577,7 @@ class PacketBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
packet.setPriority(priority);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -681,6 +690,7 @@ class PacketBuilder {
|
||||
pkt.setLength(off);
|
||||
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
|
||||
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
|
||||
packet.setPriority((fullACKCount > 0 || partialACKCount > 0) ? PRIORITY_HIGH : OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -775,6 +785,7 @@ class PacketBuilder {
|
||||
setTo(packet, to, state.getSentPort());
|
||||
SimpleByteCache.release(iv);
|
||||
packet.setMessageType(TYPE_CREAT);
|
||||
packet.setPriority(PRIORITY_HIGH);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -840,6 +851,7 @@ class PacketBuilder {
|
||||
authenticate(packet, state.getIntroKey(), state.getIntroKey());
|
||||
setTo(packet, to, port);
|
||||
packet.setMessageType(TYPE_SREQ);
|
||||
packet.setPriority(PRIORITY_HIGH);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -944,6 +956,7 @@ class PacketBuilder {
|
||||
authenticate(packet, state.getCipherKey(), state.getMACKey());
|
||||
setTo(packet, to, state.getSentPort());
|
||||
packet.setMessageType(TYPE_CONF);
|
||||
packet.setPriority(PRIORITY_HIGH);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1040,6 +1053,7 @@ class PacketBuilder {
|
||||
pkt.setLength(off);
|
||||
authenticate(packet, cipherKey, macKey);
|
||||
setTo(packet, addr, port);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1084,6 +1098,7 @@ class PacketBuilder {
|
||||
authenticate(packet, toCipherKey, toMACKey);
|
||||
setTo(packet, toIP, toPort);
|
||||
packet.setMessageType(TYPE_TFA);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1135,6 +1150,7 @@ class PacketBuilder {
|
||||
authenticate(packet, aliceCipherKey, aliceMACKey);
|
||||
setTo(packet, aliceIP, alicePort);
|
||||
packet.setMessageType(TYPE_TTA);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1172,6 +1188,7 @@ class PacketBuilder {
|
||||
authenticate(packet, charlieCipherKey, charlieMACKey);
|
||||
setTo(packet, charlieIP, charliePort);
|
||||
packet.setMessageType(TYPE_TBC);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1209,6 +1226,7 @@ class PacketBuilder {
|
||||
authenticate(packet, bobCipherKey, bobMACKey);
|
||||
setTo(packet, bobIP, bobPort);
|
||||
packet.setMessageType(TYPE_TCB);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1372,6 +1390,7 @@ class PacketBuilder {
|
||||
authenticate(packet, cipherKey, macKey);
|
||||
setTo(packet, introHost, introPort);
|
||||
packet.setMessageType(TYPE_RREQ);
|
||||
packet.setPriority(PRIORITY_HIGH);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1405,6 +1424,7 @@ class PacketBuilder {
|
||||
authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
|
||||
setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
|
||||
packet.setMessageType(TYPE_INTRO);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1451,6 +1471,7 @@ class PacketBuilder {
|
||||
authenticate(packet, cipherKey, macKey);
|
||||
setTo(packet, aliceAddr, alice.getPort());
|
||||
packet.setMessageType(TYPE_RESP);
|
||||
packet.setPriority(OutNetMessage.PRIORITY_LOWEST);
|
||||
return packet;
|
||||
}
|
||||
|
||||
@@ -1469,6 +1490,7 @@ class PacketBuilder {
|
||||
setTo(packet, to, port);
|
||||
|
||||
packet.setMessageType(TYPE_PUNCH);
|
||||
packet.setPriority(PRIORITY_HIGH);
|
||||
return packet;
|
||||
}
|
||||
|
||||
|
@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.util.CDQEntry;
|
||||
import net.i2p.router.util.CDPQEntry;
|
||||
import net.i2p.util.TryCache;
|
||||
import net.i2p.util.Addresses;
|
||||
import net.i2p.util.Log;
|
||||
@@ -23,10 +23,10 @@ import net.i2p.util.SystemVersion;
|
||||
* of object instances to allow rapid reuse.
|
||||
*
|
||||
*/
|
||||
class UDPPacket implements CDQEntry {
|
||||
class UDPPacket implements CDPQEntry {
|
||||
private RouterContext _context;
|
||||
private final DatagramPacket _packet;
|
||||
private volatile short _priority;
|
||||
private int _priority;
|
||||
private volatile long _initializeTime;
|
||||
//private volatile long _expiration;
|
||||
private final byte[] _data;
|
||||
@@ -46,6 +46,7 @@ class UDPPacket implements CDQEntry {
|
||||
private int _validateCount;
|
||||
// private boolean _isInbound;
|
||||
private FIFOBandwidthLimiter.Request _bandwidthRequest;
|
||||
private long _seqNum;
|
||||
|
||||
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
|
||||
static RouterContext context;
|
||||
@@ -168,6 +169,20 @@ class UDPPacket implements CDQEntry {
|
||||
_receivedTime = 0;
|
||||
_fragmentCount = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* CDPQEntry
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public void setSeqNum(long num) { _seqNum = num; }
|
||||
|
||||
/**
|
||||
* CDPQEntry
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public long getSeqNum() { return _seqNum; }
|
||||
|
||||
|
||||
|
||||
/****
|
||||
public void writeData(byte src[], int offset, int len) {
|
||||
@@ -180,7 +195,13 @@ class UDPPacket implements CDQEntry {
|
||||
|
||||
/** */
|
||||
public synchronized DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
|
||||
public synchronized short getPriority() { verifyNotReleased(); return _priority; }
|
||||
public int getPriority() { return _priority; }
|
||||
|
||||
/**
|
||||
* @since 0.9.53
|
||||
*/
|
||||
public void setPriority(int pri) { _priority = pri; }
|
||||
|
||||
//public long getExpiration() { verifyNotReleased(); return _expiration; }
|
||||
public synchronized long getBegin() { verifyNotReleased(); return _initializeTime; }
|
||||
public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; }
|
||||
@@ -394,6 +415,7 @@ class UDPPacket implements CDQEntry {
|
||||
buf.append(" byte pkt with ");
|
||||
buf.append(Addresses.toString(_packet.getAddress().getAddress(), _packet.getPort()));
|
||||
//buf.append(" id=").append(System.identityHashCode(this));
|
||||
buf.append(" priority=").append(_priority);
|
||||
if (_messageType >= 0)
|
||||
buf.append(" msgType=").append(_messageType);
|
||||
if (_markedType >= 0)
|
||||
|
@@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.util.CoDelBlockingQueue;
|
||||
//import net.i2p.router.util.CoDelBlockingQueue;
|
||||
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SystemVersion;
|
||||
@@ -46,7 +47,8 @@ class UDPSender {
|
||||
_log = ctx.logManager().getLog(UDPSender.class);
|
||||
long maxMemory = SystemVersion.getMaxMemory();
|
||||
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
|
||||
_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
|
||||
//_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
|
||||
_outboundQueue = new CoDelPriorityBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
|
||||
_socket = socket;
|
||||
_runner = new Runner();
|
||||
_name = name;
|
||||
|
Reference in New Issue
Block a user