Как индексировать преобразованные записи журнала в AWS Elasticsearch?

TL; DR

Лямбда-функция не может индексировать журналы пожарных шлангов в управляемой AWS ES из-за «проблемы с кодированием».

Фактический ответ при ошибке

Я не получаю никаких ошибок, когда кодирую base64 один logEvent из пожарного шланга record и отправляю собранные записи в управляемую AWS ES.

См. Более подробную информацию в следующем разделе.

Сжатая полезная нагрузка в кодировке base 64 отправляется в ES, поскольку результирующее преобразование json слишком велико для индексации ES - см. эту ссылку на ES.

Я получаю следующую ошибку от ES, управляемого AWS:

{
    "deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
    "destination": "arn:aws:es:us-west-2:*:domain/*",
    "deliveryStreamVersionId": 1,
    "message": "The data could not be decoded as UTF-8",
    "errorCode": "InvalidEncodingException",
    "processor": "arn:aws:lambda:us-west-2:*:function:*"
  }

Если выходная запись не сжата, the body size is too long (всего 14 МБ). Без сжатия и простой полезной нагрузки в кодировке base64 я получаю следующую ошибку в журналах Lambda:

{
  "type": "mapper_parsing_exception",
  "reason": "failed to parse",
  "caused_by": {
    "type": "not_x_content_exception",
    "reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
  }
}

Описание

У меня есть журналы Cloudwatch, которые буферизуются по размеру / интервалу, которые передаются в Kinesis Firehose. Шланг переносит журналы в лямбда-функцию, которая преобразует журнал в запись json, которая затем должна отправить ее в управляемый AWS кластер Elasticsearch.

Лямбда-функция получает следующую структуру JSON:

{
    "invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
    "deliveryStreamArn": "arn:aws:firehose:...",
    "region": "us-west-2",
    "records": [{
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "approximateArrivalTimestamp": 1508197563377,
        "data": "some_compressed_data_in_base_64_encoding"
    }]
}

Затем лямбда-функция извлекает .records[].data и декодирует данные как base64 и распаковывает данные, что приводит к следующему JSON:

{
  "messageType": "DATA_MESSAGE",
  "owner": "aws_account_number",
  "logGroup": "some_cloudwatch_log_group_name",
  "logStream": "i-0221b6ec01af47bfb",
  "subscriptionFilters": [
    "cloudwatch_log_subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "33633929427703365813575134502195362621356131219229245440",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_1"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245441",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_2"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245442",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_3"
    }
  ]
}

Отдельный элемент из .logEvents[] преобразуется в структуру json, где ключи являются желаемыми столбцами при поиске журналов в Kibana - примерно так:

{
    'journalctl_host': 'ip-172-11-11-111',
    'process': 'haproxy',
    'pid': 15507,
    'client_ip': '172.11.11.111',
    'client_port': 3924,
    'frontend_name': 'http-web',
    'backend_name': 'server',
    'server_name': 'server-3',
    'time_duration': 10,
    'status_code': 200,
    'bytes_read': 79,
    '@timestamp': '1900-10-16T23:46:01.0Z',
    'tags': ['haproxy'],
    'message': 'HEAD / HTTP/1.1'
}

Преобразованный json собирается в массив, который получает сжатую zlib и закодированную строку base64, которая затем преобразуется в новую полезную нагрузку json в качестве окончательного лямбда-результата:

{
"records": [
    {
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "result": "Ok",
        "data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
    }
]}

Конфигурация Cloudwatch

13 записей журнала (~ 4 КБ) могут быть преобразованы примерно в 635 КБ.

Я также уменьшил пороговые значения для журналов awslogs, надеясь, что размер журналов, отправляемых в функцию Lambda, станет небольшим:

buffer_duration = 10
batch_count = 10
batch_size = 500

К сожалению, когда происходит всплеск, всплеск может достигать 2800 строк при размере более 1 МБ.

Когда результирующая полезная нагрузка лямбда-функции «слишком велика» (~ 13 МБ преобразованных журналов), в журналы лямбда-облачных наблюдателей регистрируется ошибка - «размер тела слишком длинный». Похоже, что нет никаких указаний на то, откуда эта ошибка или есть ли ограничение на размер полезной нагрузки ответа лямбда-функции.


person neowulf33    schedule 23.10.2017    source источник


Ответы (1)


Итак, сотрудники службы поддержки AWS сказали мне, что следующие ограничения не могут быть устранены для решения этого потока:

  1. размер полезной нагрузки лямбда
  2. сжатая полезная нагрузка пожарного шланга, поступающая в лямбду, которая прямо пропорциональна выходному значению лямбда.

Вместо этого я изменил архитектуру на следующее:

  1. Журналы Cloudwatch сохраняются в S3 через Firehose.
  2. События S3 обрабатываются лямбда-функцией.
  3. Лямбда-функция возвращает код успеха, если лямбда-функция преобразуется и может успешно индексировать журналы в ES.
  4. Если лямбда-функция не работает, очередь недоставленных сообщений (AWS SQS) настраивается с сигналом облачного наблюдения. Образец фрагмента облачной информации можно найти здесь.
  5. Если есть сообщения SQS, можно вручную вызвать лямбда-функцию с этими сообщениями или настроить пакетное задание AWS для обработки сообщений SQS с помощью лямбда-функции. Однако следует быть осторожным, чтобы лямбда-функция не переключилась снова на DLQ. Проверьте журналы lambda cloudwatch, чтобы узнать, почему это сообщение не было обработано и отправлено в DLQ.
person neowulf33    schedule 17.11.2017