NTCP: Use context time, not system time, in NTCPConnection

so that clock skew calculations work right
This commit is contained in:
zzz
2015-04-29 02:35:22 +00:00
parent fe680eb192
commit 489fdd5e4b

View File

@@ -172,7 +172,7 @@ class NTCPConnection {
public NTCPConnection(RouterContext ctx, NTCPTransport transport, SocketChannel chan, SelectionKey key) { public NTCPConnection(RouterContext ctx, NTCPTransport transport, SocketChannel chan, SelectionKey key) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_created = System.currentTimeMillis(); _created = ctx.clock().now();
_transport = transport; _transport = transport;
_remAddr = null; _remAddr = null;
_chan = chan; _chan = chan;
@@ -200,7 +200,7 @@ class NTCPConnection {
public NTCPConnection(RouterContext ctx, NTCPTransport transport, RouterIdentity remotePeer, RouterAddress remAddr) { public NTCPConnection(RouterContext ctx, NTCPTransport transport, RouterIdentity remotePeer, RouterAddress remAddr) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_created = System.currentTimeMillis(); _created = ctx.clock().now();
_transport = transport; _transport = transport;
_remotePeer = remotePeer; _remotePeer = remotePeer;
_remAddr = remAddr; _remAddr = remAddr;
@@ -302,10 +302,10 @@ class NTCPConnection {
System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); // _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_establishedOn = System.currentTimeMillis(); _establishedOn = _context.clock().now();
NTCPConnection rv = _transport.inboundEstablished(this); NTCPConnection rv = _transport.inboundEstablished(this);
_nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); _nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
_establishState = EstablishState.VERIFIED; _establishState = EstablishState.VERIFIED;
return rv; return rv;
} }
@@ -321,7 +321,7 @@ class NTCPConnection {
if (!isEstablished()) if (!isEstablished())
return getTimeSinceCreated(); return getTimeSinceCreated();
else else
return System.currentTimeMillis()-_establishedOn; return _context.clock().now() -_establishedOn;
} }
public long getMessagesSent() { return _messagesWritten.get(); } public long getMessagesSent() { return _messagesWritten.get(); }
@@ -345,13 +345,13 @@ class NTCPConnection {
} }
/** @return milliseconds */ /** @return milliseconds */
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; } public long getTimeSinceSend() { return _context.clock().now()-_lastSendTime; }
/** @return milliseconds */ /** @return milliseconds */
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; } public long getTimeSinceReceive() { return _context.clock().now()-_lastReceiveTime; }
/** @return milliseconds */ /** @return milliseconds */
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; } public long getTimeSinceCreated() { return _context.clock().now()-_created; }
/** /**
* @return when this connection was created (not established) * @return when this connection was created (not established)
@@ -625,13 +625,13 @@ class NTCPConnection {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_establishedOn = System.currentTimeMillis(); _establishedOn = _context.clock().now();
_establishState = EstablishState.VERIFIED; _establishState = EstablishState.VERIFIED;
_transport.markReachable(getRemotePeer().calculateHash(), false); _transport.markReachable(getRemotePeer().calculateHash(), false);
//_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); //_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = !_outbound.isEmpty(); boolean msgs = !_outbound.isEmpty();
_nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); _nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
if (msgs) if (msgs)
_transport.getWriter().wantsWrite(this, "outbound established"); _transport.getWriter().wantsWrite(this, "outbound established");
} }
@@ -783,9 +783,10 @@ class NTCPConnection {
return; return;
} }
if (_nextMetaTime <= System.currentTimeMillis()) { long now = _context.clock().now();
if (_nextMetaTime <= now) {
sendMeta(); sendMeta();
_nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY / 2); _nextMetaTime = now + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY / 2);
} }
OutNetMessage msg = null; OutNetMessage msg = null;
@@ -855,10 +856,10 @@ class NTCPConnection {
// for every 6-12 hours that we are connected to a peer, send them // for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query // our updated netDb info (they may not accept it and instead query
// the floodfill netDb servers, but they may...) // the floodfill netDb servers, but they may...)
if (_nextInfoTime <= System.currentTimeMillis()) { if (_nextInfoTime <= now) {
// perhaps this should check to see if we are bw throttled, etc? // perhaps this should check to see if we are bw throttled, etc?
enqueueInfoMessage(); enqueueInfoMessage();
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); _nextInfoTime = now + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
} }
} }
@@ -959,7 +960,7 @@ class NTCPConnection {
EventPumper.releaseBuf(buf); EventPumper.releaseBuf(buf);
return; return;
} }
_context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime())); _context.statManager().addRateData("ntcp.throttledReadComplete", (_context.clock().now()-req.getRequestTime()));
recv(buf); recv(buf);
// our reads used to be bw throttled (during which time we were no // our reads used to be bw throttled (during which time we were no
// longer interested in reading from the network), but we aren't // longer interested in reading from the network), but we aren't
@@ -978,7 +979,7 @@ class NTCPConnection {
removeOBRequest(req); removeOBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment(); ByteBuffer buf = (ByteBuffer)req.attachment();
if (!_closed.get()) { if (!_closed.get()) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); _context.statManager().addRateData("ntcp.throttledWriteComplete", (_context.clock().now()-req.getRequestTime()));
write(buf); write(buf);
} }
} }
@@ -1084,7 +1085,7 @@ class NTCPConnection {
} }
} }
if (msg != null) { if (msg != null) {
_lastSendTime = System.currentTimeMillis(); _lastSendTime = _context.clock().now();
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime()); _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime());
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after " _log.debug("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
@@ -1130,7 +1131,7 @@ class NTCPConnection {
* Stats only for console * Stats only for console
*/ */
private void updateStats() { private void updateStats() {
long now = System.currentTimeMillis(); long now = _context.clock().now();
long time = now - _lastRateUpdated; long time = now - _lastRateUpdated;
// If enough time has passed... // If enough time has passed...
// Perhaps should synchronize, but if so do the time check before synching... // Perhaps should synchronize, but if so do the time check before synching...
@@ -1468,7 +1469,7 @@ class NTCPConnection {
readMeta(buf); readMeta(buf);
init(); init();
} else { } else {
_stateBegin = System.currentTimeMillis(); _stateBegin = _context.clock().now();
_dataBuf = acquireReadBuf(); _dataBuf = acquireReadBuf();
System.arraycopy(buf, 2, _dataBuf.getData(), 0, buf.length-2); System.arraycopy(buf, 2, _dataBuf.getData(), 0, buf.length-2);
_nextWrite += buf.length-2; _nextWrite += buf.length-2;
@@ -1526,7 +1527,7 @@ class NTCPConnection {
// So use the new handler method that limits the size. // So use the new handler method that limits the size.
h.readMessage(_dataBuf.getData(), 0, _size); h.readMessage(_dataBuf.getData(), 0, _size);
I2NPMessage read = h.lastRead(); I2NPMessage read = h.lastRead();
long timeToRecv = System.currentTimeMillis() - _stateBegin; long timeToRecv = _context.clock().now() - _stateBegin;
releaseHandler(h); releaseHandler(h);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0) _log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
@@ -1539,7 +1540,7 @@ class NTCPConnection {
if (read != null) { if (read != null) {
_transport.messageReceived(read, _remotePeer, null, timeToRecv, _size); _transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
_lastReceiveTime = System.currentTimeMillis(); _lastReceiveTime = _context.clock().now();
_messagesRead.incrementAndGet(); _messagesRead.incrementAndGet();
} }
} catch (I2NPMessageException ime) { } catch (I2NPMessageException ime) {