aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_io_queue.h
blob: 7d0fe6751c97bd3e597a48b8e0851fa72209b271 (plain) (tree)








































































































                                                                               
                                                             






                                                                     
                                                             










                                                                     
                                                              










































































                                                                                     
/*
 * %CopyrightBegin%
 * 
 * Copyright Ericsson AB 2017. All Rights Reserved.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * %CopyrightEnd%
 */

/*
 * Description: A queue used for storing binary data that should be
 *              passed to writev or similar functions. Used by both
 *              the nif and driver api.
 *
 * Author:      Lukas Larsson
 */

#ifndef ERL_IO_QUEUE_H__TYPES__
#define ERL_IO_QUEUE_H__TYPES__

#define ERTS_BINARY_TYPES_ONLY__
#include "erl_binary.h"
#undef ERTS_BINARY_TYPES_ONLY__
#include "erl_nif.h"

#ifdef DEBUG
#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1))
#else
#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1))
#endif

#define ERTS_SMALL_IO_QUEUE 5

typedef union {
    ErlDrvBinary driver;
    Binary nif;
} ErtsIOQBinary;

typedef struct {
    int vsize;         /* length of vectors */
    Uint size;         /* total size in bytes */
    SysIOVec* iov;
    ErtsIOQBinary** binv;
} ErtsIOVecCommon;

typedef union {
    ErtsIOVecCommon common;
    ErlIOVec driver;
    ErlNifIOVec nif;
} ErtsIOVec;

/* head/tail represent the data in the queue
 * start/end represent the edges of the allocated queue
 * small is used when the number of iovec elements is < SMALL_IO_QUEUE
 */
typedef struct erts_io_queue {
    ErtsAlcType_t alct;
    int driver;
    Uint size;       /* total size in bytes */

    SysIOVec* v_start;
    SysIOVec* v_end;
    SysIOVec* v_head;
    SysIOVec* v_tail;
    SysIOVec  v_small[ERTS_SMALL_IO_QUEUE];

    ErtsIOQBinary **b_start;
    ErtsIOQBinary **b_end;
    ErtsIOQBinary **b_head;
    ErtsIOQBinary **b_tail;
    ErtsIOQBinary  *b_small[ERTS_SMALL_IO_QUEUE];

} ErtsIOQueue;

#endif /* ERL_IO_QUEUE_H__TYPES__ */

#if !defined(ERL_IO_QUEUE_H) && !defined(ERTS_IO_QUEUE_TYPES_ONLY__)
#define ERL_IO_QUEUE_H

#include "erl_binary.h"
#include "erl_bits.h"

void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver);
void erts_ioq_clear(ErtsIOQueue *q);
Uint erts_ioq_size(ErtsIOQueue *q);
int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip);
int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip);
int erts_ioq_deq(ErtsIOQueue *q, Uint Uint);
Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev);
SysIOVec *erts_ioq_peekq(ErtsIOQueue *q, int *vlenp);
Uint erts_ioq_sizeq(ErtsIOQueue *q);

int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize,
                            Uint* pvsize, Uint* pcsize,
                            size_t* total_size, Uint blimit);
int erts_ioq_iolist_to_vec(Eterm obj, SysIOVec* iov,
                           ErtsIOQBinary** binv, ErtsIOQBinary* cbin,
                           Uint bin_limit, int driver_binary);

ERTS_GLB_INLINE
int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize,
                            Uint* pvsize, Uint* pcsize,
                            size_t* total_size, Uint blimit);
ERTS_GLB_INLINE
int erts_ioq_iodata_to_vec(Eterm obj, SysIOVec* iov,
                           ErtsIOQBinary** binv, ErtsIOQBinary* cbin,
                           Uint bin_limit, int driver_binary);


#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE
int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize,
                            Uint* pvsize, Uint* pcsize,
                            size_t* total_size, Uint blimit) {
  if (is_binary(obj)) {
    /* We optimize for when we get a procbin without a bit-offset
     * that fits in one iov slot
     */
    Eterm real_bin;
    byte bitoffs;
    byte bitsize;
    ERTS_DECLARE_DUMMY(Uint offset);
    Uint size = binary_size(obj);
    ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
    if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) {
      *vsize = 1;
      *pvsize = 1;
      if (thing_subtag(*binary_val(real_bin)) == REFC_BINARY_SUBTAG) {
          *csize = 0;
          *pcsize = 0;
      } else {
          *csize = size;
          *pcsize = size;
      }
      *total_size = size;
      return 0;
    }
  }

  return erts_ioq_iolist_vec_len(obj, vsize, csize,
                                 pvsize, pcsize, total_size, blimit);
}

ERTS_GLB_INLINE
int erts_ioq_iodata_to_vec(Eterm obj,
                           SysIOVec *iov,
                           ErtsIOQBinary **binv,
                           ErtsIOQBinary  *cbin,
                           Uint bin_limit,
                           int driver)
{
    if (is_binary(obj)) {
        Eterm real_bin;
        byte bitoffs;
        byte bitsize;
        Uint offset;
        Uint size = binary_size(obj);
        ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
        if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) {
            Eterm *bptr = binary_val(real_bin);
            if (thing_subtag(*bptr) == REFC_BINARY_SUBTAG) {
                ProcBin *pb = (ProcBin *)bptr;
                if (pb->flags)
                    erts_emasculate_writable_binary(pb);
                iov[0].iov_base = pb->bytes+offset;
                iov[0].iov_len = size;
                if (driver)
                    binv[0] = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val);
                else
                    binv[0] = (ErtsIOQBinary*)pb->val;
                return 1;
            } else {
                ErlHeapBin* hb = (ErlHeapBin *)bptr;
                byte *buf = driver ? (byte*)cbin->driver.orig_bytes :
                    (byte*)cbin->nif.orig_bytes;
		copy_binary_to_buffer(buf, 0, ((byte *) hb->data)+offset, 0, 8*size);
                iov[0].iov_base = buf;
                iov[0].iov_len = size;
                binv[0] = cbin;
                return 1;
            }
        }
    }
    return erts_ioq_iolist_to_vec(obj, iov, binv, cbin, bin_limit, driver);
}

#endif

#endif /* ERL_IO_QUEUE_H */