aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-11-29 01:24:43 +0100
committerRickard Green <[email protected]>2012-12-07 00:24:27 +0100
commit9e4895da833b7777e69efc173f5dc777aaea3201 (patch)
tree02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator/beam/erl_port_task.c
parent43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff)
downloadotp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz
otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2
otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip
Add support for busy port message queue
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c431
1 files changed, 327 insertions, 104 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index b6b113b753..89f6a4e774 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -539,103 +539,257 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
}
}
+
/*
- * No-suspend handles.
+ * Busy port queue management
*/
-#ifdef ERTS_SMP
-static void
-free_port_task_handle_list(void *vpthlp)
+static erts_aint32_t
+check_unset_busy_port_q(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTaskBusyPortQ *bpq)
{
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
-}
-#endif
+ ErlDrvSizeT qsize, low;
+ int resume_procs = 0;
-static void
-schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
-{
-#ifdef ERTS_SMP
- erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
- (void *) pthlp,
- &pthlp->u.release);
-#else
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
-#endif
+ ASSERT(bpq);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+
+ erts_port_task_sched_lock(&pp->sched);
+ qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ if (qsize < low) {
+ erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
+ ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ erts_port_task_sched_unlock(&pp->sched);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+
+ return flags;
}
static ERTS_INLINE void
-abort_nosuspend_task(ErtsPortTaskType type,
- ErtsPortTaskTypeData *tdp)
+aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- if (type != ERTS_PORT_TASK_PROC_SIG)
- ERTS_INTERNAL_ERROR("Invalid no-suspend port task type");
+ ASSERT(pp->sched.taskq.bpq);
- tdp->psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
- &tdp->psig.data);
-}
+ if (size == 0)
+ return;
+ bpq = pp->sched.taskq.bpq;
-static void
-save_nosuspend_handle(Port *pp, ErtsPortTask *ptp)
-{
- erts_aint32_t act;
- ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
- sizeof(ErtsPortTaskHandleList));
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ ASSERT(pp->sched.taskq.bpq);
+ if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low))
+ erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+}
- set_handle(ptp, &pthlp->handle);
+static ERTS_INLINE void
+dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
+{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- ASSERT(ptp == handle2task(&pthlp->handle));
- ASSERT(ptp->u.alive.handle == &pthlp->handle);
+ ASSERT(pp->sched.taskq.bpq);
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (size == 0)
+ return;
- if (!(act & ERTS_PTS_FLG_BUSY)) {
+ bpq = pp->sched.taskq.bpq;
- erts_port_task_sched_lock(&pp->sched);
- pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
- pp->sched.taskq.local.busy.nosuspend = pthlp;
- erts_port_task_sched_unlock(&pp->sched);
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
+ check_unset_busy_port_q(pp, flags, bpq);
+}
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+static ERTS_INLINE erts_aint32_t
+enqueue_proc2port_data(Port *pp,
+ ErtsProc2PortSigData *sigdp,
+ erts_aint32_t flags)
+{
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ if (sigdp && bpq) {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ if (size) {
+ erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) size);
+ ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
+
+ ASSERT(qsz - size < qsz);
+
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
+ flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
+ qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
+ flags = (erts_smp_atomic32_read_bor_relb(
+ &pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
+ flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ }
+ ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
+ }
+ }
+ return flags;
+}
- while (1) {
- erts_aint32_t exp, new;
+/*
+ * erl_drv_busy_msgq_limits() is called by drivers either reading or
+ * writing the limits.
+ *
+ * A limit of zero is interpreted as a read only request (using a
+ * limit of zero would not be useful). Other values are interpreted
+ * as a write-read request.
+ */
- if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS))
- == ERTS_PTS_FLG_HAVE_NS_TASKS)
- return;
+void
+erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
+{
+ Port *pp = erts_drvport2port(dport, NULL);
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ int written = 0, resume_procs = 0;
+ ErlDrvSizeT low, high;
+
+ if (!pp || !bpq) {
+ if (lowp)
+ *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ if (highp)
+ *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ return;
+ }
- if (act & ERTS_PTS_FLG_BUSY) {
- erts_aint32_t s;
- s = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
- ERTS_PT_STATE_ABORTED,
- ERTS_PT_STATE_SCHEDULED);
- if (s == ERTS_PT_STATE_SCHEDULED) {
- reset_port_task_handle(&pthlp->handle);
- break; /* Abort task */
- }
- /* Else: someone else handled it */
- return;
- }
+ low = lowp ? *lowp : 0;
+ high = highp ? *highp : 0;
- new = exp = act;
+ erts_port_task_sched_lock(&pp->sched);
- new |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+ if (low == ERL_DRV_BUSY_MSGQ_DISABLED
+ || high == ERL_DRV_BUSY_MSGQ_DISABLED) {
+ /* Disable busy msgq feature */
+ erts_aint32_t flags;
+ pp->sched.taskq.bpq = NULL;
+ flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else {
- act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
- if (act == exp)
- return;
+ if (!low)
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ else {
+ if (bpq->high < low)
+ bpq->high = low;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ written = 1;
+ }
+
+ if (!high)
+ high = bpq->high;
+ else {
+ if (low > high) {
+ low = high;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ }
+ bpq->high = high;
+ written = 1;
+ }
+ if (written) {
+ ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ if (size > high)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ else if (size < low)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
}
+ erts_port_task_sched_unlock(&pp->sched);
- abort_nosuspend_task(ptp->type, &ptp->u.alive.td);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+ if (lowp)
+ *lowp = low;
+ if (highp)
+ *highp = high;
}
+/*
+ * No-suspend handles.
+ */
+
+#ifdef ERTS_SMP
+static void
+free_port_task_handle_list(void *vpthlp)
+{
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
+}
+#endif
+
+static void
+schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
+ (void *) pthlp,
+ &pthlp->u.release);
+#else
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
+#endif
+}
+
+static ERTS_INLINE void
+abort_nosuspend_task(Port *pp,
+ ErtsPortTaskType type,
+ ErtsPortTaskTypeData *tdp)
+{
+
+ ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
+
+ if (!pp->sched.taskq.bpq)
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ aborted_proc2port_data(pp, size);
+ }
+}
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
@@ -724,14 +878,29 @@ pop_port(ErtsRunQueue *runq)
* Task queue operations
*/
-static ERTS_INLINE erts_aint32_t
-enqueue_task(Port *pp, ErtsPortTask *ptp)
+static ERTS_INLINE int
+enqueue_task(Port *pp,
+ ErtsPortTask *ptp,
+ ErtsProc2PortSigData *sigdp,
+ ErtsPortTaskHandleList *ns_pthlp,
+ erts_aint32_t *flagsp)
+
{
+ int res;
+ erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
erts_aint32_t flags;
ptp->u.alive.next = NULL;
+ if (ns_pthlp)
+ fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
erts_port_task_sched_lock(&pp->sched);
flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!(flags & ERTS_PTS_FLG_EXIT)) {
+ if (flags & fail_flags)
+ res = 0;
+ else {
+ if (ns_pthlp) {
+ ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
+ }
if (pp->sched.taskq.in.last) {
ASSERT(pp->sched.taskq.in.first);
ASSERT(!pp->sched.taskq.in.last->u.alive.next);
@@ -744,9 +913,12 @@ enqueue_task(Port *pp, ErtsPortTask *ptp)
pp->sched.taskq.in.first = ptp;
}
pp->sched.taskq.in.last = ptp;
+ flags = enqueue_proc2port_data(pp, sigdp, flags);
+ res = 1;
}
erts_port_task_sched_unlock(&pp->sched);
- return flags;
+ *flagsp = flags;
+ return res;
}
static ERTS_INLINE void
@@ -754,7 +926,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) {
+ if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
}
@@ -799,10 +971,9 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
*execq = NULL;
- /* guess a likely value */
- act = ERTS_PTS_FLG_EXEC;
- if (execq)
- act |= ERTS_PTS_FLG_HAVE_TASKS;
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
while (1) {
erts_aint32_t new, exp;
@@ -829,9 +1000,12 @@ select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
+
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
- if (flags & ERTS_PTS_FLG_BUSY) {
+ if (flags & ERTS_PTS_FLG_BUSY_PORT) {
if (*processing_busy_q_p) {
ErtsPortTask *ptp;
@@ -880,7 +1054,7 @@ check_task_for_exec(Port *pp,
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
- if ((flags & ERTS_PTS_FLG_BUSY)
+ if ((flags & ERTS_PTS_FLG_BUSY_PORT)
&& (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
busy_wait_move_to_busy_queue(pp, ptp);
@@ -902,7 +1076,7 @@ check_task_for_exec(Port *pp,
else {
/* Processing busy queue */
- ASSERT(!(flags & ERTS_PTS_FLG_BUSY));
+ ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
@@ -1019,10 +1193,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
break;
case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT,
- &ptp->u.alive.td.psig.data);
+ ERTS_INTERNAL_ERROR("Aborted process to port signal");
break;
default:
break;
@@ -1042,14 +1213,15 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
+ erts_aint32_t flags;
ErtsPortTaskHandleList *abort_list;
#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
#endif
erts_port_task_sched_lock(&pp->sched);
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
- ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
abort_list = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = NULL;
erts_port_task_sched_unlock(&pp->sched);
@@ -1117,7 +1289,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp)
#endif
schedule_port_task_handle_list_free(pthlp);
- abort_nosuspend_task(type, &td);
+ abort_nosuspend_task(pp, type, &td);
}
}
@@ -1131,6 +1303,8 @@ erts_port_task_schedule(Eterm id,
ErtsPortTaskType type,
...)
{
+ ErtsProc2PortSigData *sigdp = NULL;
+ ErtsPortTaskHandleList *ns_pthlp = NULL;
#ifdef ERTS_SMP
ErtsRunQueue *xrunq;
ErtsThrPrgrDelayHandle dhndl;
@@ -1138,7 +1312,7 @@ erts_port_task_schedule(Eterm id,
ErtsRunQueue *runq;
Port *pp;
ErtsPortTask *ptp = NULL;
- erts_aint32_t act;
+ erts_aint32_t act, add_flags;
if (pthp && erts_port_task_is_scheduled(pthp)) {
ASSERT(0);
@@ -1195,7 +1369,6 @@ erts_port_task_schedule(Eterm id,
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
- ErtsProc2PortSigData *sigdp;
va_list argp;
ASSERT(!pthp);
va_start(argp, type);
@@ -1204,31 +1377,40 @@ erts_port_task_schedule(Eterm id,
ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
ptp->u.alive.flags |= va_arg(argp, int);
va_end(argp);
- if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)
- save_nosuspend_handle(pp, ptp);
- else
+ if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
set_handle(ptp, pthp);
+ else {
+ ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
+ sizeof(ErtsPortTaskHandleList));
+ set_handle(ptp, &ns_pthlp->handle);
+ }
break;
}
default:
break;
}
- act = enqueue_task(pp, ptp);
- if (act & ERTS_PTS_FLG_EXIT) {
+ if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
reset_handle(ptp);
- goto fail;
+ if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
+ goto abort_nosuspend;
+ else
+ goto fail;
}
+ add_flags = ERTS_PTS_FLG_HAVE_TASKS;
+ if (ns_pthlp)
+ add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+
while (1) {
erts_aint32_t new, exp;
- if ((act & ERTS_PTS_FLG_HAVE_TASKS)
+ if ((act & add_flags) == add_flags
&& (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
goto done; /* Done */
new = exp = act;
- new |= ERTS_PTS_FLG_HAVE_TASKS;
+ new |= add_flags;
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
new |= ERTS_PTS_FLG_IN_RUNQ;
@@ -1281,6 +1463,22 @@ done:
return 0;
+abort_nosuspend:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td);
+
+ ASSERT(ns_pthlp);
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+ if (ptp)
+ port_task_free(ptp);
+
+ return 0;
+
fail:
#ifdef ERTS_SMP
@@ -1288,6 +1486,9 @@ fail:
erts_port_dec_refc(pp);
#endif
+ if (ns_pthlp)
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+
if (ptp)
port_task_free(ptp);
@@ -1444,13 +1645,24 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ptp->u.alive.td.io.event_data);
io_tasks_executed++;
break;
- case ERTS_PORT_TASK_PROC_SIG:
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
- reds += ptp->u.alive.td.psig.callback(pp,
- state,
- ERTS_PROC2PORT_SIG_EXEC,
- &ptp->u.alive.td.psig.data);
+ if (!pp->sched.taskq.bpq)
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ dequeued_proc2port_data(pp, size);
+ }
break;
+ }
case ERTS_PORT_TASK_DIST_CMD:
reds += erts_dist_command(pp, CONTEXT_REDS-reds);
break;
@@ -1633,12 +1845,23 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
- case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_CLOSED,
- &ptp->u.alive.td.psig.data);
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
+ if (!pp->sched.taskq.bpq)
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ aborted_proc2port_data(pp, size);
+ }
break;
+ }
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",