Streaming: Don't exceed configured tag settings when overriding

This commit is contained in:
zzz
2018-02-19 14:31:51 +00:00
parent a5ca9364ef
commit 646fe20726
3 changed files with 79 additions and 15 deletions

View File

@@ -920,12 +920,12 @@ class Connection {
/** /**
* Retrieve the current ConnectionOptions. * Retrieve the current ConnectionOptions.
* @return the current ConnectionOptions * @return the current ConnectionOptions, non-null
*/ */
public ConnectionOptions getOptions() { return _options; } public ConnectionOptions getOptions() { return _options; }
/** /**
* Set the ConnectionOptions. * Set the ConnectionOptions.
* @param opts ConnectionOptions * @param opts ConnectionOptions non-null
*/ */
public void setOptions(ConnectionOptions opts) { _options = opts; } public void setOptions(ConnectionOptions opts) { _options = opts; }

View File

@@ -53,6 +53,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _maxConns; private int _maxConns;
private boolean _disableRejectLog; private boolean _disableRejectLog;
private String _limitAction; private String _limitAction;
private int _tagsToSend;
private int _tagThreshold;
/** state of a connection */ /** state of a connection */
private enum AckInit { private enum AckInit {
@@ -125,6 +127,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_DISABLE_REJ_LOG = "i2p.streaming.disableRejectLogging"; public static final String PROP_DISABLE_REJ_LOG = "i2p.streaming.disableRejectLogging";
/** @since 0.9.34 reset,drop,http, or custom string, default reset */ /** @since 0.9.34 reset,drop,http, or custom string, default reset */
public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction"; public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction";
/** @since 0.9.34 */
public static final String PROP_TAGS_TO_SEND = "crypto.tagsToSend";
/** @since 0.9.34 */
public static final String PROP_TAG_THRESHOLD = "crypto.lowTagThreshold";
private static final int TREND_COUNT = 3; private static final int TREND_COUNT = 3;
@@ -139,7 +145,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND; private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND;
private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1; private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1;
private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1; private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1;
/** @since 0.9.34 */
private static final String DEFAULT_LIMIT_ACTION = "reset"; private static final String DEFAULT_LIMIT_ACTION = "reset";
/** @since 0.9.34 */
public static final int DEFAULT_TAGS_TO_SEND = 40;
/** @since 0.9.34 */
public static final int DEFAULT_TAG_THRESHOLD = 30;
/** /**
@@ -352,6 +363,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
_maxConns = opts.getMaxConns(); _maxConns = opts.getMaxConns();
_limitAction = opts.getLimitAction(); _limitAction = opts.getLimitAction();
_tagsToSend = opts.getTagsToSend();
_tagThreshold = opts.getTagThreshold();
} }
/** /**
@@ -395,6 +408,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_limitAction = DEFAULT_LIMIT_ACTION; _limitAction = DEFAULT_LIMIT_ACTION;
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
_tagsToSend = getInt(opts, PROP_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND);
_tagsToSend = getInt(opts, PROP_TAG_THRESHOLD, DEFAULT_TAG_THRESHOLD);
} }
/** /**
@@ -464,6 +479,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0); _maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
if (opts.getProperty(PROP_LIMIT_ACTION) != null) if (opts.getProperty(PROP_LIMIT_ACTION) != null)
_limitAction = opts.getProperty(PROP_LIMIT_ACTION); _limitAction = opts.getProperty(PROP_LIMIT_ACTION);
if (opts.getProperty(PROP_TAGS_TO_SEND) != null)
_maxConns = getInt(opts, PROP_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND);
if (opts.getProperty(PROP_TAG_THRESHOLD) != null)
_maxConns = getInt(opts, PROP_TAG_THRESHOLD, DEFAULT_TAG_THRESHOLD);
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
} }
@@ -791,6 +810,24 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
*/ */
public String getLimitAction() { return _limitAction; } public String getLimitAction() { return _limitAction; }
/**
* This option is mostly handled on the router side,
* but PacketQueue also needs to know, so that when
* it overrides, it doesn't exceed the setting.
*
* @since 0.9.34
*/
public int getTagsToSend() { return _tagsToSend; }
/**
* This option is mostly handled on the router side,
* but PacketQueue also needs to know, so that when
* it overrides, it doesn't exceed the setting.
*
* @since 0.9.34
*/
public int getTagThreshold() { return _tagThreshold; }
private void initLists(ConnectionOptions opts) { private void initLists(ConnectionOptions opts) {
_accessList = opts.getAccessList(); _accessList = opts.getAccessList();
_blackList = opts.getBlacklist(); _blackList = opts.getBlacklist();

View File

@@ -125,16 +125,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
if (expires > 0) if (expires > 0)
options.setDate(expires); options.setDate(expires);
boolean listenForStatus = false; boolean listenForStatus = false;
if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { // FINAL trumps INITIAL, in the case of SYN+CLOSE
if (con != null) { if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
if (con.isInbound())
options.setSendLeaseSet(false);
else if (ENABLE_STATUS_LISTEN)
listenForStatus = true;
}
options.setTagsToSend(INITIAL_TAGS_TO_SEND);
options.setTagThreshold(MIN_TAG_THRESHOLD);
} else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) { if (packet.isFlagSet(Packet.FLAG_ECHO)) {
// Send LS for PING, not for PONG // Send LS for PING, not for PONG
if (packet.getSendStreamId() <= 0) // pong if (packet.getSendStreamId() <= 0) // pong
@@ -142,16 +134,51 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
} else { } else {
options.setSendLeaseSet(false); options.setSendLeaseSet(false);
} }
options.setTagsToSend(FINAL_TAGS_TO_SEND); int sendTags = FINAL_TAGS_TO_SEND;
options.setTagThreshold(FINAL_TAG_THRESHOLD); int tagThresh = FINAL_TAG_THRESHOLD;
if (con != null) {
ConnectionOptions copts = con.getOptions();
int cSendTags = copts.getTagsToSend();
int cTagThresh = copts.getTagThreshold();
if (cSendTags < sendTags)
sendTags = cSendTags;
if (cTagThresh < tagThresh)
tagThresh = cTagThresh;
}
options.setTagsToSend(sendTags);
options.setTagThreshold(tagThresh);
} else if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
if (con != null) {
if (con.isInbound())
options.setSendLeaseSet(false);
else if (ENABLE_STATUS_LISTEN)
listenForStatus = true;
}
int sendTags = INITIAL_TAGS_TO_SEND;
int tagThresh = MIN_TAG_THRESHOLD;
if (con != null) {
ConnectionOptions copts = con.getOptions();
int cSendTags = copts.getTagsToSend();
int cTagThresh = copts.getTagThreshold();
if (cSendTags < sendTags)
sendTags = cSendTags;
if (cTagThresh < tagThresh)
tagThresh = cTagThresh;
}
options.setTagsToSend(sendTags);
options.setTagThreshold(tagThresh);
} else { } else {
if (con != null) { if (con != null) {
if (con.isInbound() && con.getLifetime() < 2*60*1000) if (con.isInbound() && con.getLifetime() < 2*60*1000)
options.setSendLeaseSet(false); options.setSendLeaseSet(false);
// increase threshold with higher window sizes to prevent stalls // increase threshold with higher window sizes to prevent stalls
// after tag delivery failure // after tag delivery failure
int wdw = con.getOptions().getWindowSize(); ConnectionOptions copts = con.getOptions();
int wdw = copts.getWindowSize();
int thresh = Math.max(MIN_TAG_THRESHOLD, wdw * TAG_WINDOW_FACTOR); int thresh = Math.max(MIN_TAG_THRESHOLD, wdw * TAG_WINDOW_FACTOR);
int cTagThresh = copts.getTagThreshold();
if (cTagThresh < thresh)
thresh = cTagThresh;
options.setTagThreshold(thresh); options.setTagThreshold(thresh);
} }
} }