aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/drivers/common
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-11-29 01:24:43 +0100
committerRickard Green <[email protected]>2012-12-07 00:24:27 +0100
commit9e4895da833b7777e69efc173f5dc777aaea3201 (patch)
tree02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator/drivers/common
parent43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff)
downloadotp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz
otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2
otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip
Add support for busy port message queue
Diffstat (limited to 'erts/emulator/drivers/common')
-rw-r--r--erts/emulator/drivers/common/inet_drv.c72
1 files changed, 72 insertions, 0 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 4a87b0a385..f68ce68407 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -692,6 +692,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */
#define INET_OPT_RAW 34 /* Raw socket options */
#define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */
+#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */
+#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -808,6 +810,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */
#define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */
+#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */
+#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */
#define INET_INFINITY 0xffffffff /* infinity value */
@@ -5537,6 +5541,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ high = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ low = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
tcp_descriptor* tdesc = (tcp_descriptor*) desc;
@@ -6422,6 +6448,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ ival = high > INT_MAX ? INT_MAX : (int) high;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ ival = low > INT_MAX ? INT_MAX : (int) low;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
*ptr++ = opt;
@@ -8029,6 +8081,7 @@ static int tcp_inet_init(void)
static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
+ ErlDrvSizeT q_low, q_high;
tcp_descriptor* desc;
DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port));
@@ -8038,6 +8091,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
return ERL_DRV_ERROR_ERRNO;
desc->high = INET_HIGH_WATERMARK;
desc->low = INET_LOW_WATERMARK;
+ q_high = INET_HIGH_MSGQ_WATERMARK;
+ q_low = INET_LOW_MSGQ_WATERMARK;
+ if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
desc->send_timeout = INET_INFINITY;
desc->send_timeout_close = 0;
desc->busy_on_send = 0;
@@ -8061,6 +8125,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
ErlDrvTermData owner, int* err)
{
+ ErlDrvSizeT q_low, q_high;
ErlDrvPort port = desc->inet.port;
tcp_descriptor* copy_desc;
@@ -8099,6 +8164,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
FREE(copy_desc);
return NULL;
}
+
+ /* Read busy msgq limits of parent */
+ q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high);
+ /* Write same busy msgq limits to child */
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+
copy_desc->inet.port = port;
copy_desc->inet.dport = driver_mk_port(port);
*err = 0;