* UDP:
  - Complete rewrite of OutboundMessageFragments for concurrency and efficiency, to avoid O(n**2) behavior
  - Queue a new send immediately after a packet is ACKed
  - Cleanups, log tweaks, javadocs, final
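The heart of the rewrite is replacing an index-based round-robin over a synchronized List (which rescans from the front on every pass, giving O(n**2) work per full cycle) with one long-lived iterator over a concurrent set, refreshed only when exhausted. Below is a minimal, self-contained sketch of that pattern, not the router code itself: I2P uses its own net.i2p.util.ConcurrentHashSet, so the JDK's ConcurrentHashMap.newKeySet() stands in for it, and the class and method names are illustrative.

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Round-robin over a live set of peers via one long-lived iterator. */
class RoundRobinSketch<T> {
    private final Set<T> active = ConcurrentHashMap.newKeySet();
    private Iterator<T> iter;   // long-lived, like _iterator in this patch

    void add(T t) { active.add(t); }

    /** @return the next element in rotation, or null if the set is empty */
    T next() {
        // refresh only when the current cycle is exhausted
        if (iter == null || (!active.isEmpty() && !iter.hasNext()))
            iter = active.iterator();
        return iter.hasNext() ? iter.next() : null;
    }
}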
@@ -14,13 +14,14 @@ import net.i2p.util.Log;
 /**
  * Blocking thread that is given peers by the inboundFragment pool, sending out
  * any outstanding ACKs.
  *
+ * The ACKs are sent directly to UDPSender,
+ * bypassing OutboundMessageFragments and PacketPusher.
  */
 class ACKSender implements Runnable {
-    private RouterContext _context;
-    private Log _log;
-    private UDPTransport _transport;
-    private PacketBuilder _builder;
+    private final RouterContext _context;
+    private final Log _log;
+    private final UDPTransport _transport;
+    private final PacketBuilder _builder;
     /** list of peers (PeerState) who we have received data from but not yet ACKed to */
     private final BlockingQueue<PeerState> _peersToACK;
     private boolean _alive;
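The ACKSender javadoc above describes a simple shape: receivers hand peers that owe an ACK to a blocking queue, and one dedicated thread drains it and emits ACK packets directly. A minimal sketch of that shape, with stand-in types (String in place of PeerState, a stub for the actual packet send), not the router code:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AckSenderSketch implements Runnable {
    private final BlockingQueue<String> peersToAck = new LinkedBlockingQueue<>();
    private volatile boolean alive = true;

    /** called from the inbound-fragment path when a peer owes an ACK */
    void ackPeer(String peer) {
        if (!peersToAck.contains(peer))   // don't queue the same peer twice
            peersToAck.offer(peer);
    }

    public void run() {
        while (alive) {
            try {
                String peer = peersToAck.take();  // block until someone owes an ACK
                sendAckPacket(peer);              // straight to the sender, bypassing the fragment path
            } catch (InterruptedException ie) {
                return;
            }
        }
    }

    void shutdown() { alive = false; }

    private void sendAckPacket(String peer) {
        // stub: build an ACK-only packet and hand it to the UDP sender
    }
}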
@@ -72,6 +72,19 @@ class EstablishmentManager
         _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES);
+        // following are for PeerState
+        _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
+        //_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
+        //_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
     }
 
     public void startup() {
@@ -18,14 +18,14 @@ import net.i2p.util.Log;
  *
  */
 class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
-    private RouterContext _context;
-    private Log _log;
+    private final RouterContext _context;
+    private final Log _log;
     /** list of message IDs recently received, so we can ignore in flight dups */
     private DecayingBloomFilter _recentlyCompletedMessages;
-    private OutboundMessageFragments _outbound;
-    private UDPTransport _transport;
-    private ACKSender _ackSender;
-    private MessageReceiver _messageReceiver;
+    private final OutboundMessageFragments _outbound;
+    private final UDPTransport _transport;
+    private final ACKSender _ackSender;
+    private final MessageReceiver _messageReceiver;
     private boolean _alive;
 
     /** decay the recently completed every 20 seconds */
@@ -148,8 +148,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
         from.messageFullyReceived(messageId, state.getCompleteSize());
         _ackSender.ackPeer(from);
 
-        if (_log.shouldLog(Log.INFO))
-            _log.info("Message received completely! " + state);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Message received completely! " + state);
 
         _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
         if (state.getFragmentCount() > 0)
@@ -174,10 +174,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
         return fragments;
     }
 
+    /**
+     * @return the number of bitfields in the ack? why?
+     */
     private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
         int rv = 0;
+        boolean newAck = false;
         if (data.readACKsIncluded()) {
             int fragments = 0;
             int ackCount = data.readACKCount();
             if (ackCount > 0) {
                 rv += ackCount;
@@ -186,9 +189,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
 
                 for (int i = 0; i < ackCount; i++) {
                     long id = data.readACK(i);
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Full ACK of message " + id + " received!");
-                    fragments += _outbound.acked(id, from.getRemotePeer());
+                    if (from.acked(id)) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer());
+                        newAck = true;
+                    //} else if (_log.shouldLog(Log.DEBUG)) {
+                    //    _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer());
+                    }
                 }
             } else {
                 _log.error("Received ACKs with no acks?! " + data);
@@ -201,9 +208,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
                 //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0);
 
                 for (int i = 0; i < bitfields.length; i++) {
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Partial ACK received: " + bitfields[i]);
-                    _outbound.acked(bitfields[i], from.getRemotePeer());
+                    if (from.acked(bitfields[i])) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
+                        newAck = true;
+                    } else if (_log.shouldLog(Log.DEBUG)) {
+                        _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
+                    }
                 }
             }
         }
@@ -211,6 +222,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
             from.ECNReceived();
         else
             from.dataReceived();
 
+        // Wake up the packet pusher if it is sleeping.
+        // By calling add(), this also is a failsafe against possible
+        // races in OutboundMessageFragments.
+        if (newAck && from.getOutboundMessageCount() > 0)
+            _outbound.add(from);
+
         return rv;
     }
 }
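The new receiveACKs() path keys off the boolean now returned by PeerState.acked(): only a first-time ACK, for a peer that still has data queued, re-adds the peer so the pusher wakes and refills the freed congestion window immediately (the "queue a new send immediately after a packet is ACKed" item from the commit message). A toy model of that flow, with hypothetical Peer/queue names, not the router classes:

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class Peer {
    private final Map<Long, byte[]> pending = new ConcurrentHashMap<>();
    void send(long id, byte[] msg) { pending.put(id, msg); }
    /** @return true only for the first ACK of this message id; dups return false */
    boolean acked(long id) { return pending.remove(id) != null; }
    int outstanding() { return pending.size(); }
}

class AckHandler {
    private final Queue<Peer> wantToSend = new ConcurrentLinkedQueue<>();

    void receiveAck(Peer from, long id) {
        boolean newAck = from.acked(id);
        // a first ACK freed congestion window; requeue the peer so the
        // pusher can send its next fragment right away
        if (newAck && from.outstanding() > 0)
            wantToSend.add(from);
    }
}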
@@ -1,13 +1,16 @@
 package net.i2p.router.transport.udp;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterInfo;
 import net.i2p.data.i2np.I2NPMessage;
 import net.i2p.router.OutNetMessage;
 import net.i2p.router.RouterContext;
+import net.i2p.util.ConcurrentHashSet;
 import net.i2p.util.Log;
 
 /**
@@ -23,16 +26,33 @@ import net.i2p.util.Log;
  *
  */
 class OutboundMessageFragments {
-    private RouterContext _context;
-    private Log _log;
-    private UDPTransport _transport;
+    private final RouterContext _context;
+    private final Log _log;
+    private final UDPTransport _transport;
     // private ActiveThrottle _throttle; // LINT not used ??
-    /** peers we are actively sending messages to */
-    private final List<PeerState> _activePeers;
+
+    /**
+     * Peers we are actively sending messages to.
+     * We use the iterator so we treat it like a list,
+     * but we use a HashSet so remove() is fast and
+     * we don't need to do contains().
+     * Even though most (but NOT all) accesses are synchronized,
+     * we use a ConcurrentHashSet as the iterator is long-lived.
+     */
+    private final Set<PeerState> _activePeers;
+
+    /**
+     * The long-lived iterator over _activePeers.
+     */
+    private Iterator<PeerState> _iterator;
+
+    /**
+     * Avoid sync in add() if possible (not 100% reliable)
+     */
+    private boolean _isWaiting;
+
     private boolean _alive;
     /** which peer should we build the next packet out of? */
     private int _nextPeer;
-    private PacketBuilder _builder;
+    private final PacketBuilder _builder;
     private long _lastCycleTime = System.currentTimeMillis();
 
     /** if we can handle more messages explicitly, set this to true */
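The javadoc above explains the choice of a concurrent set: the iterator is long-lived, so a fail-fast collection would throw as soon as another thread touched the set mid-cycle. A small runnable demonstration of the difference, using the JDK's concurrent key set in place of I2P's net.i2p.util.ConcurrentHashSet (an assumption for self-containment; the behavior shown is the standard-library contract):

import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IteratorDemo {
    public static void main(String[] args) {
        Set<Integer> plain = new HashSet<>();
        plain.add(1);
        Iterator<Integer> it = plain.iterator();
        plain.add(2);                      // structural change while iterating
        try {
            it.next();                     // fail-fast iterator throws
        } catch (ConcurrentModificationException expected) {
            System.out.println("HashSet iterator is fail-fast");
        }

        Set<Integer> concurrent = ConcurrentHashMap.newKeySet();
        concurrent.add(1);
        Iterator<Integer> it2 = concurrent.iterator();
        concurrent.add(2);                 // weakly consistent: never throws,
        System.out.println(it2.next());    // but may or may not see the add
    }
}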
@@ -42,13 +62,14 @@ class OutboundMessageFragments {
     // private static final int MAX_ACTIVE = 64; // not used.
     // don't send a packet more than 10 times
     static final int MAX_VOLLEYS = 10;
+    private static final int MAX_WAIT = 1000;
 
     public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
         _context = ctx;
         _log = ctx.logManager().getLog(OutboundMessageFragments.class);
         _transport = transport;
         // _throttle = throttle;
-        _activePeers = new ArrayList(256);
+        _activePeers = new ConcurrentHashSet(256);
         _builder = new PacketBuilder(ctx, transport);
         _alive = true;
         // _allowExcess = false;
@@ -59,6 +80,7 @@ class OutboundMessageFragments {
         _context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES);
+        _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES);
@@ -72,20 +94,20 @@ class OutboundMessageFragments {
     }
 
     public void startup() { _alive = true; }
 
     public void shutdown() {
         _alive = false;
+        _activePeers.clear();
         synchronized (_activePeers) {
             _activePeers.notifyAll();
         }
     }
 
     void dropPeer(PeerState peer) {
         if (_log.shouldLog(Log.INFO))
             _log.info("Dropping peer " + peer.getRemotePeer().toBase64());
         peer.dropOutbound();
-        synchronized (_activePeers) {
-            _activePeers.remove(peer);
-            _activePeers.notifyAll();
-        }
+        _activePeers.remove(peer);
     }
 
     /**
@@ -145,24 +167,12 @@ class OutboundMessageFragments {
             return;
         }
         int active = peer.add(state);
-        synchronized (_activePeers) {
-            if (!_activePeers.contains(peer)) {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
-                _activePeers.add(peer);
-            } else {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
-            }
-            _activePeers.notifyAll();
-        }
-        //msg.timestamp("made active along with: " + active);
+        add(peer);
         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     } else {
         if (_log.shouldLog(Log.WARN))
             _log.warn("Error initializing " + msg);
     }
     //finishMessages();
 }
 
 /**
@@ -174,94 +184,115 @@ class OutboundMessageFragments {
         if (peer == null)
             throw new RuntimeException("wtf, null peer for " + state);
         int active = peer.add(state);
-        synchronized (_activePeers) {
-            if (!_activePeers.contains(peer)) {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
-                if (_activePeers.isEmpty())
-                    _lastCycleTime = System.currentTimeMillis();
-                _activePeers.add(peer);
-            } else {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
-            }
-            _activePeers.notifyAll();
-        }
+        add(peer);
         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
-        // should we finish messages here too?
-        /*
-        synchronized (_activeMessages) {
-            _activeMessages.add(state);
-            if (_activeMessages.size() == 1)
-                _activeMessages.notifyAll();
-        }
-        */
     }
 
+    /**
+     * Add the peer to the list of peers wanting to transmit something.
+     * This wakes up the packet pusher if it is sleeping.
+     *
+     * Avoid synchronization where possible.
+     * There are small chances of races.
+     * There are larger chances of adding the PeerState "behind" where
+     * the iterator is now... but these issues are the same as before concurrentification.
+     *
+     * @since 0.8.9
+     */
+    public void add(PeerState peer) {
+        boolean wasEmpty = _activePeers.isEmpty();
+        boolean added = _activePeers.add(peer);
+        if (added) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
+            if (wasEmpty)
+                _lastCycleTime = System.currentTimeMillis();
+        } else {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
+        }
+        _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0);
+
+        // Avoid sync if possible
+        // no, this doesn't always work.
+        // Also note that the iterator in getNextVolley may have alreay passed us,
+        // or not reflect the addition.
+        if (_isWaiting || wasEmpty) {
+            synchronized (_activePeers) {
+                _activePeers.notifyAll();
+            }
+        }
+    }
 
     /**
      * Remove any expired or complete messages
      */
+/****
     private void finishMessages() {
-        int rv = 0;
-        List peers = null;
-        synchronized (_activePeers) {
-            peers = new ArrayList(_activePeers.size());
-            for (int i = 0; i < _activePeers.size(); i++) {
-                PeerState state = _activePeers.get(i);
-                if (state.getOutboundMessageCount() <= 0) {
-                    _activePeers.remove(i);
-                    i--;
-                } else {
-                    peers.add(state);
-                }
-            }
-            _activePeers.notifyAll();
-        }
-        for (int i = 0; i < peers.size(); i++) {
-            PeerState state = (PeerState)peers.get(i);
-            int remaining = state.finishMessages();
-            if (remaining <= 0) {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
-            }
-            rv += remaining;
-        }
-    }
+        for (Iterator<PeerState> iter = _activePeers.iterator(); iter.hasNext(); ) {
+            PeerState state = iter.next();
+            if (state.getOutboundMessageCount() <= 0) {
+                iter.remove();
+            } else {
+                int remaining = state.finishMessages();
+                if (remaining <= 0) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
+                    iter.remove();
+                }
+            }
+        }
+    }
+****/
 
     /**
      * Fetch all the packets for a message volley, blocking until there is a
      * message which can be fully transmitted (or the transport is shut down).
+     * The returned array may be sparse, with null packets taking the place of
+     * already ACKed fragments.
      *
+     * NOT thread-safe. Called by the PacketPusher thread only.
+     *
      * @return null only on shutdown
      */
     public UDPPacket[] getNextVolley() {
         PeerState peer = null;
         OutboundMessageState state = null;
+        // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
+        int peersProcessed = 0;
         while (_alive && (state == null) ) {
             long now = _context.clock().now();
             int nextSendDelay = -1;
-            finishMessages();
-            try {
-                synchronized (_activePeers) {
-                    for (int i = 0; i < _activePeers.size(); i++) {
-                        int cur = (i + _nextPeer) % _activePeers.size();
-                        if (cur == 0) {
-                            // FIXME or delete, these stats aren't much help since they include the sleep time
-                            long ts = System.currentTimeMillis();
-                            long cycleTime = ts - _lastCycleTime;
-                            _lastCycleTime = ts;
-                            _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
-                            // make longer than the default sleep time below
-                            if (cycleTime > 1100)
-                                _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
-                        }
-                        peer = _activePeers.get(i);
-                        state = peer.allocateSend();
-                        if (state != null) {
-                            // we have something to send and we will be returning it
-                            _nextPeer = i + 1;
-                            break;
-                        } else {
+            // no, not every time - O(n**2) - do just before waiting below
+            //finishMessages();
+
+            // do we need a new long-lived iterator?
+            if (_iterator == null ||
+                ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
+                _iterator = _activePeers.iterator();
+            }
+
+            // Go through all the peers that we are actively sending messages to.
+            // Call finishMessages() for each one, and remove them from the iterator
+            // if there is nothing left to send.
+            // Otherwise, return the volley to be sent.
+            // Otherwise, wait()
+            while (_iterator.hasNext()) {
+                peer = _iterator.next();
+                int remaining = peer.finishMessages();
+                if (remaining <= 0) {
+                    // race with add()
+                    _iterator.remove();
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("No more pending messages for " + peer.getRemotePeer().toBase64());
+                    continue;
+                }
+                peersProcessed++;
+                state = peer.allocateSend();
+                if (state != null) {
+                    // we have something to send and we will be returning it
+                    break;
+                } else if (peersProcessed >= _activePeers.size()) {
+                    // we've gone all the way around, time to sleep
+                    break;
+                } else {
+                    // Update the minimum delay for all peers (getNextDelay() returns 1 for "now")
@@ -270,53 +301,70 @@ class OutboundMessageFragments {
                     if ( (nextSendDelay <= 0) || (delay < nextSendDelay) )
                         nextSendDelay = delay;
                     peer = null;
                     state = null;
                 }
             }
-            if (_log.shouldLog(Log.DEBUG))
+
+            if (peer != null && _log.shouldLog(Log.DEBUG))
                 _log.debug("Done looping, next peer we are sending for: " +
-                           (peer != null ? peer.getRemotePeer().toBase64() : "none"));
-            if (state == null) {
+                           peer.getRemotePeer().toBase64());
+
+            // if we've gone all the way through the loop, wait
+            if (state == null && peersProcessed >= _activePeers.size()) {
+                peersProcessed = 0;
-                // why? we do this in the loop one at a time
-                //finishMessages();
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("wait for " + nextSendDelay);
-                // wait.. or somethin'
-                // wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says
-                if (nextSendDelay > 0)
-                    _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000));
-                else
-                    _activePeers.wait(1000);
-            } else {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("dont wait: alive=" + _alive + " state = " + state);
+                // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
+                _isWaiting = true;
+                synchronized (_activePeers) {
+                    try {
+                        // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
+                        // gets called regularly
+                        if (nextSendDelay > 0)
+                            _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), MAX_WAIT));
+                        else
+                            _activePeers.wait(MAX_WAIT);
+                    } catch (InterruptedException ie) {
+                        // noop
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Woken up while waiting");
+                    }
+                }
+                _isWaiting = false;
+            //} else {
+            //    if (_log.shouldLog(Log.DEBUG))
+            //        _log.debug("dont wait: alive=" + _alive + " state = " + state);
             }
-            } catch (InterruptedException ie) {
-                // noop
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Woken up while waiting");
-            }
-        }
+        } // while alive && state == null
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Sending " + state);
 
         UDPPacket packets[] = preparePackets(state, peer);
 
+      /****
         if ( (state != null) && (state.getMessage() != null) ) {
             int valid = 0;
             for (int i = 0; packets != null && i < packets.length ; i++)
                 if (packets[i] != null)
                     valid++;
             /*
             state.getMessage().timestamp("sending a volley of " + valid
                          + " lastReceived: "
                          + (_context.clock().now() - peer.getLastReceiveTime())
                          + " lastSentFully: "
                          + (_context.clock().now() - peer.getLastSendFullyTime()));
             */
         }
+      ****/
 
         return packets;
     }
 
     /**
      * @return null if state or peer is null
      */
     private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
         if ( (state != null) && (peer != null) ) {
             int fragments = state.getFragmentCount();
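add() and getNextVolley() above form a producer/consumer pair: the producer takes the lock only when a wakeup plausibly matters (_isWaiting set, or an empty-to-nonempty transition), and the consumer always bounds its wait, so a missed notification costs at most MAX_WAIT ms rather than a hang. A condensed, self-contained model of just that handshake, with illustrative names, not the router code:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class WorkSetSketch<T> {
    private static final int MAX_WAIT = 1000;
    private final Set<T> active = ConcurrentHashMap.newKeySet();
    private boolean isWaiting;   // best-effort hint; a race only delays a wakeup

    /** producer side: notify only when it might matter */
    void add(T t) {
        boolean wasEmpty = active.isEmpty();
        active.add(t);
        if (isWaiting || wasEmpty) {
            synchronized (active) {
                active.notifyAll();
            }
        }
    }

    /** consumer side: bounded wait, clamped to [10, MAX_WAIT] ms */
    void awaitWork(int nextSendDelay) throws InterruptedException {
        isWaiting = true;
        synchronized (active) {
            if (active.isEmpty())
                active.wait(nextSendDelay > 0
                            ? Math.min(Math.max(nextSendDelay, 10), MAX_WAIT)
                            : MAX_WAIT);
        }
        isWaiting = false;
    }
}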
@@ -397,37 +445,6 @@ class OutboundMessageFragments {
         }
     }
 
-    /**
-     * We received an ACK of the given messageId from the given peer, so if it
-     * is still unacked, mark it as complete.
-     *
-     * @return fragments acked
-     */
-    public int acked(long messageId, Hash ackedBy) {
-        PeerState peer = _transport.getPeerState(ackedBy);
-        if (peer != null) {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64());
-            return peer.acked(messageId);
-        } else {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64());
-            return 0;
-        }
-    }
-
-    public void acked(ACKBitfield bitfield, Hash ackedBy) {
-        PeerState peer = _transport.getPeerState(ackedBy);
-        if (peer != null) {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64());
-            peer.acked(bitfield);
-        } else {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64());
-        }
-    }
-
     public interface ActiveThrottle {
         public void choke(Hash peer);
         public void unchoke(Hash peer);
@@ -12,7 +12,7 @@ import net.i2p.util.ByteCache;
 import net.i2p.util.Log;
 
 /**
- * Maintain the outbound fragmentation for resending
+ * Maintain the outbound fragmentation for resending, for a single message.
  *
  */
 class OutboundMessageState {
@@ -11,10 +11,10 @@ import net.i2p.util.Log;
  */
 class PacketPusher implements Runnable {
     // private RouterContext _context;
-    private Log _log;
-    private OutboundMessageFragments _fragments;
-    private UDPSender _sender;
-    private boolean _alive;
+    private final Log _log;
+    private final OutboundMessageFragments _fragments;
+    private final UDPSender _sender;
+    private volatile boolean _alive;
 
     public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) {
         // _context = ctx;
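_alive turning volatile is small but load-bearing: the run loop polls the flag from the pusher thread while shutdown code clears it from another, and without volatile the Java memory model permits the loop to never observe the store. A minimal illustration of the idiom, not the actual PacketPusher:

class LoopSketch implements Runnable {
    private volatile boolean alive = true;   // volatile: the store in shutdown()
                                             // becomes visible to the run() loop

    public void run() {
        while (alive) {
            // ... fetch the next volley and push its packets ...
        }
    }

    public void shutdown() { alive = false; }
}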
@@ -24,8 +24,8 @@ import net.i2p.util.ConcurrentHashSet;
  *
  */
 class PeerState {
-    private RouterContext _context;
-    private Log _log;
+    private final RouterContext _context;
+    private final Log _log;
     /**
      * The peer are we talking to. This should be set as soon as this
      * state is created if we are initiating a connection, but if we are
@@ -192,7 +192,7 @@ class PeerState {
     /** which outbound message is currently being retransmitted */
     private OutboundMessageState _retransmitter;
 
-    private UDPTransport _transport;
+    private final UDPTransport _transport;
 
     /** have we migrated away from this peer to another newer one? */
     private volatile boolean _dead;
@@ -268,18 +268,7 @@ class PeerState {
         _rttDeviation = _rtt;
         _inboundMessages = new HashMap(8);
         _outboundMessages = new ArrayList(32);
-        _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
+        // all createRateStat() moved to EstablishmentManager
     }
 
     private int getDefaultMTU() {
@@ -1061,7 +1050,6 @@ class PeerState {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
         List<OutboundMessageState> msgs = _outboundMessages;
-        if (msgs == null) return 0;
         int rv = 0;
         boolean fail = false;
         synchronized (msgs) {
@@ -1070,11 +1058,14 @@ class PeerState {
                 // 32 queued messages? to *one* peer? nuh uh.
                 fail = true;
                 rv--;
+
+     /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
+
             } else if (_retransmitter != null) {
                 long lifetime = _retransmitter.getLifetime();
                 long totalLifetime = lifetime;
                 for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter
-                    OutboundMessageState cur = (OutboundMessageState)msgs.get(i);
+                    OutboundMessageState cur = msgs.get(i);
                     totalLifetime += cur.getLifetime();
                 }
                 long remaining = -1;
@@ -1103,6 +1094,9 @@ class PeerState {
                 _context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime);
                 msgs.add(state);
             }
+
+     *******/
+
             } else {
                 msgs.add(state);
             }
@@ -1111,6 +1105,7 @@ class PeerState {
             _transport.failed(state, false);
         return rv;
     }
+
     /** drop all outbound messages */
     public void dropOutbound() {
         //if (_dead) return;
@@ -1118,7 +1113,7 @@ class PeerState {
         List<OutboundMessageState> msgs = _outboundMessages;
         //_outboundMessages = null;
         _retransmitter = null;
-        if (msgs != null) {
+
         int sz = 0;
         List<OutboundMessageState> tempList = null;
         synchronized (msgs) {
@@ -1130,21 +1125,17 @@ class PeerState {
             }
         for (int i = 0; i < sz; i++)
             _transport.failed(tempList.get(i), false);
-        }
 
         // so the ACKSender will drop this peer from its queue
         _wantACKSendSince = -1;
     }
 
     /**
      * @return number of active outbound messages remaining (unsynchronized)
      */
     public int getOutboundMessageCount() {
-        List<OutboundMessageState> msgs = _outboundMessages;
         if (_dead) return 0;
-        if (msgs != null) {
-            synchronized (msgs) {
-                return msgs.size();
-            }
-        } else {
-            return 0;
-        }
+        return _outboundMessages.size();
     }
 
     /**
@@ -1152,39 +1143,37 @@ class PeerState {
      * @return number of active outbound messages remaining
      */
     public int finishMessages() {
-        int rv = 0;
         List<OutboundMessageState> msgs = _outboundMessages;
+        // short circuit, unsynchronized
+        if (msgs.isEmpty())
+            return 0;
+
         if (_dead) {
             dropOutbound();
             return 0;
        }
+
+        int rv = 0;
         List<OutboundMessageState> succeeded = null;
         List<OutboundMessageState> failed = null;
         synchronized (msgs) {
-            int size = msgs.size();
-            for (int i = 0; i < size; i++) {
-                OutboundMessageState state = msgs.get(i);
+            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+                OutboundMessageState state = iter.next();
                 if (state.isComplete()) {
-                    msgs.remove(i);
-                    i--;
-                    size--;
+                    iter.remove();
                     if (_retransmitter == state)
                         _retransmitter = null;
                     if (succeeded == null) succeeded = new ArrayList(4);
                     succeeded.add(state);
                 } else if (state.isExpired()) {
-                    msgs.remove(i);
-                    i--;
-                    size--;
+                    iter.remove();
                     if (_retransmitter == state)
                         _retransmitter = null;
                     _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());
                     if (failed == null) failed = new ArrayList(4);
                     failed.add(state);
                 } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
-                    msgs.remove(i);
-                    i--;
-                    size--;
+                    iter.remove();
                     if (state == _retransmitter)
                         _retransmitter = null;
                     _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime());
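The finishMessages() rewrite above replaces manual index bookkeeping (remove(i); i--; size--;) with Iterator.remove(), which is harder to get wrong and works identically on any collection. The pattern in isolation, with a hypothetical predicate standing in for the router's isComplete()/isExpired() checks:

import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class SweepSketch {
    /** remove every finished message in one pass; @return how many were removed */
    static <T> int sweep(List<T> msgs, Predicate<T> finished) {
        int removed = 0;
        for (Iterator<T> iter = msgs.iterator(); iter.hasNext(); ) {
            T m = iter.next();
            if (finished.test(m)) {
                iter.remove();   // safe mid-iteration; no index fixups needed
                removed++;
            }
        }
        return removed;
    }
}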
@@ -1232,9 +1221,7 @@ class PeerState {
         List<OutboundMessageState> msgs = _outboundMessages;
         if (_dead) return null;
         synchronized (msgs) {
-            int size = msgs.size();
-            for (int i = 0; i < size; i++) {
-                OutboundMessageState state = msgs.get(i);
+            for (OutboundMessageState state : msgs) {
                 if (locked_shouldSend(state)) {
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@@ -1276,9 +1263,7 @@ class PeerState {
             else
                 return rv;
         }
-        int size = msgs.size();
-        for (int i = 0; i < size; i++) {
-            OutboundMessageState state = msgs.get(i);
+        for (OutboundMessageState state : msgs) {
             int delay = (int)(state.getNextSendTime() - now);
             if (delay <= 0)
                 delay = 1;
@@ -1393,16 +1378,20 @@ class PeerState {
         return false;
     }
 
-    public int acked(long messageId) {
+    /**
+     * A full ACK was received.
+     *
+     * @return true if the message was acked for the first time
+     */
+    public boolean acked(long messageId) {
+        if (_dead) return false;
         OutboundMessageState state = null;
         List<OutboundMessageState> msgs = _outboundMessages;
-        if (_dead) return 0;
         synchronized (msgs) {
-            int sz = msgs.size();
-            for (int i = 0; i < sz; i++) {
-                state = msgs.get(i);
+            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+                state = iter.next();
                 if (state.getMessageId() == messageId) {
-                    msgs.remove(i);
+                    iter.remove();
                     break;
                 } else {
                     state = null;
@@ -1438,22 +1427,25 @@ class PeerState {
             // _throttle.unchoke(peer.getRemotePeer());
 
             state.releaseResources();
-            return numFragments;
         } else {
             // dupack, likely
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Received an ACK for a message not pending: " + messageId);
-            return 0;
         }
+        return state != null;
     }
 
-    public void acked(ACKBitfield bitfield) {
+    /**
+     * A partial ACK was received. This is much less common than full ACKs.
+     *
+     * @return true if the message was completely acked for the first time
+     */
+    public boolean acked(ACKBitfield bitfield) {
         if (_dead)
-            return;
+            return false;
 
         if (bitfield.receivedComplete()) {
-            acked(bitfield.getMessageId());
-            return;
+            return acked(bitfield.getMessageId());
         }
 
         List<OutboundMessageState> msgs = _outboundMessages;
@@ -1461,13 +1453,13 @@ class PeerState {
         OutboundMessageState state = null;
         boolean isComplete = false;
         synchronized (msgs) {
-            for (int i = 0; i < msgs.size(); i++) {
-                state = msgs.get(i);
+            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+                state = iter.next();
                 if (state.getMessageId() == bitfield.getMessageId()) {
                     boolean complete = state.acked(bitfield);
                     if (complete) {
                         isComplete = true;
-                        msgs.remove(i);
+                        iter.remove();
                         if (state == _retransmitter)
                             _retransmitter = null;
                     }
@@ -1514,12 +1506,12 @@ class PeerState {
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
             }
-            return;
+            return isComplete;
         } else {
             // dupack
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Received an ACK for a message not pending: " + bitfield);
-            return;
+            return false;
         }
     }
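The two acked() variants above now report whether a message just became fully ACKed for the first time, and partial ACKs are what make a resend volley "sparse": only un-ACKed fragments go out again (counted by udp.sendSparse, and matching the getNextVolley javadoc). A toy model of the bitfield bookkeeping, with made-up types in place of ACKBitfield/OutboundMessageState:

class FragmentedMessageSketch {
    private final byte[][] fragments;
    private final boolean[] ackedFrag;

    FragmentedMessageSketch(byte[][] fragments) {
        this.fragments = fragments;
        this.ackedFrag = new boolean[fragments.length];
    }

    /** apply a partial ACK; @return true once every fragment has been ACKed */
    boolean acked(boolean[] bitfield) {
        for (int i = 0; i < bitfield.length && i < ackedFrag.length; i++)
            if (bitfield[i])
                ackedFrag[i] = true;
        for (boolean b : ackedFrag)
            if (!b)
                return false;
        return true;
    }

    /** @return a sparse volley: null slots for fragments the peer already has */
    byte[][] sparseResendVolley() {
        byte[][] volley = new byte[fragments.length][];
        for (int i = 0; i < fragments.length; i++)
            volley[i] = ackedFrag[i] ? null : fragments[i];
        return volley;
    }
}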
@@ -16,13 +16,13 @@ import net.i2p.util.Log;
  *
  */
 class UDPSender {
-    private RouterContext _context;
-    private Log _log;
+    private final RouterContext _context;
+    private final Log _log;
     private DatagramSocket _socket;
     private String _name;
     private final BlockingQueue<UDPPacket> _outboundQueue;
     private boolean _keepRunning;
-    private Runner _runner;
+    private final Runner _runner;
     private static final int TYPE_POISON = 99999;
 
     //private static final int MAX_QUEUED = 4;
@@ -345,9 +345,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         _refiller.shutdown();
         if (_handler != null)
             _handler.shutdown();
-        _fragments.shutdown();
         if (_pusher != null)
             _pusher.shutdown();
+        _fragments.shutdown();
         if (_establisher != null)
             _establisher.shutdown();
         _inboundFragments.shutdown();