aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/erl_port_task.c66
1 files changed, 56 insertions, 10 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index b661c26036..8ceadcdb8c 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -120,7 +120,9 @@ struct ErtsPortTaskBusyCallerTable_ {
};
-static void begin_port_cleanup(Port *pp, ErtsPortTask **execq);
+static void begin_port_cleanup(Port *pp,
+ ErtsPortTask **execq,
+ int *processing_busy_q_p);
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
ErtsPortTask,
@@ -1525,7 +1527,7 @@ erts_port_task_free_port(Port *pp)
erts_resume_processes(suspended);
if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
- begin_port_cleanup(pp, NULL);
+ begin_port_cleanup(pp, NULL, NULL);
}
/*
@@ -1681,8 +1683,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
begin_handle_tasks:
if (state & ERTS_PORT_SFLG_FREE) {
reds += ERTS_PORT_REDS_FREE;
-
- begin_port_cleanup(pp, &execq);
+ begin_port_cleanup(pp, &execq, &processing_busy_q);
break;
}
@@ -1773,22 +1774,66 @@ release_port(void *vport)
#endif
static void
-begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
+begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
int i, max;
- ErtsPortTask *qs[2];
+ ErtsPortTaskBusyCallerTable *tabp;
+ ErtsPortTask *qs[3];
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
/*
- * Handle remaining tasks...
+ * Abort remaining tasks...
+ *
+ * We want to process queues in the following order in order
+ * to preserve signal ordering guarantees:
+ * 1. Local busy queue
+ * 2. Local queue
+ * 3. In queue
*/
max = 0;
- if (execqp && *execqp) {
- qs[max++] = *execqp;
+ if (!execqp) {
+ if (pp->sched.taskq.local.busy.first)
+ qs[max++] = pp->sched.taskq.local.busy.first;
+ if (pp->sched.taskq.local.first)
+ qs[max++] = pp->sched.taskq.local.first;
+ }
+ else {
+ if (*processing_busy_q_p) {
+ if (*execqp)
+ qs[max++] = *execqp;
+ if (pp->sched.taskq.local.first)
+ qs[max++] = pp->sched.taskq.local.first;
+ }
+ else {
+ if (pp->sched.taskq.local.busy.first)
+ qs[max++] = pp->sched.taskq.local.busy.first;
+ if (*execqp)
+ qs[max++] = *execqp;
+ }
*execqp = NULL;
+ *processing_busy_q_p = 0;
+ }
+ pp->sched.taskq.local.busy.first = NULL;
+ pp->sched.taskq.local.busy.last = NULL;
+ pp->sched.taskq.local.first = NULL;
+ tabp = pp->sched.taskq.local.busy.table;
+ if (tabp) {
+ int bix;
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
+ while (bcp) {
+ ErtsPortTaskBusyCaller *free_bcp = bcp;
+ bcp = bcp->next;
+ if (free_bcp != &tabp->pre_alloc_busy_caller)
+ erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
+ }
+ }
+
+ busy_caller_table_free(tabp);
+ pp->sched.taskq.local.busy.table = NULL;
}
erts_port_task_sched_lock(&pp->sched);
@@ -1875,7 +1920,8 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
}
erts_smp_atomic32_read_band_nob(&pp->sched.flags,
- ~ERTS_PTS_FLG_HAVE_TASKS);
+ ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
+ |ERTS_PTS_FLG_HAVE_TASKS));
/*
* Schedule cleanup of port structure...