diff options
Diffstat (limited to 'erts/emulator/beam/bif.c')
-rw-r--r-- | erts/emulator/beam/bif.c | 583 |
1 files changed, 270 insertions, 313 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 1cdce49eef..98dc9a3902 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -37,10 +37,13 @@ #include "erl_db_util.h" #include "register.h" #include "erl_thr_progress.h" +#define ERTS_PTAB_WANT_BIF_IMPL__ +#include "erl_ptab.h" static Export* flush_monitor_message_trap = NULL; static Export* set_cpu_topology_trap = NULL; static Export* await_proc_exit_trap = NULL; +static Export* await_port_send_result_trap = NULL; Export* erts_format_cpu_topology_trap = NULL; static Export *await_sched_wall_time_mod_trap; @@ -83,8 +86,10 @@ static int insert_internal_link(Process* p, Eterm rpid) ASSERT(is_internal_pid(rpid)); #ifdef ERTS_SMP - if (IS_TRACED(p) && (p->trace_flags & (F_TRACE_SOL|F_TRACE_SOL1))) + if (IS_TRACED(p) + && (ERTS_TRACE_FLAGS(p) & (F_TRACE_SOL|F_TRACE_SOL1))) { rp_locks = ERTS_PROC_LOCKS_ALL; + } erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); #endif @@ -100,27 +105,27 @@ static int insert_internal_link(Process* p, Eterm rpid) } if (p != rp) { - erts_add_link(&(p->nlinks), LINK_PID, rp->id); - erts_add_link(&(rp->nlinks), LINK_PID, p->id); + erts_add_link(&ERTS_P_LINKS(p), LINK_PID, rp->common.id); + erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, p->common.id); - ASSERT(is_nil(p->tracer_proc) - || is_internal_pid(p->tracer_proc) - || is_internal_port(p->tracer_proc)); + ASSERT(is_nil(ERTS_TRACER_PROC(p)) + || is_internal_pid(ERTS_TRACER_PROC(p)) + || is_internal_port(ERTS_TRACER_PROC(p))); if (IS_TRACED(p)) { - if (p->trace_flags & (F_TRACE_SOL|F_TRACE_SOL1)) { - rp->trace_flags |= (p->trace_flags & TRACEE_FLAGS); - rp->tracer_proc = p->tracer_proc; /* maybe steal */ + if (ERTS_TRACE_FLAGS(p) & (F_TRACE_SOL|F_TRACE_SOL1)) { + ERTS_TRACE_FLAGS(rp) |= (ERTS_TRACE_FLAGS(p) & TRACEE_FLAGS); + ERTS_TRACER_PROC(rp) = ERTS_TRACER_PROC(p); /* maybe steal */ - if (p->trace_flags & F_TRACE_SOL1) { /* maybe override */ - rp->trace_flags &= ~(F_TRACE_SOL1 | F_TRACE_SOL); - p->trace_flags &= ~(F_TRACE_SOL1 | F_TRACE_SOL); + if (ERTS_TRACE_FLAGS(p) & F_TRACE_SOL1) { /* maybe override */ + ERTS_TRACE_FLAGS(rp) &= ~(F_TRACE_SOL1 | F_TRACE_SOL); + ERTS_TRACE_FLAGS(p) &= ~(F_TRACE_SOL1 | F_TRACE_SOL); } } } } if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(p, rp, am_getting_linked, p->id); + trace_proc(p, rp, am_getting_linked, p->common.id); if (p == rp) erts_smp_proc_unlock(p, rp_locks & ~ERTS_PROC_LOCK_MAIN); @@ -144,10 +149,6 @@ BIF_RETTYPE link_1(BIF_ALIST_1) /* check that the pid or port which is our argument is OK */ if (is_internal_pid(BIF_ARG_1)) { - if (internal_pid_index(BIF_ARG_1) >= erts_max_processes) { - BIF_ERROR(BIF_P, BADARG); - } - if (insert_internal_link(BIF_P, BIF_ARG_1)) { BIF_RET(am_true); } @@ -157,19 +158,37 @@ BIF_RETTYPE link_1(BIF_ALIST_1) } if (is_internal_port(BIF_ARG_1)) { - Port *pt = erts_id2port(BIF_ARG_1, BIF_P, ERTS_PROC_LOCK_MAIN); - if (!pt) { + int send_link_signal = 0; + Port *prt = erts_port_lookup(BIF_ARG_1, ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (!prt) { goto res_no_proc; } erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); - if (erts_add_link(&(BIF_P->nlinks), LINK_PID, BIF_ARG_1) >= 0) - erts_add_link(&(pt->nlinks), LINK_PID, BIF_P->id); + if (erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1) >= 0) + send_link_signal = 1; /* else: already linked */ erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - erts_smp_port_unlock(pt); + + if (send_link_signal) { + Eterm ref; + Eterm *refp = erts_port_synchronous_ops ? &ref : NULL; + + switch (erts_port_link(BIF_P, prt, BIF_P->common.id, refp)) { + case ERTS_PORT_OP_DROPPED: + case ERTS_PORT_OP_BADARG: + goto res_no_proc; + case ERTS_PORT_OP_SCHEDULED: + if (refp) { + ASSERT(is_internal_ref(ref)); + BIF_TRAP3(await_port_send_result_trap, BIF_P, ref, am_true, am_true); + } + default: + break; + } + } BIF_RET(am_true); } else if (is_external_port(BIF_ARG_1) @@ -182,7 +201,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); /* We may earn time by checking first that we're not linked already */ - if (erts_lookup_link(BIF_P->nlinks, BIF_ARG_1) != NULL) { + if (erts_lookup_link(ERTS_P_LINKS(BIF_P), BIF_ARG_1) != NULL) { erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); BIF_RET(am_true); } @@ -209,10 +228,10 @@ BIF_RETTYPE link_1(BIF_ALIST_1) erts_smp_de_links_lock(dep); - erts_add_link(&(BIF_P->nlinks), LINK_PID, BIF_ARG_1); + erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1); lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, - BIF_P->id); + BIF_P->common.id); ASSERT(lnk != NULL); erts_add_link(&ERTS_LINK_ROOT(lnk), LINK_PID, BIF_ARG_1); @@ -220,7 +239,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) erts_smp_de_runlock(dep); erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - code = erts_dsig_send_link(&dsd, BIF_P->id, BIF_ARG_1); + code = erts_dsig_send_link(&dsd, BIF_P->common.id, BIF_ARG_1); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); @@ -289,7 +308,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) if (dmon) erts_destroy_monitor(dmon); } - mon = erts_remove_monitor(&c_p->monitors, ref); + mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_LINK); res = ERTS_DEMONITOR_TRUE; @@ -298,7 +317,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) case ERTS_DSIG_PREP_CONNECTED: erts_smp_de_links_lock(dep); - mon = erts_remove_monitor(&c_p->monitors, ref); + mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); dmon = erts_remove_monitor(&dep->monitors, ref); erts_smp_de_links_unlock(dep); erts_smp_de_runlock(dep); @@ -325,7 +344,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) * the atom is stored there. Yield if necessary. */ code = erts_dsig_send_demonitor(&dsd, - c_p->id, + c_p->common.id, (mon->name != NIL ? mon->name : mon->pid), @@ -387,7 +406,7 @@ static int demonitor(Process *c_p, Eterm ref) goto done; /* Cannot be this monitor's ref */ } - mon = erts_lookup_monitor(c_p->monitors, ref); + mon = erts_lookup_monitor(ERTS_P_MONITORS(c_p), ref); if (!mon) { res = ERTS_DEMONITOR_FALSE; goto done; @@ -426,7 +445,7 @@ static int demonitor(Process *c_p, Eterm ref) to, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); - mon = erts_remove_monitor(&c_p->monitors, ref); + mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); #ifndef ERTS_SMP ASSERT(mon); #else @@ -440,7 +459,7 @@ static int demonitor(Process *c_p, Eterm ref) } if (rp) { ErtsMonitor *rmon; - rmon = erts_remove_monitor(&(rp->monitors), ref); + rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); if (rp != c_p) erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); if (rmon != NULL) @@ -582,7 +601,7 @@ local_pid_monitor(Process *p, Eterm target) mon_ref = erts_make_ref(p); ERTS_BIF_PREP_RET(ret, mon_ref); - if (target == p->id) { + if (target == p->common.id) { return ret; } @@ -599,8 +618,8 @@ local_pid_monitor(Process *p, Eterm target) else { ASSERT(rp != p); - erts_add_monitor(&(p->monitors), MON_ORIGIN, mon_ref, target, NIL); - erts_add_monitor(&(rp->monitors), MON_TARGET, mon_ref, p->id, NIL); + erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, target, NIL); + erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id, NIL); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } @@ -635,9 +654,9 @@ local_name_monitor(Process *p, Eterm target_name) UnUseTmpHeap(3,p); } else if (rp != p) { - erts_add_monitor(&(p->monitors), MON_ORIGIN, mon_ref, rp->id, + erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, rp->common.id, target_name); - erts_add_monitor(&(rp->monitors), MON_TARGET, mon_ref, p->id, + erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id, target_name); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } @@ -689,16 +708,16 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, erts_smp_de_links_lock(dep); - erts_add_monitor(&(p->monitors), MON_ORIGIN, mon_ref, p_trgt, + erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, p_trgt, p_name); - erts_add_monitor(&(dep->monitors), MON_TARGET, mon_ref, p->id, + erts_add_monitor(&(dep->monitors), MON_TARGET, mon_ref, p->common.id, d_name); erts_smp_de_links_unlock(dep); erts_smp_de_runlock(dep); erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_send_monitor(&dsd, p->id, target, mon_ref); + code = erts_dsig_send_monitor(&dsd, p->common.id, target, mon_ref); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_PREP_YIELD_RETURN(ret, p, mon_ref); else @@ -941,36 +960,39 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) } if (is_internal_port(BIF_ARG_1)) { - Port *pt = erts_id2port_sflgs(BIF_ARG_1, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_DEAD); - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS); #ifdef ERTS_SMP - if (ERTS_PROC_PENDING_EXIT(BIF_P)) { - if (pt) - erts_smp_port_unlock(pt); + if (ERTS_PROC_PENDING_EXIT(BIF_P)) goto handle_pending_exit; - } #endif - l = erts_remove_link(&BIF_P->nlinks, BIF_ARG_1); - - ASSERT(pt || !l); - - if (pt) { - rl = erts_remove_link(&pt->nlinks, BIF_P->id); - erts_smp_port_unlock(pt); - if (rl) - erts_destroy_link(rl); - } + l = erts_remove_link(&ERTS_P_LINKS(BIF_P), BIF_ARG_1); erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS); - if (l) + if (l) { + Port *prt; + erts_destroy_link(l); + /* Send unlink signal */ + prt = erts_port_lookup(BIF_ARG_1, ERTS_PORT_SFLGS_DEAD); + if (prt) { + ErtsPortOpResult res; + Eterm ref; + Eterm *refp = erts_port_synchronous_ops ? &ref : NULL; +#ifdef DEBUG + ref = NIL; +#endif + res = erts_port_unlink(BIF_P, prt, BIF_P->common.id, refp); + + if (refp && res == ERTS_PORT_OP_SCHEDULED) { + ASSERT(is_internal_ref(ref)); + BIF_TRAP3(await_port_send_result_trap, BIF_P, ref, am_true, am_true); + } + } + } + BIF_RET(am_true); } else if (is_external_port(BIF_ARG_1) @@ -993,7 +1015,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) if (ERTS_PROC_PENDING_EXIT(BIF_P)) goto handle_pending_exit; #endif - l = erts_remove_link(&BIF_P->nlinks,BIF_ARG_1); + l = erts_remove_link(&ERTS_P_LINKS(BIF_P), BIF_ARG_1); erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS); @@ -1022,8 +1044,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) #endif case ERTS_DSIG_PREP_CONNECTED: - erts_remove_dist_link(&dld, BIF_P->id, BIF_ARG_1, dep); - code = erts_dsig_send_unlink(&dsd, BIF_P->id, BIF_ARG_1); + erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep); + code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); erts_destroy_dist_link(&dld); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); @@ -1037,10 +1059,6 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) /* Internal pid... */ - /* process ok ? */ - if (internal_pid_index(BIF_ARG_1) >= erts_max_processes) - BIF_ERROR(BIF_P, BADARG); - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCK_STATUS); /* get process struct */ @@ -1059,7 +1077,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) #endif /* unlink and ignore errors */ - l = erts_remove_link(&BIF_P->nlinks,BIF_ARG_1); + l = erts_remove_link(&ERTS_P_LINKS(BIF_P), BIF_ARG_1); if (l != NULL) erts_destroy_link(l); @@ -1067,12 +1085,12 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P); } else { - rl = erts_remove_link(&(rp->nlinks),BIF_P->id); + rl = erts_remove_link(&ERTS_P_LINKS(rp), BIF_P->common.id); if (rl != NULL) erts_destroy_link(rl); if (IS_TRACED_FL(rp, F_TRACE_PROCS) && rl != NULL) { - trace_proc(BIF_P, rp, am_getting_unlinked, BIF_P->id); + trace_proc(BIF_P, rp, am_getting_unlinked, BIF_P->common.id); } if (rp != BIF_P) @@ -1345,15 +1363,28 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) */ if (is_internal_port(BIF_ARG_1)) { - Port *prt; - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - prt = erts_id2port(BIF_ARG_1, NULL, 0); + Port *prt = erts_port_lookup(BIF_ARG_1, ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { - erts_do_exit_port(prt, BIF_P->id, BIF_ARG_2); - erts_port_release(prt); + Eterm ref; + Eterm *refp = erts_port_synchronous_ops ? &ref : NULL; + ErtsPortOpResult res; + +#ifdef DEBUG + ref = NIL; +#endif + + res = erts_port_exit(BIF_P, 0, prt, BIF_P->common.id, BIF_ARG_2, refp); + + ERTS_BIF_CHK_EXITED(BIF_P); + + if (refp && res == ERTS_PORT_OP_SCHEDULED) { + ASSERT(is_internal_ref(ref)); + BIF_TRAP3(await_port_send_result_trap, BIF_P, ref, am_true, am_true); + } + } - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_CHK_EXITED(BIF_P); + BIF_RET(am_true); } else if(is_external_port(BIF_ARG_1) @@ -1379,7 +1410,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); case ERTS_DSIG_PREP_CONNECTED: - code = erts_dsig_send_exit2(&dsd, BIF_P->id, BIF_ARG_1, BIF_ARG_2); + code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); @@ -1397,9 +1428,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) */ ErtsProcLocks rp_locks; - if (internal_pid_index(BIF_ARG_1) >= erts_max_processes) - BIF_ERROR(BIF_P, BADARG); - if (BIF_ARG_1 == BIF_P->id) { + if (BIF_ARG_1 == BIF_P->common.id) { rp_locks = ERTS_PROC_LOCKS_ALL; rp = BIF_P; erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_ALL_MINOR); @@ -1417,7 +1446,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) * Send an exit signal. */ erts_send_exit_signal(BIF_P, - BIF_P->id, + BIF_P->common.id, rp, &rp_locks, BIF_ARG_2, @@ -1519,22 +1548,24 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) } /* * NOTE: It is important that we check for pending exit signals - * and handle them before flag trap_exit is set to true. - * For more info, see implementation of erts_send_exit_signal(). + * and handle them before returning if trap_exit is set to + * true. For more info, see implementation of + * erts_send_exit_signal(). */ - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS); - ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) - & erts_proc_lc_my_proc_locks(BIF_P)); - ERTS_SMP_BIF_CHK_PENDING_EXIT(BIF_P, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (trap_exit) - state = erts_smp_atomic32_read_bor_nob(&BIF_P->state, - ERTS_PSFLG_TRAP_EXIT); + state = erts_smp_atomic32_read_bor_mb(&BIF_P->state, + ERTS_PSFLG_TRAP_EXIT); else - state = erts_smp_atomic32_read_band_nob(&BIF_P->state, - ~ERTS_PSFLG_TRAP_EXIT); + state = erts_smp_atomic32_read_band_mb(&BIF_P->state, + ~ERTS_PSFLG_TRAP_EXIT); +#ifdef ERTS_SMP + if (ERTS_PROC_PENDING_EXIT(BIF_P)) { + erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); + ERTS_BIF_EXITED(BIF_P); + } +#endif + old_value = (state & ERTS_PSFLG_TRAP_EXIT) ? am_true : am_false; - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); BIF_RET(old_value); } else if (BIF_ARG_1 == am_scheduler) { @@ -1617,11 +1648,13 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) goto error; } erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - old_value = BIF_P->trace_flags & F_SENSITIVE ? am_true : am_false; + old_value = (ERTS_TRACE_FLAGS(BIF_P) & F_SENSITIVE + ? am_true + : am_false); if (is_sensitive) { - BIF_P->trace_flags |= F_SENSITIVE; + ERTS_TRACE_FLAGS(BIF_P) |= F_SENSITIVE; } else { - BIF_P->trace_flags &= ~F_SENSITIVE; + ERTS_TRACE_FLAGS(BIF_P) &= ~F_SENSITIVE; } erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); BIF_RET(old_value); @@ -1747,8 +1780,9 @@ ebif_bang_2(BIF_ALIST_2) #define SEND_BADARG (-4) #define SEND_USER_ERROR (-5) #define SEND_INTERNAL_ERROR (-6) +#define SEND_AWAIT_RESULT (-7) -Sint do_send(Process *p, Eterm to, Eterm msg, int suspend); +Sint do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp); static Sint remote_send(Process *p, DistEntry *dep, Eterm to, Eterm full_to, Eterm msg, int suspend) @@ -1802,7 +1836,7 @@ static Sint remote_send(Process *p, DistEntry *dep, } Sint -do_send(Process *p, Eterm to, Eterm msg, int suspend) { +do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp) { Eterm portid; Port *pt; Process* rp; @@ -1814,16 +1848,10 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { trace_send(p, to, msg); if (ERTS_PROC_GET_SAVED_CALLS_BUF(p)) save_calls(p, &exp_send); - - if (internal_pid_index(to) >= erts_max_processes) - return SEND_BADARG; - rp = erts_proc_lookup_raw(to); - - if (!rp) { - ERTS_SMP_ASSERT_IS_NOT_EXITING(p); + rp = erts_proc_lookup_raw(to); + if (!rp) return 0; - } } else if (is_external_pid(to)) { dep = external_pid_dist_entry(to); if(dep == erts_this_dist_entry) { @@ -1832,7 +1860,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { "Discarding message %T from %T to %T in an old " "incarnation (%d) of this node (%d)\n", msg, - p->id, + p->common.id, to, external_pid_creation(to), erts_this_node->creation); @@ -1841,45 +1869,24 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { } return remote_send(p, dep, to, to, msg, suspend); } else if (is_atom(to)) { - - /* Need to virtual schedule out sending process - * because of lock wait. This is only necessary - * for internal port calling but the lock is bundled - * with name lookup. - */ - - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_out); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_inactive); - } - erts_whereis_name(p, ERTS_PROC_LOCK_MAIN, - to, - &rp, 0, 0, - &pt); + Eterm id = erts_whereis_name_to_id(p, to); + + rp = erts_proc_lookup(id); + if (rp) + goto send_message; + pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP); if (pt) { - portid = pt->id; + portid = id; goto port_common; } - - /* Not a port virtually schedule the process back in */ - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_in); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_active); - } if (IS_TRACED(p)) trace_send(p, to, msg); if (ERTS_PROC_GET_SAVED_CALLS_BUF(p)) save_calls(p, &exp_send); - if (!rp) { - return SEND_BADARG; - } + return SEND_BADARG; } else if (is_external_port(to) && (external_port_dist_entry(to) == erts_this_dist_entry)) { @@ -1888,50 +1895,56 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { "Discarding message %T from %T to %T in an old " "incarnation (%d) of this node (%d)\n", msg, - p->id, + p->common.id, to, external_port_creation(to), erts_this_node->creation); erts_send_error_to_logger(p->group_leader, dsbufp); return 0; } else if (is_internal_port(to)) { + int ret_val; portid = to; - /* schedule out calling process, waiting for lock*/ - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_out); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_inactive); - } - pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN); + + pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP); + port_common: - ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt)); + ret_val = 0; - /* We have waited for locks, trace schedule ports */ - if (pt && IS_TRACED_FL(pt, F_TRACE_SCHED_PORTS)) { - trace_sched_ports_where(pt, am_in, am_command); - } - if (pt && erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(pt)) { - profile_runnable_port(pt, am_active); - } - - /* XXX let port_command handle the busy stuff !!! */ - if (pt && (pt->status & ERTS_PORT_SFLG_PORT_BUSY)) { - if (suspend) { - erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); - if (erts_system_monitor_flags.busy_port) { - monitor_generic(p, am_busy_port, portid); + if (pt) { + int ps_flags = suspend ? 0 : ERTS_PORT_SIG_FLG_NOSUSPEND; + *refp = NIL; + + switch (erts_port_command(p, ps_flags, pt, msg, refp)) { + case ERTS_PORT_OP_CALLER_EXIT: + /* We are exiting... */ + return SEND_USER_ERROR; + case ERTS_PORT_OP_BUSY: + /* Nothing has been sent */ + if (suspend) + erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); + return SEND_YIELD; + case ERTS_PORT_OP_BUSY_SCHEDULED: + /* Message was sent */ + if (suspend) { + erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); + ret_val = SEND_YIELD_RETURN; + break; } + /* Fall through */ + case ERTS_PORT_OP_SCHEDULED: + if (is_not_nil(*refp)) { + ASSERT(is_internal_ref(*refp)); + ret_val = SEND_AWAIT_RESULT; + } + break; + case ERTS_PORT_OP_DROPPED: + case ERTS_PORT_OP_BADARG: + case ERTS_PORT_OP_DONE: + break; + default: + ERTS_INTERNAL_ERROR("Unexpected erts_port_command() result"); + break; } - /* Virtually schedule out the port before releasing */ - if (IS_TRACED_FL(pt, F_TRACE_SCHED_PORTS)) { - trace_sched_ports_where(pt, am_out, am_command); - } - if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(pt)) { - profile_runnable_port(pt, am_inactive); - } - erts_port_release(pt); - return SEND_YIELD; } if (IS_TRACED(p)) /* trace once only !! */ @@ -1949,30 +1962,11 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { SEQ_TRACE_SEND, portid, p); } - /* XXX NO GC in port command */ - erts_port_command(p, p->id, pt, msg); - if (pt) { - /* Virtually schedule out the port before releasing */ - if (IS_TRACED_FL(pt, F_TRACE_SCHED_PORTS)) { - trace_sched_ports_where(pt, am_out, am_command); - } - if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(pt)) { - profile_runnable_port(pt, am_inactive); - } - erts_port_release(pt); - } - /* Virtually schedule in process */ - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_in); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_active); - } if (ERTS_PROC_IS_EXITING(p)) { KILL_CATCHES(p); /* Must exit */ return SEND_USER_ERROR; } - return 0; + return ret_val; } else if (is_tuple(to)) { /* Remote send */ int ret; tp = tuple_val(to); @@ -1988,47 +1982,24 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { dep = erts_sysname_to_connected_dist_entry(tp[2]); if (dep == erts_this_dist_entry) { + Eterm id; erts_deref_dist_entry(dep); if (IS_TRACED(p)) trace_send(p, to, msg); if (ERTS_PROC_GET_SAVED_CALLS_BUF(p)) save_calls(p, &exp_send); - - /* Need to virtual schedule out sending process - * because of lock wait. This is only necessary - * for internal port calling but the lock is bundled. - */ - - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_out); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_inactive); - } - erts_whereis_name(p, ERTS_PROC_LOCK_MAIN, - tp[1], - &rp, 0, 0, - &pt); + id = erts_whereis_name_to_id(p, tp[1]); + + rp = erts_proc_lookup_raw(id); + if (rp) + goto send_message; + pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP); if (pt) { - portid = pt->id; + portid = id; goto port_common; } - /* Port lookup failed, virtually schedule the process - * back in. - */ - - if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { - trace_virtual_sched(p, am_in); - } - if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { - profile_runnable_proc(p, am_active); - } - - if (!rp) { - return 0; - } - goto send_message; + return 0; } ret = remote_send(p, dep, tp[1], to, msg, suspend); @@ -2067,6 +2038,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { BIF_RETTYPE send_3(BIF_ALIST_3) { + Eterm ref; Process *p = BIF_P; Eterm to = BIF_ARG_1; Eterm msg = BIF_ARG_2; @@ -2090,12 +2062,18 @@ BIF_RETTYPE send_3(BIF_ALIST_3) if(!is_nil(l)) { BIF_ERROR(p, BADARG); } - - result = do_send(p, to, msg, suspend); + +#ifdef DEBUG + ref = NIL; +#endif + + result = do_send(p, to, msg, suspend, &ref); if (result > 0) { ERTS_VBUMP_REDS(p, result); BIF_RET(am_ok); - } else switch (result) { + } + + switch (result) { case 0: BIF_RET(am_ok); break; @@ -2118,6 +2096,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3) ERTS_BIF_YIELD_RETURN(p, am_ok); else BIF_RET(am_nosuspend); + case SEND_AWAIT_RESULT: + ASSERT(is_internal_ref(ref)); + BIF_TRAP3(await_port_send_result_trap, p, ref, am_nosuspend, am_ok); case SEND_BADARG: BIF_ERROR(p, BADARG); break; @@ -2142,12 +2123,21 @@ BIF_RETTYPE send_2(BIF_ALIST_2) Eterm erl_send(Process *p, Eterm to, Eterm msg) { - Sint result = do_send(p, to, msg, !0); + Eterm ref; + Sint result; + +#ifdef DEBUG + ref = NIL; +#endif + + result = do_send(p, to, msg, !0, &ref); if (result > 0) { ERTS_VBUMP_REDS(p, result); BIF_RET(msg); - } else switch (result) { + } + + switch (result) { case 0: BIF_RET(msg); break; @@ -2159,6 +2149,9 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) break; case SEND_YIELD_RETURN: ERTS_BIF_YIELD_RETURN(p, msg); + case SEND_AWAIT_RESULT: + ASSERT(is_internal_ref(ref)); + BIF_TRAP3(await_port_send_result_trap, p, ref, msg, msg); case SEND_BADARG: BIF_ERROR(p, BADARG); break; @@ -3126,7 +3119,7 @@ BIF_RETTYPE list_to_tuple_1(BIF_ALIST_1) BIF_RETTYPE self_0(BIF_ALIST_0) { - BIF_RET(BIF_P->id); + BIF_RET(BIF_P->common.id); } /**********************************************************************/ @@ -3163,11 +3156,9 @@ static erts_smp_spinlock_t make_ref_lock; static erts_smp_mtx_t ports_snapshot_mtx; erts_smp_atomic_t erts_dead_ports_ptr; /* To store dying ports during snapshot */ -Eterm erts_make_ref_in_buffer(Eterm buffer[REF_THING_SIZE]) +void +erts_make_ref_in_array(Uint32 ref[ERTS_MAX_REF_NUMBERS]) { - Eterm* hp = buffer; - Uint32 ref0, ref1, ref2; - erts_smp_spin_lock(&make_ref_lock); reference0++; @@ -3179,24 +3170,36 @@ Eterm erts_make_ref_in_buffer(Eterm buffer[REF_THING_SIZE]) } } - ref0 = reference0; - ref1 = reference1; - ref2 = reference2; + ref[0] = reference0; + ref[1] = reference1; + ref[2] = reference2; erts_smp_spin_unlock(&make_ref_lock); +} + +Eterm erts_make_ref_in_buffer(Eterm buffer[REF_THING_SIZE]) +{ + Eterm* hp = buffer; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; - write_ref_thing(hp, ref0, ref1, ref2); + erts_make_ref_in_array(ref); + write_ref_thing(hp, ref[0], ref[1], ref[2]); return make_internal_ref(hp); } Eterm erts_make_ref(Process *p) { Eterm* hp; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(p)); hp = HAlloc(p, REF_THING_SIZE); - return erts_make_ref_in_buffer(hp); + + erts_make_ref_in_array(ref); + write_ref_thing(hp, ref[0], ref[1], ref[2]); + + return make_internal_ref(hp); } BIF_RETTYPE make_ref_0(BIF_ALIST_0) @@ -3460,7 +3463,7 @@ BIF_RETTYPE garbage_collect_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } - if (BIF_P->id == BIF_ARG_1) + if (BIF_P->common.id == BIF_ARG_1) rp = BIF_P; else { #ifdef ERTS_SMP @@ -3500,71 +3503,23 @@ BIF_RETTYPE garbage_collect_0(BIF_ALIST_0) } /**********************************************************************/ -/* Return a list of active ports */ +/* + * The erlang:processes/0 BIF. + */ -BIF_RETTYPE ports_0(BIF_ALIST_0) +BIF_RETTYPE processes_0(BIF_ALIST_0) { - Eterm res = NIL; - Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*erts_max_ports); - Eterm* pp = port_buf; - Eterm* dead_ports; - int alive, dead; - Uint32 next_ss; - int i; - - /* To get a consistent snapshot... - * We add alive ports from start of the buffer - * while dying ports are added from the other end by the killing threads. - */ - - erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */ - - erts_smp_atomic_set_nob(&erts_dead_ports_ptr, - (erts_aint_t) (port_buf + erts_max_ports)); - - next_ss = erts_smp_atomic32_inc_read_relb(&erts_ports_snapshot); - - for (i = erts_max_ports-1; i >= 0; i--) { - Port* prt = &erts_port[i]; - erts_smp_port_state_lock(prt); - if (!(prt->status & ERTS_PORT_SFLGS_DEAD) - && prt->snapshot != next_ss) { - ASSERT(prt->snapshot == next_ss - 1); - *pp++ = prt->id; - prt->snapshot = next_ss; /* Consumed by this snapshot */ - } - erts_smp_port_state_unlock(prt); - } - - dead_ports = (Eterm*)erts_smp_atomic_xchg_nob(&erts_dead_ports_ptr, - (erts_aint_t) NULL); - erts_smp_mtx_unlock(&ports_snapshot_mtx); - - ASSERT(pp <= dead_ports); - - alive = pp - port_buf; - dead = port_buf + erts_max_ports - dead_ports; - - ASSERT((alive+dead) <= erts_max_ports); - - if (alive+dead > 0) { - erts_aint_t i; - Eterm *hp = HAlloc(BIF_P, (alive+dead)*2); - - for (i = 0; i < alive; i++) { - res = CONS(hp, port_buf[i], res); - hp += 2; - } - for (i = 0; i < dead; i++) { - res = CONS(hp, dead_ports[i], res); - hp += 2; - } - } + return erts_ptab_list(BIF_P, &erts_proc); +} - erts_free(ERTS_ALC_T_TMP, port_buf); +/**********************************************************************/ +/* + * The erlang:ports/0 BIF. + */ - BIF_RET(res); +BIF_RETTYPE ports_0(BIF_ALIST_0) +{ + return erts_ptab_list(BIF_P, &erts_port); } /**********************************************************************/ @@ -4194,12 +4149,13 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2) BIF_RET(old_value); } } else if (BIF_ARG_1 == make_small(1)) { - Uint i; + int i, max; ErlMessage* mp; erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); erts_smp_thr_progress_block(); - for (i = 0; i < erts_max_processes; i++) { + max = erts_ptab_max(&erts_proc); + for (i = 0; i < max; i++) { Process *p = erts_pix2proc(i); if (p) { #ifdef USE_VM_PROBES @@ -4546,6 +4502,8 @@ void erts_init_bif(void) am_format_cpu_topology, 1); await_proc_exit_trap = erts_export_put(am_erlang,am_await_proc_exit,3); + await_port_send_result_trap + = erts_export_put(am_erts_internal, am_await_port_send_result, 3); await_sched_wall_time_mod_trap = erts_export_put(am_erlang, am_await_sched_wall_time_modifications, 2); erts_smp_atomic32_init_nob(&sched_wall_time, 0); @@ -4561,19 +4519,18 @@ bif erlang:send_to_logger/2 BIF_RETTYPE send_to_logger_2(BIF_ALIST_2) { byte *buf; - int len; + ErlDrvSizeT len; if (!is_atom(BIF_ARG_1) || !(is_list(BIF_ARG_2) || is_nil(BIF_ARG_1))) { BIF_ERROR(BIF_P,BADARG); } - len = io_list_len(BIF_ARG_2); - if (len < 0) + if (erts_iolist_size(BIF_ARG_2, &len) != 0) BIF_ERROR(BIF_P,BADARG); else if (len == 0) buf = ""; else { #ifdef DEBUG - int len2; + ErlDrvSizeT len2; #endif buf = (byte *) erts_alloc(ERTS_ALC_T_TMP, len+1); #ifdef DEBUG @@ -4581,7 +4538,7 @@ BIF_RETTYPE send_to_logger_2(BIF_ALIST_2) #else (void) #endif - io_list_to_buf(BIF_ARG_2, buf, len); + erts_iolist_to_buf(BIF_ARG_2, buf, len); ASSERT(len2 == len); buf[len] = '\0'; switch (BIF_ARG_1) { @@ -4675,7 +4632,6 @@ BIF_RETTYPE dt_prepend_vm_tag_data_1(BIF_ALIST_1) #ifdef USE_VM_PROBES Eterm b; Eterm *hp; - hp = HAlloc(BIF_P,2); if (is_binary((DT_UTAG(BIF_P)))) { Uint sz = binary_size(DT_UTAG(BIF_P)); int i; @@ -4692,6 +4648,7 @@ BIF_RETTYPE dt_prepend_vm_tag_data_1(BIF_ALIST_1) } else { b = new_binary(BIF_P,(byte *)"\0",1); } + hp = HAlloc(BIF_P,2); BIF_RET(CONS(hp,b,BIF_ARG_1)); #else BIF_RET(BIF_ARG_1); @@ -4702,7 +4659,6 @@ BIF_RETTYPE dt_append_vm_tag_data_1(BIF_ALIST_1) #ifdef USE_VM_PROBES Eterm b; Eterm *hp; - hp = HAlloc(BIF_P,2); if (is_binary((DT_UTAG(BIF_P)))) { Uint sz = binary_size(DT_UTAG(BIF_P)); int i; @@ -4719,6 +4675,7 @@ BIF_RETTYPE dt_append_vm_tag_data_1(BIF_ALIST_1) } else { b = new_binary(BIF_P,(byte *)"\0",1); } + hp = HAlloc(BIF_P,2); BIF_RET(CONS(hp,BIF_ARG_1,b)); #else BIF_RET(BIF_ARG_1); @@ -4742,14 +4699,14 @@ BIF_RETTYPE dt_spread_tag_1(BIF_ALIST_1) #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) start spreading tag %T\r\n", - BIF_P->id,DT_UTAG(BIF_P)); + BIF_P->common.id,DT_UTAG(BIF_P)); #endif } else { DT_UTAG_FLAGS(BIF_P) &= ~DT_UTAG_SPREADING; #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) stop spreading tag %T\r\n", - BIF_P->id,DT_UTAG(BIF_P)); + BIF_P->common.id,DT_UTAG(BIF_P)); #endif } } @@ -4775,7 +4732,7 @@ BIF_RETTYPE dt_restore_tag_1(BIF_ALIST_1) #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) restore Killing tag!\r\n", - BIF_P->id); + BIF_P->common.id); #endif } DT_UTAG(BIF_P) = NIL; @@ -4792,12 +4749,12 @@ BIF_RETTYPE dt_restore_tag_1(BIF_ALIST_1) erts_fprintf(stderr, "Dtrace -> (%T) restore stop spreading " "tag %T\r\n", - BIF_P->id, tpl[2]); + BIF_P->common.id, tpl[2]); } else if ((x & DT_UTAG_SPREADING) && !(DT_UTAG_FLAGS(BIF_P) & DT_UTAG_SPREADING)) { erts_fprintf(stderr, "Dtrace -> (%T) restore start spreading " - "tag %T\r\n",BIF_P->id,tpl[2]); + "tag %T\r\n",BIF_P->common.id,tpl[2]); } #endif DT_UTAG_FLAGS(BIF_P) = x; |