aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-09-16 02:45:32 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:10 +0100
commit23c6f9e07a3cae7c05e55abd01ff798384241538 (patch)
treedc62a1976380ea69f3843e1d53ef53526fdabd3e /erts/emulator/beam
parent34fc6f243f8a462f4b2370a9fe5376df1ca08f1d (diff)
downloadotp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.gz
otp-23c6f9e07a3cae7c05e55abd01ff798384241538.tar.bz2
otp-23c6f9e07a3cae7c05e55abd01ff798384241538.zip
Add erl_drv_[send|output]_term
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/erl_bif_info.c2
-rw-r--r--erts/emulator/beam/erl_bif_port.c8
-rw-r--r--erts/emulator/beam/erl_driver.h30
-rw-r--r--erts/emulator/beam/erl_port.h13
-rw-r--r--erts/emulator/beam/erl_port_task.c2
-rw-r--r--erts/emulator/beam/io.c149
6 files changed, 164 insertions, 40 deletions
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 0586a89041..44c29252b2 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -2915,7 +2915,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item)
}
else if (item == am_connected) {
hp = HAlloc(p, 3);
- res = prt->connected; /* internal pid */
+ res = ERTS_PORT_GET_CONNECTED(prt); /* internal pid */
}
else if (item == am_input) {
Uint hsz = 3;
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index 477671f11f..0d0733b8d0 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -354,7 +354,7 @@ port_call(Process* c_p, Eterm arg1, Eterm arg2, Eterm arg3)
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
- dtrace_pid_str(p->connected, process_str);
+ dtrace_pid_str(ERTS_PORT_GET_CONNECTED(p), process_str);
dtrace_port_str(p, port_str);
DTRACE5(driver_call, process_str, port_str, p->name, op, real_size);
}
@@ -516,7 +516,7 @@ BIF_RETTYPE port_close_1(BIF_ALIST_1)
erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
BIF_ERROR(BIF_P, BADARG);
}
- erts_do_exit_port(p, p->connected, am_normal);
+ erts_do_exit_port(p, ERTS_PORT_GET_CONNECTED(p), am_normal);
/* if !ERTS_SMP: since we terminate port with reason normal
we SHOULD never get an exit signal ourselves
*/
@@ -553,7 +553,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2)
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- prt->connected = pid; /* internal pid */
+ ERTS_PORT_SET_CONNECTED_RELB(prt, pid); /* internal pid */
erts_port_release(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_connect)) {
@@ -561,7 +561,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2)
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE);
- dtrace_pid_str(prt->connected, process_str);
+ dtrace_pid_str(pid, process_str);
erts_snprintf(port_str, sizeof(port_str), "%T", prt->id);
dtrace_proc_str(rp, newprocess_str);
DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str);
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index 1dc43419fe..fb9e92e44b 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -136,7 +136,7 @@ typedef struct {
#define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed)
#define ERL_DRV_EXTENDED_MAJOR_VERSION 2
-#define ERL_DRV_EXTENDED_MINOR_VERSION 0
+#define ERL_DRV_EXTENDED_MINOR_VERSION 1
/*
* The emulator will refuse to load a driver with different major
@@ -601,11 +601,33 @@ EXTERN ErlDrvPort driver_create_port(ErlDrvPort creator_port,
ErlDrvData drv_data);
+/*
+ * driver_output_term() is deprecated, and scheduled for removal in
+ * OTP-R17. Use erl_drv_output_term() instead. For more information
+ * see the erl_driver(3) documentation.
+ */
+EXTERN int driver_output_term(ErlDrvPort ix,
+ ErlDrvTermData* data,
+ int len) ERL_DRV_DEPRECATED_FUNC;
+/*
+ * driver_send_term() is deprecated, and scheduled for removal in
+ * OTP-R17. Use erl_drv_send_term() instead. For more information
+ * see the erl_driver(3) documentation.
+ */
+EXTERN int driver_send_term(ErlDrvPort ix,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len) ERL_DRV_DEPRECATED_FUNC;
+
/* output term data to the port owner */
-EXTERN int driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len);
+EXTERN int erl_drv_output_term(ErlDrvTermData port,
+ ErlDrvTermData* data,
+ int len);
/* output term data to a specific process */
-EXTERN int driver_send_term(ErlDrvPort ix, ErlDrvTermData to,
- ErlDrvTermData* data, int len);
+EXTERN int erl_drv_send_term(ErlDrvTermData port,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len);
/* Async IO functions */
EXTERN long driver_async(ErlDrvPort ix,
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 00394f279e..6ad92dcd7d 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -113,7 +113,7 @@ struct _erl_drv_port {
erts_atomic32_t refc;
int cleanup;
#endif
- Eterm connected; /* A connected process */
+ erts_smp_atomic_t connected;/* A connected process */
Eterm caller; /* Current caller. */
Eterm data; /* Data associated with port. */
ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */
@@ -358,6 +358,17 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
+#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \
+ erts_smp_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID))
+#define ERTS_PORT_SET_CONNECTED(PRT, PID) \
+ erts_smp_atomic_set_nob(&(PRT)->connected, (erts_aint_t) (PID))
+#define ERTS_PORT_SET_CONNECTED_RELB(PRT, PID) \
+ erts_smp_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID))
+#define ERTS_PORT_GET_CONNECTED(PRT) \
+ ((Eterm) erts_smp_atomic_read_nob(&(PRT)->connected))
+#define ERTS_PORT_GET_CONNECTED_ACQB(PRT) \
+ ((Eterm) erts_smp_atomic_read_acqb(&(PRT)->connected))
+
extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index dceecda973..e613ad0144 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -56,7 +56,7 @@
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
\
- dtrace_pid_str(PP->connected, process_str); \
+ dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \
dtrace_port_str(PP, port_str); \
DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \
}
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 96a89a3b4e..6776f72894 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -323,7 +323,7 @@ static Port *create_port(char *name,
prt->bytes_in = 0;
prt->bytes_out = 0;
prt->dist_entry = NULL;
- prt->connected = pid;
+ ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
#ifdef ERTS_SMP
prt->common.u.alive.ptimer = NULL;
@@ -1684,7 +1684,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state)
while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY)
deliver_read_message(prt,
state,
- prt->connected,
+ ERTS_PORT_GET_CONNECTED(prt),
NULL,
0,
LINEBUF_DATA(lc),
@@ -1847,7 +1847,7 @@ static void flush_port(Port *p)
if (p->drv_ptr->flush != NULL) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_flush)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_flush, process_str, port_str, p->name);
}
#endif
@@ -1892,7 +1892,7 @@ terminate_port(Port *prt)
~ERTS_PORT_SFLG_SEND_CLOSED);
if (state & ERTS_PORT_SFLG_SEND_CLOSED) {
send_closed_port_id = prt->common.id;
- connected_id = prt->connected;
+ connected_id = ERTS_PORT_GET_CONNECTED(prt);
}
else {
send_closed_port_id = NIL;
@@ -1909,7 +1909,7 @@ terminate_port(Port *prt)
int fpe_was_unmasked = erts_block_fpe();
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_stop)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt)
DTRACE3(driver_stop, process_str, drv->name, port_str);
}
#endif
@@ -2092,7 +2092,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
state = erts_atomic32_read_nob(&p->state);
if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING))
|| ((reason == am_normal) &&
- ((from != p->connected) && (from != p->common.id)))) {
+ ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) {
return;
}
@@ -2186,7 +2186,7 @@ void erts_port_command(Process *proc,
if (is_tuple_arity(command, 2)) {
tp = tuple_val(command);
- if ((pid = port->connected) == tp[1]) {
+ if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) {
/* PID must be connected */
if (tp[2] == am_close) {
erts_atomic32_read_bor_relb(&port->state,
@@ -2212,7 +2212,7 @@ void erts_port_command(Process *proc,
DTRACE4(port_command, process_str, port_str, port->name, "connect");
}
#endif
- port->connected = tp[2];
+ ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]);
deliver_result(port->common.id, pid, am_connected);
goto done;
}
@@ -2223,7 +2223,7 @@ void erts_port_command(Process *proc,
{
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
Process* rp = erts_pid2proc(NULL, 0,
- port->connected, rp_locks);
+ ERTS_PORT_GET_CONNECTED(port), rp_locks);
if (rp) {
(void) erts_send_exit_signal(NULL,
port->common.id,
@@ -2423,7 +2423,7 @@ print_port_info(Port *p, int to, void *arg)
erts_print(to, arg, "=port:%T\n", p->common.id);
erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id));
if (state & ERTS_PORT_SFLG_CONNECTED) {
- erts_print(to, arg, "Connected: %T", p->connected);
+ erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p));
erts_print(to, arg, "\n");
}
@@ -2627,7 +2627,7 @@ int async_ready(Port *p, void* data)
if (p->drv_ptr->ready_async != NULL) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p)
DTRACE3(driver_ready_async, process_str, port_str, p->name);
}
#endif
@@ -2804,7 +2804,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- pid = prt->connected;
+ pid = ERTS_PORT_GET_CONNECTED(prt);
ASSERT(is_internal_pid(pid));
rp = (scheduler
@@ -3384,28 +3384,114 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
#undef ERTS_DDT_FAIL
}
+static ERTS_INLINE int
+deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p)
+{
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
+#endif
+ Port *prt = erts_port_lookup_raw((Eterm) port_id);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
+ if (connected_p) {
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ ETHR_MEMBAR(ETHR_LoadLoad);
+#endif
+ *connected_p = ERTS_PORT_GET_CONNECTED(prt);
+ }
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ erts_thr_progress_unmanaged_continue(dhndl);
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+ }
+#endif
+ ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED
+ ? erts_lc_is_port_locked(prt)
+ : !erts_lc_is_port_locked(prt));
+ return ((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ ? -1
+ : ((state & ERTS_PORT_SFLG_CLOSING) ? 0 : 1));
+}
+
+int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len)
+{
+ /* May be called from arbitrary thread */
+ Eterm connected;
+ int res = deliver_term_check_port(port_id, &connected);
+ if (res <= 0)
+ return res;
+ return driver_deliver_term(connected, data, len);
+}
+
+/*
+ * driver_output_term() is deprecated, and has been scheduled for
+ * removal in OTP-R17. It is replaced by erl_drv_output_term()
+ * above.
+ */
int
driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
{
erts_aint32_t state;
- Port* prt = erts_drvport2port(drvport, &state);
+ Port* prt;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ /* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */
+ prt = erts_drvport2port(drvport, &state);
if (!prt)
return -1; /* invalid (dead) */
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
- return driver_deliver_term(prt->connected, data, len);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return -1;
else if (state & ERTS_PORT_SFLG_CLOSING)
- return 0; /* closing */
- else
- return -1; /* invalid (dead) */
+ return 0;
+
+ return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len);
}
+int erl_drv_send_term(ErlDrvTermData port_id,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len)
+{
+ /* May be called from arbitrary thread */
+ int res = deliver_term_check_port(port_id, NULL);
+ if (res <= 0)
+ return res;
+ return driver_deliver_term(to, data, len);
+}
+/*
+ * driver_send_term() is deprecated, and has been scheduled for
+ * removal in OTP-R17. It is replaced by erl_drv_send_term() above.
+ */
int
-driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len)
+driver_send_term(ErlDrvPort drvport,
+ ErlDrvTermData to,
+ ErlDrvTermData* data,
+ int len)
{
- /* driver_send_term() assume port is ok... */
+ /*
+ * NOTE! It is *not* safe to access the 'drvport' parameter
+ * from unmanaged threads. Also note that it is impossible
+ * to make this access safe without using a less efficient
+ * internal data representation for ErlDrvPort.
+ */
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+#ifdef ERTS_SMP
+ if (erts_thr_progress_is_managed_thread())
+#endif
+ {
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(drvport, &state);
+ if (!prt)
+ return -1; /* invalid (dead) */
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return -1;
+ else if (state & ERTS_PORT_SFLG_CLOSING)
+ return 0;
+ }
return driver_deliver_term(to, data, len);
}
@@ -3438,7 +3524,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
(byte*) (bin->orig_bytes+offs), len);
}
else
- deliver_bin_message(prt, prt->connected,
+ deliver_bin_message(prt, ERTS_PORT_GET_CONNECTED(prt),
hbuf, hlen, bin, offs, len);
return 0;
}
@@ -3481,9 +3567,11 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
(byte*) buf, len);
}
else if (state & ERTS_PORT_SFLG_LINEBUF_IO)
- deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len);
+ deliver_linebuf_message(prt, state, ERTS_PORT_GET_CONNECTED(prt),
+ hbuf, hlen, buf, len);
else
- deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0);
+ deliver_read_message(prt, state, ERTS_PORT_GET_CONNECTED(prt),
+ hbuf, hlen, buf, len, 0);
return 0;
}
@@ -3548,7 +3636,8 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
/* XXX handle distribution !!! */
prt->bytes_in += (hlen + size);
erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size));
- deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size);
+ deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen,
+ binv, iov, n, size);
return 0;
}
@@ -4581,7 +4670,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_process_exit)) {
- DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(prt), prt)
DTRACE3(driver_process_exit, process_str, port_str, prt->name);
}
#endif
@@ -4614,7 +4703,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
if (state & ERTS_PORT_SFLG_CLOSING) {
terminate_port(prt);
} else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) {
- deliver_result(prt->common.id, prt->connected, am_eof);
+ deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof);
} else {
/* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */
if (prt->port_data_lock)
@@ -4638,18 +4727,20 @@ int driver_exit(ErlDrvPort ix, int err)
Port* prt = erts_drvport2port(ix, NULL);
Process* rp;
ErtsLink *lnk, *rlnk = NULL;
+ Eterm connected;
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == NULL)
return -1;
- rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK);
+ connected = ERTS_PORT_GET_CONNECTED(prt);
+ rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK);
if (rp) {
rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id);
}
- lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected);
+ lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected);
#ifdef ERTS_SMP
if (rp)
@@ -4718,7 +4809,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix)
if (prt == NULL)
return NIL;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- return prt->connected;
+ return ERTS_PORT_GET_CONNECTED(prt);
}
ErlDrvTermData driver_caller(ErlDrvPort ix)