* Transport:

- Cleanup max connections code
      - Add i2np.udp.maxConnections
      - Set max connections based on share bandwidth
      - Add haveCapacity() that can be used for connection
        throttling in the router
      - Reject IBGW/OBEP requests when near connection limit
      - Reduce idle timeout when near connection limit
    * Tunnel request handler:
      - Require tunnel.dropLoad* stats
      - Speed up request loop
This commit is contained in:
zzz
2008-12-14 14:07:37 +00:00
parent dae6fd47d9
commit 734818f651
12 changed files with 117 additions and 19 deletions

View File

@@ -50,7 +50,7 @@ public class StatManager {
"tunnel.acceptLoad,tunnel.buildRequestTime,tunnel.rejectOverloaded,tunnel.rejectTimeout" +
"tunnel.buildClientExpire,tunnel.buildClientReject,tunnel.buildClientSuccess," +
"tunnel.buildExploratoryExpire,tunnel.buildExploratoryReject,tunnel.buildExploratorySuccess," +
"tunnel.buildRatio.*,tunnel.corruptMessage," +
"tunnel.buildRatio.*,tunnel.corruptMessage,tunnel.dropLoad*," +
"tunnel.decryptRequestTime,tunnel.fragmentedDropped,tunnel.participatingMessageCount,"+
"tunnel.participatingTunnels,tunnel.testFailedTime,tunnel.testSuccessTime," +
"tunnel.participatingBandwidth,udp.sendPacketSize,udp.packetsRetransmitted" ;

View File

@@ -34,6 +34,7 @@ public abstract class CommSystemFacade implements Service {
public int countActivePeers() { return 0; }
public int countActiveSendPeers() { return 0; }
public boolean haveCapacity() { return true; }
public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; }
/**

View File

@@ -60,6 +60,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public int countActivePeers() { return (_manager == null ? 0 : _manager.countActivePeers()); }
public int countActiveSendPeers() { return (_manager == null ? 0 : _manager.countActiveSendPeers()); }
public boolean haveCapacity() { return (_manager == null ? false : _manager.haveCapacity()); }
/**
* Framed average clock skew of connected peers in seconds, or null if we cannot answer.

View File

@@ -40,6 +40,7 @@ public interface Transport {
public int countActivePeers();
public int countActiveSendPeers();
public boolean haveCapacity();
public Vector getClockSkews();
public List getMostRecentErrorMessages();

View File

@@ -31,7 +31,9 @@ import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageSelector;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.Log;
/**
@@ -78,6 +80,33 @@ public abstract class TransportImpl implements Transport {
* How many peers are we actively sending messages to (this minute)
*/
public int countActiveSendPeers() { return 0; }
/** Default is 500 for floodfills... */
public static final int DEFAULT_MAX_CONNECTIONS = 500;
/** ...and 60/120/180/240/300 for BW Tiers K/L/M/N/O */
public static final int MAX_CONNECTION_FACTOR = 60;
/** Per-transport connection limit */
public int getMaxConnections() {
String style = getStyle();
if (style.equals("SSU"))
style = "udp";
else
style = style.toLowerCase();
int def = DEFAULT_MAX_CONNECTIONS;
RouterInfo ri = _context.router().getRouterInfo();
if (ri != null) {
char bw = ri.getBandwidthTier().charAt(0);
if (bw != 'U' &&
! ((FloodfillNetworkDatabaseFacade)_context.netDb()).floodfillEnabled())
def = MAX_CONNECTION_FACTOR * (1 + bw - Router.CAPABILITY_BW12);
}
return _context.getProperty("i2np." + style + ".maxConnections", def);
}
/**
* Can we initiate or accept a connection to another peer, saving some margin
*/
public boolean haveCapacity() { return true; }
/**
* Return our peer clock skews on a transport.

View File

@@ -151,6 +151,19 @@ public class TransportManager implements TransportEventListener {
return peers;
}
/**
* Is at least one transport below its connection limit + some margin
* Use for throttling in the router.
* Perhaps we should just use SSU?
*/
public boolean haveCapacity() {
for (int i = 0; i < _transports.size(); i++) {
if (((Transport)_transports.get(i)).haveCapacity())
return true;
}
return false;
}
/**
* Return our peer clock skews on all transports.
* Vector composed of Long, each element representing a peer skew in seconds.

View File

@@ -39,6 +39,7 @@ public class EventPumper implements Runnable {
private List _wantsRegister;
private List _wantsConRegister;
private NTCPTransport _transport;
private long _expireIdleWriteTime;
private static final int BUF_SIZE = 8*1024;
private static final int MAX_CACHE_SIZE = 64;
@@ -50,6 +51,8 @@ public class EventPumper implements Runnable {
* the time to iterate across them to check a few flags shouldn't be a problem.
*/
private static final long FAILSAFE_ITERATION_FREQ = 2*1000l;
private static final long MIN_EXPIRE_IDLE_TIME = 5*60*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
@@ -57,6 +60,7 @@ public class EventPumper implements Runnable {
_transport = transport;
_alive = false;
_bufCache = new ArrayList(MAX_CACHE_SIZE);
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
}
public void startPumping() {
@@ -135,8 +139,12 @@ public class EventPumper implements Runnable {
int failsafeWrites = 0;
int failsafeCloses = 0;
int failsafeInvalid = 0;
// pointless if we do this every 2 seconds?
long expireIdleWriteTime = 10*60*1000l; // + _context.random().nextLong(60*60*1000l);
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (_transport.haveCapacity())
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);
for (Iterator iter = all.iterator(); iter.hasNext(); ) {
try {
SelectionKey key = (SelectionKey)iter.next();
@@ -181,8 +189,8 @@ public class EventPumper implements Runnable {
failsafeWrites++;
}
if ( con.getTimeSinceSend() > expireIdleWriteTime &&
con.getTimeSinceReceive() > expireIdleWriteTime) {
if ( con.getTimeSinceSend() > _expireIdleWriteTime &&
con.getTimeSinceReceive() > _expireIdleWriteTime) {
// we haven't sent or received anything in a really long time, so lets just close 'er up
con.close();
failsafeCloses++;
@@ -680,4 +688,5 @@ public class EventPumper implements Runnable {
private void expireTimedOut() {
_transport.expireTimedOut();
}
public long getIdleTimeout() { return _expireIdleWriteTime; }
}

View File

@@ -300,18 +300,13 @@ public class NTCPTransport extends TransportImpl {
return _slowBid;
}
private static final int DEFAULT_MAX_CONNECTIONS = 500;
public boolean allowConnection() {
int max = DEFAULT_MAX_CONNECTIONS;
String mc = _context.getProperty("i2np.ntcp.maxConnections");
if (mc != null) {
try {
max = Integer.parseInt(mc);
} catch (NumberFormatException nfe) {}
}
return countActivePeers() < max;
return countActivePeers() < getMaxConnections();
}
public boolean haveCapacity() {
return countActivePeers() < getMaxConnections() * 4 / 5;
}
void sendComplete(OutNetMessage msg) { _finisher.add(msg); }
/** async afterSend call, which can take some time w/ jobs, etc */
@@ -581,7 +576,10 @@ public class NTCPTransport extends TransportImpl {
long totalRecv = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size());
buf.append(" limit: ").append(getMaxConnections());
buf.append(" timeout: ").append(DataHelper.formatDuration(_pumper.getIdleTimeout()));
buf.append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b><a href=\"#def.peer\">peer</a></b></td>");
buf.append(" <td><b>dir</b></td>");

View File

@@ -271,6 +271,8 @@ public class EstablishmentManager {
_log.warn("Receive session request from blocklisted IP: " + from);
return; // drop the packet
}
if (!_transport.allowConnection())
return; // drop the packet
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort());
state.receiveSessionRequest(reader.getSessionRequestReader());
isNew = true;

View File

@@ -90,6 +90,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** list of RemoteHostId for peers whose packets we want to drop outright */
private List _dropList;
private int _expireTimeout;
private static final int DROPLIST_PERIOD = 10*60*1000;
private static final int MAX_DROPLIST_SIZE = 256;
@@ -159,6 +161,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this);
_expireTimeout = EXPIRE_TIMEOUT;
_expireEvent = new ExpirePeerEvent();
_testEvent = new PeerTestEvent();
_reachabilityStatus = CommSystemFacade.STATUS_UNKNOWN;
@@ -887,6 +890,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return null;
}
}
if (!allowConnection())
return null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("bidding on a message to an unestablished peer: " + to.toBase64());
@@ -922,6 +927,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// in the IntroductionManager a chance to work.
public static final int EXPIRE_TIMEOUT = 30*60*1000;
private static final int MAX_IDLE_TIME = EXPIRE_TIMEOUT;
private static final int MIN_EXPIRE_TIMEOUT = 10*60*1000;
public String getStyle() { return STYLE; }
public void send(OutNetMessage msg) {
@@ -1264,6 +1270,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return getPeerState(dest) != null;
}
public boolean allowConnection() {
synchronized (_peersByIdent) {
return _peersByIdent.size() < getMaxConnections();
}
}
public boolean haveCapacity() {
synchronized (_peersByIdent) {
return _peersByIdent.size() < getMaxConnections() * 4 / 5;
}
}
/**
* Return our peer clock skews on this transport.
* Vector composed of Long, each element representing a peer skew in seconds.
@@ -1622,7 +1640,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
int numPeers = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b id=\"udpcon\">UDP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<b id=\"udpcon\">UDP connections: ").append(peers.size());
buf.append(" limit: ").append(getMaxConnections());
buf.append(" timeout: ").append(DataHelper.formatDuration(_expireTimeout));
buf.append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b><a href=\"#def.peer\">peer</a></b>");
if (sortFlags == FLAG_ALPHA)
@@ -1951,12 +1972,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireBuffer = new ArrayList(128);
}
public void timeReached() {
long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity())
_expireTimeout = Math.min(_expireTimeout + 15*1000, EXPIRE_TIMEOUT);
else
_expireTimeout = Math.max(_expireTimeout - 45*1000, MIN_EXPIRE_TIMEOUT);
long shortInactivityCutoff = _context.clock().now() - _expireTimeout;
long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
long pingCutoff = _context.clock().now() - (2 * 60*60*1000);
_expireBuffer.clear();
synchronized (_expirePeers) {
int sz = _expirePeers.size();
for (int i = 0; i < sz; i++) {
PeerState peer = (PeerState)_expirePeers.get(i);
long inactivityCutoff;
// if we offered to introduce them, or we used them as introducer in last 2 hours
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff)
inactivityCutoff = longInactivityCutoff;
else
inactivityCutoff = shortInactivityCutoff;
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
_expirePeers.remove(i);

View File

@@ -214,6 +214,9 @@ class BuildExecutor implements Runnable {
}
*/
/** Set 1.5 * LOOP_TIME < BuildRequestor.REQUEST_TIMEOUT/4 - margin */
private static final int LOOP_TIME = 1000;
public void run() {
_isRunning = true;
List wanted = new ArrayList(8);
@@ -316,7 +319,7 @@ class BuildExecutor implements Runnable {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Nothin' doin (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
//if (allowed <= 0)
_currentlyBuilding.wait(2000 + _context.random().nextInt(2*1000));
_currentlyBuilding.wait((LOOP_TIME/2) + _context.random().nextInt(LOOP_TIME));
//else // wanted <= 0
// _currentlyBuilding.wait(_context.random().nextInt(30*1000));
}

View File

@@ -498,8 +498,15 @@ class BuildHandler {
}
}
/*
* Being a IBGW or OBEP generally leads to more connections, so if we are
* approaching our connection limit (i.e. !haveCapacity()),
* reject this request.
*/
if (response == 0 && (isInGW || isOutEnd) &&
Boolean.valueOf(_context.getProperty(PROP_REJECT_NONPARTICIPANT)).booleanValue()) {
(Boolean.valueOf(_context.getProperty(PROP_REJECT_NONPARTICIPANT)).booleanValue() ||
! _context.commSystem().haveCapacity())) {
_context.throttle().setTunnelStatus("Rejecting tunnels: Connection limit");
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}