分类目录归档:数据分析

Logstash从grok到5.X版本的dissect

grok 作为 Logstash 最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数时候,日志格式并没有那么复杂,Logstash 开发团队在 5.0 版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用 dissect 插件更快的完成解析工作。

检查所有插件
/your/logstash/path/bin/logstash-plugin list
如果没有,安装logstash-filter-dissect
/your/logstash/path/bin/logstash-plugin install logstash-filter-dissect
例如:

filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
        }
        convert_datatype => {
            pid => "int"
        }
    }
}

语法解释:
我们看到上面使用了和 Grok 很类似的 %{} 语法来表示字段,这显然是基于习惯延续的考虑。不过示例中 %{+ts} 的加号就不一般了。dissect 除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:
● %{+key} 这个 + 表示,前面已经捕获到一个 key 字段了,而这次捕获的内容,自动添补到之前 key 字段内容的后面。
● %{+key/2} 这个 /2 表示,在有多次捕获内容都填到 key 字段里的时候,拼接字符串的顺序谁前谁后。/2 表示排第 2 位。
● %{?string} 这个 ? 表示,这块只是一个占位,并不会实际生成捕获字段存到 Event 里面。
● %{?string} %{&string} 当同样捕获名称都是 string,但是一个 ? 一个 & 的时候,表示这是一个键值对。

比如对 http://rizhiyi.com/index.do?id=123 写这么一段配置:
http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
则最终生成的 Event 内容是这样的:

{
  domain => "rizhiyi.com",
  id => "123"
}

 

实际测试:
解析Nginx日志格式如下:

127.0.0.1 - - [24/May/2017:14:24:34 +0800] \"POST /ws-sale/shopJsonService HTTP/1.1\" 200 1139 \"-\" \"Apache CXF 2.7.0\" 0.002 0.002 \"-\"

发现如果出现了\”,会解析错误,例如:

    dissect {
        mapping => {
            "message" => "%{ipaddress} - - [%{timestamp}] \"%{verb} %{url} HTTP/%{?http_version}\" %{code} %{bytes} \"%{referrer}\" \"%{agent}\" %{} %{} \"%{cookie}\""
        }
    }

在网上也查到了相关文章:https://github.com/logstash-plugins/logstash-filter-dissect/issues/10
需要替换为单引号

    dissect {
        mapping => {
            "message" => '%{ipaddress} - - [%{timestamp}] "%{verb} %{url} HTTP/%{?http_version}" %{code} %{bytes} "%{referrer}" "%{agent}" %{} %{} "%{cookie}"'
        }
    }

另外发现最后一个字段是cookie,但是拿出来的值最后都有\”,例如如果cookie为空,则日志中会记录为-,而logstash打印出来是”cookie” => “-\””
这里用ruby去掉

event.set('cookie',event.get('cookie').chop)

另外使用ruby分割客户端IP和代理IP,uri和参数的时候,发现ruby code的用法在5.3也改了。
之前在2.3的时候,用法如下:

    ruby {
        code => "
                event['uri'] = event['url'].split('?')[0]
                event['parameter'] = event['url'].split('?')[1]
        "
    }

5.3版本报错如下:

[2017-05-24T16:03:14,482][ERROR][logstash.filters.ruby    ] Ruby exception occurred: Direct event field references (i.e. event['field']) have been disabled in favor of using event get and set methods (e.g. event.get('field')). Please consult the Logstash 5.0 breaking changes documentation for more details.

修改为:

    ruby {
        code => "
            event.set('uri',event.get('url').split('?')[0])
            event.set('parameter',event.get('url').split('?')[1])
        "
    }

最终配置:

filter{
    json {
        source => "message"
    }
    dissect {
        mapping => {
            "message" => '%{ipaddress} - - [%{timestamp}] "%{verb} %{url} HTTP/%{?http_version}" %{code} %{bytes} "%{referrer}" "%{agent}" %{} %{} "%{cookie}"'
        }
    }
    ruby {
        code => "
            event.set('uri',event.get('url').split('?')[0])
            event.set('parameter',event.get('url').split('?')[1])
            event.set('client',event.get('ipaddress').split(', ')[0])
            event.set('proxy',event.get('ipaddress').split(', ')[1..5])
            event.set('cookie',event.get('cookie').chop)
        "
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss +0800"]
        target => "@timestamp"
        "locale" => "en"
        timezone => "UTC"
    }  
}

Elasticsearch聚合之Date Histogram聚合

Date histogram的用法与histogram差不多,只不过区间上支持了日期的表达式。

{
"aggs":{
    "articles_over_time":{
        "date_histogram":{
            "field":"date",
            "interval":"month"
            }
        }
    }
}

interval字段支持多种关键字:`year`, `quarter`, `month`, `week`, `day`, `hour`, `minute`, `second`

当然也支持对这些关键字进行扩展使用,比如一个半小时可以定义成如下:

{
    "aggs":{
        "articles_over_time":{
            "date_histogram":{
                "field":"date",
                "interval":"1.5h"
                }
            }
        }
}

返回的结果可以通过设置format进行格式化:

{
    "aggs":{
        "articles_over_time":{
            "date_histogram":{
                "field":"date",
                "interval":"1M",
                "format":"yyyy-MM-dd"
                }
            }
        }
}

得到的结果如下:

{
    "aggregations":{
        "articles_over_time":{
            "buckets":[{
                "key_as_string":"2013-02-02",
                "key":1328140800000,
                "doc_count":1
            },{
                "key_as_string":"2013-03-02",
                "key":1330646400000,
                "doc_count":2
            },
            ...
            ]}
        }
}

其中key_as_string是格式化后的日期,key显示了是日期时间戳,

time_zone时区的用法
在es中日期支持时区的表示方法,这样就相当于东八区的时间。

{
    "aggs":{
        "by_day":{
            "date_histogram":{
                "field":"date",
                "interval":"day",
                "time_zone":"+08:00"
            }
        }
    }
}

offset 使用偏移值,改变时间区间
默认情况是从凌晨0点到午夜24:00,如果想改变时间区间,可以通过下面的方式,设置偏移值:

{"aggs":{
    "by_day":{
        "date_histogram":{
            "field":"date",
            "interval":"day",
            "offset":"+6h"
            }
        }
    }
}

那么桶的区间就改变为:

"aggregations":{
    "by_day":{
        "buckets":[{
            "key_as_string":"2015-09-30T06:00:00.000Z",
            "key":1443592800000,
            "doc_count":1
        },{
            "key_as_string":"2015-10-01T06:00:00.000Z",
            "key":1443679200000,
            "doc_count":1
        }]
    }
}

Missing Value缺省字段
当遇到没有值的字段,就会按照缺省字段missing value来计算:

{
    "aggs":{
        "publish_date":{
            "date_histogram":{
                "field":"publish_date",
                "interval":"year",
                "missing":"2000-01-01"
            }
        }
    }
}

公司使用Elasticsearch统计CDN日志计算95峰值带宽的搜索语句

{
  "size": 0,
  "aggs": {
    "articles_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "5m",
        "time_zone": "Asia/Shanghai",
        "format": "yyyy-MM-dd HH:mm"
      },
      "aggs": {
        "1": {
          "sum": {
            "field": "bytes"
          }
        }
      }
    }
  }
}

logstash filter/geoip 插件

GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

配置如下:
geoip {
source => “client”
#database => “/var/geoip/GeoLiteCity.dat” 如果没有设置这个参数,默认使用GeoLiteCity数据库。
}
解析后的Geoip地址数据,默认是geoip这个字段。
logstash1.4.2下搜索geoip库
[root@web40 etc]# locate GeoLiteCity.dat
/web/logstash/logstash-1.4.2/vendor/geoip/GeoLiteCity.dat
但是在logstash-2.3.3中没有搜到GeoLiteCity.dat。
在logstash-2.3.3中运行结果如下:

GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的。
例如:
fields => [“country_name”, “city_name”, “real_region_name”, “latitude”, “longitude”]
那么结果如下:

geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip字段。如果使用内网地址,会发现没有对应输出!

geoip包更新方式:
1)直接下载geoip包
https://dev.maxmind.com/geoip/legacy/geolite/
每个月的第一个周二会更新一次。
下载最新http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
放到/soft/GeoLiteCity.dat,配置如下:
geoip {
source => “client”
#fields => [“country_name”, “city_name”, “real_region_name”, “latitude”, “longitude”]
database => “/soft/GeoLiteCity.dat”
}
2)下载更新程序更新:
https://dev.maxmind.com/geoip/geoipupdate/
下载geoipupdate-2.2.2.tar.gz并安装。
[root@localhost geoipupdate-2.2.2]# vim /usr/local/etc/GeoIP.conf
改为
# The following UserId and LicenseKey are required placeholders:
UserId 999999
LicenseKey 000000000000

# Include one or more of the following ProductIds:
# * GeoLite2-City – GeoLite 2 City
# * GeoLite2-Country – GeoLite2 Country
# * GeoLite-Legacy-IPv6-City – GeoLite Legacy IPv6 City
# * GeoLite-Legacy-IPv6-Country – GeoLite Legacy IPv6 Country
# * 506 – GeoLite Legacy Country
# * 517 – GeoLite Legacy ASN
# * 533 – GeoLite Legacy City
ProductIds GeoLite2-City GeoLite2-Country GeoLite-Legacy-IPv6-City GeoLite-Legacy-IPv6-Country 506 517 533

执行程序:
[root@localhost geoipupdate-2.2.2]# /usr/local/bin/geoipupdate -d /soft/geoip/
下载包的内容:
GeoIPv6.dat
GeoLite2-City.mmdb
GeoLite2-Country.mmdb
GeoLiteASNum.dat
GeoLiteCity.dat
GeoLiteCityv6.dat
GeoLiteCountry.dat

logstash 解码 %uxxxx

想通过logstash从cookie中获取username做关联分析。

然而发现提取出来的username为%u6E38%u5BA2(游客)
尝试使用urldecode发现不能解码。
例如配置文件如下:

 input {
    stdin {
    }
}
filter{
    urldecode {
        field => "message"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

测试结果:

可以使用ruby解码。

 ruby {
     code => "
         # urldecode non-standard %uXXXX type of string
         ['cs_uri_query', 'cs_cookie', 'cs_referer'].each { |field|
             if event[field] and event[field].include? '%u'
                 event[field] = event[field].gsub(/%u([0-9A-F]{4})/i){$1.hex.chr(Encoding::UTF_8)}.strip
             end
         }
     "
 }

然后看下解析结果

参考文章:
https://discuss.elastic.co/t/how-to-urldecode-uxxxx-type-of-strings/27718/3

Logstash由于时区导致8小时时差解决方案

Logstash 2.3版本


logstash的date插件配置:

date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}

查看解析结果发现@timestamp比中国时间早了8小时

对于页面查看,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
解决方案找到两个
1、

vim vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.3-java/lib/logstash/timestamp.rb

把@time = time.utc 改成time即可

2、
这种办法与Linux服务器的时区设置有关系,有些Linux可能修改不成功,推荐用第一种。
http://www.aichengxu.com/view/6621766
1)修改logstash配置

date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
"locale" => "en"
timezone => "+00:00"
}

添加 timezone => “+00:00”
然后测试@timestamp就是正常的时间了,@timestamp和timestamp是一致的。
2)因为kibana会读取浏览器的时区,然后+8小时,所以需要修改kibana的配置。
Settings – Advanced – dateFormat:tz 修改为UTC

Elasticsearch索引创建错误
修改之后发现logstash在按每天输出到elasticsearch时,每天8:00才创建当天索引,而8:00以前数据还是会输出到昨天的索引,如图:

解决办法如下:

vim ./vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.3-java/lib/logstash/string_interpolation.rb
.withZone(org.joda.time.DateTimeZone::UTC)

修改为

.withZone(org.joda.time.DateTimeZone.getDefault())

Logstash5.3版本


解决8小时时差问题,配置如下:

    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss +0800"]
        target => "@timestamp"
        "locale" => "en"
        timezone => "UTC"
    }

Kafka日志存储解析

kafka是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。 
Kafka的名词解释 
1,Broker : 一个单独的kafka机器节点就称为一个broker,多个broker组成的集群,称为kafka集群
2,Topic   :类似数据库中的一个表,我们将数据存储在Topic里面,当然这只是逻辑上的,在物理上,一个Topic 可能被多个Broker分区存储,这对用户是透明的,用户只需关注消息的产生于消费即可 
3,Partition:类似分区表,每个Topic可根据设置将数据存储在多个整体有序的Partition中,每个顺序化partition会生成2个文件,一个是index文件一个是log文件,index文件存储索引和偏移量,log文件存储具体的数据 
4,Producer:生产者,向Topic里面发送消息的角色 
5,Consumer:消费者,从Topic里面读取消息的角色 
6,Consumer Group:每个Consumer属于一个特定的消费者组,可为Consumer指定group name,如果不指定默认属于group 

安装配置:
启动zookeeper集群
/root/zookeeper-3.4.9/bin/zkServer.sh start

创建kafka日志目录
mkdir /data/kafka

vim /root/kafka_2.10-0.10.0.0/config/server.properties
修改broker.id为唯一
添加host.name=IP
修改log.dirs=/data/kafka
修改zookeeper.connect=10.200.1.111:2181,10.200.1.109:2181,10.200.1.110:2181
#添加过期时间,每小时新建一个分段,删除2小时前的分段
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0

启动kafka
/root/kafka_2.10-0.10.0.0/bin/kafka-server-start.sh /root/kafka_2.10-0.10.0.0/config/server.properties & >/dev/null 2>&1

创建Topic的命令:
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic oplog
Created topic "test".
replication-factor表示备份数
partitions表示分区数量,分区数量需要根据消费者来定,每个分区只能由一个消费者,比如三台logstash,每台logstash上启5个线程,所以一共15个消费者。可以配置16个分区。 
查看Topic列表
[root@template kafka_2.10-0.10.0.0]#  bin/kafka-topics.sh –list –zookeeper localhost:2181
__consumer_offsets
oplog
删除Topic
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –delete –zookeeper localhost:2181 –topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

日志存储
Kafka的data是保存在文件系统中的。Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition,每个topic有几个partition是在创建topic时指定的,每个partition存储一部分Message。
partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。
zookeeper会将分区平均分配创建到不同的broker上,例如
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic 666
Topic:666    PartitionCount:5    ReplicationFactor:1    Configs:
    Topic: 666    Partition: 0    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: 666    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
    Topic: 666    Partition: 3    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 4    Leader: 1    Replicas: 1    Isr: 1
Isr表示分区创建在哪个broker上。
Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:
offset
MessageSize
data
其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。
Kafka通过分段和索引的方式来提高查询效率
1)分段

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
[root@template logstash-2.3.3]# ll /data/kafka/oplog-11/
总用量 328472
-rw-r–r–. 1 root root    265608 9月  22 12:48 00000000000007296468.index
-rw-r–r–. 1 root root 164713782 9月  22 12:48 00000000000007296468.log
-rw-r–r–. 1 root root    273472 9月  22 13:48 00000000000007459414.index
-rw-r–r–. 1 root root 168925074 9月  22 13:48 00000000000007459414.log
-rw-r–r–. 1 root root  10485760 9月  22 13:49 00000000000007626514.index
-rw-r–r–. 1 root root   2155779 9月  22 13:49 00000000000007626514.log
2)为数据文件建索引
数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。 
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
 

kafka Increasing replication factor

一开始创建Topic的时候

/root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 16 --topic oplog

现在需求是增加replication-factor为3,使用kafka自带的脚本kafka-reassign-partitions.sh
首先看一下增加replication-factor前的Topic详情:

[root@template ~]# /root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oplog
Topic:oplog    PartitionCount:16    ReplicationFactor:1    Configs:
    Topic: oplog    Partition: 0    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 3    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 4    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 5    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 6    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 7    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 8    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 9    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 10    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 11    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 12    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 13    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 14    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 15    Leader: 3    Replicas: 3    Isr: 3

先将分区partitions 0 增加两个备份。

[root@template bin]# vim increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"oplog","partition":0,"replicas":[1,2,3]}]}
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

然后可以–verify来检查修改后的状态

[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [oplog,0] completed successfully

同样可以使用describe查看

[root@template bin]# /root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oplog
Topic:oplog    PartitionCount:16    ReplicationFactor:3    Configs:
    Topic: oplog    Partition: 0    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2

然后修改increase-replication-factor.json

{"version":1,
 "partitions":[{"topic":"oplog","partition":1,"replicas":[1,2,3]},
               {"topic":"oplog","partition":2,"replicas":[1,2,3]},
               {"topic":"oplog","partition":3,"replicas":[1,2,3]},
               {"topic":"oplog","partition":4,"replicas":[1,2,3]},
               {"topic":"oplog","partition":5,"replicas":[1,2,3]},
               {"topic":"oplog","partition":6,"replicas":[1,2,3]},
               {"topic":"oplog","partition":7,"replicas":[1,2,3]},
               {"topic":"oplog","partition":8,"replicas":[1,2,3]},
               {"topic":"oplog","partition":9,"replicas":[1,2,3]},
               {"topic":"oplog","partition":10,"replicas":[1,2,3]},
               {"topic":"oplog","partition":11,"replicas":[1,2,3]},
               {"topic":"oplog","partition":12,"replicas":[1,2,3]},
               {"topic":"oplog","partition":13,"replicas":[1,2,3]},
               {"topic":"oplog","partition":14,"replicas":[1,2,3]},
               {"topic":"oplog","partition":15,"replicas":[1,2,3]}]}
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [oplog,14] completed successfully
Reassignment of partition [oplog,10] completed successfully
Reassignment of partition [oplog,3] completed successfully
Reassignment of partition [oplog,1] completed successfully
Reassignment of partition [oplog,11] completed successfully
Reassignment of partition [oplog,6] completed successfully
Reassignment of partition [oplog,2] completed successfully
Reassignment of partition [oplog,5] completed successfully
Reassignment of partition [oplog,9] completed successfully
Reassignment of partition [oplog,4] completed successfully
Reassignment of partition [oplog,12] completed successfully
Reassignment of partition [oplog,7] completed successfully
Reassignment of partition [oplog,13] completed successfully
Reassignment of partition [oplog,15] completed successfully
Reassignment of partition [oplog,8] completed successfully

elasticsearch集群搭建

Nginx日志收集服务器的集群配置如下,其中10.59.0.248为集群主,不存储数据。10.59.0.116和10.59.0.138作为数据节点。
10.59.0.248:

network.bind_host: 0
http.cors.enabled: true
cluster.name: logstash_sec
node.name: node-1
node.master: true
node.data: false
discovery.zen.ping.unicast.hosts: ["10.59.0.138","10.59.0.248","10.59.0.116"]
network.publish_host: 10.59.0.248
path.data: /data/

10.59.0.116:

network.bind_host: 0
http.cors.enabled: true
cluster.name: logstash_sec
node.name: node-3
node.master: false
node.data: true
discovery.zen.ping.unicast.hosts: ["10.59.0.138","10.59.0.248","10.211.0.116"]
network.publish_host: 10.59.0.116
path.data: /data,/backup

10.59.0.138:

network.bind_host: 0
http.cors.enabled: true
cluster.name: logstash_sec
node.name: node-2
node.master: false
node.data: true
discovery.zen.ping.unicast.hosts: ["10.59.0.248","10.59.0.138","10.59.0.116"]
network.publish_host: 10.59.0.138
path.data: /data/data/

然后分别启动,可以使用Head插件查看集群的状态
http://10.59.0.248:9200/_plugin/head/

123

先看集群的状态,集群的状态是”green”,颜色分别表示为:
1 绿色,最健康的状态,代表所有的分片包括备份都可用
2 黄色,基本的分片可用,但是备份不可用(也可能是没有备份)
3 红色,部分的分片可用,表明分片有一部分损坏。此时执行查询部分数据仍然可以查到,遇到这种情况,还是赶快解决比较好。
elasticsearch集群一旦建立起来以后,会选举出一个master,其他都为slave节点。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。
elasticsearch关于分区和备份的默认配置为:
index.number_of_shards:5
设置默认索引分片个数,默认为5片。
index.number_of_replicas:1
设置默认索引副本个数,默认为1个副本。
看上面的图片可以看出集群中存在三个Elasticsearch,其中五角星表示主节点,圆表示工作节点。
其次就是索引存储的分配,0~4是5个分片,是默认设置的,因为默认存在一份备份的,所以一天的索引就会有十个分片。仔细看看,对于有的分片周围是粗的线,点击查看发现其实是primary,是true就代表是主数据,检索的时候从这里拿,如果是false的时候就代表是backup的数据。
另外集群的状态和节点也可以通过REST API获取
集群的状态:

[root@localhost ~]#  curl localhost:9200/_cat/health?v
epoch      timestamp cluster      status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent 
1492130477 08:41:17  logstash_sec green           3         2     17  16    0    0        0             0                  -                100.0% 

节点Node:

[root@localhost ~]# curl localhost:9200/_cat/nodes?v
host        ip          heap.percent ram.percent load node.role master name   
10.59.0.116 10.59.0.116           42          70 0.24 d         -      node-3 
10.59.0.248 10.59.0.248            5          25 2.84 -         *      node-1 
10.59.0.138 10.59.0.138           10          99 1.26 d         -      node-2

PS:部署的时候碰见报错failed to send join request to master
原因是没有配置network.publish_host

elasticsearch.yml集群配置参数详解

重要的名词:
cluster 
代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。 
shards 
代表索引分片,es可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上。构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。 
replicas 
代表索引副本,es可以设置多个索引的副本,副本的作用一是提高系统的容错性,当个某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡。 
recovery 
代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。 
river 
代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的,river这个功能将会在后面的文件中重点说到。 
gateway 
代表es索引的持久化存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到硬盘。当这个es集群关闭再重新启动时就会从gateway中读取索引数据。es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。 
discovery.zen 
代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。 
Transport 
代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、memcached、zeroMQ等的传输协议(通过插件方式集成)。
具体的配置:
cluster.name:elasticsearch
配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。

node.name:"FranzKafka"
节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。

node.master:true
指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。

node.data:true
指定该节点是否存储索引数据,默认为true。

index.number_of_shards:5
设置默认索引分片个数,默认为5片。

index.number_of_replicas:1
设置默认索引副本个数,默认为1个副本。

path.conf:/path/to/conf
设置配置文件的存储路径,默认是es根目录下的config文件夹。

path.data:/path/to/data
设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
path.data:/path/to/data1,/path/to/data2

path.work:/path/to/work
设置临时文件的存储路径,默认是es根目录下的work文件夹。

path.logs:/path/to/logs
设置日志文件的存储路径,默认是es根目录下的logs文件夹

path.plugins:/path/to/plugins
设置插件的存放路径,默认是es根目录下的plugins文件夹

bootstrap.mlockall:true
设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit-l unlimited`命令。

network.bind_host:192.168.0.1
设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。

network.publish_host:192.168.0.1
设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。

network.host:192.168.0.1
这个参数是用来同时设置bind_host和publish_host上面两个参数。

transport.tcp.port:9300
设置节点间交互的tcp端口,默认是9300。

transport.tcp.compress:true
设置是否压缩tcp传输时的数据,默认为false,不压缩。

http.port:9200
设置对外服务的http端口,默认为9200。

http.max_content_length:100mb
设置内容的最大容量,默认100mb

http.enabled:false
是否使用http协议对外提供服务,默认为true,开启。

gateway.type:local
gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。

gateway.recover_after_nodes:1
设置集群中N个节点启动时进行数据恢复,默认为1。

gateway.recover_after_time:5m
设置初始化数据恢复进程的超时时间,默认是5分钟。

gateway.expected_nodes:2
设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。

cluster.routing.allocation.node_initial_primaries_recoveries:4
初始化数据恢复时,并发恢复线程的个数,默认为4。

cluster.routing.allocation.node_concurrent_recoveries:2
添加删除节点或负载均衡时并发恢复线程的个数,默认为4。

indices.recovery.max_size_per_sec:0
设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。

indices.recovery.concurrent_streams:5
设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。

discovery.zen.minimum_master_nodes:1
设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)

discovery.zen.ping.timeout:3s
设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。

discovery.zen.ping.multicast.enabled:false
设置是否打开多播发现节点,默认是true。

discovery.zen.ping.unicast.hosts:[“host1”, “host2:port”,”host3[portX-portY]”]
设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点

Zookeeper安装及在Kafka中的应用

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

安装步骤:
1)解压zookeeper-3.4.9.tar.gz
2)cp zoo_sample.cfg zoo.cfg
将 zoo_sample.cfg 改名为 zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。
创建data目录
mkdir -p /opt/zookeeper/data
修改zoo.cfg配置:
dataDir=/opt/zookeeper/data
server.1=172.16.100.168:2888:3888
server.2=172.16.100.170:2888:3888
具体配置如下:
dataDir:数据目录
dataLogDir:日志目录
clientPort:客户端连接端口
tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:Zookeeper的Leader 接受客户端(Follower)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒+
syncLimit:表示 Leader 与 Follower 之间发送消息时请求和应答时间长度,最长不能超过多少个tickTime 的时间长度,总的时间长度就是 2*2000=4 秒。
server.A=B:C:D:其中A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

接下来在dataDir所指定的目录下创建一个文件名为myid的文件,文件中的内容只有一行,为本主机对应的id值,也就是上图中server.id中的id。例如:在服务器1中的myid的内容应该写入1。

启动与停止
启动:
/opt/zookeeper/bin/zkServer.sh start
停止:
/opt/zookeeper/bin/zkServer.sh stop
查看状态:
leader:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader
follower:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
通过/root/zookeeper-3.4.9/bin/zkCli.sh可以管理zookeeper
Broker注册
Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点会保存对应broker的IP以及端口等信息.
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[3, 2, 1]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1474199602586","endpoints":[“PLAINTEXT://10.200.1.111:9092”],"host":"10.200.1.111","version":3,"port":9092}
cZxid = 0x200000063
ctime = Sun Sep 18 19:53:22 CST 2016
mZxid = 0x200000063
mtime = Sun Sep 18 19:53:22 CST 2016
pZxid = 0x200000063
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1573c8a27d00005
dataLength = 135
numChildren = 0
Topic注册
在kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,根节点路径为/brokers/topics,每个topic都会在topics下建立独立的子节点,每个topic节点下都会包含分区以及broker的对应信息,例如下图中的状态
查看topic
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[oplog]
查看分区情况
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/oplog/partitions
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
查看分区在topic上的分布情况
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/oplog/partitions/1/state   
{"controller_epoch":14,"leader":1,"version":1,"leader_epoch":1,"isr":[1]}
cZxid = 0x30000002c
ctime = Mon Sep 19 20:10:28 CST 2016
mZxid = 0x3000013ef
mtime = Tue Sep 20 09:09:24 CST 2016
pZxid = 0x30000002c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
其中isr表示分区在哪个broker节点上

kafka有消费者分组的概念,每个分组中可以包含多个消费者,每条消息只会发给分组中的一个消费者,且每个分组之间是相互独立互不影响的。
消费者与分区的对应关系
在kafka的设计中规定,对于topic的每个分区,最多只能被一个消费者进行消费,也就是消费者与分区的关系是一对多的关系。消费者与分区的关系也被存储在zookeeper中
节点的路劲为 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id],该节点的内容就是消费者的Consumer ID
[zk: localhost:2181(CONNECTED) 29] ls /consumers/logstash/owners/oplog
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
[zk: localhost:2181(CONNECTED) 30] get /consumers/logstash/owners/oplog/1
logstash_109-0
cZxid = 0x30003be29
ctime = Tue Sep 20 16:57:58 CST 2016
mZxid = 0x30003be29
mtime = Tue Sep 20 16:57:58 CST 2016
pZxid = 0x30003be29
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2574250b30600a9
dataLength = 14
numChildren = 0
可以看到分区1对应的消费者为logstash_109-0(logstash为Group,109为consumer ID,0为109的启动的线程号)
消费者负载均衡
消费者服务启动时,会创建一个属于消费者节点的临时节点,节点的路径为 /consumers/[group_id]/ids/[consumer_id],该节点的内容是该消费者订阅的Topic信息。
每个消费者会对/consumers/[group_id]/ids节点注册Watcher监听器,一旦消费者的数量增加或减少就会触发消费者的负载均衡。
查看目前工作的消费者
[zk: localhost:2181(CONNECTED) 31] ls /consumers/logstash/ids
[logstash_110, logstash_111, logstash_109]

消费者的offset
zookeeper记录的消费者的offset
格式为/consumers/[group_id]/offsets/[topic]/[part_id]
[zk: localhost:2181(CONNECTED) 28] get /consumers/logstash/offsets/oplog/1
705216
cZxid = 0x3000004b2
ctime = Tue Sep 20 08:36:33 CST 2016
mZxid = 0x30003d433
mtime = Tue Sep 20 17:03:51 CST 2016
pZxid = 0x3000004b2
cversion = 0
dataVersion = 9107
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
可以看到消费者的offset为705216