aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_nif.c281
-rw-r--r--erts/emulator/test/Makefile13
-rw-r--r--erts/emulator/test/socket_SUITE.erl82
-rw-r--r--erts/emulator/test/socket_test_ttest.hrl32
-rw-r--r--erts/emulator/test/socket_test_ttest_client.hrl141
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client.erl617
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client_gen.erl45
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client_socket.erl45
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_gen.erl123
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server.erl678
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server_gen.erl34
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server_socket.erl34
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl345
13 files changed, 2299 insertions, 171 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index cdd9f0fb15..3da895e644 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -436,7 +436,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC
#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC
-#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 8192
#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
#define SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024
@@ -775,7 +775,7 @@ typedef struct {
ErlNifEnv* env;
/* +++ Controller (owner) process +++ */
- ErlNifPid ctrlPid;
+ ErlNifPid ctrlPid;
// ErlNifMonitor ctrlMon;
ESockMonitor ctrlMon;
@@ -5232,7 +5232,6 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
else
save_errno = -1; // The value does not actually matter in this case
-
return send_check_result(env, descP,
written, sndDataP->size, save_errno, sendRef);
@@ -6185,8 +6184,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
int type = descP->type;
int protocol = descP->protocol;
- SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n",
+ SSDBG( descP, ("SOCKET",
+ "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX, 0x%lX)\r\n",
descP->sock,
+ descP->state,
descP->currentWriterP,
descP->currentReaderP,
descP->currentAcceptorP) );
@@ -6246,7 +6247,6 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) );
dec_socket(domain, type, protocol);
- DEMONP("nclose -> closer", env, descP, &descP->closerMon);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
@@ -6284,8 +6284,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nclose -> [%d] done when: "
+ "\r\n state: 0x%lX"
"\r\n reply: %T"
- "\r\n", descP->sock, reply) );
+ "\r\n", descP->sock, descP->state, reply) );
return reply;
}
@@ -16920,8 +16921,29 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MLOCK(descP->accMtx);
MLOCK(descP->closeMtx);
- SSDBG( descP, ("SOCKET", "socket_stop -> all mutex(s) locked\r\n") );
-
+ SSDBG( descP, ("SOCKET", "socket_stop -> "
+ "[%d, %T] all mutex(s) locked when counters:"
+ "\r\n writePkgCnt: %u"
+ "\r\n writeByteCnt: %u"
+ "\r\n writeTries: %u"
+ "\r\n writeWaits: %u"
+ "\r\n writeFails: %u"
+ "\r\n readPkgCnt: %u"
+ "\r\n readByteCnt: %u"
+ "\r\n readTries: %u"
+ "\r\n readWaits: %u"
+ "\r\n",
+ descP->sock, descP->ctrlPid,
+ descP->writePkgCnt,
+ descP->writeByteCnt,
+ descP->writeTries,
+ descP->writeWaits,
+ descP->writeFails,
+ descP->readPkgCnt,
+ descP->readByteCnt,
+ descP->readTries,
+ descP->readWaits) );
+
descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
@@ -17063,7 +17085,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* </KOLLA>
*/
- if (descP->closeLocal) {
+ if (descP->closeLocal &&
+ !is_direct_call) {
/* +++ send close message to the waiting process +++
*
@@ -17073,13 +17096,18 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*
* WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
*
+ * Also, we should really *always* use a tag unique to this
+ * (nif-) module. Some like (in this case):
+ *
+ * {'$socket', close, CloseRef}
+ *
* </KOLLA>
*/
- send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+ send_msg(env,
+ MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
- DEMONP("socket_stop -> closer",
- env, descP, &descP->closerMon);
+ DEMONP("socket_stop -> closer", env, descP, &descP->closerMon);
} else {
@@ -17218,137 +17246,144 @@ void socket_down(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") );
}
*/
-
- if (compare_pids(env, &descP->ctrlPid, pid)) {
- int selectRes;
-
- /* We don't bother with the queue cleanup here -
- * we leave it to the stop callback function.
- */
-
- SSDBG( descP,
- ("SOCKET", "socket_down -> controlling process exit\r\n") );
- descP->state = SOCKET_STATE_CLOSING;
- descP->closeLocal = TRUE;
- descP->closerPid = *pid;
- descP->closerMon = (ESockMonitor) *mon;
- descP->closeRef = MKREF(env); // Do we really need this in this case?
-
- /*
- esock_dbg_printf("DOWN",
- "[%d] select stop %u,%u,%u,%d\r\n",
- descP->sock,
- monP->raw[0], monP->raw[1],
- monP->raw[2], monP->raw[3]);
- */
+ if (!IS_CLOSED(descP)) {
+ if (compare_pids(env, &descP->ctrlPid, pid)) {
+ int selectRes;
- selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
- descP, NULL, descP->closeRef);
+ /* We don't bother with the queue cleanup here -
+ * we leave it to the stop callback function.
+ */
- if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
- /* We are done - wwe can finalize (socket close) directly */
SSDBG( descP,
- ("SOCKET", "socket_down -> [%d] stop called\r\n", descP->sock) );
- dec_socket(descP->domain, descP->type, descP->protocol);
- descP->state = SOCKET_STATE_CLOSED;
-
- /* And finally close the socket.
- * Since we close the socket because of an exiting owner,
- * we do not need to wait for buffers to sync (linger).
- * If the owner wish to ensure the buffer are written,
- * it should have closed teh socket explicitly...
- */
- if (sock_close(descP->sock) != 0) {
- int save_errno = sock_errno();
-
- esock_warning_msg("Failed closing socket for terminating "
- "controlling process: "
- "\r\n Controlling Process: %T"
- "\r\n Descriptor: %d"
- "\r\n Errno: %d"
- "\r\n", pid, descP->sock, save_errno);
- }
- sock_close_event(descP->event);
+ ("SOCKET", "socket_down -> controlling process exit\r\n") );
+
+ descP->state = SOCKET_STATE_CLOSING;
+ descP->closeLocal = TRUE;
+ descP->closerPid = *pid;
+ descP->closerMon = (ESockMonitor) *mon;
+ descP->closeRef = MKREF(env); // Do we really need this in this case?
- descP->sock = INVALID_SOCKET;
- descP->event = INVALID_EVENT;
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] select stop %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
- descP->state = SOCKET_STATE_CLOSED;
+ selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
+ descP, NULL, descP->closeRef);
+
+ if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
+ /* We are done - we can finalize (socket close) directly */
+ SSDBG( descP,
+ ("SOCKET",
+ "socket_down -> [%d] stop called\r\n", descP->sock) );
+ dec_socket(descP->domain, descP->type, descP->protocol);
+ descP->state = SOCKET_STATE_CLOSED;
+
+ /* And finally close the socket.
+ * Since we close the socket because of an exiting owner,
+ * we do not need to wait for buffers to sync (linger).
+ * If the owner wish to ensure the buffer are written,
+ * it should have closed the socket explicitly...
+ */
+ if (sock_close(descP->sock) != 0) {
+ int save_errno = sock_errno();
+
+ esock_warning_msg("Failed closing socket for terminating "
+ "controlling process: "
+ "\r\n Controlling Process: %T"
+ "\r\n Descriptor: %d"
+ "\r\n Errno: %d"
+ "\r\n", pid, descP->sock, save_errno);
+ }
+ sock_close_event(descP->event);
- } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
- /* The stop callback function has been *scheduled* which means that
- * "should" wait for it to complete. But since we are in a callback
- * (down) function, we cannot...
- * So, we must close the socket
- */
- SSDBG( descP,
- ("SOCKET",
- "socket_down -> [%d] stop scheduled\r\n", descP->sock) );
- dec_socket(descP->domain, descP->type, descP->protocol);
+ descP->sock = INVALID_SOCKET;
+ descP->event = INVALID_EVENT;
- /* And now what? We can't wait for the stop function here...
- * So, we simply close it here and leave the rest of the "close"
- * for later (when the stop function actually gets called...
- */
+ descP->state = SOCKET_STATE_CLOSED;
- if (sock_close(descP->sock) != 0) {
- int save_errno = sock_errno();
-
- esock_warning_msg("Failed closing socket for terminating "
- "controlling process: "
+ } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
+ /* The stop callback function has been *scheduled* which means
+ * that "should" wait for it to complete. But since we are in
+ * a callback (down) function, we cannot...
+ * So, we must close the socket
+ */
+ SSDBG( descP,
+ ("SOCKET",
+ "socket_down -> [%d] stop scheduled\r\n",
+ descP->sock) );
+ dec_socket(descP->domain, descP->type, descP->protocol);
+
+ /* And now what? We can't wait for the stop function here...
+ * So, we simply close it here and leave the rest of the "close"
+ * for later (when the stop function actually gets called...
+ */
+
+ if (sock_close(descP->sock) != 0) {
+ int save_errno = sock_errno();
+
+ esock_warning_msg("Failed closing socket for terminating "
+ "controlling process: "
+ "\r\n Controlling Process: %T"
+ "\r\n Descriptor: %d"
+ "\r\n Errno: %d"
+ "\r\n", pid, descP->sock, save_errno);
+ }
+ sock_close_event(descP->event);
+
+ } else {
+
+ /*
+ * <KOLLA>
+ *
+ * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
+ * SO WE DON'T LET STUFF LEAK.
+ * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH
+ * THE CLOSE, WHAT TO DO? ABORT?
+ *
+ * </KOLLA>
+ */
+ esock_warning_msg("Failed selecting stop when handling down "
+ "of controlling process: "
+ "\r\n Select Res: %d"
"\r\n Controlling Process: %T"
"\r\n Descriptor: %d"
- "\r\n Errno: %d"
- "\r\n", pid, descP->sock, save_errno);
+ "\r\n Monitor: %u.%u.%u.%u"
+ "\r\n", selectRes, pid, descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
}
- sock_close_event(descP->event);
-
+
} else {
- /*
- * <KOLLA>
+ /* check all operation queue(s): acceptor, writer and reader.
*
- * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
- * SO WE DON'T LET STUFF LEAK.
- * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH
- * THE CLOSE, WHAT TO DO? ABORT?
+ * Is it really any point in doing this if the socket is closed?
*
- * </KOLLA>
*/
- esock_warning_msg("Failed selecting stop when handling down "
- "of controlling process: "
- "\r\n Select Res: %d"
- "\r\n Controlling Process: %T"
- "\r\n Descriptor: %d"
- "\r\n Monitor: %u.%u.%u.%u"
- "\r\n", selectRes, pid, descP->sock,
- monP->raw[0], monP->raw[1],
- monP->raw[2], monP->raw[3]);
- }
-
- } else {
-
- /* check all operation queue(s): acceptor, writer and reader. */
-
- SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") );
+ SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") );
- MLOCK(descP->accMtx);
- if (descP->currentAcceptorP != NULL)
- socket_down_acceptor(env, descP, pid);
- MUNLOCK(descP->accMtx);
-
- MLOCK(descP->writeMtx);
- if (descP->currentWriterP != NULL)
- socket_down_writer(env, descP, pid);
- MUNLOCK(descP->writeMtx);
-
- MLOCK(descP->readMtx);
- if (descP->currentReaderP != NULL)
- socket_down_reader(env, descP, pid);
- MUNLOCK(descP->readMtx);
+ MLOCK(descP->accMtx);
+ if (descP->currentAcceptorP != NULL)
+ socket_down_acceptor(env, descP, pid);
+ MUNLOCK(descP->accMtx);
+
+ MLOCK(descP->writeMtx);
+ if (descP->currentWriterP != NULL)
+ socket_down_writer(env, descP, pid);
+ MUNLOCK(descP->writeMtx);
+ MLOCK(descP->readMtx);
+ if (descP->currentReaderP != NULL)
+ socket_down_reader(env, descP, pid);
+ MUNLOCK(descP->readMtx);
+
+ }
}
/*
diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile
index 22c2f24a80..a910588381 100644
--- a/erts/emulator/test/Makefile
+++ b/erts/emulator/test/Makefile
@@ -34,6 +34,14 @@ SOCKET_MODULES = \
socket_client \
socket_test_lib \
socket_test_evaluator \
+ socket_test_ttest_tcp_gen \
+ socket_test_ttest_tcp_socket \
+ socket_test_ttest_tcp_client \
+ socket_test_ttest_tcp_client_gen \
+ socket_test_ttest_tcp_client_socket \
+ socket_test_ttest_tcp_server \
+ socket_test_ttest_tcp_server_gen \
+ socket_test_ttest_tcp_server_socket \
socket_SUITE
MODULES= \
@@ -158,7 +166,10 @@ NATIVE_MODULES= $(NATIVE:%=%_native_SUITE)
NATIVE_ERL_FILES= $(NATIVE_MODULES:%=%.erl)
ERL_FILES= $(MODULES:%=%.erl)
-HRL_FILES= socket_test_evaluator.hrl
+HRL_FILES= \
+ socket_test_evaluator.hrl \
+ socket_test_ttest.hrl \
+ socket_test_ttest_client.hrl
TARGET_FILES = $(MODULES:%=$(EBIN)/%.$(EMULATOR))
SOCKET_TARGETS = $(SOCKET_MODULES:%=$(EBIN)/%.$(EMULATOR))
diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl
index f2aace40de..71e32f8e95 100644
--- a/erts/emulator/test/socket_SUITE.erl
+++ b/erts/emulator/test/socket_SUITE.erl
@@ -1980,7 +1980,7 @@ api_toc_tcp_client(Parent, GL) ->
{To, ConLimit} = api_toc_tcp_client_await_continue(Parent, connect),
Result = api_to_connect_tcp_await_timeout(To, ServerSA, Domain, ConLimit),
api_toc_tcp_client_announce_ready(Parent, connect, Result),
- Reason = sc_rs_tcp_client_await_terminate(Parent),
+ Reason = api_toc_tcp_client_await_terminate(Parent),
exit(Reason).
api_toc_tcp_client_init(Parent, GL) ->
@@ -2057,7 +2057,7 @@ api_to_connect_tcp_await_timeout(ID, ConLimit, To, ServerSA, NewSock, Acc) ->
ERROR
end.
-api_to_connect_tcp_await_timeout2(ID, To, ServerSA, NewSock) ->
+api_to_connect_tcp_await_timeout2(_ID, To, ServerSA, NewSock) ->
Sock = NewSock(),
%% ?SEV_IPRINT("~w: try connect", [ID]),
Start = t(),
@@ -10157,37 +10157,6 @@ tpp_udp_sock_close(Sock) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% This gets the local address (not 127.0...)
-%% We should really implement this using the (new) net module,
-%% but until that gets the necessary functionality...
-which_local_addr(Domain) ->
- case inet:getifaddrs() of
- {ok, IFL} ->
- which_addr(Domain, IFL);
- {error, Reason} ->
- ?FAIL({inet, getifaddrs, Reason})
- end.
-
-which_addr(_Domain, []) ->
- ?FAIL(no_address);
-which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
- which_addr2(Domain, IFO);
-which_addr(Domain, [_|IFL]) ->
- which_addr(Domain, IFL).
-
-which_addr2(_Domain, []) ->
- ?FAIL(no_address);
-which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
- Addr;
-which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
- Addr;
-which_addr2(Domain, [_|IFO]) ->
- which_addr2(Domain, IFO).
-
-
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
start_node(Host, NodeName) ->
UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]),
case do_start_node(Host, UniqueNodeName) of
@@ -10335,6 +10304,25 @@ which_local_addr(Domain) ->
?FAIL({inet, getifaddrs, Reason})
end.
+which_addr(_Domain, []) ->
+ ?FAIL(no_address);
+which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
+ which_addr2(Domain, IFO);
+which_addr(Domain, [_|IFL]) ->
+ which_addr(Domain, IFL).
+
+which_addr2(_Domain, []) ->
+ ?FAIL(no_address);
+which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
+ Addr;
+which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
+ Addr;
+which_addr2(Domain, [_|IFO]) ->
+ which_addr2(Domain, IFO).
+
+
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -10470,21 +10458,21 @@ f(F, A) ->
%% i(Before ++ FStr ++ After, []).
-d(F, A) ->
- d(get(dbg_fd), F, A).
+%% d(F, A) ->
+%% d(get(dbg_fd), F, A).
-d(undefined, F, A) ->
- [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]),
- DbgFileName = f("~s-dbg.txt", [NodeNameStr]),
- case file:open(DbgFileName, [write]) of
- {ok, FD} ->
- put(dbg_fd, FD),
- d(FD, F, A);
- {error, Reason} ->
- exit({failed_open_dbg_file, Reason})
- end;
-d(FD, F, A) ->
- io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]).
+%% d(undefined, F, A) ->
+%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]),
+%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]),
+%% case file:open(DbgFileName, [write]) of
+%% {ok, FD} ->
+%% put(dbg_fd, FD),
+%% d(FD, F, A);
+%% {error, Reason} ->
+%% exit({failed_open_dbg_file, Reason})
+%% end;
+%% d(FD, F, A) ->
+%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]).
i(F) ->
i(F, []).
diff --git a/erts/emulator/test/socket_test_ttest.hrl b/erts/emulator/test/socket_test_ttest.hrl
new file mode 100644
index 0000000000..1a004a9a7a
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest.hrl
@@ -0,0 +1,32 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-ifndef(socket_test_ttest).
+-define(socket_test_ttest, true).
+
+-define(TTEST_TAG, 42).
+-define(TTEST_TYPE_REQUEST, 101).
+-define(TTEST_TYPE_REPLY, 102).
+
+-define(SECS(I), timer:seconds(I)).
+
+-define(SLEEP(T), receive after T -> ok end).
+
+-endif. % -ifdef(socket_test_ttest).
diff --git a/erts/emulator/test/socket_test_ttest_client.hrl b/erts/emulator/test/socket_test_ttest_client.hrl
new file mode 100644
index 0000000000..84e736cc34
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_client.hrl
@@ -0,0 +1,141 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-ifndef(socket_test_ttest_client).
+-define(socket_test_ttest_client, true).
+
+-define(MSG_ID_DEFAULT, 2).
+-define(RUNTIME_DEFAULT, ?SECS(10)).
+-define(MAX_ID, 16#FFFFFFFF).
+
+-define(MSG_DATA1, <<"This is test data 0123456789 0123456789 0123456789">>).
+-define(MSG_DATA2, <<"This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789">>).
+-define(MSG_DATA3, <<"This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789"
+ "This is test data 0123456789 0123456789 0123456789">>).
+
+
+-endif. % -ifdef(socket_test_ttest_client).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl
new file mode 100644
index 0000000000..1bd1bc54e9
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl
@@ -0,0 +1,617 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%% ==========================================================================
+%%
+%% This is the "simple" client using gen_tcp. The client is supposed to be
+%% as simple as possible in order to incur as little overhead as possible.
+%%
+%% There are three ways to run the client: active, passive or active-once.
+%%
+%% The client is the entity that controls the test, timing and counting.
+%%
+%% ==========================================================================
+%%
+%% Before the actual test starts, the client performs a "warmup".
+%% The warmup has two functions. First, to ensure that everything is "loaded"
+%% and, second, to calculate an approximate roundtrip time, in order to
+%% "know" how many iterations we should make (to run for the expected time).
+%% This is not intended to be exact, but just to ensure that all tests take
+%% approx the same time to run.
+%%
+%% ==========================================================================
+
+-module(socket_test_ttest_tcp_client).
+
+-export([
+ start_monitor/4, start_monitor/5, start_monitor/7,
+ stop/1
+ ]).
+
+-include_lib("kernel/include/inet.hrl").
+-include("socket_test_ttest.hrl").
+-include("socket_test_ttest_client.hrl").
+
+-define(RECV_TIMEOUT, 10000).
+-define(MAX_OUTSTANDING_DEFAULT_1, 100).
+-define(MAX_OUTSTANDING_DEFAULT_2, 10).
+-define(MAX_OUTSTANDING_DEFAULT_3, 3).
+
+
+-type active() :: once | boolean().
+-type msg_id() :: 1..3.
+-type max_outstanding() :: pos_integer().
+-type runtime() :: pos_integer().
+
+
+%% ==========================================================================
+
+-spec start_monitor(Mod, Active, Addr, Port) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port) ->
+ start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT).
+
+-spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number(),
+ MsgID :: msg_id().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port, 1 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT);
+start_monitor(Mod, Active, Addr, Port, 2 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT);
+start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT).
+
+-spec start_monitor(Mod,
+ Active,
+ Addr,
+ Port,
+ MsgID,
+ MaxOutstanding,
+ RunTime) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number(),
+ MsgID :: msg_id(),
+ MaxOutstanding :: max_outstanding(),
+ RunTime :: runtime().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime)
+ when is_atom(Mod) andalso
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ is_tuple(Addr) andalso
+ (is_integer(Port) andalso (Port > 0)) andalso
+ (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso
+ (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso
+ (is_integer(RunTime) andalso (RunTime > 0)) ->
+ Self = self(),
+ ClientInit = fun() -> put(sname, "client"),
+ init(Self,
+ Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime)
+ end,
+ {Pid, MRef} = spawn_monitor(ClientInit),
+ receive
+ {?MODULE, Pid, ok} ->
+ erlang:demonitor(MRef, [flush]),
+ {ok, {Pid, MRef}};
+
+ {?MODULE, Pid, {error, _} = ERROR} ->
+ erlang:demonitor(MRef, [flush]),
+ ERROR;
+
+ {'DOWN', MRef, process, Pid, normal} ->
+ ok;
+ {'DOWN', MRef, process, Pid, Reason} ->
+ {error, {exit, Reason}}
+
+ end.
+
+
+%% We should not normally stop this (it terminates when its done).
+stop(Pid) when is_pid(Pid) ->
+ req(Pid, stop).
+
+
+%% ==========================================================================
+
+init(Parent, Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime) ->
+ i("init -> entry with"
+ "~n Parent: ~p"
+ "~n Mod: ~p"
+ "~n Active: ~p"
+ "~n Addr: ~s"
+ "~n Port: ~p"
+ "~n Msg ID: ~p (=> 16 + ~w bytes)"
+ "~n Max Outstanding: ~p"
+ "~n (Suggested) Run Time: ~p ms",
+ [Parent,
+ Mod, Active, inet:ntoa(Addr), Port,
+ MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
+ case Mod:connect(Addr, Port) of
+ {ok, Sock} ->
+ i("init -> connected"),
+ Parent ! {?MODULE, self(), ok},
+ initial_activation(Mod, Sock, Active),
+ Results = loop(#{slogan => run,
+ runtime => RunTime,
+ start => t(),
+ parent => Parent,
+ mod => Mod,
+ sock => Sock,
+ active => Active,
+ msg_data => which_msg_data(MsgID),
+ outstanding => 0,
+ max_outstanding => MaxOutstanding,
+ sid => 1,
+ rid => 1,
+ scnt => 0,
+ rcnt => 0,
+ bcnt => 0,
+ num => undefined,
+ acc => <<>>}),
+ present_results(Results),
+ (catch Mod:close(Sock)),
+ exit(normal);
+ {error, Reason} ->
+ i("init -> connect failed: ~p", [Reason]),
+ exit({connect, Reason})
+ end.
+
+which_msg_data(1) -> ?MSG_DATA1;
+which_msg_data(2) -> ?MSG_DATA2;
+which_msg_data(3) -> ?MSG_DATA3.
+
+
+present_results(#{status := ok,
+ runtime := RunTime,
+ bcnt := ByteCnt,
+ cnt := NumIterations}) ->
+ i("Results: "
+ "~n Run Time: ~s"
+ "~n ByteCnt: ~s"
+ "~n NumIterations: ~s",
+ [format_time(RunTime),
+ if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
+ f("~w, ~w", [ByteCnt, RunTime]);
+ true ->
+ f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
+ end,
+ if (RunTime =:= 0) ->
+ "-";
+ true ->
+ f("~p => ~p iterations / ms",
+ [NumIterations, NumIterations div RunTime])
+ end]),
+ ok;
+present_results(#{status := Failure,
+ runtime := RunTime,
+ sid := SID,
+ rid := RID,
+ scnt := SCnt,
+ rcnt := RCnt,
+ bcnt := BCnt,
+ num := Num}) ->
+ i("Time Test failed: "
+ "~n ~p"
+ "~n"
+ "~nwhen"
+ "~n"
+ "~n Run Time: ~s"
+ "~n Send ID: ~p"
+ "~n Recv ID: ~p"
+ "~n Send Count: ~p"
+ "~n Recv Count: ~p"
+ "~n Byte Count: ~p"
+ "~n Num Iterations: ~p",
+ [Failure,
+ format_time(RunTime),
+ SID, RID, SCnt, RCnt, BCnt, Num]).
+
+
+
+loop(#{runtime := RunTime} = State) ->
+ erlang:start_timer(RunTime, self(), stop),
+ try do_loop(State)
+ catch
+ throw:Results ->
+ Results
+ end.
+
+do_loop(State) ->
+ do_loop( handle_message( msg_exchange(State) ) ).
+
+msg_exchange(#{rcnt := Num, num := Num} = State) ->
+ %% i("we are done"),
+ finish(ok, State);
+msg_exchange(#{scnt := Num, num := Num} = State) ->
+ %% We are done sending more requests - now we will just await
+ %% the replies for the (still) outstanding replies.
+ %% i("we have sent all requests - (only) wait for replies"),
+ msg_exchange( recv_reply(State) );
+msg_exchange(#{outstanding := Outstanding,
+ max_outstanding := MaxOutstanding} = State)
+ when (Outstanding < MaxOutstanding) ->
+ %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]),
+ msg_exchange( send_request(State) );
+msg_exchange(State) ->
+ send_request( recv_reply(State) ).
+
+
+finish(ok,
+ #{start := Start, bcnt := BCnt, num := Num}) ->
+ Stop = t(),
+ throw(#{status => ok,
+ runtime => tdiff(Start, Stop),
+ bcnt => BCnt,
+ cnt => Num});
+finish(Reason,
+ #{start := Start,
+ sid := SID, rid := RID,
+ scnt := SCnt, rcnt := RCnt, bcnt := BCnt,
+ num := Num}) ->
+ Stop = t(),
+ throw(#{status => Reason,
+ runtime => tdiff(Start, Stop),
+ sid => SID,
+ rid => RID,
+ scnt => SCnt,
+ rcnt => RCnt,
+ bcnt => BCnt,
+ num => Num}).
+
+send_request(#{mod := Mod,
+ sock := Sock,
+ sid := ID,
+ scnt := Cnt,
+ outstanding := Outstanding,
+ max_outstanding := MaxOutstanding,
+ msg_data := Data} = State)
+ when (MaxOutstanding > Outstanding) ->
+ %% i("send request -> entry when"
+ %% "~n ID: ~p"
+ %% "~n Cnt: ~p"
+ %% "~n Outstanding: ~p"
+ %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]),
+ SZ = size(Data),
+ Req = <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REQUEST:32,
+ ID:32,
+ SZ:32,
+ Data/binary>>,
+ case Mod:send(Sock, Req) of
+ ok ->
+ %% i("~w bytes sent", [size(Req)]),
+ State#{sid => next_id(ID),
+ scnt => Cnt + 1,
+ outstanding => Outstanding + 1};
+ {error, Reason} ->
+ e("Failed sending request: ~p", [Reason]),
+ exit({send, Reason})
+ end;
+send_request(State) ->
+ State.
+
+
+
+recv_reply(#{mod := Mod,
+ sock := Sock,
+ rid := ID,
+ active := false,
+ bcnt := BCnt,
+ rcnt := Cnt,
+ outstanding := Outstanding} = State) ->
+ %% i("recv-reply(false) -> entry with"
+ %% "~n (R)ID: ~p"
+ %% "~n (R)Cnt: ~p"
+ %% "~n BCnt: ~p"
+ %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]),
+ case recv_reply_message1(Mod, Sock, ID) of
+ {ok, MsgSz} ->
+ State#{rid => next_id(ID),
+ bcnt => BCnt + MsgSz,
+ rcnt => Cnt + 1,
+ outstanding => Outstanding - 1};
+
+ {error, timeout} ->
+ i("recv_reply(false) -> error: timeout"),
+ State;
+
+ {error, Reason} ->
+ finish(Reason, State)
+ end;
+recv_reply(#{mod := Mod,
+ sock := Sock,
+ rid := ID,
+ active := Active,
+ bcnt := BCnt,
+ scnt := SCnt,
+ rcnt := RCnt,
+ outstanding := Outstanding,
+ acc := Acc} = State) ->
+ %% i("recv-reply(~w) -> entry with"
+ %% "~n (R)ID: ~p"
+ %% "~n RCnt: ~p"
+ %% "~n BCnt: ~p"
+ %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]),
+ case recv_reply_message2(Mod, Sock, ID, Acc) of
+ {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) ->
+ maybe_activate(Mod, Sock, Active),
+ State#{rid => next_id(ID),
+ bcnt => BCnt + MsgSz,
+ rcnt => RCnt + 1,
+ outstanding => Outstanding - 1,
+ acc => NewAcc};
+
+ ok ->
+ State;
+
+ {error, stop} ->
+ i("recv_reply(~w) -> stop", [Active]),
+ %% This will have the effect that no more requests are sent...
+ State#{num => SCnt, stop_started => t()};
+
+ {error, timeout} ->
+ i("recv_reply(~w) -> error: timeout", [Active]),
+ State;
+
+ {error, Reason} ->
+ finish(Reason, State)
+ end.
+
+
+%% This function reads exactly one (reply) message. No more no less.
+recv_reply_message1(Mod, Sock, ID) ->
+ %% i("recv_reply_message1 -> entry with"
+ %% "~n ID: ~w", [ID]),
+ case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
+ {ok, <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REPLY:32,
+ ID:32,
+ SZ:32>> = Hdr} ->
+ %% Receive the ping-pong reply boby
+ %% i("recv_reply_message1 -> try read body"
+ %% "~n ID: ~w", [ID]),
+ case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
+ {ok, Data} when (size(Data) =:= SZ) ->
+ {ok, size(Hdr) + size(Data)};
+ {error, Reason2} ->
+ i("recv_reply_message1 -> body error: "
+ "~n ~p: ~p", [Reason2]),
+ {error, {recv_hdr, Reason2}}
+ end;
+
+ {ok, <<BadTag:32,
+ BadType:32,
+ BadID:32,
+ BadSZ:32>>} ->
+ {error, {invalid_hdr,
+ {?TTEST_TAG, BadTag},
+ {?TTEST_TYPE_REPLY, BadType},
+ {ID, BadID},
+ BadSZ}};
+ {ok, _InvHdr} ->
+ {error, invalid_hdr};
+
+ {error, Reason1} ->
+ i("recv_reply_message1 -> hdr error: "
+ "~n ~p", [Reason1]),
+ {error, {recv_hdr, Reason1}}
+ end.
+
+
+%% This function first attempts to process the data we have already
+%% accumulated. If that is not enough for a (complete) reply, it
+%% will attempt to receive more.
+recv_reply_message2(Mod, Sock, ID, Acc) ->
+ %% i("recv_reply_message2 -> entry with"
+ %% "~n ID: ~w", [ID]),
+ case process_acc_data(ID, Acc) of
+ ok ->
+ %% No or insufficient data, so get more
+ recv_reply_message3(Mod, Sock, ID, Acc);
+
+ {ok, _} = OK -> % We already had a reply accumulated - no need to read more
+ OK;
+
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+%% This function receives a "chunk" of data, then it tries to extract
+%% one (reply) message from the accumulated and new data (combined).
+recv_reply_message3(_Mod, Sock, ID, Acc) ->
+ receive
+ {timeout, _TRef, stop} ->
+ %% i("stop - when messages: ~p", [process_info(self(), messages)]),
+ {error, stop};
+
+ {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
+ (TagClosed =:= socket_closed) ->
+ {error, closed};
+
+ {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
+ (TagErr =:= socket_error) ->
+ {error, Reason};
+
+ {Tag, Sock, Msg} when (Tag =:= tcp) orelse
+ (Tag =:= socket) ->
+ %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]),
+ process_acc_data(ID, <<Acc/binary, Msg/binary>>)
+
+ %% after ?RECV_TIMEOUT ->
+ %% {error, timeout}
+ end.
+
+
+process_acc_data(ID, <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REPLY:32,
+ ID:32,
+ SZ:32,
+ Data/binary>>) when (SZ =< size(Data)) ->
+ %% i("process_acc_data -> entry with"
+ %% "~n ID: ~w"
+ %% "~n SZ: ~w", [ID, SZ]),
+ <<_Body:SZ/binary, Rest/binary>> = Data,
+ {ok, {4*4+SZ, Rest}};
+process_acc_data(ID, <<BadTag:32,
+ BadType:32,
+ BadID:32,
+ BadSZ:32,
+ _Data/binary>>)
+ when ((BadTag =/= ?TTEST_TAG) orelse
+ (BadType =/= ?TTEST_TYPE_REPLY) orelse
+ (BadID =/= ID)) ->
+ {error, {invalid_hdr,
+ {?TTEST_TAG, BadTag},
+ {?TTEST_TYPE_REPLY, BadType},
+ {ID, BadID},
+ BadSZ}};
+%% Not enough for an entire (reply) message
+process_acc_data(_ID, _Data) ->
+ ok.
+
+
+handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) ->
+ receive
+ {timeout, _TRef, stop} ->
+ i("stop"),
+ %% This will have the effect that no more requests are sent...
+ State#{num => SCnt, stop_started => t()};
+
+ {?MODULE, Ref, Parent, stop} ->
+ %% This *aborts* the test
+ reply(Parent, Ref, ok),
+ exit(normal);
+
+ %% Only when active
+ {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse
+ (TagClosed =:= socket_closed) ->
+ %% We should never get this (unless the server crashed)
+ exit({closed, Reason});
+
+ %% Only when active
+ {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
+ (TagErr =:= socket_error) ->
+ exit({error, Reason})
+
+ after 0 ->
+ State
+ end.
+
+
+initial_activation(_Mod, _Sock, false = _Active) ->
+ ok;
+initial_activation(Mod, Sock, Active) ->
+ Mod:active(Sock, Active).
+
+
+maybe_activate(Mod, Sock, once = Active) ->
+ Mod:active(Sock, Active);
+maybe_activate(_, _, _) ->
+ ok.
+
+
+%% ==========================================================================
+
+req(Pid, Req) ->
+ Ref = make_ref(),
+ Pid ! {?MODULE, Ref, Pid, Req},
+ receive
+ {'EXIT', Pid, Reason} ->
+ {error, {exit, Reason}};
+ {?MODULE, Ref, Reply} ->
+ Reply
+ end.
+
+reply(Pid, Ref, Reply) ->
+ Pid ! {?MODULE, Ref, Reply}.
+
+
+%% ==========================================================================
+
+next_id(ID) when (ID < ?MAX_ID) ->
+ ID + 1;
+next_id(_) ->
+ 1.
+
+
+%% ==========================================================================
+
+t() ->
+ os:timestamp().
+
+tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+ T1 = A1*1000000000+B1*1000+(C1 div 1000),
+ T2 = A2*1000000000+B2*1000+(C2 div 1000),
+ T2 - T1.
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp({_N1, _N2, N3} = TS) ->
+ {_Date, Time} = calendar:now_to_local_time(TS),
+ {Hour,Min,Sec} = Time,
+ FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+ [Hour, Min, Sec, round(N3/1000)]),
+ lists:flatten(FormatTS).
+
+%% Time is always in number os ms (milli seconds)
+format_time(T) ->
+ f("~p", [T]).
+
+
+%% ==========================================================================
+
+f(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
+%% e(F) ->
+%% i("<ERROR> " ++ F).
+
+e(F, A) ->
+ p(get(sname), "<ERROR> " ++ F, A).
+
+i(F) ->
+ i(F, []).
+
+i(F, A) ->
+ p(get(sname), "<INFO> " ++ F, A).
+
+p(undefined, F, A) ->
+ p("- ", F, A);
+p(Prefix, F, A) ->
+ io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl
new file mode 100644
index 0000000000..4f6152a4b1
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_client_gen.erl
@@ -0,0 +1,45 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_client_gen).
+
+-export([
+ start_monitor/3, start_monitor/4, start_monitor/6,
+ stop/1
+ ]).
+
+-define(TRANSPORT_MOD, socket_test_ttest_tcp_gen).
+
+start_monitor(Active, Addr, Port) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port).
+
+start_monitor(Active, Addr, Port, MsgID) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port,
+ MsgID).
+
+start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime).
+
+stop(Pid) ->
+ socket_test_ttest_tcp_client:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
new file mode 100644
index 0000000000..0d2ec38876
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
@@ -0,0 +1,45 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_client_socket).
+
+-export([
+ start_monitor/3, start_monitor/4, start_monitor/6,
+ stop/1
+ ]).
+
+-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+
+start_monitor(Active, Addr, Port) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port).
+
+start_monitor(Active, Addr, Port, MsgID) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port,
+ MsgID).
+
+start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
+ socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+ Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime).
+
+stop(Pid) ->
+ socket_test_ttest_client:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_gen.erl
new file mode 100644
index 0000000000..de8822157d
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_gen.erl
@@ -0,0 +1,123 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_gen).
+
+-export([
+ accept/1, accept/2,
+ active/2,
+ close/1,
+ connect/2,
+ controlling_process/2,
+ listen/0, listen/1,
+ peername/1,
+ port/1,
+ recv/2, recv/3,
+ send/2,
+ shutdown/2,
+ sockname/1
+ ]).
+
+
+%% ==========================================================================
+
+accept(Sock) ->
+ case gen_tcp:accept(Sock) of
+ {ok, NewSock} ->
+ {ok, NewSock};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+accept(Sock, Timeout) ->
+ case gen_tcp:accept(Sock, Timeout) of
+ {ok, NewSock} ->
+ {ok, NewSock};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+active(Sock, NewActive)
+ when (is_boolean(NewActive) orelse (NewActive =:= once)) ->
+ inet:setopts(Sock, [{active, NewActive}]).
+
+
+close(Sock) ->
+ gen_tcp:close(Sock).
+
+
+connect(Addr, Port) ->
+ Opts = [binary, {packet, raw}, {active, false}],
+ case gen_tcp:connect(Addr, Port, Opts) of
+ {ok, Sock} ->
+ {ok, Sock};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+controlling_process(Sock, NewPid) ->
+ gen_tcp:controlling_process(Sock, NewPid).
+
+
+%% Create a listen socket
+listen() ->
+ listen(0).
+
+listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+ Opts = [binary, {ip, {0,0,0,0}}, {packet, raw}, {active, false}],
+ case gen_tcp:listen(Port, Opts) of
+ {ok, Sock} ->
+ {ok, Sock};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+peername(Sock) ->
+ inet:peername(Sock).
+
+
+port(Sock) ->
+ inet:port(Sock).
+
+
+recv(Sock, Length) ->
+ gen_tcp:recv(Sock, Length).
+recv(Sock, Length, Timeout) ->
+ gen_tcp:recv(Sock, Length, Timeout).
+
+
+send(Sock, Data) ->
+ gen_tcp:send(Sock, Data).
+
+
+shutdown(Sock, How) ->
+ gen_tcp:shutdown(Sock, How).
+
+
+sockname(Sock) ->
+ inet:sockname(Sock).
+
+
+%% ==========================================================================
+
+
+
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl
new file mode 100644
index 0000000000..cb503a1feb
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl
@@ -0,0 +1,678 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%% ==========================================================================
+%%
+%% This is the "simple" server using gen_tcp. The server is supposed to be
+%% as simple as possible in order to incur as little overhead as possible.
+%%
+%% There are three ways to run the server: active, passive or active-once.
+%%
+%% The server does only two things; accept connnections and then reply
+%% to requests (actually the handler(s) does that). No timing or counting.
+%% That is all done by the clients.
+%%
+%% ==========================================================================
+
+-module(socket_test_ttest_tcp_server).
+
+-export([
+ start_monitor/2,
+ stop/1
+ ]).
+
+-include_lib("kernel/include/inet.hrl").
+-include("socket_test_ttest.hrl").
+
+-define(ACC_TIMEOUT, 10000).
+-define(RECV_TIMEOUT, 10000).
+
+
+%% ==========================================================================
+
+start_monitor(Mod, Active)
+ when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) ->
+ Self = self(),
+ ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end,
+ {Pid, MRef} = spawn_monitor(ServerInit),
+ receive
+ {'DOWN', MRef, process, Pid, normal} ->
+ ok;
+ {'DOWN', MRef, process, Pid, Reason} ->
+ {error, {exit, Reason}};
+
+ {?MODULE, Pid, {ok, Port}} ->
+ erlang:demonitor(MRef, [flush]),
+ {ok, {Pid, MRef, Port}};
+ {?MODULE, Pid, {error, _} = ERROR} ->
+ erlang:demonitor(MRef, [flush]),
+ ERROR
+ end.
+
+
+stop(Pid) when is_pid(Pid) ->
+ req(Pid, stop).
+
+
+%% ==========================================================================
+
+server_init(Parent, Mod, Active) ->
+ i("init -> entry with"
+ "~n Parent: ~p"
+ "~n Mod: ~p"
+ "~n Active: ~p", [Parent, Mod, Active]),
+ case Mod:listen(0) of
+ {ok, LSock} ->
+ case Mod:port(LSock) of
+ {ok, Port} = OK ->
+ Addr = which_addr(), % This is just for convenience
+ i("init -> listening on:"
+ "~n Addr: ~p (~s)"
+ "~n Port: ~w"
+ "~n", [Addr, inet:ntoa(Addr), Port]),
+ Parent ! {?MODULE, self(), OK},
+ server_loop(#{parent => Parent,
+ mod => Mod,
+ active => Active,
+ lsock => LSock,
+ handlers => [],
+ %% Accumulation
+ runtime => 0,
+ mcnt => 0,
+ bcnt => 0,
+ hcnt => 0
+ });
+ {error, PReason} ->
+ (catch Mod:close(LSock)),
+ exit({port, PReason})
+ end;
+ {error, LReason} ->
+ exit({listen, LReason})
+ end.
+
+
+server_loop(State) ->
+ %% i("loop -> enter"),
+ server_loop( server_handle_message( server_accept(State) ) ).
+
+server_accept(#{mod := Mod,
+ active := Active,
+ lsock := LSock,
+ handlers := Handlers} = State) ->
+
+ %% i("server-accept -> entry with"
+ %% "~n Mod: ~p"
+ %% "~n Active: ~p"
+ %% "~n LSock: ~p", [Mod, Active, LSock]),
+
+ case Mod:accept(LSock, ?ACC_TIMEOUT) of
+ {ok, Sock} ->
+ %% i("server-accept -> accepted: "
+ %% "~n Sock: ~p", [Sock]),
+ i("accepted connection from ~s",
+ [case Mod:peername(Sock) of
+ {ok, Peer} ->
+ format_peername(Peer);
+ {error, _} ->
+ "-"
+ end]),
+ {Pid, _} = handler_start(),
+ i("handler ~p started -> try transfer socket control", [Pid]),
+ case Mod:controlling_process(Sock, Pid) of
+ ok ->
+ i("server-accept: handler ~p started", [Pid]),
+ handler_continue(Pid, Mod, Sock, Active),
+ Handlers2 = [Pid | Handlers],
+ State#{handlers => Handlers2};
+ {error, CPReason} ->
+ (catch Mod:close(Sock)),
+ (catch Mod:close(LSock)),
+ exit({controlling_process, CPReason})
+ end;
+ {error, timeout} ->
+ State;
+ {error, AReason} ->
+ (catch Mod:close(LSock)),
+ exit({accept, AReason})
+ end.
+
+format_peername({Addr, Port}) ->
+ case inet:gethostbyaddr(Addr) of
+ {ok, #hostent{h_name = N}} ->
+ f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]);
+ {error, _} ->
+ f("~p, ~p", [Addr, Port])
+ end.
+
+server_handle_message(#{parent := Parent, handlers := H} = State) ->
+ %% i("server_handle_message -> enter"),
+ receive
+ {?MODULE, Ref, Parent, stop} ->
+ reply(Parent, Ref, ok),
+ lists:foreach(fun(P) -> handler_stop(P) end, H),
+ exit(normal);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ server_handle_down(Pid, Reason, State)
+
+ after 0 ->
+ State
+ end.
+
+
+server_handle_down(Pid, Reason, #{handlers := Handlers} = State) ->
+ case lists:delete(Pid, Handlers) of
+ Handlers ->
+ i("unknown process ~p died", [Pid]),
+ State;
+ Handlers2 ->
+ server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2})
+ end.
+
+
+server_handle_handler_down(Pid,
+ {done, RunTime, MCnt, BCnt},
+ #{runtime := AccRunTime,
+ mcnt := AccMCnt,
+ bcnt := AccBCnt,
+ hcnt := AccHCnt} = State) ->
+ AccRunTime2 = AccRunTime + RunTime,
+ AccMCnt2 = AccMCnt + MCnt,
+ AccBCnt2 = AccBCnt + BCnt,
+ AccHCnt2 = AccHCnt + 1,
+ i("handler ~p (~w) done => accumulated results: "
+ "~n Run Time: ~s ms"
+ "~n Message Count: ~s"
+ "~n Byte Count: ~s",
+ [Pid, AccHCnt2,
+ format_time(AccRunTime2),
+ if (AccRunTime2 > 0) ->
+ f("~w => ~w (~w) msgs / ms",
+ [AccMCnt2,
+ AccMCnt2 div AccRunTime2,
+ (AccMCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ f("~w", [AccMCnt2])
+ end,
+ if (AccRunTime2 > 0) ->
+ f("~w => ~w (~w) bytes / ms",
+ [AccBCnt2,
+ AccBCnt2 div AccRunTime2,
+ (AccBCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ f("~w", [AccBCnt2])
+ end]),
+ State#{runtime => AccRunTime2,
+ mcnt => AccMCnt2,
+ bcnt => AccBCnt2,
+ hcnt => AccHCnt2};
+server_handle_handler_down(Pid, Reason, State) ->
+ i("handler ~p terminated: "
+ "~n ~p", [Pid, Reason]),
+ State.
+
+
+
+%% ==========================================================================
+
+handler_start() ->
+ Self = self(),
+ HandlerInit = fun() -> put(sname, "handler"), handler_init(Self) end,
+ spawn_monitor(HandlerInit).
+
+handler_continue(Pid, Mod, Sock, Active) ->
+ req(Pid, {continue, Mod, Sock, Active}).
+
+handler_stop(Pid) ->
+ req(Pid, stop).
+
+handler_init(Parent) ->
+ i("starting"),
+ receive
+ {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} ->
+ i("received continue"),
+ reply(Parent, Ref, ok),
+ handler_initial_activation(Mod, Sock, Active),
+ handler_loop(#{parent => Parent,
+ mod => Mod,
+ sock => Sock,
+ active => Active,
+ start => t(),
+ mcnt => 0,
+ bcnt => 0,
+ last_reply => none,
+ acc => <<>>})
+
+ after 5000 ->
+ i("timeout when message queue: "
+ "~n ~p"
+ "~nwhen"
+ "~n Parent: ~p", [process_info(self(), messages), Parent]),
+ handler_init(Parent)
+ end.
+
+handler_loop(State) ->
+ %% i("handler-loop"),
+ handler_loop( handler_handle_message( handler_recv_message(State) ) ).
+
+%% When passive, we read *one* request and then attempt to reply to it.
+handler_recv_message(#{mod := Mod,
+ sock := Sock,
+ active := false,
+ mcnt := MCnt,
+ bcnt := BCnt} = State) ->
+ %% i("handler_recv_message(false) -> entry"),
+ case handler_recv_message2(Mod, Sock) of
+ {ok, {MsgSz, ID, Body}} ->
+ handler_send_reply(Mod, Sock, ID, Body),
+ State#{mcnt => MCnt + 1,
+ bcnt => BCnt + MsgSz,
+ last_reply => ID};
+ {error, closed} ->
+ handler_done(State);
+ {error, timeout} ->
+ State
+ end;
+
+
+%% When "active" (once or true), we receive one data "message", which may
+%% contain any number of requests or only part of a request. Then we
+%% process this data together with whatever we had "accumulated" from
+%% prevous messages. Each request will be extracted and replied to. If
+%% there is some data left, not enough for a complete request, we store
+%% this in 'acc' (accumulate it).
+handler_recv_message(#{mod := Mod,
+ sock := Sock,
+ active := Active,
+ mcnt := MCnt,
+ bcnt := BCnt,
+ last_reply := LID,
+ acc := Acc} = State) ->
+ %% i("handler_recv_message(~w) -> entry", [Active]),
+ case handler_recv_message3(Mod, Sock, Acc, LID) of
+ {ok, {MCnt2, BCnt2, LID2}, NewAcc} ->
+ handler_maybe_activate(Mod, Sock, Active),
+ State#{mcnt => MCnt + MCnt2,
+ bcnt => BCnt + BCnt2,
+ last_reply => LID2,
+ acc => NewAcc};
+
+ {error, closed} ->
+ if
+ (size(Acc) =:= 0) ->
+ handler_done(State);
+ true ->
+ e("client done with partial message: "
+ "~n Last Reply Sent: ~w"
+ "~n Message Count: ~w"
+ "~n Byte Count: ~w"
+ "~n Partial Message: ~w bytes",
+ [LID, MCnt, BCnt, size(Acc)]),
+ exit({closed_with_partial_message, LID})
+ end;
+
+ {error, timeout} ->
+ State
+ end.
+
+handler_process_data(Acc, Mod, Sock, LID) ->
+ handler_process_data(Acc, Mod, Sock, 0, 0, LID).
+
+%% Extract each complete request, one at a time.
+handler_process_data(<<?TTEST_TAG:32,
+ ?TTEST_TYPE_REQUEST:32,
+ ID:32,
+ SZ:32,
+ Rest/binary>>,
+ Mod, Sock,
+ MCnt, BCnt, _LID) when (size(Rest) >= SZ) ->
+ <<Body:SZ/binary, Rest2/binary>> = Rest,
+ case handler_send_reply(Mod, Sock, ID, Body) of
+ ok ->
+ handler_process_data(Rest2, Mod, Sock, MCnt+1, BCnt+SZ, ID);
+ {error, _} = ERROR ->
+ ERROR
+ end;
+handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) ->
+ {ok, {MCnt, BCnt, LID}, Data}.
+
+
+handler_recv_message2(Mod, Sock) ->
+ %% i("handler_recv_message2 -> entry"),
+ case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
+ {ok, <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REQUEST:32,
+ ID:32,
+ SZ:32>> = Hdr} ->
+ %% i("handler_recv_message2 -> got request header: "
+ %% "~n ID: ~p"
+ %% "~n SZ: ~p", [ID, SZ]),
+ case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
+ {ok, Body} when (SZ =:= size(Body)) ->
+ %% i("handler_recv_message2 -> got body"),
+ {ok, {size(Hdr) + size(Body), ID, Body}};
+ {error, BReason} ->
+ e("failed reading body (~w) of message ~w:"
+ "~n ~p", [SZ, ID, BReason]),
+ exit({recv, body, ID, SZ, BReason})
+ end;
+ {error, timeout} = ERROR ->
+ i("handler_recv_message2 -> timeout"),
+ ERROR;
+ {error, closed} = ERROR ->
+ ERROR;
+ {error, HReason} ->
+ e("failed reading header of message:"
+ "~n ~p", [HReason]),
+ exit({recv, header, HReason})
+ end.
+
+
+handler_recv_message3(Mod, Sock, Acc, LID) ->
+ receive
+ {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
+ (TagClosed =:= socket_closed) ->
+ {error, closed};
+
+ {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
+ (TagErr =:= socket_error) ->
+ {error, Reason};
+
+ {Tag, Sock, Msg} when (Tag =:= tcp) orelse
+ (Tag =:= socket) ->
+ handler_process_data(<<Acc/binary, Msg/binary>>, Mod, Sock, LID)
+
+ after ?RECV_TIMEOUT ->
+ {error, timeout}
+ end.
+
+
+
+%% %% Socket in passive mode => we need to read explicitly
+%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) ->
+%% %% i("try recv msg header"),
+%% MsgSz =
+%% case gen_tcp:recv(Sock, 4*4) of
+%% {ok, <<?SOCKET_SIMPLE_TAG:32,
+%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
+%% ID:32,
+%% SZ:32>> = Hdr} ->
+%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]),
+%% case gen_tcp:recv(Sock, SZ) of
+%% {ok, Data} when (size(Data) =:= SZ) ->
+%% handler_send_reply(Sock, ID, Data),
+%% size(Hdr)+SZ;
+%% {ok, InvData} ->
+%% i("invalid data"),
+%% (catch gen_tcp:close(Sock)),
+%% exit({invalid_data, SZ, size(InvData)});
+%% {error, Reason2} ->
+%% i("Data read failed: ~p", [Reason2]),
+%% (catch gen_tcp:close(Sock)),
+%% exit({recv_data, Reason2})
+%% end;
+%% {ok, _InvHdr} ->
+%% (catch gen_tcp:close(Sock)),
+%% exit(invalid_hdr);
+%% {error, closed} ->
+%% i("we are done: "
+%% "~n Message Count: ~p"
+%% "~n Byte Count: ~p", [MCnt, BCnt]),
+%% exit(normal);
+%% {error, Reason1} ->
+%% i("Header read failed: ~p", [Reason1]),
+%% (catch gen_tcp:close(Sock)),
+%% exit({recv_hdr, Reason1})
+%% end,
+%% handler_loop(State, MCnt+1, BCnt+MsgSz);
+
+%% %% Socket in active mode (once | true) => data messages arrive
+%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) ->
+%% %% i("await msg"),
+%% try handler_recv_request(Sock, Active) of
+%% {ID, Data, MsgSz} ->
+%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
+%% handler_send_reply(Sock, ID, Data),
+%% handler_maybe_activate(Sock, Active),
+%% handler_loop(State, MCnt+1, BCnt+MsgSz)
+%% catch
+%% throw:tcp_closed ->
+%% i("we are done: "
+%% "~n Message Count: ~p"
+%% "~n Byte Count: ~p", [MCnt, BCnt]),
+%% exit(normal);
+%% throw:{tcp_error, Reason} ->
+%% i("<ERROR> TCP error ~p when: "
+%% "~n Message Count: ~p"
+%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]),
+%% exit({tcp_error, Reason})
+%% end.
+%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active),
+%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
+%% %% handler_send_reply(Sock, ID, Data),
+%% %% handler_maybe_activate(Sock, Active),
+%% %% handler_loop(State, MCnt+1, BCnt+MsgSz).
+
+
+%% handler_recv_request(Sock, Active) ->
+%% %% In theory we should also be ready for a partial header,
+%% %% but I can't be bothered...
+%% receive
+%% {tcp_closed, Sock} ->
+%% throw(tcp_closed);
+%% {tcp_error, Sock, Reason} ->
+%% throw({tcp_error, Reason});
+%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
+%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
+%% ID:32,
+%% SZ:32,
+%% Data/binary>> = Msg} when (size(Data) =:= SZ) ->
+%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
+%% {ID, Data, size(Msg)};
+%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
+%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
+%% ID:32,
+%% SZ:32,
+%% Data/binary>> = Msg} when (size(Data) < SZ) ->
+%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
+%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg))
+%% end.
+
+%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) ->
+%% handler_maybe_activate(Sock, Active),
+%% receive
+%% {tcp_closed, Sock} ->
+%% %% i("we are done (incomplete data)"),
+%% throw(tcp_closed);
+%% {tcp_error, Sock, Reason} ->
+%% throw({tcp_error, Reason});
+%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) ->
+%% %% i("[complete] received the remaining data (~w bytes) for msg ~w",
+%% %% [size(Data), ID]),
+%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)};
+%% {tcp, Sock, Data} ->
+%% %% i("[incomplete] received ~w bytes of data for for msg ~w",
+%% %% [size(Data), ID]),
+%% handler_recv_request_data(Sock, Active, ID, SZ,
+%% <<AccData/binary, Data/binary>>,
+%% AccMsgSz+size(Data))
+%% end.
+
+handler_send_reply(Mod, Sock, ID, Data) ->
+ SZ = size(Data),
+ Msg = <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REPLY:32,
+ ID:32,
+ SZ:32,
+ Data/binary>>,
+ %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]),
+ case Mod:send(Sock, Msg) of
+ ok ->
+ %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]),
+ ok;
+ {error, Reason} ->
+ (catch Mod:close(Sock)),
+ exit({send, Reason})
+ end.
+
+
+handler_done(State) ->
+ handler_done(State, t()).
+
+handler_done(#{start := Start,
+ mod := Mod,
+ sock := Sock,
+ mcnt := MCnt,
+ bcnt := BCnt}, Stop) ->
+ (catch Mod:close(Sock)),
+ exit({done, tdiff(Start, Stop), MCnt, BCnt}).
+
+
+handler_handle_message(#{parent := Parent} = State) ->
+ receive
+ {'EXIT', Parent, Reason} ->
+ exit({parent_exit, Reason})
+ after 0 ->
+ State
+ end.
+
+
+handler_initial_activation(_Mod, _Sock, false = _Active) ->
+ ok;
+handler_initial_activation(Mod, Sock, Active) ->
+ Mod:active(Sock, Active).
+
+
+handler_maybe_activate(Mod, Sock, once = Active) ->
+ Mod:active(Sock, Active);
+handler_maybe_activate(_, _, _) ->
+ ok.
+
+
+
+%% ==========================================================================
+
+which_addr() ->
+ case inet:getifaddrs() of
+ {ok, IfAddrs} ->
+ which_addrs(inet, IfAddrs);
+ {error, Reason} ->
+ exit({getifaddrs, Reason})
+ end.
+
+which_addrs(_Family, []) ->
+ exit({getifaddrs, not_found});
+which_addrs(Family, [{"lo", _} | IfAddrs]) ->
+ %% Skip
+ which_addrs(Family, IfAddrs);
+which_addrs(Family, [{"docker" ++ _, _} | IfAddrs]) ->
+ %% Skip docker
+ which_addrs(Family, IfAddrs);
+which_addrs(Family, [{"br-" ++ _, _} | IfAddrs]) ->
+ %% Skip docker
+ which_addrs(Family, IfAddrs);
+which_addrs(Family, [{"en" ++ _, IfOpts} | IfAddrs]) ->
+ %% Maybe take this one
+ case which_addr(Family, IfOpts) of
+ {ok, Addr} ->
+ Addr;
+ error ->
+ which_addrs(Family, IfAddrs)
+ end;
+which_addrs(Family, [{_IfName, IfOpts} | IfAddrs]) ->
+ case which_addr(Family, IfOpts) of
+ {ok, Addr} ->
+ Addr;
+ error ->
+ which_addrs(Family, IfAddrs)
+ end.
+
+which_addr(_, []) ->
+ error;
+which_addr(inet, [{addr, Addr}|_])
+ when is_tuple(Addr) andalso (size(Addr) =:= 4) ->
+ {ok, Addr};
+which_addr(inet6, [{addr, Addr}|_])
+ when is_tuple(Addr) andalso (size(Addr) =:= 8) ->
+ {ok, Addr};
+which_addr(Family, [_|IfOpts]) ->
+ which_addr(Family, IfOpts).
+
+
+%% ==========================================================================
+
+req(Pid, Req) ->
+ Ref = make_ref(),
+ Pid ! {?MODULE, Ref, self(), Req},
+ receive
+ {'EXIT', Pid, Reason} ->
+ {error, {exit, Reason}};
+ {?MODULE, Ref, Reply} ->
+ Reply
+ end.
+
+reply(Pid, Ref, Reply) ->
+ Pid ! {?MODULE, Ref, Reply}.
+
+
+%% ==========================================================================
+
+t() ->
+ os:timestamp().
+
+tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+ T1 = A1*1000000000+B1*1000+(C1 div 1000),
+ T2 = A2*1000000000+B2*1000+(C2 div 1000),
+ T2 - T1.
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp({_N1, _N2, N3} = TS) ->
+ {_Date, Time} = calendar:now_to_local_time(TS),
+ {Hour,Min,Sec} = Time,
+ FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+ [Hour, Min, Sec, round(N3/1000)]),
+ lists:flatten(FormatTS).
+
+%% Time is always in number os ms (milli seconds)
+format_time(T) ->
+ f("~p", [T]).
+
+
+%% ==========================================================================
+
+f(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
+e(F, A) ->
+ p(get(sname), "<ERROR> " ++ F, A).
+
+i(F) ->
+ i(F, []).
+
+i(F, A) ->
+ p(get(sname), "<INFO> " ++ F, A).
+
+p(undefined, F, A) ->
+ p("- ", F, A);
+p(Prefix, F, A) ->
+ io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl b/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl
new file mode 100644
index 0000000000..53a9dc7d2a
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_server_gen.erl
@@ -0,0 +1,34 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_server_gen).
+
+-export([
+ start_monitor/1,
+ stop/1
+ ]).
+
+-define(TRANSPORT_MOD, socket_test_ttest_tcp_gen).
+
+start_monitor(Active) ->
+ socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active).
+
+stop(Pid) ->
+ socket_test_ttest_tcp_server:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
new file mode 100644
index 0000000000..de9df857fe
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
@@ -0,0 +1,34 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_server_socket).
+
+-export([
+ start_monitor/1,
+ stop/1
+ ]).
+
+-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+
+start_monitor(Active) ->
+ socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active).
+
+stop(Pid) ->
+ socket_test_ttest_tcp_server:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
new file mode 100644
index 0000000000..12d9e052d7
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -0,0 +1,345 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_tcp_socket).
+
+-export([
+ accept/1, accept/2,
+ active/2,
+ close/1,
+ connect/2,
+ controlling_process/2,
+ listen/0, listen/1,
+ port/1,
+ peername/1,
+ recv/2, recv/3,
+ send/2,
+ shutdown/2,
+ sockname/1
+ ]).
+
+
+-define(READER_RECV_TIMEOUT, 1000).
+
+
+%% ==========================================================================
+
+accept(#{sock := LSock}) ->
+ case socket:accept(LSock) of
+ {ok, Sock} ->
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
+ {ok, #{sock => Sock, reader => Reader}};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+accept(#{sock := LSock}, Timeout) ->
+ case socket:accept(LSock, Timeout) of
+ {ok, Sock} ->
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
+ {ok, #{sock => Sock, reader => Reader}};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+active(#{reader := Pid}, NewActive)
+ when (is_boolean(NewActive) orelse (NewActive =:= once)) ->
+ Pid ! {?MODULE, active, NewActive},
+ ok.
+
+
+close(#{sock := Sock, reader := Pid}) ->
+ Pid ! {?MODULE, stop},
+ socket:close(Sock).
+
+%% Create a socket and connect it to a peer
+connect(Addr, Port) ->
+ try
+ begin
+ Sock =
+ case socket:open(inet, stream, tcp) of
+ {ok, S} ->
+ S;
+ {error, OReason} ->
+ throw({error, {open, OReason}})
+ end,
+ case socket:bind(Sock, any) of
+ {ok, _} ->
+ ok;
+ {error, BReason} ->
+ (catch socket:close(Sock)),
+ throw({error, {bind, BReason}})
+ end,
+ SA = #{family => inet,
+ addr => Addr,
+ port => Port},
+ case socket:connect(Sock, SA) of
+ ok ->
+ ok;
+ {error, CReason} ->
+ (catch socket:close(Sock)),
+ throw({error, {connect, CReason}})
+ end,
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
+ {ok, #{sock => Sock, reader => Reader}}
+ end
+ catch
+ throw:ERROR:_ ->
+ ERROR
+ end.
+
+
+controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
+ case socket:setopt(Sock, otp, controlling_process, NewPid) of
+ ok ->
+ Pid ! {?MODULE, self(), controlling_process, NewPid},
+ receive
+ {?MODULE, Pid, controlling_process} ->
+ ok
+ end;
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+%% Create a listen socket
+listen() ->
+ listen(0).
+
+listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+ try
+ begin
+ Sock = case socket:open(inet, stream, tcp) of
+ {ok, S} ->
+ S;
+ {error, OReason} ->
+ throw({error, {open, OReason}})
+ end,
+ SA = #{family => inet,
+ port => Port},
+ case socket:bind(Sock, SA) of
+ {ok, _} ->
+ ok;
+ {error, BReason} ->
+ (catch socket:close(Sock)),
+ throw({error, {bind, BReason}})
+ end,
+ case socket:listen(Sock) of
+ ok ->
+ ok;
+ {error, LReason} ->
+ (catch socket:close(Sock)),
+ throw({error, {listen, LReason}})
+ end,
+ {ok, #{sock => Sock}}
+ end
+ catch
+ throw:{error, Reason}:_ ->
+ {error, Reason}
+ end.
+
+
+port(#{sock := Sock}) ->
+ case socket:sockname(Sock) of
+ {ok, #{port := Port}} ->
+ {ok, Port};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+peername(#{sock := Sock}) ->
+ case socket:peername(Sock) of
+ {ok, #{addr := Addr, port := Port}} ->
+ {ok, {Addr, Port}};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+recv(#{sock := Sock}, Length) ->
+ socket:recv(Sock, Length).
+recv(#{sock := Sock}, Length, Timeout) ->
+ socket:recv(Sock, Length, Timeout).
+
+
+send(#{sock := Sock}, Length) ->
+ socket:send(Sock, Length).
+
+
+shutdown(#{sock := Sock}, How) ->
+ socket:shutdown(Sock, How).
+
+
+sockname(#{sock := Sock}) ->
+ case socket:sockname(Sock) of
+ {ok, #{addr := Addr, port := Port}} ->
+ {ok, {Addr, Port}};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+%% ==========================================================================
+
+reader_init(ControllingProcess, Sock, Active)
+ when is_pid(ControllingProcess) andalso
+ (is_boolean(Active) orelse (Active =:= once)) ->
+ MRef = erlang:monitor(process, ControllingProcess),
+ reader_loop(#{ctrl_proc => ControllingProcess,
+ ctrl_proc_mref => MRef,
+ active => Active,
+ sock => Sock}).
+
+
+%% Never read
+reader_loop(#{active := false,
+ ctrl_proc := Pid} = State) ->
+ receive
+ {?MODULE, stop} ->
+ exit(normal);
+
+ {?MODULE, Pid, controlling_process, NewPid} ->
+ MRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(MRef, [flush]),
+ NewMRef = erlang:monitor(process, NewPid),
+ Pid ! {?MODULE, self(), controlling_process},
+ reader_loop(State#{ctrl_proc => NewPid,
+ ctrl_proc_mref => NewMRef});
+
+ {?MODULE, active, NewActive} ->
+ reader_loop(State#{active => NewActive});
+
+ {'DOWN', MRef, process, Pid, Reason} ->
+ case maps:get(ctrl_proc_mref, State) of
+ MRef when (Reason =:= normal) ->
+ exit(normal);
+ MRef ->
+ exit({controlling_process, Reason});
+ _ ->
+ reader_loop(State)
+ end
+ end;
+
+%% Read *once* and then change to false
+reader_loop(#{active := once,
+ sock := Sock,
+ ctrl_proc := Pid} = State) ->
+ case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ {ok, Data} ->
+ Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ reader_loop(State#{active => false});
+ {error, timeout} ->
+ receive
+ {?MODULE, stop} ->
+ exit(normal);
+
+ {?MODULE, Pid, controlling_process, NewPid} ->
+ MRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(MRef, [flush]),
+ MRef = erlang:monitor(process, NewPid),
+ Pid ! {?MODULE, self(), controlling_process},
+ reader_loop(State#{ctrl_proc => NewPid,
+ ctrl_proc_mref => MRef});
+
+ {?MODULE, active, NewActive} ->
+ reader_loop(State#{active => NewActive});
+
+ {'DOWN', MRef, process, Pid, Reason} ->
+ case maps:get(ctrl_proc_mref, State) of
+ MRef when (Reason =:= normal) ->
+ exit(normal);
+ MRef ->
+ exit({controlling_process, Reason});
+ _ ->
+ reader_loop(State)
+ end
+ after 0 ->
+ reader_loop(State)
+ end;
+
+ {error, closed} ->
+ Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ exit(normal);
+
+ {error, Reason} ->
+ Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ exit(Reason)
+ end;
+
+%% Read and forward data continuously
+reader_loop(#{active := true,
+ sock := Sock,
+ ctrl_proc := Pid} = State) ->
+ case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ {ok, Data} ->
+ Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ reader_loop(State);
+ {error, timeout} ->
+ receive
+ {?MODULE, stop} ->
+ exit(normal);
+
+ {?MODULE, Pid, controlling_process, NewPid} ->
+ MRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(MRef, [flush]),
+ MRef = erlang:monitor(process, NewPid),
+ Pid ! {?MODULE, self(), controlling_process},
+ reader_loop(State#{ctrl_proc => NewPid,
+ ctrl_proc_mref => MRef});
+
+ {?MODULE, active, NewActive} ->
+ reader_loop(State#{active => NewActive});
+
+ {'DOWN', MRef, process, Pid, Reason} ->
+ case maps:get(ctrl_proc_mref, State) of
+ MRef when (Reason =:= normal) ->
+ exit(normal);
+ MRef ->
+ exit({controlling_process, Reason});
+ _ ->
+ reader_loop(State)
+ end
+ after 0 ->
+ reader_loop(State)
+ end;
+
+ {error, closed} ->
+ Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ exit(normal);
+
+ {error, Reason} ->
+ Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ exit(Reason)
+ end.
+
+
+
+
+
+
+%% ==========================================================================
+
+
+