diff options
author | Sverker Eriksson <[email protected]> | 2014-07-01 22:04:55 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2014-08-29 16:11:24 +0200 |
commit | 88b094b6439737b61c117cd6873beea4518757a8 (patch) | |
tree | 6854431d5c2d8e5969a11a9a89fea9ed8b22a266 /erts/emulator/beam/dist.h | |
parent | 1af8998028f77b4ca01c52972a5983b072ef02d1 (diff) | |
download | otp-88b094b6439737b61c117cd6873beea4518757a8.tar.gz otp-88b094b6439737b61c117cd6873beea4518757a8.tar.bz2 otp-88b094b6439737b61c117cd6873beea4518757a8.zip |
erts: Implement yielding for distributed send of large messages
Use same mechanism as term_to_binary to yield
while encoding large messages for distributed send.
Diffstat (limited to 'erts/emulator/beam/dist.h')
-rw-r--r-- | erts/emulator/beam/dist.h | 97 |
1 files changed, 95 insertions, 2 deletions
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index f32b999198..2a2ba0c83f 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -22,6 +22,7 @@ #include "erl_process.h" #include "erl_node_tables.h" +#include "zlib.h" #define DFLAG_PUBLISHED 0x01 #define DFLAG_ATOM_CACHE 0x02 @@ -264,17 +265,105 @@ erts_destroy_dist_link(ErtsDistLinkData *dldp) #endif + + +/* Define for testing */ +/* #define EXTREME_TTB_TRAPPING 1 */ + +#ifndef EXTREME_TTB_TRAPPING +#define TERM_TO_BINARY_LOOP_FACTOR 32 +#else +#define TERM_TO_BINARY_LOOP_FACTOR 1 +#endif + +typedef enum { TTBSize, TTBEncode, TTBCompress } TTBState; +typedef struct TTBSizeContext_ { + Uint flags; + int level; + Uint result; + Eterm obj; + ErtsEStack estack; +} TTBSizeContext; + +typedef struct TTBEncodeContext_ { + Uint flags; + int level; + byte* ep; + Eterm obj; + ErtsWStack wstack; + Binary *result_bin; +} TTBEncodeContext; + +typedef struct { + Uint real_size; + Uint dest_len; + byte *dbytes; + Binary *result_bin; + Binary *destination_bin; + z_stream stream; +} TTBCompressContext; + +typedef struct { + int alive; + TTBState state; + union { + TTBSizeContext sc; + TTBEncodeContext ec; + TTBCompressContext cc; + } s; +} TTBContext; + +enum erts_dsig_send_phase { + ERTS_DSIG_SEND_PHASE_INIT, + ERTS_DSIG_SEND_PHASE_MSG_SIZE, + ERTS_DSIG_SEND_PHASE_ALLOC, + ERTS_DSIG_SEND_PHASE_MSG_ENCODE, + ERTS_DSIG_SEND_PHASE_FIN +}; + +struct erts_dsig_send_context { + enum erts_dsig_send_phase phase; + Sint reds; + + Eterm ctl; + Eterm msg; + int force_busy; + Uint32 pass_through_size; + Uint data_size, dhdr_ext_size; + ErtsAtomCacheMap *acmp; + ErtsDistOutputBuf *obuf; + Uint32 flags; + Process *c_p; + union { + TTBSizeContext sc; + TTBEncodeContext ec; + }u; +}; + +typedef struct { + int suspend; + + Eterm ctl_heap[6]; + ErtsDSigData dsd; + DistEntry* dep_to_deref; + struct erts_dsig_send_context dss; + + Eterm return_term; +}ErtsSendContext; + + /* * erts_dsig_send_* return values. */ #define ERTS_DSIG_SEND_OK 0 #define ERTS_DSIG_SEND_YIELD 1 +#define ERTS_DSIG_SEND_CONTINUE 2 extern int erts_dsig_send_link(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_msg(ErtsDSigData *, Eterm, Eterm); +extern int erts_dsig_send_msg(Eterm, Eterm, ErtsSendContext*); extern int erts_dsig_send_exit_tt(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); extern int erts_dsig_send_unlink(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_reg_msg(ErtsDSigData *, Eterm, Eterm); +extern int erts_dsig_send_reg_msg(Eterm, Eterm, ErtsSendContext*); extern int erts_dsig_send_group_leader(ErtsDSigData *, Eterm, Eterm); extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm); extern int erts_dsig_send_exit2(ErtsDSigData *, Eterm, Eterm, Eterm); @@ -282,6 +371,10 @@ extern int erts_dsig_send_demonitor(ErtsDSigData *, Eterm, Eterm, Eterm, int); extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm); extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); +extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx); +extern void erts_dsend_context_dtor(Binary*); +extern Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx); + extern int erts_dist_command(Port *prt, int reds); extern void erts_dist_port_not_busy(Port *prt); extern void erts_kill_dist_connection(DistEntry *dep, Uint32); |