Logstash:如何处理 Logstash pipeline 错误信息

2022-11-13,,,

转载自:https://elasticstack.blog.csdn.net/article/details/114290663

在我们使用 Logstash 的时候经常会出现一些错误。比如当我们使用 dissect 这样的 filter 时,会出现格式不匹配从而导致错误。那么我们该如何处理这类错误呢?当 dissect 遇到错误的格式不能进行解析时,会为文档添加一个叫做 _dissectfailure 的标签,并继续处理该事件:

那么我们该如何处理该类错误的信息呢?

一种比较好的办法就是通过 elasticsearch output 把他存放于另外一个索引中。我们先用如下的例子来进行实验。

dissect.conf

    input {
generator {
message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
count => 1
}
} filter {
if [message] =~ "THREAT," {
dissect {
mapping => {
message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
}
}
}
} ​
output {
stdout {
codec => rubydebug
}
} 上面的 pipeline 在正常没有错误的情况下,会生成如下的结果: 现在假如我们修改上面的 generator 部分。在它的前面添加一个空格: input {
generator {
message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
count => 1
}
} 由于 dissect 对格式是非常的挑剔。格式不对那么它就会生成一个错误。为此,它会为文档添加一个叫做 _dissectfailure 的标签。我们可以依据这个标签,把文档保存于一个叫做 parsefailures 的索引中: input {
generator {
message => " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
count => 100
}
} filter {
if [message] =~ "THREAT," {
dissect {
mapping => {
message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
}
}
}
} ​
output {
stdout {
codec => rubydebug
} if "_dissectfailure" in [tags] {
elasticsearch {
index => "parsefailures"
hosts => [ "localhost:9200" ]
}
}
} 在上面我有意识地把 generator 中的 count 增加到100。这样确保在 Logstash 退出之前,有时间把内容写到 Elasticsearch 中去。我们重新运行 Logstash: 我们发现一个错误的信息。它说明在使用 dissect filter 时导致错误。我们可以在 Kibana 中检查 parsefailures 这个索引: GET parsefailures/_search {
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 102,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "parsefailures",
"_type" : "_doc",
"_id" : "3Llu8ncBReLdFyHVZsv0",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2021-03-02T10:13:45.332Z",
"tags" : [
"_dissectfailure"
],
"sequence" : 0,
"message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
"host" : "liuxg",
"@version" : "1"
}
},
{
"_index" : "parsefailures",
"_type" : "_doc",
"_id" : "37l08ncBReLdFyHVUcs4",
"_score" : 1.0,
"_source" : {
"tags" : [
"_dissectfailure"
],
"host" : "liuxg",
"@timestamp" : "2021-03-02T10:20:44.841Z",
"sequence" : 12,
"message" : " <1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
"@version" : "1"
}
},
...

Logstash:如何处理 Logstash pipeline 错误信息的相关教程结束。

《Logstash:如何处理 Logstash pipeline 错误信息.doc》

下载本文的Word格式文档,以方便收藏与打印。