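Logstash is an open-source data collection engine with real-time pipelining capabilities. It can dynamically unify data from disparate sources and normalize it into the destinations of your choice, cleaning and unifying all of your data for a variety of advanced downstream analytics and visualization use cases. This article covers installing and using Logstash.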
Installation
Using Docker
Install
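# Pull the image, then start the container.
# 5047 is published for both TCP and UDP because the syslog input listens on both;
# without the /udp mapping Docker publishes the port for TCP only.
docker pull logstash:7.5.1
docker run -d -it --restart=always \
  --privileged=true \
  --name=logstash \
  -p 5047:5047 \
  -p 5047:5047/udp \
  -p 9600:9600 \
  logstash:7.5.1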
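Port 9600 published above is the Logstash API endpoint, so once the container is up it can serve as a quick health check. The following is a minimal sketch, assuming the API is reachable at http://127.0.0.1:9600 and that the root response contains the `host`, `version`, and `http_address` fields; the function names are hypothetical helpers, not part of Logstash.

```python
import json
from urllib.request import urlopen


def fetch_node_info(base_url="http://127.0.0.1:9600", timeout=5.0):
    """GET the root of the Logstash API and decode the JSON body.

    base_url is an assumption: it matches the -p 9600:9600 mapping used above.
    """
    with urlopen(base_url + "/", timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def summarize_node_info(info):
    """Build a one-line summary from the fields this sketch expects to find."""
    # .get() keeps the sketch tolerant of fields that a given version omits.
    host = info.get("host", "unknown")
    version = info.get("version", "unknown")
    address = info.get("http_address", "unknown")
    return f"Logstash {version} on {host}, API at {address}"
```

Calling `print(summarize_node_info(fetch_node_info()))` on the Docker host should print a single summary line if the container is running; a connection error suggests that Logstash has not finished starting or that the port is not published.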
Configuration
syslog
Edit /usr/share/logstash/config/logstash.yml:
http.host: "0.0.0.0"
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
path.config: /usr/share/logstash/config/logstash.conf
Create /usr/share/logstash/config/logstash.conf:
input {
  syslog {
    port => 5047
  }
}
output {
  stdout {}
}
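Restart the service (for example, docker restart logstash). You can then use the following tool to check that the input is receiving data: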
# yum install nc -y
# nc -uv 127.0.0.1 5047
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 127.0.0.1:5047.
s
sdsdf
# Logstash logs the following
[2018-04-13T08:45:01,956][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2018-04-13T08:45:02,063][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-04-13T08:45:02,086][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,118][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:5047"}
[2018-04-13T08:45:02,493][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
"severity_label" => "Emergency",
"@timestamp" => 2018-04-13T08:48:23.049Z,
"severity" => 0,
"tags" => [
[0] "_grokparsefailure_sysloginput"
],
"host" => "127.0.0.1",
"priority" => 0,
"message" => "s\n",
"facility" => 0,
"@version" => "1",
"facility_label" => "kernel"
}
{
"severity_label" => "Emergency",
"@timestamp" => 2018-04-13T08:48:28.829Z,
"severity" => 0,
"tags" => [
[0] "_grokparsefailure_sysloginput"
],
"host" => "127.0.0.1",
"priority" => 0,
"message" => "sdsdf\n",
"facility" => 0,
"@version" => "1",
"facility_label" => "kernel"
}
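Both events above carry the `_grokparsefailure_sysloginput` tag with `priority`, `facility`, and `severity` all 0. This is what the syslog input produces when the payload does not look like a syslog line, as is the case for raw text typed into nc. The following minimal sketch builds an RFC 3164-style line and sends it over UDP, assuming Logstash listens on 127.0.0.1:5047 as configured above. The helper names, hostname, and program name are hypothetical.

```python
import socket
from datetime import datetime

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def build_syslog_message(facility, severity, hostname, program, text, when=None):
    """Format an RFC 3164-style line: <PRI>Mmm dd hh:mm:ss host program: text."""
    when = when or datetime.now()
    # The priority value encodes both fields: facility * 8 + severity.
    priority = facility * 8 + severity
    # RFC 3164 pads single-digit days with a space, e.g. "Apr  3".
    timestamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when.strftime('%H:%M:%S')}"
    return f"<{priority}>{timestamp} {hostname} {program}: {text}"


def send_udp(message, host="127.0.0.1", port=5047):
    """Send one datagram to the syslog input (address and port are assumptions)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode("utf-8"), (host, port))
```

For example, `send_udp(build_syslog_message(1, 6, "web-1", "demo", "hello"))` sends a user-level (facility 1) informational (severity 6) message with priority 14. An event sent this way would be expected to appear without the parse-failure tag and with the decoded facility and severity labels.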
kafka
Tuning the Logstash kafka input parameters:
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    topics => ["nginx"]
    codec => "json"
    auto_offset_reset => "earliest"
    session_timeout_ms => "60000"
    request_timeout_ms => "70000"
    heartbeat_interval_ms => "15000"
  }
}
Parameter notes:
- request_timeout_ms must be greater than session_timeout_ms
- heartbeat_interval_ms must be smaller than session_timeout_ms, and no more than 1/3 of it
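The two rules above are easy to check before deploying a configuration. The following is a small sketch of such a check; `check_kafka_timeouts` is a hypothetical helper written for this article, not part of Logstash or Kafka.

```python
def check_kafka_timeouts(session_timeout_ms, request_timeout_ms, heartbeat_interval_ms):
    """Return a list of rule violations; an empty list means the values are consistent."""
    problems = []
    # Rule 1: the request timeout must exceed the session timeout.
    if request_timeout_ms <= session_timeout_ms:
        problems.append("request_timeout_ms must be greater than session_timeout_ms")
    # Rule 2: the heartbeat interval must be at most one third of the session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat_interval_ms must be at most 1/3 of session_timeout_ms")
    return problems
```

With the values from the configuration above, `check_kafka_timeouts(60000, 70000, 15000)` returns an empty list: 70000 is greater than 60000, and 15000 is below the limit of 20000.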