Line data Source code
1 : /*
2 : Unix SMB/CIFS Implementation.
3 :
4 : DSDB replication service periodic notification handling
5 :
6 : Copyright (C) Andrew Tridgell 2009
7 : based on drepl_periodic
8 :
9 : This program is free software; you can redistribute it and/or modify
10 : it under the terms of the GNU General Public License as published by
11 : the Free Software Foundation; either version 3 of the License, or
12 : (at your option) any later version.
13 :
14 : This program is distributed in the hope that it will be useful,
15 : but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 : GNU General Public License for more details.
18 :
19 : You should have received a copy of the GNU General Public License
20 : along with this program. If not, see <http://www.gnu.org/licenses/>.
21 :
22 : */
23 :
24 : #include "includes.h"
25 : #include "lib/events/events.h"
26 : #include "dsdb/samdb/samdb.h"
27 : #include "auth/auth.h"
28 : #include "samba/service.h"
29 : #include "dsdb/repl/drepl_service.h"
30 : #include <ldb_errors.h>
31 : #include "../lib/util/dlinklist.h"
32 : #include "librpc/gen_ndr/ndr_misc.h"
33 : #include "librpc/gen_ndr/ndr_drsuapi.h"
34 : #include "librpc/gen_ndr/ndr_drsblobs.h"
35 : #include "libcli/composite/composite.h"
36 : #include "../lib/util/tevent_ntstatus.h"
37 :
38 : #undef DBGC_CLASS
39 : #define DBGC_CLASS DBGC_DRS_REPL
40 :
41 :
/*
  state carried by one dreplsrv_op_notify request
*/
struct dreplsrv_op_notify_state {
	/* event context the sub-requests of this request are sent on */
	struct tevent_context *ev;

	/* the notify operation this request is carrying out */
	struct dreplsrv_notify_operation *op;

	/*
	 * the struct drsuapi_DsReplicaSync handed to the RPC call; it is
	 * stored when the call is sent and reset to NULL when the reply
	 * is received
	 */
	void *ndr_struct_ptr;
};
47 :
48 : static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq);
49 :
50 : /*
51 : start the ReplicaSync async call
52 : */
53 7409 : static struct tevent_req *dreplsrv_op_notify_send(TALLOC_CTX *mem_ctx,
54 : struct tevent_context *ev,
55 : struct dreplsrv_notify_operation *op)
56 : {
57 0 : struct tevent_req *req;
58 0 : struct dreplsrv_op_notify_state *state;
59 0 : struct tevent_req *subreq;
60 :
61 7409 : req = tevent_req_create(mem_ctx, &state,
62 : struct dreplsrv_op_notify_state);
63 7409 : if (req == NULL) {
64 0 : return NULL;
65 : }
66 7409 : state->ev = ev;
67 7409 : state->op = op;
68 :
69 7409 : subreq = dreplsrv_out_drsuapi_send(state,
70 : ev,
71 7409 : op->source_dsa->conn);
72 7409 : if (tevent_req_nomem(subreq, req)) {
73 0 : return tevent_req_post(req, ev);
74 : }
75 7409 : tevent_req_set_callback(subreq, dreplsrv_op_notify_connect_done, req);
76 :
77 7409 : return req;
78 : }
79 :
80 : static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req);
81 :
82 7408 : static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq)
83 : {
84 7408 : struct tevent_req *req = tevent_req_callback_data(subreq,
85 : struct tevent_req);
86 0 : NTSTATUS status;
87 :
88 7408 : status = dreplsrv_out_drsuapi_recv(subreq);
89 7408 : TALLOC_FREE(subreq);
90 7408 : if (tevent_req_nterror(req, status)) {
91 6263 : return;
92 : }
93 :
94 1145 : dreplsrv_op_notify_replica_sync_trigger(req);
95 : }
96 :
97 : static void dreplsrv_op_notify_replica_sync_done(struct tevent_req *subreq);
98 :
99 1145 : static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req)
100 : {
101 0 : struct dreplsrv_op_notify_state *state =
102 1145 : tevent_req_data(req,
103 : struct dreplsrv_op_notify_state);
104 1145 : struct dreplsrv_partition *partition = state->op->source_dsa->partition;
105 1145 : struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
106 0 : struct drsuapi_DsReplicaSync *r;
107 0 : struct tevent_req *subreq;
108 :
109 1145 : r = talloc_zero(state, struct drsuapi_DsReplicaSync);
110 1145 : if (tevent_req_nomem(r, req)) {
111 0 : return;
112 : }
113 1145 : r->in.req = talloc_zero(r, union drsuapi_DsReplicaSyncRequest);
114 1145 : if (tevent_req_nomem(r, req)) {
115 0 : return;
116 : }
117 1145 : r->in.bind_handle = &drsuapi->bind_handle;
118 1145 : r->in.level = 1;
119 1145 : r->in.req->req1.naming_context = &partition->nc;
120 1145 : r->in.req->req1.source_dsa_guid = state->op->service->ntds_guid;
121 1145 : r->in.req->req1.options =
122 : DRSUAPI_DRS_ASYNC_OP |
123 : DRSUAPI_DRS_UPDATE_NOTIFICATION |
124 : DRSUAPI_DRS_WRIT_REP;
125 :
126 1145 : if (state->op->is_urgent) {
127 327 : r->in.req->req1.options |= DRSUAPI_DRS_SYNC_URGENT;
128 : }
129 :
130 1145 : state->ndr_struct_ptr = r;
131 :
132 1145 : if (DEBUGLVL(10)) {
133 0 : NDR_PRINT_IN_DEBUG(drsuapi_DsReplicaSync, r);
134 : }
135 :
136 1145 : subreq = dcerpc_drsuapi_DsReplicaSync_r_send(state,
137 : state->ev,
138 : drsuapi->drsuapi_handle,
139 : r);
140 1145 : if (tevent_req_nomem(subreq, req)) {
141 0 : return;
142 : }
143 1145 : tevent_req_set_callback(subreq, dreplsrv_op_notify_replica_sync_done, req);
144 : }
145 :
146 1145 : static void dreplsrv_op_notify_replica_sync_done(struct tevent_req *subreq)
147 : {
148 0 : struct tevent_req *req =
149 1145 : tevent_req_callback_data(subreq,
150 : struct tevent_req);
151 0 : struct dreplsrv_op_notify_state *state =
152 1145 : tevent_req_data(req,
153 : struct dreplsrv_op_notify_state);
154 1145 : struct drsuapi_DsReplicaSync *r = talloc_get_type(state->ndr_struct_ptr,
155 : struct drsuapi_DsReplicaSync);
156 0 : NTSTATUS status;
157 :
158 1145 : state->ndr_struct_ptr = NULL;
159 :
160 1145 : status = dcerpc_drsuapi_DsReplicaSync_r_recv(subreq, r);
161 1145 : TALLOC_FREE(subreq);
162 1145 : if (tevent_req_nterror(req, status)) {
163 0 : return;
164 : }
165 :
166 1145 : if (!W_ERROR_IS_OK(r->out.result)) {
167 0 : status = werror_to_ntstatus(r->out.result);
168 0 : tevent_req_nterror(req, status);
169 0 : return;
170 : }
171 :
172 1145 : tevent_req_done(req);
173 : }
174 :
175 7408 : static NTSTATUS dreplsrv_op_notify_recv(struct tevent_req *req)
176 : {
177 7408 : return tevent_req_simple_recv_ntstatus(req);
178 : }
179 :
180 : /*
181 : called when a notify operation has completed
182 : */
183 7408 : static void dreplsrv_notify_op_callback(struct tevent_req *subreq)
184 : {
185 0 : struct dreplsrv_notify_operation *op =
186 7408 : tevent_req_callback_data(subreq,
187 : struct dreplsrv_notify_operation);
188 0 : NTSTATUS status;
189 7408 : struct dreplsrv_service *s = op->service;
190 0 : WERROR werr;
191 :
192 7408 : status = dreplsrv_op_notify_recv(subreq);
193 7408 : werr = ntstatus_to_werror(status);
194 7408 : TALLOC_FREE(subreq);
195 7408 : if (!NT_STATUS_IS_OK(status)) {
196 6263 : DBG_INFO("dreplsrv_notify: Failed to send DsReplicaSync to %s for %s - %s : %s\n",
197 : op->source_dsa->repsFrom1->other_info->dns_name,
198 : ldb_dn_get_linearized(op->source_dsa->partition->dn),
199 : nt_errstr(status), win_errstr(werr));
200 : } else {
201 1145 : DBG_INFO("dreplsrv_notify: DsReplicaSync successfully sent to %s\n",
202 : op->source_dsa->repsFrom1->other_info->dns_name);
203 1145 : op->source_dsa->notify_uSN = op->uSN;
204 : }
205 :
206 7408 : drepl_reps_update(s, "repsTo", op->source_dsa->partition->dn,
207 7408 : &op->source_dsa->repsFrom1->source_dsa_obj_guid,
208 : werr);
209 :
210 7408 : talloc_free(op);
211 7408 : s->ops.n_current = NULL;
212 7408 : dreplsrv_run_pending_ops(s);
213 7408 : }
214 :
215 : /*
216 : run any pending replica sync calls
217 : */
218 7445 : void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
219 : {
220 0 : struct dreplsrv_notify_operation *op;
221 0 : struct tevent_req *subreq;
222 :
223 7445 : if (s->ops.n_current || s->ops.current) {
224 : /* if there's still one running, we're done */
225 36 : return;
226 : }
227 :
228 7409 : if (!s->ops.notifies) {
229 : /* if there're no pending operations, we're done */
230 0 : return;
231 : }
232 :
233 7409 : op = s->ops.notifies;
234 7409 : s->ops.n_current = op;
235 7409 : DLIST_REMOVE(s->ops.notifies, op);
236 :
237 7409 : subreq = dreplsrv_op_notify_send(op, s->task->event_ctx, op);
238 7409 : if (!subreq) {
239 0 : DBG_ERR("dreplsrv_notify_run_ops: dreplsrv_op_notify_send[%s][%s] - no memory\n",
240 : op->source_dsa->repsFrom1->other_info->dns_name,
241 : ldb_dn_get_linearized(op->source_dsa->partition->dn));
242 0 : return;
243 : }
244 7409 : tevent_req_set_callback(subreq, dreplsrv_notify_op_callback, op);
245 7409 : DBG_INFO("started DsReplicaSync for %s to %s\n",
246 : ldb_dn_get_linearized(op->source_dsa->partition->dn),
247 : op->source_dsa->repsFrom1->other_info->dns_name);
248 : }
249 :
250 :
251 : /*
252 : find a source_dsa for a given guid
253 : */
254 21522 : static struct dreplsrv_partition_source_dsa *dreplsrv_find_notify_dsa(struct dreplsrv_partition *p,
255 : struct GUID *guid)
256 : {
257 0 : struct dreplsrv_partition_source_dsa *s;
258 :
259 : /* first check the sources list */
260 28980 : for (s=p->sources; s; s=s->next) {
261 13994 : if (GUID_equal(&s->repsFrom1->source_dsa_obj_guid, guid)) {
262 6536 : return s;
263 : }
264 : }
265 :
266 : /* then the notifies list */
267 34481 : for (s=p->notifies; s; s=s->next) {
268 34481 : if (GUID_equal(&s->repsFrom1->source_dsa_obj_guid, guid)) {
269 14986 : return s;
270 : }
271 : }
272 0 : return NULL;
273 : }
274 :
275 :
276 : /*
277 : schedule a replicaSync message
278 : */
279 7537 : static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
280 : struct dreplsrv_partition *p,
281 : struct repsFromToBlob *reps,
282 : TALLOC_CTX *mem_ctx,
283 : uint64_t uSN,
284 : bool is_urgent,
285 : uint32_t replica_flags)
286 : {
287 0 : struct dreplsrv_notify_operation *op;
288 0 : struct dreplsrv_partition_source_dsa *s;
289 :
290 7537 : s = dreplsrv_find_notify_dsa(p, &reps->ctr.ctr1.source_dsa_obj_guid);
291 7537 : if (s == NULL) {
292 0 : DBG_ERR("Unable to find source_dsa for %s\n",
293 : GUID_string(mem_ctx, &reps->ctr.ctr1.source_dsa_obj_guid));
294 0 : return WERR_DS_UNAVAILABLE;
295 : }
296 :
297 : /* first try to find an existing notify operation */
298 60249 : for (op = service->ops.notifies; op; op = op->next) {
299 52838 : if (op->source_dsa != s) {
300 52712 : continue;
301 : }
302 :
303 128 : if (op->is_urgent != is_urgent) {
304 2 : continue;
305 : }
306 :
307 126 : if (op->replica_flags != replica_flags) {
308 0 : continue;
309 : }
310 :
311 126 : if (op->uSN < uSN) {
312 54 : op->uSN = uSN;
313 : }
314 :
315 : /* reuse the notify operation, as it's not yet started */
316 126 : return WERR_OK;
317 : }
318 :
319 7411 : op = talloc_zero(mem_ctx, struct dreplsrv_notify_operation);
320 7411 : W_ERROR_HAVE_NO_MEMORY(op);
321 :
322 7411 : op->service = service;
323 7411 : op->source_dsa = s;
324 7411 : op->uSN = uSN;
325 7411 : op->is_urgent = is_urgent;
326 7411 : op->replica_flags = replica_flags;
327 7411 : op->schedule_time = time(NULL);
328 :
329 7411 : DLIST_ADD_END(service->ops.notifies, op);
330 7411 : talloc_steal(service, op);
331 7411 : return WERR_OK;
332 : }
333 :
334 : /*
335 : see if a partition has a hugher uSN than what is in the repsTo and
336 : if so then send a DsReplicaSync
337 : */
338 49865 : static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
339 : struct dreplsrv_partition *p,
340 : TALLOC_CTX *mem_ctx)
341 : {
342 49865 : uint32_t count=0;
343 515 : struct repsFromToBlob *reps;
344 515 : WERROR werr;
345 515 : uint64_t uSNHighest;
346 515 : uint64_t uSNUrgent;
347 515 : uint32_t i;
348 515 : int ret;
349 :
350 49865 : werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count);
351 49865 : if (!W_ERROR_IS_OK(werr)) {
352 0 : DBG_ERR("Failed to load repsTo for %s\n",
353 : ldb_dn_get_linearized(p->dn));
354 0 : return werr;
355 : }
356 :
357 : /* loads the partition uSNHighest and uSNUrgent */
358 49865 : ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSNHighest, &uSNUrgent);
359 49865 : if (ret != LDB_SUCCESS || uSNHighest == 0) {
360 : /* nothing to do */
361 0 : return WERR_OK;
362 : }
363 :
364 : /* see if any of our partners need some of our objects */
365 63850 : for (i=0; i<count; i++) {
366 0 : struct dreplsrv_partition_source_dsa *sdsa;
367 0 : uint32_t replica_flags;
368 13985 : sdsa = dreplsrv_find_notify_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid);
369 13985 : replica_flags = reps[i].ctr.ctr1.replica_flags;
370 13985 : if (sdsa == NULL) continue;
371 13985 : if (sdsa->notify_uSN < uSNHighest) {
372 : /* we need to tell this partner to replicate
373 : with us */
374 7537 : bool is_urgent = sdsa->notify_uSN < uSNUrgent;
375 :
376 : /* check if urgent replication is needed */
377 7537 : werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx,
378 : uSNHighest, is_urgent, replica_flags);
379 7537 : if (!W_ERROR_IS_OK(werr)) {
380 0 : DBG_ERR("Failed to setup notify to %s for %s\n",
381 : reps[i].ctr.ctr1.other_info->dns_name,
382 : ldb_dn_get_linearized(p->dn));
383 0 : return werr;
384 : }
385 7537 : DBG_DEBUG("queued DsReplicaSync for %s to %s "
386 : "(urgent=%s) uSN=%llu:%llu\n",
387 : ldb_dn_get_linearized(p->dn),
388 : reps[i].ctr.ctr1.other_info->dns_name,
389 : is_urgent?"true":"false",
390 : (unsigned long long)sdsa->notify_uSN,
391 : (unsigned long long)uSNHighest);
392 : }
393 : }
394 :
395 49865 : return WERR_OK;
396 : }
397 :
398 : /*
399 : see if any of the partitions have changed, and if so then send a
400 : DsReplicaSync to all the replica partners in the repsTo object
401 : */
402 10247 : static WERROR dreplsrv_notify_check_all(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
403 : {
404 103 : WERROR status;
405 103 : struct dreplsrv_partition *p;
406 :
407 60112 : for (p = s->partitions; p; p = p->next) {
408 49865 : status = dreplsrv_notify_check(s, p, mem_ctx);
409 49865 : W_ERROR_NOT_OK_RETURN(status);
410 : }
411 :
412 10247 : return WERR_OK;
413 : }
414 :
415 : static void dreplsrv_notify_run(struct dreplsrv_service *service);
416 :
417 10247 : static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_timer *te,
418 : struct timeval t, void *ptr)
419 : {
420 10247 : struct dreplsrv_service *service = talloc_get_type(ptr, struct dreplsrv_service);
421 103 : WERROR status;
422 :
423 10247 : service->notify.te = NULL;
424 :
425 10247 : dreplsrv_notify_run(service);
426 :
427 10247 : status = dreplsrv_notify_schedule(service, service->notify.interval);
428 10247 : if (!W_ERROR_IS_OK(status)) {
429 0 : task_server_terminate(service->task, win_errstr(status), false);
430 0 : return;
431 : }
432 : }
433 :
434 10305 : WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_interval)
435 : {
436 105 : TALLOC_CTX *tmp_mem;
437 105 : struct tevent_timer *new_te;
438 105 : struct timeval next_time;
439 :
440 : /* prevent looping */
441 10305 : if (next_interval == 0) next_interval = 1;
442 :
443 10305 : next_time = timeval_current_ofs(next_interval, 50);
444 :
445 10305 : if (service->notify.te) {
446 : /*
447 : * if the timestamp of the new event is higher,
448 : * as current next we don't need to reschedule
449 : */
450 0 : if (timeval_compare(&next_time, &service->notify.next_event) > 0) {
451 0 : return WERR_OK;
452 : }
453 : }
454 :
455 : /* reset the next scheduled timestamp */
456 10305 : service->notify.next_event = next_time;
457 :
458 10305 : new_te = tevent_add_timer(service->task->event_ctx, service,
459 : service->notify.next_event,
460 : dreplsrv_notify_handler_te, service);
461 10305 : W_ERROR_HAVE_NO_MEMORY(new_te);
462 :
463 10305 : tmp_mem = talloc_new(service);
464 10305 : DBG_DEBUG("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
465 : next_interval,
466 : (service->notify.te?"re":""),
467 : nt_time_string(tmp_mem, timeval_to_nttime(&next_time)));
468 10305 : talloc_free(tmp_mem);
469 :
470 10305 : talloc_free(service->notify.te);
471 10305 : service->notify.te = new_te;
472 :
473 10305 : return WERR_OK;
474 : }
475 :
476 10247 : static void dreplsrv_notify_run(struct dreplsrv_service *service)
477 : {
478 103 : TALLOC_CTX *mem_ctx;
479 :
480 10247 : mem_ctx = talloc_new(service);
481 10247 : dreplsrv_notify_check_all(service, mem_ctx);
482 10247 : talloc_free(mem_ctx);
483 :
484 10247 : dreplsrv_run_pending_ops(service);
485 10247 : }
|