diff options
author | Rickard Green <[email protected]> | 2018-03-07 01:17:21 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2018-03-21 10:27:03 +0100 |
commit | 4bc282d812cc2c49aa3e2d073e96c720f16aa270 (patch) | |
tree | a7b00cd079368590dc09f62a4d5402be157462ca /erts/emulator/beam/io.c | |
parent | 348a4e057db36fac13f1551c0a1c17f0d376da48 (diff) | |
download | otp-4bc282d812cc2c49aa3e2d073e96c720f16aa270.tar.gz otp-4bc282d812cc2c49aa3e2d073e96c720f16aa270.tar.bz2 otp-4bc282d812cc2c49aa3e2d073e96c720f16aa270.zip |
Implementation of true asynchronous signaling between processes
Communication between Erlang processes has conceptually always been
performed through asynchronous signaling. The runtime system
implementation has however previously preformed most operation
synchronously. In a system with only one true thread of execution, this
is not problematic (often the opposite). In a system with multiple threads
of execution (as current runtime system implementation with SMP support)
it becomes problematic. This since it often involves locking of structures
when updating them which in turn cause resource contention. Utilizing
true asynchronous communication often avoids these resource contention
issues.
The case that triggered this change was contention on the link lock due
to frequent updates of the monitor trees during communication with a
frequently used server. The signal order delivery guarantees of the
language makes it hard to change the implementation of only some signals
to use true asynchronous signaling. Therefore the implementations
of (almost) all signals have been changed.
Currently the following signals have been implemented as true
asynchronous signals:
- Message signals
- Exit signals
- Monitor signals
- Demonitor signals
- Monitor triggered signals (DOWN, CHANGE, etc)
- Link signals
- Unlink signals
- Group leader signals
All of the above already defined as asynchronous signals in the
language. The implementation of messages signals was quite
asynchronous to begin with, but had quite strict delivery constraints
due to the ordering guarantees of signals between a pair of processes.
The previously used message queue partitioned into two halves has been
replaced by a more general signal queue partitioned into three parts
that service all kinds of signals. More details regarding the signal
queue can be found in comments in the erl_proc_sig_queue.h file.
The monitor and link implementations have also been completely replaced
in order to fit the new asynchronous signaling implementation as good
as possible. More details regarding the new monitor and link
implementations can be found in the erl_monitor_link.h file.
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 755 |
1 files changed, 245 insertions, 510 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index e4a5f2b6b6..b2afdc6bf2 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2017. All Rights Reserved. + * Copyright Ericsson AB 1996-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ #include "erl_hl_timer.h" #include "erl_time.h" #include "erl_io_queue.h" +#include "erl_proc_sig_queue.h" extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; @@ -366,6 +367,7 @@ static Port *create_port(char *name, prt->drv_ptr = driver; ERTS_P_LINKS(prt) = NULL; ERTS_P_MONITORS(prt) = NULL; + ERTS_P_LT_MONITORS(prt) = NULL; prt->linebuf = NULL; prt->suspended = NULL; erts_init_port_data(prt); @@ -748,8 +750,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ Port *creator_port; Port* port; erts_driver_t *driver; - Process *rp; erts_mtx_t *driver_lock = NULL; + ErtsLinkData *ldp; ERTS_CHK_NO_PROC_LOCKS; @@ -761,17 +763,13 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ if (creator_port == ERTS_INVALID_ERL_DRV_PORT) return ERTS_INVALID_ERL_DRV_PORT; - rp = erts_proc_lookup(pid); - if (!rp) - return ERTS_INVALID_ERL_DRV_PORT; - ERTS_LC_ASSERT(erts_lc_is_port_locked(creator_port)); driver = creator_port->drv_ptr; erts_rwmtx_rlock(&erts_driver_list_lock); if (!erts_ddll_driver_ok(driver->handle)) { erts_rwmtx_runlock(&erts_driver_list_lock); - return ERTS_INVALID_ERL_DRV_PORT; + return ERTS_INVALID_ERL_DRV_PORT; } if (driver->handle != NULL) { @@ -799,9 +797,15 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ } ERTS_LC_ASSERT(erts_lc_is_port_locked(port)); - erts_proc_lock(rp, ERTS_PROC_LOCK_LINK); - if (ERTS_PROC_IS_EXITING(rp)) { - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + ldp = erts_link_create(ERTS_LNK_TYPE_PORT, + port->common.id, pid); + ASSERT(ldp->a.other.item == pid); + ASSERT(ldp->b.other.item == port->common.id); + erts_link_tree_insert(&ERTS_P_LINKS(port), &ldp->a); + + if (!erts_proc_sig_send_link(NULL, pid, &ldp->b)) { + erts_link_tree_delete(&ERTS_P_LINKS(port), &ldp->a); + erts_link_release_both(ldp); if (driver->handle) { erts_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(driver->handle); @@ -812,10 +816,6 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ return ERTS_INVALID_ERL_DRV_PORT; } - erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); - erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - if (!driver_lock) { ErtsXPortsList *xplp = xports_list_alloc(); xplp->port = port; @@ -1167,21 +1167,10 @@ erts_schedule_proc2port_signal(Process *c_p, * otherwise, next receive will *not* work * as expected! */ - erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); - - if (ERTS_PROC_PENDING_EXIT(c_p)) { - /* need to exit caller instead */ - erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); - KILL_CATCHES(c_p); - c_p->freason = EXC_EXIT; - return ERTS_PORT_OP_CALLER_EXIT; - } - - ERTS_MSGQ_MV_INQ2PRIVQ(c_p); - c_p->msg.save = c_p->msg.last; - erts_proc_unlock(c_p, (ERTS_PROC_LOCKS_MSG_RECEIVE - | ERTS_PROC_LOCK_MAIN)); + ERTS_RECV_MARK_SAVE(c_p); + ERTS_RECV_MARK_SET(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); } @@ -1239,29 +1228,12 @@ erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp, static ERTS_INLINE void send_badsig(Port *prt) { - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - Process* rp; Eterm connected = ERTS_PORT_GET_CONNECTED(prt); ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); - ASSERT(is_internal_pid(connected)); - - rp = erts_proc_lookup_raw(connected); - if (rp) { - erts_proc_lock(rp, rp_locks); - if (!ERTS_PROC_IS_EXITING(rp)) - (void) erts_send_exit_signal(NULL, - prt->common.id, - rp, - &rp_locks, - am_badsig, - NIL, - NULL, - 0); - if (rp_locks) - erts_proc_unlock(rp, rp_locks); - } /* exit sent */ + erts_proc_sig_send_exit(NULL, prt->common.id, connected, + am_badsig, NIL, 0); } /* send_badsig */ static void @@ -2194,11 +2166,11 @@ call_deliver_port_exit(int bang_op, } if (broken_link) { - ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from); - if (lnk) - erts_destroy_link(lnk); - else + ErtsLink *lnk = erts_link_tree_lookup(ERTS_P_LINKS(prt), from); + if (!lnk) return ERTS_PORT_OP_DROPPED; + erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); + erts_link_release(lnk); } if (IS_TRACED_FL(prt, F_TRACE_RECEIVE)) @@ -2367,27 +2339,30 @@ set_port_connected(int bang_op, #endif } else { /* Port BIF operation */ - Process *rp = erts_proc_lookup_raw(connect); - if (!rp) - return ERTS_PORT_OP_DROPPED; - erts_proc_lock(rp, ERTS_PROC_LOCK_LINK); - if (ERTS_PROC_IS_EXITING(rp)) { - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - return ERTS_PORT_OP_DROPPED; - } - - erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, prt->common.id); - erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, connect); - - if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, 0, rp, am_getting_linked, prt->common.id); + int created; + ErtsLink *lnk; + + if (is_not_internal_pid(connect)) + return ERTS_PORT_OP_DROPPED; + + lnk = erts_link_tree_lookup_create(&ERTS_P_LINKS(prt), &created, + ERTS_LNK_TYPE_PORT, prt->common.id, + connect); + if (created) { + ErtsLinkData *ldp; + ErtsLink *olnk = erts_link_to_other(lnk, &ldp); + ASSERT(olnk->other.item == prt->common.id); + if (!erts_proc_sig_send_link(NULL, connect, olnk)) { + erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); + erts_link_release_both(ldp); + return ERTS_PORT_OP_DROPPED; + } + if (IS_TRACED_FL(prt, F_TRACE_PORTS)) + trace_port(prt, am_getting_linked, connect); + } ERTS_PORT_SET_CONNECTED(prt, connect); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - - if (IS_TRACED_FL(prt, F_TRACE_PORTS)) - trace_port(prt, am_getting_linked, connect); if (IS_TRACED_FL(prt, F_TRACE_RECEIVE)) trace_port_receive(prt, from, am_connect, connect); if (IS_TRACED_FL(prt, F_TRACE_SEND)) { @@ -2494,13 +2469,24 @@ erts_port_connect(Process *c_p, } static void -port_unlink(Port *prt, Eterm from) +port_unlink(Port *prt, ErtsLink *lnk) { - ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from); - if (lnk) { + ErtsLinkData *ldp; + ErtsLink *dlnk, *llnk; + + llnk = erts_link_to_other(lnk, &ldp); + dlnk = erts_link_tree_key_delete(&ERTS_P_LINKS(prt), llnk); + if (!dlnk) + erts_link_release(lnk); + else { if (IS_TRACED_FL(prt, F_TRACE_PORTS)) - trace_port(prt, am_getting_unlinked, from); - erts_destroy_link(lnk); + trace_port(prt, am_getting_unlinked, llnk->other.item); + if (dlnk == llnk) + erts_link_release_both(ldp); + else { + erts_link_release(lnk); + erts_link_release(dlnk); + } } } @@ -2508,14 +2494,14 @@ static int port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { if (op == ERTS_PROC2PORT_SIG_EXEC) - port_unlink(prt, sigdp->u.unlink.from); + port_unlink(prt, sigdp->u.unlink.lnk); if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); return ERTS_PORT_REDS_UNLINK; } ErtsPortOpResult -erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) +erts_port_unlink(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp) { ErtsProc2PortSigData *sigdp; ErtsTryImmDrvCallState try_call_state @@ -2528,7 +2514,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) switch (try_imm_drv_call(&try_call_state)) { case ERTS_TRY_IMM_DRV_CALL_OK: - port_unlink(prt, from); + port_unlink(prt, lnk); finalize_imm_drv_call(&try_call_state); BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK); return ERTS_PORT_OP_DONE; @@ -2541,11 +2527,12 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) sigdp = erts_port_task_alloc_p2p_sig_data(); sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK; - sigdp->u.unlink.from = from; - + sigdp->u.unlink.port_id = prt->common.id; + sigdp->u.unlink.lnk = lnk; + return erts_schedule_proc2port_signal(c_p, prt, - c_p ? c_p->common.id : from, + c_p->common.id, refp, sigdp, 0, @@ -2554,45 +2541,24 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) } static void -port_link_failure(Eterm port_id, Eterm linker) +port_link_failure(Eterm port_id, ErtsLink *lnk) { - Process *rp; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; - ASSERT(is_internal_pid(linker)); - rp = erts_pid2proc(NULL, 0, linker, rp_locks); - if (rp) { - ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id); - if (rlnk) { - int xres = erts_send_exit_signal(NULL, - port_id, - rp, - &rp_locks, - am_noproc, - NIL, - NULL, - 0); - if (xres >= 0) { - /* We didn't exit the process and it is traced */ - if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, 0, rp, am_getting_unlinked, port_id); - } - if (rp_locks) - erts_proc_unlock(rp, rp_locks); - } - } + erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL); } static void -port_link(Port *prt, erts_aint32_t state, Eterm to) +port_link(Port *prt, erts_aint32_t state, ErtsLink *lnk) { - if (IS_TRACED_FL(prt, F_TRACE_PORTS)) - trace_port(prt, am_getting_linked, to); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { - erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, to); - } else { - port_link_failure(prt->common.id, to); - if (IS_TRACED_FL(prt, F_TRACE_PORTS)) - trace_port(prt, am_unlink, to); + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + port_link_failure(prt->common.id, lnk); + else { + ErtsLink *rlnk; + rlnk = erts_link_tree_insert_addr_replace(&ERTS_P_LINKS(prt), + lnk); + if (rlnk) + erts_link_release(rlnk); + else if (IS_TRACED_FL(prt, F_TRACE_PORTS)) + trace_port(prt, am_getting_linked, lnk->other.item); } } @@ -2600,17 +2566,16 @@ static int port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { if (op == ERTS_PROC2PORT_SIG_EXEC) - port_link(prt, state, sigdp->u.link.to); - else { - port_link_failure(sigdp->u.link.port, sigdp->u.link.to); - } + port_link(prt, state, sigdp->u.link.lnk); + else + port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk); if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); return ERTS_PORT_REDS_LINK; } ErtsPortOpResult -erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) +erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp) { ErtsProc2PortSigData *sigdp; ErtsTryImmDrvCallState try_call_state @@ -2623,7 +2588,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) switch (try_imm_drv_call(&try_call_state)) { case ERTS_TRY_IMM_DRV_CALL_OK: - port_link(prt, try_call_state.state, to); + port_link(prt, try_call_state.state, lnk); finalize_imm_drv_call(&try_call_state); BUMP_REDS(c_p, ERTS_PORT_REDS_LINK); return ERTS_PORT_OP_DONE; @@ -2636,12 +2601,12 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) sigdp = erts_port_task_alloc_p2p_sig_data(); sigdp->flags = ERTS_P2P_SIG_TYPE_LINK; - sigdp->u.link.port = prt->common.id; - sigdp->u.link.to = to; + sigdp->u.link.port_id = prt->common.id; + sigdp->u.link.lnk = lnk; return erts_schedule_proc2port_signal(c_p, prt, - c_p ? c_p->common.id : to, + c_p->common.id, refp, sigdp, 0, @@ -2650,51 +2615,23 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) } static void -port_monitor_failure(Eterm port_id, Eterm origin, Eterm ref_DOWN) +port_monitor_failure(Eterm port_id, ErtsMonitor *mon) { - Process *origin_p; - ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; - ASSERT(is_internal_pid(origin)); - - origin_p = erts_pid2proc(NULL, 0, origin, p_locks); - if (! origin_p) { return; } - - /* Send the DOWN message immediately. Ref is made on the fly because - * caller has never seen it yet. */ - erts_queue_monitor_message(origin_p, &p_locks, ref_DOWN, - am_port, port_id, am_noproc); - erts_proc_unlock(origin_p, p_locks); + erts_proc_sig_send_monitor_down(mon, am_noproc); } /* Origin wants to monitor port Prt. State contains possible error, which has * happened just before. Name is either NIL or an atom, if user monitors * a port by name. Ref is premade reference that will be returned to user */ static void -port_monitor(Port *prt, erts_aint32_t state, Eterm origin, - Eterm name, Eterm ref) +port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon) { - Eterm name_or_nil = is_atom(name) ? name : NIL; - - ASSERT(is_pid(origin)); - ASSERT(is_atom(name) || is_port(name) || name == NIL); - ASSERT(is_internal_ordinary_ref(ref)); - - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { - ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; - - Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks); - if (! origin_p) { - goto failure; - } - erts_add_monitor(&ERTS_P_MONITORS(origin_p), MON_ORIGIN, ref, - prt->common.id, name_or_nil); - erts_add_monitor(&ERTS_P_MONITORS(prt), MON_TARGET, ref, - origin, name_or_nil); - - erts_proc_unlock(origin_p, p_locks); - } else { -failure: - port_monitor_failure(prt->common.id, origin, ref); + ASSERT(prt); + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + port_monitor_failure(prt->common.id, mon); + else { + ASSERT(erts_monitor_is_target(mon)); + erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon); } } @@ -2702,24 +2639,11 @@ static int port_sig_monitor(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - Eterm hp[ERTS_REF_THING_SIZE]; - Eterm ref = make_internal_ref(&hp); - write_ref_thing(hp, sigdp->ref[0], sigdp->ref[1], sigdp->ref[2]); - - if (op == ERTS_PROC2PORT_SIG_EXEC) { - /* erts_add_monitor call inside port_monitor will copy ref from hp */ - port_monitor(prt, state, - sigdp->u.monitor.origin, - sigdp->u.monitor.name, - ref); - } else { - port_monitor_failure(sigdp->u.monitor.name, - sigdp->u.monitor.origin, - ref); - } - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); - } + if (op == ERTS_PROC2PORT_SIG_EXEC) + port_monitor(prt, state, sigdp->u.monitor.mon); + else + port_monitor_failure(sigdp->u.monitor.port_id, + sigdp->u.monitor.mon); return ERTS_PORT_REDS_MONITOR; } @@ -2727,95 +2651,63 @@ port_sig_monitor(Port *prt, erts_aint32_t state, int op, * a reference (ref may be rewritten to be used to serve additionally as a * signal id). Name is atom if user monitors port by name or NIL */ ErtsPortOpResult -erts_port_monitor(Process *origin, Port *port, Eterm name, Eterm *refp) +erts_port_monitor(Process *c_p, Port *port, ErtsMonitor *mon) { ErtsProc2PortSigData *sigdp; ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( - origin, port, ERTS_PORT_SFLGS_INVALID_LOOKUP, + c_p, + port, + ERTS_PORT_SFLGS_INVALID_LOOKUP, 0, - 0, /* trap_ref is always set so !trap_ref always is false */ + !0, am_monitor); - ASSERT(origin); + ASSERT(c_p); ASSERT(port); - ASSERT(is_atom(name) || is_port(name)); - ASSERT(refp); + ASSERT(mon); switch (try_imm_drv_call(&try_call_state)) { case ERTS_TRY_IMM_DRV_CALL_OK: - port_monitor(port, try_call_state.state, origin->common.id, name, *refp); + port_monitor(port, try_call_state.state, mon); finalize_imm_drv_call(&try_call_state); - BUMP_REDS(origin, ERTS_PORT_REDS_MONITOR); + BUMP_REDS(c_p, ERTS_PORT_REDS_MONITOR); return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - return ERTS_PORT_OP_BADARG; + return ERTS_PORT_OP_DROPPED; default: break; /* Schedule call instead... */ } sigdp = erts_port_task_alloc_p2p_sig_data(); sigdp->flags = ERTS_P2P_SIG_TYPE_MONITOR; - sigdp->u.monitor.origin = origin->common.id; - sigdp->u.monitor.name = name; /* either named monitor, or port id */ + sigdp->u.monitor.port_id = port->common.id; + sigdp->u.monitor.mon = mon; /* Ref contents will be initialized here */ - return erts_schedule_proc2port_signal(origin, port, origin->common.id, - refp, sigdp, 0, NULL, + return erts_schedule_proc2port_signal(c_p, + port, + c_p->common.id, + NULL, + sigdp, + 0, + NULL, port_sig_monitor); } -static void -port_demonitor_failure(Eterm port_id, Eterm origin, Eterm ref) -{ - Process *origin_p; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - ErtsMonitor *mon1; - ASSERT(is_internal_pid(origin)); - - origin_p = erts_pid2proc(NULL, 0, origin, rp_locks); - if (! origin_p) { return; } - - /* do not send any DOWN messages, drop monitors on process */ - mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), ref); - if (mon1 != NULL) { - erts_destroy_monitor(mon1); - } - - erts_proc_unlock(origin_p, rp_locks); -} - /* Origin wants to demonitor port Prt. State contains possible error, which has * happened just before. Ref is reference to monitor */ static void -port_demonitor(Port *port, erts_aint32_t state, Eterm origin, Eterm ref) +port_demonitor(Port *port, erts_aint32_t state, ErtsMonitor *mon) { - ASSERT(port); - ASSERT(is_pid(origin)); - ASSERT(is_internal_ref(ref)); - - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { - ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK; - Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks); - if (origin_p) { - ErtsMonitor *mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), - ref); - if (mon1 != NULL) { - erts_destroy_monitor(mon1); - } - } - if (1) { - ErtsMonitor *mon2 = erts_remove_monitor(&ERTS_P_MONITORS(port), - ref); - if (mon2 != NULL) { - erts_destroy_monitor(mon2); - } - } - if (origin_p) { /* when origin is dying, it won't be found */ - erts_proc_unlock(origin_p, p_locks); - } - } else { - port_demonitor_failure(port->common.id, origin, ref); + ErtsMonitorData *mdp = erts_monitor_to_data(mon); + ASSERT(port && mon); + ASSERT(erts_monitor_is_origin(mon)); + if (!erts_monitor_is_in_table(&mdp->target)) + erts_monitor_release(mon); + else { + erts_monitor_list_delete(&ERTS_P_LT_MONITORS(port), &mdp->target); + erts_monitor_release_both(mdp); } } @@ -2823,73 +2715,47 @@ static int port_sig_demonitor(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - Eterm hp[ERTS_REF_THING_SIZE]; - Eterm ref = make_internal_ref(&hp); - write_ref_thing(hp, sigdp->u.demonitor.ref[0], - sigdp->u.demonitor.ref[1], - sigdp->u.demonitor.ref[2]); - if (op == ERTS_PROC2PORT_SIG_EXEC) { - port_demonitor(prt, state, sigdp->u.demonitor.origin, ref); - } else { - port_demonitor_failure(sigdp->u.demonitor.name, - sigdp->u.demonitor.origin, - ref); - } - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); - } + if (op == ERTS_PROC2PORT_SIG_EXEC) + port_demonitor(prt, state, sigdp->u.demonitor.mon); + else + erts_monitor_release(sigdp->u.demonitor.mon); return ERTS_PORT_REDS_DEMONITOR; } -/* Removes monitor between origin and target, identified by ref. - * Mode defines normal or relaxed demonitor rules (process is at death) */ -ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode, - Port *target, Eterm ref, - Eterm *trap_ref) +ErtsPortOpResult +erts_port_demonitor(Process *c_p, Port *prt, ErtsMonitor *mon) { - Process *c_p = mode == ERTS_PORT_DEMONITOR_NORMAL ? origin : NULL; ErtsProc2PortSigData *sigdp; ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( c_p, - target, ERTS_PORT_SFLGS_INVALID_LOOKUP, + prt, ERTS_PORT_SFLGS_INVALID_LOOKUP, 0, - !trap_ref, + !0, am_demonitor); - ASSERT(origin); - ASSERT(target); - ASSERT(is_internal_ref(ref)); + ASSERT(c_p && prt && mon); switch (try_imm_drv_call(&try_call_state)) { case ERTS_TRY_IMM_DRV_CALL_OK: - port_demonitor(target, try_call_state.state, origin->common.id, ref); + port_demonitor(prt, try_call_state.state, mon); finalize_imm_drv_call(&try_call_state); - if (mode == ERTS_PORT_DEMONITOR_NORMAL) { - BUMP_REDS(origin, ERTS_PORT_REDS_DEMONITOR); - } + BUMP_REDS(c_p, ERTS_PORT_REDS_DEMONITOR); return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - return ERTS_PORT_OP_BADARG; + return ERTS_PORT_OP_DROPPED; default: break; /* Schedule call instead... */ } sigdp = erts_port_task_alloc_p2p_sig_data(); sigdp->flags = ERTS_P2P_SIG_TYPE_DEMONITOR; - sigdp->u.demonitor.origin = origin->common.id; - sigdp->u.demonitor.name = target->common.id; - { - Uint32 *nums = internal_ref_numbers(ref); - /* Start from 1 skip ref arity */ - sys_memcpy(sigdp->u.demonitor.ref, - nums, - sizeof(sigdp->u.demonitor.ref)); - } + sigdp->u.demonitor.port_id = prt->common.id; + sigdp->u.demonitor.mon = mon; /* Ref contents will be initialized here */ - return erts_schedule_proc2port_signal(c_p, target, origin->common.id, - trap_ref, sigdp, 0, NULL, + return erts_schedule_proc2port_signal(c_p, prt, c_p->common.id, + NULL, sigdp, 0, NULL, port_sig_demonitor); } @@ -2898,11 +2764,13 @@ init_ack_send_reply(Port *port, Eterm resp) { if (!is_internal_port(resp)) { - Process *rp = erts_proc_lookup_raw(port->async_open_port->to); - erts_proc_lock(rp, ERTS_PROC_LOCK_LINK); - erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to); - erts_remove_link(&ERTS_P_LINKS(rp), port->common.id); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + Eterm proc = port->async_open_port->to; + ErtsLink *lnk = erts_link_tree_lookup(ERTS_P_LINKS(port), + proc); + if (lnk) { + erts_link_tree_delete(&ERTS_P_LINKS(port), lnk); + erts_proc_sig_send_unlink(NULL, lnk); + } } port_sched_op_reply(port->async_open_port->to, port->async_open_port->ref, @@ -3680,6 +3548,7 @@ terminate_port(Port *prt) ASSERT(!ERTS_P_LINKS(prt)); ASSERT(!ERTS_P_MONITORS(prt)); + ASSERT(!ERTS_P_LT_MONITORS(prt)); /* state may be altered by kill_port() below */ state = erts_atomic32_read_band_nob(&prt->state, @@ -3767,146 +3636,25 @@ erts_terminate_port(Port *pp) terminate_port(pp); } -static void port_fire_one_monitor(ErtsMonitor *mon, void *ctx0); -static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc) -{ - switch (mon->type) { - case MON_ORIGIN: { - ErtsMonitor *rmon; - Process *rp; - - ASSERT(is_internal_pid(mon->u.pid)); - rp = erts_pid2proc(NULL, 0, mon->u.pid, ERTS_PROC_LOCK_LINK); - if (!rp) { - goto done; - } - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - if (rmon == NULL) { - goto done; - } - erts_destroy_monitor(rmon); - } break; - case MON_TARGET: { - port_fire_one_monitor(mon, vpsc); /* forward call */ - } break; - } - done: - erts_destroy_monitor(mon); -} - - - typedef struct { - Port *port; + Eterm port_id; Eterm reason; -} SweepContext; +} ErtsPortExitContext; -static void sweep_one_link(ErtsLink *lnk, void *vpsc) +static void link_port_exit(ErtsLink *lnk, void *vpectxt) { - SweepContext *psc = vpsc; - DistEntry *dep; - Process *rp; - Eterm port_id = psc->port->common.id; - - ASSERT(lnk->type == LINK_PID); - - if (IS_TRACED_FL(psc->port, F_TRACE_PORTS)) - trace_port(psc->port, am_unlink, lnk->pid); - - if (is_external_pid(lnk->pid)) { - dep = external_pid_dist_entry(lnk->pid); - if(dep != erts_this_dist_entry) { - ErtsDistLinkData dld; - ErtsDSigData dsd; - int code; - code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); - switch (code) { - case ERTS_DSIG_PREP_NOT_ALIVE: - case ERTS_DSIG_PREP_NOT_CONNECTED: - break; - case ERTS_DSIG_PREP_PENDING: - case ERTS_DSIG_PREP_CONNECTED: - erts_remove_dist_link(&dld, port_id, lnk->pid, dep); - erts_destroy_dist_link(&dld); - code = erts_dsig_send_exit(&dsd, port_id, lnk->pid, - psc->reason); - ASSERT(code == ERTS_DSIG_SEND_OK); - break; - default: - ASSERT(! "Invalid dsig prepare result"); - break; - } - } - } else { - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; - ASSERT(is_internal_pid(lnk->pid)); - rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); - if (rp) { - ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id); - - if (rlnk) { - int xres = erts_send_exit_signal(NULL, - port_id, - rp, - &rp_locks, - psc->reason, - NIL, - NULL, - 0); - if (xres >= 0) { - if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { - erts_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); - rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; - } - /* We didn't exit the process and it is traced */ - if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, 0, rp, am_getting_unlinked, port_id); - } - erts_destroy_link(rlnk); - } - - erts_proc_unlock(rp, rp_locks); - } - } - erts_destroy_link(lnk); + ErtsPortExitContext *pectxt = vpectxt; + erts_proc_sig_send_link_exit(NULL, pectxt->port_id, + lnk, pectxt->reason, NIL); } -static void -port_fire_one_monitor(ErtsMonitor *mon, void *ctx0) +static void monitor_port_exit(ErtsMonitor *mon, void *vpectxt) { - Process *origin; - ErtsProcLocks origin_locks; - - if (mon->type != MON_TARGET || ! is_pid(mon->u.pid)) { - return; - } - /* - * Proceed here if someone monitors us, we (port) are the target and - * origin is some process - */ - origin_locks = ERTS_PROC_LOCKS_MSG_SEND | ERTS_PROC_LOCK_LINK; - - origin = erts_pid2proc(NULL, 0, mon->u.pid, origin_locks); - if (origin) { - DeclareTmpHeapNoproc(lhp,3); - SweepContext *ctx = (SweepContext *)ctx0; - ErtsMonitor *rmon; - Eterm watched = (is_atom(mon->name) - ? TUPLE2(lhp, mon->name, erts_this_dist_entry->sysname) - : ctx->port->common.id); - - erts_queue_monitor_message(origin, &origin_locks, mon->ref, am_port, - watched, ctx->reason); - UnUseTmpHeapNoproc(3); - - rmon = erts_remove_monitor(&ERTS_P_MONITORS(origin), mon->ref); - erts_proc_unlock(origin, origin_locks); - - if (rmon) { - erts_destroy_monitor(rmon); - } - } + ErtsPortExitContext *pectxt = vpectxt; + if (erts_monitor_is_target(mon)) + erts_proc_sig_send_monitor_down(mon, pectxt->reason); + else + erts_proc_sig_send_demonitor(mon); } /* 'from' is sending 'this_port' an exit signal, (this_port must be internal). @@ -3923,9 +3671,12 @@ int erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, int drop_normal) { - ErtsLink *lnk; + ErtsLink *links; + ErtsMonitor *monitors; + ErtsMonitor *lt_monitors; Eterm modified_reason; erts_aint32_t state, set_state_flags; + ErtsPortExitContext pectxt; ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -3973,23 +3724,36 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, set_busy_port(ERTS_Port2ErlDrvPort(prt), 0); + links = ERTS_P_LINKS(prt); + ERTS_P_LINKS(prt) = NULL; + monitors = ERTS_P_MONITORS(prt); + ERTS_P_MONITORS(prt) = NULL; + lt_monitors = ERTS_P_LT_MONITORS(prt); + ERTS_P_LT_MONITORS(prt) = NULL; + if (prt->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name); - { - SweepContext sc = {prt, modified_reason}; - lnk = ERTS_P_LINKS(prt); - ERTS_P_LINKS(prt) = NULL; - erts_sweep_links(lnk, &sweep_one_link, &sc); + pectxt.port_id = prt->common.id; + pectxt.reason = modified_reason; + + if (links) + erts_monitor_tree_foreach_delete(&links, + link_port_exit, + (void *) &pectxt); + + if (monitors || lt_monitors) { + DRV_MONITOR_LOCK_PDL(prt); + if (monitors) + erts_monitor_tree_foreach_delete(&monitors, + monitor_port_exit, + (void *) &pectxt); + if (lt_monitors) + erts_monitor_list_foreach_delete(<_monitors, + monitor_port_exit, + (void *) &pectxt); + DRV_MONITOR_UNLOCK_PDL(prt); } - DRV_MONITOR_LOCK_PDL(prt); - { - SweepContext ctx = {prt, modified_reason}; - ErtsMonitor *moni = ERTS_P_MONITORS(prt); - ERTS_P_MONITORS(prt) = NULL; - erts_sweep_monitors(moni, &sweep_one_monitor, &ctx); - } - DRV_MONITOR_UNLOCK_PDL(prt); if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) { erts_do_net_exits(prt->dist_entry, modified_reason); @@ -5069,14 +4833,18 @@ typedef struct { static void prt_one_monitor(ErtsMonitor *mon, void *vprtd) { + ErtsMonitorData *mdp = erts_monitor_to_data(mon); prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd; - erts_print(prtd->to, prtd->arg, "(%T,%T)", mon->u.pid, mon->ref); + if (mon->type == ERTS_MON_TYPE_RESOURCE && erts_monitor_is_target(mon)) + erts_print(prtd->to, prtd->arg, "(%p,%T)", mon->other.ptr, mdp->ref); + else + erts_print(prtd->to, prtd->arg, "(%T,%T)", mon->other.item, mdp->ref); } static void prt_one_lnk(ErtsLink *lnk, void *vprtd) { prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd; - erts_print(prtd->to, prtd->arg, "%T", lnk->pid); + erts_print(prtd->to, prtd->arg, "%T", lnk->other.item); } static void dump_port_state(fmtfn_t to, void *arg, erts_aint32_t state) @@ -5188,15 +4956,18 @@ print_port_info(Port *p, fmtfn_t to, void *arg) prtd.to = to; prtd.arg = arg; erts_print(to, arg, "Links: "); - erts_doforall_links(ERTS_P_LINKS(p), &prt_one_lnk, &prtd); + erts_link_tree_foreach(ERTS_P_LINKS(p), prt_one_lnk, (void *) &prtd); erts_print(to, arg, "\n"); } - if (ERTS_P_MONITORS(p)) { + if (ERTS_P_MONITORS(p) || ERTS_P_LT_MONITORS(p)) { prt_one_lnk_data prtd; prtd.to = to; prtd.arg = arg; erts_print(to, arg, "Monitors: "); - erts_doforall_monitors(ERTS_P_MONITORS(p), &prt_one_monitor, &prtd); + erts_monitor_tree_foreach(ERTS_P_MONITORS(p), prt_one_monitor, + (void *) &prtd); + erts_monitor_list_foreach(ERTS_P_LT_MONITORS(p), prt_one_monitor, + (void *) &prtd); erts_print(to, arg, "\n"); } if (p->suspended) { @@ -7033,24 +6804,23 @@ static int do_driver_monitor_process(Port *prt, ErlDrvMonitor *monitor) { Eterm buf[ERTS_REF_THING_SIZE]; - Process *rp; Eterm ref; + ErtsMonitorData *mdp; - if (prt->drv_ptr->process_exit == NULL) { + if (!prt->drv_ptr->process_exit) return -1; - } - rp = erts_pid2proc_opt(NULL, 0, - (Eterm) process, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - return 1; - } ref = erts_make_ref_in_buffer(buf); - erts_add_monitor(&ERTS_P_MONITORS(prt), MON_ORIGIN, ref, rp->common.id, NIL); - erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, prt->common.id, NIL); + mdp = erts_monitor_create(ERTS_MON_TYPE_PORT, ref, + prt->common.id, process, NIL); + + if (!erts_proc_sig_send_monitor(&mdp->target, process)) { + erts_monitor_release_both(mdp); + return 1; + } + + erts_monitor_tree_insert(&ERTS_P_MONITORS(prt), &mdp->origin); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); erts_ref_to_driver_monitor(ref,monitor); return 0; } @@ -7083,37 +6853,17 @@ int driver_monitor_process(ErlDrvPort drvport, static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor) { Eterm heap[ERTS_REF_THING_SIZE]; - Process *rp; Eterm ref; ErtsMonitor *mon; - Eterm to; ref = erts_driver_monitor_to_ref(heap, monitor); - mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref); - if (mon == NULL) { - return 1; - } - ASSERT(mon->type == MON_ORIGIN); - to = mon->u.pid; - ASSERT(is_internal_pid(to)); - rp = erts_pid2proc_opt(NULL, - 0, - to, - ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - mon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref); - if (mon) { - erts_destroy_monitor(mon); - } - if (rp) { - ErtsMonitor *rmon; - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - if (rmon != NULL) { - erts_destroy_monitor(rmon); - } - } + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(prt), ref); + if (!mon || !erts_monitor_is_origin(mon)) + return 1; + + erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon); + erts_proc_sig_send_demonitor(mon); return 0; } @@ -7142,19 +6892,16 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt,const ErlDrvMoni { Eterm ref; ErtsMonitor *mon; - Eterm to; Eterm heap[ERTS_REF_THING_SIZE]; ref = erts_driver_monitor_to_ref(heap, monitor); - mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref); - if (mon == NULL) { + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(prt), ref); + if (!mon || !erts_monitor_is_origin(mon)) return driver_term_nil; - } - ASSERT(mon->type == MON_ORIGIN); - to = mon->u.pid; - ASSERT(is_internal_pid(to)); - return (ErlDrvTermData) to; + + ASSERT(is_internal_pid(mon->other.item)); + return (ErlDrvTermData) mon->other.item; } @@ -7186,24 +6933,27 @@ int driver_compare_monitors(const ErlDrvMonitor *monitor1, ERTS_REF_THING_SIZE*sizeof(Eterm)); } -void erts_fire_port_monitor(Port *prt, Eterm ref) +void erts_fire_port_monitor(Port *prt, ErtsMonitor *tmon) { - ErtsMonitor *rmon; + ErtsMonitorData *mdp; void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor); ErlDrvMonitor drv_monitor; int fpe_was_unmasked; ERTS_MSACC_PUSH_STATE_M(); ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - ASSERT(prt->drv_ptr != NULL); + ASSERT(prt->drv_ptr != NULL); + ASSERT(erts_monitor_is_target(tmon)); + mdp = erts_monitor_to_data(tmon); DRV_MONITOR_LOCK_PDL(prt); - if (erts_lookup_monitor(ERTS_P_MONITORS(prt), ref) == NULL) { + if (!erts_monitor_is_in_table(&mdp->origin)) { DRV_MONITOR_UNLOCK_PDL(prt); + erts_monitor_release(tmon); return; } callback = prt->drv_ptr->process_exit; ASSERT(callback != NULL); - erts_ref_to_driver_monitor(ref,&drv_monitor); + erts_ref_to_driver_monitor(mdp->ref,&drv_monitor); ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); DRV_MONITOR_UNLOCK_PDL(prt); #ifdef USE_VM_PROBES @@ -7227,11 +6977,9 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) DRV_MONITOR_LOCK_PDL(prt); ERTS_MSACC_POP_STATE_M(); /* remove monitor *after* callback */ - rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref); + erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), &mdp->origin); DRV_MONITOR_UNLOCK_PDL(prt); - if (rmon) { - erts_destroy_monitor(rmon); - } + erts_monitor_release_both(mdp); } @@ -7276,8 +7024,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) int driver_exit(ErlDrvPort ix, int err) { Port* prt = erts_drvport2port(ix); - Process* rp; - ErtsLink *lnk, *rlnk = NULL; + ErtsLink *lnk; Eterm connected; ERTS_CHK_NO_PROC_LOCKS; @@ -7286,22 +7033,10 @@ int driver_exit(ErlDrvPort ix, int err) return -1; connected = ERTS_PORT_GET_CONNECTED(prt); - rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK); - if (rp) { - rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); - } - - lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected); - - if (rp) - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - - if (rlnk != NULL) { - erts_destroy_link(rlnk); - } - - if (lnk != NULL) { - erts_destroy_link(lnk); + lnk = erts_link_tree_lookup(ERTS_P_LINKS(prt), connected); + if (lnk) { + erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); + erts_proc_sig_send_unlink(NULL, lnk); } if (err == 0) |