LCOV - code coverage report
Current view: top level - source3/rpc_server/mdssvc - mdssvc_es.c (source / functions) Hit Total Coverage
Test: coverage report for vadcx-master-patch-75612 fe003de8 Lines: 260 396 65.7 %
Date: 2024-02-29 22:57:05 Functions: 18 22 81.8 %

          Line data    Source code
       1             : /*
       2             :    Unix SMB/CIFS implementation.
       3             :    Main metadata server / Spotlight routines / ES backend
       4             : 
       5             :    Copyright (C) Ralph Boehme                   2019
       6             : 
       7             :    This program is free software; you can redistribute it and/or modify
       8             :    it under the terms of the GNU General Public License as published by
       9             :    the Free Software Foundation; either version 3 of the License, or
      10             :    (at your option) any later version.
      11             : 
      12             :    This program is distributed in the hope that it will be useful,
      13             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      14             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      15             :    GNU General Public License for more details.
      16             : 
      17             :    You should have received a copy of the GNU General Public License
      18             :    along with this program.  If not, see <http://www.gnu.org/licenses/>.
      19             : */
      20             : 
      21             : #include "includes.h"
      22             : #include "system/filesys.h"
      23             : #include "lib/util/time_basic.h"
      24             : #include "lib/tls/tls.h"
      25             : #include "lib/util/tevent_ntstatus.h"
      26             : #include "libcli/http/http.h"
      27             : #include "lib/util/tevent_unix.h"
      28             : #include "credentials.h"
      29             : #include "mdssvc.h"
      30             : #include "mdssvc_es.h"
      31             : #include "rpc_server/mdssvc/es_parser.tab.h"
      32             : 
      33             : #include <jansson.h>
      34             : 
      35             : #undef DBGC_CLASS
      36             : #define DBGC_CLASS DBGC_RPC_SRV
      37             : 
      38             : #define MDSSVC_ELASTIC_QUERY_TEMPLATE   \
      39             :         "{"                           \
      40             :         "    \"from\": %zu,"                \
      41             :         "    \"size\": %zu,"                \
      42             :         "    \"_source\": [%s],"    \
      43             :         "    \"query\": {"          \
      44             :         "        \"query_string\": {"       \
      45             :         "            \"query\": \"%s\"" \
      46             :         "        }"                   \
      47             :         "    }"                               \
      48             :         "}"
      49             : 
      50             : #define MDSSVC_ELASTIC_SOURCES \
      51             :         "\"path.real\""
      52             : 
      53           4 : static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
      54             : {
      55           4 :         struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
      56             :         json_error_t json_error;
      57           4 :         char *default_path = NULL;
      58           4 :         const char *path = NULL;
      59             : 
      60           4 :         mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
      61           4 :         if (mdssvc_es_ctx == NULL) {
      62           0 :                 return false;
      63             :         }
      64           4 :         mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
      65             : 
      66           4 :         mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
      67           4 :         if (mdssvc_es_ctx->creds == NULL) {
      68           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      69           0 :                 return false;
      70             :         }
      71             : 
      72           4 :         default_path = talloc_asprintf(
      73             :                 mdssvc_es_ctx,
      74             :                 "%s/mdssvc/elasticsearch_mappings.json",
      75             :                 get_dyn_SAMBA_DATADIR());
      76           4 :         if (default_path == NULL) {
      77           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      78           0 :                 return false;
      79             :         }
      80             : 
      81           4 :         path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
      82             :                                     "elasticsearch",
      83             :                                     "mappings",
      84             :                                     default_path);
      85           4 :         if (path == NULL) {
      86           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      87           0 :                 return false;
      88             :         }
      89             : 
      90           4 :         mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
      91           4 :         if (mdssvc_es_ctx->mappings == NULL) {
      92           0 :                 DBG_ERR("Opening mapping file [%s] failed: %s\n",
      93             :                         path, json_error.text);
      94           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      95           0 :                 return false;
      96             :         }
      97           4 :         TALLOC_FREE(default_path);
      98             : 
      99           4 :         mdssvc_ctx->backend_private = mdssvc_es_ctx;
     100           4 :         return true;
     101             : }
     102             : 
     103           4 : static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
     104             : {
     105           4 :         return true;
     106             : }
     107             : 
     108             : static struct tevent_req *mds_es_connect_send(
     109             :                                 TALLOC_CTX *mem_ctx,
     110             :                                 struct tevent_context *ev,
     111             :                                 struct mds_es_ctx *mds_es_ctx);
     112             : static int mds_es_connect_recv(struct tevent_req *req);
     113             : static void mds_es_connected(struct tevent_req *subreq);
     114             : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
     115             : static void mds_es_search_set_pending(struct sl_es_search *s);
     116             : static void mds_es_search_unset_pending(struct sl_es_search *s);
     117             : 
     118          12 : static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
     119             : {
     120          12 :         struct sl_es_search *s = mds_es_ctx->searches;
     121             : 
     122             :         /*
     123             :          * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
     124             :          * to go away and has already freed all waiting searches. If there's a
     125             :          * search remaining that's when the search is already active. Reset the
     126             :          * mds_es_ctx pointer, so we can detect this when the search completes.
     127             :          */
     128             : 
     129          12 :         if (s == NULL) {
     130          12 :                 return 0;
     131             :         }
     132             : 
     133           0 :         s->mds_es_ctx = NULL;
     134             : 
     135           0 :         return 0;
     136             : }
     137             : 
     138          12 : static bool mds_es_connect(struct mds_ctx *mds_ctx)
     139             : {
     140          12 :         struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
     141             :                 mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
     142          12 :         struct mds_es_ctx *mds_es_ctx = NULL;
     143          12 :         struct tevent_req *subreq = NULL;
     144             : 
     145          12 :         mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
     146          12 :         if (mds_es_ctx == NULL) {
     147           0 :                 return false;
     148             :         }
     149          12 :         *mds_es_ctx = (struct mds_es_ctx) {
     150             :                 .mdssvc_es_ctx = mdssvc_es_ctx,
     151             :                 .mds_ctx = mds_ctx,
     152             :         };
     153             : 
     154          12 :         mds_ctx->backend_private = mds_es_ctx;
     155          12 :         talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
     156             : 
     157          12 :         subreq = mds_es_connect_send(
     158             :                         mds_es_ctx,
     159          12 :                         mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     160             :                         mds_es_ctx);
     161          12 :         if (subreq == NULL) {
     162           0 :                 TALLOC_FREE(mds_es_ctx);
     163           0 :                 return false;
     164             :         }
     165          12 :         tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
     166          12 :         return true;
     167             : }
     168             : 
     169           2 : static void mds_es_connected(struct tevent_req *subreq)
     170             : {
     171           2 :         struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
     172             :                 subreq, struct mds_es_ctx);
     173             :         int ret;
     174             :         bool ok;
     175             : 
     176           2 :         ret = mds_es_connect_recv(subreq);
     177           2 :         TALLOC_FREE(subreq);
     178           2 :         if (ret != 0) {
     179           0 :                 DBG_ERR("HTTP connect failed\n");
     180           0 :                 return;
     181             :         }
     182             : 
     183           2 :         ok = mds_es_next_search_trigger(mds_es_ctx);
     184           2 :         if (!ok) {
     185           0 :                 DBG_ERR("mds_es_next_search_trigger failed\n");
     186             :         }
     187           2 :         return;
     188             : }
     189             : 
     190             : struct mds_es_connect_state {
     191             :         struct tevent_context *ev;
     192             :         struct mds_es_ctx *mds_es_ctx;
     193             :         struct tevent_queue_entry *qe;
     194             :         const char *server_addr;
     195             :         uint16_t server_port;
     196             :         struct tstream_tls_params *tls_params;
     197             : };
     198             : 
     199             : static void mds_es_http_connect_done(struct tevent_req *subreq);
     200             : static void mds_es_http_waited(struct tevent_req *subreq);
     201             : 
     202          12 : static struct tevent_req *mds_es_connect_send(
     203             :                                 TALLOC_CTX *mem_ctx,
     204             :                                 struct tevent_context *ev,
     205             :                                 struct mds_es_ctx *mds_es_ctx)
     206             : {
     207          12 :         struct tevent_req *req = NULL;
     208          12 :         struct tevent_req *subreq = NULL;
     209          12 :         struct mds_es_connect_state *state = NULL;
     210          12 :         const char *server_addr = NULL;
     211             :         bool use_tls;
     212             :         NTSTATUS status;
     213             : 
     214          12 :         req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
     215          12 :         if (req == NULL) {
     216           0 :                 return NULL;
     217             :         }
     218          12 :         *state = (struct mds_es_connect_state) {
     219             :                 .ev = ev,
     220             :                 .mds_es_ctx = mds_es_ctx,
     221             :         };
     222             : 
     223          12 :         server_addr = lp_parm_const_string(
     224          12 :                 mds_es_ctx->mds_ctx->snum,
     225             :                 "elasticsearch",
     226             :                 "address",
     227             :                 "localhost");
     228          12 :         state->server_addr = talloc_strdup(state, server_addr);
     229          12 :         if (tevent_req_nomem(state->server_addr, req)) {
     230           0 :                 return tevent_req_post(req, ev);
     231             :         }
     232             : 
     233          24 :         state->server_port = lp_parm_int(
     234          12 :                 mds_es_ctx->mds_ctx->snum,
     235             :                 "elasticsearch",
     236             :                 "port",
     237             :                 9200);
     238             : 
     239          12 :         use_tls = lp_parm_bool(
     240          12 :                 mds_es_ctx->mds_ctx->snum,
     241             :                 "elasticsearch",
     242             :                 "use tls",
     243             :                 false);
     244             : 
     245          12 :         DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
     246             :                   use_tls ? "S" : "", state->server_addr, state->server_port);
     247             : 
     248          12 :         if (use_tls) {
     249           0 :                 const char *ca_file = lp__tls_cafile();
     250           0 :                 const char *crl_file = lp__tls_crlfile();
     251           0 :                 const char *tls_priority = lp_tls_priority();
     252           0 :                 enum tls_verify_peer_state verify_peer = lp_tls_verify_peer();
     253             : 
     254           0 :                 status = tstream_tls_params_client(state,
     255             :                                                    ca_file,
     256             :                                                    crl_file,
     257             :                                                    tls_priority,
     258             :                                                    verify_peer,
     259           0 :                                                    state->server_addr,
     260           0 :                                                    &state->tls_params);
     261           0 :                 if (!NT_STATUS_IS_OK(status)) {
     262           0 :                         DBG_ERR("Failed tstream_tls_params_client - %s\n",
     263             :                                 nt_errstr(status));
     264           0 :                         tevent_req_nterror(req, status);
     265           0 :                         return tevent_req_post(req, ev);
     266             :                 }
     267             :         }
     268             : 
     269          12 :         subreq = http_connect_send(state,
     270          12 :                                    state->ev,
     271          12 :                                    state->server_addr,
     272          12 :                                    state->server_port,
     273          12 :                                    mds_es_ctx->mdssvc_es_ctx->creds,
     274          12 :                                    state->tls_params);
     275          12 :         if (tevent_req_nomem(subreq, req)) {
     276           0 :                 return tevent_req_post(req, ev);
     277             :         }
     278          12 :         tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
     279          12 :         return req;
     280             : }
     281             : 
     282          12 : static void mds_es_http_connect_done(struct tevent_req *subreq)
     283             : {
     284          12 :         struct tevent_req *req = tevent_req_callback_data(
     285             :                 subreq, struct tevent_req);
     286          12 :         struct mds_es_connect_state *state = tevent_req_data(
     287             :                 req, struct mds_es_connect_state);
     288             :         int error;
     289             : 
     290          12 :         error = http_connect_recv(subreq,
     291          12 :                                   state->mds_es_ctx,
     292          12 :                                   &state->mds_es_ctx->http_conn);
     293          12 :         TALLOC_FREE(subreq);
     294          12 :         if (error != 0) {
     295          10 :                 DBG_ERR("HTTP connect failed, retrying...\n");
     296             : 
     297          10 :                 subreq = tevent_wakeup_send(
     298          10 :                         state->mds_es_ctx,
     299          10 :                         state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     300             :                         tevent_timeval_current_ofs(10, 0));
     301          10 :                 if (tevent_req_nomem(subreq, req)) {
     302           0 :                         return;
     303             :                 }
     304          10 :                 tevent_req_set_callback(subreq,
     305             :                                         mds_es_http_waited,
     306             :                                         req);
     307          10 :                 return;
     308             :         }
     309             : 
     310           2 :         DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
     311             :                   state->tls_params ? "S" : "",
     312             :                   state->server_addr, state->server_port);
     313             : 
     314           2 :         tevent_req_done(req);
     315           2 :         return;
     316             : }
     317             : 
     318           0 : static void mds_es_http_waited(struct tevent_req *subreq)
     319             : {
     320           0 :         struct tevent_req *req = tevent_req_callback_data(
     321             :                 subreq, struct tevent_req);
     322           0 :         struct mds_es_connect_state *state = tevent_req_data(
     323             :                 req, struct mds_es_connect_state);
     324             :         bool ok;
     325             : 
     326           0 :         ok = tevent_wakeup_recv(subreq);
     327           0 :         TALLOC_FREE(subreq);
     328           0 :         if (!ok) {
     329           0 :                 tevent_req_error(req, ETIMEDOUT);
     330           0 :                 return;
     331             :         }
     332             : 
     333           0 :         subreq = mds_es_connect_send(
     334           0 :                         state->mds_es_ctx,
     335           0 :                         state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     336             :                         state->mds_es_ctx);
     337           0 :         if (tevent_req_nomem(subreq, req)) {
     338           0 :                 return;
     339             :         }
     340           0 :         tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
     341             : }
     342             : 
     343           2 : static int mds_es_connect_recv(struct tevent_req *req)
     344             : {
     345           2 :         return tevent_req_simple_recv_unix(req);
     346             : }
     347             : 
     348           0 : static void mds_es_reconnect_on_error(struct sl_es_search *s)
     349             : {
     350           0 :         struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
     351           0 :         struct tevent_req *subreq = NULL;
     352             : 
     353           0 :         if (s->slq != NULL) {
     354           0 :                 s->slq->state = SLQ_STATE_ERROR;
     355             :         }
     356             : 
     357           0 :         DBG_WARNING("Reconnecting HTTP...\n");
     358           0 :         TALLOC_FREE(mds_es_ctx->http_conn);
     359             : 
     360           0 :         subreq = mds_es_connect_send(
     361             :                         mds_es_ctx,
     362           0 :                         mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     363             :                         mds_es_ctx);
     364           0 :         if (subreq == NULL) {
     365           0 :                 DBG_ERR("mds_es_connect_send failed\n");
     366           0 :                 return;
     367             :         }
     368           0 :         tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
     369             : }
     370             : 
     371           2 : static int search_destructor(struct sl_es_search *s)
     372             : {
     373           2 :         if (s->mds_es_ctx == NULL) {
     374           0 :                 return 0;
     375             :         }
     376           2 :         DLIST_REMOVE(s->mds_es_ctx->searches, s);
     377           2 :         return 0;
     378             : }
     379             : 
     380             : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
     381             :                                              struct tevent_context *ev,
     382             :                                              struct sl_es_search *s);
     383             : static int mds_es_search_recv(struct tevent_req *req);
     384             : static void mds_es_search_done(struct tevent_req *subreq);
     385             : 
     386           2 : static bool mds_es_search(struct sl_query *slq)
     387             : {
     388           2 :         struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
     389             :                 slq->mds_ctx->backend_private, struct mds_es_ctx);
     390           2 :         struct sl_es_search *s = NULL;
     391             :         bool ok;
     392             : 
     393           2 :         s = talloc_zero(slq, struct sl_es_search);
     394           2 :         if (s == NULL) {
     395           0 :                 return false;
     396             :         }
     397           2 :         *s = (struct sl_es_search) {
     398           2 :                 .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     399             :                 .mds_es_ctx = mds_es_ctx,
     400             :                 .slq = slq,
     401             :                 .size = SL_PAGESIZE,
     402             :         };
     403             : 
     404             :         /* 0 would mean no limit */
     405           2 :         s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
     406             :                                    "elasticsearch",
     407             :                                    "max results",
     408             :                                    MAX_SL_RESULTS);
     409             : 
     410           2 :         DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
     411             : 
     412           2 :         ok = map_spotlight_to_es_query(
     413             :                 s,
     414           2 :                 mds_es_ctx->mdssvc_es_ctx->mappings,
     415             :                 slq->path_scope,
     416           2 :                 slq->query_string,
     417             :                 &s->es_query);
     418           2 :         if (!ok) {
     419           0 :                 TALLOC_FREE(s);
     420           0 :                 return false;
     421             :         }
     422           2 :         DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
     423             : 
     424           2 :         slq->backend_private = s;
     425           2 :         slq->state = SLQ_STATE_RUNNING;
     426           2 :         DLIST_ADD_END(mds_es_ctx->searches, s);
     427           2 :         talloc_set_destructor(s, search_destructor);
     428             : 
     429           2 :         return mds_es_next_search_trigger(mds_es_ctx);
     430             : }
     431             : 
     432           6 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
     433             : {
     434           6 :         struct tevent_req *subreq = NULL;
     435           6 :         struct sl_es_search *s = mds_es_ctx->searches;
     436             : 
     437           6 :         if (mds_es_ctx->http_conn == NULL) {
     438           0 :                 DBG_DEBUG("Waiting for HTTP connection...\n");
     439           0 :                 return true;
     440             :         }
     441           6 :         if (s == NULL) {
     442           4 :                 DBG_DEBUG("No pending searches, idling...\n");
     443           4 :                 return true;
     444             :         }
     445           2 :         if (s->pending) {
     446           0 :                 DBG_DEBUG("Search pending [%p]\n", s);
     447           0 :                 return true;
     448             :         }
     449             : 
     450           2 :         subreq = mds_es_search_send(s, s->ev, s);
     451           2 :         if (subreq == NULL) {
     452           0 :                 return false;
     453             :         }
     454           2 :         tevent_req_set_callback(subreq, mds_es_search_done, s);
     455           2 :         mds_es_search_set_pending(s);
     456           2 :         return true;
     457             : }
     458             : 
     459           2 : static void mds_es_search_done(struct tevent_req *subreq)
     460             : {
     461           2 :         struct sl_es_search *s = tevent_req_callback_data(
     462             :                 subreq, struct sl_es_search);
     463           2 :         struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
     464           2 :         struct sl_query *slq = s->slq;
     465             :         int ret;
     466             :         bool ok;
     467             : 
     468           2 :         DBG_DEBUG("Search done for search [%p]\n", s);
     469             : 
     470           2 :         mds_es_search_unset_pending(s);
     471             : 
     472           2 :         if (mds_es_ctx == NULL) {
     473             :                 /*
     474             :                  * Search connection closed by the user while s was pending.
     475             :                  */
     476           0 :                 TALLOC_FREE(s);
     477           0 :                 return;
     478             :         }
     479             : 
     480           2 :         DLIST_REMOVE(mds_es_ctx->searches, s);
     481             : 
     482           2 :         ret = mds_es_search_recv(subreq);
     483           2 :         TALLOC_FREE(subreq);
     484           2 :         if (ret != 0) {
     485           0 :                 mds_es_reconnect_on_error(s);
     486           0 :                 return;
     487             :         }
     488             : 
     489           2 :         if (slq == NULL) {
     490             :                 /*
     491             :                  * Closed by the user. Explicitly free "s" here because the
     492             :                  * talloc parent slq is already gone.
     493             :                  */
     494           0 :                 TALLOC_FREE(s);
     495           0 :                 goto trigger;
     496             :         }
     497             : 
     498           2 :         SLQ_DEBUG(10, slq, "search done");
     499             : 
     500           2 :         if (s->total == 0 || s->from >= s->max) {
     501           2 :                 slq->state = SLQ_STATE_DONE;
     502           2 :                 goto trigger;
     503             :         }
     504             : 
     505           0 :         if (slq->query_results->num_results >= SL_PAGESIZE) {
     506           0 :                 slq->state = SLQ_STATE_FULL;
     507           0 :                 goto trigger;
     508             :         }
     509             : 
     510             :         /*
     511             :          * Reschedule this query as there are more results waiting in the
     512             :          * Elasticsearch server and the client result queue has room as
     513             :          * well. But put it at the end of the list of active queries as a simple
     514             :          * heuristic that should ensure all client queries are dispatched to the
     515             :          * server.
     516             :          */
     517           0 :         DLIST_ADD_END(mds_es_ctx->searches, s);
     518             : 
     519           2 : trigger:
     520           2 :         ok = mds_es_next_search_trigger(mds_es_ctx);
     521           2 :         if (!ok) {
     522           0 :                 DBG_ERR("mds_es_next_search_trigger failed\n");
     523             :         }
     524             : }
     525             : 
     526             : static void mds_es_search_http_send_done(struct tevent_req *subreq);
     527             : static void mds_es_search_http_read_done(struct tevent_req *subreq);
     528             : 
     529             : struct mds_es_search_state {
     530             :         struct tevent_context *ev;
     531             :         struct sl_es_search *s;
     532             :         struct tevent_queue_entry *qe;
     533             :         struct http_request http_request;
     534             :         struct http_request *http_response;
     535             : };
     536             : 
     537           0 : static int mds_es_search_pending_destructor(struct sl_es_search *s)
     538             : {
     539             :         /*
     540             :          * s is a child of slq which may get freed when a user closes a
     541             :          * query. To maintain the HTTP request/response sequence on the HTTP
     542             :          * channel, we keep processing pending requests and free s when we
     543             :          * receive the HTTP response for pending requests.
     544             :          */
     545           0 :         DBG_DEBUG("Preserving pending search [%p]\n", s);
     546           0 :         s->slq = NULL;
     547           0 :         return -1;
     548             : }
     549             : 
     550           2 : static void mds_es_search_set_pending(struct sl_es_search *s)
     551             : {
     552           2 :         DBG_DEBUG("Set pending [%p]\n", s);
     553           2 :         SLQ_DEBUG(10, s->slq, "pending");
     554             : 
     555           2 :         s->pending = true;
     556           2 :         talloc_set_destructor(s, mds_es_search_pending_destructor);
     557           2 : }
     558             : 
     559           2 : static void mds_es_search_unset_pending(struct sl_es_search *s)
     560             : {
     561           2 :         DBG_DEBUG("Unset pending [%p]\n", s);
     562           2 :         if (s->slq != NULL) {
     563           2 :                 SLQ_DEBUG(10, s->slq, "unset pending");
     564             :         }
     565             : 
     566           2 :         s->pending = false;
     567           2 :         talloc_set_destructor(s, search_destructor);
     568           2 : }
     569             : 
     570           2 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
     571             :                                               struct tevent_context *ev,
     572             :                                               struct sl_es_search *s)
     573             : {
     574           2 :         struct tevent_req *req = NULL;
     575           2 :         struct tevent_req *subreq = NULL;
     576           2 :         struct mds_es_search_state *state = NULL;
     577           2 :         const char *index = NULL;
     578           2 :         char *elastic_query = NULL;
     579           2 :         char *uri = NULL;
     580             :         size_t elastic_query_len;
     581           2 :         char *elastic_query_len_str = NULL;
     582           2 :         char *hostname = NULL;
     583           2 :         bool pretty = false;
     584             : 
     585           2 :         req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
     586           2 :         if (req == NULL) {
     587           0 :                 return NULL;
     588             :         }
     589           2 :         *state = (struct mds_es_search_state) {
     590             :                 .ev = ev,
     591             :                 .s = s,
     592             :         };
     593             : 
     594           2 :         if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
     595           0 :                 return tevent_req_post(req, s->ev);
     596             :         }
     597             : 
     598           2 :         index = lp_parm_const_string(s->slq->mds_ctx->snum,
     599             :                                      "elasticsearch",
     600             :                                      "index",
     601             :                                      "_all");
     602           2 :         if (tevent_req_nomem(index, req)) {
     603           0 :                 return tevent_req_post(req, ev);
     604             :         }
     605             : 
     606           2 :         if (DEBUGLVL(10)) {
     607           0 :                 pretty = true;
     608             :         }
     609             : 
     610           2 :         uri = talloc_asprintf(state,
     611             :                               "/%s/_search%s",
     612             :                               index,
     613             :                               pretty ? "?pretty" : "");
     614           2 :         if (tevent_req_nomem(uri, req)) {
     615           0 :                 return tevent_req_post(req, ev);
     616             :         }
     617             : 
     618           2 :         elastic_query = talloc_asprintf(state,
     619             :                                         MDSSVC_ELASTIC_QUERY_TEMPLATE,
     620             :                                         s->from,
     621             :                                         s->size,
     622             :                                         MDSSVC_ELASTIC_SOURCES,
     623             :                                         s->es_query);
     624           2 :         if (tevent_req_nomem(elastic_query, req)) {
     625           0 :                 return tevent_req_post(req, ev);
     626             :         }
     627           2 :         DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
     628             : 
     629           2 :         elastic_query_len = strlen(elastic_query);
     630             : 
     631           4 :         state->http_request = (struct http_request) {
     632             :                 .type = HTTP_REQ_POST,
     633             :                 .uri = uri,
     634           2 :                 .body = data_blob_const(elastic_query, elastic_query_len),
     635             :                 .major = '1',
     636             :                 .minor = '1',
     637             :         };
     638             : 
     639           2 :         elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
     640           2 :         if (tevent_req_nomem(elastic_query_len_str, req)) {
     641           0 :                 return tevent_req_post(req, ev);
     642             :         }
     643             : 
     644           2 :         hostname = get_myname(state);
     645           2 :         if (tevent_req_nomem(hostname, req)) {
     646           0 :                 return tevent_req_post(req, ev);
     647             :         }
     648             : 
     649           2 :         http_add_header(state, &state->http_request.headers,
     650             :                         "Content-Type",       "application/json");
     651           2 :         http_add_header(state, &state->http_request.headers,
     652             :                         "Accept", "application/json");
     653           2 :         http_add_header(state, &state->http_request.headers,
     654             :                         "User-Agent", "Samba/mdssvc");
     655           2 :         http_add_header(state, &state->http_request.headers,
     656             :                         "Host", hostname);
     657           2 :         http_add_header(state, &state->http_request.headers,
     658             :                         "Content-Length", elastic_query_len_str);
     659             : 
     660           2 :         subreq = http_send_request_send(state,
     661             :                                         ev,
     662           2 :                                         s->mds_es_ctx->http_conn,
     663           2 :                                         &state->http_request);
     664           2 :         if (tevent_req_nomem(subreq, req)) {
     665           0 :                 return tevent_req_post(req, ev);
     666             :         }
     667           2 :         tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
     668           2 :         return req;
     669             : }
     670             : 
     671           2 : static void mds_es_search_http_send_done(struct tevent_req *subreq)
     672             : {
     673           2 :         struct tevent_req *req = tevent_req_callback_data(
     674             :                 subreq, struct tevent_req);
     675           2 :         struct mds_es_search_state *state = tevent_req_data(
     676             :                 req, struct mds_es_search_state);
     677             :         NTSTATUS status;
     678             : 
     679           2 :         DBG_DEBUG("Sent out search [%p]\n", state->s);
     680             : 
     681           2 :         status = http_send_request_recv(subreq);
     682           2 :         TALLOC_FREE(subreq);
     683           2 :         if (!NT_STATUS_IS_OK(status)) {
     684           0 :                 tevent_req_error(req, map_errno_from_nt_status(status));
     685           0 :                 return;
     686             :         }
     687             : 
     688           2 :         if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
     689           0 :                 tevent_req_done(req);
     690           0 :                 return;
     691             :         }
     692             : 
     693           2 :         subreq = http_read_response_send(state,
     694             :                                          state->ev,
     695           2 :                                          state->s->mds_es_ctx->http_conn,
     696             :                                          SL_PAGESIZE * 8192);
     697           2 :         if (tevent_req_nomem(subreq, req)) {
     698           0 :                 return;
     699             :         }
     700           2 :         tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
     701             : }
     702             : 
     703           2 : static void mds_es_search_http_read_done(struct tevent_req *subreq)
     704             : {
     705           2 :         struct tevent_req *req = tevent_req_callback_data(
     706             :                 subreq, struct tevent_req);
     707           2 :         struct mds_es_search_state *state = tevent_req_data(
     708             :                 req, struct mds_es_search_state);
     709           2 :         struct sl_es_search *s = state->s;
     710           2 :         struct sl_query *slq = s->slq;
     711           2 :         json_t *root = NULL;
     712           2 :         json_t *matches = NULL;
     713           2 :         json_t *match = NULL;
     714             :         size_t i;
     715             :         json_error_t error;
     716             :         size_t hits;
     717             :         NTSTATUS status;
     718             :         int ret;
     719             :         bool ok;
     720             : 
     721           2 :         DBG_DEBUG("Got response for search [%p]\n", s);
     722             : 
     723           2 :         status = http_read_response_recv(subreq, state, &state->http_response);
     724           2 :         TALLOC_FREE(subreq);
     725           2 :         if (!NT_STATUS_IS_OK(status)) {
     726           0 :                 DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
     727           0 :                 tevent_req_error(req, map_errno_from_nt_status(status));
     728           0 :                 return;
     729             :         }
     730             : 
     731           2 :         if (slq == NULL || s->mds_es_ctx == NULL) {
     732           0 :                 tevent_req_done(req);
     733           0 :                 return;
     734             :         }
     735             : 
     736           2 :         switch (state->http_response->response_code) {
     737           2 :         case 200:
     738           2 :                 break;
     739           0 :         default:
     740           0 :                 DBG_ERR("HTTP server response: %u\n",
     741             :                         state->http_response->response_code);
     742           0 :                 goto fail;
     743             :         }
     744             : 
     745           2 :         DBG_DEBUG("JSON response:\n%s\n",
     746             :                   talloc_strndup(talloc_tos(),
     747             :                                  (char *)state->http_response->body.data,
     748             :                                  state->http_response->body.length));
     749             : 
     750           2 :         root = json_loadb((char *)state->http_response->body.data,
     751           2 :                           state->http_response->body.length,
     752             :                           0,
     753             :                           &error);
     754           2 :         if (root == NULL) {
     755           0 :                 DBG_ERR("json_loadb failed\n");
     756           0 :                 goto fail;
     757             :         }
     758             : 
     759           2 :         if (s->total == 0) {
     760             :                 /*
     761             :                  * Get the total number of results the first time, format
     762             :                  * used by Elasticsearch 7.0 or newer
     763             :                  */
     764           2 :                 ret = json_unpack(root, "{s: {s: {s: i}}}",
     765             :                                   "hits", "total", "value", &s->total);
     766           2 :                 if (ret != 0) {
     767             :                         /* Format used before 7.0 */
     768           0 :                         ret = json_unpack(root, "{s: {s: i}}",
     769             :                                           "hits", "total", &s->total);
     770           0 :                         if (ret != 0) {
     771           0 :                                 DBG_ERR("json_unpack failed\n");
     772           0 :                                 goto fail;
     773             :                         }
     774             :                 }
     775             : 
     776           2 :                 DBG_DEBUG("Total: %zu\n", s->total);
     777             : 
     778           2 :                 if (s->total == 0) {
     779           0 :                         json_decref(root);
     780           0 :                         tevent_req_done(req);
     781           0 :                         return;
     782             :                 }
     783             :         }
     784             : 
     785           2 :         if (s->max == 0 || s->max > s->total) {
     786           2 :                 s->max = s->total;
     787             :         }
     788             : 
     789           2 :         ret = json_unpack(root, "{s: {s:o}}",
     790             :                           "hits", "hits", &matches);
     791           2 :         if (ret != 0 || matches == NULL) {
     792           0 :                 DBG_ERR("json_unpack hits failed\n");
     793           0 :                 goto fail;
     794             :         }
     795             : 
     796           2 :         hits = json_array_size(matches);
     797           2 :         if (hits == 0) {
     798           0 :                 DBG_ERR("Hu?! No results?\n");
     799           0 :                 goto fail;
     800             :         }
     801           2 :         DBG_DEBUG("Hits: %zu\n", hits);
     802             : 
     803           6 :         for (i = 0; i < hits && s->from + i < s->max; i++) {
     804           4 :                 const char *path = NULL;
     805             : 
     806           4 :                 match = json_array_get(matches, i);
     807           4 :                 if (match == NULL) {
     808           0 :                         DBG_ERR("Hu?! No value for index %zu\n", i);
     809           0 :                         goto fail;
     810             :                 }
     811           4 :                 ret = json_unpack(match,
     812             :                                   "{s: {s: {s: s}}}",
     813             :                                   "_source",
     814             :                                   "path",
     815             :                                   "real",
     816             :                                   &path);
     817           4 :                 if (ret != 0) {
     818           0 :                         DBG_ERR("Missing path.real in JSON result\n");
     819           0 :                         goto fail;
     820             :                 }
     821             : 
     822           4 :                 ok = mds_add_result(slq, path);
     823           4 :                 if (!ok) {
     824           0 :                         DBG_ERR("error adding result for path: %s\n", path);
     825           0 :                         goto fail;
     826             :                 }
     827             :         }
     828           2 :         json_decref(root);
     829             : 
     830           2 :         s->from += hits;
     831           2 :         slq->state = SLQ_STATE_RESULTS;
     832           2 :         tevent_req_done(req);
     833           2 :         return;
     834             : 
     835           0 : fail:
     836           0 :         if (root != NULL) {
     837           0 :                 json_decref(root);
     838             :         }
     839           0 :         slq->state = SLQ_STATE_ERROR;
     840           0 :         tevent_req_error(req, EINVAL);
     841           0 :         return;
     842             : }
     843             : 
     844           2 : static int mds_es_search_recv(struct tevent_req *req)
     845             : {
     846           2 :         return tevent_req_simple_recv_unix(req);
     847             : }
     848             : 
     849           0 : static bool mds_es_search_cont(struct sl_query *slq)
     850             : {
     851           0 :         struct sl_es_search *s = talloc_get_type_abort(
     852             :                 slq->backend_private, struct sl_es_search);
     853             : 
     854           0 :         SLQ_DEBUG(10, slq, "continue");
     855           0 :         DLIST_ADD_END(s->mds_es_ctx->searches, s);
     856           0 :         return mds_es_next_search_trigger(s->mds_es_ctx);
     857             : }
     858             : 
     859             : struct mdssvc_backend mdsscv_backend_es = {
     860             :         .init = mdssvc_es_init,
     861             :         .shutdown = mdssvc_es_shutdown,
     862             :         .connect = mds_es_connect,
     863             :         .search_start = mds_es_search,
     864             :         .search_cont = mds_es_search_cont,
     865             : };

Generated by: LCOV version 1.14