diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 58 | ||||
-rw-r--r-- | erts/preloaded/src/prim_inet.erl | 21 |
2 files changed, 56 insertions, 23 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 5196eb51c6..e001f31932 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -293,6 +293,10 @@ static BOOL (WINAPI *fpSetHandleInformation)(HANDLE,DWORD,DWORD); static unsigned long zero_value = 0; static unsigned long one_value = 1; +#define TCP_SHUT_WR SD_SEND +#define TCP_SHUT_RD SD_RECEIVE +#define TCP_SHUT_RDWR SD_BOTH + #elif defined (__OSE__) /* @@ -421,6 +425,10 @@ typedef unsigned long u_long; inet_driver_select((d), (flags), (onoff)); \ } while(0) +#define TCP_SHUT_WR SHUT_WR +#define TCP_SHUT_RD SHUT_RD +#define TCP_SHUT_RDWR SHUT_RDWR + #else /* !__OSE__ && !__WIN32__ */ #include <sys/time.h> @@ -691,6 +699,9 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) inet_driver_select((d)->port, (ErlDrvEvent)(long)(d)->event, (flags), (onoff)); \ } while(0) +#define TCP_SHUT_WR SHUT_WR +#define TCP_SHUT_RD SHUT_RD +#define TCP_SHUT_RDWR SHUT_RDWR #endif /* !__WIN32__ && !__OSE__ */ @@ -820,6 +831,10 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define TCP_ADDF_CLOSE_SENT 2 /* Close sent (active mode only) */ #define TCP_ADDF_DELAYED_CLOSE_RECV 4 /* If receive fails, report {error,closed} (passive mode) */ #define TCP_ADDF_DELAYED_CLOSE_SEND 8 /* If send fails, report {error,closed} (passive mode) */ +#define TCP_ADDF_PENDING_SHUT_WR 16 /* Call shutdown(sock, SHUT_WR) when queue empties */ +#define TCP_ADDF_PENDING_SHUT_RDWR 32 /* Call shutdown(sock, SHUT_RDWR) when queue empties */ +#define TCP_ADDF_PENDING_SHUTDOWN \ + (TCP_ADDF_PENDING_SHUT_WR | TCP_ADDF_PENDING_SHUT_RDWR) /* *_REQ_* replies */ #define INET_REP_ERROR 0 @@ -1407,6 +1422,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev); static int tcp_recv(tcp_descriptor* desc, int request_len); static int tcp_deliver(tcp_descriptor* desc, int len); +static int tcp_shutdown_error(tcp_descriptor* desc, int err); + static int tcp_inet_output(tcp_descriptor* desc, HANDLE event); static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); @@ -9473,10 +9490,18 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_error(EINVAL, rbuf, rsize); } how = buf[0]; - if (sock_shutdown(INETP(desc)->s, how) == 0) { + if (how != TCP_SHUT_RD && driver_sizeq(desc->inet.port) > 0) { + if (how == TCP_SHUT_WR) { + desc->tcp_add_flags |= TCP_ADDF_PENDING_SHUT_WR; + } else if (how == TCP_SHUT_RDWR) { + desc->tcp_add_flags |= TCP_ADDF_PENDING_SHUT_RDWR; + } return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); - } else { + } + if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how))) { return ctl_error(sock_errno(), rbuf, rsize); + } else { + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } } default: @@ -9613,6 +9638,8 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev) else inet_reply_error(INETP(desc), ENOTCONN); } + else if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) + tcp_shutdown_error(desc, EPIPE); else if (tcp_sendv(desc, ev) == 0) inet_reply_ok(INETP(desc)); DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port)); @@ -10506,7 +10533,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) return ret; } -static int tcp_send_error(tcp_descriptor* desc, int err) +static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) { /* * If the port is busy, we must do some clean-up before proceeding. @@ -10563,6 +10590,16 @@ static int tcp_send_error(tcp_descriptor* desc, int err) return -1; } +static int tcp_send_error(tcp_descriptor* desc, int err) +{ + return tcp_send_or_shutdown_error(desc, err); +} + +static int tcp_shutdown_error(tcp_descriptor* desc, int err) +{ + return tcp_send_or_shutdown_error(desc, err); +} + /* ** Send non-blocking vector data */ @@ -10763,6 +10800,19 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) return 0; } +/* shutdown on the socket: +** Assume caller has confirmed TCP_ADDF_PENDING_SHUTDOWN is set. +*/ +static void tcp_shutdown_async(tcp_descriptor* desc) +{ + int how; + + how = (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUT_WR) ? + TCP_SHUT_WR : TCP_SHUT_RDWR; + if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how))) + tcp_shutdown_error(desc, sock_errno()); +} + #ifdef __OSE__ static void tcp_inet_drv_output_ose(ErlDrvData data, ErlDrvEvent event) @@ -10891,6 +10941,8 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) if ((iov = driver_peekq(ix, &vsize)) == NULL) { sock_select(INETP(desc), FD_WRITE, 0); send_empty_out_q_msgs(INETP(desc)); + if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) + tcp_shutdown_async(desc); goto done; } vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize; diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index 79ff013c77..622e1be869 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -127,37 +127,18 @@ drv2protocol(_) -> undefined. %% TODO: shutdown equivalent for SCTP %% shutdown(S, read) when is_port(S) -> - shutdown_2(S, 0); + shutdown_1(S, 0); shutdown(S, write) when is_port(S) -> shutdown_1(S, 1); shutdown(S, read_write) when is_port(S) -> shutdown_1(S, 2). shutdown_1(S, How) -> - case subscribe(S, [subs_empty_out_q]) of - {ok,[{subs_empty_out_q,N}]} when N > 0 -> - shutdown_pend_loop(S, N); %% wait for pending output to be sent - _Other -> ok - end, - shutdown_2(S, How). - -shutdown_2(S, How) -> case ctl_cmd(S, ?TCP_REQ_SHUTDOWN, [How]) of {ok, []} -> ok; {error,_}=Error -> Error end. -shutdown_pend_loop(S, N0) -> - receive - {empty_out_q,S} -> ok - after ?INET_CLOSE_TIMEOUT -> - case getstat(S, [send_pend]) of - {ok,[{send_pend,N0}]} -> ok; - {ok,[{send_pend,N}]} -> shutdown_pend_loop(S, N); - _ -> ok - end - end. - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% CLOSE(insock()) -> ok |