NTCP: Move wantsWrite(byte[]) from EventPumper to NTCPConnection

for sanity in following the write code path, rather than
going from con to pumper to con, keep the code in con.
Prep for possible write-side improvements in a future release,
ref: http://zzz.i2p/topics/3192
This commit is contained in:
zzz
2021-11-16 11:09:05 -05:00
parent 0e4d684e7d
commit 6ef4c74d97
4 changed files with 38 additions and 36 deletions

View File

@@ -426,35 +426,6 @@ class EventPumper implements Runnable {
}
}
/**
* Called by the connection when it has data ready to write.
* If we have bandwidth, calls con.Write() which calls wantsWrite(con).
* If no bandwidth, calls con.queuedWrite().
*/
public void wantsWrite(NTCPConnection con, byte data[]) {
wantsWrite(con, data, 0, data.length);
}
/**
* Called by the connection when it has data ready to write.
* If we have bandwidth, calls con.Write() which calls wantsWrite(con).
* If no bandwidth, calls con.queuedWrite().
*
* @since 0.9.35 off/len version
*/
public void wantsWrite(NTCPConnection con, byte data[], int off, int len) {
ByteBuffer buf = ByteBuffer.wrap(data, off, len);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, 0, "NTCP write");//con, buf);
if (req.getPendingRequested() > 0) {
if (_log.shouldLog(Log.INFO))
_log.info("queued write on " + con + " for " + len);
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
con.queuedWrite(buf, req);
} else {
con.write(buf);
}
}
/**
* Called by the connection when it has data ready to write (after bw allocation).
* Only wakeup if new.

View File

@@ -537,7 +537,7 @@ class InboundEstablishState extends EstablishBase implements NTCP2Payload.Payloa
changeState(State.IB_NTCP2_SENT_Y);
// send it all at once
_transport.getPumper().wantsWrite(_con, tmp);
_con.wantsWrite(tmp);
}
/**

View File

@@ -919,7 +919,7 @@ public class NTCPConnection implements Closeable {
}
_log.debug(buf.toString());
}
_transport.getPumper().wantsWrite(this, enc);
wantsWrite(enc);
toLong8LE(_sendSipIV, 0, sipIV);
}
@@ -990,7 +990,38 @@ public class NTCPConnection implements Closeable {
private void addOBRequest(FIFOBandwidthLimiter.Request req) {
_bwOutRequests.add(req);
}
/**
* Call when there is data ready to write.
* If we have bandwidth, calls write() which calls EventPumnper.wantsWrite(con).
* If no bandwidth, calls queuedWrite().
*
* @since moved from EventPumper in 0.9.52
*/
void wantsWrite(byte data[]) {
wantsWrite(data, 0, data.length);
}
/**
* Call when there is data ready to write.
* If we have bandwidth, calls write() which calls EventPumnper.wantsWrite(con).
* If no bandwidth, calls queuedWrite().
*
* @since 0.9.35 off/len version, moved from EventPumper in 0.9.52
*/
void wantsWrite(byte data[], int off, int len) {
ByteBuffer buf = ByteBuffer.wrap(data, off, len);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(len, 0, "NTCP write");
if (req.getPendingRequested() > 0) {
if (_log.shouldInfo())
_log.info("queued write on " + toString() + " for " + len);
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1);
queuedWrite(buf, req);
} else {
write(buf);
}
}
/**
* We have read the data in the buffer, but we can't process it locally yet,
* because we're choked by the bandwidth limiter. Cache the contents of
@@ -1004,7 +1035,7 @@ public class NTCPConnection implements Closeable {
}
/** ditto for writes */
void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
private void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
req.attach(buf);
req.setCompleteListener(_outboundListener);
addOBRequest(req);
@@ -1034,7 +1065,7 @@ public class NTCPConnection implements Closeable {
* The contents of the buffer have been encrypted / padded / etc and have
* been fully allocated for the bandwidth limiter.
*/
void write(ByteBuffer buf) {
private void write(ByteBuffer buf) {
_writeBufs.offer(buf);
_transport.getPumper().wantsWrite(this);
}

View File

@@ -254,7 +254,7 @@ class OutboundNTCP2State implements EstablishState {
changeState(State.OB_SENT_X);
// send it all at once
_transport.getPumper().wantsWrite(_con, _tmp, 0, MSG1_SIZE + padlen1);
_con.wantsWrite(_tmp, 0, MSG1_SIZE + padlen1);
}
/**
@@ -392,7 +392,7 @@ class OutboundNTCP2State implements EstablishState {
// send it all at once
if (_log.shouldDebug())
_log.debug("Sending msg3, part 1 is:\n" + net.i2p.util.HexDump.dump(tmp, 0, MSG3P1_SIZE));
_transport.getPumper().wantsWrite(_con, tmp);
_con.wantsWrite(tmp);
if (_log.shouldDebug())
_log.debug("After msg 3: " + _handshakeState.toString());
setDataPhase();