aboutsummaryrefslogtreecommitdiffstats
path: root/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
diff options
context:
space:
mode:
authorVlad Dumitrescu <[email protected]>2014-11-13 10:28:08 +0100
committerMarcus Arendt <[email protected]>2014-11-25 12:12:10 +0100
commit26732f2fdd4720deee653ab96cd453ed65dcaa87 (patch)
tree6b4ed8080ce4ca92af211955da635628d02a1793 /lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
parentbb31ca7d6cf2d08d1efe2ba3bce37c9f8d8634bc (diff)
downloadotp-26732f2fdd4720deee653ab96cd453ed65dcaa87.tar.gz
otp-26732f2fdd4720deee653ab96cd453ed65dcaa87.tar.bz2
otp-26732f2fdd4720deee653ab96cd453ed65dcaa87.zip
[jinterface] cleanup code according to new style
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java1994
1 files changed, 1001 insertions, 993 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
index b8a973753a..1b0fe3e2e6 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java
@@ -1,19 +1,19 @@
/*
* %CopyrightBegin%
- *
+ *
* Copyright Ericsson AB 2000-2010. All Rights Reserved.
- *
+ *
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
- *
+ *
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
- *
+ *
* %CopyrightEnd%
*/
@@ -27,26 +27,26 @@ import java.util.Random;
* Maintains a connection between a Java process and a remote Erlang, Java or C
* node. The object maintains connection state and allows data to be sent to and
* received from the peer.
- *
+ *
* <p>
* This abstract class provides the neccesary methods to maintain the actual
* connection and encode the messages and headers in the proper format according
* to the Erlang distribution protocol. Subclasses can use these methods to
* provide a more or less transparent communication channel as desired.
* </p>
- *
+ *
* <p>
* Note that no receive methods are provided. Subclasses must provide methods
* for message delivery, and may implement their own receive methods.
* <p>
- *
+ *
* <p>
* If an exception occurs in any of the methods in this class, the connection
* will be closed and must be reopened in order to resume communication with the
* peer. This will be indicated to the subclass by passing the exception to its
* delivery() method.
* </p>
- *
+ *
* <p>
* The System property OtpConnection.trace can be used to change the initial
* trace level setting for all connections. Normally the initial trace level is
@@ -106,104 +106,104 @@ public abstract class AbstractConnection extends Thread {
private int flags = 0;
static {
- // trace this connection?
- final String trace = System.getProperties().getProperty(
- "OtpConnection.trace");
- try {
- if (trace != null) {
- defaultLevel = Integer.valueOf(trace).intValue();
- }
- } catch (final NumberFormatException e) {
- defaultLevel = 0;
- }
- random = new Random();
+ // trace this connection?
+ final String trace = System.getProperties().getProperty(
+ "OtpConnection.trace");
+ try {
+ if (trace != null) {
+ defaultLevel = Integer.valueOf(trace).intValue();
+ }
+ } catch (final NumberFormatException e) {
+ defaultLevel = 0;
+ }
+ random = new Random();
}
// private AbstractConnection() {
// }
/**
- * Accept an incoming connection from a remote node. Used by {@link
- * OtpSelf#accept() OtpSelf.accept()} to create a connection based on data
- * received when handshaking with the peer node, when the remote node is the
- * connection intitiator.
- *
- * @exception java.io.IOException if it was not possible to connect to the
- * peer.
- *
- * @exception OtpAuthException if handshake resulted in an authentication
- * error
+ * Accept an incoming connection from a remote node. Used by
+ * {@link OtpSelf#accept() OtpSelf.accept()} to create a connection based on
+ * data received when handshaking with the peer node, when the remote node
+ * is the connection intitiator.
+ *
+ * @exception java.io.IOException
+ * if it was not possible to connect to the peer.
+ *
+ * @exception OtpAuthException
+ * if handshake resulted in an authentication error
*/
protected AbstractConnection(final OtpLocalNode self, final Socket s)
- throws IOException, OtpAuthException {
- this.localNode = self;
- peer = new OtpPeer();
- socket = s;
-
- socket.setTcpNoDelay(true);
-
- traceLevel = defaultLevel;
- setDaemon(true);
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- ACCEPT FROM " + s.getInetAddress() + ":"
- + s.getPort());
- }
-
- // get his info
- recvName(peer);
-
- // now find highest common dist value
- if (peer.proto != self.proto || self.distHigh < peer.distLow
- || self.distLow > peer.distHigh) {
- close();
- throw new IOException(
- "No common protocol found - cannot accept connection");
- }
- // highest common version: min(peer.distHigh, self.distHigh)
- peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh
- : peer.distHigh;
-
- doAccept();
- name = peer.node();
+ throws IOException, OtpAuthException {
+ localNode = self;
+ peer = new OtpPeer();
+ socket = s;
+
+ socket.setTcpNoDelay(true);
+
+ traceLevel = defaultLevel;
+ setDaemon(true);
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- ACCEPT FROM " + s.getInetAddress() + ":"
+ + s.getPort());
+ }
+
+ // get his info
+ recvName(peer);
+
+ // now find highest common dist value
+ if (peer.proto != self.proto || self.distHigh < peer.distLow
+ || self.distLow > peer.distHigh) {
+ close();
+ throw new IOException(
+ "No common protocol found - cannot accept connection");
+ }
+ // highest common version: min(peer.distHigh, self.distHigh)
+ peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh
+ : peer.distHigh;
+
+ doAccept();
+ name = peer.node();
}
/**
* Intiate and open a connection to a remote node.
- *
- * @exception java.io.IOException if it was not possible to connect to the
- * peer.
- *
- * @exception OtpAuthException if handshake resulted in an authentication
- * error.
+ *
+ * @exception java.io.IOException
+ * if it was not possible to connect to the peer.
+ *
+ * @exception OtpAuthException
+ * if handshake resulted in an authentication error.
*/
protected AbstractConnection(final OtpLocalNode self, final OtpPeer other)
- throws IOException, OtpAuthException {
- peer = other;
- this.localNode = self;
- socket = null;
- int port;
+ throws IOException, OtpAuthException {
+ peer = other;
+ localNode = self;
+ socket = null;
+ int port;
- traceLevel = defaultLevel;
- setDaemon(true);
+ traceLevel = defaultLevel;
+ setDaemon(true);
- // now get a connection between the two...
- port = OtpEpmd.lookupPort(peer);
+ // now get a connection between the two...
+ port = OtpEpmd.lookupPort(peer);
- // now find highest common dist value
- if (peer.proto != self.proto || self.distHigh < peer.distLow
- || self.distLow > peer.distHigh) {
- throw new IOException("No common protocol found - cannot connect");
- }
+ // now find highest common dist value
+ if (peer.proto != self.proto || self.distHigh < peer.distLow
+ || self.distLow > peer.distHigh) {
+ throw new IOException("No common protocol found - cannot connect");
+ }
- // highest common version: min(peer.distHigh, self.distHigh)
- peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh
- : peer.distHigh;
+ // highest common version: min(peer.distHigh, self.distHigh)
+ peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh
+ : peer.distHigh;
- doConnect(port);
+ doConnect(port);
- name = peer.node();
- connected = true;
+ name = peer.node();
+ connected = true;
}
/**
@@ -218,91 +218,91 @@ public abstract class AbstractConnection extends Thread {
/**
* Send a pre-encoded message to a named process on a remote node.
- *
+ *
* @param dest
* the name of the remote process.
* @param payload
* the encoded message to send.
- *
+ *
* @exception java.io.IOException
* if the connection is not active or a communication error
* occurs.
*/
protected void sendBuf(final OtpErlangPid from, final String dest,
- final OtpOutputStream payload) throws IOException {
- if (!connected) {
- throw new IOException("Not connected");
- }
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
-
- // preamble: 4 byte length + "passthrough" tag + version
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
-
- // header info
- header.write_tuple_head(4);
- header.write_long(regSendTag);
- header.write_any(from);
- if (sendCookie) {
- header.write_atom(localNode.cookie());
- } else {
- header.write_atom("");
- }
- header.write_atom(dest);
-
- // version for payload
- header.write1(version);
-
- // fix up length in preamble
- header.poke4BE(0, header.size() + payload.size() - 4);
-
- do_send(header, payload);
+ final OtpOutputStream payload) throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
+
+ // preamble: 4 byte length + "passthrough" tag + version
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
+
+ // header info
+ header.write_tuple_head(4);
+ header.write_long(regSendTag);
+ header.write_any(from);
+ if (sendCookie) {
+ header.write_atom(localNode.cookie());
+ } else {
+ header.write_atom("");
+ }
+ header.write_atom(dest);
+
+ // version for payload
+ header.write1(version);
+
+ // fix up length in preamble
+ header.poke4BE(0, header.size() + payload.size() - 4);
+
+ do_send(header, payload);
}
/**
* Send a pre-encoded message to a process on a remote node.
- *
+ *
* @param dest
* the Erlang PID of the remote process.
* @param payload
* the encoded message to send.
- *
+ *
* @exception java.io.IOException
* if the connection is not active or a communication error
* occurs.
*/
protected void sendBuf(final OtpErlangPid from, final OtpErlangPid dest,
- final OtpOutputStream payload) throws IOException {
- if (!connected) {
- throw new IOException("Not connected");
- }
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
-
- // preamble: 4 byte length + "passthrough" tag + version
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
-
- // header info
- header.write_tuple_head(3);
- header.write_long(sendTag);
- if (sendCookie) {
- header.write_atom(localNode.cookie());
- } else {
- header.write_atom("");
- }
- header.write_any(dest);
-
- // version for payload
- header.write1(version);
-
- // fix up length in preamble
- header.poke4BE(0, header.size() + payload.size() - 4);
-
- do_send(header, payload);
+ final OtpOutputStream payload) throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
+
+ // preamble: 4 byte length + "passthrough" tag + version
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
+
+ // header info
+ header.write_tuple_head(3);
+ header.write_long(sendTag);
+ if (sendCookie) {
+ header.write_atom(localNode.cookie());
+ } else {
+ header.write_atom("");
+ }
+ header.write_any(dest);
+
+ // version for payload
+ header.write1(version);
+
+ // fix up length in preamble
+ header.poke4BE(0, header.size() + payload.size() - 4);
+
+ do_send(header, payload);
}
/*
@@ -311,60 +311,60 @@ public abstract class AbstractConnection extends Thread {
* otherwise
*/
private void cookieError(final OtpLocalNode local,
- final OtpErlangAtom cookie) throws OtpAuthException {
- try {
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
-
- // preamble: 4 byte length + "passthrough" tag + version
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
-
- header.write_tuple_head(4);
- header.write_long(regSendTag);
- header.write_any(local.createPid()); // disposable pid
- header.write_atom(cookie.atomValue()); // important: his cookie,
- // not mine...
- header.write_atom("auth");
-
- // version for payload
- header.write1(version);
-
- // the payload
-
- // the no_auth message (copied from Erlang) Don't change this
- // (Erlang will crash)
- // {$gen_cast, {print, "~n** Unauthorized cookie ~w **~n",
- // [foo@aule]}}
- final OtpErlangObject[] msg = new OtpErlangObject[2];
- final OtpErlangObject[] msgbody = new OtpErlangObject[3];
-
- msgbody[0] = new OtpErlangAtom("print");
- msgbody[1] = new OtpErlangString("~n** Bad cookie sent to " + local
- + " **~n");
- // Erlang will crash and burn if there is no third argument here...
- msgbody[2] = new OtpErlangList(); // empty list
-
- msg[0] = new OtpErlangAtom("$gen_cast");
- msg[1] = new OtpErlangTuple(msgbody);
-
- @SuppressWarnings("resource")
- final OtpOutputStream payload = new OtpOutputStream(
- new OtpErlangTuple(msg));
-
- // fix up length in preamble
- header.poke4BE(0, header.size() + payload.size() - 4);
-
- try {
- do_send(header, payload);
- } catch (final IOException e) {
- } // ignore
- } finally {
- close();
- }
- throw new OtpAuthException("Remote cookie not authorized: "
- + cookie.atomValue());
+ final OtpErlangAtom cookie) throws OtpAuthException {
+ try {
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
+
+ // preamble: 4 byte length + "passthrough" tag + version
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
+
+ header.write_tuple_head(4);
+ header.write_long(regSendTag);
+ header.write_any(local.createPid()); // disposable pid
+ header.write_atom(cookie.atomValue()); // important: his cookie,
+ // not mine...
+ header.write_atom("auth");
+
+ // version for payload
+ header.write1(version);
+
+ // the payload
+
+ // the no_auth message (copied from Erlang) Don't change this
+ // (Erlang will crash)
+ // {$gen_cast, {print, "~n** Unauthorized cookie ~w **~n",
+ // [foo@aule]}}
+ final OtpErlangObject[] msg = new OtpErlangObject[2];
+ final OtpErlangObject[] msgbody = new OtpErlangObject[3];
+
+ msgbody[0] = new OtpErlangAtom("print");
+ msgbody[1] = new OtpErlangString("~n** Bad cookie sent to " + local
+ + " **~n");
+ // Erlang will crash and burn if there is no third argument here...
+ msgbody[2] = new OtpErlangList(); // empty list
+
+ msg[0] = new OtpErlangAtom("$gen_cast");
+ msg[1] = new OtpErlangTuple(msgbody);
+
+ @SuppressWarnings("resource")
+ final OtpOutputStream payload = new OtpOutputStream(
+ new OtpErlangTuple(msg));
+
+ // fix up length in preamble
+ header.poke4BE(0, header.size() + payload.size() - 4);
+
+ try {
+ do_send(header, payload);
+ } catch (final IOException e) {
+ } // ignore
+ } finally {
+ close();
+ }
+ throw new OtpAuthException("Remote cookie not authorized: "
+ + cookie.atomValue());
}
// link to pid
@@ -374,364 +374,364 @@ public abstract class AbstractConnection extends Thread {
* remote node. If the link is still active when the remote process
* terminates, an exit signal will be sent to this connection. Use
* {@link #sendUnlink unlink()} to remove the link.
- *
+ *
* @param dest
* the Erlang PID of the remote process.
- *
+ *
* @exception java.io.IOException
* if the connection is not active or a communication error
* occurs.
*/
protected void sendLink(final OtpErlangPid from, final OtpErlangPid dest)
- throws IOException {
- if (!connected) {
- throw new IOException("Not connected");
- }
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
+ throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
- // preamble: 4 byte length + "passthrough" tag
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
+ // preamble: 4 byte length + "passthrough" tag
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
- // header
- header.write_tuple_head(3);
- header.write_long(linkTag);
- header.write_any(from);
- header.write_any(dest);
+ // header
+ header.write_tuple_head(3);
+ header.write_long(linkTag);
+ header.write_any(from);
+ header.write_any(dest);
- // fix up length in preamble
- header.poke4BE(0, header.size() - 4);
+ // fix up length in preamble
+ header.poke4BE(0, header.size() - 4);
- do_send(header);
+ do_send(header);
}
/**
* Remove a link between the local node and the specified process on the
* remote node. This method deactivates links created with {@link #sendLink
* link()}.
- *
+ *
* @param dest
* the Erlang PID of the remote process.
- *
+ *
* @exception java.io.IOException
* if the connection is not active or a communication error
* occurs.
*/
protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest)
- throws IOException {
- if (!connected) {
- throw new IOException("Not connected");
- }
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
+ throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
- // preamble: 4 byte length + "passthrough" tag
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
+ // preamble: 4 byte length + "passthrough" tag
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
- // header
- header.write_tuple_head(3);
- header.write_long(unlinkTag);
- header.write_any(from);
- header.write_any(dest);
+ // header
+ header.write_tuple_head(3);
+ header.write_long(unlinkTag);
+ header.write_any(from);
+ header.write_any(dest);
- // fix up length in preamble
- header.poke4BE(0, header.size() - 4);
+ // fix up length in preamble
+ header.poke4BE(0, header.size() - 4);
- do_send(header);
+ do_send(header);
}
/* used internally when "processes" terminate */
protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest,
- final OtpErlangObject reason) throws IOException {
- sendExit(exitTag, from, dest, reason);
+ final OtpErlangObject reason) throws IOException {
+ sendExit(exitTag, from, dest, reason);
}
/**
* Send an exit signal to a remote process.
- *
+ *
* @param dest
* the Erlang PID of the remote process.
* @param reason
* an Erlang term describing the exit reason.
- *
+ *
* @exception java.io.IOException
* if the connection is not active or a communication error
* occurs.
*/
protected void sendExit2(final OtpErlangPid from, final OtpErlangPid dest,
- final OtpErlangObject reason) throws IOException {
- sendExit(exit2Tag, from, dest, reason);
+ final OtpErlangObject reason) throws IOException {
+ sendExit(exit2Tag, from, dest, reason);
}
private void sendExit(final int tag, final OtpErlangPid from,
- final OtpErlangPid dest, final OtpErlangObject reason)
- throws IOException {
- if (!connected) {
- throw new IOException("Not connected");
- }
- @SuppressWarnings("resource")
- final OtpOutputStream header = new OtpOutputStream(headerLen);
+ final OtpErlangPid dest, final OtpErlangObject reason)
+ throws IOException {
+ if (!connected) {
+ throw new IOException("Not connected");
+ }
+ @SuppressWarnings("resource")
+ final OtpOutputStream header = new OtpOutputStream(headerLen);
- // preamble: 4 byte length + "passthrough" tag
- header.write4BE(0); // reserve space for length
- header.write1(passThrough);
- header.write1(version);
+ // preamble: 4 byte length + "passthrough" tag
+ header.write4BE(0); // reserve space for length
+ header.write1(passThrough);
+ header.write1(version);
- // header
- header.write_tuple_head(4);
- header.write_long(tag);
- header.write_any(from);
- header.write_any(dest);
- header.write_any(reason);
+ // header
+ header.write_tuple_head(4);
+ header.write_long(tag);
+ header.write_any(from);
+ header.write_any(dest);
+ header.write_any(reason);
- // fix up length in preamble
- header.poke4BE(0, header.size() - 4);
+ // fix up length in preamble
+ header.poke4BE(0, header.size() - 4);
- do_send(header);
+ do_send(header);
}
@SuppressWarnings("resource")
@Override
public void run() {
- if (!connected) {
- deliver(new IOException("Not connected"));
- return;
- }
-
- final byte[] lbuf = new byte[4];
- OtpInputStream ibuf;
- OtpErlangObject traceobj;
- int len;
- final byte[] tock = { 0, 0, 0, 0 };
-
- try {
- receive_loop: while (!done) {
- // don't return until we get a real message
- // or a failure of some kind (e.g. EXIT)
- // read length and read buffer must be atomic!
- do {
- // read 4 bytes - get length of incoming packet
- // socket.getInputStream().read(lbuf);
- readSock(socket, lbuf);
- ibuf = new OtpInputStream(lbuf, flags);
- len = ibuf.read4BE();
-
- // received tick? send tock!
- if (len == 0) {
- synchronized (this) {
- socket.getOutputStream().write(tock);
- }
- }
-
- } while (len == 0); // tick_loop
-
- // got a real message (maybe) - read len bytes
- final byte[] tmpbuf = new byte[len];
- // i = socket.getInputStream().read(tmpbuf);
- readSock(socket, tmpbuf);
- ibuf.close();
- ibuf = new OtpInputStream(tmpbuf, flags);
-
- if (ibuf.read1() != passThrough) {
- break receive_loop;
- }
-
- // got a real message (really)
- OtpErlangObject reason = null;
- OtpErlangAtom cookie = null;
- OtpErlangObject tmp = null;
- OtpErlangTuple head = null;
- OtpErlangAtom toName;
- OtpErlangPid to;
- OtpErlangPid from;
- int tag;
-
- // decode the header
- tmp = ibuf.read_any();
- if (!(tmp instanceof OtpErlangTuple)) {
- break receive_loop;
- }
-
- head = (OtpErlangTuple) tmp;
- if (!(head.elementAt(0) instanceof OtpErlangLong)) {
- break receive_loop;
- }
-
- // lets see what kind of message this is
- tag = (int) ((OtpErlangLong) head.elementAt(0)).longValue();
-
- switch (tag) {
- case sendTag: // { SEND, Cookie, ToPid }
- case sendTTTag: // { SEND, Cookie, ToPid, TraceToken }
- if (!cookieOk) {
- // we only check this once, he can send us bad cookies
- // later if he likes
- if (!(head.elementAt(1) instanceof OtpErlangAtom)) {
- break receive_loop;
- }
- cookie = (OtpErlangAtom) head.elementAt(1);
- if (sendCookie) {
- if (!cookie.atomValue().equals(localNode.cookie())) {
- cookieError(localNode, cookie);
- }
- } else {
- if (!cookie.atomValue().equals("")) {
- cookieError(localNode, cookie);
- }
- }
- cookieOk = true;
- }
-
- if (traceLevel >= sendThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
-
- /* show received payload too */
- ibuf.mark(0);
- traceobj = ibuf.read_any();
-
- if (traceobj != null) {
- System.out.println(" " + traceobj);
- } else {
- System.out.println(" (null)");
- }
- ibuf.reset();
- }
-
- to = (OtpErlangPid) head.elementAt(2);
-
- deliver(new OtpMsg(to, ibuf));
- break;
-
- case regSendTag: // { REG_SEND, FromPid, Cookie, ToName }
- case regSendTTTag: // { REG_SEND, FromPid, Cookie, ToName,
- // TraceToken }
- if (!cookieOk) {
- // we only check this once, he can send us bad cookies
- // later if he likes
- if (!(head.elementAt(2) instanceof OtpErlangAtom)) {
- break receive_loop;
- }
- cookie = (OtpErlangAtom) head.elementAt(2);
- if (sendCookie) {
- if (!cookie.atomValue().equals(localNode.cookie())) {
- cookieError(localNode, cookie);
- }
- } else {
- if (!cookie.atomValue().equals("")) {
- cookieError(localNode, cookie);
- }
- }
- cookieOk = true;
- }
-
- if (traceLevel >= sendThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
-
- /* show received payload too */
- ibuf.mark(0);
- traceobj = ibuf.read_any();
-
- if (traceobj != null) {
- System.out.println(" " + traceobj);
- } else {
- System.out.println(" (null)");
- }
- ibuf.reset();
- }
-
- from = (OtpErlangPid) head.elementAt(1);
- toName = (OtpErlangAtom) head.elementAt(3);
-
- deliver(new OtpMsg(from, toName.atomValue(), ibuf));
- break;
-
- case exitTag: // { EXIT, FromPid, ToPid, Reason }
- case exit2Tag: // { EXIT2, FromPid, ToPid, Reason }
- if (head.elementAt(3) == null) {
- break receive_loop;
- }
- if (traceLevel >= ctrlThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
- }
-
- from = (OtpErlangPid) head.elementAt(1);
- to = (OtpErlangPid) head.elementAt(2);
- reason = head.elementAt(3);
-
- deliver(new OtpMsg(tag, from, to, reason));
- break;
-
- case exitTTTag: // { EXIT, FromPid, ToPid, TraceToken, Reason }
- case exit2TTTag: // { EXIT2, FromPid, ToPid, TraceToken,
- // Reason
- // }
- // as above, but bifferent element number
- if (head.elementAt(4) == null) {
- break receive_loop;
- }
- if (traceLevel >= ctrlThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
- }
-
- from = (OtpErlangPid) head.elementAt(1);
- to = (OtpErlangPid) head.elementAt(2);
- reason = head.elementAt(4);
-
- deliver(new OtpMsg(tag, from, to, reason));
- break;
-
- case linkTag: // { LINK, FromPid, ToPid}
- case unlinkTag: // { UNLINK, FromPid, ToPid}
- if (traceLevel >= ctrlThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
- }
-
- from = (OtpErlangPid) head.elementAt(1);
- to = (OtpErlangPid) head.elementAt(2);
-
- deliver(new OtpMsg(tag, from, to));
- break;
-
- // absolutely no idea what to do with these, so we ignore
- // them...
- case groupLeaderTag: // { GROUPLEADER, FromPid, ToPid}
- // (just show trace)
- if (traceLevel >= ctrlThreshold) {
- System.out.println("<- " + headerType(head) + " "
- + head);
- }
- break;
-
- default:
- // garbage?
- break receive_loop;
- }
- } // end receive_loop
-
- // this section reachable only with break
- // we have received garbage from peer
- deliver(new OtpErlangExit("Remote is sending garbage"));
-
- } // try
-
- catch (final OtpAuthException e) {
- deliver(e);
- } catch (final OtpErlangDecodeException e) {
- deliver(new OtpErlangExit("Remote is sending garbage"));
- } catch (final IOException e) {
- deliver(new OtpErlangExit("Remote has closed connection"));
- } finally {
- close();
- }
+ if (!connected) {
+ deliver(new IOException("Not connected"));
+ return;
+ }
+
+ final byte[] lbuf = new byte[4];
+ OtpInputStream ibuf;
+ OtpErlangObject traceobj;
+ int len;
+ final byte[] tock = { 0, 0, 0, 0 };
+
+ try {
+ receive_loop: while (!done) {
+ // don't return until we get a real message
+ // or a failure of some kind (e.g. EXIT)
+ // read length and read buffer must be atomic!
+ do {
+ // read 4 bytes - get length of incoming packet
+ // socket.getInputStream().read(lbuf);
+ readSock(socket, lbuf);
+ ibuf = new OtpInputStream(lbuf, flags);
+ len = ibuf.read4BE();
+
+ // received tick? send tock!
+ if (len == 0) {
+ synchronized (this) {
+ socket.getOutputStream().write(tock);
+ }
+ }
+
+ } while (len == 0); // tick_loop
+
+ // got a real message (maybe) - read len bytes
+ final byte[] tmpbuf = new byte[len];
+ // i = socket.getInputStream().read(tmpbuf);
+ readSock(socket, tmpbuf);
+ ibuf.close();
+ ibuf = new OtpInputStream(tmpbuf, flags);
+
+ if (ibuf.read1() != passThrough) {
+ break receive_loop;
+ }
+
+ // got a real message (really)
+ OtpErlangObject reason = null;
+ OtpErlangAtom cookie = null;
+ OtpErlangObject tmp = null;
+ OtpErlangTuple head = null;
+ OtpErlangAtom toName;
+ OtpErlangPid to;
+ OtpErlangPid from;
+ int tag;
+
+ // decode the header
+ tmp = ibuf.read_any();
+ if (!(tmp instanceof OtpErlangTuple)) {
+ break receive_loop;
+ }
+
+ head = (OtpErlangTuple) tmp;
+ if (!(head.elementAt(0) instanceof OtpErlangLong)) {
+ break receive_loop;
+ }
+
+ // lets see what kind of message this is
+ tag = (int) ((OtpErlangLong) head.elementAt(0)).longValue();
+
+ switch (tag) {
+ case sendTag: // { SEND, Cookie, ToPid }
+ case sendTTTag: // { SEND, Cookie, ToPid, TraceToken }
+ if (!cookieOk) {
+ // we only check this once, he can send us bad cookies
+ // later if he likes
+ if (!(head.elementAt(1) instanceof OtpErlangAtom)) {
+ break receive_loop;
+ }
+ cookie = (OtpErlangAtom) head.elementAt(1);
+ if (sendCookie) {
+ if (!cookie.atomValue().equals(localNode.cookie())) {
+ cookieError(localNode, cookie);
+ }
+ } else {
+ if (!cookie.atomValue().equals("")) {
+ cookieError(localNode, cookie);
+ }
+ }
+ cookieOk = true;
+ }
+
+ if (traceLevel >= sendThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+
+ /* show received payload too */
+ ibuf.mark(0);
+ traceobj = ibuf.read_any();
+
+ if (traceobj != null) {
+ System.out.println(" " + traceobj);
+ } else {
+ System.out.println(" (null)");
+ }
+ ibuf.reset();
+ }
+
+ to = (OtpErlangPid) head.elementAt(2);
+
+ deliver(new OtpMsg(to, ibuf));
+ break;
+
+ case regSendTag: // { REG_SEND, FromPid, Cookie, ToName }
+ case regSendTTTag: // { REG_SEND, FromPid, Cookie, ToName,
+ // TraceToken }
+ if (!cookieOk) {
+ // we only check this once, he can send us bad cookies
+ // later if he likes
+ if (!(head.elementAt(2) instanceof OtpErlangAtom)) {
+ break receive_loop;
+ }
+ cookie = (OtpErlangAtom) head.elementAt(2);
+ if (sendCookie) {
+ if (!cookie.atomValue().equals(localNode.cookie())) {
+ cookieError(localNode, cookie);
+ }
+ } else {
+ if (!cookie.atomValue().equals("")) {
+ cookieError(localNode, cookie);
+ }
+ }
+ cookieOk = true;
+ }
+
+ if (traceLevel >= sendThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+
+ /* show received payload too */
+ ibuf.mark(0);
+ traceobj = ibuf.read_any();
+
+ if (traceobj != null) {
+ System.out.println(" " + traceobj);
+ } else {
+ System.out.println(" (null)");
+ }
+ ibuf.reset();
+ }
+
+ from = (OtpErlangPid) head.elementAt(1);
+ toName = (OtpErlangAtom) head.elementAt(3);
+
+ deliver(new OtpMsg(from, toName.atomValue(), ibuf));
+ break;
+
+ case exitTag: // { EXIT, FromPid, ToPid, Reason }
+ case exit2Tag: // { EXIT2, FromPid, ToPid, Reason }
+ if (head.elementAt(3) == null) {
+ break receive_loop;
+ }
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+ }
+
+ from = (OtpErlangPid) head.elementAt(1);
+ to = (OtpErlangPid) head.elementAt(2);
+ reason = head.elementAt(3);
+
+ deliver(new OtpMsg(tag, from, to, reason));
+ break;
+
+ case exitTTTag: // { EXIT, FromPid, ToPid, TraceToken, Reason }
+ case exit2TTTag: // { EXIT2, FromPid, ToPid, TraceToken,
+ // Reason
+ // }
+ // as above, but bifferent element number
+ if (head.elementAt(4) == null) {
+ break receive_loop;
+ }
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+ }
+
+ from = (OtpErlangPid) head.elementAt(1);
+ to = (OtpErlangPid) head.elementAt(2);
+ reason = head.elementAt(4);
+
+ deliver(new OtpMsg(tag, from, to, reason));
+ break;
+
+ case linkTag: // { LINK, FromPid, ToPid}
+ case unlinkTag: // { UNLINK, FromPid, ToPid}
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+ }
+
+ from = (OtpErlangPid) head.elementAt(1);
+ to = (OtpErlangPid) head.elementAt(2);
+
+ deliver(new OtpMsg(tag, from, to));
+ break;
+
+ // absolutely no idea what to do with these, so we ignore
+ // them...
+ case groupLeaderTag: // { GROUPLEADER, FromPid, ToPid}
+ // (just show trace)
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("<- " + headerType(head) + " "
+ + head);
+ }
+ break;
+
+ default:
+ // garbage?
+ break receive_loop;
+ }
+ } // end receive_loop
+
+ // this section reachable only with break
+ // we have received garbage from peer
+ deliver(new OtpErlangExit("Remote is sending garbage"));
+
+ } // try
+
+ catch (final OtpAuthException e) {
+ deliver(e);
+ } catch (final OtpErlangDecodeException e) {
+ deliver(new OtpErlangExit("Remote is sending garbage"));
+ } catch (final IOException e) {
+ deliver(new OtpErlangExit("Remote has closed connection"));
+ } finally {
+ close();
+ }
}
/**
@@ -739,7 +739,7 @@ public abstract class AbstractConnection extends Thread {
* Set the trace level for this connection. Normally tracing is off by
* default unless System property OtpConnection.trace was set.
* </p>
- *
+ *
* <p>
* The following levels are valid: 0 turns off tracing completely, 1 shows
* ordinary send and receive messages, 2 shows control messages such as link
@@ -747,632 +747,640 @@ public abstract class AbstractConnection extends Thread {
* communication with Epmd. Each level includes the information shown by the
* lower ones.
* </p>
- *
+ *
* @param level
* the level to set.
- *
+ *
* @return the previous trace level.
*/
- public int setTraceLevel(int level) {
- final int oldLevel = traceLevel;
+ public int setTraceLevel(final int level) {
+ final int oldLevel = traceLevel;
- // pin the value
- int theLevel = level;
- if (level < 0) {
- theLevel = 0;
- } else if (level > 4) {
- theLevel = 4;
- }
+ // pin the value
+ int theLevel = level;
+ if (level < 0) {
+ theLevel = 0;
+ } else if (level > 4) {
+ theLevel = 4;
+ }
- traceLevel = theLevel;
+ traceLevel = theLevel;
- return oldLevel;
+ return oldLevel;
}
/**
* Get the trace level for this connection.
- *
+ *
* @return the current trace level.
*/
public int getTraceLevel() {
- return traceLevel;
+ return traceLevel;
}
/**
* Close the connection to the remote node.
*/
public void close() {
- done = true;
- connected = false;
- synchronized (this) {
- try {
- if (socket != null) {
- if (traceLevel >= ctrlThreshold) {
- System.out.println("-> CLOSE");
- }
- socket.close();
- }
- } catch (final IOException e) { /* ignore socket close errors */
- } finally {
- socket = null;
- }
- }
+ done = true;
+ connected = false;
+ synchronized (this) {
+ try {
+ if (socket != null) {
+ if (traceLevel >= ctrlThreshold) {
+ System.out.println("-> CLOSE");
+ }
+ socket.close();
+ }
+ } catch (final IOException e) { /* ignore socket close errors */
+ } finally {
+ socket = null;
+ }
+ }
}
@Override
protected void finalize() {
- close();
+ close();
}
/**
* Determine if the connection is still alive. Note that this method only
* reports the status of the connection, and that it is possible that there
* are unread messages waiting in the receive queue.
- *
+ *
* @return true if the connection is alive.
*/
public boolean isConnected() {
- return connected;
+ return connected;
}
// used by send and send_reg (message types with payload)
protected synchronized void do_send(final OtpOutputStream header,
- final OtpOutputStream payload) throws IOException {
- try {
- if (traceLevel >= sendThreshold) {
- // Need to decode header and output buffer to show trace
- // message!
- // First make OtpInputStream, then decode.
- try {
- final OtpErlangObject h = header.getOtpInputStream(5)
- .read_any();
- System.out.println("-> " + headerType(h) + " " + h);
-
- OtpErlangObject o = payload.getOtpInputStream(0).read_any();
- System.out.println(" " + o);
- o = null;
- } catch (final OtpErlangDecodeException e) {
- System.out.println(" " + "can't decode output buffer:"
- + e);
- }
- }
-
- header.writeTo(socket.getOutputStream());
- payload.writeTo(socket.getOutputStream());
- } catch (final IOException e) {
- close();
- throw e;
- }
+ final OtpOutputStream payload) throws IOException {
+ try {
+ if (traceLevel >= sendThreshold) {
+ // Need to decode header and output buffer to show trace
+ // message!
+ // First make OtpInputStream, then decode.
+ try {
+ final OtpErlangObject h = header.getOtpInputStream(5)
+ .read_any();
+ System.out.println("-> " + headerType(h) + " " + h);
+
+ OtpErlangObject o = payload.getOtpInputStream(0).read_any();
+ System.out.println(" " + o);
+ o = null;
+ } catch (final OtpErlangDecodeException e) {
+ System.out.println(" " + "can't decode output buffer:"
+ + e);
+ }
+ }
+
+ header.writeTo(socket.getOutputStream());
+ payload.writeTo(socket.getOutputStream());
+ } catch (final IOException e) {
+ close();
+ throw e;
+ }
}
// used by the other message types
protected synchronized void do_send(final OtpOutputStream header)
- throws IOException {
- try {
- if (traceLevel >= ctrlThreshold) {
- try {
- final OtpErlangObject h = header.getOtpInputStream(5)
- .read_any();
- System.out.println("-> " + headerType(h) + " " + h);
- } catch (final OtpErlangDecodeException e) {
- System.out.println(" " + "can't decode output buffer: "
- + e);
- }
- }
- header.writeTo(socket.getOutputStream());
- } catch (final IOException e) {
- close();
- throw e;
- }
+ throws IOException {
+ try {
+ if (traceLevel >= ctrlThreshold) {
+ try {
+ final OtpErlangObject h = header.getOtpInputStream(5)
+ .read_any();
+ System.out.println("-> " + headerType(h) + " " + h);
+ } catch (final OtpErlangDecodeException e) {
+ System.out.println(" " + "can't decode output buffer: "
+ + e);
+ }
+ }
+ header.writeTo(socket.getOutputStream());
+ } catch (final IOException e) {
+ close();
+ throw e;
+ }
}
protected String headerType(final OtpErlangObject h) {
- int tag = -1;
+ int tag = -1;
- if (h instanceof OtpErlangTuple) {
- tag = (int) ((OtpErlangLong) ((OtpErlangTuple) h).elementAt(0))
- .longValue();
- }
+ if (h instanceof OtpErlangTuple) {
+ tag = (int) ((OtpErlangLong) ((OtpErlangTuple) h).elementAt(0))
+ .longValue();
+ }
- switch (tag) {
- case linkTag:
- return "LINK";
+ switch (tag) {
+ case linkTag:
+ return "LINK";
- case sendTag:
- return "SEND";
+ case sendTag:
+ return "SEND";
- case exitTag:
- return "EXIT";
+ case exitTag:
+ return "EXIT";
- case unlinkTag:
- return "UNLINK";
+ case unlinkTag:
+ return "UNLINK";
- case regSendTag:
- return "REG_SEND";
+ case regSendTag:
+ return "REG_SEND";
- case groupLeaderTag:
- return "GROUP_LEADER";
+ case groupLeaderTag:
+ return "GROUP_LEADER";
- case exit2Tag:
- return "EXIT2";
+ case exit2Tag:
+ return "EXIT2";
- case sendTTTag:
- return "SEND_TT";
+ case sendTTTag:
+ return "SEND_TT";
- case exitTTTag:
- return "EXIT_TT";
+ case exitTTTag:
+ return "EXIT_TT";
- case regSendTTTag:
- return "REG_SEND_TT";
+ case regSendTTTag:
+ return "REG_SEND_TT";
- case exit2TTTag:
- return "EXIT2_TT";
- }
+ case exit2TTTag:
+ return "EXIT2_TT";
+ }
- return "(unknown type)";
+ return "(unknown type)";
}
/* this method now throws exception if we don't get full read */
protected int readSock(final Socket s, final byte[] b) throws IOException {
- int got = 0;
- final int len = b.length;
- int i;
-
- synchronized (this) {
- if (s == null) {
- throw new IOException("expected " + len
- + " bytes, socket was closed");
- }
- }
-
- while (got < len) {
- i = s.getInputStream().read(b, got, len - got);
-
- if (i < 0) {
- throw new IOException("expected " + len
- + " bytes, got EOF after " + got + " bytes");
- } else if (i == 0 && len != 0) {
- /*
- * This is a corner case. According to
- * http://java.sun.com/j2se/1.4.2/docs/api/ class InputStream
- * is.read(,,l) can only return 0 if l==0. In other words it
- * should not happen, but apparently did.
- */
- throw new IOException("Remote connection closed");
- } else {
- got += i;
- }
- }
- return got;
+ int got = 0;
+ final int len = b.length;
+ int i;
+
+ synchronized (this) {
+ if (s == null) {
+ throw new IOException("expected " + len
+ + " bytes, socket was closed");
+ }
+ }
+
+ while (got < len) {
+ i = s.getInputStream().read(b, got, len - got);
+
+ if (i < 0) {
+ throw new IOException("expected " + len
+ + " bytes, got EOF after " + got + " bytes");
+ } else if (i == 0 && len != 0) {
+ /*
+ * This is a corner case. According to
+ * http://java.sun.com/j2se/1.4.2/docs/api/ class InputStream
+ * is.read(,,l) can only return 0 if l==0. In other words it
+ * should not happen, but apparently did.
+ */
+ throw new IOException("Remote connection closed");
+ } else {
+ got += i;
+ }
+ }
+ return got;
}
protected void doAccept() throws IOException, OtpAuthException {
- try {
- sendStatus("ok");
- final int our_challenge = genChallenge();
- sendChallenge(peer.distChoose, localNode.flags, our_challenge);
- final int her_challenge = recvChallengeReply(our_challenge);
- final byte[] our_digest = genDigest(her_challenge, localNode.cookie());
- sendChallengeAck(our_digest);
- connected = true;
- cookieOk = true;
- sendCookie = false;
- } catch (final IOException ie) {
- close();
- throw ie;
- } catch (final OtpAuthException ae) {
- close();
- throw ae;
- } catch (final Exception e) {
- final String nn = peer.node();
- close();
- IOException ioe = new IOException("Error accepting connection from " + nn);
- ioe.initCause(e);
- throw ioe;
- }
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- MD5 ACCEPTED " + peer.host());
- }
+ try {
+ sendStatus("ok");
+ final int our_challenge = genChallenge();
+ sendChallenge(peer.distChoose, localNode.flags, our_challenge);
+ final int her_challenge = recvChallengeReply(our_challenge);
+ final byte[] our_digest = genDigest(her_challenge,
+ localNode.cookie());
+ sendChallengeAck(our_digest);
+ connected = true;
+ cookieOk = true;
+ sendCookie = false;
+ } catch (final IOException ie) {
+ close();
+ throw ie;
+ } catch (final OtpAuthException ae) {
+ close();
+ throw ae;
+ } catch (final Exception e) {
+ final String nn = peer.node();
+ close();
+ final IOException ioe = new IOException(
+ "Error accepting connection from " + nn);
+ ioe.initCause(e);
+ throw ioe;
+ }
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- MD5 ACCEPTED " + peer.host());
+ }
}
protected void doConnect(final int port) throws IOException,
- OtpAuthException {
- try {
- socket = new Socket(peer.host(), port);
- socket.setTcpNoDelay(true);
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> MD5 CONNECT TO " + peer.host() + ":"
- + port);
- }
- sendName(peer.distChoose, localNode.flags);
- recvStatus();
- final int her_challenge = recvChallenge();
- final byte[] our_digest = genDigest(her_challenge, localNode.cookie());
- final int our_challenge = genChallenge();
- sendChallengeReply(our_challenge, our_digest);
- recvChallengeAck(our_challenge);
- cookieOk = true;
- sendCookie = false;
- } catch (final OtpAuthException ae) {
- close();
- throw ae;
- } catch (final Exception e) {
- close();
- IOException ioe = new IOException("Cannot connect to peer node");
- ioe.initCause(e);
- throw ioe;
- }
+ OtpAuthException {
+ try {
+ socket = new Socket(peer.host(), port);
+ socket.setTcpNoDelay(true);
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> MD5 CONNECT TO " + peer.host() + ":"
+ + port);
+ }
+ sendName(peer.distChoose, localNode.flags);
+ recvStatus();
+ final int her_challenge = recvChallenge();
+ final byte[] our_digest = genDigest(her_challenge,
+ localNode.cookie());
+ final int our_challenge = genChallenge();
+ sendChallengeReply(our_challenge, our_digest);
+ recvChallengeAck(our_challenge);
+ cookieOk = true;
+ sendCookie = false;
+ } catch (final OtpAuthException ae) {
+ close();
+ throw ae;
+ } catch (final Exception e) {
+ close();
+ final IOException ioe = new IOException(
+ "Cannot connect to peer node");
+ ioe.initCause(e);
+ throw ioe;
+ }
}
// This is nooo good as a challenge,
// XXX fix me.
static protected int genChallenge() {
- return random.nextInt();
+ return random.nextInt();
}
// Used to debug print a message digest
static String hex0(final byte x) {
- final char tab[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
- 'a', 'b', 'c', 'd', 'e', 'f' };
- int uint;
- if (x < 0) {
- uint = x & 0x7F;
- uint |= 1 << 7;
- } else {
- uint = x;
- }
- return "" + tab[uint >>> 4] + tab[uint & 0xF];
+ final char tab[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+ 'a', 'b', 'c', 'd', 'e', 'f' };
+ int uint;
+ if (x < 0) {
+ uint = x & 0x7F;
+ uint |= 1 << 7;
+ } else {
+ uint = x;
+ }
+ return "" + tab[uint >>> 4] + tab[uint & 0xF];
}
static String hex(final byte[] b) {
- final StringBuffer sb = new StringBuffer();
- try {
- int i;
- for (i = 0; i < b.length; ++i) {
- sb.append(hex0(b[i]));
- }
- } catch (final Exception e) {
- // Debug function, ignore errors.
- }
- return sb.toString();
+ final StringBuffer sb = new StringBuffer();
+ try {
+ int i;
+ for (i = 0; i < b.length; ++i) {
+ sb.append(hex0(b[i]));
+ }
+ } catch (final Exception e) {
+ // Debug function, ignore errors.
+ }
+ return sb.toString();
}
protected byte[] genDigest(final int challenge, final String cookie) {
- int i;
- long ch2;
-
- if (challenge < 0) {
- ch2 = 1L << 31;
- ch2 |= challenge & 0x7FFFFFFF;
- } else {
- ch2 = challenge;
- }
- final OtpMD5 context = new OtpMD5();
- context.update(cookie);
- context.update("" + ch2);
-
- final int[] tmp = context.final_bytes();
- final byte[] res = new byte[tmp.length];
- for (i = 0; i < tmp.length; ++i) {
- res[i] = (byte) (tmp[i] & 0xFF);
- }
- return res;
+ int i;
+ long ch2;
+
+ if (challenge < 0) {
+ ch2 = 1L << 31;
+ ch2 |= challenge & 0x7FFFFFFF;
+ } else {
+ ch2 = challenge;
+ }
+ final OtpMD5 context = new OtpMD5();
+ context.update(cookie);
+ context.update("" + ch2);
+
+ final int[] tmp = context.final_bytes();
+ final byte[] res = new byte[tmp.length];
+ for (i = 0; i < tmp.length; ++i) {
+ res[i] = (byte) (tmp[i] & 0xFF);
+ }
+ return res;
}
- protected void sendName(final int dist, final int aflags) throws IOException {
+ protected void sendName(final int dist, final int aflags)
+ throws IOException {
- @SuppressWarnings("resource")
- final OtpOutputStream obuf = new OtpOutputStream();
- final String str = localNode.node();
- obuf.write2BE(str.length() + 7); // 7 bytes + nodename
- obuf.write1(AbstractNode.NTYPE_R6);
- obuf.write2BE(dist);
- obuf.write4BE(aflags);
- obuf.write(str.getBytes());
-
- obuf.writeTo(socket.getOutputStream());
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> " + "HANDSHAKE sendName" + " flags=" + aflags
- + " dist=" + dist + " local=" + localNode);
- }
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ final String str = localNode.node();
+ obuf.write2BE(str.length() + 7); // 7 bytes + nodename
+ obuf.write1(AbstractNode.NTYPE_R6);
+ obuf.write2BE(dist);
+ obuf.write4BE(aflags);
+ obuf.write(str.getBytes());
+
+ obuf.writeTo(socket.getOutputStream());
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendName" + " flags="
+ + aflags + " dist=" + dist + " local=" + localNode);
+ }
}
protected void sendChallenge(final int dist, final int aflags,
- final int challenge) throws IOException {
+ final int challenge) throws IOException {
- @SuppressWarnings("resource")
- final OtpOutputStream obuf = new OtpOutputStream();
- final String str = localNode.node();
- obuf.write2BE(str.length() + 11); // 11 bytes + nodename
- obuf.write1(AbstractNode.NTYPE_R6);
- obuf.write2BE(dist);
- obuf.write4BE(aflags);
- obuf.write4BE(challenge);
- obuf.write(str.getBytes());
-
- obuf.writeTo(socket.getOutputStream());
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags="
- + aflags + " dist=" + dist + " challenge=" + challenge
- + " local=" + localNode);
- }
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ final String str = localNode.node();
+ obuf.write2BE(str.length() + 11); // 11 bytes + nodename
+ obuf.write1(AbstractNode.NTYPE_R6);
+ obuf.write2BE(dist);
+ obuf.write4BE(aflags);
+ obuf.write4BE(challenge);
+ obuf.write(str.getBytes());
+
+ obuf.writeTo(socket.getOutputStream());
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags="
+ + aflags + " dist=" + dist + " challenge=" + challenge
+ + " local=" + localNode);
+ }
}
protected byte[] read2BytePackage() throws IOException,
- OtpErlangDecodeException {
+ OtpErlangDecodeException {
- final byte[] lbuf = new byte[2];
- byte[] tmpbuf;
+ final byte[] lbuf = new byte[2];
+ byte[] tmpbuf;
- readSock(socket, lbuf);
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(lbuf, 0);
- final int len = ibuf.read2BE();
- tmpbuf = new byte[len];
- readSock(socket, tmpbuf);
- return tmpbuf;
+ readSock(socket, lbuf);
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(lbuf, 0);
+ final int len = ibuf.read2BE();
+ tmpbuf = new byte[len];
+ readSock(socket, tmpbuf);
+ return tmpbuf;
}
protected void recvName(final OtpPeer apeer) throws IOException {
- String hisname = "";
-
- try {
- final byte[] tmpbuf = read2BytePackage();
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0);
- byte[] tmpname;
- final int len = tmpbuf.length;
- apeer.ntype = ibuf.read1();
- if (apeer.ntype != AbstractNode.NTYPE_R6) {
- throw new IOException("Unknown remote node type");
- }
- apeer.distLow = apeer.distHigh = ibuf.read2BE();
- if (apeer.distLow < 5) {
- throw new IOException("Unknown remote node type");
- }
- apeer.flags = ibuf.read4BE();
- tmpname = new byte[len - 7];
- ibuf.readN(tmpname);
- hisname = OtpErlangString.newString(tmpname);
- // Set the old nodetype parameter to indicate hidden/normal status
- // When the old handshake is removed, the ntype should also be.
- if ((apeer.flags & AbstractNode.dFlagPublished) != 0) {
- apeer.ntype = AbstractNode.NTYPE_R4_ERLANG;
- } else {
- apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN;
- }
-
- if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) {
- throw new IOException(
- "Handshake failed - peer cannot handle extended references");
- }
-
- if ((apeer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) {
- throw new IOException(
- "Handshake failed - peer cannot handle extended pids and ports");
- }
-
- } catch (final OtpErlangDecodeException e) {
- throw new IOException("Handshake failed - not enough data");
- }
-
- final int i = hisname.indexOf('@', 0);
- apeer.node = hisname;
- apeer.alive = hisname.substring(0, i);
- apeer.host = hisname.substring(i + 1, hisname.length());
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype
- + " dist=" + apeer.distHigh + " remote=" + apeer);
- }
+ String hisname = "";
+
+ try {
+ final byte[] tmpbuf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0);
+ byte[] tmpname;
+ final int len = tmpbuf.length;
+ apeer.ntype = ibuf.read1();
+ if (apeer.ntype != AbstractNode.NTYPE_R6) {
+ throw new IOException("Unknown remote node type");
+ }
+ apeer.distLow = apeer.distHigh = ibuf.read2BE();
+ if (apeer.distLow < 5) {
+ throw new IOException("Unknown remote node type");
+ }
+ apeer.flags = ibuf.read4BE();
+ tmpname = new byte[len - 7];
+ ibuf.readN(tmpname);
+ hisname = OtpErlangString.newString(tmpname);
+ // Set the old nodetype parameter to indicate hidden/normal status
+ // When the old handshake is removed, the ntype should also be.
+ if ((apeer.flags & AbstractNode.dFlagPublished) != 0) {
+ apeer.ntype = AbstractNode.NTYPE_R4_ERLANG;
+ } else {
+ apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN;
+ }
+
+ if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) {
+ throw new IOException(
+ "Handshake failed - peer cannot handle extended references");
+ }
+
+ if ((apeer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) {
+ throw new IOException(
+ "Handshake failed - peer cannot handle extended pids and ports");
+ }
+
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ }
+
+ final int i = hisname.indexOf('@', 0);
+ apeer.node = hisname;
+ apeer.alive = hisname.substring(0, i);
+ apeer.host = hisname.substring(i + 1, hisname.length());
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype
+ + " dist=" + apeer.distHigh + " remote=" + apeer);
+ }
}
protected int recvChallenge() throws IOException {
- int challenge;
-
- try {
- final byte[] buf = read2BytePackage();
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(buf, 0);
- peer.ntype = ibuf.read1();
- if (peer.ntype != AbstractNode.NTYPE_R6) {
- throw new IOException("Unexpected peer type");
- }
- peer.distLow = peer.distHigh = ibuf.read2BE();
- peer.flags = ibuf.read4BE();
- challenge = ibuf.read4BE();
- final byte[] tmpname = new byte[buf.length - 11];
- ibuf.readN(tmpname);
- final String hisname = OtpErlangString.newString(tmpname);
- if (!hisname.equals(peer.node)) {
- throw new IOException(
- "Handshake failed - peer has wrong name: " + hisname);
- }
-
- if ((peer.flags & AbstractNode.dFlagExtendedReferences) == 0) {
- throw new IOException(
- "Handshake failed - peer cannot handle extended references");
- }
-
- if ((peer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) {
- throw new IOException(
- "Handshake failed - peer cannot handle extended pids and ports");
- }
-
- } catch (final OtpErlangDecodeException e) {
- throw new IOException("Handshake failed - not enough data");
- }
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- " + "HANDSHAKE recvChallenge" + " from="
- + peer.node + " challenge=" + challenge + " local=" + localNode);
- }
-
- return challenge;
+ int challenge;
+
+ try {
+ final byte[] buf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(buf, 0);
+ peer.ntype = ibuf.read1();
+ if (peer.ntype != AbstractNode.NTYPE_R6) {
+ throw new IOException("Unexpected peer type");
+ }
+ peer.distLow = peer.distHigh = ibuf.read2BE();
+ peer.flags = ibuf.read4BE();
+ challenge = ibuf.read4BE();
+ final byte[] tmpname = new byte[buf.length - 11];
+ ibuf.readN(tmpname);
+ final String hisname = OtpErlangString.newString(tmpname);
+ if (!hisname.equals(peer.node)) {
+ throw new IOException(
+ "Handshake failed - peer has wrong name: " + hisname);
+ }
+
+ if ((peer.flags & AbstractNode.dFlagExtendedReferences) == 0) {
+ throw new IOException(
+ "Handshake failed - peer cannot handle extended references");
+ }
+
+ if ((peer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) {
+ throw new IOException(
+ "Handshake failed - peer cannot handle extended pids and ports");
+ }
+
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ }
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- " + "HANDSHAKE recvChallenge" + " from="
+ + peer.node + " challenge=" + challenge + " local="
+ + localNode);
+ }
+
+ return challenge;
}
protected void sendChallengeReply(final int challenge, final byte[] digest)
- throws IOException {
+ throws IOException {
- @SuppressWarnings("resource")
- final OtpOutputStream obuf = new OtpOutputStream();
- obuf.write2BE(21);
- obuf.write1(ChallengeReply);
- obuf.write4BE(challenge);
- obuf.write(digest);
- obuf.writeTo(socket.getOutputStream());
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> " + "HANDSHAKE sendChallengeReply"
- + " challenge=" + challenge + " digest=" + hex(digest)
- + " local=" + localNode);
- }
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ obuf.write2BE(21);
+ obuf.write1(ChallengeReply);
+ obuf.write4BE(challenge);
+ obuf.write(digest);
+ obuf.writeTo(socket.getOutputStream());
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendChallengeReply"
+ + " challenge=" + challenge + " digest=" + hex(digest)
+ + " local=" + localNode);
+ }
}
// Would use Array.equals in newer JDK...
private boolean digests_equals(final byte[] a, final byte[] b) {
- int i;
- for (i = 0; i < 16; ++i) {
- if (a[i] != b[i]) {
- return false;
- }
- }
- return true;
+ int i;
+ for (i = 0; i < 16; ++i) {
+ if (a[i] != b[i]) {
+ return false;
+ }
+ }
+ return true;
}
protected int recvChallengeReply(final int our_challenge)
- throws IOException, OtpAuthException {
-
- int challenge;
- final byte[] her_digest = new byte[16];
-
- try {
- final byte[] buf = read2BytePackage();
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(buf, 0);
- final int tag = ibuf.read1();
- if (tag != ChallengeReply) {
- throw new IOException("Handshake protocol error");
- }
- challenge = ibuf.read4BE();
- ibuf.readN(her_digest);
- final byte[] our_digest = genDigest(our_challenge, localNode.cookie());
- if (!digests_equals(her_digest, our_digest)) {
- throw new OtpAuthException("Peer authentication error.");
- }
- } catch (final OtpErlangDecodeException e) {
- throw new IOException("Handshake failed - not enough data");
- }
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- " + "HANDSHAKE recvChallengeReply"
- + " from=" + peer.node + " challenge=" + challenge
- + " digest=" + hex(her_digest) + " local=" + localNode);
- }
-
- return challenge;
+ throws IOException, OtpAuthException {
+
+ int challenge;
+ final byte[] her_digest = new byte[16];
+
+ try {
+ final byte[] buf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(buf, 0);
+ final int tag = ibuf.read1();
+ if (tag != ChallengeReply) {
+ throw new IOException("Handshake protocol error");
+ }
+ challenge = ibuf.read4BE();
+ ibuf.readN(her_digest);
+ final byte[] our_digest = genDigest(our_challenge,
+ localNode.cookie());
+ if (!digests_equals(her_digest, our_digest)) {
+ throw new OtpAuthException("Peer authentication error.");
+ }
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ }
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- " + "HANDSHAKE recvChallengeReply"
+ + " from=" + peer.node + " challenge=" + challenge
+ + " digest=" + hex(her_digest) + " local=" + localNode);
+ }
+
+ return challenge;
}
protected void sendChallengeAck(final byte[] digest) throws IOException {
- @SuppressWarnings("resource")
- final OtpOutputStream obuf = new OtpOutputStream();
- obuf.write2BE(17);
- obuf.write1(ChallengeAck);
- obuf.write(digest);
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ obuf.write2BE(17);
+ obuf.write1(ChallengeAck);
+ obuf.write(digest);
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeTo(socket.getOutputStream());
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> " + "HANDSHAKE sendChallengeAck"
- + " digest=" + hex(digest) + " local=" + localNode);
- }
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendChallengeAck"
+ + " digest=" + hex(digest) + " local=" + localNode);
+ }
}
protected void recvChallengeAck(final int our_challenge)
- throws IOException, OtpAuthException {
-
- final byte[] her_digest = new byte[16];
- try {
- final byte[] buf = read2BytePackage();
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(buf, 0);
- final int tag = ibuf.read1();
- if (tag != ChallengeAck) {
- throw new IOException("Handshake protocol error");
- }
- ibuf.readN(her_digest);
- final byte[] our_digest = genDigest(our_challenge, localNode.cookie());
- if (!digests_equals(her_digest, our_digest)) {
- throw new OtpAuthException("Peer authentication error.");
- }
- } catch (final OtpErlangDecodeException e) {
- throw new IOException("Handshake failed - not enough data");
- } catch (final Exception e) {
- throw new OtpAuthException("Peer authentication error.");
- }
-
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- " + "HANDSHAKE recvChallengeAck" + " from="
- + peer.node + " digest=" + hex(her_digest) + " local="
- + localNode);
- }
+ throws IOException, OtpAuthException {
+
+ final byte[] her_digest = new byte[16];
+ try {
+ final byte[] buf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(buf, 0);
+ final int tag = ibuf.read1();
+ if (tag != ChallengeAck) {
+ throw new IOException("Handshake protocol error");
+ }
+ ibuf.readN(her_digest);
+ final byte[] our_digest = genDigest(our_challenge,
+ localNode.cookie());
+ if (!digests_equals(her_digest, our_digest)) {
+ throw new OtpAuthException("Peer authentication error.");
+ }
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ } catch (final Exception e) {
+ throw new OtpAuthException("Peer authentication error.");
+ }
+
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- " + "HANDSHAKE recvChallengeAck" + " from="
+ + peer.node + " digest=" + hex(her_digest) + " local="
+ + localNode);
+ }
}
protected void sendStatus(final String status) throws IOException {
- @SuppressWarnings("resource")
- final OtpOutputStream obuf = new OtpOutputStream();
- obuf.write2BE(status.length() + 1);
- obuf.write1(ChallengeStatus);
- obuf.write(status.getBytes());
+ @SuppressWarnings("resource")
+ final OtpOutputStream obuf = new OtpOutputStream();
+ obuf.write2BE(status.length() + 1);
+ obuf.write1(ChallengeStatus);
+ obuf.write(status.getBytes());
- obuf.writeTo(socket.getOutputStream());
+ obuf.writeTo(socket.getOutputStream());
- if (traceLevel >= handshakeThreshold) {
- System.out.println("-> " + "HANDSHAKE sendStatus" + " status="
- + status + " local=" + localNode);
- }
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("-> " + "HANDSHAKE sendStatus" + " status="
+ + status + " local=" + localNode);
+ }
}
protected void recvStatus() throws IOException {
- try {
- final byte[] buf = read2BytePackage();
- @SuppressWarnings("resource")
- final OtpInputStream ibuf = new OtpInputStream(buf, 0);
- final int tag = ibuf.read1();
- if (tag != ChallengeStatus) {
- throw new IOException("Handshake protocol error");
- }
- final byte[] tmpbuf = new byte[buf.length - 1];
- ibuf.readN(tmpbuf);
- final String status = OtpErlangString.newString(tmpbuf);
-
- if (status.compareTo("ok") != 0) {
- throw new IOException("Peer replied with status '" + status
- + "' instead of 'ok'");
- }
- } catch (final OtpErlangDecodeException e) {
- throw new IOException("Handshake failed - not enough data");
- }
- if (traceLevel >= handshakeThreshold) {
- System.out.println("<- " + "HANDSHAKE recvStatus (ok)" + " local="
- + localNode);
- }
+ try {
+ final byte[] buf = read2BytePackage();
+ @SuppressWarnings("resource")
+ final OtpInputStream ibuf = new OtpInputStream(buf, 0);
+ final int tag = ibuf.read1();
+ if (tag != ChallengeStatus) {
+ throw new IOException("Handshake protocol error");
+ }
+ final byte[] tmpbuf = new byte[buf.length - 1];
+ ibuf.readN(tmpbuf);
+ final String status = OtpErlangString.newString(tmpbuf);
+
+ if (status.compareTo("ok") != 0) {
+ throw new IOException("Peer replied with status '" + status
+ + "' instead of 'ok'");
+ }
+ } catch (final OtpErlangDecodeException e) {
+ throw new IOException("Handshake failed - not enough data");
+ }
+ if (traceLevel >= handshakeThreshold) {
+ System.out.println("<- " + "HANDSHAKE recvStatus (ok)" + " local="
+ + localNode);
+ }
}
public void setFlags(final int flags) {
- this.flags = flags;
+ this.flags = flags;
}
public int getFlags() {
- return flags;
+ return flags;
}
}