分类目录归档:数据分析

kafka查看未被消费的日志数量

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  –zookeeper localhost:2181  –group logstash

123

Offset表示已经消费的message条数
logSize表示全部的message条数
Lag表示未消费的message条数
每个分区只能有一个消费者,也就是消费者和分区之前是一对多的关系。如上图所示一共三台logstash,每台logstash上启5和线程,所以一共15个消费者,而该Topic共16个分区,可以看到logstash 109的线程0对应了0和1两个分区

kafka常见问题汇总

Kafka如何消费已经消费过的数据


consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。

原因:消费者消费了数据并不从队列中移除,只是记录了offset偏移量。同一个consumergroup的所有consumer合起来消费一个topic,并且他们每次消费的时候都会保存一个offset参数在zookeeper的root上。如果此时某个consumer挂了或者新增一个consumer进程,将会触发kafka的负载均衡,暂时性的重启所有consumer,重新分配哪个consumer去消费哪个partition,然后再继续通过保存在zookeeper上的offset参数继续读取数据。注意:offset保存的是consumer 组消费的消息偏移。
要消费同一组数据,可以采用不同的group。

partition和consumer数目关系


1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

topic 副本问题


Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。
(1) 如何分配副本
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。

(2) Kafka分配Replica的算法如下
(1)将所有Broker(假设共n个Broker)和待分配的Partition排序
(2)将第i个Partition分配到第(i mod n)个Broker上
(3)将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

如何设置生存周期与清理数据


日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).清理参数在server.properties文件中:

想实现消息队列中保存2小时的消息,那么配置应该像这样:
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0
与控制分段策略相关的几个properties:
log.roll.{hours,ms} — 日志滚动的周期时间(小时,毫秒,log.roll.ms优先级更高),到达指定周期时强制生成一个新的segment。
log.segment.bytes — 每个segment的最大容量上限(默认1GB)。到达指定容量时会强制生成一个新的segment。
与过期segment处理策略相关的几个properties:
cleanup.policy={compact,delete} — 过期segment处理算法,有两种,分别为删除和压缩,默认delete。
log.retention.{hours,minutes,ms} — 日志保留时间(小时,分钟,毫秒。优先级依次升高),超出保留时间的日志执行cleanup.policy定义的操作
log.segment.delete.delay.ms — 删除日志文件前的保留一段时间。默认60000。
log.retention.check.interval.ms — log checker的检测是否需要删除文件的周期。默认300000。

zookeeper如何管理kafka


1、Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2、Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.
3、Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

kafka能否自动创建topics


如果broker中没有topic的信息,当producer/consumer操作topic时,是否自动创建.
auto.create.topics.enable=true

kafka delete topic,marked for deletion

kafka 删除topic 提示marked for deletion
[html] view plain copy
[root@logSer config]# kafka-topics.sh –delete –zookeeper localhost:2181 –topic test-group        
Topic test-group is marked for deletion.  
Note: This will have no impact if delete.topic.enable is not set to true.  
[root@logSer config]# kafka-topics.sh –list –zookeeper localhost:2181  
test-group – marked for deletion  
test-topic  
test-user-001  


并没有真正删除,如果要真正删除
配置delete.topic.enable=true

配置文件在kafka\config目录

[html] view plain copy
[root@logSer config]# vi server.properties   
delete.topic.enable=true  
"server.properties" 122L, 5585C written  

logstash multiline插件匹配多行日志

在处理日志时,除了访问日志外,还要处理运行时日志,该日志大都用程序写的,比如 log4j。运行时日志跟访问日志最大的不同是,运行时日志是多行,也就是说,连续的多行才能表达一个意思。
可以使用multiline来组合多行数据
对 multiline 插件来说,有三个设置比较重要:negate、pattern 和 what。
negate
类型是 boolean
默认为 false
否定正则表达式(如果没有匹配的话)。
pattern
必须设置
类型为 string
没有默认值
要匹配的正则表达式。
what
必须设置
可以为 previous 或 next
没有默认值
如果正则表达式匹配了,那么该事件是属于下一个或是前一个事件

演示WAF攻击日志如下:

172.16.100.1 [2016-08-03 01:54:54] "POST" "localhost" "11000" "SQLi" "SQL注入行为(Union)" "post" "[[union.+?select@{0,2}(\(.+\)|\s+?.+?|(`|'|").*?(`|'|"))]]" [[[[POST /dvwa/login.php HTTP/1.1
accept-language:zh-CN,zh;q=0.8
content-type:application/x-www-form-urlencoded
connection:keep-alive
content-length:67
cookie:security=low; PHPSESSID=at71e7ipvra6bodpu05kkm16t3
cache-control:no-cache
host:172.16.100.168
accept-encoding:gzip, deflate
referer:http://172.16.100.168/dvwa/login.php
origin:http://172.16.100.168
accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
pragma:no-cache
upgrade-insecure-requests:1
user-agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36

username=admin+union+select+1%2C2%2C3&password=password&Login=Login]]]]

 

POST包是多行数据
logstash配置文件如下:

input {
    file {
        path => "/tmp/log/2016-08-03_sec.log"
        type => "test"
        codec=> multiline {
            pattern => "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s\["
            negate => true
            what => "previous"
        }
        start_position => beginning
    }
}
filter{
    grok{
        match => {
        "message" => "%{IP:client} \[%{GREEDYDATA:timestamp}\] \"%{WORD:method}\" \"%{GREEDYDATA:servername}\" \"%{NUMBER:ruleid}\" \"%{GREEDYDATA:attacktype}\" \"%{GREEDYDATA:description}\" \"%{WORD:position}\" \"\[\[%{GREEDYDATA:reg}\]\]\" \[\[\[\[(?<http>[\s\S]*?)\]\]\]\]"
        }
    }
    date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        "locale" => "en"
    }
    mutate {
        remove_field => [ "message" ]
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

 

可以看到多行数据汇总到了message中

123

logstash filters/mutate 插件

filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
gsub:
替换字符,例如在CSDN数据库获取到的mail邮箱最后都带有\r,可以利用gsub将\r替换为空。gsub => [“mail”,”\r”,””]

split:
split => [“message”, “|”]
随意输入一串以|分割的字符,比如 “123|321|adfd|dfjld*=123″,可以看到如下输出:
“message” => [
[0] “123”,
[1] “321”,
[2] “adfd”,
[3] “dfjld*=123”
],

join:
仅对数组类型字段有效,我们在之前已经用 split 割切的基础再 join 回去。配置改成:
filter {
mutate {
split => [“message”, “|”]
}
mutate {
join => [“message”, “,”]
}
}

merge:
合并两个数组或者哈希字段。

strip:
过滤掉空格,strip => [“field1”, “field2”]
update:
更新某个字段的内容。如果字段不存在,不会新建。update => { “sample” => “My new message” }
rename:
重命名某个字段,如果目的字段已经存在,会被覆盖掉。rename => [“syslog_host”, “host”]
replace:
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
lowercase:
lowercase => [ “fieldname” ] 转换为小写
uppercase:
uppercase => [ “fieldname” ]转换为大写

logstash使用redis做消息队列

logstash agent conf文件配置:

input {
 file {
     path => "/tmp/www.log"
     type => "linux-syslog"
  }
}

output {
   redis {
        host => "192.168.192.120"
        data_type => "list"
        key => "logstash"
   }
}

logstash index conf文件配置(输出到elasticsearch):

input {
   redis {
        host => "127.0.0.1"
        type => "linux-syslog"
        data_type => "list"
        key => "logstash"
   }
}

output {
  if "_grokparsefailure" not in [tags] {
  elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-test"
  }
  }
}

来看下Redis中的内容
127.0.0.1:6379> info keyspace
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
127.0.0.1:6379> keys *
1) “logstash”
查看队列中的日志数量
127.0.0.1:6379> llen logstash
取出前两条信息
127.0.0.1:6379> LRANGE logstash 0 1
1) “{\”message\”:\”Nov  8 14:22:25 server120 sshd[16024]: pam_unix(sshd:session): session closed for user root\”,\”@version\”:\”1\”,\”@timestamp\”:\”2016-11-11T06:25:49.352Z\”,\”path\”:\”/var/log/secure\”,\”host\”:\”0.0.0.0\”,\”type\”:\”test\”}”
2) “{\”message\”:\”Nov 10 11:10:46 server120 sshd[13192]: Accepted password for root from 192.168.190.201 port 52042 ssh2\”,\”@version\”:\”1\”,\”@timestamp\”:\”2016-11-11T06:25:49.507Z\”,\”path\”:\”/var/log/secure\”,\”host\”:\”0.0.0.0\”,\”type\”:\”test\”}”
(integer) 6

logstash if/else表达式

格式如下:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

可以使用以下的运算符:
equality: ==, !=, <, >, <=, >=
regexp: =~, !~ (checks a pattern on the right against a string value on the left)
inclusion: in, not in
The supported boolean operators are:
and, or, nand, xor
The supported unary operators are:
!

例如不处理GET请求:
if ([verb] == “GET”){drop{}}
只处理POST请求并且URL中包含username的日志。
if ([verb] != “POST” and  [request] !~ “username”) {
drop{}
}

在使用grok正则匹配的时候,如果没有匹配上,就会添加一个tags。
例如匹配CSDN信息的时候
grok {
match=>{ “message”=>”%{USER:username} # %{USER:passwd} # %{GREEDYDATA:mail}” }
}
然而passwd中有可能存在特殊符号,例如这条信息
omega11123 # K87)%23O # KingOmiga@gmail.com
看一下USER的正则怎么写的
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
可以看到仅匹配的大小写字母数字和._-
密码中的)%是匹配不到的。tags中会添加

          "tags" => [
        [0] "_grokparsefailure"
    ]

我们可以在输出的时候用if语句判断一下,如果正则匹配成功才输出到elasticsearch中。

  if "_grokparsefailure" not in [tags] {
  elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-csdn"
        document_type => "csdn"
  }
  }

此时如果匹配失败的话,就不会输出到elasticsearch中。

logstash filter/kv插件

kv插件可以将URL、request字段取出来进行key-value值匹配,提供字段分隔符”&?”,值键分隔符”=”,会自动将字段和值采集出来。具体的配置项如下:
source:
用于匹配的字段。例如在分析nginx日志的时候,grok的正则匹配为:
%{IPORHOST:client} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \”(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\” %{NUMBER:response} %{NUMBER:bytes} \”(%{GREEDYDATA:referrer}|-)\” \”(%{GREEDYDATA:agent}|-)\”
请求的URL为字段request。所有我们就可以指定为source => “request”
add_field:
数组类型,用于增加一个字段。add_field => { “test” => “Hello world” }
add_tag:
用于增加一个标记。add_tag => [ “vincent” ]
查看格式化后结果:
“tags” => [
[0] “vincent”
]
allow_duplicate_values:
布尔类型,默认值为True。当存在两个相同的key值时,例如from=1&from=2,如果设置为False,则仅保留一个。
default_keys:
default_keys => [ “from”, “logstash@example.com”,
“to”, “default@dev.null” ]
如果key值中不包括from和to的话,那么就会自动添加。
value_split:
默认为= ,表示key与value分割的字符串。
field_split:
例如用&分割,则配置为field_split => “&?”
exclude_keys:
可以排除不需要的key,例如from=<abc>&to=def&foo=bar,如果不需要from和to,只需要保留foo的话,则可以:
filter {
kv {
exclude_keys => [ “from”, “to” ]
}
}
include_keys:
可以配置需要的keys,其他的key将被排除。例如我只需要我username和domain两个参数,则可以这样配置。
include_keys => [ “username”,”domain” ]
remove_field:
用于删除一个字段,remove_field => [ “domain” ]
prefix:
增加一个前缀,filter { kv { prefix => “arg_” } }
target:
如果没有配置target的话是写入到_source下,如果配置了target,则会添加到这个filed下。例如配置target => “kv”

123

trim:
过滤value中的字符,trim => “<>\[\],”
trimkey:
过滤key中的字符, trimkey => “<>\[\],”

logstash filter/date插件

从字段中获取到时间用于logstash的@timestamp。
如果不加date,获取到的时间戳为logstash分析文件的时间。

123

添加一段配置:
date {
match => [“timestamp”, “dd/MMM/yyyy:HH:mm:ss Z”]
#默认目标就是@timestamp
target => "@timestamp"
"locale" => "en"
}
然后看一下获取到的时间戳为:

可以看到@timestamp已经改变,但是看到这里相差8个小时,即@timestamp 比我们早了 8 个小时,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。但是在elsaticsearch中存储的数据的时间还是相差8小时的。

123

需要注意的是,如果你的字段结构如下:
"_source": {
"eventdata": {
"ad_time": "20140430 142448",
}}
那么在匹配的时候需要使用全路径,match => [“[eventdata][ad_time]”, “YYYYMMdd HHmmss”]。

几个重要的配置如下:
match:
match => [“timestamp”, “dd/MMM/yyyy:HH:mm:ss Z”]
其中timestamp为正则匹配出的字段,dd/MMM/yyyy:HH:mm:ss Z为匹配的格式。如果匹配不上的话会添加一个"_dateparsefailure"的tag。
例如日志中的时间为10/Jul/2016:07:52:14 +0800,而我的date配置为:
date {
match => [“timestamp”, “dd/MMM/yyyy”]
target => "@timestamp"
"locale" => "en"
}
那么就会报错。
"tags" => [
[0] "192.168.192.120",
[1] "_dateparsefailure"
],
tag_on_failure:
如果匹配不上的时候,添加到tags中的内容,默认为[“_dateparsefailure”],此处我修改一下。”tag_on_failure” => [“date_test”]
来看一下tags
"tags" => [
[0] "192.168.192.120",
[1] "date_test"
],
target:
需要替换的字段,默认为@timestamp。

logstash filters/grok 插件

GROK在线调试:http://grokdebug.herokuapp.com/
Grok 是 Logstash 最重要的插件,对于一些格式不规范的日志,可以通过正则自定义输出字段。
对于下面这条日志:
55.3.244.1 GET /index.html 15824 0.043
可以这样解析:

filter {
grok {
match=>{ "message"=>"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}

得到的结果为:

[root@server120 log]# /opt/logstash/bin/logstash -f /opt/logstash/etc/test.conf
Settings: Default pipeline workers: 8
Pipeline main started
{
"message" => "55.3.244.1 GET /index.html 15824 0.043",
"@version" => "1",
"@timestamp" => "2016-07-12T08:03:57.423Z",
"path" => "/tmp/log/2016-7-11.log",
"host" => "0.0.0.0",
"client" => "55.3.244.1",
"method" => "GET",
"request" => "/index.html",
"bytes" => "15824",
"duration" => "0.043"
}

grok提供了哪些SYNTAX?可以查看文件grok-patterns,它默认放在路径/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-0.3.0/patterns下。
例如上面例子中的IP正则,我们把patterns下的所有文件合并输出到一个文件中,然后查找下IP,如下所示:

IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})

可以看到IP包括了IPV4和IPV6的正则。
WORD的正则:
WORD \b\w+\b

假设现在要匹配一个正则表达式为regexp的字符串,而grok预定义的SYNTAX都不满足,也可以自己定义一个SYNTAX

自定义SYNTAX 方式有两种:
(1)匿名SYNTAX
将%{SYNTAX:SEMANTIC} 写为(?<SEMANTIC>regexp)
(2)命名SYNTAX
在dir下创建一个文件,文件名随意
将dir加入grok路径: patterns_dir => “./dir”
将想要增加的SYNTAX写入: SYNTAX_NAME regexp
使用方法和使用默认SYNTAX相同:%{SYNTAX_NAME:SEMANTIC}

另外如果你把 “message” 里所有的信息都 grok 到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用 remove_field 参数来删除掉 message 字段只保留最重要的部分。
filter {
grok {
patterns_dir => “/path/to/your/own/patterns”
match => {
“message” => “%{SYSLOGBASE} %{DATA:message}”
}
overwrite => [“message”]
}
}
我们来找一条Nginx的日志,如下所示:
223.117.212.182 – – [09/Jul/2016:23:58:52 +0800] “GET /ssl.gif HTTP/1.1” 200 43 “http://www.xxoo.com/” “Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0)”
那么匹配的正则如下:

grok {
match=>{ "message"=>"%{IPORHOST:client} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} %{NUMBER:bytes} \"(%{GREEDYDATA:referrer}|-)\" \"(%{GREEDYDATA:agent}|-)\"" }
}

需要注意的是符号” [ ]都需要转义。