aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-09-16 02:45:32 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:10 +0100
commit23c6f9e07a3cae7c05e55abd01ff798384241538 (patch)
treedc62a1976380ea69f3843e1d53ef53526fdabd3e /erts/emulator/beam/io.c
parent34fc6f243f8a462f4b2370a9fe5376df1ca08f1d (diff)
downloadotp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.gz
otp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.bz2
otp-23c6f9e07a3cae7c05e55abd01ff798384241538.zip
Add erl_drv_[send|output]_term
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c149
1 files changed, 120 insertions, 29 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 96a89a3b4e..6776f72894 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -323,7 +323,7 @@ static Port *create_port(char *name,
prt->bytes_in = 0;
prt->bytes_out = 0;
prt->dist_entry = NULL;
- prt->connected = pid;
+ ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
#ifdef ERTS_SMP
prt->common.u.alive.ptimer = NULL;
@@ -1684,7 +1684,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state)
while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY)
deliver_read_message(prt,
state,
- prt->connected,
+ ERTS_PORT_GET_CONNECTED(prt),
NULL,
0,
LINEBUF_DATA(lc),
@@ -1847,7 +1847,7 @@ static void flush_port(Port *p)
if (p->drv_ptr->flush != NULL) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_flush)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_flush, process_str, port_str, p->name);
}
#endif
@@ -1892,7 +1892,7 @@ terminate_port(Port *prt)
~ERTS_PORT_SFLG_SEND_CLOSED);
if (state & ERTS_PORT_SFLG_SEND_CLOSED) {
send_closed_port_id = prt->common.id;
- connected_id = prt->connected;
+ connected_id = ERTS_PORT_GET_CONNECTED(prt);
}
else {
send_closed_port_id = NIL;
@@ -1909,7 +1909,7 @@ terminate_port(Port *prt)
int fpe_was_unmasked = erts_block_fpe();
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_stop)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt)
DTRACE3(driver_stop, process_str, drv->name, port_str);
}
#endif
@@ -2092,7 +2092,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
state = erts_atomic32_read_nob(&p->state);
if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING))
|| ((reason == am_normal) &&
- ((from != p->connected) && (from != p->common.id)))) {
+ ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) {
return;
}
@@ -2186,7 +2186,7 @@ void erts_port_command(Process *proc,
if (is_tuple_arity(command, 2)) {
tp = tuple_val(command);
- if ((pid = port->connected) == tp[1]) {
+ if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) {
/* PID must be connected */
if (tp[2] == am_close) {
erts_atomic32_read_bor_relb(&port->state,
@@ -2212,7 +2212,7 @@ void erts_port_command(Process *proc,
DTRACE4(port_command, process_str, port_str, port->name, "connect");
}
#endif
- port->connected = tp[2];
+ ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]);
deliver_result(port->common.id, pid, am_connected);
goto done;
}
@@ -2223,7 +2223,7 @@ void erts_port_command(Process *proc,
{
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
Process* rp = erts_pid2proc(NULL, 0,
- port->connected, rp_locks);
+ ERTS_PORT_GET_CONNECTED(port), rp_locks);
if (rp) {
(void) erts_send_exit_signal(NULL,
port->common.id,
@@ -2423,7 +2423,7 @@ print_port_info(Port *p, int to, void *arg)
erts_print(to, arg, "=port:%T\n", p->common.id);
erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id));
if (state & ERTS_PORT_SFLG_CONNECTED) {
- erts_print(to, arg, "Connected: %T", p->connected);
+ erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p));
erts_print(to, arg, "\n");
}
@@ -2627,7 +2627,7 @@ int async_ready(Port *p, void* data)
if (p->drv_ptr->ready_async != NULL) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_ready_async, process_str, port_str, p->name);
}
#endif
@@ -2804,7 +2804,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- pid = prt->connected;
+ pid = ERTS_PORT_GET_CONNECTED(prt);
ASSERT(is_internal_pid(pid));
rp = (scheduler
@@ -3384,28 +3384,114 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
#undef ERTS_DDT_FAIL
}
+static ERTS_INLINE int
+deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
+{
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
+#endif
+ Port *prt = erts_port_lookup_raw((Eterm) port_id);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
+ if (connected_p) {
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ ETHR_MEMBAR(ETHR_LoadLoad);
+#endif
+ *connected_p = ERTS_PORT_GET_CONNECTED(prt);
+ }
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ erts_thr_progress_unmanaged_continue(dhndl);
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+ }
+#endif
+ ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED
+ ? erts_lc_is_port_locked(prt)
+ : !erts_lc_is_port_locked(prt));
+ return ((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ ? -1
+ : ((state & ERTS_PORT_SFLG_CLOSING) ? 0 : 1));
+}
+
+int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len)
+{
+ /* May be called from arbitrary thread */
+ Eterm connected;
+ int res = deliver_term_check_port(port_id, &connected);
+ if (res <= 0)
+ return res;
+ return driver_deliver_term(connected, data, len);
+}
+
+/*
+ * driver_output_term() is deprecated, and has been scheduled for
+ * removal in OTP-R17. It is replaced by erl_drv_output_term()
+ * above.
+ */
int
driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
{
erts_aint32_t state;
- Port* prt = erts_drvport2port(drvport, &state);
+ Port* prt;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ /* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */
+ prt = erts_drvport2port(drvport, &state);
if (!prt)
return -1; /* invalid (dead) */
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
- return driver_deliver_term(prt->connected, data, len);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return -1;
else if (state & ERTS_PORT_SFLG_CLOSING)
- return 0; /* closing */
- else
- return -1; /* invalid (dead) */
+ return 0;
+
+ return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len);
}
+int erl_drv_send_term(ErlDrvTermData port_id,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len)
+{
+ /* May be called from arbitrary thread */
+ int res = deliver_term_check_port(port_id, NULL);
+ if (res <= 0)
+ return res;
+ return driver_deliver_term(to, data, len);
+}
+/*
+ * driver_send_term() is deprecated, and has been scheduled for
+ * removal in OTP-R17. It is replaced by erl_drv_send_term() above.
+ */
int
-driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len)
+driver_send_term(ErlDrvPort drvport,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len)
{
- /* driver_send_term() assume port is ok... */
+ /*
+ * NOTE! It is *not* safe to access the 'drvport' parameter
+ * from unmanaged threads. Also note that it is impossible
+ * to make this access safe without using a less efficient
+ * internal data representation for ErlDrvPort.
+ */
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+#ifdef ERTS_SMP
+ if (erts_thr_progress_is_managed_thread())
+#endif
+ {
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(drvport, &state);
+ if (!prt)
+ return -1; /* invalid (dead) */
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return -1;
+ else if (state & ERTS_PORT_SFLG_CLOSING)
+ return 0;
+ }
return driver_deliver_term(to, data, len);
}
@@ -3438,7 +3524,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
(byte*) (bin->orig_bytes+offs), len);
}
else
- deliver_bin_message(prt, prt->connected,
+ deliver_bin_message(prt, ERTS_PORT_GET_CONNECTED(prt),
hbuf, hlen, bin, offs, len);
return 0;
}
@@ -3481,9 +3567,11 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
(byte*) buf, len);
}
else if (state & ERTS_PORT_SFLG_LINEBUF_IO)
- deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len);
+ deliver_linebuf_message(prt, state, ERTS_PORT_GET_CONNECTED(prt),
+ hbuf, hlen, buf, len);
else
- deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0);
+ deliver_read_message(prt, state, ERTS_PORT_GET_CONNECTED(prt),
+ hbuf, hlen, buf, len, 0);
return 0;
}
@@ -3548,7 +3636,8 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
/* XXX handle distribution !!! */
prt->bytes_in += (hlen + size);
erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size));
- deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size);
+ deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen,
+ binv, iov, n, size);
return 0;
}
@@ -4581,7 +4670,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_process_exit)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(prt), prt)
DTRACE3(driver_process_exit, process_str, port_str, prt->name);
}
#endif
@@ -4614,7 +4703,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (state & ERTS_PORT_SFLG_CLOSING) {
terminate_port(prt);
} else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) {
- deliver_result(prt->common.id, prt->connected, am_eof);
+ deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
} else {
/* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */
if (prt->port_data_lock)
@@ -4638,18 +4727,20 @@ int driver_exit(ErlDrvPort ix, int err)
Port* prt = erts_drvport2port(ix, NULL);
Process* rp;
ErtsLink *lnk, *rlnk = NULL;
+ Eterm connected;
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == NULL)
return -1;
- rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK);
+ connected = ERTS_PORT_GET_CONNECTED(prt);
+ rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK);
if (rp) {
rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id);
}
- lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected);
+ lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected);
#ifdef ERTS_SMP
if (rp)
@@ -4718,7 +4809,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix)
if (prt == NULL)
return NIL;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- return prt->connected;
+ return ERTS_PORT_GET_CONNECTED(prt);
}
ErlDrvTermData driver_caller(ErlDrvPort ix)