Kafka日志存储解析

kafka是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。 
Kafka的名词解释 
1,Broker : 一个单独的kafka机器节点就称为一个broker,多个broker组成的集群,称为kafka集群
2,Topic   :类似数据库中的一个表,我们将数据存储在Topic里面,当然这只是逻辑上的,在物理上,一个Topic 可能被多个Broker分区存储,这对用户是透明的,用户只需关注消息的产生于消费即可 
3,Partition:类似分区表,每个Topic可根据设置将数据存储在多个整体有序的Partition中,每个顺序化partition会生成2个文件,一个是index文件一个是log文件,index文件存储索引和偏移量,log文件存储具体的数据 
4,Producer:生产者,向Topic里面发送消息的角色 
5,Consumer:消费者,从Topic里面读取消息的角色 
6,Consumer Group:每个Consumer属于一个特定的消费者组,可为Consumer指定group name,如果不指定默认属于group 

安装配置:
启动zookeeper集群
/root/zookeeper-3.4.9/bin/zkServer.sh start

创建kafka日志目录
mkdir /data/kafka

vim /root/kafka_2.10-0.10.0.0/config/server.properties
修改broker.id为唯一
添加host.name=IP
修改log.dirs=/data/kafka
修改zookeeper.connect=10.200.1.111:2181,10.200.1.109:2181,10.200.1.110:2181
#添加过期时间,每小时新建一个分段,删除2小时前的分段
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0

启动kafka
/root/kafka_2.10-0.10.0.0/bin/kafka-server-start.sh /root/kafka_2.10-0.10.0.0/config/server.properties & >/dev/null 2>&1

创建Topic的命令:
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic oplog
Created topic "test".
replication-factor表示备份数
partitions表示分区数量,分区数量需要根据消费者来定,每个分区只能由一个消费者,比如三台logstash,每台logstash上启5个线程,所以一共15个消费者。可以配置16个分区。 
查看Topic列表
[root@template kafka_2.10-0.10.0.0]#  bin/kafka-topics.sh –list –zookeeper localhost:2181
__consumer_offsets
oplog
删除Topic
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –delete –zookeeper localhost:2181 –topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

日志存储
Kafka的data是保存在文件系统中的。Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition,每个topic有几个partition是在创建topic时指定的,每个partition存储一部分Message。
partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。
zookeeper会将分区平均分配创建到不同的broker上,例如
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic 666
Topic:666    PartitionCount:5    ReplicationFactor:1    Configs:
    Topic: 666    Partition: 0    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: 666    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
    Topic: 666    Partition: 3    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 4    Leader: 1    Replicas: 1    Isr: 1
Isr表示分区创建在哪个broker上。
Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:
offset
MessageSize
data
其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。
Kafka通过分段和索引的方式来提高查询效率
1)分段

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
[root@template logstash-2.3.3]# ll /data/kafka/oplog-11/
总用量 328472
-rw-r–r–. 1 root root    265608 9月  22 12:48 00000000000007296468.index
-rw-r–r–. 1 root root 164713782 9月  22 12:48 00000000000007296468.log
-rw-r–r–. 1 root root    273472 9月  22 13:48 00000000000007459414.index
-rw-r–r–. 1 root root 168925074 9月  22 13:48 00000000000007459414.log
-rw-r–r–. 1 root root  10485760 9月  22 13:49 00000000000007626514.index
-rw-r–r–. 1 root root   2155779 9月  22 13:49 00000000000007626514.log
2)为数据文件建索引
数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。 
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。