diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 281 | ||||
-rw-r--r-- | erts/emulator/test/Makefile | 13 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 82 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest.hrl | 32 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_client.hrl | 141 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client.erl | 617 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client_gen.erl | 45 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client_socket.erl | 45 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_gen.erl | 123 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server.erl | 678 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server_gen.erl | 34 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server_socket.erl | 34 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_socket.erl | 345 |
13 files changed, 2299 insertions, 171 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index cdd9f0fb15..3da895e644 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -436,7 +436,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC #define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC -#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 +#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 8192 #define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024 #define SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024 @@ -775,7 +775,7 @@ typedef struct { ErlNifEnv* env; /* +++ Controller (owner) process +++ */ - ErlNifPid ctrlPid; + ErlNifPid ctrlPid; // ErlNifMonitor ctrlMon; ESockMonitor ctrlMon; @@ -5232,7 +5232,6 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, else save_errno = -1; // The value does not actually matter in this case - return send_check_result(env, descP, written, sndDataP->size, save_errno, sendRef); @@ -6185,8 +6184,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, int type = descP->type; int protocol = descP->protocol; - SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n", + SSDBG( descP, ("SOCKET", + "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX, 0x%lX)\r\n", descP->sock, + descP->state, descP->currentWriterP, descP->currentReaderP, descP->currentAcceptorP) ); @@ -6246,7 +6247,6 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) ); dec_socket(domain, type, protocol); - DEMONP("nclose -> closer", env, descP, &descP->closerMon); reply = esock_atom_ok; } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { /* The stop callback function has been *scheduled* which means that we @@ -6284,8 +6284,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "nclose -> [%d] done when: " + "\r\n state: 0x%lX" "\r\n reply: %T" - "\r\n", descP->sock, reply) ); + "\r\n", descP->sock, descP->state, reply) ); return reply; } @@ -16920,8 +16921,29 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MLOCK(descP->accMtx); MLOCK(descP->closeMtx); - SSDBG( descP, ("SOCKET", "socket_stop -> all mutex(s) locked\r\n") ); - + SSDBG( descP, ("SOCKET", "socket_stop -> " + "[%d, %T] all mutex(s) locked when counters:" + "\r\n writePkgCnt: %u" + "\r\n writeByteCnt: %u" + "\r\n writeTries: %u" + "\r\n writeWaits: %u" + "\r\n writeFails: %u" + "\r\n readPkgCnt: %u" + "\r\n readByteCnt: %u" + "\r\n readTries: %u" + "\r\n readWaits: %u" + "\r\n", + descP->sock, descP->ctrlPid, + descP->writePkgCnt, + descP->writeByteCnt, + descP->writeTries, + descP->writeWaits, + descP->writeFails, + descP->readPkgCnt, + descP->readByteCnt, + descP->readTries, + descP->readWaits) ); + descP->state = SOCKET_STATE_CLOSING; // Just in case...??? descP->isReadable = FALSE; descP->isWritable = FALSE; @@ -17063,7 +17085,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * </KOLLA> */ - if (descP->closeLocal) { + if (descP->closeLocal && + !is_direct_call) { /* +++ send close message to the waiting process +++ * @@ -17073,13 +17096,18 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME???? * + * Also, we should really *always* use a tag unique to this + * (nif-) module. Some like (in this case): + * + * {'$socket', close, CloseRef} + * * </KOLLA> */ - send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + send_msg(env, + MKT2(env, atom_close, descP->closeRef), &descP->closerPid); - DEMONP("socket_stop -> closer", - env, descP, &descP->closerMon); + DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); } else { @@ -17218,137 +17246,144 @@ void socket_down(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") ); } */ - - if (compare_pids(env, &descP->ctrlPid, pid)) { - int selectRes; - - /* We don't bother with the queue cleanup here - - * we leave it to the stop callback function. - */ - - SSDBG( descP, - ("SOCKET", "socket_down -> controlling process exit\r\n") ); - descP->state = SOCKET_STATE_CLOSING; - descP->closeLocal = TRUE; - descP->closerPid = *pid; - descP->closerMon = (ESockMonitor) *mon; - descP->closeRef = MKREF(env); // Do we really need this in this case? - - /* - esock_dbg_printf("DOWN", - "[%d] select stop %u,%u,%u,%d\r\n", - descP->sock, - monP->raw[0], monP->raw[1], - monP->raw[2], monP->raw[3]); - */ + if (!IS_CLOSED(descP)) { + if (compare_pids(env, &descP->ctrlPid, pid)) { + int selectRes; - selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + /* We don't bother with the queue cleanup here - + * we leave it to the stop callback function. + */ - if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { - /* We are done - wwe can finalize (socket close) directly */ SSDBG( descP, - ("SOCKET", "socket_down -> [%d] stop called\r\n", descP->sock) ); - dec_socket(descP->domain, descP->type, descP->protocol); - descP->state = SOCKET_STATE_CLOSED; - - /* And finally close the socket. - * Since we close the socket because of an exiting owner, - * we do not need to wait for buffers to sync (linger). - * If the owner wish to ensure the buffer are written, - * it should have closed teh socket explicitly... - */ - if (sock_close(descP->sock) != 0) { - int save_errno = sock_errno(); - - esock_warning_msg("Failed closing socket for terminating " - "controlling process: " - "\r\n Controlling Process: %T" - "\r\n Descriptor: %d" - "\r\n Errno: %d" - "\r\n", pid, descP->sock, save_errno); - } - sock_close_event(descP->event); + ("SOCKET", "socket_down -> controlling process exit\r\n") ); + + descP->state = SOCKET_STATE_CLOSING; + descP->closeLocal = TRUE; + descP->closerPid = *pid; + descP->closerMon = (ESockMonitor) *mon; + descP->closeRef = MKREF(env); // Do we really need this in this case? - descP->sock = INVALID_SOCKET; - descP->event = INVALID_EVENT; + /* + esock_dbg_printf("DOWN", + "[%d] select stop %u,%u,%u,%d\r\n", + descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + */ - descP->state = SOCKET_STATE_CLOSED; + selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), + descP, NULL, descP->closeRef); + + if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { + /* We are done - we can finalize (socket close) directly */ + SSDBG( descP, + ("SOCKET", + "socket_down -> [%d] stop called\r\n", descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); + descP->state = SOCKET_STATE_CLOSED; + + /* And finally close the socket. + * Since we close the socket because of an exiting owner, + * we do not need to wait for buffers to sync (linger). + * If the owner wish to ensure the buffer are written, + * it should have closed the socket explicitly... + */ + if (sock_close(descP->sock) != 0) { + int save_errno = sock_errno(); + + esock_warning_msg("Failed closing socket for terminating " + "controlling process: " + "\r\n Controlling Process: %T" + "\r\n Descriptor: %d" + "\r\n Errno: %d" + "\r\n", pid, descP->sock, save_errno); + } + sock_close_event(descP->event); - } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { - /* The stop callback function has been *scheduled* which means that - * "should" wait for it to complete. But since we are in a callback - * (down) function, we cannot... - * So, we must close the socket - */ - SSDBG( descP, - ("SOCKET", - "socket_down -> [%d] stop scheduled\r\n", descP->sock) ); - dec_socket(descP->domain, descP->type, descP->protocol); + descP->sock = INVALID_SOCKET; + descP->event = INVALID_EVENT; - /* And now what? We can't wait for the stop function here... - * So, we simply close it here and leave the rest of the "close" - * for later (when the stop function actually gets called... - */ + descP->state = SOCKET_STATE_CLOSED; - if (sock_close(descP->sock) != 0) { - int save_errno = sock_errno(); - - esock_warning_msg("Failed closing socket for terminating " - "controlling process: " + } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { + /* The stop callback function has been *scheduled* which means + * that "should" wait for it to complete. But since we are in + * a callback (down) function, we cannot... + * So, we must close the socket + */ + SSDBG( descP, + ("SOCKET", + "socket_down -> [%d] stop scheduled\r\n", + descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); + + /* And now what? We can't wait for the stop function here... + * So, we simply close it here and leave the rest of the "close" + * for later (when the stop function actually gets called... + */ + + if (sock_close(descP->sock) != 0) { + int save_errno = sock_errno(); + + esock_warning_msg("Failed closing socket for terminating " + "controlling process: " + "\r\n Controlling Process: %T" + "\r\n Descriptor: %d" + "\r\n Errno: %d" + "\r\n", pid, descP->sock, save_errno); + } + sock_close_event(descP->event); + + } else { + + /* + * <KOLLA> + * + * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, + * SO WE DON'T LET STUFF LEAK. + * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH + * THE CLOSE, WHAT TO DO? ABORT? + * + * </KOLLA> + */ + esock_warning_msg("Failed selecting stop when handling down " + "of controlling process: " + "\r\n Select Res: %d" "\r\n Controlling Process: %T" "\r\n Descriptor: %d" - "\r\n Errno: %d" - "\r\n", pid, descP->sock, save_errno); + "\r\n Monitor: %u.%u.%u.%u" + "\r\n", selectRes, pid, descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); } - sock_close_event(descP->event); - + } else { - /* - * <KOLLA> + /* check all operation queue(s): acceptor, writer and reader. * - * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, - * SO WE DON'T LET STUFF LEAK. - * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH - * THE CLOSE, WHAT TO DO? ABORT? + * Is it really any point in doing this if the socket is closed? * - * </KOLLA> */ - esock_warning_msg("Failed selecting stop when handling down " - "of controlling process: " - "\r\n Select Res: %d" - "\r\n Controlling Process: %T" - "\r\n Descriptor: %d" - "\r\n Monitor: %u.%u.%u.%u" - "\r\n", selectRes, pid, descP->sock, - monP->raw[0], monP->raw[1], - monP->raw[2], monP->raw[3]); - } - - } else { - - /* check all operation queue(s): acceptor, writer and reader. */ - - SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") ); + SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") ); - MLOCK(descP->accMtx); - if (descP->currentAcceptorP != NULL) - socket_down_acceptor(env, descP, pid); - MUNLOCK(descP->accMtx); - - MLOCK(descP->writeMtx); - if (descP->currentWriterP != NULL) - socket_down_writer(env, descP, pid); - MUNLOCK(descP->writeMtx); - - MLOCK(descP->readMtx); - if (descP->currentReaderP != NULL) - socket_down_reader(env, descP, pid); - MUNLOCK(descP->readMtx); + MLOCK(descP->accMtx); + if (descP->currentAcceptorP != NULL) + socket_down_acceptor(env, descP, pid); + MUNLOCK(descP->accMtx); + + MLOCK(descP->writeMtx); + if (descP->currentWriterP != NULL) + socket_down_writer(env, descP, pid); + MUNLOCK(descP->writeMtx); + MLOCK(descP->readMtx); + if (descP->currentReaderP != NULL) + socket_down_reader(env, descP, pid); + MUNLOCK(descP->readMtx); + + } } /* diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 22c2f24a80..a910588381 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -34,6 +34,14 @@ SOCKET_MODULES = \ socket_client \ socket_test_lib \ socket_test_evaluator \ + socket_test_ttest_tcp_gen \ + socket_test_ttest_tcp_socket \ + socket_test_ttest_tcp_client \ + socket_test_ttest_tcp_client_gen \ + socket_test_ttest_tcp_client_socket \ + socket_test_ttest_tcp_server \ + socket_test_ttest_tcp_server_gen \ + socket_test_ttest_tcp_server_socket \ socket_SUITE MODULES= \ @@ -158,7 +166,10 @@ NATIVE_MODULES= $(NATIVE:%=%_native_SUITE) NATIVE_ERL_FILES= $(NATIVE_MODULES:%=%.erl) ERL_FILES= $(MODULES:%=%.erl) -HRL_FILES= socket_test_evaluator.hrl +HRL_FILES= \ + socket_test_evaluator.hrl \ + socket_test_ttest.hrl \ + socket_test_ttest_client.hrl TARGET_FILES = $(MODULES:%=$(EBIN)/%.$(EMULATOR)) SOCKET_TARGETS = $(SOCKET_MODULES:%=$(EBIN)/%.$(EMULATOR)) diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index f2aace40de..71e32f8e95 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -1980,7 +1980,7 @@ api_toc_tcp_client(Parent, GL) -> {To, ConLimit} = api_toc_tcp_client_await_continue(Parent, connect), Result = api_to_connect_tcp_await_timeout(To, ServerSA, Domain, ConLimit), api_toc_tcp_client_announce_ready(Parent, connect, Result), - Reason = sc_rs_tcp_client_await_terminate(Parent), + Reason = api_toc_tcp_client_await_terminate(Parent), exit(Reason). api_toc_tcp_client_init(Parent, GL) -> @@ -2057,7 +2057,7 @@ api_to_connect_tcp_await_timeout(ID, ConLimit, To, ServerSA, NewSock, Acc) -> ERROR end. -api_to_connect_tcp_await_timeout2(ID, To, ServerSA, NewSock) -> +api_to_connect_tcp_await_timeout2(_ID, To, ServerSA, NewSock) -> Sock = NewSock(), %% ?SEV_IPRINT("~w: try connect", [ID]), Start = t(), @@ -10157,37 +10157,6 @@ tpp_udp_sock_close(Sock) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This gets the local address (not 127.0...) -%% We should really implement this using the (new) net module, -%% but until that gets the necessary functionality... -which_local_addr(Domain) -> - case inet:getifaddrs() of - {ok, IFL} -> - which_addr(Domain, IFL); - {error, Reason} -> - ?FAIL({inet, getifaddrs, Reason}) - end. - -which_addr(_Domain, []) -> - ?FAIL(no_address); -which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> - which_addr2(Domain, IFO); -which_addr(Domain, [_|IFL]) -> - which_addr(Domain, IFL). - -which_addr2(_Domain, []) -> - ?FAIL(no_address); -which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> - Addr; -which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> - Addr; -which_addr2(Domain, [_|IFO]) -> - which_addr2(Domain, IFO). - - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - start_node(Host, NodeName) -> UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), case do_start_node(Host, UniqueNodeName) of @@ -10335,6 +10304,25 @@ which_local_addr(Domain) -> ?FAIL({inet, getifaddrs, Reason}) end. +which_addr(_Domain, []) -> + ?FAIL(no_address); +which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> + which_addr2(Domain, IFO); +which_addr(Domain, [_|IFL]) -> + which_addr(Domain, IFL). + +which_addr2(_Domain, []) -> + ?FAIL(no_address); +which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> + Addr; +which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> + Addr; +which_addr2(Domain, [_|IFO]) -> + which_addr2(Domain, IFO). + + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -10470,21 +10458,21 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). -d(F, A) -> - d(get(dbg_fd), F, A). +%% d(F, A) -> +%% d(get(dbg_fd), F, A). -d(undefined, F, A) -> - [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), - DbgFileName = f("~s-dbg.txt", [NodeNameStr]), - case file:open(DbgFileName, [write]) of - {ok, FD} -> - put(dbg_fd, FD), - d(FD, F, A); - {error, Reason} -> - exit({failed_open_dbg_file, Reason}) - end; -d(FD, F, A) -> - io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). +%% d(undefined, F, A) -> +%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), +%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]), +%% case file:open(DbgFileName, [write]) of +%% {ok, FD} -> +%% put(dbg_fd, FD), +%% d(FD, F, A); +%% {error, Reason} -> +%% exit({failed_open_dbg_file, Reason}) +%% end; +%% d(FD, F, A) -> +%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). i(F) -> i(F, []). diff --git a/erts/emulator/test/socket_test_ttest.hrl b/erts/emulator/test/socket_test_ttest.hrl new file mode 100644 index 0000000000..1a004a9a7a --- /dev/null +++ b/erts/emulator/test/socket_test_ttest.hrl @@ -0,0 +1,32 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-ifndef(socket_test_ttest). +-define(socket_test_ttest, true). + +-define(TTEST_TAG, 42). +-define(TTEST_TYPE_REQUEST, 101). +-define(TTEST_TYPE_REPLY, 102). + +-define(SECS(I), timer:seconds(I)). + +-define(SLEEP(T), receive after T -> ok end). + +-endif. % -ifdef(socket_test_ttest). diff --git a/erts/emulator/test/socket_test_ttest_client.hrl b/erts/emulator/test/socket_test_ttest_client.hrl new file mode 100644 index 0000000000..84e736cc34 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_client.hrl @@ -0,0 +1,141 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-ifndef(socket_test_ttest_client). +-define(socket_test_ttest_client, true). + +-define(MSG_ID_DEFAULT, 2). +-define(RUNTIME_DEFAULT, ?SECS(10)). +-define(MAX_ID, 16#FFFFFFFF). + +-define(MSG_DATA1, <<"This is test data 0123456789 0123456789 0123456789">>). +-define(MSG_DATA2, <<"This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789">>). +-define(MSG_DATA3, <<"This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789" + "This is test data 0123456789 0123456789 0123456789">>). + + +-endif. % -ifdef(socket_test_ttest_client). diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl new file mode 100644 index 0000000000..1bd1bc54e9 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl @@ -0,0 +1,617 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% ========================================================================== +%% +%% This is the "simple" client using gen_tcp. The client is supposed to be +%% as simple as possible in order to incur as little overhead as possible. +%% +%% There are three ways to run the client: active, passive or active-once. +%% +%% The client is the entity that controls the test, timing and counting. +%% +%% ========================================================================== +%% +%% Before the actual test starts, the client performs a "warmup". +%% The warmup has two functions. First, to ensure that everything is "loaded" +%% and, second, to calculate an approximate roundtrip time, in order to +%% "know" how many iterations we should make (to run for the expected time). +%% This is not intended to be exact, but just to ensure that all tests take +%% approx the same time to run. +%% +%% ========================================================================== + +-module(socket_test_ttest_tcp_client). + +-export([ + start_monitor/4, start_monitor/5, start_monitor/7, + stop/1 + ]). + +-include_lib("kernel/include/inet.hrl"). +-include("socket_test_ttest.hrl"). +-include("socket_test_ttest_client.hrl"). + +-define(RECV_TIMEOUT, 10000). +-define(MAX_OUTSTANDING_DEFAULT_1, 100). +-define(MAX_OUTSTANDING_DEFAULT_2, 10). +-define(MAX_OUTSTANDING_DEFAULT_3, 3). + + +-type active() :: once | boolean(). +-type msg_id() :: 1..3. +-type max_outstanding() :: pos_integer(). +-type runtime() :: pos_integer(). + + +%% ========================================================================== + +-spec start_monitor(Mod, Active, Addr, Port) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port) -> + start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT). + +-spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(), + MsgID :: msg_id(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port, 1 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); +start_monitor(Mod, Active, Addr, Port, 2 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); +start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). + +-spec start_monitor(Mod, + Active, + Addr, + Port, + MsgID, + MaxOutstanding, + RunTime) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(), + MsgID :: msg_id(), + MaxOutstanding :: max_outstanding(), + RunTime :: runtime(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) + when is_atom(Mod) andalso + (is_boolean(Active) orelse (Active =:= once)) andalso + is_tuple(Addr) andalso + (is_integer(Port) andalso (Port > 0)) andalso + (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso + (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso + (is_integer(RunTime) andalso (RunTime > 0)) -> + Self = self(), + ClientInit = fun() -> put(sname, "client"), + init(Self, + Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) + end, + {Pid, MRef} = spawn_monitor(ClientInit), + receive + {?MODULE, Pid, ok} -> + erlang:demonitor(MRef, [flush]), + {ok, {Pid, MRef}}; + + {?MODULE, Pid, {error, _} = ERROR} -> + erlang:demonitor(MRef, [flush]), + ERROR; + + {'DOWN', MRef, process, Pid, normal} -> + ok; + {'DOWN', MRef, process, Pid, Reason} -> + {error, {exit, Reason}} + + end. + + +%% We should not normally stop this (it terminates when its done). +stop(Pid) when is_pid(Pid) -> + req(Pid, stop). + + +%% ========================================================================== + +init(Parent, Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) -> + i("init -> entry with" + "~n Parent: ~p" + "~n Mod: ~p" + "~n Active: ~p" + "~n Addr: ~s" + "~n Port: ~p" + "~n Msg ID: ~p (=> 16 + ~w bytes)" + "~n Max Outstanding: ~p" + "~n (Suggested) Run Time: ~p ms", + [Parent, + Mod, Active, inet:ntoa(Addr), Port, + MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), + case Mod:connect(Addr, Port) of + {ok, Sock} -> + i("init -> connected"), + Parent ! {?MODULE, self(), ok}, + initial_activation(Mod, Sock, Active), + Results = loop(#{slogan => run, + runtime => RunTime, + start => t(), + parent => Parent, + mod => Mod, + sock => Sock, + active => Active, + msg_data => which_msg_data(MsgID), + outstanding => 0, + max_outstanding => MaxOutstanding, + sid => 1, + rid => 1, + scnt => 0, + rcnt => 0, + bcnt => 0, + num => undefined, + acc => <<>>}), + present_results(Results), + (catch Mod:close(Sock)), + exit(normal); + {error, Reason} -> + i("init -> connect failed: ~p", [Reason]), + exit({connect, Reason}) + end. + +which_msg_data(1) -> ?MSG_DATA1; +which_msg_data(2) -> ?MSG_DATA2; +which_msg_data(3) -> ?MSG_DATA3. + + +present_results(#{status := ok, + runtime := RunTime, + bcnt := ByteCnt, + cnt := NumIterations}) -> + i("Results: " + "~n Run Time: ~s" + "~n ByteCnt: ~s" + "~n NumIterations: ~s", + [format_time(RunTime), + if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> + f("~w, ~w", [ByteCnt, RunTime]); + true -> + f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) + end, + if (RunTime =:= 0) -> + "-"; + true -> + f("~p => ~p iterations / ms", + [NumIterations, NumIterations div RunTime]) + end]), + ok; +present_results(#{status := Failure, + runtime := RunTime, + sid := SID, + rid := RID, + scnt := SCnt, + rcnt := RCnt, + bcnt := BCnt, + num := Num}) -> + i("Time Test failed: " + "~n ~p" + "~n" + "~nwhen" + "~n" + "~n Run Time: ~s" + "~n Send ID: ~p" + "~n Recv ID: ~p" + "~n Send Count: ~p" + "~n Recv Count: ~p" + "~n Byte Count: ~p" + "~n Num Iterations: ~p", + [Failure, + format_time(RunTime), + SID, RID, SCnt, RCnt, BCnt, Num]). + + + +loop(#{runtime := RunTime} = State) -> + erlang:start_timer(RunTime, self(), stop), + try do_loop(State) + catch + throw:Results -> + Results + end. + +do_loop(State) -> + do_loop( handle_message( msg_exchange(State) ) ). + +msg_exchange(#{rcnt := Num, num := Num} = State) -> + %% i("we are done"), + finish(ok, State); +msg_exchange(#{scnt := Num, num := Num} = State) -> + %% We are done sending more requests - now we will just await + %% the replies for the (still) outstanding replies. + %% i("we have sent all requests - (only) wait for replies"), + msg_exchange( recv_reply(State) ); +msg_exchange(#{outstanding := Outstanding, + max_outstanding := MaxOutstanding} = State) + when (Outstanding < MaxOutstanding) -> + %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]), + msg_exchange( send_request(State) ); +msg_exchange(State) -> + send_request( recv_reply(State) ). + + +finish(ok, + #{start := Start, bcnt := BCnt, num := Num}) -> + Stop = t(), + throw(#{status => ok, + runtime => tdiff(Start, Stop), + bcnt => BCnt, + cnt => Num}); +finish(Reason, + #{start := Start, + sid := SID, rid := RID, + scnt := SCnt, rcnt := RCnt, bcnt := BCnt, + num := Num}) -> + Stop = t(), + throw(#{status => Reason, + runtime => tdiff(Start, Stop), + sid => SID, + rid => RID, + scnt => SCnt, + rcnt => RCnt, + bcnt => BCnt, + num => Num}). + +send_request(#{mod := Mod, + sock := Sock, + sid := ID, + scnt := Cnt, + outstanding := Outstanding, + max_outstanding := MaxOutstanding, + msg_data := Data} = State) + when (MaxOutstanding > Outstanding) -> + %% i("send request -> entry when" + %% "~n ID: ~p" + %% "~n Cnt: ~p" + %% "~n Outstanding: ~p" + %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]), + SZ = size(Data), + Req = <<?TTEST_TAG:32, + ?TTEST_TYPE_REQUEST:32, + ID:32, + SZ:32, + Data/binary>>, + case Mod:send(Sock, Req) of + ok -> + %% i("~w bytes sent", [size(Req)]), + State#{sid => next_id(ID), + scnt => Cnt + 1, + outstanding => Outstanding + 1}; + {error, Reason} -> + e("Failed sending request: ~p", [Reason]), + exit({send, Reason}) + end; +send_request(State) -> + State. + + + +recv_reply(#{mod := Mod, + sock := Sock, + rid := ID, + active := false, + bcnt := BCnt, + rcnt := Cnt, + outstanding := Outstanding} = State) -> + %% i("recv-reply(false) -> entry with" + %% "~n (R)ID: ~p" + %% "~n (R)Cnt: ~p" + %% "~n BCnt: ~p" + %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]), + case recv_reply_message1(Mod, Sock, ID) of + {ok, MsgSz} -> + State#{rid => next_id(ID), + bcnt => BCnt + MsgSz, + rcnt => Cnt + 1, + outstanding => Outstanding - 1}; + + {error, timeout} -> + i("recv_reply(false) -> error: timeout"), + State; + + {error, Reason} -> + finish(Reason, State) + end; +recv_reply(#{mod := Mod, + sock := Sock, + rid := ID, + active := Active, + bcnt := BCnt, + scnt := SCnt, + rcnt := RCnt, + outstanding := Outstanding, + acc := Acc} = State) -> + %% i("recv-reply(~w) -> entry with" + %% "~n (R)ID: ~p" + %% "~n RCnt: ~p" + %% "~n BCnt: ~p" + %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]), + case recv_reply_message2(Mod, Sock, ID, Acc) of + {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) -> + maybe_activate(Mod, Sock, Active), + State#{rid => next_id(ID), + bcnt => BCnt + MsgSz, + rcnt => RCnt + 1, + outstanding => Outstanding - 1, + acc => NewAcc}; + + ok -> + State; + + {error, stop} -> + i("recv_reply(~w) -> stop", [Active]), + %% This will have the effect that no more requests are sent... + State#{num => SCnt, stop_started => t()}; + + {error, timeout} -> + i("recv_reply(~w) -> error: timeout", [Active]), + State; + + {error, Reason} -> + finish(Reason, State) + end. + + +%% This function reads exactly one (reply) message. No more no less. +recv_reply_message1(Mod, Sock, ID) -> + %% i("recv_reply_message1 -> entry with" + %% "~n ID: ~w", [ID]), + case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of + {ok, <<?TTEST_TAG:32, + ?TTEST_TYPE_REPLY:32, + ID:32, + SZ:32>> = Hdr} -> + %% Receive the ping-pong reply boby + %% i("recv_reply_message1 -> try read body" + %% "~n ID: ~w", [ID]), + case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of + {ok, Data} when (size(Data) =:= SZ) -> + {ok, size(Hdr) + size(Data)}; + {error, Reason2} -> + i("recv_reply_message1 -> body error: " + "~n ~p: ~p", [Reason2]), + {error, {recv_hdr, Reason2}} + end; + + {ok, <<BadTag:32, + BadType:32, + BadID:32, + BadSZ:32>>} -> + {error, {invalid_hdr, + {?TTEST_TAG, BadTag}, + {?TTEST_TYPE_REPLY, BadType}, + {ID, BadID}, + BadSZ}}; + {ok, _InvHdr} -> + {error, invalid_hdr}; + + {error, Reason1} -> + i("recv_reply_message1 -> hdr error: " + "~n ~p", [Reason1]), + {error, {recv_hdr, Reason1}} + end. + + +%% This function first attempts to process the data we have already +%% accumulated. If that is not enough for a (complete) reply, it +%% will attempt to receive more. +recv_reply_message2(Mod, Sock, ID, Acc) -> + %% i("recv_reply_message2 -> entry with" + %% "~n ID: ~w", [ID]), + case process_acc_data(ID, Acc) of + ok -> + %% No or insufficient data, so get more + recv_reply_message3(Mod, Sock, ID, Acc); + + {ok, _} = OK -> % We already had a reply accumulated - no need to read more + OK; + + {error, _} = ERROR -> + ERROR + end. + +%% This function receives a "chunk" of data, then it tries to extract +%% one (reply) message from the accumulated and new data (combined). +recv_reply_message3(_Mod, Sock, ID, Acc) -> + receive + {timeout, _TRef, stop} -> + %% i("stop - when messages: ~p", [process_info(self(), messages)]), + {error, stop}; + + {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse + (TagClosed =:= socket_closed) -> + {error, closed}; + + {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse + (TagErr =:= socket_error) -> + {error, Reason}; + + {Tag, Sock, Msg} when (Tag =:= tcp) orelse + (Tag =:= socket) -> + %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]), + process_acc_data(ID, <<Acc/binary, Msg/binary>>) + + %% after ?RECV_TIMEOUT -> + %% {error, timeout} + end. + + +process_acc_data(ID, <<?TTEST_TAG:32, + ?TTEST_TYPE_REPLY:32, + ID:32, + SZ:32, + Data/binary>>) when (SZ =< size(Data)) -> + %% i("process_acc_data -> entry with" + %% "~n ID: ~w" + %% "~n SZ: ~w", [ID, SZ]), + <<_Body:SZ/binary, Rest/binary>> = Data, + {ok, {4*4+SZ, Rest}}; +process_acc_data(ID, <<BadTag:32, + BadType:32, + BadID:32, + BadSZ:32, + _Data/binary>>) + when ((BadTag =/= ?TTEST_TAG) orelse + (BadType =/= ?TTEST_TYPE_REPLY) orelse + (BadID =/= ID)) -> + {error, {invalid_hdr, + {?TTEST_TAG, BadTag}, + {?TTEST_TYPE_REPLY, BadType}, + {ID, BadID}, + BadSZ}}; +%% Not enough for an entire (reply) message +process_acc_data(_ID, _Data) -> + ok. + + +handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) -> + receive + {timeout, _TRef, stop} -> + i("stop"), + %% This will have the effect that no more requests are sent... + State#{num => SCnt, stop_started => t()}; + + {?MODULE, Ref, Parent, stop} -> + %% This *aborts* the test + reply(Parent, Ref, ok), + exit(normal); + + %% Only when active + {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse + (TagClosed =:= socket_closed) -> + %% We should never get this (unless the server crashed) + exit({closed, Reason}); + + %% Only when active + {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse + (TagErr =:= socket_error) -> + exit({error, Reason}) + + after 0 -> + State + end. + + +initial_activation(_Mod, _Sock, false = _Active) -> + ok; +initial_activation(Mod, Sock, Active) -> + Mod:active(Sock, Active). + + +maybe_activate(Mod, Sock, once = Active) -> + Mod:active(Sock, Active); +maybe_activate(_, _, _) -> + ok. + + +%% ========================================================================== + +req(Pid, Req) -> + Ref = make_ref(), + Pid ! {?MODULE, Ref, Pid, Req}, + receive + {'EXIT', Pid, Reason} -> + {error, {exit, Reason}}; + {?MODULE, Ref, Reply} -> + Reply + end. + +reply(Pid, Ref, Reply) -> + Pid ! {?MODULE, Ref, Reply}. + + +%% ========================================================================== + +next_id(ID) when (ID < ?MAX_ID) -> + ID + 1; +next_id(_) -> + 1. + + +%% ========================================================================== + +t() -> + os:timestamp(). + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour,Min,Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", + [Hour, Min, Sec, round(N3/1000)]), + lists:flatten(FormatTS). + +%% Time is always in number os ms (milli seconds) +format_time(T) -> + f("~p", [T]). + + +%% ========================================================================== + +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + +%% e(F) -> +%% i("<ERROR> " ++ F). + +e(F, A) -> + p(get(sname), "<ERROR> " ++ F, A). + +i(F) -> + i(F, []). + +i(F, A) -> + p(get(sname), "<INFO> " ++ F, A). + +p(undefined, F, A) -> + p("- ", F, A); +p(Prefix, F, A) -> + io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl new file mode 100644 index 0000000000..4f6152a4b1 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl @@ -0,0 +1,45 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_client_gen). + +-export([ + start_monitor/3, start_monitor/4, start_monitor/6, + stop/1 + ]). + +-define(TRANSPORT_MOD, socket_test_ttest_tcp_gen). + +start_monitor(Active, Addr, Port) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port). + +start_monitor(Active, Addr, Port, MsgID) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port, + MsgID). + +start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port, + MsgID, MaxOutstanding, RunTime). + +stop(Pid) -> + socket_test_ttest_tcp_client:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl new file mode 100644 index 0000000000..0d2ec38876 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl @@ -0,0 +1,45 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_client_socket). + +-export([ + start_monitor/3, start_monitor/4, start_monitor/6, + stop/1 + ]). + +-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). + +start_monitor(Active, Addr, Port) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port). + +start_monitor(Active, Addr, Port, MsgID) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port, + MsgID). + +start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> + socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, + Active, Addr, Port, + MsgID, MaxOutstanding, RunTime). + +stop(Pid) -> + socket_test_ttest_client:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_gen.erl new file mode 100644 index 0000000000..de8822157d --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_gen.erl @@ -0,0 +1,123 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_gen). + +-export([ + accept/1, accept/2, + active/2, + close/1, + connect/2, + controlling_process/2, + listen/0, listen/1, + peername/1, + port/1, + recv/2, recv/3, + send/2, + shutdown/2, + sockname/1 + ]). + + +%% ========================================================================== + +accept(Sock) -> + case gen_tcp:accept(Sock) of + {ok, NewSock} -> + {ok, NewSock}; + {error, _} = ERROR -> + ERROR + end. + +accept(Sock, Timeout) -> + case gen_tcp:accept(Sock, Timeout) of + {ok, NewSock} -> + {ok, NewSock}; + {error, _} = ERROR -> + ERROR + end. + + +active(Sock, NewActive) + when (is_boolean(NewActive) orelse (NewActive =:= once)) -> + inet:setopts(Sock, [{active, NewActive}]). + + +close(Sock) -> + gen_tcp:close(Sock). + + +connect(Addr, Port) -> + Opts = [binary, {packet, raw}, {active, false}], + case gen_tcp:connect(Addr, Port, Opts) of + {ok, Sock} -> + {ok, Sock}; + {error, _} = ERROR -> + ERROR + end. + +controlling_process(Sock, NewPid) -> + gen_tcp:controlling_process(Sock, NewPid). + + +%% Create a listen socket +listen() -> + listen(0). + +listen(Port) when is_integer(Port) andalso (Port >= 0) -> + Opts = [binary, {ip, {0,0,0,0}}, {packet, raw}, {active, false}], + case gen_tcp:listen(Port, Opts) of + {ok, Sock} -> + {ok, Sock}; + {error, _} = ERROR -> + ERROR + end. + + +peername(Sock) -> + inet:peername(Sock). + + +port(Sock) -> + inet:port(Sock). + + +recv(Sock, Length) -> + gen_tcp:recv(Sock, Length). +recv(Sock, Length, Timeout) -> + gen_tcp:recv(Sock, Length, Timeout). + + +send(Sock, Data) -> + gen_tcp:send(Sock, Data). + + +shutdown(Sock, How) -> + gen_tcp:shutdown(Sock, How). + + +sockname(Sock) -> + inet:sockname(Sock). + + +%% ========================================================================== + + + diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl new file mode 100644 index 0000000000..cb503a1feb --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl @@ -0,0 +1,678 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% ========================================================================== +%% +%% This is the "simple" server using gen_tcp. The server is supposed to be +%% as simple as possible in order to incur as little overhead as possible. +%% +%% There are three ways to run the server: active, passive or active-once. +%% +%% The server does only two things; accept connnections and then reply +%% to requests (actually the handler(s) does that). No timing or counting. +%% That is all done by the clients. +%% +%% ========================================================================== + +-module(socket_test_ttest_tcp_server). + +-export([ + start_monitor/2, + stop/1 + ]). + +-include_lib("kernel/include/inet.hrl"). +-include("socket_test_ttest.hrl"). + +-define(ACC_TIMEOUT, 10000). +-define(RECV_TIMEOUT, 10000). + + +%% ========================================================================== + +start_monitor(Mod, Active) + when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) -> + Self = self(), + ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end, + {Pid, MRef} = spawn_monitor(ServerInit), + receive + {'DOWN', MRef, process, Pid, normal} -> + ok; + {'DOWN', MRef, process, Pid, Reason} -> + {error, {exit, Reason}}; + + {?MODULE, Pid, {ok, Port}} -> + erlang:demonitor(MRef, [flush]), + {ok, {Pid, MRef, Port}}; + {?MODULE, Pid, {error, _} = ERROR} -> + erlang:demonitor(MRef, [flush]), + ERROR + end. + + +stop(Pid) when is_pid(Pid) -> + req(Pid, stop). + + +%% ========================================================================== + +server_init(Parent, Mod, Active) -> + i("init -> entry with" + "~n Parent: ~p" + "~n Mod: ~p" + "~n Active: ~p", [Parent, Mod, Active]), + case Mod:listen(0) of + {ok, LSock} -> + case Mod:port(LSock) of + {ok, Port} = OK -> + Addr = which_addr(), % This is just for convenience + i("init -> listening on:" + "~n Addr: ~p (~s)" + "~n Port: ~w" + "~n", [Addr, inet:ntoa(Addr), Port]), + Parent ! {?MODULE, self(), OK}, + server_loop(#{parent => Parent, + mod => Mod, + active => Active, + lsock => LSock, + handlers => [], + %% Accumulation + runtime => 0, + mcnt => 0, + bcnt => 0, + hcnt => 0 + }); + {error, PReason} -> + (catch Mod:close(LSock)), + exit({port, PReason}) + end; + {error, LReason} -> + exit({listen, LReason}) + end. + + +server_loop(State) -> + %% i("loop -> enter"), + server_loop( server_handle_message( server_accept(State) ) ). + +server_accept(#{mod := Mod, + active := Active, + lsock := LSock, + handlers := Handlers} = State) -> + + %% i("server-accept -> entry with" + %% "~n Mod: ~p" + %% "~n Active: ~p" + %% "~n LSock: ~p", [Mod, Active, LSock]), + + case Mod:accept(LSock, ?ACC_TIMEOUT) of + {ok, Sock} -> + %% i("server-accept -> accepted: " + %% "~n Sock: ~p", [Sock]), + i("accepted connection from ~s", + [case Mod:peername(Sock) of + {ok, Peer} -> + format_peername(Peer); + {error, _} -> + "-" + end]), + {Pid, _} = handler_start(), + i("handler ~p started -> try transfer socket control", [Pid]), + case Mod:controlling_process(Sock, Pid) of + ok -> + i("server-accept: handler ~p started", [Pid]), + handler_continue(Pid, Mod, Sock, Active), + Handlers2 = [Pid | Handlers], + State#{handlers => Handlers2}; + {error, CPReason} -> + (catch Mod:close(Sock)), + (catch Mod:close(LSock)), + exit({controlling_process, CPReason}) + end; + {error, timeout} -> + State; + {error, AReason} -> + (catch Mod:close(LSock)), + exit({accept, AReason}) + end. + +format_peername({Addr, Port}) -> + case inet:gethostbyaddr(Addr) of + {ok, #hostent{h_name = N}} -> + f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); + {error, _} -> + f("~p, ~p", [Addr, Port]) + end. + +server_handle_message(#{parent := Parent, handlers := H} = State) -> + %% i("server_handle_message -> enter"), + receive + {?MODULE, Ref, Parent, stop} -> + reply(Parent, Ref, ok), + lists:foreach(fun(P) -> handler_stop(P) end, H), + exit(normal); + + {'DOWN', _MRef, process, Pid, Reason} -> + server_handle_down(Pid, Reason, State) + + after 0 -> + State + end. + + +server_handle_down(Pid, Reason, #{handlers := Handlers} = State) -> + case lists:delete(Pid, Handlers) of + Handlers -> + i("unknown process ~p died", [Pid]), + State; + Handlers2 -> + server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2}) + end. + + +server_handle_handler_down(Pid, + {done, RunTime, MCnt, BCnt}, + #{runtime := AccRunTime, + mcnt := AccMCnt, + bcnt := AccBCnt, + hcnt := AccHCnt} = State) -> + AccRunTime2 = AccRunTime + RunTime, + AccMCnt2 = AccMCnt + MCnt, + AccBCnt2 = AccBCnt + BCnt, + AccHCnt2 = AccHCnt + 1, + i("handler ~p (~w) done => accumulated results: " + "~n Run Time: ~s ms" + "~n Message Count: ~s" + "~n Byte Count: ~s", + [Pid, AccHCnt2, + format_time(AccRunTime2), + if (AccRunTime2 > 0) -> + f("~w => ~w (~w) msgs / ms", + [AccMCnt2, + AccMCnt2 div AccRunTime2, + (AccMCnt2 div AccHCnt2) div AccRunTime2]); + true -> + f("~w", [AccMCnt2]) + end, + if (AccRunTime2 > 0) -> + f("~w => ~w (~w) bytes / ms", + [AccBCnt2, + AccBCnt2 div AccRunTime2, + (AccBCnt2 div AccHCnt2) div AccRunTime2]); + true -> + f("~w", [AccBCnt2]) + end]), + State#{runtime => AccRunTime2, + mcnt => AccMCnt2, + bcnt => AccBCnt2, + hcnt => AccHCnt2}; +server_handle_handler_down(Pid, Reason, State) -> + i("handler ~p terminated: " + "~n ~p", [Pid, Reason]), + State. + + + +%% ========================================================================== + +handler_start() -> + Self = self(), + HandlerInit = fun() -> put(sname, "handler"), handler_init(Self) end, + spawn_monitor(HandlerInit). + +handler_continue(Pid, Mod, Sock, Active) -> + req(Pid, {continue, Mod, Sock, Active}). + +handler_stop(Pid) -> + req(Pid, stop). + +handler_init(Parent) -> + i("starting"), + receive + {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} -> + i("received continue"), + reply(Parent, Ref, ok), + handler_initial_activation(Mod, Sock, Active), + handler_loop(#{parent => Parent, + mod => Mod, + sock => Sock, + active => Active, + start => t(), + mcnt => 0, + bcnt => 0, + last_reply => none, + acc => <<>>}) + + after 5000 -> + i("timeout when message queue: " + "~n ~p" + "~nwhen" + "~n Parent: ~p", [process_info(self(), messages), Parent]), + handler_init(Parent) + end. + +handler_loop(State) -> + %% i("handler-loop"), + handler_loop( handler_handle_message( handler_recv_message(State) ) ). + +%% When passive, we read *one* request and then attempt to reply to it. +handler_recv_message(#{mod := Mod, + sock := Sock, + active := false, + mcnt := MCnt, + bcnt := BCnt} = State) -> + %% i("handler_recv_message(false) -> entry"), + case handler_recv_message2(Mod, Sock) of + {ok, {MsgSz, ID, Body}} -> + handler_send_reply(Mod, Sock, ID, Body), + State#{mcnt => MCnt + 1, + bcnt => BCnt + MsgSz, + last_reply => ID}; + {error, closed} -> + handler_done(State); + {error, timeout} -> + State + end; + + +%% When "active" (once or true), we receive one data "message", which may +%% contain any number of requests or only part of a request. Then we +%% process this data together with whatever we had "accumulated" from +%% prevous messages. Each request will be extracted and replied to. If +%% there is some data left, not enough for a complete request, we store +%% this in 'acc' (accumulate it). +handler_recv_message(#{mod := Mod, + sock := Sock, + active := Active, + mcnt := MCnt, + bcnt := BCnt, + last_reply := LID, + acc := Acc} = State) -> + %% i("handler_recv_message(~w) -> entry", [Active]), + case handler_recv_message3(Mod, Sock, Acc, LID) of + {ok, {MCnt2, BCnt2, LID2}, NewAcc} -> + handler_maybe_activate(Mod, Sock, Active), + State#{mcnt => MCnt + MCnt2, + bcnt => BCnt + BCnt2, + last_reply => LID2, + acc => NewAcc}; + + {error, closed} -> + if + (size(Acc) =:= 0) -> + handler_done(State); + true -> + e("client done with partial message: " + "~n Last Reply Sent: ~w" + "~n Message Count: ~w" + "~n Byte Count: ~w" + "~n Partial Message: ~w bytes", + [LID, MCnt, BCnt, size(Acc)]), + exit({closed_with_partial_message, LID}) + end; + + {error, timeout} -> + State + end. + +handler_process_data(Acc, Mod, Sock, LID) -> + handler_process_data(Acc, Mod, Sock, 0, 0, LID). + +%% Extract each complete request, one at a time. +handler_process_data(<<?TTEST_TAG:32, + ?TTEST_TYPE_REQUEST:32, + ID:32, + SZ:32, + Rest/binary>>, + Mod, Sock, + MCnt, BCnt, _LID) when (size(Rest) >= SZ) -> + <<Body:SZ/binary, Rest2/binary>> = Rest, + case handler_send_reply(Mod, Sock, ID, Body) of + ok -> + handler_process_data(Rest2, Mod, Sock, MCnt+1, BCnt+SZ, ID); + {error, _} = ERROR -> + ERROR + end; +handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) -> + {ok, {MCnt, BCnt, LID}, Data}. + + +handler_recv_message2(Mod, Sock) -> + %% i("handler_recv_message2 -> entry"), + case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of + {ok, <<?TTEST_TAG:32, + ?TTEST_TYPE_REQUEST:32, + ID:32, + SZ:32>> = Hdr} -> + %% i("handler_recv_message2 -> got request header: " + %% "~n ID: ~p" + %% "~n SZ: ~p", [ID, SZ]), + case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of + {ok, Body} when (SZ =:= size(Body)) -> + %% i("handler_recv_message2 -> got body"), + {ok, {size(Hdr) + size(Body), ID, Body}}; + {error, BReason} -> + e("failed reading body (~w) of message ~w:" + "~n ~p", [SZ, ID, BReason]), + exit({recv, body, ID, SZ, BReason}) + end; + {error, timeout} = ERROR -> + i("handler_recv_message2 -> timeout"), + ERROR; + {error, closed} = ERROR -> + ERROR; + {error, HReason} -> + e("failed reading header of message:" + "~n ~p", [HReason]), + exit({recv, header, HReason}) + end. + + +handler_recv_message3(Mod, Sock, Acc, LID) -> + receive + {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse + (TagClosed =:= socket_closed) -> + {error, closed}; + + {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse + (TagErr =:= socket_error) -> + {error, Reason}; + + {Tag, Sock, Msg} when (Tag =:= tcp) orelse + (Tag =:= socket) -> + handler_process_data(<<Acc/binary, Msg/binary>>, Mod, Sock, LID) + + after ?RECV_TIMEOUT -> + {error, timeout} + end. + + + +%% %% Socket in passive mode => we need to read explicitly +%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) -> +%% %% i("try recv msg header"), +%% MsgSz = +%% case gen_tcp:recv(Sock, 4*4) of +%% {ok, <<?SOCKET_SIMPLE_TAG:32, +%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, +%% ID:32, +%% SZ:32>> = Hdr} -> +%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]), +%% case gen_tcp:recv(Sock, SZ) of +%% {ok, Data} when (size(Data) =:= SZ) -> +%% handler_send_reply(Sock, ID, Data), +%% size(Hdr)+SZ; +%% {ok, InvData} -> +%% i("invalid data"), +%% (catch gen_tcp:close(Sock)), +%% exit({invalid_data, SZ, size(InvData)}); +%% {error, Reason2} -> +%% i("Data read failed: ~p", [Reason2]), +%% (catch gen_tcp:close(Sock)), +%% exit({recv_data, Reason2}) +%% end; +%% {ok, _InvHdr} -> +%% (catch gen_tcp:close(Sock)), +%% exit(invalid_hdr); +%% {error, closed} -> +%% i("we are done: " +%% "~n Message Count: ~p" +%% "~n Byte Count: ~p", [MCnt, BCnt]), +%% exit(normal); +%% {error, Reason1} -> +%% i("Header read failed: ~p", [Reason1]), +%% (catch gen_tcp:close(Sock)), +%% exit({recv_hdr, Reason1}) +%% end, +%% handler_loop(State, MCnt+1, BCnt+MsgSz); + +%% %% Socket in active mode (once | true) => data messages arrive +%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) -> +%% %% i("await msg"), +%% try handler_recv_request(Sock, Active) of +%% {ID, Data, MsgSz} -> +%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), +%% handler_send_reply(Sock, ID, Data), +%% handler_maybe_activate(Sock, Active), +%% handler_loop(State, MCnt+1, BCnt+MsgSz) +%% catch +%% throw:tcp_closed -> +%% i("we are done: " +%% "~n Message Count: ~p" +%% "~n Byte Count: ~p", [MCnt, BCnt]), +%% exit(normal); +%% throw:{tcp_error, Reason} -> +%% i("<ERROR> TCP error ~p when: " +%% "~n Message Count: ~p" +%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]), +%% exit({tcp_error, Reason}) +%% end. +%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active), +%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), +%% %% handler_send_reply(Sock, ID, Data), +%% %% handler_maybe_activate(Sock, Active), +%% %% handler_loop(State, MCnt+1, BCnt+MsgSz). + + +%% handler_recv_request(Sock, Active) -> +%% %% In theory we should also be ready for a partial header, +%% %% but I can't be bothered... +%% receive +%% {tcp_closed, Sock} -> +%% throw(tcp_closed); +%% {tcp_error, Sock, Reason} -> +%% throw({tcp_error, Reason}); +%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32, +%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, +%% ID:32, +%% SZ:32, +%% Data/binary>> = Msg} when (size(Data) =:= SZ) -> +%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), +%% {ID, Data, size(Msg)}; +%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32, +%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, +%% ID:32, +%% SZ:32, +%% Data/binary>> = Msg} when (size(Data) < SZ) -> +%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), +%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg)) +%% end. + +%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) -> +%% handler_maybe_activate(Sock, Active), +%% receive +%% {tcp_closed, Sock} -> +%% %% i("we are done (incomplete data)"), +%% throw(tcp_closed); +%% {tcp_error, Sock, Reason} -> +%% throw({tcp_error, Reason}); +%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) -> +%% %% i("[complete] received the remaining data (~w bytes) for msg ~w", +%% %% [size(Data), ID]), +%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)}; +%% {tcp, Sock, Data} -> +%% %% i("[incomplete] received ~w bytes of data for for msg ~w", +%% %% [size(Data), ID]), +%% handler_recv_request_data(Sock, Active, ID, SZ, +%% <<AccData/binary, Data/binary>>, +%% AccMsgSz+size(Data)) +%% end. + +handler_send_reply(Mod, Sock, ID, Data) -> + SZ = size(Data), + Msg = <<?TTEST_TAG:32, + ?TTEST_TYPE_REPLY:32, + ID:32, + SZ:32, + Data/binary>>, + %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]), + case Mod:send(Sock, Msg) of + ok -> + %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]), + ok; + {error, Reason} -> + (catch Mod:close(Sock)), + exit({send, Reason}) + end. + + +handler_done(State) -> + handler_done(State, t()). + +handler_done(#{start := Start, + mod := Mod, + sock := Sock, + mcnt := MCnt, + bcnt := BCnt}, Stop) -> + (catch Mod:close(Sock)), + exit({done, tdiff(Start, Stop), MCnt, BCnt}). + + +handler_handle_message(#{parent := Parent} = State) -> + receive + {'EXIT', Parent, Reason} -> + exit({parent_exit, Reason}) + after 0 -> + State + end. + + +handler_initial_activation(_Mod, _Sock, false = _Active) -> + ok; +handler_initial_activation(Mod, Sock, Active) -> + Mod:active(Sock, Active). + + +handler_maybe_activate(Mod, Sock, once = Active) -> + Mod:active(Sock, Active); +handler_maybe_activate(_, _, _) -> + ok. + + + +%% ========================================================================== + +which_addr() -> + case inet:getifaddrs() of + {ok, IfAddrs} -> + which_addrs(inet, IfAddrs); + {error, Reason} -> + exit({getifaddrs, Reason}) + end. + +which_addrs(_Family, []) -> + exit({getifaddrs, not_found}); +which_addrs(Family, [{"lo", _} | IfAddrs]) -> + %% Skip + which_addrs(Family, IfAddrs); +which_addrs(Family, [{"docker" ++ _, _} | IfAddrs]) -> + %% Skip docker + which_addrs(Family, IfAddrs); +which_addrs(Family, [{"br-" ++ _, _} | IfAddrs]) -> + %% Skip docker + which_addrs(Family, IfAddrs); +which_addrs(Family, [{"en" ++ _, IfOpts} | IfAddrs]) -> + %% Maybe take this one + case which_addr(Family, IfOpts) of + {ok, Addr} -> + Addr; + error -> + which_addrs(Family, IfAddrs) + end; +which_addrs(Family, [{_IfName, IfOpts} | IfAddrs]) -> + case which_addr(Family, IfOpts) of + {ok, Addr} -> + Addr; + error -> + which_addrs(Family, IfAddrs) + end. + +which_addr(_, []) -> + error; +which_addr(inet, [{addr, Addr}|_]) + when is_tuple(Addr) andalso (size(Addr) =:= 4) -> + {ok, Addr}; +which_addr(inet6, [{addr, Addr}|_]) + when is_tuple(Addr) andalso (size(Addr) =:= 8) -> + {ok, Addr}; +which_addr(Family, [_|IfOpts]) -> + which_addr(Family, IfOpts). + + +%% ========================================================================== + +req(Pid, Req) -> + Ref = make_ref(), + Pid ! {?MODULE, Ref, self(), Req}, + receive + {'EXIT', Pid, Reason} -> + {error, {exit, Reason}}; + {?MODULE, Ref, Reply} -> + Reply + end. + +reply(Pid, Ref, Reply) -> + Pid ! {?MODULE, Ref, Reply}. + + +%% ========================================================================== + +t() -> + os:timestamp(). + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour,Min,Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", + [Hour, Min, Sec, round(N3/1000)]), + lists:flatten(FormatTS). + +%% Time is always in number os ms (milli seconds) +format_time(T) -> + f("~p", [T]). + + +%% ========================================================================== + +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + +e(F, A) -> + p(get(sname), "<ERROR> " ++ F, A). + +i(F) -> + i(F, []). + +i(F, A) -> + p(get(sname), "<INFO> " ++ F, A). + +p(undefined, F, A) -> + p("- ", F, A); +p(Prefix, F, A) -> + io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). + diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl new file mode 100644 index 0000000000..53a9dc7d2a --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl @@ -0,0 +1,34 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_server_gen). + +-export([ + start_monitor/1, + stop/1 + ]). + +-define(TRANSPORT_MOD, socket_test_ttest_tcp_gen). + +start_monitor(Active) -> + socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active). + +stop(Pid) -> + socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl new file mode 100644 index 0000000000..de9df857fe --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl @@ -0,0 +1,34 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_server_socket). + +-export([ + start_monitor/1, + stop/1 + ]). + +-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). + +start_monitor(Active) -> + socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active). + +stop(Pid) -> + socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl new file mode 100644 index 0000000000..12d9e052d7 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -0,0 +1,345 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_socket). + +-export([ + accept/1, accept/2, + active/2, + close/1, + connect/2, + controlling_process/2, + listen/0, listen/1, + port/1, + peername/1, + recv/2, recv/3, + send/2, + shutdown/2, + sockname/1 + ]). + + +-define(READER_RECV_TIMEOUT, 1000). + + +%% ========================================================================== + +accept(#{sock := LSock}) -> + case socket:accept(LSock) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}}; + {error, _} = ERROR -> + ERROR + end. + +accept(#{sock := LSock}, Timeout) -> + case socket:accept(LSock, Timeout) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}}; + {error, _} = ERROR -> + ERROR + end. + + +active(#{reader := Pid}, NewActive) + when (is_boolean(NewActive) orelse (NewActive =:= once)) -> + Pid ! {?MODULE, active, NewActive}, + ok. + + +close(#{sock := Sock, reader := Pid}) -> + Pid ! {?MODULE, stop}, + socket:close(Sock). + +%% Create a socket and connect it to a peer +connect(Addr, Port) -> + try + begin + Sock = + case socket:open(inet, stream, tcp) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + case socket:bind(Sock, any) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + throw({error, {bind, BReason}}) + end, + SA = #{family => inet, + addr => Addr, + port => Port}, + case socket:connect(Sock, SA) of + ok -> + ok; + {error, CReason} -> + (catch socket:close(Sock)), + throw({error, {connect, CReason}}) + end, + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}} + end + catch + throw:ERROR:_ -> + ERROR + end. + + +controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> + case socket:setopt(Sock, otp, controlling_process, NewPid) of + ok -> + Pid ! {?MODULE, self(), controlling_process, NewPid}, + receive + {?MODULE, Pid, controlling_process} -> + ok + end; + {error, _} = ERROR -> + ERROR + end. + + +%% Create a listen socket +listen() -> + listen(0). + +listen(Port) when is_integer(Port) andalso (Port >= 0) -> + try + begin + Sock = case socket:open(inet, stream, tcp) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + SA = #{family => inet, + port => Port}, + case socket:bind(Sock, SA) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + throw({error, {bind, BReason}}) + end, + case socket:listen(Sock) of + ok -> + ok; + {error, LReason} -> + (catch socket:close(Sock)), + throw({error, {listen, LReason}}) + end, + {ok, #{sock => Sock}} + end + catch + throw:{error, Reason}:_ -> + {error, Reason} + end. + + +port(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{port := Port}} -> + {ok, Port}; + {error, _} = ERROR -> + ERROR + end. + + +peername(#{sock := Sock}) -> + case socket:peername(Sock) of + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +recv(#{sock := Sock}, Length) -> + socket:recv(Sock, Length). +recv(#{sock := Sock}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout). + + +send(#{sock := Sock}, Length) -> + socket:send(Sock, Length). + + +shutdown(#{sock := Sock}, How) -> + socket:shutdown(Sock, How). + + +sockname(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +%% ========================================================================== + +reader_init(ControllingProcess, Sock, Active) + when is_pid(ControllingProcess) andalso + (is_boolean(Active) orelse (Active =:= once)) -> + MRef = erlang:monitor(process, ControllingProcess), + reader_loop(#{ctrl_proc => ControllingProcess, + ctrl_proc_mref => MRef, + active => Active, + sock => Sock}). + + +%% Never read +reader_loop(#{active := false, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + end; + +%% Read *once* and then change to false +reader_loop(#{active := once, + sock := Sock, + ctrl_proc := Pid} = State) -> + case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + {ok, Data} -> + Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + reader_loop(State#{active => false}); + {error, timeout} -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + MRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => MRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} -> + Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + exit(normal); + + {error, Reason} -> + Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + exit(Reason) + end; + +%% Read and forward data continuously +reader_loop(#{active := true, + sock := Sock, + ctrl_proc := Pid} = State) -> + case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + {ok, Data} -> + Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + reader_loop(State); + {error, timeout} -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + MRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => MRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} -> + Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + exit(normal); + + {error, Reason} -> + Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + exit(Reason) + end. + + + + + + +%% ========================================================================== + + + |