aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/bif.c6
-rw-r--r--erts/emulator/beam/dist.c46
-rw-r--r--erts/emulator/beam/erl_bif_port.c3
-rw-r--r--erts/emulator/beam/erl_node_tables.c6
-rw-r--r--erts/emulator/beam/erl_node_tables.h6
-rw-r--r--erts/emulator/beam/erl_port.h22
-rw-r--r--erts/emulator/beam/erl_port_task.c6
-rw-r--r--erts/emulator/beam/erl_port_task.h2
-rw-r--r--erts/emulator/beam/erl_process.c97
-rw-r--r--erts/emulator/beam/erl_process.h168
-rw-r--r--erts/emulator/beam/global.h2
-rw-r--r--erts/emulator/beam/io.c60
12 files changed, 287 insertions, 137 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 5ea441ad81..dc0dcbabd6 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1908,7 +1908,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
/* We have waited for locks, trace schedule ports */
if (pt) {
- erts_aint32_t state;
+ erts_aint32_t flags;
if (IS_TRACED_FL(pt, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(pt, am_in, am_command);
}
@@ -1916,9 +1916,9 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
profile_runnable_port(pt, am_active);
}
- state = erts_atomic32_read_nob(&pt->state);
+ flags = erts_atomic32_read_rb(&pt->sched.flags);
/* XXX let port_command handle the busy stuff !!! */
- if (state & ERTS_PORT_SFLG_PORT_BUSY) {
+ if (flags & ERTS_PTS_FLG_BUSY) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
if (erts_system_monitor_flags.busy_port) {
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 432b842848..f28d9dff09 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -171,11 +171,10 @@ get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
return NULL;
}
else {
- ErtsProcList *plp;
- plp = dep->suspended.first;
- dep->suspended.first = NULL;
- dep->suspended.last = NULL;
- return plp;
+ ErtsProcList *suspended = dep->suspended;
+ dep->suspended = NULL;
+ erts_proclist_fetch(&suspended, NULL);
+ return suspended;
}
}
@@ -1698,7 +1697,6 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
- plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
erts_smp_mtx_lock(&dep->qlock);
@@ -1731,11 +1729,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
else {
/* Enqueue suspended process on dist entry */
ASSERT(plp);
- if (dep->suspended.last)
- dep->suspended.last->next = plp;
- else
- dep->suspended.first = plp;
- dep->suspended.last = plp;
+ erts_proclist_store_last(&dep->suspended, plp);
}
}
@@ -1914,7 +1908,7 @@ erts_dist_command(Port *prt, int reds_limit)
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
- erts_aint32_t state;
+ erts_aint32_t sched_flags;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -1957,12 +1951,12 @@ erts_dist_command(Port *prt, int reds_limit)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- state = erts_atomic32_read_nob(&prt->state);
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
if (reds > reds_limit)
goto preempted;
- if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && foq.first) {
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1978,9 +1972,9 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
foq.first = foq.first->next;
free_dist_obuf(fob);
- state = erts_atomic32_read_nob(&prt->state);
- preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
- if (state & ERTS_PORT_SFLG_PORT_BUSY)
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ if (sched_flags & ERTS_PTS_FLG_BUSY)
break;
} while (foq.first && !preempt);
if (!foq.first)
@@ -1989,7 +1983,7 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
}
- if (state & ERTS_PORT_SFLG_PORT_BUSY) {
+ if (sched_flags & ERTS_PTS_FLG_BUSY) {
if (oq.first) {
ErtsDistOutputBuf *ob;
int preempt;
@@ -2060,9 +2054,9 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
oq.first = oq.first->next;
free_dist_obuf(fob);
- state = erts_atomic32_read_nob(&prt->state);
- preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
- if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt)
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt)
goto finalize_only;
}
@@ -2091,7 +2085,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (!(state & ERTS_PORT_SFLG_PORT_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY)
&& (dep->qflgs & ERTS_DE_QFLG_BUSY)
&& dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
@@ -2145,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(oq.first || !oq.last);
ASSERT(!oq.first || oq.last);
- if (state & ERTS_PORT_SFLGS_DEAD) {
+ if (sched_flags & ERTS_PTS_FLG_EXIT) {
/*
* Port died during port command; clean up 'oq'
* and 'foq'. Things buffered in dist entry after
@@ -2602,11 +2596,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_mtx_lock(&dep->qlock);
- if (dep->suspended.last)
- dep->suspended.last->next = plp;
- else
- dep->suspended.first = plp;
- dep->suspended.last = plp;
+ erts_proclist_store_last(&dep->suspended, plp);
erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index 31151dc9f5..477671f11f 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -168,7 +168,8 @@ do_port_command(Process *BIF_P, Eterm arg1, Eterm arg2, Eterm arg3,
ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
}
else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
- && (erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) {
+ && (erts_smp_atomic32_read_nob(&p->sched.flags)
+ & ERTS_PTS_FLG_BUSY)) {
if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
ERTS_BIF_PREP_RET(res, am_false);
}
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 67f413ef28..6bda07e9b0 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -116,8 +116,7 @@ dist_table_alloc(void *dep_tmpl)
dep->qsize = 0;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- dep->suspended.first = NULL;
- dep->suspended.last = NULL;
+ dep->suspended = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
@@ -769,8 +768,7 @@ void erts_init_node_tables(void)
erts_this_dist_entry->qsize = 0;
erts_this_dist_entry->out_queue.first = NULL;
erts_this_dist_entry->out_queue.last = NULL;
- erts_this_dist_entry->suspended.first = NULL;
- erts_this_dist_entry->suspended.last = NULL;
+ erts_this_dist_entry->suspended = NULL;
erts_this_dist_entry->finalized_out_queue.first = NULL;
erts_this_dist_entry->finalized_out_queue.last = NULL;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 3a74888522..af60071ea5 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -84,10 +84,6 @@ typedef struct {
} ErtsDistOutputQueue;
struct ErtsProcList_;
-typedef struct {
- struct ErtsProcList_ *first;
- struct ErtsProcList_ *last;
-} ErtsDistSuspended;
/*
* Lock order:
@@ -134,7 +130,7 @@ typedef struct dist_entry_ {
Uint32 qflgs;
Sint qsize;
ErtsDistOutputQueue out_queue;
- ErtsDistSuspended suspended;
+ struct ErtsProcList_ *suspended;
ErtsDistOutputQueue finalized_out_queue;
erts_smp_atomic_t dist_cmd_scheduled;
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index ce8d809245..00394f279e 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -214,22 +214,20 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
#define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3))
#define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4))
/* Flow control */
-#define ERTS_PORT_SFLG_PORT_BUSY ((Uint32) (1 << 5))
/* Port is closing (no i/o accepted) */
-#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 6))
+#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 5))
/* Send a closed message when terminating */
-#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 7))
+#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 6))
/* Line orinted io on port */
-#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 8))
+#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 7))
/* Immortal port (only certain system ports) */
-#define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9))
-#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10))
-#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 11))
+#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 8))
+#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 9))
/* Port uses port specific locking (opposed to driver specific locking) */
-#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 12))
-#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 13))
+#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 10))
+#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11))
/* Last port to terminate halts the emulator */
-#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 14))
+#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12))
#ifdef DEBUG
/* Only debug: make sure all flags aren't cleared unintentionally */
#define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31))
@@ -237,8 +235,7 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
/* Combinations of port status flags */
#define ERTS_PORT_SFLGS_DEAD \
- (ERTS_PORT_SFLG_FREE \
- | ERTS_PORT_SFLG_INITIALIZING)
+ (ERTS_PORT_SFLG_FREE | ERTS_PORT_SFLG_INITIALIZING)
#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
(ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
#define ERTS_PORT_SFLGS_INVALID_LOOKUP \
@@ -246,7 +243,6 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
| ERTS_PORT_SFLG_CLOSING)
#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_LOOKUP \
- | ERTS_PORT_SFLG_PORT_BUSY \
| ERTS_PORT_SFLG_DISTRIBUTION)
void print_port_info(Port *, int, void *);
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index edcd3112da..dceecda973 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -530,6 +530,7 @@ fail:
void
erts_port_task_free_port(Port *pp)
{
+ ErtsProcList *suspended;
erts_aint32_t flags;
ErtsRunQueue *runq;
@@ -542,6 +543,8 @@ erts_port_task_free_port(Port *pp)
erts_port_task_sched_lock(&pp->sched);
flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_EXIT);
+ suspended = pp->suspended;
+ pp->suspended = NULL;
erts_port_task_sched_unlock(&pp->sched);
erts_atomic32_read_bset_relb(&pp->state,
(ERTS_PORT_SFLG_CLOSING
@@ -550,6 +553,9 @@ erts_port_task_free_port(Port *pp)
erts_smp_runq_unlock(runq);
+ if (erts_proclist_fetch(&suspended, NULL))
+ erts_resume_processes(suspended);
+
if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
begin_port_cleanup(pp, NULL);
}
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index 3e87d5b778..71b1cc6e25 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -60,6 +60,8 @@ extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
+#define ERTS_PTS_FLG_BUSY (((erts_aint32_t) 1) << 4)
+
typedef struct ErtsPortTask_ ErtsPortTask;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 07b1ecdbb4..6cf267b01e 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -770,14 +770,6 @@ proclist_destroy(ErtsProcList *plp)
proclist_free(plp);
}
-static ERTS_INLINE int
-proclist_same(ErtsProcList *plp, Process *p)
-{
- return (plp->pid == p->common.id
- && (plp->started_interval
- == p->common.u.alive.started_interval));
-}
-
ErtsProcList *
erts_proclist_create(Process *p)
{
@@ -790,12 +782,6 @@ erts_proclist_destroy(ErtsProcList *plp)
proclist_destroy(plp);
}
-int
-erts_proclist_same(ErtsProcList *plp, Process *p)
-{
- return proclist_same(plp, p);
-}
-
void *
erts_psd_set_init(Process *p, ErtsProcLocks plocks, int ix, void *data)
{
@@ -4980,8 +4966,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
else if (on) { /* ------ BLOCK ------ */
if (schdlr_sspnd.msb.procs) {
plp = proclist_create(p);
- plp->next = schdlr_sspnd.msb.procs;
- schdlr_sspnd.msb.procs = plp;
+ erts_proclist_store_last(&schdlr_sspnd.msb.procs, plp);
p->flags |= F_HAVE_BLCKD_MSCHED;
ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.active) == 1);
ASSERT(p->scheduler_data->no == 1);
@@ -5066,8 +5051,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
~ERTS_SCHDLR_SSPND_CHNG_WAITER);
}
plp = proclist_create(p);
- plp->next = schdlr_sspnd.msb.procs;
- schdlr_sspnd.msb.procs = plp;
+ erts_proclist_store_last(&schdlr_sspnd.msb.procs, plp);
ASSERT(p->scheduler_data);
}
}
@@ -5078,20 +5062,16 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
}
else { /* ------ UNBLOCK ------ */
if (p->flags & F_HAVE_BLCKD_MSCHED) {
- ErtsProcList **plpp = &schdlr_sspnd.msb.procs;
- plp = schdlr_sspnd.msb.procs;
+ ErtsProcList *plp = erts_proclist_peek_first(schdlr_sspnd.msb.procs);
while (plp) {
- if (!proclist_same(plp, p)){
- plpp = &plp->next;
- plp = plp->next;
- }
- else {
- *plpp = plp->next;
- proclist_destroy(plp);
+ ErtsProcList *tmp_plp = plp;
+ plp = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp);
+ if (erts_proclist_same(tmp_plp, p)) {
+ erts_proclist_remove(&schdlr_sspnd.msb.procs, tmp_plp);
+ proclist_destroy(tmp_plp);
if (!all)
break;
- plp = *plpp;
}
}
}
@@ -5160,23 +5140,25 @@ erts_multi_scheduling_blockers(Process *p)
Eterm res = NIL;
erts_smp_mtx_lock(&schdlr_sspnd.mtx);
- if (schdlr_sspnd.msb.procs) {
+ if (!erts_proclist_is_empty(schdlr_sspnd.msb.procs)) {
Eterm *hp, *hp_end;
ErtsProcList *plp1, *plp2;
- Uint max_size;
- ASSERT(schdlr_sspnd.msb.procs);
- for (max_size = 0, plp1 = schdlr_sspnd.msb.procs;
+ Uint max_size = 0;
+
+ for (plp1 = erts_proclist_peek_first(schdlr_sspnd.msb.procs);
plp1;
- plp1 = plp1->next) {
+ plp1 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp1)) {
max_size += 2;
}
ASSERT(max_size);
hp = HAlloc(p, max_size);
hp_end = hp + max_size;
- for (plp1 = schdlr_sspnd.msb.procs; plp1; plp1 = plp1->next) {
- for (plp2 = schdlr_sspnd.msb.procs;
+ for (plp1 = erts_proclist_peek_first(schdlr_sspnd.msb.procs);
+ plp1;
+ plp1 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp1)) {
+ for (plp2 = erts_proclist_peek_first(schdlr_sspnd.msb.procs);
plp2->pid != plp1->pid;
- plp2 = plp2->next);
+ plp2 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp2));
if (plp2 == plp1) {
res = CONS(hp, plp1->pid, res);
hp += 2;
@@ -6081,23 +6063,25 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks,
void
erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port)
{
-#ifdef DEBUG
- int res;
-#endif
+ int suspend;
+
ASSERT(c_p == erts_get_current_process());
ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p));
if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
+ if (busy_port)
+ suspend = erts_save_suspend_process_on_port(busy_port, c_p);
+ else
+ suspend = 1;
+
+ if (suspend) {
#ifdef DEBUG
- res =
+ int res =
#endif
- suspend_process(c_p, c_p);
-
- ASSERT(res);
-
- if (busy_port)
- erts_wake_process_later(busy_port, c_p);
+ suspend_process(c_p, c_p);
+ ASSERT(res);
+ }
if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
@@ -6116,16 +6100,19 @@ erts_resume(Process* process, ErtsProcLocks process_locks)
}
int
-erts_resume_processes(ErtsProcList *plp)
+erts_resume_processes(ErtsProcList *list)
{
+ /* 'list' is expected to have been fetched (i.e. not a ring anymore) */
int nresumed = 0;
+ ErtsProcList *plp = list;
+
while (plp) {
Process *proc;
ErtsProcList *fplp;
ASSERT(is_internal_pid(plp->pid));
proc = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCK_STATUS);
if (proc) {
- if (proclist_same(plp, proc)) {
+ if (erts_proclist_same(plp, proc)) {
resume_process(proc);
nresumed++;
}
@@ -6354,9 +6341,8 @@ Process *schedule(Process *p, int calls)
#ifdef ERTS_SMP
{
ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
- rq->procs.pending_exiters = NULL;
-
- if (pnd_xtrs) {
+ if (erts_proclist_fetch(&pnd_xtrs, NULL)) {
+ rq->procs.pending_exiters = NULL;
erts_smp_runq_unlock(rq);
handle_pending_exiters(pnd_xtrs);
erts_smp_runq_lock(rq);
@@ -7700,12 +7686,14 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks)
static void
handle_pending_exiters(ErtsProcList *pnd_xtrs)
{
+ /* 'list' is expected to have been fetched (i.e. not a ring anymore) */
ErtsProcList *plp = pnd_xtrs;
- ErtsProcList *free_plp;
+
while (plp) {
+ ErtsProcList *free_plp;
Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL);
if (p) {
- if (proclist_same(plp, p)) {
+ if (erts_proclist_same(plp, p)) {
erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
if (!(state & ERTS_PSFLG_RUNNING)) {
ASSERT(state & ERTS_PSFLG_PENDING_EXIT);
@@ -7734,8 +7722,7 @@ save_pending_exiter(Process *p)
erts_smp_runq_lock(rq);
- plp->next = rq->procs.pending_exiters;
- rq->procs.pending_exiters = plp;
+ erts_proclist_store_last(&rq->procs.pending_exiters, plp);
erts_smp_runq_unlock(rq);
wake_scheduler(rq, 1);
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 1feb5b96db..d613321421 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -290,6 +290,7 @@ struct ErtsProcList_ {
Eterm pid;
Uint64 started_interval;
ErtsProcList* next;
+ ErtsProcList* prev;
};
typedef struct ErtsMiscOpList_ ErtsMiscOpList;
@@ -1135,7 +1136,172 @@ Uint64 erts_step_proc_interval(void);
ErtsProcList *erts_proclist_create(Process *);
void erts_proclist_destroy(ErtsProcList *);
-int erts_proclist_same(ErtsProcList *, Process *);
+
+ERTS_GLB_INLINE int erts_proclist_same(ErtsProcList *, Process *);
+ERTS_GLB_INLINE void erts_proclist_store_first(ErtsProcList **, ErtsProcList *);
+ERTS_GLB_INLINE void erts_proclist_store_last(ErtsProcList **, ErtsProcList *);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_first(ErtsProcList *);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_last(ErtsProcList *);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_next(ErtsProcList *, ErtsProcList *);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_prev(ErtsProcList *, ErtsProcList *);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_first(ErtsProcList **);
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_last(ErtsProcList **);
+ERTS_GLB_INLINE int erts_proclist_fetch(ErtsProcList **, ErtsProcList **);
+ERTS_GLB_INLINE void erts_proclist_remove(ErtsProcList **, ErtsProcList *);
+ERTS_GLB_INLINE int erts_proclist_is_empty(ErtsProcList *);
+ERTS_GLB_INLINE int erts_proclist_is_first(ErtsProcList *, ErtsProcList *);
+ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *, ErtsProcList *);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE int
+erts_proclist_same(ErtsProcList *plp, Process *p)
+{
+ return (plp->pid == p->common.id
+ && (plp->started_interval
+ == p->common.u.alive.started_interval));
+}
+
+ERTS_GLB_INLINE void erts_proclist_store_first(ErtsProcList **list,
+ ErtsProcList *element)
+{
+ if (!*list)
+ element->next = element->prev = element;
+ else {
+ element->prev = (*list)->prev;
+ element->next = *list;
+ element->prev->next = element;
+ element->next->prev = element;
+ }
+ *list = element;
+}
+
+ERTS_GLB_INLINE void erts_proclist_store_last(ErtsProcList **list,
+ ErtsProcList *element)
+{
+ if (!*list) {
+ element->next = element->prev = element;
+ *list = element;
+ }
+ else {
+ element->prev = (*list)->prev;
+ element->next = *list;
+ element->prev->next = element;
+ element->next->prev = element;
+ }
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_first(ErtsProcList *list)
+{
+ return list;
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_last(ErtsProcList *list)
+{
+ if (!list)
+ return NULL;
+ else
+ return list->prev;
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_next(ErtsProcList *list,
+ ErtsProcList *element)
+{
+ ErtsProcList *next;
+ ASSERT(list && element);
+ next = element->next;
+ return list == next ? NULL : next;
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_prev(ErtsProcList *list,
+ ErtsProcList *element)
+{
+ ErtsProcList *prev;
+ ASSERT(list && element);
+ prev = element->prev;
+ return list == element ? NULL : prev;
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_first(ErtsProcList **list)
+{
+ if (!*list)
+ return NULL;
+ else {
+ ErtsProcList *res = *list;
+ if (res == *list)
+ *list = NULL;
+ else
+ *list = res->next;
+ res->next->prev = res->prev;
+ res->prev->next = res->next;
+ return res;
+ }
+}
+
+ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_last(ErtsProcList **list)
+{
+ if (!*list)
+ return NULL;
+ else {
+ ErtsProcList *res = (*list)->prev;
+ if (res == *list)
+ *list = NULL;
+ res->next->prev = res->prev;
+ res->prev->next = res->next;
+ return res;
+ }
+}
+
+ERTS_GLB_INLINE int erts_proclist_fetch(ErtsProcList **list_first,
+ ErtsProcList **list_last)
+{
+ if (!*list_first) {
+ if (list_last)
+ *list_last = NULL;
+ return 0;
+ }
+ else {
+ if (list_last)
+ *list_last = (*list_first)->prev;
+ (*list_first)->prev->next = NULL;
+ (*list_first)->prev = NULL;
+ return !0;
+ }
+}
+
+ERTS_GLB_INLINE void erts_proclist_remove(ErtsProcList **list,
+ ErtsProcList *element)
+{
+ ASSERT(list && *list);
+ if (*list == element) {
+ *list = element->next;
+ if (*list == element)
+ *list = NULL;
+ }
+ element->next->prev = element->prev;
+ element->prev->next = element->next;
+}
+
+ERTS_GLB_INLINE int erts_proclist_is_empty(ErtsProcList *list)
+{
+ return list == NULL;
+}
+
+ERTS_GLB_INLINE int erts_proclist_is_first(ErtsProcList *list,
+ ErtsProcList *element)
+{
+ ASSERT(list && element);
+ return list == element;
+}
+
+ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *list,
+ ErtsProcList *element)
+{
+ ASSERT(list && element);
+ return list->prev == element;
+}
+
+#endif
void erts_schedule_thr_prgr_later_op(void (*)(void *),
void *,
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index bce4619645..6713481a04 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -895,7 +895,7 @@ typedef struct {
int erts_add_driver_entry(ErlDrvEntry *drv, DE_Handle *handle, int driver_list_locked);
void erts_destroy_driver(erts_driver_t *drv);
-void erts_wake_process_later(Port*, Process*);
+int erts_save_suspend_process_on_port(Port*, Process*);
Port *erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *, int *);
int erts_is_port_ioq_empty(Port *);
void erts_terminate_port(Port *);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9d38e5539e..96a89a3b4e 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -476,23 +476,19 @@ static void stopq(Port* prt)
}
}
-void
-erts_wake_process_later(Port *prt, Process *process)
+int
+erts_save_suspend_process_on_port(Port *prt, Process *process)
{
- ErtsProcList** p;
- ErtsProcList* new_p;
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
- return;
-
- for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
- /* Empty loop body */;
-
- new_p = erts_proclist_create(process);
- new_p->next = NULL;
- *p = new_p;
+ int saved;
+ erts_aint32_t flags;
+ erts_port_task_sched_lock(&prt->sched);
+ flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ saved = ((flags & (ERTS_PTS_FLG_BUSY
+ | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY);
+ if (saved)
+ erts_proclist_store_last(&prt->suspended, erts_proclist_create(process));
+ erts_port_task_sched_unlock(&prt->sched);
+ return saved;
}
/*
@@ -2094,9 +2090,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
#endif
state = erts_atomic32_read_nob(&p->state);
- if ((state & (ERTS_PORT_SFLGS_DEAD
- | ERTS_PORT_SFLG_EXITING
- | ERTS_PORT_SFLG_IMMORTAL))
+ if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING))
|| ((reason == am_normal) &&
((from != p->connected) && (from != p->common.id)))) {
return;
@@ -2468,6 +2462,8 @@ void
set_busy_port(ErlDrvPort port_num, int on)
{
Port *prt;
+ erts_aint32_t flags;
+
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(port_str, 16);
#endif
@@ -2479,8 +2475,10 @@ set_busy_port(ErlDrvPort port_num, int on)
return;
if (on) {
- erts_atomic32_read_bor_relb(&prt->state,
- ERTS_PORT_SFLG_PORT_BUSY);
+ flags = erts_smp_atomic32_read_bor_nob(&prt->sched.flags,
+ ERTS_PTS_FLG_BUSY);
+ if (flags & ERTS_PTS_FLG_BUSY)
+ return; /* Already busy */
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_busy)) {
erts_snprintf(port_str, sizeof(port_str),
@@ -2489,10 +2487,12 @@ set_busy_port(ErlDrvPort port_num, int on)
}
#endif
} else {
- ErtsProcList* plp = prt->suspended;
- erts_atomic32_read_band_relb(&prt->state,
- ~ERTS_PORT_SFLG_PORT_BUSY);
- prt->suspended = NULL;
+ ErtsProcList *plp;
+
+ flags = erts_smp_atomic32_read_band_nob(&prt->sched.flags,
+ ~ERTS_PTS_FLG_BUSY);
+ if (!(flags & ERTS_PTS_FLG_BUSY))
+ return; /* Already non-busy */
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_not_busy)) {
@@ -2517,7 +2517,13 @@ set_busy_port(ErlDrvPort port_num, int on)
* the first process.
*/
- if (plp) {
+ erts_port_task_sched_lock(&prt->sched);
+ plp = prt->suspended;
+ prt->suspended = NULL;
+ erts_port_task_sched_unlock(&prt->sched);
+
+ if (erts_proclist_fetch(&plp, NULL)) {
+
#ifdef USE_VM_PROBES
/*
* Hrm, for blocked dist ports, plp always seems to be NULL.
@@ -2540,8 +2546,10 @@ set_busy_port(ErlDrvPort port_num, int on)
}
}
#endif
+
/* First proc should be resumed last */
if (plp->next) {
+ plp->next->prev = NULL;
erts_resume_processes(plp->next);
plp->next = NULL;
}