aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c102
1 files changed, 72 insertions, 30 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 30390cdb5e..cd799e04b8 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -588,13 +588,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
erts_port_task_abort(&dep->dist_cmd);
}
- if (dep->status & ERTS_DE_SFLG_EXITING) {
+ if (dep->state == ERTS_DE_STATE_EXITING) {
#ifdef DEBUG
ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
- dep->status |= ERTS_DE_SFLG_EXITING;
+ dep->state = ERTS_DE_STATE_EXITING;
erts_mtx_lock(&dep->qlock);
ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
@@ -641,6 +641,14 @@ trap_function(Eterm func, int arity)
return erts_export_put(am_erlang, func, arity);
}
+/*
+ * Sync with dist_util.erl:
+ *
+ * -record(erts_dflags,
+ * {default, mandatory, addable, rejectable, strict_order}).
+ */
+static Eterm erts_dflags_record;
+
void init_dist(void)
{
init_nodes_monitors();
@@ -657,6 +665,16 @@ void init_dist(void)
dist_ctrl_put_data_trap = erts_export_put(am_erts_internal,
am_dist_ctrl_put_data,
2);
+ {
+ Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm));
+ erts_dflags_record = TUPLE6(hp, am_erts_dflags,
+ make_small(DFLAG_DIST_DEFAULT),
+ make_small(DFLAG_DIST_MANDATORY),
+ make_small(DFLAG_DIST_ADDABLE),
+ make_small(DFLAG_DIST_REJECTABLE),
+ make_small(DFLAG_DIST_STRICT_ORDER));
+ erts_set_literal_tag(&erts_dflags_record, hp, (1+6));
+ }
}
#define ErtsDistOutputBuf2Binary(OB) \
@@ -767,7 +785,7 @@ static void clear_dist_entry(DistEntry *dep)
erts_atomic64_set_nob(&dep->out, 0);
obuf = clear_de_out_queues(dep);
- dep->status = 0;
+ dep->state = ERTS_DE_STATE_IDLE;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
erts_mtx_unlock(&dep->qlock);
@@ -1982,6 +2000,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
break;
}
ctx->u.ec.flags = ctx->flags;
+ ctx->u.ec.hopefull_flags = 0;
ctx->u.ec.level = 0;
ctx->u.ec.wstack.wstart = NULL;
ctx->obuf->msg_start = ctx->obuf->ext_endp;
@@ -2005,6 +2024,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
+ ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags;
/*
* Signal encoded; now verify that the connection still exists,
* and if so enqueue the signal and schedule it for send.
@@ -2012,8 +2032,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->obuf->next = NULL;
erts_de_rlock(dep);
cid = dep->cid;
- if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))
- || dep->status & ERTS_DE_SFLG_EXITING
+ if (dep->state == ERTS_DE_STATE_EXITING
+ || dep->state == ERTS_DE_STATE_IDLE
|| dep->connection_id != dsdp->connection_id) {
/* Not the same connection as when we started; drop message... */
erts_de_runlock(dep);
@@ -2088,7 +2108,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
erts_mtx_unlock(&dep->qlock);
- if (!(dep->status & ERTS_DE_SFLG_PENDING)) {
+ if (dep->state != ERTS_DE_STATE_PENDING) {
if (is_internal_port(dep->cid))
erts_schedule_dist_command(NULL, dep);
}
@@ -2270,7 +2290,7 @@ int
erts_dist_command(Port *prt, int initial_reds)
{
Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START;
- Uint32 status;
+ enum dist_entry_state state;
Uint32 flags;
Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
@@ -2285,17 +2305,17 @@ erts_dist_command(Port *prt, int initial_reds)
erts_de_rlock(dep);
flags = dep->flags;
- status = dep->status;
+ state = dep->state;
send = dep->send;
erts_de_runlock(dep);
- if (status & ERTS_DE_SFLG_EXITING) {
+ if (state == ERTS_DE_STATE_EXITING) {
erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
reds -= ERTS_PORT_REDS_DIST_CMD_EXIT;
return initial_reds - reds;
}
- ASSERT(!(status & ERTS_DE_SFLG_PENDING));
+ ASSERT(state != ERTS_DE_STATE_PENDING);
ASSERT(send);
@@ -2831,7 +2851,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
erts_de_rlock(dep);
- if (dep->status & ERTS_DE_SFLG_EXITING)
+ if (dep->state == ERTS_DE_STATE_EXITING)
goto return_none;
ASSERT(dep->cid == BIF_P->common.id);
@@ -2934,9 +2954,9 @@ erts_dist_port_not_busy(Port *prt)
static void kill_connection(DistEntry *dep)
{
ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
- ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED);
+ ASSERT(dep->state == ERTS_DE_STATE_CONNECTED);
- dep->status |= ERTS_DE_SFLG_EXITING;
+ dep->state = ERTS_DE_STATE_EXITING;
erts_mtx_lock(&dep->qlock);
ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
@@ -2953,7 +2973,7 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
{
erts_de_rwlock(dep);
if (connection_id == dep->connection_id
- && dep->status == ERTS_DE_SFLG_CONNECTED) {
+ && dep->state == ERTS_DE_STATE_CONNECTED) {
kill_connection(dep);
}
@@ -3341,7 +3361,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
goto badarg;
}
- if (dep->status & ERTS_DE_SFLG_EXITING) {
+ if (dep->state == ERTS_DE_STATE_EXITING) {
/* Suspend on dist entry waiting for the exit to finish */
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
@@ -3351,8 +3371,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_mtx_unlock(&dep->qlock);
goto yield;
}
- if (dep->status != ERTS_DE_SFLG_PENDING) {
- if (dep->status == 0)
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
erts_set_dist_entry_pending(dep);
else
goto badarg;
@@ -3390,7 +3410,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
goto done; /* Already set */
}
- if (dep->status & ERTS_DE_SFLG_EXITING) {
+ if (dep->state == ERTS_DE_STATE_EXITING) {
/* Suspend on dist entry waiting for the exit to finish */
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
@@ -3400,8 +3420,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_mtx_unlock(&dep->qlock);
goto yield;
}
- if (dep->status != ERTS_DE_SFLG_PENDING) {
- if (dep->status == 0)
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
erts_set_dist_entry_pending(dep);
else
goto badarg;
@@ -3437,7 +3457,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
#ifdef DEBUG
ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
- || (dep->status & ERTS_DE_SFLG_PENDING));
+ || (dep->state == ERTS_DE_STATE_PENDING));
#endif
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
@@ -3506,6 +3526,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
goto done;
}
+BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0)
+{
+ return erts_dflags_record;
+}
+
BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
{
DistEntry* dep;
@@ -3525,15 +3550,20 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
erts_de_rwlock(dep);
- if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING)
+ switch (dep->state) {
+ case ERTS_DE_STATE_PENDING:
+ case ERTS_DE_STATE_CONNECTED:
conn_id = dep->connection_id;
- else if (dep->status == 0) {
+ break;
+ case ERTS_DE_STATE_IDLE:
erts_set_dist_entry_pending(dep);
conn_id = dep->connection_id;
- }
- else {
- ASSERT(dep->status & ERTS_DE_SFLG_EXITING);
+ break;
+ case ERTS_DE_STATE_EXITING:
conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK;
+ break;
+ default:
+ erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state);
}
erts_de_rwunlock(dep);
hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
@@ -3548,10 +3578,10 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
if (dep->connection_id != conn_id)
;
- else if (dep->status == ERTS_DE_SFLG_CONNECTED) {
+ else if (dep->state == ERTS_DE_STATE_CONNECTED) {
kill_connection(dep);
}
- else if (dep->status == ERTS_DE_SFLG_PENDING) {
+ else if (dep->state == ERTS_DE_STATE_PENDING) {
NetExitsContext nec = {dep};
ErtsLink *nlinks;
ErtsLink *node_links;
@@ -3600,6 +3630,17 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
delete_cache(cache);
free_de_out_queues(dep, obuf);
+
+ /*
+ * We wait to make DistEntry idle and accept new connection attempts
+ * until all is cleared and deallocated. This to get some back pressure
+ * against repeated failing connection attempts saturating all CPUs
+ * with cleanup jobs.
+ */
+ erts_de_rwlock(dep);
+ ASSERT(dep->state == ERTS_DE_STATE_EXITING);
+ dep->state = ERTS_DE_STATE_IDLE;
+ erts_de_rwunlock(dep);
return reds;
}
erts_de_rwunlock(dep);
@@ -3631,7 +3672,7 @@ BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
{
erts_de_rwlock(dep);
- if (dep->status != 0) {
+ if (dep->state != ERTS_DE_STATE_IDLE) {
erts_de_rwunlock(dep);
}
else {
@@ -3907,7 +3948,8 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
erts_proc_lock(p, ERTS_PROC_LOCK_LINK);
erts_de_rlock(dep);
- if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) {
+ if (dep->state == ERTS_DE_STATE_IDLE) {
+ ASSERT(!dep->node_links);
erts_proc_unlock(p, ERTS_PROC_LOCK_LINK);
erts_de_runlock(dep);
goto done;