aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c1227
1 files changed, 652 insertions, 575 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9a67e05bfa..fbcb0c31fc 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1,18 +1,19 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2013. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2014. All Rights Reserved.
*
- * The contents of this file are subject to the Erlang Public License,
- * Version 1.1, (the "License"); you may not use this file except in
- * compliance with the License. You should have received a copy of the
- * Erlang Public License along with this software. If not, it can be
- * retrieved online at http://www.erlang.org/.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* %CopyrightEnd%
*/
@@ -46,9 +47,15 @@
#define ERTS_WANT_EXTERNAL_TAGS
#include "external.h"
#include "dtrace-wrapper.h"
+#include "erl_map.h"
+#include "erl_bif_unique.h"
+#include "erl_hl_timer.h"
+#include "erl_time.h"
extern ErlDrvEntry fd_driver_entry;
+#ifndef __OSE__
extern ErlDrvEntry vanilla_driver_entry;
+#endif
extern ErlDrvEntry spawn_driver_entry;
extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
@@ -60,8 +67,6 @@ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error o
per thread basis (for BC interfaces) */
ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */
-erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */
-erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */
const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL;
@@ -75,6 +80,9 @@ int erts_port_synchronous_ops = 0;
int erts_port_schedule_all_ops = 0;
int erts_port_parallelism = 0;
+static erts_atomic64_t bytes_in;
+static erts_atomic64_t bytes_out;
+
static void deliver_result(Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
@@ -244,11 +252,13 @@ static ERTS_INLINE void port_init_instr(Port *prt
ASSERT(prt->drv_ptr && prt->lock);
if (!prt->drv_ptr->lock) {
char *lock_str = "port_lock";
+ erts_mtx_init_locked_x(prt->lock, lock_str, id,
#ifdef ERTS_ENABLE_LOCK_COUNT
- if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK))
- lock_str = NULL;
+ (erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK)
+#else
+ 0
#endif
- erts_mtx_init_locked_x(prt->lock, lock_str, id);
+ );
}
#endif
erts_port_task_init_sched(&prt->sched, id);
@@ -364,9 +374,8 @@ static Port *create_port(char *name,
ERTS_P_LINKS(prt) = NULL;
ERTS_P_MONITORS(prt) = NULL;
prt->linebuf = NULL;
- prt->bp = NULL;
prt->suspended = NULL;
- prt->data = am_undefined;
+ erts_init_port_data(prt);
prt->port_data_lock = NULL;
prt->control_flags = 0;
prt->bytes_in = 0;
@@ -374,11 +383,7 @@ static Port *create_port(char *name,
prt->dist_entry = NULL;
ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
-#ifdef ERTS_SMP
- prt->common.u.alive.ptimer = NULL;
-#else
- sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
-#endif
+ ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
prt->psd = NULL;
prt->drv_data = (SWord) 0;
@@ -387,7 +392,7 @@ static Port *create_port(char *name,
/* Set default tracing */
erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
- ASSERT(((char *) prt) == ((char *) &prt->common));
+ ERTS_CT_ASSERT(offsetof(Port,common) == 0);
#if !ERTS_PORT_INIT_INSTR_NEED_ID
/*
@@ -458,11 +463,7 @@ erts_port_free(Port *prt)
| ERTS_PORT_SFLG_FREE));
ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
-#ifdef ERTS_SMP
- ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0);
-#else
- ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0);
-#endif
+ ERTS_LC_ASSERT(erts_atomic_read_nob(&prt->common.refc.atmc) == 0);
erts_port_task_fini_sched(&prt->sched);
@@ -731,11 +732,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
/*
* Must clean up the port.
*/
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(port->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&(port->common.u.alive.tm));
-#endif
+ erts_cancel_port_timer(port);
stopq(port);
if (port->linebuf != NULL) {
erts_free(ERTS_ALC_T_LINEBUF,
@@ -910,8 +907,8 @@ int erts_port_handle_xports(Port *prt)
(iov)->iov_base = (ptr); \
(iov)->iov_len = (len); \
if (sizeof((iov)->iov_len) < sizeof(len) \
- /* Check if (len) overflowed (iov)->iov_len */ \
- && ((len) >> (sizeof((iov)->iov_len)*CHAR_BIT)) != 0) { \
+ /* Check if (len) overflowed (iov)->iov_len */ \
+ && (iov)->iov_len != (len)) { \
goto L_overflow; \
} \
*(bv)++ = (bin); \
@@ -1214,9 +1211,10 @@ typedef struct {
static ERTS_INLINE ErtsTryImmDrvCallResult
try_imm_drv_call(ErtsTryImmDrvCallState *sp)
{
+ unsigned int prof_runnable_ports;
ErtsTryImmDrvCallResult res;
int reds_left_in;
- erts_aint32_t invalid_state, invalid_sched_flags;
+ erts_aint32_t act, exp, invalid_state, invalid_sched_flags;
Port *prt = sp->port;
Process *c_p = sp->c_p;
@@ -1243,18 +1241,39 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
goto locked_fail;
}
- sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- if (sp->sched_flags & invalid_sched_flags) {
- res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS;
- goto locked_fail;
- }
+ prof_runnable_ports = erts_system_profile_flags.runnable_ports;
+ if (prof_runnable_ports)
+ erts_port_task_sched_lock(&prt->sched);
+ act = erts_smp_atomic32_read_nob(&prt->sched.flags);
+
+ do {
+ erts_aint32_t new;
+
+ if (act & invalid_sched_flags) {
+ res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS;
+ sp->sched_flags = act;
+ goto locked_fail;
+ }
+ exp = act;
+ new = act | ERTS_PTS_FLG_EXEC_IMM;
+ act = erts_smp_atomic32_cmpxchg_mb(&prt->sched.flags, new, exp);
+ } while (act != exp);
+
+ sp->sched_flags = act;
if (!c_p)
reds_left_in = CONTEXT_REDS/10;
else {
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(c_p, am_out);
+ /*
+ * No status lock held while sending runnable
+ * proc trace messages. It is however not needed
+ * in this case, since only this thread can send
+ * such messages for this process until the process
+ * has been scheduled out.
+ */
if (erts_system_profile_flags.runnable_procs
&& erts_system_profile_flags.exclusive)
profile_runnable_proc(c_p, am_inactive);
@@ -1269,11 +1288,14 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
ERTS_SMP_CHK_NO_PROC_LOCKS;
- if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS))
- trace_sched_ports_where(prt, am_in, sp->port_op);
- if (erts_system_profile_flags.runnable_ports
- && !erts_port_is_scheduled(prt))
- profile_runnable_port(prt, am_active);
+ if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) {
+ if (prof_runnable_ports && !(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ profile_runnable_port(prt, am_active);
+ if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS))
+ trace_sched_ports_where(prt, am_in, sp->port_op);
+ if (prof_runnable_ports)
+ erts_port_task_sched_unlock(&prt->sched);
+ }
sp->fpe_was_unmasked = erts_block_fpe();
@@ -1290,17 +1312,31 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
int reds;
Port *prt = sp->port;
Process *c_p = sp->c_p;
+ erts_aint32_t act;
+ unsigned int prof_runnable_ports;
reds = prt->reds;
reds += erts_port_driver_callback_epilogue(prt, NULL);
erts_unblock_fpe(sp->fpe_was_unmasked);
- if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS))
- trace_sched_ports_where(prt, am_out, sp->port_op);
- if (erts_system_profile_flags.runnable_ports
- && !erts_port_is_scheduled(prt))
- profile_runnable_port(prt, am_inactive);
+ prof_runnable_ports = erts_system_profile_flags.runnable_ports;
+ if (prof_runnable_ports)
+ erts_port_task_sched_lock(&prt->sched);
+
+ act = erts_smp_atomic32_read_band_mb(&prt->sched.flags,
+ ~ERTS_PTS_FLG_EXEC_IMM);
+ ERTS_SMP_LC_ASSERT(act & ERTS_PTS_FLG_EXEC_IMM);
+
+ if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) {
+ if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS))
+ trace_sched_ports_where(prt, am_out, sp->port_op);
+ if (prof_runnable_ports) {
+ if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ profile_runnable_port(prt, am_inactive);
+ erts_port_task_sched_unlock(&prt->sched);
+ }
+ }
erts_port_release(prt);
@@ -1315,6 +1351,13 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(c_p, am_in);
+ /*
+ * No status lock held while sending runnable
+ * proc trace messages. It is however not needed
+ * in this case, since only this thread can send
+ * such messages for this process until the process
+ * has been scheduled out.
+ */
if (erts_system_profile_flags.runnable_procs
&& erts_system_profile_flags.exclusive)
profile_runnable_proc(c_p, am_active);
@@ -1331,7 +1374,7 @@ force_imm_drv_call(ErtsTryImmDrvCallState *sp)
erts_aint32_t invalid_state;
Port *prt = sp->port;
- ASSERT(ERTS_IS_CRASH_DUMPING)
+ ASSERT(ERTS_IS_CRASH_DUMPING);
ASSERT(is_atom(sp->port_op));
invalid_state = sp->state;
@@ -1355,39 +1398,22 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp)
static ERTS_INLINE void
queue_port_sched_op_reply(Process *rp,
ErtsProcLocks *rp_locksp,
- Eterm *hp_start,
- Eterm *hp,
- Uint h_size,
- ErlHeapFragment* bp,
+ ErtsHeapFactory* factory,
Uint32 *ref_num,
Eterm msg)
{
- Eterm ref = make_internal_ref(hp);
+ Eterm* hp = erts_produce_heap(factory, ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE, 0);
+ Eterm ref;
+
+ ref= make_internal_ref(hp);
write_ref_thing(hp, ref_num[0], ref_num[1], ref_num[2]);
hp += REF_THING_SIZE;
msg = TUPLE2(hp, ref, msg);
- hp += 3;
- if (!bp) {
- HRelease(rp, hp_start + h_size, hp);
- }
- else {
- Uint used_h_size = hp - hp_start;
- ASSERT(h_size >= used_h_size);
- if (h_size > used_h_size)
- bp = erts_resize_message_buffer(bp, used_h_size, &msg, 1);
- }
+ erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp,
- rp_locksp,
- bp,
- msg,
- NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL);
}
static void
@@ -1397,9 +1423,10 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
if (rp) {
ErlOffHeap *ohp;
ErlHeapFragment* bp;
+ ErtsHeapFactory factory;
Eterm msg_copy;
Uint hsz, msg_sz;
- Eterm *hp, *hp_start;
+ Eterm *hp;
ErtsProcLocks rp_locks = 0;
hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
@@ -1410,22 +1437,22 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
hsz += msg_sz;
}
- hp_start = hp = erts_alloc_message_heap(hsz,
+ hp = erts_alloc_message_heap(hsz,
&bp,
&ohp,
rp,
&rp_locks);
+ erts_factory_message_init(&factory, rp, hp, bp);
if (is_immed(msg))
msg_copy = msg;
- else
+ else {
msg_copy = copy_struct(msg, msg_sz, &hp, ohp);
+ factory.hp = hp;
+ }
queue_port_sched_op_reply(rp,
&rp_locks,
- hp_start,
- hp,
- hsz,
- bp,
+ &factory,
ref_num,
msg_copy);
@@ -1442,6 +1469,7 @@ erts_schedule_proc2port_signal(Process *c_p,
Eterm *refp,
ErtsProc2PortSigData *sigdp,
int task_flags,
+ ErtsPortTaskHandle *pthp,
ErtsProc2PortSigCallback callback)
{
int sched_res;
@@ -1491,7 +1519,7 @@ erts_schedule_proc2port_signal(Process *c_p,
/* Schedule port close call for later execution... */
sched_res = erts_port_task_schedule(prt->common.id,
- NULL,
+ pthp,
ERTS_PORT_TASK_PROC_SIG,
sigdp,
callback,
@@ -1520,12 +1548,10 @@ erts_schedule_proc2port_signal(Process *c_p,
}
static ERTS_INLINE void
-send_badsig(Port *prt)
-{
+send_badsig(Port *prt) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
Process* rp;
Eterm connected = ERTS_PORT_GET_CONNECTED(prt);
-
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_get_scheduler_id());
@@ -1545,15 +1571,13 @@ send_badsig(Port *prt)
0);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
- }
-}
+ } /* exit sent */
+} /* send_badsig */
static void
-badsig_received(int bang_op,
- Port *prt,
+badsig_received(int bang_op, Port *prt,
erts_aint32_t state,
- int bad_output_value)
-{
+ int bad_output_value) {
/*
* if (bang_op)
* we are part of a "Prt ! Something" operation
@@ -1569,12 +1593,12 @@ badsig_received(int bang_op,
}
if (bang_op)
send_badsig(prt);
- }
-}
+ } /* not invalid */
+} /* behaved accordingly */
static int
-port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
-{
+port_badsig(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp) {
if (op == ERTS_PROC2PORT_SIG_EXEC)
badsig_received(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP,
prt,
@@ -1583,16 +1607,14 @@ port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
return ERTS_PORT_REDS_BADSIG;
-}
-
-
-/*
- * bad_port_signal() will
+} /* port_badsig */
+/* bad_port_signal() will
* - preserve signal order of signals.
* - send a 'badsig' exit signal to connected process if 'from' is an
* internal pid and the port is alive when the bad signal reaches
* it.
*/
+
static ErtsPortOpResult
bad_port_signal(Process *c_p,
int flags,
@@ -1640,6 +1662,7 @@ bad_port_signal(Process *c_p,
refp,
sigdp,
0,
+ NULL,
port_badsig);
}
@@ -1666,6 +1689,7 @@ call_driver_outputv(int bang_op,
if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
send_badsig(prt);
else {
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErlDrvSizeT size = evp->size;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
@@ -1683,7 +1707,10 @@ call_driver_outputv(int bang_op,
prt->caller = NIL;
prt->bytes_out += size;
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
+ if (esdp)
+ esdp->io.out += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
}
}
@@ -1763,7 +1790,7 @@ call_driver_output(int bang_op,
if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
send_badsig(prt);
else {
-
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1779,7 +1806,10 @@ call_driver_output(int bang_op,
prt->caller = NIL;
prt->bytes_out += size;
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
+ if (esdp)
+ esdp->io.out += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
}
}
@@ -1849,8 +1879,11 @@ erts_port_output(Process *c_p,
ErlIOVec *evp = NULL;
char *buf = NULL;
int force_immediate_call = (flags & ERTS_PORT_SIG_FLG_FORCE_IMM_CALL);
+ int async_nosuspend;
+ ErtsPortTaskHandle *ns_pthp;
ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP
+ | ERTS_PORT_SIG_FLG_ASYNC
| ERTS_PORT_SIG_FLG_NOSUSPEND
| ERTS_PORT_SIG_FLG_FORCE
| ERTS_PORT_SIG_FLG_FORCE_IMM_CALL)) == 0);
@@ -1872,6 +1905,12 @@ erts_port_output(Process *c_p,
? ERTS_PORT_OP_DROPPED
: ERTS_PORT_OP_BUSY);
+ async_nosuspend = ((flags & (ERTS_PORT_SIG_FLG_ASYNC
+ | ERTS_PORT_SIG_FLG_NOSUSPEND
+ | ERTS_PORT_SIG_FLG_FORCE))
+ == (ERTS_PORT_SIG_FLG_ASYNC
+ | ERTS_PORT_SIG_FLG_NOSUSPEND));
+
try_call = (force_immediate_call /* crash dumping */
|| !(sched_flags & (invalid_flags
| ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)));
@@ -2006,6 +2045,15 @@ erts_port_output(Process *c_p,
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
sched_flags = try_call_state.sched_flags;
+ if (async_nosuspend
+ && (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
+ driver_free_binary(cbin);
+ if (evp != &ev)
+ erts_free(ERTS_ALC_T_TMP, evp);
+ return ((sched_flags & ERTS_PTS_FLG_EXIT)
+ ? ERTS_PORT_OP_DROPPED
+ : ERTS_PORT_OP_BUSY);
+ }
case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
/* Schedule outputv() call instead... */
break;
@@ -2153,6 +2201,13 @@ erts_port_output(Process *c_p,
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
sched_flags = try_call_state.sched_flags;
+ if (async_nosuspend
+ && (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
+ erts_free(ERTS_ALC_T_TMP, buf);
+ return ((sched_flags & ERTS_PTS_FLG_EXIT)
+ ? ERTS_PORT_OP_DROPPED
+ : ERTS_PORT_OP_BUSY);
+ }
case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
/* Schedule outputv() call instead... */
break;
@@ -2174,20 +2229,33 @@ erts_port_output(Process *c_p,
task_flags = ERTS_PT_FLG_WAIT_BUSY;
sigdp->flags |= flags;
+ ns_pthp = NULL;
if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) {
task_flags = 0;
if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE)
sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND;
+ else if (async_nosuspend) {
+ ErtsSchedulerData *esdp = (c_p
+ ? ERTS_PROC_GET_SCHDATA(c_p)
+ : erts_get_scheduler_data());
+ ASSERT(esdp);
+ ns_pthp = &esdp->nosuspend_port_task_handle;
+ sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND;
+ }
else if (flags & ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)
task_flags = ERTS_PT_FLG_NOSUSPEND;
}
+ ASSERT(ns_pthp || !async_nosuspend);
+ ASSERT(async_nosuspend || !ns_pthp);
+
res = erts_schedule_proc2port_signal(c_p,
prt,
c_p ? c_p->common.id : ERTS_INVALID_PID,
refp,
sigdp,
task_flags,
+ ns_pthp,
port_sig_callback);
if (res != ERTS_PORT_OP_SCHEDULED) {
@@ -2198,9 +2266,23 @@ erts_port_output(Process *c_p,
return res;
}
- if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs))
- return ERTS_PORT_OP_BUSY_SCHEDULED;
-
+ if (!(flags & ERTS_PORT_SIG_FLG_FORCE)) {
+ sched_flags = erts_smp_atomic32_read_acqb(&prt->sched.flags);
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)) {
+ if (async_nosuspend)
+ erts_port_task_tmp_handle_detach(ns_pthp);
+ }
+ else {
+ if (!async_nosuspend)
+ return ERTS_PORT_OP_BUSY_SCHEDULED;
+ else {
+ if (erts_port_task_abort(ns_pthp) == 0)
+ return ERTS_PORT_OP_BUSY;
+ else
+ erts_port_task_tmp_handle_detach(ns_pthp);
+ }
+ }
+ }
return res;
bad_value:
@@ -2296,6 +2378,7 @@ erts_port_exit(Process *c_p,
ErlHeapFragment *bp = NULL;
ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP
+ | ERTS_PORT_SIG_FLG_ASYNC
| ERTS_PORT_SIG_FLG_BROKEN_LINK
| ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0);
@@ -2356,6 +2439,7 @@ erts_port_exit(Process *c_p,
refp,
sigdp,
0,
+ NULL,
port_sig_exit);
if (res == ERTS_PORT_OP_DROPPED) {
@@ -2424,7 +2508,7 @@ set_port_connected(int bang_op,
DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE);
dtrace_pid_str(connect, process_str);
- erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id);
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id);
dtrace_proc_str(rp, newprocess_str);
DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str);
}
@@ -2471,7 +2555,8 @@ erts_port_connect(Process *c_p,
!refp,
am_connect);
- ASSERT((flags & ~ERTS_PORT_SIG_FLG_BANG_OP) == 0);
+ ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP
+ | ERTS_PORT_SIG_FLG_ASYNC)) == 0);
if (is_not_internal_pid(connect))
connect_id = NIL; /* Fail in op (for signal order) */
@@ -2510,6 +2595,7 @@ erts_port_connect(Process *c_p,
refp,
sigdp,
0,
+ NULL,
port_sig_connect);
}
@@ -2566,6 +2652,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
refp,
sigdp,
0,
+ NULL,
port_sig_unlink);
}
@@ -2655,18 +2742,29 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
refp,
sigdp,
0,
+ NULL,
port_sig_link);
}
void erts_init_io(int port_tab_size,
- int port_tab_size_ignore_files)
+ int port_tab_size_ignore_files,
+ int legacy_port_tab)
{
ErlDrvEntry** dp;
+ UWord common_element_size;
erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED;
+ erts_atomic64_init_nob(&bytes_in, 0);
+ erts_atomic64_init_nob(&bytes_out, 0);
+
+ common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+ common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ));
+ common_element_size += 10; /* name */
#ifdef ERTS_SMP
+ common_element_size += sizeof(erts_mtx_t);
+
init_xports_list_alloc();
#endif
@@ -2687,18 +2785,20 @@ void erts_init_io(int port_tab_size,
&drv_list_rwmtx_opts,
"driver_list");
driver_list = NULL;
- erts_smp_tsd_key_create(&driver_list_lock_status_key);
- erts_smp_tsd_key_create(&driver_list_last_error_key);
+ erts_smp_tsd_key_create(&driver_list_lock_status_key,
+ "erts_driver_list_lock_status_key");
+ erts_smp_tsd_key_create(&driver_list_last_error_key,
+ "erts_driver_list_last_error_key");
erts_ptab_init_table(&erts_port,
ERTS_ALC_T_PORT_TABLE,
NULL,
(ErtsPTabElementCommon *) &erts_invalid_port.common,
port_tab_size,
- "port_table");
-
- erts_smp_atomic_init_nob(&erts_bytes_out, 0);
- erts_smp_atomic_init_nob(&erts_bytes_in, 0);
+ common_element_size, /* Doesn't need to be excact */
+ "port_table",
+ legacy_port_tab,
+ 1);
sys_init_io();
@@ -2706,8 +2806,11 @@ void erts_init_io(int port_tab_size,
erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
init_driver(&fd_driver, &fd_driver_entry, NULL);
+#ifndef __OSE__
init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
+#endif
init_driver(&spawn_driver, &spawn_driver_entry, NULL);
+ erts_init_static_drivers();
for (dp = driver_tab; *dp != NULL; dp++)
erts_add_driver_entry(*dp, NULL, 1);
@@ -2716,7 +2819,6 @@ void erts_init_io(int port_tab_size,
}
#if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP)
-
static ERTS_INLINE void lcnt_enable_drv_lock_count(erts_driver_t *dp, int enable)
{
if (dp->lock) {
@@ -2756,25 +2858,26 @@ static ERTS_INLINE void lcnt_enable_port_lock_count(Port *prt, int enable)
}
}
-void erts_lcnt_enable_io_lock_count(int enable)
-{
+void erts_lcnt_enable_io_lock_count(int enable) {
erts_driver_t *dp;
- int i, max = erts_ptab_max(&erts_port);
+ int ix, max = erts_ptab_max(&erts_port);
+ Port *prt;
- for (i = 0; i < max; i++) {
- Port *prt = erts_pix2port(i);
- if (prt)
+ for (ix = 0; ix < max; ix++) {
+ if ((prt = erts_pix2port(ix)) != NULL) {
lcnt_enable_port_lock_count(prt, enable);
- }
+ }
+ } /* for all ports */
lcnt_enable_drv_lock_count(&vanilla_driver, enable);
lcnt_enable_drv_lock_count(&spawn_driver, enable);
lcnt_enable_drv_lock_count(&fd_driver, enable);
- for (dp = driver_list; dp; dp = dp->next)
+ /* enable lock counting in all drivers */
+ for (dp = driver_list; dp; dp = dp->next) {
lcnt_enable_drv_lock_count(dp, enable);
-}
-#endif
-
+ }
+} /* enable/disable lock counting of ports */
+#endif /* defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) */
/*
* Buffering of data when using line oriented I/O on ports
*/
@@ -2959,7 +3062,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
rp = (scheduler
? erts_proc_lookup(pid)
- : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
if (rp) {
Eterm tuple;
@@ -2972,16 +3075,12 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
}
@@ -3025,7 +3124,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -3040,8 +3139,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Binary* bptr;
bptr = erts_bin_nrml_alloc(len);
- bptr->flags = 0;
- bptr->orig_size = len;
erts_refc_init(&bptr->refc, 1);
sys_memcpy(bptr->orig_bytes, buf, len);
@@ -3074,15 +3171,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
/*
@@ -3168,7 +3261,7 @@ deliver_vec_message(Port* prt, /* Port */
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -3245,14 +3338,10 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
@@ -3342,11 +3431,8 @@ terminate_port(Port *prt)
send_closed_port_id = NIL;
}
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(prt->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&prt->common.u.alive.tm);
-#endif
+ if (ERTS_PTMR_IS_SET(prt))
+ erts_cancel_port_timer(prt);
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
@@ -3375,11 +3461,8 @@ terminate_port(Port *prt)
erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf);
prt->linebuf = NULL;
}
- if (prt->bp != NULL) {
- free_message_buffer(prt->bp);
- prt->bp = NULL;
- prt->data = am_undefined;
- }
+
+ erts_cleanup_port_data(prt);
if (prt->psd)
erts_free(ERTS_ALC_T_PRTSD, prt->psd);
@@ -3395,7 +3478,7 @@ terminate_port(Port *prt)
if ((state & ERTS_PORT_SFLG_HALT)
&& (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
erts_port_release(prt); /* We will exit and never return */
- erl_exit_flush_async(erts_halt_code, "");
+ erts_flush_async_exit(erts_halt_code, "");
}
if (is_internal_port(send_closed_port_id))
deliver_result(send_closed_port_id, connected_id, am_closed);
@@ -3527,9 +3610,9 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(rreason_str, 64);
- erts_snprintf(from_str, sizeof(from_str), "%T", from);
+ erts_snprintf(from_str, sizeof(DTRACE_CHARBUF_NAME(from_str)), "%T", from);
dtrace_port_str(p, port_str);
- erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason);
+ erts_snprintf(rreason_str, sizeof(DTRACE_CHARBUF_NAME(rreason_str)), "%T", rreason);
DTRACE4(port_exit, from_str, port_str, p->name, rreason_str);
}
#endif
@@ -3547,6 +3630,8 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
if (send_closed)
set_state_flags |= ERTS_PORT_SFLG_SEND_CLOSED;
+ erts_port_task_sched_enter_exiting_state(&p->sched);
+
state = erts_atomic32_read_bor_mb(&p->state, set_state_flags);
state |= set_state_flags;
@@ -3629,6 +3714,10 @@ erts_port_command(Process *c_p,
ASSERT(port);
flags |= ERTS_PORT_SIG_FLG_BANG_OP;
+ if (!erts_port_synchronous_ops) {
+ flags |= ERTS_PORT_SIG_FLG_ASYNC;
+ refp = NULL;
+ }
if (is_tuple_arity(command, 2)) {
Eterm cntd;
@@ -3636,21 +3725,14 @@ erts_port_command(Process *c_p,
cntd = tp[1];
if (is_internal_pid(cntd)) {
if (tp[2] == am_close) {
- if (!erts_port_synchronous_ops)
- refp = NULL;
flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
return erts_port_exit(c_p, flags, port, cntd, am_normal, refp);
} else if (is_tuple_arity(tp[2], 2)) {
tp = tuple_val(tp[2]);
if (tp[1] == am_command) {
- if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
- && !erts_port_synchronous_ops)
- refp = NULL;
return erts_port_output(c_p, flags, port, cntd, tp[2], refp);
}
else if (tp[1] == am_connect) {
- if (!erts_port_synchronous_ops)
- refp = NULL;
flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
return erts_port_connect(c_p, flags, port, cntd, tp[2], refp);
}
@@ -3659,8 +3741,6 @@ erts_port_command(Process *c_p,
}
/* badsig */
- if (!erts_port_synchronous_ops)
- refp = NULL;
flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
return bad_port_signal(c_p, flags, port, c_p->common.id, refp, am_command);
}
@@ -3823,12 +3903,13 @@ port_sig_control(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
- Eterm *hp, *hp_start;
+ Eterm *hp;
ErlHeapFragment *bp;
ErlOffHeap *ohp;
+ ErtsHeapFactory factory;
Process *rp;
ErtsProcLocks rp_locks = 0;
- Uint hsz;
+ Uint hsz, rsz;
int control_flags;
rp = erts_proc_lookup_raw(sigdp->caller);
@@ -3837,17 +3918,19 @@ port_sig_control(Port *prt,
control_flags = prt->control_flags;
- hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hsz += port_control_result_size(control_flags,
+ rsz = port_control_result_size(control_flags,
resp_bufp,
&resp_size,
&resp_buf[0]);
+ hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
+
- hp_start = hp = erts_alloc_message_heap(hsz,
+ hp = erts_alloc_message_heap(hsz,
&bp,
&ohp,
rp,
&rp_locks);
+ erts_factory_message_init(&factory, rp, hp, bp);
msg = write_port_control_result(control_flags,
resp_bufp,
@@ -3856,13 +3939,11 @@ port_sig_control(Port *prt,
&hp,
bp,
ohp);
+ factory.hp = hp;
queue_port_sched_op_reply(rp,
&rp_locks,
- hp_start,
- hp,
- hsz,
- bp,
+ &factory,
sigdp->ref,
msg);
@@ -3980,6 +4061,9 @@ erts_port_control(Process* c_p,
size,
&resp_bufp,
&resp_size);
+
+ control_flags = prt->control_flags;
+
finalize_imm_drv_call(&try_call_state);
if (tmp_alloced)
erts_free(ERTS_ALC_T_TMP, bufp);
@@ -3987,8 +4071,6 @@ erts_port_control(Process* c_p,
return ERTS_PORT_OP_BADARG;
}
- control_flags = prt->control_flags;
-
hsz = port_control_result_size(control_flags,
resp_bufp,
&resp_size,
@@ -4029,7 +4111,7 @@ erts_port_control(Process* c_p,
copy = 1;
else {
binp = ((ProcBin *) ebinp)->val;
- ASSERT(bufp < bufp + size);
+ ASSERT(bufp <= bufp + size);
ASSERT(binp->orig_bytes <= bufp
&& bufp + size <= binp->orig_bytes + binp->orig_size);
erts_refc_inc(&binp->refc, 1);
@@ -4057,6 +4139,7 @@ erts_port_control(Process* c_p,
retvalp,
sigdp,
0,
+ NULL,
port_sig_control);
if (res != ERTS_PORT_OP_SCHEDULED) {
cleanup_scheduled_control(binp, bufp);
@@ -4147,11 +4230,9 @@ port_sig_call(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
Process *rp;
ErtsProcLocks rp_locks = 0;
- Uint hsz;
+ Sint hsz;
rp = erts_proc_lookup_raw(sigdp->caller);
if (!rp)
@@ -4159,29 +4240,31 @@ port_sig_call(Port *prt,
hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size);
if (hsz >= 0) {
- Eterm *hp_start;
+ ErlHeapFragment* bp;
+ ErlOffHeap* ohp;
+ ErtsHeapFactory factory;
byte *endp;
hsz += 3; /* ok tuple */
hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp_start = hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ hp = erts_alloc_message_heap(hsz,
+ &bp,
+ &ohp,
+ rp,
+ &rp_locks);
endp = (byte *) resp_bufp;
- msg = erts_decode_ext(&hp, ohp, &endp);
+ erts_factory_message_init(&factory, rp, hp, bp);
+ msg = erts_decode_ext(&factory, &endp);
if (is_value(msg)) {
+ hp = erts_produce_heap(&factory,
+ 3,
+ ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE);
msg = TUPLE2(hp, am_ok, msg);
- hp += 3;
queue_port_sched_op_reply(rp,
&rp_locks,
- hp_start,
- hp,
- hsz,
- bp,
+ &factory,
sigdp->ref,
msg);
@@ -4189,8 +4272,6 @@ port_sig_call(Port *prt,
erts_smp_proc_unlock(rp, rp_locks);
goto done;
}
- if (bp)
- free_message_buffer(bp);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
}
@@ -4267,10 +4348,11 @@ erts_port_call(Process* c_p,
try_call_res = try_imm_drv_call(&try_call_state);
switch (try_call_res) {
case ERTS_TRY_IMM_DRV_CALL_OK: {
- Eterm *hp, *hp_end;
- Uint hsz;
+ ErtsHeapFactory factory;
+ Sint hsz;
unsigned ret_flags = 0U;
Eterm term;
+ Eterm* hp;
res = call_driver_call(c_p->common.id,
prt,
@@ -4290,15 +4372,14 @@ erts_port_call(Process* c_p,
if (hsz < 0)
return ERTS_PORT_OP_BADARG;
hsz += 3;
- hp = HAlloc(c_p, hsz);
- hp_end = hp + hsz;
+ erts_factory_proc_prealloc_init(&factory, c_p, hsz);
endp = (byte *) resp_bufp;
- term = erts_decode_ext(&hp, &MSO(c_p), &endp);
+ term = erts_decode_ext(&factory, &endp);
if (term == THE_NON_VALUE)
return ERTS_PORT_OP_BADARG;
+ hp = erts_produce_heap(&factory,3,0);
*retvalp = TUPLE2(hp, am_ok, term);
- hp += 3;
- HRelease(c_p, hp_end, hp);
+ erts_factory_close(&factory);
if (resp_bufp != &resp_buf[0]
&& !(ret_flags & DRIVER_CALL_KEEP_BUFFER))
driver_free(resp_bufp);
@@ -4337,6 +4418,7 @@ erts_port_call(Process* c_p,
retvalp,
sigdp,
0,
+ NULL,
port_sig_call);
if (res != ERTS_PORT_OP_SCHEDULED) {
cleanup_scheduled_call(bufp);
@@ -4375,7 +4457,7 @@ make_port_info_term(Eterm **hpp_start,
int len;
int start;
static Eterm item[] = ERTS_PORT_INFO_1_ITEMS;
- static Eterm value[sizeof(item)/sizeof(item[0])];
+ Eterm value[sizeof(item)/sizeof(item[0])];
start = 0;
len = sizeof(item)/sizeof(item[0]);
@@ -4432,12 +4514,11 @@ port_sig_info(Port *prt,
prt,
sigdp->u.info.item);
if (is_value(value)) {
+ ErtsHeapFactory factory;
+ erts_factory_message_init(&factory, NULL, hp, bp);
queue_port_sched_op_reply(rp,
&rp_locks,
- hp_start,
- hp,
- hsz,
- bp,
+ &factory,
sigdp->ref,
value);
}
@@ -4503,228 +4584,106 @@ erts_port_info(Process* c_p,
retvalp,
sigdp,
0,
+ NULL,
port_sig_info);
}
-static int
-port_sig_set_data(Port *prt,
- erts_aint32_t state,
- int op,
- ErtsProc2PortSigData *sigdp)
-{
- ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY);
-
- if (op == ERTS_PROC2PORT_SIG_EXEC) {
- if (prt->bp)
- free_message_buffer(prt->bp);
- prt->bp = sigdp->u.set_data.bp;
- prt->data = sigdp->u.set_data.data;
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
- }
- else {
- if (sigdp->u.set_data.bp)
- free_message_buffer(sigdp->u.set_data.bp);
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
- }
- return ERTS_PORT_REDS_SET_DATA;
-}
-
-ErtsPortOpResult
-erts_port_set_data(Process* c_p,
- Port *prt,
- Eterm data,
- Eterm *refp)
-{
- ErtsPortOpResult res;
- Eterm set_data;
- ErlHeapFragment *bp;
- ErtsProc2PortSigData *sigdp;
- ErtsTryImmDrvCallResult try_call_res;
- ErtsTryImmDrvCallState try_call_state
- = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
- c_p,
- prt,
- ERTS_PORT_SFLGS_INVALID_LOOKUP,
- 0,
- !refp,
- am_set_data);
-
- if (is_immed(data)) {
- set_data = data;
- bp = NULL;
- }
- else {
- Eterm *hp;
- Uint sz = size_object(data);
- bp = new_message_buffer(sz);
- hp = bp->mem;
- set_data = copy_struct(data, sz, &hp, &bp->off_heap);
- }
-
- try_call_res = try_imm_drv_call(&try_call_state);
- switch (try_call_res) {
- case ERTS_TRY_IMM_DRV_CALL_OK:
- if (prt->bp)
- free_message_buffer(prt->bp);
- prt->bp = bp;
- prt->data = set_data;
- finalize_imm_drv_call(&try_call_state);
- BUMP_REDS(c_p, ERTS_PORT_REDS_SET_DATA);
- return ERTS_PORT_OP_DONE;
- case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- return ERTS_PORT_OP_DROPPED;
- case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
- case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
- /* Schedule call instead... */
- break;
- }
-
- sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA;
- sigdp->u.set_data.data = set_data;
- sigdp->u.set_data.bp = bp;
-
- res = erts_schedule_proc2port_signal(c_p,
- prt,
- c_p->common.id,
- refp,
- sigdp,
- 0,
- port_sig_set_data);
- if (res != ERTS_PORT_OP_SCHEDULED && bp)
- free_message_buffer(bp);
- return res;
-}
+typedef struct {
+ Uint sched_id;
+ Eterm pid;
+ Uint32 refn[ERTS_REF_NUMBERS];
+ erts_smp_atomic32_t refc;
+} ErtsIOBytesReq;
-static int
-port_sig_get_data(Port *prt,
- erts_aint32_t state,
- int op,
- ErtsProc2PortSigData *sigdp)
+static void
+reply_io_bytes(void *vreq)
{
- ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY);
- if (op != ERTS_PROC2PORT_SIG_EXEC)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
- else {
- Process *rp;
- ErtsProcLocks rp_locks = 0;
+ ErtsIOBytesReq *req = (ErtsIOBytesReq *) vreq;
+ Process *rp;
- rp = erts_proc_lookup_raw(sigdp->caller);
- if (rp) {
- Uint hsz;
- Eterm *hp, *hp_start;
- Eterm data, msg;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
+ rp = erts_proc_lookup(req->pid);
+ if (rp) {
+ ErlOffHeap *ohp = NULL;
+ ErlHeapFragment *bp = NULL;
+ ErtsProcLocks rp_locks;
+ Eterm ref, msg, ein, eout, *hp;
+ Uint64 in, out;
+ Uint hsz;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ Uint sched_id = esdp->no;
+ in = esdp->io.in;
+ out = esdp->io.out;
+ if (req->sched_id != sched_id)
+ rp_locks = 0;
+ else {
+ in += (Uint64) erts_atomic64_read_nob(&bytes_in);
+ out += (Uint64) erts_atomic64_read_nob(&bytes_out);
+ rp_locks = ERTS_PROC_LOCK_MAIN;
+ }
- hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hsz += 3;
- if (prt->bp)
- hsz += prt->bp->used_size;
+ hsz = 5 /* 4-tuple */ + REF_THING_SIZE;
- hp_start = hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ erts_bld_uint64(NULL, &hsz, in);
+ erts_bld_uint64(NULL, &hsz, out);
- if (is_immed(prt->data))
- data = prt->data;
- else
- data = copy_struct(prt->data,
- prt->bp->used_size,
- &hp,
- &bp->off_heap);
+ hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ ref = make_internal_ref(hp);
+ write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
+ hp += REF_THING_SIZE;
+ ein = erts_bld_uint64(&hp, NULL, in);
+ eout = erts_bld_uint64(&hp, NULL, out);
- msg = TUPLE2(hp, am_ok, data);
- hp += 3;
+ msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
+ erts_queue_message(rp, &rp_locks, bp, msg, NIL);
- queue_port_sched_op_reply(rp,
- &rp_locks,
- hp_start,
- hp,
- hsz,
- bp,
- sigdp->ref,
- msg);
- if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
- }
+ if (req->sched_id == sched_id)
+ rp_locks &= ~ERTS_PROC_LOCK_MAIN;
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
}
- return ERTS_PORT_REDS_GET_DATA;
+
+ if (erts_smp_atomic32_dec_read_nob(&req->refc) == 0)
+ erts_free(ERTS_ALC_T_IOB_REQ, req);
}
-ErtsPortOpResult
-erts_port_get_data(Process* c_p,
- Port *prt,
- Eterm *retvalp)
+Eterm
+erts_request_io_bytes(Process *c_p)
{
- ErtsProc2PortSigData *sigdp;
- ErtsTryImmDrvCallResult try_call_res;
- ErtsTryImmDrvCallState try_call_state
- = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
- c_p,
- prt,
- ERTS_PORT_SFLGS_INVALID_LOOKUP,
- 0,
- 0,
- am_get_data);
+ Uint *hp;
+ Eterm ref;
+ Uint32 *refn;
+ ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p);
+ ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ,
+ sizeof(ErtsIOBytesReq));
+
+ hp = HAlloc(c_p, REF_THING_SIZE);
+ ref = erts_sched_make_ref_in_buffer(esdp, hp);
+ refn = internal_ref_numbers(ref);
+
+ req->sched_id = esdp->no;
+ req->pid = c_p->common.id;
+ req->refn[0] = refn[0];
+ req->refn[1] = refn[1];
+ req->refn[2] = refn[2];
+ erts_smp_atomic32_init_nob(&req->refc,
+ (erts_aint32_t) erts_no_schedulers);
- try_call_res = try_imm_drv_call(&try_call_state);
- switch (try_call_res) {
- case ERTS_TRY_IMM_DRV_CALL_OK: {
- Eterm *hp;
- Eterm data;
- ErlHeapFragment *bp;
- Uint sz;
- if (is_immed(prt->data)) {
- bp = NULL;
- data = prt->data;
- }
- else {
- bp = new_message_buffer(prt->bp->used_size);
- data = copy_struct(prt->data,
- prt->bp->used_size,
- &hp,
- &bp->off_heap);
- }
- finalize_imm_drv_call(&try_call_state);
- if (is_immed(data))
- sz = 0;
- else
- sz = bp->used_size;
-
- hp = HAlloc(c_p, sz + 3);
- if (is_not_immed(data)) {
- data = copy_struct(data, bp->used_size, &hp, &MSO(c_p));
- free_message_buffer(bp);
- }
- *retvalp = TUPLE2(hp, am_ok, data);
- BUMP_REDS(c_p, ERTS_PORT_REDS_GET_DATA);
- return ERTS_PORT_OP_DONE;
- }
- case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- return ERTS_PORT_OP_DROPPED;
- case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
- case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
- /* Schedule call instead... */
- break;
- }
+#ifdef ERTS_SMP
+ if (erts_no_schedulers > 1)
+ erts_schedule_multi_misc_aux_work(1,
+ erts_no_schedulers,
+ reply_io_bytes,
+ (void *) req);
+#endif
- sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA;
+ reply_io_bytes((void *) req);
- return erts_schedule_proc2port_signal(c_p,
- prt,
- c_p->common.id,
- retvalp,
- sigdp,
- 0,
- port_sig_get_data);
+ return ref;
}
+
typedef struct {
int to;
void *arg;
@@ -4815,7 +4774,7 @@ set_busy_port(ErlDrvPort dprt, int on)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_busy)) {
- erts_snprintf(port_str, sizeof(port_str),
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
DTRACE1(port_busy, port_str);
}
@@ -4828,7 +4787,7 @@ set_busy_port(ErlDrvPort dprt, int on)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_not_busy)) {
- erts_snprintf(port_str, sizeof(port_str),
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
DTRACE1(port_not_busy, port_str);
}
@@ -4880,9 +4839,9 @@ erts_port_resume_procs(Port *prt)
DTRACE_CHARBUF(pid_str, 16);
ErtsProcList* plp2 = plp;
- erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id);
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id);
while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}
@@ -4934,7 +4893,7 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (len > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
+ erts_exit(ERTS_ABORT_EXIT,
"Absurdly large data buffer (%beu bytes) passed to"
"output callback of %s driver.\n",
len,
@@ -4981,7 +4940,8 @@ int async_ready(Port *p, void* data)
static void
report_missing_drv_callback(Port *p, char *drv_type, char *callback)
{
- ErtsPortNames *pnp = erts_get_port_names(p->common.id);
+ ErtsPortNames *pnp = erts_get_port_names(p->common.id,
+ ERTS_Port2ErlDrvPort(p));
char *unknown = "<unknown>";
char *drv_name = pnp->driver_name ? pnp->driver_name : unknown;
char *prt_name = pnp->name ? pnp->name : unknown;
@@ -4996,15 +4956,25 @@ report_missing_drv_callback(Port *p, char *drv_type, char *callback)
void
erts_stale_drv_select(Eterm port,
+ ErlDrvPort drv_port,
ErlDrvEvent hndl,
int mode,
int deselect)
{
char *type;
- ErlDrvPort drv_port = ERTS_Port2ErlDrvPort(erts_port_lookup_raw(port));
- ErtsPortNames *pnp = erts_get_port_names(port);
+ ErtsPortNames *pnp;
erts_dsprintf_buf_t *dsbufp;
+ if (drv_port == ERTS_INVALID_ERL_DRV_PORT) {
+ Port *prt = erts_port_lookup_raw(port);
+ if (prt)
+ drv_port = ERTS_Port2ErlDrvPort(prt);
+ else
+ drv_port = ERTS_INVALID_ERL_DRV_PORT;
+ }
+
+ pnp = erts_get_port_names(port, drv_port);
+
switch (mode) {
case ERL_DRV_READ | ERL_DRV_WRITE:
type = "Input/Output";
@@ -5039,12 +5009,16 @@ erts_stale_drv_select(Eterm port,
}
ErtsPortNames *
-erts_get_port_names(Eterm id)
+erts_get_port_names(Eterm id, ErlDrvPort drv_port)
{
- Port *prt = erts_port_lookup_raw(id);
+ Port *prt;
ErtsPortNames *pnp;
ASSERT(is_nil(id) || is_internal_port(id));
+ prt = ERTS_ErlDrvPort2Port(drv_port);
+ if (prt == ERTS_INVALID_ERL_DRV_PORT)
+ prt = erts_port_lookup_raw(id);
+
if (!prt) {
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames));
pnp->name = NULL;
@@ -5056,6 +5030,7 @@ erts_get_port_names(Eterm id)
size_t pnp_len = sizeof(ErtsPortNames);
#ifndef DEBUG
pnp_len += 100; /* In most cases 100 characters will be enough... */
+ ASSERT(prt->common.id == id);
#endif
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
do {
@@ -5098,24 +5073,6 @@ erts_free_port_names(ErtsPortNames *pnp)
erts_free(ERTS_ALC_T_PORT_NAMES, pnp);
}
-static void schedule_port_timeout(Port *p)
-{
- /*
- * Scheduling of port timeouts can be done without port locking, but
- * since the task handle is stored in the port structure and the ptimer
- * structure is protected by the port lock we require the port to be
- * locked for now...
- *
- * TODO: Implement scheduling of port timeouts without locking
- * the port.
- * /Rickard
- */
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
- erts_port_task_schedule(p->common.id,
- &p->timeout_task,
- ERTS_PORT_TASK_TIMEOUT);
-}
-
ErlDrvTermData driver_mk_term_nil(void)
{
return driver_term_nil;
@@ -5144,7 +5101,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
rp = (scheduler
? erts_proc_lookup(pid)
- : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -5154,15 +5111,11 @@ void driver_report_exit(ErlDrvPort ix, int status)
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
#define ERTS_B2T_STATES_DEF_STATES_SZ 5
@@ -5254,22 +5207,27 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp)
static int
driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
{
+#define HEAP_EXTRA 200
#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
Uint need = 0;
int depth = 0;
int res;
- Eterm *hp = NULL, *hp_start = NULL, *hp_end = NULL;
ErlDrvTermData* ptr;
ErlDrvTermData* ptr_end;
DECLARE_ESTACK(stack);
- Eterm mess = NIL; /* keeps compiler happy */
+ Eterm mess;
Process* rp = NULL;
- ErlHeapFragment *bp = NULL;
- ErlOffHeap *ohp;
+ ErtsHeapFactory factory;
ErtsProcLocks rp_locks = 0;
struct b2t_states__ b2t;
- int scheduler = 1; /* Silence erroneous warning... */
+ int scheduler;
+ int is_heap_need_limited = 1;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ ERTS_UNDEF(mess,NIL);
+ ERTS_UNDEF(scheduler,1);
+
+ factory.mode = FACTORY_CLOSED;
init_b2t_states(&b2t);
/*
@@ -5433,14 +5391,34 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
#ifdef DEBUG
b2t.org_ext[b2t.ix] = ext;
#endif
- hsz = erts_binary2term_prepare(&b2t.state[b2t.ix++], ext, size);
+ hsz = erts_binary2term_prepare(&b2t.state[b2t.ix], ext, size);
if (hsz < 0)
ERTS_DDT_FAIL; /* Invalid data */
+ b2t.state[b2t.ix++].heap_size = hsz;
need += hsz;
ptr += 2;
depth++;
+ if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */
+ is_heap_need_limited = 0;
+ }
+ break;
+ }
+ case ERL_DRV_MAP: { /* int */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
+ if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
+ need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]);
+ is_heap_need_limited = 0;
+ } else {
+ need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
+ }
+ depth -= 2*ptr[0];
+ if (depth < 0) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
break;
}
+
default:
ERTS_DDT_FAIL;
}
@@ -5463,14 +5441,23 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
scheduler = erts_get_scheduler_id() != 0;
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp) {
res = 0;
goto done;
}
- hp_start = hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
- hp_end = hp + need;
+ /* Try copy directly to destination heap if we know there are no big maps */
+ if (is_heap_need_limited) {
+ ErlOffHeap *ohp;
+ ErlHeapFragment* bp;
+ Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ erts_factory_message_init(&factory, rp, hp, bp);
+ }
+ else {
+ erts_factory_message_init(&factory, NULL, NULL,
+ new_message_buffer(need));
+ }
/*
* Interpret the instructions and build the term.
@@ -5491,13 +5478,15 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_INT: /* signed int argument */
#if HALFWORD_HEAP
- mess = erts_bld_sint64(&hp, NULL, (Sint64)ptr[0]);
+ erts_reserve_heap(&factory, BIG_NEED_SIZE(2));
+ mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]);
#else
+ erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_SSMALL((Sint)ptr[0]))
mess = make_small((Sint)ptr[0]);
else {
- mess = small_to_big((Sint)ptr[0], hp);
- hp += BIG_UINT_HEAP_SIZE;
+ mess = small_to_big((Sint)ptr[0], factory.hp);
+ factory.hp += BIG_UINT_HEAP_SIZE;
}
#endif
ptr++;
@@ -5505,25 +5494,29 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_UINT: /* unsigned int argument */
#if HALFWORD_HEAP
- mess = erts_bld_uint64(&hp, NULL, (Uint64)ptr[0]);
+ erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
+ mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]);
#else
+ erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_USMALL(0, (Uint)ptr[0]))
mess = make_small((Uint)ptr[0]);
else {
- mess = uint_to_big((Uint)ptr[0], hp);
- hp += BIG_UINT_HEAP_SIZE;
+ mess = uint_to_big((Uint)ptr[0], factory.hp);
+ factory.hp += BIG_UINT_HEAP_SIZE;
}
#endif
ptr++;
break;
case ERL_DRV_INT64: /* pointer to unsigned 64-bit int argument */
- mess = erts_bld_sint64(&hp, NULL, *((Sint64 *) ptr[0]));
+ erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
+ mess = erts_bld_sint64(&factory.hp, NULL, *((Sint64 *) ptr[0]));
ptr++;
break;
case ERL_DRV_UINT64: /* pointer to unsigned 64-bit int argument */
- mess = erts_bld_uint64(&hp, NULL, *((Uint64 *) ptr[0]));
+ erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
+ mess = erts_bld_uint64(&factory.hp, NULL, *((Uint64 *) ptr[0]));
ptr++;
break;
@@ -5537,11 +5530,14 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
Uint size = ptr[1];
Uint offset = ptr[2];
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size);
+ if (esdp)
+ esdp->io.in += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size);
if (size <= ERL_ONHEAP_BIN_LIMIT) {
- ErlHeapBin* hbp = (ErlHeapBin *) hp;
- hp += heap_bin_size(size);
+ ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory,
+ heap_bin_size(size), HEAP_EXTRA);
hbp->thing_word = header_heap_bin(size);
hbp->size = size;
if (size > 0) {
@@ -5550,18 +5546,18 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = make_binary(hbp);
}
else {
- ProcBin* pb = (ProcBin *) hp;
+ ProcBin* pb = (ProcBin *) erts_produce_heap(&factory,
+ PROC_BIN_SIZE, HEAP_EXTRA);
driver_binary_inc_refc(b); /* caller will free binary */
pb->thing_word = HEADER_PROC_BIN;
pb->size = size;
- pb->next = ohp->first;
- ohp->first = (struct erl_off_heap_header*)pb;
+ pb->next = factory.off_heap->first;
+ factory.off_heap->first = (struct erl_off_heap_header*)pb;
pb->val = ErlDrvBinary2Binary(b);
pb->bytes = ((byte*) b->orig_bytes) + offset;
pb->flags = 0;
mess = make_binary(pb);
- hp += PROC_BIN_SIZE;
- OH_OVERHEAD(ohp, pb->size / sizeof(Eterm));
+ OH_OVERHEAD(factory.off_heap, pb->size / sizeof(Eterm));
}
ptr += 3;
break;
@@ -5571,11 +5567,15 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
byte *bufp = (byte *) ptr[0];
Uint size = (Uint) ptr[1];
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size);
+ if (esdp)
+ esdp->io.in += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size);
if (size <= ERL_ONHEAP_BIN_LIMIT) {
- ErlHeapBin* hbp = (ErlHeapBin *) hp;
- hp += heap_bin_size(size);
+ ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory,
+ heap_bin_size(size),
+ HEAP_EXTRA);
hbp->thing_word = header_heap_bin(size);
hbp->size = size;
if (size > 0) {
@@ -5588,20 +5588,18 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ProcBin* pbp;
Binary* bp = erts_bin_nrml_alloc(size);
ASSERT(bufp);
- bp->flags = 0;
- bp->orig_size = (SWord) size;
erts_refc_init(&bp->refc, 1);
sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size);
- pbp = (ProcBin *) hp;
- hp += PROC_BIN_SIZE;
+ pbp = (ProcBin *) erts_produce_heap(&factory,
+ PROC_BIN_SIZE, HEAP_EXTRA);
pbp->thing_word = HEADER_PROC_BIN;
pbp->size = size;
- pbp->next = ohp->first;
- ohp->first = (struct erl_off_heap_header*)pbp;
+ pbp->next = factory.off_heap->first;
+ factory.off_heap->first = (struct erl_off_heap_header*)pbp;
pbp->val = bp;
pbp->bytes = (byte*) bp->orig_bytes;
pbp->flags = 0;
- OH_OVERHEAD(ohp, pbp->size / sizeof(Eterm));
+ OH_OVERHEAD(factory.off_heap, pbp->size / sizeof(Eterm));
mess = make_binary(pbp);
}
ptr += 2;
@@ -5609,14 +5607,19 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
}
case ERL_DRV_STRING: /* char*, length */
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) ptr[1]);
- mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], NIL);
+ if (esdp)
+ esdp->io.in += (Uint64) ptr[1];
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) ptr[1]);
+ erts_reserve_heap(&factory, 2*ptr[1]);
+ mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], NIL);
ptr += 2;
break;
case ERL_DRV_STRING_CONS: /* char*, length */
mess = ESTACK_POP(stack);
- mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], mess);
+ erts_reserve_heap(&factory, 2*ptr[1]);
+ mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], mess);
ptr += 2;
break;
@@ -5625,11 +5628,12 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = ESTACK_POP(stack);
i--;
+ erts_reserve_heap(&factory, 2*i);
while(i > 0) {
Eterm hd = ESTACK_POP(stack);
- mess = CONS(hp, hd, mess);
- hp += 2;
+ mess = CONS(factory.hp, hd, mess);
+ factory.hp += 2;
i--;
}
ptr++;
@@ -5638,13 +5642,12 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_TUPLE: { /* int */
int size = (int)ptr[0];
- Eterm* tp = hp;
+ Eterm* tp = erts_produce_heap(&factory, size+1, HEAP_EXTRA);
*tp = make_arityval(size);
mess = make_tuple(tp);
tp += size; /* point at last element */
- hp = tp+1; /* advance "heap" pointer */
while(size--) {
*tp-- = ESTACK_POP(stack);
@@ -5660,23 +5663,69 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_FLOAT: { /* double * */
FloatDef f;
+ Eterm* fp = erts_produce_heap(&factory, FLOAT_SIZE_OBJECT, HEAP_EXTRA);
- mess = make_float(hp);
+ mess = make_float(fp);
f.fd = *((double *) ptr[0]);
- PUT_DOUBLE(f, hp);
- hp += FLOAT_SIZE_OBJECT;
+ if (!erts_isfinite(f.fd))
+ ERTS_DDT_FAIL;
+ PUT_DOUBLE(f, fp);
ptr++;
break;
}
case ERL_DRV_EXT2TERM: /* char *ext, int size */
ASSERT(b2t.org_ext[b2t.ix] == (byte *) ptr[0]);
- mess = erts_binary2term_create(&b2t.state[b2t.ix++], &hp, ohp);
+
+ erts_reserve_heap(&factory, b2t.state[b2t.ix].heap_size);
+ mess = erts_binary2term_create(&b2t.state[b2t.ix++], &factory);
if (mess == THE_NON_VALUE)
ERTS_DDT_FAIL;
ptr += 2;
break;
+ case ERL_DRV_MAP: { /* int */
+ int size = (int)ptr[0];
+ if (size > MAP_SMALL_MAP_LIMIT) {
+ int ix = 2*size;
+ Eterm* leafs;
+
+ erts_produce_heap(&factory, ix, HEAP_EXTRA);
+ leafs = factory.hp;
+ while(ix--) { *--leafs = ESTACK_POP(stack); }
+
+ mess = erts_hashmap_from_array(&factory, leafs, size, 1);
+ if (is_non_value(mess))
+ ERTS_DDT_FAIL;
+ } else {
+ Eterm* vp;
+ flatmap_t *mp;
+ Eterm* tp = erts_produce_heap(&factory,
+ 2*size + 1 + MAP_HEADER_FLATMAP_SZ,
+ HEAP_EXTRA);
+
+ *tp = make_arityval(size);
+
+ mp = (flatmap_t*) (tp + 1 + size);
+ mp->thing_word = MAP_HEADER_FLATMAP;
+ mp->size = size;
+ mp->keys = make_tuple(tp);
+ mess = make_flatmap(mp);
+
+ tp += size; /* point at last key */
+ vp = factory.hp - 1; /* point at last value */
+
+ while(size--) {
+ *vp-- = ESTACK_POP(stack);
+ *tp-- = ESTACK_POP(stack);
+ }
+ if (!erts_validate_and_sort_flatmap(mp))
+ ERTS_DDT_FAIL;
+ }
+ ptr++;
+ break;
+ }
+
}
ESTACK_PUSH(stack, mess);
}
@@ -5687,42 +5736,28 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if (res > 0) {
mess = ESTACK_POP(stack); /* get resulting value */
- if (bp)
- bp = erts_resize_message_buffer(bp, hp - hp_start, &mess, 1);
- else {
- ASSERT(hp);
- HRelease(rp, hp_end, hp);
- }
+ erts_factory_close(&factory);
/* send message */
- erts_queue_message(rp, &rp_locks, bp, mess, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined);
}
else {
if (b2t.ix > b2t.used)
b2t.used = b2t.ix;
for (b2t.ix = 0; b2t.ix < b2t.used; b2t.ix++)
erts_binary2term_abort(&b2t.state[b2t.ix]);
- if (bp)
- free_message_buffer(bp);
- else if (hp) {
- HRelease(rp, hp_end, hp);
- }
+ erts_factory_undo(&factory);
}
-#ifdef ERTS_SMP
if (rp) {
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
-#endif
cleanup_b2t_states(&b2t);
DESTROY_ESTACK(stack);
return res;
#undef ERTS_DDT_FAIL
+#undef HEAP_EXTRA
}
static ERTS_INLINE int
@@ -5851,6 +5886,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
{
erts_aint32_t state;
Port* prt = erts_drvport2port_state(ix, &state);
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5861,7 +5897,10 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
return 0;
prt->bytes_in += (hlen + len);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + len);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
return erts_net_message(prt,
prt->dist_entry,
@@ -5886,6 +5925,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
{
erts_aint32_t state;
Port* prt = erts_drvport2port_state(ix, &state);
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5897,7 +5937,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
return 0;
prt->bytes_in += (hlen + len);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + len);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
if (len == 0)
return erts_net_message(prt,
@@ -5937,6 +5980,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
ErlDrvBinary** binv;
Port* prt;
erts_aint32_t state;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5945,10 +5989,6 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
return driver_output2(ix, hbuf, hlen, NULL, 0);
size = vec->size - skip; /* Size of remaining bytes in vector */
- ASSERT(hlen >= 0); /* debug only */
- if (hlen < 0)
- hlen = 0;
-
prt = erts_drvport2port_state(ix, &state);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
@@ -5971,7 +6011,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
binv++;
n--;
} else {
- iov->iov_base += skip;
+ iov->iov_base = ((char *)(iov->iov_base)) + skip;
iov->iov_len -= skip;
skip = 0;
}
@@ -5979,7 +6019,10 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
/* XXX handle distribution !!! */
prt->bytes_in += (hlen + size);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + size);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + size));
deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen,
binv, iov, n, size);
return 0;
@@ -6055,9 +6098,7 @@ driver_alloc_binary(ErlDrvSizeT size)
bin = erts_bin_drv_alloc_fnf((Uint) size);
if (!bin)
return NULL; /* The driver write must take action */
- bin->flags = BIN_FLAG_DRV;
erts_refc_init(&bin->refc, 1);
- bin->orig_size = (SWord) size;
return Binary2ErlDrvBinary(bin);
}
@@ -6087,7 +6128,6 @@ ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size)
if (!newbin)
return NULL;
- newbin->orig_size = size;
return Binary2ErlDrvBinary(newbin);
}
@@ -6231,7 +6271,7 @@ driver_pdl_create(ErlDrvPort dp)
return NULL;
pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK,
sizeof(struct erl_drv_port_data_lock));
- erts_mtx_init(&pdl->mtx, "port_data_lock");
+ erts_mtx_init_x(&pdl->mtx, "port_data_lock", pp->common.id, 1);
pdl_init_refc(pdl);
erts_port_inc_refc(pp);
pdl->prt = pp;
@@ -6416,7 +6456,7 @@ int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
n--;
}
else {
- iov->iov_base += skip;
+ iov->iov_base = ((char *)(iov->iov_base)) + skip;
iov->iov_len -= skip;
skip = 0;
}
@@ -6481,7 +6521,7 @@ int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
n--;
}
else {
- iov->iov_base += skip;
+ iov->iov_base = ((char *)(iov->iov_base)) + skip;
iov->iov_len -= skip;
skip = 0;
}
@@ -6540,7 +6580,7 @@ ErlDrvSizeT driver_deq(ErlDrvPort ix, ErlDrvSizeT size)
q->v_head++;
}
else {
- q->v_head->iov_base += size;
+ q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size;
q->v_head->iov_len -= size;
size = 0;
}
@@ -6671,18 +6711,6 @@ int driver_pushq(ErlDrvPort ix, char* buffer, ErlDrvSizeT len)
return code;
}
-static ERTS_INLINE void
-drv_cancel_timer(Port *prt)
-{
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(prt->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&prt->common.u.alive.tm);
-#endif
- if (erts_port_task_is_scheduled(&prt->timeout_task))
- erts_port_task_abort(&prt->timeout_task);
-}
-
int driver_set_timer(ErlDrvPort ix, unsigned long t)
{
Port* prt = erts_drvport2port(ix);
@@ -6694,19 +6722,8 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
if (prt->drv_ptr->timeout == NULL)
return -1;
- drv_cancel_timer(prt);
-#ifdef ERTS_SMP
- erts_create_smp_ptimer(&prt->common.u.alive.ptimer,
- prt->common.id,
- (ErlTimeoutProc) schedule_port_timeout,
- t);
-#else
- erts_set_timer(&prt->common.u.alive.tm,
- (ErlTimeoutProc) schedule_port_timeout,
- NULL,
- prt,
- t);
-#endif
+
+ erts_set_port_timer(prt, (Sint64) t);
return 0;
}
@@ -6716,28 +6733,28 @@ int driver_cancel_timer(ErlDrvPort ix)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- drv_cancel_timer(prt);
+ erts_cancel_port_timer(prt);
return 0;
}
-
int
driver_read_timer(ErlDrvPort ix, unsigned long* t)
{
Port* prt = erts_drvport2port(ix);
+ Sint64 left;
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-#ifdef ERTS_SMP
- *t = (prt->common.u.alive.ptimer
- ? erts_time_left(&prt->common.u.alive.ptimer->timer.tm)
- : 0);
-#else
- *t = erts_time_left(&prt->common.u.alive.tm);
-#endif
+
+ left = erts_read_port_timer(prt);
+ if (left < 0)
+ left = 0;
+
+ *t = (unsigned long) left;
+
return 0;
}
@@ -6757,11 +6774,33 @@ driver_get_now(ErlDrvNowData *now_data)
return 0;
}
+ErlDrvTime
+erl_drv_monotonic_time(ErlDrvTimeUnit time_unit)
+{
+ return (ErlDrvTime) erts_napi_monotonic_time((int) time_unit);
+}
+
+ErlDrvTime
+erl_drv_time_offset(ErlDrvTimeUnit time_unit)
+{
+ return (ErlDrvTime) erts_napi_time_offset((int) time_unit);
+}
+
+ErlDrvTime
+erl_drv_convert_time_unit(ErlDrvTime val,
+ ErlDrvTimeUnit from,
+ ErlDrvTimeUnit to)
+{
+ return (ErlDrvTime) erts_napi_convert_time_unit((ErtsMonotonicTime) val,
+ (int) from,
+ (int) to);
+}
+
static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
{
RefThing *refp;
ASSERT(is_internal_ref(ref));
- ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
+ ERTS_CT_ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
refp = ref_thing_ptr(ref);
memset(mon,0,sizeof(ErlDrvMonitor));
memcpy(mon,refp,sizeof(RefThing));
@@ -7213,7 +7252,7 @@ void *driver_dl_open(char * path)
int res;
int *last_error_p = erts_smp_tsd_get(driver_list_last_error_key);
int locked = maybe_lock_driver_list();
- if ((res = erts_sys_ddll_open(path, &ptr)) == 0) {
+ if ((res = erts_sys_ddll_open(path, &ptr, NULL)) == 0) {
maybe_unlock_driver_list(locked);
return ptr;
} else {
@@ -7268,7 +7307,7 @@ char *driver_dl_error(void)
#define ERL_DRV_SYS_INFO_SIZE(LAST_FIELD) \
- (((size_t) &((ErlDrvSysInfo *) 0)->LAST_FIELD) \
+ (offsetof(ErlDrvSysInfo, LAST_FIELD) \
+ sizeof(((ErlDrvSysInfo *) 0)->LAST_FIELD))
void
@@ -7284,7 +7323,7 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
* of ErlDrvSysInfo (introduced in driver version 1.0).
*/
if (!sip || si_size < ERL_DRV_SYS_INFO_SIZE(smp_support))
- erl_exit(1,
+ erts_exit(ERTS_ERROR_EXIT,
"driver_system_info(%p, %ld) called with invalid arguments\n",
sip, si_size);
@@ -7330,6 +7369,18 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
sip->nif_major_version = ERL_NIF_MAJOR_VERSION;
sip->nif_minor_version = ERL_NIF_MINOR_VERSION;
}
+ /*
+ * 'dirty_scheduler_support' is the last field in the 4th version
+ * (driver version 3.1, NIF version 2.7)
+ */
+ if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) {
+#if defined(ERL_NIF_DIRTY_SCHEDULER_SUPPORT) && defined(USE_THREADS)
+ sip->dirty_scheduler_support = 1;
+#else
+ sip->dirty_scheduler_support = 0;
+#endif
+ }
+
}
@@ -7393,6 +7444,8 @@ no_stop_select_callback(ErlDrvEvent event, void* private)
erts_send_error_to_logger_nogl(dsbufp);
}
+#define IS_DRIVER_VERSION_GE(DE,MAJOR,MINOR) \
+ ((DE)->major_version >= (MAJOR) && (DE)->minor_version >= (MINOR))
static int
init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
@@ -7416,10 +7469,11 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
erts_atom_put((byte *) drv->name,
sys_strlen(drv->name),
ERTS_ATOM_ENC_LATIN1,
- 1)
+ 1),
#else
- NIL
+ NIL,
#endif
+ 1
);
}
#endif
@@ -7439,6 +7493,7 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
drv->timeout = de->timeout ? de->timeout : no_timeout_callback;
drv->ready_async = de->ready_async;
drv->process_exit = de->process_exit;
+ drv->emergency_close = IS_DRIVER_VERSION_GE(de,3,2) ? de->emergency_close : NULL;
if (de->stop_select)
drv->stop_select = de->stop_select;
else
@@ -7457,6 +7512,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
}
}
+#undef IS_DRIVER_VERSION_GE
+
void
erts_destroy_driver(erts_driver_t *drv)
{
@@ -7573,15 +7630,15 @@ int null_func(void)
}
int
-erl_drv_putenv(char *key, char *value)
+erl_drv_putenv(const char *key, char *value)
{
- return erts_sys_putenv_raw(key, value);
+ return erts_sys_putenv_raw((char*)key, value);
}
int
-erl_drv_getenv(char *key, char *value, size_t *value_size)
+erl_drv_getenv(const char *key, char *value, size_t *value_size)
{
- return erts_sys_getenv_raw(key, value, value_size);
+ return erts_sys_getenv_raw((char*)key, value, value_size);
}
/* get heart_port
@@ -7600,7 +7657,7 @@ Port *erts_get_heart_port(void)
if (!port)
continue;
/* only examine undead or alive ports */
- if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_DEAD)
+ if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
continue;
/* immediate atom compare */
reg = port->common.u.alive.reg;
@@ -7611,3 +7668,23 @@ Port *erts_get_heart_port(void)
return NULL;
}
+
+void erts_emergency_close_ports(void)
+{
+ int ix, max = erts_ptab_max(&erts_port);
+
+ for (ix = 0; ix < max; ix++) {
+ Port *port = erts_pix2port(ix);
+
+ if (!port)
+ continue;
+ /* only examine undead or alive ports */
+ if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ continue;
+
+ /* emergency close socket */
+ if (port->drv_ptr->emergency_close) {
+ port->drv_ptr->emergency_close((ErlDrvData) port->drv_data);
+ }
+ }
+}