深入了解kafka系列-主题

前言

主题和分区是Kafka 的两个核心概念,前面系列中讲述的生产者和消费者的设计理念所针对的都是主题和分区层面的操作。主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。分区的划分不仅为Kafka提供了可伸缩性、水平扩展的功能,还通过多副本机制来为Kafka提供数据冗余以提高数据可靠性。

从Kafka的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。不过对于使用Kafka进行消息收发的普通用户而言,了解到分区这一层面足以应对大部分的使用场景,这里暂时只说到主题和分区,更底层的内容会在后续这个系列持续讲解~

主题的管理

主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。可以通过 Kafka提供的 kafka-topics.sh脚本来执行这些操作,这个脚本位于$KAFKA_HOME/bin/目录下,其核心代码仅有一行,具体如下:

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

当然主题的管理并非只有使用 kafka-topics.sh 脚本这一种方式,我们还可以通过KafkaAdminClient 的方式实现(这种方式实质上是通过发送 CreateTopicsRequest、DeleteTopicsRequest 等请求来实现的,甚至我们还可以通过直接操纵日志文件和ZooKeeper节点来实现,这个后续我会单独抽出来放在这个系列继续讲解~

  • 1 创建主题

如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。不建议这样操作,topic 未知难以维护,建议下面这种方式:

1
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka100 --topic topic-create --replication-factor 2 --partitions 4

上面的示例中创建了一个分区数为 4、副本因子为 2 的主题.

三个broker节点一共创建了8个文件夹,这个数字8实质上是分区数4与副本因子2的乘积。每个副本(或者更确切地说应该是日志,副本与日志一一对应)才真正对应了一个命名形式如<topic>-<partition>的文件夹。主题、分区、副本和 Log(日志)的关系如图所示

同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个broker是最优的选择。再比如在分区数为3、副本因子为3,并且broker数同样为3的情况下,分配3、3、3的分区副本个数给各个broker是最优的选择,也就是每个broker中都拥有所有分区的一个副本。

  • 2 分区副本的分配

这里的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本
在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案了。使用kafka-topics.sh脚本创建主题时的内部分配逻辑按照机架信息划分成两种策略:未指定机架信息和指定机架信息。如果集群中所有的 broker节点都没有配置broker.rack参数,或者使用disable-rack-aware参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略

  • 3 查看主题

kafka-topics.sh脚本有5种指令类型:create、list、describe、alter和delete。其中list和describe指令可以用来方便地查看主题信息,通过list指令可以查看当前所有可用的主题,示例如下:

1
bin/kafka-topics.sh --zookeeper localhost:2181 --list

查看指定topic信息
1
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-create

  • 4 修改主题

当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的alter指令提供的。我们首先来看如何增加主题的分区数。以前面的主题topic-config为例,当前分区数为1,修改为3,示例如下:

1
2
3
4
5
6
7
8
9
# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create
Topic:topic-config PartitionCount:3 ReplicationFactor:1 Configs:
Topic: topic-config Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-config Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-config Partition: 2 Leader: 1 Replicas: 1 Isr: 1

当主题中的消息包含key时(即key不为null),根据key计算分区的行为就会受到影响,对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。目前Kafka只支持增加分区数而不支持减少分区数.

  • 5 删除主题

如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh脚本中的delete指令就可以用来删除主题,比如删除一个主题topic-delete.

初识KafkaAdminClient

一般情况下,我们都习惯使用kafka-topics.sh脚本来管理主题,但有时候我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台.

KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类,并提供了多种方法。篇幅限制,下面只列出与本章内容相关的一些方法。

  • 创建主题:
1
CreateTopicsResult createTopics(Collection<NewTopic>newTopics)
  • 删除主题:
1
DeleteTopicsResult deleteTopics(Collection<String>topics)。
  • 列出所有可用的主题:
1
ListTopicsResult listTopics()。
  • 查看主题的信息:
1
DescribeTopicsResult describeTopics(Collection<String>topicNames)
  • 查询配置信息:
1
DescribeConfigsResult describeConfigs(Collection<ConfigResource>resources)
  • 修改配置信息:
1
AlterConfigsResult alterConfigs(Map<ConfigResource,Config>configs)
  • 增加分区:
1
CreatePartitionsResult createPartitions(Map<String,NewPartitions>newPartitions)

原创不易,如果觉得有点用的话,请毫不留情点个赞,转发一下,这将是我持续输出优质文章的最强动力。

请我吃🍗