From 9e4895da833b7777e69efc173f5dc777aaea3201 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 29 Nov 2012 01:24:43 +0100 Subject: Add support for busy port message queue --- erts/emulator/beam/erl_port_task.c | 431 ++++++++++++++++++++++++++++--------- 1 file changed, 327 insertions(+), 104 deletions(-) (limited to 'erts/emulator/beam/erl_port_task.c') diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index b6b113b753..89f6a4e774 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -539,103 +539,257 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) } } + /* - * No-suspend handles. + * Busy port queue management */ -#ifdef ERTS_SMP -static void -free_port_task_handle_list(void *vpthlp) +static erts_aint32_t +check_unset_busy_port_q(Port *pp, + erts_aint32_t flags, + ErtsPortTaskBusyPortQ *bpq) { - erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); -} -#endif + ErlDrvSizeT qsize, low; + int resume_procs = 0; -static void -schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) -{ -#ifdef ERTS_SMP - erts_schedule_thr_prgr_later_op(free_port_task_handle_list, - (void *) pthlp, - &pthlp->u.release); -#else - erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); -#endif + ASSERT(bpq); + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + + erts_port_task_sched_lock(&pp->sched); + qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); + low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + if (qsize < low) { + erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q + | ERTS_PTS_FLG_BUSY_PORT_Q); + flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask); + if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) + resume_procs = 1; + } + else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) { + flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, + ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); + flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; + } + erts_port_task_sched_unlock(&pp->sched); + if (resume_procs) + erts_port_resume_procs(pp); + + return flags; } static ERTS_INLINE void -abort_nosuspend_task(ErtsPortTaskType type, - ErtsPortTaskTypeData *tdp) +aborted_proc2port_data(Port *pp, ErlDrvSizeT size) { + ErtsPortTaskBusyPortQ *bpq; + erts_aint32_t flags; + ErlDrvSizeT qsz; - if (type != ERTS_PORT_TASK_PROC_SIG) - ERTS_INTERNAL_ERROR("Invalid no-suspend port task type"); + ASSERT(pp->sched.taskq.bpq); - tdp->psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, - &tdp->psig.data); -} + if (size == 0) + return; + bpq = pp->sched.taskq.bpq; -static void -save_nosuspend_handle(Port *pp, ErtsPortTask *ptp) -{ - erts_aint32_t act; - ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, - sizeof(ErtsPortTaskHandleList)); + qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) -size); + ASSERT(qsz + size > qsz); + flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + ASSERT(pp->sched.taskq.bpq); + if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q + | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q) + return; + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) + erts_smp_atomic32_read_bor_nob(&pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); +} - set_handle(ptp, &pthlp->handle); +static ERTS_INLINE void +dequeued_proc2port_data(Port *pp, ErlDrvSizeT size) +{ + ErtsPortTaskBusyPortQ *bpq; + erts_aint32_t flags; + ErlDrvSizeT qsz; - ASSERT(ptp == handle2task(&pthlp->handle)); - ASSERT(ptp->u.alive.handle == &pthlp->handle); + ASSERT(pp->sched.taskq.bpq); - act = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (size == 0) + return; - if (!(act & ERTS_PTS_FLG_BUSY)) { + bpq = pp->sched.taskq.bpq; - erts_port_task_sched_lock(&pp->sched); - pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; - pp->sched.taskq.local.busy.nosuspend = pthlp; - erts_port_task_sched_unlock(&pp->sched); + qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) -size); + ASSERT(qsz + size > qsz); + flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) + return; + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low)) + check_unset_busy_port_q(pp, flags, bpq); +} - act = erts_smp_atomic32_read_nob(&pp->sched.flags); +static ERTS_INLINE erts_aint32_t +enqueue_proc2port_data(Port *pp, + ErtsProc2PortSigData *sigdp, + erts_aint32_t flags) +{ + ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; + if (sigdp && bpq) { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + if (size) { + erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size, + (erts_aint_t) size); + ErlDrvSizeT qsz = (ErlDrvSizeT) asize; + + ASSERT(qsz - size < qsz); + + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) { + flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags, + ERTS_PTS_FLG_BUSY_PORT_Q); + flags |= ERTS_PTS_FLG_BUSY_PORT_Q; + qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size); + if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) { + flags = (erts_smp_atomic32_read_bor_relb( + &pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)); + flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; + } + } + ASSERT(!(flags & ERTS_PTS_FLG_EXIT)); + } + } + return flags; +} - while (1) { - erts_aint32_t exp, new; +/* + * erl_drv_busy_msgq_limits() is called by drivers either reading or + * writing the limits. + * + * A limit of zero is interpreted as a read only request (using a + * limit of zero would not be useful). Other values are interpreted + * as a write-read request. + */ - if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS)) - == ERTS_PTS_FLG_HAVE_NS_TASKS) - return; +void +erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp) +{ + Port *pp = erts_drvport2port(dport, NULL); + ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; + int written = 0, resume_procs = 0; + ErlDrvSizeT low, high; + + if (!pp || !bpq) { + if (lowp) + *lowp = ERL_DRV_BUSY_MSGQ_DISABLED; + if (highp) + *highp = ERL_DRV_BUSY_MSGQ_DISABLED; + return; + } - if (act & ERTS_PTS_FLG_BUSY) { - erts_aint32_t s; - s = erts_smp_atomic32_cmpxchg_nob(&ptp->state, - ERTS_PT_STATE_ABORTED, - ERTS_PT_STATE_SCHEDULED); - if (s == ERTS_PT_STATE_SCHEDULED) { - reset_port_task_handle(&pthlp->handle); - break; /* Abort task */ - } - /* Else: someone else handled it */ - return; - } + low = lowp ? *lowp : 0; + high = highp ? *highp : 0; - new = exp = act; + erts_port_task_sched_lock(&pp->sched); - new |= ERTS_PTS_FLG_HAVE_NS_TASKS; + if (low == ERL_DRV_BUSY_MSGQ_DISABLED + || high == ERL_DRV_BUSY_MSGQ_DISABLED) { + /* Disable busy msgq feature */ + erts_aint32_t flags; + pp->sched.taskq.bpq = NULL; + flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); + flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags); + if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) + resume_procs = 1; + } + else { - act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); - if (act == exp) - return; + if (!low) + low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + else { + if (bpq->high < low) + bpq->high = low; + erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + written = 1; + } + + if (!high) + high = bpq->high; + else { + if (low > high) { + low = high; + erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + } + bpq->high = high; + written = 1; + } + if (written) { + ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); + if (size > high) + erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + ERTS_PTS_FLG_BUSY_PORT_Q); + else if (size < low) + erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } } + erts_port_task_sched_unlock(&pp->sched); - abort_nosuspend_task(ptp->type, &ptp->u.alive.td); + if (resume_procs) + erts_port_resume_procs(pp); + if (lowp) + *lowp = low; + if (highp) + *highp = high; } +/* + * No-suspend handles. + */ + +#ifdef ERTS_SMP +static void +free_port_task_handle_list(void *vpthlp) +{ + erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); +} +#endif + +static void +schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) +{ +#ifdef ERTS_SMP + erts_schedule_thr_prgr_later_op(free_port_task_handle_list, + (void *) pthlp, + &pthlp->u.release); +#else + erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); +#endif +} + +static ERTS_INLINE void +abort_nosuspend_task(Port *pp, + ErtsPortTaskType type, + ErtsPortTaskTypeData *tdp) +{ + + ASSERT(type == ERTS_PORT_TASK_PROC_SIG); + + if (!pp->sched.taskq.bpq) + tdp->psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + &tdp->psig.data); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data); + tdp->psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + &tdp->psig.data); + aborted_proc2port_data(pp, size); + } +} static ErtsPortTaskHandleList * get_free_nosuspend_handles(Port *pp) @@ -724,14 +878,29 @@ pop_port(ErtsRunQueue *runq) * Task queue operations */ -static ERTS_INLINE erts_aint32_t -enqueue_task(Port *pp, ErtsPortTask *ptp) +static ERTS_INLINE int +enqueue_task(Port *pp, + ErtsPortTask *ptp, + ErtsProc2PortSigData *sigdp, + ErtsPortTaskHandleList *ns_pthlp, + erts_aint32_t *flagsp) + { + int res; + erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT; erts_aint32_t flags; ptp->u.alive.next = NULL; + if (ns_pthlp) + fail_flags |= ERTS_PTS_FLG_BUSY_PORT; erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_nob(&pp->sched.flags); - if (!(flags & ERTS_PTS_FLG_EXIT)) { + if (flags & fail_flags) + res = 0; + else { + if (ns_pthlp) { + ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; + pp->sched.taskq.local.busy.nosuspend = ns_pthlp; + } if (pp->sched.taskq.in.last) { ASSERT(pp->sched.taskq.in.first); ASSERT(!pp->sched.taskq.in.last->u.alive.next); @@ -744,9 +913,12 @@ enqueue_task(Port *pp, ErtsPortTask *ptp) pp->sched.taskq.in.first = ptp; } pp->sched.taskq.in.last = ptp; + flags = enqueue_proc2port_data(pp, sigdp, flags); + res = 1; } erts_port_task_sched_unlock(&pp->sched); - return flags; + *flagsp = flags; + return res; } static ERTS_INLINE void @@ -754,7 +926,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags); - if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) { + if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) { *execqp = pp->sched.taskq.local.first; *processing_busy_q_p = 0; } @@ -799,10 +971,9 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) *execq = NULL; - /* guess a likely value */ - act = ERTS_PTS_FLG_EXEC; - if (execq) - act |= ERTS_PTS_FLG_HAVE_TASKS; + act = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) + act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq); while (1) { erts_aint32_t new, exp; @@ -829,9 +1000,12 @@ select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) + flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq); + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); - if (flags & ERTS_PTS_FLG_BUSY) { + if (flags & ERTS_PTS_FLG_BUSY_PORT) { if (*processing_busy_q_p) { ErtsPortTask *ptp; @@ -880,7 +1054,7 @@ check_task_for_exec(Port *pp, ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); - if ((flags & ERTS_PTS_FLG_BUSY) + if ((flags & ERTS_PTS_FLG_BUSY_PORT) && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) { busy_wait_move_to_busy_queue(pp, ptp); @@ -902,7 +1076,7 @@ check_task_for_exec(Port *pp, else { /* Processing busy queue */ - ASSERT(!(flags & ERTS_PTS_FLG_BUSY)); + ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT)); ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); @@ -1019,10 +1193,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); break; case ERTS_PORT_TASK_PROC_SIG: - ptp->u.alive.td.psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT, - &ptp->u.alive.td.psig.data); + ERTS_INTERNAL_ERROR("Aborted process to port signal"); break; default: break; @@ -1042,14 +1213,15 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) void erts_port_task_abort_nosuspend_tasks(Port *pp) { + erts_aint32_t flags; ErtsPortTaskHandleList *abort_list; #ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID; #endif erts_port_task_sched_lock(&pp->sched); - erts_smp_atomic32_read_band_nob(&pp->sched.flags, - ~ERTS_PTS_FLG_HAVE_NS_TASKS); + flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_NS_TASKS); abort_list = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = NULL; erts_port_task_sched_unlock(&pp->sched); @@ -1117,7 +1289,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp) #endif schedule_port_task_handle_list_free(pthlp); - abort_nosuspend_task(type, &td); + abort_nosuspend_task(pp, type, &td); } } @@ -1131,6 +1303,8 @@ erts_port_task_schedule(Eterm id, ErtsPortTaskType type, ...) { + ErtsProc2PortSigData *sigdp = NULL; + ErtsPortTaskHandleList *ns_pthlp = NULL; #ifdef ERTS_SMP ErtsRunQueue *xrunq; ErtsThrPrgrDelayHandle dhndl; @@ -1138,7 +1312,7 @@ erts_port_task_schedule(Eterm id, ErtsRunQueue *runq; Port *pp; ErtsPortTask *ptp = NULL; - erts_aint32_t act; + erts_aint32_t act, add_flags; if (pthp && erts_port_task_is_scheduled(pthp)) { ASSERT(0); @@ -1195,7 +1369,6 @@ erts_port_task_schedule(Eterm id, break; } case ERTS_PORT_TASK_PROC_SIG: { - ErtsProc2PortSigData *sigdp; va_list argp; ASSERT(!pthp); va_start(argp, type); @@ -1204,31 +1377,40 @@ erts_port_task_schedule(Eterm id, ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback); ptp->u.alive.flags |= va_arg(argp, int); va_end(argp); - if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND) - save_nosuspend_handle(pp, ptp); - else + if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)) set_handle(ptp, pthp); + else { + ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, + sizeof(ErtsPortTaskHandleList)); + set_handle(ptp, &ns_pthlp->handle); + } break; } default: break; } - act = enqueue_task(pp, ptp); - if (act & ERTS_PTS_FLG_EXIT) { + if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) { reset_handle(ptp); - goto fail; + if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT)) + goto abort_nosuspend; + else + goto fail; } + add_flags = ERTS_PTS_FLG_HAVE_TASKS; + if (ns_pthlp) + add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS; + while (1) { erts_aint32_t new, exp; - if ((act & ERTS_PTS_FLG_HAVE_TASKS) + if ((act & add_flags) == add_flags && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) goto done; /* Done */ new = exp = act; - new |= ERTS_PTS_FLG_HAVE_TASKS; + new |= add_flags; if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) new |= ERTS_PTS_FLG_IN_RUNQ; @@ -1281,6 +1463,22 @@ done: return 0; +abort_nosuspend: + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(pp); +#endif + + abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td); + + ASSERT(ns_pthlp); + erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); + if (ptp) + port_task_free(ptp); + + return 0; + fail: #ifdef ERTS_SMP @@ -1288,6 +1486,9 @@ fail: erts_port_dec_refc(pp); #endif + if (ns_pthlp) + erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); + if (ptp) port_task_free(ptp); @@ -1444,13 +1645,24 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ptp->u.alive.td.io.event_data); io_tasks_executed++; break; - case ERTS_PORT_TASK_PROC_SIG: + case ERTS_PORT_TASK_PROC_SIG: { + ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); - reds += ptp->u.alive.td.psig.callback(pp, - state, - ERTS_PROC2PORT_SIG_EXEC, - &ptp->u.alive.td.psig.data); + if (!pp->sched.taskq.bpq) + reds += ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + reds += ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); + dequeued_proc2port_data(pp, size); + } break; + } case ERTS_PORT_TASK_DIST_CMD: reds += erts_dist_command(pp, CONTEXT_REDS-reds); break; @@ -1633,12 +1845,23 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp) break; case ERTS_PORT_TASK_DIST_CMD: break; - case ERTS_PORT_TASK_PROC_SIG: - ptp->u.alive.td.psig.callback(NULL, - ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_CLOSED, - &ptp->u.alive.td.psig.data); + case ERTS_PORT_TASK_PROC_SIG: { + ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; + if (!pp->sched.taskq.bpq) + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_CLOSED, + sigdp); + else { + ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_CLOSED, + sigdp); + aborted_proc2port_data(pp, size); + } break; + } default: erl_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", -- cgit v1.2.3