aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/drivers/common/inet_drv.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/drivers/common/inet_drv.c')
-rw-r--r--erts/emulator/drivers/common/inet_drv.c503
1 files changed, 281 insertions, 222 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 8f4fff0f40..301ce2d0e2 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1997-2012. All Rights Reserved.
+ * Copyright Ericsson AB 1997-2013. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -284,27 +284,15 @@ static unsigned long one_value = 1;
#else
-#ifdef VXWORKS
-#include <sockLib.h>
-#include <sys/times.h>
-#include <iosLib.h>
-#include <taskLib.h>
-#include <selectLib.h>
-#include <ioLib.h>
-#else
#include <sys/time.h>
#ifdef NETDB_H_NEEDS_IN_H
#include <netinet/in.h>
#endif
#include <netdb.h>
-#endif
#include <sys/socket.h>
#include <netinet/in.h>
-#ifdef VXWORKS
-#include <rpc/rpctypes.h>
-#endif
#ifdef DEF_INADDR_LOOPBACK_IN_RPC_TYPES_H
#include <rpc/types.h>
#endif
@@ -312,12 +300,10 @@ static unsigned long one_value = 1;
#include <netinet/tcp.h>
#include <arpa/inet.h>
-#if (!defined(VXWORKS))
#include <sys/param.h>
#ifdef HAVE_ARPA_NAMESER_H
#include <arpa/nameser.h>
#endif
-#endif
#ifdef HAVE_SYS_SOCKIO_H
#include <sys/sockio.h>
@@ -331,7 +317,7 @@ static unsigned long one_value = 1;
/* SCTP support -- currently for UNIX platforms only: */
#undef HAVE_SCTP
-#if (!defined(VXWORKS) && !defined(__WIN32__) && defined(HAVE_SCTP_H))
+#if (!defined(__WIN32__) && defined(HAVE_SCTP_H))
#include <netinet/sctp.h>
@@ -478,15 +464,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define sock_connect(s, addr, len) connect((s), (addr), (len))
#define sock_listen(s, b) listen((s), (b))
#define sock_bind(s, addr, len) bind((s), (addr), (len))
-#ifdef VXWORKS
-#define sock_getopt(s,t,n,v,l) wrap_sockopt(&getsockopt,\
- s,t,n,v,(unsigned int)(l))
-#define sock_setopt(s,t,n,v,l) wrap_sockopt(&setsockopt,\
- s,t,n,v,(unsigned int)(l))
-#else
#define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l))
#define sock_setopt(s,t,n,v,l) setsockopt((s),(t),(n),(v),(l))
-#endif
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_peer(s, addr, len) getpeername((s), (addr), (len))
#define sock_ntohs(x) ntohs((x))
@@ -535,6 +514,12 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#endif /* __WIN32__ */
+#ifdef HAVE_SOCKLEN_T
+# define SOCKLEN_T socklen_t
+#else
+# define SOCKLEN_T int
+#endif
+
#include "packet_parser.h"
#define get_int24(s) ((((unsigned char*) (s))[0] << 16) | \
@@ -675,6 +660,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define UDP_OPT_MULTICAST_LOOP 13 /* set/get IP multicast loopback */
#define UDP_OPT_ADD_MEMBERSHIP 14 /* add an IP group membership */
#define UDP_OPT_DROP_MEMBERSHIP 15 /* drop an IP group membership */
+#define INET_OPT_IPV6_V6ONLY 16 /* IPv6 only socket, no mapped v4 addrs */
/* LOPT is local options */
#define INET_LOPT_BUFFER 20 /* min buffer size hint */
#define INET_LOPT_HEADER 21 /* list header size */
@@ -685,13 +671,15 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_LOPT_EXITONCLOSE 26 /* exit port on active close or not ! */
#define INET_LOPT_TCP_HIWTRMRK 27 /* set local high watermark */
#define INET_LOPT_TCP_LOWTRMRK 28 /* set local low watermark */
-#define INET_LOPT_BIT8 29 /* set 8 bit detection */
+ /* 29 unused */
#define INET_LOPT_TCP_SEND_TIMEOUT 30 /* set send timeout */
#define INET_LOPT_TCP_DELAY_SEND 31 /* Delay sends until next poll */
#define INET_LOPT_PACKET_SIZE 32 /* Max packet size */
#define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */
#define INET_OPT_RAW 34 /* Raw socket options */
#define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */
+#define INET_LOPT_MSGQ_HIWTRMRK 36 /* set local msgq high watermark */
+#define INET_LOPT_MSGQ_LOWTRMRK 37 /* set local msgq low watermark */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -720,12 +708,6 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_IFOPT_FLAGS 6
#define INET_IFOPT_HWADDR 7
-/* INET_LOPT_BIT8 options */
-#define INET_BIT8_CLEAR 0
-#define INET_BIT8_SET 1
-#define INET_BIT8_ON 2
-#define INET_BIT8_OFF 3
-
/* INET_REQ_GETSTAT enumeration */
#define INET_STAT_RECV_CNT 1
#define INET_STAT_RECV_MAX 2
@@ -808,6 +790,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */
#define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */
+#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */
+#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */
#define INET_INFINITY 0xffffffff /* infinity value */
@@ -899,7 +883,7 @@ typedef struct subs_list_ {
#define NO_PROCESS 0
#define NO_SUBSCRIBERS(SLP) ((SLP)->subscriber == NO_PROCESS)
-static void send_to_subscribers(ErlDrvPort, subs_list *, int,
+static void send_to_subscribers(ErlDrvTermData, subs_list *, int,
ErlDrvTermData [], int);
static void free_subscribers(subs_list*);
static int save_subscriber(subs_list *, ErlDrvTermData);
@@ -921,7 +905,6 @@ typedef struct {
int mode; /* BINARY | LIST
(affect how to interpret hsz) */
int exitf; /* exit port on close or not */
- int bit8f; /* check if data has bit number 7 set */
int deliver; /* Delivery mode, TERM or PORT */
ErlDrvTermData caller; /* recipient of sync reply */
@@ -940,8 +923,6 @@ typedef struct {
int sfamily; /* address family */
enum PacketParseType htype; /* header type (TCP only?) */
unsigned int psize; /* max packet size (TCP only?) */
- int bit8; /* set if bit8f==true and data some data
- seen had the 7th bit set */
inet_address remote; /* remote address for connected sockets */
inet_address peer_addr; /* fake peer address */
inet_address name_addr; /* fake local address */
@@ -952,12 +933,20 @@ typedef struct {
int bufsz; /* minimum buffer constraint */
unsigned int hsz; /* the list header size, -1 is large !!! */
/* statistics */
- unsigned long recv_oct[2]; /* number of received octets >= 64 bits */
+#ifdef ARCH_64
+ Uint64 recv_oct; /* number of received octets, 64 bits */
+#else
+ Uint32 recv_oct[2]; /* number of received octets, 64 bits */
+#endif
unsigned long recv_cnt; /* number of packets received */
unsigned long recv_max; /* maximum packet size received */
double recv_avg; /* average packet size received */
double recv_dvi; /* avarage deviation from avg_size */
- unsigned long send_oct[2]; /* number of octets sent >= 64 bits */
+#ifdef ARCH_64
+ Uint64 send_oct; /* number of octets sent, 64 bits */
+#else
+ Uint32 send_oct[2]; /* number of octets sent, 64 bits */
+#endif
unsigned long send_cnt; /* number of packets sent */
unsigned long send_max; /* maximum packet send */
double send_avg; /* average packet size sent */
@@ -1191,6 +1180,7 @@ static ErlDrvTermData am_reuseaddr;
static ErlDrvTermData am_dontroute;
static ErlDrvTermData am_priority;
static ErlDrvTermData am_tos;
+static ErlDrvTermData am_ipv6_v6only;
#endif
/* speical errors for bad ports and sequences */
@@ -1895,8 +1885,7 @@ static int deq_async(inet_descriptor* desc, int* ap, ErlDrvTermData* cp, int* rp
** {inet_async, Port, Ref, ok}
*/
static int
-send_async_ok(ErlDrvPort port, ErlDrvTermData Port, int Ref,
- ErlDrvTermData recipient)
+send_async_ok(ErlDrvTermData Port, int Ref,ErlDrvTermData recipient)
{
ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PORT_CNT +
LOAD_INT_CNT + LOAD_TUPLE_CNT];
@@ -1910,14 +1899,14 @@ send_async_ok(ErlDrvPort port, ErlDrvTermData Port, int Ref,
ASSERT(i == sizeof(spec)/sizeof(*spec));
- return driver_send_term(port, recipient, spec, i);
+ return erl_drv_send_term(Port, recipient, spec, i);
}
/* send message:
** {inet_async, Port, Ref, {ok,Port2}}
*/
static int
-send_async_ok_port(ErlDrvPort port, ErlDrvTermData Port, int Ref,
+send_async_ok_port(ErlDrvTermData Port, int Ref,
ErlDrvTermData recipient, ErlDrvTermData Port2)
{
ErlDrvTermData spec[2*LOAD_ATOM_CNT + 2*LOAD_PORT_CNT +
@@ -1936,14 +1925,14 @@ send_async_ok_port(ErlDrvPort port, ErlDrvTermData Port, int Ref,
ASSERT(i == sizeof(spec)/sizeof(*spec));
- return driver_send_term(port, recipient, spec, i);
+ return erl_drv_send_term(Port, recipient, spec, i);
}
/* send message:
** {inet_async, Port, Ref, {error,Reason}}
*/
static int
-send_async_error(ErlDrvPort port, ErlDrvTermData Port, int Ref,
+send_async_error(ErlDrvTermData Port, int Ref,
ErlDrvTermData recipient, ErlDrvTermData Reason)
{
ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PORT_CNT +
@@ -1961,7 +1950,7 @@ send_async_error(ErlDrvPort port, ErlDrvTermData Port, int Ref,
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i == sizeof(spec)/sizeof(*spec));
DEBUGF(("send_async_error %ld %ld\r\n", recipient, Reason));
- return driver_send_term(port, recipient, spec, i);
+ return erl_drv_send_term(Port, recipient, spec, i);
}
@@ -1973,7 +1962,7 @@ static int async_ok(inet_descriptor* desc)
if (deq_async(desc, &aid, &caller, &req) < 0)
return -1;
- return send_async_ok(desc->port, desc->dport, aid, caller);
+ return send_async_ok(desc->dport, aid, caller);
}
static int async_ok_port(inet_descriptor* desc, ErlDrvTermData Port2)
@@ -1984,7 +1973,7 @@ static int async_ok_port(inet_descriptor* desc, ErlDrvTermData Port2)
if (deq_async(desc, &aid, &caller, &req) < 0)
return -1;
- return send_async_ok_port(desc->port, desc->dport, aid, caller, Port2);
+ return send_async_ok_port(desc->dport, aid, caller, Port2);
}
static int async_error_am(inet_descriptor* desc, ErlDrvTermData reason)
@@ -1995,8 +1984,7 @@ static int async_error_am(inet_descriptor* desc, ErlDrvTermData reason)
if (deq_async(desc, &aid, &caller, &req) < 0)
return -1;
- return send_async_error(desc->port, desc->dport, aid, caller,
- reason);
+ return send_async_error(desc->dport, aid, caller, reason);
}
/* dequeue all operations */
@@ -2007,8 +1995,7 @@ static int async_error_am_all(inet_descriptor* desc, ErlDrvTermData reason)
ErlDrvTermData caller;
while (deq_async(desc, &aid, &caller, &req) == 0) {
- send_async_error(desc->port, desc->dport, aid, caller,
- reason);
+ send_async_error(desc->dport, aid, caller, reason);
}
return 0;
}
@@ -2036,7 +2023,7 @@ static int inet_reply_ok(inet_descriptor* desc)
ASSERT(i == sizeof(spec)/sizeof(*spec));
desc->caller = 0;
- return driver_send_term(desc->port, caller, spec, i);
+ return erl_drv_send_term(desc->dport, caller, spec, i);
}
#ifdef HAVE_SCTP
@@ -2055,7 +2042,7 @@ static int inet_reply_ok_port(inet_descriptor* desc, ErlDrvTermData dport)
ASSERT(i == sizeof(spec)/sizeof(*spec));
desc->caller = 0;
- return driver_send_term(desc->port, caller, spec, i);
+ return erl_drv_send_term(desc->dport, caller, spec, i);
}
#endif
@@ -2078,7 +2065,7 @@ static int inet_reply_error_am(inet_descriptor* desc, ErlDrvTermData reason)
desc->caller = 0;
DEBUGF(("inet_reply_error_am %ld %ld\r\n", caller, reason));
- return driver_send_term(desc->port, caller, spec, i);
+ return erl_drv_send_term(desc->dport, caller, spec, i);
}
/* send:
@@ -2187,12 +2174,12 @@ static int http_response_inetdrv(void *arg, int major, int minor,
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i<=27);
- return driver_send_term(desc->inet.port, caller, spec, i);
+ return erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i<=27);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
}
@@ -2284,12 +2271,12 @@ http_request_inetdrv(void* arg, const http_atom_t* meth, const char* meth_ptr,
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 43);
- return driver_send_term(desc->inet.port, caller, spec, i);
+ return erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 43);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
}
@@ -2338,12 +2325,12 @@ http_header_inetdrv(void* arg, const http_atom_t* name, const char* name_ptr,
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 26);
- return driver_send_term(desc->inet.port, caller, spec, i);
+ return erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 26);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
}
@@ -2369,7 +2356,7 @@ static int http_eoh_inetdrv(void* arg)
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 14);
- return driver_send_term(desc->inet.port, caller, spec, i);
+ return erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
/* {http, S, http_eoh} */
@@ -2378,7 +2365,7 @@ static int http_eoh_inetdrv(void* arg)
i = LOAD_ATOM(spec, i, am_http_eoh);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 14);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
}
@@ -2406,7 +2393,7 @@ static int http_error_inetdrv(void* arg, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 19);
- return driver_send_term(desc->inet.port, caller, spec, i);
+ return erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
/* {http, S, {http_error,Line} */
@@ -2417,7 +2404,7 @@ static int http_error_inetdrv(void* arg, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 19);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
}
@@ -2470,11 +2457,11 @@ int ssl_tls_inetdrv(void* arg, unsigned type, unsigned major, unsigned minor,
i = LOAD_TUPLE(spec, i, 2);
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 28);
- ret = driver_send_term(desc->inet.port, caller, spec, i);
+ ret = erl_drv_send_term(desc->inet.dport, caller, spec, i);
}
else {
ASSERT(i <= 28);
- ret = driver_output_term(desc->inet.port, spec, i);
+ ret = erl_drv_output_term(desc->inet.dport, spec, i);
}
done:
driver_free_binary(bin);
@@ -2524,7 +2511,7 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i == 15);
desc->caller = 0;
- return driver_send_term(desc->port, caller, spec, i);
+ return erl_drv_send_term(desc->dport, caller, spec, i);
}
else {
/* INET_MODE_BINARY => [H1,H2,...HSz | Binary] */
@@ -2538,7 +2525,7 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len)
i = LOAD_TUPLE(spec, i, 4);
ASSERT(i <= 20);
desc->caller = 0;
- code = driver_send_term(desc->port, caller, spec, i);
+ code = erl_drv_send_term(desc->dport, caller, spec, i);
return code;
}
}
@@ -3131,7 +3118,7 @@ inet_async_binary_data
ASSERT(i <= PACKET_ERL_DRV_TERM_DATA_LEN);
desc->caller = 0;
- return driver_send_term(desc->port, caller, spec, i);
+ return erl_drv_send_term(desc->dport, caller, spec, i);
}
/*
@@ -3154,7 +3141,7 @@ static int tcp_message(inet_descriptor* desc, const char* buf, int len)
i = LOAD_STRING(spec, i, buf, len); /* => [H1,H2,...Hn] */
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 20);
- return driver_output_term(desc->port, spec, i);
+ return erl_drv_output_term(desc->dport, spec, i);
}
else {
/* INET_MODE_BINARY => [H1,H2,...HSz | Binary] */
@@ -3166,7 +3153,7 @@ static int tcp_message(inet_descriptor* desc, const char* buf, int len)
i = LOAD_STRING_CONS(spec, i, buf, hsz);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 20);
- code = driver_output_term(desc->port, spec, i);
+ code = erl_drv_output_term(desc->dport, spec, i);
return code;
}
}
@@ -3201,7 +3188,7 @@ tcp_binary_message(inet_descriptor* desc, ErlDrvBinary* bin, int offs, int len)
}
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 20);
- return driver_output_term(desc->port, spec, i);
+ return erl_drv_output_term(desc->dport, spec, i);
}
/*
@@ -3220,7 +3207,7 @@ static int tcp_closed_message(tcp_descriptor* desc)
i = LOAD_PORT(spec, i, desc->inet.dport);
i = LOAD_TUPLE(spec, i, 2);
ASSERT(i <= 6);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
return 0;
}
@@ -3241,7 +3228,7 @@ static int tcp_error_message(tcp_descriptor* desc, int err)
i = LOAD_ATOM(spec, i, am_err);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i <= 8);
- return driver_output_term(desc->inet.port, spec, i);
+ return erl_drv_output_term(desc->inet.dport, spec, i);
}
/*
@@ -3332,7 +3319,7 @@ static int packet_binary_message
/* Close up the outer 5-tuple: */
i = LOAD_TUPLE(spec, i, 5);
ASSERT(i <= PACKET_ERL_DRV_TERM_DATA_LEN);
- return driver_output_term(desc->port, spec, i);
+ return erl_drv_output_term(desc->dport, spec, i);
}
/*
@@ -3359,21 +3346,10 @@ static int packet_error_message(udp_descriptor* udesc, int err)
i = LOAD_ATOM(spec, i, am_err);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i == sizeof(spec)/sizeof(*spec));
- return driver_output_term(desc->port, spec, i);
+ return erl_drv_output_term(desc->dport, spec, i);
}
-/* scan buffer for bit 7 */
-static void scanbit8(inet_descriptor* desc, const char* buf, int len)
-{
- int c;
-
- if (!desc->bit8f || desc->bit8) return;
- c = 0;
- while(len--) c |= *buf++;
- desc->bit8 = ((c & 0x80) != 0);
-}
-
/*
** active=TRUE:
** (NOTE! distribution MUST use active=TRUE, deliver=PORT)
@@ -3391,8 +3367,6 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len)
packet_get_body(desc->inet.htype, &body, &bodylen);
- scanbit8(INETP(desc), body, bodylen);
-
if (desc->inet.deliver == INET_DELIVER_PORT) {
code = inet_port_data(INETP(desc), body, bodylen);
}
@@ -3424,8 +3398,6 @@ tcp_reply_binary_data(tcp_descriptor* desc, ErlDrvBinary* bin, int offs, int len
packet_get_body(desc->inet.htype, &body, &bodylen);
offs = body - bin->orig_bytes; /* body offset now */
- scanbit8(INETP(desc), body, bodylen);
-
if (desc->inet.deliver == INET_DELIVER_PORT)
code = inet_port_binary_data(INETP(desc), bin, offs, bodylen);
else if ((code=packet_parse(desc->inet.htype, buf, len, &desc->http_state,
@@ -3451,8 +3423,6 @@ packet_reply_binary_data(inet_descriptor* desc, unsigned int hsz,
{
int code;
- scanbit8(desc, bin->orig_bytes+offs, len);
-
if (desc->active == INET_PASSIVE)
/* "inet" is actually for both UDP and SCTP, as well as TCP! */
return inet_async_binary_data(desc, hsz, bin, offs, len, extra);
@@ -3527,6 +3497,7 @@ static void inet_init_sctp(void) {
INIT_ATOM(dontroute);
INIT_ATOM(priority);
INIT_ATOM(tos);
+ INIT_ATOM(ipv6_v6only);
/* Option names */
INIT_ATOM(sctp_rtoinfo);
@@ -3883,8 +3854,10 @@ static void desc_close(inet_descriptor* desc)
desc->forced_events = 0;
desc->send_would_block = 0;
#endif
- // We should close the fd here, but the other driver might still
- // be selecting on it.
+ /*
+ * We should close the fd here, but the other driver might still
+ * be selecting on it.
+ */
if (!desc->is_ignored)
driver_select(desc->port,(ErlDrvEvent)(long)desc->event,
ERL_DRV_USE, 0);
@@ -5313,50 +5286,6 @@ static ErlDrvSSizeT inet_ctl_getifaddrs(inet_descriptor* desc_p,
#endif
-
-
-#ifdef VXWORKS
-/*
-** THIS is a terrible creature, a bug in the TCP part
-** of the old VxWorks stack (non SENS) created a race.
-** If (and only if?) a socket got closed from the other
-** end and we tried a set/getsockopt on the TCP level,
-** the task would generate a bus error...
-*/
-static STATUS wrap_sockopt(STATUS (*function)() /* Yep, no parameter
- check */,
- int s, int level, int optname,
- char *optval, unsigned int optlen
- /* optlen is a pointer if function
- is getsockopt... */)
-{
- fd_set rs;
- struct timeval timeout;
- int to_read;
- int ret;
-
- FD_ZERO(&rs);
- FD_SET(s,&rs);
- memset(&timeout,0,sizeof(timeout));
- if (level == IPPROTO_TCP) {
- taskLock();
- if (select(s+1,&rs,NULL,NULL,&timeout)) {
- if (ioctl(s,FIONREAD,(int)&to_read) == ERROR ||
- to_read == 0) { /* End of file, other end closed? */
- sock_errno() = EBADF;
- taskUnlock();
- return ERROR;
- }
- }
- ret = (*function)(s,level,optname,optval,optlen);
- taskUnlock();
- } else {
- ret = (*function)(s,level,optname,optval,optlen);
- }
- return ret;
-}
-#endif
-
/* Per H @ Tail-f: The original code here had problems that possibly
only occur if you abuse it for non-INET sockets, but anyway:
a) If the getsockopt for SO_PRIORITY or IP_TOS failed, the actual
@@ -5382,13 +5311,8 @@ static int setopt_prio_tos_trick
int res;
int res_prio;
int res_tos;
-#ifdef HAVE_SOCKLEN_T
- socklen_t
-#else
- int
-#endif
- tmp_arg_sz_prio = sizeof(tmp_ival_prio),
- tmp_arg_sz_tos = sizeof(tmp_ival_tos);
+ SOCKLEN_T tmp_arg_sz_prio = sizeof(tmp_ival_prio);
+ SOCKLEN_T tmp_arg_sz_tos = sizeof(tmp_ival_tos);
res_prio = sock_getopt(fd, SOL_SOCKET, SO_PRIORITY,
(char *) &tmp_ival_prio, &tmp_arg_sz_prio);
@@ -5532,29 +5456,6 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
desc->exitf = ival;
continue;
- case INET_LOPT_BIT8:
- DEBUGF(("inet_set_opts(%ld): s=%d, BIT8=%d\r\n",
- (long)desc->port, desc->s, ival));
- switch(ival) {
- case INET_BIT8_ON:
- desc->bit8f = 1;
- desc->bit8 = 0;
- break;
- case INET_BIT8_OFF:
- desc->bit8f = 0;
- desc->bit8 = 0;
- break;
- case INET_BIT8_CLEAR:
- desc->bit8f = 1;
- desc->bit8 = 0;
- break;
- case INET_BIT8_SET:
- desc->bit8f = 1;
- desc->bit8 = 1;
- break;
- }
- continue;
-
case INET_LOPT_TCP_HIWTRMRK:
if (desc->stype == SOCK_STREAM) {
tcp_descriptor* tdesc = (tcp_descriptor*) desc;
@@ -5575,6 +5476,26 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
}
continue;
+ case INET_LOPT_MSGQ_HIWTRMRK: {
+ ErlDrvSizeT high;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ high = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ continue;
+ }
+
+ case INET_LOPT_MSGQ_LOWTRMRK: {
+ ErlDrvSizeT low;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ low = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ continue;
+ }
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
tcp_descriptor* tdesc = (tcp_descriptor*) desc;
@@ -5636,23 +5557,11 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
case INET_OPT_SNDBUF: type = SO_SNDBUF;
DEBUGF(("inet_set_opts(%ld): s=%d, SO_SNDBUF=%d\r\n",
(long)desc->port, desc->s, ival));
- /*
- * Setting buffer sizes in VxWorks gives unexpected results
- * our workaround is to leave it at default.
- */
-#ifdef VXWORKS
- goto skip_os_setopt;
-#else
break;
-#endif
case INET_OPT_RCVBUF: type = SO_RCVBUF;
DEBUGF(("inet_set_opts(%ld): s=%d, SO_RCVBUF=%d\r\n",
(long)desc->port, desc->s, ival));
-#ifdef VXWORKS
- goto skip_os_setopt;
-#else
break;
-#endif
case INET_OPT_LINGER: type = SO_LINGER;
if (len < 4)
return -1;
@@ -5743,6 +5652,23 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
#endif /* HAVE_MULTICAST_SUPPORT */
+ case INET_OPT_IPV6_V6ONLY:
+#if HAVE_DECL_IPV6_V6ONLY
+ proto = IPPROTO_IPV6;
+ type = IPV6_V6ONLY;
+ propagate = 1;
+ DEBUGF(("inet_set_opts(%ld): s=%d, IPV6_V6ONLY=%d\r\n",
+ (long)desc->port, desc->s, ival));
+ break;
+#elif defined(__WIN32__) && defined(HAVE_IN6) && defined(AF_INET6)
+ /* Fake a'la OpenBSD; set to 'true' is fine but 'false' invalid. */
+ if (ival != 0) continue;
+ else return -1;
+ break;
+#else
+ continue;
+#endif
+
case INET_OPT_RAW:
if (len < 8) {
return -1;
@@ -5774,9 +5700,6 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
}
DEBUGF(("inet_set_opts(%ld): s=%d returned %d\r\n",
(long)desc->port, desc->s, res));
-#ifdef VXWORKS
-skip_os_setopt:
-#endif
if (type == SO_RCVBUF) {
/* make sure we have desc->bufsz >= SO_RCVBUF */
if (ival > desc->bufsz)
@@ -6075,6 +5998,22 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len)
continue; /* Option not supported -- ignore it */
# endif
+ case INET_OPT_IPV6_V6ONLY:
+# if HAVE_DECL_IPV6_V6ONLY
+ {
+ arg.ival= get_int32 (curr); curr += 4;
+ proto = IPPROTO_IPV6;
+ type = IPV6_V6ONLY;
+ arg_ptr = (char*) (&arg.ival);
+ arg_sz = sizeof ( arg.ival);
+ break;
+ }
+# elif defined(__WIN32__) && defined(HAVE_IN6) && defined(AF_INET6)
+# error Here is a fix for Win IPv6 SCTP missing
+# else
+ continue; /* Option not supported -- ignore it */
+# endif
+
case SCTP_OPT_AUTOCLOSE:
{
arg.ival= get_int32 (curr); curr += 4;
@@ -6437,15 +6376,6 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
put_int32(desc->exitf, ptr);
continue;
- case INET_LOPT_BIT8:
- *ptr++ = opt;
- if (desc->bit8f) {
- put_int32(desc->bit8, ptr);
- } else {
- put_int32(INET_BIT8_OFF, ptr);
- }
- continue;
-
case INET_LOPT_TCP_HIWTRMRK:
if (desc->stype == SOCK_STREAM) {
*ptr++ = opt;
@@ -6466,6 +6396,24 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
}
continue;
+ case INET_LOPT_MSGQ_HIWTRMRK: {
+ ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ ival = high > INT_MAX ? INT_MAX : (int) high;
+ put_int32(ival, ptr);
+ continue;
+ }
+
+ case INET_LOPT_MSGQ_LOWTRMRK: {
+ ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ ival = low > INT_MAX ? INT_MAX : (int) low;
+ put_int32(ival, ptr);
+ continue;
+ }
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
*ptr++ = opt;
@@ -6572,6 +6520,22 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
break;
#endif /* HAVE_MULTICAST_SUPPORT */
+ case INET_OPT_IPV6_V6ONLY:
+#if HAVE_DECL_IPV6_V6ONLY
+ proto = IPPROTO_IPV6;
+ type = IPV6_V6ONLY;
+ break;
+#elif defined(__WIN32__) && defined(HAVE_IN6) && defined(AF_INET6)
+ /* Fake reading 'true' */
+ *ptr++ = opt;
+ put_int32(1, ptr);
+ ptr += 4;
+ continue;
+#else
+ TRUNCATE_TO(0,ptr);
+ continue; /* skip - no result */
+#endif
+
case INET_OPT_RAW:
{
int data_provided;
@@ -6876,6 +6840,7 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
case INET_OPT_DONTROUTE:
case INET_OPT_PRIORITY :
case INET_OPT_TOS :
+ case INET_OPT_IPV6_V6ONLY:
case SCTP_OPT_AUTOCLOSE:
case SCTP_OPT_MAXSEG :
/* The following options return true or false: */
@@ -6948,6 +6913,20 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
continue;
# endif
}
+ case INET_OPT_IPV6_V6ONLY:
+# if HAVE_DECL_IPV6_V6ONLY
+ {
+ proto = IPPROTO_IPV6;
+ type = IPV6_V6ONLY;
+ tag = am_ipv6_v6only;
+ break;
+ }
+# elif defined(__WIN32__) && defined(HAVE_IN6) && defined(AF_INET6)
+# error Here is a fix for Win IPv6 SCTP needed
+# else
+ /* Not supported -- ignore */
+ continue;
+# endif
case SCTP_OPT_AUTOCLOSE:
{
proto = IPPROTO_SCTP;
@@ -7348,7 +7327,7 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
i = LOAD_TUPLE(spec, i, 3);
/* Now, convert "spec" into the returnable term: */
- driver_send_term(desc->port, driver_caller(desc->port), spec, i);
+ erl_drv_send_term(desc->dport, driver_caller(desc->port), spec, i);
FREE(spec);
(*dest)[0] = INET_REP;
@@ -7398,13 +7377,21 @@ static ErlDrvSSizeT inet_fill_stat(inet_descriptor* desc,
val = (unsigned long) driver_sizeq(desc->port);
break;
case INET_STAT_RECV_OCT:
+#ifdef ARCH_64
+ put_int64(desc->recv_oct, dst); /* write it all */
+#else
put_int32(desc->recv_oct[1], dst); /* write high 32bit */
put_int32(desc->recv_oct[0], dst+4); /* write low 32bit */
+#endif
dst += 8;
continue;
case INET_STAT_SEND_OCT:
+#ifdef ARCH_64
+ put_int64(desc->send_oct, dst); /* write it all */
+#else
put_int32(desc->send_oct[1], dst); /* write high 32bit */
put_int32(desc->send_oct[0], dst+4); /* write low 32bit */
+#endif
dst += 8;
continue;
default: return -1; /* invalid argument */
@@ -7430,7 +7417,7 @@ send_empty_out_q_msgs(inet_descriptor* desc)
ASSERT(msg_len == sizeof(msg)/sizeof(*msg));
- send_to_subscribers(desc->port,
+ send_to_subscribers(desc->dport,
&desc->empty_out_q_subs,
1,
msg,
@@ -7474,6 +7461,20 @@ static void inet_stop(inet_descriptor* desc)
FREE(desc);
}
+static void set_default_msgq_limits(ErlDrvPort port)
+{
+ ErlDrvSizeT q_high = INET_HIGH_MSGQ_WATERMARK;
+ ErlDrvSizeT q_low = INET_LOW_MSGQ_WATERMARK;
+ if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+}
/* Allocate descriptor */
static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
@@ -7504,8 +7505,6 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
desc->mode = INET_MODE_LIST; /* list mode */
desc->exitf = 1; /* exit port when close on active
socket */
- desc->bit8f = 0;
- desc->bit8 = 0;
desc->deliver = INET_DELIVER_TERM; /* standard term format */
desc->active = INET_PASSIVE; /* start passive */
desc->oph = NULL;
@@ -7514,12 +7513,20 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
desc->peer_ptr = NULL;
desc->name_ptr = NULL;
+#ifdef ARCH_64
+ desc->recv_oct = 0;
+#else
desc->recv_oct[0] = desc->recv_oct[1] = 0;
+#endif
desc->recv_cnt = 0;
desc->recv_max = 0;
desc->recv_avg = 0.0;
desc->recv_dvi = 0.0;
+#ifdef ARCH_64
+ desc->send_oct = 0;
+#else
desc->send_oct[0] = desc->send_oct[1] = 0;
+#endif
desc->send_cnt = 0;
desc->send_max = 0;
desc->send_avg = 0.0;
@@ -7813,8 +7820,8 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
desc->state = INET_STATE_BOUND;
if ((port = inet_address_port(&local)) == 0) {
- len = sizeof(local);
- sock_name(desc->s, (struct sockaddr*) &local, (unsigned int*)&len);
+ SOCKLEN_T adrlen = sizeof(local);
+ sock_name(desc->s, &local.sa, &adrlen);
port = inet_address_port(&local);
}
port = sock_ntohs(port);
@@ -7833,7 +7840,7 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
if (!IS_CONNECTED(desc))
return ctl_error(ENOTCONN, rbuf, rsize);
- if (!desc->stype == SOCK_STREAM)
+ if (desc->stype != SOCK_STREAM)
return ctl_error(EINVAL, rbuf, rsize);
if (*buf == 1 && !desc->is_ignored) {
@@ -7849,8 +7856,6 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
}
-#ifndef VXWORKS
-
case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */
char namebuf[256];
char protobuf[256];
@@ -7901,8 +7906,6 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
return ctl_reply(INET_REP_OK, srv->s_name, len, rbuf, rsize);
}
-#endif /* !VXWORKS */
-
default:
return ctl_xerror(EXBADPORT, rbuf, rsize);
}
@@ -7912,14 +7915,19 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
static void inet_output_count(inet_descriptor* desc, ErlDrvSizeT len)
{
unsigned long n = desc->send_cnt + 1;
- unsigned long t = desc->send_oct[0] + len;
+#ifndef ARCH_64
+ Uint32 t = desc->send_oct[0] + len;
int c = (t < desc->send_oct[0]);
+#endif
double avg = desc->send_avg;
- /* at least 64 bit octet count */
+#ifdef ARCH_64
+ desc->send_oct += len;
+#else
+ /* 64 bit octet count in 32 bit words */
desc->send_oct[0] = t;
desc->send_oct[1] += c;
-
+#endif
if (n == 0) /* WRAP, use old avg as input to a new sequence */
n = 1;
desc->send_avg += (len - avg) / n;
@@ -7932,14 +7940,20 @@ static void inet_output_count(inet_descriptor* desc, ErlDrvSizeT len)
static void inet_input_count(inet_descriptor* desc, ErlDrvSizeT len)
{
unsigned long n = desc->recv_cnt + 1;
- unsigned long t = desc->recv_oct[0] + len;
+#ifndef ARCH_64
+ Uint32 t = (desc->recv_oct[0] + len);
int c = (t < desc->recv_oct[0]);
+#endif
double avg = desc->recv_avg;
double dvi;
- /* at least 64 bit octet count */
+#ifdef ARCH_64
+ desc->recv_oct += len;
+#else
+ /* 64 bit octet count in 32 bit words */
desc->recv_oct[0] = t;
desc->recv_oct[1] += c;
+#endif
if (n == 0) /* WRAP */
n = 1;
@@ -8081,7 +8095,7 @@ static int tcp_inet_init(void)
/* initialize the TCP descriptor */
-static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
+static ErlDrvData prep_tcp_inet_start(ErlDrvPort port, char* args)
{
tcp_descriptor* desc;
DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port));
@@ -8108,6 +8122,12 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
return (ErlDrvData) desc;
}
+static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
+{
+ ErlDrvData data = prep_tcp_inet_start(port, args);
+ set_default_msgq_limits(port);
+ return data;
+}
/* Copy a descriptor, by creating a new port with same settings
* as the descriptor desc.
* return NULL on error (SYSTEM_LIMIT no ports avail)
@@ -8115,10 +8135,11 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
ErlDrvTermData owner, int* err)
{
+ ErlDrvSizeT q_low, q_high;
ErlDrvPort port = desc->inet.port;
tcp_descriptor* copy_desc;
- copy_desc = (tcp_descriptor*) tcp_inet_start(port, NULL);
+ copy_desc = (tcp_descriptor*) prep_tcp_inet_start(port, NULL);
/* Setup event if needed */
if ((copy_desc->inet.s = s) != INVALID_SOCKET) {
@@ -8133,7 +8154,6 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
/* Some flags must be inherited at this point */
copy_desc->inet.mode = desc->inet.mode;
copy_desc->inet.exitf = desc->inet.exitf;
- copy_desc->inet.bit8f = desc->inet.bit8f;
copy_desc->inet.deliver = desc->inet.deliver;
copy_desc->inet.htype = desc->inet.htype;
copy_desc->inet.psize = desc->inet.psize;
@@ -8153,6 +8173,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
FREE(copy_desc);
return NULL;
}
+
+ /* Read busy msgq limits of parent */
+ q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high);
+ /* Write same busy msgq limits to child */
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+
copy_desc->inet.port = port;
copy_desc->inet.dport = driver_mk_port(port);
*err = 0;
@@ -8185,7 +8212,7 @@ static void tcp_close_check(tcp_descriptor* desc)
desc->inet.state = INET_STATE_LISTENING;
while (deq_multi_op(desc,&id,&req,&caller,NULL,&monitor) == 0) {
driver_demonitor_process(desc->inet.port, &monitor);
- send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_closed);
+ send_async_error(desc->inet.dport, id, caller, am_closed);
}
clean_multi_timers(&(desc->mtd), desc->inet.port);
}
@@ -8609,7 +8636,7 @@ static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller)
sock_select(INETP(desc),FD_ACCEPT,0);
desc->inet.state = INET_STATE_LISTENING; /* restore state */
}
- send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_timeout);
+ send_async_error(desc->inet.dport, id, caller, am_timeout);
}
@@ -8789,7 +8816,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err)
if (desc->inet.exitf)
driver_exit(desc->inet.port, err);
else
- desc_close(INETP(desc));
+ desc_close_read(INETP(desc));
}
return -1;
}
@@ -9350,7 +9377,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
if (s == INVALID_SOCKET) { /* Not ERRNO_BLOCK, that's handled right away */
- ret = send_async_error(desc->inet.port, desc->inet.dport,
+ ret = send_async_error(desc->inet.dport,
id, caller, error_atom(sock_errno()));
goto done;
}
@@ -9360,7 +9387,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
if ((accept_desc = tcp_inet_copy(desc,s,caller,&err)) == NULL) {
sock_close(s);
- ret = send_async_error(desc->inet.port, desc->inet.dport,
+ ret = send_async_error(desc->inet.dport,
id, caller, error_atom(err));
goto done;
}
@@ -9371,7 +9398,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
ERL_DRV_READ, 1);
#endif
accept_desc->inet.state = INET_STATE_CONNECTED;
- ret = send_async_ok_port(desc->inet.port, desc->inet.dport,
+ ret = send_async_ok_port(desc->inet.dport,
id, caller, accept_desc->inet.dport);
}
}
@@ -9731,6 +9758,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
(long)desc->inet.port, desc->inet.s, vsize));
if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
+ write_error:
if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
(long)desc->inet.port, vsize, sock_errno()));
@@ -9741,6 +9769,22 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
desc->inet.send_would_block = 1;
#endif
goto done;
+ } else if (n == 0) { /* Workaround for redhat/CentOS 6.3 returning
+ 0 when sending packets with
+ sizes > (max 32 bit signed int) */
+ size_t howmuch = 0x7FFFFFFF; /* max signed 32 bit */
+ int x;
+ for(x = 0; x < vsize && iov[x].iov_len == 0; ++x)
+ ;
+ if (x < vsize) {
+ if (howmuch > iov[x].iov_len) {
+ howmuch = iov[x].iov_len;
+ }
+ n = sock_send(desc->inet.s, iov[x].iov_base,howmuch,0);
+ if (IS_SOCKET_ERROR(n)) {
+ goto write_error;
+ }
+ }
}
if (driver_deq(ix, n) <= desc->low) {
if (IS_BUSY(INETP(desc))) {
@@ -9818,12 +9862,15 @@ static int should_use_so_bsdcompat(void)
* as the descriptor desc.
* return NULL on error (ENFILE no ports avail)
*/
+static ErlDrvData packet_inet_start(ErlDrvPort port, char* args, int protocol);
+
static udp_descriptor* sctp_inet_copy(udp_descriptor* desc, SOCKET s, int* err)
{
+ ErlDrvSizeT q_low, q_high;
ErlDrvPort port = desc->inet.port;
udp_descriptor* copy_desc;
- copy_desc = (udp_descriptor*) sctp_inet_start(port, NULL);
+ copy_desc = (udp_descriptor*) packet_inet_start(port, NULL, IPPROTO_SCTP);
/* Setup event if needed */
if ((copy_desc->inet.s = s) != INVALID_SOCKET) {
@@ -9838,7 +9885,6 @@ static udp_descriptor* sctp_inet_copy(udp_descriptor* desc, SOCKET s, int* err)
/* Some flags must be inherited at this point */
copy_desc->inet.mode = desc->inet.mode;
copy_desc->inet.exitf = desc->inet.exitf;
- copy_desc->inet.bit8f = desc->inet.bit8f;
copy_desc->inet.deliver = desc->inet.deliver;
copy_desc->inet.htype = desc->inet.htype;
copy_desc->inet.psize = desc->inet.psize;
@@ -9855,9 +9901,17 @@ static udp_descriptor* sctp_inet_copy(udp_descriptor* desc, SOCKET s, int* err)
FREE(copy_desc);
return NULL;
}
+
+ /* Read busy msgq limits of parent */
+ q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high);
+ /* Write same busy msgq limits to child */
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+
copy_desc->inet.port = port;
copy_desc->inet.dport = driver_mk_port(port);
*err = 0;
+
return copy_desc;
}
#endif
@@ -9890,13 +9944,17 @@ static ErlDrvData packet_inet_start(ErlDrvPort port, char* args, int protocol)
static ErlDrvData udp_inet_start(ErlDrvPort port, char *args)
{
- return packet_inet_start(port, args, IPPROTO_UDP);
+ ErlDrvData data = packet_inet_start(port, args, IPPROTO_UDP);
+ set_default_msgq_limits(port);
+ return data;
}
#ifdef HAVE_SCTP
static ErlDrvData sctp_inet_start(ErlDrvPort port, char *args)
{
- return packet_inet_start(port, args, IPPROTO_SCTP);
+ ErlDrvData data = packet_inet_start(port, args, IPPROTO_SCTP);
+ set_default_msgq_limits(port);
+ return data;
}
#endif
@@ -10240,6 +10298,7 @@ static ErlDrvSSizeT packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf,
}
new_udesc->inet.state = INET_STATE_CONNECTED;
new_udesc->inet.stype = SOCK_STREAM;
+ SET_NONBLOCKING(new_udesc->inet.s);
inet_reply_ok_port(desc, new_udesc->inet.dport);
(*rbuf)[0] = INET_REP;
@@ -11013,7 +11072,7 @@ subs_list *subs;
static void send_to_subscribers
(
- ErlDrvPort port,
+ ErlDrvTermData port,
subs_list *subs,
int free_subs,
ErlDrvTermData msg[],
@@ -11030,7 +11089,7 @@ static void send_to_subscribers
this = subs;
while(this) {
- (void) driver_send_term(port, this->subscriber, msg, msg_len);
+ (void) erl_drv_send_term(port, this->subscriber, msg, msg_len);
if(free_subs && !first) {
next = this->next;