diff options
Diffstat (limited to 'erts/emulator/sys/unix/sys.c')
| -rw-r--r-- | erts/emulator/sys/unix/sys.c | 259 | 
1 files changed, 236 insertions, 23 deletions
| diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index c3d7440409..5de0c281c4 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -34,6 +34,7 @@  #include <termios.h>  #include <ctype.h>  #include <sys/utsname.h> +#include <sys/select.h>  #ifdef ISC32  #include <sys/bsdtypes.h> @@ -91,8 +92,10 @@ static erts_smp_rwmtx_t environ_rwmtx;  #  else  #    define CHLDWTHR 0  #  endif +#  define FDBLOCK 1  #else  #  define CHLDWTHR 0 +#  define FDBLOCK 0  #endif  /*   * [OTP-3906] @@ -121,6 +124,15 @@ struct ErtsSysReportExit_ {  #endif  }; +/* Used by the fd driver iff the fd could not be set to non-blocking */ +typedef struct ErtsSysBlocking_ { +    ErlDrvPDL pdl; +    int res; +    int err; +    unsigned int pkey; +} ErtsSysBlocking; + +  /* This data is shared by these drivers - initialized by spawn_init() */  static struct driver_data {      ErlDrvPort port_num; @@ -129,6 +141,8 @@ static struct driver_data {      int pid;      int alive;      int status; +    int terminating; +    ErtsSysBlocking *blocking;  } *driver_data;			/* indexed by fd */  static ErtsSysReportExit *report_exit_list; @@ -284,7 +298,7 @@ struct {      void (*check_io)(int);      Uint (*size)(void);      Eterm (*info)(void *); -    int (*check_io_debug)(void); +    int (*check_io_debug)(ErtsCheckIoDebugInfo *);  } io_func = {0}; @@ -306,9 +320,9 @@ Eterm erts_check_io_info(void *p)  }  int -erts_check_io_debug(void) +erts_check_io_debug(ErtsCheckIoDebugInfo *ip)  { -    return (*io_func.check_io_debug)(); +    return (*io_func.check_io_debug)(ip);  } @@ -1108,11 +1122,16 @@ void fini_getenv_state(GETENV_STATE *state)  /* Driver interfaces */  static ErlDrvData spawn_start(ErlDrvPort, char*, SysDriverOpts*);  static ErlDrvData fd_start(ErlDrvPort, char*, SysDriverOpts*); +#if FDBLOCK +static void fd_async(void *); +static void fd_ready_async(ErlDrvData drv_data, ErlDrvThreadData thread_data); +#endif  static ErlDrvSSizeT fd_control(ErlDrvData, 
unsigned int, char *, ErlDrvSizeT,  			       char **, ErlDrvSizeT);  static ErlDrvData vanilla_start(ErlDrvPort, char*, SysDriverOpts*);  static int spawn_init(void);  static void fd_stop(ErlDrvData); +static void fd_flush(ErlDrvData);  static void stop(ErlDrvData);  static void ready_input(ErlDrvData, ErlDrvEvent);  static void ready_output(ErlDrvData, ErlDrvEvent); @@ -1157,8 +1176,12 @@ struct erl_drv_entry fd_driver_entry = {      fd_control,      NULL,      outputv, -    NULL, /* ready_async */ -    NULL, /* flush */ +#if FDBLOCK +    fd_ready_async, /* ready_async */ +#else +    NULL, +#endif +    fd_flush, /* flush */      NULL, /* call */      NULL, /* event */      ERL_DRV_EXTENDED_MARKER, @@ -1212,13 +1235,28 @@ static RETSIGTYPE onchld(int signum)  #endif  } +static int set_blocking_data(struct driver_data *dd) { + +    dd->blocking = erts_alloc(ERTS_ALC_T_SYS_BLOCKING, sizeof(ErtsSysBlocking)); + +    erts_smp_atomic_add_nob(&sys_misc_mem_sz, sizeof(ErtsSysBlocking)); + +    dd->blocking->pdl = driver_pdl_create(dd->port_num); +    dd->blocking->res = 0; +    dd->blocking->err = 0; +    dd->blocking->pkey = driver_async_port_key(dd->port_num); + +    return 1; +} +  static int set_driver_data(ErlDrvPort port_num,  			   int ifd,  			   int ofd,  			   int packet_bytes,  			   int read_write,  			   int exit_status, -			   int pid) +			   int pid, +                           int is_blocking)  {      Port *prt;      ErtsSysReportExit *report_exit; @@ -1250,8 +1288,13 @@ static int set_driver_data(ErlDrvPort port_num,  	driver_data[ifd].pid = pid;  	driver_data[ifd].alive = 1;  	driver_data[ifd].status = 0; +        driver_data[ifd].terminating = 0; +        driver_data[ifd].blocking = NULL;  	if (read_write & DO_WRITE) {  	    driver_data[ifd].ofd = ofd; +            if (is_blocking && FDBLOCK) +                if (!set_blocking_data(driver_data+ifd)) +                    return -1;  	    if (ifd != ofd)  		driver_data[ofd] = driver_data[ifd];  /* 
structure copy */  	} else {		/* DO_READ only */ @@ -1267,6 +1310,11 @@ static int set_driver_data(ErlDrvPort port_num,  	driver_data[ofd].pid = pid;  	driver_data[ofd].alive = 1;  	driver_data[ofd].status = 0; +        driver_data[ofd].terminating = 0; +        driver_data[ofd].blocking = NULL; +        if (is_blocking && FDBLOCK) +            if (!set_blocking_data(driver_data+ofd)) +                return -1;  	return(ofd);      }  } @@ -1276,6 +1324,7 @@ static int spawn_init()     int i;  #if CHLDWTHR     erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; +     thr_opts.detached = 0;     thr_opts.suggested_stack_size = 0; /* Smallest possible */  #endif @@ -1755,7 +1804,7 @@ static ErlDrvData spawn_start(ErlDrvPort port_num, char* name, SysDriverOpts* op      }      res = set_driver_data(port_num, ifd[0], ofd[1], opts->packet_bytes, -			  opts->read_write, opts->exit_status, pid); +			  opts->read_write, opts->exit_status, pid, 0);      /* Don't unblock SIGCHLD until now, since the call above must         first complete putting away the info about our new subprocess. */      unblock_signals(); @@ -1840,6 +1889,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,  			   SysDriverOpts* opts)  {      ErlDrvData res; +    int non_blocking = 0;      if (((opts->read_write & DO_READ) && opts->ifd >= max_files) ||  	((opts->read_write & DO_WRITE) && opts->ofd >= max_files)) @@ -1912,6 +1962,20 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,       * case - it can be called with any old pre-existing file descriptors,       * the relations between which (if they're even two) we can only guess       * at - still, we try our best... +     * +     * Added note OTP 18: Some systems seem to use stdout/stderr to log data +     * using unix pipes, so we cannot allow the system to block on a write. +     * Therefore we use an async thread to write the data to fd's that could +     * not be set to non-blocking. 
When no async threads are available we +     * fall back on the old behaviour. +     * +     * Also the guarantee about what is delivered to the OS has changed. +     * Pre 18 the fd driver did no flushing of data before terminating. +     * Now it does. This is because we want to be able to guarantee that things +     * such as escripts and friends really have outputted all data before +     * terminating. This could potentially block the termination of the system +     * for a very long time, but if the user wants to terminate fast she should +     * use erlang:halt with flush=false.       */      if (opts->read_write & DO_READ) { @@ -1934,6 +1998,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,  		       imagine a scenario where setting non-blocking mode  		       here would cause problems - go ahead and do it. */ +                    non_blocking = 1;  		    SET_NONBLOCKING(opts->ofd);  		} else {	/* output fd is a tty, input fd isn't */ @@ -1976,6 +2041,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,  			(nfd = open(tty, O_WRONLY)) != -1) {  			dup2(nfd, opts->ofd);  			close(nfd); +                        non_blocking = 1;  			SET_NONBLOCKING(opts->ofd);  		    }  		} @@ -1984,8 +2050,9 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,      }      CHLD_STAT_LOCK;      res = (ErlDrvData)(long)set_driver_data(port_num, opts->ifd, opts->ofd, -				      opts->packet_bytes, -				      opts->read_write, 0, -1); +                                            opts->packet_bytes, +                                            opts->read_write, 0, -1, +                                            !non_blocking);      CHLD_STAT_UNLOCK;      return res;  } @@ -2011,14 +2078,30 @@ static void nbio_stop_fd(ErlDrvPort prt, int fd)      SET_BLOCKING(fd);  } -static void fd_stop(ErlDrvData fd)  /* Does not close the fds */ +static void fd_stop(ErlDrvData ev)  /* Does not close the fds */  {      int ofd; +    int fd = (int)(long)ev; +  
  ErlDrvPort prt = driver_data[fd].port_num; -    nbio_stop_fd(driver_data[(int)(long)fd].port_num, (int)(long)fd); -    ofd = driver_data[(int)(long)fd].ofd; -    if (ofd != (int)(long)fd && ofd != -1)  -	nbio_stop_fd(driver_data[(int)(long)fd].port_num, (int)(long)ofd); +#if FDBLOCK +    if (driver_data[fd].blocking) { +        erts_free(ERTS_ALC_T_SYS_BLOCKING,driver_data[fd].blocking); +        driver_data[fd].blocking = NULL; +        erts_smp_atomic_add_nob(&sys_misc_mem_sz, -1*sizeof(ErtsSysBlocking)); +    } +#endif + +    nbio_stop_fd(prt, fd); +    ofd = driver_data[fd].ofd; +    if (ofd != fd && ofd != -1) +	nbio_stop_fd(prt, ofd); +} + +static void fd_flush(ErlDrvData fd) +{ +    if (!driver_data[(int)(long)fd].terminating) +        driver_data[(int)(long)fd].terminating = 1;  }  static ErlDrvData vanilla_start(ErlDrvPort port_num, char* name, @@ -2041,8 +2124,8 @@ static ErlDrvData vanilla_start(ErlDrvPort port_num, char* name,      CHLD_STAT_LOCK;      res = (ErlDrvData)(long)set_driver_data(port_num, fd, fd, -				      opts->packet_bytes, -				      opts->read_write, 0, -1); +                                            opts->packet_bytes, +                                            opts->read_write, 0, -1, 0);      CHLD_STAT_UNLOCK;      return res;  } @@ -2079,6 +2162,7 @@ static void stop(ErlDrvData fd)      }  } +/* used by fd_driver */  static void outputv(ErlDrvData e, ErlIOVec* ev)  {      int fd = (int)(long)e; @@ -2104,12 +2188,21 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)      ev->iov[0].iov_base = lbp;      ev->iov[0].iov_len = pb;      ev->size += pb; + +    if (driver_data[fd].blocking && FDBLOCK) +        driver_pdl_lock(driver_data[fd].blocking->pdl); +      if ((sz = driver_sizeq(ix)) > 0) {  	driver_enqv(ix, ev, 0); + +        if (driver_data[fd].blocking && FDBLOCK) +            driver_pdl_unlock(driver_data[fd].blocking->pdl); +  	if (sz + ev->size >= (1 << 13))  	    set_busy_port(ix, 1);      } -    else { +    else if 
(!driver_data[fd].blocking || !FDBLOCK) { +        /* We try to write directly if the fd is non-blocking */  	int vsize = ev->vsize > MAX_VSIZE ? MAX_VSIZE : ev->vsize;  	n = writev(ofd, (const void *) (ev->iov), vsize); @@ -2125,10 +2218,22 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)  	driver_enqv(ix, ev, n);  /* n is the skip value */  	driver_select(ix, ofd, ERL_DRV_WRITE|ERL_DRV_USE, 1);      } +#if FDBLOCK +    else { +        if (ev->size != 0) { +            driver_enqv(ix, ev, 0); +            driver_pdl_unlock(driver_data[fd].blocking->pdl); +            driver_async(ix, &driver_data[fd].blocking->pkey, +                         fd_async, driver_data+fd, NULL); +        } else { +            driver_pdl_unlock(driver_data[fd].blocking->pdl); +        } +    } +#endif      /* return 0;*/  } - +/* Used by spawn_driver and vanilla driver */  static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)  {      int fd = (int)(long)e; @@ -2191,6 +2296,23 @@ static int port_inp_failure(ErlDrvPort port_num, int ready_fd, int res)      ASSERT(res <= 0);      (void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0);       clear_fd_data(ready_fd); + +    if (driver_data[ready_fd].blocking && FDBLOCK) { +        driver_pdl_lock(driver_data[ready_fd].blocking->pdl); +        if (driver_sizeq(driver_data[ready_fd].port_num) > 0) { +            driver_pdl_unlock(driver_data[ready_fd].blocking->pdl); +            /* We have stuff in the output queue, so we just +               set the state to terminating and wait for fd_ready_async +               to terminate the port */ +            if (res == 0) +                driver_data[ready_fd].terminating = 2; +            else +                driver_data[ready_fd].terminating = -err; +            return 0; +        } +        driver_pdl_unlock(driver_data[ready_fd].blocking->pdl); +    } +      if (res == 0) {  	if (driver_data[ready_fd].report_exit) {  	    CHLD_STAT_LOCK; @@ -2241,6 +2363,7 @@ static 
void ready_input(ErlDrvData e, ErlDrvEvent ready_fd)      port_num = driver_data[fd].port_num;      packet_bytes = driver_data[fd].packet_bytes; +      if (packet_bytes == 0) {  	byte *read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF,  					     ERTS_SYS_READ_BUF_SZ); @@ -2364,6 +2487,8 @@ static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd)      if ((iv = (struct iovec*) driver_peekq(ix, &vsize)) == NULL) {  	driver_select(ix, ready_fd, ERL_DRV_WRITE, 0); +        if (driver_data[fd].terminating) +            driver_failure_atom(driver_data[fd].port_num,"normal");  	return; /* 0; */      }      vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize; @@ -2389,6 +2514,82 @@ static void stop_select(ErlDrvEvent fd, void* _)      close((int)fd);  } +#if FDBLOCK + +static void +fd_async(void *async_data) +{ +    int res; +    struct driver_data *dd = (struct driver_data*)async_data; +    SysIOVec      *iov0; +    SysIOVec      *iov; +    int            iovlen; +    int            iovcnt; +    int            p; +    /* much of this code is stolen from efile_drv:invoke_writev */ +    driver_pdl_lock(dd->blocking->pdl); +    iov0 = driver_peekq(dd->port_num, &iovlen); +    /* Calculate iovcnt */ +    for (p = 0, iovcnt = 0; iovcnt < iovlen; +         p += iov0[iovcnt++].iov_len) +        ; +    iov = erts_alloc_fnf(ERTS_ALC_T_SYS_WRITE_BUF, +                         sizeof(SysIOVec)*iovcnt); +    if (!iov) { +        res = -1; +        errno = ENOMEM; +        erts_free(ERTS_ALC_T_SYS_WRITE_BUF, iov); +        driver_pdl_unlock(dd->blocking->pdl); +    } else { +        memcpy(iov,iov0,iovcnt*sizeof(SysIOVec)); +        driver_pdl_unlock(dd->blocking->pdl); + +        res = writev(dd->ofd, iov, iovlen); + +        erts_free(ERTS_ALC_T_SYS_WRITE_BUF, iov); +    } +    dd->blocking->res = res; +    dd->blocking->err = errno; +} + +void fd_ready_async(ErlDrvData drv_data, +                    ErlDrvThreadData thread_data) { +    struct driver_data *dd = (struct 
driver_data *)thread_data; +    ErlDrvPort port_num = dd->port_num; + +    ASSERT(dd->blocking); +    ASSERT(dd == (driver_data + (int)(long)drv_data)); + +    if (dd->blocking->res > 0) { +        driver_pdl_lock(dd->blocking->pdl); +        if (driver_deq(port_num, dd->blocking->res) == 0) { +            driver_pdl_unlock(dd->blocking->pdl); +            set_busy_port(port_num, 0); +            if (dd->terminating) { +                /* The port has been ordered to terminate +                   from either fd_flush or port_inp_failure */ +                if (dd->terminating == 1) +                    driver_failure_atom(port_num, "normal"); +                else if (dd->terminating == 2) +                    driver_failure_eof(port_num); +                else if (dd->terminating < 0) +                    driver_failure_posix(port_num, -dd->terminating); +                return; /* -1; */ +            } +        } else { +            driver_pdl_unlock(dd->blocking->pdl); +            /* still data left to write in queue */ +            driver_async(port_num, &dd->blocking->pkey, fd_async, dd, NULL); +            return /* 0; */; +        } +    } else if (dd->blocking->res < 0) { +        driver_failure_posix(port_num, dd->blocking->err); +        return; /* -1; */ +    } +    return; /* 0; */ +} + +#endif  void erts_do_break_handling(void)  { @@ -2658,18 +2859,30 @@ void sys_preload_end(Preload* p)      /* Nothing */  } -/* Read a key from console (?) */ - +/* Read a key from console, used by break.c +   Here we assume that all schedulers are stopped so that erl_poll +   does not interfere with the select below. +*/  int sys_get_key(fd)  int fd;  { -    int c; +    int c, ret;      unsigned char rbuf[64]; +    fd_set fds;      fflush(stdout);		/* Flush query ??? 
*/ -    if ((c = read(fd,rbuf,64)) <= 0) { -      return c;  +    FD_ZERO(&fds); +    FD_SET(fd,&fds); + +    ret = select(fd+1, &fds, NULL, NULL, NULL); + +    if (ret == 1) { +        do { +            c = read(fd,rbuf,64); +        } while (c < 0 && errno == EAGAIN); +        if (c <= 0) +            return c;      }      return rbuf[0];  | 
