Zookeeper安装及在Kafka中的应用

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

安装步骤:
1)解压zookeeper-3.4.9.tar.gz
2)cp zoo_sample.cfg zoo.cfg
将 zoo_sample.cfg 改名为 zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。
创建data目录
mkdir -p /opt/zookeeper/data
修改zoo.cfg配置:
dataDir=/opt/zookeeper/data
server.1=172.16.100.168:2888:3888
server.2=172.16.100.170:2888:3888
具体配置如下:
dataDir:数据目录
dataLogDir:日志目录
clientPort:客户端连接端口
tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:Zookeeper的Leader 接受客户端(Follower)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒+
syncLimit:表示 Leader 与 Follower 之间发送消息时请求和应答时间长度,最长不能超过多少个tickTime 的时间长度,总的时间长度就是 2*2000=4 秒。
server.A=B:C:D:其中A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

接下来在dataDir所指定的目录下创建一个文件名为myid的文件,文件中的内容只有一行,为本主机对应的id值,也就是上图中server.id中的id。例如:在服务器1中的myid的内容应该写入1。

启动与停止
启动:
/opt/zookeeper/bin/zkServer.sh start
停止:
/opt/zookeeper/bin/zkServer.sh stop
查看状态:
leader:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader
follower:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
通过/root/zookeeper-3.4.9/bin/zkCli.sh可以管理zookeeper
Broker注册
Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点会保存对应broker的IP以及端口等信息.
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[3, 2, 1]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1474199602586","endpoints":[“PLAINTEXT://10.200.1.111:9092”],"host":"10.200.1.111","version":3,"port":9092}
cZxid = 0x200000063
ctime = Sun Sep 18 19:53:22 CST 2016
mZxid = 0x200000063
mtime = Sun Sep 18 19:53:22 CST 2016
pZxid = 0x200000063
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1573c8a27d00005
dataLength = 135
numChildren = 0
Topic注册
在kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,根节点路径为/brokers/topics,每个topic都会在topics下建立独立的子节点,每个topic节点下都会包含分区以及broker的对应信息,例如下图中的状态
查看topic
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[oplog]
查看分区情况
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/oplog/partitions
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
查看分区在topic上的分布情况
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/oplog/partitions/1/state   
{"controller_epoch":14,"leader":1,"version":1,"leader_epoch":1,"isr":[1]}
cZxid = 0x30000002c
ctime = Mon Sep 19 20:10:28 CST 2016
mZxid = 0x3000013ef
mtime = Tue Sep 20 09:09:24 CST 2016
pZxid = 0x30000002c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
其中isr表示分区在哪个broker节点上

kafka有消费者分组的概念,每个分组中可以包含多个消费者,每条消息只会发给分组中的一个消费者,且每个分组之间是相互独立互不影响的。
消费者与分区的对应关系
在kafka的设计中规定,对于topic的每个分区,最多只能被一个消费者进行消费,也就是消费者与分区的关系是一对多的关系。消费者与分区的关系也被存储在zookeeper中
节点的路劲为 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id],该节点的内容就是消费者的Consumer ID
[zk: localhost:2181(CONNECTED) 29] ls /consumers/logstash/owners/oplog
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
[zk: localhost:2181(CONNECTED) 30] get /consumers/logstash/owners/oplog/1
logstash_109-0
cZxid = 0x30003be29
ctime = Tue Sep 20 16:57:58 CST 2016
mZxid = 0x30003be29
mtime = Tue Sep 20 16:57:58 CST 2016
pZxid = 0x30003be29
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2574250b30600a9
dataLength = 14
numChildren = 0
可以看到分区1对应的消费者为logstash_109-0(logstash为Group,109为consumer ID,0为109的启动的线程号)
消费者负载均衡
消费者服务启动时,会创建一个属于消费者节点的临时节点,节点的路径为 /consumers/[group_id]/ids/[consumer_id],该节点的内容是该消费者订阅的Topic信息。
每个消费者会对/consumers/[group_id]/ids节点注册Watcher监听器,一旦消费者的数量增加或减少就会触发消费者的负载均衡。
查看目前工作的消费者
[zk: localhost:2181(CONNECTED) 31] ls /consumers/logstash/ids
[logstash_110, logstash_111, logstash_109]

消费者的offset
zookeeper记录的消费者的offset
格式为/consumers/[group_id]/offsets/[topic]/[part_id]
[zk: localhost:2181(CONNECTED) 28] get /consumers/logstash/offsets/oplog/1
705216
cZxid = 0x3000004b2
ctime = Tue Sep 20 08:36:33 CST 2016
mZxid = 0x30003d433
mtime = Tue Sep 20 17:03:51 CST 2016
pZxid = 0x3000004b2
cversion = 0
dataVersion = 9107
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
可以看到消费者的offset为705216