diff options
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 190 |
1 files changed, 149 insertions, 41 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 9375e9c005..fc218f5163 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -812,11 +812,20 @@ typedef struct { SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ - size_t rBufSz; // Read buffer size (when data length = 0 is specified) - size_t rCtrlSz; // Read control buffer size - size_t wCtrlSz; // Write control buffer size - BOOLEAN_T iow; // Inform On (counter) Wrap - BOOLEAN_T dbg; + size_t rBufSz; // Read buffer size (when data length = 0) + /* rNum and rNumCnt are used (together with rBufSz) when calling the recv + * function with the Length argument set to 0 (zero). + * If rNum is 0 (zero), then rNumCnt is not used and only *one* read will + * be done. Also, when get'ing the value of the option (rcvbuf) with + * getopt, the value will be reported as an integer. If the rNum has a + * value greater then 0 (zero), then it will instead be reported as {N, BufSz}. + */ + unsigned int rNum; // recv: Number of reads using rBufSz + unsigned int rNumCnt; // recv: Current number of reads (so far) + size_t rCtrlSz; // Read control buffer size + size_t wCtrlSz; // Write control buffer size + BOOLEAN_T iow; // Inform On (counter) Wrap + BOOLEAN_T dbg; /* +++ Close stuff +++ */ ErlNifMutex* closeMtx; @@ -4959,6 +4968,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->type = descP->type; accDescP->protocol = descP->protocol; accDescP->rBufSz = descP->rBufSz; // Inherit buffer size + accDescP->rNum = descP->rNum; // Inherit buffer uses + accDescP->rNumCnt = 0; accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size @@ -5737,8 +5748,8 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } - sockRef = argv[0]; // We need this in case we send in case we send abort - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we case we send abort + recvRef = argv[1]; if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); @@ -5791,9 +5802,9 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, int bufSz = (len ? len : descP->rBufSz); SSDBG( descP, ("SOCKET", "nrecv -> entry with" - "\r\n len: %d (%d)" + "\r\n len: %d (%d:%d)" "\r\n flags: %d" - "\r\n", len, bufSz, flags) ); + "\r\n", len, descP->rNumCnt, bufSz, flags) ); if (!descP->isReadable) return enif_make_badarg(env); @@ -5817,10 +5828,11 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, // If it fails (read = -1), we need errno... SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) ); read = sock_recv(descP->sock, buf.data, buf.size, flags); - if (IS_SOCKET_ERROR(read)) + if (IS_SOCKET_ERROR(read)) { save_errno = sock_errno(); - else + } else { save_errno = -1; // The value does not actually matter in this case + } SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) ); @@ -6719,7 +6731,7 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, } descP->ctrlPid = newCtrlPid; - descP->ctrlMon = newCtrlMon; + descP->ctrlMon = newCtrlMon; SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") ); @@ -6729,22 +6741,60 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, /* nsetopt_otp_rcvbuf - Handle the OTP (level) rcvbuf option + * The (otp) rcvbuf option is provided as: + * + * BufSz :: integer() | {N :: pos_integer(), BufSz :: pod_integer()} + * + * Where N is the max number of reads. */ static ERL_NIF_TERM nsetopt_otp_rcvbuf(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM eVal) { - size_t val; - char* xres; + const ERL_NIF_TERM* t; // The array of the elements of the tuple + int tsz; // The size of the tuple - should be 2 + unsigned int n; + size_t bufSz; + char* xres; - if ((xres = esock_decode_bufsz(env, - eVal, - SOCKET_RECV_BUFFER_SIZE_DEFAULT, &val)) != NULL) - return esock_make_error_str(env, xres); + if (IS_NUM(env, eVal)) { + + /* This will have the effect that the buffer size will be + * reported as an integer (getopt). + */ + n = 0; - descP->rBufSz = val; + if ((xres = esock_decode_bufsz(env, + eVal, + SOCKET_RECV_BUFFER_SIZE_DEFAULT, + &bufSz)) != NULL) + return esock_make_error_str(env, xres); + + } else if (IS_TUPLE(env, eVal)) { + + if (!GET_TUPLE(env, eVal, &tsz, &t)) + return enif_make_badarg(env); // We should use a "proper" error value... + + if (tsz != 2) + return enif_make_badarg(env); // We should use a "proper" error value... + if (!GET_UINT(env, t[0], &n)) + return enif_make_badarg(env); // We should use a "proper" error value... + + if ((xres = esock_decode_bufsz(env, + t[1], + SOCKET_RECV_BUFFER_SIZE_DEFAULT, + &bufSz)) != NULL) + return esock_make_error_str(env, xres); + + } else { + return enif_make_badarg(env); // We should use a "proper" error value... + } + + descP->rNum = n; + descP->rBufSz = bufSz; + return esock_atom_ok; } @@ -10097,7 +10147,13 @@ static ERL_NIF_TERM ngetopt_otp_rcvbuf(ErlNifEnv* env, SocketDescriptor* descP) { - ERL_NIF_TERM eVal = MKI(env, descP->rBufSz); + ERL_NIF_TERM eVal; + + if (descP->rNum == 0) { + eVal = MKI(env, descP->rBufSz); + } else { + eVal = MKT2(env, MKI(env, descP->rNum), MKI(env, descP->rBufSz)); + } return esock_make_ok2(env, eVal); } @@ -13681,7 +13737,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, if (toRead == 0) { - /* +++ Give us everything you have got => needs to continue +++ */ + /* +++ Give us everything you have got => * + * (maybe) needs to continue +++ */ /* How do we do this? * Either: @@ -13695,36 +13752,83 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * (continuous binary realloc "here"). * * => We choose alt 1 for now. + * + * Also, we need to check if the rNumCnt has reached its max (rNum), + * in which case we will assume the read to be done! */ cnt_inc(&descP->readByteCnt, read); - if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) - return esock_make_error_str(env, xres); + SSDBG( descP, + ("SOCKET", "recv_check_result -> shall we continue reading" + "\r\n read: %d" + "\r\n rNum: %d" + "\r\n rNumCnt: %d" + "\r\n", read, descP->rNum, descP->rNumCnt) ); - /* This transfers "ownership" of the *allocated* binary to an - * erlang term (no need for an explicit free). - */ - data = MKBIN(env, bufP); + if (descP->rNum > 0) { - SSDBG( descP, - ("SOCKET", - "recv_check_result -> [%d] " - "we are done for now - read more\r\n", toRead) ); + descP->rNumCnt++; + if (descP->rNumCnt >= descP->rNum) { - return esock_make_ok3(env, atom_false, data); + descP->rNumCnt = 0; - } else { + cnt_inc(&descP->readPkgCnt, 1); + + recv_update_current_reader(env, descP); + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + return esock_make_ok3(env, atom_true, data); - /* +++ We got exactly as much as we requested +++ */ + } else { - /* <KOLLA> - * WE NEED TO INFORM ANY WAITING READERS - * - * DEMONP of the current reader! - * - * </KOLLA> - */ + /* Yes, we *do* need to continue reading */ + + if ((xres = recv_init_current_reader(env, + descP, recvRef)) != NULL) { + descP->rNumCnt = 0; + return esock_make_error_str(env, xres); + } + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + return esock_make_ok3(env, atom_false, data); + + } + + } else { + + /* Yes, we *do* need to continue reading */ + + if ((xres = recv_init_current_reader(env, + descP, recvRef)) != NULL) { + descP->rNumCnt = 0; + return esock_make_error_str(env, xres); + } + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] " + "we are done for now - read more\r\n", toRead) ); + + return esock_make_ok3(env, atom_false, data); + } + + } else { + + /* +++ We got exactly as much as we requested => We are done +++ */ cnt_inc(&descP->readPkgCnt, 1); cnt_inc(&descP->readByteCnt, read); @@ -13793,6 +13897,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); + descP->rNumCnt = 0; if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); @@ -13840,6 +13945,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] split buffer\r\n", toRead) ); + descP->rNumCnt = 0; cnt_inc(&descP->readPkgCnt, 1); cnt_inc(&descP->readByteCnt, read); @@ -15859,6 +15965,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; + descP->rNum = 0; + descP->rNumCnt = 0; descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT; descP->wCtrlSz = SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT; descP->iow = FALSE; |