aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/nifs/common')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c190
1 files changed, 149 insertions, 41 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 9375e9c005..fc218f5163 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -812,11 +812,20 @@ typedef struct {
SocketRequestQueue acceptorsQ;
/* +++ Config & Misc stuff +++ */
- size_t rBufSz; // Read buffer size (when data length = 0 is specified)
- size_t rCtrlSz; // Read control buffer size
- size_t wCtrlSz; // Write control buffer size
- BOOLEAN_T iow; // Inform On (counter) Wrap
- BOOLEAN_T dbg;
+ size_t rBufSz; // Read buffer size (when data length = 0)
+ /* rNum and rNumCnt are used (together with rBufSz) when calling the recv
+ * function with the Length argument set to 0 (zero).
+ * If rNum is 0 (zero), then rNumCnt is not used and only *one* read will
+ * be done. Also, when get'ing the value of the option (rcvbuf) with
+ * getopt, the value will be reported as an integer. If the rNum has a
+ * value greater then 0 (zero), then it will instead be reported as {N, BufSz}.
+ */
+ unsigned int rNum; // recv: Number of reads using rBufSz
+ unsigned int rNumCnt; // recv: Current number of reads (so far)
+ size_t rCtrlSz; // Read control buffer size
+ size_t wCtrlSz; // Write control buffer size
+ BOOLEAN_T iow; // Inform On (counter) Wrap
+ BOOLEAN_T dbg;
/* +++ Close stuff +++ */
ErlNifMutex* closeMtx;
@@ -4959,6 +4968,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
accDescP->type = descP->type;
accDescP->protocol = descP->protocol;
accDescP->rBufSz = descP->rBufSz; // Inherit buffer size
+ accDescP->rNum = descP->rNum; // Inherit buffer uses
+ accDescP->rNumCnt = 0;
accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez
accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
@@ -5737,8 +5748,8 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
- sockRef = argv[0]; // We need this in case we send in case we send abort
- recvRef = argv[1];
+ sockRef = argv[0]; // We need this in case we case we send abort
+ recvRef = argv[1];
if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
return enif_make_badarg(env);
@@ -5791,9 +5802,9 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
int bufSz = (len ? len : descP->rBufSz);
SSDBG( descP, ("SOCKET", "nrecv -> entry with"
- "\r\n len: %d (%d)"
+ "\r\n len: %d (%d:%d)"
"\r\n flags: %d"
- "\r\n", len, bufSz, flags) );
+ "\r\n", len, descP->rNumCnt, bufSz, flags) );
if (!descP->isReadable)
return enif_make_badarg(env);
@@ -5817,10 +5828,11 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
// If it fails (read = -1), we need errno...
SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) );
read = sock_recv(descP->sock, buf.data, buf.size, flags);
- if (IS_SOCKET_ERROR(read))
+ if (IS_SOCKET_ERROR(read)) {
save_errno = sock_errno();
- else
+ } else {
save_errno = -1; // The value does not actually matter in this case
+ }
SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) );
@@ -6719,7 +6731,7 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
}
descP->ctrlPid = newCtrlPid;
- descP->ctrlMon = newCtrlMon;
+ descP->ctrlMon = newCtrlMon;
SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
@@ -6729,22 +6741,60 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
/* nsetopt_otp_rcvbuf - Handle the OTP (level) rcvbuf option
+ * The (otp) rcvbuf option is provided as:
+ *
+ * BufSz :: integer() | {N :: pos_integer(), BufSz :: pod_integer()}
+ *
+ * Where N is the max number of reads.
*/
static
ERL_NIF_TERM nsetopt_otp_rcvbuf(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM eVal)
{
- size_t val;
- char* xres;
+ const ERL_NIF_TERM* t; // The array of the elements of the tuple
+ int tsz; // The size of the tuple - should be 2
+ unsigned int n;
+ size_t bufSz;
+ char* xres;
- if ((xres = esock_decode_bufsz(env,
- eVal,
- SOCKET_RECV_BUFFER_SIZE_DEFAULT, &val)) != NULL)
- return esock_make_error_str(env, xres);
+ if (IS_NUM(env, eVal)) {
+
+ /* This will have the effect that the buffer size will be
+ * reported as an integer (getopt).
+ */
+ n = 0;
- descP->rBufSz = val;
+ if ((xres = esock_decode_bufsz(env,
+ eVal,
+ SOCKET_RECV_BUFFER_SIZE_DEFAULT,
+ &bufSz)) != NULL)
+ return esock_make_error_str(env, xres);
+
+ } else if (IS_TUPLE(env, eVal)) {
+
+ if (!GET_TUPLE(env, eVal, &tsz, &t))
+ return enif_make_badarg(env); // We should use a "proper" error value...
+
+ if (tsz != 2)
+ return enif_make_badarg(env); // We should use a "proper" error value...
+ if (!GET_UINT(env, t[0], &n))
+ return enif_make_badarg(env); // We should use a "proper" error value...
+
+ if ((xres = esock_decode_bufsz(env,
+ t[1],
+ SOCKET_RECV_BUFFER_SIZE_DEFAULT,
+ &bufSz)) != NULL)
+ return esock_make_error_str(env, xres);
+
+ } else {
+ return enif_make_badarg(env); // We should use a "proper" error value...
+ }
+
+ descP->rNum = n;
+ descP->rBufSz = bufSz;
+
return esock_atom_ok;
}
@@ -10097,7 +10147,13 @@ static
ERL_NIF_TERM ngetopt_otp_rcvbuf(ErlNifEnv* env,
SocketDescriptor* descP)
{
- ERL_NIF_TERM eVal = MKI(env, descP->rBufSz);
+ ERL_NIF_TERM eVal;
+
+ if (descP->rNum == 0) {
+ eVal = MKI(env, descP->rBufSz);
+ } else {
+ eVal = MKT2(env, MKI(env, descP->rNum), MKI(env, descP->rBufSz));
+ }
return esock_make_ok2(env, eVal);
}
@@ -13681,7 +13737,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
if (toRead == 0) {
- /* +++ Give us everything you have got => needs to continue +++ */
+ /* +++ Give us everything you have got => *
+ * (maybe) needs to continue +++ */
/* How do we do this?
* Either:
@@ -13695,36 +13752,83 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* (continuous binary realloc "here").
*
* => We choose alt 1 for now.
+ *
+ * Also, we need to check if the rNumCnt has reached its max (rNum),
+ * in which case we will assume the read to be done!
*/
cnt_inc(&descP->readByteCnt, read);
- if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
- return esock_make_error_str(env, xres);
+ SSDBG( descP,
+ ("SOCKET", "recv_check_result -> shall we continue reading"
+ "\r\n read: %d"
+ "\r\n rNum: %d"
+ "\r\n rNumCnt: %d"
+ "\r\n", read, descP->rNum, descP->rNumCnt) );
- /* This transfers "ownership" of the *allocated* binary to an
- * erlang term (no need for an explicit free).
- */
- data = MKBIN(env, bufP);
+ if (descP->rNum > 0) {
- SSDBG( descP,
- ("SOCKET",
- "recv_check_result -> [%d] "
- "we are done for now - read more\r\n", toRead) );
+ descP->rNumCnt++;
+ if (descP->rNumCnt >= descP->rNum) {
- return esock_make_ok3(env, atom_false, data);
+ descP->rNumCnt = 0;
- } else {
+ cnt_inc(&descP->readPkgCnt, 1);
+
+ recv_update_current_reader(env, descP);
+
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
+ data = MKBIN(env, bufP);
+
+ return esock_make_ok3(env, atom_true, data);
- /* +++ We got exactly as much as we requested +++ */
+ } else {
- /* <KOLLA>
- * WE NEED TO INFORM ANY WAITING READERS
- *
- * DEMONP of the current reader!
- *
- * </KOLLA>
- */
+ /* Yes, we *do* need to continue reading */
+
+ if ((xres = recv_init_current_reader(env,
+ descP, recvRef)) != NULL) {
+ descP->rNumCnt = 0;
+ return esock_make_error_str(env, xres);
+ }
+
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
+ data = MKBIN(env, bufP);
+
+ return esock_make_ok3(env, atom_false, data);
+
+ }
+
+ } else {
+
+ /* Yes, we *do* need to continue reading */
+
+ if ((xres = recv_init_current_reader(env,
+ descP, recvRef)) != NULL) {
+ descP->rNumCnt = 0;
+ return esock_make_error_str(env, xres);
+ }
+
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
+ data = MKBIN(env, bufP);
+
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "we are done for now - read more\r\n", toRead) );
+
+ return esock_make_ok3(env, atom_false, data);
+ }
+
+ } else {
+
+ /* +++ We got exactly as much as we requested => We are done +++ */
cnt_inc(&descP->readPkgCnt, 1);
cnt_inc(&descP->readByteCnt, read);
@@ -13793,6 +13897,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET",
"recv_check_result -> [%d] eagain\r\n", toRead) );
+ descP->rNumCnt = 0;
if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
return esock_make_error_str(env, xres);
@@ -13840,6 +13945,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET",
"recv_check_result -> [%d] split buffer\r\n", toRead) );
+ descP->rNumCnt = 0;
cnt_inc(&descP->readPkgCnt, 1);
cnt_inc(&descP->readByteCnt, read);
@@ -15859,6 +15965,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->closeMtx = MCREATE(buf);
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
+ descP->rNum = 0;
+ descP->rNumCnt = 0;
descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT;
descP->wCtrlSz = SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT;
descP->iow = FALSE;