aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c406
1 files changed, 252 insertions, 154 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 026f0a62d4..70474898b2 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -3138,60 +3138,60 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
}
-/**********************************************************************
- ** Allocate a dist entry, set node name install the connection handler
- ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC})
- ** Type = flag field, where the flags are specified in dist.h
- ** Version = distribution version, >= 1
- ** IC = in_cookie (ignored)
- ** OC = out_cookie (ignored)
- **
- ** Note that in distribution protocols above 1, the Initial parameter
- ** is always NIL and the cookies are always the atom '', cookies are not
- ** sent in the distribution messages but are only used in
- ** the handshake.
- **
- ***********************************************************************/
+/*
+ * erts_internal:create_dist_channel/4 is used by
+ * erlang:setnode/3.
+ */
+
+typedef struct {
+ DistEntry *dep;
+ Uint flags;
+ Uint version;
+} ErtsSetupConnDistCtrl;
+
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version);
-BIF_RETTYPE setnode_3(BIF_ALIST_3)
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg,
+ int *redsp, ErlHeapFragment **bpp);
+
+BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
{
BIF_RETTYPE ret;
Uint flags;
- unsigned long version;
- Eterm ic, oc;
- Eterm *tp;
+ Uint version;
+ Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE;
DistEntry *dep = NULL;
- ErtsProcLocks proc_unlock = 0;
- Process *proc;
+ int de_locked = 0;
Port *pp = NULL;
- Eterm notify_proc;
- erts_aint32_t qflgs;
/*
* Check and pick out arguments
*/
- if (!is_node_name_atom(BIF_ARG_1) ||
- !(is_internal_port(BIF_ARG_2)
- || is_internal_pid(BIF_ARG_2))
- || (erts_this_node->sysname == am_Noname)) {
- goto badarg;
- }
+ /* Node name... */
+ if (!is_node_name_atom(BIF_ARG_1))
+ goto badarg;
- if (!is_tuple(BIF_ARG_3))
- goto badarg;
- tp = tuple_val(BIF_ARG_3);
- if (*tp++ != make_arityval(4))
- goto badarg;
- if (!is_small(*tp))
- goto badarg;
- flags = unsigned_val(*tp++);
- if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
- goto badarg;
- ic = *(++tp);
- oc = *(++tp);
- if (!is_atom(ic) || !is_atom(oc))
- goto badarg;
+ /* Distribution controller... */
+ if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2))
+ goto badarg;
+
+ /* Dist flags... */
+ if (!is_small(BIF_ARG_3))
+ goto badarg;
+ flags = unsigned_val(BIF_ARG_3);
+
+ /* Version... */
+ if (!is_small(BIF_ARG_4))
+ goto badarg;
+ version = unsigned_val(BIF_ARG_4);
+
+ if (version == 0)
+ goto badarg;
if (~flags & DFLAG_DIST_MANDATORY) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
@@ -3222,74 +3222,79 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
+ erts_de_rlock(dep);
+ de_locked = -1;
+
+ if (dep->state == ERTS_DE_STATE_EXITING) {
+ /* Suspend on dist entry waiting for the exit to finish */
+ ErtsProcList *plp = erts_proclist_create(BIF_P);
+ plp->next = NULL;
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ erts_mtx_lock(&dep->qlock);
+ erts_proclist_store_last(&dep->suspended, plp);
+ erts_mtx_unlock(&dep->qlock);
+ goto yield;
+ }
+
+ erts_de_runlock(dep);
+ de_locked = 0;
+
if (is_internal_pid(BIF_ARG_2)) {
if (BIF_P->common.id == BIF_ARG_2) {
- proc_unlock = 0;
- proc = BIF_P;
- }
- else {
- proc_unlock = ERTS_PROC_LOCK_MAIN;
- proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
- BIF_ARG_2, proc_unlock);
- }
- erts_de_rwlock(dep);
-
- if (!proc)
- goto badarg;
- else if (proc == ERTS_PROC_LOCK_BUSY) {
- proc_unlock = 0;
- goto yield;
- }
+ ErtsSetupConnDistCtrl scdc;
- erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
- proc_unlock |= ERTS_PROC_LOCK_STATUS;
+ scdc.dep = dep;
+ scdc.flags = flags;
+ scdc.version = version;
- if (ERTS_PROC_GET_DIST_ENTRY(proc)) {
- if (dep == ERTS_PROC_GET_DIST_ENTRY(proc)
- && (proc->flags & F_DISTRIBUTION)
- && dep->cid == BIF_ARG_2) {
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
- goto done;
- }
- goto badarg;
- }
+ res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
+ BUMP_REDS(BIF_P, 5);
+ dep = NULL;
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
+ if (res == am_badarg)
goto badarg;
+
+ ASSERT(is_internal_magic_ref(res));
+ res_tag = am_ok; /* Connection up */
}
+ else {
+ ErtsSetupConnDistCtrl *scdcp;
- if (is_not_nil(dep->cid))
- goto badarg;
+ scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
+ sizeof(ErtsSetupConnDistCtrl));
- proc->flags |= F_DISTRIBUTION;
- ERTS_PROC_SET_DIST_ENTRY(proc, dep);
+ scdcp->dep = dep;
+ scdcp->flags = flags;
+ scdcp->version = version;
- proc_unlock &= ~ERTS_PROC_LOCK_STATUS;
- erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS);
+ res = erts_proc_sig_send_rpc_request(BIF_P,
+ BIF_ARG_2,
+ !0,
+ setup_connection_distctrl,
+ (void *) scdcp);
+ if (is_non_value(res))
+ goto badarg;
- dep->send = NULL; /* Only for distr ports... */
+ dep = NULL;
+ ASSERT(is_internal_ordinary_ref(res));
+
+ res_tag = am_message; /* Caller need to wait for dhandle in message */
+ }
+ hp = HAlloc(BIF_P, 3);
}
else {
+ int new;
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
ERTS_PROC_LOCK_MAIN,
ERTS_PORT_SFLGS_INVALID_LOOKUP);
erts_de_rwlock(dep);
+ de_locked = 1;
+
+ if (dep->state == ERTS_DE_STATE_EXITING)
+ goto badarg;
if (!pp || (erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_EXITING))
@@ -3298,65 +3303,108 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
goto badarg;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) {
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
- goto done; /* Already set */
- }
+ if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
+ new = 0;
+ else {
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
+ if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
- }
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ pp->dist_entry = dep;
- pp->dist_entry = dep;
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
- ASSERT(dep->send);
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ }
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ de_locked = 0;
+ new = !0;
}
+ hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
+ res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ res_tag = am_ok; /* Connection up */
+ if (new)
+ dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ }
+
+ ASSERT(is_value(res) && is_value(res_tag));
+
+ res = TUPLE2(hp, res_tag, res);
+
+ ERTS_BIF_PREP_RET(ret, res);
+
+ done:
+
+ if (dep && dep != erts_this_dist_entry) {
+ if (de_locked) {
+ if (de_locked > 0)
+ erts_de_rwunlock(dep);
+ else
+ erts_de_runlock(dep);
+ }
+ erts_deref_dist_entry(dep);
}
+ if (pp)
+ erts_port_release(pp);
+
+ return ret;
+
+ yield:
+ ERTS_BIF_PREP_YIELD4(ret,
+ bif_export[BIF_erts_internal_create_dist_channel_4],
+ BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
+ goto done;
+
+ badarg:
+ ERTS_BIF_PREP_RET(ret, am_badarg);
+ goto done;
+
+ system_limit:
+ ERTS_BIF_PREP_RET(ret, am_system_limit);
+ goto done;
+}
+
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version)
+{
+ Eterm notify_proc = NIL;
+ erts_aint32_t qflgs;
+
dep->version = version;
dep->creation = 0;
-#ifdef DEBUG
+ ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
|| (dep->state == ERTS_DE_STATE_PENDING));
-#endif
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
- erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+ erts_set_dist_entry_connected(dep, ctrlr, flags);
notify_proc = NIL;
if (erts_atomic_read_nob(&dep->qsize)) {
@@ -3375,50 +3423,100 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
}
}
}
- erts_de_rwunlock(dep);
- if (is_internal_pid(notify_proc))
- notify_dist_data(BIF_P, notify_proc);
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+ erts_de_rwunlock(dep);
- dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(c_p, notify_proc);
inc_no_nodes();
- send_nodes_mon_msgs(BIF_P,
+ send_nodes_mon_msgs(c_p,
am_nodeup,
- BIF_ARG_1,
+ dep->sysname,
flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
NIL);
- done:
+}
- if (dep && dep != erts_this_dist_entry) {
- erts_de_rwunlock(dep);
- erts_deref_dist_entry(dep);
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
+{
+ ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
+ DistEntry *dep = scdcp->dep;
+ int dep_locked = 0;
+ Eterm *hp;
+ erts_aint32_t state;
+
+ if (redsp)
+ *redsp = 1;
+
+ state = erts_atomic32_read_nob(&c_p->state);
+
+ if (state & ERTS_PSFLG_EXITING)
+ goto badarg;
+
+ erts_de_rwlock(dep);
+ dep_locked = !0;
+
+ if (dep->state == ERTS_DE_STATE_EXITING)
+ goto badarg;
+
+ if (ERTS_PROC_GET_DIST_ENTRY(c_p)) {
+ if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p)
+ && (c_p->flags & F_DISTRIBUTION)
+ && dep->cid == c_p->common.id) {
+ goto connected;
+ }
+ goto badarg;
}
- if (pp)
- erts_port_release(pp);
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
- if (proc_unlock)
- erts_proc_unlock(proc, proc_unlock);
+ if (is_not_nil(dep->cid))
+ goto badarg;
- return ret;
+ c_p->flags |= F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(c_p, dep);
- yield:
- ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
- BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
- goto done;
+ dep->send = NULL; /* Only for distr ports... */
- badarg:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
- goto done;
+ if (redsp)
+ *redsp = 5;
- system_limit:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
- goto done;
+ setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
+ scdcp->flags, scdcp->version);
+connected:
+
+ /* we take over previous inc in refc of dep */
+
+ if (!bpp) /* called directly... */
+ return erts_make_dhandle(c_p, dep);
+
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
+
+ *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE);
+ hp = (*bpp)->mem;
+ return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep);
+
+badarg:
+
+ if (bpp) /* not called directly */
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
+
+ if (dep_locked)
+ erts_de_rwunlock(dep);
+
+ erts_deref_dist_entry(dep);
+
+ return am_badarg;
}
+
BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0)
{
return erts_dflags_record;