Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Samba internal messaging functions
4 : Copyright (C) Andrew Tridgell 2000
5 : Copyright (C) 2001 by Martin Pool
6 : Copyright (C) 2002 by Jeremy Allison
7 : Copyright (C) 2007 by Volker Lendecke
8 :
9 : This program is free software; you can redistribute it and/or modify
10 : it under the terms of the GNU General Public License as published by
11 : the Free Software Foundation; either version 3 of the License, or
12 : (at your option) any later version.
13 :
14 : This program is distributed in the hope that it will be useful,
15 : but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 : GNU General Public License for more details.
18 :
19 : You should have received a copy of the GNU General Public License
20 : along with this program. If not, see <http://www.gnu.org/licenses/>.
21 : */
22 :
23 : /**
24 : @defgroup messages Internal messaging framework
25 : @{
26 : @file messages.c
27 :
28 : @brief Module for internal messaging between Samba daemons.
29 :
30 : The idea is that if a part of Samba wants to do communication with
31 : another Samba process then it will do a message_register() of a
32 : dispatch function, and use message_send_pid() to send messages to
33 : that process.
34 :
35 : The dispatch function is given the pid of the sender, and it can
36 : use that to reply by message_send_pid(). See ping_message() for a
37 : simple example.
38 :
39 : @caution Dispatch functions must be able to cope with incoming
40 : messages on an *odd* byte boundary.
41 :
42 : This system doesn't have any inherent size limitations but is not
43 : very efficient for large messages or when messages are sent in very
44 : quick succession.
45 :
46 : */
47 :
48 : #include "includes.h"
49 : #include "lib/util/server_id.h"
50 : #include "dbwrap/dbwrap.h"
51 : #include "serverid.h"
52 : #include "messages.h"
53 : #include "lib/util/tevent_unix.h"
54 : #include "lib/background.h"
55 : #include "lib/messaging/messages_dgm.h"
56 : #include "lib/util/iov_buf.h"
57 : #include "lib/util/server_id_db.h"
58 : #include "lib/messaging/messages_dgm_ref.h"
59 : #include "lib/messages_ctdb.h"
60 : #include "lib/messages_ctdb_ref.h"
61 : #include "lib/messages_util.h"
62 : #include "cluster_support.h"
63 : #include "ctdbd_conn.h"
64 : #include "ctdb_srvids.h"
65 :
66 : #ifdef CLUSTER_SUPPORT
67 : #include "ctdb_protocol.h"
68 : #endif
69 :
70 : struct messaging_callback {
71 : struct messaging_callback *prev, *next;
72 : uint32_t msg_type;
73 : void (*fn)(struct messaging_context *msg, void *private_data,
74 : uint32_t msg_type,
75 : struct server_id server_id, DATA_BLOB *data);
76 : void *private_data;
77 : };
78 :
79 : struct messaging_registered_ev {
80 : struct tevent_context *ev;
81 : struct tevent_immediate *im;
82 : size_t refcount;
83 : };
84 :
85 : struct messaging_context {
86 : struct server_id id;
87 : struct tevent_context *event_ctx;
88 : struct messaging_callback *callbacks;
89 :
90 : struct messaging_rec *posted_msgs;
91 :
92 : struct messaging_registered_ev *event_contexts;
93 :
94 : struct tevent_req **new_waiters;
95 : size_t num_new_waiters;
96 :
97 : struct tevent_req **waiters;
98 : size_t num_waiters;
99 :
100 : struct server_id_db *names_db;
101 :
102 : TALLOC_CTX *per_process_talloc_ctx;
103 : };
104 :
105 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
106 : struct messaging_rec *rec);
107 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
108 : struct messaging_rec *rec);
109 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
110 : struct tevent_context *ev,
111 : struct messaging_rec *rec);
112 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
113 : struct tevent_context *ev,
114 : struct messaging_rec *rec);
115 :
116 : /****************************************************************************
117 : A useful function for testing the message system.
118 : ****************************************************************************/
119 :
120 105 : static void ping_message(struct messaging_context *msg_ctx,
121 : void *private_data,
122 : uint32_t msg_type,
123 : struct server_id src,
124 : DATA_BLOB *data)
125 : {
126 105 : struct server_id_buf idbuf;
127 :
128 210 : DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
129 : server_id_str_buf(src, &idbuf), (int)data->length,
130 : data->data ? (char *)data->data : ""));
131 :
132 105 : messaging_send(msg_ctx, src, MSG_PONG, data);
133 105 : }
134 :
135 172797 : struct messaging_rec *messaging_rec_create(
136 : TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
137 : uint32_t msg_type, const struct iovec *iov, int iovlen,
138 : const int *fds, size_t num_fds)
139 : {
140 369 : ssize_t buflen;
141 369 : uint8_t *buf;
142 369 : struct messaging_rec *result;
143 :
144 172797 : if (num_fds > INT8_MAX) {
145 0 : return NULL;
146 : }
147 :
148 172797 : buflen = iov_buflen(iov, iovlen);
149 172797 : if (buflen == -1) {
150 0 : return NULL;
151 : }
152 172797 : buf = talloc_array(mem_ctx, uint8_t, buflen);
153 172797 : if (buf == NULL) {
154 0 : return NULL;
155 : }
156 172797 : iov_buf(iov, iovlen, buf, buflen);
157 :
158 172797 : {
159 369 : struct messaging_rec rec;
160 172797 : int64_t fds64[MAX(1, num_fds)];
161 369 : size_t i;
162 :
163 172798 : for (i=0; i<num_fds; i++) {
164 1 : fds64[i] = fds[i];
165 : }
166 :
167 172797 : rec = (struct messaging_rec) {
168 : .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
169 : .src = src, .dest = dst,
170 : .buf.data = buf, .buf.length = buflen,
171 : .num_fds = num_fds, .fds = fds64,
172 : };
173 :
174 172797 : result = messaging_rec_dup(mem_ctx, &rec);
175 : }
176 :
177 172797 : TALLOC_FREE(buf);
178 :
179 172797 : return result;
180 : }
181 :
182 142912 : static bool messaging_register_event_context(struct messaging_context *ctx,
183 : struct tevent_context *ev)
184 : {
185 3937 : size_t i, num_event_contexts;
186 142912 : struct messaging_registered_ev *free_reg = NULL;
187 3937 : struct messaging_registered_ev *tmp;
188 :
189 142912 : num_event_contexts = talloc_array_length(ctx->event_contexts);
190 :
191 142945 : for (i=0; i<num_event_contexts; i++) {
192 137688 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
193 :
194 137688 : if (reg->refcount == 0) {
195 6 : if (reg->ev != NULL) {
196 0 : abort();
197 : }
198 6 : free_reg = reg;
199 : /*
200 : * We continue here and may find another
201 : * free_req, but the important thing is
202 : * that we continue to search for an
203 : * existing registration in the loop.
204 : */
205 6 : continue;
206 : }
207 :
208 137682 : if (reg->ev == ev) {
209 137655 : reg->refcount += 1;
210 137655 : return true;
211 : }
212 : }
213 :
214 5257 : if (free_reg == NULL) {
215 5251 : struct tevent_immediate *im = NULL;
216 :
217 5251 : im = tevent_create_immediate(ctx);
218 5251 : if (im == NULL) {
219 0 : return false;
220 : }
221 :
222 5251 : tmp = talloc_realloc(ctx, ctx->event_contexts,
223 : struct messaging_registered_ev,
224 : num_event_contexts+1);
225 5251 : if (tmp == NULL) {
226 0 : return false;
227 : }
228 5251 : ctx->event_contexts = tmp;
229 :
230 5251 : free_reg = &ctx->event_contexts[num_event_contexts];
231 5251 : free_reg->im = talloc_move(ctx->event_contexts, &im);
232 : }
233 :
234 : /*
235 : * free_reg->im might be cached
236 : */
237 5257 : free_reg->ev = ev;
238 5257 : free_reg->refcount = 1;
239 :
240 5257 : return true;
241 : }
242 :
243 131140 : static bool messaging_deregister_event_context(struct messaging_context *ctx,
244 : struct tevent_context *ev)
245 : {
246 3665 : size_t i, num_event_contexts;
247 :
248 131140 : num_event_contexts = talloc_array_length(ctx->event_contexts);
249 :
250 131167 : for (i=0; i<num_event_contexts; i++) {
251 131167 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
252 :
253 131167 : if (reg->refcount == 0) {
254 0 : continue;
255 : }
256 :
257 131167 : if (reg->ev == ev) {
258 131140 : reg->refcount -= 1;
259 :
260 131140 : if (reg->refcount == 0) {
261 : /*
262 : * The primary event context
263 : * is never unregistered using
264 : * messaging_deregister_event_context()
265 : * it's only registered using
266 : * messaging_register_event_context().
267 : */
268 27 : SMB_ASSERT(ev != ctx->event_ctx);
269 27 : SMB_ASSERT(reg->ev != ctx->event_ctx);
270 :
271 : /*
272 : * Not strictly necessary, just
273 : * paranoia
274 : */
275 27 : reg->ev = NULL;
276 :
277 : /*
278 : * Do not talloc_free(reg->im),
279 : * recycle immediates events.
280 : *
281 : * We just invalidate it using
282 : * the primary event context,
283 : * which is never unregistered.
284 : */
285 27 : tevent_schedule_immediate(reg->im,
286 : ctx->event_ctx,
287 3665 : NULL, NULL);
288 : }
289 131140 : return true;
290 : }
291 : }
292 0 : return false;
293 : }
294 :
295 172309 : static void messaging_post_main_event_context(struct tevent_context *ev,
296 : struct tevent_immediate *im,
297 : void *private_data)
298 : {
299 172309 : struct messaging_context *ctx = talloc_get_type_abort(
300 : private_data, struct messaging_context);
301 :
302 373418 : while (ctx->posted_msgs != NULL) {
303 172742 : struct messaging_rec *rec = ctx->posted_msgs;
304 366 : bool consumed;
305 :
306 172742 : DLIST_REMOVE(ctx->posted_msgs, rec);
307 :
308 172742 : consumed = messaging_dispatch_classic(ctx, rec);
309 172738 : if (!consumed) {
310 22086 : consumed = messaging_dispatch_waiters(
311 : ctx, ctx->event_ctx, rec);
312 : }
313 :
314 172738 : if (!consumed) {
315 : uint8_t i;
316 :
317 20641 : for (i=0; i<rec->num_fds; i++) {
318 0 : close(rec->fds[i]);
319 : }
320 : }
321 :
322 173091 : TALLOC_FREE(rec);
323 : }
324 172305 : }
325 :
326 0 : static void messaging_post_sub_event_context(struct tevent_context *ev,
327 : struct tevent_immediate *im,
328 : void *private_data)
329 : {
330 0 : struct messaging_context *ctx = talloc_get_type_abort(
331 : private_data, struct messaging_context);
332 0 : struct messaging_rec *rec, *next;
333 :
334 0 : for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
335 0 : bool consumed;
336 :
337 0 : next = rec->next;
338 :
339 0 : consumed = messaging_dispatch_waiters(ctx, ev, rec);
340 0 : if (consumed) {
341 0 : DLIST_REMOVE(ctx->posted_msgs, rec);
342 0 : TALLOC_FREE(rec);
343 : }
344 : }
345 0 : }
346 :
347 172797 : static bool messaging_alert_event_contexts(struct messaging_context *ctx)
348 : {
349 369 : size_t i, num_event_contexts;
350 :
351 172797 : num_event_contexts = talloc_array_length(ctx->event_contexts);
352 :
353 345617 : for (i=0; i<num_event_contexts; i++) {
354 172820 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
355 :
356 172820 : if (reg->refcount == 0) {
357 19 : continue;
358 : }
359 :
360 : /*
361 : * We depend on schedule_immediate to work
362 : * multiple times. Might be a bit inefficient,
363 : * but this needs to be proven in tests. The
364 : * alternatively would be to track whether the
365 : * immediate has already been scheduled. For
366 : * now, avoid that complexity here.
367 : */
368 :
369 172801 : if (reg->ev == ctx->event_ctx) {
370 172797 : tevent_schedule_immediate(
371 : reg->im, reg->ev,
372 : messaging_post_main_event_context,
373 369 : ctx);
374 : } else {
375 4 : tevent_schedule_immediate(
376 : reg->im, reg->ev,
377 : messaging_post_sub_event_context,
378 371 : ctx);
379 : }
380 :
381 : }
382 172797 : return true;
383 : }
384 :
385 85830 : static void messaging_recv_cb(struct tevent_context *ev,
386 : const uint8_t *msg, size_t msg_len,
387 : int *fds, size_t num_fds,
388 : void *private_data)
389 85830 : {
390 85830 : struct messaging_context *msg_ctx = talloc_get_type_abort(
391 : private_data, struct messaging_context);
392 345 : struct server_id_buf idbuf;
393 345 : struct messaging_rec rec;
394 85830 : int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
395 345 : size_t i;
396 :
397 85830 : if (msg_len < MESSAGE_HDR_LENGTH) {
398 0 : DBG_WARNING("message too short: %zu\n", msg_len);
399 0 : return;
400 : }
401 :
402 85830 : if (num_fds > INT8_MAX) {
403 0 : DBG_WARNING("too many fds: %zu\n", num_fds);
404 0 : return;
405 : }
406 :
407 121672 : for (i=0; i < num_fds; i++) {
408 35842 : fds64[i] = fds[i];
409 : }
410 :
411 85830 : rec = (struct messaging_rec) {
412 : .msg_version = MESSAGE_VERSION,
413 85830 : .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
414 85830 : .buf.length = msg_len - MESSAGE_HDR_LENGTH,
415 : .num_fds = num_fds,
416 : .fds = fds64,
417 : };
418 :
419 85830 : message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
420 :
421 85830 : DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
422 : (unsigned)rec.msg_type, rec.buf.length, num_fds,
423 : server_id_str_buf(rec.src, &idbuf));
424 :
425 85830 : if (server_id_same_process(&rec.src, &msg_ctx->id)) {
426 0 : DBG_DEBUG("Ignoring self-send\n");
427 0 : return;
428 : }
429 :
430 85830 : messaging_dispatch_rec(msg_ctx, ev, &rec);
431 :
432 122017 : for (i=0; i<num_fds; i++) {
433 35842 : fds[i] = fds64[i];
434 : }
435 : }
436 :
437 35964 : static int messaging_context_destructor(struct messaging_context *ctx)
438 : {
439 966 : size_t i;
440 :
441 199287 : for (i=0; i<ctx->num_new_waiters; i++) {
442 163323 : if (ctx->new_waiters[i] != NULL) {
443 33520 : tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
444 33520 : ctx->new_waiters[i] = NULL;
445 : }
446 : }
447 55954 : for (i=0; i<ctx->num_waiters; i++) {
448 19990 : if (ctx->waiters[i] != NULL) {
449 2451 : tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
450 2451 : ctx->waiters[i] = NULL;
451 : }
452 : }
453 :
454 : /*
455 : * The immediates from messaging_alert_event_contexts
456 : * reference "ctx". Don't let them outlive the
457 : * messaging_context we're destroying here.
458 : */
459 35964 : TALLOC_FREE(ctx->event_contexts);
460 :
461 35964 : return 0;
462 : }
463 :
464 36858 : static const char *private_path(const char *name)
465 : {
466 36858 : return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
467 : }
468 :
469 5230 : static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
470 : struct tevent_context *ev,
471 : struct messaging_context **pmsg_ctx)
472 : {
473 137 : TALLOC_CTX *frame;
474 137 : struct messaging_context *ctx;
475 137 : NTSTATUS status;
476 137 : int ret;
477 137 : const char *lck_path;
478 137 : const char *priv_path;
479 137 : void *ref;
480 137 : bool ok;
481 :
482 : /*
483 : * sec_init() *must* be called before any other
484 : * functions that use sec_XXX(). e.g. sec_initial_uid().
485 : */
486 :
487 5230 : sec_init();
488 :
489 5230 : lck_path = lock_path(talloc_tos(), "msg.lock");
490 5230 : if (lck_path == NULL) {
491 0 : return NT_STATUS_NO_MEMORY;
492 : }
493 :
494 5230 : ok = directory_create_or_exist_strict(lck_path,
495 : sec_initial_uid(),
496 : 0755);
497 5230 : if (!ok) {
498 0 : DBG_DEBUG("Could not create lock directory: %s\n",
499 : strerror(errno));
500 0 : return NT_STATUS_ACCESS_DENIED;
501 : }
502 :
503 5230 : priv_path = private_path("msg.sock");
504 5230 : if (priv_path == NULL) {
505 0 : return NT_STATUS_NO_MEMORY;
506 : }
507 :
508 5230 : ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
509 : 0700);
510 5230 : if (!ok) {
511 0 : DBG_DEBUG("Could not create msg directory: %s\n",
512 : strerror(errno));
513 0 : return NT_STATUS_ACCESS_DENIED;
514 : }
515 :
516 5230 : frame = talloc_stackframe();
517 5230 : if (frame == NULL) {
518 0 : return NT_STATUS_NO_MEMORY;
519 : }
520 :
521 5230 : ctx = talloc_zero(frame, struct messaging_context);
522 5230 : if (ctx == NULL) {
523 0 : status = NT_STATUS_NO_MEMORY;
524 0 : goto done;
525 : }
526 :
527 5367 : ctx->id = (struct server_id) {
528 5230 : .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
529 : };
530 :
531 5230 : ctx->event_ctx = ev;
532 :
533 5230 : ctx->per_process_talloc_ctx = talloc_new(ctx);
534 5230 : if (ctx->per_process_talloc_ctx == NULL) {
535 0 : status = NT_STATUS_NO_MEMORY;
536 0 : goto done;
537 : }
538 :
539 5230 : ok = messaging_register_event_context(ctx, ev);
540 5230 : if (!ok) {
541 0 : status = NT_STATUS_NO_MEMORY;
542 0 : goto done;
543 : }
544 :
545 5230 : ref = messaging_dgm_ref(
546 : ctx->per_process_talloc_ctx,
547 : ctx->event_ctx,
548 : &ctx->id.unique_id,
549 : priv_path,
550 : lck_path,
551 : messaging_recv_cb,
552 : ctx,
553 : &ret);
554 5230 : if (ref == NULL) {
555 5 : DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
556 5 : status = map_nt_error_from_unix(ret);
557 5 : goto done;
558 : }
559 5225 : talloc_set_destructor(ctx, messaging_context_destructor);
560 :
561 : #ifdef CLUSTER_SUPPORT
562 : if (lp_clustering()) {
563 : ref = messaging_ctdb_ref(
564 : ctx->per_process_talloc_ctx,
565 : ctx->event_ctx,
566 : lp_ctdbd_socket(),
567 : lp_ctdb_timeout(),
568 : ctx->id.unique_id,
569 : messaging_recv_cb,
570 : ctx,
571 : &ret);
572 : if (ref == NULL) {
573 : DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
574 : strerror(ret));
575 : status = map_nt_error_from_unix(ret);
576 : goto done;
577 : }
578 : }
579 : #endif
580 :
581 5225 : ctx->id.vnn = get_my_vnn();
582 :
583 5225 : ctx->names_db = server_id_db_init(ctx,
584 : ctx->id,
585 : lp_lock_directory(),
586 : 0,
587 : TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
588 5225 : if (ctx->names_db == NULL) {
589 0 : DBG_DEBUG("server_id_db_init failed\n");
590 0 : status = NT_STATUS_NO_MEMORY;
591 0 : goto done;
592 : }
593 :
594 5225 : messaging_register(ctx, NULL, MSG_PING, ping_message);
595 :
596 : /* Register some debugging related messages */
597 :
598 5225 : register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
599 5225 : register_dmalloc_msgs(ctx);
600 5225 : debug_register_msgs(ctx);
601 :
602 : {
603 132 : struct server_id_buf tmp;
604 5225 : DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
605 : }
606 :
607 5225 : *pmsg_ctx = talloc_steal(mem_ctx, ctx);
608 :
609 5225 : status = NT_STATUS_OK;
610 5230 : done:
611 5230 : TALLOC_FREE(frame);
612 :
613 5230 : return status;
614 : }
615 :
616 5230 : struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
617 : struct tevent_context *ev)
618 : {
619 5230 : struct messaging_context *ctx = NULL;
620 137 : NTSTATUS status;
621 :
622 5230 : status = messaging_init_internal(mem_ctx,
623 : ev,
624 : &ctx);
625 5230 : if (!NT_STATUS_IS_OK(status)) {
626 0 : return NULL;
627 : }
628 :
629 5225 : return ctx;
630 : }
631 :
632 5884932 : struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
633 : {
634 5884932 : return msg_ctx->id;
635 : }
636 :
637 : /*
638 : * re-init after a fork
639 : */
640 31628 : NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
641 : {
642 855 : int ret;
643 855 : char *lck_path;
644 855 : void *ref;
645 :
646 31628 : TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
647 :
648 31628 : msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
649 31628 : if (msg_ctx->per_process_talloc_ctx == NULL) {
650 0 : return NT_STATUS_NO_MEMORY;
651 : }
652 :
653 32483 : msg_ctx->id = (struct server_id) {
654 31628 : .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
655 : };
656 :
657 31628 : lck_path = lock_path(talloc_tos(), "msg.lock");
658 31628 : if (lck_path == NULL) {
659 0 : return NT_STATUS_NO_MEMORY;
660 : }
661 :
662 31628 : ref = messaging_dgm_ref(
663 : msg_ctx->per_process_talloc_ctx,
664 : msg_ctx->event_ctx,
665 : &msg_ctx->id.unique_id,
666 : private_path("msg.sock"),
667 : lck_path,
668 : messaging_recv_cb,
669 : msg_ctx,
670 : &ret);
671 :
672 31628 : if (ref == NULL) {
673 0 : DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
674 0 : return map_nt_error_from_unix(ret);
675 : }
676 :
677 31628 : if (lp_clustering()) {
678 0 : ref = messaging_ctdb_ref(
679 : msg_ctx->per_process_talloc_ctx,
680 : msg_ctx->event_ctx,
681 : lp_ctdbd_socket(),
682 : lp_ctdb_timeout(),
683 : msg_ctx->id.unique_id,
684 : messaging_recv_cb,
685 : msg_ctx,
686 : &ret);
687 0 : if (ref == NULL) {
688 0 : DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
689 : strerror(ret));
690 0 : return map_nt_error_from_unix(ret);
691 : }
692 : }
693 :
694 31628 : server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
695 31628 : register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
696 :
697 31628 : return NT_STATUS_OK;
698 : }
699 :
700 :
701 : /*
702 : * Register a dispatch function for a particular message type. Allow multiple
703 : * registrants
704 : */
705 440082 : NTSTATUS messaging_register(struct messaging_context *msg_ctx,
706 : void *private_data,
707 : uint32_t msg_type,
708 : void (*fn)(struct messaging_context *msg,
709 : void *private_data,
710 : uint32_t msg_type,
711 : struct server_id server_id,
712 : DATA_BLOB *data))
713 : {
714 11086 : struct messaging_callback *cb;
715 :
716 440082 : DEBUG(5, ("Registering messaging pointer for type %u - "
717 : "private_data=%p\n",
718 : (unsigned)msg_type, private_data));
719 :
720 : /*
721 : * Only one callback per type
722 : */
723 :
724 9797500 : for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
725 : /* we allow a second registration of the same message
726 : type if it has a different private pointer. This is
727 : needed in, for example, the internal notify code,
728 : which creates a new notify context for each tree
729 : connect, and expects to receive messages to each of
730 : them. */
731 9388841 : if (cb->msg_type == msg_type && private_data == cb->private_data) {
732 31423 : DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
733 : (unsigned)msg_type, private_data));
734 31423 : cb->fn = fn;
735 31423 : cb->private_data = private_data;
736 31423 : return NT_STATUS_OK;
737 : }
738 : }
739 :
740 408659 : if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
741 0 : return NT_STATUS_NO_MEMORY;
742 : }
743 :
744 408659 : cb->msg_type = msg_type;
745 408659 : cb->fn = fn;
746 408659 : cb->private_data = private_data;
747 :
748 408659 : DLIST_ADD(msg_ctx->callbacks, cb);
749 408659 : return NT_STATUS_OK;
750 : }
751 :
752 : /*
753 : De-register the function for a particular message type.
754 : */
755 177689 : void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
756 : void *private_data)
757 : {
758 4554 : struct messaging_callback *cb, *next;
759 :
760 4804149 : for (cb = ctx->callbacks; cb; cb = next) {
761 4626460 : next = cb->next;
762 4626460 : if ((cb->msg_type == msg_type)
763 177689 : && (cb->private_data == private_data)) {
764 177689 : DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
765 : (unsigned)msg_type, private_data));
766 177689 : DLIST_REMOVE(ctx->callbacks, cb);
767 177689 : TALLOC_FREE(cb);
768 : }
769 : }
770 177689 : }
771 :
772 : /*
773 : Send a message to a particular server
774 : */
775 225676 : NTSTATUS messaging_send(struct messaging_context *msg_ctx,
776 : struct server_id server, uint32_t msg_type,
777 : const DATA_BLOB *data)
778 : {
779 225676 : struct iovec iov = {0};
780 :
781 225676 : if (data != NULL) {
782 225120 : iov.iov_base = data->data;
783 225120 : iov.iov_len = data->length;
784 585 : };
785 :
786 225676 : return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
787 : }
788 :
789 18064 : NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
790 : struct server_id server, uint32_t msg_type,
791 : const uint8_t *buf, size_t len)
792 : {
793 18064 : DATA_BLOB blob = data_blob_const(buf, len);
794 18064 : return messaging_send(msg_ctx, server, msg_type, &blob);
795 : }
796 :
797 172797 : static int messaging_post_self(struct messaging_context *msg_ctx,
798 : struct server_id src, struct server_id dst,
799 : uint32_t msg_type,
800 : const struct iovec *iov, int iovlen,
801 : const int *fds, size_t num_fds)
802 : {
803 369 : struct messaging_rec *rec;
804 369 : bool ok;
805 :
806 172797 : rec = messaging_rec_create(
807 : msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
808 172797 : if (rec == NULL) {
809 0 : return ENOMEM;
810 : }
811 :
812 172797 : ok = messaging_alert_event_contexts(msg_ctx);
813 172797 : if (!ok) {
814 0 : TALLOC_FREE(rec);
815 0 : return ENOMEM;
816 : }
817 :
818 172797 : DLIST_ADD_END(msg_ctx->posted_msgs, rec);
819 :
820 172428 : return 0;
821 : }
822 :
823 587305 : int messaging_send_iov_from(struct messaging_context *msg_ctx,
824 : struct server_id src, struct server_id dst,
825 : uint32_t msg_type,
826 : const struct iovec *iov, int iovlen,
827 : const int *fds, size_t num_fds)
828 587305 : {
829 1686 : int ret;
830 1686 : uint8_t hdr[MESSAGE_HDR_LENGTH];
831 587305 : struct iovec iov2[iovlen+1];
832 :
833 587305 : if (server_id_is_disconnected(&dst)) {
834 0 : return EINVAL;
835 : }
836 :
837 587305 : if (num_fds > INT8_MAX) {
838 0 : return EINVAL;
839 : }
840 :
841 587305 : if (server_id_equal(&dst, &msg_ctx->id)) {
842 172485 : ret = messaging_post_self(msg_ctx, src, dst, msg_type,
843 : iov, iovlen, fds, num_fds);
844 172485 : return ret;
845 : }
846 :
847 414820 : message_hdr_put(hdr, msg_type, src, dst);
848 414820 : iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
849 414820 : memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
850 :
851 414820 : if (dst.vnn != msg_ctx->id.vnn) {
852 0 : if (num_fds > 0) {
853 0 : return ENOSYS;
854 : }
855 :
856 0 : ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
857 0 : return ret;
858 : }
859 :
860 414820 : ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
861 :
862 414820 : if (ret == EACCES) {
863 0 : become_root();
864 0 : ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
865 : fds, num_fds);
866 0 : unbecome_root();
867 : }
868 :
869 414820 : if (ret == ECONNREFUSED) {
870 : /*
871 : * Linux returns this when a socket exists in the file
872 : * system without a listening process. This is not
873 : * documented in susv4 or the linux manpages, but it's
874 : * easily testable. For the higher levels this is the
875 : * same as "destination does not exist"
876 : */
877 6283 : ret = ENOENT;
878 : }
879 :
880 413503 : return ret;
881 : }
882 :
883 587305 : NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
884 : struct server_id dst, uint32_t msg_type,
885 : const struct iovec *iov, int iovlen,
886 : const int *fds, size_t num_fds)
887 : {
888 1686 : int ret;
889 :
890 587305 : ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
891 : iov, iovlen, fds, num_fds);
892 587305 : if (ret != 0) {
893 6285 : return map_nt_error_from_unix(ret);
894 : }
895 581020 : return NT_STATUS_OK;
896 : }
897 :
898 : struct send_all_state {
899 : struct messaging_context *msg_ctx;
900 : int msg_type;
901 : const void *buf;
902 : size_t len;
903 : };
904 :
905 14466 : static int send_all_fn(pid_t pid, void *private_data)
906 : {
907 14466 : struct send_all_state *state = private_data;
908 6 : NTSTATUS status;
909 :
910 14466 : if (pid == tevent_cached_getpid()) {
911 228 : DBG_DEBUG("Skip ourselves in messaging_send_all\n");
912 228 : return 0;
913 : }
914 :
915 14238 : status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
916 14238 : state->msg_type, state->buf, state->len);
917 14238 : if (!NT_STATUS_IS_OK(status)) {
918 6283 : DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
919 : (uintmax_t)pid, nt_errstr(status));
920 : }
921 :
922 14233 : return 0;
923 : }
924 :
925 228 : void messaging_send_all(struct messaging_context *msg_ctx,
926 : int msg_type, const void *buf, size_t len)
927 : {
928 228 : struct send_all_state state = {
929 : .msg_ctx = msg_ctx, .msg_type = msg_type,
930 : .buf = buf, .len = len
931 : };
932 1 : int ret;
933 :
934 : #ifdef CLUSTER_SUPPORT
935 : if (lp_clustering()) {
936 : struct ctdbd_connection *conn = messaging_ctdb_connection();
937 : uint8_t msghdr[MESSAGE_HDR_LENGTH];
938 : struct iovec iov[] = {
939 : { .iov_base = msghdr,
940 : .iov_len = sizeof(msghdr) },
941 : { .iov_base = discard_const_p(void, buf),
942 : .iov_len = len }
943 : };
944 :
945 : message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
946 : (struct server_id) {0});
947 :
948 : ret = ctdbd_messaging_send_iov(
949 : conn, CTDB_BROADCAST_CONNECTED,
950 : CTDB_SRVID_SAMBA_PROCESS,
951 : iov, ARRAY_SIZE(iov));
952 : if (ret != 0) {
953 : DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
954 : strerror(ret));
955 : }
956 :
957 : return;
958 : }
959 : #endif
960 :
961 228 : ret = messaging_dgm_forall(send_all_fn, &state);
962 228 : if (ret != 0) {
963 0 : DBG_WARNING("messaging_dgm_forall failed: %s\n",
964 : strerror(ret));
965 : }
966 228 : }
967 :
968 177069 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
969 : struct messaging_rec *rec)
970 : {
971 597 : struct messaging_rec *result;
972 177069 : size_t fds_size = sizeof(int64_t) * rec->num_fds;
973 597 : size_t payload_len;
974 :
975 177069 : payload_len = rec->buf.length + fds_size;
976 177069 : if (payload_len < rec->buf.length) {
977 : /* overflow */
978 0 : return NULL;
979 : }
980 :
981 177069 : result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
982 : payload_len);
983 177069 : if (result == NULL) {
984 0 : return NULL;
985 : }
986 177069 : *result = *rec;
987 :
988 : /* Doesn't fail, see talloc_pooled_object */
989 :
990 177069 : result->buf.data = talloc_memdup(result, rec->buf.data,
991 : rec->buf.length);
992 :
993 177069 : result->fds = NULL;
994 177069 : if (result->num_fds > 0) {
995 56 : size_t i;
996 :
997 1110 : result->fds = talloc_memdup(result, rec->fds, fds_size);
998 :
999 2223 : for (i=0; i<rec->num_fds; i++) {
1000 : /*
1001 : * fd's can only exist once
1002 : */
1003 1113 : rec->fds[i] = -1;
1004 : }
1005 : }
1006 :
1007 176472 : return result;
1008 : }
1009 :
1010 : struct messaging_filtered_read_state {
1011 : struct tevent_context *ev;
1012 : struct messaging_context *msg_ctx;
1013 : struct messaging_dgm_fde *fde;
1014 : struct messaging_ctdb_fde *cluster_fde;
1015 :
1016 : bool (*filter)(struct messaging_rec *rec, void *private_data);
1017 : void *private_data;
1018 :
1019 : struct messaging_rec *rec;
1020 : };
1021 :
1022 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
1023 : enum tevent_req_state req_state);
1024 :
1025 137682 : struct tevent_req *messaging_filtered_read_send(
1026 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1027 : struct messaging_context *msg_ctx,
1028 : bool (*filter)(struct messaging_rec *rec, void *private_data),
1029 : void *private_data)
1030 : {
1031 3800 : struct tevent_req *req;
1032 3800 : struct messaging_filtered_read_state *state;
1033 3800 : size_t new_waiters_len;
1034 3800 : bool ok;
1035 :
1036 137682 : req = tevent_req_create(mem_ctx, &state,
1037 : struct messaging_filtered_read_state);
1038 137682 : if (req == NULL) {
1039 0 : return NULL;
1040 : }
1041 137682 : state->ev = ev;
1042 137682 : state->msg_ctx = msg_ctx;
1043 137682 : state->filter = filter;
1044 137682 : state->private_data = private_data;
1045 :
1046 : /*
1047 : * We have to defer the callback here, as we might be called from
1048 : * within a different tevent_context than state->ev
1049 : */
1050 137682 : tevent_req_defer_callback(req, state->ev);
1051 :
1052 137682 : state->fde = messaging_dgm_register_tevent_context(state, ev);
1053 137682 : if (tevent_req_nomem(state->fde, req)) {
1054 0 : return tevent_req_post(req, ev);
1055 : }
1056 :
1057 137682 : if (lp_clustering()) {
1058 0 : state->cluster_fde =
1059 0 : messaging_ctdb_register_tevent_context(state, ev);
1060 0 : if (tevent_req_nomem(state->cluster_fde, req)) {
1061 0 : return tevent_req_post(req, ev);
1062 : }
1063 : }
1064 :
1065 : /*
1066 : * We add ourselves to the "new_waiters" array, not the "waiters"
1067 : * array. If we are called from within messaging_read_done,
1068 : * messaging_dispatch_rec will be in an active for-loop on
1069 : * "waiters". We must be careful not to mess with this array, because
1070 : * it could mean that a single event is being delivered twice.
1071 : */
1072 :
1073 137682 : new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1074 :
1075 137682 : if (new_waiters_len == msg_ctx->num_new_waiters) {
1076 3734 : struct tevent_req **tmp;
1077 :
1078 117180 : tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1079 : struct tevent_req *, new_waiters_len+1);
1080 117180 : if (tevent_req_nomem(tmp, req)) {
1081 0 : return tevent_req_post(req, ev);
1082 : }
1083 117180 : msg_ctx->new_waiters = tmp;
1084 : }
1085 :
1086 137682 : msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1087 137682 : msg_ctx->num_new_waiters += 1;
1088 137682 : tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1089 :
1090 137682 : ok = messaging_register_event_context(msg_ctx, ev);
1091 137682 : if (!ok) {
1092 0 : tevent_req_oom(req);
1093 0 : return tevent_req_post(req, ev);
1094 : }
1095 :
1096 133882 : return req;
1097 : }
1098 :
1099 131140 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
1100 : enum tevent_req_state req_state)
1101 : {
1102 131140 : struct messaging_filtered_read_state *state = tevent_req_data(
1103 : req, struct messaging_filtered_read_state);
1104 131140 : struct messaging_context *msg_ctx = state->msg_ctx;
1105 3665 : size_t i;
1106 3665 : bool ok;
1107 :
1108 131140 : tevent_req_set_cleanup_fn(req, NULL);
1109 :
1110 131140 : TALLOC_FREE(state->fde);
1111 131140 : TALLOC_FREE(state->cluster_fde);
1112 :
1113 131140 : ok = messaging_deregister_event_context(msg_ctx, state->ev);
1114 131140 : if (!ok) {
1115 0 : abort();
1116 : }
1117 :
1118 : /*
1119 : * Just set the [new_]waiters entry to NULL, be careful not to mess
1120 : * with the other "waiters" array contents. We are often called from
1121 : * within "messaging_dispatch_rec", which loops over
1122 : * "waiters". Messing with the "waiters" array will mess up that
1123 : * for-loop.
1124 : */
1125 :
1126 192674 : for (i=0; i<msg_ctx->num_waiters; i++) {
1127 82335 : if (msg_ctx->waiters[i] == req) {
1128 20801 : msg_ctx->waiters[i] = NULL;
1129 20801 : return;
1130 : }
1131 : }
1132 :
1133 432526 : for (i=0; i<msg_ctx->num_new_waiters; i++) {
1134 432526 : if (msg_ctx->new_waiters[i] == req) {
1135 110339 : msg_ctx->new_waiters[i] = NULL;
1136 110339 : return;
1137 : }
1138 : }
1139 : }
1140 :
1141 4272 : static void messaging_filtered_read_done(struct tevent_req *req,
1142 : struct messaging_rec *rec)
1143 : {
1144 4272 : struct messaging_filtered_read_state *state = tevent_req_data(
1145 : req, struct messaging_filtered_read_state);
1146 :
1147 4272 : state->rec = messaging_rec_dup(state, rec);
1148 4272 : if (tevent_req_nomem(state->rec, req)) {
1149 0 : return;
1150 : }
1151 4272 : tevent_req_done(req);
1152 : }
1153 :
1154 4272 : int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1155 : struct messaging_rec **presult)
1156 : {
1157 4272 : struct messaging_filtered_read_state *state = tevent_req_data(
1158 : req, struct messaging_filtered_read_state);
1159 228 : int err;
1160 :
1161 4272 : if (tevent_req_is_unix_error(req, &err)) {
1162 0 : tevent_req_received(req);
1163 0 : return err;
1164 : }
1165 4272 : if (presult != NULL) {
1166 4272 : *presult = talloc_move(mem_ctx, &state->rec);
1167 : }
1168 4272 : tevent_req_received(req);
1169 4272 : return 0;
1170 : }
1171 :
1172 : struct messaging_read_state {
1173 : uint32_t msg_type;
1174 : struct messaging_rec *rec;
1175 : };
1176 :
1177 : static bool messaging_read_filter(struct messaging_rec *rec,
1178 : void *private_data);
1179 : static void messaging_read_done(struct tevent_req *subreq);
1180 :
1181 30857 : struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1182 : struct tevent_context *ev,
1183 : struct messaging_context *msg,
1184 : uint32_t msg_type)
1185 : {
1186 949 : struct tevent_req *req, *subreq;
1187 949 : struct messaging_read_state *state;
1188 :
1189 30857 : req = tevent_req_create(mem_ctx, &state,
1190 : struct messaging_read_state);
1191 30857 : if (req == NULL) {
1192 0 : return NULL;
1193 : }
1194 30857 : state->msg_type = msg_type;
1195 :
1196 30857 : subreq = messaging_filtered_read_send(state, ev, msg,
1197 : messaging_read_filter, state);
1198 30857 : if (tevent_req_nomem(subreq, req)) {
1199 0 : return tevent_req_post(req, ev);
1200 : }
1201 30857 : tevent_req_set_callback(subreq, messaging_read_done, req);
1202 30857 : return req;
1203 : }
1204 :
1205 25421 : static bool messaging_read_filter(struct messaging_rec *rec,
1206 : void *private_data)
1207 : {
1208 25421 : struct messaging_read_state *state = talloc_get_type_abort(
1209 : private_data, struct messaging_read_state);
1210 :
1211 25421 : if (rec->num_fds != 0) {
1212 696 : return false;
1213 : }
1214 :
1215 24723 : return rec->msg_type == state->msg_type;
1216 : }
1217 :
1218 168 : static void messaging_read_done(struct tevent_req *subreq)
1219 : {
1220 168 : struct tevent_req *req = tevent_req_callback_data(
1221 : subreq, struct tevent_req);
1222 168 : struct messaging_read_state *state = tevent_req_data(
1223 : req, struct messaging_read_state);
1224 112 : int ret;
1225 :
1226 168 : ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1227 168 : TALLOC_FREE(subreq);
1228 168 : if (tevent_req_error(req, ret)) {
1229 0 : return;
1230 : }
1231 168 : tevent_req_done(req);
1232 : }
1233 :
1234 168 : int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1235 : struct messaging_rec **presult)
1236 : {
1237 168 : struct messaging_read_state *state = tevent_req_data(
1238 : req, struct messaging_read_state);
1239 112 : int err;
1240 :
1241 168 : if (tevent_req_is_unix_error(req, &err)) {
1242 0 : return err;
1243 : }
1244 168 : if (presult != NULL) {
1245 66 : *presult = talloc_move(mem_ctx, &state->rec);
1246 : }
1247 56 : return 0;
1248 : }
1249 :
1250 64984 : static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1251 : {
1252 64984 : if (msg_ctx->num_new_waiters == 0) {
1253 59906 : return true;
1254 : }
1255 :
1256 5074 : if (talloc_array_length(msg_ctx->waiters) <
1257 5074 : (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1258 217 : struct tevent_req **tmp;
1259 3099 : tmp = talloc_realloc(
1260 : msg_ctx, msg_ctx->waiters, struct tevent_req *,
1261 : msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1262 3099 : if (tmp == NULL) {
1263 0 : DEBUG(1, ("%s: talloc failed\n", __func__));
1264 0 : return false;
1265 : }
1266 3099 : msg_ctx->waiters = tmp;
1267 : }
1268 :
1269 5074 : memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1270 5074 : sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1271 :
1272 5074 : msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1273 5074 : msg_ctx->num_new_waiters = 0;
1274 :
1275 5074 : return true;
1276 : }
1277 :
1278 258240 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1279 : struct messaging_rec *rec)
1280 : {
1281 711 : struct messaging_callback *cb, *next;
1282 :
1283 1407626 : for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1284 5292 : size_t j;
1285 :
1286 1342974 : next = cb->next;
1287 1342974 : if (cb->msg_type != rec->msg_type) {
1288 1149386 : continue;
1289 : }
1290 :
1291 : /*
1292 : * the old style callbacks don't support fd passing
1293 : */
1294 193588 : for (j=0; j < rec->num_fds; j++) {
1295 0 : int fd = rec->fds[j];
1296 0 : close(fd);
1297 : }
1298 193588 : rec->num_fds = 0;
1299 193588 : rec->fds = NULL;
1300 :
1301 193588 : cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1302 : rec->src, &rec->buf);
1303 :
1304 193584 : return true;
1305 : }
1306 :
1307 64417 : return false;
1308 : }
1309 :
1310 64984 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1311 : struct tevent_context *ev,
1312 : struct messaging_rec *rec)
1313 : {
1314 235 : size_t i;
1315 :
1316 64984 : if (!messaging_append_new_waiters(msg_ctx)) {
1317 0 : return false;
1318 : }
1319 :
1320 64749 : i = 0;
1321 289474 : while (i < msg_ctx->num_waiters) {
1322 889 : struct tevent_req *req;
1323 889 : struct messaging_filtered_read_state *state;
1324 :
1325 228762 : req = msg_ctx->waiters[i];
1326 228762 : if (req == NULL) {
1327 : /*
1328 : * This got cleaned up. In the meantime,
1329 : * move everything down one. We need
1330 : * to keep the order of waiters, as
1331 : * other code may depend on this.
1332 : */
1333 6271 : ARRAY_DEL_ELEMENT(
1334 227 : msg_ctx->waiters, i, msg_ctx->num_waiters);
1335 6271 : msg_ctx->num_waiters -= 1;
1336 6271 : continue;
1337 : }
1338 :
1339 222491 : state = tevent_req_data(
1340 : req, struct messaging_filtered_read_state);
1341 443688 : if ((ev == state->ev) &&
1342 221197 : state->filter(rec, state->private_data)) {
1343 4272 : messaging_filtered_read_done(req, rec);
1344 4272 : return true;
1345 : }
1346 :
1347 218219 : i += 1;
1348 : }
1349 :
1350 60705 : return false;
1351 : }
1352 :
1353 : /*
1354 : Dispatch one messaging_rec
1355 : */
1356 85830 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1357 : struct tevent_context *ev,
1358 : struct messaging_rec *rec)
1359 : {
1360 345 : bool consumed;
1361 345 : size_t i;
1362 :
1363 85830 : if (ev == msg_ctx->event_ctx) {
1364 85498 : consumed = messaging_dispatch_classic(msg_ctx, rec);
1365 85498 : if (consumed) {
1366 42806 : return;
1367 : }
1368 : }
1369 :
1370 42898 : consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1371 42898 : if (consumed) {
1372 2614 : return;
1373 : }
1374 :
1375 40071 : if (ev != msg_ctx->event_ctx) {
1376 0 : struct iovec iov;
1377 312 : int fds[MAX(1, rec->num_fds)];
1378 0 : int ret;
1379 :
1380 : /*
1381 : * We've been listening on a nested event
1382 : * context. Messages need to be handled in the main
1383 : * event context, so post to ourselves
1384 : */
1385 :
1386 312 : iov.iov_base = rec->buf.data;
1387 312 : iov.iov_len = rec->buf.length;
1388 :
1389 312 : for (i=0; i<rec->num_fds; i++) {
1390 0 : fds[i] = rec->fds[i];
1391 : }
1392 :
1393 312 : ret = messaging_post_self(
1394 312 : msg_ctx, rec->src, rec->dest, rec->msg_type,
1395 312 : &iov, 1, fds, rec->num_fds);
1396 312 : if (ret == 0) {
1397 312 : return;
1398 : }
1399 : }
1400 : }
1401 :
1402 : static int mess_parent_dgm_cleanup(void *private_data);
1403 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1404 :
1405 88 : bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1406 : {
1407 0 : struct tevent_req *req;
1408 :
1409 88 : req = background_job_send(
1410 : msg, msg->event_ctx, msg, NULL, 0,
1411 88 : lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1412 : 60*15),
1413 : mess_parent_dgm_cleanup, msg);
1414 88 : if (req == NULL) {
1415 0 : DBG_WARNING("background_job_send failed\n");
1416 0 : return false;
1417 : }
1418 88 : tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1419 88 : return true;
1420 : }
1421 :
1422 0 : static int mess_parent_dgm_cleanup(void *private_data)
1423 : {
1424 0 : int ret;
1425 :
1426 0 : ret = messaging_dgm_wipe();
1427 0 : DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1428 : ret ? strerror(ret) : "ok"));
1429 0 : return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1430 : 60*15);
1431 : }
1432 :
1433 0 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1434 : {
1435 0 : struct messaging_context *msg = tevent_req_callback_data(
1436 : req, struct messaging_context);
1437 0 : NTSTATUS status;
1438 :
1439 0 : status = background_job_recv(req);
1440 0 : TALLOC_FREE(req);
1441 0 : DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1442 : nt_errstr(status)));
1443 :
1444 0 : req = background_job_send(
1445 : msg, msg->event_ctx, msg, NULL, 0,
1446 0 : lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1447 : 60*15),
1448 : mess_parent_dgm_cleanup, msg);
1449 0 : if (req == NULL) {
1450 0 : DEBUG(1, ("background_job_send failed\n"));
1451 0 : return;
1452 : }
1453 0 : tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1454 : }
1455 :
1456 0 : int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1457 : {
1458 0 : int ret;
1459 :
1460 0 : if (pid == 0) {
1461 0 : ret = messaging_dgm_wipe();
1462 : } else {
1463 0 : ret = messaging_dgm_cleanup(pid);
1464 : }
1465 :
1466 0 : return ret;
1467 : }
1468 :
1469 75016 : struct tevent_context *messaging_tevent_context(
1470 : struct messaging_context *msg_ctx)
1471 : {
1472 75016 : return msg_ctx->event_ctx;
1473 : }
1474 :
1475 20649 : struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1476 : {
1477 20649 : return msg_ctx->names_db;
1478 : }
1479 :
1480 : /** @} **/
|