aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c389
1 files changed, 248 insertions, 141 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index e8b25d42a2..797e4cdadc 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -50,12 +50,14 @@
#include "erl_map.h"
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
+#include "erl_time.h"
extern ErlDrvEntry fd_driver_entry;
-#ifndef __OSE__
extern ErlDrvEntry vanilla_driver_entry;
-#endif
extern ErlDrvEntry spawn_driver_entry;
+#ifndef __WIN32__
+extern ErlDrvEntry forker_driver_entry;
+#endif
extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */
@@ -73,6 +75,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}};
erts_driver_t vanilla_driver;
erts_driver_t spawn_driver;
+#ifndef __WIN32__
+erts_driver_t forker_driver;
+#endif
erts_driver_t fd_driver;
int erts_port_synchronous_ops = 0;
@@ -86,6 +91,7 @@ static void deliver_result(Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
+static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof);
#ifdef ERTS_SMP
static void driver_monitor_lock_pdl(Port *p);
static void driver_monitor_unlock_pdl(Port *p);
@@ -307,12 +313,9 @@ static Port *create_port(char *name,
size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
-#ifdef DEBUG
- /* Make sure the debug flags survives until port is freed */
- state |= ERTS_PORT_SFLG_PORT_DEBUG;
-#endif
#ifdef ERTS_SMP
+ ErtsRunQueue *runq;
if (!driver_lock) {
/* Align size for mutex following port struct */
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
@@ -322,6 +325,12 @@ static Port *create_port(char *name,
#endif
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ state |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
+
+
busy_port_queue_size
= ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
? 0
@@ -357,8 +366,12 @@ static Port *create_port(char *name,
p += sizeof(erts_mtx_t);
state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
- erts_smp_atomic_set_nob(&prt->run_queue,
- (erts_aint_t) erts_get_runq_current(NULL));
+ if (erts_get_scheduler_data())
+ runq = erts_get_runq_current(NULL);
+ else
+ runq = ERTS_RUNQ_IX(0);
+ erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
+
prt->xports = NULL;
#else
erts_atomic32_init_nob(&prt->refc, 1);
@@ -385,6 +398,7 @@ static Port *create_port(char *name,
ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
prt->psd = NULL;
+ prt->async_open_port = NULL;
prt->drv_data = (SWord) 0;
prt->os_pid = -1;
@@ -466,6 +480,11 @@ erts_port_free(Port *prt)
erts_port_task_fini_sched(&prt->sched);
+ if (prt->async_open_port) {
+ erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port);
+ prt->async_open_port = NULL;
+ }
+
#ifdef ERTS_SMP
ASSERT(prt->lock);
if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
@@ -687,6 +706,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
error_number = error_type = 0;
if (driver->start) {
+ ERTS_MSACC_PUSH_STATE_M();
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_in, am_start);
}
@@ -697,6 +717,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
DTRACE3(driver_start, process_str, driver->name, port_str);
}
#endif
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
drv_data = (*driver->start)(ERTS_Port2ErlDrvPort(port), name, opts);
if (((SWord) drv_data) == -1)
@@ -716,6 +737,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_out, am_start);
@@ -1105,7 +1127,7 @@ io_list_vec_len(Eterm obj, int* vsize, Uint* csize,
Uint p_v_size = 0;
Uint p_c_size = 0;
Uint p_in_clist = 0;
- Uint total; /* Uint due to halfword emulator */
+ Uint total;
goto L_jump_start; /* avoid a push */
@@ -1412,7 +1434,7 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL);
+ erts_queue_message(rp, rp_locksp, factory->message, msg, NIL);
}
static void
@@ -1420,12 +1442,9 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
ErtsHeapFactory factory;
Eterm msg_copy;
Uint hsz, msg_sz;
- Eterm *hp;
ErtsProcLocks rp_locks = 0;
hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
@@ -1436,18 +1455,13 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
hsz += msg_sz;
}
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- if (is_immed(msg))
- msg_copy = msg;
- else {
- msg_copy = copy_struct(msg, msg_sz, &hp, ohp);
- factory.hp = hp;
- }
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
+ msg_copy = (is_immed(msg)
+ ? msg
+ : copy_struct(msg, msg_sz,
+ &factory.hp,
+ factory.off_heap));
queue_port_sched_op_reply(rp,
&rp_locks,
@@ -1535,6 +1549,26 @@ erts_schedule_proc2port_signal(Process *c_p,
return ERTS_PORT_OP_SCHEDULED;
}
+static int
+erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp,
+ int task_flags,
+ ErtsProc2PortSigCallback callback)
+{
+ Port *prt = erts_port_lookup_raw(port_num);
+
+ if (!prt)
+ return -1;
+
+ sigdp->caller = ERTS_INVALID_PID;
+
+ return erts_port_task_schedule(prt->common.id,
+ NULL,
+ ERTS_PORT_TASK_PROC_SIG,
+ sigdp,
+ callback,
+ task_flags);
+}
+
static ERTS_INLINE void
send_badsig(Port *prt) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
@@ -1679,6 +1713,7 @@ call_driver_outputv(int bang_op,
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErlDrvSizeT size = evp->size;
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1699,6 +1734,8 @@ call_driver_outputv(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -1779,6 +1816,7 @@ call_driver_output(int bang_op,
send_badsig(prt);
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1798,6 +1836,8 @@ call_driver_output(int bang_op,
esdp->io.out += (Uint64) size;
else
erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
+
+ ERTS_MSACC_POP_STATE_M();
}
}
@@ -2370,6 +2410,11 @@ erts_port_exit(Process *c_p,
| ERTS_PORT_SIG_FLG_BROKEN_LINK
| ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0);
+#ifndef __WIN32__
+ if (prt->drv_ptr == &forker_driver)
+ return ERTS_PORT_OP_DROPPED;
+#endif
+
if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) {
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p,
@@ -2734,6 +2779,72 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
port_sig_link);
}
+static void
+init_ack_send_reply(Port *port, Eterm resp)
+{
+
+ if (!is_internal_port(resp)) {
+ Process *rp = erts_proc_lookup_raw(port->async_open_port->to);
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to);
+ erts_remove_link(&ERTS_P_LINKS(rp), port->common.id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ }
+ port_sched_op_reply(port->async_open_port->to,
+ port->async_open_port->ref,
+ resp);
+
+ erts_free(ERTS_ALC_T_PRTSD, port->async_open_port);
+ port->async_open_port = NULL;
+}
+
+void
+erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) {
+ Port *port = erts_drvport2port(ix);
+ SWord err_type = (SWord)res;
+ Eterm resp;
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port)
+ return;
+
+ if (port->async_open_port) {
+ switch(err_type) {
+ case -3:
+ resp = am_badarg;
+ break;
+ case -2: {
+ char *str = erl_errno_id(errno);
+ resp = erts_atom_put((byte *) str, strlen(str),
+ ERTS_ATOM_ENC_LATIN1, 1);
+ break;
+ }
+ case -1:
+ resp = am_einval;
+ break;
+ default:
+ resp = port->common.id;
+ break;
+ }
+
+ init_ack_send_reply(port, resp);
+
+ if (err_type == -1 || err_type == -2 || err_type == -3)
+ driver_failure_term(ix, am_normal, 0);
+ port->drv_data = err_type;
+ }
+}
+
+void
+erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) {
+ Port *port = erts_drvport2port(ix);
+
+ if (port == ERTS_INVALID_ERL_DRV_PORT)
+ return;
+
+ port->os_pid = (SWord)pid;
+
+}
+
void erts_init_io(int port_tab_size,
int port_tab_size_ignore_files,
int legacy_port_tab)
@@ -2794,10 +2905,11 @@ void erts_init_io(int port_tab_size,
erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
init_driver(&fd_driver, &fd_driver_entry, NULL);
-#ifndef __OSE__
init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
-#endif
init_driver(&spawn_driver, &spawn_driver_entry, NULL);
+#ifndef __WIN32__
+ init_driver(&forker_driver, &forker_driver_entry, NULL);
+#endif
erts_init_static_drivers();
for (dp = driver_tab; *dp != NULL; dp++)
erts_add_driver_entry(*dp, NULL, 1);
@@ -2859,6 +2971,9 @@ void erts_lcnt_enable_io_lock_count(int enable) {
lcnt_enable_drv_lock_count(&vanilla_driver, enable);
lcnt_enable_drv_lock_count(&spawn_driver, enable);
+#ifndef __WIN32__
+ lcnt_enable_drv_lock_count(&forker_driver, enable);
+#endif
lcnt_enable_drv_lock_count(&fd_driver, enable);
/* enable lock counting in all drivers */
for (dp = driver_list; dp; dp = dp->next) {
@@ -3054,16 +3169,17 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
if (rp) {
Eterm tuple;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
Eterm* hp;
Uint sz_res;
sz_res = size_object(res);
- hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks,
+ sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_queue_message(rp, &rp_locks, mp, tuple, NIL);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3091,7 +3207,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3117,7 +3233,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (!rp)
return;
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
@@ -3159,7 +3275,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3233,7 +3349,7 @@ deliver_vec_message(Port* prt, /* Port */
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3265,7 +3381,7 @@ deliver_vec_message(Port* prt, /* Port */
need += (hlen+csize)*2;
}
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
iov += vsize;
@@ -3326,7 +3442,7 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3366,6 +3482,7 @@ static void flush_port(Port *p)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->flush != NULL) {
+ ERTS_MSACC_PUSH_STATE_M();
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_flush)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
@@ -3375,9 +3492,11 @@ static void flush_port(Port *p)
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_in, am_flush);
}
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->flush)((ErlDrvData)p->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_out, am_flush);
}
@@ -3425,6 +3544,7 @@ terminate_port(Port *prt)
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
int fpe_was_unmasked = erts_block_fpe();
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_stop)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt)
@@ -3433,6 +3553,7 @@ terminate_port(Port *prt)
#endif
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
#ifdef ERTS_SMP
if (prt->xports)
erts_port_handle_xports(prt);
@@ -3743,6 +3864,7 @@ call_driver_control(Eterm caller,
ErlDrvSizeT *from_size)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->control)
return ERTS_PORT_OP_BADARG;
@@ -3756,6 +3878,8 @@ call_driver_control(Eterm caller,
command, size);
}
#endif
+
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
prt->caller = caller;
cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data,
@@ -3766,6 +3890,8 @@ call_driver_control(Eterm caller,
*from_size);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres < 0)
return ERTS_PORT_OP_BADARG;
@@ -3817,7 +3943,6 @@ write_port_control_result(int control_flags,
ErlDrvSizeT resp_size,
char *pre_alloc_buf,
Eterm **hpp,
- ErlHeapFragment *bp,
ErlOffHeap *ohp)
{
Eterm res;
@@ -3891,16 +4016,13 @@ port_sig_control(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
- Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
ErtsHeapFactory factory;
Process *rp;
ErtsProcLocks rp_locks = 0;
Uint hsz, rsz;
int control_flags;
- rp = erts_proc_lookup_raw(sigdp->caller);
+ rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller);
if (!rp)
goto done;
@@ -3913,22 +4035,15 @@ port_sig_control(Port *prt,
hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
msg = write_port_control_result(control_flags,
resp_bufp,
resp_size,
&resp_buf[0],
- &hp,
- bp,
- ohp);
- factory.hp = hp;
-
+ &factory.hp,
+ factory.off_heap);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -3943,7 +4058,8 @@ port_sig_control(Port *prt,
/* failure */
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ if (sigdp->caller != ERTS_INVALID_PID)
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
done:
@@ -3953,6 +4069,23 @@ done:
return ERTS_PORT_REDS_CONTROL;
}
+/*
+ * This is an asynchronous control call. I.e. it will not return anything
+ * to the caller.
+ */
+int
+erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size)
+{
+ ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data();
+
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY;
+ sigdp->u.control.binp = NULL;
+ sigdp->u.control.command = cmd;
+ sigdp->u.control.bufp = buff;
+ sigdp->u.control.size = size;
+
+ return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control);
+}
ErtsPortOpResult
erts_port_control(Process* c_p,
@@ -4069,7 +4202,6 @@ erts_port_control(Process* c_p,
resp_size,
&resp_buf[0],
&hp,
- NULL,
&c_p->off_heap);
BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL);
return ERTS_PORT_OP_DONE;
@@ -4091,10 +4223,10 @@ erts_port_control(Process* c_p,
binp = NULL;
if (is_binary(data) && binary_bitoffset(data) == 0) {
- Eterm *ebinp = binary_val_rel(data, NULL);
+ Eterm *ebinp = binary_val(data);
ASSERT(!tmp_alloced);
if (*ebinp == HEADER_SUB_BIN)
- ebinp = binary_val_rel(((ErlSubBin *) ebinp)->orig, NULL);
+ ebinp = binary_val(((ErlSubBin *) ebinp)->orig);
if (*ebinp != HEADER_PROC_BIN)
copy = 1;
else {
@@ -4147,6 +4279,7 @@ call_driver_call(Eterm caller,
unsigned *ret_flagsp)
{
ErlDrvSSizeT cres;
+ ERTS_MSACC_PUSH_STATE_M();
if (!prt->drv_ptr->call)
return ERTS_PORT_OP_BADARG;
@@ -4162,6 +4295,8 @@ call_driver_call(Eterm caller,
}
#endif
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
+
prt->caller = caller;
cres = prt->drv_ptr->call((ErlDrvData) prt->drv_data,
command,
@@ -4172,6 +4307,8 @@ call_driver_call(Eterm caller,
ret_flagsp);
prt->caller = NIL;
+ ERTS_MSACC_POP_STATE_M();
+
if (cres <= 0
|| ((byte) (*resp_bufp)[0]) != VERSION_MAGIC)
return ERTS_PORT_OP_BADARG;
@@ -4228,21 +4365,14 @@ port_sig_call(Port *prt,
hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size);
if (hsz >= 0) {
- ErlHeapFragment* bp;
- ErlOffHeap* ohp;
ErtsHeapFactory factory;
byte *endp;
hsz += 3; /* ok tuple */
hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz);
endp = (byte *) resp_bufp;
- erts_factory_message_init(&factory, rp, hp, bp);
msg = erts_decode_ext(&factory, &endp);
if (is_value(msg)) {
hp = erts_produce_heap(&factory,
@@ -4503,7 +4633,9 @@ port_sig_info(Port *prt,
sigdp->u.info.item);
if (is_value(value)) {
ErtsHeapFactory factory;
- erts_factory_message_init(&factory, NULL, hp, bp);
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -4591,8 +4723,8 @@ reply_io_bytes(void *vreq)
rp = erts_proc_lookup(req->pid);
if (rp) {
- ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks;
Eterm ref, msg, ein, eout, *hp;
Uint64 in, out;
@@ -4614,7 +4746,7 @@ reply_io_bytes(void *vreq)
erts_bld_uint64(NULL, &hsz, in);
erts_bld_uint64(NULL, &hsz, out);
- hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
ref = make_internal_ref(hp);
write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
@@ -4624,7 +4756,7 @@ reply_io_bytes(void *vreq)
eout = erts_bld_uint64(&hp, NULL, out);
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -4730,6 +4862,10 @@ print_port_info(Port *p, int to, void *arg)
erts_print(to, arg, "Port is a file: %s\n",p->name);
} else if (p->drv_ptr == &spawn_driver) {
erts_print(to, arg, "Port controls external process: %s\n",p->name);
+#ifndef __WIN32__
+ } else if (p->drv_ptr == &forker_driver) {
+ erts_print(to, arg, "Port controls forker process: %s\n",p->name);
+#endif
} else {
erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name);
}
@@ -4876,6 +5012,7 @@ int get_port_flags(ErlDrvPort ix)
void erts_raw_port_command(Port* p, byte* buf, Uint len)
{
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
@@ -4896,9 +5033,11 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
DTRACE4(driver_output, "-raw-", port_str, p->name, len);
}
#endif
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len);
erts_unblock_fpe(fpe_was_unmasked);
+ ERTS_MSACC_POP_STATE_M();
}
int async_ready(Port *p, void* data)
@@ -4910,6 +5049,7 @@ int async_ready(Port *p, void* data)
if (p) {
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->ready_async != NULL) {
+ ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
@@ -4918,6 +5058,7 @@ int async_ready(Port *p, void* data)
#endif
(*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
need_free = 0;
+ ERTS_MSACC_POP_STATE_M();
}
erts_port_driver_callback_epilogue(p, NULL);
@@ -5069,11 +5210,11 @@ ErlDrvTermData driver_mk_term_nil(void)
void driver_report_exit(ErlDrvPort ix, int status)
{
Eterm* hp;
+ ErlOffHeap *ohp;
Eterm tuple;
Process *rp;
Eterm pid;
- ErlHeapFragment *bp = NULL;
- ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
Port* prt = erts_drvport2port(ix);
@@ -5093,13 +5234,13 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (!rp)
return;
- hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp);
tuple = TUPLE2(hp, am_exit_status, make_small(status));
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -5209,7 +5350,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ErtsProcLocks rp_locks = 0;
struct b2t_states__ b2t;
int scheduler;
- int is_heap_need_limited = 1;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_UNDEF(mess,NIL);
@@ -5248,25 +5388,17 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_sint64(NULL, &need, (Sint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_SSMALL((Sint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
-#if HALFWORD_HEAP
- erts_bld_uint64(NULL, &need, (Uint64)ptr[0]);
-#else
/* check for bignum */
if (!IS_USMALL(0, (Uint)ptr[0]))
need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
-#endif
ptr++;
depth++;
break;
@@ -5386,9 +5518,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
need += hsz;
ptr += 2;
depth++;
- if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */
- is_heap_need_limited = 0;
- }
break;
}
case ERL_DRV_MAP: { /* int */
@@ -5396,7 +5525,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]);
- is_heap_need_limited = 0;
} else {
need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
}
@@ -5435,17 +5563,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
goto done;
}
- /* Try copy directly to destination heap if we know there are no big maps */
- if (is_heap_need_limited) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
- Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- }
- else {
- erts_factory_message_init(&factory, NULL, NULL,
- new_message_buffer(need));
- }
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, need);
/*
* Interpret the instructions and build the term.
@@ -5465,10 +5583,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
break;
case ERL_DRV_INT: /* signed int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_SIZE(2));
- mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_SSMALL((Sint)ptr[0]))
mess = make_small((Sint)ptr[0]);
@@ -5476,15 +5590,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = small_to_big((Sint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
case ERL_DRV_UINT: /* unsigned int argument */
-#if HALFWORD_HEAP
- erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64));
- mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]);
-#else
erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE);
if (IS_USMALL(0, (Uint)ptr[0]))
mess = make_small((Uint)ptr[0]);
@@ -5492,7 +5601,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = uint_to_big((Uint)ptr[0], factory.hp);
factory.hp += BIG_UINT_HEAP_SIZE;
}
-#endif
ptr++;
break;
@@ -5724,9 +5832,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if (res > 0) {
mess = ESTACK_POP(stack); /* get resulting value */
- erts_factory_close(&factory);
+ erts_factory_trim_and_close(&factory, &mess, 1);
/* send message */
- erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined);
+ erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined);
}
else {
if (b2t.ix > b2t.used)
@@ -6762,6 +6870,28 @@ driver_get_now(ErlDrvNowData *now_data)
return 0;
}
+ErlDrvTime
+erl_drv_monotonic_time(ErlDrvTimeUnit time_unit)
+{
+ return (ErlDrvTime) erts_napi_monotonic_time((int) time_unit);
+}
+
+ErlDrvTime
+erl_drv_time_offset(ErlDrvTimeUnit time_unit)
+{
+ return (ErlDrvTime) erts_napi_time_offset((int) time_unit);
+}
+
+ErlDrvTime
+erl_drv_convert_time_unit(ErlDrvTime val,
+ ErlDrvTimeUnit from,
+ ErlDrvTimeUnit to)
+{
+ return (ErlDrvTime) erts_napi_convert_time_unit((ErtsMonotonicTime) val,
+ (int) from,
+ (int) to);
+}
+
static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
{
RefThing *refp;
@@ -6809,7 +6939,7 @@ int driver_monitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6820,16 +6950,6 @@ int driver_monitor_process(ErlDrvPort drvport,
/* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_monitor_process(prt,buf,process,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6882,7 +7002,7 @@ int driver_demonitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6893,15 +7013,6 @@ int driver_demonitor_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_demonitor_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6937,7 +7048,7 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
{
Port *prt;
ErlDrvTermData ret;
-#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -6948,16 +7059,6 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
-
-#if !HEAP_ON_C_STACK
- if (!sched) {
- /* Need a separate allocation for the ref :( */
- Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
- sizeof(Eterm)*REF_THING_SIZE);
- ret = do_driver_get_monitored_process(prt,buf,monitor);
- erts_free(ERTS_ALC_T_TEMP_TERM,buf);
- } else
-#endif
{
DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
UseTmpHeapNoproc(REF_THING_SIZE);
@@ -6981,6 +7082,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
ErlDrvMonitor drv_monitor;
int fpe_was_unmasked;
+ ERTS_MSACC_PUSH_STATE_M();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(prt->drv_ptr != NULL);
@@ -6992,6 +7094,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
callback = prt->drv_ptr->process_exit;
ASSERT(callback != NULL);
ref_to_driver_monitor(ref,&drv_monitor);
+ ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_process_exit)) {
@@ -7003,6 +7106,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
DRV_MONITOR_LOCK_PDL(prt);
+ ERTS_MSACC_POP_STATE_M();
/* remove monitor *after* callback */
rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref);
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -7023,6 +7127,9 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->async_open_port)
+ init_ack_send_reply(prt, prt->common.id);
if (eof)
flush_linebuf_messages(prt, state);
if (state & ERTS_PORT_SFLG_CLOSING) {