本博客详细介绍了如何在 Windows 环境下使用 Docker 安装和配置 Kafka 集群。通过提供具体的操作步骤和命令示例,从零开始帮助用户配置 Docker、启动 Kafka 集群、并在 Spring Boot 应用程序中使用 Kafka 进行消息传递。该指南包括 Docker 安装、Kafka 集群搭建、启动 Kafka 服务、创建主题、配置 Spring Boot 应用以及测试 Kafka 功能。无论是新手还是有经验的开发者,都可以通过本指南轻松掌握 Kafka 在 Docker 环境下的配置和使用。
注意:此时有可能需要打开科学上网插件
CTRL+SHIFT+ESC
)-> 选择性能 -> CPU -> 确认是否显示虚拟化已启用。① 在控制面板打开程序,然后点击启动或关闭 Windows 功能:
② 如果未找到 Hyper-V,可以按照以下步骤操作:
Hyper-V.bat
文件,并将以下代码粘贴到文件中:pushd "%~dp0" dir /b %SystemRoot%\servicing\Packages\*Hyper-V*.mum >hyper-v.txt for /f %%i in ('findstr /i . hyper-v.txt 2^>nul') do dism /online /norestart /add-package:"%SystemRoot%\servicing\Packages\%%i" del hyper-v.txt Dism /online /enable-feature /featurename:Microsoft-Hyper-V-All /LimitAccess /ALL
① 如果使用的是较旧版本的 Windows 10,可能需要手动安装 wsl_update_X64.msi
:
wsl_update_X64.msi
安装包。② 对于较新版本的 Windows 10及以上,可以直接使用 PowerShell 命令:
wsl --install
③ 安装完 WSL 后,开始安装 Docker Desktop:
④ 配置国内镜像加速器:
registry-mirrors
字段中添加国内的镜像加速器 URL。例如: { "builder": { "gc": { "defaultKeepStorage": "20GB", "enabled": true } }, "experimental": false, "features": { "buildkit": true }, "registry-mirrors": [ "http://hub-mirror.c.163.com", "https://mirror.baidubce.com", "https://registry.docker-cn.com", "https://hub.uuuadc.top", "https://docker.anyhub.us.kg", "https://dockerhub.jobcher.com", "https://dockerhub.icu", "https://docker.ckyl.me" ] }
① 打开终端或命令提示符,运行以下命令查看 Docker 版本:
docker --version
示例结果:
Docker version 20.10.8, build 3967b7d
说明:如果正确安装了 Docker,您将看到版本号和构建信息。
② 查看 Docker 服务状态:
docker ps
示例结果:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
说明:此命令显示正在运行的 Docker 容器列表。如果没有容器在运行,列表将为空。
① 查看 Docker 版本:
docker --version
示例结果:
Docker version 20.10.8, build 3967b7d
说明:这将返回当前安装的 Docker 版本信息。
② 查看运行中的容器:
docker ps
示例结果:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
说明:列出所有当前运行的容器。如果没有运行的容器,列表将为空。
③ 查看所有容器(包括已停止的容器):
docker ps -a
示例结果:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES e5bc1c1d6b78 mysql:latest "docker-entrypoint.s…" 2 hours ago Exited (0) 30 minutes ago mysql
说明:显示所有容器,包括运行中的和已停止的容器。您可以看到每个容器的状态和名称。
④ 启动 Docker 服务:
systemctl start docker
示例结果:
(无输出表示成功启动)
说明:此命令在系统中启动 Docker 服务。如果成功启动,命令将没有任何输出。
⑤ 设置 Docker 服务开机自启动:
systemctl enable docker
示例结果:
Created symlink from /etc/systemd/system/multi-user.target.wants/docker.service to /usr/lib/systemd/system/docker.service.
说明:此命令配置 Docker 服务在系统启动时自动启动。如果成功,您将看到符号链接创建的确认消息。
⑥ 搜索 MySQL 镜像:
docker search mysql
示例结果:
NAME DESCRIPTION STARS OFFICIAL AUTOMATED mysql MySQL is a widely used, open-source relation… 10913 [OK] mariadb MariaDB is a community-developed fork of MyS… 4234 [OK]
说明:显示 Docker Hub 上与 MySQL 相关的镜像列表,包括描述和星级评分。
⑦ 运行 MySQL 容器:
docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -d mysql:latest
示例结果:
7d5d6f8e7c9b81d845b7bfa1e738e8f7a6871e123b4c6d217c905e7890a5e8e5
说明:启动一个新的 MySQL 容器,并设置 root 用户的密码。如果成功启动,您将看到新容器的 ID。
⑧ 进入容器:
docker exec -it bash
示例结果:
root@7d5d6f8e7c9b:/#
说明:进入指定容器的终端会话,您可以在容器内执行命令。
docker-compose.yml
文件docker-compose.yml
,并将配置粘贴进去:version: '3.8' # 定义 Docker Compose 文件的版本 # version 这里可以删除 services: # 定义一组服务 zoo1: # 定义第一个 Zookeeper 容器 image: confluentinc/cp-zookeeper:7.3.2 # 使用 Confluent 提供的 Zookeeper 镜像,版本为 7.3.2 hostname: zoo1 # 设置容器主机名为 zoo1 container_name: zoo1 # 设置容器名称为 zoo1 ports: - "2181:2181" # 将主机的 2181 端口映射到容器的 2181 端口 environment: # 定义环境变量 ZOOKEEPER_CLIENT_PORT: 2181 # 设置 Zookeeper 客户端连接端口为 2181 ZOOKEEPER_SERVER_ID: 1 # 设置 Zookeeper 服务器 ID 为 1 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 # 定义 Zookeeper 集群的服务器列表 volumes: # 定义数据卷,将主机目录映射到容器目录 - ./data/zookeeper/zoo1/data:/data # 将主机的 ./data/zookeeper/zoo1/data 映射到容器的 /data 目录 - ./data/zookeeper/zoo1/datalog:/datalog # 将主机的 ./data/zookeeper/zoo1/datalog 映射到容器的 /datalog 目录 zoo2: # 定义第二个 Zookeeper 容器 image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo2 container_name: zoo2 ports: - "2182:2182" # 将主机的 2182 端口映射到容器的 2182 端口 environment: ZOOKEEPER_CLIENT_PORT: 2182 ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo2/data:/data - ./data/zookeeper/zoo2/datalog:/datalog zoo3: # 定义第三个 Zookeeper 容器 image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo3 container_name: zoo3 ports: - "2183:2183" # 将主机的 2183 端口映射到容器的 2183 端口 environment: ZOOKEEPER_CLIENT_PORT: 2183 ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888 volumes: - ./data/zookeeper/zoo3/data:/data - ./data/zookeeper/zoo3/datalog:/datalog kafka1: # 定义第一个 Kafka 容器 image: confluentinc/cp-kafka:7.3.2 # 使用 Confluent 提供的 Kafka 镜像,版本为 7.3.2 hostname: kafka1 # 设置容器主机名为 kafka1 container_name: kafka1 # 设置容器名称为 kafka1 ports: - "9092:9092" # 将主机的 9092 端口映射到容器的 9092 端口 - "29092:29092" # 将主机的 29092 端口映射到容器的 29092 端口 environment: # 定义环境变量 KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 # 定义 Kafka 的监听地址 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT # 定义 Kafka 的安全协议映射 KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL # 定义 Kafka 内部通信的监听器名称 KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" # 定义 Zookeeper 集群的连接地址 KAFKA_BROKER_ID: 1 # 设置 Kafka Broker 的 ID 为 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" # 定义 Kafka 的日志级别 KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer # 设置 Kafka 的授权器类名 KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" # 设置当没有找到 ACL 时是否允许所有人访问 volumes: # 定义数据卷,将主机目录映射到容器目录 - ./data/kafka_data1:/kafka/data # 将主机的 ./data/kafka_data1 映射到容器的 /kafka/data 目录 depends_on: # 定义服务依赖关系 - zoo1 - zoo2 - zoo3 # 在启动 kafka1 容器前,确保 zoo1、zoo2 和 zoo3 容器已经启动 kafka2: # 定义第二个 Kafka 容器 image: confluentinc/cp-kafka:7.3.2 hostname: kafka2 container_name: kafka2 ports: - "9093:9093" # 将主机的 9093 端口映射到容器的 9093 端口 - "29093:29093" # 将主机的 29093 端口映射到容器的 29093 端口 environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 # 设置 Kafka Broker 的 ID 为 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data2:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka3: # 定义第三个 Kafka 容器 image: confluentinc/cp-kafka:7.3.2 hostname: kafka3 container_name: kafka3 ports: - "9094:9094" # 将主机的 9094 端口映射到容器的 9094 端口 - "29094:29094" # 将主机的 29094 端口映射到容器的 29094 端口 environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 # 设置 Kafka Broker 的 ID 为 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" volumes: - ./data/kafka_data 3:/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka-ui: # 定义 Kafka UI 容器 container_name: kafka-ui # 设置容器名称为 kafka-ui image: provectuslabs/kafka-ui:latest # 使用 Provectus 提供的 Kafka UI 镜像 ports: - 9999:8080 # 将主机的 9999 端口映射到容器的 8080 端口 depends_on: - kafka1 - kafka2 - kafka3 # 在启动 kafka-ui 容器前,确保 kafka1、kafka2 和 kafka3 容器已经启动 environment: KAFKA_CLUSTERS_0_NAME: k1 # 定义第一个 Kafka 集群的名称 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092 # 定义第一个 Kafka 集群的引导服务器地址 KAFKA_CLUSTERS_1_NAME: k2 # 定义第二个 Kafka 集群的名称 KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093 # 定义第二个 Kafka 集群的引导服务器地址 KAFKA_CLUSTERS_2_NAME: k3 # 定义第三个 Kafka 集群的名称 KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094 # 定义第三个 Kafka 集群的引导服务器地址
注意:docker-compose.yml
文件的 version
字段在 Docker Compose V2 中是多余的,虽然不会导致问题,但可以去掉或更新。你可以将 docker-compose.yml
文件中的版本声明删除或使用最新版本(例如,不指定版本)。
(1) 打开终端或命令提示符,导航到包含 docker-compose.yml
文件的目录。
docker-compose.yml
文件的文件夹,然后在地址栏中输入 cmd
并按 Enter 键,这样会在该文件夹中打开命令提示符。cd
命令导航到该文件夹,例如:cd path/to/your/docker-compose-folder
(2) 运行以下命令启动所有服务:
docker-compose up -d
运行以下命令查看所有服务的状态:
docker-compose ps
示例输出:
Name Command State Ports ---------------------------------------------------------------------------------------------- kafka1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp, 29092/tcp kafka2 /etc/confluent/docker/run Up 0.0.0.0:9093->9093/tcp, 29093/tcp kafka3 /etc/confluent/docker/run Up 0.0.0.0:9094->9094/tcp, 29094/tcp zoo1 /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp zoo2 /etc/confluent/docker/run Up 0.0.0.0:2182->2182/tcp zoo3 /etc/confluent/docker/run Up 0.0.0.0:2183->2183/tcp kafka-ui /bin/sh -c /usr/bin/dumb- ... Up 0.0.0.0:9999->8080/tcp
解释:此命令显示当前运行的所有 Docker 容器及其状态。State 列表示容器是否正在运行,Ports 列显示容器端口映射情况。
docker-compose up -d
时出现project name must not be empty
问题,则需要指定项目名称。执行以下命令: docker-compose -p my_project_name_docker up -d
示例解释:
-p my_project_name_docker
:指定自定义项目名称 my_project_name_docker
。up -d
:启动所有服务,并在后台运行。此命令可以避免项目名称冲突或其他与项目名称相关的问题。
在执行yml文件时,出现
connecting to 127.0.0.1:10809: connectex: No connection could be made because the target machine actively refused it.
下面是解决方法:
① 打开 Docker Desktop 设置:
- 单击系统托盘中的 Docker 图标,选择“Settings”或“Preferences”。
② 检查代理设置:
- 选择“Resources” -> “Proxies”。
- 确认 HTTP Proxy 和 HTTPS Proxy 字段为空。如果不为空,请清空它们。
③ 保存设置并重启 Docker。
① 打开 Docker Desktop 设置:
- 单击系统托盘中的 Docker 图标,选择“Settings”或“Preferences”。
② 配置镜像加速器:
- 选择“Docker Engine”或“Daemon”。
- 在 registry-mirrors
字段中添加国内的镜像加速器 URL。例如:
{ "registry-mirrors": [ "http://hub-mirror.c.163.com", "https://mirror.baidubce.com", "https://registry.docker-cn.com", "https://hub.uuuadc.top", "https://docker.anyhub.us.kg", "https://dockerhub.jobcher.com", "https://dockerhub.icu", "https://docker.ckyl.me" ] }
③ 保存设置并重启 Docker。
① 清除代理环境变量:
$env:HTTP_PROXY="" $env:HTTPS_PROXY=""
② 手动拉取镜像:
docker pull confluentinc/cp-zookeeper:7.3.2 docker pull confluentinc/cp-kafka:7.3.2 docker pull provectuslabs/kafka-ui:latest
这是最重要的一点,在我尝试了所有方法仍然有问题之后,我更换了更快的网络,此时运行成功。
localhost:9999
可以访问 UI 后台,通过后台新建 topic 来验证集群是否工作。docker-compose.yml
文件所在目录可以在poweshell中输入:
cd path_to_the_file
docker-compose -p my_project_name_docker up -d
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
(1)bin/zookeeper-server-start.sh:
bin/
:表示脚本文件位于 bin
目录下。zookeeper-server-start.sh
:启动 Zookeeper 服务的脚本文件,用于启动 Zookeeper 服务。config/zookeeper.properties
:指定 Zookeeper 的配置文件路径,配置 Zookeeper 的启动参数。(2)bin/kafka-server-start.sh:
bin/
:表示脚本文件位于 bin
目录下。kafka-server-start.sh
:启动 Kafka 服务的脚本文件,用于启动 Kafka 服务。config/server.properties
:指定 Kafka 服务器的配置文件路径,配置 Kafka 的启动参数。bin/kafka-topics.sh --create --topic test_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
(1)bin/kafka-topics.sh:
bin/
:表示脚本文件位于 bin
目录下。kafka-topics.sh
:管理 Kafka 主题的脚本文件,用于创建、删除、列出 Kafka 主题。(2)–create:
(3)–topic:
test_topic1
:指定要创建的主题名称为 test_topic1
。(4)–bootstrap-server:
localhost:9092
:指定 Kafka 集群的引导服务器地址,通常是主机名或 IP 地址加端口号(在这里是本地主机 localhost
和端口 9092
)。(5)–partitions:
1
:指定创建主题时的分区数量。分区是 Kafka 中的并行单位,分区越多,主题可以处理的并行任务就越多。(6)–replication-factor:
1
:指定主题的副本因子,即每个分区的副本数量。副本因子决定了数据的容错性。值越大,Kafka 集群的容错能力越强。目的:启动 Kafka 集群并创建 Kafka 主题,以便在 Java 应用程序中使用 Kafka 进行消息传递。
(1)打开 PowerShell 或终端。
(2)进入 Kafka 安装目录,依次运行以下命令来启动 Zookeeper 和 Kafka 服务:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
(1)在同一个终端,运行以下命令来创建 Kafka 主题:
bin/kafka-topics.sh --create --topic test_topic1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
test_topic1
的 Kafka 主题,指定了引导服务器地址、分区数量和副本因子。Kafka 主题是消息的逻辑集合,分区是并行处理的单位,副本因子决定数据的容错性。确保在 src/main/resources/application.properties
文件中以及项目目录的其他相关配置文件中已配置 Kafka 相关属性,以便 Spring Boot 应用程序能够连接到 Kafka:
src └── main ├── java │ └── com │ └── example │ └── kafkastudy │ ├── KafkaStudyApplication.java │ ├── KafkaConfig.java │ ├── TicketController.java │ ├── SmsNotificationService.java │ ├── OrderRecordService.java │ └── TicketBusinessService.java └── resources ├── static ├── templates └── application.properties
application.properties
文件配置在 src/main/resources/application.properties
文件中配置 Kafka 相关属性:
spring.application.name=KafkaStudy1 # 设置 Spring 应用程序的名称 spring.kafka.bootstrap-servers=localhost:9092 # 设置 Kafka 服务器地址 spring.kafka.consumer.group-id=group_id # 设置 Kafka 消费者组 ID spring.kafka.consumer.auto-offset-reset=earliest # 设置消费者偏移量重置策略为 earliest(从最早的消息开始消费) spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置键反序列化器 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置值反序列化器 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 设置键序列化器 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 设置值序列化器 kafka.topic=test_topic1 # 指定要使用的 Kafka 主题名称 kafka.group_id=01 # 指定消费者组 ID
KafkaConfig.java
文件配置在 src/main/java/com/example/kafkastudy/KafkaConfig.java
文件中配置 Kafka:
package com.example.kafkastudy; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @Configuration public class KafkaConfig { @Autowired private KafkaProperties kafkaProperties; @Bean public KafkaAdmin admin() { KafkaAdmin admin = new KafkaAdmin(kafkaProperties.buildAdminProperties()); admin.setFatalIfBrokerNotAvailable(true); return admin; } @Bean public NewTopic ticketBookingTopic() { return new NewTopic("ticket_booking", 1, (short) 1); } }
KafkaConfig.java
文件中定义了 Kafka 的配置,包括创建 KafkaAdmin 和 NewTopic 的 Bean。KafkaStudyApplication
类,选择 Run 'KafkaStudyApplication'
来启动 Spring Boot 应用程序KafkaStudyApplication
类是 Spring Boot 应用的主入口,运行它将启动整个应用程序,并使其能够与 Kafka 集群进行交互。KafkaProducerTest
类,选择 Run 'KafkaProducerTest'
来运行测试KafkaProducerTest
类包含测试用例,用于验证 Kafka 生产者的功能是否正常运行。这些测试用例将消息发送到 Kafka 主题,并验证消息是否成功传递。