* NTCP: Reduce log level for race (ticket #392)

* NTCPConnection: Concurrent PrepBufs
  * OutNetMessage: Remove some fields and methods used only in NTCP debugging
This commit is contained in:
zzz
2011-11-16 01:00:08 +00:00
parent caaa8dacad
commit e4ee5e3016
5 changed files with 63 additions and 92 deletions

View File

@@ -1,3 +1,10 @@
2011-11-16 zzz
* Console: Add Jetty version to logs page
* NTCP: Reduce log level for race (ticket #392)
* NTCPConnection: Concurrent PrepBufs
* OutNetMessage: Remove some fields and methods used only in NTCP debugging
* Router: Move router.ping file from temp directory to config directory
2011-11-14 zzz
* Console: Remove % chart at bottom of tunnels.jsp
* Profiles: Only use same-country metric for countries with

View File

@@ -48,7 +48,6 @@ public class OutNetMessage {
private MessageSelector _replySelector;
private Set<String> _failedTransports;
private long _sendBegin;
private long _transmitBegin;
//private Exception _createdBy;
private final long _created;
/** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
@@ -58,9 +57,6 @@ public class OutNetMessage {
* (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order)
*/
private List<String> _timestampOrder;
private int _queueSize;
private long _prepareBegin;
private long _prepareEnd;
private Object _preparationBuf;
public OutNetMessage(RouterContext context) {
@@ -247,14 +243,13 @@ public class OutNetMessage {
/** when did the sending process begin */
public long getSendBegin() { return _sendBegin; }
public void beginSend() { _sendBegin = _context.clock().now(); }
public void beginTransmission() { _transmitBegin = _context.clock().now(); }
public void beginPrepare() { _prepareBegin = _context.clock().now(); }
public void prepared() { prepared(null); }
public void prepared(Object buf) {
_prepareEnd = _context.clock().now();
_preparationBuf = buf;
}
public Object releasePreparationBuffer() {
Object rv = _preparationBuf;
_preparationBuf = null;
@@ -262,18 +257,13 @@ public class OutNetMessage {
}
public long getCreated() { return _created; }
/** time since the message was created */
public long getLifetime() { return _context.clock().now() - _created; }
/** time the transport tries to send the message (including any queueing) */
public long getSendTime() { return _context.clock().now() - _sendBegin; }
/** time during which the i2np message is actually in flight */
public long getTransmissionTime() { return _context.clock().now() - _transmitBegin; }
/** how long it took to prepare the i2np message for transmission (including serialization and transport layer encryption) */
public long getPreparationTime() { return _prepareEnd - _prepareBegin; }
/** number of messages ahead of this one going to the targetted peer when it is first enqueued */
public int getQueueSize() { return _queueSize; }
public void setQueueSize(int size) { _queueSize = size; }
/**
* We've done what we need to do with the data from this message, though
* we may keep the object around for a while to use its ID, jobs, etc.

View File

@@ -213,8 +213,7 @@ public abstract class TransportImpl implements Transport {
if (_log.shouldLog(Log.WARN))
_log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " to "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend
+ "/" + msg.getTransmissionTime());
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend);
}
//if (true)
// _log.error("(not error) I2NP message sent? " + sendSuccessful + " " + msg.getMessageId() + " after " + msToSend + "/" + msg.getTransmissionTime());
@@ -225,7 +224,7 @@ public abstract class TransportImpl implements Transport {
if (!sendSuccessful)
level = Log.INFO;
if (_log.shouldLog(level))
_log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte "
_log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "): [success=" + sendSuccessful + "] " + msg.getMessageSize() + " byte "
+ msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
+ " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString());
} else {

View File

@@ -70,7 +70,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private boolean _established;
private long _establishedOn;
private EstablishState _establishState;
private NTCPTransport _transport;
private final NTCPTransport _transport;
private final boolean _isInbound;
private boolean _closed;
private NTCPAddress _remAddr;
@@ -80,7 +80,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* pending unprepared OutNetMessage instances
*/
private final LinkedBlockingQueue<OutNetMessage> _outbound;
/** current prepared OutNetMessage, or null - synchronize on _outbound to modify */
/**
* current prepared OutNetMessage, or null - synchronize on _outbound to modify
* FIXME why do we need this???
*/
private OutNetMessage _currentOutbound;
private SessionKey _sessionKey;
/** encrypted block of the current I2NP message being read */
@@ -290,12 +293,13 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
_consecutiveBacklog = 0;
int enqueued = 0;
if (FAST_LARGE)
//if (FAST_LARGE)
bufferedPrepare(msg);
boolean noOutbound = false;
_outbound.offer(msg);
enqueued = _outbound.size();
msg.setQueueSize(enqueued);
// although stat description says ahead of this one, not including this one...
_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
noOutbound = (_currentOutbound == null);
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound)
@@ -437,6 +441,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.getWriter().wantsWrite(this, "outbound established");
}
/**
// Time vs space tradeoff:
// on slow GCing jvms, the mallocs in the following preparation can cause the
// write to get congested, taking up a substantial portion of the Writer's
@@ -450,6 +455,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// just the currently transmitting one.
//
// hmm.
*/
private static final boolean FAST_LARGE = true; // otherwise, SLOW_SMALL
/**
@@ -605,6 +611,11 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (msg == null)
return;
} else {
// FIXME
// This is a linear search to implement a priority queue, O(n**2)
// Also race with unsynchronized removal in close() above
// Either implement a real (concurrent?) priority queue or just comment out all of this,
// as it isn't clear how effective the priorities on a per-connection basis are.
int slot = 0; // only for logging
Iterator<OutNetMessage> it = _outbound.iterator();
for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound
@@ -627,11 +638,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_currentOutbound = msg;
}
msg.beginTransmission();
//long begin = System.currentTimeMillis();
PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer();
if (buf == null)
throw new RuntimeException("buf is null for " + msg);
if (buf == null) {
// race, see ticket #392
//throw new RuntimeException("buf is null for " + msg);
if (_log.shouldLog(Log.WARN))
_log.warn("Null prep buf for " + msg);
return;
}
_context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
//long encryptedTime = System.currentTimeMillis();
@@ -669,7 +684,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private void bufferedPrepare(OutNetMessage msg) {
//if (!_isInbound && !_established)
// return;
msg.beginPrepare();
//long begin = System.currentTimeMillis();
PrepBuffer buf = acquireBuf();
//long alloc = System.currentTimeMillis();
@@ -710,66 +724,34 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
}
private static final int MIN_PREP_BUFS = 5;
private static int NUM_PREP_BUFS = 5;
private static int __liveBufs = 0;
private static int __consecutiveExtra;
private final static List _bufs = new ArrayList(NUM_PREP_BUFS);
private PrepBuffer acquireBuf() {
synchronized (_bufs) {
if (!_bufs.isEmpty()) {
PrepBuffer b = (PrepBuffer)_bufs.remove(0);
b.acquired();
return b;
}
}
PrepBuffer b = new PrepBuffer();
b.init();
NUM_PREP_BUFS = ++__liveBufs;
if (_log.shouldLog(Log.DEBUG))
_log.debug("creating a new prep buffer with " + __liveBufs + " live");
_context.statManager().addRateData("ntcp.prepBufCache", NUM_PREP_BUFS, 0);
b.acquired();
private static int NUM_PREP_BUFS = 6;
private final static LinkedBlockingQueue<PrepBuffer> _bufs = new LinkedBlockingQueue(NUM_PREP_BUFS);
/**
* @return initialized buffer
*/
private static PrepBuffer acquireBuf() {
PrepBuffer b = _bufs.poll();
if (b == null)
b = new PrepBuffer();
return b;
}
private void releaseBuf(PrepBuffer buf) {
private static void releaseBuf(PrepBuffer buf) {
buf.init();
long lifetime = buf.lifetime();
int extra = 0;
boolean cached = false;
synchronized (_bufs) {
if (_bufs.size() < NUM_PREP_BUFS) {
extra = _bufs.size();
_bufs.add(buf);
cached = true;
if (extra > 5) {
__consecutiveExtra++;
if (__consecutiveExtra >= 20) {
NUM_PREP_BUFS = Math.max(NUM_PREP_BUFS - 1, MIN_PREP_BUFS);
__consecutiveExtra = 0;
}
}
} else {
buf.unencrypted = null;
buf.base = null;
buf.pad = null;
buf.crc = null;
--__liveBufs;
}
}
if (cached && _log.shouldLog(Log.DEBUG))
_log.debug("releasing cached buffer with " + __liveBufs + " live after " + lifetime);
_bufs.offer(buf);
}
private static class PrepBuffer {
byte unencrypted[];
final byte unencrypted[];
int unencryptedLength;
byte base[];
final byte base[];
int baseLength;
byte pad[];
final byte pad[];
int padLength;
Adler32 crc;
final Adler32 crc;
byte encrypted[];
private long acquiredOn;
PrepBuffer() {
unencrypted = new byte[BUFFER_SIZE];
@@ -777,6 +759,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
pad = new byte[16];
crc = new Adler32();
}
private void init() {
unencryptedLength = 0;
baseLength = 0;
@@ -784,8 +767,6 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
encrypted = null;
crc.reset();
}
public void acquired() { acquiredOn = System.currentTimeMillis(); }
public long lifetime() { return System.currentTimeMillis()-acquiredOn; }
}
/**
@@ -906,13 +887,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (msg != null) {
_lastSendTime = System.currentTimeMillis();
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime());
_context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime());
_context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime());
if (_log.shouldLog(Log.INFO)) {
_log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
+ msg.getSendTime() + "/" + msg.getTransmissionTime() + "/"
+ msg.getPreparationTime() + "/" + msg.getLifetime()
+ " queued after " + msg.getQueueSize()
+ msg.getSendTime() + "/"
+ msg.getLifetime()
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
}
_messagesWritten++;
@@ -1112,8 +1090,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public long getReadTime() { return _curReadState.getReadTime(); }
private static class DataBuf {
byte data[];
ByteArrayInputStream bais;
final byte data[];
final ByteArrayInputStream bais;
public DataBuf() {
data = new byte[BUFFER_SIZE];
bais = new ByteArrayInputStream(data);
@@ -1139,9 +1118,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
static void releaseResources() {
_i2npHandlers.clear();
_dataReadBufs.clear();
synchronized(_bufs) {
_bufs.clear();
}
_bufs.clear();
}
/**

View File

@@ -73,7 +73,6 @@ public class NTCPTransport extends TransportImpl {
_log = ctx.logManager().getLog(getClass());
_context.statManager().createRateStat("ntcp.sendTime", "Total message lifetime when sent completely", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.transmitTime", "How long after message preparation before the message was fully sent", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.sendQueueSize", "How many messages were ahead of the current one on the connection's queue when it was first added", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.receiveTime", "How long it takes to receive an inbound message", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.receiveSize", "How large the received message was", "ntcp", RATES);
@@ -122,7 +121,6 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);