kafka入门

Kafka 相关概念

Kafka 是一款高吞吐量、分布式、可持久化的消息系统,具有以下特点:

  • 高吞吐量:Kafka 可以处理大量的消息流,每秒能够处理上百万条消息。
  • 分布式系统:Kafka 可以在多台服务器上运行,实现消息的分布式处理和存储。
  • 可持久化的消息存储:Kafka 通过消息存储机制,保证消息的可靠传递和持久化存储。

本文将介绍 Kafka 的相关概念。

Topic

Topic 是 Kafka 中消息发送和接收的单位,每个 Topic 包含多个消息,每个消息由一个 Key 和一个 Value 组成,通常 Key 用于分割消息流,Value 用于存储具体的消息内容。

  • 创建 Topic:可以通过 Kafka 的命令行工具或者 API 来创建 Topic。
  • 分区:每个 Topic 可以分为多个分区,每个分区可以在不同节点上存储和处理,实现消息的并行处理。
  • 副本:为了保证消息的可靠传递,每个分区可以有多个副本,分布在不同的节点上,当主副本故障时,备用副本可以接替主副本的功能。

Producer

Producer 是生产者,负责向 Kafka 发送消息并将消息写入 Topic 的分区中。

  • 发送消息:Producer 可以通过 Kafka 的 API 来发送消息,向指定 Topic 发送消息。
  • 确认机制:Producer 发送消息后,可以通过确认机制获得发送消息的状态。

Consumer

Consumer 是消费者,负责从 Kafka 中读取消息并处理消息。

  • 订阅消息:Consumer 可以通过 Kafka 的 API 来订阅 Topic 上的消息。
  • 分配分区:当 Consumer 订阅某个 Topic 后,需要为其分配分区,以实现消息的并行消费。
  • 确认机制:消费者也可以通过确认机制来确认收到的消息,从而保证消息的可靠接收和处理。

Broker

Broker 是 Kafka 的消息中间件,负责接收和处理 Producer 发送的消息、为 Consumer 提供消息服务。

  • 存储和处理:Broker 负责存储和处理 Kafka 中的消息,实现消息的持久化存储和并行处理。
  • 管理 Topic、Partition 和 Replica:Broker 具有管理 Topic、Partition 和 Replica 的功能,可以实现动态扩展和缩小 Kafka 集群的节点数量,以适应不同的业务需求。
  • 队列存储:Broker 使用队列存储的方式,实现高性能的消息处理和传递。

ZooKeeper

ZooKeeper 是分布式系统协同服务,Kafka 集群中需要使用 ZooKeeper 来进行分布式协调和管理。

  • 为 Kafka 派发 Broker ID:ZooKeeper 可以为 Kafka Server 派发全局唯一的 Broker ID,以实现集群内部的节点通信。
  • 管理元数据:ZooKeeper 管理了 Kafka 集群中的元数据,包括 Topic、Partition、Broker 等信息。
  • 监控:ZooKeeper 可以对 Kafka 集群的状态进行监控和统计,发现和解决问题。

Conclusion

Kafka 是一款高性能、分布式的消息中间件,支持可靠,高容错性的消息处理,并可轻松与各种数据系统集成。通过学习 Kafka 的相关概念,可以更好地了解和掌握 Kafka 的使用和实践。

kafka架构

Kafka 架构

Kafka 架构

Kafka 架构主要分为以下几个部分:

  1. Topic:消息的录入点,每个 Topic 可以被分成多个 Partition,每个 Partition 可以存储多个消息。
  2. Producer:消息的生产者,负责往 Kafka 集群中的 Topic 发送消息,将消息放到指定的 Partition 中。
  3. Broker:消息经过 Producer 生产后,会发送到多个 Broker 上,Broker 维护了消息的持久化存储和 Snapshot 文件,使得 Kafka 集群可以在 Broker 崩溃或网络故障的情况下依然可以保证消息的持久性。
  4. Consumer:消息的消费者,从指定的 Topic 的 Partition 中消费消息,消费后的消息将不能被再次消费,一旦消息被消费,该消费者就读取该消息的 Offset 位置,以便下一次从该位置开始消费。
  5. ZooKeeper:用于管理 Broker 集群,负责维护集群中每个 Broker 的元数据,并且在 Broker 发生变化时通知 Consumer。

Kafka 的工作流程

  1. Producer 将消息发送到 Broker。
  2. Broker 将消息持久化到磁盘,并将消息存储到对应的 Partition 中。
  3. Consumer 从指定的 Partition 中消费消息,并将 Offset 位置保存到 ZooKeeper 中。
  4. 当有新消息到达时,Consumer 将自动从上一次消费的 Offset 位置继续消费。

Java 代码示例

以下是一个简单的 Java 代码示例,展示了如何使用 Kafka 的 Producer 模块发送消息到指定的 Topic 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 配置 Kafka 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(props);

// 生产并发送消息
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}

// 关闭 Kafka 生产者
producer.close();

结语

Kafka 是一个强大的消息引擎,它可以使得大规模数据处理变得更加高效、灵活,适合于各种大数据处理场景。希望本文能为大家对 Kafka 的理解和应用提供帮助。

kafka实践

Kafka实践及代码示例

简介

Apache Kafka是一款实时数据流处理中心,可以快速高效地处理大规模、高吞吐量的数据流,由LinkedIn于2011年开发,后于2012年成为了Apache基金会的一个顶级开源项目。

Kafka以分布式消息队列的形式提供了高可靠(容错性高,支持数据冗余备份)、高伸缩性(支持多节点、多分区)、高吞吐率和低延迟的特性,适用于构建实时数据流的处理方案,被广泛应用于互联网、金融、电商等领域。

本文将介绍如何使用Java代码进行Kafka实践,包括Kafka配置、消息的生产者和消费者的实现。

环境搭建

首先,需要搭建Kafka运行环境,具体步骤请参考Kafka官方文档

Kafka配置

Kafka的配置文件位于config/server.properties,可以根据需要进行修改。

以下是常用的配置项及其含义:

  • broker.id:Broker的唯一标识符,具有唯一性;
  • listeners:Broker监听的网络地址;
  • advertised.listeners:供客户端访问的Broker地址;
  • num.network.threads:处理网络请求的线程数;
  • num.io.threads:处理磁盘IO的线程数;
  • socket.send.buffer.bytes:发送数据缓冲区大小;
  • socket.receive.buffer.bytes:接收数据缓冲区大小;
  • socket.request.max.bytes:最大请求大小;
  • num.partitions:每个Topic的分区数;
  • log.dirs:Topic数据存储路径;
  • auto.create.topics.enable:是否允许自动创建Topic;
  • delete.topic.enable:是否允许删除Topic;
  • group.initial.rebalance.delay.ms:Consumer组初始负载均衡延迟时间;
  • offsets.topic.replication.factor:偏移量Topic的副本因子。

Kafka生产者

Kafka生产者用于向指定的Topic中发送消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
System.out.println("Sent:" + message);
}
producer.close();
}
}

以上代码实现了一个简单的Kafka生产者,可以向指定的TOPIC_NAME中发送10条消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • acks:消息确认模式;
    • 0:不等待Broker的任何确认;
    • 1:等待Broker确认消息已写入本地磁盘;
    • all:等待Broker确认消息已写入所有ISR副本;
  • retries:消息发送失败后的重试次数;
  • batch.size:消息批处理大小,单位为字节;
  • linger.ms:延迟发送消息的时间,单位为毫秒;
  • buffer.memory:生产者可用的内存缓存大小,单位为字节;
  • key.serializer:序列化消息键的方式;
  • value.serializer:序列化消息值的方式。

Kafka消费者

Kafka消费者用于从指定的Topic中消费消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test_group";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: value = %s, topic = %s, partition = %d, offset = %d%n",
record.value(), record.topic(), record.partition(), record.offset());
}
consumer.commitAsync();
}
}
}

以上代码实现了一个简单的Kafka消费者,可以从指定的TOPIC_NAME中消费消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • group.id:消费者组的唯一标识符;
  • enable.auto.commit:是否开启自动提交偏移量;
  • auto.commit.interval.ms:自动提交偏移量的时间间隔,单位为毫秒;
  • key.deserializer:消息键的反序列化方式;
  • value.deserializer:消息值的反序列化方式。

总结

Kafka是一款高可靠、高伸缩性、高吞吐率和低延迟的分布式消息队列,广泛应用于互联网、金融、电商等领域。本文介绍了Kafka的基本概念及其Java代码实践,希望对读者有所帮助。