使用 Kafka 和 Docker 开发事件驱动应用

随着微服务兴起,事件驱动架构越来越受欢迎。Apache Kafka 是一个分布式事件流平台,常是这些架构的核心。不幸的是,为开发环境设置和部署自己的 Kafka 实例通常很棘手。幸运的是,Docker 和容器使这变得容易得多。

在本指南中,您将学习如何

  1. 使用 Docker 启动 Kafka 集群
  2. 将非容器化应用连接到集群
  3. 将容器化应用连接到集群
  4. 部署 Kafka-UI 以帮助故障排除和调试

先决条件

要遵循本操作指南,需要满足以下先决条件

启动 Kafka

Kafka 3.3 开始,借助 KRaft (Kafka Raft),不再需要 Zookeeper,Kafka 的部署得到了极大简化。使用 KRaft,为本地开发设置 Kafka 实例要容易得多。从 Kafka 3.8 发布开始,现已推出新的 kafka-native Docker 镜像,提供了显著更快的启动速度和更低的内存占用。

提示

本指南将使用 apache/kafka 镜像,因为它包含许多用于管理和使用 Kafka 的实用脚本。但是,您可能希望使用 apache/kafka-native 镜像,因为它启动更快且所需的资源更少。

启动 Kafka 实例

按照以下步骤启动一个基本的 Kafka 集群。本示例将启动一个集群,并将端口 9092 暴露到主机上,以便原生运行的应用可以连接到它。

  1. 通过运行以下命令启动一个 Kafka 容器

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. 一旦镜像拉取完成,您将在几秒钟内启动并运行一个 Kafka 实例。

  3. apache/kafka 镜像在 /opt/kafka/bin 目录中附带了一些有用的脚本。运行以下命令来验证集群是否已启动并运行,并获取其集群 ID

    $ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
    

    执行此操作将产生类似于以下内容的输出

    Cluster ID: 5L6g3nShT-eMCtK--X86sw
  4. 通过运行以下命令创建一个示例主题并生产(或发布)一些消息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    运行后,您可以每行输入一条消息。例如,输入几条消息,每行一条。一些示例可能是

    First message

    以及

    Second message

    enter 发送最后一条消息,完成后按 ctrl+c。这些消息将发布到 Kafka。

  5. 通过消费消息来确认消息已发布到集群中

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
    

    然后您应该在输出中看到您的消息

    First message
    Second message

    如果需要,您可以打开另一个终端并发布更多消息,然后看到它们出现在消费者端。

    完成后,按 ctrl+c 停止消费消息。

您现在拥有一个本地运行的 Kafka 集群,并已验证您可以连接到它。

从非容器化应用连接到 Kafka

既然您已经展示了如何从命令行连接到 Kafka 实例,现在是时候从应用连接到集群了。在本示例中,您将使用一个使用 KafkaJS 库的简单 Node 项目。

由于集群在本地运行并暴露在端口 9092 上,该应用可以通过 localhost:9092 连接到集群(因为它现在是原生运行,而不是在容器中)。连接后,此示例应用将记录从 demo 主题消费的消息。此外,当它在开发模式下运行时,如果主题不存在,它也将创建该主题。

  1. 如果您没有从上一步运行 Kafka 集群,请运行以下命令启动一个 Kafka 实例

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. GitHub 仓库 克隆到本地。

    $ git clone https://github.com/dockersamples/kafka-development-node.git
    
  3. 导航到项目目录。

    cd kafka-development-node/app
    
  4. 使用 yarn 安装依赖项。

    $ yarn install
    
  5. 使用 yarn dev 启动应用。这将把 NODE_ENV 环境变量设置为 development 并使用 nodemon 监视文件更改。

    $ yarn dev
    
  6. 应用现在正在运行,它将把接收到的消息记录到控制台。在新终端中,使用以下命令发布几条消息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    然后向集群发送一条消息

    Test message

    请记住,完成后按 ctrl+c 停止生产消息。

从容器化和原生应用连接到 Kafka

现在您已经有一个应用通过暴露的端口连接到 Kafka,是时候探索从另一个容器连接到 Kafka 需要进行哪些更改了。为此,您现在将通过容器运行该应用,而不是原生运行。

但在您这样做之前,了解 Kafka 监听器的工作原理以及它们如何帮助客户端连接非常重要。

理解 Kafka 监听器

当客户端连接到 Kafka 集群时,它实际上是连接到一个“代理(broker)”。代理有很多角色,其中之一是支持客户端的负载均衡。客户端连接时,代理会返回一组连接 URL,客户端随后应使用这些 URL 进行消息的生产或消费。这些连接 URL 是如何配置的?

每个 Kafka 实例都有一组监听器(listeners)和公告监听器(advertised listeners)。“监听器”是 Kafka 绑定的地址,而“公告监听器”则配置客户端应如何连接到集群。客户端接收的连接 URL 取决于客户端连接到哪个监听器。

定义监听器

为了帮助理解这一点,让我们看看 Kafka 需要如何配置以支持两种连接方式

  1. 主机连接(通过主机映射端口进行的连接)- 这些需要使用 localhost 连接
  2. Docker 连接(来自 Docker 网络内部的连接)- 这些不能使用 localhost 连接,而需要使用 Kafka 服务的网络别名(或 DNS 地址)

由于客户端需要使用两种不同的方法连接,因此需要两种不同的监听器 - HOSTDOCKERHOST 监听器将告诉客户端使用 localhost:9092 连接,而 DOCKER 监听器将告知客户端使用 kafka:9093 连接。请注意,这意味着 Kafka 同时在 9092 和 9093 端口上监听。但是,只需要将主机监听器暴露给主机。

Diagram showing the DOCKER and HOST listeners and how they are exposed to the host and Docker networks

为了进行此设置,Kafka 的 compose.yaml 需要一些额外的配置。一旦开始覆盖一些默认设置,还需要指定其他一些选项,以便 KRaft 模式能够工作。

services:
  kafka:
    image: apache/kafka-native
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

按照以下步骤尝试一下。

  1. 如果您有上一步中运行的 Node 应用,请在终端中按 ctrl+c 停止它。

  2. 如果您有上一节中运行的 Kafka 集群,请使用以下命令停止该容器

    $ docker rm -f kafka
    
  3. 在克隆项目目录的根目录下运行以下命令启动 Compose 堆栈

    $ docker compose up
    

    稍等片刻,应用就会启动并运行。

  4. 堆栈中还有一个可用于发布消息的服务。访问 http://localhost:3000 打开它。当您输入消息并提交表单时,您应该在应用中看到接收到该消息的日志信息。

    这有助于说明容器化方法如何轻松添加其他服务来帮助测试和排除应用故障。

添加集群可视化

一旦您开始在开发环境中使用容器,就会意识到添加专注于帮助开发的其他服务(例如可视化工具和其他支持服务)非常容易。既然您已经运行了 Kafka,可视化 Kafka 集群中正在发生的事情可能会很有帮助。为此,您可以运行 Kafbat UI Web 应用

要将其添加到您自己的项目中(在演示应用中已包含),只需将以下配置添加到您的 Compose 文件即可

services:
  kafka-ui:
    image: kafbat/kafka-ui:main
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka

然后,一旦 Compose 堆栈启动,您就可以在浏览器中打开 http://localhost:8080 并浏览以查看有关集群的更多详细信息、检查消费者、发布测试消息等等。

使用 Kafka 进行测试

如果您有兴趣了解如何轻松地将 Kafka 集成到您的集成测试中,请查阅 使用 Testcontainers 测试 Spring Boot Kafka 监听器 指南。本指南将教您如何使用 Testcontainers 在测试中管理 Kafka 容器的生命周期。

结论

通过使用 Docker,您可以简化使用 Kafka 开发和测试事件驱动应用的过程。容器简化了设置和部署开发所需的各种服务的过程。一旦它们在 Compose 中定义好,团队中的每个人都可以享受到易用性带来的便利。

如果您之前错过了,所有示例应用代码都可以在 dockersamples/kafka-development-node 中找到。

页面选项