diff options
author | Rickard Green <[email protected]> | 2012-11-29 01:24:43 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-07 00:24:27 +0100 |
commit | 9e4895da833b7777e69efc173f5dc777aaea3201 (patch) | |
tree | 02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator/beam/io.c | |
parent | 43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff) | |
download | otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2 otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip |
Add support for busy port message queue
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 163 |
1 files changed, 93 insertions, 70 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9024e9ab52..32f3a675fa 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -270,9 +270,10 @@ static Port *create_port(char *name, Eterm pid, int *enop) { + ErtsPortTaskBusyPortQ *busy_port_queue; Port *prt; char *p; - size_t port_size, size; + size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; #ifdef DEBUG @@ -288,7 +289,13 @@ static Port *create_port(char *name, } else #endif - port_size = size = sizeof(Port); + port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + + busy_port_queue_size + = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) + ? 0 + : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ))); + size += busy_port_queue_size; size += sys_strlen(name) + 1; @@ -302,6 +309,13 @@ static Port *create_port(char *name, prt = (Port *) p; p += port_size; + if (!busy_port_queue_size) + busy_port_queue = NULL; + else { + busy_port_queue = (ErtsPortTaskBusyPortQ *) p; + p += busy_port_queue_size; + } + #ifdef ERTS_SMP if (driver_lock) { prt->lock = driver_lock; @@ -320,6 +334,8 @@ static Port *create_port(char *name, prt->cleanup = 0; #endif + erts_port_task_pre_init_sched(&prt->sched, busy_port_queue); + prt->name = p; sys_strcpy(p, name); prt->drv_ptr = driver; @@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process) erts_aint32_t flags; erts_port_task_sched_lock(&prt->sched); flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - saved = ((flags & (ERTS_PTS_FLG_BUSY - | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY); + saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT); if (saved) erts_proclist_store_last(&prt->suspended, erts_proclist_create(process)); erts_port_task_sched_unlock(&prt->sched); @@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD; return erts_schedule_proc2port_signal(c_p, prt, @@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p, erts_driver_t *drv = prt->drv_ptr; size_t size; int try_call; - erts_aint32_t sched_flags, busy_flag, invalid_flags; + erts_aint32_t sched_flags, busy_flgs, invalid_flags; int task_flags; ErtsProc2PortSigCallback port_sig_callback; ErlDrvBinary *cbin = NULL; @@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p, | ERTS_PORT_SIG_FLG_NOSUSPEND | ERTS_PORT_SIG_FLG_FORCE)) == 0); - busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE) + busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE) ? ((erts_aint32_t) 0) - : ERTS_PTS_FLG_BUSY); - invalid_flags = busy_flag; + : ERTS_PTS_FLGS_BUSY); + invalid_flags = busy_flgs; if (!refp) invalid_flags |= ERTS_PTS_FLG_PARALLELISM; @@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p, */ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) - return ((sched_flags & busy_flag) - ? ERTS_PORT_OP_BUSY - : ERTS_PORT_OP_DROPPED); + if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT)) + return ((sched_flags & ERTS_PTS_FLG_EXIT) + ? ERTS_PORT_OP_DROPPED + : ERTS_PORT_OP_BUSY); try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)); @@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV; sigdp->u.outputv.from = from; sigdp->u.outputv.evp = evp; sigdp->u.outputv.cbinp = cbin; @@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT; sigdp->u.output.from = from; sigdp->u.output.bufp = buf; sigdp->u.output.size = size; @@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p, } task_flags = ERTS_PT_FLG_WAIT_BUSY; - sigdp->flags = flags; + sigdp->flags |= flags; if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) { task_flags = 0; if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE) @@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p, return res; } - if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY) + if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs)) return ERTS_PORT_OP_BUSY_SCHEDULED; return res; @@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags; sigdp->u.exit.from = from; if (is_immed(reason)) { @@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags; sigdp->u.connect.from = from; sigdp->u.connect.connected = connect_id; @@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK; sigdp->u.unlink.from = from; return erts_schedule_proc2port_signal(c_p, @@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_LINK; sigdp->u.link.port = prt->common.id; sigdp->u.link.to = to; @@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL; sigdp->u.control.binp = binp; sigdp->u.control.command = command; sigdp->u.control.bufp = bufp; @@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CALL; sigdp->u.call.command = command; sigdp->u.call.bufp = bufp; sigdp->u.call.size = size; @@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_INFO; sigdp->u.info.item = item; return erts_schedule_proc2port_signal(c_p, @@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA; sigdp->u.set_data.data = set_data; sigdp->u.set_data.bp = bp; @@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA; return erts_schedule_proc2port_signal(c_p, prt, @@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on) if (on) { flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags, - ERTS_PTS_FLG_BUSY); - if (flags & ERTS_PTS_FLG_BUSY) + ERTS_PTS_FLG_BUSY_PORT); + if (flags & ERTS_PTS_FLG_BUSY_PORT) return; /* Already busy */ if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS) @@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on) } #endif } else { - ErtsProcList *plp; - flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags, - ~ERTS_PTS_FLG_BUSY); - if (!(flags & ERTS_PTS_FLG_BUSY)) + ~ERTS_PTS_FLG_BUSY_PORT); + if (!(flags & ERTS_PTS_FLG_BUSY_PORT)) return; /* Already non-busy */ #ifdef USE_VM_PROBES @@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on) erts_dist_port_not_busy(prt); } - /* - * Resume, in a round-robin fashion, all processes waiting on the port. - * - * This version submitted by Tony Rogvall. The earlier version used - * to resume the processes in order, which caused starvation of all but - * the first process. - */ + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) + erts_port_resume_procs(prt); + } +} - erts_port_task_sched_lock(&prt->sched); - plp = prt->suspended; - prt->suspended = NULL; - erts_port_task_sched_unlock(&prt->sched); +void +erts_port_resume_procs(Port *prt) +{ + /* + * Resume, in a round-robin fashion, all processes waiting on the port. + * + * This version submitted by Tony Rogvall. The earlier version used + * to resume the processes in order, which caused starvation of all but + * the first process. + */ + ErtsProcList *plp; - if (erts_proclist_fetch(&plp, NULL)) { + erts_port_task_sched_lock(&prt->sched); + plp = prt->suspended; + prt->suspended = NULL; + erts_port_task_sched_unlock(&prt->sched); + + if (erts_proclist_fetch(&plp, NULL)) { #ifdef USE_VM_PROBES - /* - * Hrm, for blocked dist ports, plp always seems to be NULL. - * That's not so fun. - * Well, another way to get the same info is using a D - * script to correlate an earlier process-port_blocked+pid - * event with a later process-scheduled event. That's - * subject to the multi-CPU races with how events are - * handled, but hey, that way works most of the time. - */ - if (DTRACE_ENABLED(process_port_unblocked)) { - DTRACE_CHARBUF(pid_str, 16); - ErtsProcList* plp2 = plp; - - erts_snprintf(port_str, sizeof(port_str), - "%T", prt->common.id); - while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); - DTRACE2(process_port_unblocked, pid_str, port_str); - } - } -#endif + /* + * Hrm, for blocked dist ports, plp always seems to be NULL. + * That's not so fun. + * Well, another way to get the same info is using a D + * script to correlate an earlier process-port_blocked+pid + * event with a later process-scheduled event. That's + * subject to the multi-CPU races with how events are + * handled, but hey, that way works most of the time. + */ + if (DTRACE_ENABLED(process_port_unblocked)) { + DTRACE_CHARBUF(port_str, 16); + DTRACE_CHARBUF(pid_str, 16); + ErtsProcList* plp2 = plp; - /* First proc should be resumed last */ - if (plp->next) { - plp->next->prev = NULL; - erts_resume_processes(plp->next); - plp->next = NULL; + erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + while (plp2 != NULL) { + erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + DTRACE2(process_port_unblocked, pid_str, port_str); } - erts_resume_processes(plp); - } + } +#endif + + /* First proc should be resumed last */ + if (plp->next) { + plp->next->prev = NULL; + erts_resume_processes(plp->next); + plp->next = NULL; + } + erts_resume_processes(plp); } } |