From 1066040c35e96566e415c50042345fba865f10c8 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 29 Jan 2019 10:19:47 +0100 Subject: erts: Refactor ErtsSendContext to be ErtsDSigSendContext This commit removed the general send context (which was used very little anyways) and only uses the distributed send context. This will make it easier to use the dist API at the cost of a little bit more code for the local send. --- erts/emulator/beam/bif.c | 173 +++++++-------- erts/emulator/beam/dist.c | 416 +++++++++++++++++++---------------- erts/emulator/beam/dist.h | 201 ++++------------- erts/emulator/beam/erl_node_tables.c | 2 +- erts/emulator/beam/erl_process.c | 32 ++- erts/emulator/beam/external.c | 2 +- 6 files changed, 360 insertions(+), 466 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index eb44d5632a..3cf0a02679 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -184,7 +184,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) DistEntry *dep; ErtsLink *lnk; int code; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; dep = external_pid_dist_entry(BIF_ARG_1); if (dep == erts_this_dist_entry) @@ -201,9 +201,9 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ldp = erts_link_to_data(lnk); - code = erts_dsig_prepare(&dsd, dep, BIF_P, + code = erts_dsig_prepare(&ctx, dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_RLOCK, 0, 1); + ERTS_DSP_RLOCK, 0, 1, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -222,7 +222,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ASSERT(inserted); (void)inserted; erts_de_runlock(dep); - code = erts_dsig_send_link(&dsd, BIF_P->common.id, BIF_ARG_1); + code = erts_dsig_send_link(&ctx, BIF_P->common.id, BIF_ARG_1); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -307,7 +307,7 @@ demonitor(Process *c_p, Eterm ref, Eterm *multip) DistEntry *dep; int code = ERTS_DSIG_SEND_OK; int deleted; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; ASSERT(is_external_pid(to) || is_node_name_atom(to)); @@ -323,8 +323,8 @@ demonitor(Process *c_p, Eterm ref, Eterm *multip) } } - code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_RLOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, 1, 0); deleted = erts_monitor_dist_delete(&mdp->target); @@ -353,8 +353,8 @@ demonitor(Process *c_p, Eterm ref, Eterm *multip) * monitor list since in case of monitor name * the atom is stored there. Yield if necessary. */ - code = erts_dsig_send_demonitor(&dsd, c_p->common.id, - watched, mdp->ref, 0); + code = erts_dsig_send_demonitor(&ctx, c_p->common.id, + watched, mdp->ref); break; } @@ -534,7 +534,7 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) } if (is_external_pid(target)) { - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; dep = external_pid_dist_entry(target); @@ -552,9 +552,9 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) BIF_P->common.id, id, name); erts_monitor_tree_insert(&ERTS_P_MONITORS(BIF_P), &mdp->origin); - code = erts_dsig_prepare(&dsd, dep, + code = erts_dsig_prepare(&ctx, dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_RLOCK, 0, 1); + ERTS_DSP_RLOCK, 0, 1, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -569,7 +569,7 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) ASSERT(inserted); (void)inserted; erts_de_runlock(dep); - code = erts_dsig_send_monitor(&dsd, BIF_P->common.id, target, ref); + code = erts_dsig_send_monitor(&ctx, BIF_P->common.id, target, ref); break; } @@ -915,7 +915,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) ErtsLinkData *ldp; DistEntry *dep; int code; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; dep = external_pid_dist_entry(BIF_ARG_1); if (dep == erts_this_dist_entry) @@ -933,15 +933,15 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) else erts_link_release(lnk); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_RET(am_true); case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); + code = erts_dsig_send_unlink(&ctx, BIF_P->common.id, BIF_ARG_1); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); break; @@ -1302,18 +1302,11 @@ static BIF_RETTYPE send_exit_signal_bif(Process *c_p, Eterm id, Eterm reason, in ERTS_BIF_PREP_RET(ret_val, am_true); /* Old incarnation of this node... */ else { int code; - ErtsSendContext ctx; - - ctx.suspend = !0; - ctx.connect = !0; - ctx.deref_dep = 0; - ctx.return_term = am_true; - ctx.dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(c_p) * TERM_TO_BINARY_LOOP_FACTOR); - ctx.dss.phase = ERTS_DSIG_SEND_PHASE_INIT; - ctx.dss.from = c_p->common.id; - - code = erts_dsig_prepare(&ctx.dsd, dep, c_p, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_NO_LOCK, 0, 1); + ErtsDSigSendContext ctx; + + code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0, 1); + switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -1827,33 +1820,36 @@ ebif_bang_2(BIF_ALIST_2) static Sint remote_send(Process *p, DistEntry *dep, - Eterm to, Eterm full_to, Eterm msg, - ErtsSendContext* ctx) + Eterm to, Eterm node, Eterm full_to, Eterm msg, + Eterm return_term, Eterm *ctxpp, + int connect, int suspend) { Sint res; int code; + ErtsDSigSendContext ctx; ASSERT(is_atom(to) || is_external_pid(to)); - ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&ctx, dep, p, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, - !ctx->suspend, ctx->connect); + !suspend, 0, connect); + ctx.return_term = return_term; + ctx.node = node; switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: res = SEND_NOCONNECT; break; case ERTS_DSIG_PREP_WOULD_SUSPEND: - ASSERT(!ctx->suspend); + ASSERT(!suspend); res = SEND_YIELD; break; case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { if (is_atom(to)) - code = erts_dsig_send_reg_msg(to, msg, ctx); + code = erts_dsig_send_reg_msg(&ctx, to, msg); else - code = erts_dsig_send_msg(to, msg, ctx); + code = erts_dsig_send_msg(&ctx, to, msg); /* * Note that reductions have been bumped on calling * process by erts_dsig_send_reg_msg() or @@ -1861,9 +1857,19 @@ static Sint remote_send(Process *p, DistEntry *dep, */ if (code == ERTS_DSIG_SEND_YIELD) res = SEND_YIELD_RETURN; - else if (code == ERTS_DSIG_SEND_CONTINUE) + else if (code == ERTS_DSIG_SEND_CONTINUE) { + erts_set_gc_state(p, 0); + + /* Keep a reference to the dist entry if the + name is an not a pid. */ + if (is_atom(to)) { + erts_ref_dist_entry(ctx.dep); + ctx.deref_dep = 1; + } + + *ctxpp = erts_dsend_export_trap_context(p, &ctx); res = SEND_YIELD_CONTINUE; - else if (code == ERTS_DSIG_SEND_TOO_LRG) + } else if (code == ERTS_DSIG_SEND_TOO_LRG) res = SEND_SYSTEM_LIMIT; else res = 0; @@ -1885,7 +1891,8 @@ static Sint remote_send(Process *p, DistEntry *dep, } static Sint -do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) +do_send(Process *p, Eterm to, Eterm msg, Eterm return_term, Eterm *refp, + Eterm *dist_ctx, int connect, int suspend) { Eterm portid; Port *pt; @@ -1917,7 +1924,8 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) erts_send_error_to_logger(p->group_leader, dsbufp); return 0; } - return remote_send(p, dep, to, to, msg, ctx); + return remote_send(p, dep, to, dep->sysname, to, msg, return_term, + dist_ctx, connect, suspend); } else if (is_atom(to)) { Eterm id = erts_whereis_name_to_id(p, to); @@ -1972,7 +1980,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) ret_val = 0; if (pt) { - int ps_flags = ctx->suspend ? 0 : ERTS_PORT_SIG_FLG_NOSUSPEND; + int ps_flags = suspend ? 0 : ERTS_PORT_SIG_FLG_NOSUSPEND; *refp = NIL; if (IS_TRACED_FL(p, F_TRACE_SEND)) /* trace once only !! */ @@ -1987,12 +1995,12 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) switch (erts_port_command(p, ps_flags, pt, msg, refp)) { case ERTS_PORT_OP_BUSY: /* Nothing has been sent */ - if (ctx->suspend) + if (suspend) erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); return SEND_YIELD; case ERTS_PORT_OP_BUSY_SCHEDULED: /* Message was sent */ - if (ctx->suspend) { + if (suspend) { erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); ret_val = SEND_YIELD_RETURN; break; @@ -2063,13 +2071,10 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) ASSERT(dep != erts_this_dist_entry); deref_dep = 1; } - ctx->dsd.node = tp[2]; - ret = remote_send(p, dep, tp[1], to, msg, ctx); - if (ret == SEND_YIELD_CONTINUE) { - erts_ref_dist_entry(ctx->dep); - ctx->deref_dep = 1; - } + ret = remote_send(p, dep, tp[1], tp[2], to, msg, return_term, + dist_ctx, connect, suspend); + if (deref_dep) erts_deref_dist_entry(dep); return ret; @@ -2108,26 +2113,16 @@ BIF_RETTYPE send_3(BIF_ALIST_3) Eterm l = opts; Sint result; - - DeclareTypedTmpHeap(ErtsSendContext, ctx, BIF_P); + int connect = 1, suspend = 1; + Eterm ctx; ERTS_MSACC_PUSH_STATE_M_X(); - UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); - - ctx->suspend = !0; - ctx->connect = !0; - ctx->deref_dep = 0; - ctx->return_term = am_ok; - ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); - ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; - ctx->dss.from = BIF_P->common.id; - while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { - ctx->connect = 0; + connect = 0; } else if (CAR(list_val(l)) == am_nosuspend) { - ctx->suspend = 0; + suspend = 0; } else { ERTS_BIF_PREP_ERROR(retval, p, BADARG); goto done; @@ -2144,7 +2139,7 @@ BIF_RETTYPE send_3(BIF_ALIST_3) #endif ERTS_MSACC_SET_STATE_CACHED_M_X(ERTS_MSACC_STATE_SEND); - result = do_send(p, to, msg, &ref, ctx); + result = do_send(p, to, msg, am_ok, &ref, &ctx, connect, suspend); ERTS_MSACC_POP_STATE_M_X(); if (result >= 0) { @@ -2157,22 +2152,21 @@ BIF_RETTYPE send_3(BIF_ALIST_3) switch (result) { case SEND_NOCONNECT: - if (ctx->connect) { + if (connect) { ERTS_BIF_PREP_RET(retval, am_ok); } else { ERTS_BIF_PREP_RET(retval, am_noconnect); } break; case SEND_YIELD: - if (ctx->suspend) { - ERTS_BIF_PREP_YIELD3(retval, - bif_export[BIF_send_3], p, to, msg, opts); + if (suspend) { + ERTS_BIF_PREP_YIELD3(retval, bif_export[BIF_send_3], p, to, msg, opts); } else { ERTS_BIF_PREP_RET(retval, am_nosuspend); } break; case SEND_YIELD_RETURN: - if (!ctx->suspend) { + if (!suspend) { ERTS_BIF_PREP_RET(retval, am_nosuspend); break; } @@ -2197,9 +2191,7 @@ BIF_RETTYPE send_3(BIF_ALIST_3) break; case SEND_YIELD_CONTINUE: BUMP_ALL_REDS(p); - erts_set_gc_state(p, 0); - ERTS_BIF_PREP_TRAP1(retval, &dsend_continue_trap_export, p, - erts_dsend_export_trap_context(p, ctx)); + ERTS_BIF_PREP_TRAP1(retval, &dsend_continue_trap_export, p, ctx); break; default: erts_exit(ERTS_ABORT_EXIT, "send_3 invalid result %d\n", (int)result); @@ -2207,7 +2199,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3) } done: - UnUseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); return retval; } @@ -2221,14 +2212,14 @@ BIF_RETTYPE send_2(BIF_ALIST_2) static BIF_RETTYPE dsend_continue_trap_1(BIF_ALIST_1) { Binary* bin = erts_magic_ref2bin(BIF_ARG_1); - ErtsSendContext* ctx = (ErtsSendContext*) ERTS_MAGIC_BIN_DATA(bin); + ErtsDSigSendContext *ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin); Sint initial_reds = (Sint) (ERTS_BIF_REDS_LEFT(BIF_P) * TERM_TO_BINARY_LOOP_FACTOR); int result; ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dsend_context_dtor); - ctx->dss.reds = initial_reds; - result = erts_dsig_send(&ctx->dsd, &ctx->dss); + ctx->reds = initial_reds; + result = erts_dsig_send(ctx); switch (result) { case ERTS_DSIG_SEND_OK: @@ -2237,7 +2228,7 @@ static BIF_RETTYPE dsend_continue_trap_1(BIF_ALIST_1) break; case ERTS_DSIG_SEND_YIELD: /*SEND_YIELD_RETURN*/ erts_set_gc_state(BIF_P, 1); - if (!ctx->suspend) + if (ctx->no_suspend) BIF_RET(am_nosuspend); ERTS_BIF_YIELD_RETURN(BIF_P, ctx->return_term); @@ -2262,21 +2253,14 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) Eterm retval; Eterm ref; Sint result; - DeclareTypedTmpHeap(ErtsSendContext, ctx, p); + Eterm ctx; ERTS_MSACC_PUSH_AND_SET_STATE_M_X(ERTS_MSACC_STATE_SEND); - UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), p); + #ifdef DEBUG ref = NIL; #endif - ctx->suspend = !0; - ctx->connect = !0; - ctx->deref_dep = 0; - ctx->return_term = msg; - ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); - ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; - ctx->dss.from = p->common.id; - result = do_send(p, to, msg, &ref, ctx); + result = do_send(p, to, msg, msg, &ref, &ctx, 1, 1); ERTS_MSACC_POP_STATE_M_X(); @@ -2318,9 +2302,7 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) break; case SEND_YIELD_CONTINUE: BUMP_ALL_REDS(p); - erts_set_gc_state(p, 0); - ERTS_BIF_PREP_TRAP1(retval, &dsend_continue_trap_export, p, - erts_dsend_export_trap_context(p, ctx)); + ERTS_BIF_PREP_TRAP1(retval, &dsend_continue_trap_export, p, ctx); break; default: erts_exit(ERTS_ABORT_EXIT, "invalid send result %d\n", (int)result); @@ -2328,7 +2310,6 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) } done: - UnUseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), p); return retval; } @@ -4385,21 +4366,21 @@ BIF_RETTYPE erts_internal_group_leader_2(BIF_ALIST_2) if (is_external_pid(BIF_ARG_2)) { DistEntry *dep; int code; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; dep = external_pid_dist_entry(BIF_ARG_2); ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_NO_LOCK, 0, 1); + code = erts_dsig_prepare(&ctx, dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_RET(am_true); case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); + code = erts_dsig_send_group_leader(&ctx, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 749760a1b3..7856d8003c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -177,9 +177,9 @@ static Export *dist_ctrl_put_data_trap; /* forward declarations */ -static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy); -static int dsig_send_exit_ctx(ErtsSendContext *ctx, Eterm ctl, Eterm msg); -static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); +static void erts_schedule_dist_command(Port *, DistEntry *); +static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg); +static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); static Sint abort_connection(DistEntry* dep, Uint32 conn_id); @@ -860,20 +860,20 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) int erts_dsend_context_dtor(Binary* ctx_bin) { - ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); - switch (ctx->dss.phase) { + ErtsDSigSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); + switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_MSG_SIZE: - DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack); + DESTROY_SAVED_WSTACK(&ctx->u.sc.wstack); break; case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: - DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack); + DESTROY_SAVED_WSTACK(&ctx->u.ec.wstack); break; default:; } - if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { + if (ctx->phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->obuf) { int i; - for (i = 0; i < ctx->dss.fragments; i++) - free_dist_obuf(&ctx->dss.obuf[i]); + for (i = 0; i < ctx->fragments; i++) + free_dist_obuf(&ctx->obuf[i]); } if (ctx->deref_dep) erts_deref_dist_entry(ctx->dep); @@ -881,10 +881,10 @@ int erts_dsend_context_dtor(Binary* ctx_bin) return 1; } -Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) +Eterm erts_dsend_export_trap_context(Process* p, ErtsDSigSendContext* ctx) { struct exported_ctx { - ErtsSendContext ctx; + ErtsDSigSendContext ctx; ErtsAtomCacheMap acm; }; Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx), @@ -892,12 +892,12 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin); Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE); - sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext)); - ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap)); - dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap); - if (ctx->dss.acmp) { - sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap)); - dst->ctx.dss.acmp = &dst->acm; + sys_memcpy(&dst->ctx, ctx, sizeof(ErtsDSigSendContext)); + ASSERT(ctx->ctl == make_tuple(ctx->ctl_heap)); + dst->ctx.ctl = make_tuple(dst->ctx.ctl_heap); + if (ctx->acmp) { + sys_memcpy(&dst->acm, ctx->acmp, sizeof(ErtsAtomCacheMap)); + dst->ctx.acmp = &dst->acm; } return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin); } @@ -918,78 +918,58 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) ** Send a DOP_LINK link message */ int -erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote) +erts_dsig_send_link(ErtsDSigSendContext *ctx, Eterm local, Eterm remote) { - DeclareTmpHeapNoproc(ctl_heap,4); - Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote); - int res; - UseTmpHeapNoproc(4); - - res = dsig_send_ctl(dsdp, ctl, 0); - UnUseTmpHeapNoproc(4); - return res; + Eterm ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_LINK), local, remote); + return dsig_send_ctl(ctx, ctl); } int -erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) +erts_dsig_send_unlink(ErtsDSigSendContext *ctx, Eterm local, Eterm remote) { - DeclareTmpHeapNoproc(ctl_heap,4); - Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote); - int res; - - UseTmpHeapNoproc(4); - res = dsig_send_ctl(dsdp, ctl, 0); - UnUseTmpHeapNoproc(4); - return res; + Eterm ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_UNLINK), local, remote); + return dsig_send_ctl(ctx, ctl); } /* A local process that's being monitored by a remote one exits. We send: {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */ int -erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm from, Eterm watcher, Eterm watched, +erts_dsig_send_m_exit(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched, Eterm ref, Eterm reason) { Eterm ctl, msg; - DeclareTmpHeapNoproc(ctl_heap,6); - int res; - if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) */ return ERTS_DSIG_SEND_OK; } - UseTmpHeapNoproc(6); - - if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { - ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT), + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT), watched, watcher, ref); msg = reason; } else { - ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), + ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), watched, watcher, ref, reason); msg = THE_NON_VALUE; } - res = dsig_send_exit(dsdp, ctl, msg, from, 1); - UnUseTmpHeapNoproc(6); - return res; + return dsig_send_exit(ctx, ctl, msg); } /* We want to monitor a process (named or unnamed) on another node, we send: {DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's needed on the other side... */ int -erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, +erts_dsig_send_monitor(ErtsDSigSendContext *ctx, Eterm watcher, Eterm watched, Eterm ref) { Eterm ctl; - DeclareTmpHeapNoproc(ctl_heap,5); - int res; - if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P. * Just avoid sending it and by doing that reduce this monitor @@ -999,48 +979,40 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, return ERTS_DSIG_SEND_OK; } - UseTmpHeapNoproc(5); - ctl = TUPLE4(&ctl_heap[0], + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_MONITOR_P), watcher, watched, ref); - res = dsig_send_ctl(dsdp, ctl, 0); - UnUseTmpHeapNoproc(5); - return res; + return dsig_send_ctl(ctx, ctl); } /* A local process monitoring a remote one wants to stop monitoring, either because of a demonitor bif call or because the local process died. We send {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref} */ int -erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, - Eterm watched, Eterm ref, int force) +erts_dsig_send_demonitor(ErtsDSigSendContext *ctx, Eterm watcher, + Eterm watched, Eterm ref) { Eterm ctl; - DeclareTmpHeapNoproc(ctl_heap,5); - int res; - if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~ctx->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) */ return ERTS_DSIG_SEND_OK; } - UseTmpHeapNoproc(5); - ctl = TUPLE4(&ctl_heap[0], + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_DEMONITOR_P), watcher, watched, ref); - res = dsig_send_ctl(dsdp, ctl, force); - UnUseTmpHeapNoproc(5); - return res; + return dsig_send_ctl(ctx, ctl); } -static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { +static int can_send_seqtrace_token(ErtsDSigSendContext* ctx, Eterm token) { Eterm label; - if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) { + if (ctx->flags & DFLAG_BIG_SEQTRACE_LABELS) { /* The other end is capable of handling arbitrary seq_trace labels. */ return 1; } @@ -1056,11 +1028,11 @@ static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { } int -erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) +erts_dsig_send_msg(ErtsDSigSendContext* ctx, Eterm remote, Eterm message) { Eterm ctl; Eterm token = NIL; - Process *sender = ctx->dsd.proc; + Process *sender = ctx->c_p; int res; #ifdef USE_VM_PROBES Sint tok_label = 0; @@ -1081,7 +1053,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), - "%T", ctx->dsd.dep->sysname); + "%T", ctx->dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), @@ -1101,7 +1073,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); - if (ctx->dsd.flags & DFLAG_SEND_SENDER) { + if (ctx->flags & DFLAG_SEND_SENDER) { dist_op = make_small(send_token ? DOP_SEND_SENDER_TT : DOP_SEND_SENDER); @@ -1124,21 +1096,18 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - ctx->dss.ctl = ctl; - ctx->dss.msg = message; - ctx->dss.force_busy = 0; - ctx->dss.force_encode = 0; - res = erts_dsig_send(&ctx->dsd, &ctx->dss); + ctx->ctl = ctl; + ctx->msg = message; + res = erts_dsig_send(ctx); return res; } int -erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx) +erts_dsig_send_reg_msg(ErtsDSigSendContext* ctx, Eterm remote_name, Eterm message) { Eterm ctl; Eterm token = NIL; - Process *sender = ctx->dsd.proc; - int res; + Process *sender = ctx->c_p; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; @@ -1158,7 +1127,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx) *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), - "%T", ctx->dsd.dep->sysname); + "%T", ctx->dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), @@ -1183,24 +1152,19 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx) msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - ctx->dss.ctl = ctl; - ctx->dss.msg = message; - ctx->dss.force_busy = 0; - ctx->dss.force_encode = 0; - res = erts_dsig_send(&ctx->dsd, &ctx->dss); - return res; + ctx->ctl = ctl; + ctx->msg = message; + return erts_dsig_send(ctx); } /* local has died, deliver the exit signal to remote */ int -erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, +erts_dsig_send_exit_tt(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason, Eterm token) { Eterm ctl, msg = THE_NON_VALUE; - DeclareTmpHeapNoproc(ctl_heap,6); - int res; #ifdef USE_VM_PROBES - Process *sender = dsdp->proc; + Process *sender = ctx->c_p; Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; @@ -1210,31 +1174,29 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, DTRACE_CHARBUF(reason_str, 128); #endif - UseTmpHeapNoproc(6); - - if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) msg = reason; if (have_seqtrace(token)) { - seq_trace_update_send(dsdp->proc); + seq_trace_update_send(ctx->c_p); seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); - if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { - ctl = TUPLE4(&ctl_heap[0], + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token); } else - ctl = TUPLE5(&ctl_heap[0], + ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_EXIT_TT), local, remote, token, reason); } else { - if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) - ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) + ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); else - ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); } #ifdef USE_VM_PROBES *node_name = *sender_name = *remote_name = '\0'; if (DTRACE_ENABLED(process_exit_signal_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), - "%T", dsdp->dep->sysname); + "%T", ctx->dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(remote_name, sizeof(DTRACE_CHARBUF_NAME(remote_name)), @@ -1250,39 +1212,30 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #endif DTRACE7(process_exit_signal_remote, sender_name, node_name, remote_name, reason_str, tok_label, tok_lastcnt, tok_serial); - res = dsig_send_exit(dsdp, ctl, msg, local, 1); - UnUseTmpHeapNoproc(6); - return res; + return dsig_send_exit(ctx, ctl, msg); } int -erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm from, Eterm local, Eterm remote, Eterm reason) +erts_dsig_send_exit(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason) { - DeclareTmpHeapNoproc(ctl_heap,5); - int res; - Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE; - - UseTmpHeapNoproc(5); + Eterm ctl, msg = ctx->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE; - if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { - ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); msg = reason; } else { - ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); msg = THE_NON_VALUE; } - res = dsig_send_exit(dsdp, ctl, msg, from, 1); - UnUseTmpHeapNoproc(5); - return res; + return dsig_send_exit(ctx, ctl, msg); } int -erts_dsig_send_exit2(ErtsSendContext *ctx, Eterm local, Eterm remote, Eterm reason) +erts_dsig_send_exit2(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Eterm reason) { - int res; Eterm ctl, msg; - if (ctx->dsd.dep->flags & DFLAG_EXIT_PAYLOAD) { + if (ctx->dep->flags & DFLAG_EXIT_PAYLOAD) { ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_PAYLOAD_EXIT2), local, remote); msg = reason; @@ -1292,25 +1245,19 @@ erts_dsig_send_exit2(ErtsSendContext *ctx, Eterm local, Eterm remote, Eterm reas msg = THE_NON_VALUE; } - res = dsig_send_exit_ctx(ctx, ctl, msg); - return res; + return dsig_send_exit(ctx, ctl, msg); } int -erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) +erts_dsig_send_group_leader(ErtsDSigSendContext *ctx, Eterm leader, Eterm remote) { - DeclareTmpHeapNoproc(ctl_heap,4); - int res; Eterm ctl; - UseTmpHeapNoproc(4); - ctl = TUPLE3(&ctl_heap[0], + ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_GROUP_LEADER), leader, remote); - res = dsig_send_ctl(dsdp, ctl, 0); - UnUseTmpHeapNoproc(4); - return res; + return dsig_send_ctl(ctx, ctl); } struct dist_sequences { @@ -1666,7 +1613,7 @@ int erts_net_message(Port *prt, switch (type = unsigned_val(tuple[1])) { case DOP_LINK: { - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; if (tuple_arity != 3) { @@ -1704,9 +1651,9 @@ int erts_net_message(Port *prt, erts_link_release_both(ldp); } - code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit(&dsd, to, to, from, am_noproc); + code = erts_dsig_send_exit(&ctx, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } @@ -1739,7 +1686,7 @@ int erts_net_message(Port *prt, /* A remote process wants to monitor us, we get: {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ Eterm pid, name; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; if (tuple_arity != 4) { @@ -1794,10 +1741,9 @@ int erts_net_message(Port *prt, } - code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_m_exit(&dsd, pid, watcher, watched, ref, - am_noproc); + code = erts_dsig_send_m_exit(&ctx, watcher, watched, ref, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } @@ -2152,45 +2098,21 @@ data_error: return -1; } -static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy) +static int dsig_send_exit(ErtsDSigSendContext *ctx, Eterm ctl, Eterm msg) { - struct erts_dsig_send_context ctx; - int ret; - ctx.ctl = ctl; - ctx.msg = msg; - ctx.from = from; - ctx.force_busy = force_busy; - ctx.force_encode = force_busy; - ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; - ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */ - ret = erts_dsig_send(dsdp, &ctx); - ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); - return ret; + ctx->ctl = ctl; + ctx->msg = msg; + return erts_dsig_send(ctx); } -static int dsig_send_exit_ctx(ErtsSendContext *ctx, Eterm ctl, Eterm msg) +static int dsig_send_ctl(ErtsDSigSendContext *ctx, Eterm ctl) { int ret; - ctx->dss.ctl = ctl; - ctx->dss.msg = msg; - ctx->dss.force_busy = 0; - ctx->dss.force_encode = 0; - ret = erts_dsig_send(&ctx->dsd, &ctx->dss); - return ret; -} - -static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) -{ - struct erts_dsig_send_context ctx; - int ret; - ctx.ctl = ctl; - ctx.msg = THE_NON_VALUE; - ctx.from = THE_NON_VALUE; - ctx.force_busy = force_busy; - ctx.force_encode = 1; - ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; - ctx.reds = 1; /* provoke assert below (no reduction count without msg) */ - ret = erts_dsig_send(dsdp, &ctx); + ctx->ctl = ctl; + ctx->msg = THE_NON_VALUE; + ctx->from = THE_NON_VALUE; + ctx->reds = 1; /* provoke assert below (no reduction count without msg) */ + ret = erts_dsig_send(ctx); ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); return ret; } @@ -2221,7 +2143,117 @@ notify_dist_data(Process *c_p, Eterm pid) } int -erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) +erts_dsig_prepare(ErtsDSigSendContext *ctx, + DistEntry *dep, + Process *proc, + ErtsProcLocks proc_locks, + ErtsDSigPrepLock dspl, + int no_suspend, + int no_trap, + int connect) +{ + int res; + + if (!erts_is_alive) + return ERTS_DSIG_PREP_NOT_ALIVE; + if (!dep) { + ASSERT(!connect); + return ERTS_DSIG_PREP_NOT_CONNECTED; + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: + erts_de_rlock(dep); + + if (dep->state == ERTS_DE_STATE_CONNECTED) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->state == ERTS_DE_STATE_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->state == ERTS_DE_STATE_EXITING) { + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + else if (connect) { + ASSERT(dep->state == ERTS_DE_STATE_IDLE); + erts_de_runlock(dep); + if (!erts_auto_connect(dep, proc, proc_locks)) { + return ERTS_DSIG_PREP_NOT_ALIVE; + } + goto retry; + } + else { + ASSERT(dep->state == ERTS_DE_STATE_IDLE); + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + + if (no_suspend) { + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; + goto fail; + } + } + + ctx->c_p = proc; + ctx->dep = dep; + ctx->deref_dep = 0; + ctx->cid = dep->cid; + ctx->connection_id = dep->connection_id; + ctx->no_suspend = no_suspend; + ctx->no_trap = no_trap; + ctx->flags = dep->flags; + ctx->return_term = am_true; + ctx->phase = ERTS_DSIG_SEND_PHASE_INIT; + ctx->from = proc ? proc->common.id : am_undefined; + ctx->reds = no_trap ? 1 : (Sint) (ERTS_BIF_REDS_LEFT(proc) * TERM_TO_BINARY_LOOP_FACTOR); + if (dspl == ERTS_DSP_NO_LOCK) + erts_de_runlock(dep); + return res; + + fail: + erts_de_runlock(dep); + return res; +} + +static +void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) +{ + DistEntry *dep; + Eterm id; + + if (prt) { + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); + ASSERT((erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLGS_DEAD) == 0); + + dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + ASSERT(dep); + id = prt->common.id; + } + else { + ASSERT(dist_entry); + ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx) + || erts_lc_rwmtx_is_rwlocked(&dist_entry->rwmtx)); + ASSERT(is_internal_port(dist_entry->cid)); + + dep = dist_entry; + id = dep->cid; + } + + if (!erts_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) + erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD); +} + + +int +erts_dsig_send(ErtsDSigSendContext *ctx) { int retval; Sint initial_reds = ctx->reds; @@ -2230,11 +2262,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) while (1) { switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_INIT: - ctx->flags = dsdp->flags; - ctx->c_p = dsdp->proc; + ctx->flags = ctx->flags; + ctx->c_p = ctx->c_p; - if (!ctx->c_p || dsdp->no_suspend) - ctx->force_busy = 1; + if (!ctx->c_p) { + ctx->no_trap = 1; + ctx->no_suspend = 1; + } ERTS_LC_ASSERT(!ctx->c_p || (ERTS_PROC_LOCK_MAIN @@ -2274,7 +2308,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: - if (!ctx->force_encode) { + if (!ctx->no_trap) { if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -2326,7 +2360,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: - if (!ctx->force_encode) { + if (!ctx->no_trap) { if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -2397,7 +2431,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) for (; i < ctx->fragments; i++) { free_dist_obuf(&ctx->obuf[i]); } - if (!ctx->force_encode && !ctx->force_busy) + if (!ctx->no_trap && !ctx->no_suspend) ctx->reds -= ctx->fragments; ctx->fragments = fin_fragments + 1; } @@ -2414,7 +2448,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. */ - DistEntry *dep = dsdp->dep; + DistEntry *dep = ctx->dep; int suspended = 0; int resume = 0; int i; @@ -2422,7 +2456,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) cid = dep->cid; if (dep->state == ERTS_DE_STATE_EXITING || dep->state == ERTS_DE_STATE_IDLE - || dep->connection_id != dsdp->connection_id) { + || dep->connection_id != ctx->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); for (i = 0; i < ctx->fragments; i++) @@ -2443,20 +2477,20 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) for (fragments = 0, obsz = 0; fragments < ctx->fragments && ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) || - ctx->force_encode || ctx->force_busy); + ctx->no_trap || ctx->no_suspend); fragments++) { #ifdef DEBUG int reds = 100; #else int reds = 10; #endif - if (!ctx->force_encode && !ctx->force_busy) + if (!ctx->no_trap && !ctx->no_suspend) ctx->reds -= reds; obsz += size_obuf(&ctx->obuf[fragments]); } ASSERT(fragments == ctx->fragments || - (!ctx->force_encode && !ctx->force_busy)); + (!ctx->no_trap && !ctx->no_suspend)); erts_mtx_lock(&dep->qlock); qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); @@ -2477,7 +2511,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) /* else: requester will send itself the message... */ qflgs &= ~ERTS_DE_QFLG_REQ_INFO; } - if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { + if (!ctx->no_suspend && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); @@ -2506,7 +2540,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf = &ctx->obuf[fragments]; } - if (!ctx->force_busy) { + if (!ctx->no_suspend) { qflgs = erts_atomic32_read_nob(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) @@ -4352,16 +4386,16 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) } case am_true: { - ErtsDSigData dsd; - dsd.node = Node; + ErtsDSigSendContext ctx; + ctx.node = Node; dep = erts_find_or_insert_dist_entry(Node); if (dep == erts_this_dist_entry) break; - switch (erts_dsig_prepare(&dsd, dep, p, + switch (erts_dsig_prepare(&ctx, dep, p, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_RLOCK, 0, async_connect)) { + ERTS_DSP_RLOCK, 0, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: /* Trap to either send 'nodedown' or do passive connection attempt */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 8ea214a0f2..c4bb967592 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -142,16 +142,6 @@ typedef enum { } ErtsDSigPrepLock; -typedef struct { - Process *proc; - DistEntry *dep; - Eterm node; /* used if dep == NULL */ - Eterm cid; - Eterm connection_id; - int no_suspend; - Uint32 flags; -} ErtsDSigData; - /* Must be larger or equal to 16 */ #ifdef DEBUG #define ERTS_DIST_FRAGMENT_SIZE 16 @@ -182,124 +172,6 @@ extern int erts_is_alive; /* Pending connection; signals can be enqueued */ #define ERTS_DSIG_PREP_PENDING 4 -ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry*, - Process *, - ErtsProcLocks, - ErtsDSigPrepLock, - int, - int); - -ERTS_GLB_INLINE -void erts_schedule_dist_command(Port *, DistEntry *); - -int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE int -erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry *dep, - Process *proc, - ErtsProcLocks proc_locks, - ErtsDSigPrepLock dspl, - int no_suspend, - int connect) -{ - int res; - - if (!erts_is_alive) - return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) { - ASSERT(!connect); - return ERTS_DSIG_PREP_NOT_CONNECTED; - } - -#ifdef ERTS_ENABLE_LOCK_CHECK - if (connect) { - erts_proc_lc_might_unlock(proc, proc_locks); - } -#endif - -retry: - erts_de_rlock(dep); - - if (dep->state == ERTS_DE_STATE_CONNECTED) { - res = ERTS_DSIG_PREP_CONNECTED; - } - else if (dep->state == ERTS_DE_STATE_PENDING) { - res = ERTS_DSIG_PREP_PENDING; - } - else if (dep->state == ERTS_DE_STATE_EXITING) { - res = ERTS_DSIG_PREP_NOT_CONNECTED; - goto fail; - } - else if (connect) { - ASSERT(dep->state == ERTS_DE_STATE_IDLE); - erts_de_runlock(dep); - if (!erts_auto_connect(dep, proc, proc_locks)) { - return ERTS_DSIG_PREP_NOT_ALIVE; - } - goto retry; - } - else { - ASSERT(dep->state == ERTS_DE_STATE_IDLE); - res = ERTS_DSIG_PREP_NOT_CONNECTED; - goto fail; - } - - if (no_suspend) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - res = ERTS_DSIG_PREP_WOULD_SUSPEND; - goto fail; - } - } - dsdp->proc = proc; - dsdp->dep = dep; - dsdp->cid = dep->cid; - dsdp->connection_id = dep->connection_id; - dsdp->no_suspend = no_suspend; - dsdp->flags = dep->flags; - if (dspl == ERTS_DSP_NO_LOCK) - erts_de_runlock(dep); - return res; - - fail: - erts_de_runlock(dep); - return res; -} - -ERTS_GLB_INLINE -void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) -{ - DistEntry *dep; - Eterm id; - - if (prt) { - ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - ASSERT((erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLGS_DEAD) == 0); - - dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); - ASSERT(dep); - id = prt->common.id; - } - else { - ASSERT(dist_entry); - ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx) - || erts_lc_rwmtx_is_rwlocked(&dist_entry->rwmtx)); - ASSERT(is_internal_port(dist_entry->cid)); - - dep = dist_entry; - id = dep->cid; - } - - if (!erts_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) - erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD); -} - -#endif - #ifdef DEBUG #define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) \ erts_dbg_chk_no_dist_proc_link((D), (R), (L)) @@ -363,15 +235,26 @@ enum erts_dsig_send_phase { ERTS_DSIG_SEND_PHASE_SEND }; -struct erts_dsig_send_context { - enum erts_dsig_send_phase phase; - Sint reds; +typedef struct erts_dsig_send_context { + int connect; + int no_suspend; + int no_trap; Eterm ctl; Eterm msg; Eterm from; - int force_busy; - int force_encode; + Eterm ctl_heap[6]; + Eterm return_term; + + DistEntry *dep; + Eterm node; /* used if dep == NULL */ + Eterm cid; + Eterm connection_id; + int deref_dep; + + enum erts_dsig_send_phase phase; + Sint reds; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; @@ -383,20 +266,8 @@ struct erts_dsig_send_context { TTBSizeContext sc; TTBEncodeContext ec; }u; -}; -typedef struct { - int suspend; - int connect; - - Eterm ctl_heap[6]; - ErtsDSigData dsd; - DistEntry *dep; - int deref_dep; - struct erts_dsig_send_context dss; - - Eterm return_term; -}ErtsSendContext; +} ErtsDSigSendContext; typedef struct dist_sequences DistSeqNode; @@ -408,21 +279,21 @@ typedef struct dist_sequences DistSeqNode; #define ERTS_DSIG_SEND_CONTINUE 2 #define ERTS_DSIG_SEND_TOO_LRG 3 -extern int erts_dsig_send_link(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_msg(Eterm, Eterm, ErtsSendContext*); -extern int erts_dsig_send_exit_tt(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); -extern int erts_dsig_send_unlink(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_reg_msg(Eterm, Eterm, ErtsSendContext*); -extern int erts_dsig_send_group_leader(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); -extern int erts_dsig_send_exit2(ErtsSendContext *, Eterm, Eterm, Eterm); -extern int erts_dsig_send_demonitor(ErtsDSigData *, Eterm, Eterm, Eterm, int); -extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm); -extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm, Eterm); - -extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx); +extern int erts_dsig_send_msg(ErtsDSigSendContext*, Eterm, Eterm); +extern int erts_dsig_send_reg_msg(ErtsDSigSendContext*, Eterm, Eterm); +extern int erts_dsig_send_link(ErtsDSigSendContext *, Eterm, Eterm); +extern int erts_dsig_send_exit_tt(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm); +extern int erts_dsig_send_unlink(ErtsDSigSendContext *, Eterm, Eterm); +extern int erts_dsig_send_group_leader(ErtsDSigSendContext *, Eterm, Eterm); +extern int erts_dsig_send_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm); +extern int erts_dsig_send_exit2(ErtsDSigSendContext *, Eterm, Eterm, Eterm); +extern int erts_dsig_send_demonitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm); +extern int erts_dsig_send_monitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm); +extern int erts_dsig_send_m_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm); + +extern int erts_dsig_send(ErtsDSigSendContext *dsdp); extern int erts_dsend_context_dtor(Binary*); -extern Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx); +extern Eterm erts_dsend_export_trap_context(Process* p, ErtsDSigSendContext* ctx); extern int erts_dist_command(Port *prt, int reds); extern void erts_dist_port_not_busy(Port *prt); @@ -436,4 +307,14 @@ extern void erts_dist_seq_tree_foreach( DistEntry *dep, int (*func)(ErtsDistExternal *, void*, Sint), void *args); +extern int erts_dsig_prepare(ErtsDSigSendContext *, + DistEntry*, + Process *, + ErtsProcLocks, + ErtsDSigPrepLock, + int, + int, + int); + +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); #endif diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index ede9f092e9..b301e5d56a 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1516,7 +1516,7 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) } } else if (IsSendCtxBinary(u.mref->mb)) { - ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); + ErtsDSigSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); if (ctx->deref_dep) insert_dist_entry(ctx->dep, type, id, 0); } diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 837b3f4ace..422f6a51d9 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12085,7 +12085,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) case ERTS_MON_TYPE_DIST_PROC: { ErtsMonLnkDist *dist; DistEntry *dep; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; Eterm watcher; Eterm watched; @@ -12104,14 +12104,13 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) ASSERT(dep); dist = ((ErtsMonitorDataExtended *) mdp)->dist; ASSERT(dist); - code = erts_dsig_prepare(&dsd, dep, NULL, 0, - ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, NULL, 0, + ERTS_DSP_NO_LOCK, 1, 1, 0); switch (code) { case ERTS_DSIG_PREP_CONNECTED: case ERTS_DSIG_PREP_PENDING: - if (dist->connection_id == dsd.connection_id) { - code = erts_dsig_send_m_exit(&dsd, - c_p->common.id, + if (dist->connection_id == ctx.connection_id) { + code = erts_dsig_send_m_exit(&ctx, watcher, watched, mdp->ref, @@ -12164,7 +12163,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) case ERTS_MON_TYPE_DIST_PROC: { ErtsMonLnkDist *dist; DistEntry *dep; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; Eterm watched; @@ -12181,17 +12180,16 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) ASSERT(is_external_pid(watched)); dep = external_pid_dist_entry(watched); } - code = erts_dsig_prepare(&dsd, dep, NULL, 0, - ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, NULL, 0, + ERTS_DSP_NO_LOCK, 1, 1, 0); switch (code) { case ERTS_DSIG_PREP_CONNECTED: case ERTS_DSIG_PREP_PENDING: - if (dist->connection_id == dsd.connection_id) { - code = erts_dsig_send_demonitor(&dsd, + if (dist->connection_id == ctx.connection_id) { + code = erts_dsig_send_demonitor(&ctx, c_p->common.id, watched, - mdp->ref, - 1); + mdp->ref); ASSERT(code == ERTS_DSIG_SEND_OK); } default: @@ -12247,7 +12245,7 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds) DistEntry *dep; ErtsMonLnkDist *dist; ErtsLink *dlnk; - ErtsDSigData dsd; + ErtsDSigSendContext ctx; int code; dlnk = erts_link_to_other(lnk, &ldp); @@ -12261,12 +12259,12 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds) if (!erts_link_dist_delete(dlnk)) ldp = NULL; - code = erts_dsig_prepare(&dsd, dep, c_p, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&ctx, dep, c_p, 0, ERTS_DSP_NO_LOCK, 1, 1, 0); switch (code) { case ERTS_DSIG_PREP_CONNECTED: case ERTS_DSIG_PREP_PENDING: - if (dist->connection_id == dsd.connection_id) { - code = erts_dsig_send_exit_tt(&dsd, + if (dist->connection_id == ctx.connection_id) { + code = erts_dsig_send_exit_tt(&ctx, c_p->common.id, lnk->other.item, reason, diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 84e6e0d6fd..988ce242c4 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -620,7 +620,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, } } -int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp) +int erts_encode_dist_ext_size_int(Eterm term, ErtsDSigSendContext *ctx, Uint* szp) { Uint sz; if (encode_size_struct_int(&ctx->u.sc, ctx->acmp, term, ctx->flags, &ctx->reds, &sz)) { -- cgit v1.2.3