aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/examples/uds_dist/c_src
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/examples/uds_dist/c_src
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/examples/uds_dist/c_src')
-rw-r--r--lib/kernel/examples/uds_dist/c_src/Makefile32
-rw-r--r--lib/kernel/examples/uds_dist/c_src/uds_drv.c1065
2 files changed, 1097 insertions, 0 deletions
diff --git a/lib/kernel/examples/uds_dist/c_src/Makefile b/lib/kernel/examples/uds_dist/c_src/Makefile
new file mode 100644
index 0000000000..de3a3730c9
--- /dev/null
+++ b/lib/kernel/examples/uds_dist/c_src/Makefile
@@ -0,0 +1,32 @@
+# Example makefile, Solaris only
+CC = gcc
+CFLAGS=-O3 -g -fPIC -pedantic -Wall -I$(ERL_INCLUDE)
+LD=ld
+RM_RF=rm -rf
+INSTALL_DIR=/usr/ucb/install -d
+LIBRARIES=-lc -ltermlib -lresolv -ldl -lm -lsocket -lnsl
+TARGET_DIR=../priv/lib
+OBJECT_DIR=../priv/obj
+SHLIB_EXT=.so
+OBJ_EXT=.o
+TARGET_NAME=uds_drv$(SHLIB_EXT)
+TARGET=$(TARGET_DIR)/$(TARGET_NAME)
+OBJECTS=$(OBJECT_DIR)/uds_drv$(OBJ_EXT)
+
+LDFLAGS=-G -h $(TARGET_NAME)
+
+# Works if building in open source source tree
+ERL_INCLUDE=$(ERL_TOP)/erts/emulator/beam
+
+opt: setup $(OBJECTS)
+ $(LD) $(LDFLAGS) $(OBJECTS) -o $(TARGET) $(LIBRARIES)
+
+setup:
+ $(INSTALL_DIR) $(TARGET_DIR)
+ $(INSTALL_DIR) $(OBJECT_DIR)
+
+$(OBJECT_DIR)/%.o: %.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+
+clean:
+ $(RM_RF) $(TARGET_DIR) $(OBJECT_DIR)
diff --git a/lib/kernel/examples/uds_dist/c_src/uds_drv.c b/lib/kernel/examples/uds_dist/c_src/uds_drv.c
new file mode 100644
index 0000000000..fb10a375f4
--- /dev/null
+++ b/lib/kernel/examples/uds_dist/c_src/uds_drv.c
@@ -0,0 +1,1065 @@
+/* ``The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved via the world wide web at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+ * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+ * AB. All Rights Reserved.''
+ *
+ * $Id$
+ */
+
+/*
+ * Purpose: Special purpouse Unix domain socket driver for distribution.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <fcntl.h>
+
+#define HAVE_UIO_H
+#include "erl_driver.h"
+
+#define DEBUG
+/*#define HARDDEBUG 1*/
+/*
+** Some constants/macros
+*/
+
+#ifdef HARDDEBUG
+#define DEBUGF(P) debugf P
+#include <stdarg.h>
+static void debugf(char *str, ...)
+{
+ va_list ap;
+ va_start(ap,str);
+ fprintf(stderr,"Uds_drv debug: ");
+ vfprintf(stderr,str, ap);
+ fprintf(stderr,"\r\n");
+ va_end(ap);
+}
+#ifndef DEBUG
+#define DEBUG 1
+#endif
+#else
+#define DEBUGF(P)
+#endif
+
+
+#ifdef DEBUG
+#define ASSERT(X) \
+do { \
+ if (!(X)) { \
+ fprintf(stderr,"Assertion (%s) failed at line %d file %s\r\n", #X, \
+ __LINE__, __FILE__); \
+ exit(1); \
+ } \
+} while(0)
+#define ASSERT_NONBLOCK(FD) ASSERT(fcntl((FD), F_GETFL, 0) & O_NONBLOCK)
+#else
+#define ASSERT(X)
+#define ASSERT_NONBLOCK(FD)
+#endif
+
+
+
+#define SET_NONBLOCKING(FD) \
+ fcntl((FD), F_SETFL, \
+ fcntl((FD), F_GETFL, 0) | O_NONBLOCK)
+
+#define ALLOC(X) my_malloc(X)
+#define REALLOC(P,X) my_realloc(P,X)
+#define FREE(X) driver_free(X)
+
+#define CHUNK_SIZE 256
+
+#define DIST_MAGIC_RECV_TAG 100
+
+/*
+** The max length of an I/O vector seams to be impossible to find
+** out (?), so this is just a value known to work on solaris.
+*/
+#define IO_VECTOR_MAX 16
+
+#define SOCKET_PATH "/tmp/erlang"
+#define LOCK_SUFFIX ".lock"
+
+#define NORMAL_READ_FAILURE -1
+#define SEVERE_READ_FAILURE -2
+#define EOF_READ_FAILURE -3
+
+/*
+** Internal structures
+*/
+
+#define HEADER_LENGTH 4
+
+typedef enum {
+ portTypeUnknown, /* An uninitialized port */
+ portTypeListener, /* A listening port/socket */
+ portTypeAcceptor, /* An intermidiate stage when accepting
+ on a listen port */
+ portTypeConnector, /* An intermediate stage when connecting */
+ portTypeCommand, /* A connected open port in command mode */
+ portTypeIntermediate, /* A connected open port in special half
+ active mode */
+ portTypeData /* A connectec open port in data mode */
+} PortType;
+
+typedef unsigned char Byte;
+typedef unsigned int Word;
+
+typedef struct uds_data {
+ int fd; /* File descriptor */
+ ErlDrvPort port; /* The port identifier */
+ int lockfd; /* The file descriptor for a lock file in
+ case of listen sockets */
+ Byte creation; /* The creation serial derived from the
+ lockfile */
+ PortType type; /* Type of port */
+ char *name; /* Short name of socket for unlink */
+ Word sent; /* Messages sent */
+ Word received; /* Messages received */
+ struct uds_data *partner; /* The partner in an accept/listen pair */
+ struct uds_data *next; /* Next structure in list */
+
+ /* The input buffer and it's data */
+ int buffer_size; /* The allocated size of the input buffer */
+ int buffer_pos; /* Current position in input buffer */
+ int header_pos; /* Where the current header is in the
+ input buffer */
+ Byte *buffer; /* The actual input buffer */
+} UdsData;
+
+/*
+** Interface routines
+*/
+static ErlDrvData uds_start(ErlDrvPort port, char *buff);
+static void uds_stop(ErlDrvData handle);
+static void uds_command(ErlDrvData handle, char *buff, int bufflen);
+static void uds_input(ErlDrvData handle, ErlDrvEvent event);
+static void uds_output(ErlDrvData handle, ErlDrvEvent event);
+static void uds_finish(void);
+static int uds_control(ErlDrvData handle, unsigned int command,
+ char* buf, int count, char** res, int res_size);
+static void uds_stop_select(ErlDrvEvent event, void*);
+
+/*
+** Local helpers forward declarations
+*/
+
+static void uds_command_listen(UdsData *ud, char *buff, int bufflen);
+static void uds_command_accept(UdsData *ud, char *buff, int bufflen);
+static void uds_command_connect(UdsData *ud, char *buff, int bufflen);
+
+static void do_stop(UdsData *ud, int shutting_down);
+static void do_send(UdsData *ud, char *buff, int bufflen);
+static void do_recv(UdsData *ud);
+
+static int report_control_error(char **buffer, int buff_len,
+ char *error_message);
+static int send_out_queue(UdsData *ud);
+static int buffered_read_package(UdsData *ud, char **result);
+static int read_at_least(UdsData *ud, int num);
+static int get_packet_length(char *b);
+static void put_packet_length(char *b, int len);
+static void *my_malloc(size_t size);
+static void *my_realloc(void *optr, size_t size);
+static int try_lock(char *sockname, Byte *p_creation);
+static int ensure_dir(char *path);
+static void do_unlink(char *name);
+
+/*
+** Global data
+*/
+
+/* The driver entry */
+ErlDrvEntry uds_driver_entry = {
+ NULL, /* init, N/A */
+ uds_start, /* start, called when port is opened */
+ uds_stop, /* stop, called when port is closed */
+ uds_command, /* output, called when erlang has sent */
+ uds_input, /* ready_input, called when input descriptor
+ ready */
+ uds_output, /* ready_output, called when output
+ descriptor ready */
+ "uds_drv", /* char *driver_name, the argument to open_port */
+ uds_finish, /* finish, called when unloaded */
+ NULL, /* void * that is not used (BC) */
+ uds_control, /* control, port_control callback */
+ NULL, /* timeout, called on timeouts */
+ NULL, /* outputv, vector output interface */
+ NULL, /* ready_async */
+ NULL, /* flush */
+ NULL, /* call */
+ NULL, /* event */
+ ERL_DRV_EXTENDED_MARKER,
+ ERL_DRV_EXTENDED_MAJOR_VERSION,
+ ERL_DRV_EXTENDED_MINOR_VERSION,
+ 0, /* ERL_DRV_FLAGs */
+ NULL,
+ NULL, /* process_exit */
+ uds_stop_select
+};
+
+/* Beginning of linked list of ports */
+static UdsData *first_data;
+
+/*
+**
+** Driver interface routines
+**
+*/
+
+/*
+** Driver initialization routine
+*/
+DRIVER_INIT(uds_drv)
+{
+ first_data = NULL;
+ return &uds_driver_entry;
+}
+
+/*
+** A port is opened, we need no information whatsoever about the socket
+** at this stage.
+*/
+static ErlDrvData uds_start(ErlDrvPort port, char *buff)
+{
+ UdsData *ud;
+
+ ud = ALLOC(sizeof(UdsData));
+ ud->fd = -1;
+ ud->lockfd = -1;
+ ud->creation = 0;
+ ud->port = port;
+ ud->type = portTypeUnknown;
+ ud->name = NULL;
+ ud->buffer_size = 0;
+ ud->buffer_pos = 0;
+ ud->header_pos = 0;
+ ud->buffer = NULL;
+ ud->sent = 0;
+ ud->received = 0;
+ ud->partner = NULL;
+ ud->next = first_data;
+ first_data = ud;
+
+ return((ErlDrvData) ud);
+}
+
+/*
+** Close the socket/port and free up
+*/
+static void uds_stop(ErlDrvData handle)
+{
+ do_stop((UdsData *) handle, 0);
+}
+
+/*
+** Command interface, operates in two modes, Command mode and data mode.
+** Mode is shifted with the port_control function.
+** Command mode protocol:
+** 'L'<socketname>: Lock and listen on socket.
+** 'A'<listennumber as 32 bit bigendian>: Accept from the port referenced by the
+** "listennumber"
+** 'C'<socketname>: Connect to the socket named <socketname>
+** 'S'<data>: Send the data <data>
+** 'R': Receive one packet of data
+** Data mode protocol:
+** Send anything that arrives (no opcodes/skip opcodes).
+*/
+
+static void uds_command(ErlDrvData handle, char *buff, int bufflen)
+{
+ UdsData *ud = (UdsData *) handle;
+
+ if (ud->type == portTypeData || ud->type == portTypeIntermediate) {
+ DEBUGF(("Passive do_send %d",bufflen));
+ do_send(ud, buff + 1, bufflen - 1); /* XXX */
+ return;
+ }
+ if (bufflen == 0) {
+ return;
+ }
+ switch (*buff) {
+ case 'L':
+ if (ud->type != portTypeUnknown) {
+ driver_failure_posix(ud->port, ENOTSUP);
+ return;
+ }
+ uds_command_listen(ud,buff,bufflen);
+ return;
+ case 'A':
+ if (ud->type != portTypeUnknown) {
+ driver_failure_posix(ud->port, ENOTSUP);
+ return;
+ }
+ uds_command_accept(ud,buff,bufflen);
+ return;
+ case 'C':
+ if (ud->type != portTypeUnknown) {
+ driver_failure_posix(ud->port, ENOTSUP);
+ return;
+ }
+ uds_command_connect(ud,buff,bufflen);
+ return;
+ case 'S':
+ if (ud->type != portTypeCommand) {
+ driver_failure_posix(ud->port, ENOTSUP);
+ return;
+ }
+ do_send(ud, buff + 1, bufflen - 1);
+ return;
+ case 'R':
+ if (ud->type != portTypeCommand) {
+ driver_failure_posix(ud->port, ENOTSUP);
+ return;
+ }
+ do_recv(ud);
+ return;
+ default:
+ ASSERT(0);
+ return;
+ }
+}
+
+static void uds_input(ErlDrvData handle, ErlDrvEvent event)
+{
+ UdsData *ud = (UdsData *) handle;
+
+ DEBUGF(("In uds_input type = %d",ud->type));
+ if (ud->type == portTypeListener) {
+ UdsData *ad = ud->partner;
+ struct sockaddr_un peer;
+ int pl = sizeof(struct sockaddr_un);
+ int fd;
+
+ ASSERT(ad != NULL);
+ if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) {
+ if (errno != EWOULDBLOCK) {
+ DEBUGF(("Accept failed."));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+ DEBUGF(("Accept would block."));
+ return;
+ }
+ SET_NONBLOCKING(fd);
+ ad->fd = fd;
+ ad->partner = NULL;
+ ad->type = portTypeCommand;
+ ud->partner = NULL;
+ DEBUGF(("Accept successful."));
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
+ driver_output(ad->port, "Aok",3);
+ return;
+ }
+ /* OK, normal data or command port */
+ ASSERT(ud->type >= portTypeCommand);
+#ifdef HARDDEBUG
+ if (ud->type == portTypeData)
+ DEBUGF(("Passive do_recv"));
+#endif
+ do_recv(ud);
+}
+
+static void uds_output(ErlDrvData handle, ErlDrvEvent event)
+{
+ UdsData *ud = (UdsData *) handle;
+ if (ud->type == portTypeConnector) {
+ ud->type = portTypeCommand;
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
+ driver_output(ud->port, "Cok",3);
+ return;
+ }
+ ASSERT(ud->type == portTypeCommand || ud->type == portTypeData);
+ send_out_queue(ud);
+}
+
+static void uds_finish(void)
+{
+ while (first_data != NULL) {
+ do_stop(first_data, 1);
+ }
+}
+
+/*
+** Protocol to control:
+** 'C': Set port in command mode.
+** 'I': Set port in intermidiate mode
+** 'D': Set port in data mode
+** 'N': Get identification number for listen port
+** 'S': Get statistics
+** 'T': Send a tick message
+** 'R': Get creation number of listen socket
+** Answer is one byte status (0 == ok, Other is followed by error as string)
+** followed by data if applicable
+*/
+static int uds_control(ErlDrvData handle, unsigned int command,
+ char* buf, int count, char** res, int res_size)
+{
+/* Local macro to ensure large enough buffer. */
+#define ENSURE(N) \
+ do { \
+ if (res_size < N) { \
+ *res = ALLOC(N); \
+ } \
+ } while(0)
+
+ UdsData *ud = (UdsData *) handle;
+
+ DEBUGF(("Control, type = %d, fd = %d, command = %c", ud->type, ud->fd,
+ (char) command));
+ switch (command) {
+ case 'S':
+ {
+ ENSURE(13);
+ **res = 0;
+ put_packet_length((*res) + 1, ud->received);
+ put_packet_length((*res) + 5, ud->sent);
+ put_packet_length((*res) + 9, driver_sizeq(ud->port));
+ return 13;
+ }
+ case 'C':
+ if (ud->type < portTypeCommand) {
+ return report_control_error(res, res_size, "einval");
+ }
+ ud->type = portTypeCommand;
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
+ ENSURE(1);
+ **res = 0;
+ return 1;
+ case 'I':
+ if (ud->type < portTypeCommand) {
+ return report_control_error(res, res_size, "einval");
+ }
+ ud->type = portTypeIntermediate;
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
+ ENSURE(1);
+ **res = 0;
+ return 1;
+ case 'D':
+ if (ud->type < portTypeCommand) {
+ return report_control_error(res, res_size, "einval");
+ }
+ ud->type = portTypeData;
+ do_recv(ud);
+ ENSURE(1);
+ **res = 0;
+ return 1;
+ case 'N':
+ if (ud->type != portTypeListener) {
+ return report_control_error(res, res_size, "einval");
+ }
+ ENSURE(5);
+ (*res)[0] = 0;
+ put_packet_length((*res) + 1, ud->fd);
+ return 5;
+ case 'T': /* tick */
+ if (ud->type != portTypeData) {
+ return report_control_error(res, res_size, "einval");
+ }
+ do_send(ud,"",0);
+ ENSURE(1);
+ **res = 0;
+ return 1;
+ case 'R':
+ if (ud->type != portTypeListener) {
+ return report_control_error(res, res_size, "einval");
+ }
+ ENSURE(2);
+ (*res)[0] = 0;
+ (*res)[1] = ud->creation;
+ return 2;
+ default:
+ return report_control_error(res, res_size, "einval");
+ }
+#undef ENSURE
+}
+
+static void uds_stop_select(ErlDrvEvent event, void* _)
+{
+ close((int)(long)event);
+}
+
+/*
+**
+** Local helpers
+**
+*/
+
+/*
+** Command implementations
+*/
+static void uds_command_connect(UdsData *ud, char *buff, int bufflen)
+{
+ char *str;
+ int fd;
+ struct sockaddr_un s_un;
+ int length;
+ int res;
+
+ str = ALLOC(25);
+ sprintf(str, "erl%d", (int) getpid()); /* A temporary sufficiently
+ unique name */
+ do_unlink(str);
+ s_un.sun_family = AF_UNIX;
+ strcpy(s_un.sun_path, SOCKET_PATH "/");
+ strcat(s_un.sun_path, str);
+ DEBUGF(("Connect own filename: %s", s_un.sun_path));
+ length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
+ ud->name = str;
+ ud->type = portTypeCommand;
+ if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ DEBUGF(("socket call failed, errno = %d"));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+ ud->fd = fd;
+ if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
+ DEBUGF(("bind call failed, errno = %d",errno));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+ str = ALLOC(bufflen);
+ memcpy(str, buff + 1, bufflen - 1);
+ str[bufflen - 1] = '\0';
+ strcpy(s_un.sun_path, SOCKET_PATH "/");
+ strcat(s_un.sun_path, str);
+ length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
+ DEBUGF(("Connect peer filename: %s", s_un.sun_path));
+ SET_NONBLOCKING(fd);
+ if (connect(fd, (struct sockaddr *) &s_un, length) < 0) {
+ if (errno != EINPROGRESS) {
+ driver_failure_posix(ud->port, errno);
+ } else {
+ DEBUGF(("Connect pending"));
+ ud->type = portTypeConnector;
+ driver_select(ud->port, (ErlDrvEvent) ud->fd,
+ ERL_DRV_WRITE|ERL_DRV_USE, 1);
+ }
+ } else {
+ DEBUGF(("Connect done"));
+ driver_output(ud->port, "Cok", 3);
+ }
+ FREE(str);
+}
+
+static void uds_command_accept(UdsData *ud, char *buff, int bufflen)
+{
+ int listen_no;
+ UdsData *lp;
+
+ if (bufflen < 5) {
+ driver_failure_posix(ud->port, EINVAL);
+ return;
+ }
+
+ listen_no = get_packet_length(buff + 1); /* Same format as
+ packet headers */
+ DEBUGF(("Accept listen_no = %d",listen_no));
+ for (lp = first_data; lp != NULL && lp->fd != listen_no; lp = lp->next)
+ ;
+ if (lp == NULL) {
+ DEBUGF(("Could not find listen port"));
+ driver_failure_posix(ud->port, EINVAL);
+ return;
+ }
+ if (lp->partner != NULL) {
+ DEBUGF(("Listen port busy"));
+ driver_failure_posix(ud->port, EADDRINUSE);
+ return;
+ }
+ lp->partner = ud;
+ ud->partner = lp;
+ ud->type = portTypeAcceptor;
+ driver_select(lp->port,(ErlDrvEvent) lp->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
+ /* Silent, answer will be sent in input routine */
+}
+
+static void uds_command_listen(UdsData *ud, char *buff, int bufflen)
+{
+ char *str;
+ int fd;
+ struct sockaddr_un s_un;
+ int length;
+ int res;
+ UdsData *tmp;
+ Byte creation;
+
+ str = ALLOC(bufflen);
+ memcpy(str, buff + 1,bufflen - 1);
+ str[bufflen - 1] = '\0';
+
+ /*
+ ** Before trying lockfiles etc, we need to assure that our own process is
+ ** not using the filename. Advisory locks can be recursive in one process.
+ */
+ for(tmp = first_data; tmp != NULL; tmp = tmp->next) {
+ if (tmp->name != NULL && strcmp(str, tmp->name) == 0) {
+ driver_failure_posix(ud->port, EADDRINUSE);
+ FREE(str);
+ return;
+ }
+ }
+
+ if ((fd = try_lock(str, &creation)) < 0) {
+ driver_failure_posix(ud->port, EADDRINUSE);
+ FREE(str);
+ return;
+ }
+ s_un.sun_family = AF_UNIX;
+ strcpy(s_un.sun_path, SOCKET_PATH "/");
+ strcat(s_un.sun_path, str);
+ length = sizeof(s_un.sun_family) + strlen(s_un.sun_path);
+ ud->name = str;
+ ud->type = portTypeListener;
+ ud->lockfd = fd;
+ ud->creation = creation;
+ if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ DEBUGF(("socket call failed, errno = %d"));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+ SET_NONBLOCKING(fd);
+ ud->fd = fd;
+ do_unlink(str);
+ DEBUGF(("Listen filename: %s", s_un.sun_path));
+ if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) {
+ DEBUGF(("bind call failed, errno = %d",errno));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+
+ if ((res = listen(fd, 5)) < 0) {
+ DEBUGF(("listen call failed, errno = %d"));
+ driver_failure_posix(ud->port, errno);
+ return;
+ }
+ driver_output(ud->port, "Lok", 3);
+}
+
+/*
+** Input/output/stop helpers
+*/
+static void do_stop(UdsData *ud, int shutting_down)
+{
+ UdsData **tmp;
+
+ DEBUGF(("Cleaning up, type = %d, fd = %d, lockfd = %d", ud->type,
+ ud->fd, ud->lockfd));
+ for (tmp = &first_data; *tmp != NULL && *tmp != ud; tmp = &((*tmp)->next))
+ ;
+ ASSERT(*tmp != NULL);
+ *tmp = (*tmp)->next;
+ if (ud->buffer != NULL) {
+ FREE(ud->buffer);
+ }
+ if (ud->fd >= 0) {
+ driver_select(ud->port, (ErlDrvEvent) ud->fd,
+ ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE, 0);
+ }
+ if (ud->name) {
+ do_unlink(ud->name);
+ FREE(ud->name);
+ }
+ if (ud->lockfd >= 0) {
+ ASSERT(ud->type == portTypeListener);
+ close(ud->lockfd); /* the lock will be released */
+ /* But leave the file there for the creation counter... */
+ }
+ if (!shutting_down) { /* Dont bother if the driver is shutting down. */
+ if (ud->partner != NULL) {
+ if (ud->type == portTypeAcceptor) {
+ UdsData *listener = ud->partner;
+ listener->partner = NULL;
+ driver_select(listener->port, (ErlDrvEvent) listener->fd,
+ ERL_DRV_READ, 0);
+ } else {
+ UdsData *acceptor = ud->partner;
+ ASSERT(ud->type == portTypeListener);
+ acceptor->partner = NULL;
+ driver_failure_eof(acceptor->port);
+ }
+ }
+ }
+ FREE(ud);
+}
+
+/*
+** Actually send the data
+*/
+static void do_send(UdsData *ud, char *buff, int bufflen)
+{
+ char header[4];
+ int written;
+ SysIOVec iov[2];
+ ErlIOVec eio;
+ ErlDrvBinary *binv[] = {NULL,NULL};
+
+ put_packet_length(header, bufflen);
+ DEBUGF(("Write packet header %u,%u,%u,%u.", (Word) header[0],
+ (Word) header[1], (Word) header[2],(Word) header[3]));
+ iov[0].iov_base = (char *) header;
+ iov[0].iov_len = 4;
+ iov[1].iov_base = buff;
+ iov[1].iov_len = bufflen;
+ eio.iov = iov;
+ eio.binv = binv;
+ eio.vsize = 2;
+ eio.size = bufflen + 4;
+ written = 0;
+ if (driver_sizeq(ud->port) == 0) {
+ if ((written = writev(ud->fd, iov, 2)) == eio.size) {
+ ud->sent += written;
+ if (ud->type == portTypeCommand) {
+ driver_output(ud->port, "Sok", 3);
+ }
+ DEBUGF(("Wrote all %d bytes immediately.",written));
+ return;
+ } else if (written < 0) {
+ if (errno != EWOULDBLOCK) {
+ driver_failure_eof(ud->port);
+ return;
+ } else {
+ written = 0;
+ }
+ } else {
+ ud->sent += written;
+ }
+ DEBUGF(("Wrote %d bytes immediately.",written));
+ /* Enqueue remaining */
+ }
+ driver_enqv(ud->port, &eio, written);
+ DEBUGF(("Sending output queue."));
+ send_out_queue(ud);
+}
+
+static void do_recv(UdsData *ud)
+{
+ int res;
+ char *ibuf;
+ ASSERT_NONBLOCK(ud->fd);
+ DEBUGF(("do_recv called, type = %d", ud->type));
+ for(;;) {
+ if ((res = buffered_read_package(ud,&ibuf)) < 0) {
+ if (res == NORMAL_READ_FAILURE) {
+ DEBUGF(("do_recv normal read failed"));
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
+ } else {
+ DEBUGF(("do_recv fatal read failed (%d) (%d)",errno, res));
+ driver_failure_eof(ud->port);
+ }
+ return;
+ }
+ DEBUGF(("do_recv got package, port type = %d", ud->type));
+ /* Got a package */
+ if (ud->type == portTypeCommand) {
+ ibuf[-1] = 'R'; /* There is always room for a single byte opcode
+ before the actual buffer (where the packet
+ header was) */
+ driver_output(ud->port,ibuf - 1, res + 1);
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0);
+ return;
+ } else {
+ ibuf[-1] = DIST_MAGIC_RECV_TAG; /* XXX */
+ driver_output(ud->port,ibuf - 1, res + 1);
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1);
+ }
+ }
+}
+
+
+/*
+** Report control error, helper for error messages from control
+*/
+static int report_control_error(char **buffer, int buff_len,
+ char *error_message)
+{
+ int elen = strlen(error_message);
+ if (elen + 1 < buff_len) {
+ *buffer = ALLOC(elen + 1);
+ }
+ **buffer = 1;
+ memcpy((*buffer) + 1, error_message, elen);
+ return elen + 1;
+}
+
+/*
+** Lower level I/O helpers
+*/
+static int send_out_queue(UdsData *ud)
+{
+ ASSERT_NONBLOCK(ud->fd);
+ for(;;) {
+ int vlen;
+ SysIOVec *tmp = driver_peekq(ud->port, &vlen);
+ int wrote;
+ if (tmp == NULL) {
+ DEBUGF(("Write queue empty."));
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0);
+ if (ud->type == portTypeCommand) {
+ driver_output(ud->port, "Sok", 3);
+ }
+ return 0;
+ }
+ if (vlen > IO_VECTOR_MAX) {
+ vlen = IO_VECTOR_MAX;
+ }
+ DEBUGF(("Trying to writev %d vectors", vlen));
+#ifdef HARDDEBUG
+ {
+ int i;
+ for (i = 0; i < vlen; ++i) {
+ DEBUGF(("Buffer %d: length %d", i, tmp[i].iov_len));
+ }
+ }
+#endif
+ if ((wrote = writev(ud->fd, tmp, vlen)) < 0) {
+ if (errno == EWOULDBLOCK) {
+ DEBUGF(("Write failed normal."));
+ driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
+ return 0;
+ } else {
+ DEBUGF(("Write failed fatal (%d).", errno));
+ driver_failure_eof(ud->port);
+ return -1;
+ }
+ }
+ driver_deq(ud->port, wrote);
+ ud->sent += wrote;
+ DEBUGF(("Wrote %d bytes of data.",wrote));
+ }
+}
+
+static int buffered_read_package(UdsData *ud, char **result)
+{
+ int res;
+ int data_size;
+
+ if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH) {
+ /* The header is not read yet */
+ DEBUGF(("Header not read yet"));
+ if ((res = read_at_least(ud, ud->header_pos + HEADER_LENGTH -
+ ud->buffer_pos)) < 0) {
+ DEBUGF(("Header read failed"));
+ return res;
+ }
+ }
+ DEBUGF(("Header is read"));
+ /* We have at least the header read */
+ data_size = get_packet_length((char *) ud->buffer + ud->header_pos);
+ DEBUGF(("Input packet size = %d", data_size));
+ if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH + data_size) {
+ /* We need to read more */
+ DEBUGF(("Need to read more (bufferpos %d, want %d)", ud->buffer_pos,
+ ud->header_pos + HEADER_LENGTH + data_size));
+ if ((res = read_at_least(ud,
+ ud->header_pos + HEADER_LENGTH +
+ data_size - ud->buffer_pos)) < 0) {
+ DEBUGF(("Data read failed"));
+ return res;
+ }
+ }
+ DEBUGF(("Data is completely read"));
+ *result = (char *) ud->buffer + ud->header_pos + HEADER_LENGTH;
+ ud->header_pos += HEADER_LENGTH + data_size;
+ return data_size;
+}
+
+static int read_at_least(UdsData *ud, int num)
+{
+ int got;
+ if (ud->buffer_pos + num > ud->buffer_size) {
+ /* No place in the buffer, try to pack it */
+ if (ud->header_pos > 0) {
+ int offset = ud->header_pos;
+ DEBUGF(("Packing buffer, buffer_pos was %d, buffer_size was %d "
+ "offset %d num %d header_pos %d.",
+ ud->buffer_pos, ud->buffer_size,
+ offset, num, ud->header_pos));
+ memmove(ud->buffer, ud->buffer + ud->header_pos,
+ ud->buffer_pos - ud->header_pos);
+ ud->buffer_pos -= offset;
+ ud->header_pos -= offset;
+ }
+ /* The buffer is packed, look for space again and reallocate if
+ needed */
+ if (ud->buffer_pos + num > ud->buffer_size) {
+ /* Let's grow in chunks of 256 */
+ ud->buffer_size = (((ud->buffer_pos + num) /
+ CHUNK_SIZE) + 1) * CHUNK_SIZE;
+ DEBUGF(("New buffer size %d.",ud->buffer_size));
+ /* We will always keep one extra byte before the buffer to
+ allow insertion of an opcode */
+ if (!ud->buffer) {
+ ud->buffer = ALLOC(ud->buffer_size);
+ } else {
+ ud->buffer = REALLOC(ud->buffer, ud->buffer_size);
+ }
+ }
+ }
+ /* OK, now we have a large enough buffer, try to read into it */
+ if ((got = read(ud->fd, ud->buffer + ud->buffer_pos,
+ ud->buffer_size - ud->buffer_pos)) < 0) {
+ /* It failed, the question is why... */
+ if (errno == EAGAIN) {
+ return NORMAL_READ_FAILURE;
+ }
+ return SEVERE_READ_FAILURE;
+ } else if (got == 0) {
+ return EOF_READ_FAILURE;
+ }
+ DEBUGF(("Got %d bytes.", got));
+ ud->received += got;
+ ud->buffer_pos += got;
+ /* So, we got some bytes, but enough ? */
+ if (got < num) {
+ return NORMAL_READ_FAILURE;
+ }
+ return 0;
+}
+
+static int get_packet_length(char *b)
+{
+ Byte *u = (Byte *) b;
+ int x = (((Word) u[0]) << 24) | (((Word) u[1]) << 16) |
+ (((Word) u[2]) << 8) | ((Word) u[3]);
+ DEBUGF(("Packet length %d.", x));
+ return x;
+}
+
+static void put_packet_length(char *b, int len)
+{
+ Byte *p = (Byte *) b;
+ Word n = (Word) len;
+ p[0] = (n >> 24) & 0xFF;
+ p[1] = (n >> 16) & 0xFF;
+ p[2] = (n >> 8) & 0xFF;
+ p[3] = n & 0xFF;
+}
+
+/*
+** Malloc wrappers
+** Note!
+** The function erl_exit is actually not a pert of the
+** driver interface, but it is very nice to use if one wants to halt
+** with a core and an erlang crash dump.
+*/
+static void *my_malloc(size_t size)
+{
+ void erl_exit(int, char *, ...);
+ void *ptr;
+
+ if ((ptr = driver_alloc(size)) == NULL) {
+ erl_exit(1,"Could not allocate %d bytes of memory",(int) size);
+ }
+ return ptr;
+}
+
+static void *my_realloc(void *ptr, size_t size)
+{
+ void erl_exit(int, char *, ...);
+ void *nptr;
+ if ((nptr = driver_realloc(ptr, size)) == NULL) {
+ erl_exit(1,"Could not reallocate %d bytes of memory",(int) size);
+ }
+ return nptr;
+}
+
+
+/*
+** Socket file handling helpers
+*/
+
+/*
+** Check that directory exists, create if not (only works for one level)
+*/
+static int ensure_dir(char *path)
+{
+ if (mkdir(path,0777) != 0 && errno != EEXIST) {
+ return -1;
+ }
+ return 0;
+}
+
+/*
+** Try to open a lock file and lock the first byte write-only (advisory)
+** return the file descriptor if succesful, otherwise -1 (<0).
+*/
+static int try_lock(char *sockname, Byte *p_creation)
+{
+ char *lockname;
+ int lockfd;
+ struct flock fl;
+ Byte creation;
+
+ lockname = ALLOC(strlen(SOCKET_PATH)+1+strlen(sockname)+
+ strlen(LOCK_SUFFIX)+1);
+ sprintf(lockname,SOCKET_PATH "/%s" LOCK_SUFFIX, sockname);
+ DEBUGF(("lockname = %s", lockname));
+ if (ensure_dir(SOCKET_PATH) != 0) {
+ DEBUGF(("ensure_dir failed, errno = %d", errno));
+ FREE(lockname);
+ return -1;
+ }
+ if ((lockfd = open(lockname, O_RDWR | O_CREAT, 0666)) < 0) {
+ DEBUGF(("open failed, errno = %d", errno));
+ FREE(lockname);
+ return -1;
+ }
+ FREE(lockname);
+ memset(&fl,0,sizeof(fl));
+ fl.l_type = F_WRLCK;
+ fl.l_whence = SEEK_SET;
+ fl.l_start = 0;
+ fl.l_len = 1;
+ if (fcntl(lockfd, F_SETLK, &fl) < 0) {
+ DEBUGF(("fcntl failed, errno = %d", errno));
+ close(lockfd);
+ return -1;
+ }
+ /* OK, check for creation and update */
+ if (read(lockfd, &creation, 1) < 1) {
+ creation = 0;
+ } else {
+ creation = (creation + 1) % 4;
+ }
+ lseek(lockfd, 0, SEEK_SET);
+ write(lockfd, &creation, 1);
+ fsync(lockfd); /* This could be concidered dangerous (blocking) */
+ *p_creation = creation;
+ return lockfd;
+}
+
+static void do_unlink(char *name)
+{
+ char buff[100];
+ char *str = buff;
+ int len = strlen(SOCKET_PATH) + 1 + strlen(name) + 1;
+
+ if (len > 100) {
+ str = ALLOC(len);
+ }
+ sprintf(str,SOCKET_PATH "/%s",name);
+ unlink(str);
+ if (str != buff) {
+ FREE(str);
+ }
+}
+