* TunnelGateway: Implement pushback from a backlogged transport

queue to the pre-fragmentation queue
This commit is contained in:
zzz
2012-09-10 21:30:54 +00:00
parent e0fc642fc3
commit e8a8f3c210
5 changed files with 105 additions and 16 deletions

View File

@@ -1,5 +1,6 @@
package net.i2p.router.tunnel;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.router.JobImpl;
@@ -7,7 +8,8 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
/**
* Handle messages at the IBGW
* Handle messages at the IBGW.
* Not used for zero-hop IBGWs.
*/
class InboundGatewayReceiver implements TunnelGateway.Receiver {
private final RouterContext _context;
@@ -64,6 +66,15 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
return msg.getUniqueId();
}
/**
* The next hop
* @return non-null
* @since 0.9.3
*/
public Hash getSendTo() {
return _config.getSendTo();
}
private class ReceiveJob extends JobImpl {
private final byte[] _encrypted;

View File

@@ -1,5 +1,6 @@
package net.i2p.router.tunnel;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.router.JobImpl;
@@ -12,6 +13,7 @@ import net.i2p.util.Log;
* Receive the outbound message after it has been preprocessed and encrypted,
* then forward it on to the first hop in the tunnel.
*
* Not used for zero-hop OBGWs.
*/
class OutboundReceiver implements TunnelGateway.Receiver {
private final RouterContext _context;
@@ -55,6 +57,15 @@ class OutboundReceiver implements TunnelGateway.Receiver {
}
}
/**
* The next hop
* @return non-null
* @since 0.9.3
*/
public Hash getSendTo() {
return _config.getPeer(1);
}
private void send(TunnelDataMessage msg, RouterInfo ri) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("forwarding encrypted data out " + _config + ": " + msg.getUniqueId());

View File

@@ -39,9 +39,14 @@ class PumpedTunnelGateway extends TunnelGateway {
private final BlockingQueue<PendingGatewayMessage> _prequeue;
private final TunnelGatewayPumper _pumper;
private final boolean _isInbound;
private final Hash _nextHop;
private static final int MAX_OB_MSGS_PER_PUMP = 16;
private static final int MAX_IB_MSGS_PER_PUMP = 8;
/**
* warning - these limit total messages per second throughput due to
* requeue delay in TunnelGatewayPumper to max * 1000 / REQUEUE_TIME
*/
private static final int MAX_OB_MSGS_PER_PUMP = 64;
private static final int MAX_IB_MSGS_PER_PUMP = 24;
private static final int INITIAL_OB_QUEUE = 64;
private static final int MAX_IB_QUEUE = 1024;
@@ -53,15 +58,23 @@ class PumpedTunnelGateway extends TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor,
Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver);
if (getClass() == PumpedTunnelGateway.class) {
// Unbounded priority queue for outbound
_prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
_nextHop = receiver.getSendTo();
_isInbound = false;
} else { // extended by ThrottledPTG for IB
} else if (receiver != null) { // extended by ThrottledPTG for IB
// Bounded non-priority queue for inbound
_prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
_nextHop = receiver.getSendTo();
_isInbound = true;
} else {
// Poison PTG
_prequeue = null;
_nextHop = null;
_isInbound = true;
}
_pumper = pumper;
@@ -103,15 +116,25 @@ class PumpedTunnelGateway extends TunnelGateway {
*
* @param queueBuf Empty list for convenience, to use as a temporary buffer.
* Must be empty when called; will always be emptied before return.
* @return true if we did not finish, and the pumper should be requeued.
*/
void pump(List<PendingGatewayMessage> queueBuf) {
// TODO if an IBGW, and the next hop is backlogged,
// drain less or none... better to let things back up here.
// Don't do this for OBGWs?
int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
public boolean pump(List<PendingGatewayMessage> queueBuf) {
// If the next hop is backlogged,
// drain only a little... better to let things back up here,
// before fragmentation, where we have priority queueing (for OBGW)
int max;
boolean backlogged = _context.commSystem().isBacklogged(_nextHop);
if (backlogged && _log.shouldLog(Log.INFO))
_log.info("PTG backlogged, queued to " + _nextHop + " : " + _prequeue.size() +
" IB? " + _isInbound);
if (backlogged)
max = _isInbound ? 1 : 2;
else
max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
_prequeue.drainTo(queueBuf, max);
if (queueBuf.isEmpty())
return;
return false;
boolean rv = !_prequeue.isEmpty();
long startAdd = System.currentTimeMillis();
long beforeLock = startAdd;
@@ -162,8 +185,10 @@ class PumpedTunnelGateway extends TunnelGateway {
+ " queue flush: " + (complete-afterExpire));
}
queueBuf.clear();
if (!_prequeue.isEmpty())
_pumper.wantsPumping(this);
if (rv && _log.shouldLog(Log.INFO))
_log.info("PTG remaining to " + _nextHop + " : " + _prequeue.size() +
" IB? " + _isInbound + " backlogged? " + backlogged);
return rv;
}
}

View File

@@ -183,6 +183,13 @@ class TunnelGateway {
* @return message ID it was sent in, or -1 if it had to be deferred
*/
public long receiveEncrypted(byte encrypted[]);
/**
* The next hop
* @return non-null
* @since 0.9.3
*/
public Hash getSendTo();
}
protected class DelayedFlush extends SimpleTimer2.TimedEvent {

View File

@@ -1,6 +1,7 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -10,6 +11,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Run through the tunnel gateways that have had messages added to them and push
@@ -22,15 +25,23 @@ import net.i2p.util.I2PThread;
class TunnelGatewayPumper implements Runnable {
private final RouterContext _context;
private final Set<PumpedTunnelGateway> _wantsPumping;
private final Set<PumpedTunnelGateway> _backlogged;
private volatile boolean _stop;
private static final int MIN_PUMPERS = 1;
private static final int MAX_PUMPERS = 4;
private final int _pumpers;
/**
* Wait just a little, but this lets the pumper queue back up.
* See additional comments in PTG.
*/
private static final long REQUEUE_TIME = 50;
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
_wantsPumping = new LinkedHashSet(16);
_backlogged = new HashSet(16);
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
@@ -57,7 +68,7 @@ class TunnelGatewayPumper implements Runnable {
public void wantsPumping(PumpedTunnelGateway gw) {
if (!_stop) {
synchronized (_wantsPumping) {
if (_wantsPumping.add(gw))
if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw))
_wantsPumping.notify();
}
}
@@ -66,9 +77,17 @@ class TunnelGatewayPumper implements Runnable {
public void run() {
PumpedTunnelGateway gw = null;
List<PendingGatewayMessage> queueBuf = new ArrayList(32);
boolean requeue = false;
while (!_stop) {
try {
synchronized (_wantsPumping) {
if (requeue && gw != null) {
// in case another packet came in
_wantsPumping.remove(gw);
if (_backlogged.add(gw))
_context.simpleScheduler().addEvent(new Requeue(gw), REQUEUE_TIME);
}
gw = null;
if (_wantsPumping.isEmpty()) {
_wantsPumping.wait();
} else {
@@ -81,12 +100,28 @@ class TunnelGatewayPumper implements Runnable {
if (gw != null) {
if (gw.getMessagesSent() == POISON_PTG)
break;
gw.pump(queueBuf);
gw = null;
requeue = gw.pump(queueBuf);
}
}
}
private class Requeue implements SimpleTimer.TimedEvent {
private final PumpedTunnelGateway _ptg;
public Requeue(PumpedTunnelGateway ptg) {
_ptg = ptg;
}
public void timeReached() {
synchronized (_wantsPumping) {
_backlogged.remove(_ptg);
if (_wantsPumping.add(_ptg))
_wantsPumping.notify();
}
}
}
private static final int POISON_PTG = -99999;
private static class PoisonPTG extends PumpedTunnelGateway {