标签归档:kafka

Kafka日志存储解析

kafka是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。 
Kafka的名词解释 
1,Broker : 一个单独的kafka机器节点就称为一个broker,多个broker组成的集群,称为kafka集群
2,Topic   :类似数据库中的一个表,我们将数据存储在Topic里面,当然这只是逻辑上的,在物理上,一个Topic 可能被多个Broker分区存储,这对用户是透明的,用户只需关注消息的产生于消费即可 
3,Partition:类似分区表,每个Topic可根据设置将数据存储在多个整体有序的Partition中,每个顺序化partition会生成2个文件,一个是index文件一个是log文件,index文件存储索引和偏移量,log文件存储具体的数据 
4,Producer:生产者,向Topic里面发送消息的角色 
5,Consumer:消费者,从Topic里面读取消息的角色 
6,Consumer Group:每个Consumer属于一个特定的消费者组,可为Consumer指定group name,如果不指定默认属于group 

安装配置:
启动zookeeper集群
/root/zookeeper-3.4.9/bin/zkServer.sh start

创建kafka日志目录
mkdir /data/kafka

vim /root/kafka_2.10-0.10.0.0/config/server.properties
修改broker.id为唯一
添加host.name=IP
修改log.dirs=/data/kafka
修改zookeeper.connect=10.200.1.111:2181,10.200.1.109:2181,10.200.1.110:2181
#添加过期时间,每小时新建一个分段,删除2小时前的分段
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0

启动kafka
/root/kafka_2.10-0.10.0.0/bin/kafka-server-start.sh /root/kafka_2.10-0.10.0.0/config/server.properties & >/dev/null 2>&1

创建Topic的命令:
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic oplog
Created topic "test".
replication-factor表示备份数
partitions表示分区数量,分区数量需要根据消费者来定,每个分区只能由一个消费者,比如三台logstash,每台logstash上启5个线程,所以一共15个消费者。可以配置16个分区。 
查看Topic列表
[root@template kafka_2.10-0.10.0.0]#  bin/kafka-topics.sh –list –zookeeper localhost:2181
__consumer_offsets
oplog
删除Topic
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –delete –zookeeper localhost:2181 –topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

日志存储
Kafka的data是保存在文件系统中的。Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition,每个topic有几个partition是在创建topic时指定的,每个partition存储一部分Message。
partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。
zookeeper会将分区平均分配创建到不同的broker上,例如
[root@template kafka_2.10-0.10.0.0]# bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic 666
Topic:666    PartitionCount:5    ReplicationFactor:1    Configs:
    Topic: 666    Partition: 0    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: 666    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
    Topic: 666    Partition: 3    Leader: 3    Replicas: 3    Isr: 3
    Topic: 666    Partition: 4    Leader: 1    Replicas: 1    Isr: 1
Isr表示分区创建在哪个broker上。
Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:
offset
MessageSize
data
其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。
Kafka通过分段和索引的方式来提高查询效率
1)分段

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
[root@template logstash-2.3.3]# ll /data/kafka/oplog-11/
总用量 328472
-rw-r–r–. 1 root root    265608 9月  22 12:48 00000000000007296468.index
-rw-r–r–. 1 root root 164713782 9月  22 12:48 00000000000007296468.log
-rw-r–r–. 1 root root    273472 9月  22 13:48 00000000000007459414.index
-rw-r–r–. 1 root root 168925074 9月  22 13:48 00000000000007459414.log
-rw-r–r–. 1 root root  10485760 9月  22 13:49 00000000000007626514.index
-rw-r–r–. 1 root root   2155779 9月  22 13:49 00000000000007626514.log
2)为数据文件建索引
数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。 
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
 

kafka Increasing replication factor

一开始创建Topic的时候

/root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 16 --topic oplog

现在需求是增加replication-factor为3,使用kafka自带的脚本kafka-reassign-partitions.sh
首先看一下增加replication-factor前的Topic详情:

[root@template ~]# /root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oplog
Topic:oplog    PartitionCount:16    ReplicationFactor:1    Configs:
    Topic: oplog    Partition: 0    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 3    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 4    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 5    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 6    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 7    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 8    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 9    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 10    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 11    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 12    Leader: 3    Replicas: 3    Isr: 3
    Topic: oplog    Partition: 13    Leader: 1    Replicas: 1    Isr: 1
    Topic: oplog    Partition: 14    Leader: 2    Replicas: 2    Isr: 2
    Topic: oplog    Partition: 15    Leader: 3    Replicas: 3    Isr: 3

先将分区partitions 0 增加两个备份。

[root@template bin]# vim increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"oplog","partition":0,"replicas":[1,2,3]}]}
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

然后可以–verify来检查修改后的状态

[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [oplog,0] completed successfully

同样可以使用describe查看

[root@template bin]# /root/kafka_2.10-0.10.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oplog
Topic:oplog    PartitionCount:16    ReplicationFactor:3    Configs:
    Topic: oplog    Partition: 0    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2

然后修改increase-replication-factor.json

{"version":1,
 "partitions":[{"topic":"oplog","partition":1,"replicas":[1,2,3]},
               {"topic":"oplog","partition":2,"replicas":[1,2,3]},
               {"topic":"oplog","partition":3,"replicas":[1,2,3]},
               {"topic":"oplog","partition":4,"replicas":[1,2,3]},
               {"topic":"oplog","partition":5,"replicas":[1,2,3]},
               {"topic":"oplog","partition":6,"replicas":[1,2,3]},
               {"topic":"oplog","partition":7,"replicas":[1,2,3]},
               {"topic":"oplog","partition":8,"replicas":[1,2,3]},
               {"topic":"oplog","partition":9,"replicas":[1,2,3]},
               {"topic":"oplog","partition":10,"replicas":[1,2,3]},
               {"topic":"oplog","partition":11,"replicas":[1,2,3]},
               {"topic":"oplog","partition":12,"replicas":[1,2,3]},
               {"topic":"oplog","partition":13,"replicas":[1,2,3]},
               {"topic":"oplog","partition":14,"replicas":[1,2,3]},
               {"topic":"oplog","partition":15,"replicas":[1,2,3]}]}
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
[root@template bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [oplog,14] completed successfully
Reassignment of partition [oplog,10] completed successfully
Reassignment of partition [oplog,3] completed successfully
Reassignment of partition [oplog,1] completed successfully
Reassignment of partition [oplog,11] completed successfully
Reassignment of partition [oplog,6] completed successfully
Reassignment of partition [oplog,2] completed successfully
Reassignment of partition [oplog,5] completed successfully
Reassignment of partition [oplog,9] completed successfully
Reassignment of partition [oplog,4] completed successfully
Reassignment of partition [oplog,12] completed successfully
Reassignment of partition [oplog,7] completed successfully
Reassignment of partition [oplog,13] completed successfully
Reassignment of partition [oplog,15] completed successfully
Reassignment of partition [oplog,8] completed successfully

Zookeeper安装及在Kafka中的应用

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

安装步骤:
1)解压zookeeper-3.4.9.tar.gz
2)cp zoo_sample.cfg zoo.cfg
将 zoo_sample.cfg 改名为 zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。
创建data目录
mkdir -p /opt/zookeeper/data
修改zoo.cfg配置:
dataDir=/opt/zookeeper/data
server.1=172.16.100.168:2888:3888
server.2=172.16.100.170:2888:3888
具体配置如下:
dataDir:数据目录
dataLogDir:日志目录
clientPort:客户端连接端口
tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit:Zookeeper的Leader 接受客户端(Follower)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒+
syncLimit:表示 Leader 与 Follower 之间发送消息时请求和应答时间长度,最长不能超过多少个tickTime 的时间长度,总的时间长度就是 2*2000=4 秒。
server.A=B:C:D:其中A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

接下来在dataDir所指定的目录下创建一个文件名为myid的文件,文件中的内容只有一行,为本主机对应的id值,也就是上图中server.id中的id。例如:在服务器1中的myid的内容应该写入1。

启动与停止
启动:
/opt/zookeeper/bin/zkServer.sh start
停止:
/opt/zookeeper/bin/zkServer.sh stop
查看状态:
leader:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader
follower:
[root@vincent zookeeper-3.4.9]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
通过/root/zookeeper-3.4.9/bin/zkCli.sh可以管理zookeeper
Broker注册
Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点会保存对应broker的IP以及端口等信息.
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[3, 2, 1]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1474199602586","endpoints":[“PLAINTEXT://10.200.1.111:9092”],"host":"10.200.1.111","version":3,"port":9092}
cZxid = 0x200000063
ctime = Sun Sep 18 19:53:22 CST 2016
mZxid = 0x200000063
mtime = Sun Sep 18 19:53:22 CST 2016
pZxid = 0x200000063
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1573c8a27d00005
dataLength = 135
numChildren = 0
Topic注册
在kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,根节点路径为/brokers/topics,每个topic都会在topics下建立独立的子节点,每个topic节点下都会包含分区以及broker的对应信息,例如下图中的状态
查看topic
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[oplog]
查看分区情况
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/oplog/partitions
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
查看分区在topic上的分布情况
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/oplog/partitions/1/state   
{"controller_epoch":14,"leader":1,"version":1,"leader_epoch":1,"isr":[1]}
cZxid = 0x30000002c
ctime = Mon Sep 19 20:10:28 CST 2016
mZxid = 0x3000013ef
mtime = Tue Sep 20 09:09:24 CST 2016
pZxid = 0x30000002c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
其中isr表示分区在哪个broker节点上

kafka有消费者分组的概念,每个分组中可以包含多个消费者,每条消息只会发给分组中的一个消费者,且每个分组之间是相互独立互不影响的。
消费者与分区的对应关系
在kafka的设计中规定,对于topic的每个分区,最多只能被一个消费者进行消费,也就是消费者与分区的关系是一对多的关系。消费者与分区的关系也被存储在zookeeper中
节点的路劲为 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id],该节点的内容就是消费者的Consumer ID
[zk: localhost:2181(CONNECTED) 29] ls /consumers/logstash/owners/oplog
[15, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
[zk: localhost:2181(CONNECTED) 30] get /consumers/logstash/owners/oplog/1
logstash_109-0
cZxid = 0x30003be29
ctime = Tue Sep 20 16:57:58 CST 2016
mZxid = 0x30003be29
mtime = Tue Sep 20 16:57:58 CST 2016
pZxid = 0x30003be29
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2574250b30600a9
dataLength = 14
numChildren = 0
可以看到分区1对应的消费者为logstash_109-0(logstash为Group,109为consumer ID,0为109的启动的线程号)
消费者负载均衡
消费者服务启动时,会创建一个属于消费者节点的临时节点,节点的路径为 /consumers/[group_id]/ids/[consumer_id],该节点的内容是该消费者订阅的Topic信息。
每个消费者会对/consumers/[group_id]/ids节点注册Watcher监听器,一旦消费者的数量增加或减少就会触发消费者的负载均衡。
查看目前工作的消费者
[zk: localhost:2181(CONNECTED) 31] ls /consumers/logstash/ids
[logstash_110, logstash_111, logstash_109]

消费者的offset
zookeeper记录的消费者的offset
格式为/consumers/[group_id]/offsets/[topic]/[part_id]
[zk: localhost:2181(CONNECTED) 28] get /consumers/logstash/offsets/oplog/1
705216
cZxid = 0x3000004b2
ctime = Tue Sep 20 08:36:33 CST 2016
mZxid = 0x30003d433
mtime = Tue Sep 20 17:03:51 CST 2016
pZxid = 0x3000004b2
cversion = 0
dataVersion = 9107
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
可以看到消费者的offset为705216
 

kafka查看未被消费的日志数量

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  –zookeeper localhost:2181  –group logstash

123

Offset表示已经消费的message条数
logSize表示全部的message条数
Lag表示未消费的message条数
每个分区只能有一个消费者,也就是消费者和分区之前是一对多的关系。如上图所示一共三台logstash,每台logstash上启5和线程,所以一共15个消费者,而该Topic共16个分区,可以看到logstash 109的线程0对应了0和1两个分区

kafka常见问题汇总

Kafka如何消费已经消费过的数据


consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。

原因:消费者消费了数据并不从队列中移除,只是记录了offset偏移量。同一个consumergroup的所有consumer合起来消费一个topic,并且他们每次消费的时候都会保存一个offset参数在zookeeper的root上。如果此时某个consumer挂了或者新增一个consumer进程,将会触发kafka的负载均衡,暂时性的重启所有consumer,重新分配哪个consumer去消费哪个partition,然后再继续通过保存在zookeeper上的offset参数继续读取数据。注意:offset保存的是consumer 组消费的消息偏移。
要消费同一组数据,可以采用不同的group。

partition和consumer数目关系


1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

topic 副本问题


Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。
(1) 如何分配副本
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。

(2) Kafka分配Replica的算法如下
(1)将所有Broker(假设共n个Broker)和待分配的Partition排序
(2)将第i个Partition分配到第(i mod n)个Broker上
(3)将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

如何设置生存周期与清理数据


日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).清理参数在server.properties文件中:

想实现消息队列中保存2小时的消息,那么配置应该像这样:
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0
与控制分段策略相关的几个properties:
log.roll.{hours,ms} — 日志滚动的周期时间(小时,毫秒,log.roll.ms优先级更高),到达指定周期时强制生成一个新的segment。
log.segment.bytes — 每个segment的最大容量上限(默认1GB)。到达指定容量时会强制生成一个新的segment。
与过期segment处理策略相关的几个properties:
cleanup.policy={compact,delete} — 过期segment处理算法,有两种,分别为删除和压缩,默认delete。
log.retention.{hours,minutes,ms} — 日志保留时间(小时,分钟,毫秒。优先级依次升高),超出保留时间的日志执行cleanup.policy定义的操作
log.segment.delete.delay.ms — 删除日志文件前的保留一段时间。默认60000。
log.retention.check.interval.ms — log checker的检测是否需要删除文件的周期。默认300000。

zookeeper如何管理kafka


1、Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2、Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.
3、Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

kafka能否自动创建topics


如果broker中没有topic的信息,当producer/consumer操作topic时,是否自动创建.
auto.create.topics.enable=true

kafka delete topic,marked for deletion

kafka 删除topic 提示marked for deletion
[html] view plain copy
[root@logSer config]# kafka-topics.sh –delete –zookeeper localhost:2181 –topic test-group        
Topic test-group is marked for deletion.  
Note: This will have no impact if delete.topic.enable is not set to true.  
[root@logSer config]# kafka-topics.sh –list –zookeeper localhost:2181  
test-group – marked for deletion  
test-topic  
test-user-001  


并没有真正删除,如果要真正删除
配置delete.topic.enable=true

配置文件在kafka\config目录

[html] view plain copy
[root@logSer config]# vi server.properties   
delete.topic.enable=true  
"server.properties" 122L, 5585C written