aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c678
1 files changed, 558 insertions, 120 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 41b0fcdd1b..0377f6cb5e 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2014. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2016. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,6 +47,7 @@
#define ERTS_WANT_EXTERNAL_TAGS
#include "external.h"
#include "dtrace-wrapper.h"
+#include "lttng-wrapper.h"
#include "erl_map.h"
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
@@ -87,7 +88,7 @@ int erts_port_parallelism = 0;
static erts_atomic64_t bytes_in;
static erts_atomic64_t bytes_out;
-static void deliver_result(Eterm sender, Eterm pid, Eterm res);
+static void deliver_result(Port *p, Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
@@ -211,6 +212,7 @@ static ERTS_INLINE void
kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_TRACER_CLEAR(&ERTS_TRACER(pp));
erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */
erts_port_task_free_port(pp);
/* In non-smp case the port structure may have been deallocated now */
@@ -397,13 +399,13 @@ static Port *create_port(char *name,
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
- prt->psd = NULL;
+ erts_smp_atomic_init_nob(&prt->psd, (erts_aint_t) NULL);
prt->async_open_port = NULL;
prt->drv_data = (SWord) 0;
prt->os_pid = -1;
/* Set default tracing */
- erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
+ erts_get_default_port_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER(prt));
ERTS_CT_ASSERT(offsetof(Port,common) == 0);
@@ -708,7 +710,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
if (driver->start) {
ERTS_MSACC_PUSH_STATE_M();
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports_where(port, am_in, am_start);
+ trace_sched_ports_where(port, am_in, am_open);
}
port->caller = pid;
#ifdef USE_VM_PROBES
@@ -717,7 +719,19 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
DTRACE3(driver_start, process_str, driver->name, port_str);
}
#endif
+
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_start)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(pid, proc_str);
+ lttng_port_to_str(port, port_str);
+ LTTNG3(driver_start, proc_str, driver->name, port_str);
+ }
+#endif
+
fpe_was_unmasked = erts_block_fpe();
drv_data = (*driver->start)(ERTS_Port2ErlDrvPort(port), name, opts);
if (((SWord) drv_data) == -1)
@@ -740,7 +754,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
ERTS_MSACC_POP_STATE_M();
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports_where(port, am_out, am_start);
+ trace_sched_ports_where(port, am_out, am_open);
}
#ifdef ERTS_SMP
if (port->xports)
@@ -1287,7 +1301,7 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
reds_left_in = CONTEXT_REDS/10;
else {
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
- trace_virtual_sched(c_p, am_out);
+ trace_sched(c_p, ERTS_PROC_LOCK_MAIN, am_out);
/*
* No status lock held while sending runnable
* proc trace messages. It is however not needed
@@ -1371,7 +1385,7 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
}
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
- trace_virtual_sched(c_p, am_in);
+ trace_sched(c_p, ERTS_PROC_LOCK_MAIN, am_in);
/*
* No status lock held while sending runnable
* proc trace messages. It is however not needed
@@ -1418,10 +1432,11 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp)
static ERTS_INLINE void
queue_port_sched_op_reply(Process *rp,
- ErtsProcLocks *rp_locksp,
+ ErtsProcLocks rp_locks,
ErtsHeapFactory* factory,
Uint32 *ref_num,
- Eterm msg)
+ Eterm msg,
+ Port* prt)
{
Eterm* hp = erts_produce_heap(factory, ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE, 0);
Eterm ref;
@@ -1434,11 +1449,12 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->message, msg, NIL);
+ erts_queue_message(rp, rp_locks, factory->message, msg,
+ prt ? prt->common.id : am_undefined);
}
static void
-port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
+port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg, Port* prt)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
@@ -1464,10 +1480,11 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
factory.off_heap));
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
ref_num,
- msg_copy);
+ msg_copy,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -1522,9 +1539,8 @@ erts_schedule_proc2port_signal(Process *c_p,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
c_p->msg.save = c_p->msg.last;
- erts_smp_proc_unlock(c_p,
- (ERTS_PROC_LOCK_MAIN
- | ERTS_PROC_LOCKS_MSG_RECEIVE));
+ erts_smp_proc_unlock(c_p, (ERTS_PROC_LOCKS_MSG_RECEIVE
+ | ERTS_PROC_LOCK_MAIN));
}
@@ -1542,8 +1558,19 @@ erts_schedule_proc2port_signal(Process *c_p,
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
if (sched_res != 0) {
- if (refp)
+ if (refp) {
+ /*
+ * We need to restore the message queue save
+ * pointer to the beginning of the message queue
+ * since the caller now wont wait for a message
+ * containing the reference created above...
+ */
+ ASSERT(c_p);
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
+ JOIN_MESSAGE(c_p);
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
*refp = NIL;
+ }
return ERTS_PORT_OP_DROPPED;
}
return ERTS_PORT_OP_SCHEDULED;
@@ -1627,7 +1654,7 @@ port_badsig(Port *prt, erts_aint32_t state, int op,
state,
sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
return ERTS_PORT_REDS_BADSIG;
} /* port_badsig */
/* bad_port_signal() will
@@ -1718,12 +1745,25 @@ call_driver_outputv(int bang_op,
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_commandv, evp);
+
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_outputv)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(caller, prt);
DTRACE4(driver_outputv, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_outputv)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_outputv, proc_str, port_str, prt->name, size);
+ }
+#endif
prt->caller = caller;
(*drv->outputv)((ErlDrvData) prt->drv_data, evp);
@@ -1749,7 +1789,6 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp)
driver_free_binary(ev->binv[i]);
if (cbinp)
driver_free_binary(cbinp);
- erts_free(ERTS_ALC_T_DRV_CMD_DATA, ev);
}
static int
@@ -1784,7 +1823,7 @@ port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_outputv(sigdp->u.outputv.evp,
sigdp->u.outputv.cbinp);
@@ -1826,6 +1865,18 @@ call_driver_output(int bang_op,
DTRACE4(driver_output, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_output)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_output, proc_str, port_str, prt->name, size);
+ }
+#endif
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_command, bufp, size);
prt->caller = caller;
(*drv->output)((ErlDrvData) prt->drv_data, bufp, size);
@@ -1880,13 +1931,195 @@ port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_output(sigdp->u.output.bufp);
return ERTS_PORT_REDS_CMD_OUTPUT;
}
+
+/*
+ * This erts_port_output will always create a port task.
+ * The call is treated as a port_command call, i.e. no
+ * badsig i generated if the input in invalid. However
+ * an error_logger message is generated.
+ */
+int
+erts_port_output_async(Port *prt, Eterm from, Eterm list)
+{
+
+ ErtsPortOpResult res;
+ ErtsProc2PortSigData *sigdp;
+ erts_driver_t *drv = prt->drv_ptr;
+ size_t size;
+ int task_flags;
+ ErtsProc2PortSigCallback port_sig_callback;
+ ErlDrvBinary *cbin = NULL;
+ ErlIOVec *evp = NULL;
+ char *buf = NULL;
+ ErtsPortTaskHandle *ns_pthp;
+
+ if (drv->outputv) {
+ ErlIOVec ev;
+ SysIOVec* ivp;
+ ErlDrvBinary** bvp;
+ int vsize;
+ Uint csize;
+ Uint pvsize;
+ Uint pcsize;
+ size_t iov_offset, binv_offset, alloc_size;
+ Uint blimit = 0;
+ char *ptr;
+ int i;
+
+ Eterm* bptr = NULL;
+ Uint offset;
+
+ if (is_binary(list)) {
+ /* We optimize for when we get a procbin without offset */
+ Eterm real_bin;
+ int bitoffs;
+ int bitsize;
+ ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize);
+ bptr = binary_val(real_bin);
+ if (*bptr == HEADER_PROC_BIN && bitoffs == 0) {
+ size = binary_size(list);
+ vsize = 1;
+ } else
+ bptr = NULL;
+ }
+
+ if (!bptr) {
+ if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
+ goto bad_value;
+
+ /* To pack or not to pack (small binaries) ...? */
+ if (vsize >= SMALL_WRITE_VEC) {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ cbin = driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ }
+
+
+ iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
+ binv_offset = iov_offset;
+ binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
+ alloc_size = binv_offset;
+ alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr);
+
+ evp = (ErlIOVec *) ptr;
+ ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+
+ ivp[0].iov_base = NULL;
+ ivp[0].iov_len = 0;
+ bvp[0] = NULL;
+
+ if (bptr) {
+ ProcBin* pb = (ProcBin *) bptr;
+
+ ivp[1].iov_base = pb->bytes+offset;
+ ivp[1].iov_len = size;
+ bvp[1] = Binary2ErlDrvBinary(pb->val);
+
+ evp->vsize = 1;
+ } else {
+
+ evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
+ if (evp->vsize < 0) {
+ if (evp != &ev)
+ erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
+ driver_free_binary(cbin);
+ goto bad_value;
+ }
+ }
+#if 0
+ /* This assertion may say something useful, but it can
+ be falsified during the emulator test suites. */
+ ASSERT(evp->vsize == vsize);
+#endif
+ evp->vsize++;
+ evp->size = size; /* total size */
+
+ /* Need to increase refc on all binaries */
+ for (i = 1; i < evp->vsize; i++)
+ if (bvp[i])
+ driver_binary_inc_refc(bvp[i]);
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
+ sigdp->u.outputv.from = from;
+ sigdp->u.outputv.evp = evp;
+ sigdp->u.outputv.cbinp = cbin;
+ port_sig_callback = port_sig_outputv;
+ } else {
+ ErlDrvSizeT ERTS_DECLARE_DUMMY(r);
+
+ /*
+ * Apperently there exist code that write 1 byte to
+ * much in buffer. Where it resides I don't know, but
+ * we can live with one byte extra allocated...
+ */
+
+ if (erts_iolist_size(list, &size))
+ goto bad_value;
+
+ buf = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, size + 1);
+
+ r = erts_iolist_to_buf(list, buf, size);
+ ASSERT(ERTS_IOLIST_TO_BUF_SUCCEEDED(r));
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
+ sigdp->u.output.from = from;
+ sigdp->u.output.bufp = buf;
+ sigdp->u.output.size = size;
+ port_sig_callback = port_sig_output;
+ }
+ sigdp->flags = 0;
+ ns_pthp = NULL;
+ task_flags = 0;
+
+ res = erts_schedule_proc2port_signal(NULL,
+ prt,
+ ERTS_INVALID_PID,
+ NULL,
+ sigdp,
+ task_flags,
+ ns_pthp,
+ port_sig_callback);
+
+ if (res != ERTS_PORT_OP_SCHEDULED) {
+ if (drv->outputv)
+ cleanup_scheduled_outputv(evp, cbin);
+ else
+ cleanup_scheduled_output(buf);
+ return 1;
+ }
+ return 1;
+
+bad_value:
+
+ /*
+ * We call badsig directly here as this function is called with
+ * the main lock of the calling process still held.
+ * At the moment this operation is always not a bang_op, so
+ * only an error_logger message should be generated, no badsig.
+ */
+
+ badsig_received(0, prt, erts_atomic32_read_nob(&prt->state), 1);
+
+ return 0;
+
+}
+
ErtsPortOpResult
erts_port_output(Process *c_p,
int flags,
@@ -1896,7 +2129,7 @@ erts_port_output(Process *c_p,
Eterm *refp)
{
ErtsPortOpResult res;
- ErtsProc2PortSigData *sigdp;
+ ErtsProc2PortSigData *sigdp = NULL;
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
@@ -1949,7 +2182,6 @@ erts_port_output(Process *c_p,
DTRACE4(port_command, process_str, port_str, prt->name, "command");
}
#endif
-
if (drv->outputv) {
ErlIOVec ev;
SysIOVec iv[SMALL_WRITE_VEC];
@@ -1978,10 +2210,13 @@ erts_port_output(Process *c_p,
evp = &ev;
}
else {
- char *ptr = erts_alloc((try_call
- ? ERTS_ALC_T_TMP
- : ERTS_ALC_T_DRV_CMD_DATA), alloc_size);
-
+ char *ptr;
+ if (try_call) {
+ ptr = erts_alloc(ERTS_ALC_T_TMP, alloc_size);
+ } else {
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&ptr);
+ }
evp = (ErlIOVec *) ptr;
ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
@@ -2010,9 +2245,12 @@ erts_port_output(Process *c_p,
bvp[0] = NULL;
evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
if (evp->vsize < 0) {
- if (evp != &ev)
- erts_free(try_call ? ERTS_ALC_T_TMP : ERTS_ALC_T_DRV_CMD_DATA,
- evp);
+ if (evp != &ev) {
+ if (try_call)
+ erts_free(ERTS_ALC_T_TMP, evp);
+ else
+ erts_port_task_free_p2p_sig_data(sigdp);
+ }
driver_free_binary(cbin);
goto bad_value;
}
@@ -2064,8 +2302,10 @@ erts_port_output(Process *c_p,
/* Fall through... */
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
if (try_call_res != ERTS_TRY_IMM_DRV_CALL_OK)
return ERTS_PORT_OP_DROPPED;
if (c_p)
@@ -2076,8 +2316,10 @@ erts_port_output(Process *c_p,
if (async_nosuspend
&& (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
return ((sched_flags & ERTS_PTS_FLG_EXIT)
? ERTS_PORT_OP_DROPPED
: ERTS_PORT_OP_BUSY);
@@ -2092,9 +2334,16 @@ erts_port_output(Process *c_p,
if (bvp[i])
driver_binary_inc_refc(bvp[i]);
- new_evp = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, alloc_size);
+ /* The port task and iovec is allocated in the
+ same structure as an optimization. This
+ is especially important in erts_port_output_async
+ of when !try_call */
+ ASSERT(sigdp == NULL);
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&new_evp);
if (evp != &ev) {
+ /* Copy from TMP alloc to port task */
sys_memcpy((void *) new_evp, (void *) evp, alloc_size);
new_evp->iov = (SysIOVec *) (((char *) new_evp)
+ iov_offset);
@@ -2142,7 +2391,6 @@ erts_port_output(Process *c_p,
evp = new_evp;
}
- sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
@@ -2264,7 +2512,7 @@ erts_port_output(Process *c_p,
sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND;
else if (async_nosuspend) {
ErtsSchedulerData *esdp = (c_p
- ? ERTS_PROC_GET_SCHDATA(c_p)
+ ? erts_proc_sched_data(c_p)
: erts_get_scheduler_data());
ASSERT(esdp);
ns_pthp = &esdp->nosuspend_port_task_handle;
@@ -2351,7 +2599,10 @@ call_deliver_port_exit(int bang_op,
return ERTS_PORT_OP_DROPPED;
}
- if (!erts_deliver_port_exit(prt, from, reason, bang_op))
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_close);
+
+ if (!erts_deliver_port_exit(prt, from, reason, bang_op, broken_link))
return ERTS_PORT_OP_DROPPED;
#ifdef USE_VM_PROBES
@@ -2388,7 +2639,7 @@ port_sig_exit(Port *prt,
if (sigdp->u.exit.bp)
free_message_buffer(sigdp->u.exit.bp);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_EXIT;
}
@@ -2422,13 +2673,13 @@ erts_port_exit(Process *c_p,
ERTS_PORT_SFLGS_INVALID_LOOKUP,
0,
!refp,
- am_exit);
+ am_close);
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK: {
res = call_deliver_port_exit(flags & ERTS_PORT_SIG_FLG_BANG_OP,
- from,
+ c_p ? c_p->common.id : from,
prt,
try_call_state.state,
reason,
@@ -2507,8 +2758,11 @@ set_port_connected(int bang_op,
return ERTS_PORT_OP_DROPPED;
}
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_connect, connect);
+
ERTS_PORT_SET_CONNECTED(prt, connect);
- deliver_result(prt->common.id, from, am_connected);
+ deliver_result(prt, prt->common.id, from, am_connected);
#ifdef USE_VM_PROBES
if(DTRACE_ENABLED(port_command)) {
@@ -2530,10 +2784,22 @@ set_port_connected(int bang_op,
erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, prt->common.id);
erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, connect);
+ if (IS_TRACED_FL(rp, F_TRACE_PROCS))
+ trace_proc(NULL, 0, rp, am_getting_linked, prt->common.id);
+
ERTS_PORT_SET_CONNECTED(prt, connect);
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, connect);
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_connect, connect);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ Eterm hp[3];
+ trace_port_send(prt, from, TUPLE2(hp, prt->common.id, am_connected), 1);
+ }
+
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_connect)) {
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
@@ -2566,7 +2832,7 @@ port_sig_connect(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
msg = am_true;
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_CONNECT;
}
@@ -2636,8 +2902,11 @@ static void
port_unlink(Port *prt, Eterm from)
{
ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from);
- if (lnk)
+ if (lnk) {
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_unlinked, from);
erts_destroy_link(lnk);
+ }
}
static int
@@ -2646,7 +2915,7 @@ port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
if (op == ERTS_PROC2PORT_SIG_EXEC)
port_unlink(prt, sigdp->u.unlink.from);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_UNLINK;
}
@@ -2707,10 +2976,10 @@ port_link_failure(Eterm port_id, Eterm linker)
NIL,
NULL,
0);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ if (xres >= 0) {
/* We didn't exit the process and it is traced */
if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, rp, am_getting_unlinked, port_id);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
}
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -2721,10 +2990,15 @@ port_link_failure(Eterm port_id, Eterm linker)
static void
port_link(Port *prt, erts_aint32_t state, Eterm to)
{
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, to);
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, to);
- else
+ } else {
port_link_failure(prt->common.id, to);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_unlink, to);
+ }
}
static int
@@ -2732,10 +3006,11 @@ port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigd
{
if (op == ERTS_PROC2PORT_SIG_EXEC)
port_link(prt, state, sigdp->u.link.to);
- else
+ else {
port_link_failure(sigdp->u.link.port, sigdp->u.link.to);
+ }
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_LINK;
}
@@ -2792,7 +3067,8 @@ init_ack_send_reply(Port *port, Eterm resp)
}
port_sched_op_reply(port->async_open_port->to,
port->async_open_port->ref,
- resp);
+ resp,
+ port);
erts_free(ERTS_ALC_T_PRTSD, port->async_open_port);
port->async_open_port = NULL;
@@ -3153,7 +3429,7 @@ static int read_linebuf(LineBufContext *bp)
}
static void
-deliver_result(Eterm sender, Eterm pid, Eterm res)
+deliver_result(Port *prt, Eterm sender, Eterm pid, Eterm res)
{
Process *rp;
ErtsProcLocks rp_locks = 0;
@@ -3161,12 +3437,22 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ASSERT(!prt || prt->common.id == sender);
+#ifdef ERTS_SMP
+ ASSERT(!prt || erts_lc_is_port_locked(prt));
+#endif
+
ASSERT(is_internal_port(sender) && is_internal_pid(pid));
rp = (scheduler
? erts_proc_lookup(pid)
: erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
+ if (prt && IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ Eterm hp[3];
+ trace_port_send(prt, pid, TUPLE2(hp, sender, res), !!rp);
+ }
+
if (rp) {
Eterm tuple;
ErtsMessage *mp;
@@ -3179,7 +3465,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, mp, tuple, NIL);
+ erts_queue_message(rp, rp_locks, mp, tuple, sender);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3211,6 +3497,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3233,7 +3520,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (!rp)
return;
- mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
@@ -3275,7 +3562,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
+ if (trace_send)
+ trace_port_send(prt, to, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3354,6 +3645,7 @@ deliver_vec_message(Port* prt, /* Port */
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
erts_aint32_t state;
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3381,7 +3673,7 @@ deliver_vec_message(Port* prt, /* Port */
need += (hlen+csize)*2;
}
- mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
iov += vsize;
@@ -3442,7 +3734,11 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, to, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3489,6 +3785,17 @@ static void flush_port(Port *p)
DTRACE3(driver_flush, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_flush)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_flush, proc_str, port_str, p->name);
+ }
+#endif
+
+
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_in, am_flush);
}
@@ -3520,6 +3827,7 @@ terminate_port(Port *prt)
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
erts_aint32_t state;
+ ErtsPrtSD *psd;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -3551,6 +3859,16 @@ terminate_port(Port *prt)
DTRACE3(driver_stop, process_str, drv->name, port_str);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_stop)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(connected_id, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_stop, proc_str, port_str, drv->name);
+ }
+#endif
+
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
@@ -3560,6 +3878,11 @@ terminate_port(Port *prt)
ASSERT(!prt->xports);
#endif
}
+
+ if (is_internal_port(send_closed_port_id)
+ && IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, connected_id, am_closed, 1);
+
if(drv->handle != NULL) {
erts_smp_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(drv->handle);
@@ -3573,8 +3896,9 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
- if (prt->psd)
- erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+ psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ if (psd)
+ erts_free(ERTS_ALC_T_PRTSD, psd);
ASSERT(prt->dist_entry == NULL);
@@ -3587,10 +3911,10 @@ terminate_port(Port *prt)
if ((state & ERTS_PORT_SFLG_HALT)
&& (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
erts_port_release(prt); /* We will exit and never return */
- erl_exit_flush_async(erts_halt_code, "");
+ erts_flush_async_exit(erts_halt_code, "");
}
if (is_internal_port(send_closed_port_id))
- deliver_result(send_closed_port_id, connected_id, am_closed);
+ deliver_result(NULL, send_closed_port_id, connected_id, am_closed);
}
void
@@ -3623,7 +3947,7 @@ static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
typedef struct {
- Eterm port;
+ Port *port;
Eterm reason;
} SweepContext;
@@ -3632,10 +3956,13 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
SweepContext *psc = vpsc;
DistEntry *dep;
Process *rp;
-
+ Eterm port_id = psc->port->common.id;
ASSERT(lnk->type == LINK_PID);
-
+
+ if (IS_TRACED_FL(psc->port, F_TRACE_PORTS))
+ trace_port(psc->port, am_unlink, lnk->pid);
+
if (is_external_pid(lnk->pid)) {
dep = external_pid_dist_entry(lnk->pid);
if(dep != erts_this_dist_entry) {
@@ -3648,9 +3975,9 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
case ERTS_DSIG_PREP_NOT_CONNECTED:
break;
case ERTS_DSIG_PREP_CONNECTED:
- erts_remove_dist_link(&dld, psc->port, lnk->pid, dep);
+ erts_remove_dist_link(&dld, port_id, lnk->pid, dep);
erts_destroy_dist_link(&dld);
- code = erts_dsig_send_exit(&dsd, psc->port, lnk->pid,
+ code = erts_dsig_send_exit(&dsd, port_id, lnk->pid,
psc->reason);
ASSERT(code == ERTS_DSIG_SEND_OK);
break;
@@ -3664,23 +3991,25 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
ASSERT(is_internal_pid(lnk->pid));
rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
if (rp) {
- ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), psc->port);
+ ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id);
if (rlnk) {
int xres = erts_send_exit_signal(NULL,
- psc->port,
+ port_id,
rp,
&rp_locks,
psc->reason,
NIL,
NULL,
0);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ if (xres >= 0) {
+ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
+ rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
+ }
/* We didn't exit the process and it is traced */
- if (IS_TRACED_FL(rp, F_TRACE_PROCS)) {
- trace_proc(NULL, rp, am_getting_unlinked,
- psc->port);
- }
+ if (IS_TRACED_FL(rp, F_TRACE_PROCS))
+ trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
}
erts_destroy_link(rlnk);
}
@@ -3702,7 +4031,8 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
*/
int
-erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
+erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
+ int drop_normal)
{
ErtsLink *lnk;
Eterm rreason;
@@ -3732,8 +4062,10 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
| ERTS_PORT_SFLG_CLOSING))
return 0;
- if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p) && from != p->common.id)
+ if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p)
+ && from != p->common.id && drop_normal) {
return 0;
+ }
set_state_flags = ERTS_PORT_SFLG_EXITING;
if (send_closed)
@@ -3744,9 +4076,8 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
state = erts_atomic32_read_bor_mb(&p->state, set_state_flags);
state |= set_state_flags;
- if (IS_TRACED_FL(p, F_TRACE_PORTS)) {
+ if (IS_TRACED_FL(p, F_TRACE_PORTS))
trace_port(p, am_closed, reason);
- }
erts_trace_check_exiting(p->common.id);
@@ -3756,7 +4087,7 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
(void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
{
- SweepContext sc = {p->common.id, rreason};
+ SweepContext sc = {p, rreason};
lnk = ERTS_P_LINKS(p);
ERTS_P_LINKS(p) = NULL;
erts_sweep_links(lnk, &sweep_one_link, &sc);
@@ -3878,9 +4209,22 @@ call_driver_control(Eterm caller,
command, size);
}
#endif
-
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_control, command, bufp, size);
+
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_control)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_control, proc_str, port_str, prt->name, command, size);
+ }
+#endif
+
prt->caller = caller;
cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data,
command,
@@ -3895,6 +4239,9 @@ call_driver_control(Eterm caller,
if (cres < 0)
return ERTS_PORT_OP_BADARG;
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send_binary(prt, caller, am_control, *resp_bufp, cres);
+
*from_size = (ErlDrvSizeT) cres;
return ERTS_PORT_OP_DONE;
@@ -4045,10 +4392,11 @@ port_sig_control(Port *prt,
&factory.hp,
factory.off_heap);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4059,7 +4407,7 @@ port_sig_control(Port *prt,
/* failure */
if (sigdp->caller != ERTS_INVALID_PID)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -4294,6 +4642,18 @@ call_driver_call(Eterm caller,
DTRACE5(driver_call, process_str, port_str, prt->name, command, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_call)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller,proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_call, proc_str, port_str, prt->name, command, size);
+ }
+#endif
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_call, command, bufp, size);
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
@@ -4313,6 +4673,9 @@ call_driver_call(Eterm caller,
|| ((byte) (*resp_bufp)[0]) != VERSION_MAGIC)
return ERTS_PORT_OP_BADARG;
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send_binary(prt, caller, am_call, *resp_bufp, cres);
+
*from_size = (ErlDrvSizeT) cres;
return ERTS_PORT_OP_DONE;
@@ -4373,7 +4736,7 @@ port_sig_call(Port *prt,
(void) erts_factory_message_create(&factory, rp, &rp_locks, hsz);
endp = (byte *) resp_bufp;
- msg = erts_decode_ext(&factory, &endp);
+ msg = erts_decode_ext(&factory, &endp, 0);
if (is_value(msg)) {
hp = erts_produce_heap(&factory,
3,
@@ -4381,10 +4744,11 @@ port_sig_call(Port *prt,
msg = TUPLE2(hp, am_ok, msg);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4396,7 +4760,7 @@ port_sig_call(Port *prt,
}
}
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -4492,7 +4856,7 @@ erts_port_call(Process* c_p,
hsz += 3;
erts_factory_proc_prealloc_init(&factory, c_p, hsz);
endp = (byte *) resp_bufp;
- term = erts_decode_ext(&factory, &endp);
+ term = erts_decode_ext(&factory, &endp, 0);
if (term == THE_NON_VALUE)
return ERTS_PORT_OP_BADARG;
hp = erts_produce_heap(&factory,3,0);
@@ -4611,7 +4975,7 @@ port_sig_info(Port *prt,
{
ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY);
if (op != ERTS_PROC2PORT_SIG_EXEC)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined, prt);
else {
Eterm *hp, *hp_start;
Uint hsz;
@@ -4637,10 +5001,11 @@ port_sig_info(Port *prt,
mp->data.heap_frag = bp;
erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- value);
+ value,
+ prt);
}
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4756,7 +5121,8 @@ reply_io_bytes(void *vreq)
eout = erts_bld_uint64(&hp, NULL, out);
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, mp, msg, NIL);
+
+ erts_queue_message(rp, rp_locks, mp, msg, am_system);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -4774,7 +5140,7 @@ erts_request_io_bytes(Process *c_p)
Uint *hp;
Eterm ref;
Uint32 *refn;
- ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p);
+ ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ,
sizeof(ErtsIOBytesReq));
@@ -5018,7 +5384,7 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (len > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
+ erts_exit(ERTS_ABORT_EXIT,
"Absurdly large data buffer (%beu bytes) passed to"
"output callback of %s driver.\n",
len,
@@ -5056,6 +5422,15 @@ int async_ready(Port *p, void* data)
DTRACE3(driver_ready_async, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_ready_async)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_ready_async, proc_str, port_str, p->name);
+ }
+#endif
(*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
need_free = 0;
ERTS_MSACC_POP_STATE_M();
@@ -5218,6 +5593,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
Port* prt = erts_drvport2port(ix);
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return;
@@ -5234,13 +5610,17 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (!rp)
return;
- mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, 3+3, &hp, &ohp);
tuple = TUPLE2(hp, am_exit_status, make_small(status));
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, pid, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -5334,13 +5714,13 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp)
*/
static int
-driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
+driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
{
#define HEAP_EXTRA 200
#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
Uint need = 0;
int depth = 0;
- int res;
+ int res = 0;
ErlDrvTermData* ptr;
ErlDrvTermData* ptr_end;
DECLARE_ESTACK(stack);
@@ -5559,11 +5939,26 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
? erts_proc_lookup(to)
: erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp) {
- res = 0;
- goto done;
- }
+ if (!prt || !IS_TRACED_FL(prt, F_TRACE_SEND))
+ goto done;
+ if (!erts_is_tracer_proc_enabled_send(NULL, 0, &prt->common))
+ goto done;
- (void) erts_factory_message_create(&factory, rp, &rp_locks, need);
+ res = -2;
+
+ /* We allocate a temporary heap to be used to create
+ the message that may be sent using tracing */
+ erts_factory_tmp_init(&factory, erts_alloc(ERTS_ALC_T_DRIVER, need*sizeof(Eterm)),
+ need, ERTS_ALC_T_DRIVER);
+
+ } else {
+ /* We force the creation of a heap fragment (rp == NULL) when send
+ tracing so that we don't have the main lock of the process while
+ tracing */
+ Process *trace_rp = prt && IS_TRACED_FL(prt, F_TRACE_SEND) ? NULL : rp;
+ (void) erts_factory_message_create(&factory, trace_rp, &rp_locks, need);
+ res = 1;
+ }
/*
* Interpret the instructions and build the term.
@@ -5826,22 +6221,45 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ESTACK_PUSH(stack, mess);
}
- res = 1;
-
done:
if (res > 0) {
+ Eterm from = am_undefined;
mess = ESTACK_POP(stack); /* get resulting value */
erts_factory_trim_and_close(&factory, &mess, 1);
+
+ if (prt) {
+ if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ trace_port_send(prt, to, mess, 1);
+ }
+ from = prt->common.id;
+ }
+
/* send message */
- erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined);
+ ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
+ erts_queue_message(rp, rp_locks, factory.message, mess, from);
+ }
+ else if (res == -2) {
+ /* this clause only happens when we were requested to
+ generate a send trace, but the process to send to
+ did not exist any more */
+ mess = ESTACK_POP(stack); /* get resulting value */
+
+ trace_port_send(prt, to, mess, 0);
+
+ erts_factory_trim_and_close(&factory, &mess, 1);
+ erts_free(ERTS_ALC_T_DRIVER, factory.hp_start);
+ res = 0;
}
else {
if (b2t.ix > b2t.used)
b2t.used = b2t.ix;
for (b2t.ix = 0; b2t.ix < b2t.used; b2t.ix++)
erts_binary2term_abort(&b2t.state[b2t.ix]);
- erts_factory_undo(&factory);
+ if (factory.mode != FACTORY_CLOSED) {
+ ERL_MESSAGE_TERM(factory.message) = am_undefined;
+ erts_factory_undo(&factory);
+ }
}
if (rp) {
if (rp_locks)
@@ -5857,7 +6275,8 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
}
static ERTS_INLINE int
-deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
+deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p,
+ Port **trace_prt)
{
#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
@@ -5885,8 +6304,11 @@ deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
erts_thr_progress_unmanaged_continue(dhndl);
ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
- }
+ } else
#endif
+ {
+ *trace_prt = prt;
+ }
ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED
? erts_lc_is_port_locked(prt)
: !erts_lc_is_port_locked(prt));
@@ -5897,10 +6319,11 @@ int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len)
{
/* May be called from arbitrary thread */
Eterm connected;
- int res = deliver_term_check_port(port_id, &connected);
+ Port *prt = NULL;
+ int res = deliver_term_check_port(port_id, &connected, &prt);
if (res <= 0)
return res;
- return driver_deliver_term(connected, data, len);
+ return driver_deliver_term(prt, connected, data, len);
}
/*
@@ -5924,7 +6347,7 @@ driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
- return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len);
+ return driver_deliver_term(prt, ERTS_PORT_GET_CONNECTED(prt), data, len);
}
int erl_drv_send_term(ErlDrvTermData port_id,
@@ -5933,10 +6356,11 @@ int erl_drv_send_term(ErlDrvTermData port_id,
int len)
{
/* May be called from arbitrary thread */
- int res = deliver_term_check_port(port_id, NULL);
+ Port *prt = NULL;
+ int res = deliver_term_check_port(port_id, NULL, &prt);
if (res <= 0)
return res;
- return driver_deliver_term(to, data, len);
+ return driver_deliver_term(prt, to, data, len);
}
/*
@@ -5955,20 +6379,21 @@ driver_send_term(ErlDrvPort drvport,
* to make this access safe without using a less efficient
* internal data representation for ErlDrvPort.
*/
+ Port* prt = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
#ifdef ERTS_SMP
if (erts_thr_progress_is_managed_thread())
#endif
{
erts_aint32_t state;
- Port* prt = erts_drvport2port_state(drvport, &state);
+ prt = erts_drvport2port_state(drvport, &state);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1; /* invalid (dead) */
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
}
- return driver_deliver_term(to, data, len);
+ return driver_deliver_term(prt, to, data, len);
}
@@ -7102,6 +7527,15 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DTRACE3(driver_process_exit, process_str, port_str, prt->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_process_exit)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(prt), proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_process_exit, proc_str, port_str, prt->name);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
@@ -7135,7 +7569,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (state & ERTS_PORT_SFLG_CLOSING) {
terminate_port(prt);
} else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) {
- deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
+ deliver_result(prt, prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
} else {
/* XXX UGLY WORK AROUND, Let erts_deliver_port_exit() terminate the port */
if (prt->port_data_lock)
@@ -7143,7 +7577,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
prt->ioq.size = 0;
if (prt->port_data_lock)
driver_pdl_unlock(prt->port_data_lock);
- erts_deliver_port_exit(prt, prt->common.id, eof ? am_normal : term, 0);
+ erts_deliver_port_exit(prt, prt->common.id, eof ? am_normal : term, 0, 0);
}
return 0;
}
@@ -7396,7 +7830,7 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
* of ErlDrvSysInfo (introduced in driver version 1.0).
*/
if (!sip || si_size < ERL_DRV_SYS_INFO_SIZE(smp_support))
- erl_exit(1,
+ erts_exit(ERTS_ERROR_EXIT,
"driver_system_info(%p, %ld) called with invalid arguments\n",
sip, si_size);
@@ -7447,11 +7881,13 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
* (driver version 3.1, NIF version 2.7)
*/
if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) {
-#if defined(ERL_NIF_DIRTY_SCHEDULER_SUPPORT) && defined(USE_THREADS)
- sip->dirty_scheduler_support = 1;
+ sip->dirty_scheduler_support =
+#ifdef ERTS_DIRTY_SCHEDULERS
+ 1
#else
- sip->dirty_scheduler_support = 0;
+ 0
#endif
+ ;
}
}
@@ -7579,6 +8015,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
int fpe_was_unmasked = erts_block_fpe();
DTRACE4(driver_init, drv->name, drv->version.major, drv->version.minor,
drv->flags);
+ LTTNG4(driver_init, drv->name, drv->version.major, drv->version.minor,
+ drv->flags);
res = (*de->init)();
erts_unblock_fpe(fpe_was_unmasked);
return res;