diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 366 |
1 files changed, 225 insertions, 141 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 81a05a6b87..797e4cdadc 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -53,10 +53,11 @@ #include "erl_time.h" extern ErlDrvEntry fd_driver_entry; -#ifndef __OSE__ extern ErlDrvEntry vanilla_driver_entry; -#endif extern ErlDrvEntry spawn_driver_entry; +#ifndef __WIN32__ +extern ErlDrvEntry forker_driver_entry; +#endif extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */ @@ -74,6 +75,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; erts_driver_t vanilla_driver; erts_driver_t spawn_driver; +#ifndef __WIN32__ +erts_driver_t forker_driver; +#endif erts_driver_t fd_driver; int erts_port_synchronous_ops = 0; @@ -87,6 +91,7 @@ static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); static void pdl_init(void); +static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof); #ifdef ERTS_SMP static void driver_monitor_lock_pdl(Port *p); static void driver_monitor_unlock_pdl(Port *p); @@ -308,12 +313,9 @@ static Port *create_port(char *name, size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; -#ifdef DEBUG - /* Make sure the debug flags survives until port is freed */ - state |= ERTS_PORT_SFLG_PORT_DEBUG; -#endif #ifdef ERTS_SMP + ErtsRunQueue *runq; if (!driver_lock) { /* Align size for mutex following port struct */ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); @@ -323,6 +325,12 @@ static Port *create_port(char *name, #endif port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); +#ifdef DEBUG + /* Make sure the debug flags survives until port is freed */ + state |= ERTS_PORT_SFLG_PORT_DEBUG; +#endif + + busy_port_queue_size = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) ? 0 @@ -358,8 +366,12 @@ static Port *create_port(char *name, p += sizeof(erts_mtx_t); state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - erts_smp_atomic_set_nob(&prt->run_queue, - (erts_aint_t) erts_get_runq_current(NULL)); + if (erts_get_scheduler_data()) + runq = erts_get_runq_current(NULL); + else + runq = ERTS_RUNQ_IX(0); + erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); + prt->xports = NULL; #else erts_atomic32_init_nob(&prt->refc, 1); @@ -386,6 +398,7 @@ static Port *create_port(char *name, ERTS_PTMR_INIT(prt); erts_port_task_handle_init(&prt->timeout_task); prt->psd = NULL; + prt->async_open_port = NULL; prt->drv_data = (SWord) 0; prt->os_pid = -1; @@ -467,6 +480,11 @@ erts_port_free(Port *prt) erts_port_task_fini_sched(&prt->sched); + if (prt->async_open_port) { + erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port); + prt->async_open_port = NULL; + } + #ifdef ERTS_SMP ASSERT(prt->lock); if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) @@ -688,6 +706,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ error_number = error_type = 0; if (driver->start) { + ERTS_MSACC_PUSH_STATE_M(); if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_in, am_start); } @@ -698,6 +717,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ DTRACE3(driver_start, process_str, driver->name, port_str); } #endif + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); fpe_was_unmasked = erts_block_fpe(); drv_data = (*driver->start)(ERTS_Port2ErlDrvPort(port), name, opts); if (((SWord) drv_data) == -1) @@ -717,6 +737,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); port->caller = NIL; if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_out, am_start); @@ -1106,7 +1127,7 @@ io_list_vec_len(Eterm obj, int* vsize, Uint* csize, Uint p_v_size = 0; Uint p_c_size = 0; Uint p_in_clist = 0; - Uint total; /* Uint due to halfword emulator */ + Uint total; goto L_jump_start; /* avoid a push */ @@ -1413,7 +1434,7 @@ queue_port_sched_op_reply(Process *rp, erts_factory_trim_and_close(factory, &msg, 1); - erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL); + erts_queue_message(rp, rp_locksp, factory->message, msg, NIL); } static void @@ -1421,12 +1442,9 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) { Process *rp = erts_proc_lookup_raw(to); if (rp) { - ErlOffHeap *ohp; - ErlHeapFragment* bp; ErtsHeapFactory factory; Eterm msg_copy; Uint hsz, msg_sz; - Eterm *hp; ErtsProcLocks rp_locks = 0; hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; @@ -1437,18 +1455,13 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) hsz += msg_sz; } - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); - erts_factory_message_init(&factory, rp, hp, bp); - if (is_immed(msg)) - msg_copy = msg; - else { - msg_copy = copy_struct(msg, msg_sz, &hp, ohp); - factory.hp = hp; - } + (void) erts_factory_message_create(&factory, rp, + &rp_locks, hsz); + msg_copy = (is_immed(msg) + ? msg + : copy_struct(msg, msg_sz, + &factory.hp, + factory.off_heap)); queue_port_sched_op_reply(rp, &rp_locks, @@ -1536,6 +1549,26 @@ erts_schedule_proc2port_signal(Process *c_p, return ERTS_PORT_OP_SCHEDULED; } +static int +erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp, + int task_flags, + ErtsProc2PortSigCallback callback) +{ + Port *prt = erts_port_lookup_raw(port_num); + + if (!prt) + return -1; + + sigdp->caller = ERTS_INVALID_PID; + + return erts_port_task_schedule(prt->common.id, + NULL, + ERTS_PORT_TASK_PROC_SIG, + sigdp, + callback, + task_flags); +} + static ERTS_INLINE void send_badsig(Port *prt) { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; @@ -1680,6 +1713,7 @@ call_driver_outputv(int bang_op, else { ErtsSchedulerData *esdp = erts_get_scheduler_data(); ErlDrvSizeT size = evp->size; + ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) || ERTS_IS_CRASH_DUMPING); @@ -1700,6 +1734,8 @@ call_driver_outputv(int bang_op, esdp->io.out += (Uint64) size; else erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); + + ERTS_MSACC_POP_STATE_M(); } } @@ -1780,6 +1816,7 @@ call_driver_output(int bang_op, send_badsig(prt); else { ErtsSchedulerData *esdp = erts_get_scheduler_data(); + ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) || ERTS_IS_CRASH_DUMPING); @@ -1799,6 +1836,8 @@ call_driver_output(int bang_op, esdp->io.out += (Uint64) size; else erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); + + ERTS_MSACC_POP_STATE_M(); } } @@ -2371,6 +2410,11 @@ erts_port_exit(Process *c_p, | ERTS_PORT_SIG_FLG_BROKEN_LINK | ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0); +#ifndef __WIN32__ + if (prt->drv_ptr == &forker_driver) + return ERTS_PORT_OP_DROPPED; +#endif + if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) { ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, @@ -2735,6 +2779,72 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) port_sig_link); } +static void +init_ack_send_reply(Port *port, Eterm resp) +{ + + if (!is_internal_port(resp)) { + Process *rp = erts_proc_lookup_raw(port->async_open_port->to); + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to); + erts_remove_link(&ERTS_P_LINKS(rp), port->common.id); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + } + port_sched_op_reply(port->async_open_port->to, + port->async_open_port->ref, + resp); + + erts_free(ERTS_ALC_T_PRTSD, port->async_open_port); + port->async_open_port = NULL; +} + +void +erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) { + Port *port = erts_drvport2port(ix); + SWord err_type = (SWord)res; + Eterm resp; + + if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port) + return; + + if (port->async_open_port) { + switch(err_type) { + case -3: + resp = am_badarg; + break; + case -2: { + char *str = erl_errno_id(errno); + resp = erts_atom_put((byte *) str, strlen(str), + ERTS_ATOM_ENC_LATIN1, 1); + break; + } + case -1: + resp = am_einval; + break; + default: + resp = port->common.id; + break; + } + + init_ack_send_reply(port, resp); + + if (err_type == -1 || err_type == -2 || err_type == -3) + driver_failure_term(ix, am_normal, 0); + port->drv_data = err_type; + } +} + +void +erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) { + Port *port = erts_drvport2port(ix); + + if (port == ERTS_INVALID_ERL_DRV_PORT) + return; + + port->os_pid = (SWord)pid; + +} + void erts_init_io(int port_tab_size, int port_tab_size_ignore_files, int legacy_port_tab) @@ -2795,10 +2905,11 @@ void erts_init_io(int port_tab_size, erts_smp_rwmtx_rwlock(&erts_driver_list_lock); init_driver(&fd_driver, &fd_driver_entry, NULL); -#ifndef __OSE__ init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); -#endif init_driver(&spawn_driver, &spawn_driver_entry, NULL); +#ifndef __WIN32__ + init_driver(&forker_driver, &forker_driver_entry, NULL); +#endif erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -2860,6 +2971,9 @@ void erts_lcnt_enable_io_lock_count(int enable) { lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); +#ifndef __WIN32__ + lcnt_enable_drv_lock_count(&forker_driver, enable); +#endif lcnt_enable_drv_lock_count(&fd_driver, enable); /* enable lock counting in all drivers */ for (dp = driver_list; dp; dp = dp->next) { @@ -3055,16 +3169,17 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) if (rp) { Eterm tuple; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; Eterm* hp; Uint sz_res; sz_res = size_object(res); - hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, + sz_res + 3, &hp, &ohp); res = copy_struct(res, sz_res, &hp, ohp); tuple = TUPLE2(hp, sender, res); - erts_queue_message(rp, &rp_locks, bp, tuple, NIL); + erts_queue_message(rp, &rp_locks, mp, tuple, NIL); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); @@ -3092,7 +3207,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, Eterm tuple; Process* rp; Eterm* hp; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; @@ -3118,7 +3233,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, if (!rp) return; - hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp); listp = NIL; if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { @@ -3160,7 +3275,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) @@ -3234,7 +3349,7 @@ deliver_vec_message(Port* prt, /* Port */ Eterm tuple; Process* rp; Eterm* hp; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; @@ -3266,7 +3381,7 @@ deliver_vec_message(Port* prt, /* Port */ need += (hlen+csize)*2; } - hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp); listp = NIL; iov += vsize; @@ -3327,7 +3442,7 @@ deliver_vec_message(Port* prt, /* Port */ tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) erts_proc_dec_refc(rp); @@ -3367,6 +3482,7 @@ static void flush_port(Port *p) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); if (p->drv_ptr->flush != NULL) { + ERTS_MSACC_PUSH_STATE_M(); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_flush)) { DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) @@ -3376,9 +3492,11 @@ static void flush_port(Port *p) if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(p, am_in, am_flush); } + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); fpe_was_unmasked = erts_block_fpe(); (*p->drv_ptr->flush)((ErlDrvData)p->drv_data); erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(p, am_out, am_flush); } @@ -3426,6 +3544,7 @@ terminate_port(Port *prt) drv = prt->drv_ptr; if ((drv != NULL) && (drv->stop != NULL)) { int fpe_was_unmasked = erts_block_fpe(); + ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_stop)) { DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt) @@ -3434,6 +3553,7 @@ terminate_port(Port *prt) #endif (*drv->stop)((ErlDrvData)prt->drv_data); erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); #ifdef ERTS_SMP if (prt->xports) erts_port_handle_xports(prt); @@ -3744,6 +3864,7 @@ call_driver_control(Eterm caller, ErlDrvSizeT *from_size) { ErlDrvSSizeT cres; + ERTS_MSACC_PUSH_STATE_M(); if (!prt->drv_ptr->control) return ERTS_PORT_OP_BADARG; @@ -3757,6 +3878,8 @@ call_driver_control(Eterm caller, command, size); } #endif + + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); prt->caller = caller; cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data, @@ -3767,6 +3890,8 @@ call_driver_control(Eterm caller, *from_size); prt->caller = NIL; + ERTS_MSACC_POP_STATE_M(); + if (cres < 0) return ERTS_PORT_OP_BADARG; @@ -3818,7 +3943,6 @@ write_port_control_result(int control_flags, ErlDrvSizeT resp_size, char *pre_alloc_buf, Eterm **hpp, - ErlHeapFragment *bp, ErlOffHeap *ohp) { Eterm res; @@ -3892,16 +4016,13 @@ port_sig_control(Port *prt, if (res == ERTS_PORT_OP_DONE) { Eterm msg; - Eterm *hp; - ErlHeapFragment *bp; - ErlOffHeap *ohp; ErtsHeapFactory factory; Process *rp; ErtsProcLocks rp_locks = 0; Uint hsz, rsz; int control_flags; - rp = erts_proc_lookup_raw(sigdp->caller); + rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller); if (!rp) goto done; @@ -3914,22 +4035,15 @@ port_sig_control(Port *prt, hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); - erts_factory_message_init(&factory, rp, hp, bp); + (void) erts_factory_message_create(&factory, rp, + &rp_locks, hsz); msg = write_port_control_result(control_flags, resp_bufp, resp_size, &resp_buf[0], - &hp, - bp, - ohp); - factory.hp = hp; - + &factory.hp, + factory.off_heap); queue_port_sched_op_reply(rp, &rp_locks, &factory, @@ -3944,7 +4058,8 @@ port_sig_control(Port *prt, /* failure */ - port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + if (sigdp->caller != ERTS_INVALID_PID) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); done: @@ -3954,6 +4069,23 @@ done: return ERTS_PORT_REDS_CONTROL; } +/* + * This is an asynchronous control call. I.e. it will not return anything + * to the caller. + */ +int +erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size) +{ + ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data(); + + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY; + sigdp->u.control.binp = NULL; + sigdp->u.control.command = cmd; + sigdp->u.control.bufp = buff; + sigdp->u.control.size = size; + + return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control); +} ErtsPortOpResult erts_port_control(Process* c_p, @@ -4070,7 +4202,6 @@ erts_port_control(Process* c_p, resp_size, &resp_buf[0], &hp, - NULL, &c_p->off_heap); BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL); return ERTS_PORT_OP_DONE; @@ -4092,10 +4223,10 @@ erts_port_control(Process* c_p, binp = NULL; if (is_binary(data) && binary_bitoffset(data) == 0) { - Eterm *ebinp = binary_val_rel(data, NULL); + Eterm *ebinp = binary_val(data); ASSERT(!tmp_alloced); if (*ebinp == HEADER_SUB_BIN) - ebinp = binary_val_rel(((ErlSubBin *) ebinp)->orig, NULL); + ebinp = binary_val(((ErlSubBin *) ebinp)->orig); if (*ebinp != HEADER_PROC_BIN) copy = 1; else { @@ -4148,6 +4279,7 @@ call_driver_call(Eterm caller, unsigned *ret_flagsp) { ErlDrvSSizeT cres; + ERTS_MSACC_PUSH_STATE_M(); if (!prt->drv_ptr->call) return ERTS_PORT_OP_BADARG; @@ -4163,6 +4295,8 @@ call_driver_call(Eterm caller, } #endif + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); + prt->caller = caller; cres = prt->drv_ptr->call((ErlDrvData) prt->drv_data, command, @@ -4173,6 +4307,8 @@ call_driver_call(Eterm caller, ret_flagsp); prt->caller = NIL; + ERTS_MSACC_POP_STATE_M(); + if (cres <= 0 || ((byte) (*resp_bufp)[0]) != VERSION_MAGIC) return ERTS_PORT_OP_BADARG; @@ -4229,21 +4365,14 @@ port_sig_call(Port *prt, hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size); if (hsz >= 0) { - ErlHeapFragment* bp; - ErlOffHeap* ohp; ErtsHeapFactory factory; byte *endp; hsz += 3; /* ok tuple */ hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); + (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz); endp = (byte *) resp_bufp; - erts_factory_message_init(&factory, rp, hp, bp); msg = erts_decode_ext(&factory, &endp); if (is_value(msg)) { hp = erts_produce_heap(&factory, @@ -4504,7 +4633,9 @@ port_sig_info(Port *prt, sigdp->u.info.item); if (is_value(value)) { ErtsHeapFactory factory; - erts_factory_message_init(&factory, NULL, hp, bp); + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_factory_selfcontained_message_init(&factory, mp, hp); queue_port_sched_op_reply(rp, &rp_locks, &factory, @@ -4592,8 +4723,8 @@ reply_io_bytes(void *vreq) rp = erts_proc_lookup(req->pid); if (rp) { - ErlOffHeap *ohp = NULL; - ErlHeapFragment *bp = NULL; + ErlOffHeap *ohp; + ErtsMessage *mp; ErtsProcLocks rp_locks; Eterm ref, msg, ein, eout, *hp; Uint64 in, out; @@ -4615,7 +4746,7 @@ reply_io_bytes(void *vreq) erts_bld_uint64(NULL, &hsz, in); erts_bld_uint64(NULL, &hsz, out); - hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp); ref = make_internal_ref(hp); write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]); @@ -4625,7 +4756,7 @@ reply_io_bytes(void *vreq) eout = erts_bld_uint64(&hp, NULL, out); msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout); - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (req->sched_id == sched_id) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -4731,6 +4862,10 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "Port is a file: %s\n",p->name); } else if (p->drv_ptr == &spawn_driver) { erts_print(to, arg, "Port controls external process: %s\n",p->name); +#ifndef __WIN32__ + } else if (p->drv_ptr == &forker_driver) { + erts_print(to, arg, "Port controls forker process: %s\n",p->name); +#endif } else { erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name); } @@ -4877,6 +5012,7 @@ int get_port_flags(ErlDrvPort ix) void erts_raw_port_command(Port* p, byte* buf, Uint len) { int fpe_was_unmasked; + ERTS_MSACC_PUSH_STATE_M(); ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); @@ -4897,9 +5033,11 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len) DTRACE4(driver_output, "-raw-", port_str, p->name, len); } #endif + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); fpe_was_unmasked = erts_block_fpe(); (*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len); erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); } int async_ready(Port *p, void* data) @@ -4911,6 +5049,7 @@ int async_ready(Port *p, void* data) if (p) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); if (p->drv_ptr->ready_async != NULL) { + ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) @@ -4919,6 +5058,7 @@ int async_ready(Port *p, void* data) #endif (*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data); need_free = 0; + ERTS_MSACC_POP_STATE_M(); } erts_port_driver_callback_epilogue(p, NULL); @@ -5070,11 +5210,11 @@ ErlDrvTermData driver_mk_term_nil(void) void driver_report_exit(ErlDrvPort ix, int status) { Eterm* hp; + ErlOffHeap *ohp; Eterm tuple; Process *rp; Eterm pid; - ErlHeapFragment *bp = NULL; - ErlOffHeap *ohp; + ErtsMessage *mp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; Port* prt = erts_drvport2port(ix); @@ -5094,13 +5234,13 @@ void driver_report_exit(ErlDrvPort ix, int status) if (!rp) return; - hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp); tuple = TUPLE2(hp, am_exit_status, make_small(status)); hp += 3; tuple = TUPLE2(hp, prt->common.id, tuple); - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) @@ -5210,7 +5350,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) ErtsProcLocks rp_locks = 0; struct b2t_states__ b2t; int scheduler; - int is_heap_need_limited = 1; ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_UNDEF(mess,NIL); @@ -5249,25 +5388,17 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) break; case ERL_DRV_INT: /* signed int argument */ ERTS_DDT_CHK_ENOUGH_ARGS(1); -#if HALFWORD_HEAP - erts_bld_sint64(NULL, &need, (Sint64)ptr[0]); -#else /* check for bignum */ if (!IS_SSMALL((Sint)ptr[0])) need += BIG_UINT_HEAP_SIZE; /* use small_to_big */ -#endif ptr++; depth++; break; case ERL_DRV_UINT: /* unsigned int argument */ ERTS_DDT_CHK_ENOUGH_ARGS(1); -#if HALFWORD_HEAP - erts_bld_uint64(NULL, &need, (Uint64)ptr[0]); -#else /* check for bignum */ if (!IS_USMALL(0, (Uint)ptr[0])) need += BIG_UINT_HEAP_SIZE; /* use small_to_big */ -#endif ptr++; depth++; break; @@ -5387,9 +5518,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) need += hsz; ptr += 2; depth++; - if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */ - is_heap_need_limited = 0; - } break; } case ERL_DRV_MAP: { /* int */ @@ -5397,7 +5525,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) if ((int) ptr[0] < 0) ERTS_DDT_FAIL; if (ptr[0] > MAP_SMALL_MAP_LIMIT) { need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]); - is_heap_need_limited = 0; } else { need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0]; } @@ -5436,17 +5563,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) goto done; } - /* Try copy directly to destination heap if we know there are no big maps */ - if (is_heap_need_limited) { - ErlOffHeap *ohp; - ErlHeapFragment* bp; - Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); - erts_factory_message_init(&factory, rp, hp, bp); - } - else { - erts_factory_message_init(&factory, NULL, NULL, - new_message_buffer(need)); - } + (void) erts_factory_message_create(&factory, rp, &rp_locks, need); /* * Interpret the instructions and build the term. @@ -5466,10 +5583,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) break; case ERL_DRV_INT: /* signed int argument */ -#if HALFWORD_HEAP - erts_reserve_heap(&factory, BIG_NEED_SIZE(2)); - mess = erts_bld_sint64(&factory.hp, NULL, (Sint64)ptr[0]); -#else erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE); if (IS_SSMALL((Sint)ptr[0])) mess = make_small((Sint)ptr[0]); @@ -5477,15 +5590,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) mess = small_to_big((Sint)ptr[0], factory.hp); factory.hp += BIG_UINT_HEAP_SIZE; } -#endif ptr++; break; case ERL_DRV_UINT: /* unsigned int argument */ -#if HALFWORD_HEAP - erts_reserve_heap(&factory, BIG_NEED_FOR_BITS(64)); - mess = erts_bld_uint64(&factory.hp, NULL, (Uint64)ptr[0]); -#else erts_reserve_heap(&factory, BIG_UINT_HEAP_SIZE); if (IS_USMALL(0, (Uint)ptr[0])) mess = make_small((Uint)ptr[0]); @@ -5493,7 +5601,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) mess = uint_to_big((Uint)ptr[0], factory.hp); factory.hp += BIG_UINT_HEAP_SIZE; } -#endif ptr++; break; @@ -5725,9 +5832,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) if (res > 0) { mess = ESTACK_POP(stack); /* get resulting value */ - erts_factory_close(&factory); + erts_factory_trim_and_close(&factory, &mess, 1); /* send message */ - erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined); + erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined); } else { if (b2t.ix > b2t.used) @@ -6832,7 +6939,7 @@ int driver_monitor_process(ErlDrvPort drvport, { Port *prt; int ret; -#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif @@ -6843,16 +6950,6 @@ int driver_monitor_process(ErlDrvPort drvport, /* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock (if we're a driver thread) */ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); - -#if !HEAP_ON_C_STACK - if (!sched) { - /* Need a separate allocation for the ref :( */ - Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, - sizeof(Eterm)*REF_THING_SIZE); - ret = do_driver_monitor_process(prt,buf,process,monitor); - erts_free(ERTS_ALC_T_TEMP_TERM,buf); - } else -#endif { DeclareTmpHeapNoproc(buf,REF_THING_SIZE); UseTmpHeapNoproc(REF_THING_SIZE); @@ -6905,7 +7002,7 @@ int driver_demonitor_process(ErlDrvPort drvport, { Port *prt; int ret; -#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif @@ -6916,15 +7013,6 @@ int driver_demonitor_process(ErlDrvPort drvport, /* Now we should have either the port lock (if we have a scheduler) or the port data lock (if we're a driver thread) */ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); -#if !HEAP_ON_C_STACK - if (!sched) { - /* Need a separate allocation for the ref :( */ - Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, - sizeof(Eterm)*REF_THING_SIZE); - ret = do_driver_demonitor_process(prt,buf,monitor); - erts_free(ERTS_ALC_T_TEMP_TERM,buf); - } else -#endif { DeclareTmpHeapNoproc(buf,REF_THING_SIZE); UseTmpHeapNoproc(REF_THING_SIZE); @@ -6960,7 +7048,7 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport, { Port *prt; ErlDrvTermData ret; -#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif @@ -6971,16 +7059,6 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport, /* Now we should have either the port lock (if we have a scheduler) or the port data lock (if we're a driver thread) */ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); - -#if !HEAP_ON_C_STACK - if (!sched) { - /* Need a separate allocation for the ref :( */ - Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, - sizeof(Eterm)*REF_THING_SIZE); - ret = do_driver_get_monitored_process(prt,buf,monitor); - erts_free(ERTS_ALC_T_TEMP_TERM,buf); - } else -#endif { DeclareTmpHeapNoproc(buf,REF_THING_SIZE); UseTmpHeapNoproc(REF_THING_SIZE); @@ -7004,6 +7082,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor); ErlDrvMonitor drv_monitor; int fpe_was_unmasked; + ERTS_MSACC_PUSH_STATE_M(); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT(prt->drv_ptr != NULL); @@ -7015,6 +7094,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) callback = prt->drv_ptr->process_exit; ASSERT(callback != NULL); ref_to_driver_monitor(ref,&drv_monitor); + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); DRV_MONITOR_UNLOCK_PDL(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_process_exit)) { @@ -7026,6 +7106,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) (*callback)((ErlDrvData) (prt->drv_data), &drv_monitor); erts_unblock_fpe(fpe_was_unmasked); DRV_MONITOR_LOCK_PDL(prt); + ERTS_MSACC_POP_STATE_M(); /* remove monitor *after* callback */ rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref); DRV_MONITOR_UNLOCK_PDL(prt); @@ -7046,6 +7127,9 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (prt->async_open_port) + init_ack_send_reply(prt, prt->common.id); if (eof) flush_linebuf_messages(prt, state); if (state & ERTS_PORT_SFLG_CLOSING) { |