This commit is contained in:
zzz
2010-10-02 14:02:41 +00:00
parent 2bffeea7eb
commit 171e3abe34

View File

@@ -11,7 +11,7 @@ import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
//import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
@@ -20,8 +20,8 @@ import net.i2p.util.Log;
*
*/
public class MessageInputStream extends InputStream {
private I2PAppContext _context;
private Log _log;
private final I2PAppContext _context;
private final Log _log;
/**
* List of ByteArray objects of data ready to be read,
* with the first ByteArray at index 0, and the next
@@ -29,7 +29,7 @@ public class MessageInputStream extends InputStream {
* that array.
*
*/
private List _readyDataBlocks;
private final List<ByteArray> _readyDataBlocks;
private int _readyDataBlockIndex;
/** highest message ID used in the readyDataBlocks */
private volatile long _highestReadyBlockId;
@@ -40,7 +40,7 @@ public class MessageInputStream extends InputStream {
* out of order when there are lower IDs not yet
* received
*/
private Map _notYetReadyBlocks;
private final Map<Long, ByteArray> _notYetReadyBlocks;
/**
* if we have received a flag saying there won't be later messages, EOF
* after we have cleared what we have received.
@@ -51,9 +51,9 @@ public class MessageInputStream extends InputStream {
private int _readTimeout;
private IOException _streamError;
private long _readTotal;
private ByteCache _cache;
//private ByteCache _cache;
private byte[] _oneByte = new byte[1];
private final byte[] _oneByte = new byte[1];
private final Object _dataLock;
@@ -61,16 +61,12 @@ public class MessageInputStream extends InputStream {
_context = ctx;
_log = ctx.logManager().getLog(MessageInputStream.class);
_readyDataBlocks = new ArrayList(4);
_readyDataBlockIndex = 0;
_highestReadyBlockId = -1;
_highestBlockId = -1;
_readTimeout = -1;
_readTotal = 0;
_notYetReadyBlocks = new HashMap(4);
_dataLock = new Object();
_closeReceived = false;
_locallyClosed = false;
_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
//_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
}
/** What is the highest block ID we've completely received through?
@@ -140,10 +136,8 @@ public class MessageInputStream extends InputStream {
if (num <= 0) return null;
blocks = new long[num];
int i = 0;
for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
blocks[i] = id.longValue();
i++;
for (Long id : _notYetReadyBlocks.keySet()) {
blocks[i++] = id.longValue();
}
}
Arrays.sort(blocks);
@@ -178,16 +172,15 @@ public class MessageInputStream extends InputStream {
buf.append("Close received, ready bytes: ");
long available = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++)
available += ((ByteArray)_readyDataBlocks.get(i)).getValid();
available += _readyDataBlocks.get(i).getValid();
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
long notAvailable = 0;
for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
ByteArray ba = (ByteArray)_notYetReadyBlocks.get(id);
for (Long id : _notYetReadyBlocks.keySet()) {
ByteArray ba = _notYetReadyBlocks.get(id);
buf.append(id).append(" ");
if (ba != null)
@@ -237,7 +230,7 @@ public class MessageInputStream extends InputStream {
long cur = _highestReadyBlockId + 1;
// now pull in any previously pending blocks
while (_notYetReadyBlocks.containsKey(new Long(cur))) {
ByteArray ba = (ByteArray)_notYetReadyBlocks.remove(new Long(cur));
ByteArray ba = _notYetReadyBlocks.remove(new Long(cur));
if ( (ba != null) && (ba.getData() != null) && (ba.getValid() > 0) ) {
_readyDataBlocks.add(ba);
}
@@ -341,7 +334,7 @@ public class MessageInputStream extends InputStream {
return i;
} else {
// either was already ready, or we wait()ed and it arrived
ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
ByteArray cur = _readyDataBlocks.get(0);
byte rv = cur.getData()[cur.getOffset()+_readyDataBlockIndex];
_readyDataBlockIndex++;
boolean removed = false;
@@ -378,7 +371,7 @@ public class MessageInputStream extends InputStream {
int numBytes = 0;
synchronized (_dataLock) {
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
ByteArray cur = _readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getValid() - _readyDataBlockIndex;
else
@@ -402,14 +395,13 @@ public class MessageInputStream extends InputStream {
if (_locallyClosed) return 0;
int numBytes = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
ByteArray cur = _readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getValid() - _readyDataBlockIndex;
else
numBytes += cur.getValid();
}
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray cur = (ByteArray)iter.next();
for (ByteArray cur : _notYetReadyBlocks.values()) {
numBytes += cur.getValid();
}
return numBytes;
@@ -421,7 +413,7 @@ public class MessageInputStream extends InputStream {
if (_locallyClosed) return 0;
int numBytes = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
ByteArray cur = _readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getValid() - _readyDataBlockIndex;
else
@@ -440,8 +432,7 @@ public class MessageInputStream extends InputStream {
// we don't need the data, but we do need to keep track of the messageIds
// received, so we can ACK accordingly
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray ba = (ByteArray)iter.next();
for (ByteArray ba : _notYetReadyBlocks.values()) {
ba.setData(null);
//_cache.release(ba);
}