反馈已提交

网络繁忙

KingBaseES环境准备

  • 文档创建者:Wendy123456
  • 历史版本:9
  • 最近更新:Wendy123456 于 2026-06-11
  • icon提示:

    建议客户升级到 4.2.19.1 及之后版本;本文方案适用于 4.2.19.1 及之后版本。

    1. 概述

    使用实时管道任务、数据开发-实时任务同步 KingBaseES 数据前,需要参考本文在数据源中进行一些配置,为后续的数据同步做好准备。

    2. 数据库服务端基础配置

    icon提示:
    以下操作由 DBA(superuser) 在 KingbaseES 服务器上执行,目的是开启逻辑解码、设置表复制级别、创建 FDL 专用用户。

    2.1 确认当前使用的数据库版本

    数据库类型
    支持版本
    KingbaseES(MySQL模式V9.X
    KingbaseES(SQLServer模式V8.X
    KingBaseES(Oracle模式)V8.X

    2.2 赋予数据连接用户权限

    1)新建用户,作为 KingbaseES 数据连接的用户名。

    • su - kingbase:切换到 kingbase 安装用户。

    • ksql -U system -d test -W:以 system 用户登录 test 数据库,会提示输入密码。

    • CREATE USER fdl_user WITH REPLICATION LOGIN PASSWORD 'your_password'; 创建一个用户,fdl_user和密码需要自定义,并赋予他复制权限,该用户后续用于 FDL 数据连接。

    注:用户根据实际情况设置用户名和密码。

    1752144607452243.png

    2)为新建用户授予某个模式下所有表的读取权限。

    -- 将业务 schema 下所有表的查询权限授予 fdl_user
    -- 替换 FLY 为实际业务 schema 名称
    GRANT SELECT ON ALL TABLES IN SCHEMA FLY TO fdl_user;

    2.3 修改REPLICA IDENTITY

    修改复制标识为 FULL(使用整行作为标识),该属性决定了当数据发生 UPDATE/DELETE 时,日志记录的字段。

    注:所有需要进行实时同步的表,都应该做此操作。

    ALTER TABLE your_schema.your_table REPLICA IDENTITY FULL;
    • your_schema:模式名称。

    • your_table:表名称。

    2.4 修改配置文件

    1)使用 \q 语句退出 kingbase 登录。

    2)进入 KingbaseES数据目录下,数据目录位置查询方法请自行百度。

    本文示例中,数据目录位置为/home/KingbaseES/V9/data,用户需要找到自己实际的数据目录路径。

    3)修改配置文件 kingbase.conf(使用 vi 语句),将 wal_level 的值修改为 logical。示例如下。

    注:或者找到下面代码中的设置项,取消注释,修改对应的值。

    wal_level = logical

    4)设置用户 replication 权限。

    在 data 目录下的 sys_hba.conf 中,增加下述内容以保障账号访问权限。

    注:替换 username 为本文 2.2 节创建的用户。

    local replication username trust
    host replication username 127.0.0.1/32 trust
    host replication username ::1/128 trust

    2.5 重启数据库

    用户根据实际情况修改语句:

    ./sys_ctl restart -D /home/KingbaseES/V9/data

    2.6 检查权限是否可用

    1)使用SELECT * FROM pg_create_logical_replication_slot('slot_test', 'decoderbufs');创建 slot 。如下图所示:

    1752148400786330.png

    slot_test 用户可自定义,为逻辑复制槽的名称;创建成功代表:

    • 用户具备 REPLICATION 权限或超级用户权限。

    • wal_level 已设置为 logical 。

    2)使用SELECT * FROM pg_replication_slots语句查看有没有对应的 slot 。如下图所示:

    1752148655730910.png

    3)使用SELECT PG_DROP_REPLICATION_SLOT('slot_test');语句删除对应的 slot 。如下图所示:

    1752148728291432.png

    3. 超管创建 DDL 检测资源

    icon提示:

    后面需要配置 KingBaseES 数据连接开发实时任务和实时管道任务,如果数据连接中用户为DBA(superuser),忽略本章内容。

    如果数据连接中用户为普通用户,需要执行本章操作

    DBA(superuser)继续在 KingbaseES 中创建 DDL 同步所需的 schema、表、function、trigger。以下 SQL 中的名称均可自定义,以默认名称为例。

    3.1 创建schema

    CREATE SCHEMA IF NOT EXISTS fdl_temp;

    3.2 创建 DDL 临时表

    用来保存用户的 DDL 操作:

    用户可自定义修改 SQL 语句中的 trep_sync_ddl(为 ddlTable 名称):

    CREATE TABLE IF NOT EXISTS fdl_temp.trep_sync_ddl ( // trep_sync_ddl为用户自定义的表名
        id              bigserial primary key,
        ddl_time        timestamptz,
        command_tag     text,
        objid           text,
        object_type     text,
        schema_name     text,
        table_name      text,
        search_path     text,
        ddl_sql         text
    );

    3.3 创建 function

    用于往 3.2 中创建的 ddl table 中写入 ddl 信息。

    3.3.1 Function 1 —— 捕获 DDL 变更(ALTER TABLE、RENAME COLUMN 等)

    DO $$
    BEGIN
      EXECUTE $func$
        CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_ddl_trigger_function()  // trep_pg_ddl_trigger_function用户可以自定义
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY definer
        SET search_path = fdl_temp, public
        AS $function$
        DECLARE
          current_rows integer;
        BEGIN
          INSERT INTO fdl_temp.trep_sync_ddl   // trep_sync_ddl为3.2节中用户自定义的tablename
            (ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql)
          SELECT
            CURRENT_TIMESTAMP,
            c.command_tag::varchar,
            c.objid::varchar,
            c.object_type::varchar,
            COALESCE(c.schema_name::varchar, split_part(c.object_identity::varchar, '.', 1))::varchar,
            (CASE
              WHEN strpos(c.object_identity::varchar, '.') > 0
                THEN split_part(c.object_identity::varchar, '.', 2)
              ELSE c.object_identity::varchar
            END)::varchar,
            current_setting('search_path')::varchar,
            pg_catalog.kdb_current_query()
          FROM sys_event_trigger_ddl_commands() c
          WHERE c.object_type IN ('table', 'table column');

          SELECT count(*) INTO current_rows FROM fdl_temp.trep_sync_ddl;  // trep_sync_ddl为3.2节中用户定义的tablename

          IF current_rows > 1000 THEN
            DELETE FROM fdl_temp.trep_sync_ddl  // trep_sync_ddl为3.2节中用户定义的tablename
            WHERE id IN (
              SELECT id FROM fdl_temp.trep_sync_ddl ORDER BY id ASC LIMIT 100  // trep_sync_ddl为3.2节中用户定义的tablename
            );
          END IF;
        END;
        $function$
      $func$;
    END
    $$ LANGUAGE plpgsql;

    3.3.2 Function 2 —— 捕获 DROP TABLE

    DO $$
    BEGIN
      EXECUTE $func$
        CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_drop_trigger_function() // trep_pg_drop_trigger_function用户可以自定义
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY definer
        SET search_path = fdl_temp, public
        AS $function$
        DECLARE
          current_rows integer;
        BEGIN
          INSERT INTO fdl_temp.trep_sync_ddl (  // trep_sync_ddl为3.2节中用户自定义的tablename
            ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql
          )
          SELECT
            CURRENT_TIMESTAMP,
            'DROP',
            d.objid::varchar,
            d.object_type::varchar,
            COALESCE(d.schema_name::varchar, split_part(d.object_identity::varchar, '.', 1))::varchar,
            (CASE
              WHEN strpos(d.object_identity::varchar, '.') > 0
                THEN split_part(d.object_identity::varchar, '.', 2)
              ELSE d.object_identity::varchar
            END)::varchar,
            current_setting('search_path')::varchar,
            pg_catalog.kdb_current_query()
          FROM sys_event_trigger_dropped_objects() d
          WHERE d.object_type IN ('table');

          SELECT count(*) INTO current_rows FROM fdl_temp.trep_sync_ddl;  // trep_sync_ddl为3.2节中用户自定义的tablename

          IF current_rows > 1000 THEN
            DELETE FROM fdl_temp.trep_sync_ddl
            WHERE id IN (
              SELECT id FROM fdl_temp.trep_sync_ddl ORDER BY id ASC LIMIT 100 // trep_sync_ddl为3.2节中用户自定义的tablename
            );
          END IF;
        END;
        $function$
      $func$;
    END
    $$ LANGUAGE plpgsql;

    3.4 创建 Event Trigger

    用于感知 ddl 操作然后执行 3.3 节中的function:

    注:请认真看下 SQL 语句中的备注,根据实际情况修改。

    CREATE EVENT TRIGGER flysync_pg_ddl_trigger  // flysync_pg_ddl_trigger用户可以自定义名称
    ON ddl_command_end
    EXECUTE FUNCTION fdl_temp.trep_pg_ddl_trigger_function();  // trep_pg_ddl_trigger_function为 3.3 节中用户自定义的名称
     
     
    CREATE EVENT TRIGGER flysync_pg_drop_trigger // flysync_pg_drop_trigger用户可以自定义名称
    ON sql_drop
    EXECUTE FUNCTION fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为3.3节中用户自定义的名称

    3.5 数据连接最终授权补充

    在 2.2-2.3 节中已创建用户并授予业务表 SELECT 权限。还需要为 fdl_temp schema 补充以下权限:

    注:SQL 语句中的fdl_temp.trep_sync_ddl为本文 3.2 节用户自定义的表名,需根据实际情况修改

    -- Schema 使用权
    GRANT USAGE ON SCHEMA fdl_temp TO fdl_user;

    -- DDL 临时表读权限
    GRANT SELECT ON fdl_temp.trep_sync_ddl TO fdl_user;

    -- DDL 临时表清理权限(FDL 采集任务每小时清理一次)
    GRANT TRUNCATE ON fdl_temp.trep_sync_ddl TO fdl_user;

    -- 未来在 fdl_temp 下新建表自动可读
    ALTER DEFAULT PRIVILEGES IN SCHEMA fdl_temp GRANT SELECT ON TABLES TO fdl_user;
    权限
    目标对象用途
    USAGE业务 schema + fdl_temp允许访问 schema
    SELECT业务表 + fdl_temp 下表读取 DML 数据和 DDL 事件
    TRUNCATEfdl_temp.trep_sync_ddlDDL 临时表定时清理
    REPLICATION用户级别(2.2 节已赋予)创建 replication slot,读取 WAL 日志

    普通用户不需要 INSERT/CREATE/DROP 权限在 fdl_temp 上。DDL 临时表的写入由 SECURITY DEFINER function 以 DBA 权限完成

    4. 数据连接配置

    DBA 完成以上所有准备后,在 FineDataLink 中使用 fdl_user 创建数据连接:人大金仓KingBaseES(MySQL模式)数据源人大金仓KingBaseES(Oracle模式)数据源KingBaseES(SqlServer模式)数据源

    1781174068340674.png

    部分设置项说明如下表所示:

    设置项
    说明
    ddltable

    该数据源作为实时管道任务、实时任务来源端时

    数据连接用户角色说明
    superuser不需要配置该设置项 
    普通用户

    方案一:需要配置设置

    设置项
    说明
    ddlTable

    trep_sync_ddl

    (根据实际情况填写) 

    DDL 临时表名,不需要带 fdl_temp 前缀

    注:所有连接上的改动都必须重置(重新生成)采集任务 才会生效,不然生成采集任务后就会把对应配置保存起来,无法修改

    方案二:给该普通用户授予 superuser 权限 

    超管账号连接数据库,执行下面语句:

    注:将下面 SQL 语句中的 username 替换为实际用户名

    ALTER USER username WITH SUPERUSER;


    采集DDL事件配置

    需要开启,该按钮开启后,实时管道任务才能使用 同步源表结构变化 功能

    开启后仅在该数据连接作为实时来源端时生效,调整后需重启(暂停再启动)对应采集任务

    5. 资源回收

    如果用户的 ddltable 确认不再使用,则严格按照一下 SQL 顺序进行资源回收:

    DROP EVENT TRIGGER IF EXISTS flysync_pg_ddl_trigger;   // flysync_pg_ddl_trigger为用户自定义的trigger名
    DROP EVENT TRIGGER IF EXISTS flysync_pg_drop_trigger;  // flysync_pg_drop_trigger为用户自定义的trigger名
     
    DROP FUNCTION IF EXISTS fdl_temp.trep_pg_ddl_trigger_function();  // trep_pg_ddl_trigger_function为用户自定义的function名
    DROP FUNCTION IF EXISTS fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为用户自定义的function名
     
    DROP TABLE IF EXISTS fdl_temp.trep_sync_ddl;   // trep_sync_ddl为用户自定义的table名

    6. 后续步骤

    设计任务即可,详情参见:配置数据管道任务实时任务概述



    附件列表


    主题: 数据管道
    • 有帮助
    • 没帮助
    • 只是浏览
    中文(简体)

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭



    AI

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持