Transports: SSU1 removal part 6/n

This commit is contained in:
zzz
2024-05-18 11:50:30 +00:00
parent 84dbf53f95
commit 7f3db25080
9 changed files with 22 additions and 2430 deletions

View File

@@ -990,17 +990,11 @@ class EstablishmentManager {
RouterIdentity remote = state.getConfirmedIdentity(); RouterIdentity remote = state.getConfirmedIdentity();
PeerState peer; PeerState peer;
int version = state.getVersion(); int version = state.getVersion();
if (version == 1) {
peer = new PeerState(_context, _transport,
state.getSentIP(), state.getSentPort(), remote.calculateHash(), true, state.getRTT(),
state.getCipherKey(), state.getMACKey());
peer.setWeRelayToThemAs(state.getSentRelayTag());
} else {
InboundEstablishState2 state2 = (InboundEstablishState2) state; InboundEstablishState2 state2 = (InboundEstablishState2) state;
peer = state2.getPeerState(); peer = state2.getPeerState();
// now handled in IES2.createPeerState() // now handled in IES2.createPeerState()
//peer.setWeRelayToThemAs(state.getSentRelayTag()); //peer.setWeRelayToThemAs(state.getSentRelayTag());
}
if (version == 1) { if (version == 1) {
// Lookup the peer's MTU from the netdb, since it isn't included in the protocol setup (yet) // Lookup the peer's MTU from the netdb, since it isn't included in the protocol setup (yet)
@@ -1125,20 +1119,11 @@ class EstablishmentManager {
if (claimed != null) if (claimed != null)
_outboundByClaimedAddress.remove(claimed, state); _outboundByClaimedAddress.remove(claimed, state);
_outboundByHash.remove(remote.calculateHash(), state); _outboundByHash.remove(remote.calculateHash(), state);
int version = state.getVersion();
PeerState peer;
if (version == 1) {
peer = new PeerState(_context, _transport,
state.getSentIP(), state.getSentPort(), remote.calculateHash(), false, state.getRTT(),
state.getCipherKey(), state.getMACKey());
int mtu = state.getRemoteAddress().getMTU();
if (mtu > 0)
peer.setHisMTU(mtu);
} else {
OutboundEstablishState2 state2 = (OutboundEstablishState2) state; OutboundEstablishState2 state2 = (OutboundEstablishState2) state;
// OES2 sets PS2 MTU // OES2 sets PS2 MTU
peer = state2.getPeerState(); PeerState peer = state2.getPeerState();
}
peer.setTheyRelayToUsAs(state.getReceivedRelayTag()); peer.setTheyRelayToUsAs(state.getReceivedRelayTag());
// 0 is the default // 0 is the default
//peer.setWeRelayToThemAs(0); //peer.setWeRelayToThemAs(0);
@@ -1153,12 +1138,6 @@ class EstablishmentManager {
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(now)); _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(now));
DatabaseStoreMessage dbsm = null; DatabaseStoreMessage dbsm = null;
if (version == 1) {
// version 2 sends our RI in handshake
if (!state.isFirstMessageOurDSM()) {
dbsm = getOurInfo();
}
}
List<OutNetMessage> msgs = new ArrayList<OutNetMessage>(8); List<OutNetMessage> msgs = new ArrayList<OutNetMessage>(8);
OutNetMessage msg; OutNetMessage msg;

View File

@@ -53,8 +53,6 @@ class OutboundMessageFragments {
private Iterator<PeerState> _iterator; private Iterator<PeerState> _iterator;
private volatile boolean _alive; private volatile boolean _alive;
private final PacketBuilder _builder;
// null if SSU2 not enabled
private final PacketBuilder2 _builder2; private final PacketBuilder2 _builder2;
/** if we can handle more messages explicitly, set this to true */ /** if we can handle more messages explicitly, set this to true */
@@ -72,7 +70,6 @@ class OutboundMessageFragments {
_transport = transport; _transport = transport;
// _throttle = throttle; // _throttle = throttle;
_activePeers = new ConcurrentHashSet<PeerState>(256); _activePeers = new ConcurrentHashSet<PeerState>(256);
_builder = transport.getBuilder();
_builder2 = transport.getBuilder2(); _builder2 = transport.getBuilder2();
_alive = true; _alive = true;
// _allowExcess = false; // _allowExcess = false;
@@ -392,34 +389,6 @@ class OutboundMessageFragments {
if (states == null || peer == null) if (states == null || peer == null)
return null; return null;
List<Long> msgIds;
int newFullAckCount;
List<ACKBitfield> partialACKBitfields;
int piggybackedPartialACK;
Set<Long> remaining;
int before;
if (peer.getVersion() == 1) {
// ok, simplest possible thing is to always tack on the bitfields if
msgIds = peer.getCurrentFullACKs();
newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
partialACKBitfields = new ArrayList<ACKBitfield>();
peer.fetchPartialACKs(partialACKBitfields);
piggybackedPartialACK = partialACKBitfields.size();
// getCurrentFullACKs() already makes a copy, do we need to copy again?
// YES because buildPacket() now removes them (maybe)
remaining = new HashSet<Long>(msgIds);
before = remaining.size();
} else {
// all unused
msgIds = null;
newFullAckCount = 0;
partialACKBitfields = null;
piggybackedPartialACK = 0;
remaining = null;
before = 0;
}
// build the list of fragments to send // build the list of fragments to send
List<Fragment> toSend = new ArrayList<Fragment>(8); List<Fragment> toSend = new ArrayList<Fragment>(8);
for (OutboundMessageState state : states) { for (OutboundMessageState state : states) {
@@ -427,10 +396,7 @@ class OutboundMessageFragments {
// per-state stats // per-state stats
if (queued > 0 && state.getMaxSends() > 1) { if (queued > 0 && state.getMaxSends() > 1) {
int maxPktSz = state.fragmentSize(0); int maxPktSz = state.fragmentSize(0);
if (peer.getVersion() == 1) maxPktSz += SSU2Payload.BLOCK_HEADER_SIZE +
maxPktSz += (peer.isIPv6() ? PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_DATA_PACKET_OVERHEAD);
else
maxPktSz += SSU2Payload.BLOCK_HEADER_SIZE +
(peer.isIPv6() ? PacketBuilder2.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder2.MIN_DATA_PACKET_OVERHEAD); (peer.isIPv6() ? PacketBuilder2.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder2.MIN_DATA_PACKET_OVERHEAD);
peer.messageRetransmitted(queued, maxPktSz); peer.messageRetransmitted(queued, maxPktSz);
// _packetsRetransmitted += toSend; // lifetime for the transport // _packetsRetransmitted += toSend; // lifetime for the transport
@@ -466,24 +432,18 @@ class OutboundMessageFragments {
if (_log.shouldDebug()) if (_log.shouldDebug())
_log.debug("Building packet for " + next + " to " + peer); _log.debug("Building packet for " + next + " to " + peer);
int curTotalDataSize = state.fragmentSize(next.num); int curTotalDataSize = state.fragmentSize(next.num);
if (peer.getVersion() > 1) { curTotalDataSize += SSU2Util.FIRST_FRAGMENT_HEADER_SIZE;
curTotalDataSize += SSU2Util.FIRST_FRAGMENT_HEADER_SIZE; if (next.num > 0)
if (next.num > 0) curTotalDataSize += SSU2Util.DATA_FOLLOWON_EXTRA_SIZE;
curTotalDataSize += SSU2Util.DATA_FOLLOWON_EXTRA_SIZE;
}
// now stuff in more fragments if they fit // now stuff in more fragments if they fit
if (i +1 < toSend.size()) { if (i +1 < toSend.size()) {
int maxAvail; int maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
if (peer.getVersion() == 1)
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
else
maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
// if less than 16, just use it for acks, don't even try to look for a tiny fragment // if less than 16, just use it for acks, don't even try to look for a tiny fragment
if (maxAvail >= 16) { if (maxAvail >= 16) {
for (int j = i + 1; j < toSend.size(); j++) { for (int j = i + 1; j < toSend.size(); j++) {
next = toSend.get(j); next = toSend.get(j);
int nextDataSize = next.state.fragmentSize(next.num); int nextDataSize = next.state.fragmentSize(next.num);
if (next.num > 0 && peer.getVersion() > 1) if (next.num > 0)
nextDataSize += SSU2Util.DATA_FOLLOWON_EXTRA_SIZE; nextDataSize += SSU2Util.DATA_FOLLOWON_EXTRA_SIZE;
//if (PacketBuilder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) { //if (PacketBuilder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) {
//if (_builder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) { //if (_builder.canFitAnotherFragment(peer, sendNext.size(), curTotalDataSize, nextDataSize)) {
@@ -493,10 +453,7 @@ class OutboundMessageFragments {
j--; j--;
sendNext.add(next); sendNext.add(next);
curTotalDataSize += nextDataSize; curTotalDataSize += nextDataSize;
if (peer.getVersion() == 1) maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
else
maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Adding in additional " + next + " to " + peer); _log.info("Adding in additional " + next + " to " + peer);
// if less than 16, just use it for acks, don't even try to look for a tiny fragment // if less than 16, just use it for acks, don't even try to look for a tiny fragment
@@ -508,15 +465,11 @@ class OutboundMessageFragments {
} }
UDPPacket pkt; UDPPacket pkt;
if (peer.getVersion() == 1) {
pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields);
} else {
try { try {
pkt = _builder2.buildPacket(sendNext, (PeerState2) peer); pkt = _builder2.buildPacket(sendNext, (PeerState2) peer);
} catch (IOException ioe) { } catch (IOException ioe) {
pkt = null; pkt = null;
} }
}
if (pkt != null) { if (pkt != null) {
if (_log.shouldDebug()) if (_log.shouldDebug())
_log.debug("Built packet with " + sendNext.size() + " fragments totalling " + curTotalDataSize + _log.debug("Built packet with " + sendNext.size() + " fragments totalling " + curTotalDataSize +
@@ -530,25 +483,6 @@ class OutboundMessageFragments {
} }
rv.add(pkt); rv.add(pkt);
if (peer.getVersion() == 1) {
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int j = 0; j < msgIds.size(); j++) {
Long id = msgIds.get(j);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;
}
}
}
if (piggybackedAck > 0)
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck);
if (piggybackedPartialACK - partialACKBitfields.size() > 0)
_context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime());
}
// following for debugging and stats // following for debugging and stats
pkt.setFragmentCount(sendNext.size()); pkt.setFragmentCount(sendNext.size());
pkt.setMessageType(msgType); //type of first fragment pkt.setMessageType(msgType); //type of first fragment
@@ -559,8 +493,7 @@ class OutboundMessageFragments {
int sent = rv.size(); int sent = rv.size();
peer.packetsTransmitted(sent); peer.packetsTransmitted(sent);
if (newFullAckCount <= 0) peer.clearWantedACKSendSince();
peer.clearWantedACKSendSince();
if (_log.shouldDebug()) if (_log.shouldDebug())
_log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() + _log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() +
" messages in " + sent + " packets to " + peer); " messages in " + sent + " packets to " + peer);

File diff suppressed because it is too large Load Diff

View File

@@ -46,23 +46,7 @@ public class PeerState {
* is established. * is established.
*/ */
protected final Hash _remotePeer; protected final Hash _remotePeer;
/**
* The AES key used to verify packets, set only after the connection is
* established.
*/
private final SessionKey _currentMACKey;
/**
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*/
private final SessionKey _currentCipherKey;
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*/
private SessionKey _nextMACKey;
/** when were the current cipher and MAC keys established/rekeyed? */
protected final long _keyEstablishedTime; protected final long _keyEstablishedTime;
/** /**
@@ -86,21 +70,6 @@ public class PeerState {
/** when did we last have a failed send (beginning of period) */ /** when did we last have a failed send (beginning of period) */
// private long _lastFailedSendPeriod; // private long _lastFailedSendPeriod;
/**
* Set of messageIds (Long) that we have received but not yet sent
* Since even with the smallest MTU we can fit 131 acks in a message,
* we are unlikely to get backed up on acks, so we don't keep
* them in any particular order.
*/
private final Set<Long> _currentACKs;
/**
* list of the most recent messageIds (Long) that we have received and sent
* an ACK for. We keep a few of these around to retransmit with _currentACKs,
* hopefully saving some spurious retransmissions
*/
private final Queue<ResendACK> _currentACKsResend;
/** when did we last send ACKs to the peer? */ /** when did we last send ACKs to the peer? */
protected volatile long _lastACKSend; protected volatile long _lastACKSend;
/** when did we decide we need to ACK to this peer? */ /** when did we decide we need to ACK to this peer? */
@@ -198,8 +167,6 @@ public class PeerState {
*/ */
//private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue; //private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
private final PriBlockingQueue<OutboundMessageState> _outboundQueue; private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
/** Message ID to sequence number */
private final Map<Integer, Long> _ackedMessages;
/** when the retransmit timer is about to trigger */ /** when the retransmit timer is about to trigger */
private long _retransmitTimer; private long _retransmitTimer;
@@ -321,66 +288,6 @@ public class PeerState {
/** if this many acks arrive out of order, fast rtx */ /** if this many acks arrive out of order, fast rtx */
private static final int FAST_RTX_ACKS = 3; private static final int FAST_RTX_ACKS = 3;
/**
* SSU 1 only.
*
* @param rtt from the EstablishState, or 0 if not available
*/
public PeerState(RouterContext ctx, UDPTransport transport,
byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound, int rtt,
SessionKey cipherKey, SessionKey macKey) {
_context = ctx;
_log = ctx.logManager().getLog(PeerState.class);
_transport = transport;
long now = ctx.clock().now();
_keyEstablishedTime = now;
_lastSendTime = now;
_lastReceiveTime = now;
_currentACKs = new ConcurrentHashSet<Long>();
_currentACKsResend = new LinkedBlockingQueue<ResendACK>();
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_receivePeriodBegin = now;
_remotePort = remotePort;
if (remoteIP.length == 4) {
_mtu = DEFAULT_MTU;
_mtuReceive = DEFAULT_MTU;
_largeMTU = transport.getMTU(false);
_minMTU = MIN_MTU;
} else {
_mtu = MIN_IPV6_MTU;
_mtuReceive = MIN_IPV6_MTU;
_largeMTU = transport.getMTU(true);
_minMTU = MIN_IPV6_MTU;
}
// RFC 5681 sec. 3.1
if (_mtu > 1095)
_sendWindowBytes = 3 * _mtu;
else
_sendWindowBytes = 4 * _mtu;
_sendWindowBytesRemaining = _sendWindowBytes;
_rto = INIT_RTO;
_rtt = INIT_RTT;
if (rtt > 0)
recalculateTimeouts(rtt);
else
_rttDeviation = _rtt;
_inboundMessages = new HashMap<Long, InboundMessageState>(8);
_outboundMessages = new CachedIteratorCollection<OutboundMessageState>();
//_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
_outboundQueue = new PriBlockingQueue<OutboundMessageState>(ctx, "UDP-PeerState", 32);
_ackedMessages = new AckedMessages();
// all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP;
_remotePeer = remotePeer;
_isInbound = isInbound;
_remoteHostId = new RemoteHostId(remoteIP, remotePort);
_bwEstimator = new SimpleBandwidthEstimator(ctx, this);
_currentCipherKey = cipherKey;
_currentMACKey = macKey;
}
/** /**
* For SSU2 * For SSU2
* *
@@ -425,12 +332,6 @@ public class PeerState {
_isInbound = isInbound; _isInbound = isInbound;
_remoteHostId = new RemoteHostId(_remoteIP, _remotePort); _remoteHostId = new RemoteHostId(_remoteIP, _remotePort);
_bwEstimator = new SimpleBandwidthEstimator(ctx, this); _bwEstimator = new SimpleBandwidthEstimator(ctx, this);
// Unused in SSU2
_currentACKs = null;
_currentACKsResend = null;
_ackedMessages = null;
_currentCipherKey = null;
_currentMACKey = null;
} }
/** /**
@@ -453,41 +354,6 @@ public class PeerState {
* The peer are we talking to. Non-null. * The peer are we talking to. Non-null.
*/ */
public Hash getRemotePeer() { return _remotePeer; } public Hash getRemotePeer() { return _remotePeer; }
/**
* The AES key used to verify packets, set only after the connection is
* established.
*
* SSU 1 only.
*/
SessionKey getCurrentMACKey() { return _currentMACKey; }
/**
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*
* SSU 1 only.
*/
SessionKey getCurrentCipherKey() { return _currentCipherKey; }
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*
* SSU 1 only.
*
* @return null always, rekeying unimplemented
*/
SessionKey getNextMACKey() { return _nextMACKey; }
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
*
* SSU 1 only.
*
* @return null always, rekeying unimplemented
*/
SessionKey getNextCipherKey() { return null; }
/** /**
* When were the current cipher and MAC keys established/rekeyed? * When were the current cipher and MAC keys established/rekeyed?
@@ -794,9 +660,6 @@ public class PeerState {
_receiveBytes = 0; _receiveBytes = 0;
_receivePeriodBegin = now; _receivePeriodBegin = now;
} }
// null for PeerState2
if (_currentACKs != null)
_currentACKs.add(messageId);
messagePartiallyReceived(now); messagePartiallyReceived(now);
} }
@@ -809,14 +672,12 @@ public class PeerState {
/** /**
* We received a partial message, or we want to send some acks. * We received a partial message, or we want to send some acks.
* SSU1 only, see override
*
* @since 0.9.52 * @since 0.9.52
*/ */
protected synchronized void messagePartiallyReceived(long now) { protected synchronized void messagePartiallyReceived(long now) {
_lastReceiveTime = now; throw new UnsupportedOperationException();
if (_wantACKSendSince <= 0) {
_wantACKSendSince = now;
new ACKTimer();
}
} }
/** /**
@@ -894,254 +755,6 @@ public class PeerState {
" BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps"); " BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps");
} }
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list. If the message id is transmitted to the peer,
* removeACKMessage(Long) should be called.
*
* The returned list contains acks not yet sent only.
* The caller should NOT transmit all of them all the time,
* even if there is room,
* or the packets will have way too much overhead.
*
* SSU 1 only.
*
* @return a new list, do as you like with it
*/
List<Long> getCurrentFullACKs() {
// no such element exception seen here
List<Long> rv = new ArrayList<Long>(_currentACKs);
return rv;
}
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list.
*
* The returned list contains
* a random assortment of acks already sent.
* The caller should NOT transmit all of them all the time,
* even if there is room,
* or the packets will have way too much overhead.
*
* SSU 1 only.
*
* @return a new list, do as you like with it
* @since 0.8.12 was included in getCurrentFullACKs()
*/
List<Long> getCurrentResendACKs() {
int sz = _currentACKsResend.size();
List<Long> randomResends = new ArrayList<Long>(sz);
if (sz > 0) {
long cutoff = _context.clock().now() - RESEND_ACK_TIMEOUT;
int i = 0;
for (Iterator<ResendACK> iter = _currentACKsResend.iterator(); iter.hasNext(); ) {
ResendACK rack = iter.next();
if (rack.time > cutoff && i++ < MAX_RESEND_ACKS) {
randomResends.add(rack.id);
} else {
iter.remove();
if (_log.shouldDebug())
_log.debug("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) +
" ago, now " + _currentACKsResend.size() + " resend acks");
}
}
if (i > 1)
Collections.shuffle(randomResends, _context.random());
}
return randomResends;
}
/**
* The ack was sent.
* Side effect - sets _lastACKSend
*
* SSU 1 only.
*/
void removeACKMessage(Long messageId) {
boolean removed = _currentACKs.remove(messageId);
if (removed) {
// only add if removed from current, as this may be called for
// acks already in _currentACKsResend.
_currentACKsResend.offer(new ResendACK(messageId, _context.clock().now()));
// trim happens in getCurrentResendACKs above
if (_log.shouldDebug())
_log.debug("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " +
_currentACKsResend.size() + " resend acks");
}
// should we only do this if removed?
_lastACKSend = _context.clock().now();
}
/**
* Only called by ACKTimer with alwaysIncludeRetransmissions = false.
* So this is only for ACK-only packets, so all the size limiting is useless.
* FIXME.
*
* Caller should sync on this.
*
* Side effect - sets _lastACKSend to now if rv is non-empty.
* Side effect - sets _wantACKSendSince to 0 if _currentACKs is now empty.
*
* SSU 1 only.
*
* @return non-null, possibly empty
*/
private List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
int bytesRemaining = countMaxACKData();
// Limit the overhead of all the resent acks when using small MTU
// 64 bytes in a 608-byte packet is too much...
// Send a random subset of all the queued resend acks.
int resendSize = _currentACKsResend.size();
int maxResendAcks;
if (bytesRemaining < MIN_MTU)
maxResendAcks = MAX_RESEND_ACKS_SMALL;
else
maxResendAcks = MAX_RESEND_ACKS_LARGE;
List<ACKBitfield> rv = new ArrayList<ACKBitfield>(maxResendAcks);
// save to add to currentACKsResend later so we don't include twice
List<Long> currentACKsRemoved = new ArrayList<Long>(_currentACKs.size());
// As explained above, we include the acks in any order
// since we are unlikely to get backed up -
// just take them using the Set iterator.
Iterator<Long> iter = _currentACKs.iterator();
while (bytesRemaining >= 4 && iter.hasNext()) {
Long val = iter.next();
iter.remove();
long id = val.longValue();
rv.add(new FullACKBitfield(id));
currentACKsRemoved.add(val);
bytesRemaining -= 4;
}
if (_currentACKs.isEmpty())
_wantACKSendSince = 0;
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
List<Long> randomResends = getCurrentResendACKs();
// now repeat by putting in some old ACKs
// randomly selected from the Resend queue.
// Maybe we should only resend each one a certain number of times...
int oldIndex = Math.min(resendSize, maxResendAcks);
iter = randomResends.iterator();
while (bytesRemaining >= 4 && oldIndex-- > 0 && iter.hasNext()) {
Long cur = iter.next();
long c = cur.longValue();
FullACKBitfield bf = new FullACKBitfield(c);
// try to avoid duplicates ??
// ACKsResend is not checked for dups at add time
//if (rv.contains(bf)) {
// iter.remove();
//} else {
rv.add(bf);
bytesRemaining -= 4;
//}
}
if (!currentACKsRemoved.isEmpty()) {
long now = _context.clock().now();
for (Long val : currentACKsRemoved) {
_currentACKsResend.offer(new ResendACK(val, now));
}
// trim happens in getCurrentResendACKs above
}
}
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
// we should try to find some packets to partially ACK
// (preferably the ones which have the most received fragments)
List<ACKBitfield> partial = new ArrayList<ACKBitfield>();
fetchPartialACKs(partial);
// we may not be able to use them all, but lets try...
for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) {
ACKBitfield bitfield = partial.get(i);
int bytes = (bitfield.fragmentCount() / 7) + 1;
if (bytesRemaining > bytes + 4) { // msgId + bitfields
rv.add(bitfield);
bytesRemaining -= bytes + 4;
partialIncluded++;
} else {
// continue on to another partial, in case there's a
// smaller one that will fit
}
}
}
if (!rv.isEmpty())
_lastACKSend = _context.clock().now();
if (partialIncluded > 0)
_context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded);
return rv;
}
/**
* SSU 1 only.
*
* @param rv out parameter, populated with true partial ACKBitfields.
* no full bitfields are included.
*/
void fetchPartialACKs(List<ACKBitfield> rv) {
List<InboundMessageState> states = null;
int curState = 0;
synchronized (_inboundMessages) {
int numMessages = _inboundMessages.size();
if (numMessages <= 0)
return;
// todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead?
for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = iter.next();
if (state.isExpired()) {
//if (_context instanceof RouterContext)
// ((RouterContext)_context).messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired partially received: " + state.toString());
iter.remove();
// state.releaseResources() ??
} else {
if (!state.isComplete()) {
if (states == null)
states = new ArrayList<InboundMessageState>(numMessages);
states.add(state);
}
}
}
}
if (states != null) {
for (InboundMessageState ims : states) {
ACKBitfield abf = ims.createACKBitfield();
rv.add(abf);
}
}
}
/**
* A dummy "partial" ack which represents a full ACK of a message
*
* SSU 1 only.
*/
private static class FullACKBitfield implements ACKBitfield {
private final long _msgId;
public FullACKBitfield(long id) { _msgId = id; }
public int fragmentCount() { return 1; }
public int ackCount() { return 1; }
public int highestReceived() { return 0; }
public long getMessageId() { return _msgId; }
public boolean received(int fragmentNum) { return true; }
public boolean receivedComplete() { return true; }
@Override
public int hashCode() { return (int) _msgId; }
@Override
public boolean equals(Object o) {
if (!(o instanceof FullACKBitfield)) return false;
return _msgId == ((ACKBitfield)o).getMessageId();
}
@Override
public String toString() { return "Full ACK " + _msgId; }
}
/** /**
* We sent a message which was ACKed containing the given # of bytes. * We sent a message which was ACKed containing the given # of bytes.
* Caller should synch on this * Caller should synch on this
@@ -1391,48 +1004,12 @@ public class PeerState {
/** /**
* All acks have been sent. * All acks have been sent.
* *
* SSU 1 only. * SSU 1 only, see override
* *
* @since 0.9.52 * @since 0.9.52
*/ */
synchronized void clearWantedACKSendSince() { synchronized void clearWantedACKSendSince() {
// race prevention throw new UnsupportedOperationException();
if (_currentACKs.isEmpty())
_wantACKSendSince = 0;
}
/**
* Are we out of room to send all the current unsent acks in a single packet?
* This is a huge threshold (134 for small MTU and 255 for large MTU)
* that is rarely if ever exceeded in practice.
* So just use a fixed threshold of half the resend acks, so that if the
* packet is lost the acks have a decent chance of getting retransmitted.
* Used only by ACKSender.
*
* SSU 1 only.
*/
boolean unsentACKThresholdReached() {
return _currentACKs.size() >= MAX_RESEND_ACKS / 2;
}
/**
* SSU 1 only.
*
* @return how many bytes available for acks in an ack-only packet, == MTU - 83
* Max of 1020
*/
private int countMaxACKData() {
return Math.min(PacketBuilder.ABSOLUTE_MAX_ACKS * 4,
_mtu
- (_remoteIP.length == 4 ? PacketBuilder.IP_HEADER_SIZE : PacketBuilder.IPV6_HEADER_SIZE)
- PacketBuilder.UDP_HEADER_SIZE
- UDPPacket.IV_SIZE
- UDPPacket.MAC_SIZE
- 1 // type flag
- 4 // timestamp
- 1 // data flag
- 1 // # ACKs
- 16); // padding safety
} }
/** /**
@@ -1860,197 +1437,6 @@ public class PeerState {
} }
} }
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashMap&lt;Long, OutboundMessageState&gt; this would be faster.
*
* SSU 1 only.
*
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if the message was acked for the first time
*/
boolean acked(long messageId, ModifiableLong highestSeqNumAcked) {
if (_dead) return false;
OutboundMessageState state = null;
boolean anyPending;
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
if (state.getMessageId() == messageId) {
iter.remove();
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}
}
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
int numSends = state.getMaxSends();
long lifetime = state.getLifetime();
if (_log.shouldDebug())
_log.debug("Received ack of " + messageId + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends");
_context.statManager().addRateData("udp.sendConfirmTime", lifetime);
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
boolean anyQueued;
if (anyPending) {
// locked_messageACKed will nudge()
anyQueued = false;
} else {
synchronized (_outboundQueue) {
anyQueued = !_outboundQueue.isEmpty();
}
}
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
// this adjusts the rtt/rto/window/etc
int maxPktSz = state.fragmentSize(0) +
(isIPv6() ? PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_DATA_PACKET_OVERHEAD);
messageACKed(state.getUnackedSize(), maxPktSz, lifetime, numSends, anyPending, anyQueued);
} else {
// dupack, likely
Long seq;
synchronized(_ackedMessages) {
seq = _ackedMessages.get(Integer.valueOf((int) messageId));
}
if (seq != null) {
long sn = seq.longValue();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received an ACK for a message not pending: " + messageId);
}
return state != null;
}
/**
* A partial ACK was received. This is much less common than full ACKs.
*
* SSU 1 only.
*
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if any fragment of the message was completely acked for the first time
*/
boolean acked(ACKBitfield bitfield, ModifiableLong highestSeqNumAcked) {
if (_dead)
return false;
final long messageId = bitfield.getMessageId();
if (bitfield.receivedComplete()) {
return acked(messageId, highestSeqNumAcked);
}
OutboundMessageState state = null;
boolean isComplete = false;
boolean anyPending;
int ackedSize = 0;
synchronized (_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
state = iter.next();
if (state.getMessageId() == messageId) {
ackedSize = state.getUnackedSize();
boolean complete = state.acked(bitfield);
if (complete) {
isComplete = true;
iter.remove();
} else {
ackedSize -= state.getUnackedSize();
}
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}
}
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
int numSends = state.getMaxSends();
int numACKed = bitfield.ackCount();
_context.statManager().addRateData("udp.partialACKReceived", numACKed);
long lifetime = state.getLifetime();
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", lifetime);
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
if (_log.shouldDebug())
_log.debug("Received partial ack of " + messageId + " by " + _remotePeer
+ " newly-acked: " + ackedSize
+ ", now complete for: " + state);
} else {
if (_log.shouldDebug())
_log.debug("Received partial ack of " + messageId + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends"
+ " complete? false"
+ " newly-acked: " + ackedSize
+ ' ' + bitfield
+ " for: " + state);
}
if (ackedSize > 0) {
state.clearNACKs();
boolean anyQueued;
if (anyPending) {
// locked_messageACKed will nudge()
anyQueued = false;
} else {
synchronized (_outboundQueue) {
anyQueued = !_outboundQueue.isEmpty();
}
}
// this adjusts the rtt/rto/window/etc
messageACKed(ackedSize, 0, lifetime, numSends, anyPending, anyQueued);
}
// we do this even if only partial
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
if (isComplete) {
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
}
return ackedSize > 0;
} else {
// dupack
Long seq;
synchronized(_ackedMessages) {
seq = _ackedMessages.get(Integer.valueOf((int) messageId));
}
if (seq != null) {
long sn = seq.longValue();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + bitfield);
return false;
}
}
/** /**
* An ACK of a fragment was received. * An ACK of a fragment was received.
* *
@@ -2268,28 +1654,6 @@ public class PeerState {
_sendWindowBytes = oldPeer._sendWindowBytes; _sendWindowBytes = oldPeer._sendWindowBytes;
oldPeer._dead = true; oldPeer._dead = true;
if (getVersion() == 1 && oldPeer.getVersion() == 1) {
List<Long> tmp = new ArrayList<Long>();
// AIOOBE from concurrent access
//tmp.addAll(oldPeer._currentACKs);
for (Long l : oldPeer._currentACKs) {
tmp.add(l);
}
oldPeer._currentACKs.clear();
if (!_dead) {
_currentACKs.addAll(tmp);
}
List<ResendACK> tmp3 = new ArrayList<ResendACK>();
tmp3.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear();
if (!_dead) {
_currentACKsResend.addAll(tmp3);
}
}
if (getVersion() == oldPeer.getVersion()) { if (getVersion() == oldPeer.getVersion()) {
Map<Long, InboundMessageState> msgs = new HashMap<Long, InboundMessageState>(); Map<Long, InboundMessageState> msgs = new HashMap<Long, InboundMessageState>();
synchronized (oldPeer._inboundMessages) { synchronized (oldPeer._inboundMessages) {
@@ -2356,69 +1720,6 @@ public class PeerState {
} }
} }
/**
* A timer to send an ack-only packet.
*
* SSU 1 only.
*
* @since 0.9.52
*/
private class ACKTimer extends SimpleTimer2.TimedEvent {
public ACKTimer() {
super(_context.simpleTimer2());
long delta = Math.max(10, Math.min(_rtt/6, ACK_FREQUENCY));
if (_log.shouldDebug())
_log.debug("Sending delayed ack in " + delta + ": " + PeerState.this);
schedule(delta);
}
/**
* Send an ack-only packet, unless acks were already sent
* as indicated by _wantACKSendSince == 0.
* Will not requeue unless the acks don't all fit (unlikely).
*/
public void timeReached() {
synchronized(PeerState.this) {
long wanted = _wantACKSendSince;
if (wanted <= 0) {
if (_log.shouldDebug())
_log.debug("Already acked:" + PeerState.this);
return;
}
List<ACKBitfield> ackBitfields = retrieveACKBitfields(false);
if (!ackBitfields.isEmpty()) {
UDPPacket ack = _transport.getBuilder().buildACK(PeerState.this, ackBitfields);
ack.markType(1);
ack.setFragmentCount(-1);
ack.setMessageType(PacketBuilder.TYPE_ACK);
if (_log.shouldDebug()) {
//_log.debug("Sending " + ackBitfields + " to " + PeerState.this);
_log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this);
}
// locking issues, we ignore the result, and acks are small,
// so don't even bother allocating
//peer.allocateSendingBytes(ack.getPacket().getLength(), true);
// ignore whether its ok or not, its a bloody ack. this should be fixed, probably.
_transport.send(ack);
if (_wantACKSendSince > 0) {
// still full packets left to be ACKed, since wanted time
// is reset by retrieveACKBitfields when all of the IDs are
// removed
if (_log.shouldInfo())
_log.info("Requeueing more ACKs for " + PeerState.this);
reschedule(25);
}
} else {
if (_log.shouldDebug())
_log.debug("No more acks:" + PeerState.this);
}
}
}
}
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? // why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/* /*

View File

@@ -326,17 +326,6 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
return true; return true;
} }
// SSU 1 unsupported things
@Override
List<Long> getCurrentFullACKs() { throw new UnsupportedOperationException(); }
@Override
List<Long> getCurrentResendACKs() { throw new UnsupportedOperationException(); }
@Override
void removeACKMessage(Long messageId) { throw new UnsupportedOperationException(); }
@Override
void fetchPartialACKs(List<ACKBitfield> rv) { throw new UnsupportedOperationException(); }
// SSU 2 things // SSU 2 things
/// begin SSU2Sender interface /// /// begin SSU2Sender interface ///

View File

@@ -138,7 +138,6 @@ class PeerTestManager {
private final RouterContext _context; private final RouterContext _context;
private final Log _log; private final Log _log;
private final UDPTransport _transport; private final UDPTransport _transport;
private final PacketBuilder _packetBuilder;
private final PacketBuilder2 _packetBuilder2; private final PacketBuilder2 _packetBuilder2;
/** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */ /** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */
private final Map<Long, PeerTestState> _activeTests; private final Map<Long, PeerTestState> _activeTests;
@@ -196,7 +195,6 @@ class PeerTestManager {
_log = context.logManager().getLog(PeerTestManager.class); _log = context.logManager().getLog(PeerTestManager.class);
_activeTests = new ConcurrentHashMap<Long, PeerTestState>(); _activeTests = new ConcurrentHashMap<Long, PeerTestState>();
_recentTests = new LinkedBlockingQueue<Long>(); _recentTests = new LinkedBlockingQueue<Long>();
_packetBuilder = transport.getBuilder();
_packetBuilder2 = transport.getBuilder2(); _packetBuilder2 = transport.getBuilder2();
_throttle = new IPThrottler(MAX_PER_IP, THROTTLE_CLEAN_TIME); _throttle = new IPThrottler(MAX_PER_IP, THROTTLE_CLEAN_TIME);
_context.statManager().createRateStat("udp.statusKnownCharlie", "How often the bob we pick passes us to a charlie we already have a session with?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.statusKnownCharlie", "How often the bob we pick passes us to a charlie we already have a session with?", "udp", UDPTransport.RATES);

View File

@@ -176,9 +176,6 @@ class PeerTestState {
public SessionKey getCharlieIntroKey() { return _charlieIntroKey; } public SessionKey getCharlieIntroKey() { return _charlieIntroKey; }
public void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; } public void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; }
public SessionKey getBobCipherKey() { return _bob.getCurrentCipherKey(); }
public SessionKey getBobMACKey() { return _bob.getCurrentMACKey(); }
/** when did this test begin? */ /** when did this test begin? */
public long getBeginTime() { return _beginTime; } public long getBeginTime() { return _beginTime; }

View File

@@ -93,7 +93,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final IntroductionManager _introManager; private final IntroductionManager _introManager;
private final ExpirePeerEvent _expireEvent; private final ExpirePeerEvent _expireEvent;
private final PeerTestEvent _testEvent; private final PeerTestEvent _testEvent;
private final PacketBuilder _packetBuilder;
private Status _reachabilityStatus; private Status _reachabilityStatus;
private Status _reachabilityStatusPending; private Status _reachabilityStatusPending;
// only for logging, to be removed // only for logging, to be removed
@@ -407,8 +406,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_cachedBid[i] = new SharedBid(BID_VALUES[i]); _cachedBid[i] = new SharedBid(BID_VALUES[i]);
} }
_packetBuilder = (dh != null) ? new PacketBuilder(_context, this) : null; _packetBuilder2 = new PacketBuilder2(_context, this);
_packetBuilder2 = (xdh != null) ? new PacketBuilder2(_context, this) : null;
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
//if (SHOULD_FLOOD_PEERS) //if (SHOULD_FLOOD_PEERS)
@@ -2397,18 +2395,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/ */
void sendDestroy(PeerState peer, int reasonCode) { void sendDestroy(PeerState peer, int reasonCode) {
UDPPacket pkt; UDPPacket pkt;
if (peer.getVersion() == 1) {
// peer must be fully established
if (peer.getCurrentCipherKey() == null)
return;
pkt = _packetBuilder.buildSessionDestroyPacket(peer);
} else {
try { try {
pkt = _packetBuilder2.buildSessionDestroyPacket(reasonCode, (PeerState2) peer); pkt = _packetBuilder2.buildSessionDestroyPacket(reasonCode, (PeerState2) peer);
} catch (IOException ioe) { } catch (IOException ioe) {
return; return;
} }
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending destroy to : " + peer); _log.debug("Sending destroy to : " + peer);
send(pkt); send(pkt);
@@ -3749,14 +3740,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _hmac; return _hmac;
} }
/**
* @return the PacketBuilder, or null if SSU1 disabled
* @since 0.9.52
*/
PacketBuilder getBuilder() {
return _packetBuilder;
}
/** /**
* @return null if not configured for SSU2 * @return null if not configured for SSU2
* @since 0.9.54 * @since 0.9.54
@@ -3891,15 +3874,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// or else session will stay open forever? // or else session will stay open forever?
//peer.setLastSendTime(now); //peer.setLastSendTime(now);
UDPPacket ping; UDPPacket ping;
if (peer.getVersion() == 2) {
try { try {
ping = _packetBuilder2.buildPing((PeerState2) peer); ping = _packetBuilder2.buildPing((PeerState2) peer);
} catch (IOException ioe) { } catch (IOException ioe) {
continue; continue;
} }
} else {
ping = _packetBuilder.buildPing(peer);
}
send(ping); send(ping);
peer.setLastPingTime(now); peer.setLastPingTime(now);
// If external port is different, it may be changing the port for every // If external port is different, it may be changing the port for every

View File

@@ -95,6 +95,9 @@ public class UDPEndpointTestStandalone {
_endpoint = peer; _endpoint = peer;
} }
public void run() { public void run() {
System.out.println("rewrite me for SSU2");
throw new UnsupportedOperationException("rewrite me for SSU2");
/*
while (!_beginTest) { while (!_beginTest) {
try { Thread.sleep(2000); } catch (InterruptedException ie) {} try { Thread.sleep(2000); } catch (InterruptedException ie) {}
} }
@@ -149,6 +152,7 @@ public class UDPEndpointTestStandalone {
_log.error("Test failed, " + _sentNotReceived.size() + " not received"); _log.error("Test failed, " + _sentNotReceived.size() + " not received");
System.exit(1); System.exit(1);
} }
*/
} }
} }