diff options
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 455 |
1 files changed, 394 insertions, 61 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index ee3b9f2a98..56a16a87a1 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -429,6 +429,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_STATE_CONNECTING (SOCKET_STATE_OPEN | SOCKET_FLAG_CON) #define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC) #define SOCKET_STATE_CLOSING (SOCKET_FLAG_CLOSE) +#define SOCKET_STATE_DTOR (0xFFFF) #define IS_CLOSED(d) \ ((d)->state == SOCKET_STATE_CLOSED) @@ -485,6 +486,9 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_OPT_VALUE_TYPE_INT 1 #define SOCKET_OPT_VALUE_TYPE_BOOL 2 +#define ESOCK_DESC_PATTERN_CREATED 0x03030303 +#define ESOCK_DESC_PATTERN_DTOR 0xC0C0C0C0 + typedef union { struct { // 0 = not open, 1 = open @@ -528,6 +532,7 @@ typedef union { #define SOCKET_TYPE_SEQPACKET 5 /* protocol */ +#define SOCKET_PROTOCOL_DEFAULT 0 #define SOCKET_PROTOCOL_IP 1 #define SOCKET_PROTOCOL_TCP 2 #define SOCKET_PROTOCOL_UDP 3 @@ -658,6 +663,10 @@ typedef union { #define SOCKET_SUPPORTS_OPTIONS 0x0001 #define SOCKET_SUPPORTS_SCTP 0x0002 #define SOCKET_SUPPORTS_IPV6 0x0003 +#define SOCKET_SUPPORTS_LOCAL 0x0004 + +#define ESOCK_WHICH_PROTO_ERROR -1 +#define ESOCK_WHICH_PROTO_UNSUP -2 @@ -803,6 +812,14 @@ typedef struct { typedef struct { + /* + * +++ This is a way to, possibly, detect memory overrides "and stuff" +++ + * + * We have two patterns. One is set when the descriptor is created (allocated) + * and one is set when the descriptor is dtor'ed. + */ + Uint32 pattern; + /* +++ The actual socket +++ */ SOCKET sock; HANDLE event; @@ -820,6 +837,10 @@ typedef struct { ErlNifPid ctrlPid; ESockMonitor ctrlMon; + /* +++ Connector process +++ */ + ErlNifPid connPid; + ESockMonitor connMon; + /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; ESockRequestor currentWriter; @@ -994,12 +1015,15 @@ static ERL_NIF_TERM nsupports_options_udp(ErlNifEnv* env); static ERL_NIF_TERM nsupports_options_sctp(ErlNifEnv* env); static ERL_NIF_TERM nsupports_sctp(ErlNifEnv* env); static ERL_NIF_TERM nsupports_ipv6(ErlNifEnv* env); +static ERL_NIF_TERM nsupports_local(ErlNifEnv* env); static ERL_NIF_TERM nopen(ErlNifEnv* env, int domain, int type, int protocol, char* netns); +static BOOLEAN_T nopen_which_protocol(SOCKET sock, int* proto); + static ERL_NIF_TERM nbind(ErlNifEnv* env, ESockDescriptor* descP, ESockAddress* sockAddrP, @@ -2369,6 +2393,8 @@ static int socket_setopt(int sock, const void* optVal, const socklen_t optLen); +static BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP); static BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err); static ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -2644,6 +2670,7 @@ static char str_exsend[] = "exsend"; // failed send GLOBAL_ATOM_DECL(ctrunc); \ GLOBAL_ATOM_DECL(data); \ GLOBAL_ATOM_DECL(debug); \ + GLOBAL_ATOM_DECL(default); \ GLOBAL_ATOM_DECL(default_send_params); \ GLOBAL_ATOM_DECL(delayed_ack_time); \ GLOBAL_ATOM_DECL(dgram); \ @@ -3046,6 +3073,9 @@ ERL_NIF_TERM nif_info(ErlNifEnv* env, * {tcp, [{Opt, boolean()}]}, * {udp, [{Opt, boolean()}]}, * {sctp, [{Opt, boolean()}]}] + * sctp boolean() + * ipv6 boolean() + * local boolean() */ static @@ -3073,13 +3103,10 @@ ERL_NIF_TERM nif_supports(ErlNifEnv* env, -/* nopen - create an endpoint for communication +/* nsupports - what features do we support * - * Assumes the input has been validated. - * - * Normally we want debugging on (individual) sockets to be controlled - * by the sockets own debug flag. But since we don't even have a socket - * yet, we must use the global debug flag. + * This is to prove information about what features actually + * work on the current platform. */ #if !defined(__WIN32__) static @@ -3102,6 +3129,10 @@ ERL_NIF_TERM nsupports(ErlNifEnv* env, int key) result = nsupports_ipv6(env); break; + case SOCKET_SUPPORTS_LOCAL: + result = nsupports_local(env); + break; + default: result = esock_atom_false; break; @@ -4004,7 +4035,7 @@ ERL_NIF_TERM nsupports_options_tcp(ErlNifEnv* env) /* *** SOCKET_OPT_TCP_MAXSEG => TCP_MAXSEG *** */ -#if defined(TCP_) +#if defined(TCP_MAXSEG) tmp = MKT2(env, esock_atom_maxseg, esock_atom_true); #else tmp = MKT2(env, esock_atom_maxseg, esock_atom_false); @@ -4018,7 +4049,7 @@ ERL_NIF_TERM nsupports_options_tcp(ErlNifEnv* env) /* *** SOCKET_OPT_TCP_NODELAY => TCP_NODELAY *** */ -#if defined(TCP_) +#if defined(TCP_NODELAY) tmp = MKT2(env, esock_atom_nodelay, esock_atom_true); #else tmp = MKT2(env, esock_atom_nodelay, esock_atom_false); @@ -4324,6 +4355,24 @@ ERL_NIF_TERM nsupports_ipv6(ErlNifEnv* env) +#if !defined(__WIN32__) +static +ERL_NIF_TERM nsupports_local(ErlNifEnv* env) +{ + ERL_NIF_TERM supports; + +#if defined(AF_LOCAL) + supports = esock_atom_true; +#else + supports = esock_atom_false; +#endif + + return supports; +} +#endif + + + /* ---------------------------------------------------------------------- * nif_open * @@ -4421,6 +4470,7 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env, * yet, we must use the global debug flag. */ #if !defined(__WIN32__) + static ERL_NIF_TERM nopen(ErlNifEnv* env, int domain, int type, int protocol, @@ -4428,7 +4478,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, { ESockDescriptor* descP; ERL_NIF_TERM res; - int save_errno = 0; + int proto = protocol, save_errno = 0; SOCKET sock; HANDLE event; #ifdef HAVE_SETNS @@ -4448,11 +4498,35 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, return esock_make_error_errno(env, save_errno); #endif - if ((sock = sock_open(domain, type, protocol)) == INVALID_SOCKET) + if ((sock = sock_open(domain, type, proto)) == INVALID_SOCKET) return esock_make_error_errno(env, sock_errno()); SGDBG( ("SOCKET", "nopen -> open success: %d\r\n", sock) ); + + /* NOTE that if the protocol = 0 (default) and the domain is not + * local (AF_LOCAL) we need to explicitly get the protocol here! + */ + + if ((proto == 0) +#if defined(AF_LOCAL) + && (domain != AF_LOCAL) +#endif + ) + if (!nopen_which_protocol(sock, &proto)) { + if (proto == ESOCK_WHICH_PROTO_ERROR) { + save_errno = sock_errno(); + while ((sock_close(sock) == INVALID_SOCKET) && + (sock_errno() == EINTR)); + return esock_make_error_errno(env, save_errno); + } else { + while ((sock_close(sock) == INVALID_SOCKET) && + (sock_errno() == EINTR)); + return esock_make_error(env, esock_atom_eafnosupport); + } + } + + #ifdef HAVE_SETNS if ((netns != NULL) && !restore_network_namespace(current_ns, sock, &save_errno)) @@ -4484,7 +4558,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, descP->state = SOCKET_STATE_OPEN; descP->domain = domain; descP->type = type; - descP->protocol = protocol; + descP->protocol = proto; /* Does this apply to other types? Such as RAW? * Also, is this really correct? Should we not wait for bind? @@ -4521,6 +4595,32 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, return esock_make_ok2(env, res); } + + +static +BOOLEAN_T nopen_which_protocol(SOCKET sock, int* proto) +{ +#if defined(SO_PROTOCOL) + int val; + SOCKOPTLEN_T valSz = sizeof(val); + int res; + + res = sock_getopt(sock, SOL_SOCKET, SO_PROTOCOL, &val, &valSz); + + if (res != 0) { + *proto = ESOCK_WHICH_PROTO_ERROR; + return FALSE; + } else { + *proto = val; + return TRUE; + } +#else + *proto = ESOCK_WHICH_PROTO_UNSUP; + return FALSE; +#endif +} + + #endif // if !defined(__WIN32__) @@ -4813,17 +4913,17 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, return esock_make_error(env, atom_closed); if (!IS_OPEN(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") ); + SSDBG( descP, ("SOCKET", "nconnect -> not open\r\n") ); return esock_make_error(env, atom_exbadstate); } if (IS_CONNECTED(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> already connected\r\n") ); + SSDBG( descP, ("SOCKET", "nconnect -> already connected\r\n") ); return esock_make_error(env, atom_eisconn); } - if (IS_CONNECTING(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> already connecting\r\n") ); + if (IS_CONNECTING(descP) && !is_connector(env, descP)) { + SSDBG( descP, ("SOCKET", "nconnect -> already connecting\r\n") ); return esock_make_error(env, esock_atom_einval); } @@ -4837,31 +4937,93 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, descP->addrLen); save_errno = sock_errno(); - SSDBG( descP, ("SOCKET", "nif_connect -> connect result: %d, %d\r\n", + SSDBG( descP, ("SOCKET", "nconnect -> connect result: %d, %d\r\n", code, save_errno) ); - if (IS_SOCKET_ERROR(code) && - ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ - (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ - ref = MKREF(env); - descP->state = SOCKET_STATE_CONNECTING; - if ((sres = esock_select_write(env, descP->sock, descP, NULL, - sockRef, ref)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } else { - res = esock_make_ok2(env, ref); + if (IS_SOCKET_ERROR(code)) { + switch (save_errno) { + case ERRNO_BLOCK: /* Winsock2 */ + case EINPROGRESS: /* Unix & OSE!! */ + SSDBG( descP, ("SOCKET", "nconnect -> would block => select\r\n") ); + + ref = MKREF(env); + + if (IS_CONNECTING(descP)) { + /* Glitch */ + res = esock_make_ok2(env, ref); + } else { + + /* First time here */ + + if (enif_self(env, &descP->connPid) == NULL) + return esock_make_error(env, atom_exself); + + if (MONP("nconnect -> conn", + env, descP, + &descP->connPid, + &descP->connMon) != 0) + return esock_make_error(env, atom_exmon); + + descP->state = SOCKET_STATE_CONNECTING; + + if ((sres = esock_select_write(env, descP->sock, descP, NULL, + sockRef, ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_ok2(env, ref); + } + } + break; + + case EISCONN: + SSDBG( descP, ("SOCKET", "nconnect -> *already* connected\r\n") ); + { + /* This is ***strange*** so make sure */ + int err = 0; + if (!verify_is_connected(descP, &err)) { + descP->state = SOCKET_STATE_OPEN; /* restore state */ + res = esock_make_error_errno(env, err); + } else { + descP->state = SOCKET_STATE_CONNECTED; + /* And just to be on the safe side, reset these */ + enif_set_pid_undefined(&descP->connPid); + DEMONP("nconnect -> connected", + env, descP, &descP->connMon); + descP->isReadable = TRUE; + descP->isWritable = TRUE; + res = esock_atom_ok; + } + } + break; + + default: + SSDBG( descP, ("SOCKET", "nconnect -> other error(1): %d\r\n", + save_errno) ); + res = esock_make_error_errno(env, save_errno); + break; } + } else if (code == 0) { /* ok we are connected */ + SSDBG( descP, ("SOCKET", "nconnect -> connected\r\n") ); + descP->state = SOCKET_STATE_CONNECTED; + enif_set_pid_undefined(&descP->connPid); + DEMONP("nconnect -> connected", env, descP, &descP->connMon); descP->isReadable = TRUE; descP->isWritable = TRUE; res = esock_atom_ok; + } else { + /* Do we really need this case? */ + + SSDBG( descP, ("SOCKET", "nconnect -> other error(2): %d\r\n", + save_errno) ); + res = esock_make_error_errno(env, save_errno); } @@ -4916,7 +5078,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, { int error; - if (descP->state != SOCKET_STATE_CONNECTING) + if (!IS_CONNECTING(descP)) return esock_make_error(env, atom_enotconn); if (!verify_is_connected(descP, &error)) { @@ -4925,6 +5087,8 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, } descP->state = SOCKET_STATE_CONNECTED; + enif_set_pid_undefined(&descP->connPid); + DEMONP("nfinalize_connection -> connected", env, descP, &descP->connMon); descP->isReadable = TRUE; descP->isWritable = TRUE; @@ -4987,6 +5151,29 @@ BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err) +/* *** is_connector *** + * Check if the current process is the connector process. + */ +#if !defined(__WIN32__) +static +BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return FALSE; + + if (COMPARE_PIDS(&descP->connPid, &caller) == 0) + return TRUE; + else + return FALSE; + +} +#endif + + + /* ---------------------------------------------------------------------- * nif_listen * @@ -5253,6 +5440,7 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, enif_set_pid_undefined(&descP->currentAcceptor.pid); res = esock_make_error(env, atom_exmon); } else { + ESOCK_ASSERT(!descP->currentAcceptor.env); descP->currentAcceptor.env = esock_alloc_env("current acceptor"); descP->currentAcceptor.ref = CP_TERM(descP->currentAcceptor.env, accRef); @@ -5411,6 +5599,7 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, esock_free_env("naccept_accepting_current_accept - " "current-accept-env", descP->currentAcceptor.env); + descP->currentAcceptor.env = NULL; if (!activate_next_acceptor(env, descP, sockRef)) { @@ -5422,6 +5611,7 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, descP->state = SOCKET_STATE_LISTENING; descP->currentAcceptorP = NULL; + ESOCK_ASSERT(!descP->currentAcceptor.env); descP->currentAcceptor.env = NULL; MON_INIT(&descP->currentAcceptor.mon); } @@ -5448,6 +5638,7 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, ESockRequestor req; ERL_NIF_TERM res, reason; + req.env = NULL; if (save_errno == ERRNO_BLOCK) { /* @@ -5475,6 +5666,7 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, req.pid) ); esock_send_abort_msg(env, sockRef, req.ref, req.env, reason, &req.pid); + req.env = NULL; DEMONP("naccept_accepting_current_error -> pop'ed writer", env, descP, &req.mon); } @@ -8860,35 +9052,67 @@ ERL_NIF_TERM nsetopt_lvl_ip_update_membership(ErlNifEnv* env, #endif // It must be a map - if (!IS_MAP(env, eVal)) + if (!IS_MAP(env, eVal)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "value *not* a map\r\n") ); return enif_make_badarg(env); + } // It must have atleast two attributes - if (!enif_get_map_size(env, eVal, &sz) || (sz >= 2)) + if (!enif_get_map_size(env, eVal, &sz) || (sz < 2)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "invalid map value: %T\r\n", eVal) ); return enif_make_badarg(env); + } - if (!GET_MAP_VAL(env, eVal, atom_multiaddr, &eMultiAddr)) + if (!GET_MAP_VAL(env, eVal, atom_multiaddr, &eMultiAddr)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed get multiaddr (map) attribute\r\n") ); return enif_make_badarg(env); + } - if (!GET_MAP_VAL(env, eVal, atom_interface, &eInterface)) + if (!GET_MAP_VAL(env, eVal, atom_interface, &eInterface)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed get interface (map) attribute\r\n") ); return enif_make_badarg(env); + } if ((xres = esock_decode_ip4_address(env, eMultiAddr, - &mreq.imr_multiaddr)) != NULL) + &mreq.imr_multiaddr)) != NULL) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed decode multiaddr %T: %s\r\n", eMultiAddr, xres) ); return esock_make_error_str(env, xres); + } if ((xres = esock_decode_ip4_address(env, eInterface, - &mreq.imr_interface)) != NULL) + &mreq.imr_interface)) != NULL) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed decode interface %T: %s\r\n", eInterface, xres) ); return esock_make_error_str(env, xres); + } res = socket_setopt(descP->sock, level, opt, &mreq, sizeof(mreq)); - if (res != 0) - result = esock_make_error_errno(env, sock_errno()); - else + if (res != 0) { + int save_errno = sock_errno(); + + result = esock_make_error_errno(env, save_errno); + + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed setopt: %T (%d)\r\n", result, save_errno) ); + + } else { result = esock_atom_ok; + } return result; } @@ -9492,33 +9716,65 @@ ERL_NIF_TERM nsetopt_lvl_ipv6_update_membership(ErlNifEnv* env, #endif // It must be a map - if (!IS_MAP(env, eVal)) + if (!IS_MAP(env, eVal)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "value *not* a map\r\n") ); return enif_make_badarg(env); + } // It must have atleast two attributes - if (!enif_get_map_size(env, eVal, &sz) || (sz >= 2)) + if (!enif_get_map_size(env, eVal, &sz) || (sz < 2)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "invalid map value: %T\r\n", eVal) ); return enif_make_badarg(env); + } - if (!GET_MAP_VAL(env, eVal, atom_multiaddr, &eMultiAddr)) + if (!GET_MAP_VAL(env, eVal, atom_multiaddr, &eMultiAddr)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "failed get multiaddr (map) attribute\r\n") ); return enif_make_badarg(env); + } - if (!GET_MAP_VAL(env, eVal, atom_interface, &eInterface)) + if (!GET_MAP_VAL(env, eVal, atom_interface, &eInterface)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "failed get interface (map) attribute\r\n") ); return enif_make_badarg(env); + } if ((xres = esock_decode_ip6_address(env, eMultiAddr, - &mreq.ipv6mr_multiaddr)) != NULL) + &mreq.ipv6mr_multiaddr)) != NULL) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "failed decode multiaddr %T: %s\r\n", eMultiAddr, xres) ); return esock_make_error_str(env, xres); + } - if (!GET_UINT(env, eInterface, &mreq.ipv6mr_interface)) + if (!GET_UINT(env, eInterface, &mreq.ipv6mr_interface)) { + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ip_update_membership -> " + "failed decode interface %T: %s\r\n", eInterface, xres) ); return esock_make_error(env, esock_atom_einval); + } res = socket_setopt(descP->sock, level, opt, &mreq, sizeof(mreq)); - if (res != 0) - result = esock_make_error_errno(env, sock_errno()); - else + if (res != 0) { + int save_errno = sock_errno(); + + result = esock_make_error_errno(env, save_errno); + + SSDBG( descP, + ("SOCKET", "nsetopt_lvl_ipv6_update_membership -> " + "failed setopt: %T (%d)\r\n", result, save_errno) ); + + } else { result = esock_atom_ok; + } return result; } @@ -10890,7 +11146,15 @@ ERL_NIF_TERM ngetopt_otp_protocol(ErlNifEnv* env, switch (val) { case IPPROTO_IP: +#if defined(AF_LOCAL) + if (descP->domain == AF_LOCAL) { + result = esock_make_ok2(env, esock_atom_default); + } else { + result = esock_make_ok2(env, esock_atom_ip); + } +#else result = esock_make_ok2(env, esock_atom_ip); +#endif break; case IPPROTO_TCP: @@ -11462,7 +11726,14 @@ ERL_NIF_TERM ngetopt_lvl_sock_protocol(ErlNifEnv* env, } else { switch (val) { case IPPROTO_IP: +#if defined(AF_LOCAL) + if (descP->domain == AF_LOCAL) + result = esock_make_ok2(env, esock_atom_default); + else + result = esock_make_ok2(env, esock_atom_ip); +#else result = esock_make_ok2(env, esock_atom_ip); +#endif break; case IPPROTO_TCP: @@ -14037,6 +14308,7 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, DEMONP("send_check_ok -> current writer", env, descP, &descP->currentWriter.mon); esock_free_env("send_check_ok", descP->currentWriter.env); + descP->currentWriter.env = NULL; } SSDBG( descP, @@ -14049,6 +14321,7 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, if (!activate_next_writer(env, descP, sockRef)) { descP->currentWriterP = NULL; + ESOCK_ASSERT(!descP->currentWriter.env); descP->currentWriter.env = NULL; descP->currentWriter.ref = esock_atom_undefined; enif_set_pid_undefined(&descP->currentWriter.pid); @@ -14074,6 +14347,7 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, ESockRequestor req; ERL_NIF_TERM reason; + req.env = NULL; cnt_inc(&descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) ); @@ -14090,6 +14364,7 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) ); esock_send_abort_msg(env, sockRef, req.ref, req.env, reason, &req.pid); + req.env = NULL; DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon); } } @@ -14131,6 +14406,7 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, enif_set_pid_undefined(&descP->currentWriter.pid); return esock_make_error(env, atom_exmon); } else { + ESOCK_ASSERT(!descP->currentWriter.env); descP->currentWriter.env = esock_alloc_env("current-writer"); descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); descP->currentWriterP = &descP->currentWriter; @@ -14260,7 +14536,7 @@ char* recv_init_current_reader(ErlNifEnv* env, enif_set_pid_undefined(&descP->currentReader.pid); return str_exmon; } else { - + ESOCK_ASSERT(!descP->currentReader.env); descP->currentReader.env = esock_alloc_env("current-reader"); descP->currentReader.ref = CP_TERM(descP->currentReader.env, recvRef); @@ -14341,6 +14617,7 @@ void recv_error_current_reader(ErlNifEnv* env, { ESockRequestor req; + req.env = NULL; if (descP->currentReaderP != NULL) { DEMONP("recv_error_current_reader -> current reader", @@ -14352,6 +14629,7 @@ void recv_error_current_reader(ErlNifEnv* env, req.pid) ); esock_send_abort_msg(env, sockRef, req.ref, req.env, reason, &req.pid); + req.env = NULL; DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &req.mon); } @@ -15123,8 +15401,8 @@ char* encode_msghdr(ErlNifEnv* env, "\r\n read: %d" "\r\n", read) ); - /* The address is not used if we are connected, - * so check (length = 0) before we try to encodel + /* The address is not used if we are connected (unless, maybe, + * family is 'local'), so check (length = 0) before we try to encodel */ if (msgHdrP->msg_namelen != 0) { if ((xres = esock_encode_sockaddr(env, @@ -16246,8 +16524,6 @@ char* encode_cmsghdr_data_ipv6(ErlNifEnv* env, size_t dataLen, ERL_NIF_TERM* eCMsgHdrData) { - char* xres; - switch (type) { #if defined(IPV6_PKTINFO) case IPV6_PKTINFO: @@ -16255,6 +16531,7 @@ char* encode_cmsghdr_data_ipv6(ErlNifEnv* env, struct in6_pktinfo* pktInfoP = (struct in6_pktinfo*) dataP; ERL_NIF_TERM ifIndex = MKI(env, pktInfoP->ipi6_ifindex); ERL_NIF_TERM addr; + char* xres; if ((xres = esock_encode_ip6_address(env, &pktInfoP->ipi6_addr, @@ -16764,8 +17041,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) if ((descP = enif_alloc_resource(sockets, sizeof(ESockDescriptor))) != NULL) { char buf[64]; /* Buffer used for building the mutex name */ - // This needs to be released when the socket is closed! - // descP->env = enif_alloc_env(); + descP->pattern = ESOCK_DESC_PATTERN_CREATED; + + enif_set_pid_undefined(&descP->connPid); + MON_INIT(&descP->connMon); sprintf(buf, "esock[w,%d]", sock); descP->writeMtx = MCREATE(buf); @@ -17027,6 +17306,10 @@ BOOLEAN_T eproto2proto(ErlNifEnv* env, } switch (ep) { + case SOCKET_PROTOCOL_DEFAULT: + *proto = 0; // default - note that _IP also has the value 0... + break; + case SOCKET_PROTOCOL_IP: *proto = IPPROTO_IP; break; @@ -17647,6 +17930,7 @@ int esock_select_cancel(ErlNifEnv* env, esock_send_abort_msg(env, sockRef, \ reqP->ref, reqP->env, \ reason, &reqP->pid); \ + reqP->env = NULL; \ \ } else { \ \ @@ -17745,7 +18029,7 @@ REQ_SEARCH4PID_FUNCS reqP->pid = pid; \ if (MONP("reader_push -> " #F " request", \ env, descP, &pid, &reqP->mon) != 0) { \ - FREE(reqP); \ + FREE(e); \ return esock_make_error(env, atom_exmon); \ } \ reqP->env = esock_alloc_env(#F "_push"); \ @@ -17822,6 +18106,9 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q, { ESockRequestQueueElement* e = qpop(q); + if (reqP->env) + esock_free_env("requestor_pop", reqP->env); + if (e != NULL) { reqP->pid = e->data.pid; reqP->mon = e->data.mon; @@ -17933,6 +18220,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env, } } + if (e->data.env) + esock_free_env("qunqueue", e->data.env); FREE(e); return TRUE; @@ -18074,6 +18363,18 @@ ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env, const ESockMonitor* monP) * ---------------------------------------------------------------------- */ + +static void free_request_queue(ESockRequestQueue* q) +{ + while (q->first) { + ESockRequestQueueElement* free_me = q->first; + q->first = free_me->nextP; + if (free_me->data.env) + esock_free_env("dtor", free_me->data.env); + FREE(free_me); + } +} + /* ========================================================================= * socket_dtor - Callback function for resource destructor * @@ -18084,11 +18385,30 @@ void socket_dtor(ErlNifEnv* env, void* obj) #if !defined(__WIN32__) ESockDescriptor* descP = (ESockDescriptor*) obj; - MDESTROY(descP->writeMtx); - MDESTROY(descP->readMtx); - MDESTROY(descP->accMtx); - MDESTROY(descP->closeMtx); - MDESTROY(descP->cfgMtx); + MDESTROY(descP->writeMtx); descP->writeMtx = NULL; + MDESTROY(descP->readMtx); descP->readMtx = NULL; + MDESTROY(descP->accMtx); descP->accMtx = NULL; + MDESTROY(descP->closeMtx); descP->closeMtx = NULL; + MDESTROY(descP->cfgMtx); descP->cfgMtx = NULL; + + if (descP->currentReader.env) { + esock_free_env("dtor reader", descP->currentReader.env); + descP->currentReader.env = NULL; + } + if (descP->currentWriter.env) { + esock_free_env("dtor writer", descP->currentWriter.env); + descP->currentWriter.env = NULL; + } + if (descP->currentAcceptor.env) { + esock_free_env("dtor acceptor", descP->currentAcceptor.env); + descP->currentAcceptor.env = NULL; + } + free_request_queue(&descP->readersQ); + free_request_queue(&descP->writersQ); + free_request_queue(&descP->acceptorsQ); + + descP->state = SOCKET_STATE_DTOR; + descP->pattern = ESOCK_DESC_PATTERN_DTOR; #endif } @@ -18321,6 +18641,7 @@ void socket_stop_handle_current(ErlNifEnv* env, "current %s %T\r\n", reqP->ref, role, reqP->pid); } + reqP->env = NULL; } } @@ -18376,6 +18697,7 @@ void inform_waiting_procs(ErlNifEnv* env, currentP->data.pid); } + currentP->data.env = NULL, DEMONP("inform_waiting_procs -> current 'request'", env, descP, ¤tP->data.mon); @@ -18510,6 +18832,17 @@ void socket_down(ErlNifEnv* env, MON2T(env, mon)); } + } else if (COMPARE_PIDS(&descP->connPid, pid) == 0) { + + /* The connPid is only set during the connection. + * The same goes for the monitor (connMon). + */ + + descP->state = SOCKET_STATE_OPEN; /* restore state */ + enif_set_pid_undefined(&descP->connPid); + DEMONP("socket_down -> connector", + env, descP, &descP->connMon); + } else { /* check all operation queue(s): acceptor, writer and reader. |