diff options
Diffstat (limited to 'erts/emulator/beam')
39 files changed, 1104 insertions, 492 deletions
diff --git a/erts/emulator/beam/atom.c b/erts/emulator/beam/atom.c index fe91134ef4..fd2adac676 100644 --- a/erts/emulator/beam/atom.c +++ b/erts/emulator/beam/atom.c @@ -435,6 +435,9 @@ init_atom_table(void) f.cmp = (HCMP_FUN) atom_cmp; f.alloc = (HALLOC_FUN) atom_alloc; f.free = (HFREE_FUN) atom_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; atom_text_pos = NULL; atom_text_end = NULL; diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index ea04495574..ae7f405cce 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -71,6 +71,7 @@ atom absoluteURI atom ac atom accessor atom active +atom active_tasks atom all atom all_but_first atom all_names @@ -209,6 +210,7 @@ atom dsend_continue_trap atom dunlink atom duplicate_bag atom dupnames +atom einval atom elib_malloc atom emulator atom enable_trace @@ -350,6 +352,7 @@ atom memory_internal atom memory_types atom message atom message_binary +atom message_queue_data atom message_queue_len atom messages atom merge_trap @@ -361,6 +364,7 @@ atom min_heap_size atom min_bin_vheap_size atom minor_version atom Minus='-' +atom mixed atom module atom module_info atom monitored_by @@ -423,11 +427,12 @@ atom notify atom notsup atom nouse_stdio atom objects -atom off_heap_message_queue +atom off_heap atom offset atom ok atom old_heap_block_size atom old_heap_size +atom on_heap atom on_load atom open atom open_error @@ -505,6 +510,7 @@ atom return_from atom return_to atom return_trace atom run_queue +atom run_queue_lengths atom runnable atom runnable_ports atom runnable_procs @@ -572,7 +578,9 @@ atom timeout_value atom Times='*' atom timestamp atom total +atom total_active_tasks atom total_heap_size +atom total_run_queue_lengths atom tpkt atom trace trace_ts traced atom trace_control_word diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 6b6c066211..956454d9b3 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -826,6 +826,9 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ); + literals = (char*) modp->old.code_hdr->literals_start; + lit_bsize = (char*) modp->old.code_hdr->literals_end - literals; + for (msgp = rp->msg.first; msgp; msgp = msgp->next) { if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) hfrag = &msgp->hfrag; @@ -837,16 +840,13 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) if (check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size)) return am_true; /* Should not contain any constants... */ - ASSERT(!any_heap_ref_ptrs(&hfrag->mem[0], - &hfrag->mem[hfrag->used_size], - mod_start, - mod_size)); + ASSERT(!any_heap_refs(&hfrag->mem[0], + &hfrag->mem[hfrag->used_size], + literals, + lit_bsize)); } } - literals = (char*) modp->old.code_hdr->literals_start; - lit_bsize = (char*) modp->old.code_hdr->literals_end - literals; - while (1) { /* Check heap, stack etc... */ @@ -881,7 +881,7 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) hp = &hfrag->mem[0]; hp_end = &hfrag->mem[hfrag->used_size]; - if (any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize)) + if (any_heap_refs(hp, hp_end, literals, lit_bsize)) goto try_literal_gc; } @@ -902,7 +902,7 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) hp = &hfrag->mem[0]; hp_end = &hfrag->mem[hfrag->used_size]; - ASSERT(!any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize)); + ASSERT(!any_heap_refs(hp, hp_end, literals, lit_bsize)); } } diff --git a/erts/emulator/beam/beam_bp.c b/erts/emulator/beam/beam_bp.c index 2a8663d7ee..2a23da4f25 100644 --- a/erts/emulator/beam/beam_bp.c +++ b/erts/emulator/beam/beam_bp.c @@ -75,6 +75,16 @@ extern BeamInstr beam_return_time_trace[1]; /* OpCode(i_return_time_trace) */ erts_smp_atomic32_t erts_active_bp_index; erts_smp_atomic32_t erts_staging_bp_index; +/* + * Inlined helpers + */ + +static ERTS_INLINE ErtsMonotonicTime +get_mtime(Process *c_p) +{ + return erts_get_monotonic_time(ERTS_PROC_GET_SCHDATA(c_p)); +} + /* ************************************************************************* ** Local prototypes */ @@ -97,9 +107,6 @@ static int clear_function_break(BeamInstr *pc, Uint break_flags); static BpDataTime* get_time_break(BeamInstr *pc); static GenericBpData* check_break(BeamInstr *pc, Uint break_flags); -static void bp_time_diff(bp_data_time_item_t *item, - process_breakpoint_time_t *pbt, - Uint ms, Uint s, Uint us); static void bp_meta_unref(BpMetaPid* bmp); static void bp_count_unref(BpCount* bcp); @@ -110,13 +117,8 @@ static void uninstall_breakpoint(BeamInstr* pc); /* bp_hash */ #define BP_TIME_ADD(pi0, pi1) \ do { \ - Uint r; \ (pi0)->count += (pi1)->count; \ - (pi0)->s_time += (pi1)->s_time; \ - (pi0)->us_time += (pi1)->us_time; \ - r = (pi0)->us_time / 1000000; \ - (pi0)->s_time += r; \ - (pi0)->us_time = (pi0)->us_time % 1000000; \ + (pi0)->time += (pi1)->time; \ } while(0) static void bp_hash_init(bp_time_hash_t *hash, Uint n); @@ -948,7 +950,7 @@ do_call_trace(Process* c_p, BeamInstr* I, Eterm* reg, void erts_trace_time_call(Process* c_p, BeamInstr* I, BpDataTime* bdt) { - Uint ms,s,us; + ErtsMonotonicTime time; process_breakpoint_time_t *pbt = NULL; bp_data_time_item_t sitem, *item = NULL; bp_time_hash_t *h = NULL; @@ -961,7 +963,7 @@ erts_trace_time_call(Process* c_p, BeamInstr* I, BpDataTime* bdt) * from the process psd */ pbt = ERTS_PROC_GET_CALL_TIME(c_p); - get_sys_now(&ms, &s, &us); + time = get_mtime(c_p); /* get pbt * timestamp = t0 @@ -976,7 +978,7 @@ erts_trace_time_call(Process* c_p, BeamInstr* I, BpDataTime* bdt) } else { ASSERT(pbt->pc); /* add time to previous code */ - bp_time_diff(&sitem, pbt, ms, s, us); + sitem.time = time - pbt->time; sitem.pid = c_p->common.id; sitem.count = 0; @@ -1002,8 +1004,7 @@ erts_trace_time_call(Process* c_p, BeamInstr* I, BpDataTime* bdt) /* Add count to this code */ sitem.pid = c_p->common.id; sitem.count = 1; - sitem.s_time = 0; - sitem.us_time = 0; + sitem.time = 0; /* this breakpoint */ ASSERT(bdt); @@ -1020,15 +1021,13 @@ erts_trace_time_call(Process* c_p, BeamInstr* I, BpDataTime* bdt) } pbt->pc = I; - pbt->ms = ms; - pbt->s = s; - pbt->us = us; + pbt->time = time; } void erts_trace_time_return(Process *p, BeamInstr *pc) { - Uint ms,s,us; + ErtsMonotonicTime time; process_breakpoint_time_t *pbt = NULL; bp_data_time_item_t sitem, *item = NULL; bp_time_hash_t *h = NULL; @@ -1041,7 +1040,7 @@ erts_trace_time_return(Process *p, BeamInstr *pc) * from the process psd */ pbt = ERTS_PROC_GET_CALL_TIME(p); - get_sys_now(&ms,&s,&us); + time = get_mtime(p); /* get pbt * lookup bdt from code @@ -1057,7 +1056,7 @@ erts_trace_time_return(Process *p, BeamInstr *pc) */ ASSERT(pbt->pc); - bp_time_diff(&sitem, pbt, ms, s, us); + sitem.time = time - pbt->time; sitem.pid = p->common.id; sitem.count = 0; @@ -1080,9 +1079,7 @@ erts_trace_time_return(Process *p, BeamInstr *pc) } pbt->pc = pc; - pbt->ms = ms; - pbt->s = s; - pbt->us = us; + pbt->time = time; } } @@ -1183,10 +1180,14 @@ int erts_is_time_break(Process *p, BeamInstr *pc, Eterm *retval) { for(ix = 0; ix < hash.n; ix++) { item = &(hash.item[ix]); if (item->pid != NIL) { + ErtsMonotonicTime sec, usec; + usec = ERTS_MONOTONIC_TO_USEC(item->time); + sec = usec / 1000000; + usec = usec - sec*1000000; t = TUPLE4(hp, item->pid, make_small(item->count), - make_small(item->s_time), - make_small(item->us_time)); + make_small((Uint) sec), + make_small((Uint) usec)); hp += 5; *retval = CONS(hp, t, *retval); hp += 2; } @@ -1266,8 +1267,7 @@ static void bp_hash_rehash(bp_time_hash_t *hash, Uint n) { } item[hval].pid = hash->item[ix].pid; item[hval].count = hash->item[ix].count; - item[hval].s_time = hash->item[ix].s_time; - item[hval].us_time = hash->item[ix].us_time; + item[hval].time = hash->item[ix].time; } } @@ -1315,8 +1315,7 @@ static ERTS_INLINE bp_data_time_item_t * bp_hash_put(bp_time_hash_t *hash, bp_da item = &(hash->item[hval]); item->pid = sitem->pid; - item->s_time = sitem->s_time; - item->us_time = sitem->us_time; + item->time = sitem->time; item->count = sitem->count; hash->used++; @@ -1330,41 +1329,7 @@ static void bp_hash_delete(bp_time_hash_t *hash) { hash->item = NULL; } -static void bp_time_diff(bp_data_time_item_t *item, /* out */ - process_breakpoint_time_t *pbt, /* in */ - Uint ms, Uint s, Uint us) { - int ds,dus; -#ifdef DEBUG - int dms; - - - dms = ms - pbt->ms; -#endif - ds = s - pbt->s; - dus = us - pbt->us; - - /* get_sys_now may return zero difftime, - * this is ok. - */ - -#ifdef DEBUG - ASSERT(dms >= 0 || ds >= 0 || dus >= 0); -#endif - - if (dus < 0) { - dus += 1000000; - ds -= 1; - } - if (ds < 0) { - ds += 1000000; - } - - item->s_time = ds; - item->us_time = dus; -} - void erts_schedule_time_break(Process *p, Uint schedule) { - Uint ms, s, us; process_breakpoint_time_t *pbt = NULL; bp_data_time_item_t sitem, *item = NULL; bp_time_hash_t *h = NULL; @@ -1387,8 +1352,7 @@ void erts_schedule_time_break(Process *p, Uint schedule) { pbdt = get_time_break(pbt->pc); if (pbdt) { - get_sys_now(&ms,&s,&us); - bp_time_diff(&sitem, pbt, ms, s, us); + sitem.time = get_mtime(p) - pbt->time; sitem.pid = p->common.id; sitem.count = 0; @@ -1410,10 +1374,7 @@ void erts_schedule_time_break(Process *p, Uint schedule) { * timestamp it and remove the previous * timestamp in the psd. */ - get_sys_now(&ms,&s,&us); - pbt->ms = ms; - pbt->s = s; - pbt->us = us; + pbt->time = get_mtime(p); break; default : ASSERT(0); diff --git a/erts/emulator/beam/beam_bp.h b/erts/emulator/beam/beam_bp.h index 97d0539ac7..2b89d6fc71 100644 --- a/erts/emulator/beam/beam_bp.h +++ b/erts/emulator/beam/beam_bp.h @@ -29,8 +29,7 @@ typedef struct { Eterm pid; Sint count; - Uint s_time; - Uint us_time; + ErtsMonotonicTime time; } bp_data_time_item_t; typedef struct { @@ -46,9 +45,7 @@ typedef struct bp_data_time { /* Call time */ } BpDataTime; typedef struct { - Uint ms; - Uint s; - Uint us; + ErtsMonotonicTime time; BeamInstr *pc; } process_breakpoint_time_t; /* used within psd */ diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index 1a4133bceb..4d7b00b032 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1843,8 +1843,8 @@ void process_main(void) * in the queue. This since messages with data outside * the heap will be corrupted by a GC. */ - ASSERT(!(c_p->flags & F_DISABLE_GC)); - c_p->flags |= F_DISABLE_GC; + ASSERT(!(c_p->flags & F_DELAY_GC)); + c_p->flags |= F_DELAY_GC; loop_rec__: PROCESS_MAIN_CHK_LOCKS(c_p); @@ -1858,7 +1858,7 @@ void process_main(void) if (ERTS_PROC_PENDING_EXIT(c_p)) { erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); SWAPOUT; - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; goto do_schedule; /* Will be rescheduled for exit */ } ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); @@ -1868,7 +1868,7 @@ void process_main(void) else #endif { - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; SET_I((BeamInstr *) Arg(0)); Goto(*I); /* Jump to a wait or wait_timeout instruction */ } @@ -1978,7 +1978,7 @@ void process_main(void) CANCEL_TIMER(c_p); erts_save_message_in_proc(c_p, msgp); - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) { /* @@ -2000,7 +2000,7 @@ void process_main(void) */ OpCase(loop_rec_end_f): { - ASSERT(c_p->flags & F_DISABLE_GC); + ASSERT(c_p->flags & F_DELAY_GC); SET_I((BeamInstr *) Arg(0)); SAVE_MESSAGE(c_p); @@ -2009,7 +2009,7 @@ void process_main(void) goto loop_rec__; } - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; c_p->i = I; SWAPOUT; c_p->arity = 0; @@ -3558,6 +3558,16 @@ do { \ StoreBifResult(1, result); } + OpCase(i_get_hash_cId): + { + Eterm arg; + Eterm result; + + GetArg1(0, arg); + result = erts_pd_hash_get_with_hx(c_p, Arg(1), arg); + StoreBifResult(2, result); + } + { Eterm case_end_val; diff --git a/erts/emulator/beam/beam_load.c b/erts/emulator/beam/beam_load.c index 5db971b6af..d367cce212 100644 --- a/erts/emulator/beam/beam_load.c +++ b/erts/emulator/beam/beam_load.c @@ -4284,6 +4284,53 @@ gen_get_map_element(LoaderState* stp, GenOpArg Fail, GenOpArg Src, return op; } +static int +hash_internal_genop_arg(LoaderState* stp, GenOpArg Key, Uint32* hx) +{ + switch (Key.type) { + case TAG_a: + *hx = atom_tab(atom_val(Key.val))->slot.bucket.hvalue; + return 1; + case TAG_i: + *hx = Key.val; + return 1; + case TAG_n: + *hx = make_internal_hash(NIL); + return 1; + case TAG_q: + *hx = make_internal_hash(stp->literals[Key.val].term); + return 1; + default: + return 0; + } +} + + +static GenOp* +gen_get(LoaderState* stp, GenOpArg Src, GenOpArg Dst) +{ + GenOp* op; + Uint32 hx = 0; + + NEW_GENOP(stp, op); + op->next = NULL; + if (hash_internal_genop_arg(stp, Src, &hx)) { + op->arity = 3; + op->op = genop_i_get_hash_3; + op->a[0] = Src; + op->a[1].type = TAG_u; + op->a[1].val = (BeamInstr) hx; + op->a[2] = Dst; + } else { + op->arity = 2; + op->op = genop_i_get_2; + op->a[0] = Src; + op->a[1] = Dst; + } + return op; +} + + static GenOp* gen_get_map_elements(LoaderState* stp, GenOpArg Fail, GenOpArg Src, GenOpArg Size, GenOpArg* Rest) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 886b19fe6e..bb9165cd79 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -910,13 +910,22 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1) so.priority = PRIORITY_LOW; else goto error; - } else if (arg == am_off_heap_message_queue) { - if (val == am_true) - so.flags |= SPO_OFF_HEAP_MSGQ; - else if (val == am_false) + } else if (arg == am_message_queue_data) { + switch (val) { + case am_mixed: + so.flags &= ~(SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ); + break; + case am_on_heap: so.flags &= ~SPO_OFF_HEAP_MSGQ; - else + so.flags |= SPO_ON_HEAP_MSGQ; + break; + case am_off_heap: + so.flags &= ~SPO_ON_HEAP_MSGQ; + so.flags |= SPO_OFF_HEAP_MSGQ; + break; + default: goto error; + } } else if (arg == am_min_heap_size && is_small(val)) { Sint min_heap_size = signed_val(val); if (min_heap_size < 0) { @@ -1695,15 +1704,10 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) } BIF_RET(old_value); } - else if (BIF_ARG_1 == am_off_heap_message_queue) { - int enable; - if (BIF_ARG_2 == am_true) - enable = 1; - else if (BIF_ARG_2 == am_false) - enable = 0; - else + else if (BIF_ARG_1 == am_message_queue_data) { + old_value = erts_change_message_queue_management(BIF_P, BIF_ARG_2); + if (is_non_value(old_value)) goto error; - old_value = erts_change_off_heap_message_queue_state(BIF_P, enable); BIF_RET(old_value); } else if (BIF_ARG_1 == am_sensitive) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 07d4702b92..0aee8681c6 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -115,7 +115,7 @@ bif erlang:time_offset/0 bif erlang:time_offset/1 bif erlang:timestamp/0 -bif erlang:open_port/2 +bif erts_internal:open_port/2 bif erlang:pid_to_list/1 bif erlang:ports/0 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index d5443476ec..e31ef29562 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1136,7 +1136,6 @@ int erts_net_message(Port *prt, Process* rp; DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); Eterm* ctl = ctl_default; - ErlOffHeap off_heap; ErtsHeapFactory factory; Eterm* hp; Sint type; @@ -1151,9 +1150,6 @@ int erts_net_message(Port *prt, #endif UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - /* Thanks to Luke Gorrie */ - off_heap.first = NULL; - off_heap.overhead = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -1214,15 +1210,15 @@ int erts_net_message(Port *prt, } hp = ctl; - erts_factory_static_init(&factory, ctl, ctl_len, &off_heap); + erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); arg = erts_decode_dist_ext(&factory, &ede); if (is_non_value(arg)) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n"); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); - goto data_error; + goto decode_error; } ctl_len = t - buf; @@ -1702,7 +1698,7 @@ int erts_net_message(Port *prt, goto invalid_message; } - erts_cleanup_offheap(&off_heap); + erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } @@ -1715,12 +1711,13 @@ int erts_net_message(Port *prt, erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg); erts_send_error_to_logger_nogl(dsbufp); } - data_error: +decode_error: PURIFY_MSG("data error"); - erts_cleanup_offheap(&off_heap); + erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } +data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); erts_deliver_port_exit(prt, dep->cid, am_killed, 0); ERTS_SMP_CHK_NO_PROC_LOCKS; diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 99458b4268..5544712e8d 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -292,7 +292,7 @@ static void set_default_literal_alloc_opts(struct au_init *ip) { SET_DEFAULT_ALLOC_OPTS(ip); - ip->enable = AU_ALLOC_DEFAULT_ENABLE(1); + ip->enable = 1; ip->thr_spec = 0; ip->atype = BESTFIT; ip->init.bf.ao = 1; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 82c2aa4b9e..ab3ebd6bac 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -592,7 +592,7 @@ static Eterm pi_args[] = { am_min_bin_vheap_size, am_current_location, am_current_stacktrace, - am_off_heap_message_queue + am_message_queue_data }; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm))) @@ -640,7 +640,7 @@ pi_arg2ix(Eterm arg) case am_min_bin_vheap_size: return 28; case am_current_location: return 29; case am_current_stacktrace: return 30; - case am_off_heap_message_queue: return 31; + case am_message_queue_data: return 31; default: return -1; } } @@ -1499,8 +1499,22 @@ process_info_aux(Process *BIF_P, break; } - case am_off_heap_message_queue: - res = BIF_P->flags & F_OFF_HEAP_MSGQ ? am_true : am_false; + case am_message_queue_data: + switch (rp->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + break; + case F_ON_HEAP_MSGQ: + res = am_on_heap; + break; + case 0: + res = am_mixed; + break; + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; + } hp = HAlloc(BIF_P, 3); break; @@ -2665,9 +2679,18 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(am_true); } #endif - else if (BIF_ARG_1 == am_off_heap_message_queue) { - BIF_RET(erts_default_spo_flags & SPO_OFF_HEAP_MSGQ - ? am_true : am_false); + else if (BIF_ARG_1 == am_message_queue_data) { + switch (erts_default_spo_flags & (SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ)) { + case SPO_OFF_HEAP_MSGQ: + BIF_RET(am_off_heap); + case SPO_ON_HEAP_MSGQ: + BIF_RET(am_on_heap); + case 0: + BIF_RET(am_mixed); + default: + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + BIF_RET(am_error); + } } else if (ERTS_IS_ATOM_STR("compile_info",BIF_ARG_1)) { Uint sz; @@ -3176,6 +3199,39 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1) if (is_non_value(res)) BIF_RET(am_undefined); BIF_TRAP1(gather_sched_wall_time_res_trap, BIF_P, res); + } else if (BIF_ARG_1 == am_total_active_tasks + || BIF_ARG_1 == am_total_run_queue_lengths) { + Uint no = erts_run_queues_len(NULL, 0, BIF_ARG_1 == am_total_active_tasks); + if (IS_USMALL(0, no)) + res = make_small(no); + else { + Eterm *hp = HAlloc(BIF_P, BIG_UINT_HEAP_SIZE); + res = uint_to_big(no, hp); + } + BIF_RET(res); + } else if (BIF_ARG_1 == am_active_tasks + || BIF_ARG_1 == am_run_queue_lengths) { + Eterm res, *hp, **hpp; + Uint sz, *szp; + int no_qs = erts_no_run_queues; + Uint *qszs = erts_alloc(ERTS_ALC_T_TMP,sizeof(Uint)*no_qs*2); + (void) erts_run_queues_len(qszs, 0, BIF_ARG_1 == am_active_tasks); + sz = 0; + szp = &sz; + hpp = NULL; + while (1) { + int i; + for (i = 0; i < no_qs; i++) + qszs[no_qs+i] = erts_bld_uint(hpp, szp, qszs[i]); + res = erts_bld_list(hpp, szp, no_qs, &qszs[no_qs]); + if (hpp) { + erts_free(ERTS_ALC_T_TMP, qszs); + BIF_RET(res); + } + hp = HAlloc(BIF_P, sz); + szp = NULL; + hpp = &hp; + } } else if (BIF_ARG_1 == am_context_switches) { Eterm cs = erts_make_integer(erts_get_total_context_switches(), BIF_P); hp = HAlloc(BIF_P, 3); @@ -3224,7 +3280,7 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1) res = TUPLE2(hp, b1, b2); BIF_RET(res); } else if (BIF_ARG_1 == am_run_queue) { - res = erts_run_queues_len(NULL); + res = erts_run_queues_len(NULL, 1, 0); BIF_RET(make_small(res)); } else if (BIF_ARG_1 == am_wall_clock) { UWord w1, w2; @@ -3244,7 +3300,7 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1) Uint sz, *szp; int no_qs = erts_no_run_queues; Uint *qszs = erts_alloc(ERTS_ALC_T_TMP,sizeof(Uint)*no_qs*2); - (void) erts_run_queues_len(qszs); + (void) erts_run_queues_len(qszs, 0, 0); sz = 0; szp = &sz; hpp = NULL; diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index e47d7bcbbb..839abd0424 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -41,6 +41,7 @@ #include "external.h" #include "packet_parser.h" #include "erl_bits.h" +#include "erl_bif_unique.h" #include "dtrace-wrapper.h" static Port *open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump); @@ -50,10 +51,10 @@ static void free_args(char **); char *erts_default_arg0 = "default"; -BIF_RETTYPE open_port_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_open_port_2(BIF_ALIST_2) { Port *port; - Eterm port_id; + Eterm res; char *str; int err_type, err_num; @@ -61,27 +62,58 @@ BIF_RETTYPE open_port_2(BIF_ALIST_2) if (!port) { if (err_type == -3) { ASSERT(err_num == BADARG || err_num == SYSTEM_LIMIT); - BIF_ERROR(BIF_P, err_num); + if (err_num == BADARG) + res = am_badarg; + else if (err_num == SYSTEM_LIMIT) + res = am_system_limit; + else + /* this is only here to silence gcc, it should not happen */ + BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); } else if (err_type == -2) { str = erl_errno_id(err_num); + res = erts_atom_put((byte *) str, strlen(str), ERTS_ATOM_ENC_LATIN1, 1); } else { - str = "einval"; + res = am_einval; } - BIF_P->fvalue = erts_atom_put((byte *) str, strlen(str), ERTS_ATOM_ENC_LATIN1, 1); - BIF_ERROR(BIF_P, EXC_ERROR); - } + BIF_RET(res); + } + + if (port->drv_ptr->flags & ERL_DRV_FLAG_USE_INIT_ACK) { + /* Copied from erl_port_task.c */ + port->async_open_port = erts_alloc(ERTS_ALC_T_PRTSD, + sizeof(*port->async_open_port)); + erts_make_ref_in_array(port->async_open_port->ref); + port->async_open_port->to = BIF_P->common.id; + + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_MSG_RECEIVE | ERTS_PROC_LOCK_LINK); + if (ERTS_PROC_PENDING_EXIT(BIF_P)) { + /* need to exit caller instead */ + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCKS_MSG_RECEIVE | ERTS_PROC_LOCK_LINK); + KILL_CATCHES(BIF_P); + BIF_P->freason = EXC_EXIT; + erts_port_release(port); + BIF_RET(am_badarg); + } + + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(BIF_P); + BIF_P->msg.save = BIF_P->msg.last; - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCKS_MSG_RECEIVE); + + res = erts_proc_store_ref(BIF_P, port->async_open_port->ref); + } else { + res = port->common.id; + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); + } - port_id = port->common.id; erts_add_link(&ERTS_P_LINKS(port), LINK_PID, BIF_P->common.id); - erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port_id); + erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port->common.id); erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); erts_port_release(port); - BIF_RET(port_id); + BIF_RET(res); } static ERTS_INLINE Port * diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 1093366e08..bda4d5d1c6 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -125,7 +125,7 @@ typedef struct { #define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed) #define ERL_DRV_EXTENDED_MAJOR_VERSION 3 -#define ERL_DRV_EXTENDED_MINOR_VERSION 2 +#define ERL_DRV_EXTENDED_MINOR_VERSION 3 /* * The emulator will refuse to load a driver with a major version @@ -163,6 +163,7 @@ typedef struct { #define ERL_DRV_FLAG_USE_PORT_LOCKING (1 << 0) #define ERL_DRV_FLAG_SOFT_BUSY (1 << 1) #define ERL_DRV_FLAG_NO_BUSY_MSGQ (1 << 2) +#define ERL_DRV_FLAG_USE_INIT_ACK (1 << 3) /* * Integer types @@ -690,6 +691,12 @@ EXTERN char *driver_dl_error(void); EXTERN int erl_drv_putenv(const char *key, char *value); EXTERN int erl_drv_getenv(const char *key, char *value, size_t *value_size); +/* spawn start init ack */ +EXTERN void erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res); + +/* set the pid seen in port_info */ +EXTERN void erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid); + #endif /* !ERL_DRIVER_TYPES_ONLY */ #ifdef WIN32_DYNAMIC_ERL_DRIVER diff --git a/erts/emulator/beam/erl_fun.c b/erts/emulator/beam/erl_fun.c index 4268e2d40a..cff476694c 100644 --- a/erts/emulator/beam/erl_fun.c +++ b/erts/emulator/beam/erl_fun.c @@ -66,6 +66,9 @@ erts_init_fun_table(void) f.cmp = (HCMP_FUN) fun_cmp; f.alloc = (HALLOC_FUN) fun_alloc; f.free = (HFREE_FUN) fun_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; hash_init(ERTS_ALC_T_FUN_TABLE, &erts_fun_table, "fun_table", 16, f); } diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index c50756d56b..f1962e5cac 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -145,6 +145,7 @@ static void offset_rootset(Process *p, Sint offs, char* area, Uint area_size, Eterm* objv, int nobj); static void offset_off_heap(Process* p, Sint offs, char* area, Uint area_size); static void offset_mqueue(Process *p, Sint offs, char* area, Uint area_size); +static void move_msgq_to_heap(Process *p); static void init_gc_info(ErtsGCInfo *gcip); @@ -440,8 +441,15 @@ delay_garbage_collection(Process *p, ErlHeapFragment *live_hf_end, int need) ERTS_HOLE_CHECK(p); - if (p->live_hf_end == ERTS_INVALID_HFRAG_PTR) + if ((p->flags & F_DISABLE_GC) + && p->live_hf_end == ERTS_INVALID_HFRAG_PTR) { + /* + * A BIF yielded with disabled GC. Remember + * heap fragments created by the BIF until we + * do next GC. + */ p->live_hf_end = live_hf_end; + } if (need == 0) return 1; @@ -513,6 +521,14 @@ young_gen_usage(Process *p) Eterm *aheap; hsz = p->mbuf_sz; + + if (p->flags & F_ON_HEAP_MSGQ) { + ErtsMessage *mp; + for (mp = p->msg.first; mp; mp = mp->next) + if (mp->data.attached) + hsz += erts_msg_attached_data_size(mp); + } + aheap = p->abandoned_heap; if (!aheap) hsz += p->htop - p->heap; @@ -564,10 +580,12 @@ garbage_collect(Process* p, ErlHeapFragment *live_hf_end, DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif - if (p->flags & F_DISABLE_GC) + if (p->flags & (F_DISABLE_GC|F_DELAY_GC)) return delay_garbage_collection(p, live_hf_end, need); - if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR) + if (p->abandoned_heap) + live_hf_end = ERTS_INVALID_HFRAG_PTR; + else if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR) live_hf_end = p->live_hf_end; esdp = erts_get_scheduler_data(); @@ -734,6 +752,12 @@ erts_garbage_collect_hibernate(Process* p) p->arg_reg, p->arity); + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, + (p->abandoned_heap + ? p->abandoned_heap + : p->heap), + p->heap_sz * sizeof(Eterm)); + p->heap = heap; p->high_water = htop; p->htop = htop; @@ -1025,10 +1049,13 @@ minor_collection(Process* p, ErlHeapFragment *live_hf_end, do_minor(p, live_hf_end, (char *) mature, mature_size*sizeof(Eterm), new_sz, objv, nobj); + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + new_mature = p->old_htop - prev_old_htop; size_after = new_mature; - size_after += HEAP_TOP(p) - HEAP_START(p); + size_after += HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += (size_before - size_after); ErtsGcQuickSanityCheck(p); @@ -1441,7 +1468,7 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, (p->abandoned_heap ? p->abandoned_heap : HEAP_START(p)), - (HEAP_END(p) - HEAP_START(p)) * sizeof(Eterm)); + p->heap_sz * sizeof(Eterm)); p->abandoned_heap = NULL; p->flags &= ~F_ABANDONED_HEAP_USE; HEAP_START(p) = n_heap; @@ -1452,9 +1479,14 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, HIGH_WATER(p) = HEAP_TOP(p); + remove_message_buffers(p); + + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + ErtsGcQuickSanityCheck(p); - size_after = HEAP_TOP(p) - HEAP_START(p); + size_after = HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += size_before - size_after; adjusted = adjust_after_fullsweep(p, need, objv, nobj); @@ -1462,8 +1494,6 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, #ifdef HARDDEBUG disallow_heap_frag_ref_in_heap(p); #endif - remove_message_buffers(p); - ErtsGcQuickSanityCheck(p); return gc_cost(size_after, adjusted ? size_after : 0); @@ -1991,6 +2021,173 @@ collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end, return n_htop; } +static ERTS_INLINE void +copy_one_frag(Eterm** hpp, ErlOffHeap* off_heap, + ErlHeapFragment *bp, Eterm *refs, int nrefs) +{ + Uint sz; + int i; + Sint offs; + struct erl_off_heap_header* oh; + Eterm *fhp, *hp; + + OH_OVERHEAD(off_heap, bp->off_heap.overhead); + sz = bp->used_size; + + fhp = bp->mem; + hp = *hpp; + offs = hp - fhp; + + oh = NULL; + while (sz--) { + Uint cpy_sz; + Eterm val = *fhp++; + + switch (primary_tag(val)) { + case TAG_PRIMARY_IMMED1: + *hp++ = val; + break; + case TAG_PRIMARY_LIST: + case TAG_PRIMARY_BOXED: + *hp++ = offset_ptr(val, offs); + break; + case TAG_PRIMARY_HEADER: + *hp++ = val; + switch (val & _HEADER_SUBTAG_MASK) { + case ARITYVAL_SUBTAG: + break; + case REFC_BINARY_SUBTAG: + case FUN_SUBTAG: + case EXTERNAL_PID_SUBTAG: + case EXTERNAL_PORT_SUBTAG: + case EXTERNAL_REF_SUBTAG: + oh = (struct erl_off_heap_header*) (hp-1); + cpy_sz = thing_arityval(val); + goto cpy_words; + default: + cpy_sz = header_arity(val); + + cpy_words: + ASSERT(sz >= cpy_sz); + sz -= cpy_sz; + while (cpy_sz >= 8) { + cpy_sz -= 8; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + } + switch (cpy_sz) { + case 7: *hp++ = *fhp++; + case 6: *hp++ = *fhp++; + case 5: *hp++ = *fhp++; + case 4: *hp++ = *fhp++; + case 3: *hp++ = *fhp++; + case 2: *hp++ = *fhp++; + case 1: *hp++ = *fhp++; + default: break; + } + if (oh) { + /* Add to offheap list */ + oh->next = off_heap->first; + off_heap->first = oh; + ASSERT(*hpp <= (Eterm*)oh); + ASSERT(hp > (Eterm*)oh); + oh = NULL; + } + break; + } + break; + } + } + + ASSERT(bp->used_size == hp - *hpp); + *hpp = hp; + + for (i = 0; i < nrefs; i++) { + if (is_not_immed(refs[i])) + refs[i] = offset_ptr(refs[i], offs); + } + bp->off_heap.first = NULL; +} + +static void +move_msgq_to_heap(Process *p) +{ + ErtsMessage **mpp = &p->msg.first; + + while (*mpp) { + ErtsMessage *mp = *mpp; + + if (mp->data.attached) { + ErlHeapFragment *bp; + ErtsHeapFactory factory; + + erts_factory_proc_prealloc_init(&factory, p, + erts_msg_attached_data_size(mp)); + + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (mp->data.dist_ext) { + ASSERT(mp->data.dist_ext->heap_size >= 0); + if (is_not_nil(ERL_MESSAGE_TOKEN(mp))) { + bp = erts_dist_ext_trailer(mp->data.dist_ext); + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + bp->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&bp->off_heap); + } + ERL_MESSAGE_TERM(mp) = erts_decode_dist_ext(&factory, + mp->data.dist_ext); + erts_free_dist_ext_copy(mp->data.dist_ext); + mp->data.dist_ext = NULL; + } + } + else { + + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &mp->hfrag; + else + bp = mp->data.heap_frag; + + if (bp->next) + erts_move_multi_frags(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ, 0); + else + copy_one_frag(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ); + + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + mp->data.heap_frag = NULL; + free_message_buffer(bp); + } + else { + ErtsMessage *tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + tmp->next = mp->next; + if (p->msg.save == &mp->next) + p->msg.save = &tmp->next; + if (p->msg.last == &mp->next) + p->msg.last = &tmp->next; + *mpp = tmp; + mp->next = NULL; + erts_cleanup_messages(mp); + mp = tmp; + } + } + + erts_factory_close(&factory); + } + + mpp = &(*mpp)->next; + } +} + static Uint setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) { @@ -2080,9 +2277,8 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) case F_OFF_HEAP_MSGQ_CHNG: case 0: { /* - * Off heap message queue disabled, i.e. we may - * have references from the message queue to the - * heap... + * We do not have off heap message queue enabled, i.e. we + * need to add message queue to rootset... */ ErtsMessage *mp; @@ -2583,33 +2779,37 @@ offset_mqueue(Process *p, Sint offs, char* area, Uint area_size) { ErtsMessage* mp = p->msg.first; - while (mp != NULL) { - Eterm mesg = ERL_MESSAGE_TERM(mp); - if (is_value(mesg)) { - switch (primary_tag(mesg)) { - case TAG_PRIMARY_LIST: - case TAG_PRIMARY_BOXED: - if (ErtsInArea(ptr_val(mesg), area, area_size)) { - ERL_MESSAGE_TERM(mp) = offset_ptr(mesg, offs); + if ((p->flags & (F_OFF_HEAP_MSGQ|F_OFF_HEAP_MSGQ_CHNG)) != F_OFF_HEAP_MSGQ) { + + while (mp != NULL) { + Eterm mesg = ERL_MESSAGE_TERM(mp); + if (is_value(mesg)) { + switch (primary_tag(mesg)) { + case TAG_PRIMARY_LIST: + case TAG_PRIMARY_BOXED: + if (ErtsInArea(ptr_val(mesg), area, area_size)) { + ERL_MESSAGE_TERM(mp) = offset_ptr(mesg, offs); + } + break; } - break; } - } - mesg = ERL_MESSAGE_TOKEN(mp); - if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { - ERL_MESSAGE_TOKEN(mp) = offset_ptr(mesg, offs); - } + mesg = ERL_MESSAGE_TOKEN(mp); + if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { + ERL_MESSAGE_TOKEN(mp) = offset_ptr(mesg, offs); + } #ifdef USE_VM_PROBES - mesg = ERL_MESSAGE_DT_UTAG(mp); - if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { - ERL_MESSAGE_DT_UTAG(mp) = offset_ptr(mesg, offs); - } + mesg = ERL_MESSAGE_DT_UTAG(mp); + if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { + ERL_MESSAGE_DT_UTAG(mp) = offset_ptr(mesg, offs); + } #endif - ASSERT((is_nil(ERL_MESSAGE_TOKEN(mp)) || - is_tuple(ERL_MESSAGE_TOKEN(mp)) || - is_atom(ERL_MESSAGE_TOKEN(mp)))); - mp = mp->next; + ASSERT((is_nil(ERL_MESSAGE_TOKEN(mp)) || + is_tuple(ERL_MESSAGE_TOKEN(mp)) || + is_atom(ERL_MESSAGE_TOKEN(mp)))); + mp = mp->next; + } + } } diff --git a/erts/emulator/beam/erl_hl_timer.c b/erts/emulator/beam/erl_hl_timer.c index 6853278828..734057a12c 100644 --- a/erts/emulator/beam/erl_hl_timer.c +++ b/erts/emulator/beam/erl_hl_timer.c @@ -2018,7 +2018,7 @@ bif_timer_access_request(void *vreq) static int try_access_sched_remote_btm(ErtsSchedulerData *esdp, Process *c_p, Uint32 sid, - Eterm tref, Uint32 *trefn, + Uint32 *trefn, int async, int cancel, int info, Eterm *resp) { @@ -2078,13 +2078,13 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp, } else { ErtsMessage *mp; - Eterm tag, res, msg; + Eterm tag, res, msg, tref; Uint hsz; Eterm *hp; ErtsProcLocks proc_locks = ERTS_PROC_LOCK_MAIN; ErlOffHeap *ohp; - hsz = 4; + hsz = 4 + REF_THING_SIZE; if (time_left > (Sint64) MAX_SMALL) hsz += ERTS_SINT64_HEAP_SIZE(time_left); @@ -2095,6 +2095,13 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp, else tag = am_read_timer; + write_ref_thing(hp, + trefn[0], + trefn[1], + trefn[2]); + tref = make_internal_ref(hp); + hp += REF_THING_SIZE; + if (time_left < 0) res = am_false; else if (time_left <= (Sint64) MAX_SMALL) @@ -2145,8 +2152,8 @@ access_bif_timer(Process *c_p, Eterm tref, int cancel, int async, int info) info); ERTS_BIF_PREP_RET(ret, res); } - else if (try_access_sched_remote_btm(esdp, c_p, sid, - tref, trefn, + else if (try_access_sched_remote_btm(esdp, c_p, + sid, trefn, async, cancel, info, &res)) { ERTS_BIF_PREP_RET(ret, res); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 296cfdabc3..e3390c2769 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -388,6 +388,7 @@ erl_init(int ncpu, erts_mseg_late_init(); /* Must be after timer (erts_init_time()) and thread initializations */ #endif + erl_sys_late_init(); #ifdef HIPE hipe_mode_switch_init(); /* Must be after init_load/beam_catches/init */ #endif @@ -630,7 +631,8 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w|e> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); - erts_fprintf(stderr, "-xohmq bool set default off_heap_message_queue flag for processes\n"); + erts_fprintf(stderr, "-xmqd val set default message queue data flag for processes,\n"); + erts_fprintf(stderr, " valid values are: off_heap | on_heap | mixed\n"); erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); @@ -2020,15 +2022,21 @@ erl_start(int argc, char **argv) case 'x': { char *sub_param = argv[i]+2; - if (has_prefix("ohmq", sub_param)) { - arg = get_arg(sub_param+4, argv[i+1], &i); - if (sys_strcmp(arg, "true") == 0) - erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; - else if (sys_strcmp(arg, "false") == 0) + if (has_prefix("mqd", sub_param)) { + arg = get_arg(sub_param+3, argv[i+1], &i); + if (sys_strcmp(arg, "mixed") == 0) + erts_default_spo_flags &= ~(SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ); + else if (sys_strcmp(arg, "on_heap") == 0) { erts_default_spo_flags &= ~SPO_OFF_HEAP_MSGQ; + erts_default_spo_flags |= SPO_ON_HEAP_MSGQ; + } + else if (sys_strcmp(arg, "off_heap") == 0) { + erts_default_spo_flags &= ~SPO_ON_HEAP_MSGQ; + erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; + } else { erts_fprintf(stderr, - "Invalid off_heap_message_queue flag: %s\n", arg); + "Invalid message_queue_data flag: %s\n", arg); erts_usage(); } } else { diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 84bee976ff..f7b4bd8041 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -113,9 +113,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "environ", NULL }, #endif { "efile_drv", "address" }, -#if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP) - { "child_status", NULL }, -#endif { "drv_ev_state_grow", NULL, }, { "drv_ev_state", "address" }, { "safe_hash", "address" }, diff --git a/erts/emulator/beam/erl_map.h b/erts/emulator/beam/erl_map.h index be6f791a4e..052fa99f03 100644 --- a/erts/emulator/beam/erl_map.h +++ b/erts/emulator/beam/erl_map.h @@ -180,14 +180,17 @@ typedef struct hashmap_head_s { [one cons cell + one list term in parent node] per key [one header + one boxed term in parent node] per inner node [one header + one size word] for root node + Observed average number of nodes per key is about 0.35. */ -#define HASHMAP_HEAP_SIZE(KEYS,NODES) ((KEYS)*3 + (NODES)*2) +#define HASHMAP_WORDS_PER_KEY 3 +#define HASHMAP_WORDS_PER_NODE 2 #ifdef DEBUG -# define HASHMAP_ESTIMATED_NODE_COUNT(KEYS) (KEYS) +# define HASHMAP_ESTIMATED_TOT_NODE_SIZE(KEYS) \ + (HASHMAP_WORDS_PER_NODE * (KEYS) * 3/10) /* slightly under estimated */ #else -# define HASHMAP_ESTIMATED_NODE_COUNT(KEYS) (2*(KEYS)/5) +# define HASHMAP_ESTIMATED_TOT_NODE_SIZE(KEYS) \ + (HASHMAP_WORDS_PER_NODE * (KEYS) * 4/10) /* slightly over estimated */ #endif #define HASHMAP_ESTIMATED_HEAP_SIZE(KEYS) \ - HASHMAP_HEAP_SIZE(KEYS,HASHMAP_ESTIMATED_NODE_COUNT(KEYS)) - + ((KEYS)*HASHMAP_WORDS_PER_KEY + HASHMAP_ESTIMATED_TOT_NODE_SIZE(KEYS)) #endif diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 1811651a58..b3e74e3e6a 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -630,9 +630,24 @@ erts_try_alloc_message_on_heap(Process *pp, #endif else { in_message_fragment: - - mp = erts_alloc_message(sz, hpp); - *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } *on_heap_p = 0; } @@ -750,7 +765,7 @@ erts_send_message(Process* sender, if (is_immed(DT_UTAG(sender))) utag = DT_UTAG(sender); else - utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp); + utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, ohp); #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) Spreading tag (%T) with " @@ -1022,12 +1037,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p) ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); /* - * This job was first initiated when the process changed - * "off heap message queue" state from false to true. Since - * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the - * state change might have been changed again (multiple times) - * since then. Check users last requested state (the flag - * F_OFF_HEAP_MSGQ), and make the state consistent with that. + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. */ if (!(c_p->flags & F_OFF_HEAP_MSGQ)) @@ -1068,8 +1083,9 @@ change_off_heap_msgq(void *vcohmq) } Eterm -erts_change_off_heap_message_queue_state(Process *c_p, int enable) +erts_change_message_queue_management(Process *c_p, Eterm new_state) { + Eterm res; #ifdef DEBUG if (c_p->flags & F_OFF_HEAP_MSGQ) { @@ -1088,57 +1104,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable) } #endif - if (c_p->flags & F_OFF_HEAP_MSGQ) { - /* Off heap message queue is enabled */ + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + + case F_OFF_HEAP_MSGQ: + res = am_off_heap; - if (!enable) { + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* fall through */ + case am_mixed: c_p->flags &= ~F_OFF_HEAP_MSGQ; /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ - * if a change is ongoing. It will be adjusted when the - * change completes... + * if a off heap change is ongoing. It will be adjusted + * when the change completes... */ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ erts_smp_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_OFF_HEAP_MSGQ); } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; + + switch (new_state) { + case am_on_heap: + break; + case am_mixed: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case 0: + res = am_mixed; + + switch (new_state) { + case am_mixed: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; } + break; - return am_true; /* Old state */ + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; } - /* Off heap message queue is disabled */ + return res; + +change_to_off_heap: - if (enable) { - c_p->flags |= F_OFF_HEAP_MSGQ; + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; /* - * We do not have to schedule a change if - * we have an ongoing change... + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. */ - if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { - ErtsChangeOffHeapMessageQueue *cohmq; - /* - * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait - * thread progress before completing the change in - * order to ensure that all senders observe that - * messages should be passed off heap. When the - * change has completed, GC does not need to inspect - * the message queue at all. - */ - erts_smp_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_OFF_HEAP_MSGQ); - c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; - cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, - sizeof(ErtsChangeOffHeapMessageQueue)); - cohmq->pid = c_p->common.id; - erts_schedule_thr_prgr_later_op(change_off_heap_msgq, - (void *) cohmq, - &cohmq->lop); - } + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); } - return am_false; /* Old state */ + return res; } int @@ -1501,6 +1577,9 @@ void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory, ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); } +/* One static sized heap that must suffice. + No extra heap fragments will be allocated. +*/ void erts_factory_static_init(ErtsHeapFactory* factory, Eterm* hp, Uint size, @@ -1515,6 +1594,23 @@ void erts_factory_static_init(ErtsHeapFactory* factory, factory->off_heap_saved.overhead = factory->off_heap->overhead; } +/* A temporary heap with default buffer allocated/freed by client. + * factory_close is same as factory_undo + */ +void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size, + Uint32 atype) +{ + factory->mode = FACTORY_TMP; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + size; + factory->heap_frags = NULL; + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + factory->off_heap = &factory->off_heap_saved; + factory->alloc_type = atype; +} + /* When we know the term is an immediate and need no heap. */ void erts_factory_dummy_init(ErtsHeapFactory* factory) @@ -1565,6 +1661,7 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) else { /* Fall through */ case FACTORY_HEAP_FRAGS: + case FACTORY_TMP: bp = factory->heap_frags; } @@ -1630,6 +1727,9 @@ void erts_factory_close(ErtsHeapFactory* factory) bp->used_size = factory->hp - bp->mem; } break; + case FACTORY_TMP: + erts_factory_undo(factory); + break; case FACTORY_STATIC: break; case FACTORY_CLOSED: break; default: @@ -1756,8 +1856,20 @@ void erts_factory_undo(ErtsHeapFactory* factory) factory->message->data.heap_frag = factory->heap_frags; erts_cleanup_messages(factory->message); break; + case FACTORY_TMP: case FACTORY_HEAP_FRAGS: - free_message_buffer(factory->heap_frags); + erts_cleanup_offheap(factory->off_heap); + factory->off_heap->first = NULL; + + bp = factory->heap_frags; + while (bp != NULL) { + ErlHeapFragment* next_bp = bp->next; + + ASSERT(bp->off_heap.first == NULL); + ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->alloc_size)); + bp = next_bp; + } break; case FACTORY_CLOSED: break; diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 76387bc34c..60035d15ae 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -58,7 +58,8 @@ typedef struct { FACTORY_HALLOC, FACTORY_MESSAGE, FACTORY_HEAP_FRAGS, - FACTORY_STATIC + FACTORY_STATIC, + FACTORY_TMP } mode; Process* p; Eterm* hp_start; @@ -80,6 +81,7 @@ ErtsMessage *erts_factory_message_create(ErtsHeapFactory *, Process *, ErtsProcLocks *, Uint sz); void erts_factory_selfcontained_message_init(ErtsHeapFactory*, ErtsMessage *, Eterm *); void erts_factory_static_init(ErtsHeapFactory*, Eterm* hp, Uint size, ErlOffHeap*); +void erts_factory_tmp_init(ErtsHeapFactory*, Eterm* hp, Uint size, Uint32 atype); void erts_factory_dummy_init(ErtsHeapFactory*); Eterm* erts_produce_heap(ErtsHeapFactory*, Uint need, Uint xtra); @@ -284,7 +286,7 @@ void erts_cleanup_offheap(ErlOffHeap *offheap); void erts_save_message_in_proc(Process *p, ErtsMessage *msg); Sint erts_move_messages_off_heap(Process *c_p); Sint erts_complete_off_heap_message_queue_change(Process *c_p); -Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); +Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state); int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 74e3e81d6f..13f14adbab 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -752,6 +752,10 @@ erts_set_this_node(Eterm sysname, Uint creation) erts_this_dist_entry = erts_this_node->dist_entry; erts_refc_inc(&erts_this_dist_entry->refc, 2); + + erts_this_node_sysname = erts_this_node_sysname_BUFFER; + erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER), + "%T", sysname); } Uint @@ -783,10 +787,13 @@ void erts_init_node_tables(int dd_sec) erts_smp_rwmtx_init_opt(&erts_node_table_rwmtx, &rwmtx_opt, "node_table"); erts_smp_rwmtx_init_opt(&erts_dist_table_rwmtx, &rwmtx_opt, "dist_table"); - f.hash = (H_FUN) dist_table_hash; - f.cmp = (HCMP_FUN) dist_table_cmp; - f.alloc = (HALLOC_FUN) dist_table_alloc; - f.free = (HFREE_FUN) dist_table_free; + f.hash = (H_FUN) dist_table_hash; + f.cmp = (HCMP_FUN) dist_table_cmp; + f.alloc = (HALLOC_FUN) dist_table_alloc; + f.free = (HFREE_FUN) dist_table_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; hash_init(ERTS_ALC_T_DIST_TABLE, &erts_dist_table, "dist_table", 11, f); f.hash = (H_FUN) node_table_hash; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 64278d2ea0..fb2f2a5407 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -41,6 +41,7 @@ #include "sys.h" #include "hash.h" +#include "erl_alloc.h" #include "erl_process.h" #include "erl_monitors.h" #include "erl_smp.h" diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index acd68ef0ad..fa97707a87 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -187,6 +187,11 @@ struct _erl_drv_port { ErtsPrtSD *psd; /* Port specific data */ int reds; /* Only used while executing driver callbacks */ + + struct { + Eterm to; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; + } *async_open_port; /* Reference used with async open port */ }; @@ -687,7 +692,7 @@ erts_drvport2port_state(ErlDrvPort drvport, erts_aint32_t *statep) Port *prt = ERTS_ErlDrvPort2Port(drvport); erts_aint32_t state; ASSERT(prt); - ERTS_LC_ASSERT(erts_lc_is_emu_thr()); +// ERTS_LC_ASSERT(erts_lc_is_emu_thr()); if (prt == ERTS_INVALID_ERL_DRV_PORT) return ERTS_INVALID_ERL_DRV_PORT; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) @@ -944,4 +949,9 @@ ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *); ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *); +/* + * Signals from ports to ports. Used by sys drivers. + */ +int erl_drv_port_control(Eterm, char, char*, ErlDrvSizeT); + #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index a714068314..9495436ba6 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -335,7 +335,6 @@ static ErtsAlignedSchedulerSleepInfo *aligned_dirty_io_sched_sleep_info; static Uint last_reductions; static Uint last_exact_reductions; -Uint erts_default_process_flags; Eterm erts_system_monitor; Eterm erts_system_monitor_long_gc; Uint erts_system_monitor_long_schedule; @@ -499,9 +498,6 @@ dbg_chk_aux_work_val(erts_aint32_t value) #if HAVE_ERTS_MSEG valid |= ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK; #endif -#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN - valid |= ERTS_SSI_AUX_WORK_CHECK_CHILDREN; -#endif #ifdef ERTS_SSI_AUX_WORK_REAP_PORTS valid |= ERTS_SSI_AUX_WORK_REAP_PORTS; #endif @@ -588,8 +584,6 @@ erts_pre_init_process(void) = "MISC_THR_PRGR"; erts_aux_work_flag_descr[ERTS_SSI_AUX_WORK_MISC_IX] = "MISC"; - erts_aux_work_flag_descr[ERTS_SSI_AUX_WORK_CHECK_CHILDREN_IX] - = "CHECK_CHILDREN"; erts_aux_work_flag_descr[ERTS_SSI_AUX_WORK_SET_TMO_IX] = "SET_TMO"; erts_aux_work_flag_descr[ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK_IX] @@ -677,7 +671,6 @@ erts_init_process(int ncpu, int proc_tab_size, int legacy_proc_tab) last_reductions = 0; last_exact_reductions = 0; - erts_default_process_flags = 0; } void @@ -2097,34 +2090,6 @@ erts_debug_wait_completed(Process *c_p, int flags) } -#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN -void -erts_smp_notify_check_children_needed(void) -{ - int i; - for (i = 0; i < erts_no_schedulers; i++) - set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(i), - ERTS_SSI_AUX_WORK_CHECK_CHILDREN); -#ifdef ERTS_DIRTY_SCHEDULERS - for (i = 0; i < erts_no_dirty_cpu_schedulers; i++) - set_aux_work_flags_wakeup_nob(ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(i), - ERTS_SSI_AUX_WORK_CHECK_CHILDREN); - for (i = 0; i < erts_no_dirty_io_schedulers; i++) - set_aux_work_flags_wakeup_nob(ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(i), - ERTS_SSI_AUX_WORK_CHECK_CHILDREN); -#endif -} - -static ERTS_INLINE erts_aint32_t -handle_check_children(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting) -{ - unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_CHECK_CHILDREN); - erts_check_children(); - return aux_work & ~ERTS_SSI_AUX_WORK_CHECK_CHILDREN; -} - -#endif - static void notify_reap_ports_relb(void) { @@ -2278,10 +2243,6 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work, int waiting) HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_MISC, handle_misc_aux_work); -#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN - HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_CHECK_CHILDREN, - handle_check_children); -#endif HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_SET_TMO, handle_setup_aux_work_timer); @@ -3099,7 +3060,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) } #ifndef ERTS_SMP - if (rq->len != 0 || rq->misc.start) + if (erts_smp_atomic32_read_dirty(&rq->len) != 0 || rq->misc.start) goto sys_woken; #else flgs = erts_smp_atomic32_read_acqb(&ssi->flags); @@ -3198,7 +3159,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) } #ifndef ERTS_SMP - if (rq->len == 0 && !rq->misc.start) + if (erts_smp_atomic32_read_dirty(&rq->len) == 0 && !rq->misc.start) goto sys_aux_work; sys_woken: #else @@ -4915,7 +4876,7 @@ erts_fprintf(stderr, "--------------------------------\n"); rq->out_of_work_count = 0; (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); - rq->max_len = rq->len; + rq->max_len = erts_smp_atomic32_read_dirty(&rq->len); for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { ErtsRunQueueInfo *rqi; rqi = (pix == ERTS_PORT_PRIO_LEVEL @@ -5073,7 +5034,7 @@ wakeup_other_check(ErtsRunQueue *rq, Uint32 flags) { int wo_reds = rq->wakeup_other_reds; if (wo_reds) { - int left_len = rq->len - 1; + int left_len = erts_smp_atomic32_read_dirty(&rq->len) - 1; if (left_len < 1) { int wo_reduce = wo_reds << wakeup_other.dec_shift; wo_reduce &= wakeup_other.dec_mask; @@ -5146,7 +5107,7 @@ wakeup_other_check_legacy(ErtsRunQueue *rq, Uint32 flags) { int wo_reds = rq->wakeup_other_reds; if (wo_reds) { - erts_aint32_t len = rq->len; + erts_aint32_t len = erts_smp_atomic32_read_dirty(&rq->len); if (len < 2) { rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC_LEGACY*wo_reds; if (rq->wakeup_other < 0) @@ -5242,7 +5203,7 @@ runq_supervisor(void *unused) ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); if (ERTS_RUNQ_FLGS_GET(rq) & ERTS_RUNQ_FLG_NONEMPTY) { erts_smp_runq_lock(rq); - if (rq->len != 0) + if (erts_smp_atomic32_read_dirty(&rq->len) != 0) wake_scheduler_on_empty_runq(rq); /* forced wakeup... */ erts_smp_runq_unlock(rq); } @@ -5583,7 +5544,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online } rq->out_of_work_count = 0; rq->max_len = 0; - rq->len = 0; + erts_smp_atomic32_set_nob(&rq->len, 0); rq->wakeup_other = 0; rq->wakeup_other_reds = 0; rq->halt_in_progress = 0; @@ -7952,6 +7913,9 @@ sched_thread_func(void *vesdp) erts_sched_init_time_sup(esdp); + (void) ERTS_RUNQ_FLGS_SET_NOB(esdp->run_queue, + ERTS_RUNQ_FLG_EXEC); + #ifdef ERTS_SMP tse = erts_tse_fetch(); erts_tse_prepare_timed(tse); @@ -8949,24 +8913,39 @@ resume_process_1(BIF_ALIST_1) } Uint -erts_run_queues_len(Uint *qlen) +erts_run_queues_len(Uint *qlen, int atomic_queues_read, int incl_active_sched) { int i = 0; Uint len = 0; - ERTS_ATOMIC_FOREACH_RUNQ(rq, - { - Sint pqlen = 0; - int pix; - for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) - pqlen += RUNQ_READ_LEN(&rq->procs.prio_info[pix].len); + if (atomic_queues_read) + ERTS_ATOMIC_FOREACH_RUNQ(rq, + { + Sint rq_len = (Sint) erts_smp_atomic32_read_dirty(&rq->len); + ASSERT(rq_len >= 0); + if (incl_active_sched + && (ERTS_RUNQ_FLGS_GET_NOB(rq) & ERTS_RUNQ_FLG_EXEC)) { + rq_len++; + } + if (qlen) + qlen[i++] = rq_len; + len += (Uint) rq_len; + } + ); + else { + for (i = 0; i < erts_no_run_queues; i++) { + ErtsRunQueue *rq = ERTS_RUNQ_IX(i); + Sint rq_len = (Sint) erts_smp_atomic32_read_nob(&rq->len); + ASSERT(rq_len >= 0); + if (incl_active_sched + && (ERTS_RUNQ_FLGS_GET_NOB(rq) & ERTS_RUNQ_FLG_EXEC)) { + rq_len++; + } + if (qlen) + qlen[i] = rq_len; + len += (Uint) rq_len; + } - if (pqlen < 0) - pqlen = 0; - if (qlen) - qlen[i++] = pqlen; - len += pqlen; } - ); return len; } @@ -9259,6 +9238,8 @@ Process *schedule(Process *p, int calls) } else { sched_out_proc: + ASSERT(!(p->flags & F_DELAY_GC)); + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); esdp = p->scheduler_data; @@ -9393,8 +9374,10 @@ Process *schedule(Process *p, int calls) if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) { if (flags & ERTS_RUNQ_FLG_SUSPENDED) { + (void) ERTS_RUNQ_FLGS_UNSET_NOB(rq, ERTS_RUNQ_FLG_EXEC); suspend_scheduler(esdp); - flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + flags = ERTS_RUNQ_FLGS_SET_NOB(rq, ERTS_RUNQ_FLG_EXEC); + flags |= ERTS_RUNQ_FLG_EXEC; } if (flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) { flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND); @@ -9483,7 +9466,10 @@ Process *schedule(Process *p, int calls) } #endif + (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_EXEC); scheduler_wait(&fcalls, esdp, rq); + flags = ERTS_RUNQ_FLGS_SET_NOB(rq, ERTS_RUNQ_FLG_EXEC); + flags |= ERTS_RUNQ_FLG_EXEC; #ifdef ERTS_SMP non_empty_runq(rq); @@ -10732,7 +10718,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { - Uint flags = erts_default_process_flags; + Uint flags = 0; ErtsRunQueue *rq = NULL; Process *p; Sint arity; /* Number of arguments. */ @@ -10778,6 +10764,10 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). state |= ERTS_PSFLG_OFF_HEAP_MSGQ; flags |= F_OFF_HEAP_MSGQ; } + else if (so->flags & SPO_ON_HEAP_MSGQ) { + state |= ERTS_PSFLG_ON_HEAP_MSGQ; + flags |= F_ON_HEAP_MSGQ; + } if (!rq) rq = erts_get_runq_proc(parent); @@ -11267,6 +11257,7 @@ erts_cleanup_empty_process(Process* p) static void delete_process(Process* p) { + Eterm *heap; VERBOSE(DEBUG_PROCESSES, ("Removing process: %T\n",p->common.id)); VERBOSE(DEBUG_SHCOPY, ("[pid=%T] delete process: %p %p %p %p\n", p->common.id, HEAP_START(p), HEAP_END(p), OLD_HEAP(p), OLD_HEND(p))); @@ -11293,16 +11284,17 @@ delete_process(Process* p) * Release heaps. Clobber contents in DEBUG build. */ - -#ifdef DEBUG - sys_memset(p->heap, DEBUG_BAD_BYTE, p->heap_sz*sizeof(Eterm)); -#endif - #ifdef HIPE hipe_delete_process(&p->hipe); #endif - ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, (void*) p->heap, p->heap_sz*sizeof(Eterm)); + heap = p->abandoned_heap ? p->abandoned_heap : p->heap; + +#ifdef DEBUG + sys_memset(heap, DEBUG_BAD_BYTE, p->heap_sz*sizeof(Eterm)); +#endif + + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, (void*) heap, p->heap_sz*sizeof(Eterm)); if (p->old_heap != NULL) { #ifdef DEBUG @@ -11321,6 +11313,9 @@ delete_process(Process* p) free_message_buffer(p->mbuf); } + if (p->msg_frag) + erts_cleanup_messages(p->msg_frag); + erts_erase_dicts(p); /* free all pending messages */ diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index a72c5b8ad4..71818b2330 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -170,8 +170,10 @@ extern int erts_sched_thread_suggested_stack_size; (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 5)) #define ERTS_RUNQ_FLG_PROTECTED \ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 6)) +#define ERTS_RUNQ_FLG_EXEC \ + (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 7)) -#define ERTS_RUNQ_FLG_MAX (ERTS_RUNQ_FLG_BASE2 + 7) +#define ERTS_RUNQ_FLG_MAX (ERTS_RUNQ_FLG_BASE2 + 8) #define ERTS_RUNQ_FLGS_MIGRATION_QMASKS \ (ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ @@ -215,6 +217,9 @@ extern int erts_sched_thread_suggested_stack_size; #define ERTS_RUNQ_FLGS_SET(RQ, FLGS) \ ((Uint32) erts_smp_atomic32_read_bor_relb(&(RQ)->flags, \ (erts_aint32_t) (FLGS))) +#define ERTS_RUNQ_FLGS_SET_NOB(RQ, FLGS) \ + ((Uint32) erts_smp_atomic32_read_bor_nob(&(RQ)->flags, \ + (erts_aint32_t) (FLGS))) #define ERTS_RUNQ_FLGS_BSET(RQ, MSK, FLGS) \ ((Uint32) erts_smp_atomic32_read_bset_relb(&(RQ)->flags, \ (erts_aint32_t) (MSK), \ @@ -222,6 +227,9 @@ extern int erts_sched_thread_suggested_stack_size; #define ERTS_RUNQ_FLGS_UNSET(RQ, FLGS) \ ((Uint32) erts_smp_atomic32_read_band_relb(&(RQ)->flags, \ (erts_aint32_t) ~(FLGS))) +#define ERTS_RUNQ_FLGS_UNSET_NOB(RQ, FLGS) \ + ((Uint32) erts_smp_atomic32_read_band_nob(&(RQ)->flags, \ + (erts_aint32_t) ~(FLGS))) #define ERTS_RUNQ_FLGS_GET(RQ) \ ((Uint32) erts_smp_atomic32_read_acqb(&(RQ)->flags)) #define ERTS_RUNQ_FLGS_GET_NOB(RQ) \ @@ -293,7 +301,6 @@ typedef enum { ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN_IX, ERTS_SSI_AUX_WORK_MISC_THR_PRGR_IX, ERTS_SSI_AUX_WORK_MISC_IX, - ERTS_SSI_AUX_WORK_CHECK_CHILDREN_IX, ERTS_SSI_AUX_WORK_SET_TMO_IX, ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK_IX, ERTS_SSI_AUX_WORK_REAP_PORTS_IX, @@ -326,8 +333,6 @@ typedef enum { (((erts_aint32_t) 1) << ERTS_SSI_AUX_WORK_MISC_THR_PRGR_IX) #define ERTS_SSI_AUX_WORK_MISC \ (((erts_aint32_t) 1) << ERTS_SSI_AUX_WORK_MISC_IX) -#define ERTS_SSI_AUX_WORK_CHECK_CHILDREN \ - (((erts_aint32_t) 1) << ERTS_SSI_AUX_WORK_CHECK_CHILDREN_IX) #define ERTS_SSI_AUX_WORK_SET_TMO \ (((erts_aint32_t) 1) << ERTS_SSI_AUX_WORK_SET_TMO_IX) #define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK \ @@ -467,7 +472,7 @@ struct ErtsRunQueue_ { int full_reds_history[ERTS_FULL_REDS_HISTORY_SIZE]; int out_of_work_count; erts_aint32_t max_len; - erts_aint32_t len; + erts_smp_atomic32_t len; int wakeup_other; int wakeup_other_reds; int halt_in_progress; @@ -715,7 +720,19 @@ erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - len = erts_smp_atomic32_read_nob(&rqi->len); + len = erts_smp_atomic32_read_dirty(&rq->len); + +#ifdef ERTS_SMP + if (len == 0) + erts_non_empty_runq(rq); +#endif + len++; + if (rq->max_len < len) + rq->max_len = len; + ASSERT(len > 0); + erts_smp_atomic32_set_nob(&rq->len, len); + + len = erts_smp_atomic32_read_dirty(&rqi->len); ASSERT(len >= 0); if (len == 0) { ASSERT((erts_smp_atomic32_read_nob(&rq->flags) @@ -728,15 +745,6 @@ erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) rqi->max_len = len; erts_smp_atomic32_set_relb(&rqi->len, len); - -#ifdef ERTS_SMP - if (rq->len == 0) - erts_non_empty_runq(rq); -#endif - rq->len++; - if (rq->max_len < rq->len) - rq->max_len = len; - ASSERT(rq->len > 0); } ERTS_GLB_INLINE void @@ -746,7 +754,12 @@ erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - len = erts_smp_atomic32_read_nob(&rqi->len); + len = erts_smp_atomic32_read_dirty(&rq->len); + len--; + ASSERT(len >= 0); + erts_smp_atomic32_set_nob(&rq->len, len); + + len = erts_smp_atomic32_read_dirty(&rqi->len); len--; ASSERT(len >= 0); if (len == 0) { @@ -757,8 +770,6 @@ erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) } erts_smp_atomic32_set_relb(&rqi->len, len); - rq->len--; - ASSERT(rq->len >= 0); } ERTS_GLB_INLINE void @@ -768,7 +779,7 @@ erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - len = erts_smp_atomic32_read_nob(&rqi->len); + len = erts_smp_atomic32_read_dirty(&rqi->len); ASSERT(rqi->max_len >= len); rqi->max_len = len; } @@ -1141,14 +1152,15 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) #define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18) +#define ERTS_PSFLG_ON_HEAP_MSGQ ERTS_PSFLG_BIT(19) #ifdef ERTS_DIRTY_SCHEDULERS -#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22) -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 23) +#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(22) +#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(23) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 24) #else -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 19) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 20) #endif #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ @@ -1197,6 +1209,7 @@ void erts_check_for_holes(Process* p); #define SPO_MONITOR 4 #define SPO_SYSTEM_PROC 8 #define SPO_OFF_HEAP_MSGQ 16 +#define SPO_ON_HEAP_MSGQ 32 extern int erts_default_spo_flags; @@ -1244,7 +1257,6 @@ Eterm* erts_heap_alloc(Process* p, Uint need, Uint xtra); Eterm* erts_set_hole_marker(Eterm* ptr, Uint sz); #endif -extern Uint erts_default_process_flags; extern erts_smp_rwmtx_t erts_cpu_bind_rwmtx; /* If any of the erts_system_monitor_* variables are set (enabled), ** erts_system_monitor must be != NIL, to allow testing on just @@ -1285,10 +1297,23 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ -#define F_DISABLE_GC (1 << 11) /* Disable GC */ +#define F_DISABLE_GC (1 << 11) /* Disable GC (see below) */ #define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */ -#define F_OFF_HEAP_MSGQ_CHNG (1 << 13) /* Off heap msg queue changing */ -#define F_ABANDONED_HEAP_USE (1 << 14) /* Have usage of abandoned heap */ +#define F_ON_HEAP_MSGQ (1 << 13) /* Off heap msg queue */ +#define F_OFF_HEAP_MSGQ_CHNG (1 << 14) /* Off heap msg queue changing */ +#define F_ABANDONED_HEAP_USE (1 << 15) /* Have usage of abandoned heap */ +#define F_DELAY_GC (1 << 16) /* Similar to disable GC (see below) */ + +/* + * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent + * GC of the process, but it is important to use the right + * one: + * - F_DISABLE_GC should *only* be used by BIFs. This when + * the BIF needs to yield while preventig a GC. + * - F_DELAY_GC should only be used when GC is temporarily + * disabled while the process is scheduled. A process must + * not be scheduled out while F_DELAY_GC is set. + */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1652,11 +1677,8 @@ Eterm erts_multi_scheduling_blockers(Process *); void erts_start_schedulers(void); void erts_alloc_notify_delayed_dealloc(int); void erts_alloc_ensure_handle_delayed_dealloc_call(int); -#ifdef ERTS_SMP void erts_notify_canceled_timer(ErtsSchedulerData *, int); #endif -void erts_smp_notify_check_children_needed(void); -#endif #if ERTS_USE_ASYNC_READY_Q void erts_notify_check_async_ready_queue(void *); #endif @@ -1676,7 +1698,7 @@ void erts_sched_notify_check_cpu_bind(void); Uint erts_active_schedulers(void); void erts_init_process(int, int, int); Eterm erts_process_status(Process *, ErtsProcLocks, Process *, Eterm); -Uint erts_run_queues_len(Uint *); +Uint erts_run_queues_len(Uint *, int, int); void erts_add_to_runq(Process *); Eterm erts_bound_schedulers_term(Process *c_p); Eterm erts_get_cpu_topology_term(Process *c_p, Eterm which); diff --git a/erts/emulator/beam/erl_process_dict.c b/erts/emulator/beam/erl_process_dict.c index f82cad745a..da9ebd92ab 100644 --- a/erts/emulator/beam/erl_process_dict.c +++ b/erts/emulator/beam/erl_process_dict.c @@ -53,14 +53,17 @@ /* Hash utility macros */ #define HASH_RANGE(PDict) ((PDict)->homeSize + (PDict)->splitPosition) -#define MAKE_HASH(Term) \ -((is_small(Term)) ? unsigned_val(Term) : \ - ((is_atom(Term)) ? \ - (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ - make_hash2(Term))) +#define MAKE_HASH(Term) \ + ((is_small(Term)) ? unsigned_val(Term) : \ + ((is_atom(Term)) ? \ + (atom_tab(atom_val(Term))->slot.bucket.hvalue) : \ + make_internal_hash(Term))) #define PD_SZ2BYTES(Sz) (sizeof(ProcDict) + ((Sz) - 1)*sizeof(Eterm)) +#define pd_hash_value(Pdict, Key) \ + pd_hash_value_to_ix(Pdict, MAKE_HASH((Key))) + /* Memory allocation macros */ #define PD_ALLOC(Sz) \ erts_alloc(ERTS_ALC_T_PROC_DICT, (Sz)) @@ -82,6 +85,7 @@ */ static void pd_hash_erase(Process *p, Eterm id, Eterm *ret); static void pd_hash_erase_all(Process *p); +static Eterm pd_hash_get_with_hval(Process *p, Eterm bucket, Eterm id); static Eterm pd_hash_get_keys(Process *p, Eterm value); static Eterm pd_hash_get_all_keys(Process *p, ProcDict *pd); static Eterm pd_hash_get_all(Process *p, ProcDict *pd); @@ -93,7 +97,7 @@ static void grow(Process *p); static void array_shrink(ProcDict **ppd, unsigned int need); static Eterm array_put(ProcDict **ppdict, unsigned int ndx, Eterm term); -static unsigned int pd_hash_value(ProcDict *pdict, Eterm term); +static unsigned int pd_hash_value_to_ix(ProcDict *pdict, Uint32 hx); static unsigned int next_array_size(unsigned int need); /* @@ -390,40 +394,55 @@ static void pd_hash_erase_all(Process *p) } } +Eterm erts_pd_hash_get_with_hx(Process *p, Uint32 hx, Eterm id) +{ + unsigned int hval; + ProcDict *pd = p->dictionary; + + if (pd == NULL) + return am_undefined; + hval = pd_hash_value_to_ix(pd, hx); + return pd_hash_get_with_hval(p, ARRAY_GET(pd, hval), id); +} + Eterm erts_pd_hash_get(Process *p, Eterm id) { unsigned int hval; - Eterm tmp; ProcDict *pd = p->dictionary; if (pd == NULL) return am_undefined; hval = pd_hash_value(pd, id); - tmp = ARRAY_GET(pd, hval); - if (is_boxed(tmp)) { /* Tuple */ - ASSERT(is_tuple(tmp)); - if (EQ(tuple_val(tmp)[1], id)) { - return tuple_val(tmp)[2]; + return pd_hash_get_with_hval(p, ARRAY_GET(pd, hval), id); +} + +Eterm pd_hash_get_with_hval(Process *p, Eterm bucket, Eterm id) +{ + if (is_boxed(bucket)) { /* Tuple */ + ASSERT(is_tuple(bucket)); + if (EQ(tuple_val(bucket)[1], id)) { + return tuple_val(bucket)[2]; } - } else if (is_list(tmp)) { - for (; tmp != NIL && !EQ(tuple_val(TCAR(tmp))[1], id); tmp = TCDR(tmp)) { + } else if (is_list(bucket)) { + for (; bucket != NIL && !EQ(tuple_val(TCAR(bucket))[1], id); bucket = TCDR(bucket)) { ; } - if (tmp != NIL) { - return tuple_val(TCAR(tmp))[2]; + if (bucket != NIL) { + return tuple_val(TCAR(bucket))[2]; } - } else if (is_not_nil(tmp)) { + } else if (is_not_nil(bucket)) { #ifdef DEBUG erts_fprintf(stderr, "Process dictionary for process %T is broken, trying to " "display term found in line %d:\n" - "%T\n", p->common.id, __LINE__, tmp); + "%T\n", p->common.id, __LINE__, bucket); #endif erl_exit(1, "Damaged process dictionary found during get/1."); } return am_undefined; } + #define PD_GET_TKEY(Dst,Src) \ do { \ ASSERT(is_tuple((Src))); \ @@ -932,17 +951,16 @@ static Eterm array_put(ProcDict **ppdict, unsigned int ndx, Eterm term) ** Basic utilities */ -static unsigned int pd_hash_value(ProcDict *pdict, Eterm term) +static unsigned int pd_hash_value_to_ix(ProcDict *pdict, Uint32 hx) { - Uint hash, high; - - hash = MAKE_HASH(term); - high = hash % (pdict->homeSize*2); + Uint high; + high = hx % (pdict->homeSize*2); if (high >= HASH_RANGE(pdict)) - return hash % pdict->homeSize; + return hx % pdict->homeSize; return high; } + static unsigned int next_array_size(unsigned int need) { static unsigned int tab[] = diff --git a/erts/emulator/beam/erl_process_dict.h b/erts/emulator/beam/erl_process_dict.h index cc53800eb5..9aa21b7c38 100644 --- a/erts/emulator/beam/erl_process_dict.h +++ b/erts/emulator/beam/erl_process_dict.h @@ -39,5 +39,6 @@ void erts_deep_dictionary_dump(int to, void *to_arg, Eterm erts_dictionary_copy(struct process *p, ProcDict *pd); Eterm erts_pd_hash_get(struct process *p, Eterm id); +Eterm erts_pd_hash_get_with_hx(Process *p, Uint32 hx, Eterm id); #endif diff --git a/erts/emulator/beam/export.c b/erts/emulator/beam/export.c index 2420df36b5..581efe6eec 100644 --- a/erts/emulator/beam/export.c +++ b/erts/emulator/beam/export.c @@ -184,6 +184,9 @@ init_export_table(void) f.cmp = (HCMP_FUN) export_cmp; f.alloc = (HALLOC_FUN) export_alloc; f.free = (HFREE_FUN) export_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; for (i=0; i<ERTS_NUM_CODE_IX; i++) { erts_index_init(ERTS_ALC_T_EXPORT_TABLE, &export_tables[i], "export_list", diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 98c275a20c..0bf5988244 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1492,6 +1492,7 @@ extern void erts_match_prog_foreach_offheap(Binary *b, extern erts_driver_t vanilla_driver; extern erts_driver_t spawn_driver; +extern erts_driver_t forker_driver; extern erts_driver_t fd_driver; int erts_beam_jump_table(void); diff --git a/erts/emulator/beam/hash.c b/erts/emulator/beam/hash.c index e0fde337f2..5a0b93f693 100644 --- a/erts/emulator/beam/hash.c +++ b/erts/emulator/beam/hash.c @@ -27,8 +27,6 @@ #endif #include "sys.h" -#include "erl_vm.h" -#include "global.h" #include "hash.h" /* @@ -66,6 +64,7 @@ void hash_get_info(HashInfo *hi, Hash *h) int i; int max_depth = 0; int objects = 0; + int used = 0; for (i = 0; i < size; i++) { int depth = 0; @@ -76,14 +75,18 @@ void hash_get_info(HashInfo *hi, Hash *h) depth++; b = b->next; } - if (depth > max_depth) - max_depth = depth; + if (depth) { + used++; + if (depth > max_depth) + max_depth = depth; + } } + ASSERT(objects == h->nobjs); hi->name = h->name; hi->size = h->size; - hi->used = h->used; - hi->objs = objects; + hi->used = used; + hi->objs = h->nobjs; hi->depth = max_depth; } @@ -98,11 +101,11 @@ void hash_info(int to, void *arg, Hash* h) hash_get_info(&hi, h); - erts_print(to, arg, "=hash_table:%s\n", hi.name); - erts_print(to, arg, "size: %d\n", hi.size); - erts_print(to, arg, "used: %d\n", hi.used); - erts_print(to, arg, "objs: %d\n", hi.objs); - erts_print(to, arg, "depth: %d\n", hi.depth); + h->fun.meta_print(to, arg, "=hash_table:%s\n", hi.name); + h->fun.meta_print(to, arg, "size: %d\n", hi.size); + h->fun.meta_print(to, arg, "used: %d\n", hi.used); + h->fun.meta_print(to, arg, "objs: %d\n", hi.objs); + h->fun.meta_print(to, arg, "depth: %d\n", hi.depth); } @@ -119,47 +122,56 @@ hash_table_sz(Hash *h) } +static ERTS_INLINE void set_thresholds(Hash* h) +{ + h->grow_threshold = (8*h->size)/5; /* grow at 160% load */ + if (h->size_ix > h->min_size_ix) + h->shrink_threshold = h->size / 5; /* shrink at 20% load */ + else + h->shrink_threshold = -1; /* never shrink below inital size */ +} + /* ** init a pre allocated or static hash structure ** and allocate buckets. */ -Hash* hash_init(ErtsAlcType_t type, Hash* h, char* name, int size, HashFunctions fun) +Hash* hash_init(int type, Hash* h, char* name, int size, HashFunctions fun) { int sz; int ix = 0; - h->type = type; + h->meta_alloc_type = type; while (h_size_table[ix] != -1 && h_size_table[ix] < size) ix++; if (h_size_table[ix] == -1) - erl_exit(1, "panic: too large hash table size (%d)\n", size); + return NULL; size = h_size_table[ix]; sz = size*sizeof(HashBucket*); - h->bucket = (HashBucket**) erts_alloc(h->type, sz); + h->bucket = (HashBucket**) fun.meta_alloc(h->meta_alloc_type, sz); sys_memzero(h->bucket, sz); h->is_allocated = 0; h->name = name; h->fun = fun; h->size = size; - h->size20percent = h->size/5; - h->size80percent = (4*h->size)/5; - h->ix = ix; - h->used = 0; + h->size_ix = ix; + h->min_size_ix = ix; + h->nobjs = 0; + set_thresholds(h); return h; } /* ** Create a new hash table */ -Hash* hash_new(ErtsAlcType_t type, char* name, int size, HashFunctions fun) +Hash* hash_new(int type, char* name, int size, HashFunctions fun) { Hash* h; - h = erts_alloc(type, sizeof(Hash)); + h = fun.meta_alloc(type, sizeof(Hash)); h = hash_init(type, h, name, size, fun); h->is_allocated = 1; @@ -183,9 +195,9 @@ void hash_delete(Hash* h) b = b_next; } } - erts_free(h->type, h->bucket); + h->fun.meta_free(h->meta_alloc_type, h->bucket); if (h->is_allocated) - erts_free(h->type, (void*) h); + h->fun.meta_free(h->meta_alloc_type, (void*) h); } /* @@ -199,39 +211,34 @@ static void rehash(Hash* h, int grow) int i; if (grow) { - if ((h_size_table[h->ix+1]) == -1) + if ((h_size_table[h->size_ix+1]) == -1) return; - h->ix++; + h->size_ix++; } else { - if (h->ix == 0) + if (h->size_ix == 0) return; - h->ix--; + h->size_ix--; } - h->size = h_size_table[h->ix]; - h->size20percent = h->size/5; - h->size80percent = (4*h->size)/5; + h->size = h_size_table[h->size_ix]; sz = h->size*sizeof(HashBucket*); - new_bucket = (HashBucket **) erts_alloc(h->type, sz); + new_bucket = (HashBucket **) h->fun.meta_alloc(h->meta_alloc_type, sz); sys_memzero(new_bucket, sz); - h->used = 0; - for (i = 0; i < old_size; i++) { HashBucket* b = h->bucket[i]; while (b != (HashBucket*) 0) { HashBucket* b_next = b->next; int ix = b->hvalue % h->size; - if (new_bucket[ix] == NULL) - h->used++; b->next = new_bucket[ix]; new_bucket[ix] = b; b = b_next; } } - erts_free(h->type, (void *) h->bucket); + h->fun.meta_free(h->meta_alloc_type, (void *) h->bucket); h->bucket = new_bucket; + set_thresholds(h); } /* @@ -268,68 +275,15 @@ void* hash_put(Hash* h, void* tmpl) } b = (HashBucket*) h->fun.alloc(tmpl); - if (h->bucket[ix] == NULL) - h->used++; - b->hvalue = hval; b->next = h->bucket[ix]; h->bucket[ix] = b; - if (h->used > h->size80percent) /* rehash at 80% */ + if (++h->nobjs > h->grow_threshold) rehash(h, 1); return (void*) b; } -static void -hash_insert_entry(Hash* h, HashBucket* entry) -{ - HashValue hval = entry->hvalue; - int ix = hval % h->size; - HashBucket* b = h->bucket[ix]; - - while (b != (HashBucket*) 0) { - if ((b->hvalue == hval) && (h->fun.cmp((void*)entry, (void*)b) == 0)) { - abort(); /* Should not happen */ - } - b = b->next; - } - - if (h->bucket[ix] == NULL) - h->used++; - - entry->next = h->bucket[ix]; - h->bucket[ix] = entry; - - if (h->used > h->size80percent) /* rehash at 80% */ - rehash(h, 1); -} - - -/* - * Move all entries in src into dst; empty src. - * Entries in src must not exist in dst. - */ -void -erts_hash_merge(Hash* src, Hash* dst) -{ - int limit = src->size; - HashBucket** bucket = src->bucket; - int i; - - src->used = 0; - for (i = 0; i < limit; i++) { - HashBucket* b = bucket[i]; - HashBucket* next; - - bucket[i] = NULL; - while (b) { - next = b->next; - hash_insert_entry(dst, b); - b = next; - } - } -} - /* ** Erase hash entry return template if erased ** return 0 if not erased @@ -348,9 +302,7 @@ void* hash_erase(Hash* h, void* tmpl) else h->bucket[ix] = b->next; h->fun.free((void*)b); - if (h->bucket[ix] == NULL) - h->used--; - if (h->used < h->size20percent) /* rehash at 20% */ + if (--h->nobjs < h->shrink_threshold) rehash(h, 0); return tmpl; } @@ -381,9 +333,7 @@ hash_remove(Hash *h, void *tmpl) prev->next = b->next; else h->bucket[ix] = b->next; - if (h->bucket[ix] == NULL) - h->used--; - if (h->used < h->size20percent) /* rehash at 20% */ + if (--h->nobjs < h->shrink_threshold) rehash(h, 0); return (void *) b; } diff --git a/erts/emulator/beam/hash.h b/erts/emulator/beam/hash.h index 87fdb360e3..e94aaa0a84 100644 --- a/erts/emulator/beam/hash.h +++ b/erts/emulator/beam/hash.h @@ -29,14 +29,17 @@ #include "sys.h" #endif -#include "erl_alloc.h" - typedef unsigned long HashValue; +typedef struct hash Hash; typedef int (*HCMP_FUN)(void*, void*); typedef HashValue (*H_FUN)(void*); typedef void* (*HALLOC_FUN)(void*); typedef void (*HFREE_FUN)(void*); +/* Meta functions */ +typedef void* (*HMALLOC_FUN)(int,size_t); +typedef void (*HMFREE_FUN)(int,void*); +typedef int (*HMPRINT_FUN)(int,void*,char*, ...); /* ** This bucket must be placed in top of @@ -55,6 +58,9 @@ typedef struct hash_functions HCMP_FUN cmp; HALLOC_FUN alloc; HFREE_FUN free; + HMALLOC_FUN meta_alloc; + HMFREE_FUN meta_free; + HMPRINT_FUN meta_print; } HashFunctions; typedef struct { @@ -65,22 +71,23 @@ typedef struct { int depth; } HashInfo; -typedef struct hash +struct hash { HashFunctions fun; /* Function block */ int is_allocated; /* 0 iff hash structure is on stack or is static */ - ErtsAlcType_t type; + int meta_alloc_type; /* argument to pass to meta_alloc and meta_free */ char* name; /* Table name (static string, for debugging) */ int size; /* Number of slots */ - int size20percent; /* 20 percent of number of slots */ - int size80percent; /* 80 percent of number of slots */ - int ix; /* Size index in size table */ - int used; /* Number of slots used */ + int shrink_threshold; + int grow_threshold; + int size_ix; /* Size index in size table */ + int min_size_ix; /* Never shrink table smaller than this */ + int nobjs; /* Number of objects in table */ HashBucket** bucket; /* Vector of bucket pointers (objects) */ -} Hash; +}; -Hash* hash_new(ErtsAlcType_t, char*, int, HashFunctions); -Hash* hash_init(ErtsAlcType_t, Hash*, char*, int, HashFunctions); +Hash* hash_new(int, char*, int, HashFunctions); +Hash* hash_init(int, Hash*, char*, int, HashFunctions); void hash_delete(Hash*); void hash_get_info(HashInfo*, Hash*); @@ -93,6 +100,4 @@ void* hash_erase(Hash*, void*); void* hash_remove(Hash*, void*); void hash_foreach(Hash*, void (*func)(void *, void *), void *); -void erts_hash_merge(Hash* src, Hash* dst); - #endif diff --git a/erts/emulator/beam/index.h b/erts/emulator/beam/index.h index 14fab41026..99b2bdfab0 100644 --- a/erts/emulator/beam/index.h +++ b/erts/emulator/beam/index.h @@ -30,6 +30,10 @@ #include "hash.h" #endif +#ifndef ERL_ALLOC_H__ +#include "erl_alloc.h" +#endif + typedef struct index_slot { HashBucket bucket; diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 56a04f9b7f..93c591b124 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -54,6 +54,9 @@ extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; extern ErlDrvEntry spawn_driver_entry; +#ifndef __WIN32__ +extern ErlDrvEntry forker_driver_entry; +#endif extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */ @@ -71,6 +74,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; erts_driver_t vanilla_driver; erts_driver_t spawn_driver; +#ifndef __WIN32__ +erts_driver_t forker_driver; +#endif erts_driver_t fd_driver; int erts_port_synchronous_ops = 0; @@ -84,6 +90,7 @@ static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); static void pdl_init(void); +static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof); #ifdef ERTS_SMP static void driver_monitor_lock_pdl(Port *p); static void driver_monitor_unlock_pdl(Port *p); @@ -305,12 +312,9 @@ static Port *create_port(char *name, size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; -#ifdef DEBUG - /* Make sure the debug flags survives until port is freed */ - state |= ERTS_PORT_SFLG_PORT_DEBUG; -#endif #ifdef ERTS_SMP + ErtsRunQueue *runq; if (!driver_lock) { /* Align size for mutex following port struct */ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); @@ -320,6 +324,12 @@ static Port *create_port(char *name, #endif port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); +#ifdef DEBUG + /* Make sure the debug flags survives until port is freed */ + state |= ERTS_PORT_SFLG_PORT_DEBUG; +#endif + + busy_port_queue_size = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) ? 0 @@ -355,8 +365,12 @@ static Port *create_port(char *name, p += sizeof(erts_mtx_t); state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - erts_smp_atomic_set_nob(&prt->run_queue, - (erts_aint_t) erts_get_runq_current(NULL)); + if (erts_get_scheduler_data()) + runq = erts_get_runq_current(NULL); + else + runq = ERTS_RUNQ_IX(0); + erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); + prt->xports = NULL; #else erts_atomic32_init_nob(&prt->refc, 1); @@ -383,6 +397,7 @@ static Port *create_port(char *name, ERTS_PTMR_INIT(prt); erts_port_task_handle_init(&prt->timeout_task); prt->psd = NULL; + prt->async_open_port = NULL; prt->drv_data = (SWord) 0; prt->os_pid = -1; @@ -464,6 +479,11 @@ erts_port_free(Port *prt) erts_port_task_fini_sched(&prt->sched); + if (prt->async_open_port) { + erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port); + prt->async_open_port = NULL; + } + #ifdef ERTS_SMP ASSERT(prt->lock); if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) @@ -1525,6 +1545,26 @@ erts_schedule_proc2port_signal(Process *c_p, return ERTS_PORT_OP_SCHEDULED; } +static int +erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp, + int task_flags, + ErtsProc2PortSigCallback callback) +{ + Port *prt = erts_port_lookup_raw(port_num); + + if (!prt) + return -1; + + sigdp->caller = ERTS_INVALID_PID; + + return erts_port_task_schedule(prt->common.id, + NULL, + ERTS_PORT_TASK_PROC_SIG, + sigdp, + callback, + task_flags); +} + static ERTS_INLINE void send_badsig(Port *prt) { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; @@ -2360,6 +2400,11 @@ erts_port_exit(Process *c_p, | ERTS_PORT_SIG_FLG_BROKEN_LINK | ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0); +#ifndef __WIN32__ + if (prt->drv_ptr == &forker_driver) + return ERTS_PORT_OP_DROPPED; +#endif + if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) { ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, @@ -2724,6 +2769,72 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) port_sig_link); } +static void +init_ack_send_reply(Port *port, Eterm resp) +{ + + if (!is_internal_port(resp)) { + Process *rp = erts_proc_lookup_raw(port->async_open_port->to); + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to); + erts_remove_link(&ERTS_P_LINKS(rp), port->common.id); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + } + port_sched_op_reply(port->async_open_port->to, + port->async_open_port->ref, + resp); + + erts_free(ERTS_ALC_T_PRTSD, port->async_open_port); + port->async_open_port = NULL; +} + +void +erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) { + Port *port = erts_drvport2port(ix); + SWord err_type = (SWord)res; + Eterm resp; + + if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port) + return; + + if (port->async_open_port) { + switch(err_type) { + case -3: + resp = am_badarg; + break; + case -2: { + char *str = erl_errno_id(errno); + resp = erts_atom_put((byte *) str, strlen(str), + ERTS_ATOM_ENC_LATIN1, 1); + break; + } + case -1: + resp = am_einval; + break; + default: + resp = port->common.id; + break; + } + + init_ack_send_reply(port, resp); + + if (err_type == -1 || err_type == -2 || err_type == -3) + driver_failure_term(ix, am_normal, 0); + port->drv_data = err_type; + } +} + +void +erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) { + Port *port = erts_drvport2port(ix); + + if (port == ERTS_INVALID_ERL_DRV_PORT) + return; + + port->os_pid = (SWord)pid; + +} + void erts_init_io(int port_tab_size, int port_tab_size_ignore_files, int legacy_port_tab) @@ -2786,6 +2897,9 @@ void erts_init_io(int port_tab_size, init_driver(&fd_driver, &fd_driver_entry, NULL); init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); init_driver(&spawn_driver, &spawn_driver_entry, NULL); +#ifndef __WIN32__ + init_driver(&forker_driver, &forker_driver_entry, NULL); +#endif erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -2847,6 +2961,9 @@ void erts_lcnt_enable_io_lock_count(int enable) { lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); +#ifndef __WIN32__ + lcnt_enable_drv_lock_count(&forker_driver, enable); +#endif lcnt_enable_drv_lock_count(&fd_driver, enable); /* enable lock counting in all drivers */ for (dp = driver_list; dp; dp = dp->next) { @@ -3885,7 +4002,7 @@ port_sig_control(Port *prt, Uint hsz, rsz; int control_flags; - rp = erts_proc_lookup_raw(sigdp->caller); + rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller); if (!rp) goto done; @@ -3921,7 +4038,8 @@ port_sig_control(Port *prt, /* failure */ - port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + if (sigdp->caller != ERTS_INVALID_PID) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); done: @@ -3931,6 +4049,23 @@ done: return ERTS_PORT_REDS_CONTROL; } +/* + * This is an asynchronous control call. I.e. it will not return anything + * to the caller. + */ +int +erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size) +{ + ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data(); + + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY; + sigdp->u.control.binp = NULL; + sigdp->u.control.command = cmd; + sigdp->u.control.bufp = buff; + sigdp->u.control.size = size; + + return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control); +} ErtsPortOpResult erts_port_control(Process* c_p, @@ -4702,6 +4837,10 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "Port is a file: %s\n",p->name); } else if (p->drv_ptr == &spawn_driver) { erts_print(to, arg, "Port controls external process: %s\n",p->name); +#ifndef __WIN32__ + } else if (p->drv_ptr == &forker_driver) { + erts_print(to, arg, "Port controls forker process: %s\n",p->name); +#endif } else { erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name); } @@ -6933,6 +7072,9 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (prt->async_open_port) + init_ack_send_reply(prt, prt->common.id); if (eof) flush_linebuf_messages(prt, state); if (state & ERTS_PORT_SFLG_CLOSING) { diff --git a/erts/emulator/beam/module.c b/erts/emulator/beam/module.c index f6794c012f..f5c7b177d3 100644 --- a/erts/emulator/beam/module.c +++ b/erts/emulator/beam/module.c @@ -103,6 +103,9 @@ void init_module_table(void) f.cmp = (HCMP_FUN) module_cmp; f.alloc = (HALLOC_FUN) module_alloc; f.free = (HFREE_FUN) module_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; for (i = 0; i < ERTS_NUM_CODE_IX; i++) { erts_index_init(ERTS_ALC_T_MODULE_TABLE, &module_tables[i], "module_code", diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 46fefb88af..081c4108a0 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -1036,7 +1036,7 @@ call_bif e bif0 u$bif:erlang:self/0 Dst=d => self Dst bif0 u$bif:erlang:node/0 Dst=d => node Dst -bif1 Fail Bif=u$bif:erlang:get/1 Src=s Dst=d => i_get Src Dst +bif1 Fail Bif=u$bif:erlang:get/1 Src=s Dst=d => gen_get(Src, Dst) bif2 Jump=j u$bif:erlang:element/2 S1=s S2=xy Dst=d => gen_element(Jump, S1, S2, Dst) @@ -1045,6 +1045,7 @@ bif1 p Bif S1 Dst => bif1_body Bif S1 Dst bif2 p Bif S1 S2 Dst => i_bif2_body Bif S1 S2 Dst bif2 Fail Bif S1 S2 Dst => i_bif2 Fail Bif S1 S2 Dst +i_get_hash c I d i_get s d %macro: self Self diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c index 7ade8bca0f..fdb6cbc813 100644 --- a/erts/emulator/beam/register.c +++ b/erts/emulator/beam/register.c @@ -151,6 +151,9 @@ void init_register_table(void) f.cmp = (HCMP_FUN) reg_cmp; f.alloc = (HALLOC_FUN) reg_alloc; f.free = (HFREE_FUN) reg_free; + f.meta_alloc = (HMALLOC_FUN) erts_alloc; + f.meta_free = (HMFREE_FUN) erts_free; + f.meta_print = (HMPRINT_FUN) erts_print; hash_init(ERTS_ALC_T_REG_TABLE, &process_reg, "process_reg", PREG_HASH_SIZE, f); diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index 2170d416c8..53f8313daa 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -87,15 +87,6 @@ # define NO_FPE_SIGNALS #endif -#ifdef DISABLE_CHILD_WAITER_THREAD -#undef ENABLE_CHILD_WAITER_THREAD -#endif - -#if defined(ERTS_SMP) && !defined(DISABLE_CHILD_WAITER_THREAD) -#undef ENABLE_CHILD_WAITER_THREAD -#define ENABLE_CHILD_WAITER_THREAD 1 -#endif - #define ERTS_I64_LITERAL(X) X##LL #define ErtsInArea(ptr,start,nbytes) \ @@ -746,6 +737,7 @@ void erts_sys_main_thread(void); extern int erts_sys_prepare_crash_dump(int secs); extern void erts_sys_pre_init(void); extern void erl_sys_init(void); +extern void erl_sys_late_init(void); extern void erl_sys_args(int *argc, char **argv); extern void erl_sys_schedule(int); void sys_tty_reset(int); |