标签归档:Logstash

Logstash从grok到5.X版本的dissect

grok 作为 Logstash 最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数时候,日志格式并没有那么复杂,Logstash 开发团队在 5.0 版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用 dissect 插件更快的完成解析工作。

检查所有插件
/your/logstash/path/bin/logstash-plugin list
如果没有,安装logstash-filter-dissect
/your/logstash/path/bin/logstash-plugin install logstash-filter-dissect
例如:

filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
        }
        convert_datatype => {
            pid => "int"
        }
    }
}

语法解释:
我们看到上面使用了和 Grok 很类似的 %{} 语法来表示字段,这显然是基于习惯延续的考虑。不过示例中 %{+ts} 的加号就不一般了。dissect 除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:
● %{+key} 这个 + 表示,前面已经捕获到一个 key 字段了,而这次捕获的内容,自动添补到之前 key 字段内容的后面。
● %{+key/2} 这个 /2 表示,在有多次捕获内容都填到 key 字段里的时候,拼接字符串的顺序谁前谁后。/2 表示排第 2 位。
● %{?string} 这个 ? 表示,这块只是一个占位,并不会实际生成捕获字段存到 Event 里面。
● %{?string} %{&string} 当同样捕获名称都是 string,但是一个 ? 一个 & 的时候,表示这是一个键值对。

比如对 http://rizhiyi.com/index.do?id=123 写这么一段配置:
http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
则最终生成的 Event 内容是这样的:

{
  domain => "rizhiyi.com",
  id => "123"
}

 

实际测试:
解析Nginx日志格式如下:

127.0.0.1 - - [24/May/2017:14:24:34 +0800] \"POST /ws-sale/shopJsonService HTTP/1.1\" 200 1139 \"-\" \"Apache CXF 2.7.0\" 0.002 0.002 \"-\"

发现如果出现了\”,会解析错误,例如:

    dissect {
        mapping => {
            "message" => "%{ipaddress} - - [%{timestamp}] \"%{verb} %{url} HTTP/%{?http_version}\" %{code} %{bytes} \"%{referrer}\" \"%{agent}\" %{} %{} \"%{cookie}\""
        }
    }

在网上也查到了相关文章:https://github.com/logstash-plugins/logstash-filter-dissect/issues/10
需要替换为单引号

    dissect {
        mapping => {
            "message" => '%{ipaddress} - - [%{timestamp}] "%{verb} %{url} HTTP/%{?http_version}" %{code} %{bytes} "%{referrer}" "%{agent}" %{} %{} "%{cookie}"'
        }
    }

另外发现最后一个字段是cookie,但是拿出来的值最后都有\”,例如如果cookie为空,则日志中会记录为-,而logstash打印出来是”cookie” => “-\””
这里用ruby去掉

event.set('cookie',event.get('cookie').chop)

另外使用ruby分割客户端IP和代理IP,uri和参数的时候,发现ruby code的用法在5.3也改了。
之前在2.3的时候,用法如下:

    ruby {
        code => "
                event['uri'] = event['url'].split('?')[0]
                event['parameter'] = event['url'].split('?')[1]
        "
    }

5.3版本报错如下:

[2017-05-24T16:03:14,482][ERROR][logstash.filters.ruby    ] Ruby exception occurred: Direct event field references (i.e. event['field']) have been disabled in favor of using event get and set methods (e.g. event.get('field')). Please consult the Logstash 5.0 breaking changes documentation for more details.

修改为:

    ruby {
        code => "
            event.set('uri',event.get('url').split('?')[0])
            event.set('parameter',event.get('url').split('?')[1])
        "
    }

最终配置:

filter{
    json {
        source => "message"
    }
    dissect {
        mapping => {
            "message" => '%{ipaddress} - - [%{timestamp}] "%{verb} %{url} HTTP/%{?http_version}" %{code} %{bytes} "%{referrer}" "%{agent}" %{} %{} "%{cookie}"'
        }
    }
    ruby {
        code => "
            event.set('uri',event.get('url').split('?')[0])
            event.set('parameter',event.get('url').split('?')[1])
            event.set('client',event.get('ipaddress').split(', ')[0])
            event.set('proxy',event.get('ipaddress').split(', ')[1..5])
            event.set('cookie',event.get('cookie').chop)
        "
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss +0800"]
        target => "@timestamp"
        "locale" => "en"
        timezone => "UTC"
    }  
}

logstash filter/geoip 插件

GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

配置如下:
geoip {
source => “client”
#database => “/var/geoip/GeoLiteCity.dat” 如果没有设置这个参数,默认使用GeoLiteCity数据库。
}
解析后的Geoip地址数据,默认是geoip这个字段。
logstash1.4.2下搜索geoip库
[root@web40 etc]# locate GeoLiteCity.dat
/web/logstash/logstash-1.4.2/vendor/geoip/GeoLiteCity.dat
但是在logstash-2.3.3中没有搜到GeoLiteCity.dat。
在logstash-2.3.3中运行结果如下:

GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的。
例如:
fields => [“country_name”, “city_name”, “real_region_name”, “latitude”, “longitude”]
那么结果如下:

geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip字段。如果使用内网地址,会发现没有对应输出!

geoip包更新方式:
1)直接下载geoip包
https://dev.maxmind.com/geoip/legacy/geolite/
每个月的第一个周二会更新一次。
下载最新http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
放到/soft/GeoLiteCity.dat,配置如下:
geoip {
source => “client”
#fields => [“country_name”, “city_name”, “real_region_name”, “latitude”, “longitude”]
database => “/soft/GeoLiteCity.dat”
}
2)下载更新程序更新:
https://dev.maxmind.com/geoip/geoipupdate/
下载geoipupdate-2.2.2.tar.gz并安装。
[root@localhost geoipupdate-2.2.2]# vim /usr/local/etc/GeoIP.conf
改为
# The following UserId and LicenseKey are required placeholders:
UserId 999999
LicenseKey 000000000000

# Include one or more of the following ProductIds:
# * GeoLite2-City – GeoLite 2 City
# * GeoLite2-Country – GeoLite2 Country
# * GeoLite-Legacy-IPv6-City – GeoLite Legacy IPv6 City
# * GeoLite-Legacy-IPv6-Country – GeoLite Legacy IPv6 Country
# * 506 – GeoLite Legacy Country
# * 517 – GeoLite Legacy ASN
# * 533 – GeoLite Legacy City
ProductIds GeoLite2-City GeoLite2-Country GeoLite-Legacy-IPv6-City GeoLite-Legacy-IPv6-Country 506 517 533

执行程序:
[root@localhost geoipupdate-2.2.2]# /usr/local/bin/geoipupdate -d /soft/geoip/
下载包的内容:
GeoIPv6.dat
GeoLite2-City.mmdb
GeoLite2-Country.mmdb
GeoLiteASNum.dat
GeoLiteCity.dat
GeoLiteCityv6.dat
GeoLiteCountry.dat

logstash 解码 %uxxxx

想通过logstash从cookie中获取username做关联分析。

然而发现提取出来的username为%u6E38%u5BA2(游客)
尝试使用urldecode发现不能解码。
例如配置文件如下:

 input {
    stdin {
    }
}
filter{
    urldecode {
        field => "message"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

测试结果:

可以使用ruby解码。

 ruby {
     code => "
         # urldecode non-standard %uXXXX type of string
         ['cs_uri_query', 'cs_cookie', 'cs_referer'].each { |field|
             if event[field] and event[field].include? '%u'
                 event[field] = event[field].gsub(/%u([0-9A-F]{4})/i){$1.hex.chr(Encoding::UTF_8)}.strip
             end
         }
     "
 }

然后看下解析结果

参考文章:
https://discuss.elastic.co/t/how-to-urldecode-uxxxx-type-of-strings/27718/3

Logstash由于时区导致8小时时差解决方案

Logstash 2.3版本


logstash的date插件配置:

date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}

查看解析结果发现@timestamp比中国时间早了8小时

对于页面查看,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
解决方案找到两个
1、

vim vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.3-java/lib/logstash/timestamp.rb

把@time = time.utc 改成time即可

2、
这种办法与Linux服务器的时区设置有关系,有些Linux可能修改不成功,推荐用第一种。
http://www.aichengxu.com/view/6621766
1)修改logstash配置

date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
"locale" => "en"
timezone => "+00:00"
}

添加 timezone => “+00:00”
然后测试@timestamp就是正常的时间了,@timestamp和timestamp是一致的。
2)因为kibana会读取浏览器的时区,然后+8小时,所以需要修改kibana的配置。
Settings – Advanced – dateFormat:tz 修改为UTC

Elasticsearch索引创建错误
修改之后发现logstash在按每天输出到elasticsearch时,每天8:00才创建当天索引,而8:00以前数据还是会输出到昨天的索引,如图:

解决办法如下:

vim ./vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.3-java/lib/logstash/string_interpolation.rb
.withZone(org.joda.time.DateTimeZone::UTC)

修改为

.withZone(org.joda.time.DateTimeZone.getDefault())

Logstash5.3版本


解决8小时时差问题,配置如下:

    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss +0800"]
        target => "@timestamp"
        "locale" => "en"
        timezone => "UTC"
    }

logstash multiline插件匹配多行日志

在处理日志时,除了访问日志外,还要处理运行时日志,该日志大都用程序写的,比如 log4j。运行时日志跟访问日志最大的不同是,运行时日志是多行,也就是说,连续的多行才能表达一个意思。
可以使用multiline来组合多行数据
对 multiline 插件来说,有三个设置比较重要:negate、pattern 和 what。
negate
类型是 boolean
默认为 false
否定正则表达式(如果没有匹配的话)。
pattern
必须设置
类型为 string
没有默认值
要匹配的正则表达式。
what
必须设置
可以为 previous 或 next
没有默认值
如果正则表达式匹配了,那么该事件是属于下一个或是前一个事件

演示WAF攻击日志如下:

172.16.100.1 [2016-08-03 01:54:54] "POST" "localhost" "11000" "SQLi" "SQL注入行为(Union)" "post" "[[union.+?select@{0,2}(\(.+\)|\s+?.+?|(`|'|").*?(`|'|"))]]" [[[[POST /dvwa/login.php HTTP/1.1
accept-language:zh-CN,zh;q=0.8
content-type:application/x-www-form-urlencoded
connection:keep-alive
content-length:67
cookie:security=low; PHPSESSID=at71e7ipvra6bodpu05kkm16t3
cache-control:no-cache
host:172.16.100.168
accept-encoding:gzip, deflate
referer:http://172.16.100.168/dvwa/login.php
origin:http://172.16.100.168
accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
pragma:no-cache
upgrade-insecure-requests:1
user-agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36

username=admin+union+select+1%2C2%2C3&password=password&Login=Login]]]]

 

POST包是多行数据
logstash配置文件如下:

input {
    file {
        path => "/tmp/log/2016-08-03_sec.log"
        type => "test"
        codec=> multiline {
            pattern => "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s\["
            negate => true
            what => "previous"
        }
        start_position => beginning
    }
}
filter{
    grok{
        match => {
        "message" => "%{IP:client} \[%{GREEDYDATA:timestamp}\] \"%{WORD:method}\" \"%{GREEDYDATA:servername}\" \"%{NUMBER:ruleid}\" \"%{GREEDYDATA:attacktype}\" \"%{GREEDYDATA:description}\" \"%{WORD:position}\" \"\[\[%{GREEDYDATA:reg}\]\]\" \[\[\[\[(?<http>[\s\S]*?)\]\]\]\]"
        }
    }
    date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        "locale" => "en"
    }
    mutate {
        remove_field => [ "message" ]
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

 

可以看到多行数据汇总到了message中

123

logstash filters/mutate 插件

filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
gsub:
替换字符,例如在CSDN数据库获取到的mail邮箱最后都带有\r,可以利用gsub将\r替换为空。gsub => [“mail”,”\r”,””]

split:
split => [“message”, “|”]
随意输入一串以|分割的字符,比如 “123|321|adfd|dfjld*=123″,可以看到如下输出:
“message” => [
[0] “123”,
[1] “321”,
[2] “adfd”,
[3] “dfjld*=123”
],

join:
仅对数组类型字段有效,我们在之前已经用 split 割切的基础再 join 回去。配置改成:
filter {
mutate {
split => [“message”, “|”]
}
mutate {
join => [“message”, “,”]
}
}

merge:
合并两个数组或者哈希字段。

strip:
过滤掉空格,strip => [“field1”, “field2”]
update:
更新某个字段的内容。如果字段不存在,不会新建。update => { “sample” => “My new message” }
rename:
重命名某个字段,如果目的字段已经存在,会被覆盖掉。rename => [“syslog_host”, “host”]
replace:
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
lowercase:
lowercase => [ “fieldname” ] 转换为小写
uppercase:
uppercase => [ “fieldname” ]转换为大写

logstash使用redis做消息队列

logstash agent conf文件配置:

input {
 file {
     path => "/tmp/www.log"
     type => "linux-syslog"
  }
}

output {
   redis {
        host => "192.168.192.120"
        data_type => "list"
        key => "logstash"
   }
}

logstash index conf文件配置(输出到elasticsearch):

input {
   redis {
        host => "127.0.0.1"
        type => "linux-syslog"
        data_type => "list"
        key => "logstash"
   }
}

output {
  if "_grokparsefailure" not in [tags] {
  elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-test"
  }
  }
}

来看下Redis中的内容
127.0.0.1:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
127.0.0.1:6379> keys *
1) “logstash”
查看队列中的日志数量
127.0.0.1:6379> llen logstash
取出前两条信息
127.0.0.1:6379> LRANGE logstash 0 1
1) “{\”message\”:\”Nov  8 14:22:25 server120 sshd[16024]: pam_unix(sshd:session): session closed for user root\”,\”@version\”:\”1\”,\”@timestamp\”:\”2016-11-11T06:25:49.352Z\”,\”path\”:\”/var/log/secure\”,\”host\”:\”0.0.0.0\”,\”type\”:\”test\”}”
2) “{\”message\”:\”Nov 10 11:10:46 server120 sshd[13192]: Accepted password for root from 192.168.190.201 port 52042 ssh2\”,\”@version\”:\”1\”,\”@timestamp\”:\”2016-11-11T06:25:49.507Z\”,\”path\”:\”/var/log/secure\”,\”host\”:\”0.0.0.0\”,\”type\”:\”test\”}”
(integer) 6

logstash if/else表达式

格式如下:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

可以使用以下的运算符:
equality: ==, !=, <, >, <=, >=
regexp: =~, !~ (checks a pattern on the right against a string value on the left)
inclusion: in, not in
The supported boolean operators are:
and, or, nand, xor
The supported unary operators are:
!

例如不处理GET请求:
if ([verb] == “GET”){drop{}}
只处理POST请求并且URL中包含username的日志。
if ([verb] != “POST” and  [request] !~ “username”) {
drop{}
}

在使用grok正则匹配的时候,如果没有匹配上,就会添加一个tags。
例如匹配CSDN信息的时候
grok {
match=>{ “message”=>”%{USER:username} # %{USER:passwd} # %{GREEDYDATA:mail}” }
}
然而passwd中有可能存在特殊符号,例如这条信息
omega11123 # K87)%23O # KingOmiga@gmail.com
看一下USER的正则怎么写的
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
可以看到仅匹配的大小写字母数字和._-
密码中的)%是匹配不到的。tags中会添加

          "tags" => [
        [0] "_grokparsefailure"
    ]

我们可以在输出的时候用if语句判断一下,如果正则匹配成功才输出到elasticsearch中。

  if "_grokparsefailure" not in [tags] {
  elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-csdn"
        document_type => "csdn"
  }
  }

此时如果匹配失败的话,就不会输出到elasticsearch中。

logstash filter/kv插件

kv插件可以将URL、request字段取出来进行key-value值匹配,提供字段分隔符”&?”,值键分隔符”=”,会自动将字段和值采集出来。具体的配置项如下:
source:
用于匹配的字段。例如在分析nginx日志的时候,grok的正则匹配为:
%{IPORHOST:client} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \”(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\” %{NUMBER:response} %{NUMBER:bytes} \”(%{GREEDYDATA:referrer}|-)\” \”(%{GREEDYDATA:agent}|-)\”
请求的URL为字段request。所有我们就可以指定为source => “request”
add_field:
数组类型,用于增加一个字段。add_field => { “test” => “Hello world” }
add_tag:
用于增加一个标记。add_tag => [ “vincent” ]
查看格式化后结果:
“tags” => [
[0] “vincent”
]
allow_duplicate_values:
布尔类型,默认值为True。当存在两个相同的key值时,例如from=1&from=2,如果设置为False,则仅保留一个。
default_keys:
default_keys => [ “from”, “logstash@example.com”,
“to”, “default@dev.null” ]
如果key值中不包括from和to的话,那么就会自动添加。
value_split:
默认为= ,表示key与value分割的字符串。
field_split:
例如用&分割,则配置为field_split => “&?”
exclude_keys:
可以排除不需要的key,例如from=<abc>&to=def&foo=bar,如果不需要from和to,只需要保留foo的话,则可以:
filter {
kv {
exclude_keys => [ “from”, “to” ]
}
}
include_keys:
可以配置需要的keys,其他的key将被排除。例如我只需要我username和domain两个参数,则可以这样配置。
include_keys => [ “username”,”domain” ]
remove_field:
用于删除一个字段,remove_field => [ “domain” ]
prefix:
增加一个前缀,filter { kv { prefix => “arg_” } }
target:
如果没有配置target的话是写入到_source下,如果配置了target,则会添加到这个filed下。例如配置target => “kv”

123

trim:
过滤value中的字符,trim => “<>\[\],”
trimkey:
过滤key中的字符, trimkey => “<>\[\],”

logstash filter/date插件

从字段中获取到时间用于logstash的@timestamp。
如果不加date,获取到的时间戳为logstash分析文件的时间。

123

添加一段配置:
date {
match => [“timestamp”, “dd/MMM/yyyy:HH:mm:ss Z”]
#默认目标就是@timestamp
target => "@timestamp"
"locale" => "en"
}
然后看一下获取到的时间戳为:

可以看到@timestamp已经改变,但是看到这里相差8个小时,即@timestamp 比我们早了 8 个小时,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。但是在elsaticsearch中存储的数据的时间还是相差8小时的。

123

需要注意的是,如果你的字段结构如下:
"_source": {
"eventdata": {
"ad_time": "20140430 142448",
}}
那么在匹配的时候需要使用全路径,match => [“[eventdata][ad_time]”, “YYYYMMdd HHmmss”]。

几个重要的配置如下:
match:
match => [“timestamp”, “dd/MMM/yyyy:HH:mm:ss Z”]
其中timestamp为正则匹配出的字段,dd/MMM/yyyy:HH:mm:ss Z为匹配的格式。如果匹配不上的话会添加一个"_dateparsefailure"的tag。
例如日志中的时间为10/Jul/2016:07:52:14 +0800,而我的date配置为:
date {
match => [“timestamp”, “dd/MMM/yyyy”]
target => "@timestamp"
"locale" => "en"
}
那么就会报错。
"tags" => [
[0] "192.168.192.120",
[1] "_dateparsefailure"
],
tag_on_failure:
如果匹配不上的时候,添加到tags中的内容,默认为[“_dateparsefailure”],此处我修改一下。”tag_on_failure” => [“date_test”]
来看一下tags
"tags" => [
[0] "192.168.192.120",
[1] "date_test"
],
target:
需要替换的字段,默认为@timestamp。