Canal 是阿里巴巴开源的一个基于 MySQL 数据库增量日志解析的项目,可以将 MySQL 的数据变化实时同步到其他中间件,如 Kafka。使用 Canal 将 MySQL 的新增数据实时推送到 Kafka,可以通过以下步骤实现:
步骤 1:安装和配置 Canal下载 Canal:从 GitHub 下载 Canal 并解压。
配置 Canal 连接 MySQL:编辑 conf/example/instance.properties 文件,配置 MySQL 服务器的地址、端口、用户名和密码。
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset=UTF-8 canal.instance.tsdb.enable=true canal.instance.gtidon=false
配置 Canal 监听的数据库和表:在同一个配置文件中,设置 Canal 监听的数据库和表。
canal.instance.filter.regex=your_database\\.your_table步骤 2:安装和配置 Kafka
下载 Kafka:从 Kafka 官网 下载并解压 Kafka。
启动 Zookeeper 和 Kafka:首先启动 Zookeeper,然后启动 Kafka 服务。
# 启动 Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动 Kafka bin/kafka-server-start.sh config/server.properties
创建 Kafka 主题:创建一个 Kafka 主题用于接收 Canal 推送的数据。
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic your_topic_name步骤 3:配置 Canal 推送到 Kafka
编辑 Canal 的配置:修改 conf/example/instance.properties 文件,启用 Canal 推送到 Kafka 的配置。
canal.mq.topic=your_topic_name canal.mq.partitionsNum=1 canal.mq.partitionHash=your_database.your_table:id
启用 Kafka:编辑 conf/canal.properties 文件,启用 Kafka。
canal.serverMode=kafka步骤 4:启动 Canal
在 Canal 解压目录下,启动 Canal 服务。
bin/startup.sh步骤 5:验证数据同步
插入或修改你在步骤 1 中配置的 MySQL 表中的数据,然后使用 Kafka 的消费者工具查看是否能够接收到数据。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your_topic_name --from-beginning
完成以上步骤后,当 MySQL 数据库中的数据发生变化时,变化的数据会实时推送到 Kafka 主题中。这样,你就可以在 Kafka 中实时处理或分析这些数据了。
网友回复