aboutsummaryrefslogtreecommitdiffstats
path: root/lib/jinterface/java_src/com
diff options
context:
space:
mode:
authorDmitriy Kargapolov <[email protected]>2015-01-31 23:18:23 -0500
committerDmitriy Kargapolov <[email protected]>2015-01-31 23:22:21 -0500
commitfd76d49c7d7bbed4775818390e47b958ee50f469 (patch)
tree196b0c358ab4f188a8498dfb8f255c8c4e47259f /lib/jinterface/java_src/com
parent8c93fcbb746be862e07db22b7406d370a12c39d2 (diff)
downloadotp-fd76d49c7d7bbed4775818390e47b958ee50f469.tar.gz
otp-fd76d49c7d7bbed4775818390e47b958ee50f469.tar.bz2
otp-fd76d49c7d7bbed4775818390e47b958ee50f469.zip
jinterface: transport factory implementation
Transport factory basic implementation added. This makes possible creating connections between nodes using ssh channels for example. Default transport factory based on standart Socket/ServerSocket classes is provided. Modifications are backward compatible.
Diffstat (limited to 'lib/jinterface/java_src/com')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java46
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java46
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java5
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java5
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java46
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpLocalNode.java30
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java96
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java13
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpPeer.java17
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSelf.java118
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerSocketTransport.java68
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerTransport.java46
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransport.java89
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransportFactory.java56
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransport.java49
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransportFactory.java124
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/java_files8
17 files changed, 787 insertions, 75 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
index 1b0fe3e2e6..ab8fa06c1b 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
@@ -20,7 +20,7 @@
package com.ericsson.otp.erlang;
import java.io.IOException;
-import java.net.Socket;
+import java.io.OutputStream;
import java.util.Random;
/**
@@ -84,7 +84,7 @@ public abstract class AbstractConnection extends Thread {
private volatile boolean done = false;
protected boolean connected = false; // connection status
- protected Socket socket; // communication channel
+ protected OtpTransport socket; // communication channel
protected OtpPeer peer; // who are we connected to
protected OtpLocalNode localNode; // this nodes id
String name; // local name of this connection
@@ -126,7 +126,7 @@ public abstract class AbstractConnection extends Thread {
* Accept an incoming connection from a remote node. Used by
* {@link OtpSelf#accept() OtpSelf.accept()} to create a connection based on
* data received when handshaking with the peer node, when the remote node
- * is the connection intitiator.
+ * is the connection initiator.
*
* @exception java.io.IOException
* if it was not possible to connect to the peer.
@@ -134,20 +134,17 @@ public abstract class AbstractConnection extends Thread {
* @exception OtpAuthException
* if handshake resulted in an authentication error
*/
- protected AbstractConnection(final OtpLocalNode self, final Socket s)
+ protected AbstractConnection(final OtpLocalNode self, final OtpTransport s)
throws IOException, OtpAuthException {
localNode = self;
- peer = new OtpPeer();
+ peer = new OtpPeer(self.transportFactory);
socket = s;
- socket.setTcpNoDelay(true);
-
traceLevel = defaultLevel;
setDaemon(true);
if (traceLevel >= handshakeThreshold) {
- System.out.println("<- ACCEPT FROM " + s.getInetAddress() + ":"
- + s.getPort());
+ System.out.println("<- ACCEPT FROM " + s);
}
// get his info
@@ -189,6 +186,8 @@ public abstract class AbstractConnection extends Thread {
// now get a connection between the two...
port = OtpEpmd.lookupPort(peer);
+ if (port == 0)
+ throw new IOException("No remote node found - cannot connect");
// now find highest common dist value
if (peer.proto != self.proto || self.distHigh < peer.distLow
@@ -523,7 +522,9 @@ public abstract class AbstractConnection extends Thread {
// received tick? send tock!
if (len == 0) {
synchronized (this) {
- socket.getOutputStream().write(tock);
+ OutputStream out = socket.getOutputStream();
+ out.write(tock);
+ out.flush();
}
}
@@ -837,8 +838,11 @@ public abstract class AbstractConnection extends Thread {
}
}
- header.writeTo(socket.getOutputStream());
- payload.writeTo(socket.getOutputStream());
+ // group flush op in favour of possible ssh-tunneled stream
+ OutputStream out = socket.getOutputStream();
+ header.writeTo(out);
+ payload.writeTo(out);
+ out.flush();
} catch (final IOException e) {
close();
throw e;
@@ -859,7 +863,7 @@ public abstract class AbstractConnection extends Thread {
+ e);
}
}
- header.writeTo(socket.getOutputStream());
+ header.writeToAndFlush(socket.getOutputStream());
} catch (final IOException e) {
close();
throw e;
@@ -913,7 +917,8 @@ public abstract class AbstractConnection extends Thread {
}
/* this method now throws exception if we don't get full read */
- protected int readSock(final Socket s, final byte[] b) throws IOException {
+ protected int readSock(final OtpTransport s, final byte[] b)
+ throws IOException {
int got = 0;
final int len = b.length;
int i;
@@ -980,8 +985,7 @@ public abstract class AbstractConnection extends Thread {
protected void doConnect(final int port) throws IOException,
OtpAuthException {
try {
- socket = new Socket(peer.host(), port);
- socket.setTcpNoDelay(true);
+ socket = peer.createTransport(peer.host(), port);
if (traceLevel >= handshakeThreshold) {
System.out.println("-> MD5 CONNECT TO " + peer.host() + ":"
@@ -1077,7 +1081,7 @@ public abstract class AbstractConnection extends Thread {
obuf.write4BE(aflags);
obuf.write(str.getBytes());
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendName" + " flags="
@@ -1098,7 +1102,7 @@ public abstract class AbstractConnection extends Thread {
obuf.write4BE(challenge);
obuf.write(str.getBytes());
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags="
@@ -1232,7 +1236,7 @@ public abstract class AbstractConnection extends Thread {
obuf.write1(ChallengeReply);
obuf.write4BE(challenge);
obuf.write(digest);
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendChallengeReply"
@@ -1294,7 +1298,7 @@ public abstract class AbstractConnection extends Thread {
obuf.write1(ChallengeAck);
obuf.write(digest);
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendChallengeAck"
@@ -1341,7 +1345,7 @@ public abstract class AbstractConnection extends Thread {
obuf.write1(ChallengeStatus);
obuf.write(status.getBytes());
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeToAndFlush(socket.getOutputStream());
if (traceLevel >= handshakeThreshold) {
System.out.println("-> " + "HANDSHAKE sendStatus" + " status="
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
index 6f07d8171e..0a33984b31 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java
@@ -64,13 +64,14 @@ import java.net.UnknownHostException;
* instead.
* </p>
*/
-public class AbstractNode {
+public class AbstractNode implements OtpTransportFactory {
static String localHost = null;
String node;
String host;
String alive;
String cookie;
static String defaultCookie = null;
+ final OtpTransportFactory transportFactory;
// Node types
static final int NTYPE_R6 = 110; // 'n' post-r5, all nodes
@@ -146,21 +147,41 @@ public class AbstractNode {
}
}
- protected AbstractNode() {
+ protected AbstractNode(final OtpTransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
}
/**
- * Create a node with the given name and the default cookie.
+ * Create a node with the given name and default cookie and transport
+ * factory.
*/
protected AbstractNode(final String node) {
- this(node, defaultCookie);
+ this(node, defaultCookie, new OtpSocketTransportFactory());
}
/**
- * Create a node with the given name and cookie.
+ * Create a node with the given name, transport factory and the default
+ * cookie.
+ */
+ protected AbstractNode(final String node,
+ final OtpTransportFactory transportFactory) {
+ this(node, defaultCookie, transportFactory);
+ }
+
+ /**
+ * Create a node with the given name, cookie and default transport factory.
*/
protected AbstractNode(final String name, final String cookie) {
+ this(name, cookie, new OtpSocketTransportFactory());
+ }
+
+ /**
+ * Create a node with the given name, cookie and transport factory.
+ */
+ protected AbstractNode(final String name, final String cookie,
+ final OtpTransportFactory transportFactory) {
this.cookie = cookie;
+ this.transportFactory = transportFactory;
final int i = name.indexOf('@', 0);
if (i < 0) {
@@ -268,4 +289,19 @@ public class AbstractNode {
}
return home;
}
+
+ public OtpTransport createTransport(final String addr, final int port)
+ throws IOException {
+ return transportFactory.createTransport(addr, port);
+ }
+
+ public OtpTransport createTransport(final InetAddress addr, final int port)
+ throws IOException {
+ return transportFactory.createTransport(addr, port);
+ }
+
+ public OtpServerTransport createServerTransport(final int port)
+ throws IOException {
+ return transportFactory.createServerTransport(port);
+ }
}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
index 2c9b7766bc..af0926f939 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
@@ -19,7 +19,6 @@
package com.ericsson.otp.erlang;
import java.io.IOException;
-import java.net.Socket;
/**
* Maintains a connection between a Java process and a remote Erlang, Java or C
@@ -63,8 +62,8 @@ public class OtpConnection extends AbstractConnection {
* error
*/
// package scope
- OtpConnection(final OtpSelf self, final Socket s) throws IOException,
- OtpAuthException {
+ OtpConnection(final OtpSelf self, final OtpTransport s)
+ throws IOException, OtpAuthException {
super(self, s);
this.self = self;
queue = new GenericQueue();
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
index 4d80f61d52..b0e3e81fca 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpCookedConnection.java
@@ -19,7 +19,6 @@
package com.ericsson.otp.erlang;
import java.io.IOException;
-import java.net.Socket;
/**
* <p>
@@ -78,8 +77,8 @@ public class OtpCookedConnection extends AbstractConnection {
* error
*/
// package scope
- OtpCookedConnection(final OtpNode self, final Socket s) throws IOException,
- OtpAuthException {
+ OtpCookedConnection(final OtpNode self, final OtpTransport s)
+ throws IOException, OtpAuthException {
super(self, s);
this.self = self;
links = new Links(25);
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
index 796babee1b..6c7c8fe951 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java
@@ -21,13 +21,12 @@ package com.ericsson.otp.erlang;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.Socket;
/**
* Provides methods for registering, unregistering and looking up nodes with the
* Erlang portmapper daemon (Epmd). For each registered node, Epmd maintains
* information about the port on which incoming connections are accepted, as
- * well as which versions of the Erlang communication protocolt the node
+ * well as which versions of the Erlang communication protocol the node
* supports.
*
* <p>
@@ -136,7 +135,7 @@ public class OtpEpmd {
*/
public static boolean publishPort(final OtpLocalNode node)
throws IOException {
- Socket s = null;
+ OtpTransport s = null;
s = r4_publish(node);
@@ -156,16 +155,16 @@ public class OtpEpmd {
* This method does not report any failures.
*/
public static void unPublishPort(final OtpLocalNode node) {
- Socket s = null;
+ OtpTransport s = null;
try {
- s = new Socket((String) null, EpmdPort.get());
+ s = node.createTransport((String) null, EpmdPort.get());
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
obuf.write2BE(node.alive().length() + 1);
obuf.write1(stopReq);
obuf.writeN(node.alive().getBytes());
- obuf.writeTo(s.getOutputStream());
+ obuf.writeToAndFlush(s.getOutputStream());
// don't even wait for a response (is there one?)
if (traceLevel >= traceThreshold) {
System.out.println("-> UNPUBLISH " + node + " port="
@@ -187,12 +186,12 @@ public class OtpEpmd {
private static int r4_lookupPort(final AbstractNode node)
throws IOException {
int port = 0;
- Socket s = null;
+ OtpTransport s = null;
try {
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
- s = new Socket(node.host(), EpmdPort.get());
+ s = node.createTransport(node.host(), EpmdPort.get());
// build and send epmd request
// length[2], tag[1], alivename[n] (length = n+1)
@@ -201,7 +200,7 @@ public class OtpEpmd {
obuf.writeN(node.alive().getBytes());
// send request
- obuf.writeTo(s.getOutputStream());
+ obuf.writeToAndFlush(s.getOutputStream());
if (traceLevel >= traceThreshold) {
System.out.println("-> LOOKUP (r4) " + node);
@@ -242,7 +241,7 @@ public class OtpEpmd {
System.out.println("<- (no response)");
}
throw new IOException("Nameserver not responding on " + node.host()
- + " when looking up " + node.alive());
+ + " when looking up " + node.alive(), e);
} catch (final OtpErlangDecodeException e) {
if (traceLevel >= traceThreshold) {
System.out.println("<- (invalid response)");
@@ -276,14 +275,14 @@ public class OtpEpmd {
* fatal. If we manage to successfully communicate with an r4 epmd, we
* return either the socket, or null, depending on the result.
*/
- private static Socket r4_publish(final OtpLocalNode node)
+ private static OtpTransport r4_publish(final OtpLocalNode node)
throws IOException {
- Socket s = null;
+ OtpTransport s = null;
try {
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
- s = new Socket((String) null, EpmdPort.get());
+ s = node.createTransport((String) null, EpmdPort.get());
obuf.write2BE(node.alive().length() + 13);
@@ -301,7 +300,7 @@ public class OtpEpmd {
obuf.write2BE(0); // No extra
// send request
- obuf.writeTo(s.getOutputStream());
+ obuf.writeToAndFlush(s.getOutputStream());
if (traceLevel >= traceThreshold) {
System.out.println("-> PUBLISH (r4) " + node + " port="
@@ -356,23 +355,34 @@ public class OtpEpmd {
}
public static String[] lookupNames() throws IOException {
- return lookupNames(InetAddress.getByName(null));
+ return lookupNames(InetAddress.getByName(null),
+ new OtpSocketTransportFactory());
+ }
+
+ public static String[] lookupNames(
+ final OtpTransportFactory transportFactory) throws IOException {
+ return lookupNames(InetAddress.getByName(null), transportFactory);
}
public static String[] lookupNames(final InetAddress address)
throws IOException {
- Socket s = null;
+ return lookupNames(address, new OtpSocketTransportFactory());
+ }
+
+ public static String[] lookupNames(final InetAddress address,
+ final OtpTransportFactory transportFactory) throws IOException {
+ OtpTransport s = null;
try {
@SuppressWarnings("resource")
final OtpOutputStream obuf = new OtpOutputStream();
try {
- s = new Socket(address, EpmdPort.get());
+ s = transportFactory.createTransport(address, EpmdPort.get());
obuf.write2BE(1);
obuf.write1(names4req);
// send request
- obuf.writeTo(s.getOutputStream());
+ obuf.writeToAndFlush(s.getOutputStream());
if (traceLevel >= traceThreshold) {
System.out.println("-> NAMES (r4) ");
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpLocalNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpLocalNode.java
index b996ba6f6c..dd1d299297 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpLocalNode.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpLocalNode.java
@@ -29,12 +29,7 @@ public class OtpLocalNode extends AbstractNode {
private int refId[];
protected int port;
- protected java.net.Socket epmd;
-
- protected OtpLocalNode() {
- super();
- init();
- }
+ protected OtpTransport epmd;
/**
* Create a node with the given name and the default cookie.
@@ -45,6 +40,16 @@ public class OtpLocalNode extends AbstractNode {
}
/**
+ * Create a node with the given name, transport factory and the default
+ * cookie.
+ */
+ protected OtpLocalNode(final String node,
+ final OtpTransportFactory transportFactory) {
+ super(node, transportFactory);
+ init();
+ }
+
+ /**
* Create a node with the given name and cookie.
*/
protected OtpLocalNode(final String node, final String cookie) {
@@ -52,6 +57,15 @@ public class OtpLocalNode extends AbstractNode {
init();
}
+ /**
+ * Create a node with the given name, cookie and transport factory.
+ */
+ protected OtpLocalNode(final String node, final String cookie,
+ final OtpTransportFactory transportFactory) {
+ super(node, cookie, transportFactory);
+ init();
+ }
+
private void init() {
serial = 0;
pidCount = 1;
@@ -77,7 +91,7 @@ public class OtpLocalNode extends AbstractNode {
* @param s
* The socket connecting this node to Epmd.
*/
- protected void setEpmd(final java.net.Socket s) {
+ protected void setEpmd(final OtpTransport s) {
epmd = s;
}
@@ -86,7 +100,7 @@ public class OtpLocalNode extends AbstractNode {
*
* @return The socket connecting this node to Epmd.
*/
- protected java.net.Socket getEpmd() {
+ protected OtpTransport getEpmd() {
return epmd;
}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
index d5edd135cf..7512d34c21 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
@@ -20,8 +20,6 @@ package com.ericsson.otp.erlang;
import java.io.IOException;
import java.lang.ref.WeakReference;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Hashtable;
@@ -97,7 +95,39 @@ public class OtpNode extends OtpLocalNode {
*
*/
public OtpNode(final String node) throws IOException {
- this(node, defaultCookie, 0);
+ super(node);
+
+ init(0);
+ }
+
+ /**
+ * <p>
+ * Create a node using the default cookie. The default cookie is found by
+ * reading the first line of the .erlang.cookie file in the user's home
+ * directory. The home directory is obtained from the System property
+ * "user.home".
+ * </p>
+ *
+ * <p>
+ * If the file does not exist, an empty string is used. This method makes no
+ * attempt to create the file.
+ * </p>
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node,
+ final OtpTransportFactory transportFactory) throws IOException {
+ super(node, transportFactory);
+
+ init(0);
}
/**
@@ -128,6 +158,28 @@ public class OtpNode extends OtpLocalNode {
* the authorization cookie that will be used by this node when
* it communicates with other nodes.
*
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie,
+ final OtpTransportFactory transportFactory) throws IOException {
+ this(node, cookie, 0, transportFactory);
+ }
+
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
* @param port
* the port number you wish to use for incoming connections.
* Specifying 0 lets the system choose an available port.
@@ -143,6 +195,34 @@ public class OtpNode extends OtpLocalNode {
init(port);
}
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param port
+ * the port number you wish to use for incoming connections.
+ * Specifying 0 lets the system choose an available port.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie, final int port,
+ final OtpTransportFactory transportFactory) throws IOException {
+ super(node, cookie, transportFactory);
+
+ init(port);
+ }
+
private synchronized void init(final int aport) throws IOException {
if (!initDone) {
connections = new Hashtable<String, OtpCookedConnection>(17,
@@ -681,12 +761,12 @@ public class OtpNode extends OtpLocalNode {
* this thread simply listens for incoming connections
*/
public class Acceptor extends Thread {
- private final ServerSocket sock;
+ private final OtpServerTransport sock;
private final int acceptorPort;
private volatile boolean done = false;
Acceptor(final int port) throws IOException {
- sock = new ServerSocket(port);
+ sock = createServerTransport(port);
acceptorPort = sock.getLocalPort();
OtpNode.this.port = acceptorPort;
@@ -720,7 +800,7 @@ public class OtpNode extends OtpLocalNode {
localStatus(node, false, null);
}
- private void closeSock(final ServerSocket s) {
+ private void closeSock(final OtpServerTransport s) {
try {
if (s != null) {
s.close();
@@ -729,7 +809,7 @@ public class OtpNode extends OtpLocalNode {
}
}
- private void closeSock(final Socket s) {
+ private void closeSock(final OtpTransport s) {
try {
if (s != null) {
s.close();
@@ -744,7 +824,7 @@ public class OtpNode extends OtpLocalNode {
@Override
public void run() {
- Socket newsock = null;
+ OtpTransport newsock = null;
OtpCookedConnection conn = null;
localStatus(node, true, null);
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
index b8493b57ff..2ec583ff5c 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpOutputStream.java
@@ -21,6 +21,7 @@ package com.ericsson.otp.erlang;
// import java.io.OutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -202,6 +203,16 @@ public class OtpOutputStream extends ByteArrayOutputStream {
super.count += len;
}
+ @Override
+ public synchronized void writeTo(OutputStream out) throws IOException {
+ super.writeTo(out);
+ }
+
+ public synchronized void writeToAndFlush(OutputStream out) throws IOException {
+ super.writeTo(out);
+ out.flush();
+ }
+
/**
* Write the low byte of a value to the stream.
*
@@ -887,7 +898,7 @@ public class OtpOutputStream extends ByteArrayOutputStream {
if (oos.size() < 5) {
// fast path for small terms
try {
- oos.writeTo(this);
+ oos.writeToAndFlush(this);
// if the term is written as a compressed term, the output
// stream is closed, so we do this here, too
close();
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpPeer.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpPeer.java
index 2c79c04247..cb09b40f47 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpPeer.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpPeer.java
@@ -32,8 +32,8 @@ public class OtpPeer extends AbstractNode {
* common protocol version we both support
*/
- OtpPeer() {
- super();
+ OtpPeer(final OtpTransportFactory transportFactory) {
+ super(transportFactory);
}
/**
@@ -47,6 +47,19 @@ public class OtpPeer extends AbstractNode {
}
/**
+ * Create a peer node with custom transport factory.
+ *
+ * @param node
+ * the name of the node.
+ * @param transportFactory
+ * custom transport factory
+ */
+ public OtpPeer(final String node, final OtpTransportFactory
+ transportFactory) {
+ super(node, transportFactory);
+ }
+
+ /**
* Create a connection to a remote node.
*
* @param self
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSelf.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSelf.java
index 166dac5701..5b9d13ad81 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSelf.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSelf.java
@@ -19,8 +19,6 @@
package com.ericsson.otp.erlang;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.net.UnknownHostException;
/**
@@ -48,7 +46,7 @@ import java.net.UnknownHostException;
*
*/
public class OtpSelf extends OtpLocalNode {
- private final ServerSocket sock;
+ private final OtpServerTransport sock;
private final OtpErlangPid pid;
/**
@@ -67,12 +65,43 @@ public class OtpSelf extends OtpLocalNode {
* @param node
* the name of this node.
*
+ * @exception IOException
+ * in case of server transport failure
+ *
*/
public OtpSelf(final String node) throws IOException {
this(node, defaultCookie, 0);
}
/**
+ * <p>
+ * Create a self node using the default cookie and custom transport factory.
+ * The default cookie is found by reading the first line of the
+ * .erlang.cookie file in the user's home directory. The home directory is
+ * obtained from the System property "user.home".
+ * </p>
+ *
+ * <p>
+ * If the file does not exist, an empty string is used. This method makes no
+ * attempt to create the file.
+ * </p>
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * in case of server transport failure
+ *
+ */
+ public OtpSelf(final String node,
+ final OtpTransportFactory transportFactory) throws IOException {
+ this(node, defaultCookie, 0, transportFactory);
+ }
+
+ /**
* Create a self node.
*
* @param node
@@ -81,16 +110,95 @@ public class OtpSelf extends OtpLocalNode {
* @param cookie
* the authorization cookie that will be used by this node when
* it communicates with other nodes.
+ *
+ * @exception IOException
+ * in case of server transport failure
*/
public OtpSelf(final String node, final String cookie) throws IOException {
this(node, cookie, 0);
}
+ /**
+ * Create a self node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * in case of server transport failure
+ */
+ public OtpSelf(final String node, final String cookie,
+ final OtpTransportFactory transportFactory) throws IOException {
+ this(node, cookie, 0, transportFactory);
+ }
+
+ /**
+ * Create a self node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param port
+ * the port number you wish to use for incoming connections.
+ * Specifying 0 lets the system choose an available port.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * in case of server transport failure
+ */
public OtpSelf(final String node, final String cookie, final int port)
throws IOException {
super(node, cookie);
- sock = new ServerSocket(port);
+ sock = createServerTransport(port);
+
+ if (port != 0) {
+ this.port = port;
+ } else {
+ this.port = sock.getLocalPort();
+ }
+
+ pid = createPid();
+ }
+
+ /**
+ * Create a self node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param port
+ * the port number you wish to use for incoming connections.
+ * Specifying 0 lets the system choose an available port.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * in case of server transport failure
+ */
+ public OtpSelf(final String node, final String cookie, final int port,
+ final OtpTransportFactory transportFactory) throws IOException {
+ super(node, cookie, transportFactory);
+
+ sock = createServerTransport(port);
if (port != 0) {
this.port = port;
@@ -179,7 +287,7 @@ public class OtpSelf extends OtpLocalNode {
* authorized to connect.
*/
public OtpConnection accept() throws IOException, OtpAuthException {
- Socket newsock = null;
+ OtpTransport newsock = null;
while (true) {
try {
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerSocketTransport.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerSocketTransport.java
new file mode 100644
index 0000000000..0e25b6bfb7
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerSocketTransport.java
@@ -0,0 +1,68 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * Default socket-based server transport
+ *
+ * @author Dmitriy Kargapolov
+ */
+public class OtpServerSocketTransport implements OtpServerTransport {
+
+ /**
+ * Underlying server socket
+ */
+ private final ServerSocket socket;
+
+ /**
+ * @see ServerSocket#ServerSocket(int)
+ */
+ public OtpServerSocketTransport(final int port) throws IOException {
+ socket = new ServerSocket(port);
+ }
+
+ /**
+ * @see ServerSocket#getLocalPort()
+ */
+ public int getLocalPort() {
+ return socket.getLocalPort();
+ }
+
+ /**
+ * @see ServerSocket#accept()
+ */
+ public OtpTransport accept() throws IOException {
+ final Socket sock = socket.accept();
+ sock.setTcpNoDelay(true);
+ return new OtpSocketTransport(sock);
+ }
+
+ /**
+ * @see ServerSocket#close()
+ */
+ public void close() throws IOException {
+ socket.close();
+ }
+
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerTransport.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerTransport.java
new file mode 100644
index 0000000000..4d31380bee
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpServerTransport.java
@@ -0,0 +1,46 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * Server-side connection-oriented transport interface.
+ *
+ * @author Dmitriy Kargapolov
+ */
+public interface OtpServerTransport {
+
+ /**
+ * @see ServerSocket#getLocalPort()
+ */
+ int getLocalPort();
+
+ /**
+ * @see ServerSocket#accept()
+ */
+ OtpTransport accept() throws IOException;
+
+ /**
+ * @see ServerSocket#close()
+ */
+ void close() throws IOException;
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransport.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransport.java
new file mode 100644
index 0000000000..f690ab59ed
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransport.java
@@ -0,0 +1,89 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * Default socket-based client transport
+ *
+ * @author Dmitriy Kargapolov
+ */
+public class OtpSocketTransport implements OtpTransport {
+
+ /**
+ * Underlying socket
+ */
+ private final Socket socket;
+
+ /**
+ * @see Socket#Socket(String, int)
+ */
+ public OtpSocketTransport(final String addr, final int port)
+ throws UnknownHostException, IOException {
+ socket = new Socket(addr, port);
+ socket.setTcpNoDelay(true);
+ }
+
+ /**
+ * @see Socket#Socket(InetAddress, int)
+ */
+ public OtpSocketTransport(final InetAddress addr, final int port)
+ throws UnknownHostException, IOException {
+ socket = new Socket(addr, port);
+ socket.setTcpNoDelay(true);
+ }
+
+ /**
+ * Socket wrapping constructor
+ *
+ * @param s
+ * socket to wrap
+ */
+ public OtpSocketTransport(final Socket s) {
+ socket = s;
+ }
+
+ /**
+ * @see Socket#getInputStream()
+ */
+ public InputStream getInputStream() throws IOException {
+ return socket.getInputStream();
+ }
+
+ /**
+ * @see Socket#getOutputStream()
+ */
+ public OutputStream getOutputStream() throws IOException {
+ return socket.getOutputStream();
+ }
+
+ /**
+ * @see Socket#close()
+ */
+ public void close() throws IOException {
+ socket.close();
+ }
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransportFactory.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransportFactory.java
new file mode 100644
index 0000000000..f6b5bfc86d
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpSocketTransportFactory.java
@@ -0,0 +1,56 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * Default socket-based transport factory
+ *
+ * @author Dmitriy Kargapolov
+ */
+public class OtpSocketTransportFactory implements OtpTransportFactory {
+
+ /**
+ * @see OtpTransportFactory#createTransport(String, int)
+ */
+ public OtpTransport createTransport(final String addr, final int port)
+ throws IOException {
+ return new OtpSocketTransport(addr, port);
+ }
+
+ /**
+ * @see OtpTransportFactory#createTransport(InetAddress, int)
+ */
+ public OtpTransport createTransport(final InetAddress addr, final int port)
+ throws IOException {
+ return new OtpSocketTransport(addr, port);
+ }
+
+ /**
+ * @see OtpTransportFactory#createServerTransport(int)
+ */
+ public OtpServerTransport createServerTransport(final int port)
+ throws IOException {
+ return new OtpServerSocketTransport(port);
+ }
+
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransport.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransport.java
new file mode 100644
index 0000000000..51c62d9ef0
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransport.java
@@ -0,0 +1,49 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+/**
+ * Client-side connection-oriented transport interface.
+ *
+ * @author Dmitriy Kargapolov
+ */
+public interface OtpTransport {
+
+ /**
+ * @see Socket#getInputStream()
+ */
+ public abstract InputStream getInputStream() throws IOException;
+
+ /**
+ * @see Socket#getOutputStream()
+ */
+ public abstract OutputStream getOutputStream() throws IOException;
+
+ /**
+ * @see Socket#close()
+ */
+ public abstract void close() throws IOException;
+
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransportFactory.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransportFactory.java
new file mode 100644
index 0000000000..bd404daea5
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpTransportFactory.java
@@ -0,0 +1,124 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2015. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * Factory class used to create client- and server-side transport instances. One
+ * static instance of class implementing this interface is created when program
+ * loaded. Default implementation used is {@link OtpSocketTransportFactory}.
+ * JInterface user can specify custom transport factory implementing this
+ * interface in the following ways:
+ * <dl>
+ * <dt>defining static class as internal to class holding main() method</dt>
+ * <dd>In the systems, where main class can be retrieved with
+ * <code>System.getProperty("sun.java.command")</code>, user can define static
+ * class <b>OtpErlangSystemTuner</b> internal to the main class, providing at
+ * least one static method with the name <b>getOtpTransportFactory</b>, with no
+ * parameters, returning object of class implementing
+ * <b>OtpTransportFactory</b>, for example:
+ *
+ * <pre>
+ *
+ * public class MyMainClass {
+ *
+ * public static class OtpErlangSystemTuner {
+ * ...
+ * public static OtpTransportFactory getOtpTransportFactory() {
+ * return new MyTransportFactory();
+ * }
+ * }
+ *
+ * public static class MyTransportFactory implements OtpTransportFactory {
+ * ...
+ * }
+ *
+ * public static void main(String[] args) {
+ * ...
+ * }
+ * }
+ *
+ *
+ * </pre>
+ *
+ * </dd>
+ *
+ * <dt>specifying factory class in the system properties</dt>
+ * <dd>User-defined transport factory class may be specified via system property
+ * <b>OtpTransportFactory</b>, for example:
+ *
+ * <pre>
+ *
+ * package com.my.company;
+ *
+ * public static class MyTransportFactory implements OtpTransportFactory {
+ * ...
+ * }
+ * </pre>
+ *
+ * In such case program may be run with
+ * -DOtpTransportFactory=com.my.company.MyTransportFactory, or other way of
+ * setting system property <i>before execution of static initializers</i> may be
+ * used.</dd>
+ * </dl>
+ *
+ * @author Dmitriy Kargapolov
+ */
+public interface OtpTransportFactory {
+
+ /**
+ * Create instance of {@link OtpTransport}
+ *
+ * @param addr
+ * host name or IP address string
+ * @param port
+ * port number
+ * @return new socket object
+ * @throws IOException
+ */
+ public abstract OtpTransport createTransport(String addr, int port)
+ throws IOException;
+
+ /**
+ * Create instance of {@link OtpTransport}
+ *
+ * @param addr
+ * peer address
+ * @param port
+ * port number
+ * @return new socket object
+ * @throws IOException
+ */
+ public abstract OtpTransport createTransport(InetAddress addr, int port)
+ throws IOException;
+
+ /**
+ * Create instance of {@link OtpServerTransport}
+ *
+ * @param port
+ * port number to listen on
+ * @return new socket object
+ * @throws IOException
+ */
+ public OtpServerTransport createServerTransport(int port)
+ throws IOException;
+}
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/java_files b/lib/jinterface/java_src/com/ericsson/otp/erlang/java_files
index 62fa7f990e..a0f19bc1aa 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/java_files
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/java_files
@@ -53,7 +53,13 @@ COMM = \
OtpOutputStream \
OtpPeer \
OtpSelf \
- OtpServer
+ OtpServer \
+ OtpServerSocketTransport \
+ OtpServerTransport \
+ OtpSocketTransport \
+ OtpSocketTransportFactory \
+ OtpTransport \
+ OtpTransportFactory
ERL = \
OtpErlangAtom \