diff options
Diffstat (limited to 'erts/emulator/drivers/common')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 165 |
1 files changed, 117 insertions, 48 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 3210ffa92a..236b8710fb 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -678,6 +678,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */ #define INET_OPT_RAW 34 /* Raw socket options */ #define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */ +#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */ +#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -788,6 +790,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */ #define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */ +#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */ +#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */ #define INET_INFINITY 0xffffffff /* infinity value */ @@ -879,7 +883,7 @@ typedef struct subs_list_ { #define NO_PROCESS 0 #define NO_SUBSCRIBERS(SLP) ((SLP)->subscriber == NO_PROCESS) -static void send_to_subscribers(ErlDrvPort, subs_list *, int, +static void send_to_subscribers(ErlDrvTermData, subs_list *, int, ErlDrvTermData [], int); static void free_subscribers(subs_list*); static int save_subscriber(subs_list *, ErlDrvTermData); @@ -1873,8 +1877,7 @@ static int deq_async(inet_descriptor* desc, int* ap, ErlDrvTermData* cp, int* rp ** {inet_async, Port, Ref, ok} */ static int -send_async_ok(ErlDrvPort port, ErlDrvTermData Port, int Ref, - ErlDrvTermData recipient) +send_async_ok(ErlDrvTermData Port, int Ref,ErlDrvTermData recipient) { ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PORT_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT]; @@ -1888,14 +1891,14 @@ send_async_ok(ErlDrvPort port, ErlDrvTermData Port, int Ref, ASSERT(i == sizeof(spec)/sizeof(*spec)); - return driver_send_term(port, recipient, spec, i); + return erl_drv_send_term(Port, recipient, spec, i); } /* send message: ** {inet_async, Port, Ref, {ok,Port2}} */ static int -send_async_ok_port(ErlDrvPort port, ErlDrvTermData Port, int Ref, +send_async_ok_port(ErlDrvTermData Port, int Ref, ErlDrvTermData recipient, ErlDrvTermData Port2) { ErlDrvTermData spec[2*LOAD_ATOM_CNT + 2*LOAD_PORT_CNT + @@ -1914,14 +1917,14 @@ send_async_ok_port(ErlDrvPort port, ErlDrvTermData Port, int Ref, ASSERT(i == sizeof(spec)/sizeof(*spec)); - return driver_send_term(port, recipient, spec, i); + return erl_drv_send_term(Port, recipient, spec, i); } /* send message: ** {inet_async, Port, Ref, {error,Reason}} */ static int -send_async_error(ErlDrvPort port, ErlDrvTermData Port, int Ref, +send_async_error(ErlDrvTermData Port, int Ref, ErlDrvTermData recipient, ErlDrvTermData Reason) { ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PORT_CNT + @@ -1939,7 +1942,7 @@ send_async_error(ErlDrvPort port, ErlDrvTermData Port, int Ref, i = LOAD_TUPLE(spec, i, 4); ASSERT(i == sizeof(spec)/sizeof(*spec)); DEBUGF(("send_async_error %ld %ld\r\n", recipient, Reason)); - return driver_send_term(port, recipient, spec, i); + return erl_drv_send_term(Port, recipient, spec, i); } @@ -1951,7 +1954,7 @@ static int async_ok(inet_descriptor* desc) if (deq_async(desc, &aid, &caller, &req) < 0) return -1; - return send_async_ok(desc->port, desc->dport, aid, caller); + return send_async_ok(desc->dport, aid, caller); } static int async_ok_port(inet_descriptor* desc, ErlDrvTermData Port2) @@ -1962,7 +1965,7 @@ static int async_ok_port(inet_descriptor* desc, ErlDrvTermData Port2) if (deq_async(desc, &aid, &caller, &req) < 0) return -1; - return send_async_ok_port(desc->port, desc->dport, aid, caller, Port2); + return send_async_ok_port(desc->dport, aid, caller, Port2); } static int async_error_am(inet_descriptor* desc, ErlDrvTermData reason) @@ -1973,8 +1976,7 @@ static int async_error_am(inet_descriptor* desc, ErlDrvTermData reason) if (deq_async(desc, &aid, &caller, &req) < 0) return -1; - return send_async_error(desc->port, desc->dport, aid, caller, - reason); + return send_async_error(desc->dport, aid, caller, reason); } /* dequeue all operations */ @@ -1985,8 +1987,7 @@ static int async_error_am_all(inet_descriptor* desc, ErlDrvTermData reason) ErlDrvTermData caller; while (deq_async(desc, &aid, &caller, &req) == 0) { - send_async_error(desc->port, desc->dport, aid, caller, - reason); + send_async_error(desc->dport, aid, caller, reason); } return 0; } @@ -2014,7 +2015,7 @@ static int inet_reply_ok(inet_descriptor* desc) ASSERT(i == sizeof(spec)/sizeof(*spec)); desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); + return erl_drv_send_term(desc->dport, caller, spec, i); } #ifdef HAVE_SCTP @@ -2033,7 +2034,7 @@ static int inet_reply_ok_port(inet_descriptor* desc, ErlDrvTermData dport) ASSERT(i == sizeof(spec)/sizeof(*spec)); desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); + return erl_drv_send_term(desc->dport, caller, spec, i); } #endif @@ -2056,7 +2057,7 @@ static int inet_reply_error_am(inet_descriptor* desc, ErlDrvTermData reason) desc->caller = 0; DEBUGF(("inet_reply_error_am %ld %ld\r\n", caller, reason)); - return driver_send_term(desc->port, caller, spec, i); + return erl_drv_send_term(desc->dport, caller, spec, i); } /* send: @@ -2165,12 +2166,12 @@ static int http_response_inetdrv(void *arg, int major, int minor, i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i<=27); - return driver_send_term(desc->inet.port, caller, spec, i); + return erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { i = LOAD_TUPLE(spec, i, 3); ASSERT(i<=27); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } } @@ -2262,12 +2263,12 @@ http_request_inetdrv(void* arg, const http_atom_t* meth, const char* meth_ptr, i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 43); - return driver_send_term(desc->inet.port, caller, spec, i); + return erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 43); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } } @@ -2316,12 +2317,12 @@ http_header_inetdrv(void* arg, const http_atom_t* name, const char* name_ptr, i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 26); - return driver_send_term(desc->inet.port, caller, spec, i); + return erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 26); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } } @@ -2347,7 +2348,7 @@ static int http_eoh_inetdrv(void* arg) i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 14); - return driver_send_term(desc->inet.port, caller, spec, i); + return erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { /* {http, S, http_eoh} */ @@ -2356,7 +2357,7 @@ static int http_eoh_inetdrv(void* arg) i = LOAD_ATOM(spec, i, am_http_eoh); i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 14); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } } @@ -2384,7 +2385,7 @@ static int http_error_inetdrv(void* arg, const char* buf, int len) i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 19); - return driver_send_term(desc->inet.port, caller, spec, i); + return erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { /* {http, S, {http_error,Line} */ @@ -2395,7 +2396,7 @@ static int http_error_inetdrv(void* arg, const char* buf, int len) i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 19); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } } @@ -2448,11 +2449,11 @@ int ssl_tls_inetdrv(void* arg, unsigned type, unsigned major, unsigned minor, i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 28); - ret = driver_send_term(desc->inet.port, caller, spec, i); + ret = erl_drv_send_term(desc->inet.dport, caller, spec, i); } else { ASSERT(i <= 28); - ret = driver_output_term(desc->inet.port, spec, i); + ret = erl_drv_output_term(desc->inet.dport, spec, i); } done: driver_free_binary(bin); @@ -2502,7 +2503,7 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len) i = LOAD_TUPLE(spec, i, 4); ASSERT(i == 15); desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); + return erl_drv_send_term(desc->dport, caller, spec, i); } else { /* INET_MODE_BINARY => [H1,H2,...HSz | Binary] */ @@ -2516,7 +2517,7 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len) i = LOAD_TUPLE(spec, i, 4); ASSERT(i <= 20); desc->caller = 0; - code = driver_send_term(desc->port, caller, spec, i); + code = erl_drv_send_term(desc->dport, caller, spec, i); return code; } } @@ -3109,7 +3110,7 @@ inet_async_binary_data ASSERT(i <= PACKET_ERL_DRV_TERM_DATA_LEN); desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); + return erl_drv_send_term(desc->dport, caller, spec, i); } /* @@ -3132,7 +3133,7 @@ static int tcp_message(inet_descriptor* desc, const char* buf, int len) i = LOAD_STRING(spec, i, buf, len); /* => [H1,H2,...Hn] */ i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 20); - return driver_output_term(desc->port, spec, i); + return erl_drv_output_term(desc->dport, spec, i); } else { /* INET_MODE_BINARY => [H1,H2,...HSz | Binary] */ @@ -3144,7 +3145,7 @@ static int tcp_message(inet_descriptor* desc, const char* buf, int len) i = LOAD_STRING_CONS(spec, i, buf, hsz); i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 20); - code = driver_output_term(desc->port, spec, i); + code = erl_drv_output_term(desc->dport, spec, i); return code; } } @@ -3179,7 +3180,7 @@ tcp_binary_message(inet_descriptor* desc, ErlDrvBinary* bin, int offs, int len) } i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 20); - return driver_output_term(desc->port, spec, i); + return erl_drv_output_term(desc->dport, spec, i); } /* @@ -3198,7 +3199,7 @@ static int tcp_closed_message(tcp_descriptor* desc) i = LOAD_PORT(spec, i, desc->inet.dport); i = LOAD_TUPLE(spec, i, 2); ASSERT(i <= 6); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } return 0; } @@ -3219,7 +3220,7 @@ static int tcp_error_message(tcp_descriptor* desc, int err) i = LOAD_ATOM(spec, i, am_err); i = LOAD_TUPLE(spec, i, 3); ASSERT(i <= 8); - return driver_output_term(desc->inet.port, spec, i); + return erl_drv_output_term(desc->inet.dport, spec, i); } /* @@ -3310,7 +3311,7 @@ static int packet_binary_message /* Close up the outer 5-tuple: */ i = LOAD_TUPLE(spec, i, 5); ASSERT(i <= PACKET_ERL_DRV_TERM_DATA_LEN); - return driver_output_term(desc->port, spec, i); + return erl_drv_output_term(desc->dport, spec, i); } /* @@ -3337,7 +3338,7 @@ static int packet_error_message(udp_descriptor* udesc, int err) i = LOAD_ATOM(spec, i, am_err); i = LOAD_TUPLE(spec, i, 3); ASSERT(i == sizeof(spec)/sizeof(*spec)); - return driver_output_term(desc->port, spec, i); + return erl_drv_output_term(desc->dport, spec, i); } @@ -5465,6 +5466,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + high = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + low = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { tcp_descriptor* tdesc = (tcp_descriptor*) desc; @@ -6365,6 +6388,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + ival = high > INT_MAX ? INT_MAX : (int) high; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + ival = low > INT_MAX ? INT_MAX : (int) low; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { *ptr++ = opt; @@ -7278,7 +7327,7 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, i = LOAD_TUPLE(spec, i, 3); /* Now, convert "spec" into the returnable term: */ - driver_send_term(desc->port, driver_caller(desc->port), spec, i); + erl_drv_send_term(desc->dport, driver_caller(desc->port), spec, i); FREE(spec); (*dest)[0] = INET_REP; @@ -7360,7 +7409,7 @@ send_empty_out_q_msgs(inet_descriptor* desc) ASSERT(msg_len == sizeof(msg)/sizeof(*msg)); - send_to_subscribers(desc->port, + send_to_subscribers(desc->dport, &desc->empty_out_q_subs, 1, msg, @@ -8007,6 +8056,7 @@ static int tcp_inet_init(void) static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) { + ErlDrvSizeT q_low, q_high; tcp_descriptor* desc; DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port)); @@ -8016,6 +8066,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) return ERL_DRV_ERROR_ERRNO; desc->high = INET_HIGH_WATERMARK; desc->low = INET_LOW_WATERMARK; + q_high = INET_HIGH_MSGQ_WATERMARK; + q_low = INET_LOW_MSGQ_WATERMARK; + if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX; + if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX; + erl_drv_busy_msgq_limits(port, &q_low, &q_high); desc->send_timeout = INET_INFINITY; desc->send_timeout_close = 0; desc->busy_on_send = 0; @@ -8039,6 +8100,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, ErlDrvTermData owner, int* err) { + ErlDrvSizeT q_low, q_high; ErlDrvPort port = desc->inet.port; tcp_descriptor* copy_desc; @@ -8076,6 +8138,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, FREE(copy_desc); return NULL; } + + /* Read busy msgq limits of parent */ + q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high); + /* Write same busy msgq limits to child */ + erl_drv_busy_msgq_limits(port, &q_low, &q_high); + copy_desc->inet.port = port; copy_desc->inet.dport = driver_mk_port(port); *err = 0; @@ -8108,7 +8177,7 @@ static void tcp_close_check(tcp_descriptor* desc) desc->inet.state = INET_STATE_LISTENING; while (deq_multi_op(desc,&id,&req,&caller,NULL,&monitor) == 0) { driver_demonitor_process(desc->inet.port, &monitor); - send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_closed); + send_async_error(desc->inet.dport, id, caller, am_closed); } clean_multi_timers(&(desc->mtd), desc->inet.port); } @@ -8532,7 +8601,7 @@ static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller) sock_select(INETP(desc),FD_ACCEPT,0); desc->inet.state = INET_STATE_LISTENING; /* restore state */ } - send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_timeout); + send_async_error(desc->inet.dport, id, caller, am_timeout); } @@ -9273,7 +9342,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) if (s == INVALID_SOCKET) { /* Not ERRNO_BLOCK, that's handled right away */ - ret = send_async_error(desc->inet.port, desc->inet.dport, + ret = send_async_error(desc->inet.dport, id, caller, error_atom(sock_errno())); goto done; } @@ -9283,7 +9352,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) if ((accept_desc = tcp_inet_copy(desc,s,caller,&err)) == NULL) { sock_close(s); - ret = send_async_error(desc->inet.port, desc->inet.dport, + ret = send_async_error(desc->inet.dport, id, caller, error_atom(err)); goto done; } @@ -9294,7 +9363,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) ERL_DRV_READ, 1); #endif accept_desc->inet.state = INET_STATE_CONNECTED; - ret = send_async_ok_port(desc->inet.port, desc->inet.dport, + ret = send_async_ok_port(desc->inet.dport, id, caller, accept_desc->inet.dport); } } @@ -10936,7 +11005,7 @@ subs_list *subs; static void send_to_subscribers ( - ErlDrvPort port, + ErlDrvTermData port, subs_list *subs, int free_subs, ErlDrvTermData msg[], @@ -10953,7 +11022,7 @@ static void send_to_subscribers this = subs; while(this) { - (void) driver_send_term(port, this->subscriber, msg, msg_len); + (void) erl_drv_send_term(port, this->subscriber, msg, msg_len); if(free_subs && !first) { next = this->next; |