aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2012-11-29 01:24:43 +0100
committerRickard Green <rickard@erlang.org>2012-12-07 00:24:27 +0100
commit9e4895da833b7777e69efc173f5dc777aaea3201 (patch)
tree02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator/beam/io.c
parent43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff)
downloadotp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz
otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2
otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip
Add support for busy port message queue
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c163
1 files changed, 93 insertions, 70 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9024e9ab52..32f3a675fa 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -270,9 +270,10 @@ static Port *create_port(char *name,
Eterm pid,
int *enop)
{
+ ErtsPortTaskBusyPortQ *busy_port_queue;
Port *prt;
char *p;
- size_t port_size, size;
+ size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
#ifdef DEBUG
@@ -288,7 +289,13 @@ static Port *create_port(char *name,
}
else
#endif
- port_size = size = sizeof(Port);
+ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+
+ busy_port_queue_size
+ = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
+ ? 0
+ : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)));
+ size += busy_port_queue_size;
size += sys_strlen(name) + 1;
@@ -302,6 +309,13 @@ static Port *create_port(char *name,
prt = (Port *) p;
p += port_size;
+ if (!busy_port_queue_size)
+ busy_port_queue = NULL;
+ else {
+ busy_port_queue = (ErtsPortTaskBusyPortQ *) p;
+ p += busy_port_queue_size;
+ }
+
#ifdef ERTS_SMP
if (driver_lock) {
prt->lock = driver_lock;
@@ -320,6 +334,8 @@ static Port *create_port(char *name,
prt->cleanup = 0;
#endif
+ erts_port_task_pre_init_sched(&prt->sched, busy_port_queue);
+
prt->name = p;
sys_strcpy(p, name);
prt->drv_ptr = driver;
@@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process)
erts_aint32_t flags;
erts_port_task_sched_lock(&prt->sched);
flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- saved = ((flags & (ERTS_PTS_FLG_BUSY
- | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY);
+ saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT);
if (saved)
erts_proclist_store_last(&prt->suspended, erts_proclist_create(process));
erts_port_task_sched_unlock(&prt->sched);
@@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p,
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
- erts_aint32_t sched_flags, busy_flag, invalid_flags;
+ erts_aint32_t sched_flags, busy_flgs, invalid_flags;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
ErlDrvBinary *cbin = NULL;
@@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p,
| ERTS_PORT_SIG_FLG_NOSUSPEND
| ERTS_PORT_SIG_FLG_FORCE)) == 0);
- busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE)
+ busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE)
? ((erts_aint32_t) 0)
- : ERTS_PTS_FLG_BUSY);
- invalid_flags = busy_flag;
+ : ERTS_PTS_FLGS_BUSY);
+ invalid_flags = busy_flgs;
if (!refp)
invalid_flags |= ERTS_PTS_FLG_PARALLELISM;
@@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p,
*/
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT))
- return ((sched_flags & busy_flag)
- ? ERTS_PORT_OP_BUSY
- : ERTS_PORT_OP_DROPPED);
+ if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
+ return ((sched_flags & ERTS_PTS_FLG_EXIT)
+ ? ERTS_PORT_OP_DROPPED
+ : ERTS_PORT_OP_BUSY);
try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP));
@@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
sigdp->u.outputv.cbinp = cbin;
@@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
sigdp->u.output.from = from;
sigdp->u.output.bufp = buf;
sigdp->u.output.size = size;
@@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p,
}
task_flags = ERTS_PT_FLG_WAIT_BUSY;
- sigdp->flags = flags;
+ sigdp->flags |= flags;
if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) {
task_flags = 0;
if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE)
@@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p,
return res;
}
- if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs))
return ERTS_PORT_OP_BUSY_SCHEDULED;
return res;
@@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags;
sigdp->u.exit.from = from;
if (is_immed(reason)) {
@@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags;
sigdp->u.connect.from = from;
sigdp->u.connect.connected = connect_id;
@@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK;
sigdp->u.unlink.from = from;
return erts_schedule_proc2port_signal(c_p,
@@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_LINK;
sigdp->u.link.port = prt->common.id;
sigdp->u.link.to = to;
@@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL;
sigdp->u.control.binp = binp;
sigdp->u.control.command = command;
sigdp->u.control.bufp = bufp;
@@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CALL;
sigdp->u.call.command = command;
sigdp->u.call.bufp = bufp;
sigdp->u.call.size = size;
@@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_INFO;
sigdp->u.info.item = item;
return erts_schedule_proc2port_signal(c_p,
@@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA;
sigdp->u.set_data.data = set_data;
sigdp->u.set_data.bp = bp;
@@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on)
if (on) {
flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
- ERTS_PTS_FLG_BUSY);
- if (flags & ERTS_PTS_FLG_BUSY)
+ ERTS_PTS_FLG_BUSY_PORT);
+ if (flags & ERTS_PTS_FLG_BUSY_PORT)
return; /* Already busy */
if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS)
@@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on)
}
#endif
} else {
- ErtsProcList *plp;
-
flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags,
- ~ERTS_PTS_FLG_BUSY);
- if (!(flags & ERTS_PTS_FLG_BUSY))
+ ~ERTS_PTS_FLG_BUSY_PORT);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT))
return; /* Already non-busy */
#ifdef USE_VM_PROBES
@@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on)
erts_dist_port_not_busy(prt);
}
- /*
- * Resume, in a round-robin fashion, all processes waiting on the port.
- *
- * This version submitted by Tony Rogvall. The earlier version used
- * to resume the processes in order, which caused starvation of all but
- * the first process.
- */
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ erts_port_resume_procs(prt);
+ }
+}
- erts_port_task_sched_lock(&prt->sched);
- plp = prt->suspended;
- prt->suspended = NULL;
- erts_port_task_sched_unlock(&prt->sched);
+void
+erts_port_resume_procs(Port *prt)
+{
+ /*
+ * Resume, in a round-robin fashion, all processes waiting on the port.
+ *
+ * This version submitted by Tony Rogvall. The earlier version used
+ * to resume the processes in order, which caused starvation of all but
+ * the first process.
+ */
+ ErtsProcList *plp;
- if (erts_proclist_fetch(&plp, NULL)) {
+ erts_port_task_sched_lock(&prt->sched);
+ plp = prt->suspended;
+ prt->suspended = NULL;
+ erts_port_task_sched_unlock(&prt->sched);
+
+ if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
- /*
- * Hrm, for blocked dist ports, plp always seems to be NULL.
- * That's not so fun.
- * Well, another way to get the same info is using a D
- * script to correlate an earlier process-port_blocked+pid
- * event with a later process-scheduled event. That's
- * subject to the multi-CPU races with how events are
- * handled, but hey, that way works most of the time.
- */
- if (DTRACE_ENABLED(process_port_unblocked)) {
- DTRACE_CHARBUF(pid_str, 16);
- ErtsProcList* plp2 = plp;
-
- erts_snprintf(port_str, sizeof(port_str),
- "%T", prt->common.id);
- while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
- DTRACE2(process_port_unblocked, pid_str, port_str);
- }
- }
-#endif
+ /*
+ * Hrm, for blocked dist ports, plp always seems to be NULL.
+ * That's not so fun.
+ * Well, another way to get the same info is using a D
+ * script to correlate an earlier process-port_blocked+pid
+ * event with a later process-scheduled event. That's
+ * subject to the multi-CPU races with how events are
+ * handled, but hey, that way works most of the time.
+ */
+ if (DTRACE_ENABLED(process_port_unblocked)) {
+ DTRACE_CHARBUF(port_str, 16);
+ DTRACE_CHARBUF(pid_str, 16);
+ ErtsProcList* plp2 = plp;
- /* First proc should be resumed last */
- if (plp->next) {
- plp->next->prev = NULL;
- erts_resume_processes(plp->next);
- plp->next = NULL;
+ erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id);
+ while (plp2 != NULL) {
+ erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ DTRACE2(process_port_unblocked, pid_str, port_str);
}
- erts_resume_processes(plp);
- }
+ }
+#endif
+
+ /* First proc should be resumed last */
+ if (plp->next) {
+ plp->next->prev = NULL;
+ erts_resume_processes(plp->next);
+ plp->next = NULL;
+ }
+ erts_resume_processes(plp);
}
}