From eaeec753c035c2214be2930290a0a6de411566b0 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 7 Dec 2018 15:49:55 +0100 Subject: Pluggable distribution socket implementation for EI --- lib/erl_interface/src/connect/ei_connect.c | 1086 +++++++++++++++++++-------- lib/erl_interface/src/connect/eirecv.c | 62 +- lib/erl_interface/src/connect/send.c | 74 +- lib/erl_interface/src/connect/send_exit.c | 25 +- lib/erl_interface/src/connect/send_reg.c | 64 +- lib/erl_interface/src/epmd/epmd_port.c | 85 ++- lib/erl_interface/src/epmd/epmd_publish.c | 36 +- lib/erl_interface/src/epmd/epmd_unpublish.c | 33 +- lib/erl_interface/src/legacy/erl_connect.c | 9 +- lib/erl_interface/src/misc/ei_internal.h | 18 +- lib/erl_interface/src/misc/ei_portio.c | 858 ++++++++++++++++----- lib/erl_interface/src/misc/ei_portio.h | 95 ++- lib/erl_interface/src/not_used/send_link.c | 3 +- 13 files changed, 1800 insertions(+), 648 deletions(-) (limited to 'lib/erl_interface/src') diff --git a/lib/erl_interface/src/connect/ei_connect.c b/lib/erl_interface/src/connect/ei_connect.c index 9df4fa3b6c..1132c9fc23 100644 --- a/lib/erl_interface/src/connect/ei_connect.c +++ b/lib/erl_interface/src/connect/ei_connect.c @@ -21,8 +21,6 @@ * Purpose: Connect to any node at any host. (EI version) */ -#include "eidef.h" - #include #include #include @@ -84,7 +82,9 @@ #include #include #include +#include +#include "eidef.h" #include "eiext.h" #include "ei_portio.h" #include "ei_internal.h" @@ -103,6 +103,10 @@ int ei_tracelevel = 0; #define COOKIE_FILE "/.erlang.cookie" #define EI_MAX_HOME_PATH 1024 +#define EI_SOCKET_CALLBACKS_SZ_V1 \ + (offsetof(ei_socket_callbacks, get_fd) \ + + sizeof(int (*)(void *))) + /* FIXME why not macro? */ static char *null_cookie = ""; @@ -113,35 +117,51 @@ static int get_home(char *buf, int size); static unsigned gen_challenge(void); static void gen_digest(unsigned challenge, char cookie[], unsigned char digest[16]); -static int send_status(int fd, char *status, unsigned ms); -static int recv_status(int fd, unsigned ms); -static int send_challenge(int fd, char *nodename, - unsigned challenge, unsigned version, unsigned ms); -static int recv_challenge(int fd, unsigned *challenge, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms); -static int send_challenge_reply(int fd, unsigned char digest[16], +static int send_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char *status, unsigned ms); +static int recv_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned ms); +static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned challenge, + unsigned version, unsigned ms); +static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned *challenge, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms); +static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms); -static int recv_challenge_reply(int fd, - unsigned our_challenge, +static int recv_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned *her_challenge, unsigned ms); -static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms); -static int recv_challenge_ack(int fd, - unsigned our_challenge, +static int send_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], + unsigned ms); +static int recv_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned ms); -static int send_name(int fd, char *nodename, - unsigned version, unsigned ms); +static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned version, unsigned ms); -/* Common for both handshake types */ -static int recv_name(int fd, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms); +static int recv_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned *version, unsigned *flags, char *namebuf, + unsigned ms); static struct hostent* dyn_gethostbyname_r(const char *name, struct hostent *hostp, char **buffer_p, int buflen, int *h_errnop); +static void abort_connection(ei_socket_callbacks *cbs, void *ctx); +static int close_connection(ei_socket_callbacks *cbs, void *ctx, int fd); + +static char * +estr(int e) +{ + char *str = strerror(e); + if (!str) + return "unknown error"; + return str; +} /*************************************************************************** @@ -154,25 +174,206 @@ dyn_gethostbyname_r(const char *name, struct hostent *hostp, char **buffer_p, typedef struct ei_socket_info_s { int socket; + ei_socket_callbacks *cbs; + void *ctx; int dist_version; ei_cnode cnode; /* A copy, not a pointer. We don't know when freed */ char cookie[EI_MAX_COOKIE_SIZE+1]; } ei_socket_info; +/*************************************************************************** + * + * XXX + * + ***************************************************************************/ + +#ifndef ETHR_HAVE___atomic_compare_exchange_n +# define ETHR_HAVE___atomic_compare_exchange_n 0 +#endif +#ifndef ETHR_HAVE___atomic_load_n +# define ETHR_HAVE___atomic_load_n 0 +#endif +#ifndef ETHR_HAVE___atomic_store_n +# define ETHR_HAVE___atomic_store_n 0 +#endif + +#if defined(_REENTRANT) \ + && (!(ETHR_HAVE___atomic_compare_exchange_n & SIZEOF_VOID_P) \ + || !(ETHR_HAVE___atomic_load_n & SIZEOF_VOID_P) \ + || !(ETHR_HAVE___atomic_store_n & SIZEOF_VOID_P)) +# undef EI_DISABLE_SEQ_SOCKET_INFO +# define EI_DISABLE_SEQ_SOCKET_INFO +#endif + +#ifdef __WIN32__ +# undef EI_DISABLE_SEQ_SOCKET_INFO +# define EI_DISABLE_SEQ_SOCKET_INFO +#endif + +#ifndef EI_DISABLE_SEQ_SOCKET_INFO + +#ifdef _REENTRANT + +#define EI_ATOMIC_CMPXCHG_ACQ_REL(VARP, XCHGP, NEW) \ + __atomic_compare_exchange_n((VARP), (XCHGP), (NEW), 0, \ + __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE) +#define EI_ATOMIC_LOAD_ACQ(VARP) \ + __atomic_load_n((VARP), __ATOMIC_ACQUIRE) +#define EI_ATOMIC_STORE_REL(VARP, NEW) \ + __atomic_store_n((VARP), (NEW), __ATOMIC_RELEASE) + +#else /* ! _REENTRANT */ + +#define EI_ATOMIC_CMPXCHG_ACQ_REL(VARP, XCHGP, NEW) \ + (*(VARP) == *(XCHGP) \ + ? ((*(VARP) = (NEW)), !0) \ + : ((*(XCHGP) = *(VARP)), 0)) +#define EI_ATOMIC_LOAD_ACQ(VARP) (*(VARP)) +#define EI_ATOMIC_STORE_REL(VARP, NEW) (*(VARP) = (NEW)) + +#endif /* ! _REENTRANT */ + +#define EI_SOCKET_INFO_SEG_BITS 5 +#define EI_SOCKET_INFO_SEG_SIZE (1 << EI_SOCKET_INFO_SEG_BITS) +#define EI_SOCKET_INFO_SEG_MASK (EI_SOCKET_INFO_SEG_SIZE - 1) + +typedef struct { + int max_fds; + ei_socket_info *segments[1]; /* Larger in reality... */ +} ei_socket_info_data__; + +static ei_socket_info_data__ *socket_info_data = NULL; + +static int init_socket_info(void) +{ + int max_fds; + int i; + size_t segments_len; + ei_socket_info_data__ *info_data, *xchg; + + if (EI_ATOMIC_LOAD_ACQ(&socket_info_data) != NULL) + return !0; /* Already initialized... */ + +#if defined(HAVE_SYSCONF) && defined(_SC_OPEN_MAX) + max_fds = sysconf(_SC_OPEN_MAX); +#else + max_fds = 1024; +#endif + + if (max_fds < 0) + return 0; + + segments_len = ((max_fds-1)/EI_SOCKET_INFO_SEG_SIZE + 1); + + info_data = malloc(sizeof(ei_socket_info_data__) + + (sizeof(ei_socket_info *)*(segments_len-1))); + if (!info_data) + return 0; + + info_data->max_fds = max_fds; + for (i = 0; i < segments_len; i++) + info_data->segments[i] = NULL; + + xchg = NULL; + if (!EI_ATOMIC_CMPXCHG_ACQ_REL(&socket_info_data, &xchg, info_data)) + free(info_data); /* Already initialized... */ + + return !0; +} + +static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec, + ei_socket_callbacks *cbs, void *ctx) +{ + int six; + ei_socket_info *seg, *si; + int socket; + + if (fd < 0 || socket_info_data->max_fds <= fd) + return -1; + + socket = fd; + six = fd >> EI_SOCKET_INFO_SEG_BITS; + seg = EI_ATOMIC_LOAD_ACQ(&socket_info_data->segments[six]); + + if (!seg) { + ei_socket_info *xchg; + int i; + seg = malloc(sizeof(ei_socket_info)*EI_SOCKET_INFO_SEG_SIZE); + if (!seg) + return -1; + for (i = 0; i < EI_SOCKET_INFO_SEG_SIZE; i++) { + seg[i].socket = -1; + } + + xchg = NULL; + if (!EI_ATOMIC_CMPXCHG_ACQ_REL(&socket_info_data->segments[six], &xchg, seg)) { + free(seg); + seg = xchg; + } + } + + si = &seg[fd & EI_SOCKET_INFO_SEG_MASK]; + + if (dist_version < 0) { + socket = -1; + si->cbs = NULL; + si->ctx = NULL; + } + else { + si->dist_version = dist_version; + si->cnode = *ec; + si->cbs = cbs; + si->ctx = ctx; + strcpy(si->cookie, cookie); + } + + EI_ATOMIC_STORE_REL(&si->socket, socket); + + return 0; +} + +static ei_socket_info* get_ei_socket_info(int fd) +{ + int six, socket; + ei_socket_info *seg, *si; + + if (fd < 0 || socket_info_data->max_fds <= fd) + return NULL; + + six = fd >> EI_SOCKET_INFO_SEG_BITS; + seg = EI_ATOMIC_LOAD_ACQ(&socket_info_data->segments[six]); + + if (!seg) + return NULL; + + si = &seg[fd & EI_SOCKET_INFO_SEG_MASK]; + socket = EI_ATOMIC_LOAD_ACQ(&si->socket); + if (socket != fd) + return NULL; + return si; +} + +#else /* EI_DISABLE_SEQ_SOCKET_INFO */ + int ei_n_sockets = 0, ei_sz_sockets = 0; ei_socket_info *ei_sockets = NULL; + #ifdef _REENTRANT ei_mutex_t* ei_sockets_lock = NULL; #endif /* _REENTRANT */ +static int init_socket_info(void) +{ +#ifdef _REENTRANT + if (ei_sockets_lock == NULL) { + ei_sockets_lock = ei_mutex_create(); + } +#endif /* _REENTRANT */ + return !0; +} -/*************************************************************************** - * - * XXX - * - ***************************************************************************/ - -static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec) +static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec, + ei_socket_callbacks *cbs, void *ctx) { int i; @@ -182,11 +383,13 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * for (i = 0; i < ei_n_sockets; ++i) { if (ei_sockets[i].socket == fd) { if (dist_version == -1) { - memmove(&ei_sockets[i], &ei_sockets[i+1], + memmove(&ei_sockets[i], &ei_sockets[i+1], sizeof(ei_sockets[0])*(ei_n_sockets-i-1)); } else { ei_sockets[i].dist_version = dist_version; /* Copy the content, see ei_socket_info */ + ei_sockets[i].cbs = cbs; + ei_sockets[i].ctx = ctx; ei_sockets[i].cnode = *ec; strcpy(ei_sockets[i].cookie, cookie); } @@ -209,7 +412,9 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * } ei_sockets[ei_n_sockets].socket = fd; ei_sockets[ei_n_sockets].dist_version = dist_version; - ei_sockets[i].cnode = *ec; + ei_sockets[ei_n_sockets].cnode = *ec; + ei_sockets[ei_n_sockets].cbs = cbs; + ei_sockets[ei_n_sockets].ctx = ctx; strcpy(ei_sockets[ei_n_sockets].cookie, cookie); ++ei_n_sockets; } @@ -219,14 +424,6 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * return 0; } -#if 0 -/* FIXME not used ?! */ -static int remove_ei_socket_info(int fd, int dist_version, char* cookie) -{ - return put_ei_socket_info(fd, -1, NULL); -} -#endif - static ei_socket_info* get_ei_socket_info(int fd) { int i; @@ -248,6 +445,13 @@ static ei_socket_info* get_ei_socket_info(int fd) return NULL; } +#endif /* EI_DISABLE_SEQ_SOCKET_INFO */ + +static int remove_ei_socket_info(int fd) +{ + return put_ei_socket_info(fd, -1, NULL, NULL, NULL, NULL); +} + ei_cnode *ei_fd_to_cnode(int fd) { ei_socket_info *sockinfo = get_ei_socket_info(fd); @@ -255,6 +459,19 @@ ei_cnode *ei_fd_to_cnode(int fd) return &sockinfo->cnode; } +int ei_get_cbs_ctx__(ei_socket_callbacks **cbs, void **ctx, int fd) +{ + ei_socket_info *sockinfo = get_ei_socket_info(fd); + if (sockinfo) { + *cbs = sockinfo->cbs; + *ctx = sockinfo->ctx; + return 0; + } + + *cbs = NULL; + *ctx = NULL; + return EBADF; +} /*************************************************************************** * Get/Set tracelevel @@ -333,21 +550,6 @@ const char *ei_getfdcookie(int fd) return r; } -/* call with cookie to set value to use on descriptor fd, -* or specify NULL to use default -*/ -/* FIXME why defined but not used? */ -#if 0 -static int ei_setfdcookie(ei_cnode* ec, int fd, char *cookie) -{ - int dist_version = ei_distversion(fd); - - if (cookie == NULL) - cookie = ec->ei_connect_cookie; - return put_ei_socket_info(fd, dist_version, cookie); -} -#endif - static int get_int32(unsigned char *s) { return ((s[0] << 24) | (s[1] << 16) | (s[2] << 8) | (s[3] )); @@ -405,12 +607,16 @@ static int initWinSock(void) * Initailize by setting: * thishostname, thisalivename, thisnodename and thisipaddr */ -int ei_connect_xinit(ei_cnode* ec, const char *thishostname, - const char *thisalivename, const char *thisnodename, - Erl_IpAddr thisipaddr, const char *cookie, - const short creation) +int ei_connect_xinit_ussi(ei_cnode* ec, const char *thishostname, + const char *thisalivename, const char *thisnodename, + Erl_IpAddr thisipaddr, const char *cookie, + const short creation, ei_socket_callbacks *cbs, + int cbs_sz, void *setup_context) { char *dbglevel; + + if (cbs != &ei_default_socket_callbacks) + EI_SET_HAVE_PLUGIN_SOCKET_IMPL__; /* FIXME this code was enabled for 'erl'_connect_xinit(), why not here? */ #if 0 @@ -422,12 +628,16 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, #endif #endif -#ifdef _REENTRANT - if (ei_sockets_lock == NULL) { - ei_sockets_lock = ei_mutex_create(); + if (!init_socket_info()) { + EI_TRACE_ERR0("ei_connect_xinit","can't initiate socket info"); + return ERL_ERROR; } -#endif /* _REENTRANT */ + if (cbs_sz < EI_SOCKET_CALLBACKS_SZ_V1) { + EI_TRACE_ERR0("ei_connect_xinit","invalid size of ei_socket_callbacks struct"); + return ERL_ERROR; + } + ec->creation = creation & 0x3; /* 2 bits */ if (cookie) { @@ -469,6 +679,9 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, ec->self.serial = 0; ec->self.creation = creation & 0x3; /* 2 bits */ + ec->cbs = cbs; + ec->setup_context = setup_context; + if ((dbglevel = getenv("EI_TRACELEVEL")) != NULL || (dbglevel = getenv("ERL_DEBUG_DIST")) != NULL) ei_tracelevel = atoi(dbglevel); @@ -476,14 +689,27 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, return 0; } +int ei_connect_xinit(ei_cnode* ec, const char *thishostname, + const char *thisalivename, const char *thisnodename, + Erl_IpAddr thisipaddr, const char *cookie, + const short creation) +{ + return ei_connect_xinit_ussi(ec, thishostname, thisalivename, thisnodename, + thisipaddr, cookie, creation, + &ei_default_socket_callbacks, + sizeof(ei_default_socket_callbacks), + NULL); +} /* * Initialize by set: thishostname, thisalivename, * thisnodename and thisipaddr. At success return 0, * otherwise return -1. */ -int ei_connect_init(ei_cnode* ec, const char* this_node_name, - const char *cookie, short creation) +int ei_connect_init_ussi(ei_cnode* ec, const char* this_node_name, + const char *cookie, short creation, + ei_socket_callbacks *cbs, int cbs_sz, + void *setup_context) { char thishostname[EI_MAXHOSTNAMELEN+1]; char thisnodename[MAXNODELEN+1]; @@ -500,12 +726,7 @@ int ei_connect_init(ei_cnode* ec, const char* this_node_name, return ERL_ERROR; } #endif /* win32 */ -#ifdef _REENTRANT - if (ei_sockets_lock == NULL) { - ei_sockets_lock = ei_mutex_create(); - } -#endif /* _REENTRANT */ - + /* gethostname requires len to be max(hostname) + 1 */ if (gethostname(thishostname, EI_MAXHOSTNAMELEN+1) == -1) { #ifdef __WIN32__ @@ -561,43 +782,22 @@ int ei_connect_init(ei_cnode* ec, const char* this_node_name, sprintf(thisnodename, "%s@%s", this_node_name, hp->h_name); } } - res = ei_connect_xinit(ec, thishostname, thisalivename, thisnodename, - (struct in_addr *)*hp->h_addr_list, cookie, creation); + res = ei_connect_xinit_ussi(ec, thishostname, thisalivename, thisnodename, + (struct in_addr *)*hp->h_addr_list, cookie, creation, + cbs, cbs_sz, setup_context); if (buf != buffer) free(buf); return res; } - -/* connects to port at ip-address ip_addr -* and returns fd to socket -* port has to be in host byte order -*/ -static int cnct(uint16 port, struct in_addr *ip_addr, int addr_len, unsigned ms) +int ei_connect_init(ei_cnode* ec, const char* this_node_name, + const char *cookie, short creation) { - int s, res; - struct sockaddr_in iserv_addr; - - if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - erl_errno = errno; - return ERL_ERROR; - } - - memset((char*)&iserv_addr, 0, sizeof(struct sockaddr_in)); - memcpy((char*)&iserv_addr.sin_addr, (char*)ip_addr, addr_len); - iserv_addr.sin_family = AF_INET; - iserv_addr.sin_port = htons(port); - - if ((res = ei_connect_t(s, (struct sockaddr*)&iserv_addr, - sizeof(iserv_addr),ms)) < 0) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(s); - return ERL_ERROR; - } - - return s; -} /* cnct */ - + return ei_connect_init_ussi(ec, this_node_name, cookie, creation, + &ei_default_socket_callbacks, + sizeof(ei_default_socket_callbacks), + NULL); +} /* * Same as ei_gethostbyname_r, but also handles ERANGE error @@ -758,91 +958,218 @@ int ei_connect(ei_cnode* ec, char *nodename) * the node through epmd at that host * */ -int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr adr, char *alivename, unsigned ms) +int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename, unsigned ms) { - struct in_addr *ip_addr=(struct in_addr *) adr; + ei_socket_callbacks *cbs = ec->cbs; + void *ctx; int rport = 0; /*uint16 rport = 0;*/ int sockd; - int one = 1; int dist = 0; - ErlConnect her_name; unsigned her_flags, her_version; - + unsigned our_challenge, her_challenge; + unsigned char our_digest[16]; + int err; + int pkt_sz; + struct sockaddr_in addr; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + erl_errno = EIO; /* Default error code */ EI_TRACE_CONN1("ei_xconnect","-> CONNECT attempt to connect to %s", alivename); - if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, ms)) < 0) { + if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, tmo)) < 0) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT can't get remote port"); /* ei_epmd_port_tmo() has set erl_errno */ return ERL_NO_PORT; } - - /* we now have port number to enode, try to connect */ - if((sockd = cnct((uint16)rport, ip_addr, sizeof(struct in_addr),ms)) < 0) { - EI_TRACE_ERR0("ei_xconnect","-> CONNECT socket connect failed"); - /* cnct() has set erl_errno */ - return ERL_CONNECT_FAIL; - } - - EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote"); - /* FIXME why connect before checking 'dist' output from ei_epmd_port() ?! */ if (dist <= 4) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT remote version not compatible"); - goto error; + return ERL_ERROR; } - else { - unsigned our_challenge, her_challenge; - unsigned char our_digest[16]; - - if (send_name(sockd, ec->thisnodename, (unsigned) dist, ms)) - goto error; - if (recv_status(sockd, ms)) - goto error; - if (recv_challenge(sockd, &her_challenge, &her_version, - &her_flags, &her_name, ms)) - goto error; - our_challenge = gen_challenge(); - gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); - if (send_challenge_reply(sockd, our_digest, our_challenge, ms)) - goto error; - if (recv_challenge_ack(sockd, our_challenge, - ec->ei_connect_cookie, ms)) - goto error; - put_ei_socket_info(sockd, dist, null_cookie, ec); /* FIXME check == 0 */ + + err = ei_socket_ctx__(cbs, &ctx, ec->setup_context); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> SOCKET failed: %s (%d)", + estr(err), err); + erl_errno = err; + return ERL_CONNECT_FAIL; + } + + memset((void *) &addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &addr.sin_addr, (void *) ip_addr, sizeof(addr.sin_addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(rport); + + err = ei_connect_ctx_t__(cbs, ctx, (void *) &addr, sizeof(addr), tmo); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> CONNECT socket connect failed: %s (%d)", + estr(err), err); + abort_connection(cbs, ctx); + erl_errno = err; + return ERL_CONNECT_FAIL; } - setsockopt(sockd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); - setsockopt(sockd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); + EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote"); - EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename); + err = EI_GET_FD__(cbs, ctx, &sockd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + goto error; + } + + err = cbs->handshake_packet_header_size(ctx, &pkt_sz); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + goto error; + } + + if (send_name(cbs, ctx, pkt_sz, ec->thisnodename, (unsigned) dist, tmo)) + goto error; + if (recv_status(cbs, ctx, pkt_sz, tmo)) + goto error; + if (recv_challenge(cbs, ctx, pkt_sz, &her_challenge, + &her_version, &her_flags, NULL, tmo)) + goto error; + our_challenge = gen_challenge(); + gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); + if (send_challenge_reply(cbs, ctx, pkt_sz, our_digest, our_challenge, tmo)) + goto error; + if (recv_challenge_ack(cbs, ctx, pkt_sz, our_challenge, + ec->ei_connect_cookie, tmo)) + goto error; + if (put_ei_socket_info(sockd, dist, null_cookie, ec, cbs, ctx) != 0) + goto error; + + if (cbs->connect_handshake_complete) { + err = cbs->connect_handshake_complete(ctx); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> CONNECT failed: %s (%d)", + estr(err), err); + close_connection(cbs, ctx, sockd); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + } + EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename); + erl_errno = 0; return sockd; error: EI_TRACE_ERR0("ei_xconnect","-> CONNECT failed"); - closesocket(sockd); + abort_connection(cbs, ctx); return ERL_ERROR; } /* ei_xconnect */ -int ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename) +int ei_xconnect(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename) { - return ei_xconnect_tmo(ec, adr, alivename, 0); + return ei_xconnect_tmo(ec, ip_addr, alivename, 0); } +int ei_listen(ei_cnode *ec, int *port, int backlog) +{ + struct in_addr ip_addr; + ip_addr.s_addr = htonl(INADDR_ANY); + return ei_xlisten(ec, &ip_addr, port, backlog); +} + +int ei_xlisten(ei_cnode *ec, struct in_addr *ip_addr, int *port, int backlog) +{ + ei_socket_callbacks *cbs = ec->cbs; + struct sockaddr_in sock_addr; + void *ctx; + int fd, err, len; + + err = ei_socket_ctx__(cbs, &ctx, ec->setup_context); + if (err) { + EI_TRACE_ERR2("ei_xlisten","-> SOCKET failed: %s (%d)", + estr(err), err); + erl_errno = err; + return ERL_ERROR; + } + + memset((void *) &sock_addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &sock_addr.sin_addr, (void *) ip_addr, sizeof(*ip_addr)); + sock_addr.sin_family = AF_INET; + sock_addr.sin_port = htons((short) *port); + + len = sizeof(sock_addr); + err = ei_listen_ctx__(cbs, ctx, (void *) &sock_addr, &len, backlog); + if (err) { + EI_TRACE_ERR2("ei_xlisten","-> listen failed: %s (%d)", + estr(err), err); + erl_errno = err; + goto error; + } + + if (len != sizeof(sock_addr)) { + if (len < offsetof(struct sockaddr_in, sin_addr) + sizeof(sock_addr.sin_addr) + || len < offsetof(struct sockaddr_in, sin_port) + sizeof(sock_addr.sin_port)) { + erl_errno = EIO; + EI_TRACE_ERR0("ei_xlisten","-> get info failed"); + goto error; + } + } + + memcpy((void *) ip_addr, (void *) &sock_addr.sin_addr, sizeof(*ip_addr)); + *port = (int) ntohs(sock_addr.sin_port); + + err = EI_GET_FD__(cbs, ctx, &fd); + if (err) { + erl_errno = err; + goto error; + } + + if (put_ei_socket_info(fd, 0, null_cookie, ec, cbs, ctx) != 0) { + EI_TRACE_ERR0("ei_xlisten","-> save socket info failed"); + erl_errno = EIO; + goto error; + } + + erl_errno = 0; + + return fd; + +error: + abort_connection(cbs, ctx); + return ERL_ERROR; +} + +static int close_connection(ei_socket_callbacks *cbs, void *ctx, int fd) +{ + int err; + remove_ei_socket_info(fd); + err = ei_close_ctx__(cbs, ctx); + if (err) { + erl_errno = err; + return ERL_ERROR; + } + return 0; +} - /* - * For symmetry reasons -*/ -#if 0 int ei_close_connection(int fd) { - return closesocket(fd); + ei_socket_callbacks *cbs; + void *ctx; + int err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) + erl_errno = err; + else { + if (close_connection(cbs, ctx, fd) == 0) + return 0; + } + EI_TRACE_ERR2("ei_close_connection","<- CLOSE socket close failed: %s (%d)", + estr(erl_errno), erl_errno); + return ERL_ERROR; } /* ei_close_connection */ -#endif + +static void abort_connection(ei_socket_callbacks *cbs, void *ctx) +{ + (void) ei_close_ctx__(cbs, ctx); +} /* * Accept and initiate a connection from another @@ -857,25 +1184,71 @@ int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp) int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) { int fd; - struct sockaddr_in cli_addr; - int cli_addr_len=sizeof(struct sockaddr_in); unsigned her_version, her_flags; - ErlConnect her_name; + char tmp_nodename[MAXNODELEN+1]; + char *her_name; + int pkt_sz, err; + struct sockaddr_in addr; + int addr_len = sizeof(struct sockaddr_in); + ei_socket_callbacks *cbs; + void *ctx; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; erl_errno = EIO; /* Default error code */ + + err = EI_GET_CBS_CTX__(&cbs, &ctx, lfd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + EI_TRACE_CONN0("ei_accept","<- ACCEPT waiting for connection"); + + if (conp) { + her_name = &conp->nodename[0]; + } + else { + her_name = &tmp_nodename[0]; + } - if ((fd = ei_accept_t(lfd, (struct sockaddr*) &cli_addr, - &cli_addr_len, ms )) < 0) { - EI_TRACE_ERR0("ei_accept","<- ACCEPT socket accept failed"); - erl_errno = (fd == -2) ? ETIMEDOUT : EIO; - goto error; + /* + * ei_accept_ctx_t__() replaces the pointer to the listen context + * with a pointer to the accepted connection context on success. + */ + err = ei_accept_ctx_t__(cbs, &ctx, (void *) &addr, &addr_len, tmo); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT socket accept failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + + err = EI_GET_FD__(cbs, ctx, &fd); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT get fd failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); + } + + if (addr_len != sizeof(struct sockaddr_in)) { + if (addr_len < (offsetof(struct sockaddr_in, sin_addr) + + sizeof(addr.sin_addr))) { + EI_TRACE_ERR0("ei_accept","<- ACCEPT get addr failed"); + goto error; + } + } + + err = cbs->handshake_packet_header_size(ctx, &pkt_sz); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT get packet size failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); } EI_TRACE_CONN0("ei_accept","<- ACCEPT connected to remote"); - if (recv_name(fd, &her_version, &her_flags, &her_name, ms)) { + if (recv_name(cbs, ctx, pkt_sz, &her_version, &her_flags, her_name, tmo)) { EI_TRACE_ERR0("ei_accept","<- ACCEPT initial ident failed"); goto error; } @@ -888,34 +1261,46 @@ int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) unsigned our_challenge; unsigned her_challenge; unsigned char our_digest[16]; - - if (send_status(fd,"ok", ms)) + + if (send_status(cbs, ctx, pkt_sz, "ok", tmo)) goto error; our_challenge = gen_challenge(); - if (send_challenge(fd, ec->thisnodename, - our_challenge, her_version, ms)) + if (send_challenge(cbs, ctx, pkt_sz, ec->thisnodename, + our_challenge, her_version, tmo)) goto error; - if (recv_challenge_reply(fd, our_challenge, - ec->ei_connect_cookie, - &her_challenge, ms)) + if (recv_challenge_reply(cbs, ctx, pkt_sz, our_challenge, + ec->ei_connect_cookie, &her_challenge, tmo)) goto error; gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); - if (send_challenge_ack(fd, our_digest, ms)) + if (send_challenge_ack(cbs, ctx, pkt_sz, our_digest, tmo)) goto error; - put_ei_socket_info(fd, her_version, null_cookie, ec); + if (put_ei_socket_info(fd, her_version, null_cookie, ec, cbs, ctx) != 0) + goto error; + } + if (conp) { + memcpy((void *) conp->ipadr, (void *) &addr.sin_addr, sizeof(conp->ipadr)); + strcpy(&conp->nodename[0], her_name); + } + + if (cbs->accept_handshake_complete) { + err = cbs->accept_handshake_complete(ctx); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> ACCEPT handshake failed: %s (%d)", + estr(err), err); + close_connection(cbs, ctx, fd); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } } - if (conp) - *conp = her_name; - EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name.nodename); + EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name); erl_errno = 0; /* No error */ return fd; error: EI_TRACE_ERR0("ei_accept","<- ACCEPT failed"); - if (fd>=0) - closesocket(fd); + abort_connection(cbs, ctx); return ERL_ERROR; } /* ei_accept */ @@ -927,36 +1312,57 @@ error: */ int ei_receive_tmo(int fd, unsigned char *bufp, int bufsize, unsigned ms) { - int len; + ssize_t len; unsigned char fourbyte[4]={0,0,0,0}; - int res; - - if ((res = ei_read_fill_t(fd, (char *) bufp, 4, ms)) != 4) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + int err; + ei_socket_callbacks *cbs; + void *ctx; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + + len = (ssize_t) 4; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *) bufp, &len, tmo); + if (!err && len != (ssize_t) 4) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return ERL_ERROR; } /* Tick handling */ - if ((len = get_int32(bufp)) == ERL_TICK) - { - ei_write_fill_t(fd, (char *) fourbyte, 4, ms); + len = get_int32(bufp); + if (len == ERL_TICK) { + len = 4; + ei_write_fill_ctx_t__(cbs, ctx, (char *) fourbyte, &len, tmo); /* FIXME ok to ignore error or timeout? */ erl_errno = EAGAIN; return ERL_TICK; } - else if (len > bufsize) - { + + if (len > bufsize) { /* FIXME: We should drain the message. */ erl_errno = EMSGSIZE; return ERL_ERROR; } - else if ((res = ei_read_fill_t(fd, (char *) bufp, len, ms)) != len) - { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return ERL_ERROR; + else { + ssize_t need = len; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *) bufp, &len, tmo); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + if (len != need) { + erl_errno = EIO; + return ERL_ERROR; + } } - return len; + return (int) len; } @@ -1112,36 +1518,11 @@ int ei_rpc_to(ei_cnode *ec, int fd, char *mod, char *fun, int ei_rpc_from(ei_cnode *ec, int fd, int timeout, erlang_msg *msg, ei_x_buff *x) { - fd_set readmask; - struct timeval tv; - struct timeval *t = NULL; - - if (timeout >= 0) { - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout % 1000) * 1000; - t = &tv; - } - - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - - switch (select(fd+1, &readmask, NULL, NULL, t)) { - case -1: - erl_errno = EIO; - return ERL_ERROR; - - case 0: - erl_errno = ETIMEDOUT; - return ERL_TIMEOUT; - - default: - if (FD_ISSET(fd, &readmask)) { - return ei_xreceive_msg(fd, msg, x); - } else { - erl_errno = EIO; - return ERL_ERROR; - } - } + unsigned tmo = timeout < 0 ? EI_SCLBK_INF_TMO : (unsigned) timeout; + int res = ei_xreceive_msg_tmo(fd, msg, x, tmo); + if (res < 0 && erl_errno == ETIMEDOUT) + return ERL_TIMEOUT; + return res; } /* rpc_from */ /* @@ -1295,19 +1676,34 @@ static char *hex(char digest[16], char buff[33]) return buff; } -static int read_2byte_package(int fd, char **buf, int *buflen, - int *is_static, unsigned ms) +static int read_hs_package(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char **buf, int *buflen, + int *is_static, unsigned ms) { - unsigned char nbuf[2]; + unsigned char nbuf[4]; unsigned char *x = nbuf; - unsigned len; - int res; - - if((res = ei_read_fill_t(fd, (char *)nbuf, 2, ms)) != 2) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + ssize_t len, need; + int err; + + len = (ssize_t) pkt_sz; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *)nbuf, &len, ms); + if (!err && len != (ssize_t) pkt_sz) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } - len = get16be(x); + + switch (pkt_sz) { + case 2: + len = get16be(x); + break; + case 4: + len = get32be(x); + break; + default: + return -1; + } if (len > *buflen) { if (*is_static) { @@ -1329,20 +1725,26 @@ static int read_2byte_package(int fd, char **buf, int *buflen, *buflen = len; } } - if ((res = ei_read_fill_t(fd, *buf, len, ms)) != len) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + need = len; + err = ei_read_fill_ctx_t__(cbs, ctx, *buf, &len, ms); + if (!err && len != need) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } return len; } -static int send_status(int fd, char *status, unsigned ms) +static int send_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char *status, unsigned ms) { char *buf, *s; char dbuf[DEFBUF_SIZ]; - int siz = strlen(status) + 1 + 2; - int res; + int siz = strlen(status) + 1 + pkt_sz; + int err; + ssize_t len; buf = (siz > DEFBUF_SIZ) ? malloc(siz) : dbuf; if (!buf) { @@ -1350,14 +1752,28 @@ static int send_status(int fd, char *status, unsigned ms) return -1; } s = buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 's'); memcpy(s, status, strlen(status)); - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("send_status","-> SEND_STATUS socket write failed"); + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("send_status","-> SEND_STATUS socket write failed: %s (%d)", + estr(err), err); if (buf != dbuf) - free(buf); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + free(buf); + EI_CONN_SAVE_ERRNO__(err); return -1; } EI_TRACE_CONN1("send_status","-> SEND_STATUS (%s)",status); @@ -1367,7 +1783,8 @@ static int send_status(int fd, char *status, unsigned ms) return 0; } -static int recv_status(int fd, unsigned ms) +static int recv_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1375,7 +1792,8 @@ static int recv_status(int fd, unsigned ms) int buflen = DEFBUF_SIZ; int rlen; - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, + &buf, &buflen, &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_status", "<- RECV_STATUS socket read failed (%d)", rlen); goto error; @@ -1396,7 +1814,10 @@ error: return -1; } -static int send_name_or_challenge(int fd, char *nodename, +static int send_name_or_challenge(ei_socket_callbacks *cbs, + void *ctx, + int pkt_sz, + char *nodename, int f_chall, unsigned challenge, unsigned version, @@ -1405,9 +1826,10 @@ static int send_name_or_challenge(int fd, char *nodename, char *buf; unsigned char *s; char dbuf[DEFBUF_SIZ]; - int siz = 2 + 1 + 2 + 4 + strlen(nodename); + int siz = pkt_sz + 1 + 2 + 4 + strlen(nodename); const char* function[] = {"SEND_NAME", "SEND_CHALLENGE"}; - int res; + int err; + ssize_t len; if (f_chall) siz += 4; @@ -1417,7 +1839,16 @@ static int send_name_or_challenge(int fd, char *nodename, return -1; } s = (unsigned char *)buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'n'); put16be(s, version); put32be(s, (DFLAG_EXTENDED_REFERENCES @@ -1433,13 +1864,16 @@ static int send_name_or_challenge(int fd, char *nodename, if (f_chall) put32be(s, challenge); memcpy(s, nodename, strlen(nodename)); - - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { EI_TRACE_ERR1("send_name_or_challenge", "-> %s socket write failed", function[f_chall]); if (buf != dbuf) free(buf); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1448,9 +1882,9 @@ static int send_name_or_challenge(int fd, char *nodename, return 0; } -static int recv_challenge(int fd, unsigned *challenge, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms) +static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned *challenge, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1458,13 +1892,13 @@ static int recv_challenge(int fd, unsigned *challenge, int buflen = DEFBUF_SIZ; int rlen; char *s; - struct sockaddr_in sin; - socklen_t sin_len = sizeof(sin); char tag; - + char tmp_nodename[MAXNODELEN+1]; + erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, + &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_challenge", "<- RECV_CHALLENGE socket read failed (%d)",rlen); goto error; @@ -1505,22 +1939,19 @@ static int recv_challenge(int fd, unsigned *challenge, goto error; } - if (getpeername(fd, (struct sockaddr *) &sin, &sin_len) < 0) { - EI_TRACE_ERR0("recv_challenge","<- RECV_CHALLENGE can't get peername"); - erl_errno = errno; - goto error; - } - memcpy(namebuf->ipadr, &(sin.sin_addr.s_addr), - sizeof(sin.sin_addr.s_addr)); - memcpy(namebuf->nodename, s, rlen - 11); - namebuf->nodename[rlen - 11] = '\0'; + if (!namebuf) + namebuf = &tmp_nodename[0]; + + memcpy(namebuf, s, rlen - 11); + namebuf[rlen - 11] = '\0'; + if (!is_static) free(buf); EI_TRACE_CONN4("recv_challenge","<- RECV_CHALLENGE (ok) node = %s, " "version = %u, " "flags = %u, " "challenge = %d", - namebuf->nodename, + namebuf, *version, *flags, *challenge @@ -1533,24 +1964,40 @@ error: return -1; } -static int send_challenge_reply(int fd, unsigned char digest[16], +static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms) { char *s; char buf[DEFBUF_SIZ]; - int siz = 2 + 1 + 4 + 16; - int res; + int siz = pkt_sz + 1 + 4 + 16; + int err; + ssize_t len; s = buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'r'); put32be(s, challenge); memcpy(s, digest, 16); - - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("send_challenge_reply", - "-> SEND_CHALLENGE_REPLY socket write failed"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("send_challenge_reply", + "-> SEND_CHALLENGE_REPLY socket write failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1563,11 +2010,13 @@ static int send_challenge_reply(int fd, unsigned char digest[16], return 0; } -static int recv_challenge_reply (int fd, - unsigned our_challenge, - char cookie[], - unsigned *her_challenge, - unsigned ms) +static int recv_challenge_reply(ei_socket_callbacks *cbs, + void *ctx, + int pkt_sz, + unsigned our_challenge, + char cookie[], + unsigned *her_challenge, + unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1580,7 +2029,7 @@ static int recv_challenge_reply (int fd, erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) != 21) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, &is_static, ms)) != 21) { EI_TRACE_ERR1("recv_challenge_reply", "<- RECV_CHALLENGE_REPLY socket read failed (%d)",rlen); goto error; @@ -1620,23 +2069,38 @@ error: return -1; } -static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms) +static int send_challenge_ack(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned char digest[16], unsigned ms) { char *s; char buf[DEFBUF_SIZ]; - int siz = 2 + 1 + 16; - int res; + int siz = pkt_sz + 1 + 16; + int err; + ssize_t len; s = buf; - - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'a'); memcpy(s, digest, 16); - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("recv_challenge_reply", - "-> SEND_CHALLENGE_ACK socket write failed"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("recv_challenge_reply", + "-> SEND_CHALLENGE_ACK socket write failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1649,8 +2113,8 @@ static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms) return 0; } -static int recv_challenge_ack(int fd, - unsigned our_challenge, +static int recv_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned ms) { char dbuf[DEFBUF_SIZ]; @@ -1664,7 +2128,7 @@ static int recv_challenge_ack(int fd, erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) != 17) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, &is_static, ms)) != 17) { EI_TRACE_ERR1("recv_challenge_ack", "<- RECV_CHALLENGE_ACK socket read failed (%d)",rlen); goto error; @@ -1701,20 +2165,24 @@ error: return -1; } -static int send_name(int fd, char *nodename, unsigned version, unsigned ms) +static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned version, unsigned ms) { - return send_name_or_challenge(fd, nodename, 0, 0, version, ms); + return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 0, + 0, version, ms); } -static int send_challenge(int fd, char *nodename, - unsigned challenge, unsigned version, unsigned ms) +static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned challenge, unsigned version, + unsigned ms) { - return send_name_or_challenge(fd, nodename, 1, challenge, version, ms); + return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 1, + challenge, version, ms); } -static int recv_name(int fd, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms) +static int recv_name(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1722,13 +2190,13 @@ static int recv_name(int fd, int buflen = DEFBUF_SIZ; int rlen; char *s; - struct sockaddr_in sin; - socklen_t sin_len = sizeof(sin); + char tmp_nodename[MAXNODELEN+1]; char tag; erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, + &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_name","<- RECV_NAME socket read failed (%d)",rlen); goto error; } @@ -1759,21 +2227,18 @@ static int recv_name(int fd, erl_errno = EIO; goto error; } - - if (getpeername(fd, (struct sockaddr *) &sin, &sin_len) < 0) { - EI_TRACE_ERR0("recv_name","<- RECV_NAME can't get peername"); - erl_errno = errno; - goto error; - } - memcpy(namebuf->ipadr, &(sin.sin_addr.s_addr), - sizeof(sin.sin_addr.s_addr)); - memcpy(namebuf->nodename, s, rlen - 7); - namebuf->nodename[rlen - 7] = '\0'; + + if (!namebuf) + namebuf = &tmp_nodename[0]; + + memcpy(namebuf, s, rlen - 7); + namebuf[rlen - 7] = '\0'; + if (!is_static) free(buf); EI_TRACE_CONN3("recv_name", "<- RECV_NAME (ok) node = %s, version = %u, flags = %u", - namebuf->nodename,*version,*flags); + namebuf,*version,*flags); erl_errno = 0; return 0; @@ -1867,3 +2332,4 @@ static int get_cookie(char *buf, int bufsize) return 1; /* Success! */ } + diff --git a/lib/erl_interface/src/connect/eirecv.c b/lib/erl_interface/src/connect/eirecv.c index 7b9dbfc387..47eea06ced 100644 --- a/lib/erl_interface/src/connect/eirecv.c +++ b/lib/erl_interface/src/connect/eirecv.c @@ -60,22 +60,36 @@ ei_recv_internal (int fd, int arity; int version; int index = 0; - int i = 0; - int res; + int err; int show_this_msg = 0; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t rlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } /* get length field */ - if ((res = ei_read_fill_t(fd, header, 4, ms)) != 4) - { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + rlen = 4; + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (!err && rlen != 4) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } + len = get32be(s); /* got tick - respond and return */ if (!len) { char tock[] = {0,0,0,0}; - ei_write_fill_t(fd, tock, sizeof(tock), ms); /* Failure no problem */ + ssize_t wlen = sizeof(tock); + ei_write_fill_ctx_t__(cbs, ctx, tock, &wlen, tmo); /* Failure no problem */ *msglenp = 0; return 0; /* maybe flag ERL_EAGAIN [sverkerw] */ } @@ -86,9 +100,12 @@ ei_recv_internal (int fd, ei_trace(-1,NULL); /* read enough to get at least entire header */ - bytesread = (len > EIRECVBUF ? EIRECVBUF : len); - if ((i = ei_read_fill_t(fd,header,bytesread,ms)) != bytesread) { - erl_errno = (i == -2) ? ETIMEDOUT : EIO; + rlen = bytesread = (len > EIRECVBUF ? EIRECVBUF : len); + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (!err && rlen != bytesread) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -212,12 +229,17 @@ ei_recv_internal (int fd, */ if (msglen > *bufsz) { if (staticbufp) { - int sz = EIRECVBUF; /* flush in rest of packet */ while (remain > 0) { - if (remain < sz) sz = remain; - if ((i=ei_read_fill_t(fd,header,sz,ms)) <= 0) break; - remain -= i; + rlen = remain > EIRECVBUF ? EIRECVBUF : remain; + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + if (rlen == 0) + break; + remain -= rlen; } erl_errno = EMSGSIZE; return -1; @@ -247,11 +269,15 @@ ei_recv_internal (int fd, /* read the rest of the message into callers buffer */ if (remain > 0) { - if ((i = ei_read_fill_t(fd,mbuf+bytesread-index,remain,ms)) != remain) { - *msglenp = bytesread-index+1; /* actual bytes in users buffer */ - erl_errno = (i == -2) ? ETIMEDOUT : EIO; - return -1; - } + rlen = remain; + err = ei_read_fill_ctx_t__(cbs, ctx, mbuf+bytesread-index, &rlen, tmo); + if (!err && rlen != remain) + err = EIO; + if (err) { + *msglenp = bytesread-index+1; /* actual bytes in users buffer */ + EI_CONN_SAVE_ERRNO__(err); + return -1; + } } if (show_this_msg) diff --git a/lib/erl_interface/src/connect/send.c b/lib/erl_interface/src/connect/send.c index 37d7db6d68..d97532d123 100644 --- a/lib/erl_interface/src/connect/send.c +++ b/lib/erl_interface/src/connect/send.c @@ -58,10 +58,17 @@ int ei_send_encoded_tmo(int fd, const erlang_pid *to, char *s, header[1200]; /* see size calculation below */ erlang_trace *token = NULL; int index = 5; /* reserve 5 bytes for control message */ - int res; -#ifdef HAVE_WRITEV - struct iovec v[2]; -#endif + int err; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t len, tot_len; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } /* are we tracing? */ /* check that he can receive trace tokens first */ @@ -91,30 +98,47 @@ int ei_send_encoded_tmo(int fd, const erlang_pid *to, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,header,msg); -#ifdef HAVE_WRITEV - - v[0].iov_base = (char *)header; - v[0].iov_len = index; - v[1].iov_base = (char *)msg; - v[1].iov_len = msglen; - - if ((res = ei_writev_fill_t(fd,v,2,ms)) != index+msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; - } - -#else /* !HAVE_WRITEV */ - - if ((res = ei_write_fill_t(fd,header,index,ms)) != index) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + +#ifdef EI_HAVE_STRUCT_IOVEC__ + if (ei_socket_callbacks_have_writev__(cbs)) { + struct iovec v[2]; + + v[0].iov_base = (char *)header; + v[0].iov_len = index; + v[1].iov_base = (char *)msg; + v[1].iov_len = msglen; + + len = tot_len = (ssize_t) index+msglen; + err = ei_writev_fill_ctx_t__(cbs, ctx, v, 2, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + + return 0; } - if ((res = ei_write_fill_t(fd,msg,msglen,ms)) != msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; +#endif /* EI_HAVE_STRUCT_IOVEC__ */ + + /* no writev() */ + len = tot_len = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, header, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } -#endif /* !HAVE_WRITEV */ + len = tot_len = (ssize_t) msglen; + err = ei_write_fill_ctx_t__(cbs, ctx, msg, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } return 0; } diff --git a/lib/erl_interface/src/connect/send_exit.c b/lib/erl_interface/src/connect/send_exit.c index 2e298e3221..b4f7e14c7f 100644 --- a/lib/erl_interface/src/connect/send_exit.c +++ b/lib/erl_interface/src/connect/send_exit.c @@ -55,6 +55,17 @@ int ei_send_exit_tmo(int fd, const erlang_pid *from, const erlang_pid *to, char *s; int index = 0; int len = strlen(reason) + 1080; /* see below */ + ei_socket_callbacks *cbs; + void *ctx; + int err; + ssize_t wlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } if (len > EISMALLBUF) if (!(dbuf = malloc(len))) @@ -92,10 +103,16 @@ int ei_send_exit_tmo(int fd, const erlang_pid *from, const erlang_pid *to, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,msgbuf,NULL); - ei_write_fill_t(fd,msgbuf,index,ms); - /* FIXME ignore timeout etc? erl_errno?! */ - - if (dbuf) free(dbuf); + wlen = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, msgbuf, &wlen, tmo); + if (!err && wlen != (ssize_t) index) + err = EIO; + if (dbuf) + free(dbuf); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } return 0; } diff --git a/lib/erl_interface/src/connect/send_reg.c b/lib/erl_interface/src/connect/send_reg.c index 62478f042d..80d61e57b5 100644 --- a/lib/erl_interface/src/connect/send_reg.c +++ b/lib/erl_interface/src/connect/send_reg.c @@ -51,11 +51,17 @@ int ei_send_reg_encoded_tmo(int fd, const erlang_pid *from, char *s, header[1400]; /* see size calculation below */ erlang_trace *token = NULL; int index = 5; /* reserve 5 bytes for control message */ - int res; + int err; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t len, tot_len; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; -#ifdef HAVE_WRITEV - struct iovec v[2]; -#endif + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } /* are we tracing? */ /* check that he can receive trace tokens first */ @@ -86,29 +92,45 @@ int ei_send_reg_encoded_tmo(int fd, const erlang_pid *from, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,header,msg); -#ifdef HAVE_WRITEV +#ifdef EI_HAVE_STRUCT_IOVEC__ + if (ei_socket_callbacks_have_writev__(cbs)) { + struct iovec v[2]; - v[0].iov_base = (char *)header; - v[0].iov_len = index; - v[1].iov_base = (char *)msg; - v[1].iov_len = msglen; + v[0].iov_base = (char *)header; + v[0].iov_len = index; + v[1].iov_base = (char *)msg; + v[1].iov_len = msglen; - if ((res = ei_writev_fill_t(fd,v,2,ms)) != index+msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + len = tot_len = (ssize_t) index+msglen; + err = ei_writev_fill_ctx_t__(cbs, ctx, v, 2, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + return 0; } -#else - +#endif /* EI_HAVE_STRUCT_IOVEC__ */ + /* no writev() */ - if ((res = ei_write_fill_t(fd,header,index,ms)) != index) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + len = tot_len = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, header, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } - if ((res = ei_write_fill_t(fd,msg,msglen,ms)) != msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + + len = tot_len = (ssize_t) msglen; + err = ei_write_fill_ctx_t__(cbs, ctx, msg, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } -#endif return 0; } diff --git a/lib/erl_interface/src/epmd/epmd_port.c b/lib/erl_interface/src/epmd/epmd_port.c index 2ec418b24a..492c3fb3aa 100644 --- a/lib/erl_interface/src/epmd/epmd_port.c +++ b/lib/erl_interface/src/epmd/epmd_port.c @@ -62,31 +62,38 @@ int ei_epmd_connect_tmo(struct in_addr *inaddr, unsigned ms) { static unsigned int epmd_port = 0; - struct sockaddr_in saddr; - int sd; - int res; + int port, sd, err; + struct in_addr ip_addr; + struct sockaddr_in addr; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = ei_socket__(&sd); + if (err) { + erl_errno = err; + return -1; + } if (epmd_port == 0) { char* port_str = getenv("ERL_EPMD_PORT"); epmd_port = (port_str != NULL) ? atoi(port_str) : EPMD_PORT; } - memset(&saddr, 0, sizeof(saddr)); - saddr.sin_port = htons(epmd_port); - saddr.sin_family = AF_INET; - if (!inaddr) saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - else memmove(&saddr.sin_addr,inaddr,sizeof(saddr.sin_addr)); + port = (int) epmd_port; - if (((sd = socket(PF_INET, SOCK_STREAM, 0)) < 0)) - { - erl_errno = errno; - return -1; + if (!inaddr) { + ip_addr.s_addr = htonl(INADDR_LOOPBACK); + inaddr = &ip_addr; } + + memset((void *) &addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &addr.sin_addr, (void *) inaddr, sizeof(addr.sin_addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); - if ((res = ei_connect_t(sd,(struct sockaddr *)&saddr,sizeof(saddr),ms)) < 0) - { - erl_errno = (res == -2) ? ETIMEDOUT : errno; - closesocket(sd); + err = ei_connect_t__(sd, (void *) &addr, sizeof(addr), tmo); + if (err) { + erl_errno = err; + ei_close__(sd); return -1; } @@ -104,6 +111,9 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, int port; int dist_high, dist_low, proto; int res; + int err; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; #if defined(VXWORKS) char ntoabuf[32]; #endif @@ -124,10 +134,14 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, return -1; } - if ((res = ei_write_fill_t(fd, buf, len+2, ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = len + 2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } #ifdef VXWORKS @@ -142,12 +156,15 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, "-> PORT2_REQ alive=%s ip=%s",alive,inet_ntoa(*addr)); #endif - /* read first two bytes (response type, response) */ - if ((res = ei_read_fill_t(fd, buf, 2, ms)) != 2) { - EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(fd); - return -2; /* version mismatch */ + dlen = (ssize_t) 2; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 2) + erl_errno = EIO; + if (err) { + EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -2; } s = buf; @@ -156,7 +173,7 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, if (res != EI_EPMD_PORT2_RESP) { /* response type */ EI_TRACE_ERR1("ei_epmd_r4_port","<- unknown (%d)",res); EI_TRACE_ERR0("ei_epmd_r4_port","-> CLOSE"); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -167,7 +184,7 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, if ((res = get8(s))) { /* got negative response */ EI_TRACE_ERR1("ei_epmd_r4_port","<- PORT2_RESP result=%d (failure)",res); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -175,14 +192,18 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, EI_TRACE_CONN1("ei_epmd_r4_port","<- PORT2_RESP result=%d (ok)",res); /* expecting remaining 8 bytes */ - if ((res = ei_read_fill_t(fd,buf,8,ms)) != 8) { + dlen = (ssize_t) 8; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 8) + err = EIO; + if (err) { EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(fd); + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); return -1; } - closesocket(fd); + ei_close__(fd); s = buf; port = get16be(s); diff --git a/lib/erl_interface/src/epmd/epmd_publish.c b/lib/erl_interface/src/epmd/epmd_publish.c index 47d68a6db0..20b8e867e8 100644 --- a/lib/erl_interface/src/epmd/epmd_publish.c +++ b/lib/erl_interface/src/epmd/epmd_publish.c @@ -68,8 +68,10 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) int nlen = strlen(alive); int len = elen + nlen + 13; /* hard coded: be careful! */ int n; - int res, creation; - + int err, res, creation; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + if (len > sizeof(buf)-2) { erl_errno = ERANGE; @@ -93,29 +95,39 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) if ((fd = ei_epmd_connect_tmo(NULL,ms)) < 0) return fd; - if ((res = ei_write_fill_t(fd, buf, len+2, ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = (ssize_t) len+2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } EI_TRACE_CONN6("ei_epmd_r4_publish", "-> ALIVE2_REQ alive=%s port=%d ntype=%d " "proto=%d dist-high=%d dist-low=%d", alive,port,'H',EI_MYPROTO,EI_DIST_HIGH,EI_DIST_LOW); - - if ((n = ei_read_fill_t(fd, buf, 4, ms)) != 4) { + + dlen = (ssize_t) 4; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + n = (int) dlen; + if (!err && n != 4) + err = EIO; + if (err) { EI_TRACE_ERR0("ei_epmd_r4_publish","<- CLOSE"); - closesocket(fd); - erl_errno = (n == -2) ? ETIMEDOUT : EIO; + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); return -2; /* version mismatch */ } + /* Don't close fd here! It keeps us registered with epmd */ s = buf; if (((res=get8(s)) != EI_EPMD_ALIVE2_RESP)) { /* response */ EI_TRACE_ERR1("ei_epmd_r4_publish","<- unknown (%d)",res); EI_TRACE_ERR0("ei_epmd_r4_publish","-> CLOSE"); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -124,7 +136,7 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) if (((res=get8(s)) != 0)) { /* 0 == success */ EI_TRACE_ERR1("ei_epmd_r4_publish"," result=%d (fail)",res); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } diff --git a/lib/erl_interface/src/epmd/epmd_unpublish.c b/lib/erl_interface/src/epmd/epmd_unpublish.c index 255d0ffb59..c112f74147 100644 --- a/lib/erl_interface/src/epmd/epmd_unpublish.c +++ b/lib/erl_interface/src/epmd/epmd_unpublish.c @@ -58,7 +58,9 @@ int ei_unpublish_tmo(const char *alive, unsigned ms) char buf[EPMDBUF]; char *s = (char*)buf; int len = 1 + strlen(alive); - int fd, res; + int fd, err; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; if (len > sizeof(buf)-3) { erl_errno = ERANGE; @@ -72,20 +74,29 @@ int ei_unpublish_tmo(const char *alive, unsigned ms) /* FIXME can't connect, return success?! At least commen whats up */ if ((fd = ei_epmd_connect_tmo(NULL,ms)) < 0) return fd; - if ((res = ei_write_fill_t(fd, buf, len+2,ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = (ssize_t) len+2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } EI_TRACE_CONN1("ei_unpublish_tmo","-> STOP %s",alive); - - if ((res = ei_read_fill_t(fd, buf, 7, ms)) != 7) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + + dlen = (ssize_t) 7; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 7) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } - closesocket(fd); + + ei_close__(fd); buf[7]=(char)0; /* terminate the string */ if (!strcmp("STOPPED",(char *)buf)) { diff --git a/lib/erl_interface/src/legacy/erl_connect.c b/lib/erl_interface/src/legacy/erl_connect.c index 7ffd545d3e..e2fd4611c0 100644 --- a/lib/erl_interface/src/legacy/erl_connect.c +++ b/lib/erl_interface/src/legacy/erl_connect.c @@ -179,15 +179,13 @@ int erl_xconnect(Erl_IpAddr addr, char *alivename) * * API: erl_close_connection() * - * Close a connection. FIXME call ei_close_connection() later. - * * Returns 0 on success and -1 on failure. * ***************************************************************************/ int erl_close_connection(int fd) { - return closesocket(fd); + return ei_close_connection(fd); } /* @@ -220,7 +218,10 @@ int erl_reg_send(int fd, char *server_name, ETERM *msg) ei_x_buff x; int r; - ei_x_new_with_version(&x); + if (ei_x_new_with_version(&x) < 0) { + erl_errno = ENOMEM; + return 0; + } if (ei_x_encode_term(&x, msg) < 0) { erl_errno = EINVAL; r = 0; diff --git a/lib/erl_interface/src/misc/ei_internal.h b/lib/erl_interface/src/misc/ei_internal.h index aa6aacd703..0c58245c0a 100644 --- a/lib/erl_interface/src/misc/ei_internal.h +++ b/lib/erl_interface/src/misc/ei_internal.h @@ -22,19 +22,20 @@ #ifndef _EI_INTERNAL_H #define _EI_INTERNAL_H +#ifdef EI_HIDE_REAL_ERRNO +# define EI_CONN_SAVE_ERRNO__(E) \ + ((E) == ETIMEDOUT ? (erl_errno = ETIMEDOUT) : (erl_errno = EIO)) +#else +# define EI_CONN_SAVE_ERRNO__(E) \ + (erl_errno = (E)) +#endif + /* * Some useful stuff not to be exported to users. */ #ifdef __WIN32__ #define MAXPATHLEN 256 -#define writesocket(sock,buf,nbyte) send(sock,buf,nbyte,0) -#define readsocket(sock,buf,nbyte) recv(sock,buf,nbyte,0) -#else /* not __WIN32__ */ -#define writesocket write -#define readsocket read -#define closesocket close -#define ioctlsocket ioctl #endif /* @@ -155,4 +156,7 @@ extern int ei_tracelevel; void ei_trace_printf(const char *name, int level, const char *format, ...); int ei_internal_use_r9_pids_ports(void); + +int ei_get_cbs_ctx__(ei_socket_callbacks **cbs, void **ctx, int fd); + #endif /* _EI_INTERNAL_H */ diff --git a/lib/erl_interface/src/misc/ei_portio.c b/lib/erl_interface/src/misc/ei_portio.c index 8cd35bf2e5..726b1af82d 100644 --- a/lib/erl_interface/src/misc/ei_portio.c +++ b/lib/erl_interface/src/misc/ei_portio.c @@ -22,6 +22,7 @@ #ifdef __WIN32__ #include #include +#include #include #include #include @@ -35,10 +36,6 @@ static unsigned long param_one = 1; #define SET_BLOCKING(Sock) ioctlsocket((Sock),FIONBIO,¶m_zero) #define SET_NONBLOCKING(Sock) ioctlsocket((Sock),FIONBIO,¶m_one) -#define ERROR_WOULDBLOCK WSAEWOULDBLOCK -#define ERROR_TIMEDOUT WSAETIMEDOUT -#define ERROR_INPROGRESS WSAEINPROGRESS -#define GET_SOCKET_ERROR() WSAGetLastError() #define MEANS_SOCKET_ERROR(Ret) ((Ret == SOCKET_ERROR)) #define IS_INVALID_SOCKET(Sock) ((Sock) == INVALID_SOCKET) @@ -53,15 +50,16 @@ static unsigned long param_one = 1; #include #include #include +#include +#include +#include +#include +#include static unsigned long param_zero = 0; static unsigned long param_one = 1; #define SET_BLOCKING(Sock) ioctl((Sock),FIONBIO,(int)¶m_zero) #define SET_NONBLOCKING(Sock) ioctl((Sock),FIONBIO,(int)¶m_one) -#define ERROR_WOULDBLOCK EWOULDBLOCK -#define ERROR_TIMEDOUT ETIMEDOUT -#define ERROR_INPROGRESS EINPROGRESS -#define GET_SOCKET_ERROR() (errno) #define MEANS_SOCKET_ERROR(Ret) ((Ret) == ERROR) #define IS_INVALID_SOCKET(Sock) ((Sock) < 0) @@ -69,106 +67,394 @@ static unsigned long param_one = 1; #include #include #include -#include #include #include #include +#include +#include +#include +#include -#ifndef EWOULDBLOCK -#define ERROR_WOULDBLOCK EAGAIN -#else -#define ERROR_WOULDBLOCK EWOULDBLOCK -#endif #define SET_BLOCKING(fd) fcntl((fd), F_SETFL, \ fcntl((fd), F_GETFL, 0) & ~O_NONBLOCK) #define SET_NONBLOCKING(fd) fcntl((fd), F_SETFL, \ fcntl((fd), F_GETFL, 0) | O_NONBLOCK) -#define ERROR_TIMEDOUT ETIMEDOUT -#define ERROR_INPROGRESS EINPROGRESS -#define GET_SOCKET_ERROR() (errno) #define MEANS_SOCKET_ERROR(Ret) ((Ret) < 0) #define IS_INVALID_SOCKET(Sock) ((Sock) < 0) #endif /* common includes */ -#include "eidef.h" #include #include #include -#include "ei_portio.h" -#include "ei_internal.h" - #ifdef HAVE_SYS_TIME_H #include #else #include #endif +#include "eidef.h" +#include "ei_portio.h" +#include "ei_internal.h" + +#ifdef __WIN32__ + +#define writesocket(sock,buf,nbyte) send(sock,buf,nbyte,0) +#define readsocket(sock,buf,nbyte) recv(sock,buf,nbyte,0) -#ifdef HAVE_WRITEV -static int ei_writev_t(int fd, struct iovec *iov, int iovcnt, unsigned ms) +static int get_error(void) { - int res; - if (ms != 0) { - fd_set writemask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&writemask); - FD_SET(fd,&writemask); - switch (select(fd+1, NULL, &writemask, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &writemask)) { - return -1; /* Other error */ - } - } + switch (WSAGetLastError()) { + case WSAEWOULDBLOCK: return EWOULDBLOCK; + case WSAETIMEDOUT: return ETIMEDOUT; + case WSAEINPROGRESS: return EINPROGRESS; + case WSA_NOT_ENOUGH_MEMORY: return ENOMEM; + case WSA_INVALID_PARAMETER: return EINVAL; + case WSAEBADF: return EBADF; + case WSAEINVAL: return EINVAL; + case WSAEADDRINUSE: return EADDRINUSE; + case WSAENETUNREACH: return ENETUNREACH; + case WSAECONNABORTED: return ECONNABORTED; + case WSAECONNRESET: return ECONNRESET; + case WSAECONNREFUSED: return ECONNREFUSED; + case WSAEHOSTUNREACH: return EHOSTUNREACH; + case WSAEMFILE: return EMFILE; + case WSAEALREADY: return EALREADY; + default: return EIO; } +} + +#else /* not __WIN32__ */ + +#define writesocket write +#define readsocket read +#define closesocket close +#define ioctlsocket ioctl + +static int get_error(void) +{ + int err = errno; + if (err == 0) + return EIO; /* Make sure never to return 0 as error code... */ + return err; +} + +#endif + +int ei_plugin_socket_impl__ = 0; + +/* + * Callbacks for communication over TCP/IPv4 + */ + +static int tcp_get_fd(void *ctx, int *fd) +{ + return EI_DFLT_CTX_TO_FD__(ctx, fd); +} + +static int tcp_hs_packet_header_size(void *ctx, int *sz) +{ + int fd; + *sz = 2; + return EI_DFLT_CTX_TO_FD__(ctx, &fd); +} + +static int tcp_handshake_complete(void *ctx) +{ + int res, fd, one = 1; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_socket(void **ctx, void *setup_ctx) +{ + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (MEANS_SOCKET_ERROR(fd)) + return get_error(); + + *ctx = EI_FD_AS_CTX__(fd); + return 0; +} + +static int tcp_close(void *ctx) +{ + int fd, res; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = closesocket(fd); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_listen(void *ctx, void *addr, int *len, int backlog) +{ + int res, fd; + socklen_t sz = (socklen_t) *len; + int on = 1; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = bind(fd, (struct sockaddr *) addr, sz); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = getsockname(fd, (struct sockaddr *) addr, (socklen_t *) &sz); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = (int) sz; + + res = listen(fd, backlog); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_accept(void **ctx, void *addr, int *len, unsigned unused) +{ + int fd, res; + socklen_t addr_len = (socklen_t) *len; + + if (!ctx) + return EINVAL; + + res = EI_DFLT_CTX_TO_FD__(*ctx, &fd); + if (res) + return res; + + res = accept(fd, (struct sockaddr*) &addr, &addr_len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + *len = (int) addr_len; + + *ctx = EI_FD_AS_CTX__(res); + return 0; +} + +static int tcp_connect(void *ctx, void *addr, int len, unsigned unused) +{ + int res, fd; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = connect(fd, (struct sockaddr *) addr, len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + +static int tcp_writev(void *ctx, const void *viov, int iovcnt, ssize_t *len, unsigned unused) +{ + const struct iovec *iov = (const struct iovec *) viov; + int fd, error; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + res = writev(fd, iov, iovcnt); - return (res < 0) ? -1 : res; + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; +} + +#endif + +static int tcp_write(void *ctx, const char* buf, ssize_t *len, unsigned unused) +{ + int error, fd; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + + res = writesocket(fd, buf, *len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; } -int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned ms) +static int tcp_read(void *ctx, char* buf, ssize_t *len, unsigned unused) { - int i; - int done; + int error, fd; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + + res = readsocket(fd, buf, *len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; +} + +ei_socket_callbacks ei_default_socket_callbacks = { + 0, /* flags */ + tcp_socket, + tcp_close, + tcp_listen, + tcp_accept, + tcp_connect, +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + tcp_writev, +#else + NULL, +#endif + tcp_write, + tcp_read, + + tcp_hs_packet_header_size, + tcp_handshake_complete, + tcp_handshake_complete, + tcp_get_fd + +}; + + +/* + * + */ + +#if defined(EI_HAVE_STRUCT_IOVEC__) + +int ei_socket_callbacks_have_writev__(ei_socket_callbacks *cbs) +{ + return !!cbs->writev; +} + +static int writev_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + const struct iovec *iov, int iovcnt, + ssize_t *len, + unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set writemask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&writemask); + FD_SET(fd,&writemask); + switch (select(fd+1, NULL, &writemask, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &writemask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); + } + do { + error = cbs->writev(ctx, (const void *) iov, iovcnt, len, ms); + } while (error == EINTR); + return error; +} + +int ei_writev_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + const struct iovec *iov, int iovcnt, + ssize_t *len, + unsigned ms) +{ + ssize_t i, done, sum; struct iovec *iov_base = NULL; struct iovec *current_iov; int current_iovcnt; - int sum; + int fd, error; + int basic; + + if (!cbs->writev) + return ENOTSUP; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + basic = !(cbs->flags & EI_SCLBK_FLG_FULL_IMPL); + for (sum = 0, i = 0; i < iovcnt; ++i) { sum += iov[i].iov_len; } - if (ms != 0U) { + if (basic && ms != 0U) { SET_NONBLOCKING(fd); } current_iovcnt = iovcnt; current_iov = (struct iovec *) iov; done = 0; for (;;) { - i = ei_writev_t(fd, current_iov, current_iovcnt, ms); - if (i <= 0) { /* ei_writev_t should always return at least 1 */ + + error = writev_ctx_t__(cbs, ctx, current_iov, current_iovcnt, &i, ms); + if (error) { + *len = done; if (ms != 0U) { SET_BLOCKING(fd); } if (iov_base != NULL) { free(iov_base); } - return (i); - } + return error; + } done += i; if (done < sum) { if (iov_base == NULL) { iov_base = malloc(sizeof(struct iovec) * iovcnt); if (iov_base == NULL) { - return -1; + *len = done; + return ENOMEM; } memcpy(iov_base, iov, sizeof(struct iovec) * iovcnt); current_iov = iov_base; @@ -189,195 +475,383 @@ int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned break; } } - if (ms != 0U) { + if (basic && ms != 0U) { SET_BLOCKING(fd); } if (iov_base != NULL) { free(iov_base); } - return (sum); + *len = done; + return 0; } +#endif /* defined(EI_HAVE_STRUCT_IOVEC__) */ -#endif - -int ei_connect_t(int fd, void *sinp, int sin_siz, unsigned ms) +int ei_socket_ctx__(ei_socket_callbacks *cbs, void **ctx, void *setup_ctx) { int res; - int error; - int s_res; - struct timeval tv; - fd_set writefds; - fd_set exceptfds; - - if (ms == 0) { - res = connect(fd, sinp, sin_siz); - return (res < 0) ? -1 : res; - } else { - SET_NONBLOCKING(fd); - res = connect(fd, sinp, sin_siz); - error = GET_SOCKET_ERROR(); - SET_BLOCKING(fd); - if (!MEANS_SOCKET_ERROR(res)) { - return (res < 0) ? -1 : res; - } else { - if (error != ERROR_WOULDBLOCK && - error != ERROR_INPROGRESS) { - return -1; - } else { - tv.tv_sec = (long) (ms/1000U); - ms %= 1000U; - tv.tv_usec = (long) (ms * 1000U); - FD_ZERO(&writefds); - FD_SET(fd,&writefds); - FD_ZERO(&exceptfds); - FD_SET(fd,&exceptfds); - s_res = select(fd + 1, NULL, &writefds, &exceptfds, &tv); - switch (s_res) { - case 0: - return -2; - case 1: - if (FD_ISSET(fd, &exceptfds)) { - return -1; - } else { - return 0; /* Connect completed */ - } - default: - return -1; - } - } - } - } + + do { + res = cbs->socket(ctx, setup_ctx); + } while (res == EINTR); + + return res; } -int ei_accept_t(int fd, void *addr, void *addrlen, unsigned ms) +int ei_close_ctx__(ei_socket_callbacks *cbs, void *ctx) { - int res; - if (ms != 0) { - fd_set readmask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - switch (select(fd+1, &readmask, NULL, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &readmask)) { - return -1; /* Other error */ - } - } - } - res = (int) accept(fd,addr,addrlen); - return (res < 0) ? -1 : res; + return cbs->close(ctx); } + +int ei_connect_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + void *addr, int len, unsigned ms) +{ + int res, fd; + + if ((cbs->flags & EI_SCLBK_FLG_FULL_IMPL) || ms == EI_SCLBK_INF_TMO) { + do { + res = cbs->connect(ctx, addr, len, ms); + } while (res == EINTR); + return res; + } + + res = EI_GET_FD__(cbs, ctx, &fd); + if (res) + return res; + SET_NONBLOCKING(fd); + do { + res = cbs->connect(ctx, addr, len, 0); + } while (res == EINTR); + SET_BLOCKING(fd); + + switch (res) { + case EINPROGRESS: + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + break; + default: + return res; + } + while (1) { + struct timeval tv; + fd_set writefds; + fd_set exceptfds; + + tv.tv_sec = (long) (ms/1000U); + ms %= 1000U; + tv.tv_usec = (long) (ms * 1000U); + FD_ZERO(&writefds); + FD_SET(fd,&writefds); + FD_ZERO(&exceptfds); + FD_SET(fd,&exceptfds); + res = select(fd + 1, NULL, &writefds, &exceptfds, &tv); + switch (res) { + case -1: + res = get_error(); + if (res != EINTR) + return res; + break; + case 0: + return ETIMEDOUT; + case 1: + if (!FD_ISSET(fd, &exceptfds)) + return 0; /* Connect completed */ + /* fall through... */ + default: + return EIO; + } + } +} -static int ei_read_t(int fd, char* buf, int len, unsigned ms) +int ei_listen_ctx__(ei_socket_callbacks *cbs, void *ctx, + void *adr, int *len, int backlog) { int res; - if (ms != 0) { - fd_set readmask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - switch (select(fd+1, &readmask, NULL, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &readmask)) { - return -1; /* Other error */ - } - } + + do { + res = cbs->listen(ctx, adr, len, backlog); + } while (res == EINTR); + return res; +} + +int ei_accept_ctx_t__(ei_socket_callbacks *cbs, void **ctx, + void *addr, int *len, unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, *ctx, &fd); + if (error) + return error; + + do { + fd_set readmask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&readmask); + FD_SET(fd,&readmask); + switch (select(fd+1, &readmask, NULL, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &readmask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); } - res = readsocket(fd, buf, len); - return (res < 0) ? -1 : res; + do { + error = cbs->accept(ctx, addr, len, ms); + } while (error == EINTR); + return error; } -static int ei_write_t(int fd, const char* buf, int len, unsigned ms) +static int read_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + char* buf, ssize_t *len, unsigned ms) { - int res; - if (ms != 0) { - fd_set writemask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&writemask); - FD_SET(fd,&writemask); - switch (select(fd+1, NULL, &writemask, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &writemask)) { - return -1; /* Other error */ - } - } + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set readmask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&readmask); + FD_SET(fd,&readmask); + switch (select(fd+1, &readmask, NULL, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &readmask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); + } + do { + error = cbs->read(ctx, buf, len, ms); + } while (error == EINTR); + return error; +} + +static int write_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char* buf, ssize_t *len, unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set writemask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&writemask); + FD_SET(fd,&writemask); + switch (select(fd+1, NULL, &writemask, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &writemask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); } - res = writesocket(fd, buf, len); - return (res < 0) ? -1 : res; + do { + error = cbs->write(ctx, buf, len, ms); + } while (error == EINTR); + return error; } /* * Fill buffer, return buffer length, 0 for EOF, < 0 (and sets errno) * for error. */ -int ei_read_fill_t(int fd, char* buf, int len, unsigned ms) +int ei_read_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len, unsigned ms) { - int i,got=0; + ssize_t got = 0; + ssize_t want = *len; do { - i = ei_read_t(fd, buf+got, len-got, ms); - if (i <= 0) - return (i); - got += i; - } while (got < len); - return (len); - + ssize_t read_len = want-got; + int error; + + do { + error = read_ctx_t__(cbs, ctx, buf+got, &read_len, ms); + } while (error == EINTR); + if (error) + return error; + if (read_len == 0) { + *len = got; + return 0; + } + got += read_len; + } while (got < want); + + *len = got; + return 0; } /* read_fill */ -int ei_read_fill(int fd, char* buf, int len) +int ei_read_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len) { - return ei_read_fill_t(fd, buf, len, 0); + return ei_read_fill_ctx_t__(cbs, ctx, buf, len, 0); } /* write entire buffer on fd or fail (setting errno) */ -int ei_write_fill_t(int fd, const char *buf, int len, unsigned ms) +int ei_write_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len, unsigned ms) { - int i,done=0; - if (ms != 0U) { + ssize_t tot = *len, done = 0; + int error, fd = -1, basic = !(cbs->flags & EI_SCLBK_FLG_FULL_IMPL); + + if (basic && ms != 0U) { + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; SET_NONBLOCKING(fd); } do { - i = ei_write_t(fd, buf+done, len-done, ms); - if (i <= 0) { - if (ms != 0U) { + ssize_t write_len = tot-done; + error = write_ctx_t__(cbs, ctx, buf+done, &write_len, ms); + if (error) { + *len = done; + if (basic && ms != 0U) { SET_BLOCKING(fd); } - return (i); + return error; } - done += i; - } while (done < len); - if (ms != 0U) { + done += write_len; + } while (done < tot); + if (basic && ms != 0U) { SET_BLOCKING(fd); } - return (len); + *len = done; + return 0; +} + +int ei_write_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len) +{ + return ei_write_fill_ctx_t__(cbs, ctx, buf, len, 0); +} + +/* + * Internal API for TCP/IPv4 + */ + +int ei_connect_t__(int fd, void *addr, int len, unsigned ms) +{ + return ei_connect_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + addr, len, ms); } -int ei_write_fill(int fd, const char *buf, int len) +int ei_socket__(int *fd) { - return ei_write_fill_t(fd, buf, len, 0); + void *ctx; + int error = ei_socket_ctx__(&ei_default_socket_callbacks, &ctx, NULL); + if (error) + return error; + return EI_GET_FD__(&ei_default_socket_callbacks, ctx, fd); } +int ei_close__(int fd) +{ + return ei_close_ctx__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd)); +} + +int ei_listen__(int fd, void *adr, int *len, int backlog) +{ + return ei_listen_ctx__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + adr, len, backlog); +} + +int ei_accept_t__(int *fd, void *addr, int *len, unsigned ms) +{ + void *ctx = EI_FD_AS_CTX__(*fd); + int error = ei_accept_ctx_t__(&ei_default_socket_callbacks, &ctx, + addr, len, ms); + if (error) + return error; + return EI_GET_FD__(&ei_default_socket_callbacks, ctx, fd); +} + +int ei_read_fill_t__(int fd, char* buf, ssize_t *len, unsigned ms) +{ + return ei_read_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, ms); +} + +int ei_read_fill__(int fd, char* buf, ssize_t *len) +{ + return ei_read_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, 0); +} + +int ei_write_fill_t__(int fd, const char *buf, ssize_t *len, unsigned ms) +{ + return ei_write_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, ms); +} + +int ei_write_fill__(int fd, const char *buf, ssize_t *len) +{ + return ei_write_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, 0); +} + +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + +int ei_writev_fill_t__(int fd, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms) +{ + return ei_writev_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + iov, iovcnt, len, ms); +} + +#endif + diff --git a/lib/erl_interface/src/misc/ei_portio.h b/lib/erl_interface/src/misc/ei_portio.h index bded811a35..a84b5ca09c 100644 --- a/lib/erl_interface/src/misc/ei_portio.h +++ b/lib/erl_interface/src/misc/ei_portio.h @@ -21,21 +21,94 @@ */ #ifndef _EI_PORTIO_H #define _EI_PORTIO_H -#if !defined(__WIN32__) && !defined(VXWORKS) -#ifdef HAVE_WRITEV + +#undef EI_HAVE_STRUCT_IOVEC__ +#if !defined(__WIN32__) && !defined(VXWORKS) && defined(HAVE_SYS_UIO_H) /* Declaration of struct iovec *iov should be visible in this scope. */ -#include +# include +# define EI_HAVE_STRUCT_IOVEC__ #endif + +/* + * Internal API. Should not be used outside of the erl_interface application... + */ + +int ei_socket_ctx__(ei_socket_callbacks *cbs, void **ctx, void *setup); +int ei_close_ctx__(ei_socket_callbacks *cbs, void *ctx); +int ei_listen_ctx__(ei_socket_callbacks *cbs, void *ctx, void *adr, int *len, int backlog); +int ei_accept_ctx_t__(ei_socket_callbacks *cbs, void **ctx, void *addr, int *len, unsigned ms); +int ei_connect_ctx_t__(ei_socket_callbacks *cbs, void *ctx, void *addr, int len, unsigned ms); +int ei_read_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len); +int ei_write_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len); +int ei_read_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len, unsigned ms); +int ei_write_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len, unsigned ms); +#if defined(EI_HAVE_STRUCT_IOVEC__) +int ei_writev_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms); +int ei_socket_callbacks_have_writev__(ei_socket_callbacks *cbs); #endif -int ei_accept_t(int fd, void *addr, void *addrlen, unsigned ms); -int ei_connect_t(int fd, void *sinp, int sin_siz, unsigned ms); -int ei_read_fill(int fd, char* buf, int len); -int ei_write_fill(int fd, const char *buf, int len); -int ei_read_fill_t(int fd, char* buf, int len, unsigned ms); -int ei_write_fill_t(int fd, const char *buf, int len, unsigned ms); -#ifdef HAVE_WRITEV -int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned ms); +ei_socket_callbacks ei_default_socket_callbacks; + +#define EI_FD_AS_CTX__(FD) \ + ((void *) (long) (FD)) + +#define EI_DFLT_CTX_TO_FD__(CTX, FD) \ + ((int) (long) (CTX) < 0 \ + ? EBADF \ + : (*(FD) = (int) (long) (CTX), 0)) + +#define EI_GET_FD__(CBS, CTX, FD) \ + ((CBS) == &ei_default_socket_callbacks \ + ? EI_DFLT_CTX_TO_FD__((CTX), FD) \ + : (CBS)->get_fd((CTX), (FD))) + +extern int ei_plugin_socket_impl__; + +#if !defined(_REENTRANT) + +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + ei_plugin_socket_impl__ +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ \ + ei_plugin_socket_impl__ = 1 + +#elif ((ETHR_HAVE___atomic_load_n & SIZEOF_INT) \ + && (ETHR_HAVE___atomic_store_n & SIZEOF_INT)) + +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + __atomic_load_n(&ei_plugin_socket_impl__, __ATOMIC_ACQUIRE) +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ \ + __atomic_store_n(&ei_plugin_socket_impl__, 1, __ATOMIC_RELEASE) + +#else + +/* No gcc atomics; always lookup using ei_get_cbs_ctx()... */ +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ 0 +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ (void) 0 + +#endif + +#define EI_GET_CBS_CTX__(CBS, CTX, FD) \ + (EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + ? ei_get_cbs_ctx__((CBS), (CTX), (FD)) \ + : ((FD) < 0 \ + ? EBADF \ + : (*(CBS) = &ei_default_socket_callbacks, \ + *(CTX) = EI_FD_AS_CTX__((FD)), \ + 0))) +/* + * The following uses our own TCP/IPv4 socket implementation... + */ +int ei_socket__(int *fd); +int ei_close__(int fd); +int ei_listen__(int fd, void *adr, int *len, int backlog); +int ei_accept_t__(int *fd, void *addr, int *len, unsigned ms); +int ei_connect_t__(int fd, void *addr, int len, unsigned ms); +int ei_read_fill__(int fd, char* buf, ssize_t *len); +int ei_write_fill__(int fd, const char *buf, ssize_t *len); +int ei_read_fill_t__(int fd, char* buf, ssize_t *len, unsigned ms); +int ei_write_fill_t__(int fd, const char *buf, ssize_t *len, unsigned ms); +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) +int ei_writev_fill_t__(int fd, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms); #endif #endif /* _EI_PORTIO_H */ diff --git a/lib/erl_interface/src/not_used/send_link.c b/lib/erl_interface/src/not_used/send_link.c index 7be476fd93..38fae27df4 100644 --- a/lib/erl_interface/src/not_used/send_link.c +++ b/lib/erl_interface/src/not_used/send_link.c @@ -50,6 +50,7 @@ static int link_unlink(int fd, const erlang_pid *from, const erlang_pid *to, char *s; int index = 0; int n; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; index = 5; /* max sizes: */ ei_encode_version(msgbuf,&index); /* 1 */ @@ -69,7 +70,7 @@ static int link_unlink(int fd, const erlang_pid *from, const erlang_pid *to, if (ei_trace_distribution > 1) ei_show_sendmsg(stderr,msgbuf,NULL); #endif - n = ei_write_fill_t(fd,msgbuf,index,ms); + n = ei_write_fill_t__(fd,msgbuf,index,tmo); return (n==index ? 0 : -1); } -- cgit v1.2.3