From 3ee5343415d6ae0ce1ff1c2a2555051431a9315e Mon Sep 17 00:00:00 2001 From: Dmytro Lytovchenko Date: Wed, 25 May 2016 14:37:03 +0200 Subject: erts: Add port monitors * erlang:monitor/2 with port argument is added, erlang:demonitor, using port task API and avoiding locking; * port_info and process_info support for monitored ports (with named port monitors support); * Exit signals contain type 'process' or 'port'; * Propagation of port exit signals; * Self-cleaning when origin process dies with monitor on; * 8 test cases + testcase for port driver crashing; * Documentation for all of the above (monitor, demonitor, port_info and process_info) updated --- erts/emulator/beam/bif.c | 410 ++++++++++++------- erts/emulator/beam/erl_bif_info.c | 100 +++-- erts/emulator/beam/erl_bif_port.c | 6 + erts/emulator/beam/erl_port.h | 58 ++- erts/emulator/beam/erl_process.c | 21 +- erts/emulator/beam/io.c | 390 +++++++++++++++--- erts/emulator/beam/register.c | 51 +-- erts/emulator/beam/register.h | 2 +- erts/emulator/test/monitor_SUITE.erl | 12 +- erts/emulator/test/port_SUITE.erl | 434 +++++++++++++++++---- erts/emulator/test/port_SUITE_data/Makefile.src | 2 +- .../test/port_SUITE_data/sleep_failure_drv.c | 76 ++++ 12 files changed, 1226 insertions(+), 336 deletions(-) create mode 100644 erts/emulator/test/port_SUITE_data/sleep_failure_drv.c (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index b18910e2c7..fc14061a44 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -282,20 +282,17 @@ res_no_proc: { } } -#define ERTS_DEMONITOR_FALSE 2 -#define ERTS_DEMONITOR_TRUE 1 -#define ERTS_DEMONITOR_BADARG 0 -#define ERTS_DEMONITOR_YIELD_TRUE -1 -#define ERTS_DEMONITOR_INTERNAL_ERROR -2 - -static int +/* This function is allowed to return range of values handled by demonitor/1-2 + * Namely: atoms true, false, yield, internal_error, badarg or THE_NON_VALUE + */ +static Eterm remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) { ErtsDSigData dsd; ErtsMonitor *dmon; ErtsMonitor *mon; int code; - int res; + Eterm res = am_false; #ifndef ERTS_SMP int stale_mon = 0; #endif @@ -328,7 +325,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_LINK); - res = ERTS_DEMONITOR_TRUE; + res = am_true; break; case ERTS_DSIG_PREP_CONNECTED: @@ -352,7 +349,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) * This is possible when smp support is enabled. * 'DOWN' message just arrived. */ - res = ERTS_DEMONITOR_TRUE; + res = am_true; } else { /* @@ -367,16 +364,13 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) : mon->pid), ref, 0); - res = (code == ERTS_DSIG_SEND_YIELD - ? ERTS_DEMONITOR_YIELD_TRUE - : ERTS_DEMONITOR_TRUE); + res = (code == ERTS_DSIG_SEND_YIELD ? am_yield : am_true); erts_destroy_monitor(dmon); - } break; default: ASSERT(! "Invalid dsig prepare result"); - return ERTS_DEMONITOR_INTERNAL_ERROR; + return am_internal_error; } #ifndef ERTS_SMP @@ -404,27 +398,96 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) return res; } -static int demonitor(Process *c_p, Eterm ref, Eterm *multip) +static ERTS_INLINE void +demonitor_local_process(Process *c_p, Eterm ref, Eterm to, Eterm *res) +{ + Process *rp = erts_pid2proc_opt(c_p, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK, + to, + ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + ErtsMonitor *mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); + +#ifndef ERTS_SMP + ASSERT(mon); +#else + if (!mon) + *res = am_false; + else +#endif + { + *res = am_true; + erts_destroy_monitor(mon); + } + if (rp) { + ErtsMonitor *rmon; + rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); + if (rp != c_p) + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + if (rmon != NULL) + erts_destroy_monitor(rmon); + } + else { + ERTS_SMP_ASSERT_IS_NOT_EXITING(c_p); + } +} + +static ERTS_INLINE BIF_RETTYPE +demonitor_local_port(Process *origin, Eterm ref, Eterm target) { - ErtsMonitor *mon = NULL; /* The monitor entry to delete */ - Process *rp; /* Local target process */ - Eterm to = NIL; /* Monitor link traget */ - DistEntry *dep = NULL; /* Target's distribution entry */ - int deref_de = 0; - int res; - int unlock_link = 1; + BIF_RETTYPE res = am_false; + Port *port = erts_port_lookup_raw(target); + + if (!port) { + BIF_ERROR(origin, BADARG); + } + erts_smp_proc_unlock(origin, ERTS_PROC_LOCK_LINK); + + if (port) { + Eterm trap_ref; + switch (erts_port_demonitor(origin, ERTS_PORT_DEMONITOR_NORMAL, + port, ref, &trap_ref)) { + case ERTS_PORT_OP_DROPPED: + case ERTS_PORT_OP_BADARG: + break; + case ERTS_PORT_OP_SCHEDULED: + BIF_TRAP3(await_port_send_result_trap, origin, trap_ref, + am_busy_port, am_true); + /* the busy_port atom will never be returned, because it cannot be + * returned from erts_port_(de)monitor, but just in case if in future + * internal API changes - you may see this atom */ + default: + break; + } + } + else { + ERTS_SMP_ASSERT_IS_NOT_EXITING(origin); + } + BIF_RET(res); +} +/* Can return atom true, false, yield, internal_error, badarg or + * THE_NON_VALUE if error occured or trap has been set up + */ +static +BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip) +{ + ErtsMonitor *mon = NULL; /* The monitor entry to delete */ + Eterm to = NIL; /* Monitor link traget */ + DistEntry *dep = NULL; /* Target's distribution entry */ + int deref_de = 0; + BIF_RETTYPE res = am_false; + int unlock_link = 1; erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_LINK); if (is_not_internal_ref(ref)) { - res = ERTS_DEMONITOR_BADARG; + res = am_badarg; goto done; /* Cannot be this monitor's ref */ } mon = erts_lookup_monitor(ERTS_P_MONITORS(c_p), ref); if (!mon) { - res = ERTS_DEMONITOR_FALSE; goto done; } @@ -432,70 +495,50 @@ static int demonitor(Process *c_p, Eterm ref, Eterm *multip) case MON_TIME_OFFSET: *multip = am_true; erts_demonitor_time_offset(ref); - res = ERTS_DEMONITOR_TRUE; + res = am_true; break; case MON_ORIGIN: to = mon->pid; *multip = am_false; if (is_atom(to)) { - /* Monitoring a name at node to */ - ASSERT(is_node_name_atom(to)); - dep = erts_sysname_to_connected_dist_entry(to); - ASSERT(dep != erts_this_dist_entry); - if (dep) - deref_de = 1; + /* Monitoring a name at node to */ + ASSERT(is_node_name_atom(to)); + dep = erts_sysname_to_connected_dist_entry(to); + ASSERT(dep != erts_this_dist_entry); + if (dep) + deref_de = 1; + } else if (is_port(to)) { + if (port_dist_entry(to) != erts_this_dist_entry) { + goto badarg; + } + res = demonitor_local_port(c_p, ref, to); + unlock_link = 0; + goto done; } else { - ASSERT(is_pid(to)); - dep = pid_dist_entry(to); + ASSERT(is_pid(to)); + dep = pid_dist_entry(to); } if (dep != erts_this_dist_entry) { - res = remote_demonitor(c_p, dep, ref, to); - /* remote_demonitor() unlocks link lock on c_p */ - unlock_link = 0; + res = remote_demonitor(c_p, dep, ref, to); + /* remote_demonitor() unlocks link lock on c_p */ + unlock_link = 0; } else { /* Local monitor */ - if (deref_de) { - deref_de = 0; - erts_deref_dist_entry(dep); - } - dep = NULL; - rp = erts_pid2proc_opt(c_p, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK, - to, - ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref); -#ifndef ERTS_SMP - ASSERT(mon); -#else - if (!mon) - res = ERTS_DEMONITOR_FALSE; - else -#endif - { - res = ERTS_DEMONITOR_TRUE; - erts_destroy_monitor(mon); - } - if (rp) { - ErtsMonitor *rmon; - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); - if (rp != c_p) - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - if (rmon != NULL) - erts_destroy_monitor(rmon); - } - else { - ERTS_SMP_ASSERT_IS_NOT_EXITING(c_p); - } - + if (deref_de) { + deref_de = 0; + erts_deref_dist_entry(dep); + } + dep = NULL; + demonitor_local_process(c_p, ref, to, &res); } break; - default: - res = ERTS_DEMONITOR_BADARG; + default /* case */ : +badarg: + res = am_badarg; /* will be converted to error by caller */ *multip = am_false; break; } - done: +done: if (unlock_link) erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_LINK); @@ -506,21 +549,20 @@ static int demonitor(Process *c_p, Eterm ref, Eterm *multip) } ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); - return res; + BIF_RET(res); } BIF_RETTYPE demonitor_1(BIF_ALIST_1) { Eterm multi; switch (demonitor(BIF_P, BIF_ARG_1, &multi)) { - case ERTS_DEMONITOR_FALSE: - case ERTS_DEMONITOR_TRUE: - BIF_RET(am_true); - case ERTS_DEMONITOR_YIELD_TRUE: - ERTS_BIF_YIELD_RETURN(BIF_P, am_true); - case ERTS_DEMONITOR_BADARG: - BIF_ERROR(BIF_P, BADARG); - case ERTS_DEMONITOR_INTERNAL_ERROR: + case am_false: + case am_true: BIF_RET(am_true); + case THE_NON_VALUE: BIF_RET(THE_NON_VALUE); + case am_yield: ERTS_BIF_YIELD_RETURN(BIF_P, am_true); + case am_badarg: BIF_ERROR(BIF_P, BADARG); + + case am_internal_error: default: ASSERT(! "demonitor(): internal error"); BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); @@ -529,11 +571,11 @@ BIF_RETTYPE demonitor_1(BIF_ALIST_1) BIF_RETTYPE demonitor_2(BIF_ALIST_2) { - Eterm res = am_true; - Eterm multi = am_false; - int info = 0; - int flush = 0; - Eterm list = BIF_ARG_2; + BIF_RETTYPE res = am_true; + Eterm multi = am_false; + int info = 0; + int flush = 0; + Eterm list = BIF_ARG_2; while (is_list(list)) { Eterm* consp = list_val(list); @@ -554,24 +596,27 @@ BIF_RETTYPE demonitor_2(BIF_ALIST_2) goto badarg; switch (demonitor(BIF_P, BIF_ARG_1, &multi)) { - case ERTS_DEMONITOR_FALSE: + case THE_NON_VALUE: + /* If other error occured or trap has been set up - pass through */ + BIF_RET(THE_NON_VALUE); + case am_false: if (info) res = am_false; if (flush) { - flush_messages: +flush_messages: BIF_TRAP3(flush_monitor_messages_trap, BIF_P, BIF_ARG_1, multi, res); } - case ERTS_DEMONITOR_TRUE: + case am_true: if (multi == am_true && flush) goto flush_messages; BIF_RET(res); - case ERTS_DEMONITOR_YIELD_TRUE: + case am_yield: ERTS_BIF_YIELD_RETURN(BIF_P, am_true); - case ERTS_DEMONITOR_BADARG: - badarg: + case am_badarg: +badarg: BIF_ERROR(BIF_P, BADARG); - case ERTS_DEMONITOR_INTERNAL_ERROR: + case am_internal_error: default: ASSERT(! "demonitor(): internal error"); BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); @@ -615,14 +660,13 @@ erts_queue_monitor_message(Process *p, erts_queue_message(p, *p_locksp, msgp, tup, am_system); } -static BIF_RETTYPE +static Eterm local_pid_monitor(Process *p, Eterm target, Eterm mon_ref, int boolean) { - BIF_RETTYPE ret; - Process *rp; + Eterm ret = mon_ref; + Process *rp; ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK; - ERTS_BIF_PREP_RET(ret, mon_ref); if (target == p->common.id) { return ret; } @@ -658,40 +702,112 @@ local_pid_monitor(Process *p, Eterm target, Eterm mon_ref, int boolean) } static BIF_RETTYPE -local_name_monitor(Process *p, Eterm target_name) +local_port_monitor(Process *origin, Eterm target) { - BIF_RETTYPE ret; - Eterm mon_ref; - ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK; - Process *rp; + BIF_RETTYPE ref = erts_make_ref(origin); + Port *port = erts_sig_lookup_port(origin, target); + ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN; - mon_ref = erts_make_ref(p); - ERTS_BIF_PREP_RET(ret, mon_ref); - erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); - rp = erts_whereis_process(p, p_locks, target_name, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - DeclareTmpHeap(lhp,3,p); + if (!port) { +res_no_proc: + /* Send the DOWN message immediately. Ref is made on the fly because + * caller has never seen it yet. */ + erts_queue_monitor_message(origin, &p_locks, ref, + am_port, target, am_noproc); + } + else { + switch (erts_port_monitor(origin, port, target, &ref)) { + case ERTS_PORT_OP_DROPPED: + case ERTS_PORT_OP_BADARG: + goto res_no_proc; + case ERTS_PORT_OP_SCHEDULED: + BIF_TRAP3(await_port_send_result_trap, origin, ref, + am_busy_port, ref); + /* the busy_port atom will never be returned, because it cannot be + * returned from erts_port_monitor, but just in case if in future + * internal API changes - you may see this atom */ + default: + break; + } + } + erts_smp_proc_unlock(origin, p_locks & ~ERTS_PROC_LOCK_MAIN); + BIF_RET(ref); +} + +/* Type = process | port :: atom(), 1st argument passed to erlang:monitor/2 + */ +static BIF_RETTYPE +local_name_monitor(Process *self, Eterm type, Eterm target_name) +{ + BIF_RETTYPE ret = erts_make_ref(self); + + ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK; + Process *proc = NULL; + Port *port = NULL; + + erts_smp_proc_lock(self, ERTS_PROC_LOCK_LINK); + + erts_whereis_name(self, p_locks, target_name, + &proc, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X, + &port, 0); + + /* If the name is not registered, + * or if we asked for proc and got a port, + * or if we asked for port and got a proc, + * we just send the 'DOWN' message. + */ + if ((!proc && !port) || + (type == am_process && port) || + (type == am_port && proc)) { + DeclareTmpHeap(lhp,3,self); Eterm item; - UseTmpHeap(3,p); - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); + UseTmpHeap(3,self); + + erts_smp_proc_unlock(self, ERTS_PROC_LOCK_LINK); p_locks &= ~ERTS_PROC_LOCK_LINK; + item = TUPLE2(lhp, target_name, erts_this_dist_entry->sysname); - erts_queue_monitor_message(p, &p_locks, - mon_ref, am_process, item, am_noproc); - UnUseTmpHeap(3,p); - } - else if (rp != p) { - erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, rp->common.id, - target_name); - erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id, - target_name); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_queue_monitor_message(self, &p_locks, + ret, + type, /* = process|port :: atom() */ + item, am_noproc); + UnUseTmpHeap(3,self); + } + else if (port) { + erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN); + p_locks &= ~ERTS_PROC_LOCK_MAIN; + + switch (erts_port_monitor(self, port, target_name, &ret)) { + case ERTS_PORT_OP_DONE: + return ret; + case ERTS_PORT_OP_SCHEDULED: { /* Scheduled a signal */ + ASSERT(is_internal_ref(ret)); + BIF_TRAP3(await_port_send_result_trap, self, + ret, am_true, ret); + /* bif_trap returns */ + } break; + default: + goto badarg; + } + } + else if (proc != self) { + erts_add_monitor(&ERTS_P_MONITORS(self), MON_ORIGIN, ret, + proc->common.id, target_name); + erts_add_monitor(&ERTS_P_MONITORS(proc), MON_TARGET, ret, + self->common.id, target_name); + erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_LINK); } - erts_smp_proc_unlock(p, p_locks & ~ERTS_PROC_LOCK_MAIN); - - return ret; + if (p_locks) { + erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN); + } + BIF_RET(ret); +badarg: + if (p_locks) { + erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN); + } + BIF_ERROR(self, BADARG); } static BIF_RETTYPE @@ -758,7 +874,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, break; } - return ret; + BIF_RET(ret); } BIF_RETTYPE monitor_2(BIF_ALIST_2) @@ -772,8 +888,9 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) switch (BIF_ARG_1) { case am_time_offset: { Eterm ref; - if (BIF_ARG_2 != am_clock_service) - goto error; + if (BIF_ARG_2 != am_clock_service) { + goto badarg; + } ref = erts_make_ref(BIF_P); erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); erts_add_monitor(&ERTS_P_MONITORS(BIF_P), MON_TIME_OFFSET, @@ -783,46 +900,57 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) BIF_RET(ref); } case am_process: + case am_port: break; default: - goto error; + goto badarg; } - if (is_internal_pid(target)) { - local_pid: - ret = local_pid_monitor(BIF_P, target, erts_make_ref(BIF_P), 0); - } else if (is_external_pid(target)) { + if (is_internal_pid(target) && BIF_ARG_1 == am_process) { +local_pid: + ret = local_pid_monitor(BIF_P, target, erts_make_ref(BIF_P), 0); + } else if (is_external_pid(target) && BIF_ARG_1 == am_process) { dep = external_pid_dist_entry(target); if (dep == erts_this_dist_entry) goto local_pid; ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, target, 0); + } else if (is_internal_port(target) && BIF_ARG_1 == am_port) { +local_port: + ret = local_port_monitor(BIF_P, target); + } else if (is_external_port(target) && BIF_ARG_1 == am_port) { + dep = external_port_dist_entry(target); + if (dep == erts_this_dist_entry) { + goto local_port; + } + goto badarg; /* No want remote port */ } else if (is_atom(target)) { - ret = local_name_monitor(BIF_P, target); + ret = local_name_monitor(BIF_P, BIF_ARG_1, target); } else if (is_tuple(target)) { Eterm *tp = tuple_val(target); Eterm remote_node; Eterm name; - if (arityval(*tp) != 2) - goto error; + if (arityval(*tp) != 2) { + goto badarg; + } remote_node = tp[2]; name = tp[1]; if (!is_atom(remote_node) || !is_atom(name)) { - goto error; + goto badarg; } if (!erts_is_alive && remote_node != am_Noname) { - goto error; /* Remote monitor from (this) undistributed node */ + goto badarg; /* Remote monitor from (this) undistributed node */ } dep = erts_sysname_to_connected_dist_entry(remote_node); if (dep == erts_this_dist_entry) { deref_de = 1; - ret = local_name_monitor(BIF_P, name); + ret = local_name_monitor(BIF_P, BIF_ARG_1, name); } else { if (dep) deref_de = 1; ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } } else { - error: +badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } if (deref_de) { diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index b410578d37..3fb866733c 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -361,8 +361,13 @@ erts_print_system_version(int to, void *arg, Process *c_p) } typedef struct { + /* {Entity,Node} = {monitor.Name,monitor.Pid} for external by name + * {Entity,Node} = {monitor.Pid,NIL} for external/external by pid + * {Entity,Node} = {monitor.Name,erlang:node()} for internal by name */ Eterm entity; Eterm node; + /* pid is actual target being monitored, no matter pid/port or name */ + Eterm pid; } MonitorInfo; typedef struct { @@ -420,21 +425,27 @@ static void collect_one_origin_monitor(ErtsMonitor *mon, void *vmicp) EXTEND_MONITOR_INFOS(micp); if (is_atom(mon->pid)) { /* external by name */ micp->mi[micp->mi_i].entity = mon->name; - micp->mi[micp->mi_i].node = mon->pid; - micp->sz += 3; /* need one 2-tuple */ + micp->mi[micp->mi_i].node = mon->pid; + micp->sz += 3; /* need one 2-tuple */ } else if (is_external_pid(mon->pid)) { /* external by pid */ micp->mi[micp->mi_i].entity = mon->pid; - micp->mi[micp->mi_i].node = NIL; - micp->sz += NC_HEAP_SIZE(mon->pid); + micp->mi[micp->mi_i].node = NIL; + micp->sz += NC_HEAP_SIZE(mon->pid); } else if (!is_nil(mon->name)) { /* internal by name */ micp->mi[micp->mi_i].entity = mon->name; - micp->mi[micp->mi_i].node = erts_this_dist_entry->sysname; - micp->sz += 3; /* need one 2-tuple */ + micp->mi[micp->mi_i].node = erts_this_dist_entry->sysname; + micp->sz += 3; /* need one 2-tuple */ } else { /* internal by pid */ micp->mi[micp->mi_i].entity = mon->pid; - micp->mi[micp->mi_i].node = NIL; + micp->mi[micp->mi_i].node = NIL; /* no additional heap space needed */ } + + /* have always pid at hand, to assist with figuring out if its a port or + * a process, when we monitored by name and process_info is requested. + * See: erl_bif_info.c:process_info_aux section for am_monitors */ + micp->mi[micp->mi_i].pid = mon->pid; + micp->mi_i++; micp->sz += 2 + 3; /* For a cons cell and a 2-tuple */ } @@ -1190,37 +1201,49 @@ process_info_aux(Process *BIF_P, case am_monitors: { MonitorInfoCollection mic; - int i; + int i; INIT_MONITOR_INFOS(mic); - erts_doforall_monitors(ERTS_P_MONITORS(rp),&collect_one_origin_monitor,&mic); - hp = HAlloc(BIF_P, 3 + mic.sz); + erts_doforall_monitors(ERTS_P_MONITORS(rp), + &collect_one_origin_monitor, &mic); + hp = HAlloc(BIF_P, 3 + mic.sz); res = NIL; for (i = 0; i < mic.mi_i; i++) { if (is_atom(mic.mi[i].entity)) { /* Monitor by name. - * Build {process, {Name, Node}} and cons it. + * Build {process|port, {Name, Node}} and cons it. */ Eterm t1, t2; + /* If pid is an atom, then it is a remote named monitor, which + has to be a process */ + Eterm m_type = is_port(mic.mi[i].pid) ? am_port : am_process; + ASSERT(is_pid(mic.mi[i].pid) + || is_port(mic.mi[i].pid) + || is_atom(mic.mi[i].pid)); t1 = TUPLE2(hp, mic.mi[i].entity, mic.mi[i].node); hp += 3; - t2 = TUPLE2(hp, am_process, t1); + t2 = TUPLE2(hp, m_type, t1); hp += 3; res = CONS(hp, t2, res); - hp += 2; + hp += 2; } else { - /* Monitor by pid. Build {process, Pid} and cons it. */ + /* Monitor by pid. Build {process|port, Pid} and cons it. */ Eterm t; Eterm pid = STORE_NC(&hp, &MSO(BIF_P), mic.mi[i].entity); - t = TUPLE2(hp, am_process, pid); + + Eterm m_type = is_port(mic.mi[i].pid) ? am_port : am_process; + ASSERT(is_pid(mic.mi[i].pid) + || is_port(mic.mi[i].pid)); + + t = TUPLE2(hp, m_type, pid); hp += 3; res = CONS(hp, t, res); - hp += 2; + hp += 2; } } - DESTROY_MONITOR_INFOS(mic); + DESTROY_MONITOR_INFOS(mic); break; } @@ -2880,7 +2903,8 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) */ Eterm -erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm item) +erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, + Eterm item) { Eterm res = THE_NON_VALUE; @@ -2928,8 +2952,8 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite Eterm item; INIT_MONITOR_INFOS(mic); - - erts_doforall_monitors(ERTS_P_MONITORS(prt), &collect_one_origin_monitor, &mic); + erts_doforall_monitors(ERTS_P_MONITORS(prt), + &collect_one_origin_monitor, &mic); if (szp) *szp += mic.sz; @@ -2938,14 +2962,16 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite res = NIL; for (i = 0; i < mic.mi_i; i++) { Eterm t; - item = STORE_NC(hpp, ohp, mic.mi[i].entity); - t = TUPLE2(*hpp, am_process, item); + Eterm m_type; + + item = STORE_NC(hpp, ohp, mic.mi[i].entity); + m_type = is_port(item) ? am_port : am_process; + t = TUPLE2(*hpp, m_type, item); *hpp += 3; res = CONS(*hpp, t, res); *hpp += 2; } - } - + } // hpp DESTROY_MONITOR_INFOS(mic); if (szp) { @@ -2953,6 +2979,32 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite goto done; } } + else if (item == am_monitored_by) { + MonitorInfoCollection mic; + int i; + Eterm item; + + INIT_MONITOR_INFOS(mic); + erts_doforall_monitors(ERTS_P_MONITORS(prt), + &collect_one_target_monitor, &mic); + if (szp) + *szp += mic.sz; + + if (hpp) { + res = NIL; + for (i = 0; i < mic.mi_i; ++i) { + item = STORE_NC(hpp, ohp, mic.mi[i].entity); + res = CONS(*hpp, item, res); + *hpp += 2; + } + } // hpp + DESTROY_MONITOR_INFOS(mic); + + if (szp) { + res = am_true; + goto done; + } + } else if (item == am_name) { int count = sys_strlen(prt->name); diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index 37f4e1de49..208935bed9 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -139,6 +139,12 @@ sig_lookup_port(Process *c_p, Eterm id_or_name) return lookup_port(c_p, id_or_name, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); } +/* Non-inline copy of sig_lookup_port to be exported */ +Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name) +{ + return lookup_port(c_p, id_or_name, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); +} + static ERTS_INLINE Port * data_lookup_port(Process *c_p, Eterm id_or_name) { diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index f0075ca2b9..f90844ccc8 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -361,6 +361,8 @@ Eterm erts_request_io_bytes(Process *c_p); #define ERTS_PORT_REDS_CONNECT (CONTEXT_REDS/200) #define ERTS_PORT_REDS_UNLINK (CONTEXT_REDS/200) #define ERTS_PORT_REDS_LINK (CONTEXT_REDS/200) +#define ERTS_PORT_REDS_MONITOR (CONTEXT_REDS/200) +#define ERTS_PORT_REDS_DEMONITOR (CONTEXT_REDS/200) #define ERTS_PORT_REDS_BADSIG (CONTEXT_REDS/200) #define ERTS_PORT_REDS_CONTROL (CONTEXT_REDS/100) #define ERTS_PORT_REDS_CALL (CONTEXT_REDS/50) @@ -850,16 +852,20 @@ void erts_port_resume_procs(Port *); struct binary; -#define ERTS_P2P_SIG_TYPE_BAD 0 -#define ERTS_P2P_SIG_TYPE_OUTPUT 1 -#define ERTS_P2P_SIG_TYPE_OUTPUTV 2 -#define ERTS_P2P_SIG_TYPE_CONNECT 3 -#define ERTS_P2P_SIG_TYPE_EXIT 4 -#define ERTS_P2P_SIG_TYPE_CONTROL 5 -#define ERTS_P2P_SIG_TYPE_CALL 6 -#define ERTS_P2P_SIG_TYPE_INFO 7 -#define ERTS_P2P_SIG_TYPE_LINK 8 -#define ERTS_P2P_SIG_TYPE_UNLINK 9 +enum { + ERTS_P2P_SIG_TYPE_BAD = 0, + ERTS_P2P_SIG_TYPE_OUTPUT = 1, + ERTS_P2P_SIG_TYPE_OUTPUTV = 2, + ERTS_P2P_SIG_TYPE_CONNECT = 3, + ERTS_P2P_SIG_TYPE_EXIT = 4, + ERTS_P2P_SIG_TYPE_CONTROL = 5, + ERTS_P2P_SIG_TYPE_CALL = 6, + ERTS_P2P_SIG_TYPE_INFO = 7, + ERTS_P2P_SIG_TYPE_LINK = 8, + ERTS_P2P_SIG_TYPE_UNLINK = 9, + ERTS_P2P_SIG_TYPE_MONITOR = 10, + ERTS_P2P_SIG_TYPE_DEMONITOR = 11 +}; #define ERTS_P2P_SIG_TYPE_BITS 4 #define ERTS_P2P_SIG_TYPE_MASK \ @@ -921,6 +927,15 @@ struct ErtsProc2PortSigData_ { struct { Eterm from; } unlink; + struct { + Eterm origin; /* who receives monitor event, pid */ + Eterm name; /* either name for named monitor, or port id */ + } monitor; + struct { + Eterm origin; /* who is at the other end of the monitor, pid */ + Eterm name; /* port id */ + Uint32 ref[ERTS_MAX_REF_NUMBERS]; /* box contents of a ref */ + } demonitor; } u; } ; @@ -1017,6 +1032,29 @@ ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *); ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *); +/* Creates monitor between Origin and Target. Ref must be initialized to + * a reference (ref may be rewritten to be used to serve additionally as a + * signal id). Name is atom if user monitors port by name or NIL */ +ErtsPortOpResult erts_port_monitor(Process *origin, Port *target, Eterm name, + Eterm *ref); + +typedef enum { + /* Normal demonitor rules apply with locking and reductions bump */ + ERTS_PORT_DEMONITOR_NORMAL = 1, + /* Relaxed demonitor rules when process is about to die, which means that + * pid lookup won't work, locks won't work, no reductions bump. */ + ERTS_PORT_DEMONITOR_ORIGIN_ON_DEATHBED = 2, +} ErtsDemonitorMode; + +/* Removes monitor between origin and target, identified by ref. + * origin_is_dying can be 0 (false, normal locking rules and reductions bump + * apply) or 1 (true, in case when we avoid origin locking) */ +ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode, + Port *target, Eterm ref, + Eterm *trap_ref); +/* defined in erl_bif_port.c */ +Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name); + int erts_port_output_async(Port *, Eterm, Eterm); /* diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index c0b1d7246c..2c3d1bb7eb 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12239,7 +12239,6 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) ExitMonitorContext *pcontext = vpcontext; DistEntry *dep; ErtsMonitor *rmon; - Process *rp; switch (mon->type) { case MON_ORIGIN: @@ -12268,9 +12267,10 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_deref_dist_entry(dep); } } else { - ASSERT(is_pid(mon->pid)); - if (is_internal_pid(mon->pid)) { /* local by pid or name */ - rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK); + ASSERT(is_pid(mon->pid) || is_port(mon->pid)); + /* if is local by pid or name */ + if (is_internal_pid(mon->pid)) { + Process *rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK); if (!rp) { goto done; } @@ -12280,7 +12280,17 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) goto done; } erts_destroy_monitor(rmon); - } else { /* remote by pid */ + } else if (is_internal_port(mon->pid)) { + /* Is a local port */ + Port *prt = erts_port_lookup_raw(mon->pid); + if (!prt) { + goto done; + } + erts_port_demonitor(pcontext->p, + ERTS_PORT_DEMONITOR_ORIGIN_ON_DEATHBED, + prt, mon->ref, NULL); + return; /* let erts_port_demonitor do the deletion */ + } else { /* remote by pid */ ASSERT(is_external_pid(mon->pid)); dep = external_pid_dist_entry(mon->pid); ASSERT(dep != NULL); @@ -12318,6 +12328,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_port_release(prt); } else if (is_internal_pid(mon->pid)) {/* local by name or pid */ Eterm watched; + Process *rp; DeclareTmpHeapNoproc(lhp,3); ErtsProcLocks rp_locks = (ERTS_PROC_LOCK_LINK | ERTS_PROC_LOCKS_MSG_SEND); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 01df5476db..cb8792dffa 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1260,7 +1260,7 @@ typedef struct { /* * Try doing an immediate driver callback call from a process. If * this fail, the operation should be scheduled in the normal case... - * + * Returns: ok to do the call, or error (lock busy, does not exist, etc) */ static ERTS_INLINE ErtsTryImmDrvCallResult try_imm_drv_call(ErtsTryImmDrvCallState *sp) @@ -3073,6 +3073,250 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) port_sig_link); } +static void +port_monitor_failure(Eterm port_id, Eterm origin, Eterm ref_DOWN) +{ + Process *origin_p; + ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; + ASSERT(is_internal_pid(origin)); + + origin_p = erts_pid2proc(NULL, 0, origin, p_locks); + if (! origin_p) { return; } + + /* Send the DOWN message immediately. Ref is made on the fly because + * caller has never seen it yet. */ + erts_queue_monitor_message(origin_p, &p_locks, ref_DOWN, + am_port, port_id, am_noproc); + erts_smp_proc_unlock(origin_p, p_locks); +} + +/* Origin wants to monitor port Prt. State contains possible error, which has + * happened just before. Name is either NIL or an atom, if user monitors + * a port by name. Ref is premade reference that will be returned to user */ +static void +port_monitor(Port *prt, erts_aint32_t state, Eterm origin, + Eterm name, Eterm ref) +{ + Eterm name_or_nil = is_atom(name) ? name : NIL; + + ASSERT(is_pid(origin)); + ASSERT(is_atom(name) || is_port(name) || name == NIL); + ASSERT(is_internal_ref(ref)); + + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { + ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; + + Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks); + if (! origin_p) { + goto failure; + } + erts_add_monitor(&ERTS_P_MONITORS(origin_p), MON_ORIGIN, ref, + prt->common.id, name_or_nil); + erts_add_monitor(&ERTS_P_MONITORS(prt), MON_TARGET, ref, + origin, name_or_nil); + + erts_smp_proc_unlock(origin_p, p_locks); + } else { +failure: + port_monitor_failure(prt->common.id, origin, ref); + } +} + +static int +port_sig_monitor(Port *prt, erts_aint32_t state, int op, + ErtsProc2PortSigData *sigdp) +{ + Eterm hp[REF_THING_SIZE]; + Eterm ref = make_internal_ref(&hp); + write_ref_thing(hp, sigdp->ref[0], sigdp->ref[1], sigdp->ref[2]); + + if (op == ERTS_PROC2PORT_SIG_EXEC) { + /* erts_add_monitor call inside port_monitor will copy ref from hp */ + port_monitor(prt, state, + sigdp->u.monitor.origin, + sigdp->u.monitor.name, + ref); + } else { + port_monitor_failure(sigdp->u.monitor.name, + sigdp->u.monitor.origin, + ref); + } + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_MONITOR; +} + +/* Creates monitor between Origin and Target. Ref must be initialized to + * a reference (ref may be rewritten to be used to serve additionally as a + * signal id). Name is atom if user monitors port by name or NIL */ +ErtsPortOpResult +erts_port_monitor(Process *origin, Port *port, Eterm name, Eterm *refp) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + origin, port, ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + 0, /* trap_ref is always set so !trap_ref always is false */ + am_monitor); + + ASSERT(origin); + ASSERT(port); + ASSERT(is_atom(name) || is_port(name)); + ASSERT(refp); + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: + port_monitor(port, try_call_state.state, origin->common.id, name, *refp); + finalize_imm_drv_call(&try_call_state); + BUMP_REDS(origin, ERTS_PORT_REDS_MONITOR); + return ERTS_PORT_OP_DONE; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_BADARG; + default: + break; /* Schedule call instead... */ + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_MONITOR; + sigdp->u.monitor.origin = origin->common.id; + sigdp->u.monitor.name = name; /* either named monitor, or port id */ + + /* Ref contents will be initialized here */ + return erts_schedule_proc2port_signal(origin, port, origin->common.id, + refp, sigdp, 0, NULL, + port_sig_monitor); +} + +static void +port_demonitor_failure(Eterm port_id, Eterm origin, Eterm ref) +{ + Process *origin_p; + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; + ErtsMonitor *mon1; + ASSERT(is_internal_pid(origin)); + + origin_p = erts_pid2proc(NULL, 0, origin, rp_locks); + if (! origin_p) { return; } + + /* do not send any DOWN messages, drop monitors on process */ + mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), ref); + if (mon1 != NULL) { + erts_destroy_monitor(mon1); + } + + erts_smp_proc_unlock(origin_p, rp_locks); +} + +/* Origin wants to demonitor port Prt. State contains possible error, which has + * happened just before. Ref is reference to monitor */ +static void +port_demonitor(Port *port, erts_aint32_t state, Eterm origin, Eterm ref) +{ + ASSERT(port); + ASSERT(is_pid(origin)); + ASSERT(is_internal_ref(ref)); + + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { + ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; + Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks); + if (origin_p) { + ErtsMonitor *mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), + ref); + if (mon1 != NULL) { + erts_destroy_monitor(mon1); + } + } + if (1) { + ErtsMonitor *mon2 = erts_remove_monitor(&ERTS_P_MONITORS(port), + ref); + if (mon2 != NULL) { + erts_destroy_monitor(mon2); + } + } + if (origin_p) { /* when origin is dying, it won't be found */ + erts_smp_proc_unlock(origin_p, p_locks); + } + } else { + port_demonitor_failure(port->common.id, origin, ref); + } +} + +static int +port_sig_demonitor(Port *prt, erts_aint32_t state, int op, + ErtsProc2PortSigData *sigdp) +{ + Eterm hp[REF_THING_SIZE]; + Eterm ref = make_internal_ref(&hp); + write_ref_thing(hp, sigdp->u.demonitor.ref[0], + sigdp->u.demonitor.ref[1], + sigdp->u.demonitor.ref[2]); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_demonitor(prt, state, sigdp->u.demonitor.origin, ref); + } else { + port_demonitor_failure(sigdp->u.demonitor.name, + sigdp->u.demonitor.origin, + ref); + } + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_DEMONITOR; +} + +/* Removes monitor between origin and target, identified by ref. + * Mode defines normal or relaxed demonitor rules (process is at death) */ +ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode, + Port *target, Eterm ref, + Eterm *trap_ref) +{ + Process *c_p = mode == ERTS_PORT_DEMONITOR_NORMAL ? origin : NULL; + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + target, ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !trap_ref, + am_demonitor); + + ASSERT(origin); + ASSERT(target); + ASSERT(is_internal_ref(ref)); + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: + port_demonitor(target, try_call_state.state, origin->common.id, ref); + finalize_imm_drv_call(&try_call_state); + if (mode == ERTS_PORT_DEMONITOR_NORMAL) { + BUMP_REDS(origin, ERTS_PORT_REDS_DEMONITOR); + } + return ERTS_PORT_OP_DONE; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_BADARG; + default: + break; /* Schedule call instead... */ + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_DEMONITOR; + sigdp->u.demonitor.origin = origin->common.id; + sigdp->u.demonitor.name = target->common.id; + { + RefThing *reft = ref_thing_ptr(ref); + /* Start from 1 skip ref arity */ + sys_memcpy(sigdp->u.demonitor.ref, + internal_thing_ref_numbers(reft), + sizeof(sigdp->u.demonitor.ref)); + } + + /* Ref contents will be initialized here */ + return erts_schedule_proc2port_signal(c_p, target, origin->common.id, + trap_ref, sigdp, 0, NULL, + port_sig_demonitor); +} + static void init_ack_send_reply(Port *port, Eterm resp) { @@ -3942,23 +4186,30 @@ erts_terminate_port(Port *pp) terminate_port(pp); } +static void port_fire_one_monitor(ErtsMonitor *mon, void *ctx0); static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc) { - ErtsMonitor *rmon; - Process *rp; + switch (mon->type) { + case MON_ORIGIN: { + ErtsMonitor *rmon; + Process *rp; - ASSERT(mon->type == MON_ORIGIN); - ASSERT(is_internal_pid(mon->pid)); - rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK); - if (!rp) { - goto done; - } - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - if (rmon == NULL) { - goto done; + ASSERT(is_internal_pid(mon->pid)); + rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK); + if (!rp) { + goto done; + } + rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + if (rmon == NULL) { + goto done; + } + erts_destroy_monitor(rmon); + } break; + case MON_TARGET: { + port_fire_one_monitor(mon, vpsc); /* forward call */ + } break; } - erts_destroy_monitor(rmon); done: erts_destroy_monitor(mon); } @@ -4039,6 +4290,43 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) erts_destroy_link(lnk); } +static void +port_fire_one_monitor(ErtsMonitor *mon, void *ctx0) +{ + Process *origin; + ErtsProcLocks origin_locks; + + if (mon->type != MON_TARGET || ! is_pid(mon->pid)) { + return; + } + /* + * Proceed here if someone monitors us, we (port) are the target and + * origin is some process + */ + origin_locks = ERTS_PROC_LOCKS_MSG_SEND | ERTS_PROC_LOCK_LINK; + + origin = erts_pid2proc(NULL, 0, mon->pid, origin_locks); + if (origin) { + DeclareTmpHeapNoproc(lhp,3); + SweepContext *ctx = (SweepContext *)ctx0; + ErtsMonitor *rmon; + Eterm watched = (is_atom(mon->name) + ? TUPLE2(lhp, mon->name, erts_this_dist_entry->sysname) + : ctx->port->common.id); + + erts_queue_monitor_message(origin, &origin_locks, mon->ref, am_port, + watched, ctx->reason); + UnUseTmpHeapNoproc(3); + + rmon = erts_remove_monitor(&ERTS_P_MONITORS(origin), mon->ref); + erts_smp_proc_unlock(origin, origin_locks); + + if (rmon) { + erts_destroy_monitor(rmon); + } + } +} + /* 'from' is sending 'this_port' an exit signal, (this_port must be internal). * If reason is normal we don't do anything, *unless* from is our connected * process in which case we close the port. Any other reason kills the port. @@ -4050,39 +4338,40 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) */ int -erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed, +erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, int drop_normal) { ErtsLink *lnk; - Eterm rreason; + Eterm modified_reason; erts_aint32_t state, set_state_flags; ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - rreason = (reason == am_kill) ? am_killed : reason; + modified_reason = (reason == am_kill) ? am_killed : reason; #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_exit)) { DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); - DTRACE_CHARBUF(rreason_str, 64); + DTRACE_CHARBUF(reason_str, 64); erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from); - dtrace_port_str(p, port_str); - erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason); - DTRACE4(port_exit, from_str, port_str, p->name, rreason_str); + dtrace_port_str(prt, port_str); + erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", + modified_reason); + DTRACE4(port_exit, from_str, port_str, prt->name, reason_str); } #endif - state = erts_atomic32_read_nob(&p->state); + state = erts_atomic32_read_nob(&prt->state); if (state & (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_CLOSING)) return 0; - if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p) - && from != p->common.id && drop_normal) { + if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(prt) + && from != prt->common.id && drop_normal) { return 0; } @@ -4090,53 +4379,54 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed, if (send_closed) set_state_flags |= ERTS_PORT_SFLG_SEND_CLOSED; - erts_port_task_sched_enter_exiting_state(&p->sched); + erts_port_task_sched_enter_exiting_state(&prt->sched); - state = erts_atomic32_read_bor_mb(&p->state, set_state_flags); + state = erts_atomic32_read_bor_mb(&prt->state, set_state_flags); state |= set_state_flags; - if (IS_TRACED_FL(p, F_TRACE_PORTS)) - trace_port(p, am_closed, reason); + if (IS_TRACED_FL(prt, F_TRACE_PORTS)) + trace_port(prt, am_closed, reason); - erts_trace_check_exiting(p->common.id); + erts_trace_check_exiting(prt->common.id); - set_busy_port(ERTS_Port2ErlDrvPort(p), 0); + set_busy_port(ERTS_Port2ErlDrvPort(prt), 0); - if (p->common.u.alive.reg != NULL) - (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); + if (prt->common.u.alive.reg != NULL) + (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name); { - SweepContext sc = {p, rreason}; - lnk = ERTS_P_LINKS(p); - ERTS_P_LINKS(p) = NULL; + SweepContext sc = {prt, modified_reason}; + lnk = ERTS_P_LINKS(prt); + ERTS_P_LINKS(prt) = NULL; erts_sweep_links(lnk, &sweep_one_link, &sc); } - DRV_MONITOR_LOCK_PDL(p); + DRV_MONITOR_LOCK_PDL(prt); { - ErtsMonitor *moni = ERTS_P_MONITORS(p); - ERTS_P_MONITORS(p) = NULL; - erts_sweep_monitors(moni, &sweep_one_monitor, NULL); + SweepContext ctx = {prt, modified_reason}; + ErtsMonitor *moni = ERTS_P_MONITORS(prt); + ERTS_P_MONITORS(prt) = NULL; + erts_sweep_monitors(moni, &sweep_one_monitor, &ctx); } - DRV_MONITOR_UNLOCK_PDL(p); + DRV_MONITOR_UNLOCK_PDL(prt); - if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { - erts_do_net_exits(p->dist_entry, rreason); - erts_deref_dist_entry(p->dist_entry); - p->dist_entry = NULL; - erts_atomic32_read_band_relb(&p->state, + if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) { + erts_do_net_exits(prt->dist_entry, modified_reason); + erts_deref_dist_entry(prt->dist_entry); + prt->dist_entry = NULL; + erts_atomic32_read_band_relb(&prt->state, ~ERTS_PORT_SFLG_DISTRIBUTION); } - if ((reason != am_kill) && !is_port_ioq_empty(p)) { + if ((reason != am_kill) && !is_port_ioq_empty(prt)) { /* must turn exiting flag off */ - erts_atomic32_read_bset_relb(&p->state, + erts_atomic32_read_bset_relb(&prt->state, (ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_CLOSING), ERTS_PORT_SFLG_CLOSING); - flush_port(p); + flush_port(prt); } else { - terminate_port(p); + terminate_port(prt); } return 1; diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c index 77f79fcea4..ac7096745e 100644 --- a/erts/emulator/beam/register.c +++ b/erts/emulator/beam/register.c @@ -323,7 +323,8 @@ erts_whereis_name(Process *c_p, Process** proc, ErtsProcLocks need_locks, int flags, - Port** port) + Port** port, + int lock_port) { RegProc* rp = NULL; HashValue hval; @@ -406,31 +407,33 @@ erts_whereis_name(Process *c_p, *port = NULL; else { #ifdef ERTS_SMP - if (pending_port == rp->pt) - pending_port = NULL; - else { - if (pending_port) { - /* Ahh! Registered port changed while reg lock - was unlocked... */ - erts_port_release(pending_port); - pending_port = NULL; - } + if (lock_port) { + if (pending_port == rp->pt) + pending_port = NULL; + else { + if (pending_port) { + /* Ahh! Registered port changed while reg lock + was unlocked... */ + erts_port_release(pending_port); + pending_port = NULL; + } - if (erts_smp_port_trylock(rp->pt) == EBUSY) { - Eterm id = rp->pt->common.id; /* id read only... */ - /* Unlock all locks, acquire port lock, and restart... */ - if (current_c_p_locks) { - erts_smp_proc_unlock(c_p, current_c_p_locks); - current_c_p_locks = 0; - } - reg_read_unlock(); - pending_port = erts_id2port(id); - goto restart; - } - } + if (erts_smp_port_trylock(rp->pt) == EBUSY) { + Eterm id = rp->pt->common.id; /* id read only... */ + /* Unlock all locks, acquire port lock, and restart... */ + if (current_c_p_locks) { + erts_smp_proc_unlock(c_p, current_c_p_locks); + current_c_p_locks = 0; + } + reg_read_unlock(); + pending_port = erts_id2port(id); + goto restart; + } + } + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(rp->pt)); + } #endif *port = rp->pt; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(*port)); } } @@ -452,7 +455,7 @@ erts_whereis_process(Process *c_p, int flags) { Process *proc; - erts_whereis_name(c_p, c_p_locks, name, &proc, need_locks, flags, NULL); + erts_whereis_name(c_p, c_p_locks, name, &proc, need_locks, flags, NULL, 0); return proc; } diff --git a/erts/emulator/beam/register.h b/erts/emulator/beam/register.h index 88ab7b7bf1..d839f55d6b 100644 --- a/erts/emulator/beam/register.h +++ b/erts/emulator/beam/register.h @@ -49,7 +49,7 @@ int erts_register_name(Process *, Eterm, Eterm); Eterm erts_whereis_name_to_id(Process *, Eterm); void erts_whereis_name(Process *, ErtsProcLocks, Eterm, Process**, ErtsProcLocks, int, - Port**); + Port**, int); Process *erts_whereis_process(Process *, ErtsProcLocks, Eterm, diff --git a/erts/emulator/test/monitor_SUITE.erl b/erts/emulator/test/monitor_SUITE.erl index 8955e62df5..90d2bd8c5d 100644 --- a/erts/emulator/test/monitor_SUITE.erl +++ b/erts/emulator/test/monitor_SUITE.erl @@ -21,6 +21,7 @@ -module(monitor_SUITE). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -export([all/0, suite/0, groups/0, case_1/1, case_1a/1, case_2/1, case_2a/1, mon_e_1/1, demon_e_1/1, demon_1/1, @@ -706,7 +707,7 @@ named_down(Config) when is_list(Config) -> spawn_opt(fun () -> WFun = fun (F, hej) -> F(F, hopp); -(F, hopp) -> F(F, hej) + (F, hopp) -> F(F, hej) end, NoSchedulers = erlang:system_info(schedulers_online), lists:foreach(fun (_) -> @@ -726,13 +727,14 @@ named_down(Config) when is_list(Config) -> NamedProc = spawn_link(fun () -> receive after infinity -> ok end end), - true = register(Name, NamedProc), + ?assertEqual(true, register(Name, NamedProc)), unlink(NamedProc), exit(NamedProc, bang), Mon = erlang:monitor(process, Name), - receive {'DOWN',Mon, _, _, _} -> ok end, - true = register(Name, self()), - true = unregister(Name), + receive {'DOWN',Mon, _, _, bang} -> ok + after 3000 -> ?assert(false) end, + ?assertEqual(true, register(Name, self())), + ?assertEqual(true, unregister(Name)), process_flag(priority,Prio), ok. diff --git a/erts/emulator/test/port_SUITE.erl b/erts/emulator/test/port_SUITE.erl index 79abcbde5f..ee07699884 100644 --- a/erts/emulator/test/port_SUITE.erl +++ b/erts/emulator/test/port_SUITE.erl @@ -74,27 +74,68 @@ %% --export([all/0, suite/0, groups/0, - init_per_testcase/2, end_per_testcase/2, - init_per_suite/1, end_per_suite/1, - stream_small/1, stream_big/1, - basic_ping/1, slow_writes/1, bad_packet/1, bad_port_messages/1, - mul_basic/1, mul_slow_writes/1, - dying_port/1, port_program_with_path/1, - open_input_file_port/1, open_output_file_port/1, - count_fds/1, - iter_max_ports/1, eof/1, input_only/1, output_only/1, - name1/1, - t_binary/1, parallell/1, t_exit/1, - env/1, huge_env/1, bad_env/1, cd/1, exit_status/1, - bad_args/1, - tps_16_bytes/1, tps_1K/1, line/1, stderr_to_stdout/1, - otp_3906/1, otp_4389/1, win_massive/1, win_massive_client/1, - mix_up_ports/1, otp_5112/1, otp_5119/1, otp_6224/1, - exit_status_multi_scheduling_block/1, ports/1, - spawn_driver/1, spawn_executable/1, close_deaf_port/1, - port_setget_data/1, - unregister_name/1, parallelism_option/1]). +-export([all/0, suite/0, groups/0, init_per_testcase/2, end_per_testcase/2, + init_per_suite/1, end_per_suite/1]). +-export([ + bad_args/1, + bad_env/1, + bad_packet/1, + bad_port_messages/1, + basic_ping/1, + cd/1, + close_deaf_port/1, + count_fds/1, + dying_port/1, + env/1, + eof/1, + exit_status/1, + exit_status_multi_scheduling_block/1, + huge_env/1, + input_only/1, + iter_max_ports/1, + line/1, + mix_up_ports/1, + mon_port_invalid_type/1, + mon_port_bad_named/1, + mon_port_bad_remote_on_local/1, + mon_port_local/1, + mon_port_name_demonitor/1, + mon_port_named/1, + mon_port_origin_dies/1, + mon_port_pid_demonitor/1, + mon_port_remote_on_remote/1, + mon_port_driver_die/1, + mon_port_driver_die_demonitor/1, + mul_basic/1, + mul_slow_writes/1, + name1/1, + open_input_file_port/1, + open_output_file_port/1, + otp_3906/1, + otp_4389/1, + otp_5112/1, + otp_5119/1, + otp_6224/1, + output_only/1, + parallelism_option/1, + parallell/1, + port_program_with_path/1, + port_setget_data/1, + ports/1, + slow_writes/1, + spawn_driver/1, + spawn_executable/1, + stderr_to_stdout/1, + stream_big/1, + stream_small/1, + t_binary/1, + t_exit/1, + tps_16_bytes/1, + tps_1K/1, + unregister_name/1, + win_massive/1, + win_massive_client/1 +]). -export([do_iter_max_ports/2]). @@ -105,12 +146,13 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("kernel/include/file.hrl"). +-include_lib("eunit/include/eunit.hrl"). suite() -> [{ct_hooks,[ts_install_cth]}, {timetrap, {seconds, 10}}]. -all() -> +all() -> [otp_6224, {group, stream}, basic_ping, slow_writes, bad_packet, bad_port_messages, {group, options}, {group, multiple_packets}, parallell, dying_port, @@ -123,14 +165,32 @@ all() -> exit_status_multi_scheduling_block, ports, spawn_driver, spawn_executable, close_deaf_port, unregister_name, port_setget_data, - parallelism_option]. - -groups() -> + parallelism_option, + mon_port_invalid_type, + mon_port_local, + mon_port_remote_on_remote, + mon_port_bad_remote_on_local, + mon_port_origin_dies, + mon_port_named, + mon_port_bad_named, + mon_port_pid_demonitor, + mon_port_name_demonitor, + mon_port_driver_die, + mon_port_driver_die_demonitor + ]. + +groups() -> [{stream, [], [stream_small, stream_big]}, {options, [], [t_binary, eof, input_only, output_only]}, {multiple_packets, [], [mul_basic, mul_slow_writes]}, {tps, [], [tps_16_bytes, tps_1K]}]. +init_per_testcase(Case, Config) when Case =:= mon_port_driver_die; + Case =:= mon_port_driver_die_demonitor -> + case erlang:system_info(schedulers_online) of + 1 -> {skip, "Need 2 schedulers to run testcase"}; + _ -> Config + end; init_per_testcase(Case, Config) -> [{testcase, Case} |Config]. @@ -160,7 +220,7 @@ do_win_massive() -> ct:timetrap({minutes, 6}), SuiteDir = filename:dirname(code:which(?MODULE)), Ports = " +Q 8192", - {ok, Node} = + {ok, Node} = test_server:start_node(win_massive, slave, [{args, " -pa " ++ SuiteDir ++ Ports}]), @@ -169,7 +229,7 @@ do_win_massive() -> ok. win_massive_client(N) -> - {ok,P}=gen_tcp:listen(?WIN_MASSIVE_PORT,[{reuseaddr,true}]), + {ok,P}=gen_tcp:listen(?WIN_MASSIVE_PORT,[{reuseaddr,true}]), L = win_massive_loop(P,N), Len = length(L), lists:foreach(fun(E) -> @@ -278,7 +338,7 @@ bad_port_messages(Config) when is_list(Config) -> bad_message(PortTest, {self(),{connect,no_pid}}), ok. -bad_message(PortTest, Message) -> +bad_message(PortTest, Message) -> P = open_port({spawn,PortTest}, []), P ! Message, receive @@ -773,7 +833,7 @@ line(Config) when is_list(Config) -> S1 = lists:flatten(io_lib:format("-l~w", [length(L1)])), io:format("S1 = ~w, L1 = ~w~n", [S1,L1]), port_expect(Config,[{L1, - [{eol, Packet1}, {noeol, Packet2}, eof]}], 0, + [{eol, Packet1}, {noeol, Packet2}, eof]}], 0, S1, [{line,Siz},eof]), %% Test that lonely Don't get treated as newlines port_expect(Config,[{lists:append([Packet1, [13], Packet2, @@ -844,9 +904,9 @@ env(Config) when is_list(Config) -> {"glurf","a glorfy string"}]), %% A lot of non existing variables (mingled with existing) - NotExistingList = [{lists:flatten(io_lib:format("V~p_not_existing",[X])),false} + NotExistingList = [{lists:flatten(io_lib:format("V~p_not_existing",[X])),false} || X <- lists:seq(1,150)], - ExistingList = [{lists:flatten(io_lib:format("V~p_existing",[X])),"a_value"} + ExistingList = [{lists:flatten(io_lib:format("V~p_existing",[X])),"a_value"} || X <- lists:seq(1,150)], env_slave(Temp, lists:sort(ExistingList ++ NotExistingList)), ok. @@ -1320,22 +1380,22 @@ spawn_driver(Config) when is_list(Config) -> ok = load_driver(Path, "echo_drv"), Port = erlang:open_port({spawn_driver, "echo_drv"}, []), Port ! {self(), {command, "Hello port!"}}, - receive - {Port, {data, "Hello port!"}} = Msg1 -> + receive + {Port, {data, "Hello port!"}} = Msg1 -> io:format("~p~n", [Msg1]), - ok; + ok; Other -> ct:fail({unexpected, Other}) end, Port ! {self(), close}, receive {Port, closed} -> ok end, - Port2 = erlang:open_port({spawn_driver, "echo_drv -Hello port?"}, + Port2 = erlang:open_port({spawn_driver, "echo_drv -Hello port?"}, []), - receive - {Port2, {data, "Hello port?"}} = Msg2 -> + receive + {Port2, {data, "Hello port?"}} = Msg2 -> io:format("~p~n", [Msg2]), - ok; + ok; Other2 -> ct:fail({unexpected2, Other2}) end, @@ -1354,23 +1414,23 @@ parallelism_option(Config) when is_list(Config) -> [{parallelism, true}]), {parallelism, true} = erlang:port_info(Port, parallelism), Port ! {self(), {command, "Hello port!"}}, - receive - {Port, {data, "Hello port!"}} = Msg1 -> + receive + {Port, {data, "Hello port!"}} = Msg1 -> io:format("~p~n", [Msg1]), - ok; + ok; Other -> ct:fail({unexpected, Other}) end, Port ! {self(), close}, receive {Port, closed} -> ok end, - Port2 = erlang:open_port({spawn_driver, "echo_drv -Hello port?"}, + Port2 = erlang:open_port({spawn_driver, "echo_drv -Hello port?"}, [{parallelism, false}]), {parallelism, false} = erlang:port_info(Port2, parallelism), - receive - {Port2, {data, "Hello port?"}} = Msg2 -> + receive + {Port2, {data, "Hello port?"}} = Msg2 -> io:format("~p~n", [Msg2]), - ok; + ok; Other2 -> ct:fail({unexpected2, Other2}) end, @@ -1389,20 +1449,20 @@ spawn_executable(Config) when is_list(Config) -> ["echo_args"] = run_echo_args(DataDir,[binary, "echo_args"]), ["echo_arguments"] = run_echo_args(DataDir,["echo_arguments"]), ["echo_arguments"] = run_echo_args(DataDir,[binary, "echo_arguments"]), - [ExactFile1,"hello world","dlrow olleh"] = + [ExactFile1,"hello world","dlrow olleh"] = run_echo_args(DataDir,[ExactFile1,"hello world","dlrow olleh"]), [ExactFile1] = run_echo_args(DataDir,[default]), [ExactFile1] = run_echo_args(DataDir,[binary, default]), - [ExactFile1,"hello world","dlrow olleh"] = + [ExactFile1,"hello world","dlrow olleh"] = run_echo_args(DataDir,[switch_order,ExactFile1,"hello world", "dlrow olleh"]), - [ExactFile1,"hello world","dlrow olleh"] = + [ExactFile1,"hello world","dlrow olleh"] = run_echo_args(DataDir,[binary,switch_order,ExactFile1,"hello world", "dlrow olleh"]), [ExactFile1,"hello world","dlrow olleh"] = run_echo_args(DataDir,[default,"hello world","dlrow olleh"]), - [ExactFile1,"hello world","dlrow olleh"] = + [ExactFile1,"hello world","dlrow olleh"] = run_echo_args_2("\""++ExactFile1++"\" "++"\"hello world\" \"dlrow olleh\""), [ExactFile1,"hello world","dlrow olleh"] = run_echo_args_2(unicode:characters_to_binary("\""++ExactFile1++"\" "++"\"hello world\" \"dlrow olleh\"")), @@ -1418,7 +1478,7 @@ spawn_executable(Config) when is_list(Config) -> [ExactFile2] = run_echo_args(SpaceDir,[]), ["echo_args"] = run_echo_args(SpaceDir,["echo_args"]), ["echo_arguments"] = run_echo_args(SpaceDir,["echo_arguments"]), - [ExactFile2,"hello world","dlrow olleh"] = + [ExactFile2,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,[ExactFile2,"hello world","dlrow olleh"]), [ExactFile2,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,[binary, ExactFile2,"hello world","dlrow olleh"]), @@ -1429,16 +1489,16 @@ spawn_executable(Config) when is_list(Config) -> run_echo_args(SpaceDir,[binary, ExactFile2,"hello \"world\"","\"dlrow\" olleh"]), [ExactFile2] = run_echo_args(SpaceDir,[default]), - [ExactFile2,"hello world","dlrow olleh"] = + [ExactFile2,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,[switch_order,ExactFile2,"hello world", "dlrow olleh"]), - [ExactFile2,"hello world","dlrow olleh"] = + [ExactFile2,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,[default,"hello world","dlrow olleh"]), - [ExactFile2,"hello world","dlrow olleh"] = + [ExactFile2,"hello world","dlrow olleh"] = run_echo_args_2("\""++ExactFile2++"\" "++"\"hello world\" \"dlrow olleh\""), [ExactFile2,"hello world","dlrow olleh"] = run_echo_args_2(unicode:characters_to_binary("\""++ExactFile2++"\" "++"\"hello world\" \"dlrow olleh\"")), - ExeExt = + ExeExt = case string:to_lower(lists:last(string:tokens(ExactFile2,"."))) of "exe" -> ".exe"; @@ -1452,17 +1512,17 @@ spawn_executable(Config) when is_list(Config) -> [ExactFile3] = run_echo_args(SpaceDir,Executable2,[]), ["echo_args"] = run_echo_args(SpaceDir,Executable2,["echo_args"]), ["echo_arguments"] = run_echo_args(SpaceDir,Executable2,["echo_arguments"]), - [ExactFile3,"hello world","dlrow olleh"] = + [ExactFile3,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,Executable2,[ExactFile3,"hello world","dlrow olleh"]), [ExactFile3] = run_echo_args(SpaceDir,Executable2,[default]), - [ExactFile3,"hello world","dlrow olleh"] = + [ExactFile3,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,Executable2, [switch_order,ExactFile3,"hello world", "dlrow olleh"]), - [ExactFile3,"hello world","dlrow olleh"] = + [ExactFile3,"hello world","dlrow olleh"] = run_echo_args(SpaceDir,Executable2, [default,"hello world","dlrow olleh"]), - [ExactFile3,"hello world","dlrow olleh"] = + [ExactFile3,"hello world","dlrow olleh"] = run_echo_args_2("\""++ExactFile3++"\" "++"\"hello world\" \"dlrow olleh\""), [ExactFile3,"hello world","dlrow olleh"] = run_echo_args_2(unicode:characters_to_binary("\""++ExactFile3++"\" "++"\"hello world\" \"dlrow olleh\"")), @@ -1510,11 +1570,11 @@ test_bat_file(Dir) -> <<"\r\n">>], file:write_file(Full,list_to_binary(D)), EF = filename:basename(FN), - [DN,"hello","world"] = + [DN,"hello","world"] = run_echo_args(Dir,FN, [default,"hello","world"]), %% The arg0 argumant should be ignored when running batch files - [DN,"hello","world"] = + [DN,"hello","world"] = run_echo_args(Dir,FN, ["knaskurt","hello","world"]), EF = filename:basename(DN), @@ -1533,10 +1593,10 @@ test_sh_file(Dir) -> <<"done\n">>], file:write_file(Full,list_to_binary(D)), chmodplusx(Full), - [Full,"hello","world"] = + [Full,"hello","world"] = run_echo_args(Dir,FN, [default,"hello","world"]), - [Full,"hello","world of spaces"] = + [Full,"hello","world of spaces"] = run_echo_args(Dir,FN, [default,"hello","world of spaces"]), file:write_file(filename:join([Dir,"testfile1"]),<<"testdata1">>), @@ -1544,7 +1604,7 @@ test_sh_file(Dir) -> Pattern = filename:join([Dir,"testfile*"]), L = filelib:wildcard(Pattern), 2 = length(L), - [Full,"hello",Pattern] = + [Full,"hello",Pattern] = run_echo_args(Dir,FN, [default,"hello",Pattern]), ok. @@ -1620,10 +1680,10 @@ mix_up_ports(Config) when is_list(Config) -> ok = load_driver(Path, "echo_drv"), Port = erlang:open_port({spawn, "echo_drv"}, []), Port ! {self(), {command, "Hello port!"}}, - receive - {Port, {data, "Hello port!"}} = Msg1 -> + receive + {Port, {data, "Hello port!"}} = Msg1 -> io:format("~p~n", [Msg1]), - ok; + ok; Other -> ct:fail({unexpected, Other}) end, @@ -1631,7 +1691,7 @@ mix_up_ports(Config) when is_list(Config) -> receive {Port, closed} -> ok end, loop(start, done, fun(P) -> - Q = + Q = (catch erlang:open_port({spawn, "echo_drv"}, [])), %% io:format("~p ", [Q]), if is_port(Q) -> @@ -1642,7 +1702,7 @@ mix_up_ports(Config) when is_list(Config) -> end end), Port ! {self(), {command, "Hello again port!"}}, - receive + receive Msg2 -> ct:fail({unexpected, Msg2}) after 1000 -> @@ -1802,7 +1862,7 @@ exit_status_msb_test(Config, SleepSecs) when is_list(Config) -> %% We want to start port programs from as many schedulers as possible %% and we want these port programs to terminate while multi-scheduling %% is blocked. - %% + %% NoSchedsOnln = erlang:system_info(schedulers_online), Parent = self(), io:format("SleepSecs = ~p~n", [SleepSecs]), @@ -2214,7 +2274,7 @@ ports_snapshots(0, _, _) -> ok; ports_snapshots(Iter, TrafficPid, OtherPorts) -> - TrafficPid ! start, + TrafficPid ! start, receive after 1 -> ok end, Snapshot = erlang:ports(), @@ -2243,7 +2303,7 @@ ports_traffic_stopped(MaxPorts, {PortList, PortCnt}) -> end. ports_traffic_started(MaxPorts, {PortList, PortCnt}, EventList) -> - receive + receive {Pid, stop} -> %%io:format("Traffic stopped in ~p\n",[self()]), Pid ! {self(), EventList, PortList}, @@ -2256,7 +2316,7 @@ ports_traffic_started(MaxPorts, {PortList, PortCnt}, EventList) -> ports_traffic_do(MaxPorts, {PortList, PortCnt}, EventList) -> N = uniform(MaxPorts), case N > PortCnt of - true -> % Open port + true -> % Open port P = open_port({spawn, "exit_drv"}, []), %%io:format("Created port ~p\n",[P]), ports_traffic_started(MaxPorts, {[P|PortList], PortCnt+1}, @@ -2270,7 +2330,7 @@ ports_traffic_do(MaxPorts, {PortList, PortCnt}, EventList) -> [{close,P}|EventList]) end. -ports_verify(Ports, PortsAfter, EventList) -> +ports_verify(Ports, PortsAfter, EventList) -> %%io:format("Candidate=~p\nEvents=~p\n", [PortsAfter, EventList]), case lists:sort(Ports) =:= lists:sort(PortsAfter) of true -> @@ -2280,10 +2340,10 @@ ports_verify(Ports, PortsAfter, EventList) -> %% Note that we track the event list "backwards", undoing open/close: case EventList of [{open,P} | Tail] -> - ports_verify(Ports, lists:delete(P,PortsAfter), Tail); + ports_verify(Ports, lists:delete(P,PortsAfter), Tail); [{close,P} | Tail] -> - ports_verify(Ports, [P | PortsAfter], Tail); + ports_verify(Ports, [P | PortsAfter], Tail); [] -> ct:fail("Inconsistent snapshot from erlang:ports()") @@ -2391,3 +2451,227 @@ wait_until(Fun) -> receive after 100 -> ok end, wait_until(Fun) end. + +%% Attempt to monitor pid as port, and port as pid +mon_port_invalid_type(_Config) -> + Port = hd(erlang:ports()), + ?assertError(badarg, erlang:monitor(port, self())), + ?assertError(badarg, erlang:monitor(process, Port)), + ok. + +%% With local port +mon_port_local(Config) -> + Port1 = create_port(Config, ["-h1", "-q"]), % will close after we send 1 byte + Ref1 = erlang:monitor(port, Port1), + ?assertMatch({proc_monitors, true, port_monitored_by, true}, + port_is_monitored(self(), Port1)), + Port1 ! {self(), {command, <<"1">>}}, % port test will close self immediately + receive ExitP1 -> ?assertMatch({'DOWN', Ref1, port, Port1, _}, ExitP1) + after 1000 -> ?assert(false) end, + ?assertMatch({proc_monitors, false, port_monitored_by, false}, + port_is_monitored(self(), Port1)), + + %% Trying to re-monitor a port which exists but is not healthy will + %% succeed but then will immediately send DOWN + Ref2 = erlang:monitor(port, Port1), + receive ExitP2 -> ?assertMatch({'DOWN', Ref2, port, Port1, _}, ExitP2) + after 1000 -> ?assert(false) end, + ok. + +%% With remote port on remote node (should fail) +mon_port_remote_on_remote(_Config) -> + Port3 = binary_to_term(<<131, 102, % Ext term format: PORT_EXT + 100, 0, 13, "fgsfds@fgsfds", % Node :: ATOM_EXT + 1:32/big, % Id + 0>>), % Creation + ?assertError(badarg, erlang:monitor(port, Port3)), + ok. + +%% Remote port belongs to this node and does not exist +%% Port4 produces #Port<0.167772160> which should not exist in a test run +mon_port_bad_remote_on_local(_Config) -> + Port4 = binary_to_term(<<131, 102, % Ext term format: PORT_EXT + 100, 0, 13, "nonode@nohost", % Node + 167772160:32/big, % Id + 0>>), % Creation + ?assertError(badarg, erlang:monitor(port, Port4)), + ok. + +%% Monitor owner (origin) dies before port is closed +mon_port_origin_dies(Config) -> + Port5 = create_port(Config, ["-h1", "-q"]), % will close after we send 1 byte + Self5 = self(), + Proc5 = spawn(fun() -> + Self5 ! test5_started, + erlang:monitor(port, Port5), + receive stop -> ok end + end), + erlang:monitor(process, Proc5), % we want to sync with its death + receive test5_started -> ok + after 1000 -> ?assert(false) end, + ?assertMatch({proc_monitors, true, port_monitored_by, true}, + port_is_monitored(Proc5, Port5)), + Proc5 ! stop, + % receive from monitor (removing race condition) + receive ExitP5 -> ?assertMatch({'DOWN', _, process, Proc5, _}, ExitP5) + after 1000 -> ?assert(false) end, + ?assertMatch({proc_monitors, false, port_monitored_by, false}, + port_is_monitored(Proc5, Port5)), + Port5 ! {self(), {command, <<"1">>}}, % make port quit + ok. + +%% Monitor a named port +mon_port_named(Config) -> + Name6 = test_port6, + Port6 = create_port(Config, ["-h1", "-q"]), % will close after we send 1 byte + erlang:register(Name6, Port6), + erlang:monitor(port, Name6), + ?assertMatch({proc_monitors, true, port_monitored_by, true}, + port_is_monitored(self(), Name6)), + Port6 ! {self(), {command, <<"1">>}}, % port test will close self immediately + receive ExitP6 -> ?assertMatch({'DOWN', _, port, {Name6, _}, _}, ExitP6) + after 1000 -> ?assert(false) end, + ?assertMatch({proc_monitors, false, port_monitored_by, false}, + port_is_monitored(self(), Name6)), + ok. + +%% Named does not exist: Should succeed but immediately send 'DOWN' +mon_port_bad_named(_Config) -> + Name7 = test_port7, + erlang:monitor(port, Name7), + receive {'DOWN', _, port, {Name7, _}, noproc} -> ok + after 1000 -> ?assert(false) end, + ok. + +%% Monitor a pid and demonitor by ref +mon_port_pid_demonitor(Config) -> + Port8 = create_port(Config, ["-h1", "-q"]), % will close after we send 1 byte + Ref8 = erlang:monitor(port, Port8), + ?assertMatch({proc_monitors, true, port_monitored_by, true}, + port_is_monitored(self(), Port8)), + erlang:demonitor(Ref8), + ?assertMatch({proc_monitors, false, port_monitored_by, false}, + port_is_monitored(self(), Port8)), + Port8 ! {self(), {command, <<"1">>}}, % port test will close self immediately + ok. + +%% Monitor by name and demonitor by ref +mon_port_name_demonitor(Config) -> + Name9 = test_port9, + Port9 = create_port(Config, ["-h1", "-q"]), % will close after we send 1 byte + erlang:register(Name9, Port9), + Ref9 = erlang:monitor(port, Name9), + ?assertMatch({proc_monitors, true, port_monitored_by, true}, + port_is_monitored(self(), Name9)), + erlang:demonitor(Ref9), + ?assertMatch({proc_monitors, false, port_monitored_by, false}, + port_is_monitored(self(), Name9)), + Port9 ! {self(), {command, <<"1">>}}, % port test will close self immediately + ok. + +%% 1. Spawn a port which will sleep 3 seconds +%% 2. Port driver and dies horribly (via C driver_failure call). This should +%% mark port as exiting or something. +%% 3. While the command happens, a monitor is requested on the port +mon_port_driver_die(Config) -> + erlang:process_flag(scheduler, 1), + + Path = proplists:get_value(data_dir, Config), + ok = load_driver(Path, "sleep_failure_drv"), + Port = open_port({spawn, "sleep_failure_drv"}, []), + + Self = self(), + erlang:spawn_opt(fun() -> + timer:sleep(250), + Ref = erlang:monitor(port, Port), + %% Now check that msg actually arrives + receive + {'DOWN', Ref, _Port2, _, _} = M -> Self ! M + after 3000 -> Self ! no_down_message + end + end,[{scheduler, 2}]), + Port ! {self(), {command, "Fail, please!"}}, + receive + A when is_atom(A) -> ?assertEqual(A, 'A_should_be_printed'); + {'DOWN', _R, port, Port, noproc} -> ok; + {'DOWN', _R, _P, _, _} = M -> ct:fail({got_wrong_down,M}) + after 5000 -> ?assert(false) + end, + ok. + + +%% 1. Spawn a port which will sleep 3 seconds +%% 2. Monitor port +%% 3. Port driver and dies horribly (via C driver_failure call). This should +%% mark port as exiting or something. +%% 4. While the command happens, a demonitor is requested on the port +mon_port_driver_die_demonitor(Config) -> + erlang:process_flag(scheduler, 1), + + Path = proplists:get_value(data_dir, Config), + ok = load_driver(Path, "sleep_failure_drv"), + Port = open_port({spawn, "sleep_failure_drv"}, []), + + Self = self(), + erlang:spawn_opt( + fun() -> + Ref = erlang:monitor(port, Port), + Self ! Ref, + timer:sleep(250), + erlang:demonitor(Ref), + %% Now check that msg still arrives, + %% the demon should have arrived after + %% the port exited + receive + {'DOWN', Ref, _Port2, _, _} = M -> Self ! M + after 3000 -> Self ! no_down_message + end + end,[{scheduler, 2}]), + Ref = receive R -> R end, + Port ! {self(), {command, "Fail, please!"}}, + receive + {'DOWN', Ref, port, Port, normal} -> ok; + {'DOWN', _R, _P, _, _} = M -> ct:fail({got_wrong_down,M}) + after 5000 -> ?assert(false) + end, + ok. + +%% @doc Makes a controllable port for testing. Underlying mechanism of this +%% port is not important, only important is our ability to close/kill it or +%% have it monitored. +create_port(Config, Args) -> + DataDir = ?config(data_dir, Config), + %% Borrow port test utility from port SUITE + Program = filename:join([DataDir, "port_test"]), + erlang:open_port({spawn_executable, Program}, [{args, Args}]). + +%% @doc Checks if process Pid exists, and if so, if its monitoring (or not) +%% the Port (or if port doesn't exist, we assume answer is no). +port_is_monitored(Pid, Port) when is_pid(Pid), is_port(Port) -> + %% Variant for when port is a port id (port()) + A = case erlang:process_info(Pid, monitors) of + undefined -> false; + {monitors, ProcMTargets} -> lists:member({port, Port}, ProcMTargets) + end, + B = case erlang:port_info(Port, monitored_by) of + undefined -> false; + {monitored_by, PortMonitors} -> lists:member(Pid, PortMonitors) + end, + {proc_monitors, A, port_monitored_by, B}; +port_is_monitored(Pid, PortName) when is_pid(Pid), is_atom(PortName) -> + %% Variant for when port is an atom + A = case erlang:process_info(Pid, monitors) of + undefined -> false; + {monitors, ProcMTargets} -> + lists:member({port, {PortName, node()}}, ProcMTargets) + end, + B = case erlang:whereis(PortName) of + undefined -> false; % name is not registered or is dead + PortId -> + case erlang:port_info(PortId, monitored_by) of + undefined -> false; % is dead + {monitored_by, PortMonitors} -> + lists:member(Pid, PortMonitors) + end + end, + {proc_monitors, A, port_monitored_by, B}. diff --git a/erts/emulator/test/port_SUITE_data/Makefile.src b/erts/emulator/test/port_SUITE_data/Makefile.src index ff822ae720..fb7685c4b6 100644 --- a/erts/emulator/test/port_SUITE_data/Makefile.src +++ b/erts/emulator/test/port_SUITE_data/Makefile.src @@ -4,7 +4,7 @@ CFLAGS = @CFLAGS@ -I@erl_include@ @DEFS@ CROSSLDFLAGS = @CROSSLDFLAGS@ PROGS = port_test@exe@ echo_args@exe@ dead_port@exe@ -DRIVERS = echo_drv@dll@ exit_drv@dll@ failure_drv@dll@ +DRIVERS = echo_drv@dll@ exit_drv@dll@ failure_drv@dll@ sleep_failure_drv@dll@ all: $(PROGS) $(DRIVERS) port_test.@EMULATOR@ diff --git a/erts/emulator/test/port_SUITE_data/sleep_failure_drv.c b/erts/emulator/test/port_SUITE_data/sleep_failure_drv.c new file mode 100644 index 0000000000..1f52646572 --- /dev/null +++ b/erts/emulator/test/port_SUITE_data/sleep_failure_drv.c @@ -0,0 +1,76 @@ +#include +#include "erl_driver.h" +#ifdef __WIN32__ +# include +#else +# include +#endif + +typedef struct _erl_drv_data FailureDrvData; + +static FailureDrvData *failure_drv_start(ErlDrvPort, char *); +static void failure_drv_stop(FailureDrvData *); +static void failure_drv_output(ErlDrvData, char *, ErlDrvSizeT); +static void failure_drv_finish(void); + +static ErlDrvEntry failure_drv_entry = { + NULL, /* init */ + failure_drv_start, + failure_drv_stop, + failure_drv_output, + NULL, /* ready_input */ + NULL, /* ready_output */ + "sleep_failure_drv", + NULL, /* finish */ + NULL, /* handle */ + NULL, /* control */ + NULL, /* timeout */ + NULL, /* outputv */ + NULL, /* ready_async */ + NULL, + NULL, + NULL, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + 0, + NULL, + NULL, + NULL, +}; + + + +/* ------------------------------------------------------------------------- +** Entry functions +**/ + +DRIVER_INIT(failure_drv) +{ + return &failure_drv_entry; +} + +static FailureDrvData *failure_drv_start(ErlDrvPort port, char *command) { + void *void_ptr; + + return void_ptr = port; +} + +static void failure_drv_stop(FailureDrvData *data_p) { +} + +static void failure_drv_output(ErlDrvData drv_data, char *buf, ErlDrvSizeT len) { + FailureDrvData *data_p = (FailureDrvData *) drv_data; + void *void_ptr; + ErlDrvPort port = void_ptr = data_p; + +#ifdef __WIN32__ + Sleep(3000); +#else + sleep(3); +#endif + driver_failure(port, 0); +} + +static void failure_drv_finish() { +} -- cgit v1.2.3