跳到主要内容

简介

SAP HANA(全称SAP High-performance ANalytic Appliance)是由SAP开发的一款内置列式数据库的系统平台, 除内置数据库以外,还具有高级分析功能(例如预测分析、空间数据处理、文本分析、文本搜索、流分析、图形数据处理),ETL功能,并内置了应用程序服务器,本文提到的 SAP HANA 特指数据平台内置的数据库管理系统。 SAP HANA 是内存数据库系统,可以把系统所有的数据都载入内存中,因此,与传统的将数据存储在硬盘上的数据库相比,HANA的性能可以提升10~10,000倍。一般SAP HANA 内置在 SAP ERP 系统中作为整体提供服务,在制造业应用广泛。 现如今企业都会建立内部的统一数据分析平台,SAP HANA 保存了ERP相关数据,如何实时同步 SAP HANA 的数据到数据平台,一直是困扰企业用户的问题。

CloudCanal免费的数据同步工具,新版本中支持 SAP HANA 同步到 Mysql、MariaDB、Doris、StarRocks,同时支持Mysql同步到多种数据源。下面将介绍 CloudCanal 实现 SAP HANA 实时同步的原理。

基于触发器 (Trigger) 实现Hana实时数据同步

整体流程

CloudCanal 实现触发器同步的整体流程如下:

  1. 安装触发器,通过触发器捕获增量变更数据
  2. 记录位点,记录增量数据数据同步的起点
  3. 执行全量数据同步
  4. 执行增量数据同步

触发器安装

触发器是一种自动触发执行的存储过程,它可以在数据变更前执行也可以在数据变更后执行,因为本质也是存储过程,所以存储过程支持的操作触发器均支持。

不同数据库对触发器的支持程度不同,Hana 的触发器支持监听 I(新增)/U(更新)/D(删除) 三种事件,因此数据的所有变更都可以通过触发器捕获。

安装触发器的方式与创建存储过程类似,即通过执行Sql创建触发器。

通过触发器实现增量数据同步,需要触发器捕获数据的I/U/D变更事件并写入增量CDC数据表, 下面是触发器执行的整体流程:

触发器安装示例

下面是安装触发器的示例脚本

CREATE TRIGGER "SYSTEM"."CLOUD_CANAL_ON_U_TEST_COLUMN_1_TRIGGER" AFTER UPDATE ON "SYSTEM"."TEST_COLUMN_1" REFERENCING OLD ROW OLD, NEW ROW NEW FOR EACH ROW
BEGIN
-- 定义异常处理器,当触发器发生异常时退出执行,不阻塞业务操作
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
if 1=1 then
if 1=1 then
-- 写入变更数据到增量CDC数据表,包括主键数据、更新前的数据、更新后的数据
insert into "SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" (catalog_name, schema_name, table_name, event_type, trigger_id, pk_data, row_data, old_data, transaction_id, external_data, create_time)
values(
'HXE',
'SYSTEM',
'TEST_COLUMN_1',
'U',
3116,
-- 将变更数据拼接字符串保存,注意拼接后处理转义问题
'{' || '"TEST_BIGINT":' || case when :old."TEST_BIGINT" is null then '"CLOUD_CANAL_NULL"' else concat(concat('"',cast(TO_DECIMAL(:old."TEST_BIGINT") as char)),'"') end || '}',
'{' || '"TEST_INTEGER":' || case when :new."TEST_BIGINT" is null then '"CLOUD_CANAL_NULL"' else concat(concat('"',cast(TO_DECIMAL(:new."TEST_BIGINT") as char)),'"') end || '}',
'{' || '"TEST_INTEGER":' || case when :new."TEST_BIGINT" is null then '"CLOUD_CANAL_NULL"' else concat(concat('"',cast(TO_DECIMAL(:new."TEST_BIGINT") as char)),'"') end || '}',
'',
CURRENT_UTCTIMESTAMP
);
END IF;
END IF;
END;

安装好触发器,下面介绍增量CDC数据表的具体结构设计

增量CDC数据表

增量CDC数据表结构如下:

CREATE COLUMN TABLE "SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" ("DATA_ID" BIGINT GENERATED BY DEFAULT AS IDENTITY (
START WITH 1 INCREMENT BY 1) NOT NULL ,
"CATALOG_NAME" VARCHAR(255) NULL ,
"SCHEMA_NAME" VARCHAR(255) NOT NULL ,
"TABLE_NAME" VARCHAR(255) NOT NULL ,
"EVENT_TYPE" VARCHAR(20) NOT NULL ,
"ROW_DATA" CLOB,
"PK_DATA" CLOB,
"OLD_DATA" CLOB,
"TRIGGER_ID" INTEGER NOT NULL ,
"EXTERNAL_DATA" VARCHAR(50),
"CREATE_TIME" LONGDATE,
PRIMARY KEY ("DATA_ID"));
CREATE UNIQUE CPBTREE INDEX "CLOUD_CANAL_IDX_D_ID" ON
"SYSTEM"."CLOUD_CANAL_TRIGGER_DATA" ( "DATA_ID" ASC);
CREATE INDEX CLOUD_CANAL_TRIGGER_DATA_CREATE_TIME_IDX ON "SYSTEM".CLOUD_CANAL_TRIGGER_DATA (CREATE_TIME);

增量CDC数据表记录了每次变更数据的数据库( CATALOG_NAME )、模式名称( SCHEMA_NAME )、表名称( TABLE_NAME )、事件类型( EVENT_TYPE )、变更前的数据镜像( ROW_DATA )、变更前主键数据( PK_DATA )、变更后的数据镜像( OLD_DATA )、触发器ID( TRIGGER_ID )、变更时间( CREATE_TIME )。

另外,需要注意的是使用自增ID作为主键,同时记录创建时间。自增ID( DATA_ID ) 可以唯一标识数据变更事件并确保有序,创建时间( CREATE_TIME ) 作为数据变更事件的时间戳,记录数据变更发生时间。

如何顺序扫描增量CDC数据表并确保不丢数据

扫描增量CDC数据表需要做到不重复扫、不漏数据、顺序扫描。

保证顺序的方式是通过自增ID排序,即order by DATA_ID asc,通过这个方式相当于对全局的变更事件进行编号,基于编号进行扫描和消费,可确保不重。但只做到这一点还不够,会出现丢数据情况,数据丢失的原因如图所示:

当查询的语句执行时,可能有部分事务没有commit,导致漏扫,这种问题如何解决?

首先想到的可能是等待一段时间,但是等待多久合适也是问题,时间长了延迟高,时间短了丢数据,而且当事务出现回滚时,自增序列会出现缺失,缺失的原因是事务没有回滚前占用了自增ID生成的序号,事务回滚后占用的序号也不会被重复使用。遇到自增ID序号缺失的情况,通过等待一段时间方式,只能每次都等待最大超时时间,会导致同步延迟增大。

这个问题的关键点是确定占用自增ID的事务是否还在活跃状态。

CloudCanal采用的解决办法是查缺补漏,在扫描增量CDC数据表时遇到某个数据id缺失的情况,会尝试 insert 一条相同id的数据,通过唯一键来判断这个id的数据是否被占用,如果出现异常,则重新查询;如没有异常,则 会写入数据占用这个id,因为这个id的数据已经被填充,因此也不用担心这个id的数据被漏扫,可以继续读取大于这个id数据。

位点

位点用于管理数据同步的进度,记录哪些变更事件已经同步、哪些变更事件没有同步。

基于触发器的数据同步方案选择增量CDC数据表的自增ID及时间戳作为位点,自增ID可以精确定位到每个数据变更事件,时间戳可以方便以用户视角感知同步任务的延迟情况。

什么时间更新位点?

当捕获的变更事件成功写入目标端后更新位点,如果写入数据成功更新位点失败会导致数据不一致吗?

答案是不会,因为源端的变更事件是顺序保存并且顺序读取,类似Mysql binlog,只要 CloudCanal 按照源端的事件顺序消费,当消费到事件末尾时可以确保源端和目标端的的数据状态一致。

延迟的判断方式

延迟可通过位点来判断,如果位点不断向前推进,可以得到正确延迟时间,但是如果没有变更事件,位点不更新延迟就会持续增大,但实际上没有延迟。

CloudCanal 的解决办法,通过查询增量CDC数据表判断是否存在延迟,如果有数据返回按照数据上记录的时间戳判断延迟,如果没有数据返回发送心跳事件,根据心跳上的事件戳判断是否延迟

总结

本文简单介绍了 Hana CDC 技术,CloudCanal 是如何实现稳定的增量 DML 同步