aboutsummaryrefslogtreecommitdiffstats
path: root/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java')
-rw-r--r--lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java1007
1 files changed, 543 insertions, 464 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
index 68addb9f2c..7512d34c21 100644
--- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
+++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpNode.java
@@ -1,27 +1,25 @@
-/*
+/*
* %CopyrightBegin%
- *
+ *
* Copyright Ericsson AB 2000-2012. All Rights Reserved.
- *
+ *
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
- *
+ *
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
- *
+ *
* %CopyrightEnd%
*/
package com.ericsson.otp.erlang;
import java.io.IOException;
import java.lang.ref.WeakReference;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Hashtable;
@@ -36,21 +34,21 @@ import java.util.Iterator;
* communication mechanism is automatic and hidden from the application
* programmer.
* </p>
- *
+ *
* <p>
* Once an instance of this class has been created, obtain one or more mailboxes
* in order to send or receive messages. The first message sent to a given node
* will cause a connection to be set up to that node. Any messages received will
* be delivered to the appropriate mailboxes.
* </p>
- *
+ *
* <p>
* To shut down the node, call {@link #close close()}. This will prevent the
* node from accepting additional connections and it will cause all existing
* connections to be closed. Any unread messages in existing mailboxes can still
* be read, however no new messages will be delivered to the mailboxes.
* </p>
- *
+ *
* <p>
* Note that the use of this class requires that Epmd (Erlang Port Mapper
* Daemon) is running on each cooperating host. This class does not start Epmd
@@ -83,74 +81,156 @@ public class OtpNode extends OtpLocalNode {
* directory. The home directory is obtained from the System property
* "user.home".
* </p>
- *
+ *
* <p>
* If the file does not exist, an empty string is used. This method makes no
* attempt to create the file.
* </p>
- *
+ *
* @param node
* the name of this node.
- *
+ *
* @exception IOException
* if communication could not be initialized.
- *
+ *
*/
public OtpNode(final String node) throws IOException {
- this(node, defaultCookie, 0);
+ super(node);
+
+ init(0);
+ }
+
+ /**
+ * <p>
+ * Create a node using the default cookie. The default cookie is found by
+ * reading the first line of the .erlang.cookie file in the user's home
+ * directory. The home directory is obtained from the System property
+ * "user.home".
+ * </p>
+ *
+ * <p>
+ * If the file does not exist, an empty string is used. This method makes no
+ * attempt to create the file.
+ * </p>
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node,
+ final OtpTransportFactory transportFactory) throws IOException {
+ super(node, transportFactory);
+
+ init(0);
}
/**
* Create a node.
- *
+ *
* @param node
* the name of this node.
- *
+ *
* @param cookie
* the authorization cookie that will be used by this node when
* it communicates with other nodes.
- *
+ *
* @exception IOException
* if communication could not be initialized.
- *
+ *
*/
public OtpNode(final String node, final String cookie) throws IOException {
- this(node, cookie, 0);
+ this(node, cookie, 0);
+ }
+
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie,
+ final OtpTransportFactory transportFactory) throws IOException {
+ this(node, cookie, 0, transportFactory);
}
/**
* Create a node.
- *
+ *
* @param node
* the name of this node.
- *
+ *
* @param cookie
* the authorization cookie that will be used by this node when
* it communicates with other nodes.
- *
+ *
* @param port
* the port number you wish to use for incoming connections.
* Specifying 0 lets the system choose an available port.
- *
+ *
* @exception IOException
* if communication could not be initialized.
- *
+ *
*/
public OtpNode(final String node, final String cookie, final int port)
- throws IOException {
- super(node, cookie);
+ throws IOException {
+ super(node, cookie);
+
+ init(port);
+ }
+
+ /**
+ * Create a node.
+ *
+ * @param node
+ * the name of this node.
+ *
+ * @param cookie
+ * the authorization cookie that will be used by this node when
+ * it communicates with other nodes.
+ *
+ * @param port
+ * the port number you wish to use for incoming connections.
+ * Specifying 0 lets the system choose an available port.
+ *
+ * @param transportFactory
+ * the transport factory to use when creating connections.
+ *
+ * @exception IOException
+ * if communication could not be initialized.
+ *
+ */
+ public OtpNode(final String node, final String cookie, final int port,
+ final OtpTransportFactory transportFactory) throws IOException {
+ super(node, cookie, transportFactory);
- init(port);
+ init(port);
}
private synchronized void init(final int aport) throws IOException {
- if (!initDone) {
- connections = new Hashtable<String, OtpCookedConnection>(17,
- (float) 0.95);
- mboxes = new Mailboxes();
- acceptor = new Acceptor(aport);
- initDone = true;
- }
+ if (!initDone) {
+ connections = new Hashtable<String, OtpCookedConnection>(17,
+ (float) 0.95);
+ mboxes = new Mailboxes();
+ acceptor = new Acceptor(aport);
+ initDone = true;
+ }
}
/**
@@ -158,24 +238,24 @@ public class OtpNode extends OtpLocalNode {
* and close all existing connections.
*/
public synchronized void close() {
- acceptor.quit();
- OtpCookedConnection conn;
- final Collection<OtpCookedConnection> coll = connections.values();
- final Iterator<OtpCookedConnection> it = coll.iterator();
-
- mboxes.clear();
-
- while (it.hasNext()) {
- conn = it.next();
- it.remove();
- conn.close();
- }
- initDone = false;
+ acceptor.quit();
+ OtpCookedConnection conn;
+ final Collection<OtpCookedConnection> coll = connections.values();
+ final Iterator<OtpCookedConnection> it = coll.iterator();
+
+ mboxes.clear();
+
+ while (it.hasNext()) {
+ conn = it.next();
+ it.remove();
+ conn.close();
+ }
+ initDone = false;
}
@Override
protected void finalize() {
- close();
+ close();
}
/**
@@ -183,65 +263,65 @@ public class OtpNode extends OtpLocalNode {
* receive messages with other, similar mailboxes and with Erlang processes.
* Messages can be sent to this mailbox by using its associated
* {@link OtpMbox#self() pid}.
- *
+ *
* @return a mailbox.
*/
public OtpMbox createMbox() {
- return mboxes.create();
+ return mboxes.create();
}
/**
* Close the specified mailbox with reason 'normal'.
- *
+ *
* @param mbox
* the mailbox to close.
- *
+ *
* <p>
* After this operation, the mailbox will no longer be able to
* receive messages. Any delivered but as yet unretrieved
* messages can still be retrieved however.
* </p>
- *
+ *
* <p>
* If there are links from the mailbox to other
* {@link OtpErlangPid pids}, they will be broken when this
* method is called and exit signals with reason 'normal' will be
* sent.
* </p>
- *
+ *
*/
public void closeMbox(final OtpMbox mbox) {
- closeMbox(mbox, new OtpErlangAtom("normal"));
+ closeMbox(mbox, new OtpErlangAtom("normal"));
}
/**
* Close the specified mailbox with the given reason.
- *
+ *
* @param mbox
* the mailbox to close.
* @param reason
* an Erlang term describing the reason for the termination.
- *
+ *
* <p>
* After this operation, the mailbox will no longer be able to
* receive messages. Any delivered but as yet unretrieved
* messages can still be retrieved however.
* </p>
- *
+ *
* <p>
* If there are links from the mailbox to other
* {@link OtpErlangPid pids}, they will be broken when this
* method is called and exit signals with the given reason will
* be sent.
* </p>
- *
+ *
*/
public void closeMbox(final OtpMbox mbox, final OtpErlangObject reason) {
- if (mbox != null) {
- mboxes.remove(mbox);
- mbox.name = null;
- mbox.breakLinks(reason);
- }
+ if (mbox != null) {
+ mboxes.remove(mbox);
+ mbox.name = null;
+ mbox.breakLinks(reason);
+ }
}
/**
@@ -249,16 +329,16 @@ public class OtpNode extends OtpLocalNode {
* with other, similar mailboxes and with Erlang processes. Messages can be
* sent to this mailbox by using its registered name or the associated
* {@link OtpMbox#self() pid}.
- *
+ *
* @param name
* a name to register for this mailbox. The name must be unique
* within this OtpNode.
- *
+ *
* @return a mailbox, or null if the name was already in use.
- *
+ *
*/
public OtpMbox createMbox(final String name) {
- return mboxes.create(name);
+ return mboxes.create(name);
}
/**
@@ -269,58 +349,58 @@ public class OtpNode extends OtpLocalNode {
* name; if the mailbox already had a name, calling this method will
* supercede that name.
* </p>
- *
+ *
* @param name
* the name to register for the mailbox. Specify null to
* unregister the existing name from this mailbox.
- *
+ *
* @param mbox
* the mailbox to associate with the name.
- *
+ *
* @return true if the name was available, or false otherwise.
*/
public boolean registerName(final String name, final OtpMbox mbox) {
- return mboxes.register(name, mbox);
+ return mboxes.register(name, mbox);
}
/**
* Get a list of all known registered names on this node.
- *
+ *
* @return an array of Strings, containins all known registered names on
* this node.
*/
public String[] getNames() {
- return mboxes.names();
+ return mboxes.names();
}
/**
* Determine the {@link OtpErlangPid pid} corresponding to a registered name
* on this node.
- *
+ *
* @return the {@link OtpErlangPid pid} corresponding to the registered
* name, or null if the name is not known on this node.
*/
public OtpErlangPid whereis(final String name) {
- final OtpMbox m = mboxes.get(name);
- if (m != null) {
- return m.self();
- }
- return null;
+ final OtpMbox m = mboxes.get(name);
+ if (m != null) {
+ return m.self();
+ }
+ return null;
}
/**
* Register interest in certain system events. The {@link OtpNodeStatus
* OtpNodeStatus} handler object contains callback methods, that will be
* called when certain events occur.
- *
+ *
* @param ahandler
* the callback object to register. To clear the handler, specify
* null as the handler to use.
- *
+ *
*/
public synchronized void registerStatusHandler(final OtpNodeStatus ahandler) {
- this.handler = ahandler;
+ handler = ahandler;
}
/**
@@ -329,7 +409,7 @@ public class OtpNode extends OtpLocalNode {
* setting up a connection to the remote node (if possible). Only a single
* outgoing message is sent; the timeout is how long to wait for a response.
* </p>
- *
+ *
* <p>
* Only a single attempt is made to connect to the remote node, so for
* example it is not possible to specify an extremely long timeout and
@@ -337,74 +417,73 @@ public class OtpNode extends OtpLocalNode {
* wait for a remote node to be started, the following construction may be
* useful:
* </p>
- *
+ *
* <pre>
* // ping every 2 seconds until positive response
* while (!me.ping(him, 2000))
* ;
* </pre>
- *
+ *
* @param anode
* the name of the node to ping.
- *
+ *
* @param timeout
* the time, in milliseconds, to wait for response before
* returning false.
- *
+ *
* @return true if the node was alive and the correct ping response was
* returned. false if the correct response was not returned on time.
*/
/*
* internal info about the message formats...
- *
+ *
* the request: -> REG_SEND {6,#Pid<[email protected]>,'',net_kernel}
* {'$gen_call',{#Pid<[email protected]>,#Ref<[email protected]>},{is_auth,bingo@aule}}
- *
+ *
* the reply: <- SEND {2,'',#Pid<[email protected]>} {#Ref<[email protected]>,yes}
*/
public boolean ping(final String anode, final long timeout) {
- if (anode.equals(this.node)) {
- return true;
- } else if (anode.indexOf('@', 0) < 0
- && anode.equals(this.node
- .substring(0, this.node.indexOf('@', 0)))) {
- return true;
- }
-
- // other node
- OtpMbox mbox = null;
- try {
- mbox = createMbox();
- mbox.send("net_kernel", anode, getPingTuple(mbox));
- final OtpErlangObject reply = mbox.receive(timeout);
-
- final OtpErlangTuple t = (OtpErlangTuple) reply;
- final OtpErlangAtom a = (OtpErlangAtom) t.elementAt(1);
- return "yes".equals(a.atomValue());
- } catch (final Exception e) {
- } finally {
- closeMbox(mbox);
- }
- return false;
+ if (anode.equals(node)) {
+ return true;
+ } else if (anode.indexOf('@', 0) < 0
+ && anode.equals(node.substring(0, node.indexOf('@', 0)))) {
+ return true;
+ }
+
+ // other node
+ OtpMbox mbox = null;
+ try {
+ mbox = createMbox();
+ mbox.send("net_kernel", anode, getPingTuple(mbox));
+ final OtpErlangObject reply = mbox.receive(timeout);
+
+ final OtpErlangTuple t = (OtpErlangTuple) reply;
+ final OtpErlangAtom a = (OtpErlangAtom) t.elementAt(1);
+ return "yes".equals(a.atomValue());
+ } catch (final Exception e) {
+ } finally {
+ closeMbox(mbox);
+ }
+ return false;
}
/* create the outgoing ping message */
private OtpErlangTuple getPingTuple(final OtpMbox mbox) {
- final OtpErlangObject[] ping = new OtpErlangObject[3];
- final OtpErlangObject[] pid = new OtpErlangObject[2];
- final OtpErlangObject[] anode = new OtpErlangObject[2];
+ final OtpErlangObject[] ping = new OtpErlangObject[3];
+ final OtpErlangObject[] pid = new OtpErlangObject[2];
+ final OtpErlangObject[] anode = new OtpErlangObject[2];
- pid[0] = mbox.self();
- pid[1] = createRef();
+ pid[0] = mbox.self();
+ pid[1] = createRef();
- anode[0] = new OtpErlangAtom("is_auth");
- anode[1] = new OtpErlangAtom(node());
+ anode[0] = new OtpErlangAtom("is_auth");
+ anode[1] = new OtpErlangAtom(node());
- ping[0] = new OtpErlangAtom("$gen_call");
- ping[1] = new OtpErlangTuple(pid);
- ping[2] = new OtpErlangTuple(anode);
+ ping[0] = new OtpErlangAtom("$gen_call");
+ ping[1] = new OtpErlangTuple(pid);
+ ping[2] = new OtpErlangTuple(anode);
- return new OtpErlangTuple(ping);
+ return new OtpErlangTuple(ping);
}
/*
@@ -412,27 +491,27 @@ public class OtpNode extends OtpLocalNode {
* pings.
*/
private boolean netKernel(final OtpMsg m) {
- OtpMbox mbox = null;
- try {
- final OtpErlangTuple t = (OtpErlangTuple) m.getMsg();
- final OtpErlangTuple req = (OtpErlangTuple) t.elementAt(1); // actual
- // request
-
- final OtpErlangPid pid = (OtpErlangPid) req.elementAt(0); // originating
- // pid
-
- final OtpErlangObject[] pong = new OtpErlangObject[2];
- pong[0] = req.elementAt(1); // his #Ref
- pong[1] = new OtpErlangAtom("yes");
-
- mbox = createMbox();
- mbox.send(pid, new OtpErlangTuple(pong));
- return true;
- } catch (final Exception e) {
- } finally {
- closeMbox(mbox);
- }
- return false;
+ OtpMbox mbox = null;
+ try {
+ final OtpErlangTuple t = (OtpErlangTuple) m.getMsg();
+ final OtpErlangTuple req = (OtpErlangTuple) t.elementAt(1); // actual
+ // request
+
+ final OtpErlangPid pid = (OtpErlangPid) req.elementAt(0); // originating
+ // pid
+
+ final OtpErlangObject[] pong = new OtpErlangObject[2];
+ pong[0] = req.elementAt(1); // his #Ref
+ pong[1] = new OtpErlangAtom("yes");
+
+ mbox = createMbox();
+ mbox.send(pid, new OtpErlangTuple(pong));
+ return true;
+ } catch (final Exception e) {
+ } finally {
+ closeMbox(mbox);
+ }
+ return false;
}
/*
@@ -440,31 +519,31 @@ public class OtpNode extends OtpLocalNode {
* delivered successfully, or false otherwise.
*/
boolean deliver(final OtpMsg m) {
- OtpMbox mbox = null;
-
- try {
- final int t = m.type();
-
- if (t == OtpMsg.regSendTag) {
- final String name = m.getRecipientName();
- /* special case for netKernel requests */
- if (name.equals("net_kernel")) {
- return netKernel(m);
- }
- mbox = mboxes.get(name);
- } else {
- mbox = mboxes.get(m.getRecipientPid());
- }
-
- if (mbox == null) {
- return false;
- }
- mbox.deliver(m);
- } catch (final Exception e) {
- return false;
- }
-
- return true;
+ OtpMbox mbox = null;
+
+ try {
+ final int t = m.type();
+
+ if (t == OtpMsg.regSendTag) {
+ final String name = m.getRecipientName();
+ /* special case for netKernel requests */
+ if (name.equals("net_kernel")) {
+ return netKernel(m);
+ }
+ mbox = mboxes.get(name);
+ } else {
+ mbox = mboxes.get(m.getRecipientPid());
+ }
+
+ if (mbox == null) {
+ return false;
+ }
+ mbox.deliver(m);
+ } catch (final Exception e) {
+ return false;
+ }
+
+ return true;
}
/*
@@ -472,86 +551,86 @@ public class OtpNode extends OtpLocalNode {
* specified by the application
*/
void deliverError(final OtpCookedConnection conn, final Exception e) {
- removeConnection(conn);
- remoteStatus(conn.name, false, e);
+ removeConnection(conn);
+ remoteStatus(conn.name, false, e);
}
/*
* find or create a connection to the given node
*/
OtpCookedConnection getConnection(final String anode) {
- OtpPeer peer = null;
- OtpCookedConnection conn = null;
-
- synchronized (connections) {
- // first just try looking up the name as-is
- conn = connections.get(anode);
-
- if (conn == null) {
- // in case node had no '@' add localhost info and try again
- peer = new OtpPeer(anode);
- conn = connections.get(peer.node());
-
- if (conn == null) {
- try {
- conn = new OtpCookedConnection(this, peer);
- conn.setFlags(connFlags);
- addConnection(conn);
- } catch (final Exception e) {
- /* false = outgoing */
- connAttempt(peer.node(), false, e);
- }
- }
- }
- return conn;
- }
+ OtpPeer peer = null;
+ OtpCookedConnection conn = null;
+
+ synchronized (connections) {
+ // first just try looking up the name as-is
+ conn = connections.get(anode);
+
+ if (conn == null) {
+ // in case node had no '@' add localhost info and try again
+ peer = new OtpPeer(anode);
+ conn = connections.get(peer.node());
+
+ if (conn == null) {
+ try {
+ conn = new OtpCookedConnection(this, peer);
+ conn.setFlags(connFlags);
+ addConnection(conn);
+ } catch (final Exception e) {
+ /* false = outgoing */
+ connAttempt(peer.node(), false, e);
+ }
+ }
+ }
+ return conn;
+ }
}
void addConnection(final OtpCookedConnection conn) {
- if (conn != null && conn.name != null) {
- connections.put(conn.name, conn);
- remoteStatus(conn.name, true, null);
- }
+ if (conn != null && conn.name != null) {
+ connections.put(conn.name, conn);
+ remoteStatus(conn.name, true, null);
+ }
}
private void removeConnection(final OtpCookedConnection conn) {
- if (conn != null && conn.name != null) {
- connections.remove(conn.name);
- }
+ if (conn != null && conn.name != null) {
+ connections.remove(conn.name);
+ }
}
/* use these wrappers to call handler functions */
- private synchronized void remoteStatus(final String anode, final boolean up,
- final Object info) {
- if (handler == null) {
- return;
- }
- try {
- handler.remoteStatus(anode, up, info);
- } catch (final Exception e) {
- }
+ private synchronized void remoteStatus(final String anode,
+ final boolean up, final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.remoteStatus(anode, up, info);
+ } catch (final Exception e) {
+ }
}
synchronized void localStatus(final String anode, final boolean up,
- final Object info) {
- if (handler == null) {
- return;
- }
- try {
- handler.localStatus(anode, up, info);
- } catch (final Exception e) {
- }
+ final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.localStatus(anode, up, info);
+ } catch (final Exception e) {
+ }
}
synchronized void connAttempt(final String anode, final boolean incoming,
- final Object info) {
- if (handler == null) {
- return;
- }
- try {
- handler.connAttempt(anode, incoming, info);
- } catch (final Exception e) {
- }
+ final Object info) {
+ if (handler == null) {
+ return;
+ }
+ try {
+ handler.connAttempt(anode, incoming, info);
+ } catch (final Exception e) {
+ }
}
/*
@@ -559,248 +638,248 @@ public class OtpNode extends OtpLocalNode {
* references
*/
public class Mailboxes {
- // mbox pids here
- private Hashtable<OtpErlangPid, WeakReference<OtpMbox>> byPid = null;
- // mbox names here
- private Hashtable<String, WeakReference<OtpMbox>> byName = null;
-
- public Mailboxes() {
- byPid = new Hashtable<OtpErlangPid, WeakReference<OtpMbox>>(17,
- (float) 0.95);
- byName = new Hashtable<String, WeakReference<OtpMbox>>(17,
- (float) 0.95);
- }
-
- public OtpMbox create(final String name) {
- OtpMbox m = null;
-
- synchronized (byName) {
- if (get(name) != null) {
- return null;
- }
- final OtpErlangPid pid = createPid();
- m = new OtpMbox(OtpNode.this, pid, name);
- byPid.put(pid, new WeakReference<OtpMbox>(m));
- byName.put(name, new WeakReference<OtpMbox>(m));
- }
- return m;
- }
-
- public OtpMbox create() {
- final OtpErlangPid pid = createPid();
- final OtpMbox m = new OtpMbox(OtpNode.this, pid);
- byPid.put(pid, new WeakReference<OtpMbox>(m));
- return m;
- }
-
- public void clear() {
- byPid.clear();
- byName.clear();
- }
-
- public String[] names() {
- String allnames[] = null;
-
- synchronized (byName) {
- final int n = byName.size();
- final Enumeration<String> keys = byName.keys();
- allnames = new String[n];
-
- int i = 0;
- while (keys.hasMoreElements()) {
- allnames[i++] = keys.nextElement();
- }
- }
- return allnames;
- }
-
- public boolean register(final String name, final OtpMbox mbox) {
- if (name == null) {
- if (mbox.name != null) {
- byName.remove(mbox.name);
- mbox.name = null;
- }
- } else {
- synchronized (byName) {
- if (get(name) != null) {
- return false;
- }
- byName.put(name, new WeakReference<OtpMbox>(mbox));
- mbox.name = name;
- }
- }
- return true;
- }
-
- /*
- * look up a mailbox based on its name. If the mailbox has gone out of
- * scope we also remove the reference from the hashtable so we don't
- * find it again.
- */
- public OtpMbox get(final String name) {
- final WeakReference<OtpMbox> wr = byName.get(name);
-
- if (wr != null) {
- final OtpMbox m = wr.get();
-
- if (m != null) {
- return m;
- }
- byName.remove(name);
- }
- return null;
- }
-
- /*
- * look up a mailbox based on its pid. If the mailbox has gone out of
- * scope we also remove the reference from the hashtable so we don't
- * find it again.
- */
- public OtpMbox get(final OtpErlangPid pid) {
- final WeakReference<OtpMbox> wr = byPid.get(pid);
-
- if (wr != null) {
- final OtpMbox m = wr.get();
-
- if (m != null) {
- return m;
- }
- byPid.remove(pid);
- }
- return null;
- }
-
- public void remove(final OtpMbox mbox) {
- byPid.remove(mbox.self);
- if (mbox.name != null) {
- byName.remove(mbox.name);
- }
- }
+ // mbox pids here
+ private Hashtable<OtpErlangPid, WeakReference<OtpMbox>> byPid = null;
+ // mbox names here
+ private Hashtable<String, WeakReference<OtpMbox>> byName = null;
+
+ public Mailboxes() {
+ byPid = new Hashtable<OtpErlangPid, WeakReference<OtpMbox>>(17,
+ (float) 0.95);
+ byName = new Hashtable<String, WeakReference<OtpMbox>>(17,
+ (float) 0.95);
+ }
+
+ public OtpMbox create(final String name) {
+ OtpMbox m = null;
+
+ synchronized (byName) {
+ if (get(name) != null) {
+ return null;
+ }
+ final OtpErlangPid pid = createPid();
+ m = new OtpMbox(OtpNode.this, pid, name);
+ byPid.put(pid, new WeakReference<OtpMbox>(m));
+ byName.put(name, new WeakReference<OtpMbox>(m));
+ }
+ return m;
+ }
+
+ public OtpMbox create() {
+ final OtpErlangPid pid = createPid();
+ final OtpMbox m = new OtpMbox(OtpNode.this, pid);
+ byPid.put(pid, new WeakReference<OtpMbox>(m));
+ return m;
+ }
+
+ public void clear() {
+ byPid.clear();
+ byName.clear();
+ }
+
+ public String[] names() {
+ String allnames[] = null;
+
+ synchronized (byName) {
+ final int n = byName.size();
+ final Enumeration<String> keys = byName.keys();
+ allnames = new String[n];
+
+ int i = 0;
+ while (keys.hasMoreElements()) {
+ allnames[i++] = keys.nextElement();
+ }
+ }
+ return allnames;
+ }
+
+ public boolean register(final String name, final OtpMbox mbox) {
+ if (name == null) {
+ if (mbox.name != null) {
+ byName.remove(mbox.name);
+ mbox.name = null;
+ }
+ } else {
+ synchronized (byName) {
+ if (get(name) != null) {
+ return false;
+ }
+ byName.put(name, new WeakReference<OtpMbox>(mbox));
+ mbox.name = name;
+ }
+ }
+ return true;
+ }
+
+ /*
+ * look up a mailbox based on its name. If the mailbox has gone out of
+ * scope we also remove the reference from the hashtable so we don't
+ * find it again.
+ */
+ public OtpMbox get(final String name) {
+ final WeakReference<OtpMbox> wr = byName.get(name);
+
+ if (wr != null) {
+ final OtpMbox m = wr.get();
+
+ if (m != null) {
+ return m;
+ }
+ byName.remove(name);
+ }
+ return null;
+ }
+
+ /*
+ * look up a mailbox based on its pid. If the mailbox has gone out of
+ * scope we also remove the reference from the hashtable so we don't
+ * find it again.
+ */
+ public OtpMbox get(final OtpErlangPid pid) {
+ final WeakReference<OtpMbox> wr = byPid.get(pid);
+
+ if (wr != null) {
+ final OtpMbox m = wr.get();
+
+ if (m != null) {
+ return m;
+ }
+ byPid.remove(pid);
+ }
+ return null;
+ }
+
+ public void remove(final OtpMbox mbox) {
+ byPid.remove(mbox.self);
+ if (mbox.name != null) {
+ byName.remove(mbox.name);
+ }
+ }
}
/*
* this thread simply listens for incoming connections
*/
public class Acceptor extends Thread {
- private final ServerSocket sock;
- private final int acceptorPort;
- private volatile boolean done = false;
-
- Acceptor(final int port) throws IOException {
- sock = new ServerSocket(port);
- this.acceptorPort = sock.getLocalPort();
- OtpNode.this.port = this.acceptorPort;
-
- setDaemon(true);
- setName("acceptor");
- publishPort();
- start();
- }
-
- private boolean publishPort() throws IOException {
- if (getEpmd() != null) {
- return false; // already published
- }
- OtpEpmd.publishPort(OtpNode.this);
- return true;
- }
-
- private void unPublishPort() {
- // unregister with epmd
- OtpEpmd.unPublishPort(OtpNode.this);
-
- // close the local descriptor (if we have one)
- closeSock(epmd);
- epmd = null;
- }
-
- public void quit() {
- unPublishPort();
- done = true;
- closeSock(sock);
- localStatus(node, false, null);
- }
-
- private void closeSock(final ServerSocket s) {
- try {
- if (s != null) {
- s.close();
- }
- } catch (final Exception e) {
- }
- }
-
- private void closeSock(final Socket s) {
- try {
- if (s != null) {
- s.close();
- }
- } catch (final Exception e) {
- }
- }
-
- public int port() {
- return acceptorPort;
- }
-
- @Override
- public void run() {
- Socket newsock = null;
- OtpCookedConnection conn = null;
-
- localStatus(node, true, null);
-
- accept_loop: while (!done) {
- conn = null;
-
- try {
- newsock = sock.accept();
- } catch (final Exception e) {
- // Problem in java1.2.2: accept throws SocketException
- // when socket is closed. This will happen when
- // acceptor.quit()
- // is called. acceptor.quit() will call localStatus(...), so
- // we have to check if that's where we come from.
- if (!done) {
- localStatus(node, false, e);
- }
- break accept_loop;
- }
-
- try {
- synchronized (connections) {
- conn = new OtpCookedConnection(OtpNode.this, newsock);
- conn.setFlags(connFlags);
- addConnection(conn);
- }
- } catch (final OtpAuthException e) {
- if (conn != null && conn.name != null) {
- connAttempt(conn.name, true, e);
- } else {
- connAttempt("unknown", true, e);
- }
- closeSock(newsock);
- } catch (final IOException e) {
- if (conn != null && conn.name != null) {
- connAttempt(conn.name, true, e);
- } else {
- connAttempt("unknown", true, e);
- }
- closeSock(newsock);
- } catch (final Exception e) {
- closeSock(newsock);
- closeSock(sock);
- localStatus(node, false, e);
- break accept_loop;
- }
- } // while
-
- // if we have exited loop we must do this too
- unPublishPort();
- }
+ private final OtpServerTransport sock;
+ private final int acceptorPort;
+ private volatile boolean done = false;
+
+ Acceptor(final int port) throws IOException {
+ sock = createServerTransport(port);
+ acceptorPort = sock.getLocalPort();
+ OtpNode.this.port = acceptorPort;
+
+ setDaemon(true);
+ setName("acceptor");
+ publishPort();
+ start();
+ }
+
+ private boolean publishPort() throws IOException {
+ if (getEpmd() != null) {
+ return false; // already published
+ }
+ OtpEpmd.publishPort(OtpNode.this);
+ return true;
+ }
+
+ private void unPublishPort() {
+ // unregister with epmd
+ OtpEpmd.unPublishPort(OtpNode.this);
+
+ // close the local descriptor (if we have one)
+ closeSock(epmd);
+ epmd = null;
+ }
+
+ public void quit() {
+ unPublishPort();
+ done = true;
+ closeSock(sock);
+ localStatus(node, false, null);
+ }
+
+ private void closeSock(final OtpServerTransport s) {
+ try {
+ if (s != null) {
+ s.close();
+ }
+ } catch (final Exception e) {
+ }
+ }
+
+ private void closeSock(final OtpTransport s) {
+ try {
+ if (s != null) {
+ s.close();
+ }
+ } catch (final Exception e) {
+ }
+ }
+
+ public int port() {
+ return acceptorPort;
+ }
+
+ @Override
+ public void run() {
+ OtpTransport newsock = null;
+ OtpCookedConnection conn = null;
+
+ localStatus(node, true, null);
+
+ accept_loop: while (!done) {
+ conn = null;
+
+ try {
+ newsock = sock.accept();
+ } catch (final Exception e) {
+ // Problem in java1.2.2: accept throws SocketException
+ // when socket is closed. This will happen when
+ // acceptor.quit()
+ // is called. acceptor.quit() will call localStatus(...), so
+ // we have to check if that's where we come from.
+ if (!done) {
+ localStatus(node, false, e);
+ }
+ break accept_loop;
+ }
+
+ try {
+ synchronized (connections) {
+ conn = new OtpCookedConnection(OtpNode.this, newsock);
+ conn.setFlags(connFlags);
+ addConnection(conn);
+ }
+ } catch (final OtpAuthException e) {
+ if (conn != null && conn.name != null) {
+ connAttempt(conn.name, true, e);
+ } else {
+ connAttempt("unknown", true, e);
+ }
+ closeSock(newsock);
+ } catch (final IOException e) {
+ if (conn != null && conn.name != null) {
+ connAttempt(conn.name, true, e);
+ } else {
+ connAttempt("unknown", true, e);
+ }
+ closeSock(newsock);
+ } catch (final Exception e) {
+ closeSock(newsock);
+ closeSock(sock);
+ localStatus(node, false, e);
+ break accept_loop;
+ }
+ } // while
+
+ // if we have exited loop we must do this too
+ unPublishPort();
+ }
}
public void setFlags(final int flags) {
- this.connFlags = flags;
+ connFlags = flags;
}
}