aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
authorDmytro Lytovchenko <[email protected]>2016-05-25 14:37:03 +0200
committerLukas Larsson <[email protected]>2016-06-10 09:46:33 +0200
commit3ee5343415d6ae0ce1ff1c2a2555051431a9315e (patch)
tree50f0acc46c6f30bf9ce58b9851ca1a2cd92f1772 /erts/emulator/beam/io.c
parentd26c15e07229c90ba8353bd78d5406ada0f13271 (diff)
downloadotp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.tar.gz
otp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.tar.bz2
otp-3ee5343415d6ae0ce1ff1c2a2555051431a9315e.zip
erts: Add port monitors
* erlang:monitor/2 with port argument is added, erlang:demonitor, using port task API and avoiding locking; * port_info and process_info support for monitored ports (with named port monitors support); * Exit signals contain type 'process' or 'port'; * Propagation of port exit signals; * Self-cleaning when origin process dies with monitor on; * 8 test cases + testcase for port driver crashing; * Documentation for all of the above (monitor, demonitor, port_info and process_info) updated
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c390
1 files changed, 340 insertions, 50 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 01df5476db..cb8792dffa 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1260,7 +1260,7 @@ typedef struct {
/*
* Try doing an immediate driver callback call from a process. If
* this fail, the operation should be scheduled in the normal case...
- *
+ * Returns: ok to do the call, or error (lock busy, does not exist, etc)
*/
static ERTS_INLINE ErtsTryImmDrvCallResult
try_imm_drv_call(ErtsTryImmDrvCallState *sp)
@@ -3074,6 +3074,250 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
static void
+port_monitor_failure(Eterm port_id, Eterm origin, Eterm ref_DOWN)
+{
+ Process *origin_p;
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+ ASSERT(is_internal_pid(origin));
+
+ origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (! origin_p) { return; }
+
+ /* Send the DOWN message immediately. Ref is made on the fly because
+ * caller has never seen it yet. */
+ erts_queue_monitor_message(origin_p, &p_locks, ref_DOWN,
+ am_port, port_id, am_noproc);
+ erts_smp_proc_unlock(origin_p, p_locks);
+}
+
+/* Origin wants to monitor port Prt. State contains possible error, which has
+ * happened just before. Name is either NIL or an atom, if user monitors
+ * a port by name. Ref is premade reference that will be returned to user */
+static void
+port_monitor(Port *prt, erts_aint32_t state, Eterm origin,
+ Eterm name, Eterm ref)
+{
+ Eterm name_or_nil = is_atom(name) ? name : NIL;
+
+ ASSERT(is_pid(origin));
+ ASSERT(is_atom(name) || is_port(name) || name == NIL);
+ ASSERT(is_internal_ref(ref));
+
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+
+ Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (! origin_p) {
+ goto failure;
+ }
+ erts_add_monitor(&ERTS_P_MONITORS(origin_p), MON_ORIGIN, ref,
+ prt->common.id, name_or_nil);
+ erts_add_monitor(&ERTS_P_MONITORS(prt), MON_TARGET, ref,
+ origin, name_or_nil);
+
+ erts_smp_proc_unlock(origin_p, p_locks);
+ } else {
+failure:
+ port_monitor_failure(prt->common.id, origin, ref);
+ }
+}
+
+static int
+port_sig_monitor(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp)
+{
+ Eterm hp[REF_THING_SIZE];
+ Eterm ref = make_internal_ref(&hp);
+ write_ref_thing(hp, sigdp->ref[0], sigdp->ref[1], sigdp->ref[2]);
+
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ /* erts_add_monitor call inside port_monitor will copy ref from hp */
+ port_monitor(prt, state,
+ sigdp->u.monitor.origin,
+ sigdp->u.monitor.name,
+ ref);
+ } else {
+ port_monitor_failure(sigdp->u.monitor.name,
+ sigdp->u.monitor.origin,
+ ref);
+ }
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+ return ERTS_PORT_REDS_MONITOR;
+}
+
+/* Creates monitor between Origin and Target. Ref must be initialized to
+ * a reference (ref may be rewritten to be used to serve additionally as a
+ * signal id). Name is atom if user monitors port by name or NIL */
+ErtsPortOpResult
+erts_port_monitor(Process *origin, Port *port, Eterm name, Eterm *refp)
+{
+ ErtsProc2PortSigData *sigdp;
+ ErtsTryImmDrvCallState try_call_state
+ = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
+ origin, port, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ 0,
+ 0, /* trap_ref is always set so !trap_ref always is false */
+ am_monitor);
+
+ ASSERT(origin);
+ ASSERT(port);
+ ASSERT(is_atom(name) || is_port(name));
+ ASSERT(refp);
+
+ switch (try_imm_drv_call(&try_call_state)) {
+ case ERTS_TRY_IMM_DRV_CALL_OK:
+ port_monitor(port, try_call_state.state, origin->common.id, name, *refp);
+ finalize_imm_drv_call(&try_call_state);
+ BUMP_REDS(origin, ERTS_PORT_REDS_MONITOR);
+ return ERTS_PORT_OP_DONE;
+ case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
+ return ERTS_PORT_OP_BADARG;
+ default:
+ break; /* Schedule call instead... */
+ }
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_MONITOR;
+ sigdp->u.monitor.origin = origin->common.id;
+ sigdp->u.monitor.name = name; /* either named monitor, or port id */
+
+ /* Ref contents will be initialized here */
+ return erts_schedule_proc2port_signal(origin, port, origin->common.id,
+ refp, sigdp, 0, NULL,
+ port_sig_monitor);
+}
+
+static void
+port_demonitor_failure(Eterm port_id, Eterm origin, Eterm ref)
+{
+ Process *origin_p;
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
+ ErtsMonitor *mon1;
+ ASSERT(is_internal_pid(origin));
+
+ origin_p = erts_pid2proc(NULL, 0, origin, rp_locks);
+ if (! origin_p) { return; }
+
+ /* do not send any DOWN messages, drop monitors on process */
+ mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), ref);
+ if (mon1 != NULL) {
+ erts_destroy_monitor(mon1);
+ }
+
+ erts_smp_proc_unlock(origin_p, rp_locks);
+}
+
+/* Origin wants to demonitor port Prt. State contains possible error, which has
+ * happened just before. Ref is reference to monitor */
+static void
+port_demonitor(Port *port, erts_aint32_t state, Eterm origin, Eterm ref)
+{
+ ASSERT(port);
+ ASSERT(is_pid(origin));
+ ASSERT(is_internal_ref(ref));
+
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
+ ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
+ Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
+ if (origin_p) {
+ ErtsMonitor *mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p),
+ ref);
+ if (mon1 != NULL) {
+ erts_destroy_monitor(mon1);
+ }
+ }
+ if (1) {
+ ErtsMonitor *mon2 = erts_remove_monitor(&ERTS_P_MONITORS(port),
+ ref);
+ if (mon2 != NULL) {
+ erts_destroy_monitor(mon2);
+ }
+ }
+ if (origin_p) { /* when origin is dying, it won't be found */
+ erts_smp_proc_unlock(origin_p, p_locks);
+ }
+ } else {
+ port_demonitor_failure(port->common.id, origin, ref);
+ }
+}
+
+static int
+port_sig_demonitor(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp)
+{
+ Eterm hp[REF_THING_SIZE];
+ Eterm ref = make_internal_ref(&hp);
+ write_ref_thing(hp, sigdp->u.demonitor.ref[0],
+ sigdp->u.demonitor.ref[1],
+ sigdp->u.demonitor.ref[2]);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_demonitor(prt, state, sigdp->u.demonitor.origin, ref);
+ } else {
+ port_demonitor_failure(sigdp->u.demonitor.name,
+ sigdp->u.demonitor.origin,
+ ref);
+ }
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+ return ERTS_PORT_REDS_DEMONITOR;
+}
+
+/* Removes monitor between origin and target, identified by ref.
+ * Mode defines normal or relaxed demonitor rules (process is at death) */
+ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode,
+ Port *target, Eterm ref,
+ Eterm *trap_ref)
+{
+ Process *c_p = mode == ERTS_PORT_DEMONITOR_NORMAL ? origin : NULL;
+ ErtsProc2PortSigData *sigdp;
+ ErtsTryImmDrvCallState try_call_state
+ = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
+ c_p,
+ target, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ 0,
+ !trap_ref,
+ am_demonitor);
+
+ ASSERT(origin);
+ ASSERT(target);
+ ASSERT(is_internal_ref(ref));
+
+ switch (try_imm_drv_call(&try_call_state)) {
+ case ERTS_TRY_IMM_DRV_CALL_OK:
+ port_demonitor(target, try_call_state.state, origin->common.id, ref);
+ finalize_imm_drv_call(&try_call_state);
+ if (mode == ERTS_PORT_DEMONITOR_NORMAL) {
+ BUMP_REDS(origin, ERTS_PORT_REDS_DEMONITOR);
+ }
+ return ERTS_PORT_OP_DONE;
+ case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
+ return ERTS_PORT_OP_BADARG;
+ default:
+ break; /* Schedule call instead... */
+ }
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_DEMONITOR;
+ sigdp->u.demonitor.origin = origin->common.id;
+ sigdp->u.demonitor.name = target->common.id;
+ {
+ RefThing *reft = ref_thing_ptr(ref);
+ /* Start from 1 skip ref arity */
+ sys_memcpy(sigdp->u.demonitor.ref,
+ internal_thing_ref_numbers(reft),
+ sizeof(sigdp->u.demonitor.ref));
+ }
+
+ /* Ref contents will be initialized here */
+ return erts_schedule_proc2port_signal(c_p, target, origin->common.id,
+ trap_ref, sigdp, 0, NULL,
+ port_sig_demonitor);
+}
+
+static void
init_ack_send_reply(Port *port, Eterm resp)
{
@@ -3942,23 +4186,30 @@ erts_terminate_port(Port *pp)
terminate_port(pp);
}
+static void port_fire_one_monitor(ErtsMonitor *mon, void *ctx0);
static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
{
- ErtsMonitor *rmon;
- Process *rp;
+ switch (mon->type) {
+ case MON_ORIGIN: {
+ ErtsMonitor *rmon;
+ Process *rp;
- ASSERT(mon->type == MON_ORIGIN);
- ASSERT(is_internal_pid(mon->pid));
- rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
- if (!rp) {
- goto done;
- }
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- if (rmon == NULL) {
- goto done;
+ ASSERT(is_internal_pid(mon->pid));
+ rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
+ if (!rp) {
+ goto done;
+ }
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (rmon == NULL) {
+ goto done;
+ }
+ erts_destroy_monitor(rmon);
+ } break;
+ case MON_TARGET: {
+ port_fire_one_monitor(mon, vpsc); /* forward call */
+ } break;
}
- erts_destroy_monitor(rmon);
done:
erts_destroy_monitor(mon);
}
@@ -4039,6 +4290,43 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
erts_destroy_link(lnk);
}
+static void
+port_fire_one_monitor(ErtsMonitor *mon, void *ctx0)
+{
+ Process *origin;
+ ErtsProcLocks origin_locks;
+
+ if (mon->type != MON_TARGET || ! is_pid(mon->pid)) {
+ return;
+ }
+ /*
+ * Proceed here if someone monitors us, we (port) are the target and
+ * origin is some process
+ */
+ origin_locks = ERTS_PROC_LOCKS_MSG_SEND | ERTS_PROC_LOCK_LINK;
+
+ origin = erts_pid2proc(NULL, 0, mon->pid, origin_locks);
+ if (origin) {
+ DeclareTmpHeapNoproc(lhp,3);
+ SweepContext *ctx = (SweepContext *)ctx0;
+ ErtsMonitor *rmon;
+ Eterm watched = (is_atom(mon->name)
+ ? TUPLE2(lhp, mon->name, erts_this_dist_entry->sysname)
+ : ctx->port->common.id);
+
+ erts_queue_monitor_message(origin, &origin_locks, mon->ref, am_port,
+ watched, ctx->reason);
+ UnUseTmpHeapNoproc(3);
+
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(origin), mon->ref);
+ erts_smp_proc_unlock(origin, origin_locks);
+
+ if (rmon) {
+ erts_destroy_monitor(rmon);
+ }
+ }
+}
+
/* 'from' is sending 'this_port' an exit signal, (this_port must be internal).
* If reason is normal we don't do anything, *unless* from is our connected
* process in which case we close the port. Any other reason kills the port.
@@ -4050,39 +4338,40 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
*/
int
-erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
+erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
int drop_normal)
{
ErtsLink *lnk;
- Eterm rreason;
+ Eterm modified_reason;
erts_aint32_t state, set_state_flags;
ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- rreason = (reason == am_kill) ? am_killed : reason;
+ modified_reason = (reason == am_kill) ? am_killed : reason;
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_exit)) {
DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
- DTRACE_CHARBUF(rreason_str, 64);
+ DTRACE_CHARBUF(reason_str, 64);
erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from);
- dtrace_port_str(p, port_str);
- erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason);
- DTRACE4(port_exit, from_str, port_str, p->name, rreason_str);
+ dtrace_port_str(prt, port_str);
+ erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T",
+ modified_reason);
+ DTRACE4(port_exit, from_str, port_str, prt->name, reason_str);
}
#endif
- state = erts_atomic32_read_nob(&p->state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & (ERTS_PORT_SFLGS_DEAD
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING))
return 0;
- if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p)
- && from != p->common.id && drop_normal) {
+ if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(prt)
+ && from != prt->common.id && drop_normal) {
return 0;
}
@@ -4090,53 +4379,54 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
if (send_closed)
set_state_flags |= ERTS_PORT_SFLG_SEND_CLOSED;
- erts_port_task_sched_enter_exiting_state(&p->sched);
+ erts_port_task_sched_enter_exiting_state(&prt->sched);
- state = erts_atomic32_read_bor_mb(&p->state, set_state_flags);
+ state = erts_atomic32_read_bor_mb(&prt->state, set_state_flags);
state |= set_state_flags;
- if (IS_TRACED_FL(p, F_TRACE_PORTS))
- trace_port(p, am_closed, reason);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_closed, reason);
- erts_trace_check_exiting(p->common.id);
+ erts_trace_check_exiting(prt->common.id);
- set_busy_port(ERTS_Port2ErlDrvPort(p), 0);
+ set_busy_port(ERTS_Port2ErlDrvPort(prt), 0);
- if (p->common.u.alive.reg != NULL)
- (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
+ if (prt->common.u.alive.reg != NULL)
+ (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name);
{
- SweepContext sc = {p, rreason};
- lnk = ERTS_P_LINKS(p);
- ERTS_P_LINKS(p) = NULL;
+ SweepContext sc = {prt, modified_reason};
+ lnk = ERTS_P_LINKS(prt);
+ ERTS_P_LINKS(prt) = NULL;
erts_sweep_links(lnk, &sweep_one_link, &sc);
}
- DRV_MONITOR_LOCK_PDL(p);
+ DRV_MONITOR_LOCK_PDL(prt);
{
- ErtsMonitor *moni = ERTS_P_MONITORS(p);
- ERTS_P_MONITORS(p) = NULL;
- erts_sweep_monitors(moni, &sweep_one_monitor, NULL);
+ SweepContext ctx = {prt, modified_reason};
+ ErtsMonitor *moni = ERTS_P_MONITORS(prt);
+ ERTS_P_MONITORS(prt) = NULL;
+ erts_sweep_monitors(moni, &sweep_one_monitor, &ctx);
}
- DRV_MONITOR_UNLOCK_PDL(p);
+ DRV_MONITOR_UNLOCK_PDL(prt);
- if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
- erts_do_net_exits(p->dist_entry, rreason);
- erts_deref_dist_entry(p->dist_entry);
- p->dist_entry = NULL;
- erts_atomic32_read_band_relb(&p->state,
+ if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) {
+ erts_do_net_exits(prt->dist_entry, modified_reason);
+ erts_deref_dist_entry(prt->dist_entry);
+ prt->dist_entry = NULL;
+ erts_atomic32_read_band_relb(&prt->state,
~ERTS_PORT_SFLG_DISTRIBUTION);
}
- if ((reason != am_kill) && !is_port_ioq_empty(p)) {
+ if ((reason != am_kill) && !is_port_ioq_empty(prt)) {
/* must turn exiting flag off */
- erts_atomic32_read_bset_relb(&p->state,
+ erts_atomic32_read_bset_relb(&prt->state,
(ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING),
ERTS_PORT_SFLG_CLOSING);
- flush_port(p);
+ flush_port(prt);
}
else {
- terminate_port(p);
+ terminate_port(prt);
}
return 1;