diff --git a/src/java/net/i2p/zzzot/Peer.java b/src/java/net/i2p/zzzot/Peer.java index b36a131..19b0b8a 100644 --- a/src/java/net/i2p/zzzot/Peer.java +++ b/src/java/net/i2p/zzzot/Peer.java @@ -36,6 +36,13 @@ public class Peer { hash = address.calculateHash(); } + /** + * @since 0.20.0 + */ + public Peer(byte[] id, Hash h) { + hash = h; + } + public void setLeft(long l) { bytesLeft = l; lastSeen = System.currentTimeMillis(); diff --git a/src/java/net/i2p/zzzot/UDPHandler.java b/src/java/net/i2p/zzzot/UDPHandler.java index 1e7a2c8..324def3 100644 --- a/src/java/net/i2p/zzzot/UDPHandler.java +++ b/src/java/net/i2p/zzzot/UDPHandler.java @@ -22,6 +22,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; @@ -57,6 +63,9 @@ public class UDPHandler implements I2PSessionMuxedListener { private final Map _destCache; private final AtomicInteger _announces = new AtomicInteger(); private volatile boolean _running; + private ThreadPoolExecutor _executor; + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; // The listen port. public final int PORT; @@ -70,8 +79,9 @@ public class UDPHandler implements I2PSessionMuxedListener { private static final int EVENT_COMPLETED = 1; private static final int EVENT_STARTED = 2; private static final int EVENT_STOPPED = 3; - // keep it short, we should have the leaseset - private final long LOOKUP_TIMEOUT = 1000; + // keep it short, we should have the leaseset, + // if a new ratchet session was created + private final long LOOKUP_TIMEOUT = 2000; private final long CLEAN_TIME; private final long STAT_TIME = 2*60*1000; private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID"); @@ -93,8 +103,10 @@ public class UDPHandler implements I2PSessionMuxedListener { _destCache = new LHMCache(1024); } - public void start() { + public synchronized void start() { _running = true; + _executor = new CustomThreadPoolExecutor(); + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); (new I2PAppThread(new Waiter(), "ZzzOT UDP startup", true)).start(); long[] r = new long[] { 5*60*1000 }; _context.statManager().createRequiredRateStat("plugin.zzzot.announces.udp", "UDP announces per minute", "Plugins", r); @@ -103,8 +115,11 @@ public class UDPHandler implements I2PSessionMuxedListener { /** * @since 0.20.0 */ - public void stop() { + public synchronized void stop() { _running = false; + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + _executor = null; _cleaner.cancel(); _context.statManager().removeRateStat("plugin.zzzot.announces.udp"); _announces.set(0); @@ -269,11 +284,6 @@ public class UDPHandler implements I2PSessionMuxedListener { return; } - // TODO use a waiter - Destination from = lookup(session, fromHash); - if (from == null) - return; - // parse packet byte[] bih = new byte[InfoHash.LENGTH]; System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH); @@ -316,7 +326,7 @@ public class UDPHandler implements I2PSessionMuxedListener { } else { Peer p = peers.get(pid); if (p == null) { - p = new Peer(pid.getData(), from); + p = new Peer(pid.getData(), fromHash); Peer p2 = peers.putIfAbsent(pid, p); if (p2 != null) p = p2; @@ -360,6 +370,17 @@ public class UDPHandler implements I2PSessionMuxedListener { } } + Destination from = lookupCache(fromHash); + if (from == null) { + try { + _executor.execute(new Lookup(session, fromHash, fromPort, resp)); + } catch (RejectedExecutionException ree) { + if (_log.shouldWarn()) + _log.warn("error sending announce reply - thread pool full"); + } + return; + } + try { session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort); if (_log.shouldDebug()) @@ -375,10 +396,13 @@ public class UDPHandler implements I2PSessionMuxedListener { * @param msg non-null */ private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) { - // TODO use a waiter - Destination to = lookup(session, toHash); - if (to == null) + Destination to = lookupCache(toHash); + if (to == null) { + if (_log.shouldInfo()) + _log.info("don't have cached dest to send error to " + toHash.toBase32()); return; + } + // don't bother looking up via I2CP sendError(session, to, toPort, transID, msg); } @@ -406,16 +430,37 @@ public class UDPHandler implements I2PSessionMuxedListener { * @return null on failure */ private Destination lookup(I2PSession session, Hash hash) { - Destination rv; - synchronized(_destCache) { - rv = _destCache.get(hash); - } + Destination rv = lookupCache(hash); if (rv != null) return rv; - // TODO use a waiter + return lookupI2CP(session, hash); + } + + /** + * Nonblocking. + * @return null on failure + */ + private Destination lookupCache(Hash hash) { + // Test deferred + //if (true) return null; + synchronized(_destCache) { + return _destCache.get(hash); + } + } + + /** + * Blocking. + * @return null on failure + */ + private Destination lookupI2CP(I2PSession session, Hash hash) { + Destination rv; try { rv = session.lookupDest(hash, LOOKUP_TIMEOUT); - } catch (I2PSessionException ise) {} + } catch (I2PSessionException ise) { + if (_log.shouldWarn()) + _log.warn("lookup error", ise); + return null; + } if (rv == null) { if (_log.shouldWarn()) _log.warn("lookup failed for response to " + hash.toBase32()); @@ -458,4 +503,69 @@ public class UDPHandler implements I2PSessionMuxedListener { schedule(STAT_TIME); } } + + /** + * Until we have a nonblocking lookup API in I2CP + * + * @since 0.20.0 + */ + private class Lookup implements Runnable { + private final I2PSession _session; + private final Hash _hash; + private final int _port; + private final byte[] _msg; + + public Lookup(I2PSession sess, Hash h, int port, byte[] msg) { + _session = sess; + _hash = h; + _port = port; + _msg = msg; + } + + public void run() { + // blocking + Destination d = lookupI2CP(_session, _hash); + if (d == null) { + if (_log.shouldWarn()) + _log.warn("deferred lookup failed for " + _hash.toBase32()); + return; + } + try { + _session.sendMessage(d, _msg, I2PSession.PROTO_DATAGRAM_RAW, PORT, _port); + if (_log.shouldDebug()) + _log.debug("sent deferred reply to " + _hash.toBase32()); + } catch (I2PSessionException ise) { + if (_log.shouldWarn()) + _log.warn("error sending deferred reply", ise); + } + } + } + + /** + * Until we have a nonblocking lookup API in I2CP + * + * @since 0.20.0 + */ + private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + super(0, 25, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new SynchronousQueue(), new CustomThreadFactory()); + } + } + + /** + * Just to set the name and set Daemon + * + * @since 0.20.0 + */ + private static class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger _executorThreadCount = new AtomicInteger(); + + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("ZzzOT lookup " + _executorThreadCount.incrementAndGet()); + rv.setDaemon(true); + return rv; + } + } } diff --git a/src/java/net/i2p/zzzot/ZzzOTController.java b/src/java/net/i2p/zzzot/ZzzOTController.java index bd3d736..cdcfc4a 100644 --- a/src/java/net/i2p/zzzot/ZzzOTController.java +++ b/src/java/net/i2p/zzzot/ZzzOTController.java @@ -82,7 +82,7 @@ public class ZzzOTController implements ClientApp { private static final String NAME = "ZzzOT"; private static final String DEFAULT_SITENAME = "ZZZOT"; private static final String PROP_SITENAME = "sitename"; - private static final String VERSION = "0.20.0-beta"; + private static final String VERSION = "0.20.0-beta2"; private static final String DEFAULT_SHOWFOOTER = "true"; private static final String PROP_SHOWFOOTER = "showfooter"; private static final String DEFAULT_FOOTERTEXT = "Running ZZZOT " + VERSION;