aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c165
1 files changed, 82 insertions, 83 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index a1da1addf9..8bbe6450eb 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -772,77 +772,25 @@ void init_dist(void)
#define ErtsDistOutputBuf2Binary(OB) OB->bin
-#ifdef DEBUG
-
-struct obuf_list;
-struct obuf_list {
- erts_refc_t refc;
- struct obuf_list *next;
- struct obuf_list *prev;
-};
-#define obuf_list_size sizeof(struct obuf_list)
-static struct obuf_list *erts_obuf_list = NULL;
-static erts_mtx_t erts_obuf_list_mtx;
-
-static void
-insert_obuf(struct obuf_list *obuf, erts_aint_t initial) {
- erts_mtx_lock(&erts_obuf_list_mtx);
- obuf->next = erts_obuf_list;
- obuf->prev = NULL;
- erts_refc_init(&obuf->refc, initial);
- if (erts_obuf_list)
- erts_obuf_list->prev = obuf;
- erts_obuf_list = obuf;
- erts_mtx_unlock(&erts_obuf_list_mtx);
-}
-
-static void
-remove_obuf(struct obuf_list *obuf) {
- if (erts_refc_dectest(&obuf->refc, 0) == 0) {
- erts_mtx_lock(&erts_obuf_list_mtx);
- if (obuf->prev) {
- obuf->prev->next = obuf->next;
- } else {
- erts_obuf_list = obuf->next;
- }
- if (obuf->next) obuf->next->prev = obuf->prev;
- erts_mtx_unlock(&erts_obuf_list_mtx);
- }
-}
-
-void check_obuf(void);
-void check_obuf(void) {
- erts_mtx_lock(&erts_obuf_list_mtx);
- ERTS_ASSERT(erts_obuf_list == NULL);
- erts_mtx_unlock(&erts_obuf_list_mtx);
-}
-#else
-#define insert_obuf(...)
-#define remove_obuf(...)
-#define obuf_list_size 0
-#endif
-
static ERTS_INLINE ErtsDistOutputBuf *
alloc_dist_obuf(Uint size, Uint headers)
{
int i;
ErtsDistOutputBuf *obuf;
Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers) +
- sizeof(byte)*size + obuf_list_size;
+ sizeof(byte)*size;
Binary *bin = erts_bin_drv_alloc(obuf_size);
- size += obuf_list_size;
obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[size];
erts_refc_add(&bin->intern.refc, headers - 1, 1);
for (i = 0; i < headers; i++) {
obuf[i].bin = bin;
- obuf[i].extp = (byte *)&bin->orig_bytes[0] + obuf_list_size;
+ obuf[i].extp = (byte *)&bin->orig_bytes[0];
#ifdef DEBUG
obuf[i].dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
obuf[i].alloc_endp = obuf->extp + size;
ASSERT(bin == ErtsDistOutputBuf2Binary(obuf));
#endif
}
- insert_obuf((struct obuf_list*)&bin->orig_bytes[0], headers);
return obuf;
}
@@ -851,10 +799,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf)
{
Binary *bin = ErtsDistOutputBuf2Binary(obuf);
ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN);
- remove_obuf((struct obuf_list*)&bin->orig_bytes[0]);
- if (erts_refc_dectest(&bin->intern.refc, 0) == 0) {
- erts_bin_free(bin);
- }
+ erts_bin_release(bin);
}
static ERTS_INLINE Sint
@@ -1968,6 +1913,7 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
reason = tuple[5];
+ edep = NULL;
}
if (is_not_ref(ref))
@@ -2014,12 +1960,14 @@ int erts_net_message(Port *prt,
}
token = NIL;
reason = tuple[4];
+ edep = NULL;
} else if (type == DOP_EXIT_TT){
if (tuple_arity != 5) {
goto invalid_message;
}
token = tuple[4];
reason = tuple[5];
+ edep = NULL;
} else if (type == DOP_PAYLOAD_EXIT) {
if (tuple_arity != 3) {
goto invalid_message;
@@ -2067,12 +2015,14 @@ int erts_net_message(Port *prt,
}
reason = tuple[4];
token = NIL;
+ edep = NULL;
} else if (type == DOP_EXIT2_TT) {
if (tuple_arity != 5) {
goto invalid_message;
}
token = tuple[4];
reason = tuple[5];
+ edep = NULL;
} else if (type == DOP_PAYLOAD_EXIT2) {
if (tuple_arity != 3) {
goto invalid_message;
@@ -2430,8 +2380,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
case ERTS_DSIG_SEND_PHASE_FIN: {
ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
- ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]+obuf_list_size) <= ctx->obuf->extp - ctx->max_finalize_prepend);
- ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes+obuf_list_size) + ctx->data_size + ctx->dhdr_ext_size);
+ ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]) <= ctx->obuf->extp - ctx->max_finalize_prepend);
+ ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes) + ctx->data_size + ctx->dhdr_ext_size);
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
@@ -2935,7 +2885,9 @@ erts_dist_command(Port *prt, int initial_reds)
ob = oq.first;
ASSERT(ob);
do {
+ obufsize += size_obuf(ob);
reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds);
+ obufsize -= size_obuf(ob);
if (reds < 0)
break;
last_finalized = ob;
@@ -2974,7 +2926,9 @@ erts_dist_command(Port *prt, int initial_reds)
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
Uint size;
+ obufsize += size_obuf(oq.first);
reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds);
+ obufsize -= size_obuf(oq.first);
if (reds < 0) {
preempt = 1;
break;
@@ -3407,14 +3361,13 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
- Sint reds = initial_reds;
+ Sint reds = initial_reds, obufsize = 0;
ErtsDistOutputBuf *obuf;
- Eterm *hp;
+ Eterm *hp, res;
ProcBin *pb;
erts_aint_t qsize;
Uint32 conn_id, get_size;
- Eterm res;
- Uint hsz, bin_sz;
+ Uint hsz = 0, bin_sz;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
@@ -3466,7 +3419,9 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
}
obuf = dep->tmp_out_queue.first;
+ obufsize += size_obuf(obuf);
reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds);
+ obufsize -= size_obuf(obuf);
if (reds < 0) {
erts_de_runlock(dep);
ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1],
@@ -3482,8 +3437,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
erts_de_runlock(dep);
- bin_sz = obuf->ext_endp - obuf->extp;
- hsz = PROC_BIN_SIZE;
+ bin_sz = obuf->ext_endp - obuf->extp + obuf->hdr_endp - obuf->hdrp;
get_size = dep->opts & ERTS_DIST_CTRL_OPT_GET_SIZE;
if (get_size) {
@@ -3492,18 +3446,50 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
hsz += BIG_UINT_HEAP_SIZE;
}
- hp = HAlloc(BIF_P, hsz);
- pb = (ProcBin *) (char *) hp;
- pb->thing_word = HEADER_PROC_BIN;
- pb->size = bin_sz;
- pb->next = MSO(BIF_P).first;
- MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
- pb->val = ErtsDistOutputBuf2Binary(obuf);
- pb->bytes = (byte*) obuf->extp;
- pb->flags = 0;
- hp += PROC_BIN_SIZE;
+ if (!obuf->hdrp) {
+ hp = HAlloc(BIF_P, PROC_BIN_SIZE + hsz);
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->ext_endp - obuf->extp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ pb->bytes = (byte*) obuf->extp;
+ pb->flags = 0;
+ res = make_binary(pb);
+ } else {
+ hp = HAlloc(BIF_P, PROC_BIN_SIZE * 2 + 4 + hsz);
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->ext_endp - obuf->extp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ pb->bytes = (byte*) obuf->extp;
+ pb->flags = 0;
+ hp += PROC_BIN_SIZE;
+
+ res = CONS(hp, make_binary(pb), NIL);
+ hp += 2;
+
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->hdr_endp - obuf->hdrp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ erts_refc_inc(&pb->val->intern.refc, 1);
+ pb->bytes = (byte*) obuf->hdrp;
+ pb->flags = 0;
+ hp += PROC_BIN_SIZE;
+ res = CONS(hp, make_binary(pb), res);
+ hp += 2;
+ }
+
+ obufsize += size_obuf(obuf);
+
+ qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize);
- qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf));
ASSERT(qsize >= 0);
if (qsize < erts_dist_buf_busy_limit/2
@@ -3518,8 +3504,6 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
}
}
- res = make_binary(pb);
-
if (get_size) {
Eterm sz_term;
if (IS_USMALL(0, bin_sz))
@@ -4711,10 +4695,6 @@ init_nodes_monitors(void)
{
erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL,
ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
-#ifdef DEBUG
- erts_mtx_init(&erts_obuf_list_mtx, "sad", NIL,
- ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
-#endif
nodes_monitors = NULL;
no_nodes_monitors = 0;
}
@@ -5105,3 +5085,22 @@ erts_processes_monitoring_nodes(Process *c_p)
return ctxt.res;
}
+
+static void
+print_suspended_on_de(fmtfn_t to, void *to_arg, DistEntry *dep)
+{
+ for (; dep; dep = dep->next) {
+ ErtsProcList *curr = erts_proclist_peek_first(dep->suspended);
+ while (curr) {
+ if (!is_internal_pid(curr->u.pid))
+ print_process_info(to, to_arg, curr->u.p, 0);
+ curr = erts_proclist_peek_next(dep->suspended, curr);
+ }
+ }
+}
+
+void
+erts_dist_print_procs_suspended_on_de(fmtfn_t to, void *to_arg) {
+ print_suspended_on_de(to, to_arg, erts_hidden_dist_entries);
+ print_suspended_on_de(to, to_arg, erts_visible_dist_entries);
+}