* RetransmissionTimer: Instantiate per-destination

This commit is contained in:
zzz
2012-03-22 20:40:35 +00:00
parent cae94320b5
commit 675e8a91a4
8 changed files with 63 additions and 35 deletions

View File

@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.I2PClient; import net.i2p.client.I2PClient;
import net.i2p.client.streaming.RetransmissionTimer;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
@@ -182,10 +181,8 @@ public class BOB {
// Re-reading the config file in each thread is pretty damn stupid. // Re-reading the config file in each thread is pretty damn stupid.
String configLocation = System.getProperty(PROP_CONFIG_LOCATION, "bob.config"); String configLocation = System.getProperty(PROP_CONFIG_LOCATION, "bob.config");
// This is here just to ensure there is no interference with our threadgroups. // This is here just to ensure there is no interference with our threadgroups.
RetransmissionTimer Y = RetransmissionTimer.getInstance();
SimpleScheduler Y1 = SimpleScheduler.getInstance(); SimpleScheduler Y1 = SimpleScheduler.getInstance();
SimpleTimer2 Y2 = SimpleTimer2.getInstance(); SimpleTimer2 Y2 = SimpleTimer2.getInstance();
i = Y.hashCode();
i = Y1.hashCode(); i = Y1.hashCode();
i = Y2.hashCode(); i = Y2.hashCode();
try { try {

View File

@@ -23,7 +23,6 @@
*/ */
package net.i2p.BOB; package net.i2p.BOB;
import net.i2p.client.streaming.RetransmissionTimer;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
@@ -40,7 +39,6 @@ public class Main {
*/ */
public static void main(String[] args) { public static void main(String[] args) {
// THINK THINK THINK THINK THINK THINK // THINK THINK THINK THINK THINK THINK
RetransmissionTimer Y = RetransmissionTimer.getInstance();
SimpleScheduler Y1 = SimpleScheduler.getInstance(); SimpleScheduler Y1 = SimpleScheduler.getInstance();
SimpleTimer2 Y2 = SimpleTimer2.getInstance(); SimpleTimer2 Y2 = SimpleTimer2.getInstance();
@@ -48,6 +46,5 @@ public class Main {
Y2.stop(); Y2.stop();
Y1.stop(); Y1.stop();
Y.stop();
} }
} }

View File

@@ -74,6 +74,7 @@ class Connection {
private final int _randomWait; private final int _randomWait;
private int _localPort; private int _localPort;
private int _remotePort; private int _remotePort;
private final SimpleTimer2 _timer;
private long _lifetimeBytesSent; private long _lifetimeBytesSent;
private long _lifetimeBytesReceived; private long _lifetimeBytesReceived;
@@ -88,12 +89,16 @@ class Connection {
public static final int MAX_WINDOW_SIZE = 128; public static final int MAX_WINDOW_SIZE = 128;
/****
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
PacketQueue queue, ConnectionPacketHandler handler) { PacketQueue queue, ConnectionPacketHandler handler) {
this(ctx, manager, chooser, queue, handler, null); this(ctx, manager, chooser, queue, handler, null);
} }
****/
/** */
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
SimpleTimer2 timer,
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
_context = ctx; _context = ctx;
_connectionManager = manager; _connectionManager = manager;
@@ -104,7 +109,8 @@ class Connection {
_receiver = new ConnectionDataReceiver(_context, this); _receiver = new ConnectionDataReceiver(_context, this);
_inputStream = new MessageInputStream(_context); _inputStream = new MessageInputStream(_context);
// FIXME pass through a passive flush delay setting as the 4th arg // FIXME pass through a passive flush delay setting as the 4th arg
_outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outputStream = new MessageOutputStream(_context, timer, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_timer = timer;
_outboundPackets = new TreeMap(); _outboundPackets = new TreeMap();
if (opts != null) { if (opts != null) {
_localPort = opts.getLocalPort(); _localPort = opts.getLocalPort();
@@ -895,7 +901,7 @@ class Connection {
private class ActivityTimer extends SimpleTimer2.TimedEvent { private class ActivityTimer extends SimpleTimer2.TimedEvent {
public ActivityTimer() { public ActivityTimer() {
super(RetransmissionTimer.getInstance()); super(_timer);
setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later
} }
public void timeReached() { public void timeReached() {
@@ -1093,7 +1099,7 @@ class Connection {
private PacketLocal _packet; private PacketLocal _packet;
private long _nextSendTime; private long _nextSendTime;
public ResendPacketEvent(PacketLocal packet, long delay) { public ResendPacketEvent(PacketLocal packet, long delay) {
super(RetransmissionTimer.getInstance()); super(_timer);
_packet = packet; _packet = packet;
_nextSendTime = delay + _context.clock().now(); _nextSendTime = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this); packet.setResendPacketEvent(ResendPacketEvent.this);

View File

@@ -14,6 +14,7 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey; import net.i2p.data.SessionKey;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/** /**
* Coordinate all of the connections for a single local destination. * Coordinate all of the connections for a single local destination.
@@ -44,6 +45,8 @@ class ConnectionManager {
private ConnThrottler _minuteThrottler; private ConnThrottler _minuteThrottler;
private ConnThrottler _hourThrottler; private ConnThrottler _hourThrottler;
private ConnThrottler _dayThrottler; private ConnThrottler _dayThrottler;
/** since 0.9, each manager instantiates its own timer */
private final SimpleTimer2 _timer;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context; _context = context;
@@ -58,7 +61,9 @@ class ConnectionManager {
_connectionHandler = new ConnectionHandler(_context, this); _connectionHandler = new ConnectionHandler(_context, this);
_schedulerChooser = new SchedulerChooser(_context); _schedulerChooser = new SchedulerChooser(_context);
_conPacketHandler = new ConnectionPacketHandler(_context); _conPacketHandler = new ConnectionPacketHandler(_context);
_tcbShare = new TCBShare(_context); _timer = new RetransmissionTimer(_context, "Streaming Timer " +
session.getMyDestination().calculateHash().toBase64().substring(0, 4));
_tcbShare = new TCBShare(_context, _timer);
// PROTO_ANY is for backward compatibility (pre-0.7.1) // PROTO_ANY is for backward compatibility (pre-0.7.1)
// TODO change proto to PROTO_STREAMING someday. // TODO change proto to PROTO_STREAMING someday.
// Right now we get everything, and rely on Datagram to specify PROTO_UDP. // Right now we get everything, and rely on Datagram to specify PROTO_UDP.
@@ -146,7 +151,7 @@ class ConnectionManager {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions); ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort()); opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort()); opts.setLocalPort(synPacket.getLocalPort());
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts);
_tcbShare.updateOptsFromShare(con); _tcbShare.updateOptsFromShare(con);
con.setInbound(); con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
@@ -253,7 +258,7 @@ class ConnectionManager {
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else { } else {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer); con.setRemotePeer(peer);
while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) { while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {
@@ -368,6 +373,7 @@ class ConnectionManager {
iter.remove(); iter.remove();
} }
_tcbShare.stop(); _tcbShare.stop();
_timer.stop();
} }
/** /**
@@ -467,8 +473,9 @@ class ConnectionManager {
} }
private class PingFailed implements SimpleTimer.TimedEvent { private class PingFailed implements SimpleTimer.TimedEvent {
private Long _id; private final Long _id;
private PingNotifier _notifier; private final PingNotifier _notifier;
public PingFailed(Long id, PingNotifier notifier) { public PingFailed(Long id, PingNotifier notifier) {
_id = id; _id = id;
_notifier = notifier; _notifier = notifier;
@@ -487,15 +494,16 @@ class ConnectionManager {
private static class PingRequest { private static class PingRequest {
private boolean _ponged; private boolean _ponged;
private Destination _peer; private final Destination _peer;
private PacketLocal _packet; private final PacketLocal _packet;
private PingNotifier _notifier; private final PingNotifier _notifier;
public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
_ponged = false;
_peer = peer; _peer = peer;
_packet = packet; _packet = packet;
_notifier = notifier; _notifier = notifier;
} }
public void pong() { public void pong() {
// static, no log // static, no log
//_log.debug("Ping successful"); //_log.debug("Ping successful");

View File

@@ -49,13 +49,20 @@ class MessageOutputStream extends OutputStream {
*/ */
private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250; private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250;
/****
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
} }
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { ****/
this(ctx, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
/** */
public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
DataReceiver receiver, int bufSize) {
this(ctx, timer, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
} }
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize, int passiveFlushDelay) {
public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
DataReceiver receiver, int bufSize, int passiveFlushDelay) {
super(); super();
_dataCache = ByteCache.getInstance(128, bufSize); _dataCache = ByteCache.getInstance(128, bufSize);
_context = ctx; _context = ctx;
@@ -68,7 +75,7 @@ class MessageOutputStream extends OutputStream {
_nextBufferSize = -1; _nextBufferSize = -1;
_sendPeriodBeginTime = ctx.clock().now(); _sendPeriodBeginTime = ctx.clock().now();
_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_flusher = new Flusher(); _flusher = new Flusher(timer);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("MessageOutputStream created"); _log.debug("MessageOutputStream created");
} }
@@ -212,8 +219,8 @@ class MessageOutputStream extends OutputStream {
*/ */
private class Flusher extends SimpleTimer2.TimedEvent { private class Flusher extends SimpleTimer2.TimedEvent {
private boolean _enqueued; private boolean _enqueued;
public Flusher() { public Flusher(SimpleTimer2 timer) {
super(RetransmissionTimer.getInstance()); super(timer);
} }
public void enqueue() { public void enqueue() {
// no need to be overly worried about duplicates - it would just // no need to be overly worried about duplicates - it would just

View File

@@ -1,15 +1,26 @@
package net.i2p.client.streaming; package net.i2p.client.streaming;
import net.i2p.I2PAppContext;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
/** /**
* Not clear that we really need to create our own timer group, but we do, * Per-destination timer
* to prevent us clogging the router's timer group.
* Use from outside this package is deprecated.
* (BOB instantiates this for thread group reasons)
*/ */
public class RetransmissionTimer extends SimpleTimer2 { public class RetransmissionTimer extends SimpleTimer2 {
private static final RetransmissionTimer _instance = new RetransmissionTimer();
public static final RetransmissionTimer getInstance() { return _instance; } /**
protected RetransmissionTimer() { super("StreamingTimer"); } * @deprecated Don't use this to prestart threads, this is no longer a static instance
* @return a new instance as of 0.9
*/
public static final RetransmissionTimer getInstance() {
return new RetransmissionTimer(I2PAppContext.getGlobalContext(), "RetransmissionTimer");
}
/**
* @since 0.9
*/
RetransmissionTimer(I2PAppContext ctx, String name) {
super(ctx, name, false);
}
} }

View File

@@ -33,11 +33,11 @@ class TCBShare {
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2; private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4; private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4;
public TCBShare(I2PAppContext ctx) { public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(TCBShare.class); _log = ctx.logManager().getLog(TCBShare.class);
_cache = new ConcurrentHashMap(4); _cache = new ConcurrentHashMap(4);
_cleaner = new CleanEvent(); _cleaner = new CleanEvent(timer);
_cleaner.schedule(CLEAN_TIME); _cleaner.schedule(CLEAN_TIME);
} }
@@ -125,8 +125,9 @@ class TCBShare {
} }
private class CleanEvent extends SimpleTimer2.TimedEvent { private class CleanEvent extends SimpleTimer2.TimedEvent {
public CleanEvent() { public CleanEvent(SimpleTimer2 timer) {
super(RetransmissionTimer.getInstance()); // Use router's SimpleTimer2
super(timer);
} }
public void timeReached() { public void timeReached() {
for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) {

View File

@@ -2,6 +2,7 @@
* Home page: CSS tweaks * Home page: CSS tweaks
* Reseeder: Get rid of static instance, root in netDB, * Reseeder: Get rid of static instance, root in netDB,
don't use system properties for status don't use system properties for status
* RetransmissionTimer: Instantiate per-destination
* Router: When removing a config setting, remove from context also * Router: When removing a config setting, remove from context also
* SimpleScheduler, SimpleTimer, SimpleTimer2: Replace static instances * SimpleScheduler, SimpleTimer, SimpleTimer2: Replace static instances
with I2PAppContext-rooted references with I2PAppContext-rooted references