diff options
Diffstat (limited to 'erts/emulator/drivers/common')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 687 |
1 files changed, 481 insertions, 206 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index c803e6b51e..2ff5f744d6 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -280,6 +280,57 @@ static unsigned long one_value = 1; # define SCTP_EOF MSG_EOF #endif +/* More Solaris 10 fixes: */ +#if ! HAVE_DECL_SCTP_CLOSED && HAVE_DECL_SCTPS_IDLE +# define SCTP_CLOSED SCTPS_IDLE +# undef HAVE_DECL_SCTP_CLOSED +# define HAVE_DECL_SCTP_CLOSED 1 +#endif +#if ! HAVE_DECL_SCTP_BOUND && HAVE_DECL_SCTPS_BOUND +# define SCTP_BOUND SCTPS_BOUND +# undef HAVE_DECL_SCTP_BOUND +# define HAVE_DECL_SCTP_BOUND 1 +#endif +#if ! HAVE_DECL_SCTP_LISTEN && HAVE_DECL_SCTPS_LISTEN +# define SCTP_LISTEN SCTPS_LISTEN +# undef HAVE_DECL_SCTP_LISTEN +# define HAVE_DECL_SCTP_LISTEN 1 +#endif +#if ! HAVE_DECL_SCTP_COOKIE_WAIT && HAVE_DECL_SCTPS_COOKIE_WAIT +# define SCTP_COOKIE_WAIT SCTPS_COOKIE_WAIT +# undef HAVE_DECL_SCTP_COOKIE_WAIT +# define HAVE_DECL_SCTP_COOKIE_WAIT 1 +#endif +#if ! HAVE_DECL_SCTP_COOKIE_ECHOED && HAVE_DECL_SCTPS_COOKIE_ECHOED +# define SCTP_COOKIE_ECHOED SCTPS_COOKIE_ECHOED +# undef HAVE_DECL_SCTP_COOKIE_ECHOED +# define HAVE_DECL_SCTP_COOKIE_ECHOED 1 +#endif +#if ! HAVE_DECL_SCTP_ESTABLISHED && HAVE_DECL_SCTPS_ESTABLISHED +# define SCTP_ESTABLISHED SCTPS_ESTABLISHED +# undef HAVE_DECL_SCTP_ESTABLISHED +# define HAVE_DECL_SCTP_ESTABLISHED 1 +#endif +#if ! HAVE_DECL_SCTP_SHUTDOWN_PENDING && HAVE_DECL_SCTPS_SHUTDOWN_PENDING +# define SCTP_SHUTDOWN_PENDING SCTPS_SHUTDOWN_PENDING +# undef HAVE_DECL_SCTP_SHUTDOWN_PENDING +# define HAVE_DECL_SCTP_SHUTDOWN_PENDING 1 +#endif +#if ! HAVE_DECL_SCTP_SHUTDOWN_SENT && HAVE_DECL_SCTPS_SHUTDOWN_SENT +# define SCTP_SHUTDOWN_SENT SCTPS_SHUTDOWN_SENT +# undef HAVE_DECL_SCTP_SHUTDOWN_SENT +# define HAVE_DECL_SCTP_SHUTDOWN_SENT 1 +#endif +#if ! HAVE_DECL_SCTP_SHUTDOWN_RECEIVED && HAVE_DECL_SCTPS_SHUTDOWN_RECEIVED +# define SCTP_SHUTDOWN_RECEIVED SCTPS_SHUTDOWN_RECEIVED +# undef HAVE_DECL_SCTP_SHUTDOWN_RECEIVED +# define HAVE_DECL_SCTP_SHUTDOWN_RECEIVED 1 +#endif +#if ! HAVE_DECL_SCTP_SHUTDOWN_ACK_SENT && HAVE_DECL_SCTPS_SHUTDOWN_ACK_SENT +# define SCTP_SHUTDOWN_ACK_SENT SCTPS_SHUTDOWN_ACK_SENT +# undef HAVE_DECL_SCTP_SHUTDOWN_ACK_SENT +# define HAVE_DECL_SCTP_SHUTDOWN_ACK_SENT 1 +#endif /* New spelling in lksctp 2.6.22 or maybe even earlier: * adaption -> adaptation */ @@ -294,12 +345,13 @@ static unsigned long one_value = 1; # define sctp_adaptation_layer_event sctp_adaption_layer_event #endif -static void *h_libsctp = NULL; #ifdef __GNUC__ static typeof(sctp_bindx) *p_sctp_bindx = NULL; +static typeof(sctp_peeloff) *p_sctp_peeloff = NULL; #else static int (*p_sctp_bindx)(int sd, struct sockaddr *addrs, int addrcnt, int flags) = NULL; +static int (*p_sctp_peeloff)(int sd, sctp_assoc_t assoc_id) = NULL; #endif #endif /* SCTP supported */ @@ -427,7 +479,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_AF_ANY 3 /* INADDR_ANY or IN6ADDR_ANY_INIT */ #define INET_AF_LOOPBACK 4 /* INADDR_LOOPBACK or IN6ADDR_LOOPBACK_INIT */ -/* INET_REQ_GETTYPE enumeration */ +/* open and INET_REQ_GETTYPE enumeration */ #define INET_TYPE_STREAM 1 #define INET_TYPE_DGRAM 2 #define INET_TYPE_SEQPACKET 3 @@ -484,16 +536,19 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_IFSET 23 #define INET_REQ_SUBSCRIBE 24 #define INET_REQ_GETIFADDRS 25 +#define INET_REQ_ACCEPT 26 +#define INET_REQ_LISTEN 27 /* TCP requests */ -#define TCP_REQ_ACCEPT 40 -#define TCP_REQ_LISTEN 41 +/* #define TCP_REQ_ACCEPT 40 MOVED */ +/* #define TCP_REQ_LISTEN 41 MERGED */ #define TCP_REQ_RECV 42 #define TCP_REQ_UNRECV 43 #define TCP_REQ_SHUTDOWN 44 /* UDP and SCTP requests */ #define PACKET_REQ_RECV 60 /* Common for UDP and SCTP */ -#define SCTP_REQ_LISTEN 61 /* Different from TCP; not for UDP */ +/* #define SCTP_REQ_LISTEN 61 MERGED Different from TCP; not for UDP */ #define SCTP_REQ_BINDX 62 /* Multi-home SCTP bind */ +#define SCTP_REQ_PEELOFF 63 /* INET_REQ_SUBSCRIBE sub-requests */ #define INET_SUBS_EMPTY_OUT_Q 1 @@ -507,7 +562,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) /* *_REQ_* replies */ #define INET_REP_ERROR 0 #define INET_REP_OK 1 -#define INET_REP_SCTP 2 +#define INET_REP 2 /* INET_REQ_SETOPTS and INET_REQ_GETOPTS options */ #define INET_OPT_REUSEADDR 0 /* enable/disable local address reuse */ @@ -628,10 +683,14 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) ** End of interface constants. **--------------------------------------------------------------------------*/ -#define INET_STATE_CLOSED 0 -#define INET_STATE_OPEN (INET_F_OPEN) -#define INET_STATE_BOUND (INET_STATE_OPEN | INET_F_BOUND) -#define INET_STATE_CONNECTED (INET_STATE_BOUND | INET_F_ACTIVE) +#define INET_STATE_CLOSED (0) +#define INET_STATE_OPEN (INET_F_OPEN) +#define INET_STATE_BOUND (INET_STATE_OPEN | INET_F_BOUND) +#define INET_STATE_CONNECTED (INET_STATE_BOUND | INET_F_ACTIVE) +#define INET_STATE_LISTENING (INET_STATE_BOUND | INET_F_LISTEN) +#define INET_STATE_CONNECTING (INET_STATE_BOUND | INET_F_CON) +#define INET_STATE_ACCEPTING (INET_STATE_LISTENING | INET_F_ACC) +#define INET_STATE_MULTI_ACCEPTING (INET_STATE_ACCEPTING | INET_F_MULTI_CLIENT) #define IS_OPEN(d) \ (((d)->state & INET_F_OPEN) == INET_F_OPEN) @@ -674,7 +733,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #endif -#define BIN_REALLOC_LIMIT(x) (((x)*3)/4) /* 75% */ +#define BIN_REALLOC_MARGIN(x) ((x)/4) /* 25% */ /* The general purpose sockaddr */ typedef union { @@ -809,16 +868,6 @@ typedef struct { -#define TCP_STATE_CLOSED INET_STATE_CLOSED -#define TCP_STATE_OPEN (INET_F_OPEN) -#define TCP_STATE_BOUND (TCP_STATE_OPEN | INET_F_BOUND) -#define TCP_STATE_CONNECTED (TCP_STATE_BOUND | INET_F_ACTIVE) -#define TCP_STATE_LISTEN (TCP_STATE_BOUND | INET_F_LISTEN) -#define TCP_STATE_CONNECTING (TCP_STATE_BOUND | INET_F_CON) -#define TCP_STATE_ACCEPTING (TCP_STATE_LISTEN | INET_F_ACC) -#define TCP_STATE_MULTI_ACCEPTING (TCP_STATE_ACCEPTING | INET_F_MULTI_CLIENT) - - #define TCP_MAX_PACKET_SIZE 0x4000000 /* 64 M */ #define MAX_VSIZE 16 /* Max number of entries allowed in an I/O @@ -874,12 +923,6 @@ static struct erl_drv_entry tcp_inet_driver_entry = inet_stop_select }; -#define PACKET_STATE_CLOSED INET_STATE_CLOSED -#define PACKET_STATE_OPEN (INET_F_OPEN) -#define PACKET_STATE_BOUND (PACKET_STATE_OPEN | INET_F_BOUND) -#define SCTP_STATE_LISTEN (PACKET_STATE_BOUND | INET_F_LISTEN) -#define SCTP_STATE_CONNECTING (PACKET_STATE_BOUND | INET_F_CON) -#define PACKET_STATE_CONNECTED (PACKET_STATE_BOUND | INET_F_ACTIVE) static int packet_inet_init(void); @@ -997,6 +1040,9 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); typedef struct { inet_descriptor inet; /* common data structure (DON'T MOVE) */ int read_packets; /* Number of packets to read per invocation */ + int i_bufsz; /* current input buffer size */ + ErlDrvBinary* i_buf; /* current binary buffer */ + char* i_ptr; /* current pos in buf */ } udp_descriptor; @@ -1851,6 +1897,26 @@ static int inet_reply_ok(inet_descriptor* desc) return driver_send_term(desc->port, caller, spec, i); } +#ifdef HAVE_SCTP +static int inet_reply_ok_port(inet_descriptor* desc, ErlDrvTermData dport) +{ + ErlDrvTermData spec[2*LOAD_ATOM_CNT + 2*LOAD_PORT_CNT + 2*LOAD_TUPLE_CNT]; + ErlDrvTermData caller = desc->caller; + int i = 0; + + i = LOAD_ATOM(spec, i, am_inet_reply); + i = LOAD_PORT(spec, i, desc->dport); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_PORT(spec, i, dport); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 3); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + desc->caller = 0; + return driver_send_term(desc->port, caller, spec, i); +} +#endif + /* send: ** {inet_reply, S, {error, Reason}} */ @@ -2389,14 +2455,19 @@ static ErlDrvTermData am_sctp_rtoinfo, /* Option names */ am_active, am_inactive, /* For #sctp_status{}: */ - am_empty, am_closed, +# if HAVE_DECL_SCTP_EMPTY + am_empty, +# endif +# if HAVE_DECL_SCTP_BOUND + am_bound, +# endif +# if HAVE_DECL_SCTP_LISTEN + am_listen, +# endif am_cookie_wait, am_cookie_echoed, am_established, am_shutdown_pending, am_shutdown_sent, am_shutdown_received, am_shutdown_ack_sent; - /* Not yet implemented in the Linux kernel: - ** am_bound, am_listen; - */ /* ** Parsing of "sctp_sndrcvinfo": ancillary data coming with received msgs. @@ -2665,7 +2736,8 @@ static int sctp_parse_async_event # ifdef HAVE_STRUCT_SCTP_REMOTE_ERROR_SRE_DATA chunk = (char*) (&(sptr->sre_data)); # else - chunk = ((char*)sptr) + sizeof(*sptr); + chunk = ((char*) &(sptr->sre_assoc_id)) + + sizeof(sptr->sre_assoc_id); # endif chlen = sptr->sre_length - (chunk - (char *)sptr); i = sctp_parse_error_chunk(spec, i, chunk, chlen); @@ -2716,7 +2788,8 @@ static int sctp_parse_async_event # ifdef HAVE_STRUCT_SCTP_SEND_FAILED_SSF_DATA chunk = (char*) (&(sptr->ssf_data)); # else - chunk = ((char*)sptr) + sizeof(*sptr); + chunk = ((char*) &(sptr->ssf_assoc_id)) + + sizeof(sptr->ssf_assoc_id); # endif chlen = sptr->ssf_length - (chunk - (char*) sptr); choff = chunk - bin->orig_bytes; @@ -3390,8 +3463,15 @@ static void inet_init_sctp(void) { INIT_ATOM(inactive); /* For #sctp_status{}: */ +# if HAVE_DECL_SCTP_EMPTY INIT_ATOM(empty); - INIT_ATOM(closed); +# endif +# if HAVE_DECL_SCTP_BOUND + INIT_ATOM(bound); +# endif +# if HAVE_DECL_SCTP_LISTEN + INIT_ATOM(listen); +# endif INIT_ATOM(cookie_wait); INIT_ATOM(cookie_echoed); INIT_ATOM(established); @@ -3399,10 +3479,6 @@ static void inet_init_sctp(void) { INIT_ATOM(shutdown_sent); INIT_ATOM(shutdown_received); INIT_ATOM(shutdown_ack_sent); - /* Not yet implemented in the Linux kernel: - ** INIT_ATOM(bound); - ** INIT_ATOM(listen); - */ } #endif /* HAVE_SCTP */ @@ -3453,17 +3529,32 @@ static int inet_init() /* Check the size of SCTP AssocID -- currently both this driver and the Erlang part require 32 bit: */ ASSERT(sizeof(sctp_assoc_t)==ASSOC_ID_LEN); -# ifndef LIBSCTP -# error LIBSCTP not defined -# endif - if (erts_sys_ddll_open_noext(STRINGIFY(LIBSCTP), &h_libsctp, NULL) == 0) { - void *ptr; - if (erts_sys_ddll_sym(h_libsctp, "sctp_bindx", &ptr) == 0) { - p_sctp_bindx = ptr; - inet_init_sctp(); - add_driver_entry(&sctp_inet_driver_entry); +# if defined(HAVE_SCTP_BINDX) && defined (HAVE_SCTP_PEELOFF) + p_sctp_bindx = sctp_bindx; + p_sctp_peeloff = sctp_peeloff; + inet_init_sctp(); + add_driver_entry(&sctp_inet_driver_entry); +# else +# ifndef LIBSCTP +# error LIBSCTP not defined +# endif + { + static void *h_libsctp = NULL; + + if (erts_sys_ddll_open_noext(STRINGIFY(LIBSCTP), &h_libsctp, NULL) + == 0) { + void *ptr; + if (erts_sys_ddll_sym(h_libsctp, "sctp_bindx", &ptr) == 0) { + p_sctp_bindx = ptr; + inet_init_sctp(); + add_driver_entry(&sctp_inet_driver_entry); + if (erts_sys_ddll_sym(h_libsctp, "sctp_peeloff", &ptr) == 0) { + p_sctp_peeloff = ptr; + } + } } } +# endif #endif /* remove the dummy inet driver */ @@ -4459,6 +4550,7 @@ static int inet_ctl_ifset(inet_descriptor* desc, char* buf, int len, +#if defined(__WIN32__) || defined(HAVE_GETIFADDRS) /* Latin-1 to utf8 */ static int utf8_len(const char *c, int m) { @@ -4481,6 +4573,7 @@ static void utf8_encode(const char *c, int m, char *p) { } } } +#endif #if defined(__WIN32__) @@ -6736,7 +6829,7 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, 2*LOAD_ATOM_CNT + LOAD_INT_CNT + 2*LOAD_TUPLE_CNT); i = LOAD_ATOM (spec, i, am_sctp_adaptation_layer); i = LOAD_ATOM (spec, i, am_sctp_setadaptation); - i = LOAD_INT (spec, i, ad.ssb_adaptation_ind); + i = LOAD_INT (spec, i, sock_ntohl(ad.ssb_adaptation_ind)); i = LOAD_TUPLE (spec, i, 2); i = LOAD_TUPLE (spec, i, 2); break; @@ -6879,7 +6972,7 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, break; } /* The following option is not available in Solaris 10: */ -# ifdef SCTP_DELAYED_ACK_TIME +# if HAVE_DECL_SCTP_DELAYED_ACK_TIME case SCTP_OPT_DELAYED_ACK_TIME: { struct sctp_assoc_value av; @@ -6926,7 +7019,7 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, switch(st.sstat_state) { /* SCTP_EMPTY is not supported on SOLARIS10: */ -# ifdef SCTP_EMPTY +# if HAVE_DECL_SCTP_EMPTY case SCTP_EMPTY: i = LOAD_ATOM (spec, i, am_empty); break; @@ -6934,14 +7027,16 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, case SCTP_CLOSED: i = LOAD_ATOM (spec, i, am_closed); break; - /* The following states are not supported by Linux Kernel SCTP yet: +# if HAVE_DECL_SCTP_BOUND case SCTP_BOUND: i = LOAD_ATOM (spec, i, am_bound); break; +# endif +# if HAVE_DECL_SCTP_LISTEN case SCTP_LISTEN: i = LOAD_ATOM (spec, i, am_listen); break; - */ +# endif case SCTP_COOKIE_WAIT: i = LOAD_ATOM (spec, i, am_cookie_wait); break; @@ -7032,7 +7127,7 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, driver_send_term(desc->port, driver_caller(desc->port), spec, i); FREE(spec); - (*dest)[0] = INET_REP_SCTP; + (*dest)[0] = INET_REP; return 1; /* Response length */ # undef PLACE_FOR # undef RETURN_ERROR @@ -7807,22 +7902,22 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, static void tcp_close_check(tcp_descriptor* desc) { /* XXX:PaN - multiple clients to handle! */ - if (desc->inet.state == TCP_STATE_ACCEPTING) { + if (desc->inet.state == INET_STATE_ACCEPTING) { inet_async_op *this_op = desc->inet.opt; sock_select(INETP(desc), FD_ACCEPT, 0); - desc->inet.state = TCP_STATE_LISTEN; + desc->inet.state = INET_STATE_LISTENING; if (this_op != NULL) { driver_demonitor_process(desc->inet.port, &(this_op->monitor)); } async_error_am(INETP(desc), am_closed); } - else if (desc->inet.state == TCP_STATE_MULTI_ACCEPTING) { + else if (desc->inet.state == INET_STATE_MULTI_ACCEPTING) { int id,req; ErlDrvTermData caller; ErlDrvMonitor monitor; sock_select(INETP(desc), FD_ACCEPT, 0); - desc->inet.state = TCP_STATE_LISTEN; + desc->inet.state = INET_STATE_LISTENING; while (deq_multi_op(desc,&id,&req,&caller,NULL,&monitor) == 0) { driver_demonitor_process(desc->inet.port, &monitor); send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_closed); @@ -7830,10 +7925,10 @@ static void tcp_close_check(tcp_descriptor* desc) clean_multi_timers(&(desc->mtd), desc->inet.port); } - else if (desc->inet.state == TCP_STATE_CONNECTING) { + else if (desc->inet.state == INET_STATE_CONNECTING) { async_error_am(INETP(desc), am_closed); } - else if (desc->inet.state == TCP_STATE_CONNECTED) { + else if (desc->inet.state == INET_STATE_CONNECTED) { async_error_am_all(INETP(desc), am_closed); } } @@ -7865,40 +7960,62 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, { tcp_descriptor* desc = (tcp_descriptor*)e; switch(cmd) { - case INET_REQ_OPEN: /* open socket and return internal index */ + case INET_REQ_OPEN: { /* open socket and return internal index */ + int domain; DEBUGF(("tcp_inet_ctl(%ld): OPEN\r\n", (long)desc->inet.port)); - if ((len == 1) && (buf[0] == INET_AF_INET)) - return - inet_ctl_open(INETP(desc), AF_INET, SOCK_STREAM, rbuf, rsize); + if (len != 2) return ctl_error(EINVAL, rbuf, rsize); + switch(buf[0]) { + case INET_AF_INET: + domain = AF_INET; + break; #if defined(HAVE_IN6) && defined(AF_INET6) - else if ((len == 1) && (buf[0] == INET_AF_INET6)) - return - inet_ctl_open(INETP(desc), AF_INET6, SOCK_STREAM, rbuf, rsize); + case INET_AF_INET6: + domain = AF_INET6; + break; #else - else if ((len == 1) && (buf[0] == INET_AF_INET6)) - return ctl_xerror("eafnosupport",rbuf,rsize); + case INET_AF_INET6: + return ctl_xerror("eafnosupport", rbuf, rsize); + break; #endif - else + default: return ctl_error(EINVAL, rbuf, rsize); + } + if (buf[1] != INET_TYPE_STREAM) return ctl_error(EINVAL, rbuf, rsize); + return inet_ctl_open(INETP(desc), domain, SOCK_STREAM, rbuf, rsize); + break; + } - case INET_REQ_FDOPEN: /* pass in an open socket */ - DEBUGF(("tcp_inet_ctl(%ld): FDOPEN\r\n", (long)desc->inet.port)); - if ((len == 5) && (buf[0] == INET_AF_INET)) - return inet_ctl_fdopen(INETP(desc), AF_INET, SOCK_STREAM, - (SOCKET) get_int32(buf+1), rbuf, rsize); + case INET_REQ_FDOPEN: { /* pass in an open socket */ + int domain; + DEBUGF(("tcp_inet_ctl(%ld): FDOPEN\r\n", (long)desc->inet.port)); + if (len != 6) return ctl_error(EINVAL, rbuf, rsize); + switch(buf[0]) { + case INET_AF_INET: + domain = AF_INET; + break; #if defined(HAVE_IN6) && defined(AF_INET6) - else if ((len == 5) && (buf[0] == INET_AF_INET6)) - return inet_ctl_fdopen(INETP(desc), AF_INET6, SOCK_STREAM, - (SOCKET) get_int32(buf+1), rbuf, rsize); + case INET_AF_INET6: + domain = AF_INET6; + break; +#else + case INET_AF_INET6: + return ctl_xerror("eafnosupport", rbuf, rsize); + break; #endif - else + default: return ctl_error(EINVAL, rbuf, rsize); + } + if (buf[1] != INET_TYPE_STREAM) return ctl_error(EINVAL, rbuf, rsize); + return inet_ctl_fdopen(INETP(desc), domain, SOCK_STREAM, + (SOCKET) get_int32(buf+2), rbuf, rsize); + break; + } - case TCP_REQ_LISTEN: { /* argument backlog */ + case INET_REQ_LISTEN: { /* argument backlog */ int backlog; DEBUGF(("tcp_inet_ctl(%ld): LISTEN\r\n", (long)desc->inet.port)); - if (desc->inet.state == TCP_STATE_CLOSED) + if (desc->inet.state == INET_STATE_CLOSED) return ctl_xerror(EXBADPORT, rbuf, rsize); if (!IS_OPEN(INETP(desc))) return ctl_xerror(EXBADPORT, rbuf, rsize); @@ -7909,7 +8026,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, backlog = get_int16(buf); if (IS_SOCKET_ERROR(sock_listen(desc->inet.s, backlog))) return ctl_error(sock_errno(), rbuf, rsize); - desc->inet.state = TCP_STATE_LISTEN; + desc->inet.state = INET_STATE_LISTENING; return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } @@ -7945,13 +8062,13 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, ((sock_errno() == ERRNO_BLOCK) || /* Winsock2 */ (sock_errno() == EINPROGRESS))) { /* Unix & OSE!! */ sock_select(INETP(desc), FD_CONNECT, 1); - desc->inet.state = TCP_STATE_CONNECTING; + desc->inet.state = INET_STATE_CONNECTING; if (timeout != INET_INFINITY) driver_set_timer(desc->inet.port, timeout); enq_async(INETP(desc), tbuf, INET_REQ_CONNECT); } else if (code == 0) { /* ok we are connected */ - desc->inet.state = TCP_STATE_CONNECTED; + desc->inet.state = INET_STATE_CONNECTED; if (desc->inet.active) sock_select(INETP(desc), (FD_READ|FD_CLOSE), 1); enq_async(INETP(desc), tbuf, INET_REQ_CONNECT); @@ -7963,7 +8080,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } - case TCP_REQ_ACCEPT: { /* do async accept */ + case INET_REQ_ACCEPT: { /* do async accept */ char tbuf[2]; unsigned timeout; inet_address remote; @@ -7973,14 +8090,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, DEBUGF(("tcp_inet_ctl(%ld): ACCEPT\r\n", (long)desc->inet.port)); /* INPUT: Timeout(4) */ - if ((desc->inet.state != TCP_STATE_LISTEN && desc->inet.state != TCP_STATE_ACCEPTING && - desc->inet.state != TCP_STATE_MULTI_ACCEPTING) || len != 4) { + if ((desc->inet.state != INET_STATE_LISTENING && desc->inet.state != INET_STATE_ACCEPTING && + desc->inet.state != INET_STATE_MULTI_ACCEPTING) || len != 4) { return ctl_error(EINVAL, rbuf, rsize); } timeout = get_int32(buf); - if (desc->inet.state == TCP_STATE_ACCEPTING) { + if (desc->inet.state == INET_STATE_ACCEPTING) { unsigned long time_left = 0; int oid = 0; ErlDrvTermData ocaller = ERL_DRV_NIL; @@ -8009,10 +8126,10 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } - enq_multi_op(desc, tbuf, TCP_REQ_ACCEPT, caller, mtd, &monitor); - desc->inet.state = TCP_STATE_MULTI_ACCEPTING; + enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); + desc->inet.state = INET_STATE_MULTI_ACCEPTING; return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); - } else if (desc->inet.state == TCP_STATE_MULTI_ACCEPTING) { + } else if (desc->inet.state == INET_STATE_MULTI_ACCEPTING) { ErlDrvTermData caller = driver_caller(desc->inet.port); MultiTimerData *mtd = NULL; ErlDrvMonitor monitor; @@ -8024,7 +8141,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } - enq_multi_op(desc, tbuf, TCP_REQ_ACCEPT, caller, mtd, &monitor); + enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } else { n = sizeof(desc->inet.remote); @@ -8036,8 +8153,8 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, &monitor) != 0) { return ctl_xerror("noproc", rbuf, rsize); } - enq_async_w_tmo(INETP(desc), tbuf, TCP_REQ_ACCEPT, timeout, &monitor); - desc->inet.state = TCP_STATE_ACCEPTING; + enq_async_w_tmo(INETP(desc), tbuf, INET_REQ_ACCEPT, timeout, &monitor); + desc->inet.state = INET_STATE_ACCEPTING; sock_select(INETP(desc),FD_ACCEPT,1); if (timeout != INET_INFINITY) { driver_set_timer(desc->inet.port, timeout); @@ -8064,8 +8181,8 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, driver_select(accept_desc->inet.port, accept_desc->inet.event, ERL_DRV_READ, 1); #endif - accept_desc->inet.state = TCP_STATE_CONNECTED; - enq_async(INETP(desc), tbuf, TCP_REQ_ACCEPT); + accept_desc->inet.state = INET_STATE_CONNECTED; + enq_async(INETP(desc), tbuf, INET_REQ_ACCEPT); async_ok_port(INETP(desc), accept_desc->inet.dport); } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -8171,7 +8288,7 @@ static void tcp_inet_timeout(ErlDrvData e) (long)desc->inet.port, desc->inet.s)); if ((state & INET_F_MULTI_CLIENT)) { /* Multi-client always means multi-timers */ fire_multi_timers(&(desc->mtd), desc->inet.port, e); - } else if ((state & TCP_STATE_CONNECTED) == TCP_STATE_CONNECTED) { + } else if ((state & INET_STATE_CONNECTED) == INET_STATE_CONNECTED) { if (desc->busy_on_send) { ASSERT(IS_BUSY(INETP(desc))); desc->inet.caller = desc->inet.busy_caller; @@ -8191,20 +8308,20 @@ static void tcp_inet_timeout(ErlDrvData e) async_error_am(INETP(desc), am_timeout); } } - else if ((state & TCP_STATE_CONNECTING) == TCP_STATE_CONNECTING) { + else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) { /* assume connect timeout */ /* close the socket since it's not usable (see man pages) */ erl_inet_close(INETP(desc)); async_error_am(INETP(desc), am_timeout); } - else if ((state & TCP_STATE_ACCEPTING) == TCP_STATE_ACCEPTING) { + else if ((state & INET_STATE_ACCEPTING) == INET_STATE_ACCEPTING) { inet_async_op *this_op = desc->inet.opt; /* timer is set on accept */ sock_select(INETP(desc), FD_ACCEPT, 0); if (this_op != NULL) { driver_demonitor_process(desc->inet.port, &(this_op->monitor)); } - desc->inet.state = TCP_STATE_LISTEN; + desc->inet.state = INET_STATE_LISTENING; async_error_am(INETP(desc), am_timeout); } DEBUGF(("tcp_inet_timeout(%ld) }\r\n", (long)desc->inet.port)); @@ -8222,7 +8339,7 @@ static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller) driver_demonitor_process(desc->inet.port, &monitor); if (desc->multi_first == NULL) { sock_select(INETP(desc),FD_ACCEPT,0); - desc->inet.state = TCP_STATE_LISTEN; /* restore state */ + desc->inet.state = INET_STATE_LISTENING; /* restore state */ } send_async_error(desc->inet.port, desc->inet.dport, id, caller, am_timeout); } @@ -8288,7 +8405,7 @@ static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) ErlDrvTermData who = driver_get_monitored_process(desc->inet.port,monitorp); int state = desc->inet.state; - if ((state & TCP_STATE_MULTI_ACCEPTING) == TCP_STATE_MULTI_ACCEPTING) { + if ((state & INET_STATE_MULTI_ACCEPTING) == INET_STATE_MULTI_ACCEPTING) { int id,req; MultiTimerData *timeout; if (remove_multi_op(desc, &id, &req, who, &timeout, NULL) != 0) { @@ -8299,15 +8416,15 @@ static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) } if (desc->multi_first == NULL) { sock_select(INETP(desc),FD_ACCEPT,0); - desc->inet.state = TCP_STATE_LISTEN; /* restore state */ + desc->inet.state = INET_STATE_LISTENING; /* restore state */ } - } else if ((state & TCP_STATE_ACCEPTING) == TCP_STATE_ACCEPTING) { + } else if ((state & INET_STATE_ACCEPTING) == INET_STATE_ACCEPTING) { int did,drid; ErlDrvTermData dcaller; deq_async(INETP(desc), &did, &dcaller, &drid); driver_cancel_timer(desc->inet.port); sock_select(INETP(desc),FD_ACCEPT,0); - desc->inet.state = TCP_STATE_LISTEN; /* restore state */ + desc->inet.state = INET_STATE_LISTENING; /* restore state */ } } @@ -8844,8 +8961,8 @@ static void tcp_inet_event(ErlDrvData e, ErlDrvEvent event) /* socket has input: -** 1. TCP_STATE_ACCEPTING => non block accept ? -** 2. TCP_STATE_CONNECTED => read input +** 1. INET_STATE_ACCEPTING => non block accept ? +** 2. INET_STATE_CONNECTED => read input */ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) { @@ -8854,7 +8971,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s)); - if (desc->inet.state == TCP_STATE_ACCEPTING) { + if (desc->inet.state == INET_STATE_ACCEPTING) { SOCKET s; unsigned int len; inet_address remote; @@ -8869,7 +8986,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) } sock_select(INETP(desc),FD_ACCEPT,0); - desc->inet.state = TCP_STATE_LISTEN; /* restore state */ + desc->inet.state = INET_STATE_LISTENING; /* restore state */ if (this_op != NULL) { driver_demonitor_process(desc->inet.port, &(this_op->monitor)); @@ -8909,11 +9026,11 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) driver_select(accept_desc->inet.port, accept_desc->inet.event, ERL_DRV_READ, 1); #endif - accept_desc->inet.state = TCP_STATE_CONNECTED; + accept_desc->inet.state = INET_STATE_CONNECTED; ret = async_ok_port(INETP(desc), accept_desc->inet.dport); goto done; } - } else if (desc->inet.state == TCP_STATE_MULTI_ACCEPTING) { + } else if (desc->inet.state == INET_STATE_MULTI_ACCEPTING) { SOCKET s; unsigned int len; inet_address remote; @@ -8925,7 +9042,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) int times = 0; #endif - while (desc->inet.state == TCP_STATE_MULTI_ACCEPTING) { + while (desc->inet.state == INET_STATE_MULTI_ACCEPTING) { len = sizeof(desc->inet.remote); s = sock_accept(desc->inet.s, (struct sockaddr*) &remote, &len); @@ -8945,7 +9062,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) if (desc->multi_first == NULL) { sock_select(INETP(desc),FD_ACCEPT,0); - desc->inet.state = TCP_STATE_LISTEN; /* restore state */ + desc->inet.state = INET_STATE_LISTENING; /* restore state */ } if (timeout != NULL) { @@ -8976,7 +9093,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) driver_select(accept_desc->inet.port, accept_desc->inet.event, ERL_DRV_READ, 1); #endif - accept_desc->inet.state = TCP_STATE_CONNECTED; + accept_desc->inet.state = INET_STATE_CONNECTED; ret = send_async_ok_port(desc->inet.port, desc->inet.dport, id, caller, accept_desc->inet.dport); } @@ -9254,8 +9371,8 @@ static void tcp_inet_drv_input(ErlDrvData data, ErlDrvEvent event) } /* socket ready for ouput: -** 1. TCP_STATE_CONNECTING => non block connect ? -** 2. TCP_STATE_CONNECTED => write output +** 1. INET_STATE_CONNECTING => non block connect ? +** 2. INET_STATE_CONNECTED => write output */ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) { @@ -9264,7 +9381,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); - if (desc->inet.state == TCP_STATE_CONNECTING) { + if (desc->inet.state == INET_STATE_CONNECTING) { sock_select(INETP(desc),FD_CONNECT,0); driver_cancel_timer(ix); /* posssibly cancel a timer */ @@ -9284,7 +9401,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) (struct sockaddr*) &desc->inet.remote, &sz); if (IS_SOCKET_ERROR(code)) { - desc->inet.state = TCP_STATE_BOUND; /* restore state */ + desc->inet.state = INET_STATE_BOUND; /* restore state */ ret = async_error(INETP(desc), sock_errno()); goto done; } @@ -9297,7 +9414,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) (void *)&error, &sz); if ((code < 0) || error) { - desc->inet.state = TCP_STATE_BOUND; /* restore state */ + desc->inet.state = INET_STATE_BOUND; /* restore state */ ret = async_error(INETP(desc), error); goto done; } @@ -9305,7 +9422,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) #endif /* SO_ERROR */ #endif /* !__WIN32__ */ - desc->inet.state = TCP_STATE_CONNECTED; + desc->inet.state = INET_STATE_CONNECTED; if (desc->inet.active) sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); async_ok(INETP(desc)); @@ -9405,6 +9522,59 @@ static int should_use_so_bsdcompat(void) #endif /* __linux__ */ #endif /* HAVE_SO_BSDCOMPAT */ + + +#ifdef HAVE_SCTP +/* Copy a descriptor, by creating a new port with same settings + * as the descriptor desc. + * return NULL on error (ENFILE no ports avail) + */ +static udp_descriptor* sctp_inet_copy(udp_descriptor* desc, SOCKET s, int* err) +{ + ErlDrvPort port = desc->inet.port; + udp_descriptor* copy_desc; + + copy_desc = (udp_descriptor*) sctp_inet_start(port, NULL); + + /* Setup event if needed */ + if ((copy_desc->inet.s = s) != INVALID_SOCKET) { + if ((copy_desc->inet.event = sock_create_event(INETP(copy_desc))) == + INVALID_EVENT) { + *err = sock_errno(); + FREE(copy_desc); + return NULL; + } + } + + /* Some flags must be inherited at this point */ + copy_desc->inet.mode = desc->inet.mode; + copy_desc->inet.exitf = desc->inet.exitf; + copy_desc->inet.bit8f = desc->inet.bit8f; + copy_desc->inet.deliver = desc->inet.deliver; + copy_desc->inet.htype = desc->inet.htype; + copy_desc->inet.psize = desc->inet.psize; + copy_desc->inet.stype = desc->inet.stype; + copy_desc->inet.sfamily = desc->inet.sfamily; + copy_desc->inet.hsz = desc->inet.hsz; + copy_desc->inet.bufsz = desc->inet.bufsz; + + /* The new port will be linked and connected to the caller */ + port = driver_create_port(port, desc->inet.caller, "sctp_inet", + (ErlDrvData) copy_desc); + if ((long)port == -1) { + *err = ENFILE; + FREE(copy_desc); + return NULL; + } + copy_desc->inet.port = port; + copy_desc->inet.dport = driver_mk_port(port); + *err = 0; + return copy_desc; +} +#endif + + + static int packet_inet_init() { return 0; @@ -9423,6 +9593,9 @@ static ErlDrvData packet_inet_start(ErlDrvPort port, char* args, int protocol) return ERL_DRV_ERROR_ERRNO; desc->read_packets = INET_PACKET_POLL; + desc->i_bufsz = 0; + desc->i_buf = NULL; + desc->i_ptr = NULL; return drvd; } @@ -9447,6 +9620,10 @@ static void packet_inet_stop(ErlDrvData e) */ udp_descriptor * udesc = (udp_descriptor*) e; inet_descriptor* descr = INETP(udesc); + if (udesc->i_buf != NULL) { + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; + } ASSERT(NO_SUBSCRIBERS(&(descr->empty_out_q_subs))); inet_stop(descr); @@ -9471,21 +9648,31 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, udp_descriptor * udesc = (udp_descriptor *) e; inet_descriptor* desc = INETP(udesc); int type = SOCK_DGRAM; - int af; -#ifdef HAVE_SCTP - if (IS_SCTP(desc)) type = SOCK_SEQPACKET; -#endif + int af = AF_INET; switch(cmd) { case INET_REQ_OPEN: /* open socket and return internal index */ DEBUGF(("packet_inet_ctl(%ld): OPEN\r\n", (long)desc->port)); - if (len != 1) { + if (len != 2) { return ctl_error(EINVAL, rbuf, rsize); } switch (buf[0]) { case INET_AF_INET: af = AF_INET; break; #if defined(HAVE_IN6) && defined(AF_INET6) - case INET_AF_INET6: af = AF_INET6; break; + case INET_AF_INET6: af = AF_INET6; break; +#else + case INET_AF_INET6: + return ctl_xerror("eafnosupport", rbuf, rsize); + break; +#endif + default: + return ctl_error(EINVAL, rbuf, rsize); + } + switch (buf[1]) { + case INET_TYPE_STREAM: type = SOCK_STREAM; break; + case INET_TYPE_DGRAM: type = SOCK_DGRAM; break; +#ifdef HAVE_SCTP + case INET_TYPE_SEQPACKET: type = SOCK_SEQPACKET; break; #endif default: return ctl_error(EINVAL, rbuf, rsize); @@ -9512,18 +9699,35 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, return replen; - case INET_REQ_FDOPEN: /* pass in an open (and bound) socket */ + case INET_REQ_FDOPEN: { /* pass in an open (and bound) socket */ + SOCKET s; DEBUGF(("packet inet_ctl(%ld): FDOPEN\r\n", (long)desc->port)); - if ((len == 5) && (buf[0] == INET_AF_INET)) - replen = inet_ctl_fdopen(desc, AF_INET, SOCK_DGRAM, - (SOCKET)get_int32(buf+1),rbuf,rsize); + if (len != 6) { + return ctl_error(EINVAL, rbuf, rsize); + } + switch (buf[0]) { + case INET_AF_INET: af = AF_INET; break; #if defined(HAVE_IN6) && defined(AF_INET6) - else if ((len == 5) && (buf[0] == INET_AF_INET6)) - replen = inet_ctl_fdopen(desc, AF_INET6, SOCK_DGRAM, - (SOCKET)get_int32(buf+1),rbuf,rsize); + case INET_AF_INET6: af = AF_INET6; break; +#else + case INET_AF_INET6: + return ctl_xerror("eafnosupport", rbuf, rsize); + break; #endif - else + default: + return ctl_error(EINVAL, rbuf, rsize); + } + switch (buf[1]) { + case INET_TYPE_STREAM: type = SOCK_STREAM; break; + case INET_TYPE_DGRAM: type = SOCK_DGRAM; break; +#ifdef HAVE_SCTP + case INET_TYPE_SEQPACKET: type = SOCK_SEQPACKET; break; +#endif + default: return ctl_error(EINVAL, rbuf, rsize); + } + s = (SOCKET)get_int32(buf+2); + replen = inet_ctl_fdopen(desc, af, type, s, rbuf, rsize); if ((*rbuf)[0] != INET_REP_ERROR) { if (desc->active) @@ -9543,6 +9747,7 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, #endif } return replen; + } case INET_REQ_CLOSE: @@ -9595,14 +9800,14 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (IS_SOCKET_ERROR(code) && (sock_errno() == EINPROGRESS)) { /* XXX: Unix only -- WinSock would have a different cond! */ - desc->state = SCTP_STATE_CONNECTING; + desc->state = INET_STATE_CONNECTING; if (timeout != INET_INFINITY) driver_set_timer(desc->port, timeout); enq_async(desc, tbuf, INET_REQ_CONNECT); } else if (code == 0) { /* OK we are connected */ sock_select(desc, FD_CONNECT, 0); - desc->state = PACKET_STATE_CONNECTED; + desc->state = INET_STATE_CONNECTED; enq_async(desc, tbuf, INET_REQ_CONNECT); async_ok(desc); } @@ -9648,11 +9853,11 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, } #ifdef HAVE_SCTP - case SCTP_REQ_LISTEN: + case INET_REQ_LISTEN: { /* LISTEN is only for SCTP sockets, not UDP. This code is borrowed from the TCP section. Returns: {ok,[]} on success. */ - int flag; + int backlog; DEBUGF(("packet_inet_ctl(%ld): LISTEN\r\n", (long)desc->port)); if (!IS_SCTP(desc)) @@ -9662,15 +9867,14 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (!IS_BOUND(desc)) return ctl_xerror(EXBADSEQ, rbuf, rsize); - /* The arg is a binary value: 1:enable, 0:disable */ - if (len != 1) + if (len != 2) return ctl_error(EINVAL, rbuf, rsize); - flag = get_int8(buf); + backlog = get_int16(buf); - if (IS_SOCKET_ERROR(sock_listen(desc->s, flag))) + if (IS_SOCKET_ERROR(sock_listen(desc->s, backlog))) return ctl_error(sock_errno(), rbuf, rsize); - desc->state = SCTP_STATE_LISTEN; /* XXX: not used? */ + desc->state = INET_STATE_LISTENING; /* XXX: not used? */ return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } @@ -9716,6 +9920,46 @@ static int packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } + + case SCTP_REQ_PEELOFF: + { + Uint32 assoc_id; + udp_descriptor* new_udesc; + int err; + SOCKET new_socket; + + DEBUGF(("packet_inet_ctl(%ld): PEELOFF\r\n", (long)desc->port)); + if (!IS_SCTP(desc)) + return ctl_xerror(EXBADPORT, rbuf, rsize); + if (!IS_OPEN(desc)) + return ctl_xerror(EXBADPORT, rbuf, rsize); + if (!IS_BOUND(desc)) + return ctl_xerror(EXBADSEQ, rbuf, rsize); + if (! p_sctp_peeloff) + return ctl_error(ENOTSUP, rbuf, rsize); + + if (len != 4) + return ctl_error(EINVAL, rbuf, rsize); + assoc_id = get_int32(buf); + + new_socket = p_sctp_peeloff(desc->s, assoc_id); + if (IS_SOCKET_ERROR(new_socket)) { + return ctl_error(sock_errno(), rbuf, rsize); + } + + desc->caller = driver_caller(desc->port); + if ((new_udesc = sctp_inet_copy(udesc, new_socket, &err)) == NULL) { + sock_close(new_socket); + desc->caller = 0; + return ctl_error(err, rbuf, rsize); + } + new_udesc->inet.state = INET_STATE_CONNECTED; + new_udesc->inet.stype = SOCK_STREAM; + + inet_reply_ok_port(desc, new_udesc->inet.dport); + (*rbuf)[0] = INET_REP; + return 1; + } #endif /* HAVE_SCTP */ case PACKET_REQ_RECV: @@ -9914,12 +10158,8 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) { inet_descriptor* desc = INETP(udesc); int n; - unsigned int len; inet_address other; char abuf[sizeof(inet_address)]; /* buffer address; enough??? */ - int sz; - char* ptr; - ErlDrvBinary* buf; /* binary */ int packet_count = udesc->read_packets; int count = 0; /* number of packets delivered to owner */ #ifdef HAVE_SCTP @@ -9930,23 +10170,39 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) #endif while(packet_count--) { - len = sizeof(other); - sz = desc->bufsz; - /* Allocate space for message and address. NB: "bufsz" is in "desc", - but the "buf" itself is allocated separately: - */ - if ((buf = alloc_buffer(sz+len)) == NULL) - return packet_error(udesc, ENOMEM); - ptr = buf->orig_bytes + len; /* pointer to message part */ + unsigned int len = sizeof(other); + + /* udesc->i_buf is only kept between SCTP fragments */ + if (udesc->i_buf == NULL) { + udesc->i_bufsz = desc->bufsz + len; + if ((udesc->i_buf = alloc_buffer(udesc->i_bufsz)) == NULL) + return packet_error(udesc, ENOMEM); + /* pointer to message start */ + udesc->i_ptr = udesc->i_buf->orig_bytes + len; + } else { + ErlDrvBinary* tmp; + int bufsz; + bufsz = desc->bufsz + (udesc->i_ptr - udesc->i_buf->orig_bytes); + if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) == NULL) { + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; + return packet_error(udesc, ENOMEM); + } else { + udesc->i_ptr = + tmp->orig_bytes + (udesc->i_ptr - udesc->i_buf->orig_bytes); + udesc->i_buf = tmp; + udesc->i_bufsz = bufsz; + } + } /* Note: On Windows NT, recvfrom() fails if the socket is connected. */ #ifdef HAVE_SCTP /* For SCTP we must use recvmsg() */ if (IS_SCTP(desc)) { - iov->iov_base = ptr; /* Data will come here */ - iov->iov_len = sz; /* Remaining buffer space */ + iov->iov_base = udesc->i_ptr; /* Data will come here */ + iov->iov_len = desc->bufsz; /* Remaining buffer space */ - mhdr.msg_name = &other; /* Peer addr comes into "other" */ + mhdr.msg_name = &other; /* Peer addr comes into "other" */ mhdr.msg_namelen = len; mhdr.msg_iov = iov; mhdr.msg_iovlen = 1; @@ -9956,42 +10212,28 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) /* Do the actual SCTP receive: */ n = sock_recvmsg(desc->s, &mhdr, 0); + len = mhdr.msg_namelen; goto check_result; } #endif /* Use recv() instead on connected sockets. */ if ((desc->state & INET_F_ACTIVE)) { - n = sock_recv(desc->s, ptr, sz, 0); + n = sock_recv(desc->s, udesc->i_ptr, desc->bufsz, 0); other = desc->remote; + goto check_result; } - else - n = sock_recvfrom(desc->s, ptr, sz, 0, &other.sa, &len); - -#ifdef HAVE_SCTP + n = sock_recvfrom(desc->s, udesc->i_ptr, desc->bufsz, + 0, &other.sa, &len); check_result: -#endif /* Analyse the result: */ - if (IS_SOCKET_ERROR(n) -#ifdef HAVE_SCTP - || (short_recv = (IS_SCTP(desc) && !(mhdr.msg_flags & MSG_EOR))) - /* NB: here we check for EOR not being set -- this is an error as - well, we don't support partial msgs: - */ -#endif - ) { + if (IS_SOCKET_ERROR(n)) { int err = sock_errno(); - release_buffer(buf); if (err != ERRNO_BLOCK) { + /* real error */ + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; if (!desc->active) { -#ifdef HAVE_SCTP - if (short_recv) { - async_error_am(desc, am_short_recv); - } else { - async_error(desc, err); - } -#else async_error(desc, err); -#endif driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); } @@ -9999,46 +10241,69 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) /* This is for an active desc only: */ packet_error_message(udesc, err); } + return count; } - else if (!desc->active) + /* would block error - try again */ + if (!desc->active +#ifdef HAVE_SCTP + || short_recv +#endif + ) { sock_select(desc,FD_READ,1); + } return count; /* strange, not ready */ } - else { - int offs; - int nsz; + +#ifdef HAVE_SCTP + if (IS_SCTP(desc) && (short_recv = !(mhdr.msg_flags & MSG_EOR))) { + /* SCTP non-final message fragment */ + inet_input_count(desc, n); + udesc->i_ptr += n; + continue; /* wait for more fragments */ + } +#endif + + { + /* message received */ int code; - unsigned int alen = len; void * extra = NULL; + char * ptr; inet_input_count(desc, n); - inet_get_address(desc->sfamily, abuf, &other, &alen); - /* Copy formatted address to the buffer allocated; "alen" is the - actual length which must be <= than the original reserved "len". + udesc->i_ptr += n; + inet_get_address(desc->sfamily, abuf, &other, &len); + /* Copy formatted address to the buffer allocated; "len" is the + actual length which must be <= than the original reserved. This means that the addr + data in the buffer are contiguous, - but they may start not at the "orig_bytes", but with some "offs" - from them: + but they may start not at the "orig_bytes", instead at "ptr": */ - ASSERT (alen <= len); - sys_memcpy(ptr - alen, abuf, alen); - ptr -= alen; - nsz = n + alen; /* nsz = data + address */ - offs = ptr - buf->orig_bytes; /* initial pointer offset */ + ASSERT (len <= sizeof(other)); + ptr = udesc->i_buf->orig_bytes + sizeof(other) - len; + sys_memcpy(ptr, abuf, len); /* Check if we need to reallocate binary */ if ((desc->mode == INET_MODE_BINARY) && - (desc->hsz < n) && (nsz < BIN_REALLOC_LIMIT(sz))) { + (desc->hsz < (udesc->i_ptr - ptr)) && + ((udesc->i_ptr - ptr) + BIN_REALLOC_MARGIN(desc->bufsz) >= + udesc->i_bufsz)) { ErlDrvBinary* tmp; - if ((tmp = realloc_buffer(buf,nsz+offs)) != NULL) - buf = tmp; + int bufsz; + bufsz = udesc->i_ptr - udesc->i_buf->orig_bytes; + if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) != NULL) { + udesc->i_buf = tmp; + udesc->i_bufsz = bufsz; + } } #ifdef HAVE_SCTP if (IS_SCTP(desc)) extra = &mhdr; #endif /* Actual parsing and return of the data received, occur here: */ - code = packet_reply_binary_data(desc, (unsigned int)alen, - buf, offs, nsz, extra); - free_buffer(buf); + code = packet_reply_binary_data(desc, len, udesc->i_buf, + ptr - udesc->i_buf->orig_bytes, + udesc->i_ptr - ptr, + extra); + free_buffer(udesc->i_buf); + udesc->i_buf = NULL; if (code < 0) return count; count++; @@ -10048,7 +10313,17 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) return count; /* passive mode (read one packet only) */ } } + } /* while(packet_count--) { */ + + /* we ran out of tries (packet_count) either on an active socket + * that got that many messages or an SCTP socket that got that + * many message fragments but still not the final + */ +#ifdef HAVE_SCTP + if (short_recv) { + sock_select(desc, FD_READ, 1); } +#endif return count; } @@ -10058,7 +10333,7 @@ static void packet_inet_drv_output(ErlDrvData e, ErlDrvEvent event) } /* UDP/SCTP socket ready for output: -** This is a Back-End for Non-Block SCTP Connect (SCTP_STATE_CONNECTING) +** This is a Back-End for Non-Block SCTP Connect (INET_STATE_CONNECTING) */ static int packet_inet_output(udp_descriptor* udesc, HANDLE event) { @@ -10069,7 +10344,7 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event) DEBUGF(("packet_inet_output(%ld) {s=%d\r\n", (long)desc->port, desc->s)); - if (desc->state == SCTP_STATE_CONNECTING) { + if (desc->state == INET_STATE_CONNECTING) { sock_select(desc, FD_CONNECT, 0); driver_cancel_timer(ix); /* posssibly cancel a timer */ @@ -10089,7 +10364,7 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event) (struct sockaddr*) &desc->remote, &sz); if (IS_SOCKET_ERROR(code)) { - desc->state = PACKET_STATE_BOUND; /* restore state */ + desc->state = INET_STATE_BOUND; /* restore state */ ret = async_error(desc, sock_errno()); goto done; } @@ -10102,7 +10377,7 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event) (void *)&error, &sz); if ((code < 0) || error) { - desc->state = PACKET_STATE_BOUND; /* restore state */ + desc->state = INET_STATE_BOUND; /* restore state */ ret = async_error(desc, error); goto done; } @@ -10110,7 +10385,7 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event) #endif /* SO_ERROR */ #endif /* !__WIN32__ */ - desc->state = PACKET_STATE_CONNECTED; + desc->state = INET_STATE_CONNECTED; async_ok(desc); } else { |