跳到主要内容

MQ 消息同步格式说明

CloudCanal 支持选择同步到 MQ 的消息存储格式,本文介绍 MQ 多种消息格式的定义说明,方便下游消费和使用。

消息格式介绍

支持的消息同步格式

  • CloudCanal Json:CloudCanal 默认的消息格式,解析数据库增量日志传输至 Kafka,支持批量消息传输。
  • Canal Json:对于 Canal 的兼容格式,数据存储格式为 Canal Json。
  • Aliyun DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。
  • Debezium Envelope:Debezium 官方的 CDC 消息格式,携带 SCHEMA 信息,对大数据下游消费友好。

目标端 MQ 支持情况

消息格式KafkaRocketMQRabbitMQ
CloudCanal Json支持支持支持
Canal Json支持支持支持
Aliyun DTS Avro支持--
Debezium Envelope支持--

源端 MQ 支持情况

消息格式KafkaRocketMQRabbitMQ
CloudCanal Json支持支持支持
Canal Json支持支持支持
Aliyun DTS Avro---
Debezium Envelope支持--

消息格式具体说明

CloudCanal Json

参数说明:

参数类型说明
actionString操作的类型,如:INSERT / UPDATE / DELETE。
bidLongBatchEventBuffer 的 Batch Id。
beforeList变更前的数据。
dataList当前操作的数据。
dbString数据库名称。
schemaStringSCHEMA 名称。
tableString表名。
dbValTypeMap字段数据类型名称。
jdbcTypeMap字段 JDBC 数据类型
entryTypeString源端事件类型,如:ROWDATA / TRANSACTIONEND。
isDdlBoolean是否为 DDL 操作。
pksList源端主键名称。
execTsLong源端 SQL 执行的时间,13位Unix时间戳,单位为毫秒。
sendTsLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
sqlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
"action":"INSERT/DELETE/UPDATE",
"before":[
// UPDATE 的 before 字段
{
"col1":"22",
"col2":"22",
"col_pk":"22"
}
],
"bid":0,
"data":[
{
"col1":"11",
"col2":"11",
"col_pk":"11"
}
],
"db":"db_test",
"dbValType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"isDdl":false,
"entryType":"ROWDATA",
"execTs":1669789152000,
"jdbcType":{
"col1":12,
"col2":12,
"col_pk":12
},
"pks":[],
"schema":"db_test",
"sendTs":1669789153377,
"sql":"",
"table":"table_test"
}

DDL操作示例如下:

{
"action":"ALTER",
"before":[],
"bid":0,
"data":[],
"db":"db_test",
"dbValType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"isDdl":true,
"entryType":"ROWDATA",
"execTs":1669789188000,
"jdbcType":{
"col1":12,
"col2":12,
"col_pk":12
},
"pks":[],
"schema":"db_test",
"sendTs":1669789189533,
"sql":"alter table table_test add col2 varchar(22) null",
"table":"table_test",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1", // 字段名称。
"position":0, // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1,
"typeExpression":"varchar(22)",
"typeName":"varchar"
},
{
"jdbcType":12,
"name":"col_pk",
"position":2,
"typeExpression":"varchar(22)",
"typeName":"varchar"
}
],
"primaryKeyColumnNames":["col_pk"] // 主键名列表。
},
"type":"ALTER"
}
}

Canal Json

参数说明:

参数类型说明
typeString操作的类型,如:INSERT / UPDATE / DELETE。
idLong操作的序列号。
oldList变更前的数据。
dataList当前操作的数据。
databaseString数据库名称。
tableString表名。
mysqlTypeMap字段数据类型名称。
sqlTypeMap字段 JDBC 数据类型
isDdlBoolean是否为 DDL 操作。
pkNamesList源端主键名称。
esLong源端 SQL 执行的时间,13位Unix时间戳,单位为毫秒。
tsLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
sqlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
"data":[
{
"col1":"11",
"col2":"11",
"col_pk":"11"
}
],
"database":"db_test",
"es":1669790847000,
"id":0,
"isDdl":false,
"mysqlType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"old":[
// UPDATE 的 old 字段
{
"col1":"22",
"col2":"22",
"col_pk":"22"
}
],
"pkNames":["col_pk"],
"sql":"",
"sqlType":{
"col1":12,
"col2":12,
"col_pk":12
},
"table":"table_test",
"ts":1669790848072,
"type":"INSERT/DELETE/UPDATE"
}

DDL操作示例如下:

{
"data":[],
"database":"db_test",
"es":1669790951000,
"id":0,
"isDdl":true,
"mysqlType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"old":[],
"pkNames":[],
"sql":"alter table table_test add col2 varchar(22) null",
"sqlType":{
"col1":12,
"col2":12,
"col_pk":12
},
"table":"table_test",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1", // 字段名称。
"position":0, // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1,
"typeExpression":"varchar(22)",
"typeName":"varchar"
},
{
"jdbcType":12,
"name":"col_pk",
"position":2,
"typeExpression":"varchar(22)",
"typeName":"varchar"
}
],
"primaryKeyColumnNames":["col_pk"] // 主键名列表。
},
"type":"ALTER"
},
"ts":1669790952584,
"type":"ALTER"
}

Aliyun DTS Avro

该消息类型需要根据 DTS Avro 的 SCHEMA 定义进行数据解析,DTS Avro 定义详情参见 DTS Avro 的 SCHEMA 定义

Debezium Envelope

该消息类型主要由 SCHEMA 和 PAYLOAD 构成,SCHEMA 是数据的元信息,PADYLOAD 是记录数据变化的内容。

SCHEMA 定义详情参见 Debezium 官方文档

Kafka 源端使用该消息格式,参见:源端 Kafka Debezium Json 使用说明

参数说明:

参数类型说明
opString操作的类型,如:c(INSERT),u(UPDATE),d(DELETE),a(ALTER)。
ts_msLong操作发送的时间,13 位 Unix 时间戳,单位为毫秒。
afterJson变更前的数据。
beforeJson变更后的数据。
sourceJson事件的元信息,如:db,table。
ddlString源端执行的 DDL 语句。
tableChangesJson该消息为 DDL 时,携带的该表的元信息,如:主键,列。

DML操作示例如下:

{
"schema":...,
"payload":{
"op":"i",
"ts_ms":1669796261933,
"after":{
"col1":"11",
"col2":"11",
"col_pk":"11"
},
"before":{},
"source":{
"ts_ms":1669796261933,
"db":"db_test",
"table":"table_test",
"connector":"MySQL",
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"server_id": 223344,
...
}
}
}

DDL操作示例如下:

{
"schema":...,
"payload":{
"databaseName":"db_test",
"ddl":"alter table table_test add col2 varchar(22) null",
"ts_ms":1669797213247,
"source":{
"ts_ms":1669796261933,
"db":"db_test",
"table":"table_test",
"connector":"MySQL",
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"server_id": 223344,
...
},
"tableChanges":{
"type":"ALTER",
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1", // 字段名称。
"position":0, // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1,
"typeExpression":"varchar(22)",
"typeName":"varchar"
},
{
"jdbcType":12,
"name":"col_pk",
"position":2,
"typeExpression":"varchar(22)",
"typeName":"varchar"
}
],
"primaryKeyColumnNames":["col_pk"], // 主键名列表。
}
}
}
}

附录

jdbcType 与数字对照表

jdbcType对应数字
unknown10000
ARRAY2003
BIGINT-5
BINARY-2
BIT-7
BLOB2004
BOOLEAN16
CHAR1
CLOB2005
DATALINK70
DATE91
DECIMAL3
DISTINCT2001
DOUBLE8
FLOAT6
INTEGER4
JAVA_OBJECT2000
LONGVARBINARY-4
LONGVARCHAR-1
NULL0
NUMERIC2
OTHER1111
REAL7
REF2006
SMALLINT5
STRUCT2002
TIME92
TIMESTAMP93
TINYINT-6
VARBINARY-3
VARCHAR12