aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c192
1 files changed, 187 insertions, 5 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index fe1a7ba345..8a2a43bebd 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -43,6 +43,7 @@
#include "erl_version.h"
#include "error.h"
#include "erl_async.h"
+#include "dtrace-wrapper.h"
extern ErlDrvEntry fd_driver_entry;
extern ErlDrvEntry vanilla_driver_entry;
@@ -180,6 +181,20 @@ typedef struct line_buf_context {
#define LINEBUF_INITIAL 100
+#ifdef USE_VM_PROBES
+#define DTRACE_FORMAT_COMMON_PID_AND_PORT(PID, PORT) \
+ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
+ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
+ \
+ dtrace_pid_str((PID), process_str); \
+ dtrace_port_str((PORT), port_str);
+#define DTRACE_FORMAT_COMMON_PROC_AND_PORT(PID, PORT) \
+ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
+ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
+ \
+ dtrace_proc_str((PID), process_str); \
+ dtrace_port_str((PORT), port_str);
+#endif
/* The 'number' field in a port now has two parts: the lowest bits
contain the index in the port table, and the higher bits are a counter
@@ -639,6 +654,12 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
trace_sched_ports_where(port, am_in, am_start);
}
port->caller = pid;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_start)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(pid, port)
+ DTRACE3(driver_start, process_str, driver->name, port_str);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
drv_data = (*driver->start)((ErlDrvPort)(port_ix),
name, opts);
@@ -1170,6 +1191,12 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
ev.size = size; /* total size */
ev.iov = ivp;
ev.binv = bvp;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_outputv)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p)
+ DTRACE4(driver_outputv, process_str, port_str, p->name, size);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*drv->outputv)((ErlDrvData)p->drv_data, &ev);
erts_unblock_fpe(fpe_was_unmasked);
@@ -1189,8 +1216,21 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
r = io_list_to_buf(list, buf, size);
+#ifdef USE_VM_PROBES
+ if(DTRACE_ENABLED(port_command)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p)
+ DTRACE4(port_command, process_str, port_str, p->name, "command");
+ }
+#endif
+
if (r >= 0) {
size -= r;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_output)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p)
+ DTRACE4(driver_output, process_str, port_str, p->name, size);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*drv->output)((ErlDrvData)p->drv_data, buf, size);
erts_unblock_fpe(fpe_was_unmasked);
@@ -1214,6 +1254,12 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
*/
buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
r = io_list_to_buf(list, buf, size);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_output)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p)
+ DTRACE4(driver_output, process_str, port_str, p->name, size);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*drv->output)((ErlDrvData)p->drv_data, buf, size);
erts_unblock_fpe(fpe_was_unmasked);
@@ -1529,7 +1575,11 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_queue_message(rp, &rp_locks, bp, tuple, NIL
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
erts_smp_proc_unlock(rp, rp_locks);
erts_smp_proc_dec_refc(rp);
}
@@ -1618,7 +1668,11 @@ static void deliver_read_message(Port* prt, Eterm to,
tuple = TUPLE2(hp, prt->id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
erts_smp_proc_unlock(rp, rp_locks);
erts_smp_proc_dec_refc(rp);
}
@@ -1771,7 +1825,11 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
erts_smp_proc_unlock(rp, rp_locks);
erts_smp_proc_dec_refc(rp);
}
@@ -1810,6 +1868,12 @@ static void flush_port(Port *p)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->flush != NULL) {
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_flush)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE3(driver_flush, process_str, port_str, p->name);
+ }
+#endif
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_in, am_flush);
}
@@ -1837,6 +1901,7 @@ terminate_port(Port *prt)
Eterm send_closed_port_id;
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
+ int halt;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -1844,6 +1909,8 @@ terminate_port(Port *prt)
ASSERT(!prt->nlinks);
ASSERT(!prt->monitors);
+ /* prt->status may be altered by kill_port()below */
+ halt = (prt->status & ERTS_PORT_SFLG_HALT) != 0;
if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {
erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);
send_closed_port_id = prt->id;
@@ -1862,6 +1929,12 @@ terminate_port(Port *prt)
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
int fpe_was_unmasked = erts_block_fpe();
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_stop)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE3(driver_stop, process_str, drv->name, port_str);
+ }
+#endif
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
#ifdef ERTS_SMP
@@ -1895,6 +1968,10 @@ terminate_port(Port *prt)
* We don't want to send the closed message until after the
* port has been removed from the port table (in kill_port()).
*/
+ if (halt && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
+ erts_smp_port_unlock(prt); /* We will exit and never return */
+ erl_exit_flush_async(erts_halt_code, "");
+ }
if (is_internal_port(send_closed_port_id))
deliver_result(send_closed_port_id, connected_id, am_closed);
@@ -2019,6 +2096,19 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
rreason = (reason == am_kill) ? am_killed : reason;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(port_exit)) {
+ DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE);
+ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
+ DTRACE_CHARBUF(rreason_str, 64);
+
+ erts_snprintf(from_str, sizeof(from_str), "%T", from);
+ dtrace_port_str(p, port_str);
+ erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason);
+ DTRACE4(port_exit, from_str, port_str, p->name, rreason_str);
+ }
+#endif
+
if ((p->status & (ERTS_PORT_SFLGS_DEAD
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_IMMORTAL))
@@ -2119,6 +2209,13 @@ void erts_port_command(Process *proc,
if (tp[2] == am_close) {
erts_port_status_bor_set(port, ERTS_PORT_SFLG_SEND_CLOSED);
erts_do_exit_port(port, pid, am_normal);
+
+#ifdef USE_VM_PROBES
+ if(DTRACE_ENABLED(port_command)) {
+ DTRACE_FORMAT_COMMON_PROC_AND_PORT(proc, port)
+ DTRACE4(port_command, process_str, port_str, port->name, "close");
+ }
+#endif
goto done;
} else if (is_tuple_arity(tp[2], 2)) {
tp = tuple_val(tp[2]);
@@ -2126,6 +2223,12 @@ void erts_port_command(Process *proc,
if (erts_write_to_port(caller_id, port, tp[2]) == 0)
goto done;
} else if ((tp[1] == am_connect) && is_internal_pid(tp[2])) {
+#ifdef USE_VM_PROBES
+ if(DTRACE_ENABLED(port_command)) {
+ DTRACE_FORMAT_COMMON_PROC_AND_PORT(proc, port)
+ DTRACE4(port_command, process_str, port_str, port->name, "connect");
+ }
+#endif
port->connected = tp[2];
deliver_result(port->id, pid, am_connected);
goto done;
@@ -2228,6 +2331,15 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist)
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN);
ERTS_SMP_CHK_NO_PROC_LOCKS;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(port_control) || DTRACE_ENABLED(driver_control)) {
+ DTRACE_FORMAT_COMMON_PROC_AND_PORT(p, prt);
+ DTRACE4(port_control, process_str, port_str, prt->name, command);
+ DTRACE5(driver_control, process_str, port_str, prt->name,
+ command, to_len);
+ }
+#endif
+
/*
* Call the port's control routine.
*/
@@ -2368,6 +2480,10 @@ print_port_info(int to, void *arg, int i)
void
set_busy_port(ErlDrvPort port_num, int on)
{
+#ifdef USE_VM_PROBES
+ DTRACE_CHARBUF(port_str, 16);
+#endif
+
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
@@ -2375,12 +2491,26 @@ set_busy_port(ErlDrvPort port_num, int on)
if (on) {
erts_port_status_bor_set(&erts_port[port_num],
ERTS_PORT_SFLG_PORT_BUSY);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(port_busy)) {
+ erts_snprintf(port_str, sizeof(port_str),
+ "%T", erts_port[port_num].id);
+ DTRACE1(port_busy, port_str);
+ }
+#endif
} else {
ErtsProcList* plp = erts_port[port_num].suspended;
erts_port_status_band_set(&erts_port[port_num],
~ERTS_PORT_SFLG_PORT_BUSY);
erts_port[port_num].suspended = NULL;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(port_not_busy)) {
+ erts_snprintf(port_str, sizeof(port_str),
+ "%T", erts_port[port_num].id);
+ DTRACE1(port_not_busy, port_str);
+ }
+#endif
if (erts_port[port_num].dist_entry) {
/*
* Processes suspended on distribution ports are
@@ -2398,6 +2528,28 @@ set_busy_port(ErlDrvPort port_num, int on)
*/
if (plp) {
+#ifdef USE_VM_PROBES
+ /*
+ * Hrm, for blocked dist ports, plp always seems to be NULL.
+ * That's not so fun.
+ * Well, another way to get the same info is using a D
+ * script to correlate an earlier process-port_blocked+pid
+ * event with a later process-scheduled event. That's
+ * subject to the multi-CPU races with how events are
+ * handled, but hey, that way works most of the time.
+ */
+ if (DTRACE_ENABLED(process_port_unblocked)) {
+ DTRACE_CHARBUF(pid_str, 16);
+ ErtsProcList* plp2 = plp;
+
+ erts_snprintf(port_str, sizeof(port_str),
+ "%T", erts_port[port_num]);
+ while (plp2 != NULL) {
+ erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ DTRACE2(process_port_unblocked, pid_str, port_str);
+ }
+ }
+#endif
/* First proc should be resumed last */
if (plp->next) {
erts_resume_processes(plp->next);
@@ -2444,6 +2596,14 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
p->drv_ptr->name ? p->drv_ptr->name : "unknown");
p->caller = NIL;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_output)) {
+ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
+
+ dtrace_port_str(p, port_str);
+ DTRACE4(driver_output, "-raw-", port_str, p->name, len);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len);
erts_unblock_fpe(fpe_was_unmasked);
@@ -2459,6 +2619,12 @@ int async_ready(Port *p, void* data)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
ASSERT(!(p->status & ERTS_PORT_SFLGS_DEAD));
if (p->drv_ptr->ready_async != NULL) {
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_ready_async)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p)
+ DTRACE3(driver_ready_async, process_str, port_str, p->name);
+ }
+#endif
(*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
need_free = 0;
#ifdef ERTS_SMP
@@ -2653,7 +2819,11 @@ void driver_report_exit(int ix, int status)
hp += 3;
tuple = TUPLE2(hp, prt->id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
erts_smp_proc_unlock(rp, rp_locks);
erts_smp_proc_dec_refc(rp);
@@ -3203,7 +3373,11 @@ driver_deliver_term(ErlDrvPort port,
HRelease(rp, hp_end, hp);
}
/* send message */
- erts_queue_message(rp, &rp_locks, bp, mess, am_undefined);
+ erts_queue_message(rp, &rp_locks, bp, mess, am_undefined
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
}
else {
if (b2t.ix > b2t.used)
@@ -4434,6 +4608,12 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
ASSERT(callback != NULL);
ref_to_driver_monitor(ref,&drv_monitor);
DRV_MONITOR_UNLOCK_PDL(prt);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(driver_process_exit)) {
+ DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt)
+ DTRACE3(driver_process_exit, process_str, port_str, prt->name);
+ }
+#endif
fpe_was_unmasked = erts_block_fpe();
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
@@ -4877,6 +5057,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
else {
int res;
int fpe_was_unmasked = erts_block_fpe();
+ DTRACE4(driver_init, drv->name, drv->version.major, drv->version.minor,
+ drv->flags);
res = (*de->init)();
erts_unblock_fpe(fpe_was_unmasked);
return res;