diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index 6cca9171a..a2c055ade 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -1,5 +1,6 @@ package net.i2p.router.tunnel; +import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.i2np.TunnelDataMessage; import net.i2p.router.JobImpl; @@ -7,7 +8,8 @@ import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; /** - * Handle messages at the IBGW + * Handle messages at the IBGW. + * Not used for zero-hop IBGWs. */ class InboundGatewayReceiver implements TunnelGateway.Receiver { private final RouterContext _context; @@ -64,6 +66,15 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver { return msg.getUniqueId(); } + /** + * The next hop + * @return non-null + * @since 0.9.3 + */ + public Hash getSendTo() { + return _config.getSendTo(); + } + private class ReceiveJob extends JobImpl { private final byte[] _encrypted; diff --git a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java index abcb5a742..d67dcc39f 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java @@ -1,5 +1,6 @@ package net.i2p.router.tunnel; +import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.i2np.TunnelDataMessage; import net.i2p.router.JobImpl; @@ -12,6 +13,7 @@ import net.i2p.util.Log; * Receive the outbound message after it has been preprocessed and encrypted, * then forward it on to the first hop in the tunnel. * + * Not used for zero-hop OBGWs. */ class OutboundReceiver implements TunnelGateway.Receiver { private final RouterContext _context; @@ -55,6 +57,15 @@ class OutboundReceiver implements TunnelGateway.Receiver { } } + /** + * The next hop + * @return non-null + * @since 0.9.3 + */ + public Hash getSendTo() { + return _config.getPeer(1); + } + private void send(TunnelDataMessage msg, RouterInfo ri) { if (_log.shouldLog(Log.DEBUG)) _log.debug("forwarding encrypted data out " + _config + ": " + msg.getUniqueId()); diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java index 488af9e7f..560fb7f39 100644 --- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java @@ -39,9 +39,14 @@ class PumpedTunnelGateway extends TunnelGateway { private final BlockingQueue _prequeue; private final TunnelGatewayPumper _pumper; private final boolean _isInbound; + private final Hash _nextHop; - private static final int MAX_OB_MSGS_PER_PUMP = 16; - private static final int MAX_IB_MSGS_PER_PUMP = 8; + /** + * warning - these limit total messages per second throughput due to + * requeue delay in TunnelGatewayPumper to max * 1000 / REQUEUE_TIME + */ + private static final int MAX_OB_MSGS_PER_PUMP = 64; + private static final int MAX_IB_MSGS_PER_PUMP = 24; private static final int INITIAL_OB_QUEUE = 64; private static final int MAX_IB_QUEUE = 1024; @@ -53,15 +58,23 @@ class PumpedTunnelGateway extends TunnelGateway { * @param receiver this receives the encrypted message and forwards it off * to the first hop */ - public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { + public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, + Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { super(context, preprocessor, sender, receiver); if (getClass() == PumpedTunnelGateway.class) { // Unbounded priority queue for outbound _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE); + _nextHop = receiver.getSendTo(); _isInbound = false; - } else { // extended by ThrottledPTG for IB + } else if (receiver != null) { // extended by ThrottledPTG for IB // Bounded non-priority queue for inbound _prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE); + _nextHop = receiver.getSendTo(); + _isInbound = true; + } else { + // Poison PTG + _prequeue = null; + _nextHop = null; _isInbound = true; } _pumper = pumper; @@ -103,15 +116,25 @@ class PumpedTunnelGateway extends TunnelGateway { * * @param queueBuf Empty list for convenience, to use as a temporary buffer. * Must be empty when called; will always be emptied before return. + * @return true if we did not finish, and the pumper should be requeued. */ - void pump(List queueBuf) { - // TODO if an IBGW, and the next hop is backlogged, - // drain less or none... better to let things back up here. - // Don't do this for OBGWs? - int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP; + public boolean pump(List queueBuf) { + // If the next hop is backlogged, + // drain only a little... better to let things back up here, + // before fragmentation, where we have priority queueing (for OBGW) + int max; + boolean backlogged = _context.commSystem().isBacklogged(_nextHop); + if (backlogged && _log.shouldLog(Log.INFO)) + _log.info("PTG backlogged, queued to " + _nextHop + " : " + _prequeue.size() + + " IB? " + _isInbound); + if (backlogged) + max = _isInbound ? 1 : 2; + else + max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP; _prequeue.drainTo(queueBuf, max); if (queueBuf.isEmpty()) - return; + return false; + boolean rv = !_prequeue.isEmpty(); long startAdd = System.currentTimeMillis(); long beforeLock = startAdd; @@ -162,8 +185,10 @@ class PumpedTunnelGateway extends TunnelGateway { + " queue flush: " + (complete-afterExpire)); } queueBuf.clear(); - if (!_prequeue.isEmpty()) - _pumper.wantsPumping(this); + if (rv && _log.shouldLog(Log.INFO)) + _log.info("PTG remaining to " + _nextHop + " : " + _prequeue.size() + + " IB? " + _isInbound + " backlogged? " + backlogged); + return rv; } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 03c93b8ce..fe54c9e02 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -183,6 +183,13 @@ class TunnelGateway { * @return message ID it was sent in, or -1 if it had to be deferred */ public long receiveEncrypted(byte encrypted[]); + + /** + * The next hop + * @return non-null + * @since 0.9.3 + */ + public Hash getSendTo(); } protected class DelayedFlush extends SimpleTimer2.TimedEvent { diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index 380023811..7dfe8dc76 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -1,6 +1,7 @@ package net.i2p.router.tunnel; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -10,6 +11,8 @@ import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; +import net.i2p.util.SimpleScheduler; +import net.i2p.util.SimpleTimer; /** * Run through the tunnel gateways that have had messages added to them and push @@ -22,15 +25,23 @@ import net.i2p.util.I2PThread; class TunnelGatewayPumper implements Runnable { private final RouterContext _context; private final Set _wantsPumping; + private final Set _backlogged; private volatile boolean _stop; private static final int MIN_PUMPERS = 1; private static final int MAX_PUMPERS = 4; private final int _pumpers; + + /** + * Wait just a little, but this lets the pumper queue back up. + * See additional comments in PTG. + */ + private static final long REQUEUE_TIME = 50; /** Creates a new instance of TunnelGatewayPumper */ public TunnelGatewayPumper(RouterContext ctx) { _context = ctx; _wantsPumping = new LinkedHashSet(16); + _backlogged = new HashSet(16); long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; @@ -57,7 +68,7 @@ class TunnelGatewayPumper implements Runnable { public void wantsPumping(PumpedTunnelGateway gw) { if (!_stop) { synchronized (_wantsPumping) { - if (_wantsPumping.add(gw)) + if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw)) _wantsPumping.notify(); } } @@ -66,9 +77,17 @@ class TunnelGatewayPumper implements Runnable { public void run() { PumpedTunnelGateway gw = null; List queueBuf = new ArrayList(32); + boolean requeue = false; while (!_stop) { try { synchronized (_wantsPumping) { + if (requeue && gw != null) { + // in case another packet came in + _wantsPumping.remove(gw); + if (_backlogged.add(gw)) + _context.simpleScheduler().addEvent(new Requeue(gw), REQUEUE_TIME); + } + gw = null; if (_wantsPumping.isEmpty()) { _wantsPumping.wait(); } else { @@ -81,12 +100,28 @@ class TunnelGatewayPumper implements Runnable { if (gw != null) { if (gw.getMessagesSent() == POISON_PTG) break; - gw.pump(queueBuf); - gw = null; + requeue = gw.pump(queueBuf); } } } + private class Requeue implements SimpleTimer.TimedEvent { + private final PumpedTunnelGateway _ptg; + + public Requeue(PumpedTunnelGateway ptg) { + _ptg = ptg; + } + + public void timeReached() { + synchronized (_wantsPumping) { + _backlogged.remove(_ptg); + if (_wantsPumping.add(_ptg)) + _wantsPumping.notify(); + } + } + } + + private static final int POISON_PTG = -99999; private static class PoisonPTG extends PumpedTunnelGateway {