Как удалить все поля со значением NULL в фильтре Logstash

Я читаю файл журнала контрольной точки в формате csv с logstash, и некоторые поля имеют нулевое значение.

я хочу удалить все поля с нулевым значением.

я не могу точно предсказать, какие поля (ключи) будут иметь нулевое значение, потому что у меня есть 150 столбцов в файле csv, и я не хочу проверять каждый из них.

можно ли сделать динамический фильтр в logstash, который удалит все поля с нулевым значением?

мой файл конфигурации logstash выглядит так:

input {
  stdin { tags => "checkpoint" } 
   file {
   type => "file-input"
   path =>  "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
   sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
   start_position => "beginning"
   tags => ["checkpoint","offline"]
  }
}
filter {
 if "checkpoint" in [tags] {
        csv {
        columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
      #  remove_field => [ any fields with null value] how to do it please 
        separator => "|"
        }
    # drop csv header
        if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
        drop { }
    }
    }
  }

}
output {
   stdout {
    codec => rubydebug 
  }
   file {
      path => "output.txt"
   }

ЗДЕСЬ ПРИКЛАДЫВАЮ НЕКОТОРЫЕ ПРИМЕРЫ ЛОГОВ:

num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

person tomer    schedule 10.03.2015    source источник


Ответы (5)


Ruby фильтр может удовлетворить ваши требования.

input {
        stdin {
        }
}

filter {
        csv {
                columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
                separator => "|"
        }
        ruby {
                code => "
                        hash = event.to_hash
                        hash.each do |k,v|
                                if v == nil
                                        event.remove(k)
                                end
                        end
                "
        }
}

output {
    stdout { codec => rubydebug }
}

Вы можете использовать плагин ruby ​​для фильтрации всех полей со значением nil (ноль в Ruby)

Обновлено:

Это моя среда: Windows Server 2008 и Logstash 1.4.1. Ваш образец логов работает у меня! Я обновил конфигурацию, ввод и вывод.

Вход

2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

Выход:

{
        "@version" => "1",
      "@timestamp" => "2015-03-12T00:30:34.123Z",
            "host" => "BENLIM",
             "num" => "2",
            "date" => "8Jun2012",
            "time" => "16:52:39",
            "orig" => "10.0.0.1",
            "type" => "log",
          "action" => "keyinst",
        "i/f_name" => "daemon",
         "i/f_dir" => "inbound",
         "product" => "VPN-1 & FireWall-1",
    "Internal_CA:" => "Certificate initialized",
     "serial_num:" => "86232",
             "dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}
person Ben Lim    schedule 10.03.2015
comment
Здравствуйте, Бен, спасибо за помощь, я тестирую ваш рубиновый ответ, но в выводе в файл пустые поля все еще существуют... - person tomer; 10.03.2015
comment
Могу ли я узнать, что значение поля равно нулю или нулю? Если вы хотите удалить поле со значением null, в коде ruby ​​вы хотите использовать if v== 'null ' - person Ben Lim; 10.03.2015
comment
это не работает, когда ваш вывод logstash является файлом или эластичным поиском - person tomer; 10.03.2015
comment
Но у меня это сработало либо в стандартном выводе, либо в файле. Можете ли вы предоставить образец журнала? - person Ben Lim; 11.03.2015
comment
Привет, Бен и 10x за помощь. я обновляю свой вопрос и привожу пример журнала - person tomer; 11.03.2015
comment
Я обновил ответ. Ваши логи работают у меня. Возможно, вы пытались выяснить другую проблему. Вы можете использовать мою конфигурацию и протестировать снова. - person Ben Lim; 12.03.2015
comment
вы можете попробовать добавить файл в раздел вывода. скажите, пожалуйста, есть ли нулевое значение в выходном файле - person tomer; 13.03.2015
comment
Да! Я также использую файл output. Поле с нулевым значением будет удалено - person Ben Lim; 13.03.2015
comment
Хорошо, Бен, твой ответ отлично работает, у меня была ошибка в моем файле конфигурации, потому что в моем разделе фильтра я использую remove_field =› [ ] в плагине csv. поэтому он игнорирует удаленные поля из рубинового плагина - person tomer; 13.03.2015
comment
В фильтре csv также есть опция с именем skip_empty_columns (по крайней мере, сейчас), но рубин был изобретателен. :) - person st2rseeker; 24.03.2016

Проверьте skip_empty_columns опция фильтра csv - была действительно полезно в моем случае использования. :)

Применение:

skip_empty_columns => true
person st2rseeker    schedule 24.03.2016
comment
Ух ты! Это сработало блестяще! Так мило и просто. - person Eric McLachlan; 26.01.2021

Если вам нужно рекурсивно удалить все нулевые, пустые и пустые поля (остаются 0 и false), эта функция может помочь. Он использует фильтр Ruby в Logstash. Это ни в коем случае не элегантно, но, кажется, работает довольно эффективно.

filter {
    ruby {
        init => "
            def Compact(key)
                modifiedKey = nil
                parentKey = nil

                if key.kind_of?(String)
                    if key.start_with?('[')
                        modifiedKey = key
                    else
                        modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
                    end

                parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
                end

                unless modifiedKey.nil?
                    if event.get(modifiedKey).is_a?(Enumerable) &&
                    (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
                         event.remove(modifiedKey)
                    elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
                         event.remove(modifiedKey)
                     end

                    if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
                    (event.get(parentKey).nil? || event.get(parentKey).empty?)
                        event.remove(parentKey)
                    end
                end

               if key == event.to_hash ||
               event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
                   key = event.get(modifiedKey) unless modifiedKey.nil?
                   key.each{ |k|
                      Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
                   }
               end

               rescue Exception => e
                   puts %{ruby_exception_#{__method__.to_s} - #{e}}
           end
      "

     code => "
         Compact(event.to_hash)
     "
    }
}
person StephenSolace    schedule 20.10.2017

Чтобы делать это динамически, вам нужно использовать фильтр ruby{}. В этом ответе есть хороший пример кода .

person Alain Collins    schedule 10.03.2015
comment
Привет, Ален, спасибо за помощь, я пытаюсь использовать рубиновый фильтр, но я новичок в рубине .... и результат, выводимый в мой файл, по-прежнему имеет нулевые поля, я буду рад получить пример кода для моей конкретной проблемы что я могу проверить - person tomer; 10.03.2015

person    schedule
comment
Я могу подтвердить, что приведенный выше код делает то, что ожидается — удаляет поля с нулевым значением. Использование логсташа 6.8.1. Я немного улучшил его, чтобы также удалить поля со значением -. Просто сделайте так, чтобы вторая строка if выглядела так: if v == '' || v.to_s == '{}' || v == '-' - person Yalok Iy; 31.07.2019
comment
На самом деле, чтобы удалить поля со значением null, нужно добавить еще и || v == nil. - person Yalok Iy; 08.08.2019