aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
blob: a044de3fee1835066a0a32394a1414a2b3bb67d6 (plain) (tree)
1
2
3
4
5
6
7
8
9

                   
  
                                                        
  


                                                                   
  






                                                                           
  

















                                        
                         
                           
                          
                   
 


                                                                        
                                                  
 
                       


                                   

      





                                                                                      
      
 
                    
                                                                    
                                                                    


                                                                    
                                                                    


                                                                    


                                                     











                                                                   
 






                                                                                   

                                                      


                                         
 









                                          

                      
                              
                          



                                       


                                             


                                   

  





                                     
      
















                                                                             
 


                                                         


                                              
                                      
                                                      
 

                                                             
                                    

                                                            











                                          



                                                                  



                        
 










                                                        

                                         
 









                                                                     
























                                                                       

































































































                                                                                    
            
                            
      
                                               





                                                                         






























































































































































































































                                                                                               




                            





                                                       


                                     
                                                            




                               


                                                        



                       









                                                               

                                                       
                               
               

                                                          


     












                                                                    
 
  
                             

   



                                                   
 

                           
 






















                                                                                      


                       
                                                  
 


                               
 
                                
 

                  
 
                              
 











                                                                           
 





                                                   
 
                                
 

                  
 
                              
 








                                                                           
 































                                                                                  
 







                                                                    
 


                                                                                 

                                        


                                      
                                                                          





                                                
 

                              
 
                                          
 










                                                                               
 


















                                                                       
 







                                                                                    


         
                                            
 





                                   

 















                                                                  



                                                                            







                                              

                                               



                                            
                  












                                                                                 


































                                                                         











                                                         








                                         

                                                 
 
                                                                         

               
                                                             

                                  











                                                          
                                 


                                          
                                                                             






                                                  


                        
 





                                              
 
 

                                                 

                             

                                             

                                                         

                           
          


                                                                    
         




                                                           

              


                                              
         
                                      

                                                         
     
                                            

                    

 

                                                                       
 
                                                                     
 
                                                                              





                                                   
     
 
                                                               














                                                                        
     

 

                                                                    
                                                                    
 
                      
                                     
 

                                             
          





                                                           
                  
 


                                                                    
 



                                                                   

                               
 







                                                                         

                                                       


                       
     
 










                                                                         
                                                

 

                                                                                
 

                                                                       


                                                                        

                                                               
                                         















                                                                       
     








                                                                    
     

                 

 









                                                       
 





                                                               
                                            
















                                                                       
     
          

                                   
                                                  











                                                                   

     





























                                                                  


                                 


                                              
 








                                                                   
              


                                             
         

                                                                             
     

 
  



















                                                       






                                                           
  

                          
 
   
                                              
 
            
                      


                                                                       

                            

                 
          






                                                             
                                                      
         
      
 




                                                                           
              
 
                                         
 










                                                                               
 
                    

         
 

                                                

      
               
 
 

                                              
 



                                                                 
 
                                          

                                                                 


                                                      
 









                                       
 

                                   
 



                                                        
 









                                                            
 







                                                            
 











                                                                           
 
                                     
 

                             
 




                                                        
 
                                                                         
     









                                                 
                            
 

                                            



                                 

                       
                             
                                 
                                     
 
                                                                
 
                                 
 


                                                
 
                                  
 
               



                                                    
     

      

                  
 

                                          
 

                               
 
                                                                         
 

                              

                   
                              


                                 
                                                             
                     
                                                                       




                                

                                                                      
                     
                                                                       

              
                                   
                     





                                                                               
                                                          
                                      




                                                                  

              
            


              
                                                        
                          



                                                   
     
 



                                                



                                                                   


                               
                                          



                                                                
                         








                                                                         

         



                                                          






                                                 







                                                                   

                                      
                
                               
                                                                     
                                   




                                                     
 
                           






                                   


                                                




                                               
             
 






                                               
                                                             





                                                 
             
 






                                               


                                                     



                            




                                  
                        
                       

                                                   
                                                                         
 
                              






                                                               


                                                          

                                                         
 


                                                            
                                           
 
 
  
                                     

                                                                  
                                                                    




                                                               
             
                        
                          
                
                  
                 
                                      
                         

                        
                          
                                              
                              


                                                         





                        

                                    
                               

                       

                                  












                                                                        
                                                 


                           




                                                
                                        
 
                                               
                                      
                                                         
                            
 


                                 
 
                                                                   









                                                                            
 



                                                     




                                                       
                                    
                              






                                                      
                                                     

                                                               


                                                                       

                                  
                                        
                                                        
                                                  
                                                 
                                                          
                                      
                                                                  
                                                                  
                                               


                                   
                                         
                                                        
                                                   
                                                  
                                                                   
                                                                   
                                               


                                  
                                        
                                                        
                                            
                                           
                                                            

                                                                 
                                               

                                

                                                                     
                              
                                                        
                                     



                                                                             

                                                                               



                                                                             

                                                  
                  
         
                                     
                              
                                                                  

                  
                                      




                                                    
                                                               
 







                                                                               

                                     
 


                                          
                                                               
 

                  
 


                                          



                                     
                  

     
                                       
                             
 
 
                            



                                                                             

     
               
                                                                              

      
                                                          
 

                            





                             
                 



                            
                                                                             
 

                                                                       
                                          
                                        


                                   



                                                                
                                  
                                                                         


                                       



                                            

                                     

         

     
      
                                                                         
                               
 
                                        
 
                                                         
                                                    



               





                                       









                                                                              
      

           
                                                                             
 
               

                                      

                                             


                                                   
      






                                                                


            


















                                                             
                       



















                                                                              































                                                                           
                              

                                


                                        
                                                    
                                                               
                                                               

                                              

                                       
                                                    
                                                               
                                                               

                                               

                                      
                                                    
                                                               
                                                               

                                         


                                         














                                                                                   
                      
             
                    
                                          



                                                        

                                         


         
                                                     
                                                                  


























                                                                       
                                                                                                
                                  
                                                                                              





                                                                   




                                            








                                                                                     


                    

 
               
 




                                                                            
                                                                                

                         
 

                                   
 




                                                                               

                                                               
              









                         

                                                                  
                           
                                   
 
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2017. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description:	Scheduling of port tasks
 *
 * Author: 	Rickard Green
 */

#define ERL_PORT_TASK_C__

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "global.h"
#include "erl_port_task.h"
#include "dist.h"
#include "erl_check_io.h"
#include "dtrace-wrapper.h"
#include "lttng-wrapper.h"
#include <stdarg.h>

/*
 * ERTS_PORT_CALLBACK_VREDS: Limit the amount of callback calls we do...
 *
 * Defined as one twentieth of CONTEXT_REDS.
 * NOTE(review): presumably the reduction cost charged per driver callback
 * so that a port cannot run an unbounded number of callbacks in one
 * scheduling slot -- confirm against the code that consumes this value.
 */
#define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20)

/*
 * Hard debugging of the task queues. The "&& 0" keeps it disabled even in
 * DEBUG builds; change it to "&& 1" to enable the checks locally.
 */
#if defined(DEBUG) && 0
#define ERTS_HARD_DEBUG_TASK_QUEUES
#else
#undef ERTS_HARD_DEBUG_TASK_QUEUES
#endif

/*
 * ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ): calls chk_task_queues() when hard
 * debugging is enabled; otherwise it expands to nothing.
 */
#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
    chk_task_queues((PP), (EQ), (PBQ))
#else
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
#endif

/*
 * DTRACE_DRIVER(PROBE_NAME, PP): fire the dtrace probe PROBE_NAME for the
 * port PP, passing the connected process, the port and the port name as
 * strings. The strings are only built when the probe is enabled.
 *
 * Both variants are wrapped in do { } while (0) so that the macro always
 * behaves as exactly one statement (safe in unbraced if/else), and PP is
 * parenthesized everywhere so that any pointer expression can be passed.
 */
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP)                                  \
    do {                                                               \
        if (DTRACE_ENABLED(PROBE_NAME)) {                              \
            DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);         \
            DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);            \
                                                                       \
            dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str);  \
            dtrace_port_str((PP), port_str);                           \
            DTRACE3(PROBE_NAME, process_str, port_str, (PP)->name);    \
        }                                                              \
    } while (0)
#else
#define  DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
#endif
/*
 * LTTNG_DRIVER(TRACEPOINT, PP): emit the LTTng tracepoint TRACEPOINT for
 * the port PP, passing the connected process, the port and the port name.
 * The string buffers are only filled in when the tracepoint is enabled.
 *
 * Both variants are wrapped in do { } while (0) so that the macro always
 * behaves as exactly one statement (safe in unbraced if/else).
 */
#ifdef USE_LTTNG_VM_TRACEPOINTS
#define LTTNG_DRIVER(TRACEPOINT, PP)                                  \
    do {                                                              \
        if (LTTNG_ENABLED(TRACEPOINT)) {                              \
            lttng_decl_portbuf(port_str);                             \
            lttng_decl_procbuf(proc_str);                             \
            lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str);  \
            lttng_port_to_str((PP), port_str);                        \
            LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name);       \
        }                                                             \
    } while (0)
#else
#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
#endif

/*
 * ERTS_SMP_LC_VERIFY_RQ(RQ, PP): lock-checker assertions verifying that
 * the run queue RQ is locked and that RQ is the run queue currently
 * stored in the port PP.
 *
 * Both assertions use the RQ argument; the macro does not depend on a
 * variable named 'runq' being in scope at the call site.
 */
#define ERTS_SMP_LC_VERIFY_RQ(RQ, PP)					\
    do {								\
	ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked((RQ)));		\
	ERTS_SMP_LC_ASSERT((RQ) == ((ErtsRunQueue *)			\
				    erts_smp_atomic_read_nob(&(PP)->run_queue))); \
    } while (0)

/*
 * Global atomic counter.
 * NOTE(review): judging by the name, counts I/O port tasks that have been
 * scheduled but not yet handled -- confirm against its users.
 */
erts_smp_atomic_t erts_port_task_outstanding_io_tasks;

/*
 * Values stored in the 'state' field of a port task. A freshly
 * initialized proc-to-port signal task starts out as SCHEDULED.
 */
#define ERTS_PT_STATE_SCHEDULED		0
#define ERTS_PT_STATE_ABORTED		1
#define ERTS_PT_STATE_EXECUTING		2

/*
 * Type specific payload of a port task. Which member is valid depends
 * on the type of the task.
 */
typedef union {
    struct { /* I/O tasks */
	ErlDrvEvent event;
	ErlDrvEventData event_data;
    } io;
    struct { /* Proc-to-port signal tasks (ERTS_PORT_TASK_PROC_SIG) */
	ErtsProc2PortSigCallback callback;
	ErtsProc2PortSigData data;
    } psig;
} ErtsPortTaskTypeData;

/*
 * A port task.
 *
 * 'u.alive' holds the data of a task that is in use; 'u.release' shares
 * the same storage and is handed to the thread progress "later op"
 * machinery when the task is scheduled for deallocation.
 */
struct ErtsPortTask_ {
    erts_smp_atomic32_t state;	/* One of ERTS_PT_STATE_* */
    ErtsPortTaskType type;	/* Selects the valid member of u.alive.td */
    union {
	struct {
	    ErtsPortTask *next;		/* Next task in the queue */
	    ErtsPortTaskHandle *handle;
	    int flags;			/* ERTS_PT_FLG_* bits */
	    Uint32 ref[ERTS_MAX_REF_NUMBERS];
	    ErtsPortTaskTypeData td;	/* Type specific data */
	} alive;
	ErtsThrPrgrLaterOp release;
    } u;
};

/*
 * List element wrapping a port task handle.
 *
 * NOTE(review): 'u.release' presumably plays the same role as in
 * ErtsPortTask (deferred deallocation in SMP builds) -- the code using
 * it is not visible here, confirm.
 */
struct ErtsPortTaskHandleList_ {
    ErtsPortTaskHandle handle;
    union {
	ErtsPortTaskHandleList *next;	/* Next element in the list */
#ifdef ERTS_SMP
	ErtsThrPrgrLaterOp release;
#endif
    } u;
};

/*
 * Entry in the busy caller table: book-keeping for one caller that has
 * tasks in the busy queue of a port.
 */
typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
struct ErtsPortTaskBusyCaller_ {
    ErtsPortTaskBusyCaller *next;	/* Next entry in the same bucket */
    Eterm caller;			/* Identifier of the caller */
    SWord count;			/* Decremented for each task popped
					   from the busy queue; the entry is
					   removed when it reaches zero */
    ErtsPortTask *last;			/* NOTE(review): presumably the last
					   busy queue task of this caller --
					   confirm against its users */
};

/*
 * Hash table of busy callers. The bucket index of a caller is computed
 * by caller2bix(); collisions are chained through the 'next' field of
 * the entries.
 */
#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
struct ErtsPortTaskBusyCallerTable_ {
    ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
    /*
     * Entry embedded in the table itself. It is never passed to
     * erts_free(); when released, its 'caller' field is reset to
     * am_undefined instead.
     */
    ErtsPortTaskBusyCaller pre_alloc_busy_caller;
};


/* Forward declaration; the definition is not part of this section. */
static void begin_port_cleanup(Port *pp,
			       ErtsPortTask **execq,
			       int *processing_busy_q_p);

/*
 * Allocator for ErtsPortTask structures.
 * NOTE(review): the port_task_alloc()/port_task_free() functions used in
 * this file presumably come from this macro expansion, with 1000 being a
 * pre-allocation size -- confirm against the macro definition.
 */
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
				 ErtsPortTask,
				 1000,
				 ERTS_ALC_T_PORT_TASK)

/*
 * Allocator for ErtsPortTaskBusyCallerTable structures.
 * NOTE(review): presumably the source of busy_caller_table_free() (and a
 * matching alloc function) -- confirm against the macro definition.
 */
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
				 ErtsPortTaskBusyCallerTable,
				 50,
				 ERTS_ALC_T_BUSY_CALLER_TAB)

#ifdef ERTS_SMP
/*
 * Callback with a generic signature, passed to the thread progress
 * "later op" machinery by schedule_port_task_free(). 'vptp' is the
 * port task to hand back to the port task allocator.
 */
static void
call_port_task_free(void *vptp)
{
    ErtsPortTask *task = (ErtsPortTask *) vptp;

    port_task_free(task);
}
#endif

/*
 * Release the port task 'ptp'.
 *
 * Without SMP support the task is freed immediately. With SMP support
 * the deallocation is handed to the thread progress "later op"
 * machinery, using the 'u.release' member of the task itself as the
 * later-op record.
 */
static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
#ifndef ERTS_SMP
    port_task_free(ptp);
#else
    ErtsThrPrgrLaterOp *later_op = &ptp->u.release;

    erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
					    (void *) ptp,
					    later_op,
					    sizeof(ErtsPortTask));
#endif
}

/*
 * Map a pointer to the proc-to-port signal data embedded in a port task
 * back to the port task that contains it.
 *
 * 'sigdp' must point at the u.alive.td.psig.data member of an
 * ErtsPortTask of type ERTS_PORT_TASK_PROC_SIG.
 */
static ERTS_INLINE ErtsPortTask *
p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
{
    /* Step back from the embedded member to the start of the task. */
    ErtsPortTask *task = (ErtsPortTask *)
	(((char *) sigdp) - offsetof(ErtsPortTask, u.alive.td.psig.data));

    ASSERT(task->type == ERTS_PORT_TASK_PROC_SIG);
    return task;
}

/*
 * Initialize 'ptp' as a proc-to-port signal task in the SCHEDULED state
 * with the ERTS_PT_FLG_SIG_DEP flag set.
 *
 * Returns a pointer to the signal data embedded in the task.
 */
static ERTS_INLINE ErtsProc2PortSigData *
p2p_sig_data_init(ErtsPortTask *ptp)
{
    ErtsProc2PortSigData *sig_data = &ptp->u.alive.td.psig.data;

    ptp->type = ERTS_PORT_TASK_PROC_SIG;
    ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
    erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

    /* The reverse mapping must lead back to this very task. */
    ASSERT(ptp == p2p_sig_data_to_task(sig_data));

    return sig_data;
}

/*
 * Allocate a port task from the port task allocator and initialize it
 * as a proc-to-port signal task.
 *
 * Returns a pointer to the signal data embedded in the new task; release
 * it with erts_port_task_free_p2p_sig_data().
 */
ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data(void)
{
    return p2p_sig_data_init(port_task_alloc());
}

/*
 * Allocate a proc-to-port signal task followed by 'extra' bytes of
 * caller owned storage in the same memory block.
 *
 * '*extra_ptr' is set to the first byte following the task structure.
 * Returns a pointer to the signal data embedded in the new task.
 */
ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr)
{
    ErtsPortTask *task;

    task = erts_alloc(ERTS_ALC_T_PORT_TASK, sizeof(ErtsPortTask) + extra);

    /* The extra area starts directly after the task structure. */
    *extra_ptr = task + 1;

    return p2p_sig_data_init(task);
}

/*
 * Release the port task that contains the proc-to-port signal data
 * 'sigdp'.
 */
void
erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp)
{
    ErtsPortTask *task = p2p_sig_data_to_task(sigdp);

    schedule_port_task_free(task);
}

/*
 * Return the caller stored in the signal data of the proc-to-port
 * signal task 'ptp'. The caller is expected to be an internal pid or
 * an internal port identifier.
 */
static ERTS_INLINE Eterm
task_caller(ErtsPortTask *ptp)
{
    Eterm from;

    ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);

    from = ptp->u.alive.td.psig.data.caller;
    ASSERT(is_internal_pid(from) || is_internal_port(from));

    return from;
}

/*
 * Busy queue management
 */

/*
 * Compute the busy caller table bucket index for 'caller': the data
 * part of the identifier modulo the number of buckets.
 */
static ERTS_INLINE int
caller2bix(Eterm caller)
{
    int bucket_ix;

    ASSERT(is_internal_pid(caller) || is_internal_port(caller));

    bucket_ix = (int) (_GET_PID_DATA(caller)
		       % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
    return bucket_ix;
}


/*
 * Account for a task that has been taken off the busy queue: decrement
 * the task count of its caller in the busy caller table and unlink the
 * caller entry when the count reaches zero. When 'last' is set, the busy
 * queue is torn down as well (flag cleared, table freed, pointers reset).
 */
static void
popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    int bix = caller2bix(caller);
    ErtsPortTaskBusyCaller **linkp;
    ErtsPortTaskBusyCaller *entry;

    ASSERT(is_internal_pid(caller));
    ASSERT(tabp);

    /* Find the caller's entry, remembering the link that points at it. */
    linkp = &tabp->bucket[bix];
    entry = *linkp;
    ASSERT(entry);
    while (entry->caller != caller) {
	linkp = &entry->next;
	entry = *linkp;
	ASSERT(entry);
    }

    ASSERT(entry->count > 0);
    entry->count -= 1;
    if (entry->count != 0) {
	/* The caller still has queued tasks, so this cannot be the last one. */
	ASSERT(!last);
	return;
    }

    /* No tasks left for this caller; drop its entry from the bucket. */
    *linkp = entry->next;
    if (entry != &tabp->pre_alloc_busy_caller)
	erts_free(ERTS_ALC_T_BUSY_CALLER, entry);
    else
	entry->caller = am_undefined; /* embedded entry is available again */

    if (last) {
#ifdef DEBUG
	erts_aint32_t flags =
#endif
	    erts_smp_atomic32_read_band_nob(
		&pp->sched.flags,
		~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	    ASSERT(!tabp->bucket[bix]);
	}
#endif
	busy_caller_table_free(tabp);
	pp->sched.taskq.local.busy.first = NULL;
	pp->sched.taskq.local.busy.last = NULL;
	pp->sched.taskq.local.busy.table = NULL;
    }
}

/*
 * Append a task that has to wait for the port to become non-busy to the
 * tail of the port's busy queue, and record it in the busy caller table.
 * The first task put on an empty busy queue also sets
 * ERTS_PTS_FLG_HAVE_BUSY_TASKS and allocates and initializes the table.
 */
static void
busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    ErtsPortTaskBusyCaller *entry;
    int bix;

    ASSERT(is_internal_pid(caller));
    /* The port is busy and this kind of task has to wait until it is not. */
    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);

    ptp->u.alive.next = NULL;
    if (!pp->sched.taskq.local.busy.last) {
	/* Busy queue is empty: set it up from scratch. */
	int i;
#ifdef DEBUG
	erts_aint32_t flags;
#endif
	pp->sched.taskq.local.busy.first = ptp;

#ifdef DEBUG
	flags =
#endif
	    erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
					   ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));

	ASSERT(!tabp);

	tabp = busy_caller_table_alloc();
	pp->sched.taskq.local.busy.table = tabp;
	tabp->pre_alloc_busy_caller.caller = am_undefined;
	for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
	    tabp->bucket[i] = NULL;
    }
    else {
	ASSERT(pp->sched.taskq.local.busy.first);
	pp->sched.taskq.local.busy.last->u.alive.next = ptp;
    }
    pp->sched.taskq.local.busy.last = ptp;

    bix = caller2bix(caller);
    ASSERT(tabp);

    /* Look for an existing entry for this caller. */
    for (entry = tabp->bucket[bix]; entry; entry = entry->next) {
	if (entry->caller == caller)
	    break;
    }

    if (entry) {
	entry->count++;
    }
    else {
	/* Use the entry embedded in the table if it is free. */
	if (tabp->pre_alloc_busy_caller.caller != am_undefined)
	    entry = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
			       sizeof(ErtsPortTaskBusyCaller));
	else
	    entry = &tabp->pre_alloc_busy_caller;
	entry->next = tabp->bucket[bix];
	entry->count = 1;
	entry->caller = caller;
	tabp->bucket[bix] = entry;
    }

    /* This task is now the caller's last task in the busy queue. */
    entry->last = ptp;
}

/*
 * Check whether a signal-order dependent task has to go into the busy
 * queue because its caller already has tasks there.
 *
 * Returns 1 if the task was inserted into the busy queue (directly after
 * the caller's last task in that queue); otherwise 0.
 */
static ERTS_INLINE int
check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTaskBusyCaller *entry;
    ErtsPortTask *prev_task;
    ErtsPortTask *successor;
    Eterm caller;

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
    ASSERT(pp->sched.taskq.local.busy.last);
    ASSERT(tabp);

    /*
     * Either the port is not busy, or this task does not wait on a busy
     * port. The signal ordering requirements may still make it depend on
     * tasks already in the busy queue.
     */

    caller = task_caller(ptp);
    for (entry = tabp->bucket[caller2bix(caller)]; entry; entry = entry->next) {
	if (entry->caller == caller)
	    break;
    }

    if (!entry)
	return 0; /* nothing from this caller in the busy queue */

    /*
     * The caller has tasks in the busy queue that this task depends on;
     * link this task in right behind the caller's last one.
     */
    prev_task = entry->last;
    successor = prev_task->u.alive.next;

    entry->count++;
    ptp->u.alive.next = successor;
    if (!successor) {
	/* Inserting at the tail of the busy queue. */
	ASSERT(pp->sched.taskq.local.busy.last == prev_task);
	pp->sched.taskq.local.busy.last = ptp;
    }
    prev_task->u.alive.next = ptp;
    entry->last = ptp;

    return 1;
}

/*
 * Move the leading run of tasks in the busy queue that are not flagged
 * ERTS_PT_FLG_WAIT_BUSY to the front of the ordinary local queue.
 *
 * For every moved task the caller's count in the busy caller table is
 * decremented, and the caller entry is removed when the count reaches
 * zero. If the busy queue becomes empty, ERTS_PTS_FLG_HAVE_BUSY_TASKS is
 * cleared and the busy caller table is freed.
 *
 * The first task in the busy queue must exist and must not be flagged
 * ERTS_PT_FLG_WAIT_BUSY (asserted).
 */
static void
no_sig_dep_move_from_busyq(Port *pp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *first_ptp, *last_ptp, *ptp;
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;

    /*
     * Move tasks at the head of the busy queue that no longer
     * have any dependencies to busy wait tasks into the ordinary
     * queue.
     */

    first_ptp = ptp = pp->sched.taskq.local.busy.first;

    ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
    ASSERT(tabp);

    do {
	Eterm caller = task_caller(ptp);

	/* Reuse the entry from the previous task when the caller is the same. */
	if (!bcp || bcp->caller != caller) {
	    int bix = caller2bix(caller);

	    prev_bcpp = &tabp->bucket[bix];
	    bcp = tabp->bucket[bix];
	    ASSERT(bcp);
	    while (bcp->caller != caller) {
		prev_bcpp = &bcp->next;
		bcp = bcp->next;
		/* Check the entry we just advanced to, before it is used. */
		ASSERT(bcp);
	    }
	}

	ASSERT(bcp->caller == caller);
	ASSERT(bcp->count > 0);

	if (--bcp->count == 0) {
	    *prev_bcpp = bcp->next;
	    if (bcp == &tabp->pre_alloc_busy_caller)
		bcp->caller = am_undefined;
	    else
		erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
	    /*
	     * The entry is gone (possibly freed). Forget it so that the
	     * next iteration does a fresh lookup instead of reading
	     * 'bcp->caller' from released memory.
	     */
	    bcp = NULL;
	}

	last_ptp = ptp;
	ptp = ptp->u.alive.next;
    } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));

    /* What follows the moved run (if anything) is the new busy queue head. */
    pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
    if (!pp->sched.taskq.local.busy.first) {
#ifdef DEBUG
	int bix;
	erts_aint32_t flags =
#endif
	    erts_smp_atomic32_read_band_nob(
		&pp->sched.flags,
		~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	    ASSERT(!tabp->bucket[bix]);
	}
#endif
	busy_caller_table_free(tabp);
	pp->sched.taskq.local.busy.last = NULL;
	pp->sched.taskq.local.busy.table = NULL;
    }
    /* Splice the moved run in front of the ordinary local queue. */
    last_ptp->u.alive.next = pp->sched.taskq.local.first;
    pp->sched.taskq.local.first = first_ptp;
}

#ifdef ERTS_HARD_DEBUG_TASK_QUEUES

/*
 * Debug consistency check of a port's busy queue against its busy caller
 * table (only compiled with ERTS_HARD_DEBUG_TASK_QUEUES).
 *
 * 'execq' is the queue currently being executed; 'processing_busy_queue'
 * tells whether that queue is the busy queue or the ordinary queue. The
 * function only asserts; it does not modify any state.
 */
static void
chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
{
    Sint tot_count, tot_table_count;
    int bix;
    ErtsPortTask *ptp, *last;
    /* Head of the busy queue, and head of the non-busy queue, as seen now. */
    ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
    ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTaskBusyCaller *bcp;

    /* An empty busy queue implies no table, no tail and no flag. */
    if (!first) {
	ASSERT(!tabp);
	ASSERT(!pp->sched.taskq.local.busy.last);
	ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
	return;
    }

    ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
    ASSERT(tabp);

    /*
     * Walk the busy queue one caller run at a time: starting at 'ptp',
     * every task up to and including the caller's 'last' task must have
     * the same caller, and the run length must equal the caller's count.
     */
    tot_count = 0;
    ptp = first;
    while (ptp) {
	Sint count = 0;
	Eterm caller = task_caller(ptp);
	int bix = caller2bix(caller);
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    if (bcp->caller == caller)
		break;
	ASSERT(bcp && bcp->caller == caller);

	ASSERT(bcp->last);
	while (1) {
	    ErtsPortTask *ptp2;

	    ASSERT(caller == task_caller(ptp));
	    count++;
	    tot_count++;
	    last = ptp;

	    /* A busy queue task must not also be in the non-busy queue. */
	    for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
		ASSERT(ptp != ptp2);
	    }

	    if (ptp == bcp->last)
		break;
	    ptp = ptp->u.alive.next;
	}

	ASSERT(count == bcp->count);
	ptp = ptp->u.alive.next;
    }

    /* The table's counts must add up to the number of queued tasks. */
    tot_table_count = 0;
    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    tot_table_count += bcp->count;
    }

    ASSERT(tot_count == tot_table_count);

    /* The last task visited must be the recorded tail of the busy queue. */
    ASSERT(last == pp->sched.taskq.local.busy.last);
}

#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */

/*
 * Task handle manipulation.
 */

/*
 * Clear a task handle so that it no longer refers to any task
 * (stores NULL using the release-barrier atomic set).
 */
static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
    erts_aint_t no_task = (erts_aint_t) NULL;

    erts_smp_atomic_set_relb(pthp, no_task);
}

/*
 * Read the task a handle currently refers to (acquire-barrier atomic
 * read); the result is NULL for a handle that has been reset.
 */
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
    ErtsPortTask *task = (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp);

    return task;
}

/*
 * Clear the handle attached to a task, if the task has one.
 */
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
    if (!ptp->u.alive.handle)
	return;

    ASSERT(ptp == handle2task(ptp->u.alive.handle));
    reset_port_task_handle(ptp->u.alive.handle);
}

/*
 * For a task that has a handle: call
 * erts_io_notify_port_task_executed() on the handle and then clear it.
 */
static ERTS_INLINE void
reset_executed_io_task_handle(ErtsPortTask *ptp)
{
    if (!ptp->u.alive.handle)
	return;

    ASSERT(ptp == handle2task(ptp->u.alive.handle));
    erts_io_notify_port_task_executed(ptp->u.alive.handle);
    reset_port_task_handle(ptp->u.alive.handle);
}

/*
 * Attach a handle to a task: the task remembers the handle and, when a
 * handle is given, the handle is made to refer to the task.
 */
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->u.alive.handle = pthp;
    if (!pthp)
	return;

    erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
    ASSERT(ptp == handle2task(ptp->u.alive.handle));
}

/*
 * Make a handle refer to a task without recording the handle in the task
 * (the task's own handle field is set to NULL).
 */
static ERTS_INLINE void
set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->u.alive.handle = NULL;
    if (!pthp)
	return;

    /*
     * IMPORTANT! The task either needs to be aborted, or the task handle
     * needs to be detached, before thread progress has been made.
     */
    erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
}


/*
 * Busy port queue management
 */

/*
 * Re-evaluate the busy port queue state under the port task sched lock.
 *
 * If the queue size is below the low limit, both the busy-port-queue flag
 * and the check-unset flag are cleared, and suspended processes are
 * resumed when the busy port queue was the only reason the port was busy.
 * Otherwise only a pending check-unset flag is cleared.
 *
 * Returns a flags value for the caller to continue with. Note that in the
 * "below low limit" case this is the value fetched by the atomic
 * read-and-band, not a value with the cleared bits masked out.
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
			erts_aint32_t flags,
			ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);

    qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);

    if (qsize >= low) {
	if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
	    flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
						     ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	    flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
	}
    }
    else {
	erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
			       | ERTS_PTS_FLG_BUSY_PORT_Q);
	flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
	resume_procs = ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q);
    }

    erts_port_task_sched_unlock(&pp->sched);

    /* Resume outside of the sched lock. */
    if (resume_procs)
	erts_port_resume_procs(pp);

    return flags;
}

/*
 * Subtract 'size' bytes of aborted process-to-port data from the busy
 * port queue size. If the port is flagged busy on its port queue, no
 * check is pending yet, and the new size is below the low limit, the
 * check-unset flag is set.
 */
static ERTS_INLINE void
aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    ASSERT(pp->sched.taskq.bpq);

    /* Busy on the port queue, and no check already requested? */
    flags &= (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT_Q);
    if (flags == ERTS_PTS_FLG_BUSY_PORT_Q
	&& qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
	erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
				       ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
    }
}

/*
 * Subtract 'size' bytes of dequeued process-to-port data from the busy
 * port queue size. If the port is flagged busy on its port queue and the
 * new size is below the low limit, check_unset_busy_port_q() is called.
 */
static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);

    if ((flags & ERTS_PTS_FLG_BUSY_PORT_Q)
	&& qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low)) {
	check_unset_busy_port_q(pp, flags, bpq);
    }
}

/*
 * Add the command data size of an enqueued process-to-port signal to the
 * busy port queue size, and flag the port as busy on its port queue when
 * the size exceeds the high limit.
 *
 * Nothing is done when there is no signal data, no busy port queue, or
 * the signal carries no command data. Returns the (possibly updated)
 * scheduling flags.
 */
static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
		       ErtsProc2PortSigData *sigdp,
		       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    ErlDrvSizeT size, qsz;
    erts_aint_t asize;

    if (!sigdp || !bpq)
	return flags;

    size = erts_proc2port_sig_command_data_size(sigdp);
    if (!size)
	return flags;

    asize = erts_smp_atomic_add_read_acqb(&bpq->size, (erts_aint_t) size);
    qsz = (ErlDrvSizeT) asize;

    ASSERT(qsz - size < qsz);

    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
	flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
						ERTS_PTS_FLG_BUSY_PORT_Q);
	flags |= ERTS_PTS_FLG_BUSY_PORT_Q;

	/* Re-read the size; it may already be below the low limit. */
	qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
	if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
	    flags = (erts_smp_atomic32_read_bor_relb(
			 &pp->sched.flags,
			 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
	    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
	}
    }
    ASSERT(!(flags & ERTS_PTS_FLG_EXIT));

    return flags;
}

/*
 * erl_drv_busy_msgq_limits() is called by drivers either reading or
 * writing the limits.
 *
 * A limit of zero is interpreted as a read only request (using a
 * limit of zero would not be useful). Other values are interpreted
 * as a write-read request.
 *
 * 'lowp' and 'highp' may each be NULL; a NULL pointer is treated as a
 * read only request for that limit and nothing is written back through it.
 * On return, each non-NULL pointer holds the resulting limit, or
 * ERL_DRV_BUSY_MSGQ_DISABLED if the port is invalid, has no busy port
 * queue, or the feature was disabled by this call.
 */

void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
    Port *pp = erts_drvport2port(dport);
    ErtsPortTaskBusyPortQ *bpq;
    int written = 0, resume_procs = 0;
    ErlDrvSizeT low, high;

    /* Invalid port, or busy msgq feature not enabled on this port. */
    if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
	if (lowp)
	    *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
	if (highp)
	    *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
	return;
    }

    low = lowp ? *lowp : 0;
    high = highp ? *highp : 0;

    erts_port_task_sched_lock(&pp->sched);

    if (low == ERL_DRV_BUSY_MSGQ_DISABLED
	|| high == ERL_DRV_BUSY_MSGQ_DISABLED) {
	/* Disable busy msgq feature */
	erts_aint32_t flags;
	pp->sched.taskq.bpq = NULL;
	flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
	/* Resume only if the busy port queue was the sole busy reason. */
	if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
	    resume_procs = 1;
    }
    else {

	if (!low)
	    low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
	else {
	    /* Writing low: raise the stored high limit if it is below it. */
	    if (bpq->high < low)
		bpq->high = low;
	    erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    written = 1;
	}
    
	if (!high)
	    high = bpq->high;
	else {
	    /* Writing high: lower the low limit if it is above it. */
	    if (low > high) {
		low = high;
		erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    }
	    bpq->high = high;
	    written = 1;
	}

	/* Limits changed: bring the flags in line with the current size. */
	if (written) {
	    ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
	    if (size > high)
		erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_BUSY_PORT_Q);
	    else if (size < low)
		erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	}
    }

    erts_port_task_sched_unlock(&pp->sched);

    /* Done after the sched lock has been released. */
    if (resume_procs)
	erts_port_resume_procs(pp);
    if (lowp)
	*lowp = low;
    if (highp)
	*highp = high;
}

/*
 * No-suspend handles.
 */

#ifdef ERTS_SMP
/*
 * Free one handle list element. The 'void *' argument matches how this
 * function is passed to erts_schedule_thr_prgr_later_cleanup_op().
 */
static void
free_port_task_handle_list(void *vpthlp)
{
    void *elem = vpthlp;

    erts_free(ERTS_ALC_T_PT_HNDL_LIST, elem);
}
#endif

/*
 * Release a handle list element.
 *
 * Without ERTS_SMP the element is freed right away. With ERTS_SMP the
 * free is handed to erts_schedule_thr_prgr_later_cleanup_op(), which is
 * given free_port_task_handle_list as the operation to run.
 */
static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
{
#ifndef ERTS_SMP
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
#else
    erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
					    (void *) pthlp,
					    &pthlp->u.release,
					    sizeof(ErtsPortTaskHandleList));
#endif
}

/*
 * Abort a nosuspend process-to-port signal by invoking its callback with
 * ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND. When 'bpq_data' is set, the size of
 * the signal's command data (read before the callback runs) is afterwards
 * subtracted from the busy port queue.
 */
static ERTS_INLINE void
abort_nosuspend_task(Port *pp,
		     ErtsPortTaskType type,
		     ErtsPortTaskTypeData *tdp,
		     int bpq_data)
{
    ErlDrvSizeT size = 0;

    ASSERT(type == ERTS_PORT_TASK_PROC_SIG);

    /* The size has to be fetched before the callback gets the data. */
    if (bpq_data)
	size = erts_proc2port_sig_command_data_size(&tdp->psig.data);

    tdp->psig.callback(NULL,
		       ERTS_PORT_SFLG_INVALID,
		       ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
		       &tdp->psig.data);

    if (bpq_data)
	aborted_proc2port_data(pp, size);
}

/*
 * Detach the leading run of nosuspend handles that are no longer
 * scheduled from the port's nosuspend list and return it as a
 * NULL-terminated list (NULL if the first handle is still scheduled).
 * Clears ERTS_PTS_FLG_HAVE_NS_TASKS when the port's list becomes empty.
 *
 * The port task sched lock must be held by the caller.
 */
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
{
    ErtsPortTaskHandleList *head, *scan, *tail = NULL;

    ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));

    head = pp->sched.taskq.local.busy.nosuspend;

    /* Find the last element of the leading non-scheduled run. */
    for (scan = head;
	 scan && !erts_port_task_is_scheduled(&scan->handle);
	 scan = scan->u.next) {
	tail = scan;
    }

    if (!tail)
	return NULL;

    /* Cut the run [head .. tail] off the port's list. */
    pp->sched.taskq.local.busy.nosuspend = tail->u.next;
    tail->u.next = NULL;
    if (!pp->sched.taskq.local.busy.nosuspend)
	erts_smp_atomic32_read_band_nob(&pp->sched.flags,
					~ERTS_PTS_FLG_HAVE_NS_TASKS);
    return head;
}

/*
 * Release every element of a NULL-terminated list of nosuspend handles.
 */
static void
free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
{
    ErtsPortTaskHandleList *next;

    for (; free_nshp; free_nshp = next) {
	/* Fetch the successor before the element is handed off. */
	next = free_nshp->u.next;
	schedule_port_task_handle_list_free(free_nshp);
    }
}

/*
 * Port queue operations
 */

/*
 * Append a port to the tail of a run queue's port queue and bump the
 * run queue length. The run queue must be locked by the caller.
 */
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    pp->sched.next = NULL;
    if (!runq->ports.end) {
	/* Empty queue: the port becomes the head as well. */
	ASSERT(!runq->ports.start);
	runq->ports.start = pp;
    }
    else {
	ASSERT(runq->ports.start);
	runq->ports.end->sched.next = pp;
    }
    runq->ports.end = pp;

    ASSERT(runq->ports.start && runq->ports.end);

    erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);

#ifdef ERTS_SMP
    if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
	erts_non_empty_runq(runq);
#endif
}

/*
 * Remove and return the port at the head of a run queue's port queue,
 * or return NULL if the queue is empty. The run queue must be locked by
 * the caller.
 */
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
    Port *head = runq->ports.start;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    if (head) {
	runq->ports.start = head->sched.next;
	if (!runq->ports.start) {
	    /* That was the only port in the queue. */
	    ASSERT(runq->ports.end == head);
	    runq->ports.end = NULL;
	}
	erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
    }
    else {
	ASSERT(!runq->ports.end);
    }

    /* Head and tail are either both set or both NULL. */
    ASSERT(runq->ports.start || !runq->ports.end);
    ASSERT(runq->ports.end || !runq->ports.start);
    return head;
}

/*
 * Task queue operations
 */

/*
 * Append a task to the port's in-queue under the port task sched lock.
 *
 * The enqueue is refused when the port is flagged ERTS_PTS_FLG_EXIT or,
 * for a nosuspend task ('ns_pthlp' non-NULL), when it is flagged
 * ERTS_PTS_FLG_BUSY_PORT. On success a nosuspend handle is pushed on the
 * port's nosuspend list and the signal's data size is accounted for.
 *
 * '*flagsp' receives the scheduling flags as seen/updated here.
 * Returns 1 if the task was enqueued; otherwise 0.
 */
static ERTS_INLINE int
enqueue_task(Port *pp,
	     ErtsPortTask *ptp,
	     ErtsProc2PortSigData *sigdp,
	     ErtsPortTaskHandleList *ns_pthlp,
	     erts_aint32_t *flagsp)

{
    erts_aint32_t flags;
    erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
    int enqueued = 0;

    if (ns_pthlp)
	fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
    ptp->u.alive.next = NULL;

    erts_port_task_sched_lock(&pp->sched);

    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & fail_flags)) {
	if (ns_pthlp) {
	    ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
	    pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
	}

	if (!pp->sched.taskq.in.last) {
	    ASSERT(!pp->sched.taskq.in.first);

	    pp->sched.taskq.in.first = ptp;
	}
	else {
	    ASSERT(pp->sched.taskq.in.first);
	    ASSERT(!pp->sched.taskq.in.last->u.alive.next);

	    pp->sched.taskq.in.last->u.alive.next = ptp;
	}
	pp->sched.taskq.in.last = ptp;

	flags = enqueue_proc2port_data(pp, sigdp, flags);
	enqueued = 1;
    }

    erts_port_task_sched_unlock(&pp->sched);

    *flagsp = flags;
    return enqueued;
}

/*
 * Prepare a port for task execution: pick the queue to execute from
 * (the busy queue only if it is non-empty and the port is not flagged
 * ERTS_PTS_FLG_BUSY_PORT) and atomically swap ERTS_PTS_FLG_IN_RUNQ for
 * ERTS_PTS_FLG_EXEC in the scheduling flags.
 */
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
    erts_aint32_t exp, desired;

    if (pp->sched.taskq.local.busy.first && !(act & ERTS_PTS_FLG_BUSY_PORT)) {
	*execqp = pp->sched.taskq.local.busy.first;
	*processing_busy_q_p = 1;
    }
    else {
	*execqp = pp->sched.taskq.local.first;
	*processing_busy_q_p = 0;
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    /* Retry until the flags were updated from the value we based it on. */
    do {
	exp = act;
	desired = (exp & ~ERTS_PTS_FLG_IN_RUNQ) | ERTS_PTS_FLG_EXEC;

	act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, desired, exp);

	ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);
    } while (act != exp);
}

/*
 * finalize_exec() stores the remaining execution queue back into the
 * port and clears ERTS_PTS_FLG_EXEC, setting ERTS_PTS_FLG_IN_RUNQ when
 * the port still has tasks.
 *
 * The return value is != 0 if the port should remain active.
 */
static ERTS_INLINE int
finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
    erts_aint32_t act, exp, desired;
    unsigned int prof_runnable_ports;

    if (processing_busy_q) {
	pp->sched.taskq.local.busy.first = *execq;
	ASSERT(*execq);
    }
    else {
	pp->sched.taskq.local.first = *execq;
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);

    *execq = NULL;

    act = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
	act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);

    /* With runnable-ports profiling on, the flag update is done locked. */
    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
	erts_port_task_sched_lock(&pp->sched);

    do {
	exp = act;

	desired = exp & ~ERTS_PTS_FLG_EXEC;
	if (exp & ERTS_PTS_FLG_HAVE_TASKS)
	    desired |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, desired, exp);

	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));
    } while (act != exp);

    /* trace port scheduling, out */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS))
	trace_sched_ports(pp, am_out);

    if (prof_runnable_ports) {
	if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS)))
	    profile_runnable_port(pp, am_inactive);
	erts_port_task_sched_unlock(&pp->sched);
    }

    return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}

/*
 * Decide which local queue to execute from next, switching between the
 * ordinary queue and the busy queue depending on ERTS_PTS_FLG_BUSY_PORT.
 *
 * '*execqp' and '*processing_busy_q_p' are updated when a switch is made.
 * Returns the scheduling flags the decision was based on.
 */
static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);

    if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
	flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    if (!(flags & ERTS_PTS_FLG_BUSY_PORT)) {
	/* Not busy: prefer the busy queue if it has anything in it. */
	if (pp->sched.taskq.local.busy.first && !*processing_busy_q_p) {
	    pp->sched.taskq.local.first = *execqp;
	    *execqp = pp->sched.taskq.local.busy.first;
	    *processing_busy_q_p = 1;

	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
	}

	return flags;
    }

    /* Busy: leave the busy queue (if we were in it) for the ordinary one. */
    if (*processing_busy_q_p) {
	ErtsPortTask *head = *execqp;

	pp->sched.taskq.local.busy.first = head;
	if (!head)
	    pp->sched.taskq.local.busy.last = NULL;
	else if (!(head->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
	    no_sig_dep_move_from_busyq(pp);

	*execqp = pp->sched.taskq.local.first;
	*processing_busy_q_p = 0;

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
    }

    return flags;
}

/*
 * check_task_for_exec() decides whether the task 'ptp', just taken from
 * the current execution queue, may run now. While the ordinary queue is
 * being processed, the task may instead be moved to the busy queue.
 * While the busy queue is being processed, the busy queue bookkeeping
 * for the task is updated.
 *
 * Returns a value !0 if the task is ok to execute; otherwise 0.
 */
static ERTS_INLINE int
check_task_for_exec(Port *pp,
		    erts_aint32_t flags,
		    ErtsPortTask **execqp,
		    int *processing_busy_q_p,
		    ErtsPortTask *ptp)
{

    if (*processing_busy_q_p) {
	/* Processing busy queue */

	ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));

	ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

	popped_from_busy_queue(pp, ptp, !*execqp);

	if (!*execqp) {
	    /* Busy queue drained; continue with the ordinary queue. */
	    *execqp = pp->sched.taskq.local.first;
	    *processing_busy_q_p = 0;
	}

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	return 1;
    }

    /* Processing normal queue */

    ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

    if ((ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)
	&& (flags & ERTS_PTS_FLG_BUSY_PORT)) {

	busy_wait_move_to_busy_queue(pp, ptp);
	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	return 0;
    }

    if ((ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)
	&& pp->sched.taskq.local.busy.last) {

	/* Runnable only if it was not moved into the busy queue. */
	int moved = check_sig_dep_move_to_busy_queue(pp, ptp);
	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	return !moved;
    }

    return 1;
}

/*
 * Take over the whole in-queue of a port under the port task sched lock.
 *
 * The first task of the in-queue is returned and '*execqp' is set to the
 * rest of it. If the in-queue is empty, ERTS_PTS_FLG_HAVE_TASKS is
 * cleared and NULL is returned ('*execqp' is left untouched). Nosuspend
 * handles that can be released are collected under the lock and released
 * after it has been dropped.
 */
static ErtsPortTask *
fetch_in_queue(Port *pp, ErtsPortTask **execqp)
{
    ErtsPortTaskHandleList *free_nshp = NULL;
    ErtsPortTask *head;

    erts_port_task_sched_lock(&pp->sched);

    head = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;

    if (!head)
	erts_smp_atomic32_read_band_nob(&pp->sched.flags,
					~ERTS_PTS_FLG_HAVE_TASKS);
    else
	*execqp = head->u.alive.next;

    if (pp->sched.taskq.local.busy.nosuspend)
	free_nshp = get_free_nosuspend_handles(pp);

    erts_port_task_sched_unlock(&pp->sched);

    if (free_nshp)
	free_nosuspend_handles(free_nshp);

    return head;
}

/*
 * Pick the next task to execute on a port.
 *
 * Tasks are taken from the current execution queue, which is refilled
 * from the in-queue when it runs dry. Tasks that
 * check_task_for_exec() rejects are skipped. Returns NULL when there is
 * nothing left to fetch.
 */
static ERTS_INLINE ErtsPortTask *
select_task_for_exec(Port *pp,
		     ErtsPortTask **execqp,
		     int *processing_busy_q_p)
{
    erts_aint32_t flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
    ErtsPortTask *candidate;

    for (;;) {
	candidate = *execqp;
	if (!candidate) {
	    candidate = fetch_in_queue(pp, execqp);
	    if (!candidate)
		return NULL;
	}
	else {
	    *execqp = candidate->u.alive.next;
	}

	if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, candidate))
	    return candidate;
    }
}

/*
 * Cut time slice
 */

/*
 * Charge the port with 'percent' percent of a time slice.
 *
 * 'percent' is clamped to the range 1..100. Returns -1 for an invalid
 * port, 1 if the port's reduction count has reached CONTEXT_REDS (the
 * count is then capped at CONTEXT_REDS), and 0 otherwise.
 */
int
erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
{
    Port *pp = erts_drvport2port(dprt);
    int clamped = percent;

    if (pp == ERTS_INVALID_ERL_DRV_PORT)
	return -1;

    if (clamped < 1)
	clamped = 1;
    else if (100 < clamped)
	clamped = 100;

    /* One percent of a time slice, rounded up, times the percentage. */
    pp->reds += clamped*((CONTEXT_REDS+99)/100);
    if (pp->reds >= CONTEXT_REDS) {
	pp->reds = CONTEXT_REDS;
	return 1;
    }
    return 0;
}

/*
 * Detach a temporary task handle by resetting it so that it no longer
 * refers to a task. The lock-checker assertion requires that the calling
 * thread is currently delaying thread progress.
 */
void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
    ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
    reset_port_task_handle(pthp);
}

/*
 * Abort a scheduled task.
 */

/*
 * Abort the scheduled task that 'pthp' refers to.
 *
 * The task's state is atomically changed from scheduled to aborted; when
 * that succeeds the handle is reset and, for I/O task types, the count
 * of outstanding I/O tasks is decremented.
 *
 * Returns 0 if the task was aborted by this call, and -1 if the handle
 * refers to no task or the task was already aborted, executing, or
 * executed.
 */
int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
    int res = -1;
    ErtsPortTask *ptp;
#ifdef ERTS_SMP
    ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
#endif

    ptp = handle2task(pthp);
    if (ptp) {
	erts_aint32_t old_state;

#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
	ERTS_SMP_READ_MEMORY_BARRIER;
	old_state = erts_smp_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(!saved_pthp || saved_pthp == pthp);
	}
#endif

	old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    /* We won the race; the task is ours to abort. */
	    reset_port_task_handle(pthp);

	    switch (ptp->type) {
	    case ERTS_PORT_TASK_INPUT:
	    case ERTS_PORT_TASK_OUTPUT:
	    case ERTS_PORT_TASK_EVENT:
		ASSERT(erts_smp_atomic_read_nob(
			   &erts_port_task_outstanding_io_tasks) > 0);
		erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
		break;
	    default:
		break;
	    }

	    res = 0;
	}
	/* Otherwise: task already aborted, executing, or executed */
    }

#ifdef ERTS_SMP
    erts_thr_progress_unmanaged_continue(dhndl);
#endif

    return res;
}

/*
 * Abort all nosuspend tasks currently registered on a port.
 *
 * The port's nosuspend handle list is detached under the port task sched
 * lock (and ERTS_PTS_FLG_HAVE_NS_TASKS cleared). Each handle is then
 * processed outside the lock: if it still refers to a task in the
 * scheduled state, the task is switched to the aborted state and its
 * callback is invoked through abort_nosuspend_task(). Every list element
 * is released regardless of the outcome.
 */
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
    ErtsPortTaskHandleList *abort_list;
#ifdef ERTS_SMP
    ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
#endif

    /* Take over the whole nosuspend list. */
    erts_port_task_sched_lock(&pp->sched);
    erts_smp_atomic32_read_band_nob(&pp->sched.flags,
				    ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    abort_list = pp->sched.taskq.local.busy.nosuspend;
    pp->sched.taskq.local.busy.nosuspend = NULL;
    erts_port_task_sched_unlock(&pp->sched);

    while (abort_list) {
#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp;
#endif
	ErtsPortTaskType type;
	ErtsPortTaskTypeData td;
	ErtsPortTaskHandle *pthp;
	ErtsPortTask *ptp;
	ErtsPortTaskHandleList *pthlp;
	erts_aint32_t old_state;

	pthlp = abort_list;
	abort_list = pthlp->u.next;

	/*
	 * Each delay taken here is matched by a continue on every path
	 * below, unless the handle says the thread is managed.
	 */
#ifdef ERTS_SMP
	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    dhndl = erts_thr_progress_unmanaged_delay();
#endif

	pthp = &pthlp->handle;
	ptp = handle2task(pthp);
	if (!ptp) {
	    /* Handle no longer refers to a task; just drop the element. */
#ifdef ERTS_SMP
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
#endif
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

#ifdef DEBUG
	saved_pthp = ptp->u.alive.handle;
	ERTS_SMP_READ_MEMORY_BARRIER;
	old_state = erts_smp_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(saved_pthp == pthp);
	}
#endif

	old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state != ERTS_PT_STATE_SCHEDULED) {
	    /* Task already aborted, executing, or executed */
#ifdef ERTS_SMP
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
#endif
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

	reset_port_task_handle(pthp);

	/*
	 * Copy what the abort needs out of the task before the delay is
	 * ended; the callback below is given the copy, not the task.
	 */
	type = ptp->type;
	td = ptp->u.alive.td;

#ifdef ERTS_SMP
	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    erts_thr_progress_unmanaged_continue(dhndl);
#endif
	schedule_port_task_handle_list_free(pthlp);

	abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
    }
}

/*
 * Schedule a task.
 */

/*
 * Schedule a task on the port identified by 'id'.
 *
 * Parameters:
 *   id    - internal port identifier (asserted).
 *   pthp  - optional task handle; must not already refer to a scheduled
 *           task (asserted by the lock checker).
 *   type  - task type; determines the variadic arguments:
 *             ERTS_PORT_TASK_INPUT/OUTPUT: ErlDrvEvent
 *             ERTS_PORT_TASK_EVENT:        ErlDrvEvent, ErlDrvEventData
 *             ERTS_PORT_TASK_PROC_SIG:     ErtsProc2PortSigData *,
 *                                          ErtsProc2PortSigCallback,
 *                                          int (extra task flags)
 *
 * The task is appended to the port's in-queue; if the port is neither in
 * a run queue nor executing, the port is also put on a run queue.
 *
 * Returns 0 when the task was enqueued, or when a nosuspend signal was
 * refused and has been aborted through its callback. Returns -1 when the
 * port could not be looked up or the enqueue was refused for any other
 * reason.
 */
int
erts_port_task_schedule(Eterm id,
			ErtsPortTaskHandle *pthp,
			ErtsPortTaskType type,
			...)
{
    ErtsProc2PortSigData *sigdp = NULL;
    ErtsPortTaskHandleList *ns_pthlp = NULL;
#ifdef ERTS_SMP
    ErtsRunQueue *xrunq;
    ErtsThrPrgrDelayHandle dhndl;
#endif
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp = NULL;
    erts_aint32_t act, add_flags;
    unsigned int prof_runnable_ports;

    ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp));

    ASSERT(is_internal_port(id));

#ifdef ERTS_SMP
    dhndl = erts_thr_progress_unmanaged_delay();
#endif

    pp = erts_port_lookup_raw(id);

#ifdef ERTS_SMP
    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
	/*
	 * Take a reference on the port (if found) before the delay is
	 * ended; it is dropped again on every exit path below.
	 */
	if (pp)
	    erts_port_inc_refc(pp);
	erts_thr_progress_unmanaged_continue(dhndl);
    }
#endif

    if (!pp)
	goto fail;

    if (type != ERTS_PORT_TASK_PROC_SIG) {
	ptp = port_task_alloc();

	ptp->type = type;
	ptp->u.alive.flags = 0;

	erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

	set_handle(ptp, pthp);
    }

    switch (type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
	va_end(argp);
	erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
	break;
    }
    case ERTS_PORT_TASK_EVENT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
	ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData);
	va_end(argp);
	erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
	break;
    }
    case ERTS_PORT_TASK_PROC_SIG: {
	/* The task is the one the signal data is embedded in. */
	va_list argp;
	va_start(argp, type);
	sigdp = va_arg(argp, ErtsProc2PortSigData *);
	ptp = p2p_sig_data_to_task(sigdp);
	ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
	ptp->u.alive.flags |= va_arg(argp, int);
	va_end(argp);
	if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
	    set_tmp_handle(ptp, pthp);
	else {
	    /* Nosuspend tasks get a handle of their own in a list element. */
	    ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
				  sizeof(ErtsPortTaskHandleList));
	    set_handle(ptp, &ns_pthlp->handle);
	}
	break;
    }
    default:
	break;
    }

    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
	reset_handle(ptp);
	/* A nosuspend task refused for a reason other than port exit. */
	if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
	    goto abort_nosuspend;
	else
	    goto fail;
    }

    add_flags = ERTS_PTS_FLG_HAVE_TASKS;
    if (ns_pthlp)
	add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;

    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
	erts_port_task_sched_lock(&pp->sched);

    while (1) {
	erts_aint32_t new, exp;

	if ((act & add_flags) == add_flags
	    && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    goto done; /* Done */

	new = exp = act;
	new |= add_flags;
	if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	if (exp == act) {
	    if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
		break; /* Need to enqueue port */
	    goto done; /* Done */
	}

	if (act & ERTS_PTS_FLG_EXIT)
	    goto done; /* Died after our task insert... */
    }

    if (prof_runnable_ports) {
	if (!(act & ERTS_PTS_FLG_EXEC_IMM))
	    profile_runnable_port(pp, am_active);
	erts_port_task_sched_unlock(&pp->sched);
	/* Already unlocked; keeps 'done' from unlocking a second time. */
	prof_runnable_ports = 0;
    }

    /* Enqueue port on run-queue */

    runq = erts_port_runq(pp);
    if (!runq)
	ERTS_INTERNAL_ERROR("Missing run-queue");

#ifdef ERTS_SMP
    xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
    ERTS_SMP_LC_ASSERT(runq != xrunq);
    ERTS_SMP_LC_VERIFY_RQ(runq, pp);
    if (xrunq) {
	/* Emigrate port ... */
	erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
	erts_smp_runq_unlock(runq);
	runq = erts_port_runq(pp);
	if (!runq)
	    ERTS_INTERNAL_ERROR("Missing run-queue");
    }
#endif

    enqueue_port(runq, pp);

    erts_smp_runq_unlock(runq);

    erts_smp_notify_inc_runq(runq);

done:

    if (prof_runnable_ports)
	erts_port_task_sched_unlock(&pp->sched);

#ifdef ERTS_SMP
    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);
#endif

    return 0;

abort_nosuspend:

#ifdef ERTS_SMP
    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);
#endif

    abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);

    ASSERT(ns_pthlp);
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
    if (ptp)
	port_task_free(ptp);

    return 0;

fail:

#ifdef ERTS_SMP
    /*
     * 'pp' is NULL here when the port lookup failed; in that case no
     * reference was taken above and there is nothing to drop.
     */
    if (pp && dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);
#endif

    if (ns_pthlp)
	erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    if (ptp)
	port_task_free(ptp);

    return -1;
}

/*
 * Mark a port as free and, unless it is currently in a run-queue or
 * executing, start cleaning it up right away.
 *
 * The port lock must be held by the caller, and the port must not
 * already be in a dead state.
 */
void
erts_port_task_free_port(Port *pp)
{
    ErtsRunQueue *rq;
    erts_aint32_t prev_sched_flags;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

    rq = erts_port_runq(pp);
    if (!rq) {
	ERTS_INTERNAL_ERROR("Missing run-queue");
    }

    /* Flag the scheduling state as exiting; remember the previous flags. */
    erts_port_task_sched_lock(&pp->sched);
    prev_sched_flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
						       ERTS_PTS_FLG_EXIT);
    erts_port_task_sched_unlock(&pp->sched);

    /* Replace the connected/exiting/closing/free bits with just "free". */
    erts_atomic32_read_bset_relb(&pp->state,
				 (ERTS_PORT_SFLG_CONNECTED
				  | ERTS_PORT_SFLG_EXITING
				  | ERTS_PORT_SFLG_CLOSING
				  | ERTS_PORT_SFLG_FREE),
				 ERTS_PORT_SFLG_FREE);

    erts_smp_runq_unlock(rq);

    /*
     * If the port was in a run-queue or executing when the exit flag
     * was set, cleanup is not started from here.
     */
    if (prev_sched_flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) {
	return;
    }

    begin_port_cleanup(pp, NULL, NULL);
}

/*
 * Execute scheduled tasks of a port.
 *
 * erts_port_task_execute() is called by scheduler threads between
 * scheduling of processes. Run-queue lock should be held by caller.
 *
 * One port is popped from 'runq' and its tasks are executed until no
 * more tasks are selected, the port has been marked as free, or the
 * port has consumed CONTEXT_REDS reductions. The run-queue lock is
 * released while the tasks execute and is taken again before
 * returning. While the port is being handled '*curr_port_pp' refers
 * to it; it is reset to NULL once the port has been released.
 *
 * Returns non-zero if erts_port_task_outstanding_io_tasks is non-zero
 * when the function returns; otherwise zero.
 */

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    Port *pp;
    ErtsPortTask *execq; /* Passed to prepare_exec(), select_task_for_exec()
			    and finalize_exec() */
    int processing_busy_q;
    int res = 0;
    int vreds = 0; /* Sum of ERTS_PORT_CALLBACK_VREDS added in the loop;
		      subtracted again before reductions are reported */
    int reds = 0;
    erts_aint_t io_tasks_executed = 0; /* Input/output/event tasks run here */
    int fpe_was_unmasked;
    erts_aint32_t state;
    int active;
    Uint64 start_time = 0; /* Non-zero only while timing a task for the
			      long schedule monitor */
    ErtsSchedulerData *esdp = runq->scheduler;
    ERTS_MSACC_PUSH_STATE_M();

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    pp = pop_port(runq);
    if (!pp) {
	/* No port to execute; 'res' is recomputed at 'done' and 'reds' is 0 */
	res = 0;
	goto done;
    }

    ERTS_SMP_LC_VERIFY_RQ(runq, pp);

    erts_smp_runq_unlock(runq);

    *curr_port_pp = pp;
    
    if (erts_sched_stat.enabled) {
	/* Update execution/migration statistics for the port priority level */
	Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
	int migrated = old && old != esdp->no;

	erts_smp_spin_lock(&erts_sched_stat.lock);
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
	if (migrated) {
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
	}
	erts_smp_spin_unlock(&erts_sched_stat.lock);
    }

    prepare_exec(pp, &execq, &processing_busy_q);

    erts_smp_port_lock(pp);

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
	trace_sched_ports(pp, am_in);
    }

    fpe_was_unmasked = erts_block_fpe();

    state = erts_atomic32_read_nob(&pp->state);
    pp->reds = ERTS_PORT_REDS_EXECUTE;
    ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
    /*
     * Enter the loop at its tail so that the free-state check and the
     * reduction accounting are done before the first task is selected.
     */
    goto begin_handle_tasks;

    while (1) {
	erts_aint32_t task_state;
	ErtsPortTask *ptp;

	ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
	if (!ptp)
	    break;

	/* Claim the task; if it is no longer scheduled it has been aborted */
	task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						   ERTS_PT_STATE_EXECUTING,
						   ERTS_PT_STATE_SCHEDULED);
	if (task_state != ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(task_state == ERTS_PT_STATE_ABORTED);
	    goto aborted_port_task;
	}

	if (erts_system_monitor_long_schedule != 0) {
	    start_time = erts_timestamp_millis();
	}

	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
	ERTS_SMP_CHK_NO_PROC_LOCKS;
	ASSERT(pp->drv_ptr);

	/* Dispatch on task type; 'reds' is set to the cost of the task */
	switch (ptp->type) {
	case ERTS_PORT_TASK_TIMEOUT:
	    reset_handle(ptp);
	    if (!ERTS_PTMR_IS_TIMED_OUT(pp))
		reds = 0;
	    else {
		ERTS_PTMR_CLEAR(pp);
		reds = ERTS_PORT_REDS_TIMEOUT;
		/* The driver timeout callback is only called on a live port */
		if (!(state & ERTS_PORT_SFLGS_DEAD)) {
		    DTRACE_DRIVER(driver_timeout, pp);
		    LTTNG_DRIVER(driver_timeout, pp);
                    if (IS_TRACED_FL(pp, F_TRACE_RECEIVE))
                        trace_port(pp, am_receive, am_timeout);
		    (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
		}
	    }
	    break;
	case ERTS_PORT_TASK_INPUT:
	    reds = ERTS_PORT_REDS_INPUT;
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_input, pp);
            LTTNG_DRIVER(driver_ready_input, pp);
	    /* NOTE some windows drivers use ->ready_input
	       for input and output */
	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
					ptp->u.alive.td.io.event);
	    reset_executed_io_task_handle(ptp);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_OUTPUT:
	    reds = ERTS_PORT_REDS_OUTPUT;
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_output, pp);
            LTTNG_DRIVER(driver_ready_output, pp);
	    (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
					 ptp->u.alive.td.io.event);
	    reset_executed_io_task_handle(ptp);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_EVENT:
	    reds = ERTS_PORT_REDS_EVENT;
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_event, pp);
            LTTNG_DRIVER(driver_event, pp);
	    (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
				  ptp->u.alive.td.io.event,
				  ptp->u.alive.td.io.event_data);
	    reset_executed_io_task_handle(ptp);
	    io_tasks_executed++;
	    break;
	case ERTS_PORT_TASK_PROC_SIG: {
	    ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
	    reset_handle(ptp);
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
	    if (!pp->sched.taskq.bpq)
		reds = ptp->u.alive.td.psig.callback(pp,
						     state,
						     ERTS_PROC2PORT_SIG_EXEC,
						     sigdp);
	    else {
		/*
		 * The size is read before the callback is invoked and
		 * reported as dequeued afterwards.
		 * NOTE(review): presumably the callback may consume the
		 * signal data - confirm.
		 */
		ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
		reds = ptp->u.alive.td.psig.callback(pp,
						     state,
						     ERTS_PROC2PORT_SIG_EXEC,
						     sigdp);
		dequeued_proc2port_data(pp, size);
	    }
	    break;
	}
	case ERTS_PORT_TASK_DIST_CMD:
	    reset_handle(ptp);
	    /* Pass on the reductions left of this port's budget */
	    reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
	    break;
	default:
	    erts_exit(ERTS_ABORT_EXIT,
		     "Invalid port task type: %d\n",
		     (int) ptp->type);
	    break;
	}

	/* May update 'state'; its return value is added to the cost */
	reds += erts_port_driver_callback_epilogue(pp, &state);

	if (start_time != 0) {
	    /* Report the task if it ran longer than the configured limit */
	    Sint64 diff = erts_timestamp_millis() - start_time;
	    if (diff > 0 && (Uint) diff >  erts_system_monitor_long_schedule) {
		monitor_long_schedule_port(pp,ptp->type,(Uint) diff);
	    }
	}
	start_time = 0;

    aborted_port_task:
	schedule_port_task_free(ptp);

    begin_handle_tasks:
	if (state & ERTS_PORT_SFLG_FREE) {
	    /* Port has been freed; abort remaining tasks and stop executing */
	    reds += ERTS_PORT_REDS_FREE;
	    begin_port_cleanup(pp, &execq, &processing_busy_q);

	    break;
	}

	vreds += ERTS_PORT_CALLBACK_VREDS;
	reds += ERTS_PORT_CALLBACK_VREDS;

	pp->reds += reds;
	reds = 0;

	/* Yield when the reduction budget has been used up */
	if (pp->reds >= CONTEXT_REDS)
	    break;
    }

    erts_unblock_fpe(fpe_was_unmasked);
    ERTS_MSACC_POP_STATE_M();


    if (io_tasks_executed) {
	/* Subtract all I/O tasks executed above in one operation */
	ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	       >= io_tasks_executed);
	erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
				 -1*io_tasks_executed);
    }

#ifdef ERTS_SMP
    ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif

    /* Non-zero 'active' means the port is to be put in a run-queue again */
    active = finalize_exec(pp, &execq, processing_busy_q);

    /* Reductions to report: what the port used, minus the virtual ones */
    reds = pp->reds - vreds;

    erts_port_release(pp);

    *curr_port_pp = NULL;

    erts_smp_runq_lock(runq);
 
    if (active) {
#ifdef ERTS_SMP
	ErtsRunQueue *xrunq;
#endif

	ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

#ifdef ERTS_SMP
	xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
	ERTS_SMP_LC_ASSERT(runq != xrunq);
	ERTS_SMP_LC_VERIFY_RQ(runq, pp);
	if (!xrunq) {
#endif
	    enqueue_port(runq, pp);
	    /* No need to notify ourselves about inc in runq. */
#ifdef ERTS_SMP
	}
	else {
	    /* Emigrate port... */
	    erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
	    erts_smp_runq_unlock(runq);

	    /* Re-read the port's run-queue and enqueue the port there */
	    xrunq = erts_port_runq(pp);
	    ASSERT(xrunq);
	    enqueue_port(xrunq, pp);
	    erts_smp_runq_unlock(xrunq);
	    erts_smp_notify_inc_runq(xrunq);

	    /* The caller expects 'runq' to be locked on return */
	    erts_smp_runq_lock(runq);
	}
#endif
    }

 done:
    res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	   != (erts_aint_t) 0);

    runq->scheduler->reductions += reds;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
    ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);

    return res;
}

#ifdef ERTS_SMP
static void
release_port(void *vport)
{
    erts_port_dec_refc((Port *) vport);
}

static void
schedule_release_port(void *vport) {
  Port *pp = (Port*)vport;
  /* This is only used when a port release was ordered from a non-scheduler */
  erts_schedule_thr_prgr_later_op(release_port,
				  (void *) pp,
				  &pp->common.u.release);
}

#endif

/*
 * Begin cleanup of a port that has been marked as free.
 *
 * All tasks remaining in the port's queues are aborted, the busy
 * caller table and any nosuspend handles are released, processes
 * suspended on the port are resumed, and finally the release of the
 * port structure itself is scheduled.
 *
 * The port lock must be held by the caller.
 *
 * pp                   The port to clean up.
 * execqp               NULL when the port is not being executed;
 *                      otherwise a pointer to the queue currently
 *                      being executed. It is set to NULL on return.
 * processing_busy_q_p  Only read when 'execqp' is non-NULL. Non-zero
 *                      if '*execqp' is the local busy queue, zero if
 *                      it is the local queue. It is set to zero on
 *                      return.
 */
static void
begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    int i, max;
    ErtsPortTaskBusyCallerTable *tabp;
    ErtsPortTask *qs[3];
    ErtsPortTaskHandleList *free_nshp = NULL;
    ErtsProcList *plp;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    /*
     * Abort remaining tasks...
     *
     * We want to process queues in the following order in order
     * to preserve signal ordering guarantees:
     *  1. Local busy queue
     *  2. Local queue
     *  3. In queue
     */

    max = 0;
    if (!execqp) {
	if (pp->sched.taskq.local.busy.first)
	    qs[max++] = pp->sched.taskq.local.busy.first;
	if (pp->sched.taskq.local.first)
	    qs[max++] = pp->sched.taskq.local.first;
    }
    else {
	/*
	 * The queue currently being executed is found in '*execqp'
	 * instead of in the port structure.
	 */
	if (*processing_busy_q_p) {
	    if (*execqp)
		qs[max++] = *execqp;
	    if (pp->sched.taskq.local.first)
		qs[max++] = pp->sched.taskq.local.first;
	}
	else {
	    if (pp->sched.taskq.local.busy.first)
		qs[max++] = pp->sched.taskq.local.busy.first;
	    if (*execqp)
		qs[max++] = *execqp;
	}
	*execqp = NULL;
	*processing_busy_q_p = 0;
    }
    pp->sched.taskq.local.busy.first = NULL;
    pp->sched.taskq.local.busy.last = NULL;
    pp->sched.taskq.local.first = NULL;

    /* Free the busy caller table and all entries allocated for it */
    tabp = pp->sched.taskq.local.busy.table;
    if (tabp) {
	int bix;
	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	    ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
	    while (bcp) {
		ErtsPortTaskBusyCaller *free_bcp = bcp;
		bcp = bcp->next;
		/* The pre-allocated entry is part of the table itself */
		if (free_bcp != &tabp->pre_alloc_busy_caller)
		    erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
	    }
	}

	busy_caller_table_free(tabp);
	pp->sched.taskq.local.busy.table = NULL;
    }

    /* Detach the in queue while holding the port task scheduling lock */
    erts_port_task_sched_lock(&pp->sched);
    qs[max] = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    erts_port_task_sched_unlock(&pp->sched);
    if (qs[max])
	max++;

    for (i = 0; i < max; i++) {
	while (1) {
	    erts_aint32_t state;
	    ErtsPortTask *ptp = qs[i];
	    if (!ptp)
		break;

	    qs[i] = ptp->u.alive.next;

	    /* Normal case here is aborted tasks... */
	    state = erts_smp_atomic32_read_nob(&ptp->state);
	    if (state == ERTS_PT_STATE_ABORTED)
		goto aborted_port_task;

	    /* Claim the task so that it cannot be aborted concurrently */
	    state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_EXECUTING,
						  ERTS_PT_STATE_SCHEDULED);
	    if (state != ERTS_PT_STATE_SCHEDULED) {
		ASSERT(state == ERTS_PT_STATE_ABORTED);
		goto aborted_port_task;
	    }

	    reset_handle(ptp);

	    switch (ptp->type) {
	    case ERTS_PORT_TASK_TIMEOUT:
		break;
	    case ERTS_PORT_TASK_INPUT:
		erts_stale_drv_select(pp->common.id,
				      ERTS_Port2ErlDrvPort(pp),
				      ptp->u.alive.td.io.event,
				      DO_READ,
				      1);
		break;
	    case ERTS_PORT_TASK_OUTPUT:
		erts_stale_drv_select(pp->common.id,
				      ERTS_Port2ErlDrvPort(pp),
				      ptp->u.alive.td.io.event,
				      DO_WRITE,
				      1);
		break;
	    case ERTS_PORT_TASK_EVENT:
		erts_stale_drv_select(pp->common.id,
				      ERTS_Port2ErlDrvPort(pp),
				      ptp->u.alive.td.io.event,
				      0,
				      1);
		break;
	    case ERTS_PORT_TASK_DIST_CMD:
		break;
	    case ERTS_PORT_TASK_PROC_SIG: {
		ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
		if (!pp->sched.taskq.bpq)
		    ptp->u.alive.td.psig.callback(NULL,
						  ERTS_PORT_SFLG_INVALID,
						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
						  sigdp);
		else {
		    /*
		     * The size is read before the callback is invoked
		     * and reported as aborted afterwards.
		     */
		    ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
		    ptp->u.alive.td.psig.callback(NULL,
						  ERTS_PORT_SFLG_INVALID,
						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
						  sigdp);
		    aborted_proc2port_data(pp, size);
		}
		break;
	    }
	    default:
		erts_exit(ERTS_ABORT_EXIT,
			 "Invalid port task type: %d\n",
			 (int) ptp->type);
	    }

	aborted_port_task:
	    schedule_port_task_free(ptp);
	}
    }

    erts_smp_atomic32_read_band_nob(&pp->sched.flags,
				    ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
				      |ERTS_PTS_FLG_HAVE_TASKS
				      |ERTS_PTS_FLGS_BUSY));

    erts_port_task_sched_lock(&pp->sched);

    /* Cleanup nosuspend handles... */
    free_nshp = (pp->sched.taskq.local.busy.nosuspend
		 ? get_free_nosuspend_handles(pp)
		 : NULL);
    ASSERT(!pp->sched.taskq.local.busy.nosuspend);

    /* Make sure not to leave any processes suspended on the port... */
    plp = pp->suspended;
    pp->suspended = NULL;

    erts_port_task_sched_unlock(&pp->sched);

    if (free_nshp)
	free_nosuspend_handles(free_nshp);

    if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
	if (DTRACE_ENABLED(process_port_unblocked)) {
	    DTRACE_CHARBUF(port_str, 16);
	    DTRACE_CHARBUF(pid_str, 16);
	    ErtsProcList* plp2;

	    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
	    /*
	     * Fire one probe per process about to be resumed. The
	     * cursor has to be advanced in each iteration; without
	     * that this loop never terminates on a non-empty list.
	     * NOTE(review): relies on the list being NULL terminated
	     * via 'next' after erts_proclist_fetch() - confirm.
	     */
	    for (plp2 = plp; plp2 != NULL; plp2 = plp2->next) {
		erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
		DTRACE2(process_port_unblocked, pid_str, port_str);
	    }
	}
#endif
	erts_resume_processes(plp);
    }

    /*
     * Schedule cleanup of port structure...
     */
#ifdef ERTS_SMP
    /* We might not be a scheduler, e.g. when tracing to a port we are the sys_msg_dispatcher */
    if (!erts_get_scheduler_data()) {
      erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
    } else {
      /* Has to be more or less immediate to release any driver */
      erts_schedule_thr_prgr_later_op(release_port,
				      (void *) pp,
				      &pp->common.u.release);
    }
#else
    pp->cleanup = 1;
#endif
}

#ifdef ERTS_SMP

/*
 * Put the port 'pp' in the run-queue 'rq'.
 *
 * The run-queue must be locked by the caller, must be the run-queue
 * currently assigned to the port, and the port must already be
 * flagged as being in a run-queue.
 */
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));

    ASSERT((ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue) == rq);
    ASSERT(ERTS_PTS_FLG_IN_RUNQ
	   & erts_smp_atomic32_read_nob(&pp->sched.flags));

    enqueue_port(rq, pp);
}

/*
 * Pop a port from the run-queue 'rq'.
 *
 * The run-queue must be locked by the caller. Returns the popped
 * port, or NULL if no port was popped.
 */
Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
    Port *port;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));

    port = pop_port(rq);
    if (port) {
	/* A popped port belongs to this run-queue and is flagged as queued */
	ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&port->run_queue));
	ASSERT(erts_smp_atomic32_read_nob(&port->sched.flags)
	       & ERTS_PTS_FLG_IN_RUNQ);
    }

    return port;
}

#endif

/*
 * Initialize the module.
 */
void
erts_port_task_init(void)
{
    erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
			     (erts_aint_t) 0);
    init_port_task_alloc();
    init_busy_caller_table_alloc();
}