From 23c6f9e07a3cae7c05e55abd01ff798384241538 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 16 Sep 2012 02:45:32 +0200 Subject: Add erl_drv_[send|output]_term --- erts/emulator/beam/erl_bif_info.c | 2 +- erts/emulator/beam/erl_bif_port.c | 8 +- erts/emulator/beam/erl_driver.h | 30 +++++++- erts/emulator/beam/erl_port.h | 13 +++- erts/emulator/beam/erl_port_task.c | 2 +- erts/emulator/beam/io.c | 149 +++++++++++++++++++++++++++++-------- 6 files changed, 164 insertions(+), 40 deletions(-) (limited to 'erts/emulator/beam') diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 0586a89041..44c29252b2 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2915,7 +2915,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item) } else if (item == am_connected) { hp = HAlloc(p, 3); - res = prt->connected; /* internal pid */ + res = ERTS_PORT_GET_CONNECTED(prt); /* internal pid */ } else if (item == am_input) { Uint hsz = 3; diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index 477671f11f..0d0733b8d0 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -354,7 +354,7 @@ port_call(Process* c_p, Eterm arg1, Eterm arg2, Eterm arg3) DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); - dtrace_pid_str(p->connected, process_str); + dtrace_pid_str(ERTS_PORT_GET_CONNECTED(p), process_str); dtrace_port_str(p, port_str); DTRACE5(driver_call, process_str, port_str, p->name, op, real_size); } @@ -516,7 +516,7 @@ BIF_RETTYPE port_close_1(BIF_ALIST_1) erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); BIF_ERROR(BIF_P, BADARG); } - erts_do_exit_port(p, p->connected, am_normal); + erts_do_exit_port(p, ERTS_PORT_GET_CONNECTED(p), am_normal); /* if !ERTS_SMP: since we terminate port with reason normal we SHOULD never get an exit signal ourselves */ @@ -553,7 +553,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2) erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - prt->connected = pid; /* internal pid */ + ERTS_PORT_SET_CONNECTED_RELB(prt, pid); /* internal pid */ erts_port_release(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_connect)) { @@ -561,7 +561,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2) DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE); - dtrace_pid_str(prt->connected, process_str); + dtrace_pid_str(pid, process_str); erts_snprintf(port_str, sizeof(port_str), "%T", prt->id); dtrace_proc_str(rp, newprocess_str); DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str); diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 1dc43419fe..fb9e92e44b 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -136,7 +136,7 @@ typedef struct { #define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed) #define ERL_DRV_EXTENDED_MAJOR_VERSION 2 -#define ERL_DRV_EXTENDED_MINOR_VERSION 0 +#define ERL_DRV_EXTENDED_MINOR_VERSION 1 /* * The emulator will refuse to load a driver with different major @@ -601,11 +601,33 @@ EXTERN ErlDrvPort driver_create_port(ErlDrvPort creator_port, ErlDrvData drv_data); +/* + * driver_output_term() is deprecated, and scheduled for removal in + * OTP-R17. Use erl_drv_output_term() instead. For more information + * see the erl_driver(3) documentation. + */ +EXTERN int driver_output_term(ErlDrvPort ix, + ErlDrvTermData* data, + int len) ERL_DRV_DEPRECATED_FUNC; +/* + * driver_send_term() is deprecated, and scheduled for removal in + * OTP-R17. Use erl_drv_send_term() instead. For more information + * see the erl_driver(3) documentation. + */ +EXTERN int driver_send_term(ErlDrvPort ix, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) ERL_DRV_DEPRECATED_FUNC; + /* output term data to the port owner */ -EXTERN int driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len); +EXTERN int erl_drv_output_term(ErlDrvTermData port, + ErlDrvTermData* data, + int len); /* output term data to a specific process */ -EXTERN int driver_send_term(ErlDrvPort ix, ErlDrvTermData to, - ErlDrvTermData* data, int len); +EXTERN int erl_drv_send_term(ErlDrvTermData port, + ErlDrvTermData to, + ErlDrvTermData* data, + int len); /* Async IO functions */ EXTERN long driver_async(ErlDrvPort ix, diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 00394f279e..6ad92dcd7d 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -113,7 +113,7 @@ struct _erl_drv_port { erts_atomic32_t refc; int cleanup; #endif - Eterm connected; /* A connected process */ + erts_smp_atomic_t connected;/* A connected process */ Eterm caller; /* Current caller. */ Eterm data; /* Data associated with port. */ ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */ @@ -358,6 +358,17 @@ erts_smp_port_unlock(Port *prt) #define ERTS_PORT_SCHED_ID(P, ID) \ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) +#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \ + erts_smp_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID)) +#define ERTS_PORT_SET_CONNECTED(PRT, PID) \ + erts_smp_atomic_set_nob(&(PRT)->connected, (erts_aint_t) (PID)) +#define ERTS_PORT_SET_CONNECTED_RELB(PRT, PID) \ + erts_smp_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID)) +#define ERTS_PORT_GET_CONNECTED(PRT) \ + ((Eterm) erts_smp_atomic_read_nob(&(PRT)->connected)) +#define ERTS_PORT_GET_CONNECTED_ACQB(PRT) \ + ((Eterm) erts_smp_atomic_read_acqb(&(PRT)->connected)) + extern const Port erts_invalid_port; #define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index dceecda973..e613ad0144 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -56,7 +56,7 @@ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \ \ - dtrace_pid_str(PP->connected, process_str); \ + dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \ dtrace_port_str(PP, port_str); \ DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \ } diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 96a89a3b4e..6776f72894 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -323,7 +323,7 @@ static Port *create_port(char *name, prt->bytes_in = 0; prt->bytes_out = 0; prt->dist_entry = NULL; - prt->connected = pid; + ERTS_PORT_INIT_CONNECTED(prt, pid); prt->common.u.alive.reg = NULL; #ifdef ERTS_SMP prt->common.u.alive.ptimer = NULL; @@ -1684,7 +1684,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state) while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY) deliver_read_message(prt, state, - prt->connected, + ERTS_PORT_GET_CONNECTED(prt), NULL, 0, LINEBUF_DATA(lc), @@ -1847,7 +1847,7 @@ static void flush_port(Port *p) if (p->drv_ptr->flush != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_flush)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_flush, process_str, port_str, p->name); } #endif @@ -1892,7 +1892,7 @@ terminate_port(Port *prt) ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->common.id; - connected_id = prt->connected; + connected_id = ERTS_PORT_GET_CONNECTED(prt); } else { send_closed_port_id = NIL; @@ -1909,7 +1909,7 @@ terminate_port(Port *prt) int fpe_was_unmasked = erts_block_fpe(); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_stop)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt) DTRACE3(driver_stop, process_str, drv->name, port_str); } #endif @@ -2092,7 +2092,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) state = erts_atomic32_read_nob(&p->state); if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) || ((reason == am_normal) && - ((from != p->connected) && (from != p->common.id)))) { + ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) { return; } @@ -2186,7 +2186,7 @@ void erts_port_command(Process *proc, if (is_tuple_arity(command, 2)) { tp = tuple_val(command); - if ((pid = port->connected) == tp[1]) { + if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { erts_atomic32_read_bor_relb(&port->state, @@ -2212,7 +2212,7 @@ void erts_port_command(Process *proc, DTRACE4(port_command, process_str, port_str, port->name, "connect"); } #endif - port->connected = tp[2]; + ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]); deliver_result(port->common.id, pid, am_connected); goto done; } @@ -2223,7 +2223,7 @@ void erts_port_command(Process *proc, { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process* rp = erts_pid2proc(NULL, 0, - port->connected, rp_locks); + ERTS_PORT_GET_CONNECTED(port), rp_locks); if (rp) { (void) erts_send_exit_signal(NULL, port->common.id, @@ -2423,7 +2423,7 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "=port:%T\n", p->common.id); erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { - erts_print(to, arg, "Connected: %T", p->connected); + erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p)); erts_print(to, arg, "\n"); } @@ -2627,7 +2627,7 @@ int async_ready(Port *p, void* data) if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_ready_async, process_str, port_str, p->name); } #endif @@ -2804,7 +2804,7 @@ void driver_report_exit(ErlDrvPort ix, int status) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - pid = prt->connected; + pid = ERTS_PORT_GET_CONNECTED(prt); ASSERT(is_internal_pid(pid)); rp = (scheduler @@ -3384,28 +3384,114 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) #undef ERTS_DDT_FAIL } +static ERTS_INLINE int +deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p) +{ +#ifdef ERTS_SMP + ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); +#endif + Port *prt = erts_port_lookup_raw((Eterm) port_id); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); + if (connected_p) { +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + ETHR_MEMBAR(ETHR_LoadLoad); +#endif + *connected_p = ERTS_PORT_GET_CONNECTED(prt); + } +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + erts_thr_progress_unmanaged_continue(dhndl); + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + } +#endif + ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED + ? erts_lc_is_port_locked(prt) + : !erts_lc_is_port_locked(prt)); + return ((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ? -1 + : ((state & ERTS_PORT_SFLG_CLOSING) ? 0 : 1)); +} + +int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len) +{ + /* May be called from arbitrary thread */ + Eterm connected; + int res = deliver_term_check_port(port_id, &connected); + if (res <= 0) + return res; + return driver_deliver_term(connected, data, len); +} + +/* + * driver_output_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_output_term() + * above. + */ int driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len) { erts_aint32_t state; - Port* prt = erts_drvport2port(drvport, &state); + Port* prt; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + /* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */ + prt = erts_drvport2port(drvport, &state); if (!prt) return -1; /* invalid (dead) */ ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) - return driver_deliver_term(prt->connected, data, len); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; else if (state & ERTS_PORT_SFLG_CLOSING) - return 0; /* closing */ - else - return -1; /* invalid (dead) */ + return 0; + + return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len); } +int erl_drv_send_term(ErlDrvTermData port_id, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) +{ + /* May be called from arbitrary thread */ + int res = deliver_term_check_port(port_id, NULL); + if (res <= 0) + return res; + return driver_deliver_term(to, data, len); +} +/* + * driver_send_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_send_term() above. + */ int -driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len) +driver_send_term(ErlDrvPort drvport, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) { - /* driver_send_term() assume port is ok... */ + /* + * NOTE! It is *not* safe to access the 'drvport' parameter + * from unmanaged threads. Also note that it is impossible + * to make this access safe without using a less efficient + * internal data representation for ErlDrvPort. + */ + ERTS_SMP_CHK_NO_PROC_LOCKS; +#ifdef ERTS_SMP + if (erts_thr_progress_is_managed_thread()) +#endif + { + erts_aint32_t state; + Port* prt = erts_drvport2port(drvport, &state); + if (!prt) + return -1; /* invalid (dead) */ + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; + else if (state & ERTS_PORT_SFLG_CLOSING) + return 0; + } return driver_deliver_term(to, data, len); } @@ -3438,7 +3524,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) (bin->orig_bytes+offs), len); } else - deliver_bin_message(prt, prt->connected, + deliver_bin_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, bin, offs, len); return 0; } @@ -3481,9 +3567,11 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) buf, len); } else if (state & ERTS_PORT_SFLG_LINEBUF_IO) - deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len); + deliver_linebuf_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len); else - deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0); + deliver_read_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len, 0); return 0; } @@ -3548,7 +3636,8 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size)); - deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size); + deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, + binv, iov, n, size); return 0; } @@ -4581,7 +4670,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) DRV_MONITOR_UNLOCK_PDL(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_process_exit)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(prt), prt) DTRACE3(driver_process_exit, process_str, port_str, prt->name); } #endif @@ -4614,7 +4703,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { - deliver_result(prt->common.id, prt->connected, am_eof); + deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ if (prt->port_data_lock) @@ -4638,18 +4727,20 @@ int driver_exit(ErlDrvPort ix, int err) Port* prt = erts_drvport2port(ix, NULL); Process* rp; ErtsLink *lnk, *rlnk = NULL; + Eterm connected; ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return -1; - rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); + connected = ERTS_PORT_GET_CONNECTED(prt); + rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK); if (rp) { rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); } - lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected); + lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected); #ifdef ERTS_SMP if (rp) @@ -4718,7 +4809,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix) if (prt == NULL) return NIL; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - return prt->connected; + return ERTS_PORT_GET_CONNECTED(prt); } ErlDrvTermData driver_caller(ErlDrvPort ix) -- cgit v1.2.3