diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 229 |
1 files changed, 174 insertions, 55 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 7d53ce7152..3102e44c11 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2006-2013. All Rights Reserved. + * Copyright Ericsson AB 2006-2016. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -32,7 +33,9 @@ #include "global.h" #include "erl_port_task.h" #include "dist.h" +#include "erl_check_io.h" #include "dtrace-wrapper.h" +#include "lttng-wrapper.h" #include <stdarg.h> /* @@ -67,6 +70,25 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q #else #define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0) #endif +#ifdef USE_LTTNG_VM_TRACEPOINTS +#define LTTNG_DRIVER(TRACEPOINT, PP) \ + if (LTTNG_ENABLED(TRACEPOINT)) { \ + lttng_decl_portbuf(port_str); \ + lttng_decl_procbuf(proc_str); \ + lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str); \ + lttng_port_to_str((PP), port_str); \ + LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name); \ + } +#else +#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0) +#endif + +#define ERTS_SMP_LC_VERIFY_RQ(RQ, PP) \ + do { \ + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); \ + ERTS_SMP_LC_ASSERT((RQ) == ((ErtsRunQueue *) \ + erts_smp_atomic_read_nob(&(PP)->run_queue))); \ + } while (0) erts_smp_atomic_t erts_port_task_outstanding_io_tasks; @@ -171,10 +193,9 @@ p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp) return ptp; } -ErtsProc2PortSigData * -erts_port_task_alloc_p2p_sig_data(void) +static ERTS_INLINE ErtsProc2PortSigData * +p2p_sig_data_init(ErtsPortTask *ptp) { - ErtsPortTask *ptp = port_task_alloc(); ptp->type = ERTS_PORT_TASK_PROC_SIG; ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP; @@ -185,6 +206,31 @@ erts_port_task_alloc_p2p_sig_data(void) return &ptp->u.alive.td.psig.data; } +ErtsProc2PortSigData * +erts_port_task_alloc_p2p_sig_data(void) +{ + ErtsPortTask *ptp = port_task_alloc(); + + return p2p_sig_data_init(ptp); +} + +ErtsProc2PortSigData * +erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr) +{ + ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK, + sizeof(ErtsPortTask) + extra); + + *extra_ptr = ptp+1; + + return p2p_sig_data_init(ptp); +} + +void +erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp) +{ + schedule_port_task_free(p2p_sig_data_to_task(sigdp)); +} + static ERTS_INLINE Eterm task_caller(ErtsPortTask *ptp) { @@ -543,6 +589,16 @@ reset_handle(ErtsPortTask *ptp) } static ERTS_INLINE void +reset_executed_io_task_handle(ErtsPortTask *ptp) +{ + if (ptp->u.alive.handle) { + ASSERT(ptp == handle2task(ptp->u.alive.handle)); + erts_io_notify_port_task_executed(ptp->u.alive.handle); + reset_port_task_handle(ptp->u.alive.handle); + } +} + +static ERTS_INLINE void set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) { ptp->u.alive.handle = pthp; @@ -798,12 +854,13 @@ schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) static ERTS_INLINE void abort_nosuspend_task(Port *pp, ErtsPortTaskType type, - ErtsPortTaskTypeData *tdp) + ErtsPortTaskTypeData *tdp, + int bpq_data) { ASSERT(type == ERTS_PORT_TASK_PROC_SIG); - if (!pp->sched.taskq.bpq) + if (!bpq_data) tdp->psig.callback(NULL, ERTS_PORT_SFLG_INVALID, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, @@ -877,6 +934,11 @@ enqueue_port(ErtsRunQueue *runq, Port *pp) ASSERT(runq->ports.start && runq->ports.end); erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); + +#ifdef ERTS_SMP + if (runq->halt_in_progress) + erts_non_empty_runq(runq); +#endif } static ERTS_INLINE Port * @@ -986,6 +1048,7 @@ static ERTS_INLINE int finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) { erts_aint32_t act; + unsigned int prof_runnable_ports; if (!processing_busy_q) pp->sched.taskq.local.first = *execq; @@ -1002,6 +1065,10 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq); + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&pp->sched); + while (1) { erts_aint32_t new, exp; @@ -1013,12 +1080,24 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); - ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); + ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); + ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM)); if (exp == act) break; } + if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { + /* trace port scheduling, out */ + if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) + trace_sched_ports(pp, am_out); + if (prof_runnable_ports) { + if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS))) + profile_runnable_port(pp, am_inactive); + erts_port_task_sched_unlock(&pp->sched); + } + } + return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0; } @@ -1340,7 +1419,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp) #endif schedule_port_task_handle_list_free(pthlp); - abort_nosuspend_task(pp, type, &td); + abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL); } } @@ -1364,11 +1443,9 @@ erts_port_task_schedule(Eterm id, Port *pp; ErtsPortTask *ptp = NULL; erts_aint32_t act, add_flags; + unsigned int prof_runnable_ports; - if (pthp && erts_port_task_is_scheduled(pthp)) { - ASSERT(0); - erts_port_task_abort(pthp); - } + ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp)); ASSERT(is_internal_port(id)); @@ -1452,6 +1529,10 @@ erts_port_task_schedule(Eterm id, if (ns_pthlp) add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS; + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&pp->sched); + while (1) { erts_aint32_t new, exp; @@ -1476,6 +1557,13 @@ erts_port_task_schedule(Eterm id, goto done; /* Died after our task insert... */ } + if (prof_runnable_ports) { + if (!(act & ERTS_PTS_FLG_EXEC_IMM)) + profile_runnable_port(pp, am_active); + erts_port_task_sched_unlock(&pp->sched); + prof_runnable_ports = 0; + } + /* Enqueue port on run-queue */ runq = erts_port_runq(pp); @@ -1484,8 +1572,10 @@ erts_port_task_schedule(Eterm id, #ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); + ERTS_SMP_LC_ASSERT(runq != xrunq); + ERTS_SMP_LC_VERIFY_RQ(runq, pp); if (xrunq) { - /* Port emigrated ... */ + /* Emigrate port ... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); runq = erts_port_runq(pp); @@ -1495,10 +1585,6 @@ erts_port_task_schedule(Eterm id, #endif enqueue_port(runq, pp); - - if (erts_system_profile_flags.runnable_ports) { - profile_runnable_port(pp, am_active); - } erts_smp_runq_unlock(runq); @@ -1506,6 +1592,9 @@ erts_port_task_schedule(Eterm id, done: + if (prof_runnable_ports) + erts_port_task_sched_unlock(&pp->sched); + #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); @@ -1520,7 +1609,7 @@ abort_nosuspend: erts_port_dec_refc(pp); #endif - abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td); + abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0); ASSERT(ns_pthlp); erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); @@ -1595,6 +1684,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_aint32_t state; int active; Uint64 start_time = 0; + ErtsSchedulerData *esdp = runq->scheduler; + ERTS_MSACC_PUSH_STATE_M(); ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); @@ -1604,12 +1695,13 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto done; } + ERTS_SMP_LC_VERIFY_RQ(runq, pp); + erts_smp_runq_unlock(runq); *curr_port_pp = pp; if (erts_sched_stat.enabled) { - ErtsSchedulerData *esdp = erts_get_scheduler_data(); Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no); int migrated = old && old != esdp->no; @@ -1636,6 +1728,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) state = erts_atomic32_read_nob(&pp->state); pp->reds = ERTS_PORT_REDS_EXECUTE; + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); goto begin_handle_tasks; while (1) { @@ -1654,8 +1747,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto aborted_port_task; } - reset_handle(ptp); - if (erts_system_monitor_long_schedule != 0) { start_time = erts_timestamp_millis(); } @@ -1666,40 +1757,57 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: - reds = ERTS_PORT_REDS_TIMEOUT; - if (!(state & ERTS_PORT_SFLGS_DEAD)) { - DTRACE_DRIVER(driver_timeout, pp); - (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); - } + reset_handle(ptp); + if (!ERTS_PTMR_IS_TIMED_OUT(pp)) + reds = 0; + else { + ERTS_PTMR_CLEAR(pp); + reds = ERTS_PORT_REDS_TIMEOUT; + if (!(state & ERTS_PORT_SFLGS_DEAD)) { + DTRACE_DRIVER(driver_timeout, pp); + LTTNG_DRIVER(driver_timeout, pp); + if (IS_TRACED_FL(pp, F_TRACE_RECEIVE)) + trace_port(pp, am_receive, am_timeout); + (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); + } + } break; case ERTS_PORT_TASK_INPUT: reds = ERTS_PORT_REDS_INPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); - /* NOTE some windows drivers use ->ready_input for input and output */ + LTTNG_DRIVER(driver_ready_input, pp); + /* NOTE some windows drivers use ->ready_input + for input and output */ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); + reset_executed_io_task_handle(ptp); io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: reds = ERTS_PORT_REDS_OUTPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); + LTTNG_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); + reset_executed_io_task_handle(ptp); io_tasks_executed++; break; case ERTS_PORT_TASK_EVENT: reds = ERTS_PORT_REDS_EVENT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); + LTTNG_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event, ptp->u.alive.td.io.event_data); + reset_executed_io_task_handle(ptp); io_tasks_executed++; break; case ERTS_PORT_TASK_PROC_SIG: { ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; + reset_handle(ptp); ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); if (!pp->sched.taskq.bpq) reds = ptp->u.alive.td.psig.callback(pp, @@ -1717,10 +1825,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; } case ERTS_PORT_TASK_DIST_CMD: + reset_handle(ptp); reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds); break; default: - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); break; @@ -1758,11 +1867,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) } erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); - /* trace port scheduling, out */ - if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { - trace_sched_ports(pp, am_out); - } if (io_tasks_executed) { ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) @@ -1785,11 +1891,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_runq_lock(runq); - if (!active) { - if (erts_system_profile_flags.runnable_ports) - profile_runnable_port(pp, am_inactive); - } - else { + if (active) { #ifdef ERTS_SMP ErtsRunQueue *xrunq; #endif @@ -1798,6 +1900,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) #ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); + ERTS_SMP_LC_ASSERT(runq != xrunq); + ERTS_SMP_LC_VERIFY_RQ(runq, pp); if (!xrunq) { #endif enqueue_port(runq, pp); @@ -1805,7 +1909,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) #ifdef ERTS_SMP } else { - /* Port emigrated ... */ + /* Emigrate port... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); @@ -1827,7 +1931,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) runq->scheduler->reductions += reds; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds); + ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds); return res; } @@ -1838,6 +1942,16 @@ release_port(void *vport) { erts_port_dec_refc((Port *) vport); } + +static void +schedule_release_port(void *vport) { + Port *pp = (Port*)vport; + /* This is only used when a port release was ordered from a non-scheduler */ + erts_schedule_thr_prgr_later_op(release_port, + (void *) pp, + &pp->common.u.release); +} + #endif static void @@ -1980,7 +2094,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) break; } default: - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); } @@ -2019,9 +2133,9 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) DTRACE_CHARBUF(pid_str, 16); ErtsProcList* plp2 = plp; - erts_snprintf(port_str, sizeof(port_str), "%T", pp->common.id); + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id); while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid); DTRACE2(process_port_unblocked, pid_str, port_str); } } @@ -2033,10 +2147,15 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) * Schedule cleanup of port structure... */ #ifdef ERTS_SMP - /* Has to be more or less immediate to release any driver */ - erts_schedule_thr_prgr_later_op(release_port, - (void *) pp, - &pp->common.u.release); + /* We might not be a scheduler, eg. traceing to port we are sys_msg_dispatcher */ + if (!erts_get_scheduler_data()) { + erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp); + } else { + /* Has to be more or less immediate to release any driver */ + erts_schedule_thr_prgr_later_op(release_port, + (void *) pp, + &pp->common.u.release); + } #else pp->cleanup = 1; #endif |