/* * %CopyrightBegin% * * Copyright Ericsson AB 1998-2009. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ /* * Erlang port program to do the name service lookup for the erlang * distribution and inet part of the kernel. * A pool of subprocess is kept, to which a pair of pipes is connected. * The main process schedules requests among the different subprocesses * (created with fork()), to be able to handle as many requests as possible * simultaneously. The controlling erlang machine may request a "cancel", * in which case the process may be killed and restarted when the need arises. * The single numeric parameter to this program is the maximum port pool size, * which is the size of the bookkeeping array. * * Windows: * There is instead of a pool of processes a pool of threads. * Communication is not done through pipes but via message queues between * the threads. The only "pipes" involved are the ones used for communicating * with Erlang. * Important note: * For unknown reasons, the combination of a thread doing blocking I/O on * a named pipe at the same time as another thread tries to resolve a hostname * may (with certain software configurations) block the gethostbyname call (!) * For that reason, standard input (and standard output) should be opened * in asynchronous mode (FILE_FLAG_OVERLAPPED), which has to be done by Erlang. * A special flag to open_port is used to work around this behaviour in winsock * and the threads doing read and write handle asynchronous I/O. * The ReadFile and WriteFile calls try to cope with both types of I/O, why * the code is not really as Microsoft describes "the right way to do it" in * their documentation. Important to note is that **there is no supported way * to retrieve the information if the HANDLE was opened with * FILE_FLAG_OVERLAPPED from the HANDLE itself**. * */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "erl_printf.h" #ifdef WIN32 #define WIN32_LEAN_AND_MEAN #include #include #include #include #include /* These are not used even if they would exist which they should not */ #undef HAVE_GETADDRINFO #undef HAVE_GETIPNODEBYNAME #undef HAVE_GETHOSTBYNAME2 #undef HAVE_GETNAMEINFO #undef HAVE_GETIPNODEBYADDR #else /* Unix */ #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_SYS_TIME_H #include #else #include #endif #include #ifndef RETSIGTYPE #define RETSIGTYPE void #endif /* To simplify #ifdef code further down - select only one to be defined... ** Use them in pairs - if one is broken do not trust its mate. **/ #if defined(HAVE_GETADDRINFO) && defined(HAVE_GETNAMEINFO) #undef HAVE_GETIPNODEBYNAME #undef HAVE_GETIPNODEBYADDR #undef HAVE_GETHOSTBYNAME2 #elif defined(HAVE_GETIPNODEBYNAME) && defined(HAVE_GETIPNODEBYADDR) #undef HAVE_GETADDRINFO #undef HAVE_GETNAMEINFO #undef HAVE_GETHOSTBYNAME2 #else #undef HAVE_GETIPNODEBYNAME #undef HAVE_GETIPNODEBYADDR #undef HAVE_GETADDRINFO #undef HAVE_GETNAMEINFO #endif #endif /* !WIN32 */ #define PACKET_BYTES 4 #ifdef WIN32 #define READ_PACKET_BYTES(X,Y,Z) read_int32((X),(Y),(Z)) #else #define READ_PACKET_BYTES(X,Y) read_int32((X),(Y)) #endif #define PUT_PACKET_BYTES(X,Y) put_int32((X),(Y)) /* The serial numbers of the requests */ typedef int SerialType; #define INVALID_SERIAL -1 /* The operations performed by this program */ typedef unsigned char OpType; #define OP_GETHOSTBYNAME 1 #define OP_GETHOSTBYADDR 2 #define OP_CANCEL_REQUEST 3 #define OP_CONTROL 4 /* The protocol (IPV4/IPV6) */ typedef unsigned char ProtoType; #define PROTO_IPV4 1 #define PROTO_IPV6 2 /* OP_CONTROL */ typedef unsigned char CtlType; #define SETOPT_DEBUG_LEVEL 0 /* The unit of an IP address (0 == error, 4 == IPV4, 16 == IPV6) */ typedef unsigned char UnitType; #define UNIT_ERROR 0 #define UNIT_IPV4 4 #define UNIT_IPV6 16 /* And the byte type */ typedef unsigned char AddrByte; /* Must be compatible with character datatype */ /* * Marshalled format of request: *{ * Serial: 32 bit big endian * Op:8 bit [1,2,3] * If op == 1 { * Proto:8 bit [1,2] * Str: Null terminated array of characters * } Else if op == 2 { * Proto:8 bit [1,2] * If proto == 1 { * B0..B3: 4 bytes, most significant first * } Else (proto == 2) { * B0..B15: 16 bytes, most significant first * } * } * (No more if op == 3) *} * The request arrives as a packet, with 4 packet size bytes. */ /* The main process unpackes the marshalled message and sends the data * to a suitable port process or, in the case of a close request, kills the * suitable port process. There is also a que of requests linked together, * for when all subrocesses are busy. */ typedef struct QueItem { struct QueItem *next; int req_size; AddrByte request[1]; } QueItem; /* Variable size due to request's variable size */ QueItem *que_first; QueItem *que_last; #ifdef WIN32 typedef struct mesq { HANDLE data_present; CRITICAL_SECTION crit; int shutdown; QueItem *first; QueItem *last; } MesQ; MesQ *to_erlang; MesQ *from_erlang; #endif /* * Marshalled format of reply: *{ * Serial: 32 bit big endian * Unit: 8 bit, same as h_length or 0 for error * if unit == 0 { * Str: Null terminated character string explaining the error * } else { * Naddr: 32 bit big endian * if unit = 4 { * (B0..B3)0..(B0..B3)Naddr-1: Naddr*4 bytes most significant first * } else if unit == 16 { * (B0..B15)0..(B0..B15)Naddr-1: Naddr*16 bytes most significant first * } * Nnames: 32 bit big endian >= 1 * Name0: Null terminated string of characters * Alias[0]..Alias[Nnames - 2]: Nnames - 1 Null terminated strings of chars * } *} * Four packet size bytes prepended (big endian) */ /* Internal error codes */ #define ERRCODE_NOTSUP 1 #define ERRCODE_HOST_NOT_FOUND 2 #define ERRCODE_TRY_AGAIN 3 #define ERRCODE_NO_RECOVERY 4 #define ERRCODE_NO_DATA 5 #define ERRCODE_NETDB_INTERNAL 7 /* * Each worker process is represented in the parent by the following struct */ typedef unsigned WorkerState; #define WORKER_EMPTY 0 /* No process created */ #define WORKER_FREE 1 /* Living waiting process */ #define WORKER_BUSY 2 /* Living busy process */ #define WORKER_STALLED 3 /* Living cancelled process */ /* The timeout when killing a child process in seconds*/ #define CHILDWAIT_TMO 1 /* The domainname size_limit */ #define DOMAINNAME_MAX 258 /* 255 + Opcode + Protocol + Null termination */ typedef struct { WorkerState state; #ifdef WIN32 DWORD pid; /* 0 if unused */ MesQ *writeto; /* Message queues */ MesQ *readfrom; #else pid_t pid; /* -1 if unused */ int writeto, readfrom; /* Pipes */ #endif SerialType serial; AddrByte domain[DOMAINNAME_MAX]; QueItem *que_first; QueItem *que_last; int que_size; } Worker; int num_busy_workers; int num_free_workers; int num_stalled_workers; int max_workers; int greedy_threshold; Worker *busy_workers; /* Workers doing any job that someone really is interested in */ Worker *free_workers; /* Really free workers */ Worker *stalled_workers; /* May still deliver answers which we will discard */ #define BEE_GREEDY() (num_busy_workers >= greedy_threshold) static char *program_name; static int debug_level; #ifdef WIN32 static HANDLE debug_console_allocated = INVALID_HANDLE_VALUE; #endif #ifdef NODEBUG #define DEBUGF(L,P) /* Nothing */ #else #define DEBUGF(Level,Printf) do { if (debug_level >= (Level)) \ debugf Printf;} while(0) #endif #define ALLOC(Size) my_malloc(Size) #define REALLOC(Old, Size) my_realloc((Old), (Size)) #define FREE(Ptr) free(Ptr) #ifdef WIN32 #define WAKEUP_WINSOCK() do { \ char dummy_buff[100]; \ gethostname(dummy_buff,99); \ } while (0) #endif /* The internal prototypes */ static char *format_address(int siz, AddrByte *addr); static void debugf(char *format, ...); static void warning(char *format, ...); static void fatal(char *format, ...); static void *my_malloc(size_t size); static void *my_realloc(void *old, size_t size); static int get_int32(AddrByte *buff); static void put_int32(AddrByte *buff, int value); static int create_worker(Worker *pworker, int save_que); static int map_netdb_error(int netdb_code); #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) static int map_netdb_error_ai(int netdb_code); #endif static char *errcode_to_string(int errcode); static size_t build_error_reply(SerialType serial, int errnum, AddrByte **preply, size_t *preply_size); #ifdef HAVE_GETADDRINFO static size_t build_reply_ai(SerialType serial, int, struct addrinfo *, AddrByte **preply, size_t *preply_size); #endif static size_t build_reply(SerialType serial, struct hostent *he, AddrByte **preply, size_t *preply_size); static int read_request(AddrByte **buff, size_t *buff_size); static OpType get_op(AddrByte *buff); static AddrByte *get_op_addr(AddrByte *buff); static SerialType get_serial(AddrByte *buff); static ProtoType get_proto(AddrByte *buff); static CtlType get_ctl(AddrByte *buff); static AddrByte *get_data(AddrByte *buff); static int get_debug_level(AddrByte *buff); static int relay_reply(Worker *pw); static int ignore_reply(Worker *pw); static void init_workers(int max); static void kill_worker(Worker *pw); static Worker *pick_worker(void); static void kill_last_picked_worker(void); static void stall_worker(SerialType serial); static int handle_io_busy(int ndx); static int handle_io_free(int ndx); static int handle_io_stalled(int ndx); static void check_que(void); static void main_loop(void); static void usage(char *unknown); static void domaincopy(AddrByte *out,AddrByte *in); static int domaineq(AddrByte *d1, AddrByte *d2); static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff); static Worker *pick_worker_greedy(AddrByte *domainbuff); static void restart_worker(Worker *w); static void start_que_request(Worker *w) ; #ifdef WIN32 static int read_int32(HANDLE fd, int *res, HANDLE ev); static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev); static int write_exact(HANDLE fd, AddrByte *buff, DWORD len,HANDLE ev); DWORD WINAPI worker_loop(void *v); DWORD WINAPI reader(void *data); DWORD WINAPI writer(void *data); static int send_mes_to_worker(QueItem *m, Worker *pw); BOOL create_mesq(MesQ **q); BOOL enque_mesq(MesQ *q, QueItem *m); BOOL deque_mesq(MesQ *q, QueItem **m); BOOL close_mesq(MesQ *q); HANDLE event_mesq(MesQ *q); #else static size_t read_int32(int fd, int *res); static ssize_t read_exact(int fd, void *vbuff, size_t nbytes); static int write_exact(int fd, AddrByte *buff, int len); void reap_children(int ignored); static void init_signals(void); static void kill_all_workers(void); static void close_all_worker_fds(void); static int worker_loop(void); static int fillin_reply(Worker *pw); static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw); #endif #define ERL_DBG_LVL_ENV_VAR "ERL_INET_GETHOST_DEBUG" static int get_env_debug_level(void) { #ifdef __WIN32__ char value[21]; /* Enough for any 64-bit values */ DWORD sz = GetEnvironmentVariable((LPCTSTR) ERL_DBG_LVL_ENV_VAR, (LPTSTR) value, (DWORD) sizeof(value)); if (sz == 0 || sz > sizeof(value)) return 0; #else char *value = getenv(ERL_DBG_LVL_ENV_VAR); if (!value) return 0; #endif return atoi(value); } #ifdef WIN32 static void do_allocate_console(void) { AllocConsole(); debug_console_allocated = CreateFile ("CONOUT$", GENERIC_WRITE, FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); } #ifdef HARDDEBUG DWORD WINAPI pseudo_worker_loop(void *v); static void poll_gethost(int row); #endif #endif /* * Main */ int main(int argc, char **argv) { int num_workers = 1; char **ap = argv + 1; int x; int disable_greedy = 0; program_name = *argv; que_first = que_last = NULL; debug_level = get_env_debug_level(); greedy_threshold = 0; while (*ap) { if (!strcmp(*ap, "-d")) { ++debug_level; } else if(!strcmp(*ap, "-g") && *(ap + 1)) { ++ap; x = atoi(*ap); if (!x) { usage(*ap); } else { greedy_threshold = x; } } else if(!strcmp(*ap, "-ng")) { disable_greedy = 1; } else { x = atoi(*ap); if (!x) { usage(*ap); } else { num_workers = x; } } ++ap; } #ifdef WIN32 if (num_workers > 60 || greedy_threshold > 60) { usage("More than 60 workers on windows impossible!"); num_workers = 60; greedy_threshold = 0; } #endif if(!greedy_threshold) { greedy_threshold = (3*num_workers)/4; /* 75% */ if (!greedy_threshold) { greedy_threshold = num_workers; } } if (disable_greedy) { greedy_threshold = num_workers + 1; } #ifdef WIN32 { WORD wr; WSADATA wsa_data; int wsa_error; wr = MAKEWORD(2,0); wsa_error = WSAStartup(wr,&wsa_data); if (wsa_error) { fatal("Could not open usable winsock library."); } if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 0) { fatal("Could not open recent enough winsock library."); } if (debug_level >= 1) { do_allocate_console(); DEBUGF(1,("num_workers = %d, greedy_threshold = %d, " "debug_level = %d.", num_workers, greedy_threshold, debug_level)); } } WAKEUP_WINSOCK(); /* Why on earth is this needed? */ #endif init_workers(num_workers); main_loop(); #ifndef WIN32 kill_all_workers(); #endif return 0; } static void usage(char *unknown) { fprintf(stderr,"%s: Unknown option \"%s\"\n" "Usage: %s [-d [-d ...]] [-g ] " "[]\n", program_name, unknown, program_name); } /* * Main process main loop */ static int handle_io_busy(int ndx) { /* Probably an answer */ int res; res = relay_reply(&busy_workers[ndx]); if (res < 0) { /* Bad worker */ if (busy_workers[ndx].que_size) { restart_worker(&busy_workers[ndx]); start_que_request(&busy_workers[ndx]); return 0; } else { kill_worker(&busy_workers[ndx]); --num_busy_workers; busy_workers[ndx] = busy_workers[num_busy_workers]; } return 1; } else if (res == 0) { /* Erlang has closed */ return -1; } else { if (busy_workers[ndx].que_size) { start_que_request(&busy_workers[ndx]); return 0; } /* The worker is no longer busy, it should be in the free list */ free_workers[num_free_workers] = busy_workers[ndx]; free_workers[num_free_workers].state = WORKER_FREE; ++num_free_workers; --num_busy_workers; busy_workers[ndx] = busy_workers[num_busy_workers]; return 1; } } static int handle_io_free(int ndx) { /* IO from a free worker means "kill me" */ DEBUGF(1,("Free worker[%ld] spontaneously died.", (long) free_workers[ndx].pid)); kill_worker(&free_workers[ndx]); --num_free_workers; free_workers[ndx] = free_workers[num_free_workers]; return 1; } static int handle_io_stalled(int ndx) { int res; res = ignore_reply(&stalled_workers[ndx]); if (res <= 0) { /* Bad worker */ kill_worker(&stalled_workers[ndx]); --num_stalled_workers; stalled_workers[ndx] = stalled_workers[num_stalled_workers]; return 1; } else { DEBUGF(3,("Ignoring reply from stalled worker[%ld].", (long) stalled_workers[ndx].pid)); free_workers[num_free_workers] = stalled_workers[ndx]; free_workers[num_free_workers].state = WORKER_FREE; ++num_free_workers; --num_stalled_workers; stalled_workers[ndx] = stalled_workers[num_stalled_workers]; return 1; } } static void check_que(void) { /* Check if anything in the que can be handled */ Worker *cw; while (que_first) { QueItem *qi,*nxt; qi = que_first; nxt = qi->next; /* Need to save before it's getting put in another que in threaded solution */ if ((cw = pick_worker()) == NULL) { break; } #ifdef WIN32 { SerialType save_serial = get_serial(que_first->request); if (send_mes_to_worker(que_first, cw) != 0) { kill_last_picked_worker(); continue; } cw->serial = save_serial; } #else if (send_request_to_worker(que_first->request, que_first->req_size, cw) != 0) { /* Couldn't send request, kill the worker and retry */ kill_last_picked_worker(); continue; } cw->serial = get_serial(que_first->request); #endif /* Went well, lets deque */ que_first = nxt; if (que_first == NULL) { que_last = NULL; } DEBUGF(3,("Did deque serial %d, Que is %sempty", get_serial(qi->request), (que_first) ? "not " : "")); #ifndef WIN32 FREE(qi); #endif } } static int clean_que_of(SerialType s) { QueItem **qi; int i; for(qi=&que_first;*qi != NULL && s != get_serial((*qi)->request); qi = &((*qi)->next)) ; if(*qi != NULL) { QueItem *r = *qi; *qi = (*qi)->next; FREE(r); if(que_last == r) { /* Lost the "last" pointer, should be very uncommon if the que is not empty, so we simply do a traversal to reclaim it. */ if (que_first == NULL) { que_last = NULL; } else { for (que_last=que_first;que_last->next != NULL; que_last = que_last->next) ; } } DEBUGF(3,("Removing serial %d from global que on request, " "que %sempty",s, (que_first) ? "not " : "")); return 1; } for (i = 0; i < num_busy_workers; ++i) { for(qi=&(busy_workers[i].que_first);*qi != NULL && s != get_serial((*qi)->request); qi = &((*qi)->next)) ; if(*qi != NULL) { QueItem *r = *qi; *qi = (*qi)->next; FREE(r); if(busy_workers[i].que_last == r) { /* Lost the "last" pointer, should be very uncommon if the que is not empty, so we simply do a traversal to reclaim it. */ if (busy_workers[i].que_first == NULL) { busy_workers[i].que_last = NULL; if (busy_workers[i].que_size != 1) { fatal("Worker que size counter incorrect, internal datastructure error."); } } else { for (busy_workers[i].que_last = busy_workers[i].que_first; busy_workers[i].que_last->next != NULL; busy_workers[i].que_last = busy_workers[i].que_last->next) ; } } --(busy_workers[i].que_size); DEBUGF(3,("Removing serial %d from worker[%ld] specific que " "on request, que %sempty", s, (long) busy_workers[i].pid, (busy_workers[i].que_first) ? "not " : "")); return 1; } } return 0; } static void main_loop(void) { AddrByte *inbuff = NULL; int insize; int i,w; #ifdef WIN32 HANDLE handles[64]; DWORD num_handles; DWORD index; QueItem *qi; #else size_t inbuff_size = 0; fd_set fds; int max_fd; #endif int new_data; int save_serial; /* It's important that the free workers list is handled first */ Worker *workers[3] = {free_workers, busy_workers, stalled_workers}; int *wsizes[3] = {&num_free_workers, &num_busy_workers, &num_stalled_workers}; int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy, &handle_io_stalled}; Worker *cw; AddrByte domainbuff[DOMAINNAME_MAX]; #ifdef WIN32 { DWORD dummy; /* Create the reader and writer */ if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) { fatal("Could not create message que! errno = %d.",GetLastError()); } if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy)) == NULL) { fatal("Could not create writer thread! errno = %d.",GetLastError()); } if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy)) == NULL) { fatal("Could not create reader thread! errno = %d.",GetLastError()); } DEBUGF(4,("Created reader and writer threads.")); #ifdef HARDDEBUG poll_gethost(__LINE__); #endif } #endif for(;;) { #ifdef WIN32 num_handles = 0; handles[num_handles++] = event_mesq(from_erlang); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { handles[num_handles++] = event_mesq(workers[w][i].readfrom); } } if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE)) == WAIT_FAILED) { fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError()); } w = 0; index -= WAIT_OBJECT_0; DEBUGF(4,("Got data on index %d.",index)); if (index > 0) { if (((int)index - 1) < *wsizes[0]) { (*handlers[0])(index - 1); } else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) { (*handlers[1])(index - 1 - (*wsizes[0])); } else { (*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1])); } } new_data = (index == 0); #else max_fd = 0; FD_ZERO(&fds); FD_SET(0,&fds); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { FD_SET(workers[w][i].readfrom,&fds); if (workers[w][i].readfrom > max_fd) { max_fd = workers[w][i].readfrom; } } } for (;;) { if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) { if (errno == EINTR) { continue; } else { fatal("Select failed (invalid internal structures?), " "errno = %d.",errno); } } break; } for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; ++i) { if (FD_ISSET(workers[w][i].readfrom, &fds)) { int hres = (*handlers[w])(i); if (hres < 0) { return; } else { i -= hres; /* We'll retry this position, if hres == 1. The position is usually replaced with another worker, a worker with I/O usually changes state as we use blocking file I/O */ } } } } new_data = FD_ISSET(0,&fds); #endif check_que(); /* Now check for new requests... */ if (new_data) { /* Erlang... */ OpType op; #ifdef WIN32 if (!deque_mesq(from_erlang,&qi)) { DEBUGF(1,("Erlang has closed.")); return; } insize = qi->req_size; inbuff = qi->request; DEBUGF(4,("Got data from erlang.")); DEBUGF(4,("OPeration == %d.",get_op(inbuff))); #else insize = read_request(&inbuff, &inbuff_size); if (insize == 0) { /* Other errors taken care of in read_request */ DEBUGF(1,("Erlang has closed.")); return; } #endif op = get_op(inbuff); if (op == OP_CANCEL_REQUEST) { SerialType serial = get_serial(inbuff); if (!clean_que_of(serial)) { for (i = 0; i < num_busy_workers; ++i) { if (busy_workers[i].serial == serial) { if (busy_workers[i].que_size) { restart_worker(&busy_workers[i]); start_que_request(&busy_workers[i]); } else { stall_worker(i); check_que(); } break; } } } #ifdef WIN32 FREE(qi); #endif continue; /* New select */ } else if (op == OP_CONTROL) { CtlType ctl; SerialType serial = get_serial(inbuff); if (serial != INVALID_SERIAL) { fatal("Invalid serial: %d.", serial); } switch (ctl = get_ctl(inbuff)) { case SETOPT_DEBUG_LEVEL: { int tmp_debug_level = get_debug_level(inbuff); #ifdef WIN32 if (debug_console_allocated == INVALID_HANDLE_VALUE && tmp_debug_level > 0) { DWORD res; do_allocate_console(); WriteFile(debug_console_allocated, "Hej\n",4,&res,NULL); } #endif debug_level = tmp_debug_level; DEBUGF(debug_level, ("debug_level = %d", debug_level)); for (w = 0; w < 3; ++w) { for (i = 0; i < *wsizes[w]; i++) { int res; #ifdef WIN32 QueItem *m; #endif cw = &(workers[w][i]); #ifdef WIN32 m = ALLOC(sizeof(QueItem) - 1 + qi->req_size); memcpy(m->request, qi->request, (m->req_size = qi->req_size)); m->next = NULL; if ((res = send_mes_to_worker(m, cw)) != 0) { FREE(m); } #else res = send_request_to_worker(inbuff, insize, cw); #endif if (res != 0) { kill_worker(cw); (*wsizes[w])--; *cw = workers[w][*wsizes[w]]; } } } } break; default: warning("Unknown control requested from erlang (%d), " "message discarded.", (int) ctl); break; } #ifdef WIN32 FREE(qi); #endif continue; /* New select */ } else { ProtoType proto; if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) { warning("Unknown operation requested from erlang (%d), " "message discarded.", op); #ifdef WIN32 FREE(qi); #endif continue; } if ((proto = get_proto(inbuff)) != PROTO_IPV4 && proto != PROTO_IPV6) { warning("Unknown protocol requested from erlang (%d), " "message discarded.", proto); #ifdef WIN32 FREE(qi); #endif continue; } if (get_domainname(inbuff,insize,domainbuff) < 0) { warning("Malformed message sent from erlang, no domain, " "message discarded.", op); #ifdef WIN32 FREE(qi); #endif continue; } } if (BEE_GREEDY()) { DEBUGF(4,("Beeing greedy!")); if ((cw = pick_worker_greedy(domainbuff)) != NULL) { /* Put it in the worker specific que if the domainname matches... */ #ifndef WIN32 QueItem *qi = ALLOC(sizeof(QueItem) - 1 + insize); qi->req_size = insize; memcpy(&(qi->request), inbuff, insize); qi->next = NULL; #endif if (!cw->que_first) { cw->que_first = cw->que_last = qi; } else { cw->que_last->next = qi; cw->que_last = qi; } ++(cw->que_size); continue; } /* Otherwise busyness as usual */ } save_serial = get_serial(inbuff); while ((cw = pick_worker()) != NULL) { int res; #ifdef WIN32 res = send_mes_to_worker(qi,cw); #else res = send_request_to_worker(inbuff, insize, cw); #endif if (res == 0) { break; } else { kill_last_picked_worker(); } } if (cw == NULL) { /* Insert into que */ #ifndef WIN32 QueItem *qi = ALLOC(sizeof(QueItem) - 1 + insize); qi->req_size = insize; memcpy(&(qi->request), inbuff, insize); qi->next = NULL; #endif if (!que_first) { que_first = que_last = qi; } else { que_last->next = qi; que_last = qi; } } else { cw->serial = save_serial; domaincopy(cw->domain, domainbuff); } } } } /* * Main process worker administration */ static void init_workers(int max) { max_workers = max; num_busy_workers = 0; num_free_workers = 0; num_stalled_workers = 0; busy_workers = ALLOC(sizeof(Worker) * max_workers); free_workers = ALLOC(sizeof(Worker) * max_workers); stalled_workers = ALLOC(sizeof(Worker) * max_workers); #ifndef WIN32 init_signals(); #endif } #ifdef WIN32 static void kill_worker(Worker *pw) { /* Cannot really kill a thread in win32, have to just leave it to die */ close_mesq(pw->writeto); close_mesq(pw->readfrom); pw->state = WORKER_EMPTY; } #else static void kill_worker(Worker *pw) { fd_set fds; struct timeval tmo; int selret; static char buff[1024]; DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d", (long) pw->pid, (int) pw->readfrom, (int) pw->serial)); kill(pw->pid, SIGUSR1); /* This is all just to check that the child died, not really necessary */ for(;;) { FD_ZERO(&fds); FD_SET(pw->readfrom, &fds); tmo.tv_usec=0; tmo.tv_sec = CHILDWAIT_TMO; selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo); if (selret < 0) { if (errno != EINTR) { warning("Unable to select on dying child file descriptor, " "errno = %d.",errno); break; } } else if (selret == 0) { warning("Timeout waiting for child process to die, " "ignoring child (pid = %d).", pw->pid); break; } else { int ret; if ((ret = read(pw->readfrom, buff, 1024)) < 0) { if (errno != EINTR) { warning("Child file descriptor not closed properly, " "errno = %d", errno); break; } } else if (ret == 0) { break; } /* continue */ } } /* Waiting is done by signal handler... */ close(pw->readfrom); close(pw->writeto); pw->state = WORKER_EMPTY; /* Leave rest as is... */ } static void kill_all_workers(void) /* Emergency function, will not check that the children died... */ { int i; for (i = 0; i < num_busy_workers; ++i) { kill(busy_workers[i].pid, SIGUSR1); } for (i = 0; i < num_free_workers; ++i) { kill(free_workers[i].pid, SIGUSR1); } for (i = 0; i < num_stalled_workers; ++i) { kill(stalled_workers[i].pid, SIGUSR1); } } #endif /* !WIN32 */ static Worker *pick_worker(void) { Worker tmp; if (num_free_workers > 0) { --num_free_workers; tmp = free_workers[num_free_workers]; } else if (num_stalled_workers > 0) { /* "restart" the worker... */ --num_stalled_workers; kill_worker(&(stalled_workers[num_stalled_workers])); if (create_worker(&tmp,0) < 0) { warning("Unable to create worker process, insufficient " "resources"); return NULL; } } else { if (num_busy_workers == max_workers) { return NULL; } if (create_worker(&tmp,0) < 0) { warning("Unable to create worker process, insufficient " "resources"); return NULL; } } /* tmp contains a worker now, make it busy and put it in the right array */ tmp.state = WORKER_BUSY; busy_workers[num_busy_workers] = tmp; ++num_busy_workers; return &(busy_workers[num_busy_workers-1]); } static Worker *pick_worker_greedy(AddrByte *domainbuff) { int i; int ql = 0; int found = -1; for (i=0; i < num_busy_workers; ++i) { if (domaineq(busy_workers[i].domain, domainbuff)) { if ((found < 0) || (busy_workers[i].que_size < busy_workers[found].que_size)) { found = i; ql = busy_workers[i].que_size; } } } if (found >= 0) { return &busy_workers[found]; } return NULL; } static void restart_worker(Worker *w) { kill_worker(w); if (create_worker(w,1) < 0) { fatal("Unable to create worker process, insufficient resources"); } } static void kill_last_picked_worker(void) { kill_worker( &(busy_workers[num_busy_workers-1])); --num_busy_workers; } /* * Starts a request qued to a specific worker, check_que starts normally queued requests. * We expect a que here... */ static void start_que_request(Worker *w) { QueItem *qi; SerialType save_serial; if (!w->que_first || !w->que_size) { fatal("Expected que'd requests but found none, " "internal datastructure corrupted!"); } qi = w->que_first; w->que_first = w->que_first->next; if (!w->que_first) { w->que_last = NULL; } --(w->que_size); save_serial = get_serial(qi->request); #ifdef WIN32 while (send_mes_to_worker(qi, w) != 0) { restart_worker(w); } #else while (send_request_to_worker(qi->request, qi->req_size, w) != 0) { restart_worker(w); } #endif w->serial = save_serial; DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, " "Que is %sempty", get_serial(qi->request), (long) w->pid, (w->que_first) ? "not " : "")); #ifndef WIN32 FREE(qi); #endif } #ifndef WIN32 /* Signal utilities */ static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int) { struct sigaction act, oact; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = func; sigaction(sig, &act, &oact); return(oact.sa_handler); } static void sys_sigblock(int sig) { sigset_t mask; sigemptyset(&mask); sigaddset(&mask, sig); sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL); } static void sys_sigrelease(int sig) { sigset_t mask; sigemptyset(&mask); sigaddset(&mask, sig); sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL); } /* Child signal handler */ void reap_children(int ignored) { int res; sys_sigblock(SIGCHLD); for (;;) { while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0) ; if (!(res < 0 && errno == EAGAIN)) { DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno)); break; } } sys_sigrelease(SIGCHLD); } static void init_signals(void) { sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result on most (?) platforms. */ sys_sigset(SIGPIPE, SIG_IGN); } #endif static void stall_worker(int ndx) { --num_busy_workers; stalled_workers[num_stalled_workers] = busy_workers[ndx]; stalled_workers[num_stalled_workers].state = WORKER_STALLED; busy_workers[ndx] = busy_workers[num_busy_workers]; DEBUGF(3, ("Stalled worker[%ld]", (long) stalled_workers[num_stalled_workers].pid)); ++num_stalled_workers; } /* * Main loop message passing */ #ifndef WIN32 static int read_request(AddrByte **buff, size_t *buff_size) { int siz; int r; if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) { if (r == 0) { return 0; } else { fatal("Unexpected end of file on main input, errno = %d",errno); } } if (siz > *buff_size) { if (buff_size == 0) { *buff = ALLOC((*buff_size = siz)); } else { *buff = REALLOC(*buff, (*buff_size = siz)); } } if (read_exact(0,*buff, siz) != siz) { fatal("Unexpected end of file on main input, errno = %d",errno); } if (siz < 5) { fatal("Unexpected message on main input, message size %d less " "than minimum."); } return siz; } #endif /* !WIN32 */ static OpType get_op(AddrByte *buff) { return (OpType) buff[4]; } static AddrByte *get_op_addr(AddrByte *buff) { return buff + 4; } static SerialType get_serial(AddrByte *buff) { return get_int32(buff); } static ProtoType get_proto(AddrByte *buff) { return (ProtoType) buff[5]; } static CtlType get_ctl(AddrByte *buff) { return (CtlType) buff[5]; } static AddrByte *get_data(AddrByte *buff) { return buff + 6; } static int get_debug_level(AddrByte *buff) { return get_int32(buff + 6); } #ifdef WIN32 static int send_mes_to_worker(QueItem *m, Worker *pw) { if (!enque_mesq(pw->writeto, m)) { warning("Unable to send to child process."); return -1; } return 0; } #else static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw) { AddrByte hdr[PACKET_BYTES]; PUT_PACKET_BYTES(hdr, rsize); if (write_exact(pw->writeto, hdr, PACKET_BYTES) < 0) { warning("Unable to write to child process."); return -1; } if (write_exact(pw->writeto, (AddrByte *) pr, rsize) < 0) { warning("Unable to write to child process."); return -1; } return 0; } #endif /* !WIN32 */ #ifdef WIN32 static int relay_reply(Worker *pw) { QueItem *m; if (!deque_mesq(pw->readfrom,&m)) { return 0; } if (!enque_mesq(to_erlang,m)) { FREE(m); return 0; } return 1; } static int ignore_reply(Worker *pw) { QueItem *m; if (!deque_mesq(pw->readfrom,&m)) { return 0; } FREE(m); return 1; } #else /* Static buffers used by the next three functions */ static AddrByte *relay_buff = NULL; static int relay_buff_size = 0; static int fillin_reply(Worker *pw) { int length; if (READ_PACKET_BYTES(pw->readfrom, &length) != PACKET_BYTES) { warning("Malformed reply (header) from worker process %d.", pw->pid); return -1; } if (relay_buff_size < (length + PACKET_BYTES)) { if (!relay_buff_size) { relay_buff = ALLOC((relay_buff_size = (length + PACKET_BYTES))); } else { relay_buff = REALLOC(relay_buff, (relay_buff_size = (length + PACKET_BYTES))); } } PUT_PACKET_BYTES(relay_buff, length); if (read_exact(pw->readfrom, relay_buff + PACKET_BYTES, length) != length) { warning("Malformed reply (data) from worker process %d.", pw->pid); return -1; } return length; } static int relay_reply(Worker *pw) { int length = fillin_reply(pw); /* Filled into the "global" buffer */ int res; if (length < 0) { return -1; } if ((res = write_exact(1, relay_buff, length + PACKET_BYTES)) < 0) { fatal("Cannot write reply to erlang process, errno = %d.", errno); } else if (res == 0) { DEBUGF(1,("Erlang has closed write pipe.")); return 0; } return length; } static int ignore_reply(Worker *pw) { return fillin_reply(pw); } #endif /* !WIN32 */ /* * Domain name "parsing" and worker specific queing */ static void domaincopy(AddrByte *out, AddrByte *in) { AddrByte *ptr = out; *ptr++ = *in++; *ptr++ = *in++; switch(*out) { case OP_GETHOSTBYNAME: while(*in != '\0' && *in != '.') ++in; strncpy((char*)ptr, (char*)in, DOMAINNAME_MAX-2); ptr[DOMAINNAME_MAX-3] = '\0'; DEBUGF(4,("Saved domainname %s.", ptr)); return; case OP_GETHOSTBYADDR: memcpy(ptr,in, ((out[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1); DEBUGF(4, ("Saved domain address: %s.", format_address(((out[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1,ptr))); return; default: fatal("Trying to copy buffer not containing valid domain, [%d,%d].", (int) out[0], (int) out[1]); } } static int domaineq(AddrByte *d1, AddrByte *d2) { if (d1[0] != d2[0] || d1[1] != d2[1]) { return 0; } switch (d1[0]) { case OP_GETHOSTBYNAME: return !strcmp((char*)d1+2,(char*)d2+2); case OP_GETHOSTBYADDR: return !memcmp(d1+2,d2+2, ((d1[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1); default: fatal("Trying to compare buffers not containing valid domain, " "[%d,%d].", (int) d1[0], (int) d1[1]); return -1; /* Lint... */ } } static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff) { OpType op = get_op(inbuff); ProtoType proto; int i; AddrByte *data; data = get_data(inbuff); switch (op) { case OP_GETHOSTBYNAME: data = get_data(inbuff); for (i = (data - inbuff); i < insize && inbuff[i] != '\0'; ++i) ; if (i < insize) { domaincopy(domainbuff, get_op_addr(inbuff)); return 0; } DEBUGF(3, ("Could not pick valid domainname in " "gethostbyname operation")); return -1; case OP_GETHOSTBYADDR: proto = get_proto(inbuff); i = insize - (data - inbuff); if ((proto == PROTO_IPV4 && i == UNIT_IPV4) || (proto == PROTO_IPV6 && i == UNIT_IPV6)) { /* An address buffer */ domaincopy(domainbuff, get_op_addr(inbuff)); return 0; } DEBUGF(3, ("Could not pick valid domainname in gethostbyaddr " "operation")); return -1; default: DEBUGF(2, ("Could not pick valid domainname because of " "invalid opcode %d.", (int) op)); return -1; } } /* * Worker subprocesses with utilities */ #ifdef WIN32 static int create_worker(Worker *pworker, int save_que) { MesQ **thread_data = ALLOC(2*sizeof(MesQ *)); DWORD tid; if (!create_mesq(thread_data)) { fatal("Could not create, pipes for subprocess, errno = %d", GetLastError()); } if (!create_mesq(thread_data + 1)) { fatal("Could not create, pipes for subprocess, errno = %d", GetLastError()); } /* Save those before the thread starts */ pworker->writeto = thread_data[0]; pworker->readfrom = thread_data[1]; if (((HANDLE) _beginthreadex(NULL, 0, worker_loop, thread_data, 0, &tid)) == NULL) { fatal("Could not create thread errno = %d", GetLastError()); } pworker->pid = tid; pworker->state = WORKER_FREE; pworker->serial = INVALID_SERIAL; if (!save_que) { pworker->que_first = pworker->que_last = NULL; pworker->que_size = 0; } DEBUGF(3,("Created worker[%ld] with fd %d", (long) pworker->pid, (int) pworker->readfrom)); return 0; } #else static int create_worker(Worker *pworker, int save_que) { int p0[2], p1[2]; pid_t child; if (pipe(p0)) { warning("Could not create, pipes for subprocess, errno = %d", errno); return -1; } if (pipe(p1)) { warning("Could not create, pipes for subprocess, errno = %d", errno); close(p0[0]); close(p0[1]); return -1; } if ((child = fork()) < 0) { /* failure */ warning("Could not fork(), errno = %d", errno); close(p0[0]); close(p0[1]); close(p1[0]); close(p1[1]); return -1; } else if (child > 0) { /* parent */ close(p0[1]); close(p1[0]); pworker->writeto = p1[1]; pworker->readfrom = p0[0]; pworker->pid = child; pworker->state = WORKER_FREE; pworker->serial = INVALID_SERIAL; if (!save_que) { pworker->que_first = pworker->que_last = NULL; pworker->que_size = 0; } DEBUGF(3,("Created worker[%ld] with fd %d", (long) pworker->pid, (int) pworker->readfrom)); return 0; } else { /* child */ close(p1[1]); close(p0[0]); close_all_worker_fds(); /* Make "fatal" not find any children */ num_busy_workers = num_free_workers = num_stalled_workers = 0; if((dup2(p1[0],0) < 0) || (dup2(p0[1],1) < 0)) { fatal("Worker could not dup2(), errno = %d", errno); return -1; /* lint... */ } close(p1[0]); close(p0[1]); signal(SIGCHLD, SIG_IGN); return worker_loop(); } } static void close_all_worker_fds(void) { int w,i; Worker *workers[3] = {free_workers, busy_workers, stalled_workers}; int wsizes[3] = {num_free_workers, num_busy_workers, num_stalled_workers}; for (w = 0; w < 3; ++w) { for (i = 0; i < wsizes[w]; ++i) { if (workers[w][i].state != WORKER_EMPTY) { close(workers[w][i].readfrom); close(workers[w][i].writeto); } } } } #endif /* !WIN32 */ #ifdef WIN32 DWORD WINAPI worker_loop(void *v) #else static int worker_loop(void) #endif { AddrByte *req = NULL; size_t req_size = 0; int this_size; AddrByte *reply = NULL; size_t reply_size = 0; size_t data_size; #ifdef WIN32 QueItem *m = NULL; MesQ *readfrom = ((MesQ **) v)[0]; MesQ *writeto = ((MesQ **) v)[1]; /* XXX:PaN */ FREE(v); #endif for(;;) { #ifdef HAVE_GETADDRINFO struct addrinfo *ai = NULL; #endif struct hostent *he = NULL; #ifdef HAVE_GETNAMEINFO struct sockaddr *sa = NULL; char name[NI_MAXHOST]; #endif #if defined(HAVE_GETIPNODEBYNAME) || defined(HAVE_GETIPNODEBYADDR) int free_he = 0; #endif int error_num = 0; SerialType serial; OpType op; ProtoType proto; AddrByte *data; #ifdef WIN32 WaitForSingleObject(event_mesq(readfrom),INFINITE); DEBUGF(4,("Worker got data on message que.")); if(!deque_mesq(readfrom,&m)) { goto fail; } this_size = m->req_size; req = m->request; #else if (READ_PACKET_BYTES(0,&this_size) != PACKET_BYTES) { DEBUGF(2,("Worker got error/EOF while reading size, exiting.")); exit(0); } if (this_size > req_size) { if (req == NULL) { req = ALLOC((req_size = this_size)); } else { req = REALLOC(req, (req_size = this_size)); } } if (read_exact(0, req, (size_t) this_size) != this_size) { DEBUGF(1,("Worker got EOF while reading data, exiting.")); exit(0); } #endif /* Decode the request... */ serial = get_serial(req); if (OP_CONTROL == (op = get_op(req))) { CtlType ctl; if (serial != INVALID_SERIAL) { DEBUGF(1, ("Worker got invalid serial: %d.", serial)); exit(0); } switch (ctl = get_ctl(req)) { case SETOPT_DEBUG_LEVEL: debug_level = get_debug_level(req); DEBUGF(debug_level, ("Worker debug_level = %d.", debug_level)); break; } continue; } proto = get_proto(req); data = get_data(req); DEBUGF(4,("Worker got request, op = %d, proto = %d, data = %s.", op,proto,data)); /* Got a request, lets go... */ switch (op) { case OP_GETHOSTBYNAME: switch (proto) { #ifdef HAVE_IN6 case PROTO_IPV6: { /* switch (proto) { */ #ifdef HAVE_GETADDRINFO struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_flags = (AI_CANONNAME|AI_V4MAPPED|AI_ADDRCONFIG); hints.ai_socktype = SOCK_STREAM; hints.ai_family = AF_INET6; DEBUGF(5, ("Starting getaddrinfo(%s, ...)", data)); error_num = getaddrinfo((char *)data, NULL, &hints, &ai); DEBUGF(5,("getaddrinfo returned %d", error_num)); if (error_num) { error_num = map_netdb_error_ai(error_num); } #elif defined(HAVE_GETIPNODEBYNAME) /*#ifdef HAVE_GETADDRINFO */ DEBUGF(5,("Starting getipnodebyname(%s)",data)); he = getipnodebyname(data, AF_INET6, AI_DEFAULT, &error_num); if (he) { free_he = 1; error_num = 0; DEBUGF(5,("getipnodebyname(,AF_INET6,,) OK")); } else { DEBUGF(5,("getipnodebyname(,AF_INET6,,) error %d", error_num)); error_num = map_netdb_error(error_num); } #elif defined(HAVE_GETHOSTBYNAME2) /*#ifdef HAVE_GETADDRINFO */ DEBUGF(5,("Starting gethostbyname2(%s, AF_INET6)",data)); he = gethostbyname2((char*)data, AF_INET6); if (he) { error_num = 0; DEBUGF(5,("gethostbyname2(, AF_INET6) OK")); } else { error_num = map_netdb_error(h_errno); DEBUGF(5,("gethostbyname2(, AF_INET6) error %d", h_errno)); } #else error_num = ERRCODE_NOTSUP; #endif /*#ifdef HAVE_GETADDRINFO */ } break; #endif /*ifdef HAVE_IN6 */ case PROTO_IPV4: { /* switch (proto) { */ DEBUGF(5,("Starting gethostbyname(%s)",data)); he = gethostbyname((char*)data); if (he) { error_num = 0; DEBUGF(5,("gethostbyname OK")); } else { error_num = map_netdb_error(h_errno); DEBUGF(5,("gethostbyname error %d", h_errno)); } } break; default: /* switch (proto) { */ /* Not supported... */ error_num = ERRCODE_NOTSUP; break; } /* switch (proto) { */ if (he) { data_size = build_reply(serial, he, &reply, &reply_size); #ifdef HAVE_GETIPNODEBYNAME if (free_he) { freehostent(he); } #endif #ifdef HAVE_GETADDRINFO } else if (ai) { data_size = build_reply_ai(serial, 16, ai, &reply, &reply_size); freeaddrinfo(ai); #endif } else { data_size = build_error_reply(serial, error_num, &reply, &reply_size); } break; /* case OP_GETHOSTBYNAME: */ case OP_GETHOSTBYADDR: /* switch (op) { */ switch (proto) { #ifdef HAVE_IN6 case PROTO_IPV6: { #ifdef HAVE_GETNAMEINFO struct sockaddr_in6 *sin; socklen_t salen = sizeof(*sin); sin = ALLOC(salen); #ifndef NO_SA_LEN sin->sin6_len = salen; #endif sin->sin6_family = AF_INET6; sin->sin6_port = 0; memcpy(&sin->sin6_addr, data, 16); sa = (struct sockaddr *)sin; DEBUGF(5,("Starting getnameinfo(,,%s,16,,,)", format_address(16, data))); error_num = getnameinfo(sa, salen, name, sizeof(name), NULL, 0, NI_NAMEREQD); DEBUGF(5,("getnameinfo returned %d", error_num)); if (error_num) { error_num = map_netdb_error_ai(error_num); sa = NULL; } #elif defined(HAVE_GETIPNODEBYADDR) /*#ifdef HAVE_GETNAMEINFO*/ struct in6_addr ia; memcpy(ia.s6_addr, data, 16); DEBUGF(5,("Starting getipnodebyaddr(%s,16,AF_INET6,)", format_address(16, data))); he = getipnodebyaddr(&ia, 16, AF_INET6, &error_num); free_he = 1; if (! he) { DEBUGF(5,("getipnodebyaddr error %d", error_num)); error_num = map_netdb_error(error_num); } else { DEBUGF(5,("getipnodebyaddr OK")); } #else /*#ifdef HAVE_GETNAMEINFO*/ struct in6_addr ia; memcpy(ia.s6_addr, data, 16); DEBUGF(5,("Starting gethostbyaddr(%s,16,AF_INET6)", format_address(16, data))); he = gethostbyaddr((const char *) &ia, 16, AF_INET6); if (! he) { error_num = map_netdb_error(h_errno); DEBUGF(5,("gethostbyaddr error %d", h_errno)); } else { DEBUGF(5,("gethostbyaddr OK")); } #endif /* #ifdef HAVE_GETNAMEINFO */ } break; /* case PROTO_IPV6: { */ #endif /* #ifdef HAVE_IN6 */ case PROTO_IPV4: { /* switch(proto) { */ struct in_addr ia; memcpy(&ia.s_addr, data, 4); /* Alignment required... */ DEBUGF(5,("Starting gethostbyaddr(%s,4,AF_INET)", format_address(4, data))); he = gethostbyaddr((const char *) &ia, 4, AF_INET); if (! he) { error_num = map_netdb_error(h_errno); DEBUGF(5,("gethostbyaddr error %d", h_errno)); } else { DEBUGF(5,("gethostbyaddr OK")); } } break; default: error_num = ERRCODE_NOTSUP; } /* switch(proto) { */ if (he) { data_size = build_reply(serial, he, &reply, &reply_size); #ifdef HAVE_GETIPNODEBYADDR if (free_he) { freehostent(he); } #endif #ifdef HAVE_GETNAMEINFO } else if (sa) { struct addrinfo res; memset(&res, 0, sizeof(res)); res.ai_canonname = name; res.ai_addr = sa; res.ai_next = NULL; data_size = build_reply_ai(serial, 16, &res, &reply, &reply_size); free(sa); #endif } else { data_size = build_error_reply(serial, error_num, &reply, &reply_size); } break; /* case OP_GETHOSTBYADR: */ default: data_size = build_error_reply(serial, ERRCODE_NOTSUP, &reply, &reply_size); break; } /* switch (op) { */ #ifdef WIN32 m = REALLOC(m, sizeof(QueItem) - 1 + data_size - PACKET_BYTES); m->next = NULL; m->req_size = data_size - PACKET_BYTES; memcpy(m->request,reply + PACKET_BYTES,data_size - PACKET_BYTES); if (!enque_mesq(writeto,m)) { goto fail; } m = NULL; #else write(1, reply, data_size); /* No signals expected */ #endif } /* for (;;) */ #ifdef WIN32 fail: if (m != NULL) { FREE(m); } close_mesq(readfrom); close_mesq(writeto); if (reply) { FREE(reply); } return 1; #endif } static int map_netdb_error(int netdb_code) { switch (netdb_code) { #ifdef HOST_NOT_FOUND case HOST_NOT_FOUND: return ERRCODE_HOST_NOT_FOUND; #endif #ifdef TRY_AGAIN case TRY_AGAIN: return ERRCODE_TRY_AGAIN; #endif #ifdef NO_RECOVERY case NO_RECOVERY: return ERRCODE_NO_RECOVERY; #endif #if defined(NO_DATA) || defined(NO_ADDRESS) #ifdef NO_DATA case NO_DATA: #endif #ifdef NO_ADDRESS #if !defined(NO_DATA) || (NO_DATA != NO_ADDRESS) case NO_ADDRESS: #endif #endif return ERRCODE_NO_DATA; #endif default: return ERRCODE_NETDB_INTERNAL; } } #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) static int map_netdb_error_ai(int netdb_code) { switch(netdb_code) { #ifdef EAI_ADDRFAMILY case EAI_ADDRFAMILY: return ERRCODE_NETDB_INTERNAL; #endif case EAI_AGAIN: return ERRCODE_TRY_AGAIN; case EAI_BADFLAGS: return ERRCODE_NETDB_INTERNAL; case EAI_FAIL: return ERRCODE_HOST_NOT_FOUND; case EAI_FAMILY: return ERRCODE_NETDB_INTERNAL; case EAI_MEMORY: return ERRCODE_NETDB_INTERNAL; #if defined(EAI_NODATA) && EAI_NODATA != EAI_NONAME case EAI_NODATA: return ERRCODE_HOST_NOT_FOUND; #endif case EAI_NONAME: return ERRCODE_HOST_NOT_FOUND; case EAI_SERVICE: return ERRCODE_NETDB_INTERNAL; case EAI_SOCKTYPE: return ERRCODE_NETDB_INTERNAL; default: return ERRCODE_NETDB_INTERNAL; } } #endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */ static char *errcode_to_string(int errcode) { switch (errcode) { case ERRCODE_NOTSUP: return "enotsup"; case ERRCODE_HOST_NOT_FOUND: /* * I would preffer * return "host_not_found"; * but have to keep compatibility with the old * inet_gethost's error codes... */ return "notfound"; case ERRCODE_TRY_AGAIN: return "try_again"; case ERRCODE_NO_RECOVERY: return "no_recovery"; case ERRCODE_NO_DATA: return "no_data"; default: /*case ERRCODE_NETDB_INTERNAL:*/ return "netdb_internal"; } } static size_t build_error_reply(SerialType serial, int errnum, AddrByte **preply, size_t *preply_size) { char *errstring = errcode_to_string(errnum); int string_need = strlen(errstring) + 1; /* a '\0' too */ unsigned need; AddrByte *ptr; need = PACKET_BYTES + 4 /* Serial */ + 1 /* Unit */ + string_need; if (*preply_size < need) { if (*preply_size == 0) { *preply = ALLOC((*preply_size = need)); } else { *preply = REALLOC(*preply, (*preply_size = need)); } } ptr = *preply; PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); ptr += PACKET_BYTES; put_int32(ptr,serial); ptr +=4; *ptr++ = (AddrByte) 0; /* 4 or 16 */ strcpy((char*)ptr, errstring); return need; } static size_t build_reply(SerialType serial, struct hostent *he, AddrByte **preply, size_t *preply_size) { unsigned need; int strings_need; int num_strings; int num_addresses; int i; AddrByte *ptr; int unit = he->h_length; for (num_addresses = 0; he->h_addr_list[num_addresses] != NULL; ++num_addresses) ; strings_need = strlen(he->h_name) + 1; /* 1 for null byte */ num_strings = 1; if (he->h_aliases) { for(i=0; he->h_aliases[i] != NULL; ++i) { strings_need += strlen(he->h_aliases[i]) + 1; ++num_strings; } } need = PACKET_BYTES + 4 /* Serial */ + 1 /* Unit */ + 4 /* Naddr */ + (unit * num_addresses) /* Address bytes */ + 4 /* Nnames */ + strings_need /* The name and alias strings */; if (*preply_size < need) { if (*preply_size == 0) { *preply = ALLOC((*preply_size = need)); } else { *preply = REALLOC(*preply, (*preply_size = need)); } } ptr = *preply; PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); ptr += PACKET_BYTES; put_int32(ptr,serial); ptr +=4; *ptr++ = (AddrByte) unit; /* 4 or 16 */ put_int32(ptr, num_addresses); ptr += 4; for (i = 0; i < num_addresses; ++i) { memcpy(ptr, he->h_addr_list[i], unit); ptr += unit; } put_int32(ptr, num_strings); ptr += 4; strcpy((char*)ptr, he->h_name); ptr += 1 + strlen(he->h_name); for (i = 0; i < (num_strings - 1); ++i) { strcpy((char*)ptr, he->h_aliases[i]); ptr += 1 + strlen(he->h_aliases[i]); } return need; } #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) static size_t build_reply_ai(SerialType serial, int addrlen, struct addrinfo *res0, AddrByte **preply, size_t *preply_size) { struct addrinfo *res; int num_strings; int num_addresses; AddrByte *ptr; int need; num_addresses = 0; num_strings = 0; need = PACKET_BYTES + 4 /* Serial */ + 1 /* addrlen */ + 4 /* Naddr */ + 4 /* Nnames */; for (res = res0; res != NULL; res = res->ai_next) { if (res->ai_addr) { num_addresses++; need += addrlen; } if (res->ai_canonname) { num_strings++; need += strlen(res->ai_canonname) + 1; } } if (*preply_size < need) { if (*preply_size == 0) { *preply = ALLOC((*preply_size = need)); } else { *preply = REALLOC(*preply, (*preply_size = need)); } } ptr = *preply; PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); ptr += PACKET_BYTES; put_int32(ptr,serial); ptr +=4; *ptr++ = (AddrByte) addrlen; /* 4 or 16 */ put_int32(ptr, num_addresses); ptr += 4; for (res = res0; res != NULL && num_addresses; res = res->ai_next) { if (res->ai_addr == NULL) continue; if (addrlen == 4) memcpy(ptr, &((struct sockaddr_in *)res->ai_addr)->sin_addr, addrlen); #ifdef AF_INET6 else if (addrlen == 16) memcpy(ptr, &((struct sockaddr_in6 *)res->ai_addr)->sin6_addr, addrlen); #endif else memcpy(ptr, res->ai_addr->sa_data, addrlen); ptr += addrlen; num_addresses--; } put_int32(ptr, num_strings); ptr += 4; for (res = res0; res != NULL && num_strings; res = res->ai_next) { if (res->ai_canonname == NULL) continue; strcpy((char *)ptr, res->ai_canonname); ptr += strlen(res->ai_canonname) + 1; num_strings--; } return need; } #endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */ /* * Encode/decode/read/write */ static int get_int32(AddrByte *b) { int res; res = (unsigned) b[3]; res |= ((unsigned) b[2]) << 8; res |= ((unsigned) b[1]) << 16; res |= ((unsigned) b[0]) << 24; return res; } static void put_int32(AddrByte *buff, int value) { buff[0] = (((unsigned) value) >> 24) & 0xFF; buff[1] = (((unsigned) value) >> 16) & 0xFF; buff[2] = (((unsigned) value) >> 8) & 0xFF; buff[3] = ((unsigned) value) & 0xFF; } #ifdef WIN32 static int read_int32(HANDLE fd, int *res, HANDLE ev) { AddrByte b[4]; int r; if ((r = read_exact(fd,b,4,ev)) < 0) { return -1; } else if (r == 0) { return 0; } else { *res = (unsigned) b[3]; *res |= ((unsigned) b[2]) << 8; *res |= ((unsigned) b[1]) << 16; *res |= ((unsigned) b[0]) << 24; } return 4; } /* * The standard input is expected to be opened with FILE_FLAG_OVERLAPPED * but this code should handle both cases (although winsock might not). */ static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev) { DWORD ret,got; BOOL stat; char *buff = vbuff; OVERLAPPED ov; DWORD err; got = 0; for(;;) { memset(&ov,0,sizeof(ov)); ov.hEvent = ev; ResetEvent(ov.hEvent); stat = ReadFile(fd, buff, nbytes - got, &ret, &ov); if (!stat) { if ((err = GetLastError()) == ERROR_IO_PENDING) { DEBUGF(4,("Overlapped read, waiting for completion...")); WaitForSingleObject(ov.hEvent,INFINITE); stat = GetOverlappedResult(fd,&ov,&ret,TRUE); DEBUGF(4,("Overlapped read, completed with status %d," " result %d",stat,ret)); } if (!stat) { if (GetLastError() == ERROR_BROKEN_PIPE) { DEBUGF(1, ("End of file while reading from pipe.")); return 0; } else { DEBUGF(1, ("Error while reading from pipe," " errno = %d", GetLastError())); return -1; } } } else { DEBUGF(4,("Read completed syncronously, result %d",ret)); } if (ret == 0) { DEBUGF(1, ("End of file detected as zero read from pipe.")); return 0; } if (ret < nbytes - got) { DEBUGF(4,("Not all data read from pipe, still %d bytes to read.", nbytes - (got + ret))); got += ret; buff += ret; } else { return nbytes; } } } /* * Now, we actually expect a HANDLE opened with FILE_FLAG_OVERLAPPED, * but this code should handle both cases (although winsock * does not always..) */ static int write_exact(HANDLE fd, AddrByte *buff, DWORD len, HANDLE ev) { DWORD res,stat; DWORD x = len; OVERLAPPED ov; DWORD err; for(;;) { memset(&ov,0,sizeof(ov)); ov.hEvent = ev; ResetEvent(ov.hEvent); stat = WriteFile(fd,buff,x,&res,&ov); if (!stat) { if ((err = GetLastError()) == ERROR_IO_PENDING) { DEBUGF(4,("Overlapped write, waiting for competion...")); WaitForSingleObject(ov.hEvent,INFINITE); stat = GetOverlappedResult(fd,&ov,&res,TRUE); DEBUGF(4,("Overlapped write, completed with status %d," " result %d",stat,res)); } if (!stat) { if (GetLastError() == ERROR_BROKEN_PIPE) { return 0; } else { return -1; } } } else { DEBUGF(4,("Write completed syncronously, result %d",res)); } if (res < x) { /* Microsoft states this can happen as HANDLE is a pipe... */ DEBUGF(4,("Not all data written to pipe, still %d bytes to write.", x - res)); x -= res; buff += res; } else { return len; } } } DWORD WINAPI reader(void *data) { MesQ *mq = (MesQ *) data; QueItem *m; int siz; int r; HANDLE inp; int x = 0; HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL); inp = GetStdHandle(STD_INPUT_HANDLE); for (;;) { if ((r = READ_PACKET_BYTES(inp,&siz,ev)) != 4) { DEBUGF(1,("Erlang has closed (reading)")); exit(0); } DEBUGF(4,("Read packet of size %d from erlang",siz)); m = ALLOC(sizeof(QueItem) - 1 + siz); if (read_exact(inp, m->request, siz,ev) != siz) { fatal("Unexpected end of file on main input, errno = %d",errno); } if (siz < 5) { fatal("Unexpected message on main input, message size %d less " "than minimum."); } m->req_size = siz; m->next = NULL; if (!enque_mesq(mq, m)) { fatal("Reader could not talk to main thread!"); } } } DWORD WINAPI writer(void *data) { MesQ *mq = (MesQ *) data; QueItem *m; HANDLE outp = GetStdHandle(STD_OUTPUT_HANDLE); AddrByte hdr[PACKET_BYTES]; HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL); for (;;) { WaitForSingleObject(event_mesq(mq),INFINITE); if (!deque_mesq(mq, &m)) { fatal("Writer could not talk to main thread!"); } PUT_PACKET_BYTES(hdr, m->req_size); if (write_exact(outp, hdr, 4, ev) != 4) { DEBUGF(1,("Erlang has closed (writing)")); exit(0); } if (write_exact(outp, m->request, m->req_size, ev) != m->req_size) { DEBUGF(1,("Erlang has closed (writing)")); exit(0); } FREE(m); } } #else static size_t read_int32(int fd, int *res) { AddrByte b[4]; int r; if ((r = read_exact(fd,b,4)) < 0) { return -1; } else if (r == 0) { return 0; } else { *res = (unsigned) b[3]; *res |= ((unsigned) b[2]) << 8; *res |= ((unsigned) b[1]) << 16; *res |= ((unsigned) b[0]) << 24; } return 4; } static ssize_t read_exact(int fd, void *vbuff, size_t nbytes) { ssize_t ret, got; char *buff = vbuff; got = 0; for(;;) { ret = read(fd, buff, nbytes - got); if (ret < 0) { if (errno == EINTR) { continue; } else { DEBUGF(1, ("Error while reading from pipe," " errno = %d", errno)); return -1; } } else if (ret == 0) { DEBUGF(1, ("End of file while reading from pipe.")); if (got == 0) { return 0; /* "Normal" EOF */ } else { return -1; } } else if (ret < nbytes - got) { got += ret; buff += ret; } else { return nbytes; } } } static int write_exact(int fd, AddrByte *buff, int len) { int res; int x = len; for(;;) { if((res = write(fd, buff, x)) == x) { break; } if (res < 0) { if (errno == EINTR) { continue; } else if (errno == EPIPE) { return 0; } #ifdef ENXIO else if (errno == ENXIO) { return 0; } #endif else { return -1; } } else { /* Hmmm, blocking write but not all written, could this happen if the other end was closed during the operation? Well, it costs very little to handle anyway... */ x -= res; buff += res; } } return len; } #endif /* !WIN32 */ /* * Debug and memory allocation */ static char *format_address(int siz, AddrByte *addr) { static char buff[50]; char tmp[10]; if (siz > 16) { return "(unknown)"; } *buff='\0'; if (siz <= 4) { while(siz--) { sprintf(tmp,"%d",(int) *addr++); strcat(buff,tmp); if(siz) { strcat(buff,"."); } } return buff; } while(siz--) { sprintf(tmp,"%02x",(int) *addr++); strcat(buff,tmp); if(siz) { strcat(buff,":"); } } return buff; } static void debugf(char *format, ...) { char buff[2048]; char *ptr; va_list ap; va_start(ap,format); #ifdef WIN32 sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) GetCurrentThreadId()); #else sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) getpid()); #endif ptr = buff + strlen(buff); erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap); strcat(ptr,"\r\n"); #ifdef WIN32 if (debug_console_allocated != INVALID_HANDLE_VALUE) { DWORD res; WriteFile(debug_console_allocated,buff,strlen(buff),&res,NULL); } #else write(2,buff,strlen(buff)); #endif va_end(ap); } static void warning(char *format, ...) { char buff[2048]; char *ptr; va_list ap; va_start(ap,format); sprintf(buff,"%s[%d]: WARNING:",program_name, (int) getpid()); ptr = buff + strlen(buff); erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap); strcat(ptr,"\r\n"); #ifdef WIN32 { DWORD res; WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL); } #else write(2,buff,strlen(buff)); #endif va_end(ap); } static void fatal(char *format, ...) { char buff[2048]; char *ptr; va_list ap; va_start(ap,format); sprintf(buff,"%s[%d]: FATAL ERROR:",program_name, (int) getpid()); ptr = buff + strlen(buff); erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap); strcat(ptr,"\r\n"); #ifdef WIN32 { DWORD res; WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL); } #else write(2,buff,strlen(buff)); #endif va_end(ap); #ifndef WIN32 kill_all_workers(); #endif exit(1); } static void *my_malloc(size_t size) { void *ptr = malloc(size); if (!ptr) { fatal("Cannot allocate %u bytes of memory.", (unsigned) size); return NULL; /* lint... */ } return ptr; } static void *my_realloc(void *old, size_t size) { void *ptr = realloc(old, size); if (!ptr) { fatal("Cannot reallocate %u bytes of memory from %p.", (unsigned) size, old); return NULL; /* lint... */ } return ptr; } #ifdef WIN32 BOOL create_mesq(MesQ **q) { MesQ *tmp = malloc(sizeof(MesQ)); tmp->data_present = CreateEvent(NULL, TRUE, FALSE,NULL); if (tmp->data_present == NULL) { free(tmp); return FALSE; } InitializeCriticalSection(&(tmp->crit)); /* Cannot fail */ tmp->shutdown = 0; tmp->first = NULL; tmp->last = NULL; *q = tmp; return TRUE; } BOOL enque_mesq(MesQ *q, QueItem *m) { EnterCriticalSection(&(q->crit)); if (q->shutdown) { LeaveCriticalSection(&(q->crit)); return FALSE; } if (q->last == NULL) { q->first = q->last = m; } else { q->last->next = m; q->last = m; } m->next = NULL; if (!SetEvent(q->data_present)) { fprintf(stderr,"Fatal: Unable to signal event in %s:%d, last error: %d\n", __FILE__,__LINE__,GetLastError()); exit(1); /* Unable to continue at all */ } LeaveCriticalSection(&(q->crit)); return TRUE; } BOOL deque_mesq(MesQ *q, QueItem **m) { EnterCriticalSection(&(q->crit)); if (q->first == NULL) { /* Usually shutdown from other end */ ResetEvent(q->data_present); LeaveCriticalSection(&(q->crit)); return FALSE; } *m = q->first; q->first = q->first->next; if (q->first == NULL) { q->last = NULL; ResetEvent(q->data_present); } (*m)->next = NULL; LeaveCriticalSection(&(q->crit)); return TRUE; } BOOL close_mesq(MesQ *q) { QueItem *tmp; EnterCriticalSection(&(q->crit)); if (!q->shutdown) { q->shutdown = TRUE; if (!SetEvent(q->data_present)) { fprintf(stderr, "Fatal: Unable to signal event in %s:%d, last error: %d\n", __FILE__,__LINE__,GetLastError()); exit(1); /* Unable to continue at all */ } LeaveCriticalSection(&(q->crit)); return FALSE; } /* Noone else is supposed to use this object any more */ LeaveCriticalSection(&(q->crit)); DeleteCriticalSection(&(q->crit)); CloseHandle(q->data_present); tmp = q->first; while(tmp) { q->first = q->first->next; free(tmp); tmp = q->first; } free(q); return TRUE; } HANDLE event_mesq(MesQ *q) { return q->data_present; } #ifdef HARDDEBUG DWORD WINAPI pseudo_worker_loop(void *v) { HOSTENT *hep; DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") starting")); hep = gethostbyname("ftp.funet.fi"); DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") -> %d OK",(int) hep)); return 0; } static void poll_gethost(int row) { HANDLE h; DWORD tid; h = (HANDLE) _beginthreadex(NULL, 0, pseudo_worker_loop, NULL, 0, &tid); if (h == NULL) { DEBUGF(1,("Failed to spawn pseudo worker (%d)...",row)); } else { DEBUGF(1,("Waiting for pseudo worker (%d)", row)); WaitForSingleObject(h,INFINITE); DEBUGF(1,("Done Waiting for pseudo worker (%d)", row)); } } #endif #endif /* WIN32 */