aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c130
1 files changed, 92 insertions, 38 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 8bbe6450eb..ff19ef018e 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -55,7 +55,6 @@
*/
#if 0
#define ERTS_DIST_MSG_DBG
-FILE *dbg_file;
#endif
#if 0
/* Enable this to print the dist debug messages to a file instead */
@@ -67,6 +66,7 @@ FILE *dbg_file;
#endif
#if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG)
+FILE *dbg_file;
static void bw(byte *buf, ErlDrvSizeT sz)
{
bin_write(ERTS_PRINT_FILE, dbg_file, buf, sz);
@@ -743,7 +743,7 @@ void init_dist(void)
sprintf(buff, ERTS_DIST_MSG_DBG_FILE, getpid());
dbg_file = fopen(buff,"w+");
}
-#elif defined (ERTS_DIST_MSG_DBG)
+#elif defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG)
dbg_file = stderr;
#endif
@@ -775,19 +775,25 @@ void init_dist(void)
static ERTS_INLINE ErtsDistOutputBuf *
alloc_dist_obuf(Uint size, Uint headers)
{
- int i;
+ Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers);
ErtsDistOutputBuf *obuf;
- Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers) +
- sizeof(byte)*size;
- Binary *bin = erts_bin_drv_alloc(obuf_size);
- obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[size];
+ Binary *bin;
+ byte *extp;
+ int i;
+
+ bin = erts_bin_drv_alloc(obuf_size + size);
erts_refc_add(&bin->intern.refc, headers - 1, 1);
+
+ obuf = (ErtsDistOutputBuf *)&bin->orig_bytes[0];
+ extp = (byte *)&bin->orig_bytes[obuf_size];
+
for (i = 0; i < headers; i++) {
obuf[i].bin = bin;
- obuf[i].extp = (byte *)&bin->orig_bytes[0];
+ obuf[i].extp = extp;
#ifdef DEBUG
obuf[i].dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
- obuf[i].alloc_endp = obuf->extp + size;
+ obuf[i].ext_startp = extp;
+ obuf[i].alloc_endp = &extp[size];
ASSERT(bin == ErtsDistOutputBuf2Binary(obuf));
#endif
}
@@ -1360,7 +1366,7 @@ erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root,
limit);
if (res > 0) {
if (ysp != &ys)
- erts_free(ERTS_ALC_T_ML_YIELD_STATE, ysp);
+ erts_free(ERTS_ALC_T_SEQ_YIELD_STATE, ysp);
*vyspp = NULL;
}
else {
@@ -1838,7 +1844,7 @@ int erts_net_message(Port *prt,
if (locks)
erts_proc_unlock(rp, locks);
- } else if (ede_hfrag) {
+ } else if (ede_hfrag != NULL) {
erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
free_message_buffer(ede_hfrag);
}
@@ -1880,16 +1886,18 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
rp = erts_proc_lookup(to);
+
if (rp) {
ErtsProcLocks locks = 0;
erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, am_Empty);
if (locks)
erts_proc_unlock(rp, locks);
- } else if (ede_hfrag) {
+ } else if (ede_hfrag != NULL) {
erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
free_message_buffer(ede_hfrag);
}
+
break;
}
@@ -1930,15 +1938,19 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- if (!erts_proc_lookup(watcher)) break; /* Process not alive */
-
- if (reason == THE_NON_VALUE) {
+ if (!erts_proc_lookup(watcher)) {
+ if (ede_hfrag != NULL) {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
+ }
+ break; /* Process not alive */
+ }
#ifdef ERTS_DIST_MSG_DBG
+ if (reason == THE_NON_VALUE) {
dist_msg_dbg(edep, "MSG", buf, orig_len);
-#endif
-
}
+#endif
erts_proc_sig_send_dist_monitor_down(
dep, ref, watched, watcher, edep, ede_hfrag, reason);
@@ -1987,13 +1999,19 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- if (!erts_proc_lookup(to)) break; /* Process not alive */
+ if (!erts_proc_lookup(to)) {
+ if (ede_hfrag != NULL) {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
+ }
+ break; /* Process not alive */
+ }
- if (reason == THE_NON_VALUE) {
#ifdef ERTS_DIST_MSG_DBG
+ if (reason == THE_NON_VALUE) {
dist_msg_dbg(edep, "MSG", buf, orig_len);
-#endif
}
+#endif
erts_proc_sig_send_dist_link_exit(dep,
from, to, edep, ede_hfrag,
@@ -2042,13 +2060,19 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- if (!erts_proc_lookup(to)) break; /* Process not alive */
+ if (!erts_proc_lookup(to)) {
+ if (ede_hfrag != NULL) {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
+ }
+ break; /* Process not alive */
+ }
- if (reason == THE_NON_VALUE) {
#ifdef ERTS_DIST_MSG_DBG
+ if (reason == THE_NON_VALUE) {
dist_msg_dbg(edep, "MSG", buf, orig_len);
-#endif
}
+#endif
erts_proc_sig_send_dist_exit(dep, from, to, edep, ede_hfrag, reason, token);
break;
@@ -2301,8 +2325,18 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
ctx->data_size = ctx->max_finalize_prepend;
erts_reset_atom_cache_map(ctx->acmp);
- erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);
+ switch (erts_encode_dist_ext_size(ctx->ctl, ctx->flags,
+ ctx->acmp, &ctx->data_size)) {
+ case ERTS_EXT_SZ_OK:
+ break;
+ case ERTS_EXT_SZ_SYSTEM_LIMIT:
+ retval = ERTS_DSIG_SEND_TOO_LRG;
+ goto done;
+ case ERTS_EXT_SZ_YIELD:
+ ERTS_INTERNAL_ERROR("Unexpected yield result");
+ break;
+ }
if (is_non_value(ctx->msg)) {
ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
break;
@@ -2312,17 +2346,31 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
ctx->u.sc.level = 0;
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
- case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- if (!ctx->no_trap) {
- if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
- } else {
- erts_encode_dist_ext_size(ctx->msg, ctx->flags, ctx->acmp, &ctx->data_size);
+ case ERTS_DSIG_SEND_PHASE_MSG_SIZE: {
+ ErtsExtSzRes sz_res;
+ sz_res = (!ctx->no_trap
+ ? erts_encode_dist_ext_size_ctx(ctx->msg,
+ ctx,
+ &ctx->data_size)
+ : erts_encode_dist_ext_size(ctx->msg,
+ ctx->flags,
+ ctx->acmp,
+ &ctx->data_size));
+ switch (sz_res) {
+ case ERTS_EXT_SZ_OK:
+ break;
+ case ERTS_EXT_SZ_SYSTEM_LIMIT:
+ retval = ERTS_DSIG_SEND_TOO_LRG;
+ goto done;
+ case ERTS_EXT_SZ_YIELD:
+ if (ctx->no_trap)
+ ERTS_INTERNAL_ERROR("Unexpected yield result");
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
}
ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ }
case ERTS_DSIG_SEND_PHASE_ALLOC:
erts_finalize_atom_cache_map(ctx->acmp, ctx->flags);
@@ -2341,7 +2389,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
(ctx->fragments-1) * ERTS_DIST_FRAGMENT_HEADER_SIZE,
ctx->fragments);
ctx->obuf->ext_start = &ctx->obuf->extp[0];
- ctx->obuf->ext_endp = &ctx->obuf->extp[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;
+ ctx->obuf->ext_endp = &ctx->obuf->extp[0] + ctx->max_finalize_prepend
+ + ctx->dhdr_ext_size;
/* Encode internal version of dist header */
ctx->obuf->extp = erts_encode_ext_dist_header_setup(
@@ -2380,8 +2429,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
case ERTS_DSIG_SEND_PHASE_FIN: {
ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
- ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]) <= ctx->obuf->extp - ctx->max_finalize_prepend);
- ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes) + ctx->data_size + ctx->dhdr_ext_size);
+ ASSERT(ctx->obuf->ext_startp <= ctx->obuf->extp - ctx->max_finalize_prepend);
+ ASSERT(ctx->obuf->ext_endp <= (byte*)ctx->obuf->ext_startp + ctx->data_size + ctx->dhdr_ext_size);
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
@@ -3424,6 +3473,8 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
obufsize -= size_obuf(obuf);
if (reds < 0) {
erts_de_runlock(dep);
+ if (obufsize)
+ erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1],
BIF_P, BIF_ARG_1);
}
@@ -3457,6 +3508,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
pb->bytes = (byte*) obuf->extp;
pb->flags = 0;
res = make_binary(pb);
+ hp += PROC_BIN_SIZE;
} else {
hp = HAlloc(BIF_P, PROC_BIN_SIZE * 2 + 4 + hsz);
pb = (ProcBin *) (char *) hp;
@@ -3748,10 +3800,12 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */
BIF_RETTYPE setnode_2(BIF_ALIST_2)
{
Process *net_kernel;
- Uint32 creation;
+ Uint creation;
/* valid creation ? */
- if(!term_to_Uint32(BIF_ARG_2, &creation))
+ if(!term_to_Uint(BIF_ARG_2, &creation))
+ goto error;
+ if(creation > 3)
goto error;
/* valid node name ? */
@@ -3795,7 +3849,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
erts_thr_progress_block();
inc_no_nodes();
- erts_set_this_node(BIF_ARG_1, creation);
+ erts_set_this_node(BIF_ARG_1, (Uint32) creation);
erts_is_alive = 1;
send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
erts_thr_progress_unblock();