阿里巴巴Canal中间件入门

2022/04/24 中间件 共 8529 字,约 25 分钟

今天我们来介绍一款MySQL数据库增量日志中间件,阿里巴巴Canal,做过数据库备份或同步的大都会用到。

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ。

另外,canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,包括java、c#、go、php、Python、Rust等多语言客户端。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

我们先看下MySQL的主备复制原理。

MySQL主备复制原理

img

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

img

快速开始

我们需要做一些准备工作,包括MySQL开启binlog、下载安装并启动canal服务。

MySQL开启binlog

我的MySQL版本是8.0,通过homebrew安装。

对于自建MySQL,默认是没有开启binlog的,我们可以使用命令查看binglog的开启状态

show variables like 'log_bin%';

结果可以看到,log_bin = OFF,处于关闭状态。

image-20220430233250544

所以,我们需要先开启binlog写入功能,配置 binlog-format 为 ROW 模式。

找到本地安装的MySQL的 my.cnf 文件,Mac brew安装的路径为/usr/local/etc/my.cnf

my.cnf配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

重启mysql

mysql.server restart

image-20220502204350173

OK,上图显示已开启binlog。

授权canal权限

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载安装canal

下载release包,这里我只下载了deployer包,版本v1.1.5。

解压缩安装包,修改配置

vi conf/example/instance.properties

主要修改slaveId、数据库信息、账号密码等。

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

启动canal

sh bin/startup.sh

查看Server日志

tail -200f logs/canal/canal.log
2022-05-02 22:28:43.032 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-05-02 22:28:43.055 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-05-02 22:28:43.069 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-05-02 22:28:43.102 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.2.56(192.168.2.56):11111]
2022-05-02 22:28:44.070 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

查询实例日志

tail -100f logs/example/example.log

报错了,mysql8.0出现caching_sha2_password Auth failed报错。

2022-05-02 22:41:56.113 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:85)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:90)
	at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
	at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:176)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)
	at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82)
	... 4 more

我们使用Navicat登录下canal账号,就正常了。

2022-05-02 23:05:18.260 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2022-05-02 23:05:18.271 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2022-05-02 23:05:18.271 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2022-05-02 23:05:18.359 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

关闭服务

sh bin/stop.sh

Java客户端demo

canal支持java客户端,我们可以在java服务中

MySQL建表

我们先创建一个user表用来测试,库名canal_test。

CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL,
  `age` int DEFAULT NULL,
  `sex` tinyint DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

测试类

我们可以直接使用canal.example工程,也可以自己从头创建工程。下面我们自己写个demo。

依赖配置:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

测试类

测试类代码如下,运行该类。

package com.example.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author dxyin
 * @Date 2022-05-02 23:10
 */
public class SimpleCanalClientExample {

    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.ALTER.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

触发数据库变更

INSERT `user` VALUES (2,'canal',1, 0, NOW());

控制台日志如下:

================&gt; binlog[mysql-bin.000003:393] , name[canal_test,user] , eventType : INSERT
id : 2    update=true
username : canal    update=true
age : 1    update=true
sex : 0    update=true
create_time : 2022-05-02 22:52:03    update=true
empty count : 1
empty count : 2
empty count : 3

我们可以看到binlog信息,包括mysql库名、表名、事件类型Insert、被更新的字段信息。

再来一个Update语句:

UPDATE `user` set age = 100 WHERE id = 2;
================&gt; binlog[mysql-bin.000003:717] , name[canal_test,user] , eventType : UPDATE
-------&gt; before
id : 2    update=false
username : canal    update=false
age : 1    update=false
sex : 0    update=false
create_time : 2022-05-02 22:52:03    update=false
-------&gt; after
id : 2    update=false
username : canal    update=false
age : 100    update=true
sex : 0    update=false
create_time : 2022-05-02 22:52:03    update=false
empty count : 1
empty count : 2
empty count : 3

id=2该行的数据更新前后的信息都可以看到。

java服务端拿到表的增量数据后就可以做定制化的业务处理逻辑了。

最后

canal 作为 MySQL binlog 增量获取和解析工具,变更记录不仅可以使用多语言客户端解析,也可将变更记录投递到 MQ、elasticsearch等。

本文重在介绍canal的基本原理,并写了个简单的java demo测试,后面我会写canal变更记录投递到MQ和ES的相关文章。

文档信息

搜索

    Table of Contents