aboutsummaryrefslogblamecommitdiffstats
path: root/erts/lib_src/pthread/ethr_event.c
blob: 464875570aa761f7d610aea22061f162369b934d (plain) (tree)
1
2
3
4
5
6
7
8
9


                   
                                                        
  


                                                                   
  






                                                                           










                                         



                    






                                                                   
                    


                                      








                                                                                





                       


                              
                                                    



             





                                       





                                 
                                                           


                            
                      
                                                  




                                                                     



                                   



                      


                                          







                                                

     

                   
                                                
                                       
                                     











                                                          














                                                                 
                                             


                                                                

                                       
                                     


                                                 

                                             







                                                   
                  
                
                                    
         

     





                                              


                            









                                                                                

                           










                             
















                                                




























                                                                 
































                                                                     




                              
 
                                                    
 


                                            

                                                                  



                                       













                                                          












                                            






                                 



                        



                        


                                         
                                         


                      
                                                           

                       
                      

                                                  
                           
                                        
                                                                            












                                        
                                        

                                                 
      


                                     



                                                 
                      




                                          
                           


                                             


                  




                                   
                                            
                                   

                                 











                                                      




                                                  


                                                 



























                                                                



















                                                                             
                                                                              





                                               
                          
                 















                                                          
     



                     



                              


                                        




                                                        


                             
 
                       
 
                              
 





                                                       




                                                        

































                                                                       





                                                                    
                       
                       





                          
 
                                                                   












                                                           
 
                                            
                                   
                                 




                                                     

     




                                              
 
             




















                                      
                            




                                              












                                                                      
 
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2009-2016. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Author: Rickard Green
 */

#define ETHR_INLINE_FUNC_NAME_(X) X ## __
#define ETHR_EVENT_IMPL__

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__)
#  define __DARWIN__ 1
#endif
#ifdef __DARWIN__
#  define _DARWIN_UNLIMITED_SELECT
#endif

#include "ethread.h"
#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
#define ETHR_INCLUDE_MONOTONIC_CLOCK__
#include "ethr_internal.h"

#if defined(ETHR_LINUX_FUTEX_IMPL__)
/* --- Linux futex implementation of ethread events ------------------------- */

#include <sched.h>
#include <errno.h>

#define ETHR_YIELD_AFTER_BUSY_LOOPS 50

void
ethr_init_event__(void)
{

}

int
ethr_event_init(ethr_event *e)
{
    ethr_atomic32_init(&e->futex, ETHR_EVENT_OFF__);
    return 0;
}

int
ethr_event_prepare_timed(ethr_event *e)
{
    return 0;
}

int
ethr_event_destroy(ethr_event *e)
{
    return 0;
}

static ETHR_INLINE int
wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
    unsigned sc = spincount;
    int res;
    ethr_sint32_t val;
    int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
    ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
    struct timespec ts, *tsp;
#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
    ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
#endif

    if (spincount < 0)
	ETHR_FATAL_ERROR__(EINVAL);

    if (timeout < 0) {
	tsp = NULL;
    }
    else {
#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
	start = ethr_get_monotonic_time();
#endif
	tsp = &ts;
	time = timeout;
	if (spincount == 0) {
	    val = ethr_atomic32_read(&e->futex);
	    if (val == ETHR_EVENT_ON__)
		goto return_event_on;
	    goto set_timeout;
	}
    }

    while (1) {
	while (1) {
	    val = ethr_atomic32_read(&e->futex);
	    if (val == ETHR_EVENT_ON__)
		goto return_event_on;
	    if (sc == 0)
		break;
	    sc--;
	    ETHR_SPIN_BODY;
	    if (--until_yield == 0) {
		until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
		res = ETHR_YIELD();
		if (res != 0)
		    ETHR_FATAL_ERROR__(res);
	    }
	}

	if (timeout >= 0) {
#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
	    time = timeout - (ethr_get_monotonic_time() - start);
#endif
	set_timeout:
	    if (time <= 0) {
		val = ethr_atomic32_read(&e->futex);
		if (val == ETHR_EVENT_ON__)
		    goto return_event_on;
		return ETIMEDOUT;
	    }
	    ts.tv_sec = time / (1000*1000*1000);
	    ts.tv_nsec = time % (1000*1000*1000);
	}

	if (val != ETHR_EVENT_OFF_WAITER__) {
	    val = ethr_atomic32_cmpxchg(&e->futex,
					ETHR_EVENT_OFF_WAITER__,
					ETHR_EVENT_OFF__);

	    if (val == ETHR_EVENT_ON__)
		goto return_event_on;
	    ETHR_ASSERT(val == ETHR_EVENT_OFF__);
	}

	res = ETHR_FUTEX__(&e->futex,
			   ETHR_FUTEX_WAIT__,
			   ETHR_EVENT_OFF_WAITER__,
			   tsp);
	switch (res) {
	case EINTR:
	case ETIMEDOUT:
	    return res;
	case 0:
	case EWOULDBLOCK:
	    break;
	default:
	    ETHR_FATAL_ERROR__(res);
	}
    }

return_event_on:

    ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);

    return 0;

}

#elif defined(ETHR_PTHREADS)
/* --- Posix mutex/cond pipe/select implementation of events ---------------- */

#include <fcntl.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <sys/select.h>
#include <errno.h>
#include <string.h>

#include "erl_misc_utils.h"

#ifdef __DARWIN__

struct ethr_event_fdsets___ {
    fd_set *rsetp;
    fd_set *esetp;
    size_t mem_size;
    fd_mask mem[1];
};

#endif

static void
setup_nonblocking_pipe(ethr_event *e)
{
    int flgs;
    int res;

    res = pipe(e->fd);
    if (res != 0)
	ETHR_FATAL_ERROR__(errno);

    ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0);

    flgs = fcntl(e->fd[0], F_GETFL, 0);
    fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK);
    flgs = fcntl(e->fd[1], F_GETFL, 0);
    fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK);


#ifndef __DARWIN__
    if (e->fd[0] >= FD_SETSIZE)
	ETHR_FATAL_ERROR__(ENOTSUP);
#else
    {
	int nmasks;
	ethr_event_fdsets__ *fdsets;
	size_t mem_size;

	nmasks = (e->fd[0]+NFDBITS)/NFDBITS;
	mem_size = 2*nmasks*sizeof(fd_mask);
	if (mem_size < 2*sizeof(fd_set)) {
	    mem_size = 2*sizeof(fd_set);
	    nmasks = mem_size/(2*sizeof(fd_mask));
	}

	fdsets = malloc(sizeof(ethr_event_fdsets__)
			+ mem_size
			- sizeof(fd_mask));
	if (!fdsets)
	    ETHR_FATAL_ERROR__(ENOMEM);
	fdsets->rsetp = (fd_set *) (char *) &fdsets->mem[0];
	fdsets->esetp = (fd_set *) (char *) &fdsets->mem[nmasks];
	fdsets->mem_size = mem_size;
	e->fdsets = fdsets;
    }
#endif

    ETHR_MEMBAR(ETHR_StoreStore);
}

#define ETHR_EVENT_INVALID_FD__ -1
#define ETHR_EVENT_COND_TIMEDWAIT__ -2

#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
static pthread_condattr_t monotonic_clock_cond_attr;
#endif
static pthread_condattr_t *monotonic_clock_cond_attr_p;

#ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
#  undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
#endif
#ifndef ETHR_MONOTONIC_CLOCK_ID
#  undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
#endif

void
ethr_init_event__(void)
{
    monotonic_clock_cond_attr_p = NULL;
#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
    if (!ethr_get_monotonic_time_is_broken()
	&& pthread_condattr_init(&monotonic_clock_cond_attr) == 0) {
	if (pthread_condattr_setclock(&monotonic_clock_cond_attr,
				      ETHR_MONOTONIC_CLOCK_ID) == 0)
	    monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr;
	else
	    pthread_condattr_destroy(&monotonic_clock_cond_attr);
    }
#endif
}

int
ethr_event_init(ethr_event *e)
{
    int res;

    ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__);

    res = pthread_mutex_init(&e->mtx, NULL);
    if (res != 0)
	return res;

    res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p);
    if (res != 0) {
	pthread_mutex_destroy(&e->mtx);
	return res;
    }

#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
    /*
     * If ethr_get_monotonic_time() is broken we
     * fall back on the pipe/select solution...
     */
    if (monotonic_clock_cond_attr_p) {
	e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__;
	return 0;
    }
#endif

    e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__;

#ifdef __DARWIN__
    e->fdsets = NULL;
#endif

    return 0;
}

int
ethr_event_prepare_timed(ethr_event *e)
{
    if (e->fd[0] == ETHR_EVENT_INVALID_FD__)
	setup_nonblocking_pipe(e);

    return 0;
}

int
ethr_event_destroy(ethr_event *e)
{
    int res;
    if (e->fd[0] >= 0) {
	close(e->fd[0]);
	close(e->fd[1]);
    }
#ifdef __DARWIN__
    if (e->fdsets)
	free(e->fdsets);
#endif
    res = pthread_mutex_destroy(&e->mtx);
    if (res != 0)
	return res;
    return pthread_cond_destroy(&e->cnd);
}

static ETHR_INLINE int
wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
    int sc = spincount;
    ethr_sint32_t val;
    int res, ulres;
    int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
    ethr_sint64_t time = 0;
#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
    ethr_sint64_t timeout_time = 0; /* SHUT UP annoying faulty warning... */
#endif

    val = ethr_atomic32_read(&e->state);
    if (val == ETHR_EVENT_ON__)
	goto return_event_on;

    if (timeout < 0) {
	if (spincount == 0)
	    goto set_event_off_waiter;
    }
    if (timeout == 0)
	return ETIMEDOUT;
    else {
#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
	timeout_time = ethr_get_monotonic_time();
	timeout_time += timeout;
#endif
	switch (e->fd[0]) {
	case ETHR_EVENT_INVALID_FD__:
	    time = timeout;
	    setup_nonblocking_pipe(e);
	    break;
#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
	case ETHR_EVENT_COND_TIMEDWAIT__:
	    time = -1;
	    if (spincount == 0)
		goto set_event_off_waiter;
	    break;
#endif
	default:
	    time = timeout;
	    /* Already initialized pipe... */
	    if (spincount == 0)
		goto set_select_timeout;
	    break;
	}
    }

    if (spincount < 0)
	ETHR_FATAL_ERROR__(EINVAL);

    while (1) {
	val = ethr_atomic32_read(&e->state);
	if (val == ETHR_EVENT_ON__)
	    goto return_event_on;
	
	if (sc == 0)
	    break;
	sc--;
	ETHR_SPIN_BODY;
	if (--until_yield == 0) {
	    until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
	    res = ETHR_YIELD();
	    if (res != 0)
		ETHR_FATAL_ERROR__(res);
	}
    }

    if (timeout < 0
#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
	|| e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__
#endif
	) {
#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
	struct timespec cond_timeout;
#endif

    set_event_off_waiter:

	if (val != ETHR_EVENT_OFF_WAITER__) {
	    ethr_sint32_t act;
	    act = ethr_atomic32_cmpxchg(&e->state,
					ETHR_EVENT_OFF_WAITER__,
					val);
	    if (act == ETHR_EVENT_ON__)
		goto return_event_on;
	    ETHR_ASSERT(act == val);
	}

	res = pthread_mutex_lock(&e->mtx);
	if (res != 0)
	    ETHR_FATAL_ERROR__(res);

	while (1) {

	    val = ethr_atomic32_read(&e->state);
	    if (val == ETHR_EVENT_ON__) {
		ETHR_ASSERT(res == 0);
		ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
		break;
	    }

#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
	    if (timeout > 0) {
		if (time != timeout_time) {
		    time = timeout_time;

#if ERTS_USE_PREMATURE_TIMEOUT
		    {
			ethr_sint64_t rtmo;

			rtmo = timeout_time - ethr_get_monotonic_time();
			if (rtmo <= 0) {
			    res = ETIMEDOUT;
			    break;
			}
			time = timeout_time;
			time -= ERTS_PREMATURE_TIMEOUT(rtmo, 1000*1000*1000);
		    }
#endif

		    cond_timeout.tv_sec = time / (1000*1000*1000);
		    cond_timeout.tv_nsec = time % (1000*1000*1000);
		}
		res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout);
		if (res == EINTR
		    || (res == ETIMEDOUT
#if ERTS_USE_PREMATURE_TIMEOUT
			&& time == timeout_time
#endif
			)) {
		    break;
		}
	    }
	    else
#endif
	    {
		res = pthread_cond_wait(&e->cnd, &e->mtx);
		if (res == EINTR)
		    break;
	    }
	    if (res != 0)
		ETHR_FATAL_ERROR__(res);
	}

	ulres = pthread_mutex_unlock(&e->mtx);
	if (ulres != 0)
	    ETHR_FATAL_ERROR__(ulres);

    }
    else {
	int fd;
	int sres;
	ssize_t rres;
#ifndef __DARWIN__
	fd_set rset, eset;
#endif
	fd_set *rsetp, *esetp;
	struct timeval select_timeout;

#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
#if ERTS_USE_PREMATURE_TIMEOUT
    restart_select:
#endif

	time = timeout_time - ethr_get_monotonic_time();
	if (time <= 0)
	    return ETIMEDOUT;
#endif

    set_select_timeout:

	ETHR_ASSERT(time > 0);

	/*
	 * timeout in nano-second, but we can only wait
	 * for micro-seconds...
	 */
	time = ((time - 1) / 1000) + 1;

#if defined(ETHR_HAVE_ETHR_GET_MONOTONIC_TIME) \
    && ERTS_USE_PREMATURE_TIMEOUT
	time -= ERTS_PREMATURE_TIMEOUT(time, 1000*1000);
#endif

	select_timeout.tv_sec = time / (1000*1000);
	select_timeout.tv_usec = time % (1000*1000);

	ETHR_ASSERT(val != ETHR_EVENT_ON__);

	fd = e->fd[0];

	/* Cleanup pipe... */
	do {
	    char buf[64];
	    rres = read(fd, buf, sizeof(buf));
	} while (rres > 0 || (rres < 0 && errno == EINTR));
	if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
	    ETHR_FATAL_ERROR__(errno);

	/*
	 * Need to verify that state is still off
	 * after cleaning the pipe...
	 */
	if (val == ETHR_EVENT_OFF_WAITER_SELECT__) {
	    val = ethr_atomic32_read(&e->state);
	    if (val == ETHR_EVENT_ON__)
		goto return_event_on;
	}
	else {
	    ethr_sint32_t act;
	    act = ethr_atomic32_cmpxchg(&e->state,
					ETHR_EVENT_OFF_WAITER_SELECT__,
					val);
	    if (act == ETHR_EVENT_ON__)
		goto return_event_on;
	    ETHR_ASSERT(act == val);
	}


#ifdef __DARWIN__
	rsetp = e->fdsets->rsetp;
	esetp = e->fdsets->esetp;
	memset((void *) &e->fdsets->mem[0], 0, e->fdsets->mem_size);
#else
	FD_ZERO(&rset);
	FD_ZERO(&eset);
	rsetp = &rset;
	esetp = &eset;
#endif

	FD_SET(fd, rsetp);
	FD_SET(fd, esetp);

	sres = select(fd + 1, rsetp, NULL, esetp, &select_timeout);
	if (sres == 0)
	    res = ETIMEDOUT;
	else {
	    res = EINTR;
	    if (sres < 0 && errno != EINTR)
		ETHR_FATAL_ERROR__(errno);
	    /* else:
	     *   Event is *probably* set, but it can be a
	     *   lingering writer. That is, it is important
	     *   that we verify that it actually is set. If
	     *   it isn't, return EINTR (spurious wakeup).
	     */
	}

	val = ethr_atomic32_read(&e->state);
	if (val == ETHR_EVENT_ON__)
	    goto return_event_on;
#if defined(ETHR_HAVE_ETHR_GET_MONOTONIC_TIME) \
    && ERTS_USE_PREMATURE_TIMEOUT
	if (res == ETIMEDOUT)
	    goto restart_select; /* Verify timeout */
#endif
    }

    return res;

return_event_on:

    ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);

    return 0;
}

#else
#error No ethread event implementation
#endif

void
ethr_event_reset(ethr_event *e)
{
    ethr_event_reset__(e);
}

void
ethr_event_set(ethr_event *e)
{
    ethr_event_set__(e);
}

int
ethr_event_wait(ethr_event *e)
{
    return wait__(e, 0, -1);
}

int
ethr_event_swait(ethr_event *e, int spincount)
{
    return wait__(e, spincount, -1);
}

int
ethr_event_twait(ethr_event *e, ethr_sint64_t timeout)
{
    return wait__(e, 0, timeout);
}

int
ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
    return wait__(e, spincount, timeout);
}