diff options
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 133 |
1 files changed, 82 insertions, 51 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 259a27cf57..222b369a3f 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -1010,6 +1010,9 @@ static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, ErlDrvData data); static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTimerData *p); +static void cancel_multi_timer(MultiTimerData **first, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)); static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller); static void clean_multi_timers(MultiTimerData **first, ErlDrvPort port); @@ -9779,15 +9782,14 @@ static void tcp_close_check(tcp_descriptor* desc) driver_demonitor_process(desc->inet.port, &monitor); send_async_error(desc->inet.dport, id, caller, am_closed); } - clean_multi_timers(&(desc->mtd), desc->inet.port); } - else if (desc->inet.state == INET_STATE_CONNECTING) { async_error_am(INETP(desc), am_closed); } else if (desc->inet.state == INET_STATE_CONNECTED) { async_error_am_all(INETP(desc), am_closed); } + clean_multi_timers(&(desc->mtd), desc->inet.port); } /* @@ -9830,6 +9832,15 @@ static void tcp_desc_close(tcp_descriptor* desc) erl_inet_close(INETP(desc)); } +static void tcp_inet_recv_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(!desc->inet.active); + sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); + desc->i_remain = 0; + async_error_am(INETP(desc), am_timeout); +} + /* TCP requests from Erlang */ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, ErlDrvSizeT len, @@ -10117,7 +10128,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); + add_multi_timer(&(desc->mtd), INETP(desc)->port, 0, + timeout, &tcp_inet_recv_timeout); if (!INETP(desc)->is_ignored) sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); else @@ -10223,12 +10235,27 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, } +static void tcp_inet_send_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(IS_BUSY(INETP(desc))); + ASSERT(desc->busy_on_send); + desc->inet.caller = desc->inet.busy_caller; + desc->inet.state &= ~INET_F_BUSY; + desc->busy_on_send = 0; + set_busy_port(desc->inet.port, 0); + inet_reply_error_am(INETP(desc), am_timeout); + if (desc->send_timeout_close) { + tcp_desc_close(desc); + } +} + /* ** tcp_inet_timeout: ** called when timer expire: ** TCP socket may be: ** -** a) receiving -- deselect +** a) receiving -- send timeout ** b) connecting -- close socket ** c) accepting -- reset listener ** @@ -10244,24 +10271,7 @@ static void tcp_inet_timeout(ErlDrvData e) if ((state & INET_F_MULTI_CLIENT)) { /* Multi-client always means multi-timers */ fire_multi_timers(&(desc->mtd), desc->inet.port, e); } else if ((state & INET_STATE_CONNECTED) == INET_STATE_CONNECTED) { - if (desc->busy_on_send) { - ASSERT(IS_BUSY(INETP(desc))); - desc->inet.caller = desc->inet.busy_caller; - desc->inet.state &= ~INET_F_BUSY; - desc->busy_on_send = 0; - set_busy_port(desc->inet.port, 0); - inet_reply_error_am(INETP(desc), am_timeout); - if (desc->send_timeout_close) { - tcp_desc_close(desc); - } - } - else { - /* assume recv timeout */ - ASSERT(!desc->inet.active); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); - desc->i_remain = 0; - async_error_am(INETP(desc), am_timeout); - } + fire_multi_timers(&(desc->mtd), desc->inet.port, e); } else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) { /* assume connect timeout */ @@ -10429,7 +10439,7 @@ static int tcp_recv_closed(tcp_descriptor* desc) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; DEBUGF(("tcp_recv_closed(%ld): busy on send\r\n", port)); } @@ -10444,9 +10454,10 @@ static int tcp_recv_closed(tcp_descriptor* desc) */ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND; } + if (!desc->inet.active) { - /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + /* We must cancel any timer here ! */ + clean_multi_timers(&(desc->mtd), INETP(desc)->port); /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { @@ -10481,7 +10492,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; @@ -10490,7 +10501,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) } if (!desc->inet.active) { /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + clean_multi_timers(&(desc->mtd), INETP(desc)->port); tcp_clear_input(desc); if (desc->inet.exitf) { tcp_desc_close(desc); @@ -10595,13 +10606,13 @@ static int tcp_deliver(tcp_descriptor* desc, int len) if (len == 0) { /* empty buffer or waiting for more input */ if ((desc->i_buf == NULL) || (desc->i_remain > 0)) - return count; + return 0; if ((n = tcp_remain(desc, &len)) != 0) { if (n < 0) /* packet error */ return n; if (len > 0) /* more data pending */ desc->i_remain = len; - return count; + return 0; } } @@ -10653,9 +10664,7 @@ static int tcp_deliver(tcp_descriptor* desc, int len) len = 0; if (!desc->inet.active) { - if (!desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - } + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, &tcp_inet_recv_timeout); sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); if (desc->i_buf != NULL) tcp_restart_input(desc); @@ -10681,7 +10690,7 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) int len; int nread; - if (desc->i_buf == NULL) { /* allocte a read buffer */ + if (desc->i_buf == NULL) { /* allocate a read buffer */ int sz = (request_len > 0) ? request_len : desc->inet.bufsz; if ((desc->i_buf = alloc_buffer(sz)) == NULL) @@ -10754,10 +10763,11 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) return tcp_deliver(desc, desc->i_ptr - desc->i_ptr_start); } else { - if ((nread = tcp_remain(desc, &len)) < 0) + nread = tcp_remain(desc, &len); + if (nread < 0) return tcp_recv_error(desc, EMSGSIZE); else if (nread == 0) - return tcp_deliver(desc, len); + return tcp_deliver(desc, len); else if (len > 0) desc->i_remain = len; /* set remain */ } @@ -11135,8 +11145,8 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) if (IS_BUSY(INETP(desc))) { desc->inet.caller = desc->inet.busy_caller; if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - desc->busy_on_send = 0; + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, &tcp_inet_send_timeout); + desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); @@ -11288,7 +11298,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(&(desc->mtd), INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11386,7 +11398,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(&(desc->mtd), INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11490,7 +11504,8 @@ static int tcp_sendfile_completed(tcp_descriptor* desc) { /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, + &tcp_inet_send_timeout); desc->busy_on_send = 0; } @@ -11822,7 +11837,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) set_busy_port(desc->inet.port, 0); /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(&(desc->mtd), INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } inet_reply_ok(INETP(desc)); @@ -12634,7 +12649,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) udesc->i_buf = NULL; if (!desc->active) { async_error(desc, err); - driver_cancel_timer(desc->port); + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); } else { @@ -12723,7 +12738,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) return count; count++; if (!desc->active) { - driver_cancel_timer(desc->port); /* possibly cancel */ + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); return count; /* passive mode (read one packet only) */ } @@ -12806,26 +12821,28 @@ static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, ErlDrvData data) { ErlDrvTime next_timeout; - if (!*first) { + MultiTimerData *curr = *first; + if (!curr) { ASSERT(0); return; } #ifdef DEBUG { ErlDrvTime chk = erl_drv_monotonic_time(ERL_DRV_MSEC); - ASSERT(chk >= (*first)->when); + ASSERT(chk >= curr->when); } #endif do { - MultiTimerData *save = *first; - *first = save->next; + MultiTimerData *save = curr; + *first = save->next; + curr = *first; (*(save->timeout_function))(data,save->caller); FREE(save); - if (*first == NULL) { + if (curr == NULL) { return; } - (*first)->prev = NULL; - next_timeout = (*first)->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + curr->prev = NULL; + next_timeout = curr->when - erl_drv_monotonic_time(ERL_DRV_MSEC); } while (next_timeout <= 0); driver_set_timer(port, (unsigned long) next_timeout); } @@ -12862,6 +12879,20 @@ static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTim FREE(p); } +/* Cancel a timer based on the timeout_fun */ +static void cancel_multi_timer(MultiTimerData **first, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)) +{ + MultiTimerData *timer = *first; + while(timer && timer->timeout_function != timeout_fun) { + timer = timer->next; + } + if (timer) { + remove_multi_timer(first, port, timer); + } +} + static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, ErlDrvTermData caller, unsigned timeout, void (*timeout_fun)(ErlDrvData drv_data, @@ -12869,7 +12900,7 @@ static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, { MultiTimerData *mtd, *p, *s; mtd = ALLOC(sizeof(MultiTimerData)); - mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout) + 1; + mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout); mtd->timeout_function = timeout_fun; mtd->caller = caller; mtd->next = mtd->prev = NULL; |