diff options
author | Rickard Green <[email protected]> | 2012-11-29 01:24:43 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-07 00:24:27 +0100 |
commit | 9e4895da833b7777e69efc173f5dc777aaea3201 (patch) | |
tree | 02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator | |
parent | 43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff) | |
download | otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2 otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip |
Add support for busy port message queue
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/dist.c | 19 | ||||
-rw-r--r-- | erts/emulator/beam/erl_driver.h | 10 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port.h | 64 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 431 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.h | 46 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 163 | ||||
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 72 |
7 files changed, 610 insertions, 195 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index a939b90eac..fd5efd89f1 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2039,7 +2039,7 @@ erts_dist_command(Port *prt, int reds_limit) if (reds > reds_limit) goto preempted; - if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) { + if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { Uint size; @@ -2057,7 +2057,7 @@ erts_dist_command(Port *prt, int reds_limit) free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); - if (sched_flags & ERTS_PTS_FLG_BUSY) + if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); if (!foq.first) @@ -2066,7 +2066,7 @@ erts_dist_command(Port *prt, int reds_limit) goto preempted; } - if (sched_flags & ERTS_PTS_FLG_BUSY) { + if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; int preempt; @@ -2139,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit) free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); - if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt) + if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2168,7 +2168,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (!(sched_flags & ERTS_PTS_FLG_BUSY) + if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && (dep->qflgs & ERTS_DE_QFLG_BUSY) && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; @@ -2692,6 +2692,15 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits((ErlDrvPort) pp, &disable, NULL); + } + pp->dist_entry = dep; dep->version = version; diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index fb9e92e44b..cbc7bd4ede 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -157,6 +157,7 @@ typedef struct { #define ERL_DRV_FLAG_USE_PORT_LOCKING (1 << 0) #define ERL_DRV_FLAG_SOFT_BUSY (1 << 1) +#define ERL_DRV_FLAG_NO_BUSY_MSGQ (1 << 2) /* * Integer types @@ -384,9 +385,18 @@ typedef struct erl_drv_entry { ErlDrvEntry* driver_init(void) #endif +#define ERL_DRV_BUSY_MSGQ_DISABLED (~((ErlDrvSizeT) 0)) +#define ERL_DRV_BUSY_MSGQ_READ_ONLY ((ErlDrvSizeT) 0) +#define ERL_DRV_BUSY_MSGQ_LIM_MAX (ERL_DRV_BUSY_MSGQ_DISABLED - 1) +#define ERL_DRV_BUSY_MSGQ_LIM_MIN ((ErlDrvSizeT) 1) + /* * These are the functions available for driver writers. */ +EXTERN void erl_drv_busy_msgq_limits(ErlDrvPort port, + ErlDrvSizeT *low, + ErlDrvSizeT *high); + EXTERN int driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on); EXTERN int driver_event(ErlDrvPort port, ErlDrvEvent event, ErlDrvEventData event_data); diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 1eb6dea710..67c8c28f3f 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -761,15 +761,36 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep) #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ +void erts_port_resume_procs(Port *); + struct binary; -#define ERTS_P2P_SIG_DATA_FLG_BANG_OP (1 << 0) -#define ERTS_P2P_SIG_DATA_FLG_REPLY (1 << 1) -#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND (1 << 2) -#define ERTS_P2P_SIG_DATA_FLG_FORCE (1 << 3) -#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT (1 << 4) -#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK (1 << 5) -#define ERTS_P2P_SIG_DATA_FLG_SCHED (1 << 6) +#define ERTS_P2P_SIG_TYPE_BAD 0 +#define ERTS_P2P_SIG_TYPE_OUTPUT 1 +#define ERTS_P2P_SIG_TYPE_OUTPUTV 2 +#define ERTS_P2P_SIG_TYPE_CONNECT 3 +#define ERTS_P2P_SIG_TYPE_EXIT 4 +#define ERTS_P2P_SIG_TYPE_CONTROL 5 +#define ERTS_P2P_SIG_TYPE_CALL 6 +#define ERTS_P2P_SIG_TYPE_INFO 7 +#define ERTS_P2P_SIG_TYPE_LINK 8 +#define ERTS_P2P_SIG_TYPE_UNLINK 9 +#define ERTS_P2P_SIG_TYPE_SET_DATA 10 +#define ERTS_P2P_SIG_TYPE_GET_DATA 11 + +#define ERTS_P2P_SIG_TYPE_BITS 4 +#define ERTS_P2P_SIG_TYPE_MASK \ + ((1 << ERTS_P2P_SIG_TYPE_BITS) - 1) + +#define ERTS_P2P_SIG_DATA_FLG(N) \ + (1 << (ERTS_P2P_SIG_TYPE_BITS + (N))) +#define ERTS_P2P_SIG_DATA_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG(0) +#define ERTS_P2P_SIG_DATA_FLG_REPLY ERTS_P2P_SIG_DATA_FLG(1) +#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG(2) +#define ERTS_P2P_SIG_DATA_FLG_FORCE ERTS_P2P_SIG_DATA_FLG(3) +#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG(4) +#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG(5) +#define ERTS_P2P_SIG_DATA_FLG_SCHED ERTS_P2P_SIG_DATA_FLG(6) struct ErtsProc2PortSigData_ { int flags; @@ -823,6 +844,35 @@ struct ErtsProc2PortSigData_ { } u; } ; +ERTS_GLB_INLINE int +erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp); +ERTS_GLB_INLINE ErlDrvSizeT +erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE int +erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp) +{ + switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) { + case ERTS_P2P_SIG_TYPE_OUTPUT: return !0; + case ERTS_P2P_SIG_TYPE_OUTPUTV: return !0; + default: return 0; + } +} + +ERTS_GLB_INLINE ErlDrvSizeT +erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp) +{ + switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) { + case ERTS_P2P_SIG_TYPE_OUTPUT: return sigdp->u.output.size; + case ERTS_P2P_SIG_TYPE_OUTPUTV: return sigdp->u.outputv.evp->size; + default: return (ErlDrvSizeT) 0; + } +} + +#endif + #define ERTS_PROC2PORT_SIG_EXEC 0 #define ERTS_PROC2PORT_SIG_ABORT 1 #define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2 diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index b6b113b753..89f6a4e774 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -539,103 +539,257 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) } } + /* - * No-suspend handles. + * Busy port queue management */ -#ifdef ERTS_SMP -static void -free_port_task_handle_list(void *vpthlp) +static erts_aint32_t +check_unset_busy_port_q(Port *pp, + erts_aint32_t flags, + ErtsPortTaskBusyPortQ *bpq) { - erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); -} -#endif + ErlDrvSizeT qsize, low; + int resume_procs = 0; -static void -schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) -{ -#ifdef ERTS_SMP - erts_schedule_thr_prgr_later_op(free_port_task_handle_list, - (void *) pthlp, - &pthlp->u.release); -#else - erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); -#endif + ASSERT(bpq); + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + + erts_port_task_sched_lock(&pp->sched); + qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); + low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + if (qsize < low) { + erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q + | ERTS_PTS_FLG_BUSY_PORT_Q); + flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask); + if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) + resume_procs = 1; + } + else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) { + flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, + ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); + flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; + } + erts_port_task_sched_unlock(&pp->sched); + if (resume_procs) + erts_port_resume_procs(pp); + + return flags; } static ERTS_INLINE void -abort_nosuspend_task(ErtsPortTaskType type, - ErtsPortTaskTypeData *tdp) +aborted_proc2port_data(Port *pp, ErlDrvSizeT size) { + ErtsPortTaskBusyPortQ *bpq; + erts_aint32_t flags; + ErlDrvSizeT qsz; - if (type != ERTS_PORT_TASK_PROC_SIG) - ERTS_INTERNAL_ERROR("Invalid no-suspend port task type"); + ASSERT(pp->sched.taskq.bpq); - tdp->psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, - &tdp->psig.data); -} + if (size == 0) + return; + bpq = pp->sched.taskq.bpq; -static void -save_nosuspend_handle(Port *pp, ErtsPortTask *ptp) -{ - erts_aint32_t act; - ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, - sizeof(ErtsPortTaskHandleList)); + qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) -size); + ASSERT(qsz + size > qsz); + flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + ASSERT(pp->sched.taskq.bpq); + if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q + | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q) + return; + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) + erts_smp_atomic32_read_bor_nob(&pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); +} - set_handle(ptp, &pthlp->handle); +static ERTS_INLINE void +dequeued_proc2port_data(Port *pp, ErlDrvSizeT size) +{ + ErtsPortTaskBusyPortQ *bpq; + erts_aint32_t flags; + ErlDrvSizeT qsz; - ASSERT(ptp == handle2task(&pthlp->handle)); - ASSERT(ptp->u.alive.handle == &pthlp->handle); + ASSERT(pp->sched.taskq.bpq); - act = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (size == 0) + return; - if (!(act & ERTS_PTS_FLG_BUSY)) { + bpq = pp->sched.taskq.bpq; - erts_port_task_sched_lock(&pp->sched); - pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; - pp->sched.taskq.local.busy.nosuspend = pthlp; - erts_port_task_sched_unlock(&pp->sched); + qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) -size); + ASSERT(qsz + size > qsz); + flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) + return; + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low)) + check_unset_busy_port_q(pp, flags, bpq); +} - act = erts_smp_atomic32_read_nob(&pp->sched.flags); +static ERTS_INLINE erts_aint32_t +enqueue_proc2port_data(Port *pp, + ErtsProc2PortSigData *sigdp, + erts_aint32_t flags) +{ + ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; + if (sigdp && bpq) { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + if (size) { + erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) size); + ErlDrvSizeT qsz = (ErlDrvSizeT) asize; + + ASSERT(qsz - size < qsz); + + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) { + flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags, + ERTS_PTS_FLG_BUSY_PORT_Q); + flags |= ERTS_PTS_FLG_BUSY_PORT_Q; + qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size); + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) { + flags = (erts_smp_atomic32_read_bor_relb( + &pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)); + flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; + } + } + ASSERT(!(flags & ERTS_PTS_FLG_EXIT)); + } + } + return flags; +} - while (1) { - erts_aint32_t exp, new; +/* + * erl_drv_busy_msgq_limits() is called by drivers either reading or + * writing the limits. + * + * A limit of zero is interpreted as a read only request (using a + * limit of zero would not be useful). Other values are interpreted + * as a write-read request. + */ - if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS)) - == ERTS_PTS_FLG_HAVE_NS_TASKS) - return; +void +erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp) +{ + Port *pp = erts_drvport2port(dport, NULL); + ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; + int written = 0, resume_procs = 0; + ErlDrvSizeT low, high; + + if (!pp || !bpq) { + if (lowp) + *lowp = ERL_DRV_BUSY_MSGQ_DISABLED; + if (highp) + *highp = ERL_DRV_BUSY_MSGQ_DISABLED; + return; + } - if (act & ERTS_PTS_FLG_BUSY) { - erts_aint32_t s; - s = erts_smp_atomic32_cmpxchg_nob(&ptp->state, - ERTS_PT_STATE_ABORTED, - ERTS_PT_STATE_SCHEDULED); - if (s == ERTS_PT_STATE_SCHEDULED) { - reset_port_task_handle(&pthlp->handle); - break; /* Abort task */ - } - /* Else: someone else handled it */ - return; - } + low = lowp ? *lowp : 0; + high = highp ? *highp : 0; - new = exp = act; + erts_port_task_sched_lock(&pp->sched); - new |= ERTS_PTS_FLG_HAVE_NS_TASKS; + if (low == ERL_DRV_BUSY_MSGQ_DISABLED + || high == ERL_DRV_BUSY_MSGQ_DISABLED) { + /* Disable busy msgq feature */ + erts_aint32_t flags; + pp->sched.taskq.bpq = NULL; + flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); + flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags); + if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) + resume_procs = 1; + } + else { - act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); - if (act == exp) - return; + if (!low) + low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + else { + if (bpq->high < low) + bpq->high = low; + erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + written = 1; + } + + if (!high) + high = bpq->high; + else { + if (low > high) { + low = high; + erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + } + bpq->high = high; + written = 1; + } + if (written) { + ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); + if (size > high) + erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + ERTS_PTS_FLG_BUSY_PORT_Q); + else if (size < low) + erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } } + erts_port_task_sched_unlock(&pp->sched); - abort_nosuspend_task(ptp->type, &ptp->u.alive.td); + if (resume_procs) + erts_port_resume_procs(pp); + if (lowp) + *lowp = low; + if (highp) + *highp = high; } +/* + * No-suspend handles. + */ + +#ifdef ERTS_SMP +static void +free_port_task_handle_list(void *vpthlp) +{ + erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); +} +#endif + +static void +schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) +{ +#ifdef ERTS_SMP + erts_schedule_thr_prgr_later_op(free_port_task_handle_list, + (void *) pthlp, + &pthlp->u.release); +#else + erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); +#endif +} + +static ERTS_INLINE void +abort_nosuspend_task(Port *pp, + ErtsPortTaskType type, + ErtsPortTaskTypeData *tdp) +{ + + ASSERT(type == ERTS_PORT_TASK_PROC_SIG); + + if (!pp->sched.taskq.bpq) + tdp->psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + &tdp->psig.data); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data); + tdp->psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + &tdp->psig.data); + aborted_proc2port_data(pp, size); + } +} static ErtsPortTaskHandleList * get_free_nosuspend_handles(Port *pp) @@ -724,14 +878,29 @@ pop_port(ErtsRunQueue *runq) * Task queue operations */ -static ERTS_INLINE erts_aint32_t -enqueue_task(Port *pp, ErtsPortTask *ptp) +static ERTS_INLINE int +enqueue_task(Port *pp, + ErtsPortTask *ptp, + ErtsProc2PortSigData *sigdp, + ErtsPortTaskHandleList *ns_pthlp, + erts_aint32_t *flagsp) + { + int res; + erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT; erts_aint32_t flags; ptp->u.alive.next = NULL; + if (ns_pthlp) + fail_flags |= ERTS_PTS_FLG_BUSY_PORT; erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_nob(&pp->sched.flags); - if (!(flags & ERTS_PTS_FLG_EXIT)) { + if (flags & fail_flags) + res = 0; + else { + if (ns_pthlp) { + ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; + pp->sched.taskq.local.busy.nosuspend = ns_pthlp; + } if (pp->sched.taskq.in.last) { ASSERT(pp->sched.taskq.in.first); ASSERT(!pp->sched.taskq.in.last->u.alive.next); @@ -744,9 +913,12 @@ enqueue_task(Port *pp, ErtsPortTask *ptp) pp->sched.taskq.in.first = ptp; } pp->sched.taskq.in.last = ptp; + flags = enqueue_proc2port_data(pp, sigdp, flags); + res = 1; } erts_port_task_sched_unlock(&pp->sched); - return flags; + *flagsp = flags; + return res; } static ERTS_INLINE void @@ -754,7 +926,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags); - if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) { + if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) { *execqp = pp->sched.taskq.local.first; *processing_busy_q_p = 0; } @@ -799,10 +971,9 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) *execq = NULL; - /* guess a likely value */ - act = ERTS_PTS_FLG_EXEC; - if (execq) - act |= ERTS_PTS_FLG_HAVE_TASKS; + act = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) + act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq); while (1) { erts_aint32_t new, exp; @@ -829,9 +1000,12 @@ select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) + flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq); + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); - if (flags & ERTS_PTS_FLG_BUSY) { + if (flags & ERTS_PTS_FLG_BUSY_PORT) { if (*processing_busy_q_p) { ErtsPortTask *ptp; @@ -880,7 +1054,7 @@ check_task_for_exec(Port *pp, ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); - if ((flags & ERTS_PTS_FLG_BUSY) + if ((flags & ERTS_PTS_FLG_BUSY_PORT) && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) { busy_wait_move_to_busy_queue(pp, ptp); @@ -902,7 +1076,7 @@ check_task_for_exec(Port *pp, else { /* Processing busy queue */ - ASSERT(!(flags & ERTS_PTS_FLG_BUSY)); + ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT)); ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); @@ -1019,10 +1193,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); break; case ERTS_PORT_TASK_PROC_SIG: - ptp->u.alive.td.psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT, - &ptp->u.alive.td.psig.data); + ERTS_INTERNAL_ERROR("Aborted process to port signal"); break; default: break; @@ -1042,14 +1213,15 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) void erts_port_task_abort_nosuspend_tasks(Port *pp) { + erts_aint32_t flags; ErtsPortTaskHandleList *abort_list; #ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID; #endif erts_port_task_sched_lock(&pp->sched); - erts_smp_atomic32_read_band_nob(&pp->sched.flags, - ~ERTS_PTS_FLG_HAVE_NS_TASKS); + flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_NS_TASKS); abort_list = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = NULL; erts_port_task_sched_unlock(&pp->sched); @@ -1117,7 +1289,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp) #endif schedule_port_task_handle_list_free(pthlp); - abort_nosuspend_task(type, &td); + abort_nosuspend_task(pp, type, &td); } } @@ -1131,6 +1303,8 @@ erts_port_task_schedule(Eterm id, ErtsPortTaskType type, ...) { + ErtsProc2PortSigData *sigdp = NULL; + ErtsPortTaskHandleList *ns_pthlp = NULL; #ifdef ERTS_SMP ErtsRunQueue *xrunq; ErtsThrPrgrDelayHandle dhndl; @@ -1138,7 +1312,7 @@ erts_port_task_schedule(Eterm id, ErtsRunQueue *runq; Port *pp; ErtsPortTask *ptp = NULL; - erts_aint32_t act; + erts_aint32_t act, add_flags; if (pthp && erts_port_task_is_scheduled(pthp)) { ASSERT(0); @@ -1195,7 +1369,6 @@ erts_port_task_schedule(Eterm id, break; } case ERTS_PORT_TASK_PROC_SIG: { - ErtsProc2PortSigData *sigdp; va_list argp; ASSERT(!pthp); va_start(argp, type); @@ -1204,31 +1377,40 @@ erts_port_task_schedule(Eterm id, ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback); ptp->u.alive.flags |= va_arg(argp, int); va_end(argp); - if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND) - save_nosuspend_handle(pp, ptp); - else + if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)) set_handle(ptp, pthp); + else { + ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, + sizeof(ErtsPortTaskHandleList)); + set_handle(ptp, &ns_pthlp->handle); + } break; } default: break; } - act = enqueue_task(pp, ptp); - if (act & ERTS_PTS_FLG_EXIT) { + if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) { reset_handle(ptp); - goto fail; + if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT)) + goto abort_nosuspend; + else + goto fail; } + add_flags = ERTS_PTS_FLG_HAVE_TASKS; + if (ns_pthlp) + add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS; + while (1) { erts_aint32_t new, exp; - if ((act & ERTS_PTS_FLG_HAVE_TASKS) + if ((act & add_flags) == add_flags && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) goto done; /* Done */ new = exp = act; - new |= ERTS_PTS_FLG_HAVE_TASKS; + new |= add_flags; if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) new |= ERTS_PTS_FLG_IN_RUNQ; @@ -1281,6 +1463,22 @@ done: return 0; +abort_nosuspend: + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(pp); +#endif + + abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td); + + ASSERT(ns_pthlp); + erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); + if (ptp) + port_task_free(ptp); + + return 0; + fail: #ifdef ERTS_SMP @@ -1288,6 +1486,9 @@ fail: erts_port_dec_refc(pp); #endif + if (ns_pthlp) + erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); + if (ptp) port_task_free(ptp); @@ -1444,13 +1645,24 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ptp->u.alive.td.io.event_data); io_tasks_executed++; break; - case ERTS_PORT_TASK_PROC_SIG: + case ERTS_PORT_TASK_PROC_SIG: { + ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); - reds += ptp->u.alive.td.psig.callback(pp, - state, - ERTS_PROC2PORT_SIG_EXEC, - &ptp->u.alive.td.psig.data); + if (!pp->sched.taskq.bpq) + reds += ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + reds += ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); + dequeued_proc2port_data(pp, size); + } break; + } case ERTS_PORT_TASK_DIST_CMD: reds += erts_dist_command(pp, CONTEXT_REDS-reds); break; @@ -1633,12 +1845,23 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp) break; case ERTS_PORT_TASK_DIST_CMD: break; - case ERTS_PORT_TASK_PROC_SIG: - ptp->u.alive.td.psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_CLOSED, - &ptp->u.alive.td.psig.data); + case ERTS_PORT_TASK_PROC_SIG: { + ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; + if (!pp->sched.taskq.bpq) + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_CLOSED, + sigdp); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_CLOSED, + sigdp); + aborted_proc2port_data(pp, size); + } break; + } default: erl_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index c41f8104c7..9976604042 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -66,16 +66,20 @@ typedef enum { extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks; #endif -#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0) -#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) -#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) -#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3) -#define ERTS_PTS_FLG_BUSY (((erts_aint32_t) 1) << 4) -#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 5) -#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 6) -#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 7) -#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 8) +#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0) +#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) +#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) +#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3) +#define ERTS_PTS_FLG_BUSY_PORT (((erts_aint32_t) 1) << 4) +#define ERTS_PTS_FLG_BUSY_PORT_Q (((erts_aint32_t) 1) << 5) +#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q (((erts_aint32_t) 1) << 6) +#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 7) +#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 8) +#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 9) +#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 10) +#define ERTS_PTS_FLGS_BUSY \ + (ERTS_PTS_FLG_BUSY_PORT | ERTS_PTS_FLG_BUSY_PORT_Q) #define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP \ (ERTS_PTS_FLG_EXIT \ @@ -84,6 +88,15 @@ extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks; | ERTS_PTS_FLG_EXEC \ | ERTS_PTS_FLG_FORCE_SCHED) +#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH 8192 +#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW 4096 + +typedef struct { + ErlDrvSizeT high; + erts_smp_atomic_t low; + erts_smp_atomic_t size; +} ErtsPortTaskBusyPortQ; + typedef struct ErtsPortTask_ ErtsPortTask; typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable; typedef struct ErtsPortTaskHandleList_ ErtsPortTaskHandleList; @@ -104,6 +117,7 @@ typedef struct { ErtsPortTask *first; ErtsPortTask *last; } in; + ErtsPortTaskBusyPortQ *bpq; } taskq; erts_smp_atomic32_t flags; #ifdef ERTS_SMP @@ -113,6 +127,8 @@ typedef struct { ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp); ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp); +ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp, + ErtsPortTaskBusyPortQ *bpq); ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm id); ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp); @@ -138,6 +154,18 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp) return ((void *) erts_smp_atomic_read_nob(pthp)) != NULL; } +ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp, + ErtsPortTaskBusyPortQ *bpq) +{ + if (bpq) { + erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW; + erts_smp_atomic_init_nob(&bpq->low, low); + bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH; + erts_smp_atomic_init_nob(&bpq->size, (erts_aint_t) 0); + } + ptsp->taskq.bpq = bpq; +} + ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id) { diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9024e9ab52..32f3a675fa 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -270,9 +270,10 @@ static Port *create_port(char *name, Eterm pid, int *enop) { + ErtsPortTaskBusyPortQ *busy_port_queue; Port *prt; char *p; - size_t port_size, size; + size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; #ifdef DEBUG @@ -288,7 +289,13 @@ static Port *create_port(char *name, } else #endif - port_size = size = sizeof(Port); + port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + + busy_port_queue_size + = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) + ? 0 + : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ))); + size += busy_port_queue_size; size += sys_strlen(name) + 1; @@ -302,6 +309,13 @@ static Port *create_port(char *name, prt = (Port *) p; p += port_size; + if (!busy_port_queue_size) + busy_port_queue = NULL; + else { + busy_port_queue = (ErtsPortTaskBusyPortQ *) p; + p += busy_port_queue_size; + } + #ifdef ERTS_SMP if (driver_lock) { prt->lock = driver_lock; @@ -320,6 +334,8 @@ static Port *create_port(char *name, prt->cleanup = 0; #endif + erts_port_task_pre_init_sched(&prt->sched, busy_port_queue); + prt->name = p; sys_strcpy(p, name); prt->drv_ptr = driver; @@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process) erts_aint32_t flags; erts_port_task_sched_lock(&prt->sched); flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - saved = ((flags & (ERTS_PTS_FLG_BUSY - | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY); + saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT); if (saved) erts_proclist_store_last(&prt->suspended, erts_proclist_create(process)); erts_port_task_sched_unlock(&prt->sched); @@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD; return erts_schedule_proc2port_signal(c_p, prt, @@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p, erts_driver_t *drv = prt->drv_ptr; size_t size; int try_call; - erts_aint32_t sched_flags, busy_flag, invalid_flags; + erts_aint32_t sched_flags, busy_flgs, invalid_flags; int task_flags; ErtsProc2PortSigCallback port_sig_callback; ErlDrvBinary *cbin = NULL; @@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p, | ERTS_PORT_SIG_FLG_NOSUSPEND | ERTS_PORT_SIG_FLG_FORCE)) == 0); - busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE) + busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE) ? ((erts_aint32_t) 0) - : ERTS_PTS_FLG_BUSY); - invalid_flags = busy_flag; + : ERTS_PTS_FLGS_BUSY); + invalid_flags = busy_flgs; if (!refp) invalid_flags |= ERTS_PTS_FLG_PARALLELISM; @@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p, */ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) - return ((sched_flags & busy_flag) - ? ERTS_PORT_OP_BUSY - : ERTS_PORT_OP_DROPPED); + if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT)) + return ((sched_flags & ERTS_PTS_FLG_EXIT) + ? ERTS_PORT_OP_DROPPED + : ERTS_PORT_OP_BUSY); try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)); @@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV; sigdp->u.outputv.from = from; sigdp->u.outputv.evp = evp; sigdp->u.outputv.cbinp = cbin; @@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT; sigdp->u.output.from = from; sigdp->u.output.bufp = buf; sigdp->u.output.size = size; @@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p, } task_flags = ERTS_PT_FLG_WAIT_BUSY; - sigdp->flags = flags; + sigdp->flags |= flags; if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) { task_flags = 0; if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE) @@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p, return res; } - if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY) + if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs)) return ERTS_PORT_OP_BUSY_SCHEDULED; return res; @@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags; sigdp->u.exit.from = from; if (is_immed(reason)) { @@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags; sigdp->u.connect.from = from; sigdp->u.connect.connected = connect_id; @@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK; sigdp->u.unlink.from = from; return erts_schedule_proc2port_signal(c_p, @@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_LINK; sigdp->u.link.port = prt->common.id; sigdp->u.link.to = to; @@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL; sigdp->u.control.binp = binp; sigdp->u.control.command = command; sigdp->u.control.bufp = bufp; @@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CALL; sigdp->u.call.command = command; sigdp->u.call.bufp = bufp; sigdp->u.call.size = size; @@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_INFO; sigdp->u.info.item = item; return erts_schedule_proc2port_signal(c_p, @@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA; sigdp->u.set_data.data = set_data; sigdp->u.set_data.bp = bp; @@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA; return erts_schedule_proc2port_signal(c_p, prt, @@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on) if (on) { flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags, - ERTS_PTS_FLG_BUSY); - if (flags & ERTS_PTS_FLG_BUSY) + ERTS_PTS_FLG_BUSY_PORT); + if (flags & ERTS_PTS_FLG_BUSY_PORT) return; /* Already busy */ if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS) @@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on) } #endif } else { - ErtsProcList *plp; - flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags, - ~ERTS_PTS_FLG_BUSY); - if (!(flags & ERTS_PTS_FLG_BUSY)) + ~ERTS_PTS_FLG_BUSY_PORT); + if (!(flags & ERTS_PTS_FLG_BUSY_PORT)) return; /* Already non-busy */ #ifdef USE_VM_PROBES @@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on) erts_dist_port_not_busy(prt); } - /* - * Resume, in a round-robin fashion, all processes waiting on the port. - * - * This version submitted by Tony Rogvall. The earlier version used - * to resume the processes in order, which caused starvation of all but - * the first process. - */ + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) + erts_port_resume_procs(prt); + } +} - erts_port_task_sched_lock(&prt->sched); - plp = prt->suspended; - prt->suspended = NULL; - erts_port_task_sched_unlock(&prt->sched); +void +erts_port_resume_procs(Port *prt) +{ + /* + * Resume, in a round-robin fashion, all processes waiting on the port. + * + * This version submitted by Tony Rogvall. The earlier version used + * to resume the processes in order, which caused starvation of all but + * the first process. + */ + ErtsProcList *plp; - if (erts_proclist_fetch(&plp, NULL)) { + erts_port_task_sched_lock(&prt->sched); + plp = prt->suspended; + prt->suspended = NULL; + erts_port_task_sched_unlock(&prt->sched); + + if (erts_proclist_fetch(&plp, NULL)) { #ifdef USE_VM_PROBES - /* - * Hrm, for blocked dist ports, plp always seems to be NULL. - * That's not so fun. - * Well, another way to get the same info is using a D - * script to correlate an earlier process-port_blocked+pid - * event with a later process-scheduled event. That's - * subject to the multi-CPU races with how events are - * handled, but hey, that way works most of the time. - */ - if (DTRACE_ENABLED(process_port_unblocked)) { - DTRACE_CHARBUF(pid_str, 16); - ErtsProcList* plp2 = plp; - - erts_snprintf(port_str, sizeof(port_str), - "%T", prt->common.id); - while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); - DTRACE2(process_port_unblocked, pid_str, port_str); - } - } -#endif + /* + * Hrm, for blocked dist ports, plp always seems to be NULL. + * That's not so fun. + * Well, another way to get the same info is using a D + * script to correlate an earlier process-port_blocked+pid + * event with a later process-scheduled event. That's + * subject to the multi-CPU races with how events are + * handled, but hey, that way works most of the time. + */ + if (DTRACE_ENABLED(process_port_unblocked)) { + DTRACE_CHARBUF(port_str, 16); + DTRACE_CHARBUF(pid_str, 16); + ErtsProcList* plp2 = plp; - /* First proc should be resumed last */ - if (plp->next) { - plp->next->prev = NULL; - erts_resume_processes(plp->next); - plp->next = NULL; + erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + while (plp2 != NULL) { + erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + DTRACE2(process_port_unblocked, pid_str, port_str); } - erts_resume_processes(plp); - } + } +#endif + + /* First proc should be resumed last */ + if (plp->next) { + plp->next->prev = NULL; + erts_resume_processes(plp->next); + plp->next = NULL; + } + erts_resume_processes(plp); } } diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 4a87b0a385..f68ce68407 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -692,6 +692,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */ #define INET_OPT_RAW 34 /* Raw socket options */ #define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */ +#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */ +#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -808,6 +810,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */ #define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */ +#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */ +#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */ #define INET_INFINITY 0xffffffff /* infinity value */ @@ -5537,6 +5541,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + high = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + low = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { tcp_descriptor* tdesc = (tcp_descriptor*) desc; @@ -6422,6 +6448,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + ival = high > INT_MAX ? INT_MAX : (int) high; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + ival = low > INT_MAX ? INT_MAX : (int) low; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { *ptr++ = opt; @@ -8029,6 +8081,7 @@ static int tcp_inet_init(void) static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) { + ErlDrvSizeT q_low, q_high; tcp_descriptor* desc; DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port)); @@ -8038,6 +8091,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) return ERL_DRV_ERROR_ERRNO; desc->high = INET_HIGH_WATERMARK; desc->low = INET_LOW_WATERMARK; + q_high = INET_HIGH_MSGQ_WATERMARK; + q_low = INET_LOW_MSGQ_WATERMARK; + if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX; + if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX; + erl_drv_busy_msgq_limits(port, &q_low, &q_high); desc->send_timeout = INET_INFINITY; desc->send_timeout_close = 0; desc->busy_on_send = 0; @@ -8061,6 +8125,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, ErlDrvTermData owner, int* err) { + ErlDrvSizeT q_low, q_high; ErlDrvPort port = desc->inet.port; tcp_descriptor* copy_desc; @@ -8099,6 +8164,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, FREE(copy_desc); return NULL; } + + /* Read busy msgq limits of parent */ + q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high); + /* Write same busy msgq limits to child */ + erl_drv_busy_msgq_limits(port, &q_low, &q_high); + copy_desc->inet.port = port; copy_desc->inet.dport = driver_mk_port(port); *err = 0; |