今天我们来介绍一款MySQL数据库增量日志中间件,阿里巴巴Canal,做过数据库备份或同步的大都会用到。
canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ。
另外,canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,包括java、c#、go、php、Python、Rust等多语言客户端。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
工作原理
我们先看下MySQL的主备复制原理。
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
快速开始
我们需要做一些准备工作,包括MySQL开启binlog、下载安装并启动canal服务。
MySQL开启binlog
我的MySQL版本是8.0,通过homebrew安装。
对于自建MySQL,默认是没有开启binlog的,我们可以使用命令查看binglog的开启状态
show variables like 'log_bin%';
结果可以看到,log_bin = OFF,处于关闭状态。
所以,我们需要先开启binlog写入功能,配置 binlog-format 为 ROW 模式。
找到本地安装的MySQL的 my.cnf 文件,Mac brew安装的路径为/usr/local/etc/my.cnf
my.cnf配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
重启mysql
mysql.server restart
OK,上图显示已开启binlog。
授权canal权限
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载安装canal
下载release包,这里我只下载了deployer包,版本v1.1.5。
解压缩安装包,修改配置
vi conf/example/instance.properties
主要修改slaveId、数据库信息、账号密码等。
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
启动canal
sh bin/startup.sh
查看Server日志
tail -200f logs/canal/canal.log
2022-05-02 22:28:43.032 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-05-02 22:28:43.055 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-05-02 22:28:43.069 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-05-02 22:28:43.102 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.2.56(192.168.2.56):11111]
2022-05-02 22:28:44.070 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
查询实例日志
tail -100f logs/example/example.log
报错了,mysql8.0出现caching_sha2_password Auth failed报错。
2022-05-02 22:41:56.113 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:85)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:90)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:176)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82)
... 4 more
我们使用Navicat登录下canal账号,就正常了。
2022-05-02 23:05:18.260 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2022-05-02 23:05:18.271 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2022-05-02 23:05:18.271 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2022-05-02 23:05:18.359 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
关闭服务
sh bin/stop.sh
Java客户端demo
canal支持java客户端,我们可以在java服务中
MySQL建表
我们先创建一个user表用来测试,库名canal_test。
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(32) NOT NULL,
`age` int DEFAULT NULL,
`sex` tinyint DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
测试类
我们可以直接使用canal.example工程,也可以自己从头创建工程。下面我们自己写个demo。
依赖配置:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
测试类
测试类代码如下,运行该类。
package com.example.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author dxyin
* @Date 2022-05-02 23:10
*/
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.ALTER.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
触发数据库变更
INSERT `user` VALUES (2,'canal',1, 0, NOW());
控制台日志如下:
================> binlog[mysql-bin.000003:393] , name[canal_test,user] , eventType : INSERT
id : 2 update=true
username : canal update=true
age : 1 update=true
sex : 0 update=true
create_time : 2022-05-02 22:52:03 update=true
empty count : 1
empty count : 2
empty count : 3
我们可以看到binlog信息,包括mysql库名、表名、事件类型Insert、被更新的字段信息。
再来一个Update语句:
UPDATE `user` set age = 100 WHERE id = 2;
================> binlog[mysql-bin.000003:717] , name[canal_test,user] , eventType : UPDATE
-------> before
id : 2 update=false
username : canal update=false
age : 1 update=false
sex : 0 update=false
create_time : 2022-05-02 22:52:03 update=false
-------> after
id : 2 update=false
username : canal update=false
age : 100 update=true
sex : 0 update=false
create_time : 2022-05-02 22:52:03 update=false
empty count : 1
empty count : 2
empty count : 3
id=2该行的数据更新前后的信息都可以看到。
java服务端拿到表的增量数据后就可以做定制化的业务处理逻辑了。
最后
canal 作为 MySQL binlog 增量获取和解析工具,变更记录不仅可以使用多语言客户端解析,也可将变更记录投递到 MQ、elasticsearch等。
本文重在介绍canal的基本原理,并写了个简单的java demo测试,后面我会写canal变更记录投递到MQ和ES的相关文章。
文档信息
- 本文作者:yindongxu
- 本文链接:https://iceblow.github.io/2022/04/24/Canal/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)