aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c74
1 files changed, 43 insertions, 31 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index b14ca77a04..1d307ae15e 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1432,10 +1432,11 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp)
static ERTS_INLINE void
queue_port_sched_op_reply(Process *rp,
- ErtsProcLocks *rp_locksp,
+ ErtsProcLocks rp_locks,
ErtsHeapFactory* factory,
Uint32 *ref_num,
- Eterm msg)
+ Eterm msg,
+ Port* prt)
{
Eterm* hp = erts_produce_heap(factory, ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE, 0);
Eterm ref;
@@ -1448,11 +1449,12 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->message, msg);
+ erts_queue_message(rp, rp_locks, factory->message, msg,
+ prt ? prt->common.id : am_undefined);
}
static void
-port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
+port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg, Port* prt)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
@@ -1478,10 +1480,11 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
factory.off_heap));
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
ref_num,
- msg_copy);
+ msg_copy,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -1651,7 +1654,7 @@ port_badsig(Port *prt, erts_aint32_t state, int op,
state,
sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
return ERTS_PORT_REDS_BADSIG;
} /* port_badsig */
/* bad_port_signal() will
@@ -1820,7 +1823,7 @@ port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_outputv(sigdp->u.outputv.evp,
sigdp->u.outputv.cbinp);
@@ -1928,7 +1931,7 @@ port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, reply);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, reply, prt);
cleanup_scheduled_output(sigdp->u.output.bufp);
@@ -2636,7 +2639,7 @@ port_sig_exit(Port *prt,
if (sigdp->u.exit.bp)
free_message_buffer(sigdp->u.exit.bp);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_EXIT;
}
@@ -2829,7 +2832,7 @@ port_sig_connect(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
msg = am_true;
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, msg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, msg, prt);
return ERTS_PORT_REDS_CONNECT;
}
@@ -2912,7 +2915,7 @@ port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
if (op == ERTS_PROC2PORT_SIG_EXEC)
port_unlink(prt, sigdp->u.unlink.from);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_UNLINK;
}
@@ -3007,7 +3010,7 @@ port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigd
port_link_failure(sigdp->u.link.port, sigdp->u.link.to);
}
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_LINK;
}
@@ -3064,7 +3067,8 @@ init_ack_send_reply(Port *port, Eterm resp)
}
port_sched_op_reply(port->async_open_port->to,
port->async_open_port->ref,
- resp);
+ resp,
+ port);
erts_free(ERTS_ALC_T_PRTSD, port->async_open_port);
port->async_open_port = NULL;
@@ -3461,7 +3465,7 @@ deliver_result(Port *prt, Eterm sender, Eterm pid, Eterm res)
sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, mp, tuple);
+ erts_queue_message(rp, rp_locks, mp, tuple, sender);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3562,7 +3566,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
trace_port_send(prt, to, tuple, 1);
ERL_MESSAGE_TOKEN(mp) = am_undefined;
- erts_queue_message(rp, &rp_locks, mp, tuple);
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3734,7 +3738,7 @@ deliver_vec_message(Port* prt, /* Port */
trace_port_send(prt, to, tuple, 1);
ERL_MESSAGE_TOKEN(mp) = am_undefined;
- erts_queue_message(rp, &rp_locks, mp, tuple);
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -4388,10 +4392,11 @@ port_sig_control(Port *prt,
&factory.hp,
factory.off_heap);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4402,7 +4407,7 @@ port_sig_control(Port *prt,
/* failure */
if (sigdp->caller != ERTS_INVALID_PID)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -4739,10 +4744,11 @@ port_sig_call(Port *prt,
msg = TUPLE2(hp, am_ok, msg);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- msg);
+ msg,
+ prt);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -4754,7 +4760,7 @@ port_sig_call(Port *prt,
}
}
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg, prt);
done:
@@ -4969,7 +4975,7 @@ port_sig_info(Port *prt,
{
ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY);
if (op != ERTS_PROC2PORT_SIG_EXEC)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined);
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined, prt);
else {
Eterm *hp, *hp_start;
Uint hsz;
@@ -4995,10 +5001,11 @@ port_sig_info(Port *prt,
mp->data.heap_frag = bp;
erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
- &rp_locks,
+ rp_locks,
&factory,
sigdp->ref,
- value);
+ value,
+ prt);
}
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -5115,7 +5122,7 @@ reply_io_bytes(void *vreq)
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, mp, msg);
+ erts_queue_message(rp, rp_locks, mp, msg, am_system);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -5613,7 +5620,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
trace_port_send(prt, pid, tuple, 1);
ERL_MESSAGE_TOKEN(mp) = am_undefined;
- erts_queue_message(rp, &rp_locks, mp, tuple);
+ erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -6217,15 +6224,20 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
done:
if (res > 0) {
+ Eterm from = am_undefined;
mess = ESTACK_POP(stack); /* get resulting value */
erts_factory_trim_and_close(&factory, &mess, 1);
- if (prt && IS_TRACED_FL(prt, F_TRACE_SEND))
- trace_port_send(prt, to, mess, 1);
+ if (prt) {
+ if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
+ trace_port_send(prt, to, mess, 1);
+ }
+ from = prt->common.id;
+ }
/* send message */
ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
- erts_queue_message(rp, &rp_locks, factory.message, mess);
+ erts_queue_message(rp, rp_locks, factory.message, mess, from);
}
else if (res == -2) {
/* this clause only happens when we were requested to