aboutsummaryrefslogtreecommitdiffstats
path: root/erts/etc/common/inet_gethost.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/etc/common/inet_gethost.c')
-rw-r--r--erts/etc/common/inet_gethost.c2757
1 files changed, 2757 insertions, 0 deletions
diff --git a/erts/etc/common/inet_gethost.c b/erts/etc/common/inet_gethost.c
new file mode 100644
index 0000000000..ff16ee02c4
--- /dev/null
+++ b/erts/etc/common/inet_gethost.c
@@ -0,0 +1,2757 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1998-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+/*
+ * Erlang port program to do the name service lookup for the erlang
+ * distribution and inet part of the kernel.
+ * A pool of subprocess is kept, to which a pair of pipes is connected.
+ * The main process schedules requests among the different subprocesses
+ * (created with fork()), to be able to handle as many requests as possible
+ * simultaneously. The controlling erlang machine may request a "cancel",
+ * in which case the process may be killed and restarted when the need arises.
+ * The single numeric parameter to this program is the maximum port pool size,
+ * which is the size of the bookkeeping array.
+ *
+ * Windows:
+ * There is instead of a pool of processes a pool of threads.
+ * Communication is not done through pipes but via message queues between
+ * the threads. The only "pipes" involved are the ones used for communicating
+ * with Erlang.
+ * Important note:
+ * For unknown reasons, the combination of a thread doing blocking I/O on
+ * a named pipe at the same time as another thread tries to resolve a hostname
+ * may (with certain software configurations) block the gethostbyname call (!)
+ * For that reason, standard input (and standard output) should be opened
+ * in asynchronous mode (FILE_FLAG_OVERLAPPED), which has to be done by Erlang.
+ * A special flag to open_port is used to work around this behaviour in winsock
+ * and the threads doing read and write handle asynchronous I/O.
+ * The ReadFile and WriteFile calls try to cope with both types of I/O, why
+ * the code is not really as Microsoft describes "the right way to do it" in
+ * their documentation. Important to note is that **there is no supported way
+ * to retrieve the information if the HANDLE was opened with
+ * FILE_FLAG_OVERLAPPED from the HANDLE itself**.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#ifdef WIN32
+
+#define WIN32_LEAN_AND_MEAN
+#include <winsock2.h>
+#include <windows.h>
+#include <process.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* These are not used even if they would exist which they should not */
+#undef HAVE_GETADDRINFO
+#undef HAVE_GETIPNODEBYNAME
+#undef HAVE_GETHOSTBYNAME2
+#undef HAVE_GETNAMEINFO
+#undef HAVE_GETIPNODEBYADDR
+
+#else /* Unix */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <errno.h>
+#include <signal.h>
+
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#else
+#include <time.h>
+#endif
+#include <sys/times.h>
+
+#ifndef RETSIGTYPE
+#define RETSIGTYPE void
+#endif
+
+/* To simplify #ifdef code further down - select only one to be defined...
+** Use them in pairs - if one is broken do not trust its mate.
+**/
+#if defined(HAVE_GETADDRINFO) && defined(HAVE_GETNAMEINFO)
+#undef HAVE_GETIPNODEBYNAME
+#undef HAVE_GETIPNODEBYADDR
+#undef HAVE_GETHOSTBYNAME2
+#elif defined(HAVE_GETIPNODEBYNAME) && defined(HAVE_GETIPNODEBYADDR)
+#undef HAVE_GETADDRINFO
+#undef HAVE_GETNAMEINFO
+#undef HAVE_GETHOSTBYNAME2
+#else
+#undef HAVE_GETIPNODEBYNAME
+#undef HAVE_GETIPNODEBYADDR
+#undef HAVE_GETADDRINFO
+#undef HAVE_GETNAMEINFO
+#endif
+
+#endif /* !WIN32 */
+
+#define PACKET_BYTES 4
+#ifdef WIN32
+#define READ_PACKET_BYTES(X,Y,Z) read_int32((X),(Y),(Z))
+#else
+#define READ_PACKET_BYTES(X,Y) read_int32((X),(Y))
+#endif
+#define PUT_PACKET_BYTES(X,Y) put_int32((X),(Y))
+/* The serial numbers of the requests */
+typedef int SerialType;
+
+#define INVALID_SERIAL -1
+
+/* The operations performed by this program */
+typedef unsigned char OpType;
+
+#define OP_GETHOSTBYNAME 1
+#define OP_GETHOSTBYADDR 2
+#define OP_CANCEL_REQUEST 3
+#define OP_CONTROL 4
+
+/* The protocol (IPV4/IPV6) */
+typedef unsigned char ProtoType;
+
+#define PROTO_IPV4 1
+#define PROTO_IPV6 2
+
+/* OP_CONTROL */
+typedef unsigned char CtlType;
+#define SETOPT_DEBUG_LEVEL 0
+
+/* The unit of an IP address (0 == error, 4 == IPV4, 16 == IPV6) */
+typedef unsigned char UnitType;
+
+#define UNIT_ERROR 0
+#define UNIT_IPV4 4
+#define UNIT_IPV6 16
+
+/* And the byte type */
+typedef unsigned char AddrByte; /* Must be compatible with character
+ datatype */
+
+/*
+ * Marshalled format of request:
+ *{
+ * Serial: 32 bit big endian
+ * Op:8 bit [1,2,3]
+ * If op == 1 {
+ * Proto:8 bit [1,2]
+ * Str: Null terminated array of characters
+ * } Else if op == 2 {
+ * Proto:8 bit [1,2]
+ * If proto == 1 {
+ * B0..B3: 4 bytes, most significant first
+ * } Else (proto == 2) {
+ * B0..B15: 16 bytes, most significant first
+ * }
+ * }
+ * (No more if op == 3)
+ *}
+ * The request arrives as a packet, with 4 packet size bytes.
+ */
+
+/* The main process unpackes the marshalled message and sends the data
+ * to a suitable port process or, in the case of a close request, kills the
+ * suitable port process. There is also a que of requests linked together,
+ * for when all subrocesses are busy.
+ */
+
+typedef struct QueItem {
+ struct QueItem *next;
+ int req_size;
+ AddrByte request[1];
+} QueItem; /* Variable size due to request's variable size */
+
+QueItem *que_first;
+QueItem *que_last;
+
+#ifdef WIN32
+typedef struct mesq {
+ HANDLE data_present;
+ CRITICAL_SECTION crit;
+ int shutdown;
+ QueItem *first;
+ QueItem *last;
+} MesQ;
+
+MesQ *to_erlang;
+MesQ *from_erlang;
+#endif
+
+/*
+ * Marshalled format of reply:
+ *{
+ * Serial: 32 bit big endian
+ * Unit: 8 bit, same as h_length or 0 for error
+ * if unit == 0 {
+ * Str: Null terminated character string explaining the error
+ * } else {
+ * Naddr: 32 bit big endian
+ * if unit = 4 {
+ * (B0..B3)0..(B0..B3)Naddr-1: Naddr*4 bytes most significant first
+ * } else if unit == 16 {
+ * (B0..B15)0..(B0..B15)Naddr-1: Naddr*16 bytes most significant first
+ * }
+ * Nnames: 32 bit big endian >= 1
+ * Name0: Null terminated string of characters
+ * Alias[0]..Alias[Nnames - 2]: Nnames - 1 Null terminated strings of chars
+ * }
+ *}
+ * Four packet size bytes prepended (big endian)
+ */
+/* Internal error codes */
+#define ERRCODE_NOTSUP 1
+#define ERRCODE_HOST_NOT_FOUND 2
+#define ERRCODE_TRY_AGAIN 3
+#define ERRCODE_NO_RECOVERY 4
+#define ERRCODE_NO_DATA 5
+#define ERRCODE_NETDB_INTERNAL 7
+
+/*
+ * Each worker process is represented in the parent by the following struct
+ */
+
+typedef unsigned WorkerState;
+
+#define WORKER_EMPTY 0 /* No process created */
+#define WORKER_FREE 1 /* Living waiting process */
+#define WORKER_BUSY 2 /* Living busy process */
+#define WORKER_STALLED 3 /* Living cancelled process */
+
+/* The timeout when killing a child process in seconds*/
+#define CHILDWAIT_TMO 1
+/* The domainname size_limit */
+#define DOMAINNAME_MAX 258 /* 255 + Opcode + Protocol + Null termination */
+
+typedef struct {
+ WorkerState state;
+#ifdef WIN32
+ DWORD pid; /* 0 if unused */
+ MesQ *writeto; /* Message queues */
+ MesQ *readfrom;
+#else
+ pid_t pid; /* -1 if unused */
+ int writeto, readfrom; /* Pipes */
+#endif
+ SerialType serial;
+ AddrByte domain[DOMAINNAME_MAX];
+ QueItem *que_first;
+ QueItem *que_last;
+ int que_size;
+} Worker;
+
+int num_busy_workers;
+int num_free_workers;
+int num_stalled_workers;
+int max_workers;
+int greedy_threshold;
+Worker *busy_workers; /* Workers doing any job that someone really is
+ interested in */
+Worker *free_workers; /* Really free workers */
+Worker *stalled_workers; /* May still deliver answers which we will
+ discard */
+#define BEE_GREEDY() (num_busy_workers >= greedy_threshold)
+
+static char *program_name;
+
+static int debug_level;
+#ifdef WIN32
+static HANDLE debug_console_allocated = INVALID_HANDLE_VALUE;
+#endif
+
+#ifdef NODEBUG
+#define DEBUGF(L,P) /* Nothing */
+#else
+#define DEBUGF(Level,Printf) do { if (debug_level >= (Level)) \
+ debugf Printf;} while(0)
+#endif
+#define ALLOC(Size) my_malloc(Size)
+#define REALLOC(Old, Size) my_realloc((Old), (Size))
+#define FREE(Ptr) free(Ptr)
+
+#ifdef WIN32
+#define WAKEUP_WINSOCK() do { \
+ char dummy_buff[100]; \
+ gethostname(dummy_buff,99); \
+} while (0)
+#endif
+
+/* The internal prototypes */
+static char *format_address(int siz, AddrByte *addr);
+static void debugf(char *format, ...);
+static void warning(char *format, ...);
+static void fatal(char *format, ...);
+static void *my_malloc(size_t size);
+static void *my_realloc(void *old, size_t size);
+static int get_int32(AddrByte *buff);
+static void put_int32(AddrByte *buff, int value);
+static int create_worker(Worker *pworker, int save_que);
+static int map_netdb_error(int netdb_code);
+#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
+static int map_netdb_error_ai(int netdb_code);
+#endif
+static char *errcode_to_string(int errcode);
+static size_t build_error_reply(SerialType serial, int errnum,
+ AddrByte **preply,
+ size_t *preply_size);
+#ifdef HAVE_GETADDRINFO
+static size_t build_reply_ai(SerialType serial, int, struct addrinfo *,
+ AddrByte **preply, size_t *preply_size);
+#endif
+static size_t build_reply(SerialType serial, struct hostent *he,
+ AddrByte **preply, size_t *preply_size);
+static int read_request(AddrByte **buff, size_t *buff_size);
+static OpType get_op(AddrByte *buff);
+static AddrByte *get_op_addr(AddrByte *buff);
+static SerialType get_serial(AddrByte *buff);
+static ProtoType get_proto(AddrByte *buff);
+static CtlType get_ctl(AddrByte *buff);
+static AddrByte *get_data(AddrByte *buff);
+static int get_debug_level(AddrByte *buff);
+static int relay_reply(Worker *pw);
+static int ignore_reply(Worker *pw);
+static void init_workers(int max);
+static void kill_worker(Worker *pw);
+static Worker *pick_worker(void);
+static void kill_last_picked_worker(void);
+static void stall_worker(SerialType serial);
+static int handle_io_busy(int ndx);
+static int handle_io_free(int ndx);
+static int handle_io_stalled(int ndx);
+static void check_que(void);
+static void main_loop(void);
+static void usage(char *unknown);
+static void domaincopy(AddrByte *out,AddrByte *in);
+static int domaineq(AddrByte *d1, AddrByte *d2);
+static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff);
+static Worker *pick_worker_greedy(AddrByte *domainbuff);
+static void restart_worker(Worker *w);
+static void start_que_request(Worker *w) ;
+#ifdef WIN32
+static int read_int32(HANDLE fd, int *res, HANDLE ev);
+static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev);
+static int write_exact(HANDLE fd, AddrByte *buff, DWORD len,HANDLE ev);
+DWORD WINAPI worker_loop(void *v);
+DWORD WINAPI reader(void *data);
+DWORD WINAPI writer(void *data);
+static int send_mes_to_worker(QueItem *m, Worker *pw);
+BOOL create_mesq(MesQ **q);
+BOOL enque_mesq(MesQ *q, QueItem *m);
+BOOL deque_mesq(MesQ *q, QueItem **m);
+BOOL close_mesq(MesQ *q);
+HANDLE event_mesq(MesQ *q);
+#else
+static size_t read_int32(int fd, int *res);
+static ssize_t read_exact(int fd, void *vbuff, size_t nbytes);
+static int write_exact(int fd, AddrByte *buff, int len);
+void reap_children(int ignored);
+static void init_signals(void);
+static void kill_all_workers(void);
+static void close_all_worker_fds(void);
+static int worker_loop(void);
+static int fillin_reply(Worker *pw);
+static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw);
+#endif
+
+#define ERL_DBG_LVL_ENV_VAR "ERL_INET_GETHOST_DEBUG"
+
+static int
+get_env_debug_level(void)
+{
+#ifdef __WIN32__
+ char value[21]; /* Enough for any 64-bit values */
+ DWORD sz = GetEnvironmentVariable((LPCTSTR) ERL_DBG_LVL_ENV_VAR,
+ (LPTSTR) value,
+ (DWORD) sizeof(value));
+ if (sz == 0 || sz > sizeof(value))
+ return 0;
+#else
+ char *value = getenv(ERL_DBG_LVL_ENV_VAR);
+ if (!value)
+ return 0;
+#endif
+ return atoi(value);
+}
+
+#ifdef WIN32
+static void do_allocate_console(void)
+{
+ AllocConsole();
+ debug_console_allocated = CreateFile ("CONOUT$", GENERIC_WRITE,
+ FILE_SHARE_WRITE, NULL,
+ OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL, NULL);
+}
+#ifdef HARDDEBUG
+DWORD WINAPI pseudo_worker_loop(void *v);
+static void poll_gethost(int row);
+#endif
+#endif
+
+/*
+ * Main
+ */
+int main(int argc, char **argv)
+{
+ int num_workers = 1;
+ char **ap = argv + 1;
+ int x;
+ int disable_greedy = 0;
+
+ program_name = *argv;
+ que_first = que_last = NULL;
+ debug_level = get_env_debug_level();
+ greedy_threshold = 0;
+
+ while (*ap) {
+ if (!strcmp(*ap, "-d")) {
+ ++debug_level;
+ } else if(!strcmp(*ap, "-g") && *(ap + 1)) {
+ ++ap;
+ x = atoi(*ap);
+ if (!x) {
+ usage(*ap);
+ } else {
+ greedy_threshold = x;
+ }
+ } else if(!strcmp(*ap, "-ng")) {
+ disable_greedy = 1;
+ } else {
+ x = atoi(*ap);
+ if (!x) {
+ usage(*ap);
+ } else {
+ num_workers = x;
+ }
+ }
+ ++ap;
+ }
+
+#ifdef WIN32
+ if (num_workers > 60 || greedy_threshold > 60) {
+ usage("More than 60 workers on windows impossible!");
+ num_workers = 60;
+ greedy_threshold = 0;
+ }
+#endif
+
+ if(!greedy_threshold) {
+ greedy_threshold = (3*num_workers)/4; /* 75% */
+ if (!greedy_threshold) {
+ greedy_threshold = num_workers;
+ }
+ }
+
+ if (disable_greedy) {
+ greedy_threshold = num_workers + 1;
+ }
+
+#ifdef WIN32
+ {
+ WORD wr;
+ WSADATA wsa_data;
+ int wsa_error;
+ wr = MAKEWORD(2,0);
+
+ wsa_error = WSAStartup(wr,&wsa_data);
+ if (wsa_error) {
+ fatal("Could not open usable winsock library.");
+ }
+ if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 0) {
+ fatal("Could not open recent enough winsock library.");
+ }
+
+ if (debug_level >= 1) {
+ do_allocate_console();
+
+ DEBUGF(1,("num_workers = %d, greedy_threshold = %d, "
+ "debug_level = %d.",
+ num_workers, greedy_threshold, debug_level));
+ }
+ }
+ WAKEUP_WINSOCK(); /* Why on earth is this needed? */
+
+#endif
+
+ init_workers(num_workers);
+
+ main_loop();
+#ifndef WIN32
+ kill_all_workers();
+#endif
+ return 0;
+}
+
+static void usage(char *unknown)
+{
+ fprintf(stderr,"%s: Unknown option \"%s\"\n"
+ "Usage: %s [-d [-d ...]] [-g <greedy threshold>] "
+ "[<number of workers>]\n",
+ program_name, unknown, program_name);
+}
+
+/*
+ * Main process main loop
+ */
+
+static int handle_io_busy(int ndx)
+{
+ /* Probably an answer */
+ int res;
+ res = relay_reply(&busy_workers[ndx]);
+ if (res < 0) {
+ /* Bad worker */
+ if (busy_workers[ndx].que_size) {
+ restart_worker(&busy_workers[ndx]);
+ start_que_request(&busy_workers[ndx]);
+ return 0;
+ } else {
+ kill_worker(&busy_workers[ndx]);
+ --num_busy_workers;
+ busy_workers[ndx] = busy_workers[num_busy_workers];
+ }
+ return 1;
+ } else if (res == 0) {
+ /* Erlang has closed */
+ return -1;
+ } else {
+ if (busy_workers[ndx].que_size) {
+ start_que_request(&busy_workers[ndx]);
+ return 0;
+ }
+ /* The worker is no longer busy, it should be in the free list */
+ free_workers[num_free_workers] = busy_workers[ndx];
+ free_workers[num_free_workers].state = WORKER_FREE;
+ ++num_free_workers;
+ --num_busy_workers;
+ busy_workers[ndx] = busy_workers[num_busy_workers];
+ return 1;
+ }
+}
+
+static int handle_io_free(int ndx)
+{
+ /* IO from a free worker means "kill me" */
+ DEBUGF(1,("Free worker[%ld] spontaneously died.",
+ (long) free_workers[ndx].pid));
+ kill_worker(&free_workers[ndx]);
+ --num_free_workers;
+ free_workers[ndx] = free_workers[num_free_workers];
+ return 1;
+}
+
+static int handle_io_stalled(int ndx)
+{
+ int res;
+ res = ignore_reply(&stalled_workers[ndx]);
+ if (res <= 0) {
+ /* Bad worker */
+ kill_worker(&stalled_workers[ndx]);
+ --num_stalled_workers;
+ stalled_workers[ndx] = stalled_workers[num_stalled_workers];
+ return 1;
+ } else {
+ DEBUGF(3,("Ignoring reply from stalled worker[%ld].",
+ (long) stalled_workers[ndx].pid));
+ free_workers[num_free_workers] = stalled_workers[ndx];
+ free_workers[num_free_workers].state = WORKER_FREE;
+ ++num_free_workers;
+ --num_stalled_workers;
+ stalled_workers[ndx] = stalled_workers[num_stalled_workers];
+ return 1;
+ }
+}
+
+static void check_que(void)
+{
+ /* Check if anything in the que can be handled */
+ Worker *cw;
+
+ while (que_first) {
+ QueItem *qi,*nxt;
+ qi = que_first;
+ nxt = qi->next; /* Need to save before it's getting put in another que
+ in threaded solution */
+ if ((cw = pick_worker()) == NULL) {
+ break;
+ }
+#ifdef WIN32
+ {
+ SerialType save_serial = get_serial(que_first->request);
+ if (send_mes_to_worker(que_first, cw) != 0) {
+ kill_last_picked_worker();
+ continue;
+ }
+ cw->serial = save_serial;
+ }
+#else
+ if (send_request_to_worker(que_first->request,
+ que_first->req_size, cw) != 0) {
+ /* Couldn't send request, kill the worker and retry */
+ kill_last_picked_worker();
+ continue;
+ }
+ cw->serial = get_serial(que_first->request);
+#endif
+ /* Went well, lets deque */
+ que_first = nxt;
+ if (que_first == NULL) {
+ que_last = NULL;
+ }
+ DEBUGF(3,("Did deque serial %d, Que is %sempty",
+ get_serial(qi->request), (que_first) ? "not " : ""));
+#ifndef WIN32
+ FREE(qi);
+#endif
+ }
+}
+
+static int clean_que_of(SerialType s)
+{
+ QueItem **qi;
+ int i;
+
+ for(qi=&que_first;*qi != NULL &&
+ s != get_serial((*qi)->request); qi = &((*qi)->next))
+ ;
+ if(*qi != NULL) {
+ QueItem *r = *qi;
+ *qi = (*qi)->next;
+ FREE(r);
+ if(que_last == r) {
+ /* Lost the "last" pointer, should be very uncommon
+ if the que is not empty, so we simply do a traversal
+ to reclaim it. */
+ if (que_first == NULL) {
+ que_last = NULL;
+ } else {
+ for (que_last=que_first;que_last->next != NULL;
+ que_last = que_last->next)
+ ;
+ }
+ }
+ DEBUGF(3,("Removing serial %d from global que on request, "
+ "que %sempty",s, (que_first) ? "not " : ""));
+ return 1;
+ }
+ for (i = 0; i < num_busy_workers; ++i) {
+ for(qi=&(busy_workers[i].que_first);*qi != NULL &&
+ s != get_serial((*qi)->request); qi = &((*qi)->next))
+ ;
+ if(*qi != NULL) {
+ QueItem *r = *qi;
+ *qi = (*qi)->next;
+ FREE(r);
+ if(busy_workers[i].que_last == r) {
+ /* Lost the "last" pointer, should be very uncommon
+ if the que is not empty, so we simply do a traversal
+ to reclaim it. */
+ if (busy_workers[i].que_first == NULL) {
+ busy_workers[i].que_last = NULL;
+ if (busy_workers[i].que_size != 1) {
+ fatal("Worker que size counter incorrect, internal datastructure error.");
+ }
+ } else {
+ for (busy_workers[i].que_last = busy_workers[i].que_first;
+ busy_workers[i].que_last->next != NULL;
+ busy_workers[i].que_last = busy_workers[i].que_last->next)
+ ;
+ }
+ }
+ --(busy_workers[i].que_size);
+ DEBUGF(3,("Removing serial %d from worker[%ld] specific que "
+ "on request, que %sempty",
+ s, (long) busy_workers[i].pid,
+ (busy_workers[i].que_first) ? "not " : ""));
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static void main_loop(void)
+{
+ AddrByte *inbuff = NULL;
+ int insize;
+ int i,w;
+#ifdef WIN32
+ HANDLE handles[64];
+ DWORD num_handles;
+ DWORD index;
+ QueItem *qi;
+#else
+ size_t inbuff_size = 0;
+ fd_set fds;
+ int max_fd;
+#endif
+ int new_data;
+ int save_serial;
+ /* It's important that the free workers list is handled first */
+ Worker *workers[3] = {free_workers, busy_workers, stalled_workers};
+ int *wsizes[3] = {&num_free_workers, &num_busy_workers,
+ &num_stalled_workers};
+ int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy,
+ &handle_io_stalled};
+ Worker *cw;
+ AddrByte domainbuff[DOMAINNAME_MAX];
+
+#ifdef WIN32
+
+ {
+ DWORD dummy;
+ /* Create the reader and writer */
+ if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) {
+ fatal("Could not create message que! errno = %d.",GetLastError());
+ }
+ if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy))
+ == NULL) {
+ fatal("Could not create writer thread! errno = %d.",GetLastError());
+ }
+ if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy))
+ == NULL) {
+ fatal("Could not create reader thread! errno = %d.",GetLastError());
+ }
+ DEBUGF(4,("Created reader and writer threads."));
+#ifdef HARDDEBUG
+ poll_gethost(__LINE__);
+#endif
+ }
+#endif
+
+ for(;;) {
+#ifdef WIN32
+ num_handles = 0;
+ handles[num_handles++] = event_mesq(from_erlang);
+ for (w = 0; w < 3; ++w) {
+ for (i = 0; i < *wsizes[w]; ++i) {
+ handles[num_handles++] = event_mesq(workers[w][i].readfrom);
+ }
+ }
+
+ if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE))
+ == WAIT_FAILED) {
+ fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError());
+ }
+ w = 0;
+ index -= WAIT_OBJECT_0;
+
+ DEBUGF(4,("Got data on index %d.",index));
+ if (index > 0) {
+ if (((int)index - 1) < *wsizes[0]) {
+ (*handlers[0])(index - 1);
+ } else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) {
+ (*handlers[1])(index - 1 - (*wsizes[0]));
+ } else {
+ (*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1]));
+ }
+ }
+ new_data = (index == 0);
+#else
+ max_fd = 0;
+ FD_ZERO(&fds);
+ FD_SET(0,&fds);
+ for (w = 0; w < 3; ++w) {
+ for (i = 0; i < *wsizes[w]; ++i) {
+ FD_SET(workers[w][i].readfrom,&fds);
+ if (workers[w][i].readfrom > max_fd) {
+ max_fd = workers[w][i].readfrom;
+ }
+ }
+ }
+ for (;;) {
+ if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ fatal("Select failed (invalid internal structures?), "
+ "errno = %d.",errno);
+ }
+ }
+ break;
+ }
+ for (w = 0; w < 3; ++w) {
+ for (i = 0; i < *wsizes[w]; ++i) {
+ if (FD_ISSET(workers[w][i].readfrom, &fds)) {
+ int hres = (*handlers[w])(i);
+ if (hres < 0) {
+ return;
+ } else {
+ i -= hres; /* We'll retry this position, if hres == 1.
+ The position is usually
+ replaced with another worker,
+ a worker with
+ I/O usually changes state as we
+ use blocking file I/O */
+ }
+ }
+ }
+ }
+ new_data = FD_ISSET(0,&fds);
+
+#endif
+
+ check_que();
+
+ /* Now check for new requests... */
+ if (new_data) { /* Erlang... */
+ OpType op;
+#ifdef WIN32
+ if (!deque_mesq(from_erlang,&qi)) {
+ DEBUGF(1,("Erlang has closed."));
+ return;
+ }
+ insize = qi->req_size;
+ inbuff = qi->request;
+ DEBUGF(4,("Got data from erlang."));
+ DEBUGF(4,("OPeration == %d.",get_op(inbuff)));
+#else
+ insize = read_request(&inbuff, &inbuff_size);
+ if (insize == 0) { /* Other errors taken care of in
+ read_request */
+ DEBUGF(1,("Erlang has closed."));
+ return;
+ }
+#endif
+ op = get_op(inbuff);
+ if (op == OP_CANCEL_REQUEST) {
+ SerialType serial = get_serial(inbuff);
+ if (!clean_que_of(serial)) {
+ for (i = 0; i < num_busy_workers; ++i) {
+ if (busy_workers[i].serial == serial) {
+ if (busy_workers[i].que_size) {
+ restart_worker(&busy_workers[i]);
+ start_que_request(&busy_workers[i]);
+ } else {
+ stall_worker(i);
+ check_que();
+ }
+ break;
+ }
+ }
+ }
+#ifdef WIN32
+ FREE(qi);
+#endif
+ continue; /* New select */
+ } else if (op == OP_CONTROL) {
+ CtlType ctl;
+ SerialType serial = get_serial(inbuff);
+ if (serial != INVALID_SERIAL) {
+ fatal("Invalid serial: %d.", serial);
+ }
+ switch (ctl = get_ctl(inbuff)) {
+ case SETOPT_DEBUG_LEVEL:
+ {
+ int tmp_debug_level = get_debug_level(inbuff);
+#ifdef WIN32
+ if (debug_console_allocated == INVALID_HANDLE_VALUE &&
+ tmp_debug_level > 0) {
+ DWORD res;
+ do_allocate_console();
+ WriteFile(debug_console_allocated,
+ "Hej\n",4,&res,NULL);
+ }
+#endif
+ debug_level = tmp_debug_level;
+ DEBUGF(debug_level, ("debug_level = %d", debug_level));
+ for (w = 0; w < 3; ++w) {
+ for (i = 0; i < *wsizes[w]; i++) {
+ int res;
+#ifdef WIN32
+ QueItem *m;
+#endif
+ cw = &(workers[w][i]);
+#ifdef WIN32
+ m = ALLOC(sizeof(QueItem) - 1 + qi->req_size);
+ memcpy(m->request, qi->request,
+ (m->req_size = qi->req_size));
+ m->next = NULL;
+ if ((res = send_mes_to_worker(m, cw)) != 0) {
+ FREE(m);
+ }
+#else
+ res = send_request_to_worker(inbuff, insize, cw);
+#endif
+ if (res != 0) {
+ kill_worker(cw);
+ (*wsizes[w])--;
+ *cw = workers[w][*wsizes[w]];
+ }
+ }
+ }
+ }
+ break;
+ default:
+ warning("Unknown control requested from erlang (%d), "
+ "message discarded.", (int) ctl);
+ break;
+ }
+#ifdef WIN32
+ FREE(qi);
+#endif
+ continue; /* New select */
+ } else {
+ ProtoType proto;
+ if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) {
+ warning("Unknown operation requested from erlang (%d), "
+ "message discarded.", op);
+#ifdef WIN32
+ FREE(qi);
+#endif
+ continue;
+ }
+ if ((proto = get_proto(inbuff)) != PROTO_IPV4 &&
+ proto != PROTO_IPV6) {
+ warning("Unknown protocol requested from erlang (%d), "
+ "message discarded.", proto);
+#ifdef WIN32
+ FREE(qi);
+#endif
+ continue;
+ }
+ if (get_domainname(inbuff,insize,domainbuff) < 0) {
+ warning("Malformed message sent from erlang, no domain, "
+ "message discarded.", op);
+#ifdef WIN32
+ FREE(qi);
+#endif
+ continue;
+ }
+ }
+
+ if (BEE_GREEDY()) {
+ DEBUGF(4,("Beeing greedy!"));
+ if ((cw = pick_worker_greedy(domainbuff)) != NULL) {
+ /* Put it in the worker specific que if the
+ domainname matches... */
+#ifndef WIN32
+ QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
+ insize);
+ qi->req_size = insize;
+ memcpy(&(qi->request), inbuff, insize);
+ qi->next = NULL;
+#endif
+ if (!cw->que_first) {
+ cw->que_first = cw->que_last = qi;
+ } else {
+ cw->que_last->next = qi;
+ cw->que_last = qi;
+ }
+ ++(cw->que_size);
+ continue;
+ }
+ /* Otherwise busyness as usual */
+ }
+
+ save_serial = get_serial(inbuff);
+
+ while ((cw = pick_worker()) != NULL) {
+ int res;
+#ifdef WIN32
+ res = send_mes_to_worker(qi,cw);
+#else
+ res = send_request_to_worker(inbuff, insize, cw);
+#endif
+ if (res == 0) {
+ break;
+ } else {
+ kill_last_picked_worker();
+ }
+ }
+
+ if (cw == NULL) {
+ /* Insert into que */
+#ifndef WIN32
+ QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
+ insize);
+ qi->req_size = insize;
+ memcpy(&(qi->request), inbuff, insize);
+ qi->next = NULL;
+#endif
+ if (!que_first) {
+ que_first = que_last = qi;
+ } else {
+ que_last->next = qi;
+ que_last = qi;
+ }
+ } else {
+ cw->serial = save_serial;
+ domaincopy(cw->domain, domainbuff);
+ }
+ }
+ }
+}
+
+/*
+ * Main process worker administration
+ */
+
+static void init_workers(int max)
+{
+ max_workers = max;
+ num_busy_workers = 0;
+ num_free_workers = 0;
+ num_stalled_workers = 0;
+
+ busy_workers = ALLOC(sizeof(Worker) * max_workers);
+ free_workers = ALLOC(sizeof(Worker) * max_workers);
+ stalled_workers = ALLOC(sizeof(Worker) * max_workers);
+#ifndef WIN32
+ init_signals();
+#endif
+}
+
+#ifdef WIN32
+static void kill_worker(Worker *pw)
+{
+ /* Cannot really kill a thread in win32, have to just leave it to die */
+ close_mesq(pw->writeto);
+ close_mesq(pw->readfrom);
+ pw->state = WORKER_EMPTY;
+}
+#else
+static void kill_worker(Worker *pw)
+{
+ fd_set fds;
+ struct timeval tmo;
+ int selret;
+ static char buff[1024];
+
+ DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d",
+ (long) pw->pid,
+ (int) pw->readfrom,
+ (int) pw->serial));
+ kill(pw->pid, SIGUSR1);
+ /* This is all just to check that the child died, not
+ really necessary */
+ for(;;) {
+ FD_ZERO(&fds);
+ FD_SET(pw->readfrom, &fds);
+ tmo.tv_usec=0;
+ tmo.tv_sec = CHILDWAIT_TMO;
+ selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo);
+ if (selret < 0) {
+ if (errno != EINTR) {
+ warning("Unable to select on dying child file descriptor, "
+ "errno = %d.",errno);
+ break;
+ }
+ } else if (selret == 0) {
+ warning("Timeout waiting for child process to die, "
+ "ignoring child (pid = %d).", pw->pid);
+ break;
+ } else {
+ int ret;
+ if ((ret = read(pw->readfrom, buff, 1024)) < 0) {
+ if (errno != EINTR) {
+ warning("Child file descriptor not closed properly, "
+ "errno = %d", errno);
+ break;
+ }
+ } else if (ret == 0) {
+ break;
+ }
+ /* continue */
+ }
+ }
+ /* Waiting is done by signal handler... */
+ close(pw->readfrom);
+ close(pw->writeto);
+ pw->state = WORKER_EMPTY;
+ /* Leave rest as is... */
+}
+
+static void kill_all_workers(void)
+/* Emergency function, will not check that the children died... */
+{
+ int i;
+ for (i = 0; i < num_busy_workers; ++i) {
+ kill(busy_workers[i].pid, SIGUSR1);
+ }
+ for (i = 0; i < num_free_workers; ++i) {
+ kill(free_workers[i].pid, SIGUSR1);
+ }
+ for (i = 0; i < num_stalled_workers; ++i) {
+ kill(stalled_workers[i].pid, SIGUSR1);
+ }
+}
+#endif /* !WIN32 */
+
+static Worker *pick_worker(void)
+{
+ Worker tmp;
+ if (num_free_workers > 0) {
+ --num_free_workers;
+ tmp = free_workers[num_free_workers];
+ } else if (num_stalled_workers > 0) {
+ /* "restart" the worker... */
+ --num_stalled_workers;
+ kill_worker(&(stalled_workers[num_stalled_workers]));
+ if (create_worker(&tmp,0) < 0) {
+ warning("Unable to create worker process, insufficient "
+ "resources");
+ return NULL;
+ }
+ } else {
+ if (num_busy_workers == max_workers) {
+ return NULL;
+ }
+ if (create_worker(&tmp,0) < 0) {
+ warning("Unable to create worker process, insufficient "
+ "resources");
+ return NULL;
+ }
+ }
+ /* tmp contains a worker now, make it busy and put it in the right
+ array */
+ tmp.state = WORKER_BUSY;
+ busy_workers[num_busy_workers] = tmp;
+ ++num_busy_workers;
+ return &(busy_workers[num_busy_workers-1]);
+}
+
+static Worker *pick_worker_greedy(AddrByte *domainbuff)
+{
+ int i;
+ int ql = 0;
+ int found = -1;
+ for (i=0; i < num_busy_workers; ++i) {
+ if (domaineq(busy_workers[i].domain, domainbuff)) {
+ if ((found < 0) || (busy_workers[i].que_size <
+ busy_workers[found].que_size)) {
+ found = i;
+ ql = busy_workers[i].que_size;
+ }
+ }
+ }
+ if (found >= 0) {
+ return &busy_workers[found];
+ }
+ return NULL;
+}
+
+static void restart_worker(Worker *w)
+{
+ kill_worker(w);
+ if (create_worker(w,1) < 0) {
+ fatal("Unable to create worker process, insufficient resources");
+ }
+}
+
+static void kill_last_picked_worker(void)
+{
+ kill_worker( &(busy_workers[num_busy_workers-1]));
+ --num_busy_workers;
+}
+
+/*
+ * Starts a request qued to a specific worker, check_que starts normally queued requests.
+ * We expect a que here...
+ */
+static void start_que_request(Worker *w)
+{
+ QueItem *qi;
+ SerialType save_serial;
+ if (!w->que_first || !w->que_size) {
+ fatal("Expected que'd requests but found none, "
+ "internal datastructure corrupted!");
+ }
+ qi = w->que_first;
+ w->que_first = w->que_first->next;
+ if (!w->que_first) {
+ w->que_last = NULL;
+ }
+ --(w->que_size);
+ save_serial = get_serial(qi->request);
+#ifdef WIN32
+ while (send_mes_to_worker(qi, w) != 0) {
+ restart_worker(w);
+ }
+#else
+ while (send_request_to_worker(qi->request,
+ qi->req_size, w) != 0) {
+ restart_worker(w);
+ }
+#endif
+ w->serial = save_serial;
+ DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, "
+ "Que is %sempty",
+ get_serial(qi->request), (long) w->pid,
+ (w->que_first) ? "not " : ""));
+#ifndef WIN32
+ FREE(qi);
+#endif
+}
+
+#ifndef WIN32
+/* Signal utilities */
+static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int)
+{
+ struct sigaction act, oact;
+
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = 0;
+ act.sa_handler = func;
+ sigaction(sig, &act, &oact);
+ return(oact.sa_handler);
+}
+
+
+static void sys_sigblock(int sig)
+{
+ sigset_t mask;
+
+ sigemptyset(&mask);
+ sigaddset(&mask, sig);
+ sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL);
+}
+
+static void sys_sigrelease(int sig)
+{
+ sigset_t mask;
+
+ sigemptyset(&mask);
+ sigaddset(&mask, sig);
+ sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL);
+}
+
+/* Child signal handler */
+void reap_children(int ignored)
+{
+ int res;
+ sys_sigblock(SIGCHLD);
+ for (;;) {
+ while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0)
+ ;
+ if (!(res < 0 && errno == EAGAIN)) {
+ DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno));
+ break;
+ }
+ }
+ sys_sigrelease(SIGCHLD);
+}
+
+static void init_signals(void)
+{
+ sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result
+ on most (?) platforms. */
+ sys_sigset(SIGPIPE, SIG_IGN);
+}
+#endif
+
+static void stall_worker(int ndx)
+{
+ --num_busy_workers;
+ stalled_workers[num_stalled_workers] = busy_workers[ndx];
+ stalled_workers[num_stalled_workers].state = WORKER_STALLED;
+ busy_workers[ndx] = busy_workers[num_busy_workers];
+ DEBUGF(3, ("Stalled worker[%ld]",
+ (long) stalled_workers[num_stalled_workers].pid));
+ ++num_stalled_workers;
+}
+
+
+/*
+ * Main loop message passing
+ */
+#ifndef WIN32
+static int read_request(AddrByte **buff, size_t *buff_size)
+{
+ int siz;
+ int r;
+
+ if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) {
+ if (r == 0) {
+ return 0;
+ } else {
+ fatal("Unexpected end of file on main input, errno = %d",errno);
+ }
+ }
+
+ if (siz > *buff_size) {
+ if (buff_size == 0) {
+ *buff = ALLOC((*buff_size = siz));
+ } else {
+ *buff = REALLOC(*buff, (*buff_size = siz));
+ }
+ }
+ if (read_exact(0,*buff, siz) != siz) {
+ fatal("Unexpected end of file on main input, errno = %d",errno);
+ }
+ if (siz < 5) {
+ fatal("Unexpected message on main input, message size %d less "
+ "than minimum.");
+ }
+ return siz;
+}
+
+#endif /* !WIN32 */
+
+static OpType get_op(AddrByte *buff)
+{
+ return (OpType) buff[4];
+}
+
+static AddrByte *get_op_addr(AddrByte *buff)
+{
+ return buff + 4;
+}
+
+static SerialType get_serial(AddrByte *buff)
+{
+ return get_int32(buff);
+}
+
+static ProtoType get_proto(AddrByte *buff)
+{
+ return (ProtoType) buff[5];
+}
+
+static CtlType get_ctl(AddrByte *buff)
+{
+ return (CtlType) buff[5];
+}
+
+static AddrByte *get_data(AddrByte *buff)
+{
+ return buff + 6;
+}
+
+static int get_debug_level(AddrByte *buff)
+{
+ return get_int32(buff + 6);
+}
+
+#ifdef WIN32
+static int send_mes_to_worker(QueItem *m, Worker *pw)
+{
+ if (!enque_mesq(pw->writeto, m)) {
+ warning("Unable to send to child process.");
+ return -1;
+ }
+ return 0;
+}
+#else
+static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw)
+{
+ AddrByte hdr[PACKET_BYTES];
+
+ PUT_PACKET_BYTES(hdr, rsize);
+ if (write_exact(pw->writeto, hdr, PACKET_BYTES) < 0) {
+ warning("Unable to write to child process.");
+ return -1;
+ }
+ if (write_exact(pw->writeto, (AddrByte *) pr, rsize) < 0) {
+ warning("Unable to write to child process.");
+ return -1;
+ }
+ return 0;
+}
+#endif /* !WIN32 */
+
+#ifdef WIN32
+static int relay_reply(Worker *pw)
+{
+ QueItem *m;
+ if (!deque_mesq(pw->readfrom,&m)) {
+ return 0;
+ }
+ if (!enque_mesq(to_erlang,m)) {
+ FREE(m);
+ return 0;
+ }
+ return 1;
+}
+
+static int ignore_reply(Worker *pw) {
+ QueItem *m;
+ if (!deque_mesq(pw->readfrom,&m)) {
+ return 0;
+ }
+ FREE(m);
+ return 1;
+}
+
+#else
+
+/* Static buffers used by the next three functions */
+static AddrByte *relay_buff = NULL;
+static int relay_buff_size = 0;
+
+static int fillin_reply(Worker *pw)
+{
+ int length;
+
+ if (READ_PACKET_BYTES(pw->readfrom, &length) != PACKET_BYTES) {
+ warning("Malformed reply (header) from worker process %d.",
+ pw->pid);
+ return -1;
+ }
+
+ if (relay_buff_size < (length + PACKET_BYTES)) {
+ if (!relay_buff_size) {
+ relay_buff =
+ ALLOC((relay_buff_size = (length + PACKET_BYTES)));
+ } else {
+ relay_buff =
+ REALLOC(relay_buff,
+ (relay_buff_size = (length + PACKET_BYTES)));
+ }
+ }
+ PUT_PACKET_BYTES(relay_buff, length);
+ if (read_exact(pw->readfrom, relay_buff + PACKET_BYTES, length) !=
+ length) {
+ warning("Malformed reply (data) from worker process %d.", pw->pid);
+ return -1;
+ }
+ return length;
+}
+
+static int relay_reply(Worker *pw)
+{
+ int length = fillin_reply(pw); /* Filled into the "global" buffer */
+ int res;
+
+ if (length < 0) {
+ return -1;
+ }
+ if ((res = write_exact(1, relay_buff, length + PACKET_BYTES)) < 0) {
+ fatal("Cannot write reply to erlang process, errno = %d.", errno);
+ } else if (res == 0) {
+ DEBUGF(1,("Erlang has closed write pipe."));
+ return 0;
+ }
+ return length;
+}
+
+static int ignore_reply(Worker *pw)
+{
+ return fillin_reply(pw);
+}
+
+#endif /* !WIN32 */
+
+/*
+ * Domain name "parsing" and worker specific queing
+ */
+static void domaincopy(AddrByte *out, AddrByte *in)
+{
+ AddrByte *ptr = out;
+ *ptr++ = *in++;
+ *ptr++ = *in++;
+ switch(*out) {
+ case OP_GETHOSTBYNAME:
+ while(*in != '\0' && *in != '.')
+ ++in;
+ strncpy((char*)ptr, (char*)in, DOMAINNAME_MAX-2);
+ ptr[DOMAINNAME_MAX-3] = '\0';
+ DEBUGF(4,("Saved domainname %s.", ptr));
+ return;
+ case OP_GETHOSTBYADDR:
+ memcpy(ptr,in, ((out[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1);
+ DEBUGF(4, ("Saved domain address: %s.",
+ format_address(((out[1] == PROTO_IPV4) ?
+ UNIT_IPV4 : UNIT_IPV6) - 1,ptr)));
+ return;
+ default:
+ fatal("Trying to copy buffer not containing valid domain, [%d,%d].",
+ (int) out[0], (int) out[1]);
+ }
+}
+
+static int domaineq(AddrByte *d1, AddrByte *d2)
+{
+ if (d1[0] != d2[0] || d1[1] != d2[1]) {
+ return 0;
+ }
+ switch (d1[0]) {
+ case OP_GETHOSTBYNAME:
+ return !strcmp((char*)d1+2,(char*)d2+2);
+ case OP_GETHOSTBYADDR:
+ return !memcmp(d1+2,d2+2, ((d1[1] == PROTO_IPV4)
+ ? UNIT_IPV4 : UNIT_IPV6) - 1);
+ default:
+ fatal("Trying to compare buffers not containing valid domain, "
+ "[%d,%d].",
+ (int) d1[0], (int) d1[1]);
+ return -1; /* Lint... */
+ }
+}
+
+static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff)
+{
+ OpType op = get_op(inbuff);
+ ProtoType proto;
+ int i;
+ AddrByte *data;
+
+ data = get_data(inbuff);
+ switch (op) {
+ case OP_GETHOSTBYNAME:
+ data = get_data(inbuff);
+ for (i = (data - inbuff); i < insize && inbuff[i] != '\0'; ++i)
+ ;
+ if (i < insize) {
+ domaincopy(domainbuff, get_op_addr(inbuff));
+ return 0;
+ }
+ DEBUGF(3, ("Could not pick valid domainname in "
+ "gethostbyname operation"));
+ return -1;
+ case OP_GETHOSTBYADDR:
+ proto = get_proto(inbuff);
+ i = insize - (data - inbuff);
+ if ((proto == PROTO_IPV4 && i == UNIT_IPV4) ||
+ (proto == PROTO_IPV6 && i == UNIT_IPV6)) {
+ /* An address buffer */
+ domaincopy(domainbuff, get_op_addr(inbuff));
+ return 0;
+ }
+ DEBUGF(3, ("Could not pick valid domainname in gethostbyaddr "
+ "operation"));
+ return -1;
+ default:
+ DEBUGF(2, ("Could not pick valid domainname because of "
+ "invalid opcode %d.", (int) op));
+ return -1;
+ }
+}
+
+/*
+ * Worker subprocesses with utilities
+ */
+#ifdef WIN32
+static int create_worker(Worker *pworker, int save_que)
+{
+ MesQ **thread_data = ALLOC(2*sizeof(MesQ *));
+ DWORD tid;
+
+
+ if (!create_mesq(thread_data)) {
+ fatal("Could not create, pipes for subprocess, errno = %d",
+ GetLastError());
+ }
+ if (!create_mesq(thread_data + 1)) {
+ fatal("Could not create, pipes for subprocess, errno = %d",
+ GetLastError());
+ }
+ /* Save those before the thread starts */
+ pworker->writeto = thread_data[0];
+ pworker->readfrom = thread_data[1];
+
+ if (((HANDLE) _beginthreadex(NULL, 0, worker_loop, thread_data, 0, &tid))
+ == NULL) {
+ fatal("Could not create thread errno = %d",
+ GetLastError());
+ }
+ pworker->pid = tid;
+ pworker->state = WORKER_FREE;
+ pworker->serial = INVALID_SERIAL;
+ if (!save_que) {
+ pworker->que_first = pworker->que_last = NULL;
+ pworker->que_size = 0;
+ }
+ DEBUGF(3,("Created worker[%ld] with fd %d",
+ (long) pworker->pid, (int) pworker->readfrom));
+ return 0;
+}
+
+#else
+
+static int create_worker(Worker *pworker, int save_que)
+{
+ int p0[2], p1[2];
+ pid_t child;
+
+ if (pipe(p0)) {
+ warning("Could not create, pipes for subprocess, errno = %d",
+ errno);
+ return -1;
+ }
+
+ if (pipe(p1)) {
+ warning("Could not create, pipes for subprocess, errno = %d",
+ errno);
+ close(p0[0]);
+ close(p0[1]);
+ return -1;
+ }
+ if ((child = fork()) < 0) { /* failure */
+ warning("Could not fork(), errno = %d",
+ errno);
+ close(p0[0]);
+ close(p0[1]);
+ close(p1[0]);
+ close(p1[1]);
+ return -1;
+ } else if (child > 0) { /* parent */
+ close(p0[1]);
+ close(p1[0]);
+ pworker->writeto = p1[1];
+ pworker->readfrom = p0[0];
+ pworker->pid = child;
+ pworker->state = WORKER_FREE;
+ pworker->serial = INVALID_SERIAL;
+ if (!save_que) {
+ pworker->que_first = pworker->que_last = NULL;
+ pworker->que_size = 0;
+ }
+ DEBUGF(3,("Created worker[%ld] with fd %d",
+ (long) pworker->pid, (int) pworker->readfrom));
+ return 0;
+ } else { /* child */
+ close(p1[1]);
+ close(p0[0]);
+ close_all_worker_fds();
+ /* Make "fatal" not find any children */
+ num_busy_workers = num_free_workers = num_stalled_workers = 0;
+ if((dup2(p1[0],0) < 0) || (dup2(p0[1],1) < 0)) {
+ fatal("Worker could not dup2(), errno = %d",
+ errno);
+ return -1; /* lint... */
+ }
+ close(p1[0]);
+ close(p0[1]);
+ signal(SIGCHLD, SIG_IGN);
+ return worker_loop();
+ }
+}
+
+static void close_all_worker_fds(void)
+{
+ int w,i;
+ Worker *workers[3] = {free_workers, busy_workers, stalled_workers};
+ int wsizes[3] = {num_free_workers, num_busy_workers,
+ num_stalled_workers};
+ for (w = 0; w < 3; ++w) {
+ for (i = 0; i < wsizes[w]; ++i) {
+ if (workers[w][i].state != WORKER_EMPTY) {
+ close(workers[w][i].readfrom);
+ close(workers[w][i].writeto);
+ }
+ }
+ }
+}
+
+#endif /* !WIN32 */
+
+#ifdef WIN32
+DWORD WINAPI worker_loop(void *v)
+#else
+static int worker_loop(void)
+#endif
+{
+ AddrByte *req = NULL;
+ size_t req_size = 0;
+ int this_size;
+ AddrByte *reply = NULL;
+ size_t reply_size = 0;
+ size_t data_size;
+
+#ifdef WIN32
+ QueItem *m = NULL;
+ MesQ *readfrom = ((MesQ **) v)[0];
+ MesQ *writeto = ((MesQ **) v)[1];
+ /* XXX:PaN */
+ FREE(v);
+#endif
+
+ for(;;) {
+#ifdef HAVE_GETADDRINFO
+ struct addrinfo *ai = NULL;
+#endif
+ struct hostent *he = NULL;
+#ifdef HAVE_GETNAMEINFO
+ struct sockaddr *sa = NULL;
+ char name[NI_MAXHOST];
+#endif
+#if defined(HAVE_GETIPNODEBYNAME) || defined(HAVE_GETIPNODEBYADDR)
+ int free_he = 0;
+#endif
+ int error_num = 0;
+ SerialType serial;
+ OpType op;
+ ProtoType proto;
+ AddrByte *data;
+
+#ifdef WIN32
+ WaitForSingleObject(event_mesq(readfrom),INFINITE);
+ DEBUGF(4,("Worker got data on message que."));
+
+ if(!deque_mesq(readfrom,&m)) {
+ goto fail;
+ }
+ this_size = m->req_size;
+ req = m->request;
+#else
+ if (READ_PACKET_BYTES(0,&this_size) != PACKET_BYTES) {
+ DEBUGF(2,("Worker got error/EOF while reading size, exiting."));
+ exit(0);
+ }
+ if (this_size > req_size) {
+ if (req == NULL) {
+ req = ALLOC((req_size = this_size));
+ } else {
+ req = REALLOC(req, (req_size = this_size));
+ }
+ }
+ if (read_exact(0, req, (size_t) this_size) != this_size) {
+ DEBUGF(1,("Worker got EOF while reading data, exiting."));
+ exit(0);
+ }
+#endif
+ /* Decode the request... */
+ serial = get_serial(req);
+ if (OP_CONTROL == (op = get_op(req))) {
+ CtlType ctl;
+ if (serial != INVALID_SERIAL) {
+ DEBUGF(1, ("Worker got invalid serial: %d.", serial));
+ exit(0);
+ }
+ switch (ctl = get_ctl(req)) {
+ case SETOPT_DEBUG_LEVEL:
+ debug_level = get_debug_level(req);
+ DEBUGF(debug_level,
+ ("Worker debug_level = %d.", debug_level));
+ break;
+ }
+ continue;
+ }
+ proto = get_proto(req);
+ data = get_data(req);
+ DEBUGF(4,("Worker got request, op = %d, proto = %d, data = %s.",
+ op,proto,data));
+ /* Got a request, lets go... */
+ switch (op) {
+ case OP_GETHOSTBYNAME:
+ switch (proto) {
+
+#ifdef HAVE_IN6
+ case PROTO_IPV6: { /* switch (proto) { */
+#ifdef HAVE_GETADDRINFO
+ struct addrinfo hints;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = (AI_CANONNAME|AI_V4MAPPED|AI_ADDRCONFIG);
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_family = AF_INET6;
+ DEBUGF(5, ("Starting getaddrinfo(%s, ...)", data));
+ error_num = getaddrinfo((char *)data, NULL, &hints, &ai);
+ DEBUGF(5,("getaddrinfo returned %d", error_num));
+ if (error_num) {
+ error_num = map_netdb_error_ai(error_num);
+ }
+#elif defined(HAVE_GETIPNODEBYNAME) /*#ifdef HAVE_GETADDRINFO */
+ DEBUGF(5,("Starting getipnodebyname(%s)",data));
+ he = getipnodebyname(data, AF_INET6, AI_DEFAULT, &error_num);
+ if (he) {
+ free_he = 1;
+ error_num = 0;
+ DEBUGF(5,("getipnodebyname(,AF_INET6,,) OK"));
+ } else {
+ DEBUGF(5,("getipnodebyname(,AF_INET6,,) error %d", error_num));
+ error_num = map_netdb_error(error_num);
+ }
+#elif defined(HAVE_GETHOSTBYNAME2) /*#ifdef HAVE_GETADDRINFO */
+ DEBUGF(5,("Starting gethostbyname2(%s, AF_INET6)",data));
+ he = gethostbyname2((char*)data, AF_INET6);
+ if (he) {
+ error_num = 0;
+ DEBUGF(5,("gethostbyname2(, AF_INET6) OK"));
+ } else {
+ error_num = map_netdb_error(h_errno);
+ DEBUGF(5,("gethostbyname2(, AF_INET6) error %d", h_errno));
+ }
+#else
+ error_num = ERRCODE_NOTSUP;
+#endif /*#ifdef HAVE_GETADDRINFO */
+ } break;
+#endif /*ifdef HAVE_IN6 */
+
+ case PROTO_IPV4: { /* switch (proto) { */
+ DEBUGF(5,("Starting gethostbyname(%s)",data));
+ he = gethostbyname((char*)data);
+ if (he) {
+ error_num = 0;
+ DEBUGF(5,("gethostbyname OK"));
+ } else {
+ error_num = map_netdb_error(h_errno);
+ DEBUGF(5,("gethostbyname error %d", h_errno));
+ }
+ } break;
+
+ default: /* switch (proto) { */
+ /* Not supported... */
+ error_num = ERRCODE_NOTSUP;
+ break;
+ } /* switch (proto) { */
+
+ if (he) {
+ data_size = build_reply(serial, he, &reply, &reply_size);
+#ifdef HAVE_GETIPNODEBYNAME
+ if (free_he) {
+ freehostent(he);
+ }
+#endif
+#ifdef HAVE_GETADDRINFO
+ } else if (ai) {
+ data_size = build_reply_ai(serial, 16, ai,
+ &reply, &reply_size);
+ freeaddrinfo(ai);
+#endif
+ } else {
+ data_size = build_error_reply(serial, error_num,
+ &reply, &reply_size);
+ }
+ break; /* case OP_GETHOSTBYNAME: */
+
+ case OP_GETHOSTBYADDR: /* switch (op) { */
+ switch (proto) {
+#ifdef HAVE_IN6
+ case PROTO_IPV6: {
+#ifdef HAVE_GETNAMEINFO
+ struct sockaddr_in6 *sin;
+ socklen_t salen = sizeof(*sin);
+
+ sin = ALLOC(salen);
+#ifndef NO_SA_LEN
+ sin->sin6_len = salen;
+#endif
+ sin->sin6_family = AF_INET6;
+ sin->sin6_port = 0;
+ memcpy(&sin->sin6_addr, data, 16);
+ sa = (struct sockaddr *)sin;
+ DEBUGF(5,("Starting getnameinfo(,,%s,16,,,)",
+ format_address(16, data)));
+ error_num = getnameinfo(sa, salen, name, sizeof(name),
+ NULL, 0, NI_NAMEREQD);
+ DEBUGF(5,("getnameinfo returned %d", error_num));
+ if (error_num) {
+ error_num = map_netdb_error_ai(error_num);
+ sa = NULL;
+ }
+#elif defined(HAVE_GETIPNODEBYADDR) /*#ifdef HAVE_GETNAMEINFO*/
+ struct in6_addr ia;
+ memcpy(ia.s6_addr, data, 16);
+ DEBUGF(5,("Starting getipnodebyaddr(%s,16,AF_INET6,)",
+ format_address(16, data)));
+ he = getipnodebyaddr(&ia, 16, AF_INET6, &error_num);
+ free_he = 1;
+ if (! he) {
+ DEBUGF(5,("getipnodebyaddr error %d", error_num));
+ error_num = map_netdb_error(error_num);
+ } else {
+ DEBUGF(5,("getipnodebyaddr OK"));
+ }
+#else /*#ifdef HAVE_GETNAMEINFO*/
+ struct in6_addr ia;
+ memcpy(ia.s6_addr, data, 16);
+ DEBUGF(5,("Starting gethostbyaddr(%s,16,AF_INET6)",
+ format_address(16, data)));
+ he = gethostbyaddr((const char *) &ia, 16, AF_INET6);
+ if (! he) {
+ error_num = map_netdb_error(h_errno);
+ DEBUGF(5,("gethostbyaddr error %d", h_errno));
+ } else {
+ DEBUGF(5,("gethostbyaddr OK"));
+ }
+#endif /* #ifdef HAVE_GETNAMEINFO */
+ } break; /* case PROTO_IPV6: { */
+#endif /* #ifdef HAVE_IN6 */
+
+ case PROTO_IPV4: { /* switch(proto) { */
+ struct in_addr ia;
+ memcpy(&ia.s_addr, data, 4); /* Alignment required... */
+ DEBUGF(5,("Starting gethostbyaddr(%s,4,AF_INET)",
+ format_address(4, data)));
+ he = gethostbyaddr((const char *) &ia, 4, AF_INET);
+ if (! he) {
+ error_num = map_netdb_error(h_errno);
+ DEBUGF(5,("gethostbyaddr error %d", h_errno));
+ } else {
+ DEBUGF(5,("gethostbyaddr OK"));
+ }
+ } break;
+
+ default:
+ error_num = ERRCODE_NOTSUP;
+ } /* switch(proto) { */
+
+ if (he) {
+ data_size = build_reply(serial, he, &reply, &reply_size);
+#ifdef HAVE_GETIPNODEBYADDR
+ if (free_he) {
+ freehostent(he);
+ }
+#endif
+#ifdef HAVE_GETNAMEINFO
+ } else if (sa) {
+ struct addrinfo res;
+ memset(&res, 0, sizeof(res));
+ res.ai_canonname = name;
+ res.ai_addr = sa;
+ res.ai_next = NULL;
+ data_size = build_reply_ai(serial, 16, &res,
+ &reply, &reply_size);
+ free(sa);
+#endif
+ } else {
+ data_size = build_error_reply(serial, error_num,
+ &reply, &reply_size);
+ }
+ break; /* case OP_GETHOSTBYADR: */
+
+ default:
+ data_size = build_error_reply(serial, ERRCODE_NOTSUP,
+ &reply, &reply_size);
+ break;
+ } /* switch (op) { */
+
+#ifdef WIN32
+ m = REALLOC(m, sizeof(QueItem) - 1 + data_size - PACKET_BYTES);
+ m->next = NULL;
+ m->req_size = data_size - PACKET_BYTES;
+ memcpy(m->request,reply + PACKET_BYTES,data_size - PACKET_BYTES);
+ if (!enque_mesq(writeto,m)) {
+ goto fail;
+ }
+ m = NULL;
+#else
+ write(1, reply, data_size); /* No signals expected */
+#endif
+ } /* for (;;) */
+
+#ifdef WIN32
+ fail:
+ if (m != NULL) {
+ FREE(m);
+ }
+ close_mesq(readfrom);
+ close_mesq(writeto);
+ if (reply) {
+ FREE(reply);
+ }
+ return 1;
+#endif
+}
+
+static int map_netdb_error(int netdb_code)
+{
+ switch (netdb_code) {
+#ifdef HOST_NOT_FOUND
+ case HOST_NOT_FOUND:
+ return ERRCODE_HOST_NOT_FOUND;
+#endif
+#ifdef TRY_AGAIN
+ case TRY_AGAIN:
+ return ERRCODE_TRY_AGAIN;
+#endif
+#ifdef NO_RECOVERY
+ case NO_RECOVERY:
+ return ERRCODE_NO_RECOVERY;
+#endif
+#if defined(NO_DATA) || defined(NO_ADDRESS)
+#ifdef NO_DATA
+ case NO_DATA:
+#endif
+#ifdef NO_ADDRESS
+#if !defined(NO_DATA) || (NO_DATA != NO_ADDRESS)
+ case NO_ADDRESS:
+#endif
+#endif
+ return ERRCODE_NO_DATA;
+#endif
+ default:
+ return ERRCODE_NETDB_INTERNAL;
+ }
+}
+
+#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
+static int map_netdb_error_ai(int netdb_code)
+{
+ switch(netdb_code) {
+#ifdef EAI_ADDRFAMILY
+ case EAI_ADDRFAMILY:
+ return ERRCODE_NETDB_INTERNAL;
+#endif
+ case EAI_AGAIN:
+ return ERRCODE_TRY_AGAIN;
+ case EAI_BADFLAGS:
+ return ERRCODE_NETDB_INTERNAL;
+ case EAI_FAIL:
+ return ERRCODE_HOST_NOT_FOUND;
+ case EAI_FAMILY:
+ return ERRCODE_NETDB_INTERNAL;
+ case EAI_MEMORY:
+ return ERRCODE_NETDB_INTERNAL;
+#if defined(EAI_NODATA) && EAI_NODATA != EAI_NONAME
+ case EAI_NODATA:
+ return ERRCODE_HOST_NOT_FOUND;
+#endif
+ case EAI_NONAME:
+ return ERRCODE_HOST_NOT_FOUND;
+ case EAI_SERVICE:
+ return ERRCODE_NETDB_INTERNAL;
+ case EAI_SOCKTYPE:
+ return ERRCODE_NETDB_INTERNAL;
+ default:
+ return ERRCODE_NETDB_INTERNAL;
+ }
+}
+#endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */
+
+
+static char *errcode_to_string(int errcode)
+{
+ switch (errcode) {
+ case ERRCODE_NOTSUP:
+ return "enotsup";
+ case ERRCODE_HOST_NOT_FOUND:
+ /*
+ * I would preffer
+ * return "host_not_found";
+ * but have to keep compatibility with the old
+ * inet_gethost's error codes...
+ */
+ return "notfound";
+ case ERRCODE_TRY_AGAIN:
+ return "try_again";
+ case ERRCODE_NO_RECOVERY:
+ return "no_recovery";
+ case ERRCODE_NO_DATA:
+ return "no_data";
+ default:
+ /*case ERRCODE_NETDB_INTERNAL:*/
+ return "netdb_internal";
+ }
+}
+
+static size_t build_error_reply(SerialType serial, int errnum,
+ AddrByte **preply,
+ size_t *preply_size)
+{
+ char *errstring = errcode_to_string(errnum);
+ int string_need = strlen(errstring) + 1; /* a '\0' too */
+ unsigned need;
+ AddrByte *ptr;
+
+ need = PACKET_BYTES + 4 /* Serial */ + 1 /* Unit */ + string_need;
+ if (*preply_size < need) {
+ if (*preply_size == 0) {
+ *preply = ALLOC((*preply_size = need));
+ } else {
+ *preply = REALLOC(*preply,
+ (*preply_size = need));
+ }
+ }
+ ptr = *preply;
+ PUT_PACKET_BYTES(ptr,need - PACKET_BYTES);
+ ptr += PACKET_BYTES;
+ put_int32(ptr,serial);
+ ptr +=4;
+ *ptr++ = (AddrByte) 0; /* 4 or 16 */
+ strcpy((char*)ptr, errstring);
+ return need;
+}
+
+
+
+static size_t build_reply(SerialType serial, struct hostent *he,
+ AddrByte **preply, size_t *preply_size)
+{
+ unsigned need;
+ int strings_need;
+ int num_strings;
+ int num_addresses;
+ int i;
+ AddrByte *ptr;
+ int unit = he->h_length;
+
+ for (num_addresses = 0; he->h_addr_list[num_addresses] != NULL;
+ ++num_addresses)
+ ;
+ strings_need = strlen(he->h_name) + 1; /* 1 for null byte */
+ num_strings = 1;
+ if (he->h_aliases) {
+ for(i=0; he->h_aliases[i] != NULL; ++i) {
+ strings_need += strlen(he->h_aliases[i]) + 1;
+ ++num_strings;
+ }
+ }
+
+ need = PACKET_BYTES +
+ 4 /* Serial */ + 1 /* Unit */ + 4 /* Naddr */ +
+ (unit * num_addresses) /* Address bytes */ +
+ 4 /* Nnames */ + strings_need /* The name and alias strings */;
+
+ if (*preply_size < need) {
+ if (*preply_size == 0) {
+ *preply = ALLOC((*preply_size = need));
+ } else {
+ *preply = REALLOC(*preply,
+ (*preply_size = need));
+ }
+ }
+ ptr = *preply;
+ PUT_PACKET_BYTES(ptr,need - PACKET_BYTES);
+ ptr += PACKET_BYTES;
+ put_int32(ptr,serial);
+ ptr +=4;
+ *ptr++ = (AddrByte) unit; /* 4 or 16 */
+ put_int32(ptr, num_addresses);
+ ptr += 4;
+ for (i = 0; i < num_addresses; ++i) {
+ memcpy(ptr, he->h_addr_list[i], unit);
+ ptr += unit;
+ }
+ put_int32(ptr, num_strings);
+ ptr += 4;
+ strcpy((char*)ptr, he->h_name);
+ ptr += 1 + strlen(he->h_name);
+ for (i = 0; i < (num_strings - 1); ++i) {
+ strcpy((char*)ptr, he->h_aliases[i]);
+ ptr += 1 + strlen(he->h_aliases[i]);
+ }
+ return need;
+}
+
+#if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
+static size_t build_reply_ai(SerialType serial, int addrlen,
+ struct addrinfo *res0,
+ AddrByte **preply, size_t *preply_size)
+{
+ struct addrinfo *res;
+ int num_strings;
+ int num_addresses;
+ AddrByte *ptr;
+ int need;
+
+ num_addresses = 0;
+ num_strings = 0;
+ need = PACKET_BYTES +
+ 4 /* Serial */ + 1 /* addrlen */ +
+ 4 /* Naddr */ + 4 /* Nnames */;
+
+ for (res = res0; res != NULL; res = res->ai_next) {
+ if (res->ai_addr) {
+ num_addresses++;
+ need += addrlen;
+ }
+ if (res->ai_canonname) {
+ num_strings++;
+ need += strlen(res->ai_canonname) + 1;
+ }
+ }
+
+ if (*preply_size < need) {
+ if (*preply_size == 0) {
+ *preply = ALLOC((*preply_size = need));
+ } else {
+ *preply = REALLOC(*preply,
+ (*preply_size = need));
+ }
+ }
+
+ ptr = *preply;
+ PUT_PACKET_BYTES(ptr,need - PACKET_BYTES);
+ ptr += PACKET_BYTES;
+ put_int32(ptr,serial);
+ ptr +=4;
+ *ptr++ = (AddrByte) addrlen; /* 4 or 16 */
+ put_int32(ptr, num_addresses);
+ ptr += 4;
+ for (res = res0; res != NULL && num_addresses; res = res->ai_next) {
+ if (res->ai_addr == NULL)
+ continue;
+ if (addrlen == 4)
+ memcpy(ptr, &((struct sockaddr_in *)res->ai_addr)->sin_addr, addrlen);
+#ifdef AF_INET6
+ else if (addrlen == 16)
+ memcpy(ptr, &((struct sockaddr_in6 *)res->ai_addr)->sin6_addr, addrlen);
+#endif
+ else
+ memcpy(ptr, res->ai_addr->sa_data, addrlen);
+ ptr += addrlen;
+ num_addresses--;
+ }
+ put_int32(ptr, num_strings);
+ ptr += 4;
+ for (res = res0; res != NULL && num_strings; res = res->ai_next) {
+ if (res->ai_canonname == NULL)
+ continue;
+ strcpy((char *)ptr, res->ai_canonname);
+ ptr += strlen(res->ai_canonname) + 1;
+ num_strings--;
+ }
+ return need;
+}
+
+#endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */
+
+
+
+/*
+ * Encode/decode/read/write
+ */
+
+static int get_int32(AddrByte *b)
+{
+ int res;
+ res = (unsigned) b[3];
+ res |= ((unsigned) b[2]) << 8;
+ res |= ((unsigned) b[1]) << 16;
+ res |= ((unsigned) b[0]) << 24;
+ return res;
+}
+
+static void put_int32(AddrByte *buff, int value)
+{
+ buff[0] = (((unsigned) value) >> 24) & 0xFF;
+ buff[1] = (((unsigned) value) >> 16) & 0xFF;
+ buff[2] = (((unsigned) value) >> 8) & 0xFF;
+ buff[3] = ((unsigned) value) & 0xFF;
+}
+#ifdef WIN32
+
+static int read_int32(HANDLE fd, int *res, HANDLE ev)
+{
+ AddrByte b[4];
+ int r;
+ if ((r = read_exact(fd,b,4,ev)) < 0) {
+ return -1;
+ } else if (r == 0) {
+ return 0;
+ } else {
+ *res = (unsigned) b[3];
+ *res |= ((unsigned) b[2]) << 8;
+ *res |= ((unsigned) b[1]) << 16;
+ *res |= ((unsigned) b[0]) << 24;
+ }
+ return 4;
+}
+/*
+ * The standard input is expected to be opened with FILE_FLAG_OVERLAPPED
+ * but this code should handle both cases (although winsock might not).
+ */
+static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev)
+{
+ DWORD ret,got;
+ BOOL stat;
+ char *buff = vbuff;
+ OVERLAPPED ov;
+ DWORD err;
+
+
+ got = 0;
+ for(;;) {
+ memset(&ov,0,sizeof(ov));
+ ov.hEvent = ev;
+ ResetEvent(ov.hEvent);
+ stat = ReadFile(fd, buff, nbytes - got, &ret, &ov);
+ if (!stat) {
+ if ((err = GetLastError()) == ERROR_IO_PENDING) {
+ DEBUGF(4,("Overlapped read, waiting for completion..."));
+ WaitForSingleObject(ov.hEvent,INFINITE);
+ stat = GetOverlappedResult(fd,&ov,&ret,TRUE);
+ DEBUGF(4,("Overlapped read, completed with status %d,"
+ " result %d",stat,ret));
+ }
+ if (!stat) {
+ if (GetLastError() == ERROR_BROKEN_PIPE) {
+ DEBUGF(1, ("End of file while reading from pipe."));
+ return 0;
+ } else {
+ DEBUGF(1, ("Error while reading from pipe,"
+ " errno = %d",
+ GetLastError()));
+ return -1;
+ }
+ }
+ } else {
+ DEBUGF(4,("Read completed syncronously, result %d",ret));
+ }
+ if (ret == 0) {
+ DEBUGF(1, ("End of file detected as zero read from pipe."));
+ return 0;
+ }
+ if (ret < nbytes - got) {
+ DEBUGF(4,("Not all data read from pipe, still %d bytes to read.",
+ nbytes - (got + ret)));
+ got += ret;
+ buff += ret;
+ } else {
+ return nbytes;
+ }
+ }
+}
+/*
+ * Now, we actually expect a HANDLE opened with FILE_FLAG_OVERLAPPED,
+ * but this code should handle both cases (although winsock
+ * does not always..)
+ */
+static int write_exact(HANDLE fd, AddrByte *buff, DWORD len, HANDLE ev)
+{
+ DWORD res,stat;
+ DWORD x = len;
+ OVERLAPPED ov;
+ DWORD err;
+
+
+ for(;;) {
+ memset(&ov,0,sizeof(ov));
+ ov.hEvent = ev;
+ ResetEvent(ov.hEvent);
+ stat = WriteFile(fd,buff,x,&res,&ov);
+ if (!stat) {
+ if ((err = GetLastError()) == ERROR_IO_PENDING) {
+ DEBUGF(4,("Overlapped write, waiting for competion..."));
+ WaitForSingleObject(ov.hEvent,INFINITE);
+ stat = GetOverlappedResult(fd,&ov,&res,TRUE);
+ DEBUGF(4,("Overlapped write, completed with status %d,"
+ " result %d",stat,res));
+ }
+ if (!stat) {
+ if (GetLastError() == ERROR_BROKEN_PIPE) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ } else {
+ DEBUGF(4,("Write completed syncronously, result %d",res));
+ }
+
+ if (res < x) {
+ /* Microsoft states this can happen as HANDLE is a pipe... */
+ DEBUGF(4,("Not all data written to pipe, still %d bytes to write.",
+ x - res));
+ x -= res;
+ buff += res;
+ } else {
+ return len;
+ }
+ }
+}
+
+DWORD WINAPI reader(void *data) {
+ MesQ *mq = (MesQ *) data;
+ QueItem *m;
+ int siz;
+ int r;
+ HANDLE inp;
+ int x = 0;
+ HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+ inp = GetStdHandle(STD_INPUT_HANDLE);
+ for (;;) {
+ if ((r = READ_PACKET_BYTES(inp,&siz,ev)) != 4) {
+ DEBUGF(1,("Erlang has closed (reading)"));
+ exit(0);
+ }
+ DEBUGF(4,("Read packet of size %d from erlang",siz));
+ m = ALLOC(sizeof(QueItem) - 1 + siz);
+ if (read_exact(inp, m->request, siz,ev) != siz) {
+ fatal("Unexpected end of file on main input, errno = %d",errno);
+ }
+ if (siz < 5) {
+ fatal("Unexpected message on main input, message size %d less "
+ "than minimum.");
+ }
+ m->req_size = siz;
+ m->next = NULL;
+ if (!enque_mesq(mq, m)) {
+ fatal("Reader could not talk to main thread!");
+ }
+ }
+}
+
+DWORD WINAPI writer(void *data)
+{
+ MesQ *mq = (MesQ *) data;
+ QueItem *m;
+ HANDLE outp = GetStdHandle(STD_OUTPUT_HANDLE);
+ AddrByte hdr[PACKET_BYTES];
+ HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+
+ for (;;) {
+ WaitForSingleObject(event_mesq(mq),INFINITE);
+ if (!deque_mesq(mq, &m)) {
+ fatal("Writer could not talk to main thread!");
+ }
+ PUT_PACKET_BYTES(hdr, m->req_size);
+ if (write_exact(outp, hdr, 4, ev) != 4) {
+ DEBUGF(1,("Erlang has closed (writing)"));
+ exit(0);
+ }
+ if (write_exact(outp, m->request, m->req_size, ev) != m->req_size) {
+ DEBUGF(1,("Erlang has closed (writing)"));
+ exit(0);
+ }
+ FREE(m);
+ }
+}
+
+
+#else
+
+static size_t read_int32(int fd, int *res)
+{
+ AddrByte b[4];
+ int r;
+ if ((r = read_exact(fd,b,4)) < 0) {
+ return -1;
+ } else if (r == 0) {
+ return 0;
+ } else {
+ *res = (unsigned) b[3];
+ *res |= ((unsigned) b[2]) << 8;
+ *res |= ((unsigned) b[1]) << 16;
+ *res |= ((unsigned) b[0]) << 24;
+ }
+ return 4;
+}
+
+static ssize_t read_exact(int fd, void *vbuff, size_t nbytes)
+{
+ ssize_t ret, got;
+ char *buff = vbuff;
+
+ got = 0;
+ for(;;) {
+ ret = read(fd, buff, nbytes - got);
+ if (ret < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ DEBUGF(1, ("Error while reading from pipe,"
+ " errno = %d",
+ errno));
+ return -1;
+ }
+ } else if (ret == 0) {
+ DEBUGF(1, ("End of file while reading from pipe."));
+ if (got == 0) {
+ return 0; /* "Normal" EOF */
+ } else {
+ return -1;
+ }
+ } else if (ret < nbytes - got) {
+ got += ret;
+ buff += ret;
+ } else {
+ return nbytes;
+ }
+ }
+}
+
+static int write_exact(int fd, AddrByte *buff, int len)
+{
+ int res;
+ int x = len;
+ for(;;) {
+ if((res = write(fd, buff, x)) == x) {
+ break;
+ }
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else if (errno == EPIPE) {
+ return 0;
+ }
+#ifdef ENXIO
+ else if (errno == ENXIO) {
+ return 0;
+ }
+#endif
+ else {
+ return -1;
+ }
+ } else {
+ /* Hmmm, blocking write but not all written, could this happen
+ if the other end was closed during the operation? Well,
+ it costs very little to handle anyway... */
+ x -= res;
+ buff += res;
+ }
+ }
+ return len;
+}
+
+#endif /* !WIN32 */
+
+/*
+ * Debug and memory allocation
+ */
+
+static char *format_address(int siz, AddrByte *addr)
+{
+ static char buff[50];
+ char tmp[10];
+ if (siz > 16) {
+ return "(unknown)";
+ }
+ *buff='\0';
+ if (siz <= 4) {
+ while(siz--) {
+ sprintf(tmp,"%d",(int) *addr++);
+ strcat(buff,tmp);
+ if(siz) {
+ strcat(buff,".");
+ }
+ }
+ return buff;
+ }
+ while(siz--) {
+ sprintf(tmp,"%02x",(int) *addr++);
+ strcat(buff,tmp);
+ if(siz) {
+ strcat(buff,":");
+ }
+ }
+ return buff;
+}
+
+static void debugf(char *format, ...)
+{
+ char buff[2048];
+ char *ptr;
+ va_list ap;
+
+ va_start(ap,format);
+#ifdef WIN32
+ sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) GetCurrentThreadId());
+#else
+ sprintf(buff,"%s[%d] (DEBUG):",program_name,(int) getpid());
+#endif
+ ptr = buff + strlen(buff);
+ vsprintf(ptr,format,ap);
+ strcat(ptr,"\r\n");
+#ifdef WIN32
+ if (debug_console_allocated != INVALID_HANDLE_VALUE) {
+ DWORD res;
+ WriteFile(debug_console_allocated,buff,strlen(buff),&res,NULL);
+ }
+#else
+ write(2,buff,strlen(buff));
+#endif
+ va_end(ap);
+}
+
+static void warning(char *format, ...)
+{
+ char buff[2048];
+ char *ptr;
+ va_list ap;
+
+ va_start(ap,format);
+ sprintf(buff,"%s[%d]: WARNING:",program_name, (int) getpid());
+ ptr = buff + strlen(buff);
+ vsprintf(ptr,format,ap);
+ strcat(ptr,"\r\n");
+#ifdef WIN32
+ {
+ DWORD res;
+ WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL);
+ }
+#else
+ write(2,buff,strlen(buff));
+#endif
+ va_end(ap);
+}
+
+static void fatal(char *format, ...)
+{
+ char buff[2048];
+ char *ptr;
+ va_list ap;
+
+ va_start(ap,format);
+ sprintf(buff,"%s[%d]: FATAL ERROR:",program_name, (int) getpid());
+ ptr = buff + strlen(buff);
+ vsprintf(ptr,format,ap);
+ strcat(ptr,"\r\n");
+#ifdef WIN32
+ {
+ DWORD res;
+ WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL);
+ }
+#else
+ write(2,buff,strlen(buff));
+#endif
+ va_end(ap);
+#ifndef WIN32
+ kill_all_workers();
+#endif
+ exit(1);
+}
+
+static void *my_malloc(size_t size)
+{
+ void *ptr = malloc(size);
+ if (!ptr) {
+ fatal("Cannot allocate %u bytes of memory.", (unsigned) size);
+ return NULL; /* lint... */
+ }
+ return ptr;
+}
+
+static void *my_realloc(void *old, size_t size)
+{
+ void *ptr = realloc(old, size);
+ if (!ptr) {
+ fatal("Cannot reallocate %u bytes of memory from %p.",
+ (unsigned) size, old);
+ return NULL; /* lint... */
+ }
+ return ptr;
+}
+
+#ifdef WIN32
+
+BOOL create_mesq(MesQ **q)
+{
+ MesQ *tmp = malloc(sizeof(MesQ));
+ tmp->data_present = CreateEvent(NULL, TRUE, FALSE,NULL);
+ if (tmp->data_present == NULL) {
+ free(tmp);
+ return FALSE;
+ }
+ InitializeCriticalSection(&(tmp->crit)); /* Cannot fail */
+ tmp->shutdown = 0;
+ tmp->first = NULL;
+ tmp->last = NULL;
+ *q = tmp;
+ return TRUE;
+}
+
+BOOL enque_mesq(MesQ *q, QueItem *m)
+{
+ EnterCriticalSection(&(q->crit));
+ if (q->shutdown) {
+ LeaveCriticalSection(&(q->crit));
+ return FALSE;
+ }
+ if (q->last == NULL) {
+ q->first = q->last = m;
+ } else {
+ q->last->next = m;
+ q->last = m;
+ }
+ m->next = NULL;
+ if (!SetEvent(q->data_present)) {
+ fprintf(stderr,"Fatal: Unable to signal event in %s:%d, last error: %d\n",
+ __FILE__,__LINE__,GetLastError());
+ exit(1); /* Unable to continue at all */
+ }
+ LeaveCriticalSection(&(q->crit));
+ return TRUE;
+}
+
+BOOL deque_mesq(MesQ *q, QueItem **m)
+{
+ EnterCriticalSection(&(q->crit));
+ if (q->first == NULL) { /* Usually shutdown from other end */
+ ResetEvent(q->data_present);
+ LeaveCriticalSection(&(q->crit));
+ return FALSE;
+ }
+ *m = q->first;
+ q->first = q->first->next;
+ if (q->first == NULL) {
+ q->last = NULL;
+ ResetEvent(q->data_present);
+ }
+ (*m)->next = NULL;
+ LeaveCriticalSection(&(q->crit));
+ return TRUE;
+}
+
+BOOL close_mesq(MesQ *q)
+{
+ QueItem *tmp;
+ EnterCriticalSection(&(q->crit));
+ if (!q->shutdown) {
+ q->shutdown = TRUE;
+ if (!SetEvent(q->data_present)) {
+ fprintf(stderr,
+ "Fatal: Unable to signal event in %s:%d, last error: %d\n",
+ __FILE__,__LINE__,GetLastError());
+ exit(1); /* Unable to continue at all */
+ }
+ LeaveCriticalSection(&(q->crit));
+ return FALSE;
+ }
+ /* Noone else is supposed to use this object any more */
+ LeaveCriticalSection(&(q->crit));
+ DeleteCriticalSection(&(q->crit));
+ CloseHandle(q->data_present);
+ tmp = q->first;
+ while(tmp) {
+ q->first = q->first->next;
+ free(tmp);
+ tmp = q->first;
+ }
+ free(q);
+ return TRUE;
+}
+
+HANDLE event_mesq(MesQ *q)
+{
+ return q->data_present;
+}
+
+#ifdef HARDDEBUG
+DWORD WINAPI pseudo_worker_loop(void *v)
+{
+ HOSTENT *hep;
+
+ DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") starting"));
+ hep = gethostbyname("ftp.funet.fi");
+
+ DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") -> %d OK",(int) hep));
+ return 0;
+}
+
+static void poll_gethost(int row) {
+ HANDLE h;
+ DWORD tid;
+ h = (HANDLE) _beginthreadex(NULL, 0, pseudo_worker_loop, NULL, 0, &tid);
+ if (h == NULL) {
+ DEBUGF(1,("Failed to spawn pseudo worker (%d)...",row));
+ } else {
+ DEBUGF(1,("Waiting for pseudo worker (%d)", row));
+ WaitForSingleObject(h,INFINITE);
+ DEBUGF(1,("Done Waiting for pseudo worker (%d)", row));
+ }
+}
+#endif
+
+#endif /* WIN32 */