diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 885 |
1 files changed, 552 insertions, 333 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9076bbe73c..2bd31ee97e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2013. All Rights Reserved. + * Copyright Ericsson AB 1996-2014. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -46,9 +47,15 @@ #define ERTS_WANT_EXTERNAL_TAGS #include "external.h" #include "dtrace-wrapper.h" +#include "erl_map.h" +#include "erl_bif_unique.h" +#include "erl_hl_timer.h" +#include "erl_time.h" extern ErlDrvEntry fd_driver_entry; +#ifndef __OSE__ extern ErlDrvEntry vanilla_driver_entry; +#endif extern ErlDrvEntry spawn_driver_entry; extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ @@ -60,8 +67,6 @@ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error o per thread basis (for BC interfaces) */ ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */ -erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */ -erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */ const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL; @@ -75,6 +80,9 @@ int erts_port_synchronous_ops = 0; int erts_port_schedule_all_ops = 0; int erts_port_parallelism = 0; +static erts_atomic64_t bytes_in; +static erts_atomic64_t bytes_out; + static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); @@ -244,11 +252,13 @@ static ERTS_INLINE void port_init_instr(Port *prt ASSERT(prt->drv_ptr && prt->lock); if (!prt->drv_ptr->lock) { char *lock_str = "port_lock"; + erts_mtx_init_locked_x(prt->lock, lock_str, id, #ifdef ERTS_ENABLE_LOCK_COUNT - if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK)) - lock_str = NULL; + (erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK) +#else + 0 #endif - erts_mtx_init_locked_x(prt->lock, lock_str, id); + ); } #endif erts_port_task_init_sched(&prt->sched, id); @@ -373,11 +383,7 @@ static Port *create_port(char *name, prt->dist_entry = NULL; ERTS_PORT_INIT_CONNECTED(prt, pid); prt->common.u.alive.reg = NULL; -#ifdef ERTS_SMP - prt->common.u.alive.ptimer = NULL; -#else - sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); -#endif + ERTS_PTMR_INIT(prt); erts_port_task_handle_init(&prt->timeout_task); prt->psd = NULL; prt->drv_data = (SWord) 0; @@ -386,7 +392,7 @@ static Port *create_port(char *name, /* Set default tracing */ erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); - ASSERT(((char *) prt) == ((char *) &prt->common)); + ERTS_CT_ASSERT(offsetof(Port,common) == 0); #if !ERTS_PORT_INIT_INSTR_NEED_ID /* @@ -457,11 +463,7 @@ erts_port_free(Port *prt) | ERTS_PORT_SFLG_FREE)); ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); -#ifdef ERTS_SMP - ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0); -#else - ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0); -#endif + ERTS_LC_ASSERT(erts_atomic_read_nob(&prt->common.refc.atmc) == 0); erts_port_task_fini_sched(&prt->sched); @@ -730,11 +732,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ /* * Must clean up the port. */ -#ifdef ERTS_SMP - erts_cancel_smp_ptimer(port->common.u.alive.ptimer); -#else - erts_cancel_timer(&(port->common.u.alive.tm)); -#endif + erts_cancel_port_timer(port); stopq(port); if (port->linebuf != NULL) { erts_free(ERTS_ALC_T_LINEBUF, @@ -909,8 +907,8 @@ int erts_port_handle_xports(Port *prt) (iov)->iov_base = (ptr); \ (iov)->iov_len = (len); \ if (sizeof((iov)->iov_len) < sizeof(len) \ - /* Check if (len) overflowed (iov)->iov_len */ \ - && ((len) >> (sizeof((iov)->iov_len)*CHAR_BIT)) != 0) { \ + /* Check if (len) overflowed (iov)->iov_len */ \ + && (iov)->iov_len != (len)) { \ goto L_overflow; \ } \ *(bv)++ = (bin); \ @@ -1213,9 +1211,10 @@ typedef struct { static ERTS_INLINE ErtsTryImmDrvCallResult try_imm_drv_call(ErtsTryImmDrvCallState *sp) { + unsigned int prof_runnable_ports; ErtsTryImmDrvCallResult res; int reds_left_in; - erts_aint32_t invalid_state, invalid_sched_flags; + erts_aint32_t act, exp, invalid_state, invalid_sched_flags; Port *prt = sp->port; Process *c_p = sp->c_p; @@ -1242,18 +1241,39 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp) goto locked_fail; } - sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - if (sp->sched_flags & invalid_sched_flags) { - res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; - goto locked_fail; - } + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&prt->sched); + act = erts_smp_atomic32_read_nob(&prt->sched.flags); + + do { + erts_aint32_t new; + + if (act & invalid_sched_flags) { + res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; + sp->sched_flags = act; + goto locked_fail; + } + exp = act; + new = act | ERTS_PTS_FLG_EXEC_IMM; + act = erts_smp_atomic32_cmpxchg_mb(&prt->sched.flags, new, exp); + } while (act != exp); + + sp->sched_flags = act; if (!c_p) reds_left_in = CONTEXT_REDS/10; else { if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(c_p, am_out); + /* + * No status lock held while sending runnable + * proc trace messages. It is however not needed + * in this case, since only this thread can send + * such messages for this process until the process + * has been scheduled out. + */ if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) profile_runnable_proc(c_p, am_inactive); @@ -1268,11 +1288,14 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp) ERTS_SMP_CHK_NO_PROC_LOCKS; - if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) - trace_sched_ports_where(prt, am_in, sp->port_op); - if (erts_system_profile_flags.runnable_ports - && !erts_port_is_scheduled(prt)) - profile_runnable_port(prt, am_active); + if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) { + if (prof_runnable_ports && !(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + profile_runnable_port(prt, am_active); + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_in, sp->port_op); + if (prof_runnable_ports) + erts_port_task_sched_unlock(&prt->sched); + } sp->fpe_was_unmasked = erts_block_fpe(); @@ -1289,17 +1312,31 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) int reds; Port *prt = sp->port; Process *c_p = sp->c_p; + erts_aint32_t act; + unsigned int prof_runnable_ports; reds = prt->reds; reds += erts_port_driver_callback_epilogue(prt, NULL); erts_unblock_fpe(sp->fpe_was_unmasked); - if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) - trace_sched_ports_where(prt, am_out, sp->port_op); - if (erts_system_profile_flags.runnable_ports - && !erts_port_is_scheduled(prt)) - profile_runnable_port(prt, am_inactive); + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&prt->sched); + + act = erts_smp_atomic32_read_band_mb(&prt->sched.flags, + ~ERTS_PTS_FLG_EXEC_IMM); + ERTS_SMP_LC_ASSERT(act & ERTS_PTS_FLG_EXEC_IMM); + + if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) { + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_out, sp->port_op); + if (prof_runnable_ports) { + if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + profile_runnable_port(prt, am_inactive); + erts_port_task_sched_unlock(&prt->sched); + } + } erts_port_release(prt); @@ -1314,6 +1351,13 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(c_p, am_in); + /* + * No status lock held while sending runnable + * proc trace messages. It is however not needed + * in this case, since only this thread can send + * such messages for this process until the process + * has been scheduled out. + */ if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) profile_runnable_proc(c_p, am_active); @@ -1354,39 +1398,22 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp) static ERTS_INLINE void queue_port_sched_op_reply(Process *rp, ErtsProcLocks *rp_locksp, - Eterm *hp_start, - Eterm *hp, - Uint h_size, - ErlHeapFragment* bp, + ErtsHeapFactory* factory, Uint32 *ref_num, Eterm msg) { - Eterm ref = make_internal_ref(hp); + Eterm* hp = erts_produce_heap(factory, ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE, 0); + Eterm ref; + + ref= make_internal_ref(hp); write_ref_thing(hp, ref_num[0], ref_num[1], ref_num[2]); hp += REF_THING_SIZE; msg = TUPLE2(hp, ref, msg); - hp += 3; - if (!bp) { - HRelease(rp, hp_start + h_size, hp); - } - else { - Uint used_h_size = hp - hp_start; - ASSERT(h_size >= used_h_size); - if (h_size > used_h_size) - bp = erts_resize_message_buffer(bp, used_h_size, &msg, 1); - } + erts_factory_trim_and_close(factory, &msg, 1); - erts_queue_message(rp, - rp_locksp, - bp, - msg, - NIL -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL); } static void @@ -1396,9 +1423,10 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) if (rp) { ErlOffHeap *ohp; ErlHeapFragment* bp; + ErtsHeapFactory factory; Eterm msg_copy; Uint hsz, msg_sz; - Eterm *hp, *hp_start; + Eterm *hp; ErtsProcLocks rp_locks = 0; hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; @@ -1409,22 +1437,22 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) hsz += msg_sz; } - hp_start = hp = erts_alloc_message_heap(hsz, + hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + erts_factory_message_init(&factory, rp, hp, bp); if (is_immed(msg)) msg_copy = msg; - else + else { msg_copy = copy_struct(msg, msg_sz, &hp, ohp); + factory.hp = hp; + } queue_port_sched_op_reply(rp, &rp_locks, - hp_start, - hp, - hsz, - bp, + &factory, ref_num, msg_copy); @@ -1509,12 +1537,10 @@ erts_schedule_proc2port_signal(Process *c_p, } static ERTS_INLINE void -send_badsig(Port *prt) -{ +send_badsig(Port *prt) { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process* rp; Eterm connected = ERTS_PORT_GET_CONNECTED(prt); - ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); @@ -1534,15 +1560,13 @@ send_badsig(Port *prt) 0); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); - } -} + } /* exit sent */ +} /* send_badsig */ static void -badsig_received(int bang_op, - Port *prt, +badsig_received(int bang_op, Port *prt, erts_aint32_t state, - int bad_output_value) -{ + int bad_output_value) { /* * if (bang_op) * we are part of a "Prt ! Something" operation @@ -1558,12 +1582,12 @@ badsig_received(int bang_op, } if (bang_op) send_badsig(prt); - } -} + } /* not invalid */ +} /* behaved accordingly */ static int -port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) -{ +port_badsig(Port *prt, erts_aint32_t state, int op, + ErtsProc2PortSigData *sigdp) { if (op == ERTS_PROC2PORT_SIG_EXEC) badsig_received(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, prt, @@ -1572,16 +1596,14 @@ port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); return ERTS_PORT_REDS_BADSIG; -} - - -/* - * bad_port_signal() will +} /* port_badsig */ +/* bad_port_signal() will * - preserve signal order of signals. * - send a 'badsig' exit signal to connected process if 'from' is an * internal pid and the port is alive when the bad signal reaches * it. */ + static ErtsPortOpResult bad_port_signal(Process *c_p, int flags, @@ -1656,6 +1678,7 @@ call_driver_outputv(int bang_op, if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) send_badsig(prt); else { + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ErlDrvSizeT size = evp->size; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) @@ -1673,7 +1696,10 @@ call_driver_outputv(int bang_op, prt->caller = NIL; prt->bytes_out += size; - erts_smp_atomic_add_nob(&erts_bytes_out, size); + if (esdp) + esdp->io.out += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); } } @@ -1753,7 +1779,7 @@ call_driver_output(int bang_op, if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) send_badsig(prt); else { - + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) || ERTS_IS_CRASH_DUMPING); @@ -1769,7 +1795,10 @@ call_driver_output(int bang_op, prt->caller = NIL; prt->bytes_out += size; - erts_smp_atomic_add_nob(&erts_bytes_out, size); + if (esdp) + esdp->io.out += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); } } @@ -2468,7 +2497,7 @@ set_port_connected(int bang_op, DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE); dtrace_pid_str(connect, process_str); - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); dtrace_proc_str(rp, newprocess_str); DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str); } @@ -2716,6 +2745,9 @@ void erts_init_io(int port_tab_size, drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + erts_atomic64_init_nob(&bytes_in, 0); + erts_atomic64_init_nob(&bytes_out, 0); + common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)); common_element_size += 10; /* name */ @@ -2742,8 +2774,10 @@ void erts_init_io(int port_tab_size, &drv_list_rwmtx_opts, "driver_list"); driver_list = NULL; - erts_smp_tsd_key_create(&driver_list_lock_status_key); - erts_smp_tsd_key_create(&driver_list_last_error_key); + erts_smp_tsd_key_create(&driver_list_lock_status_key, + "erts_driver_list_lock_status_key"); + erts_smp_tsd_key_create(&driver_list_last_error_key, + "erts_driver_list_last_error_key"); erts_ptab_init_table(&erts_port, ERTS_ALC_T_PORT_TABLE, @@ -2752,10 +2786,8 @@ void erts_init_io(int port_tab_size, port_tab_size, common_element_size, /* Doesn't need to be excact */ "port_table", - legacy_port_tab); - - erts_smp_atomic_init_nob(&erts_bytes_out, 0); - erts_smp_atomic_init_nob(&erts_bytes_in, 0); + legacy_port_tab, + 1); sys_init_io(); @@ -2763,8 +2795,11 @@ void erts_init_io(int port_tab_size, erts_smp_rwmtx_rwlock(&erts_driver_list_lock); init_driver(&fd_driver, &fd_driver_entry, NULL); +#ifndef __OSE__ init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); +#endif init_driver(&spawn_driver, &spawn_driver_entry, NULL); + erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -2773,7 +2808,6 @@ void erts_init_io(int port_tab_size, } #if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) - static ERTS_INLINE void lcnt_enable_drv_lock_count(erts_driver_t *dp, int enable) { if (dp->lock) { @@ -2813,25 +2847,26 @@ static ERTS_INLINE void lcnt_enable_port_lock_count(Port *prt, int enable) } } -void erts_lcnt_enable_io_lock_count(int enable) -{ +void erts_lcnt_enable_io_lock_count(int enable) { erts_driver_t *dp; - int i, max = erts_ptab_max(&erts_port); + int ix, max = erts_ptab_max(&erts_port); + Port *prt; - for (i = 0; i < max; i++) { - Port *prt = erts_pix2port(i); - if (prt) + for (ix = 0; ix < max; ix++) { + if ((prt = erts_pix2port(ix)) != NULL) { lcnt_enable_port_lock_count(prt, enable); - } + } + } /* for all ports */ lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); lcnt_enable_drv_lock_count(&fd_driver, enable); - for (dp = driver_list; dp; dp = dp->next) + /* enable lock counting in all drivers */ + for (dp = driver_list; dp; dp = dp->next) { lcnt_enable_drv_lock_count(dp, enable); -} -#endif - + } +} /* enable/disable lock counting of ports */ +#endif /* defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) */ /* * Buffering of data when using line oriented I/O on ports */ @@ -3016,7 +3051,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) rp = (scheduler ? erts_proc_lookup(pid) - : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC)); + : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC)); if (rp) { Eterm tuple; @@ -3029,16 +3064,12 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks); res = copy_struct(res, sz_res, &hp, ohp); tuple = TUPLE2(hp, sender, res); - erts_queue_message(rp, &rp_locks, bp, tuple, NIL -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, &rp_locks, bp, tuple, NIL); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) - erts_smp_proc_dec_refc(rp); + erts_proc_dec_refc(rp); } } @@ -3082,7 +3113,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, rp = (scheduler ? erts_proc_lookup(to) - : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC)); + : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC)); if (!rp) return; @@ -3097,8 +3128,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, Binary* bptr; bptr = erts_bin_nrml_alloc(len); - bptr->flags = 0; - bptr->orig_size = len; erts_refc_init(&bptr->refc, 1); sys_memcpy(bptr->orig_bytes, buf, len); @@ -3131,15 +3160,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) - erts_smp_proc_dec_refc(rp); + erts_proc_dec_refc(rp); } /* @@ -3225,7 +3250,7 @@ deliver_vec_message(Port* prt, /* Port */ rp = (scheduler ? erts_proc_lookup(to) - : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC)); + : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC)); if (!rp) return; @@ -3302,14 +3327,10 @@ deliver_vec_message(Port* prt, /* Port */ tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) - erts_smp_proc_dec_refc(rp); + erts_proc_dec_refc(rp); } @@ -3399,11 +3420,8 @@ terminate_port(Port *prt) send_closed_port_id = NIL; } -#ifdef ERTS_SMP - erts_cancel_smp_ptimer(prt->common.u.alive.ptimer); -#else - erts_cancel_timer(&prt->common.u.alive.tm); -#endif + if (ERTS_PTMR_IS_SET(prt)) + erts_cancel_port_timer(prt); drv = prt->drv_ptr; if ((drv != NULL) && (drv->stop != NULL)) { @@ -3581,9 +3599,9 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed) DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(rreason_str, 64); - erts_snprintf(from_str, sizeof(from_str), "%T", from); + erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from); dtrace_port_str(p, port_str); - erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason); + erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason); DTRACE4(port_exit, from_str, port_str, p->name, rreason_str); } #endif @@ -3874,12 +3892,13 @@ port_sig_control(Port *prt, if (res == ERTS_PORT_OP_DONE) { Eterm msg; - Eterm *hp, *hp_start; + Eterm *hp; ErlHeapFragment *bp; ErlOffHeap *ohp; + ErtsHeapFactory factory; Process *rp; ErtsProcLocks rp_locks = 0; - Uint hsz; + Uint hsz, rsz; int control_flags; rp = erts_proc_lookup_raw(sigdp->caller); @@ -3888,17 +3907,19 @@ port_sig_control(Port *prt, control_flags = prt->control_flags; - hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hsz += port_control_result_size(control_flags, + rsz = port_control_result_size(control_flags, resp_bufp, &resp_size, &resp_buf[0]); + hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp_start = hp = erts_alloc_message_heap(hsz, + + hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + erts_factory_message_init(&factory, rp, hp, bp); msg = write_port_control_result(control_flags, resp_bufp, @@ -3907,13 +3928,11 @@ port_sig_control(Port *prt, &hp, bp, ohp); + factory.hp = hp; queue_port_sched_op_reply(rp, &rp_locks, - hp_start, - hp, - hsz, - bp, + &factory, sigdp->ref, msg); @@ -4031,6 +4050,9 @@ erts_port_control(Process* c_p, size, &resp_bufp, &resp_size); + + control_flags = prt->control_flags; + finalize_imm_drv_call(&try_call_state); if (tmp_alloced) erts_free(ERTS_ALC_T_TMP, bufp); @@ -4038,8 +4060,6 @@ erts_port_control(Process* c_p, return ERTS_PORT_OP_BADARG; } - control_flags = prt->control_flags; - hsz = port_control_result_size(control_flags, resp_bufp, &resp_size, @@ -4199,8 +4219,6 @@ port_sig_call(Port *prt, if (res == ERTS_PORT_OP_DONE) { Eterm msg; Eterm *hp; - ErlHeapFragment *bp; - ErlOffHeap *ohp; Process *rp; ErtsProcLocks rp_locks = 0; Sint hsz; @@ -4211,29 +4229,31 @@ port_sig_call(Port *prt, hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size); if (hsz >= 0) { - Eterm *hp_start; + ErlHeapFragment* bp; + ErlOffHeap* ohp; + ErtsHeapFactory factory; byte *endp; hsz += 3; /* ok tuple */ hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp_start = hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); + hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); endp = (byte *) resp_bufp; - msg = erts_decode_ext(&hp, ohp, &endp); + erts_factory_message_init(&factory, rp, hp, bp); + msg = erts_decode_ext(&factory, &endp); if (is_value(msg)) { + hp = erts_produce_heap(&factory, + 3, + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE); msg = TUPLE2(hp, am_ok, msg); - hp += 3; queue_port_sched_op_reply(rp, &rp_locks, - hp_start, - hp, - hsz, - bp, + &factory, sigdp->ref, msg); @@ -4241,8 +4261,6 @@ port_sig_call(Port *prt, erts_smp_proc_unlock(rp, rp_locks); goto done; } - if (bp) - free_message_buffer(bp); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); } @@ -4319,10 +4337,11 @@ erts_port_call(Process* c_p, try_call_res = try_imm_drv_call(&try_call_state); switch (try_call_res) { case ERTS_TRY_IMM_DRV_CALL_OK: { - Eterm *hp, *hp_end; + ErtsHeapFactory factory; Sint hsz; unsigned ret_flags = 0U; Eterm term; + Eterm* hp; res = call_driver_call(c_p->common.id, prt, @@ -4342,15 +4361,14 @@ erts_port_call(Process* c_p, if (hsz < 0) return ERTS_PORT_OP_BADARG; hsz += 3; - hp = HAlloc(c_p, hsz); - hp_end = hp + hsz; + erts_factory_proc_prealloc_init(&factory, c_p, hsz); endp = (byte *) resp_bufp; - term = erts_decode_ext(&hp, &MSO(c_p), &endp); + term = erts_decode_ext(&factory, &endp); if (term == THE_NON_VALUE) return ERTS_PORT_OP_BADARG; + hp = erts_produce_heap(&factory,3,0); *retvalp = TUPLE2(hp, am_ok, term); - hp += 3; - HRelease(c_p, hp_end, hp); + erts_factory_close(&factory); if (resp_bufp != &resp_buf[0] && !(ret_flags & DRIVER_CALL_KEEP_BUFFER)) driver_free(resp_bufp); @@ -4428,7 +4446,7 @@ make_port_info_term(Eterm **hpp_start, int len; int start; static Eterm item[] = ERTS_PORT_INFO_1_ITEMS; - static Eterm value[sizeof(item)/sizeof(item[0])]; + Eterm value[sizeof(item)/sizeof(item[0])]; start = 0; len = sizeof(item)/sizeof(item[0]); @@ -4485,12 +4503,11 @@ port_sig_info(Port *prt, prt, sigdp->u.info.item); if (is_value(value)) { + ErtsHeapFactory factory; + erts_factory_message_init(&factory, NULL, hp, bp); queue_port_sched_op_reply(rp, &rp_locks, - hp_start, - hp, - hsz, - bp, + &factory, sigdp->ref, value); } @@ -4561,6 +4578,102 @@ erts_port_info(Process* c_p, } typedef struct { + Uint sched_id; + Eterm pid; + Uint32 refn[ERTS_REF_NUMBERS]; + erts_smp_atomic32_t refc; +} ErtsIOBytesReq; + +static void +reply_io_bytes(void *vreq) +{ + ErtsIOBytesReq *req = (ErtsIOBytesReq *) vreq; + Process *rp; + + rp = erts_proc_lookup(req->pid); + if (rp) { + ErlOffHeap *ohp = NULL; + ErlHeapFragment *bp = NULL; + ErtsProcLocks rp_locks; + Eterm ref, msg, ein, eout, *hp; + Uint64 in, out; + Uint hsz; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + Uint sched_id = esdp->no; + in = esdp->io.in; + out = esdp->io.out; + if (req->sched_id != sched_id) + rp_locks = 0; + else { + in += (Uint64) erts_atomic64_read_nob(&bytes_in); + out += (Uint64) erts_atomic64_read_nob(&bytes_out); + rp_locks = ERTS_PROC_LOCK_MAIN; + } + + hsz = 5 /* 4-tuple */ + REF_THING_SIZE; + + erts_bld_uint64(NULL, &hsz, in); + erts_bld_uint64(NULL, &hsz, out); + + hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + + ref = make_internal_ref(hp); + write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]); + hp += REF_THING_SIZE; + + ein = erts_bld_uint64(&hp, NULL, in); + eout = erts_bld_uint64(&hp, NULL, out); + + msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout); + erts_queue_message(rp, &rp_locks, bp, msg, NIL); + + if (req->sched_id == sched_id) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + + if (erts_smp_atomic32_dec_read_nob(&req->refc) == 0) + erts_free(ERTS_ALC_T_IOB_REQ, req); +} + +Eterm +erts_request_io_bytes(Process *c_p) +{ + Uint *hp; + Eterm ref; + Uint32 *refn; + ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p); + ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ, + sizeof(ErtsIOBytesReq)); + + hp = HAlloc(c_p, REF_THING_SIZE); + ref = erts_sched_make_ref_in_buffer(esdp, hp); + refn = internal_ref_numbers(ref); + + req->sched_id = esdp->no; + req->pid = c_p->common.id; + req->refn[0] = refn[0]; + req->refn[1] = refn[1]; + req->refn[2] = refn[2]; + erts_smp_atomic32_init_nob(&req->refc, + (erts_aint32_t) erts_no_schedulers); + +#ifdef ERTS_SMP + if (erts_no_schedulers > 1) + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + reply_io_bytes, + (void *) req); +#endif + + reply_io_bytes((void *) req); + + return ref; +} + + +typedef struct { int to; void *arg; } prt_one_lnk_data; @@ -4650,7 +4763,7 @@ set_busy_port(ErlDrvPort dprt, int on) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { - erts_snprintf(port_str, sizeof(port_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); DTRACE1(port_busy, port_str); } @@ -4663,7 +4776,7 @@ set_busy_port(ErlDrvPort dprt, int on) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { - erts_snprintf(port_str, sizeof(port_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); DTRACE1(port_not_busy, port_str); } @@ -4715,9 +4828,9 @@ erts_port_resume_procs(Port *prt) DTRACE_CHARBUF(pid_str, 16); ErtsProcList* plp2 = plp; - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid); DTRACE2(process_port_unblocked, pid_str, port_str); } } @@ -4949,24 +5062,6 @@ erts_free_port_names(ErtsPortNames *pnp) erts_free(ERTS_ALC_T_PORT_NAMES, pnp); } -static void schedule_port_timeout(Port *p) -{ - /* - * Scheduling of port timeouts can be done without port locking, but - * since the task handle is stored in the port structure and the ptimer - * structure is protected by the port lock we require the port to be - * locked for now... - * - * TODO: Implement scheduling of port timeouts without locking - * the port. - * /Rickard - */ - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - erts_port_task_schedule(p->common.id, - &p->timeout_task, - ERTS_PORT_TASK_TIMEOUT); -} - ErlDrvTermData driver_mk_term_nil(void) { return driver_term_nil; @@ -4995,7 +5090,7 @@ void driver_report_exit(ErlDrvPort ix, int status) rp = (scheduler ? erts_proc_lookup(pid) - : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC)); + : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC)); if (!rp) return; @@ -5005,15 +5100,11 @@ void driver_report_exit(ErlDrvPort ix, int status) hp += 3; tuple = TUPLE2(hp, prt->common.id, tuple); - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) - erts_smp_proc_dec_refc(rp); + erts_proc_dec_refc(rp); } #define ERTS_B2T_STATES_DEF_STATES_SZ 5 @@ -5105,22 +5196,27 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp) static int driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) { +#define HEAP_EXTRA 200 #define ERTS_DDT_FAIL do { res = -1; goto done; } while (0) Uint need = 0; int depth = 0; int res; - Eterm *hp = NULL, *hp_start = NULL, *hp_end = NULL; ErlDrvTermData* ptr; ErlDrvTermData* ptr_end; DECLARE_ESTACK(stack); - Eterm mess = NIL; /* keeps compiler happy */ + Eterm mess; Process* rp = NULL; - ErlHeapFragment *bp = NULL; - ErlOffHeap *ohp; + ErtsHeapFactory factory; ErtsProcLocks rp_locks = 0; struct b2t_states__ b2t; - int scheduler = 1; /* Silence erroneous warning... */ + int scheduler; + int is_heap_need_limited = 1; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + + ERTS_UNDEF(mess,NIL); + ERTS_UNDEF(scheduler,1); + factory.mode = FACTORY_CLOSED; init_b2t_states(&b2t); /* @@ -5284,14 +5380,34 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) #ifdef DEBUG b2t.org_ext[b2t.ix] = ext; #endif - hsz = erts_binary2term_prepare(&b2t.state[b2t.ix++], ext, size); + hsz = erts_binary2term_prepare(&b2t.state[b2t.ix], ext, size); if (hsz < 0) ERTS_DDT_FAIL; /* Invalid data */ + b2t.state[b2t.ix++].heap_size = hsz; need += hsz; ptr += 2; depth++; + if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */ + is_heap_need_limited = 0; + } break; } + case ERL_DRV_MAP: { /* int */ + ERTS_DDT_CHK_ENOUGH_ARGS(1); + if ((int) ptr[0] < 0) ERTS_DDT_FAIL; + if (ptr[0] > MAP_SMALL_MAP_LIMIT) { + need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]); + is_heap_need_limited = 0; + } else { + need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0]; + } + depth -= 2*ptr[0]; + if (depth < 0) ERTS_DDT_FAIL; + ptr++; + depth++; + break; + } + default: ERTS_DDT_FAIL; } @@ -5314,14 +5430,23 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) scheduler = erts_get_scheduler_id() != 0; rp = (scheduler ? erts_proc_lookup(to) - : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC)); + : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC)); if (!rp) { res = 0; goto done; } - hp_start = hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); - hp_end = hp + need; + /* Try copy directly to destination heap if we know there are no big maps */ + if (is_heap_need_limited) { + ErlOffHeap *ohp; + ErlHeapFragment* bp; + Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); + erts_factory_message_init(&factory, rp, hp, bp); + } + else { + erts_factory_message_init(&factory, NULL, NULL, + new_message_buffer(need)); + } /* * Interpret the instructions and build the term. @@ -5342,13 +5467,15 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) case ERL_DRV_INT: /* signed int argument */ #if HALFWORD_HEAP - mess = erts_bld_sint64(&hp, NULL, (Sint64)ptr[0]); + erts_reserve_heap(&factory, BIG_NEED_SIZE(2)); + mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]); #else + erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE); if (IS_SSMALL((Sint)ptr[0])) mess = make_small((Sint)ptr[0]); else { - mess = small_to_big((Sint)ptr[0], hp); - hp += BIG_UINT_HEAP_SIZE; + mess = small_to_big((Sint)ptr[0], factory.hp); + factory.hp += BIG_UINT_HEAP_SIZE; } #endif ptr++; @@ -5356,25 +5483,29 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) case ERL_DRV_UINT: /* unsigned int argument */ #if HALFWORD_HEAP - mess = erts_bld_uint64(&hp, NULL, (Uint64)ptr[0]); + erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64)); + mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]); #else + erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE); if (IS_USMALL(0, (Uint)ptr[0])) mess = make_small((Uint)ptr[0]); else { - mess = uint_to_big((Uint)ptr[0], hp); - hp += BIG_UINT_HEAP_SIZE; + mess = uint_to_big((Uint)ptr[0], factory.hp); + factory.hp += BIG_UINT_HEAP_SIZE; } #endif ptr++; break; case ERL_DRV_INT64: /* pointer to unsigned 64-bit int argument */ - mess = erts_bld_sint64(&hp, NULL, *((Sint64 *) ptr[0])); + erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64)); + mess = erts_bld_sint64(&factory.hp, NULL, *((Sint64 *) ptr[0])); ptr++; break; case ERL_DRV_UINT64: /* pointer to unsigned 64-bit int argument */ - mess = erts_bld_uint64(&hp, NULL, *((Uint64 *) ptr[0])); + erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64)); + mess = erts_bld_uint64(&factory.hp, NULL, *((Uint64 *) ptr[0])); ptr++; break; @@ -5388,11 +5519,14 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) Uint size = ptr[1]; Uint offset = ptr[2]; - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size); + if (esdp) + esdp->io.in += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size); if (size <= ERL_ONHEAP_BIN_LIMIT) { - ErlHeapBin* hbp = (ErlHeapBin *) hp; - hp += heap_bin_size(size); + ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory, + heap_bin_size(size), HEAP_EXTRA); hbp->thing_word = header_heap_bin(size); hbp->size = size; if (size > 0) { @@ -5401,18 +5535,18 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) mess = make_binary(hbp); } else { - ProcBin* pb = (ProcBin *) hp; + ProcBin* pb = (ProcBin *) erts_produce_heap(&factory, + PROC_BIN_SIZE, HEAP_EXTRA); driver_binary_inc_refc(b); /* caller will free binary */ pb->thing_word = HEADER_PROC_BIN; pb->size = size; - pb->next = ohp->first; - ohp->first = (struct erl_off_heap_header*)pb; + pb->next = factory.off_heap->first; + factory.off_heap->first = (struct erl_off_heap_header*)pb; pb->val = ErlDrvBinary2Binary(b); pb->bytes = ((byte*) b->orig_bytes) + offset; pb->flags = 0; mess = make_binary(pb); - hp += PROC_BIN_SIZE; - OH_OVERHEAD(ohp, pb->size / sizeof(Eterm)); + OH_OVERHEAD(factory.off_heap, pb->size / sizeof(Eterm)); } ptr += 3; break; @@ -5422,11 +5556,15 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) byte *bufp = (byte *) ptr[0]; Uint size = (Uint) ptr[1]; - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size); + if (esdp) + esdp->io.in += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size); if (size <= ERL_ONHEAP_BIN_LIMIT) { - ErlHeapBin* hbp = (ErlHeapBin *) hp; - hp += heap_bin_size(size); + ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory, + heap_bin_size(size), + HEAP_EXTRA); hbp->thing_word = header_heap_bin(size); hbp->size = size; if (size > 0) { @@ -5439,20 +5577,18 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) ProcBin* pbp; Binary* bp = erts_bin_nrml_alloc(size); ASSERT(bufp); - bp->flags = 0; - bp->orig_size = (SWord) size; erts_refc_init(&bp->refc, 1); sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size); - pbp = (ProcBin *) hp; - hp += PROC_BIN_SIZE; + pbp = (ProcBin *) erts_produce_heap(&factory, + PROC_BIN_SIZE, HEAP_EXTRA); pbp->thing_word = HEADER_PROC_BIN; pbp->size = size; - pbp->next = ohp->first; - ohp->first = (struct erl_off_heap_header*)pbp; + pbp->next = factory.off_heap->first; + factory.off_heap->first = (struct erl_off_heap_header*)pbp; pbp->val = bp; pbp->bytes = (byte*) bp->orig_bytes; pbp->flags = 0; - OH_OVERHEAD(ohp, pbp->size / sizeof(Eterm)); + OH_OVERHEAD(factory.off_heap, pbp->size / sizeof(Eterm)); mess = make_binary(pbp); } ptr += 2; @@ -5460,14 +5596,19 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) } case ERL_DRV_STRING: /* char*, length */ - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) ptr[1]); - mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], NIL); + if (esdp) + esdp->io.in += (Uint64) ptr[1]; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) ptr[1]); + erts_reserve_heap(&factory, 2*ptr[1]); + mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], NIL); ptr += 2; break; case ERL_DRV_STRING_CONS: /* char*, length */ mess = ESTACK_POP(stack); - mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], mess); + erts_reserve_heap(&factory, 2*ptr[1]); + mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], mess); ptr += 2; break; @@ -5476,11 +5617,12 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) mess = ESTACK_POP(stack); i--; + erts_reserve_heap(&factory, 2*i); while(i > 0) { Eterm hd = ESTACK_POP(stack); - mess = CONS(hp, hd, mess); - hp += 2; + mess = CONS(factory.hp, hd, mess); + factory.hp += 2; i--; } ptr++; @@ -5489,13 +5631,12 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) case ERL_DRV_TUPLE: { /* int */ int size = (int)ptr[0]; - Eterm* tp = hp; + Eterm* tp = erts_produce_heap(&factory, size+1, HEAP_EXTRA); *tp = make_arityval(size); mess = make_tuple(tp); tp += size; /* point at last element */ - hp = tp+1; /* advance "heap" pointer */ while(size--) { *tp-- = ESTACK_POP(stack); @@ -5511,23 +5652,69 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) case ERL_DRV_FLOAT: { /* double * */ FloatDef f; + Eterm* fp = erts_produce_heap(&factory, FLOAT_SIZE_OBJECT, HEAP_EXTRA); - mess = make_float(hp); + mess = make_float(fp); f.fd = *((double *) ptr[0]); - PUT_DOUBLE(f, hp); - hp += FLOAT_SIZE_OBJECT; + if (!erts_isfinite(f.fd)) + ERTS_DDT_FAIL; + PUT_DOUBLE(f, fp); ptr++; break; } case ERL_DRV_EXT2TERM: /* char *ext, int size */ ASSERT(b2t.org_ext[b2t.ix] == (byte *) ptr[0]); - mess = erts_binary2term_create(&b2t.state[b2t.ix++], &hp, ohp); + + erts_reserve_heap(&factory, b2t.state[b2t.ix].heap_size); + mess = erts_binary2term_create(&b2t.state[b2t.ix++], &factory); if (mess == THE_NON_VALUE) ERTS_DDT_FAIL; ptr += 2; break; + case ERL_DRV_MAP: { /* int */ + int size = (int)ptr[0]; + if (size > MAP_SMALL_MAP_LIMIT) { + int ix = 2*size; + Eterm* leafs; + + erts_produce_heap(&factory, ix, HEAP_EXTRA); + leafs = factory.hp; + while(ix--) { *--leafs = ESTACK_POP(stack); } + + mess = erts_hashmap_from_array(&factory, leafs, size, 1); + if (is_non_value(mess)) + ERTS_DDT_FAIL; + } else { + Eterm* vp; + flatmap_t *mp; + Eterm* tp = erts_produce_heap(&factory, + 2*size + 1 + MAP_HEADER_FLATMAP_SZ, + HEAP_EXTRA); + + *tp = make_arityval(size); + + mp = (flatmap_t*) (tp + 1 + size); + mp->thing_word = MAP_HEADER_FLATMAP; + mp->size = size; + mp->keys = make_tuple(tp); + mess = make_flatmap(mp); + + tp += size; /* point at last key */ + vp = factory.hp - 1; /* point at last value */ + + while(size--) { + *vp-- = ESTACK_POP(stack); + *tp-- = ESTACK_POP(stack); + } + if (!erts_validate_and_sort_flatmap(mp)) + ERTS_DDT_FAIL; + } + ptr++; + break; + } + } ESTACK_PUSH(stack, mess); } @@ -5538,42 +5725,28 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) if (res > 0) { mess = ESTACK_POP(stack); /* get resulting value */ - if (bp) - bp = erts_resize_message_buffer(bp, hp - hp_start, &mess, 1); - else { - ASSERT(hp); - HRelease(rp, hp_end, hp); - } + erts_factory_close(&factory); /* send message */ - erts_queue_message(rp, &rp_locks, bp, mess, am_undefined -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined); } else { if (b2t.ix > b2t.used) b2t.used = b2t.ix; for (b2t.ix = 0; b2t.ix < b2t.used; b2t.ix++) erts_binary2term_abort(&b2t.state[b2t.ix]); - if (bp) - free_message_buffer(bp); - else if (hp) { - HRelease(rp, hp_end, hp); - } + erts_factory_undo(&factory); } -#ifdef ERTS_SMP if (rp) { if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) - erts_smp_proc_dec_refc(rp); + erts_proc_dec_refc(rp); } -#endif cleanup_b2t_states(&b2t); DESTROY_ESTACK(stack); return res; #undef ERTS_DDT_FAIL +#undef HEAP_EXTRA } static ERTS_INLINE int @@ -5702,6 +5875,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, { erts_aint32_t state; Port* prt = erts_drvport2port_state(ix, &state); + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5712,7 +5886,10 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); + if (esdp) + esdp->io.in += (Uint64) (hlen + len); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { return erts_net_message(prt, prt->dist_entry, @@ -5737,6 +5914,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, { erts_aint32_t state; Port* prt = erts_drvport2port_state(ix, &state); + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5748,7 +5926,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); + if (esdp) + esdp->io.in += (Uint64) (hlen + len); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { if (len == 0) return erts_net_message(prt, @@ -5788,6 +5969,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ErlDrvBinary** binv; Port* prt; erts_aint32_t state; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5826,7 +6008,10 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size)); + if (esdp) + esdp->io.in += (Uint64) (hlen + size); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + size)); deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, binv, iov, n, size); return 0; @@ -5902,9 +6087,7 @@ driver_alloc_binary(ErlDrvSizeT size) bin = erts_bin_drv_alloc_fnf((Uint) size); if (!bin) return NULL; /* The driver write must take action */ - bin->flags = BIN_FLAG_DRV; erts_refc_init(&bin->refc, 1); - bin->orig_size = (SWord) size; return Binary2ErlDrvBinary(bin); } @@ -5934,7 +6117,6 @@ ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size) if (!newbin) return NULL; - newbin->orig_size = size; return Binary2ErlDrvBinary(newbin); } @@ -6078,7 +6260,7 @@ driver_pdl_create(ErlDrvPort dp) return NULL; pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK, sizeof(struct erl_drv_port_data_lock)); - erts_mtx_init(&pdl->mtx, "port_data_lock"); + erts_mtx_init_x(&pdl->mtx, "port_data_lock", pp->common.id, 1); pdl_init_refc(pdl); erts_port_inc_refc(pp); pdl->prt = pp; @@ -6518,18 +6700,6 @@ int driver_pushq(ErlDrvPort ix, char* buffer, ErlDrvSizeT len) return code; } -static ERTS_INLINE void -drv_cancel_timer(Port *prt) -{ -#ifdef ERTS_SMP - erts_cancel_smp_ptimer(prt->common.u.alive.ptimer); -#else - erts_cancel_timer(&prt->common.u.alive.tm); -#endif - if (erts_port_task_is_scheduled(&prt->timeout_task)) - erts_port_task_abort(&prt->timeout_task); -} - int driver_set_timer(ErlDrvPort ix, unsigned long t) { Port* prt = erts_drvport2port(ix); @@ -6541,19 +6711,8 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) if (prt->drv_ptr->timeout == NULL) return -1; - drv_cancel_timer(prt); -#ifdef ERTS_SMP - erts_create_smp_ptimer(&prt->common.u.alive.ptimer, - prt->common.id, - (ErlTimeoutProc) schedule_port_timeout, - t); -#else - erts_set_timer(&prt->common.u.alive.tm, - (ErlTimeoutProc) schedule_port_timeout, - NULL, - prt, - t); -#endif + + erts_set_port_timer(prt, (Sint64) t); return 0; } @@ -6563,28 +6722,28 @@ int driver_cancel_timer(ErlDrvPort ix) if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - drv_cancel_timer(prt); + erts_cancel_port_timer(prt); return 0; } - int driver_read_timer(ErlDrvPort ix, unsigned long* t) { Port* prt = erts_drvport2port(ix); + Sint64 left; ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); -#ifdef ERTS_SMP - *t = (prt->common.u.alive.ptimer - ? erts_time_left(&prt->common.u.alive.ptimer->timer.tm) - : 0); -#else - *t = erts_time_left(&prt->common.u.alive.tm); -#endif + + left = erts_read_port_timer(prt); + if (left < 0) + left = 0; + + *t = (unsigned long) left; + return 0; } @@ -6604,11 +6763,33 @@ driver_get_now(ErlDrvNowData *now_data) return 0; } +ErlDrvTime +erl_drv_monotonic_time(ErlDrvTimeUnit time_unit) +{ + return (ErlDrvTime) erts_napi_monotonic_time((int) time_unit); +} + +ErlDrvTime +erl_drv_time_offset(ErlDrvTimeUnit time_unit) +{ + return (ErlDrvTime) erts_napi_time_offset((int) time_unit); +} + +ErlDrvTime +erl_drv_convert_time_unit(ErlDrvTime val, + ErlDrvTimeUnit from, + ErlDrvTimeUnit to) +{ + return (ErlDrvTime) erts_napi_convert_time_unit((ErtsMonotonicTime) val, + (int) from, + (int) to); +} + static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon) { RefThing *refp; ASSERT(is_internal_ref(ref)); - ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor)); + ERTS_CT_ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor)); refp = ref_thing_ptr(ref); memset(mon,0,sizeof(ErlDrvMonitor)); memcpy(mon,refp,sizeof(RefThing)); @@ -7060,7 +7241,7 @@ void *driver_dl_open(char * path) int res; int *last_error_p = erts_smp_tsd_get(driver_list_last_error_key); int locked = maybe_lock_driver_list(); - if ((res = erts_sys_ddll_open(path, &ptr)) == 0) { + if ((res = erts_sys_ddll_open(path, &ptr, NULL)) == 0) { maybe_unlock_driver_list(locked); return ptr; } else { @@ -7115,7 +7296,7 @@ char *driver_dl_error(void) #define ERL_DRV_SYS_INFO_SIZE(LAST_FIELD) \ - (((size_t) &((ErlDrvSysInfo *) 0)->LAST_FIELD) \ + (offsetof(ErlDrvSysInfo, LAST_FIELD) \ + sizeof(((ErlDrvSysInfo *) 0)->LAST_FIELD)) void @@ -7177,6 +7358,18 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size) sip->nif_major_version = ERL_NIF_MAJOR_VERSION; sip->nif_minor_version = ERL_NIF_MINOR_VERSION; } + /* + * 'dirty_scheduler_support' is the last field in the 4th version + * (driver version 3.1, NIF version 2.7) + */ + if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) { +#if defined(ERL_NIF_DIRTY_SCHEDULER_SUPPORT) && defined(USE_THREADS) + sip->dirty_scheduler_support = 1; +#else + sip->dirty_scheduler_support = 0; +#endif + } + } @@ -7240,6 +7433,8 @@ no_stop_select_callback(ErlDrvEvent event, void* private) erts_send_error_to_logger_nogl(dsbufp); } +#define IS_DRIVER_VERSION_GE(DE,MAJOR,MINOR) \ + ((DE)->major_version >= (MAJOR) && (DE)->minor_version >= (MINOR)) static int init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) @@ -7263,10 +7458,11 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) erts_atom_put((byte *) drv->name, sys_strlen(drv->name), ERTS_ATOM_ENC_LATIN1, - 1) + 1), #else - NIL + NIL, #endif + 1 ); } #endif @@ -7286,6 +7482,7 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) drv->timeout = de->timeout ? de->timeout : no_timeout_callback; drv->ready_async = de->ready_async; drv->process_exit = de->process_exit; + drv->emergency_close = IS_DRIVER_VERSION_GE(de,3,2) ? de->emergency_close : NULL; if (de->stop_select) drv->stop_select = de->stop_select; else @@ -7304,6 +7501,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) } } +#undef IS_DRIVER_VERSION_GE + void erts_destroy_driver(erts_driver_t *drv) { @@ -7420,15 +7619,15 @@ int null_func(void) } int -erl_drv_putenv(char *key, char *value) +erl_drv_putenv(const char *key, char *value) { - return erts_sys_putenv_raw(key, value); + return erts_sys_putenv_raw((char*)key, value); } int -erl_drv_getenv(char *key, char *value, size_t *value_size) +erl_drv_getenv(const char *key, char *value, size_t *value_size) { - return erts_sys_getenv_raw(key, value, value_size); + return erts_sys_getenv_raw((char*)key, value, value_size); } /* get heart_port @@ -7447,7 +7646,7 @@ Port *erts_get_heart_port(void) if (!port) continue; /* only examine undead or alive ports */ - if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_DEAD) + if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) continue; /* immediate atom compare */ reg = port->common.u.alive.reg; @@ -7458,3 +7657,23 @@ Port *erts_get_heart_port(void) return NULL; } + +void erts_emergency_close_ports(void) +{ + int ix, max = erts_ptab_max(&erts_port); + + for (ix = 0; ix < max; ix++) { + Port *port = erts_pix2port(ix); + + if (!port) + continue; + /* only examine undead or alive ports */ + if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + continue; + + /* emergency close socket */ + if (port->drv_ptr->emergency_close) { + port->drv_ptr->emergency_close((ErlDrvData) port->drv_data); + } + } +} |