From 9e4895da833b7777e69efc173f5dc777aaea3201 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 29 Nov 2012 01:24:43 +0100 Subject: Add support for busy port message queue --- erts/emulator/drivers/common/inet_drv.c | 72 +++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) (limited to 'erts/emulator/drivers') diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 4a87b0a385..f68ce68407 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -692,6 +692,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */ #define INET_OPT_RAW 34 /* Raw socket options */ #define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */ +#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */ +#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -808,6 +810,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */ #define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */ +#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */ +#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */ #define INET_INFINITY 0xffffffff /* infinity value */ @@ -5537,6 +5541,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + high = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low; + if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN + || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival) + return -1; + low = (ErlDrvSizeT) ival; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { tcp_descriptor* tdesc = (tcp_descriptor*) desc; @@ -6422,6 +6448,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, } continue; + case INET_LOPT_TCP_MSGQ_HIWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, NULL, &high); + ival = high > INT_MAX ? INT_MAX : (int) high; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + + case INET_LOPT_TCP_MSGQ_LOWTRMRK: + if (desc->stype == SOCK_STREAM) { + ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY; + *ptr++ = opt; + erl_drv_busy_msgq_limits(desc->port, &low, NULL); + ival = low > INT_MAX ? INT_MAX : (int) low; + put_int32(ival, ptr); + } + else { + TRUNCATE_TO(0,ptr); + } + continue; + case INET_LOPT_TCP_SEND_TIMEOUT: if (desc->stype == SOCK_STREAM) { *ptr++ = opt; @@ -8029,6 +8081,7 @@ static int tcp_inet_init(void) static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) { + ErlDrvSizeT q_low, q_high; tcp_descriptor* desc; DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port)); @@ -8038,6 +8091,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) return ERL_DRV_ERROR_ERRNO; desc->high = INET_HIGH_WATERMARK; desc->low = INET_LOW_WATERMARK; + q_high = INET_HIGH_MSGQ_WATERMARK; + q_low = INET_LOW_MSGQ_WATERMARK; + if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX; + if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN; + else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX) + q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX; + erl_drv_busy_msgq_limits(port, &q_low, &q_high); desc->send_timeout = INET_INFINITY; desc->send_timeout_close = 0; desc->busy_on_send = 0; @@ -8061,6 +8125,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, ErlDrvTermData owner, int* err) { + ErlDrvSizeT q_low, q_high; ErlDrvPort port = desc->inet.port; tcp_descriptor* copy_desc; @@ -8099,6 +8164,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, FREE(copy_desc); return NULL; } + + /* Read busy msgq limits of parent */ + q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY; + erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high); + /* Write same busy msgq limits to child */ + erl_drv_busy_msgq_limits(port, &q_low, &q_high); + copy_desc->inet.port = port; copy_desc->inet.dport = driver_mk_port(port); *err = 0; -- cgit v1.2.3