diff options
-rw-r--r-- | erts/emulator/beam/dist.c | 120 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 3 | ||||
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 32 |
3 files changed, 101 insertions, 54 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 1b5b730764..4a25fbdd4c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,7 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -517,6 +517,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ASSERT(is_nil(tdep->cid)); ASSERT(i < no_pending); pending[i++] = tdep; + erts_ref_dist_entry(tdep); } ASSERT(i == no_pending); } @@ -543,7 +544,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (no_pending) { for (i = 0; i < no_pending; i++) { - abort_pending_connection(pending[i], pending[i]->connection_id); + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); } erts_free(ERTS_ALC_T_TMP, pending); } @@ -624,7 +626,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -2904,24 +2905,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + + dep->status |= ERTS_DE_SFLG_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->status == ERTS_DE_SFLG_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3260,6 +3268,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) } /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + + /* * Arguments seem to be in order. */ @@ -3302,6 +3315,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3344,8 +3374,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3457,7 +3491,11 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } dep = erts_find_or_insert_dist_entry(BIF_ARG_1); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_de_rwlock(dep); @@ -3468,8 +3506,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } else { - ASSERT(!"SVERK: What to do?"); - conn_id = dep->connection_id; + ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; } erts_de_rwunlock(dep); hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); @@ -3478,23 +3516,24 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Sint reds = 0; - erts_de_rwlock(dep); - if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { - erts_de_rwunlock(dep); + if (dep->connection_id != conn_id) + ; + else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + kill_connection(dep); } - else { - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; - ErtsAtomCache *cache; - ErtsDistOutputBuf *obuf; + else if (dep->status == ERTS_DE_SFLG_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3534,10 +3573,11 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) } delete_cache(cache); - free_de_out_queues(dep, obuf); + return reds; } - return reds; + erts_de_rwunlock(dep); + return 0; } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) @@ -3550,13 +3590,13 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) } tp = tuple_val(BIF_ARG_2); dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (dep) { - Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + Sint reds = abort_connection(dep, unsigned_val(tp[1])); BUMP_REDS(BIF_P, reds); } BIF_RET(am_true); @@ -3584,11 +3624,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - abort_pending_connection(dep, conn_id); + abort_connection(dep, conn_id); return 0; } - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, 5 + ERTS_MAGIC_REF_THING_SIZE, &hp, &ohp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 2685a55b97..a205ce0cb5 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -172,7 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, return ERTS_DSIG_PREP_NOT_CONNECTED; dep = erts_find_or_insert_dist_entry(dsdp->node); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + ASSERT(dep != erts_this_dist_entry); deref_dep = 1; } @@ -192,7 +192,6 @@ retry: res = ERTS_DSIG_PREP_PENDING; } else if (dep->status & ERTS_DE_SFLG_EXITING) { - /* SVERK is this ok, or should we trigger another connection setup */ res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index eee915b15b..fb17e7c1b6 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -796,19 +796,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> - ConnId = case (catch erlang:new_connection_id(Node)) of - {Nr,_DHandle}=CI when is_integer(Nr) -> CI - %% SVERK What to do? - end, - ets:insert(sys_dist, #connection{node = Node, - conn_id = ConnId, - state = pending, - owner = AcceptPid, - address = Address, - type = Type}), - AcceptPid ! {self(),{accept_pending,ok}}, - Owners = [{AcceptPid,Node} | State#state.conn_owners], - {noreply, State#state{conn_owners = Owners}} + case (catch erlang:new_connection_id(Node)) of + {Nr,_DHandle}=ConnId when is_integer(Nr) -> + ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, + state = pending, + owner = AcceptPid, + address = Address, + type = Type}), + AcceptPid ! {self(),{accept_pending,ok}}, + Owners = [{AcceptPid,Node} | State#state.conn_owners], + {noreply, State#state{conn_owners = Owners}}; + + _ -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + AcceptPid ! {self(),{accept_pending,nok_pending}} + end end; handle_info({SetupPid, {is_pending, Node}}, State) -> @@ -1009,7 +1013,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> AcceptPid = Conn#connection.pending_owner, Owners = State#state.conn_owners, Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners), + erlang:abort_connection_id(Node, Conn#connection.conn_id), Conn1 = Conn#connection { owner = AcceptPid, + conn_id = erlang:new_connection_id(Node), pending_owner = undefined, state = pending }, ets:insert(sys_dist, Conn1), |