aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/drivers/common
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/drivers/common')
-rw-r--r--erts/emulator/drivers/common/efile_drv.c48
-rw-r--r--erts/emulator/drivers/common/erl_efile.h1
-rw-r--r--erts/emulator/drivers/common/gzio.c63
-rw-r--r--erts/emulator/drivers/common/gzio.h16
-rw-r--r--erts/emulator/drivers/common/gzio_zutil.h9
-rw-r--r--erts/emulator/drivers/common/inet_drv.c146
6 files changed, 199 insertions, 84 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 8de578d8b7..e040864d24 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -111,7 +111,6 @@
#include "erl_driver.h"
#include "erl_efile.h"
#include "erl_threads.h"
-#include "zlib.h"
#include "gzio.h"
#include "dtrace-wrapper.h"
#include <ctype.h>
@@ -168,7 +167,7 @@ dt_private *get_dt_private(int);
#ifdef USE_THREADS
-#define IF_THRDS if (sys_info.async_threads > 0)
+#define THRDS_AVAILABLE (sys_info.async_threads > 0)
#ifdef HARDDEBUG /* HARDDEBUG in io.c is expected too */
#define TRACE_DRIVER fprintf(stderr, "Efile: ")
#else
@@ -178,24 +177,26 @@ dt_private *get_dt_private(int);
#define MUTEX_LOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_lock(m); } } while (0)
#define MUTEX_UNLOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_unlock(m); } } while (0)
#else
-#define IF_THRDS if (0)
+#define THRDS_AVAILABLE (0)
#define MUTEX_INIT(m, p)
#define MUTEX_LOCK(m)
#define MUTEX_UNLOCK(m)
#endif
+#define IF_THRDS if (THRDS_AVAILABLE)
+#define SENDFILE_FLGS_USE_THREADS (1 << 0)
/**
* On DARWIN sendfile can deadlock with close if called in
* different threads. So until Apple fixes so that sendfile
* is not buggy we disable usage of the async pool for
* DARWIN. The testcase t_sendfile_crashduring reproduces
- * this error when using +A 10.
+ * this error when using +A 10 and enabling SENDFILE_FLGS_USE_THREADS.
*/
#if defined(__APPLE__) && defined(__MACH__)
-#define USE_THRDS_FOR_SENDFILE 0
+#define USE_THRDS_FOR_SENDFILE(DATA) 0
#else
-#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0)
+#define USE_THRDS_FOR_SENDFILE(DATA) (DATA->flags & SENDFILE_FLGS_USE_THREADS)
#endif /* defined(__APPLE__) && defined(__MACH__) */
@@ -301,7 +302,7 @@ static void file_stop_select(ErlDrvEvent event, void* _);
enum e_timer {timer_idle, timer_again, timer_write};
#ifdef HAVE_SENDFILE
enum e_sendfile {sending, not_sending};
-static void free_sendfile(void *data);
+#define SENDFILE_USE_THREADS (1 << 0)
#endif /* HAVE_SENDFILE */
struct t_data;
@@ -818,7 +819,7 @@ file_start(ErlDrvPort port, char* command)
static void do_close(int flags, SWord fd) {
if (flags & EFILE_COMPRESSED) {
- erts_gzclose((gzFile)(fd));
+ erts_gzclose((ErtsGzFile)(fd));
} else {
efile_closefile((int) fd);
}
@@ -1136,7 +1137,7 @@ static void invoke_read(void *data)
}
read_size = size;
if (d->flags & EFILE_COMPRESSED) {
- read_size = erts_gzread((gzFile)d->fd,
+ read_size = erts_gzread((ErtsGzFile)d->fd,
d->c.read.binp->orig_bytes + d->c.read.bin_offset,
size);
status = (read_size != (size_t) -1);
@@ -1209,7 +1210,7 @@ static void invoke_read_line(void *data)
size = need - d->c.read_line.read_size;
}
if (d->flags & EFILE_COMPRESSED) {
- read_size = erts_gzread((gzFile)d->fd,
+ read_size = erts_gzread((ErtsGzFile)d->fd,
d->c.read_line.binp->orig_bytes +
d->c.read_line.read_offset + d->c.read_line.read_size,
size);
@@ -1250,7 +1251,7 @@ static void invoke_read_line(void *data)
d->c.read_line.read_size -= too_much;
ASSERT(d->c.read_line.read_size >= 0);
if (d->flags & EFILE_COMPRESSED) {
- Sint64 location = erts_gzseek((gzFile)d->fd,
+ Sint64 location = erts_gzseek((ErtsGzFile)d->fd,
-((Sint64) too_much), EFILE_SEEK_CUR);
if (location == -1) {
d->result_ok = 0;
@@ -1535,7 +1536,7 @@ static void invoke_writev(void *data) {
*/
errno = EINVAL;
if (! (status =
- erts_gzwrite((gzFile)d->fd,
+ erts_gzwrite((ErtsGzFile)d->fd,
iov[i].iov_base,
iov[i].iov_len)) == iov[i].iov_len) {
d->errInfo.posix_errno =
@@ -1797,7 +1798,7 @@ static void invoke_lseek(void *data)
d->errInfo.posix_errno = EINVAL;
status = 0;
} else {
- d->c.lseek.location = erts_gzseek((gzFile)d->fd,
+ d->c.lseek.location = erts_gzseek((ErtsGzFile)d->fd,
offset, d->c.lseek.origin);
if (d->c.lseek.location == -1) {
d->errInfo.posix_errno = errno;
@@ -1885,7 +1886,7 @@ static void invoke_open(void *data)
if (status || (d->errInfo.posix_errno != EISDIR)) {
mode = (d->flags & EFILE_MODE_READ) ? "rb" : "wb";
d->fd = (SWord) erts_gzopen(d->b, mode);
- if ((gzFile)d->fd) {
+ if ((ErtsGzFile)d->fd) {
status = 1;
} else {
if (errno == 0) {
@@ -1933,7 +1934,7 @@ static void invoke_sendfile(void *data)
d->c.sendfile.written += nbytes;
- if (result == 1 || (result == 0 && USE_THRDS_FOR_SENDFILE)) {
+ if (result == 1 || (result == 0 && USE_THRDS_FOR_SENDFILE(d))) {
d->result_ok = 0;
} else if (result == 0 && (d->errInfo.posix_errno == EAGAIN
|| d->errInfo.posix_errno == EINTR)) {
@@ -1950,7 +1951,7 @@ static void invoke_sendfile(void *data)
static void free_sendfile(void *data) {
struct t_data *d = (struct t_data *)data;
- if (USE_THRDS_FOR_SENDFILE) {
+ if (USE_THRDS_FOR_SENDFILE(d)) {
SET_NONBLOCKING(d->c.sendfile.out_fd);
} else {
MUTEX_LOCK(d->c.sendfile.q_mtx);
@@ -4123,8 +4124,16 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
goto done;
}
- if (hd_len != 0 || tl_len != 0 || flags != 0) {
- /* We do not allow header, trailers and/or flags right now */
+ if (hd_len != 0 || tl_len != 0) {
+ /* We do not allow header, trailers */
+ reply_posix_error(desc, EINVAL);
+ goto done;
+ }
+
+
+ if (flags & SENDFILE_FLGS_USE_THREADS && !THRDS_AVAILABLE) {
+ /* We do not allow use_threads flag on a system where
+ no threads are available. */
reply_posix_error(desc, EINVAL);
goto done;
}
@@ -4134,6 +4143,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
d->command = command;
d->invoke = invoke_sendfile;
d->free = free_sendfile;
+ d->flags = flags;
d->level = 2;
d->c.sendfile.out_fd = (int) out_fd;
@@ -4153,7 +4163,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
d->c.sendfile.nbytes = nbytes;
- if (USE_THRDS_FOR_SENDFILE) {
+ if (USE_THRDS_FOR_SENDFILE(d)) {
SET_BLOCKING(d->c.sendfile.out_fd);
} else {
/**
diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h
index 5387f75efc..95c036db8f 100644
--- a/erts/emulator/drivers/common/erl_efile.h
+++ b/erts/emulator/drivers/common/erl_efile.h
@@ -34,6 +34,7 @@
#define EFILE_COMPRESSED 8
#define EFILE_MODE_EXCL 16
#define EFILE_NO_TRUNCATE 32 /* Special for reopening on VxWorks */
+#define EFILE_MODE_SYNC 64
/*
* Seek modes for efile_seek().
diff --git a/erts/emulator/drivers/common/gzio.c b/erts/emulator/drivers/common/gzio.c
index e085c262b0..8ec2c3f762 100644
--- a/erts/emulator/drivers/common/gzio.c
+++ b/erts/emulator/drivers/common/gzio.c
@@ -73,15 +73,15 @@ typedef struct gz_stream {
int transparent; /* 1 if input file is not a .gz file */
char mode; /* 'w' or 'r' */
int position; /* Position (for seek) */
- int (*destroy)OF((struct gz_stream*)); /* Function to destroy
+ int (*destroy)(struct gz_stream*); /* Function to destroy
* this structure. */
} gz_stream;
-local gzFile gz_open OF((const char *path, const char *mode));
-local int get_byte OF((gz_stream *s));
-local void check_header OF((gz_stream *s));
-local int destroy OF((gz_stream *s));
-local uLong getLong OF((gz_stream *s));
+local ErtsGzFile gz_open (const char *path, const char *mode);
+local int get_byte (gz_stream *s);
+local void check_header (gz_stream *s);
+local int destroy (gz_stream *s);
+local uLong getLong (gz_stream *s);
#ifdef UNIX
/*
@@ -144,7 +144,7 @@ local uLong getLong OF((gz_stream *s));
can be checked to distinguish the two cases (if errno is zero, the
zlib error is Z_MEM_ERROR).
*/
-local gzFile gz_open (path, mode)
+local ErtsGzFile gz_open (path, mode)
const char *path;
const char *mode;
{
@@ -179,7 +179,7 @@ local gzFile gz_open (path, mode)
s->path = (char*)ALLOC(FILENAME_BYTELEN(path)+FILENAME_CHARSIZE);
if (s->path == NULL) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
FILENAME_COPY(s->path, path); /* do this early for debugging */
@@ -197,7 +197,7 @@ local gzFile gz_open (path, mode)
} while (*p++ && m < fmode + sizeof(fmode) - 1);
*m = '\0';
if (s->mode == '\0')
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
if (s->mode == 'w') {
err = deflateInit2(&(s->stream), level,
@@ -207,7 +207,7 @@ local gzFile gz_open (path, mode)
s->stream.next_out = s->outbuf = (Byte*)ALLOC(Z_BUFSIZE);
if (err != Z_OK || s->outbuf == Z_NULL) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
} else {
/*
@@ -221,7 +221,7 @@ local gzFile gz_open (path, mode)
s->stream.next_in = s->inbuf = (Byte*)ALLOC(Z_BUFSIZE);
if (err != Z_OK || s->inbuf == Z_NULL) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
}
s->stream.avail_out = Z_BUFSIZE;
@@ -229,17 +229,16 @@ local gzFile gz_open (path, mode)
errno = 0;
#if defined(FILENAMES_16BIT)
{
- char wfmode[160];
- int i=0,j;
- for(j=0;fmode[j] != '\0';++j) {
- wfmode[i++]=fmode[j];
- wfmode[i++]='\0';
+ WCHAR wfmode[80];
+ int i = 0;
+ int j;
+ for(j = 0; fmode[j] != '\0'; ++j) {
+ wfmode[i++] = (WCHAR) fmode[j];
}
- wfmode[i++] = '\0';
- wfmode[i++] = '\0';
- s->file = F_OPEN(path, wfmode);
+ wfmode[i++] = L'\0';
+ s->file = _wfopen((WCHAR *)path, wfmode);
if (s->file == NULL) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
}
#elif defined(UNIX)
@@ -249,18 +248,18 @@ local gzFile gz_open (path, mode)
s->file = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0666);
}
if (s->file == -1) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
#else
- s->file = F_OPEN(path, fmode);
+ s->file = fopen(path, fmode);
if (s->file == NULL) {
- return s->destroy(s), (gzFile)Z_NULL;
+ return s->destroy(s), (ErtsGzFile)Z_NULL;
}
#endif
if (s->mode == 'r') {
check_header(s); /* skip the .gz header */
}
- return (gzFile)s;
+ return (ErtsGzFile)s;
}
/* ===========================================================================
@@ -296,7 +295,7 @@ local int gz_rewind (gz_stream *s)
/* ===========================================================================
Opens a gzip (.gz) file for reading or writing.
*/
-gzFile erts_gzopen (path, mode)
+ErtsGzFile erts_gzopen (path, mode)
const char *path;
const char *mode;
{
@@ -447,7 +446,7 @@ local int destroy (s)
gzread returns the number of bytes actually read (0 for end of file).
*/
int
-erts_gzread(gzFile file, voidp buf, unsigned len)
+erts_gzread(ErtsGzFile file, voidp buf, unsigned len)
{
gz_stream *s = (gz_stream*)file;
Bytef *start = buf; /* starting point for crc computation */
@@ -557,7 +556,7 @@ erts_gzread(gzFile file, voidp buf, unsigned len)
gzwrite returns the number of bytes actually written (0 in case of error).
*/
int
-erts_gzwrite(gzFile file, voidp buf, unsigned len)
+erts_gzwrite(ErtsGzFile file, voidp buf, unsigned len)
{
gz_stream *s = (gz_stream*)file;
@@ -593,7 +592,7 @@ erts_gzwrite(gzFile file, voidp buf, unsigned len)
*/
int
-erts_gzseek(gzFile file, int offset, int whence)
+erts_gzseek(ErtsGzFile file, int offset, int whence)
{
int pos;
gz_stream* s = (gz_stream *) file;
@@ -655,7 +654,7 @@ erts_gzseek(gzFile file, int offset, int whence)
degrade compression.
*/
int
-erts_gzflush(gzFile file, int flush)
+erts_gzflush(ErtsGzFile file, int flush)
{
uInt len;
int done = 0;
@@ -714,7 +713,7 @@ local uLong getLong (s)
and deallocates all the (de)compression state.
*/
int
-erts_gzclose(gzFile file)
+erts_gzclose(ErtsGzFile file)
{
int err;
gz_stream *s = (gz_stream*)file;
@@ -723,9 +722,9 @@ erts_gzclose(gzFile file)
if (s->mode == 'w') {
err = erts_gzflush (file, Z_FINISH);
- if (err != Z_OK) return s->destroy(file);
+ if (err != Z_OK) return s->destroy(s);
}
- return s->destroy(file);
+ return s->destroy(s);
}
diff --git a/erts/emulator/drivers/common/gzio.h b/erts/emulator/drivers/common/gzio.h
index 3f1e546140..ea50d922ec 100644
--- a/erts/emulator/drivers/common/gzio.h
+++ b/erts/emulator/drivers/common/gzio.h
@@ -17,11 +17,15 @@
* %CopyrightEnd%
*/
-gzFile erts_gzopen (const char *path, const char *mode);
-int erts_gzread(gzFile file, voidp buf, unsigned len);
-int erts_gzwrite(gzFile file, voidp buf, unsigned len);
-int erts_gzseek(gzFile, int, int);
-int erts_gzflush(gzFile file, int flush);
-int erts_gzclose(gzFile file);
+#include "zlib.h"
+
+typedef struct erts_gzFile* ErtsGzFile;
+
+ErtsGzFile erts_gzopen (const char *path, const char *mode);
+int erts_gzread(ErtsGzFile file, voidp buf, unsigned len);
+int erts_gzwrite(ErtsGzFile file, voidp buf, unsigned len);
+int erts_gzseek(ErtsGzFile, int, int);
+int erts_gzflush(ErtsGzFile file, int flush);
+int erts_gzclose(ErtsGzFile file);
ErlDrvBinary* erts_gzinflate_buffer(char*, uLong);
ErlDrvBinary* erts_gzdeflate_buffer(char*, uLong);
diff --git a/erts/emulator/drivers/common/gzio_zutil.h b/erts/emulator/drivers/common/gzio_zutil.h
index 00eccc80fc..854205cc2c 100644
--- a/erts/emulator/drivers/common/gzio_zutil.h
+++ b/erts/emulator/drivers/common/gzio_zutil.h
@@ -23,12 +23,6 @@
* that may change or not exist at all.
*/
-#ifndef HAVE_LIBZ
-/* Use our "real" copy of zutil.h if we don't use shared zlib */
-#include "zutil.h"
-
-#else /* HAVE_LIBZ: Shared zlib is used */
-
#define local static
#define DEF_MEM_LEVEL 8
#define zmemcpy sys_memcpy
@@ -77,6 +71,3 @@
# define OS_CODE 0x03 /* assume Unix */
#endif
-
-#endif /* HAVE_LIBZ */
-
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 45fac69303..4a861b121c 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -86,6 +86,13 @@
#endif
typedef unsigned long long llu_t;
+#ifndef INT16_MIN
+#define INT16_MIN (-32768)
+#endif
+#ifndef INT16_MAX
+#define INT16_MAX (32767)
+#endif
+
#ifdef __WIN32__
#define STRNCASECMP strncasecmp
@@ -612,6 +619,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_PASSIVE 0 /* false */
#define INET_ACTIVE 1 /* true */
#define INET_ONCE 2 /* true; active once then passive */
+#define INET_MULTI 3 /* true; active N then passive */
/* INET_REQ_GETSTATUS enumeration */
#define INET_F_OPEN 0x0001
@@ -846,9 +854,10 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_IFNAMSIZ 16
/* INET Ignore states */
-#define INET_IGNORE_NONE 0
-#define INET_IGNORE_READ 1
-#define INET_IGNORE_WRITE 1 << 1
+#define INET_IGNORE_NONE 0
+#define INET_IGNORE_READ (1 << 0)
+#define INET_IGNORE_WRITE (1 << 1)
+#define INET_IGNORE_PASSIVE (1 << 2)
/* Max length of Erlang Term Buffer (for outputting structured terms): */
#ifdef HAVE_SCTP
@@ -958,6 +967,7 @@ typedef struct {
inet_async_op op_queue[INET_MAX_ASYNC]; /* call queue */
int active; /* 0 = passive, 1 = active, 2 = active once */
+ Sint16 active_count; /* counter for {active,N} */
int stype; /* socket type:
SOCK_STREAM/SOCK_DGRAM/SOCK_SEQPACKET */
int sprotocol; /* socket protocol:
@@ -1193,22 +1203,36 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event);
static int async_ref = 0; /* async reference id generator */
#define NEW_ASYNC_ID() ((async_ref++) & 0xffff)
+/* check for transition from active to passive */
+#define INET_CHECK_ACTIVE_TO_PASSIVE(inet) \
+ do { \
+ if ((inet)->active == INET_ONCE) \
+ (inet)->active = INET_PASSIVE; \
+ else if ((inet)->active == INET_MULTI && --((inet)->active_count) == 0) { \
+ (inet)->active = INET_PASSIVE; \
+ packet_passive_message(inet); \
+ } \
+ } while (0)
static ErlDrvTermData am_ok;
static ErlDrvTermData am_tcp;
static ErlDrvTermData am_udp;
static ErlDrvTermData am_error;
+static ErlDrvTermData am_einval;
static ErlDrvTermData am_inet_async;
static ErlDrvTermData am_inet_reply;
static ErlDrvTermData am_timeout;
static ErlDrvTermData am_closed;
+static ErlDrvTermData am_tcp_passive;
static ErlDrvTermData am_tcp_closed;
static ErlDrvTermData am_tcp_error;
+static ErlDrvTermData am_udp_passive;
static ErlDrvTermData am_udp_error;
static ErlDrvTermData am_empty_out_q;
static ErlDrvTermData am_ssl_tls;
#ifdef HAVE_SCTP
static ErlDrvTermData am_sctp;
+static ErlDrvTermData am_sctp_passive;
static ErlDrvTermData am_sctp_error;
static ErlDrvTermData am_true;
static ErlDrvTermData am_false;
@@ -1218,6 +1242,7 @@ static ErlDrvTermData am_list;
static ErlDrvTermData am_binary;
static ErlDrvTermData am_active;
static ErlDrvTermData am_once;
+static ErlDrvTermData am_multi;
static ErlDrvTermData am_buffer;
static ErlDrvTermData am_linger;
static ErlDrvTermData am_recbuf;
@@ -1466,8 +1491,8 @@ static int load_ip_and_port
unsigned int alen = len;
char abuf [len];
int res = inet_get_address(abuf, (inet_address*) addr, &alen);
- ASSERT(res==0);
- res = 0;
+ ASSERT(res==0); (void)res;
+
/* Now "abuf" contains: Family(1b), Port(2b), IP(4|16b) */
/* NB: the following functions are safe to use, as they create tuples
@@ -3368,6 +3393,34 @@ static int packet_binary_message
}
/*
+** active mode message: send active-to-passive transition message
+** {tcp_passive, S} or
+** {udp_passive, S} or
+** {sctp_passive, S}
+*/
+ static int packet_passive_message(inet_descriptor* desc)
+ {
+ ErlDrvTermData spec[6];
+ int i = 0;
+
+ DEBUGF(("packet_passive_message(%ld):\r\n", (long)desc->port));
+
+ if (desc->sprotocol == IPPROTO_TCP)
+ i = LOAD_ATOM(spec, i, am_tcp_passive);
+ else {
+#ifdef HAVE_SCTP
+ i = LOAD_ATOM(spec, i, IS_SCTP(desc) ? am_sctp_passive : am_udp_passive);
+#else
+ i = LOAD_ATOM(spec, i, am_udp_passive);
+#endif
+ }
+ i = LOAD_PORT(spec, i, desc->dport);
+ i = LOAD_TUPLE(spec, i, 2);
+ ASSERT(i <= 6);
+ return erl_drv_output_term(desc->dport, spec, i);
+ }
+
+/*
** send active message {udp_error|sctp_error, S, Error}
*/
static int packet_error_message(udp_descriptor* udesc, int err)
@@ -3409,7 +3462,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len)
int code;
const char* body = buf;
int bodylen = len;
-
+
packet_get_body(desc->inet.htype, &body, &bodylen);
if (desc->inet.deliver == INET_DELIVER_PORT) {
@@ -3427,8 +3480,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len)
if (code < 0)
return code;
- if (desc->inet.active == INET_ONCE)
- desc->inet.active = INET_PASSIVE;
+ INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc));
return code;
}
@@ -3455,8 +3507,7 @@ tcp_reply_binary_data(tcp_descriptor* desc, ErlDrvBinary* bin, int offs, int len
}
if (code < 0)
return code;
- if (desc->inet.active == INET_ONCE)
- desc->inet.active = INET_PASSIVE;
+ INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc));
return code;
}
@@ -3479,8 +3530,7 @@ packet_reply_binary_data(inet_descriptor* desc, unsigned int hsz,
code = packet_binary_message(desc, bin, offs, len, extra);
if (code < 0)
return code;
- if (desc->active == INET_ONCE)
- desc->active = INET_PASSIVE;
+ INET_CHECK_ACTIVE_TO_PASSIVE(desc);
return code;
}
}
@@ -3525,6 +3575,7 @@ sock_init(void) /* May be called multiple times. */
#ifdef HAVE_SCTP
static void inet_init_sctp(void) {
INIT_ATOM(sctp);
+ INIT_ATOM(sctp_passive);
INIT_ATOM(sctp_error);
INIT_ATOM(true);
INIT_ATOM(false);
@@ -3534,6 +3585,7 @@ static void inet_init_sctp(void) {
INIT_ATOM(binary);
INIT_ATOM(active);
INIT_ATOM(once);
+ INIT_ATOM(multi);
INIT_ATOM(buffer);
INIT_ATOM(linger);
INIT_ATOM(recbuf);
@@ -3659,12 +3711,15 @@ static int inet_init()
INIT_ATOM(tcp);
INIT_ATOM(udp);
INIT_ATOM(error);
+ INIT_ATOM(einval);
INIT_ATOM(inet_async);
INIT_ATOM(inet_reply);
INIT_ATOM(timeout);
INIT_ATOM(closed);
+ INIT_ATOM(tcp_passive);
INIT_ATOM(tcp_closed);
INIT_ATOM(tcp_error);
+ INIT_ATOM(udp_passive);
INIT_ATOM(udp_error);
INIT_ATOM(empty_out_q);
INIT_ATOM(ssl_tls);
@@ -5652,8 +5707,25 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
case INET_LOPT_ACTIVE:
DEBUGF(("inet_set_opts(%ld): s=%d, ACTIVE=%d\r\n",
- (long)desc->port, desc->s,ival));
+ (long)desc->port, desc->s, ival));
desc->active = ival;
+ if (desc->active == INET_MULTI) {
+ long ac = desc->active_count;
+ Sint16 nval = get_int16(ptr);
+ ptr += 2;
+ len -= 2;
+ ac += nval;
+ if (ac > INT16_MAX || ac < INT16_MIN)
+ return -1;
+ desc->active_count += nval;
+ if (desc->active_count < 0)
+ desc->active_count = 0;
+ if (desc->active_count == 0) {
+ desc->active = INET_PASSIVE;
+ packet_passive_message(desc);
+ }
+ } else
+ desc->active_count = 0;
if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) &&
(desc->state == INET_STATE_CLOSED)) {
tcp_closed_message((tcp_descriptor *) desc);
@@ -5964,7 +6036,8 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
/* XXX fprintf(stderr,"desc->htype == %d, old_htype == %d,
desc->active == %d, old_active == %d\r\n",(int)desc->htype,
(int) old_htype, (int) desc->active, (int) old_active );*/
- return 1+(desc->htype == old_htype && desc->active == INET_ONCE);
+ return 1+(desc->htype == old_htype &&
+ (desc->active == INET_ONCE || desc->active == INET_MULTI));
}
return 0;
}
@@ -6097,6 +6170,21 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len)
case INET_LOPT_ACTIVE:
desc->active = get_int32(curr); curr += 4;
+ if (desc->active == INET_MULTI) {
+ long ac = desc->active_count;
+ Sint16 nval = get_int16(curr); curr += 2;
+ ac += nval;
+ if (ac > INT16_MAX || ac < INT16_MIN)
+ return -1;
+ desc->active_count += nval;
+ if (desc->active_count < 0)
+ desc->active_count = 0;
+ if (desc->active_count == 0) {
+ desc->active = INET_PASSIVE;
+ packet_passive_message(desc);
+ }
+ } else
+ desc->active_count = 0;
res = 0;
continue;
@@ -6619,6 +6707,11 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
case INET_LOPT_ACTIVE:
*ptr++ = opt;
put_int32(desc->active, ptr);
+ if (desc->active == INET_MULTI) {
+ PLACE_FOR(2,ptr);
+ put_int16(desc->active_count, ptr);
+ ptr += 2;
+ }
continue;
case INET_LOPT_PACKET:
*ptr++ = opt;
@@ -6991,7 +7084,10 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
}
case INET_LOPT_ACTIVE:
{
- PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT);
+ if (desc->active == INET_MULTI)
+ PLACE_FOR(spec, i, LOAD_ATOM_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT);
+ else
+ PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT);
i = LOAD_ATOM (spec, i, am_active);
switch (desc->active)
{
@@ -7004,6 +7100,9 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
case INET_ONCE :
{ i = LOAD_ATOM (spec, i, am_once); break; }
+ case INET_MULTI :
+ { i = LOAD_INT(spec, i, desc->active_count); break; }
+
default: ASSERT (0);
}
i = LOAD_TUPLE (spec, i, 2);
@@ -7800,6 +7899,7 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
socket */
desc->deliver = INET_DELIVER_TERM; /* standard term format */
desc->active = INET_PASSIVE; /* start passive */
+ desc->active_count = 0;
desc->oph = NULL;
desc->opt = NULL;
@@ -8208,11 +8308,19 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
if (*buf == 1 && !desc->is_ignored) {
sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0);
- desc->is_ignored = INET_IGNORE_READ;
+ if (desc->active)
+ desc->is_ignored = INET_IGNORE_READ;
+ else
+ desc->is_ignored = INET_IGNORE_PASSIVE;
} else if (*buf == 0 && desc->is_ignored) {
- int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0));
+ int flags = FD_CLOSE;
+ if (desc->is_ignored & INET_IGNORE_READ)
+ flags |= FD_READ;
+ if (desc->is_ignored & INET_IGNORE_WRITE)
+ flags |= FD_WRITE;
desc->is_ignored = INET_IGNORE_NONE;
- sock_select(desc, flags, 1);
+ if (flags != FD_CLOSE)
+ sock_select(desc, flags, 1);
} else
return ctl_error(EINVAL, rbuf, rsize);
@@ -8889,6 +8997,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
driver_set_timer(desc->inet.port, timeout);
if (!INETP(desc)->is_ignored)
sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
+ else
+ INETP(desc)->is_ignored |= INET_IGNORE_READ;
}
}
return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize);