From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/kernel/examples/Makefile | 54 ++ lib/kernel/examples/uds_dist/c_src/Makefile | 32 + lib/kernel/examples/uds_dist/c_src/uds_drv.c | 1065 +++++++++++++++++++++++ lib/kernel/examples/uds_dist/ebin/.gitignore | 0 lib/kernel/examples/uds_dist/priv/.gitignore | 0 lib/kernel/examples/uds_dist/src/Makefile | 27 + lib/kernel/examples/uds_dist/src/uds.erl | 166 ++++ lib/kernel/examples/uds_dist/src/uds_dist.app | 7 + lib/kernel/examples/uds_dist/src/uds_dist.erl | 304 +++++++ lib/kernel/examples/uds_dist/src/uds_server.erl | 156 ++++ 10 files changed, 1811 insertions(+) create mode 100644 lib/kernel/examples/Makefile create mode 100644 lib/kernel/examples/uds_dist/c_src/Makefile create mode 100644 lib/kernel/examples/uds_dist/c_src/uds_drv.c create mode 100644 lib/kernel/examples/uds_dist/ebin/.gitignore create mode 100644 lib/kernel/examples/uds_dist/priv/.gitignore create mode 100644 lib/kernel/examples/uds_dist/src/Makefile create mode 100644 lib/kernel/examples/uds_dist/src/uds.erl create mode 100644 lib/kernel/examples/uds_dist/src/uds_dist.app create mode 100644 lib/kernel/examples/uds_dist/src/uds_dist.erl create mode 100644 lib/kernel/examples/uds_dist/src/uds_server.erl (limited to 'lib/kernel/examples') diff --git a/lib/kernel/examples/Makefile b/lib/kernel/examples/Makefile new file mode 100644 index 0000000000..fb27f8d438 --- /dev/null +++ b/lib/kernel/examples/Makefile @@ -0,0 +1,54 @@ +# ``The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved via the world wide web at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# The Initial Developer of the Original Code is Ericsson Utvecklings AB. +# Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +# AB. All Rights Reserved.'' +# +# $Id$ +# +include $(ERL_TOP)/make/target.mk + +include $(ERL_TOP)/make/$(TARGET)/otp.mk + +# ---------------------------------------------------- +# Common Macros +# ---------------------------------------------------- +include ../vsn.mk + +# ---------------------------------------------------- +# Targets +# ---------------------------------------------------- + +debug opt: + +clean: + +docs: + +# ---------------------------------------------------- +# Release Target +# ---------------------------------------------------- +include $(ERL_TOP)/make/otp_release_targets.mk + +RELSYSDIR = $(RELEASE_PATH)/lib/kernel-$(KERNEL_VSN)/examples + +# Pack and install the complete directory structure from +# here (CWD) and down, for all examples. + +EXAMPLES = uds_dist + +release_spec: + $(INSTALL_DIR) $(RELSYSDIR) + tar cf - $(EXAMPLES) | \ + (cd $(RELSYSDIR); tar xf - ; chmod -R ug+w $(EXAMPLES) ) + +release_docs_spec: diff --git a/lib/kernel/examples/uds_dist/c_src/Makefile b/lib/kernel/examples/uds_dist/c_src/Makefile new file mode 100644 index 0000000000..de3a3730c9 --- /dev/null +++ b/lib/kernel/examples/uds_dist/c_src/Makefile @@ -0,0 +1,32 @@ +# Example makefile, Solaris only +CC = gcc +CFLAGS=-O3 -g -fPIC -pedantic -Wall -I$(ERL_INCLUDE) +LD=ld +RM_RF=rm -rf +INSTALL_DIR=/usr/ucb/install -d +LIBRARIES=-lc -ltermlib -lresolv -ldl -lm -lsocket -lnsl +TARGET_DIR=../priv/lib +OBJECT_DIR=../priv/obj +SHLIB_EXT=.so +OBJ_EXT=.o +TARGET_NAME=uds_drv$(SHLIB_EXT) +TARGET=$(TARGET_DIR)/$(TARGET_NAME) +OBJECTS=$(OBJECT_DIR)/uds_drv$(OBJ_EXT) + +LDFLAGS=-G -h $(TARGET_NAME) + +# Works if building in open source source tree +ERL_INCLUDE=$(ERL_TOP)/erts/emulator/beam + +opt: setup $(OBJECTS) + $(LD) $(LDFLAGS) $(OBJECTS) -o $(TARGET) $(LIBRARIES) + +setup: + $(INSTALL_DIR) $(TARGET_DIR) + $(INSTALL_DIR) $(OBJECT_DIR) + +$(OBJECT_DIR)/%.o: %.c + $(CC) $(CFLAGS) -c -o $@ $< + +clean: + $(RM_RF) $(TARGET_DIR) $(OBJECT_DIR) diff --git a/lib/kernel/examples/uds_dist/c_src/uds_drv.c b/lib/kernel/examples/uds_dist/c_src/uds_drv.c new file mode 100644 index 0000000000..fb10a375f4 --- /dev/null +++ b/lib/kernel/examples/uds_dist/c_src/uds_drv.c @@ -0,0 +1,1065 @@ +/* ``The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved via the world wide web at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * The Initial Developer of the Original Code is Ericsson Utvecklings AB. + * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings + * AB. All Rights Reserved.'' + * + * $Id$ + */ + +/* + * Purpose: Special purpouse Unix domain socket driver for distribution. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define HAVE_UIO_H +#include "erl_driver.h" + +#define DEBUG +/*#define HARDDEBUG 1*/ +/* +** Some constants/macros +*/ + +#ifdef HARDDEBUG +#define DEBUGF(P) debugf P +#include +static void debugf(char *str, ...) +{ + va_list ap; + va_start(ap,str); + fprintf(stderr,"Uds_drv debug: "); + vfprintf(stderr,str, ap); + fprintf(stderr,"\r\n"); + va_end(ap); +} +#ifndef DEBUG +#define DEBUG 1 +#endif +#else +#define DEBUGF(P) +#endif + + +#ifdef DEBUG +#define ASSERT(X) \ +do { \ + if (!(X)) { \ + fprintf(stderr,"Assertion (%s) failed at line %d file %s\r\n", #X, \ + __LINE__, __FILE__); \ + exit(1); \ + } \ +} while(0) +#define ASSERT_NONBLOCK(FD) ASSERT(fcntl((FD), F_GETFL, 0) & O_NONBLOCK) +#else +#define ASSERT(X) +#define ASSERT_NONBLOCK(FD) +#endif + + + +#define SET_NONBLOCKING(FD) \ + fcntl((FD), F_SETFL, \ + fcntl((FD), F_GETFL, 0) | O_NONBLOCK) + +#define ALLOC(X) my_malloc(X) +#define REALLOC(P,X) my_realloc(P,X) +#define FREE(X) driver_free(X) + +#define CHUNK_SIZE 256 + +#define DIST_MAGIC_RECV_TAG 100 + +/* +** The max length of an I/O vector seams to be impossible to find +** out (?), so this is just a value known to work on solaris. +*/ +#define IO_VECTOR_MAX 16 + +#define SOCKET_PATH "/tmp/erlang" +#define LOCK_SUFFIX ".lock" + +#define NORMAL_READ_FAILURE -1 +#define SEVERE_READ_FAILURE -2 +#define EOF_READ_FAILURE -3 + +/* +** Internal structures +*/ + +#define HEADER_LENGTH 4 + +typedef enum { + portTypeUnknown, /* An uninitialized port */ + portTypeListener, /* A listening port/socket */ + portTypeAcceptor, /* An intermidiate stage when accepting + on a listen port */ + portTypeConnector, /* An intermediate stage when connecting */ + portTypeCommand, /* A connected open port in command mode */ + portTypeIntermediate, /* A connected open port in special half + active mode */ + portTypeData /* A connectec open port in data mode */ +} PortType; + +typedef unsigned char Byte; +typedef unsigned int Word; + +typedef struct uds_data { + int fd; /* File descriptor */ + ErlDrvPort port; /* The port identifier */ + int lockfd; /* The file descriptor for a lock file in + case of listen sockets */ + Byte creation; /* The creation serial derived from the + lockfile */ + PortType type; /* Type of port */ + char *name; /* Short name of socket for unlink */ + Word sent; /* Messages sent */ + Word received; /* Messages received */ + struct uds_data *partner; /* The partner in an accept/listen pair */ + struct uds_data *next; /* Next structure in list */ + + /* The input buffer and it's data */ + int buffer_size; /* The allocated size of the input buffer */ + int buffer_pos; /* Current position in input buffer */ + int header_pos; /* Where the current header is in the + input buffer */ + Byte *buffer; /* The actual input buffer */ +} UdsData; + +/* +** Interface routines +*/ +static ErlDrvData uds_start(ErlDrvPort port, char *buff); +static void uds_stop(ErlDrvData handle); +static void uds_command(ErlDrvData handle, char *buff, int bufflen); +static void uds_input(ErlDrvData handle, ErlDrvEvent event); +static void uds_output(ErlDrvData handle, ErlDrvEvent event); +static void uds_finish(void); +static int uds_control(ErlDrvData handle, unsigned int command, + char* buf, int count, char** res, int res_size); +static void uds_stop_select(ErlDrvEvent event, void*); + +/* +** Local helpers forward declarations +*/ + +static void uds_command_listen(UdsData *ud, char *buff, int bufflen); +static void uds_command_accept(UdsData *ud, char *buff, int bufflen); +static void uds_command_connect(UdsData *ud, char *buff, int bufflen); + +static void do_stop(UdsData *ud, int shutting_down); +static void do_send(UdsData *ud, char *buff, int bufflen); +static void do_recv(UdsData *ud); + +static int report_control_error(char **buffer, int buff_len, + char *error_message); +static int send_out_queue(UdsData *ud); +static int buffered_read_package(UdsData *ud, char **result); +static int read_at_least(UdsData *ud, int num); +static int get_packet_length(char *b); +static void put_packet_length(char *b, int len); +static void *my_malloc(size_t size); +static void *my_realloc(void *optr, size_t size); +static int try_lock(char *sockname, Byte *p_creation); +static int ensure_dir(char *path); +static void do_unlink(char *name); + +/* +** Global data +*/ + +/* The driver entry */ +ErlDrvEntry uds_driver_entry = { + NULL, /* init, N/A */ + uds_start, /* start, called when port is opened */ + uds_stop, /* stop, called when port is closed */ + uds_command, /* output, called when erlang has sent */ + uds_input, /* ready_input, called when input descriptor + ready */ + uds_output, /* ready_output, called when output + descriptor ready */ + "uds_drv", /* char *driver_name, the argument to open_port */ + uds_finish, /* finish, called when unloaded */ + NULL, /* void * that is not used (BC) */ + uds_control, /* control, port_control callback */ + NULL, /* timeout, called on timeouts */ + NULL, /* outputv, vector output interface */ + NULL, /* ready_async */ + NULL, /* flush */ + NULL, /* call */ + NULL, /* event */ + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + 0, /* ERL_DRV_FLAGs */ + NULL, + NULL, /* process_exit */ + uds_stop_select +}; + +/* Beginning of linked list of ports */ +static UdsData *first_data; + +/* +** +** Driver interface routines +** +*/ + +/* +** Driver initialization routine +*/ +DRIVER_INIT(uds_drv) +{ + first_data = NULL; + return &uds_driver_entry; +} + +/* +** A port is opened, we need no information whatsoever about the socket +** at this stage. +*/ +static ErlDrvData uds_start(ErlDrvPort port, char *buff) +{ + UdsData *ud; + + ud = ALLOC(sizeof(UdsData)); + ud->fd = -1; + ud->lockfd = -1; + ud->creation = 0; + ud->port = port; + ud->type = portTypeUnknown; + ud->name = NULL; + ud->buffer_size = 0; + ud->buffer_pos = 0; + ud->header_pos = 0; + ud->buffer = NULL; + ud->sent = 0; + ud->received = 0; + ud->partner = NULL; + ud->next = first_data; + first_data = ud; + + return((ErlDrvData) ud); +} + +/* +** Close the socket/port and free up +*/ +static void uds_stop(ErlDrvData handle) +{ + do_stop((UdsData *) handle, 0); +} + +/* +** Command interface, operates in two modes, Command mode and data mode. +** Mode is shifted with the port_control function. +** Command mode protocol: +** 'L': Lock and listen on socket. +** 'A': Accept from the port referenced by the +** "listennumber" +** 'C': Connect to the socket named +** 'S': Send the data +** 'R': Receive one packet of data +** Data mode protocol: +** Send anything that arrives (no opcodes/skip opcodes). +*/ + +static void uds_command(ErlDrvData handle, char *buff, int bufflen) +{ + UdsData *ud = (UdsData *) handle; + + if (ud->type == portTypeData || ud->type == portTypeIntermediate) { + DEBUGF(("Passive do_send %d",bufflen)); + do_send(ud, buff + 1, bufflen - 1); /* XXX */ + return; + } + if (bufflen == 0) { + return; + } + switch (*buff) { + case 'L': + if (ud->type != portTypeUnknown) { + driver_failure_posix(ud->port, ENOTSUP); + return; + } + uds_command_listen(ud,buff,bufflen); + return; + case 'A': + if (ud->type != portTypeUnknown) { + driver_failure_posix(ud->port, ENOTSUP); + return; + } + uds_command_accept(ud,buff,bufflen); + return; + case 'C': + if (ud->type != portTypeUnknown) { + driver_failure_posix(ud->port, ENOTSUP); + return; + } + uds_command_connect(ud,buff,bufflen); + return; + case 'S': + if (ud->type != portTypeCommand) { + driver_failure_posix(ud->port, ENOTSUP); + return; + } + do_send(ud, buff + 1, bufflen - 1); + return; + case 'R': + if (ud->type != portTypeCommand) { + driver_failure_posix(ud->port, ENOTSUP); + return; + } + do_recv(ud); + return; + default: + ASSERT(0); + return; + } +} + +static void uds_input(ErlDrvData handle, ErlDrvEvent event) +{ + UdsData *ud = (UdsData *) handle; + + DEBUGF(("In uds_input type = %d",ud->type)); + if (ud->type == portTypeListener) { + UdsData *ad = ud->partner; + struct sockaddr_un peer; + int pl = sizeof(struct sockaddr_un); + int fd; + + ASSERT(ad != NULL); + if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) { + if (errno != EWOULDBLOCK) { + DEBUGF(("Accept failed.")); + driver_failure_posix(ud->port, errno); + return; + } + DEBUGF(("Accept would block.")); + return; + } + SET_NONBLOCKING(fd); + ad->fd = fd; + ad->partner = NULL; + ad->type = portTypeCommand; + ud->partner = NULL; + DEBUGF(("Accept successful.")); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0); + driver_output(ad->port, "Aok",3); + return; + } + /* OK, normal data or command port */ + ASSERT(ud->type >= portTypeCommand); +#ifdef HARDDEBUG + if (ud->type == portTypeData) + DEBUGF(("Passive do_recv")); +#endif + do_recv(ud); +} + +static void uds_output(ErlDrvData handle, ErlDrvEvent event) +{ + UdsData *ud = (UdsData *) handle; + if (ud->type == portTypeConnector) { + ud->type = portTypeCommand; + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0); + driver_output(ud->port, "Cok",3); + return; + } + ASSERT(ud->type == portTypeCommand || ud->type == portTypeData); + send_out_queue(ud); +} + +static void uds_finish(void) +{ + while (first_data != NULL) { + do_stop(first_data, 1); + } +} + +/* +** Protocol to control: +** 'C': Set port in command mode. +** 'I': Set port in intermidiate mode +** 'D': Set port in data mode +** 'N': Get identification number for listen port +** 'S': Get statistics +** 'T': Send a tick message +** 'R': Get creation number of listen socket +** Answer is one byte status (0 == ok, Other is followed by error as string) +** followed by data if applicable +*/ +static int uds_control(ErlDrvData handle, unsigned int command, + char* buf, int count, char** res, int res_size) +{ +/* Local macro to ensure large enough buffer. */ +#define ENSURE(N) \ + do { \ + if (res_size < N) { \ + *res = ALLOC(N); \ + } \ + } while(0) + + UdsData *ud = (UdsData *) handle; + + DEBUGF(("Control, type = %d, fd = %d, command = %c", ud->type, ud->fd, + (char) command)); + switch (command) { + case 'S': + { + ENSURE(13); + **res = 0; + put_packet_length((*res) + 1, ud->received); + put_packet_length((*res) + 5, ud->sent); + put_packet_length((*res) + 9, driver_sizeq(ud->port)); + return 13; + } + case 'C': + if (ud->type < portTypeCommand) { + return report_control_error(res, res_size, "einval"); + } + ud->type = portTypeCommand; + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0); + ENSURE(1); + **res = 0; + return 1; + case 'I': + if (ud->type < portTypeCommand) { + return report_control_error(res, res_size, "einval"); + } + ud->type = portTypeIntermediate; + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0); + ENSURE(1); + **res = 0; + return 1; + case 'D': + if (ud->type < portTypeCommand) { + return report_control_error(res, res_size, "einval"); + } + ud->type = portTypeData; + do_recv(ud); + ENSURE(1); + **res = 0; + return 1; + case 'N': + if (ud->type != portTypeListener) { + return report_control_error(res, res_size, "einval"); + } + ENSURE(5); + (*res)[0] = 0; + put_packet_length((*res) + 1, ud->fd); + return 5; + case 'T': /* tick */ + if (ud->type != portTypeData) { + return report_control_error(res, res_size, "einval"); + } + do_send(ud,"",0); + ENSURE(1); + **res = 0; + return 1; + case 'R': + if (ud->type != portTypeListener) { + return report_control_error(res, res_size, "einval"); + } + ENSURE(2); + (*res)[0] = 0; + (*res)[1] = ud->creation; + return 2; + default: + return report_control_error(res, res_size, "einval"); + } +#undef ENSURE +} + +static void uds_stop_select(ErlDrvEvent event, void* _) +{ + close((int)(long)event); +} + +/* +** +** Local helpers +** +*/ + +/* +** Command implementations +*/ +static void uds_command_connect(UdsData *ud, char *buff, int bufflen) +{ + char *str; + int fd; + struct sockaddr_un s_un; + int length; + int res; + + str = ALLOC(25); + sprintf(str, "erl%d", (int) getpid()); /* A temporary sufficiently + unique name */ + do_unlink(str); + s_un.sun_family = AF_UNIX; + strcpy(s_un.sun_path, SOCKET_PATH "/"); + strcat(s_un.sun_path, str); + DEBUGF(("Connect own filename: %s", s_un.sun_path)); + length = sizeof(s_un.sun_family) + strlen(s_un.sun_path); + ud->name = str; + ud->type = portTypeCommand; + if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + DEBUGF(("socket call failed, errno = %d")); + driver_failure_posix(ud->port, errno); + return; + } + ud->fd = fd; + if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) { + DEBUGF(("bind call failed, errno = %d",errno)); + driver_failure_posix(ud->port, errno); + return; + } + str = ALLOC(bufflen); + memcpy(str, buff + 1, bufflen - 1); + str[bufflen - 1] = '\0'; + strcpy(s_un.sun_path, SOCKET_PATH "/"); + strcat(s_un.sun_path, str); + length = sizeof(s_un.sun_family) + strlen(s_un.sun_path); + DEBUGF(("Connect peer filename: %s", s_un.sun_path)); + SET_NONBLOCKING(fd); + if (connect(fd, (struct sockaddr *) &s_un, length) < 0) { + if (errno != EINPROGRESS) { + driver_failure_posix(ud->port, errno); + } else { + DEBUGF(("Connect pending")); + ud->type = portTypeConnector; + driver_select(ud->port, (ErlDrvEvent) ud->fd, + ERL_DRV_WRITE|ERL_DRV_USE, 1); + } + } else { + DEBUGF(("Connect done")); + driver_output(ud->port, "Cok", 3); + } + FREE(str); +} + +static void uds_command_accept(UdsData *ud, char *buff, int bufflen) +{ + int listen_no; + UdsData *lp; + + if (bufflen < 5) { + driver_failure_posix(ud->port, EINVAL); + return; + } + + listen_no = get_packet_length(buff + 1); /* Same format as + packet headers */ + DEBUGF(("Accept listen_no = %d",listen_no)); + for (lp = first_data; lp != NULL && lp->fd != listen_no; lp = lp->next) + ; + if (lp == NULL) { + DEBUGF(("Could not find listen port")); + driver_failure_posix(ud->port, EINVAL); + return; + } + if (lp->partner != NULL) { + DEBUGF(("Listen port busy")); + driver_failure_posix(ud->port, EADDRINUSE); + return; + } + lp->partner = ud; + ud->partner = lp; + ud->type = portTypeAcceptor; + driver_select(lp->port,(ErlDrvEvent) lp->fd, ERL_DRV_READ|ERL_DRV_USE, 1); + /* Silent, answer will be sent in input routine */ +} + +static void uds_command_listen(UdsData *ud, char *buff, int bufflen) +{ + char *str; + int fd; + struct sockaddr_un s_un; + int length; + int res; + UdsData *tmp; + Byte creation; + + str = ALLOC(bufflen); + memcpy(str, buff + 1,bufflen - 1); + str[bufflen - 1] = '\0'; + + /* + ** Before trying lockfiles etc, we need to assure that our own process is + ** not using the filename. Advisory locks can be recursive in one process. + */ + for(tmp = first_data; tmp != NULL; tmp = tmp->next) { + if (tmp->name != NULL && strcmp(str, tmp->name) == 0) { + driver_failure_posix(ud->port, EADDRINUSE); + FREE(str); + return; + } + } + + if ((fd = try_lock(str, &creation)) < 0) { + driver_failure_posix(ud->port, EADDRINUSE); + FREE(str); + return; + } + s_un.sun_family = AF_UNIX; + strcpy(s_un.sun_path, SOCKET_PATH "/"); + strcat(s_un.sun_path, str); + length = sizeof(s_un.sun_family) + strlen(s_un.sun_path); + ud->name = str; + ud->type = portTypeListener; + ud->lockfd = fd; + ud->creation = creation; + if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + DEBUGF(("socket call failed, errno = %d")); + driver_failure_posix(ud->port, errno); + return; + } + SET_NONBLOCKING(fd); + ud->fd = fd; + do_unlink(str); + DEBUGF(("Listen filename: %s", s_un.sun_path)); + if ((res = bind(fd, (struct sockaddr *) &s_un, length)) < 0) { + DEBUGF(("bind call failed, errno = %d",errno)); + driver_failure_posix(ud->port, errno); + return; + } + + if ((res = listen(fd, 5)) < 0) { + DEBUGF(("listen call failed, errno = %d")); + driver_failure_posix(ud->port, errno); + return; + } + driver_output(ud->port, "Lok", 3); +} + +/* +** Input/output/stop helpers +*/ +static void do_stop(UdsData *ud, int shutting_down) +{ + UdsData **tmp; + + DEBUGF(("Cleaning up, type = %d, fd = %d, lockfd = %d", ud->type, + ud->fd, ud->lockfd)); + for (tmp = &first_data; *tmp != NULL && *tmp != ud; tmp = &((*tmp)->next)) + ; + ASSERT(*tmp != NULL); + *tmp = (*tmp)->next; + if (ud->buffer != NULL) { + FREE(ud->buffer); + } + if (ud->fd >= 0) { + driver_select(ud->port, (ErlDrvEvent) ud->fd, + ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE, 0); + } + if (ud->name) { + do_unlink(ud->name); + FREE(ud->name); + } + if (ud->lockfd >= 0) { + ASSERT(ud->type == portTypeListener); + close(ud->lockfd); /* the lock will be released */ + /* But leave the file there for the creation counter... */ + } + if (!shutting_down) { /* Dont bother if the driver is shutting down. */ + if (ud->partner != NULL) { + if (ud->type == portTypeAcceptor) { + UdsData *listener = ud->partner; + listener->partner = NULL; + driver_select(listener->port, (ErlDrvEvent) listener->fd, + ERL_DRV_READ, 0); + } else { + UdsData *acceptor = ud->partner; + ASSERT(ud->type == portTypeListener); + acceptor->partner = NULL; + driver_failure_eof(acceptor->port); + } + } + } + FREE(ud); +} + +/* +** Actually send the data +*/ +static void do_send(UdsData *ud, char *buff, int bufflen) +{ + char header[4]; + int written; + SysIOVec iov[2]; + ErlIOVec eio; + ErlDrvBinary *binv[] = {NULL,NULL}; + + put_packet_length(header, bufflen); + DEBUGF(("Write packet header %u,%u,%u,%u.", (Word) header[0], + (Word) header[1], (Word) header[2],(Word) header[3])); + iov[0].iov_base = (char *) header; + iov[0].iov_len = 4; + iov[1].iov_base = buff; + iov[1].iov_len = bufflen; + eio.iov = iov; + eio.binv = binv; + eio.vsize = 2; + eio.size = bufflen + 4; + written = 0; + if (driver_sizeq(ud->port) == 0) { + if ((written = writev(ud->fd, iov, 2)) == eio.size) { + ud->sent += written; + if (ud->type == portTypeCommand) { + driver_output(ud->port, "Sok", 3); + } + DEBUGF(("Wrote all %d bytes immediately.",written)); + return; + } else if (written < 0) { + if (errno != EWOULDBLOCK) { + driver_failure_eof(ud->port); + return; + } else { + written = 0; + } + } else { + ud->sent += written; + } + DEBUGF(("Wrote %d bytes immediately.",written)); + /* Enqueue remaining */ + } + driver_enqv(ud->port, &eio, written); + DEBUGF(("Sending output queue.")); + send_out_queue(ud); +} + +static void do_recv(UdsData *ud) +{ + int res; + char *ibuf; + ASSERT_NONBLOCK(ud->fd); + DEBUGF(("do_recv called, type = %d", ud->type)); + for(;;) { + if ((res = buffered_read_package(ud,&ibuf)) < 0) { + if (res == NORMAL_READ_FAILURE) { + DEBUGF(("do_recv normal read failed")); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1); + } else { + DEBUGF(("do_recv fatal read failed (%d) (%d)",errno, res)); + driver_failure_eof(ud->port); + } + return; + } + DEBUGF(("do_recv got package, port type = %d", ud->type)); + /* Got a package */ + if (ud->type == portTypeCommand) { + ibuf[-1] = 'R'; /* There is always room for a single byte opcode + before the actual buffer (where the packet + header was) */ + driver_output(ud->port,ibuf - 1, res + 1); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ, 0); + return; + } else { + ibuf[-1] = DIST_MAGIC_RECV_TAG; /* XXX */ + driver_output(ud->port,ibuf - 1, res + 1); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_READ|ERL_DRV_USE, 1); + } + } +} + + +/* +** Report control error, helper for error messages from control +*/ +static int report_control_error(char **buffer, int buff_len, + char *error_message) +{ + int elen = strlen(error_message); + if (elen + 1 < buff_len) { + *buffer = ALLOC(elen + 1); + } + **buffer = 1; + memcpy((*buffer) + 1, error_message, elen); + return elen + 1; +} + +/* +** Lower level I/O helpers +*/ +static int send_out_queue(UdsData *ud) +{ + ASSERT_NONBLOCK(ud->fd); + for(;;) { + int vlen; + SysIOVec *tmp = driver_peekq(ud->port, &vlen); + int wrote; + if (tmp == NULL) { + DEBUGF(("Write queue empty.")); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE, 0); + if (ud->type == portTypeCommand) { + driver_output(ud->port, "Sok", 3); + } + return 0; + } + if (vlen > IO_VECTOR_MAX) { + vlen = IO_VECTOR_MAX; + } + DEBUGF(("Trying to writev %d vectors", vlen)); +#ifdef HARDDEBUG + { + int i; + for (i = 0; i < vlen; ++i) { + DEBUGF(("Buffer %d: length %d", i, tmp[i].iov_len)); + } + } +#endif + if ((wrote = writev(ud->fd, tmp, vlen)) < 0) { + if (errno == EWOULDBLOCK) { + DEBUGF(("Write failed normal.")); + driver_select(ud->port, (ErlDrvEvent) ud->fd, ERL_DRV_WRITE|ERL_DRV_USE, 1); + return 0; + } else { + DEBUGF(("Write failed fatal (%d).", errno)); + driver_failure_eof(ud->port); + return -1; + } + } + driver_deq(ud->port, wrote); + ud->sent += wrote; + DEBUGF(("Wrote %d bytes of data.",wrote)); + } +} + +static int buffered_read_package(UdsData *ud, char **result) +{ + int res; + int data_size; + + if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH) { + /* The header is not read yet */ + DEBUGF(("Header not read yet")); + if ((res = read_at_least(ud, ud->header_pos + HEADER_LENGTH - + ud->buffer_pos)) < 0) { + DEBUGF(("Header read failed")); + return res; + } + } + DEBUGF(("Header is read")); + /* We have at least the header read */ + data_size = get_packet_length((char *) ud->buffer + ud->header_pos); + DEBUGF(("Input packet size = %d", data_size)); + if (ud->buffer_pos < ud->header_pos + HEADER_LENGTH + data_size) { + /* We need to read more */ + DEBUGF(("Need to read more (bufferpos %d, want %d)", ud->buffer_pos, + ud->header_pos + HEADER_LENGTH + data_size)); + if ((res = read_at_least(ud, + ud->header_pos + HEADER_LENGTH + + data_size - ud->buffer_pos)) < 0) { + DEBUGF(("Data read failed")); + return res; + } + } + DEBUGF(("Data is completely read")); + *result = (char *) ud->buffer + ud->header_pos + HEADER_LENGTH; + ud->header_pos += HEADER_LENGTH + data_size; + return data_size; +} + +static int read_at_least(UdsData *ud, int num) +{ + int got; + if (ud->buffer_pos + num > ud->buffer_size) { + /* No place in the buffer, try to pack it */ + if (ud->header_pos > 0) { + int offset = ud->header_pos; + DEBUGF(("Packing buffer, buffer_pos was %d, buffer_size was %d " + "offset %d num %d header_pos %d.", + ud->buffer_pos, ud->buffer_size, + offset, num, ud->header_pos)); + memmove(ud->buffer, ud->buffer + ud->header_pos, + ud->buffer_pos - ud->header_pos); + ud->buffer_pos -= offset; + ud->header_pos -= offset; + } + /* The buffer is packed, look for space again and reallocate if + needed */ + if (ud->buffer_pos + num > ud->buffer_size) { + /* Let's grow in chunks of 256 */ + ud->buffer_size = (((ud->buffer_pos + num) / + CHUNK_SIZE) + 1) * CHUNK_SIZE; + DEBUGF(("New buffer size %d.",ud->buffer_size)); + /* We will always keep one extra byte before the buffer to + allow insertion of an opcode */ + if (!ud->buffer) { + ud->buffer = ALLOC(ud->buffer_size); + } else { + ud->buffer = REALLOC(ud->buffer, ud->buffer_size); + } + } + } + /* OK, now we have a large enough buffer, try to read into it */ + if ((got = read(ud->fd, ud->buffer + ud->buffer_pos, + ud->buffer_size - ud->buffer_pos)) < 0) { + /* It failed, the question is why... */ + if (errno == EAGAIN) { + return NORMAL_READ_FAILURE; + } + return SEVERE_READ_FAILURE; + } else if (got == 0) { + return EOF_READ_FAILURE; + } + DEBUGF(("Got %d bytes.", got)); + ud->received += got; + ud->buffer_pos += got; + /* So, we got some bytes, but enough ? */ + if (got < num) { + return NORMAL_READ_FAILURE; + } + return 0; +} + +static int get_packet_length(char *b) +{ + Byte *u = (Byte *) b; + int x = (((Word) u[0]) << 24) | (((Word) u[1]) << 16) | + (((Word) u[2]) << 8) | ((Word) u[3]); + DEBUGF(("Packet length %d.", x)); + return x; +} + +static void put_packet_length(char *b, int len) +{ + Byte *p = (Byte *) b; + Word n = (Word) len; + p[0] = (n >> 24) & 0xFF; + p[1] = (n >> 16) & 0xFF; + p[2] = (n >> 8) & 0xFF; + p[3] = n & 0xFF; +} + +/* +** Malloc wrappers +** Note! +** The function erl_exit is actually not a pert of the +** driver interface, but it is very nice to use if one wants to halt +** with a core and an erlang crash dump. +*/ +static void *my_malloc(size_t size) +{ + void erl_exit(int, char *, ...); + void *ptr; + + if ((ptr = driver_alloc(size)) == NULL) { + erl_exit(1,"Could not allocate %d bytes of memory",(int) size); + } + return ptr; +} + +static void *my_realloc(void *ptr, size_t size) +{ + void erl_exit(int, char *, ...); + void *nptr; + if ((nptr = driver_realloc(ptr, size)) == NULL) { + erl_exit(1,"Could not reallocate %d bytes of memory",(int) size); + } + return nptr; +} + + +/* +** Socket file handling helpers +*/ + +/* +** Check that directory exists, create if not (only works for one level) +*/ +static int ensure_dir(char *path) +{ + if (mkdir(path,0777) != 0 && errno != EEXIST) { + return -1; + } + return 0; +} + +/* +** Try to open a lock file and lock the first byte write-only (advisory) +** return the file descriptor if succesful, otherwise -1 (<0). +*/ +static int try_lock(char *sockname, Byte *p_creation) +{ + char *lockname; + int lockfd; + struct flock fl; + Byte creation; + + lockname = ALLOC(strlen(SOCKET_PATH)+1+strlen(sockname)+ + strlen(LOCK_SUFFIX)+1); + sprintf(lockname,SOCKET_PATH "/%s" LOCK_SUFFIX, sockname); + DEBUGF(("lockname = %s", lockname)); + if (ensure_dir(SOCKET_PATH) != 0) { + DEBUGF(("ensure_dir failed, errno = %d", errno)); + FREE(lockname); + return -1; + } + if ((lockfd = open(lockname, O_RDWR | O_CREAT, 0666)) < 0) { + DEBUGF(("open failed, errno = %d", errno)); + FREE(lockname); + return -1; + } + FREE(lockname); + memset(&fl,0,sizeof(fl)); + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = 1; + if (fcntl(lockfd, F_SETLK, &fl) < 0) { + DEBUGF(("fcntl failed, errno = %d", errno)); + close(lockfd); + return -1; + } + /* OK, check for creation and update */ + if (read(lockfd, &creation, 1) < 1) { + creation = 0; + } else { + creation = (creation + 1) % 4; + } + lseek(lockfd, 0, SEEK_SET); + write(lockfd, &creation, 1); + fsync(lockfd); /* This could be concidered dangerous (blocking) */ + *p_creation = creation; + return lockfd; +} + +static void do_unlink(char *name) +{ + char buff[100]; + char *str = buff; + int len = strlen(SOCKET_PATH) + 1 + strlen(name) + 1; + + if (len > 100) { + str = ALLOC(len); + } + sprintf(str,SOCKET_PATH "/%s",name); + unlink(str); + if (str != buff) { + FREE(str); + } +} + diff --git a/lib/kernel/examples/uds_dist/ebin/.gitignore b/lib/kernel/examples/uds_dist/ebin/.gitignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lib/kernel/examples/uds_dist/priv/.gitignore b/lib/kernel/examples/uds_dist/priv/.gitignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lib/kernel/examples/uds_dist/src/Makefile b/lib/kernel/examples/uds_dist/src/Makefile new file mode 100644 index 0000000000..338d29b23d --- /dev/null +++ b/lib/kernel/examples/uds_dist/src/Makefile @@ -0,0 +1,27 @@ +# Example makefile + +RM=rm -f +CP=cp +EBIN=../ebin +EMULATOR=beam +ERLC=erlc +# Works if building in open source source tree +KERNEL_INCLUDE=$(ERL_TOP)/lib/kernel/src +ERLCFLAGS+= -W -b$(EMULATOR) -I$(KERNEL_INCLUDE) +APP=uds_dist.app + +MODULES=uds_server uds uds_dist + +TARGET_FILES=$(MODULES:%=$(EBIN)/%.$(EMULATOR)) + +opt: $(TARGET_FILES) $(EBIN)/$(APP) + +$(EBIN)/%.$(EMULATOR): %.erl + $(ERLC) $(ERLCFLAGS) -o$(EBIN) $< + +$(EBIN)/$(APP): $(APP) + $(CP) $(APP) $(EBIN)/$(APP) + +clean: + $(RM) $(TARGET_FILES) $(EBIN)/$(APP) + diff --git a/lib/kernel/examples/uds_dist/src/uds.erl b/lib/kernel/examples/uds_dist/src/uds.erl new file mode 100644 index 0000000000..ae1a78c44b --- /dev/null +++ b/lib/kernel/examples/uds_dist/src/uds.erl @@ -0,0 +1,166 @@ +-module(uds). + +-export([listen/1, connect/1, accept/1, send/2, recv/1, close/1, + get_port/1, get_status_counters/1, set_mode/2, controlling_process/2, + tick/1, get_creation/1]). + +-define(decode(A,B,C,D), (((A) bsl 24) bor + ((B) bsl 16) bor ((C) bsl 8) bor (D))). +-define(encode(N), [(((N) bsr 24) band 16#FF), (((N) bsr 16) band 16#FF), + (((N) bsr 8) band 16#FF), ((N) band 16#FF)]). +-define(check_server(), case whereis(uds_server) of + undefined -> + exit(uds_server_not_started); + _ -> + ok + end). + +listen(Name) -> + ?check_server(), + command(port(),$L,Name). + + +connect(Name) -> + ?check_server(), + command(port(),$C,Name). + +accept(Port) -> + ?check_server(), + case control(Port,$N) of + {ok, N} -> + command(port(),$A,N); + Else -> + Else + end. + +send(Port,Data) -> + ?check_server(), + command(Port, $S, Data). + +recv(Port) -> + ?check_server(), + command(Port, $R, []). + +close(Port) -> + ?check_server(), + (catch unlink(Port)), %% Avoids problem with trap exits. + case (catch erlang:port_close(Port)) of + {'EXIT', Reason} -> + {error, closed}; + _ -> + ok + end. + +get_port(Port) -> + ?check_server(), + {ok,Port}. + +get_status_counters(Port) -> + ?check_server(), + case control(Port, $S) of + {ok, {C0, C1, C2}} -> + {ok, C0, C1, C2}; + Other -> + Other + end. + +get_creation(Port) -> + ?check_server(), + case control(Port, $R) of + {ok, [A]} -> + A; + Else -> + Else + end. + + +set_mode(Port, command) -> + ?check_server(), + control(Port,$C); +set_mode(Port,intermediate) -> + ?check_server(), + control(Port,$I); +set_mode(Port,data) -> + ?check_server(), + control(Port,$D). + +tick(Port) -> + ?check_server(), + control(Port,$T). + +controlling_process(Port, Pid) -> + ?check_server(), + case (catch erlang:port_connect(Port, Pid)) of + true -> + (catch unlink(Port)), + ok; + {'EXIT', {badarg, _}} -> + {error, closed}; + Else -> + exit({unexpected_driver_response, Else}) + end. + + +control(Port, Command) -> + case (catch erlang:port_control(Port, Command, [])) of + [0] -> + ok; + [0,A] -> + {ok, [A]}; + [0,A,B,C,D] -> + {ok, [A,B,C,D]}; + [0,A1,B1,C1,D1,A2,B2,C2,D2,A3,B3,C3,D3] -> + {ok, {?decode(A1,B1,C1,D1),?decode(A2,B2,C2,D2), + ?decode(A3,B3,C3,D3)}}; + [1|Error] -> + exit({error, list_to_atom(Error)}); + {'EXIT', {badarg, _}} -> + {error, closed}; + Else -> + exit({unexpected_driver_response, Else}) + end. + + +command(Port, Command, Parameters) -> + SavedTrapExit = process_flag(trap_exit,true), + case (catch erlang:port_command(Port,[Command | Parameters])) of + true -> + receive + {Port, {data, [Command, $o, $k]}} -> + process_flag(trap_exit,SavedTrapExit), + {ok, Port}; + {Port, {data, [Command |T]}} -> + process_flag(trap_exit,SavedTrapExit), + {ok, T}; + {Port, Else} -> + process_flag(trap_exit,SavedTrapExit), + exit({unexpected_driver_response, Else}); + {'EXIT', Port, normal} -> + process_flag(trap_exit,SavedTrapExit), + {error, closed}; + {'EXIT', Port, Error} -> + process_flag(trap_exit,SavedTrapExit), + exit(Error) + end; + {'EXIT', {badarg, _}} -> + process_flag(trap_exit,SavedTrapExit), + {error, closed}; + Unexpected -> + process_flag(trap_exit,SavedTrapExit), + exit({unexpected_driver_response, Unexpected}) + end. + +port() -> + SavedTrapExit = process_flag(trap_exit,true), + case open_port({spawn, "uds_drv"},[]) of + P when port(P) -> + process_flag(trap_exit,SavedTrapExit), + P; + {'EXIT',Error} -> + process_flag(trap_exit,SavedTrapExit), + exit(Error); + Else -> + process_flag(trap_exit,SavedTrapExit), + exit({unexpected_driver_response, Else}) + end. + diff --git a/lib/kernel/examples/uds_dist/src/uds_dist.app b/lib/kernel/examples/uds_dist/src/uds_dist.app new file mode 100644 index 0000000000..2a58694c94 --- /dev/null +++ b/lib/kernel/examples/uds_dist/src/uds_dist.app @@ -0,0 +1,7 @@ +{application, uds_dist, + [{description, "SSL socket version 2"}, + {vsn, "1.0"}, + {modules, [uds_server]}, + {registered, [uds_server]}, + {applications, [kernel, stdlib]}, + {env, []}]}. diff --git a/lib/kernel/examples/uds_dist/src/uds_dist.erl b/lib/kernel/examples/uds_dist/src/uds_dist.erl new file mode 100644 index 0000000000..7a9c15a3c8 --- /dev/null +++ b/lib/kernel/examples/uds_dist/src/uds_dist.erl @@ -0,0 +1,304 @@ +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id$ +%% +-module(uds_dist). + +%% Handles the connection setup phase with other Erlang nodes. + +-export([childspecs/0, listen/1, accept/1, accept_connection/5, + setup/4, close/1, select/1, is_node_name/1]). + +%% internal exports + +-export([accept_loop/2,do_accept/6,do_setup/5, getstat/1,tick/1]). + +-import(error_logger,[error_msg/2]). + +-include("net_address.hrl"). + + + +-define(to_port(Socket, Data), + case uds:send(Socket, Data) of + {error, closed} -> + self() ! {uds_closed, Socket}, + {error, closed}; + R -> + R + end). + + +-include("dist.hrl"). +-include("dist_util.hrl"). +-record(tick, {read = 0, + write = 0, + tick = 0, + ticked = 0 + }). + + +%% ------------------------------------------------------------- +%% This function should return a valid childspec, so that +%% the primitive ssl_server gets supervised +%% ------------------------------------------------------------- +childspecs() -> + {ok, [{uds_server,{uds_server, start_link, []}, + permanent, 2000, worker, [uds_server]}]}. + + +%% ------------------------------------------------------------ +%% Select this protocol based on node name +%% select(Node) => Bool +%% ------------------------------------------------------------ + +select(Node) -> + {ok, MyHost} = inet:gethostname(), + case split_node(atom_to_list(Node), $@, []) of + [_, MyHost] -> + true; + _ -> + false + end. + +%% ------------------------------------------------------------ +%% Create the listen socket, i.e. the port that this erlang +%% node is accessible through. +%% ------------------------------------------------------------ + +listen(Name) -> + case uds:listen(atom_to_list(Name)) of + {ok, Socket} -> + {ok, {Socket, + #net_address{address = [], + host = inet:gethostname(), + protocol = uds, + family = uds}, + uds:get_creation(Socket)}}; + Error -> + Error + end. + +%% ------------------------------------------------------------ +%% Accepts new connection attempts from other Erlang nodes. +%% ------------------------------------------------------------ + +accept(Listen) -> + spawn_link(?MODULE, accept_loop, [self(), Listen]). + +accept_loop(Kernel, Listen) -> + process_flag(priority, max), + case uds:accept(Listen) of + {ok, Socket} -> + Kernel ! {accept,self(),Socket,uds,uds}, + controller(Kernel, Socket), + accept_loop(Kernel, Listen); + Error -> + exit(Error) + end. + +controller(Kernel, Socket) -> + receive + {Kernel, controller, Pid} -> + uds:controlling_process(Socket, Pid), + Pid ! {self(), controller}; + {Kernel, unsupported_protocol} -> + exit(unsupported_protocol) + end. + +%% ------------------------------------------------------------ +%% Accepts a new connection attempt from another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> + spawn_link(?MODULE, do_accept, + [self(), AcceptPid, Socket, MyNode, + Allowed, SetupTime]). + +do_accept(Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> + process_flag(priority, max), + receive + {AcceptPid, controller} -> + Timer = dist_util:start_timer(SetupTime), + HSData = #hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + socket = Socket, + timer = Timer, + this_flags = ?DFLAG_PUBLISHED bor + ?DFLAG_ATOM_CACHE bor + ?DFLAG_EXTENDED_REFERENCES bor + ?DFLAG_DIST_MONITOR bor + ?DFLAG_FUN_TAGS, + allowed = Allowed, + f_send = fun(S,D) -> uds:send(S,D) end, + f_recv = fun(S,N,T) -> uds:recv(S) + end, + f_setopts_pre_nodeup = + fun(S) -> + uds:set_mode(S, intermediate) + end, + f_setopts_post_nodeup = + fun(S) -> + uds:set_mode(S, data) + end, + f_getll = fun(S) -> + uds:get_port(S) + end, + f_address = fun get_remote_id/2, + mf_tick = {?MODULE, tick}, + mf_getstat = {?MODULE,getstat} + }, + dist_util:handshake_other_started(HSData) + end. + +%% ------------------------------------------------------------ +%% Get remote information about a Socket. +%% ------------------------------------------------------------ + +get_remote_id(Socket, Node) -> + [_, Host] = split_node(atom_to_list(Node), $@, []), + #net_address { + address = [], + host = Host, + protocol = uds, + family = uds }. + +%% ------------------------------------------------------------ +%% Setup a new connection to another Erlang node. +%% Performs the handshake with the other side. +%% ------------------------------------------------------------ + +setup(Node, MyNode, LongOrShortNames,SetupTime) -> + spawn_link(?MODULE, do_setup, [self(), + Node, + MyNode, + LongOrShortNames, + SetupTime]). + +do_setup(Kernel, Node, MyNode, LongOrShortNames,SetupTime) -> + process_flag(priority, max), + ?trace("~p~n",[{uds_dist,self(),setup,Node}]), + [Name, Address] = splitnode(Node, LongOrShortNames), + {ok, MyName} = inet:gethostname(), + case Address of + MyName -> + Timer = dist_util:start_timer(SetupTime), + case uds:connect(Name) of + {ok, Socket} -> + HSData = #hs_data{ + kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + socket = Socket, + timer = Timer, + this_flags = ?DFLAG_PUBLISHED bor + ?DFLAG_ATOM_CACHE bor + ?DFLAG_EXTENDED_REFERENCES bor + ?DFLAG_DIST_MONITOR bor + ?DFLAG_FUN_TAGS, + other_version = 1, + f_send = fun(S,D) -> + uds:send(S,D) + end, + f_recv = fun(S,N,T) -> + uds:recv(S) + end, + f_setopts_pre_nodeup = + fun(S) -> + uds:set_mode(S, intermediate) + end, + f_setopts_post_nodeup = + fun(S) -> + uds:set_mode(S, data) + end, + f_getll = fun(S) -> + uds:get_port(S) + end, + f_address = + fun(_,_) -> + #net_address{ + address = [], + host = Address, + protocol = uds, + family = uds} + end, + mf_tick = {?MODULE, tick}, + mf_getstat = {?MODULE,getstat} + }, + dist_util:handshake_we_started(HSData); + _ -> + ?shutdown(Node) + end; + Other -> + ?shutdown(Node) + end. + +%% +%% Close a socket. +%% +close(Socket) -> + uds:close(Socket). + + +%% If Node is illegal terminate the connection setup!! +splitnode(Node, LongOrShortNames) -> + case split_node(atom_to_list(Node), $@, []) of + [Name|Tail] when Tail /= [] -> + Host = lists:append(Tail), + case split_node(Host, $., []) of + [_] when LongOrShortNames == longnames -> + error_msg("** System running to use " + "fully qualified " + "hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), + ?shutdown(Node); + L when length(L) > 1, LongOrShortNames == shortnames -> + error_msg("** System NOT running to use fully qualified " + "hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), + ?shutdown(Node); + _ -> + [Name, Host] + end; + [_] -> + error_msg("** Nodename ~p illegal, no '@' character **~n", + [Node]), + ?shutdown(Node); + _ -> + error_msg("** Nodename ~p illegal **~n", [Node]), + ?shutdown(Node) + end. + +split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; +split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); +split_node([], _, Ack) -> [lists:reverse(Ack)]. + +is_node_name(Node) when atom(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, Host] -> true; + _ -> false + end; +is_node_name(Node) -> + false. + +tick(Sock) -> + uds:tick(Sock). +getstat(Socket) -> + uds:get_status_counters(Socket). diff --git a/lib/kernel/examples/uds_dist/src/uds_server.erl b/lib/kernel/examples/uds_dist/src/uds_server.erl new file mode 100644 index 0000000000..c060130f9d --- /dev/null +++ b/lib/kernel/examples/uds_dist/src/uds_server.erl @@ -0,0 +1,156 @@ +%%%---------------------------------------------------------------------- +%%% File : uds_server.erl +%%% Purpose : Holder for the uds_drv ddll driver. +%%% Created : 15 Mar 2000 +%%%---------------------------------------------------------------------- + +-module(uds_server). + +-behaviour(gen_server). + +%% External exports +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(DRIVER_NAME,"uds_drv"). + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%%---------------------------------------------------------------------- +%%% Callback functions from gen_server +%%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Func: init/1 +%% Returns: {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%%---------------------------------------------------------------------- +init([]) -> + process_flag(trap_exit,true), + case load_driver() of + ok -> + {ok, []}; + {error, already_loaded} -> + {ok, []}; + Error -> + exit(Error) + end. + + +%%---------------------------------------------------------------------- +%% Func: handle_call/3 +%% Returns: {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | (terminate/2 is called) +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- +handle_call(Request, From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%---------------------------------------------------------------------- +%% Func: handle_cast/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- +handle_cast(Msg, State) -> + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Func: handle_info/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- +handle_info(Info, State) -> + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Func: terminate/2 +%% Purpose: Shutdown the server +%% Returns: any (ignored by gen_server) +%%---------------------------------------------------------------------- +terminate(Reason, State) -> + erl_ddll:unload_driver(?DRIVER_NAME), + ok. + +%%---------------------------------------------------------------------- +%% Func: code_change/3 +%% Purpose: Convert process state when code is changed +%% Returns: {ok, NewState} +%%---------------------------------------------------------------------- +code_change(OldVsn, State, Extra) -> + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +%% +%% Actually load the driver. +%% +load_driver() -> + Dir = find_priv_lib(), + erl_ddll:load_driver(Dir,?DRIVER_NAME). + +%% +%% As this server may be started by the distribution, it is not safe to assume +%% a working code server, neither a working file server. +%% I try to utilize the most primitive interfaces available to determine +%% the directory of the port_program. +%% +find_priv_lib() -> + PrivDir = case (catch code:priv_dir(uds_dist)) of + {'EXIT', _} -> + %% Code server probably not startet yet + {ok, P} = erl_prim_loader:get_path(), + ModuleFile = atom_to_list(?MODULE) ++ extension(), + Pd = (catch lists:foldl + (fun(X,Acc) -> + M = filename:join([X, ModuleFile]), + %% The file server probably not started + %% either, has to use raw interface. + case file:raw_read_file_info(M) of + {ok,_} -> + %% Found our own module in the + %% path, lets bail out with + %% the priv_dir of this directory + Y = filename:split(X), + throw(filename:join + (lists:sublist + (Y,length(Y) - 1) + ++ ["priv"])); + _ -> + Acc + end + end, + false,P)), + case Pd of + false -> + exit(uds_dist_priv_lib_indeterminate); + _ -> + Pd + end; + Dir -> + Dir + end, + filename:join([PrivDir, "lib"]). + +extension() -> + %% erlang:info(machine) returns machine name as text in all uppercase + "." ++ lists:map(fun(X) -> + X + $a - $A + end, + erlang:info(machine)). + -- cgit v1.2.3