aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
authorDmytro Lytovchenko <[email protected]>2016-05-25 14:37:03 +0200
committerLukas Larsson <[email protected]>2016-06-10 09:46:33 +0200
commit3ee5343415d6ae0ce1ff1c2a2555051431a9315e (patch)
tree50f0acc46c6f30bf9ce58b9851ca1a2cd92f1772 /erts/emulator/beam
parentd26c15e07229c90ba8353bd78d5406ada0f13271 (diff)
downloadotp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.tar.gz
otp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.tar.bz2
otp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.zip
erts: Add port monitors
* erlang:monitor/2 with port argument is added, erlang:demonitor, using port task API and avoiding locking; * port_info and process_info support for monitored ports (with named port monitors support); * Exit signals contain type 'process' or 'port'; * Propagation of port exit signals; * Self-cleaning when origin process dies with monitor on; * 8 test cases + testcase for port driver crashing; * Documentation for all of the above (monitor, demonitor, port_info and process_info) updated
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/bif.c410
-rw-r--r--erts/emulator/beam/erl_bif_info.c100
-rw-r--r--erts/emulator/beam/erl_bif_port.c6
-rw-r--r--erts/emulator/beam/erl_port.h58
-rw-r--r--erts/emulator/beam/erl_process.c21
-rw-r--r--erts/emulator/beam/io.c390
-rw-r--r--erts/emulator/beam/register.c51
-rw-r--r--erts/emulator/beam/register.h2
8 files changed, 783 insertions, 255 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index b18910e2c7..fc14061a44 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -282,20 +282,17 @@ res_no_proc: {
}
}
-#define ERTS_DEMONITOR_FALSE 2
-#define ERTS_DEMONITOR_TRUE 1
-#define ERTS_DEMONITOR_BADARG 0
-#define ERTS_DEMONITOR_YIELD_TRUE -1
-#define ERTS_DEMONITOR_INTERNAL_ERROR -2
-
-static int
+/* This function is allowed to return range of values handled by demonitor/1-2
+ * Namely: atoms true, false, yield, internal_error, badarg or THE_NON_VALUE
+ */
+static Eterm
remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
{
ErtsDSigData dsd;
ErtsMonitor *dmon;
ErtsMonitor *mon;
int code;
- int res;
+ Eterm res = am_false;
#ifndef ERTS_SMP
int stale_mon = 0;
#endif
@@ -328,7 +325,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref);
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_LINK);
- res = ERTS_DEMONITOR_TRUE;
+ res = am_true;
break;
case ERTS_DSIG_PREP_CONNECTED:
@@ -352,7 +349,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
* This is possible when smp support is enabled.
* 'DOWN' message just arrived.
*/
- res = ERTS_DEMONITOR_TRUE;
+ res = am_true;
}
else {
/*
@@ -367,16 +364,13 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
: mon->pid),
ref,
0);
- res = (code == ERTS_DSIG_SEND_YIELD
- ? ERTS_DEMONITOR_YIELD_TRUE
- : ERTS_DEMONITOR_TRUE);
+ res = (code == ERTS_DSIG_SEND_YIELD ? am_yield : am_true);
erts_destroy_monitor(dmon);
-
}
break;
default:
ASSERT(! "Invalid dsig prepare result");
- return ERTS_DEMONITOR_INTERNAL_ERROR;
+ return am_internal_error;
}
#ifndef ERTS_SMP
@@ -404,27 +398,96 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
return res;
}
-static int demonitor(Process *c_p, Eterm ref, Eterm *multip)
+static ERTS_INLINE void
+demonitor_local_process(Process *c_p, Eterm ref, Eterm to, Eterm *res)
+{
+ Process *rp = erts_pid2proc_opt(c_p,
+ ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK,
+ to,
+ ERTS_PROC_LOCK_LINK,
+ ERTS_P2P_FLG_ALLOW_OTHER_X);
+ ErtsMonitor *mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref);
+
+#ifndef ERTS_SMP
+ ASSERT(mon);
+#else
+ if (!mon)
+ *res = am_false;
+ else
+#endif
+ {
+ *res = am_true;
+ erts_destroy_monitor(mon);
+ }
+ if (rp) {
+ ErtsMonitor *rmon;
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
+ if (rp != c_p)
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (rmon != NULL)
+ erts_destroy_monitor(rmon);
+ }
+ else {
+ ERTS_SMP_ASSERT_IS_NOT_EXITING(c_p);
+ }
+}
+
+static ERTS_INLINE BIF_RETTYPE
+demonitor_local_port(Process *origin, Eterm ref, Eterm target)
{
- ErtsMonitor *mon = NULL; /* The monitor entry to delete */
- Process *rp; /* Local target process */
- Eterm to = NIL; /* Monitor link traget */
- DistEntry *dep = NULL; /* Target's distribution entry */
- int deref_de = 0;
- int res;
- int unlock_link = 1;
+ BIF_RETTYPE res = am_false;
+ Port *port = erts_port_lookup_raw(target);
+
+ if (!port) {
+ BIF_ERROR(origin, BADARG);
+ }
+ erts_smp_proc_unlock(origin, ERTS_PROC_LOCK_LINK);
+
+ if (port) {
+ Eterm trap_ref;
+ switch (erts_port_demonitor(origin, ERTS_PORT_DEMONITOR_NORMAL,
+ port, ref, &trap_ref)) {
+ case ERTS_PORT_OP_DROPPED:
+ case ERTS_PORT_OP_BADARG:
+ break;
+ case ERTS_PORT_OP_SCHEDULED:
+ BIF_TRAP3(await_port_send_result_trap, origin, trap_ref,
+ am_busy_port, am_true);
+ /* the busy_port atom will never be returned, because it cannot be
+ * returned from erts_port_(de)monitor, but just in case if in future
+ * internal API changes - you may see this atom */
+ default:
+ break;
+ }
+ }
+ else {
+ ERTS_SMP_ASSERT_IS_NOT_EXITING(origin);
+ }
+ BIF_RET(res);
+}
+/* Can return atom true, false, yield, internal_error, badarg or
+ * THE_NON_VALUE if error occured or trap has been set up
+ */
+static
+BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip)
+{
+ ErtsMonitor *mon = NULL; /* The monitor entry to delete */
+ Eterm to = NIL; /* Monitor link traget */
+ DistEntry *dep = NULL; /* Target's distribution entry */
+ int deref_de = 0;
+ BIF_RETTYPE res = am_false;
+ int unlock_link = 1;
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_LINK);
if (is_not_internal_ref(ref)) {
- res = ERTS_DEMONITOR_BADARG;
+ res = am_badarg;
goto done; /* Cannot be this monitor's ref */
}
mon = erts_lookup_monitor(ERTS_P_MONITORS(c_p), ref);
if (!mon) {
- res = ERTS_DEMONITOR_FALSE;
goto done;
}
@@ -432,70 +495,50 @@ static int demonitor(Process *c_p, Eterm ref, Eterm *multip)
case MON_TIME_OFFSET:
*multip = am_true;
erts_demonitor_time_offset(ref);
- res = ERTS_DEMONITOR_TRUE;
+ res = am_true;
break;
case MON_ORIGIN:
to = mon->pid;
*multip = am_false;
if (is_atom(to)) {
- /* Monitoring a name at node to */
- ASSERT(is_node_name_atom(to));
- dep = erts_sysname_to_connected_dist_entry(to);
- ASSERT(dep != erts_this_dist_entry);
- if (dep)
- deref_de = 1;
+ /* Monitoring a name at node to */
+ ASSERT(is_node_name_atom(to));
+ dep = erts_sysname_to_connected_dist_entry(to);
+ ASSERT(dep != erts_this_dist_entry);
+ if (dep)
+ deref_de = 1;
+ } else if (is_port(to)) {
+ if (port_dist_entry(to) != erts_this_dist_entry) {
+ goto badarg;
+ }
+ res = demonitor_local_port(c_p, ref, to);
+ unlock_link = 0;
+ goto done;
} else {
- ASSERT(is_pid(to));
- dep = pid_dist_entry(to);
+ ASSERT(is_pid(to));
+ dep = pid_dist_entry(to);
}
if (dep != erts_this_dist_entry) {
- res = remote_demonitor(c_p, dep, ref, to);
- /* remote_demonitor() unlocks link lock on c_p */
- unlock_link = 0;
+ res = remote_demonitor(c_p, dep, ref, to);
+ /* remote_demonitor() unlocks link lock on c_p */
+ unlock_link = 0;
}
else { /* Local monitor */
- if (deref_de) {
- deref_de = 0;
- erts_deref_dist_entry(dep);
- }
- dep = NULL;
- rp = erts_pid2proc_opt(c_p,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK,
- to,
- ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- mon = erts_remove_monitor(&ERTS_P_MONITORS(c_p), ref);
-#ifndef ERTS_SMP
- ASSERT(mon);
-#else
- if (!mon)
- res = ERTS_DEMONITOR_FALSE;
- else
-#endif
- {
- res = ERTS_DEMONITOR_TRUE;
- erts_destroy_monitor(mon);
- }
- if (rp) {
- ErtsMonitor *rmon;
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
- if (rp != c_p)
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- if (rmon != NULL)
- erts_destroy_monitor(rmon);
- }
- else {
- ERTS_SMP_ASSERT_IS_NOT_EXITING(c_p);
- }
-
+ if (deref_de) {
+ deref_de = 0;
+ erts_deref_dist_entry(dep);
+ }
+ dep = NULL;
+ demonitor_local_process(c_p, ref, to, &res);
}
break;
- default:
- res = ERTS_DEMONITOR_BADARG;
+ default /* case */ :
+badarg:
+ res = am_badarg; /* will be converted to error by caller */
*multip = am_false;
break;
}
- done:
+done:
if (unlock_link)
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_LINK);
@@ -506,21 +549,20 @@ static int demonitor(Process *c_p, Eterm ref, Eterm *multip)
}
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
- return res;
+ BIF_RET(res);
}
BIF_RETTYPE demonitor_1(BIF_ALIST_1)
{
Eterm multi;
switch (demonitor(BIF_P, BIF_ARG_1, &multi)) {
- case ERTS_DEMONITOR_FALSE:
- case ERTS_DEMONITOR_TRUE:
- BIF_RET(am_true);
- case ERTS_DEMONITOR_YIELD_TRUE:
- ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
- case ERTS_DEMONITOR_BADARG:
- BIF_ERROR(BIF_P, BADARG);
- case ERTS_DEMONITOR_INTERNAL_ERROR:
+ case am_false:
+ case am_true: BIF_RET(am_true);
+ case THE_NON_VALUE: BIF_RET(THE_NON_VALUE);
+ case am_yield: ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
+ case am_badarg: BIF_ERROR(BIF_P, BADARG);
+
+ case am_internal_error:
default:
ASSERT(! "demonitor(): internal error");
BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR);
@@ -529,11 +571,11 @@ BIF_RETTYPE demonitor_1(BIF_ALIST_1)
BIF_RETTYPE demonitor_2(BIF_ALIST_2)
{
- Eterm res = am_true;
- Eterm multi = am_false;
- int info = 0;
- int flush = 0;
- Eterm list = BIF_ARG_2;
+ BIF_RETTYPE res = am_true;
+ Eterm multi = am_false;
+ int info = 0;
+ int flush = 0;
+ Eterm list = BIF_ARG_2;
while (is_list(list)) {
Eterm* consp = list_val(list);
@@ -554,24 +596,27 @@ BIF_RETTYPE demonitor_2(BIF_ALIST_2)
goto badarg;
switch (demonitor(BIF_P, BIF_ARG_1, &multi)) {
- case ERTS_DEMONITOR_FALSE:
+ case THE_NON_VALUE:
+ /* If other error occured or trap has been set up - pass through */
+ BIF_RET(THE_NON_VALUE);
+ case am_false:
if (info)
res = am_false;
if (flush) {
- flush_messages:
+flush_messages:
BIF_TRAP3(flush_monitor_messages_trap, BIF_P,
BIF_ARG_1, multi, res);
}
- case ERTS_DEMONITOR_TRUE:
+ case am_true:
if (multi == am_true && flush)
goto flush_messages;
BIF_RET(res);
- case ERTS_DEMONITOR_YIELD_TRUE:
+ case am_yield:
ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
- case ERTS_DEMONITOR_BADARG:
- badarg:
+ case am_badarg:
+badarg:
BIF_ERROR(BIF_P, BADARG);
- case ERTS_DEMONITOR_INTERNAL_ERROR:
+ case am_internal_error:
default:
ASSERT(! "demonitor(): internal error");
BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR);
@@ -615,14 +660,13 @@ erts_queue_monitor_message(Process *p,
erts_queue_message(p, *p_locksp, msgp, tup, am_system);
}
-static BIF_RETTYPE
+static Eterm
local_pid_monitor(Process *p, Eterm target, Eterm mon_ref, int boolean)
{
- BIF_RETTYPE ret;
- Process *rp;
+ Eterm ret = mon_ref;
+ Process *rp;
ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK;
- ERTS_BIF_PREP_RET(ret, mon_ref);
if (target == p->common.id) {
return ret;
}
@@ -658,40 +702,112 @@ local_pid_monitor(Process *p, Eterm target, Eterm mon_ref, int boolean)
}
static BIF_RETTYPE
-local_name_monitor(Process *p, Eterm target_name)
+local_port_monitor(Process *origin, Eterm target)
{
- BIF_RETTYPE ret;
- Eterm mon_ref;
- ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK;
- Process *rp;
+ BIF_RETTYPE ref = erts_make_ref(origin);
+ Port *port = erts_sig_lookup_port(origin, target);
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN;
- mon_ref = erts_make_ref(p);
- ERTS_BIF_PREP_RET(ret, mon_ref);
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK);
- rp = erts_whereis_process(p, p_locks, target_name, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!rp) {
- DeclareTmpHeap(lhp,3,p);
+ if (!port) {
+res_no_proc:
+ /* Send the DOWN message immediately. Ref is made on the fly because
+ * caller has never seen it yet. */
+ erts_queue_monitor_message(origin, &p_locks, ref,
+ am_port, target, am_noproc);
+ }
+ else {
+ switch (erts_port_monitor(origin, port, target, &ref)) {
+ case ERTS_PORT_OP_DROPPED:
+ case ERTS_PORT_OP_BADARG:
+ goto res_no_proc;
+ case ERTS_PORT_OP_SCHEDULED:
+ BIF_TRAP3(await_port_send_result_trap, origin, ref,
+ am_busy_port, ref);
+ /* the busy_port atom will never be returned, because it cannot be
+ * returned from erts_port_monitor, but just in case if in future
+ * internal API changes - you may see this atom */
+ default:
+ break;
+ }
+ }
+ erts_smp_proc_unlock(origin, p_locks & ~ERTS_PROC_LOCK_MAIN);
+ BIF_RET(ref);
+}
+
+/* Type = process | port :: atom(), 1st argument passed to erlang:monitor/2
+ */
+static BIF_RETTYPE
+local_name_monitor(Process *self, Eterm type, Eterm target_name)
+{
+ BIF_RETTYPE ret = erts_make_ref(self);
+
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK;
+ Process *proc = NULL;
+ Port *port = NULL;
+
+ erts_smp_proc_lock(self, ERTS_PROC_LOCK_LINK);
+
+ erts_whereis_name(self, p_locks, target_name,
+ &proc, ERTS_PROC_LOCK_LINK,
+ ERTS_P2P_FLG_ALLOW_OTHER_X,
+ &port, 0);
+
+ /* If the name is not registered,
+ * or if we asked for proc and got a port,
+ * or if we asked for port and got a proc,
+ * we just send the 'DOWN' message.
+ */
+ if ((!proc && !port) ||
+ (type == am_process && port) ||
+ (type == am_port && proc)) {
+ DeclareTmpHeap(lhp,3,self);
Eterm item;
- UseTmpHeap(3,p);
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK);
+ UseTmpHeap(3,self);
+
+ erts_smp_proc_unlock(self, ERTS_PROC_LOCK_LINK);
p_locks &= ~ERTS_PROC_LOCK_LINK;
+
item = TUPLE2(lhp, target_name, erts_this_dist_entry->sysname);
- erts_queue_monitor_message(p, &p_locks,
- mon_ref, am_process, item, am_noproc);
- UnUseTmpHeap(3,p);
- }
- else if (rp != p) {
- erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, rp->common.id,
- target_name);
- erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id,
- target_name);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_queue_monitor_message(self, &p_locks,
+ ret,
+ type, /* = process|port :: atom() */
+ item, am_noproc);
+ UnUseTmpHeap(3,self);
+ }
+ else if (port) {
+ erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN);
+ p_locks &= ~ERTS_PROC_LOCK_MAIN;
+
+ switch (erts_port_monitor(self, port, target_name, &ret)) {
+ case ERTS_PORT_OP_DONE:
+ return ret;
+ case ERTS_PORT_OP_SCHEDULED: { /* Scheduled a signal */
+ ASSERT(is_internal_ref(ret));
+ BIF_TRAP3(await_port_send_result_trap, self,
+ ret, am_true, ret);
+ /* bif_trap returns */
+ } break;
+ default:
+ goto badarg;
+ }
+ }
+ else if (proc != self) {
+ erts_add_monitor(&ERTS_P_MONITORS(self), MON_ORIGIN, ret,
+ proc->common.id, target_name);
+ erts_add_monitor(&ERTS_P_MONITORS(proc), MON_TARGET, ret,
+ self->common.id, target_name);
+ erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_LINK);
}
- erts_smp_proc_unlock(p, p_locks & ~ERTS_PROC_LOCK_MAIN);
-
- return ret;
+ if (p_locks) {
+ erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN);
+ }
+ BIF_RET(ret);
+badarg:
+ if (p_locks) {
+ erts_smp_proc_unlock(self, p_locks & ~ERTS_PROC_LOCK_MAIN);
+ }
+ BIF_ERROR(self, BADARG);
}
static BIF_RETTYPE
@@ -758,7 +874,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2,
break;
}
- return ret;
+ BIF_RET(ret);
}
BIF_RETTYPE monitor_2(BIF_ALIST_2)
@@ -772,8 +888,9 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2)
switch (BIF_ARG_1) {
case am_time_offset: {
Eterm ref;
- if (BIF_ARG_2 != am_clock_service)
- goto error;
+ if (BIF_ARG_2 != am_clock_service) {
+ goto badarg;
+ }
ref = erts_make_ref(BIF_P);
erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK);
erts_add_monitor(&ERTS_P_MONITORS(BIF_P), MON_TIME_OFFSET,
@@ -783,46 +900,57 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2)
BIF_RET(ref);
}
case am_process:
+ case am_port:
break;
default:
- goto error;
+ goto badarg;
}
- if (is_internal_pid(target)) {
- local_pid:
- ret = local_pid_monitor(BIF_P, target, erts_make_ref(BIF_P), 0);
- } else if (is_external_pid(target)) {
+ if (is_internal_pid(target) && BIF_ARG_1 == am_process) {
+local_pid:
+ ret = local_pid_monitor(BIF_P, target, erts_make_ref(BIF_P), 0);
+ } else if (is_external_pid(target) && BIF_ARG_1 == am_process) {
dep = external_pid_dist_entry(target);
if (dep == erts_this_dist_entry)
goto local_pid;
ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, target, 0);
+ } else if (is_internal_port(target) && BIF_ARG_1 == am_port) {
+local_port:
+ ret = local_port_monitor(BIF_P, target);
+ } else if (is_external_port(target) && BIF_ARG_1 == am_port) {
+ dep = external_port_dist_entry(target);
+ if (dep == erts_this_dist_entry) {
+ goto local_port;
+ }
+ goto badarg; /* No want remote port */
} else if (is_atom(target)) {
- ret = local_name_monitor(BIF_P, target);
+ ret = local_name_monitor(BIF_P, BIF_ARG_1, target);
} else if (is_tuple(target)) {
Eterm *tp = tuple_val(target);
Eterm remote_node;
Eterm name;
- if (arityval(*tp) != 2)
- goto error;
+ if (arityval(*tp) != 2) {
+ goto badarg;
+ }
remote_node = tp[2];
name = tp[1];
if (!is_atom(remote_node) || !is_atom(name)) {
- goto error;
+ goto badarg;
}
if (!erts_is_alive && remote_node != am_Noname) {
- goto error; /* Remote monitor from (this) undistributed node */
+ goto badarg; /* Remote monitor from (this) undistributed node */
}
dep = erts_sysname_to_connected_dist_entry(remote_node);
if (dep == erts_this_dist_entry) {
deref_de = 1;
- ret = local_name_monitor(BIF_P, name);
+ ret = local_name_monitor(BIF_P, BIF_ARG_1, name);
} else {
if (dep)
deref_de = 1;
ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1);
}
} else {
- error:
+badarg:
ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
}
if (deref_de) {
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index b410578d37..3fb866733c 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -361,8 +361,13 @@ erts_print_system_version(int to, void *arg, Process *c_p)
}
typedef struct {
+ /* {Entity,Node} = {monitor.Name,monitor.Pid} for external by name
+ * {Entity,Node} = {monitor.Pid,NIL} for external/external by pid
+ * {Entity,Node} = {monitor.Name,erlang:node()} for internal by name */
Eterm entity;
Eterm node;
+ /* pid is actual target being monitored, no matter pid/port or name */
+ Eterm pid;
} MonitorInfo;
typedef struct {
@@ -420,21 +425,27 @@ static void collect_one_origin_monitor(ErtsMonitor *mon, void *vmicp)
EXTEND_MONITOR_INFOS(micp);
if (is_atom(mon->pid)) { /* external by name */
micp->mi[micp->mi_i].entity = mon->name;
- micp->mi[micp->mi_i].node = mon->pid;
- micp->sz += 3; /* need one 2-tuple */
+ micp->mi[micp->mi_i].node = mon->pid;
+ micp->sz += 3; /* need one 2-tuple */
} else if (is_external_pid(mon->pid)) { /* external by pid */
micp->mi[micp->mi_i].entity = mon->pid;
- micp->mi[micp->mi_i].node = NIL;
- micp->sz += NC_HEAP_SIZE(mon->pid);
+ micp->mi[micp->mi_i].node = NIL;
+ micp->sz += NC_HEAP_SIZE(mon->pid);
} else if (!is_nil(mon->name)) { /* internal by name */
micp->mi[micp->mi_i].entity = mon->name;
- micp->mi[micp->mi_i].node = erts_this_dist_entry->sysname;
- micp->sz += 3; /* need one 2-tuple */
+ micp->mi[micp->mi_i].node = erts_this_dist_entry->sysname;
+ micp->sz += 3; /* need one 2-tuple */
} else { /* internal by pid */
micp->mi[micp->mi_i].entity = mon->pid;
- micp->mi[micp->mi_i].node = NIL;
+ micp->mi[micp->mi_i].node = NIL;
/* no additional heap space needed */
}
+
+ /* have always pid at hand, to assist with figuring out if its a port or
+ * a process, when we monitored by name and process_info is requested.
+ * See: erl_bif_info.c:process_info_aux section for am_monitors */
+ micp->mi[micp->mi_i].pid = mon->pid;
+
micp->mi_i++;
micp->sz += 2 + 3; /* For a cons cell and a 2-tuple */
}
@@ -1190,37 +1201,49 @@ process_info_aux(Process *BIF_P,
case am_monitors: {
MonitorInfoCollection mic;
- int i;
+ int i;
INIT_MONITOR_INFOS(mic);
- erts_doforall_monitors(ERTS_P_MONITORS(rp),&collect_one_origin_monitor,&mic);
- hp = HAlloc(BIF_P, 3 + mic.sz);
+ erts_doforall_monitors(ERTS_P_MONITORS(rp),
+ &collect_one_origin_monitor, &mic);
+ hp = HAlloc(BIF_P, 3 + mic.sz);
res = NIL;
for (i = 0; i < mic.mi_i; i++) {
if (is_atom(mic.mi[i].entity)) {
/* Monitor by name.
- * Build {process, {Name, Node}} and cons it.
+ * Build {process|port, {Name, Node}} and cons it.
*/
Eterm t1, t2;
+ /* If pid is an atom, then it is a remote named monitor, which
+ has to be a process */
+ Eterm m_type = is_port(mic.mi[i].pid) ? am_port : am_process;
+ ASSERT(is_pid(mic.mi[i].pid)
+ || is_port(mic.mi[i].pid)
+ || is_atom(mic.mi[i].pid));
t1 = TUPLE2(hp, mic.mi[i].entity, mic.mi[i].node);
hp += 3;
- t2 = TUPLE2(hp, am_process, t1);
+ t2 = TUPLE2(hp, m_type, t1);
hp += 3;
res = CONS(hp, t2, res);
- hp += 2;
+ hp += 2;
}
else {
- /* Monitor by pid. Build {process, Pid} and cons it. */
+ /* Monitor by pid. Build {process|port, Pid} and cons it. */
Eterm t;
Eterm pid = STORE_NC(&hp, &MSO(BIF_P), mic.mi[i].entity);
- t = TUPLE2(hp, am_process, pid);
+
+ Eterm m_type = is_port(mic.mi[i].pid) ? am_port : am_process;
+ ASSERT(is_pid(mic.mi[i].pid)
+ || is_port(mic.mi[i].pid));
+
+ t = TUPLE2(hp, m_type, pid);
hp += 3;
res = CONS(hp, t, res);
- hp += 2;
+ hp += 2;
}
}
- DESTROY_MONITOR_INFOS(mic);
+ DESTROY_MONITOR_INFOS(mic);
break;
}
@@ -2880,7 +2903,8 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
*/
Eterm
-erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm item)
+erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt,
+ Eterm item)
{
Eterm res = THE_NON_VALUE;
@@ -2928,8 +2952,8 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite
Eterm item;
INIT_MONITOR_INFOS(mic);
-
- erts_doforall_monitors(ERTS_P_MONITORS(prt), &collect_one_origin_monitor, &mic);
+ erts_doforall_monitors(ERTS_P_MONITORS(prt),
+ &collect_one_origin_monitor, &mic);
if (szp)
*szp += mic.sz;
@@ -2938,14 +2962,16 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite
res = NIL;
for (i = 0; i < mic.mi_i; i++) {
Eterm t;
- item = STORE_NC(hpp, ohp, mic.mi[i].entity);
- t = TUPLE2(*hpp, am_process, item);
+ Eterm m_type;
+
+ item = STORE_NC(hpp, ohp, mic.mi[i].entity);
+ m_type = is_port(item) ? am_port : am_process;
+ t = TUPLE2(*hpp, m_type, item);
*hpp += 3;
res = CONS(*hpp, t, res);
*hpp += 2;
}
- }
-
+ } // hpp
DESTROY_MONITOR_INFOS(mic);
if (szp) {
@@ -2953,6 +2979,32 @@ erts_bld_port_info(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, Port *prt, Eterm ite
goto done;
}
}
+ else if (item == am_monitored_by) {
+ MonitorInfoCollection mic;
+ int i;
+ Eterm item;
+
+ INIT_MONITOR_INFOS(mic);
+ erts_doforall_monitors(ERTS_P_MONITORS(prt),
+ &collect_one_target_monitor, &mic);
+ if (szp)
+ *szp += mic.sz;
+
+ if (hpp) {
+ res = NIL;
+ for (i = 0; i < mic.mi_i; ++i) {
+ item = STORE_NC(hpp, ohp, mic.mi[i].entity);
+ res = CONS(*hpp, item, res);
+ *hpp += 2;
+ }
+ } // hpp
+ DESTROY_MONITOR_INFOS(mic);
+
+ if (szp) {
+ res = am_true;
+ goto done;
+ }
+ }
else if (item == am_name) {
int count = sys_strlen(prt->name);
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index 37f4e1de49..208935bed9 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -139,6 +139,12 @@ sig_lookup_port(Process *c_p, Eterm id_or_name)
return lookup_port(c_p, id_or_name, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
}
+/* Non-inline copy of sig_lookup_port to be exported */
+Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name)
+{
+ return lookup_port(c_p, id_or_name, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
+}
+
static ERTS_INLINE Port *
data_lookup_port(Process *c_p, Eterm id_or_name)
{
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index f0075ca2b9..f90844ccc8 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -361,6 +361,8 @@ Eterm erts_request_io_bytes(Process *c_p);
#define ERTS_PORT_REDS_CONNECT (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_UNLINK (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_LINK (CONTEXT_REDS/200)
+#define ERTS_PORT_REDS_MONITOR (CONTEXT_REDS/200)
+#define ERTS_PORT_REDS_DEMONITOR (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_BADSIG (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_CONTROL (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_CALL (CONTEXT_REDS/50)
@@ -850,16 +852,20 @@ void erts_port_resume_procs(Port *);
struct binary;
-#define ERTS_P2P_SIG_TYPE_BAD 0
-#define ERTS_P2P_SIG_TYPE_OUTPUT 1
-#define ERTS_P2P_SIG_TYPE_OUTPUTV 2
-#define ERTS_P2P_SIG_TYPE_CONNECT 3
-#define ERTS_P2P_SIG_TYPE_EXIT 4
-#define ERTS_P2P_SIG_TYPE_CONTROL 5
-#define ERTS_P2P_SIG_TYPE_CALL 6
-#define ERTS_P2P_SIG_TYPE_INFO 7
-#define ERTS_P2P_SIG_TYPE_LINK 8
-#define ERTS_P2P_SIG_TYPE_UNLINK 9
+enum {
+ ERTS_P2P_SIG_TYPE_BAD = 0,
+ ERTS_P2P_SIG_TYPE_OUTPUT = 1,
+ ERTS_P2P_SIG_TYPE_OUTPUTV = 2,
+ ERTS_P2P_SIG_TYPE_CONNECT = 3,
+ ERTS_P2P_SIG_TYPE_EXIT = 4,
+ ERTS_P2P_SIG_TYPE_CONTROL = 5,
+ ERTS_P2P_SIG_TYPE_CALL = 6,
+ ERTS_P2P_SIG_TYPE_INFO = 7,
+ ERTS_P2P_SIG_TYPE_LINK = 8,
+ ERTS_P2P_SIG_TYPE_UNLINK = 9,
+ ERTS_P2P_SIG_TYPE_MONITOR = 10,
+ ERTS_P2P_SIG_TYPE_DEMONITOR = 11
+};
#define ERTS_P2P_SIG_TYPE_BITS 4
#define ERTS_P2P_SIG_TYPE_MASK \
@@ -921,6 +927,15 @@ struct ErtsProc2PortSigData_ {
struct {
Eterm from;
} unlink;
+ struct {
+ Eterm origin; /* who receives monitor event, pid */
+ Eterm name; /* either name for named monitor, or port id */
+ } monitor;
+ struct {
+ Eterm origin; /* who is at the other end of the monitor, pid */
+ Eterm name; /* port id */
+ Uint32 ref[ERTS_MAX_REF_NUMBERS]; /* box contents of a ref */
+ } demonitor;
} u;
} ;
@@ -1017,6 +1032,29 @@ ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm
ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *);
+/* Creates monitor between Origin and Target. Ref must be initialized to
+ * a reference (ref may be rewritten to be used to serve additionally as a
+ * signal id). Name is atom if user monitors port by name or NIL */
+ErtsPortOpResult erts_port_monitor(Process *origin, Port *target, Eterm name,
+ Eterm *ref);
+
+typedef enum {
+ /* Normal demonitor rules apply with locking and reductions bump */
+ ERTS_PORT_DEMONITOR_NORMAL = 1,
+ /* Relaxed demonitor rules when process is about to die, which means that
+ * pid lookup won't work, locks won't work, no reductions bump. */
+ ERTS_PORT_DEMONITOR_ORIGIN_ON_DEATHBED = 2,
+} ErtsDemonitorMode;
+
+/* Removes monitor between origin and target, identified by ref.
+ * origin_is_dying can be 0 (false, normal locking rules and reductions bump
+ * apply) or 1 (true, in case when we avoid origin locking) */
+ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode,
+ Port *target, Eterm ref,
+ Eterm *trap_ref);
+/* defined in erl_bif_port.c */
+Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name);
+
int erts_port_output_async(Port *, Eterm, Eterm);
/*
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index c0b1d7246c..2c3d1bb7eb 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12239,7 +12239,6 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
ExitMonitorContext *pcontext = vpcontext;
DistEntry *dep;
ErtsMonitor *rmon;
- Process *rp;
switch (mon->type) {
case MON_ORIGIN:
@@ -12268,9 +12267,10 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
erts_deref_dist_entry(dep);
}
} else {
- ASSERT(is_pid(mon->pid));
- if (is_internal_pid(mon->pid)) { /* local by pid or name */
- rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
+ ASSERT(is_pid(mon->pid) || is_port(mon->pid));
+ /* if is local by pid or name */
+ if (is_internal_pid(mon->pid)) {
+ Process *rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
if (!rp) {
goto done;
}
@@ -12280,7 +12280,17 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
goto done;
}
erts_destroy_monitor(rmon);
- } else { /* remote by pid */
+ } else if (is_internal_port(mon->pid)) {
+ /* Is a local port */
+ Port *prt = erts_port_lookup_raw(mon->pid);
+ if (!prt) {
+ goto done;
+ }
+ erts_port_demonitor(pcontext->p,
+ ERTS_PORT_DEMONITOR_ORIGIN_ON_DEATHBED,
+ prt, mon->ref, NULL);
+ return; /* let erts_port_demonitor do the deletion */
+ } else { /* remote by pid */
ASSERT(is_external_pid(mon->pid));
dep = external_pid_dist_entry(mon->pid);
ASSERT(dep != NULL);
@@ -12318,6 +12328,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
erts_port_release(prt);
} else if (is_internal_pid(mon->pid)) {/* local by name or pid */
Eterm watched;
+ Process *rp;
DeclareTmpHeapNoproc(lhp,3);
ErtsProcLocks rp_locks = (ERTS_PROC_LOCK_LINK
| ERTS_PROC_LOCKS_MSG_SEND);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 01df5476db..cb8792dffa 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1260,7 +1260,7 @@ typedef struct {
/*
* Try doing an immediate driver callback call from a process. If
* this fail, the operation should be scheduled in the normal case...
- *
+ * Returns: ok to do the call, or error (lock busy, does not exist, etc)
*/
static ERTS_INLINE ErtsTryImmDrvCallResult
try_imm_drv_call(ErtsTryImmDrvCallState *sp)
@@ -3074,6 +3074,250 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
static void
+port_monitor_failure(Eterm port_id, Eterm origin, Eterm ref_DOWN)
+{
+ Process *origin_p;
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+ ASSERT(is_internal_pid(origin));
+
+ origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (! origin_p) { return; }
+
+ /* Send the DOWN message immediately. Ref is made on the fly because
+ * caller has never seen it yet. */
+ erts_queue_monitor_message(origin_p, &p_locks, ref_DOWN,
+ am_port, port_id, am_noproc);
+ erts_smp_proc_unlock(origin_p, p_locks);
+}
+
+/* Origin wants to monitor port Prt. State contains possible error, which has
+ * happened just before. Name is either NIL or an atom, if user monitors
+ * a port by name. Ref is premade reference that will be returned to user */
+static void
+port_monitor(Port *prt, erts_aint32_t state, Eterm origin,
+ Eterm name, Eterm ref)
+{
+ Eterm name_or_nil = is_atom(name) ? name : NIL;
+
+ ASSERT(is_pid(origin));
+ ASSERT(is_atom(name) || is_port(name) || name == NIL);
+ ASSERT(is_internal_ref(ref));
+
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+
+ Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (! origin_p) {
+ goto failure;
+ }
+ erts_add_monitor(&ERTS_P_MONITORS(origin_p), MON_ORIGIN, ref,
+ prt->common.id, name_or_nil);
+ erts_add_monitor(&ERTS_P_MONITORS(prt), MON_TARGET, ref,
+ origin, name_or_nil);
+
+ erts_smp_proc_unlock(origin_p, p_locks);
+ } else {
+failure:
+ port_monitor_failure(prt->common.id, origin, ref);
+ }
+}
+
+static int
+port_sig_monitor(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp)
+{
+ Eterm hp[REF_THING_SIZE];
+ Eterm ref = make_internal_ref(&hp);
+ write_ref_thing(hp, sigdp->ref[0], sigdp->ref[1], sigdp->ref[2]);
+
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ /* erts_add_monitor call inside port_monitor will copy ref from hp */
+ port_monitor(prt, state,
+ sigdp->u.monitor.origin,
+ sigdp->u.monitor.name,
+ ref);
+ } else {
+ port_monitor_failure(sigdp->u.monitor.name,
+ sigdp->u.monitor.origin,
+ ref);
+ }
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+ return ERTS_PORT_REDS_MONITOR;
+}
+
+/* Creates monitor between Origin and Target. Ref must be initialized to
+ * a reference (ref may be rewritten to be used to serve additionally as a
+ * signal id). Name is atom if user monitors port by name or NIL */
+ErtsPortOpResult
+erts_port_monitor(Process *origin, Port *port, Eterm name, Eterm *refp)
+{
+ ErtsProc2PortSigData *sigdp;
+ ErtsTryImmDrvCallState try_call_state
+ = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
+ origin, port, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ 0,
+ 0, /* trap_ref is always set so !trap_ref always is false */
+ am_monitor);
+
+ ASSERT(origin);
+ ASSERT(port);
+ ASSERT(is_atom(name) || is_port(name));
+ ASSERT(refp);
+
+ switch (try_imm_drv_call(&try_call_state)) {
+ case ERTS_TRY_IMM_DRV_CALL_OK:
+ port_monitor(port, try_call_state.state, origin->common.id, name, *refp);
+ finalize_imm_drv_call(&try_call_state);
+ BUMP_REDS(origin, ERTS_PORT_REDS_MONITOR);
+ return ERTS_PORT_OP_DONE;
+ case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
+ return ERTS_PORT_OP_BADARG;
+ default:
+ break; /* Schedule call instead... */
+ }
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_MONITOR;
+ sigdp->u.monitor.origin = origin->common.id;
+ sigdp->u.monitor.name = name; /* either named monitor, or port id */
+
+ /* Ref contents will be initialized here */
+ return erts_schedule_proc2port_signal(origin, port, origin->common.id,
+ refp, sigdp, 0, NULL,
+ port_sig_monitor);
+}
+
+static void
+port_demonitor_failure(Eterm port_id, Eterm origin, Eterm ref)
+{
+ Process *origin_p;
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
+ ErtsMonitor *mon1;
+ ASSERT(is_internal_pid(origin));
+
+ origin_p = erts_pid2proc(NULL, 0, origin, rp_locks);
+ if (! origin_p) { return; }
+
+ /* do not send any DOWN messages, drop monitors on process */
+ mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), ref);
+ if (mon1 != NULL) {
+ erts_destroy_monitor(mon1);
+ }
+
+ erts_smp_proc_unlock(origin_p, rp_locks);
+}
+
+/* Origin wants to demonitor port Prt. State contains possible error, which has
+ * happened just before. Ref is reference to monitor */
+static void
+port_demonitor(Port *port, erts_aint32_t state, Eterm origin, Eterm ref)
+{
+ ASSERT(port);
+ ASSERT(is_pid(origin));
+ ASSERT(is_internal_ref(ref));
+
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+ Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (origin_p) {
+ ErtsMonitor *mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p),
+ ref);
+ if (mon1 != NULL) {
+ erts_destroy_monitor(mon1);
+ }
+ }
+ if (1) {
+ ErtsMonitor *mon2 = erts_remove_monitor(&ERTS_P_MONITORS(port),
+ ref);
+ if (mon2 != NULL) {
+ erts_destroy_monitor(mon2);
+ }
+ }
+ if (origin_p) { /* when origin is dying, it won't be found */
+ erts_smp_proc_unlock(origin_p, p_locks);
+ }
+ } else {
+ port_demonitor_failure(port->common.id, origin, ref);
+ }
+}
+
+static int
+port_sig_demonitor(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp)
+{
+ Eterm hp[REF_THING_SIZE];
+ Eterm ref = make_internal_ref(&hp);
+ write_ref_thing(hp, sigdp->u.demonitor.ref[0],
+ sigdp->u.demonitor.ref[1],
+ sigdp->u.demonitor.ref[2]);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_demonitor(prt, state, sigdp->u.demonitor.origin, ref);
+ } else {
+ port_demonitor_failure(sigdp->u.demonitor.name,
+ sigdp->u.demonitor.origin,
+ ref);
+ }
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+ return ERTS_PORT_REDS_DEMONITOR;
+}
+
+/* Removes monitor between origin and target, identified by ref.
+ * Mode defines normal or relaxed demonitor rules (process is at death) */
+ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode,
+ Port *target, Eterm ref,
+ Eterm *trap_ref)
+{
+ Process *c_p = mode == ERTS_PORT_DEMONITOR_NORMAL ? origin : NULL;
+ ErtsProc2PortSigData *sigdp;
+ ErtsTryImmDrvCallState try_call_state
+ = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
+ c_p,
+ target, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ 0,
+ !trap_ref,
+ am_demonitor);
+
+ ASSERT(origin);
+ ASSERT(target);
+ ASSERT(is_internal_ref(ref));
+
+ switch (try_imm_drv_call(&try_call_state)) {
+ case ERTS_TRY_IMM_DRV_CALL_OK:
+ port_demonitor(target, try_call_state.state, origin->common.id, ref);
+ finalize_imm_drv_call(&try_call_state);
+ if (mode == ERTS_PORT_DEMONITOR_NORMAL) {
+ BUMP_REDS(origin, ERTS_PORT_REDS_DEMONITOR);
+ }
+ return ERTS_PORT_OP_DONE;
+ case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
+ return ERTS_PORT_OP_BADARG;
+ default:
+ break; /* Schedule call instead... */
+ }
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_DEMONITOR;
+ sigdp->u.demonitor.origin = origin->common.id;
+ sigdp->u.demonitor.name = target->common.id;
+ {
+ RefThing *reft = ref_thing_ptr(ref);
+ /* Start from 1 skip ref arity */
+ sys_memcpy(sigdp->u.demonitor.ref,
+ internal_thing_ref_numbers(reft),
+ sizeof(sigdp->u.demonitor.ref));
+ }
+
+ /* Ref contents will be initialized here */
+ return erts_schedule_proc2port_signal(c_p, target, origin->common.id,
+ trap_ref, sigdp, 0, NULL,
+ port_sig_demonitor);
+}
+
+static void
init_ack_send_reply(Port *port, Eterm resp)
{
@@ -3942,23 +4186,30 @@ erts_terminate_port(Port *pp)
terminate_port(pp);
}
+static void port_fire_one_monitor(ErtsMonitor *mon, void *ctx0);
static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
{
- ErtsMonitor *rmon;
- Process *rp;
+ switch (mon->type) {
+ case MON_ORIGIN: {
+ ErtsMonitor *rmon;
+ Process *rp;
- ASSERT(mon->type == MON_ORIGIN);
- ASSERT(is_internal_pid(mon->pid));
- rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
- if (!rp) {
- goto done;
- }
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- if (rmon == NULL) {
- goto done;
+ ASSERT(is_internal_pid(mon->pid));
+ rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
+ if (!rp) {
+ goto done;
+ }
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (rmon == NULL) {
+ goto done;
+ }
+ erts_destroy_monitor(rmon);
+ } break;
+ case MON_TARGET: {
+ port_fire_one_monitor(mon, vpsc); /* forward call */
+ } break;
}
- erts_destroy_monitor(rmon);
done:
erts_destroy_monitor(mon);
}
@@ -4039,6 +4290,43 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
erts_destroy_link(lnk);
}
+static void
+port_fire_one_monitor(ErtsMonitor *mon, void *ctx0)
+{
+ Process *origin;
+ ErtsProcLocks origin_locks;
+
+ if (mon->type != MON_TARGET || ! is_pid(mon->pid)) {
+ return;
+ }
+ /*
+ * Proceed here if someone monitors us, we (port) are the target and
+ * origin is some process
+ */
+ origin_locks = ERTS_PROC_LOCKS_MSG_SEND | ERTS_PROC_LOCK_LINK;
+
+ origin = erts_pid2proc(NULL, 0, mon->pid, origin_locks);
+ if (origin) {
+ DeclareTmpHeapNoproc(lhp,3);
+ SweepContext *ctx = (SweepContext *)ctx0;
+ ErtsMonitor *rmon;
+ Eterm watched = (is_atom(mon->name)
+ ? TUPLE2(lhp, mon->name, erts_this_dist_entry->sysname)
+ : ctx->port->common.id);
+
+ erts_queue_monitor_message(origin, &origin_locks, mon->ref, am_port,
+ watched, ctx->reason);
+ UnUseTmpHeapNoproc(3);
+
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(origin), mon->ref);
+ erts_smp_proc_unlock(origin, origin_locks);
+
+ if (rmon) {
+ erts_destroy_monitor(rmon);
+ }
+ }
+}
+
/* 'from' is sending 'this_port' an exit signal, (this_port must be internal).
* If reason is normal we don't do anything, *unless* from is our connected
* process in which case we close the port. Any other reason kills the port.
@@ -4050,39 +4338,40 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
*/
int
-erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
+erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
int drop_normal)
{
ErtsLink *lnk;
- Eterm rreason;
+ Eterm modified_reason;
erts_aint32_t state, set_state_flags;
ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- rreason = (reason == am_kill) ? am_killed : reason;
+ modified_reason = (reason == am_kill) ? am_killed : reason;
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_exit)) {
DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
- DTRACE_CHARBUF(rreason_str, 64);
+ DTRACE_CHARBUF(reason_str, 64);
erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from);
- dtrace_port_str(p, port_str);
- erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason);
- DTRACE4(port_exit, from_str, port_str, p->name, rreason_str);
+ dtrace_port_str(prt, port_str);
+ erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T",
+ modified_reason);
+ DTRACE4(port_exit, from_str, port_str, prt->name, reason_str);
}
#endif
- state = erts_atomic32_read_nob(&p->state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & (ERTS_PORT_SFLGS_DEAD
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING))
return 0;
- if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p)
- && from != p->common.id && drop_normal) {
+ if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(prt)
+ && from != prt->common.id && drop_normal) {
return 0;
}
@@ -4090,53 +4379,54 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
if (send_closed)
set_state_flags |= ERTS_PORT_SFLG_SEND_CLOSED;
- erts_port_task_sched_enter_exiting_state(&p->sched);
+ erts_port_task_sched_enter_exiting_state(&prt->sched);
- state = erts_atomic32_read_bor_mb(&p->state, set_state_flags);
+ state = erts_atomic32_read_bor_mb(&prt->state, set_state_flags);
state |= set_state_flags;
- if (IS_TRACED_FL(p, F_TRACE_PORTS))
- trace_port(p, am_closed, reason);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_closed, reason);
- erts_trace_check_exiting(p->common.id);
+ erts_trace_check_exiting(prt->common.id);
- set_busy_port(ERTS_Port2ErlDrvPort(p), 0);
+ set_busy_port(ERTS_Port2ErlDrvPort(prt), 0);
- if (p->common.u.alive.reg != NULL)
- (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
+ if (prt->common.u.alive.reg != NULL)
+ (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name);
{
- SweepContext sc = {p, rreason};
- lnk = ERTS_P_LINKS(p);
- ERTS_P_LINKS(p) = NULL;
+ SweepContext sc = {prt, modified_reason};
+ lnk = ERTS_P_LINKS(prt);
+ ERTS_P_LINKS(prt) = NULL;
erts_sweep_links(lnk, &sweep_one_link, &sc);
}
- DRV_MONITOR_LOCK_PDL(p);
+ DRV_MONITOR_LOCK_PDL(prt);
{
- ErtsMonitor *moni = ERTS_P_MONITORS(p);
- ERTS_P_MONITORS(p) = NULL;
- erts_sweep_monitors(moni, &sweep_one_monitor, NULL);
+ SweepContext ctx = {prt, modified_reason};
+ ErtsMonitor *moni = ERTS_P_MONITORS(prt);
+ ERTS_P_MONITORS(prt) = NULL;
+ erts_sweep_monitors(moni, &sweep_one_monitor, &ctx);
}
- DRV_MONITOR_UNLOCK_PDL(p);
+ DRV_MONITOR_UNLOCK_PDL(prt);
- if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
- erts_do_net_exits(p->dist_entry, rreason);
- erts_deref_dist_entry(p->dist_entry);
- p->dist_entry = NULL;
- erts_atomic32_read_band_relb(&p->state,
+ if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) {
+ erts_do_net_exits(prt->dist_entry, modified_reason);
+ erts_deref_dist_entry(prt->dist_entry);
+ prt->dist_entry = NULL;
+ erts_atomic32_read_band_relb(&prt->state,
~ERTS_PORT_SFLG_DISTRIBUTION);
}
- if ((reason != am_kill) && !is_port_ioq_empty(p)) {
+ if ((reason != am_kill) && !is_port_ioq_empty(prt)) {
/* must turn exiting flag off */
- erts_atomic32_read_bset_relb(&p->state,
+ erts_atomic32_read_bset_relb(&prt->state,
(ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING),
ERTS_PORT_SFLG_CLOSING);
- flush_port(p);
+ flush_port(prt);
}
else {
- terminate_port(p);
+ terminate_port(prt);
}
return 1;
diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c
index 77f79fcea4..ac7096745e 100644
--- a/erts/emulator/beam/register.c
+++ b/erts/emulator/beam/register.c
@@ -323,7 +323,8 @@ erts_whereis_name(Process *c_p,
Process** proc,
ErtsProcLocks need_locks,
int flags,
- Port** port)
+ Port** port,
+ int lock_port)
{
RegProc* rp = NULL;
HashValue hval;
@@ -406,31 +407,33 @@ erts_whereis_name(Process *c_p,
*port = NULL;
else {
#ifdef ERTS_SMP
- if (pending_port == rp->pt)
- pending_port = NULL;
- else {
- if (pending_port) {
- /* Ahh! Registered port changed while reg lock
- was unlocked... */
- erts_port_release(pending_port);
- pending_port = NULL;
- }
+ if (lock_port) {
+ if (pending_port == rp->pt)
+ pending_port = NULL;
+ else {
+ if (pending_port) {
+ /* Ahh! Registered port changed while reg lock
+ was unlocked... */
+ erts_port_release(pending_port);
+ pending_port = NULL;
+ }
- if (erts_smp_port_trylock(rp->pt) == EBUSY) {
- Eterm id = rp->pt->common.id; /* id read only... */
- /* Unlock all locks, acquire port lock, and restart... */
- if (current_c_p_locks) {
- erts_smp_proc_unlock(c_p, current_c_p_locks);
- current_c_p_locks = 0;
- }
- reg_read_unlock();
- pending_port = erts_id2port(id);
- goto restart;
- }
- }
+ if (erts_smp_port_trylock(rp->pt) == EBUSY) {
+ Eterm id = rp->pt->common.id; /* id read only... */
+ /* Unlock all locks, acquire port lock, and restart... */
+ if (current_c_p_locks) {
+ erts_smp_proc_unlock(c_p, current_c_p_locks);
+ current_c_p_locks = 0;
+ }
+ reg_read_unlock();
+ pending_port = erts_id2port(id);
+ goto restart;
+ }
+ }
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(rp->pt));
+ }
#endif
*port = rp->pt;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(*port));
}
}
@@ -452,7 +455,7 @@ erts_whereis_process(Process *c_p,
int flags)
{
Process *proc;
- erts_whereis_name(c_p, c_p_locks, name, &proc, need_locks, flags, NULL);
+ erts_whereis_name(c_p, c_p_locks, name, &proc, need_locks, flags, NULL, 0);
return proc;
}
diff --git a/erts/emulator/beam/register.h b/erts/emulator/beam/register.h
index 88ab7b7bf1..d839f55d6b 100644
--- a/erts/emulator/beam/register.h
+++ b/erts/emulator/beam/register.h
@@ -49,7 +49,7 @@ int erts_register_name(Process *, Eterm, Eterm);
Eterm erts_whereis_name_to_id(Process *, Eterm);
void erts_whereis_name(Process *, ErtsProcLocks,
Eterm, Process**, ErtsProcLocks, int,
- Port**);
+ Port**, int);
Process *erts_whereis_process(Process *,
ErtsProcLocks,
Eterm,