Проблема с конфигурацией logstash при разборе многострочного журнала

У меня есть многострочный журнал ниже, который я пытаюсь проанализировать с помощью моей конфигурации logstash.

2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
                    [email protected]
                    NAME=abc.def
                    PAGER=+11111111111111
                    DATE=2020-05-27
                    AUTHOR=
                    COMMENT=
                    ADDRESS=1.1.1.1
                    ALIAS=abc.example.com
                    ATTEMPT=1
2020-05-27 11:59:17 Previous service hard state not known. Allowing all states.
2020-05-27 11:59:17 Computed variables:
                    URL=abc.example.com
                    STATE=UP                
2020-05-27 11:59:17 Preparing flexible notifications for abc.def
2020-05-27 11:59:17  channel with plugin sms
2020-05-27 11:59:17  - Skipping: set
2020-05-27 11:59:17  channel with plugin plain email        
2020-05-27 11:59:20 --------------------------------------------------------------------

Это моя конфигурация logstash:

    input {
      stdin { }
    }

    filter {

            grok {
                match => { "message" => "(?m)%{GREEDYDATA:data}"}
            }
            if [data] {
                  mutate {
                     gsub => [
                         "data", "^\s*", ""
                     ]
                  }
                  mutate {
                      gsub => ['data', "\n", " "]
                  }
             }
}

    output {
      stdout { codec => rubydebug }
    }

Конфигурация Filebeat:

  multiline.pattern: '^[[:space:]][A-Za-z]* (?m)'
  multiline.negate: false
  multiline.match: after

Чего я хочу добиться: многострочный журнал сначала будет сопоставлен с многострочным шаблоном и будет разбит на строки вроде

  Message1:  2020-05-27 11:59:17 ----------------------------------------------------------------------

  Message2: 2020-05-27 11:59:17 Got context

  Message3:  2020-05-27 11:59:17 Raw notification context:
                        [email protected]
                        NAME=abc.def
                        PAGER=+11111111111111
                        DATE=2020-05-27
                        AUTHOR=
                        COMMENT=
                        ADDRESS=1.1.1.1
                        ALIAS=abc.example.com
                        ATTEMPT=1

После этого, когда эти строки журнала будут проанализированы, они снова будут разделены с помощью разделителя, а затем я могу использовать фильтр kv для чтения каждой пары значений ключа, например ALIAS = abc.example.com, в одном сообщении номер 3.

Подскажите, как этого добиться?


person marco    schedule 27.05.2020    source источник


Ответы (1)


Я бы посоветовал вам читать из файла с помощью многострочного кодека (вы также можете определить его в разделе фильтра, если используете stdin), предоставляя шаблон для каждой новой строки с префиксом временной метки.

Затем в своем фильтре Grok используйте фильтр KV, чтобы разделить поля и значения следующим образом:

input {
    file {
        path => "C:/work/elastic/logstash-6.5.0/config/test.txt"
        start_position => "beginning"
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => "previous"
        }
    }
}
filter {
    kv {
        field_split => "\r\n"
        value_split => "="
        source => "message"
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "test"
    }
}

Результат в Кибане должен выглядеть так:

введите описание изображения здесь

и это:

введите описание изображения здесь

РЕДАКТИРОВАТЬ: в комментариях, которые вы указали, вы не смогли увидеть полное значение, включая пробелы. Я повторно протестировал свое решение с новым СОСТОЯНИЕМ, предоставленным вами, и оно работает нормально:

введите описание изображения здесь

person eladyanai    schedule 28.05.2020
comment
Большое спасибо за ваш ответ. К сожалению, наши администраторы ELK в компании рекомендуют выполнять многострочный синтаксический анализ в самой конфигурации filbeat. Поскольку я новичок в ELK, я понятия не имею, каков рекомендуемый подход. Я пробовала с вашим рисунком, и он дает желаемый результат. В строке журнала, если ключ STATE=wan abc site:def cake-agent jlp bl3 mississipi algebra dev app abc1 nano /nano/ack/divgen/mississipi/bots/linux/\n, но то, что я получаю в Kibana, просто STATE = wan. Как получить все разнесенные значения в значении ключа? - person marco; 28.05.2020
comment
Привет @marco. Я только что пробовал использовать STATE=wan abc site:def cake-agent jlp bl3 mississipi algebra dev app abc1 nano /nano/ack/divgen/mississipi/bots/linux/\n и вижу все значение в Kibana, я отредактировал свой ответ, чтобы включить его. Я ничего не менял в своей конфигурации для поддержки этого. Пожалуйста, проверьте вашу конфигурацию согласно моему ответу. Что касается filebeat, его цель - быть легким агентом для перемещения журналов для дальнейшего анализа и логики в logstash. иметь логику в filebeat - плохая практика, и ее становится труднее поддерживать со временем по мере роста сложности. - person eladyanai; 31.05.2020
comment
@marco, я вижу, что последний символ в вашем комментарии - \n, если вы используете прерыватель строк в Windows, он должен быть \r\n только в Linux \n, пожалуйста, отредактируйте field_split конфигурацию соответствующим образом. - person eladyanai; 31.05.2020
comment
@marco, примите мой ответ, если вы сочтете его полезным. Спасибо. - person eladyanai; 31.05.2020