Streaming: Don't change buffer size when max message size is adjusted

Only change max message size when buffer is empty
This commit is contained in:
zzz
2017-03-09 19:40:17 +00:00
parent 9146f3c7e1
commit 48a055d462
3 changed files with 38 additions and 19 deletions

View File

@@ -30,7 +30,9 @@ class MessageOutputStream extends OutputStream {
private final AtomicBoolean _closed = new AtomicBoolean();
private long _written;
private int _writeTimeout;
private ByteCache _dataCache;
private final ByteCache _dataCache;
private final int _originalBufferSize;
private int _currentBufferSize;
private final Flusher _flusher;
private volatile long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */
@@ -68,6 +70,8 @@ class MessageOutputStream extends OutputStream {
DataReceiver receiver, int bufSize, int passiveFlushDelay) {
super();
_dataCache = ByteCache.getInstance(128, bufSize);
_originalBufferSize = bufSize;
_currentBufferSize = bufSize;
_context = ctx;
_log = ctx.logManager().getLog(MessageOutputStream.class);
_buf = _dataCache.acquire().getData(); // new byte[bufSize];
@@ -75,7 +79,7 @@ class MessageOutputStream extends OutputStream {
_dataLock = new Object();
_writeTimeout = -1;
_passiveFlushDelay = passiveFlushDelay;
_nextBufferSize = -1;
_nextBufferSize = 0;
//_sendPeriodBeginTime = ctx.clock().now();
//_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_flusher = new Flusher(timer);
@@ -92,7 +96,16 @@ class MessageOutputStream extends OutputStream {
public int getWriteTimeout() { return _writeTimeout; }
public void setBufferSize(int size) { _nextBufferSize = size; }
/**
* Caller should enforce a sane minimum.
*
* @param size must be greater than 0, and smaller than or equal to bufSize in constructor
*/
public void setBufferSize(int size) {
if (size <= 0 || size > _originalBufferSize)
return;
_nextBufferSize = size;
}
@Override
public void write(byte b[]) throws IOException {
@@ -115,8 +128,11 @@ class MessageOutputStream extends OutputStream {
// this is the only method that *adds* to the _buf, and all
// code that reads from it is synchronized
synchronized (_dataLock) {
// To simplify the code, and avoid losing data from shrinking the max size,
// we only update max size when current buffer is empty
final int maxBuffer = (_valid == 0) ? locked_updateBufferSize() : _currentBufferSize;
if (_buf == null) throw new IOException("closed (buffer went away)");
if (_valid + remaining < _buf.length) {
if (_valid + remaining < maxBuffer) {
// simply buffer the data, no flush
System.arraycopy(b, cur, _buf, _valid, remaining);
_valid += remaining;
@@ -131,19 +147,17 @@ class MessageOutputStream extends OutputStream {
// buffer whatever we can fit then flush,
// repeating until we've pushed all of the
// data through
int toWrite = _buf.length - _valid;
int toWrite = maxBuffer - _valid;
System.arraycopy(b, cur, _buf, _valid, toWrite);
remaining -= toWrite;
cur += toWrite;
_valid = _buf.length;
_valid = maxBuffer;
if (_log.shouldLog(Log.INFO))
_log.info("write() direct valid = " + _valid);
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
throwAnyError();
locked_updateBufferSize();
}
}
if (ws != null) {
@@ -207,17 +221,21 @@ class MessageOutputStream extends OutputStream {
/**
* If the other side requested we shrink our buffer, do so.
*
* @return the current buffer size
*/
private final void locked_updateBufferSize() {
private final int locked_updateBufferSize() {
int size = _nextBufferSize;
if (size > 0) {
// update the buffer size to the requested amount
_dataCache.release(new ByteArray(_buf));
_dataCache = ByteCache.getInstance(128, size);
ByteArray ba = _dataCache.acquire();
_buf = ba.getData();
_nextBufferSize = -1;
// No, never do this, to avoid ByteCache churn.
//_dataCache.release(new ByteArray(_buf));
//_dataCache = ByteCache.getInstance(128, size);
//ByteArray ba = _dataCache.acquire();
//_buf = ba.getData();
_currentBufferSize = size;
_nextBufferSize = 0;
}
return _currentBufferSize;
}
/**
@@ -273,7 +291,6 @@ class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
locked_updateBufferSize();
_dataLock.notifyAll();
sent = true;
}
@@ -336,7 +353,6 @@ class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
locked_updateBufferSize();
_dataLock.notifyAll();
}
}
@@ -409,7 +425,6 @@ class MessageOutputStream extends OutputStream {
ba = new ByteArray(_buf);
_buf = null;
_valid = 0;
locked_updateBufferSize();
}
_dataLock.notifyAll();
}
@@ -494,7 +509,6 @@ class MessageOutputStream extends OutputStream {
ws = target.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
locked_updateBufferSize();
_dataLock.notifyAll();
}
long afterBuild = System.currentTimeMillis();

View File

@@ -1,3 +1,8 @@
2017-03-09 zzz
* i2psnark: Prevent RuntimeException caused by corrupt i2psnark.dht.dat file
* Router: Set default sig type to EdDSA for non-Android ARM
* Streaming: Don't change buffer size when max message size is adjusted
2017-03-06 zzz
* CPUID:
- Fix saving of libjcpuid.jnifile on Macs,

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 2;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";