aboutsummaryrefslogtreecommitdiffstats
path: root/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java584
1 files changed, 584 insertions, 0 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
new file mode 100644
index 0000000000..8e8bd473c8
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpConnection.java
@@ -0,0 +1,584 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2000-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Maintains a connection between a Java process and a remote Erlang, Java or C
+ * node. The object maintains connection state and allows data to be sent to and
+ * received from the peer.
+ *
+ * <p>
+ * Once a connection is established between the local node and a remote node,
+ * the connection object can be used to send and receive messages between the
+ * nodes and make rpc calls (assuming that the remote node is a real Erlang
+ * node).
+ *
+ * <p>
+ * The various receive methods are all blocking and will return only when a
+ * valid message has been received or an exception is raised.
+ *
+ * <p>
+ * If an exception occurs in any of the methods in this class, the connection
+ * will be closed and must be explicitely reopened in order to resume
+ * communication with the peer.
+ *
+ * <p>
+ * It is not possible to create an instance of this class directly.
+ * OtpConnection objects are returned by {@link OtpSelf#connect(OtpPeer)
+ * OtpSelf.connect()} and {@link OtpSelf#accept() OtpSelf.accept()}.
+ */
+public class OtpConnection extends AbstractConnection {
+ protected OtpSelf self;
+ protected GenericQueue queue; // messages get delivered here
+
+ /*
+ * Accept an incoming connection from a remote node. Used by {@link
+ * OtpSelf#accept() OtpSelf.accept()} to create a connection based on data
+ * received when handshaking with the peer node, when the remote node is the
+ * connection intitiator.
+ *
+ * @exception java.io.IOException if it was not possible to connect to the
+ * peer.
+ *
+ * @exception OtpAuthException if handshake resulted in an authentication
+ * error
+ */
+ // package scope
+ OtpConnection(final OtpSelf self, final Socket s) throws IOException,
+ OtpAuthException {
+ super(self, s);
+ this.self = self;
+ queue = new GenericQueue();
+ start();
+ }
+
+ /*
+ * Intiate and open a connection to a remote node.
+ *
+ * @exception java.io.IOException if it was not possible to connect to the
+ * peer.
+ *
+ * @exception OtpAuthException if handshake resulted in an authentication
+ * error.
+ */
+ // package scope
+ OtpConnection(final OtpSelf self, final OtpPeer other) throws IOException,
+ OtpAuthException {
+ super(self, other);
+ this.self = self;
+ queue = new GenericQueue();
+ start();
+ }
+
+ @Override
+ public void deliver(final Exception e) {
+ queue.put(e);
+ }
+
+ @Override
+ public void deliver(final OtpMsg msg) {
+ queue.put(msg);
+ }
+
+ /**
+ * Get information about the node at the peer end of this connection.
+ *
+ * @return the {@link OtpPeer Node} representing the peer node.
+ */
+ public OtpPeer peer() {
+ return peer;
+ }
+
+ /**
+ * Get information about the node at the local end of this connection.
+ *
+ * @return the {@link OtpSelf Node} representing the local node.
+ */
+ public OtpSelf self() {
+ return self;
+ }
+
+ /**
+ * Return the number of messages currently waiting in the receive queue for
+ * this connection.
+ */
+ public int msgCount() {
+ return queue.getCount();
+ }
+
+ /**
+ * Receive a message from a remote process. This method blocks until a valid
+ * message is received or an exception is raised.
+ *
+ * <p>
+ * If the remote node sends a message that cannot be decoded properly, the
+ * connection is closed and the method throws an exception.
+ *
+ * @return an object containing a single Erlang term.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ */
+ public OtpErlangObject receive() throws IOException, OtpErlangExit,
+ OtpAuthException {
+ try {
+ return receiveMsg().getMsg();
+ } catch (final OtpErlangDecodeException e) {
+ close();
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Receive a message from a remote process. This method blocks at most for
+ * the specified time, until a valid message is received or an exception is
+ * raised.
+ *
+ * <p>
+ * If the remote node sends a message that cannot be decoded properly, the
+ * connection is closed and the method throws an exception.
+ *
+ * @param timeout
+ * the time in milliseconds that this operation will block.
+ * Specify 0 to poll the queue.
+ *
+ * @return an object containing a single Erlang term.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ *
+ * @exception InterruptedException
+ * if no message if the method times out before a message
+ * becomes available.
+ */
+ public OtpErlangObject receive(final long timeout)
+ throws InterruptedException, IOException, OtpErlangExit,
+ OtpAuthException {
+ try {
+ return receiveMsg(timeout).getMsg();
+ } catch (final OtpErlangDecodeException e) {
+ close();
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Receive a raw (still encoded) message from a remote process. This message
+ * blocks until a valid message is received or an exception is raised.
+ *
+ * <p>
+ * If the remote node sends a message that cannot be decoded properly, the
+ * connection is closed and the method throws an exception.
+ *
+ * @return an object containing a raw (still encoded) Erlang term.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node, or if the connection is lost for any
+ * reason.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ */
+ public OtpInputStream receiveBuf() throws IOException, OtpErlangExit,
+ OtpAuthException {
+ return receiveMsg().getMsgBuf();
+ }
+
+ /**
+ * Receive a raw (still encoded) message from a remote process. This message
+ * blocks at most for the specified time until a valid message is received
+ * or an exception is raised.
+ *
+ * <p>
+ * If the remote node sends a message that cannot be decoded properly, the
+ * connection is closed and the method throws an exception.
+ *
+ * @param timeout
+ * the time in milliseconds that this operation will block.
+ * Specify 0 to poll the queue.
+ *
+ * @return an object containing a raw (still encoded) Erlang term.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node, or if the connection is lost for any
+ * reason.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ *
+ * @exception InterruptedException
+ * if no message if the method times out before a message
+ * becomes available.
+ */
+ public OtpInputStream receiveBuf(final long timeout)
+ throws InterruptedException, IOException, OtpErlangExit,
+ OtpAuthException {
+ return receiveMsg(timeout).getMsgBuf();
+ }
+
+ /**
+ * Receive a messge complete with sender and recipient information.
+ *
+ * @return an {@link OtpMsg OtpMsg} containing the header information about
+ * the sender and recipient, as well as the actual message contents.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node, or if the connection is lost for any
+ * reason.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ */
+ public OtpMsg receiveMsg() throws IOException, OtpErlangExit,
+ OtpAuthException {
+ final Object o = queue.get();
+
+ if (o instanceof OtpMsg) {
+ return (OtpMsg) o;
+ } else if (o instanceof IOException) {
+ throw (IOException) o;
+ } else if (o instanceof OtpErlangExit) {
+ throw (OtpErlangExit) o;
+ } else if (o instanceof OtpAuthException) {
+ throw (OtpAuthException) o;
+ }
+
+ return null;
+ }
+
+ /**
+ * Receive a messge complete with sender and recipient information. This
+ * method blocks at most for the specified time.
+ *
+ * @param timeout
+ * the time in milliseconds that this operation will block.
+ * Specify 0 to poll the queue.
+ *
+ * @return an {@link OtpMsg OtpMsg} containing the header information about
+ * the sender and recipient, as well as the actual message contents.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node, or if the connection is lost for any
+ * reason.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ *
+ * @exception InterruptedException
+ * if no message if the method times out before a message
+ * becomes available.
+ */
+ public OtpMsg receiveMsg(final long timeout) throws InterruptedException,
+ IOException, OtpErlangExit, OtpAuthException {
+ final Object o = queue.get(timeout);
+
+ if (o instanceof OtpMsg) {
+ return (OtpMsg) o;
+ } else if (o instanceof IOException) {
+ throw (IOException) o;
+ } else if (o instanceof OtpErlangExit) {
+ throw (OtpErlangExit) o;
+ } else if (o instanceof OtpAuthException) {
+ throw (OtpAuthException) o;
+ }
+
+ return null;
+ }
+
+ /**
+ * Send a message to a process on a remote node.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ * @param msg
+ * the message to send.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void send(final OtpErlangPid dest, final OtpErlangObject msg)
+ throws IOException {
+ // encode and send the message
+ super.sendBuf(self.pid(), dest, new OtpOutputStream(msg));
+ }
+
+ /**
+ * Send a message to a named process on a remote node.
+ *
+ * @param dest
+ * the name of the remote process.
+ * @param msg
+ * the message to send.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void send(final String dest, final OtpErlangObject msg)
+ throws IOException {
+ // encode and send the message
+ super.sendBuf(self.pid(), dest, new OtpOutputStream(msg));
+ }
+
+ /**
+ * Send a pre-encoded message to a named process on a remote node.
+ *
+ * @param dest
+ * the name of the remote process.
+ * @param payload
+ * the encoded message to send.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void sendBuf(final String dest, final OtpOutputStream payload)
+ throws IOException {
+ super.sendBuf(self.pid(), dest, payload);
+ }
+
+ /**
+ * Send a pre-encoded message to a process on a remote node.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ * @param msg
+ * the encoded message to send.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void sendBuf(final OtpErlangPid dest, final OtpOutputStream payload)
+ throws IOException {
+ super.sendBuf(self.pid(), dest, payload);
+ }
+
+ /**
+ * Send an RPC request to the remote Erlang node. This convenience function
+ * creates the following message and sends it to 'rex' on the remote node:
+ *
+ * <pre>
+ * { self, { call, Mod, Fun, Args, user } }
+ * </pre>
+ *
+ * <p>
+ * Note that this method has unpredicatble results if the remote node is not
+ * an Erlang node.
+ * </p>
+ *
+ * @param mod
+ * the name of the Erlang module containing the function to
+ * be called.
+ * @param fun
+ * the name of the function to call.
+ * @param args
+ * an array of Erlang terms, to be used as arguments to the
+ * function.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void sendRPC(final String mod, final String fun,
+ final OtpErlangObject[] args) throws IOException {
+ sendRPC(mod, fun, new OtpErlangList(args));
+ }
+
+ /**
+ * Send an RPC request to the remote Erlang node. This convenience function
+ * creates the following message and sends it to 'rex' on the remote node:
+ *
+ * <pre>
+ * { self, { call, Mod, Fun, Args, user } }
+ * </pre>
+ *
+ * <p>
+ * Note that this method has unpredicatble results if the remote node is not
+ * an Erlang node.
+ * </p>
+ *
+ * @param mod
+ * the name of the Erlang module containing the function to
+ * be called.
+ * @param fun
+ * the name of the function to call.
+ * @param args
+ * a list of Erlang terms, to be used as arguments to the
+ * function.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void sendRPC(final String mod, final String fun,
+ final OtpErlangList args) throws IOException {
+ final OtpErlangObject[] rpc = new OtpErlangObject[2];
+ final OtpErlangObject[] call = new OtpErlangObject[5];
+
+ /* {self, { call, Mod, Fun, Args, user}} */
+
+ call[0] = new OtpErlangAtom("call");
+ call[1] = new OtpErlangAtom(mod);
+ call[2] = new OtpErlangAtom(fun);
+ call[3] = args;
+ call[4] = new OtpErlangAtom("user");
+
+ rpc[0] = self.pid();
+ rpc[1] = new OtpErlangTuple(call);
+
+ send("rex", new OtpErlangTuple(rpc));
+ }
+
+ /**
+ * Receive an RPC reply from the remote Erlang node. This convenience
+ * function receives a message from the remote node, and expects it to have
+ * the following format:
+ *
+ * <pre>
+ * { rex, Term }
+ * </pre>
+ *
+ * @return the second element of the tuple if the received message is a
+ * two-tuple, otherwise null. No further error checking is
+ * performed.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ *
+ * @exception OtpErlangExit
+ * if an exit signal is received from a process on the
+ * peer node.
+ *
+ * @exception OtpAuthException
+ * if the remote node sends a message containing an
+ * invalid cookie.
+ */
+ public OtpErlangObject receiveRPC() throws IOException, OtpErlangExit,
+ OtpAuthException {
+
+ final OtpErlangObject msg = receive();
+
+ if (msg instanceof OtpErlangTuple) {
+ final OtpErlangTuple t = (OtpErlangTuple) msg;
+ if (t.arity() == 2) {
+ return t.elementAt(1); // obs: second element
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Create a link between the local node and the specified process on the
+ * remote node. If the link is still active when the remote process
+ * terminates, an exit signal will be sent to this connection. Use
+ * {@link #unlink unlink()} to remove the link.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void link(final OtpErlangPid dest) throws IOException {
+ super.sendLink(self.pid(), dest);
+ }
+
+ /**
+ * Remove a link between the local node and the specified process on the
+ * remote node. This method deactivates links created with
+ * {@link #link link()}.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void unlink(final OtpErlangPid dest) throws IOException {
+ super.sendUnlink(self.pid(), dest);
+ }
+
+ /**
+ * Send an exit signal to a remote process.
+ *
+ * @param dest
+ * the Erlang PID of the remote process.
+ * @param reason
+ * an Erlang term describing the exit reason.
+ *
+ * @exception java.io.IOException
+ * if the connection is not active or a communication
+ * error occurs.
+ */
+ public void exit(final OtpErlangPid dest, final OtpErlangObject reason)
+ throws IOException {
+ super.sendExit2(self.pid(), dest, reason);
+ }
+}