Streaming: More efficient checking for input buffer overflow,

add additional checks.
Fix bug if available buffer calculation is negative
Check log level before calling displayPacket()
Log tweaks
This commit is contained in:
zzz
2015-04-23 13:19:18 +00:00
parent 6f0ebb2d94
commit 1b09b9faa4
5 changed files with 90 additions and 33 deletions

View File

@@ -124,9 +124,11 @@ class Connection {
_isInbound = isInbound;
_log = _context.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(_context, this);
_inputStream = new MessageInputStream(_context);
_options = (opts != null ? opts : new ConnectionOptions());
_inputStream = new MessageInputStream(_context, _options.getMaxMessageSize(),
_options.getMaxWindowSize(), _options.getInboundBufferSize());
// FIXME pass through a passive flush delay setting as the 4th arg
_outputStream = new MessageOutputStream(_context, timer, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
_outputStream = new MessageOutputStream(_context, timer, _receiver, _options.getMaxMessageSize());
_timer = timer;
_outboundPackets = new TreeMap<Long, PacketLocal>();
if (opts != null) {
@@ -136,7 +138,6 @@ class Connection {
_localPort = 0;
_remotePort = 0;
}
_options = (opts != null ? opts : new ConnectionOptions());
_outputStream.setWriteTimeout((int)_options.getWriteTimeout());
_inputStream.setReadTimeout((int)_options.getReadTimeout());
_lastSendId = new AtomicLong(-1);

View File

@@ -103,24 +103,15 @@ class ConnectionPacketHandler {
}
}
if (packet.getPayloadSize() > 0) {
// Here, for the purposes of calculating whether the input stream is full,
// we assume all the not-ready blocks are the max message size.
// This prevents us from getting DoSed by accepting unlimited out-of-order small messages
long ready = con.getInputStream().getHighestReadyBlockId();
int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize();
int allowedBlocks = available/con.getOptions().getMaxMessageSize();
if (seqNum > ready + allowedBlocks) {
if (_log.shouldLog(Log.WARN))
_log.warn("Inbound buffer exceeded on connection " + con + " ("
+ ready + "/"+ (ready+allowedBlocks) + "/" + available
+ ": dropping " + packet);
if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) {
if (_log.shouldWarn())
_log.warn("Inbound buffer exceeded on connection " + con +
", dropping " + packet);
con.getOptions().setChoke(61*1000);
packet.releasePayload();
con.ackImmediately();
return;
}
}
con.getOptions().setChoke(0);
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize());

View File

@@ -33,6 +33,7 @@ class MessageInputStream extends InputStream {
*
*/
private final List<ByteArray> _readyDataBlocks;
/** current byte index into _readyDataBlocks.get(0) */
private int _readyDataBlockIndex;
/** highest message ID used in the readyDataBlocks */
private long _highestReadyBlockId;
@@ -55,12 +56,16 @@ class MessageInputStream extends InputStream {
private IOException _streamError;
private long _readTotal;
//private ByteCache _cache;
private final int _maxMessageSize;
private final int _maxWindowSize;
private final int _maxBufferSize;
private final byte[] _oneByte = new byte[1];
private final Object _dataLock;
public MessageInputStream(I2PAppContext ctx) {
private static final int MIN_READY_BUFFERS = 16;
public MessageInputStream(I2PAppContext ctx, int maxMessageSize, int maxWindowSize, int maxBufferSize) {
_log = ctx.logManager().getLog(MessageInputStream.class);
_readyDataBlocks = new ArrayList<ByteArray>(4);
_highestReadyBlockId = -1;
@@ -68,6 +73,9 @@ class MessageInputStream extends InputStream {
_readTimeout = -1;
_notYetReadyBlocks = new HashMap<Long, ByteArray>(4);
_dataLock = new Object();
_maxMessageSize = maxMessageSize;
_maxWindowSize = maxWindowSize;
_maxBufferSize = maxBufferSize;
//_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
}
@@ -89,6 +97,62 @@ class MessageInputStream extends InputStream {
}
}
/**
* Determine if this packet will fit in our buffering limits.
*
* @return true if we have room. If false, do not call messageReceived()
* @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock,
* and we can efficiently do several checks
*/
public boolean canAccept(long messageId, int payloadSize) {
if (payloadSize <= 0)
return true;
if (messageId < MIN_READY_BUFFERS)
return true;
synchronized (_dataLock) {
// always accept if closed, will be processed elsewhere
if (_locallyClosed)
return true;
// ready dup check
// we always allow sequence numbers less than or equal to highest received
if (messageId <= _highestReadyBlockId)
return true;
// shortcut test, assuming all ready and not ready blocks are max size,
// to avoid iterating through all the ready blocks in getTotalReadySize()
if ((_readyDataBlocks.size() + _notYetReadyBlocks.size()) * _maxMessageSize < _maxBufferSize)
return true;
// not ready dup check
if (_notYetReadyBlocks.containsKey(Long.valueOf(messageId)))
return true;
// less efficient starting here
// Here, for the purposes of calculating whether the input stream is full,
// we assume all the not-ready blocks are the max message size.
// This prevents us from getting DoSed by accepting unlimited out-of-order small messages
int available = _maxBufferSize - getTotalReadySize();
if (available <= 0) {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", inbound buffer exceeded: available = " +
available);
return false;
}
// following code screws up if available < 0
int allowedBlocks = available / _maxMessageSize;
if (messageId > _highestReadyBlockId + allowedBlocks) {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", inbound buffer exceeded: " +
_highestReadyBlockId + '/' + (_highestReadyBlockId + allowedBlocks) + '/' + available);
return false;
}
// This prevents us from getting DoSed by accepting unlimited in-order small messages
if (_readyDataBlocks.size() >= 4 * _maxWindowSize) {
if (_log.shouldWarn())
_log.warn("Dropping message " + messageId + ", too many ready blocks");
return false;
}
}
return true;
}
/**
* Retrieve the message IDs that are holes in our sequence - ones
* past the highest ready ID and below the highest received message
@@ -227,7 +291,7 @@ class MessageInputStream extends InputStream {
/**
* A new message has arrived - toss it on the appropriate queue (moving
* previously pending messages to the ready queue if it fills the gap, etc).
* This does no limiting of pending data - it must be limited in ConnectionPacketHandler.
* This does no limiting of pending data - see canAccept() for limiting.
*
* @param messageId ID of the message
* @param payload message payload, may be null or have null or zero-length data
@@ -486,10 +550,9 @@ class MessageInputStream extends InputStream {
int numBytes = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = _readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getValid() - _readyDataBlockIndex;
else
numBytes += cur.getValid();
if (i == 0)
numBytes -= _readyDataBlockIndex;
}
return numBytes;
}

View File

@@ -100,10 +100,11 @@ class PacketHandler {
Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
if (_log.shouldLog(Log.INFO))
if (_log.shouldDebug())
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO());
receiveKnownCon(con, packet);
} else {
if (_log.shouldDebug())
displayPacket(packet, "UNKN", null);
receiveUnknownCon(packet, sendId, queueIfNoConn);
}
@@ -112,8 +113,9 @@ class PacketHandler {
}
private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS");
/** logs to System.out, and router log at debug level */
void displayPacket(Packet packet, String prefix, String suffix) {
if (!_log.shouldLog(Log.INFO)) return;
StringBuilder buf = new StringBuilder(256);
synchronized (_fmt) {
buf.append(_fmt.format(new Date()));
@@ -124,7 +126,6 @@ class PacketHandler {
buf.append(" ").append(suffix);
String str = buf.toString();
System.out.println(str);
if (_log.shouldLog(Log.DEBUG))
_log.debug(str);
}

View File

@@ -214,6 +214,7 @@ class PacketQueue implements SendMessageStatusListener {
}
Connection c = packet.getConnection();
String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null);
if (_log.shouldDebug())
_connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
if (I2PSocketManagerFull.pcapWriter != null &&
_context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))