Копировать данные из одного индекса в другой в эластичном поиске

Как скопировать данные из одного индекса в другой индекс в Elastic Search (который может присутствовать на том же или другом хосте)?

Выход

Reading config file {:file=>"logstash/agent.rb", :level=>:debug, :line=>"309", :method=>"local_config"}
Compiled pipeline code:
        @inputs = []
        @filters = []
        @outputs = []
        @periodic_flushers = []
        @shutdown_flushers = []

          @input_elasticsearch_1 = plugin("input", "elasticsearch", LogStash::Util.hash_merge_many({ "hosts" => ("input hostname") }, { "port" => ("9200") }, { "index" => (".kibana") }, { "size" => 500 }, { "scroll" => ("5m") }, { "docinfo" => ("true") }))

          @inputs << @input_elasticsearch_1

          @output_elasticsearch_2 = plugin("output", "elasticsearch", LogStash::Util.hash_merge_many({ "host" => ("output hostname") }, { "port" => 9200 }, { "protocol" => ("http") }, { "manage_template" => ("false") }, { "index" => ("order-logs-sample") }, { "document_type" => ("logs") }, { "document_id" => ("%{id}") }, { "workers" => 1 }))

          @outputs << @output_elasticsearch_2

  def filter_func(event)
    events = [event]
    @logger.debug? && @logger.debug("filter received", :event => event.to_hash)
    events
  end
  def output_func(event)
    @logger.debug? && @logger.debug("output received", :event => event.to_hash)
    @output_elasticsearch_2.handle(event)
    
  end {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"29", :method=>"initialize"}
Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"elasticsearch", :path=>"logstash/inputs/elasticsearch", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"133", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"json", :path=>"logstash/codecs/json", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"133", :method=>"lookup"}
config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@hosts = ["ccwlog-stg1-01"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@port = 9200 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@index = ".kibana" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@size = 500 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@scroll = "5m" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@docinfo = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@debug = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@codec = <LogStash::Codecs::JSON charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@query = "{\"query\": { \"match_all\": {} } }" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@scan = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@docinfo_target = "@metadata" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@docinfo_fields = ["_index", "_type", "_id"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Inputs::Elasticsearch/@ssl = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"elasticsearch", :path=>"logstash/outputs/elasticsearch", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"133", :method=>"lookup"}
'[DEPRECATED] use `require 'concurrent'` instead of `require 'concurrent_ruby'`
[2016-01-22 03:49:34.451]  WARN -- Concurrent: [DEPRECATED] Java 7 is deprecated, please use Java 8.
Java 7 support is only best effort, it may not work. It will be removed in next release (1.0).
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"plain", :path=>"logstash/codecs/plain", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"133", :method=>"lookup"}
config LogStash::Codecs::Plain/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@host = ["ccwlog-stg1-01"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@port = 9200 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@protocol = "http" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@manage_template = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@index = "order-logs-sample" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@document_type = "logs" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@document_id = "%{id}" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@workers = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@type = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@tags = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@exclude_tags = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@codec = <LogStash::Codecs::Plain charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@template_name = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@template_overwrite = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@embedded = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@embedded_http_port = "9200-9300" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@max_inflight_requests = 50 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@flush_size = 5000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@idle_flush_time = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@action = "index" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@path = "/" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@ssl = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@ssl_certificate_verification = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@sniffing = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@max_retries = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@retry_max_items = 5000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
config LogStash::Outputs::ElasticSearch/@retry_max_interval = 5 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
Normalizing http path {:path=>"/", :normalized=>"/", :level=>:debug, :file=>"logstash/outputs/elasticsearch.rb", :line=>"342", :method=>"register"}
Create client to elasticsearch server on ccwlog-stg1-01: {:level=>:info, :file=>"logstash/outputs/elasticsearch.rb", :line=>"422", :method=>"register"}
Plugin is finished {:plugin=><LogStash::Inputs::Elasticsearch hosts=>["ccwlog-stg1-01"], port=>9200, index=>".kibana", size=>500, scroll=>"5m", docinfo=>true, debug=>false, codec=><LogStash::Codecs::JSON charset=>"UTF-8">, query=>"{\"query\": { \"match_all\": {} } }", scan=>true, docinfo_target=>"@metadata", docinfo_fields=>["_index", "_type", "_id"], ssl=>false>, :level=>:info, :file=>"logstash/plugin.rb", :line=>"61", :method=>"finished"}
New Elasticsearch output {:cluster=>nil, :host=>["ccwlog-stg1-01"], :port=>9200, :embedded=>false, :protocol=>"http", :level=>:info, :file=>"logstash/outputs/elasticsearch.rb", :line=>"439", :method=>"register"}
Pipeline started {:level=>:info, :file=>"logstash/pipeline.rb", :line=>"87", :method=>"run"}
Logstash startup completed
output received {:event=>{"title"=>"logindex", "timeFieldName"=>"@timestamp", "fields"=>"[{\"name\":\"caller\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"_source\",\"type\":\"_source\",\"count\":0,\"scripted\":false,\"indexed\":false,\"analyzed\":false,\"doc_values\":false},{\"name\":\"exception\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"type\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"@version\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"serviceName\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"_type\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":false,\"doc_values\":false},{\"name\":\"_id\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":false,\"analyzed\":false,\"doc_values\":false},{\"name\":\"userId\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"path\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"orderId\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"dc\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"tags\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"host\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"_index\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":false,\"analyzed\":false,\"doc_values\":false},{\"name\":\"elapsedTime\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":false,\"doc_values\":false},{\"name\":\"message\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false},{\"name\":\"@timestamp\",\"type\":\"date\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":false,\"doc_values\":false},{\"name\":\"performanceRequest\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"indexed\":true,\"analyzed\":true,\"doc_values\":false}]", "@version"=>"1", "@timestamp"=>"2016-01-22T11:49:35.268Z"}, :level=>:debug, :file=>"(eval)", :line=>"21", :method=>"output_func"}

person sri    schedule 08.12.2015    source источник


Ответы (3)


Вы должны просмотреть сканировать и прокручивать документ для этот функционал.

Вы извлекаете данные из старого индекса с заданными параметрами query и size, а затем bulk index в новый индекс. Различные языки предоставляют оболочку, упрощающую переиндексацию.

например, я использую python, и он имеет reindex помощник, который использует подход сканирования и прокрутки.

person ChintanShah25    schedule 08.12.2015

Простой способ сделать это — использовать Logstash с тегом < плагин href="https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html" rel="nofollow">elasticsearch input и elasticsearch output.

Преимущество этого решения заключается в том, что вам не нужно переписывать шаблонный код для сканирования/прокрутки и массовой переиндексации, что уже предоставляет Logstash.

После установки Logstash вы можете создать файл конфигурации copy.conf, который выглядит так:

input {
  elasticsearch {
   hosts => ["localhost:9200"]                   <--- source ES host
   index => "source_index"
  }
}
filter {
 mutate {
  remove_field => [ "@version", "@timestamp" ]   <--- remove fields added by Logstash
 }
}
output {
 elasticsearch {
   hosts => ["localhost:9200"]                   <--- target ES host
   manage_template => false
   index => "target_index"
   document_id => "%{id}"                        <--- name of your ID field
   workers => 1
 }
}

И затем, после установки правильных значений (исходный/целевой хост + исходный/целевой индекс), вы можете запустить это с помощью bin/logstash -f copy.conf

person Val    schedule 09.12.2015
comment
Вы смогли попробовать это? - person Val; 10.12.2015
comment
Привет, Вэл, я попробовал сделать вышеописанное, но столкнулся с проблемой ниже: Ошибка: [400] {ошибка:SearchPhaseExecutionException[Не удалось выполнить фазу [init_scan], не удалось выполнить все осколки; shardFailures {[eC3o5Cd7QUiR7NsSjUv10g][index][0]: RemoteTransportException[[ccwlog-stg2-01][inet[/64.102.202.10:9300]][indices:data/read/search[phase/scan]]]; вложенный: SearchParseException[[ccw-order-index][0]: from[-1],size[-1]: Ошибка анализа [Не удалось проанализировать источник [na]]]; вложенный: ElasticsearchParseException [Не удалось получить xcontent из org.elasticsearch.common.bytes.ChannelBufferBytesReference@49]; }{[eC3o5Cd7QUiR7NsSjUv10g - person sri; 22.01.2016
comment
Можете ли вы помочь с этим плагином @Val Произошла неисправимая ошибка. Перезапустит этот плагин. Плагин: ‹LogStash::Inputs::Elasticsearch hosts=›[имя хоста:9200], index=›index, query=›*, size=›500, scroll=›5m, docinfo=›true, debug=›false, codec=›‹LogStash::Codecs::JSON charset=›UTF-8›, port=›9200, scan=›true, docinfo_target=›@metadata, docinfo_fields=›[_index, _type, _id], ssl=›false › - person sri; 22.01.2016
comment
Вы уверены, что у вас есть правильное имя хоста во входных данных ES? то есть "host name:9200" - person Val; 22.01.2016
comment
У меня это так: хост => имя хоста порт => 9200 Я пробовал это также hosts => имя хоста: 9200 - person sri; 22.01.2016
comment
Можете ли вы запустить logstash с помощью --debug и обновить свой вопрос полученным результатом? - person Val; 22.01.2016
comment
Давайте продолжим обсуждение в чате. - person sri; 22.01.2016
comment
еще так много вывода, что я не добавил сюда - person sri; 22.01.2016

Эластичный поиск предоставляет Re-index API. это помогает копировать данные из одного индекса в другой. Но убедитесь, что переиндексация не пытается настроить целевой индекс. Он не копирует настройки исходного индекса. Вы должны настроить целевой индекс до запуска действия _reindex, включая настройку сопоставлений, количества сегментов, реплик и т. д.

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html

ex-- POST _reindex { "источник": { "индекс": "старое имя индекса" }, "назначение": { "индекс": "новое имя индекса" } }

person Gaurav    schedule 08.03.2018