1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
|
/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2001-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
#ifndef ERL_NODE_TABLES_BASIC__
#define ERL_NODE_TABLES_BASIC__
typedef struct dist_entry_ DistEntry;
typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf;
void erts_ref_dist_entry(DistEntry *dep);
void erts_deref_dist_entry(DistEntry *dep);
#endif
#if !defined(ERL_NODE_TABLES_BASIC_ONLY) && !defined(ERL_NODE_TABLES_H__)
#define ERL_NODE_TABLES_H__
/*
* The "node_tables module" contain two (hash) tables: the node_table
* and the dist_table.
*
* The elements of the node_table represents a specific incarnation of
* an Erlang node and has {Nodename, Creation} pairs as keys. Elements
* in the node_table are referred to from node containers (see
* node_container_utils.h).
*
* The elements of the dist_table represents a (potential) connection
* to an Erlang node and has Nodename as key. Elements in the
* dist_table are either referred to from elements in the node_table
* or from the process or port structure of the entity controlling
* the connection.
*
* Both tables are garbage collected by reference counting.
*/
#include "sys.h"
#include "hash.h"
#include "erl_alloc.h"
#define ERTS_PORT_TASK_ONLY_BASIC_TYPES__
#include "erl_port_task.h"
#undef ERTS_PORT_TASK_ONLY_BASIC_TYPES__
#include "erl_process.h"
#define ERTS_BINARY_TYPES_ONLY__
#include "erl_binary.h"
#undef ERTS_BINARY_TYPES_ONLY__
#include "erl_monitor_link.h"
#define ERTS_NODE_TAB_DELAY_GC_DEFAULT (60)
#define ERTS_NODE_TAB_DELAY_GC_MAX (100*1000*1000)
#define ERTS_NODE_TAB_DELAY_GC_INFINITY (ERTS_NODE_TAB_DELAY_GC_MAX+1)
#define ERST_INTERNAL_CHANNEL_NO 0
enum dist_entry_state {
ERTS_DE_STATE_IDLE,
ERTS_DE_STATE_PENDING,
ERTS_DE_STATE_CONNECTED,
ERTS_DE_STATE_EXITING
};
#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0)
#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1)
#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2)
#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3)
#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4)
#define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \
| ERTS_DE_QFLG_EXIT \
| ERTS_DE_QFLG_REQ_INFO \
| ERTS_DE_QFLG_PORT_CTRL \
| ERTS_DE_QFLG_PROC_CTRL)
#if defined(ARCH_64)
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL)
#else
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713)
#endif
struct ErtsDistOutputBuf_ {
#ifdef DEBUG
Uint dbg_pattern;
byte *alloc_endp;
#endif
ErtsDistOutputBuf *next;
Binary *bin;
/* Pointers to the distribution header,
if NULL the distr header is in the extp */
byte *hdrp;
byte *hdr_endp;
/* Pointers to the ctl + payload */
byte *extp;
byte *ext_endp;
/* Start of payload and hopefull_flags, used by transcode */
Uint hopefull_flags;
byte *msg_start;
/* start of the ext buffer, this is not always the same as extp
as the atom cache handling can use less then the allotted buffer.
This value is needed to calculate the size of this output buffer.*/
byte *ext_start;
};
typedef struct {
ErtsDistOutputBuf *first;
ErtsDistOutputBuf *last;
} ErtsDistOutputQueue;
struct ErtsProcList_;
/*
* Lock order:
* 1. dist_entry->rwmtx
* 2. erts_node_table_rwmtx
* 3. erts_dist_table_rwmtx
*
* Lock mutexes with lower numbers before mutexes with higher numbers and
* unlock mutexes with higher numbers before mutexes with higher numbers.
*/
struct dist_entry_ {
HashBucket hash_bucket; /* Hash bucket */
struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */
struct dist_entry_ *prev; /* Previous entry in dist_table (not sorted) */
erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */
Eterm sysname; /* name@host atom for efficiency */
Uint32 creation; /* creation of connected node */
erts_atomic_t input_handler; /* Input handler */
Eterm cid; /* connection handler (pid or port),
NIL == free */
Uint32 connection_id; /* Connection id incremented on connect */
enum dist_entry_state state;
Uint32 flags; /* Distribution flags, like hidden,
atom cache etc. */
unsigned long version; /* Protocol version */
ErtsMonLnkDist *mld; /* Monitors and links */
erts_mtx_t qlock; /* Protects qflgs and out_queue */
erts_atomic32_t qflgs;
erts_atomic_t qsize;
erts_atomic64_t in;
erts_atomic64_t out;
ErtsDistOutputQueue out_queue;
struct ErtsProcList_ *suspended;
ErtsDistOutputQueue tmp_out_queue;
ErtsDistOutputQueue finalized_out_queue;
erts_atomic_t dist_cmd_scheduled;
ErtsPortTaskHandle dist_cmd;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
struct cache* cache; /* The atom cache */
ErtsThrPrgrLaterOp later_op;
struct transcode_context* transcode_ctx;
struct dist_sequences *sequences; /* Ongoing distribution sequences */
};
/*
#define ERL_NODE_BOOKKEEP
* Bookkeeping of ErlNode inc and dec operations to help debug refc problems.
* This is best used together with cerl -rr. Type the below into gdb:
* gdb:
set pagination off
set $i = 0
set $node = referred_nodes[$node_ix].node
while $i < $node->slot.counter
printf "%p: ", $node->books[$i].term
etp-1 $node->books[$i].who
printf " "
p $node->books[$i].what
set $i++
end
* Then save that into a file called test.txt and run the below in
* an erlang shell in order to get all inc/dec that do not have a
* match.
f(), {ok, B} = file:read_file("test.txt").
Vs = [begin [Val, _, _, _, What] = All = string:lexemes(Ln, " "),{Val,What,All} end || Ln <- string:lexemes(B,"\n")].
Accs = lists:foldl(fun({V,<<"ERL_NODE_INC">>,_},M) -> Val = maps:get(V,M,0), M#{ V => Val + 1 }; ({V,<<"ERL_NODE_DEC">>,_},M) -> Val = maps:get(V,M,0), M#{ V => Val - 1 } end, #{}, Vs).
lists:usort(lists:filter(fun({V,N}) -> N /= 0 end, maps:to_list(Accs))).
* There are bound to be bugs in the the instrumentation code, but
* atleast this is a place to start when hunting refc bugs.
*
*/
#ifdef ERL_NODE_BOOKKEEP
struct erl_node_bookkeeping {
Eterm who;
Eterm term;
enum { ERL_NODE_INC, ERL_NODE_DEC } what;
};
#endif
typedef struct erl_node_ {
HashBucket hash_bucket; /* Hash bucket */
erts_refc_t refc; /* Reference count */
Eterm sysname; /* name@host atom for efficiency */
Uint32 creation; /* Creation */
DistEntry *dist_entry; /* Corresponding dist entry */
#ifdef ERL_NODE_BOOKKEEP
struct erl_node_bookkeeping books[1024];
erts_atomic_t slot;
#endif
} ErlNode;
extern Hash erts_dist_table;
extern Hash erts_node_table;
extern erts_rwmtx_t erts_dist_table_rwmtx;
extern erts_rwmtx_t erts_node_table_rwmtx;
extern DistEntry *erts_hidden_dist_entries;
extern DistEntry *erts_visible_dist_entries;
extern DistEntry *erts_pending_dist_entries;
extern DistEntry *erts_not_connected_dist_entries;
extern Sint erts_no_of_hidden_dist_entries;
extern Sint erts_no_of_visible_dist_entries;
extern Sint erts_no_of_pending_dist_entries;
extern Sint erts_no_of_not_connected_dist_entries;
extern DistEntry *erts_this_dist_entry;
extern ErlNode *erts_this_node;
extern char *erts_this_node_sysname; /* must match erl_node_tables.c */
Uint erts_delayed_node_table_gc(void);
DistEntry *erts_channel_no_to_dist_entry(Uint);
DistEntry *erts_sysname_to_connected_dist_entry(Eterm);
DistEntry *erts_find_or_insert_dist_entry(Eterm);
DistEntry *erts_find_dist_entry(Eterm);
void erts_schedule_delete_dist_entry(DistEntry *);
Uint erts_dist_table_size(void);
void erts_dist_table_info(fmtfn_t, void *);
void erts_set_dist_entry_not_connected(DistEntry *);
void erts_set_dist_entry_pending(DistEntry *);
void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint);
ErlNode *erts_find_or_insert_node(Eterm, Uint32, Eterm);
void erts_schedule_delete_node(ErlNode *);
void erts_set_this_node(Eterm, Uint);
Uint erts_node_table_size(void);
void erts_init_node_tables(int);
void erts_node_table_info(fmtfn_t, void *);
void erts_print_node_info(fmtfn_t, void *, Eterm, int*, int*);
Eterm erts_get_node_and_dist_references(struct process *);
#if defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_de_rwlocked(DistEntry *);
int erts_lc_is_de_rlocked(DistEntry *);
#endif
int erts_dist_entry_destructor(Binary *bin);
DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle, Uint32* connection_id);
#define ERTS_DHANDLE_SIZE (3+ERTS_MAGIC_REF_THING_SIZE)
Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*, Uint32 conn_id);
Eterm erts_make_dhandle(Process *c_p, DistEntry*, Uint32 conn_id);
ERTS_GLB_INLINE void erts_init_node_entry(ErlNode *np, erts_aint_t val);
ERTS_GLB_INLINE erts_aint_t erts_ref_node_entry(ErlNode *np, int min_val, Eterm term);
ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np, Eterm term);
ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_runlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_rwlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_rwunlock(DistEntry *dep);
#ifdef ERL_NODE_BOOKKEEP
void erts_node_bookkeep(ErlNode *, Eterm , int);
#else
#define erts_node_bookkeep(...)
#endif
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
erts_init_node_entry(ErlNode *np, erts_aint_t val)
{
erts_refc_init(&np->refc, val);
}
ERTS_GLB_INLINE erts_aint_t
erts_ref_node_entry(ErlNode *np, int min_val, Eterm term)
{
erts_node_bookkeep(np, term, ERL_NODE_INC);
return erts_refc_inctest(&np->refc, min_val);
}
ERTS_GLB_INLINE void
erts_deref_node_entry(ErlNode *np, Eterm term)
{
erts_node_bookkeep(np, term, ERL_NODE_DEC);
if (erts_refc_dectest(&np->refc, 0) == 0)
erts_schedule_delete_node(np);
}
ERTS_GLB_INLINE void
erts_de_rlock(DistEntry *dep)
{
erts_rwmtx_rlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_runlock(DistEntry *dep)
{
erts_rwmtx_runlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_rwlock(DistEntry *dep)
{
erts_rwmtx_rwlock(&dep->rwmtx);
}
ERTS_GLB_INLINE void
erts_de_rwunlock(DistEntry *dep)
{
erts_rwmtx_rwunlock(&dep->rwmtx);
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
void erts_debug_test_node_tab_delayed_delete(Sint64 millisecs);
void erts_lcnt_update_distribution_locks(int enable);
#endif
|