После добавления фильтра Prune вместе с фильтром KV логи не попадают в эластичный поиск

Я изучаю ELK и пытаюсь выступить в качестве POC для своего проекта. Я применяю фильтр KV для примеров журналов интеграции из моего проекта, и я мог видеть, что в результате появляется много дополнительных полей, поэтому я попытался применить фильтр обрезки и внести определенные поля в белый список. Я вижу, как журналы печатаются на сервере logstash, но журналы не будут выполнять эластичный поиск. Если я сниму фильтр, он перейдет к эластичному поиску. Посоветуйте, пожалуйста, как дальше отлаживать этот вопрос.

filter {
    kv {
            field_split => "{},?\[\]"
            transform_key => "capitalize"
            transform_value => "capitalize"
            trim_key => "\s"
            trim_value => "\s"
            include_brackets => false   
        }
    prune
    {
        whitelist_names => [ "App_version", "Correlation_id", "Env", "Flow_name", "host", "Instance_id", "log_level","log_thread", "log_timestamp", "message", "patient_id", "status_code", "type", "detail"]
    }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "mule-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout { codec => rubydebug }
}

Мне также нужны еще два предложения,

Я также пытаюсь использовать фильтр grok в начальных журналах и пытаюсь взять поля уровня журнала (время и тип журнала) из образца журнала и отправить оставшиеся журналы в фильтр KV. Есть ли какая-нибудь ссылка, пожалуйста, поделитесь ею. Это то, что я пробовал для этого. но получается как _grokparsefailure. Я передал msgbody фильтру kv с опцией источника.

grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:loglevel}\s+%{GREEDYDATA:msgbody}"}
        overwrite => [ "msgbody" ]
    }

У меня есть поля сообщений в образцах журналов, как показано ниже. Когда данные поступают в Kibana, я вижу два тега поля сообщения: один с полным журналом, а другой с правильным сообщением (выделено). Будет ли работать мутация в этом случае? Есть ли способ изменить полное имя журнала на другое ??

[2020-02-10 11: 20: 07.172] ИНФОРМАЦИЯ Mule.api [[MuleRuntime] .cpuLight.04: [main-api-test] .api-main.CPU_LITE @ 256c5cf5: [main-api-test] .main -api-main / processors / 0 / processors / 0.CPU_LITE @ 378f34b0]: событие: 00000003 {app_name = main-api-main, app_version = v1, env = Test, timestamp = 2020-02-10T11: 20: 07.172Z , log = {correlation_id = 00000003, Patient_id = 12345678, instance_id = hospital, message = Start of System API, flow_name = main-api-main}}


person Vignesh    schedule 12.04.2020    source источник


Ответы (1)


ошибка фильтра удаления

В вашем фильтре prune нет поля @timestamp в списке whitelist_names, ваш вывод основан на дате (%{+YYYY.MM.dd}), logstash нуждается в поле @timestamp в выводе для извлечения даты.

Я запустил ваш конвейер с вашим образцом сообщения, и он работал, как ожидалось, с фильтром prune сообщение отправляется в elasticsearch, но оно сохраняется в индексе с именем mule- без какого-либо поля datetime.

Без фильтра prune ваше сообщение использует время, когда logstash получил событие как @timestamp, поскольку у вас нет фильтра даты, чтобы его изменить.

Если вы создали шаблон индекса для индекса mule-* с полем datetime, например @timestamp, вы не увидите в Kibana никаких документов в индексе, которые не имеют такого же поля datetime.

ошибка grok

Ваш Grok неверен, вам нужно избегать квадратных скобок, окружающих вашу метку времени. У Kibana есть отладчик Grok, где вы можете опробовать свои шаблоны.

Следующий grok работает, переместите свой kv на бег после grok и с msgbody в качестве источника.

grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\s+%{LOGLEVEL:loglevel}\s+%{GREEDYDATA:msgbody}"}
    overwrite => [ "msgbody" ]
}
kv {
    source => "msgbody"
    field_split => "{},?\[\]"
    transform_key => "capitalize"
    transform_value => "capitalize"
    trim_key => "\s"
    trim_value => "\s"
    include_brackets => false
}

Просто запустите его с выводом только на стандартный вывод, чтобы увидеть фильтры, необходимые для изменения prune фильтра.

повторяющиеся поля сообщения

Если вы поместите фильтр kv после grok, вы бы не дублировали поля message, поскольку kv использует ваши поля с заглавной буквы, вы закончите с полем message, содержащим ваш полный журнал, и полем Message, содержащим ваше внутреннее сообщение, поля logstash - регистр чувствительный.

Однако вы можете переименовать любое поле, используя фильтр mutate.

mutate {
    rename => ["message", "fullLogMessage"]
}
person leandrojmp    schedule 12.04.2020
comment
Спасибо за подробный ответ и за уделенное время. Мне нужно еще одно предложение по конвейеру. Я пытаюсь выбрать файлы из нескольких ведер s3 и обработать их на одном ES. Итак, я собирался создать 4 конвейера для 4 ведер в качестве ввода и вывода как одного ES. Будет ли все работать параллельно? Поскольку, если я даю интервал для проверки в ведре как 10 минут, если для обработки ввода первого запуска из конвейера 1 требуется больше времени, чем 10 минут с. Что будет с конвейером 2, 3 и т. Д.? Итак, есть ли способ запустить все ведра индивидуально, независимо от каждого ведра s3. - person Vignesh; 13.04.2020
comment
Привет, я пытаюсь добавить два поля, присутствующих в фильтре grok, к выходному ответу, но мне не удалось их добавить. Вы можете мне помочь? Я изменил имя в Grok, как показано ниже, и добавил это в тег добавления фильтра ... add_field = ›{loglevel =›% {loglevel} log_timestamp = ›% {log_timestamp}} Я пытался сохранить отметку времени / уровень журнала в время и формат строки - person Vignesh; 13.04.2020
comment
Вы должны размещать новые вопросы, ваши комментарии не имеют отношения к исходному вопросу. - person leandrojmp; 13.04.2020
comment
Спасибо .. Я отправлю новый вопрос по проблеме Grok. Пожалуйста, помогите мне и в новом вопросе - person Vignesh; 13.04.2020
comment
Я создал новый вопрос. stackoverflow.com/ questions / 61190835 / - person Vignesh; 13.04.2020