i2psnark: Update UDP tracker (Proposal 160) (#504)

from three years ago,
to match current version of Proposal 160.
Revert back to standard BEP 15 from "fast mode".
Add TrackerClient and I2PSnarkUtil changes.
Datagram2/3 hooked in.
Variable lifetime handled.

Co-authored-by: zzz <zzz@i2pmail.org>
Reviewed-on: I2P_Developers/i2p.i2p#504
Reviewed-by: idk <idki2p@mail.i2p>
This commit is contained in:
zzz
2025-06-21 06:47:28 -04:00
parent 5bb3cc488c
commit d53949f970
6 changed files with 514 additions and 98 deletions

View File

@ -86,6 +86,8 @@ public class I2PSnarkUtil implements DisconnectListener {
private boolean _areFilesPublic;
private List<String> _openTrackers;
private DHT _dht;
private boolean _enableUDP = ENABLE_UDP_TRACKER;
private UDPTrackerClient _udpTracker;
private long _startedTime;
private final DisconnectListener _discon;
private int _maxFilesPerTorrent = SnarkManager.DEFAULT_MAX_FILES_PER_TORRENT;
@ -99,6 +101,7 @@ public class I2PSnarkUtil implements DisconnectListener {
public static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond";
public static final boolean DEFAULT_USE_DHT = true;
public static final String EEPGET_USER_AGENT = "I2PSnark";
private static final boolean ENABLE_UDP_TRACKER = true;
private static final List<String> HIDDEN_I2CP_OPTS = Arrays.asList(new String[] {
PROP_MAX_BW, "inbound.length", "outbound.length", "inbound.quantity", "outbound.quantity"
});
@ -355,6 +358,11 @@ public class I2PSnarkUtil implements DisconnectListener {
}
if (_shouldUseDHT && _manager != null && _dht == null)
_dht = new KRPC(_context, _baseName, _manager.getSession());
if (_enableUDP &&_manager != null) {
if (_udpTracker == null)
_udpTracker = new UDPTrackerClient(_context, _manager.getSession(), this);
_udpTracker.start();
}
return (_manager != null);
}
@ -381,6 +389,12 @@ public class I2PSnarkUtil implements DisconnectListener {
*/
public DHT getDHT() { return _dht; }
/**
* @return null if disabled or not started
* @since 0.9.14
*/
public UDPTrackerClient getUDPTrackerClient() { return _udpTracker; }
public boolean connected() { return _manager != null; }
/** @since 0.9.1 */
@ -403,6 +417,10 @@ public class I2PSnarkUtil implements DisconnectListener {
_dht.stop();
_dht = null;
}
if (_udpTracker != null) {
_udpTracker.stop();
_udpTracker = null;
}
_startedTime = 0;
I2PSocketManager mgr = _manager;
// FIXME this can cause race NPEs elsewhere
@ -753,6 +771,16 @@ public class I2PSnarkUtil implements DisconnectListener {
return _shouldUseDHT;
}
/** @since 0.9.67 */
public void setUDPEnabled(boolean yes) {
_enableUDP = yes;
}
/** @since 0.9.67 */
public boolean udpEnabled() {
return _enableUDP;
}
/** @since 0.9.31 */
public void setRatingsEnabled(boolean yes) {
_enableRatings = yes;

View File

@ -3081,6 +3081,13 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
*/
public List<Tracker> getSortedTrackers() {
List<Tracker> rv = new ArrayList<Tracker>(_trackerMap.values());
if (!_util.udpEnabled()) {
for (Iterator<Tracker> iter = rv.iterator(); iter.hasNext(); ) {
Tracker tr = iter.next();
if (tr.announceURL.startsWith("udp://"))
iter.remove();
}
}
Collections.sort(rv, new IgnoreCaseComparator());
return rv;
}

View File

@ -28,6 +28,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
@ -89,6 +90,7 @@ public class TrackerClient implements Runnable {
/** No guidance in BEP 5; standard practice is K (=8) */
private static final int DHT_ANNOUNCE_PEERS = 4;
public static final int PORT = 6881;
private static final int DEFAULT_UDP_TRACKER_PORT = 6969;
private static final int MAX_TRACKERS = 12;
// tracker.welterde.i2p
private static final Hash DSA_ONLY_TRACKER = ConvertToHash.getHash("cfmqlafjfmgkzbt4r3jsfyhgsr5abgxryl6fnz3d3y5a365di5aa.b32.i2p");
@ -291,7 +293,6 @@ public class TrackerClient implements Runnable {
// followed by the secondary open trackers
// It's painful, but try to make sure if an open tracker is also
// the primary tracker, that we don't add it twice.
// todo: check for b32 matches as well
String primary = null;
if (meta != null)
primary = meta.getAnnounce();
@ -316,37 +317,31 @@ public class TrackerClient implements Runnable {
// announce list
// We completely ignore the BEP 12 processing rules
if (meta != null && !meta.isPrivate()) {
List<String> urls = new ArrayList<String>(16);
List<List<String>> list = meta.getAnnounceList();
if (list != null) {
for (List<String> llist : list) {
for (String url : llist) {
if (!isNewValidTracker(trackerHashes, url))
continue;
trackers.add(new TCTracker(url, trackers.isEmpty()));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Additional announce (list): [" + url + "] for infoHash: " + infoHash);
urls.add(url);
}
}
if (trackers.size() > 2) {
// shuffle everything but the primary
TCTracker pri = trackers.remove(0);
Collections.shuffle(trackers, _util.getContext().random());
trackers.add(0, pri);
}
// configured open trackers
urls.addAll(_util.getOpenTrackers());
if (urls.size() > 1) {
Collections.shuffle(trackers, _util.getContext().random());
if (_util.udpEnabled()) {
// sort the list to put udp first so it will trump http
Collections.sort(urls, new URLComparator());
}
}
}
// configured open trackers
if (meta == null || !meta.isPrivate()) {
List<String> tlist = _util.getOpenTrackers();
for (int i = 0; i < tlist.size(); i++) {
String url = tlist.get(i);
for (String url : urls) {
if (!isNewValidTracker(trackerHashes, url))
continue;
// opentrackers are primary if we don't have primary
trackers.add(new TCTracker(url, trackers.isEmpty()));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash);
// first one is primary if we don't have a primary
trackers.add(new TCTracker(url, trackers.isEmpty()));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash);
}
}
@ -526,18 +521,30 @@ public class TrackerClient implements Runnable {
if (len > 0 && downloaded > len)
downloaded = len;
left = coordinator.getLeft();
String event;
if (!tr.started) {
event = STARTED_EVENT;
} else if (newlyCompleted) {
event = COMPLETED_EVENT;
TrackerInfo info;
if (tr.isUDP) {
int event;
if (!tr.started) {
event = UDPTrackerClient.EVENT_STARTED;
} else if (newlyCompleted) {
event = UDPTrackerClient.EVENT_COMPLETED;
} else {
event = UDPTrackerClient.EVENT_NONE;
}
info = doRequest(tr, uploaded, downloaded, left, event);
} else {
event = NO_EVENT;
String event;
if (!tr.started) {
event = STARTED_EVENT;
} else if (newlyCompleted) {
event = COMPLETED_EVENT;
} else {
event = NO_EVENT;
}
info = doRequest(tr, infoHash, peerID,
uploaded, downloaded, left,
event);
}
TrackerInfo info = doRequest(tr, infoHash, peerID,
uploaded, downloaded, left,
event);
snark.setTrackerProblems(null);
tr.trackerProblems = null;
tr.registerFails = 0;
@ -839,21 +846,27 @@ public class TrackerClient implements Runnable {
if (len > 0 && downloaded > len)
downloaded = len;
long left = coordinator.getLeft();
try
{
try {
// Don't try to restart I2CP connection just to say goodbye
if (_util.connected()) {
if (tr.started && (!tr.stop) && tr.trackerProblems == null)
doRequest(tr, infoHash, peerID, uploaded,
downloaded, left, STOPPED_EVENT);
}
}
if (_util.connected()) {
if (tr.started && (!tr.stop) && tr.trackerProblems == null) {
if (tr.isUDP) {
doRequest(tr, uploaded,
downloaded, left, UDPTrackerClient.EVENT_STOPPED);
} else {
doRequest(tr, infoHash, peerID, uploaded,
downloaded, left, STOPPED_EVENT);
}
}
}
}
catch(IOException ioe) { /* ignored */ }
tr.reset();
}
}
/**
* HTTP - blocking
*
* Note: IOException message text gets displayed in the UI
*
@ -922,6 +935,48 @@ public class TrackerClient implements Runnable {
return info;
}
/**
* UDP - blocking
*
* @return null if _fastUnannounce && event == STOPPED
* @since 0.9.54
*/
private TrackerInfo doRequest(TCTracker tr, long uploaded,
long downloaded, long left, int event) throws IOException {
UDPTrackerClient udptc = _util.getUDPTrackerClient();
if (udptc == null)
throw new IOException("no UDPTC");
if (_log.shouldLog(Log.INFO))
_log.info("Sending UDPTrackerClient request");
tr.lastRequestTime = System.currentTimeMillis();
// Don't wait for a response to stopped when shutting down
boolean fast = _fastUnannounce && event == UDPTrackerClient.EVENT_STOPPED;
long maxWait = fast ? 5*1000 : 60*1000;
boolean small = left == 0 || event == UDPTrackerClient.EVENT_STOPPED || !coordinator.needOutboundPeers();
int numWant = small ? 0 : _util.getMaxConnections();
UDPTrackerClient.TrackerResponse fetched = udptc.announce(meta.getInfoHash(), snark.getID(), numWant,
maxWait, tr.host, tr.port,
downloaded, left, uploaded, event, fast);
if (fast)
return null;
if (fetched == null)
throw new IOException("UDP announce error to: " + tr.host);
TrackerInfo info = new TrackerInfo(fetched.getPeers(), fetched.getInterval(), fetched.getSeedCount(),
fetched.getLeechCount(), fetched.getFailureReason(),
snark.getID(), snark.getInfoHash(), snark.getMetaInfo(), _util);
if (_log.shouldLog(Log.INFO))
_log.info("TrackerClient response: " + info);
String failure = info.getFailureReason();
if (failure != null)
throw new IOException(failure);
tr.interval = Math.max(MIN_TRACKER_ANNOUNCE_INTERVAL, info.getInterval() * 1000l);
return info;
}
/**
* Very lazy byte[] to URL encoder. Just encodes almost everything, even
* some "normal" chars.
@ -969,8 +1024,12 @@ public class TrackerClient implements Runnable {
String path = url.getPath();
if (path == null || !path.startsWith("/"))
return false;
return "http".equals(url.getScheme()) && url.getHost() != null &&
(url.getHost().endsWith(".i2p") || url.getHost().equals("i2p"));
String scheme = url.getScheme();
if (!("http".equals(scheme) || "udp".equals(scheme)))
return false;
String host = url.getHost();
return host != null &&
(host.endsWith(".i2p") || host.equals("i2p"));
}
/**
@ -980,14 +1039,15 @@ public class TrackerClient implements Runnable {
* @return a Hash for i2p hosts only, null otherwise
* @since 0.9.5
*/
private static Hash getHostHash(String ann) {
private Hash getHostHash(String ann) {
URI url;
try {
url = new URI(ann);
} catch (URISyntaxException use) {
return null;
}
if (!"http".equals(url.getScheme()))
String scheme = url.getScheme();
if (!("http".equals(scheme) || (_util.udpEnabled() && "udp".equals(scheme))))
return null;
String host = url.getHost();
if (host == null) {
@ -1022,11 +1082,30 @@ public class TrackerClient implements Runnable {
return null;
}
/**
* UDP before HTTP
*
* @since 0.9.67
*/
private static class URLComparator implements Comparator<String> {
public int compare(String l, String r) {
boolean ul = l.startsWith("udp://");
boolean ur = r.startsWith("udp://");
if (ul && !ur)
return -1;
if (ur && !ul)
return -1;
return 0;
}
}
private static class TCTracker
{
final String announce;
final String host;
final boolean isPrimary;
final boolean isUDP;
final int port;
long interval;
long lastRequestTime;
String trackerProblems;
@ -1037,14 +1116,27 @@ public class TrackerClient implements Runnable {
int seenPeers;
/**
* @param a must be a valid http URL with a path
* @param a must be a valid http URL with a path,
* or a udp URL (path is ignored)
* @param p true if primary
*/
public TCTracker(String a, boolean p)
{
announce = a;
String s = a.substring(7);
host = s.substring(0, s.indexOf('/'));
URI url;
try {
url = new URI(a);
isUDP = "udp".equals(url.getScheme());
host = url.getHost();
int pt = url.getPort();
if (pt < 0) {
pt = isUDP ? DEFAULT_UDP_TRACKER_PORT : 80;
}
port = pt;
} catch (URISyntaxException use) {
// shouldn't happen, already validated
throw new IllegalArgumentException(use);
}
isPrimary = p;
interval = INITIAL_SLEEP;
}

View File

@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import net.i2p.data.Hash;
import org.klomp.snark.bencode.BDecoder;
import org.klomp.snark.bencode.BEValue;
import org.klomp.snark.bencode.InvalidBEncodingException;
@ -123,6 +125,21 @@ class TrackerInfo
}
******/
/**
* To convert returned UDPTracker data to the standard structure
* @param hashes may be null
* @param error may be null
* @since 0.9.14
*/
public TrackerInfo(Set<Hash> hashes, int interval, int complete, int incomplete, String error,
byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util) {
peers = getPeers(hashes, my_id, infohash, metainfo, util);
this.interval = interval;
this.complete = complete;
this.incomplete = incomplete;
failure_reason = error;
}
/** List of Dictionaries or List of Strings */
private static Set<Peer> getPeers(List<BEValue> l, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
@ -179,6 +196,31 @@ class TrackerInfo
return peers;
}
/**
* From Hash to Peer
* @since 0.9.14
*/
private static Set<Peer> getPeers(Set<Hash> hashes, byte[] my_id,
byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util) {
if (hashes == null)
return Collections.emptySet();
Set<Peer> peers = new HashSet<Peer>(hashes.size());
for (Hash h : hashes) {
PeerID peerID;
byte[] hash = new byte[HASH_LENGTH];
System.arraycopy(h.getData(), 0, hash, 0, HASH_LENGTH);
try {
peerID = new PeerID(hash, util);
} catch (InvalidBEncodingException ibe) {
// won't happen
continue;
}
peers.add(new Peer(peerID, my_id, infohash, metainfo));
}
return peers;
}
public Set<Peer> getPeers()
{
return peers;

View File

@ -1,5 +1,6 @@
package org.klomp.snark;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@ -11,8 +12,8 @@ import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.client.SendMessageOptions;
import net.i2p.client.datagram.I2PDatagramDissector;
import net.i2p.client.datagram.I2PDatagramMaker;
import net.i2p.client.datagram.Datagram2;
import net.i2p.client.datagram.Datagram3;
import net.i2p.client.datagram.I2PInvalidDatagramException;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
@ -21,6 +22,9 @@ import net.i2p.data.Hash;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
import org.klomp.snark.I2PSnarkUtil;
import org.klomp.snark.TrackerClient;
/**
* One of these for all trackers and info hashes.
* Ref: BEP 15, proposal 160
@ -28,15 +32,17 @@ import net.i2p.util.SimpleTimer2;
* The main difference from BEP 15 is that the announce response
* contains a 32-byte hash instead of a 4-byte IP and a 2-byte port.
*
* This implements only "fast mode".
* We send only repliable datagrams, and
* receive only raw datagrams, as follows:
* We send both repliable and raw datagrams, but
* we only receive raw datagrams, as follows:
*
*<pre>
* client tracker type
* ------ ------- ----
* announce --&gt; repliable
* &lt;-- ann resp raw
* conn req --> (repliable to query port)
* <-- conn resp (raw from resp port)
* announce --> (raw to resp port)
* <-- ann resp (raw from resp port)
* <-- error (raw from resp port)
*</pre>
*
* @since 0.9.53, enabled in 0.9.54
@ -48,6 +54,8 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
/** hook to inject and receive datagrams */
private final I2PSession _session;
private final I2PSnarkUtil _util;
/** 20 byte random id */
private final int _myKey;
private final Hash _myHash;
/** unsigned dgrams */
private final int _rPort;
@ -55,8 +63,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
private final ConcurrentHashMap<HostPort, Tracker> _trackers;
/** our TID to tracker */
private final Map<Integer, ReplyWaiter> _sentQueries;
private final SimpleTimer2.TimedEvent _cleaner;
private boolean _isRunning;
private static final long INIT_CONN_ID = 0x41727101980L;
public static final int EVENT_NONE = 0;
public static final int EVENT_COMPLETED = 1;
public static final int EVENT_STARTED = 2;
@ -70,6 +81,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
private static final int SEND_CRYPTO_TAGS = 8;
private static final int LOW_CRYPTO_TAGS = 4;
private static final long CONN_EXPIRATION = 60*1000; // BEP 15
private static final long DEFAULT_TIMEOUT = 15*1000;
private static final long DEFAULT_QUERY_TIMEOUT = 60*1000;
private static final long CLEAN_TIME = 163*1000;
@ -79,8 +91,6 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
private static final int MIN_INTERVAL = 15*60;
private static final int MAX_INTERVAL = 8*60*60;
private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL }
/**
*
*/
@ -90,9 +100,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
_util = util;
_log = ctx.logManager().getLog(UDPTrackerClient.class);
_rPort = TrackerClient.PORT - 1;
_myKey = ctx.random().nextInt();
_myHash = session.getMyDestination().calculateHash();
_trackers = new ConcurrentHashMap<HostPort, Tracker>(8);
_sentQueries = new ConcurrentHashMap<Integer, ReplyWaiter>(32);
_cleaner = new Cleaner();
}
@ -104,6 +116,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
return;
_session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
_isRunning = true;
_cleaner.schedule(7 * CLEAN_TIME);
}
/**
@ -114,6 +127,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
return;
_isRunning = false;
_session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort);
_cleaner.cancel();
_trackers.clear();
for (ReplyWaiter w : _sentQueries.values()) {
w.cancel();
@ -128,7 +142,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
*
* @param ih the Info Hash (torrent)
* @param max maximum number of peers to return
* @param maxWait the maximum time to wait (ms) must be greater than 0
* @param maxWait the maximum time to wait (ms) must be > 0
* @param fast if true, don't wait for dest, no retx, ...
* @return null on fail or if fast is true
*/
@ -138,7 +152,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
int event, boolean fast) {
long now = _context.clock().now();
long end = now + maxWait;
if (toPort < 0)
if (toPort <= 0)
throw new IllegalArgumentException();
Tracker tr = getTracker(toHost, toPort);
if (tr.getDest(fast) == null) {
@ -154,6 +168,12 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
_log.info("out of time after resolving: " + tr);
return null;
}
Long cid = getConnection(tr, now + toWait);
if (cid == null) {
if (_log.shouldInfo())
_log.info("no connection for: " + tr);
return null;
}
if (fast) {
toWait = 0;
} else {
@ -164,7 +184,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
return null;
}
}
ReplyWaiter w = sendAnnounce(tr, 0, ih, peerID,
ReplyWaiter w = sendAnnounce(tr, cid.longValue(), ih, peerID,
downloaded, left, uploaded, event, max, toWait);
if (fast)
return null;
@ -184,6 +204,55 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
//////// private below here
/**
* @return the connection ID, or null on fail
*/
private Long getConnection(Tracker tr, long untilTime) {
boolean shouldConnect = false;
synchronized(tr) {
boolean wasInProgress = false;
while(true) {
Long conn = tr.getConnection();
if (conn != null)
return conn;
// don't resend right after somebody else failed
if (wasInProgress)
return null;
long now = _context.clock().now();
long toWait = untilTime - now;
if (toWait <= 0)
return null;
if (tr.isConnInProgress()) {
wasInProgress = true;
try {
tr.wait(toWait);
} catch (InterruptedException ie) {}
} else {
shouldConnect = true;
tr.setConnInProgress(true);
break;
}
}
}
if (shouldConnect) {
long now = _context.clock().now();
long toWait = untilTime - now;
if (toWait <= 1000) {
tr.setConnInProgress(false);
return null;
}
ReplyWaiter w = sendConnReq(tr, toWait);
if (w == null) {
tr.setConnInProgress(false);
return null;
}
boolean success = waitAndRetransmit(w, untilTime);
if (success)
return tr.getConnection();
}
return null;
}
/**
* @return non-null
*/
@ -197,6 +266,40 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
///// Sending.....
/**
* Send one time with a new tid
* @param toWait > 0
* @return null on failure or if toWait <= 0
*/
private ReplyWaiter sendConnReq(Tracker tr, long toWait) {
if (toWait <= 0)
throw new IllegalArgumentException();
int tid = _context.random().nextInt();
byte[] payload = sendConnReq(tr, tid);
if (payload != null) {
ReplyWaiter rv = new ReplyWaiter(tid, tr, ACTION_CONNECT, payload, toWait);
_sentQueries.put(Integer.valueOf(tid), rv);
if (_log.shouldInfo())
_log.info("Sent: " + rv + " timeout: " + toWait);
return rv;
}
return null;
}
/**
* Send one time with given tid
* @return the payload or null on failure
*/
private byte[] sendConnReq(Tracker tr, int tid) {
// same as BEP 15
byte[] payload = new byte[16];
DataHelper.toLong8(payload, 0, INIT_CONN_ID);
// next 4 bytes are already zero
DataHelper.toLong(payload, 12, 4, tid);
boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true);
return rv ? payload : null;
}
/**
* Send one time with a new tid
* @param toWait if <= 0 does not register
@ -210,7 +313,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
byte[] payload = sendAnnounce(tr, tid, connID, ih, id, downloaded, left, uploaded, event, numWant);
if (payload != null) {
if (toWait > 0) {
ReplyWaiter rv = new ReplyWaiter(tid, tr, payload, toWait);
ReplyWaiter rv = new ReplyWaiter(tid, tr, ACTION_ANNOUNCE, payload, toWait);
_sentQueries.put(Integer.valueOf(tid), rv);
if (_log.shouldInfo())
_log.info("Sent: " + rv + " timeout: " + toWait);
@ -242,7 +345,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
DataHelper.toLong(payload, 80, 4, event);
DataHelper.toLong(payload, 92, 4, numWant);
DataHelper.toLong(payload, 96, 2, TrackerClient.PORT);
boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true);
boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, false);
return rv ? payload : null;
}
@ -257,9 +360,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
if (toWait <= 0)
return false;
w.wait(toWait);
} catch (InterruptedException ie) {
return false;
}
} catch (InterruptedException ie) {}
switch (w.getState()) {
case INIT:
continue;
@ -290,11 +391,12 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
* @return success
*/
private boolean resend(ReplyWaiter w, long toWait) {
boolean repliable = w.getExpectedAction() == ACTION_CONNECT;
Tracker tr = w.getSentTo();
int port = tr.getPort();
if (_log.shouldInfo())
_log.info("Resending: " + w + " timeout: " + toWait);
boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), true);
boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), repliable);
if (rv) {
_sentQueries.put(Integer.valueOf(w.getID()), w);
w.schedule(toWait);
@ -319,28 +421,35 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
_log.info("send failed, no dest");
return false;
}
if (dest.calculateHash().equals(_myHash))
Hash to = dest.calculateHash();
if (to.equals(_myHash))
throw new IllegalArgumentException("don't send to ourselves");
if (repliable) {
I2PDatagramMaker dgMaker = new I2PDatagramMaker(_session);
payload = dgMaker.makeI2PDatagram(payload);
if (payload == null) {
try {
payload = Datagram2.make(_context, _session, payload, to);
} catch (DataFormatException dfe) {
if (_log.shouldWarn())
_log.warn("DGM fail");
_log.warn("DG2 fail", dfe);
return false;
}
} else {
try {
payload = Datagram3.make(_context, _session, payload);
} catch (DataFormatException dfe) {
if (_log.shouldWarn())
_log.warn("DG3 fail", dfe);
return false;
}
}
SendMessageOptions opts = new SendMessageOptions();
opts.setDate(_context.clock().now() + 60*1000);
opts.setTagsToSend(SEND_CRYPTO_TAGS);
opts.setTagThreshold(LOW_CRYPTO_TAGS);
if (!repliable)
opts.setSendLeaseSet(false);
try {
boolean success = _session.sendMessage(dest, payload, 0, payload.length,
repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW,
repliable ? I2PSession.PROTO_DATAGRAM2 : I2PSession.PROTO_DATAGRAM3,
_rPort, toPort, opts);
if (success) {
// ...
@ -376,36 +485,69 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
_log.info("Rcvd msg with no one waiting: " + tid);
return;
}
int expect = waiter.getExpectedAction();
if (expect != action && action != ACTION_ERROR) {
if (_log.shouldInfo())
_log.info("Got action " + action + " but wanted " + expect + " for: " + waiter);
waiter.gotReply(false);
return;
}
if (action == ACTION_ANNOUNCE) {
switch (action) {
case ACTION_CONNECT:
receiveConnection(waiter, payload, fromPort);
break;
case ACTION_ANNOUNCE:
receiveAnnounce(waiter, payload);
} else if (action == ACTION_ERROR) {
receiveError(waiter, payload);
} else {
// includes ACTION_CONNECT
break;
case ACTION_ERROR:
receiveError(waiter, payload, expect);
break;
default:
if (_log.shouldInfo())
_log.info("Rcvd msg with unknown action: " + action + " for: " + waiter);
waiter.gotReply(false);
Tracker tr = waiter.getSentTo();
tr.gotError();
break;
}
}
/**
* @param lifetime ms
*/
private void receiveConnection(ReplyWaiter waiter, byte[] payload, int fromPort) {
Tracker tr = waiter.getSentTo();
if (payload.length >= 16) {
long cid = DataHelper.fromLong8(payload, 8);
long lifetime;
if (payload.length >= 18) {
// extension to BEP 15
lifetime = DataHelper.fromLong(payload, 16, 2) * 1000;
} else {
lifetime = CONN_EXPIRATION;
}
if (_log.shouldInfo())
_log.info("Rcvd connect response, id = " + cid + " lifetime = " + (lifetime / 1000) + " from " + tr);
tr.setConnection(cid, fromPort, lifetime);
waiter.gotReply(true);
} else {
waiter.gotReply(false);
tr.gotError();
}
}
private void receiveAnnounce(ReplyWaiter waiter, byte[] payload) {
Tracker tr = waiter.getSentTo();
if (payload.length >= 22) {
if (payload.length >= 20) {
int interval = Math.min(MAX_INTERVAL, Math.max(MIN_INTERVAL,
(int) DataHelper.fromLong(payload, 8, 4)));
int leeches = (int) DataHelper.fromLong(payload, 12, 4);
int seeds = (int) DataHelper.fromLong(payload, 16, 4);
int peers = (int) DataHelper.fromLong(payload, 20, 2);
if (22 + (peers * Hash.HASH_LENGTH) > payload.length) {
if (_log.shouldWarn())
_log.warn("Short reply");
waiter.gotReply(false);
tr.gotError();
return;
}
int peers = (payload.length - 20) / Hash.HASH_LENGTH;
if (_log.shouldInfo())
_log.info("Rcvd " + peers + " peers from " + tr);
Set<Hash> hashes;
@ -426,7 +568,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
}
}
private void receiveError(ReplyWaiter waiter, byte[] payload) {
private void receiveError(ReplyWaiter waiter, byte[] payload, int expected) {
String msg;
if (payload.length > 8) {
msg = DataHelper.getUTF8(payload, 8, payload.length - 8);
@ -437,6 +579,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
waiter.gotResponse(resp);
Tracker tr = waiter.getSentTo();
tr.gotError();
if (waiter.getExpectedAction() == ACTION_ANNOUNCE) {
// TODO if we were waiting for an announce reply, fire off a new connection request
}
}
// I2PSessionMuxedListener interface ----------------
@ -488,6 +633,43 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
_log.warn("UDPTC got error msg: ", error);
}
/**
* Cleaner-upper
*/
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() {
super(SimpleTimer2.getInstance(), 7 * CLEAN_TIME);
}
public void timeReached() {
if (!_isRunning)
return;
long now = _context.clock().now();
/********
if (_log.shouldLog(Log.DEBUG))
_log.debug("UDPTC cleaner starting with " +
_blacklist.size() + " in blacklist, " +
_outgoingTokens.size() + " sent Tokens, " +
_incomingTokens.size() + " rcvd Tokens");
long expire = now - MAX_TOKEN_AGE;
for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) {
Token tok = iter.next();
if (tok.lastSeen() < expire)
iter.remove();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("UDPTC cleaner done, now with " +
_blacklist.size() + " in blacklist, " +
_outgoingTokens.size() + " sent Tokens, " +
_incomingTokens.size() + " rcvd Tokens, " +
_knownNodes.size() + " known peers, " +
_sentQueries.size() + " queries awaiting response");
*******/
schedule(CLEAN_TIME);
}
}
public static class TrackerResponse {
private final int interval, complete, incomplete;
@ -578,20 +760,27 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
}
}
private enum ConnState { INVALID, IN_PROGRESS, VALID }
private class Tracker extends HostPort {
private final Object destLock = new Object();
private Destination dest;
// we store as a Long because all values are valid, so null is unset
private Long cid;
private long expires;
private long lastHeardFrom;
private long lastFailed;
private int consecFails;
private int responsePort;
private int interval = DEFAULT_INTERVAL;
private ConnState state = ConnState.INVALID;
private static final long DELAY = 15*1000;
public Tracker(String host, int port) {
super(host, port);
responsePort = port;
}
/**
@ -606,12 +795,57 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
}
}
public synchronized void setConnInProgress(boolean yes) {
if (yes)
state = ConnState.IN_PROGRESS;
else if (state == ConnState.IN_PROGRESS)
state = ConnState.INVALID;
}
public synchronized boolean isConnInProgress() {
return state == ConnState.IN_PROGRESS;
}
public synchronized boolean isConnValid() {
return state == ConnState.VALID &&
expires > _context.clock().now();
}
public synchronized void connFailed() {
replyTimeout();
expires = 0;
state = ConnState.INVALID;
}
/** does not change state */
public synchronized void replyTimeout() {
consecFails++;
lastFailed = _context.clock().now();
}
/**
* sets heardFrom
* @param lifetime ms
*/
public synchronized void setConnection(long cid, int rport, long lifetime) {
this.cid = Long.valueOf(cid);
responsePort = rport;
long now = _context.clock().now();
lastHeardFrom = now;
expires = now + lifetime;
consecFails = 0;
state = ConnState.VALID;
}
/**
* @return null if invalid
*/
public synchronized Long getConnection() {
if (isConnValid())
return cid;
return null;
}
public synchronized int getInterval() {
return interval;
}
@ -629,7 +863,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
public synchronized void gotError() {
long now = _context.clock().now();
lastHeardFrom = now;
consecFails = 0;
consecFails++;
state = ConnState.INVALID;
cid = null;
this.notifyAll();
}
@ -640,16 +876,20 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
@Override
public String toString() {
return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null);
return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null) +
"valid? " + isConnValid() + " conn ID: " + (cid != null ? cid : "none") + ' ' + state;
}
}
private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL }
/**
* Callback for replies
*/
private class ReplyWaiter extends SimpleTimer2.TimedEvent {
private final int tid;
private final Tracker sentTo;
private final int action;
private final byte[] data;
private TrackerResponse replyObject;
private WaitState state = WaitState.INIT;
@ -659,10 +899,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
* Any sent data to be remembered may be stored by setSentObject().
* Reply object may be in getReplyObject().
*/
public ReplyWaiter(int tid, Tracker tracker, byte[] payload, long toWait) {
public ReplyWaiter(int tid, Tracker tracker, int action, byte[] payload, long toWait) {
super(SimpleTimer2.getInstance(), toWait);
this.tid = tid;
sentTo = tracker;
this.action = action;
data = payload;
}
@ -674,6 +915,10 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
return sentTo;
}
public int getExpectedAction() {
return action;
}
public byte[] getPayload() {
return data;
}
@ -719,7 +964,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
*/
public synchronized void gotResponse(TrackerResponse resp) {
replyObject = resp;
gotReply(true);
gotReply(resp.error == null);
}
/**
@ -736,9 +981,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
// don't trump success or failure
if (state != WaitState.INIT)
return;
//if (action == ACTION_CONNECT)
// sentTo.connFailed();
//else
if (action == ACTION_CONNECT)
sentTo.connFailed();
else
sentTo.replyTimeout();
setState(WaitState.TIMEOUT);
if (_log.shouldWarn())
@ -747,7 +992,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener {
@Override
public String toString() {
return "Waiting for ID: " + tid + " to: " + sentTo + " state: " + state;
return "Message type: " + action + " ID: " + tid + " to: " + sentTo + " state: " + state;
}
}
}

View File

@ -2477,6 +2477,8 @@ public class I2PSnarkServlet extends BasicServlet {
String trackerLinkUrl = getTrackerLinkUrl(announce, infohash);
if (announce.startsWith("http://"))
announce = announce.substring(7);
else if (announce.startsWith("udp://"))
announce = announce.substring(6);
// strip path
int slsh = announce.indexOf('/');
if (slsh > 0)
@ -4102,11 +4104,11 @@ public class I2PSnarkServlet extends BasicServlet {
* Just to hide non-i2p trackers from the details page.
* @since 0.9.46
*/
private static boolean isI2PTracker(String url) {
private boolean isI2PTracker(String url) {
try {
URI uri = new URI(url);
String method = uri.getScheme();
if (!"http".equals(method) && !"https".equals(method))
if (!("http".equals(method) || (_manager.util().udpEnabled() && "udp".equals(method))))
return false;
String host = uri.getHost();
if (host == null || !host.endsWith(".i2p"))