- Adjust RTT/RTO calculations
   - Better bandwidth tracking
   - Cleanup of OutboundMessageState
   - Stat tweaks
 * Transports: Increase min peer port to 1024
This commit is contained in:
zzz
2012-10-30 18:16:37 +00:00
parent d92f5e6508
commit 4ce11a174a
10 changed files with 181 additions and 157 deletions

View File

@@ -1,3 +1,30 @@
2012-10-30 zzz
* i2psnark:
- Add kbucket debugging
- Eliminate redundant explore keys
- Add more limits to DHT tracker
- Delay expiration at startup
- Only enable updates for dev builds and 1% of release builds
* i2ptunnel:
- Create backup privkey files (ticket #752)
- Fix NPE in Android startup
- Fix disabling proxy authorization
* Installer: Drop news.xml and old certs
* logs.jsp:
- Don't display dup message if last
- Spacing tweaks
* OutNetMessage: Properly clean up when dropped by codel (but unused for now
since codel is disabled for ONM)
* SSU:
- Adjust RTT/RTO calculations
- Better bandwidth tracking
- Cleanup of OutboundMessageState
- Stat tweaks
* StatisticsManager: Publish stats less often
* Transports: Increase min peer port to 1024
* Tunnels: Implement per-client outbound tunnel message priority (ticket #719)
* Update Manager: Warn on dup registration
2012-10-30 sponge
* cleanups as requested

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1;
public final static long BUILD = 2;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -269,7 +269,7 @@ public abstract class TransportImpl implements Transport {
+ " to " + msg.getTarget().getIdentity().getHash().toBase64()
+ " (details: " + msg + ')');
if (msg.getExpiration() < _context.clock().now())
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
_context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime);
if (allowRequeue) {
if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) )
@@ -343,7 +343,7 @@ public abstract class TransportImpl implements Transport {
if (sendSuccessful) {
// TODO fix this stat for SSU ticket #698
_context.statManager().addRateData("transport.sendProcessingTime", lifetime, lifetime);
_context.statManager().addRateData("transport.sendProcessingTime", lifetime);
// object churn. 33 ms for NTCP and 788 for SSU, but meaningless due to
// differences in how it's computed (immediate vs. round trip)
//_context.statManager().addRateData("transport.sendProcessingTime." + getStyle(), lifetime, 0);
@@ -351,7 +351,7 @@ public abstract class TransportImpl implements Transport {
_context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime);
} else {
_context.profileManager().messageFailed(msg.getTarget().getIdentity().getHash(), getStyle());
_context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime);
_context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime);
}
}
@@ -433,9 +433,9 @@ public abstract class TransportImpl implements Transport {
_context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive);
}
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
_context.statManager().addRateData("transport.receiveMessageTime", msToReceive);
if (msToReceive > 1000) {
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive);
_context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive);
}
//// this functionality is built into the InNetMessagePool

View File

@@ -70,11 +70,11 @@ public class NTCPTransport extends TransportImpl {
private static final long[] RATES = { 10*60*1000 };
/**
* To prevent trouble. To be raised to 1024 in 0.9.4.
* To prevent trouble. 1024 as of 0.9.4.
*
* @since 0.9.3
*/
private static final int MIN_PEER_PORT = 500;
private static final int MIN_PEER_PORT = 1024;
// Opera doesn't have the char, TODO check UA
//private static final String THINSP = "&thinsp;/&thinsp;";

View File

@@ -154,8 +154,8 @@ class EstablishmentManager {
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);

View File

@@ -80,8 +80,8 @@ class OutboundMessageFragments {
_context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPiggyback", "How many acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPiggybackPartial", "How many partial acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES);
@@ -152,25 +152,18 @@ class OutboundMessageFragments {
public void add(OutNetMessage msg) {
I2NPMessage msgBody = msg.getMessage();
RouterInfo target = msg.getTarget();
if ( (msgBody == null) || (target == null) )
if (target == null)
return;
// todo: make sure the outNetMessage is initialzed once and only once
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg, msgBody);
if (ok) {
PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash());
if (peer == null) {
_transport.failed(msg, "Peer disconnected quickly");
state.releaseResources();
return;
}
PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash());
try {
// will throw IAE if peer == null
OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
peer.add(state);
add(peer);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg);
} catch (IllegalArgumentException iae) {
_transport.failed(msg, "Peer disconnected quickly");
return;
}
}

View File

@@ -19,17 +19,19 @@ class OutboundMessageState implements CDPQEntry {
private final I2PAppContext _context;
private final Log _log;
/** may be null if we are part of the establishment */
private OutNetMessage _message;
private long _messageId;
private final OutNetMessage _message;
private final long _messageId;
/** will be null, unless we are part of the establishment */
private PeerState _peer;
private long _expiration;
private final PeerState _peer;
private final long _expiration;
private ByteArray _messageBuf;
/** fixed fragment size across the message */
private int _fragmentSize;
/** size of the I2NP message */
private final int _totalSize;
/** sends[i] is how many times the fragment has been sent, or -1 if ACKed */
private short _fragmentSends[];
private long _startedOn;
private final long _startedOn;
private long _nextSendTime;
private int _pushCount;
private short _maxSends;
@@ -59,43 +61,15 @@ class OutboundMessageState implements CDPQEntry {
private static final long EXPIRATION = 10*1000;
public OutboundMessageState(I2PAppContext context) {
_context = context;
_log = _context.logManager().getLog(OutboundMessageState.class);
}
/****
public boolean initialize(OutNetMessage msg) {
if (msg == null) return false;
try {
return initialize(msg, msg.getMessage(), null);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
}
****/
/**
* Called from UDPTransport
* TODO make two constructors, remove this, and make more things final
* @return success
* @throws IAE if too big
*/
public boolean initialize(I2NPMessage msg, PeerState peer) {
if (msg == null)
return false;
try {
return initialize(null, msg, peer);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
public OutboundMessageState(I2PAppContext context, I2NPMessage msg, PeerState peer) {
this(context, null, msg, peer);
}
/**
@@ -104,18 +78,8 @@ class OutboundMessageState implements CDPQEntry {
* @return success
* @throws IAE if too big
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
if ( (m == null) || (msg == null) )
return false;
try {
return initialize(m, msg, null);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Exception e) {
_log.log(Log.CRIT, "Error initializing " + msg, e);
return false;
}
public OutboundMessageState(I2PAppContext context, OutNetMessage m, PeerState peer) {
this(context, m, m.getMessage(), peer);
}
/**
@@ -124,28 +88,26 @@ class OutboundMessageState implements CDPQEntry {
* @return success
* @throws IAE if too big
*/
private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
private OutboundMessageState(I2PAppContext context, OutNetMessage m, I2NPMessage msg, PeerState peer) {
if (msg == null || peer == null)
throw new IllegalArgumentException();
_context = context;
_log = _context.logManager().getLog(OutboundMessageState.class);
_message = m;
_peer = peer;
int size = msg.getRawMessageSize();
acquireBuf(size);
try {
int len = msg.toRawByteArray(_messageBuf.getData());
_messageBuf.setValid(len);
_messageId = msg.getUniqueId();
_totalSize = msg.toRawByteArray(_messageBuf.getData());
_messageBuf.setValid(_totalSize);
_messageId = msg.getUniqueId();
_startedOn = _context.clock().now();
_nextSendTime = _startedOn;
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
_startedOn = _context.clock().now();
_nextSendTime = _startedOn;
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
return true;
} catch (IllegalStateException ise) {
releaseBuf();
return false;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
}
/**
@@ -203,7 +165,6 @@ class OutboundMessageState implements CDPQEntry {
public OutNetMessage getMessage() { return _message; }
public long getMessageId() { return _messageId; }
public PeerState getPeer() { return _peer; }
public void setPeer(PeerState peer) { _peer = peer; }
public boolean isExpired() {
return _expiration < _context.clock().now();
@@ -224,8 +185,7 @@ class OutboundMessageState implements CDPQEntry {
ByteArray messageBuf = _messageBuf;
int rv = 0;
if ( (messageBuf != null) && (fragmentSends != null) ) {
int totalSize = messageBuf.getValid();
int lastSize = totalSize % _fragmentSize;
int lastSize = _totalSize % _fragmentSize;
if (lastSize == 0)
lastSize = _fragmentSize;
for (int i = 0; i < fragmentSends.length; i++) {
@@ -286,11 +246,22 @@ class OutboundMessageState implements CDPQEntry {
public long getNextSendTime() { return _nextSendTime; }
public void setNextSendTime(long when) { _nextSendTime = when; }
/**
* The max number of sends for any fragment, which is the
* same as the push count, at least as it's coded now.
*/
public int getMaxSends() { return _maxSends; }
/**
* The number of times we've pushed some fragments, which is the
* same as the max sends, at least as it's coded now.
*/
public int getPushCount() { return _pushCount; }
/** note that we have pushed the message fragments */
public void push() {
// these will never be different...
_pushCount++;
if (_pushCount > _maxSends)
_maxSends = (short)_pushCount;
@@ -301,23 +272,35 @@ class OutboundMessageState implements CDPQEntry {
}
/**
* Whether fragment() has been called.
* NOT whether it has more than one fragment.
*
* Caller should synchronize
*
* @return true iff fragment() has been called previously
*/
public boolean isFragmented() { return _fragmentSends != null; }
/**
* Prepare the message for fragmented delivery, using no more than
* fragmentSize bytes per fragment.
*
* Caller should synchronize
*
* @throws IllegalStateException if called more than once
*/
public void fragment(int fragmentSize) {
int totalSize = _messageBuf.getValid();
int numFragments = totalSize / fragmentSize;
if (numFragments * fragmentSize < totalSize)
if (_fragmentSends != null)
throw new IllegalStateException();
int numFragments = _totalSize / fragmentSize;
if (numFragments * fragmentSize < _totalSize)
numFragments++;
// This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB
if (numFragments > InboundMessageState.MAX_FRAGMENTS)
throw new IllegalArgumentException("Fragmenting a " + totalSize + " message into " + numFragments + " fragments - too many!");
throw new IllegalArgumentException("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments - too many!");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fragmenting a " + totalSize + " message into " + numFragments + " fragments");
_log.debug("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments");
//_fragmentEnd = new int[numFragments];
_fragmentSends = new short[numFragments];
@@ -327,7 +310,13 @@ class OutboundMessageState implements CDPQEntry {
_fragmentSize = fragmentSize;
}
/** how many fragments in the message */
/**
* How many fragments in the message.
* Only valid after fragment() has been called.
* Returns -1 before then.
*
* Caller should synchronize
*/
public int getFragmentCount() {
if (_fragmentSends == null)
return -1;
@@ -335,15 +324,26 @@ class OutboundMessageState implements CDPQEntry {
return _fragmentSends.length;
}
public int getFragmentSize() { return _fragmentSize; }
/**
* The size of the I2NP message. Does not include any SSU overhead.
*
* Caller should synchronize
*/
public int getMessageSize() { return _totalSize; }
/** should we continue sending this fragment? */
/**
* Should we continue sending this fragment?
* Only valid after fragment() has been called.
* Throws NPE before then.
*
* Caller should synchronize
*/
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
public int fragmentSize(int fragmentNum) {
if (_messageBuf == null) return -1;
if (fragmentNum + 1 == _fragmentSends.length) {
int valid = _messageBuf.getValid();
int valid = _totalSize;
if (valid <= _fragmentSize)
return valid;
// bugfix 0.8.12
@@ -406,7 +406,7 @@ class OutboundMessageState implements CDPQEntry {
System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId
+ "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: "
+ "[" + start + "-" + (start+toSend) + "/" + _totalSize + "/" + _fragmentSize + "]: "
+ Base64.encode(out, outOffset, toSend));
return toSend;
} else {
@@ -467,13 +467,11 @@ class OutboundMessageState implements CDPQEntry {
@Override
public String toString() {
short sends[] = _fragmentSends;
ByteArray messageBuf = _messageBuf;
StringBuilder buf = new StringBuilder(256);
buf.append("OB Message ").append(_messageId);
if (sends != null)
buf.append(" with ").append(sends.length).append(" fragments");
if (messageBuf != null)
buf.append(" of size ").append(messageBuf.getValid());
buf.append(" of size ").append(_totalSize);
buf.append(" volleys: ").append(_maxSends);
buf.append(" lifetime: ").append(getLifetime());
if (sends != null) {

View File

@@ -289,7 +289,6 @@ class PeerState {
*/
public static final int LARGE_MTU = 1484;
/** 600 */
private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY;
private static final int INIT_RTO = 3*1000;
public static final int INIT_RTT = INIT_RTO / 2;
@@ -714,7 +713,7 @@ class PeerState {
//_receiveACKBytes = 0;
_receiveBytes = 0;
_receivePeriodBegin = now;
_context.statManager().addRateData("udp.receiveBps", _receiveBps, 0);
_context.statManager().addRateData("udp.receiveBps", _receiveBps);
}
if (_wantACKSendSince <= 0)
@@ -1102,31 +1101,32 @@ class PeerState {
if (numSends >= 2 && _log.shouldLog(Log.INFO))
_log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);
_context.statManager().addRateData("udp.sendBps", _sendBps);
}
/** This is the value specified in RFC 2988 */
private static final float RTT_DAMPENING = 0.125f;
/**
* Adjust the tcp-esque timeouts.
* Caller should synch on this
*/
private void recalculateTimeouts(long lifetime) {
// the rttDev calculation matches that recommended in RFC 2988 (beta = 1/4)
_rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
float scale = RTT_DAMPENING;
// the faster we are going, the slower we want to reduce the rtt
float scale = 0.1f;
if (_sendBps > 0)
scale = lifetime / ((float)lifetime + (float)_sendBps);
if (scale < 0.001f) scale = 0.001f;
//if (_sendBps > 0)
// scale = lifetime / ((float)lifetime + (float)_sendBps);
//if (scale < 0.001f) scale = 0.001f;
_rtt = (int)(_rtt*(1.0f-scale) + (scale)*lifetime);
_rto = _rtt + (_rttDeviation<<2);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+ " rttDev=" + _rttDeviation + " rto=" + _rto);
if (_rto < minRTO())
_rto = minRTO();
else if (_rto > MAX_RTO)
_rto = MAX_RTO;
// K = 4
_rto = Math.min(MAX_RTO, Math.max(minRTO(), _rtt + (_rttDeviation<<2)));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
// + " rttDev=" + _rttDeviation + " rto=" + _rto);
}
/**
@@ -1141,12 +1141,12 @@ class PeerState {
if (_context.random().nextLong(_mtuDecreases) <= 0) {
_mtu = _largeMTU;
_mtuIncreases++;
_context.statManager().addRateData("udp.mtuIncrease", _mtuIncreases, _mtuDecreases);
_context.statManager().addRateData("udp.mtuIncrease", _mtuIncreases);
}
} else if (!wantLarge && _mtu == _largeMTU) {
_mtu = MIN_MTU;
_mtuDecreases++;
_context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases, _mtuIncreases);
_context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases);
}
} else {
_mtu = DEFAULT_MTU;
@@ -1178,7 +1178,7 @@ class PeerState {
_packetsRetransmitted = packets;
}
*****/
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes);
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
synchronized (this) {
congestionOccurred();
@@ -1250,7 +1250,7 @@ class PeerState {
synchronized(this) {
congestionOccurred();
}
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes);
_currentSecondECNReceived = true;
_lastReceiveTime = _context.clock().now();
}
@@ -1323,7 +1323,12 @@ class PeerState {
_transport.failed(state, false);
return;
}
state.setPeer(this);
if (state.getPeer() != this) {
if (_log.shouldLog(Log.WARN))
_log.warn("Not for me!", new Exception("I did it"));
_transport.failed(state, false);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
int rv = 0;
@@ -1452,14 +1457,14 @@ class PeerState {
iter.remove();
if (_retransmitter == state)
_retransmitter = null;
_context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendFailed", state.getPushCount());
if (failed == null) failed = new ArrayList(4);
failed.add(state);
} else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
iter.remove();
if (state == _retransmitter)
_retransmitter = null;
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
if (failed == null) failed = new ArrayList(4);
failed.add(state);
} // end (pushCount > maxVolleys)
@@ -1624,7 +1629,7 @@ class PeerState {
/**
* how much payload data can we shove in there?
* @return MTU - 87, i.e. 521 or 1401
* @return MTU - 87, i.e. 533 or 1397
*/
private static final int fragmentSize(int mtu) {
// 46 + 20 + 8 + 13 = 74 + 13 = 87
@@ -1659,7 +1664,7 @@ class PeerState {
if ( (retrans != null) && (retrans != state) ) {
// choke it, since there's already another message retransmitting to this
// peer.
_context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted);
_context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted);
int max = state.getMaxSends();
if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
//if (state.getMessage() != null)
@@ -1695,7 +1700,7 @@ class PeerState {
// _throttle.unchoke(peer.getRemotePeer());
return ShouldSend.YES;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
_context.statManager().addRateData("udp.sendRejected", state.getPushCount());
//if (state.getMessage() != null)
// state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
if (_log.shouldLog(Log.INFO))
@@ -1760,15 +1765,14 @@ class PeerState {
if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + _remotePeer
+ " after " + state.getLifetime() + " and " + numSends + " sends");
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
int numFragments = state.getFragmentCount();
// this adjusts the rtt/rto/window/etc
messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends);
messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
//if (getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
@@ -1828,7 +1832,7 @@ class PeerState {
if (bitfield.received(i))
numACKed++;
_context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime());
_context.statManager().addRateData("udp.partialACKReceived", numACKed);
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
@@ -1836,17 +1840,16 @@ class PeerState {
+ isComplete + ": " + state);
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime());
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
if (numSends > 1)
_context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
//if (state.getMessage() != null)
// state.getMessage().timestamp("partial ack to complete after " + numSends);
_transport.succeeded(state);
// this adjusts the rtt/rto/window/etc
messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0);
messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
//if (state.getPeer().getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(state.getPeer().getRemotePeer());

View File

@@ -52,9 +52,9 @@ class UDPReceiver {
_socket = socket;
_transport = transport;
_runner = new Runner();
_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveHolePunch", "How often we receive a NAT hole punch", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES);
}
@@ -165,7 +165,7 @@ class UDPReceiver {
if (_transport.isInDropList(from)) {
if (_log.shouldLog(Log.INFO))
_log.info("Ignoring packet from the drop-listed peer: " + from);
_context.statManager().addRateData("udp.ignorePacketFromDroplist", packet.getLifetime(), 0);
_context.statManager().addRateData("udp.ignorePacketFromDroplist", packet.getLifetime());
packet.release();
return 0;
}
@@ -296,10 +296,10 @@ class UDPReceiver {
while (req.getPendingRequested() > 0)
req.waitForNextAllocation();
int queued = receive(packet);
_context.statManager().addRateData("udp.receivePacketSize", size, queued);
receive(packet);
//_context.statManager().addRateData("udp.receivePacketSize", size);
} else {
_context.statManager().addRateData("udp.receiveHolePunch", 1, 0);
_context.statManager().addRateData("udp.receiveHolePunch", 1);
// nat hole punch packets are 0 bytes
if (_log.shouldLog(Log.INFO))
_log.info("Received a 0 byte udp packet from " + packet.getPacket().getAddress() + ":" + packet.getPacket().getPort());

View File

@@ -113,11 +113,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public static final int DEFAULT_INTERNAL_PORT = 8887;
/**
* To prevent trouble. To be raised to 1024 in 0.9.4.
* To prevent trouble. 1024 as of 0.9.4.
*
* @since 0.9.3
*/
static final int MIN_PEER_PORT = 500;
static final int MIN_PEER_PORT = 1024;
/** Limits on port told to us by others,
* We should have an exception if it matches the existing low port.
@@ -1419,12 +1419,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* "injected" message from the EstablishmentManager
*/
void send(I2NPMessage msg, PeerState peer) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer);
OutboundMessageState state = new OutboundMessageState(_context);
boolean ok = state.initialize(msg, peer);
if (ok)
try {
OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer);
_fragments.add(state);
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.WARN))
_log.warn("Shouldnt happen", new Exception("I did it"));
}
}
// we don't need the following, since we have our own queueing