aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c708
1 files changed, 546 insertions, 162 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 81a05a6b87..de73da8e22 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2014. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2016. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,16 +47,18 @@
#define ERTS_WANT_EXTERNAL_TAGS
#include "external.h"
#include "dtrace-wrapper.h"
+#include "lttng-wrapper.h"
#include "erl_map.h"
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
#include "erl_time.h"
extern ErlDrvEntry fd_driver_entry;
-#ifndef __OSE__
extern ErlDrvEntry vanilla_driver_entry;
-#endif
extern ErlDrvEntry spawn_driver_entry;
+#ifndef __WIN32__
+extern ErlDrvEntry forker_driver_entry;
+#endif
extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */
@@ -74,6 +76,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}};
erts_driver_t vanilla_driver;
erts_driver_t spawn_driver;
+#ifndef __WIN32__
+erts_driver_t forker_driver;
+#endif
erts_driver_t fd_driver;
int erts_port_synchronous_ops = 0;
@@ -87,6 +92,7 @@ static void deliver_result(Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
+static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof);
#ifdef ERTS_SMP
static void driver_monitor_lock_pdl(Port *p);
static void driver_monitor_unlock_pdl(Port *p);
@@ -308,12 +314,9 @@ static Port *create_port(char *name,
size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
-#ifdef DEBUG
- /* Make sure the debug flags survives until port is freed */
- state |= ERTS_PORT_SFLG_PORT_DEBUG;
-#endif
#ifdef ERTS_SMP
+ ErtsRunQueue *runq;
if (!driver_lock) {
/* Align size for mutex following port struct */
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
@@ -323,6 +326,12 @@ static Port *create_port(char *name,
#endif
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ state |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
+
+
busy_port_queue_size
= ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
? 0
@@ -358,8 +367,12 @@ static Port *create_port(char *name,
p += sizeof(erts_mtx_t);
state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
- erts_smp_atomic_set_nob(&prt->run_queue,
- (erts_aint_t) erts_get_runq_current(NULL));
+ if (erts_get_scheduler_data())
+ runq = erts_get_runq_current(NULL);
+ else
+ runq = ERTS_RUNQ_IX(0);
+ erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
+
prt->xports = NULL;
#else
erts_atomic32_init_nob(&prt->refc, 1);
@@ -385,7 +398,8 @@ static Port *create_port(char *name,
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
- prt->psd = NULL;
+ erts_smp_atomic_init_nob(&prt->psd, (erts_aint_t) NULL);
+ prt->async_open_port = NULL;
prt->drv_data = (SWord) 0;
prt->os_pid = -1;
@@ -467,6 +481,11 @@ erts_port_free(Port *prt)
erts_port_task_fini_sched(&prt->sched);
+ if (prt->async_open_port) {
+ erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port);
+ prt->async_open_port = NULL;
+ }
+
#ifdef ERTS_SMP
ASSERT(prt->lock);
if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
@@ -688,6 +707,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
error_number = error_type = 0;
if (driver->start) {
+ ERTS_MSACC_PUSH_STATE_M();
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_in, am_start);
}
@@ -698,6 +718,19 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
DTRACE3(driver_start, process_str, driver->name, port_str);
}
#endif
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_start)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(pid, proc_str);
+ lttng_port_to_str(port, port_str);
+ LTTNG3(driver_start, proc_str, driver->name, port_str);
+ }
+#endif
+
fpe_was_unmasked = erts_block_fpe();
drv_data = (*driver->start)(ERTS_Port2ErlDrvPort(port), name, opts);
if (((SWord) drv_data) == -1)
@@ -717,6 +750,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_out, am_start);
@@ -1106,7 +1140,7 @@ io_list_vec_len(Eterm obj, int* vsize, Uint* csize,
Uint p_v_size = 0;
Uint p_c_size = 0;
Uint p_in_clist = 0;
- Uint total; /* Uint due to halfword emulator */
+ Uint total;
goto L_jump_start; /* avoid a push */
@@ -1413,7 +1447,7 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL);
+ erts_queue_message(rp, rp_locksp, factory->message, msg, NIL);
}
static void
@@ -1421,12 +1455,9 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
ErtsHeapFactory factory;
Eterm msg_copy;
Uint hsz, msg_sz;
- Eterm *hp;
ErtsProcLocks rp_locks = 0;
hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
@@ -1437,18 +1468,13 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
hsz += msg_sz;
}
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- if (is_immed(msg))
- msg_copy = msg;
- else {
- msg_copy = copy_struct(msg, msg_sz, &hp, ohp);
- factory.hp = hp;
- }
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
+ msg_copy = (is_immed(msg)
+ ? msg
+ : copy_struct(msg, msg_sz,
+ &factory.hp,
+ factory.off_heap));
queue_port_sched_op_reply(rp,
&rp_locks,
@@ -1529,13 +1555,44 @@ erts_schedule_proc2port_signal(Process *c_p,
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
if (sched_res != 0) {
- if (refp)
+ if (refp) {
+ /*
+ * We need to restore the message queue save
+ * pointer to the beginning of the message queue
+ * since the caller now wont wait for a message
+ * containing the reference created above...
+ */
+ ASSERT(c_p);
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
+ JOIN_MESSAGE(c_p);
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
*refp = NIL;
+ }
return ERTS_PORT_OP_DROPPED;
}
return ERTS_PORT_OP_SCHEDULED;
}
+static int
+erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp,
+ int task_flags,
+ ErtsProc2PortSigCallback callback)
+{
+ Port *prt = erts_port_lookup_raw(port_num);
+
+ if (!prt)
+ return -1;
+
+ sigdp->caller = ERTS_INVALID_PID;
+
+ return erts_port_task_schedule(prt->common.id,
+ NULL,
+ ERTS_PORT_TASK_PROC_SIG,
+ sigdp,
+ callback,
+ task_flags);
+}
+
static ERTS_INLINE void
send_badsig(Port *prt) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
@@ -1680,6 +1737,7 @@ call_driver_outputv(int bang_op,
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErlDrvSizeT size = evp->size;
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1690,6 +1748,15 @@ call_driver_outputv(int bang_op,
DTRACE4(driver_outputv, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_outputv)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_outputv, proc_str, port_str, prt->name, size);
+ }
+#endif
prt->caller = caller;
(*drv->outputv)((ErlDrvData) prt->drv_data, evp);
@@ -1700,6 +1767,8 @@ call_driver_outputv(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -1713,7 +1782,6 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp)
driver_free_binary(ev->binv[i]);
if (cbinp)
driver_free_binary(cbinp);
- erts_free(ERTS_ALC_T_DRV_CMD_DATA, ev);
}
static int
@@ -1780,6 +1848,7 @@ call_driver_output(int bang_op,
send_badsig(prt);
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1789,6 +1858,15 @@ call_driver_output(int bang_op,
DTRACE4(driver_output, process_str, port_str, prt->name, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_output)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG4(driver_output, proc_str, port_str, prt->name, size);
+ }
+#endif
prt->caller = caller;
(*drv->output)((ErlDrvData) prt->drv_data, bufp, size);
@@ -1799,6 +1877,8 @@ call_driver_output(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -1848,6 +1928,188 @@ port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
return ERTS_PORT_REDS_CMD_OUTPUT;
}
+
+/*
+ * This erts_port_output will always create a port task.
+ * The call is treated as a port_command call, i.e. no
+ * badsig i generated if the input in invalid. However
+ * an error_logger message is generated.
+ */
+int
+erts_port_output_async(Port *prt, Eterm from, Eterm list)
+{
+
+ ErtsPortOpResult res;
+ ErtsProc2PortSigData *sigdp;
+ erts_driver_t *drv = prt->drv_ptr;
+ size_t size;
+ int task_flags;
+ ErtsProc2PortSigCallback port_sig_callback;
+ ErlDrvBinary *cbin = NULL;
+ ErlIOVec *evp = NULL;
+ char *buf = NULL;
+ ErtsPortTaskHandle *ns_pthp;
+
+ if (drv->outputv) {
+ ErlIOVec ev;
+ SysIOVec* ivp;
+ ErlDrvBinary** bvp;
+ int vsize;
+ Uint csize;
+ Uint pvsize;
+ Uint pcsize;
+ size_t iov_offset, binv_offset, alloc_size;
+ Uint blimit = 0;
+ char *ptr;
+ int i;
+
+ Eterm* bptr = NULL;
+ Uint offset;
+
+ if (is_binary(list)) {
+ /* We optimize for when we get a procbin without offset */
+ Eterm real_bin;
+ int bitoffs;
+ int bitsize;
+ ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize);
+ bptr = binary_val(real_bin);
+ if (*bptr == HEADER_PROC_BIN && bitoffs == 0) {
+ size = binary_size(list);
+ vsize = 1;
+ } else
+ bptr = NULL;
+ }
+
+ if (!bptr) {
+ if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
+ goto bad_value;
+
+ /* To pack or not to pack (small binaries) ...? */
+ if (vsize >= SMALL_WRITE_VEC) {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ cbin = driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ }
+
+
+ iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
+ binv_offset = iov_offset;
+ binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
+ alloc_size = binv_offset;
+ alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr);
+
+ evp = (ErlIOVec *) ptr;
+ ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+
+ ivp[0].iov_base = NULL;
+ ivp[0].iov_len = 0;
+ bvp[0] = NULL;
+
+ if (bptr) {
+ ProcBin* pb = (ProcBin *) bptr;
+
+ ivp[1].iov_base = pb->bytes+offset;
+ ivp[1].iov_len = size;
+ bvp[1] = Binary2ErlDrvBinary(pb->val);
+
+ evp->vsize = 1;
+ } else {
+
+ evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
+ if (evp->vsize < 0) {
+ if (evp != &ev)
+ erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
+ driver_free_binary(cbin);
+ goto bad_value;
+ }
+ }
+#if 0
+ /* This assertion may say something useful, but it can
+ be falsified during the emulator test suites. */
+ ASSERT(evp->vsize == vsize);
+#endif
+ evp->vsize++;
+ evp->size = size; /* total size */
+
+ /* Need to increase refc on all binaries */
+ for (i = 1; i < evp->vsize; i++)
+ if (bvp[i])
+ driver_binary_inc_refc(bvp[i]);
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
+ sigdp->u.outputv.from = from;
+ sigdp->u.outputv.evp = evp;
+ sigdp->u.outputv.cbinp = cbin;
+ port_sig_callback = port_sig_outputv;
+ } else {
+ ErlDrvSizeT ERTS_DECLARE_DUMMY(r);
+
+ /*
+ * Apperently there exist code that write 1 byte to
+ * much in buffer. Where it resides I don't know, but
+ * we can live with one byte extra allocated...
+ */
+
+ if (erts_iolist_size(list, &size))
+ goto bad_value;
+
+ buf = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, size + 1);
+
+ r = erts_iolist_to_buf(list, buf, size);
+ ASSERT(ERTS_IOLIST_TO_BUF_SUCCEEDED(r));
+
+ sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
+ sigdp->u.output.from = from;
+ sigdp->u.output.bufp = buf;
+ sigdp->u.output.size = size;
+ port_sig_callback = port_sig_output;
+ }
+ sigdp->flags = 0;
+ ns_pthp = NULL;
+ task_flags = 0;
+
+ res = erts_schedule_proc2port_signal(NULL,
+ prt,
+ ERTS_INVALID_PID,
+ NULL,
+ sigdp,
+ task_flags,
+ ns_pthp,
+ port_sig_callback);
+
+ if (res != ERTS_PORT_OP_SCHEDULED) {
+ if (drv->outputv)
+ cleanup_scheduled_outputv(evp, cbin);
+ else
+ cleanup_scheduled_output(buf);
+ return 1;
+ }
+ return 1;
+
+bad_value:
+
+ /*
+ * We call badsig directly here as this function is called with
+ * the main lock of the calling process still held.
+ * At the moment this operation is always not a bang_op, so
+ * only an error_logger message should be generated, no badsig.
+ */
+
+ badsig_received(0, prt, erts_atomic32_read_nob(&prt->state), 1);
+
+ return 0;
+
+}
+
ErtsPortOpResult
erts_port_output(Process *c_p,
int flags,
@@ -1857,7 +2119,7 @@ erts_port_output(Process *c_p,
Eterm *refp)
{
ErtsPortOpResult res;
- ErtsProc2PortSigData *sigdp;
+ ErtsProc2PortSigData *sigdp = NULL;
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
@@ -1910,7 +2172,6 @@ erts_port_output(Process *c_p,
DTRACE4(port_command, process_str, port_str, prt->name, "command");
}
#endif
-
if (drv->outputv) {
ErlIOVec ev;
SysIOVec iv[SMALL_WRITE_VEC];
@@ -1939,10 +2200,13 @@ erts_port_output(Process *c_p,
evp = &ev;
}
else {
- char *ptr = erts_alloc((try_call
- ? ERTS_ALC_T_TMP
- : ERTS_ALC_T_DRV_CMD_DATA), alloc_size);
-
+ char *ptr;
+ if (try_call) {
+ ptr = erts_alloc(ERTS_ALC_T_TMP, alloc_size);
+ } else {
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&ptr);
+ }
evp = (ErlIOVec *) ptr;
ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
@@ -1971,9 +2235,12 @@ erts_port_output(Process *c_p,
bvp[0] = NULL;
evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
if (evp->vsize < 0) {
- if (evp != &ev)
- erts_free(try_call ? ERTS_ALC_T_TMP : ERTS_ALC_T_DRV_CMD_DATA,
- evp);
+ if (evp != &ev) {
+ if (try_call)
+ erts_free(ERTS_ALC_T_TMP, evp);
+ else
+ erts_port_task_free_p2p_sig_data(sigdp);
+ }
driver_free_binary(cbin);
goto bad_value;
}
@@ -2025,8 +2292,10 @@ erts_port_output(Process *c_p,
/* Fall through... */
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
if (try_call_res != ERTS_TRY_IMM_DRV_CALL_OK)
return ERTS_PORT_OP_DROPPED;
if (c_p)
@@ -2037,8 +2306,10 @@ erts_port_output(Process *c_p,
if (async_nosuspend
&& (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
driver_free_binary(cbin);
- if (evp != &ev)
+ if (evp != &ev) {
+ ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
+ }
return ((sched_flags & ERTS_PTS_FLG_EXIT)
? ERTS_PORT_OP_DROPPED
: ERTS_PORT_OP_BUSY);
@@ -2053,9 +2324,16 @@ erts_port_output(Process *c_p,
if (bvp[i])
driver_binary_inc_refc(bvp[i]);
- new_evp = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, alloc_size);
+ /* The port task and iovec is allocated in the
+ same structure as an optimization. This
+ is especially important in erts_port_output_async
+ of when !try_call */
+ ASSERT(sigdp == NULL);
+ sigdp = erts_port_task_alloc_p2p_sig_data_extra(
+ alloc_size, (void**)&new_evp);
if (evp != &ev) {
+ /* Copy from TMP alloc to port task */
sys_memcpy((void *) new_evp, (void *) evp, alloc_size);
new_evp->iov = (SysIOVec *) (((char *) new_evp)
+ iov_offset);
@@ -2103,7 +2381,6 @@ erts_port_output(Process *c_p,
evp = new_evp;
}
- sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
@@ -2371,6 +2648,11 @@ erts_port_exit(Process *c_p,
| ERTS_PORT_SIG_FLG_BROKEN_LINK
| ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0);
+#ifndef __WIN32__
+ if (prt->drv_ptr == &forker_driver)
+ return ERTS_PORT_OP_DROPPED;
+#endif
+
if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) {
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p,
@@ -2735,6 +3017,72 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
port_sig_link);
}
+static void
+init_ack_send_reply(Port *port, Eterm resp)
+{
+
+ if (!is_internal_port(resp)) {
+ Process *rp = erts_proc_lookup_raw(port->async_open_port->to);
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to);
+ erts_remove_link(&ERTS_P_LINKS(rp), port->common.id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ }
+ port_sched_op_reply(port->async_open_port->to,
+ port->async_open_port->ref,
+ resp);
+
+ erts_free(ERTS_ALC_T_PRTSD, port->async_open_port);
+ port->async_open_port = NULL;
+}
+
+void
+erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) {
+ Port *port = erts_drvport2port(ix);
+ SWord err_type = (SWord)res;
+ Eterm resp;
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port)
+ return;
+
+ if (port->async_open_port) {
+ switch(err_type) {
+ case -3:
+ resp = am_badarg;
+ break;
+ case -2: {
+ char *str = erl_errno_id(errno);
+ resp = erts_atom_put((byte *) str, strlen(str),
+ ERTS_ATOM_ENC_LATIN1, 1);
+ break;
+ }
+ case -1:
+ resp = am_einval;
+ break;
+ default:
+ resp = port->common.id;
+ break;
+ }
+
+ init_ack_send_reply(port, resp);
+
+ if (err_type == -1 || err_type == -2 || err_type == -3)
+ driver_failure_term(ix, am_normal, 0);
+ port->drv_data = err_type;
+ }
+}
+
+void
+erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) {
+ Port *port = erts_drvport2port(ix);
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT)
+ return;
+
+ port->os_pid = (SWord)pid;
+
+}
+
void erts_init_io(int port_tab_size,
int port_tab_size_ignore_files,
int legacy_port_tab)
@@ -2795,10 +3143,11 @@ void erts_init_io(int port_tab_size,
erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
init_driver(&fd_driver, &fd_driver_entry, NULL);
-#ifndef __OSE__
init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
-#endif
init_driver(&spawn_driver, &spawn_driver_entry, NULL);
+#ifndef __WIN32__
+ init_driver(&forker_driver, &forker_driver_entry, NULL);
+#endif
erts_init_static_drivers();
for (dp = driver_tab; *dp != NULL; dp++)
erts_add_driver_entry(*dp, NULL, 1);
@@ -2860,6 +3209,9 @@ void erts_lcnt_enable_io_lock_count(int enable) {
lcnt_enable_drv_lock_count(&vanilla_driver, enable);
lcnt_enable_drv_lock_count(&spawn_driver, enable);
+#ifndef __WIN32__
+ lcnt_enable_drv_lock_count(&forker_driver, enable);
+#endif
lcnt_enable_drv_lock_count(&fd_driver, enable);
/* enable lock counting in all drivers */
for (dp = driver_list; dp; dp = dp->next) {
@@ -3055,16 +3407,17 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
if (rp) {
Eterm tuple;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
Eterm* hp;
Uint sz_res;
sz_res = size_object(res);
- hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks,
+ sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_queue_message(rp, &rp_locks, mp, tuple, NIL);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3092,7 +3445,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3118,7 +3471,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (!rp)
return;
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
@@ -3160,7 +3513,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3234,7 +3587,7 @@ deliver_vec_message(Port* prt, /* Port */
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3266,7 +3619,7 @@ deliver_vec_message(Port* prt, /* Port */
need += (hlen+csize)*2;
}
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
iov += vsize;
@@ -3327,7 +3680,7 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3367,18 +3720,32 @@ static void flush_port(Port *p)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->flush != NULL) {
+ ERTS_MSACC_PUSH_STATE_M();
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_flush)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_flush, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_flush)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_flush, proc_str, port_str, p->name);
+ }
+#endif
+
+
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_in, am_flush);
}
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->flush)((ErlDrvData)p->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_out, am_flush);
}
@@ -3402,6 +3769,7 @@ terminate_port(Port *prt)
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
erts_aint32_t state;
+ ErtsPrtSD *psd;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -3426,14 +3794,26 @@ terminate_port(Port *prt)
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
int fpe_was_unmasked = erts_block_fpe();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_stop)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt)
DTRACE3(driver_stop, process_str, drv->name, port_str);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_stop)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(connected_id, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_stop, proc_str, drv->name, port_str);
+ }
+#endif
+
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
#ifdef ERTS_SMP
if (prt->xports)
erts_port_handle_xports(prt);
@@ -3453,8 +3833,9 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
- if (prt->psd)
- erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+ psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ if (psd)
+ erts_free(ERTS_ALC_T_PRTSD, psd);
ASSERT(prt->dist_entry == NULL);
@@ -3744,6 +4125,7 @@ call_driver_control(Eterm caller,
ErlDrvSizeT *from_size)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->control)
return ERTS_PORT_OP_BADARG;
@@ -3757,6 +4139,18 @@ call_driver_control(Eterm caller,
command, size);
}
#endif
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_control)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller, proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_control, proc_str, port_str, prt->name, command, size);
+ }
+#endif
prt->caller = caller;
cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data,
@@ -3767,6 +4161,8 @@ call_driver_control(Eterm caller,
*from_size);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres < 0)
return ERTS_PORT_OP_BADARG;
@@ -3818,7 +4214,6 @@ write_port_control_result(int control_flags,
ErlDrvSizeT resp_size,
char *pre_alloc_buf,
Eterm **hpp,
- ErlHeapFragment *bp,
ErlOffHeap *ohp)
{
Eterm res;
@@ -3892,16 +4287,13 @@ port_sig_control(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
- Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
ErtsHeapFactory factory;
Process *rp;
ErtsProcLocks rp_locks = 0;
Uint hsz, rsz;
int control_flags;
- rp = erts_proc_lookup_raw(sigdp->caller);
+ rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller);
if (!rp)
goto done;
@@ -3914,22 +4306,15 @@ port_sig_control(Port *prt,
hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
msg = write_port_control_result(control_flags,
resp_bufp,
resp_size,
&resp_buf[0],
- &hp,
- bp,
- ohp);
- factory.hp = hp;
-
+ &factory.hp,
+ factory.off_heap);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -3944,7 +4329,8 @@ port_sig_control(Port *prt,
/* failure */
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ if (sigdp->caller != ERTS_INVALID_PID)
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
done:
@@ -3954,6 +4340,23 @@ done:
return ERTS_PORT_REDS_CONTROL;
}
+/*
+ * This is an asynchronous control call. I.e. it will not return anything
+ * to the caller.
+ */
+int
+erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size)
+{
+ ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data();
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY;
+ sigdp->u.control.binp = NULL;
+ sigdp->u.control.command = cmd;
+ sigdp->u.control.bufp = buff;
+ sigdp->u.control.size = size;
+
+ return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control);
+}
ErtsPortOpResult
erts_port_control(Process* c_p,
@@ -4070,7 +4473,6 @@ erts_port_control(Process* c_p,
resp_size,
&resp_buf[0],
&hp,
- NULL,
&c_p->off_heap);
BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL);
return ERTS_PORT_OP_DONE;
@@ -4092,10 +4494,10 @@ erts_port_control(Process* c_p,
binp = NULL;
if (is_binary(data) && binary_bitoffset(data) == 0) {
- Eterm *ebinp = binary_val_rel(data, NULL);
+ Eterm *ebinp = binary_val(data);
ASSERT(!tmp_alloced);
if (*ebinp == HEADER_SUB_BIN)
- ebinp = binary_val_rel(((ErlSubBin *) ebinp)->orig, NULL);
+ ebinp = binary_val(((ErlSubBin *) ebinp)->orig);
if (*ebinp != HEADER_PROC_BIN)
copy = 1;
else {
@@ -4148,6 +4550,7 @@ call_driver_call(Eterm caller,
unsigned *ret_flagsp)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->call)
return ERTS_PORT_OP_BADARG;
@@ -4162,6 +4565,17 @@ call_driver_call(Eterm caller,
DTRACE5(driver_call, process_str, port_str, prt->name, command, size);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_call)) {
+ lttng_decl_procbuf(proc_str);
+ lttng_decl_portbuf(port_str);
+ lttng_pid_to_str(caller,proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG5(driver_call, proc_str, port_str, prt->name, command, size);
+ }
+#endif
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
prt->caller = caller;
cres = prt->drv_ptr->call((ErlDrvData) prt->drv_data,
@@ -4173,6 +4587,8 @@ call_driver_call(Eterm caller,
ret_flagsp);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres <= 0
|| ((byte) (*resp_bufp)[0]) != VERSION_MAGIC)
return ERTS_PORT_OP_BADARG;
@@ -4229,22 +4645,15 @@ port_sig_call(Port *prt,
hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size);
if (hsz >= 0) {
- ErlHeapFragment* bp;
- ErlOffHeap* ohp;
ErtsHeapFactory factory;
byte *endp;
hsz += 3; /* ok tuple */
hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz);
endp = (byte *) resp_bufp;
- erts_factory_message_init(&factory, rp, hp, bp);
- msg = erts_decode_ext(&factory, &endp);
+ msg = erts_decode_ext(&factory, &endp, 0);
if (is_value(msg)) {
hp = erts_produce_heap(&factory,
3,
@@ -4363,7 +4772,7 @@ erts_port_call(Process* c_p,
hsz += 3;
erts_factory_proc_prealloc_init(&factory, c_p, hsz);
endp = (byte *) resp_bufp;
- term = erts_decode_ext(&factory, &endp);
+ term = erts_decode_ext(&factory, &endp, 0);
if (term == THE_NON_VALUE)
return ERTS_PORT_OP_BADARG;
hp = erts_produce_heap(&factory,3,0);
@@ -4504,7 +4913,9 @@ port_sig_info(Port *prt,
sigdp->u.info.item);
if (is_value(value)) {
ErtsHeapFactory factory;
- erts_factory_message_init(&factory, NULL, hp, bp);
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -4592,8 +5003,8 @@ reply_io_bytes(void *vreq)
rp = erts_proc_lookup(req->pid);
if (rp) {
- ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks;
Eterm ref, msg, ein, eout, *hp;
Uint64 in, out;
@@ -4615,7 +5026,7 @@ reply_io_bytes(void *vreq)
erts_bld_uint64(NULL, &hsz, in);
erts_bld_uint64(NULL, &hsz, out);
- hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
ref = make_internal_ref(hp);
write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
@@ -4625,7 +5036,7 @@ reply_io_bytes(void *vreq)
eout = erts_bld_uint64(&hp, NULL, out);
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -4731,6 +5142,10 @@ print_port_info(Port *p, int to, void *arg)
erts_print(to, arg, "Port is a file: %s\n",p->name);
} else if (p->drv_ptr == &spawn_driver) {
erts_print(to, arg, "Port controls external process: %s\n",p->name);
+#ifndef __WIN32__
+ } else if (p->drv_ptr == &forker_driver) {
+ erts_print(to, arg, "Port controls forker process: %s\n",p->name);
+#endif
} else {
erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name);
}
@@ -4877,6 +5292,7 @@ int get_port_flags(ErlDrvPort ix)
void erts_raw_port_command(Port* p, byte* buf, Uint len)
{
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
@@ -4897,9 +5313,11 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
DTRACE4(driver_output, "-raw-", port_str, p->name, len);
}
#endif
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
}
int async_ready(Port *p, void* data)
@@ -4911,14 +5329,25 @@ int async_ready(Port *p, void* data)
if (p) {
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->ready_async != NULL) {
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_ready_async, process_str, port_str, p->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_ready_async)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(p), proc_str);
+ lttng_port_to_str(p, port_str);
+ LTTNG3(driver_ready_async, proc_str, port_str, p->name);
+ }
+#endif
(*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
need_free = 0;
+ ERTS_MSACC_POP_STATE_M();
}
erts_port_driver_callback_epilogue(p, NULL);
@@ -5070,11 +5499,11 @@ ErlDrvTermData driver_mk_term_nil(void)
void driver_report_exit(ErlDrvPort ix, int status)
{
Eterm* hp;
+ ErlOffHeap *ohp;
Eterm tuple;
Process *rp;
Eterm pid;
- ErlHeapFragment *bp = NULL;
- ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
Port* prt = erts_drvport2port(ix);
@@ -5094,13 +5523,13 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (!rp)
return;
- hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp);
tuple = TUPLE2(hp, am_exit_status, make_small(status));
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -5210,7 +5639,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ErtsProcLocks rp_locks = 0;
struct b2t_states__ b2t;
int scheduler;
- int is_heap_need_limited = 1;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_UNDEF(mess,NIL);
@@ -5249,25 +5677,17 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_sint64(NULL, &need, (Sint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_SSMALL((Sint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_uint64(NULL, &need, (Uint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_USMALL(0, (Uint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
@@ -5387,9 +5807,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
need += hsz;
ptr += 2;
depth++;
- if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */
- is_heap_need_limited = 0;
- }
break;
}
case ERL_DRV_MAP: { /* int */
@@ -5397,7 +5814,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]);
- is_heap_need_limited = 0;
} else {
need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
}
@@ -5436,17 +5852,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
goto done;
}
- /* Try copy directly to destination heap if we know there are no big maps */
- if (is_heap_need_limited) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
- Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- }
- else {
- erts_factory_message_init(&factory, NULL, NULL,
- new_message_buffer(need));
- }
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, need);
/*
* Interpret the instructions and build the term.
@@ -5466,10 +5872,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_SIZE(2));
- mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_SSMALL((Sint)ptr[0]))
mess = make_small((Sint)ptr[0]);
@@ -5477,15 +5879,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = small_to_big((Sint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
- mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_USMALL(0, (Uint)ptr[0]))
mess = make_small((Uint)ptr[0]);
@@ -5493,7 +5890,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = uint_to_big((Uint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
@@ -5725,9 +6121,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if (res > 0) {
mess = ESTACK_POP(stack); /* get resulting value */
- erts_factory_close(&factory);
+ erts_factory_trim_and_close(&factory, &mess, 1);
/* send message */
- erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined);
+ erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined);
}
else {
if (b2t.ix > b2t.used)
@@ -6832,7 +7228,7 @@ int driver_monitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6843,16 +7239,6 @@ int driver_monitor_process(ErlDrvPort drvport,
/* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_monitor_process(prt,buf,process,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6905,7 +7291,7 @@ int driver_demonitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6916,15 +7302,6 @@ int driver_demonitor_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_demonitor_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6960,7 +7337,7 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
{
Port *prt;
ErlDrvTermData ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6971,16 +7348,6 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_get_monitored_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -7004,6 +7371,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
ErlDrvMonitor drv_monitor;
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(prt->drv_ptr != NULL);
@@ -7015,6 +7383,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
callback = prt->drv_ptr->process_exit;
ASSERT(callback != NULL);
ref_to_driver_monitor(ref,&drv_monitor);
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_process_exit)) {
@@ -7022,10 +7391,20 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DTRACE3(driver_process_exit, process_str, port_str, prt->name);
}
#endif
+#ifdef USE_LTTNG_VM_TRACEPOINTS
+ if (LTTNG_ENABLED(driver_process_exit)) {
+ lttng_decl_portbuf(port_str);
+ lttng_decl_procbuf(proc_str);
+ lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(prt), proc_str);
+ lttng_port_to_str(prt, port_str);
+ LTTNG3(driver_process_exit, proc_str, port_str, prt->name);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
DRV_MONITOR_LOCK_PDL(prt);
+ ERTS_MSACC_POP_STATE_M();
/* remove monitor *after* callback */
rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref);
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -7046,6 +7425,9 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->async_open_port)
+ init_ack_send_reply(prt, prt->common.id);
if (eof)
flush_linebuf_messages(prt, state);
if (state & ERTS_PORT_SFLG_CLOSING) {
@@ -7495,6 +7877,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
int fpe_was_unmasked = erts_block_fpe();
DTRACE4(driver_init, drv->name, drv->version.major, drv->version.minor,
drv->flags);
+ LTTNG4(driver_init, drv->name, drv->version.major, drv->version.minor,
+ drv->flags);
res = (*de->init)();
erts_unblock_fpe(fpe_was_unmasked);
return res;