From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- erts/etc/common/inet_gethost.c | 2757 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2757 insertions(+) create mode 100644 erts/etc/common/inet_gethost.c (limited to 'erts/etc/common/inet_gethost.c') diff --git a/erts/etc/common/inet_gethost.c b/erts/etc/common/inet_gethost.c new file mode 100644 index 0000000000..ff16ee02c4 --- /dev/null +++ b/erts/etc/common/inet_gethost.c @@ -0,0 +1,2757 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 1998-2009. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ +/* + * Erlang port program to do the name service lookup for the erlang + * distribution and inet part of the kernel. + * A pool of subprocess is kept, to which a pair of pipes is connected. + * The main process schedules requests among the different subprocesses + * (created with fork()), to be able to handle as many requests as possible + * simultaneously. The controlling erlang machine may request a "cancel", + * in which case the process may be killed and restarted when the need arises. + * The single numeric parameter to this program is the maximum port pool size, + * which is the size of the bookkeeping array. + * + * Windows: + * There is instead of a pool of processes a pool of threads. + * Communication is not done through pipes but via message queues between + * the threads. The only "pipes" involved are the ones used for communicating + * with Erlang. + * Important note: + * For unknown reasons, the combination of a thread doing blocking I/O on + * a named pipe at the same time as another thread tries to resolve a hostname + * may (with certain software configurations) block the gethostbyname call (!) + * For that reason, standard input (and standard output) should be opened + * in asynchronous mode (FILE_FLAG_OVERLAPPED), which has to be done by Erlang. + * A special flag to open_port is used to work around this behaviour in winsock + * and the threads doing read and write handle asynchronous I/O. + * The ReadFile and WriteFile calls try to cope with both types of I/O, why + * the code is not really as Microsoft describes "the right way to do it" in + * their documentation. Important to note is that **there is no supported way + * to retrieve the information if the HANDLE was opened with + * FILE_FLAG_OVERLAPPED from the HANDLE itself**. + * + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef WIN32 + +#define WIN32_LEAN_AND_MEAN +#include +#include +#include +#include +#include + +/* These are not used even if they would exist which they should not */ +#undef HAVE_GETADDRINFO +#undef HAVE_GETIPNODEBYNAME +#undef HAVE_GETHOSTBYNAME2 +#undef HAVE_GETNAMEINFO +#undef HAVE_GETIPNODEBYADDR + +#else /* Unix */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_SYS_TIME_H +#include +#else +#include +#endif +#include + +#ifndef RETSIGTYPE +#define RETSIGTYPE void +#endif + +/* To simplify #ifdef code further down - select only one to be defined... +** Use them in pairs - if one is broken do not trust its mate. +**/ +#if defined(HAVE_GETADDRINFO) && defined(HAVE_GETNAMEINFO) +#undef HAVE_GETIPNODEBYNAME +#undef HAVE_GETIPNODEBYADDR +#undef HAVE_GETHOSTBYNAME2 +#elif defined(HAVE_GETIPNODEBYNAME) && defined(HAVE_GETIPNODEBYADDR) +#undef HAVE_GETADDRINFO +#undef HAVE_GETNAMEINFO +#undef HAVE_GETHOSTBYNAME2 +#else +#undef HAVE_GETIPNODEBYNAME +#undef HAVE_GETIPNODEBYADDR +#undef HAVE_GETADDRINFO +#undef HAVE_GETNAMEINFO +#endif + +#endif /* !WIN32 */ + +#define PACKET_BYTES 4 +#ifdef WIN32 +#define READ_PACKET_BYTES(X,Y,Z) read_int32((X),(Y),(Z)) +#else +#define READ_PACKET_BYTES(X,Y) read_int32((X),(Y)) +#endif +#define PUT_PACKET_BYTES(X,Y) put_int32((X),(Y)) +/* The serial numbers of the requests */ +typedef int SerialType; + +#define INVALID_SERIAL -1 + +/* The operations performed by this program */ +typedef unsigned char OpType; + +#define OP_GETHOSTBYNAME 1 +#define OP_GETHOSTBYADDR 2 +#define OP_CANCEL_REQUEST 3 +#define OP_CONTROL 4 + +/* The protocol (IPV4/IPV6) */ +typedef unsigned char ProtoType; + +#define PROTO_IPV4 1 +#define PROTO_IPV6 2 + +/* OP_CONTROL */ +typedef unsigned char CtlType; +#define SETOPT_DEBUG_LEVEL 0 + +/* The unit of an IP address (0 == error, 4 == IPV4, 16 == IPV6) */ +typedef unsigned char UnitType; + +#define UNIT_ERROR 0 +#define UNIT_IPV4 4 +#define UNIT_IPV6 16 + +/* And the byte type */ +typedef unsigned char AddrByte; /* Must be compatible with character + datatype */ + +/* + * Marshalled format of request: + *{ + * Serial: 32 bit big endian + * Op:8 bit [1,2,3] + * If op == 1 { + * Proto:8 bit [1,2] + * Str: Null terminated array of characters + * } Else if op == 2 { + * Proto:8 bit [1,2] + * If proto == 1 { + * B0..B3: 4 bytes, most significant first + * } Else (proto == 2) { + * B0..B15: 16 bytes, most significant first + * } + * } + * (No more if op == 3) + *} + * The request arrives as a packet, with 4 packet size bytes. + */ + +/* The main process unpackes the marshalled message and sends the data + * to a suitable port process or, in the case of a close request, kills the + * suitable port process. There is also a que of requests linked together, + * for when all subrocesses are busy. + */ + +typedef struct QueItem { + struct QueItem *next; + int req_size; + AddrByte request[1]; +} QueItem; /* Variable size due to request's variable size */ + +QueItem *que_first; +QueItem *que_last; + +#ifdef WIN32 +typedef struct mesq { + HANDLE data_present; + CRITICAL_SECTION crit; + int shutdown; + QueItem *first; + QueItem *last; +} MesQ; + +MesQ *to_erlang; +MesQ *from_erlang; +#endif + +/* + * Marshalled format of reply: + *{ + * Serial: 32 bit big endian + * Unit: 8 bit, same as h_length or 0 for error + * if unit == 0 { + * Str: Null terminated character string explaining the error + * } else { + * Naddr: 32 bit big endian + * if unit = 4 { + * (B0..B3)0..(B0..B3)Naddr-1: Naddr*4 bytes most significant first + * } else if unit == 16 { + * (B0..B15)0..(B0..B15)Naddr-1: Naddr*16 bytes most significant first + * } + * Nnames: 32 bit big endian >= 1 + * Name0: Null terminated string of characters + * Alias[0]..Alias[Nnames - 2]: Nnames - 1 Null terminated strings of chars + * } + *} + * Four packet size bytes prepended (big endian) + */ +/* Internal error codes */ +#define ERRCODE_NOTSUP 1 +#define ERRCODE_HOST_NOT_FOUND 2 +#define ERRCODE_TRY_AGAIN 3 +#define ERRCODE_NO_RECOVERY 4 +#define ERRCODE_NO_DATA 5 +#define ERRCODE_NETDB_INTERNAL 7 + +/* + * Each worker process is represented in the parent by the following struct + */ + +typedef unsigned WorkerState; + +#define WORKER_EMPTY 0 /* No process created */ +#define WORKER_FREE 1 /* Living waiting process */ +#define WORKER_BUSY 2 /* Living busy process */ +#define WORKER_STALLED 3 /* Living cancelled process */ + +/* The timeout when killing a child process in seconds*/ +#define CHILDWAIT_TMO 1 +/* The domainname size_limit */ +#define DOMAINNAME_MAX 258 /* 255 + Opcode + Protocol + Null termination */ + +typedef struct { + WorkerState state; +#ifdef WIN32 + DWORD pid; /* 0 if unused */ + MesQ *writeto; /* Message queues */ + MesQ *readfrom; +#else + pid_t pid; /* -1 if unused */ + int writeto, readfrom; /* Pipes */ +#endif + SerialType serial; + AddrByte domain[DOMAINNAME_MAX]; + QueItem *que_first; + QueItem *que_last; + int que_size; +} Worker; + +int num_busy_workers; +int num_free_workers; +int num_stalled_workers; +int max_workers; +int greedy_threshold; +Worker *busy_workers; /* Workers doing any job that someone really is + interested in */ +Worker *free_workers; /* Really free workers */ +Worker *stalled_workers; /* May still deliver answers which we will + discard */ +#define BEE_GREEDY() (num_busy_workers >= greedy_threshold) + +static char *program_name; + +static int debug_level; +#ifdef WIN32 +static HANDLE debug_console_allocated = INVALID_HANDLE_VALUE; +#endif + +#ifdef NODEBUG +#define DEBUGF(L,P) /* Nothing */ +#else +#define DEBUGF(Level,Printf) do { if (debug_level >= (Level)) \ + debugf Printf;} while(0) +#endif +#define ALLOC(Size) my_malloc(Size) +#define REALLOC(Old, Size) my_realloc((Old), (Size)) +#define FREE(Ptr) free(Ptr) + +#ifdef WIN32 +#define WAKEUP_WINSOCK() do { \ + char dummy_buff[100]; \ + gethostname(dummy_buff,99); \ +} while (0) +#endif + +/* The internal prototypes */ +static char *format_address(int siz, AddrByte *addr); +static void debugf(char *format, ...); +static void warning(char *format, ...); +static void fatal(char *format, ...); +static void *my_malloc(size_t size); +static void *my_realloc(void *old, size_t size); +static int get_int32(AddrByte *buff); +static void put_int32(AddrByte *buff, int value); +static int create_worker(Worker *pworker, int save_que); +static int map_netdb_error(int netdb_code); +#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) +static int map_netdb_error_ai(int netdb_code); +#endif +static char *errcode_to_string(int errcode); +static size_t build_error_reply(SerialType serial, int errnum, + AddrByte **preply, + size_t *preply_size); +#ifdef HAVE_GETADDRINFO +static size_t build_reply_ai(SerialType serial, int, struct addrinfo *, + AddrByte **preply, size_t *preply_size); +#endif +static size_t build_reply(SerialType serial, struct hostent *he, + AddrByte **preply, size_t *preply_size); +static int read_request(AddrByte **buff, size_t *buff_size); +static OpType get_op(AddrByte *buff); +static AddrByte *get_op_addr(AddrByte *buff); +static SerialType get_serial(AddrByte *buff); +static ProtoType get_proto(AddrByte *buff); +static CtlType get_ctl(AddrByte *buff); +static AddrByte *get_data(AddrByte *buff); +static int get_debug_level(AddrByte *buff); +static int relay_reply(Worker *pw); +static int ignore_reply(Worker *pw); +static void init_workers(int max); +static void kill_worker(Worker *pw); +static Worker *pick_worker(void); +static void kill_last_picked_worker(void); +static void stall_worker(SerialType serial); +static int handle_io_busy(int ndx); +static int handle_io_free(int ndx); +static int handle_io_stalled(int ndx); +static void check_que(void); +static void main_loop(void); +static void usage(char *unknown); +static void domaincopy(AddrByte *out,AddrByte *in); +static int domaineq(AddrByte *d1, AddrByte *d2); +static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff); +static Worker *pick_worker_greedy(AddrByte *domainbuff); +static void restart_worker(Worker *w); +static void start_que_request(Worker *w) ; +#ifdef WIN32 +static int read_int32(HANDLE fd, int *res, HANDLE ev); +static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev); +static int write_exact(HANDLE fd, AddrByte *buff, DWORD len,HANDLE ev); +DWORD WINAPI worker_loop(void *v); +DWORD WINAPI reader(void *data); +DWORD WINAPI writer(void *data); +static int send_mes_to_worker(QueItem *m, Worker *pw); +BOOL create_mesq(MesQ **q); +BOOL enque_mesq(MesQ *q, QueItem *m); +BOOL deque_mesq(MesQ *q, QueItem **m); +BOOL close_mesq(MesQ *q); +HANDLE event_mesq(MesQ *q); +#else +static size_t read_int32(int fd, int *res); +static ssize_t read_exact(int fd, void *vbuff, size_t nbytes); +static int write_exact(int fd, AddrByte *buff, int len); +void reap_children(int ignored); +static void init_signals(void); +static void kill_all_workers(void); +static void close_all_worker_fds(void); +static int worker_loop(void); +static int fillin_reply(Worker *pw); +static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw); +#endif + +#define ERL_DBG_LVL_ENV_VAR "ERL_INET_GETHOST_DEBUG" + +static int +get_env_debug_level(void) +{ +#ifdef __WIN32__ + char value[21]; /* Enough for any 64-bit values */ + DWORD sz = GetEnvironmentVariable((LPCTSTR) ERL_DBG_LVL_ENV_VAR, + (LPTSTR) value, + (DWORD) sizeof(value)); + if (sz == 0 || sz > sizeof(value)) + return 0; +#else + char *value = getenv(ERL_DBG_LVL_ENV_VAR); + if (!value) + return 0; +#endif + return atoi(value); +} + +#ifdef WIN32 +static void do_allocate_console(void) +{ + AllocConsole(); + debug_console_allocated = CreateFile ("CONOUT$", GENERIC_WRITE, + FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); +} +#ifdef HARDDEBUG +DWORD WINAPI pseudo_worker_loop(void *v); +static void poll_gethost(int row); +#endif +#endif + +/* + * Main + */ +int main(int argc, char **argv) +{ + int num_workers = 1; + char **ap = argv + 1; + int x; + int disable_greedy = 0; + + program_name = *argv; + que_first = que_last = NULL; + debug_level = get_env_debug_level(); + greedy_threshold = 0; + + while (*ap) { + if (!strcmp(*ap, "-d")) { + ++debug_level; + } else if(!strcmp(*ap, "-g") && *(ap + 1)) { + ++ap; + x = atoi(*ap); + if (!x) { + usage(*ap); + } else { + greedy_threshold = x; + } + } else if(!strcmp(*ap, "-ng")) { + disable_greedy = 1; + } else { + x = atoi(*ap); + if (!x) { + usage(*ap); + } else { + num_workers = x; + } + } + ++ap; + } + +#ifdef WIN32 + if (num_workers > 60 || greedy_threshold > 60) { + usage("More than 60 workers on windows impossible!"); + num_workers = 60; + greedy_threshold = 0; + } +#endif + + if(!greedy_threshold) { + greedy_threshold = (3*num_workers)/4; /* 75% */ + if (!greedy_threshold) { + greedy_threshold = num_workers; + } + } + + if (disable_greedy) { + greedy_threshold = num_workers + 1; + } + +#ifdef WIN32 + { + WORD wr; + WSADATA wsa_data; + int wsa_error; + wr = MAKEWORD(2,0); + + wsa_error = WSAStartup(wr,&wsa_data); + if (wsa_error) { + fatal("Could not open usable winsock library."); + } + if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 0) { + fatal("Could not open recent enough winsock library."); + } + + if (debug_level >= 1) { + do_allocate_console(); + + DEBUGF(1,("num_workers = %d, greedy_threshold = %d, " + "debug_level = %d.", + num_workers, greedy_threshold, debug_level)); + } + } + WAKEUP_WINSOCK(); /* Why on earth is this needed? */ + +#endif + + init_workers(num_workers); + + main_loop(); +#ifndef WIN32 + kill_all_workers(); +#endif + return 0; +} + +static void usage(char *unknown) +{ + fprintf(stderr,"%s: Unknown option \"%s\"\n" + "Usage: %s [-d [-d ...]] [-g ] " + "[]\n", + program_name, unknown, program_name); +} + +/* + * Main process main loop + */ + +static int handle_io_busy(int ndx) +{ + /* Probably an answer */ + int res; + res = relay_reply(&busy_workers[ndx]); + if (res < 0) { + /* Bad worker */ + if (busy_workers[ndx].que_size) { + restart_worker(&busy_workers[ndx]); + start_que_request(&busy_workers[ndx]); + return 0; + } else { + kill_worker(&busy_workers[ndx]); + --num_busy_workers; + busy_workers[ndx] = busy_workers[num_busy_workers]; + } + return 1; + } else if (res == 0) { + /* Erlang has closed */ + return -1; + } else { + if (busy_workers[ndx].que_size) { + start_que_request(&busy_workers[ndx]); + return 0; + } + /* The worker is no longer busy, it should be in the free list */ + free_workers[num_free_workers] = busy_workers[ndx]; + free_workers[num_free_workers].state = WORKER_FREE; + ++num_free_workers; + --num_busy_workers; + busy_workers[ndx] = busy_workers[num_busy_workers]; + return 1; + } +} + +static int handle_io_free(int ndx) +{ + /* IO from a free worker means "kill me" */ + DEBUGF(1,("Free worker[%ld] spontaneously died.", + (long) free_workers[ndx].pid)); + kill_worker(&free_workers[ndx]); + --num_free_workers; + free_workers[ndx] = free_workers[num_free_workers]; + return 1; +} + +static int handle_io_stalled(int ndx) +{ + int res; + res = ignore_reply(&stalled_workers[ndx]); + if (res <= 0) { + /* Bad worker */ + kill_worker(&stalled_workers[ndx]); + --num_stalled_workers; + stalled_workers[ndx] = stalled_workers[num_stalled_workers]; + return 1; + } else { + DEBUGF(3,("Ignoring reply from stalled worker[%ld].", + (long) stalled_workers[ndx].pid)); + free_workers[num_free_workers] = stalled_workers[ndx]; + free_workers[num_free_workers].state = WORKER_FREE; + ++num_free_workers; + --num_stalled_workers; + stalled_workers[ndx] = stalled_workers[num_stalled_workers]; + return 1; + } +} + +static void check_que(void) +{ + /* Check if anything in the que can be handled */ + Worker *cw; + + while (que_first) { + QueItem *qi,*nxt; + qi = que_first; + nxt = qi->next; /* Need to save before it's getting put in another que + in threaded solution */ + if ((cw = pick_worker()) == NULL) { + break; + } +#ifdef WIN32 + { + SerialType save_serial = get_serial(que_first->request); + if (send_mes_to_worker(que_first, cw) != 0) { + kill_last_picked_worker(); + continue; + } + cw->serial = save_serial; + } +#else + if (send_request_to_worker(que_first->request, + que_first->req_size, cw) != 0) { + /* Couldn't send request, kill the worker and retry */ + kill_last_picked_worker(); + continue; + } + cw->serial = get_serial(que_first->request); +#endif + /* Went well, lets deque */ + que_first = nxt; + if (que_first == NULL) { + que_last = NULL; + } + DEBUGF(3,("Did deque serial %d, Que is %sempty", + get_serial(qi->request), (que_first) ? "not " : "")); +#ifndef WIN32 + FREE(qi); +#endif + } +} + +static int clean_que_of(SerialType s) +{ + QueItem **qi; + int i; + + for(qi=&que_first;*qi != NULL && + s != get_serial((*qi)->request); qi = &((*qi)->next)) + ; + if(*qi != NULL) { + QueItem *r = *qi; + *qi = (*qi)->next; + FREE(r); + if(que_last == r) { + /* Lost the "last" pointer, should be very uncommon + if the que is not empty, so we simply do a traversal + to reclaim it. */ + if (que_first == NULL) { + que_last = NULL; + } else { + for (que_last=que_first;que_last->next != NULL; + que_last = que_last->next) + ; + } + } + DEBUGF(3,("Removing serial %d from global que on request, " + "que %sempty",s, (que_first) ? "not " : "")); + return 1; + } + for (i = 0; i < num_busy_workers; ++i) { + for(qi=&(busy_workers[i].que_first);*qi != NULL && + s != get_serial((*qi)->request); qi = &((*qi)->next)) + ; + if(*qi != NULL) { + QueItem *r = *qi; + *qi = (*qi)->next; + FREE(r); + if(busy_workers[i].que_last == r) { + /* Lost the "last" pointer, should be very uncommon + if the que is not empty, so we simply do a traversal + to reclaim it. */ + if (busy_workers[i].que_first == NULL) { + busy_workers[i].que_last = NULL; + if (busy_workers[i].que_size != 1) { + fatal("Worker que size counter incorrect, internal datastructure error."); + } + } else { + for (busy_workers[i].que_last = busy_workers[i].que_first; + busy_workers[i].que_last->next != NULL; + busy_workers[i].que_last = busy_workers[i].que_last->next) + ; + } + } + --(busy_workers[i].que_size); + DEBUGF(3,("Removing serial %d from worker[%ld] specific que " + "on request, que %sempty", + s, (long) busy_workers[i].pid, + (busy_workers[i].que_first) ? "not " : "")); + return 1; + } + } + return 0; +} + +static void main_loop(void) +{ + AddrByte *inbuff = NULL; + int insize; + int i,w; +#ifdef WIN32 + HANDLE handles[64]; + DWORD num_handles; + DWORD index; + QueItem *qi; +#else + size_t inbuff_size = 0; + fd_set fds; + int max_fd; +#endif + int new_data; + int save_serial; + /* It's important that the free workers list is handled first */ + Worker *workers[3] = {free_workers, busy_workers, stalled_workers}; + int *wsizes[3] = {&num_free_workers, &num_busy_workers, + &num_stalled_workers}; + int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy, + &handle_io_stalled}; + Worker *cw; + AddrByte domainbuff[DOMAINNAME_MAX]; + +#ifdef WIN32 + + { + DWORD dummy; + /* Create the reader and writer */ + if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) { + fatal("Could not create message que! errno = %d.",GetLastError()); + } + if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy)) + == NULL) { + fatal("Could not create writer thread! errno = %d.",GetLastError()); + } + if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy)) + == NULL) { + fatal("Could not create reader thread! errno = %d.",GetLastError()); + } + DEBUGF(4,("Created reader and writer threads.")); +#ifdef HARDDEBUG + poll_gethost(__LINE__); +#endif + } +#endif + + for(;;) { +#ifdef WIN32 + num_handles = 0; + handles[num_handles++] = event_mesq(from_erlang); + for (w = 0; w < 3; ++w) { + for (i = 0; i < *wsizes[w]; ++i) { + handles[num_handles++] = event_mesq(workers[w][i].readfrom); + } + } + + if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE)) + == WAIT_FAILED) { + fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError()); + } + w = 0; + index -= WAIT_OBJECT_0; + + DEBUGF(4,("Got data on index %d.",index)); + if (index > 0) { + if (((int)index - 1) < *wsizes[0]) { + (*handlers[0])(index - 1); + } else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) { + (*handlers[1])(index - 1 - (*wsizes[0])); + } else { + (*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1])); + } + } + new_data = (index == 0); +#else + max_fd = 0; + FD_ZERO(&fds); + FD_SET(0,&fds); + for (w = 0; w < 3; ++w) { + for (i = 0; i < *wsizes[w]; ++i) { + FD_SET(workers[w][i].readfrom,&fds); + if (workers[w][i].readfrom > max_fd) { + max_fd = workers[w][i].readfrom; + } + } + } + for (;;) { + if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) { + if (errno == EINTR) { + continue; + } else { + fatal("Select failed (invalid internal structures?), " + "errno = %d.",errno); + } + } + break; + } + for (w = 0; w < 3; ++w) { + for (i = 0; i < *wsizes[w]; ++i) { + if (FD_ISSET(workers[w][i].readfrom, &fds)) { + int hres = (*handlers[w])(i); + if (hres < 0) { + return; + } else { + i -= hres; /* We'll retry this position, if hres == 1. + The position is usually + replaced with another worker, + a worker with + I/O usually changes state as we + use blocking file I/O */ + } + } + } + } + new_data = FD_ISSET(0,&fds); + +#endif + + check_que(); + + /* Now check for new requests... */ + if (new_data) { /* Erlang... */ + OpType op; +#ifdef WIN32 + if (!deque_mesq(from_erlang,&qi)) { + DEBUGF(1,("Erlang has closed.")); + return; + } + insize = qi->req_size; + inbuff = qi->request; + DEBUGF(4,("Got data from erlang.")); + DEBUGF(4,("OPeration == %d.",get_op(inbuff))); +#else + insize = read_request(&inbuff, &inbuff_size); + if (insize == 0) { /* Other errors taken care of in + read_request */ + DEBUGF(1,("Erlang has closed.")); + return; + } +#endif + op = get_op(inbuff); + if (op == OP_CANCEL_REQUEST) { + SerialType serial = get_serial(inbuff); + if (!clean_que_of(serial)) { + for (i = 0; i < num_busy_workers; ++i) { + if (busy_workers[i].serial == serial) { + if (busy_workers[i].que_size) { + restart_worker(&busy_workers[i]); + start_que_request(&busy_workers[i]); + } else { + stall_worker(i); + check_que(); + } + break; + } + } + } +#ifdef WIN32 + FREE(qi); +#endif + continue; /* New select */ + } else if (op == OP_CONTROL) { + CtlType ctl; + SerialType serial = get_serial(inbuff); + if (serial != INVALID_SERIAL) { + fatal("Invalid serial: %d.", serial); + } + switch (ctl = get_ctl(inbuff)) { + case SETOPT_DEBUG_LEVEL: + { + int tmp_debug_level = get_debug_level(inbuff); +#ifdef WIN32 + if (debug_console_allocated == INVALID_HANDLE_VALUE && + tmp_debug_level > 0) { + DWORD res; + do_allocate_console(); + WriteFile(debug_console_allocated, + "Hej\n",4,&res,NULL); + } +#endif + debug_level = tmp_debug_level; + DEBUGF(debug_level, ("debug_level = %d", debug_level)); + for (w = 0; w < 3; ++w) { + for (i = 0; i < *wsizes[w]; i++) { + int res; +#ifdef WIN32 + QueItem *m; +#endif + cw = &(workers[w][i]); +#ifdef WIN32 + m = ALLOC(sizeof(QueItem) - 1 + qi->req_size); + memcpy(m->request, qi->request, + (m->req_size = qi->req_size)); + m->next = NULL; + if ((res = send_mes_to_worker(m, cw)) != 0) { + FREE(m); + } +#else + res = send_request_to_worker(inbuff, insize, cw); +#endif + if (res != 0) { + kill_worker(cw); + (*wsizes[w])--; + *cw = workers[w][*wsizes[w]]; + } + } + } + } + break; + default: + warning("Unknown control requested from erlang (%d), " + "message discarded.", (int) ctl); + break; + } +#ifdef WIN32 + FREE(qi); +#endif + continue; /* New select */ + } else { + ProtoType proto; + if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) { + warning("Unknown operation requested from erlang (%d), " + "message discarded.", op); +#ifdef WIN32 + FREE(qi); +#endif + continue; + } + if ((proto = get_proto(inbuff)) != PROTO_IPV4 && + proto != PROTO_IPV6) { + warning("Unknown protocol requested from erlang (%d), " + "message discarded.", proto); +#ifdef WIN32 + FREE(qi); +#endif + continue; + } + if (get_domainname(inbuff,insize,domainbuff) < 0) { + warning("Malformed message sent from erlang, no domain, " + "message discarded.", op); +#ifdef WIN32 + FREE(qi); +#endif + continue; + } + } + + if (BEE_GREEDY()) { + DEBUGF(4,("Beeing greedy!")); + if ((cw = pick_worker_greedy(domainbuff)) != NULL) { + /* Put it in the worker specific que if the + domainname matches... */ +#ifndef WIN32 + QueItem *qi = ALLOC(sizeof(QueItem) - 1 + + insize); + qi->req_size = insize; + memcpy(&(qi->request), inbuff, insize); + qi->next = NULL; +#endif + if (!cw->que_first) { + cw->que_first = cw->que_last = qi; + } else { + cw->que_last->next = qi; + cw->que_last = qi; + } + ++(cw->que_size); + continue; + } + /* Otherwise busyness as usual */ + } + + save_serial = get_serial(inbuff); + + while ((cw = pick_worker()) != NULL) { + int res; +#ifdef WIN32 + res = send_mes_to_worker(qi,cw); +#else + res = send_request_to_worker(inbuff, insize, cw); +#endif + if (res == 0) { + break; + } else { + kill_last_picked_worker(); + } + } + + if (cw == NULL) { + /* Insert into que */ +#ifndef WIN32 + QueItem *qi = ALLOC(sizeof(QueItem) - 1 + + insize); + qi->req_size = insize; + memcpy(&(qi->request), inbuff, insize); + qi->next = NULL; +#endif + if (!que_first) { + que_first = que_last = qi; + } else { + que_last->next = qi; + que_last = qi; + } + } else { + cw->serial = save_serial; + domaincopy(cw->domain, domainbuff); + } + } + } +} + +/* + * Main process worker administration + */ + +static void init_workers(int max) +{ + max_workers = max; + num_busy_workers = 0; + num_free_workers = 0; + num_stalled_workers = 0; + + busy_workers = ALLOC(sizeof(Worker) * max_workers); + free_workers = ALLOC(sizeof(Worker) * max_workers); + stalled_workers = ALLOC(sizeof(Worker) * max_workers); +#ifndef WIN32 + init_signals(); +#endif +} + +#ifdef WIN32 +static void kill_worker(Worker *pw) +{ + /* Cannot really kill a thread in win32, have to just leave it to die */ + close_mesq(pw->writeto); + close_mesq(pw->readfrom); + pw->state = WORKER_EMPTY; +} +#else +static void kill_worker(Worker *pw) +{ + fd_set fds; + struct timeval tmo; + int selret; + static char buff[1024]; + + DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d", + (long) pw->pid, + (int) pw->readfrom, + (int) pw->serial)); + kill(pw->pid, SIGUSR1); + /* This is all just to check that the child died, not + really necessary */ + for(;;) { + FD_ZERO(&fds); + FD_SET(pw->readfrom, &fds); + tmo.tv_usec=0; + tmo.tv_sec = CHILDWAIT_TMO; + selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo); + if (selret < 0) { + if (errno != EINTR) { + warning("Unable to select on dying child file descriptor, " + "errno = %d.",errno); + break; + } + } else if (selret == 0) { + warning("Timeout waiting for child process to die, " + "ignoring child (pid = %d).", pw->pid); + break; + } else { + int ret; + if ((ret = read(pw->readfrom, buff, 1024)) < 0) { + if (errno != EINTR) { + warning("Child file descriptor not closed properly, " + "errno = %d", errno); + break; + } + } else if (ret == 0) { + break; + } + /* continue */ + } + } + /* Waiting is done by signal handler... */ + close(pw->readfrom); + close(pw->writeto); + pw->state = WORKER_EMPTY; + /* Leave rest as is... */ +} + +static void kill_all_workers(void) +/* Emergency function, will not check that the children died... */ +{ + int i; + for (i = 0; i < num_busy_workers; ++i) { + kill(busy_workers[i].pid, SIGUSR1); + } + for (i = 0; i < num_free_workers; ++i) { + kill(free_workers[i].pid, SIGUSR1); + } + for (i = 0; i < num_stalled_workers; ++i) { + kill(stalled_workers[i].pid, SIGUSR1); + } +} +#endif /* !WIN32 */ + +static Worker *pick_worker(void) +{ + Worker tmp; + if (num_free_workers > 0) { + --num_free_workers; + tmp = free_workers[num_free_workers]; + } else if (num_stalled_workers > 0) { + /* "restart" the worker... */ + --num_stalled_workers; + kill_worker(&(stalled_workers[num_stalled_workers])); + if (create_worker(&tmp,0) < 0) { + warning("Unable to create worker process, insufficient " + "resources"); + return NULL; + } + } else { + if (num_busy_workers == max_workers) { + return NULL; + } + if (create_worker(&tmp,0) < 0) { + warning("Unable to create worker process, insufficient " + "resources"); + return NULL; + } + } + /* tmp contains a worker now, make it busy and put it in the right + array */ + tmp.state = WORKER_BUSY; + busy_workers[num_busy_workers] = tmp; + ++num_busy_workers; + return &(busy_workers[num_busy_workers-1]); +} + +static Worker *pick_worker_greedy(AddrByte *domainbuff) +{ + int i; + int ql = 0; + int found = -1; + for (i=0; i < num_busy_workers; ++i) { + if (domaineq(busy_workers[i].domain, domainbuff)) { + if ((found < 0) || (busy_workers[i].que_size < + busy_workers[found].que_size)) { + found = i; + ql = busy_workers[i].que_size; + } + } + } + if (found >= 0) { + return &busy_workers[found]; + } + return NULL; +} + +static void restart_worker(Worker *w) +{ + kill_worker(w); + if (create_worker(w,1) < 0) { + fatal("Unable to create worker process, insufficient resources"); + } +} + +static void kill_last_picked_worker(void) +{ + kill_worker( &(busy_workers[num_busy_workers-1])); + --num_busy_workers; +} + +/* + * Starts a request qued to a specific worker, check_que starts normally queued requests. + * We expect a que here... + */ +static void start_que_request(Worker *w) +{ + QueItem *qi; + SerialType save_serial; + if (!w->que_first || !w->que_size) { + fatal("Expected que'd requests but found none, " + "internal datastructure corrupted!"); + } + qi = w->que_first; + w->que_first = w->que_first->next; + if (!w->que_first) { + w->que_last = NULL; + } + --(w->que_size); + save_serial = get_serial(qi->request); +#ifdef WIN32 + while (send_mes_to_worker(qi, w) != 0) { + restart_worker(w); + } +#else + while (send_request_to_worker(qi->request, + qi->req_size, w) != 0) { + restart_worker(w); + } +#endif + w->serial = save_serial; + DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, " + "Que is %sempty", + get_serial(qi->request), (long) w->pid, + (w->que_first) ? "not " : "")); +#ifndef WIN32 + FREE(qi); +#endif +} + +#ifndef WIN32 +/* Signal utilities */ +static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int) +{ + struct sigaction act, oact; + + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = func; + sigaction(sig, &act, &oact); + return(oact.sa_handler); +} + + +static void sys_sigblock(int sig) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, sig); + sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL); +} + +static void sys_sigrelease(int sig) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, sig); + sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL); +} + +/* Child signal handler */ +void reap_children(int ignored) +{ + int res; + sys_sigblock(SIGCHLD); + for (;;) { + while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0) + ; + if (!(res < 0 && errno == EAGAIN)) { + DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno)); + break; + } + } + sys_sigrelease(SIGCHLD); +} + +static void init_signals(void) +{ + sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result + on most (?) platforms. */ + sys_sigset(SIGPIPE, SIG_IGN); +} +#endif + +static void stall_worker(int ndx) +{ + --num_busy_workers; + stalled_workers[num_stalled_workers] = busy_workers[ndx]; + stalled_workers[num_stalled_workers].state = WORKER_STALLED; + busy_workers[ndx] = busy_workers[num_busy_workers]; + DEBUGF(3, ("Stalled worker[%ld]", + (long) stalled_workers[num_stalled_workers].pid)); + ++num_stalled_workers; +} + + +/* + * Main loop message passing + */ +#ifndef WIN32 +static int read_request(AddrByte **buff, size_t *buff_size) +{ + int siz; + int r; + + if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) { + if (r == 0) { + return 0; + } else { + fatal("Unexpected end of file on main input, errno = %d",errno); + } + } + + if (siz > *buff_size) { + if (buff_size == 0) { + *buff = ALLOC((*buff_size = siz)); + } else { + *buff = REALLOC(*buff, (*buff_size = siz)); + } + } + if (read_exact(0,*buff, siz) != siz) { + fatal("Unexpected end of file on main input, errno = %d",errno); + } + if (siz < 5) { + fatal("Unexpected message on main input, message size %d less " + "than minimum."); + } + return siz; +} + +#endif /* !WIN32 */ + +static OpType get_op(AddrByte *buff) +{ + return (OpType) buff[4]; +} + +static AddrByte *get_op_addr(AddrByte *buff) +{ + return buff + 4; +} + +static SerialType get_serial(AddrByte *buff) +{ + return get_int32(buff); +} + +static ProtoType get_proto(AddrByte *buff) +{ + return (ProtoType) buff[5]; +} + +static CtlType get_ctl(AddrByte *buff) +{ + return (CtlType) buff[5]; +} + +static AddrByte *get_data(AddrByte *buff) +{ + return buff + 6; +} + +static int get_debug_level(AddrByte *buff) +{ + return get_int32(buff + 6); +} + +#ifdef WIN32 +static int send_mes_to_worker(QueItem *m, Worker *pw) +{ + if (!enque_mesq(pw->writeto, m)) { + warning("Unable to send to child process."); + return -1; + } + return 0; +} +#else +static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw) +{ + AddrByte hdr[PACKET_BYTES]; + + PUT_PACKET_BYTES(hdr, rsize); + if (write_exact(pw->writeto, hdr, PACKET_BYTES) < 0) { + warning("Unable to write to child process."); + return -1; + } + if (write_exact(pw->writeto, (AddrByte *) pr, rsize) < 0) { + warning("Unable to write to child process."); + return -1; + } + return 0; +} +#endif /* !WIN32 */ + +#ifdef WIN32 +static int relay_reply(Worker *pw) +{ + QueItem *m; + if (!deque_mesq(pw->readfrom,&m)) { + return 0; + } + if (!enque_mesq(to_erlang,m)) { + FREE(m); + return 0; + } + return 1; +} + +static int ignore_reply(Worker *pw) { + QueItem *m; + if (!deque_mesq(pw->readfrom,&m)) { + return 0; + } + FREE(m); + return 1; +} + +#else + +/* Static buffers used by the next three functions */ +static AddrByte *relay_buff = NULL; +static int relay_buff_size = 0; + +static int fillin_reply(Worker *pw) +{ + int length; + + if (READ_PACKET_BYTES(pw->readfrom, &length) != PACKET_BYTES) { + warning("Malformed reply (header) from worker process %d.", + pw->pid); + return -1; + } + + if (relay_buff_size < (length + PACKET_BYTES)) { + if (!relay_buff_size) { + relay_buff = + ALLOC((relay_buff_size = (length + PACKET_BYTES))); + } else { + relay_buff = + REALLOC(relay_buff, + (relay_buff_size = (length + PACKET_BYTES))); + } + } + PUT_PACKET_BYTES(relay_buff, length); + if (read_exact(pw->readfrom, relay_buff + PACKET_BYTES, length) != + length) { + warning("Malformed reply (data) from worker process %d.", pw->pid); + return -1; + } + return length; +} + +static int relay_reply(Worker *pw) +{ + int length = fillin_reply(pw); /* Filled into the "global" buffer */ + int res; + + if (length < 0) { + return -1; + } + if ((res = write_exact(1, relay_buff, length + PACKET_BYTES)) < 0) { + fatal("Cannot write reply to erlang process, errno = %d.", errno); + } else if (res == 0) { + DEBUGF(1,("Erlang has closed write pipe.")); + return 0; + } + return length; +} + +static int ignore_reply(Worker *pw) +{ + return fillin_reply(pw); +} + +#endif /* !WIN32 */ + +/* + * Domain name "parsing" and worker specific queing + */ +static void domaincopy(AddrByte *out, AddrByte *in) +{ + AddrByte *ptr = out; + *ptr++ = *in++; + *ptr++ = *in++; + switch(*out) { + case OP_GETHOSTBYNAME: + while(*in != '\0' && *in != '.') + ++in; + strncpy((char*)ptr, (char*)in, DOMAINNAME_MAX-2); + ptr[DOMAINNAME_MAX-3] = '\0'; + DEBUGF(4,("Saved domainname %s.", ptr)); + return; + case OP_GETHOSTBYADDR: + memcpy(ptr,in, ((out[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1); + DEBUGF(4, ("Saved domain address: %s.", + format_address(((out[1] == PROTO_IPV4) ? + UNIT_IPV4 : UNIT_IPV6) - 1,ptr))); + return; + default: + fatal("Trying to copy buffer not containing valid domain, [%d,%d].", + (int) out[0], (int) out[1]); + } +} + +static int domaineq(AddrByte *d1, AddrByte *d2) +{ + if (d1[0] != d2[0] || d1[1] != d2[1]) { + return 0; + } + switch (d1[0]) { + case OP_GETHOSTBYNAME: + return !strcmp((char*)d1+2,(char*)d2+2); + case OP_GETHOSTBYADDR: + return !memcmp(d1+2,d2+2, ((d1[1] == PROTO_IPV4) + ? UNIT_IPV4 : UNIT_IPV6) - 1); + default: + fatal("Trying to compare buffers not containing valid domain, " + "[%d,%d].", + (int) d1[0], (int) d1[1]); + return -1; /* Lint... */ + } +} + +static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff) +{ + OpType op = get_op(inbuff); + ProtoType proto; + int i; + AddrByte *data; + + data = get_data(inbuff); + switch (op) { + case OP_GETHOSTBYNAME: + data = get_data(inbuff); + for (i = (data - inbuff); i < insize && inbuff[i] != '\0'; ++i) + ; + if (i < insize) { + domaincopy(domainbuff, get_op_addr(inbuff)); + return 0; + } + DEBUGF(3, ("Could not pick valid domainname in " + "gethostbyname operation")); + return -1; + case OP_GETHOSTBYADDR: + proto = get_proto(inbuff); + i = insize - (data - inbuff); + if ((proto == PROTO_IPV4 && i == UNIT_IPV4) || + (proto == PROTO_IPV6 && i == UNIT_IPV6)) { + /* An address buffer */ + domaincopy(domainbuff, get_op_addr(inbuff)); + return 0; + } + DEBUGF(3, ("Could not pick valid domainname in gethostbyaddr " + "operation")); + return -1; + default: + DEBUGF(2, ("Could not pick valid domainname because of " + "invalid opcode %d.", (int) op)); + return -1; + } +} + +/* + * Worker subprocesses with utilities + */ +#ifdef WIN32 +static int create_worker(Worker *pworker, int save_que) +{ + MesQ **thread_data = ALLOC(2*sizeof(MesQ *)); + DWORD tid; + + + if (!create_mesq(thread_data)) { + fatal("Could not create, pipes for subprocess, errno = %d", + GetLastError()); + } + if (!create_mesq(thread_data + 1)) { + fatal("Could not create, pipes for subprocess, errno = %d", + GetLastError()); + } + /* Save those before the thread starts */ + pworker->writeto = thread_data[0]; + pworker->readfrom = thread_data[1]; + + if (((HANDLE) _beginthreadex(NULL, 0, worker_loop, thread_data, 0, &tid)) + == NULL) { + fatal("Could not create thread errno = %d", + GetLastError()); + } + pworker->pid = tid; + pworker->state = WORKER_FREE; + pworker->serial = INVALID_SERIAL; + if (!save_que) { + pworker->que_first = pworker->que_last = NULL; + pworker->que_size = 0; + } + DEBUGF(3,("Created worker[%ld] with fd %d", + (long) pworker->pid, (int) pworker->readfrom)); + return 0; +} + +#else + +static int create_worker(Worker *pworker, int save_que) +{ + int p0[2], p1[2]; + pid_t child; + + if (pipe(p0)) { + warning("Could not create, pipes for subprocess, errno = %d", + errno); + return -1; + } + + if (pipe(p1)) { + warning("Could not create, pipes for subprocess, errno = %d", + errno); + close(p0[0]); + close(p0[1]); + return -1; + } + if ((child = fork()) < 0) { /* failure */ + warning("Could not fork(), errno = %d", + errno); + close(p0[0]); + close(p0[1]); + close(p1[0]); + close(p1[1]); + return -1; + } else if (child > 0) { /* parent */ + close(p0[1]); + close(p1[0]); + pworker->writeto = p1[1]; + pworker->readfrom = p0[0]; + pworker->pid = child; + pworker->state = WORKER_FREE; + pworker->serial = INVALID_SERIAL; + if (!save_que) { + pworker->que_first = pworker->que_last = NULL; + pworker->que_size = 0; + } + DEBUGF(3,("Created worker[%ld] with fd %d", + (long) pworker->pid, (int) pworker->readfrom)); + return 0; + } else { /* child */ + close(p1[1]); + close(p0[0]); + close_all_worker_fds(); + /* Make "fatal" not find any children */ + num_busy_workers = num_free_workers = num_stalled_workers = 0; + if((dup2(p1[0],0) < 0) || (dup2(p0[1],1) < 0)) { + fatal("Worker could not dup2(), errno = %d", + errno); + return -1; /* lint... */ + } + close(p1[0]); + close(p0[1]); + signal(SIGCHLD, SIG_IGN); + return worker_loop(); + } +} + +static void close_all_worker_fds(void) +{ + int w,i; + Worker *workers[3] = {free_workers, busy_workers, stalled_workers}; + int wsizes[3] = {num_free_workers, num_busy_workers, + num_stalled_workers}; + for (w = 0; w < 3; ++w) { + for (i = 0; i < wsizes[w]; ++i) { + if (workers[w][i].state != WORKER_EMPTY) { + close(workers[w][i].readfrom); + close(workers[w][i].writeto); + } + } + } +} + +#endif /* !WIN32 */ + +#ifdef WIN32 +DWORD WINAPI worker_loop(void *v) +#else +static int worker_loop(void) +#endif +{ + AddrByte *req = NULL; + size_t req_size = 0; + int this_size; + AddrByte *reply = NULL; + size_t reply_size = 0; + size_t data_size; + +#ifdef WIN32 + QueItem *m = NULL; + MesQ *readfrom = ((MesQ **) v)[0]; + MesQ *writeto = ((MesQ **) v)[1]; + /* XXX:PaN */ + FREE(v); +#endif + + for(;;) { +#ifdef HAVE_GETADDRINFO + struct addrinfo *ai = NULL; +#endif + struct hostent *he = NULL; +#ifdef HAVE_GETNAMEINFO + struct sockaddr *sa = NULL; + char name[NI_MAXHOST]; +#endif +#if defined(HAVE_GETIPNODEBYNAME) || defined(HAVE_GETIPNODEBYADDR) + int free_he = 0; +#endif + int error_num = 0; + SerialType serial; + OpType op; + ProtoType proto; + AddrByte *data; + +#ifdef WIN32 + WaitForSingleObject(event_mesq(readfrom),INFINITE); + DEBUGF(4,("Worker got data on message que.")); + + if(!deque_mesq(readfrom,&m)) { + goto fail; + } + this_size = m->req_size; + req = m->request; +#else + if (READ_PACKET_BYTES(0,&this_size) != PACKET_BYTES) { + DEBUGF(2,("Worker got error/EOF while reading size, exiting.")); + exit(0); + } + if (this_size > req_size) { + if (req == NULL) { + req = ALLOC((req_size = this_size)); + } else { + req = REALLOC(req, (req_size = this_size)); + } + } + if (read_exact(0, req, (size_t) this_size) != this_size) { + DEBUGF(1,("Worker got EOF while reading data, exiting.")); + exit(0); + } +#endif + /* Decode the request... */ + serial = get_serial(req); + if (OP_CONTROL == (op = get_op(req))) { + CtlType ctl; + if (serial != INVALID_SERIAL) { + DEBUGF(1, ("Worker got invalid serial: %d.", serial)); + exit(0); + } + switch (ctl = get_ctl(req)) { + case SETOPT_DEBUG_LEVEL: + debug_level = get_debug_level(req); + DEBUGF(debug_level, + ("Worker debug_level = %d.", debug_level)); + break; + } + continue; + } + proto = get_proto(req); + data = get_data(req); + DEBUGF(4,("Worker got request, op = %d, proto = %d, data = %s.", + op,proto,data)); + /* Got a request, lets go... */ + switch (op) { + case OP_GETHOSTBYNAME: + switch (proto) { + +#ifdef HAVE_IN6 + case PROTO_IPV6: { /* switch (proto) { */ +#ifdef HAVE_GETADDRINFO + struct addrinfo hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_flags = (AI_CANONNAME|AI_V4MAPPED|AI_ADDRCONFIG); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = AF_INET6; + DEBUGF(5, ("Starting getaddrinfo(%s, ...)", data)); + error_num = getaddrinfo((char *)data, NULL, &hints, &ai); + DEBUGF(5,("getaddrinfo returned %d", error_num)); + if (error_num) { + error_num = map_netdb_error_ai(error_num); + } +#elif defined(HAVE_GETIPNODEBYNAME) /*#ifdef HAVE_GETADDRINFO */ + DEBUGF(5,("Starting getipnodebyname(%s)",data)); + he = getipnodebyname(data, AF_INET6, AI_DEFAULT, &error_num); + if (he) { + free_he = 1; + error_num = 0; + DEBUGF(5,("getipnodebyname(,AF_INET6,,) OK")); + } else { + DEBUGF(5,("getipnodebyname(,AF_INET6,,) error %d", error_num)); + error_num = map_netdb_error(error_num); + } +#elif defined(HAVE_GETHOSTBYNAME2) /*#ifdef HAVE_GETADDRINFO */ + DEBUGF(5,("Starting gethostbyname2(%s, AF_INET6)",data)); + he = gethostbyname2((char*)data, AF_INET6); + if (he) { + error_num = 0; + DEBUGF(5,("gethostbyname2(, AF_INET6) OK")); + } else { + error_num = map_netdb_error(h_errno); + DEBUGF(5,("gethostbyname2(, AF_INET6) error %d", h_errno)); + } +#else + error_num = ERRCODE_NOTSUP; +#endif /*#ifdef HAVE_GETADDRINFO */ + } break; +#endif /*ifdef HAVE_IN6 */ + + case PROTO_IPV4: { /* switch (proto) { */ + DEBUGF(5,("Starting gethostbyname(%s)",data)); + he = gethostbyname((char*)data); + if (he) { + error_num = 0; + DEBUGF(5,("gethostbyname OK")); + } else { + error_num = map_netdb_error(h_errno); + DEBUGF(5,("gethostbyname error %d", h_errno)); + } + } break; + + default: /* switch (proto) { */ + /* Not supported... */ + error_num = ERRCODE_NOTSUP; + break; + } /* switch (proto) { */ + + if (he) { + data_size = build_reply(serial, he, &reply, &reply_size); +#ifdef HAVE_GETIPNODEBYNAME + if (free_he) { + freehostent(he); + } +#endif +#ifdef HAVE_GETADDRINFO + } else if (ai) { + data_size = build_reply_ai(serial, 16, ai, + &reply, &reply_size); + freeaddrinfo(ai); +#endif + } else { + data_size = build_error_reply(serial, error_num, + &reply, &reply_size); + } + break; /* case OP_GETHOSTBYNAME: */ + + case OP_GETHOSTBYADDR: /* switch (op) { */ + switch (proto) { +#ifdef HAVE_IN6 + case PROTO_IPV6: { +#ifdef HAVE_GETNAMEINFO + struct sockaddr_in6 *sin; + socklen_t salen = sizeof(*sin); + + sin = ALLOC(salen); +#ifndef NO_SA_LEN + sin->sin6_len = salen; +#endif + sin->sin6_family = AF_INET6; + sin->sin6_port = 0; + memcpy(&sin->sin6_addr, data, 16); + sa = (struct sockaddr *)sin; + DEBUGF(5,("Starting getnameinfo(,,%s,16,,,)", + format_address(16, data))); + error_num = getnameinfo(sa, salen, name, sizeof(name), + NULL, 0, NI_NAMEREQD); + DEBUGF(5,("getnameinfo returned %d", error_num)); + if (error_num) { + error_num = map_netdb_error_ai(error_num); + sa = NULL; + } +#elif defined(HAVE_GETIPNODEBYADDR) /*#ifdef HAVE_GETNAMEINFO*/ + struct in6_addr ia; + memcpy(ia.s6_addr, data, 16); + DEBUGF(5,("Starting getipnodebyaddr(%s,16,AF_INET6,)", + format_address(16, data))); + he = getipnodebyaddr(&ia, 16, AF_INET6, &error_num); + free_he = 1; + if (! he) { + DEBUGF(5,("getipnodebyaddr error %d", error_num)); + error_num = map_netdb_error(error_num); + } else { + DEBUGF(5,("getipnodebyaddr OK")); + } +#else /*#ifdef HAVE_GETNAMEINFO*/ + struct in6_addr ia; + memcpy(ia.s6_addr, data, 16); + DEBUGF(5,("Starting gethostbyaddr(%s,16,AF_INET6)", + format_address(16, data))); + he = gethostbyaddr((const char *) &ia, 16, AF_INET6); + if (! he) { + error_num = map_netdb_error(h_errno); + DEBUGF(5,("gethostbyaddr error %d", h_errno)); + } else { + DEBUGF(5,("gethostbyaddr OK")); + } +#endif /* #ifdef HAVE_GETNAMEINFO */ + } break; /* case PROTO_IPV6: { */ +#endif /* #ifdef HAVE_IN6 */ + + case PROTO_IPV4: { /* switch(proto) { */ + struct in_addr ia; + memcpy(&ia.s_addr, data, 4); /* Alignment required... */ + DEBUGF(5,("Starting gethostbyaddr(%s,4,AF_INET)", + format_address(4, data))); + he = gethostbyaddr((const char *) &ia, 4, AF_INET); + if (! he) { + error_num = map_netdb_error(h_errno); + DEBUGF(5,("gethostbyaddr error %d", h_errno)); + } else { + DEBUGF(5,("gethostbyaddr OK")); + } + } break; + + default: + error_num = ERRCODE_NOTSUP; + } /* switch(proto) { */ + + if (he) { + data_size = build_reply(serial, he, &reply, &reply_size); +#ifdef HAVE_GETIPNODEBYADDR + if (free_he) { + freehostent(he); + } +#endif +#ifdef HAVE_GETNAMEINFO + } else if (sa) { + struct addrinfo res; + memset(&res, 0, sizeof(res)); + res.ai_canonname = name; + res.ai_addr = sa; + res.ai_next = NULL; + data_size = build_reply_ai(serial, 16, &res, + &reply, &reply_size); + free(sa); +#endif + } else { + data_size = build_error_reply(serial, error_num, + &reply, &reply_size); + } + break; /* case OP_GETHOSTBYADR: */ + + default: + data_size = build_error_reply(serial, ERRCODE_NOTSUP, + &reply, &reply_size); + break; + } /* switch (op) { */ + +#ifdef WIN32 + m = REALLOC(m, sizeof(QueItem) - 1 + data_size - PACKET_BYTES); + m->next = NULL; + m->req_size = data_size - PACKET_BYTES; + memcpy(m->request,reply + PACKET_BYTES,data_size - PACKET_BYTES); + if (!enque_mesq(writeto,m)) { + goto fail; + } + m = NULL; +#else + write(1, reply, data_size); /* No signals expected */ +#endif + } /* for (;;) */ + +#ifdef WIN32 + fail: + if (m != NULL) { + FREE(m); + } + close_mesq(readfrom); + close_mesq(writeto); + if (reply) { + FREE(reply); + } + return 1; +#endif +} + +static int map_netdb_error(int netdb_code) +{ + switch (netdb_code) { +#ifdef HOST_NOT_FOUND + case HOST_NOT_FOUND: + return ERRCODE_HOST_NOT_FOUND; +#endif +#ifdef TRY_AGAIN + case TRY_AGAIN: + return ERRCODE_TRY_AGAIN; +#endif +#ifdef NO_RECOVERY + case NO_RECOVERY: + return ERRCODE_NO_RECOVERY; +#endif +#if defined(NO_DATA) || defined(NO_ADDRESS) +#ifdef NO_DATA + case NO_DATA: +#endif +#ifdef NO_ADDRESS +#if !defined(NO_DATA) || (NO_DATA != NO_ADDRESS) + case NO_ADDRESS: +#endif +#endif + return ERRCODE_NO_DATA; +#endif + default: + return ERRCODE_NETDB_INTERNAL; + } +} + +#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) +static int map_netdb_error_ai(int netdb_code) +{ + switch(netdb_code) { +#ifdef EAI_ADDRFAMILY + case EAI_ADDRFAMILY: + return ERRCODE_NETDB_INTERNAL; +#endif + case EAI_AGAIN: + return ERRCODE_TRY_AGAIN; + case EAI_BADFLAGS: + return ERRCODE_NETDB_INTERNAL; + case EAI_FAIL: + return ERRCODE_HOST_NOT_FOUND; + case EAI_FAMILY: + return ERRCODE_NETDB_INTERNAL; + case EAI_MEMORY: + return ERRCODE_NETDB_INTERNAL; +#if defined(EAI_NODATA) && EAI_NODATA != EAI_NONAME + case EAI_NODATA: + return ERRCODE_HOST_NOT_FOUND; +#endif + case EAI_NONAME: + return ERRCODE_HOST_NOT_FOUND; + case EAI_SERVICE: + return ERRCODE_NETDB_INTERNAL; + case EAI_SOCKTYPE: + return ERRCODE_NETDB_INTERNAL; + default: + return ERRCODE_NETDB_INTERNAL; + } +} +#endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */ + + +static char *errcode_to_string(int errcode) +{ + switch (errcode) { + case ERRCODE_NOTSUP: + return "enotsup"; + case ERRCODE_HOST_NOT_FOUND: + /* + * I would preffer + * return "host_not_found"; + * but have to keep compatibility with the old + * inet_gethost's error codes... + */ + return "notfound"; + case ERRCODE_TRY_AGAIN: + return "try_again"; + case ERRCODE_NO_RECOVERY: + return "no_recovery"; + case ERRCODE_NO_DATA: + return "no_data"; + default: + /*case ERRCODE_NETDB_INTERNAL:*/ + return "netdb_internal"; + } +} + +static size_t build_error_reply(SerialType serial, int errnum, + AddrByte **preply, + size_t *preply_size) +{ + char *errstring = errcode_to_string(errnum); + int string_need = strlen(errstring) + 1; /* a '\0' too */ + unsigned need; + AddrByte *ptr; + + need = PACKET_BYTES + 4 /* Serial */ + 1 /* Unit */ + string_need; + if (*preply_size < need) { + if (*preply_size == 0) { + *preply = ALLOC((*preply_size = need)); + } else { + *preply = REALLOC(*preply, + (*preply_size = need)); + } + } + ptr = *preply; + PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); + ptr += PACKET_BYTES; + put_int32(ptr,serial); + ptr +=4; + *ptr++ = (AddrByte) 0; /* 4 or 16 */ + strcpy((char*)ptr, errstring); + return need; +} + + + +static size_t build_reply(SerialType serial, struct hostent *he, + AddrByte **preply, size_t *preply_size) +{ + unsigned need; + int strings_need; + int num_strings; + int num_addresses; + int i; + AddrByte *ptr; + int unit = he->h_length; + + for (num_addresses = 0; he->h_addr_list[num_addresses] != NULL; + ++num_addresses) + ; + strings_need = strlen(he->h_name) + 1; /* 1 for null byte */ + num_strings = 1; + if (he->h_aliases) { + for(i=0; he->h_aliases[i] != NULL; ++i) { + strings_need += strlen(he->h_aliases[i]) + 1; + ++num_strings; + } + } + + need = PACKET_BYTES + + 4 /* Serial */ + 1 /* Unit */ + 4 /* Naddr */ + + (unit * num_addresses) /* Address bytes */ + + 4 /* Nnames */ + strings_need /* The name and alias strings */; + + if (*preply_size < need) { + if (*preply_size == 0) { + *preply = ALLOC((*preply_size = need)); + } else { + *preply = REALLOC(*preply, + (*preply_size = need)); + } + } + ptr = *preply; + PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); + ptr += PACKET_BYTES; + put_int32(ptr,serial); + ptr +=4; + *ptr++ = (AddrByte) unit; /* 4 or 16 */ + put_int32(ptr, num_addresses); + ptr += 4; + for (i = 0; i < num_addresses; ++i) { + memcpy(ptr, he->h_addr_list[i], unit); + ptr += unit; + } + put_int32(ptr, num_strings); + ptr += 4; + strcpy((char*)ptr, he->h_name); + ptr += 1 + strlen(he->h_name); + for (i = 0; i < (num_strings - 1); ++i) { + strcpy((char*)ptr, he->h_aliases[i]); + ptr += 1 + strlen(he->h_aliases[i]); + } + return need; +} + +#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) +static size_t build_reply_ai(SerialType serial, int addrlen, + struct addrinfo *res0, + AddrByte **preply, size_t *preply_size) +{ + struct addrinfo *res; + int num_strings; + int num_addresses; + AddrByte *ptr; + int need; + + num_addresses = 0; + num_strings = 0; + need = PACKET_BYTES + + 4 /* Serial */ + 1 /* addrlen */ + + 4 /* Naddr */ + 4 /* Nnames */; + + for (res = res0; res != NULL; res = res->ai_next) { + if (res->ai_addr) { + num_addresses++; + need += addrlen; + } + if (res->ai_canonname) { + num_strings++; + need += strlen(res->ai_canonname) + 1; + } + } + + if (*preply_size < need) { + if (*preply_size == 0) { + *preply = ALLOC((*preply_size = need)); + } else { + *preply = REALLOC(*preply, + (*preply_size = need)); + } + } + + ptr = *preply; + PUT_PACKET_BYTES(ptr,need - PACKET_BYTES); + ptr += PACKET_BYTES; + put_int32(ptr,serial); + ptr +=4; + *ptr++ = (AddrByte) addrlen; /* 4 or 16 */ + put_int32(ptr, num_addresses); + ptr += 4; + for (res = res0; res != NULL && num_addresses; res = res->ai_next) { + if (res->ai_addr == NULL) + continue; + if (addrlen == 4) + memcpy(ptr, &((struct sockaddr_in *)res->ai_addr)->sin_addr, addrlen); +#ifdef AF_INET6 + else if (addrlen == 16) + memcpy(ptr, &((struct sockaddr_in6 *)res->ai_addr)->sin6_addr, addrlen); +#endif + else + memcpy(ptr, res->ai_addr->sa_data, addrlen); + ptr += addrlen; + num_addresses--; + } + put_int32(ptr, num_strings); + ptr += 4; + for (res = res0; res != NULL && num_strings; res = res->ai_next) { + if (res->ai_canonname == NULL) + continue; + strcpy((char *)ptr, res->ai_canonname); + ptr += strlen(res->ai_canonname) + 1; + num_strings--; + } + return need; +} + +#endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */ + + + +/* + * Encode/decode/read/write + */ + +static int get_int32(AddrByte *b) +{ + int res; + res = (unsigned) b[3]; + res |= ((unsigned) b[2]) << 8; + res |= ((unsigned) b[1]) << 16; + res |= ((unsigned) b[0]) << 24; + return res; +} + +static void put_int32(AddrByte *buff, int value) +{ + buff[0] = (((unsigned) value) >> 24) & 0xFF; + buff[1] = (((unsigned) value) >> 16) & 0xFF; + buff[2] = (((unsigned) value) >> 8) & 0xFF; + buff[3] = ((unsigned) value) & 0xFF; +} +#ifdef WIN32 + +static int read_int32(HANDLE fd, int *res, HANDLE ev) +{ + AddrByte b[4]; + int r; + if ((r = read_exact(fd,b,4,ev)) < 0) { + return -1; + } else if (r == 0) { + return 0; + } else { + *res = (unsigned) b[3]; + *res |= ((unsigned) b[2]) << 8; + *res |= ((unsigned) b[1]) << 16; + *res |= ((unsigned) b[0]) << 24; + } + return 4; +} +/* + * The standard input is expected to be opened with FILE_FLAG_OVERLAPPED + * but this code should handle both cases (although winsock might not). + */ +static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev) +{ + DWORD ret,got; + BOOL stat; + char *buff = vbuff; + OVERLAPPED ov; + DWORD err; + + + got = 0; + for(;;) { + memset(&ov,0,sizeof(ov)); + ov.hEvent = ev; + ResetEvent(ov.hEvent); + stat = ReadFile(fd, buff, nbytes - got, &ret, &ov); + if (!stat) { + if ((err = GetLastError()) == ERROR_IO_PENDING) { + DEBUGF(4,("Overlapped read, waiting for completion...")); + WaitForSingleObject(ov.hEvent,INFINITE); + stat = GetOverlappedResult(fd,&ov,&ret,TRUE); + DEBUGF(4,("Overlapped read, completed with status %d," + " result %d",stat,ret)); + } + if (!stat) { + if (GetLastError() == ERROR_BROKEN_PIPE) { + DEBUGF(1, ("End of file while reading from pipe.")); + return 0; + } else { + DEBUGF(1, ("Error while reading from pipe," + " errno = %d", + GetLastError())); + return -1; + } + } + } else { + DEBUGF(4,("Read completed syncronously, result %d",ret)); + } + if (ret == 0) { + DEBUGF(1, ("End of file detected as zero read from pipe.")); + return 0; + } + if (ret < nbytes - got) { + DEBUGF(4,("Not all data read from pipe, still %d bytes to read.", + nbytes - (got + ret))); + got += ret; + buff += ret; + } else { + return nbytes; + } + } +} +/* + * Now, we actually expect a HANDLE opened with FILE_FLAG_OVERLAPPED, + * but this code should handle both cases (although winsock + * does not always..) + */ +static int write_exact(HANDLE fd, AddrByte *buff, DWORD len, HANDLE ev) +{ + DWORD res,stat; + DWORD x = len; + OVERLAPPED ov; + DWORD err; + + + for(;;) { + memset(&ov,0,sizeof(ov)); + ov.hEvent = ev; + ResetEvent(ov.hEvent); + stat = WriteFile(fd,buff,x,&res,&ov); + if (!stat) { + if ((err = GetLastError()) == ERROR_IO_PENDING) { + DEBUGF(4,("Overlapped write, waiting for competion...")); + WaitForSingleObject(ov.hEvent,INFINITE); + stat = GetOverlappedResult(fd,&ov,&res,TRUE); + DEBUGF(4,("Overlapped write, completed with status %d," + " result %d",stat,res)); + } + if (!stat) { + if (GetLastError() == ERROR_BROKEN_PIPE) { + return 0; + } else { + return -1; + } + } + } else { + DEBUGF(4,("Write completed syncronously, result %d",res)); + } + + if (res < x) { + /* Microsoft states this can happen as HANDLE is a pipe... */ + DEBUGF(4,("Not all data written to pipe, still %d bytes to write.", + x - res)); + x -= res; + buff += res; + } else { + return len; + } + } +} + +DWORD WINAPI reader(void *data) { + MesQ *mq = (MesQ *) data; + QueItem *m; + int siz; + int r; + HANDLE inp; + int x = 0; + HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL); + + inp = GetStdHandle(STD_INPUT_HANDLE); + for (;;) { + if ((r = READ_PACKET_BYTES(inp,&siz,ev)) != 4) { + DEBUGF(1,("Erlang has closed (reading)")); + exit(0); + } + DEBUGF(4,("Read packet of size %d from erlang",siz)); + m = ALLOC(sizeof(QueItem) - 1 + siz); + if (read_exact(inp, m->request, siz,ev) != siz) { + fatal("Unexpected end of file on main input, errno = %d",errno); + } + if (siz < 5) { + fatal("Unexpected message on main input, message size %d less " + "than minimum."); + } + m->req_size = siz; + m->next = NULL; + if (!enque_mesq(mq, m)) { + fatal("Reader could not talk to main thread!"); + } + } +} + +DWORD WINAPI writer(void *data) +{ + MesQ *mq = (MesQ *) data; + QueItem *m; + HANDLE outp = GetStdHandle(STD_OUTPUT_HANDLE); + AddrByte hdr[PACKET_BYTES]; + HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL); + + + for (;;) { + WaitForSingleObject(event_mesq(mq),INFINITE); + if (!deque_mesq(mq, &m)) { + fatal("Writer could not talk to main thread!"); + } + PUT_PACKET_BYTES(hdr, m->req_size); + if (write_exact(outp, hdr, 4, ev) != 4) { + DEBUGF(1,("Erlang has closed (writing)")); + exit(0); + } + if (write_exact(outp, m->request, m->req_size, ev) != m->req_size) { + DEBUGF(1,("Erlang has closed (writing)")); + exit(0); + } + FREE(m); + } +} + + +#else + +static size_t read_int32(int fd, int *res) +{ + AddrByte b[4]; + int r; + if ((r = read_exact(fd,b,4)) < 0) { + return -1; + } else if (r == 0) { + return 0; + } else { + *res = (unsigned) b[3]; + *res |= ((unsigned) b[2]) << 8; + *res |= ((unsigned) b[1]) << 16; + *res |= ((unsigned) b[0]) << 24; + } + return 4; +} + +static ssize_t read_exact(int fd, void *vbuff, size_t nbytes) +{ + ssize_t ret, got; + char *buff = vbuff; + + got = 0; + for(;;) { + ret = read(fd, buff, nbytes - got); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + DEBUGF(1, ("Error while reading from pipe," + " errno = %d", + errno)); + return -1; + } + } else if (ret == 0) { + DEBUGF(1, ("End of file while reading from pipe.")); + if (got == 0) { + return 0; /* "Normal" EOF */ + } else { + return -1; + } + } else if (ret < nbytes - got) { + got += ret; + buff += ret; + } else { + return nbytes; + } + } +} + +static int write_exact(int fd, AddrByte *buff, int len) +{ + int res; + int x = len; + for(;;) { + if((res = write(fd, buff, x)) == x) { + break; + } + if (res < 0) { + if (errno == EINTR) { + continue; + } else if (errno == EPIPE) { + return 0; + } +#ifdef ENXIO + else if (errno == ENXIO) { + return 0; + } +#endif + else { + return -1; + } + } else { + /* Hmmm, blocking write but not all written, could this happen + if the other end was closed during the operation? Well, + it costs very little to handle anyway... */ + x -= res; + buff += res; + } + } + return len; +} + +#endif /* !WIN32 */ + +/* + * Debug and memory allocation + */ + +static char *format_address(int siz, AddrByte *addr) +{ + static char buff[50]; + char tmp[10]; + if (siz > 16) { + return "(unknown)"; + } + *buff='\0'; + if (siz <= 4) { + while(siz--) { + sprintf(tmp,"%d",(int) *addr++); + strcat(buff,tmp); + if(siz) { + strcat(buff,"."); + } + } + return buff; + } + while(siz--) { + sprintf(tmp,"%02x",(int) *addr++); + strcat(buff,tmp); + if(siz) { + strcat(buff,":"); + } + } + return buff; +} + +static void debugf(char *format, ...) +{ + char buff[2048]; + char *ptr; + va_list ap; + + va_start(ap,format); +#ifdef WIN32 + sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) GetCurrentThreadId()); +#else + sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) getpid()); +#endif + ptr = buff + strlen(buff); + vsprintf(ptr,format,ap); + strcat(ptr,"\r\n"); +#ifdef WIN32 + if (debug_console_allocated != INVALID_HANDLE_VALUE) { + DWORD res; + WriteFile(debug_console_allocated,buff,strlen(buff),&res,NULL); + } +#else + write(2,buff,strlen(buff)); +#endif + va_end(ap); +} + +static void warning(char *format, ...) +{ + char buff[2048]; + char *ptr; + va_list ap; + + va_start(ap,format); + sprintf(buff,"%s[%d]: WARNING:",program_name, (int) getpid()); + ptr = buff + strlen(buff); + vsprintf(ptr,format,ap); + strcat(ptr,"\r\n"); +#ifdef WIN32 + { + DWORD res; + WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL); + } +#else + write(2,buff,strlen(buff)); +#endif + va_end(ap); +} + +static void fatal(char *format, ...) +{ + char buff[2048]; + char *ptr; + va_list ap; + + va_start(ap,format); + sprintf(buff,"%s[%d]: FATAL ERROR:",program_name, (int) getpid()); + ptr = buff + strlen(buff); + vsprintf(ptr,format,ap); + strcat(ptr,"\r\n"); +#ifdef WIN32 + { + DWORD res; + WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL); + } +#else + write(2,buff,strlen(buff)); +#endif + va_end(ap); +#ifndef WIN32 + kill_all_workers(); +#endif + exit(1); +} + +static void *my_malloc(size_t size) +{ + void *ptr = malloc(size); + if (!ptr) { + fatal("Cannot allocate %u bytes of memory.", (unsigned) size); + return NULL; /* lint... */ + } + return ptr; +} + +static void *my_realloc(void *old, size_t size) +{ + void *ptr = realloc(old, size); + if (!ptr) { + fatal("Cannot reallocate %u bytes of memory from %p.", + (unsigned) size, old); + return NULL; /* lint... */ + } + return ptr; +} + +#ifdef WIN32 + +BOOL create_mesq(MesQ **q) +{ + MesQ *tmp = malloc(sizeof(MesQ)); + tmp->data_present = CreateEvent(NULL, TRUE, FALSE,NULL); + if (tmp->data_present == NULL) { + free(tmp); + return FALSE; + } + InitializeCriticalSection(&(tmp->crit)); /* Cannot fail */ + tmp->shutdown = 0; + tmp->first = NULL; + tmp->last = NULL; + *q = tmp; + return TRUE; +} + +BOOL enque_mesq(MesQ *q, QueItem *m) +{ + EnterCriticalSection(&(q->crit)); + if (q->shutdown) { + LeaveCriticalSection(&(q->crit)); + return FALSE; + } + if (q->last == NULL) { + q->first = q->last = m; + } else { + q->last->next = m; + q->last = m; + } + m->next = NULL; + if (!SetEvent(q->data_present)) { + fprintf(stderr,"Fatal: Unable to signal event in %s:%d, last error: %d\n", + __FILE__,__LINE__,GetLastError()); + exit(1); /* Unable to continue at all */ + } + LeaveCriticalSection(&(q->crit)); + return TRUE; +} + +BOOL deque_mesq(MesQ *q, QueItem **m) +{ + EnterCriticalSection(&(q->crit)); + if (q->first == NULL) { /* Usually shutdown from other end */ + ResetEvent(q->data_present); + LeaveCriticalSection(&(q->crit)); + return FALSE; + } + *m = q->first; + q->first = q->first->next; + if (q->first == NULL) { + q->last = NULL; + ResetEvent(q->data_present); + } + (*m)->next = NULL; + LeaveCriticalSection(&(q->crit)); + return TRUE; +} + +BOOL close_mesq(MesQ *q) +{ + QueItem *tmp; + EnterCriticalSection(&(q->crit)); + if (!q->shutdown) { + q->shutdown = TRUE; + if (!SetEvent(q->data_present)) { + fprintf(stderr, + "Fatal: Unable to signal event in %s:%d, last error: %d\n", + __FILE__,__LINE__,GetLastError()); + exit(1); /* Unable to continue at all */ + } + LeaveCriticalSection(&(q->crit)); + return FALSE; + } + /* Noone else is supposed to use this object any more */ + LeaveCriticalSection(&(q->crit)); + DeleteCriticalSection(&(q->crit)); + CloseHandle(q->data_present); + tmp = q->first; + while(tmp) { + q->first = q->first->next; + free(tmp); + tmp = q->first; + } + free(q); + return TRUE; +} + +HANDLE event_mesq(MesQ *q) +{ + return q->data_present; +} + +#ifdef HARDDEBUG +DWORD WINAPI pseudo_worker_loop(void *v) +{ + HOSTENT *hep; + + DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") starting")); + hep = gethostbyname("ftp.funet.fi"); + + DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") -> %d OK",(int) hep)); + return 0; +} + +static void poll_gethost(int row) { + HANDLE h; + DWORD tid; + h = (HANDLE) _beginthreadex(NULL, 0, pseudo_worker_loop, NULL, 0, &tid); + if (h == NULL) { + DEBUGF(1,("Failed to spawn pseudo worker (%d)...",row)); + } else { + DEBUGF(1,("Waiting for pseudo worker (%d)", row)); + WaitForSingleObject(h,INFINITE); + DEBUGF(1,("Done Waiting for pseudo worker (%d)", row)); + } +} +#endif + +#endif /* WIN32 */ -- cgit v1.2.3