aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
blob: edcd3112da1b1acc42f384a0e8398a209005e735 (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                   
  
                                                        
  




                                                                      
  



                                                                         
  

















                                        
                           
                   







                                                






                                           
 
                    








                                                                    


                                                     
 

                                                      


                                         

                      
                              
                          












                                               

  
                                                               


                                              
                                      
                                                      



















                                                        




                            





                                                       


                                     
                                                            




                               


                                                        





                                                       
                               
               

                                                          











                                                         








                                         

                                                 
 
                                                                         











                                                          
                                 


                                          
                                                                             






                                                  


                        
 

                                         
 









                                                           

              


                                              
         
                                      
     

                                            

 

                                         
 
                      
 



                                    
     















                                                                  


                       
                                             
 



















                                                                        
     

 


                                                                    
 
                      
 

                                   
 



                                       
 

                               
 











                                                                         
     

                                                
 





                          
                                              
 
            
                      


                                                                       

                            












                                                             
 





                                                                           
 
                                         
 










                                                                               
 

                    

     


                                                
 
               









                                                 
                            
 



                                 

                       

                             


                                                    
                                   

     
                                 
 


                                                
 
                                  

               



                                                    


      



                            

                     
 
                                                                     

                   
                              




                                                            
                                                                       










                                                                       
            


              
                          
 



                                  
     


















                                                                         

         














                                                                     
                                   




                                                     
 



                                                   
     











                                               
             











                                               




                                  
                        
                       

                                                   
                                                                         
 
                              









                                                               
 




                                                            
 
  
                                     

                                                                  
                                                                    




                                                               
             
                        

                                      
                                      
                         

                        


                                                         





                        
                               

                       















                                                                        



                             




                                                
                                        
 

                                             
 


                                 
 










                                                                            

                          





                                                       

                                           
                                                                               
                                                  
                                                                   
             


                                         
                                                      
                                                  
                                                  
                                                                                  

                                                                  



                                          
                                                      
                                                  
                                                   

                                                                   



                                         
                                                      
                                                  
                                            


                                                                











                                                             

                                                                             

                                             
                                                       











                                                       

                                     
 


                                          
 
                                           
 





                                 


                                       




                                                
                            



                                                                             

     
               
                                                                              

      








                                       

                                                     





                            
                                                                             
 




                                                                       




                                                                
                                                                         


                                       



                                            

                                     

         

     
      
                                                                         
                               
 






                                                         






                                       

           
                                                   
 

                        


                                                   
 







































                                                                           
                              

                                


                                        
                                                    
                                                              

                                              

                                       
                                                    
                                                              

                                               

                                      
                                                    
                                                              

                                         








                                                        

                                         


         












                                                              




                                

                                                                        


               
 




                                                                            
                                                                                

                         
 

                                   
 




                                                                               

                                                               
              









                         

                                                                  
                           
 
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description:	Scheduling of port tasks
 *
 * Author: 	Rickard Green
 */

#define ERL_PORT_TASK_C__

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "global.h"
#include "erl_port_task.h"
#include "dist.h"
#include "dtrace-wrapper.h"
#include <stdarg.h>

#if defined(DEBUG) && 0
#define HARD_DEBUG
#endif

/*
 * Costs in reductions for some port operations.
 */
#define ERTS_PORT_REDS_EXECUTE		10
#define ERTS_PORT_REDS_FREE		100
#define ERTS_PORT_REDS_TIMEOUT		400
#define ERTS_PORT_REDS_INPUT		400
#define ERTS_PORT_REDS_OUTPUT		400
#define ERTS_PORT_REDS_EVENT		400
#define ERTS_PORT_REDS_TERMINATE	200

/*
 * DTRACE_DRIVER(PROBE_NAME, PP)
 *
 * Fire the driver probe PROBE_NAME for port PP, passing the connected
 * process, the port, and the port name as probe arguments. The string
 * buffers are only built when PROBE_NAME itself is enabled.
 *
 * The expansion is a single statement (do { ... } while (0)) so that the
 * macro can be used safely in unbraced if/else bodies, and PP is
 * parenthesised so that any pointer expression may be passed.
 */
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP)                                  \
    do {                                                               \
        if (DTRACE_ENABLED(PROBE_NAME)) {                              \
            DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);         \
            DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);            \
                                                                       \
            dtrace_pid_str((PP)->connected, process_str);              \
            dtrace_port_str((PP), port_str);                           \
            DTRACE3(PROBE_NAME, process_str, port_str, (PP)->name);    \
        }                                                              \
    } while (0)
#else
#define  DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
#endif

/*
 * Global count of outstanding I/O tasks. It is incremented when an
 * input, output, or event task is scheduled, and decremented when such a
 * task is aborted or has been executed.
 */
erts_smp_atomic_t erts_port_task_outstanding_io_tasks;

/*
 * Values of the 'state' field of a port task. A task starts out as
 * SCHEDULED and is moved, by compare-and-exchange, to either ABORTED
 * (by erts_port_task_abort()) or EXECUTING (by the thread that picks
 * the task up for execution or cleanup).
 */
#define ERTS_PT_STATE_SCHEDULED		0
#define ERTS_PT_STATE_ABORTED		1
#define ERTS_PT_STATE_EXECUTING		2

/*
 * A single task scheduled on a port.
 *
 * The 'u' union holds either the data of a live task ('alive') or the
 * bookkeeping needed to hand the task over for deferred deallocation
 * ('release', used by schedule_port_task_free() in SMP builds).
 */
struct ErtsPortTask_ {
    erts_smp_atomic32_t state;	/* One of ERTS_PT_STATE_* */
    ErtsPortTaskType type;	/* Kind of task (timeout, input, ...) */
    union {
	struct {
	    ErtsPortTask *next;		/* Next task in the same queue */
	    ErtsPortTaskHandle *handle;	/* Handle referring to this task,
					   or NULL if none */
	    union {
		struct { /* I/O tasks */
		    ErlDrvEvent event;
		    ErlDrvEventData event_data; /* Only set for event tasks */
		} io;
	    } u;
	} alive;
	ErtsThrPrgrLaterOp release;
    } u;
};

/* Defined further down; needed by erts_port_task_free_port() and
 * erts_port_task_execute(). */
static void begin_port_cleanup(Port *pp, ErtsPortTask **execq);

/*
 * Instantiates the allocator for ErtsPortTask structures.
 * NOTE(review): presumably this is what provides port_task_alloc(),
 * port_task_free() and init_port_task_alloc() used in this file --
 * confirm against the macro definition.
 */
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
				 ErtsPortTask,
				 1000,
				 ERTS_ALC_T_PORT_TASK)

#ifdef ERTS_SMP
/*
 * Callback with a generic (void *) argument that frees the port task
 * passed in 'vptp'. Used as the deferred operation registered by
 * schedule_port_task_free().
 */
static void
call_port_task_free(void *vptp)
{
    ErtsPortTask *task = (ErtsPortTask *) vptp;

    port_task_free(task);
}
#endif

/*
 * Dispose of a port task.
 *
 * Without SMP support the task is freed immediately. With SMP support
 * the deallocation is handed to erts_schedule_thr_prgr_later_op(), using
 * the 'release' member of the task as bookkeeping storage.
 */
static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
#ifndef ERTS_SMP
    port_task_free(ptp);
#else
    ErtsThrPrgrLaterOp *later_op = &ptp->u.release;

    erts_schedule_thr_prgr_later_op(call_port_task_free,
				    (void *) ptp,
				    later_op);
#endif
}

/*
 * Task handle manipulation.
 */

/* Make the handle refer to no task (release barrier store of NULL). */
static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
    const erts_aint_t no_task = (erts_aint_t) NULL;

    erts_smp_atomic_set_relb(pthp, no_task);
}

/*
 * Return the task currently referred to by the handle (acquire barrier
 * read), or NULL if the handle refers to no task.
 */
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
    erts_aint_t raw = erts_smp_atomic_read_acqb(pthp);

    return (ErtsPortTask *) raw;
}

/*
 * Clear the handle attached to the task, if there is one. The handle is
 * expected to still refer to this very task.
 */
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
    ErtsPortTaskHandle *hndl = ptp->u.alive.handle;

    if (!hndl)
	return;

    ASSERT(ptp == handle2task(hndl));
    reset_port_task_handle(hndl);
}

/*
 * Attach the handle 'pthp' (may be NULL) to the task, and, when a
 * handle is given, make that handle refer to the task.
 */
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->u.alive.handle = pthp;

    if (!pthp)
	return;

    erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
    ASSERT(ptp == handle2task(ptp->u.alive.handle));
}

/*
 * Port queue operations
 */

/*
 * Append the port to the end of the run-queue's list of ports and bump
 * the run-queue length. The run-queue is expected to be locked.
 */
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    Port *tail;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    pp->sched.next = NULL;
    tail = runq->ports.end;

    if (!tail) {
	/* Empty queue; the port becomes the first element as well */
	ASSERT(!runq->ports.start);
	runq->ports.start = pp;
    }
    else {
	ASSERT(runq->ports.start);
	tail->sched.next = pp;
    }
    runq->ports.end = pp;

    ASSERT(runq->ports.start && runq->ports.end);

    erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}

/*
 * Remove and return the first port of the run-queue's list of ports, or
 * return NULL if the list is empty. The run-queue length is decremented
 * when a port is removed. The run-queue is expected to be locked.
 */
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
    Port *head = runq->ports.start;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    if (head) {
	Port *successor = head->sched.next;

	runq->ports.start = successor;
	if (!successor) {
	    /* That was the last port; the queue is now empty */
	    ASSERT(runq->ports.end == head);
	    runq->ports.end = NULL;
	}
	erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
    }
    else {
	ASSERT(!runq->ports.end);
    }

    ASSERT(runq->ports.start || !runq->ports.end);
    ASSERT(runq->ports.end || !runq->ports.start);
    return head;
}

/*
 * Task queue operations
 */

/*
 * Append the task to the port's in-queue, unless the port has the
 * ERTS_PTS_FLG_EXIT flag set, in which case nothing is enqueued.
 *
 * Returns the scheduling flags of the port as read while holding the
 * port task scheduling lock; the caller inspects ERTS_PTS_FLG_EXIT in
 * the result to find out whether or not the task was enqueued.
 */
static ERTS_INLINE erts_aint32_t
enqueue_task(Port *pp, ErtsPortTask *ptp)
{
    erts_aint32_t sched_flags;

    ptp->u.alive.next = NULL;

    erts_port_task_sched_lock(&pp->sched);

    sched_flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (!(sched_flags & ERTS_PTS_FLG_EXIT)) {
	ErtsPortTask *tail = pp->sched.taskq.in.last;

	if (!tail) {
	    ASSERT(!pp->sched.taskq.in.first);

	    pp->sched.taskq.in.first = ptp;
	}
	else {
	    ASSERT(pp->sched.taskq.in.first);
	    ASSERT(!tail->u.alive.next);

	    tail->u.alive.next = ptp;
	}
	pp->sched.taskq.in.last = ptp;
    }

    erts_port_task_sched_unlock(&pp->sched);

    return sched_flags;
}

/*
 * Fetch the next task to execute.
 *
 * Tasks are taken from the execution queue '*execqp' as long as it is
 * non-empty. When it is empty, the whole in-queue of the port is moved
 * over (under the port task scheduling lock): its first task is
 * returned and the rest becomes the new execution queue. If the
 * in-queue is empty as well, ERTS_PTS_FLG_HAVE_TASKS is cleared and
 * NULL is returned.
 */
static ERTS_INLINE ErtsPortTask *
pop_task(Port *pp, ErtsPortTask **execqp)
{
    ErtsPortTask *task = *execqp;

    if (task) {
	*execqp = task->u.alive.next;
	return task;
    }

    ASSERT(!pp->sched.taskq.local);

    erts_port_task_sched_lock(&pp->sched);

    task = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;

    if (!task)
	erts_smp_atomic32_read_band_nob(&pp->sched.flags,
					~ERTS_PTS_FLG_HAVE_TASKS);
    else
	*execqp = task->u.alive.next;

    erts_port_task_sched_unlock(&pp->sched);

    return task;
}

/*
 * Prepare the port for execution of its tasks.
 *
 * Tasks left over from a previous execution (saved in
 * pp->sched.taskq.local by finalize_exec()) are moved to the execution
 * queue '*execqp'. The saved pointer is cleared once it has been moved:
 * the tasks are from now on owned by the execution queue, and
 * pop_task() expects the local queue to be empty when the execution
 * queue has been drained. Leaving it set would both trip that assertion
 * and keep a pointer to tasks that are freed while executing.
 *
 * The scheduling flags are then updated atomically: ERTS_PTS_FLG_IN_RUNQ
 * is cleared and ERTS_PTS_FLG_EXEC is set.
 */
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp)
{
    erts_aint32_t act;

    *execqp = pp->sched.taskq.local;
    pp->sched.taskq.local = NULL;

    /* guess a likely value */
    act = ERTS_PTS_FLG_HAVE_TASKS|ERTS_PTS_FLG_IN_RUNQ;

    while (1) {
	erts_aint32_t new, exp;

	new = exp = act;

	new &= ~ERTS_PTS_FLG_IN_RUNQ;
	new |= ERTS_PTS_FLG_EXEC;

	act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);

	ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);

	if (exp == act)
	    break; /* Flags updated */

	/* Guess was wrong; retry with the value actually found */
    }
}

/*
 * Finish an execution round on the port.
 *
 * Whatever remains of the execution queue '*execq' is saved in
 * pp->sched.taskq.local for the next round, and '*execq' is cleared.
 * The scheduling flags are then updated atomically: ERTS_PTS_FLG_EXEC
 * is cleared, and ERTS_PTS_FLG_IN_RUNQ is set if the port still has
 * tasks (ERTS_PTS_FLG_HAVE_TASKS).
 *
 * Returns a value != 0 if the port should remain active, i.e., if it
 * still has tasks and should be put back on a run-queue by the caller.
 */
static ERTS_INLINE int
finalize_exec(Port *pp, ErtsPortTask **execq)
{
    erts_aint32_t act;
    ErtsPortTask *remaining = *execq;

    pp->sched.taskq.local = remaining;
    *execq = NULL;

    /*
     * Guess a likely value. The guess must be based on the tasks that
     * were left in the execution queue; 'execq' itself is the address
     * of the caller's queue variable and is never NULL.
     */
    act = ERTS_PTS_FLG_EXEC;
    if (remaining)
	act |= ERTS_PTS_FLG_HAVE_TASKS;

    while (1) {
	erts_aint32_t new, exp;

	new = exp = act;

	new &= ~ERTS_PTS_FLG_EXEC;
	if (act & ERTS_PTS_FLG_HAVE_TASKS)
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));

	if (exp == act)
	    break; /* Flags updated */

	/* Guess was wrong; retry with the value actually found */
    }

    return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}

/*
 * Abort a scheduled task.
 *
 * The task referred to by the handle is moved from the scheduled state
 * to the aborted state. On success the handle is reset and, for input,
 * output, and event tasks, the count of outstanding I/O tasks is
 * decremented.
 *
 * Returns 0 if the task was aborted, and -1 if the handle refers to no
 * task or the task was already aborted, executing, or executed.
 */

int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
    int res = -1;
    ErtsPortTask *ptp;
#ifdef ERTS_SMP
    ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
#endif

    ptp = handle2task(pthp);
    if (ptp) {
	erts_aint32_t prev_state;

#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
	ERTS_SMP_READ_MEMORY_BARRIER;
	prev_state = erts_smp_atomic32_read_nob(&ptp->state);
	if (prev_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(saved_pthp == pthp);
	}
#endif

	prev_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						   ERTS_PT_STATE_ABORTED,
						   ERTS_PT_STATE_SCHEDULED);

	/* Otherwise: task already aborted, executing, or executed */
	if (prev_state == ERTS_PT_STATE_SCHEDULED) {

	    reset_port_task_handle(pthp);

	    if (ptp->type == ERTS_PORT_TASK_INPUT
		|| ptp->type == ERTS_PORT_TASK_OUTPUT
		|| ptp->type == ERTS_PORT_TASK_EVENT) {
		/* One I/O task less to wait for */
		ASSERT(erts_smp_atomic_read_nob(
			   &erts_port_task_outstanding_io_tasks) > 0);
		erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
	    }

	    res = 0;
	}
    }

#ifdef ERTS_SMP
    erts_thr_progress_unmanaged_continue(dhndl);
#endif

    return res;
}

/*
 * Schedule a task.
 *
 * Parameters:
 *   id    - Identifier of the (internal) port to schedule the task on.
 *   pthp  - Handle that should refer to the task while it is scheduled,
 *           or NULL if no handle is wanted.
 *   type  - Type of task.
 *   ...   - For ERTS_PORT_TASK_INPUT and ERTS_PORT_TASK_OUTPUT: the
 *           ErlDrvEvent. For ERTS_PORT_TASK_EVENT: the ErlDrvEvent
 *           followed by the ErlDrvEventData. No extra arguments for
 *           other task types.
 *
 * Returns 0 on success, and -1 if the port could not be found or is
 * exiting (ERTS_PTS_FLG_EXIT set), in which case no task is scheduled.
 *
 * When called from a thread that is not managed by the thread progress
 * functionality, a reference to the port is held from the lookup until
 * the function returns.
 */

int
erts_port_task_schedule(Eterm id,
			ErtsPortTaskHandle *pthp,
			ErtsPortTaskType type,
			...)
{
#ifdef ERTS_SMP
    ErtsRunQueue *xrunq;
    ErtsThrPrgrDelayHandle dhndl;
#endif
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp = NULL;
    erts_aint32_t act;

    if (pthp && erts_port_task_is_scheduled(pthp)) {
	ASSERT(0);
	erts_port_task_abort(pthp);
    }

    ASSERT(is_internal_port(id));

#ifdef ERTS_SMP
    dhndl = erts_thr_progress_unmanaged_delay();
#endif

    pp = erts_port_lookup_raw(id);

#ifdef ERTS_SMP
    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
	/* Unmanaged thread; keep the port alive using a reference */
	if (pp)
	    erts_port_inc_refc(pp);
	erts_thr_progress_unmanaged_continue(dhndl);
    }
#endif

    if (!pp)
	goto fail;

    ptp = port_task_alloc();

    ptp->type = type;

    erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

    switch (type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
	va_end(argp);
	erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
	break;
    }
    case ERTS_PORT_TASK_EVENT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
	ptp->u.alive.u.io.event_data = va_arg(argp, ErlDrvEventData);
	va_end(argp);
	erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
	break;
    }
    default:
	break;
    }

    set_handle(ptp, pthp);

    act = enqueue_task(pp, ptp);
    if (act & ERTS_PTS_FLG_EXIT) {
	/* Port is exiting; the task was not enqueued */
	reset_handle(ptp);
	goto fail;
    }

    while (1) {
	erts_aint32_t new, exp;

	if ((act & ERTS_PTS_FLG_HAVE_TASKS)
	    && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    goto done; /* Done */

	new = exp = act;
	new |= ERTS_PTS_FLG_HAVE_TASKS;
	if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	if (exp == act) {
	    if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
		break; /* Need to enqueue port */
	    goto done; /* Done */
	}

	if (act & ERTS_PTS_FLG_EXIT)
	    goto done; /* Died after our task insert... */
    }

    /* Enqueue port on run-queue */

    runq = erts_port_runq(pp);
    if (!runq)
	ERTS_INTERNAL_ERROR("Missing run-queue");

#ifdef ERTS_SMP
    xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
    if (xrunq) {
	/* Port emigrated ... */
	erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
	erts_smp_runq_unlock(runq);
	runq = erts_port_runq(pp);
	if (!runq)
	    ERTS_INTERNAL_ERROR("Missing run-queue");
    }
#endif

    enqueue_port(runq, pp);

    if (erts_system_profile_flags.runnable_ports) {
	profile_runnable_port(pp, am_active);
    }

    erts_smp_runq_unlock(runq);

    erts_smp_notify_inc_runq(runq);

done:

#ifdef ERTS_SMP
    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);
#endif

    return 0;

fail:

#ifdef ERTS_SMP
    /*
     * A reference was only taken if the port was found; 'pp' is NULL
     * when we get here due to a failed lookup.
     */
    if (pp && dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);
#endif

    if (ptp) {
	/*
	 * The task never made it into the task queue, so it will neither
	 * be executed nor cleaned up by anyone else. Undo the accounting
	 * of outstanding I/O tasks made above, unless the task was
	 * aborted through its handle in the meantime (in which case
	 * erts_port_task_abort() has already done the decrement).
	 */
	erts_aint32_t old_state;

	old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    switch (ptp->type) {
	    case ERTS_PORT_TASK_INPUT:
	    case ERTS_PORT_TASK_OUTPUT:
	    case ERTS_PORT_TASK_EVENT:
		ASSERT(erts_smp_atomic_read_nob(
			   &erts_port_task_outstanding_io_tasks) > 0);
		erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
		break;
	    default:
		break;
	    }
	}

	port_task_free(ptp);
    }

    return -1;
}

/*
 * Mark the port as exiting and free.
 *
 * ERTS_PTS_FLG_EXIT is set in the scheduling flags (while holding the
 * port task scheduling lock), and the port state is changed so that
 * ERTS_PORT_SFLG_CLOSING is cleared and ERTS_PORT_SFLG_FREE is set.
 * If the port was neither in a run-queue nor executing at the time the
 * exit flag was set, cleanup of the port is begun right away; the
 * call sites in erts_port_task_execute() handle the other case.
 *
 * The port is expected to be locked by the caller.
 */
void
erts_port_task_free_port(Port *pp)
{
    const erts_aint32_t busy_mask = ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC;
    erts_aint32_t prev_flags;
    ErtsRunQueue *rq;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

    rq = erts_port_runq(pp);
    if (!rq)
	ERTS_INTERNAL_ERROR("Missing run-queue");

    erts_port_task_sched_lock(&pp->sched);
    prev_flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
						 ERTS_PTS_FLG_EXIT);
    erts_port_task_sched_unlock(&pp->sched);

    erts_atomic32_read_bset_relb(&pp->state,
				 (ERTS_PORT_SFLG_CLOSING
				  | ERTS_PORT_SFLG_FREE),
				 ERTS_PORT_SFLG_FREE);

    erts_smp_runq_unlock(rq);

    if ((prev_flags & busy_mask) == 0)
	begin_port_cleanup(pp, NULL);
}

/*
 * Execute scheduled tasks of a port.
 *
 * erts_port_task_execute() is called by scheduler threads between
 * scheduling of processes. Run-queue lock should be held by caller.
 *
 * A port is popped from the run-queue 'runq' and its tasks are executed
 * until there are no more tasks, the port has been freed, or the
 * reduction budget (CONTEXT_REDS) is used up. While tasks are executed
 * the run-queue is unlocked and '*curr_port_pp' refers to the port; the
 * run-queue is locked again, and '*curr_port_pp' is reset to NULL,
 * before returning. A port that still has tasks is put back on a
 * run-queue.
 *
 * Returns a value != 0 if there are outstanding I/O tasks (on any
 * port) when the function returns; otherwise 0.
 */

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    Port *pp;
    ErtsPortTask *execq;
    int res = 0;
    int reds = ERTS_PORT_REDS_EXECUTE;
    erts_aint_t io_tasks_executed = 0;
    int fpe_was_unmasked;
    erts_aint32_t state;
    int active;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    pp = pop_port(runq);
    if (!pp) {
	res = 0;
	goto done;
    }

    erts_smp_runq_unlock(runq);

    *curr_port_pp = pp;
    
    if (erts_sched_stat.enabled) {
	ErtsSchedulerData *esdp = erts_get_scheduler_data();
	Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
	int migrated = old && old != esdp->no;

	erts_smp_spin_lock(&erts_sched_stat.lock);
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
	if (migrated) {
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
	}
	erts_smp_spin_unlock(&erts_sched_stat.lock);
    }

    prepare_exec(pp, &execq);

    erts_smp_port_lock(pp);

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
	trace_sched_ports(pp, am_in);
    }

    fpe_was_unmasked = erts_block_fpe();

    /*
     * The port state is a 32-bit atomic; it has to be read using the
     * 32-bit atomic API just as everywhere else in this function.
     */
    state = erts_atomic32_read_nob(&pp->state);

    /*
     * Enter the loop at the point where the port state is checked, so
     * that a port that already has been freed is cleaned up before any
     * task is executed.
     */
    goto begin_handle_tasks;

    while (1) {
	erts_aint32_t task_state;
	ErtsPortTask *ptp;

	ptp = pop_task(pp, &execq);
	if (!ptp)
	    break;

	/* Claim the task; fails if it has been aborted */
	task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						   ERTS_PT_STATE_EXECUTING,
						   ERTS_PT_STATE_SCHEDULED);
	if (task_state != ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(task_state == ERTS_PT_STATE_ABORTED);
	    goto aborted_port_task;
	}

	reset_handle(ptp);

	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
	ERTS_SMP_CHK_NO_PROC_LOCKS;
	ASSERT(pp->drv_ptr);

	switch (ptp->type) {
	case ERTS_PORT_TASK_TIMEOUT:
	    reds += ERTS_PORT_REDS_TIMEOUT;
	    if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
                DTRACE_DRIVER(driver_timeout, pp);
		(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            }
	    break;
	case ERTS_PORT_TASK_INPUT:
	    reds += ERTS_PORT_REDS_INPUT;
	    ASSERT((erts_atomic32_read_nob(&pp->state)
		    & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_input, pp);
	    /* NOTE some windows drivers use ->ready_input for input and output */
	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
					ptp->u.alive.u.io.event);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_OUTPUT:
	    reds += ERTS_PORT_REDS_OUTPUT;
	    ASSERT((erts_atomic32_read_nob(&pp->state)
		    & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_output, pp);
	    (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
					 ptp->u.alive.u.io.event);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_EVENT:
	    reds += ERTS_PORT_REDS_EVENT;
	    ASSERT((erts_atomic32_read_nob(&pp->state)
		    & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_event, pp);
	    (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
				  ptp->u.alive.u.io.event,
				  ptp->u.alive.u.io.event_data);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_DIST_CMD:
	    reds += erts_dist_command(pp, CONTEXT_REDS-reds);
	    break;
	default:
	    erl_exit(ERTS_ABORT_EXIT,
		     "Invalid port task type: %d\n",
		     (int) ptp->type);
	    break;
	}

	/* The driver callback may have changed the state of the port */
	state = erts_atomic32_read_nob(&pp->state);
	if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(pp)) {
	    reds += ERTS_PORT_REDS_TERMINATE;
	    erts_terminate_port(pp);
	    state = erts_atomic32_read_nob(&pp->state);
	}

	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

#ifdef ERTS_SMP
	if (pp->xports)
	    erts_smp_xports_unlock(pp);
	ASSERT(!pp->xports);
#endif

	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    aborted_port_task:
	schedule_port_task_free(ptp);

    begin_handle_tasks:
	if (state & ERTS_PORT_SFLG_FREE) {
	    reds += ERTS_PORT_REDS_FREE;

	    begin_port_cleanup(pp, &execq);

	    break;
	}

	if (reds >= CONTEXT_REDS)
	    break;
    }

    erts_unblock_fpe(fpe_was_unmasked);

    /* trace port scheduling, out */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
    	trace_sched_ports(pp, am_out);
    }

    if (io_tasks_executed) {
	ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	       >= io_tasks_executed);
	erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
				 -1*io_tasks_executed);
    }

#ifdef ERTS_SMP
    ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif

    /* Save remaining tasks and find out if the port has more to do */
    active = finalize_exec(pp, &execq);

    erts_port_release(pp);

    *curr_port_pp = NULL;

    erts_smp_runq_lock(runq);
 
    if (!active) {
	if (erts_system_profile_flags.runnable_ports)
	    profile_runnable_port(pp, am_inactive);
    }
    else {
#ifdef ERTS_SMP
	ErtsRunQueue *xrunq;
#endif

	ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

#ifdef ERTS_SMP
	xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
	if (!xrunq) {
#endif
	    enqueue_port(runq, pp);
	    /* No need to notify ourselves about inc in runq. */
#ifdef ERTS_SMP
	}
	else {
	    /* Port emigrated ... */
	    erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
	    erts_smp_runq_unlock(runq);

	    xrunq = erts_port_runq(pp);
	    ASSERT(xrunq);
	    enqueue_port(xrunq, pp);
	    erts_smp_runq_unlock(xrunq);
	    erts_smp_notify_inc_runq(xrunq);

	    erts_smp_runq_lock(runq);
	}
#endif
    }

 done:
    res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	   != (erts_aint_t) 0);

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);

    return res;
}

#ifdef ERTS_SMP
/*
 * Callback with a generic (void *) argument that drops one reference to
 * the port passed in 'vport'. Registered as a deferred operation by
 * begin_port_cleanup().
 */
static void
release_port(void *vport)
{
    Port *port = (Port *) vport;

    erts_port_dec_refc(port);
}
#endif

/*
 * Begin cleanup of a port that has been freed.
 *
 * All tasks remaining in the execution queue '*execqp' ('execqp' may be
 * NULL when the caller has no execution queue) and in the in-queue of
 * the port are taken care of: tasks that have not been aborted are
 * claimed, their handles are reset and, for I/O tasks, the event is
 * reported using erts_stale_drv_select(). Every task is then passed to
 * schedule_port_task_free().
 *
 * I/O tasks claimed here were counted in
 * erts_port_task_outstanding_io_tasks when they were scheduled. Since
 * they will be neither executed nor aborted, the count is adjusted
 * here; otherwise it would never reach zero again.
 *
 * Finally ERTS_PTS_FLG_HAVE_TASKS is cleared and cleanup of the port
 * structure itself is scheduled (SMP) or flagged (non-SMP).
 *
 * The port is expected to be locked by the caller.
 */
static void
begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
{
    int i, max;
    ErtsPortTask *qs[2];
    erts_aint_t io_tasks_dropped = 0;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));


    /*
     * Handle remaining tasks...
     */

    max = 0;
    if (execqp && *execqp) {
	qs[max++] = *execqp;
	*execqp = NULL;
    }

    erts_port_task_sched_lock(&pp->sched);
    qs[max] = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    erts_port_task_sched_unlock(&pp->sched);
    if (qs[max])
	max++;

    for (i = 0; i < max; i++) {
	while (1) {
	    erts_aint32_t state;
	    ErtsPortTask *ptp = qs[i];
	    if (!ptp)
		break;

	    qs[i] = ptp->u.alive.next;

	    /* Normal case here is aborted tasks... */
	    state = erts_smp_atomic32_read_nob(&ptp->state);
	    if (state == ERTS_PT_STATE_ABORTED)
		goto aborted_port_task;

	    /* Claim the task; fails if it was aborted just now */
	    state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_EXECUTING,
						  ERTS_PT_STATE_SCHEDULED);
	    if (state != ERTS_PT_STATE_SCHEDULED) {
		ASSERT(state == ERTS_PT_STATE_ABORTED);
		goto aborted_port_task;
	    }

	    reset_handle(ptp);

	    switch (ptp->type) {
	    case ERTS_PORT_TASK_TIMEOUT:
		break;
	    case ERTS_PORT_TASK_INPUT:
		erts_stale_drv_select(pp->common.id,
				      ptp->u.alive.u.io.event,
				      DO_READ,
				      1);
		io_tasks_dropped++;
		break;
	    case ERTS_PORT_TASK_OUTPUT:
		erts_stale_drv_select(pp->common.id,
				      ptp->u.alive.u.io.event,
				      DO_WRITE,
				      1);
		io_tasks_dropped++;
		break;
	    case ERTS_PORT_TASK_EVENT:
		erts_stale_drv_select(pp->common.id,
				      ptp->u.alive.u.io.event,
				      0,
				      1);
		io_tasks_dropped++;
		break;
	    case ERTS_PORT_TASK_DIST_CMD:
		break;
	    default:
		erl_exit(ERTS_ABORT_EXIT,
			 "Invalid port task type: %d\n",
			 (int) ptp->type);
	    }

	aborted_port_task:
	    schedule_port_task_free(ptp);
	}
    }

    if (io_tasks_dropped) {
	ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	       >= io_tasks_dropped);
	erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
				 -1*io_tasks_dropped);
    }

    erts_smp_atomic32_read_band_nob(&pp->sched.flags,
				    ~ERTS_PTS_FLG_HAVE_TASKS);

    /*
     * Schedule cleanup of port structure...
     */
#ifdef ERTS_SMP
    erts_schedule_thr_prgr_later_op(release_port,
				    (void *) pp,
				    &pp->common.u.release);
#else
    pp->cleanup = 1;
#endif
}

/*
 * Returns 1 if the port currently is in a run-queue or is executing
 * (according to its scheduling flags); otherwise 0.
 */
int
erts_port_is_scheduled(Port *pp)
{
    const erts_aint32_t busy_mask = ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC;
    erts_aint32_t sched_flags;

    sched_flags = erts_smp_atomic32_read_acqb(&pp->sched.flags);

    return (sched_flags & busy_mask) ? 1 : 0;
}

#ifdef ERTS_SMP

/*
 * Put the port at the end of the run-queue 'rq'.
 *
 * The run-queue is expected to be locked, to be the run-queue currently
 * assigned to the port, and the port is expected to already have
 * ERTS_PTS_FLG_IN_RUNQ set.
 */
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));

    ASSERT((ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue) == rq);
    ASSERT(ERTS_PTS_FLG_IN_RUNQ & erts_smp_atomic32_read_nob(&pp->sched.flags));

    enqueue_port(rq, pp);
}

/*
 * Remove and return the first port of the run-queue 'rq', or return
 * NULL if it has no ports. The run-queue is expected to be locked.
 *
 * A returned port is expected to be assigned to 'rq' and to have
 * ERTS_PTS_FLG_IN_RUNQ set.
 */
Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
    Port *port;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));

    port = pop_port(rq);
    if (port) {
	ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&port->run_queue));
	ASSERT(erts_smp_atomic32_read_nob(&port->sched.flags)
	       & ERTS_PTS_FLG_IN_RUNQ);
    }

    return port;
}

#endif

/*
 * Initialize the module.
 */
void
erts_port_task_init(void)
{
    erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
			     (erts_aint_t) 0);
    init_port_task_alloc();
}