提示:建议客户升级到 4.2.19.1 及之后版本;本文方案适用于 4.2.19.1 及之后版本。
1. 概述
使用实时管道任务、数据开发-实时任务同步 KingBaseES 数据前,需要参考本文在数据源中进行一些配置,为后续的数据同步做好准备。
2. 数据库服务端基础配置
提示:2.1 确认当前使用的数据库版本
| 数据库类型 | 支持版本 |
|---|---|
| KingbaseES(MySQL模式) | V9.X |
| KingbaseES(SQLServer模式) | V8.X |
| KingBaseES(Oracle模式) | V8.X |
2.2 赋予数据连接用户权限
1)新建用户,作为 KingbaseES 数据连接的用户名。
su - kingbase:切换到 kingbase 安装用户。
ksql -U system -d test -W:以 system 用户登录 test 数据库,会提示输入密码。
CREATE USER fdl_user WITH REPLICATION LOGIN PASSWORD 'your_password'; 创建一个用户,fdl_user和密码需要自定义,并赋予他复制权限,该用户后续用于 FDL 数据连接。
注:用户根据实际情况设置用户名和密码。

2)为新建用户授予某个模式下所有表的读取权限。
-- 将业务 schema 下所有表的查询权限授予 fdl_user
-- 替换 FLY 为实际业务 schema 名称
GRANT SELECT ON ALL TABLES IN SCHEMA FLY TO fdl_user;2.3 修改REPLICA IDENTITY
修改复制标识为 FULL(使用整行作为标识),该属性决定了当数据发生 UPDATE/DELETE 时,日志记录的字段。
注:所有需要进行实时同步的表,都应该做此操作。
ALTER TABLE your_schema.your_table REPLICA IDENTITY FULL;
your_schema:模式名称。
your_table:表名称。
2.4 修改配置文件
1)使用 \q 语句退出 kingbase 登录。
2)进入 KingbaseES 的数据目录下,数据目录位置查询方法请自行百度。
本文示例中,数据目录位置为/home/KingbaseES/V9/data,用户需要找到自己实际的数据目录路径。
3)修改配置文件 kingbase.conf(使用 vi 语句),将 wal_level 的值修改为 logical。示例如下。
注:或者找到下面代码中的设置项,取消注释,修改对应的值。
wal_level = logical
4)设置用户 replication 权限。
在 data 目录下的 sys_hba.conf 中,增加下述内容以保障账号访问权限。
注:替换 username 为本文 2.2 节创建的用户。
local replication username trust
host replication username 127.0.0.1/32 trust
host replication username ::1/128 trust2.5 重启数据库
用户根据实际情况修改语句:
./sys_ctl restart -D /home/KingbaseES/V9/data
2.6 检查权限是否可用
1)使用SELECT * FROM pg_create_logical_replication_slot('slot_test', 'decoderbufs');创建 slot 。如下图所示:

slot_test 用户可自定义,为逻辑复制槽的名称;创建成功代表:
用户具备 REPLICATION 权限或超级用户权限。
wal_level 已设置为 logical 。
2)使用SELECT * FROM pg_replication_slots语句查看有没有对应的 slot 。如下图所示:

3)使用SELECT PG_DROP_REPLICATION_SLOT('slot_test');语句删除对应的 slot 。如下图所示:

3. 超管创建 DDL 检测资源
提示:后面需要配置 KingBaseES 数据连接开发实时任务和实时管道任务,如果数据连接中用户为DBA(superuser),忽略本章内容。
如果数据连接中用户为普通用户,需要执行本章操作
DBA(superuser)继续在 KingbaseES 中创建 DDL 同步所需的 schema、表、function、trigger。以下 SQL 中的名称均可自定义,以默认名称为例。
3.1 创建schema
CREATE SCHEMA IF NOT EXISTS fdl_temp;
3.2 创建 DDL 临时表
用来保存用户的 DDL 操作:
用户可自定义修改 SQL 语句中的 trep_sync_ddl(为 ddlTable 名称):
CREATE TABLE IF NOT EXISTS fdl_temp.trep_sync_ddl ( // trep_sync_ddl为用户自定义的表名
id bigserial primary key,
ddl_time timestamptz,
command_tag text,
objid text,
object_type text,
schema_name text,
table_name text,
search_path text,
ddl_sql text
);3.3 创建 function
用于往 3.2 中创建的 ddl table 中写入 ddl 信息。
3.3.1 Function 1 —— 捕获 DDL 变更(ALTER TABLE、RENAME COLUMN 等)
DO $$
BEGIN
EXECUTE $func$
CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_ddl_trigger_function() // trep_pg_ddl_trigger_function用户可以自定义
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY definer
SET search_path = fdl_temp, public
AS $function$
DECLARE
current_rows integer;
BEGIN
INSERT INTO fdl_temp.trep_sync_ddl // trep_sync_ddl为3.2节中用户自定义的tablename
(ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql)
SELECT
CURRENT_TIMESTAMP,
c.command_tag::varchar,
c.objid::varchar,
c.object_type::varchar,
COALESCE(c.schema_name::varchar, split_part(c.object_identity::varchar, '.', 1))::varchar,
(CASE
WHEN strpos(c.object_identity::varchar, '.') > 0
THEN split_part(c.object_identity::varchar, '.', 2)
ELSE c.object_identity::varchar
END)::varchar,
current_setting('search_path')::varchar,
pg_catalog.kdb_current_query()
FROM sys_event_trigger_ddl_commands() c
WHERE c.object_type IN ('table', 'table column');
SELECT count(*) INTO current_rows FROM fdl_temp.trep_sync_ddl; // trep_sync_ddl为3.2节中用户定义的tablename
IF current_rows > 1000 THEN
DELETE FROM fdl_temp.trep_sync_ddl // trep_sync_ddl为3.2节中用户定义的tablename
WHERE id IN (
SELECT id FROM fdl_temp.trep_sync_ddl ORDER BY id ASC LIMIT 100 // trep_sync_ddl为3.2节中用户定义的tablename
);
END IF;
END;
$function$
$func$;
END
$$ LANGUAGE plpgsql;
3.3.2 Function 2 —— 捕获 DROP TABLE
DO $$
BEGIN
EXECUTE $func$
CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_drop_trigger_function() // trep_pg_drop_trigger_function用户可以自定义
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY definer
SET search_path = fdl_temp, public
AS $function$
DECLARE
current_rows integer;
BEGIN
INSERT INTO fdl_temp.trep_sync_ddl ( // trep_sync_ddl为3.2节中用户自定义的tablename
ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql
)
SELECT
CURRENT_TIMESTAMP,
'DROP',
d.objid::varchar,
d.object_type::varchar,
COALESCE(d.schema_name::varchar, split_part(d.object_identity::varchar, '.', 1))::varchar,
(CASE
WHEN strpos(d.object_identity::varchar, '.') > 0
THEN split_part(d.object_identity::varchar, '.', 2)
ELSE d.object_identity::varchar
END)::varchar,
current_setting('search_path')::varchar,
pg_catalog.kdb_current_query()
FROM sys_event_trigger_dropped_objects() d
WHERE d.object_type IN ('table');
SELECT count(*) INTO current_rows FROM fdl_temp.trep_sync_ddl; // trep_sync_ddl为3.2节中用户自定义的tablename
IF current_rows > 1000 THEN
DELETE FROM fdl_temp.trep_sync_ddl
WHERE id IN (
SELECT id FROM fdl_temp.trep_sync_ddl ORDER BY id ASC LIMIT 100 // trep_sync_ddl为3.2节中用户自定义的tablename
);
END IF;
END;
$function$
$func$;
END
$$ LANGUAGE plpgsql;
3.4 创建 Event Trigger
用于感知 ddl 操作然后执行 3.3 节中的function:
注:请认真看下 SQL 语句中的备注,根据实际情况修改。
CREATE EVENT TRIGGER flysync_pg_ddl_trigger // flysync_pg_ddl_trigger用户可以自定义名称
ON ddl_command_end
EXECUTE FUNCTION fdl_temp.trep_pg_ddl_trigger_function(); // trep_pg_ddl_trigger_function为 3.3 节中用户自定义的名称
CREATE EVENT TRIGGER flysync_pg_drop_trigger // flysync_pg_drop_trigger用户可以自定义名称
ON sql_drop
EXECUTE FUNCTION fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为3.3节中用户自定义的名称
3.5 数据连接最终授权补充
在 2.2-2.3 节中已创建用户并授予业务表 SELECT 权限。还需要为 fdl_temp schema 补充以下权限:
注:SQL 语句中的fdl_temp.trep_sync_ddl为本文 3.2 节用户自定义的表名,需根据实际情况修改
-- Schema 使用权
GRANT USAGE ON SCHEMA fdl_temp TO fdl_user;
-- DDL 临时表读权限
GRANT SELECT ON fdl_temp.trep_sync_ddl TO fdl_user;
-- DDL 临时表清理权限(FDL 采集任务每小时清理一次)
GRANT TRUNCATE ON fdl_temp.trep_sync_ddl TO fdl_user;
-- 未来在 fdl_temp 下新建表自动可读
ALTER DEFAULT PRIVILEGES IN SCHEMA fdl_temp GRANT SELECT ON TABLES TO fdl_user;
| 权限 | 目标对象 | 用途 |
|---|---|---|
| USAGE | 业务 schema + fdl_temp | 允许访问 schema |
| SELECT | 业务表 + fdl_temp 下表 | 读取 DML 数据和 DDL 事件 |
| TRUNCATE | fdl_temp.trep_sync_ddl | DDL 临时表定时清理 |
| REPLICATION | 用户级别(2.2 节已赋予) | 创建 replication slot,读取 WAL 日志 |
普通用户不需要 INSERT/CREATE/DROP 权限在 fdl_temp 上。DDL 临时表的写入由 SECURITY DEFINER function 以 DBA 权限完成
4. 数据连接配置
DBA 完成以上所有准备后,在 FineDataLink 中使用 fdl_user 创建数据连接:人大金仓KingBaseES(MySQL模式)数据源、人大金仓KingBaseES(Oracle模式)数据源、KingBaseES(SqlServer模式)数据源

部分设置项说明如下表所示:
| 设置项 | 说明 | |||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ddltable | 该数据源作为实时管道任务、实时任务来源端时:
| |||||||||||||
| 采集DDL事件配置 | 需要开启,该按钮开启后,实时管道任务才能使用 同步源表结构变化 功能 开启后仅在该数据连接作为实时来源端时生效,调整后需重启(暂停再启动)对应采集任务 |
5. 资源回收
如果用户的 ddltable 确认不再使用,则严格按照一下 SQL 顺序进行资源回收:
DROP EVENT TRIGGER IF EXISTS flysync_pg_ddl_trigger; // flysync_pg_ddl_trigger为用户自定义的trigger名
DROP EVENT TRIGGER IF EXISTS flysync_pg_drop_trigger; // flysync_pg_drop_trigger为用户自定义的trigger名
DROP FUNCTION IF EXISTS fdl_temp.trep_pg_ddl_trigger_function(); // trep_pg_ddl_trigger_function为用户自定义的function名
DROP FUNCTION IF EXISTS fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为用户自定义的function名
DROP TABLE IF EXISTS fdl_temp.trep_sync_ddl; // trep_sync_ddl为用户自定义的table名
