aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_port.h
blob: 67c8c28f3f6b92b79d81e5e77df34316809fe112 (plain) (tree)


















                                                                         


                                  
                                                          


                                                                     

                    

                          
                             



                                        
 



                                      

                                            
                                                           


























                                                                           












                                                                   




































                                                                 
                      




                                                                    
                     

                                


                         
      
                                                         













                                                                                    
                                                            
                                                                




                                                         






                                                                
 





                               

























































                                                                         













                                                                                   
                                       
                                                            
                                            
                                                            
                               
                                                            
                                               

                                                            
                                                                          

                                                              
                                               
                                                            






                                                                         
                                                     



                                                                         
                                                                         


                                                                         

                                 























                                                


                                          
                               
      

                                                  
                                    





                                                        


                                                                    






                                                     
                                                  

               


                                      


      
                                                  

               






                                                         


      











                                                                   



                                
               


                                                              
     
             
      




                             
               


                                                              





                               
               


                                                              
      






                                                                 
                                                         















                                                                                


                                                         


                                   



                                                          




                                                                                  
                                               











                                                                             
                                                                                


                                 


























                                                                      




                                                                    


 
                     































                                                                      



                                           
                        

              


                                                              


                                 

                                                                      
 


















                                                                      
     
 







                                                              
               





































                                                                      

         





                                                       

                       





                    
                                
 



















                                               









                                                                                    













                                             

                                                      

                   




                                                            
                                               
                        
             
                    
                                                

                                                      

                        
               




                             
              


                                 


                                                                     
                    

                                                  
                             
                    

                                                      
                    










                                               




                            



                                                             
                                      













                                                                             






























                                                                          

                                              

                                    

              

























                                                                        




















































                                     




























                                                                          









                                                                



















































                                                                                    
      
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

#ifndef ERL_PORT_TYPE__
#define ERL_PORT_TYPE__
typedef struct _erl_drv_port Port;
typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData;
#endif

#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
#define ERL_PORT_H__

#include "erl_port_task.h"
#include "erl_ptab.h"
#include "erl_thr_progress.h"
#include "erl_trace.h"

#define ERTS_DEFAULT_MAX_PORTS (1 << 16)
#define ERTS_MIN_PORTS 1024

extern int erts_port_synchronous_ops;
extern int erts_port_schedule_all_ops;
extern int erts_port_parallelism;

typedef struct erts_driver_t_ erts_driver_t;

#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1)
#define SMALL_IO_QUEUE 5   /* Number of fixed elements */

typedef struct {
    ErlDrvSizeT size;       /* total size in bytes */

    SysIOVec* v_start;
    SysIOVec* v_end;
    SysIOVec* v_head;
    SysIOVec* v_tail;
    SysIOVec  v_small[SMALL_IO_QUEUE];

    ErlDrvBinary** b_start;
    ErlDrvBinary** b_end;
    ErlDrvBinary** b_head;
    ErlDrvBinary** b_tail;
    ErlDrvBinary*  b_small[SMALL_IO_QUEUE];
} ErlIOQueue;

typedef struct line_buf {  /* Buffer used in line oriented I/O */
    ErlDrvSizeT bufsiz;      /* Size of character buffer */
    ErlDrvSizeT ovlen;       /* Length of overflow data */
    ErlDrvSizeT ovsiz;       /* Actual size of overflow buffer */
    char data[1];            /* Starting point of buffer data,
			      data[0] is a flag indicating an unprocess CR,
			      The rest is the overflow buffer. */
} LineBuf;

/*
 * Items part of erlang:port_info/1 result. Note am_registered_name
 * *need* to be first.
 */

#define ERTS_PORT_INFO_1_ITEMS				\
    {	am_registered_name,	/* Needs to be first */	\
	am_name,					\
	am_links,					\
	am_id,						\
	am_connected,					\
	am_input,					\
	am_output }

/*
 * Port Specific Data.
 *
 * Only use PrtSD for very rarely used data.
 */

#define ERTS_PRTSD_SCHED_ID 0

#define ERTS_PRTSD_SIZE 1

typedef struct {
    void *data[ERTS_PRTSD_SIZE];
} ErtsPrtSD;

#ifdef ERTS_SMP
typedef struct ErtsXPortsList_ ErtsXPortsList;
#endif

/*
 * Port locking:
 *
 * Locking is done either driver specific or port specific. When
 * driver specific locking is used, all instances of the driver,
 * i.e. ports running the driver, share the same lock. When port
 * specific locking is used each instance have its own lock.
 *
 * Most fields in the Port structure are protected by the lock
 * referred to by the 'lock' field. This lock is shared between
 * all ports running the same driver when driver specific locking
 * is used.
 *
 * The 'sched' field is protected by the run queue lock that the
 * port currently is assigned to.
 *
 */

struct _erl_drv_port {
    ErtsPTabElementCommon common; /* *Need* to be first in struct */

    ErtsPortTaskSched sched;
    ErtsPortTaskHandle timeout_task;
#ifdef ERTS_SMP
    erts_mtx_t *lock;
    ErtsXPortsList *xports;
    erts_smp_atomic_t run_queue;
#else
    erts_atomic32_t refc;
    int cleanup;
#endif
    erts_atomic_t connected;	/* A connected process */
    Eterm caller;		/* Current caller. */
    Eterm data;			/* Data associated with port. */
    ErlHeapFragment* bp;	/* Heap fragment holding data (NULL if imm data). */
    Uint bytes_in;		/* Number of bytes read */
    Uint bytes_out;		/* Number of bytes written */

    ErlIOQueue ioq;              /* driver accessible i/o queue */
    DistEntry *dist_entry;       /* Dist entry used in DISTRIBUTION */
    char *name;		         /* String used in the open */
    erts_driver_t* drv_ptr;
    UWord drv_data;
    ErtsProcList *suspended;	 /* List of suspended processes. */
    LineBuf *linebuf;            /* Buffer to hold data not ready for
				    process to get (line oriented I/O)*/
    erts_atomic32_t state;	 /* Status and type flags */
    int control_flags;		 /* Flags for port_control()  */
    ErlDrvPDL port_data_lock;

    ErtsPrtSD *psd;		 /* Port specific data */
};

#define ERTS_PORT_GET_CONNECTED(PRT) \
    ((Eterm) erts_atomic_read_nob(&(PRT)->connected))
#define ERTS_PORT_SET_CONNECTED(PRT, PID) \
    erts_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID))
#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \
    erts_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID))


struct erl_drv_port_data_lock {
    erts_mtx_t mtx;
    erts_atomic_t refc;
    Port *prt;
};

ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);

#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE ErtsRunQueue *
erts_port_runq(Port *prt)
{
#ifdef ERTS_SMP
    ErtsRunQueue *rq1, *rq2;
    rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
    if (!rq1)
	return NULL;
    while (1) {
	erts_smp_runq_lock(rq1);
	rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
	if (rq1 == rq2)
	    return rq1;
	erts_smp_runq_unlock(rq1);
	rq1 = rq2;
	if (!rq1)
	    return NULL;
    }
#else
    return ERTS_RUNQ_IX(0);
#endif
}

#endif


ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix);
ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new);

#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE void *
erts_prtsd_get(Port *prt, int ix)
{
    return prt->psd ? prt->psd->data[ix] : NULL;
}

ERTS_GLB_INLINE void *
erts_prtsd_set(Port *prt, int ix, void *data)
{
    if (prt->psd) {
	void *old = prt->psd->data[ix];
	prt->psd->data[ix] = data;
	return old;
    }
    else {
	prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD));
	prt->psd->data[ix] = data;
	return NULL;
    }
}

#endif

extern erts_smp_atomic_t erts_bytes_out;	/* no bytes written out */
extern erts_smp_atomic_t erts_bytes_in;		/* no bytes sent into the system */


/* port status flags */

#define ERTS_PORT_SFLG_CONNECTED	((Uint32) (1 <<  0))
/* Port have begun exiting */
#define ERTS_PORT_SFLG_EXITING		((Uint32) (1 <<  1))
/* Distribution port */
#define ERTS_PORT_SFLG_DISTRIBUTION	((Uint32) (1 <<  2))
#define ERTS_PORT_SFLG_BINARY_IO	((Uint32) (1 <<  3))
#define ERTS_PORT_SFLG_SOFT_EOF		((Uint32) (1 <<  4))
/* Flow control */
/* Port is closing (no i/o accepted) */
#define ERTS_PORT_SFLG_CLOSING		((Uint32) (1 <<  5))
/* Send a closed message when terminating */
#define ERTS_PORT_SFLG_SEND_CLOSED	((Uint32) (1 <<  6))
/* Line orinted io on port */  
#define ERTS_PORT_SFLG_LINEBUF_IO	((Uint32) (1 <<  7))
/* Immortal port (only certain system ports) */
#define ERTS_PORT_SFLG_FREE		((Uint32) (1 <<  8))
#define ERTS_PORT_SFLG_INITIALIZING	((Uint32) (1 <<  9))
/* Port uses port specific locking (opposed to driver specific locking) */
#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 10))
#define ERTS_PORT_SFLG_INVALID		((Uint32) (1 << 11))
/* Last port to terminate halts the emulator */
#define ERTS_PORT_SFLG_HALT		((Uint32) (1 << 12))
#ifdef DEBUG
/* Only debug: make sure all flags aren't cleared unintentionally */
#define ERTS_PORT_SFLG_PORT_DEBUG	((Uint32) (1 << 31))
#endif

/* Combinations of port status flags */ 
#define ERTS_PORT_SFLGS_DEAD						\
  (ERTS_PORT_SFLG_FREE | ERTS_PORT_SFLG_INITIALIZING)
#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP				\
  (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
#define ERTS_PORT_SFLGS_INVALID_LOOKUP					\
  (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP				\
   | ERTS_PORT_SFLG_EXITING						\
   | ERTS_PORT_SFLG_CLOSING)
#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP				\
  (ERTS_PORT_SFLGS_INVALID_LOOKUP					\
   | ERTS_PORT_SFLG_DISTRIBUTION)


/*
 * Costs in reductions for some port operations.
 */
#define ERTS_PORT_REDS_EXECUTE		10
#define ERTS_PORT_REDS_FREE		100
#define ERTS_PORT_REDS_TIMEOUT		400
#define ERTS_PORT_REDS_INPUT		400
#define ERTS_PORT_REDS_OUTPUT		400
#define ERTS_PORT_REDS_EVENT		400
#define ERTS_PORT_REDS_CMD_OUTPUTV	400
#define ERTS_PORT_REDS_CMD_OUTPUT	400
#define ERTS_PORT_REDS_EXIT		300
#define ERTS_PORT_REDS_CONNECT		40
#define ERTS_PORT_REDS_UNLINK		40
#define ERTS_PORT_REDS_LINK		40
#define ERTS_PORT_REDS_BADSIG		40
#define ERTS_PORT_REDS_CONTROL		400
#define ERTS_PORT_REDS_CALL		400
#define ERTS_PORT_REDS_INFO		100
#define ERTS_PORT_REDS_SET_DATA		40
#define ERTS_PORT_REDS_GET_DATA		40
#define ERTS_PORT_REDS_TERMINATE	200

void print_port_info(Port *, int, void *);
void erts_port_free(Port *);
#ifndef ERTS_SMP
void erts_port_cleanup(Port *);
#endif
void erts_fire_port_monitor(Port *prt, Eterm ref);
#ifdef ERTS_SMP
int erts_port_handle_xports(Port *);
#endif

#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_port_locked(Port *);
#endif

ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);

ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);

#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt)
{
#ifdef ERTS_SMP
    erts_ptab_inc_refc(&prt->common);
#else
    erts_atomic32_inc_nob(&prt->refc);
#endif
}

ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt)
{
#ifdef ERTS_SMP
    int referred = erts_ptab_dec_test_refc(&prt->common);
    if (!referred)
	erts_port_free(prt);
#else
    int refc = erts_atomic32_dec_read_nob(&prt->refc);
    if (refc == 0)
	erts_port_free(prt);	
#endif
}

ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc)
{
#ifdef ERTS_SMP
    int referred = erts_ptab_add_test_refc(&prt->common, add_refc);
    if (!referred)
	erts_port_free(prt);
#else
    int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc);
    if (refc == 0)
	erts_port_free(prt);	
#endif
}

ERTS_GLB_INLINE int
erts_smp_port_trylock(Port *prt)
{
#ifdef ERTS_SMP
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    return erts_mtx_trylock(prt->lock);
#else
    return 0;
#endif
}

ERTS_GLB_INLINE void
erts_smp_port_lock(Port *prt)
{
#ifdef ERTS_SMP
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    erts_mtx_lock(prt->lock);
#endif
}

ERTS_GLB_INLINE void
erts_smp_port_unlock(Port *prt)
{
#ifdef ERTS_SMP
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    erts_mtx_unlock(prt->lock);
#endif
}

#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */


#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS)			\
    (!(PP)							\
     || (erts_atomic32_read_nob(&(PP)->state) & (FLGS))	\
     || (PP)->common.id != (ID))

/* port lookup */

#define INVALID_PORT(PP, ID) \
  ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP)

/* Invalidate trace port if anything suspicious, for instance
 * that the port is a distribution port or it is busy.
 */
#define INVALID_TRACER_PORT(PP, ID)					\
  ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)

#define ERTS_PORT_SCHED_ID(P, ID) \
  ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))

extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)

int erts_is_port_ioq_empty(Port *);
void erts_terminate_port(Port *);

#ifdef ERTS_SMP
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
#endif

ERTS_GLB_INLINE Port *erts_pix2port(int);
ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
#ifdef ERTS_SMP
ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
#endif
ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort);
ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport);
ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *);
ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm);
ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
ERTS_GLB_INLINE int erts_port_driver_callback_epilogue(Port *, erts_aint32_t *);

#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE Port *erts_pix2port(int ix)
{
    Port *prt;
    ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port));
    prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix);
    return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt;
}

ERTS_GLB_INLINE Port *
erts_port_lookup_raw(Eterm id)
{
    Port *prt;

    ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());

    if (is_not_internal_port(id))
	return NULL;

    prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
					     internal_port_index(id));
    return prt && prt->common.id == id ? prt : NULL;
}

ERTS_GLB_INLINE Port *
erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
{
    Port *prt = erts_port_lookup_raw(id);
    return (!prt
	    ? NULL
	    : ((invalid_sflgs & erts_atomic32_read_nob(&prt->state))
	       ? NULL
	       : prt));
}


ERTS_GLB_INLINE Port*
erts_id2port(Eterm id)
{
    erts_aint32_t state;
    Port *prt;

    /* Only allowed to be called from managed threads */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());

    if (is_not_internal_port(id))
	return NULL;

    prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
					     internal_port_index(id));

    if (!prt || prt->common.id != id)
	return NULL;

    erts_smp_port_lock(prt);
    state = erts_atomic32_read_nob(&prt->state);
    if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
	erts_smp_port_unlock(prt);
	return NULL;
    }

    return prt;
}


ERTS_GLB_INLINE Port*
erts_id2port_sflgs(Eterm id,
		   Process *c_p, ErtsProcLocks c_p_locks,
		   Uint32 invalid_sflgs)
{
#ifdef ERTS_SMP
    int no_proc_locks = !c_p || !c_p_locks;
#endif
    erts_aint32_t state;
    Port *prt;

    /* Only allowed to be called from managed threads */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());

    if (is_not_internal_port(id))
	return NULL;

    prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
					     internal_port_index(id));

    if (!prt || prt->common.id != id)
	return NULL;

#ifdef ERTS_SMP
    if (no_proc_locks)
	erts_smp_port_lock(prt);
    else if (erts_smp_port_trylock(prt) == EBUSY) {
	/* Unlock process locks, and acquire locks in lock order... */
	erts_smp_proc_unlock(c_p, c_p_locks);
	erts_smp_port_lock(prt);
	erts_smp_proc_lock(c_p, c_p_locks);
    }
#endif
    state = erts_atomic32_read_nob(&prt->state);
    if (state & invalid_sflgs) {
#ifdef ERTS_SMP
	erts_smp_port_unlock(prt);
#endif
	return NULL;
    }

    return prt;
}

ERTS_GLB_INLINE void
erts_port_release(Port *prt)
{
    /* Only allowed to be called from managed threads */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
#ifdef ERTS_SMP
    erts_smp_port_unlock(prt);
#else
    if (prt->cleanup) {
	prt->cleanup = 0;
	erts_port_cleanup(prt);
    }
#endif
}

#ifdef ERTS_SMP

/*
 * erts_thr_id2port_sflgs() and erts_thr_port_release() can
 * be used by unmanaged threads in the SMP case.
 */
ERTS_GLB_INLINE Port *
erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs)
{
    Port *prt;
    ErtsThrPrgrDelayHandle dhndl;

    if (is_not_internal_port(id))
	return NULL;

    dhndl = erts_thr_progress_unmanaged_delay();

    prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
					     internal_port_index(id));

    if (!prt || prt->common.id != id) {
	erts_thr_progress_unmanaged_continue(dhndl);
	prt = NULL;
    }
    else {
	erts_aint32_t state;
	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
	    erts_port_inc_refc(prt);
	    erts_thr_progress_unmanaged_continue(dhndl);
	}

	erts_mtx_lock(prt->lock);
	state = erts_atomic32_read_nob(&prt->state);
	if (state & invalid_sflgs) {
	    erts_mtx_unlock(prt->lock);
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_port_dec_refc(prt);
	    prt = NULL;
	}
    }

    return prt;
}

ERTS_GLB_INLINE void
erts_thr_port_release(Port *prt)
{
    erts_mtx_unlock(prt->lock);
#ifdef ERTS_SMP
    if (!erts_thr_progress_is_managed_thread())
	erts_port_dec_refc(prt);
#endif
}

#endif

ERTS_GLB_INLINE Port*
erts_thr_drvport2port_raw(ErlDrvPort drvport)
{
#if ERTS_ENABLE_LOCK_CHECK
    int emu_thread = erts_lc_is_emu_thr();
#endif
    if (drvport == ERTS_INVALID_ERL_DRV_PORT)
	return NULL;
    else {
	Port *prt = (Port *) drvport;
#if ERTS_ENABLE_LOCK_CHECK
	if (!ERTS_IS_CRASH_DUMPING) {
	    if (emu_thread) {
		ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
		ERTS_LC_ASSERT(!prt->port_data_lock
			       || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
	    }
	    else {
		ERTS_LC_ASSERT(prt->port_data_lock);
		ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
	    }
	}
#endif
	return prt;
    }
}

ERTS_GLB_INLINE Port*
erts_drvport2port_raw(ErlDrvPort drvport)
{
    ERTS_LC_ASSERT(erts_lc_is_emu_thr());
    if (drvport == ERTS_INVALID_ERL_DRV_PORT)
	return NULL;
    else {
	Port *prt = (Port *) drvport;
	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
			   || ERTS_IS_CRASH_DUMPING);
	return prt;
    }
}

ERTS_GLB_INLINE Port*
erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep)
{
    Port *prt = erts_drvport2port_raw(drvport);
    erts_aint32_t state;
    if (!prt)
	return NULL;
    state = erts_atomic32_read_nob(&prt->state);
    if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
	return NULL;
    if (statep)
	*statep = state;
    return prt;
}

ERTS_GLB_INLINE Port*
erts_drvportid2port(Eterm id)
{
    Port *prt;
    erts_aint32_t state;
    if (is_not_internal_port(id))
	return NULL;
    prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port,
					    internal_port_index(id));
    if (!prt)
	return NULL;
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
		       || ERTS_IS_CRASH_DUMPING);
    if (prt->common.id != id)
	return NULL;
    state = erts_atomic32_read_nob(&prt->state);
    if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
	return NULL;
    return prt;
}

ERTS_GLB_INLINE Eterm
erts_drvport2id(ErlDrvPort drvport)
{
    Port *prt = erts_drvport2port_raw(drvport);
    if (!prt)
	return am_undefined;
    else
	return prt->common.id;
}

ERTS_GLB_INLINE Uint32
erts_portid2status(Eterm id)
{
    Port *prt = erts_port_lookup_raw(id);
    if (prt)
	return (Uint32) erts_atomic32_read_acqb(&prt->state);
    else
	return ERTS_PORT_SFLG_INVALID;
}

ERTS_GLB_INLINE int
erts_is_port_alive(Eterm id)
{
    return !(erts_portid2status(id) & (ERTS_PORT_SFLG_INVALID
				       | ERTS_PORT_SFLGS_DEAD));
}

ERTS_GLB_INLINE int
erts_is_valid_tracer_port(Eterm id)
{
    return !(erts_portid2status(id) & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
}

ERTS_GLB_INLINE int
erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
{
    int reds = 0;
    erts_aint32_t state;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

    state = erts_atomic32_read_nob(&prt->state);
    if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(prt)) {
	reds += ERTS_PORT_REDS_TERMINATE;
	erts_terminate_port(prt);
	state = erts_atomic32_read_nob(&prt->state);
	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
    }

#ifdef ERTS_SMP
    if (prt->xports) {
	reds += erts_port_handle_xports(prt);
	ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
	ASSERT(!prt->xports);
    }
#endif

    if (statep)
	*statep = state;

    return reds;
}

#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */

void erts_port_resume_procs(Port *);

struct binary;

#define ERTS_P2P_SIG_TYPE_BAD			0
#define ERTS_P2P_SIG_TYPE_OUTPUT		1
#define ERTS_P2P_SIG_TYPE_OUTPUTV		2
#define ERTS_P2P_SIG_TYPE_CONNECT		3
#define ERTS_P2P_SIG_TYPE_EXIT			4
#define ERTS_P2P_SIG_TYPE_CONTROL		5
#define ERTS_P2P_SIG_TYPE_CALL			6
#define ERTS_P2P_SIG_TYPE_INFO			7
#define ERTS_P2P_SIG_TYPE_LINK			8
#define ERTS_P2P_SIG_TYPE_UNLINK		9
#define ERTS_P2P_SIG_TYPE_SET_DATA		10
#define ERTS_P2P_SIG_TYPE_GET_DATA		11

#define ERTS_P2P_SIG_TYPE_BITS			4
#define ERTS_P2P_SIG_TYPE_MASK \
    ((1 << ERTS_P2P_SIG_TYPE_BITS) - 1)

#define ERTS_P2P_SIG_DATA_FLG(N) \
    (1 << (ERTS_P2P_SIG_TYPE_BITS + (N)))
#define ERTS_P2P_SIG_DATA_FLG_BANG_OP		ERTS_P2P_SIG_DATA_FLG(0)
#define ERTS_P2P_SIG_DATA_FLG_REPLY		ERTS_P2P_SIG_DATA_FLG(1)
#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND		ERTS_P2P_SIG_DATA_FLG(2)
#define ERTS_P2P_SIG_DATA_FLG_FORCE		ERTS_P2P_SIG_DATA_FLG(3)
#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT	ERTS_P2P_SIG_DATA_FLG(4)
#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK	ERTS_P2P_SIG_DATA_FLG(5)
#define ERTS_P2P_SIG_DATA_FLG_SCHED		ERTS_P2P_SIG_DATA_FLG(6)

struct ErtsProc2PortSigData_ {
    int flags;
    Eterm caller;
    Uint32 ref[ERTS_MAX_REF_NUMBERS];
    union {
	struct {
	    Eterm from;
	    ErlIOVec *evp;
	    ErlDrvBinary *cbinp;
	} outputv;
	struct {
	    Eterm from;
	    char *bufp;
	    ErlDrvSizeT size;
	} output;
	struct {
	    Eterm from;
	    Eterm connected;
	} connect;
	struct {
	    Eterm from;
	    Eterm reason;
	    ErlHeapFragment *bp;
	} exit;
	struct {
	    struct binary *binp;
	    unsigned int command;
	    char *bufp;
	    ErlDrvSizeT size;
	} control;
	struct {
	    unsigned int command;
	    char *bufp;
	    ErlDrvSizeT size;
	} call;
	struct {
	    Eterm item;
	} info;
	struct {
	    Eterm port;
	    Eterm to;
	} link;
	struct {
	    Eterm from;
	} unlink;
	struct {
	    ErlHeapFragment *bp;
	    Eterm data;
	} set_data;
    } u;
} ;

ERTS_GLB_INLINE int
erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp);
ERTS_GLB_INLINE ErlDrvSizeT
erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp);

#if ERTS_GLB_INLINE_INCL_FUNC_DEF

ERTS_GLB_INLINE int
erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp)
{
    switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
    case ERTS_P2P_SIG_TYPE_OUTPUT:	return !0;
    case ERTS_P2P_SIG_TYPE_OUTPUTV:	return !0;
    default:				return 0;
    }
}

ERTS_GLB_INLINE ErlDrvSizeT
erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp)
{
    switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
    case ERTS_P2P_SIG_TYPE_OUTPUT:	return sigdp->u.output.size;
    case ERTS_P2P_SIG_TYPE_OUTPUTV:	return sigdp->u.outputv.evp->size;
    default:				return (ErlDrvSizeT) 0;
    }
}

#endif

#define ERTS_PROC2PORT_SIG_EXEC			0
#define ERTS_PROC2PORT_SIG_ABORT		1
#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND	2
#define ERTS_PROC2PORT_SIG_ABORT_CLOSED		3

typedef int (*ErtsProc2PortSigCallback)(Port *,
					erts_aint32_t,
					int,
					ErtsProc2PortSigData *);

typedef enum {
    ERTS_PORT_OP_BADARG,
    ERTS_PORT_OP_CALLER_EXIT,
    ERTS_PORT_OP_BUSY,
    ERTS_PORT_OP_BUSY_SCHEDULED,
    ERTS_PORT_OP_SCHEDULED,
    ERTS_PORT_OP_DROPPED,
    ERTS_PORT_OP_DONE
} ErtsPortOpResult;

ErtsPortOpResult
erts_schedule_proc2port_signal(Process *,
			       Port *,
			       Eterm,
			       Eterm *,
			       ErtsProc2PortSigData *,
			       int,
			       ErtsProc2PortSigCallback);

int erts_deliver_port_exit(Port *, Eterm, Eterm, int);

/*
 * Port signal flags
 */
#define ERTS_PORT_SIG_FLG_BANG_OP		ERTS_P2P_SIG_DATA_FLG_BANG_OP
#define ERTS_PORT_SIG_FLG_NOSUSPEND		ERTS_P2P_SIG_DATA_FLG_NOSUSPEND
#define ERTS_PORT_SIG_FLG_FORCE			ERTS_P2P_SIG_DATA_FLG_FORCE
#define ERTS_PORT_SIG_FLG_BROKEN_LINK		ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK
#define ERTS_PORT_SIG_FLG_BAD_OUTPUT		ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT
#define ERTS_PORT_SIG_FLG_FORCE_SCHED		ERTS_P2P_SIG_DATA_FLG_SCHED

/*
 * Port ! {Owner, {command, Data}}
 * Port ! {Owner, {connect, NewOwner}}
 * Port ! {Owner, close}
 */
ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *);

/*
 * Signals from processes to ports.
 */
ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_link(Process *, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_unlink(Process *, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_set_data(Process *, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_get_data(Process *, Port *, Eterm *);

#endif