aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/dist.c
blob: 9efb7e79ac68a0486a95c8d5744847f1ac973b1f (plain) (tree)
1
2
3
4
5
6
7
8
9

                   
  
                                                        
  


                                                                   
  






                                                                           
  









                                                  
                                   
















                               
                             
                           
                               
 

                                










                                                                
                                         
 
                                                






                                                                   


                                                            

                            

                                                          






                                                                               
                                                         

                                                                          
















                                                                              
                                                         

                             

                                 
                                  

                     
                                       


                          
                                                                        

                                                                       
                                                             

                                                               
 

                               





                        





                                                     

                                                     









                            
                                     



                                                                         
                                    







                                                                    
                                                                        


                     
                                                              
 
                        
                                                       
                                                                    

                                    



                                                    



                                                 


     


























































































































































                                                                                     


















































                                                                         





                                                   
                                                 
 
                              





                                                                                  
                                






                                
                                                        











                                                                 
                                                     

                                             
                                  
 



                               

                                                             















                                                        







                                                                              
                        

                         


                                                       






                                    
                                                 
 


                                                         
 
                                       


                                                               
                                                                            
                                                                                  
                                         
                                           

                                                                             
                                                                                  
                                         
                                           
             








                                                                                
                                          



                                                   
 

                                                












                                                                                 





                                                 

                                                                        


                                               
 
 
          
                                                        














                                                                  
     
                                                                 



                                 

                     

                                                        
 
                                         
                                                                                   
 


                                                            
 
                                                  
            
                                                                            


              
                                               
                                       

                                                                               
                                         

         

                        



                                















                                                                    

                                               
                              
 







                                                                  
 






                                          
     


                   








                                                   


                           

                                                                    


                                



                          


                          

                                        

                                                         
                                                           


                                                                    
     

                                                                        


                                                                     


                                                                         
     










                                                                     


                                                         
                                         









                                                                  
                          








                                                 
                                                             
 

                            
                                                       
 






                                                                   




                                             

                                

                                    

                                          
 

                
 


                                                                       









                                     
                                   
                                                              

                                                     
                                     


     
                                            



                                                        
                                                    








                                                                        

                                        

             










                                                                           
                                                     


                                                        
                                                     



                                                                       
                                                    


 
















                                                                           
                                     
                                                                          

                        
 
                                      

                          




                                                                    
                                     
                                                                            
            
 
                        
                                      

                          


 
                                                                         
                                                                      




                                                                        


                                     
                                                                        





                                                                               
                        



                                                              
                                      

                          









                                                                               

                                     
 
                                                                        








                                                                              
                        



                                           
                                      

                          



                                                                             
                                                           




                                                             

                                     
 
                                                                        





                                                                            
                        



                                             
                                          

                          

 


                                                                       
                                                     













                                                                              
   
                                                                     

              
                      
                                    
            




                         


                                      
      
 
                                                 



                                                                         
                    

                                                                              
                                                                        
                                                   



                                                                                
                                     
                                   
                                                        



                                                                 
      
 





                                                                           
                                                 








                                                     
         




                                                                               
         
     
 



                                                                       



                                               
               


   

                                                        

              
                      
                                    
            




                         


                                       
      
 
                                                 



                                                                              
                    

                                                                              
                                                                        
                                                   


                                                                                

                                                         
                                   
                                                        



                                                                 
      
 
                                                            
                                                                    
                                                                      
        
                                                                 
                                                               
 



                                                                       



                                               
               







                                                                      

                                     
                    
                                 


                         



                                     
      
 
                        
                               






                                                                                
                    

                                                     




                                                                            
                                                    

                                                                          
                                   
                                                        



                                                                 
      

                                                                         
                                 
                                      

                          




                                                                                






                                                              
                                 
                                       

                          




                                                                                 


                                     
 


                                                               
 
                                      

                          





                                                                           


                                     
 


                                                               
 
                                      

                          








                                                             
                                                                              
                                                               












                                                
 

                                    
                                    
                                
                                      
                               
                                     

                         
                 






                           
                                                            
                             
                            



                     
                     

                        
                               

      
                                            
 
                           
 
                                                        
 

                                                  
                 
     
 
                      
 

                                                  
                 
     





                                
                                                                          
 



                                                              



                                                                                  



                                                  
                        

                                                                                             
      






                                                                              
     

                                          



                                                                        
                                                                        
                                               

                            
                                                                                    


                                 
                          
     

                        
                                              


                             
                                                                         
                                 
                             


                   
                

                                            



                         


                                 


                                                     

                                      
 

















                                                                         
 

                                                           
 








                                                                    
 




                                                                             
 
              
     

                      


                                 

                        


                                                 
                                 
 


                                                                     
 

                                    
 
                                                      





                                                                 



                         


                                 




                                                        





                                                         
                                 
 















                                                                               
 



                                                               
 

                                                                    





                                                                       

                                                          











                                                                             






                                                                           


                                 


                           

                       
                              


                                 



























                                                                                    

              
                         



                                 









                                                                            



                                                          





                                                 


                                                 
                                                     














                                                             
                                                                       





                                                                 
                                                                      
                      
                                            


              

                              
                     
 


                                 






                                                
                  







                                 
          


                                                                      
           



                                                                       


                                                 
                      


                                 
                                  
                 



                                                            
                                    



                                                            
                                                                       

                                           


                                                                 
                                                                          
                      
                                            

              
     




                                                                              



                                 

                                                                 


                           
                            
                                 
 

                                                                 
 






                                                                         
 

                                                               




                     

                                                   







                                     
                






                                     
         


                                                   
                                 

         


                                                         


                      
                   

                                                   






                                     
                









                                                          
         

                                                                  
              
 
                          


                                 

                                              


                                                 
 
                                                                    

              

                             

     
                                 


                                                      
                                              
                           
             





                                                                           
             
                             
                                 


                                                      
           
                                              
                                            
                           


              
                                                                       
 













                                                                             
























                                                                      




                                                                      
              
 


                                       
                                     
                                  
 

                                              
 
                                    

                                                                             
 

                                         
 

                                                              
                                              


                                 
                                              
             
 
                            


                                                                

          
                                                       


                                                                                        






                                                        
 
                                                       




                                                                                
 







                                                                             
                                                                                                       




                                                                                                    





                                                      
                                         


                                                       
 

                                                         


                                                                                                                      
             
 






                                                          
                                                                                       



                                                                                
                                                                 
              

                                                                           
               
                                   
                               
                           

                                                   
                                                               
                                                                                 
                                     


                                          

                                    
                                         


                                                 
                                           
                                                                                  
                                      
                                                            
                                                                                        
                                                                                

                                               

                                                                       

                                                                               






                                                                         
                                                                      
                                                 



                                                                      
                                               
                 
 







                                                          
                                                                
                                                       





















                                                                                              
 
                                             
                                                          





                                                              
                                     

                                                            





































                                                                                              
                                                                                          
         

     


                                                                                     
     
                  

 



                                                     

                     
 

                                                
 







                                           
 
                    



                                       


                                                                          



                                                              
      
 

                                        
                                                                    







                                                      
                     



                        

                                                
 



                           











                                                                    
 



                     





                                                                  

                                  
                    



                                       


                                                                          



                                                               
      








                                                                
                    
                                                 
                      





                                        







                                                              
                                              
 
                                                             
                                
                 
                             


                                                     
                              
                                                        
 
                                                
 
                                                    
 
                       
                       
                       
                     
                         
 
                                         
                                                                     

                                             

     
                                           
 









                                                     
                               



                                    
                                 





                                               
                                                            
 
                 

                       
                                                               

                        


                                           
                                             
                                          
                            

                                        
      




                                                       
                                                                    
                                                                    
                                                     
                      






                                        
                                               

                                  
                                                     
                      


                          
                                                                                  



                                     
                         
























                                                                          


          
                    

                                      

                                   




                                                                                    
                                                       
                                                            
                                          
                                             
                                          
                            

                                        
      




                                                       
                                                                    
                                                                     
                                                                               
                                   






















                                                                     
                                   


                                                                              
                           
                     
                                                   
                                                             


                                                                     
                                         

                                                        
                                                            

            
                                         







                                  
                                   
            

                                                                         

                           
                                                                  
      
                                     

     
                                      








                                                   

                         
 
                               

           



                                              
 
                                    
 
                                          




















                                                           






                                                         
                                       
                                                        




                                                 
                                         






                                              











                                                
                               




                                    
                                 

















































































                                                                    
                   



                                     
                                                               






                                                              
                       
 




                                        

                                         
                                                 

                                           
                                                   



                                                                  
                                                          
                                                                         
                                                       

                               
                                                               









                                                                               
                         












                                          
                   










                                               
                                                          


                                 
                                                                      



                                          
                                    







                                                                              
                                                     
 
                                                                         





                                                               
                                                   















                                                   

                                                                     



                                 
                       
 



                                        


                                                       
 
                         
 























                                                              
                   



                                     
                                                               




                                       




                                        

                                                 
                         






                                                     

                                                        



                            
                   



                                     
                                                               

                                 
                       
 




                                        
                                            















                                                    
                                        
                                                       
                            
                                           



                                                                
                                             





                                             
                                 

                             






                                                                                 

         


                                              

     
                                     
 
                         










                                                        
                                                                    


                                          
                                                                        
                                          
                                   
                                                                   
                                     

                                                              
                                                            


         
                                                     

 


                                  
                    



                                             


                                                                          



                                                           
      


                                          


                                                
                                                  
 
                                       










                                                                       
    
                                                         
 
                        
                                     
                                                   
 
                             
     
                          


                      
               




                                                                  
                                                      
                                                      








                                                                 
                                                             







                                                                                  


     
                                                                     

                                         







                                                               



                                                            



                                                           

 
                                                                  

                                         



                                                    






                             
          
                                                                                  

























                                                                              

                                       

                                                                                 







                                                          

                                   



           
                                                                             






















                                                                      



                                                                 
                                                                       


                                                

















                                                                              








                                                                             






















                                                                         
                                                  


                   





                                                                                

                   
                                               
                                                                           

                                        




                                              

            
                                             
                                                                    
                                               

      

                                                 
                   


                                                                     

                                               
 




                                                                    
                                              

                                                               





                             



                                                 
 















                                                               


                    

                                                            
                          
                      

                    



                                   


                                      
 















                                                                    
 
                                        
                                                                 
                                                      

                                                                            

                                                                   
                                                                             





                                                               




                                                              









                                                       

                                            
                                       
 


                                   
 


                                                                      
 
                                 
                            


                                                
         

                                         
 





                                                              
 






                                                                           
 
                       
 
                                                  
 


                                                                                  
     
          
                       
 



                                                                
                            

                      
                                                
                        







                                                               

                                                   
 
                                                                            
 

                                               
 
                                                            
 



                                          
 






                                                                                 

         





                                                                                 
                                            
                                                                            

















                                               

     




                              
















                                                               

                           
 
                                                              
                                                
 


                                          
                                                     
 
















                                                                           
 
                          
 

                                           
 

                   
                            
                                  
                                     

                                                                         
 
 






                                                                                     
                   



                   
                                       



                        
                                            
                    
 
                                 
 

                             
 

                                       
 
                                                   
 

                   
 

                                                                    



                                                  
                                                    


                                              
                                                 
                     
                                                                    











                                                  

 
 




                                                   
                                                       


                   

                  
 



                                                    




                                      


                        
                         
                                 

                               
                                     

                            
                                         
                                     
              

                                                                            

                          

                                                                      
                               
                     
 
 
                                                            
 
                        
 

                                      
                                                     

                             
                                                   

                                
                                   
                      
                            
 
                                 
 

                        
 

                           









                                                                   

                                               

                              

                                                                        






                                                              
                                      
 
                    
     

                          
 
 





                                                     
                                                         

                   

                   
 



                                                          
                                         
                                 

     

                                          
                     
 
 


                                                                              
                                           










                                                     
                                         





                                                                      
                                           


                     
          
                                                           
           
                                                           
                                                           
                                                

                                                                 
                                             
                                                                     



                                               








































                                                                        
                                                        




                               


                        









                                                                            
                                                                            



                                         


                               


               
                                             
 
                                                      
                                                
                                                 

                                                 

                                                            









                                                
                                                   
                  






                                 
                       
                                                                        



                                                    



                                                                  
       
     














                                                                  
                                               


                          
                    



                            













                                                                        

                                                               
 

                          
            
                          
 
                                                                     
                                   






                                                                              
                                 


                   
                             
     

                    
 












































                                                                                 
     
 
                   

                         
 

                                                   
                  
 
                                               
                                                      



                                                                                 
                         






                                                               
                             
             
                            














































                                                                          
         
 

                                                         
         
 
                             

              

     


                    
 















                                                                         

 





                                                                  



                                       
                                                            













                                                      
                      
                  
                        



















                                                                           
                                     

                                   














                                                                      
                                                             
                                                                                 
                          
                          

 

                                                       
 




                                                                           
 




                                             
 









































                                                                

         











































                                                                          
     

                                             

 

                                            
 
                         
 

                                              
 
                                     
 







                                                            
 



                       
 



                               
 




                                                     
 




                                                         
 
              




                                                                                   




                                             



                           
                    





                                       



                                                                                         


                                                          
      
 



                                                             
                                       









                                                                          
 
                                         
 



                                                 
 


                                                  





                                   
                                                             


                             

                                                                 


                             
                                                                    


             

                          
 






                                         
 

                                                            
 




                                                     
 







                                                             
 


                                               
 
                                                                          
 
                                 
 





                                                                      
     
 

                                        
 
                           
 


                
              
                              
 
 















































                                                                           

 


                                             



                                                      
              
            




                
                                                                           



                                               
                                       

           

                    
 








                                                     
 



                             

                        



                       
                                         
 



                                             
 
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1996-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * distribution of erlang messages to other nodes.
 */


/* define this to get a lot of debug output */
/* #define ERTS_DIST_MSG_DBG */
/* #define ERTS_RAW_DIST_MSG_DBG */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#define ERTS_WANT_EXTERNAL_TAGS

#include <stddef.h>
#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#include "dist.h"
#include "bif.h"
#include "external.h"
#include "erl_binary.h"
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
#include "erl_proc_sig_queue.h"

#define DIST_CTL_DEFAULT_SIZE 64

/* Turn this on to get printouts of all distribution messages
 * which go on the line
 */
#if 0
#define ERTS_DIST_MSG_DBG
#endif
#if 0
#define ERTS_RAW_DIST_MSG_DBG
#endif

#if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG)
/*
 * Debug helper: hands the sz bytes starting at buf to bin_write() with
 * ERTS_PRINT_STDERR as the destination. Only compiled when one of the
 * distribution debug macros is defined.
 */
static void bw(byte *buf, ErlDrvSizeT sz)
{
    bin_write(ERTS_PRINT_STDERR, NULL, buf, sz);
}
#endif

#ifdef ERTS_DIST_MSG_DBG
/*
 * Debug helper (only compiled when ERTS_DIST_MSG_DBG is defined): decode
 * the distribution message described by edep and print the resulting term
 * on stderr, labelled with 'what'.
 *
 * If computing the decoded size or the decoding itself fails, a diagnostic
 * is printed instead, followed by a dump of the sz raw bytes at buf.
 *
 * edep->extp is saved on entry and restored to that value after a decode
 * attempt has been made.
 */
static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
    ErtsHeapFactory factory;
    ErlHeapFragment *mbuf;
    byte *extp = edep->extp;
    Eterm msg;
    Sint size = erts_decode_dist_ext_size(edep);

    if (size < 0) {
        erts_fprintf(stderr,
                     "DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n",
                     what);
        bw(buf, sz);
        return;
    }

    mbuf = new_message_buffer(size);

    /*
     * Build the term in the heap fragment that was just requested with
     * 'size' as its size. Pointing the factory at a fixed
     * DIST_CTL_DEFAULT_SIZE-word stack array while announcing 'size'
     * words of room would overrun that array for larger messages.
     */
    erts_factory_static_init(&factory, mbuf->mem, size, &mbuf->off_heap);

    msg = erts_decode_dist_ext(&factory, edep);
    if (is_value(msg)) {
        erts_fprintf(stderr, "    %s: %T\n", what, msg);
    }
    else {
        erts_fprintf(stderr,
                     "DIST MSG DEBUG: erts_decode_dist_ext(%s) failed:\n",
                     what);
        bw(buf, sz);
    }

    free_message_buffer(mbuf);
    edep->extp = extp;
}

#endif



int erts_is_alive; /* System must be blocked on change */
int erts_dist_buf_busy_limit;


/* distribution trap functions */
Export* dmonitor_node_trap = NULL;

/* local variables */
static Export *dist_ctrl_put_data_trap;

/* forward declarations */

static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);

/*
 * Number of currently allocated atom caches: incremented in create_cache(),
 * decremented in delete_cache(), read by erts_dist_cache_size().
 */
static erts_atomic_t no_caches;
static erts_atomic_t no_nodes;

/*
 * NOTE(review): presumably a pre-built 'nodedown' reason term together with
 * the heap fragment holding it -- confirm against the code that fills it in.
 */
struct {
    Eterm reason;
    ErlHeapFragment *bp;
} nodedown;


/*
 * Free an atom cache and decrement the count of allocated caches.
 * A NULL cache is accepted and ignored.
 */
static void
delete_cache(ErtsAtomCache *cache)
{
    if (!cache)
        return;

    erts_free(ERTS_ALC_T_DCACHE, (void *) cache);
    ASSERT(erts_atomic_read_nob(&no_caches) > 0);
    erts_atomic_dec_nob(&no_caches);
}


/*
 * Allocate an atom cache for dep, account for it in no_caches and set
 * every in/out slot to THE_NON_VALUE.
 *
 * Asserts that dep->cid is nil and that dep has no cache yet.
 */
static void
create_cache(DistEntry *dep)
{
    ErtsAtomCache *new_cache;
    int slot;

    ERTS_LC_ASSERT(is_nil(dep->cid));
    ASSERT(!dep->cache);

    new_cache = (ErtsAtomCache *) erts_alloc(ERTS_ALC_T_DCACHE,
                                             sizeof(ErtsAtomCache));
    dep->cache = new_cache;
    erts_atomic_inc_nob(&no_caches);

    /* Both arrays are walked using in_arr's element count. */
    slot = 0;
    while (slot < sizeof(new_cache->in_arr)/sizeof(new_cache->in_arr[0])) {
        new_cache->in_arr[slot] = THE_NON_VALUE;
        new_cache->out_arr[slot] = THE_NON_VALUE;
        slot++;
    }
}

Uint erts_dist_cache_size(void)
{
    return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache);
}

/*
 * Clear the unset_qflgs bits in dep->qflgs and take over the list of
 * processes suspended on dep.
 *
 * Returns NULL, leaving dep->suspended untouched, if ERTS_DE_QFLG_EXIT is
 * still set once the requested bits have been cleared. Otherwise
 * dep->suspended is reset to NULL and the previous list is returned after
 * being passed through erts_proclist_fetch().
 *
 * Asserts (in lock-checking builds) that dep->qlock is held.
 */
static ErtsProcList *
get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
    const erts_aint32_t keep_mask = ~unset_qflgs;
    erts_aint32_t remaining;
    ErtsProcList *waiters;

    ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));

    /* Mask the returned value as well so the local copy has the bits cleared */
    remaining = erts_atomic32_read_band_acqb(&dep->qflgs, keep_mask) & keep_mask;

    if (remaining & ERTS_DE_QFLG_EXIT) {
        /* No resume when exit has been scheduled */
        return NULL;
    }

    waiters = dep->suspended;
    dep->suspended = NULL;
    erts_proclist_fetch(&waiters, NULL);
    return waiters;
}

/*
 * Limit handed to the yielding foreach-delete operations used by
 * con_monitor_link_cleanup() (the orig-name monitor pass uses half of it).
 */
#define ERTS_MON_LNK_FIRE_LIMIT 100

/*
 * Per-monitor callback used when a connection goes down.
 *
 * For the origin half of a monitor a demonitor signal is sent; for the
 * other half a monitor-down signal with reason 'noconnection' is sent.
 * The second argument is not used.
 */
static void monitor_connection_down(ErtsMonitor *mon, void *unused)
{
    if (!erts_monitor_is_origin(mon)) {
        erts_proc_sig_send_monitor_down(mon, am_noconnection);
        return;
    }

    erts_proc_sig_send_demonitor(mon);
}

/*
 * Per-link callback used when a connection goes down: sends a link-exit
 * signal for lnk with reason 'noconnection'. No sending process is given
 * (NULL / THE_NON_VALUE) and the last argument is NIL. vdist is not used.
 */
static void link_connection_down(ErtsLink *lnk, void *vdist)
{
    erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk,
                                 am_noconnection, NIL);
}

/*
 * Phases of con_monitor_link_cleanup(). The cleanup switches on the current
 * phase and falls through to the next one when a phase completes without
 * yielding, in the order listed here.
 */
typedef enum {
    ERTS_CML_CLEANUP_STATE_LINKS,           /* deleting dist->links */
    ERTS_CML_CLEANUP_STATE_MONITORS,        /* deleting dist->monitors */
    ERTS_CML_CLEANUP_STATE_ONAME_MONITORS,  /* deleting dist->orig_name_monitors */
    ERTS_CML_CLEANUP_STATE_NODE_MONITORS    /* sending nodedown messages, if requested */
} ErtsConMonLnkCleaupState;

/*
 * State carried by con_monitor_link_cleanup() across invocations.
 */
typedef struct {
    ErtsConMonLnkCleaupState state; /* phase the cleanup is currently in */
    ErtsMonLnkDist *dist;           /* links/monitors being torn down; set to
                                     * NULL and its refc dropped once the
                                     * orig-name monitors are done */
    void *yield_state;              /* passed by address to the yielding
                                     * foreach-delete operations */
    int trigger_node_monitors;      /* non-zero: send nodedown messages in the
                                     * last phase */
    Eterm nodename;                 /* node name passed to send_nodes_mon_msgs() */
    Eterm visability;               /* NOTE(review): presumably the node type
                                     * argument for send_nodes_mon_msgs() -- confirm */
    Eterm reason;                   /* NOTE(review): presumably the nodedown
                                     * reason -- confirm */
    ErlOffHeap oh;                  /* NOTE(review): presumably off-heap data
                                     * referenced by the terms above -- confirm */
    Eterm heap[1];                  /* NOTE(review): looks like trailing term
                                     * storage allocated past the struct -- confirm */
} ErtsConMonLnkCleanup;

/*
 * Misc aux work callback that cleans up the links and monitors of a
 * connection in bounded steps.
 *
 * 'vcmlcp' is the ErtsConMonLnkCleanup job created by
 * schedule_con_monitor_link_cleanup(). Each state runs a yielding
 * foreach-delete over one collection. If that call reports a yield, the
 * job is rescheduled on the current scheduler and later resumes in the
 * same state; otherwise execution falls through to the next state. The
 * final state optionally sends nodedown messages and frees the job.
 */
static void
con_monitor_link_cleanup(void *vcmlcp)
{
    ErtsConMonLnkCleanup *cmlcp = vcmlcp;
    ErtsMonLnkDist *dist = cmlcp->dist;
    ErtsSchedulerData *esdp;
    int yield;

    switch (cmlcp->state) {
    case ERTS_CML_CLEANUP_STATE_LINKS:
        yield = erts_link_list_foreach_delete_yielding(&dist->links,
                                                       link_connection_down,
                                                       NULL, &cmlcp->yield_state,
                                                       ERTS_MON_LNK_FIRE_LIMIT);
        if (yield)
            break;

        ASSERT(!cmlcp->yield_state);
        cmlcp->state = ERTS_CML_CLEANUP_STATE_MONITORS;
        /* fall through */
    case ERTS_CML_CLEANUP_STATE_MONITORS:
        yield = erts_monitor_list_foreach_delete_yielding(&dist->monitors,
                                                          monitor_connection_down,
                                                          NULL, &cmlcp->yield_state,
                                                          ERTS_MON_LNK_FIRE_LIMIT);
        if (yield)
            break;

        ASSERT(!cmlcp->yield_state);
        cmlcp->state = ERTS_CML_CLEANUP_STATE_ONAME_MONITORS;
        /* fall through */
    case ERTS_CML_CLEANUP_STATE_ONAME_MONITORS:
        yield = erts_monitor_tree_foreach_delete_yielding(&dist->orig_name_monitors,
                                                          monitor_connection_down,
                                                          NULL, &cmlcp->yield_state,
                                                          ERTS_MON_LNK_FIRE_LIMIT/2);
        if (yield)
            break;

        /* All three collections are done; drop the job's reference to 'dist'. */
        cmlcp->dist = NULL;
        erts_mon_link_dist_dec_refc(dist);

        ASSERT(!cmlcp->yield_state);
        cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
        /* fall through */
    case ERTS_CML_CLEANUP_STATE_NODE_MONITORS:
        if (cmlcp->trigger_node_monitors) {
            send_nodes_mon_msgs(NULL,
                                am_nodedown,
                                cmlcp->nodename,
                                cmlcp->visability,
                                cmlcp->reason);
        }
        /* Release off-heap data of the copied reason, then the job itself. */
        erts_cleanup_offheap(&cmlcp->oh);
        erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp);
        return; /* done */
    }

    /* yield... */

    /* Reschedule this job on the current (normal) scheduler. */
    esdp = erts_get_scheduler_data();
    ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
    erts_schedule_misc_aux_work((int) esdp->no,
                                con_monitor_link_cleanup,
                                (void *) cmlcp);
}

/*
 * Create an ErtsConMonLnkCleanup job and schedule
 * con_monitor_link_cleanup() for it on the current scheduler.
 *
 * Nothing is done when 'dist' is NULL and 'nodename' is a non-value.
 * When 'dist' is given it is marked as not alive (under dist->mtx) and
 * the job starts in the LINKS state; otherwise it starts directly in
 * the NODE_MONITORS state. Nodedown messages are triggered only when
 * 'nodename' is a value. A boxed 'reason' is copied into the job's
 * trailing heap; an immediate or non-value reason is stored as is.
 */
static void
schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist,
                                  Eterm nodename,
                                  Eterm visability,
                                  Eterm reason)
{
    ErtsSchedulerData *esdp;
    ErtsConMonLnkCleanup *job;
    Uint reason_words, alloc_size;

    if (!dist && !is_value(nodename))
        return;

    /* The struct already contains one heap word; adjust for the reason. */
    alloc_size = sizeof(ErtsConMonLnkCleanup);
    if (is_non_value(reason) || is_immed(reason)) {
        reason_words = 0;
        alloc_size -= sizeof(Eterm);
    }
    else {
        reason_words = size_object(reason);
        alloc_size += sizeof(Eterm) * (reason_words - 1);
    }

    job = erts_alloc(ERTS_ALC_T_CML_CLEANUP, alloc_size);

    ERTS_INIT_OFF_HEAP(&job->oh);

    job->yield_state = NULL;
    job->dist = dist;
    if (dist) {
        job->state = ERTS_CML_CLEANUP_STATE_LINKS;
        erts_mtx_lock(&dist->mtx);
        ASSERT(dist->alive);
        dist->alive = 0;
        erts_mtx_unlock(&dist->mtx);
    }
    else {
        job->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
    }

    job->trigger_node_monitors = is_value(nodename);
    job->nodename = nodename;
    job->visability = visability;
    if (reason_words != 0) {
        Eterm *hp = &job->heap[0];
        job->reason = copy_struct(reason, reason_words, &hp, &job->oh);
    }
    else {
        job->reason = reason;
    }

    esdp = erts_get_scheduler_data();
    ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
    erts_schedule_misc_aux_work((int) esdp->no,
                                con_monitor_link_cleanup,
                                (void *) job);
}

/*
** A full node name consists of "n@h"
**
** n must be a valid node name: a non-empty string matching [a-zA-Z0-9_-]+
**
** h is not checked at all, we assume that we have a properly
** configured machine where the networking is ok for the OS
**
** We do check that there is not a second @ in the string, since
** many distributed operations are guaranteed not to work then.
*/


/*
 * Check whether the 'len' bytes at 'ptr' form a node name "n@h".
 *
 * Returns 1 when the name part n is non-empty and consists only of
 * [a-zA-Z0-9_-], is followed by '@', and the host part h is non-empty
 * and contains no further '@'. Returns 0 otherwise.
 */
static int is_node_name(char *ptr, int len)
{
   int at = 0;   /* index where the name part ends */
   int i;

   /* Skip over the name part. */
   while (at < len) {
      int ch = ptr[at];
      int name_char = (ch == '-') || (ch == '_') ||
	 ((ch >= 'a') && (ch <= 'z')) ||
	 ((ch >= 'A') && (ch <= 'Z')) ||
	 ((ch >= '0') && (ch <= '9'));
      if (!name_char)
	 break;
      at++;
   }

   /* The name part must be terminated by a '@'... */
   if (at >= len || ptr[at] != '@')
      return 0;

   /* ...and there should be text both before and after it. */
   if (at == 0 || at + 1 == len)
      return 0;

   /* No second '@' is allowed in the host part. */
   for (i = at + 1; i < len; i++) {
      if (ptr[i] == '@')
	 return 0;
   }

   return 1;
}

/*
 * Return non-zero if 'a' is an atom whose text is accepted by
 * is_node_name(); return 0 for non-atoms and for other atoms.
 */
int is_node_name_atom(Eterm a)
{
    int atom_ix;

    if (is_not_atom(a)) {
	return 0;
    }

    atom_ix = atom_val(a);
    ASSERT((atom_ix > 0) && (atom_ix < atom_table_size())
	   && (atom_tab(atom_ix) != NULL));

    return is_node_name((char*)atom_tab(atom_ix)->name,
			atom_tab(atom_ix)->len);
}

/*
 * Misc aux work callback scheduled by dec_no_nodes() when 'no_nodes'
 * reaches zero.
 *
 * With thread progress blocked: renames this node to 'Noname', clears
 * erts_is_alive, sends nodedown messages for the previous node name
 * using nodedown.reason, and resets 'nodedown'. The message buffer that
 * held the reason (if any) is freed after unblocking. 'unused' is ignored.
 */
static void
set_node_not_alive(void *unused)
{
    Eterm old_nodename = erts_this_dist_entry->sysname;
    ErlHeapFragment *reason_bp;

    ASSERT(erts_atomic_read_nob(&no_nodes) == 0);

    erts_thr_progress_block();

    erts_set_this_node(am_Noname, 0);
    erts_is_alive = 0;
    send_nodes_mon_msgs(NULL, am_nodedown, old_nodename, am_visible,
			nodedown.reason);

    /* Take over the reason buffer and reset the saved nodedown state. */
    reason_bp = nodedown.bp;
    nodedown.reason = NIL;
    nodedown.bp = NULL;

    erts_thr_progress_unblock();

    if (reason_bp) {
	free_message_buffer(reason_bp);
    }
}

/*
 * Decrement 'no_nodes'. When it reaches zero, schedule
 * set_node_not_alive() as misc aux work on the calling scheduler.
 * Must be called from a scheduler thread.
 */
static ERTS_INLINE void
dec_no_nodes(void)
{
    erts_aint_t remaining = erts_atomic_dec_read_mb(&no_nodes);

    ASSERT(remaining >= 0);
    ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */

    if (remaining != 0)
	return;

    erts_schedule_misc_aux_work(erts_get_scheduler_id(),
				set_node_not_alive,
				NULL);
}

/*
 * Increment 'no_nodes'. In DEBUG builds, first assert that the counter
 * is positive when the node is alive and zero when it is not.
 */
static ERTS_INLINE void
inc_no_nodes(void)
{
#ifdef DEBUG
    erts_aint_t before = erts_atomic_read_nob(&no_nodes);
    if (erts_is_alive) {
        ASSERT(before > 0);
    }
    else {
        ASSERT(before == 0);
    }
#endif
    erts_atomic_inc_mb(&no_nodes);
}

static void
kill_dist_ctrl_proc(void *vpid)
{
    erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid,
                            am_kill, NIL, 0);
}

/*
 * Schedule kill_dist_ctrl_proc() for 'pid' as misc aux work.
 *
 * The work is placed on the calling scheduler when the caller is a
 * non-dirty scheduler; otherwise scheduler 1 is used.
 */
static void
schedule_kill_dist_ctrl_proc(Eterm pid)
{
    ErtsSchedulerData *esdp = erts_get_scheduler_data();
    int target_sched = 1;

    if (esdp && !(ERTS_SCHEDULER_IS_DIRTY(esdp)))
        target_sched = (int) esdp->no;

    erts_schedule_misc_aux_work(target_sched,
                                kill_dist_ctrl_proc,
                                (void *) (UWord) pid);
}

/*
 * Clean up when net kernel or a distribution controller exits.
 *
 * dep == erts_this_dist_entry ("net kernel has died"):
 *   While holding the dist table read lock, snapshot the controller ids
 *   of all hidden and visible dist entries and take a reference to every
 *   pending dist entry. After releasing the lock: schedule a kill of
 *   each process controller, exit each port controller with reason
 *   'no_network' or 'net_kernel_terminated', and abort each pending
 *   connection. Finally save the nodedown reason in 'nodedown'.
 *
 * Otherwise (call from a distribution controller port/process):
 *   While holding the dist entry lock, mark 'dep' as exiting, detach its
 *   monitor/link structure, atom cache, output queues and list of
 *   suspended processes, and set the entry not connected. After
 *   unlocking: schedule monitor/link cleanup (reason 'normal' is
 *   reported as 'connection_closed'), resume the suspended processes
 *   and free the detached resources.
 *
 * In both cases dec_no_nodes() is called before returning.
 * Always returns 1.
 */
int erts_do_net_exits(DistEntry *dep, Eterm reason)
{
    Eterm nodename;

    if (dep == erts_this_dist_entry) {  /* Net kernel has died (clean up!!) */
	DistEntry *tdep;
	int no_dist_ctrl;
        int no_pending;
	Eterm nd_reason = (reason == am_no_network
			   ? am_no_network
			   : am_net_kernel_terminated);
        int i = 0;
        Eterm *dist_ctrl;
        DistEntry** pending;

        ERTS_UNDEF(dist_ctrl, NULL);
        ERTS_UNDEF(pending, NULL);

	erts_rwmtx_rlock(&erts_dist_table_rwmtx);

        no_dist_ctrl = (erts_no_of_hidden_dist_entries +
                        erts_no_of_visible_dist_entries);
        no_pending = erts_no_of_pending_dist_entries;

	/* KILL all port controllers */
	if (no_dist_ctrl) {
            /* Collect controller ids now; they are acted upon after unlock. */
            dist_ctrl = erts_alloc(ERTS_ALC_T_TMP,
                                   sizeof(Eterm)*no_dist_ctrl);
	    for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) {
		ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
                ASSERT(i < no_dist_ctrl);
		dist_ctrl[i++] = tdep->cid;
	    }
	    for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) {
		ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
                ASSERT(i < no_dist_ctrl);
		dist_ctrl[i++] = tdep->cid;
	    }
            ASSERT(i == no_dist_ctrl);
        }
        if (no_pending) {
            /* Reference each pending entry so it can be used after unlock. */
            pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending);
            i = 0;
            for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) {
                ASSERT(is_nil(tdep->cid));
                ASSERT(i < no_pending);
                pending[i++] = tdep;
                erts_ref_dist_entry(tdep);
            }
            ASSERT(i == no_pending);
        }
        erts_rwmtx_runlock(&erts_dist_table_rwmtx);

        if (no_dist_ctrl) {
            for (i = 0; i < no_dist_ctrl; i++) {
                /* Process controllers are killed via scheduled aux work. */
                if (is_internal_pid(dist_ctrl[i]))
                    schedule_kill_dist_ctrl_proc(dist_ctrl[i]);
                else {
                    /* Port controller: exit it if the lookup still finds it. */
                    Port *prt = erts_port_lookup(dist_ctrl[i],
                                                 ERTS_PORT_SFLGS_INVALID_LOOKUP);
                    if (prt) {
                        ASSERT(erts_atomic32_read_nob(&prt->state)
                               & ERTS_PORT_SFLG_DISTRIBUTION);

                        erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
                                       prt, dist_ctrl[i], nd_reason, NULL);
                    }
                }
            }
            erts_free(ERTS_ALC_T_TMP, dist_ctrl);
        }

        if (no_pending) {
            /* Abort pending connections and drop the references taken above. */
            for (i = 0; i < no_pending; i++) {
                abort_connection(pending[i], pending[i]->connection_id);
                erts_deref_dist_entry(pending[i]);
            }
            erts_free(ERTS_ALC_T_TMP, pending);
        }


	/*
	 * When last dist ctrl exits, node will be taken
	 * from alive to not alive.
	 */
	ASSERT(is_nil(nodedown.reason) && !nodedown.bp);
	if (is_immed(nd_reason))
	    nodedown.reason = nd_reason;
	else {
	    /* Non-immediate reason: copy it into a message buffer. */
	    Eterm *hp;
	    Uint sz = size_object(nd_reason);
	    nodedown.bp = new_message_buffer(sz);
	    hp = nodedown.bp->mem;
	    nodedown.reason = copy_struct(nd_reason,
					  sz,
					  &hp,
					  &nodedown.bp->off_heap);
	}
    }
    else { /* Call from distribution controller (port/process) */
        ErtsMonLnkDist *mld;
        ErtsAtomCache *cache;
        ErtsProcList *suspendees;
        ErtsDistOutputBuf *obuf;
	Uint32 flags;

	erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
	erts_de_rwlock(dep);

        if (is_internal_port(dep->cid)) {
            ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));

            /* Abort a scheduled dist command port task, if any. */
            if (erts_port_task_is_scheduled(&dep->dist_cmd))
                erts_port_task_abort(&dep->dist_cmd);
        }

	if (dep->state == ERTS_DE_STATE_EXITING) {
#ifdef DEBUG
	    ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
	}
	else {
	    /* First time here for this connection: flag the entry as exiting. */
	    dep->state = ERTS_DE_STATE_EXITING;
	    erts_mtx_lock(&dep->qlock);
	    ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
	    erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
	    erts_mtx_unlock(&dep->qlock);
	}

        /* Detach what is cleaned up after the dist entry lock is released. */
        mld = dep->mld;
        dep->mld = NULL;

	nodename = dep->sysname;
	flags = dep->flags;

        erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL);
        cache = dep->cache;
        dep->cache = NULL;

        erts_mtx_lock(&dep->qlock);

        erts_atomic64_set_nob(&dep->in, 0);
        erts_atomic64_set_nob(&dep->out, 0);

        obuf = clear_de_out_queues(dep);
        suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);

        erts_mtx_unlock(&dep->qlock);
        erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
        dep->send = NULL;

	erts_set_dist_entry_not_connected(dep);

	erts_de_rwunlock(dep);

        /* Reason 'normal' is reported as 'connection_closed'. */
        schedule_con_monitor_link_cleanup(mld,
                                          nodename,
                                          (flags & DFLAG_PUBLISHED
                                           ? am_visible
                                           : am_hidden),
                                          (reason == am_normal
                                           ? am_connection_closed
                                           : reason));

        erts_resume_processes(suspendees);

        delete_cache(cache);

        free_de_out_queues(dep, obuf);
        if (dep->transcode_ctx)
            transcode_free_ctx(dep);
    }

    dec_no_nodes();

    return 1;
}

/*
 * Return the export entry for erlang:func/arity, as returned by
 * erts_export_put().
 */
static Export*
trap_function(Eterm func, int arity)
{
    Export *trap_export = erts_export_put(am_erlang, func, arity);

    return trap_export;
}

/*
 * Sync with dist_util.erl:
 *
 * -record(erts_dflags,
 *         {default, mandatory, addable, rejectable, strict_order}).
 *
 * Built once in init_dist() as a literal 6-tuple: the tag 'erts_dflags'
 * followed by the DFLAG_DIST_DEFAULT, DFLAG_DIST_MANDATORY,
 * DFLAG_DIST_ADDABLE, DFLAG_DIST_REJECTABLE and DFLAG_DIST_STRICT_ORDER
 * values as smalls.
 */
static Eterm erts_dflags_record;

/*
 * Initialize distribution state: the nodes monitors, the saved
 * 'nodedown' reason, the 'no_nodes' and 'no_caches' counters, the trap
 * export entries, and the literal erts_dflags record tuple.
 */
void init_dist(void)
{
    Eterm *record_hp;

    init_nodes_monitors();

    nodedown.bp = NULL;
    nodedown.reason = NIL;

    erts_atomic_init_nob(&no_caches, 0);
    erts_atomic_init_nob(&no_nodes, 0);

    /* Lookup/Install all references to trap functions */
    dmonitor_node_trap = trap_function(am_dmonitor_node,3);
    dist_ctrl_put_data_trap = erts_export_put(am_erts_internal,
                                              am_dist_ctrl_put_data,
                                              2);

    /* One header word plus six elements for the erts_dflags tuple. */
    record_hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm));
    erts_dflags_record = TUPLE6(record_hp, am_erts_dflags,
                                make_small(DFLAG_DIST_DEFAULT),
                                make_small(DFLAG_DIST_MANDATORY),
                                make_small(DFLAG_DIST_ADDABLE),
                                make_small(DFLAG_DIST_REJECTABLE),
                                make_small(DFLAG_DIST_STRICT_ORDER));
    erts_set_literal_tag(&erts_dflags_record, record_hp, (1+6));
}

/*
 * Map an ErtsDistOutputBuf pointer back to the Binary containing it by
 * subtracting offsetof(Binary, orig_bytes) from its address. This
 * matches buffers placed at &bin->orig_bytes[0], as alloc_dist_obuf()
 * does.
 */
#define ErtsDistOutputBuf2Binary(OB) \
  ((Binary *) (((char *) (OB)) - offsetof(Binary, orig_bytes)))

/*
 * Allocate a dist output buffer with room for 'size' data bytes.
 *
 * The buffer lives in the orig_bytes area of a Binary obtained from
 * erts_bin_drv_alloc(). In DEBUG builds the debug pattern and the end
 * pointer of the allocation are recorded in the buffer.
 */
static ERTS_INLINE ErtsDistOutputBuf *
alloc_dist_obuf(Uint size)
{
    Uint total = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
    Binary *bin = erts_bin_drv_alloc(total);
    ErtsDistOutputBuf *ob = (ErtsDistOutputBuf *) &bin->orig_bytes[0];

#ifdef DEBUG
    ob->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
    ob->alloc_endp = ob->data + size;
    ASSERT(bin == ErtsDistOutputBuf2Binary(ob));
#endif

    return ob;
}

/*
 * Release the Binary that holds the dist output buffer 'obuf'.
 */
static ERTS_INLINE void
free_dist_obuf(ErtsDistOutputBuf *obuf)
{
    ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN);
    erts_bin_release(ErtsDistOutputBuf2Binary(obuf));
}

/*
 * Return the orig_size of the Binary that holds the dist output
 * buffer 'obuf'.
 */
static ERTS_INLINE Sint
size_obuf(ErtsDistOutputBuf *obuf)
{
    return ErtsDistOutputBuf2Binary(obuf)->orig_size;
}

/*
 * Detach all output buffers queued on 'dep' and return them as one
 * list: tmp_out_queue first, then out_queue, then finalized_out_queue.
 * All three queues are left empty. The caller must hold dep->qlock.
 */
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
{
    ErtsDistOutputBuf *head;

    ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));

    /* Start with the finalized queue and link out_queue in front of it. */
    head = dep->finalized_out_queue.first;
    if (dep->out_queue.last) {
	dep->out_queue.last->next = head;
	head = dep->out_queue.first;
    }

    /* tmp_out_queue, when non-empty, goes in front of everything else. */
    if (dep->tmp_out_queue.first) {
        dep->tmp_out_queue.last->next = head;
        head = dep->tmp_out_queue.first;
    }

    dep->tmp_out_queue.first = NULL;
    dep->tmp_out_queue.last = NULL;
    dep->out_queue.first = NULL;
    dep->out_queue.last = NULL;
    dep->finalized_out_queue.first = NULL;
    dep->finalized_out_queue.last = NULL;

    return head;
}

/*
 * Free every buffer in the list 'obuf' and, if anything was freed,
 * subtract the accumulated size from dep->qsize while holding
 * dep->qlock.
 */
static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
{
    ErtsDistOutputBuf *next;
    Sint freed = 0;

    for (; obuf; obuf = next) {
	next = obuf->next;   /* read before the buffer is released */
	freed += size_obuf(obuf);
	free_dist_obuf(obuf);
    }

    if (freed == 0)
	return;

    erts_mtx_lock(&dep->qlock);
    ASSERT(erts_atomic_read_nob(&dep->qsize) >= freed);
    erts_atomic_add_nob(&dep->qsize,
                        (erts_aint_t) -freed);
    erts_mtx_unlock(&dep->qlock);
}

/*
 * Destructor for the magic binary created by
 * erts_dsend_export_trap_context().
 *
 * Destroys the saved work stack of the size or encode phase, frees the
 * output buffer if one exists in the ALLOC phase or later, and drops
 * the dist entry reference when ctx->deref_dep is set. Returns 1.
 */
int erts_dsend_context_dtor(Binary* ctx_bin)
{
    ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);

    if (ctx->dss.phase == ERTS_DSIG_SEND_PHASE_MSG_SIZE) {
	DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack);
    }
    else if (ctx->dss.phase == ERTS_DSIG_SEND_PHASE_MSG_ENCODE) {
	DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack);
    }

    if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
	free_dist_obuf(ctx->dss.obuf);
    }

    if (ctx->deref_dep) {
	erts_deref_dist_entry(ctx->dep);
    }

    return 1;
}

/*
 * Copy the send context 'ctx' into a magic binary (destructor
 * erts_dsend_context_dtor) and return a magic reference to it,
 * allocated on the heap of 'p'.
 *
 * Pointers that refer into the original context are redirected to the
 * copy: dss.ctl is re-pointed at the copied ctl_heap, and the atom
 * cache map, if any, is copied and dss.acmp re-pointed at that copy.
 */
Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
{
    struct exported_ctx {
	ErtsSendContext ctx;
	ErtsAtomCacheMap acm;
    };
    struct exported_ctx *copy;
    Binary *magic;
    Eterm *hp;

    magic = erts_create_magic_binary(sizeof(struct exported_ctx),
				     erts_dsend_context_dtor);
    copy = ERTS_MAGIC_BIN_DATA(magic);
    hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);

    sys_memcpy(&copy->ctx, ctx, sizeof(ErtsSendContext));

    ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
    copy->ctx.dss.ctl = make_tuple(copy->ctx.ctl_heap);

    if (ctx->dss.acmp) {
	sys_memcpy(&copy->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
	copy->ctx.dss.acmp = &copy->acm;
    }

    return erts_mk_magic_ref(&hp, &MSO(p), magic);
}


/*
 * The erts_dsig_send_*() functions implemented below send asynchronous
 * distributed signals to other Erlang nodes. Before sending a distributed
 * signal, you need to prepare the operation by calling erts_dsig_prepare()
 * (see dist.h).
 *
 * Note that the distributed signal send operation is truly asynchronous,
 * and the signal is not guaranteed to reach the receiver if the connection
 * goes down before the signal has reached the receiver.
 */

/*
** Send a DOP_LINK link message: {DOP_LINK, local, remote}
**
** Returns the result of dsig_send_ctl() (called with force == 0).
*/
int
erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
    DeclareTmpHeapNoproc(ctl_heap,4);
    Eterm ctl;
    int res;

    /*
     * Build the control tuple only after UseTmpHeapNoproc(), as the
     * other erts_dsig_send_*() functions in this file do.
     */
    UseTmpHeapNoproc(4);
    ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote);

    res = dsig_send_ctl(dsdp, ctl, 0);
    UnUseTmpHeapNoproc(4);
    return res;
}

/*
** Send a DOP_UNLINK message: {DOP_UNLINK, local, remote}
**
** Returns the result of dsig_send_ctl() (called with force == 0).
*/
int
erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
    DeclareTmpHeapNoproc(ctl_heap,4);
    Eterm ctl;
    int res;

    /*
     * Build the control tuple only after UseTmpHeapNoproc(), as the
     * other erts_dsig_send_*() functions in this file do.
     */
    UseTmpHeapNoproc(4);
    ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote);

    res = dsig_send_ctl(dsdp, ctl, 0);
    UnUseTmpHeapNoproc(4);
    return res;
}


/* A local process that's being monitored by a remote one exits. We send:
   {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason}

   Nothing is sent, and ERTS_DSIG_SEND_OK is returned, unless the
   receiver has both DFLAG_DIST_MONITOR and DFLAG_DIST_MONITOR_NAME set.
   The signal is sent forced (dsig_send_ctl() with force == 1). */
int
erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
		      Eterm ref, Eterm reason)
{
    DeclareTmpHeapNoproc(ctl_heap,6);
    Eterm ctl;
    int rc;

    if ((dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME))
	!= (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
        /*
         * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor)
         */
        return ERTS_DSIG_SEND_OK;
    }

    UseTmpHeapNoproc(6);
    ctl = TUPLE5(&ctl_heap[0],
		 make_small(DOP_MONITOR_P_EXIT),
		 watched, watcher, ref, reason);
    rc = dsig_send_ctl(dsdp, ctl, 1);
    UnUseTmpHeapNoproc(6);

    return rc;
}

/* We want to monitor a process (named or unnamed) on another node, we send:
   {DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's
   needed on the other side...

   Nothing is sent, and ERTS_DSIG_SEND_OK is returned, unless the
   receiver has both DFLAG_DIST_MONITOR and DFLAG_DIST_MONITOR_NAME set. */
int
erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
		       Eterm ref)
{
    DeclareTmpHeapNoproc(ctl_heap,5);
    Eterm ctl;
    int rc;

    if ((dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME))
	!= (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
        /*
         * Receiver does not support DOP_MONITOR_P.
         * Just avoid sending it and by doing that reduce this monitor
         * to only supervise the connection. This will work for simple c-nodes
         * with a 1-to-1 relation between "Erlang process" and OS-process.
         */
        return ERTS_DSIG_SEND_OK;
    }

    UseTmpHeapNoproc(5);
    ctl = TUPLE4(&ctl_heap[0],
		 make_small(DOP_MONITOR_P),
		 watcher, watched, ref);
    rc = dsig_send_ctl(dsdp, ctl, 0);
    UnUseTmpHeapNoproc(5);

    return rc;
}

/* A local process monitoring a remote one wants to stop monitoring, either
   because of a demonitor bif call or because the local process died. We send
   {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}

   Nothing is sent, and ERTS_DSIG_SEND_OK is returned, unless the
   receiver has both DFLAG_DIST_MONITOR and DFLAG_DIST_MONITOR_NAME set.
   'force' is passed on to dsig_send_ctl(). */
int
erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
			 Eterm watched, Eterm ref, int force)
{
    DeclareTmpHeapNoproc(ctl_heap,5);
    Eterm ctl;
    int rc;

    if ((dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME))
	!= (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
        /*
         * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor)
         */
        return ERTS_DSIG_SEND_OK;
    }

    UseTmpHeapNoproc(5);
    ctl = TUPLE4(&ctl_heap[0],
		 make_small(DOP_DEMONITOR_P),
		 watcher, watched, ref);
    rc = dsig_send_ctl(dsdp, ctl, force);
    UnUseTmpHeapNoproc(5);

    return rc;
}

/*
 * Return 1 if the seq_trace 'token' can be sent over the connection
 * described by 'ctx', 0 otherwise.
 *
 * Any token can be sent when the receiver has DFLAG_BIG_SEQTRACE_LABELS
 * set. Otherwise the token's label must be a small whose value lies
 * within the 32-bit range shifted right by _TAG_IMMED1_SIZE.
 */
static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
    Eterm label;
    Sint label_val;

    if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) {
        /* The other end is capable of handling arbitrary seq_trace labels. */
        return 1;
    }

    /* The other end only tolerates smalls, but since we could potentially be
     * talking to an old 32-bit emulator from a 64-bit one, we have to check
     * whether the label is small on any emulator. */
    label = SEQ_TRACE_T_LABEL(token);
    if (!is_small(label))
        return 0;

    label_val = signed_val(label);
    if (label_val > (ERTS_SINT32_MAX >> _TAG_IMMED1_SIZE))
        return 0;
    if (label_val < (ERTS_SINT32_MIN >> _TAG_IMMED1_SIZE))
        return 0;

    return 1;
}

/*
 * Send 'message' to the remote pid 'remote' using the prepared send
 * context 'ctx'.
 *
 * The control tuple is built in ctx->ctl_heap:
 *   {Op, SenderId, remote}         or
 *   {Op, SenderId, remote, Token}  when the sender has a seq_trace
 *                                  token that the receiver can handle.
 * Op is DOP_SEND_SENDER[_TT] with the sender's id when the receiver has
 * DFLAG_SEND_SENDER set, otherwise DOP_SEND[_TT] with the empty atom as
 * sender id. Returns the result of erts_dsig_send().
 */
int
erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
{
    Process *sender = ctx->dsd.proc;
    Eterm token = NIL;
    Eterm ctl;
    Eterm dist_op, sender_id;
    int with_token;
#ifdef USE_VM_PROBES
    Sint tok_label = 0;
    Sint tok_lastcnt = 0;
    Sint tok_serial = 0;
    Uint msize = 0;
    DTRACE_CHARBUF(node_name, 64);
    DTRACE_CHARBUF(sender_name, 64);
    DTRACE_CHARBUF(receiver_name, 64);
#endif

    if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
	seq_trace_update_send(sender);
	token = SEQ_TRACE_TOKEN(sender);
	seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender);
    }
#ifdef USE_VM_PROBES
    *node_name = *sender_name = *receiver_name = '\0';
    if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
        erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
                      "%T", ctx->dsd.dep->sysname);
        erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
                      "%T", sender->common.id);
        erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
                      "%T", remote);
        msize = size_object(message);
        if (have_seqtrace(token)) {
            tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
            tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
            tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
        }
    }
#endif

    /* Only include the token when the receiver is able to handle it. */
    with_token = (token != NIL && can_send_seqtrace_token(ctx, token));

    if (ctx->dsd.flags & DFLAG_SEND_SENDER) {
        sender_id = sender->common.id;
        if (with_token)
            dist_op = make_small(DOP_SEND_SENDER_TT);
        else
            dist_op = make_small(DOP_SEND_SENDER);
    }
    else {
        sender_id = am_Empty;
        if (with_token)
            dist_op = make_small(DOP_SEND_TT);
        else
            dist_op = make_small(DOP_SEND);
    }

    if (with_token) {
        ctl = TUPLE4(&ctx->ctl_heap[0], dist_op, sender_id, remote, token);
    }
    else {
        ctl = TUPLE3(&ctx->ctl_heap[0], dist_op, sender_id, remote);
    }

    DTRACE6(message_send, sender_name, receiver_name,
            msize, tok_label, tok_lastcnt, tok_serial);
    DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
            msize, tok_label, tok_lastcnt, tok_serial);

    ctx->dss.ctl = ctl;
    ctx->dss.msg = message;
    ctx->dss.force_busy = 0;
    return erts_dsig_send(&ctx->dsd, &ctx->dss);
}

/*
 * Send 'message' to the process registered as 'remote_name' on the
 * remote node, using the prepared send context 'ctx'.
 *
 * The control tuple is built in ctx->ctl_heap:
 *   {DOP_REG_SEND, SenderPid, '', remote_name}            or
 *   {DOP_REG_SEND_TT, SenderPid, '', remote_name, Token}  when the
 *   sender has a seq_trace token that the receiver can handle.
 * Returns the result of erts_dsig_send().
 */
int
erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
		       ErtsSendContext* ctx)
{
    Process *sender = ctx->dsd.proc;
    Eterm token = NIL;
    Eterm ctl;
    int with_token;
#ifdef USE_VM_PROBES
    Sint tok_label = 0;
    Sint tok_lastcnt = 0;
    Sint tok_serial = 0;
    Uint32 msize = 0;
    DTRACE_CHARBUF(node_name, 64);
    DTRACE_CHARBUF(sender_name, 64);
    DTRACE_CHARBUF(receiver_name, 128);
#endif

    if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
	seq_trace_update_send(sender);
	token = SEQ_TRACE_TOKEN(sender);
	seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender);
    }
#ifdef USE_VM_PROBES
    *node_name = *sender_name = *receiver_name = '\0';
    if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
        erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
                      "%T", ctx->dsd.dep->sysname);
        erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
                      "%T", sender->common.id);
        erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
                      "{%T,%s}", remote_name, node_name);
        msize = size_object(message);
        if (have_seqtrace(token)) {
            tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
            tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
            tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
        }
    }
#endif

    /* Only include the token when the receiver is able to handle it. */
    with_token = (token != NIL && can_send_seqtrace_token(ctx, token));

    if (with_token) {
	ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT),
		     sender->common.id, am_Empty, remote_name, token);
    }
    else {
	ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND),
		     sender->common.id, am_Empty, remote_name);
    }

    DTRACE6(message_send, sender_name, receiver_name,
            msize, tok_label, tok_lastcnt, tok_serial);
    DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
            msize, tok_label, tok_lastcnt, tok_serial);

    ctx->dss.ctl = ctl;
    ctx->dss.msg = message;
    ctx->dss.force_busy = 0;
    return erts_dsig_send(&ctx->dsd, &ctx->dss);
}

/*
 * local has died, deliver the exit signal to remote.
 *
 * Sends {DOP_EXIT_TT, local, remote, token, reason} when 'token' is a
 * seq_trace token, otherwise {DOP_EXIT, local, remote, reason}. The
 * signal is sent forced, i.e. busy is ignored. Returns the result of
 * dsig_send_ctl().
 */
int
erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, 
		       Eterm reason, Eterm token)
{
    Eterm ctl;
    DeclareTmpHeapNoproc(ctl_heap,6);
    int res;
#ifdef USE_VM_PROBES
    Process *sender = dsdp->proc;
    Sint tok_label = 0;
    Sint tok_lastcnt = 0;
    Sint tok_serial = 0;
    DTRACE_CHARBUF(node_name, 64);
    DTRACE_CHARBUF(sender_name, 64);
    DTRACE_CHARBUF(remote_name, 128);
    DTRACE_CHARBUF(reason_str, 128);
#endif

    UseTmpHeapNoproc(6);
    if (have_seqtrace(token)) {
	seq_trace_update_send(dsdp->proc);
	seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
	ctl = TUPLE5(&ctl_heap[0],
		     make_small(DOP_EXIT_TT), local, remote, token, reason);
    } else {
	ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
    }
#ifdef USE_VM_PROBES
    /*
     * Start with empty strings in all probe buffers, including
     * reason_str, so that the probe call below never gets passed an
     * uninitialized buffer when the buffers are not filled in here.
     */
    *node_name = *sender_name = *remote_name = *reason_str = '\0';
    if (DTRACE_ENABLED(process_exit_signal_remote)) {
        erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
                      "%T", dsdp->dep->sysname);
        erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
                      "%T", sender->common.id);
        erts_snprintf(remote_name, sizeof(DTRACE_CHARBUF_NAME(remote_name)),
                      "{%T,%s}", remote, node_name);
        erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)),
                      "%T", reason);
        if (have_seqtrace(token)) {
            tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
            tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
            tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
        }
    }
#endif
    DTRACE7(process_exit_signal_remote, sender_name, node_name,
            remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
    /* forced, i.e ignore busy */
    res = dsig_send_ctl(dsdp, ctl, 1);
    UnUseTmpHeapNoproc(6);
    return res;
}

/*
 * Send {DOP_EXIT, local, remote, reason}. The signal is sent forced,
 * i.e. busy is ignored. Returns the result of dsig_send_ctl().
 */
int
erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
    DeclareTmpHeapNoproc(ctl_heap,5);
    Eterm exit_ctl;
    int rc;

    UseTmpHeapNoproc(5);
    exit_ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT),
		      local, remote, reason);
    /* forced, i.e ignore busy */
    rc = dsig_send_ctl(dsdp, exit_ctl, 1);
    UnUseTmpHeapNoproc(5);

    return rc;
}

/*
 * Send {DOP_EXIT2, local, remote, reason}. Not forced (dsig_send_ctl()
 * is called with force == 0). Returns the result of dsig_send_ctl().
 */
int
erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
    DeclareTmpHeapNoproc(ctl_heap,5);
    Eterm exit2_ctl;
    int rc;

    UseTmpHeapNoproc(5);
    exit2_ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT2),
		       local, remote, reason);
    rc = dsig_send_ctl(dsdp, exit2_ctl, 0);
    UnUseTmpHeapNoproc(5);

    return rc;
}


/*
 * Send {DOP_GROUP_LEADER, leader, remote}. Not forced (dsig_send_ctl()
 * is called with force == 0). Returns the result of dsig_send_ctl().
 */
int
erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
{
    DeclareTmpHeapNoproc(ctl_heap,4);
    Eterm gl_ctl;
    int rc;

    UseTmpHeapNoproc(4);
    gl_ctl = TUPLE3(&ctl_heap[0], make_small(DOP_GROUP_LEADER),
		    leader, remote);
    rc = dsig_send_ctl(dsdp, gl_ctl, 0);
    UnUseTmpHeapNoproc(4);

    return rc;
}

/*
 * PURIFY_MSG(msg): report 'msg' together with file name and line number
 * through purify_printf() when PURIFY is defined, or through
 * VALGRIND_PRINTF() when VALGRIND is defined. Expands to nothing in all
 * other builds.
 */
#if defined(PURIFY)
#  define PURIFY_MSG(msg) \
    purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg)
#elif defined(VALGRIND)
#include <valgrind/valgrind.h>
#include <valgrind/memcheck.h>

#  define PURIFY_MSG(msg)                                                    \
    VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
#else
#  define PURIFY_MSG(msg)
#endif

/*
** Input from distribution port.
**  Input follows the distribution protocol v4.5
**  
**   The protocol is a 4 byte header protocol
**   the DOP_DATA is stripped by driver_output
**
**   assert  hlen == 0 !!!
*/

int erts_net_message(Port *prt,
		     DistEntry *dep,
                     Uint32 conn_id,
		     byte *hbuf,
		     ErlDrvSizeT hlen,
		     byte *buf,
		     ErlDrvSizeT len)
{
    ErtsDistExternal ede;
    Sint ctl_len;
    Eterm arg;
    Eterm from, to;
    Eterm watcher, watched;
    Eterm ref;
    Eterm *tuple;
    Eterm reason;
    Process* rp;
    DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
    Eterm* ctl = ctl_default;
    ErtsHeapFactory factory;
    Eterm* hp;
    Sint type;
    Eterm token;
    Eterm token_size;
    Uint tuple_arity;
    int res;
#ifdef ERTS_DIST_MSG_DBG
    ErlDrvSizeT orig_len = len;
#endif

    UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);

    ERTS_CHK_NO_PROC_LOCKS;

    ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));

    if (!erts_is_alive) {
	UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
	return 0;
    }

    ASSERT(hlen == 0);

    if (len == 0) {  /* HANDLE TICK !!! */
	UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
	return 0;
    }

#ifdef ERTS_RAW_DIST_MSG_DBG
    erts_fprintf(stderr, "<< ");
    bw(buf, len);
#endif

    res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache);

    switch (res) {
    case ERTS_PREP_DIST_EXT_CLOSED:
        return 0; /* Connection not alive; ignore signal... */
    case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
	erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
	bw(buf, orig_len);
#endif
        goto data_error;
    case ERTS_PREP_DIST_EXT_SUCCESS:
	ctl_len = erts_decode_dist_ext_size(&ede);
        if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
            erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
            bw(buf, orig_len);
#endif
            PURIFY_MSG("data error");
            goto data_error;
        }
        break;
    default:
        ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
        break;
    }

    if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
	ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
    }
    hp = ctl;

    erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
    arg = erts_decode_dist_ext(&factory, &ede);
    if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
	erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
	bw(buf, orig_len);
#endif
	PURIFY_MSG("data error");
	goto decode_error;
    }

#ifdef ERTS_DIST_MSG_DBG
    erts_fprintf(stderr, "<< CTL: %T\n", arg);
#endif

    if (is_not_tuple(arg) || 
	(tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) ||
	is_not_small(tuple[1])) {
 	goto invalid_message;
    }

    token_size = 0;
    token = NIL;

    switch (type = unsigned_val(tuple[1])) {
    case DOP_LINK: {
        ErtsDSigData dsd;
        int code;

	if (tuple_arity != 3) {
	    goto invalid_message;
	}
	from = tuple[2];
	to   = tuple[3];  /* local proc to link to */

	if (is_not_external_pid(from))
            goto invalid_message;

        if (dep != external_pid_dist_entry(from))
            goto invalid_message;

        if (is_external_pid(to)) {
            if (external_pid_dist_entry(to) != erts_this_dist_entry)
                goto invalid_message;
            /* old incarnation of node; reply noproc... */
        }
        else if (is_internal_pid(to)) {
            ErtsLinkData *ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC,
                                                 from, to);
            ASSERT(ldp->a.other.item == to);
            ASSERT(eq(ldp->b.other.item, from));
#ifdef DEBUG
            code =
#endif
                erts_link_dist_insert(&ldp->a, dep->mld);
            ASSERT(code);

            if (erts_proc_sig_send_link(NULL, to, &ldp->b))
                break; /* done */

            /* Failed to send signal; cleanup and reply noproc... */

#ifdef DEBUG
            code =
#endif
                erts_link_dist_delete(&ldp->a);
            ASSERT(code);
            erts_link_release_both(ldp);
        }

        code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
        if (code == ERTS_DSIG_PREP_CONNECTED) {
            code = erts_dsig_send_exit(&dsd, to, from, am_noproc);
            ASSERT(code == ERTS_DSIG_SEND_OK);
        }

	break;
    }

    case DOP_UNLINK: {
	if (tuple_arity != 3) {
	    goto invalid_message;
	}
	from = tuple[2];
	to = tuple[3];
	if (is_not_external_pid(from))
	    goto invalid_message;
        if (dep != external_pid_dist_entry(from))
	    goto invalid_message;

        if (is_external_pid(to)
            && erts_this_dist_entry == external_pid_dist_entry(from))
            break;

        if (is_not_internal_pid(to))
            goto invalid_message;

        erts_proc_sig_send_dist_unlink(dep, from, to);
	break;
    }
    
    case DOP_MONITOR_P: {
	/* A remote process wants to monitor us, we get:
	   {DOP_MONITOR_P, Remote pid, local pid or name, ref} */
	Eterm pid, name;
        ErtsDSigData dsd;
        int code;

	if (tuple_arity != 4) {
	    goto invalid_message;
	}

	watcher = tuple[2];
	watched = tuple[3];  /* local proc to monitor */
	ref     = tuple[4];

        if (is_not_external_pid(watcher))
            goto invalid_message;
        else if (external_pid_dist_entry(watcher) != dep)
            goto invalid_message;

	if (is_not_ref(ref))
	    goto invalid_message;

        if (is_internal_pid(watched)) {
            name = NIL;
            pid = watched;
        }
        else if (is_atom(watched)) {
            name = watched;
            pid = erts_whereis_name_to_id(NULL, watched);
            /* if port or undefined; reply noproc... */
        }
        else if (is_external_pid(watched)
                 && external_pid_dist_entry(watched) == erts_this_dist_entry) {
            name = NIL;
            pid = am_undefined; /* old incarnation; reply noproc... */
        }
        else
            goto invalid_message;

        if (is_internal_pid(pid)) {
            ErtsMonitorData *mdp;
            mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC,
                                      ref, watcher, pid, name);

            code = erts_monitor_dist_insert(&mdp->origin, dep->mld);
            ASSERT(code); (void)code;

            if (erts_proc_sig_send_monitor(&mdp->target, pid))
                break; /* done */

            /* Failed to send to local proc; cleanup reply noproc... */

            code = erts_monitor_dist_delete(&mdp->origin);
            ASSERT(code); (void)code;
            erts_monitor_release_both(mdp);

        }

        code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
        if (code == ERTS_DSIG_PREP_CONNECTED) {
            code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref,
                                         am_noproc);
            ASSERT(code == ERTS_DSIG_SEND_OK);
        }

        break;
    }

    case DOP_DEMONITOR_P:
	/* A remote node informs us that a local pid in no longer monitored
	   We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref},
	   We need only the ref of course */

	if (tuple_arity != 4) {
	    goto invalid_message;
	}

	watcher = tuple[2];
	watched = tuple[3];
	ref = tuple[4];

	if (is_not_ref(ref)) {
	    goto invalid_message;
	}

        if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep)
            goto invalid_message;

        if (is_internal_pid(watched))
            erts_proc_sig_send_dist_demonitor(watched, ref);
        else if (is_external_pid(watched)
                 && external_pid_dist_entry(watched) == erts_this_dist_entry) {
            /* old incarnation; ignore it */
            ;
        }
        else if (is_atom(watched)) {
            ErtsMonLnkDist *mld = dep->mld;
            ErtsMonitor *mon;

            erts_mtx_lock(&mld->mtx);

            mon = erts_monitor_tree_lookup(mld->orig_name_monitors, ref);
            if (mon)
                erts_monitor_tree_delete(&mld->orig_name_monitors, mon);

            erts_mtx_unlock(&mld->mtx);

            if (mon)
                erts_proc_sig_send_demonitor(mon);
        }
        else
            goto invalid_message;

	break;

    case DOP_REG_SEND_TT:
	if (tuple_arity != 5) {
	    goto invalid_message;
	}

	token_size = size_object(tuple[5]);
	/* Fall through ... */
    case DOP_REG_SEND:
	/* {DOP_REG_SEND, From, Cookie, ToName} -- Message */
	/* {DOP_REG_SEND_TT, From, Cookie, ToName, TraceToken} -- Message */

	/*
	 * There is intentionally no testing of the cookie (it is always '')
	 * from R9B and onwards.
	 */
	if (type != DOP_REG_SEND_TT && tuple_arity != 4) {
	    goto invalid_message;
	}

#ifdef ERTS_DIST_MSG_DBG
	dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif

	from = tuple[2];
	to = tuple[4];
	if (is_not_pid(from) || is_not_atom(to)){
	    goto invalid_message;
	}
	rp = erts_whereis_process(NULL, 0, to, 0, 0);
	if (rp) {
	    Uint xsize = (type == DOP_REG_SEND
			  ? 0
			  : ERTS_HEAP_FRAG_SIZE(token_size));
	    ErtsProcLocks locks = 0;
	    ErtsDistExternal *ede_copy;

	    ede_copy = erts_make_dist_ext_copy(&ede, xsize);
	    if (type == DOP_REG_SEND) {
		token = NIL;
	    } else {
		ErlHeapFragment *heap_frag;
		ErlOffHeap *ohp;
		ASSERT(xsize);
		heap_frag = erts_dist_ext_trailer(ede_copy);
		ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
		hp = heap_frag->mem;
		ohp = &heap_frag->off_heap;
		token = tuple[5];
		token = copy_struct(token, token_size, &hp, ohp);
	    }

	    erts_queue_dist_message(rp, locks, ede_copy, token, from);
	    if (locks)
		erts_proc_unlock(rp, locks);
	}
	break;

    case DOP_SEND_SENDER_TT: {
        Uint xsize;
    case DOP_SEND_TT:

	if (tuple_arity != 4) {
	    goto invalid_message;
	}

	token = tuple[4];
	token_size = size_object(token);
        xsize = ERTS_HEAP_FRAG_SIZE(token_size);
        goto send_common;

    case DOP_SEND_SENDER:
    case DOP_SEND:

        token = NIL;
        xsize = 0;
	if (tuple_arity != 3)
	    goto invalid_message;

    send_common:

	/*
         * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains
         * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise,
         * the atom '' (empty cookie).
	 */
        ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT)
               ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER))
               : tuple[2] == am_Empty);

#ifdef ERTS_DIST_MSG_DBG
	dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
	to = tuple[3];
	if (is_not_pid(to)) {
	    goto invalid_message;
	}
	rp = erts_proc_lookup(to);
	if (rp) {
	    ErtsProcLocks locks = 0;
	    ErtsDistExternal *ede_copy;

	    ede_copy = erts_make_dist_ext_copy(&ede, xsize);
	    if (is_not_nil(token)) {
		ErlHeapFragment *heap_frag;
		ErlOffHeap *ohp;
		ASSERT(xsize);
		heap_frag = erts_dist_ext_trailer(ede_copy);
		ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
		hp = heap_frag->mem;
		ohp = &heap_frag->off_heap;
		token = copy_struct(token, token_size, &hp, ohp);
	    }

	    erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
	    if (locks)
		erts_proc_unlock(rp, locks);
	}
	break;
    }

    case DOP_MONITOR_P_EXIT: {
	/* We are monitoring a process on the remote node which dies, we get
	   {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
	   
	if (tuple_arity != 5) {
	    goto invalid_message;
	}

	watched = tuple[2];  /* remote proc or name which died */
	watcher = tuple[3];
	ref     = tuple[4];
	reason  = tuple[5];

	if (is_not_ref(ref))
	    goto invalid_message;

        if (is_not_external_pid(watched) && is_not_atom(watched))
            goto invalid_message;

        if (is_not_internal_pid(watcher)) {
            if (!is_external_pid(watcher))
                goto invalid_message;
            if (erts_this_dist_entry == external_pid_dist_entry(watcher))
                break;
            goto invalid_message;
        }

        erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
                                             watcher, reason);
	break;
    }

    case DOP_EXIT_TT:
    case DOP_EXIT: {
	/* 'from', which 'to' is linked to, died */
	if (type == DOP_EXIT) {
	    if (tuple_arity != 4) {
		goto invalid_message;
	    }
	    
	    from = tuple[2];
	    to = tuple[3];
	    reason = tuple[4];
	    token = NIL;
	} else {
	    if (tuple_arity != 5) {
		goto invalid_message;
	    }
	    from = tuple[2];
	    to = tuple[3];
	    token = tuple[4];
	    reason = tuple[5];
	}
	if (is_not_external_pid(from)
            || dep != external_pid_dist_entry(from)
            || is_not_internal_pid(to)) {
	    goto invalid_message;
	}

        erts_proc_sig_send_dist_link_exit(dep,
                                          from, to,
                                          reason, token);
	break;
    }
    case DOP_EXIT2_TT:
    case DOP_EXIT2:
	/* 'from' is send an exit signal to 'to' */
	if (type == DOP_EXIT2) {
	    if (tuple_arity != 4) {
		goto invalid_message;
	    }
	    from = tuple[2];
	    to = tuple[3];
	    reason = tuple[4];
	    token = NIL;
	} else {
	    if (tuple_arity != 5) {
		goto invalid_message;
	    }
	    from = tuple[2];
	    to = tuple[3];
	    token = tuple[4];
	    reason = tuple[5];
	}
	if (is_not_pid(from) || is_not_internal_pid(to)) {
	    goto invalid_message;
	}

        erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
	break;

    case DOP_GROUP_LEADER:
	if (tuple_arity != 3) {
	    goto invalid_message;
	}
	from = tuple[2];   /* Group leader  */
	to = tuple[3];     /* new member */
	if (is_not_pid(from) || is_not_pid(to)) {
	    goto invalid_message;
	}

        (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL);
	break;

    default: 
	goto invalid_message;
    }

    erts_factory_close(&factory);
    if (ctl != ctl_default) {
	erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
    }
    UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
    ERTS_CHK_NO_PROC_LOCKS;
    return 0;
 invalid_message:
    {
	erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
	erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
	erts_send_error_to_logger_nogl(dsbufp);
    }
decode_error:
    PURIFY_MSG("data error");
    erts_factory_close(&factory);
    if (ctl != ctl_default) {
	erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
    }
data_error:
    UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
    erts_kill_dist_connection(dep, conn_id);
    ERTS_CHK_NO_PROC_LOCKS;
    return -1;
}

/*
 * Send a distribution control signal that carries no message term.
 *
 * dsdp       - signal data handed on unchanged to erts_dsig_send().
 * ctl        - the control term to send.
 * force_busy - stored in the send context; erts_dsig_send() skips the
 *              suspend-on-busy handling when it is non-zero.
 *
 * Returns whatever erts_dsig_send() returns. As there is no message
 * term to encode incrementally, the send is expected to finish within
 * this single call (asserted: never ERTS_DSIG_SEND_CONTINUE).
 */
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
{
    struct erts_dsig_send_context ctx;
    int ret;
    ctx.ctl = ctl;
    ctx.msg = THE_NON_VALUE;
    ctx.force_busy = force_busy;
    ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
    /*
     * erts_dsig_send() reads ctx.reds on entry in every build type, so
     * it must always hold a defined value; previously it was only set
     * in DEBUG builds. The value 1 is kept from the debug-only code:
     * provoke assert below (no reduction count without msg).
     */
    ctx.reds = 1;
    ret = erts_dsig_send(dsdp, &ctx);
    ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
    return ret;
}

/*
 * Queue an empty 'dist_data' notification message (sender 'system')
 * to the local process identified by 'pid'.
 *
 * c_p - the calling process, or NULL. When it is the receiver itself
 *       it is used directly and ERTS_PROC_LOCK_MAIN is passed as the
 *       lock set; otherwise the receiver is looked up and no locks
 *       are passed.
 * pid - internal pid of the receiver (asserted).
 *
 * Nothing is sent if the receiver cannot be found. Must run on a
 * normal (non-dirty) scheduler (asserted).
 */
static ERTS_INLINE void
notify_dist_data(Process *c_p, Eterm pid)
{
    ErtsProcLocks held_locks = 0;
    Process *receiver;
    ErtsMessage *note;

    ASSERT(erts_get_scheduler_data()
           && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
    ASSERT(is_internal_pid(pid));

    if (c_p && c_p->common.id == pid) {
        /* The caller is notifying itself */
        held_locks = ERTS_PROC_LOCK_MAIN;
        receiver = c_p;
    }
    else {
        receiver = erts_proc_lookup(pid);
    }

    if (!receiver)
        return; /* No such process; nothing to notify */

    note = erts_alloc_message(0, NULL);
    erts_queue_message(receiver, held_locks, note, am_dist_data, am_system);
}

/*
 * Encode a distribution signal (control term plus optional message
 * term) into an output buffer and enqueue it on the dist entry.
 *
 * The work is driven by ctx->phase so that a call can be resumed:
 *
 *   INIT       - pick up flags/process from dsdp, size the control term
 *   MSG_SIZE   - size the message term (may need several calls)
 *   ALLOC      - allocate the output buffer, encode header and control
 *   MSG_ENCODE - encode the message term (may need several calls)
 *   FIN        - verify the connection and enqueue the buffer
 *
 * Signals without a message term (ctx->msg is the non-value) skip the
 * MSG_SIZE and MSG_ENCODE phases.
 *
 * Returns:
 *   ERTS_DSIG_SEND_OK       - done (also when the node is not alive or
 *                             the connection has changed; the signal is
 *                             then silently dropped)
 *   ERTS_DSIG_SEND_CONTINUE - sizing/encoding of the message term is not
 *                             finished; call again with the same ctx
 *   ERTS_DSIG_SEND_YIELD    - the calling process was suspended on a
 *                             busy dist entry and has to yield
 */
int
erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
{
    int retval;
    Sint initial_reds = ctx->reds;
    Eterm cid;

    while (1) {
	switch (ctx->phase) {
	case ERTS_DSIG_SEND_PHASE_INIT:
	    ctx->flags = dsdp->flags;
	    ctx->c_p = dsdp->proc;

	    /* Without a process to suspend we can never wait on busy */
	    if (!ctx->c_p || dsdp->no_suspend)
		ctx->force_busy = 1;

	    ERTS_LC_ASSERT(!ctx->c_p
			       || (ERTS_PROC_LOCK_MAIN
				   == erts_proc_lc_my_proc_locks(ctx->c_p)));

	    if (!erts_is_alive)
		return ERTS_DSIG_SEND_OK;

	    if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) {
		ctx->acmp = erts_get_atom_cache_map(ctx->c_p);
		ctx->max_finalize_prepend = 0;
	    }
	    else {
		ctx->acmp = NULL;
		ctx->max_finalize_prepend = 3;
	    }

    #ifdef ERTS_DIST_MSG_DBG
            erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl);
            if (is_value(ctx->msg))
                erts_fprintf(stderr, "    MSG: %T\n", ctx->msg);
    #endif

	    ctx->data_size = ctx->max_finalize_prepend;
	    erts_reset_atom_cache_map(ctx->acmp);
	    erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);

	    if (is_non_value(ctx->msg)) {
                /* Control signal only; no message term to size */
                ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
                break;
            }
            ctx->u.sc.wstack.wstart = NULL;
            ctx->u.sc.flags = ctx->flags;
            ctx->u.sc.level = 0;

            ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
	    /* Fall through ... */
	case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
	    if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
		retval = ERTS_DSIG_SEND_CONTINUE;
		goto done;
	    }

	    ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
	    /* Fall through ... */
	case ERTS_DSIG_SEND_PHASE_ALLOC:
	    erts_finalize_atom_cache_map(ctx->acmp, ctx->flags);

	    ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp);
	    ctx->data_size += ctx->dhdr_ext_size;

	    ctx->obuf = alloc_dist_obuf(ctx->data_size);
	    ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;

	    /* Encode internal version of dist header */
	    ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
	    /* Encode control message */
	    erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
	    if (is_non_value(ctx->msg)) {
                /* Control signal only; no message term to encode */
                ctx->obuf->msg_start = NULL;
                ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
                break;
            }
            ctx->u.ec.flags = ctx->flags;
            ctx->u.ec.hopefull_flags = 0;
            ctx->u.ec.level = 0;
            ctx->u.ec.wstack.wstart = NULL;
            ctx->obuf->msg_start = ctx->obuf->ext_endp;

            ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
	    /* Fall through ... */
        case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
	    if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
		retval = ERTS_DSIG_SEND_CONTINUE;
		goto done;
	    }

	    ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
	    /* Fall through ... */
	case ERTS_DSIG_SEND_PHASE_FIN: {
	    DistEntry *dep = dsdp->dep;
	    int suspended = 0;
	    int resume = 0;

	    ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
	    ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend);
	    ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);

	    ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;

            /*
             * The encode context (ctx->u.ec) is only set up when there
             * is a message term. For a control-only signal it has never
             * been written, so do not read it; nothing was encoded that
             * could have set any of the flags.
             */
            ctx->obuf->hopefull_flags = (is_non_value(ctx->msg)
                                         ? 0
                                         : ctx->u.ec.hopefull_flags);
	    /*
	     * Signal encoded; now verify that the connection still exists,
	     * and if so enqueue the signal and schedule it for send.
	     */
	    ctx->obuf->next = NULL;
	    erts_de_rlock(dep);
	    cid = dep->cid;
	    if (dep->state == ERTS_DE_STATE_EXITING
                || dep->state == ERTS_DE_STATE_IDLE
                || dep->connection_id != dsdp->connection_id) {
		/* Not the same connection as when we started; drop message... */
		erts_de_runlock(dep);
		free_dist_obuf(ctx->obuf);
	    }
	    else {
                Sint qsize;
                erts_aint32_t qflgs;
		ErtsProcList *plp = NULL;
                Eterm notify_proc = NIL;
                Sint obsz = size_obuf(ctx->obuf);

		erts_mtx_lock(&dep->qlock);
		qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
                ASSERT(qsize >= obsz);
                qflgs = erts_atomic32_read_nob(&dep->qflgs);
		/* Mark the dist entry busy once the queue passes the limit */
		if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
		    erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
                    qflgs |= ERTS_DE_QFLG_BUSY;
                }
                if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
                    /* Previously empty queue and info requested... */
                    qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
                                                       ~ERTS_DE_QFLG_REQ_INFO);
                    if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
                        notify_proc = dep->cid;
                        ASSERT(is_internal_pid(notify_proc));
                    }
                    /* else: requester will send itself the message... */
                    qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
                }
		if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
		    /* Suspend the caller; done with the queue lock released */
		    erts_mtx_unlock(&dep->qlock);

		    plp = erts_proclist_create(ctx->c_p);
		    erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
		    suspended = 1;
		    erts_mtx_lock(&dep->qlock);
		}

		/* Enqueue obuf on dist entry */
		if (dep->out_queue.last)
		    dep->out_queue.last->next = ctx->obuf;
		else
		    dep->out_queue.first = ctx->obuf;
		dep->out_queue.last = ctx->obuf;

		if (!ctx->force_busy) {
                    /* Re-read; the flags may have changed while unlocked */
                    qflgs = erts_atomic32_read_nob(&dep->qflgs);
		    if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
			if (suspended)
			    resume = 1; /* was busy when we started, but isn't now */
    #ifdef USE_VM_PROBES
			if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
			    DTRACE_CHARBUF(port_str, 64);
			    DTRACE_CHARBUF(remote_str, 64);

			    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
					  "%T", cid);
			    erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
					  "%T", dep->sysname);
			    DTRACE3(dist_port_not_busy, erts_this_node_sysname,
				    port_str, remote_str);
			}
    #endif
		    }
		    else {
			/* Enqueue suspended process on dist entry */
			ASSERT(plp);
			erts_proclist_store_last(&dep->suspended, plp);
		    }
		}

		erts_mtx_unlock(&dep->qlock);
		if (dep->state != ERTS_DE_STATE_PENDING) {
                    if (is_internal_port(dep->cid))
                        erts_schedule_dist_command(NULL, dep);
                }
                else {
                    notify_proc = NIL;
                }
		erts_de_runlock(dep);
                if (is_internal_pid(notify_proc))
                    notify_dist_data(ctx->c_p, notify_proc);

		if (resume) {
		    erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN);
		    erts_proclist_destroy(plp);
		    /*
		     * Note that the calling process still have to yield as if it
		     * suspended. If not, the calling process could later be
		     * erroneously scheduled when it shouldn't be.
		     */
		}
	    }
	    /* The buffer is now either enqueued or freed; drop our reference */
	    ctx->obuf = NULL;

	    if (suspended) {
    #ifdef USE_VM_PROBES
		if (!resume && DTRACE_ENABLED(dist_port_busy)) {
		    DTRACE_CHARBUF(port_str, 64);
		    DTRACE_CHARBUF(remote_str, 64);
		    DTRACE_CHARBUF(pid_str, 16);

		    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid);
		    erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
				  "%T", dep->sysname);
		    erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)),
				  "%T", ctx->c_p->common.id);
		    DTRACE4(dist_port_busy, erts_this_node_sysname,
			    port_str, remote_str, pid_str);
		}
    #endif
		if (!resume && erts_system_monitor_flags.busy_dist_port)
		    monitor_generic(ctx->c_p, am_busy_dist_port, cid);
		retval = ERTS_DSIG_SEND_YIELD;
	    } else {
		retval = ERTS_DSIG_SEND_OK;
	    }
	    goto done;
	}
	default:
	    erts_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase);
	}
    }

done:
    /* Charge the caller for the reductions used up by the encoding */
    if (ctx->msg && ctx->c_p) {
	BUMP_REDS(ctx->c_p, (initial_reds - ctx->reds) / TERM_TO_BINARY_LOOP_FACTOR);
    }
    return retval;
}

/*
 * Hand one distribution output buffer to the port driver's 'output'
 * callback.
 *
 * prt  - the distribution port; must be locked by the caller (checked
 *        by the lock checker).
 * obuf - buffer to send, or NULL in which case the driver is called
 *        with a NULL pointer and a zero length.
 *
 * Returns the number of bytes passed to the driver.
 */
static Uint
dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
{
    ErlDrvSizeT nbytes = 0;
    char *payload = NULL;
    int fpe_state;

    ERTS_CHK_NO_PROC_LOCKS;
    ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));

    if (obuf) {
        /* The encoded data occupies [extp, ext_endp) */
        payload = (char*) obuf->extp;
        nbytes = obuf->ext_endp - obuf->extp;
    }

#ifdef USE_VM_PROBES
    if (DTRACE_ENABLED(dist_output)) {
        DTRACE_CHARBUF(port_str, 64);
        DTRACE_CHARBUF(remote_str, 64);

        erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
                      "%T", prt->common.id);
        erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
                      "%T", prt->dist_entry->sysname);
        DTRACE4(dist_output, erts_this_node_sysname, port_str,
                remote_str, nbytes);
    }
#endif

    prt->caller = NIL;

    /* Call into the driver with floating point exceptions blocked */
    fpe_state = erts_block_fpe();
    (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, payload, nbytes);
    erts_unblock_fpe(fpe_state);

    return nbytes;
}

/*
 * Hand one distribution output buffer to the port driver's 'outputv'
 * callback as an I/O vector.
 *
 * The vector always has an empty element in slot 0; when 'obuf' is
 * non-NULL its encoded data and backing binary are placed in slot 1.
 *
 * prt  - the distribution port; must be locked by the caller (checked
 *        by the lock checker). Its driver must implement 'outputv'
 *        (asserted).
 * obuf - buffer to send, or NULL for an empty vector.
 *
 * Returns the number of data bytes passed to the driver. Exits the
 * emulator if the buffer is larger than INT_MAX bytes.
 */
static Uint
dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
{
    SysIOVec vec[2];
    ErlDrvBinary* bins[2];
    ErlIOVec io;
    ErlDrvSizeT nbytes = 0;
    int fpe_state;

    ERTS_CHK_NO_PROC_LOCKS;
    ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));

    /* Slot 0 is always present and always empty */
    vec[0].iov_base = NULL;
    vec[0].iov_len = 0;
    bins[0] = NULL;
    io.vsize = 1;

    if (obuf) {
        /* Slot 1 carries the encoded data, i.e. [extp, ext_endp) */
        nbytes = obuf->ext_endp - obuf->extp;

        vec[1].iov_base = obuf->extp;
        vec[1].iov_len = nbytes;
        bins[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
        io.vsize = 2;
    }

    io.iov = vec;
    io.binv = bins;
    io.size = nbytes;

    if (nbytes > (Uint) INT_MAX)
	erts_exit(ERTS_DUMP_EXIT,
		 "Absurdly large distribution output data buffer "
		 "(%beu bytes) passed.\n",
		 nbytes);

    ASSERT(prt->drv_ptr->outputv);

#ifdef USE_VM_PROBES
    if (DTRACE_ENABLED(dist_outputv)) {
        DTRACE_CHARBUF(port_str, 64);
        DTRACE_CHARBUF(remote_str, 64);

        erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
                      "%T", prt->common.id);
        erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
                      "%T", prt->dist_entry->sysname);
        DTRACE4(dist_outputv, erts_this_node_sysname, port_str,
                remote_str, nbytes);
    }
#endif

    prt->caller = NIL;

    /* Call into the driver with floating point exceptions blocked */
    fpe_state = erts_block_fpe();
    (*prt->drv_ptr->outputv)((ErlDrvData) prt->drv_data, &io);
    erts_unblock_fpe(fpe_state);

    return nbytes;
}


/*
 * Mask applied to the size based reduction cost computed by
 * ERTS_PORT_REDS_DIST_CMD_DATA(); its width depends on the word size.
 */
#ifdef ARCH_64
#  define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL
#elif defined(ARCH_32)
#  define ERTS_PORT_REDS_MASK__ 0x003fffff
#else
#  error "Ohh come on ... !?!"
#endif

/* Fixed reduction costs charged by erts_dist_command() */
#define ERTS_PORT_REDS_DIST_CMD_START 5
#define ERTS_PORT_REDS_DIST_CMD_EXIT 200
#define ERTS_PORT_REDS_DIST_CMD_RESUMED 5

/*
 * Reduction cost of passing SZ bytes to the driver: one reduction per
 * 1024 bytes (masked), but never less than one.
 */
#define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \
  ((SZ) >= (1 << 10) \
   ? ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__)) \
   : ((Sint) 1))

/*
 * Pass queued distribution output of the dist entry connected to 'prt'
 * on to the port driver.
 *
 * Two queues are handled: buffers whose dist header has already been
 * finalized (finalized_out_queue) and buffers that still need it
 * (out_queue). While the port is busy, buffers are only finalized and
 * moved to the finalized queue; otherwise they are finalized and sent.
 * The work is preempted when the reduction budget is used up or the
 * port exits; unless the port exited, remaining buffers are then put
 * back and a new dist command is scheduled.
 *
 * prt          - the distribution port; must be locked by the caller
 *                (checked by the lock checker).
 * initial_reds - reduction budget for this call.
 *
 * Returns the number of reductions consumed.
 */
int
erts_dist_command(Port *prt, int initial_reds)
{
    Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START;
    enum dist_entry_state state;
    Uint32 flags;
    Sint qsize, obufsize = 0;
    ErtsDistOutputQueue oq, foq;
    DistEntry *dep = prt->dist_entry;
    Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
    erts_aint32_t sched_flags;
    ErtsSchedulerData *esdp = erts_get_scheduler_data();

    ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));

    erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0);

    /* Take a consistent snapshot of the dist entry */
    erts_de_rlock(dep);
    flags = dep->flags;
    state = dep->state;
    send = dep->send;
    erts_de_runlock(dep);

    if (state == ERTS_DE_STATE_EXITING) {
	erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
        reds -= ERTS_PORT_REDS_DIST_CMD_EXIT;
	return initial_reds - reds;
    }

    ASSERT(state != ERTS_DE_STATE_PENDING);

    ASSERT(send);

    /*
     * We need to remove both out queues from the
     * dist entry while passing it to port command;
     * otherwise, port command will free the buffers
     * in the queues on failure and we'll end up with
     * a mess.
     */

    erts_mtx_lock(&dep->qlock);
    oq.first = dep->out_queue.first;
    oq.last = dep->out_queue.last;
    dep->out_queue.first = NULL;
    dep->out_queue.last = NULL;
    erts_mtx_unlock(&dep->qlock);

    foq.first = dep->finalized_out_queue.first;
    foq.last = dep->finalized_out_queue.last;
    dep->finalized_out_queue.first = NULL;
    dep->finalized_out_queue.last = NULL;

    sched_flags = erts_atomic32_read_nob(&prt->sched.flags);

    if (reds < 0)
	goto preempted;

    /* First send what has already been finalized, if the port accepts it */
    if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
	int preempt = 0;
	do {
            Uint size;
            ErtsDistOutputBuf *fob;
            size = (*send)(prt, foq.first);
            erts_atomic64_inc_nob(&dep->out);
            esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
            erts_fprintf(stderr, ">> ");
            bw(foq.first->extp, size);
#endif
	    reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
	    fob = foq.first;
	    obufsize += size_obuf(fob);
	    foq.first = foq.first->next;
	    free_dist_obuf(fob);
	    sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
	    preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT);
	    if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
		break;
	} while (foq.first && !preempt);
	if (!foq.first)
	    foq.last = NULL;
	if (preempt)
	    goto preempted;
    }

    if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) {
	if (oq.first) {
	    ErtsDistOutputBuf *ob;
            ErtsDistOutputBuf *last_finalized;
	finalize_only:
            /*
             * This label is also reached by a goto from the send loop
             * below, which jumps past the declarations above. The
             * variable must therefore be initialized here and not in
             * its declaration; an initializer there would be skipped.
             */
            last_finalized = NULL;
	    ob = oq.first;
	    ASSERT(ob);
	    do {
		reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds);
                if (reds < 0)
                    break;
                last_finalized  = ob;
                ob = ob->next;
	    } while (ob);
            if (last_finalized) {
                /*
                 * At least one buffer was finalized; if we got preempted,
                 * ob points to the next buffer to continue finalize.
                 */
                if (foq.last)
                    foq.last->next = oq.first;
                else
                    foq.first = oq.first;
                foq.last = last_finalized;
                if (!ob) {
                    /* All buffers finalized */
                    ASSERT(foq.last == oq.last);
                    ASSERT(foq.last->next == NULL);
                    oq.first = oq.last = NULL;
                }
                else {
                    /* Not all buffers finalized; split oq. */
                    ASSERT(foq.last->next == ob);
                    foq.last->next = NULL;
                    oq.first = ob;
                }
            }
            if (reds <= 0)
                goto preempted;
	}
    }
    else {
        int de_busy;
	int preempt = 0;
	/* Port not busy; finalize and send buffer by buffer */
	while (oq.first && !preempt) {
	    ErtsDistOutputBuf *fob;
	    Uint size;
            reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds);
            if (reds < 0) {
                preempt = 1;
                break;
            }
	    ASSERT(&oq.first->data[0] <= oq.first->extp
		   && oq.first->extp <= oq.first->ext_endp);
	    size = (*send)(prt, oq.first);
            erts_atomic64_inc_nob(&dep->out);
	    esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
            erts_fprintf(stderr, ">> ");
            bw(oq.first->extp, size);
#endif
	    reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
	    fob = oq.first;
	    obufsize += size_obuf(fob);
	    oq.first = oq.first->next;
	    free_dist_obuf(fob);
	    sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
	    preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT);
	    /* Port became busy; only finalize what is left */
	    if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
		goto finalize_only;
	}

	ASSERT(!oq.first || preempt);

	/*
	 * Preempt if not all buffers have been handled.
	 */
	if (preempt && oq.first)
	    goto preempted;

#ifdef DEBUG
	oq.last = NULL;
#endif
	ASSERT(!oq.first);
	ASSERT(!foq.first && !foq.last);

	/*
	 * Everything that was buffered when we started has now been
	 * written to the port. If port isn't busy but dist entry is
	 * and we haven't got too much queued on dist entry, set
	 * dist entry in a non-busy state and resume suspended
	 * processes.
	 */
	erts_mtx_lock(&dep->qlock);
        de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
        qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
                                                (erts_aint_t) -obufsize);
	ASSERT(qsize >= 0);
	obufsize = 0;
	if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
	    && de_busy && qsize < erts_dist_buf_busy_limit) {
	    ErtsProcList *suspendees;
	    int resumed;
	    suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
	    erts_mtx_unlock(&dep->qlock);

	    resumed = erts_resume_processes(suspendees);
	    reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
	}
	else
	    erts_mtx_unlock(&dep->qlock);
    }

    ASSERT(!oq.first && !oq.last);

 done:

    /* Account for buffers freed but not yet subtracted from the queue size */
    if (obufsize != 0) {
	ASSERT(obufsize > 0);
	erts_mtx_lock(&dep->qlock);
#ifdef DEBUG
        qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
                                                (erts_aint_t) -obufsize);
	ASSERT(qsize >= 0);
#else
        erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
#endif
	erts_mtx_unlock(&dep->qlock);
    }

    ASSERT(!!foq.first == !!foq.last);
    ASSERT(!dep->finalized_out_queue.first);
    ASSERT(!dep->finalized_out_queue.last);

    /* Put back finalized buffers that could not be sent this time */
    if (foq.first) {
	dep->finalized_out_queue.first = foq.first;
	dep->finalized_out_queue.last = foq.last;
    }

     /* Avoid wrapping reduction counter... */
    if (reds < INT_MIN/2)
	reds = INT_MIN/2;

    return initial_reds - reds;

 preempted:
    /*
     * Here we assume that state has been read
     * since last call to driver.
     */

    ASSERT(!!oq.first == !!oq.last);

    if (sched_flags & ERTS_PTS_FLG_EXIT) {
	/*
	 * Port died during port command; clean up 'oq'
	 * and 'foq'. Things buffered in dist entry after
	 * we begun processing the queues have already been
	 * cleaned up when port terminated.
	 */

	if (oq.first)
	    oq.last->next = foq.first;
	else
	    oq.first = foq.first;

	while (oq.first) {
	    ErtsDistOutputBuf *fob = oq.first;
	    oq.first = oq.first->next;
	    obufsize += size_obuf(fob);
	    free_dist_obuf(fob);
	}

	foq.first = NULL;
	foq.last = NULL;
    }
    else {
	if (oq.first) {
	    /*
	     * Unhandled buffers need to be put back first
	     * in out_queue.
	     */
	    erts_mtx_lock(&dep->qlock);
	    erts_atomic_add_nob(&dep->qsize, -obufsize);
	    obufsize = 0;
	    oq.last->next = dep->out_queue.first;
	    dep->out_queue.first = oq.first;
	    if (!dep->out_queue.last)
		dep->out_queue.last = oq.last;
	    erts_mtx_unlock(&dep->qlock);
	}

	erts_schedule_dist_command(prt, NULL);
    }
    goto done;
}

#if 0

/*
 * NOTE: This function is compiled out by the surrounding '#if 0' and is
 * not valid C as it stands: 'dep' has an empty initializer, 'flags' is
 * used without a declaration, and the locals 'f' and 'l' are never used.
 *
 * Sketch of what it does: detach the dist entry's out_queue (under
 * qlock), prepend any buffers left in tmp_out_queue, finalize the
 * distribution header of each buffer until either all buffers are done
 * or the reduction budget 'reds_limit' is exceeded, append the finalized
 * buffers to finalized_out_queue and store the remainder back in
 * tmp_out_queue. Returns the number of reductions consumed.
 */
int
dist_data_finalize(Process *c_p, int reds_limit)
{
    int reds = 5;
    DistEntry *dep = ;
    ErtsDistOutputQueue oq, foq;
    ErtsDistOutputBuf *ob;
    int preempt;


    /* Grab everything currently queued for output. */
    erts_mtx_lock(&dep->qlock);
    flags = dep->flags;
    oq.first = dep->out_queue.first;
    oq.last = dep->out_queue.last;
    dep->out_queue.first = NULL;
    dep->out_queue.last = NULL;
    erts_mtx_unlock(&dep->qlock);

    if (!oq.first) {
        ASSERT(!oq.last);
        oq.first = dep->tmp_out_queue.first;
        oq.last = dep->tmp_out_queue.last;
    }
    else {
        ErtsDistOutputBuf *f, *l;
        ASSERT(oq.last);
        /* Leftovers from an earlier call go in front of the new buffers. */
        if (dep->tmp_out_queue.last) {
            dep->tmp_out_queue.last->next = oq.first;
            oq.first = dep->tmp_out_queue.first;
        }
    }

    if (!oq.first) {
        /* Nothing to do... */
        ASSERT(!oq.last);
        return reds;
    }

    foq.first = dep->finalized_out_queue.first;
    foq.last = dep->finalized_out_queue.last;

    preempt = 0;
    ob = oq.first;
    ASSERT(ob);

    do {
        ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
                                                        dep->cache,
                                                        flags);
        if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
            *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
                                           needed */
        ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
        reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
        preempt = reds > reds_limit;
        if (preempt)
            break;
        ob = ob->next;
    } while (ob);
    /*
     * At least one buffer was finalized; if we got preempted,
     * ob points to the last buffer that we finalized.
     */
    if (foq.last)
        foq.last->next = oq.first;
    else
        foq.first = oq.first;
    if (!preempt) {
        /* All buffers finalized */
        foq.last = oq.last;
        oq.first = oq.last = NULL;
    }
    else {
        /* Not all buffers finalized; split oq. */
        foq.last = ob;
        oq.first = ob->next;
        if (oq.first)
            ob->next = NULL;
        else
            oq.last = NULL;
    }

    /* Publish the new queue heads/tails back into the dist entry. */
    dep->finalized_out_queue.first = foq.first;
    dep->finalized_out_queue.last = foq.last;
    dep->tmp_out_queue.first = oq.first;
    dep->tmp_out_queue.last = oq.last;

    return reds;
}

#endif

/*
 * dist_ctrl_get_data_notification(DHandle)
 *
 * Request a notification for when there is output data to fetch on the
 * connection identified by DHandle.
 *
 * Fails with 'notsup' if the calling process has no dist entry attached,
 * and with 'badarg' if DHandle does not refer to the caller's dist entry
 * or refers to another connection id than the current one.
 *
 * If the output queue is already non-empty the caller is notified at
 * once (via notify_dist_data()); otherwise the ERTS_DE_QFLG_REQ_INFO
 * flag is left set in dep->qflgs. Always returns 'ok' on success.
 */
BIF_RETTYPE
dist_ctrl_get_data_notification_1(BIF_ALIST_1)
{
    DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
    erts_aint32_t qflgs;
    erts_aint_t qsize;
    Eterm receiver = NIL; /* set to our own pid if we should notify ourselves */
    Uint32 conn_id;

    if (!dep)
        BIF_ERROR(BIF_P, EXC_NOTSUP);

    if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
        BIF_ERROR(BIF_P, BADARG);

    /*
     * Caller is the only one that can consume from this queue
     * and the only one that can set the req-info flag...
     */

    erts_de_rlock(dep);

    if (dep->connection_id != conn_id) {
        /* Handle belongs to another (older/newer) connection. */
        erts_de_runlock(dep);
        BIF_ERROR(BIF_P, BADARG);
    }

    ASSERT(dep->cid == BIF_P->common.id);

    qflgs = erts_atomic32_read_acqb(&dep->qflgs);

    if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
        qsize = erts_atomic_read_acqb(&dep->qsize);
        ASSERT(qsize >= 0);
        if (qsize > 0)
            receiver = BIF_P->common.id; /* Notify ourselves... */
        else { /* Empty queue; set req-info flag... */
            qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
                                                  ERTS_DE_QFLG_REQ_INFO);
            /*
             * Re-check the size after publishing the flag: data may have
             * been enqueued between the first size read and the flag set.
             */
            qsize = erts_atomic_read_acqb(&dep->qsize);
            ASSERT(qsize >= 0);
            if (qsize > 0) {
                /*
                 * Try to take the request back. The value tested below is
                 * what the band operation returns; if the flag is present
                 * in it, we were the ones who cleared it.
                 */
                qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
                                                       ~ERTS_DE_QFLG_REQ_INFO);
                if (qflgs & ERTS_DE_QFLG_REQ_INFO)
                    receiver = BIF_P->common.id; /* Notify ourselves... */
                /* else: someone else will notify us... */
            }
            /* else: still empty queue... */
        }
    }
    /* else: Already requested... */

    erts_de_runlock(dep);

    /* Send the notification outside of the dist entry lock. */
    if (is_internal_pid(receiver))
        notify_dist_data(BIF_P, receiver);

    BIF_RET(am_ok);
}

/*
 * dist_ctrl_put_data(DHandle, Data)
 *
 * Deliver incoming distribution data to the runtime system.
 *
 * Data may be a binary or the empty list; a non-empty list is handed
 * over to dist_ctrl_put_data_trap. Anything else is a 'badarg'.
 * Fails with 'badarg' on an invalid DHandle, and with 'notsup' if the
 * caller is not the registered input handler of the dist entry.
 * Returns 'ok'.
 */
BIF_RETTYPE
dist_ctrl_put_data_2(BIF_ALIST_2)
{
    DistEntry *dep;
    ErlDrvSizeT size;
    Eterm input_handler;
    Uint32 conn_id;
    byte *data, *temp_alloc = NULL;

    /* Classify the payload first. */
    if (is_binary(BIF_ARG_2)) {
        size = binary_size(BIF_ARG_2);
    }
    else if (is_nil(BIF_ARG_2)) {
        size = 0;
    }
    else if (is_list(BIF_ARG_2)) {
        BIF_TRAP2(dist_ctrl_put_data_trap,
                  BIF_P, BIF_ARG_1, BIF_ARG_2);
    }
    else {
        BIF_ERROR(BIF_P, BADARG);
    }

    dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
    if (dep == NULL)
        BIF_ERROR(BIF_P, BADARG);

    /* Only the registered input handler may feed data. */
    input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler);
    if (BIF_P->common.id != input_handler)
        BIF_ERROR(BIF_P, EXC_NOTSUP);

    /* Count the packet even when it is empty. */
    erts_atomic64_inc_nob(&dep->in);

    if (size == 0)
        BIF_RET(am_ok);

    data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc);
    if (data == NULL)
        BIF_ERROR(BIF_P, BADARG);

    /* The message is handled without the main lock of the caller held. */
    erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);

    /*
     * We ignore any decode failures. On fatal failures the
     * connection will be taken down by killing the
     * distribution channel controller...
     */
    (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size);

    erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);

    BUMP_REDS(BIF_P, 5);

    erts_free_aligned_binary_bytes(temp_alloc);

    BIF_RET(am_ok);
}

/*
 * dist_get_stat(DHandle) -> {ok, In, Out, Pending}
 *
 * In and Out are the dist entry's 'in' and 'out' counters; Pending is
 * 'true' if the output queue size is non-zero, otherwise 'false'.
 * Fails with 'badarg' if DHandle is invalid or refers to a connection
 * id other than the current one.
 */
BIF_RETTYPE
dist_get_stat_1(BIF_ALIST_1)
{
    Sint64 n_in, n_out, n_pending;
    Eterm res, *hp;
    Uint sz;
    Uint32 conn_id;
    DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);

    if (!dep)
        BIF_ERROR(BIF_P, BADARG);

    /* Snapshot the counters while holding the dist entry read lock. */
    erts_de_rlock(dep);
    if (dep->connection_id != conn_id) {
        erts_de_runlock(dep);
        BIF_ERROR(BIF_P, BADARG);
    }
    n_in = (Sint64) erts_atomic64_read_nob(&dep->in);
    n_out = (Sint64) erts_atomic64_read_nob(&dep->out);
    n_pending = (Sint64) erts_atomic_read_nob(&dep->qsize);
    erts_de_runlock(dep);

    /* First pass: only compute the heap size needed. */
    sz = 0;
    (void) erts_bld_tuple(NULL, &sz, 4,
                          am_ok,
                          erts_bld_sint64(NULL, &sz, n_in),
                          erts_bld_sint64(NULL, &sz, n_out),
                          n_pending ? am_true : am_false);

    /* Second pass: allocate and actually build the result. */
    hp = HAlloc(BIF_P, sz);
    res = erts_bld_tuple(&hp, NULL, 4,
                         am_ok,
                         erts_bld_sint64(&hp, NULL, n_in),
                         erts_bld_sint64(&hp, NULL, n_out),
                         n_pending ? am_true : am_false);

    BIF_RET(res);
}

/*
 * dist_ctrl_input_handler(DHandle, InputHandler)
 *
 * Register the local process InputHandler as input handler of the
 * caller's dist entry. Fails with 'notsup' if the caller has no dist
 * entry, and with 'badarg' if DHandle does not match the caller's dist
 * entry / current connection or InputHandler is not an internal pid.
 */
BIF_RETTYPE
dist_ctrl_input_handler_2(BIF_ALIST_2)
{
    Uint32 conn_id;
    DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);

    if (!dep)
        BIF_ERROR(BIF_P, EXC_NOTSUP);

    if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
        BIF_ERROR(BIF_P, BADARG);

    if (is_not_internal_pid(BIF_ARG_2))
        BIF_ERROR(BIF_P, BADARG);

    erts_de_rlock(dep);

    if (dep->connection_id == conn_id) {
        /* Handle is current; install the new input handler. */
        erts_atomic_set_nob(&dep->input_handler,
                            (erts_aint_t) BIF_ARG_2);
        erts_de_runlock(dep);
        BIF_RET(am_ok);
    }

    /* Handle refers to some other connection. */
    erts_de_runlock(dep);
    BIF_ERROR(BIF_P, BADARG);
}

/*
 * dist_ctrl_get_data(DHandle) -> Binary | none
 *
 * Fetch the next outgoing distribution buffer for the connection
 * identified by DHandle and return it as a binary, or 'none' if there
 * is nothing to fetch (or the dist entry is in the EXITING state).
 *
 * Fails with 'notsup' if the caller has no dist entry, and with
 * 'badarg' if DHandle does not match the caller's dist entry or the
 * current connection id. Yields (and is restarted) if finalizing the
 * header of the buffer runs out of reductions.
 */
BIF_RETTYPE
dist_ctrl_get_data_1(BIF_ALIST_1)
{
    DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
    const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
    Sint reds = initial_reds;
    ErtsDistOutputBuf *obuf;
    Eterm *hp;
    ProcBin *pb;
    erts_aint_t qsize;
    Uint32 conn_id;

    if (!dep)
        BIF_ERROR(BIF_P, EXC_NOTSUP);

    if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
        BIF_ERROR(BIF_P, BADARG);

    erts_de_rlock(dep);

    if (dep->connection_id != conn_id) {
        erts_de_runlock(dep);
        BIF_ERROR(BIF_P, BADARG);
    }

    /* Note: jumps into the block further down; the read lock is held. */
    if (dep->state == ERTS_DE_STATE_EXITING)
        goto return_none;

    ASSERT(dep->cid == BIF_P->common.id);

#if 0
    if (dep->finalized_out_queue.first) {
        obuf = dep->finalized_out_queue.first;
        dep->finalized_out_queue.first = obuf->next;
        if (!obuf->next)
            dep->finalized_out_queue.last = NULL;
    }
    else
#endif
    {
        /*
         * tmp_out_queue is our private working queue; refill it from
         * the shared out_queue (under qlock) when it has run dry and
         * the queue size says there is something to fetch.
         */
        if (!dep->tmp_out_queue.first) {
            ASSERT(!dep->tmp_out_queue.last);
            ASSERT(!dep->transcode_ctx);
            qsize = erts_atomic_read_acqb(&dep->qsize);
            if (qsize > 0) {
                erts_mtx_lock(&dep->qlock);
                dep->tmp_out_queue.first = dep->out_queue.first;
                dep->tmp_out_queue.last = dep->out_queue.last;
                dep->out_queue.first = NULL;
                dep->out_queue.last = NULL;
                erts_mtx_unlock(&dep->qlock);
            }
        }

        if (!dep->tmp_out_queue.first) {
            ASSERT(!dep->tmp_out_queue.last);
        return_none:
            erts_de_runlock(dep);
            BIF_RET(am_none);
        }

        obuf = dep->tmp_out_queue.first;
        reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds);
        if (reds < 0) {
            /*
             * Out of reductions; the buffer stays first in tmp_out_queue
             * and the BIF is called again after the yield.
             */
            erts_de_runlock(dep);
            ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1],
                            BIF_P, BIF_ARG_1);
        }

        /* Unlink the finalized buffer from the working queue. */
        dep->tmp_out_queue.first = obuf->next;
        if (!obuf->next)
            dep->tmp_out_queue.last = NULL;
    }

    erts_atomic64_inc_nob(&dep->out);

    erts_de_runlock(dep);

    /* Wrap the buffer in a ProcBin on the caller's heap. */
    hp =  HAlloc(BIF_P, PROC_BIN_SIZE);
    pb = (ProcBin *) (char *) hp;
    pb->thing_word = HEADER_PROC_BIN;
    pb->size = obuf->ext_endp - obuf->extp;
    pb->next = MSO(BIF_P).first;
    MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
    pb->val = ErtsDistOutputBuf2Binary(obuf);
    pb->bytes = (byte*) obuf->extp;
    pb->flags = 0;

    /* The buffer no longer counts as queued output. */
    qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf));
    ASSERT(qsize >= 0);

    /*
     * If the queue has shrunk below half the busy limit while the busy
     * flag is set, resume the processes suspended on the dist entry.
     */
    if (qsize < erts_dist_buf_busy_limit/2
        && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
        ErtsProcList *resume_procs = NULL;
        erts_mtx_lock(&dep->qlock);
        resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
        erts_mtx_unlock(&dep->qlock);
        if (resume_procs) {
            int resumed = erts_resume_processes(resume_procs);
            reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
        }
    }

    /* Second argument is the number of reductions consumed. */
    BIF_RET2(make_binary(pb), (initial_reds - reds));
}

/*
 * Called for a distribution port 'prt' that is no longer busy.
 * Fires the dist_port_not_busy probe (only when built with
 * USE_VM_PROBES and the probe is enabled) and then schedules a
 * dist command for the port.
 */
void
erts_dist_port_not_busy(Port *prt)
{
#ifdef USE_VM_PROBES
    if (DTRACE_ENABLED(dist_port_not_busy)) {
        DTRACE_CHARBUF(port_str, 64);
        DTRACE_CHARBUF(remote_str, 64);

        /* Probe arguments: this node, the port id and the remote node. */
        erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
                      "%T", prt->common.id);
        erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
                      "%T", prt->dist_entry->sysname);
        DTRACE3(dist_port_not_busy, erts_this_node_sysname,
                port_str, remote_str);
    }
#endif
    erts_schedule_dist_command(prt, NULL);
}

/*
 * Take down a connected dist entry.
 *
 * The caller must hold the dist entry rw-lock and the entry must be in
 * the CONNECTED state. The entry is moved to the EXITING state, the
 * EXIT queue flag is raised (under qlock), and the controller is asked
 * to terminate: a dist command is scheduled when the controller is a
 * port; a kill of the controller is scheduled when it is a process.
 */
static void kill_connection(DistEntry *dep)
{
    ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
    ASSERT(dep->state == ERTS_DE_STATE_CONNECTED);

    dep->state = ERTS_DE_STATE_EXITING;

    erts_mtx_lock(&dep->qlock);
    ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
    erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
    erts_mtx_unlock(&dep->qlock);

    /* A controller id is either a pid or a port id, never both. */
    if (is_internal_pid(dep->cid)) {
        schedule_kill_dist_ctrl_proc(dep->cid);
    }
    else if (is_internal_port(dep->cid)) {
        erts_schedule_dist_command(NULL, dep);
    }
}

/*
 * Kill the connection 'conn_id' of 'dep', provided that it still is the
 * current connection of the dist entry and that it is in the CONNECTED
 * state; otherwise do nothing.
 */
void
erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id)
{
    erts_de_rwlock(dep);

    if (conn_id == dep->connection_id) {
        if (dep->state == ERTS_DE_STATE_CONNECTED) {
            kill_connection(dep);
        }
    }

    erts_de_rwunlock(dep);
}

/*
 * Output destination handed (as a void pointer) to the monitor and
 * link printing callbacks below.
 */
struct print_to_data {
    fmtfn_t to;  /* print function passed on to erts_print() */
    void *arg;   /* argument passed on to erts_print() */
};

/*
 * Print one line describing the distributed monitor 'mon'.
 * 'vptdp' points to a struct print_to_data giving the destination.
 */
static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp)
{
    struct print_to_data *ptdp = (struct print_to_data *) vptdp;
    ErtsMonitorDataExtended *mdep;

    ASSERT(mon->flags & ERTS_ML_FLG_EXTENDED);
    mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
    ASSERT(mdep->dist);

    if (erts_monitor_is_origin(mon)) {
        erts_print(ptdp->to, ptdp->arg, "Remotely monitored by: %T %T\n",
                   mon->other.item, mdep->md.target.other.item);
        return;
    }

    /* Target end of the monitor: print what is being monitored... */
    erts_print(ptdp->to, ptdp->arg, "Remote monitoring: %T ", mon->other.item);

    /* ...either as {Name, Node} or as the item itself. */
    if (mon->flags & ERTS_ML_FLG_NAME) {
        erts_print(ptdp->to, ptdp->arg, "{%T, %T}\n",
                   mdep->u.name, mdep->dist->nodename);
    }
    else {
        erts_print(ptdp->to, ptdp->arg, "%T\n", mdep->md.origin.other.item);
    }
}

/*
 * Print all monitors registered on the dist entry 'dep'; first the
 * monitor list and then the tree of monitors by name. Does nothing if
 * the dist entry has no monitor/link data.
 */
static void print_monitor_info(fmtfn_t to, void *arg, DistEntry *dep)
{
    struct print_to_data ptd = {to, arg};

    if (!dep->mld)
        return;

    erts_monitor_list_foreach(dep->mld->monitors,
                              doit_print_monitor_info,
                              (void *) &ptd);
    erts_monitor_tree_foreach(dep->mld->orig_name_monitors,
                              doit_print_monitor_info,
                              (void *) &ptd);
}

/*
 * Print one "Remote link" line for the link 'lnk': the item of the
 * other end of the link followed by the item of 'lnk' itself.
 * 'vptdp' points to a struct print_to_data giving the destination.
 */
static void doit_print_link_info(ErtsLink *lnk, void *vptdp)
{
    struct print_to_data *dest = vptdp;
    ErtsLink *other_end = erts_link_to_other(lnk, NULL);

    erts_print(dest->to, dest->arg, "Remote link: %T %T\n",
               other_end->other.item, lnk->other.item);
}

/*
 * Print all links registered on the dist entry 'dep'. Does nothing if
 * the dist entry has no monitor/link data.
 */
static void print_link_info(fmtfn_t to, void *arg, DistEntry *dep)
{
    struct print_to_data ptd = {to, arg};

    if (!dep->mld)
        return;

    erts_link_list_foreach(dep->mld->links,
                           doit_print_link_info,
                           (void *) &ptd);
}

/*
 * A print destination together with a node name.
 * NOTE(review): no use of this type is visible in this part of the
 * file; check whether it is still needed.
 */
typedef struct {
    struct print_to_data ptd;
    Eterm sysname;
} PrintNodeLinkContext;
    
/*
 * Print information about the dist entry 'dep' to 'to'/'arg'.
 *
 * 'visible' and 'connected' tell which table the caller found the
 * entry in and select the section header ("=visible_node:",
 * "=hidden_node:" or "=not_connected:"), which is followed by the
 * channel number of the entry.
 *
 * An error line is printed instead of the details if the controller id
 * of the entry contradicts 'connected'. For a not connected entry only
 * the name is printed (plus an error line if links/monitors remain).
 * For a connected entry the controller, node info, monitors and links
 * are printed.
 *
 * Always returns 0.
 */
static int
info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected)
{

  if (visible && connected) {
      erts_print(to, arg, "=visible_node:");
  } else if (connected) {
      erts_print(to, arg, "=hidden_node:");
  } else {
      erts_print(to, arg, "=not_connected:");
  }
  erts_print(to, arg, "%d\n", dist_entry_channel_no(dep));

  /* Sanity checks: the controller id must agree with 'connected'. */
  if(connected && is_nil(dep->cid)) {
    erts_print(to, arg,
               "Error: Not connected node still registered as connected:%T\n",
               dep->sysname);
    return 0;
  }

  if(!connected && is_not_nil(dep->cid)) {
    erts_print(to, arg,
               "Error: Connected node not registered as connected:%T\n",
               dep->sysname);
    return 0;
  }

  erts_print(to, arg, "Name: %T", dep->sysname);
  erts_print(to, arg, "\n");
  if (!connected && is_nil(dep->cid)) {
    if (dep->mld) {
      erts_print(to, arg, "Error: Got links/monitors to not connected node:%T\n",
                 dep->sysname);
    }
    return 0;
  }

  /*
   * The format string has exactly one conversion; the original call
   * passed a stray extra argument ('to') after dep->cid.
   */
  erts_print(to, arg, "Controller: %T\n", dep->cid);

  erts_print_node_info(to, arg, dep->sysname, NULL, NULL);
  print_monitor_info(to, arg, dep);
  print_link_info(to, arg, dep);

  return 0;

}
/*
 * Print distribution information: the name of this node followed by
 * one section per dist entry in the visible, hidden, pending and
 * not-connected tables (this node's own entry excluded from the last
 * one). Prints "=no_distribution" and stops if this node is named
 * 'Noname'. Always returns 0.
 */
int distribution_info(fmtfn_t to, void *arg)	/* Called by break handler */
{
    DistEntry *de;

    erts_print(to, arg, "=node:%T\n", erts_this_dist_entry->sysname);

    if (erts_this_node->sysname == am_Noname) {
        erts_print(to, arg, "=no_distribution\n");
        return 0;
    }

#if 0
    if (!erts_visible_dist_entries && !erts_hidden_dist_entries) 
      erts_print(to, arg, "Alive but not holding any connections \n");
#endif

    /* Connected, visible nodes */
    de = erts_visible_dist_entries;
    while (de) {
        info_dist_entry(to, arg, de, 1, 1);
        de = de->next;
    }

    /* Connected, hidden nodes */
    de = erts_hidden_dist_entries;
    while (de) {
        info_dist_entry(to, arg, de, 0, 1);
        de = de->next;
    }

    /* Connections being set up */
    de = erts_pending_dist_entries;
    while (de) {
        info_dist_entry(to, arg, de, 0, 0);
        de = de->next;
    }

    /* Known but not connected nodes; skip our own entry */
    for (de = erts_not_connected_dist_entries; de; de = de->next) {
        if (de == erts_this_dist_entry)
            continue;
        info_dist_entry(to, arg, de, 0, 0);
    }

    return 0;
}

/****************************************************************************
  DISTRIBUTION BIFS:

            setnode/2     -- start distribution
            setnode/3     -- set node controller

            node/1        -- return objects node name
            node/0        -- return this node name
            nodes/0       -- return a list of all (non hidden) nodes
            is_alive      -- return true if distribution is running else false
	    monitor_node  -- turn on/off node monitoring

            node controller only:
            dist_link/2       -- link a remote process to a local
            dist_unlink/2     -- unlink a remote from a local
****************************************************************************/



/**********************************************************************
 ** Set the node name of the current node; fails if the node name is already set.
 ** setnode(name@host, Creation)
 ***********************************************************************/

/*
 * setnode(Name, Creation)
 *
 * Give this node the name Name (an atom on node name form, other than
 * 'Noname') and the creation Creation (an integer in the range 0..3),
 * and mark the node as alive.
 *
 * Fails with 'badarg' if an argument is invalid, the node already is
 * alive, the dmonitor_node trap function is undefined, no process is
 * registered as net_kernel, or net_kernel already has a dist entry.
 * Returns 'true' on success.
 */
BIF_RETTYPE setnode_2(BIF_ALIST_2)
{
    Process *net_kernel;
    Uint creation;

    /* valid creation ? */
    if(!term_to_Uint(BIF_ARG_2, &creation))
	goto error;
    if(creation > 3)
	goto error;

    /* valid node name ? */
    if (!is_node_name_atom(BIF_ARG_1))
	goto error;

    if (BIF_ARG_1 == am_Noname) /* cant use this name !! */
	goto error;
    if (erts_is_alive)     /* must not be alive! */
	goto error;

    /* Check that all trap functions are defined !! */
    if (dmonitor_node_trap->addressv[0] == NULL) {
	goto error;
    }

    net_kernel = erts_whereis_process(BIF_P,
                                      ERTS_PROC_LOCK_MAIN,
				      am_net_kernel,
                                      ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
                                      0);
    if (!net_kernel)
	goto error;

    if (ERTS_PROC_GET_DIST_ENTRY(net_kernel)) {
	/*
	 * net_kernel was found and is locked at this point; release
	 * the locks taken by the lookup before bailing out. Our own
	 * main lock must be kept if we are net_kernel ourselves.
	 */
	erts_proc_unlock(net_kernel,
			 (ERTS_PROC_LOCK_STATUS
			  | ((net_kernel != BIF_P)
			     ? ERTS_PROC_LOCK_MAIN
			     : 0)));
	goto error;
    }

    /* By setting F_DISTRIBUTION on net_kernel,
     * erts_do_net_exits will be called when net_kernel is terminated !! */
    net_kernel->flags |= F_DISTRIBUTION;

    erts_proc_unlock(net_kernel,
                     (ERTS_PROC_LOCK_STATUS
                      | ((net_kernel != BIF_P)
                         ? ERTS_PROC_LOCK_MAIN
                         : 0)));

#ifdef DEBUG
    erts_rwmtx_rlock(&erts_dist_table_rwmtx);
    ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries);
    erts_rwmtx_runlock(&erts_dist_table_rwmtx);
#endif

    /*
     * The node name is changed while thread progress is blocked;
     * our own main lock is released while doing so.
     */
    erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
    erts_thr_progress_block();
    inc_no_nodes();
    erts_set_this_node(BIF_ARG_1, (Uint32) creation);
    erts_is_alive = 1;
    send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
    erts_thr_progress_unblock();
    erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);

    /*
     * Note erts_this_dist_entry is changed by erts_set_this_node(),
     * so we *need* to use the new one after erts_set_this_node()
     * is called.
     */
    erts_ref_dist_entry(erts_this_dist_entry);
    ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);

    BIF_RET(am_true);

 error:
    BIF_ERROR(BIF_P, BADARG);
}

/*
 * erts_internal:create_dist_channel/4 is used by
 * erlang:setnode/3.
 */

/*
 * Argument block for setup_connection_distctrl(): the dist entry to
 * connect plus the distribution flags and version of the connection.
 */
typedef struct {
    DistEntry *dep;
    Uint flags;
    Uint version;
} ErtsSetupConnDistCtrl;

static void
setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
                                  Eterm ctrlr, Uint flags,
                                  Uint version);

static Eterm
setup_connection_distctrl(Process *c_p, void *arg,
                          int *redsp, ErlHeapFragment **bpp);

/*
 * erts_internal:create_dist_channel(Node, Controller, Flags, Version)
 *
 * Set up the distribution channel to Node with Controller (a local
 * port or process) as distribution controller.
 *
 * Returns:
 *   {ok, DHandle}    - connection is up (controller is a port, or the
 *                      calling process itself);
 *   {message, Ref}   - controller is another process; the setup has
 *                      been requested from it and the caller needs to
 *                      wait for the dhandle in a message;
 *   badarg           - invalid arguments, mandatory flags missing, or
 *                      dist entry/controller in the wrong state;
 *   system_limit     - no dist entry could be obtained.
 *
 * Note that 'badarg' and 'system_limit' are returned as values; no
 * exception is raised by this function.
 */
BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
{
    BIF_RETTYPE ret;
    Uint flags;
    Uint version;
    Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE;
    DistEntry *dep = NULL;
    int de_locked = 0;
    Port *pp = NULL;

    /*
     * Check and pick out arguments
     */

    /* Node name... */
    if (!is_node_name_atom(BIF_ARG_1))
        goto badarg;

    /* Distribution controller... */
    if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2))
        goto badarg;

    /* Dist flags... */
    if (!is_small(BIF_ARG_3))
        goto badarg;
    flags = unsigned_val(BIF_ARG_3);

    /* Version... */
    if (!is_small(BIF_ARG_4))
        goto badarg;
    version = unsigned_val(BIF_ARG_4);

    if (version == 0)
        goto badarg;

    /* All mandatory capabilities must be present in the flags. */
    if (~flags & DFLAG_DIST_MANDATORY) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
        if (BIF_P->common.u.alive.reg)
            erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
        erts_dsprintf(dsbufp,
                      " attempted to enable connection to node %T "
                      "which does not support all mandatory capabilities.\n",
                      BIF_ARG_1);
        erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
        goto badarg;
    }

    /*
     * ToDo: Should we not pass connection_id as well
     *       to make sure it's the right connection we commit.
     */

    /*
     * Arguments seem to be in order.
     */

    /* get dist_entry */
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry)
        goto badarg;
    else if (!dep)
        goto system_limit; /* Should never happen!!! */

    if (is_internal_pid(BIF_ARG_2)) {
        if (BIF_P->common.id == BIF_ARG_2) {
            /* We are the controller; set up the connection directly. */
            ErtsSetupConnDistCtrl scdc;

            scdc.dep = dep;
            scdc.flags = flags;
            scdc.version = version;

            res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
            BUMP_REDS(BIF_P, 5);
            /* Our reference to dep was consumed by the call above. */
            dep = NULL;

            if (res == am_badarg)
                goto badarg;

            ASSERT(is_internal_magic_ref(res));
            res_tag = am_ok; /* Connection up */
        }
        else {
            /* Ask the controller process to do the setup. */
            ErtsSetupConnDistCtrl *scdcp;

            scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
                               sizeof(ErtsSetupConnDistCtrl));

            scdcp->dep = dep;
            scdcp->flags = flags;
            scdcp->version = version;

            res = erts_proc_sig_send_rpc_request(BIF_P,
                                                 BIF_ARG_2,
                                                 !0,
                                                 setup_connection_distctrl,
                                                 (void *) scdcp);
            if (is_non_value(res)) {
                /*
                 * The request was not sent, so the callback that would
                 * have freed the argument block will never run; free it
                 * here. The reference to dep is dropped at 'done'.
                 */
                erts_free(ERTS_ALC_T_SETUP_CONN_ARG, (void *) scdcp);
                goto badarg;
            }

            /* Reference to dep now belongs to the pending request. */
            dep = NULL;

            ASSERT(is_internal_ordinary_ref(res));

            res_tag = am_message; /* Caller need to wait for dhandle in message */
        }
        hp = HAlloc(BIF_P, 3);
    }
    else {
        Uint32 conn_id;

        pp = erts_id2port_sflgs(BIF_ARG_2,
                                BIF_P,
                                ERTS_PROC_LOCK_MAIN,
                                ERTS_PORT_SFLGS_INVALID_LOOKUP);
        erts_de_rwlock(dep);
        de_locked = 1;

        if (dep->state != ERTS_DE_STATE_PENDING)
            goto badarg;

        if (!pp || (erts_atomic32_read_nob(&pp->state)
                    & ERTS_PORT_SFLG_EXITING))
            goto badarg;

        /* The driver of a distribution port must support soft busy. */
        if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
            goto badarg;

        /* Neither the port nor the dist entry may already be in use. */
        if (pp->dist_entry || is_not_nil(dep->cid))
            goto badarg;

        erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);

        pp->dist_entry = dep;
        pp->connection_id = dep->connection_id;

        ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);

        dep->send = (pp->drv_ptr->outputv
                     ? dist_port_commandv
                     : dist_port_command);
        ASSERT(dep->send);

        /*
         * Dist-ports do not use the "busy port message queue" functionality, but
         * instead use "busy dist entry" functionality.
        */
        {
            ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
            erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
        }

        conn_id = dep->connection_id;
        /* Releases the rw-lock on dep. */
        setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
        de_locked = 0;

        hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE);
        res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
        res_tag = am_ok; /* Connection up */
        dep = NULL; /* inc of refc transferred to port (dist_entry field) */
    }

    ASSERT(is_value(res) && is_value(res_tag));

    res = TUPLE2(hp, res_tag, res);

    ERTS_BIF_PREP_RET(ret, res);

 done:

    /* Common cleanup: unlock and drop any dist entry we still own. */
    if (dep && dep != erts_this_dist_entry) {
        if (de_locked) {
            if (de_locked > 0)
                erts_de_rwunlock(dep);
            else
                erts_de_runlock(dep);
        }
        erts_deref_dist_entry(dep);
    }

    if (pp)
        erts_port_release(pp);

    return ret;

 badarg:
    ERTS_BIF_PREP_RET(ret, am_badarg);
    goto done;

 system_limit:
    ERTS_BIF_PREP_RET(ret, am_system_limit);
    goto done;
}

/*
 * Final, common part of connection setup.
 *
 * Must be called with the rw-lock of 'dep' held and with 'dep' in the
 * PENDING state; the lock is released by this function.
 *
 * Records version, resets creation, creates the atom cache when
 * DFLAG_DIST_HDR_ATOM_CACHE is set, and marks the entry connected with
 * 'ctrlr' as controller. If output already is queued, output is kicked
 * off: a dist command is scheduled for a port controller; for a
 * process controller that has requested it, a data notification is
 * sent. Finally the node count is bumped and 'nodeup' monitor messages
 * are sent.
 */
static void
setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
                                  Eterm ctrlr, Uint flags,
                                  Uint version)
{
    Eterm notify_proc = NIL;
    erts_aint32_t qflgs;

    dep->version = version;
    dep->creation = 0;

    ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
    ASSERT(dep->state == ERTS_DE_STATE_PENDING);

    if (flags & DFLAG_DIST_HDR_ATOM_CACHE) {
        create_cache(dep);
    }

    erts_set_dist_entry_connected(dep, ctrlr, flags);

    if (erts_atomic_read_nob(&dep->qsize)) {
        /* Data was queued while the connection was pending. */
        if (!is_internal_port(dep->cid)) {
            qflgs = erts_atomic32_read_nob(&dep->qflgs);
            if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
                /* Only notify if we are the ones clearing the flag. */
                qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
                                                   ~ERTS_DE_QFLG_REQ_INFO);
                if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
                    notify_proc = dep->cid;
                    ASSERT(is_internal_pid(notify_proc));
                }
            }
        }
        else {
            erts_schedule_dist_command(NULL, dep);
        }
    }

    erts_de_rwunlock(dep);

    /* Notification is sent after the lock has been released. */
    if (is_internal_pid(notify_proc)) {
        notify_dist_data(c_p, notify_proc);
    }

    inc_no_nodes();

    send_nodes_mon_msgs(c_p,
                        am_nodeup,
                        dep->sysname,
                        (flags & DFLAG_PUBLISHED) ? am_visible : am_hidden,
                        NIL);
}

/*
 * Make the process 'c_p' distribution controller of a pending dist
 * entry.
 *
 * 'arg' points to an ErtsSetupConnDistCtrl. The function is used in two
 * ways in this file:
 *   - called directly with 'redsp' and 'bpp' set to NULL; 'arg' is then
 *     not freed and the dhandle is built on the heap of 'c_p';
 *   - passed as callback to erts_proc_sig_send_rpc_request(); 'arg' is
 *     then freed here (erts_free) and the dhandle is built in a new
 *     message buffer returned via '*bpp'.
 *
 * Returns the dhandle on success. Returns 'badarg' if the dist entry is
 * not in the PENDING state or already has a controller; in that case the
 * reference to the dist entry held by 'arg' is dropped.
 *
 * If 'redsp' is non-NULL it is set to the reduction cost (1, or 5 once
 * the setup is actually carried out).
 */
static Eterm
setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
{
    ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
    DistEntry *dep = scdcp->dep;
    int dep_locked = 0;
    Eterm *hp;
    Uint32 conn_id;

    if (redsp)
        *redsp = 1;

    ASSERT(!ERTS_PROC_IS_EXITING(c_p));

    erts_de_rwlock(dep);
    dep_locked = !0;

    if (dep->state != ERTS_DE_STATE_PENDING)
        goto badarg;

    conn_id = dep->connection_id;

    if (is_not_nil(dep->cid))
        goto badarg;

    c_p->flags |= F_DISTRIBUTION;
    ERTS_PROC_SET_DIST_ENTRY(c_p, dep);

    dep->send = NULL; /* Only for distr ports... */

    if (redsp)
        *redsp = 5;

    /* Releases the rw-lock on dep. */
    setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
                                      scdcp->flags, scdcp->version);

    /* we take over previous inc in refc of dep */

    if (!bpp) /* called directly... */
        return erts_make_dhandle(c_p, dep, conn_id);

    /* scdcp must not be used after this point. */
    erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);

    *bpp = new_message_buffer(ERTS_DHANDLE_SIZE);
    hp = (*bpp)->mem;
    return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id);

badarg:

    if (bpp) /* not called directly */
        erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);

    if (dep_locked)
        erts_de_rwunlock(dep);

    /* Setup failed; give up the reference that came with 'arg'. */
    erts_deref_dist_entry(dep);

    return am_badarg;
}


/*
 * erts_internal:get_dflags()
 *
 * Returns the term stored in erts_dflags_record.
 */
BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0)
{
    return erts_dflags_record;
}

/*
 * erts_internal:new_connection(Node) -> DHandle
 *
 * Return a dhandle for the current connection id of the dist entry of
 * Node. An idle dist entry is first moved to the pending state; entries
 * in any other valid state are left as they are.
 *
 * Fails with 'badarg' if Node is not an atom or is the name of this
 * node.
 */
BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
{
    DistEntry* dep;
    Uint32 conn_id;
    Eterm* hp;
    Eterm dhandle;

    if (is_not_atom(BIF_ARG_1))
        BIF_ERROR(BIF_P, BADARG);

    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry) {
        erts_deref_dist_entry(dep);
        BIF_ERROR(BIF_P, BADARG);
    }

    erts_de_rwlock(dep);

    switch (dep->state) {
    case ERTS_DE_STATE_IDLE:
        /* Start a new connection attempt... */
        erts_set_dist_entry_pending(dep);
        /* fall through */
    case ERTS_DE_STATE_PENDING:
    case ERTS_DE_STATE_CONNECTED:
    case ERTS_DE_STATE_EXITING:
        /* ...and pick up the id of the (now) current connection. */
        conn_id = dep->connection_id;
        break;
    default:
        erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state);
    }

    erts_de_rwunlock(dep);

    hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE);
    dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);

    /* Drop the reference taken by the lookup above. */
    erts_deref_dist_entry(dep);

    BIF_RET(dhandle);
}

/*
 * Abort the connection 'conn_id' of the dist entry 'dep'.
 *
 * Nothing is done if 'conn_id' is not the current connection id of the
 * entry. A CONNECTED entry is taken down via kill_connection(). A
 * PENDING entry is reset to not connected: its monitor/link data, atom
 * cache and output queue are detached and cleaned up, and processes
 * suspended on the entry are resumed.
 *
 * Returns a reduction cost; it is non-zero only when suspended
 * processes were resumed.
 */
static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
{
    erts_de_rwlock(dep);

    if (dep->connection_id != conn_id)
        ; /* Not the current connection; nothing to abort */
    else if (dep->state == ERTS_DE_STATE_CONNECTED) {
        kill_connection(dep);
    }
    else if (dep->state == ERTS_DE_STATE_PENDING) {
        ErtsAtomCache *cache;
        ErtsDistOutputBuf *obuf;
        ErtsProcList *resume_procs;
        Sint reds = 0;
        ErtsMonLnkDist *mld;

	ASSERT(is_nil(dep->cid));

        /*
         * Detach everything that needs cleanup while holding the locks;
         * the actual cleanup is done after the locks have been released.
         */
        mld = dep->mld;
        dep->mld = NULL;

	cache = dep->cache;
	dep->cache = NULL;
	erts_mtx_lock(&dep->qlock);
        obuf = dep->out_queue.first;
        dep->out_queue.first = NULL;
        dep->out_queue.last = NULL;
        ASSERT(!dep->tmp_out_queue.first);
        ASSERT(!dep->finalized_out_queue.first);
        resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
	erts_mtx_unlock(&dep->qlock);
	erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
	dep->send = NULL;

        erts_set_dist_entry_not_connected(dep);
	erts_de_rwunlock(dep);

        schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE,
                                          THE_NON_VALUE, THE_NON_VALUE);

        if (resume_procs) {
            int resumed = erts_resume_processes(resume_procs);
            reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
        }

	delete_cache(cache);
	free_de_out_queues(dep, obuf);

        /* The rw-lock has already been released on this path. */
        return reds;
    }
    erts_de_rwunlock(dep);
    return 0;
}

/*
 * Exported wrapper around abort_connection(); returns its result
 * unchanged.
 */
Sint
erts_abort_connection(DistEntry *dep, Uint32 conn_id)
{
    return abort_connection(dep, conn_id);
}

/*
 * erts_internal:abort_connection(Node, DHandle) -> true
 *
 * Abort the connection identified by DHandle. Fails with 'badarg' if
 * Node is not an atom, DHandle is invalid, DHandle does not refer to
 * the dist entry of Node, or the dist entry is the one of this node.
 */
BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
{
    DistEntry* dep;
    Uint32 conn_id;
    Sint reds;

    if (is_not_atom(BIF_ARG_1))
        goto badarg;

    dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id);
    if (!dep)
        goto badarg;

    /* The handle has to belong to the named node... */
    if (dep != erts_find_dist_entry(BIF_ARG_1))
        goto badarg;

    /* ...which must not be ourselves. */
    if (dep == erts_this_dist_entry)
        goto badarg;

    reds = abort_connection(dep, conn_id);
    BUMP_REDS(BIF_P, reds);
    BIF_RET(am_true);

 badarg:
    BIF_ERROR(BIF_P, BADARG);
}

/*
 * Initiate an automatic connection attempt towards the node of 'dep'.
 *
 * If the dist entry is idle it is moved to the pending state and the
 * message {auto_connect, Node, DHandle} is sent to net_kernel. If the
 * entry is in any other state nothing is done.
 *
 * 'proc' and 'proc_locks' are passed on to the net_kernel lookup and,
 * for 'proc', to the message send.
 *
 * Returns 0 if no net_kernel process could be found (the just
 * initiated connection is then aborted); otherwise 1.
 */
int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
{
    Process *net_kernel;
    ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ;
    ErtsMessage *mp;
    ErlOffHeap *ohp;
    Eterm *hp;
    Eterm dhandle, msg;
    Uint32 conn_id;

    erts_de_rwlock(dep);

    if (dep->state != ERTS_DE_STATE_IDLE) {
        /* A connection is already up or under way. */
        erts_de_rwunlock(dep);
        return 1;
    }

    erts_set_dist_entry_pending(dep);
    conn_id = dep->connection_id;
    erts_de_rwunlock(dep);

    net_kernel = erts_whereis_process(proc, proc_locks,
                                      am_net_kernel, nk_locks, 0);
    if (!net_kernel) {
        /* Nobody to do the connect for us; back out. */
        abort_connection(dep, conn_id);
        return 0;
    }

    /*
     * Send {auto_connect, Node, DHandle} to net_kernel
     */
    mp = erts_alloc_message_heap(net_kernel, &nk_locks,
                                 4 + ERTS_DHANDLE_SIZE,
                                 &hp, &ohp);
    dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id);
    msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle);
    ERL_MESSAGE_TOKEN(mp) = am_undefined;
    erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg);
    erts_proc_unlock(net_kernel, nk_locks);

    return 1;
}

/**********************************************************************/
/* node(Object) -> Node */

/*
 * Returns the node name of the argument. Fails with 'badarg' if the
 * argument is not a node container.
 */
BIF_RETTYPE node_1(BIF_ALIST_1)
{ 
    if (is_not_node_container(BIF_ARG_1))
      BIF_ERROR(BIF_P, BADARG);
    BIF_RET(node_container_node_name(BIF_ARG_1));
}

/**********************************************************************/
/* node() -> Node */

/* Returns the name (sysname) of this node's dist entry. */
BIF_RETTYPE node_0(BIF_ALIST_0)
{
    BIF_RET(erts_this_dist_entry->sysname);
}


/**********************************************************************/
/* nodes() -> [ Node ] */

#if 0 /* Done in erlang.erl instead. */
/*
 * Compiled out. Kept to show that nodes/0 is the same as
 * nodes_1() called with the atom 'visible'.
 */
BIF_RETTYPE nodes_0(BIF_ALIST_0)
{
  return nodes_1(BIF_P, am_visible);
}
#endif


BIF_RETTYPE nodes_1(BIF_ALIST_1)
{
    Eterm result;
    int length;
    Eterm* hp;
    int not_connected = 0;
    int visible = 0;
    int hidden = 0;
    int this = 0;
    DeclareTmpHeap(buf,2,BIF_P); /* For one cons-cell */
    DistEntry *dep;
    Eterm arg_list = BIF_ARG_1;
#ifdef DEBUG
    Eterm* endp;
#endif

    UseTmpHeap(2,BIF_P);

    if (is_atom(BIF_ARG_1))
      arg_list = CONS(buf, BIF_ARG_1, NIL);

    while (is_list(arg_list)) {
      switch(CAR(list_val(arg_list))) {
      case am_visible:   visible = 1;                                 break;
      case am_hidden:    hidden = 1;                                  break;
      case am_known:     visible = hidden = not_connected = this = 1; break;
      case am_this:      this = 1;                                    break;
      case am_connected: visible = hidden = 1;                        break;
      default:           goto error;                                  break;
      }
      arg_list = CDR(list_val(arg_list));
    }

    if (is_not_nil(arg_list)) {
	goto error;
    }

    length = 0;

    erts_rwmtx_rlock(&erts_dist_table_rwmtx);

    ASSERT(erts_no_of_not_connected_dist_entries > 0);
    ASSERT(erts_no_of_hidden_dist_entries >= 0);
    ASSERT(erts_no_of_pending_dist_entries >= 0);
    ASSERT(erts_no_of_visible_dist_entries >= 0);
    if(not_connected)
      length += ((erts_no_of_not_connected_dist_entries - 1)
                 + erts_no_of_pending_dist_entries);
    if(hidden)
      length += erts_no_of_hidden_dist_entries;
    if(visible)
      length += erts_no_of_visible_dist_entries;
    if(this)
      length++;

    result = NIL;

    if (length == 0) {
	erts_rwmtx_runlock(&erts_dist_table_rwmtx);
	goto done;
    }

    hp = HAlloc(BIF_P, 2*length);

#ifdef DEBUG
    endp = hp + length*2;
#endif
    if(not_connected) {
      for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
          if (dep != erts_this_dist_entry) {
            result = CONS(hp, dep->sysname, result);
            hp += 2;
          }
        }
      for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
          result = CONS(hp, dep->sysname, result);
          hp += 2;
      }
    }
    if(hidden)
      for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
	result = CONS(hp, dep->sysname, result);
	hp += 2;
      }
    if(visible)
      for(dep = erts_visible_dist_entries; dep; dep = dep->next) {
	result = CONS(hp, dep->sysname, result);
	hp += 2;
      }
    if(this) {
	result = CONS(hp, erts_this_dist_entry->sysname, result);
	hp += 2;
    }
    ASSERT(endp == hp);
    erts_rwmtx_runlock(&erts_dist_table_rwmtx);

done:
    UnUseTmpHeap(2,BIF_P);
    BIF_RET(result);

error:
    UnUseTmpHeap(2,BIF_P);
    BIF_ERROR(BIF_P,BADARG);
}

/**********************************************************************/
/* is_alive() -> Bool */

/* Returns 'true' if erts_is_alive is set, otherwise 'false'. */
BIF_RETTYPE is_alive_0(BIF_ALIST_0)
{
    if (erts_is_alive) {
        BIF_RET(am_true);
    }
    BIF_RET(am_false);
}

/**********************************************************************/
/* erlang:monitor_node(Node, Bool, Options) -> Bool */

/*
 * Common implementation of monitor_node/2 and monitor_node/3.
 *
 *   p       - calling process.
 *   Node    - atom naming the node to monitor.
 *   Bool    - am_true to add one monitor, am_false to remove one.
 *   Options - proper list whose only accepted element is
 *             'allow_passive_connect'; when present, asynchronous
 *             auto-connect is not used for this call.
 *
 * The result is am_true on success, a trap to dmonitor_node_trap when
 * the monitor cannot be set up here (see the erts_dsig_prepare() cases
 * below), or a BADARG error when:
 *   - Options is not a proper list or contains an unknown option,
 *   - Node is not an atom,
 *   - this node's name is am_Noname and Node is anything else,
 *   - Bool is neither am_true nor am_false.
 *
 * Monitors are reference counted per node (mdep->u.refc); each
 * successful 'true' call adds one reference and each 'false' call on an
 * existing monitor removes one.
 */
static BIF_RETTYPE
monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
{
    BIF_RETTYPE ret;
    DistEntry *dep = NULL;
    Eterm l;
    int async_connect = 1;

    for (l = Options; is_list(l); l = CDR(list_val(l))) {
        Eterm t = CAR(list_val(l));
        if (t != am_allow_passive_connect)
            goto badarg;
        /*
         * Handle this horrible feature by falling back on old synchronous
         * auto-connect (if needed)
         */
        async_connect = 0;
    }
    /* Options must be a proper list. */
    if (is_not_nil(l))
        goto badarg;

    if (is_not_atom(Node))
        goto badarg;

    if (erts_this_node->sysname == am_Noname && Node != am_Noname)
        goto badarg;

    switch (Bool) {

    case am_false: {
        ErtsMonitor *mon;
        /*
         * Before OTP-21, monitor_node(Node, false) triggered
         * auto-connect and a 'nodedown' message if that failed.
         * Now it's a simple no-op which feels more reasonable.
         */
        mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(p), Node);
        if (mon) {
            ErtsMonitorDataExtended *mdep;
            ASSERT(erts_monitor_is_origin(mon));

            mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);

            ASSERT((mdep->u.refc > 0));
            if (--mdep->u.refc == 0) {
                /* Last reference on this monitor; unlink it. */
                if (!mdep->uptr.node_monitors)
                    erts_monitor_tree_delete(&ERTS_P_MONITORS(p), mon);
                else {
                    /*
                     * Other monitors for the same node are kept in a
                     * sub-list; promote the last of them to take this
                     * monitor's place in the process monitor tree.
                     */
                    ErtsMonitor *sub_mon;
                    ErtsMonitorDataExtended *sub_mdep;
                    sub_mon = erts_monitor_list_last(mdep->uptr.node_monitors);
                    erts_monitor_list_delete(&mdep->uptr.node_monitors, sub_mon);
                    sub_mon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE;
                    sub_mdep = ((ErtsMonitorDataExtended *)
                                erts_monitor_to_data(sub_mon));
                    sub_mdep->uptr.node_monitors = mdep->uptr.node_monitors;
                    mdep->uptr.node_monitors = NULL;
                    erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, sub_mon);
                }
                if (erts_monitor_dist_delete(&mdep->md.target))
                    erts_monitor_release_both((ErtsMonitorData *) mdep);
                else
                    erts_monitor_release(mon);
            }
        }
        break;
    }

    case am_true: {
        ErtsDSigData dsd;
        dsd.node = Node;

        dep = erts_find_or_insert_dist_entry(Node);
        if (dep == erts_this_dist_entry)
            break;

        switch (erts_dsig_prepare(&dsd, dep, p,
                                  ERTS_PROC_LOCK_MAIN,
                                  ERTS_DSP_RLOCK, 0, async_connect)) {
        case ERTS_DSIG_PREP_NOT_ALIVE:
        case ERTS_DSIG_PREP_NOT_CONNECTED:
            /* Trap to either send 'nodedown' or do passive connection attempt */
            goto do_trap;
        case ERTS_DSIG_PREP_PENDING:
            if (!async_connect) {
                /*
                 * Pending connection may fail, so we must trap
                 * to ensure passive connection attempt
                 */
                erts_de_runlock(dep);
                goto do_trap;
            }
            /*fall through*/
        case ERTS_DSIG_PREP_CONNECTED: {
            ErtsMonitor *mon;
            ErtsMonitorDataExtended *mdep;
            int created;

            mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(p),
                                                  &created,
                                                  ERTS_MON_TYPE_NODE,
                                                  p->common.id,
                                                  Node);
            mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
            if (created) {
#ifdef DEBUG
                int inserted =
#endif
                    erts_monitor_dist_insert(&mdep->md.target, dep->mld);
                ASSERT(inserted);
                ASSERT(mdep->dist->connection_id == dep->connection_id);
            }
            else if (mdep->dist->connection_id != dep->connection_id) {
                /*
                 * The existing monitor belongs to another connection
                 * id than the current one. Create a fresh monitor for
                 * the current connection, put it in the process monitor
                 * tree in place of the old one, and keep the old one in
                 * the new monitor's sub-list.
                 */
                ErtsMonitorDataExtended *mdep2;
                ErtsMonitor *mon2;
#ifdef DEBUG
                int inserted;
#endif
                mdep2 = ((ErtsMonitorDataExtended *)
                         erts_monitor_create(ERTS_MON_TYPE_NODE, NIL,
                                             p->common.id, Node, NIL));
                mon2 = &mdep2->md.origin;
                /*
                 * It is the target of the *new* monitor that has to be
                 * inserted on the current connection; the old monitor
                 * (mdep) is still tied to the other connection id.
                 */
#ifdef DEBUG
                inserted =
#endif
                    erts_monitor_dist_insert(&mdep2->md.target, dep->mld);
                ASSERT(inserted);
                ASSERT(mdep2->dist->connection_id == dep->connection_id);

                mdep2->uptr.node_monitors = mdep->uptr.node_monitors;
                mdep->uptr.node_monitors = NULL;
                erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, mon2);
                erts_monitor_list_insert(&mdep2->uptr.node_monitors, mon);
                mon->flags |= ERTS_ML_FLG_IN_SUBTABLE;
                mdep = mdep2;
            }

            mdep->u.refc++;

            break;
        }

        default:
            ERTS_ASSERT(! "Invalid dsig prepare result");
        }

        erts_de_runlock(dep);

        break;
    }

    default:
        goto badarg;
    }

    ERTS_BIF_PREP_RET(ret, am_true);

do_return:

    /* Drop the dist entry reference, if one was looked up above. */
    if (dep)
        erts_deref_dist_entry(dep);

    return ret;

do_trap:
    ERTS_BIF_PREP_TRAP3(ret, dmonitor_node_trap, p, Node, Bool, Options);
    goto do_return;

badarg:
    ERTS_BIF_PREP_ERROR(ret, p, BADARG);
    goto do_return;
}

/* monitor_node(Node, Bool, Options) -> Bool; forwards to monitor_node(). */
BIF_RETTYPE monitor_node_3(BIF_ALIST_3)
{
    BIF_RETTYPE res = monitor_node(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);

    BIF_RET(res);
}


/* monitor_node(Node, Bool) -> Bool */

/* Same as monitor_node/3 called with an empty option list. */
BIF_RETTYPE monitor_node_2(BIF_ALIST_2)
{
    BIF_RETTYPE res = monitor_node(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL);

    BIF_RET(res);
}

BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1)
{
    DistEntry *de;
    Uint32 f;
    if (is_not_pid(BIF_ARG_1)) {
	BIF_ERROR(BIF_P,BADARG);
    }
    de = pid_dist_entry(BIF_ARG_1);
    ASSERT(de != NULL);
    if (de == erts_this_dist_entry) {
	BIF_RET(am_true);
    }
    erts_de_rlock(de);
    f = de->flags;
    erts_de_runlock(de);
    BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false));
}
    
/*
 * The major part of the implementation of net_kernel:monitor_nodes/[1,2]
 * follows.
 *
 * Currently net_kernel:monitor_nodes/[1,2] calls process_flag/2 which in
 * turn calls erts_monitor_nodes(). If the process_flag() call fails (with
 * badarg), the code in net_kernel determines what type of error to return.
 * This in order to simplify the task of being backward compatible.
 */

/*
 * Option bits for nodes monitors. erts_monitor_nodes() ORs these together
 * and uses the result, as a small integer, as the monitor key.
 */
/* Set by {node_type, visible} and {node_type, all}. */
#define ERTS_NODES_MON_OPT_TYPE_VISIBLE		(((Uint16) 1) << 0)
/* Set by {node_type, hidden} and {node_type, all}. */
#define ERTS_NODES_MON_OPT_TYPE_HIDDEN		(((Uint16) 1) << 1)
/* Set by the 'nodedown_reason' option. */
#define ERTS_NODES_MON_OPT_DOWN_REASON		(((Uint16) 1) << 2)

/* Mask covering both node type bits. */
#define ERTS_NODES_MON_OPT_TYPES \
  (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN)

/* Taken around every access to the two variables below. */
static erts_mtx_t nodes_monitors_mtx;
/* Head of the list of all nodes monitors. */
static ErtsMonitor *nodes_monitors;
/* Number of monitors currently in the 'nodes_monitors' list. */
static Uint no_nodes_monitors;

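/*
 * Nodes monitors are stored in a monitor list. 'nodes_monitors' points
 * to the list and 'no_nodes_monitors' holds the number of entries in
 * it. Both are accessed with 'nodes_monitors_mtx' locked.
 *
 * There might be more than one entry per process in the list; a process
 * gets one entry for each distinct combination of options it has
 * enabled nodes monitoring with.
 *
 * NOTE(review): this comment used to mention a 'nodes_monitors_end'
 * pointer, a 'nodes_monitors' field in the process struct, and that
 * entries of one process are located in sequence. None of that can be
 * seen in the code below -- confirm before relying on it.
 */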

/*
 * Set up the nodes monitors state: an empty monitor list, a zero
 * monitor count, and the mutex used when accessing them.
 */
static void
init_nodes_monitors(void)
{
    no_nodes_monitors = 0;
    nodes_monitors = NULL;
    erts_mtx_init(&nodes_monitors_mtx,
                  "nodes_monitors",
                  NIL,
                  (ERTS_LOCK_FLAGS_PROPERTY_STATIC
                   | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION));
}

/*
 * Turn nodes monitoring on or off for the calling process.
 *
 *   c_p   - calling process; main lock must be held.
 *   on    - am_true to add a monitor, am_false to remove.
 *   olist - option list. Accepted elements are 'nodedown_reason' and
 *           {node_type, visible | hidden | all}. The three node types
 *           are mutually exclusive within one list.
 *
 * Returns THE_NON_VALUE if 'on' or 'olist' is invalid. Otherwise the
 * previous reference count for the given option combination is returned
 * as an integer term (0 if no such monitor existed).
 *
 * Turning monitoring on increments the reference count by one. Turning
 * it off removes the monitor for the option combination altogether,
 * regardless of its reference count.
 */
Eterm
erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist)
{
    ErtsMonitorDataExtended *mdep;
    ErtsMonitor *omon;
    Eterm rest = olist;
    Eterm key, old_value;
    Uint opts = (Uint) 0;
    int seen_all = 0, seen_visible = 0, seen_hidden = 0;

    ASSERT(c_p);
    ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);

    if (!(on == am_true || on == am_false))
        return THE_NON_VALUE;

    while (is_list(rest)) {
        Eterm *cell = list_val(rest);
        Eterm opt = CAR(cell);
        Eterm *tp;

        rest = CDR(cell);

        if (opt == am_nodedown_reason) {
            opts |= ERTS_NODES_MON_OPT_DOWN_REASON;
            continue;
        }

        /* The only other accepted form is {node_type, Type}. */
        if (!is_tuple(opt))
            return THE_NON_VALUE;
        tp = tuple_val(opt);
        if (arityval(tp[0]) != 2 || tp[1] != am_node_type)
            return THE_NON_VALUE;

        if (tp[2] == am_visible) {
            if (seen_hidden || seen_all)
                return THE_NON_VALUE;
            opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE;
            seen_visible = 1;
        }
        else if (tp[2] == am_hidden) {
            if (seen_visible || seen_all)
                return THE_NON_VALUE;
            opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN;
            seen_hidden = 1;
        }
        else if (tp[2] == am_all) {
            if (seen_visible || seen_hidden)
                return THE_NON_VALUE;
            opts |= ERTS_NODES_MON_OPT_TYPES;
            seen_all = 1;
        }
        else {
            return THE_NON_VALUE;
        }
    }

    /* The option list must be a proper list. */
    if (is_not_nil(rest))
        return THE_NON_VALUE;

    /* The combined option bits identify the monitor. */
    key = make_small(opts);

    if (on == am_false) {
        omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key);
        if (omon) {
            mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon);
            old_value = mdep->u.refc;
            ASSERT(mdep->u.refc > 0);

            erts_mtx_lock(&nodes_monitors_mtx);
            ASSERT(no_nodes_monitors > 0);
            no_nodes_monitors--;
            ASSERT(erts_monitor_is_in_table(&mdep->md.target));
            erts_monitor_list_delete(&nodes_monitors, &mdep->md.target);
            erts_mtx_unlock(&nodes_monitors_mtx);

            erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), omon);
            erts_monitor_release_both((ErtsMonitorData *) mdep);
        }
        else {
            old_value = 0;
        }
    }
    else {
        int created;

        omon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(c_p),
                                               &created,
                                               ERTS_MON_TYPE_NODES,
                                               c_p->common.id,
                                               key);
        mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon);
        if (created) {
            /* First monitor with these options; add it to the global list. */
            erts_mtx_lock(&nodes_monitors_mtx);
            no_nodes_monitors++;
            erts_monitor_list_insert(&nodes_monitors, &mdep->md.target);
            erts_mtx_unlock(&nodes_monitors_mtx);
        }
        old_value = mdep->u.refc;
        mdep->u.refc++;
    }

    return erts_make_integer(old_value, c_p);
}

/*
 * Remove the nodes monitor whose origin half is 'omon' from the global
 * nodes monitors list and release both halves of the monitor.
 */
void
erts_monitor_nodes_delete(ErtsMonitor *omon)
{
    ErtsMonitorData *mon_data;
    ErtsMonitor *target;

    ASSERT(omon->type == ERTS_MON_TYPE_NODES);
    ASSERT(erts_monitor_is_origin(omon));

    mon_data = erts_monitor_to_data(omon);
    target = &mon_data->target;

    erts_mtx_lock(&nodes_monitors_mtx);
    ASSERT(erts_monitor_is_in_table(target));
    ASSERT(no_nodes_monitors > 0);
    erts_monitor_list_delete(&nodes_monitors, target);
    no_nodes_monitors--;
    erts_mtx_unlock(&nodes_monitors_mtx);

    erts_monitor_release_both(mon_data);
}

/*
 * Snapshot of one nodes monitor, filled in by save_nodes_monitor().
 */
typedef struct {
    Eterm pid;      /* 'other.item' of the target half of the monitor */
    Eterm options;  /* 'other.item' of the origin half; the option bits
                     * as a small integer */
} ErtsNodesMonitorData;

/*
 * Context passed to save_nodes_monitor() while traversing the nodes
 * monitors list.
 */
typedef struct {
    ErtsNodesMonitorData *nmdp; /* array that receives the snapshots */
    Uint i;                     /* next free index in 'nmdp' */
} ErtsNodesMonitorContext;

/*
 * List traversal callback: store the pid and the option term of the
 * nodes monitor 'mon' in the next free slot of the array referred to by
 * the ErtsNodesMonitorContext passed as 'vctxt'.
 */
static void
save_nodes_monitor(ErtsMonitor *mon, void *vctxt)
{
    ErtsNodesMonitorContext *ctxt = vctxt;
    ErtsNodesMonitorData *slot;
    ErtsMonitorData *mon_data;

    ASSERT(erts_monitor_is_target(mon));
    ASSERT(mon->type == ERTS_MON_TYPE_NODES);

    mon_data = erts_monitor_to_data(mon);
    slot = &ctxt->nmdp[ctxt->i];

    slot->pid = mon->other.item;
    slot->options = mon_data->origin.other.item;

    ctxt->i++;
}

/*
 * Send a nodes monitor message to every process that has a matching
 * nodes monitor.
 *
 *   c_p    - not referenced in this function body.
 *   what   - first element of the message; compared against am_nodedown
 *            below to decide whether a reason may be included.
 *   node   - second element of the message.
 *   type   - am_visible or am_hidden; used to filter receivers.
 *   reason - included as {nodedown_reason, reason} for monitors with
 *            ERTS_NODES_MON_OPT_DOWN_REASON when 'what' is am_nodedown.
 *
 * The list of monitors is first copied into an array while holding
 * nodes_monitors_mtx; the messages are then sent without the mutex.
 */
static void
send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason)
{
    Uint opts;
    Uint i, no, reason_size;
    ErtsNodesMonitorData def_buf[100];
    ErtsNodesMonitorData *nmdp = &def_buf[0];
    ErtsNodesMonitorContext ctxt;

    ASSERT(is_immed(what));
    ASSERT(is_immed(node));
    ASSERT(is_immed(type));
#ifdef USE_VM_PROBES
    if (DTRACE_ENABLED(dist_monitor)) {
        DTRACE_CHARBUF(what_str, 12);
        DTRACE_CHARBUF(node_str, 64);
        DTRACE_CHARBUF(type_str, 12);
        DTRACE_CHARBUF(reason_str, 64);

        erts_snprintf(what_str, sizeof(DTRACE_CHARBUF_NAME(what_str)), "%T", what);
        erts_snprintf(node_str, sizeof(DTRACE_CHARBUF_NAME(node_str)), "%T", node);
        erts_snprintf(type_str, sizeof(DTRACE_CHARBUF_NAME(type_str)), "%T", type);
        erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason);
        DTRACE5(dist_monitor, erts_this_node_sysname,
                what_str, node_str, type_str, reason_str);
    }
#endif

    ctxt.i = 0;

    /* An immediate reason needs no heap of its own. */
    reason_size = is_immed(reason) ? 0 : size_object(reason);

    erts_mtx_lock(&nodes_monitors_mtx);
    /* Use the on-stack buffer unless there are more monitors than it holds. */
    if (no_nodes_monitors > sizeof(def_buf)/sizeof(def_buf[0]))
        nmdp = erts_alloc(ERTS_ALC_T_TMP,
                          no_nodes_monitors*sizeof(ErtsNodesMonitorData));
    ctxt.nmdp = nmdp;
    erts_monitor_list_foreach(nodes_monitors,
                              save_nodes_monitor,
                              (void *) &ctxt);
    
    ASSERT(ctxt.i == no_nodes_monitors);
    no = no_nodes_monitors;

    erts_mtx_unlock(&nodes_monitors_mtx);

    for (i = 0; i < no; i++) {
        /*
         * Worst case: two {Tag, Value} tuples (3 words each), two cons
         * cells (2 words each) and one 3-tuple (4 words).
         */
        Eterm tmp_heap[3+2+3+2+4 /* max need */];
        Eterm *hp, msg;
        Uint hsz;

        ASSERT(is_small(nmdp[i].options));
        opts = (Uint) signed_val(nmdp[i].options);
	if (!opts) {
	    /* Monitors without options only get messages about visible nodes. */
	    if (type != am_visible)
		continue;
	}
	else {
	    switch (type) {
	    case am_hidden:
		if (!(opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN))
		    continue;
		break;
	    case am_visible:
		/*
		 * Skip only if a node type was given and it excludes
		 * visible nodes; with no node type bits set, visible
		 * nodes are reported.
		 */
		if ((opts & ERTS_NODES_MON_OPT_TYPES)
		    && !(opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE))
		    continue;
		break;
	    default:
		erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
	    }
	}

        hsz = 0;
        hp = &tmp_heap[0];

        if (!opts) {
            /* No options: the message is {What, Node}. */
            msg = TUPLE2(hp, what, node);
            hp += 3;
        }
        else {
            /* With options: the message is {What, Node, InfoList}. */
            Eterm tup;
            Eterm info = NIL;

            if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE
                        | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) {

                tup = TUPLE2(hp, am_node_type, type);
                hp += 3;
                info = CONS(hp, tup, info);
                hp += 2;
            }

            if (what == am_nodedown
                && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) {
                /* Account for the size of the reason term itself. */
                hsz += reason_size;
                tup = TUPLE2(hp, am_nodedown_reason, reason);
                hp += 3;
                info = CONS(hp, tup, info);
                hp += 2;
            }

            msg = TUPLE3(hp, what, node, info);
            hp += 4;
        }

        ASSERT(hp - &tmp_heap[0] <= sizeof(tmp_heap)/sizeof(tmp_heap[0]));

        /* Total size: words used in tmp_heap plus the reason, if included. */
        hsz += hp - &tmp_heap[0];

        erts_proc_sig_send_persistent_monitor_msg(ERTS_MON_TYPE_NODES,
                                                  nmdp[i].options,
                                                  am_system,
                                                  nmdp[i].pid,
                                                  msg,
                                                  hsz);
    }

    /* Free the array if it was allocated instead of using def_buf. */
    if (nmdp != &def_buf[0])
        erts_free(ERTS_ALC_T_TMP, nmdp);
}
                           

/*
 * Context passed to nodes_monitor_info(). The hpp/szp pair is handed on
 * to erts_bld_cons()/erts_bld_tuple(); erts_processes_monitoring_nodes()
 * first runs a pass with 'szp' set and 'hpp' NULL, then a pass with
 * 'hpp' set and 'szp' NULL.
 */
typedef struct {
    Eterm **hpp;    /* heap pointer to build on, or NULL */
    Uint *szp;      /* size accumulator, or NULL */
    Eterm res;      /* result list built so far */
} ErtsNodesMonitorInfoContext;


/*
 * List traversal callback used by erts_processes_monitoring_nodes().
 *
 * Prepends one {Pid, OptionList} tuple per reference held by the nodes
 * monitor 'mon' to the result list in the ErtsNodesMonitorInfoContext
 * passed as 'vctxt'. OptionList holds 'nodedown_reason' and/or
 * {node_type, all | visible | hidden}, depending on the option bits
 * stored in the monitor.
 */
static void
nodes_monitor_info(ErtsMonitor *mon, void *vctxt)
{
    ErtsNodesMonitorInfoContext *ctxt = vctxt;
    ErtsMonitorDataExtended *mdep;
    Eterm **hpp = ctxt->hpp;
    Uint *szp = ctxt->szp;
    Eterm acc = ctxt->res;
    Uint refs_left, opts, type_bits;

    ASSERT(erts_monitor_is_target(mon));
    ASSERT(mon->type == ERTS_MON_TYPE_NODES);
    mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);

    ASSERT(is_small(mdep->md.origin.other.item));
    opts = (Uint) signed_val(mdep->md.origin.other.item);
    type_bits = opts & ERTS_NODES_MON_OPT_TYPES;

    /* One result element for each reference on the monitor. */
    for (refs_left = mdep->u.refc; refs_left > 0; refs_left--) {
        Eterm olist = NIL;
        Eterm entry;

        if (type_bits) {
            Eterm type, type_tuple;

            if (type_bits == ERTS_NODES_MON_OPT_TYPES)
                type = am_all;
            else if (type_bits == ERTS_NODES_MON_OPT_TYPE_VISIBLE)
                type = am_visible;
            else if (type_bits == ERTS_NODES_MON_OPT_TYPE_HIDDEN)
                type = am_hidden;
            else
                erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");

            type_tuple = erts_bld_tuple(hpp, szp, 2, am_node_type, type);
            olist = erts_bld_cons(hpp, szp, type_tuple, olist);
        }

        if (opts & ERTS_NODES_MON_OPT_DOWN_REASON) {
            olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist);
        }

        entry = erts_bld_tuple(hpp, szp, 2, mon->other.item, olist);
        acc = erts_bld_cons(hpp, szp, entry, acc);
    }

    ctxt->hpp = hpp;
    ctxt->szp = szp;
    ctxt->res = acc;
}

/*
 * Build a list with one {Pid, OptionList} element for every reference
 * held by every nodes monitor in the system, allocated on the heap of
 * 'c_p'.
 *
 * Note, this function is only used for debugging.
 *
 * The caller must hold the main lock on 'c_p' (and only that lock). The
 * lock is released and thread progress is blocked while the list is
 * built; the lock is taken again before returning.
 */
Eterm
erts_processes_monitoring_nodes(Process *c_p)
{
    ErtsNodesMonitorInfoContext ctxt;
    Uint heap_need = 0;
    Eterm *hp;
#ifdef DEBUG
    Eterm *hend;
#endif

    ASSERT(c_p);
    ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);

    erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
    erts_thr_progress_block();

    erts_mtx_lock(&nodes_monitors_mtx);

    /* First pass: only calculate the heap size needed. */
    ctxt.szp = &heap_need;
    ctxt.hpp = NULL;
    ctxt.res = NIL;
    erts_monitor_list_foreach(nodes_monitors,
                              nodes_monitor_info,
                              (void *) &ctxt);

    hp = HAlloc(c_p, heap_need);
#ifdef DEBUG
    hend = hp + heap_need;
#endif

    /* Second pass: build the result on the allocated heap. */
    ctxt.hpp = &hp;
    ctxt.szp = NULL;
    ctxt.res = NIL;
    erts_monitor_list_foreach(nodes_monitors,
                              nodes_monitor_info,
                              (void *) &ctxt);

    ASSERT(hp == hend);

    erts_mtx_unlock(&nodes_monitors_mtx);

    erts_thr_progress_unblock();
    erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);

    return ctxt.res;
}