diff options
34 files changed, 3352 insertions, 598 deletions
diff --git a/erts/doc/src/Makefile b/erts/doc/src/Makefile index b96cbbce40..444adf4a6e 100644 --- a/erts/doc/src/Makefile +++ b/erts/doc/src/Makefile @@ -173,6 +173,8 @@ release_docs_spec: docs "$(RELSYSDIR)/doc/html" $(INSTALL_DATA) $(ERL_TOP)/erts/example/time_compat.erl \ "$(RELSYSDIR)/doc/html" + $(INSTALL_DATA) $(ERL_TOP)/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl \ + "$(RELSYSDIR)/doc/html" $(INSTALL_DATA) $(INFO_FILE) "$(RELSYSDIR)" $(INSTALL_DIR) "$(RELEASE_PATH)/man/man3" $(INSTALL_DATA) $(MAN3DIR)/* "$(RELEASE_PATH)/man/man3" diff --git a/erts/doc/src/alt_dist.xml b/erts/doc/src/alt_dist.xml index be969a8267..d3731a5391 100644 --- a/erts/doc/src/alt_dist.xml +++ b/erts/doc/src/alt_dist.xml @@ -47,23 +47,30 @@ runs on. The reason the C code is not made portable, is simply readability.</p> - <note> - <p>This section was written a long time ago. Most of it is still - valid, but some things have changed since then. - Most notably is the driver interface. Some updates have been made - to the documentation of the driver presented here, - but more can be done and is planned for the future. - The reader is encouraged to read the - <seealso marker="erl_driver"><c>erl_driver</c></seealso> and - <seealso marker="driver_entry"><c>driver_entry</c></seealso> - documentation also.</p> - </note> - <section> <title>Introduction</title> <p>To implement a new carrier for the Erlang distribution, the main steps are as follows.</p> + <note><p> + As of ERTS version 10.0 support for distribution controller + processes has been introduced. That is, the traffic over a + distribution channel can be managed by a process instead of + only by a port. This makes it possible to implement large + parts of the logic in Erlang code, and you perhaps do not + even need a new driver for the protocol. One example could + be Erlang distribution over UDP using <c>gen_udp</c> (your + Erlang code will of course have to take care of retranspissions, + etc in this example). That is, depending on what you want + to do you perhaps do not need to implement a driver at all + and can then skip the driver related sections below. + The <c>gen_tcp_dist</c> example described in the + <seealso marker="#distribution_module">Distribution + Module</seealso> section utilize distribution controller + processes and can be worth having a look at if you want to + use distribution controller processes. + </p></note> + <section> <title>Writing an Erlang Driver</title> <p>First, the protocol must be available to the Erlang machine, which @@ -152,7 +159,712 @@ </section> <section> + <marker id="distribution_module"/> + <title>Distribution Module</title> + <p> + The distribution module expose an API that <c>net_kernel</c> call + in order to manage connections to other nodes. The module name + should have the suffix <c>_dist</c>. + </p> + <p> + The module needs to create some kind of listening entity (process + or port) and an acceptor process that accepts incoming connections + using the listening entity. For each connection, the module at least + needs to create one connection supervisor process, which also is + responsible for the handshake when setting up the connection, and + a distribution controller (process or port) responsible for + transport of data over the connection. The distribution controller + and the connection supervisor process should be linked together + so both of them are cleaned up when the connection is taken down. + </p> + <p> + Note that there need to be exactly one distribution controller + per connection. A process or port can only be distribution + controller for one connection. The registration as distribution + controller cannot be undone. It will stick until the distribution + controller terminates. The distribution controller should not + ignore exit signals. It is allowed to trap exits, but it should + then voluntarily terminate when an exit signal is received. + </p> + <p> + An example implementation of a distribution module can be found + in + <url href="gen_tcp_dist.erl">$ERL_TOP/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl</url>. + It implements the distribution over TCP/IP using the <c>gen_tcp</c> + API with distribution controllers implemented by processes. This + instead of using port distribution controllers as the ordinary TCP/IP + distribution uses. + </p> + + <section> + <marker id="distribution_module_exported_callback_functions"/> + <title>Exported Callback Functions</title> + + <p> + The following functions are mandatory: + </p> + <taglist> + <tag><marker id="listen"/><c>listen(Name) -></c><br/> <c>{ok, {Listen, Address, Creation}} | {error, Error} </c></tag> + <item> + <p> + <c>listen/1</c> is called once in order to listen for incoming + connection requests. The call is made when the distribution is brought + up. The argument <c>Name</c> is the part of the node name before + the <c>@</c> sign in the full node name. It can be either an atom or a + string. + </p> + <p> + The return value consists of a <c>Listen</c> handle (which is + later passed to the <seealso marker="#accept"><c>accept/1</c></seealso> + callback), <c>Address</c> which is a <c>#net_address{}</c> record + with information about the address for the node (the + <c>#net_address{}</c> record is defined in + <c>kernel/include/net_address.hrl</c>), and <c>Creation</c> which + (currently) is an integer <c>1</c>, <c>2</c>, or <c>3</c>. + </p> + <p> + If <seealso marker="erts:epmd"><c>epmd</c></seealso> is to be used + for node discovery, you typically want to use the (unfortunately + undocumented) <c>erl_epmd</c> module (part of the <c>kernel</c> + application) in order to register the listen port with <c>epmd</c> + and retrieve <c>Creation</c> to use. + </p> + </item> + + <tag><marker id="accept"/><c>accept(Listen) -></c><br/> <c>AcceptorPid</c></tag> + <item> + <p> + <c>accept/1</c> should spawn a process that accepts connections. This + process should preferably execute on <c>max</c> priority. The process + identifier of this process should be returned. + </p> + <p> + The <c>Listen</c> argument will be the same as the <c>Listen</c> handle + part of the return value of the + <seealso marker="#listen"><c>listen/1</c></seealso> callback above. + <c>accept/1</c> is called only once when the distribution protocol is + started. + </p> + <p> + The caller of this function is a representative for <c>net_kernel</c> + (this may or may not be the process registered as <c>net_kernel</c>) + and is in this document identified as <c>Kernel</c>. + When a connection has been accepted by the acceptor process, it needs + to inform <c>Kernel</c> about the accepted connection. This is done by + passing a message on the form: + </p> + <code type="none"><![CDATA[Kernel ! {accept, AcceptorPid, DistController, Family, Proto}]]></code> + <p> + <c>DistController</c> is either the process or port identifier + of the distribution controller for the connection. The + distribution controller should be created by the acceptor + processes when a new connection is accepted. Its job is to + dispatch traffic on the connection. + </p> + <c>Kernel</c> responds with one of the following messages: + <taglist> + <tag><c>{Kernel, controller, SupervisorPid}</c></tag> + <item> + <p> + The request was accepted and <c>SupervisorPid</c> is the + process identifier of the connection supervisor process + (which is created in the + <seealso marker="#accept_connection"><c>accept_connection/5</c></seealso> + callback). + </p> + </item> + <tag><c>{Kernel, unsupported_protocol}</c></tag> + <item> + <p> + The request was rejected. This is a fatal error. The acceptor + process should terminate. + </p> + </item> + </taglist> + <p> + When an accept sequence has been completed the acceptor process + is expected to continue accepting further requests. + </p> + </item> + + <tag><marker id="accept_connection"/><c>accept_connection(AcceptorPid, DistCtrl, MyNode, Allowed, SetupTime) -></c><br/> <c>ConnectionSupervisorPid</c></tag> + <item> + <p> + <c>accept_connection/5</c> should spawn a process that will + perform the Erlang distribution handshake for the connection. + If the handshake successfully completes it should continue to + function as a connection supervisor. This process + should preferably execute on <c>max</c> priority. + </p> + <p>The arguments:</p> + <taglist> + <tag><c>AcceptorPid</c></tag> + <item> + <p> + Process identifier of the process created by the + <seealso marker="#accept"><c>accept/1</c></seealso> + callback. + </p> + </item> + <tag><c>DistCtrl</c></tag> + <item> + <p>The identifier of the distribution controller identifier + created by the acceptor process. To be passed along to + <c>dist_util:handshake_other_started(HsData)</c>. + </p> + </item> + <tag><c>MyNode</c></tag> + <item> + <p> + Node name of this node. To be passed along to + <c>dist_util:handshake_other_started(HsData)</c>. + </p> + </item> + <tag><c>Allowed</c></tag> + <item> + <p> + To be passed along to + <c>dist_util:handshake_other_started(HsData)</c>. + </p> + </item> + <tag><c>SetupTime</c></tag> + <item> + <p> + Time used for creating a setup timer by a + call to <c>dist_util:start_timer(SetupTime)</c>. + The timer should be passed along to + <c>dist_util:handshake_other_started(HsData)</c>. + </p> + </item> + </taglist> + <p> + The created process should provide callbacks and other + information needed for the handshake in a + <seealso marker="#hs_data_record"><c>#hs_data{}</c></seealso> + record and call <c>dist_util:handshake_other_started(HsData)</c> + with this record. + </p> + <p> + <c>dist_util:handshake_other_started(HsData)</c> will perform + the handshake and if the handshake successfully completes this + process will then continue in a connection supervisor loop + as long as the connection is up. + </p> + </item> + + <tag><marker id="setup"/><c>setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -></c><br/> <c>ConnectionSupervisorPid</c></tag> + <item> + <p> + <c>setup/5</c> should spawn a process that connects to + <c>Node</c>. When connection has been established it should + perform the Erlang distribution handshake for the connection. + If the handshake successfully completes it should continue to + function as a connection supervisor. This process + should preferably execute on <c>max</c> priority. + </p> + <p>The arguments:</p> + <taglist> + <tag><c>Node</c></tag> + <item> + <p> + Node name of remote node. To be passed along to + <c>dist_util:handshake_we_started(HsData)</c>. + </p> + </item> + <tag><c>Type</c></tag> + <item> + <p> + Connection type. To be passed along to + <c>dist_util:handshake_we_started(HsData)</c>. + </p> + </item> + <tag><c>MyNode</c></tag> + <item> + <p> + Node name of this node. To be passed along to + <c>dist_util:handshake_we_started(HsData)</c>. + </p> + </item> + <tag><c>LongOrShortNames</c></tag> + <item> + <p> + Either the atom <c>longnames</c> or + the atom <c>shortnames</c> indicating + whether long or short names is used. + </p> + </item> + <tag><c>SetupTime</c></tag> + <item> + <p> + Time used for creating a setup timer by a + call to <c>dist_util:start_timer(SetupTime)</c>. + The timer should be passed along to + <c>dist_util:handshake_we_started(HsData)</c>. + </p> + </item> + </taglist> + <p> + The caller of this function is a representative for <c>net_kernel</c> + (this may or may not be the process registered as <c>net_kernel</c>) + and is in this document identified as <c>Kernel</c>. + </p> + <p> + This function should, besides spawning the connection supervisor, + also create a distribution controller. The distribution + controller is either a process or a port which is responsible + for dispatching traffic. + </p> + <p> + The created process should provide callbacks and other + information needed for the handshake in a + <seealso marker="#hs_data_record"><c>#hs_data{}</c></seealso> + record and call <c>dist_util:handshake_we_started(HsData)</c> + with this record. + </p> + <p> + <c>dist_util:handshake_we_started(HsData)</c> will perform + the handshake and the handshake successfully completes this + process will then continue in a connection supervisor loop + as long as the connection is up. + </p> + </item> + + <tag><marker id="close"/><c>close(Listen) -></c><br/> <c>void()</c></tag> + + <item><p> + Called in order to close the <c>Listen</c> handle + that originally was passed from the + <seealso marker="#listen"><c>listen/1</c></seealso> callback. + </p></item> + + <tag><marker id="select"/><c>select(NodeName) -></c><br/> <c>boolean()</c></tag> + <item> + <p>Return <c>true</c> if the host name part + of the <c>NodeName</c> is valid for use + with this protocol; otherwise, <c>false</c>. + </p> + </item> + + </taglist> + + <p> + There are also two optional functions that may be + exported: + </p> + <taglist> + <tag><marker id="select"/><c>setopts(Listen, Opts) -></c><br/> <c>ok | {error, Error}</c></tag> + <item> + <p> + The argument <c>Listen</c> is the handle originally passed + from the + <seealso marker="#listen"><c>listen/1</c></seealso> callback. + The argument <c>Opts</c> is a list of options to set on future + connections. + </p> + </item> + + <tag><marker id="select"/><c>getopts(Listen, Opts) -></c><br/> <c>{ok, OptionValues} | {error, Error}</c></tag> + <item> + <p> + The argument <c>Listen</c> is the handle originally passed + from the + <seealso marker="#listen"><c>listen/1</c></seealso> callback. + The argument <c>Opts</c> is a list of options to read for future + connections. + </p> + </item> + </taglist> + + </section> + <section> + <marker id="hs_data_record"/> + <title>The #hs_data{} Record</title> + <p> + The <c>dist_util:handshake_we_started/1</c> and + <c>dist_util:handshake_other_started/1</c> functions + takes a <c>#hs_data{}</c> record as argument. There + are quite a lot of fields in this record that you + need to set. The record is defined in + <c>kernel/include/dist_util.hrl</c>. Not documented + fields should not be set, i.e., should be left as + <c>undefined</c>. + </p> + <p> + The following <c>#hs_data{}</c> record fields need + to be set unless otherwise stated:</p> + <taglist> + <tag><marker id="hs_data_kernel_pid"/><c>kernel_pid</c></tag> + <item> + <p> + Process identifier of the <c>Kernel</c> process. That is, + the process that called either + <seealso marker="#setup"><c>setup/5</c></seealso> or + <seealso marker="#accept_connection"><c>accept_connection/5</c></seealso>. + </p> + </item> + + <tag><marker id="hs_data_other_node"/><c>other_node</c></tag> + <item> + <p>Name of the other node. This field is only + mandatory when this node initiates the connection. + That is, when connection is set up via + <seealso marker="#setup"><c>setup/5</c></seealso>. + </p> + </item> + + <tag><marker id="hs_data_this_node"/><c>this_node</c></tag> + <item> + <p> + The node name of this node. + </p> + </item> + + <tag><marker id="hs_data_socket"/><c>socket</c></tag> + <item> + <p> + The identifier of the distribution controller. + </p> + </item> + + <tag><marker id="hs_data_timer"/><c>timer</c></tag> + <item> + <p> + The timer created using <c>dist_util:start_timer/1</c>. + </p> + </item> + + <tag><marker id="hs_data_allowed"/><c>allowed</c></tag> + <item> + <p>Information passed as <c>Allowed</c> to + <c>accept_connection/5</c>. This field is only + mandatory when the remote node initiated the + connection. That is, when the connection is set + up via + <seealso marker="#accept_connection"><c>accept_connection/5</c></seealso>. + </p> + </item> + + <tag><marker id="hs_data_f_send"/><c>f_send</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr, Data) -> ok | {error, Error}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of + the distribution controller and <c>Data</c> + is io data to pass to the other side. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_f_recv"/><c>f_recv</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr, Length) -> {ok, Packet} | {error, Reason}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of the distribution + controller. + If <c>Length</c> is <c>0</c>, all available bytes should be + returned. If <c>Length > 0</c>, exactly <c>Length</c> bytes + should be returned, or an error; possibly discarding less + than <c>Length</c> bytes of data when the connection is + closed from the other side. + It is used for passive receive of data from the + other end. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_f_setopts_pre_nodeup"/><c>f_setopts_pre_nodeup</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr) -> ok | {error, Error}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of + the distribution controller. Called just + before the distribution channel is taken up + for normal traffic. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_f_setopts_post_nodeup"/><c>f_setopts_post_nodeup</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr) -> ok | {error, Error}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of + the distribution controller. Called just + after distribution channel has been taken + up for normal traffic. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_f_getll"/><c>f_getll</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr) -> ID]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of + the distribution controller and <c>ID</c> is + the identifier of the low level entity that + handles the connection (often <c>DistCtrlr</c> + itself). + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_f_address"/><c>f_address</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr, Node) -> NetAddress]]></code> + <p> + where <c>DistCtrlr</c> is the identifier of + the distribution controller, <c>Node</c> + is the node name of the node on the other end, + and <c>NetAddress</c> is a <c>#net_address{}</c> + record with information about the address + for the <c>Node</c> on the other end of the + connection. The <c>#net_address{}</c> record + is defined in + <c>kernel/include/net_address.hrl</c>. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_mf_tick"/><c>mf_tick</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr) -> void()]]></code> + <p> + where <c>DistCtrlr</c> is the identifier + of the distribution controller. This + function should send information over + the connection that is not interpreted + by the other end while increasing the + statistics of received packets on the + other end. This is usually implemented by + sending an empty packet. + </p> + <note><p> + It is of vital importance that this operation + does not block the caller for a long time. + This since it is called from the connection + supervisor. + </p></note> + <p>Used when connection is up.</p> + </item> + + <tag><marker id="hs_data_mf_getstat"/><c>mf_getstat</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr) -> {ok, Received, Sent, PendSend}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier + of the distribution controller, <c>Received</c> + is received packets, <c>Sent</c> is + sent packets, and <c>PendSend</c> is + amount of packets in queue to be sent + or a <c>boolean()</c> indicating whether + there are packets in queue to be sent. + </p> + <note><p> + It is of vital importance that this operation + does not block the caller for a long time. + This since it is called from the connection + supervisor. + </p></note> + <p>Used when connection is up.</p> + </item> + + <tag><marker id="hs_data_request_type"/><c>request_type</c></tag> + <item> + <p> + The request <c>Type</c> as passed to + <seealso marker="#setup"><c>setup/5</c></seealso>. + This is only mandatory when the connection has + been initiated by this node. That is, the connection + is set up via <c>setup/5</c>. + </p> + </item> + + <tag><marker id="hs_data_mf_setopts"/><c>mf_setopts</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrl, Opts) -> ok | {error, Error}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier + of the distribution controller and <c>Opts</c> + is a list of options to set on the connection. + </p> + <p>This function is optional. Used when connection is up.</p> + </item> + + <tag><marker id="hs_data_mf_getopts"/><c>mf_getopts</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrl, Opts) -> {ok, OptionValues} | {error, Error}]]></code> + <p> + where <c>DistCtrlr</c> is the identifier + of the distribution controller and <c>Opts</c> + is a list of options to read for the connection. + </p> + <p>This function is optional. Used when connection is up.</p> + </item> + + <tag><marker id="hs_data_f_handshake_complete"/><c>f_handshake_complete</c></tag> + <item> + <p> + A fun with the following signature: + </p> + <code type="none"><![CDATA[fun (DistCtrlr, Node, DHandle) -> void()]]></code> + <p> + where <c>DistCtrlr</c> is the identifier + of the distribution controller, <c>Node</c> is + the node name of the node connected at the other + end, and <c>DHandle</c> is a distribution handle + needed by a distribution controller process when + calling the following BIFs: + </p> + <list> + <item><p><seealso marker="erts:erlang#dist_ctrl_get_data/1"><c>erlang:dist_ctrl_get_data/1</c></seealso></p></item> + <item><p><seealso marker="erts:erlang#dist_ctrl_get_data_notification/1"><c>erlang:dist_ctrl_get_data_notification/1</c></seealso></p></item> + <item><p><seealso marker="erts:erlang#dist_ctrl_input_handler/2"><c>erlang:dist_ctrl_input_handler/2</c></seealso></p></item> + <item><p><seealso marker="erts:erlang#dist_ctrl_put_data/2"><c>erlang:dist_ctrl_put_data/2</c></seealso></p></item> + </list> + <p> + This function is called when the handshake has + completed and the distribution channel is up. + The distribution controller can begin dispatching + traffic over the channel. This function is optional. + </p> + <p>Only used during handshake phase.</p> + </item> + + <tag><marker id="hs_data_add_flags"/><c>add_flags</c></tag> + <item> + <p> + <seealso marker="erl_dist_protocol#dflags">Distribution flags</seealso> + to add to the connection. Currently all (non obsolete) flags will + automatically be enabled. + </p> + <p> + This flag field is optional. + </p> + </item> + + <tag><marker id="hs_data_reject_flags"/><c>reject_flags</c></tag> + <item> + <p> + <seealso marker="erl_dist_protocol#dflags">Distribution flags</seealso> + to reject. Currently the following distribution flags can be rejected: + </p> + <taglist> + <tag><c>DFLAG_DIST_HDR_ATOM_CACHE</c></tag> + <item>Do not use atom cache over this connection.</item> + <tag><c>DFLAGS_STRICT_ORDER_DELIVERY</c></tag> + <item>Do not use any features that require strict + order delivery.</item> + </taglist> + <p> + This flag field is optional. + </p> + </item> + + <tag><marker id="hs_data_require_flags"/><c>require_flags</c></tag> + <item> + <p> + Require these <seealso marker="erl_dist_protocol#dflags">distribution + flags</seealso> to be used. The connection will be aborted during the + handshake if the other end does not use them. + </p> + <p> + This flag field is optional. + </p> + </item> + + </taglist> + </section> + + <section> + <marker id="distribution_data_delivery"/> + <title>Distribution Data Delivery</title> + <p> + When using the default configuration, the data to pass + over a connection needs to be delivered as is + to the node on the receiving end in the <em>exact same + order</em>, with no loss of data what so ever, as sent + from the sending node. + </p> + <p> + The data delivery order can be relaxed by disabling + features that require strict ordering. This is done by + passing the <c>?DFLAGS_STRICT_ORDER_DELIVERY</c> + <seealso marker="erl_dist_protocol#dflags">distribution + flags</seealso> in the + <seealso marker="alt_dist#hs_data_reject_flags"><c>reject_flags</c></seealso> + field of the <seealso marker="#hs_data_record"><c>#hs_data{}</c></seealso> + record used when setting up the connection. When relaxed + ordering is used, only the order of signals with the same + sender/receiver pair has to be preserved. + However, note that disabling the features that require + strict ordering may have a negative impact on performance, + throughput, and/or latency. + </p> + </section> + + <section> + <marker id="enable_your_distribution_module"/> + <title>Enable Your Distribution Module</title> + + <p>For <c>net_kernel</c> to find out which distribution module to use, + the <c>erl</c> command-line argument <c>-proto_dist</c> is used. It + is followed by one or more distribution module names, with suffix + "_dist" removed. That is, <c>gen_tcp_dist</c> as a distribution module + is specified as <c>-proto_dist gen_tcp</c>.</p> + + <p>If no <c>epmd</c> (TCP port mapper daemon) is used, also command-line + option <c>-no_epmd</c> is to be specified, which makes + Erlang skip the <c>epmd</c> startup, both as an OS process and as an + Erlang ditto.</p> + </section> + + </section> + + <section> <title>The Driver</title> + + <note> + <p>This section was written a long time ago. Most of it is still + valid, but some things have changed since then. Some updates have + been made to the documentation of the driver presented here, + but more can be done and is planned for the future. + The reader is encouraged to read the + <seealso marker="erl_driver"><c>erl_driver</c></seealso> and + <seealso marker="driver_entry"><c>driver_entry</c></seealso> + documentation also.</p> + </note> + <p>Although Erlang drivers in general can be beyond the scope of this section, a brief introduction seems to be in place.</p> diff --git a/erts/doc/src/erl_dist_protocol.xml b/erts/doc/src/erl_dist_protocol.xml index 610351db6c..a78b13aaa4 100644 --- a/erts/doc/src/erl_dist_protocol.xml +++ b/erts/doc/src/erl_dist_protocol.xml @@ -829,7 +829,31 @@ DiB == gen_digest(ChA, ICA)? <item> <p>The node understand UTF-8 encoded atoms.</p> </item> + <tag><c>-define(DFLAG_MAP_TAG, 16#20000).</c></tag> + <item> + <p>The node understand the map tag.</p> + </item> + <tag><c>-define(DFLAG_BIG_CREATION, 16#40000).</c></tag> + <item> + <p>The node understand big node creation.</p> + </item> + <tag><c>-define(DFLAG_SEND_SENDER, 16#80000).</c></tag> + <item> + <p> + Use the <c>SEND_SENDER</c> + <seealso marker="#control_message">control message</seealso> + instead of the <c>SEND</c> control message and use the + <c>SEND_SENDER_TT</c> control message instead + of the <c>SEND_TT</c> control message. + </p> + </item> </taglist> + <p> + There are also a collection of <c>DFLAG</c>s bitwise or:ed + together in the <c>DFLAGS_STRICT_ORDER_DELIVERY</c> macro. + These flags corresponds to features that require strict + ordering of data over distribution channels. + </p> </section> </section> @@ -922,6 +946,7 @@ DiB == gen_digest(ChA, ICA)? </item> </taglist> + <marker id="control_message"/> <p>The <c>ControlMessage</c> is a tuple, where the first element indicates which distributed operation it encodes:</p> @@ -1028,4 +1053,49 @@ DiB == gen_digest(ChA, ICA)? </item> </taglist> </section> + + <section> + <title>New Ctrlmessages for Erlang/OTP 21</title> + <taglist> + <tag><c>SEND_SENDER</c></tag> + <item> + <p><c>{22, FromPid, ToPid}</c></p> + <p>Followed by <c>Message</c>.</p> + <p> + This control messages replace the <c>SEND</c> control + message and will be sent when the distribution flag + <seealso marker="erl_dist_protocol#dflags"><c>DFLAG_SEND_SENDER</c></seealso> + has been negotiated in the connection setup handshake. + </p> + <note><p> + Messages encoded before the connection has + been set up may still use the <c>SEND</c> control + message. However, once a <c>SEND_SENDER</c> or <c>SEND_SENDER_TT</c> + control message has been sent, no more <c>SEND</c> + control messages will be sent in the same direction + on the connection. + </p></note> + </item> + <tag><c>SEND_SENDER_TT</c></tag> + <item> + <p><c>{23, FromPid, ToPid, TraceToken}</c></p> + <p>Followed by <c>Message</c>.</p> + <p> + This control messages replace the <c>SEND_TT</c> control + message and will be sent when the distribution flag + <seealso marker="erl_dist_protocol#dflags"><c>DFLAG_SEND_SENDER</c></seealso> + has been negotiated in the connection setup handshake. + </p> + <note><p> + Messages encoded before the connection has + been set up may still use the <c>SEND_TT</c> control + message. However, once a <c>SEND_SENDER</c> or <c>SEND_SENDER_TT</c> + control message has been sent, no more <c>SEND_TT</c> + control messages will be sent in the same direction + on the connection. + </p></note> + </item> + </taglist> + </section> + </chapter> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 5664609f60..797b36bf13 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -181,6 +181,14 @@ </taglist> </desc> </datatype> + + <datatype> + <name name="dist_handle"></name> + <desc> + <p>An opaque handle identifing a distribution channel.</p> + </desc> + </datatype> + </datatypes> <funcs> @@ -1215,6 +1223,141 @@ end</code> </func> <func> + <name name="dist_ctrl_get_data" arity="1"/> + <fsummary>Get distribution channel data to pass to another node.</fsummary> + <desc> + <p> + Get distribution channel data from the local node that is + to be passed to the remote node. The distribution channel + is identified by <c><anno>DHandle</anno></c>. If no data + is available, the atom <c>none</c> is returned. One + can request to be informed by a message when more + data is available by calling + <seealso marker="erlang#dist_ctrl_get_data_notification/1"><c>erlang:dist_ctrl_get_data_notification(DHandle)</c></seealso>. + </p> + <note><p> + Only the process registered as distribution + controller for the distribution channel identified by + <c><anno>DHandle</anno></c> is allowed to call this + function. + </p></note> + <p> + This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. <c><anno>DHandle</anno></c> is retrived + via the callback + <seealso marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seealso>. + More information can be found in the documentation of + <seealso marker="erts:alt_dist#distribution_module">ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module</seealso>. + </p> + </desc> + </func> + + <func> + <name name="dist_ctrl_get_data_notification" arity="1"/> + <fsummary>Request notification about available outgoing distribution channel data.</fsummary> + <desc> + <p> + Request notification when more data is available to + fetch using + <seealso marker="erlang#dist_ctrl_get_data/1"><c>erlang:dist_ctrl_get_data(DHandle)</c></seealso> + for the distribution channel identified by + <c><anno>DHandle</anno></c>. When more data is present, + the caller will be sent the message <c>dist_data</c>. + Once a <c>dist_data</c> messages has been sent, no + more <c>dist_data</c> messages will be sent until + the <c>dist_ctrl_get_data_notification/1</c> function has been called + again. + </p> + <note><p> + Only the process registered as distribution + controller for the distribution channel identified by + <c><anno>DHandle</anno></c> is allowed to call this + function. + </p></note> + <p> + This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. <c><anno>DHandle</anno></c> is retrived + via the callback + <seealso marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seealso>. + More information can be found in the documentation of + <seealso marker="erts:alt_dist#distribution_module">ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module</seealso>. + </p> + </desc> + </func> + + <func> + <name name="dist_ctrl_input_handler" arity="2"/> + <fsummary>Register distribution channel input handler process.</fsummary> + <desc> + <p> + Register an alternate input handler process for the + distribution channel identified by <c><anno>DHandle</anno></c>. + Once this function has been called, <c><anno>InputHandler</anno></c> + is the only process allowed to call + <seealso marker="erlang#dist_ctrl_put_data/2"><c>erlang:dist_ctrl_put_data(DHandle, Data)</c></seealso> + with the <c><anno>DHandle</anno></c> identifing this distribution + channel. + </p> + <note><p> + Only the process registered as distribution + controller for the distribution channel identified by + <c><anno>DHandle</anno></c> is allowed to call this + function. + </p></note> + <p> + This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. <c><anno>DHandle</anno></c> is retrived + via the callback + <seealso marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seealso>. + More information can be found in the documentation of + <seealso marker="erts:alt_dist#distribution_module">ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module</seealso>. + </p> + </desc> + </func> + + <func> + <name name="dist_ctrl_put_data" arity="2"/> + <fsummary>Pass data into the VM from a distribution channel.</fsummary> + <desc> + <p> + Deliver distribution channel data from a remote node to the + local node. + </p> + <note><p> + Only the process registered as distribution + controller for the distribution channel identified by + <c><anno>DHandle</anno></c> is allowed to call this + function unless an alternate input handler process + has been registered using + <seealso marker="erlang#dist_ctrl_input_handler/2"><c>erlang:dist_ctrl_input_handler(DHandle, InputHandler)</c></seealso>. + If an alternate input handler has been registered, only + the registered input handler process is allowed to call + this function. + </p></note> + <p> + This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. <c><anno>DHandle</anno></c> is retrived + via the callback + <seealso marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seealso>. + More information can be found in the documentation of + <seealso marker="erts:alt_dist#distribution_module">ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module</seealso>. + </p> + </desc> + </func> + + <func> <name name="element" arity="2"/> <fsummary>Return the Nth element of a tuple.</fsummary> <type_desc variable="N">1..tuple_size(<anno>Tuple</anno>)</type_desc> diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index a44d23b181..fc55b687d4 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -217,6 +217,8 @@ atom discard atom display_items atom dist atom dist_cmd +atom dist_ctrl_put_data +atom dist_data atom Div='/' atom div atom dlink diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 7e26af5abd..cbbc2e88a1 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -437,7 +437,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip) ErtsMonitor *mon = NULL; /* The monitor entry to delete */ Eterm to = NIL; /* Monitor link traget */ DistEntry *dep = NULL; /* Target's distribution entry */ - int deref_de = 0; BIF_RETTYPE res = am_false; int unlock_link = 1; @@ -467,8 +466,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip) ASSERT(is_node_name_atom(to)); dep = erts_sysname_to_connected_dist_entry(to); ASSERT(dep != erts_this_dist_entry); - if (dep) - deref_de = 1; } else if (is_port(to)) { if (port_dist_entry(to) != erts_this_dist_entry) { goto badarg; @@ -486,11 +483,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip) unlock_link = 0; } else { /* Local monitor */ - if (deref_de) { - deref_de = 0; - erts_deref_dist_entry(dep); - } - dep = NULL; demonitor_local_process(c_p, ref, to, &res); } break; @@ -505,11 +497,6 @@ done: if (unlock_link) erts_proc_unlock(c_p, ERTS_PROC_LOCK_LINK); - if (deref_de) { - ASSERT(dep); - erts_deref_dist_entry(dep); - } - ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); BIF_RET(res); } @@ -844,7 +831,6 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) Eterm target = BIF_ARG_2; BIF_RETTYPE ret; DistEntry *dep = NULL; - int deref_de = 0; /* Only process monitors are implemented */ switch (BIF_ARG_1) { @@ -904,21 +890,14 @@ local_port: } dep = erts_sysname_to_connected_dist_entry(remote_node); if (dep == erts_this_dist_entry) { - deref_de = 1; ret = local_name_monitor(BIF_P, BIF_ARG_1, name); } else { - if (dep) - deref_de = 1; ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } } else { badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } - if (deref_de) { - deref_de = 0; - erts_deref_dist_entry(dep); - } return ret; } @@ -2000,6 +1979,7 @@ static Sint remote_send(Process *p, DistEntry *dep, ASSERT(is_atom(to) || is_external_pid(to)); + ctx->dep = dep; code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -2201,7 +2181,6 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) if (dep == erts_this_dist_entry) { Eterm id; - erts_deref_dist_entry(dep); if (IS_TRACED_FL(p, F_TRACE_SEND)) trace_send(p, to, msg); if (ERTS_PROC_GET_SAVED_CALLS_BUF(p)) @@ -2224,11 +2203,9 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } ret = remote_send(p, dep, tp[1], to, msg, ctx); - if (ret != SEND_YIELD_CONTINUE) { - if (dep) { - erts_deref_dist_entry(dep); - } - } else { + if (ret == SEND_YIELD_CONTINUE) { + if (dep) + erts_ref_dist_entry(dep); ctx->dep_to_deref = dep; } return ret; @@ -4164,7 +4141,6 @@ BIF_RETTYPE list_to_pid_1(BIF_ALIST_1) goto bad; if(dep == erts_this_dist_entry) { - erts_deref_dist_entry(dep); BIF_RET(make_internal_pid(make_pid_data(c, b))); } else { @@ -4184,13 +4160,10 @@ BIF_RETTYPE list_to_pid_1(BIF_ALIST_1) etp->data.ui[0] = make_pid_data(c, b); MSO(BIF_P).first = (struct erl_off_heap_header*) etp; - erts_deref_dist_entry(dep); BIF_RET(make_external_pid(etp)); } bad: - if (dep) - erts_deref_dist_entry(dep); if (buf) erts_free(ERTS_ALC_T_TMP, (void *) buf); BIF_ERROR(BIF_P, BADARG); @@ -4235,7 +4208,6 @@ BIF_RETTYPE list_to_port_1(BIF_ALIST_1) goto bad; if(dep == erts_this_dist_entry) { - erts_deref_dist_entry(dep); BIF_RET(make_internal_port(p)); } else { @@ -4255,13 +4227,10 @@ BIF_RETTYPE list_to_port_1(BIF_ALIST_1) etp->data.ui[0] = p; MSO(BIF_P).first = (struct erl_off_heap_header*) etp; - erts_deref_dist_entry(dep); BIF_RET(make_external_port(etp)); } bad: - if (dep) - erts_deref_dist_entry(dep); BIF_ERROR(BIF_P, BADARG); } @@ -4381,12 +4350,9 @@ BIF_RETTYPE list_to_ref_1(BIF_ALIST_1) res = make_external_ref(etp); } - erts_deref_dist_entry(dep); BIF_RET(res); bad: - if (dep) - erts_deref_dist_entry(dep); BIF_ERROR(BIF_P, BADARG); } diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 962b00ae7b..2f7927d685 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -154,6 +154,12 @@ bif erlang:spawn_opt/1 bif erlang:setnode/2 bif erlang:setnode/3 bif erlang:dist_exit/3 +bif erlang:dist_get_stat/1 +bif erlang:dist_ctrl_input_handler/2 +bif erlang:dist_ctrl_put_data/2 +bif erlang:dist_ctrl_get_data/1 +bif erlang:dist_ctrl_get_data_notification/1 + # Static native functions in erts_internal bif erts_internal:port_info/1 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 491c4d378e..10ec275922 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -121,7 +121,7 @@ Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ - +static Export *dist_ctrl_put_data_trap; /* forward declarations */ @@ -156,9 +156,7 @@ create_cache(DistEntry *dep) int i; ErtsAtomCache *cp; - ERTS_LC_ASSERT( - is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + ERTS_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, @@ -176,11 +174,13 @@ Uint erts_dist_cache_size(void) } static ErtsProcList * -get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) +get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { + erts_aint32_t qflgs; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); - dep->qflgs &= ~unset_qflgs; - if (dep->qflgs & ERTS_DE_QFLG_EXIT) { + qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); + qflgs &= ~unset_qflgs; + if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ return NULL; } @@ -446,7 +446,35 @@ inc_no_nodes(void) #endif erts_atomic_inc_mb(&no_nodes); } - + +static void +kill_dist_ctrl_proc(void *vpid) +{ + Eterm pid = (Eterm) vpid; + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); + if (rp) { + erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, + am_kill, NIL, NULL, 0); + if (rp_locks) + erts_proc_unlock(rp, rp_locks); + } +} + +static void +schedule_kill_dist_ctrl_proc(Eterm pid) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int sched_id = 1; + if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) + sched_id = 1; + else + sched_id = (int) esdp->no; + erts_schedule_misc_aux_work(sched_id, + kill_dist_ctrl_proc, + (void *) (UWord) pid); +} + /* * proc is currently running or exiting process. */ @@ -456,58 +484,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_port = 0; + int no_dist_ctrl = 0; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); erts_rwmtx_rlock(&erts_dist_table_rwmtx); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; /* KILL all port controllers */ - if (no_dist_port == 0) + if (no_dist_ctrl == 0) erts_rwmtx_runlock(&erts_dist_table_rwmtx); else { Eterm def_buf[128]; int i = 0; - Eterm *dist_port; + Eterm *dist_ctrl; - if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_port = &def_buf[0]; + if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) + dist_ctrl = &def_buf[0]; else - dist_port = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_port); + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_port; i++) { - Port *prt = erts_port_lookup(dist_port[i], - ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt) - continue; - ASSERT(erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLG_DISTRIBUTION); - - erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, - prt, dist_port[i], nd_reason, NULL); + for (i = 0; i < no_dist_ctrl; i++) { + if (is_internal_pid(dist_ctrl[i])) + schedule_kill_dist_ctrl_proc(dist_ctrl[i]); + else { + Port *prt = erts_port_lookup(dist_ctrl[i], + ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { + ASSERT(erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); + + erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, + prt, dist_ctrl[i], nd_reason, NULL); + } + } } - if (dist_port != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_port); + if (dist_ctrl != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, dist_ctrl); } /* - * When last dist port exits, node will be taken + * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); @@ -524,7 +556,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) &nodedown.bp->off_heap); } } - else { /* Call from distribution port */ + else { /* Call from distribution controller (port/process) */ NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -534,24 +566,23 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); erts_de_rwlock(dep); - ERTS_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + if (is_internal_port(dep->cid)) { + ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); - if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(&dep->dist_cmd); + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(&dep->dist_cmd); + } if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { dep->status |= ERTS_DE_SFLG_EXITING; erts_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); } @@ -616,6 +647,9 @@ void init_dist(void) dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); + dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, + am_dist_ctrl_put_data, + 2); } #define ErtsDistOutputBuf2Binary(OB) \ @@ -658,6 +692,8 @@ static void clear_dist_entry(DistEntry *dep) ErtsDistOutputBuf *obuf; erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); cache = dep->cache; dep->cache = NULL; @@ -671,6 +707,9 @@ static void clear_dist_entry(DistEntry *dep) erts_mtx_lock(&dep->qlock); + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; else { @@ -678,8 +717,15 @@ static void clear_dist_entry(DistEntry *dep) obuf = dep->out_queue.first; } + if (dep->tmp_out_queue.first) { + dep->tmp_out_queue.last->next = obuf; + obuf = dep->tmp_out_queue.first; + } + dep->out_queue.first = NULL; dep->out_queue.last = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; dep->status = 0; @@ -704,8 +750,9 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + erts_atomic_add_nob(&dep->qsize, + (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } @@ -904,11 +951,30 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) } #endif - if (token != NIL) - ctl = TUPLE4(&ctx->ctl_heap[0], - make_small(DOP_SEND_TT), am_Empty, remote, token); - else - ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote); + if (token != NIL) { + Eterm el1, el2; + if (ctx->dep->flags & DFLAG_SEND_SENDER) { + el1 = make_small(DOP_SEND_SENDER_TT); + el2 = sender->common.id; + } + else { + el1 = make_small(DOP_SEND_TT); + el2 = am_Empty; + } + ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token); + } + else { + Eterm el1, el2; + if (ctx->dep->flags & DFLAG_SEND_SENDER) { + el1 = make_small(DOP_SEND_SENDER); + el2 = sender->common.id; + } + else { + el1 = make_small(DOP_SEND); + el2 = am_Empty; + } + ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote); + } DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -1145,6 +1211,7 @@ int erts_net_message(Port *prt, ErtsLink *lnk; Uint tuple_arity; int res; + Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif @@ -1153,14 +1220,17 @@ int erts_net_message(Port *prt, ERTS_CHK_NO_PROC_LOCKS; - ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } + + if (hlen != 0) goto data_error; + if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; @@ -1184,25 +1254,31 @@ int erts_net_message(Port *prt, goto data_error; } - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); + res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); - if (res >= 0) - res = ctl_len = erts_decode_dist_ext_size(&ede); - else { + switch (res) { + case ERTS_PREP_DIST_EXT_CLOSED: + return 0; /* Connection not alive; ignore signal... */ + case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif - ctl_len = 0; - } - - if (res < 0) { + goto data_error; + case ERTS_PREP_DIST_EXT_SUCCESS: + ctl_len = erts_decode_dist_ext_size(&ede); + if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); - bw(buf, orig_len); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); #endif - PURIFY_MSG("data error"); - goto data_error; + PURIFY_MSG("data error"); + goto data_error; + } + break; + default: + ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); + break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { @@ -1233,6 +1309,7 @@ int erts_net_message(Port *prt, } token_size = 0; + token = NIL; switch (type = unsigned_val(tuple[1])) { case DOP_LINK: @@ -1462,38 +1539,52 @@ int erts_net_message(Port *prt, } break; + case DOP_SEND_SENDER_TT: { + Uint xsize; case DOP_SEND_TT: + if (tuple_arity != 4) { goto invalid_message; } - - token_size = size_object(tuple[4]); - /* Fall through ... */ + + token = tuple[4]; + token_size = size_object(token); + xsize = ERTS_HEAP_FRAG_SIZE(token_size); + goto send_common; + + case DOP_SEND_SENDER: case DOP_SEND: + + token = NIL; + xsize = 0; + if (tuple_arity != 3) + goto invalid_message; + + send_common: + /* - * There is intentionally no testing of the cookie (it is always '') - * from R9B and onwards. + * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains + * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise, + * the atom '' (empty cookie). */ + ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT) + ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER)) + : tuple[2] == am_Empty); + #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(&ede, "MSG", buf, orig_len); #endif - if (type != DOP_SEND_TT && tuple_arity != 3) { - goto invalid_message; - } to = tuple[3]; if (is_not_pid(to)) { goto invalid_message; } rp = erts_proc_lookup(to); if (rp) { - Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size); ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; ede_copy = erts_make_dist_ext_copy(&ede, xsize); - if (type == DOP_SEND) { - token = NIL; - } else { + if (is_not_nil(token)) { ErlHeapFragment *heap_frag; ErlOffHeap *ohp; ASSERT(xsize); @@ -1501,15 +1592,15 @@ int erts_net_message(Port *prt, ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; - token = tuple[4]; token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]); + erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); if (locks) erts_proc_unlock(rp, locks); } break; + } case DOP_MONITOR_P_EXIT: { /* We are monitoring a process on the remote node which dies, we get @@ -1723,7 +1814,7 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); + erts_kill_dist_connection(dep, connection_id); ERTS_CHK_NO_PROC_LOCKS; return -1; } @@ -1744,6 +1835,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) return ret; } +static ERTS_INLINE void +notify_dist_data(Process *c_p, Eterm pid) +{ + Process *rp; + ErtsProcLocks rp_locks; + + ASSERT(erts_get_scheduler_data() + && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); + ASSERT(is_internal_pid(pid)); + + if (c_p && c_p->common.id == pid) { + rp = c_p; + rp_locks = ERTS_PROC_LOCK_MAIN; + } + else { + rp = erts_proc_lookup(pid); + rp_locks = 0; + } + + if (rp) { + ErtsMessage *mp = erts_alloc_message(0, NULL); + erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); + } +} + int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { @@ -1859,12 +1975,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) free_dist_obuf(ctx->obuf); } else { + Sint qsize; + erts_aint32_t qflgs; ErtsProcList *plp = NULL; + Eterm notify_proc = NIL; + Sint obsz = size_obuf(ctx->obuf); + erts_mtx_lock(&dep->qlock); - dep->qsize += size_obuf(ctx->obuf); - if (dep->qsize >= erts_dist_buf_busy_limit) - dep->qflgs |= ERTS_DE_QFLG_BUSY; - if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); + ASSERT(qsize >= obsz); + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); + qflgs |= ERTS_DE_QFLG_BUSY; + } + if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { + /* Previously empty queue and info requested... */ + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + /* else: requester will send itself the message... */ + qflgs &= ~ERTS_DE_QFLG_REQ_INFO; + } + if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); @@ -1881,7 +2017,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) dep->out_queue.last = ctx->obuf; if (!ctx->force_busy) { - if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) resume = 1; /* was busy when we started, but isn't now */ #ifdef USE_VM_PROBES @@ -1906,8 +2043,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); erts_de_runlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); @@ -1961,16 +2101,20 @@ static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; + char *bufp; ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + if (!obuf) { + size = 0; + bufp = NULL; + } + else { + size = obuf->ext_endp - obuf->extp; + bufp = (char*) obuf->extp; + } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { @@ -1985,11 +2129,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) remote_str, size); } #endif + prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); - (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, - (char*) obuf->extp, - (int) size); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } @@ -1998,7 +2141,7 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; @@ -2006,25 +2149,33 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); - iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (!obuf) { + size = 0; + eiov.vsize = 1; + } + else { + size = obuf->ext_endp - obuf->extp; + eiov.vsize = 2; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + } - eiov.vsize = 2; eiov.size = size; eiov.iov = iov; eiov.binv = bv; + if (size > (Uint) INT_MAX) + erts_exit(ERTS_DUMP_EXIT, + "Absurdly large distribution output data buffer " + "(%beu bytes) passed.\n", + size); + ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES @@ -2072,7 +2223,7 @@ erts_dist_command(Port *prt, int reds_limit) Sint reds = ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; - Sint obufsize = 0; + Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); @@ -2081,9 +2232,6 @@ erts_dist_command(Port *prt, int reds_limit) ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be - removed if port command fails */ - erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0); erts_de_rlock(dep); @@ -2094,7 +2242,6 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - erts_deref_dist_entry(dep); return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; } @@ -2128,20 +2275,20 @@ erts_dist_command(Port *prt, int reds_limit) if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { - Uint size; - ErtsDistOutputBuf *fob; - - size = (*send)(prt, foq.first); - esdp->io.out += (Uint64) size; + Uint size; + ErtsDistOutputBuf *fob; + size = (*send)(prt, foq.first); + erts_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) @@ -2202,31 +2349,33 @@ erts_dist_command(Port *prt, int reds_limit) } } else { + int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); - esdp->io.out += (Uint64) size; + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); + erts_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) @@ -2255,12 +2404,13 @@ erts_dist_command(Port *prt, int reds_limit) * processes. */ erts_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) - && (dep->qflgs & ERTS_DE_QFLG_BUSY) - && dep->qsize < erts_dist_buf_busy_limit) { + && de_busy && qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); @@ -2280,8 +2430,13 @@ erts_dist_command(Port *prt, int reds_limit) if (obufsize != 0) { ASSERT(obufsize > 0); erts_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; +#ifdef DEBUG + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); +#else + erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); +#endif erts_mtx_unlock(&dep->qlock); } @@ -2299,8 +2454,6 @@ erts_dist_command(Port *prt, int reds_limit) if (reds > INT_MAX/2) reds = INT_MAX/2; - erts_deref_dist_entry(dep); - return reds; preempted: @@ -2337,7 +2490,7 @@ erts_dist_command(Port *prt, int reds_limit) #ifdef DEBUG erts_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif } @@ -2348,7 +2501,7 @@ erts_dist_command(Port *prt, int reds_limit) * in out_queue. */ erts_mtx_lock(&dep->qlock); - dep->qsize -= obufsize; + erts_atomic_add_nob(&dep->qsize, -obufsize); obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; @@ -2362,6 +2515,370 @@ erts_dist_command(Port *prt, int reds_limit) goto done; } +#if 0 + +int +dist_data_finalize(Process *c_p, int reds_limit) +{ + int reds = 5; + DistEntry *dep = ; + ErtsDistOutputQueue oq, foq; + ErtsDistOutputBuf *ob; + int preempt; + + + erts_mtx_lock(&dep->qlock); + flags = dep->flags; + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + + if (!oq.first) { + ASSERT(!oq.last); + oq.first = dep->tmp_out_queue.first; + oq.last = dep->tmp_out_queue.last; + } + else { + ErtsDistOutputBuf *f, *l; + ASSERT(oq.last); + if (dep->tmp_out_queue.last) { + dep->tmp_out_queue.last->next = oq.first; + oq.first = dep->tmp_out_queue.first; + } + } + + if (!oq.first) { + /* Nothing to do... */ + ASSERT(!oq.last); + return reds; + } + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + + preempt = 0; + ob = oq.first; + ASSERT(ob); + + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache, + flags); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + dep->tmp_out_queue.first = oq.first; + dep->tmp_out_queue.last = oq.last; + + return reds; +} + +#endif + +BIF_RETTYPE +dist_ctrl_get_data_notification_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + erts_aint32_t qflgs; + erts_aint_t qsize; + Eterm receiver = NIL; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + /* + * Caller is the only one that can consume from this queue + * and the only one that can set the req-info flag... + */ + + erts_de_rlock(dep); + + ASSERT(dep->cid == BIF_P->common.id); + + qflgs = erts_atomic32_read_acqb(&dep->qflgs); + + if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) + receiver = BIF_P->common.id; /* Notify ourselves... */ + else { /* Empty queue; set req-info flag... */ + qflgs = erts_atomic32_read_bor_mb(&dep->qflgs, + ERTS_DE_QFLG_REQ_INFO); + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) + receiver = BIF_P->common.id; /* Notify ourselves... */ + /* else: someone else will notify us... */ + } + /* else: still empty queue... */ + } + } + /* else: Already requested... */ + + erts_de_runlock(dep); + + if (is_internal_pid(receiver)) + notify_dist_data(BIF_P, receiver); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_put_data_2(BIF_ALIST_2) +{ + DistEntry *dep; + ErlDrvSizeT size; + Eterm input_handler; + + if (is_binary(BIF_ARG_2)) + size = binary_size(BIF_ARG_2); + else if (is_nil(BIF_ARG_2)) + size = 0; + else if (is_list(BIF_ARG_2)) + BIF_TRAP2(dist_ctrl_put_data_trap, + BIF_P, BIF_ARG_1, BIF_ARG_2); + else + BIF_ERROR(BIF_P, BADARG); + + dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler); + + if (input_handler != BIF_P->common.id) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + erts_atomic64_inc_nob(&dep->in); + + if (size != 0) { + byte *data, *temp_alloc = NULL; + + data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); + if (!data) + BIF_ERROR(BIF_P, BADARG); + + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + + (void) erts_net_message(NULL, dep, NULL, 0, data, size); + /* + * We ignore any decode failures. On fatal failures the + * connection will be taken down by killing the + * distribution channel controller... + */ + + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BUMP_REDS(BIF_P, 5); + + erts_free_aligned_binary_bytes(temp_alloc); + + } + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_get_stat_1(BIF_ALIST_1) +{ + Sint64 read, write, pend; + Eterm res, *hp, **hpp; + Uint sz, *szp; + DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + read = (Sint64) erts_atomic64_read_nob(&dep->in); + write = (Sint64) erts_atomic64_read_nob(&dep->out); + pend = (Sint64) erts_atomic_read_nob(&dep->qsize); + + erts_de_runlock(dep); + + sz = 0; + szp = &sz; + hpp = NULL; + + while (1) { + res = erts_bld_tuple(hpp, szp, 4, + am_ok, + erts_bld_sint64(hpp, szp, read), + erts_bld_sint64(hpp, szp, write), + pend ? am_true : am_false); + if (hpp) + break; + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } + + BIF_RET(res); +} + +BIF_RETTYPE +dist_ctrl_input_handler_2(BIF_ALIST_2) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + if (is_not_internal_pid(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) BIF_ARG_2); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_get_data_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + int reds = 1; + ErtsDistOutputBuf *obuf; + Eterm *hp; + ProcBin *pb; + erts_aint_t qsize; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + if (dep->status & ERTS_DE_SFLG_EXITING) + goto return_none; + + ASSERT(dep->cid == BIF_P->common.id); + +#if 0 + if (dep->finalized_out_queue.first) { + obuf = dep->finalized_out_queue.first; + dep->finalized_out_queue.first = obuf->next; + if (!obuf->next) + dep->finalized_out_queue.last = NULL; + } + else +#endif + { + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + qsize = erts_atomic_read_acqb(&dep->qsize); + if (qsize > 0) { + erts_mtx_lock(&dep->qlock); + dep->tmp_out_queue.first = dep->out_queue.first; + dep->tmp_out_queue.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + } + } + + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + return_none: + erts_de_runlock(dep); + BIF_RET(am_none); + } + else { + obuf = dep->tmp_out_queue.first; + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; + } + + obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, + dep->cache, + dep->flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ + ASSERT(&obuf->data[0] <= obuf->extp + && obuf->extp < obuf->ext_endp); + } + + erts_atomic64_inc_nob(&dep->out); + + erts_de_runlock(dep); + + hp = HAlloc(BIF_P, PROC_BIN_SIZE); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + + qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); + ASSERT(qsize >= 0); + + if (qsize < erts_dist_buf_busy_limit/2 + && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + ErtsProcList *resume_procs = NULL; + erts_mtx_lock(&dep->qlock); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_mtx_unlock(&dep->qlock); + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + } + + BIF_RET2(make_binary(pb), reds); +} + void erts_dist_port_not_busy(Port *prt) { @@ -2385,18 +2902,20 @@ void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); - if (is_internal_port(dep->cid) - && connection_id == dep->connection_id + if (connection_id == dep->connection_id && !(dep->status & ERTS_DE_SFLG_EXITING)) { dep->status |= ERTS_DE_SFLG_EXITING; erts_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); } erts_de_rwunlock(dep); } @@ -2513,9 +3032,6 @@ info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connecte } erts_print(to, arg, "Name: %T", dep->sysname); -#ifdef DEBUG - erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0)); -#endif erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { if (dep->nlinks) { @@ -2635,17 +3151,23 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; } - net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, - am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); - if (!net_kernel) + net_kernel = erts_whereis_process(BIF_P, + ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + 0); + if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, - * do_net_exist will be called when net_kernel is terminated !! */ + * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; - if (net_kernel != BIF_P) - erts_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + erts_proc_unlock(net_kernel, + (ERTS_PROC_LOCK_STATUS + | ((net_kernel != BIF_P) + ? ERTS_PROC_LOCK_MAIN + : 0))); #ifdef DEBUG erts_rwmtx_rlock(&erts_dist_table_rwmtx); @@ -2662,6 +3184,14 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) erts_thr_progress_unblock(); erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_ref_dist_entry(erts_this_dist_entry); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); + BIF_RET(am_true); error: @@ -2691,18 +3221,18 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; + ErtsProcLocks proc_unlock = 0; + Process *proc; Port *pp = NULL; - /* Prepare for success */ - ERTS_BIF_PREP_RET(ret, am_true); - /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || - is_not_internal_port(BIF_ARG_2) || - (erts_this_node->sysname == am_Noname)) { + !(is_internal_port(BIF_ARG_2) + || is_internal_pid(BIF_ARG_2)) + || (erts_this_node->sysname == am_Noname)) { goto badarg; } @@ -2746,77 +3276,120 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port_sflgs(BIF_ARG_2, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_INVALID_LOOKUP); - erts_de_rwlock(dep); + if (is_internal_pid(BIF_ARG_2)) { + if (BIF_P->common.id == BIF_ARG_2) { + proc_unlock = 0; + proc = BIF_P; + } + else { + proc_unlock = ERTS_PROC_LOCK_MAIN; + proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, + BIF_ARG_2, proc_unlock); + } + erts_de_rwlock(dep); - if (!pp || (erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_EXITING)) - goto badarg; + if (!proc) + goto badarg; + else if (proc == ERTS_PROC_LOCK_BUSY) { + proc_unlock = 0; + goto yield; + } - if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) - goto badarg; + erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS); + proc_unlock |= ERTS_PROC_LOCK_STATUS; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - goto done; /* Already set */ + if (ERTS_PROC_GET_DIST_ENTRY(proc)) { + if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) + && (proc->flags & F_DISTRIBUTION) + && dep->cid == BIF_ARG_2) + goto done; + goto badarg; + } - if (dep->status & ERTS_DE_SFLG_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_mtx_unlock(&dep->qlock); - goto yield; - } + if (is_not_nil(dep->cid)) + goto badarg; - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + proc->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(proc, dep); - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + proc_unlock &= ~ERTS_PROC_LOCK_STATUS; + erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + dep->send = NULL; /* Only for distr ports... */ - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } + else { - pp->dist_entry = dep; + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); + erts_de_rwlock(dep); + + if (!pp || (erts_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) + goto badarg; + + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; + + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) + goto done; /* Already set */ + + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } - dep->version = version; - dep->creation = 0; + ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + if (pp->dist_entry || is_not_nil(dep->cid)) + goto badarg; -#if 1 - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); -#else - dep->send = dist_port_command; -#endif - ASSERT(dep->send); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + + pp->dist_entry = dep; + + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); + + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } + + } + + dep->version = version; + dep->creation = 0; #ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == 0); - erts_mtx_unlock(&dep->qlock); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); #endif - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); + erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + erts_de_rwunlock(dep); + + ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ inc_no_nodes(); @@ -2836,6 +3409,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (pp) erts_port_release(pp); + if (proc_unlock) + erts_proc_unlock(proc, proc_unlock); + return ret; yield: @@ -3138,7 +3714,6 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: - erts_deref_dist_entry(dep); BIF_RET(am_true); } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 05016cafc5..d4765c50b8 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -44,6 +44,7 @@ #define DFLAG_UTF8_ATOMS 0x10000 #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 +#define DFLAG_SEND_SENDER 0x80000 /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -74,6 +75,9 @@ #define DOP_DEMONITOR_P 20 #define DOP_MONITOR_P_EXIT 21 +#define DOP_SEND_SENDER 22 +#define DOP_SEND_SENDER_TT 23 + /* distribution trap functions */ extern Export* dsend2_trap; extern Export* dsend3_trap; @@ -161,13 +165,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp, goto fail; } if (no_suspend) { - failure = ERTS_DSIG_PREP_CONNECTED; - erts_mtx_lock(&dep->qlock); - if (dep->qflgs & ERTS_DE_QFLG_BUSY) + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { failure = ERTS_DSIG_PREP_WOULD_SUSPEND; - erts_mtx_unlock(&dep->qlock); - if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND) goto fail; + } } dsdp->proc = proc; dsdp->dep = dep; @@ -349,6 +350,7 @@ typedef struct { Eterm ctl_heap[6]; ErtsDSigData dsd; DistEntry* dep_to_deref; + DistEntry *dep; struct erts_dsig_send_context dss; Eterm return_term; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 2ff95a3338..d0942ccef8 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3750,7 +3750,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) subres = make_link_list(BIF_P, dep->nlinks, NIL); subres = make_link_list(BIF_P, dep->node_links, subres); erts_de_links_unlock(dep); - erts_deref_dist_entry(dep); BIF_RET(subres); } else { BIF_RET(am_undefined); @@ -3781,7 +3780,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) erts_de_links_lock(dep); ml = make_monitor_list(BIF_P, dep->monitors); erts_de_links_unlock(dep); - erts_deref_dist_entry(dep); BIF_RET(ml); } else { BIF_RET(am_undefined); @@ -3796,7 +3794,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) else { Uint cno = dist_entry_channel_no(dep); res = make_small(cno); - erts_deref_dist_entry(dep); } BIF_RET(res); } @@ -3858,15 +3855,14 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) DFLAG_BIT_BINARIES); BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags)); } - else if (ERTS_IS_ATOM_STR("dist_port", tp[1])) { + else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) { Eterm res = am_undefined; DistEntry *dep = erts_sysname_to_connected_dist_entry(tp[2]); if (dep) { erts_de_rlock(dep); - if (is_internal_port(dep->cid)) + if (is_internal_port(dep->cid) || is_internal_pid(dep->cid)) res = dep->cid; erts_de_runlock(dep); - erts_deref_dist_entry(dep); } BIF_RET(res); } @@ -4275,7 +4271,6 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) con_id = dep->connection_id; erts_de_runlock(dep); erts_kill_dist_connection(dep, con_id); - erts_deref_dist_entry(dep); BIF_RET(am_true); } } diff --git a/erts/emulator/beam/erl_monitors.c b/erts/emulator/beam/erl_monitors.c index 67c552b364..1c840d89f6 100644 --- a/erts/emulator/beam/erl_monitors.c +++ b/erts/emulator/beam/erl_monitors.c @@ -993,7 +993,6 @@ Eterm erts_debug_dump_monitors_1(BIF_ALIST_1) erts_dump_monitors(dep->monitors,0); erts_de_links_unlock(dep); erts_printf("Monitors dumped-------------------------\n"); - erts_deref_dist_entry(dep); BIF_RET(am_true); } else { BIF_ERROR(p,BADARG); @@ -1038,7 +1037,6 @@ Eterm erts_debug_dump_links_1(BIF_ALIST_1) erts_dump_links(dep->nlinks,0); erts_de_links_unlock(dep); erts_printf("Links dumped----------------------------\n"); - erts_deref_dist_entry(dep); BIF_RET(am_true); } else { BIF_ERROR(p,BADARG); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index f8e9fec27a..0f3dfa797c 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -29,6 +29,8 @@ #include "error.h" #include "erl_thr_progress.h" #include "dtrace-wrapper.h" +#include "erl_binary.h" +#include "erl_bif_unique.h" Hash erts_dist_table; Hash erts_node_table; @@ -57,6 +59,58 @@ static ErtsMonotonicTime node_tab_delete_delay; /* -- The distribution table ---------------------------------------------- */ +#define ErtsBin2DistEntry(B) \ + ((DistEntry *) ERTS_MAGIC_BIN_DATA((B))) +#define ErtsDistEntry2Bin(DEP) \ + ((Binary *) ERTS_MAGIC_BIN_FROM_DATA((DEP))) + +static ERTS_INLINE erts_aint_t +de_refc_read(DistEntry *dep, erts_aint_t min) +{ + return erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min); +} + +static ERTS_INLINE erts_aint_t +de_refc_inc_read(DistEntry *dep, erts_aint_t min) +{ + return erts_refc_inctest(&ErtsDistEntry2Bin(dep)->intern.refc, min); +} + +static ERTS_INLINE void +de_refc_inc(DistEntry *dep, erts_aint_t min) +{ + erts_refc_inc(&ErtsDistEntry2Bin(dep)->intern.refc, min); +} + +static ERTS_INLINE void +de_refc_dec(DistEntry *dep, erts_aint_t min) +{ +#ifdef DEBUG + (void) erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min+1); +#endif + erts_bin_release(ErtsDistEntry2Bin(dep)); +} + +static ERTS_INLINE erts_aint_t +de_refc_dec_read(DistEntry *dep, erts_aint_t min) +{ + return erts_refc_dectest(&ErtsDistEntry2Bin(dep)->intern.refc, min); +} + +void +erts_ref_dist_entry(DistEntry *dep) +{ + ASSERT(dep); + de_refc_inc(dep, 1); +} + +void +erts_deref_dist_entry(DistEntry *dep) +{ + ASSERT(dep); + de_refc_dec(dep, 0); +} + #ifdef DEBUG static int is_in_de_list(DistEntry *dep, DistEntry *dep_list) @@ -85,22 +139,39 @@ dist_table_cmp(void *dep1, void *dep2) static void* dist_table_alloc(void *dep_tmpl) { +#ifdef DEBUG + erts_aint_t refc; +#endif Eterm sysname; + Binary *bin; DistEntry *dep; erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; sysname = ((DistEntry *) dep_tmpl)->sysname; - dep = (DistEntry *) erts_alloc(ERTS_ALC_T_DIST_ENTRY, sizeof(DistEntry)); + + bin = erts_create_magic_binary_x(sizeof(DistEntry), + erts_dist_entry_destructor, + ERTS_ALC_T_DIST_ENTRY, + 0); + dep = ErtsBin2DistEntry(bin); dist_entries++; +#ifdef DEBUG + refc = +#else + (void) +#endif + de_refc_dec_read(dep, -1); + ASSERT(refc == -1); + dep->prev = NULL; - erts_refc_init(&dep->refc, -1); erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname, ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); dep->sysname = sysname; dep->cid = NIL; + erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL); dep->connection_id = 0; dep->status = 0; dep->flags = 0; @@ -114,12 +185,16 @@ dist_table_alloc(void *dep_tmpl) erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname, ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); - dep->qflgs = 0; - dep->qsize = 0; + erts_atomic32_init_nob(&dep->qflgs, 0); + erts_atomic_init_nob(&dep->qsize, 0); + erts_atomic64_init_nob(&dep->in, 0); + erts_atomic64_init_nob(&dep->out, 0); dep->out_queue.first = NULL; dep->out_queue.last = NULL; dep->suspended = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; @@ -181,7 +256,7 @@ dist_table_free(void *vdep) #ifdef DEBUG sys_memset(vdep, 0x77, sizeof(DistEntry)); #endif - erts_free(ERTS_ALC_T_DIST_ENTRY, (void *) dep); + erts_bin_free(ErtsDistEntry2Bin(dep)); ASSERT(dist_entries > 0); dist_entries--; @@ -199,19 +274,52 @@ erts_dist_table_info(fmtfn_t to, void *to_arg) erts_rwmtx_runlock(&erts_dist_table_rwmtx); } +static ERTS_INLINE DistEntry *find_dist_entry(Eterm sysname, + int inc_refc, + int connected_only) +{ + DistEntry *res; + DistEntry de; + de.sysname = sysname; + erts_rwmtx_rlock(&erts_dist_table_rwmtx); + res = hash_get(&erts_dist_table, (void *) &de); + if (res) { + if (connected_only && is_nil(res->cid)) + res = NULL; + else { + int pend_delete; + erts_aint_t refc; + if (inc_refc) { + refc = de_refc_inc_read(res, 1); + pend_delete = refc < 2; + } + else { + refc = de_refc_read(res, 0); + pend_delete = refc < 1; + } + if (pend_delete) /* Pending delete */ + de_refc_inc(res, 1); + } + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); + return res; +} + DistEntry * erts_channel_no_to_dist_entry(Uint cno) { + /* + * Does NOT increase reference count! + */ + /* * For this node (and previous incarnations of this node), * ERST_INTERNAL_CHANNEL_NO (will always be 0 I guess) is used as * channel no. For other nodes, the atom index of the atom corresponding * to the node name is used as channel no. */ - if(cno == ERST_INTERNAL_CHANNEL_NO) { - erts_refc_inc(&erts_this_dist_entry->refc, 2); + if (cno == ERST_INTERNAL_CHANNEL_NO) return erts_this_dist_entry; - } if((cno > MAX_ATOM_INDEX) || (cno >= atom_table_size()) @@ -220,80 +328,97 @@ erts_channel_no_to_dist_entry(Uint cno) /* cno is a valid atom index; find corresponding dist entry (if there is one) */ - return erts_find_dist_entry(make_atom(cno)); + return find_dist_entry(make_atom(cno), 0, 0); } - DistEntry * erts_sysname_to_connected_dist_entry(Eterm sysname) { - DistEntry de; - DistEntry *res_dep; - de.sysname = sysname; - - if(erts_this_dist_entry->sysname == sysname) { - erts_refc_inc(&erts_this_dist_entry->refc, 2); + /* + * Does NOT increase reference count! + */ + if(erts_this_dist_entry->sysname == sysname) return erts_this_dist_entry; - } - - erts_rwmtx_rlock(&erts_dist_table_rwmtx); - res_dep = (DistEntry *) hash_get(&erts_dist_table, (void *) &de); - if (res_dep) { - erts_aint_t refc = erts_refc_inctest(&res_dep->refc, 1); - if (refc < 2) /* Pending delete */ - erts_refc_inc(&res_dep->refc, 1); - } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - if (res_dep) { - int deref; - erts_rwmtx_rlock(&res_dep->rwmtx); - deref = is_nil(res_dep->cid); - erts_rwmtx_runlock(&res_dep->rwmtx); - if (deref) { - erts_deref_dist_entry(res_dep); - res_dep = NULL; - } - } - return res_dep; + return find_dist_entry(sysname, 0, 1); } DistEntry *erts_find_or_insert_dist_entry(Eterm sysname) { + /* + * This function DOES increase reference count! + */ DistEntry *res; DistEntry de; erts_aint_t refc; - res = erts_find_dist_entry(sysname); + res = find_dist_entry(sysname, 1, 0); if (res) return res; de.sysname = sysname; erts_rwmtx_rwlock(&erts_dist_table_rwmtx); res = hash_put(&erts_dist_table, (void *) &de); - refc = erts_refc_inctest(&res->refc, 0); + refc = de_refc_inc_read(res, 0); if (refc < 2) /* New or pending delete */ - erts_refc_inc(&res->refc, 1); + de_refc_inc(res, 1); erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); return res; } DistEntry *erts_find_dist_entry(Eterm sysname) { - DistEntry *res; - DistEntry de; - de.sysname = sysname; - erts_rwmtx_rlock(&erts_dist_table_rwmtx); - res = hash_get(&erts_dist_table, (void *) &de); - if (res) { - erts_aint_t refc = erts_refc_inctest(&res->refc, 1); - if (refc < 2) /* Pending delete */ - erts_refc_inc(&res->refc, 1); - } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - return res; + /* + * Does NOT increase reference count! + */ + return find_dist_entry(sysname, 0, 0); } -static void try_delete_dist_entry(void *vdep) +DistEntry * +erts_dhandle_to_dist_entry(Eterm dhandle) { - DistEntry *dep = (DistEntry *) vdep; + Binary *bin; + if (!is_internal_magic_ref(dhandle)) + return NULL; + bin = erts_magic_ref2bin(dhandle); + if (ERTS_MAGIC_BIN_DESTRUCTOR(bin) != erts_dist_entry_destructor) + return NULL; + return ErtsBin2DistEntry(bin); +} + +Eterm +erts_make_dhandle(Process *c_p, DistEntry *dep) +{ + Binary *bin; + Eterm *hp; + + bin = ErtsDistEntry2Bin(dep); + ASSERT(bin); + ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor); + hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); + return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); +} + +static void try_delete_dist_entry(void *vbin); + +static void +prepare_try_delete_dist_entry(void *vbin) +{ + Binary *bin = (Binary *) vbin; + DistEntry *dep = ErtsBin2DistEntry(bin); + Uint size; + erts_aint_t refc; + + refc = de_refc_read(dep, 0); + if (refc > 0) + return; + + size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry)); + erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry, + vbin, &dep->later_op, size); +} + +static void try_delete_dist_entry(void *vbin) +{ + Binary *bin = (Binary *) vbin; + DistEntry *dep = ErtsBin2DistEntry(bin); erts_aint_t refc; erts_rwmtx_rwlock(&erts_dist_table_rwmtx); @@ -312,26 +437,39 @@ static void try_delete_dist_entry(void *vdep) * * If refc > 0, the entry is in use. Keep the entry. */ - refc = erts_refc_dectest(&dep->refc, -1); + refc = de_refc_dec_read(dep, -1); if (refc == -1) (void) hash_erase(&erts_dist_table, (void *) dep); erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); - if (refc == 0) - erts_schedule_delete_dist_entry(dep); + if (refc == 0) { + if (node_tab_delete_delay == 0) + prepare_try_delete_dist_entry(vbin); + else if (node_tab_delete_delay > 0) + erts_start_timer_callback(node_tab_delete_delay, + prepare_try_delete_dist_entry, + vbin); + } } -void erts_schedule_delete_dist_entry(DistEntry *dep) +int erts_dist_entry_destructor(Binary *bin) { - ASSERT(dep != erts_this_dist_entry); - if (dep != erts_this_dist_entry) { - if (node_tab_delete_delay == 0) - try_delete_dist_entry((void *) dep); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - try_delete_dist_entry, - (void *) dep); - } + DistEntry *dep = ErtsBin2DistEntry(bin); + erts_aint_t refc; + + refc = de_refc_read(dep, -1); + + if (refc == -1) + return 1; /* Allow deallocation of structure... */ + + if (node_tab_delete_delay == 0) + prepare_try_delete_dist_entry((void *) bin); + else if (node_tab_delete_delay > 0) + erts_start_timer_callback(node_tab_delete_delay, + prepare_try_delete_dist_entry, + (void *) bin); + + return 0; } Uint @@ -384,7 +522,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep) erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); - ASSERT(is_internal_port(dep->cid)); + ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); if(dep->flags & DFLAG_PUBLISHED) { if(dep->prev) { @@ -439,7 +577,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(dep != erts_this_dist_entry); ASSERT(is_nil(dep->cid)); - ASSERT(is_internal_port(cid)); + ASSERT(is_internal_port(cid) || is_internal_pid(cid)); if(dep->prev) { ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); @@ -459,10 +597,19 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) dep->status |= ERTS_DE_SFLG_CONNECTED; dep->flags = flags; dep->cid = cid; + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) cid); + dep->connection_id++; dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + erts_atomic32_set_nob(&dep->qflgs, + (is_internal_port(cid) + ? ERTS_DE_QFLG_PORT_CTRL + : ERTS_DE_QFLG_PROC_CTRL)); if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { @@ -716,19 +863,18 @@ void erts_set_this_node(Eterm sysname, Uint creation) { ERTS_LC_ASSERT(erts_thr_progress_is_blocking()); - ASSERT(erts_refc_read(&erts_this_dist_entry->refc, 2)); + ASSERT(2 <= de_refc_read(erts_this_dist_entry, 2)); if (erts_refc_dectest(&erts_this_node->refc, 0) == 0) try_delete_node(erts_this_node); - if (erts_refc_dectest(&erts_this_dist_entry->refc, 0) == 0) - try_delete_dist_entry(erts_this_dist_entry); + erts_deref_dist_entry(erts_this_dist_entry); erts_this_node = NULL; /* to make sure refc is bumped for this node */ erts_this_node = erts_find_or_insert_node(sysname, creation); erts_this_dist_entry = erts_this_node->dist_entry; - erts_refc_inc(&erts_this_dist_entry->refc, 2); + erts_ref_dist_entry(erts_this_dist_entry); erts_this_node_sysname = erts_this_node_sysname_BUFFER; erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER), @@ -797,9 +943,9 @@ void erts_init_node_tables(int dd_sec) ASSERT(erts_this_node->dist_entry != NULL); erts_this_dist_entry = erts_this_node->dist_entry; /* +1 for erts_this_dist_entry */ - /* +1 for erts_this_node->dist_entry */ - erts_refc_init(&erts_this_dist_entry->refc, 2); + erts_ref_dist_entry(erts_this_dist_entry); + ASSERT(2 == de_refc_read(erts_this_dist_entry, 2)); erts_this_node_sysname = erts_this_node_sysname_BUFFER; erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER), @@ -876,6 +1022,7 @@ static Eterm AM_node_references; static Eterm AM_system; static Eterm AM_timer; static Eterm AM_delayed_delete_timer; +static Eterm AM_thread_progress_delete_timer; static void setup_reference_table(void); static Eterm reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp); @@ -965,6 +1112,7 @@ erts_get_node_and_dist_references(struct process *proc) INIT_AM(timer); INIT_AM(system); INIT_AM(delayed_delete_timer); + INIT_AM(thread_progress_delete_timer); references_atoms_need_init = 0; } @@ -1148,6 +1296,10 @@ insert_offheap2(ErlOffHeap *oh, void *arg) insert_offheap(oh, a->type, a->id); } +#define ErtsIsDistEntryBinary(Bin) \ + (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ + && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor) + static void insert_offheap(ErlOffHeap *oh, int type, Eterm id) { @@ -1158,7 +1310,10 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) for (u.hdr = oh->first; u.hdr; u.hdr = u.hdr->next) { switch (thing_subtag(u.hdr->thing_word)) { case REF_SUBTAG: - if(IsMatchProgBinary(u.mref->mb)) { + if (ErtsIsDistEntryBinary(u.mref->mb)) + insert_dist_entry(ErtsBin2DistEntry(u.mref->mb), + type, id, 0); + else if(IsMatchProgBinary(u.mref->mb)) { InsertedBin *ib; int insert_bin = 1; for (ib = inserted_bins; ib; ib = ib->next) @@ -1301,26 +1456,34 @@ insert_delayed_delete_node(void *state, ErtsMonotonicTime timeout_pos, void *vnp) { - DeclareTmpHeapNoproc(heap,3); - UseTmpHeapNoproc(3); + Eterm heap[3]; insert_node((ErlNode *) vnp, SYSTEM_REF, TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer)); - UnUseTmpHeapNoproc(3); +} + +static void +insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin) +{ + DistEntry *dep = ErtsBin2DistEntry(vbin); + Eterm heap[3]; + insert_dist_entry(dep, + SYSTEM_REF, + TUPLE2(&heap[0], AM_system, AM_thread_progress_delete_timer), + 0); } static void insert_delayed_delete_dist_entry(void *state, ErtsMonotonicTime timeout_pos, - void *vdep) + void *vbin) { - DeclareTmpHeapNoproc(heap,3); - UseTmpHeapNoproc(3); - insert_dist_entry((DistEntry *) vdep, + DistEntry *dep = ErtsBin2DistEntry(vbin); + Eterm heap[3]; + insert_dist_entry(dep, SYSTEM_REF, TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer), 0); - UnUseTmpHeapNoproc(3); } static void @@ -1354,9 +1517,12 @@ setup_reference_table(void) erts_debug_callback_timer_foreach(try_delete_node, insert_delayed_delete_node, NULL); - erts_debug_callback_timer_foreach(try_delete_dist_entry, + erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry, insert_delayed_delete_dist_entry, NULL); + erts_debug_later_op_foreach(try_delete_dist_entry, + insert_thr_prgr_delete_dist_entry, + NULL); UseTmpHeapNoproc(3); insert_node(erts_this_node, @@ -1421,6 +1587,14 @@ setup_reference_table(void) insert_links(ERTS_P_LINKS(proc), proc->common.id); if (ERTS_P_MONITORS(proc)) insert_monitors(ERTS_P_MONITORS(proc), proc->common.id); + { + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc); + if (dep) + insert_dist_entry(dep, + CTRL_REF, + proc->common.id, + 0); + } } } @@ -1719,7 +1893,7 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) /* DistList = [{Dist, Refc, ReferenceIdList}] */ tup = MK_3TUP(referred_dists[i].dist->sysname, - MK_UINT(erts_refc_read(&referred_dists[i].dist->refc, 0)), + MK_UINT(de_refc_read(referred_dists[i].dist, 0)), dril); dl = MK_CONS(tup, dl); } diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 7974b25444..3bba673435 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -47,6 +47,9 @@ #define ERTS_PORT_TASK_ONLY_BASIC_TYPES__ #include "erl_port_task.h" #undef ERTS_PORT_TASK_ONLY_BASIC_TYPES__ +#define ERTS_BINARY_TYPES_ONLY__ +#include "erl_binary.h" +#undef ERTS_BINARY_TYPES_ONLY__ #define ERTS_NODE_TAB_DELAY_GC_DEFAULT (60) #define ERTS_NODE_TAB_DELAY_GC_MAX (100*1000*1000) @@ -60,11 +63,17 @@ #define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \ | ERTS_DE_SFLG_EXITING) -#define ERTS_DE_QFLG_BUSY (((Uint32) 1) << 0) -#define ERTS_DE_QFLG_EXIT (((Uint32) 1) << 1) +#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) +#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) +#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2) +#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3) +#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4) #define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \ - | ERTS_DE_QFLG_EXIT) + | ERTS_DE_QFLG_EXIT \ + | ERTS_DE_QFLG_REQ_INFO \ + | ERTS_DE_QFLG_PORT_CTRL \ + | ERTS_DE_QFLG_PROC_CTRL) #if defined(ARCH_64) #define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL) @@ -106,12 +115,13 @@ typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */ struct dist_entry_ *prev; /* Previous entry in dist_table (not sorted) */ - erts_refc_t refc; /* Reference count */ - erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */ + erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */ Eterm sysname; /* name@host atom for efficiency */ Uint32 creation; /* creation of connected node */ - Eterm cid; /* connection handler (pid or port), NIL == free */ + erts_atomic_t input_handler; /* Input handler */ + Eterm cid; /* connection handler (pid or port), + NIL == free */ Uint32 connection_id; /* Connection id incremented on connect */ Uint32 status; /* Slot status, like exiting reserved etc */ Uint32 flags; /* Distribution flags, like hidden, @@ -119,7 +129,7 @@ typedef struct dist_entry_ { unsigned long version; /* Protocol version */ - erts_mtx_t lnk_mtx; /* Protects node_links, nlinks, and + erts_mtx_t lnk_mtx; /* Protects node_links, nlinks, and monitors. */ ErtsLink *node_links; /* In a dist entry, node links are kept in a separate tree, while they are @@ -131,12 +141,15 @@ typedef struct dist_entry_ { ErtsLink *nlinks; /* Link tree with subtrees */ ErtsMonitor *monitors; /* Monitor tree */ - erts_mtx_t qlock; /* Protects qflgs and out_queue */ - Uint32 qflgs; - Sint qsize; + erts_mtx_t qlock; /* Protects qflgs and out_queue */ + erts_atomic32_t qflgs; + erts_atomic_t qsize; + erts_atomic64_t in; + erts_atomic64_t out; ErtsDistOutputQueue out_queue; struct ErtsProcList_ *suspended; + ErtsDistOutputQueue tmp_out_queue; ErtsDistOutputQueue finalized_out_queue; erts_atomic_t dist_cmd_scheduled; ErtsPortTaskHandle dist_cmd; @@ -144,6 +157,8 @@ typedef struct dist_entry_ { Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); struct cache* cache; /* The atom cache */ + + ErtsThrPrgrLaterOp later_op; } DistEntry; typedef struct erl_node_ { @@ -193,12 +208,12 @@ Eterm erts_get_node_and_dist_references(struct process *); int erts_lc_is_de_rwlocked(DistEntry *); int erts_lc_is_de_rlocked(DistEntry *); #endif +int erts_dist_entry_destructor(Binary *bin); +DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle); +Eterm erts_make_dhandle(Process *c_p, DistEntry *dep); +void erts_ref_dist_entry(DistEntry *dep); +void erts_deref_dist_entry(DistEntry *dep); -#ifdef ERTS_ENABLE_LOCK_COUNT -void erts_lcnt_update_distribution_locks(int enable); -#endif - -ERTS_GLB_INLINE void erts_deref_dist_entry(DistEntry *dep); ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np); ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep); ERTS_GLB_INLINE void erts_de_runlock(DistEntry *dep); @@ -210,14 +225,6 @@ ERTS_GLB_INLINE void erts_de_links_unlock(DistEntry *dep); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void -erts_deref_dist_entry(DistEntry *dep) -{ - ASSERT(dep); - if (erts_refc_dectest(&dep->refc, 0) == 0) - erts_schedule_delete_dist_entry(dep); -} - -ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np) { ASSERT(np); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index c29b13c6c1..ab5030e5b9 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -776,6 +776,11 @@ erts_pre_init_process(void) = ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS; erts_psd_required_locks[ERTS_PSD_ETS_FIXED_TABLES].set_locks = ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS; + + erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].get_locks + = ERTS_PSD_DIST_ENTRY_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks + = ERTS_PSD_DIST_ENTRY_SET_LOCKS; #endif } @@ -13082,7 +13087,6 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) } erts_destroy_monitor(rmon); } - erts_deref_dist_entry(dep); } } else { ASSERT(is_pid(mon->u.pid) || is_port(mon->u.pid)); @@ -13322,7 +13326,6 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) erts_de_links_unlock(dep); if (rlnk) erts_destroy_link(rlnk); - erts_deref_dist_entry(dep); } break; @@ -13430,7 +13433,7 @@ erts_continue_exit_process(Process *p) ErtsMonitor *mon; ErtsProcLocks curr_locks = ERTS_PROC_LOCK_MAIN; Eterm reason = p->fvalue; - DistEntry *dep; + DistEntry *dep = NULL; erts_aint32_t state; int delay_del_proc = 0; @@ -13631,13 +13634,16 @@ erts_continue_exit_process(Process *p) if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ)) erts_proc_dec_refc(p); } - - dep = (p->flags & F_DISTRIBUTION) ? erts_this_dist_entry : NULL; + + dep = ((p->flags & F_DISTRIBUTION) + ? ERTS_PROC_SET_DIST_ENTRY(p, NULL) + : NULL); erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL); if (dep) { - erts_do_net_exits(dep, reason); + erts_do_net_exits(dep, (reason == am_kill) ? am_killed : reason); + erts_deref_dist_entry(dep); } /* @@ -14045,3 +14051,24 @@ erts_dbg_check_halloc_lock(Process *p) return 0; } #endif + +void +erts_debug_later_op_foreach(void (*callback)(void*), + void (*func)(void *, ErtsThrPrgrVal, void *), + void *arg) +{ + int six; + if (!erts_thr_progress_is_blocking()) + ERTS_INTERNAL_ERROR("Not blocking thread progress"); + + for (six = 0; six < erts_no_schedulers; six++) { + ErtsSchedulerData *esdp = &erts_aligned_scheduler_data[six].esd; + ErtsThrPrgrLaterOp *lop = esdp->aux_work_data.later_op.first; + + while (lop) { + if (lop->func == callback) + func(arg, lop->later, lop->data); + lop = lop->next; + } + } +} diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 7ca37882c2..150f22556c 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -697,6 +697,11 @@ extern ErtsAlignedSchedulerData *erts_aligned_dirty_io_scheduler_data; int erts_lc_runq_is_locked(ErtsRunQueue *); #endif +void +erts_debug_later_op_foreach(void (*callback)(void*), + void (*func)(void *, ErtsThrPrgrVal, void *), + void *arg); + #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS void erts_empty_runq(ErtsRunQueue *rq); @@ -803,14 +808,15 @@ erts_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_NIF_TRAP_EXPORT 5 #define ERTS_PSD_ETS_OWNED_TABLES 6 #define ERTS_PSD_ETS_FIXED_TABLES 7 -#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 8 +#define ERTS_PSD_DIST_ENTRY 8 +#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 9 /* keep last... */ -#define ERTS_PSD_SIZE 9 +#define ERTS_PSD_SIZE 10 #if !defined(HIPE) # undef ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF # undef ERTS_PSD_SIZE -# define ERTS_PSD_SIZE 8 +# define ERTS_PSD_SIZE 9 #endif typedef struct { @@ -844,6 +850,9 @@ typedef struct { #define ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -2041,6 +2050,11 @@ erts_psd_set(Process *p, int ix, void *data) #define ERTS_PROC_SET_NIF_TRAP_EXPORT(P, NTE) \ erts_psd_set((P), ERTS_PSD_NIF_TRAP_EXPORT, (void *) (NTE)) +#define ERTS_PROC_GET_DIST_ENTRY(P) \ + ((DistEntry *) erts_psd_get((P), ERTS_PSD_DIST_ENTRY)) +#define ERTS_PROC_SET_DIST_ENTRY(P, DE) \ + ((DistEntry *) erts_psd_set((P), ERTS_PSD_DIST_ENTRY, (void *) (DE))) + #ifdef HIPE #define ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(P) \ ((struct saved_calls *) erts_psd_get((P), ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF)) diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 0874be7250..1c2f8f9843 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -616,7 +616,7 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize) sys_memcpy((void *) ep, (void *) edep, dist_ext_sz); ep += dist_ext_sz; if (new_edep->dep) - erts_refc_inc(&new_edep->dep->refc, 1); + erts_ref_dist_entry(new_edep->dep); new_edep->extp = ep; new_edep->ext_endp = ep + ext_sz; new_edep->heap_size = -1; @@ -629,7 +629,8 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, byte *ext, Uint size, DistEntry *dep, - ErtsAtomCache *cache) + ErtsAtomCache *cache, + Uint32 *connection_id) { #undef ERTS_EXT_FAIL #undef ERTS_EXT_HDR_FAIL @@ -650,33 +651,36 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, if (size < 2) ERTS_EXT_FAIL; + if (!dep) + ERTS_INTERNAL_ERROR("Invalid use"); + if (ep[0] != VERSION_MAGIC) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - if (dep) - erts_dsprintf(dsbufp, - "** Got message from incompatible erlang on " - "channel %d\n", - dist_entry_channel_no(dep)); - else - erts_dsprintf(dsbufp, - "** Attempt to convert old incompatible " - "binary %d\n", - *ep); + erts_dsprintf(dsbufp, + "** Got message from incompatible erlang on " + "channel %d\n", + dist_entry_channel_no(dep)); erts_send_error_to_logger_nogl(dsbufp); ERTS_EXT_FAIL; } edep->flags = 0; edep->dep = dep; - if (dep) { - erts_de_rlock(dep); - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) - edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; - - edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); - erts_de_runlock(dep); + + erts_de_rlock(dep); + + if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) + != ERTS_DE_SFLG_CONNECTED) { + erts_de_runlock(dep); + return ERTS_PREP_DIST_EXT_CLOSED; } + if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) + edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; + + *connection_id = dep->connection_id; + edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); + if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) ERTS_EXT_HDR_FAIL; @@ -835,14 +839,15 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, ERTS_EXT_FAIL; #endif - return 0; + erts_de_runlock(dep); + + return ERTS_PREP_DIST_EXT_SUCCESS; #undef CHKSIZE #undef ERTS_EXT_FAIL #undef ERTS_EXT_HDR_FAIL - bad_hdr: - if (dep) { + bad_hdr: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T got a corrupted distribution header from %T " @@ -855,10 +860,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_dsprintf(dsbufp, ">>"); erts_send_warning_to_logger_nogl(dsbufp); } - fail: - if (dep) - erts_kill_dist_connection(dep, dep->connection_id); - return -1; + fail: { + erts_de_runlock(dep); + erts_kill_dist_connection(dep, *connection_id); + } + return ERTS_PREP_DIST_EXT_FAILED; } static void diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index f00426cc16..3c61d013da 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -185,8 +185,13 @@ ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); void *erts_dist_ext_trailer(ErtsDistExternal *); void erts_destroy_dist_ext_copy(ErtsDistExternal *); + +#define ERTS_PREP_DIST_EXT_FAILED (-1) +#define ERTS_PREP_DIST_EXT_SUCCESS (0) +#define ERTS_PREP_DIST_EXT_CLOSED (1) + int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, - DistEntry *, ErtsAtomCache *); + DistEntry *, ErtsAtomCache *, Uint32 *); Sint erts_decode_dist_ext_size(ErtsDistExternal *); Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index c8925e159e..04108e5f20 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -6682,6 +6682,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + erts_atomic64_inc_nob(&prt->dist_entry->in); return erts_net_message(prt, prt->dist_entry, (byte*) hbuf, hlen, @@ -6722,6 +6723,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + erts_atomic64_inc_nob(&prt->dist_entry->in); if (len == 0) return erts_net_message(prt, prt->dist_entry, diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index b4ec99f902..28be4bfe37 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -418,18 +418,20 @@ make_busy(Node, Time) when is_integer(Time) -> Own = 500, freeze_node(Node, Time+Own), Data = make_busy_data(), + DCtrl = dctrl(Node), %% first make port busy Pid = spawn_link(fun () -> forever(fun () -> - dport_reg_send(Node, - '__noone__', - Data) + dctrl_dop_reg_send(Node, + '__noone__', + Data) end) end), receive after Own -> ok end, until(fun () -> - case process_info(Pid, status) of - {status, suspended} -> true; + case {DCtrl, process_info(Pid, status)} of + {DPrt, {status, suspended}} when is_port(DPrt) -> true; + {DPid, {status, waiting}} when is_pid(DPid) -> true; _ -> false end end), @@ -1703,37 +1705,38 @@ bad_dist_ext_check_msgs([M|Ms]) -> bad_dist_ext_check_msgs(Ms) end. +ensure_dctrl(Node) -> + case dctrl(Node) of + undefined -> + pong = net_adm:ping(Node), + dctrl(Node); + DCtrl -> + DCtrl + end. -dport_reg_send(Node, Name, Msg) -> - DPrt = case dport(Node) of - undefined -> - pong = net_adm:ping(Node), - dport(Node); - Prt -> - Prt - end, - port_command(DPrt, [dmsg_hdr(), - dmsg_ext({?DOP_REG_SEND, - self(), - ?COOKIE, - Name}), - dmsg_ext(Msg)]). - - -dport_send(To, Msg) -> +dctrl_send(DPrt, Data) when is_port(DPrt) -> + port_command(DPrt, Data); +dctrl_send(DPid, Data) when is_pid(DPid) -> + Ref = make_ref(), + DPid ! {send, self(), Ref, Data}, + receive {Ref, Res} -> Res end. + +dctrl_dop_reg_send(Node, Name, Msg) -> + dctrl_send(ensure_dctrl(Node), + [dmsg_hdr(), + dmsg_ext({?DOP_REG_SEND, + self(), + ?COOKIE, + Name}), + dmsg_ext(Msg)]). + +dctrl_dop_send(To, Msg) -> Node = node(To), - DPrt = case dport(Node) of - undefined -> - pong = net_adm:ping(Node), - dport(Node); - Prt -> - Prt - end, - port_command(DPrt, [dmsg_hdr(), - dmsg_ext({?DOP_SEND, - ?COOKIE, - To}), - dmsg_ext(Msg)]). + dctrl_send(ensure_dctrl(Node), + [dmsg_hdr(), + dmsg_ext({?DOP_SEND, ?COOKIE, To}), + dmsg_ext(Msg)]). + send_bad_structure(Offender,Victim,Bad,WhereToPutSelf) -> send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,[]). send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,PayLoad) -> @@ -1743,7 +1746,7 @@ send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,PayLoad) -> fun () -> Node = node(Victim), pong = net_adm:ping(Node), - DPrt = dport(Node), + DCtrl = dctrl(Node), Bad1 = case WhereToPutSelf of 0 -> Bad; @@ -1756,7 +1759,7 @@ send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,PayLoad) -> [] -> []; _Other -> [dmsg_ext(PayLoad)] end, - port_command(DPrt, DData), + dctrl_send(DCtrl, DData), Parent ! {DData,Done} end), receive @@ -1784,11 +1787,11 @@ send_bad_msgs(BadNode, To, Repeat) when is_atom(BadNode), fun () -> Node = node(To), pong = net_adm:ping(Node), - DPrt = dport(Node), + DCtrl = dctrl(Node), DData = [dmsg_hdr(), dmsg_ext({?DOP_SEND, ?COOKIE, To}), dmsg_bad_atom_cache_ref()], - repeat(fun () -> port_command(DPrt, DData) end, Repeat), + repeat(fun () -> dctrl_send(DCtrl, DData) end, Repeat), Parent ! Done end), receive Done -> ok end. @@ -1810,11 +1813,12 @@ send_bad_ctl(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> replace}), CtlBeginSize = size(Ctl) - size(Replace), <<CtlBegin:CtlBeginSize/binary, Replace/binary>> = Ctl, - port_command(dport(ToNode), - [dmsg_fake_hdr2(), - CtlBegin, - dmsg_bad_atom_cache_ref(), - dmsg_ext({a, message})]), + DCtrl = dctrl(ToNode), + Data = [dmsg_fake_hdr2(), + CtlBegin, + dmsg_bad_atom_cache_ref(), + dmsg_ext({a, message})], + dctrl_send(DCtrl, Data), Parent ! Done end), receive Done -> ok end. @@ -1827,17 +1831,17 @@ send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> spawn_link(BadNode, fun () -> pong = net_adm:ping(ToNode), - port_command(dport(ToNode), dmsg_bad_hdr()), + dctrl_send(dctrl(ToNode), dmsg_bad_hdr()), Parent ! Done end), receive Done -> ok end. -dport(Node) when is_atom(Node) -> +dctrl(Node) when is_atom(Node) -> case catch erts_debug:get_internal_state(available_internal_state) of true -> true; _ -> erts_debug:set_internal_state(available_internal_state, true) end, - erts_debug:get_internal_state({dist_port, Node}). + erts_debug:get_internal_state({dist_ctrl, Node}). dmsg_hdr() -> [131, % Version Magic @@ -1979,7 +1983,7 @@ freeze_node(Node, MS) -> fun () -> erts_debug:set_internal_state(available_internal_state, true), - dport_send(Freezer, DoingIt), + dctrl_dop_send(Freezer, DoingIt), receive after Own -> ok end, erts_debug:set_internal_state(block, MS+Own) end), diff --git a/erts/emulator/test/erl_link_SUITE.erl b/erts/emulator/test/erl_link_SUITE.erl index 5622cce980..d8c5b663e3 100644 --- a/erts/emulator/test/erl_link_SUITE.erl +++ b/erts/emulator/test/erl_link_SUITE.erl @@ -533,7 +533,7 @@ freeze_node(Node, MS) -> fun () -> erts_debug:set_internal_state(available_internal_state, true), - dport_send(Freezer, DoingIt), + dctrl_dop_send(Freezer, DoingIt), receive after Own -> ok end, erts_debug:set_internal_state(block, MS+Own) end), @@ -544,20 +544,22 @@ make_busy(Node, Time) when is_integer(Time) -> Own = 500, freeze_node(Node, Time+Own), Data = busy_data(), + DCtrl = dctrl(Node), %% first make port busy Pid = spawn_link(fun () -> forever(fun () -> - dport_reg_send(Node, - '__noone__', - Data) + dctrl_dop_reg_send(Node, + '__noone__', + Data) end) end), receive after Own -> ok end, wait_until(fun () -> - case process_info(Pid, status) of - {status, suspended} -> true; - _ -> false - end + case {DCtrl, process_info(Pid, status)} of + {DPrt, {status, suspended}} when is_port(DPrt) -> true; + {DPid, {status, waiting}} when is_pid(DPid) -> true; + _ -> false + end end), %% then dist entry make_busy(Node, [nosuspend], Data), @@ -1048,42 +1050,45 @@ stop_node(Node) -> -define(DOP_DEMONITOR_P, 20). -define(DOP_MONITOR_P_EXIT, 21). -dport_send(To, Msg) -> - Node = node(To), - DPrt = case dport(Node) of - undefined -> - pong = net_adm:ping(Node), - dport(Node); - Prt -> - Prt - end, - port_command(DPrt, [dmsg_hdr(), - dmsg_ext({?DOP_SEND, - ?COOKIE, - To}), - dmsg_ext(Msg)]). - -dport_reg_send(Node, Name, Msg) -> - DPrt = case dport(Node) of - undefined -> - pong = net_adm:ping(Node), - dport(Node); - Prt -> - Prt - end, - port_command(DPrt, [dmsg_hdr(), - dmsg_ext({?DOP_REG_SEND, - self(), - ?COOKIE, - Name}), - dmsg_ext(Msg)]). - -dport(Node) when is_atom(Node) -> +ensure_dctrl(Node) -> + case dctrl(Node) of + undefined -> + pong = net_adm:ping(Node), + dctrl(Node); + DCtrl -> + DCtrl + end. + +dctrl_send(DPrt, Data) when is_port(DPrt) -> + port_command(DPrt, Data); +dctrl_send(DPid, Data) when is_pid(DPid) -> + Ref = make_ref(), + DPid ! {send, self(), Ref, Data}, + receive {Ref, Res} -> Res end. + +dctrl_dop_send(To, Msg) -> + dctrl_send(ensure_dctrl(node(To)), + [dmsg_hdr(), + dmsg_ext({?DOP_SEND, + ?COOKIE, + To}), + dmsg_ext(Msg)]). + +dctrl_dop_reg_send(Node, Name, Msg) -> + dctrl_send(ensure_dctrl(Node), + [dmsg_hdr(), + dmsg_ext({?DOP_REG_SEND, + self(), + ?COOKIE, + Name}), + dmsg_ext(Msg)]). + +dctrl(Node) when is_atom(Node) -> case catch erts_debug:get_internal_state(available_internal_state) of true -> true; _ -> erts_debug:set_internal_state(available_internal_state, true) end, - erts_debug:get_internal_state({dist_port, Node}). + erts_debug:get_internal_state({dist_ctrl, Node}). dmsg_hdr() -> [131, % Version Magic diff --git a/erts/emulator/test/node_container_SUITE.erl b/erts/emulator/test/node_container_SUITE.erl index 8e9e3cb05a..be90f929df 100644 --- a/erts/emulator/test/node_container_SUITE.erl +++ b/erts/emulator/test/node_container_SUITE.erl @@ -405,6 +405,7 @@ node_table_gc(Config) when is_list(Config) -> PreKnown = nodes(known), io:format("PreKnown = ~p~n", [PreKnown]), make_node_garbage(0, 200000, 1000, []), + receive after 1000 -> ok end, %% Wait for thread progress... PostKnown = nodes(known), PostAreas = erlang:system_info(allocated_areas), io:format("PostKnown = ~p~n", [PostKnown]), diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 58c17dc416..81527dc164 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 6691749dcb..5416826f19 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 72dd804412..4315b53358 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -48,6 +48,12 @@ await_sched_wall_time_modifications/2, gather_gc_info_result/1]). +-export([dist_ctrl_input_handler/2, + dist_ctrl_put_data/2, + dist_ctrl_get_data/1, + dist_ctrl_get_data_notification/1, + dist_get_stat/1]). + -deprecated([now/0]). %% Get rid of autoimports of spawn to avoid clashes with ourselves. @@ -87,6 +93,10 @@ -export_type([prepared_code/0]). +-opaque dist_handle() :: atom(). + +-export_type([dist_handle/0]). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Native code BIF stubs and their types %% (BIF's actually implemented in this module goes last in the file) @@ -1641,7 +1651,7 @@ setnode(_P1, _P2) -> erlang:nif_error(undefined). %% setnode/3 --spec erlang:setnode(P1, P2, P3) -> true when +-spec erlang:setnode(P1, P2, P3) -> dist_handle() when P1 :: atom(), P2 :: port(), P3 :: {term(), term(), term(), term()}. @@ -3204,6 +3214,47 @@ port_get_data(_Port) -> erlang:nif_error(undefined). %% +%% Distribution channel management +%% + +-spec erlang:dist_ctrl_input_handler(DHandle, InputHandler) -> 'ok' when + DHandle :: dist_handle(), + InputHandler :: pid(). + +dist_ctrl_input_handler(_DHandle, _InputHandler) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_put_data(DHandle, Data) -> 'ok' when + DHandle :: dist_handle(), + Data :: iodata(). + +dist_ctrl_put_data(_DHandle, _Data) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_get_data(DHandle) -> Data | 'none' when + DHandle :: dist_handle(), + Data :: iodata(). + +dist_ctrl_get_data(_DHandle) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_get_data_notification(DHandle) -> 'ok' when + DHandle :: dist_handle(). + +dist_ctrl_get_data_notification(_DHandle) -> + erlang:nif_error(undefined). + +-spec erlang:dist_get_stat(DHandle) -> Res when + DHandle :: dist_handle(), + InputPackets :: non_neg_integer(), + OutputPackets :: non_neg_integer(), + PendingOutputPackets :: boolean(), + Res :: {'ok', InputPackets, OutputPackets, PendingOutputPackets}. + +dist_get_stat(_DHandle) -> + erlang:nif_error(undefined). + +%% %% If the emulator wants to perform a distributed command and %% a connection is not established to the actual node the following %% functions are called in order to set up the connection and then diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 26fb1458af..bb1824ecd4 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -61,6 +61,8 @@ -export([trace/3, trace_pattern/3]). +-export([dist_ctrl_put_data/2]). + %% Auto import name clash -export([check_process_code/1]). @@ -461,3 +463,38 @@ trace(_PidSpec, _How, _FlagList) -> FlagList :: [ ]. trace_pattern(_MFA, _MatchSpec, _FlagList) -> erlang:nif_error(undefined). + +-spec dist_ctrl_put_data(DHandle, Data) -> 'ok' when + DHandle :: erlang:dist_handle(), + Data :: iolist(). + +dist_ctrl_put_data(DHandle, IoList) -> + %% + %% Helper for erlang:dist_ctrl_put_data/2 + %% + %% erlang:dist_ctrl_put_data/2 traps to + %% this function if second argument is + %% a list... + %% + try + Binary = erlang:iolist_to_binary(IoList), + %% Restart erlang:dist_ctrl_put_data/2 + %% with the iolist converted to a binary... + erlang:dist_ctrl_put_data(DHandle, Binary) + catch + Class : Reason -> + %% Throw exception as if thrown from + %% erlang:dist_ctrl_put_data/2 ... + RootST = try erlang:error(Reason) + catch + error:Reason -> + case erlang:get_stacktrace() of + [] -> []; + ST -> tl(ST) + end + end, + StackTrace = [{erlang, dist_ctrl_put_data, + [DHandle, IoList], []} + | RootST], + erlang:raise(Class, Reason, StackTrace) + end. diff --git a/lib/kernel/examples/Makefile b/lib/kernel/examples/Makefile index 26ec58f571..f86e662838 100644 --- a/lib/kernel/examples/Makefile +++ b/lib/kernel/examples/Makefile @@ -45,7 +45,7 @@ RELSYSDIR = $(RELEASE_PATH)/lib/kernel-$(KERNEL_VSN)/examples # Pack and install the complete directory structure from # here (CWD) and down, for all examples. -EXAMPLES = uds_dist +EXAMPLES = uds_dist gen_tcp_dist release_spec: $(INSTALL_DIR) "$(RELSYSDIR)" diff --git a/lib/kernel/examples/gen_tcp_dist/Makefile b/lib/kernel/examples/gen_tcp_dist/Makefile new file mode 100644 index 0000000000..65513a1729 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/Makefile @@ -0,0 +1,20 @@ +RM=rm -f +CP=cp +EBIN=ebin +ERLC=erlc +# Works if building in open source source tree +KERNEL_INCLUDE=$(ERL_TOP)/lib/kernel/include +ERLCFLAGS+= -W -I$(KERNEL_INCLUDE) + +MODULES=gen_tcp_dist + +TARGET_FILES=$(MODULES:%=$(EBIN)/%.beam) + +opt: $(TARGET_FILES) + +$(EBIN)/%.beam: src/%.erl + $(ERLC) $(ERLCFLAGS) -o$(EBIN) $< + +clean: + $(RM) $(TARGET_FILES) + diff --git a/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore b/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/ebin/.gitignore diff --git a/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl new file mode 100644 index 0000000000..98554ed805 --- /dev/null +++ b/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl @@ -0,0 +1,781 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(gen_tcp_dist). + +%% +%% This is an example of how to plug in an arbitrary distribution +%% carrier for Erlang using distribution processes. +%% +%% This example uses gen_tcp for transportation of data, but +%% you can use whatever underlying protocol you want as long +%% as your implementation reliably delivers data chunks to the +%% receiving VM in the order they were sent from the sending +%% VM. +%% +%% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl +%% distribution impementation for TCP used by default. That +%% implementation use distribution ports instead of distribution +%% processes and is more efficient compared to this implementation. +%% This since this implementation more or less gets the +%% distribution processes in between the VM and the ports without +%% any gain specific gain. +%% + +-export([listen/1, accept/1, accept_connection/5, + setup/5, close/1, select/1, is_node_name/1]). + +%% Optional +-export([setopts/2, getopts/2]). + +%% internal exports + +-export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3, + dist_cntrlr_tick_handler/1]). + +-export([accept_loop/2,do_accept/6,do_setup/6]). + +-import(error_logger,[error_msg/2]). + +-include("net_address.hrl"). + +-include("dist.hrl"). +-include("dist_util.hrl"). + +%% ------------------------------------------------------------ +%% Select this protocol based on node name +%% select(Node) => Bool +%% ------------------------------------------------------------ + +select(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, Host] -> + case inet:getaddr(Host, inet) of + {ok,_} -> true; + _ -> false + end; + _ -> false + end. + +%% ------------------------------------------------------------ +%% Create the listen socket, i.e. the port that this erlang +%% node is accessible through. +%% ------------------------------------------------------------ + +listen(Name) -> + case do_listen([binary, {active, false}, {packet,2}, {reuseaddr, true}]) of + {ok, Socket} -> + TcpAddress = get_tcp_address(Socket), + {_,Port} = TcpAddress#net_address.address, + ErlEpmd = net_kernel:epmd_module(), + case ErlEpmd:register_node(Name, Port) of + {ok, Creation} -> + {ok, {Socket, TcpAddress, Creation}}; + Error -> + Error + end; + Error -> + Error + end. + +do_listen(Options) -> + {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of + {ok,N} when is_integer(N) -> + case application:get_env(kernel, + inet_dist_listen_max) of + {ok,M} when is_integer(M) -> + {N,M}; + _ -> + {N,N} + end; + _ -> + {0,0} + end, + do_listen(First, Last, listen_options([{backlog,128}|Options])). + +do_listen(First,Last,_) when First > Last -> + {error,eaddrinuse}; +do_listen(First,Last,Options) -> + case gen_tcp:listen(First, Options) of + {error, eaddrinuse} -> + do_listen(First+1,Last,Options); + Other -> + Other + end. + +listen_options(Opts0) -> + Opts1 = + case application:get_env(kernel, inet_dist_use_interface) of + {ok, Ip} -> + [{ip, Ip} | Opts0]; + _ -> + Opts0 + end, + case application:get_env(kernel, inet_dist_listen_options) of + {ok,ListenOpts} -> + ListenOpts ++ Opts1; + _ -> + Opts1 + end. + + +%% ------------------------------------------------------------ +%% Accepts new connection attempts from other Erlang nodes. +%% ------------------------------------------------------------ + +accept(Listen) -> + spawn_opt(?MODULE, accept_loop, [self(), Listen], [link, {priority, max}]). + +accept_loop(Kernel, Listen) -> + ?trace("~p~n",[{?MODULE, accept_loop, self()}]), + case gen_tcp:accept(Listen) of + {ok, Socket} -> + DistCtrl = spawn_dist_cntrlr(Socket), + ?trace("~p~n",[{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]), + flush_controller(DistCtrl, Socket), + gen_tcp:controlling_process(Socket, DistCtrl), + flush_controller(DistCtrl, Socket), + Kernel ! {accept,self(),DistCtrl,inet,tcp}, + receive + {Kernel, controller, Pid} -> + call_ctrlr(DistCtrl, {supervisor, Pid}), + Pid ! {self(), controller}; + {Kernel, unsupported_protocol} -> + exit(unsupported_protocol) + end, + accept_loop(Kernel, Listen); + Error -> + exit(Error) + end. + +flush_controller(Pid, Socket) -> + receive + {tcp, Socket, Data} -> + Pid ! {tcp, Socket, Data}, + flush_controller(Pid, Socket); + {tcp_closed, Socket} -> + Pid ! {tcp_closed, Socket}, + flush_controller(Pid, Socket) + after 0 -> + ok + end. + +%% ------------------------------------------------------------ +%% Accepts a new connection attempt from another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + spawn_opt(?MODULE, do_accept, + [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime], + [link, {priority, max}]). + +do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + ?trace("~p~n",[{?MODULE, do_accept, self(), MyNode}]), + receive + {AcceptPid, controller} -> + Timer = dist_util:start_timer(SetupTime), + case check_ip(DistCtrl) of + true -> + HSData0 = hs_data_common(DistCtrl), + HSData = HSData0#hs_data{kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed}, + dist_util:handshake_other_started(HSData); + {false,IP} -> + error_msg("** Connection attempt from " + "disallowed IP ~w ** ~n", [IP]), + ?shutdown(no_node) + end + end. + +%% we may not always want the nodelay behaviour +%% for performance reasons + +nodelay() -> + case application:get_env(kernel, dist_nodelay) of + undefined -> + {nodelay, true}; + {ok, true} -> + {nodelay, true}; + {ok, false} -> + {nodelay, false}; + _ -> + {nodelay, true} + end. + +%% ------------------------------------------------------------ +%% Setup a new connection to another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> + spawn_opt(?MODULE, do_setup, + [self(), Node, Type, MyNode, LongOrShortNames, SetupTime], + [link, {priority, max}]). + +do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> + ?trace("~p~n",[{?MODULE, do_setup, self(), Node}]), + [Name, Address] = splitnode(Node, LongOrShortNames), + case inet:getaddr(Address, inet) of + {ok, Ip} -> + Timer = dist_util:start_timer(SetupTime), + ErlEpmd = net_kernel:epmd_module(), + case ErlEpmd:port_please(Name, Ip) of + {port, TcpPort, Version} -> + ?trace("port_please(~p) -> version ~p~n", + [Node,Version]), + dist_util:reset_timer(Timer), + case + gen_tcp:connect( + Ip, TcpPort, + connect_options([binary, {active, false}, {packet, 2}])) + of + {ok, Socket} -> + DistCtrl = spawn_dist_cntrlr(Socket), + call_ctrlr(DistCtrl, {supervisor, self()}), + flush_controller(DistCtrl, Socket), + gen_tcp:controlling_process(Socket, DistCtrl), + flush_controller(DistCtrl, Socket), + HSData0 = hs_data_common(DistCtrl), + HSData = HSData0#hs_data{kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + other_version = Version, + request_type = Type}, + dist_util:handshake_we_started(HSData); + _ -> + %% Other Node may have closed since + %% port_please ! + ?trace("other node (~p) " + "closed since port_please.~n", + [Node]), + ?shutdown(Node) + end; + _ -> + ?trace("port_please (~p) " + "failed.~n", [Node]), + ?shutdown(Node) + end; + _Other -> + ?trace("inet_getaddr(~p) " + "failed (~p).~n", [Node,_Other]), + ?shutdown(Node) + end. + +connect_options(Opts) -> + case application:get_env(kernel, inet_dist_connect_options) of + {ok,ConnectOpts} -> + ConnectOpts ++ Opts; + _ -> + Opts + end. + +%% +%% Close a socket. +%% +close(Listen) -> + gen_tcp:close(Listen). + + +%% If Node is illegal terminate the connection setup!! +splitnode(Node, LongOrShortNames) -> + case split_node(atom_to_list(Node), $@, []) of + [Name|Tail] when Tail =/= [] -> + Host = lists:append(Tail), + case split_node(Host, $., []) of + [_] when LongOrShortNames =:= longnames -> + case inet:parse_address(Host) of + {ok, _} -> + [Name, Host]; + _ -> + error_msg("** System running to use " + "fully qualified " + "hostnames **~n" + "** Hostname ~ts is illegal **~n", + [Host]), + ?shutdown(Node) + end; + L when length(L) > 1, LongOrShortNames =:= shortnames -> + error_msg("** System NOT running to use fully qualified " + "hostnames **~n" + "** Hostname ~ts is illegal **~n", + [Host]), + ?shutdown(Node); + _ -> + [Name, Host] + end; + [_] -> + error_msg("** Nodename ~p illegal, no '@' character **~n", + [Node]), + ?shutdown(Node); + _ -> + error_msg("** Nodename ~p illegal **~n", [Node]), + ?shutdown(Node) + end. + +split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; +split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); +split_node([], _, Ack) -> [lists:reverse(Ack)]. + +%% ------------------------------------------------------------ +%% Fetch local information about a Socket. +%% ------------------------------------------------------------ +get_tcp_address(Socket) -> + {ok, Address} = inet:sockname(Socket), + {ok, Host} = inet:gethostname(), + #net_address { + address = Address, + host = Host, + protocol = tcp, + family = inet + }. + +%% ------------------------------------------------------------ +%% Do only accept new connection attempts from nodes at our +%% own LAN, if the check_ip environment parameter is true. +%% ------------------------------------------------------------ +check_ip(DistCtrl) -> + case application:get_env(check_ip) of + {ok, true} -> + case get_ifs(DistCtrl) of + {ok, IFs, IP} -> + check_ip(IFs, IP); + _ -> + ?shutdown(no_node) + end; + _ -> + true + end. + +get_ifs(DistCtrl) -> + Socket = call_ctrlr(DistCtrl, socket), + case inet:peername(Socket) of + {ok, {IP, _}} -> + case inet:getif(Socket) of + {ok, IFs} -> {ok, IFs, IP}; + Error -> Error + end; + Error -> + Error + end. + +check_ip([{OwnIP, _, Netmask}|IFs], PeerIP) -> + case {inet_tcp:mask(Netmask, PeerIP), inet_tcp:mask(Netmask, OwnIP)} of + {M, M} -> true; + _ -> check_ip(IFs, PeerIP) + end; +check_ip([], PeerIP) -> + {false, PeerIP}. + +is_node_name(Node) when is_atom(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, _Host] -> true; + _ -> false + end; +is_node_name(_Node) -> + false. + +hs_data_common(DistCtrl) -> + TickHandler = call_ctrlr(DistCtrl, tick_handler), + Socket = call_ctrlr(DistCtrl, socket), + #hs_data{f_send = send_fun(), + f_recv = recv_fun(), + f_setopts_pre_nodeup = setopts_pre_nodeup_fun(), + f_setopts_post_nodeup = setopts_post_nodeup_fun(), + f_getll = getll_fun(), + f_handshake_complete = handshake_complete_fun(), + f_address = address_fun(), + mf_setopts = setopts_fun(DistCtrl, Socket), + mf_getopts = getopts_fun(DistCtrl, Socket), + mf_getstat = getstat_fun(DistCtrl, Socket), + mf_tick = tick_fun(DistCtrl, TickHandler)}. + +%%% ------------------------------------------------------------ +%%% Distribution controller processes +%%% ------------------------------------------------------------ + +%% +%% There will be five parties working together when the +%% connection is up: +%% - The gen_tcp socket. Providing a tcp/ip connection +%% to the other node. +%% - The output handler. It will dispatch all outgoing +%% traffic from the VM to the gen_tcp socket. This +%% process is registered as distribution controller +%% for this channel with the VM. +%% - The input handler. It will dispatch all incoming +%% traffic from the gen_tcp socket to the VM. This +%% process is also the socket owner and receives +%% incoming traffic using active-N. +%% - The tick handler. Dispatches asynchronous tick +%% requests to the socket. It executes on max priority +%% since it is important to get ticks through to the +%% other end. +%% - The channel supervisor (provided by dist_util). It +%% monitors traffic. Issue tick requests to the tick +%% handler when no outgoing traffic is seen and bring +%% the connection down if no incoming traffic is seen. +%% This process also executes on max priority. +%% +%% These parties are linked togheter so should one +%% of them fail, all of them are terminated and the +%% connection is taken down. +%% + +%% In order to avoid issues with lingering signal binaries +%% we enable off-heap message queue data as well as fullsweep +%% after 0. The fullsweeps will be cheap since we have more +%% or less no live data. +-define(DIST_CNTRL_COMMON_SPAWN_OPTS, + [{message_queue_data, off_heap}, + {fullsweep_after, 0}]). + +tick_fun(DistCtrl, TickHandler) -> + fun (Ctrl) when Ctrl == DistCtrl -> + TickHandler ! tick + end. + +getstat_fun(DistCtrl, Socket) -> + fun (Ctrl) when Ctrl == DistCtrl -> + case inet:getstat(Socket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end + end. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +setopts_fun(DistCtrl, Socket) -> + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + setopts(Socket, Opts) + end. + +getopts_fun(DistCtrl, Socket) -> + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + getopts(Socket, Opts) + end. + +setopts(S, Opts) -> + case [Opt || {K,_}=Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet] of + [] -> inet:setopts(S,Opts); + Opts1 -> {error, {badopts,Opts1}} + end. + +getopts(S, Opts) -> + inet:getopts(S, Opts). + +send_fun() -> + fun (Ctrlr, Packet) -> + call_ctrlr(Ctrlr, {send, Packet}) + end. + +recv_fun() -> + fun (Ctrlr, Length, Timeout) -> + case call_ctrlr(Ctrlr, {recv, Length, Timeout}) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end + end. + +getll_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, getll) + end. + +address_fun() -> + fun (Ctrlr, Node) -> + case call_ctrlr(Ctrlr, {address, Node}) of + {error, no_node} -> %% No '@' or more than one '@' in node name. + ?shutdown(no_node); + Res -> + Res + end + end. + +setopts_pre_nodeup_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, pre_nodeup) + end. + +setopts_post_nodeup_fun() -> + fun (Ctrlr) -> + call_ctrlr(Ctrlr, post_nodeup) + end. + +handshake_complete_fun() -> + fun (Ctrlr, Node, DHandle) -> + call_ctrlr(Ctrlr, {handshake_complete, Node, DHandle}) + end. + +call_ctrlr(Ctrlr, Msg) -> + Ref = erlang:monitor(process, Ctrlr), + Ctrlr ! {Ref, self(), Msg}, + receive + {Ref, Res} -> + erlang:demonitor(Ref, [flush]), + Res; + {'DOWN', Ref, process, Ctrlr, Reason} -> + exit({dist_controller_exit, Reason}) + end. + +%% +%% The tick handler process writes a tick to the +%% socket when it receives a 'tick' message from +%% the connection supervisor. +%% +%% We are not allowed to block the connection +%% superviser when writing a tick and we also want +%% the tick to go through even during a heavily +%% loaded system. gen_tcp does not have a +%% non-blocking send operation exposed in its API +%% and we don't want to run the distribution +%% controller under high priority. Therefore this +%% sparate process with max prio that dispatches +%% ticks. +%% +dist_cntrlr_tick_handler(Socket) -> + receive + tick -> + %% May block due to busy port... + sock_send(Socket, ""); + _ -> + ok + end, + dist_cntrlr_tick_handler(Socket). + +spawn_dist_cntrlr(Socket) -> + spawn_opt(?MODULE, dist_cntrlr_setup, [Socket], + [{priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS). + +dist_cntrlr_setup(Socket) -> + TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler, + [Socket], + [link, {priority, max}] + ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), + dist_cntrlr_setup_loop(Socket, TickHandler, undefined). + +%% +%% During the handshake phase we loop in dist_cntrlr_setup(). +%% When the connection is up we spawn an input handler and +%% continue as output handler. +%% +dist_cntrlr_setup_loop(Socket, TickHandler, Sup) -> + receive + {tcp_closed, Socket} -> + exit(connection_closed); + + {Ref, From, {supervisor, Pid}} -> + Res = link(Pid), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Pid); + + {Ref, From, tick_handler} -> + From ! {Ref, TickHandler}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, socket} -> + From ! {Ref, Socket}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {send, Packet}} -> + Res = gen_tcp:send(Socket, Packet), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {recv, Length, Timeout}} -> + Res = gen_tcp:recv(Socket, Length, Timeout), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, getll} -> + From ! {Ref, {ok, self()}}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {address, Node}} -> + Res = case inet:peername(Socket) of + {ok, Address} -> + case split_node(atom_to_list(Node), $@, []) of + [_,Host] -> + #net_address{address=Address,host=Host, + protocol=tcp, family=inet}; + _ -> + {error, no_node} + end + end, + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, pre_nodeup} -> + Res = inet:setopts(Socket, + [{active, false}, + {packet, 4}, + nodelay()]), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, post_nodeup} -> + Res = inet:setopts(Socket, + [{active, false}, + {packet, 4}, + nodelay()]), + From ! {Ref, Res}, + dist_cntrlr_setup_loop(Socket, TickHandler, Sup); + + {Ref, From, {handshake_complete, _Node, DHandle}} -> + From ! {Ref, ok}, + %% Handshake complete! Begin dispatching traffic... + + %% We use separate process for dispatching input. This + %% is not necessary, but it enables parallel execution + %% of independent work loads at the same time as it + %% simplifies the the implementation... + InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup, + [DHandle, Socket, Sup], + [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS), + + flush_controller(InputHandler, Socket), + gen_tcp:controlling_process(Socket, InputHandler), + flush_controller(InputHandler, Socket), + + ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler), + + InputHandler ! DHandle, + + %% From now on we execute on normal priority + process_flag(priority, normal), + erlang:dist_ctrl_get_data_notification(DHandle), + dist_cntrlr_output_loop(DHandle, Socket) + end. + +%% We use active 10 for good throughput while still +%% maintaining back-pressure if the input controller +%% isn't able to handle all incoming messages... +-define(ACTIVE_INPUT, 10). + +dist_cntrlr_input_setup(DHandle, Socket, Sup) -> + link(Sup), + %% Ensure we don't try to put data before registerd + %% as input handler... + receive + DHandle -> + dist_cntrlr_input_loop(DHandle, Socket, 0) + end. + +dist_cntrlr_input_loop(DHandle, Socket, N) when N =< ?ACTIVE_INPUT/2 -> + inet:setopts(Socket, [{active, ?ACTIVE_INPUT - N}]), + dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT); +dist_cntrlr_input_loop(DHandle, Socket, N) -> + receive + {tcp_closed, Socket} -> + %% Connection to remote node terminated... + exit(connection_closed); + + {tcp, Socket, Data} -> + %% Incoming data from remote node... + try erlang:dist_ctrl_put_data(DHandle, Data) + catch _ : _ -> death_row() + end, + dist_cntrlr_input_loop(DHandle, Socket, N-1); + + _ -> + %% Ignore... + dist_cntrlr_input_loop(DHandle, Socket, N) + end. + +dist_cntrlr_send_data(DHandle, Socket) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle); + Data -> + sock_send(Socket, Data), + dist_cntrlr_send_data(DHandle, Socket) + end. + + +dist_cntrlr_output_loop(DHandle, Socket) -> + receive + dist_data -> + %% Outgoing data from this node... + try dist_cntrlr_send_data(DHandle, Socket) + catch _ : _ -> death_row() + end, + dist_cntrlr_output_loop(DHandle, Socket); + + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(Socket, Data), + From ! {Ref, ok}, + dist_cntrlr_output_loop(DHandle, Socket); + + _ -> + %% Drop garbage message... + dist_cntrlr_output_loop(DHandle, Socket) + + end. + +sock_send(Socket, Data) -> + try gen_tcp:send(Socket, Data) of + ok -> ok; + {error, Reason} -> death_row({send_error, Reason}) + catch + Type : Reason -> death_row({send_error, {Type, Reason}}) + end. + +death_row() -> + death_row(connection_closed). + +death_row(normal) -> + %% We do not want to exit with normal + %% exit reason since it wont bring down + %% linked processes... + death_row(); +death_row(Reason) -> + %% When the connection is on its way down operations + %% begin to fail. We catch the failures and call + %% this function waiting for termination. We should + %% be terminated by one of our links to the other + %% involved parties that began bringing the + %% connection down. By waiting for termination we + %% avoid altering the exit reason for the connection + %% teardown. We however limit the wait to 5 seconds + %% and bring down the connection ourselves if not + %% terminated... + receive after 5000 -> exit(Reason) end. diff --git a/lib/kernel/include/dist.hrl b/lib/kernel/include/dist.hrl index d6bccdf474..db4a5eaebc 100644 --- a/lib/kernel/include/dist.hrl +++ b/lib/kernel/include/dist.hrl @@ -40,3 +40,33 @@ -define(DFLAG_UTF8_ATOMS, 16#10000). -define(DFLAG_MAP_TAG, 16#20000). -define(DFLAG_BIG_CREATION, 16#40000). +-define(DFLAG_SEND_SENDER, 16#80000). + +%% DFLAGs that require strict ordering or:ed together... +-define(DFLAGS_STRICT_ORDER_DELIVERY, + ?DFLAG_DIST_HDR_ATOM_CACHE). + + +%% Also update dflag2str() in ../src/dist_util.erl +%% when adding flags... + +-define(DFLAGS_ALL, + (?DFLAG_PUBLISHED + bor ?DFLAG_ATOM_CACHE + bor ?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_DIST_MONITOR + bor ?DFLAG_FUN_TAGS + bor ?DFLAG_DIST_MONITOR_NAME + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_NEW_FUN_TAGS + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_EXPORT_PTR_TAG + bor ?DFLAG_BIT_BINARIES + bor ?DFLAG_NEW_FLOATS + bor ?DFLAG_UNICODE_IO + bor ?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_SMALL_ATOM_TAGS + bor ?DFLAG_UTF8_ATOMS + bor ?DFLAG_MAP_TAG + bor ?DFLAG_BIG_CREATION + bor ?DFLAG_SEND_SENDER)). diff --git a/lib/kernel/include/dist_util.hrl b/lib/kernel/include/dist_util.hrl index e3d2fe0eb6..eeb0f8dd43 100644 --- a/lib/kernel/include/dist_util.hrl +++ b/lib/kernel/include/dist_util.hrl @@ -29,9 +29,9 @@ -endif. -ifdef(dist_trace). --define(trace(Fmt,Args), io:format("~p ~p:~s",[erlang:timestamp(),node(),lists:flatten(io_lib:format(Fmt, Args))])). +-define(trace(Fmt,Args), io:format("~p ~p:~s",[erlang:convert_time_unit(erlang:monotonic_time()-erlang:system_info(start_time), native, microsecond),node(),lists:flatten(io_lib:format(Fmt, Args))])). % Use the one below for config-file (early boot) connection tracing -%-define(trace(Fmt,Args), erlang:display([erlang:now(),node(),lists:flatten(io_lib:format(Fmt, Args))])). +%-define(trace(Fmt,Args), erlang:display([erlang:convert_time_unit(erlang:monotonic_time()-erlang:system_info(start_time), native, microsecond),node(),lists:flatten(io_lib:format(Fmt, Args))])). -define(trace_factor,8). -else. -define(trace(Fmt,Args), ok). @@ -78,7 +78,13 @@ %% New in kernel-5.1 (OTP 19.1): mf_setopts, %% netkernel:setopts on active connection - mf_getopts %% netkernel:getopts on active connection + mf_getopts, %% netkernel:getopts on active connection + + %% New in kernel-6.0 (OTP 21.0) + f_handshake_complete, %% Notify handshake complete + add_flags, %% dflags to add + reject_flags, %% dflags not to use (not all can be rejected) + require_flags %% dflags that are required }). diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl index b3507e5d13..08bd5946cd 100644 --- a/lib/kernel/src/dist_util.erl +++ b/lib/kernel/src/dist_util.erl @@ -74,6 +74,48 @@ ticked = 0 }). +dflag2str(?DFLAG_PUBLISHED) -> + "PUBLISHED"; +dflag2str(?DFLAG_ATOM_CACHE) -> + "ATOM_CACHE"; +dflag2str(?DFLAG_EXTENDED_REFERENCES) -> + "EXTENDED_REFERENCES"; +dflag2str(?DFLAG_DIST_MONITOR) -> + "DIST_MONITOR"; +dflag2str(?DFLAG_FUN_TAGS) -> + "FUN_TAGS"; +dflag2str(?DFLAG_DIST_MONITOR_NAME) -> + "DIST_MONITOR_NAME"; +dflag2str(?DFLAG_HIDDEN_ATOM_CACHE) -> + "HIDDEN_ATOM_CACHE"; +dflag2str(?DFLAG_NEW_FUN_TAGS) -> + "NEW_FUN_TAGS"; +dflag2str(?DFLAG_EXTENDED_PIDS_PORTS) -> + "EXTENDED_PIDS_PORTS"; +dflag2str(?DFLAG_EXPORT_PTR_TAG) -> + "EXPORT_PTR_TAG"; +dflag2str(?DFLAG_BIT_BINARIES) -> + "BIT_BINARIES"; +dflag2str(?DFLAG_NEW_FLOATS) -> + "NEW_FLOATS"; +dflag2str(?DFLAG_UNICODE_IO) -> + "UNICODE_IO"; +dflag2str(?DFLAG_DIST_HDR_ATOM_CACHE) -> + "DIST_HDR_ATOM_CACHE"; +dflag2str(?DFLAG_SMALL_ATOM_TAGS) -> + "SMALL_ATOM_TAGS"; +dflag2str(?DFLAG_UTF8_ATOMS) -> + "UTF8_ATOMS"; +dflag2str(?DFLAG_MAP_TAG) -> + "MAP_TAG"; +dflag2str(?DFLAG_BIG_CREATION) -> + "BIG_CREATION"; +dflag2str(?DFLAG_SEND_SENDER) -> + "SEND_SENDER"; +dflag2str(_) -> + "UNKNOWN". + + remove_flag(Flag, Flags) -> case Flags band Flag of 0 -> @@ -82,13 +124,13 @@ remove_flag(Flag, Flags) -> Flags - Flag end. -adjust_flags(ThisFlags, OtherFlags) -> +adjust_flags(ThisFlags, OtherFlags, RejectFlags) -> case (?DFLAG_PUBLISHED band ThisFlags) band OtherFlags of 0 -> {remove_flag(?DFLAG_PUBLISHED, ThisFlags), remove_flag(?DFLAG_PUBLISHED, OtherFlags)}; _ -> - {ThisFlags, OtherFlags} + {ThisFlags, OtherFlags band (bnot RejectFlags)} end. publish_flag(hidden, _) -> @@ -101,36 +143,71 @@ publish_flag(_, OtherNode) -> 0 end. -make_this_flags(RequestType, OtherNode) -> - publish_flag(RequestType, OtherNode) bor - %% The parenthesis below makes the compiler generate better code. - (?DFLAG_EXPORT_PTR_TAG bor - ?DFLAG_EXTENDED_PIDS_PORTS bor - ?DFLAG_EXTENDED_REFERENCES bor - ?DFLAG_DIST_MONITOR bor - ?DFLAG_FUN_TAGS bor - ?DFLAG_DIST_MONITOR_NAME bor - ?DFLAG_HIDDEN_ATOM_CACHE bor - ?DFLAG_NEW_FUN_TAGS bor - ?DFLAG_BIT_BINARIES bor - ?DFLAG_NEW_FLOATS bor - ?DFLAG_UNICODE_IO bor - ?DFLAG_DIST_HDR_ATOM_CACHE bor - ?DFLAG_SMALL_ATOM_TAGS bor - ?DFLAG_UTF8_ATOMS bor - ?DFLAG_MAP_TAG bor - ?DFLAG_BIG_CREATION). - -handshake_other_started(#hs_data{request_type=ReqType}=HSData0) -> +-define(DFLAGS_REMOVABLE, + (?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)). + +-define(DFLAGS_ADDABLE, + (?DFLAGS_ALL + band (bnot (?DFLAG_PUBLISHED + bor ?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)))). + +-define(DFLAGS_THIS_DEFAULT, + (?DFLAG_EXPORT_PTR_TAG + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_DIST_MONITOR + bor ?DFLAG_FUN_TAGS + bor ?DFLAG_DIST_MONITOR_NAME + bor ?DFLAG_NEW_FUN_TAGS + bor ?DFLAG_BIT_BINARIES + bor ?DFLAG_NEW_FLOATS + bor ?DFLAG_UNICODE_IO + bor ?DFLAG_DIST_HDR_ATOM_CACHE + bor ?DFLAG_SMALL_ATOM_TAGS + bor ?DFLAG_UTF8_ATOMS + bor ?DFLAG_MAP_TAG + bor ?DFLAG_BIG_CREATION + bor ?DFLAG_SEND_SENDER)). + +make_this_flags(RequestType, AddFlags, RemoveFlags, OtherNode) -> + case RemoveFlags band (bnot ?DFLAGS_REMOVABLE) of + 0 -> ok; + Rerror -> exit({"Rejecting non rejectable flags", Rerror}) + end, + case AddFlags band (bnot ?DFLAGS_ADDABLE) of + 0 -> ok; + Aerror -> exit({"Adding non addable flags", Aerror}) + end, + Flgs0 = ?DFLAGS_THIS_DEFAULT, + Flgs1 = Flgs0 bor publish_flag(RequestType, OtherNode), + Flgs2 = Flgs1 bor AddFlags, + Flgs3 = Flgs2 band (bnot (?DFLAG_HIDDEN_ATOM_CACHE + bor ?DFLAG_ATOM_CACHE)), + Flgs3 band (bnot RemoveFlags). + +handshake_other_started(#hs_data{request_type=ReqType, + add_flags=AddFlgs0, + reject_flags=RejFlgs0, + require_flags=ReqFlgs0}=HSData0) -> + AddFlgs = convert_flags(AddFlgs0), + RejFlgs = convert_flags(RejFlgs0), + ReqFlgs = convert_flags(ReqFlgs0), {PreOtherFlags,Node,Version} = recv_name(HSData0), - PreThisFlags = make_this_flags(ReqType, Node), + PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node), {ThisFlags, OtherFlags} = adjust_flags(PreThisFlags, - PreOtherFlags), + PreOtherFlags, + RejFlgs), HSData = HSData0#hs_data{this_flags=ThisFlags, other_flags=OtherFlags, other_version=Version, other_node=Node, - other_started=true}, + other_started=true, + add_flags=AddFlgs, + reject_flags=RejFlgs, + require_flags=ReqFlgs}, check_dflags(HSData), is_allowed(HSData), ?debug({"MD5 connection from ~p (V~p)~n", @@ -165,23 +242,18 @@ is_allowed(#hs_data{other_node = Node, end. %% -%% Check that both nodes can handle the same types of extended -%% node containers. If they can not, abort the connection. +%% Check mandatory flags... %% check_dflags(#hs_data{other_node = Node, other_flags = OtherFlags, - other_started = OtherStarted} = HSData) -> - - Mandatory = [{?DFLAG_EXTENDED_REFERENCES, "EXTENDED_REFERENCES"}, - {?DFLAG_EXTENDED_PIDS_PORTS, "EXTENDED_PIDS_PORTS"}, - {?DFLAG_UTF8_ATOMS, "UTF8_ATOMS"}], - Missing = lists:filtermap(fun({Bit, Str}) -> - case Bit band OtherFlags of - Bit -> false; - 0 -> {true, Str} - end - end, - Mandatory), + other_started = OtherStarted, + require_flags = RequiredFlags} = HSData) -> + Mandatory = ((?DFLAG_EXTENDED_REFERENCES + bor ?DFLAG_EXTENDED_PIDS_PORTS + bor ?DFLAG_UTF8_ATOMS) + bor RequiredFlags), + Missing = check_mandatory(0, ?DFLAGS_ALL, Mandatory, + OtherFlags, []), case Missing of [] -> ok; @@ -201,6 +273,22 @@ check_dflags(#hs_data{other_node = Node, ?shutdown2(Node, {check_dflags_failed, Missing}) end. +check_mandatory(_Bit, 0, _Mandatory, _OtherFlags, Missing) -> + Missing; +check_mandatory(Bit, Left, Mandatory, OtherFlags, Missing) -> + DFlag = (1 bsl Bit), + NewLeft = Left band (bnot DFlag), + NewMissing = case {DFlag band Mandatory, + DFlag band OtherFlags} of + {DFlag, 0} -> + %% Mandatory and missing... + [dflag2str(DFlag) | Missing]; + _ -> + %% Not mandatory or present... + Missing + end, + check_mandatory(Bit+1, NewLeft, Mandatory, OtherFlags, NewMissing). + %% No nodedown will be sent if we fail before this process has %% succeeded to mark the node as pending. @@ -314,13 +402,24 @@ flush_down() -> end. handshake_we_started(#hs_data{request_type=ReqType, - other_node=Node}=PreHSData) -> - PreThisFlags = make_this_flags(ReqType, Node), - HSData = PreHSData#hs_data{this_flags=PreThisFlags}, + other_node=Node, + add_flags=AddFlgs0, + reject_flags=RejFlgs0, + require_flags=ReqFlgs0}=PreHSData) -> + AddFlgs = convert_flags(AddFlgs0), + RejFlgs = convert_flags(RejFlgs0), + ReqFlgs = convert_flags(ReqFlgs0), + PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node), + HSData = PreHSData#hs_data{this_flags = PreThisFlags, + add_flags = AddFlgs, + reject_flags = RejFlgs, + require_flags = ReqFlgs}, send_name(HSData), recv_status(HSData), {PreOtherFlags,ChallengeA} = recv_challenge(HSData), - {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, PreOtherFlags), + {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags, + PreOtherFlags, + RejFlgs), NewHSData = HSData#hs_data{this_flags = ThisFlags, other_flags = OtherFlags, other_started = false}, @@ -336,15 +435,16 @@ handshake_we_started(#hs_data{request_type=ReqType, handshake_we_started(OldHsData) when element(1,OldHsData) =:= hs_data -> handshake_we_started(convert_old_hsdata(OldHsData)). -convert_old_hsdata({hs_data, KP, ON, TN, S, T, TF, A, OV, OF, OS, FS, FR, - FS_PRE, FS_POST, FG, FA, MFT, MFG, RT}) -> - #hs_data{ - kernel_pid = KP, other_node = ON, this_node = TN, socket = S, timer = T, - this_flags = TF, allowed = A, other_version = OV, other_flags = OF, - other_started = OS, f_send = FS, f_recv = FR, f_setopts_pre_nodeup = FS_PRE, - f_setopts_post_nodeup = FS_POST, f_getll = FG, f_address = FA, - mf_tick = MFT, mf_getstat = MFG, request_type = RT}. +convert_old_hsdata(OldHsData) -> + OHSDL = tuple_to_list(OldHsData), + NoMissing = tuple_size(#hs_data{}) - tuple_size(OldHsData), + true = NoMissing > 0, + list_to_tuple(OHSDL ++ lists:duplicate(NoMissing, undefined)). +convert_flags(Flags) when is_integer(Flags) -> + Flags; +convert_flags(_Undefined) -> + 0. %% -------------------------------------------------------------- %% The connection has been established. @@ -359,15 +459,20 @@ connection(#hs_data{other_node = Node, PType = publish_type(HSData#hs_data.other_flags), case FPreNodeup(Socket) of ok -> - do_setnode(HSData), % Succeeds or exits the process. + DHandle = do_setnode(HSData), % Succeeds or exits the process. Address = FAddress(Socket,Node), mark_nodeup(HSData,Address), case FPostNodeup(Socket) of ok -> + case HSData#hs_data.f_handshake_complete of + undefined -> ok; + HsComplete -> HsComplete(Socket, Node, DHandle) + end, con_loop({HSData#hs_data.kernel_pid, Node, Socket, PType, + DHandle, HSData#hs_data.mf_tick, HSData#hs_data.mf_getstat, HSData#hs_data.mf_setopts, @@ -425,18 +530,16 @@ do_setnode(#hs_data{other_node = Node, socket = Socket, [Node, Port, {publish_type(Flags), '(', Flags, ')', Version}]), - case (catch - erlang:setnode(Node, Port, - {Flags, Version, '', ''})) of - {'EXIT', {system_limit, _}} -> + try + erlang:setnode(Node, Port, {Flags, Version, '', ''}) + catch + error:system_limit -> error_msg("** Distribution system limit reached, " "no table space left for node ~w ** ~n", [Node]), ?shutdown(Node); - {'EXIT', Other} -> - exit(Other); - _Else -> - ok + error:Other -> + exit({Other, erlang:get_stacktrace()}) end; _ -> error_msg("** Distribution connection error, " @@ -468,7 +571,13 @@ mark_nodeup(#hs_data{kernel_pid = Kernel, ?shutdown(Node) end. -con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=ConData, +getstat(DHandle, _Socket, undefined) -> + erlang:dist_get_stat(DHandle); +getstat(_DHandle, Socket, MFGetstat) -> + MFGetstat(Socket). + +con_loop({Kernel, Node, Socket, Type, DHandle, MFTick, MFGetstat, + MFSetOpts, MFGetOpts}=ConData, Tick) -> receive {tcp_closed, Socket} -> @@ -476,7 +585,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C {Kernel, disconnect} -> ?shutdown2(Node, disconnected); {Kernel, aux_tick} -> - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, _, _, PendWrite} -> send_tick(Socket, PendWrite, MFTick); _ -> @@ -484,7 +593,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C end, con_loop(ConData, Tick); {Kernel, tick} -> - case send_tick(Socket, Tick, Type, + case send_tick(DHandle, Socket, Tick, Type, MFTick, MFGetstat) of {ok, NewTick} -> con_loop(ConData, NewTick); @@ -497,7 +606,7 @@ con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=C ?shutdown2(Node, send_net_tick_failed) end; {From, get_status} -> - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, Read, Write, _} -> From ! {self(), get_status, {ok, Read, Write}}, con_loop(ConData, Tick); @@ -735,14 +844,14 @@ send_status(#hs_data{socket = Socket, other_node = Node, %% we haven't read anything as a hidden node only ticks when it receives %% a TICK !! -send_tick(Socket, Tick, Type, MFTick, MFGetstat) -> +send_tick(DHandle, Socket, Tick, Type, MFTick, MFGetstat) -> #tick{tick = T0, read = Read, write = Write, ticked = Ticked} = Tick, T = T0 + 1, T1 = T rem 4, - case MFGetstat(Socket) of + case getstat(DHandle, Socket, MFGetstat) of {ok, Read, _, _} when Ticked =:= T -> {error, not_responding}; {ok, Read, W, Pend} when Type =:= hidden -> @@ -771,11 +880,10 @@ send_tick(Socket, Tick, Type, MFTick, MFGetstat) -> Error end. -send_tick(Socket, 0, MFTick) -> - MFTick(Socket); -send_tick(_, _Pend, _) -> - %% Dont send tick if pending write. - ok. +send_tick(_, Pend, _) when Pend /= false, Pend /= 0 -> + ok; %% Dont send tick if pending write. +send_tick(Socket, _Pend, MFTick) -> + MFTick(Socket). %% ------------------------------------------------------------ %% Connection setup timeout timer. diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index ddda396713..fb4faea420 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -423,8 +423,8 @@ handle_call({connect, Type, Node}, From, State) -> {ok, SetupPid} -> Owners = [{SetupPid, Node} | State#state.conn_owners], {noreply,State#state{conn_owners=Owners}}; - _ -> - ?connect_failure(Node, {setup_call, failed}), + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), async_reply({reply, false, State}, From) end end; diff --git a/lib/tools/emacs/erlang.el b/lib/tools/emacs/erlang.el index 012de479d3..e6e2050f29 100644 --- a/lib/tools/emacs/erlang.el +++ b/lib/tools/emacs/erlang.el @@ -900,6 +900,11 @@ resulting regexp is surrounded by \\_< and \\_>." "display" "display_nl" "display_string" + "dist_get_stat" + "dist_ctrl_get_data" + "dist_ctrl_get_data_notification" + "dist_ctrl_input_handler" + "dist_ctrl_put_data" "dist_exit" "dlink" "dmonitor_node" |