diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 558 |
1 files changed, 263 insertions, 295 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 8c67f731f4..9ae973e108 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2013. All Rights Reserved. + * Copyright Ericsson AB 1996-2014. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -46,9 +46,12 @@ #define ERTS_WANT_EXTERNAL_TAGS #include "external.h" #include "dtrace-wrapper.h" +#include "erl_map.h" extern ErlDrvEntry fd_driver_entry; +#ifndef __OSE__ extern ErlDrvEntry vanilla_driver_entry; +#endif extern ErlDrvEntry spawn_driver_entry; extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ @@ -244,11 +247,13 @@ static ERTS_INLINE void port_init_instr(Port *prt ASSERT(prt->drv_ptr && prt->lock); if (!prt->drv_ptr->lock) { char *lock_str = "port_lock"; + erts_mtx_init_locked_x(prt->lock, lock_str, id, #ifdef ERTS_ENABLE_LOCK_COUNT - if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK)) - lock_str = NULL; + (erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK) +#else + 0 #endif - erts_mtx_init_locked_x(prt->lock, lock_str, id); + ); } #endif erts_port_task_init_sched(&prt->sched, id); @@ -364,9 +369,8 @@ static Port *create_port(char *name, ERTS_P_LINKS(prt) = NULL; ERTS_P_MONITORS(prt) = NULL; prt->linebuf = NULL; - prt->bp = NULL; prt->suspended = NULL; - prt->data = am_undefined; + erts_init_port_data(prt); prt->port_data_lock = NULL; prt->control_flags = 0; prt->bytes_in = 0; @@ -910,8 +914,8 @@ int erts_port_handle_xports(Port *prt) (iov)->iov_base = (ptr); \ (iov)->iov_len = (len); \ if (sizeof((iov)->iov_len) < sizeof(len) \ - /* Check if (len) overflowed (iov)->iov_len */ \ - && ((len) >> (sizeof((iov)->iov_len)*CHAR_BIT)) != 0) { \ + /* Check if (len) overflowed (iov)->iov_len */ \ + && (iov)->iov_len != (len)) { \ goto L_overflow; \ } \ *(bv)++ = (bin); \ @@ -1214,9 +1218,10 @@ typedef struct { static ERTS_INLINE ErtsTryImmDrvCallResult try_imm_drv_call(ErtsTryImmDrvCallState *sp) { + unsigned int prof_runnable_ports; ErtsTryImmDrvCallResult res; int reds_left_in; - erts_aint32_t invalid_state, invalid_sched_flags; + erts_aint32_t act, exp, invalid_state, invalid_sched_flags; Port *prt = sp->port; Process *c_p = sp->c_p; @@ -1243,18 +1248,39 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp) goto locked_fail; } - sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - if (sp->sched_flags & invalid_sched_flags) { - res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; - goto locked_fail; - } + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&prt->sched); + act = erts_smp_atomic32_read_nob(&prt->sched.flags); + + do { + erts_aint32_t new; + + if (act & invalid_sched_flags) { + res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; + sp->sched_flags = act; + goto locked_fail; + } + exp = act; + new = act | ERTS_PTS_FLG_EXEC_IMM; + act = erts_smp_atomic32_cmpxchg_mb(&prt->sched.flags, new, exp); + } while (act != exp); + + sp->sched_flags = act; if (!c_p) reds_left_in = CONTEXT_REDS/10; else { if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(c_p, am_out); + /* + * No status lock held while sending runnable + * proc trace messages. It is however not needed + * in this case, since only this thread can send + * such messages for this process until the process + * has been scheduled out. + */ if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) profile_runnable_proc(c_p, am_inactive); @@ -1269,11 +1295,14 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp) ERTS_SMP_CHK_NO_PROC_LOCKS; - if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) - trace_sched_ports_where(prt, am_in, sp->port_op); - if (erts_system_profile_flags.runnable_ports - && !erts_port_is_scheduled(prt)) - profile_runnable_port(prt, am_active); + if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) { + if (prof_runnable_ports && !(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + profile_runnable_port(prt, am_active); + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_in, sp->port_op); + if (prof_runnable_ports) + erts_port_task_sched_unlock(&prt->sched); + } sp->fpe_was_unmasked = erts_block_fpe(); @@ -1290,17 +1319,31 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) int reds; Port *prt = sp->port; Process *c_p = sp->c_p; + erts_aint32_t act; + unsigned int prof_runnable_ports; reds = prt->reds; reds += erts_port_driver_callback_epilogue(prt, NULL); erts_unblock_fpe(sp->fpe_was_unmasked); - if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) - trace_sched_ports_where(prt, am_out, sp->port_op); - if (erts_system_profile_flags.runnable_ports - && !erts_port_is_scheduled(prt)) - profile_runnable_port(prt, am_inactive); + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&prt->sched); + + act = erts_smp_atomic32_read_band_mb(&prt->sched.flags, + ~ERTS_PTS_FLG_EXEC_IMM); + ERTS_SMP_LC_ASSERT(act & ERTS_PTS_FLG_EXEC_IMM); + + if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) { + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_out, sp->port_op); + if (prof_runnable_ports) { + if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + profile_runnable_port(prt, am_inactive); + erts_port_task_sched_unlock(&prt->sched); + } + } erts_port_release(prt); @@ -1315,6 +1358,13 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(c_p, am_in); + /* + * No status lock held while sending runnable + * proc trace messages. It is however not needed + * in this case, since only this thread can send + * such messages for this process until the process + * has been scheduled out. + */ if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) profile_runnable_proc(c_p, am_active); @@ -1331,7 +1381,7 @@ force_imm_drv_call(ErtsTryImmDrvCallState *sp) erts_aint32_t invalid_state; Port *prt = sp->port; - ASSERT(ERTS_IS_CRASH_DUMPING) + ASSERT(ERTS_IS_CRASH_DUMPING); ASSERT(is_atom(sp->port_op)); invalid_state = sp->state; @@ -1442,6 +1492,7 @@ erts_schedule_proc2port_signal(Process *c_p, Eterm *refp, ErtsProc2PortSigData *sigdp, int task_flags, + ErtsPortTaskHandle *pthp, ErtsProc2PortSigCallback callback) { int sched_res; @@ -1491,7 +1542,7 @@ erts_schedule_proc2port_signal(Process *c_p, /* Schedule port close call for later execution... */ sched_res = erts_port_task_schedule(prt->common.id, - NULL, + pthp, ERTS_PORT_TASK_PROC_SIG, sigdp, callback, @@ -1629,6 +1680,7 @@ bad_port_signal(Process *c_p, refp, sigdp, 0, + NULL, port_badsig); } @@ -1838,8 +1890,11 @@ erts_port_output(Process *c_p, ErlIOVec *evp = NULL; char *buf = NULL; int force_immediate_call = (flags & ERTS_PORT_SIG_FLG_FORCE_IMM_CALL); + int async_nosuspend; + ErtsPortTaskHandle *ns_pthp; ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP + | ERTS_PORT_SIG_FLG_ASYNC | ERTS_PORT_SIG_FLG_NOSUSPEND | ERTS_PORT_SIG_FLG_FORCE | ERTS_PORT_SIG_FLG_FORCE_IMM_CALL)) == 0); @@ -1861,6 +1916,12 @@ erts_port_output(Process *c_p, ? ERTS_PORT_OP_DROPPED : ERTS_PORT_OP_BUSY); + async_nosuspend = ((flags & (ERTS_PORT_SIG_FLG_ASYNC + | ERTS_PORT_SIG_FLG_NOSUSPEND + | ERTS_PORT_SIG_FLG_FORCE)) + == (ERTS_PORT_SIG_FLG_ASYNC + | ERTS_PORT_SIG_FLG_NOSUSPEND)); + try_call = (force_immediate_call /* crash dumping */ || !(sched_flags & (invalid_flags | ERTS_PTS_FLGS_FORCE_SCHEDULE_OP))); @@ -1995,6 +2056,15 @@ erts_port_output(Process *c_p, return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: sched_flags = try_call_state.sched_flags; + if (async_nosuspend + && (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) { + driver_free_binary(cbin); + if (evp != &ev) + erts_free(ERTS_ALC_T_TMP, evp); + return ((sched_flags & ERTS_PTS_FLG_EXIT) + ? ERTS_PORT_OP_DROPPED + : ERTS_PORT_OP_BUSY); + } case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: /* Schedule outputv() call instead... */ break; @@ -2142,6 +2212,13 @@ erts_port_output(Process *c_p, return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: sched_flags = try_call_state.sched_flags; + if (async_nosuspend + && (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) { + erts_free(ERTS_ALC_T_TMP, buf); + return ((sched_flags & ERTS_PTS_FLG_EXIT) + ? ERTS_PORT_OP_DROPPED + : ERTS_PORT_OP_BUSY); + } case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: /* Schedule outputv() call instead... */ break; @@ -2163,20 +2240,33 @@ erts_port_output(Process *c_p, task_flags = ERTS_PT_FLG_WAIT_BUSY; sigdp->flags |= flags; + ns_pthp = NULL; if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) { task_flags = 0; if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE) sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND; + else if (async_nosuspend) { + ErtsSchedulerData *esdp = (c_p + ? ERTS_PROC_GET_SCHDATA(c_p) + : erts_get_scheduler_data()); + ASSERT(esdp); + ns_pthp = &esdp->nosuspend_port_task_handle; + sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND; + } else if (flags & ERTS_P2P_SIG_DATA_FLG_NOSUSPEND) task_flags = ERTS_PT_FLG_NOSUSPEND; } + ASSERT(ns_pthp || !async_nosuspend); + ASSERT(async_nosuspend || !ns_pthp); + res = erts_schedule_proc2port_signal(c_p, prt, c_p ? c_p->common.id : ERTS_INVALID_PID, refp, sigdp, task_flags, + ns_pthp, port_sig_callback); if (res != ERTS_PORT_OP_SCHEDULED) { @@ -2187,9 +2277,23 @@ erts_port_output(Process *c_p, return res; } - if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs)) - return ERTS_PORT_OP_BUSY_SCHEDULED; - + if (!(flags & ERTS_PORT_SIG_FLG_FORCE)) { + sched_flags = erts_smp_atomic32_read_acqb(&prt->sched.flags); + if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)) { + if (async_nosuspend) + erts_port_task_tmp_handle_detach(ns_pthp); + } + else { + if (!async_nosuspend) + return ERTS_PORT_OP_BUSY_SCHEDULED; + else { + if (erts_port_task_abort(ns_pthp) == 0) + return ERTS_PORT_OP_BUSY; + else + erts_port_task_tmp_handle_detach(ns_pthp); + } + } + } return res; bad_value: @@ -2285,6 +2389,7 @@ erts_port_exit(Process *c_p, ErlHeapFragment *bp = NULL; ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP + | ERTS_PORT_SIG_FLG_ASYNC | ERTS_PORT_SIG_FLG_BROKEN_LINK | ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0); @@ -2345,6 +2450,7 @@ erts_port_exit(Process *c_p, refp, sigdp, 0, + NULL, port_sig_exit); if (res == ERTS_PORT_OP_DROPPED) { @@ -2413,7 +2519,7 @@ set_port_connected(int bang_op, DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE); dtrace_pid_str(connect, process_str); - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); dtrace_proc_str(rp, newprocess_str); DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str); } @@ -2460,7 +2566,8 @@ erts_port_connect(Process *c_p, !refp, am_connect); - ASSERT((flags & ~ERTS_PORT_SIG_FLG_BANG_OP) == 0); + ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP + | ERTS_PORT_SIG_FLG_ASYNC)) == 0); if (is_not_internal_pid(connect)) connect_id = NIL; /* Fail in op (for signal order) */ @@ -2499,6 +2606,7 @@ erts_port_connect(Process *c_p, refp, sigdp, 0, + NULL, port_sig_connect); } @@ -2555,6 +2663,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) refp, sigdp, 0, + NULL, port_sig_unlink); } @@ -2644,18 +2753,26 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) refp, sigdp, 0, + NULL, port_sig_link); } void erts_init_io(int port_tab_size, - int port_tab_size_ignore_files) + int port_tab_size_ignore_files, + int legacy_port_tab) { ErlDrvEntry** dp; + UWord common_element_size; erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)); + common_element_size += 10; /* name */ #ifdef ERTS_SMP + common_element_size += sizeof(erts_mtx_t); + init_xports_list_alloc(); #endif @@ -2676,15 +2793,19 @@ void erts_init_io(int port_tab_size, &drv_list_rwmtx_opts, "driver_list"); driver_list = NULL; - erts_smp_tsd_key_create(&driver_list_lock_status_key); - erts_smp_tsd_key_create(&driver_list_last_error_key); + erts_smp_tsd_key_create(&driver_list_lock_status_key, + "erts_driver_list_lock_status_key"); + erts_smp_tsd_key_create(&driver_list_last_error_key, + "erts_driver_list_last_error_key"); erts_ptab_init_table(&erts_port, ERTS_ALC_T_PORT_TABLE, NULL, (ErtsPTabElementCommon *) &erts_invalid_port.common, port_tab_size, - "port_table"); + common_element_size, /* Doesn't need to be excact */ + "port_table", + legacy_port_tab); erts_smp_atomic_init_nob(&erts_bytes_out, 0); erts_smp_atomic_init_nob(&erts_bytes_in, 0); @@ -2695,8 +2816,11 @@ void erts_init_io(int port_tab_size, erts_smp_rwmtx_rwlock(&erts_driver_list_lock); init_driver(&fd_driver, &fd_driver_entry, NULL); +#ifndef __OSE__ init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); +#endif init_driver(&spawn_driver, &spawn_driver_entry, NULL); + erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -3364,11 +3488,8 @@ terminate_port(Port *prt) erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf); prt->linebuf = NULL; } - if (prt->bp != NULL) { - free_message_buffer(prt->bp); - prt->bp = NULL; - prt->data = am_undefined; - } + + erts_cleanup_port_data(prt); if (prt->psd) erts_free(ERTS_ALC_T_PRTSD, prt->psd); @@ -3516,9 +3637,9 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed) DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(rreason_str, 64); - erts_snprintf(from_str, sizeof(from_str), "%T", from); + erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from); dtrace_port_str(p, port_str); - erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason); + erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason); DTRACE4(port_exit, from_str, port_str, p->name, rreason_str); } #endif @@ -3536,6 +3657,8 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed) if (send_closed) set_state_flags |= ERTS_PORT_SFLG_SEND_CLOSED; + erts_port_task_sched_enter_exiting_state(&p->sched); + state = erts_atomic32_read_bor_mb(&p->state, set_state_flags); state |= set_state_flags; @@ -3618,6 +3741,10 @@ erts_port_command(Process *c_p, ASSERT(port); flags |= ERTS_PORT_SIG_FLG_BANG_OP; + if (!erts_port_synchronous_ops) { + flags |= ERTS_PORT_SIG_FLG_ASYNC; + refp = NULL; + } if (is_tuple_arity(command, 2)) { Eterm cntd; @@ -3625,21 +3752,14 @@ erts_port_command(Process *c_p, cntd = tp[1]; if (is_internal_pid(cntd)) { if (tp[2] == am_close) { - if (!erts_port_synchronous_ops) - refp = NULL; flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; return erts_port_exit(c_p, flags, port, cntd, am_normal, refp); } else if (is_tuple_arity(tp[2], 2)) { tp = tuple_val(tp[2]); if (tp[1] == am_command) { - if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND) - && !erts_port_synchronous_ops) - refp = NULL; return erts_port_output(c_p, flags, port, cntd, tp[2], refp); } else if (tp[1] == am_connect) { - if (!erts_port_synchronous_ops) - refp = NULL; flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; return erts_port_connect(c_p, flags, port, cntd, tp[2], refp); } @@ -3648,8 +3768,6 @@ erts_port_command(Process *c_p, } /* badsig */ - if (!erts_port_synchronous_ops) - refp = NULL; flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; return bad_port_signal(c_p, flags, port, c_p->common.id, refp, am_command); } @@ -4018,7 +4136,7 @@ erts_port_control(Process* c_p, copy = 1; else { binp = ((ProcBin *) ebinp)->val; - ASSERT(bufp < bufp + size); + ASSERT(bufp <= bufp + size); ASSERT(binp->orig_bytes <= bufp && bufp + size <= binp->orig_bytes + binp->orig_size); erts_refc_inc(&binp->refc, 1); @@ -4046,6 +4164,7 @@ erts_port_control(Process* c_p, retvalp, sigdp, 0, + NULL, port_sig_control); if (res != ERTS_PORT_OP_SCHEDULED) { cleanup_scheduled_control(binp, bufp); @@ -4140,7 +4259,7 @@ port_sig_call(Port *prt, ErlOffHeap *ohp; Process *rp; ErtsProcLocks rp_locks = 0; - Uint hsz; + Sint hsz; rp = erts_proc_lookup_raw(sigdp->caller); if (!rp) @@ -4257,7 +4376,7 @@ erts_port_call(Process* c_p, switch (try_call_res) { case ERTS_TRY_IMM_DRV_CALL_OK: { Eterm *hp, *hp_end; - Uint hsz; + Sint hsz; unsigned ret_flags = 0U; Eterm term; @@ -4326,6 +4445,7 @@ erts_port_call(Process* c_p, retvalp, sigdp, 0, + NULL, port_sig_call); if (res != ERTS_PORT_OP_SCHEDULED) { cleanup_scheduled_call(bufp); @@ -4492,228 +4612,10 @@ erts_port_info(Process* c_p, retvalp, sigdp, 0, + NULL, port_sig_info); } -static int -port_sig_set_data(Port *prt, - erts_aint32_t state, - int op, - ErtsProc2PortSigData *sigdp) -{ - ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); - - if (op == ERTS_PROC2PORT_SIG_EXEC) { - if (prt->bp) - free_message_buffer(prt->bp); - prt->bp = sigdp->u.set_data.bp; - prt->data = sigdp->u.set_data.data; - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true); - } - else { - if (sigdp->u.set_data.bp) - free_message_buffer(sigdp->u.set_data.bp); - port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); - } - return ERTS_PORT_REDS_SET_DATA; -} - -ErtsPortOpResult -erts_port_set_data(Process* c_p, - Port *prt, - Eterm data, - Eterm *refp) -{ - ErtsPortOpResult res; - Eterm set_data; - ErlHeapFragment *bp; - ErtsProc2PortSigData *sigdp; - ErtsTryImmDrvCallResult try_call_res; - ErtsTryImmDrvCallState try_call_state - = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( - c_p, - prt, - ERTS_PORT_SFLGS_INVALID_LOOKUP, - 0, - !refp, - am_set_data); - - if (is_immed(data)) { - set_data = data; - bp = NULL; - } - else { - Eterm *hp; - Uint sz = size_object(data); - bp = new_message_buffer(sz); - hp = bp->mem; - set_data = copy_struct(data, sz, &hp, &bp->off_heap); - } - - try_call_res = try_imm_drv_call(&try_call_state); - switch (try_call_res) { - case ERTS_TRY_IMM_DRV_CALL_OK: - if (prt->bp) - free_message_buffer(prt->bp); - prt->bp = bp; - prt->data = set_data; - finalize_imm_drv_call(&try_call_state); - BUMP_REDS(c_p, ERTS_PORT_REDS_SET_DATA); - return ERTS_PORT_OP_DONE; - case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - return ERTS_PORT_OP_DROPPED; - case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: - case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: - /* Schedule call instead... */ - break; - } - - sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA; - sigdp->u.set_data.data = set_data; - sigdp->u.set_data.bp = bp; - - res = erts_schedule_proc2port_signal(c_p, - prt, - c_p->common.id, - refp, - sigdp, - 0, - port_sig_set_data); - if (res != ERTS_PORT_OP_SCHEDULED && bp) - free_message_buffer(bp); - return res; -} - -static int -port_sig_get_data(Port *prt, - erts_aint32_t state, - int op, - ErtsProc2PortSigData *sigdp) -{ - ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); - if (op != ERTS_PROC2PORT_SIG_EXEC) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); - else { - Process *rp; - ErtsProcLocks rp_locks = 0; - - rp = erts_proc_lookup_raw(sigdp->caller); - if (rp) { - Uint hsz; - Eterm *hp, *hp_start; - Eterm data, msg; - ErlHeapFragment *bp; - ErlOffHeap *ohp; - - hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hsz += 3; - if (prt->bp) - hsz += prt->bp->used_size; - - hp_start = hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); - - if (is_immed(prt->data)) - data = prt->data; - else - data = copy_struct(prt->data, - prt->bp->used_size, - &hp, - &bp->off_heap); - - - - msg = TUPLE2(hp, am_ok, data); - hp += 3; - - queue_port_sched_op_reply(rp, - &rp_locks, - hp_start, - hp, - hsz, - bp, - sigdp->ref, - msg); - if (rp_locks) - erts_smp_proc_unlock(rp, rp_locks); - } - } - return ERTS_PORT_REDS_GET_DATA; -} - -ErtsPortOpResult -erts_port_get_data(Process* c_p, - Port *prt, - Eterm *retvalp) -{ - ErtsProc2PortSigData *sigdp; - ErtsTryImmDrvCallResult try_call_res; - ErtsTryImmDrvCallState try_call_state - = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( - c_p, - prt, - ERTS_PORT_SFLGS_INVALID_LOOKUP, - 0, - 0, - am_get_data); - - try_call_res = try_imm_drv_call(&try_call_state); - switch (try_call_res) { - case ERTS_TRY_IMM_DRV_CALL_OK: { - Eterm *hp; - Eterm data; - ErlHeapFragment *bp; - Uint sz; - if (is_immed(prt->data)) { - bp = NULL; - data = prt->data; - } - else { - bp = new_message_buffer(prt->bp->used_size); - data = copy_struct(prt->data, - prt->bp->used_size, - &hp, - &bp->off_heap); - } - finalize_imm_drv_call(&try_call_state); - if (is_immed(data)) - sz = 0; - else - sz = bp->used_size; - - hp = HAlloc(c_p, sz + 3); - if (is_not_immed(data)) { - data = copy_struct(data, bp->used_size, &hp, &MSO(c_p)); - free_message_buffer(bp); - } - *retvalp = TUPLE2(hp, am_ok, data); - BUMP_REDS(c_p, ERTS_PORT_REDS_GET_DATA); - return ERTS_PORT_OP_DONE; - } - case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - return ERTS_PORT_OP_DROPPED; - case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: - case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: - /* Schedule call instead... */ - break; - } - - sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA; - - return erts_schedule_proc2port_signal(c_p, - prt, - c_p->common.id, - retvalp, - sigdp, - 0, - port_sig_get_data); -} - typedef struct { int to; void *arg; @@ -4804,7 +4706,7 @@ set_busy_port(ErlDrvPort dprt, int on) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { - erts_snprintf(port_str, sizeof(port_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); DTRACE1(port_busy, port_str); } @@ -4817,7 +4719,7 @@ set_busy_port(ErlDrvPort dprt, int on) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { - erts_snprintf(port_str, sizeof(port_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); DTRACE1(port_not_busy, port_str); } @@ -4869,9 +4771,9 @@ erts_port_resume_procs(Port *prt) DTRACE_CHARBUF(pid_str, 16); ErtsProcList* plp2 = plp; - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid); DTRACE2(process_port_unblocked, pid_str, port_str); } } @@ -4970,7 +4872,8 @@ int async_ready(Port *p, void* data) static void report_missing_drv_callback(Port *p, char *drv_type, char *callback) { - ErtsPortNames *pnp = erts_get_port_names(p->common.id); + ErtsPortNames *pnp = erts_get_port_names(p->common.id, + ERTS_Port2ErlDrvPort(p)); char *unknown = "<unknown>"; char *drv_name = pnp->driver_name ? pnp->driver_name : unknown; char *prt_name = pnp->name ? pnp->name : unknown; @@ -4985,15 +4888,25 @@ report_missing_drv_callback(Port *p, char *drv_type, char *callback) void erts_stale_drv_select(Eterm port, + ErlDrvPort drv_port, ErlDrvEvent hndl, int mode, int deselect) { char *type; - ErlDrvPort drv_port = ERTS_Port2ErlDrvPort(erts_port_lookup_raw(port)); - ErtsPortNames *pnp = erts_get_port_names(port); + ErtsPortNames *pnp; erts_dsprintf_buf_t *dsbufp; + if (drv_port == ERTS_INVALID_ERL_DRV_PORT) { + Port *prt = erts_port_lookup_raw(port); + if (prt) + drv_port = ERTS_Port2ErlDrvPort(prt); + else + drv_port = ERTS_INVALID_ERL_DRV_PORT; + } + + pnp = erts_get_port_names(port, drv_port); + switch (mode) { case ERL_DRV_READ | ERL_DRV_WRITE: type = "Input/Output"; @@ -5028,12 +4941,16 @@ erts_stale_drv_select(Eterm port, } ErtsPortNames * -erts_get_port_names(Eterm id) +erts_get_port_names(Eterm id, ErlDrvPort drv_port) { - Port *prt = erts_port_lookup_raw(id); + Port *prt; ErtsPortNames *pnp; ASSERT(is_nil(id) || is_internal_port(id)); + prt = ERTS_ErlDrvPort2Port(drv_port); + if (prt == ERTS_INVALID_ERL_DRV_PORT) + prt = erts_port_lookup_raw(id); + if (!prt) { pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames)); pnp->name = NULL; @@ -5045,6 +4962,7 @@ erts_get_port_names(Eterm id) size_t pnp_len = sizeof(ErtsPortNames); #ifndef DEBUG pnp_len += 100; /* In most cases 100 characters will be enough... */ + ASSERT(prt->common.id == id); #endif pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); do { @@ -5430,6 +5348,17 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) depth++; break; } + case ERL_DRV_MAP: { /* int */ + ERTS_DDT_CHK_ENOUGH_ARGS(1); + if ((int) ptr[0] < 0) ERTS_DDT_FAIL; + need += MAP_HEADER_SIZE + 1 + 2*ptr[0]; + depth -= 2*ptr[0]; + if (depth < 0) ERTS_DDT_FAIL; + ptr++; + depth++; + break; + } + default: ERTS_DDT_FAIL; } @@ -5666,6 +5595,36 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) ptr += 2; break; + case ERL_DRV_MAP: { /* int */ + int size = (int)ptr[0]; + Eterm* tp = hp; + Eterm* vp; + map_t *mp; + + *tp = make_arityval(size); + + hp += 1 + size; + mp = (map_t*)hp; + mp->thing_word = MAP_HEADER; + mp->size = size; + mp->keys = make_tuple(tp); + mess = make_map(mp); + + hp += MAP_HEADER_SIZE + size; /* advance "heap" pointer */ + + tp += size; /* point at last key */ + vp = hp - 1; /* point at last value */ + + while(size--) { + *vp-- = ESTACK_POP(stack); + *tp-- = ESTACK_POP(stack); + } + if (!erts_validate_and_sort_map(mp)) + ERTS_DDT_FAIL; + ptr++; + break; + } + } ESTACK_PUSH(stack, mess); } @@ -5934,10 +5893,6 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, return driver_output2(ix, hbuf, hlen, NULL, 0); size = vec->size - skip; /* Size of remaining bytes in vector */ - ASSERT(hlen >= 0); /* debug only */ - if (hlen < 0) - hlen = 0; - prt = erts_drvport2port_state(ix, &state); if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; @@ -5960,7 +5915,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, binv++; n--; } else { - iov->iov_base += skip; + iov->iov_base = ((char *)(iov->iov_base)) + skip; iov->iov_len -= skip; skip = 0; } @@ -6220,7 +6175,7 @@ driver_pdl_create(ErlDrvPort dp) return NULL; pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK, sizeof(struct erl_drv_port_data_lock)); - erts_mtx_init(&pdl->mtx, "port_data_lock"); + erts_mtx_init_x(&pdl->mtx, "port_data_lock", pp->common.id, 1); pdl_init_refc(pdl); erts_port_inc_refc(pp); pdl->prt = pp; @@ -6405,7 +6360,7 @@ int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip) n--; } else { - iov->iov_base += skip; + iov->iov_base = ((char *)(iov->iov_base)) + skip; iov->iov_len -= skip; skip = 0; } @@ -6470,7 +6425,7 @@ int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip) n--; } else { - iov->iov_base += skip; + iov->iov_base = ((char *)(iov->iov_base)) + skip; iov->iov_len -= skip; skip = 0; } @@ -6529,7 +6484,7 @@ ErlDrvSizeT driver_deq(ErlDrvPort ix, ErlDrvSizeT size) q->v_head++; } else { - q->v_head->iov_base += size; + q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size; q->v_head->iov_len -= size; size = 0; } @@ -7202,7 +7157,7 @@ void *driver_dl_open(char * path) int res; int *last_error_p = erts_smp_tsd_get(driver_list_last_error_key); int locked = maybe_lock_driver_list(); - if ((res = erts_sys_ddll_open(path, &ptr)) == 0) { + if ((res = erts_sys_ddll_open(path, &ptr, NULL)) == 0) { maybe_unlock_driver_list(locked); return ptr; } else { @@ -7257,7 +7212,7 @@ char *driver_dl_error(void) #define ERL_DRV_SYS_INFO_SIZE(LAST_FIELD) \ - (((size_t) &((ErlDrvSysInfo *) 0)->LAST_FIELD) \ + (offsetof(ErlDrvSysInfo, LAST_FIELD) \ + sizeof(((ErlDrvSysInfo *) 0)->LAST_FIELD)) void @@ -7319,6 +7274,18 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size) sip->nif_major_version = ERL_NIF_MAJOR_VERSION; sip->nif_minor_version = ERL_NIF_MINOR_VERSION; } + /* + * 'dirty_scheduler_support' is the last field in the 4th version + * (driver version 3.1, NIF version 2.7) + */ + if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) { +#if defined(ERL_NIF_DIRTY_SCHEDULER_SUPPORT) && defined(USE_THREADS) + sip->dirty_scheduler_support = 1; +#else + sip->dirty_scheduler_support = 0; +#endif + } + } @@ -7405,10 +7372,11 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) erts_atom_put((byte *) drv->name, sys_strlen(drv->name), ERTS_ATOM_ENC_LATIN1, - 1) + 1), #else - NIL + NIL, #endif + 1 ); } #endif |