From 675e8a91a468e70ac0336799dd805be1979020dc Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 22 Mar 2012 20:40:35 +0000 Subject: [PATCH] * RetransmissionTimer: Instantiate per-destination --- apps/BOB/src/net/i2p/BOB/BOB.java | 3 --- apps/BOB/src/net/i2p/BOB/Main.java | 3 --- .../net/i2p/client/streaming/Connection.java | 12 ++++++--- .../client/streaming/ConnectionManager.java | 26 ++++++++++++------- .../client/streaming/MessageOutputStream.java | 19 +++++++++----- .../client/streaming/RetransmissionTimer.java | 25 +++++++++++++----- .../net/i2p/client/streaming/TCBShare.java | 9 ++++--- history.txt | 1 + 8 files changed, 63 insertions(+), 35 deletions(-) diff --git a/apps/BOB/src/net/i2p/BOB/BOB.java b/apps/BOB/src/net/i2p/BOB/BOB.java index 60bf2b9ff..e08c4cea9 100644 --- a/apps/BOB/src/net/i2p/BOB/BOB.java +++ b/apps/BOB/src/net/i2p/BOB/BOB.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.I2PAppContext; import net.i2p.client.I2PClient; -import net.i2p.client.streaming.RetransmissionTimer; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer2; @@ -182,10 +181,8 @@ public class BOB { // Re-reading the config file in each thread is pretty damn stupid. String configLocation = System.getProperty(PROP_CONFIG_LOCATION, "bob.config"); // This is here just to ensure there is no interference with our threadgroups. - RetransmissionTimer Y = RetransmissionTimer.getInstance(); SimpleScheduler Y1 = SimpleScheduler.getInstance(); SimpleTimer2 Y2 = SimpleTimer2.getInstance(); - i = Y.hashCode(); i = Y1.hashCode(); i = Y2.hashCode(); try { diff --git a/apps/BOB/src/net/i2p/BOB/Main.java b/apps/BOB/src/net/i2p/BOB/Main.java index 53a7d9e71..099d01636 100644 --- a/apps/BOB/src/net/i2p/BOB/Main.java +++ b/apps/BOB/src/net/i2p/BOB/Main.java @@ -23,7 +23,6 @@ */ package net.i2p.BOB; -import net.i2p.client.streaming.RetransmissionTimer; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer2; @@ -40,7 +39,6 @@ public class Main { */ public static void main(String[] args) { // THINK THINK THINK THINK THINK THINK - RetransmissionTimer Y = RetransmissionTimer.getInstance(); SimpleScheduler Y1 = SimpleScheduler.getInstance(); SimpleTimer2 Y2 = SimpleTimer2.getInstance(); @@ -48,6 +46,5 @@ public class Main { Y2.stop(); Y1.stop(); - Y.stop(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 39afc7680..a7a99174e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -74,6 +74,7 @@ class Connection { private final int _randomWait; private int _localPort; private int _remotePort; + private final SimpleTimer2 _timer; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -88,12 +89,16 @@ class Connection { public static final int MAX_WINDOW_SIZE = 128; +/**** public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); } +****/ + /** */ public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + SimpleTimer2 timer, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; _connectionManager = manager; @@ -104,7 +109,8 @@ class Connection { _receiver = new ConnectionDataReceiver(_context, this); _inputStream = new MessageInputStream(_context); // FIXME pass through a passive flush delay setting as the 4th arg - _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); + _outputStream = new MessageOutputStream(_context, timer, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); + _timer = timer; _outboundPackets = new TreeMap(); if (opts != null) { _localPort = opts.getLocalPort(); @@ -895,7 +901,7 @@ class Connection { private class ActivityTimer extends SimpleTimer2.TimedEvent { public ActivityTimer() { - super(RetransmissionTimer.getInstance()); + super(_timer); setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later } public void timeReached() { @@ -1093,7 +1099,7 @@ class Connection { private PacketLocal _packet; private long _nextSendTime; public ResendPacketEvent(PacketLocal packet, long delay) { - super(RetransmissionTimer.getInstance()); + super(_timer); _packet = packet; _nextSendTime = delay + _context.clock().now(); packet.setResendPacketEvent(ResendPacketEvent.this); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 6e8811a2f..ee757aeaa 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -14,6 +14,7 @@ import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * Coordinate all of the connections for a single local destination. @@ -44,6 +45,8 @@ class ConnectionManager { private ConnThrottler _minuteThrottler; private ConnThrottler _hourThrottler; private ConnThrottler _dayThrottler; + /** since 0.9, each manager instantiates its own timer */ + private final SimpleTimer2 _timer; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; @@ -58,7 +61,9 @@ class ConnectionManager { _connectionHandler = new ConnectionHandler(_context, this); _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); - _tcbShare = new TCBShare(_context); + _timer = new RetransmissionTimer(_context, "Streaming Timer " + + session.getMyDestination().calculateHash().toBase64().substring(0, 4)); + _tcbShare = new TCBShare(_context, _timer); // PROTO_ANY is for backward compatibility (pre-0.7.1) // TODO change proto to PROTO_STREAMING someday. // Right now we get everything, and rely on Datagram to specify PROTO_UDP. @@ -146,7 +151,7 @@ class ConnectionManager { ConnectionOptions opts = new ConnectionOptions(_defaultOptions); opts.setPort(synPacket.getRemotePort()); opts.setLocalPort(synPacket.getLocalPort()); - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts); _tcbShare.updateOptsFromShare(con); con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; @@ -253,7 +258,7 @@ class ConnectionManager { // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { - con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts); con.setRemotePeer(peer); while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) { @@ -368,6 +373,7 @@ class ConnectionManager { iter.remove(); } _tcbShare.stop(); + _timer.stop(); } /** @@ -467,8 +473,9 @@ class ConnectionManager { } private class PingFailed implements SimpleTimer.TimedEvent { - private Long _id; - private PingNotifier _notifier; + private final Long _id; + private final PingNotifier _notifier; + public PingFailed(Long id, PingNotifier notifier) { _id = id; _notifier = notifier; @@ -487,15 +494,16 @@ class ConnectionManager { private static class PingRequest { private boolean _ponged; - private Destination _peer; - private PacketLocal _packet; - private PingNotifier _notifier; + private final Destination _peer; + private final PacketLocal _packet; + private final PingNotifier _notifier; + public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { - _ponged = false; _peer = peer; _packet = packet; _notifier = notifier; } + public void pong() { // static, no log //_log.debug("Ping successful"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 737e0f7b2..96a3ebe55 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -49,13 +49,20 @@ class MessageOutputStream extends OutputStream { */ private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250; +/**** public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); } - public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { - this(ctx, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY); +****/ + + /** */ + public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer, + DataReceiver receiver, int bufSize) { + this(ctx, timer, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY); } - public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize, int passiveFlushDelay) { + + public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer, + DataReceiver receiver, int bufSize, int passiveFlushDelay) { super(); _dataCache = ByteCache.getInstance(128, bufSize); _context = ctx; @@ -68,7 +75,7 @@ class MessageOutputStream extends OutputStream { _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); - _flusher = new Flusher(); + _flusher = new Flusher(timer); if (_log.shouldLog(Log.DEBUG)) _log.debug("MessageOutputStream created"); } @@ -212,8 +219,8 @@ class MessageOutputStream extends OutputStream { */ private class Flusher extends SimpleTimer2.TimedEvent { private boolean _enqueued; - public Flusher() { - super(RetransmissionTimer.getInstance()); + public Flusher(SimpleTimer2 timer) { + super(timer); } public void enqueue() { // no need to be overly worried about duplicates - it would just diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index 6f3bc3054..93e9fda35 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -1,15 +1,26 @@ package net.i2p.client.streaming; +import net.i2p.I2PAppContext; import net.i2p.util.SimpleTimer2; /** - * Not clear that we really need to create our own timer group, but we do, - * to prevent us clogging the router's timer group. - * Use from outside this package is deprecated. - * (BOB instantiates this for thread group reasons) + * Per-destination timer */ public class RetransmissionTimer extends SimpleTimer2 { - private static final RetransmissionTimer _instance = new RetransmissionTimer(); - public static final RetransmissionTimer getInstance() { return _instance; } - protected RetransmissionTimer() { super("StreamingTimer"); } + + /** + * @deprecated Don't use this to prestart threads, this is no longer a static instance + * @return a new instance as of 0.9 + */ + public static final RetransmissionTimer getInstance() { + return new RetransmissionTimer(I2PAppContext.getGlobalContext(), "RetransmissionTimer"); + } + + + /** + * @since 0.9 + */ + RetransmissionTimer(I2PAppContext ctx, String name) { + super(ctx, name, false); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index d33e7a741..e4c4ec917 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -33,11 +33,11 @@ class TCBShare { private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2; private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4; - public TCBShare(I2PAppContext ctx) { + public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) { _context = ctx; _log = ctx.logManager().getLog(TCBShare.class); _cache = new ConcurrentHashMap(4); - _cleaner = new CleanEvent(); + _cleaner = new CleanEvent(timer); _cleaner.schedule(CLEAN_TIME); } @@ -125,8 +125,9 @@ class TCBShare { } private class CleanEvent extends SimpleTimer2.TimedEvent { - public CleanEvent() { - super(RetransmissionTimer.getInstance()); + public CleanEvent(SimpleTimer2 timer) { + // Use router's SimpleTimer2 + super(timer); } public void timeReached() { for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { diff --git a/history.txt b/history.txt index 8a51b9175..5c66bf9bb 100644 --- a/history.txt +++ b/history.txt @@ -2,6 +2,7 @@ * Home page: CSS tweaks * Reseeder: Get rid of static instance, root in netDB, don't use system properties for status + * RetransmissionTimer: Instantiate per-destination * Router: When removing a config setting, remove from context also * SimpleScheduler, SimpleTimer, SimpleTimer2: Replace static instances with I2PAppContext-rooted references