diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 165 |
1 files changed, 82 insertions, 83 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index a1da1addf9..8bbe6450eb 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -772,77 +772,25 @@ void init_dist(void) #define ErtsDistOutputBuf2Binary(OB) OB->bin -#ifdef DEBUG - -struct obuf_list; -struct obuf_list { - erts_refc_t refc; - struct obuf_list *next; - struct obuf_list *prev; -}; -#define obuf_list_size sizeof(struct obuf_list) -static struct obuf_list *erts_obuf_list = NULL; -static erts_mtx_t erts_obuf_list_mtx; - -static void -insert_obuf(struct obuf_list *obuf, erts_aint_t initial) { - erts_mtx_lock(&erts_obuf_list_mtx); - obuf->next = erts_obuf_list; - obuf->prev = NULL; - erts_refc_init(&obuf->refc, initial); - if (erts_obuf_list) - erts_obuf_list->prev = obuf; - erts_obuf_list = obuf; - erts_mtx_unlock(&erts_obuf_list_mtx); -} - -static void -remove_obuf(struct obuf_list *obuf) { - if (erts_refc_dectest(&obuf->refc, 0) == 0) { - erts_mtx_lock(&erts_obuf_list_mtx); - if (obuf->prev) { - obuf->prev->next = obuf->next; - } else { - erts_obuf_list = obuf->next; - } - if (obuf->next) obuf->next->prev = obuf->prev; - erts_mtx_unlock(&erts_obuf_list_mtx); - } -} - -void check_obuf(void); -void check_obuf(void) { - erts_mtx_lock(&erts_obuf_list_mtx); - ERTS_ASSERT(erts_obuf_list == NULL); - erts_mtx_unlock(&erts_obuf_list_mtx); -} -#else -#define insert_obuf(...) -#define remove_obuf(...) -#define obuf_list_size 0 -#endif - static ERTS_INLINE ErtsDistOutputBuf * alloc_dist_obuf(Uint size, Uint headers) { int i; ErtsDistOutputBuf *obuf; Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers) + - sizeof(byte)*size + obuf_list_size; + sizeof(byte)*size; Binary *bin = erts_bin_drv_alloc(obuf_size); - size += obuf_list_size; obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[size]; erts_refc_add(&bin->intern.refc, headers - 1, 1); for (i = 0; i < headers; i++) { obuf[i].bin = bin; - obuf[i].extp = (byte *)&bin->orig_bytes[0] + obuf_list_size; + obuf[i].extp = (byte *)&bin->orig_bytes[0]; #ifdef DEBUG obuf[i].dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; obuf[i].alloc_endp = obuf->extp + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif } - insert_obuf((struct obuf_list*)&bin->orig_bytes[0], headers); return obuf; } @@ -851,10 +799,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf) { Binary *bin = ErtsDistOutputBuf2Binary(obuf); ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN); - remove_obuf((struct obuf_list*)&bin->orig_bytes[0]); - if (erts_refc_dectest(&bin->intern.refc, 0) == 0) { - erts_bin_free(bin); - } + erts_bin_release(bin); } static ERTS_INLINE Sint @@ -1968,6 +1913,7 @@ int erts_net_message(Port *prt, goto invalid_message; } reason = tuple[5]; + edep = NULL; } if (is_not_ref(ref)) @@ -2014,12 +1960,14 @@ int erts_net_message(Port *prt, } token = NIL; reason = tuple[4]; + edep = NULL; } else if (type == DOP_EXIT_TT){ if (tuple_arity != 5) { goto invalid_message; } token = tuple[4]; reason = tuple[5]; + edep = NULL; } else if (type == DOP_PAYLOAD_EXIT) { if (tuple_arity != 3) { goto invalid_message; @@ -2067,12 +2015,14 @@ int erts_net_message(Port *prt, } reason = tuple[4]; token = NIL; + edep = NULL; } else if (type == DOP_EXIT2_TT) { if (tuple_arity != 5) { goto invalid_message; } token = tuple[4]; reason = tuple[5]; + edep = NULL; } else if (type == DOP_PAYLOAD_EXIT2) { if (tuple_arity != 3) { goto invalid_message; @@ -2430,8 +2380,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx) case ERTS_DSIG_SEND_PHASE_FIN: { ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]+obuf_list_size) <= ctx->obuf->extp - ctx->max_finalize_prepend); - ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes+obuf_list_size) + ctx->data_size + ctx->dhdr_ext_size); + ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]) <= ctx->obuf->extp - ctx->max_finalize_prepend); + ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes) + ctx->data_size + ctx->dhdr_ext_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -2935,7 +2885,9 @@ erts_dist_command(Port *prt, int initial_reds) ob = oq.first; ASSERT(ob); do { + obufsize += size_obuf(ob); reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + obufsize -= size_obuf(ob); if (reds < 0) break; last_finalized = ob; @@ -2974,7 +2926,9 @@ erts_dist_command(Port *prt, int initial_reds) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; + obufsize += size_obuf(oq.first); reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + obufsize -= size_obuf(oq.first); if (reds < 0) { preempt = 1; break; @@ -3407,14 +3361,13 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); - Sint reds = initial_reds; + Sint reds = initial_reds, obufsize = 0; ErtsDistOutputBuf *obuf; - Eterm *hp; + Eterm *hp, res; ProcBin *pb; erts_aint_t qsize; Uint32 conn_id, get_size; - Eterm res; - Uint hsz, bin_sz; + Uint hsz = 0, bin_sz; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); @@ -3466,7 +3419,9 @@ dist_ctrl_get_data_1(BIF_ALIST_1) } obuf = dep->tmp_out_queue.first; + obufsize += size_obuf(obuf); reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + obufsize -= size_obuf(obuf); if (reds < 0) { erts_de_runlock(dep); ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], @@ -3482,8 +3437,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); - bin_sz = obuf->ext_endp - obuf->extp; - hsz = PROC_BIN_SIZE; + bin_sz = obuf->ext_endp - obuf->extp + obuf->hdr_endp - obuf->hdrp; get_size = dep->opts & ERTS_DIST_CTRL_OPT_GET_SIZE; if (get_size) { @@ -3492,18 +3446,50 @@ dist_ctrl_get_data_1(BIF_ALIST_1) hsz += BIG_UINT_HEAP_SIZE; } - hp = HAlloc(BIF_P, hsz); - pb = (ProcBin *) (char *) hp; - pb->thing_word = HEADER_PROC_BIN; - pb->size = bin_sz; - pb->next = MSO(BIF_P).first; - MSO(BIF_P).first = (struct erl_off_heap_header*) pb; - pb->val = ErtsDistOutputBuf2Binary(obuf); - pb->bytes = (byte*) obuf->extp; - pb->flags = 0; - hp += PROC_BIN_SIZE; + if (!obuf->hdrp) { + hp = HAlloc(BIF_P, PROC_BIN_SIZE + hsz); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + res = make_binary(pb); + } else { + hp = HAlloc(BIF_P, PROC_BIN_SIZE * 2 + 4 + hsz); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + hp += PROC_BIN_SIZE; + + res = CONS(hp, make_binary(pb), NIL); + hp += 2; + + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->hdr_endp - obuf->hdrp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + erts_refc_inc(&pb->val->intern.refc, 1); + pb->bytes = (byte*) obuf->hdrp; + pb->flags = 0; + hp += PROC_BIN_SIZE; + res = CONS(hp, make_binary(pb), res); + hp += 2; + } + + obufsize += size_obuf(obuf); + + qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize); - qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); ASSERT(qsize >= 0); if (qsize < erts_dist_buf_busy_limit/2 @@ -3518,8 +3504,6 @@ dist_ctrl_get_data_1(BIF_ALIST_1) } } - res = make_binary(pb); - if (get_size) { Eterm sz_term; if (IS_USMALL(0, bin_sz)) @@ -4711,10 +4695,6 @@ init_nodes_monitors(void) { erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); -#ifdef DEBUG - erts_mtx_init(&erts_obuf_list_mtx, "sad", NIL, - ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); -#endif nodes_monitors = NULL; no_nodes_monitors = 0; } @@ -5105,3 +5085,22 @@ erts_processes_monitoring_nodes(Process *c_p) return ctxt.res; } + +static void +print_suspended_on_de(fmtfn_t to, void *to_arg, DistEntry *dep) +{ + for (; dep; dep = dep->next) { + ErtsProcList *curr = erts_proclist_peek_first(dep->suspended); + while (curr) { + if (!is_internal_pid(curr->u.pid)) + print_process_info(to, to_arg, curr->u.p, 0); + curr = erts_proclist_peek_next(dep->suspended, curr); + } + } +} + +void +erts_dist_print_procs_suspended_on_de(fmtfn_t to, void *to_arg) { + print_suspended_on_de(to, to_arg, erts_hidden_dist_entries); + print_suspended_on_de(to, to_arg, erts_visible_dist_entries); +} |