diff options
Diffstat (limited to 'erts/emulator/beam')
37 files changed, 1947 insertions, 944 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 42a6991c6e..cef633bd93 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -108,6 +108,7 @@ atom asynchronous atom atom atom atom_used atom attributes +atom auto_connect atom await_microstate_accounting_modifications atom await_port_send_result atom await_proc_exit @@ -198,9 +199,7 @@ atom debug_flags atom decimals atom default atom delay_trap -atom dexit atom depth -atom dgroup_leader atom dictionary atom dirty_bif_exception atom dirty_bif_result @@ -221,7 +220,6 @@ atom dist_ctrl_put_data atom dist_data atom Div='/' atom div -atom dlink atom dmonitor_node atom dmonitor_p atom DollarDollar='$$' @@ -230,9 +228,7 @@ atom dollar_endonly atom dotall atom driver atom driver_options -atom dsend atom dsend_continue_trap -atom dunlink atom duplicate_bag atom duplicated atom dupnames diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 00822d1415..87367b44ab 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -768,36 +768,45 @@ BIF_RETTYPE finish_after_on_load_2(BIF_ALIST_2) ErtsCodeIndex code_ix; Module* modp; + if (BIF_ARG_2 != am_false && BIF_ARG_2 != am_true) { + BIF_ERROR(BIF_P, BADARG); + } + if (!erts_try_seize_code_write_permission(BIF_P)) { ERTS_BIF_YIELD2(bif_export[BIF_finish_after_on_load_2], BIF_P, BIF_ARG_1, BIF_ARG_2); } - /* ToDo: Use code_ix staging instead of thread blocking */ - - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - erts_thr_progress_block(); - code_ix = erts_active_code_ix(); modp = erts_get_module(BIF_ARG_1, code_ix); - if (!modp || !modp->on_load || !modp->on_load->code_hdr) { - error: - erts_thr_progress_unblock(); - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + if (!modp || !modp->on_load || !modp->on_load->code_hdr + || !modp->on_load->code_hdr->on_load_function_ptr) { + erts_release_code_write_permission(); BIF_ERROR(BIF_P, BADARG); } - if (modp->on_load->code_hdr->on_load_function_ptr == NULL) { - goto error; - } - if (BIF_ARG_2 != am_false && BIF_ARG_2 != am_true) { - goto error; - } if (BIF_ARG_2 == am_true) { + struct m mods[1]; + int is_blocking = 0; int i, num_exps; + erts_start_staging_code_ix(0); + code_ix = erts_staging_code_ix(); + modp = erts_get_module(BIF_ARG_1, code_ix); + + ASSERT(modp && modp->on_load && modp->on_load->code_hdr + && modp->on_load->code_hdr->on_load_function_ptr); + + if (erts_is_default_trace_enabled() + || IF_HIPE(hipe_need_blocking(modp))) { + + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); + is_blocking = 1; + } + /* * Make the code with the on_load function current. */ @@ -831,11 +840,13 @@ BIF_RETTYPE finish_after_on_load_2(BIF_ALIST_2) } } modp->curr.code_hdr->on_load_function_ptr = NULL; - set_default_trace_pattern(BIF_ARG_1); - #ifdef HIPE - hipe_redirect_to_module(modp); - #endif - } else if (BIF_ARG_2 == am_false) { + + mods[0].modp = modp; + mods[0].module = BIF_ARG_1; + mods[0].exception = THE_NON_VALUE; + return staging_epilogue(BIF_P, 1, am_true, is_blocking, mods, 1, 0); + } + else if (BIF_ARG_2 == am_false) { int i, num_exps; /* @@ -855,8 +866,6 @@ BIF_RETTYPE finish_after_on_load_2(BIF_ALIST_2) ep->beam[1] = 0; } } - erts_thr_progress_unblock(); - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); erts_release_code_write_permission(); BIF_RET(am_true); } diff --git a/erts/emulator/beam/beam_bp.c b/erts/emulator/beam/beam_bp.c index fe1e15701b..0832b3f374 100644 --- a/erts/emulator/beam/beam_bp.c +++ b/erts/emulator/beam/beam_bp.c @@ -998,7 +998,9 @@ do_call_trace(Process* c_p, ErtsCodeInfo* info, Eterm* reg, fixup_cp_before_trace(c_p, &return_to_trace); + ERTS_UNREQ_PROC_MAIN_LOCK(c_p); flags = erts_call_trace(c_p, info, ms, reg, local, &tracer); + ERTS_REQ_PROC_MAIN_LOCK(c_p); /* restore cp after potential fixup */ c_p->cp = cp_save; diff --git a/erts/emulator/beam/beam_debug.c b/erts/emulator/beam/beam_debug.c index 70078c8c59..509aa2a84f 100644 --- a/erts/emulator/beam/beam_debug.c +++ b/erts/emulator/beam/beam_debug.c @@ -408,7 +408,10 @@ print_op(fmtfn_t to, void *to_arg, int op, int size, BeamInstr* addr) addr++; ap = addr; } else { - BeamInstr instr_word = addr++[0]; +#if defined(ARCH_64) && defined(CODE_MODEL_SMALL) + BeamInstr instr_word = addr[0]; +#endif + addr++; /* * Copy all arguments to a local buffer for the unpacking. diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index aa94fbf536..ef9abcde08 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -400,12 +400,13 @@ static BeamInstr* apply_fun(Process* p, Eterm fun, Eterm args, Eterm* reg) NOINLINE; static Eterm new_fun(Process* p, Eterm* reg, ErlFunEntry* fe, int num_free) NOINLINE; -static Eterm new_map(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* ptr) NOINLINE; -static Eterm new_small_map_lit(Process* p, Eterm* reg, Eterm keys_literal, +static Eterm erts_gc_new_map(Process* p, Eterm* reg, Uint live, + Uint n, BeamInstr* ptr) NOINLINE; +static Eterm erts_gc_new_small_map_lit(Process* p, Eterm* reg, Eterm keys_literal, Uint live, BeamInstr* ptr) NOINLINE; -static Eterm update_map_assoc(Process* p, Eterm* reg, Uint live, +static Eterm erts_gc_update_map_assoc(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* new_p) NOINLINE; -static Eterm update_map_exact(Process* p, Eterm* reg, Uint live, +static Eterm erts_gc_update_map_exact(Process* p, Eterm* reg, Uint live, Uint n, Eterm* new_p) NOINLINE; static Eterm get_map_element(Eterm map, Eterm key); static Eterm get_map_element_hash(Eterm map, Eterm key, Uint32 hx); @@ -1453,6 +1454,7 @@ handle_error(Process* c_p, BeamInstr* pc, Eterm* reg, ErtsCodeMFA *bif_mfa) reg[3] = c_p->ftrace; if ((new_pc = next_catch(c_p, reg))) { c_p->cp = 0; /* To avoid keeping stale references. */ + c_p->msg.saved_last = 0; /* No longer safe to use this position */ return new_pc; } if (c_p->catches > 0) erts_exit(ERTS_ERROR_EXIT, "Catch not found"); @@ -2755,7 +2757,7 @@ do { \ static Eterm -new_map(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* ptr) +erts_gc_new_map(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* ptr) { Uint i; Uint need = n + 1 /* hdr */ + 1 /*size*/ + 1 /* ptr */ + 1 /* arity */; @@ -2812,7 +2814,8 @@ new_map(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* ptr) } static Eterm -new_small_map_lit(Process* p, Eterm* reg, Eterm keys_literal, Uint live, BeamInstr* ptr) +erts_gc_new_small_map_lit(Process* p, Eterm* reg, Eterm keys_literal, + Uint live, BeamInstr* ptr) { Eterm* keys = tuple_val(keys_literal); Uint n = arityval(*keys); @@ -2846,7 +2849,8 @@ new_small_map_lit(Process* p, Eterm* reg, Eterm keys_literal, Uint live, BeamIns } static Eterm -update_map_assoc(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* new_p) +erts_gc_update_map_assoc(Process* p, Eterm* reg, Uint live, + Uint n, BeamInstr* new_p) { Uint num_old; Uint num_updates; @@ -2892,7 +2896,7 @@ update_map_assoc(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* new_p) */ if (num_old == 0) { - return new_map(p, reg, live, n, new_p); + return erts_gc_new_map(p, reg, live, n, new_p); } /* @@ -3048,7 +3052,7 @@ update_map_assoc(Process* p, Eterm* reg, Uint live, Uint n, BeamInstr* new_p) */ static Eterm -update_map_exact(Process* p, Eterm* reg, Uint live, Uint n, Eterm* new_p) +erts_gc_update_map_exact(Process* p, Eterm* reg, Uint live, Uint n, Eterm* new_p) { Uint i; Uint num_old; @@ -3199,13 +3203,16 @@ erts_is_builtin(Eterm Mod, Eterm Name, int arity) Export e; Export* ep; - if (Mod == am_erlang && Name == am_apply && arity == 3) { - /* - * Special case. apply/3 is built-in (implemented in C), - * but implemented in a different way than all other - * BIFs. - */ - return 1; + if (Mod == am_erlang) { + /* + * Special case for built-in functions that are implemented + * as instructions as opposed to SNIFs. + */ + if (Name == am_apply && (arity == 2 || arity == 3)) { + return 1; + } else if (Name == am_yield && arity == 0) { + return 1; + } } e.info.mfa.module = Mod; diff --git a/erts/emulator/beam/beam_load.c b/erts/emulator/beam/beam_load.c index 00dd28b26c..4fcfea527c 100644 --- a/erts/emulator/beam/beam_load.c +++ b/erts/emulator/beam/beam_load.c @@ -806,11 +806,6 @@ erts_finish_loading(Binary* magic, Process* c_p, struct erl_module_instance* inst_p; Uint size; - /* - * No other process may run since we will update the export - * table which is not protected by any locks. - */ - ERTS_LC_ASSERT(erts_initialized == 0 || erts_has_code_write_permission() || erts_thr_progress_is_blocking()); /* @@ -2389,8 +2384,18 @@ load_code(LoaderState* stp) code[ci++] = NIL; break; case TAG_q: - new_literal_patch(stp, ci); - code[ci++] = tmp_op->a[arg].val; + { + BeamInstr val = tmp_op->a[arg].val; + Eterm term = stp->literals[val].term; + new_literal_patch(stp, ci); + code[ci++] = val; + switch (loader_tag(term)) { + case LOADER_X_REG: + case LOADER_Y_REG: + LoadError1(stp, "the term '%T' would be confused " + "with a register", term); + } + } break; default: LoadError1(stp, "bad tag %d for general source", @@ -5027,7 +5032,7 @@ freeze_code(LoaderState* stp) */ codev[pos] = (BeamInstr) (codev + value); } else { -#ifdef DEBUG +#if defined(DEBUG) && defined(BEAM_WIDE_MASK) Uint w; #endif Sint32 rel = lp->offset + value; diff --git a/erts/emulator/beam/beam_ranges.c b/erts/emulator/beam/beam_ranges.c index 6e373a3480..01bda7f3c1 100644 --- a/erts/emulator/beam/beam_ranges.c +++ b/erts/emulator/beam/beam_ranges.c @@ -32,6 +32,15 @@ typedef struct { erts_atomic_t end; /* (BeamInstr*) Points one word beyond last function in module. */ } Range; +/* + * Used for crash dumping of literals. The size of erts_dump_lit_areas is + * always twice the number of active ranges (to allow for literals in both + * current and old code). + */ + +ErtsLiteralArea** erts_dump_lit_areas; +Uint erts_dump_num_lit_areas; + /* Range 'end' needs to be atomic as we purge module by setting end=start in active code_ix */ #define RANGE_END(R) ((BeamInstr*)erts_atomic_read_nob(&(R)->end)) @@ -97,6 +106,11 @@ erts_init_ranges(void) r[i].allocated = 0; erts_atomic_init_nob(&r[i].mid, 0); } + + erts_dump_num_lit_areas = 8; + erts_dump_lit_areas = (ErtsLiteralArea **) + erts_alloc(ERTS_ALC_T_CRASH_DUMP, + erts_dump_num_lit_areas * sizeof(ErtsLiteralArea*)); } void @@ -164,6 +178,14 @@ erts_end_staging_ranges(int commit) erts_atomic_set_nob(&r[dst].mid, (erts_aint_t) (r[dst].modules + r[dst].n / 2)); + + if (r[dst].allocated * 2 > erts_dump_num_lit_areas) { + erts_dump_num_lit_areas *= 2; + erts_dump_lit_areas = (ErtsLiteralArea **) + erts_realloc(ERTS_ALC_T_CRASH_DUMP, + (void *) erts_dump_lit_areas, + erts_dump_num_lit_areas * sizeof(ErtsLiteralArea*)); + } } } diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index ad555bb195..50699eac31 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,17 +227,40 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, dep, BIF_P, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dlink trap handle it */ - case ERTS_DSIG_PREP_NOT_CONNECTED: - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - BIF_TRAP1(dlink_trap, BIF_P, BIF_ARG_1); - + case ERTS_DSIG_PREP_NOT_CONNECTED: { + ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK; + erts_aint32_t state; + erts_proc_lock(BIF_P, (ERTS_PROC_LOCKS_ALL & ~locks)); + locks = ERTS_PROC_LOCKS_ALL; + erts_send_exit_signal(BIF_P, BIF_ARG_1, BIF_P, &locks, + am_noconnection, NIL, NULL, 0); + erts_proc_unlock(BIF_P, locks & ERTS_PROC_LOCKS_ALL_MINOR); + + /* + * Copy-paste from old dist_exit_3, not sure if we really + * need erts_handle_pending_exit when exit_2 does not. + */ + state = erts_atomic32_read_acqb(&BIF_P->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); +#endif + ERTS_BIF_EXITED(BIF_P); + } + BIF_RET(am_true); + } + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - /* We are connected. Setup link and send link signal */ - + /* + * We have (pending) connection. + * Setup link and enqueue link signal. + */ erts_de_links_lock(dep); erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1); @@ -256,8 +279,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } } @@ -292,7 +314,8 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -313,6 +336,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) res = am_true; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_de_links_lock(dep); @@ -347,8 +371,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) } break; default: - ASSERT(! "Invalid dsig prepare result"); - return am_internal_error; + ERTS_ASSERT(! "Invalid dsig prepare result"); } @@ -767,23 +790,20 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, BIF_RETTYPE ret; int code; + ASSERT(dep); erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, dep, + p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dmonitor_p trap handle it */ case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - if (!(dep->flags & DFLAG_DIST_MONITOR) - || (byname && !(dep->flags & DFLAG_DIST_MONITOR_NAME))) { - erts_de_runlock(dep); - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - ERTS_BIF_PREP_ERROR(ret, p, BADARG); - } - else { + { Eterm p_trgt, p_name, d_name, mon_ref; mon_ref = erts_make_ref(p); @@ -818,9 +838,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, } break; default: - ASSERT(! "Invalid dsig prepare result"); - ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR); - break; + ERTS_ASSERT(! "Invalid dsig prepare result"); } BIF_RET(ret); @@ -888,17 +906,17 @@ local_port: if (!erts_is_alive && remote_node != am_Noname) { goto badarg; /* Remote monitor from (this) undistributed node */ } - dep = erts_sysname_to_connected_dist_entry(remote_node); + dep = erts_find_or_insert_dist_entry(remote_node); if (dep == erts_this_dist_entry) { ret = local_name_monitor(BIF_P, BIF_ARG_1, name); } else { ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } + erts_deref_dist_entry(dep); } else { badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } - return ret; } @@ -1158,21 +1176,14 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: -#if 1 BIF_RET(am_true); -#else - /* - * This is how we used to do it, but the link is obviously not - * active, so I see no point in setting up a connection. - * /Rickard - */ - BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); -#endif + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep); code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); @@ -1545,22 +1556,24 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) DistEntry *dep; dep = external_pid_dist_entry(BIF_ARG_1); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_not_internal_pid(BIF_ARG_1)) { @@ -1964,7 +1977,7 @@ ebif_bang_2(BIF_ALIST_2) * Send a message to Process, Port or Registered Process. * Returns non-negative reduction bump or negative result code. */ -#define SEND_TRAP (-1) +#define SEND_NOCONNECT (-1) #define SEND_YIELD (-2) #define SEND_YIELD_RETURN (-3) #define SEND_BADARG (-4) @@ -1980,20 +1993,22 @@ static Sint remote_send(Process *p, DistEntry *dep, { Sint res; int code; - ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend); + code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, + !ctx->suspend, ctx->connect); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - res = SEND_TRAP; + res = SEND_NOCONNECT; break; case ERTS_DSIG_PREP_WOULD_SUSPEND: ASSERT(!ctx->suspend); res = SEND_YIELD; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { if (is_atom(to)) @@ -2170,6 +2185,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return ret_val; } else if (is_tuple(to)) { /* Remote send */ + int deref_dep = 0; int ret; tp = tuple_val(to); if (*tp != make_arityval(2)) @@ -2177,11 +2193,10 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) if (is_not_atom(tp[1]) || is_not_atom(tp[2])) return SEND_BADARG; - /* sysname_to_connected_dist_entry will return NULL if there - is no dist_entry or the dist_entry has no port, + /* erts_find_dist_entry will return NULL if there is no dist_entry but remote_send() will handle that. */ - dep = erts_sysname_to_connected_dist_entry(tp[2]); + dep = erts_find_dist_entry(tp[2]); if (dep == erts_this_dist_entry) { Eterm id; @@ -2205,13 +2220,20 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + if (dep == NULL) { + dep = erts_find_or_insert_dist_entry(tp[2]); + ASSERT(dep != erts_this_dist_entry); + deref_dep = 1; + } + ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) - erts_ref_dist_entry(dep); - ctx->dep_to_deref = dep; + erts_ref_dist_entry(ctx->dep); + ctx->deref_dep = 1; } + if (deref_dep) + erts_deref_dist_entry(dep); return ret; } else { if (IS_TRACED_FL(p, F_TRACE_SEND)) @@ -2251,7 +2273,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3) Eterm msg = BIF_ARG_2; Eterm opts = BIF_ARG_3; - int connect = !0; Eterm l = opts; Sint result; @@ -2262,14 +2283,15 @@ BIF_RETTYPE send_3(BIF_ALIST_3) UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = am_ok; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { - connect = 0; + ctx->connect = 0; } else if (CAR(list_val(l)) == am_nosuspend) { ctx->suspend = 0; } else { @@ -2306,9 +2328,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3) goto yield_return; ERTS_BIF_PREP_RET(retval, am_ok); break; - case SEND_TRAP: - if (connect) { - ERTS_BIF_PREP_TRAP3(retval, dsend3_trap, p, to, msg, opts); + case SEND_NOCONNECT: + if (ctx->connect) { + ERTS_BIF_PREP_RET(retval, am_ok); } else { ERTS_BIF_PREP_RET(retval, am_noconnect); } @@ -2412,7 +2434,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) ref = NIL; #endif ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = msg; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; @@ -2436,8 +2459,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) goto yield_return; ERTS_BIF_PREP_RET(retval, msg); break; - case SEND_TRAP: - ERTS_BIF_PREP_TRAP2(retval, dsend2_trap, p, to, msg); + case SEND_NOCONNECT: + ERTS_BIF_PREP_RET(retval, msg); break; case SEND_YIELD: ERTS_BIF_PREP_YIELD2(retval, bif_export[BIF_send_2], p, to, msg); @@ -4200,7 +4223,7 @@ BIF_RETTYPE list_to_port_1(BIF_ALIST_1) cp += 6; /* strlen("#Port<") */ - if (sscanf(cp, "%u.%u>", &n, &p) < 2) + if (sscanf(cp, "%u.%u>", (unsigned int*)&n, (unsigned int*)&p) < 2) goto bad; if (p > ERTS_MAX_PORT_NUMBER) @@ -4384,23 +4407,24 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) int code; ErtsDSigData dsd; dep = external_pid_dist_entry(BIF_ARG_2); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_internal_pid(BIF_ARG_2)) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 5d5ea3cdc4..1bd4acbf95 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -153,14 +153,12 @@ bif erlang:whereis/1 bif erlang:spawn_opt/1 bif erlang:setnode/2 bif erlang:setnode/3 -bif erlang:dist_exit/3 bif erlang:dist_get_stat/1 bif erlang:dist_ctrl_input_handler/2 bif erlang:dist_ctrl_put_data/2 bif erlang:dist_ctrl_get_data/1 bif erlang:dist_ctrl_get_data_notification/1 - # Static native functions in erts_internal bif erts_internal:port_info/1 bif erts_internal:port_info/2 @@ -693,4 +691,6 @@ bif erlang:iolist_to_iovec/1 # New in 21.0 # +bif erts_internal:new_connection/1 +bif erts_internal:abort_connection/2 bif erts_internal:map_next/3 diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index 35b2365655..d9ee940662 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -58,6 +58,8 @@ static void dump_attributes(fmtfn_t to, void *to_arg, byte* ptr, int size); extern char* erts_system_version[]; +#define WRITE_BUFFER_SIZE (64*1024) + static void port_info(fmtfn_t to, void *to_arg) { @@ -673,18 +675,28 @@ bin_check(void) static Sint64 crash_dump_limit = ERTS_SINT64_MAX; static Sint64 crash_dump_written = 0; -static int crash_dump_limited_writer(void* vfdp, char* buf, size_t len) +typedef struct LimitedWriterInfo_ { + fmtfn_t to; + void* to_arg; +} LimitedWriterInfo; + +static int +crash_dump_limited_writer(void* vfdp, char* buf, size_t len) { const char stop_msg[] = "\n=abort:CRASH DUMP SIZE LIMIT REACHED\n"; + LimitedWriterInfo* lwi = (LimitedWriterInfo *) vfdp; crash_dump_written += len; if (crash_dump_written <= crash_dump_limit) { - return erts_write_fd(vfdp, buf, len); + return lwi->to(lwi->to_arg, buf, len); } len -= (crash_dump_written - crash_dump_limit); - erts_write_fd(vfdp, buf, len); - erts_write_fd(vfdp, (char*)stop_msg, sizeof(stop_msg)-1); + lwi->to(lwi->to_arg, buf, len); + lwi->to(lwi->to_arg, (char*)stop_msg, sizeof(stop_msg)-1); + if (lwi->to == &erts_write_fp) { + fclose((FILE *) lwi->to_arg); + } /* We assume that crash dump was called from erts_exit_vv() */ erts_exit_epilogue(); @@ -707,6 +719,9 @@ erl_crash_dump_v(char *file, int line, char* fmt, va_list args) int i; fmtfn_t to = &erts_write_fd; void* to_arg; + FILE* fp = 0; + LimitedWriterInfo lwi; + static char* write_buffer; /* 'static' to avoid a leak warning in valgrind */ if (ERTS_SOMEONE_IS_CRASH_DUMPING) return; @@ -810,9 +825,30 @@ erl_crash_dump_v(char *file, int line, char* fmt, va_list args) fd = open(dumpname,O_WRONLY | O_CREAT | O_TRUNC,0640); if (fd < 0) return; /* Can't create the crash dump, skip it */ - to_arg = (void*)&fd; + + /* + * Wrap into a FILE* so that we can use buffered output. Set an + * explicit buffer to make sure the first write does not fail because + * of a failure to allocate a buffer. + */ + write_buffer = (char *) erts_alloc_fnf(ERTS_ALC_T_TMP, WRITE_BUFFER_SIZE); + if (write_buffer && (fp = fdopen(fd, "w")) != NULL) { + setvbuf(fp, write_buffer, _IOFBF, WRITE_BUFFER_SIZE); + lwi.to = &erts_write_fp; + lwi.to_arg = (void*)fp; + } else { + lwi.to = &erts_write_fd; + lwi.to_arg = (void*)&fd; + } + if (to == &crash_dump_limited_writer) { + to_arg = (void *) &lwi; + } else { + to = lwi.to; + to_arg = lwi.to_arg; + } + time(&now); - erts_cbprintf(to, to_arg, "=erl_crash_dump:0.3\n%s", ctime(&now)); + erts_cbprintf(to, to_arg, "=erl_crash_dump:0.4\n%s", ctime(&now)); if (file != NULL) erts_cbprintf(to, to_arg, "The error occurred in file %s, line %d\n", file, line); @@ -916,6 +952,9 @@ erl_crash_dump_v(char *file, int line, char* fmt, va_list args) } erts_cbprintf(to, to_arg, "=end\n"); + if (fp) { + fclose(fp); + } close(fd); erts_fprintf(stderr,"done\n"); } diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..8593ad867a 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,21 +103,12 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -129,6 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -383,22 +375,24 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) if (!rp) goto done; erts_proc_lock(rp, rp_locks); - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); - if (rlnk != NULL) { - ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); - erts_destroy_link(rlnk); - } - n = ERTS_LINK_REFC(lnk); - for (i = 0; i < n; ++i) { - Eterm tup; - Eterm *hp; - ErtsMessage *msgp; - - msgp = erts_alloc_message_heap(rp, &rp_locks, - 3, &hp, &ohp); - tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, rp_locks, msgp, tup, am_system); - } + if (!ERTS_PROC_IS_EXITING(rp)) { + rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); + if (rlnk != NULL) { + ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); + erts_destroy_link(rlnk); + } + n = ERTS_LINK_REFC(lnk); + for (i = 0; i < n; ++i) { + Eterm tup; + Eterm *hp; + ErtsMessage *msgp; + + msgp = erts_alloc_message_heap(rp, &rp_locks, + 3, &hp, &ohp); + tup = TUPLE2(hp, am_nodedown, name); + erts_queue_message(rp, rp_locks, msgp, tup, am_system); + } + } erts_proc_unlock(rp, rp_locks); } done: @@ -484,41 +478,55 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_ctrl = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_ctrl == 0) - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_ctrl; - - if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_ctrl = &def_buf[0]; - else - dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_ctrl); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + erts_ref_dist_entry(tdep); + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_ctrl; i++) { + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { @@ -532,11 +540,18 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt, dist_ctrl[i], nd_reason, NULL); } } - } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } + + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_ctrl != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_ctrl); - } /* * When last dist ctrl exits, node will be taken @@ -613,7 +628,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -638,14 +652,7 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -664,6 +671,7 @@ alloc_dist_obuf(Uint size) obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -684,31 +692,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +716,13 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +734,56 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +799,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -1324,7 +1351,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1443,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1879,33 +1906,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1946,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,7 +1976,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -1961,9 +1988,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) + || dep->status & ERTS_DE_SFLG_EXITING + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2064,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2203,7 +2235,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2212,9 +2243,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; @@ -2236,9 +2267,12 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(send); /* @@ -2263,7 +2297,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2278,13 +2312,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2297,81 +2331,71 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds >= 0) { + last_finalized = ob; + ob = ob->next; + } } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2411,7 +2435,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2434,8 +2458,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2445,10 +2468,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2456,8 +2479,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2481,12 +2503,6 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -#ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); - erts_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { @@ -2776,7 +2792,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2807,6 +2824,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2824,21 +2842,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2866,11 +2881,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void @@ -2892,24 +2907,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + + dep->status |= ERTS_DE_SFLG_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->status == ERTS_DE_SFLG_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3069,6 +3091,10 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -3091,7 +3117,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -3101,15 +3126,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3133,15 +3149,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + if (dmonitor_node_trap->addressv[0] == NULL || + dmonitor_p_trap->addressv[0] == NULL) { goto error; } @@ -3218,6 +3227,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,21 +3256,25 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; } /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + + /* * Arguments seem to be in order. */ @@ -3302,6 +3317,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3344,8 +3376,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3376,7 +3412,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3421,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,79 +3482,168 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + Eterm* hp; + Eterm dhandle; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) + erts_de_rwlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + conn_id = dep->connection_id; + else if (dep->status == 0) { + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + } + else { + ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + } + erts_de_rwunlock(dep); + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + erts_deref_dist_entry(dep); + BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); +} + +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Eterm local; - Eterm remote; - DistEntry *rdep; + erts_de_rwlock(dep); - local = BIF_ARG_1; - remote = BIF_ARG_3; + if (dep->connection_id != conn_id) + ; + else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + kill_connection(dep); + } + else if (dep->status == ERTS_DE_SFLG_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; + cache = dep->cache; + dep->cache = NULL; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_EXITED(BIF_P); - } - } + erts_set_dist_entry_not_connected(dep); + + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + free_de_out_queues(dep, obuf); + return reds; } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ + erts_de_rwunlock(dep); + return 0; +} + +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) +{ + DistEntry* dep; + Eterm* tp; + + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { + BIF_ERROR(BIF_P, BADARG); + } + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { + BIF_ERROR(BIF_P, BADARG); + } + + if (dep) { + Sint reds = abort_connection(dep, unsigned_val(tp[1])); + BUMP_REDS(BIF_P, reds); } - else - goto error; BIF_RET(am_true); +} - error: - BIF_ERROR(BIF_P, BADARG); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->status != 0) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + abort_connection(dep, conn_id); + return 0; + } + + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), + dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ @@ -3574,9 +3719,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3596,13 +3743,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); @@ -3647,11 +3799,17 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) DistEntry *dep; ErtsLink *lnk; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } @@ -3665,50 +3823,90 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); - } - if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto do_trap; - } - erts_de_links_lock(dep); - erts_de_runlock(dep); if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + ErtsDSigData dsd; + dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + + switch (erts_dsig_prepare(&dsd, dep, p, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + trap: + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_deref_dist_entry(dep); + BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + p->common.id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); + ++ERTS_LINK_REFC(lnk); + erts_de_links_unlock(dep); + break; + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + erts_deref_dist_entry(dep); + } + else { /* Bool == false */ + dep = erts_sysname_to_connected_dist_entry(Node); + if (!dep) { + /* + * Before OTP-21 this case triggered auto-connect + * and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + BIF_RET(am_true); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); + goto done; + } + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_lookup_link(dep->node_links, p->common.id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + p->common.id)); + } + } + lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), + Node)); + } + } + erts_de_links_unlock(dep); } - erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index d4765c50b8..a96e39be89 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,6 +45,18 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 +#define DFLAG_NO_MAGIC 0x100000 + +/* Mandatory flags for distribution (sync with dist_util.erl) */ +#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ + | DFLAG_EXTENDED_PIDS_PORTS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_NEW_FUN_TAGS) + +/* Additional optimistic flags when encoding toward pending connection */ +#define DFLAG_DIST_HOPEFULLY (DFLAG_NO_MAGIC \ + | DFLAG_EXPORT_PTR_TAG \ + | DFLAG_BIT_BINARIES) /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -79,25 +91,19 @@ #define DOP_SEND_SENDER_TT 23 /* distribution trap functions */ -extern Export* dsend2_trap; -extern Export* dsend3_trap; -extern Export* dlink_trap; -extern Export* dunlink_trap; extern Export* dmonitor_node_trap; -extern Export* dgroup_leader_trap; -extern Export* dexit_trap; extern Export* dmonitor_p_trap; typedef enum { ERTS_DSP_NO_LOCK, - ERTS_DSP_RLOCK, - ERTS_DSP_RWLOCK + ERTS_DSP_RLOCK } ErtsDSigPrepLock; typedef struct { Process *proc; DistEntry *dep; + Eterm node; /* used if dep == NULL */ Eterm cid; Eterm connection_id; int no_suspend; @@ -117,14 +123,10 @@ extern int erts_is_alive; /* * erts_dsig_prepare() prepares a send of a distributed signal. - * One of the values defined below are returned. If the returned - * value is another than ERTS_DSIG_PREP_CONNECTED, the - * distributed signal cannot be sent before appropriate actions - * have been taken. Appropriate actions would typically be setting - * up the connection. + * One of the values defined below are returned. */ -/* Connected; signal can be sent. */ +/* Connected; signals can be enqueued and sent. */ #define ERTS_DSIG_PREP_CONNECTED 0 /* Not connected; connection needs to be set up. */ #define ERTS_DSIG_PREP_NOT_CONNECTED 1 @@ -132,43 +134,80 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 /* System not alive (distributed) */ #define ERTS_DSIG_PREP_NOT_ALIVE 3 +/* Pending connection; signals can be enqueued */ +#define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry *, + DistEntry*, Process *, + ErtsProcLocks, ErtsDSigPrepLock, + int, int); ERTS_GLB_INLINE void erts_schedule_dist_command(Port *, DistEntry *); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); + #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, DistEntry *dep, Process *proc, + ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, - int no_suspend) + int no_suspend, + int connect) { - int failure; + int res; + if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) - return ERTS_DSIG_PREP_NOT_CONNECTED; - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwlock(dep); - else - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - failure = ERTS_DSIG_PREP_NOT_CONNECTED; + if (!dep) { + ASSERT(!connect); + return ERTS_DSIG_PREP_NOT_CONNECTED; + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: + erts_de_rlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep)) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->status & ERTS_DE_SFLG_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->status & ERTS_DE_SFLG_EXITING) { + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + else if (connect) { + ASSERT(dep->status == 0); + erts_de_runlock(dep); + if (!erts_auto_connect(dep, proc, proc_locks)) { + return ERTS_DSIG_PREP_NOT_ALIVE; + } + goto retry; + } + else { + ASSERT(dep->status == 0); + res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } + if (no_suspend) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - failure = ERTS_DSIG_PREP_WOULD_SUSPEND; + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; goto fail; - } + } } dsdp->proc = proc; dsdp->dep = dep; @@ -177,15 +216,11 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - return ERTS_DSIG_PREP_CONNECTED; + return res; fail: - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwunlock(dep); - else - erts_de_runlock(dep); - return failure; - + erts_de_runlock(dep); + return res; } ERTS_GLB_INLINE @@ -332,7 +367,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; - Uint32 pass_through_size; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; @@ -346,11 +381,12 @@ struct erts_dsig_send_context { typedef struct { int suspend; + int connect; Eterm ctl_heap[6]; ErtsDSigData dsd; - DistEntry* dep_to_deref; DistEntry *dep; + int deref_dep; struct erts_dsig_send_context dss; Eterm return_term; diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 878b971b07..c6cc5c78b3 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -260,6 +260,8 @@ type MREF_TAB LONG_LIVED SYSTEM magic_ref_table type MINDIRECTION FIXED_SIZE SYSTEM magic_indirection type BINARY_FIND SHORT_LIVED PROCESSES binary_find type OPEN_PORT_ENV TEMPORARY SYSTEM open_port_env +type CRASH_DUMP STANDARD SYSTEM crash_dump +type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 27bbf70c0b..9d05680723 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3830,10 +3830,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_RET(res); } } - else if (ERTS_IS_ATOM_STR("term_to_binary_no_funs", tp[1])) { - Uint dflags = (DFLAG_EXTENDED_REFERENCES | - DFLAG_EXTENDED_PIDS_PORTS | - DFLAG_BIT_BINARIES); + else if (ERTS_IS_ATOM_STR("term_to_binary_tuple_fallbacks", tp[1])) { + Uint dflags = (TERM_TO_BINARY_DFLAGS + & ~DFLAG_EXPORT_PTR_TAG + & ~DFLAG_BIT_BINARIES); BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags)); } else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) { diff --git a/erts/emulator/beam/erl_cpu_topology.c b/erts/emulator/beam/erl_cpu_topology.c index 49f9beb19f..6f8d2f8c35 100644 --- a/erts/emulator/beam/erl_cpu_topology.c +++ b/erts/emulator/beam/erl_cpu_topology.c @@ -602,7 +602,7 @@ write_schedulers_bind_change(erts_cpu_topology_t *cpudata, int size) cpu_bind_order_sort(cpudata, size, cpu_bind_order, 1); - for (cpu_ix = 0; cpu_ix < size && cpu_ix < erts_no_schedulers; cpu_ix++) + for (cpu_ix = 0; cpu_ix < size && s_ix <= erts_no_schedulers; cpu_ix++) if (erts_is_cpu_available(cpuinfo, cpudata[cpu_ix].logical)) scheduler2cpu_map[s_ix++].bind_id = cpudata[cpu_ix].logical; } diff --git a/erts/emulator/beam/erl_io_queue.c b/erts/emulator/beam/erl_io_queue.c index 190ba6bbb9..40d69ea6b0 100644 --- a/erts/emulator/beam/erl_io_queue.c +++ b/erts/emulator/beam/erl_io_queue.c @@ -658,7 +658,7 @@ io_list_vec_count(Eterm obj, Uint *v_size, int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, Uint* pvsize, Uint* pcsize, - Uint* total_size, Uint blimit) + size_t* total_size, Uint blimit) { DECLARE_ESTACK(s); Eterm* objp; @@ -669,7 +669,7 @@ erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, Uint p_v_size = 0; Uint p_c_size = 0; Uint p_in_clist = 0; - Uint total; + size_t total; goto L_jump_start; /* avoid a push */ diff --git a/erts/emulator/beam/erl_io_queue.h b/erts/emulator/beam/erl_io_queue.h index 51abe99510..7d0fe6751c 100644 --- a/erts/emulator/beam/erl_io_queue.h +++ b/erts/emulator/beam/erl_io_queue.h @@ -103,7 +103,7 @@ Uint erts_ioq_sizeq(ErtsIOQueue *q); int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, Uint* pvsize, Uint* pcsize, - Uint* total_size, Uint blimit); + size_t* total_size, Uint blimit); int erts_ioq_iolist_to_vec(Eterm obj, SysIOVec* iov, ErtsIOQBinary** binv, ErtsIOQBinary* cbin, Uint bin_limit, int driver_binary); @@ -111,7 +111,7 @@ int erts_ioq_iolist_to_vec(Eterm obj, SysIOVec* iov, ERTS_GLB_INLINE int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize, Uint* pvsize, Uint* pcsize, - Uint* total_size, Uint blimit); + size_t* total_size, Uint blimit); ERTS_GLB_INLINE int erts_ioq_iodata_to_vec(Eterm obj, SysIOVec* iov, ErtsIOQBinary** binv, ErtsIOQBinary* cbin, @@ -123,7 +123,7 @@ int erts_ioq_iodata_to_vec(Eterm obj, SysIOVec* iov, ERTS_GLB_INLINE int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize, Uint* pvsize, Uint* pcsize, - Uint* total_size, Uint blimit) { + size_t* total_size, Uint blimit) { if (is_binary(obj)) { /* We optimize for when we get a procbin without a bit-offset * that fits in one iov slot diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 9c8cf84e43..a14f4f51d8 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -167,10 +167,9 @@ typedef struct { Sint len; /* queue length */ /* - * The following two fields are used by the recv_mark/1 and + * The following field is used by the recv_mark/1 and * recv_set/1 instructions. */ - BeamInstr* mark; /* address to rec_loop/2 instruction */ ErtsMessage** saved_last; /* saved last pointer */ } ErlMessageQueue; @@ -236,12 +235,17 @@ typedef struct erl_trace_message_queue__ { (p)->msg.len--; \ if (__mp == NULL) \ (p)->msg.last = (p)->msg.save; \ - (p)->msg.mark = 0; \ } while(0) -/* Reset message save point (after receive match) */ -#define JOIN_MESSAGE(p) \ - (p)->msg.save = &(p)->msg.first +/* + * Reset message save point (after receive match). + * Also invalidate the saved position since it may no + * longer be safe to use. + */ +#define JOIN_MESSAGE(p) do { \ + (p)->msg.save = &(p)->msg.first; \ + (p)->msg.saved_last = 0; \ +} while(0) /* Save current message */ #define SAVE_MESSAGE(p) \ diff --git a/erts/emulator/beam/erl_msacc.h b/erts/emulator/beam/erl_msacc.h index 2d4637f800..2588dec903 100644 --- a/erts/emulator/beam/erl_msacc.h +++ b/erts/emulator/beam/erl_msacc.h @@ -270,18 +270,32 @@ void erts_msacc_init_thread(char *type, int id, int liberty); #define ERTS_MSACC_PUSH_STATE_M() \ ERTS_MSACC_DECLARE_CACHE(); \ ERTS_MSACC_PUSH_STATE_CACHED_M() -#define ERTS_MSACC_PUSH_STATE_CACHED_M() \ - __erts_msacc_state = ERTS_MSACC_IS_ENABLED_CACHED() ? \ - erts_msacc_get_state_m__(__erts_msacc_cache) : ERTS_MSACC_STATE_OTHER +#define ERTS_MSACC_PUSH_STATE_CACHED_M() \ + do { \ + if (ERTS_MSACC_IS_ENABLED_CACHED()) { \ + ASSERT(!__erts_msacc_cache->unmanaged); \ + __erts_msacc_state = erts_msacc_get_state_m__(__erts_msacc_cache); \ + } else { \ + __erts_msacc_state = ERTS_MSACC_STATE_OTHER; \ + } \ + } while(0) #define ERTS_MSACC_SET_STATE_M(state) \ ERTS_MSACC_DECLARE_CACHE(); \ ERTS_MSACC_SET_STATE_CACHED_M(state) -#define ERTS_MSACC_SET_STATE_CACHED_M(state) \ - if (ERTS_MSACC_IS_ENABLED_CACHED()) \ - erts_msacc_set_state_m__(__erts_msacc_cache, state, 1) -#define ERTS_MSACC_POP_STATE_M() \ - if (ERTS_MSACC_IS_ENABLED_CACHED()) \ - erts_msacc_set_state_m__(__erts_msacc_cache, __erts_msacc_state, 0) +#define ERTS_MSACC_SET_STATE_CACHED_M(state) \ + do { \ + if (ERTS_MSACC_IS_ENABLED_CACHED()) { \ + ASSERT(!__erts_msacc_cache->unmanaged); \ + erts_msacc_set_state_m__(__erts_msacc_cache, state, 1); \ + } \ + } while(0) +#define ERTS_MSACC_POP_STATE_M() \ + do { \ + if (ERTS_MSACC_IS_ENABLED_CACHED()) { \ + ASSERT(!__erts_msacc_cache->unmanaged); \ + erts_msacc_set_state_m__(__erts_msacc_cache, __erts_msacc_state, 0); \ + } \ + } while(0) #define ERTS_MSACC_PUSH_AND_SET_STATE_M(state) \ ERTS_MSACC_PUSH_STATE_M(); ERTS_MSACC_SET_STATE_CACHED_M(state) diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index f7f12efe28..d1018bab26 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -1201,11 +1201,12 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, Sint size; ErtsHeapFactory factory; byte *bp = (byte*) data; + Uint32 flags = 0; - ERTS_CT_ASSERT(ERL_NIF_BIN2TERM_SAFE == ERTS_DIST_EXT_BTT_SAFE); - - if (opts & ~ERL_NIF_BIN2TERM_SAFE) { - return 0; + switch ((Uint32)opts) { + case 0: break; + case ERL_NIF_BIN2TERM_SAFE: flags = ERTS_DIST_EXT_BTT_SAFE; break; + default: return 0; } if ((size = erts_decode_ext_size(bp, data_sz)) < 0) return 0; @@ -1217,7 +1218,7 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, erts_factory_dummy_init(&factory); } - *term = erts_decode_ext(&factory, &bp, (Uint32)opts); + *term = erts_decode_ext(&factory, &bp, flags); if (is_non_value(*term)) { return 0; @@ -3323,36 +3324,38 @@ static int examine_iovec_term(Eterm list, UWord max_length, iovec_slice_t *resul size = binary_size(binary); binary_header = binary_val(binary); - /* If we're a sub-binary we'll need to check our underlying binary to - * determine whether we're on-heap or not. */ - if(thing_subtag(*binary_header) == SUB_BINARY_SUBTAG) { - ErlSubBin *sb = (ErlSubBin*)binary_header; + if (size > 0) { + /* If we're a sub-binary we'll need to check our underlying binary + * to determine whether we're on-heap or not. */ + if (thing_subtag(*binary_header) == SUB_BINARY_SUBTAG) { + ErlSubBin *sb = (ErlSubBin*)binary_header; - /* Reject bitstrings */ - if((sb->bitoffs + sb->bitsize) > 0) { - return 0; - } + /* Reject bitstrings */ + if((sb->bitoffs + sb->bitsize) > 0) { + return 0; + } - ASSERT(size <= binary_size(sb->orig)); - binary_header = binary_val(sb->orig); - } + ASSERT(size <= binary_size(sb->orig)); + binary_header = binary_val(sb->orig); + } - if(thing_subtag(*binary_header) == HEAP_BINARY_SUBTAG) { - ASSERT(size <= ERL_ONHEAP_BIN_LIMIT); + if (thing_subtag(*binary_header) == HEAP_BINARY_SUBTAG) { + ASSERT(size <= ERL_ONHEAP_BIN_LIMIT); - result->iovec_len += 1; - result->onheap_size += size; - } else { - ASSERT(thing_subtag(*binary_header) == REFC_BINARY_SUBTAG); + result->iovec_len += 1; + result->onheap_size += size; + } else { + ASSERT(thing_subtag(*binary_header) == REFC_BINARY_SUBTAG); - result->iovec_len += 1 + size / MAX_SYSIOVEC_IOVLEN; - result->offheap_size += size; + result->iovec_len += 1 + size / MAX_SYSIOVEC_IOVLEN; + result->offheap_size += size; + } } result->sublist_length += 1; lookahead = CDR(cell); - if(result->sublist_length >= max_length) { + if (result->sublist_length >= max_length) { break; } } @@ -3385,6 +3388,10 @@ static void inspect_raw_binary_data(Eterm binary, ErlNifBinary *result) { if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) { ProcBin *pb = (ProcBin*)parent_header; + if (pb->flags & (PB_IS_WRITABLE | PB_ACTIVE_WRITER)) { + erts_emasculate_writable_binary(pb); + } + ASSERT(pb->val != NULL); ASSERT(byte_offset < pb->size); ASSERT(&pb->bytes[byte_offset] >= (byte*)(pb->val)->orig_bytes); @@ -3428,7 +3435,7 @@ static int fill_iovec_with_slice(ErlNifEnv *env, /* If this isn't a refc binary, copy its contents to the onheap buffer * and reference that instead. */ - if (raw_data.ref_bin == NULL) { + if (raw_data.size > 0 && raw_data.ref_bin == NULL) { ASSERT(onheap_offset < onheap_data.size); ASSERT(slice->onheap_size > 0); @@ -3439,12 +3446,11 @@ static int fill_iovec_with_slice(ErlNifEnv *env, raw_data.ref_bin = onheap_data.ref_bin; } - ASSERT(raw_data.ref_bin != NULL); - while (raw_data.size > 0) { UWord chunk_len = MIN(raw_data.size, MAX_SYSIOVEC_IOVLEN); ASSERT(iovec_idx < iovec->iovcnt); + ASSERT(raw_data.ref_bin != NULL); iovec->iov[iovec_idx].iov_base = raw_data.data; iovec->iov[iovec_idx].iov_len = chunk_len; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 0f3dfa797c..eaf133f5c0 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -39,9 +39,11 @@ erts_rwmtx_t erts_node_table_rwmtx; DistEntry *erts_hidden_dist_entries; DistEntry *erts_visible_dist_entries; +DistEntry *erts_pending_dist_entries; DistEntry *erts_not_connected_dist_entries; /* including erts_this_dist_entry */ Sint erts_no_of_hidden_dist_entries; Sint erts_no_of_visible_dist_entries; +Sint erts_no_of_pending_dist_entries; Sint erts_no_of_not_connected_dist_entries; /* including erts_this_dist_entry */ DistEntry *erts_this_dist_entry; @@ -101,7 +103,9 @@ void erts_ref_dist_entry(DistEntry *dep) { ASSERT(dep); - de_refc_inc(dep, 1); + if (de_refc_inc_read(dep, 1) == 1) { + de_refc_inc(dep, 2); /* Pending delete */ + } } void @@ -139,9 +143,7 @@ dist_table_cmp(void *dep1, void *dep2) static void* dist_table_alloc(void *dep_tmpl) { -#ifdef DEBUG erts_aint_t refc; -#endif Eterm sysname; Binary *bin; DistEntry *dep; @@ -158,13 +160,8 @@ dist_table_alloc(void *dep_tmpl) dist_entries++; -#ifdef DEBUG - refc = -#else - (void) -#endif - de_refc_dec_read(dep, -1); - ASSERT(refc == -1); + refc = de_refc_dec_read(dep, -1); + ASSERT(refc == -1); (void)refc; dep->prev = NULL; erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname, @@ -202,6 +199,7 @@ dist_table_alloc(void *dep_tmpl) erts_port_task_handle_init(&dep->dist_cmd); dep->send = NULL; dep->cache = NULL; + dep->transcode_ctx = NULL; /* Link in */ @@ -224,6 +222,8 @@ dist_table_free(void *vdep) { DistEntry *dep = (DistEntry *) vdep; + ASSERT(de_refc_read(dep, -1) == -1); + ASSERT(dep->status == 0); ASSERT(is_nil(dep->cid)); ASSERT(dep->nlinks == NULL); ASSERT(dep->node_links == NULL); @@ -384,56 +384,92 @@ erts_dhandle_to_dist_entry(Eterm dhandle) } Eterm -erts_make_dhandle(Process *c_p, DistEntry *dep) +erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, DistEntry *dep) { - Binary *bin; - Eterm *hp; - - bin = ErtsDistEntry2Bin(dep); + Binary *bin = ErtsDistEntry2Bin(dep); ASSERT(bin); ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor); - hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); - return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); + return erts_mk_magic_ref(hpp, ohp, bin); +} + +Eterm +erts_make_dhandle(Process *c_p, DistEntry *dep) +{ + Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); + return erts_build_dhandle(&hp, &c_p->off_heap, dep); } -static void try_delete_dist_entry(void *vbin); +static void start_timer_delete_dist_entry(void *vdep); +static void prepare_try_delete_dist_entry(void *vdep); +static void try_delete_dist_entry(DistEntry*); + +static void schedule_delete_dist_entry(DistEntry* dep) +{ + /* + * Here we need thread progress to wait for other threads, that may have + * done lookup without refc++, to do either refc++ or drop their refs. + * + * Note that timeouts do not guarantee thread progress. + */ + erts_schedule_thr_prgr_later_op(start_timer_delete_dist_entry, + dep, &dep->later_op); +} static void -prepare_try_delete_dist_entry(void *vbin) +start_timer_delete_dist_entry(void *vdep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); - Uint size; + if (node_tab_delete_delay == 0) { + prepare_try_delete_dist_entry(vdep); + } + else { + ASSERT(node_tab_delete_delay > 0); + erts_start_timer_callback(node_tab_delete_delay, + prepare_try_delete_dist_entry, + vdep); + } +} + +static void +prepare_try_delete_dist_entry(void *vdep) +{ + DistEntry *dep = vdep; erts_aint_t refc; + /* + * Time has passed since we decremented refc to zero and DistEntry may + * have been revived. Do a fast check without table lock first. + */ refc = de_refc_read(dep, 0); - if (refc > 0) - return; - - size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry)); - erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry, - vbin, &dep->later_op, size); + if (refc == 0) { + try_delete_dist_entry(dep); + } + else { + /* + * Someone has done lookup and done refc++ for us. + */ + refc = de_refc_dec_read(dep, 0); + if (refc == 0) + schedule_delete_dist_entry(dep); + } } -static void try_delete_dist_entry(void *vbin) +static void try_delete_dist_entry(DistEntry* dep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); erts_aint_t refc; erts_rwmtx_rwlock(&erts_dist_table_rwmtx); /* * Another thread might have looked up this dist entry after * we decided to delete it (refc became zero). If so, the other - * thread incremented refc twice. Once for the new reference - * and once for this thread. + * thread incremented refc one extra step for this thread. * - * If refc reach -1, no one has used the entry since we - * set up the timer. Delete the entry. + * If refc reach -1, no one has done lookup and no one can do lookup + * as we have table lock. Delete the entry. * - * If refc reach 0, the entry is currently not in use - * but has been used since we set up the timer. Set up a - * new timer. + * If refc reach 0, someone raced us and either + * (1) did lookup with own refc++ and already released it again + * (2) did lookup without own refc++ + * Schedule new delete operation. * * If refc > 0, the entry is in use. Keep the entry. */ @@ -443,12 +479,7 @@ static void try_delete_dist_entry(void *vbin) erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); if (refc == 0) { - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry(vbin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - vbin); + schedule_delete_dist_entry(dep); } } @@ -462,12 +493,7 @@ int erts_dist_entry_destructor(Binary *bin) if (refc == -1) return 1; /* Allow deallocation of structure... */ - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry((void *) bin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - (void *) bin); + schedule_delete_dist_entry(dep); return 0; } @@ -498,12 +524,17 @@ erts_dist_table_size(void) i++; ASSERT(i == erts_no_of_hidden_dist_entries); i = 0; + for(dep = erts_pending_dist_entries; dep; dep = dep->next) + i++; + ASSERT(i == erts_no_of_pending_dist_entries); + i = 0; for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) i++; ASSERT(i == erts_no_of_not_connected_dist_entries); ASSERT(dist_entries == (erts_no_of_visible_dist_entries + erts_no_of_hidden_dist_entries + + erts_no_of_pending_dist_entries + erts_no_of_not_connected_dist_entries)); #endif @@ -518,43 +549,46 @@ erts_dist_table_size(void) void erts_set_dist_entry_not_connected(DistEntry *dep) { + DistEntry** head; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); - ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); - - if(dep->flags & DFLAG_PUBLISHED) { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_visible_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_visible_dist_entries == dep); - erts_visible_dist_entries = dep->next; - } - ASSERT(erts_no_of_visible_dist_entries > 0); - erts_no_of_visible_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + ASSERT(is_nil(dep->cid)); + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; + head = &erts_pending_dist_entries; } else { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_hidden_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_hidden_dist_entries == dep); - erts_hidden_dist_entries = dep->next; - } - - ASSERT(erts_no_of_hidden_dist_entries > 0); - erts_no_of_hidden_dist_entries--; + ASSERT(dep->status != 0); + ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); + if (dep->flags & DFLAG_PUBLISHED) { + ASSERT(erts_no_of_visible_dist_entries > 0); + erts_no_of_visible_dist_entries--; + head = &erts_visible_dist_entries; + } + else { + ASSERT(erts_no_of_hidden_dist_entries > 0); + erts_no_of_hidden_dist_entries--; + head = &erts_hidden_dist_entries; + } } + if(dep->prev) { + ASSERT(is_in_de_list(dep, *head)); + dep->prev->next = dep->next; + } + else { + ASSERT(*head == dep); + *head = dep->next; + } if(dep->next) dep->next->prev = dep->prev; - dep->status &= ~ERTS_DE_SFLG_CONNECTED; + dep->status &= ~(ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED); dep->flags = 0; dep->prev = NULL; dep->cid = NIL; @@ -570,46 +604,87 @@ erts_set_dist_entry_not_connected(DistEntry *dep) } void +erts_set_dist_entry_pending(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + erts_rwmtx_rwlock(&erts_dist_table_rwmtx); + + ASSERT(dep != erts_this_dist_entry); + ASSERT(dep->status == 0); + ASSERT(is_nil(dep->cid)); + + if(dep->prev) { + ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + dep->prev->next = dep->next; + } + else { + ASSERT(dep == erts_not_connected_dist_entries); + erts_not_connected_dist_entries = dep->next; + } + + if(dep->next) + dep->next->prev = dep->prev; + + erts_no_of_not_connected_dist_entries--; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + + dep->prev = NULL; + dep->next = erts_pending_dist_entries; + if(erts_pending_dist_entries) { + ASSERT(erts_pending_dist_entries->prev == NULL); + erts_pending_dist_entries->prev = dep; + } + erts_pending_dist_entries = dep; + erts_no_of_pending_dist_entries++; + erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); +} + +void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { + erts_aint32_t set_qflgs; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); ASSERT(is_nil(dep->cid)); + ASSERT(dep->status & ERTS_DE_SFLG_PENDING); ASSERT(is_internal_port(cid) || is_internal_pid(cid)); if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + ASSERT(is_in_de_list(dep, erts_pending_dist_entries)); dep->prev->next = dep->next; } else { - ASSERT(erts_not_connected_dist_entries == dep); - erts_not_connected_dist_entries = dep->next; + ASSERT(erts_pending_dist_entries == dep); + erts_pending_dist_entries = dep->next; } if(dep->next) dep->next->prev = dep->prev; - ASSERT(erts_no_of_not_connected_dist_entries > 0); - erts_no_of_not_connected_dist_entries--; + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; + dep->status &= ~ERTS_DE_SFLG_PENDING; dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags; + dep->flags = flags & ~DFLAG_NO_MAGIC; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); - erts_atomic32_set_nob(&dep->qflgs, - (is_internal_port(cid) - ? ERTS_DE_QFLG_PORT_CTRL - : ERTS_DE_QFLG_PROC_CTRL)); + set_qflgs = (is_internal_port(cid) ? + ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL); + erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs); + if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { @@ -929,9 +1004,11 @@ void erts_init_node_tables(int dd_sec) erts_hidden_dist_entries = NULL; erts_visible_dist_entries = NULL; + erts_pending_dist_entries = NULL; erts_not_connected_dist_entries = NULL; erts_no_of_hidden_dist_entries = 0; erts_no_of_visible_dist_entries = 0; + erts_no_of_pending_dist_entries = 0; erts_no_of_not_connected_dist_entries = 0; node_tmpl.sysname = am_Noname; @@ -1057,6 +1134,7 @@ typedef struct { typedef struct dist_referrer_ { struct dist_referrer_ *next; int heap_ref; + int ets_ref; int node_ref; int ctrl_ref; int system_ref; @@ -1175,10 +1253,11 @@ insert_dist_referrer(ReferredDist *referred_dist, else { Uint *hp = &drp->id_heap[0]; ASSERT(is_tuple(id)); - drp->id = copy_struct(id, size_object(id), &hp, NULL); + drp->id = copy_struct(id, size_object(id), &hp, NULL); } drp->creation = creation; drp->heap_ref = 0; + drp->ets_ref = 0; drp->node_ref = 0; drp->ctrl_ref = 0; drp->system_ref = 0; @@ -1188,6 +1267,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case NODE_REF: drp->node_ref++; break; case CTRL_REF: drp->ctrl_ref++; break; case HEAP_REF: drp->heap_ref++; break; + case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; default: ASSERT(0); } @@ -1300,6 +1380,11 @@ insert_offheap2(ErlOffHeap *oh, void *arg) (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor) +#define IsSendCtxBinary(Bin) \ + (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ + && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dsend_context_dtor) + + static void insert_offheap(ErlOffHeap *oh, int type, Eterm id) { @@ -1336,7 +1421,12 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) inserted_bins = nib; UnUseTmpHeapNoproc(BIG_UINT_HEAP_SIZE); } - } + } + else if (IsSendCtxBinary(u.mref->mb)) { + ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); + if (ctx->deref_dep) + insert_dist_entry(ctx->dep, type, id, 0); + } break; case REFC_BINARY_SUBTAG: case FUN_SUBTAG: @@ -1463,9 +1553,9 @@ insert_delayed_delete_node(void *state, } static void -insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin) +insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1476,9 +1566,9 @@ insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin static void insert_delayed_delete_dist_entry(void *state, ErtsMonotonicTime timeout_pos, - void *vbin) + void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1520,7 +1610,7 @@ setup_reference_table(void) erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry, insert_delayed_delete_dist_entry, NULL); - erts_debug_later_op_foreach(try_delete_dist_entry, + erts_debug_later_op_foreach(start_timer_delete_dist_entry, insert_thr_prgr_delete_dist_entry, NULL); @@ -1686,6 +1776,15 @@ setup_reference_table(void) insert_monitors(dep->monitors, dep->sysname); } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + if(dep->nlinks) + insert_links2(dep->nlinks, dep->sysname); + if(dep->node_links) + insert_links(dep->node_links, dep->sysname); + if(dep->monitors) + insert_monitors(dep->monitors, dep->sysname); + } + /* Not connected dist entries should not have any links, but inspect them anyway */ for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { @@ -1855,6 +1954,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_heap, MK_UINT(drp->heap_ref)); drl = MK_CONS(tup, drl); } + if(drp->ets_ref) { + tup = MK_2TUP(AM_ets, MK_UINT(drp->ets_ref)); + drl = MK_CONS(tup, drl); + } if(drp->system_ref) { tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); @@ -1871,12 +1974,17 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) else if (is_tuple(drp->id)) { Eterm *t; ASSERT(drp->system_ref && !drp->node_ref - && !drp->ctrl_ref && !drp->heap_ref); + && !drp->ctrl_ref && !drp->heap_ref && !drp->ets_ref); t = tuple_val(drp->id); ASSERT(2 == arityval(t[0])); tup = MK_2TUP(t[1], t[2]); } - else { + else if (drp->ets_ref) { + ASSERT(!drp->heap_ref && !drp->node_ref && + !drp->ctrl_ref && !drp->system_ref); + tup = MK_2TUP(AM_ets, drp->id); + } + else { ASSERT(!drp->ctrl_ref && drp->node_ref); ASSERT(is_atom(drp->id)); tup = MK_2TUP(drp->id, MK_UINT(drp->creation)); diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 3bba673435..8d29c83e15 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -57,11 +57,9 @@ #define ERST_INTERNAL_CHANNEL_NO 0 -#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) -#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) - -#define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \ - | ERTS_DE_SFLG_EXITING) +#define ERTS_DE_SFLG_PENDING (((Uint32) 1) << 0) +#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 2) #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) @@ -85,10 +83,12 @@ typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf; struct ErtsDistOutputBuf_ { #ifdef DEBUG Uint dbg_pattern; + byte *alloc_endp; #endif ErtsDistOutputBuf *next; byte *extp; byte *ext_endp; + byte *msg_start; byte data[1]; }; @@ -109,8 +109,6 @@ struct ErtsProcList_; * unlock mutexes with higher numbers before mutexes with higher numbers. */ -struct erl_link; - typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */ @@ -159,6 +157,8 @@ typedef struct dist_entry_ { struct cache* cache; /* The atom cache */ ErtsThrPrgrLaterOp later_op; + + struct transcode_context* transcode_ctx; } DistEntry; typedef struct erl_node_ { @@ -177,9 +177,11 @@ extern erts_rwmtx_t erts_node_table_rwmtx; extern DistEntry *erts_hidden_dist_entries; extern DistEntry *erts_visible_dist_entries; +extern DistEntry *erts_pending_dist_entries; extern DistEntry *erts_not_connected_dist_entries; extern Sint erts_no_of_hidden_dist_entries; extern Sint erts_no_of_visible_dist_entries; +extern Sint erts_no_of_pending_dist_entries; extern Sint erts_no_of_not_connected_dist_entries; extern DistEntry *erts_this_dist_entry; @@ -195,6 +197,7 @@ void erts_schedule_delete_dist_entry(DistEntry *); Uint erts_dist_table_size(void); void erts_dist_table_info(fmtfn_t, void *); void erts_set_dist_entry_not_connected(DistEntry *); +void erts_set_dist_entry_pending(DistEntry *); void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint); ErlNode *erts_find_or_insert_node(Eterm, Uint32); void erts_schedule_delete_node(ErlNode *); @@ -210,6 +213,7 @@ int erts_lc_is_de_rlocked(DistEntry *); #endif int erts_dist_entry_destructor(Binary *bin); DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle); +Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*); Eterm erts_make_dhandle(Process *c_p, DistEntry *dep); void erts_ref_dist_entry(DistEntry *dep); void erts_deref_dist_entry(DistEntry *dep); @@ -271,5 +275,6 @@ erts_de_links_unlock(DistEntry *dep) #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ void erts_debug_test_node_tab_delayed_delete(Sint64 millisecs); +void erts_lcnt_update_distribution_locks(int enable); #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 3d12b3bc21..a807d60ec7 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -3276,7 +3276,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_aint32_t aux_work = 0; int thr_prgr_active = 1; erts_aint32_t flgs; - ERTS_MSACC_PUSH_STATE_M(); + ERTS_MSACC_PUSH_STATE(); ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); @@ -3385,9 +3385,9 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) - 1) + 1; } else timeout = -1; - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); res = erts_tse_twait(ssi->event, timeout); - ERTS_MSACC_POP_STATE_M(); + ERTS_MSACC_POP_STATE(); current_time = ERTS_SCHEDULER_IS_DIRTY(esdp) ? 0 : erts_get_monotonic_time(esdp); } while (res == EINTR); @@ -9701,7 +9701,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) | ERTS_PROC_LOCK_STATUS | ERTS_PROC_LOCK_TRACE)); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_OTHER); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER); if (state & ERTS_PSFLG_FREE) { if (!is_normal_sched) { @@ -9916,7 +9916,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) case 0: /* No process at all */ default: ASSERT(qmask == 0); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_OTHER); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER); goto check_activities_to_run; } @@ -10034,7 +10034,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_EMULATOR); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_EMULATOR); if (flags & ERTS_RUNQ_FLG_PROTECTED) @@ -10824,7 +10824,7 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, goto badarg; req_type = tp[1]; req_id = tp[2]; - req_id_sz = is_immed(req_id) ? req_id : size_object(req_id); + req_id_sz = is_immed(req_id) ? 0 : size_object(req_id); tot_sz = req_id_sz; for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) { int tix = 3 + i; @@ -12659,9 +12659,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->name, @@ -12705,9 +12707,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->u.pid, @@ -12764,8 +12768,8 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, mon->u.pid, @@ -12887,14 +12891,18 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) int code; ErtsDistLinkData dld; erts_remove_dist_link(&dld, p->common.id, item, dep); - erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, - reason, SEQ_TRACE_TOKEN(p)); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + if (dld.d_lnk) { + erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); + code = erts_dsig_prepare(&dsd, dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + + code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, + reason, SEQ_TRACE_TOKEN(p)); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + } erts_destroy_dist_link(&dld); } } diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 5a2c262ff1..0b7f361622 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -32,6 +32,7 @@ #include "dist.h" #include "beam_catches.h" #include "erl_binary.h" +#include "erl_map.h" #define ERTS_WANT_EXTERNAL_TAGS #include "external.h" @@ -51,6 +52,11 @@ static void print_function_from_pc(fmtfn_t to, void *to_arg, BeamInstr* x); static void heap_dump(fmtfn_t to, void *to_arg, Eterm x); static void dump_binaries(fmtfn_t to, void *to_arg, Binary* root); static void dump_externally(fmtfn_t to, void *to_arg, Eterm term); +static void mark_literal(Eterm* ptr); +static void init_literal_areas(void); +static void dump_literals(fmtfn_t to, void *to_arg); +static void dump_module_literals(fmtfn_t to, void *to_arg, + ErtsLiteralArea* lit_area); static Binary* all_binaries; @@ -58,14 +64,14 @@ extern BeamInstr beam_apply[]; extern BeamInstr beam_exit[]; extern BeamInstr beam_continue_exit[]; - void erts_deep_process_dump(fmtfn_t to, void *to_arg) { int i, max = erts_ptab_max(&erts_proc); all_binaries = NULL; - + init_literal_areas(); + for (i = 0; i < max; i++) { Process *p = erts_pix2proc(i); if (p && p->i != ENULL) { @@ -75,6 +81,7 @@ erts_deep_process_dump(fmtfn_t to, void *to_arg) } } + dump_literals(to, to_arg); dump_binaries(to, to_arg, all_binaries); } @@ -373,7 +380,9 @@ heap_dump(fmtfn_t to, void *to_arg, Eterm x) next = (Eterm *) x; } else if (is_list(x)) { ptr = list_val(x); - if (ptr[0] != OUR_NIL) { + if (erts_is_literal(x, ptr)) { + mark_literal(ptr); + } else if (ptr[0] != OUR_NIL) { erts_print(to, to_arg, PTR_FMT ":l", ptr); dump_element(to, to_arg, ptr[0]); erts_putc(to, to_arg, '|'); @@ -392,7 +401,9 @@ heap_dump(fmtfn_t to, void *to_arg, Eterm x) ptr = boxed_val(x); hdr = *ptr; - if (hdr != OUR_NIL) { /* If not visited */ + if (erts_is_literal(x, ptr)) { + mark_literal(ptr); + } else if (hdr != OUR_NIL) { erts_print(to, to_arg, PTR_FMT ":", ptr); if (is_arity_value(hdr)) { Uint i; @@ -498,11 +509,77 @@ heap_dump(fmtfn_t to, void *to_arg, Eterm x) erts_print(to, to_arg, "p<%beu.%beu>\n", port_channel_no(x), port_number(x)); *ptr = OUR_NIL; + } else if (is_map_header(hdr)) { + if (is_flatmap_header(hdr)) { + flatmap_t* fmp = (flatmap_t *) flatmap_val(x); + Eterm* values = ptr + sizeof(flatmap_t) / sizeof(Eterm); + Uint map_size = fmp->size; + int i; + + erts_print(to, to_arg, "Mf" ETERM_FMT ":", map_size); + dump_element(to, to_arg, fmp->keys); + erts_putc(to, to_arg, ':'); + for (i = 0; i < map_size; i++) { + dump_element(to, to_arg, values[i]); + if (is_immed(values[i])) { + values[i] = make_small(0); + } + if (i < map_size-1) { + erts_putc(to, to_arg, ','); + } + } + erts_putc(to, to_arg, '\n'); + *ptr = OUR_NIL; + x = fmp->keys; + if (map_size) { + fmp->keys = (Eterm) next; + next = &values[map_size-1]; + } + continue; + } else { + Uint i; + Uint sz = 0; + Eterm* nodes = ptr + 1; + + switch (MAP_HEADER_TYPE(hdr)) { + case MAP_HEADER_TAG_HAMT_HEAD_ARRAY: + nodes++; + sz = 16; + erts_print(to, to_arg, "Mh" ETERM_FMT ":" ETERM_FMT ":", + hashmap_size(x), sz); + break; + case MAP_HEADER_TAG_HAMT_HEAD_BITMAP: + nodes++; + sz = hashmap_bitcount(MAP_HEADER_VAL(hdr)); + erts_print(to, to_arg, "Mh" ETERM_FMT ":" ETERM_FMT ":", + hashmap_size(x), sz); + break; + case MAP_HEADER_TAG_HAMT_NODE_BITMAP: + sz = hashmap_bitcount(MAP_HEADER_VAL(hdr)); + erts_print(to, to_arg, "Mn" ETERM_FMT ":", sz); + break; + } + *ptr = OUR_NIL; + for (i = 0; i < sz; i++) { + dump_element(to, to_arg, nodes[i]); + if (is_immed(nodes[i])) { + nodes[i] = make_small(0); + } + if (i < sz-1) { + erts_putc(to, to_arg, ','); + } + } + erts_putc(to, to_arg, '\n'); + x = nodes[0]; + nodes[0] = (Eterm) next; + next = &nodes[sz-1]; + continue; + } } else { /* * All other we dump in the external term format. */ - dump_externally(to, to_arg, x); + dump_externally(to, to_arg, x); erts_putc(to, to_arg, '\n'); *ptr = OUR_NIL; } @@ -564,11 +641,6 @@ dump_externally(fmtfn_t to, void *to_arg, Eterm term) } } - /* Do not handle maps */ - if (is_map(term)) { - term = am_undefined; - } - s = p = sbuf; erts_encode_ext(term, &p); erts_print(to, to_arg, "E%X:", p-s); @@ -577,6 +649,284 @@ dump_externally(fmtfn_t to, void *to_arg, Eterm term) } } +/* + * Handle dumping of literal areas. + */ + +static ErtsLiteralArea** lit_areas; +static Uint num_lit_areas; + +static int compare_areas(const void * a, const void * b) +{ + ErtsLiteralArea** a_p = (ErtsLiteralArea **) a; + ErtsLiteralArea** b_p = (ErtsLiteralArea **) b; + + if (*a_p < *b_p) { + return -1; + } else if (*b_p < *a_p) { + return 1; + } else { + return 0; + } +} + + +static void +init_literal_areas(void) +{ + int i; + Module* modp; + ErtsCodeIndex code_ix; + ErtsLiteralArea** area_p; + + code_ix = erts_active_code_ix(); + erts_rlock_old_code(code_ix); + + lit_areas = area_p = erts_dump_lit_areas; + num_lit_areas = 0; + for (i = 0; i < module_code_size(code_ix); i++) { + modp = module_code(i, code_ix); + if (modp == NULL) { + continue; + } + if (modp->curr.code_length > 0 && + modp->curr.code_hdr->literal_area) { + *area_p++ = modp->curr.code_hdr->literal_area; + } + if (modp->old.code_length > 0 && modp->old.code_hdr->literal_area) { + *area_p++ = modp->old.code_hdr->literal_area; + } + } + + num_lit_areas = area_p - lit_areas; + ASSERT(num_lit_areas <= erts_dump_num_lit_areas); + for (i = 0; i < num_lit_areas; i++) { + lit_areas[i]->off_heap = 0; + } + + qsort(lit_areas, num_lit_areas, sizeof(ErtsLiteralArea *), + compare_areas); + + erts_runlock_old_code(code_ix); +} + +static int search_areas(const void * a, const void * b) { + Eterm* key = (Eterm *) a; + ErtsLiteralArea** b_p = (ErtsLiteralArea **) b; + if (key < b_p[0]->start) { + return -1; + } else if (b_p[0]->end <= key) { + return 1; + } else { + return 0; + } +} + +static void mark_literal(Eterm* ptr) +{ + ErtsLiteralArea** ap; + + ap = bsearch(ptr, lit_areas, num_lit_areas, sizeof(ErtsLiteralArea*), + search_areas); + + /* + * If the literal was created by native code, this search will not + * find it and ap will be NULL. + */ + + if (ap) { + ap[0]->off_heap = (struct erl_off_heap_header *) 1; + } +} + + +static void +dump_literals(fmtfn_t to, void *to_arg) +{ + ErtsCodeIndex code_ix; + int i; + + code_ix = erts_active_code_ix(); + erts_rlock_old_code(code_ix); + + erts_print(to, to_arg, "=literals\n"); + for (i = 0; i < num_lit_areas; i++) { + if (lit_areas[i]->off_heap) { + dump_module_literals(to, to_arg, lit_areas[i]); + } + } + + erts_runlock_old_code(code_ix); +} + +static void +dump_module_literals(fmtfn_t to, void *to_arg, ErtsLiteralArea* lit_area) +{ + Eterm* htop; + Eterm* hend; + + htop = lit_area->start; + hend = lit_area->end; + while (htop < hend) { + Eterm w = *htop; + Eterm term; + Uint size; + + switch (primary_tag(w)) { + case TAG_PRIMARY_HEADER: + term = make_boxed(htop); + erts_print(to, to_arg, PTR_FMT ":", htop); + if (is_arity_value(w)) { + Uint i; + Uint arity = arityval(w); + + erts_print(to, to_arg, "t" ETERM_FMT ":", arity); + for (i = 1; i <= arity; i++) { + dump_element(to, to_arg, htop[i]); + if (i < arity) { + erts_putc(to, to_arg, ','); + } + } + erts_putc(to, to_arg, '\n'); + } else if (w == HEADER_FLONUM) { + FloatDef f; + char sbuf[31]; + int i; + + GET_DOUBLE_DATA((htop+1), f); + i = sys_double_to_chars(f.fd, sbuf, sizeof(sbuf)); + sys_memset(sbuf+i, 0, 31-i); + erts_print(to, to_arg, "F%X:%s\n", i, sbuf); + } else if (_is_bignum_header(w)) { + erts_print(to, to_arg, "B%T\n", term); + } else if (is_binary_header(w)) { + Uint tag = thing_subtag(w); + Uint size = binary_size(term); + Uint i; + + if (tag == HEAP_BINARY_SUBTAG) { + byte* p; + + erts_print(to, to_arg, "Yh%X:", size); + p = binary_bytes(term); + for (i = 0; i < size; i++) { + erts_print(to, to_arg, "%02X", p[i]); + } + } else if (tag == REFC_BINARY_SUBTAG) { + ProcBin* pb = (ProcBin *) binary_val(term); + Binary* val = pb->val; + + if (erts_atomic_xchg_nob(&val->intern.refc, 0) != 0) { + val->intern.flags = (UWord) all_binaries; + all_binaries = val; + } + erts_print(to, to_arg, + "Yc" PTR_FMT ":" PTR_FMT ":" PTR_FMT, + val, + pb->bytes - (byte *)val->orig_bytes, + size); + } else if (tag == SUB_BINARY_SUBTAG) { + ErlSubBin* Sb = (ErlSubBin *) binary_val(term); + Eterm* real_bin; + void* val; + + real_bin = boxed_val(Sb->orig); + if (thing_subtag(*real_bin) == REFC_BINARY_SUBTAG) { + /* + * Unvisited REFC_BINARY: Point directly to + * the binary. + */ + ProcBin* pb = (ProcBin *) real_bin; + val = pb->val; + } else { + /* + * Heap binary or visited REFC binary: Point + * to heap binary or ProcBin on the heap. + */ + val = real_bin; + } + erts_print(to, to_arg, + "Ys" PTR_FMT ":" PTR_FMT ":" PTR_FMT, + val, Sb->offs, size); + } + erts_putc(to, to_arg, '\n'); + } else if (is_map_header(w)) { + if (is_flatmap_header(w)) { + flatmap_t* fmp = (flatmap_t *) flatmap_val(term); + Eterm* values = htop + sizeof(flatmap_t) / sizeof(Eterm); + Uint map_size = fmp->size; + int i; + + erts_print(to, to_arg, "Mf" ETERM_FMT ":", map_size); + dump_element(to, to_arg, fmp->keys); + erts_putc(to, to_arg, ':'); + for (i = 0; i < map_size; i++) { + dump_element(to, to_arg, values[i]); + if (i < map_size-1) { + erts_putc(to, to_arg, ','); + } + } + erts_putc(to, to_arg, '\n'); + } else { + Uint i; + Uint sz = 0; + Eterm* nodes = htop + 1; + + switch (MAP_HEADER_TYPE(w)) { + case MAP_HEADER_TAG_HAMT_HEAD_ARRAY: + nodes++; + sz = 16; + erts_print(to, to_arg, "Mh" ETERM_FMT ":" ETERM_FMT ":", + hashmap_size(term), sz); + break; + case MAP_HEADER_TAG_HAMT_HEAD_BITMAP: + nodes++; + sz = hashmap_bitcount(MAP_HEADER_VAL(w)); + erts_print(to, to_arg, "Mh" ETERM_FMT ":" ETERM_FMT ":", + hashmap_size(term), sz); + break; + case MAP_HEADER_TAG_HAMT_NODE_BITMAP: + sz = hashmap_bitcount(MAP_HEADER_VAL(w)); + erts_print(to, to_arg, "Mn" ETERM_FMT ":", sz); + break; + } + for (i = 0; i < sz; i++) { + dump_element(to, to_arg, nodes[i]); + if (i < sz-1) { + erts_putc(to, to_arg, ','); + } + } + erts_putc(to, to_arg, '\n'); + } + } + size = 1 + header_arity(w); + switch (w & _HEADER_SUBTAG_MASK) { + case MAP_SUBTAG: + if (is_flatmap_header(w)) { + size += 1 + flatmap_get_size(htop); + } else { + size += hashmap_bitcount(MAP_HEADER_VAL(w)); + } + break; + case SUB_BINARY_SUBTAG: + size += 1; + break; + } + break; + default: + ASSERT(!is_header(htop[1])); + erts_print(to, to_arg, PTR_FMT ":l", htop); + dump_element(to, to_arg, htop[0]); + erts_putc(to, to_arg, '|'); + dump_element(to, to_arg, htop[1]); + erts_putc(to, to_arg, '\n'); + size = 2; + break; + } + htop += size; + } +} + void erts_dump_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg) { char *s; diff --git a/erts/emulator/beam/erl_term.h b/erts/emulator/beam/erl_term.h index 6daf043117..7d42d1b396 100644 --- a/erts/emulator/beam/erl_term.h +++ b/erts/emulator/beam/erl_term.h @@ -55,9 +55,6 @@ struct erl_node_; /* Declared in erl_node_tables.h */ #if defined(ARCH_64) # define TAG_PTR_MASK__ 0x7 # if !defined(ERTS_HAVE_OS_PHYSICAL_MEMORY_RESERVATION) -# ifdef HIPE -# error Hipe on 64-bit needs a real mmap as it does not support the literal tag -# endif # define TAG_LITERAL_PTR 0x4 # else # undef TAG_LITERAL_PTR @@ -862,7 +859,7 @@ do { \ ((ErtsMRefThing *) (Hp))->mb = (Binp); \ ((ErtsMRefThing *) (Hp))->next = (Ohp)->first; \ (Ohp)->first = (struct erl_off_heap_header*) (Hp); \ - ASSERT(erts_is_ref_numbers_magic(&(Binp)->refn)); \ + ASSERT(erts_is_ref_numbers_magic((Binp)->refn)); \ } while (0) #endif /* ARCH_32 */ diff --git a/erts/emulator/beam/erl_thr_progress.h b/erts/emulator/beam/erl_thr_progress.h index fa936b5707..8c029bcf99 100644 --- a/erts/emulator/beam/erl_thr_progress.h +++ b/erts/emulator/beam/erl_thr_progress.h @@ -28,7 +28,7 @@ * Author: Rickard Green */ -#if !defined(ERL_THR_PROGRESS_H__TSD_TYPE__) +#ifndef ERL_THR_PROGRESS_H__TSD_TYPE__ #define ERL_THR_PROGRESS_H__TSD_TYPE__ #include "sys.h" diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 970158933f..f2e6399ad7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -122,8 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); -static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b, - Export *bif, Eterm arg0, Eterm arg1); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); + + void erts_init_external(void) { erts_init_trap_export(&term_to_binary_trap_export, @@ -222,23 +223,10 @@ erts_destroy_atom_cache_map(ErtsAtomCacheMap *acmp) static ERTS_INLINE void insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) { - /* - * If the receiver do not understand utf8 atoms - * and this atom cannot be represented in latin1, - * we are not allowed to cache it. - * - * In this case all atoms are assumed to have - * latin1 encoding in the cache. By refusing it - * in the cache we will instead encode it using - * ATOM_UTF8_EXT/SMALL_ATOM_UTF8_EXT which the - * receiver do not recognize and tear down the - * connection. - */ - if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - && ((dflags & DFLAG_UTF8_ATOMS) - || atom_tab(atom_val(atom))->latin1_chars >= 0)) { + if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES) { int ix; ASSERT(acmp->hdr_sz < 0); + ASSERT(dflags & DFLAG_UTF8_ATOMS); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { acmp->cache[ix].iix = acmp->sz; @@ -258,9 +246,7 @@ get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) ASSERT(is_atom(atom)); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { - ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - || (!(dflags & DFLAG_UTF8_ATOMS) - && atom_tab(atom_val(atom))->latin1_chars < 0)); + ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES); return -1; } else { @@ -274,7 +260,6 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) { if (acmp) { - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ int i; int sz; @@ -285,6 +270,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) + 1 /* number of internal cache entries */ ; int min_sz; + ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(acmp->hdr_sz < 0); /* Make sure cache update instructions fit */ min_sz = fix_sz+(2+4)*acmp->sz; @@ -296,7 +282,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) atom = acmp->cache[acmp->cix[i]].atom; ASSERT(is_atom(atom)); a = atom_tab(atom_val(atom)); - len = (int) (utf8_atoms ? a->len : a->latin1_chars); + len = (int) a->len; ASSERT(len >= 0); if (!long_atoms && len > 255) long_atoms = 1; @@ -366,18 +352,62 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } -byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) + +#define PASS_THROUGH 'p' + +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; int ci, sz; byte dist_hdr_flags; int long_atoms; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); - register byte *ep = ext; - ASSERT(ep[0] == VERSION_MAGIC); - if (ep[1] != DIST_HEADER) - return ext; + register byte *ep = ob->extp; + ASSERT(dflags & DFLAG_UTF8_ATOMS); + + /* + * The buffer can have different layouts at this point depending on + * what was known when encoded: + * + * Pending connection: CtrlTerm [, MsgTerm] + * With atom cache : VERSION_MAGIC, DIST_HEADER, ..., CtrlTerm [, MsgTerm] + * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] + */ + + if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) { + /* + * Was encoded without atom cache toward pending connection. + */ + ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG + | DFLAG_DIST_HDR_ATOM_CACHE)) { + reds = transcode_dist_obuf(ob, dep, dflags, reds); + if (reds < 0) + return reds; + ep = ob->extp; + } + if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { + /* + * Encoding was done without atom caching but receiver expects + * a dist header, so we prepend an empty one. + */ + *--ep = 0; /* NumberOfAtomCacheRefs */ + *--ep = DIST_HEADER; + *--ep = VERSION_MAGIC; + } + goto done; + } + else if (ep[1] != DIST_HEADER) { + ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); + ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); + /* Node without atom cache, 'pass through' needed */ + *--ep = PASS_THROUGH; + goto done; + } dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -405,6 +435,7 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint / sizeof(Uint32))+1]; register Uint32 flgs; int iix, flgs_bytes, flgs_buf_ix, used_half_bytes; + ErtsAtomCache* cache = dep->cache; #ifdef DEBUG int tot_used_half_bytes; #endif @@ -447,17 +478,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint Atom *a; cache->out_arr[cix] = atom; a = atom_tab(atom_val(atom)); - if (utf8_atoms) { - sz = a->len; - ep -= sz; - sys_memcpy((void *) ep, (void *) a->name, sz); - } - else { - ASSERT(0 <= a->latin1_chars && a->latin1_chars <= MAX_ATOM_CHARACTERS); - ep -= a->latin1_chars; - sz = erts_utf8_to_latin1(ep, a->name, a->len); - ASSERT(a->latin1_chars == sz); - } + sz = a->len; + ep -= sz; + sys_memcpy((void *) ep, (void *) a->name, sz); if (long_atoms) { ep -= 2; put_int16(sz, ep); @@ -504,12 +527,16 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint break; } } + reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/ } --ep; put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; - return ep; +done: + ob->extp = ep; + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + return reds < 0 ? 0 : reds; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, @@ -520,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -536,7 +563,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -568,7 +595,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif *(*ext)++ = VERSION_MAGIC; } @@ -643,7 +670,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, #endif register byte *ep = ext; - int utf8_atoms = (int) (dep->flags & DFLAG_UTF8_ATOMS); + ASSERT(dep->flags & DFLAG_UTF8_ATOMS); edep->heap_size = -1; edep->ext_endp = ext+size; @@ -669,17 +696,16 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_de_rlock(dep); - if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) - != ERTS_DE_SFLG_CONNECTED) { + if (dep->status != ERTS_DE_SFLG_CONNECTED && + dep->status != ERTS_DE_SFLG_PENDING) { erts_de_runlock(dep); return ERTS_PREP_DIST_EXT_CLOSED; } - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; *connection_id = dep->connection_id; - edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); + edep->connection_id = dep->connection_id; if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) @@ -806,9 +832,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, CHKSIZE(len); atom = erts_atom_put((byte *) ep, len, - (utf8_atoms - ? ERTS_ATOM_ENC_UTF8 - : ERTS_ATOM_ENC_LATIN1), + ERTS_ATOM_ENC_UTF8, 0); if (is_non_value(atom)) ERTS_EXT_HDR_FAIL; @@ -895,7 +919,7 @@ bad_dist_ext(ErtsDistExternal *edep) erts_dsprintf(dsbufp, ", %d=%T", i, edep->attab.atom[i]); } erts_send_warning_to_logger_nogl(dsbufp); - erts_kill_dist_connection(dep, ERTS_DIST_EXT_CON_ID(edep)); + erts_kill_dist_connection(dep, edep->connection_id); } } @@ -1220,7 +1244,8 @@ typedef struct B2TContext_t { ErtsBinary2TermState b2ts; Uint32 flags; SWord reds; - Eterm trap_bin; + Uint used_bytes; /* In: boolean, Out: bytes */ + Eterm trap_bin; /* THE_NON_VALUE if not exported */ Export *bif; Eterm arg[2]; enum B2TState state; @@ -1314,6 +1339,11 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size, ctx->u.uc.dbytes = state->extp; ctx->u.uc.dleft = dest_len; + if (ctx->used_bytes) { + ASSERT(ctx->used_bytes == 1); + /* to be subtracted by stream.avail_in when done */ + ctx->used_bytes = data_size; + } ctx->state = B2TUncompressChunk; *ctxp = ctx; } @@ -1416,13 +1446,15 @@ static int b2t_context_destructor(Binary *context_bin) return 1; } +static BIF_RETTYPE binary_to_term_int(Process*, Eterm bin, B2TContext*); + + static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) { Binary *context_bin = erts_magic_ref2bin(BIF_ARG_1); ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(context_bin) == b2t_context_destructor); - return binary_to_term_int(BIF_P, 0, THE_NON_VALUE, context_bin, NULL, - THE_NON_VALUE, THE_NON_VALUE); + return binary_to_term_int(BIF_P, THE_NON_VALUE, ERTS_MAGIC_BIN_DATA(context_bin)); } @@ -1448,6 +1480,8 @@ static B2TContext* b2t_export_context(Process* p, B2TContext* src) b2t_context_destructor); B2TContext* ctx = ERTS_MAGIC_BIN_DATA(context_b); Eterm* hp; + + ASSERT(is_non_value(src->trap_bin)); sys_memcpy(ctx, src, sizeof(B2TContext)); if (ctx->state >= B2TDecode && ctx->u.dc.next == &src->u.dc.res) { ctx->u.dc.next = &ctx->u.dc.res; @@ -1457,8 +1491,7 @@ static B2TContext* b2t_export_context(Process* p, B2TContext* src) return ctx; } -static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b, - Export *bif_init, Eterm arg0, Eterm arg1) +static BIF_RETTYPE binary_to_term_int(Process* p, Eterm bin, B2TContext *ctx) { BIF_RETTYPE ret_val; #ifdef EXTREME_B2T_TRAPPING @@ -1466,25 +1499,17 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binar #else SWord initial_reds = (Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION); #endif - B2TContext c_buff; - B2TContext *ctx; int is_first_call; - if (context_b == NULL) { + if (is_value(bin)) { /* Setup enough to get started */ is_first_call = 1; - ctx = &c_buff; ctx->state = B2TPrepare; ctx->aligned_alloc = NULL; - ctx->flags = flags; - ctx->bif = bif_init; - ctx->arg[0] = arg0; - ctx->arg[1] = arg1; - IF_DEBUG(ctx->trap_bin = THE_NON_VALUE;) } else { - is_first_call = 0; - ctx = ERTS_MAGIC_BIN_DATA(context_b); + ASSERT(is_value(ctx->trap_bin)); ASSERT(ctx->state != B2TPrepare); + is_first_call = 0; } ctx->reds = initial_reds; @@ -1528,6 +1553,10 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binar && zret == Z_STREAM_END && ctx->u.uc.dleft == 0) { ctx->reds -= chunk; + if (ctx->used_bytes) { + ASSERT(ctx->used_bytes > 5 + ctx->u.uc.stream.avail_in); + ctx->used_bytes -= ctx->u.uc.stream.avail_in; + } ctx->state = B2TSizeInit; } else { @@ -1546,11 +1575,11 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binar break; case B2TDecodeInit: - if (ctx == &c_buff && ctx->b2ts.extsize > ctx->reds) { + if (is_non_value(ctx->trap_bin) && ctx->b2ts.extsize > ctx->reds) { /* dec_term will maybe trap, allocate space for magic bin before result term to make it easy to trim with HRelease. */ - ctx = b2t_export_context(p, &c_buff); + ctx = b2t_export_context(p, ctx); } ctx->u.dc.ep = ctx->b2ts.extp; ctx->u.dc.res = (Eterm) (UWord) NULL; @@ -1593,6 +1622,25 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binar return ret_val; case B2TDone: + if (ctx->used_bytes) { + Eterm *hp; + Eterm used; + if (!ctx->b2ts.exttmp) { + ASSERT(ctx->used_bytes == 1); + ctx->used_bytes = (ctx->u.dc.ep - ctx->b2ts.extp + +1); /* VERSION_MAGIC */ + } + if (IS_USMALL(0, ctx->used_bytes)) { + hp = erts_produce_heap(&ctx->u.dc.factory, 3, 0); + used = make_small(ctx->used_bytes); + } + else { + hp = erts_produce_heap(&ctx->u.dc.factory, 3+BIG_UINT_HEAP_SIZE, 0); + used = uint_to_big(ctx->used_bytes, hp); + hp += BIG_UINT_HEAP_SIZE; + } + ctx->u.dc.res = TUPLE2(hp, ctx->u.dc.res, used); + } b2t_destroy_context(ctx); if (ctx->u.dc.factory.hp > ctx->u.dc.factory.hp_end) { @@ -1613,11 +1661,10 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binar } }while (ctx->reds > 0 || ctx->state >= B2TDone); - if (ctx == &c_buff) { - ASSERT(ctx->trap_bin == THE_NON_VALUE); - ctx = b2t_export_context(p, &c_buff); + if (is_non_value(ctx->trap_bin)) { + ctx = b2t_export_context(p, ctx); + ASSERT(is_value(ctx->trap_bin)); } - ASSERT(ctx->trap_bin != THE_NON_VALUE); if (is_first_call) { erts_set_gc_state(p, 0); @@ -1634,23 +1681,35 @@ HIPE_WRAPPER_BIF_DISABLE_GC(binary_to_term, 1) BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) { - return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL, bif_export[BIF_binary_to_term_1], - BIF_ARG_1, THE_NON_VALUE); + B2TContext ctx; + + ctx.flags = 0; + ctx.used_bytes = 0; + ctx.trap_bin = THE_NON_VALUE; + ctx.bif = bif_export[BIF_binary_to_term_1]; + ctx.arg[0] = BIF_ARG_1; + ctx.arg[1] = THE_NON_VALUE; + return binary_to_term_int(BIF_P, BIF_ARG_1, &ctx); } HIPE_WRAPPER_BIF_DISABLE_GC(binary_to_term, 2) BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) { + B2TContext ctx; Eterm opts; Eterm opt; - Uint32 flags = 0; + ctx.flags = 0; + ctx.used_bytes = 0; opts = BIF_ARG_2; while (is_list(opts)) { opt = CAR(list_val(opts)); if (opt == am_safe) { - flags |= ERTS_DIST_EXT_BTT_SAFE; + ctx.flags |= ERTS_DIST_EXT_BTT_SAFE; + } + else if (opt == am_used) { + ctx.used_bytes = 1; } else { goto error; @@ -1661,8 +1720,11 @@ BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) if (is_not_nil(opts)) goto error; - return binary_to_term_int(BIF_P, flags, BIF_ARG_1, NULL, bif_export[BIF_binary_to_term_2], - BIF_ARG_1, BIF_ARG_2); + ctx.trap_bin = THE_NON_VALUE; + ctx.bif = bif_export[BIF_binary_to_term_2]; + ctx.arg[0] = BIF_ARG_1; + ctx.arg[1] = BIF_ARG_2; + return binary_to_term_int(BIF_P, BIF_ARG_1, &ctx); error: BIF_ERROR(BIF_P, BADARG); @@ -2099,7 +2161,7 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) { int iix; int len; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); + const int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); ASSERT(is_atom(atom)); @@ -2494,8 +2556,6 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, break; } - L_jump_start: - if (ctx && --r <= 0) { *reds = 0; ctx->obj = obj; @@ -2503,6 +2563,8 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, WSTACK_SAVE(s, &ctx->wstack); return -1; } + + L_jump_start: switch(tag_val_def(obj)) { case NIL_DEF: *ep++ = NIL_EXT; @@ -2867,62 +2929,27 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, ErlFunThing* funp = (ErlFunThing *) fun_val(obj); int ei; - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - *ep++ = NEW_FUN_EXT; - WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, - (UWord) ep); /* Position for patching in size */ - ep += 4; - *ep = funp->arity; - ep += 1; - sys_memcpy(ep, funp->fe->uniq, 16); - ep += 16; - put_int32(funp->fe->index, ep); - ep += 4; - put_int32(funp->num_free, ep); - ep += 4; - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); - ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); - ep = enc_pid(acmp, funp->creator, ep, dflags); - } else { - /* - * Communicating with an obsolete erl_interface or - * jinterface node. Convert the fun to a tuple to - * avoid crasching. - */ - - /* Tag, arity */ - *ep++ = SMALL_TUPLE_EXT; - put_int8(5, ep); - ep += 1; - - /* 'fun' */ - ep = enc_atom(acmp, am_fun, ep, dflags); - - /* Module name */ - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - - /* Index, Uniq */ - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_index, ep); - ep += 4; - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_uniq, ep); - ep += 4; - - /* Environment sub-tuple arity */ - ASSERT(funp->num_free < MAX_ARG); - *ep++ = SMALL_TUPLE_EXT; - put_int8(funp->num_free, ep); - ep += 1; - } - for (ei = funp->num_free-1; ei > 0; ei--) { + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + *ep++ = NEW_FUN_EXT; + WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, + (UWord) ep); /* Position for patching in size */ + ep += 4; + *ep = funp->arity; + ep += 1; + sys_memcpy(ep, funp->fe->uniq, 16); + ep += 16; + put_int32(funp->fe->index, ep); + ep += 4; + put_int32(funp->num_free, ep); + ep += 4; + ep = enc_atom(acmp, funp->fe->module, ep, dflags); + ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); + ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); + ep = enc_pid(acmp, funp->creator, ep, dflags); + + for (ei = funp->num_free-1; ei >= 0; ei--) { WSTACK_PUSH2(s, ENC_TERM, (UWord) funp->env[ei]); } - if (funp->num_free != 0) { - obj = funp->env[0]; - goto L_jump_start; - } } break; } @@ -3260,6 +3287,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + ASSERT(reds > 0); ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -4000,6 +4028,7 @@ dec_term_atom_common: if (ctx) { ctx->state = B2TDone; ctx->reds = reds; + ctx->u.dc.ep = ep; } return ep; @@ -4269,24 +4298,12 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, { ErlFunThing* funp = (ErlFunThing *) fun_val(obj); - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - result += 20+1+1+4; /* New ID + Tag */ - result += 4; /* Length field (number of free variables */ - result += encode_size_struct2(acmp, funp->creator, dflags); - result += encode_size_struct2(acmp, funp->fe->module, dflags); - result += 2 * (1+4); /* Index, Uniq */ - } else { - /* - * Size when fun is mapped to a tuple. - */ - result += 1 + 1; /* Tuple tag, arity */ - result += 1 + 1 + 2 + - atom_tab(atom_val(am_fun))->len; /* 'fun' */ - result += 1 + 1 + 2 + - atom_tab(atom_val(funp->fe->module))->len; /* Module name */ - result += 2 * (1 + 4); /* Index + Uniq */ - result += 1 + (funp->num_free < 0x100 ? 1 : 4); - } + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + result += 20+1+1+4; /* New ID + Tag */ + result += 4; /* Length field (number of free variables */ + result += encode_size_struct2(acmp, funp->creator, dflags); + result += encode_size_struct2(acmp, funp->fe->module, dflags); + result += 2 * (1+4); /* Index, Uniq */ if (funp->num_free > 1) { WSTACK_PUSH2(s, (UWord) (funp->env + 1), (UWord) TERM_ARRAY_OP(funp->num_free-1)); @@ -4374,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } } else - reds = 0; /* not used but compiler warns anyway */ + ERTS_UNDEF(reds, 0); heap_size = 0; terms = 1; @@ -4684,3 +4701,177 @@ error: #undef SKIP2 #undef CHKSIZE } + + +struct transcode_context { + enum { + TRANSCODE_DEC_MSG_SIZE, + TRANSCODE_DEC_MSG, + TRANSCODE_ENC_CTL, + TRANSCODE_ENC_MSG + }state; + Eterm ctl_term; + Eterm* ctl_heap; + ErtsHeapFactory ctl_factory; + Eterm* msg_heap; + B2TContext b2t; + TTBEncodeContext ttb; +#ifdef DEBUG + ErtsDistOutputBuf* dbg_ob; +#endif +}; + +void transcode_free_ctx(DistEntry* dep) +{ + struct transcode_context* ctx = dep->transcode_ctx; + + erts_factory_close(&ctx->ctl_factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap); + + if (ctx->msg_heap) { + erts_factory_close(&ctx->b2t.u.dc.factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap); + } + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx); + dep->transcode_ctx = NULL; +} + +Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) +{ + Sint hsz; + byte* decp; + const int have_msg = !!ob->msg_start; + int i; + struct transcode_context* ctx = dep->transcode_ctx; + + if (!ctx) { /* first call for 'ob' */ + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) { + /* + * Receiver does not support bitstrings and/or export funs. + * We need to transcode control and message terms to use tuple fallbacks. + */ + ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context)); + dep->transcode_ctx = ctx; + #ifdef DEBUG + ctx->dbg_ob = ob; + #endif + + hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL); + ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->msg_heap = NULL; + + decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL); + if (have_msg) { + ASSERT(decp == ob->msg_start); (void)decp; + ctx->b2t.u.sc.ep = NULL; + ctx->b2t.state = B2TSize; + ctx->b2t.aligned_alloc = NULL; + ctx->b2t.b2ts.exttmp = 0; + ctx->state = TRANSCODE_DEC_MSG_SIZE; + } + else { + ASSERT(decp == ob->ext_endp); + ctx->state = TRANSCODE_ENC_CTL; + } + } + else { + /* + * No need for full transcoding, but primitive receiver (erl_/jinterface) + * expects VERSION_MAGIC before both control and message terms. + */ + if (ob->msg_start) { + Sint ctl_bytes = ob->msg_start - ob->extp; + ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp); + /* Move control term back 1 byte to make room */ + sys_memmove(ob->extp-1, ob->extp, ctl_bytes); + *--(ob->msg_start) = VERSION_MAGIC; + --(ob->extp); + reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR); + } + *--(ob->extp) = VERSION_MAGIC; + goto done; + } + } + else { + ASSERT(ctx->dbg_ob == ob); + } + ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION; + + switch (ctx->state) { + case TRANSCODE_DEC_MSG_SIZE: + hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t); + if (ctx->b2t.state == B2TSize) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDecodeInit); + ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + ctx->b2t.u.dc.ep = ob->msg_start; + ctx->b2t.u.dc.res = (Eterm) NULL; + ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res; + erts_factory_tmp_init(&ctx->b2t.u.dc.factory, + ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->b2t.u.dc.flat_maps.wstart = NULL; + ctx->b2t.u.dc.hamt_array.pstart = NULL; + ctx->b2t.state = B2TDecode; + + ctx->state = TRANSCODE_DEC_MSG; + case TRANSCODE_DEC_MSG: + if (ctx->b2t.reds <= 0) + ctx->b2t.reds = 1; + decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t); + if (ctx->b2t.state < B2TDone) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDone); + ASSERT(decp && decp <= ob->ext_endp); + reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION; + b2t_destroy_context(&ctx->b2t); + + ctx->state = TRANSCODE_ENC_CTL; + case TRANSCODE_ENC_CTL: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) { + ASSERT(!(dflags & DFLAG_NO_MAGIC)); + ob->extp -= 2; /* VERSION_MAGIC x 2 */ + } + ob->ext_endp = ob->extp; + i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags, + NULL, NULL, NULL); + ASSERT(i == 0); (void)i; + ASSERT(ob->ext_endp <= ob->alloc_endp); + + if (!have_msg) { + break; + } + ob->msg_start = ob->ext_endp; + ctx->ttb.wstack.wstart = NULL; + ctx->ttb.flags = dflags; + ctx->ttb.level = 0; + + ctx->state = TRANSCODE_ENC_MSG; + case TRANSCODE_ENC_MSG: + reds *= TERM_TO_BINARY_LOOP_FACTOR; + if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL, + &ctx->ttb, &reds)) { + return -1; + } + reds /= TERM_TO_BINARY_LOOP_FACTOR; + + ASSERT(ob->ext_endp <= ob->alloc_endp); + + } + transcode_free_ctx(dep); + +done: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--(ob->extp) = PASS_THROUGH; + + if (reds < 0) + reds = 0; + + return reds; +} diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 3c61d013da..f9f8abcc27 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -109,35 +109,26 @@ typedef struct { } ErtsAtomTranslationTable; /* - * These flags are tagged onto the high bits of a connection ID and stored in - * the ErtsDistExternal structure's flags field. They are used to indicate - * various bits of state necessary to decode binaries in a variety of - * scenarios. The mask ERTS_DIST_EXT_CON_ID_MASK is used later to separate the - * connection ID from the flags. Be careful to ensure that the mask does not - * overlap any of the bits used for flags, or ERTS will leak flags bits into - * connection IDs and leak connection ID bits into the flags. + * These flags are stored in the ErtsDistExternal structure's flags field. + * They are used to indicate various bits of state necessary to decode binaries + * in a variety of scenarios. */ -#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x80000000) -#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x40000000) -#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x20000000) -#define ERTS_DIST_EXT_CON_ID_MASK ((Uint32) 0x1fffffff) +#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x1) +#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) +#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) + +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */ -#define ERTS_DIST_EXT_CON_ID(DIST_EXTP) \ - ((DIST_EXTP)->flags & ERTS_DIST_EXT_CON_ID_MASK) typedef struct { DistEntry *dep; byte *extp; byte *ext_endp; Sint heap_size; + Uint32 connection_id; Uint32 flags; ErtsAtomTranslationTable attab; } ErtsDistExternal; -typedef struct { - int have_header; - int cache_entries; -} ErtsDistHeaderPeek; - #define ERTS_DIST_EXT_SIZE(EDEP) \ (sizeof(ErtsDistExternal) \ - (((EDEP)->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) \ @@ -163,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -byte *erts_encode_ext_dist_header_finalize(byte *, ErtsAtomCache *, Uint32); +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); @@ -177,9 +168,6 @@ Uint erts_encode_ext_size_ets(Eterm); void erts_encode_ext(Eterm, byte **); byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_heap); -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void erts_peek_dist_header(ErtsDistHeaderPeek *, byte *, Uint); -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *); ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); @@ -207,23 +195,9 @@ void erts_binary2term_abort(ErtsBinary2TermState *); Eterm erts_binary2term_create(ErtsBinary2TermState *, ErtsHeapFactory*); int erts_debug_max_atom_out_cache_index(void); int erts_debug_atom_to_out_cache_index(Eterm); - +void transcode_free_ctx(DistEntry* dep); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void -erts_peek_dist_header(ErtsDistHeaderPeek *dhpp, byte *ext, Uint sz) -{ - if (ext[0] == VERSION_MAGIC - || ext[1] != DIST_HEADER - || sz < (1+1+1)) - dhpp->have_header = 0; - else { - dhpp->have_header = 1; - dhpp->cache_entries = (int) get_int8(&ext[2]); - } -} -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *edep) diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 09aeba00fa..3dd3a60939 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -947,6 +947,8 @@ void erts_update_ranges(BeamInstr* code, Uint size); void erts_remove_from_ranges(BeamInstr* code); UWord erts_ranges_sz(void); void erts_lookup_function_info(FunctionInfo* fi, BeamInstr* pc, int full_info); +ErtsLiteralArea** erts_dump_lit_areas; +Uint erts_dump_num_lit_areas; /* break.c */ void init_break_handler(void); diff --git a/erts/emulator/beam/instrs.tab b/erts/emulator/beam/instrs.tab index 20d356a81d..c17d1a8f69 100644 --- a/erts/emulator/beam/instrs.tab +++ b/erts/emulator/beam/instrs.tab @@ -788,7 +788,8 @@ is_eq_exact(Fail, X, Y) { } i_is_eq_exact_literal(Fail, Src, Literal) { - if (!eq($Src, $Literal)) { + Eterm src = $Src; + if (is_immed(src) || !eq(src, $Literal)) { $FAIL($Fail); } } @@ -800,7 +801,8 @@ is_ne_exact(Fail, X, Y) { } i_is_ne_exact_literal(Fail, Src, Literal) { - if (eq($Src, $Literal)) { + Eterm src = $Src; + if (!is_immed(src) && eq(src, $Literal)) { $FAIL($Fail); } } diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 85013af3ad..9933c8dda4 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,11 +3829,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, port_id, lnk->pid, dep); erts_destroy_dist_link(&dld); diff --git a/erts/emulator/beam/macros.tab b/erts/emulator/beam/macros.tab index e0b5f56b53..494fe8961e 100644 --- a/erts/emulator/beam/macros.tab +++ b/erts/emulator/beam/macros.tab @@ -20,13 +20,12 @@ // // -// Use if there is a garbage collection before storing to a -// general destination (either X or Y register). +// Define a regular expression that will match instructions that +// perform GC. That will allow beam_makeops to check for instructions +// that don't use $REFRESH_GEN_DEST() when they should. // -REFRESH_GEN_DEST() { - dst_ptr = REG_TARGET_PTR(dst); -} +GC_REGEXP=erts_garbage_collect|erts_gc|GcBifFunction; // $Offset is relative to the start of the instruction (not to the // location of the failure label reference). Since combined diff --git a/erts/emulator/beam/map_instrs.tab b/erts/emulator/beam/map_instrs.tab index bbb2f49b66..c594a87298 100644 --- a/erts/emulator/beam/map_instrs.tab +++ b/erts/emulator/beam/map_instrs.tab @@ -31,7 +31,7 @@ new_map(Dst, Live, N) { Eterm res; HEAVY_SWAPOUT; - res = new_map(c_p, reg, $Live, $N, $NEXT_INSTRUCTION); + res = erts_gc_new_map(c_p, reg, $Live, $N, $NEXT_INSTRUCTION); HEAVY_SWAPIN; $REFRESH_GEN_DEST(); $Dst = res; @@ -44,7 +44,7 @@ i_new_small_map_lit(Dst, Live, Keys) { Eterm keys = $Keys; HEAVY_SWAPOUT; - res = new_small_map_lit(c_p, reg, keys, $Live, $NEXT_INSTRUCTION); + res = erts_gc_new_small_map_lit(c_p, reg, keys, $Live, $NEXT_INSTRUCTION); HEAVY_SWAPIN; $REFRESH_GEN_DEST(); $Dst = res; @@ -133,7 +133,7 @@ update_map_assoc(Src, Dst, Live, N) { reg[live] = $Src; HEAVY_SWAPOUT; - res = update_map_assoc(c_p, reg, live, $N, $NEXT_INSTRUCTION); + res = erts_gc_update_map_assoc(c_p, reg, live, $N, $NEXT_INSTRUCTION); HEAVY_SWAPIN; ASSERT(is_value(res)); $REFRESH_GEN_DEST(); @@ -147,7 +147,7 @@ update_map_exact(Fail, Src, Dst, Live, N) { reg[live] = $Src; HEAVY_SWAPOUT; - res = update_map_exact(c_p, reg, live, $N, $NEXT_INSTRUCTION); + res = erts_gc_update_map_exact(c_p, reg, live, $N, $NEXT_INSTRUCTION); HEAVY_SWAPIN; if (is_value(res)) { $REFRESH_GEN_DEST(); diff --git a/erts/emulator/beam/module.c b/erts/emulator/beam/module.c index baeec115ea..1712dc803c 100644 --- a/erts/emulator/beam/module.c +++ b/erts/emulator/beam/module.c @@ -254,4 +254,3 @@ void module_end_staging(int commit) IF_DEBUG(dbg_load_code_ix = -1); } - diff --git a/erts/emulator/beam/module.h b/erts/emulator/beam/module.h index 9a81e6035b..a3f1ce1705 100644 --- a/erts/emulator/beam/module.h +++ b/erts/emulator/beam/module.h @@ -45,7 +45,7 @@ typedef struct erl_module { int seen; /* Used by finish_loading() */ struct erl_module_instance curr; - struct erl_module_instance old; /* protected by "old_code" rwlock */ + struct erl_module_instance old; /* active protected by "old_code" rwlock */ struct erl_module_instance* on_load; } Module; diff --git a/erts/emulator/beam/msg_instrs.tab b/erts/emulator/beam/msg_instrs.tab index 8055a8616f..d6d4d2fb49 100644 --- a/erts/emulator/beam/msg_instrs.tab +++ b/erts/emulator/beam/msg_instrs.tab @@ -43,27 +43,23 @@ // * // */ -recv_mark(Dest) { +i_recv_mark() { /* - * Save the current position in message buffer and the - * the label for the loop_rec/2 instruction for the - * the receive statement. + * Save the current position in message buffer. */ - $SET_REL_I(c_p->msg.mark, $Dest); c_p->msg.saved_last = c_p->msg.last; } i_recv_set() { /* - * If the mark is valid (points to the loop_rec/2 - * instruction that follows), we know that the saved - * position points to the first message that could - * possibly be matched out. + * If c_p->msg.saved_last is non-zero, it points to the first + * message that could possibly be matched out. * - * If the mark is invalid, we do nothing, meaning that - * we will look through all messages in the message queue. + * If c_p->msg.saved_last is zero, it means that it was invalidated + * because another receive was executed before this i_recv_set() + * instruction was reached. */ - if (c_p->msg.mark == (BeamInstr *) ($NEXT_INSTRUCTION)) { + if (c_p->msg.saved_last) { c_p->msg.save = c_p->msg.saved_last; } SET_I($NEXT_INSTRUCTION); @@ -131,6 +127,7 @@ i_loop_rec(Dest) { ASSERT(HTOP == c_p->htop && E == c_p->stop); /* TODO: Add DTrace probe for this bad message situation? */ UNLINK_MESSAGE(c_p, msgp); + c_p->msg.saved_last = 0; /* Better safe than sorry. */ msgp->next = NULL; erts_cleanup_messages(msgp); goto loop_rec__; diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 7a2c39b3a8..3df91056cb 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -511,8 +511,6 @@ put_list y x x put_list y y x put_list x y x -put_list y x x - # put_list SrcReg Constant Dst put_list x c x @@ -1567,7 +1565,12 @@ on_load # # R14A. # -recv_mark f +# Modified in OTP 21 because it turns out that we don't need the +# label after all. +# + +recv_mark f => i_recv_mark +i_recv_mark recv_set Fail | label Lbl | loop_rec Lf Reg => \ i_recv_set | label Lbl | loop_rec Lf Reg |