From 1b09b9faa4782cdf4020148938f6aeb91766135b Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 23 Apr 2015 13:19:18 +0000 Subject: [PATCH] Streaming: More efficient checking for input buffer overflow, add additional checks. Fix bug if available buffer calculation is negative Check log level before calling displayPacket() Log tweaks --- .../i2p/client/streaming/impl/Connection.java | 7 +- .../impl/ConnectionPacketHandler.java | 25 ++---- .../streaming/impl/MessageInputStream.java | 77 +++++++++++++++++-- .../client/streaming/impl/PacketHandler.java | 11 +-- .../client/streaming/impl/PacketQueue.java | 3 +- 5 files changed, 90 insertions(+), 33 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 2885c12f9..c27bc9dea 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -124,9 +124,11 @@ class Connection { _isInbound = isInbound; _log = _context.logManager().getLog(Connection.class); _receiver = new ConnectionDataReceiver(_context, this); - _inputStream = new MessageInputStream(_context); + _options = (opts != null ? opts : new ConnectionOptions()); + _inputStream = new MessageInputStream(_context, _options.getMaxMessageSize(), + _options.getMaxWindowSize(), _options.getInboundBufferSize()); // FIXME pass through a passive flush delay setting as the 4th arg - _outputStream = new MessageOutputStream(_context, timer, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); + _outputStream = new MessageOutputStream(_context, timer, _receiver, _options.getMaxMessageSize()); _timer = timer; _outboundPackets = new TreeMap(); if (opts != null) { @@ -136,7 +138,6 @@ class Connection { _localPort = 0; _remotePort = 0; } - _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = new AtomicLong(-1); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 4b8c49810..998ede4a3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -103,23 +103,14 @@ class ConnectionPacketHandler { } } - if (packet.getPayloadSize() > 0) { - // Here, for the purposes of calculating whether the input stream is full, - // we assume all the not-ready blocks are the max message size. - // This prevents us from getting DoSed by accepting unlimited out-of-order small messages - long ready = con.getInputStream().getHighestReadyBlockId(); - int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); - int allowedBlocks = available/con.getOptions().getMaxMessageSize(); - if (seqNum > ready + allowedBlocks) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Inbound buffer exceeded on connection " + con + " (" - + ready + "/"+ (ready+allowedBlocks) + "/" + available - + ": dropping " + packet); - con.getOptions().setChoke(61*1000); - packet.releasePayload(); - con.ackImmediately(); - return; - } + if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) { + if (_log.shouldWarn()) + _log.warn("Inbound buffer exceeded on connection " + con + + ", dropping " + packet); + con.getOptions().setChoke(61*1000); + packet.releasePayload(); + con.ackImmediately(); + return; } con.getOptions().setChoke(0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index ea0c33c34..fdf3d397b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -33,6 +33,7 @@ class MessageInputStream extends InputStream { * */ private final List _readyDataBlocks; + /** current byte index into _readyDataBlocks.get(0) */ private int _readyDataBlockIndex; /** highest message ID used in the readyDataBlocks */ private long _highestReadyBlockId; @@ -55,12 +56,16 @@ class MessageInputStream extends InputStream { private IOException _streamError; private long _readTotal; //private ByteCache _cache; - + private final int _maxMessageSize; + private final int _maxWindowSize; + private final int _maxBufferSize; private final byte[] _oneByte = new byte[1]; - private final Object _dataLock; - public MessageInputStream(I2PAppContext ctx) { + private static final int MIN_READY_BUFFERS = 16; + + + public MessageInputStream(I2PAppContext ctx, int maxMessageSize, int maxWindowSize, int maxBufferSize) { _log = ctx.logManager().getLog(MessageInputStream.class); _readyDataBlocks = new ArrayList(4); _highestReadyBlockId = -1; @@ -68,6 +73,9 @@ class MessageInputStream extends InputStream { _readTimeout = -1; _notYetReadyBlocks = new HashMap(4); _dataLock = new Object(); + _maxMessageSize = maxMessageSize; + _maxWindowSize = maxWindowSize; + _maxBufferSize = maxBufferSize; //_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE); } @@ -89,6 +97,62 @@ class MessageInputStream extends InputStream { } } + /** + * Determine if this packet will fit in our buffering limits. + * + * @return true if we have room. If false, do not call messageReceived() + * @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock, + * and we can efficiently do several checks + */ + public boolean canAccept(long messageId, int payloadSize) { + if (payloadSize <= 0) + return true; + if (messageId < MIN_READY_BUFFERS) + return true; + synchronized (_dataLock) { + // always accept if closed, will be processed elsewhere + if (_locallyClosed) + return true; + // ready dup check + // we always allow sequence numbers less than or equal to highest received + if (messageId <= _highestReadyBlockId) + return true; + // shortcut test, assuming all ready and not ready blocks are max size, + // to avoid iterating through all the ready blocks in getTotalReadySize() + if ((_readyDataBlocks.size() + _notYetReadyBlocks.size()) * _maxMessageSize < _maxBufferSize) + return true; + // not ready dup check + if (_notYetReadyBlocks.containsKey(Long.valueOf(messageId))) + return true; + // less efficient starting here + // Here, for the purposes of calculating whether the input stream is full, + // we assume all the not-ready blocks are the max message size. + // This prevents us from getting DoSed by accepting unlimited out-of-order small messages + int available = _maxBufferSize - getTotalReadySize(); + if (available <= 0) { + if (_log.shouldWarn()) + _log.warn("Dropping message " + messageId + ", inbound buffer exceeded: available = " + + available); + return false; + } + // following code screws up if available < 0 + int allowedBlocks = available / _maxMessageSize; + if (messageId > _highestReadyBlockId + allowedBlocks) { + if (_log.shouldWarn()) + _log.warn("Dropping message " + messageId + ", inbound buffer exceeded: " + + _highestReadyBlockId + '/' + (_highestReadyBlockId + allowedBlocks) + '/' + available); + return false; + } + // This prevents us from getting DoSed by accepting unlimited in-order small messages + if (_readyDataBlocks.size() >= 4 * _maxWindowSize) { + if (_log.shouldWarn()) + _log.warn("Dropping message " + messageId + ", too many ready blocks"); + return false; + } + } + return true; + } + /** * Retrieve the message IDs that are holes in our sequence - ones * past the highest ready ID and below the highest received message @@ -227,7 +291,7 @@ class MessageInputStream extends InputStream { /** * A new message has arrived - toss it on the appropriate queue (moving * previously pending messages to the ready queue if it fills the gap, etc). - * This does no limiting of pending data - it must be limited in ConnectionPacketHandler. + * This does no limiting of pending data - see canAccept() for limiting. * * @param messageId ID of the message * @param payload message payload, may be null or have null or zero-length data @@ -486,10 +550,9 @@ class MessageInputStream extends InputStream { int numBytes = 0; for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = _readyDataBlocks.get(i); + numBytes += cur.getValid(); if (i == 0) - numBytes += cur.getValid() - _readyDataBlockIndex; - else - numBytes += cur.getValid(); + numBytes -= _readyDataBlockIndex; } return numBytes; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java index 3292e3834..1eb63a663 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java @@ -100,11 +100,12 @@ class PacketHandler { Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { - if (_log.shouldLog(Log.INFO)) + if (_log.shouldDebug()) displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); receiveKnownCon(con, packet); } else { - displayPacket(packet, "UNKN", null); + if (_log.shouldDebug()) + displayPacket(packet, "UNKN", null); receiveUnknownCon(packet, sendId, queueIfNoConn); } // Don't log here, wait until we have the conn to make the dumps easier to follow @@ -112,8 +113,9 @@ class PacketHandler { } private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS"); + + /** logs to System.out, and router log at debug level */ void displayPacket(Packet packet, String prefix, String suffix) { - if (!_log.shouldLog(Log.INFO)) return; StringBuilder buf = new StringBuilder(256); synchronized (_fmt) { buf.append(_fmt.format(new Date())); @@ -124,8 +126,7 @@ class PacketHandler { buf.append(" ").append(suffix); String str = buf.toString(); System.out.println(str); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(str); + _log.debug(str); } private void receiveKnownCon(Connection con, Packet packet) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 51b8baef7..96222a8a2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -214,7 +214,8 @@ class PacketQueue implements SendMessageStatusListener { } Connection c = packet.getConnection(); String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null); - _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); + if (_log.shouldDebug()) + _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); if (I2PSocketManagerFull.pcapWriter != null && _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP)) packet.logTCPDump();