aboutsummaryrefslogtreecommitdiffstats
path: root/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java807
1 files changed, 807 insertions, 0 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
new file mode 100644
index 0000000000..d499fae3fb
--- /dev/null
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
@@ -0,0 +1,807 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2000-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+package com.ericsson.otp.erlang;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+/**
+ * <p>
+ * Represents a local OTP node. This class is used when you do not wish to
+ * manage connections yourself - outgoing connections are established as needed,
+ * and incoming connections accepted automatically. This class supports the use
+ * of a mailbox API for communication, while management of the underlying
+ * communication mechanism is automatic and hidden from the application
+ * programmer.
+ * </p>
+ *
+ * <p>
+ * Once an instance of this class has been created, obtain one or more mailboxes
+ * in order to send or receive messages. The first message sent to a given node
+ * will cause a connection to be set up to that node. Any messages received will
+ * be delivered to the appropriate mailboxes.
+ * </p>
+ *
+ * <p>
+ * To shut down the node, call {@link #close close()}. This will prevent the
+ * node from accepting additional connections and it will cause all existing
+ * connections to be closed. Any unread messages in existing mailboxes can still
+ * be read, however no new messages will be delivered to the mailboxes.
+ * </p>
+ *
+ * <p>
+ * Note that the use of this class requires that Epmd (Erlang Port Mapper
+ * Daemon) is running on each cooperating host. This class does not start Epmd
+ * automatically as Erlang does, you must start it manually or through some
+ * other means. See the Erlang documentation for more information about this.
+ * </p>
+ */
+public class OtpNode extends OtpLocalNode {
+ private boolean initDone = false;
+
+ // thread to manage incoming connections
+ private Acceptor acceptor = null;
+
+ // keep track of all connections
+ Hashtable<String, OtpCookedConnection> connections = null;
+
+ // keep track of all mailboxes
+ Mailboxes mboxes = null;
+
+ // handle status changes
+ OtpNodeStatus handler;
+
+ // flags
+ private int flags = 0;
+
+ /**
+ * <p>
+ * Create a node using the default cookie. The default cookie is found by
+ * reading the first line of the .erlang.cookie file in the user's home
+ * directory. The home directory is obtained from the System property
+ * "user.home".
+ * </p>
+ *
+ * <p>
+ * If the file does not exist, an empty string is used. This method makes no
+ * attempt to create the file.
+ * </p>
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node) throws IOException {
+ this(node, defaultCookie, 0);
+ }
+
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie) throws IOException {
+ this(node, cookie, 0);
+ }
+
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param port
+ * the port number you wish to use for incoming connections.
+ * Specifying 0 lets the system choose an available port.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie, final int port)
+ throws IOException {
+ super(node, cookie);
+
+ init(port);
+ }
+
+ private synchronized void init(final int port) throws IOException {
+ if (!initDone) {
+ connections = new Hashtable<String, OtpCookedConnection>(17,
+ (float) 0.95);
+ mboxes = new Mailboxes();
+ acceptor = new Acceptor(port);
+ initDone = true;
+ }
+ }
+
+ /**
+ * Close the node. Unpublish the node from Epmd (preventing new connections)
+ * and close all existing connections.
+ */
+ public synchronized void close() {
+ acceptor.quit();
+ OtpCookedConnection conn;
+ final Collection<OtpCookedConnection> coll = connections.values();
+ final Iterator<OtpCookedConnection> it = coll.iterator();
+
+ mboxes.clear();
+
+ while (it.hasNext()) {
+ conn = it.next();
+ it.remove();
+ conn.close();
+ }
+ initDone = false;
+ }
+
+ @Override
+ protected void finalize() {
+ close();
+ }
+
+ /**
+ * Create an unnamed {@link OtpMbox mailbox} that can be used to send and
+ * receive messages with other, similar mailboxes and with Erlang processes.
+ * Messages can be sent to this mailbox by using its associated
+ * {@link OtpMbox#self pid}.
+ *
+ * @return a mailbox.
+ */
+ public OtpMbox createMbox() {
+ return mboxes.create();
+ }
+
+ /**
+ * Close the specified mailbox with reason 'normal'.
+ *
+ * @param mbox
+ * the mailbox to close.
+ *
+ * <p>
+ * After this operation, the mailbox will no longer be able to
+ * receive messages. Any delivered but as yet unretrieved
+ * messages can still be retrieved however.
+ * </p>
+ *
+ * <p>
+ * If there are links from the mailbox to other
+ * {@link OtpErlangPid pids}, they will be broken when this
+ * method is called and exit signals with reason 'normal' will be
+ * sent.
+ * </p>
+ *
+ */
+ public void closeMbox(final OtpMbox mbox) {
+ closeMbox(mbox, new OtpErlangAtom("normal"));
+ }
+
+ /**
+ * Close the specified mailbox with the given reason.
+ *
+ * @param mbox
+ * the mailbox to close.
+ * @param reason
+ * an Erlang term describing the reason for the termination.
+ *
+ * <p>
+ * After this operation, the mailbox will no longer be able to
+ * receive messages. Any delivered but as yet unretrieved
+ * messages can still be retrieved however.
+ * </p>
+ *
+ * <p>
+ * If there are links from the mailbox to other
+ * {@link OtpErlangPid pids}, they will be broken when this
+ * method is called and exit signals with the given reason will
+ * be sent.
+ * </p>
+ *
+ */
+ public void closeMbox(final OtpMbox mbox, final OtpErlangObject reason) {
+ if (mbox != null) {
+ mboxes.remove(mbox);
+ mbox.name = null;
+ mbox.breakLinks(reason);
+ }
+ }
+
+ /**
+ * Create an named mailbox that can be used to send and receive messages
+ * with other, similar mailboxes and with Erlang processes. Messages can be
+ * sent to this mailbox by using its registered name or the associated
+ * {@link OtpMbox#self pid}.
+ *
+ * @param name
+ * a name to register for this mailbox. The name must be unique
+ * within this OtpNode.
+ *
+ * @return a mailbox, or null if the name was already in use.
+ *
+ */
+ public OtpMbox createMbox(final String name) {
+ return mboxes.create(name);
+ }
+
+ /**
+ * <p>
+ * Register or remove a name for the given mailbox. Registering a name for a
+ * mailbox enables others to send messages without knowing the
+ * {@link OtpErlangPid pid} of the mailbox. A mailbox can have at most one
+ * name; if the mailbox already had a name, calling this method will
+ * supercede that name.
+ * </p>
+ *
+ * @param name
+ * the name to register for the mailbox. Specify null to
+ * unregister the existing name from this mailbox.
+ *
+ * @param mbox
+ * the mailbox to associate with the name.
+ *
+ * @return true if the name was available, or false otherwise.
+ */
+ public boolean registerName(final String name, final OtpMbox mbox) {
+ return mboxes.register(name, mbox);
+ }
+
+ /**
+ * Get a list of all known registered names on this node.
+ *
+ * @return an array of Strings, containins all known registered names on
+ * this node.
+ */
+
+ public String[] getNames() {
+ return mboxes.names();
+ }
+
+ /**
+ * Determine the {@link OtpErlangPid pid} corresponding to a registered name
+ * on this node.
+ *
+ * @return the {@link OtpErlangPid pid} corresponding to the registered
+ * name, or null if the name is not known on this node.
+ */
+ public OtpErlangPid whereis(final String name) {
+ final OtpMbox m = mboxes.get(name);
+ if (m != null) {
+ return m.self();
+ }
+ return null;
+ }
+
+ /**
+ * Register interest in certain system events. The {@link OtpNodeStatus
+ * OtpNodeStatus} handler object contains callback methods, that will be
+ * called when certain events occur.
+ *
+ * @param handler
+ * the callback object to register. To clear the handler, specify
+ * null as the handler to use.
+ *
+ */
+ public synchronized void registerStatusHandler(final OtpNodeStatus handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * <p>
+ * Determine if another node is alive. This method has the side effect of
+ * setting up a connection to the remote node (if possible). Only a single
+ * outgoing message is sent; the timeout is how long to wait for a response.
+ * </p>
+ *
+ * <p>
+ * Only a single attempt is made to connect to the remote node, so for
+ * example it is not possible to specify an extremely long timeout and
+ * expect to be notified when the node eventually comes up. If you wish to
+ * wait for a remote node to be started, the following construction may be
+ * useful:
+ * </p>
+ *
+ * <pre>
+ * // ping every 2 seconds until positive response
+ * while (!me.ping(him, 2000))
+ * ;
+ * </pre>
+ *
+ * @param node
+ * the name of the node to ping.
+ *
+ * @param timeout
+ * the time, in milliseconds, to wait for response before
+ * returning false.
+ *
+ * @return true if the node was alive and the correct ping response was
+ * returned. false if the correct response was not returned on time.
+ */
+ /*
+ * internal info about the message formats...
+ *
+ * the request: -> REG_SEND {6,#Pid<[email protected]>,'',net_kernel}
+ * {'$gen_call',{#Pid<[email protected]>,#Ref<[email protected]>},{is_auth,bingo@aule}}
+ *
+ * the reply: <- SEND {2,'',#Pid<[email protected]>} {#Ref<[email protected]>,yes}
+ */
+ public boolean ping(final String node, final long timeout) {
+ if (node.equals(this.node)) {
+ return true;
+ } else if (node.indexOf('@', 0) < 0
+ && node.equals(this.node
+ .substring(0, this.node.indexOf('@', 0)))) {
+ return true;
+ }
+
+ // other node
+ OtpMbox mbox = null;
+ try {
+ mbox = createMbox();
+ mbox.send("net_kernel", node, getPingTuple(mbox));
+ final OtpErlangObject reply = mbox.receive(timeout);
+
+ final OtpErlangTuple t = (OtpErlangTuple) reply;
+ final OtpErlangAtom a = (OtpErlangAtom) t.elementAt(1);
+ return "yes".equals(a.atomValue());
+ } catch (final Exception e) {
+ } finally {
+ closeMbox(mbox);
+ }
+ return false;
+ }
+
+ /* create the outgoing ping message */
+ private OtpErlangTuple getPingTuple(final OtpMbox mbox) {
+ final OtpErlangObject[] ping = new OtpErlangObject[3];
+ final OtpErlangObject[] pid = new OtpErlangObject[2];
+ final OtpErlangObject[] node = new OtpErlangObject[2];
+
+ pid[0] = mbox.self();
+ pid[1] = createRef();
+
+ node[0] = new OtpErlangAtom("is_auth");
+ node[1] = new OtpErlangAtom(node());
+
+ ping[0] = new OtpErlangAtom("$gen_call");
+ ping[1] = new OtpErlangTuple(pid);
+ ping[2] = new OtpErlangTuple(node);
+
+ return new OtpErlangTuple(ping);
+ }
+
+ /*
+ * this method simulates net_kernel only for the purpose of replying to
+ * pings.
+ */
+ private boolean netKernel(final OtpMsg m) {
+ OtpMbox mbox = null;
+ try {
+ final OtpErlangTuple t = (OtpErlangTuple) m.getMsg();
+ final OtpErlangTuple req = (OtpErlangTuple) t.elementAt(1); // actual
+ // request
+
+ final OtpErlangPid pid = (OtpErlangPid) req.elementAt(0); // originating
+ // pid
+
+ final OtpErlangObject[] pong = new OtpErlangObject[2];
+ pong[0] = req.elementAt(1); // his #Ref
+ pong[1] = new OtpErlangAtom("yes");
+
+ mbox = createMbox();
+ mbox.send(pid, new OtpErlangTuple(pong));
+ return true;
+ } catch (final Exception e) {
+ } finally {
+ closeMbox(mbox);
+ }
+ return false;
+ }
+
+ /*
+ * OtpCookedConnection delivers messages here return true if message was
+ * delivered successfully, or false otherwise.
+ */
+ boolean deliver(final OtpMsg m) {
+ OtpMbox mbox = null;
+
+ try {
+ final int t = m.type();
+
+ if (t == OtpMsg.regSendTag) {
+ final String name = m.getRecipientName();
+ /* special case for netKernel requests */
+ if (name.equals("net_kernel")) {
+ return netKernel(m);
+ } else {
+ mbox = mboxes.get(name);
+ }
+ } else {
+ mbox = mboxes.get(m.getRecipientPid());
+ }
+
+ if (mbox == null) {
+ return false;
+ }
+ mbox.deliver(m);
+ } catch (final Exception e) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /*
+ * OtpCookedConnection delivers errors here, we send them on to the handler
+ * specified by the application
+ */
+ void deliverError(final OtpCookedConnection conn, final Exception e) {
+ removeConnection(conn);
+ remoteStatus(conn.name, false, e);
+ }
+
+ /*
+ * find or create a connection to the given node
+ */
+ OtpCookedConnection getConnection(final String node) {
+ OtpPeer peer = null;
+ OtpCookedConnection conn = null;
+
+ synchronized (connections) {
+ // first just try looking up the name as-is
+ conn = connections.get(node);
+
+ if (conn == null) {
+ // in case node had no '@' add localhost info and try again
+ peer = new OtpPeer(node);
+ conn = connections.get(peer.node());
+
+ if (conn == null) {
+ try {
+ conn = new OtpCookedConnection(this, peer);
+ conn.setFlags(flags);
+ addConnection(conn);
+ } catch (final Exception e) {
+ /* false = outgoing */
+ connAttempt(peer.node(), false, e);
+ }
+ }
+ }
+ return conn;
+ }
+ }
+
+ void addConnection(final OtpCookedConnection conn) {
+ if (conn != null && conn.name != null) {
+ connections.put(conn.name, conn);
+ remoteStatus(conn.name, true, null);
+ }
+ }
+
+ private void removeConnection(final OtpCookedConnection conn) {
+ if (conn != null && conn.name != null) {
+ connections.remove(conn.name);
+ }
+ }
+
+ /* use these wrappers to call handler functions */
+ private synchronized void remoteStatus(final String node, final boolean up,
+ final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.remoteStatus(node, up, info);
+ } catch (final Exception e) {
+ }
+ }
+
+ synchronized void localStatus(final String node, final boolean up,
+ final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.localStatus(node, up, info);
+ } catch (final Exception e) {
+ }
+ }
+
+ synchronized void connAttempt(final String node, final boolean incoming,
+ final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.connAttempt(node, incoming, info);
+ } catch (final Exception e) {
+ }
+ }
+
+ /*
+ * this class used to wrap the mailbox hashtables so we can use weak
+ * references
+ */
+ public class Mailboxes {
+ // mbox pids here
+ private Hashtable<OtpErlangPid, WeakReference<OtpMbox>> byPid = null;
+ // mbox names here
+ private Hashtable<String, WeakReference<OtpMbox>> byName = null;
+
+ public Mailboxes() {
+ byPid = new Hashtable<OtpErlangPid, WeakReference<OtpMbox>>(17,
+ (float) 0.95);
+ byName = new Hashtable<String, WeakReference<OtpMbox>>(17,
+ (float) 0.95);
+ }
+
+ public OtpMbox create(final String name) {
+ OtpMbox m = null;
+
+ synchronized (byName) {
+ if (get(name) != null) {
+ return null;
+ }
+ final OtpErlangPid pid = createPid();
+ m = new OtpMbox(OtpNode.this, pid, name);
+ byPid.put(pid, new WeakReference<OtpMbox>(m));
+ byName.put(name, new WeakReference<OtpMbox>(m));
+ }
+ return m;
+ }
+
+ public OtpMbox create() {
+ final OtpErlangPid pid = createPid();
+ final OtpMbox m = new OtpMbox(OtpNode.this, pid);
+ byPid.put(pid, new WeakReference<OtpMbox>(m));
+ return m;
+ }
+
+ public void clear() {
+ byPid.clear();
+ byName.clear();
+ }
+
+ public String[] names() {
+ String allnames[] = null;
+
+ synchronized (byName) {
+ final int n = byName.size();
+ final Enumeration<String> keys = byName.keys();
+ allnames = new String[n];
+
+ int i = 0;
+ while (keys.hasMoreElements()) {
+ allnames[i++] = keys.nextElement();
+ }
+ }
+ return allnames;
+ }
+
+ public boolean register(final String name, final OtpMbox mbox) {
+ if (name == null) {
+ if (mbox.name != null) {
+ byName.remove(mbox.name);
+ mbox.name = null;
+ }
+ } else {
+ synchronized (byName) {
+ if (get(name) != null) {
+ return false;
+ }
+ byName.put(name, new WeakReference<OtpMbox>(mbox));
+ mbox.name = name;
+ }
+ }
+ return true;
+ }
+
+ /*
+ * look up a mailbox based on its name. If the mailbox has gone out of
+ * scope we also remove the reference from the hashtable so we don't
+ * find it again.
+ */
+ public OtpMbox get(final String name) {
+ final WeakReference<OtpMbox> wr = byName.get(name);
+
+ if (wr != null) {
+ final OtpMbox m = wr.get();
+
+ if (m != null) {
+ return m;
+ }
+ byName.remove(name);
+ }
+ return null;
+ }
+
+ /*
+ * look up a mailbox based on its pid. If the mailbox has gone out of
+ * scope we also remove the reference from the hashtable so we don't
+ * find it again.
+ */
+ public OtpMbox get(final OtpErlangPid pid) {
+ final WeakReference<OtpMbox> wr = byPid.get(pid);
+
+ if (wr != null) {
+ final OtpMbox m = wr.get();
+
+ if (m != null) {
+ return m;
+ }
+ byPid.remove(pid);
+ }
+ return null;
+ }
+
+ public void remove(final OtpMbox mbox) {
+ byPid.remove(mbox.self);
+ if (mbox.name != null) {
+ byName.remove(mbox.name);
+ }
+ }
+ }
+
+ /*
+ * this thread simply listens for incoming connections
+ */
+ public class Acceptor extends Thread {
+ private final ServerSocket sock;
+ private final int port;
+ private volatile boolean done = false;
+
+ Acceptor(final int port) throws IOException {
+ sock = new ServerSocket(port);
+ this.port = sock.getLocalPort();
+ OtpNode.this.port = this.port;
+
+ setDaemon(true);
+ setName("acceptor");
+ publishPort();
+ start();
+ }
+
+ private boolean publishPort() throws IOException {
+ if (getEpmd() != null) {
+ return false; // already published
+ }
+ OtpEpmd.publishPort(OtpNode.this);
+ return true;
+ }
+
+ private void unPublishPort() {
+ // unregister with epmd
+ OtpEpmd.unPublishPort(OtpNode.this);
+
+ // close the local descriptor (if we have one)
+ closeSock(epmd);
+ epmd = null;
+ }
+
+ public void quit() {
+ unPublishPort();
+ done = true;
+ closeSock(sock);
+ localStatus(node, false, null);
+ }
+
+ private void closeSock(final ServerSocket s) {
+ try {
+ if (s != null) {
+ s.close();
+ }
+ } catch (final Exception e) {
+ }
+ }
+
+ private void closeSock(final Socket s) {
+ try {
+ if (s != null) {
+ s.close();
+ }
+ } catch (final Exception e) {
+ }
+ }
+
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public void run() {
+ Socket newsock = null;
+ OtpCookedConnection conn = null;
+
+ localStatus(node, true, null);
+
+ accept_loop: while (!done) {
+ conn = null;
+
+ try {
+ newsock = sock.accept();
+ } catch (final Exception e) {
+ // Problem in java1.2.2: accept throws SocketException
+ // when socket is closed. This will happen when
+ // acceptor.quit()
+ // is called. acceptor.quit() will call localStatus(...), so
+ // we have to check if that's where we come from.
+ if (!done) {
+ localStatus(node, false, e);
+ }
+ break accept_loop;
+ }
+
+ try {
+ synchronized (connections) {
+ conn = new OtpCookedConnection(OtpNode.this, newsock);
+ conn.setFlags(flags);
+ addConnection(conn);
+ }
+ } catch (final OtpAuthException e) {
+ if (conn != null && conn.name != null) {
+ connAttempt(conn.name, true, e);
+ } else {
+ connAttempt("unknown", true, e);
+ }
+ closeSock(newsock);
+ } catch (final IOException e) {
+ if (conn != null && conn.name != null) {
+ connAttempt(conn.name, true, e);
+ } else {
+ connAttempt("unknown", true, e);
+ }
+ closeSock(newsock);
+ } catch (final Exception e) {
+ closeSock(newsock);
+ closeSock(sock);
+ localStatus(node, false, e);
+ break accept_loop;
+ }
+ } // while
+
+ // if we have exited loop we must do this too
+ unPublishPort();
+ }
+ }
+
+ public void setFlags(final int flags) {
+ this.flags = flags;
+ }
+}