aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_process.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--erts/emulator/beam/erl_process.c346
1 files changed, 180 insertions, 166 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index bad9da90ea..e490ffea5a 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -148,6 +148,7 @@ extern BeamInstr beam_apply[];
extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
+int erts_default_spo_flags = 0;
int erts_eager_check_io = 1;
int erts_sched_compact_load;
int erts_sched_balance_util = 0;
@@ -351,7 +352,8 @@ struct erts_system_profile_flags_t erts_system_profile_flags;
typedef enum {
ERTS_PSTT_GC, /* Garbage Collect */
- ERTS_PSTT_CPC /* Check Process Code */
+ ERTS_PSTT_CPC, /* Check Process Code */
+ ERTS_PSTT_COHMQ /* Change off heap message queue */
} ErtsProcSysTaskType;
#define ERTS_MAX_PROC_SYS_TASK_ARGS 2
@@ -982,7 +984,7 @@ reply_sched_wall_time(void *vswtrp)
Eterm **hpp;
Uint sz, *szp;
ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErtsMessage *mp = NULL;
ASSERT(esdp);
#ifdef ERTS_DIRTY_SCHEDULERS
@@ -1038,12 +1040,12 @@ reply_sched_wall_time(void *vswtrp)
if (hpp)
break;
- hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp);
szp = NULL;
hpp = &hp;
}
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (swtrp->req_sched == esdp->no)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -6294,22 +6296,99 @@ erts_schedule_process(Process *p, erts_aint32_t state, ErtsProcLocks locks)
schedule_process(p, state, locks);
}
-static void
-schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
+static int
+schedule_process_sys_task(Process *p, erts_aint32_t prio, ErtsProcSysTask *st)
{
- /*
- * Expects status lock to be locked when called, and
- * returns with status lock unlocked...
- */
- erts_aint32_t a = state, n, enq_prio = -1;
+ int res;
+ int locked;
+ ErtsProcSysTaskQs *stqs, *free_stqs;
+ erts_aint32_t state, a, n, enq_prio;
int enqueue; /* < 0 -> use proxy */
- unsigned int prof_runnable_procs = erts_system_profile_flags.runnable_procs;
+ unsigned int prof_runnable_procs;
+
+ res = 1; /* prepare for success */
+ st->next = st->prev = st; /* Prep for empty prio queue */
+ state = erts_smp_atomic32_read_nob(&p->state);
+ prof_runnable_procs = erts_system_profile_flags.runnable_procs;
+ locked = 0;
+ free_stqs = NULL;
+ if (state & ERTS_PSFLG_ACTIVE_SYS)
+ stqs = NULL;
+ else {
+ alloc_qs:
+ stqs = proc_sys_task_queues_alloc();
+ stqs->qmask = 1 << prio;
+ stqs->ncount = 0;
+ stqs->q[PRIORITY_MAX] = NULL;
+ stqs->q[PRIORITY_HIGH] = NULL;
+ stqs->q[PRIORITY_NORMAL] = NULL;
+ stqs->q[PRIORITY_LOW] = NULL;
+ stqs->q[prio] = st;
+ }
+
+ if (!locked) {
+ locked = 1;
+ erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
+
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if (state & ERTS_PSFLG_EXITING) {
+ free_stqs = stqs;
+ res = 0;
+ goto cleanup;
+ }
+ }
+
+ if (!p->sys_task_qs) {
+ if (stqs)
+ p->sys_task_qs = stqs;
+ else
+ goto alloc_qs;
+ }
+ else {
+ free_stqs = stqs;
+ stqs = p->sys_task_qs;
+ if (!stqs->q[prio]) {
+ stqs->q[prio] = st;
+ stqs->qmask |= 1 << prio;
+ }
+ else {
+ st->next = stqs->q[prio];
+ st->prev = stqs->q[prio]->prev;
+ st->next->prev = st;
+ st->prev->next = st;
+ ASSERT(stqs->qmask & (1 << prio));
+ }
+ }
+
+ if (ERTS_PSFLGS_GET_ACT_PRIO(state) > prio) {
+ erts_aint32_t n, a, e;
+ /* Need to elevate actual prio */
+
+ a = state;
+ do {
+ if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) {
+ n = a;
+ break;
+ }
+ n = e = a;
+ n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
+ n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
+ a = erts_smp_atomic32_cmpxchg_nob(&p->state, n, e);
+ } while (a != e);
+ state = n;
+ }
+
+
+ a = state;
+ enq_prio = -1;
/* Status lock prevents out of order "runnable proc" trace msgs */
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- if (!prof_runnable_procs)
+ if (!prof_runnable_procs) {
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ locked = 0;
+ }
ASSERT(!(state & ERTS_PSFLG_PROXY));
@@ -6317,8 +6396,10 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
erts_aint32_t e;
n = e = a;
- if (a & ERTS_PSFLG_FREE)
+ if (a & ERTS_PSFLG_FREE) {
+ res = 0;
goto cleanup; /* We don't want to schedule free processes... */
+ }
enqueue = ERTS_ENQUEUE_NOT;
n |= ERTS_PSFLG_ACTIVE_SYS;
@@ -6342,29 +6423,24 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
}
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
- prof_runnable_procs = 0;
+ locked = 0;
}
- if (enqueue != ERTS_ENQUEUE_NOT) {
- Process *sched_p;
- if (enqueue > 0)
- sched_p = p;
- else {
- sched_p = make_proxy_proc(proxy, p, enq_prio);
- proxy = NULL;
- }
- add2runq(sched_p, n, enq_prio);
- }
+ if (enqueue != ERTS_ENQUEUE_NOT)
+ add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio),
+ n, enq_prio);
cleanup:
- if (prof_runnable_procs)
+ if (locked)
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
- if (proxy)
- free_proxy_proc(proxy);
+ if (free_stqs)
+ proc_sys_task_queues_free(free_stqs);
ERTS_SMP_LC_ASSERT(!(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)));
+
+ return res;
}
static ERTS_INLINE int
@@ -9696,13 +9772,13 @@ Process *schedule(Process *p, int calls)
}
}
- if (!(state & ERTS_PSFLG_EXITING)
- && ((FLAGS(p) & F_FORCE_GC)
- || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) {
- reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
- if (reds <= 0) {
- p->fcalls = reds;
- goto sched_out_proc;
+ if (ERTS_IS_GC_DESIRED(p)) {
+ if (!(state & ERTS_PSFLG_EXITING) && !(p->flags & F_DISABLE_GC)) {
+ reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
+ if (reds <= 0) {
+ p->fcalls = reds;
+ goto sched_out_proc;
+ }
}
}
@@ -9742,7 +9818,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
if (rp) {
ErtsProcLocks rp_locks;
ErlOffHeap *ohp;
- ErlHeapFragment* bp;
+ ErtsMessage *mp;
Eterm *hp, msg, req_id, result;
Uint st_result_sz, hsz;
#ifdef DEBUG
@@ -9754,11 +9830,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
st_result_sz = is_immed(st_result) ? 0 : size_object(st_result);
hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
#ifdef DEBUG
hp_start = hp;
@@ -9783,7 +9855,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
ASSERT(hp_start + hsz == hp);
#endif
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (c_p == rp)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -10005,6 +10077,10 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
st = NULL;
}
break;
+ case ERTS_PSTT_COHMQ:
+ reds += erts_complete_off_heap_message_queue_change(c_p);
+ st_res = am_true;
+ break;
default:
ERTS_INTERNAL_ERROR("Invalid process sys task type");
st_res = am_false;
@@ -10047,6 +10123,9 @@ cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds)
case ERTS_PSTT_CPC:
st_res = am_false;
break;
+ case ERTS_PSTT_COHMQ:
+ st_res = am_false;
+ break;
default:
ERTS_INTERNAL_ERROR("Invalid process sys task type");
st_res = am_false;
@@ -10065,10 +10144,8 @@ BIF_RETTYPE
erts_internal_request_system_task_3(BIF_ALIST_3)
{
Process *rp = erts_proc_lookup(BIF_ARG_1);
- ErtsProcSysTaskQs *stqs, *free_stqs = NULL;
ErtsProcSysTask *st = NULL;
- erts_aint32_t prio, rp_state;
- int rp_locked;
+ erts_aint32_t prio;
Eterm noproc_res, req_type;
if (!rp && !is_internal_pid(BIF_ARG_1)) {
@@ -10125,7 +10202,6 @@ erts_internal_request_system_task_3(BIF_ALIST_3)
}
st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK,
ERTS_PROC_SYS_TASK_SIZE(tot_sz));
- st->next = st->prev = st; /* Prep for empty prio queue */
ERTS_INIT_OFF_HEAP(&st->off_heap);
hp = &st->heap[0];
@@ -10169,95 +10245,11 @@ erts_internal_request_system_task_3(BIF_ALIST_3)
goto badarg;
}
- rp_state = erts_smp_atomic32_read_nob(&rp->state);
-
- rp_locked = 0;
-
- free_stqs = NULL;
- if (rp_state & ERTS_PSFLG_ACTIVE_SYS)
- stqs = NULL;
- else {
- alloc_qs:
- stqs = proc_sys_task_queues_alloc();
- stqs->qmask = 1 << prio;
- stqs->ncount = 0;
- stqs->q[PRIORITY_MAX] = NULL;
- stqs->q[PRIORITY_HIGH] = NULL;
- stqs->q[PRIORITY_NORMAL] = NULL;
- stqs->q[PRIORITY_LOW] = NULL;
- stqs->q[prio] = st;
- }
-
- if (!rp_locked) {
- rp_locked = 1;
- erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS);
-
- rp_state = erts_smp_atomic32_read_nob(&rp->state);
- if (rp_state & ERTS_PSFLG_EXITING) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = NULL;
- free_stqs = stqs;
- goto noproc;
- }
+ if (!schedule_process_sys_task(rp, prio, st)) {
+ noproc:
+ notify_sys_task_executed(BIF_P, st, noproc_res);
}
- if (!rp->sys_task_qs) {
- if (stqs)
- rp->sys_task_qs = stqs;
- else
- goto alloc_qs;
- }
- else {
- if (stqs)
- free_stqs = stqs;
- stqs = rp->sys_task_qs;
- if (!stqs->q[prio]) {
- stqs->q[prio] = st;
- stqs->qmask |= 1 << prio;
- }
- else {
- st->next = stqs->q[prio];
- st->prev = stqs->q[prio]->prev;
- st->next->prev = st;
- st->prev->next = st;
- ASSERT(stqs->qmask & (1 << prio));
- }
- }
-
- if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) {
- erts_aint32_t n, a, e;
- /* Need to elevate actual prio */
-
- a = rp_state;
- do {
- if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) {
- n = a;
- break;
- }
- n = e = a;
- n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
- n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
- a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e);
- } while (a != e);
- rp_state = n;
- }
-
- /*
- * schedule_process_sys_task() unlocks status
- * lock on process.
- */
- schedule_process_sys_task(rp, rp_state, NULL);
-
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
-
- BIF_RET(am_ok);
-
-noproc:
-
- notify_sys_task_executed(BIF_P, st, noproc_res);
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
BIF_RET(am_ok);
badarg:
@@ -10266,11 +10258,35 @@ badarg:
erts_cleanup_offheap(&st->off_heap);
erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
}
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
BIF_ERROR(BIF_P, BADARG);
}
+void
+erts_schedule_complete_off_heap_message_queue_change(Eterm pid)
+{
+ Process *rp = erts_proc_lookup(pid);
+ if (rp) {
+ ErtsProcSysTask *st;
+ erts_aint32_t state;
+ int i;
+
+ st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK,
+ ERTS_PROC_SYS_TASK_SIZE(0));
+ st->type = ERTS_PSTT_COHMQ;
+ st->requester = NIL;
+ st->reply_tag = NIL;
+ st->req_id = NIL;
+ st->req_id_sz = 0;
+ for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++)
+ st->arg[i] = NIL;
+ ERTS_INIT_OFF_HEAP(&st->off_heap);
+ state = erts_smp_atomic32_read_nob(&rp->state);
+
+ if (!schedule_process_sys_task(rp, ERTS_PSFLGS_GET_USR_PRIO(state), st))
+ erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
+ }
+}
+
static void
save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio)
{
@@ -10716,6 +10732,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Eterm args, /* Arguments for function (must be well-formed list). */
ErlSpawnOpts* so) /* Options for spawn. */
{
+ Uint flags = erts_default_process_flags;
ErtsRunQueue *rq = NULL;
Process *p;
Sint arity; /* Number of arguments. */
@@ -10753,6 +10770,11 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET)
| ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET));
+ if (so->flags & SPO_OFF_HEAP_MSGQ) {
+ state |= ERTS_PSFLG_OFF_HEAP_MSGQ;
+ flags |= F_OFF_HEAP_MSGQ;
+ }
+
if (!rq)
rq = erts_get_runq_proc(parent);
@@ -10775,7 +10797,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
BM_SWAP_TIMER(size,system);
heap_need = arg_size;
- p->flags = erts_default_process_flags;
+ p->flags = flags;
p->static_flags = 0;
if (so->flags & SPO_SYSTEM_PROC)
@@ -10824,6 +10846,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->stop = p->hend = p->heap + sz;
p->htop = p->heap;
p->heap_sz = sz;
+ p->abandoned_heap = NULL;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
p->catches = 0;
p->bin_vheap_sz = p->min_vheap_size;
@@ -10894,6 +10918,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->accessor_bif_timers = NULL;
#endif
p->mbuf = NULL;
+ p->msg_frag = NULL;
p->mbuf_sz = 0;
p->psd = NULL;
p->dictionary = NULL;
@@ -11029,6 +11054,8 @@ void erts_init_empty_process(Process *p)
p->stop = NULL;
p->hend = NULL;
p->heap = NULL;
+ p->abandoned_heap = NULL;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
p->gen_gcs = 0;
p->max_gen_gcs = 0;
p->min_heap_size = 0;
@@ -11061,6 +11088,7 @@ void erts_init_empty_process(Process *p)
p->old_htop = NULL;
p->old_heap = NULL;
p->mbuf = NULL;
+ p->msg_frag = NULL;
p->mbuf_sz = 0;
p->psd = NULL;
ERTS_P_MONITORS(p) = NULL;
@@ -11149,6 +11177,8 @@ erts_debug_verify_clean_empty_process(Process* p)
ASSERT(p->htop == NULL);
ASSERT(p->stop == NULL);
ASSERT(p->hend == NULL);
+ ASSERT(p->abandoned_heap == NULL);
+ ASSERT(p->live_hf_end == ERTS_INVALID_HFRAG_PTR);
ASSERT(p->heap == NULL);
ASSERT(p->common.id == ERTS_INVALID_PID);
ASSERT(ERTS_TRACER_PROC(p) == NIL);
@@ -11226,8 +11256,6 @@ erts_cleanup_empty_process(Process* p)
static void
delete_process(Process* p)
{
- ErlMessage* mp;
-
VERBOSE(DEBUG_PROCESSES, ("Removing process: %T\n",p->common.id));
/* Cleanup psd */
@@ -11283,24 +11311,8 @@ delete_process(Process* p)
erts_erase_dicts(p);
/* free all pending messages */
- mp = p->msg.first;
- while(mp != NULL) {
- ErlMessage* next_mp = mp->next;
- if (mp->data.attached) {
- if (is_value(mp->m[0]))
- free_message_buffer(mp->data.heap_frag);
- else {
- if (is_not_nil(mp->m[1])) {
- ErlHeapFragment *heap_frag;
- heap_frag = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
- erts_free_dist_ext_copy(mp->data.dist_ext);
- }
- }
- free_message(mp);
- mp = next_mp;
- }
+ erts_cleanup_messages(p->msg.first);
+ p->msg.first = NULL;
ASSERT(!p->nodes_monitors);
ASSERT(!p->suspend_monitors);
@@ -11488,6 +11500,9 @@ static ERTS_INLINE void
send_exit_message(Process *to, ErtsProcLocks *to_locksp,
Eterm exit_term, Uint term_size, Eterm token)
{
+ ErtsMessage *mp;
+ ErlOffHeap *ohp;
+
if (token == NIL
#ifdef USE_VM_PROBES
|| token == am_have_dt_utag
@@ -11495,14 +11510,12 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
) {
Eterm* hp;
Eterm mess;
- ErlHeapFragment* bp;
- ErlOffHeap *ohp;
- hp = erts_alloc_message_heap(term_size, &bp, &ohp, to, to_locksp);
+ mp = erts_alloc_message_heap(to, to_locksp,
+ term_size, &hp, &ohp);
mess = copy_struct(exit_term, term_size, &hp, ohp);
- erts_queue_message(to, to_locksp, bp, mess, NIL);
+ erts_queue_message(to, to_locksp, mp, mess, NIL);
} else {
- ErlHeapFragment* bp;
Eterm* hp;
Eterm mess;
Eterm temp_token;
@@ -11510,13 +11523,14 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
ASSERT(is_tuple(token));
sz_token = size_object(token);
- bp = new_message_buffer(term_size+sz_token);
- hp = bp->mem;
- mess = copy_struct(exit_term, term_size, &hp, &bp->off_heap);
+
+ mp = erts_alloc_message_heap(to, to_locksp,
+ term_size+sz_token, &hp, &ohp);
+ mess = copy_struct(exit_term, term_size, &hp, ohp);
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, mess, SEQ_TRACE_SEND, to->common.id, NULL);
- temp_token = copy_struct(token, sz_token, &hp, &bp->off_heap);
- erts_queue_message(to, to_locksp, bp, mess, temp_token);
+ temp_token = copy_struct(token, sz_token, &hp, ohp);
+ erts_queue_message(to, to_locksp, mp, mess, temp_token);
}
}