From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- .../java_src/com/ericsson/otp/erlang/OtpNode.java | 807 +++++++++++++++++++++ 1 file changed, 807 insertions(+) create mode 100644 lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java') diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java new file mode 100644 index 0000000000..d499fae3fb --- /dev/null +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java @@ -0,0 +1,807 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2000-2009. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ +package com.ericsson.otp.erlang; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Collection; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Iterator; + +/** + *

+ * Represents a local OTP node. This class is used when you do not wish to + * manage connections yourself - outgoing connections are established as needed, + * and incoming connections accepted automatically. This class supports the use + * of a mailbox API for communication, while management of the underlying + * communication mechanism is automatic and hidden from the application + * programmer. + *

+ * + *

+ * Once an instance of this class has been created, obtain one or more mailboxes + * in order to send or receive messages. The first message sent to a given node + * will cause a connection to be set up to that node. Any messages received will + * be delivered to the appropriate mailboxes. + *

+ * + *

+ * To shut down the node, call {@link #close close()}. This will prevent the + * node from accepting additional connections and it will cause all existing + * connections to be closed. Any unread messages in existing mailboxes can still + * be read, however no new messages will be delivered to the mailboxes. + *

+ * + *

+ * Note that the use of this class requires that Epmd (Erlang Port Mapper + * Daemon) is running on each cooperating host. This class does not start Epmd + * automatically as Erlang does, you must start it manually or through some + * other means. See the Erlang documentation for more information about this. + *

+ */ +public class OtpNode extends OtpLocalNode { + private boolean initDone = false; + + // thread to manage incoming connections + private Acceptor acceptor = null; + + // keep track of all connections + Hashtable connections = null; + + // keep track of all mailboxes + Mailboxes mboxes = null; + + // handle status changes + OtpNodeStatus handler; + + // flags + private int flags = 0; + + /** + *

+ * Create a node using the default cookie. The default cookie is found by + * reading the first line of the .erlang.cookie file in the user's home + * directory. The home directory is obtained from the System property + * "user.home". + *

+ * + *

+ * If the file does not exist, an empty string is used. This method makes no + * attempt to create the file. + *

+ * + * @param node + * the name of this node. + * + * @exception IOException + * if communication could not be initialized. + * + */ + public OtpNode(final String node) throws IOException { + this(node, defaultCookie, 0); + } + + /** + * Create a node. + * + * @param node + * the name of this node. + * + * @param cookie + * the authorization cookie that will be used by this node when + * it communicates with other nodes. + * + * @exception IOException + * if communication could not be initialized. + * + */ + public OtpNode(final String node, final String cookie) throws IOException { + this(node, cookie, 0); + } + + /** + * Create a node. + * + * @param node + * the name of this node. + * + * @param cookie + * the authorization cookie that will be used by this node when + * it communicates with other nodes. + * + * @param port + * the port number you wish to use for incoming connections. + * Specifying 0 lets the system choose an available port. + * + * @exception IOException + * if communication could not be initialized. + * + */ + public OtpNode(final String node, final String cookie, final int port) + throws IOException { + super(node, cookie); + + init(port); + } + + private synchronized void init(final int port) throws IOException { + if (!initDone) { + connections = new Hashtable(17, + (float) 0.95); + mboxes = new Mailboxes(); + acceptor = new Acceptor(port); + initDone = true; + } + } + + /** + * Close the node. Unpublish the node from Epmd (preventing new connections) + * and close all existing connections. + */ + public synchronized void close() { + acceptor.quit(); + OtpCookedConnection conn; + final Collection coll = connections.values(); + final Iterator it = coll.iterator(); + + mboxes.clear(); + + while (it.hasNext()) { + conn = it.next(); + it.remove(); + conn.close(); + } + initDone = false; + } + + @Override + protected void finalize() { + close(); + } + + /** + * Create an unnamed {@link OtpMbox mailbox} that can be used to send and + * receive messages with other, similar mailboxes and with Erlang processes. + * Messages can be sent to this mailbox by using its associated + * {@link OtpMbox#self pid}. + * + * @return a mailbox. + */ + public OtpMbox createMbox() { + return mboxes.create(); + } + + /** + * Close the specified mailbox with reason 'normal'. + * + * @param mbox + * the mailbox to close. + * + *

+ * After this operation, the mailbox will no longer be able to + * receive messages. Any delivered but as yet unretrieved + * messages can still be retrieved however. + *

+ * + *

+ * If there are links from the mailbox to other + * {@link OtpErlangPid pids}, they will be broken when this + * method is called and exit signals with reason 'normal' will be + * sent. + *

+ * + */ + public void closeMbox(final OtpMbox mbox) { + closeMbox(mbox, new OtpErlangAtom("normal")); + } + + /** + * Close the specified mailbox with the given reason. + * + * @param mbox + * the mailbox to close. + * @param reason + * an Erlang term describing the reason for the termination. + * + *

+ * After this operation, the mailbox will no longer be able to + * receive messages. Any delivered but as yet unretrieved + * messages can still be retrieved however. + *

+ * + *

+ * If there are links from the mailbox to other + * {@link OtpErlangPid pids}, they will be broken when this + * method is called and exit signals with the given reason will + * be sent. + *

+ * + */ + public void closeMbox(final OtpMbox mbox, final OtpErlangObject reason) { + if (mbox != null) { + mboxes.remove(mbox); + mbox.name = null; + mbox.breakLinks(reason); + } + } + + /** + * Create an named mailbox that can be used to send and receive messages + * with other, similar mailboxes and with Erlang processes. Messages can be + * sent to this mailbox by using its registered name or the associated + * {@link OtpMbox#self pid}. + * + * @param name + * a name to register for this mailbox. The name must be unique + * within this OtpNode. + * + * @return a mailbox, or null if the name was already in use. + * + */ + public OtpMbox createMbox(final String name) { + return mboxes.create(name); + } + + /** + *

+ * Register or remove a name for the given mailbox. Registering a name for a + * mailbox enables others to send messages without knowing the + * {@link OtpErlangPid pid} of the mailbox. A mailbox can have at most one + * name; if the mailbox already had a name, calling this method will + * supercede that name. + *

+ * + * @param name + * the name to register for the mailbox. Specify null to + * unregister the existing name from this mailbox. + * + * @param mbox + * the mailbox to associate with the name. + * + * @return true if the name was available, or false otherwise. + */ + public boolean registerName(final String name, final OtpMbox mbox) { + return mboxes.register(name, mbox); + } + + /** + * Get a list of all known registered names on this node. + * + * @return an array of Strings, containins all known registered names on + * this node. + */ + + public String[] getNames() { + return mboxes.names(); + } + + /** + * Determine the {@link OtpErlangPid pid} corresponding to a registered name + * on this node. + * + * @return the {@link OtpErlangPid pid} corresponding to the registered + * name, or null if the name is not known on this node. + */ + public OtpErlangPid whereis(final String name) { + final OtpMbox m = mboxes.get(name); + if (m != null) { + return m.self(); + } + return null; + } + + /** + * Register interest in certain system events. The {@link OtpNodeStatus + * OtpNodeStatus} handler object contains callback methods, that will be + * called when certain events occur. + * + * @param handler + * the callback object to register. To clear the handler, specify + * null as the handler to use. + * + */ + public synchronized void registerStatusHandler(final OtpNodeStatus handler) { + this.handler = handler; + } + + /** + *

+ * Determine if another node is alive. This method has the side effect of + * setting up a connection to the remote node (if possible). Only a single + * outgoing message is sent; the timeout is how long to wait for a response. + *

+ * + *

+ * Only a single attempt is made to connect to the remote node, so for + * example it is not possible to specify an extremely long timeout and + * expect to be notified when the node eventually comes up. If you wish to + * wait for a remote node to be started, the following construction may be + * useful: + *

+ * + *
+     * // ping every 2 seconds until positive response
+     * while (!me.ping(him, 2000))
+     *     ;
+     * 
+ * + * @param node + * the name of the node to ping. + * + * @param timeout + * the time, in milliseconds, to wait for response before + * returning false. + * + * @return true if the node was alive and the correct ping response was + * returned. false if the correct response was not returned on time. + */ + /* + * internal info about the message formats... + * + * the request: -> REG_SEND {6,#Pid,'',net_kernel} + * {'$gen_call',{#Pid,#Ref},{is_auth,bingo@aule}} + * + * the reply: <- SEND {2,'',#Pid} {#Ref,yes} + */ + public boolean ping(final String node, final long timeout) { + if (node.equals(this.node)) { + return true; + } else if (node.indexOf('@', 0) < 0 + && node.equals(this.node + .substring(0, this.node.indexOf('@', 0)))) { + return true; + } + + // other node + OtpMbox mbox = null; + try { + mbox = createMbox(); + mbox.send("net_kernel", node, getPingTuple(mbox)); + final OtpErlangObject reply = mbox.receive(timeout); + + final OtpErlangTuple t = (OtpErlangTuple) reply; + final OtpErlangAtom a = (OtpErlangAtom) t.elementAt(1); + return "yes".equals(a.atomValue()); + } catch (final Exception e) { + } finally { + closeMbox(mbox); + } + return false; + } + + /* create the outgoing ping message */ + private OtpErlangTuple getPingTuple(final OtpMbox mbox) { + final OtpErlangObject[] ping = new OtpErlangObject[3]; + final OtpErlangObject[] pid = new OtpErlangObject[2]; + final OtpErlangObject[] node = new OtpErlangObject[2]; + + pid[0] = mbox.self(); + pid[1] = createRef(); + + node[0] = new OtpErlangAtom("is_auth"); + node[1] = new OtpErlangAtom(node()); + + ping[0] = new OtpErlangAtom("$gen_call"); + ping[1] = new OtpErlangTuple(pid); + ping[2] = new OtpErlangTuple(node); + + return new OtpErlangTuple(ping); + } + + /* + * this method simulates net_kernel only for the purpose of replying to + * pings. + */ + private boolean netKernel(final OtpMsg m) { + OtpMbox mbox = null; + try { + final OtpErlangTuple t = (OtpErlangTuple) m.getMsg(); + final OtpErlangTuple req = (OtpErlangTuple) t.elementAt(1); // actual + // request + + final OtpErlangPid pid = (OtpErlangPid) req.elementAt(0); // originating + // pid + + final OtpErlangObject[] pong = new OtpErlangObject[2]; + pong[0] = req.elementAt(1); // his #Ref + pong[1] = new OtpErlangAtom("yes"); + + mbox = createMbox(); + mbox.send(pid, new OtpErlangTuple(pong)); + return true; + } catch (final Exception e) { + } finally { + closeMbox(mbox); + } + return false; + } + + /* + * OtpCookedConnection delivers messages here return true if message was + * delivered successfully, or false otherwise. + */ + boolean deliver(final OtpMsg m) { + OtpMbox mbox = null; + + try { + final int t = m.type(); + + if (t == OtpMsg.regSendTag) { + final String name = m.getRecipientName(); + /* special case for netKernel requests */ + if (name.equals("net_kernel")) { + return netKernel(m); + } else { + mbox = mboxes.get(name); + } + } else { + mbox = mboxes.get(m.getRecipientPid()); + } + + if (mbox == null) { + return false; + } + mbox.deliver(m); + } catch (final Exception e) { + return false; + } + + return true; + } + + /* + * OtpCookedConnection delivers errors here, we send them on to the handler + * specified by the application + */ + void deliverError(final OtpCookedConnection conn, final Exception e) { + removeConnection(conn); + remoteStatus(conn.name, false, e); + } + + /* + * find or create a connection to the given node + */ + OtpCookedConnection getConnection(final String node) { + OtpPeer peer = null; + OtpCookedConnection conn = null; + + synchronized (connections) { + // first just try looking up the name as-is + conn = connections.get(node); + + if (conn == null) { + // in case node had no '@' add localhost info and try again + peer = new OtpPeer(node); + conn = connections.get(peer.node()); + + if (conn == null) { + try { + conn = new OtpCookedConnection(this, peer); + conn.setFlags(flags); + addConnection(conn); + } catch (final Exception e) { + /* false = outgoing */ + connAttempt(peer.node(), false, e); + } + } + } + return conn; + } + } + + void addConnection(final OtpCookedConnection conn) { + if (conn != null && conn.name != null) { + connections.put(conn.name, conn); + remoteStatus(conn.name, true, null); + } + } + + private void removeConnection(final OtpCookedConnection conn) { + if (conn != null && conn.name != null) { + connections.remove(conn.name); + } + } + + /* use these wrappers to call handler functions */ + private synchronized void remoteStatus(final String node, final boolean up, + final Object info) { + if (handler == null) { + return; + } + try { + handler.remoteStatus(node, up, info); + } catch (final Exception e) { + } + } + + synchronized void localStatus(final String node, final boolean up, + final Object info) { + if (handler == null) { + return; + } + try { + handler.localStatus(node, up, info); + } catch (final Exception e) { + } + } + + synchronized void connAttempt(final String node, final boolean incoming, + final Object info) { + if (handler == null) { + return; + } + try { + handler.connAttempt(node, incoming, info); + } catch (final Exception e) { + } + } + + /* + * this class used to wrap the mailbox hashtables so we can use weak + * references + */ + public class Mailboxes { + // mbox pids here + private Hashtable> byPid = null; + // mbox names here + private Hashtable> byName = null; + + public Mailboxes() { + byPid = new Hashtable>(17, + (float) 0.95); + byName = new Hashtable>(17, + (float) 0.95); + } + + public OtpMbox create(final String name) { + OtpMbox m = null; + + synchronized (byName) { + if (get(name) != null) { + return null; + } + final OtpErlangPid pid = createPid(); + m = new OtpMbox(OtpNode.this, pid, name); + byPid.put(pid, new WeakReference(m)); + byName.put(name, new WeakReference(m)); + } + return m; + } + + public OtpMbox create() { + final OtpErlangPid pid = createPid(); + final OtpMbox m = new OtpMbox(OtpNode.this, pid); + byPid.put(pid, new WeakReference(m)); + return m; + } + + public void clear() { + byPid.clear(); + byName.clear(); + } + + public String[] names() { + String allnames[] = null; + + synchronized (byName) { + final int n = byName.size(); + final Enumeration keys = byName.keys(); + allnames = new String[n]; + + int i = 0; + while (keys.hasMoreElements()) { + allnames[i++] = keys.nextElement(); + } + } + return allnames; + } + + public boolean register(final String name, final OtpMbox mbox) { + if (name == null) { + if (mbox.name != null) { + byName.remove(mbox.name); + mbox.name = null; + } + } else { + synchronized (byName) { + if (get(name) != null) { + return false; + } + byName.put(name, new WeakReference(mbox)); + mbox.name = name; + } + } + return true; + } + + /* + * look up a mailbox based on its name. If the mailbox has gone out of + * scope we also remove the reference from the hashtable so we don't + * find it again. + */ + public OtpMbox get(final String name) { + final WeakReference wr = byName.get(name); + + if (wr != null) { + final OtpMbox m = wr.get(); + + if (m != null) { + return m; + } + byName.remove(name); + } + return null; + } + + /* + * look up a mailbox based on its pid. If the mailbox has gone out of + * scope we also remove the reference from the hashtable so we don't + * find it again. + */ + public OtpMbox get(final OtpErlangPid pid) { + final WeakReference wr = byPid.get(pid); + + if (wr != null) { + final OtpMbox m = wr.get(); + + if (m != null) { + return m; + } + byPid.remove(pid); + } + return null; + } + + public void remove(final OtpMbox mbox) { + byPid.remove(mbox.self); + if (mbox.name != null) { + byName.remove(mbox.name); + } + } + } + + /* + * this thread simply listens for incoming connections + */ + public class Acceptor extends Thread { + private final ServerSocket sock; + private final int port; + private volatile boolean done = false; + + Acceptor(final int port) throws IOException { + sock = new ServerSocket(port); + this.port = sock.getLocalPort(); + OtpNode.this.port = this.port; + + setDaemon(true); + setName("acceptor"); + publishPort(); + start(); + } + + private boolean publishPort() throws IOException { + if (getEpmd() != null) { + return false; // already published + } + OtpEpmd.publishPort(OtpNode.this); + return true; + } + + private void unPublishPort() { + // unregister with epmd + OtpEpmd.unPublishPort(OtpNode.this); + + // close the local descriptor (if we have one) + closeSock(epmd); + epmd = null; + } + + public void quit() { + unPublishPort(); + done = true; + closeSock(sock); + localStatus(node, false, null); + } + + private void closeSock(final ServerSocket s) { + try { + if (s != null) { + s.close(); + } + } catch (final Exception e) { + } + } + + private void closeSock(final Socket s) { + try { + if (s != null) { + s.close(); + } + } catch (final Exception e) { + } + } + + public int port() { + return port; + } + + @Override + public void run() { + Socket newsock = null; + OtpCookedConnection conn = null; + + localStatus(node, true, null); + + accept_loop: while (!done) { + conn = null; + + try { + newsock = sock.accept(); + } catch (final Exception e) { + // Problem in java1.2.2: accept throws SocketException + // when socket is closed. This will happen when + // acceptor.quit() + // is called. acceptor.quit() will call localStatus(...), so + // we have to check if that's where we come from. + if (!done) { + localStatus(node, false, e); + } + break accept_loop; + } + + try { + synchronized (connections) { + conn = new OtpCookedConnection(OtpNode.this, newsock); + conn.setFlags(flags); + addConnection(conn); + } + } catch (final OtpAuthException e) { + if (conn != null && conn.name != null) { + connAttempt(conn.name, true, e); + } else { + connAttempt("unknown", true, e); + } + closeSock(newsock); + } catch (final IOException e) { + if (conn != null && conn.name != null) { + connAttempt(conn.name, true, e); + } else { + connAttempt("unknown", true, e); + } + closeSock(newsock); + } catch (final Exception e) { + closeSock(newsock); + closeSock(sock); + localStatus(node, false, e); + break accept_loop; + } + } // while + + // if we have exited loop we must do this too + unPublishPort(); + } + } + + public void setFlags(final int flags) { + this.flags = flags; + } +} -- cgit v1.2.3