aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c1008
1 files changed, 765 insertions, 243 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 81a05a6b87..0377f6cb5e 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2014. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2016. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,16 +47,18 @@
#define ERTS_WANT_EXTERNAL_TAGS
#include "external.h"
#include "dtrace-wrapper.h"
+#include "lttng-wrapper.h"
#include "erl_map.h"
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
#include "erl_time.h"
extern ErlDrvEntry fd_driver_entry;
-#ifndef __OSE__
extern ErlDrvEntry vanilla_driver_entry;
-#endif
extern ErlDrvEntry spawn_driver_entry;
+#ifndef __WIN32__
+extern ErlDrvEntry forker_driver_entry;
+#endif
extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */
@@ -74,6 +76,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}};
erts_driver_t vanilla_driver;
erts_driver_t spawn_driver;
+#ifndef __WIN32__
+erts_driver_t forker_driver;
+#endif
erts_driver_t fd_driver;
int erts_port_synchronous_ops = 0;
@@ -83,10 +88,11 @@ int erts_port_parallelism = 0;
static erts_atomic64_t bytes_in;
static erts_atomic64_t bytes_out;
-static void deliver_result(Eterm sender, Eterm pid, Eterm res);
+static void deliver_result(Port *p, Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
+static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof);
#ifdef ERTS_SMP
static void driver_monitor_lock_pdl(Port *p);
static void driver_monitor_unlock_pdl(Port *p);
@@ -206,6 +212,7 @@ static ERTS_INLINE void
kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_TRACER_CLEAR(&ERTS_TRACER(pp));
erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */
erts_port_task_free_port(pp);
/* In non-smp case the port structure may have been deallocated now */
@@ -308,12 +315,9 @@ static Port *create_port(char *name,
size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
-#ifdef DEBUG
- /* Make sure the debug flags survives until port is freed */
- state |= ERTS_PORT_SFLG_PORT_DEBUG;
-#endif
#ifdef ERTS_SMP
+ ErtsRunQueue *runq;
if (!driver_lock) {
/* Align size for mutex following port struct */
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
@@ -323,6 +327,12 @@ static Port *create_port(char *name,
#endif
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ state |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
+
+
busy_port_queue_size
= ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
? 0
@@ -358,8 +368,12 @@ static Port *create_port(char *name,
p += sizeof(erts_mtx_t);
state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
- erts_smp_atomic_set_nob(&prt->run_queue,
- (erts_aint_t) erts_get_runq_current(NULL));
+ if (erts_get_scheduler_data())
+ runq = erts_get_runq_current(NULL);
+ else
+ runq = ERTS_RUNQ_IX(0);
+ erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
+
prt->xports = NULL;
#else
erts_atomic32_init_nob(&prt->refc, 1);
@@ -385,12 +399,13 @@ static Port *create_port(char *name,
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
- prt->psd = NULL;
+ erts_smp_atomic_init_nob(&prt->psd, (erts_aint_t) NULL);
+ prt->async_open_port = NULL;
prt->drv_data = (SWord) 0;
prt->os_pid = -1;
/* Set default tracing */
- erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
+ erts_get_default_port_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER(prt));
ERTS_CT_ASSERT(offsetof(Port,common) == 0);
@@ -467,6 +482,11 @@ erts_port_free(Port *prt)
erts_port_task_fini_sched(&prt->sched);
+ if (prt->async_open_port) {
+ erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port);
+ prt->async_open_port = NULL;
+ }
+
#ifdef ERTS_SMP
ASSERT(prt->lock);
if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
@@ -688,8 +708,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
error_number = error_type = 0;
if (driver->start) {
+ ERTS_MSACC_PUSH_STATE_M();
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports_where(port, am_in, am_start);
+ trace_sched_ports_where(port, am_in, am_open);
}
port->caller = pid;
#ifdef USE_VM_PROBES
@@ -698,6 +719,19 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
DTRACE3(driver_start, process_str, driver->name, port_str);
}
#endif
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_start)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(pid, proc_str);
+ lttng_port_to_str(port, port_str);
+ LTTNG3(driver_start, proc_str, driver->name, port_str);
+ }
+#endif
+
fpe_was_unmasked = erts_block_fpe();
drv_data = (*driver->start)(ERTS_Port2ErlDrvPort(port), name, opts);
if (((SWord) drv_data) == -1)
@@ -717,9 +751,10 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports_where(port, am_out, am_start);
+ trace_sched_ports_where(port, am_out, am_open);
}
#ifdef ERTS_SMP
if (port->xports)
@@ -1106,7 +1141,7 @@ io_list_vec_len(Eterm obj, int* vsize, Uint* csize,
Uint p_v_size = 0;
Uint p_c_size = 0;
Uint p_in_clist = 0;
- Uint total; /* Uint due to halfword emulator */
+ Uint total;
goto L_jump_start; /* avoid a push */
@@ -1266,7 +1301,7 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
reds_left_in = CONTEXT_REDS/10;
else {
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
- trace_virtual_sched(c_p, am_out);
+ trace_sched(c_p, ERTS_PROC_LOCK_MAIN, am_out);
/*
* No status lock held while sending runnable
* proc trace messages. It is however not needed
@@ -1350,7 +1385,7 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
}
if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS))
- trace_virtual_sched(c_p, am_in);
+ trace_sched(c_p, ERTS_PROC_LOCK_MAIN, am_in);
/*
* No status lock held while sending runnable
* proc trace messages. It is however not needed
@@ -1397,10 +1432,11 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp)
static ERTS_INLINE void
queue_port_sched_op_reply(Process *rp,
- ErtsProcLocks *rp_locksp,
+ ErtsProcLocks rp_locks,
ErtsHeapFactory* factory,
Uint32 *ref_num,
- Eterm msg)
+ Eterm msg,
+ Port* prt)
{
Eterm* hp = erts_produce_heap(factory, ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE, 0);
Eterm ref;
@@ -1413,20 +1449,18 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL);
+ erts_queue_message(rp, rp_locks, factory->message, msg,
+ prt ? prt->common.id : am_undefined);
}
static void
-port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
+port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg, Port* prt)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
ErtsHeapFactory factory;
Eterm msg_copy;
Uint hsz, msg_sz;
- Eterm *hp;
ErtsProcLocks rp_locks = 0;
hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
@@ -1437,24 +1471,20 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
hsz += msg_sz;
}
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- if (is_immed(msg))
- msg_copy = msg;
- else {
- msg_copy = copy_struct(msg, msg_sz, &hp, ohp);
- factory.hp = hp;
- }
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
+ msg_copy = (is_immed(msg)
+ ? msg
+ : copy_struct(msg, msg_sz,
+ &factory.hp,
+ factory.off_heap));
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
ref_num,
- msg_copy);
+ msg_copy,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -1509,9 +1539,8 @@ erts_schedule_proc2port_signal(Process *c_p,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
c_p->msg.save = c_p->msg.last;
- erts_smp_proc_unlock(c_p,
- (ERTS_PROC_LOCK_MAIN
- | ERTS_PROC_LOCKS_MSG_RECEIVE));
+ erts_smp_proc_unlock(c_p, (ERTS_PROC_LOCKS_MSG_RECEIVE
+ | ERTS_PROC_LOCK_MAIN));
}
@@ -1529,13 +1558,44 @@ erts_schedule_proc2port_signal(Process *c_p,
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
if (sched_res != 0) {
- if (refp)
+ if (refp) {
+ /*
+ * We need to restore the message queue save
+ * pointer to the beginning of the message queue
+ * since the caller now wont wait for a message
+ * containing the reference created above...
+ */
+ ASSERT(c_p);
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
+ JOIN_MESSAGE(c_p);
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
*refp = NIL;
+ }
return ERTS_PORT_OP_DROPPED;
}
return ERTS_PORT_OP_SCHEDULED;
}
+static int
+erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp,
+ int task_flags,
+ ErtsProc2PortSigCallback callback)
+{
+ Port *prt = erts_port_lookup_raw(port_num);
+
+ if (!prt)
+ return -1;
+
+ sigdp->caller = ERTS_INVALID_PID;
+
+ return erts_port_task_schedule(prt->common.id,
+ NULL,
+ ERTS_PORT_TASK_PROC_SIG,
+ sigdp,
+ callback,
+ task_flags);
+}
+
static ERTS_INLINE void
send_badsig(Port *prt) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
@@ -1594,7 +1654,7 @@ port_badsig(Port *prt, erts_aint32_t state, int op,
state,
sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
return ERTS_PORT_REDS_BADSIG;
} /* port_badsig */
/* bad_port_signal() will
@@ -1680,16 +1740,30 @@ call_driver_outputv(int bang_op,
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErlDrvSizeT size = evp->size;
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_commandv, evp);
+
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_outputv)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(caller, prt);
DTRACE4(driver_outputv, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_outputv)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_outputv, proc_str, port_str, prt->name, size);
+ }
+#endif
prt->caller = caller;
(*drv->outputv)((ErlDrvData) prt->drv_data, evp);
@@ -1700,6 +1774,8 @@ call_driver_outputv(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -1713,7 +1789,6 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp)
driver_free_binary(ev->binv[i]);
if (cbinp)
driver_free_binary(cbinp);
- erts_free(ERTS_ALC_T_DRV_CMD_DATA, ev);
}
static int
@@ -1748,7 +1823,7 @@ port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_outputv(sigdp->u.outputv.evp,
sigdp->u.outputv.cbinp);
@@ -1780,6 +1855,7 @@ call_driver_output(int bang_op,
send_badsig(prt);
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1789,6 +1865,18 @@ call_driver_output(int bang_op,
DTRACE4(driver_output, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_output)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_output, proc_str, port_str, prt->name, size);
+ }
+#endif
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_command, bufp, size);
prt->caller = caller;
(*drv->output)((ErlDrvData) prt->drv_data, bufp, size);
@@ -1799,6 +1887,8 @@ call_driver_output(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -1841,13 +1931,195 @@ port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_output(sigdp->u.output.bufp);
return ERTS_PORT_REDS_CMD_OUTPUT;
}
+
+/*
+ * This erts_port_output will always create a port task.
+ * The call is treated as a port_command call, i.e. no
+ * badsig i generated if the input in invalid. However
+ * an error_logger message is generated.
+ */
+int
+erts_port_output_async(Port *prt, Eterm from, Eterm list)
+{
+
+ ErtsPortOpResult res;
+ ErtsProc2PortSigData *sigdp;
+ erts_driver_t *drv = prt->drv_ptr;
+ size_t size;
+ int task_flags;
+ ErtsProc2PortSigCallback port_sig_callback;
+ ErlDrvBinary *cbin = NULL;
+ ErlIOVec *evp = NULL;
+ char *buf = NULL;
+ ErtsPortTaskHandle *ns_pthp;
+
+ if (drv->outputv) {
+ ErlIOVec ev;
+ SysIOVec* ivp;
+ ErlDrvBinary** bvp;
+ int vsize;
+ Uint csize;
+ Uint pvsize;
+ Uint pcsize;
+ size_t iov_offset, binv_offset, alloc_size;
+ Uint blimit = 0;
+ char *ptr;
+ int i;
+
+ Eterm* bptr = NULL;
+ Uint offset;
+
+ if (is_binary(list)) {
+ /* We optimize for when we get a procbin without offset */
+ Eterm real_bin;
+ int bitoffs;
+ int bitsize;
+ ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize);
+ bptr = binary_val(real_bin);
+ if (*bptr == HEADER_PROC_BIN && bitoffs == 0) {
+ size = binary_size(list);
+ vsize = 1;
+ } else
+ bptr = NULL;
+ }
+
+ if (!bptr) {
+ if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
+ goto bad_value;
+
+ /* To pack or not to pack (small binaries) ...? */
+ if (vsize >= SMALL_WRITE_VEC) {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ cbin = driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ }
+
+
+ iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
+ binv_offset = iov_offset;
+ binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
+ alloc_size = binv_offset;
+ alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr);
+
+ evp = (ErlIOVec *) ptr;
+ ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+
+ ivp[0].iov_base = NULL;
+ ivp[0].iov_len = 0;
+ bvp[0] = NULL;
+
+ if (bptr) {
+ ProcBin* pb = (ProcBin *) bptr;
+
+ ivp[1].iov_base = pb->bytes+offset;
+ ivp[1].iov_len = size;
+ bvp[1] = Binary2ErlDrvBinary(pb->val);
+
+ evp->vsize = 1;
+ } else {
+
+ evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
+ if (evp->vsize < 0) {
+ if (evp != &ev)
+ erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
+ driver_free_binary(cbin);
+ goto bad_value;
+ }
+ }
+#if 0
+ /* This assertion may say something useful, but it can
+ be falsified during the emulator test suites. */
+ ASSERT(evp->vsize == vsize);
+#endif
+ evp->vsize++;
+ evp->size = size; /* total size */
+
+ /* Need to increase refc on all binaries */
+ for (i = 1; i < evp->vsize; i++)
+ if (bvp[i])
+ driver_binary_inc_refc(bvp[i]);
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
+ sigdp->u.outputv.from = from;
+ sigdp->u.outputv.evp = evp;
+ sigdp->u.outputv.cbinp = cbin;
+ port_sig_callback = port_sig_outputv;
+ } else {
+ ErlDrvSizeT ERTS_DECLARE_DUMMY(r);
+
+ /*
+ * Apperently there exist code that write 1 byte to
+ * much in buffer. Where it resides I don't know, but
+ * we can live with one byte extra allocated...
+ */
+
+ if (erts_iolist_size(list, &size))
+ goto bad_value;
+
+ buf = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, size + 1);
+
+ r = erts_iolist_to_buf(list, buf, size);
+ ASSERT(ERTS_IOLIST_TO_BUF_SUCCEEDED(r));
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
+ sigdp->u.output.from = from;
+ sigdp->u.output.bufp = buf;
+ sigdp->u.output.size = size;
+ port_sig_callback = port_sig_output;
+ }
+ sigdp->flags = 0;
+ ns_pthp = NULL;
+ task_flags = 0;
+
+ res = erts_schedule_proc2port_signal(NULL,
+ prt,
+ ERTS_INVALID_PID,
+ NULL,
+ sigdp,
+ task_flags,
+ ns_pthp,
+ port_sig_callback);
+
+ if (res != ERTS_PORT_OP_SCHEDULED) {
+ if (drv->outputv)
+ cleanup_scheduled_outputv(evp, cbin);
+ else
+ cleanup_scheduled_output(buf);
+ return 1;
+ }
+ return 1;
+
+bad_value:
+
+ /*
+ * We call badsig directly here as this function is called with
+ * the main lock of the calling process still held.
+ * At the moment this operation is always not a bang_op, so
+ * only an error_logger message should be generated, no badsig.
+ */
+
+ badsig_received(0, prt, erts_atomic32_read_nob(&prt->state), 1);
+
+ return 0;
+
+}
+
ErtsPortOpResult
erts_port_output(Process *c_p,
int flags,
@@ -1857,7 +2129,7 @@ erts_port_output(Process *c_p,
Eterm *refp)
{
ErtsPortOpResult res;
- ErtsProc2PortSigData *sigdp;
+ ErtsProc2PortSigData *sigdp = NULL;
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
@@ -1910,7 +2182,6 @@ erts_port_output(Process *c_p,
DTRACE4(port_command, process_str, port_str, prt->name, "command");
}
#endif
-
if (drv->outputv) {
ErlIOVec ev;
SysIOVec iv[SMALL_WRITE_VEC];
@@ -1939,10 +2210,13 @@ erts_port_output(Process *c_p,
evp = &ev;
}
else {
- char *ptr = erts_alloc((try_call
- ? ERTS_ALC_T_TMP
- : ERTS_ALC_T_DRV_CMD_DATA), alloc_size);
-
+ char *ptr;
+ if (try_call) {
+ ptr = erts_alloc(ERTS_ALC_T_TMP, alloc_size);
+ } else {
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&ptr);
+ }
evp = (ErlIOVec *) ptr;
ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
@@ -1971,9 +2245,12 @@ erts_port_output(Process *c_p,
bvp[0] = NULL;
evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
if (evp->vsize < 0) {
- if (evp != &ev)
- erts_free(try_call ? ERTS_ALC_T_TMP : ERTS_ALC_T_DRV_CMD_DATA,
- evp);
+ if (evp != &ev) {
+ if (try_call)
+ erts_free(ERTS_ALC_T_TMP, evp);
+ else
+ erts_port_task_free_p2p_sig_data(sigdp);
+ }
driver_free_binary(cbin);
goto bad_value;
}
@@ -2025,8 +2302,10 @@ erts_port_output(Process *c_p,
/* Fall through... */
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
if (try_call_res != ERTS_TRY_IMM_DRV_CALL_OK)
return ERTS_PORT_OP_DROPPED;
if (c_p)
@@ -2037,8 +2316,10 @@ erts_port_output(Process *c_p,
if (async_nosuspend
&& (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
return ((sched_flags & ERTS_PTS_FLG_EXIT)
? ERTS_PORT_OP_DROPPED
: ERTS_PORT_OP_BUSY);
@@ -2053,9 +2334,16 @@ erts_port_output(Process *c_p,
if (bvp[i])
driver_binary_inc_refc(bvp[i]);
- new_evp = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, alloc_size);
+ /* The port task and iovec is allocated in the
+ same structure as an optimization. This
+ is especially important in erts_port_output_async
+ of when !try_call */
+ ASSERT(sigdp == NULL);
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&new_evp);
if (evp != &ev) {
+ /* Copy from TMP alloc to port task */
sys_memcpy((void *) new_evp, (void *) evp, alloc_size);
new_evp->iov = (SysIOVec *) (((char *) new_evp)
+ iov_offset);
@@ -2103,7 +2391,6 @@ erts_port_output(Process *c_p,
evp = new_evp;
}
- sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
@@ -2225,7 +2512,7 @@ erts_port_output(Process *c_p,
sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND;
else if (async_nosuspend) {
ErtsSchedulerData *esdp = (c_p
- ? ERTS_PROC_GET_SCHDATA(c_p)
+ ? erts_proc_sched_data(c_p)
: erts_get_scheduler_data());
ASSERT(esdp);
ns_pthp = &esdp->nosuspend_port_task_handle;
@@ -2312,7 +2599,10 @@ call_deliver_port_exit(int bang_op,
return ERTS_PORT_OP_DROPPED;
}
- if (!erts_deliver_port_exit(prt, from, reason, bang_op))
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_close);
+
+ if (!erts_deliver_port_exit(prt, from, reason, bang_op, broken_link))
return ERTS_PORT_OP_DROPPED;
#ifdef USE_VM_PROBES
@@ -2349,7 +2639,7 @@ port_sig_exit(Port *prt,
if (sigdp->u.exit.bp)
free_message_buffer(sigdp->u.exit.bp);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_EXIT;
}
@@ -2371,6 +2661,11 @@ erts_port_exit(Process *c_p,
| ERTS_PORT_SIG_FLG_BROKEN_LINK
| ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0);
+#ifndef __WIN32__
+ if (prt->drv_ptr == &forker_driver)
+ return ERTS_PORT_OP_DROPPED;
+#endif
+
if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) {
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p,
@@ -2378,13 +2673,13 @@ erts_port_exit(Process *c_p,
ERTS_PORT_SFLGS_INVALID_LOOKUP,
0,
!refp,
- am_exit);
+ am_close);
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK: {
res = call_deliver_port_exit(flags & ERTS_PORT_SIG_FLG_BANG_OP,
- from,
+ c_p ? c_p->common.id : from,
prt,
try_call_state.state,
reason,
@@ -2463,8 +2758,11 @@ set_port_connected(int bang_op,
return ERTS_PORT_OP_DROPPED;
}
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_connect, connect);
+
ERTS_PORT_SET_CONNECTED(prt, connect);
- deliver_result(prt->common.id, from, am_connected);
+ deliver_result(prt, prt->common.id, from, am_connected);
#ifdef USE_VM_PROBES
if(DTRACE_ENABLED(port_command)) {
@@ -2486,10 +2784,22 @@ set_port_connected(int bang_op,
erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, prt->common.id);
erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, connect);
+ if (IS_TRACED_FL(rp, F_TRACE_PROCS))
+ trace_proc(NULL, 0, rp, am_getting_linked, prt->common.id);
+
ERTS_PORT_SET_CONNECTED(prt, connect);
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, connect);
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, from, am_connect, connect);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ Eterm hp[3];
+ trace_port_send(prt, from, TUPLE2(hp, prt->common.id, am_connected), 1);
+ }
+
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_connect)) {
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
@@ -2522,7 +2832,7 @@ port_sig_connect(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
msg = am_true;
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_CONNECT;
}
@@ -2592,8 +2902,11 @@ static void
port_unlink(Port *prt, Eterm from)
{
ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from);
- if (lnk)
+ if (lnk) {
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_unlinked, from);
erts_destroy_link(lnk);
+ }
}
static int
@@ -2602,7 +2915,7 @@ port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
if (op == ERTS_PROC2PORT_SIG_EXEC)
port_unlink(prt, sigdp->u.unlink.from);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_UNLINK;
}
@@ -2663,10 +2976,10 @@ port_link_failure(Eterm port_id, Eterm linker)
NIL,
NULL,
0);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ if (xres >= 0) {
/* We didn't exit the process and it is traced */
if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, rp, am_getting_unlinked, port_id);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
}
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -2677,10 +2990,15 @@ port_link_failure(Eterm port_id, Eterm linker)
static void
port_link(Port *prt, erts_aint32_t state, Eterm to)
{
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, to);
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, to);
- else
+ } else {
port_link_failure(prt->common.id, to);
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_unlink, to);
+ }
}
static int
@@ -2688,10 +3006,11 @@ port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigd
{
if (op == ERTS_PROC2PORT_SIG_EXEC)
port_link(prt, state, sigdp->u.link.to);
- else
+ else {
port_link_failure(sigdp->u.link.port, sigdp->u.link.to);
+ }
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_LINK;
}
@@ -2735,6 +3054,73 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
port_sig_link);
}
+static void
+init_ack_send_reply(Port *port, Eterm resp)
+{
+
+ if (!is_internal_port(resp)) {
+ Process *rp = erts_proc_lookup_raw(port->async_open_port->to);
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to);
+ erts_remove_link(&ERTS_P_LINKS(rp), port->common.id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ }
+ port_sched_op_reply(port->async_open_port->to,
+ port->async_open_port->ref,
+ resp,
+ port);
+
+ erts_free(ERTS_ALC_T_PRTSD, port->async_open_port);
+ port->async_open_port = NULL;
+}
+
+void
+erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) {
+ Port *port = erts_drvport2port(ix);
+ SWord err_type = (SWord)res;
+ Eterm resp;
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port)
+ return;
+
+ if (port->async_open_port) {
+ switch(err_type) {
+ case -3:
+ resp = am_badarg;
+ break;
+ case -2: {
+ char *str = erl_errno_id(errno);
+ resp = erts_atom_put((byte *) str, strlen(str),
+ ERTS_ATOM_ENC_LATIN1, 1);
+ break;
+ }
+ case -1:
+ resp = am_einval;
+ break;
+ default:
+ resp = port->common.id;
+ break;
+ }
+
+ init_ack_send_reply(port, resp);
+
+ if (err_type == -1 || err_type == -2 || err_type == -3)
+ driver_failure_term(ix, am_normal, 0);
+ port->drv_data = err_type;
+ }
+}
+
+void
+erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) {
+ Port *port = erts_drvport2port(ix);
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT)
+ return;
+
+ port->os_pid = (SWord)pid;
+
+}
+
void erts_init_io(int port_tab_size,
int port_tab_size_ignore_files,
int legacy_port_tab)
@@ -2795,10 +3181,11 @@ void erts_init_io(int port_tab_size,
erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
init_driver(&fd_driver, &fd_driver_entry, NULL);
-#ifndef __OSE__
init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
-#endif
init_driver(&spawn_driver, &spawn_driver_entry, NULL);
+#ifndef __WIN32__
+ init_driver(&forker_driver, &forker_driver_entry, NULL);
+#endif
erts_init_static_drivers();
for (dp = driver_tab; *dp != NULL; dp++)
erts_add_driver_entry(*dp, NULL, 1);
@@ -2860,6 +3247,9 @@ void erts_lcnt_enable_io_lock_count(int enable) {
lcnt_enable_drv_lock_count(&vanilla_driver, enable);
lcnt_enable_drv_lock_count(&spawn_driver, enable);
+#ifndef __WIN32__
+ lcnt_enable_drv_lock_count(&forker_driver, enable);
+#endif
lcnt_enable_drv_lock_count(&fd_driver, enable);
/* enable lock counting in all drivers */
for (dp = driver_list; dp; dp = dp->next) {
@@ -3039,7 +3429,7 @@ static int read_linebuf(LineBufContext *bp)
}
static void
-deliver_result(Eterm sender, Eterm pid, Eterm res)
+deliver_result(Port *prt, Eterm sender, Eterm pid, Eterm res)
{
Process *rp;
ErtsProcLocks rp_locks = 0;
@@ -3047,24 +3437,35 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ASSERT(!prt || prt->common.id == sender);
+#ifdef ERTS_SMP
+ ASSERT(!prt || erts_lc_is_port_locked(prt));
+#endif
+
ASSERT(is_internal_port(sender) && is_internal_pid(pid));
rp = (scheduler
? erts_proc_lookup(pid)
: erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
+ if (prt && IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ Eterm hp[3];
+ trace_port_send(prt, pid, TUPLE2(hp, sender, res), !!rp);
+ }
+
if (rp) {
Eterm tuple;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
Eterm* hp;
Uint sz_res;
sz_res = size_object(res);
- hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks,
+ sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_queue_message(rp, rp_locks, mp, tuple, sender);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3092,10 +3493,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3118,7 +3520,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (!rp)
return;
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
@@ -3160,7 +3562,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ if (trace_send)
+ trace_port_send(prt, to, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3234,11 +3640,12 @@ deliver_vec_message(Port* prt, /* Port */
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
erts_aint32_t state;
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3266,7 +3673,7 @@ deliver_vec_message(Port* prt, /* Port */
need += (hlen+csize)*2;
}
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
iov += vsize;
@@ -3327,7 +3734,11 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, to, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3367,18 +3778,32 @@ static void flush_port(Port *p)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->flush != NULL) {
+ ERTS_MSACC_PUSH_STATE_M();
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_flush)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_flush, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_flush)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_flush, proc_str, port_str, p->name);
+ }
+#endif
+
+
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_in, am_flush);
}
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->flush)((ErlDrvData)p->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_out, am_flush);
}
@@ -3402,6 +3827,7 @@ terminate_port(Port *prt)
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
erts_aint32_t state;
+ ErtsPrtSD *psd;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -3426,20 +3852,37 @@ terminate_port(Port *prt)
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
int fpe_was_unmasked = erts_block_fpe();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_stop)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt)
DTRACE3(driver_stop, process_str, drv->name, port_str);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_stop)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(connected_id, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_stop, proc_str, port_str, drv->name);
+ }
+#endif
+
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
#ifdef ERTS_SMP
if (prt->xports)
erts_port_handle_xports(prt);
ASSERT(!prt->xports);
#endif
}
+
+ if (is_internal_port(send_closed_port_id)
+ && IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, connected_id, am_closed, 1);
+
if(drv->handle != NULL) {
erts_smp_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(drv->handle);
@@ -3453,8 +3896,9 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
- if (prt->psd)
- erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+ psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ if (psd)
+ erts_free(ERTS_ALC_T_PRTSD, psd);
ASSERT(prt->dist_entry == NULL);
@@ -3470,7 +3914,7 @@ terminate_port(Port *prt)
erts_flush_async_exit(erts_halt_code, "");
}
if (is_internal_port(send_closed_port_id))
- deliver_result(send_closed_port_id, connected_id, am_closed);
+ deliver_result(NULL, send_closed_port_id, connected_id, am_closed);
}
void
@@ -3503,7 +3947,7 @@ static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
typedef struct {
- Eterm port;
+ Port *port;
Eterm reason;
} SweepContext;
@@ -3512,10 +3956,13 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
SweepContext *psc = vpsc;
DistEntry *dep;
Process *rp;
-
+ Eterm port_id = psc->port->common.id;
ASSERT(lnk->type == LINK_PID);
-
+
+ if (IS_TRACED_FL(psc->port, F_TRACE_PORTS))
+ trace_port(psc->port, am_unlink, lnk->pid);
+
if (is_external_pid(lnk->pid)) {
dep = external_pid_dist_entry(lnk->pid);
if(dep != erts_this_dist_entry) {
@@ -3528,9 +3975,9 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
case ERTS_DSIG_PREP_NOT_CONNECTED:
break;
case ERTS_DSIG_PREP_CONNECTED:
- erts_remove_dist_link(&dld, psc->port, lnk->pid, dep);
+ erts_remove_dist_link(&dld, port_id, lnk->pid, dep);
erts_destroy_dist_link(&dld);
- code = erts_dsig_send_exit(&dsd, psc->port, lnk->pid,
+ code = erts_dsig_send_exit(&dsd, port_id, lnk->pid,
psc->reason);
ASSERT(code == ERTS_DSIG_SEND_OK);
break;
@@ -3544,23 +3991,25 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
ASSERT(is_internal_pid(lnk->pid));
rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
if (rp) {
- ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), psc->port);
+ ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id);
if (rlnk) {
int xres = erts_send_exit_signal(NULL,
- psc->port,
+ port_id,
rp,
&rp_locks,
psc->reason,
NIL,
NULL,
0);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ if (xres >= 0) {
+ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
+ rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
+ }
/* We didn't exit the process and it is traced */
- if (IS_TRACED_FL(rp, F_TRACE_PROCS)) {
- trace_proc(NULL, rp, am_getting_unlinked,
- psc->port);
- }
+ if (IS_TRACED_FL(rp, F_TRACE_PROCS))
+ trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
}
erts_destroy_link(rlnk);
}
@@ -3582,7 +4031,8 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
*/
int
-erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
+erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed,
+ int drop_normal)
{
ErtsLink *lnk;
Eterm rreason;
@@ -3612,8 +4062,10 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
| ERTS_PORT_SFLG_CLOSING))
return 0;
- if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p) && from != p->common.id)
+ if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p)
+ && from != p->common.id && drop_normal) {
return 0;
+ }
set_state_flags = ERTS_PORT_SFLG_EXITING;
if (send_closed)
@@ -3624,9 +4076,8 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
state = erts_atomic32_read_bor_mb(&p->state, set_state_flags);
state |= set_state_flags;
- if (IS_TRACED_FL(p, F_TRACE_PORTS)) {
+ if (IS_TRACED_FL(p, F_TRACE_PORTS))
trace_port(p, am_closed, reason);
- }
erts_trace_check_exiting(p->common.id);
@@ -3636,7 +4087,7 @@ erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed)
(void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
{
- SweepContext sc = {p->common.id, rreason};
+ SweepContext sc = {p, rreason};
lnk = ERTS_P_LINKS(p);
ERTS_P_LINKS(p) = NULL;
erts_sweep_links(lnk, &sweep_one_link, &sc);
@@ -3744,6 +4195,7 @@ call_driver_control(Eterm caller,
ErlDrvSizeT *from_size)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->control)
return ERTS_PORT_OP_BADARG;
@@ -3758,6 +4210,21 @@ call_driver_control(Eterm caller,
}
#endif
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_control, command, bufp, size);
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_control)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_control, proc_str, port_str, prt->name, command, size);
+ }
+#endif
+
prt->caller = caller;
cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data,
command,
@@ -3767,9 +4234,14 @@ call_driver_control(Eterm caller,
*from_size);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres < 0)
return ERTS_PORT_OP_BADARG;
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send_binary(prt, caller, am_control, *resp_bufp, cres);
+
*from_size = (ErlDrvSizeT) cres;
return ERTS_PORT_OP_DONE;
@@ -3818,7 +4290,6 @@ write_port_control_result(int control_flags,
ErlDrvSizeT resp_size,
char *pre_alloc_buf,
Eterm **hpp,
- ErlHeapFragment *bp,
ErlOffHeap *ohp)
{
Eterm res;
@@ -3892,16 +4363,13 @@ port_sig_control(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
- Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
ErtsHeapFactory factory;
Process *rp;
ErtsProcLocks rp_locks = 0;
Uint hsz, rsz;
int control_flags;
- rp = erts_proc_lookup_raw(sigdp->caller);
+ rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller);
if (!rp)
goto done;
@@ -3914,27 +4382,21 @@ port_sig_control(Port *prt,
hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
msg = write_port_control_result(control_flags,
resp_bufp,
resp_size,
&resp_buf[0],
- &hp,
- bp,
- ohp);
- factory.hp = hp;
-
+ &factory.hp,
+ factory.off_heap);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3944,7 +4406,8 @@ port_sig_control(Port *prt,
/* failure */
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ if (sigdp->caller != ERTS_INVALID_PID)
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -3954,6 +4417,23 @@ done:
return ERTS_PORT_REDS_CONTROL;
}
+/*
+ * This is an asynchronous control call. I.e. it will not return anything
+ * to the caller.
+ */
+int
+erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size)
+{
+ ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data();
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY;
+ sigdp->u.control.binp = NULL;
+ sigdp->u.control.command = cmd;
+ sigdp->u.control.bufp = buff;
+ sigdp->u.control.size = size;
+
+ return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control);
+}
ErtsPortOpResult
erts_port_control(Process* c_p,
@@ -4070,7 +4550,6 @@ erts_port_control(Process* c_p,
resp_size,
&resp_buf[0],
&hp,
- NULL,
&c_p->off_heap);
BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL);
return ERTS_PORT_OP_DONE;
@@ -4092,10 +4571,10 @@ erts_port_control(Process* c_p,
binp = NULL;
if (is_binary(data) && binary_bitoffset(data) == 0) {
- Eterm *ebinp = binary_val_rel(data, NULL);
+ Eterm *ebinp = binary_val(data);
ASSERT(!tmp_alloced);
if (*ebinp == HEADER_SUB_BIN)
- ebinp = binary_val_rel(((ErlSubBin *) ebinp)->orig, NULL);
+ ebinp = binary_val(((ErlSubBin *) ebinp)->orig);
if (*ebinp != HEADER_PROC_BIN)
copy = 1;
else {
@@ -4148,6 +4627,7 @@ call_driver_call(Eterm caller,
unsigned *ret_flagsp)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->call)
return ERTS_PORT_OP_BADARG;
@@ -4162,6 +4642,20 @@ call_driver_call(Eterm caller,
DTRACE5(driver_call, process_str, port_str, prt->name, command, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_call)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller,proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_call, proc_str, port_str, prt->name, command, size);
+ }
+#endif
+
+ if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
+ trace_port_receive(prt, caller, am_call, command, bufp, size);
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
prt->caller = caller;
cres = prt->drv_ptr->call((ErlDrvData) prt->drv_data,
@@ -4173,10 +4667,15 @@ call_driver_call(Eterm caller,
ret_flagsp);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres <= 0
|| ((byte) (*resp_bufp)[0]) != VERSION_MAGIC)
return ERTS_PORT_OP_BADARG;
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send_binary(prt, caller, am_call, *resp_bufp, cres);
+
*from_size = (ErlDrvSizeT) cres;
return ERTS_PORT_OP_DONE;
@@ -4229,22 +4728,15 @@ port_sig_call(Port *prt,
hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size);
if (hsz >= 0) {
- ErlHeapFragment* bp;
- ErlOffHeap* ohp;
ErtsHeapFactory factory;
byte *endp;
hsz += 3; /* ok tuple */
hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz);
endp = (byte *) resp_bufp;
- erts_factory_message_init(&factory, rp, hp, bp);
- msg = erts_decode_ext(&factory, &endp);
+ msg = erts_decode_ext(&factory, &endp, 0);
if (is_value(msg)) {
hp = erts_produce_heap(&factory,
3,
@@ -4252,10 +4744,11 @@ port_sig_call(Port *prt,
msg = TUPLE2(hp, am_ok, msg);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4267,7 +4760,7 @@ port_sig_call(Port *prt,
}
}
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -4363,7 +4856,7 @@ erts_port_call(Process* c_p,
hsz += 3;
erts_factory_proc_prealloc_init(&factory, c_p, hsz);
endp = (byte *) resp_bufp;
- term = erts_decode_ext(&factory, &endp);
+ term = erts_decode_ext(&factory, &endp, 0);
if (term == THE_NON_VALUE)
return ERTS_PORT_OP_BADARG;
hp = erts_produce_heap(&factory,3,0);
@@ -4482,7 +4975,7 @@ port_sig_info(Port *prt,
{
ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY);
if (op != ERTS_PROC2PORT_SIG_EXEC)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined, prt);
else {
Eterm *hp, *hp_start;
Uint hsz;
@@ -4504,12 +4997,15 @@ port_sig_info(Port *prt,
sigdp->u.info.item);
if (is_value(value)) {
ErtsHeapFactory factory;
- erts_factory_message_init(&factory, NULL, hp, bp);
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- value);
+ value,
+ prt);
}
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4592,8 +5088,8 @@ reply_io_bytes(void *vreq)
rp = erts_proc_lookup(req->pid);
if (rp) {
- ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks;
Eterm ref, msg, ein, eout, *hp;
Uint64 in, out;
@@ -4615,7 +5111,7 @@ reply_io_bytes(void *vreq)
erts_bld_uint64(NULL, &hsz, in);
erts_bld_uint64(NULL, &hsz, out);
- hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
ref = make_internal_ref(hp);
write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
@@ -4625,7 +5121,8 @@ reply_io_bytes(void *vreq)
eout = erts_bld_uint64(&hp, NULL, out);
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+
+ erts_queue_message(rp, rp_locks, mp, msg, am_system);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -4643,7 +5140,7 @@ erts_request_io_bytes(Process *c_p)
Uint *hp;
Eterm ref;
Uint32 *refn;
- ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p);
+ ErtsSchedulerData *esdp = erts_proc_sched_data(c_p);
ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ,
sizeof(ErtsIOBytesReq));
@@ -4731,6 +5228,10 @@ print_port_info(Port *p, int to, void *arg)
erts_print(to, arg, "Port is a file: %s\n",p->name);
} else if (p->drv_ptr == &spawn_driver) {
erts_print(to, arg, "Port controls external process: %s\n",p->name);
+#ifndef __WIN32__
+ } else if (p->drv_ptr == &forker_driver) {
+ erts_print(to, arg, "Port controls forker process: %s\n",p->name);
+#endif
} else {
erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name);
}
@@ -4877,6 +5378,7 @@ int get_port_flags(ErlDrvPort ix)
void erts_raw_port_command(Port* p, byte* buf, Uint len)
{
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
@@ -4897,9 +5399,11 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
DTRACE4(driver_output, "-raw-", port_str, p->name, len);
}
#endif
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
}
int async_ready(Port *p, void* data)
@@ -4911,14 +5415,25 @@ int async_ready(Port *p, void* data)
if (p) {
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->ready_async != NULL) {
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_ready_async, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_ready_async)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_ready_async, proc_str, port_str, p->name);
+ }
+#endif
(*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
need_free = 0;
+ ERTS_MSACC_POP_STATE_M();
}
erts_port_driver_callback_epilogue(p, NULL);
@@ -5070,14 +5585,15 @@ ErlDrvTermData driver_mk_term_nil(void)
void driver_report_exit(ErlDrvPort ix, int status)
{
Eterm* hp;
+ ErlOffHeap *ohp;
Eterm tuple;
Process *rp;
Eterm pid;
- ErlHeapFragment *bp = NULL;
- ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
Port* prt = erts_drvport2port(ix);
+ int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return;
@@ -5094,13 +5610,17 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (!rp)
return;
- hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(trace_send ? NULL : rp, &rp_locks, 3+3, &hp, &ohp);
tuple = TUPLE2(hp, am_exit_status, make_small(status));
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ if (IS_TRACED_FL(prt, F_TRACE_SEND))
+ trace_port_send(prt, pid, tuple, 1);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -5194,13 +5714,13 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp)
*/
static int
-driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
+driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
{
#define HEAP_EXTRA 200
#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
Uint need = 0;
int depth = 0;
- int res;
+ int res = 0;
ErlDrvTermData* ptr;
ErlDrvTermData* ptr_end;
DECLARE_ESTACK(stack);
@@ -5210,7 +5730,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ErtsProcLocks rp_locks = 0;
struct b2t_states__ b2t;
int scheduler;
- int is_heap_need_limited = 1;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_UNDEF(mess,NIL);
@@ -5249,25 +5768,17 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_sint64(NULL, &need, (Sint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_SSMALL((Sint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_uint64(NULL, &need, (Uint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_USMALL(0, (Uint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
@@ -5387,9 +5898,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
need += hsz;
ptr += 2;
depth++;
- if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */
- is_heap_need_limited = 0;
- }
break;
}
case ERL_DRV_MAP: { /* int */
@@ -5397,7 +5905,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]);
- is_heap_need_limited = 0;
} else {
need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
}
@@ -5432,20 +5939,25 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
? erts_proc_lookup(to)
: erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp) {
- res = 0;
- goto done;
- }
+ if (!prt || !IS_TRACED_FL(prt, F_TRACE_SEND))
+ goto done;
+ if (!erts_is_tracer_proc_enabled_send(NULL, 0, &prt->common))
+ goto done;
- /* Try copy directly to destination heap if we know there are no big maps */
- if (is_heap_need_limited) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
- Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- }
- else {
- erts_factory_message_init(&factory, NULL, NULL,
- new_message_buffer(need));
+ res = -2;
+
+ /* We allocate a temporary heap to be used to create
+ the message that may be sent using tracing */
+ erts_factory_tmp_init(&factory, erts_alloc(ERTS_ALC_T_DRIVER, need*sizeof(Eterm)),
+ need, ERTS_ALC_T_DRIVER);
+
+ } else {
+ /* We force the creation of a heap fragment (rp == NULL) when send
+ tracing so that we don't have the main lock of the process while
+ tracing */
+ Process *trace_rp = prt && IS_TRACED_FL(prt, F_TRACE_SEND) ? NULL : rp;
+ (void) erts_factory_message_create(&factory, trace_rp, &rp_locks, need);
+ res = 1;
}
/*
@@ -5466,10 +5978,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_SIZE(2));
- mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_SSMALL((Sint)ptr[0]))
mess = make_small((Sint)ptr[0]);
@@ -5477,15 +5985,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = small_to_big((Sint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
- mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_USMALL(0, (Uint)ptr[0]))
mess = make_small((Uint)ptr[0]);
@@ -5493,7 +5996,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = uint_to_big((Uint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
@@ -5719,22 +6221,45 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ESTACK_PUSH(stack, mess);
}
- res = 1;
-
done:
if (res > 0) {
+ Eterm from = am_undefined;
mess = ESTACK_POP(stack); /* get resulting value */
- erts_factory_close(&factory);
+ erts_factory_trim_and_close(&factory, &mess, 1);
+
+ if (prt) {
+ if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ trace_port_send(prt, to, mess, 1);
+ }
+ from = prt->common.id;
+ }
+
/* send message */
- erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined);
+ ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
+ erts_queue_message(rp, rp_locks, factory.message, mess, from);
+ }
+ else if (res == -2) {
+ /* this clause only happens when we were requested to
+ generate a send trace, but the process to send to
+ did not exist any more */
+ mess = ESTACK_POP(stack); /* get resulting value */
+
+ trace_port_send(prt, to, mess, 0);
+
+ erts_factory_trim_and_close(&factory, &mess, 1);
+ erts_free(ERTS_ALC_T_DRIVER, factory.hp_start);
+ res = 0;
}
else {
if (b2t.ix > b2t.used)
b2t.used = b2t.ix;
for (b2t.ix = 0; b2t.ix < b2t.used; b2t.ix++)
erts_binary2term_abort(&b2t.state[b2t.ix]);
- erts_factory_undo(&factory);
+ if (factory.mode != FACTORY_CLOSED) {
+ ERL_MESSAGE_TERM(factory.message) = am_undefined;
+ erts_factory_undo(&factory);
+ }
}
if (rp) {
if (rp_locks)
@@ -5750,7 +6275,8 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
}
static ERTS_INLINE int
-deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
+deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p,
+ Port **trace_prt)
{
#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
@@ -5778,8 +6304,11 @@ deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
erts_thr_progress_unmanaged_continue(dhndl);
ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
- }
+ } else
#endif
+ {
+ *trace_prt = prt;
+ }
ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED
? erts_lc_is_port_locked(prt)
: !erts_lc_is_port_locked(prt));
@@ -5790,10 +6319,11 @@ int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len)
{
/* May be called from arbitrary thread */
Eterm connected;
- int res = deliver_term_check_port(port_id, &connected);
+ Port *prt = NULL;
+ int res = deliver_term_check_port(port_id, &connected, &prt);
if (res <= 0)
return res;
- return driver_deliver_term(connected, data, len);
+ return driver_deliver_term(prt, connected, data, len);
}
/*
@@ -5817,7 +6347,7 @@ driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
- return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len);
+ return driver_deliver_term(prt, ERTS_PORT_GET_CONNECTED(prt), data, len);
}
int erl_drv_send_term(ErlDrvTermData port_id,
@@ -5826,10 +6356,11 @@ int erl_drv_send_term(ErlDrvTermData port_id,
int len)
{
/* May be called from arbitrary thread */
- int res = deliver_term_check_port(port_id, NULL);
+ Port *prt = NULL;
+ int res = deliver_term_check_port(port_id, NULL, &prt);
if (res <= 0)
return res;
- return driver_deliver_term(to, data, len);
+ return driver_deliver_term(prt, to, data, len);
}
/*
@@ -5848,20 +6379,21 @@ driver_send_term(ErlDrvPort drvport,
* to make this access safe without using a less efficient
* internal data representation for ErlDrvPort.
*/
+ Port* prt = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
#ifdef ERTS_SMP
if (erts_thr_progress_is_managed_thread())
#endif
{
erts_aint32_t state;
- Port* prt = erts_drvport2port_state(drvport, &state);
+ prt = erts_drvport2port_state(drvport, &state);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1; /* invalid (dead) */
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
}
- return driver_deliver_term(to, data, len);
+ return driver_deliver_term(prt, to, data, len);
}
@@ -6832,7 +7364,7 @@ int driver_monitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6843,16 +7375,6 @@ int driver_monitor_process(ErlDrvPort drvport,
/* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_monitor_process(prt,buf,process,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6905,7 +7427,7 @@ int driver_demonitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6916,15 +7438,6 @@ int driver_demonitor_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_demonitor_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6960,7 +7473,7 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
{
Port *prt;
ErlDrvTermData ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6971,16 +7484,6 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_get_monitored_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -7004,6 +7507,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
ErlDrvMonitor drv_monitor;
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(prt->drv_ptr != NULL);
@@ -7015,6 +7519,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
callback = prt->drv_ptr->process_exit;
ASSERT(callback != NULL);
ref_to_driver_monitor(ref,&drv_monitor);
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_process_exit)) {
@@ -7022,10 +7527,20 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DTRACE3(driver_process_exit, process_str, port_str, prt->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_process_exit)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(prt), proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_process_exit, proc_str, port_str, prt->name);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
DRV_MONITOR_LOCK_PDL(prt);
+ ERTS_MSACC_POP_STATE_M();
/* remove monitor *after* callback */
rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref);
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -7046,12 +7561,15 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->async_open_port)
+ init_ack_send_reply(prt, prt->common.id);
if (eof)
flush_linebuf_messages(prt, state);
if (state & ERTS_PORT_SFLG_CLOSING) {
terminate_port(prt);
} else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) {
- deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
+ deliver_result(prt, prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
} else {
/* XXX UGLY WORK AROUND, Let erts_deliver_port_exit() terminate the port */
if (prt->port_data_lock)
@@ -7059,7 +7577,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
prt->ioq.size = 0;
if (prt->port_data_lock)
driver_pdl_unlock(prt->port_data_lock);
- erts_deliver_port_exit(prt, prt->common.id, eof ? am_normal : term, 0);
+ erts_deliver_port_exit(prt, prt->common.id, eof ? am_normal : term, 0, 0);
}
return 0;
}
@@ -7363,11 +7881,13 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
* (driver version 3.1, NIF version 2.7)
*/
if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) {
-#if defined(ERL_NIF_DIRTY_SCHEDULER_SUPPORT) && defined(USE_THREADS)
- sip->dirty_scheduler_support = 1;
+ sip->dirty_scheduler_support =
+#ifdef ERTS_DIRTY_SCHEDULERS
+ 1
#else
- sip->dirty_scheduler_support = 0;
+ 0
#endif
+ ;
}
}
@@ -7495,6 +8015,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
int fpe_was_unmasked = erts_block_fpe();
DTRACE4(driver_init, drv->name, drv->version.major, drv->version.minor,
drv->flags);
+ LTTNG4(driver_init, drv->name, drv->version.major, drv->version.minor,
+ drv->flags);
res = (*de->init)();
erts_unblock_fpe(fpe_was_unmasked);
return res;