diff options
author | Rickard Green <[email protected]> | 2012-09-14 21:42:11 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-03 21:18:09 +0100 |
commit | 34fc6f243f8a462f4b2370a9fe5376df1ca08f1d (patch) | |
tree | 1a7377203f573c40269f12b59611a68e0859c627 | |
parent | 620c8c5bfe4c2b306a7bc0a7d41749bddea4ee62 (diff) | |
download | otp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.tar.gz otp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.tar.bz2 otp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.zip |
Move busy port flag
-rw-r--r-- | erts/emulator/beam/bif.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 46 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_port.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port.h | 22 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 97 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 168 | ||||
-rw-r--r-- | erts/emulator/beam/global.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 60 |
12 files changed, 287 insertions, 137 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 5ea441ad81..dc0dcbabd6 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1908,7 +1908,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { /* We have waited for locks, trace schedule ports */ if (pt) { - erts_aint32_t state; + erts_aint32_t flags; if (IS_TRACED_FL(pt, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(pt, am_in, am_command); } @@ -1916,9 +1916,9 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { profile_runnable_port(pt, am_active); } - state = erts_atomic32_read_nob(&pt->state); + flags = erts_atomic32_read_rb(&pt->sched.flags); /* XXX let port_command handle the busy stuff !!! */ - if (state & ERTS_PORT_SFLG_PORT_BUSY) { + if (flags & ERTS_PTS_FLG_BUSY) { if (suspend) { erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); if (erts_system_monitor_flags.busy_port) { diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 432b842848..f28d9dff09 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -171,11 +171,10 @@ get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) return NULL; } else { - ErtsProcList *plp; - plp = dep->suspended.first; - dep->suspended.first = NULL; - dep->suspended.last = NULL; - return plp; + ErtsProcList *suspended = dep->suspended; + dep->suspended = NULL; + erts_proclist_fetch(&suspended, NULL); + return suspended; } } @@ -1698,7 +1697,6 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) erts_smp_mtx_unlock(&dep->qlock); plp = erts_proclist_create(c_p); - plp->next = NULL; erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; erts_smp_mtx_lock(&dep->qlock); @@ -1731,11 +1729,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) else { /* Enqueue suspended process on dist entry */ ASSERT(plp); - if (dep->suspended.last) - dep->suspended.last->next = plp; - else - dep->suspended.first = plp; - dep->suspended.last = plp; + erts_proclist_store_last(&dep->suspended, plp); } } @@ -1914,7 +1908,7 @@ erts_dist_command(Port *prt, int reds_limit) ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); - erts_aint32_t state; + erts_aint32_t sched_flags; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -1957,12 +1951,12 @@ erts_dist_command(Port *prt, int reds_limit) dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - state = erts_atomic32_read_nob(&prt->state); + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); if (reds > reds_limit) goto preempted; - if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && foq.first) { + if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) { int preempt = 0; do { Uint size; @@ -1978,9 +1972,9 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); foq.first = foq.first->next; free_dist_obuf(fob); - state = erts_atomic32_read_nob(&prt->state); - preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); - if (state & ERTS_PORT_SFLG_PORT_BUSY) + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + if (sched_flags & ERTS_PTS_FLG_BUSY) break; } while (foq.first && !preempt); if (!foq.first) @@ -1989,7 +1983,7 @@ erts_dist_command(Port *prt, int reds_limit) goto preempted; } - if (state & ERTS_PORT_SFLG_PORT_BUSY) { + if (sched_flags & ERTS_PTS_FLG_BUSY) { if (oq.first) { ErtsDistOutputBuf *ob; int preempt; @@ -2060,9 +2054,9 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); oq.first = oq.first->next; free_dist_obuf(fob); - state = erts_atomic32_read_nob(&prt->state); - preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); - if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt) + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt) goto finalize_only; } @@ -2091,7 +2085,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (!(state & ERTS_PORT_SFLG_PORT_BUSY) + if (!(sched_flags & ERTS_PTS_FLG_BUSY) && (dep->qflgs & ERTS_DE_QFLG_BUSY) && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; @@ -2145,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(oq.first || !oq.last); ASSERT(!oq.first || oq.last); - if (state & ERTS_PORT_SFLGS_DEAD) { + if (sched_flags & ERTS_PTS_FLG_EXIT) { /* * Port died during port command; clean up 'oq' * and 'foq'. Things buffered in dist entry after @@ -2602,11 +2596,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) plp->next = NULL; erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); erts_smp_mtx_lock(&dep->qlock); - if (dep->suspended.last) - dep->suspended.last->next = plp; - else - dep->suspended.first = plp; - dep->suspended.last = plp; + erts_proclist_store_last(&dep->suspended, plp); erts_smp_mtx_unlock(&dep->qlock); goto yield; } diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index 31151dc9f5..477671f11f 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -168,7 +168,8 @@ do_port_command(Process *BIF_P, Eterm arg1, Eterm arg2, Eterm arg3, ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP); } else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE) - && (erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) { + && (erts_smp_atomic32_read_nob(&p->sched.flags) + & ERTS_PTS_FLG_BUSY)) { if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) { ERTS_BIF_PREP_RET(res, am_false); } diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 67f413ef28..6bda07e9b0 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -116,8 +116,7 @@ dist_table_alloc(void *dep_tmpl) dep->qsize = 0; dep->out_queue.first = NULL; dep->out_queue.last = NULL; - dep->suspended.first = NULL; - dep->suspended.last = NULL; + dep->suspended = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; @@ -769,8 +768,7 @@ void erts_init_node_tables(void) erts_this_dist_entry->qsize = 0; erts_this_dist_entry->out_queue.first = NULL; erts_this_dist_entry->out_queue.last = NULL; - erts_this_dist_entry->suspended.first = NULL; - erts_this_dist_entry->suspended.last = NULL; + erts_this_dist_entry->suspended = NULL; erts_this_dist_entry->finalized_out_queue.first = NULL; erts_this_dist_entry->finalized_out_queue.last = NULL; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 3a74888522..af60071ea5 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -84,10 +84,6 @@ typedef struct { } ErtsDistOutputQueue; struct ErtsProcList_; -typedef struct { - struct ErtsProcList_ *first; - struct ErtsProcList_ *last; -} ErtsDistSuspended; /* * Lock order: @@ -134,7 +130,7 @@ typedef struct dist_entry_ { Uint32 qflgs; Sint qsize; ErtsDistOutputQueue out_queue; - ErtsDistSuspended suspended; + struct ErtsProcList_ *suspended; ErtsDistOutputQueue finalized_out_queue; erts_smp_atomic_t dist_cmd_scheduled; diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index ce8d809245..00394f279e 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -214,22 +214,20 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ #define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3)) #define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4)) /* Flow control */ -#define ERTS_PORT_SFLG_PORT_BUSY ((Uint32) (1 << 5)) /* Port is closing (no i/o accepted) */ -#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 6)) +#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 5)) /* Send a closed message when terminating */ -#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 7)) +#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 6)) /* Line orinted io on port */ -#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 8)) +#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 7)) /* Immortal port (only certain system ports) */ -#define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9)) -#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10)) -#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 11)) +#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 8)) +#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 9)) /* Port uses port specific locking (opposed to driver specific locking) */ -#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 12)) -#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 13)) +#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 10)) +#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11)) /* Last port to terminate halts the emulator */ -#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 14)) +#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12)) #ifdef DEBUG /* Only debug: make sure all flags aren't cleared unintentionally */ #define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31)) @@ -237,8 +235,7 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ /* Combinations of port status flags */ #define ERTS_PORT_SFLGS_DEAD \ - (ERTS_PORT_SFLG_FREE \ - | ERTS_PORT_SFLG_INITIALIZING) + (ERTS_PORT_SFLG_FREE | ERTS_PORT_SFLG_INITIALIZING) #define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID) #define ERTS_PORT_SFLGS_INVALID_LOOKUP \ @@ -246,7 +243,6 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ | ERTS_PORT_SFLG_CLOSING) #define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \ (ERTS_PORT_SFLGS_INVALID_LOOKUP \ - | ERTS_PORT_SFLG_PORT_BUSY \ | ERTS_PORT_SFLG_DISTRIBUTION) void print_port_info(Port *, int, void *); diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index edcd3112da..dceecda973 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -530,6 +530,7 @@ fail: void erts_port_task_free_port(Port *pp) { + ErtsProcList *suspended; erts_aint32_t flags; ErtsRunQueue *runq; @@ -542,6 +543,8 @@ erts_port_task_free_port(Port *pp) erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_EXIT); + suspended = pp->suspended; + pp->suspended = NULL; erts_port_task_sched_unlock(&pp->sched); erts_atomic32_read_bset_relb(&pp->state, (ERTS_PORT_SFLG_CLOSING @@ -550,6 +553,9 @@ erts_port_task_free_port(Port *pp) erts_smp_runq_unlock(runq); + if (erts_proclist_fetch(&suspended, NULL)) + erts_resume_processes(suspended); + if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) begin_port_cleanup(pp, NULL); } diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index 3e87d5b778..71b1cc6e25 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -60,6 +60,8 @@ extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks; #define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) #define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) #define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3) +#define ERTS_PTS_FLG_BUSY (((erts_aint32_t) 1) << 4) + typedef struct ErtsPortTask_ ErtsPortTask; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 07b1ecdbb4..6cf267b01e 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -770,14 +770,6 @@ proclist_destroy(ErtsProcList *plp) proclist_free(plp); } -static ERTS_INLINE int -proclist_same(ErtsProcList *plp, Process *p) -{ - return (plp->pid == p->common.id - && (plp->started_interval - == p->common.u.alive.started_interval)); -} - ErtsProcList * erts_proclist_create(Process *p) { @@ -790,12 +782,6 @@ erts_proclist_destroy(ErtsProcList *plp) proclist_destroy(plp); } -int -erts_proclist_same(ErtsProcList *plp, Process *p) -{ - return proclist_same(plp, p); -} - void * erts_psd_set_init(Process *p, ErtsProcLocks plocks, int ix, void *data) { @@ -4980,8 +4966,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) else if (on) { /* ------ BLOCK ------ */ if (schdlr_sspnd.msb.procs) { plp = proclist_create(p); - plp->next = schdlr_sspnd.msb.procs; - schdlr_sspnd.msb.procs = plp; + erts_proclist_store_last(&schdlr_sspnd.msb.procs, plp); p->flags |= F_HAVE_BLCKD_MSCHED; ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.active) == 1); ASSERT(p->scheduler_data->no == 1); @@ -5066,8 +5051,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) ~ERTS_SCHDLR_SSPND_CHNG_WAITER); } plp = proclist_create(p); - plp->next = schdlr_sspnd.msb.procs; - schdlr_sspnd.msb.procs = plp; + erts_proclist_store_last(&schdlr_sspnd.msb.procs, plp); ASSERT(p->scheduler_data); } } @@ -5078,20 +5062,16 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) } else { /* ------ UNBLOCK ------ */ if (p->flags & F_HAVE_BLCKD_MSCHED) { - ErtsProcList **plpp = &schdlr_sspnd.msb.procs; - plp = schdlr_sspnd.msb.procs; + ErtsProcList *plp = erts_proclist_peek_first(schdlr_sspnd.msb.procs); while (plp) { - if (!proclist_same(plp, p)){ - plpp = &plp->next; - plp = plp->next; - } - else { - *plpp = plp->next; - proclist_destroy(plp); + ErtsProcList *tmp_plp = plp; + plp = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp); + if (erts_proclist_same(tmp_plp, p)) { + erts_proclist_remove(&schdlr_sspnd.msb.procs, tmp_plp); + proclist_destroy(tmp_plp); if (!all) break; - plp = *plpp; } } } @@ -5160,23 +5140,25 @@ erts_multi_scheduling_blockers(Process *p) Eterm res = NIL; erts_smp_mtx_lock(&schdlr_sspnd.mtx); - if (schdlr_sspnd.msb.procs) { + if (!erts_proclist_is_empty(schdlr_sspnd.msb.procs)) { Eterm *hp, *hp_end; ErtsProcList *plp1, *plp2; - Uint max_size; - ASSERT(schdlr_sspnd.msb.procs); - for (max_size = 0, plp1 = schdlr_sspnd.msb.procs; + Uint max_size = 0; + + for (plp1 = erts_proclist_peek_first(schdlr_sspnd.msb.procs); plp1; - plp1 = plp1->next) { + plp1 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp1)) { max_size += 2; } ASSERT(max_size); hp = HAlloc(p, max_size); hp_end = hp + max_size; - for (plp1 = schdlr_sspnd.msb.procs; plp1; plp1 = plp1->next) { - for (plp2 = schdlr_sspnd.msb.procs; + for (plp1 = erts_proclist_peek_first(schdlr_sspnd.msb.procs); + plp1; + plp1 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp1)) { + for (plp2 = erts_proclist_peek_first(schdlr_sspnd.msb.procs); plp2->pid != plp1->pid; - plp2 = plp2->next); + plp2 = erts_proclist_peek_next(schdlr_sspnd.msb.procs, plp2)); if (plp2 == plp1) { res = CONS(hp, plp1->pid, res); hp += 2; @@ -6081,23 +6063,25 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks, void erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port) { -#ifdef DEBUG - int res; -#endif + int suspend; + ASSERT(c_p == erts_get_current_process()); ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p)); if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + if (busy_port) + suspend = erts_save_suspend_process_on_port(busy_port, c_p); + else + suspend = 1; + + if (suspend) { #ifdef DEBUG - res = + int res = #endif - suspend_process(c_p, c_p); - - ASSERT(res); - - if (busy_port) - erts_wake_process_later(busy_port, c_p); + suspend_process(c_p, c_p); + ASSERT(res); + } if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); @@ -6116,16 +6100,19 @@ erts_resume(Process* process, ErtsProcLocks process_locks) } int -erts_resume_processes(ErtsProcList *plp) +erts_resume_processes(ErtsProcList *list) { + /* 'list' is expected to have been fetched (i.e. not a ring anymore) */ int nresumed = 0; + ErtsProcList *plp = list; + while (plp) { Process *proc; ErtsProcList *fplp; ASSERT(is_internal_pid(plp->pid)); proc = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCK_STATUS); if (proc) { - if (proclist_same(plp, proc)) { + if (erts_proclist_same(plp, proc)) { resume_process(proc); nresumed++; } @@ -6354,9 +6341,8 @@ Process *schedule(Process *p, int calls) #ifdef ERTS_SMP { ErtsProcList *pnd_xtrs = rq->procs.pending_exiters; - rq->procs.pending_exiters = NULL; - - if (pnd_xtrs) { + if (erts_proclist_fetch(&pnd_xtrs, NULL)) { + rq->procs.pending_exiters = NULL; erts_smp_runq_unlock(rq); handle_pending_exiters(pnd_xtrs); erts_smp_runq_lock(rq); @@ -7700,12 +7686,14 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks) static void handle_pending_exiters(ErtsProcList *pnd_xtrs) { + /* 'list' is expected to have been fetched (i.e. not a ring anymore) */ ErtsProcList *plp = pnd_xtrs; - ErtsProcList *free_plp; + while (plp) { + ErtsProcList *free_plp; Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL); if (p) { - if (proclist_same(plp, p)) { + if (erts_proclist_same(plp, p)) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); if (!(state & ERTS_PSFLG_RUNNING)) { ASSERT(state & ERTS_PSFLG_PENDING_EXIT); @@ -7734,8 +7722,7 @@ save_pending_exiter(Process *p) erts_smp_runq_lock(rq); - plp->next = rq->procs.pending_exiters; - rq->procs.pending_exiters = plp; + erts_proclist_store_last(&rq->procs.pending_exiters, plp); erts_smp_runq_unlock(rq); wake_scheduler(rq, 1); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 1feb5b96db..d613321421 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -290,6 +290,7 @@ struct ErtsProcList_ { Eterm pid; Uint64 started_interval; ErtsProcList* next; + ErtsProcList* prev; }; typedef struct ErtsMiscOpList_ ErtsMiscOpList; @@ -1135,7 +1136,172 @@ Uint64 erts_step_proc_interval(void); ErtsProcList *erts_proclist_create(Process *); void erts_proclist_destroy(ErtsProcList *); -int erts_proclist_same(ErtsProcList *, Process *); + +ERTS_GLB_INLINE int erts_proclist_same(ErtsProcList *, Process *); +ERTS_GLB_INLINE void erts_proclist_store_first(ErtsProcList **, ErtsProcList *); +ERTS_GLB_INLINE void erts_proclist_store_last(ErtsProcList **, ErtsProcList *); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_first(ErtsProcList *); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_last(ErtsProcList *); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_next(ErtsProcList *, ErtsProcList *); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_prev(ErtsProcList *, ErtsProcList *); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_first(ErtsProcList **); +ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_last(ErtsProcList **); +ERTS_GLB_INLINE int erts_proclist_fetch(ErtsProcList **, ErtsProcList **); +ERTS_GLB_INLINE void erts_proclist_remove(ErtsProcList **, ErtsProcList *); +ERTS_GLB_INLINE int erts_proclist_is_empty(ErtsProcList *); +ERTS_GLB_INLINE int erts_proclist_is_first(ErtsProcList *, ErtsProcList *); +ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *, ErtsProcList *); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE int +erts_proclist_same(ErtsProcList *plp, Process *p) +{ + return (plp->pid == p->common.id + && (plp->started_interval + == p->common.u.alive.started_interval)); +} + +ERTS_GLB_INLINE void erts_proclist_store_first(ErtsProcList **list, + ErtsProcList *element) +{ + if (!*list) + element->next = element->prev = element; + else { + element->prev = (*list)->prev; + element->next = *list; + element->prev->next = element; + element->next->prev = element; + } + *list = element; +} + +ERTS_GLB_INLINE void erts_proclist_store_last(ErtsProcList **list, + ErtsProcList *element) +{ + if (!*list) { + element->next = element->prev = element; + *list = element; + } + else { + element->prev = (*list)->prev; + element->next = *list; + element->prev->next = element; + element->next->prev = element; + } +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_first(ErtsProcList *list) +{ + return list; +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_last(ErtsProcList *list) +{ + if (!list) + return NULL; + else + return list->prev; +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_next(ErtsProcList *list, + ErtsProcList *element) +{ + ErtsProcList *next; + ASSERT(list && element); + next = element->next; + return list == next ? NULL : next; +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_peek_prev(ErtsProcList *list, + ErtsProcList *element) +{ + ErtsProcList *prev; + ASSERT(list && element); + prev = element->prev; + return list == element ? NULL : prev; +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_first(ErtsProcList **list) +{ + if (!*list) + return NULL; + else { + ErtsProcList *res = *list; + if (res == *list) + *list = NULL; + else + *list = res->next; + res->next->prev = res->prev; + res->prev->next = res->next; + return res; + } +} + +ERTS_GLB_INLINE ErtsProcList *erts_proclist_fetch_last(ErtsProcList **list) +{ + if (!*list) + return NULL; + else { + ErtsProcList *res = (*list)->prev; + if (res == *list) + *list = NULL; + res->next->prev = res->prev; + res->prev->next = res->next; + return res; + } +} + +ERTS_GLB_INLINE int erts_proclist_fetch(ErtsProcList **list_first, + ErtsProcList **list_last) +{ + if (!*list_first) { + if (list_last) + *list_last = NULL; + return 0; + } + else { + if (list_last) + *list_last = (*list_first)->prev; + (*list_first)->prev->next = NULL; + (*list_first)->prev = NULL; + return !0; + } +} + +ERTS_GLB_INLINE void erts_proclist_remove(ErtsProcList **list, + ErtsProcList *element) +{ + ASSERT(list && *list); + if (*list == element) { + *list = element->next; + if (*list == element) + *list = NULL; + } + element->next->prev = element->prev; + element->prev->next = element->next; +} + +ERTS_GLB_INLINE int erts_proclist_is_empty(ErtsProcList *list) +{ + return list == NULL; +} + +ERTS_GLB_INLINE int erts_proclist_is_first(ErtsProcList *list, + ErtsProcList *element) +{ + ASSERT(list && element); + return list == element; +} + +ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *list, + ErtsProcList *element) +{ + ASSERT(list && element); + return list->prev == element; +} + +#endif void erts_schedule_thr_prgr_later_op(void (*)(void *), void *, diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index bce4619645..6713481a04 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -895,7 +895,7 @@ typedef struct { int erts_add_driver_entry(ErlDrvEntry *drv, DE_Handle *handle, int driver_list_locked); void erts_destroy_driver(erts_driver_t *drv); -void erts_wake_process_later(Port*, Process*); +int erts_save_suspend_process_on_port(Port*, Process*); Port *erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *, int *); int erts_is_port_ioq_empty(Port *); void erts_terminate_port(Port *); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9d38e5539e..96a89a3b4e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -476,23 +476,19 @@ static void stopq(Port* prt) } } -void -erts_wake_process_later(Port *prt, Process *process) +int +erts_save_suspend_process_on_port(Port *prt, Process *process) { - ErtsProcList** p; - ErtsProcList* new_p; - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) - return; - - for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) - /* Empty loop body */; - - new_p = erts_proclist_create(process); - new_p->next = NULL; - *p = new_p; + int saved; + erts_aint32_t flags; + erts_port_task_sched_lock(&prt->sched); + flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + saved = ((flags & (ERTS_PTS_FLG_BUSY + | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY); + if (saved) + erts_proclist_store_last(&prt->suspended, erts_proclist_create(process)); + erts_port_task_sched_unlock(&prt->sched); + return saved; } /* @@ -2094,9 +2090,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) #endif state = erts_atomic32_read_nob(&p->state); - if ((state & (ERTS_PORT_SFLGS_DEAD - | ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_IMMORTAL)) + if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) || ((reason == am_normal) && ((from != p->connected) && (from != p->common.id)))) { return; @@ -2468,6 +2462,8 @@ void set_busy_port(ErlDrvPort port_num, int on) { Port *prt; + erts_aint32_t flags; + #ifdef USE_VM_PROBES DTRACE_CHARBUF(port_str, 16); #endif @@ -2479,8 +2475,10 @@ set_busy_port(ErlDrvPort port_num, int on) return; if (on) { - erts_atomic32_read_bor_relb(&prt->state, - ERTS_PORT_SFLG_PORT_BUSY); + flags = erts_smp_atomic32_read_bor_nob(&prt->sched.flags, + ERTS_PTS_FLG_BUSY); + if (flags & ERTS_PTS_FLG_BUSY) + return; /* Already busy */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), @@ -2489,10 +2487,12 @@ set_busy_port(ErlDrvPort port_num, int on) } #endif } else { - ErtsProcList* plp = prt->suspended; - erts_atomic32_read_band_relb(&prt->state, - ~ERTS_PORT_SFLG_PORT_BUSY); - prt->suspended = NULL; + ErtsProcList *plp; + + flags = erts_smp_atomic32_read_band_nob(&prt->sched.flags, + ~ERTS_PTS_FLG_BUSY); + if (!(flags & ERTS_PTS_FLG_BUSY)) + return; /* Already non-busy */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { @@ -2517,7 +2517,13 @@ set_busy_port(ErlDrvPort port_num, int on) * the first process. */ - if (plp) { + erts_port_task_sched_lock(&prt->sched); + plp = prt->suspended; + prt->suspended = NULL; + erts_port_task_sched_unlock(&prt->sched); + + if (erts_proclist_fetch(&plp, NULL)) { + #ifdef USE_VM_PROBES /* * Hrm, for blocked dist ports, plp always seems to be NULL. @@ -2540,8 +2546,10 @@ set_busy_port(ErlDrvPort port_num, int on) } } #endif + /* First proc should be resumed last */ if (plp->next) { + plp->next->prev = NULL; erts_resume_processes(plp->next); plp->next = NULL; } |