aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c41
1 files changed, 22 insertions, 19 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 2446b3c074..7322239a73 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -375,7 +375,6 @@ static Port *create_port(char *name,
prt->control_flags = 0;
prt->bytes_in = 0;
prt->bytes_out = 0;
- prt->dist_entry = NULL;
ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
@@ -3287,7 +3286,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (trace_send)
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
erts_proc_unlock(rp, rp_locks);
@@ -3459,7 +3457,6 @@ deliver_vec_message(Port* prt, /* Port */
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3615,12 +3612,12 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
+ ASSERT(erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) == NULL);
+
psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
if (psd)
erts_free(ERTS_ALC_T_PRTSD, psd);
- ASSERT(prt->dist_entry == NULL);
-
kill_port(prt);
/*
@@ -3761,10 +3758,12 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
DRV_MONITOR_UNLOCK_PDL(prt);
}
- if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) {
- erts_do_net_exits(prt->dist_entry, modified_reason);
- erts_deref_dist_entry(prt->dist_entry);
- prt->dist_entry = NULL;
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
+ erts_do_net_exits(dep, modified_reason);
+ erts_deref_dist_entry(dep);
+ erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL);
erts_atomic32_read_band_relb(&prt->state,
~ERTS_PORT_SFLG_DISTRIBUTION);
}
@@ -4074,7 +4073,7 @@ done:
* to the caller.
*/
int
-erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size)
+erl_drv_port_control(Eterm port_num, unsigned int cmd, char* buff, ErlDrvSizeT size)
{
ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data();
@@ -5052,7 +5051,7 @@ set_busy_port(ErlDrvPort dprt, int on)
DTRACE1(port_not_busy, port_str);
}
#endif
- if (prt->dist_entry) {
+ if (erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) != NULL) {
/*
* Processes suspended on distribution ports are
* normally queued on the dist entry.
@@ -5382,7 +5381,6 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, pid, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_proc_unlock(rp, rp_locks);
@@ -5988,8 +5986,6 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
from = prt->common.id;
}
- /* send message */
- ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
erts_queue_message(rp, rp_locks, factory.message, mess, from);
}
else if (res == -2) {
@@ -6174,9 +6170,12 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
- erts_atomic64_inc_nob(&prt->dist_entry->in);
+ DistEntry* dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) (bin->orig_bytes+offs), len);
}
@@ -6215,15 +6214,19 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
- erts_atomic64_inc_nob(&prt->dist_entry->in);
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
if (len == 0)
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
NULL, 0,
(byte*) hbuf, hlen);
else
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) buf, len);
}