使用 Kafka 和 Docker 开发事件驱动应用
随着微服务兴起,事件驱动架构越来越受欢迎。Apache Kafka 是一个分布式事件流平台,常是这些架构的核心。不幸的是,为开发环境设置和部署自己的 Kafka 实例通常很棘手。幸运的是,Docker 和容器使这变得容易得多。
在本指南中,您将学习如何
- 使用 Docker 启动 Kafka 集群
- 将非容器化应用连接到集群
- 将容器化应用连接到集群
- 部署 Kafka-UI 以帮助故障排除和调试
先决条件
要遵循本操作指南,需要满足以下先决条件
- Docker Desktop
- Node.js 和 yarn
- Kafka 和 Docker 的基础知识
启动 Kafka
从 Kafka 3.3 开始,借助 KRaft (Kafka Raft),不再需要 Zookeeper,Kafka 的部署得到了极大简化。使用 KRaft,为本地开发设置 Kafka 实例要容易得多。从 Kafka 3.8 发布开始,现已推出新的 kafka-native Docker 镜像,提供了显著更快的启动速度和更低的内存占用。
提示
本指南将使用 apache/kafka 镜像,因为它包含许多用于管理和使用 Kafka 的实用脚本。但是,您可能希望使用 apache/kafka-native 镜像,因为它启动更快且所需的资源更少。
启动 Kafka 实例
按照以下步骤启动一个基本的 Kafka 集群。本示例将启动一个集群,并将端口 9092 暴露到主机上,以便原生运行的应用可以连接到它。
通过运行以下命令启动一个 Kafka 容器
$ docker run -d --name=kafka -p 9092:9092 apache/kafka
一旦镜像拉取完成,您将在几秒钟内启动并运行一个 Kafka 实例。
apache/kafka 镜像在
/opt/kafka/bin
目录中附带了一些有用的脚本。运行以下命令来验证集群是否已启动并运行,并获取其集群 ID$ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
执行此操作将产生类似于以下内容的输出
Cluster ID: 5L6g3nShT-eMCtK--X86sw
通过运行以下命令创建一个示例主题并生产(或发布)一些消息
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
运行后,您可以每行输入一条消息。例如,输入几条消息,每行一条。一些示例可能是
First message
以及
Second message
按
enter
发送最后一条消息,完成后按ctrl+c
。这些消息将发布到 Kafka。通过消费消息来确认消息已发布到集群中
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
然后您应该在输出中看到您的消息
First message Second message
如果需要,您可以打开另一个终端并发布更多消息,然后看到它们出现在消费者端。
完成后,按
ctrl+c
停止消费消息。
您现在拥有一个本地运行的 Kafka 集群,并已验证您可以连接到它。
从非容器化应用连接到 Kafka
既然您已经展示了如何从命令行连接到 Kafka 实例,现在是时候从应用连接到集群了。在本示例中,您将使用一个使用 KafkaJS 库的简单 Node 项目。
由于集群在本地运行并暴露在端口 9092 上,该应用可以通过 localhost:9092
连接到集群(因为它现在是原生运行,而不是在容器中)。连接后,此示例应用将记录从 demo
主题消费的消息。此外,当它在开发模式下运行时,如果主题不存在,它也将创建该主题。
如果您没有从上一步运行 Kafka 集群,请运行以下命令启动一个 Kafka 实例
$ docker run -d --name=kafka -p 9092:9092 apache/kafka
将 GitHub 仓库 克隆到本地。
$ git clone https://github.com/dockersamples/kafka-development-node.git
导航到项目目录。
cd kafka-development-node/app
使用 yarn 安装依赖项。
$ yarn install
使用
yarn dev
启动应用。这将把NODE_ENV
环境变量设置为development
并使用nodemon
监视文件更改。$ yarn dev
应用现在正在运行,它将把接收到的消息记录到控制台。在新终端中,使用以下命令发布几条消息
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
然后向集群发送一条消息
Test message
请记住,完成后按
ctrl+c
停止生产消息。
从容器化和原生应用连接到 Kafka
现在您已经有一个应用通过暴露的端口连接到 Kafka,是时候探索从另一个容器连接到 Kafka 需要进行哪些更改了。为此,您现在将通过容器运行该应用,而不是原生运行。
但在您这样做之前,了解 Kafka 监听器的工作原理以及它们如何帮助客户端连接非常重要。
理解 Kafka 监听器
当客户端连接到 Kafka 集群时,它实际上是连接到一个“代理(broker)”。代理有很多角色,其中之一是支持客户端的负载均衡。客户端连接时,代理会返回一组连接 URL,客户端随后应使用这些 URL 进行消息的生产或消费。这些连接 URL 是如何配置的?
每个 Kafka 实例都有一组监听器(listeners)和公告监听器(advertised listeners)。“监听器”是 Kafka 绑定的地址,而“公告监听器”则配置客户端应如何连接到集群。客户端接收的连接 URL 取决于客户端连接到哪个监听器。
定义监听器
为了帮助理解这一点,让我们看看 Kafka 需要如何配置以支持两种连接方式
- 主机连接(通过主机映射端口进行的连接)- 这些需要使用 localhost 连接
- Docker 连接(来自 Docker 网络内部的连接)- 这些不能使用 localhost 连接,而需要使用 Kafka 服务的网络别名(或 DNS 地址)
由于客户端需要使用两种不同的方法连接,因此需要两种不同的监听器 - HOST
和 DOCKER
。HOST
监听器将告诉客户端使用 localhost:9092
连接,而 DOCKER
监听器将告知客户端使用 kafka:9093
连接。请注意,这意味着 Kafka 同时在 9092 和 9093 端口上监听。但是,只需要将主机监听器暴露给主机。


为了进行此设置,Kafka 的 compose.yaml
需要一些额外的配置。一旦开始覆盖一些默认设置,还需要指定其他一些选项,以便 KRaft 模式能够工作。
services:
kafka:
image: apache/kafka-native
ports:
- "9092:9092"
environment:
# Configure listeners for both docker and host communication
KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
# Settings required for KRaft mode
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
# Listener to use for broker-to-broker communication
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
# Required for a single node cluster
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
按照以下步骤尝试一下。
如果您有上一步中运行的 Node 应用,请在终端中按
ctrl+c
停止它。如果您有上一节中运行的 Kafka 集群,请使用以下命令停止该容器
$ docker rm -f kafka
在克隆项目目录的根目录下运行以下命令启动 Compose 堆栈
$ docker compose up
稍等片刻,应用就会启动并运行。
堆栈中还有一个可用于发布消息的服务。访问 http://localhost:3000 打开它。当您输入消息并提交表单时,您应该在应用中看到接收到该消息的日志信息。
这有助于说明容器化方法如何轻松添加其他服务来帮助测试和排除应用故障。
添加集群可视化
一旦您开始在开发环境中使用容器,就会意识到添加专注于帮助开发的其他服务(例如可视化工具和其他支持服务)非常容易。既然您已经运行了 Kafka,可视化 Kafka 集群中正在发生的事情可能会很有帮助。为此,您可以运行 Kafbat UI Web 应用。
要将其添加到您自己的项目中(在演示应用中已包含),只需将以下配置添加到您的 Compose 文件即可
services:
kafka-ui:
image: kafbat/kafka-ui:main
ports:
- 8080:8080
environment:
DYNAMIC_CONFIG_ENABLED: "true"
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
depends_on:
- kafka
然后,一旦 Compose 堆栈启动,您就可以在浏览器中打开 http://localhost:8080 并浏览以查看有关集群的更多详细信息、检查消费者、发布测试消息等等。
使用 Kafka 进行测试
如果您有兴趣了解如何轻松地将 Kafka 集成到您的集成测试中,请查阅 使用 Testcontainers 测试 Spring Boot Kafka 监听器 指南。本指南将教您如何使用 Testcontainers 在测试中管理 Kafka 容器的生命周期。
结论
通过使用 Docker,您可以简化使用 Kafka 开发和测试事件驱动应用的过程。容器简化了设置和部署开发所需的各种服务的过程。一旦它们在 Compose 中定义好,团队中的每个人都可以享受到易用性带来的便利。
如果您之前错过了,所有示例应用代码都可以在 dockersamples/kafka-development-node
中找到。