Compare commits

...

19 Commits

Author SHA1 Message Date
jrandom
61c97ab940 0.3.1.2 (backwards compatible, etc) 2004-05-13 23:49:08 +00:00
jrandom
b0a1b3b5ca added some harvest options
dont use javaw, since its a bitch to kill multiple jvms (yeah, this leaves a dos box.  we'll deal until we've got the shutdown admin control)
2004-05-13 23:32:57 +00:00
jrandom
4c7af01edc allow dynamic update to the reliability threshold factor (e.g. rather than the top 2/3rds being considered "reliable", allow that to be the top 1/3, or 1/2, etc)
the router.config var "profileOrganizer.reliabilityThresholdFactor=0.75" (or environment property -DprofileOrganizer.reliabilityThresholdFactor=0.5) etc
2004-05-13 23:24:09 +00:00
jrandom
0d431213cd include the previous period in the measurements (since they're discrete, not rolling)
also include the other elements as necessary by default
2004-05-13 07:14:54 +00:00
jrandom
ad9dd9a2e2 Lots of updates. I'm not calling this 0.3.1.2, still need to
"burn it it" some more, but its looking good.
* test all tunnels we manage every period or two. later we'll want to include some randomization to help fight traffic analysis, but that falls into the i2p 3.0 tunnel chaff / mixing / etc)
* test inbound tunnels correctly (use an outbound tunnel, not direct)
* only give the tunnels 30 seconds to succeed
* mark the tunnel as tested for both the inbound and outbound side and adjust the profiles for all participants accordingly
* keep track of the 'last test time' on a tunnel
* new tunnel test response time profile stat, as well as overall router stat (published in the netDb as "tunnel.testSuccessTime")
* rewrite of the speed calculator - the value it generates now is essentially "how many round trip messages can this router pass in a minute".
  it also allows a few runtime configurable options:
  = speedCalculator.eventThreshold:
    we use the smallest measurement period that has at least this many events in it (10m, 60m, 24h, lifetime)
  = speedCalculator.useInstantaneousRates:
    when we use the estimated round trip time, do we use instantaneous or period averages?
  = speedCalculator.useTunnelTestOnly:
    do we only use the tunnel test time (no db response or tunnel create time, or even estimated round trip times)?
* fix the reliability calculator to use the 10 minute tunnel create successes, not the (almost always 0) 1 minute rate.
* persist the tunnel create response time and send success time correctly (duh)
* add a new main() to PeerProfile - PeerProfile [filename]* will calculate the values of the peer profiles specified.  useful for tweaking the calculator, and/or the configurable options.  ala:
     java -DspeedCalculator.useInstantaneousRates peerProfiles/profile-*.dat
2004-05-13 04:32:26 +00:00
jrandom
c7895ed905 oh, you mean we're supposed to be at least a /little/ resiliant? 2004-05-13 03:54:33 +00:00
jrandom
57d7979d51 removed obsolete code
minor reorganization to help track down whats intermittently b0rking my kaffe instance
logging
2004-05-12 07:55:25 +00:00
jrandom
61f6871cd1 logging and a catastrophic try/catch (no situations have called for this yet, but its worth testing for) 2004-05-12 07:47:22 +00:00
jrandom
406048f7b9 ugly tests to see if the minimal RAW side of SAM works (it does, w00t) 2004-05-11 03:00:53 +00:00
jrandom
6dd5b0fe45 basic datagram tests (that work now :) 2004-05-11 02:44:16 +00:00
jrandom
fd4bc5e3cf keystream fixes 2004-05-11 02:43:52 +00:00
jrandom
3bab2d8957 only append the client's config properties to the SESSION commands (since some of the rest get confused with unknown tags...) 2004-05-11 02:07:27 +00:00
jrandom
af2f5cd2e1 kaffe shits a brick if you want the socket's address after .close() (grumble) 2004-05-11 01:55:22 +00:00
jrandom
d4bb32da82 fix per http://twiki.ntp.org/bin/view/Support/JavaSntpClientDev (thanks duck!)
this may have caused some clock skew problems on the deployed 0.3.1.1, so we'll get this deployed asap
2004-05-11 01:45:18 +00:00
jrandom
08aca6ca61 while (true) { m00; } 2004-05-09 07:23:43 +00:00
jrandom
697b3c6772 SAM .net lib work in progress - dm and firerabbit 2004-05-09 07:16:04 +00:00
jrandom
878525ced8 handle corrupt files more gracefully 2004-05-09 04:14:30 +00:00
shendaras
418531736b imports
Did you miss me?
(shendaras)
2004-05-09 01:31:12 +00:00
jrandom
6c175440c6 added mush.zeit.i2p 2004-05-08 06:41:01 +00:00
37 changed files with 1511 additions and 255 deletions

View File

@@ -4,7 +4,6 @@ import net.i2p.data.Destination;
import net.i2p.util.Clock;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.Clock;
/**
* Responsible for actually conducting the tests, coordinating the storing of the

View File

@@ -10,7 +10,6 @@ import javax.swing.JMenu;
import javax.swing.JMenuBar;
import javax.swing.JMenuItem;
import javax.swing.JScrollPane;
import javax.swing.JSplitPane;
class HeartbeatMonitorGUI extends JFrame {
private HeartbeatMonitor _monitor;

View File

@@ -1,6 +1,5 @@
package net.i2p.heartbeat.gui;
import java.awt.Color;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

View File

@@ -1,26 +1,23 @@
package net.i2p.heartbeat.gui;
import java.awt.Color;
import java.awt.Font;
import java.util.List;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.DateAxis;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.plot.Plot;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.renderer.XYItemRenderer;
import org.jfree.chart.renderer.XYLineAndShapeRenderer;
import org.jfree.data.XYSeries;
import org.jfree.data.XYSeriesCollection;
import net.i2p.heartbeat.PeerData;
import net.i2p.util.Log;
import org.jfree.data.XYSeries;
import org.jfree.data.XYSeriesCollection;
import org.jfree.data.MovingAverage;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.plot.Plot;
import org.jfree.chart.plot.XYPlot;
import org.jfree.chart.axis.DateAxis;
import org.jfree.chart.axis.NumberAxis;
import org.jfree.chart.renderer.XYLineAndShapeRenderer;
import org.jfree.chart.renderer.XYItemRenderer;
import org.jfree.chart.renderer.XYDotRenderer;
import java.util.List;
import javax.swing.JPanel;
import java.awt.Font;
import java.awt.Color;
class JFreeChartAdapter {
private final static Log _log = new Log(JFreeChartAdapter.class);
private final static Color WHITE = new Color(255, 255, 255);

View File

@@ -1,21 +1,14 @@
package net.i2p.heartbeat.gui;
import javax.swing.JPanel;
import javax.swing.JTextArea;
import javax.swing.JScrollPane;
import javax.swing.JLabel;
import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Dimension;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import net.i2p.heartbeat.PeerDataWriter;
import net.i2p.util.Log;
import javax.swing.JLabel;
import javax.swing.JScrollPane;
import org.jfree.chart.ChartPanel;
import net.i2p.util.Log;
/**
* Render the graph and legend
*

View File

@@ -33,57 +33,61 @@ class PeerSummaryReader {
PeerSummary summary = null;
String curDescription = null;
List curArgs = null;
while ((line = reader.readLine()) != null) {
if (line.startsWith("peer\t")) {
String name = line.substring("peer\t".length()).trim();
summary = monitor.getSummary(name);
if (summary == null)
summary = new PeerSummary(name);
} else if (line.startsWith("## ")) {
curDescription = line.substring("## ".length()).trim();
curArgs = new ArrayList(4);
} else if (line.startsWith("# param ")) {
int start = line.indexOf(':');
String arg = line.substring(start+1).trim();
curArgs.add(arg);
} else {
StringTokenizer tok = new StringTokenizer(line);
String name = tok.nextToken();
try {
long when = getTime(tok.nextToken());
boolean isDouble = false;
List argVals = new ArrayList(curArgs.size());
while (tok.hasMoreTokens()) {
String val = (String)tok.nextToken();
if (val.indexOf('.') >= 0) {
argVals.add(new Double(val));
isDouble = true;
} else {
argVals.add(new Long(val));
try {
while ((line = reader.readLine()) != null) {
if (line.startsWith("peer\t")) {
String name = line.substring("peer\t".length()).trim();
summary = monitor.getSummary(name);
if (summary == null)
summary = new PeerSummary(name);
} else if (line.startsWith("## ")) {
curDescription = line.substring("## ".length()).trim();
curArgs = new ArrayList(4);
} else if (line.startsWith("# param ")) {
int start = line.indexOf(':');
String arg = line.substring(start+1).trim();
curArgs.add(arg);
} else {
StringTokenizer tok = new StringTokenizer(line);
String name = tok.nextToken();
try {
long when = getTime(tok.nextToken());
boolean isDouble = false;
List argVals = new ArrayList(curArgs.size());
while (tok.hasMoreTokens()) {
String val = (String)tok.nextToken();
if (val.indexOf('.') >= 0) {
argVals.add(new Double(val));
isDouble = true;
} else {
argVals.add(new Long(val));
}
}
String valDescriptions[] = new String[curArgs.size()];
for (int i = 0; i < curArgs.size(); i++)
valDescriptions[i] = (String)curArgs.get(i);
if (isDouble) {
double values[] = new double[argVals.size()];
for (int i = 0; i < argVals.size(); i++)
values[i] = ((Double)argVals.get(i)).doubleValue();
summary.addData(name, curDescription, valDescriptions, when, values);
} else {
long values[] = new long[argVals.size()];
for (int i = 0; i < argVals.size(); i++)
values[i] = ((Long)argVals.get(i)).longValue();
summary.addData(name, curDescription, valDescriptions, when, values);
}
} catch (ParseException pe) {
_log.error("Error parsing the data line [" + line + "]", pe);
} catch (NumberFormatException nfe) {
_log.error("Error parsing the data line [" + line + "]", nfe);
}
String valDescriptions[] = new String[curArgs.size()];
for (int i = 0; i < curArgs.size(); i++)
valDescriptions[i] = (String)curArgs.get(i);
if (isDouble) {
double values[] = new double[argVals.size()];
for (int i = 0; i < argVals.size(); i++)
values[i] = ((Double)argVals.get(i)).doubleValue();
summary.addData(name, curDescription, valDescriptions, when, values);
} else {
long values[] = new long[argVals.size()];
for (int i = 0; i < argVals.size(); i++)
values[i] = ((Long)argVals.get(i)).longValue();
summary.addData(name, curDescription, valDescriptions, when, values);
}
} catch (ParseException pe) {
_log.error("Error parsing the data line [" + line + "]", pe);
} catch (NumberFormatException nfe) {
_log.error("Error parsing the data line [" + line + "]", nfe);
}
}
}
} catch (Exception e) {
_log.error("Error handling the data", e);
throw new IOException("Error parsing the data");
}
if (summary == null)
return;
summary.coallesceData(monitor.getSummaryDurationHours() * 60*60*1000);

View File

@@ -0,0 +1,58 @@
using System.Reflection;
using System.Runtime.CompilerServices;
//
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
//
[assembly: AssemblyTitle("")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
//
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
[assembly: AssemblyVersion("1.0.*")]
//
// In order to sign your assembly you must specify a key to use. Refer to the
// Microsoft .NET Framework documentation for more information on assembly signing.
//
// Use the attributes below to control which key is used for signing.
//
// Notes:
// (*) If no key is specified, the assembly is not signed.
// (*) KeyName refers to a key that has been installed in the Crypto Service
// Provider (CSP) on your machine. KeyFile refers to a file which contains
// a key.
// (*) If the KeyFile and the KeyName values are both specified, the
// following processing occurs:
// (1) If the KeyName can be found in the CSP, that key is used.
// (2) If the KeyName does not exist and the KeyFile does exist, the key
// in the KeyFile is installed into the CSP and used.
// (*) In order to create a KeyFile, you can use the sn.exe (Strong Name) utility.
// When specifying the KeyFile, the location of the KeyFile should be
// relative to the project output directory which is
// %Project Directory%\obj\<configuration>. For example, if your KeyFile is
// located in the project directory, you would specify the AssemblyKeyFile
// attribute as [assembly: AssemblyKeyFile("..\\..\\mykey.snk")]
// (*) Delay Signing is an advanced option - see the Microsoft .NET Framework
// documentation for more information on this.
//
[assembly: AssemblyDelaySign(false)]
[assembly: AssemblyKeyFile("")]
[assembly: AssemblyKeyName("")]

View File

@@ -0,0 +1,50 @@
using System;
using System.Net;
using System.Threading;
using System.Text;
using System.Collections;
namespace SAM.NET
{
class SAMTester
{
[STAThread]
static void Main(string[] args)
{
new SAMTester();
}
public SAMTester ()
{
SAMConnection connection1 = new SAMConnection(IPAddress.Parse("127.0.0.1"),7656);
SAMSession session1 = new SAMSession(connection1,SAM.NET.SamSocketType.Stream,"alice");
SAMConnection connection2 = new SAMConnection(IPAddress.Parse("127.0.0.1"),7656);
SAMSession session2 = new SAMSession(connection2,SAM.NET.SamSocketType.Stream,"bob");
SAMStream stream1 = new SAMStream(connection1,session1,233);
stream1.Connect(session2.getKey());
//Wait till we are connected to destination
while (!stream1.isConnected)
Thread.Sleep(1000);
//Send some bytes
stream1.Write(Encoding.ASCII.GetBytes(DateTime.Now.ToLongTimeString() + "Hi!!!!!!"));
//Wait till a stream magically appears on the other side
while (session2.getStreams().Count == 0) Thread.Sleep(1000);
Thread.Sleep(1000);
foreach (SAMStream stream in session2.getStreams().Values)
{
Console.WriteLine("Text received on " + stream.getID() + " at " + DateTime.Now.ToLongTimeString());
Console.WriteLine(Encoding.ASCII.GetString(stream.ReadToEnd()));
stream.Close();
}
stream1.Close();
connection1.Close();
connection2.Close();
}
}
}

View File

@@ -0,0 +1,58 @@
using System.Reflection;
using System.Runtime.CompilerServices;
//
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
//
[assembly: AssemblyTitle("")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
//
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
[assembly: AssemblyVersion("1.0.*")]
//
// In order to sign your assembly you must specify a key to use. Refer to the
// Microsoft .NET Framework documentation for more information on assembly signing.
//
// Use the attributes below to control which key is used for signing.
//
// Notes:
// (*) If no key is specified, the assembly is not signed.
// (*) KeyName refers to a key that has been installed in the Crypto Service
// Provider (CSP) on your machine. KeyFile refers to a file which contains
// a key.
// (*) If the KeyFile and the KeyName values are both specified, the
// following processing occurs:
// (1) If the KeyName can be found in the CSP, that key is used.
// (2) If the KeyName does not exist and the KeyFile does exist, the key
// in the KeyFile is installed into the CSP and used.
// (*) In order to create a KeyFile, you can use the sn.exe (Strong Name) utility.
// When specifying the KeyFile, the location of the KeyFile should be
// relative to the project output directory which is
// %Project Directory%\obj\<configuration>. For example, if your KeyFile is
// located in the project directory, you would specify the AssemblyKeyFile
// attribute as [assembly: AssemblyKeyFile("..\\..\\mykey.snk")]
// (*) Delay Signing is an advanced option - see the Microsoft .NET Framework
// documentation for more information on this.
//
[assembly: AssemblyDelaySign(false)]
[assembly: AssemblyKeyFile("")]
[assembly: AssemblyKeyName("")]

View File

@@ -0,0 +1,271 @@
using System;
using System.Net.Sockets;
using System.Text;
using System.Net;
using System.IO;
using System.Collections;
using System.Threading;
namespace SAM.NET
{
public enum SamSocketType
{
Stream,
Datagram,
Raw
}
public class SAMConnection
{
private const string propertyMinVersion = "1.0";
private const string propertyMaxVersion = "1.0";
private Socket _sock;
private NetworkStream _sockStream;
private StreamReader _sockStreamIn;
private StreamWriter _sockStreamOut;
public SAMConnection(IPAddress routerIP, int port)
{
_sock = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
IPEndPoint rEP = new IPEndPoint(routerIP,port);
_sock.Connect(rEP);
_sockStream = new NetworkStream(_sock);
_sockStreamIn = new StreamReader(_sockStream);
_sockStreamOut = new StreamWriter(_sockStream);
try
{
sendVersion(propertyMinVersion,propertyMinVersion);
}
catch (Exception e)
{
_sock.Close();
throw (new Exception("No SAM for you :("));
}
}
private void sendVersion(string min, string max)
{
_sockStreamOut.WriteLine("HELLO VERSION MIN=" + propertyMinVersion + " MAX=" + propertyMaxVersion);
_sockStreamOut.Flush();
Hashtable response = SAMUtil.parseKeyValues(_sockStreamIn.ReadLine(),2);
if (response["RESULT"].ToString() != "OK") throw (new Exception("Version mismatch"));
}
public StreamWriter getOutputStream()
{
return _sockStreamOut;
}
public StreamReader getInputStream()
{
return _sockStreamIn;
}
public NetworkStream getStream()
{
return _sockStream;
}
public void Close()
{
_sock.Close();
}
}
/*
* Creating a SAMSession object will automatically:
* 1) create a sesion on SAM
* 1) query for the base64key
* 2) start a listening thread to catch all stream commands
*/
public class SAMSession
{
private Hashtable _streams;
private string _sessionKey;
public SAMSession (SAMConnection connection, SamSocketType type, string destination)
{
_streams = new Hashtable();
StreamWriter writer = connection.getOutputStream();
StreamReader reader = connection.getInputStream();
writer.WriteLine("SESSION CREATE STYLE=STREAM DESTINATION=" + destination);
writer.Flush();
Hashtable response = SAMUtil.parseKeyValues(reader.ReadLine(),2);
if (response["RESULT"].ToString() != "OK")
{
throw (new Exception(response["MESSAGE"].ToString()));
}
else
{
writer.WriteLine("NAMING LOOKUP NAME=ME");
writer.Flush();
response = SAMUtil.parseKeyValues(reader.ReadLine(),2);
_sessionKey = response["VALUE"].ToString();
SAMSessionListener listener = new SAMSessionListener(connection,this,_streams);
new Thread(new ThreadStart(listener.startListening)).Start();
}
}
public void addStream(SAMStream stream)
{
_streams.Add(stream.getID(),stream);
}
public string getKey()
{
return _sessionKey;
}
public Hashtable getStreams()
{
return _streams;
}
}
public class SAMSessionListener
{
private Hashtable _streams;
private SAMConnection _connection;
private SAMSession _session;
private bool stayAlive = true;
public SAMSessionListener(SAMConnection connection,SAMSession session, Hashtable streams)
{
_streams = streams;
_connection = connection;
_session = session;
}
public void startListening()
{
StreamReader reader = _connection.getInputStream();
while (stayAlive)
{
string response = reader.ReadLine();
if (response.StartsWith("STREAM STATUS"))
{
Hashtable values = SAMUtil.parseKeyValues(response,2);
SAMStream theStream = (SAMStream)_streams[int.Parse(values["ID"].ToString())];
if (theStream != null) theStream.ReceivedStatus(values);
}
if (response.StartsWith("STREAM CONNECTED"))
{
Hashtable values = SAMUtil.parseKeyValues(response,2);
SAMStream theStream = (SAMStream)_streams[int.Parse(values["ID"].ToString())];
if (theStream != null) theStream.isConnected = true;
}
if (response.StartsWith("STREAM RECEIVED"))
{
Hashtable values = SAMUtil.parseKeyValues(response,2);
int streamID = int.Parse(values["ID"].ToString());
SAMStream theStream = (SAMStream)_streams[streamID];
if (theStream == null) new SAMStream(_connection,_session,streamID);
theStream = (SAMStream)_streams[streamID];
theStream.ReceivedData(int.Parse(values["SIZE"].ToString()));
}
if (response.StartsWith("STREAM CLOSE"))
{
Hashtable values = SAMUtil.parseKeyValues(response,2);
SAMStream theStream = (SAMStream)_streams[int.Parse(values["ID"].ToString())];
if (theStream != null) theStream.isConnected = false;
}
}
}
}
public class SAMStream
{
private int _ID;
private byte[] _data;
private int _position=0;
private int _size=0;
private SAMSession _session;
private SAMConnection _connection;
public bool isConnected=false;
public SAMStream (SAMConnection connection,SAMSession session, int ID)
{
_data = new byte[100000]; //FIXME: change to non-static structure for storing stream data
_ID = ID;
_connection = connection;
_session = session;
_session.addStream(this);
}
public void Connect(string destination)
{
StreamWriter writer = _connection.getOutputStream();
writer.WriteLine("STREAM CONNECT ID=" + _ID.ToString() + " DESTINATION=" + destination);
writer.Flush();
}
public void ReceivedData(int size) //FIXME: WTF is going on when reading the payload here? All zeros and way too many of them.
{
NetworkStream stream = _connection.getStream();
int bytesRead = stream.Read(_data,_size,size);
_size = _size + bytes;
}
public void ReceivedStatus(Hashtable response)
{
if (response["RESULT"].ToString() != "OK")
{
throw (new Exception(response["RESULT"].ToString()));
}
else
{
isConnected = true;
}
}
public int getID() {return _ID;}
public bool DataAvailable()
{
return _position != _size;
}
public void Write(byte[] buf)
{
NetworkStream stream = _connection.getStream();
int sent = 0;
while (sent < buf.Length)
{
int toSend = Math.Min(buf.Length - sent,32768);
string header = "STREAM SEND ID=" + _ID.ToString() + " SIZE=" + toSend.ToString() + "\n";
byte[] headerbytes = Encoding.ASCII.GetBytes(header);
stream.Write(headerbytes,0,headerbytes.Length);
stream.Write(buf,sent,toSend);
sent = sent + toSend;
}
}
public byte[] ReadToEnd()
{
byte[] ret = new byte[_size - _position];
Array.Copy(_data,_position,ret,0,_size - _position);
_position = _size;
return ret;
}
public void Close()
{
StreamWriter writer = _connection.getOutputStream();
writer.WriteLine("STREAM CLOSE " + _ID.ToString());
writer.Flush();
}
}
public class SAMUtil
{
public static Hashtable parseKeyValues(string str, int startingWord)
{
Hashtable hash = new Hashtable();
string strTruncated = string.Join(" ",str.Split(' '),startingWord,str.Split(' ').Length - startingWord);
string[] sets = strTruncated.Split('=',' ');
for (int i=0; i<sets.Length; i=i+2)
{
hash.Add(sets[i],sets[i+1]);
}
return hash;
}
}
}

View File

@@ -127,8 +127,6 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
+ "\"; opcode: \"" + opcode + "\")");
}
props = SAMUtils.parseParams(tok);
if (i2cpProps != null)
props.putAll(i2cpProps); // make sure we've got the i2cp settings
if (domain.equals("STREAM")) {
canContinue = execStreamMessage(opcode, props);
@@ -137,6 +135,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
} else if (domain.equals("RAW")) {
canContinue = execRawMessage(opcode, props);
} else if (domain.equals("SESSION")) {
if (i2cpProps != null)
props.putAll(i2cpProps); // make sure we've got the i2cp settings
canContinue = execSessionMessage(opcode, props);
} else if (domain.equals("DEST")) {
canContinue = execDestMessage(opcode, props);
@@ -234,9 +234,9 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
props.remove("STYLE");
if (style.equals("RAW")) {
rawSession = new SAMRawSession(dest, props, this);
rawSession = new SAMRawSession(destKeystream, props, this);
} else if (style.equals("DATAGRAM")) {
datagramSession = new SAMDatagramSession(dest, props,this);
datagramSession = new SAMDatagramSession(destKeystream, props,this);
} else if (style.equals("STREAM")) {
String dir = props.getProperty("DIRECTION");
if (dir == null) {

View File

@@ -0,0 +1,78 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import net.i2p.util.Log;
public class TestCreateSessionDatagram {
private static Log _log = new Log(TestCreateSessionDatagram.class);
private static void runTest(String samHost, int samPort, String conOptions) {
testTransient(samHost, samPort, conOptions);
testNewDest(samHost, samPort, conOptions);
testOldDest(samHost, samPort, conOptions);
}
private static void testTransient(String host, int port, String conOptions) {
testDest(host, port, conOptions, "TRANSIENT");
_log.debug("\n\nTest of transient complete\n\n\n");
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
private static void testNewDest(String host, int port, String conOptions) {
String destName = "Alice" + Math.random();
testDest(host, port, conOptions, destName);
}
private static void testOldDest(String host, int port, String conOptions) {
String destName = "Alice" + Math.random();
testDest(host, port, conOptions, destName);
_log.debug("\n\nTest of initial contact for " + destName + " complete, waiting 90 seconds");
try { Thread.sleep(90*1000); } catch (InterruptedException ie) {}
_log.debug("now testing subsequent contact\n\n\n");
testDest(host, port, conOptions, destName);
_log.debug("\n\nTest of subsequent contact complete\n\n");
}
private static void testDest(String host, int port, String conOptions, String destName) {
_log.info("\n\nTesting creating a new destination (should come back with 'SESSION STATUS RESULT=OK DESTINATION=someName)\n\n\n");
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
out.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
String line = reader.readLine();
_log.debug("line read for valid version: " + line);
String req = "SESSION CREATE STYLE=DATAGRAM DESTINATION=" + destName + " " + conOptions + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to creating the session with destination " + destName + ": " + line);
_log.debug("The above should contain SESSION STATUS RESULT=OK\n\n\n");
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
_log.error("Error testing for valid version", e);
}
}
public static void main(String args[]) {
// "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765";
// "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0"; // "i2cp.tcp.host=dev.i2p.net i2cp.tcp.port=7002 tunnels.inboundDepth=0";
if (args.length > 0) {
conOptions = "";
for (int i = 0; i < args.length; i++)
conOptions = conOptions + " " + args[i];
}
try {
TestUtil.startupBridge(6000);
runTest("localhost", 6000, conOptions);
} catch (Throwable t) {
_log.error("Error running test", t);
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,78 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import net.i2p.util.Log;
public class TestCreateSessionRaw {
private static Log _log = new Log(TestCreateSessionRaw.class);
private static void runTest(String samHost, int samPort, String conOptions) {
testTransient(samHost, samPort, conOptions);
testNewDest(samHost, samPort, conOptions);
testOldDest(samHost, samPort, conOptions);
}
private static void testTransient(String host, int port, String conOptions) {
testDest(host, port, conOptions, "TRANSIENT");
_log.debug("\n\nTest of transient complete\n\n\n");
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
private static void testNewDest(String host, int port, String conOptions) {
String destName = "Alice" + Math.random();
testDest(host, port, conOptions, destName);
}
private static void testOldDest(String host, int port, String conOptions) {
String destName = "Alice" + Math.random();
testDest(host, port, conOptions, destName);
_log.debug("\n\nTest of initial contact for " + destName + " complete, waiting 90 seconds");
try { Thread.sleep(90*1000); } catch (InterruptedException ie) {}
_log.debug("now testing subsequent contact\n\n\n");
testDest(host, port, conOptions, destName);
_log.debug("\n\nTest of subsequent contact complete\n\n");
}
private static void testDest(String host, int port, String conOptions, String destName) {
_log.info("\n\nTesting creating a new destination (should come back with 'SESSION STATUS RESULT=OK DESTINATION=someName)\n\n\n");
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
out.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
String line = reader.readLine();
_log.debug("line read for valid version: " + line);
String req = "SESSION CREATE STYLE=RAW DESTINATION=" + destName + " " + conOptions + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to creating the session with destination " + destName + ": " + line);
_log.debug("The above should contain SESSION STATUS RESULT=OK\n\n\n");
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
_log.error("Error testing for valid version", e);
}
}
public static void main(String args[]) {
// "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765";
// "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=dev.i2p.net i2cp.tcp.port=7002 tunnels.inboundDepth=0";
if (args.length > 0) {
conOptions = "";
for (int i = 0; i < args.length; i++)
conOptions = conOptions + " " + args[i];
}
try {
TestUtil.startupBridge(6000);
runTest("localhost", 6000, conOptions);
} catch (Throwable t) {
_log.error("Error running test", t);
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,106 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.StringTokenizer;
import java.util.Properties;
import net.i2p.util.Log;
import net.i2p.sam.SAMUtils;
public class TestDatagramTransfer {
private static Log _log = new Log(TestCreateSessionDatagram.class);
private static void runTest(String samHost, int samPort, String conOptions) {
testTransfer(samHost, samPort, conOptions);
}
private static void testTransfer(String host, int port, String conOptions) {
String destName = "TRANSIENT";
_log.info("\n\nTesting creating a new destination (should come back with 'SESSION STATUS RESULT=OK DESTINATION=someName)\n\n\n");
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
out.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
String line = reader.readLine();
_log.debug("line read for valid version: " + line);
String req = "SESSION CREATE STYLE=DATAGRAM DESTINATION=" + destName + " " + conOptions + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to creating the session with destination " + destName + ": " + line);
_log.debug("The above should contain SESSION STATUS RESULT=OK\n\n\n");
String lookup = "NAMING LOOKUP NAME=ME\n";
out.write(lookup.getBytes());
line = reader.readLine();
_log.info("Response from the lookup for ME: " + line);
_log.debug("The above should be a NAMING REPLY");
StringTokenizer tok = new StringTokenizer(line);
String maj = tok.nextToken();
String min = tok.nextToken();
Properties props = SAMUtils.parseParams(tok);
String value = props.getProperty("VALUE");
if (value == null) {
_log.error("No value for ME found! [" + line + "]");
return;
} else {
_log.info("Alice is located at " + value);
}
String send = "DATAGRAM SEND DESTINATION=" + value + " SIZE=3\nYo!";
out.write(send.getBytes());
line = reader.readLine();
tok = new StringTokenizer(line);
maj = tok.nextToken();
min = tok.nextToken();
props = SAMUtils.parseParams(tok);
String size = props.getProperty("SIZE");
String from = props.getProperty("DESTINATION");
if ( (value == null) || (size == null) ||
(!from.equals(value)) || (!size.equals("3")) ) {
_log.error("Reply of the datagram is incorrect: [" + line + "]");
return;
}
char buf[] = new char[3];
int read = reader.read(buf);
if (read != 3) {
_log.error("Unable to read the full datagram");
return;
}
if (new String(buf).equals("Yo!")) {
_log.info("Received payload successfully");
} else {
_log.error("Payload is incorrect! [" + new String(buf) + "]");
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
_log.error("Error testing for valid version", e);
}
}
public static void main(String args[]) {
// "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765";
// "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0"; // "i2cp.tcp.host=dev.i2p.net i2cp.tcp.port=7002 tunnels.inboundDepth=0";
if (args.length > 0) {
conOptions = "";
for (int i = 0; i < args.length; i++)
conOptions = conOptions + " " + args[i];
}
try {
TestUtil.startupBridge(6000);
runTest("localhost", 6000, conOptions);
} catch (Throwable t) {
_log.error("Error running test", t);
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
System.exit(0);
}
}

View File

@@ -0,0 +1,109 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.StringTokenizer;
import java.util.Properties;
import net.i2p.util.Log;
import net.i2p.sam.SAMUtils;
public class TestRawTransfer {
private static Log _log = new Log(TestCreateSessionDatagram.class);
private static void runTest(String samHost, int samPort, String conOptions) {
testTransfer(samHost, samPort, conOptions);
}
private static void testTransfer(String host, int port, String conOptions) {
String destName = "TRANSIENT";
_log.info("\n\nTesting creating a new destination (should come back with 'SESSION STATUS RESULT=OK DESTINATION=someName)\n\n\n");
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
out.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
String line = reader.readLine();
_log.debug("line read for valid version: " + line);
String req = "SESSION CREATE STYLE=RAW DESTINATION=" + destName + " " + conOptions + "\n";
out.write(req.getBytes());
line = reader.readLine();
_log.info("Response to creating the session with destination " + destName + ": " + line);
_log.debug("The above should contain SESSION STATUS RESULT=OK\n\n\n");
String lookup = "NAMING LOOKUP NAME=ME\n";
out.write(lookup.getBytes());
line = reader.readLine();
_log.info("Response from the lookup for ME: " + line);
_log.debug("The above should be a NAMING REPLY");
StringTokenizer tok = new StringTokenizer(line);
String maj = tok.nextToken();
String min = tok.nextToken();
Properties props = SAMUtils.parseParams(tok);
String value = props.getProperty("VALUE");
if (value == null) {
_log.error("No value for ME found! [" + line + "]");
return;
} else {
_log.info("Alice is located at " + value);
}
String send = "RAW SEND DESTINATION=" + value + " SIZE=3\nYo!";
out.write(send.getBytes());
line = reader.readLine();
try {
tok = new StringTokenizer(line);
maj = tok.nextToken();
min = tok.nextToken();
props = SAMUtils.parseParams(tok);
} catch (Exception e) {
_log.error("Error parsing response line: [" + line + "]", e);
return;
}
String size = props.getProperty("SIZE");
if ( (size == null) || (!size.equals("3")) ) {
_log.error("Reply of the datagram is incorrect: [" + line + "]");
return;
}
char buf[] = new char[3];
int read = reader.read(buf);
if (read != 3) {
_log.error("Unable to read the full datagram");
return;
}
if (new String(buf).equals("Yo!")) {
_log.info("Rec8eived payload successfully");
} else {
_log.error("Payload is incorrect! [" + new String(buf) + "]");
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
s.close();
} catch (Exception e) {
_log.error("Error testing for valid version", e);
}
}
public static void main(String args[]) {
// "i2cp.tcp.host=www.i2p.net i2cp.tcp.port=7765";
// "i2cp.tcp.host=localhost i2cp.tcp.port=7654 tunnels.inboundDepth=0";
String conOptions = "i2cp.tcp.host=dev.i2p.net i2cp.tcp.port=7002 tunnels.inboundDepth=0";
if (args.length > 0) {
conOptions = "";
for (int i = 0; i < args.length; i++)
conOptions = conOptions + " " + args[i];
}
try {
TestUtil.startupBridge(6000);
runTest("localhost", 6000, conOptions);
} catch (Throwable t) {
_log.error("Error running test", t);
}
try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
System.exit(0);
}
}

View File

@@ -1,6 +1,7 @@
package net.i2p.time;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -79,7 +80,13 @@ public class NtpClient {
// Get response
packet = new DatagramPacket(buf, buf.length);
socket.receive(packet);
socket.setSoTimeout(10*1000);
try {
socket.receive(packet);
} catch (InterruptedIOException iie) {
socket.close();
return -1;
}
// Immediately record the incoming timestamp
double destinationTimestamp = (System.currentTimeMillis()/1000.0) + SECONDS_1900_TO_EPOCH;

View File

@@ -384,7 +384,7 @@ public class NtpMessage {
// low order bits of the timestamp with a random, unbiased
// bitstring, both to avoid systematic roundoff errors and as
// a means of loop detection and replay detection.
array[7] = (byte) (Math.random()*255.0);
array[7+pointer] = (byte) (Math.random()*255.0);
}

View File

@@ -47,18 +47,22 @@ public class Timestamper implements Runnable {
public void run() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting up timestamper");
while (true) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Querying servers " + _serverList);
long now = NtpClient.currentTime(_serverList);
if (now < 0) {
_log.error("Unable to contact any of the NTP servers - network disconnect?");
} else {
try {
while (true) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stamp time");
stampTime(now);
_log.debug("Querying servers " + _serverList);
long now = NtpClient.currentTime(_serverList);
if (now < 0) {
_log.error("Unable to contact any of the NTP servers - network disconnect?");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stamp time");
stampTime(now);
}
try { Thread.sleep(DELAY_MS); } catch (InterruptedException ie) {}
}
try { Thread.sleep(DELAY_MS); } catch (InterruptedException ie) {}
} catch (Throwable t) {
_log.log(Log.CRIT, "Timestamper died!", t);
}
}
@@ -67,7 +71,10 @@ public class Timestamper implements Runnable {
*/
private void stampTime(long now) {
try {
URL url = new URL(_targetURL + "&now=" + getNow(now));
String toRequest = _targetURL + "&now=" + getNow(now);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stamping [" + toRequest + "]");
URL url = new URL(toRequest);
Object o = url.getContent();
// ignore the content
} catch (MalformedURLException mue) {

View File

@@ -14,8 +14,8 @@ package net.i2p;
*
*/
public class CoreVersion {
public final static String ID = "$Revision: 1.4 $ $Date: 2004/04/30 18:04:13 $";
public final static String VERSION = "0.3.1.1";
public final static String ID = "$Revision: 1.5 $ $Date: 2004/05/07 12:52:49 $";
public final static String VERSION = "0.3.1.2";
public static void main(String args[]) {
System.out.println("I2P Core version: " + VERSION);

View File

@@ -1,6 +1,7 @@
; TC's hosts.txt guaranteed freshness
; $Id: hosts.txt,v 1.3 2004/05/03 02:04:12 jrandom Exp $
; $Id: hosts.txt,v 1.4 2004/05/03 19:43:09 jrandom Exp $
; changelog:
; (1.31) added mush.zeit.i2p (i2p's first eepMUSH)
; (1.30) added xilog.i2p
; (1.29) added lucky.i2p, removed lp.i2p
; (1.28) added sungo.i2p
@@ -84,4 +85,5 @@ jar.i2p=xPIYObh2AirO1xoWCj7Wwc5RsGmQ3qulIAOHux9pOm9tzErjAfxv~2EazsZjyXCZ0zi3ylUj
sungo.i2p=S53R0ZV3gK3yA0woixx--czUOOsltTZABkFw0VyVuq-Dsx7inqAPYZKIWok3TP2vxBNM9I8LEExp6rlvJlPNvunzxYw0uZg3paN8V1vU3uRRhg4KDh~eAHxQo4cMC7Qng1jCt8Nb6acsg-NNm81fFJGvFaSqoi7Vwr-zZ9G1OuuzLI02Ald4RnvTOcsoOlh~gna0Y~IXGXJFI0KA76rKL3jgRAE~TqIIorGK2fTSPOwcjUPE9VXTq2LTpJayqMBdc6U6U8goYyt~cyX5rE-U4t49Vg0dVTkEfDWq-GVTiMSu0tbbyjgUcz3Ls038ugOJL11wO20QRvBkqIDcKo9gIX2ZFlKFhJWpYpNkRmXHAUobMx~zqjrpmbqy6dEA-HzRxlXYHBbIQwrX3qiAFSXPbnWrSMfm2kqMd57PZtxpb9pftKA-I7yhqqRFl0yukzG5Q5nN-ID7zo~6jLTChYsnj1ceEXn6~KMnZ3K3JTC7dR8PsWlGmSwgogHpU31ipyOhAAAA
lucky.i2p=Xzf7x5fOHaUDri-ZCNpg0S-CL7aojzaZ8NQ7Ax~zq60zhvz1yIQoPw38QYS7Vn7F5H3tMlrsUxoEoMd2tgV2gVCUesDMgThRNAkQiJSSsIf2dA5AKqD3FtyNtNTYVH1PjlrsXQ67VOmXoxYgiIRY1QVJj5A4Huw0X5FFzvM1QfHV8CSHI6P0lvFtHZwT~TyhH4SeqfZUL4uilvtRKJb1beayuALpXQE~9B7NUlr-Ws3w6D6g9PJ8mgePo1~iVFo7oKzOPmiWLnXBCh7IZXpSIKkQiSWXD6vA-QkCtlQClV6VPqhwIs63qX8MIE2yXsNQgOb-u9Tc~Y8vk0VjMgaGCDd9g6L8BmsxKHerELt4ZDLcBt9yuX3DWwGjwskZ8E~Fydk9ELnI6if3sOKoKv4Ov8-CwyL4WsgUJECyPelCaZxHkv~m-OygnRN~71jvGOpWDGGRvoVQmSB~JwTQkUVAbCZpXp~J1qQsR2QgK2onW8q1OZ7Q~UsVQS86DmfOwQmQAAAA
xilog.i2p=yHOthRzTaowYM0dH0H8LfHeBpNzfVnBL9TtVSPF1bAImcm0tI1jyw4dERfijVunXviLGQ0NahyOaSRvbn3pBth179n1w5KE-~m1m9EIevv~XgMjDrfkrMnrQXiubvnCiSV9f7u6Cu0fdjlwP7e4Bw3p~6Uy06zikJKdB4duwdWmqOr-7UFuoY3CZRjp6fXe1Xy~JKRxHdUoTfT1qYmEVQqTT5bfCDTt0eiUcKfFz7zMAJgkLVbV7CifasppgaCDG5L3O~87JjKlQ5TazDhCD5222WBw81cMlxlrVXbGMKoS56DZfvBr0oKebzPI2DXq52mjtVF4u5VBgDfEUx2pWQoRaF7iePx2nYg9WdRcc-r4idLPeoZgQLgoPu74iof~T-wXs5wWe9U6lyYQWmRf9VxIiOjEMBOoNjs657-~vWXDQlk6IS95bqRxYXvXZheFqY1uzmdSA1VgmrJX~yN-lZC2LXHJqi4PiSpMqVWjwyGUZINzlc3flUX-NwouSmp01AAAA
mush.zeit.i2p=3mYATMQg83VtUln0eEi9-LZYJ-0wCuhFu5PnDV6mYWFnLwbPf2rG22jyhOBw2h1fXcj3lJg7VnLRx-PGYQIWsIle~z1FBrOyT9ydqGrjbQYJ0bqBYUnWSR-xuHOGWiJ8lH34uqZ~j~owvhH-NAKs332BkSCl36HsjJF46i00ICS4qxJQ9l7YBtGYAvoliMN4rz0FETPsvwroyQ7JptfovHe1yviF5bfjeZ9ITP5EpYpyOVtfR7ELpvjYN3mH087TrgvJWL5Sz4qjAc122luTHDZ~Z01Ti3d0GluOF0Fh8cugcMXZTcSZLLcoo1UX8Iv~azxYoYw4MB-w9o4ftxZGawHnkzNCX9LXvQb9IHtURGu4p~eKQKT3YBDEM1qIZL-AdOeoUJ87wLke-ukLBTErYrszIiBgCr22pAHbH12ygH63UAKgzChg4eYfQ0Ku-I6PukG3QQhn029cYz8KO9LG5cT6QzeMbGIQ0cVgkbxM6028DXQaChA6Ul8lCOR56ZDPAAAA

View File

@@ -95,6 +95,12 @@ statGroup.4.detail.6.field=3
statGroup.4.detail.7.name=unknown tunnel time remaining (day)
statGroup.4.detail.7.option=stat_tunnel.unknownTunnelTimeLeft.24h
statGroup.4.detail.7.field=0
statGroup.4.detail.8.name=tunnel test time (hour)
statGroup.4.detail.8.option=stat_tunnel.testSuccessTime.60m
statGroup.4.detail.8.field=0
statGroup.4.detail.9.name=tunnel test time (day)
statGroup.4.detail.9.option=stat_tunnel.testSuccessTime.24h
statGroup.4.detail.9.field=0
#
statGroup.5.name=transfer
statGroup.5.detail.0.name=messages sent (5 minutes)
@@ -154,4 +160,7 @@ statGroup.7.detail.2.option=stat_netDb.storeSent.5m
statGroup.7.detail.2.field=3
statGroup.7.detail.3.name=db store sent (hour)
statGroup.7.detail.3.option=stat_netDb.storeSent.60m
statGroup.7.detail.3.field=3
statGroup.7.detail.4.name=failed db lookups (hour)
statGroup.7.detail.3.option=stat_netDb.failedPeers.60m
statGroup.7.detail.3.field=3

View File

@@ -3,7 +3,5 @@ title I2P Router
cd ##_scripts_installdir##
REM the -XX args are workarounds for bugs in java 1.4.2's garbage collector
REM replace java with javaw if you don't want a window to pop up
javaw -cp lib\i2p.jar;lib\router.jar;lib\mstreaming.jar;lib\heartbeat.jar;lib\i2ptunnel.jar;lib\netmonitor.jar;lib\sam.jar;lib\timestamper.jar -Djava.library.path=. -DloggerFilenameOverride=logs\log-router-#.txt -XX:NewSize=4M -XX:MaxNewSize=8M -XX:PermSize=8M -XX:MaxPermSize=32M net.i2p.router.Router
echo Router started up, please see http://localhost:7655/
java -cp lib\i2p.jar;lib\router.jar;lib\mstreaming.jar;lib\heartbeat.jar;lib\i2ptunnel.jar;lib\netmonitor.jar;lib\sam.jar;lib\timestamper.jar -Djava.library.path=. -DloggerFilenameOverride=logs\log-router-#.txt -XX:NewSize=4M -XX:MaxNewSize=8M -XX:PermSize=8M -XX:MaxPermSize=32M net.i2p.router.Router

View File

@@ -42,8 +42,6 @@ public class JobQueue {
private ArrayList _readyJobs;
/** list of jobs that are scheduled for running in the future */
private ArrayList _timedJobs;
/** when true, don't run any new jobs or update any limits, etc */
private boolean _paused;
/** job name to JobStat for that job */
private SortedMap _jobStats;
/** how many job queue runners can go concurrently */
@@ -117,7 +115,6 @@ public class JobQueue {
_readyJobs = new ArrayList();
_timedJobs = new ArrayList();
_queueRunners = new HashMap();
_paused = false;
_jobStats = Collections.synchronizedSortedMap(new TreeMap());
_allowParallelOperation = false;
_pumper = new QueuePumper();
@@ -241,9 +238,6 @@ public class JobQueue {
*/
Job getNext() {
while (_alive) {
while (_paused) {
try { Thread.sleep(30); } catch (InterruptedException ie) {}
}
Job rv = null;
int ready = 0;
synchronized (_readyJobs) {
@@ -254,10 +248,13 @@ public class JobQueue {
if (rv != null) {
// we found one, but there may be more, so wake up enough
// other runners
awaken(ready-1);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Using a ready job after waking up " + (ready-1) + " others");
_log.debug("Waking up " + (ready-1) + " job runners (and running one)");
awaken(ready-1);
return rv;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No jobs pending, waiting a second");
}
try {
@@ -266,6 +263,8 @@ public class JobQueue {
}
} catch (InterruptedException ie) {}
}
if (_log.shouldLog(Log.WARN))
_log.warn("No longer alive, returning null");
return null;
}
@@ -341,8 +340,6 @@ public class JobQueue {
}
}
//public void pauseQueue() { _paused = true; }
//public void unpauseQueue() { _paused = false; }
void removeRunner(int id) { _queueRunners.remove(new Integer(id)); }
@@ -358,23 +355,6 @@ public class JobQueue {
_runnerLock.notify();
}
}
int numRunners = 0;
synchronized (_queueRunners) {
numRunners = _queueRunners.size();
}
if (numRunners > 1) {
if (numMadeReady > numRunners) {
if (numMadeReady < _maxRunners) {
_log.info("Too much job contention (" + numMadeReady + " ready and waiting, " + numRunners + " runners exist), adding " + numMadeReady + " new runners (with max " + _maxRunners + ")");
runQueue(numMadeReady);
} else {
_log.info("Too much job contention (" + numMadeReady + " ready and waiting, " + numRunners + " runners exist), increasing to our max of " + _maxRunners + " runners");
runQueue(_maxRunners);
}
}
}
}
/**
@@ -392,10 +372,6 @@ public class JobQueue {
public void run() {
try {
while (_alive) {
while (_paused) {
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
}
// periodically update our max runners limit
long now = _context.clock().now();
if (now > _lastLimitUpdated + MAX_LIMIT_UPDATE_DELAY) {

View File

@@ -37,7 +37,11 @@ class JobQueueRunner implements Runnable {
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
try {
Job job = _context.jobQueue().getNext();
if (job == null) continue;
if (job == null) {
if (_log.shouldLog(Log.ERROR))
_log.error("getNext returned null - dead?");
continue;
}
long now = _context.clock().now();
long enqueuedTime = 0;
@@ -52,7 +56,6 @@ class JobQueueRunner implements Runnable {
}
long betweenJobs = now - lastActive;
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_currentJob = job;
_lastJob = null;
if (_log.shouldLog(Log.DEBUG))
@@ -67,6 +70,7 @@ class JobQueueRunner implements Runnable {
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
long diff = _context.clock().now() - beforeUpdate;
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_context.statManager().addRateData("jobQueue.jobRun", duration, duration);
_context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0);
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);

View File

@@ -50,6 +50,13 @@ public interface ProfileManager {
*/
void tunnelRejected(Hash peer, long responseTimeMs);
/**
* Note that a tunnel that the router is participating in
* was successfully tested with the given round trip latency
*
*/
void tunnelTestSucceeded(Hash peer, long responseTimeMs);
/**
* Note that the peer participated in a tunnel that failed. Its failure may not have
* been the peer's fault however.

View File

@@ -15,8 +15,8 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.3 $ $Date: 2004/04/30 18:04:13 $";
public final static String VERSION = "0.3.1.1";
public final static String ID = "$Revision: 1.4 $ $Date: 2004/05/07 12:52:49 $";
public final static String VERSION = "0.3.1.2";
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@@ -104,6 +104,7 @@ public class StatisticsManager implements Service {
includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 });
includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l });
includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 });
includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });

View File

@@ -48,6 +48,7 @@ public class TunnelInfo extends DataStructureImpl {
private Properties _options;
private TunnelSettings _settings;
private long _created;
private long _lastTested;
private boolean _ready;
private boolean _wasEverReady;
@@ -67,6 +68,7 @@ public class TunnelInfo extends DataStructureImpl {
_ready = false;
_wasEverReady = false;
_created = _context.clock().now();
_lastTested = -1;
}
public TunnelId getTunnelId() { return _id; }
@@ -142,6 +144,10 @@ public class TunnelInfo extends DataStructureImpl {
public long getCreated() { return _created; }
/** when was the peer last tested (or -1 if never)? */
public long getLastTested() { return _lastTested; }
public void setLastTested(long when) { _lastTested = when; }
/**
* Number of hops left in the tunnel (including this one)
*

View File

@@ -68,9 +68,9 @@ public class ClientListenerRunner implements Runnable {
_log.debug("Connection received");
runConnection(socket);
} else {
socket.close();
if (_log.shouldLog(Log.WARN))
_log.warn("Refused connection from " + socket.getInetAddress());
socket.close();
}
} catch (IOException ioe) {
_log.error("Server error accepting", ioe);

View File

@@ -4,8 +4,9 @@ import net.i2p.data.Hash;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
import net.i2p.router.RouterContext;
import java.io.File;
class PeerProfile {
public class PeerProfile {
private Log _log;
private RouterContext _context;
// whoozaat?
@@ -22,6 +23,7 @@ class PeerProfile {
private RateStat _receiveSize = null;
private RateStat _dbResponseTime = null;
private RateStat _tunnelCreateResponseTime = null;
private RateStat _tunnelTestResponseTime = null;
private RateStat _commError = null;
private RateStat _dbIntroduction = null;
// calculation bonuses
@@ -63,7 +65,7 @@ class PeerProfile {
public void setPeer(Hash peer) { _peer = peer; }
/**
* are we keeping an expanded profile on the peer, or just the bare minimum?
* are we keeping an expanded profile on the peer, or just the bare minimum.
* If we aren't keeping the expanded profile, all of the rates as well as the
* TunnelHistory and DBHistory will not be available.
*
@@ -123,7 +125,9 @@ class PeerProfile {
public RateStat getDbResponseTime() { return _dbResponseTime; }
/** how long it takes to get a tunnel create response from the peer (in milliseconds), calculated over a 1 minute, 1 hour, and 1 day period */
public RateStat getTunnelCreateResponseTime() { return _tunnelCreateResponseTime; }
/** how long between communication errors with the peer (e.g. disconnection), calculated over a 1 minute, 1 hour, and 1 day period */
/** how long it takes to successfully test a tunnel this peer participates in (in milliseconds), calculated over a 10 minute, 1 hour, and 1 day period */
public RateStat getTunnelTestResponseTime() { return _tunnelTestResponseTime; }
/** how long between communication errors with the peer (disconnection, etc), calculated over a 1 minute, 1 hour, and 1 day period */
public RateStat getCommError() { return _commError; }
/** how many new peers we get from dbSearchReplyMessages or dbStore messages, calculated over a 1 hour, 1 day, and 1 week period */
public RateStat getDbIntroduction() { return _dbIntroduction; }
@@ -160,7 +164,7 @@ class PeerProfile {
*/
public double getSpeedValue() { return _speedValue; }
/**
* How likely are they to stay up and pass on messages over the next few minutes?
* How likely are they to stay up and pass on messages over the next few minutes.
* Positive numbers means more likely, negative numbers means its probably not
* even worth trying.
*
@@ -189,6 +193,7 @@ class PeerProfile {
_receiveSize = null;
_dbResponseTime = null;
_tunnelCreateResponseTime = null;
_tunnelTestResponseTime = null;
_commError = null;
_dbIntroduction = null;
_tunnelHistory = null;
@@ -212,9 +217,11 @@ class PeerProfile {
if (_receiveSize == null)
_receiveSize = new RateStat("receiveSize", "How large received messages are", "profile", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbResponseTime == null)
_dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
_dbResponseTime = new RateStat("dbResponseTime", "how long it takes to get a db response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelCreateResponseTime == null)
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
_tunnelCreateResponseTime = new RateStat("tunnelCreateResponseTime", "how long it takes to get a tunnel create response from the peer (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_commError == null)
_commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null)
@@ -238,6 +245,7 @@ class PeerProfile {
_sendFailureSize.coallesceStats();
_sendSuccessSize.coallesceStats();
_tunnelCreateResponseTime.coallesceStats();
_tunnelTestResponseTime.coallesceStats();
_dbHistory.coallesceStats();
_speedValue = calculateSpeed();
@@ -270,7 +278,7 @@ class PeerProfile {
* for an expanded profile, and ~212 bytes for a compacted one.
*
*/
public static void main(String args[]) {
public static void main2(String args[]) {
RouterContext ctx = new RouterContext(null);
testProfileSize(ctx, 100, 0); // 560KB
testProfileSize(ctx, 1000, 0); // 3.9MB
@@ -280,6 +288,36 @@ class PeerProfile {
testProfileSize(ctx, 0, 300000); // 63MB
}
/**
* Read in all of the profiles specified and print out
* their calculated values. Usage: <pre>
* PeerProfile [filename]*
* </pre>
*/
public static void main(String args[]) {
RouterContext ctx = new RouterContext(new net.i2p.router.Router());
ProfilePersistenceHelper helper = new ProfilePersistenceHelper(ctx);
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
StringBuffer buf = new StringBuffer(1024);
for (int i = 0; i < args.length; i++) {
PeerProfile profile = helper.readProfile(new File(args[i]));
if (profile == null) {
buf.append("Could not load profile ").append(args[i]).append('\n');
continue;
}
//profile.coallesceStats();
buf.append("Peer " + profile.getPeer().toBase64()
+ ":\t Speed:\t" + profile.calculateSpeed()
+ " Reliability:\t" + profile.calculateReliability()
+ " Integration:\t" + profile.calculateIntegration()
+ " Active?\t" + profile.getIsActive()
+ " Failing?\t" + profile.calculateIsFailing()
+ '\n');
}
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
System.out.println(buf.toString());
}
private static void testProfileSize(RouterContext ctx, int numExpanded, int numCompact) {
Runtime.getRuntime().gc();
PeerProfile profs[] = new PeerProfile[numExpanded];

View File

@@ -101,6 +101,17 @@ public class ProfileManagerImpl implements ProfileManager {
data.getTunnelHistory().incrementRejected();
}
/**
* Note that a tunnel that the router is participating in
* was successfully tested with the given round trip latency
*
*/
public void tunnelTestSucceeded(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs);
}
/**
* Note that the peer participated in a tunnel that failed. Its failure may not have
* been the peer's fault however.

View File

@@ -57,6 +57,15 @@ public class ProfileOrganizer {
/** integration value, seperating well integrated from not well integrated */
private double _thresholdIntegrationValue;
/**
* Defines what percentage of the average reliability will be used as the
* reliability threshold. For example, .5 means all peers with the reliability
* greater than half of the average will be considered "reliable".
*
*/
public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor";
public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d;
/** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
private Object _reorganizeLock = new Object();
@@ -395,7 +404,7 @@ public class ProfileOrganizer {
if (profile.getReliabilityValue() > 0)
totalReliability += profile.getReliabilityValue();
}
_thresholdReliabilityValue = 0.5d * avg(totalReliability, numActive);
_thresholdReliabilityValue = getReliabilityThresholdFactor() * avg(totalReliability, numActive);
// now derive the integration and speed thresholds based ONLY on the reliable
// and active peers
@@ -620,6 +629,46 @@ public class ProfileOrganizer {
return buf.toString();
}
/**
* How much should we shrink (or grow) the average reliability to determine the
* threshold - numbers greater than 1 increase the threshold, less than 1 decrease
* it. This can be changed during runtime by updating the router.config
*
* @return factor to multiply the average reliability with to determine the threshold
*/
private double getReliabilityThresholdFactor() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_RELIABILITY_THRESHOLD_FACTOR);
if (val != null) {
try {
double rv = Double.parseDouble(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_RELIABILITY_THRESHOLD_FACTOR + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Reliability threshold factor improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_RELIABILITY_THRESHOLD_FACTOR, ""+DEFAULT_RELIABILITY_THRESHOLD_FACTOR);
if (val != null) {
try {
double rv = Double.parseDouble(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_RELIABILITY_THRESHOLD_FACTOR+ '=' + val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Reliability threshold factor improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_RELIABILITY_THRESHOLD_FACTOR + ", using " + DEFAULT_RELIABILITY_THRESHOLD_FACTOR);
return DEFAULT_RELIABILITY_THRESHOLD_FACTOR;
}
private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
}

View File

@@ -123,7 +123,9 @@ class ProfilePersistenceHelper {
profile.getDbResponseTime().store(out, "dbResponseTime");
profile.getReceiveSize().store(out, "receiveSize");
profile.getSendFailureSize().store(out, "sendFailureSize");
profile.getSendSuccessSize().store(out, "tunnelCreateResponseTime");
profile.getSendSuccessSize().store(out, "sendSuccessSize");
profile.getTunnelCreateResponseTime().store(out, "tunnelCreateResponseTime");
profile.getTunnelTestResponseTime().store(out, "tunnelTestResponseTime");
}
}
@@ -154,10 +156,13 @@ class ProfilePersistenceHelper {
rv.add(files[i]);
return rv;
}
private PeerProfile readProfile(File file) {
public PeerProfile readProfile(File file) {
Hash peer = getHash(file.getName());
try {
if (peer == null) return null;
if (peer == null) {
_log.error("The file " + file.getName() + " is not a valid hash");
return null;
}
PeerProfile profile = new PeerProfile(_context, peer);
Properties props = new Properties();
@@ -181,7 +186,9 @@ class ProfilePersistenceHelper {
profile.getDbResponseTime().load(props, "dbResponseTime", true);
profile.getReceiveSize().load(props, "receiveSize", true);
profile.getSendFailureSize().load(props, "sendFailureSize", true);
profile.getSendSuccessSize().load(props, "tunnelCreateResponseTime", true);
profile.getSendSuccessSize().load(props, "sendSuccessSize", true);
profile.getTunnelCreateResponseTime().load(props, "tunnelCreateResponseTime", true);
profile.getTunnelTestResponseTime().load(props, "tunnelTestResponseTime", true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Loaded the profile for " + peer.toBase64() + " from " + file.getName());
@@ -222,7 +229,7 @@ class ProfilePersistenceHelper {
props.setProperty(key, val);
}
} catch (IOException ioe) {
_log.error("Error loading properties from " + file.getName(), ioe);
_log.warn("Error loading properties from " + file.getName(), ioe);
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
@@ -237,6 +244,7 @@ class ProfilePersistenceHelper {
h.fromBase64(key);
return h;
} catch (DataFormatException dfe) {
_log.warn("Invalid base64 [" + key + "]", dfe);
return null;
}
}
@@ -247,10 +255,16 @@ class ProfilePersistenceHelper {
private File getProfileDir() {
if (_profileDir == null) {
String dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR);
if (dir == null) {
_log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR + "], using [" + DEFAULT_PEER_PROFILE_DIR + "]");
dir = DEFAULT_PEER_PROFILE_DIR;
String dir = null;
if (_context.router() == null) {
dir = _context.getProperty(PROP_PEER_PROFILE_DIR, DEFAULT_PEER_PROFILE_DIR);
} else {
dir = _context.router().getConfigSetting(PROP_PEER_PROFILE_DIR);
if (dir == null) {
_log.info("No peer profile dir specified [" + PROP_PEER_PROFILE_DIR
+ "], using [" + DEFAULT_PEER_PROFILE_DIR + "]");
dir = DEFAULT_PEER_PROFILE_DIR;
}
}
_profileDir = new File(dir);
}

View File

@@ -29,8 +29,7 @@ public class ReliabilityCalculator extends Calculator {
val += profile.getSendSuccessSize().getRate(60*60*1000).getLastEventCount();
val += profile.getSendSuccessSize().getRate(60*60*1000).getCurrentEventCount();
val += profile.getTunnelCreateResponseTime().getRate(60*1000).getCurrentEventCount() * 10;
val += profile.getTunnelCreateResponseTime().getRate(60*1000).getLastEventCount() * 5;
val += profile.getTunnelCreateResponseTime().getRate(10*60*1000).getLastEventCount() * 5;
val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getCurrentEventCount();
val += profile.getTunnelCreateResponseTime().getRate(60*60*1000).getLastEventCount();

View File

@@ -8,60 +8,318 @@ import net.i2p.router.RouterContext;
/**
* Quantify how fast the peer is - how fast they respond to our requests, how fast
* they pass messages on, etc. This should be affected both by their bandwidth/latency,
* as well as their load.
* as well as their load. The essence of the current algorithm is to determine
* approximately how many 2KB messages the peer can pass round trip within a single
* minute - not based just on itself though, but including the delays of other peers
* in the tunnels. As such, more events make it more accurate.
*
*/
public class SpeedCalculator extends Calculator {
private Log _log;
private RouterContext _context;
/**
* minimum number of events to use a particular period's data. If this many
* events haven't occurred in the period yet, the next largest period is tried.
*/
public static final String PROP_EVENT_THRESHOLD = "speedCalculator.eventThreshold";
public static final int DEFAULT_EVENT_THRESHOLD = 50;
/** should the calculator use instantaneous rates, or period averages? */
public static final String PROP_USE_INSTANTANEOUS_RATES = "speedCalculator.useInstantaneousRates";
public static final boolean DEFAULT_USE_INSTANTANEOUS_RATES = false;
/** should the calculator use tunnel test time only, or include all data? */
public static final String PROP_USE_TUNNEL_TEST_ONLY = "speedCalculator.useTunnelTestOnly";
public static final boolean DEFAULT_USE_TUNNEL_TEST_ONLY = false;
public SpeedCalculator(RouterContext context) {
_context = context;
_log = context.logManager().getLog(SpeedCalculator.class);
}
public double calc(PeerProfile profile) {
double dbResponseTime = profile.getDbResponseTime().getRate(60*1000).getLifetimeAverageValue();
double tunnelResponseTime = profile.getTunnelCreateResponseTime().getRate(60*1000).getLifetimeAverageValue();
double roundTripRate = Math.max(dbResponseTime, tunnelResponseTime);
long threshold = getEventThreshold();
boolean tunnelTestOnly = getUseTunnelTestOnly();
// send and receive rates are the (period rate) * (saturation %)
double sendRate = calcSendRate(profile);
double receiveRate = calcReceiveRate(profile);
long period = 10*60*1000;
long events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = 60*60*1000l;
events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = 24*60*60*1000;
events = getEventCount(profile, period, tunnelTestOnly);
if (events < threshold) {
period = -1;
events = getEventCount(profile, period, tunnelTestOnly);
}
}
}
double measuredRoundTripTime = getMeasuredRoundTripTime(profile, period, tunnelTestOnly);
double measuredRTPerMinute = 0;
if (measuredRoundTripTime > 0)
measuredRTPerMinute = (60000.0d / measuredRoundTripTime);
double val = 60000.0d - 0.1*roundTripRate + sendRate + receiveRate;
// if we don't have any data, the rate is 0
if ( (roundTripRate == 0.0d) && (sendRate == 0.0d) )
val = 0.0;
double estimatedRTPerMinute = 0;
double estimatedRoundTripTime = 0;
if (!tunnelTestOnly) {
estimatedRoundTripTime = getEstimatedRoundTripTime(profile, period);
if (estimatedRoundTripTime > 0)
estimatedRTPerMinute = (60000.0d / estimatedRoundTripTime);
}
double estimateFactor = getEstimateFactor(threshold, events);
double rv = (1-estimateFactor)*measuredRTPerMinute + (estimateFactor)*estimatedRTPerMinute;
if (_log.shouldLog(Log.DEBUG))
_log.debug("roundTripRate: " + roundTripRate + "ms sendRate: " + sendRate + "bytes/second, receiveRate: " + receiveRate + "bytes/second, val: " + val + " for " + profile.getPeer().toBase64());
val += profile.getSpeedBonus();
return val;
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("\n\nrv: " + rv + " events: " + events + " threshold: " + threshold + " period: " + period + " useTunnelTestOnly? " + tunnelTestOnly + "\n"
+ "measuredRTT: " + measuredRoundTripTime + " measured events per minute: " + measuredRTPerMinute + "\n"
+ "estimateRTT: " + estimatedRoundTripTime + " estimated events per minute: " + estimatedRTPerMinute + "\n"
+ "estimateFactor: " + estimateFactor + "\n"
+ "for peer: " + profile.getPeer().toBase64());
}
rv += profile.getSpeedBonus();
return rv;
}
/**
* How much do we want to prefer the measured values more than the estimated
* values, as a fraction. The value 1 means ignore the measured values, while
* the value 0 means ignore the estimate, and everything inbetween means, well
* everything inbetween.
*
*/
private double getEstimateFactor(long eventThreshold, long numEvents) {
if (numEvents > eventThreshold)
return 0.0d;
else
return numEvents / eventThreshold;
}
private double calcSendRate(PeerProfile profile) { return calcRate(profile.getSendSuccessSize()); }
private double calcReceiveRate(PeerProfile profile) { return calcRate(profile.getReceiveSize()); }
private double calcRate(RateStat stat) {
double rate = 0.0d;
Rate hourRate = stat.getRate(60*60*1000);
rate = calcRate(hourRate);
return rate;
/**
* How many measured events do we have for the given period? If the period is negative,
* return the lifetime events.
*
*/
private long getEventCount(PeerProfile profile, long period, boolean tunnelTestOnly) {
if (period < 0) {
Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount();
long tunnelTests = tunnelTestRate.getLifetimeEventCount();
return dbResponses + tunnelResponses + tunnelTests;
} else {
Rate dbResponseRate = profile.getDbResponseTime().getRate(period);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount() + dbResponseRate.getLastEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount() + tunnelResponseRate.getLastEventCount();
long tunnelTests = tunnelTestRate.getCurrentEventCount() + tunnelTestRate.getLastEventCount();
if (_log.shouldLog(Log.DEBUG))
_log.debug("TunnelTests for period " + period + ": " + tunnelTests +
" last: " + tunnelTestRate.getLastEventCount() + " lifetime: " +
tunnelTestRate.getLifetimeEventCount());
return dbResponses + tunnelResponses + tunnelTests;
}
}
private double calcRate(Rate rate) {
long events = rate.getLastEventCount() + rate.getCurrentEventCount();
/**
* Retrieve the average measured round trip time within the period specified (including
* db responses, tunnel create responses, and tunnel tests). If the period is negative,
* it uses the lifetime stats. In addition, it weights each of those three measurements
* equally according to their event count (e.g. 4 dbResponses @ 10 seconds and 1 tunnel test
* at 5 seconds will leave the average at 9 seconds)
*
*/
private double getMeasuredRoundTripTime(PeerProfile profile, long period, boolean tunnelTestOnly) {
if (period < 0) {
Rate dbResponseRate = profile.getDbResponseTime().getRate(60*60*1000l);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(60*60*1000l);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(60*60*1000l);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeEventCount();
long tunnelTests = tunnelTestRate.getLifetimeEventCount();
double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getLifetimeAverageValue();
double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getLifetimeAverageValue();
double tunnelTestTime = tunnelTestRate.getLifetimeAverageValue();
long events = dbResponses + tunnelResponses + tunnelTests;
if (events <= 0) return 0;
return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime)
/ events;
} else {
Rate dbResponseRate = profile.getDbResponseTime().getRate(period);
Rate tunnelResponseRate = profile.getTunnelCreateResponseTime().getRate(period);
Rate tunnelTestRate = profile.getTunnelTestResponseTime().getRate(period);
long dbResponses = tunnelTestOnly ? 0 : dbResponseRate.getCurrentEventCount() + dbResponseRate.getLastEventCount();
long tunnelResponses = tunnelTestOnly ? 0 : tunnelResponseRate.getCurrentEventCount() + tunnelResponseRate.getLastEventCount();
long tunnelTests = tunnelTestRate.getCurrentEventCount() + tunnelTestRate.getLastEventCount();
double dbResponseTime = tunnelTestOnly ? 0 : dbResponseRate.getAverageValue();
double tunnelResponseTime = tunnelTestOnly ? 0 : tunnelResponseRate.getAverageValue();
double tunnelTestTime = tunnelTestRate.getAverageValue();
long events = dbResponses + tunnelResponses + tunnelTests;
if (events <= 0) return 0;
return (dbResponses*dbResponseTime + tunnelResponses*tunnelResponseTime + tunnelTests*tunnelTestTime)
/ events;
}
}
private double getEstimatedRoundTripTime(PeerProfile profile, long period) {
double estSendTime = getEstimatedSendTime(profile, period);
double estRecvTime = getEstimatedReceiveTime(profile, period);
return estSendTime + estRecvTime;
}
private double getEstimatedSendTime(PeerProfile profile, long period) {
double bps = calcRate(profile.getSendSuccessSize(), period);
if (bps <= 0)
return 0.0d;
else
return 2048.0d / bps;
}
private double getEstimatedReceiveTime(PeerProfile profile, long period) {
double bps = calcRate(profile.getReceiveSize(), period);
if (bps <= 0)
return 0.0d;
else
return 2048.0d / bps;
}
private double calcRate(RateStat stat, long period) {
Rate rate = stat.getRate(period);
if (rate == null) return 0.0d;
return calcRate(rate, period);
}
private double calcRate(Rate rate, long period) {
long events = rate.getCurrentEventCount();
if (events >= 1) {
double ms = rate.getLastTotalEventTime() + rate.getCurrentTotalEventTime();
double bytes = rate.getLastTotalValue() + rate.getCurrentTotalValue();
double ms = rate.getCurrentTotalEventTime();
double bytes = rate.getCurrentTotalValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("calculating rate: ms=" + ((int)ms) + " bytes=" + ((int)bytes));
if ( (bytes > 0) && (ms > 0) ) {
return (bytes * 1000.0d) / ms;
if (getUseInstantaneousRates()) {
return (bytes * 1000.0d) / ms;
} else {
// period average
return (bytes * 1000.0d) / period;
}
}
}
return 0.0d;
}
/**
* What is the minimum number of measured events we want in a period before
* trusting the values? This first checks the router's configuration, then
* the context, and then finally falls back on a static default (100).
*
*/
private long getEventThreshold() {
if (_context.router() != null) {
String threshold = _context.router().getConfigSetting(PROP_EVENT_THRESHOLD);
if (threshold != null) {
try {
return Long.parseLong(threshold);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Event threshold for speed improperly set in the router config [" + threshold + "]", nfe);
}
}
}
String threshold = _context.getProperty(PROP_EVENT_THRESHOLD, ""+DEFAULT_EVENT_THRESHOLD);
if (threshold != null) {
try {
return Long.parseLong(threshold);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Event threshold for speed improperly set in the router environment [" + threshold + "]", nfe);
}
}
return DEFAULT_EVENT_THRESHOLD;
}
/**
* Should we use instantaneous rates for the estimated speed, or the period rates?
* This first checks the router's configuration, then the context, and then
* finally falls back on a static default (true).
*
* @return true if we should use instantaneous rates, false if we should use period averages
*/
private boolean getUseInstantaneousRates() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_USE_INSTANTANEOUS_RATES);
if (val != null) {
try {
return Boolean.getBoolean(val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Instantaneous rate for speed improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_USE_INSTANTANEOUS_RATES, ""+DEFAULT_USE_INSTANTANEOUS_RATES);
if (val != null) {
try {
return Boolean.getBoolean(val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Instantaneous rate for speed improperly set in the router environment [" + val + "]", nfe);
}
}
return DEFAULT_USE_INSTANTANEOUS_RATES;
}
/**
* Should we only use the measured tunnel testing time, or should we include
* measurements on the db responses and tunnel create responses. This first
* checks the router's configuration, then the context, and then finally falls
* back on a static default (true).
*
* @return true if we should use tunnel test time only, false if we should use all available
*/
private boolean getUseTunnelTestOnly() {
if (_context.router() != null) {
String val = _context.router().getConfigSetting(PROP_USE_TUNNEL_TEST_ONLY);
if (val != null) {
try {
boolean rv = Boolean.getBoolean(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router config said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val);
return rv;
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel test only for speed improperly set in the router config [" + val + "]", nfe);
}
}
}
String val = _context.getProperty(PROP_USE_TUNNEL_TEST_ONLY, ""+DEFAULT_USE_TUNNEL_TEST_ONLY);
if (val != null) {
try {
boolean rv = Boolean.getBoolean(val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("router context said " + PROP_USE_TUNNEL_TEST_ONLY + '=' + val);
} catch (NumberFormatException nfe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel test only for speed improperly set in the router environment [" + val + "]", nfe);
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("no config for " + PROP_USE_TUNNEL_TEST_ONLY + ", using " + DEFAULT_USE_TUNNEL_TEST_ONLY);
return DEFAULT_USE_TUNNEL_TEST_ONLY;
}
}

View File

@@ -37,26 +37,33 @@ import net.i2p.router.RouterContext;
class TestTunnelJob extends JobImpl {
private Log _log;
private TunnelId _id;
/** tunnel that we want to test */
private TunnelId _primaryId;
/** tunnel that is used to help test the primary id */
private TunnelId _secondaryId;
private TunnelPool _pool;
private long _nonce;
public TestTunnelJob(RouterContext ctx, TunnelId id, TunnelPool pool) {
super(ctx);
_log = ctx.logManager().getLog(TestTunnelJob.class);
_id = id;
_primaryId = id;
_pool = pool;
_nonce = ctx.random().nextInt(Integer.MAX_VALUE);
}
public String getName() { return "Test Tunnel"; }
public void runJob() {
if (_log.shouldLog(Log.INFO))
_log.info("Testing tunnel " + _id.getTunnelId());
TunnelInfo info = _pool.getTunnelInfo(_id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing tunnel " + _primaryId.getTunnelId());
TunnelInfo info = _pool.getTunnelInfo(_primaryId);
if (info == null) {
_log.error("wtf, why are we testing a tunnel that we do not know about? [" + _id.getTunnelId() + "]", getAddedBy());
_log.error("wtf, why are we testing a tunnel that we do not know about? ["
+ _primaryId.getTunnelId() + "]", getAddedBy());
return;
}
// mark it as something we're testing
info.setLastTested(_context.clock().now());
if (isOutbound(info)) {
testOutbound(info);
} else {
@@ -75,7 +82,7 @@ class TestTunnelJob extends JobImpl {
return false;
}
private final static long TEST_TIMEOUT = 60*1000; // 60 seconds for a test to succeed
private final static long TEST_TIMEOUT = 30*1000; // 30 seconds for a test to succeed
private final static int TEST_PRIORITY = 100;
/**
@@ -83,22 +90,51 @@ class TestTunnelJob extends JobImpl {
* to ourselves and wait for it to arrive.
*/
private void testOutbound(TunnelInfo info) {
if (_log.shouldLog(Log.INFO))
_log.info("Testing outbound tunnel " + info);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing outbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
Hash us = _context.routerHash();
TunnelId inboundTunnelId = getReplyTunnel();
if (inboundTunnelId == null) {
_secondaryId = getReplyTunnel();
if (_secondaryId == null) {
_context.jobQueue().addJob(new TestFailedJob());
return;
}
TunnelInfo inboundInfo = _pool.getTunnelInfo(_secondaryId);
inboundInfo.setLastTested(_context.clock().now());
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, inboundTunnelId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
SendTunnelMessageJob testJob = new SendTunnelMessageJob(_context, msg, info.getTunnelId(), us, _secondaryId, null, new TestSuccessfulJob(), failureJob, selector, TEST_TIMEOUT, TEST_PRIORITY);
_context.jobQueue().addJob(testJob);
}
/**
* Send a message to the gateway and wait for it to arrive.
*/
private void testInbound(TunnelInfo info) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Testing inbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
_secondaryId = getOutboundTunnel();
if (_secondaryId == null) {
_context.jobQueue().addJob(new TestFailedJob());
return;
}
TunnelInfo outboundInfo = _pool.getTunnelInfo(_secondaryId);
outboundInfo.setLastTested(_context.clock().now());
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, msg, _secondaryId, info.getThisHop(), info.getTunnelId(), null, new TestSuccessfulJob(), failureJob, selector, _context.clock().now()+TEST_TIMEOUT, TEST_PRIORITY);
_context.jobQueue().addJob(j);
}
/**
* Get the tunnel for replies to be sent down when testing outbound tunnels
@@ -116,7 +152,7 @@ class TestTunnelJob extends JobImpl {
for (int i = 0; i < tunnelIds.size(); i++) {
TunnelId id = (TunnelId)tunnelIds.get(i);
if (id.equals(_id)) {
if (id.equals(_primaryId)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not testing a tunnel with itself [duh]");
} else {
@@ -124,39 +160,36 @@ class TestTunnelJob extends JobImpl {
}
}
_log.error("Unable to test tunnel " + _id + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through");
_log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER INBOUND TUNNELS to receive the ack through");
return null;
}
/**
* Send a message to the gateway and wait for it to arrive.
* todo: send the message to the gateway via an outbound tunnel or garlic, NOT DIRECT.
* Get the tunnel to send thte message out when testing inbound tunnels
*
*/
private void testInbound(TunnelInfo info) {
if (_log.shouldLog(Log.INFO))
_log.info("Testing inbound tunnel " + info);
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_nonce);
TestFailedJob failureJob = new TestFailedJob();
MessageSelector selector = new TestMessageSelector(msg.getMessageId(), info.getTunnelId().getTunnelId());
TunnelMessage tmsg = new TunnelMessage(_context);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
msg.writeBytes(baos);
tmsg.setData(baos.toByteArray());
tmsg.setTunnelId(info.getTunnelId());
_context.jobQueue().addJob(new SendMessageDirectJob(_context, tmsg, info.getThisHop(), new TestSuccessfulJob(), failureJob, selector, _context.clock().now() + TEST_TIMEOUT, TEST_PRIORITY));
String bodyType = msg.getClass().getName();
_context.messageHistory().wrap(bodyType, msg.getUniqueId(), TunnelMessage.class.getName(), tmsg.getUniqueId());
} catch (IOException ioe) {
_log.error("Error writing out the tunnel message to send to the tunnel", ioe);
_pool.tunnelFailed(_id);
} catch (DataFormatException dfe) {
_log.error("Error writing out the tunnel message to send to the tunnel", dfe);
_pool.tunnelFailed(_id);
private TunnelId getOutboundTunnel() {
TunnelSelectionCriteria crit = new TunnelSelectionCriteria();
crit.setMinimumTunnelsRequired(2);
crit.setMaximumTunnelsRequired(2);
// arbitrary priorities
crit.setAnonymityPriority(50);
crit.setLatencyPriority(50);
crit.setReliabilityPriority(50);
List tunnelIds = _context.tunnelManager().selectOutboundTunnelIds(crit);
for (int i = 0; i < tunnelIds.size(); i++) {
TunnelId id = (TunnelId)tunnelIds.get(i);
if (id.equals(_primaryId)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not testing a tunnel with itself [duh]");
} else {
return id;
}
}
_log.error("Unable to test tunnel " + _primaryId + ", since there are NO OTHER OUTBOUND TUNNELS to send the ack through");
return null;
}
private class TestFailedJob extends JobImpl {
@@ -167,8 +200,17 @@ class TestTunnelJob extends JobImpl {
public String getName() { return "Tunnel Test Failed"; }
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("Test of tunnel " + _id.getTunnelId() + " failed while waiting for nonce " + _nonce, getAddedBy());
_pool.tunnelFailed(_id);
_log.warn("Test of tunnel " + _primaryId.getTunnelId()
+ " failed while waiting for nonce " + _nonce + ": "
+ _pool.getTunnelInfo(_primaryId), getAddedBy());
_pool.tunnelFailed(_primaryId);
if (_secondaryId != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Secondary test of tunnel " + _secondaryId.getTunnelId()
+ " failed while waiting for nonce " + _nonce + ": "
+ _pool.getTunnelInfo(_secondaryId), getAddedBy());
_pool.tunnelFailed(_secondaryId);
}
}
}
@@ -183,10 +225,30 @@ class TestTunnelJob extends JobImpl {
public void runJob() {
long time = (_context.clock().now() - _msg.getArrival().getTime());
if (_log.shouldLog(Log.INFO))
_log.info("Test of tunnel " + _id+ " successfull after " + time + "ms waiting for " + _nonce);
TunnelInfo info = _pool.getTunnelInfo(_id);
if (info != null)
_log.info("Test of tunnel " + _primaryId+ " successfull after "
+ time + "ms waiting for " + _nonce);
TunnelInfo info = _pool.getTunnelInfo(_primaryId);
if (info != null) {
TestTunnelJob.this._context.messageHistory().tunnelValid(info, time);
updateProfiles(info, time);
}
info = _pool.getTunnelInfo(_secondaryId);
if (info != null) {
TestTunnelJob.this._context.messageHistory().tunnelValid(info, time);
updateProfiles(info, time);
}
_context.statManager().addRateData("tunnel.testSuccessTime", time, time);
}
private void updateProfiles(TunnelInfo info, long time) {
TunnelInfo cur = info;
while (cur != null) {
Hash peer = cur.getThisHop();
if ( (peer != null) && (!_context.routerHash().equals(peer)) )
_context.profileManager().tunnelTestSucceeded(peer, time);
cur = cur.getNextHopInfo();
}
}
public void setMessage(I2NPMessage message) {
@@ -205,15 +267,16 @@ class TestTunnelJob extends JobImpl {
_found = false;
_expiration = _context.clock().now() + TEST_TIMEOUT;
if (_log.shouldLog(Log.DEBUG))
_log.debug("the expiration while testing tunnel " + tunnelId + " waiting for nonce " + id + ": " + new Date(_expiration));
_log.debug("the expiration while testing tunnel " + tunnelId
+ " waiting for nonce " + id + ": " + new Date(_expiration));
}
public boolean continueMatching() {
if (!_found) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Continue matching while looking for nonce for tunnel " + _tunnelId);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Don't continue matching for tunnel " + _tunnelId + " / " + _id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Don't continue matching for tunnel " + _tunnelId + " / " + _id);
}
return !_found;
}
@@ -229,12 +292,15 @@ class TestTunnelJob extends JobImpl {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (msg.getMessageId() == _id) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found successful test of tunnel " + _tunnelId + " after " + (_context.clock().now() - msg.getArrival().getTime()) + "ms waiting for " + _id);
_log.debug("Found successful test of tunnel " + _tunnelId + " after "
+ (_context.clock().now() - msg.getArrival().getTime())
+ "ms waiting for " + _id);
_found = true;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Found a delivery status message, but it contains nonce " + msg.getMessageId() + " and not " + _id);
_log.debug("Found a delivery status message, but it contains nonce "
+ msg.getMessageId() + " and not " + _id);
}
} else {
//_log.debug("Not a match while looking to test tunnel " + _tunnelId + " with nonce " + _id + " (" + message + ")");
@@ -244,7 +310,8 @@ class TestTunnelJob extends JobImpl {
public String toString() {
StringBuffer buf = new StringBuffer(256);
buf.append(super.toString());
buf.append(": TestMessageSelector: tunnel ").append(_tunnelId).append(" looking for ").append(_id).append(" expiring on ");
buf.append(": TestMessageSelector: tunnel ").append(_tunnelId);
buf.append(" looking for ").append(_id).append(" expiring on ");
buf.append(new Date(_expiration));
return buf.toString();
}

View File

@@ -32,16 +32,13 @@ class TunnelTestManager {
private TunnelPool _pool;
private boolean _stopTesting;
private final static long MINIMUM_RETEST_DELAY = 60*1000; // dont test tunnels more than once every 30 seconds
/** avg # tests per tunnel lifetime that we want */
private final static int TESTS_PER_DURATION = 2;
/** how many times we'll be able to try the tests (this should take into consideration user prefs, but fsck it for now) */
private final static int CHANCES_PER_DURATION = 8;
/** dont test any particular tunnel more than once a minute */
private final static long MINIMUM_RETEST_DELAY = 60*1000;
public TunnelTestManager(RouterContext ctx, TunnelPool pool) {
_context = ctx;
_log = ctx.logManager().getLog(TunnelTestManager.class);
ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long do successful tunnel tests take?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
_pool = pool;
_stopTesting = false;
_context.jobQueue().addJob(new CoordinateTunnelTestingJob());
@@ -61,18 +58,28 @@ class TunnelTestManager {
// skip not ready tunnels
} else if (info.getSettings().getExpiration() < now + MINIMUM_RETEST_DELAY) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " will be expiring within the current period (" + new Date(info.getSettings().getExpiration()) + "), so skip testing it");
_log.debug("Tunnel " + id.getTunnelId()
+ " will be expiring within the current period ("
+ new Date(info.getSettings().getExpiration())
+ "), so skip testing it");
} else if (info.getSettings().getCreated() + MINIMUM_RETEST_DELAY < now) {
double probability = TESTS_PER_DURATION / (allIds.size() * CHANCES_PER_DURATION);
if (_context.random().nextInt(10) <= (probability*10d)) {
toTest.add(id);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " could be tested, but probabilistically isn't going to be");
// we're past the initial buffer period
if (info.getLastTested() + MINIMUM_RETEST_DELAY < now) {
// we haven't tested this tunnel in the minimum delay, so maybe we
// should.
if (_context.random().nextBoolean()) {
toTest.add(id);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("We could have tested tunnel " + id.getTunnelId()
+ ", but randomly decided not to.");
}
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel " + id.getTunnelId() + " was just created (" + new Date(info.getSettings().getCreated()) + "), wait until the next pass to test it");
_log.debug("Tunnel " + id.getTunnelId() + " was just created ("
+ new Date(info.getSettings().getCreated())
+ "), wait until the next pass to test it");
}
} else {
if (_log.shouldLog(Log.WARN))
@@ -112,11 +119,8 @@ class TunnelTestManager {
}
private void reschedule() {
long minNext = TunnelTestManager.this._context.clock().now() + MINIMUM_RETEST_DELAY;
long nxt = minNext + TunnelTestManager.this._context.random().nextInt(60*1000); // test tunnels once every 30-90 seconds
long nxt = TunnelTestManager.this._context.clock().now() + 30*1000;
getTiming().setStartAfter(nxt);
if (_log.shouldLog(Log.INFO))
_log.info("Rescheduling tunnel tests for " + new Date(nxt));
TunnelTestManager.this._context.jobQueue().addJob(CoordinateTunnelTestingJob.this);
}
}