aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/dist.c120
-rw-r--r--erts/emulator/beam/dist.h3
-rw-r--r--lib/kernel/src/net_kernel.erl32
3 files changed, 101 insertions, 54 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 1b5b730764..4a25fbdd4c 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -120,7 +120,7 @@ static void clear_dist_entry(DistEntry*);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
-static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id);
+static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
static erts_atomic_t no_caches;
static erts_atomic_t no_nodes;
@@ -517,6 +517,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
ASSERT(is_nil(tdep->cid));
ASSERT(i < no_pending);
pending[i++] = tdep;
+ erts_ref_dist_entry(tdep);
}
ASSERT(i == no_pending);
}
@@ -543,7 +544,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (no_pending) {
for (i = 0; i < no_pending; i++) {
- abort_pending_connection(pending[i], pending[i]->connection_id);
+ abort_connection(pending[i], pending[i]->connection_id);
+ erts_deref_dist_entry(pending[i]);
}
erts_free(ERTS_ALC_T_TMP, pending);
}
@@ -624,7 +626,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
reason == am_normal ? am_connection_closed : reason);
clear_dist_entry(dep);
-
}
dec_no_nodes();
@@ -2904,24 +2905,31 @@ erts_dist_port_not_busy(Port *prt)
erts_schedule_dist_command(prt, NULL);
}
+static void kill_connection(DistEntry *dep)
+{
+ ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
+ ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED);
+
+ dep->status |= ERTS_DE_SFLG_EXITING;
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ erts_mtx_unlock(&dep->qlock);
+
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ else if (is_internal_pid(dep->cid))
+ schedule_kill_dist_ctrl_proc(dep->cid);
+}
+
void
erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
{
erts_de_rwlock(dep);
if (connection_id == dep->connection_id
- && !(dep->status & ERTS_DE_SFLG_EXITING)) {
-
- dep->status |= ERTS_DE_SFLG_EXITING;
-
- erts_mtx_lock(&dep->qlock);
- ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
- erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
- erts_mtx_unlock(&dep->qlock);
+ && dep->status == ERTS_DE_SFLG_CONNECTED) {
- if (is_internal_port(dep->cid))
- erts_schedule_dist_command(NULL, dep);
- else if (is_internal_pid(dep->cid))
- schedule_kill_dist_ctrl_proc(dep->cid);
+ kill_connection(dep);
}
erts_de_rwunlock(dep);
}
@@ -3260,6 +3268,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
}
/*
+ * ToDo: Should we not pass connection_id as well
+ * to make sure it's the right connection we commit.
+ */
+
+ /*
* Arguments seem to be in order.
*/
@@ -3302,6 +3315,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
goto badarg;
}
+ if (dep->status & ERTS_DE_SFLG_EXITING) {
+ /* Suspend on dist entry waiting for the exit to finish */
+ ErtsProcList *plp = erts_proclist_create(BIF_P);
+ plp->next = NULL;
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ erts_mtx_lock(&dep->qlock);
+ erts_proclist_store_last(&dep->suspended, plp);
+ erts_mtx_unlock(&dep->qlock);
+ goto yield;
+ }
+ if (dep->status != ERTS_DE_SFLG_PENDING) {
+ if (dep->status == 0)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
+
if (is_not_nil(dep->cid))
goto badarg;
@@ -3344,8 +3374,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_mtx_unlock(&dep->qlock);
goto yield;
}
-
- ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
+ if (dep->status != ERTS_DE_SFLG_PENDING) {
+ if (dep->status == 0)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
@@ -3457,7 +3491,11 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1)
BIF_ERROR(BIF_P, BADARG);
}
dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
- ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
+
+ if (dep == erts_this_dist_entry) {
+ erts_deref_dist_entry(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
erts_de_rwlock(dep);
@@ -3468,8 +3506,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1)
conn_id = dep->connection_id;
}
else {
- ASSERT(!"SVERK: What to do?");
- conn_id = dep->connection_id;
+ ASSERT(dep->status & ERTS_DE_SFLG_EXITING);
+ conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK;
}
erts_de_rwunlock(dep);
hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
@@ -3478,23 +3516,24 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1)
BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle));
}
-static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id)
+static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
{
- Sint reds = 0;
-
erts_de_rwlock(dep);
- if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) {
- erts_de_rwunlock(dep);
+ if (dep->connection_id != conn_id)
+ ;
+ else if (dep->status == ERTS_DE_SFLG_CONNECTED) {
+ kill_connection(dep);
}
- else {
- NetExitsContext nec = {dep};
- ErtsLink *nlinks;
- ErtsLink *node_links;
- ErtsMonitor *monitors;
- ErtsAtomCache *cache;
- ErtsDistOutputBuf *obuf;
+ else if (dep->status == ERTS_DE_SFLG_PENDING) {
+ NetExitsContext nec = {dep};
+ ErtsLink *nlinks;
+ ErtsLink *node_links;
+ ErtsMonitor *monitors;
+ ErtsAtomCache *cache;
+ ErtsDistOutputBuf *obuf;
ErtsProcList *resume_procs;
+ Sint reds = 0;
ASSERT(is_nil(dep->cid));
@@ -3534,10 +3573,11 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id)
}
delete_cache(cache);
-
free_de_out_queues(dep, obuf);
+ return reds;
}
- return reds;
+ erts_de_rwunlock(dep);
+ return 0;
}
BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2)
@@ -3550,13 +3590,13 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2)
}
tp = tuple_val(BIF_ARG_2);
dep = erts_dhandle_to_dist_entry(tp[2]);
- if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) {
+ if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)
+ || dep == erts_this_dist_entry) {
BIF_ERROR(BIF_P, BADARG);
}
- ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
if (dep) {
- Sint reds = abort_pending_connection(dep, unsigned_val(tp[1]));
+ Sint reds = abort_connection(dep, unsigned_val(tp[1]));
BUMP_REDS(BIF_P, reds);
}
BIF_RET(am_true);
@@ -3584,11 +3624,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
net_kernel = erts_whereis_process(proc, proc_locks,
am_net_kernel, nk_locks, 0);
if (!net_kernel) {
- abort_pending_connection(dep, conn_id);
+ abort_connection(dep, conn_id);
return 0;
}
- /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */
+ /*
+ * Send {auto_connect, Node, ConnId, DHandle} to net_kernel
+ */
mp = erts_alloc_message_heap(net_kernel, &nk_locks,
5 + ERTS_MAGIC_REF_THING_SIZE,
&hp, &ohp);
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 2685a55b97..a205ce0cb5 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -172,7 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp,
return ERTS_DSIG_PREP_NOT_CONNECTED;
dep = erts_find_or_insert_dist_entry(dsdp->node);
- ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
+ ASSERT(dep != erts_this_dist_entry);
deref_dep = 1;
}
@@ -192,7 +192,6 @@ retry:
res = ERTS_DSIG_PREP_PENDING;
}
else if (dep->status & ERTS_DE_SFLG_EXITING) {
- /* SVERK is this ok, or should we trigger another connection setup */
res = ERTS_DSIG_PREP_NOT_CONNECTED;
goto fail;
}
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index eee915b15b..fb17e7c1b6 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -796,19 +796,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->
AcceptPid ! {self(), {accept_pending, already_pending}},
{noreply, State};
_ ->
- ConnId = case (catch erlang:new_connection_id(Node)) of
- {Nr,_DHandle}=CI when is_integer(Nr) -> CI
- %% SVERK What to do?
- end,
- ets:insert(sys_dist, #connection{node = Node,
- conn_id = ConnId,
- state = pending,
- owner = AcceptPid,
- address = Address,
- type = Type}),
- AcceptPid ! {self(),{accept_pending,ok}},
- Owners = [{AcceptPid,Node} | State#state.conn_owners],
- {noreply, State#state{conn_owners = Owners}}
+ case (catch erlang:new_connection_id(Node)) of
+ {Nr,_DHandle}=ConnId when is_integer(Nr) ->
+ ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
+ state = pending,
+ owner = AcceptPid,
+ address = Address,
+ type = Type}),
+ AcceptPid ! {self(),{accept_pending,ok}},
+ Owners = [{AcceptPid,Node} | State#state.conn_owners],
+ {noreply, State#state{conn_owners = Owners}};
+
+ _ ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ AcceptPid ! {self(),{accept_pending,nok_pending}}
+ end
end;
handle_info({SetupPid, {is_pending, Node}}, State) ->
@@ -1009,7 +1013,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
AcceptPid = Conn#connection.pending_owner,
Owners = State#state.conn_owners,
Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners),
+ erlang:abort_connection_id(Node, Conn#connection.conn_id),
Conn1 = Conn#connection { owner = AcceptPid,
+ conn_id = erlang:new_connection_id(Node),
pending_owner = undefined,
state = pending },
ets:insert(sys_dist, Conn1),