Domanda Reindicando la ricerca elastica tramite API Bulk, scansiona e scorri


Sto provando a reindicizzare la mia configurazione di ricerca elastica, che attualmente sto esaminando la documentazione di ricerca elastica e un esempio usando l'API Python

Sono un po 'confuso su come tutto questo funzioni però. Sono stato in grado di ottenere l'ID scroll dall'API Python:

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")

scroll_id = response["_scroll_id"]

Ora la mia domanda è: a che cosa serve questo? Cosa mi dà anche la conoscenza dell'ID di scorrimento? La documentazione dice di usare la "Bulk API" ma non ho idea di come i fattori scoll_id in questo, è stato un po 'di confusione.

Qualcuno potrebbe dare un breve esempio mostrando come reindicizzare da questo punto, considerando che ho lo scroll_id correttamente?


11
2017-10-14 22:08


origine


risposte:


Ciao, puoi usare la pergamena per scorrere tutti i documenti nel modo più efficiente. Usando scroll_id puoi trovare una sessione che è memorizzata sul server per la tua specifica richiesta di scorrimento. Quindi è necessario fornire scroll_id con ciascuna richiesta per ottenere più articoli.

La maggior parte è per documenti di indicizzazione più efficienti. Quando copi e indicizzi hai bisogno di entrambi, ma non sono realmente correlati.

Ho un codice java che potrebbe aiutarti a capire come funziona.

    public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}

7
2017-10-14 22:17



ecco un esempio di reindicizzazione di un altro nodo elasticsearch utilizzando elasticsearch-py:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)

puoi anche reindicizzare il risultato di una query in un indice diverso ecco come farlo:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)

7
2018-01-14 13:30



Per chiunque incontri questo problema, puoi utilizzare la seguente API dal client Python per reindicizzare:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

Ciò ti aiuterà a evitare di dover scorrere e cercare per ottenere tutti i dati e utilizzare l'API di massa per inserire i dati nel nuovo indice.


5
2018-05-14 23:57