This commit is contained in:
zzz
2012-12-29 13:23:57 +00:00
parent 667393e8cf
commit 2f4e3862e3

View File

@@ -28,6 +28,7 @@ import net.i2p.util.Log;
* Network communication system when it receives messages, and various jobs * Network communication system when it receives messages, and various jobs
* periodically retrieve them for processing. * periodically retrieve them for processing.
* *
* Actually, this doesn't 'pool' anything, since DISPATCH_DIRECT = true.
*/ */
public class InNetMessagePool implements Service { public class InNetMessagePool implements Service {
private final Log _log; private final Log _log;
@@ -149,9 +150,9 @@ public class InNetMessagePool implements Service {
_log.log(level, "Dropping message [" + messageBody.getUniqueId() _log.log(level, "Dropping message [" + messageBody.getUniqueId()
+ " expiring on " + exp + "]: " + messageBody.getClass().getSimpleName() + ": " + invalidReason + " expiring on " + exp + "]: " + messageBody.getClass().getSimpleName() + ": " + invalidReason
+ ": " + messageBody); + ": " + messageBody);
_context.statManager().addRateData("inNetPool.dropped", 1, 0); _context.statManager().addRateData("inNetPool.dropped", 1);
// FIXME not necessarily a duplicate, could be expired too long ago / too far in future // FIXME not necessarily a duplicate, could be expired too long ago / too far in future
_context.statManager().addRateData("inNetPool.duplicate", 1, 0); _context.statManager().addRateData("inNetPool.duplicate", 1);
_context.messageHistory().droppedOtherMessage(messageBody, (fromRouter != null ? fromRouter.calculateHash() : fromRouterHash)); _context.messageHistory().droppedOtherMessage(messageBody, (fromRouter != null ? fromRouter.calculateHash() : fromRouterHash));
_context.messageHistory().messageProcessingError(messageBody.getUniqueId(), _context.messageHistory().messageProcessingError(messageBody.getUniqueId(),
messageBody.getClass().getSimpleName(), messageBody.getClass().getSimpleName(),
@@ -167,10 +168,10 @@ public class InNetMessagePool implements Service {
int type = messageBody.getType(); int type = messageBody.getType();
boolean allowMatches = true; boolean allowMatches = true;
if (messageBody instanceof TunnelGatewayMessage) { if (type == TunnelGatewayMessage.MESSAGE_TYPE) {
shortCircuitTunnelGateway(messageBody); shortCircuitTunnelGateway(messageBody);
allowMatches = false; allowMatches = false;
} else if (messageBody instanceof TunnelDataMessage) { } else if (type == TunnelDataMessage.MESSAGE_TYPE) {
shortCircuitTunnelData(messageBody, fromRouterHash); shortCircuitTunnelData(messageBody, fromRouterHash);
allowMatches = false; allowMatches = false;
} else { } else {
@@ -213,7 +214,7 @@ public class InNetMessagePool implements Service {
long timeSinceSent = _context.clock().now() - arr; long timeSinceSent = _context.clock().now() - arr;
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + messageBody); _log.warn("Dropping unhandled delivery status message created " + timeSinceSent + "ms ago: " + messageBody);
_context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent, timeSinceSent); _context.statManager().addRateData("inNetPool.droppedDeliveryStatusDelay", timeSinceSent);
} }
//} else if (type == TunnelCreateStatusMessage.MESSAGE_TYPE) { //} else if (type == TunnelCreateStatusMessage.MESSAGE_TYPE) {
// if (_log.shouldLog(Log.INFO)) // if (_log.shouldLog(Log.INFO))
@@ -222,7 +223,7 @@ public class InNetMessagePool implements Service {
} else if (type == DatabaseSearchReplyMessage.MESSAGE_TYPE) { } else if (type == DatabaseSearchReplyMessage.MESSAGE_TYPE) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Dropping slow db lookup response: " + messageBody); _log.info("Dropping slow db lookup response: " + messageBody);
_context.statManager().addRateData("inNetPool.droppedDbLookupResponseMessage", 1, 0); _context.statManager().addRateData("inNetPool.droppedDbLookupResponseMessage", 1);
} else if (type == DatabaseLookupMessage.MESSAGE_TYPE) { } else if (type == DatabaseLookupMessage.MESSAGE_TYPE) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Dropping netDb lookup due to throttling"); _log.debug("Dropping netDb lookup due to throttling");
@@ -232,7 +233,7 @@ public class InNetMessagePool implements Service {
+ messageBody.getMessageExpiration() + messageBody.getMessageExpiration()
+ " was not handled by a HandlerJobBuilder - DROPPING: " + messageBody, + " was not handled by a HandlerJobBuilder - DROPPING: " + messageBody,
new Exception("f00!")); new Exception("f00!"));
_context.statManager().addRateData("inNetPool.dropped", 1, 0); _context.statManager().addRateData("inNetPool.dropped", 1);
} }
} else { } else {
String mtype = messageBody.getClass().getName(); String mtype = messageBody.getClass().getName();
@@ -295,10 +296,7 @@ public class InNetMessagePool implements Service {
private void doShortCircuitTunnelGateway(I2NPMessage messageBody) { private void doShortCircuitTunnelGateway(I2NPMessage messageBody) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Shortcut dispatch tunnelGateway message " + messageBody); _log.debug("Shortcut dispatch tunnelGateway message " + messageBody);
long before = _context.clock().now();
_context.tunnelDispatcher().dispatch((TunnelGatewayMessage)messageBody); _context.tunnelDispatcher().dispatch((TunnelGatewayMessage)messageBody);
long dispatchTime = _context.clock().now() - before;
_context.statManager().addRateData("tunnel.dispatchGatewayTime", dispatchTime, dispatchTime);
} }
private void shortCircuitTunnelData(I2NPMessage messageBody, Hash from) { private void shortCircuitTunnelData(I2NPMessage messageBody, Hash from) {
@@ -423,7 +421,7 @@ public class InNetMessagePool implements Service {
long before = _context.clock().now(); long before = _context.clock().now();
doShortCircuitTunnelGateway(msg); doShortCircuitTunnelGateway(msg);
long elapsed = _context.clock().now() - before; long elapsed = _context.clock().now() - before;
_context.statManager().addRateData("pool.dispatchGatewayTime", elapsed, elapsed); _context.statManager().addRateData("pool.dispatchGatewayTime", elapsed);
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@@ -456,7 +454,7 @@ public class InNetMessagePool implements Service {
long before = _context.clock().now(); long before = _context.clock().now();
doShortCircuitTunnelData(msg, from); doShortCircuitTunnelData(msg, from);
long elapsed = _context.clock().now() - before; long elapsed = _context.clock().now() - before;
_context.statManager().addRateData("pool.dispatchDataTime", elapsed, elapsed); _context.statManager().addRateData("pool.dispatchDataTime", elapsed);
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {