/* * %CopyrightBegin% * * Copyright Ericsson AB 2000-2010. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ package com.ericsson.otp.erlang; import java.io.IOException; import java.net.Socket; import java.util.Random; /** * Maintains a connection between a Java process and a remote Erlang, Java or C * node. The object maintains connection state and allows data to be sent to and * received from the peer. * *
* This abstract class provides the neccesary methods to maintain the actual * connection and encode the messages and headers in the proper format according * to the Erlang distribution protocol. Subclasses can use these methods to * provide a more or less transparent communication channel as desired. *
* ** Note that no receive methods are provided. Subclasses must provide methods * for message delivery, and may implement their own receive methods. *
* *
* If an exception occurs in any of the methods in this class, the connection * will be closed and must be reopened in order to resume communication with the * peer. This will be indicated to the subclass by passing the exception to its * delivery() method. *
* ** The System property OtpConnection.trace can be used to change the initial * trace level setting for all connections. Normally the initial trace level is * 0 and connections are not traced unless {@link #setTraceLevel * setTraceLevel()} is used to change the setting for a particular connection. * OtpConnection.trace can be used to turn on tracing by default for all * connections. *
*/ public abstract class AbstractConnection extends Thread { protected static final int headerLen = 2048; // more than enough protected static final byte passThrough = (byte) 0x70; protected static final byte version = (byte) 0x83; // Erlang message header tags protected static final int linkTag = 1; protected static final int sendTag = 2; protected static final int exitTag = 3; protected static final int unlinkTag = 4; protected static final int regSendTag = 6; protected static final int groupLeaderTag = 7; protected static final int exit2Tag = 8; protected static final int sendTTTag = 12; protected static final int exitTTTag = 13; protected static final int regSendTTTag = 16; protected static final int exit2TTTag = 18; // MD5 challenge messsage tags protected static final int ChallengeReply = 'r'; protected static final int ChallengeAck = 'a'; protected static final int ChallengeStatus = 's'; private volatile boolean done = false; protected boolean connected = false; // connection status protected Socket socket; // communication channel protected OtpPeer peer; // who are we connected to protected OtpLocalNode localNode; // this nodes id String name; // local name of this connection protected boolean cookieOk = false; // already checked the cookie for this // connection protected boolean sendCookie = true; // Send cookies in messages? // tracelevel constants protected int traceLevel = 0; protected static int defaultLevel = 0; protected static int sendThreshold = 1; protected static int ctrlThreshold = 2; protected static int handshakeThreshold = 3; protected static Random random = null; private int flags = 0; static { // trace this connection? final String trace = System.getProperties().getProperty( "OtpConnection.trace"); try { if (trace != null) { defaultLevel = Integer.valueOf(trace).intValue(); } } catch (final NumberFormatException e) { defaultLevel = 0; } random = new Random(); } // private AbstractConnection() { // } /** * Accept an incoming connection from a remote node. Used by * {@link OtpSelf#accept() OtpSelf.accept()} to create a connection based on * data received when handshaking with the peer node, when the remote node * is the connection intitiator. * * @exception java.io.IOException * if it was not possible to connect to the peer. * * @exception OtpAuthException * if handshake resulted in an authentication error */ protected AbstractConnection(final OtpLocalNode self, final Socket s) throws IOException, OtpAuthException { localNode = self; peer = new OtpPeer(); socket = s; socket.setTcpNoDelay(true); traceLevel = defaultLevel; setDaemon(true); if (traceLevel >= handshakeThreshold) { System.out.println("<- ACCEPT FROM " + s.getInetAddress() + ":" + s.getPort()); } // get his info recvName(peer); // now find highest common dist value if (peer.proto != self.proto || self.distHigh < peer.distLow || self.distLow > peer.distHigh) { close(); throw new IOException( "No common protocol found - cannot accept connection"); } // highest common version: min(peer.distHigh, self.distHigh) peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh : peer.distHigh; doAccept(); name = peer.node(); } /** * Intiate and open a connection to a remote node. * * @exception java.io.IOException * if it was not possible to connect to the peer. * * @exception OtpAuthException * if handshake resulted in an authentication error. */ protected AbstractConnection(final OtpLocalNode self, final OtpPeer other) throws IOException, OtpAuthException { peer = other; localNode = self; socket = null; int port; traceLevel = defaultLevel; setDaemon(true); // now get a connection between the two... port = OtpEpmd.lookupPort(peer); // now find highest common dist value if (peer.proto != self.proto || self.distHigh < peer.distLow || self.distLow > peer.distHigh) { throw new IOException("No common protocol found - cannot connect"); } // highest common version: min(peer.distHigh, self.distHigh) peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh : peer.distHigh; doConnect(port); name = peer.node(); connected = true; } /** * Deliver communication exceptions to the recipient. */ public abstract void deliver(Exception e); /** * Deliver messages to the recipient. */ public abstract void deliver(OtpMsg msg); /** * Send a pre-encoded message to a named process on a remote node. * * @param dest * the name of the remote process. * @param payload * the encoded message to send. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendBuf(final OtpErlangPid from, final String dest, final OtpOutputStream payload) throws IOException { if (!connected) { throw new IOException("Not connected"); } @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag + version header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header info header.write_tuple_head(4); header.write_long(regSendTag); header.write_any(from); if (sendCookie) { header.write_atom(localNode.cookie()); } else { header.write_atom(""); } header.write_atom(dest); // version for payload header.write1(version); // fix up length in preamble header.poke4BE(0, header.size() + payload.size() - 4); do_send(header, payload); } /** * Send a pre-encoded message to a process on a remote node. * * @param dest * the Erlang PID of the remote process. * @param payload * the encoded message to send. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendBuf(final OtpErlangPid from, final OtpErlangPid dest, final OtpOutputStream payload) throws IOException { if (!connected) { throw new IOException("Not connected"); } @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag + version header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header info header.write_tuple_head(3); header.write_long(sendTag); if (sendCookie) { header.write_atom(localNode.cookie()); } else { header.write_atom(""); } header.write_any(dest); // version for payload header.write1(version); // fix up length in preamble header.poke4BE(0, header.size() + payload.size() - 4); do_send(header, payload); } /* * Send an auth error to peer because he sent a bad cookie. The auth error * uses his cookie (not revealing ours). This is just like send_reg * otherwise */ private void cookieError(final OtpLocalNode local, final OtpErlangAtom cookie) throws OtpAuthException { try { @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag + version header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); header.write_tuple_head(4); header.write_long(regSendTag); header.write_any(local.createPid()); // disposable pid header.write_atom(cookie.atomValue()); // important: his cookie, // not mine... header.write_atom("auth"); // version for payload header.write1(version); // the payload // the no_auth message (copied from Erlang) Don't change this // (Erlang will crash) // {$gen_cast, {print, "~n** Unauthorized cookie ~w **~n", // [foo@aule]}} final OtpErlangObject[] msg = new OtpErlangObject[2]; final OtpErlangObject[] msgbody = new OtpErlangObject[3]; msgbody[0] = new OtpErlangAtom("print"); msgbody[1] = new OtpErlangString("~n** Bad cookie sent to " + local + " **~n"); // Erlang will crash and burn if there is no third argument here... msgbody[2] = new OtpErlangList(); // empty list msg[0] = new OtpErlangAtom("$gen_cast"); msg[1] = new OtpErlangTuple(msgbody); @SuppressWarnings("resource") final OtpOutputStream payload = new OtpOutputStream( new OtpErlangTuple(msg)); // fix up length in preamble header.poke4BE(0, header.size() + payload.size() - 4); try { do_send(header, payload); } catch (final IOException e) { } // ignore } finally { close(); } throw new OtpAuthException("Remote cookie not authorized: " + cookie.atomValue()); } // link to pid /** * Create a link between the local node and the specified process on the * remote node. If the link is still active when the remote process * terminates, an exit signal will be sent to this connection. Use * {@link #sendUnlink unlink()} to remove the link. * * @param dest * the Erlang PID of the remote process. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendLink(final OtpErlangPid from, final OtpErlangPid dest) throws IOException { if (!connected) { throw new IOException("Not connected"); } @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header header.write_tuple_head(3); header.write_long(linkTag); header.write_any(from); header.write_any(dest); // fix up length in preamble header.poke4BE(0, header.size() - 4); do_send(header); } /** * Remove a link between the local node and the specified process on the * remote node. This method deactivates links created with {@link #sendLink * link()}. * * @param dest * the Erlang PID of the remote process. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest) throws IOException { if (!connected) { throw new IOException("Not connected"); } @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header header.write_tuple_head(3); header.write_long(unlinkTag); header.write_any(from); header.write_any(dest); // fix up length in preamble header.poke4BE(0, header.size() - 4); do_send(header); } /* used internally when "processes" terminate */ protected void sendExit(final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { sendExit(exitTag, from, dest, reason); } /** * Send an exit signal to a remote process. * * @param dest * the Erlang PID of the remote process. * @param reason * an Erlang term describing the exit reason. * * @exception java.io.IOException * if the connection is not active or a communication error * occurs. */ protected void sendExit2(final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { sendExit(exit2Tag, from, dest, reason); } private void sendExit(final int tag, final OtpErlangPid from, final OtpErlangPid dest, final OtpErlangObject reason) throws IOException { if (!connected) { throw new IOException("Not connected"); } @SuppressWarnings("resource") final OtpOutputStream header = new OtpOutputStream(headerLen); // preamble: 4 byte length + "passthrough" tag header.write4BE(0); // reserve space for length header.write1(passThrough); header.write1(version); // header header.write_tuple_head(4); header.write_long(tag); header.write_any(from); header.write_any(dest); header.write_any(reason); // fix up length in preamble header.poke4BE(0, header.size() - 4); do_send(header); } @SuppressWarnings("resource") @Override public void run() { if (!connected) { deliver(new IOException("Not connected")); return; } final byte[] lbuf = new byte[4]; OtpInputStream ibuf; OtpErlangObject traceobj; int len; final byte[] tock = { 0, 0, 0, 0 }; try { receive_loop: while (!done) { // don't return until we get a real message // or a failure of some kind (e.g. EXIT) // read length and read buffer must be atomic! do { // read 4 bytes - get length of incoming packet // socket.getInputStream().read(lbuf); readSock(socket, lbuf); ibuf = new OtpInputStream(lbuf, flags); len = ibuf.read4BE(); // received tick? send tock! if (len == 0) { synchronized (this) { socket.getOutputStream().write(tock); } } } while (len == 0); // tick_loop // got a real message (maybe) - read len bytes final byte[] tmpbuf = new byte[len]; // i = socket.getInputStream().read(tmpbuf); readSock(socket, tmpbuf); ibuf.close(); ibuf = new OtpInputStream(tmpbuf, flags); if (ibuf.read1() != passThrough) { break receive_loop; } // got a real message (really) OtpErlangObject reason = null; OtpErlangAtom cookie = null; OtpErlangObject tmp = null; OtpErlangTuple head = null; OtpErlangAtom toName; OtpErlangPid to; OtpErlangPid from; int tag; // decode the header tmp = ibuf.read_any(); if (!(tmp instanceof OtpErlangTuple)) { break receive_loop; } head = (OtpErlangTuple) tmp; if (!(head.elementAt(0) instanceof OtpErlangLong)) { break receive_loop; } // lets see what kind of message this is tag = (int) ((OtpErlangLong) head.elementAt(0)).longValue(); switch (tag) { case sendTag: // { SEND, Cookie, ToPid } case sendTTTag: // { SEND, Cookie, ToPid, TraceToken } if (!cookieOk) { // we only check this once, he can send us bad cookies // later if he likes if (!(head.elementAt(1) instanceof OtpErlangAtom)) { break receive_loop; } cookie = (OtpErlangAtom) head.elementAt(1); if (sendCookie) { if (!cookie.atomValue().equals(localNode.cookie())) { cookieError(localNode, cookie); } } else { if (!cookie.atomValue().equals("")) { cookieError(localNode, cookie); } } cookieOk = true; } if (traceLevel >= sendThreshold) { System.out.println("<- " + headerType(head) + " " + head); /* show received payload too */ ibuf.mark(0); traceobj = ibuf.read_any(); if (traceobj != null) { System.out.println(" " + traceobj); } else { System.out.println(" (null)"); } ibuf.reset(); } to = (OtpErlangPid) head.elementAt(2); deliver(new OtpMsg(to, ibuf)); break; case regSendTag: // { REG_SEND, FromPid, Cookie, ToName } case regSendTTTag: // { REG_SEND, FromPid, Cookie, ToName, // TraceToken } if (!cookieOk) { // we only check this once, he can send us bad cookies // later if he likes if (!(head.elementAt(2) instanceof OtpErlangAtom)) { break receive_loop; } cookie = (OtpErlangAtom) head.elementAt(2); if (sendCookie) { if (!cookie.atomValue().equals(localNode.cookie())) { cookieError(localNode, cookie); } } else { if (!cookie.atomValue().equals("")) { cookieError(localNode, cookie); } } cookieOk = true; } if (traceLevel >= sendThreshold) { System.out.println("<- " + headerType(head) + " " + head); /* show received payload too */ ibuf.mark(0); traceobj = ibuf.read_any(); if (traceobj != null) { System.out.println(" " + traceobj); } else { System.out.println(" (null)"); } ibuf.reset(); } from = (OtpErlangPid) head.elementAt(1); toName = (OtpErlangAtom) head.elementAt(3); deliver(new OtpMsg(from, toName.atomValue(), ibuf)); break; case exitTag: // { EXIT, FromPid, ToPid, Reason } case exit2Tag: // { EXIT2, FromPid, ToPid, Reason } if (head.elementAt(3) == null) { break receive_loop; } if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); reason = head.elementAt(3); deliver(new OtpMsg(tag, from, to, reason)); break; case exitTTTag: // { EXIT, FromPid, ToPid, TraceToken, Reason } case exit2TTTag: // { EXIT2, FromPid, ToPid, TraceToken, // Reason // } // as above, but bifferent element number if (head.elementAt(4) == null) { break receive_loop; } if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); reason = head.elementAt(4); deliver(new OtpMsg(tag, from, to, reason)); break; case linkTag: // { LINK, FromPid, ToPid} case unlinkTag: // { UNLINK, FromPid, ToPid} if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } from = (OtpErlangPid) head.elementAt(1); to = (OtpErlangPid) head.elementAt(2); deliver(new OtpMsg(tag, from, to)); break; // absolutely no idea what to do with these, so we ignore // them... case groupLeaderTag: // { GROUPLEADER, FromPid, ToPid} // (just show trace) if (traceLevel >= ctrlThreshold) { System.out.println("<- " + headerType(head) + " " + head); } break; default: // garbage? break receive_loop; } } // end receive_loop // this section reachable only with break // we have received garbage from peer deliver(new OtpErlangExit("Remote is sending garbage")); } // try catch (final OtpAuthException e) { deliver(e); } catch (final OtpErlangDecodeException e) { deliver(new OtpErlangExit("Remote is sending garbage")); } catch (final IOException e) { deliver(new OtpErlangExit("Remote has closed connection")); } finally { close(); } } /** ** Set the trace level for this connection. Normally tracing is off by * default unless System property OtpConnection.trace was set. *
* ** The following levels are valid: 0 turns off tracing completely, 1 shows * ordinary send and receive messages, 2 shows control messages such as link * and unlink, 3 shows handshaking at connection setup, and 4 shows * communication with Epmd. Each level includes the information shown by the * lower ones. *
* * @param level * the level to set. * * @return the previous trace level. */ public int setTraceLevel(final int level) { final int oldLevel = traceLevel; // pin the value int theLevel = level; if (level < 0) { theLevel = 0; } else if (level > 4) { theLevel = 4; } traceLevel = theLevel; return oldLevel; } /** * Get the trace level for this connection. * * @return the current trace level. */ public int getTraceLevel() { return traceLevel; } /** * Close the connection to the remote node. */ public void close() { done = true; connected = false; synchronized (this) { try { if (socket != null) { if (traceLevel >= ctrlThreshold) { System.out.println("-> CLOSE"); } socket.close(); } } catch (final IOException e) { /* ignore socket close errors */ } finally { socket = null; } } } @Override protected void finalize() { close(); } /** * Determine if the connection is still alive. Note that this method only * reports the status of the connection, and that it is possible that there * are unread messages waiting in the receive queue. * * @return true if the connection is alive. */ public boolean isConnected() { return connected; } // used by send and send_reg (message types with payload) protected synchronized void do_send(final OtpOutputStream header, final OtpOutputStream payload) throws IOException { try { if (traceLevel >= sendThreshold) { // Need to decode header and output buffer to show trace // message! // First make OtpInputStream, then decode. try { final OtpErlangObject h = header.getOtpInputStream(5) .read_any(); System.out.println("-> " + headerType(h) + " " + h); OtpErlangObject o = payload.getOtpInputStream(0).read_any(); System.out.println(" " + o); o = null; } catch (final OtpErlangDecodeException e) { System.out.println(" " + "can't decode output buffer:" + e); } } header.writeTo(socket.getOutputStream()); payload.writeTo(socket.getOutputStream()); } catch (final IOException e) { close(); throw e; } } // used by the other message types protected synchronized void do_send(final OtpOutputStream header) throws IOException { try { if (traceLevel >= ctrlThreshold) { try { final OtpErlangObject h = header.getOtpInputStream(5) .read_any(); System.out.println("-> " + headerType(h) + " " + h); } catch (final OtpErlangDecodeException e) { System.out.println(" " + "can't decode output buffer: " + e); } } header.writeTo(socket.getOutputStream()); } catch (final IOException e) { close(); throw e; } } protected String headerType(final OtpErlangObject h) { int tag = -1; if (h instanceof OtpErlangTuple) { tag = (int) ((OtpErlangLong) ((OtpErlangTuple) h).elementAt(0)) .longValue(); } switch (tag) { case linkTag: return "LINK"; case sendTag: return "SEND"; case exitTag: return "EXIT"; case unlinkTag: return "UNLINK"; case regSendTag: return "REG_SEND"; case groupLeaderTag: return "GROUP_LEADER"; case exit2Tag: return "EXIT2"; case sendTTTag: return "SEND_TT"; case exitTTTag: return "EXIT_TT"; case regSendTTTag: return "REG_SEND_TT"; case exit2TTTag: return "EXIT2_TT"; } return "(unknown type)"; } /* this method now throws exception if we don't get full read */ protected int readSock(final Socket s, final byte[] b) throws IOException { int got = 0; final int len = b.length; int i; synchronized (this) { if (s == null) { throw new IOException("expected " + len + " bytes, socket was closed"); } } while (got < len) { i = s.getInputStream().read(b, got, len - got); if (i < 0) { throw new IOException("expected " + len + " bytes, got EOF after " + got + " bytes"); } else if (i == 0 && len != 0) { /* * This is a corner case. According to * http://java.sun.com/j2se/1.4.2/docs/api/ class InputStream * is.read(,,l) can only return 0 if l==0. In other words it * should not happen, but apparently did. */ throw new IOException("Remote connection closed"); } else { got += i; } } return got; } protected void doAccept() throws IOException, OtpAuthException { try { sendStatus("ok"); final int our_challenge = genChallenge(); sendChallenge(peer.distChoose, localNode.flags, our_challenge); final int her_challenge = recvChallengeReply(our_challenge); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); sendChallengeAck(our_digest); connected = true; cookieOk = true; sendCookie = false; } catch (final IOException ie) { close(); throw ie; } catch (final OtpAuthException ae) { close(); throw ae; } catch (final Exception e) { final String nn = peer.node(); close(); final IOException ioe = new IOException( "Error accepting connection from " + nn); ioe.initCause(e); throw ioe; } if (traceLevel >= handshakeThreshold) { System.out.println("<- MD5 ACCEPTED " + peer.host()); } } protected void doConnect(final int port) throws IOException, OtpAuthException { try { socket = new Socket(peer.host(), port); socket.setTcpNoDelay(true); if (traceLevel >= handshakeThreshold) { System.out.println("-> MD5 CONNECT TO " + peer.host() + ":" + port); } sendName(peer.distChoose, localNode.flags); recvStatus(); final int her_challenge = recvChallenge(); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); final int our_challenge = genChallenge(); sendChallengeReply(our_challenge, our_digest); recvChallengeAck(our_challenge); cookieOk = true; sendCookie = false; } catch (final OtpAuthException ae) { close(); throw ae; } catch (final Exception e) { close(); final IOException ioe = new IOException( "Cannot connect to peer node"); ioe.initCause(e); throw ioe; } } // This is nooo good as a challenge, // XXX fix me. static protected int genChallenge() { return random.nextInt(); } // Used to debug print a message digest static String hex0(final byte x) { final char tab[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; int uint; if (x < 0) { uint = x & 0x7F; uint |= 1 << 7; } else { uint = x; } return "" + tab[uint >>> 4] + tab[uint & 0xF]; } static String hex(final byte[] b) { final StringBuffer sb = new StringBuffer(); try { int i; for (i = 0; i < b.length; ++i) { sb.append(hex0(b[i])); } } catch (final Exception e) { // Debug function, ignore errors. } return sb.toString(); } protected byte[] genDigest(final int challenge, final String cookie) { int i; long ch2; if (challenge < 0) { ch2 = 1L << 31; ch2 |= challenge & 0x7FFFFFFF; } else { ch2 = challenge; } final OtpMD5 context = new OtpMD5(); context.update(cookie); context.update("" + ch2); final int[] tmp = context.final_bytes(); final byte[] res = new byte[tmp.length]; for (i = 0; i < tmp.length; ++i) { res[i] = (byte) (tmp[i] & 0xFF); } return res; } protected void sendName(final int dist, final int aflags) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); obuf.write2BE(str.length() + 7); // 7 bytes + nodename obuf.write1(AbstractNode.NTYPE_R6); obuf.write2BE(dist); obuf.write4BE(aflags); obuf.write(str.getBytes()); obuf.writeTo(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendName" + " flags=" + aflags + " dist=" + dist + " local=" + localNode); } } protected void sendChallenge(final int dist, final int aflags, final int challenge) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); obuf.write2BE(str.length() + 11); // 11 bytes + nodename obuf.write1(AbstractNode.NTYPE_R6); obuf.write2BE(dist); obuf.write4BE(aflags); obuf.write4BE(challenge); obuf.write(str.getBytes()); obuf.writeTo(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags=" + aflags + " dist=" + dist + " challenge=" + challenge + " local=" + localNode); } } protected byte[] read2BytePackage() throws IOException, OtpErlangDecodeException { final byte[] lbuf = new byte[2]; byte[] tmpbuf; readSock(socket, lbuf); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(lbuf, 0); final int len = ibuf.read2BE(); tmpbuf = new byte[len]; readSock(socket, tmpbuf); return tmpbuf; } protected void recvName(final OtpPeer apeer) throws IOException { String hisname = ""; try { final byte[] tmpbuf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); byte[] tmpname; final int len = tmpbuf.length; apeer.ntype = ibuf.read1(); if (apeer.ntype != AbstractNode.NTYPE_R6) { throw new IOException("Unknown remote node type"); } apeer.distLow = apeer.distHigh = ibuf.read2BE(); if (apeer.distLow < 5) { throw new IOException("Unknown remote node type"); } apeer.flags = ibuf.read4BE(); tmpname = new byte[len - 7]; ibuf.readN(tmpname); hisname = OtpErlangString.newString(tmpname); // Set the old nodetype parameter to indicate hidden/normal status // When the old handshake is removed, the ntype should also be. if ((apeer.flags & AbstractNode.dFlagPublished) != 0) { apeer.ntype = AbstractNode.NTYPE_R4_ERLANG; } else { apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN; } if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) { throw new IOException( "Handshake failed - peer cannot handle extended references"); } if ((apeer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) { throw new IOException( "Handshake failed - peer cannot handle extended pids and ports"); } } catch (final OtpErlangDecodeException e) { throw new IOException("Handshake failed - not enough data"); } final int i = hisname.indexOf('@', 0); apeer.node = hisname; apeer.alive = hisname.substring(0, i); apeer.host = hisname.substring(i + 1, hisname.length()); if (traceLevel >= handshakeThreshold) { System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype + " dist=" + apeer.distHigh + " remote=" + apeer); } } protected int recvChallenge() throws IOException { int challenge; try { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); peer.ntype = ibuf.read1(); if (peer.ntype != AbstractNode.NTYPE_R6) { throw new IOException("Unexpected peer type"); } peer.distLow = peer.distHigh = ibuf.read2BE(); peer.flags = ibuf.read4BE(); challenge = ibuf.read4BE(); final byte[] tmpname = new byte[buf.length - 11]; ibuf.readN(tmpname); final String hisname = OtpErlangString.newString(tmpname); if (!hisname.equals(peer.node)) { throw new IOException( "Handshake failed - peer has wrong name: " + hisname); } if ((peer.flags & AbstractNode.dFlagExtendedReferences) == 0) { throw new IOException( "Handshake failed - peer cannot handle extended references"); } if ((peer.flags & AbstractNode.dFlagExtendedPidsPorts) == 0) { throw new IOException( "Handshake failed - peer cannot handle extended pids and ports"); } } catch (final OtpErlangDecodeException e) { throw new IOException("Handshake failed - not enough data"); } if (traceLevel >= handshakeThreshold) { System.out.println("<- " + "HANDSHAKE recvChallenge" + " from=" + peer.node + " challenge=" + challenge + " local=" + localNode); } return challenge; } protected void sendChallengeReply(final int challenge, final byte[] digest) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); obuf.write2BE(21); obuf.write1(ChallengeReply); obuf.write4BE(challenge); obuf.write(digest); obuf.writeTo(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallengeReply" + " challenge=" + challenge + " digest=" + hex(digest) + " local=" + localNode); } } // Would use Array.equals in newer JDK... private boolean digests_equals(final byte[] a, final byte[] b) { int i; for (i = 0; i < 16; ++i) { if (a[i] != b[i]) { return false; } } return true; } protected int recvChallengeReply(final int our_challenge) throws IOException, OtpAuthException { int challenge; final byte[] her_digest = new byte[16]; try { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); final int tag = ibuf.read1(); if (tag != ChallengeReply) { throw new IOException("Handshake protocol error"); } challenge = ibuf.read4BE(); ibuf.readN(her_digest); final byte[] our_digest = genDigest(our_challenge, localNode.cookie()); if (!digests_equals(her_digest, our_digest)) { throw new OtpAuthException("Peer authentication error."); } } catch (final OtpErlangDecodeException e) { throw new IOException("Handshake failed - not enough data"); } if (traceLevel >= handshakeThreshold) { System.out.println("<- " + "HANDSHAKE recvChallengeReply" + " from=" + peer.node + " challenge=" + challenge + " digest=" + hex(her_digest) + " local=" + localNode); } return challenge; } protected void sendChallengeAck(final byte[] digest) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); obuf.write2BE(17); obuf.write1(ChallengeAck); obuf.write(digest); obuf.writeTo(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallengeAck" + " digest=" + hex(digest) + " local=" + localNode); } } protected void recvChallengeAck(final int our_challenge) throws IOException, OtpAuthException { final byte[] her_digest = new byte[16]; try { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); final int tag = ibuf.read1(); if (tag != ChallengeAck) { throw new IOException("Handshake protocol error"); } ibuf.readN(her_digest); final byte[] our_digest = genDigest(our_challenge, localNode.cookie()); if (!digests_equals(her_digest, our_digest)) { throw new OtpAuthException("Peer authentication error."); } } catch (final OtpErlangDecodeException e) { throw new IOException("Handshake failed - not enough data"); } catch (final Exception e) { throw new OtpAuthException("Peer authentication error."); } if (traceLevel >= handshakeThreshold) { System.out.println("<- " + "HANDSHAKE recvChallengeAck" + " from=" + peer.node + " digest=" + hex(her_digest) + " local=" + localNode); } } protected void sendStatus(final String status) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); obuf.write2BE(status.length() + 1); obuf.write1(ChallengeStatus); obuf.write(status.getBytes()); obuf.writeTo(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendStatus" + " status=" + status + " local=" + localNode); } } protected void recvStatus() throws IOException { try { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); final int tag = ibuf.read1(); if (tag != ChallengeStatus) { throw new IOException("Handshake protocol error"); } final byte[] tmpbuf = new byte[buf.length - 1]; ibuf.readN(tmpbuf); final String status = OtpErlangString.newString(tmpbuf); if (status.compareTo("ok") != 0) { throw new IOException("Peer replied with status '" + status + "' instead of 'ok'"); } } catch (final OtpErlangDecodeException e) { throw new IOException("Handshake failed - not enough data"); } if (traceLevel >= handshakeThreshold) { System.out.println("<- " + "HANDSHAKE recvStatus (ok)" + " local=" + localNode); } } public void setFlags(final int flags) { this.flags = flags; } public int getFlags() { return flags; } }