From 78fad16ef7c5477239bc0b51125fabfe6567039d Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 30 Jun 2017 16:32:42 +0200 Subject: Support for distribution controller processes --- erts/doc/src/alt_dist.xml | 653 +++++++++++++++++++++++- erts/doc/src/erlang.xml | 154 ++++++ erts/emulator/beam/atom.names | 2 + erts/emulator/beam/bif.tab | 6 + erts/emulator/beam/dist.c | 872 ++++++++++++++++++++++++++------- erts/emulator/beam/erl_bif_info.c | 4 +- erts/emulator/beam/erl_node_tables.c | 26 +- erts/emulator/beam/erl_node_tables.h | 16 +- erts/emulator/beam/erl_process.c | 16 +- erts/emulator/beam/erl_process.h | 15 +- erts/emulator/beam/external.c | 56 ++- erts/emulator/beam/external.h | 7 +- erts/emulator/beam/io.c | 2 + erts/preloaded/ebin/erlang.beam | Bin 106204 -> 107148 bytes erts/preloaded/ebin/erts_internal.beam | Bin 11100 -> 11868 bytes erts/preloaded/src/erlang.erl | 53 +- erts/preloaded/src/erts_internal.erl | 37 ++ 17 files changed, 1701 insertions(+), 218 deletions(-) (limited to 'erts') diff --git a/erts/doc/src/alt_dist.xml b/erts/doc/src/alt_dist.xml index be969a8267..3d87e9dcdb 100644 --- a/erts/doc/src/alt_dist.xml +++ b/erts/doc/src/alt_dist.xml @@ -47,23 +47,25 @@ runs on. The reason the C code is not made portable, is simply readability.

- -

This section was written a long time ago. Most of it is still - valid, but some things have changed since then. - Most notably is the driver interface. Some updates have been made - to the documentation of the driver presented here, - but more can be done and is planned for the future. - The reader is encouraged to read the - erl_driver and - driver_entry - documentation also.

-
-
Introduction

To implement a new carrier for the Erlang distribution, the main steps are as follows.

+

+ As of ERTS version 10.0 support for distribution controller + processes has been introduced. That is, the traffic over a + distribution channel can be managed by a process instead of + only by a port. This makes it possible to implement large + parts of the logic in Erlang code, and you perhaps do not + even need a new driver for the protocol. One example could + be Erlang distribution over UDP using gen_udp (your + Erlang code will of course have to take care of retranspissions, + etc in this example). That is, depending on what you want + to do you perhaps do not need to implement a driver at all + and can then skip the driver related sections below. +

+
Writing an Erlang Driver

First, the protocol must be available to the Erlang machine, which @@ -151,8 +153,635 @@

+
+ + Distribution Module +

+ The distribution module expose an API that net_kernel call + in order to manage connections to other nodes. The module name + should have the suffix _dist. +

+

+ The module needs to create some kind of listening entity (process + or port) and an acceptor process that accepts incoming connections + using the listening entity. For each connection, the module at least + needs to create one connection supervisor process, which also is + responsible for the handshake when setting up the connection, and + a distribution controller (process or port) responsible for + transport of data over the connection. The distribution controller + and the connection supervisor process should be linked together + so both of them are cleaned up when the connection is taken down. +

+

+ Note that there need to be exactly one distribution controller + per connection. A process or port can only be distribution + controller for one connection. The registration as distribution + controller cannot be undone. It will stick until the distribution + controller terminates. The distribution controller should not + ignore exit signals. It is allowed to trap exits, but it should + then voluntarily terminate when an exit signal is received. +

+ +
+ + Exported Callback Functions + +

+ The following functions are mandatory: +

+ + listen(Name) ->
  {ok, {Listen, Address, Creation}} | {error, Error}
+ +

+ listen/1 is called once in order to listen for incoming + connection requests. The call is made when the distribution is brought + up. The argument Name is the part of the node name before + the @ sign in the full node name. It can be either an atom or a + string. +

+

+ The return value consists of a Listen handle (which is + later passed to the accept/1 + callback), Address which is a #net_address{} record + with information about the address for the node (the + #net_address{} record is defined in + kernel/include/net_address.hrl), and Creation which + (currently) is an integer 1, 2, or 3. +

+

+ If epmd is to be used + for node discovery, you typically want to use the (unfortunately + undocumented) erl_epmd module (part of the kernel + application) in order to register the listen port with epmd + and retrieve Creation to use. +

+
+ + accept(Listen) ->
  AcceptorPid
+ +

+ accept/1 should spawn a process that accepts connections. This + process should preferably execute on max priority. The process + identifier of this process should be returned. +

+

+ The Listen argument will be the same as the Listen handle + part of the return value of the + listen/1 callback above. + accept/1 is called only once when the distribution protocol is + started. +

+

+ The caller of this function is a representative for net_kernel + (this may or may not be the process registered as net_kernel) + and is in this document identified as Kernel. + When a connection has been accepted by the acceptor process, it needs + to inform Kernel about the accepted connection. This is done by + passing a message on the form: +

+ +

+ DistController is either the process or port identifier + of the distribution controller for the connection. The + distribution controller should be created by the acceptor + processes when a new connection is accepted. Its job is to + dispatch traffic on the connection. +

+ Kernel responds with one of the following messages: + + {Kernel, controller, SupervisorPid} + +

+ The request was accepted and SupervisorPid is the + process identifier of the connection supervisor process + (which is created in the + accept_connection/5 + callback). +

+
+ {Kernel, unsupported_protocol} + +

+ The request was rejected. This is a fatal error. The acceptor + process should terminate. +

+
+
+

+ When an accept sequence has been completed the acceptor process + is expected to continue accepting further requests. +

+
+ + accept_connection(AcceptorPid, DistCtrl, MyNode, Allowed, SetupTime) ->
  ConnectionSupervisorPid
+ +

+ accept_connection/5 should spawn a process that will + perform the Erlang distribution handshake for the connection. + If the handshake successfully completes it should continue to + function as a connection supervisor. This process + should preferably execute on max priority. +

+

The arguments:

+ + AcceptorPid + +

+ Process identifier of the process created by the + accept/1 + callback. +

+
+ DistCtrl + +

The identifier of the distribution controller identifier + created by the acceptor process. To be passed along to + dist_util:handshake_other_started(HsData). +

+
+ MyNode + +

+ Node name of this node. To be passed along to + dist_util:handshake_other_started(HsData). +

+
+ Allowed + +

+ To be passed along to + dist_util:handshake_other_started(HsData). +

+
+ SetupTime + +

+ Time used for creating a setup timer by a + call to dist_util:start_timer(SetupTime). + The timer should be passed along to + dist_util:handshake_other_started(HsData). +

+
+
+

+ The created process should provide callbacks and other + information needed for the handshake in a + #hs_data{} + record and call dist_util:handshake_other_started(HsData) + with this record. +

+

+ dist_util:handshake_other_started(HsData) will perform + the handshake and if the handshake successfully completes this + process will then continue in a connection supervisor loop + as long as the connection is up. +

+
+ + setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
  ConnectionSupervisorPid
+ +

+ setup/5 should spawn a process that connects to + Node. When connection has been established it should + perform the Erlang distribution handshake for the connection. + If the handshake successfully completes it should continue to + function as a connection supervisor. This process + should preferably execute on max priority. +

+

The arguments:

+ + Node + +

+ Node name of remote node. To be passed along to + dist_util:handshake_we_started(HsData). +

+
+ Type + +

+ Connection type. To be passed along to + dist_util:handshake_we_started(HsData). +

+
+ MyNode + +

+ Node name of this node. To be passed along to + dist_util:handshake_we_started(HsData). +

+
+ LongOrShortNames + +

+ Either the atom longnames or + the atom shortnames indicating + whether long or short names is used. +

+
+ SetupTime + +

+ Time used for creating a setup timer by a + call to dist_util:start_timer(SetupTime). + The timer should be passed along to + dist_util:handshake_we_started(HsData). +

+
+
+

+ The caller of this function is a representative for net_kernel + (this may or may not be the process registered as net_kernel) + and is in this document identified as Kernel. +

+

+ This function should, besides spawning the connection supervisor, + also create a distribution controller. The distribution + controller is either a process or a port which is responsible + for dispatching traffic. +

+

+ The created process should provide callbacks and other + information needed for the handshake in a + #hs_data{} + record and call dist_util:handshake_we_started(HsData) + with this record. +

+

+ dist_util:handshake_we_started(HsData) will perform + the handshake and the handshake successfully completes this + process will then continue in a connection supervisor loop + as long as the connection is up. +

+
+ + close(Listen) ->
  void()
+ +

+ Called in order to close the Listen handle + that originally was passed from the + listen/1 callback. +

+ + select(NodeName) ->
  boolean()
+ +

Return true if the host name part + of the NodeName is valid for use + with this protocol; otherwise, false. +

+
+ +
+ +

+ There are also two optional functions that may be + exported: +

+ + setopts(Listen, Opts) ->
  ok | {error, Error}
+ +

+ The argument Listen is the handle originally passed + from the + listen/1 callback. + The argument Opts is a list of options to set on future + connections. +

+
+ + getopts(Listen, Opts) ->
  {ok, OptionValues} | {error, Error}
+ +

+ The argument Listen is the handle originally passed + from the + listen/1 callback. + The argument Opts is a list of options to read for future + connections. +

+
+
+ +
+
+ + The #hs_data{} Record +

+ The dist_util:handshake_we_started/1 and + dist_util:handshake_other_started/1 functions + takes a #hs_data{} record as argument. There + are quite a lot of fields in this record that you + need to set. The record is defined in + kernel/include/dist_util.hrl. Not documented + fields should not be set, i.e., should be left as + undefined. +

+

+ The following #hs_data{} record fields need + to be set unless otherwise stated:

+ + kernel_pid + +

+ Process identifier of the Kernel process. That is, + the process that called either + setup/5 or + accept_connection/5. +

+
+ + other_node + +

Name of the other node. This field is only + mandatory when this node initiates the connection. + That is, when connection is set up via + setup/5. +

+
+ + this_node + +

+ The node name of this node. +

+
+ + socket + +

+ The identifier of the distribution controller. +

+
+ + timer + +

+ The timer created using dist_util:start_timer/1. +

+
+ + allowed + +

Information passed as Allowed to + accept_connection/5. This field is only + mandatory when the remote node initiated the + connection. That is, when the connection is set + up via + accept_connection/5. +

+
+ + f_send + +

+ A fun with the following signature: +

+ ok | {error, Error}]]> +

+ where DistCtrlr is the identifier of + the distribution controller and Data + is io data to pass to the other side. +

+

Only used during handshake phase.

+
+ + f_recv + +

+ A fun with the following signature: +

+ {ok, Packet} | {error, Reason}]]> +

+ where DistCtrlr is the identifier of the distribution + controller. + If Length is 0, all available bytes should be + returned. If Length > 0, exactly Length bytes + should be returned, or an error; possibly discarding less + than Length bytes of data when the connection is + closed from the other side. + It is used for passive receive of data from the + other end. +

+

Only used during handshake phase.

+
+ + f_setopts_pre_nodeup + +

+ A fun with the following signature: +

+ ok | {error, Error}]]> +

+ where DistCtrlr is the identifier of + the distribution controller. Called just + before the distribution channel is taken up + for normal traffic. +

+

Only used during handshake phase.

+
+ + f_setopts_post_nodeup + +

+ A fun with the following signature: +

+ ok | {error, Error}]]> +

+ where DistCtrlr is the identifier of + the distribution controller. Called just + after distribution channel has been taken + up for normal traffic. +

+

Only used during handshake phase.

+
+ + f_getll + +

+ A fun with the following signature: +

+ ID]]> +

+ where DistCtrlr is the identifier of + the distribution controller and ID is + the identifier of the low level entity that + handles the connection (often DistCtrlr + itself). +

+

Only used during handshake phase.

+
+ + f_address + +

+ A fun with the following signature: +

+ NetAddress]]> +

+ where DistCtrlr is the identifier of + the distribution controller, Node + is the node name of the node on the other end, + and NetAddress is a #net_address{} + record with information about the address + for the Node on the other end of the + connection. The #net_address{} record + is defined in + kernel/include/net_address.hrl. +

+

Only used during handshake phase.

+
+ + mf_tick + +

+ A fun with the following signature: +

+ void()]]> +

+ where DistCtrlr is the identifier + of the distribution controller. This + function should send information over + the connection that is not interpreted + by the other end while increasing the + statistics of received packets on the + other end. This is usually implemented by + sending an empty packet. +

+

+ It is of vital importance that this operation + does not block the caller for a long time. + This since it is called from the connection + supervisor. +

+

Used when connection is up.

+
+ + mf_getstat + +

+ A fun with the following signature: +

+ {ok, Received, Sent, PendSend}]]> +

+ where DistCtrlr is the identifier + of the distribution controller, Received + is received packets, Sent is + sent packets, and PendSend is + amount of packets in queue to be sent + or a boolean() indicating whether + there are packets in queue to be sent. +

+

+ It is of vital importance that this operation + does not block the caller for a long time. + This since it is called from the connection + supervisor. +

+

Used when connection is up.

+
+ + request_type + +

+ The request Type as passed to + setup/5. + This is only mandatory when the connection has + been initiated by this node. That is, the connection + is set up via setup/5. +

+
+ + mf_setopts + +

+ A fun with the following signature: +

+ ok | {error, Error}]]> +

+ where DistCtrlr is the identifier + of the distribution controller and Opts + is a list of options to set on the connection. +

+

This function is optional. Used when connection is up.

+
+ + mf_getopts + +

+ A fun with the following signature: +

+ {ok, OptionValues} | {error, Error}]]> +

+ where DistCtrlr is the identifier + of the distribution controller and Opts + is a list of options to read for the connection. +

+

This function is optional. Used when connection is up.

+
+ + f_handshake_complete + +

+ A fun with the following signature: +

+ void()]]> +

+ where DistCtrlr is the identifier + of the distribution controller, Node is + the node name of the node connected at the other + end, and DHandle is a distribution handle + needed by a distribution controller process when + calling the following BIFs: +

+ +

erlang:dist_ctrl_get_data/1

+

erlang:dist_ctrl_get_data_notification/1

+

erlang:dist_ctrl_input_handler/2

+

erlang:dist_ctrl_put_data/2

+
+

+ This function is called when the handshake has + completed and the distribution channel is up. + The distribution controller can begin dispatching + traffic over the channel. This function is optional. +

+

Only used during handshake phase.

+
+ +
+
+ +
+ + Enable Your Distribution Module + +

For net_kernel to find out which distribution module to use, + the erl command-line argument -proto_dist is used. It + is followed by one or more distribution module names, with suffix + "_dist" removed. That is, gen_tcp_dist as a distribution module + is specified as -proto_dist gen_tcp.

+ +

If no epmd (TCP port mapper daemon) is used, also command-line + option -no_epmd is to be specified, which makes + Erlang skip the epmd startup, both as an OS process and as an + Erlang ditto.

+
+ +
+
The Driver + + +

This section was written a long time ago. Most of it is still + valid, but some things have changed since then. Some updates have + been made to the documentation of the driver presented here, + but more can be done and is planned for the future. + The reader is encouraged to read the + erl_driver and + driver_entry + documentation also.

+
+

Although Erlang drivers in general can be beyond the scope of this section, a brief introduction seems to be in place.

diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 105734d5b2..8b0f97f30f 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -181,6 +181,14 @@ + + + + +

An opaque handle identifing a distribution channel.

+
+
+ @@ -1214,6 +1222,152 @@ end + + + Get distribution channel data to pass to another node. + +

+ Get distribution channel data from the local node that is + to be passed to the remote node. The distribution channel + is identified by DHandle. If no data + is available, the atom none is returned. One + can request to be informed by a message when more + data is available by calling + erlang:dist_ctrl_get_data_notification(DHandle). +

+

+ The data retreived from this function needs to be delivered + as is to the node on the other end in the exact same order, + with no loss of data what so ever, as retrived from this + function. +

+

+ Only the process registered as distribution + controller for the distribution channel identified by + DHandle is allowed to call this + function. +

+

+ This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. DHandle is retrived + via the callback + f_handshake_complete. + More information about this can be found in the documentation of + ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module. +

+
+
+ + + + Request notification about available outgoing distribution channel data. + +

+ Request notification when more data is available to + fetch using + erlang:dist_ctrl_get_data(DHandle) + for the distribution channel identified by + DHandle. When more data is present, + the caller will be sent the message dist_data. + Once a dist_data messages has been sent, no + more dist_data messages will be sent until + the dist_ctrl_get_data_notification/1 function has been called + again. +

+

+ Only the process registered as distribution + controller for the distribution channel identified by + DHandle is allowed to call this + function. +

+

+ This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. DHandle is retrived + via the callback + f_handshake_complete. + More information about this can be found in the documentation of + ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module. +

+
+
+ + + + Register distribution channel input handler process. + +

+ Register an alternate input handler process for the + distribution channel identified by DHandle. + Once this function has been called, InputHandler + is the only process allowed to call + erlang:dist_ctrl_put_data(DHandle, Data) + with the DHandle identifing this distribution + channel. +

+

+ Only the process registered as distribution + controller for the distribution channel identified by + DHandle is allowed to call this + function. +

+

+ This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. DHandle is retrived + via the callback + f_handshake_complete. + More information about this can be found in the documentation of + ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module. +

+
+
+ + + + Pass data into the VM from a distribution channel. + +

+ Deliver distribution channel data from a remote node to the + local node. +

+

+ The data passed to the VM using this function needs to be + passed in the exact same order, and with no loss of data + what so ever, as sent from the node on the other end. +

+

+ Only the process registered as distribution + controller for the distribution channel identified by + DHandle is allowed to call this + function unless an alternate input handler process + has been registered using + erlang:dist_ctrl_input_handler(DHandle, InputHandler). + If an alternate input handler has been registered, only + the registered input handler process is allowed to call + this function. +

+

+ This function is used when implementing an alternative + distribution carrier using processes as distribution + controllers. DHandle is retrived + via the callback + f_handshake_complete. + More information about this can be found in the documentation of + ERTS + User's Guide ➜ How to implement an Alternative Carrier + for the Erlang Distribution ➜ Distribution Module. +

+
+
+ Return the Nth element of a tuple. diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index a44d23b181..fc55b687d4 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -217,6 +217,8 @@ atom discard atom display_items atom dist atom dist_cmd +atom dist_ctrl_put_data +atom dist_data atom Div='/' atom div atom dlink diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index a8bbf5f8c1..28ca449a23 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -154,6 +154,12 @@ bif erlang:spawn_opt/1 bif erlang:setnode/2 bif erlang:setnode/3 bif erlang:dist_exit/3 +bif erlang:dist_get_stat/1 +bif erlang:dist_ctrl_input_handler/2 +bif erlang:dist_ctrl_put_data/2 +bif erlang:dist_ctrl_get_data/1 +bif erlang:dist_ctrl_get_data_notification/1 + # Static native functions in erts_internal bif erts_internal:port_info/1 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index d9feec4722..5d7501f234 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -121,7 +121,7 @@ Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ - +static Export *dist_ctrl_put_data_trap; /* forward declarations */ @@ -156,9 +156,7 @@ create_cache(DistEntry *dep) int i; ErtsAtomCache *cp; - ERTS_SMP_LC_ASSERT( - is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + ERTS_SMP_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, @@ -450,7 +448,35 @@ inc_no_nodes(void) #endif erts_smp_atomic_inc_mb(&no_nodes); } - + +static void +kill_dist_ctrl_proc(void *vpid) +{ + Eterm pid = (Eterm) vpid; + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); + if (rp) { + erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, + am_kill, NIL, NULL, 0); + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } +} + +static void +schedule_kill_dist_ctrl_proc(Eterm pid) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int sched_id = 1; + if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) + sched_id = 1; + else + sched_id = (int) esdp->no; + erts_schedule_misc_aux_work(sched_id, + kill_dist_ctrl_proc, + (void *) (UWord) pid); +} + /* * proc is currently running or exiting process. */ @@ -460,58 +486,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_port = 0; + int no_dist_ctrl = 0; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; /* KILL all port controllers */ - if (no_dist_port == 0) + if (no_dist_ctrl == 0) erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); else { Eterm def_buf[128]; int i = 0; - Eterm *dist_port; + Eterm *dist_ctrl; - if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_port = &def_buf[0]; + if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) + dist_ctrl = &def_buf[0]; else - dist_port = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_port); + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_port; i++) { - Port *prt = erts_port_lookup(dist_port[i], - ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt) - continue; - ASSERT(erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLG_DISTRIBUTION); - - erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, - prt, dist_port[i], nd_reason, NULL); + for (i = 0; i < no_dist_ctrl; i++) { + if (is_internal_pid(dist_ctrl[i])) + schedule_kill_dist_ctrl_proc(dist_ctrl[i]); + else { + Port *prt = erts_port_lookup(dist_ctrl[i], + ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { + ASSERT(erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); + + erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, + prt, dist_ctrl[i], nd_reason, NULL); + } + } } - if (dist_port != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_port); + if (dist_ctrl != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, dist_ctrl); } /* - * When last dist port exits, node will be taken + * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); @@ -528,7 +558,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) &nodedown.bp->off_heap); } } - else { /* Call from distribution port */ + else { /* Call from distribution controller (port/process) */ NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -538,11 +568,12 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1); erts_smp_de_rwlock(dep); - ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + if (is_internal_port(dep->cid)) { + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); - if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(&dep->dist_cmd); + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(&dep->dist_cmd); + } if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG @@ -618,6 +649,9 @@ void init_dist(void) dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); + dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, + am_dist_ctrl_put_data, + 2); } #define ErtsDistOutputBuf2Binary(OB) \ @@ -660,6 +694,8 @@ static void clear_dist_entry(DistEntry *dep) ErtsDistOutputBuf *obuf; erts_smp_de_rwlock(dep); + erts_smp_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); cache = dep->cache; dep->cache = NULL; @@ -673,6 +709,9 @@ static void clear_dist_entry(DistEntry *dep) erts_smp_mtx_lock(&dep->qlock); + erts_smp_atomic64_set_nob(&dep->in, 0); + erts_smp_atomic64_set_nob(&dep->out, 0); + if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; else { @@ -680,8 +719,15 @@ static void clear_dist_entry(DistEntry *dep) obuf = dep->out_queue.first; } + if (dep->tmp_out_queue.first) { + dep->tmp_out_queue.last->next = obuf; + obuf = dep->tmp_out_queue.first; + } + dep->out_queue.first = NULL; dep->out_queue.last = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; dep->status = 0; @@ -1148,6 +1194,7 @@ int erts_net_message(Port *prt, ErtsLink *lnk; Uint tuple_arity; int res; + Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif @@ -1156,14 +1203,17 @@ int erts_net_message(Port *prt, ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } + + if (hlen != 0) goto data_error; + if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; @@ -1187,25 +1237,31 @@ int erts_net_message(Port *prt, goto data_error; } - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); + res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); - if (res >= 0) - res = ctl_len = erts_decode_dist_ext_size(&ede); - else { + switch (res) { + case ERTS_PREP_DIST_EXT_CLOSED: + return 0; /* Connection not alive; ignore signal... */ + case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif - ctl_len = 0; - } - - if (res < 0) { + goto data_error; + case ERTS_PREP_DIST_EXT_SUCCESS: + ctl_len = erts_decode_dist_ext_size(&ede); + if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); - bw(buf, orig_len); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); #endif - PURIFY_MSG("data error"); - goto data_error; + PURIFY_MSG("data error"); + goto data_error; + } + break; + default: + ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); + break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { @@ -1726,7 +1782,7 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); + erts_kill_dist_connection(dep, connection_id); ERTS_SMP_CHK_NO_PROC_LOCKS; return -1; } @@ -1747,6 +1803,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) return ret; } +static ERTS_INLINE void +notify_dist_data(Process *c_p, Eterm pid) +{ + Process *rp; + ErtsProcLocks rp_locks; + + ASSERT(erts_get_scheduler_data() + && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); + ASSERT(is_internal_pid(pid)); + + if (c_p && c_p->common.id == pid) { + rp = c_p; + rp_locks = ERTS_PROC_LOCK_MAIN; + } + else { + rp = erts_proc_lookup(pid); + rp_locks = 0; + } + + if (rp) { + ErtsMessage *mp = erts_alloc_message(0, NULL); + erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); + } +} + int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { @@ -1865,14 +1946,28 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) Sint qsize; erts_aint32_t qflgs; ErtsProcList *plp = NULL; + Eterm notify_proc = NIL; + Sint obsz = size_obuf(ctx->obuf); + erts_smp_mtx_lock(&dep->qlock); - qsize = erts_smp_atomic_add_read_nob(&dep->qsize, - (erts_aint_t) size_obuf(ctx->obuf)); + qsize = erts_smp_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); + ASSERT(qsize >= obsz); qflgs = erts_smp_atomic32_read_nob(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); qflgs |= ERTS_DE_QFLG_BUSY; } + if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { + /* Previously empty queue and info requested... */ + qflgs = erts_smp_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + /* else: requester will send itself the message... */ + qflgs &= ~ERTS_DE_QFLG_REQ_INFO; + } if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_smp_mtx_unlock(&dep->qlock); @@ -1916,8 +2011,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); erts_smp_de_runlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); @@ -1971,16 +2069,20 @@ static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; + char *bufp; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + if (!obuf) { + size = 0; + bufp = NULL; + } + else { + size = obuf->ext_endp - obuf->extp; + bufp = (char*) obuf->extp; + } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { @@ -1995,11 +2097,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) remote_str, size); } #endif + prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); - (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, - (char*) obuf->extp, - (int) size); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } @@ -2008,7 +2109,7 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; @@ -2016,25 +2117,33 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); - iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (!obuf) { + size = 0; + eiov.vsize = 1; + } + else { + size = obuf->ext_endp - obuf->extp; + eiov.vsize = 2; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + } - eiov.vsize = 2; eiov.size = size; eiov.iov = iov; eiov.binv = bv; + if (size > (Uint) INT_MAX) + erts_exit(ERTS_DUMP_EXIT, + "Absurdly large distribution output data buffer " + "(%beu bytes) passed.\n", + size); + ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES @@ -2138,20 +2247,20 @@ erts_dist_command(Port *prt, int reds_limit) if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { - Uint size; - ErtsDistOutputBuf *fob; - - size = (*send)(prt, foq.first); - esdp->io.out += (Uint64) size; + Uint size; + ErtsDistOutputBuf *fob; + size = (*send)(prt, foq.first); + erts_smp_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) @@ -2215,29 +2324,30 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); - esdp->io.out += (Uint64) size; + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); + erts_smp_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) @@ -2379,6 +2489,374 @@ erts_dist_command(Port *prt, int reds_limit) goto done; } +#if 0 + +int +dist_data_finalize(Process *c_p, int reds_limit) +{ + int reds = 5; + DistEntry *dep = ; + ErtsDistOutputQueue oq, foq; + ErtsDistOutputBuf *ob; + int preempt; + + + erts_smp_mtx_lock(&dep->qlock); + flags = dep->flags; + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_smp_mtx_unlock(&dep->qlock); + + if (!oq.first) { + ASSERT(!oq.last); + oq.first = dep->tmp_out_queue.first; + oq.last = dep->tmp_out_queue.last; + } + else { + ErtsDistOutputBuf *f, *l; + ASSERT(oq.last); + if (dep->tmp_out_queue.last) { + dep->tmp_out_queue.last->next = oq.first; + oq.first = dep->tmp_out_queue.first; + } + } + + if (!oq.first) { + /* Nothing to do... */ + ASSERT(!oq.last); + return reds; + } + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + + preempt = 0; + ob = oq.first; + ASSERT(ob); + + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache, + flags); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + dep->tmp_out_queue.first = oq.first; + dep->tmp_out_queue.last = oq.last; + + return reds; +} + +#endif + +BIF_RETTYPE +dist_ctrl_get_data_notification_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + erts_aint32_t qflgs; + erts_aint_t qsize; + Eterm receiver = NIL; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + /* + * Caller is the only one that can consume from this queue + * and the only one that can set the req-info flag... + */ + + erts_smp_de_rlock(dep); + + ASSERT(dep->cid == BIF_P->common.id); + + qflgs = erts_smp_atomic32_read_acqb(&dep->qflgs); + + if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) + receiver = BIF_P->common.id; /* Notify ourselves... */ + else { /* Empty queue; set req-info flag... */ + qflgs = erts_smp_atomic32_read_bor_mb(&dep->qflgs, + ERTS_DE_QFLG_REQ_INFO); + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) { + qflgs = erts_smp_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) + receiver = BIF_P->common.id; /* Notify ourselves... */ + /* else: someone else will notify us... */ + } + /* else: still empty queue... */ + } + } + /* else: Already requested... */ + + erts_smp_de_runlock(dep); + + if (is_internal_pid(receiver)) + notify_dist_data(BIF_P, receiver); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_put_data_2(BIF_ALIST_2) +{ + DistEntry *dep; + ErlDrvSizeT size; + Eterm input_handler; + + if (is_binary(BIF_ARG_2)) + size = binary_size(BIF_ARG_2); + else if (is_nil(BIF_ARG_2)) + size = 0; + else if (is_list(BIF_ARG_2)) + BIF_TRAP2(dist_ctrl_put_data_trap, + BIF_P, BIF_ARG_1, BIF_ARG_2); + else + BIF_ERROR(BIF_P, BADARG); + + dep = erts_find_dist_entry(BIF_ARG_1); + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + input_handler = (Eterm) erts_smp_atomic_read_nob(&dep->input_handler); + + if (input_handler != BIF_P->common.id) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + erts_smp_atomic64_inc_nob(&dep->in); + + if (size != 0) { + byte *data, *temp_alloc = NULL; + + data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); + if (!data) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + + (void) erts_net_message(NULL, dep, NULL, 0, data, size); + /* + * We ignore any decode failures. On fatal failures the + * connection will be taken down by killing the + * distribution channel controller... + */ + + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BUMP_REDS(BIF_P, 5); + + erts_free_aligned_binary_bytes(temp_alloc); + + } + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_get_stat_1(BIF_ALIST_1) +{ + Sint64 read, write, pend; + Eterm res, *hp, **hpp; + Uint sz, *szp; + DistEntry *dep = erts_find_dist_entry(BIF_ARG_1); + + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + ASSERT(dep->sysname == BIF_ARG_1); + + erts_smp_de_rlock(dep); + + read = (Sint64) erts_smp_atomic64_read_nob(&dep->in); + write = (Sint64) erts_smp_atomic64_read_nob(&dep->out); + pend = (Sint64) erts_smp_atomic_read_nob(&dep->qsize); + + erts_smp_de_runlock(dep); + + erts_deref_dist_entry(dep); + + sz = 0; + szp = &sz; + hpp = NULL; + + while (1) { + res = erts_bld_tuple(hpp, szp, 4, + am_ok, + erts_bld_sint64(hpp, szp, read), + erts_bld_sint64(hpp, szp, write), + pend ? am_true : am_false); + if (hpp) + break; + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } + + BIF_RET(res); +} + +BIF_RETTYPE +dist_ctrl_input_handler_2(BIF_ALIST_2) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + if (is_not_internal_pid(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_atomic_set_nob(&dep->input_handler, + (erts_aint_t) BIF_ARG_2); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_get_data_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + int reds = 1; + ErtsDistOutputBuf *obuf; + Eterm *hp; + ProcBin *pb; + erts_aint_t qsize; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_de_rlock(dep); + + if (dep->status & ERTS_DE_SFLG_EXITING) + goto return_none; + + ASSERT(dep->cid == BIF_P->common.id); + +#if 0 + if (dep->finalized_out_queue.first) { + obuf = dep->finalized_out_queue.first; + dep->finalized_out_queue.first = obuf->next; + if (!obuf->next) + dep->finalized_out_queue.last = NULL; + } + else +#endif + { + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + if (qsize > 0) { + erts_smp_mtx_lock(&dep->qlock); + dep->tmp_out_queue.first = dep->out_queue.first; + dep->tmp_out_queue.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_smp_mtx_unlock(&dep->qlock); + } + } + + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + return_none: + erts_smp_de_runlock(dep); + BIF_RET(am_none); + } + else { + obuf = dep->tmp_out_queue.first; + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; + } + + obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, + dep->cache, + dep->flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ + ASSERT(&obuf->data[0] <= obuf->extp + && obuf->extp < obuf->ext_endp); + } + + erts_smp_atomic64_inc_nob(&dep->out); + + erts_smp_de_runlock(dep); + + hp = HAlloc(BIF_P, PROC_BIN_SIZE); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + + qsize = erts_smp_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); + ASSERT(qsize >= 0); + + if (qsize < erts_dist_buf_busy_limit/2 + && (erts_smp_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + ErtsProcList *resume_procs = NULL; + erts_smp_mtx_lock(&dep->qlock); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_smp_mtx_unlock(&dep->qlock); + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + } + + BIF_RET2(make_binary(pb), reds); +} + void erts_dist_port_not_busy(Port *prt) { @@ -2402,8 +2880,7 @@ void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_smp_de_rwlock(dep); - if (is_internal_port(dep->cid) - && connection_id == dep->connection_id + if (connection_id == dep->connection_id && !(dep->status & ERTS_DE_SFLG_EXITING)) { dep->status |= ERTS_DE_SFLG_EXITING; @@ -2413,7 +2890,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) erts_smp_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); } erts_smp_de_rwunlock(dep); } @@ -2652,17 +3132,23 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; } - net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, - am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); - if (!net_kernel) + net_kernel = erts_whereis_process(BIF_P, + ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + 0); + if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, - * do_net_exist will be called when net_kernel is terminated !! */ + * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; - if (net_kernel != BIF_P) - erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + erts_smp_proc_unlock(net_kernel, + (ERTS_PROC_LOCK_STATUS + | ((net_kernel != BIF_P) + ? ERTS_PROC_LOCK_MAIN + : 0))); #ifdef DEBUG erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); @@ -2679,6 +3165,14 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) erts_smp_thr_progress_unblock(); erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_smp_refc_inc(&erts_this_dist_entry->refc, 1); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); + BIF_RET(am_true); error: @@ -2708,18 +3202,21 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; + ErtsProcLocks proc_unlock = 0; + Process *proc; Port *pp = NULL; /* Prepare for success */ - ERTS_BIF_PREP_RET(ret, am_true); + ERTS_BIF_PREP_RET(ret, BIF_ARG_1); /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || - is_not_internal_port(BIF_ARG_2) || - (erts_this_node->sysname == am_Noname)) { + !(is_internal_port(BIF_ARG_2) + || is_internal_pid(BIF_ARG_2)) + || (erts_this_node->sysname == am_Noname)) { goto badarg; } @@ -2763,74 +3260,116 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port_sflgs(BIF_ARG_2, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_INVALID_LOOKUP); - erts_smp_de_rwlock(dep); + if (is_internal_pid(BIF_ARG_2)) { + if (BIF_P->common.id == BIF_ARG_2) { + proc_unlock = 0; + proc = BIF_P; + } + else { + proc_unlock = ERTS_PROC_LOCK_MAIN; + proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, + BIF_ARG_2, proc_unlock); + } + erts_smp_de_rwlock(dep); - if (!pp || (erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_EXITING)) - goto badarg; + if (!proc) + goto badarg; + else if (proc == ERTS_PROC_LOCK_BUSY) { + proc_unlock = 0; + goto yield; + } - if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) - goto badarg; + erts_smp_proc_lock(proc, ERTS_PROC_LOCK_STATUS); + proc_unlock |= ERTS_PROC_LOCK_STATUS; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - goto done; /* Already set */ + if (ERTS_PROC_GET_DIST_ENTRY(proc)) { + if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) + && (proc->flags & F_DISTRIBUTION) + && dep->cid == BIF_ARG_2) + goto done; + goto badarg; + } - if (dep->status & ERTS_DE_SFLG_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_smp_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_smp_mtx_unlock(&dep->qlock); - goto yield; - } + if (is_not_nil(dep->cid)) + goto badarg; - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + proc->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(proc, dep); - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + proc_unlock &= ~ERTS_PROC_LOCK_STATUS; + erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + dep->send = NULL; /* Only for distr ports... */ - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } + else { - pp->dist_entry = dep; + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); + erts_smp_de_rwlock(dep); + + if (!pp || (erts_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) + goto badarg; + + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; + + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) + goto done; /* Already set */ + + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_smp_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_smp_mtx_unlock(&dep->qlock); + goto yield; + } - dep->version = version; - dep->creation = 0; + ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + if (pp->dist_entry || is_not_nil(dep->cid)) + goto badarg; -#if 1 - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); -#else - dep->send = dist_port_command; -#endif - ASSERT(dep->send); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + + pp->dist_entry = dep; + + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); + + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } + + } + + dep->version = version; + dep->creation = 0; #ifdef DEBUG ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == 0); #endif - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); + erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + erts_smp_de_rwunlock(dep); dep = NULL; /* inc of refc transferred to port (dist_entry field) */ @@ -2851,6 +3390,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (pp) erts_port_release(pp); + if (proc_unlock) + erts_smp_proc_unlock(proc, proc_unlock); + return ret; yield: diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 96f9b284b3..8846866eb7 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3938,12 +3938,12 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) DFLAG_BIT_BINARIES); BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags)); } - else if (ERTS_IS_ATOM_STR("dist_port", tp[1])) { + else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) { Eterm res = am_undefined; DistEntry *dep = erts_sysname_to_connected_dist_entry(tp[2]); if (dep) { erts_smp_de_rlock(dep); - if (is_internal_port(dep->cid)) + if (is_internal_port(dep->cid) || is_internal_pid(dep->cid)) res = dep->cid; erts_smp_de_runlock(dep); erts_deref_dist_entry(dep); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index c1796a8894..4ef1d54906 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -102,6 +102,7 @@ dist_table_alloc(void *dep_tmpl) erts_smp_rwmtx_init_opt_x(&dep->rwmtx, &rwmtx_opt, "dist_entry", chnl_nr); dep->sysname = sysname; dep->cid = NIL; + erts_smp_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL); dep->connection_id = 0; dep->status = 0; dep->flags = 0; @@ -115,10 +116,14 @@ dist_table_alloc(void *dep_tmpl) erts_smp_mtx_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr); erts_smp_atomic32_init_nob(&dep->qflgs, 0); erts_smp_atomic_init_nob(&dep->qsize, 0); + erts_smp_atomic64_init_nob(&dep->in, 0); + erts_smp_atomic64_init_nob(&dep->out, 0); dep->out_queue.first = NULL; dep->out_queue.last = NULL; dep->suspended = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; @@ -383,7 +388,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep) erts_smp_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); - ASSERT(is_internal_port(dep->cid)); + ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); if(dep->flags & DFLAG_PUBLISHED) { if(dep->prev) { @@ -438,7 +443,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(dep != erts_this_dist_entry); ASSERT(is_nil(dep->cid)); - ASSERT(is_internal_port(cid)); + ASSERT(is_internal_port(cid) || is_internal_pid(cid)); if(dep->prev) { ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); @@ -458,10 +463,19 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) dep->status |= ERTS_DE_SFLG_CONNECTED; dep->flags = flags; dep->cid = cid; + erts_smp_atomic_set_nob(&dep->input_handler, + (erts_aint_t) cid); + dep->connection_id++; dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; + erts_smp_atomic64_set_nob(&dep->in, 0); + erts_smp_atomic64_set_nob(&dep->out, 0); + erts_smp_atomic32_set_nob(&dep->qflgs, + (is_internal_port(cid) + ? ERTS_DE_QFLG_PORT_CTRL + : ERTS_DE_QFLG_PROC_CTRL)); if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { @@ -1397,6 +1411,14 @@ setup_reference_table(void) insert_links(ERTS_P_LINKS(proc), proc->common.id); if (ERTS_P_MONITORS(proc)) insert_monitors(ERTS_P_MONITORS(proc), proc->common.id); + { + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc); + if (dep) + insert_dist_entry(dep, + CTRL_REF, + proc->common.id, + 0); + } } } diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 04acfe41b1..5cff5e14c2 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -61,11 +61,17 @@ #define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \ | ERTS_DE_SFLG_EXITING) -#define ERTS_DE_QFLG_BUSY (((Uint32) 1) << 0) -#define ERTS_DE_QFLG_EXIT (((Uint32) 1) << 1) +#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) +#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) +#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2) +#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3) +#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4) #define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \ - | ERTS_DE_QFLG_EXIT) + | ERTS_DE_QFLG_EXIT \ + | ERTS_DE_QFLG_REQ_INFO \ + | ERTS_DE_QFLG_PORT_CTRL \ + | ERTS_DE_QFLG_PROC_CTRL) #if defined(ARCH_64) #define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL) @@ -112,6 +118,7 @@ typedef struct dist_entry_ { erts_smp_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */ Eterm sysname; /* name@host atom for efficiency */ Uint32 creation; /* creation of connected node */ + erts_smp_atomic_t input_handler; /* Input handler */ Eterm cid; /* connection handler (pid or port), NIL == free */ Uint32 connection_id; /* Connection id incremented on connect */ Uint32 status; /* Slot status, like exiting reserved etc */ @@ -135,9 +142,12 @@ typedef struct dist_entry_ { erts_smp_mtx_t qlock; /* Protects qflgs and out_queue */ erts_smp_atomic32_t qflgs; erts_smp_atomic_t qsize; + erts_smp_atomic64_t in; + erts_smp_atomic64_t out; ErtsDistOutputQueue out_queue; struct ErtsProcList_ *suspended; + ErtsDistOutputQueue tmp_out_queue; ErtsDistOutputQueue finalized_out_queue; erts_smp_atomic_t dist_cmd_scheduled; ErtsPortTaskHandle dist_cmd; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index fc2b34e70f..2780c111af 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -796,6 +796,11 @@ erts_pre_init_process(void) = ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS; erts_psd_required_locks[ERTS_PSD_ETS_FIXED_TABLES].set_locks = ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS; + + erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].get_locks + = ERTS_PSD_DIST_ENTRY_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks + = ERTS_PSD_DIST_ENTRY_SET_LOCKS; #endif } @@ -13857,7 +13862,7 @@ erts_continue_exit_process(Process *p) ErtsMonitor *mon; ErtsProcLocks curr_locks = ERTS_PROC_LOCK_MAIN; Eterm reason = p->fvalue; - DistEntry *dep; + DistEntry *dep = NULL; erts_aint32_t state; int delay_del_proc = 0; @@ -14064,13 +14069,16 @@ erts_continue_exit_process(Process *p) if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ)) erts_proc_dec_refc(p); } - - dep = (p->flags & F_DISTRIBUTION) ? erts_this_dist_entry : NULL; + + dep = ((p->flags & F_DISTRIBUTION) + ? ERTS_PROC_SET_DIST_ENTRY(p, NULL) + : NULL); erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); if (dep) { - erts_do_net_exits(dep, reason); + erts_do_net_exits(dep, (reason == am_kill) ? am_killed : reason); + erts_deref_dist_entry(dep); } /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 9d7ba27c50..9e2ac15f13 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -835,14 +835,15 @@ erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_NIF_TRAP_EXPORT 5 #define ERTS_PSD_ETS_OWNED_TABLES 6 #define ERTS_PSD_ETS_FIXED_TABLES 7 -#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 8 +#define ERTS_PSD_DIST_ENTRY 8 +#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 9 /* keep last... */ -#define ERTS_PSD_SIZE 9 +#define ERTS_PSD_SIZE 10 #if !defined(HIPE) # undef ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF # undef ERTS_PSD_SIZE -# define ERTS_PSD_SIZE 8 +# define ERTS_PSD_SIZE 9 #endif typedef struct { @@ -876,6 +877,9 @@ typedef struct { #define ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -2103,6 +2107,11 @@ erts_psd_set(Process *p, int ix, void *data) #define ERTS_PROC_SET_NIF_TRAP_EXPORT(P, NTE) \ erts_psd_set((P), ERTS_PSD_NIF_TRAP_EXPORT, (void *) (NTE)) +#define ERTS_PROC_GET_DIST_ENTRY(P) \ + ((DistEntry *) erts_psd_get((P), ERTS_PSD_DIST_ENTRY)) +#define ERTS_PROC_SET_DIST_ENTRY(P, DE) \ + ((DistEntry *) erts_psd_set((P), ERTS_PSD_DIST_ENTRY, (void *) (DE))) + #ifdef HIPE #define ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(P) \ ((struct saved_calls *) erts_psd_get((P), ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF)) diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 1560844521..5bce4c3c92 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -629,7 +629,8 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, byte *ext, Uint size, DistEntry *dep, - ErtsAtomCache *cache) + ErtsAtomCache *cache, + Uint32 *connection_id) { #undef ERTS_EXT_FAIL #undef ERTS_EXT_HDR_FAIL @@ -650,33 +651,36 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, if (size < 2) ERTS_EXT_FAIL; + if (!dep) + ERTS_INTERNAL_ERROR("Invalid use"); + if (ep[0] != VERSION_MAGIC) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - if (dep) - erts_dsprintf(dsbufp, - "** Got message from incompatible erlang on " - "channel %d\n", - dist_entry_channel_no(dep)); - else - erts_dsprintf(dsbufp, - "** Attempt to convert old incompatible " - "binary %d\n", - *ep); + erts_dsprintf(dsbufp, + "** Got message from incompatible erlang on " + "channel %d\n", + dist_entry_channel_no(dep)); erts_send_error_to_logger_nogl(dsbufp); ERTS_EXT_FAIL; } edep->flags = 0; edep->dep = dep; - if (dep) { - erts_smp_de_rlock(dep); - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) - edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; - - edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); - erts_smp_de_runlock(dep); + + erts_smp_de_rlock(dep); + + if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) + != ERTS_DE_SFLG_CONNECTED) { + erts_smp_de_runlock(dep); + return ERTS_PREP_DIST_EXT_CLOSED; } + if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) + edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; + + *connection_id = dep->connection_id; + edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); + if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) ERTS_EXT_HDR_FAIL; @@ -835,14 +839,15 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, ERTS_EXT_FAIL; #endif - return 0; + erts_smp_de_runlock(dep); + + return ERTS_PREP_DIST_EXT_SUCCESS; #undef CHKSIZE #undef ERTS_EXT_FAIL #undef ERTS_EXT_HDR_FAIL - bad_hdr: - if (dep) { + bad_hdr: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T got a corrupted distribution header from %T " @@ -855,10 +860,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_dsprintf(dsbufp, ">>"); erts_send_warning_to_logger_nogl(dsbufp); } - fail: - if (dep) - erts_kill_dist_connection(dep, dep->connection_id); - return -1; + fail: { + erts_smp_de_runlock(dep); + erts_kill_dist_connection(dep, *connection_id); + } + return ERTS_PREP_DIST_EXT_FAILED; } static void diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index f00426cc16..3c61d013da 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -185,8 +185,13 @@ ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); void *erts_dist_ext_trailer(ErtsDistExternal *); void erts_destroy_dist_ext_copy(ErtsDistExternal *); + +#define ERTS_PREP_DIST_EXT_FAILED (-1) +#define ERTS_PREP_DIST_EXT_SUCCESS (0) +#define ERTS_PREP_DIST_EXT_CLOSED (1) + int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, - DistEntry *, ErtsAtomCache *); + DistEntry *, ErtsAtomCache *, Uint32 *); Sint erts_decode_dist_ext_size(ErtsDistExternal *); Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 75545df80a..8c59d65332 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -6727,6 +6727,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + erts_smp_atomic64_inc_nob(&prt->dist_entry->in); return erts_net_message(prt, prt->dist_entry, (byte*) hbuf, hlen, @@ -6767,6 +6768,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + erts_smp_atomic64_inc_nob(&prt->dist_entry->in); if (len == 0) return erts_net_message(prt, prt->dist_entry, diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 58c17dc416..93c85039e7 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 6691749dcb..6fbf53492b 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 72dd804412..4315b53358 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -48,6 +48,12 @@ await_sched_wall_time_modifications/2, gather_gc_info_result/1]). +-export([dist_ctrl_input_handler/2, + dist_ctrl_put_data/2, + dist_ctrl_get_data/1, + dist_ctrl_get_data_notification/1, + dist_get_stat/1]). + -deprecated([now/0]). %% Get rid of autoimports of spawn to avoid clashes with ourselves. @@ -87,6 +93,10 @@ -export_type([prepared_code/0]). +-opaque dist_handle() :: atom(). + +-export_type([dist_handle/0]). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Native code BIF stubs and their types %% (BIF's actually implemented in this module goes last in the file) @@ -1641,7 +1651,7 @@ setnode(_P1, _P2) -> erlang:nif_error(undefined). %% setnode/3 --spec erlang:setnode(P1, P2, P3) -> true when +-spec erlang:setnode(P1, P2, P3) -> dist_handle() when P1 :: atom(), P2 :: port(), P3 :: {term(), term(), term(), term()}. @@ -3203,6 +3213,47 @@ port_set_data(_Port, _Data) -> port_get_data(_Port) -> erlang:nif_error(undefined). +%% +%% Distribution channel management +%% + +-spec erlang:dist_ctrl_input_handler(DHandle, InputHandler) -> 'ok' when + DHandle :: dist_handle(), + InputHandler :: pid(). + +dist_ctrl_input_handler(_DHandle, _InputHandler) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_put_data(DHandle, Data) -> 'ok' when + DHandle :: dist_handle(), + Data :: iodata(). + +dist_ctrl_put_data(_DHandle, _Data) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_get_data(DHandle) -> Data | 'none' when + DHandle :: dist_handle(), + Data :: iodata(). + +dist_ctrl_get_data(_DHandle) -> + erlang:nif_error(undefined). + +-spec erlang:dist_ctrl_get_data_notification(DHandle) -> 'ok' when + DHandle :: dist_handle(). + +dist_ctrl_get_data_notification(_DHandle) -> + erlang:nif_error(undefined). + +-spec erlang:dist_get_stat(DHandle) -> Res when + DHandle :: dist_handle(), + InputPackets :: non_neg_integer(), + OutputPackets :: non_neg_integer(), + PendingOutputPackets :: boolean(), + Res :: {'ok', InputPackets, OutputPackets, PendingOutputPackets}. + +dist_get_stat(_DHandle) -> + erlang:nif_error(undefined). + %% %% If the emulator wants to perform a distributed command and %% a connection is not established to the actual node the following diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 26fb1458af..bb1824ecd4 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -61,6 +61,8 @@ -export([trace/3, trace_pattern/3]). +-export([dist_ctrl_put_data/2]). + %% Auto import name clash -export([check_process_code/1]). @@ -461,3 +463,38 @@ trace(_PidSpec, _How, _FlagList) -> FlagList :: [ ]. trace_pattern(_MFA, _MatchSpec, _FlagList) -> erlang:nif_error(undefined). + +-spec dist_ctrl_put_data(DHandle, Data) -> 'ok' when + DHandle :: erlang:dist_handle(), + Data :: iolist(). + +dist_ctrl_put_data(DHandle, IoList) -> + %% + %% Helper for erlang:dist_ctrl_put_data/2 + %% + %% erlang:dist_ctrl_put_data/2 traps to + %% this function if second argument is + %% a list... + %% + try + Binary = erlang:iolist_to_binary(IoList), + %% Restart erlang:dist_ctrl_put_data/2 + %% with the iolist converted to a binary... + erlang:dist_ctrl_put_data(DHandle, Binary) + catch + Class : Reason -> + %% Throw exception as if thrown from + %% erlang:dist_ctrl_put_data/2 ... + RootST = try erlang:error(Reason) + catch + error:Reason -> + case erlang:get_stacktrace() of + [] -> []; + ST -> tl(ST) + end + end, + StackTrace = [{erlang, dist_ctrl_put_data, + [DHandle, IoList], []} + | RootST], + erlang:raise(Class, Reason, StackTrace) + end. -- cgit v1.2.3