diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/dist.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 25 | ||||
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 831 | ||||
-rw-r--r-- | erts/emulator/test/distribution_SUITE.erl | 93 |
6 files changed, 843 insertions, 124 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 16b6aeac3f..4497e17d79 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -97,6 +97,8 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) #define PASS_THROUGH 'p' /* This code should go */ int erts_is_alive; /* System must be blocked on change */ +int erts_dist_buf_busy_limit; + /* distribution trap functions */ Export* dsend2_trap = NULL; @@ -1453,8 +1455,6 @@ int erts_net_message(Port *prt, return -1; } -#define ERTS_DE_BUSY_LIMIT (128*1024) - static int dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) { @@ -1540,7 +1540,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) ErtsProcList *plp = NULL; erts_smp_spin_lock(&dep->qlock); dep->qsize += size_obuf(obuf); - if (dep->qsize >= ERTS_DE_BUSY_LIMIT) + if (dep->qsize >= erts_dist_buf_busy_limit) dep->qflgs |= ERTS_DE_QFLG_BUSY; if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { erts_smp_spin_unlock(&dep->qlock); @@ -1911,7 +1911,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) { + if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index fa19c7fb45..28cdd05c3c 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -99,7 +99,8 @@ typedef struct { #define ERTS_DE_IS_CONNECTED(DEP) \ (!ERTS_DE_IS_NOT_CONNECTED((DEP))) - +#define ERTS_DE_BUSY_LIMIT (128*1024) +extern int erts_dist_buf_busy_limit; extern int erts_is_alive; /* diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 40d8dc097c..801263ec26 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2533,6 +2533,13 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(erts_nif_taints(BIF_P)); } else if (ERTS_IS_ATOM_STR("reader_groups_map", BIF_ARG_1)) { BIF_RET(erts_get_reader_groups_map(BIF_P)); + } else if (ERTS_IS_ATOM_STR("dist_buf_busy_limit", BIF_ARG_1)) { + Uint hsz = 0; + + (void) erts_bld_uint(NULL, &hsz, erts_dist_buf_busy_limit); + hp = hsz ? HAlloc(BIF_P, hsz) : NULL; + res = erts_bld_uint(&hp, NULL, erts_dist_buf_busy_limit); + BIF_RET(res); } BIF_ERROR(BIF_P, BADARG); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 4ae656a3ad..36fefc5cba 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -535,7 +535,8 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); - + erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); + erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "\n"); erts_fprintf(stderr, "Note that if the emulator is started with erlexec (typically\n"); erts_fprintf(stderr, "from the erl script), these flags should be specified with +.\n"); @@ -818,7 +819,7 @@ early_init(int *argc, char **argv) /* erl_sys_args(argc, argv); erts_ets_realloc_always_moves = 0; - + erts_dist_buf_busy_limit = ERTS_DE_BUSY_LIMIT; } #ifndef ERTS_SMP @@ -1346,6 +1347,26 @@ erl_start(int argc, char **argv) } break; + case 'z': { + char *sub_param = argv[i]+2; + int new_limit; + + if (has_prefix("dbbl", sub_param)) { + arg = get_arg(sub_param+4, argv[i+1], &i); + new_limit = atoi(arg); + if (new_limit < 1 || INT_MAX/1024 < new_limit) { + erts_fprintf(stderr, "Invalid dbbl limit: %d\n", new_limit); + erts_usage(); + } else { + erts_dist_buf_busy_limit = new_limit*1024; + } + } else { + erts_fprintf(stderr, "bad -z option %s\n", argv[i]); + erts_usage(); + } + break; + } + default: erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]); erts_usage(); diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 3de48194fb..f07e4793d2 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -54,6 +54,9 @@ #ifdef HAVE_IFADDRS_H #include <ifaddrs.h> #endif +#ifdef HAVE_NETPACKET_PACKET_H +#include <netpacket/packet.h> +#endif /* All platforms fail on malloc errors. */ #define FATAL_MALLOC @@ -85,6 +88,7 @@ #include <winsock2.h> #endif #include <windows.h> +#include <iphlpapi.h> #include <Ws2tcpip.h> /* NEED VC 6.0 !!! */ @@ -467,6 +471,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_IFGET 22 #define INET_REQ_IFSET 23 #define INET_REQ_SUBSCRIBE 24 +#define INET_REQ_GETIFADDRS 25 /* TCP requests */ #define TCP_REQ_ACCEPT 40 #define TCP_REQ_LISTEN 41 @@ -3824,39 +3829,81 @@ do { if ((end)-(ptr) < (n)) goto error; } while(0) static char* sockaddr_to_buf(struct sockaddr* addr, char* ptr, char* end) { if (addr->sa_family == AF_INET || addr->sa_family == 0) { - struct in_addr a; - buf_check(ptr,end,sizeof(struct in_addr)); - a = ((struct sockaddr_in*) addr)->sin_addr; - sys_memcpy(ptr, (char*)&a, sizeof(struct in_addr)); - return ptr + sizeof(struct in_addr); + struct in_addr *p = &(((struct sockaddr_in*) addr)->sin_addr); + buf_check(ptr, end, 1 + sizeof(struct in_addr)); + *ptr = INET_AF_INET; + sys_memcpy(ptr+1, (char*)p, sizeof(struct in_addr)); + return ptr + 1 + sizeof(struct in_addr); } #if defined(HAVE_IN6) && defined(AF_INET6) else if (addr->sa_family == AF_INET6) { - struct in6_addr a; - buf_check(ptr,end,sizeof(struct in6_addr)); - a = ((struct sockaddr_in6*) addr)->sin6_addr; - sys_memcpy(ptr, (char*)&a, sizeof(struct in6_addr)); - return ptr + sizeof(struct in6_addr); + struct in6_addr *p = &(((struct sockaddr_in6*) addr)->sin6_addr); + buf_check(ptr, end, 1 + sizeof(struct in6_addr)); + *ptr = INET_AF_INET6; + sys_memcpy(ptr+1, (char*)p, sizeof(struct in6_addr)); + return ptr + 1 + sizeof(struct in6_addr); + } +#endif +#if defined(AF_LINK) + else if (addr->sa_family == AF_LINK) { + struct sockaddr_dl *sdl_p = (struct sockaddr_dl*) addr; + buf_check(ptr, end, 2 + sdl_p->sdl_alen); + put_int16(sdl_p->sdl_alen, ptr); ptr += 2; + sys_memcpy(ptr, sdl_p->sdl_data + sdl_p->sdl_nlen, sdl_p->sdl_alen); + return ptr + sdl_p->sdl_alen; + } +#endif +#if defined(AF_PACKET) && defined(HAVE_NETPACKET_PACKET_H) + else if(addr->sa_family == AF_PACKET) { + struct sockaddr_ll *sll_p = (struct sockaddr_ll*) addr; + buf_check(ptr, end, 2 + sll_p->sll_halen); + put_int16(sll_p->sll_halen, ptr); ptr += 2; + sys_memcpy(ptr, sll_p->sll_addr, sll_p->sll_halen); + return ptr + sll_p->sll_halen; } #endif + return ptr; error: return NULL; - } static char* buf_to_sockaddr(char* ptr, char* end, struct sockaddr* addr) { - buf_check(ptr,end,sizeof(struct in_addr)); - sys_memcpy((char*) &((struct sockaddr_in*)addr)->sin_addr, ptr, - sizeof(struct in_addr)); - addr->sa_family = AF_INET; - return ptr + sizeof(struct in_addr); - + buf_check(ptr,end,1); + switch (*ptr++) { + case INET_AF_INET: { + struct in_addr *p = &((struct sockaddr_in*)addr)->sin_addr; + buf_check(ptr,end,sizeof(struct in_addr)); + sys_memcpy((char*) p, ptr, sizeof(struct in_addr)); + addr->sa_family = AF_INET; + return ptr + sizeof(struct in_addr); + } + case INET_AF_INET6: { + struct in6_addr *p = &((struct sockaddr_in6*)addr)->sin6_addr; + buf_check(ptr,end,sizeof(struct in6_addr)); + sys_memcpy((char*) p, ptr, sizeof(struct in6_addr)); + addr->sa_family = AF_INET6; + return ptr + sizeof(struct in6_addr); + } + } error: return NULL; } +#if defined (IFF_POINTOPOINT) +#define IFGET_FLAGS(cflags) IFGET_FLAGS_P2P(cflags, IFF_POINTOPOINT) +#elif defined IFF_POINTTOPOINT +#define IFGET_FLAGS(cflags) IFGET_FLAGS_P2P(cflags, IFF_POINTTOPOINT) +#endif + +#define IFGET_FLAGS_P2P(cflags, iff_ptp) \ + ((((cflags) & IFF_UP) ? INET_IFF_UP : 0) | \ + (((cflags) & IFF_BROADCAST) ? INET_IFF_BROADCAST : 0) | \ + (((cflags) & IFF_LOOPBACK) ? INET_IFF_LOOPBACK : 0) | \ + (((cflags) & iff_ptp) ? INET_IFF_POINTTOPOINT : 0) | \ + (((cflags) & IFF_UP) ? INET_IFF_RUNNING : 0) | /* emulate running ? */ \ + (((cflags) & IFF_MULTICAST) ? INET_IFF_MULTICAST : 0)) #if defined(__WIN32__) && defined(SIO_GET_INTERFACE_LIST) @@ -3894,7 +3941,6 @@ static int inet_ctl_getiflist(inet_descriptor* desc, char** rbuf, int rsize) return ctl_reply(INET_REP_OK, sbuf, sptr - sbuf, rbuf, rsize); } - /* input is an ip-address in string format i.e A.B.C.D ** scan the INTERFACE_LIST to get the options */ @@ -3980,27 +4026,12 @@ static int inet_ctl_ifget(inet_descriptor* desc, char* buf, int len, break; case INET_IFOPT_FLAGS: { - long eflags = 0; int flags = ifp->iiFlags; /* just enumerate the interfaces (no names) */ - /* translate flags */ - if (flags & IFF_UP) - eflags |= INET_IFF_UP; - if (flags & IFF_BROADCAST) - eflags |= INET_IFF_BROADCAST; - if (flags & IFF_LOOPBACK) - eflags |= INET_IFF_LOOPBACK; - if (flags & IFF_POINTTOPOINT) - eflags |= INET_IFF_POINTTOPOINT; - if (flags & IFF_UP) /* emulate runnign ? */ - eflags |= INET_IFF_RUNNING; - if (flags & IFF_MULTICAST) - eflags |= INET_IFF_MULTICAST; - buf_check(sptr, s_end, 5); *sptr++ = INET_IFOPT_FLAGS; - put_int32(eflags, sptr); + put_int32(IFGET_FLAGS(flags), sptr); sptr += 4; break; } @@ -4021,7 +4052,6 @@ static int inet_ctl_ifset(inet_descriptor* desc, char* buf, int len, return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } - #elif defined(SIOCGIFCONF) && defined(SIOCSIFFLAGS) /* cygwin has SIOCGIFCONF but not SIOCSIFFLAGS (Nov 2002) */ @@ -4032,69 +4062,77 @@ static int inet_ctl_ifset(inet_descriptor* desc, char* buf, int len, #define SIZEA(p) (sizeof (p)) #endif - -static int inet_ctl_getiflist(inet_descriptor* desc, char** rbuf, int rsize) -{ - struct ifconf ifc; - struct ifreq *ifr; - char *buf; - int buflen, ifc_len, i; - char *sbuf, *sp; - - /* Courtesy of Per Bergqvist and W. Richard Stevens */ - - ifc_len = 0; - buflen = 100 * sizeof(struct ifreq); - buf = ALLOC(buflen); +static int get_ifconf(SOCKET s, struct ifconf *ifcp) { + int ifc_len = 0; + int buflen = 100 * sizeof(struct ifreq); + char *buf = ALLOC(buflen); for (;;) { - ifc.ifc_len = buflen; - ifc.ifc_buf = buf; - if (ioctl(desc->s, SIOCGIFCONF, (char *)&ifc) < 0) { + ifcp->ifc_len = buflen; + ifcp->ifc_buf = buf; + if (ioctl(s, SIOCGIFCONF, (char *)ifcp) < 0) { int res = sock_errno(); if (res != EINVAL || ifc_len) { FREE(buf); - return ctl_error(res, rbuf, rsize); + return -1; } } else { - if (ifc.ifc_len == ifc_len) break; /* buf large enough */ - ifc_len = ifc.ifc_len; + if (ifcp->ifc_len == ifc_len) break; /* buf large enough */ + ifc_len = ifcp->ifc_len; } buflen += 10 * sizeof(struct ifreq); buf = (char *)REALLOC(buf, buflen); } - - sp = sbuf = ALLOC(ifc_len+1); + return 0; +} + +static void free_ifconf(struct ifconf *ifcp) { + FREE(ifcp->ifc_buf); +} + +static int inet_ctl_getiflist(inet_descriptor* desc, char** rbuf, int rsize) +{ + struct ifconf ifc; + struct ifreq *ifrp; + char *sbuf, *sp; + int i; + + /* Courtesy of Per Bergqvist and W. Richard Stevens */ + + if (get_ifconf(desc->s, &ifc) < 0) { + return ctl_error(sock_errno(), rbuf, rsize); + } + + sp = sbuf = ALLOC(ifc.ifc_len+1); *sp++ = INET_REP_OK; i = 0; for (;;) { int n; - - ifr = (struct ifreq *) VOIDP(buf + i); - n = sizeof(ifr->ifr_name) + SIZEA(ifr->ifr_addr); - if (n < sizeof(*ifr)) n = sizeof(*ifr); - if (i+n > ifc_len) break; + + ifrp = (struct ifreq *) VOIDP(ifc.ifc_buf + i); + n = sizeof(ifrp->ifr_name) + SIZEA(ifrp->ifr_addr); + if (n < sizeof(*ifrp)) n = sizeof(*ifrp); + if (i+n > ifc.ifc_len) break; i += n; - - switch (ifr->ifr_addr.sa_family) { + + switch (ifrp->ifr_addr.sa_family) { #if defined(HAVE_IN6) && defined(AF_INET6) case AF_INET6: #endif case AF_INET: - ASSERT(sp+IFNAMSIZ+1 < sbuf+buflen+1) - strncpy(sp, ifr->ifr_name, IFNAMSIZ); + ASSERT(sp+IFNAMSIZ+1 < sbuf+ifc.ifc_len+1) + strncpy(sp, ifrp->ifr_name, IFNAMSIZ); sp[IFNAMSIZ] = '\0'; sp += strlen(sp), ++sp; } - - if (i >= ifc_len) break; + + if (i >= ifc.ifc_len) break; } - FREE(buf); + free_ifconf(&ifc); *rbuf = sbuf; return sp - sbuf; } - /* FIXME: temporary hack */ #ifndef IFHWADDRLEN #define IFHWADDRLEN 6 @@ -4133,37 +4171,52 @@ static int inet_ctl_ifget(inet_descriptor* desc, char* buf, int len, #ifdef SIOCGIFHWADDR if (ioctl(desc->s, SIOCGIFHWADDR, (char *)&ifreq) < 0) break; - buf_check(sptr, s_end, 1+IFHWADDRLEN); + buf_check(sptr, s_end, 1+2+IFHWADDRLEN); *sptr++ = INET_IFOPT_HWADDR; + put_int16(IFHWADDRLEN, sptr); sptr += 2; /* raw memcpy (fix include autoconf later) */ sys_memcpy(sptr, (char*)(&ifreq.ifr_hwaddr.sa_data), IFHWADDRLEN); sptr += IFHWADDRLEN; -#elif defined(HAVE_GETIFADDRS) - struct ifaddrs *ifa, *ifp; - int found = 0; - - if (getifaddrs(&ifa) == -1) - goto error; +#elif defined(SIOCGENADDR) + if (ioctl(desc->s, SIOCGENADDR, (char *)&ifreq) < 0) + break; + buf_check(sptr, s_end, 1+2+sizeof(ifreq.ifr_enaddr)); + *sptr++ = INET_IFOPT_HWADDR; + put_int16(sizeof(ifreq.ifr_enaddr), sptr); sptr += 2; + /* raw memcpy (fix include autoconf later) */ + sys_memcpy(sptr, (char*)(&ifreq.ifr_enaddr), + sizeof(ifreq.ifr_enaddr)); + sptr += sizeof(ifreq.ifr_enaddr); +#elif defined(HAVE_GETIFADDRS) && defined(AF_LINK) + struct ifaddrs *ifa, *ifp; + struct sockaddr_dl *sdlp; + int found = 0; + + if (getifaddrs(&ifa) == -1) + goto error; - for (ifp = ifa; ifp; ifp = ifp->ifa_next) { - if ((ifp->ifa_addr->sa_family == AF_LINK) && - (sys_strcmp(ifp->ifa_name, ifreq.ifr_name) == 0)) { - found = 1; - break; - } - } + for (ifp = ifa; ifp; ifp = ifp->ifa_next) { + if ((ifp->ifa_addr->sa_family == AF_LINK) && + (sys_strcmp(ifp->ifa_name, ifreq.ifr_name) == 0)) { + found = 1; + break; + } + } - if (found == 0) { - freeifaddrs(ifa); - break; - } + if (found == 0) { + freeifaddrs(ifa); + break; + } + sdlp = (struct sockaddr_dl *)ifp->ifa_addr; - buf_check(sptr, s_end, 1+IFHWADDRLEN); - *sptr++ = INET_IFOPT_HWADDR; - sys_memcpy(sptr, ((struct sockaddr_dl *)ifp->ifa_addr)->sdl_data + - ((struct sockaddr_dl *)ifp->ifa_addr)->sdl_nlen, IFHWADDRLEN); - freeifaddrs(ifa); - sptr += IFHWADDRLEN; + buf_check(sptr, s_end, 1+2+sdlp->sdl_alen); + *sptr++ = INET_IFOPT_HWADDR; + put_int16(sdlp->sdl_alen, sptr); sptr += 2; + sys_memcpy(sptr, + sdlp->sdl_data + sdlp->sdl_nlen, + sdlp->sdl_alen); + freeifaddrs(ifa); + sptr += sdlp->sdl_alen; #endif break; } @@ -4240,29 +4293,15 @@ static int inet_ctl_ifget(inet_descriptor* desc, char* buf, int len, case INET_IFOPT_FLAGS: { int flags; - int eflags = 0; if (ioctl(desc->s, SIOCGIFFLAGS, (char*)&ifreq) < 0) flags = 0; else flags = ifreq.ifr_flags; - /* translate flags */ - if (flags & IFF_UP) - eflags |= INET_IFF_UP; - if (flags & IFF_BROADCAST) - eflags |= INET_IFF_BROADCAST; - if (flags & IFF_LOOPBACK) - eflags |= INET_IFF_LOOPBACK; - if (flags & IFF_POINTOPOINT) - eflags |= INET_IFF_POINTTOPOINT; - if (flags & IFF_RUNNING) - eflags |= INET_IFF_RUNNING; - if (flags & IFF_MULTICAST) - eflags |= INET_IFF_MULTICAST; buf_check(sptr, s_end, 5); *sptr++ = INET_IFOPT_FLAGS; - put_int32(eflags, sptr); + put_int32(IFGET_FLAGS(flags), sptr); sptr += 4; break; } @@ -4300,17 +4339,22 @@ static int inet_ctl_ifset(inet_descriptor* desc, char* buf, int len, (void) ioctl(desc->s, SIOCSIFADDR, (char*)&ifreq); break; - case INET_IFOPT_HWADDR: - buf_check(buf, b_end, IFHWADDRLEN); + case INET_IFOPT_HWADDR: { + unsigned int len; + buf_check(buf, b_end, 2); + len = get_int16(buf); buf += 2; + buf_check(buf, b_end, len); #ifdef SIOCSIFHWADDR /* raw memcpy (fix include autoconf later) */ - sys_memcpy((char*)(&ifreq.ifr_hwaddr.sa_data), buf, IFHWADDRLEN); + sys_memset((char*)(&ifreq.ifr_hwaddr.sa_data), + '\0', sizeof(ifreq.ifr_hwaddr.sa_data)); + sys_memcpy((char*)(&ifreq.ifr_hwaddr.sa_data), buf, len); (void) ioctl(desc->s, SIOCSIFHWADDR, (char *)&ifreq); #endif - buf += IFHWADDRLEN; + buf += len; break; - + } case INET_IFOPT_BROADADDR: #ifdef SIOCSIFBRDADDR @@ -4415,6 +4459,551 @@ static int inet_ctl_ifset(inet_descriptor* desc, char* buf, int len, #endif + + +/* Latin-1 to utf8 */ + +static int utf8_len(const char *c, int m) { + int l; + for (l = 0; m; c++, l++, m--) { + if (*c == '\0') break; + if ((*c & 0x7f) != *c) l++; + } + return l; +} + +static void utf8_encode(const char *c, int m, char *p) { + for (; m; c++, m--) { + if (*c == '\0') break; + if ((*c & 0x7f) != *c) { + *p++ = (char) (0xC0 | (0x03 & (*c >> 6))); + *p++ = (char) (0x80 | (0x3F & *c)); + } else { + *p++ = (char) *c; + } + } +} + +#if defined(__WIN32__) + +static void set_netmask_bytes(char *c, int len, int pref_len) { + int i, m; + for (i = 0, m = pref_len >> 3; i < m && i < len; i++) c[i] = '\xFF'; + if (i < len) c[i++] = 0xFF << (8 - (pref_len & 7)); + for (; i < len; i++) c[i] = '\0'; +} + + +int eq_masked_bytes(char *a, char *b, int pref_len) { + int i, m; + for (i = 0, m = pref_len >> 3; i < m; i++) { + if (a[i] != b[i]) return 0; + } + m = pref_len & 7; + if (m) { + m = 0xFF & (0xFF << (8 - m)); + if ((a[i] & m) != (b[i] & m)) return 0; + } + return !0; +} + +static int inet_ctl_getifaddrs(inet_descriptor* desc_p, + char **rbuf_pp, int rsize) +{ + int i; + DWORD ret, n; + IP_INTERFACE_INFO *info_p; + MIB_IPADDRTABLE *ip_addrs_p; + IP_ADAPTER_ADDRESSES *ip_adaddrs_p, *ia_p; + + char *buf_p; + char *buf_alloc_p; + int buf_size =512; +# define BUF_ENSURE(Size) \ + do { \ + int NEED_, GOT_ = buf_p - buf_alloc_p; \ + NEED_ = GOT_ + (Size); \ + if (NEED_ > buf_size) { \ + buf_size = NEED_ + 512; \ + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); \ + buf_p = buf_alloc_p + GOT_; \ + } \ + } while(0) +# define SOCKADDR_TO_BUF(opt, sa) \ + do { \ + if (sa) { \ + char *P_; \ + *buf_p++ = (opt); \ + while (! (P_ = sockaddr_to_buf((sa), buf_p, \ + buf_alloc_p+buf_size))) { \ + int GOT_ = buf_p - buf_alloc_p; \ + buf_size += 512; \ + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); \ + buf_p = buf_alloc_p + GOT_; \ + } \ + if (P_ == buf_p) { \ + buf_p--; \ + } else { \ + buf_p = P_; \ + } \ + } \ + } while (0) + + { + /* Try GetAdaptersAddresses, if it is available */ + unsigned long ip_adaddrs_size = 16 * 1024; + ULONG family = AF_UNSPEC; + ULONG flags = + GAA_FLAG_INCLUDE_PREFIX | GAA_FLAG_SKIP_ANYCAST | + GAA_FLAG_SKIP_DNS_SERVER | GAA_FLAG_SKIP_FRIENDLY_NAME | + GAA_FLAG_SKIP_MULTICAST; + ULONG (WINAPI *fpGetAdaptersAddresses) + (ULONG, ULONG, PVOID, PIP_ADAPTER_ADDRESSES, PULONG); + HMODULE iphlpapi = GetModuleHandle("iphlpapi"); + fpGetAdaptersAddresses = (void *) + (iphlpapi ? + GetProcAddress(iphlpapi, "GetAdaptersAddresses") : + NULL); + if (fpGetAdaptersAddresses) { + ip_adaddrs_p = ALLOC(ip_adaddrs_size); + for (i = 17; i; i--) { + ret = fpGetAdaptersAddresses( + family, flags, NULL, ip_adaddrs_p, &ip_adaddrs_size); + ip_adaddrs_p = REALLOC(ip_adaddrs_p, ip_adaddrs_size); + if (ret == NO_ERROR) break; + if (ret == ERROR_BUFFER_OVERFLOW) continue; + i = 0; + } + if (! i) { + FREE(ip_adaddrs_p); + ip_adaddrs_p = NULL; + } + } else ip_adaddrs_p = NULL; + } + + { + /* Load the IP_INTERFACE_INFO table (only IPv4 interfaces), + * reliable source of interface names on XP + */ + unsigned long info_size = 4 * 1024; + info_p = ALLOC(info_size); + for (i = 17; i; i--) { + ret = GetInterfaceInfo(info_p, &info_size); + info_p = REALLOC(info_p, info_size); + if (ret == NO_ERROR) break; + if (ret == ERROR_INSUFFICIENT_BUFFER) continue; + i = 0; + } + if (! i) { + FREE(info_p); + info_p = NULL; + } + } + + if (! ip_adaddrs_p) { + /* If GetAdaptersAddresses gave nothing we fall back to + * MIB_IPADDRTABLE (only IPv4 interfaces) + */ + unsigned long ip_addrs_size = 16 * sizeof(*ip_addrs_p); + ip_addrs_p = ALLOC(ip_addrs_size); + for (i = 17; i; i--) { + ret = GetIpAddrTable(ip_addrs_p, &ip_addrs_size, FALSE); + ip_addrs_p = REALLOC(ip_addrs_p, ip_addrs_size); + if (ret == NO_ERROR) break; + if (ret == ERROR_INSUFFICIENT_BUFFER) continue; + i = 0; + } + if (! i) { + if (info_p) FREE(info_p); + FREE(ip_addrs_p); + return ctl_reply(INET_REP_OK, NULL, 0, rbuf_pp, rsize); + } + } else ip_addrs_p = NULL; + + buf_p = buf_alloc_p = ALLOC(buf_size); + *buf_p++ = INET_REP_OK; + + /* Iterate over MIB_IPADDRTABLE or IP_ADAPTER_ADDRESSES */ + for (ia_p = NULL, ip_addrs_p ? ((void *)(i = 0)) : (ia_p = ip_adaddrs_p); + ip_addrs_p ? (i < ip_addrs_p->dwNumEntries) : (ia_p != NULL); + ip_addrs_p ? ((void *)(i++)) : (ia_p = ia_p->Next)) { + MIB_IPADDRROW *ipaddrrow_p = NULL; + DWORD flags = INET_IFF_MULTICAST; + DWORD index = 0; + WCHAR *wname_p = NULL; + MIB_IFROW ifrow; + + if (ip_addrs_p) { + ipaddrrow_p = ip_addrs_p->table + i; + index = ipaddrrow_p->dwIndex; + } else { + index = ia_p->IfIndex; + if (ia_p->Flags & IP_ADAPTER_NO_MULTICAST) { + flags &= ~INET_IFF_MULTICAST; + } + } +index: + if (! index) goto done; + sys_memzero(&ifrow, sizeof(ifrow)); + ifrow.dwIndex = index; + if (GetIfEntry(&ifrow) != NO_ERROR) break; + /* Find the interface name - first try MIB_IFROW.wzname */ + if (ifrow.wszName[0] != 0) { + wname_p = ifrow.wszName; + } else { + /* Then try IP_ADAPTER_INDEX_MAP.Name (only IPv4 adapters) */ + int j; + for (j = 0; j < info_p->NumAdapters; j++) { + if (info_p->Adapter[j].Index == (ULONG) ifrow.dwIndex) { + if (info_p->Adapter[j].Name[0] != 0) { + wname_p = info_p->Adapter[j].Name; + } + break; + } + } + } + if (wname_p) { + int len; + /* Convert interface name to UTF-8 */ + len = + WideCharToMultiByte( + CP_UTF8, 0, wname_p, -1, NULL, 0, NULL, NULL); + if (! len) break; + BUF_ENSURE(len); + WideCharToMultiByte( + CP_UTF8, 0, wname_p, -1, buf_p, len, NULL, NULL); + buf_p += len; + } else { + /* Found no name - + * use "MIB_IFROW.dwIndex: MIB_IFROW.bDescr" as name instead */ + int l; + l = utf8_len(ifrow.bDescr, ifrow.dwDescrLen); + BUF_ENSURE(9 + l+1); + buf_p += + erts_sprintf( + buf_p, "%lu: ", (unsigned long) ifrow.dwIndex); + utf8_encode(ifrow.bDescr, ifrow.dwDescrLen, buf_p); + buf_p += l; + *buf_p++ = '\0'; + } + /* Interface flags, often make up broadcast and multicast flags */ + switch (ifrow.dwType) { + case IF_TYPE_ETHERNET_CSMACD: + flags |= INET_IFF_BROADCAST; + break; + case IF_TYPE_SOFTWARE_LOOPBACK: + flags |= INET_IFF_LOOPBACK; + flags &= ~INET_IFF_MULTICAST; + break; + default: + flags &= ~INET_IFF_MULTICAST; + break; + } + if (ifrow.dwAdminStatus) { + flags |= INET_IFF_UP; + switch (ifrow.dwOperStatus) { + case IF_OPER_STATUS_CONNECTING: + flags |= INET_IFF_POINTTOPOINT; + break; + case IF_OPER_STATUS_CONNECTED: + flags |= INET_IFF_RUNNING | INET_IFF_POINTTOPOINT; + break; + case IF_OPER_STATUS_OPERATIONAL: + flags |= INET_IFF_RUNNING; + break; + } + } + BUF_ENSURE(1 + 4); + *buf_p++ = INET_IFOPT_FLAGS; + put_int32(flags, buf_p); buf_p += 4; + if (ipaddrrow_p) { + /* Legacy implementation through GetIpAddrTable */ + struct sockaddr_in sin; + /* IP Address */ + sys_memzero(&sin, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = ipaddrrow_p->dwAddr; + BUF_ENSURE(1); + /* Netmask */ + SOCKADDR_TO_BUF(INET_IFOPT_ADDR, (struct sockaddr *) &sin); + sin.sin_addr.s_addr = ipaddrrow_p->dwMask; + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_NETMASK, (struct sockaddr *) &sin); + if (flags & INET_IFF_BROADCAST) { + /* Broadcast address - fake it*/ + sin.sin_addr.s_addr = ipaddrrow_p->dwAddr; + sin.sin_addr.s_addr |= ~ipaddrrow_p->dwMask; + BUF_ENSURE(1); + SOCKADDR_TO_BUF( + INET_IFOPT_BROADADDR, (struct sockaddr *) &sin); + } + } else { + IP_ADAPTER_UNICAST_ADDRESS *p; + /* IP Address(es) */ + for (p = ia_p->FirstUnicastAddress; + p; + p = p->Next) + { + IP_ADAPTER_PREFIX *q; + ULONG shortest_length; + struct sockaddr *shortest_p, *sa_p = p->Address.lpSockaddr; + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_ADDR, sa_p); + shortest_p = NULL; + shortest_length = 0; + for (q = ia_p->FirstPrefix; + q; + q = q->Next) { + struct sockaddr *sp_p = q->Address.lpSockaddr; + if (sa_p->sa_family != sp_p->sa_family) continue; + switch (sa_p->sa_family) { + case AF_INET: { + struct sockaddr_in sin; + DWORD sa, sp, mask; + sa = ntohl((DWORD) + ((struct sockaddr_in *) + sa_p)->sin_addr.s_addr); + sp = ntohl((DWORD) + ((struct sockaddr_in *) + sp_p)->sin_addr.s_addr); + mask = 0xFFFFFFFF << (32 - q->PrefixLength); + if ((sa & mask) != (sp & mask)) continue; + if ((! shortest_p) + || q->PrefixLength < shortest_length) { + shortest_p = sp_p; + shortest_length = q->PrefixLength; + } + } break; + case AF_INET6: { + struct sockaddr_in6 sin6; + if (!eq_masked_bytes((char *) + &((struct sockaddr_in6 *) + sa_p)->sin6_addr, + (char *) + &((struct sockaddr_in6 *) + sp_p)->sin6_addr, + q->PrefixLength)) { + continue; + } + if ((! shortest_p) + || q->PrefixLength < shortest_length) { + shortest_p = sp_p; + shortest_length = q->PrefixLength; + } + } break; + } + } + if (! shortest_p) { + /* Found no shortest prefix */ + shortest_p = sa_p; + switch (shortest_p->sa_family) { + case AF_INET: { + /* Fall back to old classfull network addresses */ + DWORD addr = ntohl(((struct sockaddr_in *)shortest_p) + ->sin_addr.s_addr); + if (! (addr & 0x800000)) { + /* Class A */ + shortest_length = 8; + } else if (! (addr & 0x400000)) { + /* Class B */ + shortest_length = 16; + } else if (! (addr & 0x200000)) { + /* Class C */ + shortest_length = 24; + } else { + shortest_length = 32; + } + } break; + case AF_INET6: { + /* Just play it safe */ + shortest_length = 128; + } break; + } + } + switch (shortest_p->sa_family) { + case AF_INET: { + struct sockaddr_in sin; + DWORD mask = 0xFFFFFFFF << (32 - shortest_length); + sys_memzero(&sin, sizeof(sin)); + sin.sin_family = shortest_p->sa_family; + sin.sin_addr.s_addr = htonl(mask); + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_NETMASK, + (struct sockaddr *) &sin); + if (flags & INET_IFF_BROADCAST) { + DWORD sp = + ntohl((DWORD) + ((struct sockaddr_in *)shortest_p) + -> sin_addr.s_addr); + sin.sin_addr.s_addr = htonl(sp | ~mask); + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_BROADADDR, + (struct sockaddr *) &sin); + } + } break; + case AF_INET6: { + struct sockaddr_in6 sin6; + sys_memzero(&sin6, sizeof(sin6)); + sin6.sin6_family = shortest_p->sa_family; + set_netmask_bytes((char *) &sin6.sin6_addr, + 16, + shortest_length); + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_NETMASK, + (struct sockaddr *) &sin6); + } break; + } + } + } + if (ifrow.dwPhysAddrLen) { + /* Hardware Address */ + BUF_ENSURE(1 + 2 + ifrow.dwPhysAddrLen); + *buf_p++ = INET_IFOPT_HWADDR; + put_int16(ifrow.dwPhysAddrLen, buf_p); buf_p += 2; + sys_memcpy(buf_p, ifrow.bPhysAddr, ifrow.dwPhysAddrLen); + buf_p += ifrow.dwPhysAddrLen; + } + +done: + /* That is all for this interface */ + BUF_ENSURE(1); + *buf_p++ = '\0'; + if (ia_p && + ia_p->Ipv6IfIndex && + ia_p->Ipv6IfIndex != index) + { + /* Oops, there was an other interface for IPv6. Possible? XXX */ + index = ia_p->Ipv6IfIndex; + goto index; + } + } + + if (ip_adaddrs_p) FREE(ip_adaddrs_p); + if (info_p) FREE(info_p); + if (ip_addrs_p) FREE(ip_addrs_p); + + buf_size = buf_p - buf_alloc_p; + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); + /* buf_p is now unreliable */ + *rbuf_pp = buf_alloc_p; + return buf_size; +# undef BUF_ENSURE +} + +#elif defined(HAVE_GETIFADDRS) + +static int inet_ctl_getifaddrs(inet_descriptor* desc_p, + char **rbuf_pp, int rsize) +{ + struct ifaddrs *ifa_p, *ifa_free_p; + + int buf_size; + char *buf_p; + char *buf_alloc_p; + + buf_size = 512; + buf_alloc_p = ALLOC(buf_size); + buf_p = buf_alloc_p; +# define BUF_ENSURE(Size) \ + do { \ + int NEED_, GOT_ = buf_p - buf_alloc_p; \ + NEED_ = GOT_ + (Size); \ + if (NEED_ > buf_size) { \ + buf_size = NEED_ + 512; \ + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); \ + buf_p = buf_alloc_p + GOT_; \ + } \ + } while (0) +# define SOCKADDR_TO_BUF(opt, sa) \ + do { \ + if (sa) { \ + char *P_; \ + *buf_p++ = (opt); \ + while (! (P_ = sockaddr_to_buf((sa), buf_p, \ + buf_alloc_p+buf_size))) { \ + int GOT_ = buf_p - buf_alloc_p; \ + buf_size += 512; \ + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); \ + buf_p = buf_alloc_p + GOT_; \ + } \ + if (P_ == buf_p) { \ + buf_p--; \ + } else { \ + buf_p = P_; \ + } \ + } \ + } while (0) + + if (getifaddrs(&ifa_p) < 0) { + return ctl_error(sock_errno(), rbuf_pp, rsize); + } + ifa_free_p = ifa_p; + *buf_p++ = INET_REP_OK; + for (; ifa_p; ifa_p = ifa_p->ifa_next) { + int len = utf8_len(ifa_p->ifa_name, -1); + BUF_ENSURE(len+1 + 1+4 + 1); + utf8_encode(ifa_p->ifa_name, -1, buf_p); + buf_p += len; + *buf_p++ = '\0'; + *buf_p++ = INET_IFOPT_FLAGS; + put_int32(IFGET_FLAGS(ifa_p->ifa_flags), buf_p); buf_p += 4; + if (ifa_p->ifa_addr->sa_family == AF_INET +#if defined(AF_INET6) + || ifa_p->ifa_addr->sa_family == AF_INET6 +#endif + ) { + SOCKADDR_TO_BUF(INET_IFOPT_ADDR, ifa_p->ifa_addr); + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_NETMASK, ifa_p->ifa_netmask); + if (ifa_p->ifa_flags & IFF_POINTOPOINT) { + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_DSTADDR, ifa_p->ifa_dstaddr); + } else if (ifa_p->ifa_flags & IFF_BROADCAST) { + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_BROADADDR, ifa_p->ifa_broadaddr); + } + } +#if defined(AF_LINK) || defined(AF_PACKET) + else if ( +#if defined(AF_LINK) + ifa_p->ifa_addr->sa_family == AF_LINK +#else + 0 +#endif +#if defined(AF_PACKET) + || ifa_p->ifa_addr->sa_family == AF_PACKET +#endif + ) { + char *bp = buf_p; + BUF_ENSURE(1); + SOCKADDR_TO_BUF(INET_IFOPT_HWADDR, ifa_p->ifa_addr); + if (buf_p - bp < 4) buf_p = bp; /* Empty hwaddr */ + } +#endif + BUF_ENSURE(1); + *buf_p++ = '\0'; + } + buf_size = buf_p - buf_alloc_p; + buf_alloc_p = REALLOC(buf_alloc_p, buf_size); + /* buf_p is now unreliable */ + freeifaddrs(ifa_free_p); + *rbuf_pp = buf_alloc_p; + return buf_size; +# undef BUF_ENSURE +} + +#else + +static int inet_ctl_getifaddrs(inet_descriptor* desc_p, + char **rbuf_pp, int rsize) +{ + return ctl_error(ENOTSUP, rbuf_pp, rsize); +} + +#endif + + + #ifdef VXWORKS /* ** THIS is a terrible creature, a bug in the TCP part @@ -5293,12 +5882,15 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) if (pmtud_enable) cflags |= SPP_PMTUD_ENABLE; if (pmtud_disable) cflags |= SPP_PMTUD_DISABLE; +# ifdef HAVE_STRUCT_SCTP_PADDRPARAMS_SPP_SACKDELAY + /* The followings are missing in FreeBSD 7.1 */ sackdelay_enable =eflags& SCTP_FLAG_SACDELAY_ENABLE; sackdelay_disable=eflags& SCTP_FLAG_SACDELAY_DISABLE; if (sackdelay_enable && sackdelay_disable) return -1; if (sackdelay_enable) cflags |= SPP_SACKDELAY_ENABLE; if (sackdelay_disable) cflags |= SPP_SACKDELAY_DISABLE; +# endif arg.pap.spp_flags = cflags; # endif @@ -6199,13 +6791,15 @@ static int sctp_fill_opts(inet_descriptor* desc, char* buf, int buflen, if (ap.spp_flags & SPP_PMTUD_DISABLE) { i = LOAD_ATOM (spec, i, am_pmtud_disable); n++; } - +# ifdef HAVE_STRUCT_SCTP_PADDRPARAMS_SPP_SACKDELAY + /* SPP_SACKDELAY_* not in FreeBSD 7.1 */ if (ap.spp_flags & SPP_SACKDELAY_ENABLE) { i = LOAD_ATOM (spec, i, am_sackdelay_enable); n++; } if (ap.spp_flags & SPP_SACKDELAY_DISABLE) { i = LOAD_ATOM (spec, i, am_sackdelay_disable); n++; } # endif +# endif PLACE_FOR(spec, i, LOAD_NIL_CNT + LOAD_LIST_CNT + 2*LOAD_TUPLE_CNT); @@ -6676,6 +7270,13 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return inet_ctl_getiflist(desc, rbuf, rsize); } + case INET_REQ_GETIFADDRS: { + DEBUGF(("inet_ctl(%ld): GETIFADDRS\r\n", (long)desc->port)); + if (!IS_OPEN(desc)) + return ctl_xerror(EXBADPORT, rbuf, rsize); + return inet_ctl_getifaddrs(desc, rbuf, rsize); + } + case INET_REQ_IFGET: { DEBUGF(("inet_ctl(%ld): IFGET\r\n", (long)desc->port)); if (!IS_OPEN(desc)) diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 7c19274696..79252d0593 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -27,6 +27,7 @@ -export([all/1, ping/1, bulk_send/1, bulk_send_small/1, bulk_send_big/1, + bulk_send_bigbig/1, local_send/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, lost_exit/1, link_to_dead/1, link_to_dead_new_node/1, @@ -50,7 +51,8 @@ -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, - dist_evil_parallel_receiver/0]). + dist_evil_parallel_receiver/0, + sendersender/4, sendersender2/4]). all(suite) -> [ ping, bulk_send, local_send, link_to_busy, exit_to_busy, @@ -121,7 +123,7 @@ bulk_send(doc) -> "the time. This tests that a process that is suspended on a ", "busy port will eventually be resumed."]; bulk_send(suite) -> - [bulk_send_small, bulk_send_big]. + [bulk_send_small, bulk_send_big, bulk_send_bigbig]. bulk_send_small(Config) when is_list(Config) -> ?line bulk_send(64, 32). @@ -129,6 +131,9 @@ bulk_send_small(Config) when is_list(Config) -> bulk_send_big(Config) when is_list(Config) -> ?line bulk_send(32, 64). +bulk_send_bigbig(Config) when is_list(Config) -> + ?line bulk_sendsend(32*5, 4). + bulk_send(Terms, BinSize) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), @@ -145,6 +150,53 @@ bulk_send(Terms, BinSize) -> ?line test_server:timetrap_cancel(Dog), {comment, integer_to_list(trunc(Size/1024/Elapsed+0.5)) ++ " K/s"}. +bulk_sendsend(Terms, BinSize) -> + {Rate1, MonitorCount1} = bulk_sendsend2(Terms, BinSize, 5), + {Rate2, MonitorCount2} = bulk_sendsend2(Terms, BinSize, 995), + Ratio = if MonitorCount2 == 0 -> MonitorCount1 / 1.0; + true -> MonitorCount1 / MonitorCount2 + end, + %% A somewhat arbitrary ratio, but hopefully one that will accomodate + %% a wide range of CPU speeds. + true = (Ratio > 8.0), + {comment, + integer_to_list(Rate1) ++ " K/s, " ++ + integer_to_list(Rate2) ++ " K/s, " ++ + integer_to_list(MonitorCount1) ++ " monitor msgs, " ++ + integer_to_list(MonitorCount2) ++ " monitor msgs, " ++ + float_to_list(Ratio) ++ " monitor ratio"}. + +bulk_sendsend2(Terms, BinSize, BusyBufSize) -> + ?line Dog = test_server:timetrap(test_server:seconds(30)), + + ?line io:format("Sending ~w binaries, each of size ~w K", + [Terms, BinSize]), + ?line {ok, NodeRecv} = start_node(bulk_receiver), + ?line Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]), + ?line Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), + ?line Size = Terms*size(Bin), + + %% SLF LEFT OFF HERE. + %% When the caller uses small hunks, like 4k via + %% bulk_sendsend(32*5, 4), then (on my laptop at least), we get + %% zero monitor messages. But if we use "+zdbbl 5", then we + %% get a lot of monitor messages. So, if we can count up the + %% total number of monitor messages that we get when running both + %% default busy size and "+zdbbl 5", and if the 5 case gets + %% "many many more" monitor messages, then we know we're working. + + ?line {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ integer_to_list(BusyBufSize)), + ?line _Send = spawn(NodeSend, erlang, apply, [fun sendersender/4, [self(), Recv, Bin, Terms]]), + ?line {Elapsed, {TermsN, SizeN}, MonitorCount} = + receive {sendersender, BigRes} -> + BigRes + end, + ?line stop_node(NodeRecv), + ?line stop_node(NodeSend), + + ?line test_server:timetrap_cancel(Dog), + {trunc(SizeN/1024/Elapsed+0.5), MonitorCount}. + sender(To, _Bin, 0) -> To ! {done, self()}, receive @@ -155,6 +207,43 @@ sender(To, Bin, Left) -> To ! {term, Bin}, sender(To, Bin, Left-1). +%% Sender process to be run on a slave node + +sendersender(Parent, To, Bin, Left) -> + erlang:system_monitor(self(), [busy_dist_port]), + [spawn(fun() -> sendersender2(To, Bin, Left, false) end) || + _ <- lists:seq(1,1)], + {USec, {Res, MonitorCount}} = + timer:tc(?MODULE, sendersender2, [To, Bin, Left, true]), + Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}. + +sendersender2(To, Bin, Left, SendDone) -> + sendersender3(To, Bin, Left, SendDone, 0). + +sendersender3(To, _Bin, 0, SendDone, MonitorCount) -> + if SendDone -> + To ! {done, self()}; + true -> + ok + end, + receive + {monitor, _Pid, _Type, _Info} = M -> + sendersender3(To, _Bin, 0, SendDone, MonitorCount + 1) + after 0 -> + if SendDone -> + receive + Any when is_tuple(Any), size(Any) == 2 -> + {Any, MonitorCount} + end; + true -> + exit(normal) + end + end; +sendersender3(To, Bin, Left, SendDone, MonitorCount) -> + To ! {term, Bin}, + %%timer:sleep(50), + sendersender3(To, Bin, Left-1, SendDone, MonitorCount). + %% Receiver process to be run on a slave node. receiver(Terms, Size) -> |