From f630d2dd27ff4a22ddad657fe3dbfa555ed3fff3 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 23 Nov 2011 23:36:37 +0000 Subject: [PATCH] * NTCP: - More optimizations in recvEncrypted() - More efficient XOR - Reduce bandwidth stat update frequency - Check for repeated zero-length reads --- history.txt | 10 + .../src/net/i2p/router/RouterVersion.java | 2 +- .../router/transport/ntcp/EventPumper.java | 16 ++ .../router/transport/ntcp/NTCPConnection.java | 181 ++++++++++++++---- .../router/transport/ntcp/NTCPTransport.java | 14 +- 5 files changed, 175 insertions(+), 48 deletions(-) diff --git a/history.txt b/history.txt index db1f32e62..e5ffd01be 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,13 @@ +2011-11-23 zzz + * CryptixAESEngine: Fix bogus bounds checks + * NTCP: + - More optimizations in recvEncrypted() + - More efficient XOR + - Reduce bandwidth stat update frequency + - Check for repeated zero-length reads + * RandomSource: Add new method getBytes(buf, offset, length) + * Tunnel encryption: More efficient XOR + 2011-11-21 zzz * NTCP Pumper: - Ensure failsafe pumper code gets run on schedule diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index c10128fe6..fecba78d6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 6; + public final static long BUILD = 7; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 8e65463c6..086b1dd37 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -101,6 +101,8 @@ class EventPumper implements Runnable { _context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} ); _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} ); + _context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} ); + _context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} ); } public synchronized void startPumping() { @@ -561,7 +563,21 @@ class EventPumper implements Runnable { // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_READ); releaseBuf(buf); + // workaround for channel stuck returning 0 all the time, causing 100% CPU + int consec = con.gotZeroRead(); + if (consec >= 5) { + _context.statManager().addRateData("ntcp.zeroReadDrop", 1); + if (_log.shouldLog(Log.WARN)) + _log.warn("Fail safe zero read close " + con); + con.close(); + } else { + _context.statManager().addRateData("ntcp.zeroRead", consec); + if (_log.shouldLog(Log.INFO)) + _log.info("nothing to read for " + con + ", but stay interested"); + } } else if (read > 0) { + // clear counter for workaround above + con.clearZeroRead(); // ZERO COPY. The buffer will be returned in Reader.processRead() buf.flip(); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 023b709cb..114cbc40b 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -106,15 +106,26 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private long _messagesWritten; private long _lastSendTime; private long _lastReceiveTime; + private long _lastRateUpdated; private final long _created; private long _nextMetaTime; + private int _consecutiveZeroReads; + + private static final int BLOCK_SIZE = 16; + private static final int META_SIZE = BLOCK_SIZE; + /** unencrypted outbound metadata buffer */ - private final byte _meta[] = new byte[16]; + private final byte _meta[] = new byte[META_SIZE]; private boolean _sendingMeta; /** how many consecutive sends were failed due to (estimated) send queue time */ private int _consecutiveBacklog; private long _nextInfoTime; + /* + * Update frequency for send/recv rates in console peers page + */ + private static final long STAT_UPDATE_TIME_MS = 30*1000; + private static final int META_FREQUENCY = 10*60*1000; /** how often we send our routerinfo unsolicited */ private static final int INFO_FREQUENCY = 90*60*1000; @@ -144,7 +155,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // TODO possible switch to CLQ but beware non-constant size() - see below _outbound = new LinkedBlockingQueue(); _isInbound = true; - _decryptBlockBuf = new byte[16]; + _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); _establishState = new EstablishState(ctx, transport, this); _conKey = key; @@ -169,7 +180,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // TODO possible switch to CLQ but beware non-constant size() - see below _outbound = new LinkedBlockingQueue(); _isInbound = false; - _decryptBlockBuf = new byte[16]; + _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); initialize(); } @@ -177,8 +188,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private void initialize() { _lastSendTime = _created; _lastReceiveTime = _created; - _curReadBlock = new byte[16]; - _prevReadBlock = new byte[16]; + _lastRateUpdated = _created; + _curReadBlock = new byte[BLOCK_SIZE]; + _prevReadBlock = new byte[BLOCK_SIZE]; _transport.establishing(this); } @@ -200,9 +212,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _sessionKey = key; _clockSkew = clockSkew; _prevWriteEnd = prevWriteEnd; - System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); + System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); _established = true; _establishedOn = System.currentTimeMillis(); _transport.inboundEstablished(this); @@ -241,7 +253,24 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; } public int getConsecutiveBacklog() { return _consecutiveBacklog; } - + + /** + * workaround for EventPumper + * @since 0.8.12 + */ + public void clearZeroRead() { + _consecutiveZeroReads = 0; + } + + /** + * workaround for EventPumper + * @return value after incrementing + * @since 0.8.12 + */ + public int gotZeroRead() { + return ++_consecutiveZeroReads; + } + public boolean isClosed() { return _closed; } public void close() { close(false); } public void close(boolean allowRequeue) { @@ -441,7 +470,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _sessionKey = key; _clockSkew = clockSkew; _prevWriteEnd = prevWriteEnd; - System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length); + System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); @@ -595,8 +624,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { * */ synchronized void prepareNextWriteFast() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); if (!_isInbound && !_established) { if (_establishState == null) { _establishState = new EstablishState(_context, _transport, this); @@ -715,13 +744,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (rem > 0) padding = 16 - rem; - buf.padLength = padding; buf.unencryptedLength = min+padding; DataHelper.toLong(buf.unencrypted, 0, 2, sz); System.arraycopy(buf.base, 0, buf.unencrypted, 2, buf.baseLength); - if (padding > 0) - _context.random().nextBytes(buf.pad); // maybe more than necessary, but its only the prng - System.arraycopy(buf.pad, 0, buf.unencrypted, 2+sz, buf.padLength); + if (padding > 0) { + _context.random().nextBytes(buf.unencrypted, 2+sz, padding); + } //long serialized = System.currentTimeMillis(); buf.crc.update(buf.unencrypted, 0, buf.unencryptedLength-4); @@ -765,22 +793,18 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { int unencryptedLength; final byte base[]; int baseLength; - final byte pad[]; - int padLength; final Adler32 crc; byte encrypted[]; PrepBuffer() { unencrypted = new byte[BUFFER_SIZE]; base = new byte[BUFFER_SIZE]; - pad = new byte[16]; crc = new Adler32(); } private void init() { unencryptedLength = 0; baseLength = 0; - padLength = 0; encrypted = null; crc.reset(); } @@ -948,20 +972,24 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private long _lastBytesReceived; /** _bytesSent when we last updated the rate */ private long _lastBytesSent; - private long _lastRateUpdated; private float _sendBps; private float _recvBps; - private float _sendBps15s; - private float _recvBps15s; + //private float _sendBps15s; + //private float _recvBps15s; - public float getSendRate() { return _sendBps15s; } - public float getRecvRate() { return _recvBps15s; } + public float getSendRate() { return _sendBps; } + public float getRecvRate() { return _recvBps; } + /** + * Stats only for console + */ private void updateStats() { long now = System.currentTimeMillis(); long time = now - _lastRateUpdated; - // If at least one second has passed - if (time >= 1000) { + // If enough time has passed... + // Perhaps should synchronize, but if so do the time check before synching... + // only for console so don't bother.... + if (time >= STAT_UPDATE_TIME_MS) { long totS = _bytesSent; long totR = _bytesReceived; long sent = totS - _lastBytesSent; // How much we sent meanwhile @@ -976,14 +1004,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // Maintain an approximate average with a 15-second halflife // Weights (0.955 and 0.045) are tuned so that transition between two values (e.g. 0..10) // would reach their midpoint (e.g. 5) in 15s - _sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time; - _recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time; + //_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time; + //_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time; if (_log.shouldLog(Log.DEBUG)) _log.debug("Rates updated to " - + _sendBps + "/" + _recvBps + "Bps in/out (" - + _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s) after " - + sent + "/" + recv + " in " + time); + + _sendBps + '/' + _recvBps + "Bps in/out " + //+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s after " + + sent + '/' + recv + " in " + DataHelper.formatDuration(time)); } } @@ -1003,17 +1031,30 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { synchronized void recvEncryptedI2NP(ByteBuffer buf) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("receive encrypted i2np: " + buf.remaining()); + // hasArray() is false for direct buffers, at least on my system... + if (_curReadBlockIndex == 0 && buf.hasArray()) { + // fast way + int tot = buf.remaining(); + if (tot >= 32 && tot % 16 == 0) { + recvEncryptedFast(buf); + return; + } + } + while (buf.hasRemaining() && !_closed) { - int want = Math.min(buf.remaining(), _curReadBlock.length-_curReadBlockIndex); + int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex); if (want > 0) { buf.get(_curReadBlock, _curReadBlockIndex, want); _curReadBlockIndex += want; } //_curReadBlock[_curReadBlockIndex++] = buf.get(); - if (_curReadBlockIndex >= _curReadBlock.length) { + if (_curReadBlockIndex >= BLOCK_SIZE) { // cbc _context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0); - DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, _decryptBlockBuf.length); + //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE); + for (int i = 0; i < BLOCK_SIZE; i++) { + _decryptBlockBuf[i] ^= _prevReadBlock[i]; + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")"); boolean ok = recvUnencryptedI2NP(); @@ -1029,6 +1070,51 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } } } + + /** + * Decrypt directly out of the ByteBuffer instead of copying the bytes + * 16 at a time to the _curReadBlock / _prevReadBlock flip buffers. + * + * More efficient but can only be used if buf.hasArray == true AND + * _curReadBlockIndex must be 0 and buf.getRemaining() % 16 must be 0 + * and buf.getRemaining() must be >= 16. + * All this is true for most buffers. + * In theory this could be fixed up to handle the other cases too but that's hard. + * Caller must synchronize! + * @since 0.8.12 + */ + private void recvEncryptedFast(ByteBuffer buf) { + byte[] array = buf.array(); + int pos = buf.arrayOffset(); + int end = pos + buf.remaining(); + boolean first = true; + + for ( ; pos < end && !_closed; pos += BLOCK_SIZE) { + _context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0); + if (first) { + // XOR with _prevReadBlock the first time... + //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE); + for (int i = 0; i < BLOCK_SIZE; i++) { + _decryptBlockBuf[i] ^= _prevReadBlock[i]; + } + first = false; + } else { + //DataHelper.xor(_decryptBlockBuf, 0, array, pos - BLOCK_SIZE, _decryptBlockBuf, 0, BLOCK_SIZE); + int start = pos - BLOCK_SIZE; + for (int i = 0; i < BLOCK_SIZE; i++) { + _decryptBlockBuf[i] ^= array[start + i]; + } + } + boolean ok = recvUnencryptedI2NP(); + if (!ok) { + _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data"); + _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1); + return; + } + } + // ...and copy to _prevReadBlock the last time + System.arraycopy(array, end - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); + } /** * Append the next 16 bytes of cleartext to the read state. @@ -1038,6 +1124,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { */ private boolean recvUnencryptedI2NP() { _curReadState.receiveBlock(_decryptBlockBuf); + // FIXME move check to ReadState; must we close? possible attack vector? if (_curReadState.getSize() > BUFFER_SIZE) { _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString()); _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize()); @@ -1087,12 +1174,23 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } } + /** + * One special case is a metadata message where the sizeof(data) is 0. In + * that case, the unencrypted message is encoded as: + *
+     *  +-------+-------+-------+-------+-------+-------+-------+-------+
+     *  |       0       |      timestamp in seconds     | uninterpreted             
+     *  +-------+-------+-------+-------+-------+-------+-------+-------+
+     *          uninterpreted           | adler checksum of sz+data+pad |
+     *  +-------+-------+-------+-------+-------+-------+-------+-------+
+     *
+ */ private void sendMeta() { byte encrypted[] = new byte[_meta.length]; synchronized (_meta) { - _context.random().nextBytes(_meta); // randomize the uninterpreted, then overwrite w/ data DataHelper.toLong(_meta, 0, 2, 0); DataHelper.toLong(_meta, 2, 4, (_context.clock().now() + 500) / 1000); + _context.random().nextBytes(_meta, 6, 6); Adler32 crc = new Adler32(); crc.update(_meta, 0, _meta.length-4); DataHelper.toLong(_meta, _meta.length-4, 4, crc.getValue()); @@ -1234,13 +1332,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { /** @param buf 16 bytes */ private void receiveInitial(byte buf[]) { - _stateBegin = System.currentTimeMillis(); _size = (int)DataHelper.fromLong(buf, 0, 2); if (_size == 0) { readMeta(buf); init(); - return; } else { + _stateBegin = System.currentTimeMillis(); _dataBuf = acquireReadBuf(); System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2); _nextWrite += buf.length-2; @@ -1262,6 +1359,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { remaining -= blockUsed; } if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) { + // we've received all the data but not the 4-byte checksum if (_log.shouldLog(Log.DEBUG)) _log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead); _crc.update(buf); @@ -1284,8 +1382,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4); _crc.update(buf, 0, buf.length-4); long val = _crc.getValue(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size); if (val == _expectedCrc) { try { I2NPMessageHandler h = acquireHandler(_context); @@ -1329,6 +1427,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_log.shouldLog(Log.WARN)) _log.warn("Error parsing I2NP message", ime); _context.statManager().addRateData("ntcp.corruptI2NPIME", 1); + // FIXME don't close the con, possible attack vector? close(); // handler and databuf are lost return; @@ -1337,7 +1436,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_log.shouldLog(Log.WARN)) _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks); _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1); - // FIXME should we try to read in the message and keep going? + // FIXME don't close the con, possible attack vector? close(); // databuf is lost return; diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 9fff5f896..de39759ea 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -756,16 +756,18 @@ public class NTCPTransport extends TransportImpl { buf.append(DataHelper.formatDuration2(con.getTimeSinceReceive())); buf.append(THINSP).append(DataHelper.formatDuration2(con.getTimeSinceSend())); buf.append(""); - if (con.getTimeSinceReceive() < 10*1000) { - buf.append(formatRate(con.getRecvRate()/1024)); - bpsRecv += con.getRecvRate(); + if (con.getTimeSinceReceive() < 2*60*1000) { + float r = con.getRecvRate(); + buf.append(formatRate(r / 1024)); + bpsRecv += r; } else { buf.append(formatRate(0)); } buf.append(THINSP); - if (con.getTimeSinceSend() < 10*1000) { - buf.append(formatRate(con.getSendRate()/1024)); - bpsSend += con.getSendRate(); + if (con.getTimeSinceSend() < 2*60*1000) { + float r = con.getSendRate(); + buf.append(formatRate(r / 1024)); + bpsSend += r; } else { buf.append(formatRate(0)); }