aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2019-01-29 10:19:47 +0100
committerLukas Larsson <[email protected]>2019-02-22 11:12:54 +0100
commit1066040c35e96566e415c50042345fba865f10c8 (patch)
tree8d36d991907bf6a1f4b34667b45889f8edc57c48 /erts/emulator/beam/dist.c
parent6493f5e396c7528ef6696ce440619550edb5c6ff (diff)
downloadotp-1066040c35e96566e415c50042345fba865f10c8.tar.gz
otp-1066040c35e96566e415c50042345fba865f10c8.tar.bz2
otp-1066040c35e96566e415c50042345fba865f10c8.zip
erts: Refactor ErtsSendContext to be ErtsDSigSendContext
This commit removed the general send context (which was used very little anyways) and only uses the distributed send context. This will make it easier to use the dist API at the cost of a little bit more code for the local send.
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c416
1 files changed, 225 insertions, 191 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 749760a1b3..7856d8003c 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -177,9 +177,9 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
-static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy);
-static int dsig_send_exit_ctx(ErtsSendContext *ctx, Eterm ctl, Eterm msg);
-static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
+static void erts_schedule_dist_command(Port *, DistEntry *);
+static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg);
+static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
@@ -860,20 +860,20 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
int erts_dsend_context_dtor(Binary* ctx_bin)
{
- ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
- switch (ctx->dss.phase) {
+ ErtsDSigSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
+ switch (ctx->phase) {
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack);
+ DESTROY_SAVED_WSTACK(&ctx->u.sc.wstack);
break;
case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
- DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack);
+ DESTROY_SAVED_WSTACK(&ctx->u.ec.wstack);
break;
default:;
}
- if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
+ if (ctx->phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->obuf) {
int i;
- for (i = 0; i < ctx->dss.fragments; i++)
- free_dist_obuf(&ctx->dss.obuf[i]);
+ for (i = 0; i < ctx->fragments; i++)
+ free_dist_obuf(&ctx->obuf[i]);
}
if (ctx->deref_dep)
erts_deref_dist_entry(ctx->dep);
@@ -881,10 +881,10 @@ int erts_dsend_context_dtor(Binary* ctx_bin)
return 1;
}
-Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
+Eterm erts_dsend_export_trap_context(Process* p, ErtsDSigSendContext* ctx)
{
struct exported_ctx {
- ErtsSendContext ctx;
+ ErtsDSigSendContext ctx;
ErtsAtomCacheMap acm;
};
Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx),
@@ -892,12 +892,12 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin);
Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
- sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext));
- ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
- dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap);
- if (ctx->dss.acmp) {
- sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
- dst->ctx.dss.acmp = &dst->acm;
+ sys_memcpy(&dst->ctx, ctx, sizeof(ErtsDSigSendContext));
+ ASSERT(ctx->ctl == make_tuple(ctx->ctl_heap));
+ dst->ctx.ctl = make_tuple(dst->ctx.ctl_heap);
+ if (ctx->acmp) {
+ sys_memcpy(&dst->acm, ctx->acmp, sizeof(ErtsAtomCacheMap));
+ dst->ctx.acmp = &dst->acm;
}
return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin);
}
@@ -918,78 +918,58 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
** Send a DOP_LINK link message
*/
int
-erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
+erts_dsig_send_link(ErtsDSigSendContext *ctx, Eterm local, Eterm remote)
{
- DeclareTmpHeapNoproc(ctl_heap,4);
- Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote);
- int res;
- UseTmpHeapNoproc(4);
-
- res = dsig_send_ctl(dsdp, ctl, 0);
- UnUseTmpHeapNoproc(4);
- return res;
+ Eterm ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_LINK), local, remote);
+ return dsig_send_ctl(ctx, ctl);
}
int
-erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
+erts_dsig_send_unlink(ErtsDSigSendContext *ctx, Eterm local, Eterm remote)
{
- DeclareTmpHeapNoproc(ctl_heap,4);
- Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote);
- int res;
-
- UseTmpHeapNoproc(4);
- res = dsig_send_ctl(dsdp, ctl, 0);
- UnUseTmpHeapNoproc(4);
- return res;
+ Eterm ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_UNLINK), local, remote);
+ return dsig_send_ctl(ctx, ctl);
}
/* A local process that's being monitored by a remote one exits. We send:
{DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */
int
-erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm from, Eterm watcher, Eterm watched,
+erts_dsig_send_m_exit(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
Eterm ctl, msg;
- DeclareTmpHeapNoproc(ctl_heap,6);
- int res;
- if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor)
*/
return ERTS_DSIG_SEND_OK;
}
- UseTmpHeapNoproc(6);
-
- if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT),
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT),
watched, watcher, ref);
msg = reason;
} else {
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
+ ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
watched, watcher, ref, reason);
msg = THE_NON_VALUE;
}
- res = dsig_send_exit(dsdp, ctl, msg, from, 1);
- UnUseTmpHeapNoproc(6);
- return res;
+ return dsig_send_exit(ctx, ctl, msg);
}
/* We want to monitor a process (named or unnamed) on another node, we send:
{DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's
needed on the other side... */
int
-erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
+erts_dsig_send_monitor(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched,
Eterm ref)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,5);
- int res;
- if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P.
* Just avoid sending it and by doing that reduce this monitor
@@ -999,48 +979,40 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
return ERTS_DSIG_SEND_OK;
}
- UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
+ ctl = TUPLE4(&ctx->ctl_heap[0],
make_small(DOP_MONITOR_P),
watcher, watched, ref);
- res = dsig_send_ctl(dsdp, ctl, 0);
- UnUseTmpHeapNoproc(5);
- return res;
+ return dsig_send_ctl(ctx, ctl);
}
/* A local process monitoring a remote one wants to stop monitoring, either
because of a demonitor bif call or because the local process died. We send
{DOP_DEMONITOR_P, Local pid, Remote pid or name, ref} */
int
-erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
- Eterm watched, Eterm ref, int force)
+erts_dsig_send_demonitor(ErtsDSigSendContext *ctx, Eterm watcher,
+ Eterm watched, Eterm ref)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,5);
- int res;
- if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor)
*/
return ERTS_DSIG_SEND_OK;
}
- UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
+ ctl = TUPLE4(&ctx->ctl_heap[0],
make_small(DOP_DEMONITOR_P),
watcher, watched, ref);
- res = dsig_send_ctl(dsdp, ctl, force);
- UnUseTmpHeapNoproc(5);
- return res;
+ return dsig_send_ctl(ctx, ctl);
}
-static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
+static int can_send_seqtrace_token(ErtsDSigSendContext* ctx, Eterm token) {
Eterm label;
- if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) {
+ if (ctx->flags & DFLAG_BIG_SEQTRACE_LABELS) {
/* The other end is capable of handling arbitrary seq_trace labels. */
return 1;
}
@@ -1056,11 +1028,11 @@ static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
}
int
-erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
+erts_dsig_send_msg(ErtsDSigSendContext* ctx, Eterm remote, Eterm message)
{
Eterm ctl;
Eterm token = NIL;
- Process *sender = ctx->dsd.proc;
+ Process *sender = ctx->c_p;
int res;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
@@ -1081,7 +1053,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", ctx->dsd.dep->sysname);
+ "%T", ctx->dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
@@ -1101,7 +1073,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
send_token = (token != NIL && can_send_seqtrace_token(ctx, token));
- if (ctx->dsd.flags & DFLAG_SEND_SENDER) {
+ if (ctx->flags & DFLAG_SEND_SENDER) {
dist_op = make_small(send_token ?
DOP_SEND_SENDER_TT :
DOP_SEND_SENDER);
@@ -1124,21 +1096,18 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- ctx->dss.ctl = ctl;
- ctx->dss.msg = message;
- ctx->dss.force_busy = 0;
- ctx->dss.force_encode = 0;
- res = erts_dsig_send(&ctx->dsd, &ctx->dss);
+ ctx->ctl = ctl;
+ ctx->msg = message;
+ res = erts_dsig_send(ctx);
return res;
}
int
-erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx)
+erts_dsig_send_reg_msg(ErtsDSigSendContext* ctx, Eterm remote_name, Eterm message)
{
Eterm ctl;
Eterm token = NIL;
- Process *sender = ctx->dsd.proc;
- int res;
+ Process *sender = ctx->c_p;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
Sint tok_lastcnt = 0;
@@ -1158,7 +1127,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", ctx->dsd.dep->sysname);
+ "%T", ctx->dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
@@ -1183,24 +1152,19 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx)
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- ctx->dss.ctl = ctl;
- ctx->dss.msg = message;
- ctx->dss.force_busy = 0;
- ctx->dss.force_encode = 0;
- res = erts_dsig_send(&ctx->dsd, &ctx->dss);
- return res;
+ ctx->ctl = ctl;
+ ctx->msg = message;
+ return erts_dsig_send(ctx);
}
/* local has died, deliver the exit signal to remote */
int
-erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
+erts_dsig_send_exit_tt(ErtsDSigSendContext *ctx, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
Eterm ctl, msg = THE_NON_VALUE;
- DeclareTmpHeapNoproc(ctl_heap,6);
- int res;
#ifdef USE_VM_PROBES
- Process *sender = dsdp->proc;
+ Process *sender = ctx->c_p;
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
@@ -1210,31 +1174,29 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
DTRACE_CHARBUF(reason_str, 128);
#endif
- UseTmpHeapNoproc(6);
-
- if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD)
msg = reason;
if (have_seqtrace(token)) {
- seq_trace_update_send(dsdp->proc);
+ seq_trace_update_send(ctx->c_p);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
- if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
- ctl = TUPLE4(&ctl_heap[0],
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctx->ctl_heap[0],
make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token);
} else
- ctl = TUPLE5(&ctl_heap[0],
+ ctl = TUPLE5(&ctx->ctl_heap[0],
make_small(DOP_EXIT_TT), local, remote, token, reason);
} else {
- if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
- ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD)
+ ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
else
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
#ifdef USE_VM_PROBES
*node_name = *sender_name = *remote_name = '\0';
if (DTRACE_ENABLED(process_exit_signal_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", dsdp->dep->sysname);
+ "%T", ctx->dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(remote_name, sizeof(DTRACE_CHARBUF_NAME(remote_name)),
@@ -1250,39 +1212,30 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send_exit(dsdp, ctl, msg, local, 1);
- UnUseTmpHeapNoproc(6);
- return res;
+ return dsig_send_exit(ctx, ctl, msg);
}
int
-erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm from, Eterm local, Eterm remote, Eterm reason)
+erts_dsig_send_exit(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason)
{
- DeclareTmpHeapNoproc(ctl_heap,5);
- int res;
- Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE;
-
- UseTmpHeapNoproc(5);
+ Eterm ctl, msg = ctx->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE;
- if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
- ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
msg = reason;
} else {
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
msg = THE_NON_VALUE;
}
- res = dsig_send_exit(dsdp, ctl, msg, from, 1);
- UnUseTmpHeapNoproc(5);
- return res;
+ return dsig_send_exit(ctx, ctl, msg);
}
int
-erts_dsig_send_exit2(ErtsSendContext *ctx, Eterm local, Eterm remote, Eterm reason)
+erts_dsig_send_exit2(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason)
{
- int res;
Eterm ctl, msg;
- if (ctx->dsd.dep->flags & DFLAG_EXIT_PAYLOAD) {
+ if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) {
ctl = TUPLE3(&ctx->ctl_heap[0],
make_small(DOP_PAYLOAD_EXIT2), local, remote);
msg = reason;
@@ -1292,25 +1245,19 @@ erts_dsig_send_exit2(ErtsSendContext *ctx, Eterm local, Eterm remote, Eterm reas
msg = THE_NON_VALUE;
}
- res = dsig_send_exit_ctx(ctx, ctl, msg);
- return res;
+ return dsig_send_exit(ctx, ctl, msg);
}
int
-erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
+erts_dsig_send_group_leader(ErtsDSigSendContext *ctx, Eterm leader, Eterm remote)
{
- DeclareTmpHeapNoproc(ctl_heap,4);
- int res;
Eterm ctl;
- UseTmpHeapNoproc(4);
- ctl = TUPLE3(&ctl_heap[0],
+ ctl = TUPLE3(&ctx->ctl_heap[0],
make_small(DOP_GROUP_LEADER), leader, remote);
- res = dsig_send_ctl(dsdp, ctl, 0);
- UnUseTmpHeapNoproc(4);
- return res;
+ return dsig_send_ctl(ctx, ctl);
}
struct dist_sequences {
@@ -1666,7 +1613,7 @@ int erts_net_message(Port *prt,
switch (type = unsigned_val(tuple[1])) {
case DOP_LINK: {
- ErtsDSigData dsd;
+ ErtsDSigSendContext ctx;
int code;
if (tuple_arity != 3) {
@@ -1704,9 +1651,9 @@ int erts_net_message(Port *prt,
erts_link_release_both(ldp);
}
- code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
+ code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_exit(&dsd, to, to, from, am_noproc);
+ code = erts_dsig_send_exit(&ctx, to, from, am_noproc);
ASSERT(code == ERTS_DSIG_SEND_OK);
}
@@ -1739,7 +1686,7 @@ int erts_net_message(Port *prt,
/* A remote process wants to monitor us, we get:
{DOP_MONITOR_P, Remote pid, local pid or name, ref} */
Eterm pid, name;
- ErtsDSigData dsd;
+ ErtsDSigSendContext ctx;
int code;
if (tuple_arity != 4) {
@@ -1794,10 +1741,9 @@ int erts_net_message(Port *prt,
}
- code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
+ code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_m_exit(&dsd, pid, watcher, watched, ref,
- am_noproc);
+ code = erts_dsig_send_m_exit(&ctx, watcher, watched, ref, am_noproc);
ASSERT(code == ERTS_DSIG_SEND_OK);
}
@@ -2152,45 +2098,21 @@ data_error:
return -1;
}
-static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy)
+static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg)
{
- struct erts_dsig_send_context ctx;
- int ret;
- ctx.ctl = ctl;
- ctx.msg = msg;
- ctx.from = from;
- ctx.force_busy = force_busy;
- ctx.force_encode = force_busy;
- ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
- ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */
- ret = erts_dsig_send(dsdp, &ctx);
- ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
- return ret;
+ ctx->ctl = ctl;
+ ctx->msg = msg;
+ return erts_dsig_send(ctx);
}
-static int dsig_send_exit_ctx(ErtsSendContext *ctx, Eterm ctl, Eterm msg)
+static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl)
{
int ret;
- ctx->dss.ctl = ctl;
- ctx->dss.msg = msg;
- ctx->dss.force_busy = 0;
- ctx->dss.force_encode = 0;
- ret = erts_dsig_send(&ctx->dsd, &ctx->dss);
- return ret;
-}
-
-static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
-{
- struct erts_dsig_send_context ctx;
- int ret;
- ctx.ctl = ctl;
- ctx.msg = THE_NON_VALUE;
- ctx.from = THE_NON_VALUE;
- ctx.force_busy = force_busy;
- ctx.force_encode = 1;
- ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
- ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
- ret = erts_dsig_send(dsdp, &ctx);
+ ctx->ctl = ctl;
+ ctx->msg = THE_NON_VALUE;
+ ctx->from = THE_NON_VALUE;
+ ctx->reds = 1; /* provoke assert below (no reduction count without msg) */
+ ret = erts_dsig_send(ctx);
ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
return ret;
}
@@ -2221,7 +2143,117 @@ notify_dist_data(Process *c_p, Eterm pid)
}
int
-erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
+erts_dsig_prepare(ErtsDSigSendContext *ctx,
+ DistEntry *dep,
+ Process *proc,
+ ErtsProcLocks proc_locks,
+ ErtsDSigPrepLock dspl,
+ int no_suspend,
+ int no_trap,
+ int connect)
+{
+ int res;
+
+ if (!erts_is_alive)
+ return ERTS_DSIG_PREP_NOT_ALIVE;
+ if (!dep) {
+ ASSERT(!connect);
+ return ERTS_DSIG_PREP_NOT_CONNECTED;
+ }
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ if (connect) {
+ erts_proc_lc_might_unlock(proc, proc_locks);
+ }
+#endif
+
+retry:
+ erts_de_rlock(dep);
+
+ if (dep->state == ERTS_DE_STATE_CONNECTED) {
+ res = ERTS_DSIG_PREP_CONNECTED;
+ }
+ else if (dep->state == ERTS_DE_STATE_PENDING) {
+ res = ERTS_DSIG_PREP_PENDING;
+ }
+ else if (dep->state == ERTS_DE_STATE_EXITING) {
+ res = ERTS_DSIG_PREP_NOT_CONNECTED;
+ goto fail;
+ }
+ else if (connect) {
+ ASSERT(dep->state == ERTS_DE_STATE_IDLE);
+ erts_de_runlock(dep);
+ if (!erts_auto_connect(dep, proc, proc_locks)) {
+ return ERTS_DSIG_PREP_NOT_ALIVE;
+ }
+ goto retry;
+ }
+ else {
+ ASSERT(dep->state == ERTS_DE_STATE_IDLE);
+ res = ERTS_DSIG_PREP_NOT_CONNECTED;
+ goto fail;
+ }
+
+ if (no_suspend) {
+ if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
+ res = ERTS_DSIG_PREP_WOULD_SUSPEND;
+ goto fail;
+ }
+ }
+
+ ctx->c_p = proc;
+ ctx->dep = dep;
+ ctx->deref_dep = 0;
+ ctx->cid = dep->cid;
+ ctx->connection_id = dep->connection_id;
+ ctx->no_suspend = no_suspend;
+ ctx->no_trap = no_trap;
+ ctx->flags = dep->flags;
+ ctx->return_term = am_true;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_INIT;
+ ctx->from = proc ? proc->common.id : am_undefined;
+ ctx->reds = no_trap ? 1 : (Sint) (ERTS_BIF_REDS_LEFT(proc) * TERM_TO_BINARY_LOOP_FACTOR);
+ if (dspl == ERTS_DSP_NO_LOCK)
+ erts_de_runlock(dep);
+ return res;
+
+ fail:
+ erts_de_runlock(dep);
+ return res;
+}
+
+static
+void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
+{
+ DistEntry *dep;
+ Eterm id;
+
+ if (prt) {
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ASSERT((erts_atomic32_read_nob(&prt->state)
+ & ERTS_PORT_SFLGS_DEAD) == 0);
+
+ dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
+ id = prt->common.id;
+ }
+ else {
+ ASSERT(dist_entry);
+ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx)
+ || erts_lc_rwmtx_is_rwlocked(&dist_entry->rwmtx));
+ ASSERT(is_internal_port(dist_entry->cid));
+
+ dep = dist_entry;
+ id = dep->cid;
+ }
+
+ if (!erts_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1))
+ erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD);
+}
+
+
+int
+erts_dsig_send(ErtsDSigSendContext *ctx)
{
int retval;
Sint initial_reds = ctx->reds;
@@ -2230,11 +2262,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
while (1) {
switch (ctx->phase) {
case ERTS_DSIG_SEND_PHASE_INIT:
- ctx->flags = dsdp->flags;
- ctx->c_p = dsdp->proc;
+ ctx->flags = ctx->flags;
+ ctx->c_p = ctx->c_p;
- if (!ctx->c_p || dsdp->no_suspend)
- ctx->force_busy = 1;
+ if (!ctx->c_p) {
+ ctx->no_trap = 1;
+ ctx->no_suspend = 1;
+ }
ERTS_LC_ASSERT(!ctx->c_p
|| (ERTS_PROC_LOCK_MAIN
@@ -2274,7 +2308,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- if (!ctx->force_encode) {
+ if (!ctx->no_trap) {
if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
retval = ERTS_DSIG_SEND_CONTINUE;
goto done;
@@ -2326,7 +2360,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
- if (!ctx->force_encode) {
+ if (!ctx->no_trap) {
if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags,
ctx->acmp, &ctx->u.ec, &ctx->reds)) {
retval = ERTS_DSIG_SEND_CONTINUE;
@@ -2397,7 +2431,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
for (; i < ctx->fragments; i++) {
free_dist_obuf(&ctx->obuf[i]);
}
- if (!ctx->force_encode && !ctx->force_busy)
+ if (!ctx->no_trap && !ctx->no_suspend)
ctx->reds -= ctx->fragments;
ctx->fragments = fin_fragments + 1;
}
@@ -2414,7 +2448,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
* Signal encoded; now verify that the connection still exists,
* and if so enqueue the signal and schedule it for send.
*/
- DistEntry *dep = dsdp->dep;
+ DistEntry *dep = ctx->dep;
int suspended = 0;
int resume = 0;
int i;
@@ -2422,7 +2456,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
cid = dep->cid;
if (dep->state == ERTS_DE_STATE_EXITING
|| dep->state == ERTS_DE_STATE_IDLE
- || dep->connection_id != dsdp->connection_id) {
+ || dep->connection_id != ctx->connection_id) {
/* Not the same connection as when we started; drop message... */
erts_de_runlock(dep);
for (i = 0; i < ctx->fragments; i++)
@@ -2443,20 +2477,20 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
for (fragments = 0, obsz = 0;
fragments < ctx->fragments &&
((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) ||
- ctx->force_encode || ctx->force_busy);
+ ctx->no_trap || ctx->no_suspend);
fragments++) {
#ifdef DEBUG
int reds = 100;
#else
int reds = 10;
#endif
- if (!ctx->force_encode && !ctx->force_busy)
+ if (!ctx->no_trap && !ctx->no_suspend)
ctx->reds -= reds;
obsz += size_obuf(&ctx->obuf[fragments]);
}
ASSERT(fragments == ctx->fragments ||
- (!ctx->force_encode && !ctx->force_busy));
+ (!ctx->no_trap && !ctx->no_suspend));
erts_mtx_lock(&dep->qlock);
qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
@@ -2477,7 +2511,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
/* else: requester will send itself the message... */
qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
}
- if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
+ if (!ctx->no_suspend && (qflgs & ERTS_DE_QFLG_BUSY)) {
erts_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
@@ -2506,7 +2540,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->obuf = &ctx->obuf[fragments];
}
- if (!ctx->force_busy) {
+ if (!ctx->no_suspend) {
qflgs = erts_atomic32_read_nob(&dep->qflgs);
if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
if (suspended)
@@ -4352,16 +4386,16 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
}
case am_true: {
- ErtsDSigData dsd;
- dsd.node = Node;
+ ErtsDSigSendContext ctx;
+ ctx.node = Node;
dep = erts_find_or_insert_dist_entry(Node);
if (dep == erts_this_dist_entry)
break;
- switch (erts_dsig_prepare(&dsd, dep, p,
+ switch (erts_dsig_prepare(&ctx, dep, p,
ERTS_PROC_LOCK_MAIN,
- ERTS_DSP_RLOCK, 0, async_connect)) {
+ ERTS_DSP_RLOCK, 0, 0, async_connect)) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
/* Trap to either send 'nodedown' or do passive connection attempt */