propagate from branch 'i2p.i2p.zzz.sam' (head d5c193915251826fe4f5dcd58c36f74714495fd4)

to branch 'i2p.i2p' (head 5ad07e5b5ef68fddeec919c04c6c49178b6a6b31)
This commit is contained in:
zzz
2016-02-08 21:24:06 +00:00
21 changed files with 1592 additions and 596 deletions

View File

@@ -0,0 +1,404 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/**
* A session that does nothing, but implements interfaces for raw, datagram, and streaming
* for convenience.
*
* We extend SAMv3StreamSession as we must have it set up the I2PSession, in case
* user adds a STREAM session (and he probably will).
* This session receives all data from I2P, but you can't send any data on it.
*
* @since 0.9.25
*/
class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, SAMRawReceiver,
SAMMessageSess, I2PSessionMuxedListener {
private final SAMv3Handler handler;
private final SAMv3DatagramServer dgs;
private final Map<String, SAMMessageSess> sessions;
private final StreamAcceptor streamAcceptor;
private static final String[] INVALID_OPTS = { "PORT", "HOST", "FROM_PORT", "TO_PORT",
"PROTOCOL", "LISTEN_PORT", "LISTEN_PROTOCOL" };
/**
* Build a Session according to information
* registered with the given nickname.
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
*/
public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props)
throws IOException, DataFormatException, SAMException {
super(nick);
for (int i = 0; i < INVALID_OPTS.length; i++) {
String p = INVALID_OPTS[i];
if (props.containsKey(p))
throw new SAMException("MASTER session options may not contain " + p);
}
dgs = dgServer;
sessions = new ConcurrentHashMap<String, SAMMessageSess>(4);
this.handler = handler;
I2PSession isess = socketMgr.getSession();
// if we get a RAW session added with 0/0, it will replace this,
// and we won't add this back if removed.
isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
streamAcceptor = new StreamAcceptor();
}
/**
* Overridden to start the acceptor.
*/
@Override
public void start() {
Thread t = new I2PAppThread(streamAcceptor, "SAMMasterAcceptor");
t.start();
}
/**
* Add a session
* @return null for success, or error message
*/
public synchronized String add(String nick, String style, Properties props) {
if (props.containsKey("DESTINATION"))
return "SESSION ADD may not contain DESTINATION";
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if (rec != null || sessions.containsKey(nick))
return "Duplicate ID " + nick;
int listenPort = I2PSession.PORT_ANY;
String slp = (String) props.remove("LISTEN_PORT");
if (slp == null)
slp = props.getProperty("FROM_PORT");
if (slp != null) {
try {
listenPort = Integer.parseInt(slp);
if (listenPort < 0 || listenPort > 65535)
return "Bad LISTEN_PORT " + slp;
// TODO enforce streaming listen port must be 0 or from port
} catch (NumberFormatException nfe) {
return "Bad LISTEN_PORT " + slp;
}
}
int listenProtocol;
SAMMessageSess sess;
SAMv3Handler subhandler;
try {
I2PSession isess = socketMgr.getSession();
subhandler = new SAMv3Handler(handler.getClientSocket(), handler.verMajor,
handler.verMinor, handler.getBridge());
if (style.equals("RAW")) {
if (!props.containsKey("PORT"))
return "RAW subsession must specify PORT";
listenProtocol = I2PSession.PROTO_DATAGRAM_RAW;
String spr = (String) props.remove("LISTEN_PROTOCOL");
if (spr == null)
spr = props.getProperty("PROTOCOL");
if (spr != null) {
try {
listenProtocol = Integer.parseInt(spr);
// RAW can't listen on streaming protocol
if (listenProtocol < 0 || listenProtocol > 255 ||
listenProtocol == I2PSession.PROTO_STREAMING)
return "Bad RAW LISTEN_PPROTOCOL " + spr;
} catch (NumberFormatException nfe) {
return "Bad LISTEN_PROTOCOL " + spr;
}
}
SAMv3RawSession ssess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs);
subhandler.setSession(ssess);
sess = ssess;
} else if (style.equals("DATAGRAM")) {
if (!props.containsKey("PORT"))
return "DATAGRAM subsession must specify PORT";
listenProtocol = I2PSession.PROTO_DATAGRAM;
SAMv3DatagramSession ssess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs);
subhandler.setSession(ssess);
sess = ssess;
} else if (style.equals("STREAM")) {
listenProtocol = I2PSession.PROTO_STREAMING;
// FIXME need something that hangs off an existing dest
SAMv3StreamSession ssess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort);
subhandler.setSession(ssess);
sess = ssess;
} else {
return "Unrecognized SESSION STYLE " + style;
}
} catch (IOException e) {
return e.toString();
} catch (DataFormatException e) {
return e.toString();
} catch (SAMException e) {
return e.toString();
} catch (I2PSessionException e) {
return e.toString();
}
for (SAMMessageSess s : sessions.values()) {
if (listenProtocol == s.getListenProtocol() &&
listenPort == s.getListenPort())
return "Duplicate protocol " + listenProtocol + " and port " + listenPort;
}
rec = new SessionRecord(getDestination().toBase64(), props, subhandler);
try {
SAMv3Handler.sSessionsHash.putDupDestOK(nick, rec);
sessions.put(nick, sess);
} catch (SessionsDB.ExistingIdException e) {
return "Duplicate ID " + nick;
}
if (_log.shouldWarn())
_log.warn("added " + style + " proto " + listenProtocol + " port " + listenPort);
sess.start();
// all ok
return null;
}
/**
* Remove a session
* @return null for success, or error message
*/
public synchronized String remove(String nick, Properties props) {
boolean ok;
SAMMessageSess sess = sessions.remove(nick);
if (sess != null) {
ok = SAMv3Handler.sSessionsHash.del(nick);
sess.close();
// TODO if 0/0, add back this as listener?
if (_log.shouldWarn())
_log.warn("removed " + sess + " proto " + sess.getListenProtocol() + " port " + sess.getListenPort());
} else {
ok = false;
}
if (!ok)
return "ID " + nick + " not found";
// all ok
return null;
}
/**
* @throws IOException always
*/
public void receiveDatagramBytes(Destination sender, byte[] data, int proto,
int fromPort, int toPort) throws IOException {
throw new IOException("master session");
}
/**
* Does nothing.
*/
public void stopDatagramReceiving() {}
/**
* @throws IOException always
*/
public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
throw new IOException("master session");
}
/**
* Does nothing.
*/
public void stopRawReceiving() {}
/////// stream session overrides
/** @throws I2PException always */
@Override
public void connect(SAMv3Handler handler, String dest, Properties props) throws I2PException {
throw new I2PException("master session");
}
/** @throws SAMException always */
@Override
public void accept(SAMv3Handler handler, boolean verbose) throws SAMException {
throw new SAMException("master session");
}
/** @throws SAMException always */
@Override
public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException {
throw new SAMException("master session");
}
/** does nothing */
@Override
public void stopForwardingIncoming() {}
///// SAMMessageSess interface
@Override
public int getListenProtocol() {
return I2PSession.PROTO_ANY;
}
@Override
public int getListenPort() {
return I2PSession.PORT_ANY;
}
/**
* Close the master session
* Overridden to stop the acceptor.
*/
@Override
public void close() {
// close sessions?
streamAcceptor.stopRunning();
super.close();
}
// I2PSessionMuxedImpl interface
public void disconnected(I2PSession session) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P session disconnected");
close();
}
public void errorOccurred(I2PSession session, String message,
Throwable error) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error: " + message, error);
close();
}
public void messageAvailable(I2PSession session, int msgId, long size) {
messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED,
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
}
/** @since 0.9.24 */
public void messageAvailable(I2PSession session, int msgId, long size,
int proto, int fromPort, int toPort) {
try {
byte msg[] = session.receiveMessage(msgId);
if (msg == null)
return;
messageReceived(msg, proto, fromPort, toPort);
} catch (I2PSessionException e) {
_log.error("Error fetching I2P message", e);
close();
}
}
public void reportAbuse(I2PSession session, int severity) {
_log.warn("Abuse reported (severity: " + severity + ")");
close();
}
private void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
if (_log.shouldWarn())
_log.warn("Unhandled message received, length = " + msg.length +
" protocol: " + proto + " from port: " + fromPort + " to port: " + toPort);
}
private class StreamAcceptor implements Runnable {
private volatile boolean stop;
public StreamAcceptor() {
}
public void stopRunning() {
stop = true;
}
public void run() {
if (_log.shouldWarn())
_log.warn("Stream acceptor started");
final I2PServerSocket i2pss = socketMgr.getServerSocket();
while (!stop) {
// wait and accept a connection from I2P side
I2PSocket i2ps;
try {
i2ps = i2pss.accept();
if (i2ps == null) // never null as of 0.9.17
continue;
} catch (SocketTimeoutException ste) {
continue;
} catch (ConnectException ce) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error accepting", ce);
try { Thread.sleep(50); } catch (InterruptedException ie) {}
continue;
} catch (I2PException ipe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error accepting", ipe);
break;
}
int port = i2ps.getLocalPort();
SAMMessageSess foundSess = null;
Collection<SAMMessageSess> all = sessions.values();
for (Iterator<SAMMessageSess> iter = all.iterator(); iter.hasNext(); ) {
SAMMessageSess sess = iter.next();
if (sess.getListenProtocol() != I2PSession.PROTO_STREAMING) {
// remove as we may be going around again below
iter.remove();
continue;
}
if (sess.getListenPort() == port) {
foundSess = sess;
break;
}
}
// We never send streaming out as a raw packet to a default listener,
// and we don't allow raw to listen on streaming protocol,
// so we don't have to look for a default protocol,
// but we do have to look for a default port listener.
if (foundSess == null) {
for (SAMMessageSess sess : all) {
if (sess.getListenPort() == 0) {
foundSess = sess;
break;
}
}
}
if (foundSess != null) {
SAMv3StreamSession ssess = (SAMv3StreamSession) foundSess;
boolean ok = ssess.queueSocket(i2ps);
if (!ok) {
_log.logAlways(Log.WARN, "Accept queue overflow for " + ssess);
try { i2ps.close(); } catch (IOException ioe) {}
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("No subsession found for incoming streaming connection on port " + port);
}
}
if (_log.shouldWarn())
_log.warn("Stream acceptor stopped");
}
}
}

View File

@@ -32,9 +32,9 @@ class SAMDatagramSession extends SAMMessageSession {
// FIXME make final after fixing SAMv3DatagramSession override
protected SAMDatagramReceiver recv;
private final I2PDatagramMaker dgramMaker;
private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
/**
* Create a new SAM DATAGRAM session.
*
@@ -45,11 +45,10 @@ class SAMDatagramSession extends SAMMessageSession {
* @throws DataFormatException
* @throws I2PSessionException
*/
public SAMDatagramSession(String dest, Properties props,
protected SAMDatagramSession(String dest, Properties props,
SAMDatagramReceiver recv) throws IOException,
DataFormatException, I2PSessionException {
super(dest, props);
this.recv = recv;
dgramMaker = new I2PDatagramMaker(getI2PSession());
}
@@ -57,6 +56,8 @@ class SAMDatagramSession extends SAMMessageSession {
/**
* Create a new SAM DATAGRAM session.
*
* Caller MUST call start().
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
@@ -68,7 +69,20 @@ class SAMDatagramSession extends SAMMessageSession {
SAMDatagramReceiver recv) throws IOException,
DataFormatException, I2PSessionException {
super(destStream, props);
this.recv = recv;
dgramMaker = new I2PDatagramMaker(getI2PSession());
}
/**
* Create a new SAM DATAGRAM session on an existing I2P session.
*
* @param props unused for now
* @since 0.9.25
*/
protected SAMDatagramSession(I2PSession sess, Properties props, int listenPort,
SAMDatagramReceiver recv) throws IOException,
DataFormatException, I2PSessionException {
super(sess, I2PSession.PROTO_DATAGRAM, listenPort);
this.recv = recv;
dgramMaker = new I2PDatagramMaker(getI2PSession());
}
@@ -90,11 +104,31 @@ class SAMDatagramSession extends SAMMessageSession {
throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
byte[] dgram ;
synchronized (dgramMaker) {
dgram = dgramMaker.makeI2PDatagram(data);
dgram = dgramMaker.makeI2PDatagram(data);
}
return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
}
/**
* Send bytes through a SAM DATAGRAM session.
*
* @since 0.9.25
*/
public boolean sendBytes(String dest, byte[] data, int proto,
int fromPort, int toPort,
boolean sendLeaseSet, int sendTags,
int tagThreshold, int expiration)
throws DataFormatException, I2PSessionException {
if (data.length > DGRAM_SIZE_MAX)
throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
byte[] dgram ;
synchronized (dgramMaker) {
dgram = dgramMaker.makeI2PDatagram(data);
}
return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort,
sendLeaseSet, sendTags,tagThreshold, expiration);
}
protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
byte[] payload;
Destination sender;

View File

@@ -25,7 +25,7 @@ import net.i2p.util.VersionComparator;
*/
class SAMHandlerFactory {
private static final String VERSION = "3.2";
private static final String VERSION = "3.3";
private static final int HELLO_TIMEOUT = 60*1000;
@@ -139,6 +139,9 @@ class SAMHandlerFactory {
if (VersionComparator.comp(VERSION, minVer) >= 0 &&
VersionComparator.comp(VERSION, maxVer) <= 0)
return VERSION;
if (VersionComparator.comp("3.2", minVer) >= 0 &&
VersionComparator.comp("3.2", maxVer) <= 0)
return "3.2";
if (VersionComparator.comp("3.1", minVer) >= 0 &&
VersionComparator.comp("3.1", maxVer) <= 0)
return "3.1";

View File

@@ -0,0 +1,63 @@
package net.i2p.sam;
import java.io.Closeable;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
/**
* Base interface for SAMMessageSession, which is the base for
* v1/v3 datagram and raw sessions.
* Also implemented by SAMStreamSession.
*
* @since 0.9.25 pulled from SAMMessageSession
*/
interface SAMMessageSess extends Closeable {
/**
* Start a SAM message-based session.
* MUST be called after constructor.
*/
public void start();
/**
* Close a SAM message-based session.
*/
public void close();
/**
* Get the SAM message-based session Destination.
*
* @return The SAM message-based session Destination.
*/
public Destination getDestination();
/**
* Send bytes through a SAM message-based session.
*
* @param dest Destination
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
* @throws DataFormatException on unknown / bad dest
* @throws I2PSessionException on serious error, probably session closed
*/
public boolean sendBytes(String dest, byte[] data, int proto,
int fromPort, int toPort) throws DataFormatException, I2PSessionException;
/**
* Send bytes through a SAM message-based session.
*
* @since 0.9.25
*/
public boolean sendBytes(String dest, byte[] data, int proto,
int fromPort, int toPort,
boolean sendLeaseSet, int sendTags,
int tagThreshold, int expiration)
throws DataFormatException, I2PSessionException;
public int getListenProtocol();
public int getListenPort();
}

View File

@@ -33,11 +33,14 @@ import net.i2p.util.Log;
*
* @author human
*/
abstract class SAMMessageSession implements Closeable {
abstract class SAMMessageSession implements SAMMessageSess {
protected final Log _log;
private I2PSession session;
private SAMMessageSessionHandler handler;
private final I2PSession session;
protected final boolean _isOwnSession;
private final SAMMessageSessionHandler handler;
private final int listenProtocol;
private final int listenPort;
/**
* Initialize a new SAM message-based session.
@@ -49,9 +52,7 @@ abstract class SAMMessageSession implements Closeable {
* @throws I2PSessionException
*/
protected SAMMessageSession(String dest, Properties props) throws IOException, DataFormatException, I2PSessionException {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMMessageSession(bais, props);
this(new ByteArrayInputStream(Base64.decode(dest)), props);
}
/**
@@ -64,17 +65,42 @@ abstract class SAMMessageSession implements Closeable {
* @throws I2PSessionException
*/
protected SAMMessageSession(InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
_log = new Log(getClass());
initSAMMessageSession(destStream, props);
}
private void initSAMMessageSession (InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Initializing SAM message-based session");
listenProtocol = I2PSession.PROTO_ANY;
listenPort = I2PSession.PORT_ANY;
_isOwnSession = true;
handler = new SAMMessageSessionHandler(destStream, props);
session = handler.getSession();
}
// FIXME don't start threads in constructors
/**
* Initialize a new SAM message-based session using an existing I2PSession.
*
* @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
* @param props Properties to setup the I2P session
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
* @since 0.9.25
*/
protected SAMMessageSession(I2PSession sess, int listenProtocol, int listenPort)
throws IOException, DataFormatException, I2PSessionException {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Initializing SAM message-based session");
this.listenProtocol = listenProtocol;
this.listenPort = listenPort;
_isOwnSession = false;
session = sess;
handler = new SAMMessageSessionHandler(session);
}
/*
* @since 0.9.25
*/
public void start() {
Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
t.start();
}
@@ -88,6 +114,20 @@ abstract class SAMMessageSession implements Closeable {
return session.getMyDestination();
}
/**
* @since 0.9.25
*/
public int getListenProtocol() {
return listenProtocol;
}
/**
* @since 0.9.25
*/
public int getListenPort() {
return listenPort;
}
/**
* Send bytes through a SAM message-based session.
*
@@ -128,14 +168,19 @@ abstract class SAMMessageSession implements Closeable {
}
/**
* Actually send bytes through the SAM message-based session I2PSession.
* TODO unused, umimplemented in the sessions and handlers
* Actually send bytes through the SAM message-based session I2PSession,
* using per-message extended options.
* For efficiency, use the method without all the extra options if they are all defaults.
*
* @param dest Destination
* @param data Bytes to be sent
* @param proto I2CP protocol
* @param fromPort I2CP from port
* @param toPort I2CP to port
* @param sendLeaseSet true is the usual setting and the I2CP default
* @param sendTags 0 to leave as default
* @param tagThreshold 0 to leave as default
* @param expiration SECONDS from now, NOT absolute time, 0 to leave as default
*
* @return True if the data was sent, false otherwise
* @throws DataFormatException on unknown / bad dest
@@ -145,7 +190,7 @@ abstract class SAMMessageSession implements Closeable {
protected boolean sendBytesThroughMessageSession(String dest, byte[] data,
int proto, int fromPort, int toPort,
boolean sendLeaseSet, int sendTags,
int tagThreshold, long expires)
int tagThreshold, int expiration)
throws DataFormatException, I2PSessionException {
Destination d = SAMUtils.getDest(dest);
@@ -153,10 +198,14 @@ abstract class SAMMessageSession implements Closeable {
_log.debug("Sending " + data.length + " bytes to " + dest);
}
SendMessageOptions opts = new SendMessageOptions();
opts.setSendLeaseSet(sendLeaseSet);
opts.setTagsToSend(sendTags);
opts.setTagThreshold(tagThreshold);
opts.setDate(expires);
if (!sendLeaseSet)
opts.setSendLeaseSet(false);
if (sendTags > 0)
opts.setTagsToSend(sendTags);
if (tagThreshold > 0)
opts.setTagThreshold(tagThreshold);
if (expiration > 0)
opts.setDate(I2PAppContext.getGlobalContext().clock().now() + (expiration * 1000));
return session.sendMessage(d, data, 0, data.length, proto, fromPort, toPort, opts);
}
@@ -194,8 +243,9 @@ abstract class SAMMessageSession implements Closeable {
*
* @author human
*/
class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
private class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
private final I2PSession _session;
private final Object runningLock = new Object();
private volatile boolean stillRunning = true;
@@ -203,8 +253,8 @@ abstract class SAMMessageSession implements Closeable {
* Create a new SAM message-based session handler
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
* @throws I2PSessionException
* @param props Properties to setup the I2P session
* @throws I2PSessionException
*/
public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException {
if (_log.shouldLog(Log.DEBUG))
@@ -215,15 +265,33 @@ abstract class SAMMessageSession implements Closeable {
props.setProperty("inbound.nickname", "SAM UDP Client");
props.setProperty("outbound.nickname", "SAM UDP Client");
}
session = client.createSession(destStream, props);
_session = client.createSession(destStream, props);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connecting I2P session...");
session.connect();
_session.connect();
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P session connected");
session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
_session.addMuxedSessionListener(this, listenProtocol, listenPort);
}
/**
* Create a new SAM message-based session handler on an existing I2PSession
*
* @since 0.9.25
*/
public SAMMessageSessionHandler(I2PSession sess) throws I2PSessionException {
_session = sess;
_session.addMuxedSessionListener(this, listenProtocol, listenPort);
}
/**
* The session.
* @since 0.9.25
*/
public final I2PSession getSession() {
return _session;
}
/**
@@ -254,16 +322,18 @@ abstract class SAMMessageSession implements Closeable {
_log.debug("Shutting down SAM message-based session handler");
shutDown();
session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
session.removeListener(listenProtocol, listenPort);
try {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destroying I2P session...");
session.destroySession();
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.error("Error destroying I2P session", e);
if (_isOwnSession) {
try {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destroying I2P session...");
session.destroySession();
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.error("Error destroying I2P session", e);
}
}
}

View File

@@ -39,16 +39,17 @@ class SAMRawSession extends SAMMessageSession {
* @throws DataFormatException
* @throws I2PSessionException
*/
public SAMRawSession(String dest, Properties props,
protected SAMRawSession(String dest, Properties props,
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
super(dest, props);
this.recv = recv;
}
/**
* Create a new SAM RAW session.
*
* Caller MUST call start().
*
* @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
@@ -59,7 +60,19 @@ class SAMRawSession extends SAMMessageSession {
public SAMRawSession(InputStream destStream, Properties props,
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
super(destStream, props);
this.recv = recv;
}
/**
* Create a new SAM RAW session on an existing I2P session.
*
* @param props unused for now
* @since 0.9.25
*/
protected SAMRawSession(I2PSession sess, Properties props, int listenProtocol, int listenPort,
SAMRawReceiver recv) throws IOException,
DataFormatException, I2PSessionException {
super(sess, listenProtocol, listenPort);
this.recv = recv;
}
@@ -82,6 +95,24 @@ class SAMRawSession extends SAMMessageSession {
return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort);
}
/**
* Send bytes through a SAM RAW session.
*
* @since 0.9.25
*/
public boolean sendBytes(String dest, byte[] data, int proto,
int fromPort, int toPort,
boolean sendLeaseSet, int sendTags,
int tagThreshold, int expiration)
throws DataFormatException, I2PSessionException {
if (data.length > RAW_SIZE_MAX)
throw new DataFormatException("Data size limit exceeded (" + data.length + ")");
if (proto == I2PSession.PROTO_UNSPECIFIED)
proto = I2PSession.PROTO_DATAGRAM_RAW;
return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort,
sendLeaseSet, sendTags,tagThreshold, expiration);
}
protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
try {
recv.receiveRawBytes(msg, proto, fromPort, toPort);

View File

@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
@@ -47,16 +49,12 @@ import net.i2p.util.Log;
*
* @author human
*/
class SAMStreamSession {
class SAMStreamSession implements SAMMessageSess {
protected final Log _log;
protected final static int SOCKET_HANDLER_BUF_SIZE = 32768;
protected final SAMStreamReceiver recv;
protected final SAMStreamSessionServer server;
protected final I2PSocketManager socketMgr;
/** stream id (Long) to SAMStreamSessionSocketReader */
@@ -68,6 +66,9 @@ class SAMStreamSession {
// Can we create outgoing connections?
protected final boolean canCreate;
private final int listenProtocol;
private final int listenPort;
protected final boolean _isOwnSession;
/**
* should we flush every time we get a STREAM SEND, or leave that up to
@@ -81,6 +82,8 @@ class SAMStreamSession {
/**
* Create a new SAM STREAM session.
*
* Caller MUST call start().
*
* @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") or "__v3__" if extended by SAMv3StreamSession
* @param props Properties to setup the I2P session
@@ -105,8 +108,8 @@ class SAMStreamSession {
* @throws DataFormatException
* @throws SAMException
*/
public SAMStreamSession(InputStream destStream, String dir,
Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
protected SAMStreamSession(InputStream destStream, String dir,
Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
this.recv = recv;
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
@@ -156,30 +159,88 @@ class SAMStreamSession {
allprops.setProperty("outbound.nickname", "SAM TCP Client");
}
_isOwnSession = true;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(destStream,
i2cpHost,
i2cpPort,
allprops);
if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager");
try {
// we do it this way so we get exceptions
socketMgr = I2PSocketManagerFactory.createDisconnectedManager(destStream,
i2cpHost, i2cpPort, allprops);
socketMgr.getSession().connect();
} catch (I2PSessionException ise) {
throw new SAMException("Error creating I2PSocketManager: " + ise.getMessage(), ise);
}
socketMgr.addDisconnectListener(new DisconnectListener());
forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
if (Boolean.parseBoolean(props.getProperty("i2p.streaming.enforceProtocol")))
listenProtocol = I2PSession.PROTO_STREAMING;
else
listenProtocol = I2PSession.PROTO_ANY;
listenPort = I2PSession.PORT_ANY;
if (startAcceptor) {
server = new SAMStreamSessionServer();
Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
t.start();
} else {
server = null;
}
}
/**
* Create a new SAM STREAM session on an existing socket manager.
* v3 only.
*
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
* @throws IOException
* @throws DataFormatException
* @throws SAMException
* @since 0.9.25
*/
protected SAMStreamSession(I2PSocketManager mgr, Properties props, SAMStreamReceiver recv, int listenport)
throws IOException, DataFormatException, SAMException {
this.recv = recv;
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM STREAM session instantiated");
canCreate = true;
Properties allprops = (Properties) System.getProperties().clone();
allprops.putAll(props);
_isOwnSession = false;
socketMgr = mgr;
socketMgr.addDisconnectListener(new DisconnectListener());
forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
listenProtocol = I2PSession.PROTO_STREAMING;
listenPort = listenport;
server = null;
}
/*
* @since 0.9.25
*/
public void start() {
if (server != null) {
Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
t.start();
}
}
/*
* @since 0.9.25
*/
public int getListenProtocol() {
return listenProtocol;
}
/*
* @since 0.9.25
*/
public int getListenPort() {
return listenPort;
}
protected class DisconnectListener implements I2PSocketManager.DisconnectListener {
public void sessionDisconnected() {
@@ -284,7 +345,8 @@ class SAMStreamSession {
}
removeAllSocketHandlers();
recv.stopStreamReceiving();
socketMgr.destroySocketManager();
if (_isOwnSession)
socketMgr.destroySocketManager();
}
/**
@@ -304,6 +366,27 @@ class SAMStreamSession {
return true;
}
/**
* Unsupported
* @throws I2PSessionException always
* @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess
*/
public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws I2PSessionException {
throw new I2PSessionException("Unsupported in STREAM or MASTER session");
}
/**
* Unsupported
* @throws I2PSessionException always
* @since 0.9.25
*/
public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp,
boolean sendLeaseSet, int sendTags,
int tagThreshold, int expiration)
throws I2PSessionException {
throw new I2PSessionException("Unsupported in STREAM or MASTER session");
}
/**
* Create a new SAM STREAM session socket handler, detaching its thread.
*

View File

@@ -40,17 +40,18 @@ import net.i2p.util.Log;
*/
class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver {
protected SAMRawSession rawSession;
protected SAMDatagramSession datagramSession;
protected SAMMessageSess rawSession;
protected SAMMessageSess datagramSession;
protected SAMStreamSession streamSession;
protected SAMRawSession getRawSession() {return rawSession ;}
protected SAMDatagramSession getDatagramSession() {return datagramSession ;}
protected SAMStreamSession getStreamSession() {return streamSession ;}
protected final SAMMessageSess getRawSession() { return rawSession; }
protected final SAMMessageSess getDatagramSession() { return datagramSession; }
protected final SAMStreamSession getStreamSession() { return streamSession; }
protected final long _id;
private static final AtomicLong __id = new AtomicLong();
private static final int FIRST_READ_TIMEOUT = 60*1000;
protected static final String SESSION_ERROR = "SESSION STATUS RESULT=I2P_ERROR";
/**
* Create a new SAM version 1 handler. This constructor expects
@@ -132,7 +133,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
ReadLine.readLine(sock, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT);
sock.setSoTimeout(0);
} catch (SocketTimeoutException ste) {
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
}
msg = buf.toString();
@@ -199,14 +200,14 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldWarn())
_log.warn("Error closing socket", e);
}
if (getRawSession() != null) {
getRawSession().close();
if (rawSession != null) {
rawSession.close();
}
if (getDatagramSession() != null) {
getDatagramSession().close();
if (datagramSession != null) {
datagramSession.close();
}
if (getStreamSession() != null) {
getStreamSession().close();
if (streamSession != null) {
streamSession.close();
}
}
}
@@ -218,25 +219,24 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
try{
if (opcode.equals("CREATE")) {
if ((getRawSession() != null) || (getDatagramSession() != null)
|| (getStreamSession() != null)) {
if ((rawSession != null) || (datagramSession != null)
|| (streamSession != null)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
return writeString(SESSION_ERROR, "Session already exists");
}
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in SESSION CREATE message");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
}
dest = props.getProperty("DESTINATION");
dest = (String) props.remove("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION DESTINATION parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
return writeString(SESSION_ERROR, "DESTINATION not specified");
}
props.remove("DESTINATION");
String destKeystream = null;
@@ -261,13 +261,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
}
String style = props.getProperty("STYLE");
String style = (String) props.remove("STYLE");
if (style == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION STYLE parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
return writeString(SESSION_ERROR, "No SESSION STYLE specified");
}
props.remove("STYLE");
// Unconditionally override what the client may have set
// (iMule sets BestEffort) as None is more efficient
@@ -276,10 +275,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (style.equals("RAW")) {
rawSession = new SAMRawSession(destKeystream, props, this);
rawSession.start();
} else if (style.equals("DATAGRAM")) {
datagramSession = new SAMDatagramSession(destKeystream, props,this);
datagramSession.start();
} else if (style.equals("STREAM")) {
String dir = props.getProperty("DIRECTION");
String dir = (String) props.remove("DIRECTION");
if (dir == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No DIRECTION parameter in STREAM session, defaulting to BOTH");
@@ -288,16 +289,15 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
&& !dir.equals("BOTH")) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unknown DIRECTION parameter value: [" + dir + "]");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown DIRECTION parameter\"\n");
} else {
props.remove("DIRECTION");
return writeString(SESSION_ERROR, "Unknown DIRECTION parameter");
}
streamSession = newSAMStreamSession(destKeystream, dir,props);
streamSession.start();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
}
return writeString("SESSION STATUS RESULT=OK DESTINATION="
+ dest + "\n");
@@ -305,22 +305,22 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
return writeString(SESSION_ERROR, "Unrecognized opcode");
}
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
} catch (I2PSessionException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) {
_log.error("Unexpected SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
}
}
@@ -378,12 +378,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
Destination dest = null ;
if (name.equals("ME")) {
if (getRawSession() != null) {
dest = getRawSession().getDestination();
} else if (getStreamSession() != null) {
dest = getStreamSession().getDestination();
} else if (getDatagramSession() != null) {
dest = getDatagramSession().getDestination();
if (rawSession != null) {
dest = rawSession.getDestination();
} else if (streamSession != null) {
dest = streamSession.getDestination();
} else if (datagramSession != null) {
dest = datagramSession.getDestination();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Lookup for SESSION destination, but session is null");
@@ -415,22 +415,46 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
/* Parse and execute a DATAGRAM message */
protected boolean execDatagramMessage(String opcode, Properties props) {
if (getDatagramSession() == null) {
if (datagramSession == null) {
_log.error("DATAGRAM message received, but no DATAGRAM session exists");
return false;
}
return execDgOrRawMessage(false, opcode, props);
}
/* Parse and execute a RAW message */
protected boolean execRawMessage(String opcode, Properties props) {
if (rawSession == null) {
_log.error("RAW message received, but no RAW session exists");
return false;
}
return execDgOrRawMessage(true, opcode, props);
}
/*
* Parse and execute a RAW or DATAGRAM SEND message.
* This is for v1/v2 compatible sending only.
* For v3 sending, see SAMv3DatagramServer.
*
* Note that props are from the command line only.
* Session defaults from CREATE are NOT honored here.
* FIXME if we care, but nobody's probably using v3.2 options for v1/v2 sending.
*
* @since 0.9.25 consolidated from execDatagramMessage() and execRawMessage()
*/
private boolean execDgOrRawMessage(boolean isRaw, String opcode, Properties props) {
if (opcode.equals("SEND")) {
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in DATAGRAM SEND message");
_log.debug("No parameters specified in SEND message");
return false;
}
String dest = props.getProperty("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination not specified in DATAGRAM SEND message");
_log.debug("Destination not specified in SEND message");
return false;
}
@@ -438,32 +462,47 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
String strsize = props.getProperty("SIZE");
if (strsize == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Size not specified in DATAGRAM SEND message");
_log.warn("Size not specified in SEND message");
return false;
}
try {
size = Integer.parseInt(strsize);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid DATAGRAM SEND size specified: " + strsize);
_log.warn("Invalid SEND size specified: " + strsize);
return false;
}
if (!checkDatagramSize(size)) {
boolean ok = isRaw ? checkSize(size) : checkDatagramSize(size);
if (!ok) {
if (_log.shouldLog(Log.WARN))
_log.warn("Specified size (" + size
_log.warn("Specified size (" + size
+ ") is out of protocol limits");
return false;
}
int proto = I2PSession.PROTO_DATAGRAM;
int fromPort = I2PSession.PORT_UNSPECIFIED;
int toPort = I2PSession.PORT_UNSPECIFIED;
int proto;
if (isRaw) {
proto = I2PSession.PROTO_DATAGRAM_RAW;
String s = props.getProperty("PROTOCOL");
if (s != null) {
try {
proto = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid SEND protocol specified: " + s);
}
}
} else {
proto = I2PSession.PROTO_DATAGRAM;
}
String s = props.getProperty("FROM_PORT");
if (s != null) {
try {
fromPort = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid DATAGRAM SEND port specified: " + s);
_log.warn("Invalid SEND port specified: " + s);
}
}
s = props.getProperty("TO_PORT");
@@ -472,7 +511,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
toPort = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid RAW SEND port specified: " + s);
_log.warn("Invalid SEND port specified: " + s);
}
}
@@ -482,8 +521,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
in.readFully(data);
if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) {
_log.error("DATAGRAM SEND failed");
SAMMessageSess sess = isRaw ? rawSession : datagramSession;
if (sess.sendBytes(dest, data, proto, fromPort, toPort)) {
_log.error("SEND failed");
// a message send failure is no reason to drop the SAM session
// for raw and repliable datagrams, just carry on our merry way
return true;
@@ -492,139 +532,26 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
return true;
} catch (EOFException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too few bytes with DATAGRAM SEND message (expected: "
_log.debug("Too few bytes with SEND message (expected: "
+ size);
return false;
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException while parsing DATAGRAM SEND message",
_log.debug("Caught IOException while parsing SEND message",
e);
return false;
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid key specified with DATAGRAM SEND message",
_log.debug("Invalid key specified with SEND message",
e);
return false;
} catch (I2PSessionException e) {
_log.error("Session error with DATAGRAM SEND message", e);
_log.error("Session error with SEND message", e);
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized DATAGRAM message opcode: \""
+ opcode + "\"");
return false;
}
}
/* Parse and execute a RAW message */
protected boolean execRawMessage(String opcode, Properties props) {
if (getRawSession() == null) {
_log.error("RAW message received, but no RAW session exists");
return false;
}
if (opcode.equals("SEND")) {
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in RAW SEND message");
return false;
}
String dest = props.getProperty("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination not specified in RAW SEND message");
return false;
}
int size;
String strsize = props.getProperty("SIZE");
if (strsize == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Size not specified in RAW SEND message");
return false;
}
try {
size = Integer.parseInt(strsize);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid RAW SEND size specified: " + strsize);
return false;
}
if (!checkSize(size)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Specified size (" + size
+ ") is out of protocol limits");
return false;
}
int proto = I2PSession.PROTO_DATAGRAM_RAW;
int fromPort = I2PSession.PORT_UNSPECIFIED;
int toPort = I2PSession.PORT_UNSPECIFIED;
String s = props.getProperty("PROTOCOL");
if (s != null) {
try {
proto = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid RAW SEND protocol specified: " + s);
}
}
s = props.getProperty("FROM_PORT");
if (s != null) {
try {
fromPort = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid RAW SEND port specified: " + s);
}
}
s = props.getProperty("TO_PORT");
if (s != null) {
try {
toPort = Integer.parseInt(s);
} catch (NumberFormatException e) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid RAW SEND port specified: " + s);
}
}
try {
DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
byte[] data = new byte[size];
in.readFully(data);
if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) {
_log.error("RAW SEND failed");
// a message send failure is no reason to drop the SAM session
// for raw and repliable datagrams, just carry on our merry way
return true;
}
return true;
} catch (EOFException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too few bytes with RAW SEND message (expected: "
+ size);
return false;
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException while parsing RAW SEND message",
e);
return false;
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid key specified with RAW SEND message",
e);
return false;
} catch (I2PSessionException e) {
_log.error("Session error with RAW SEND message", e);
return false;
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized RAW message opcode: \""
_log.debug("Unrecognized message opcode: \""
+ opcode + "\"");
return false;
}
@@ -632,7 +559,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
/* Parse and execute a STREAM message */
protected boolean execStreamMessage(String opcode, Properties props) {
if (getStreamSession() == null) {
if (streamSession == null) {
_log.error("STREAM message received, but no STREAM session exists");
return false;
}
@@ -645,7 +572,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
return execStreamClose(props);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized RAW message opcode: \""
_log.debug("Unrecognized STREAM message opcode: \""
+ opcode + "\"");
return false;
}
@@ -699,13 +626,13 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
try {
if (!getStreamSession().sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
if (_log.shouldLog(Log.WARN))
_log.warn("STREAM SEND [" + size + "] failed");
// a message send failure is no reason to drop the SAM session
// for style=stream, tell the client the stream failed, and kill the virtual connection..
boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
getStreamSession().closeConnection(id);
streamSession.closeConnection(id);
return rv;
}
@@ -732,7 +659,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
int id;
{
String strid = props.getProperty("ID");
String strid = (String) props.remove("ID");
if (strid == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ID not specified in STREAM SEND message");
@@ -750,19 +677,17 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
_log.debug("Invalid STREAM CONNECT ID specified: " +strid);
return false;
}
props.remove("ID");
}
String dest = props.getProperty("DESTINATION");
String dest = (String) props.remove("DESTINATION");
if (dest == null) {
_log.debug("Destination not specified in RAW SEND message");
return false;
}
props.remove("DESTINATION");
try {
try {
if (!getStreamSession().connect(id, dest, props)) {
if (!streamSession.connect(id, dest, props)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM connection failed");
return false;
@@ -823,7 +748,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
}
boolean closed = getStreamSession().closeConnection(id);
boolean closed = streamSession.closeConnection(id);
if ( (!closed) && (_log.shouldLog(Log.WARN)) )
_log.warn("Stream unable to be closed, but this is non fatal");
return true;
@@ -841,7 +766,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException {
if (getRawSession() == null) {
if (rawSession == null) {
_log.error("BUG! Received raw bytes, but session is null!");
return;
}
@@ -867,7 +792,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldLog(Log.DEBUG))
_log.debug("stopRawReceiving() invoked");
if (getRawSession() == null) {
if (rawSession == null) {
_log.error("BUG! Got raw receiving stop, but session is null!");
return;
}
@@ -883,7 +808,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
// SAMDatagramReceiver implementation
public void receiveDatagramBytes(Destination sender, byte data[], int proto,
int fromPort, int toPort) throws IOException {
if (getDatagramSession() == null) {
if (datagramSession == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
return;
}
@@ -910,7 +835,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldLog(Log.DEBUG))
_log.debug("stopDatagramReceiving() invoked");
if (getDatagramSession() == null) {
if (datagramSession == null) {
_log.error("BUG! Got datagram receiving stop, but session is null!");
return;
}
@@ -927,7 +852,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void streamSendAnswer( int id, String result, String bufferState ) throws IOException
{
if ( getStreamSession() == null )
if ( streamSession == null )
{
_log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
return;
@@ -945,7 +870,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void notifyStreamSendBufferFree( int id ) throws IOException
{
if ( getStreamSession() == null )
if ( streamSession == null )
{
_log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
return;
@@ -959,7 +884,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void notifyStreamIncomingConnection(int id, Destination d) throws IOException {
if (getStreamSession() == null) {
if (streamSession == null) {
_log.error("BUG! Received stream connection, but session is null!");
return;
}
@@ -974,7 +899,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
/** @param msg may be null */
public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
{
if ( getStreamSession() == null )
if ( streamSession == null )
{
_log.error ( "BUG! Received stream connection, but session is null!" );
return;
@@ -1015,9 +940,21 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
return rv;
}
/**
* Write a string and message, escaping the message.
* Writes s + createMessageString(msg) + \n
*
* @param s The string, non-null
* @param s The message may be null
* @since 0.9.25
*/
protected boolean writeString(String s, String msg) {
return writeString(s + createMessageString(msg) + '\n');
}
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
if (getStreamSession() == null) {
if (streamSession == null) {
_log.error("Received stream bytes, but session is null!");
return;
}
@@ -1038,7 +975,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
/** @param msg may be null */
public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
if (getStreamSession() == null) {
if (streamSession == null) {
_log.error("BUG! Received stream disconnection, but session is null!");
return;
}
@@ -1053,7 +990,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldLog(Log.DEBUG))
_log.debug("stopStreamReceiving() invoked", new Exception("stopped"));
if (getStreamSession() == null) {
if (streamSession == null) {
_log.error("BUG! Got stream receiving stop, but session is null!");
return;
}

View File

@@ -36,13 +36,13 @@ import net.i2p.util.Log;
*
* @author mkvore
*/
class SAMv2StreamSession extends SAMStreamSession
{
/**
* Create a new SAM STREAM session.
*
* Caller MUST call start().
*
* @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session
@@ -60,6 +60,8 @@ class SAMv2StreamSession extends SAMStreamSession
/**
* Create a new SAM STREAM session.
*
* Caller MUST call start().
*
* @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session
@@ -86,7 +88,6 @@ class SAMv2StreamSession extends SAMStreamSession
* receive-only session
* @return true if the communication with the SAM client is ok
*/
@Override
public boolean connect ( int id, String dest, Properties props )
throws DataFormatException, SAMInvalidDirectionException
@@ -120,31 +121,25 @@ class SAMv2StreamSession extends SAMStreamSession
return true ;
}
/**
* SAM STREAM socket connecter, running in its own thread.
*
* @author mkvore
*/
* SAM STREAM socket connecter, running in its own thread.
*
* @author mkvore
*/
private class StreamConnector implements Runnable
{
private final int id;
private final Destination dest ;
private final I2PSocketOptions opts ;
/**
* Create a new SAM STREAM session socket reader
*
* @param id Unique id assigned to the handler
* @param dest Destination to reach
* @param opts Socket options (I2PSocketOptions)
* Create a new SAM STREAM session socket reader
*
* @param id Unique id assigned to the handler
* @param dest Destination to reach
* @param opts Socket options (I2PSocketOptions)
*/
public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException
{
if (_log.shouldLog(Log.DEBUG))
@@ -155,7 +150,6 @@ class SAMv2StreamSession extends SAMStreamSession
this.dest = dest ;
}
public void run()
{
if (_log.shouldLog(Log.DEBUG))
@@ -215,18 +209,15 @@ class SAMv2StreamSession extends SAMStreamSession
}
}
/**
* Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer)
* Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer)
*
* @param s I2PSocket
* @param id Socket ID
* @return v2StreamSender
* @throws IOException
*/
@Override
protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException
{
@@ -241,7 +232,6 @@ class SAMv2StreamSession extends SAMStreamSession
}
private class V2StreamSender extends StreamSender
{
private final List<ByteArray> _data;
private int _dataSize;
@@ -260,12 +250,12 @@ class SAMv2StreamSession extends SAMStreamSession
}
/**
* Send bytes through the SAM STREAM session socket sender
*
* Send bytes through the SAM STREAM session socket sender
*
* @param in Data stream of data to send
* @param size Count of bytes to send
* @throws IOException if the client didnt provide enough data
*/
*/
@Override
public void sendBytes ( InputStream in, int size ) throws IOException
{
@@ -307,9 +297,9 @@ class SAMv2StreamSession extends SAMStreamSession
}
/**
* Stop a SAM STREAM session socket sender thread immediately
*
*/
* Stop a SAM STREAM session socket sender thread immediately
*
*/
@Override
public void stopRunning()
{
@@ -342,9 +332,9 @@ class SAMv2StreamSession extends SAMStreamSession
}
/**
* Stop a SAM STREAM session socket sender gracefully: stop the
* sender thread once all pending data has been sent.
*/
* Stop a SAM STREAM session socket sender gracefully: stop the
* sender thread once all pending data has been sent.
*/
@Override
public void shutDownGracefully()
{
@@ -431,8 +421,6 @@ class SAMv2StreamSession extends SAMStreamSession
}
}
/**
* Send bytes through a SAM STREAM session.
*
@@ -459,30 +447,24 @@ class SAMv2StreamSession extends SAMStreamSession
return true;
}
/**
* SAM STREAM socket reader, running in its own thread. It forwards
* forward data to/from an I2P socket.
*
* @author human
* SAM STREAM socket reader, running in its own thread. It forwards
* forward data to/from an I2P socket.
*
* @author human
*/
public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader
{
protected boolean nolimit ;
protected long limit ;
protected long totalReceived ;
/**
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
* Create a new SAM STREAM session socket reader
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException
{
super ( s, id );
@@ -581,7 +563,4 @@ class SAMv2StreamSession extends SAMStreamSession
_log.debug ( "Shutting down SAM STREAM session socket handler " + id );
}
}
}

View File

@@ -137,6 +137,7 @@ class SAMv3DatagramServer implements Handler {
private static class MessageDispatcher implements Runnable {
private final ByteArrayInputStream is;
private static final int MAX_LINE_LENGTH = 2*1024;
public MessageDispatcher(byte[] buf) {
this.is = new ByteArrayInputStream(buf) ;
@@ -144,8 +145,21 @@ class SAMv3DatagramServer implements Handler {
public void run() {
try {
String header = DataHelper.readLine(is).trim();
// not UTF-8
//String header = DataHelper.readLine(is).trim();
// we cannot use SAMUtils.parseParams() here
final UTF8Reader reader = new UTF8Reader(is);
final StringBuilder buf = new StringBuilder(MAX_LINE_LENGTH);
int c;
int i = 0;
while ((c = reader.read()) != -1) {
if (++i > MAX_LINE_LENGTH)
throw new IOException("Line too long - max " + MAX_LINE_LENGTH);
if (c == '\n')
break;
buf.append((char)c);
}
String header = buf.toString();
StringTokenizer tok = new StringTokenizer(header, " ");
if (tok.countTokens() < 3) {
// This is not a correct message, for sure
@@ -160,57 +174,89 @@ class SAMv3DatagramServer implements Handler {
String nick = tok.nextToken();
String dest = tok.nextToken();
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if (rec!=null) {
Properties sprops = rec.getProps();
// 3.2 props
String pr = sprops.getProperty("PROTOCOL");
String fp = sprops.getProperty("FROM_PORT");
String tp = sprops.getProperty("TO_PORT");
// 3.3 props
// If this is a straight DATAGRAM or RAW session, we
// don't need to send these, the router already got them in
// the options, but if a subsession, we must, so just
// do it all the time.
String st = sprops.getProperty("crypto.tagsToSend");
String tt = sprops.getProperty("crypto.lowTagThreshold");
String sl = sprops.getProperty("shouldBundleReplyInfo");
String exms = sprops.getProperty("clientMessageTimeout"); // ms
String exs = null; // seconds
while (tok.hasMoreTokens()) {
String t = tok.nextToken();
// 3.2 props
if (t.startsWith("PROTOCOL="))
pr = t.substring("PROTOCOL=".length());
else if (t.startsWith("FROM_PORT="))
fp = t.substring("FROM_PORT=".length());
else if (t.startsWith("TO_PORT="))
tp = t.substring("TO_PORT=".length());
// 3.3 props
else if (t.startsWith("SEND_TAGS="))
st = t.substring("SEND_TAGS=".length());
else if (t.startsWith("TAG_THRESHOLD="))
tt = t.substring("TAG_THRESHOLD=".length());
else if (t.startsWith("EXPIRES="))
exs = t.substring("EXPIRES=".length());
else if (t.startsWith("SEND_LEASESET="))
sl = t.substring("SEND_LEASESET=".length());
}
// 3.2 props
int proto = I2PSession.PROTO_UNSPECIFIED;
int fromPort = I2PSession.PORT_UNSPECIFIED;
int toPort = I2PSession.PORT_UNSPECIFIED;
if (pr != null) {
try {
// 3.3 props
int sendTags = 0;
int tagThreshold = 0;
int expires = 0; // seconds
boolean sendLeaseSet = true;
try {
// 3.2 props
if (pr != null)
proto = Integer.parseInt(pr);
} catch (NumberFormatException nfe) {
warn("Bad datagram header received");
return;
}
}
if (fp != null) {
try {
if (fp != null)
fromPort = Integer.parseInt(fp);
} catch (NumberFormatException nfe) {
warn("Bad datagram header received");
return;
}
}
if (tp != null) {
try {
if (tp != null)
toPort = Integer.parseInt(tp);
} catch (NumberFormatException nfe) {
warn("Bad datagram header received");
return;
}
// 3.3 props
if (st != null)
sendTags = Integer.parseInt(st);
if (tt != null)
tagThreshold = Integer.parseInt(tt);
if (exs != null)
expires = Integer.parseInt(exs);
else if (exms != null)
expires = Integer.parseInt(exms) / 1000;
if (sl != null)
sendLeaseSet = Boolean.parseBoolean(sl);
} catch (NumberFormatException nfe) {
warn("Bad datagram header received");
return;
}
// TODO too many allocations and copies. One here and one in Listener above.
byte[] data = new byte[is.available()];
is.read(data);
SAMv3Handler.Session sess = rec.getHandler().getSession();
if (sess != null)
sess.sendBytes(dest, data, proto, fromPort, toPort);
else
Session sess = rec.getHandler().getSession();
if (sess != null) {
if (sendTags > 0 || tagThreshold > 0 || expires > 0 || !sendLeaseSet) {
sess.sendBytes(dest, data, proto, fromPort, toPort,
sendLeaseSet, sendTags, tagThreshold, expires);
} else {
sess.sendBytes(dest, data, proto, fromPort, toPort);
}
} else {
warn("Dropping datagram, no session for " + nick);
}
} else {
warn("Dropping datagram, no session for " + nick);
}

View File

@@ -6,19 +6,20 @@
package net.i2p.sam;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress ;
import java.nio.ByteBuffer;
import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
import java.net.InetSocketAddress;
import java.net.SocketAddress ;
import java.nio.ByteBuffer;
class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Session, SAMDatagramReceiver {
class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDatagramReceiver {
private final SAMv3Handler handler;
private final SAMv3DatagramServer server;
@@ -31,6 +32,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Se
* build a DatagramSession according to informations registered
* with the given nickname
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
@@ -46,28 +49,37 @@ class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Se
this.recv = this; // replacement
this.server = dgServer;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if (rec == null)
throw new SAMException("Record disappeared for nickname : \""+nick+"\"");
this.handler = rec.getHandler();
Properties props = rec.getProps();
String portStr = props.getProperty("PORT");
if (portStr == null) {
if (_log.shouldDebug())
_log.debug("receiver port not specified. Current socket will be used.");
this.clientAddress = null;
} else {
int port = Integer.parseInt(portStr);
String host = props.getProperty("HOST");
if (host == null) {
host = rec.getHandler().getClientIP();
if (_log.shouldDebug())
_log.debug("no host specified. Taken from the client socket : " + host+':'+port);
}
this.clientAddress = new InetSocketAddress(host, port);
}
clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
}
/**
* Build a Datagram Session on an existing i2p session
* registered with the given nickname
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
* @since 0.9.25
*/
public SAMv3DatagramSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
int listenPort, SAMv3DatagramServer dgServer)
throws IOException, DataFormatException, I2PSessionException {
super(isess, props, listenPort, null); // to be replaced by this
this.nick = nick ;
this.recv = this ; // replacement
this.server = dgServer;
this.handler = handler;
clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
}
public void receiveDatagramBytes(Destination sender, byte[] data, int proto,

View File

@@ -23,7 +23,6 @@ import java.net.NoRouteToHostException;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.HashMap;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
@@ -49,6 +48,7 @@ class SAMv3Handler extends SAMv1Handler
{
private Session session;
// TODO remove singleton, hang off SAMBridge like dgserver
public static final SessionsDB sSessionsHash = new SessionsDB();
private volatile boolean stolenSocket;
private volatile boolean streamForwardingSocket;
@@ -56,13 +56,7 @@ class SAMv3Handler extends SAMv1Handler
private long _lastPing;
private static final int FIRST_READ_TIMEOUT = 60*1000;
private static final int READ_TIMEOUT = 3*60*1000;
interface Session {
String getNick();
void close();
boolean sendBytes(String dest, byte[] data, int proto,
int fromPort, int toPort) throws DataFormatException, I2PSessionException;
}
private static final String AUTH_ERROR = "AUTH STATUS RESULT=I2P_ERROR";
/**
* Create a new SAM version 3 handler. This constructor expects
@@ -104,121 +98,6 @@ class SAMv3Handler extends SAMv1Handler
{
return (verMajor == 3);
}
/**
* The values in the SessionsDB
*/
public static class SessionRecord
{
private final String m_dest ;
private final Properties m_props ;
private ThreadGroup m_threadgroup ;
private final SAMv3Handler m_handler ;
public SessionRecord( String dest, Properties props, SAMv3Handler handler )
{
m_dest = dest;
m_props = new Properties() ;
m_props.putAll(props);
m_handler = handler ;
}
public SessionRecord( SessionRecord in )
{
m_dest = in.getDest();
m_props = in.getProps();
m_threadgroup = in.getThreadGroup();
m_handler = in.getHandler();
}
public String getDest()
{
return m_dest;
}
synchronized public Properties getProps()
{
Properties p = new Properties();
p.putAll(m_props);
return m_props;
}
public SAMv3Handler getHandler()
{
return m_handler ;
}
synchronized public ThreadGroup getThreadGroup()
{
return m_threadgroup ;
}
synchronized public void createThreadGroup(String name)
{
if (m_threadgroup == null)
m_threadgroup = new ThreadGroup(name);
}
}
/**
* basically a HashMap from String to SessionRecord
*/
public static class SessionsDB
{
private static final long serialVersionUID = 0x1;
static class ExistingIdException extends Exception {
private static final long serialVersionUID = 0x1;
}
static class ExistingDestException extends Exception {
private static final long serialVersionUID = 0x1;
}
private final HashMap<String, SessionRecord> map;
public SessionsDB() {
map = new HashMap<String, SessionRecord>() ;
}
/** @return success */
synchronized public boolean put( String nick, SessionRecord session )
throws ExistingIdException, ExistingDestException
{
if ( map.containsKey(nick) ) {
throw new ExistingIdException();
}
for ( SessionRecord r : map.values() ) {
if (r.getDest().equals(session.getDest())) {
throw new ExistingDestException();
}
}
if ( !map.containsKey(nick) ) {
session.createThreadGroup("SAM session "+nick);
map.put(nick, session) ;
return true ;
}
else
return false ;
}
/** @return true if removed */
synchronized public boolean del( String nick )
{
return map.remove(nick) != null;
}
synchronized public SessionRecord get(String nick)
{
return map.get(nick);
}
synchronized public boolean containsKey( String nick )
{
return map.containsKey(nick);
}
}
public String getClientIP()
{
@@ -255,7 +134,31 @@ class SAMv3Handler extends SAMv1Handler
Session getSession() {
return session;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3RawSession sess) {
rawSession = sess; session = sess;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3DatagramSession sess) {
datagramSession = sess; session = sess;
}
/**
* For subsessions created by MasterSession
* @since 0.9.25
*/
void setSession(SAMv3StreamSession sess) {
streamSession = sess; session = sess;
}
@Override
public void handle() {
String msg = null;
@@ -294,7 +197,7 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
writeString(SESSION_ERROR, "PONG timeout");
break;
}
} else {
@@ -309,13 +212,13 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= 2*READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
writeString(SESSION_ERROR, "PONG timeout");
break;
}
} else if (_lastPing < 0) {
if (_log.shouldWarn())
_log.warn("2nd timeout");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
} else {
// don't clear buffer, don't send ping,
@@ -336,7 +239,7 @@ class SAMv3Handler extends SAMv1Handler
ReadLine.readLine(socket, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT);
socket.setSoTimeout(0);
} catch (SocketTimeoutException ste) {
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
}
line = buf.toString();
@@ -373,7 +276,7 @@ class SAMv3Handler extends SAMv1Handler
if (opcode == null) {
// This is not a correct message, for sure
if (writeString(domain + " STATUS RESULT=I2P_ERROR MESSAGE=\"command not specified\"\n"))
if (writeString(domain + " STATUS RESULT=I2P_ERROR", "command not specified"))
continue;
else
break;
@@ -411,10 +314,13 @@ class SAMv3Handler extends SAMv1Handler
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException in handler", e);
writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) {
_log.error("Unexpected exception for message [" + msg + ']', e);
writeString(SESSION_ERROR, e.getMessage());
} catch (RuntimeException e) {
_log.error("Unexpected exception for message [" + msg + ']', e);
writeString(SESSION_ERROR, e.getMessage());
} finally {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stopping handler");
@@ -492,43 +398,48 @@ class SAMv3Handler extends SAMv1Handler
protected boolean execSessionMessage(String opcode, Properties props) {
String dest = "BUG!";
String nick = null ;
boolean ok = false ;
String nick = (String) props.remove("ID");
if (nick == null)
return writeString(SESSION_ERROR, "ID not specified");
String style = (String) props.remove("STYLE");
if (style == null && !opcode.equals("REMOVE"))
return writeString(SESSION_ERROR, "No SESSION STYLE specified");
try{
if (opcode.equals("CREATE")) {
if ((this.getRawSession()!= null) || (this.getDatagramSession() != null)
|| (this.getStreamSession() != null)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
return writeString(SESSION_ERROR, "Session already exists");
}
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in SESSION CREATE message");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
}
dest = props.getProperty("DESTINATION");
dest = (String) props.remove("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION DESTINATION parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
return writeString(SESSION_ERROR, "DESTINATION not specified");
}
props.remove("DESTINATION");
if (dest.equals("TRANSIENT")) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("TRANSIENT destination requested");
String sigTypeStr = props.getProperty("SIGNATURE_TYPE");
String sigTypeStr = (String) props.remove("SIGNATURE_TYPE");
SigType sigType;
if (sigTypeStr != null) {
sigType = SigType.parseSigType(sigTypeStr);
if (sigType == null) {
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"SIGNATURE_TYPE "
+ sigTypeStr + " unsupported\"\n");
return writeString(SESSION_ERROR, "SIGNATURE_TYPE "
+ sigTypeStr + " unsupported");
}
props.remove("SIGNATURE_TYPE");
} else {
sigType = SigType.DSA_SHA1;
}
@@ -543,24 +454,6 @@ class SAMv3Handler extends SAMv1Handler
return writeString("SESSION STATUS RESULT=INVALID_KEY\n");
}
nick = props.getProperty("ID");
if (nick == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION ID parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
}
props.remove("ID");
String style = props.getProperty("STYLE");
if (style == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION STYLE parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
}
props.remove("STYLE");
// Unconditionally override what the client may have set
// (iMule sets BestEffort) as None is more efficient
// and the client has no way to access delivery notifications
@@ -570,7 +463,14 @@ class SAMv3Handler extends SAMv1Handler
Properties allProps = new Properties();
allProps.putAll(i2cpProps);
allProps.putAll(props);
if (style.equals("MASTER")) {
// We must put these here, as SessionRecord.getProps() makes a copy,
// and the socket manager is instantiated in the
// SAMStreamSession constructor.
allProps.setProperty("i2p.streaming.enforceProtocol", "true");
allProps.setProperty("i2cp.dontPublishLeaseSet", "false");
}
try {
sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ;
@@ -590,44 +490,71 @@ class SAMv3Handler extends SAMv1Handler
SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs);
rawSession = v3;
this.session = v3;
v3.start();
} else if (style.equals("DATAGRAM")) {
SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs);
datagramSession = v3;
this.session = v3;
v3.start();
} else if (style.equals("STREAM")) {
SAMv3StreamSession v3 = newSAMStreamSession(nick);
streamSession = v3;
this.session = v3;
v3.start();
} else if (style.equals("MASTER")) {
SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
MasterSession v3 = new MasterSession(nick, dgs, this, allProps);
streamSession = v3;
datagramSession = v3;
rawSession = v3;
this.session = v3;
v3.start();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
}
ok = true ;
return writeString("SESSION STATUS RESULT=OK DESTINATION="
+ dest + "\n");
} else if (opcode.equals("ADD") || opcode.equals("REMOVE")) {
// prevent trouble in finally block
ok = true;
if (streamSession == null || datagramSession == null || rawSession == null)
return writeString(SESSION_ERROR, "Not a MASTER session");
MasterSession msess = (MasterSession) session;
String msg;
if (opcode.equals("ADD")) {
msg = msess.add(nick, style, props);
} else {
msg = msess.remove(nick, props);
}
if (msg == null)
return writeString("SESSION STATUS RESULT=OK ID=\"" + nick + '"', opcode + ' ' + nick);
else
return writeString(SESSION_ERROR + " ID=\"" + nick + '"', msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
return writeString(SESSION_ERROR, "Unrecognized opcode");
}
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
} catch (I2PSessionException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) {
if (_log.shouldLog(Log.INFO))
_log.info("Funny SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} finally {
// unregister the session if it has not been created
if ( !ok && nick!=null ) {
@@ -655,15 +582,14 @@ class SAMv3Handler extends SAMv1Handler
if ( session != null )
{
_log.error ( "STREAM message received, but this session is a master session" );
_log.error("v3 control socket cannot be used for STREAM");
try {
notifyStreamResult(true, "I2P_ERROR", "master session cannot be used for streams");
notifyStreamResult(true, "I2P_ERROR", "v3 control socket cannot be used for STREAM");
} catch (IOException e) {}
return false;
}
nick = props.getProperty("ID");
nick = (String) props.remove("ID");
if (nick == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION ID parameter not specified");
@@ -672,26 +598,23 @@ class SAMv3Handler extends SAMv1Handler
} catch (IOException e) {}
return false ;
}
props.remove("ID");
rec = sSessionsHash.get(nick);
if ( rec==null ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM SESSION ID does not exist");
try {
notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist");
notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID " + nick + " does not exist");
} catch (IOException e) {}
return false ;
}
streamSession = rec.getHandler().streamSession ;
if (streamSession==null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("specified ID is not a stream session");
try {
notifyStreamResult(true, "I2P_ERROR", "specified ID is not a STREAM session");
notifyStreamResult(true, "I2P_ERROR", "specified ID " + nick + " is not a STREAM session");
} catch (IOException e) {}
return false ;
}
@@ -733,14 +656,13 @@ class SAMv3Handler extends SAMv1Handler
return false;
}
String dest = props.getProperty("DESTINATION");
String dest = (String) props.remove("DESTINATION");
if (dest == null) {
notifyStreamResult(verbose, "I2P_ERROR", "Destination not specified in STREAM CONNECT message");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination not specified in STREAM CONNECT message");
return false;
}
props.remove("DESTINATION");
try {
((SAMv3StreamSession)streamSession).connect( this, dest, props );
@@ -748,19 +670,19 @@ class SAMv3Handler extends SAMv1Handler
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination in STREAM CONNECT message");
notifyStreamResult ( verbose, "INVALID_KEY", null );
notifyStreamResult ( verbose, "INVALID_KEY", e.getMessage());
} catch (ConnectException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "CONNECTION_REFUSED", null );
notifyStreamResult ( verbose, "CONNECTION_REFUSED", e.getMessage());
} catch (NoRouteToHostException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "CANT_REACH_PEER", null );
notifyStreamResult ( verbose, "CANT_REACH_PEER", e.getMessage());
} catch (InterruptedIOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
notifyStreamResult ( verbose, "TIMEOUT", null );
notifyStreamResult ( verbose, "TIMEOUT", e.getMessage());
} catch (I2PException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM CONNECT failed", e);
@@ -812,7 +734,7 @@ class SAMv3Handler extends SAMv1Handler
} catch (SAMException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM ACCEPT failed", e);
notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null );
notifyStreamResult ( verbose, "ALREADY_ACCEPTING", e.getMessage());
}
} catch (IOException e) {
}
@@ -820,6 +742,11 @@ class SAMv3Handler extends SAMv1Handler
}
/**
* @param verbose if false, does nothing
* @param result non-null
* @param message may be null
*/
public void notifyStreamResult(boolean verbose, String result, String message) throws IOException {
if (!verbose) return ;
String msgString = createMessageString(message);
@@ -870,29 +797,29 @@ class SAMv3Handler extends SAMv1Handler
String user = props.getProperty("USER");
String pw = props.getProperty("PASSWORD");
if (user == null || pw == null)
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER and PASSWORD required\"\n");
return writeString(AUTH_ERROR, "USER and PASSWORD required");
String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
if (i2cpProps.containsKey(prop))
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " already exists\"\n");
return writeString(AUTH_ERROR, "user " + user + " already exists");
PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext());
String shash = pm.createHash(pw);
i2cpProps.setProperty(prop, shash);
} else if (opcode.equals("REMOVE")) {
String user = props.getProperty("USER");
if (user == null)
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER required\"\n");
return writeString(AUTH_ERROR, "USER required");
String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
if (!i2cpProps.containsKey(prop))
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " not found\"\n");
return writeString(AUTH_ERROR, "user " + user + " not found");
i2cpProps.remove(prop);
} else {
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown AUTH command\"\n");
return writeString(AUTH_ERROR, "Unknown AUTH command");
}
try {
bridge.saveConfig();
return writeString("AUTH STATUS RESULT=OK\n");
} catch (IOException ioe) {
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Config save failed: " + ioe + "\"\n");
return writeString(AUTH_ERROR, "Config save failed: " + ioe);
}
}

View File

@@ -10,6 +10,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
@@ -19,7 +20,7 @@ import net.i2p.util.Log;
* @author MKVore
*
*/
class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver {
class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver {
private final String nick;
private final SAMv3Handler handler;
@@ -33,6 +34,8 @@ class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SA
* Build a Raw Datagram Session according to information
* registered with the given nickname
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
@@ -42,36 +45,64 @@ class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SA
throws IOException, DataFormatException, I2PSessionException {
super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
SAMv3Handler.sSessionsHash.get(nick).getProps(),
SAMv3Handler.sSessionsHash.get(nick).getHandler() // to be replaced by this
null // to be replaced by this
);
this.nick = nick ;
this.recv = this ; // replacement
this.server = dgServer;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if (rec == null)
throw new InterruptedIOException() ;
this.handler = rec.getHandler();
Properties props = rec.getProps();
clientAddress = getSocketAddress(props, handler);
_sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
Boolean.parseBoolean(props.getProperty("HEADER"));
}
/**
* Build a Raw Session on an existing i2p session
* registered with the given nickname
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
* @since 0.9.25
*/
public SAMv3RawSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
int listenProtocol, int listenPort, SAMv3DatagramServer dgServer)
throws IOException, DataFormatException, I2PSessionException {
super(isess, props, listenProtocol, listenPort, null); // to be replaced by this
this.nick = nick ;
this.recv = this ; // replacement
this.server = dgServer;
this.handler = handler;
clientAddress = getSocketAddress(props, handler);
_sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
Boolean.parseBoolean(props.getProperty("HEADER"));
}
/**
* @return null if PORT not set
* @since 0.9.25 moved from constructor
*/
static SocketAddress getSocketAddress(Properties props, SAMv3Handler handler) {
String portStr = props.getProperty("PORT") ;
if (portStr == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("receiver port not specified. Current socket will be used.");
this.clientAddress = null;
return null;
} else {
int port = Integer.parseInt(portStr);
String host = props.getProperty("HOST");
if ( host==null ) {
host = rec.getHandler().getClientIP();
if (_log.shouldLog(Log.DEBUG))
_log.debug("no host specified. Taken from the client socket : " + host +':'+port);
host = handler.getClientIP();
}
this.clientAddress = new InetSocketAddress(host, port);
return new InetSocketAddress(host, port);
}
_sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
Boolean.parseBoolean(props.getProperty("HEADER"));
}
public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
if (this.clientAddress==null) {
this.handler.receiveRawBytes(data, proto, fromPort, toPort);

View File

@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLException;
@@ -30,6 +31,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
@@ -43,16 +45,19 @@ import net.i2p.util.Log;
* @author mkvore
*/
class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Session
class SAMv3StreamSession extends SAMStreamSession implements Session
{
private static final int BUFFER_SIZE = 1024 ;
private static final int BUFFER_SIZE = 1024;
private static final int MAX_ACCEPT_QUEUE = 64;
private final Object socketServerLock = new Object();
/** this is ONLY set for FORWARD, not for ACCEPT */
private I2PServerSocket socketServer;
/** this is the count of active ACCEPT sockets */
private final AtomicInteger _acceptors = new AtomicInteger();
/** for subsession only, null otherwise */
private final LinkedBlockingQueue<I2PSocket> _acceptQueue;
private static I2PSSLSocketFactory _sslSocketFactory;
@@ -66,6 +71,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
* Create a new SAM STREAM session, according to information
* registered with the given nickname
*
* Caller MUST call start().
*
* @param login The nickname
* @throws IOException
* @throws DataFormatException
@@ -79,9 +86,61 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
getDB().get(login).getProps(),
getDB().get(login).getHandler());
this.nick = login ;
_acceptQueue = null;
}
public static SAMv3Handler.SessionsDB getDB()
/**
* Build a Stream Session on an existing I2P session
* registered with the given nickname
*
* Caller MUST call start().
*
* @param nick nickname of the session
* @throws IOException
* @throws DataFormatException
* @throws I2PSessionException
* @since 0.9.25
*/
public SAMv3StreamSession(String login, Properties props, SAMv3Handler handler, I2PSocketManager mgr,
int listenPort) throws IOException, DataFormatException, SAMException {
super(mgr, props, handler, listenPort);
this.nick = login ;
_acceptQueue = new LinkedBlockingQueue<I2PSocket>(MAX_ACCEPT_QUEUE);
}
/**
* Put a socket on the accept queue.
* Only for subsession, throws IllegalStateException otherwise.
*
* @return success, false if full
* @since 0.9.25
*/
public boolean queueSocket(I2PSocket sock) {
if (_acceptQueue == null)
throw new IllegalStateException();
return _acceptQueue.offer(sock);
}
/**
* Take a socket from the accept queue.
* Only for subsession, throws IllegalStateException otherwise.
*
* @since 0.9.25
*/
private I2PSocket acceptSocket() throws ConnectException {
if (_acceptQueue == null)
throw new IllegalStateException();
try {
// TODO there's no CoDel or expiration in this queue
return _acceptQueue.take();
} catch (InterruptedException ie) {
ConnectException ce = new ConnectException("interrupted");
ce.initCause(ie);
throw ce;
}
}
public static SessionsDB getDB()
{
return SAMv3Handler.sSessionsHash ;
}
@@ -135,7 +194,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
I2PSocket i2ps = socketMgr.connect(d, opts);
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
@@ -185,15 +244,18 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
}
I2PSocket i2ps;
I2PSocket i2ps = null;
_acceptors.incrementAndGet();
try {
i2ps = socketMgr.getServerSocket().accept();
if (_acceptQueue != null)
i2ps = acceptSocket();
else
i2ps = socketMgr.getServerSocket().accept();
} finally {
_acceptors.decrementAndGet();
}
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
@@ -223,7 +285,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
*/
public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
{
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
if ( rec==null ) throw new InterruptedIOException() ;
@@ -257,25 +319,23 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
this.socketServer = this.socketMgr.getServerSocket();
}
SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts);
SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, verbose, sendPorts);
(new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
}
/**
* Forward sockets from I2P to the host/port provided
*/
private static class SocketForwarder implements Runnable
private class SocketForwarder implements Runnable
{
private final String host;
private final int port;
private final SAMv3StreamSession session;
private final boolean isSSL, verbose, sendPorts;
SocketForwarder(String host, int port, boolean isSSL,
SAMv3StreamSession session, boolean verbose, boolean sendPorts) {
boolean verbose, boolean sendPorts) {
this.host = host ;
this.port = port ;
this.session = session ;
this.verbose = verbose ;
this.sendPorts = sendPorts;
this.isSSL = isSSL;
@@ -283,12 +343,15 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
public void run()
{
while (session.getSocketServer()!=null) {
while (getSocketServer() != null) {
// wait and accept a connection from I2P side
I2PSocket i2ps;
try {
i2ps = session.getSocketServer().accept();
if (_acceptQueue != null)
i2ps = acceptSocket();
else
i2ps = getSocketServer().accept();
if (i2ps == null)
continue;
} catch (SocketTimeoutException ste) {
@@ -437,7 +500,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
}
private I2PServerSocket getSocketServer()
protected I2PServerSocket getSocketServer()
{
synchronized ( this.socketServerLock ) {
return this.socketServer ;
@@ -450,7 +513,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
*/
public void stopForwardingIncoming() throws SAMException, InterruptedIOException
{
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null ) throw new InterruptedIOException() ;
@@ -474,18 +537,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
/**
* Close the stream session
* TODO Why do we override?
*/
@Override
public void close() {
socketMgr.destroySocketManager();
}
/**
* Unsupported
* @throws DataFormatException always
*/
public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException
{
throw new DataFormatException(null);
if (_isOwnSession)
socketMgr.destroySocketManager();
}
}

View File

@@ -0,0 +1,11 @@
package net.i2p.sam;
/**
* A V3 session.
*
* @since 0.9.25 moved from SAMv3Handler
*/
interface Session extends SAMMessageSess {
String getNick();
}

View File

@@ -0,0 +1,71 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.Properties;
/**
* The values in the SessionsDB
*
* @since 0.9.25 moved from SAMv3Handler
*/
class SessionRecord {
private final String m_dest ;
private final Properties m_props ;
private ThreadGroup m_threadgroup ;
private final SAMv3Handler m_handler ;
public SessionRecord( String dest, Properties props, SAMv3Handler handler )
{
m_dest = dest;
m_props = new Properties() ;
m_props.putAll(props);
m_handler = handler ;
}
public SessionRecord( SessionRecord in )
{
m_dest = in.getDest();
m_props = in.getProps();
m_threadgroup = in.getThreadGroup();
m_handler = in.getHandler();
}
public String getDest()
{
return m_dest;
}
/**
* Warning - returns a copy.
* @return a copy
*/
synchronized public Properties getProps()
{
Properties p = new Properties();
p.putAll(m_props);
return m_props;
}
public SAMv3Handler getHandler()
{
return m_handler ;
}
synchronized public ThreadGroup getThreadGroup()
{
return m_threadgroup ;
}
synchronized public void createThreadGroup(String name)
{
if (m_threadgroup == null)
m_threadgroup = new ThreadGroup(name);
}
}

View File

@@ -0,0 +1,76 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.util.HashMap;
/**
* basically a HashMap from String to SessionRecord
*
* @since 0.9.25 moved from SAMv3Handler
*/
class SessionsDB {
private static final long serialVersionUID = 0x1;
static class ExistingIdException extends Exception {
private static final long serialVersionUID = 0x1;
}
static class ExistingDestException extends Exception {
private static final long serialVersionUID = 0x1;
}
private final HashMap<String, SessionRecord> map;
public SessionsDB() {
map = new HashMap<String, SessionRecord>() ;
}
public synchronized void put(String nick, SessionRecord session)
throws ExistingIdException, ExistingDestException
{
if ( map.containsKey(nick) ) {
throw new ExistingIdException();
}
for ( SessionRecord r : map.values() ) {
if (r.getDest().equals(session.getDest())) {
throw new ExistingDestException();
}
}
session.createThreadGroup("SAM session "+nick);
map.put(nick, session) ;
}
/** @since 0.9.25 */
public synchronized void putDupDestOK(String nick, SessionRecord session)
throws ExistingIdException
{
if (map.containsKey(nick)) {
throw new ExistingIdException();
}
session.createThreadGroup("SAM session "+nick);
map.put(nick, session) ;
}
/** @return true if removed */
synchronized public boolean del( String nick )
{
return map.remove(nick) != null;
}
synchronized public SessionRecord get(String nick)
{
return map.get(nick);
}
synchronized public boolean containsKey( String nick )
{
return map.containsKey(nick);
}
}

View File

@@ -18,6 +18,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
private String _version;
private final Object _helloLock = new Object();
private Boolean _sessionCreateOk;
private Boolean _sessionAddOk;
private Boolean _streamStatusOk;
private final Object _sessionCreateLock = new Object();
private final Object _namingReplyLock = new Object();
@@ -41,13 +42,19 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
}
}
/** may be called twice, first for CREATE and second for ADD */
@Override
public void sessionStatusReceived(String result, String destination, String msg) {
synchronized (_sessionCreateLock) {
Boolean ok;
if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
_sessionCreateOk = Boolean.TRUE;
ok = Boolean.TRUE;
else
_sessionCreateOk = Boolean.FALSE;
ok = Boolean.FALSE;
if (_sessionCreateOk == null)
_sessionCreateOk = ok;
else if (_sessionAddOk == null)
_sessionAddOk = ok;
_sessionCreateLock.notifyAll();
}
}
@@ -120,6 +127,25 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
}
}
/**
* Wait for the session to be added, returning true if everything went ok
*
* @return true if everything ok
* @since 0.9.25
*/
public boolean waitForSessionAddReply() {
while (true) {
try {
synchronized (_sessionCreateLock) {
if (_sessionAddOk == null)
_sessionCreateLock.wait();
else
return _sessionAddOk.booleanValue();
}
} catch (InterruptedException ie) { return false; }
}
}
/**
* Wait for the stream to be created, returning true if everything went ok
*

View File

@@ -137,6 +137,10 @@ public class SAMReader {
if ( (eq > 0) && (eq < pair.length() - 1) ) {
String name = pair.substring(0, eq);
String val = pair.substring(eq+1);
if (val.length() <= 0) {
_log.error("Empty value for " + name);
continue;
}
while ( (val.charAt(0) == '\"') && (val.length() > 0) )
val = val.substring(1);
while ( (val.length() > 0) && (val.charAt(val.length()-1) == '\"') )

View File

@@ -55,16 +55,21 @@ public class SAMStreamSend {
private static I2PSSLSocketFactory _sslSocketFactory;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
private static final int MASTER=8;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-x] [-m mode] [-v version] [-b samHost] [-p samPort]\n" +
" [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
" default is stream\n" +
" -s: use SSL\n" +
" -x: use master session (forces -v 3.3)\n" +
" multiple -o session options are allowed";
public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:");
Getopt g = new Getopt("SAM", args, "sxhb:m:o:p:u:v:w:");
boolean isSSL = false;
boolean isMaster = false;
int mode = STREAM;
String version = "1.0";
String version = "3.3";
String host = "127.0.0.1";
String port = "7656";
String user = null;
@@ -77,6 +82,10 @@ public class SAMStreamSend {
isSSL = true;
break;
case 'x':
isMaster = true;
break;
case 'm':
mode = Integer.parseInt(g.getOptarg());
if (mode < 0 || mode > V1RAW) {
@@ -123,6 +132,10 @@ public class SAMStreamSend {
System.err.println(USAGE);
return;
}
if (isMaster) {
mode += MASTER;
version = "3.3";
}
if ((user == null && password != null) ||
(user != null && password == null)) {
System.err.println("both user and password or neither");
@@ -162,6 +175,8 @@ public class SAMStreamSend {
_log.debug("Reader created");
OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
if (mode >= MASTER)
mode -= MASTER;
if (ourDest == null)
throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG))
@@ -230,7 +245,10 @@ public class SAMStreamSend {
return sock;
}
/** @return our b64 dest or null */
/**
* @param isMaster is this the control socket
* @return our b64 dest or null
*/
private String handshake(OutputStream samOut, String version, boolean isMaster,
SAMEventHandler eventHandler, int mode, String user, String password,
String opts) {
@@ -261,24 +279,66 @@ public class SAMStreamSend {
_v3ID = "xx€€xx" + _v3ID;
_conOptions = "ID=" + _v3ID;
}
boolean masterMode; // are we using v3.3 master session
String command;
if (mode >= MASTER) {
masterMode = true;
command = "ADD";
mode -= MASTER;
} else {
masterMode = false;
command = "CREATE DESTINATION=TRANSIENT";
}
String style;
if (mode == STREAM)
style = "STREAM";
else if (mode == DG || mode == V1DG)
style = "DATAGRAM";
else
else // RAW or V1RAW
style = "RAW";
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n';
if (masterMode) {
if (mode == V1DG || mode == V1RAW)
throw new IllegalArgumentException("v1 dg/raw incompatible with master session");
String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=masterSend " + opts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER sent");
boolean ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("SESSION CREATE STYLE=MASTER failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
// PORT required even if we aren't listening for this test
if (mode != STREAM)
opts += " PORT=9999";
}
String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + opts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent");
boolean ok = eventHandler.waitForSessionCreateReply();
_log.debug("SESSION " + command + " sent");
boolean ok;
if (masterMode)
ok = eventHandler.waitForSessionAddReply();
else
ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
throw new IOException("SESSION " + command + " failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok);
_log.debug("SESSION " + command + " reply found: " + ok);
if (masterMode) {
// do a bunch more
req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION REMOVE ID=stream99\n";
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
}
req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
@@ -453,6 +513,7 @@ public class SAMStreamSend {
baos.write(DataHelper.getUTF8(" PROTOCOL=123 TO_PORT=5678"));
else
baos.write(DataHelper.getUTF8(" TO_PORT=5678"));
baos.write(DataHelper.getUTF8(" SEND_TAGS=19 TAG_THRESHOLD=13 EXPIRES=33 SEND_LEASESET=true"));
}
baos.write((byte) '\n');
baos.write(data, 0, read);

View File

@@ -58,21 +58,25 @@ public class SAMStreamSink {
private static I2PSSLSocketFactory _sslSocketFactory;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6, FORWARDSSL=7;
private static final int MASTER=8;
private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort]\n" +
" [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2;\n" +
" raw: 3; v1raw: 4; raw-with-headers: 5;\n" +
" stream-forward: 6; stream-forward-ssl: 7\n" +
" default is stream\n" +
" -s: use SSL to connect to bridge\n" +
" -x: use master session (forces -v 3.3)\n" +
" multiple -o session options are allowed";
private static final int V3FORWARDPORT=9998;
private static final int V3DGPORT=9999;
public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "sb:m:p:u:v:w:");
Getopt g = new Getopt("SAM", args, "sxhb:m:p:u:v:w:");
boolean isSSL = false;
boolean isMaster = false;
int mode = STREAM;
String version = "1.0";
String version = "3.3";
String host = "127.0.0.1";
String port = "7656";
String user = null;
@@ -85,6 +89,10 @@ public class SAMStreamSink {
isSSL = true;
break;
case 'x':
isMaster = true;
break;
case 'm':
mode = Integer.parseInt(g.getOptarg());
if (mode < 0 || mode > FORWARDSSL) {
@@ -131,6 +139,10 @@ public class SAMStreamSink {
System.err.println(USAGE);
return;
}
if (isMaster) {
mode += MASTER;
version = "3.3";
}
if ((user == null && password != null) ||
(user != null && password == null)) {
System.err.println("both user and password or neither");
@@ -169,6 +181,8 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created");
String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
if (mode >= MASTER)
mode -= MASTER;
if (ourDest == null)
throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG))
@@ -560,7 +574,10 @@ public class SAMStreamSink {
return sock;
}
/** @return our b64 dest or null */
/**
* @param isMaster is this the control socket
* @return our b64 dest or null
*/
private String handshake(OutputStream samOut, String version, boolean isMaster,
SAMEventHandler eventHandler, int mode, String user, String password,
String sopts) {
@@ -641,6 +658,16 @@ public class SAMStreamSink {
// and give it to the SAM server
dest = _destFile;
}
boolean masterMode; // are we using v3.3 master session
String command;
if (mode >= MASTER) {
masterMode = true;
command = "ADD";
mode -= MASTER;
} else {
masterMode = false;
command = "CREATE DESTINATION=" + dest;
}
String style;
if (mode == STREAM || mode == FORWARD || mode == FORWARDSSL)
style = "STREAM";
@@ -654,17 +681,61 @@ public class SAMStreamSink {
style = "RAW PORT=" + V3DGPORT;
else
style = "RAW HEADER=true PORT=" + V3DGPORT;
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n';
if (masterMode) {
if (mode == V1DG || mode == V1RAW)
throw new IllegalArgumentException("v1 dg/raw incompatible with master session");
String req = "SESSION CREATE DESTINATION=" + dest + " STYLE=MASTER ID=masterSink " + sopts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER sent");
boolean ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("SESSION CREATE STYLE=MASTER failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
}
String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + sopts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent");
if (mode == STREAM) {
boolean ok = eventHandler.waitForSessionCreateReply();
_log.debug("SESSION " + command + " sent");
//if (mode == STREAM) {
boolean ok;
if (masterMode)
ok = eventHandler.waitForSessionAddReply();
else
ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
throw new IOException("SESSION " + command + " failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok);
_log.debug("SESSION " + command + " reply found: " + ok);
//}
if (masterMode) {
// do a bunch more
req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=DATAGRAM PORT=9997 LISTEN_PORT=97 ID=dg97\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=DATAGRAM PORT=9996 FROM_PORT=96 ID=dg96\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=RAW PORT=9995 LISTEN_PORT=95 ID=raw95\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=RAW PORT=9994 FROM_PORT=94 LISTEN_PROTOCOL=222 ID=raw94\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION REMOVE ID=stream99\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION REMOVE ID=raw95\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION REMOVE ID=notfound\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION REMOVE ID=masterSink\n"; // shouldn't remove ourselves
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
}
req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes("UTF-8"));