forked from I2P_Developers/i2p.i2p
Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e4cd37f2a7 |
@@ -25,7 +25,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
private DecayingBloomFilter _recentlyCompletedMessages;
|
private DecayingBloomFilter _recentlyCompletedMessages;
|
||||||
private final OutboundMessageFragments _outbound;
|
private final OutboundMessageFragments _outbound;
|
||||||
private final UDPTransport _transport;
|
private final UDPTransport _transport;
|
||||||
private final ACKSender _ackSender;
|
|
||||||
private final MessageReceiver _messageReceiver;
|
private final MessageReceiver _messageReceiver;
|
||||||
private volatile boolean _alive;
|
private volatile boolean _alive;
|
||||||
|
|
||||||
@@ -38,7 +37,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
//_inboundMessages = new HashMap(64);
|
//_inboundMessages = new HashMap(64);
|
||||||
_outbound = outbound;
|
_outbound = outbound;
|
||||||
_transport = transport;
|
_transport = transport;
|
||||||
_ackSender = new ACKSender(_context, _transport);
|
|
||||||
_messageReceiver = new MessageReceiver(_context, _transport);
|
_messageReceiver = new MessageReceiver(_context, _transport);
|
||||||
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES);
|
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES);
|
||||||
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
|
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
|
||||||
@@ -55,7 +53,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
// array size (currently its tuned for 10 minute rates for the
|
// array size (currently its tuned for 10 minute rates for the
|
||||||
// messageValidator)
|
// messageValidator)
|
||||||
_recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
|
_recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
|
||||||
_ackSender.startup();
|
|
||||||
_messageReceiver.startup();
|
_messageReceiver.startup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -64,7 +61,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
if (_recentlyCompletedMessages != null)
|
if (_recentlyCompletedMessages != null)
|
||||||
_recentlyCompletedMessages.stopDecaying();
|
_recentlyCompletedMessages.stopDecaying();
|
||||||
_recentlyCompletedMessages = null;
|
_recentlyCompletedMessages = null;
|
||||||
_ackSender.shutdown();
|
|
||||||
_messageReceiver.shutdown();
|
_messageReceiver.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,7 +123,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
if (data.readMessageFragmentNum(i) == 0) {
|
if (data.readMessageFragmentNum(i) == 0) {
|
||||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||||
from.messageFullyReceived(messageId, -1);
|
from.messageFullyReceived(messageId, -1);
|
||||||
_ackSender.ackPeer(from);
|
|
||||||
if (_log.shouldLog(Log.INFO))
|
if (_log.shouldLog(Log.INFO))
|
||||||
_log.info("Message received is a dup: " + mid + " dups: "
|
_log.info("Message received is a dup: " + mid + " dups: "
|
||||||
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
|
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
|
||||||
@@ -174,7 +169,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
if (messageComplete) {
|
if (messageComplete) {
|
||||||
_recentlyCompletedMessages.add(mid);
|
_recentlyCompletedMessages.add(mid);
|
||||||
from.messageFullyReceived(messageId, state.getCompleteSize());
|
from.messageFullyReceived(messageId, state.getCompleteSize());
|
||||||
_ackSender.ackPeer(from);
|
|
||||||
|
|
||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Message received completely! " + state);
|
_log.debug("Message received completely! " + state);
|
||||||
@@ -196,7 +190,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
|||||||
if (_log.shouldLog(Log.DEBUG))
|
if (_log.shouldLog(Log.DEBUG))
|
||||||
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
|
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
|
||||||
from.messagePartiallyReceived();
|
from.messagePartiallyReceived();
|
||||||
_ackSender.ackPeer(from);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Why give up on other fragments if one is bad?
|
// TODO: Why give up on other fragments if one is bad?
|
||||||
|
@@ -502,6 +502,8 @@ class OutboundMessageFragments {
|
|||||||
|
|
||||||
int sent = rv.size();
|
int sent = rv.size();
|
||||||
peer.packetsTransmitted(sent);
|
peer.packetsTransmitted(sent);
|
||||||
|
if (newFullAckCount <= 0)
|
||||||
|
peer.clearWantedACKSendSince();
|
||||||
if (_log.shouldDebug())
|
if (_log.shouldDebug())
|
||||||
_log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() +
|
_log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() +
|
||||||
" messages in " + sent + " packets to " + peer);
|
" messages in " + sent + " packets to " + peer);
|
||||||
|
@@ -25,8 +25,9 @@ import net.i2p.router.util.CachedIteratorCollection;
|
|||||||
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
||||||
import net.i2p.router.util.PriBlockingQueue;
|
import net.i2p.router.util.PriBlockingQueue;
|
||||||
import net.i2p.util.BandwidthEstimator;
|
import net.i2p.util.BandwidthEstimator;
|
||||||
import net.i2p.util.Log;
|
|
||||||
import net.i2p.util.ConcurrentHashSet;
|
import net.i2p.util.ConcurrentHashSet;
|
||||||
|
import net.i2p.util.Log;
|
||||||
|
import net.i2p.util.SimpleTimer2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contain all of the state about a UDP connection to a peer.
|
* Contain all of the state about a UDP connection to a peer.
|
||||||
@@ -712,15 +713,27 @@ public class PeerState {
|
|||||||
_receiveBytes = 0;
|
_receiveBytes = 0;
|
||||||
_receivePeriodBegin = now;
|
_receivePeriodBegin = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_wantACKSendSince <= 0)
|
|
||||||
_wantACKSendSince = now;
|
|
||||||
_currentACKs.add(messageId);
|
_currentACKs.add(messageId);
|
||||||
|
messagePartiallyReceived(now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We received a partial message, or we want to send some acks.
|
||||||
|
*/
|
||||||
void messagePartiallyReceived() {
|
void messagePartiallyReceived() {
|
||||||
if (_wantACKSendSince <= 0)
|
messagePartiallyReceived(_context.clock().now());
|
||||||
_wantACKSendSince = _context.clock().now();
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We received a partial message, or we want to send some acks.
|
||||||
|
* @since 0.9.52
|
||||||
|
*/
|
||||||
|
private synchronized void messagePartiallyReceived(long now) {
|
||||||
|
if (_wantACKSendSince <= 0) {
|
||||||
|
_wantACKSendSince = now;
|
||||||
|
// todo keep the same timer
|
||||||
|
new ACKTimer(now);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -891,7 +904,9 @@ public class PeerState {
|
|||||||
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
|
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
|
||||||
* So this is only for ACK-only packets, so all the size limiting is useless.
|
* So this is only for ACK-only packets, so all the size limiting is useless.
|
||||||
* FIXME.
|
* FIXME.
|
||||||
* Side effect - sets _lastACKSend if rv is non-empty
|
*
|
||||||
|
* Side effect - sets _lastACKSend to now if rv is non-empty.
|
||||||
|
* Side effect - sets _wantACKSendSince to -1 if _currentACKs is now empty.
|
||||||
*
|
*
|
||||||
* @return non-null, possibly empty
|
* @return non-null, possibly empty
|
||||||
*/
|
*/
|
||||||
@@ -924,7 +939,7 @@ public class PeerState {
|
|||||||
bytesRemaining -= 4;
|
bytesRemaining -= 4;
|
||||||
}
|
}
|
||||||
if (_currentACKs.isEmpty())
|
if (_currentACKs.isEmpty())
|
||||||
_wantACKSendSince = -1;
|
_wantACKSendSince = 0;
|
||||||
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
|
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
|
||||||
List<Long> randomResends = getCurrentResendACKs();
|
List<Long> randomResends = getCurrentResendACKs();
|
||||||
// now repeat by putting in some old ACKs
|
// now repeat by putting in some old ACKs
|
||||||
@@ -1282,8 +1297,20 @@ public class PeerState {
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
public void setLastACKSend(long when) { _lastACKSend = when; }
|
public void setLastACKSend(long when) { _lastACKSend = when; }
|
||||||
|
|
||||||
|
/** ACKSender only, to be removed
|
||||||
|
* @deprecated unused
|
||||||
|
*/
|
||||||
public long getWantedACKSendSince() { return _wantACKSendSince; }
|
public long getWantedACKSendSince() { return _wantACKSendSince; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All acks have been sent.
|
||||||
|
* @since 0.9.52
|
||||||
|
*/
|
||||||
|
synchronized void clearWantedACKSendSince() {
|
||||||
|
_wantACKSendSince = 0;
|
||||||
|
// TODO we could also cancel ACKTimer if we keep a ref to it
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Are we out of room to send all the current unsent acks in a single packet?
|
* Are we out of room to send all the current unsent acks in a single packet?
|
||||||
* This is a huge threshold (134 for small MTU and 255 for large MTU)
|
* This is a huge threshold (134 for small MTU and 255 for large MTU)
|
||||||
@@ -1372,7 +1399,7 @@ public class PeerState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// so the ACKSender will drop this peer from its queue
|
// so the ACKSender will drop this peer from its queue
|
||||||
_wantACKSendSince = -1;
|
_wantACKSendSince = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -2087,6 +2114,86 @@ public class PeerState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long ackFrequency(long timeSinceACK, long rtt) {
|
||||||
|
// if we are actively pumping lots of data to them, we can depend upon
|
||||||
|
// the unsentACKThreshold to figure out when to send an ACK instead of
|
||||||
|
// using the timer, so we can set the timeout/frequency higher
|
||||||
|
// TODO move constant to PS
|
||||||
|
if (timeSinceACK < 2*1000)
|
||||||
|
return Math.min(rtt/2, ACKSender.ACK_FREQUENCY);
|
||||||
|
else
|
||||||
|
return ACKSender.ACK_FREQUENCY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A timer to send an ack-only packet.
|
||||||
|
* @since 0.9.52
|
||||||
|
*/
|
||||||
|
private class ACKTimer extends SimpleTimer2.TimedEvent {
|
||||||
|
public ACKTimer(long now) {
|
||||||
|
super(_context.simpleTimer2());
|
||||||
|
long delta = ackFrequency(_lastACKSend, _rtt);
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("Sending delayed ack in " + delta + ": " + PeerState.this);
|
||||||
|
schedule(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send an ack-only packet, unless acks were already sent
|
||||||
|
* as indicated by _wantACKSendSince == 0.
|
||||||
|
* Will not requeue unless the acks don't all fit (unlikely).
|
||||||
|
*/
|
||||||
|
public void timeReached() {
|
||||||
|
synchronized(PeerState.this) {
|
||||||
|
long wanted = _wantACKSendSince;
|
||||||
|
if (wanted <= 0) {
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("Already acked:" + PeerState.this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<ACKBitfield> ackBitfields = retrieveACKBitfields(false);
|
||||||
|
|
||||||
|
// TODO move stats
|
||||||
|
if (!ackBitfields.isEmpty()) {
|
||||||
|
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size());
|
||||||
|
// todo do we need this stat?
|
||||||
|
/*
|
||||||
|
long now = _context.clock().now();
|
||||||
|
long lastSend = _lastACKSend;
|
||||||
|
if (lastSend < 0)
|
||||||
|
lastSend = now - 1;
|
||||||
|
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
|
||||||
|
*/
|
||||||
|
// todo just create it once, put it in the transport
|
||||||
|
PacketBuilder builder = new PacketBuilder(_context, _transport);
|
||||||
|
UDPPacket ack = builder.buildACK(PeerState.this, ackBitfields);
|
||||||
|
ack.markType(1);
|
||||||
|
ack.setFragmentCount(-1);
|
||||||
|
ack.setMessageType(PacketBuilder.TYPE_ACK);
|
||||||
|
|
||||||
|
if (_log.shouldDebug())
|
||||||
|
_log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this);
|
||||||
|
// locking issues, we ignore the result, and acks are small,
|
||||||
|
// so don't even bother allocating
|
||||||
|
//peer.allocateSendingBytes(ack.getPacket().getLength(), true);
|
||||||
|
// ignore whether its ok or not, its a bloody ack. this should be fixed, probably.
|
||||||
|
_transport.send(ack);
|
||||||
|
|
||||||
|
if (_wantACKSendSince > 0) {
|
||||||
|
// still full packets left to be ACKed, since wanted time
|
||||||
|
// is reset by retrieveACKBitfields when all of the IDs are
|
||||||
|
// removed
|
||||||
|
if (_log.shouldInfo())
|
||||||
|
_log.info("Requeueing more ACKs for " + PeerState.this);
|
||||||
|
reschedule(25);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_context.statManager().addRateData("udp.abortACK", 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
|
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Reference in New Issue
Block a user