diff options
Diffstat (limited to 'erts/emulator/drivers')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 257 |
1 files changed, 167 insertions, 90 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 44b0dcea1d..b44464d6da 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -38,6 +38,7 @@ #include <ctype.h> #include <sys/types.h> #include <errno.h> +#include <stdint.h> #define IDENTITY(c) c #define STRINGIFY_1(b) IDENTITY(#b) @@ -959,6 +960,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #endif #endif +typedef struct _tcp_descriptor tcp_descriptor; #if defined(TCP_CORK) #define INET_TCP_NOPUSH TCP_CORK @@ -1014,16 +1016,19 @@ typedef struct _multi_timer_data { struct _multi_timer_data *prev; } MultiTimerData; -static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, - ErlDrvTermData caller, unsigned timeout, - void (*timeout_fun)(ErlDrvData drv_data, - ErlDrvTermData caller)); -static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, +static MultiTimerData *add_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + ErlDrvTermData caller, unsigned timeout, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)); +static void fire_multi_timers(tcp_descriptor *desc, ErlDrvPort port, ErlDrvData data); -static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTimerData *p); +static void remove_multi_timer(tcp_descriptor *desc, ErlDrvPort port, MultiTimerData *p); +static void cancel_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)); static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller); -static void clean_multi_timers(MultiTimerData **first, ErlDrvPort port); +static void clean_multi_timers(tcp_descriptor *desc, ErlDrvPort port); typedef struct { int id; /* id used to identify reply */ @@ -1282,7 +1287,7 @@ static struct erl_drv_entry sctp_inet_driver_entry = }; #endif -typedef struct { +struct _tcp_descriptor { inet_descriptor inet; /* common data structure (DON'T MOVE) */ int high; /* high watermark */ int low; /* low watermark */ @@ -1298,7 +1303,8 @@ typedef struct { int http_state; /* 0 = response|request 1=headers fields */ inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */ inet_async_multi_op *multi_last; - MultiTimerData *mtd; /* Timer structures for multiple accept */ + MultiTimerData *mtd; /* Timer structures for multiple accept */ + MultiTimerData *mtd_cache; /* A cache for timer allocations */ #ifdef HAVE_SENDFILE struct { ErlDrvSizeT ioq_skip; /* The number of bytes in the queue at the time @@ -1314,7 +1320,7 @@ typedef struct { Uint64 length; } sendfile; #endif -} tcp_descriptor; +}; /* send function */ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len); @@ -9779,6 +9785,7 @@ static ErlDrvData prep_tcp_inet_start(ErlDrvPort port, char* args) desc->tcp_add_flags = 0; desc->http_state = 0; desc->mtd = NULL; + desc->mtd_cache = NULL; desc->multi_first = desc->multi_last = NULL; DEBUGF(("tcp_inet_start(%ld) }\r\n", (long)port)); return (ErlDrvData) desc; @@ -9882,15 +9889,14 @@ static void tcp_close_check(tcp_descriptor* desc) driver_demonitor_process(desc->inet.port, &monitor); send_async_error(desc->inet.dport, id, caller, am_closed); } - clean_multi_timers(&(desc->mtd), desc->inet.port); } - else if (desc->inet.state == INET_STATE_CONNECTING) { async_error_am(INETP(desc), am_closed); } else if (desc->inet.state == INET_STATE_CONNECTED) { async_error_am_all(INETP(desc), am_closed); } + clean_multi_timers(desc, desc->inet.port); } /* @@ -9933,6 +9939,15 @@ static void tcp_desc_close(tcp_descriptor* desc) erl_inet_close(INETP(desc)); } +static void tcp_inet_recv_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(!desc->inet.active); + sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); + desc->i_remain = 0; + async_error_am(INETP(desc), am_timeout); +} + /* TCP requests from Erlang */ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, ErlDrvSizeT len, @@ -10103,12 +10118,12 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, if (time_left <= 0) { time_left = 1; } - omtd = add_multi_timer(&(desc->mtd), desc->inet.port, ocaller, + omtd = add_multi_timer(desc, desc->inet.port, ocaller, time_left, &tcp_inet_multi_timeout); } enq_old_multi_op(desc, oid, oreq, ocaller, omtd, &omonitor); if (timeout != INET_INFINITY) { - mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, + mtd = add_multi_timer(desc, desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); @@ -10123,7 +10138,7 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_xerror("noproc", rbuf, rsize); } if (timeout != INET_INFINITY) { - mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, + mtd = add_multi_timer(desc, desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); @@ -10220,7 +10235,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); + add_multi_timer(desc, INETP(desc)->port, 0, + timeout, &tcp_inet_recv_timeout); if (!INETP(desc)->is_ignored) sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); else @@ -10325,12 +10341,27 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, } +static void tcp_inet_send_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(IS_BUSY(INETP(desc))); + ASSERT(desc->busy_on_send); + desc->inet.caller = desc->inet.busy_caller; + desc->inet.state &= ~INET_F_BUSY; + desc->busy_on_send = 0; + set_busy_port(desc->inet.port, 0); + inet_reply_error_am(INETP(desc), am_timeout); + if (desc->send_timeout_close) { + tcp_desc_close(desc); + } +} + /* ** tcp_inet_timeout: ** called when timer expire: ** TCP socket may be: ** -** a) receiving -- deselect +** a) receiving -- send timeout ** b) connecting -- close socket ** c) accepting -- reset listener ** @@ -10344,26 +10375,9 @@ static void tcp_inet_timeout(ErlDrvData e) DEBUGF(("tcp_inet_timeout(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if ((state & INET_F_MULTI_CLIENT)) { /* Multi-client always means multi-timers */ - fire_multi_timers(&(desc->mtd), desc->inet.port, e); + fire_multi_timers(desc, desc->inet.port, e); } else if ((state & INET_STATE_CONNECTED) == INET_STATE_CONNECTED) { - if (desc->busy_on_send) { - ASSERT(IS_BUSY(INETP(desc))); - desc->inet.caller = desc->inet.busy_caller; - desc->inet.state &= ~INET_F_BUSY; - desc->busy_on_send = 0; - set_busy_port(desc->inet.port, 0); - inet_reply_error_am(INETP(desc), am_timeout); - if (desc->send_timeout_close) { - tcp_desc_close(desc); - } - } - else { - /* assume recv timeout */ - ASSERT(!desc->inet.active); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); - desc->i_remain = 0; - async_error_am(INETP(desc), am_timeout); - } + fire_multi_timers(desc, desc->inet.port, e); } else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) { /* assume connect timeout */ @@ -10493,7 +10507,7 @@ static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) return; } if (timeout != NULL) { - remove_multi_timer(&(desc->mtd), desc->inet.port, timeout); + remove_multi_timer(desc, desc->inet.port, timeout); } if (desc->multi_first == NULL) { sock_select(INETP(desc),FD_ACCEPT,0); @@ -10532,7 +10546,7 @@ static int tcp_recv_closed(tcp_descriptor* desc) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; DEBUGF(("tcp_recv_closed(%ld): busy on send\r\n", port)); } @@ -10555,9 +10569,10 @@ static int tcp_recv_closed(tcp_descriptor* desc) */ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND; } + if (!desc->inet.active) { - /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + /* We must cancel any timer here ! */ + clean_multi_timers(desc, INETP(desc)->port); /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { @@ -10592,7 +10607,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; @@ -10606,7 +10621,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) #endif if (!desc->inet.active) { /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + clean_multi_timers(desc, INETP(desc)->port); tcp_clear_input(desc); if (desc->inet.exitf) { tcp_desc_close(desc); @@ -10711,13 +10726,13 @@ static int tcp_deliver(tcp_descriptor* desc, int len) if (len == 0) { /* empty buffer or waiting for more input */ if ((desc->i_buf == NULL) || (desc->i_remain > 0)) - return count; + return 0; if ((n = tcp_remain(desc, &len)) != 0) { if (n < 0) /* packet error */ return n; if (len > 0) /* more data pending */ desc->i_remain = len; - return count; + return 0; } } @@ -10769,9 +10784,7 @@ static int tcp_deliver(tcp_descriptor* desc, int len) len = 0; if (!desc->inet.active) { - if (!desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - } + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_recv_timeout); sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); if (desc->i_buf != NULL) tcp_restart_input(desc); @@ -10797,7 +10810,7 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) int len; int nread; - if (desc->i_buf == NULL) { /* allocte a read buffer */ + if (desc->i_buf == NULL) { /* allocate a read buffer */ int sz = (request_len > 0) ? request_len : desc->inet.bufsz; if ((desc->i_buf = alloc_buffer(sz)) == NULL) @@ -10870,10 +10883,11 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) return tcp_deliver(desc, desc->i_ptr - desc->i_ptr_start); } else { - if ((nread = tcp_remain(desc, &len)) < 0) + nread = tcp_remain(desc, &len); + if (nread < 0) return tcp_recv_error(desc, EMSGSIZE); else if (nread == 0) - return tcp_deliver(desc, len); + return tcp_deliver(desc, len); else if (len > 0) desc->i_remain = len; /* set remain */ } @@ -11192,7 +11206,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) } if (timeout != NULL) { - remove_multi_timer(&(desc->mtd), desc->inet.port, timeout); + remove_multi_timer(desc, desc->inet.port, timeout); } driver_demonitor_process(desc->inet.port, &monitor); @@ -11251,8 +11265,8 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) if (IS_BUSY(INETP(desc))) { desc->inet.caller = desc->inet.busy_caller; if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - desc->busy_on_send = 0; + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); + desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); @@ -11357,6 +11371,12 @@ static int tcp_shutdown_error(tcp_descriptor* desc, int err) return tcp_send_or_shutdown_error(desc, err); } +static void tcp_inet_delay_send(ErlDrvData data, ErlDrvTermData dummy) +{ + tcp_descriptor *desc = (tcp_descriptor*)data; + (void)tcp_inet_output(desc, INETP(desc)->s); +} + /* ** Send non-blocking vector data */ @@ -11409,7 +11429,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(desc, INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11424,7 +11446,10 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) INETP(desc)->is_ignored |= INET_IGNORE_WRITE; n = 0; } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { - n = 0; + driver_enqv(ix, ev, 0); + add_multi_timer(desc, INETP(desc)->port, 0, + 0, &tcp_inet_delay_send); + return 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) { @@ -11507,7 +11532,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(desc, INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11611,7 +11638,8 @@ static int tcp_sendfile_completed(tcp_descriptor* desc) { /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, + &tcp_inet_send_timeout); desc->busy_on_send = 0; } @@ -11918,6 +11946,12 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) #ifdef __WIN32__ desc->inet.send_would_block = 1; #endif + /* If DELAY_SEND is set ready_output may have + been called without doing select so we do + a select in order to get into the correct + state */ + if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) + sock_select(INETP(desc), FD_WRITE, 1); goto done; } else if (n == 0) { /* Workaround for redhat/CentOS 6.3 returning 0 when sending packets with @@ -11943,7 +11977,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) set_busy_port(desc->inet.port, 0); /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } inet_reply_ok(INETP(desc)); @@ -12755,7 +12789,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) udesc->i_buf = NULL; if (!desc->active) { async_error(desc, err); - driver_cancel_timer(desc->port); + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); } else { @@ -12844,7 +12878,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) return count; count++; if (!desc->active) { - driver_cancel_timer(desc->port); /* possibly cancel */ + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); return count; /* passive mode (read one packet only) */ } @@ -12923,55 +12957,69 @@ make_noninheritable_handle(SOCKET s) * Multi-timers */ -static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, +static void fire_multi_timers(tcp_descriptor *desc, ErlDrvPort port, ErlDrvData data) { ErlDrvTime next_timeout; - if (!*first) { + MultiTimerData *curr = desc->mtd; + if (!curr) { ASSERT(0); return; } #ifdef DEBUG { ErlDrvTime chk = erl_drv_monotonic_time(ERL_DRV_MSEC); - ASSERT(chk >= (*first)->when); + ASSERT(chk >= curr->when); } #endif do { - MultiTimerData *save = *first; - *first = save->next; + MultiTimerData *save = curr; + (*(save->timeout_function))(data,save->caller); - FREE(save); - if (*first == NULL) { + + curr = curr->next; + + if (desc->mtd_cache == NULL) + desc->mtd_cache = save; + else + FREE(save); + + if (curr == NULL) { + desc->mtd = NULL; return; } - (*first)->prev = NULL; - next_timeout = (*first)->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + curr->prev = NULL; + next_timeout = curr->when - erl_drv_monotonic_time(ERL_DRV_MSEC); } while (next_timeout <= 0); + desc->mtd = curr; driver_set_timer(port, (unsigned long) next_timeout); } -static void clean_multi_timers(MultiTimerData **first, ErlDrvPort port) +static void clean_multi_timers(tcp_descriptor *desc, ErlDrvPort port) { - MultiTimerData *p; - if (*first) { + if (desc->mtd) { driver_cancel_timer(port); } - while (*first) { - p = *first; - *first = p->next; - FREE(p); + while (desc->mtd) { + MultiTimerData *p = desc->mtd; + desc->mtd = p->next; + FREE(p); + } + desc->mtd = NULL; + if (desc->mtd_cache) { + FREE(desc->mtd_cache); + desc->mtd_cache = NULL; } } -static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTimerData *p) +static void remove_multi_timer(tcp_descriptor *desc, ErlDrvPort port, MultiTimerData *p) { if (p->prev != NULL) { p->prev->next = p->next; } else { driver_cancel_timer(port); - *first = p->next; - if (*first) { - ErlDrvTime ntmo = (*first)->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + desc->mtd = p->next; + if (desc->mtd) { + ErlDrvTime ntmo = desc->mtd->when - erl_drv_monotonic_time(ERL_DRV_MSEC); if (ntmo < 0) ntmo = 0; driver_set_timer(port, (unsigned long) ntmo); @@ -12980,36 +13028,67 @@ static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTim if (p->next != NULL) { p->next->prev = p->prev; } - FREE(p); + if (desc->mtd_cache == NULL) + desc->mtd_cache = p; + else + FREE(p); } -static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, +/* Cancel a timer based on the timeout_fun */ +static void cancel_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)) +{ + MultiTimerData *timer = desc->mtd; + while(timer && timer->timeout_function != timeout_fun) { + timer = timer->next; + } + if (timer) { + remove_multi_timer(desc, port, timer); + } +} + +static MultiTimerData *add_multi_timer(tcp_descriptor *desc, ErlDrvPort port, ErlDrvTermData caller, unsigned timeout, void (*timeout_fun)(ErlDrvData drv_data, ErlDrvTermData caller)) { MultiTimerData *mtd, *p, *s; - mtd = ALLOC(sizeof(MultiTimerData)); - mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout) + 1; + + /* Use cached timer if available */ + if (desc->mtd_cache != NULL) { + mtd = desc->mtd_cache; + desc->mtd_cache = NULL; + } else + mtd = ALLOC(sizeof(MultiTimerData)); + + if (timeout) + mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout); + else + mtd->when = INT64_MIN; /* Don't have to get the time for 0 msec timeouts */ + mtd->timeout_function = timeout_fun; mtd->caller = caller; mtd->next = mtd->prev = NULL; - for(p = *first,s = NULL; p != NULL; s = p, p = p->next) { + + /* Find correct slot in timer linked list */ + for(p = desc->mtd,s = NULL; p != NULL; s = p, p = p->next) { if (p->when >= mtd->when) { break; } } + /* Insert in linked list */ if (!p) { if (!s) { - *first = mtd; + desc->mtd = mtd; } else { s->next = mtd; mtd->prev = s; } } else { if (!s) { - *first = mtd; + desc->mtd = mtd; } else { s->next = mtd; mtd->prev = s; @@ -13017,10 +13096,8 @@ static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, mtd->next = p; p->prev = mtd; } + /* Possibly set new timer */ if (!s) { - if (mtd->next) { - driver_cancel_timer(port); - } driver_set_timer(port,timeout); } return mtd; |