diff options
author | Rickard Green <[email protected]> | 2012-09-16 02:45:32 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-03 21:18:10 +0100 |
commit | 23c6f9e07a3cae7c05e55abd01ff798384241538 (patch) | |
tree | dc62a1976380ea69f3843e1d53ef53526fdabd3e /erts/emulator/beam/io.c | |
parent | 34fc6f243f8a462f4b2370a9fe5376df1ca08f1d (diff) | |
download | otp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.gz otp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.bz2 otp-23c6f9e07a3cae7c05e55abd01ff798384241538.zip |
Add erl_drv_[send|output]_term
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 149 |
1 files changed, 120 insertions, 29 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 96a89a3b4e..6776f72894 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -323,7 +323,7 @@ static Port *create_port(char *name, prt->bytes_in = 0; prt->bytes_out = 0; prt->dist_entry = NULL; - prt->connected = pid; + ERTS_PORT_INIT_CONNECTED(prt, pid); prt->common.u.alive.reg = NULL; #ifdef ERTS_SMP prt->common.u.alive.ptimer = NULL; @@ -1684,7 +1684,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state) while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY) deliver_read_message(prt, state, - prt->connected, + ERTS_PORT_GET_CONNECTED(prt), NULL, 0, LINEBUF_DATA(lc), @@ -1847,7 +1847,7 @@ static void flush_port(Port *p) if (p->drv_ptr->flush != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_flush)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_flush, process_str, port_str, p->name); } #endif @@ -1892,7 +1892,7 @@ terminate_port(Port *prt) ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->common.id; - connected_id = prt->connected; + connected_id = ERTS_PORT_GET_CONNECTED(prt); } else { send_closed_port_id = NIL; @@ -1909,7 +1909,7 @@ terminate_port(Port *prt) int fpe_was_unmasked = erts_block_fpe(); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_stop)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt) DTRACE3(driver_stop, process_str, drv->name, port_str); } #endif @@ -2092,7 +2092,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) state = erts_atomic32_read_nob(&p->state); if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) || ((reason == am_normal) && - ((from != p->connected) && (from != p->common.id)))) { + ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) { return; } @@ -2186,7 +2186,7 @@ void erts_port_command(Process *proc, if (is_tuple_arity(command, 2)) { tp = tuple_val(command); - if ((pid = port->connected) == tp[1]) { + if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { erts_atomic32_read_bor_relb(&port->state, @@ -2212,7 +2212,7 @@ void erts_port_command(Process *proc, DTRACE4(port_command, process_str, port_str, port->name, "connect"); } #endif - port->connected = tp[2]; + ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]); deliver_result(port->common.id, pid, am_connected); goto done; } @@ -2223,7 +2223,7 @@ void erts_port_command(Process *proc, { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process* rp = erts_pid2proc(NULL, 0, - port->connected, rp_locks); + ERTS_PORT_GET_CONNECTED(port), rp_locks); if (rp) { (void) erts_send_exit_signal(NULL, port->common.id, @@ -2423,7 +2423,7 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "=port:%T\n", p->common.id); erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { - erts_print(to, arg, "Connected: %T", p->connected); + erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p)); erts_print(to, arg, "\n"); } @@ -2627,7 +2627,7 @@ int async_ready(Port *p, void* data) if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_ready_async, process_str, port_str, p->name); } #endif @@ -2804,7 +2804,7 @@ void driver_report_exit(ErlDrvPort ix, int status) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - pid = prt->connected; + pid = ERTS_PORT_GET_CONNECTED(prt); ASSERT(is_internal_pid(pid)); rp = (scheduler @@ -3384,28 +3384,114 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) #undef ERTS_DDT_FAIL } +static ERTS_INLINE int +deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p) +{ +#ifdef ERTS_SMP + ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); +#endif + Port *prt = erts_port_lookup_raw((Eterm) port_id); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); + if (connected_p) { +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + ETHR_MEMBAR(ETHR_LoadLoad); +#endif + *connected_p = ERTS_PORT_GET_CONNECTED(prt); + } +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + erts_thr_progress_unmanaged_continue(dhndl); + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + } +#endif + ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED + ? erts_lc_is_port_locked(prt) + : !erts_lc_is_port_locked(prt)); + return ((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ? -1 + : ((state & ERTS_PORT_SFLG_CLOSING) ? 0 : 1)); +} + +int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len) +{ + /* May be called from arbitrary thread */ + Eterm connected; + int res = deliver_term_check_port(port_id, &connected); + if (res <= 0) + return res; + return driver_deliver_term(connected, data, len); +} + +/* + * driver_output_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_output_term() + * above. + */ int driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len) { erts_aint32_t state; - Port* prt = erts_drvport2port(drvport, &state); + Port* prt; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + /* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */ + prt = erts_drvport2port(drvport, &state); if (!prt) return -1; /* invalid (dead) */ ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) - return driver_deliver_term(prt->connected, data, len); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; else if (state & ERTS_PORT_SFLG_CLOSING) - return 0; /* closing */ - else - return -1; /* invalid (dead) */ + return 0; + + return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len); } +int erl_drv_send_term(ErlDrvTermData port_id, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) +{ + /* May be called from arbitrary thread */ + int res = deliver_term_check_port(port_id, NULL); + if (res <= 0) + return res; + return driver_deliver_term(to, data, len); +} +/* + * driver_send_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_send_term() above. + */ int -driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len) +driver_send_term(ErlDrvPort drvport, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) { - /* driver_send_term() assume port is ok... */ + /* + * NOTE! It is *not* safe to access the 'drvport' parameter + * from unmanaged threads. Also note that it is impossible + * to make this access safe without using a less efficient + * internal data representation for ErlDrvPort. + */ + ERTS_SMP_CHK_NO_PROC_LOCKS; +#ifdef ERTS_SMP + if (erts_thr_progress_is_managed_thread()) +#endif + { + erts_aint32_t state; + Port* prt = erts_drvport2port(drvport, &state); + if (!prt) + return -1; /* invalid (dead) */ + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; + else if (state & ERTS_PORT_SFLG_CLOSING) + return 0; + } return driver_deliver_term(to, data, len); } @@ -3438,7 +3524,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) (bin->orig_bytes+offs), len); } else - deliver_bin_message(prt, prt->connected, + deliver_bin_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, bin, offs, len); return 0; } @@ -3481,9 +3567,11 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) buf, len); } else if (state & ERTS_PORT_SFLG_LINEBUF_IO) - deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len); + deliver_linebuf_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len); else - deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0); + deliver_read_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len, 0); return 0; } @@ -3548,7 +3636,8 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size)); - deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size); + deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, + binv, iov, n, size); return 0; } @@ -4581,7 +4670,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) DRV_MONITOR_UNLOCK_PDL(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_process_exit)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(prt), prt) DTRACE3(driver_process_exit, process_str, port_str, prt->name); } #endif @@ -4614,7 +4703,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { - deliver_result(prt->common.id, prt->connected, am_eof); + deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ if (prt->port_data_lock) @@ -4638,18 +4727,20 @@ int driver_exit(ErlDrvPort ix, int err) Port* prt = erts_drvport2port(ix, NULL); Process* rp; ErtsLink *lnk, *rlnk = NULL; + Eterm connected; ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return -1; - rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); + connected = ERTS_PORT_GET_CONNECTED(prt); + rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK); if (rp) { rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); } - lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected); + lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected); #ifdef ERTS_SMP if (rp) @@ -4718,7 +4809,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix) if (prt == NULL) return NIL; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - return prt->connected; + return ERTS_PORT_GET_CONNECTED(prt); } ErlDrvTermData driver_caller(ErlDrvPort ix) |