反馈已提交

网络繁忙

PostgreSQL-pgoutput读取方式

  • 文档创建者:Wendy123456
  • 历史版本:8
  • 最近更新:Wendy123456 于 2026-06-11
  • icon提示:

    建议客户升级到 4.2.19.1 及之后版本,再使用 pgoutput 读取方式。

    本文方案适用于 4.2.19.1 及之后版本。

    1. 概述

    1.1 版本

    FineDataLink 版本
    功能变动
    4.2.10.1PostgreSQL 作为数据开发-实时任务、实时管道任务的数据来源时,适配 pgoutput 逻辑解码插件

    1.2 功能简介

    本文介绍 PostgreSQL 作为数据开发-实时任务、实时管道任务的数据来源且读取方式选择 pgoutput 时需要做的一些配置。

    21.png

    1.3 wal2json和pgoutput的区别

    wal2jaon 读取方式需要额外安装插件。

    2. 数据库服务端基础配置

    本文以 PostgreSQL 安装在 Linux 系统中操作为例;若 PostgreSQL 安装在 Windows 系统中,可以在 【Windows环境】PostgreSQL环境准备 文档中找到与本文步骤相同的操作,进行参考。

    icon提示:
    以下操作由 DBA(superuser) 在 PostgreSQL 服务器上执行,目的是开启逻辑解码、设置表复制级别、创建 FDL 专用用户。

    2.1 确认当前使用的数据库版本

    PostgreSQL 10 以上版本& FineDataLink 为 4.2.10.1 及之后版本时,适配 pgoutput 逻辑解码插件(该插件内置在 PostgreSQL 中,无需用户手动安装)。

    2.2 创建 FDL 数据连接用户

    SQL 语句如下,需要该用户为 PostgreSQL replication 角色、PostgreSQL login 角色,并且有需要同步的表的 select  权限。

    fdl_user 和密码请根据实际情况自定义,该用户后续用于 FDL 数据连接。your_schema 替换为实际业务 schema 名称。

    -- 使用 postgres 超管登录
    psql -U postgres -d your_database

    -- 创建用户(带 REPLICATION 和 LOGIN 权限)
    CREATE USER fdl_user WITH REPLICATION LOGIN PASSWORD 'your_password';

    -- 授予业务 schema 下所有表的查询权限
    GRANT SELECT ON ALL TABLES IN SCHEMA your_schema TO fdl_user;

    2.3 设置 REPLICA IDENTITY FULL

    pgoutput 模式下,所有需实时同步的表必须设置 REPLICA IDENTITY FULL,以确保 UPDATE 和 DELETE 操作能捕获完整行数据。

    所有需要实时同步的表都必须执行此操作。若遗漏,管道启动时会报错中止。

    -- 对每张需要同步的表执行(替换 schema 和 table 为实际名称)
    ALTER TABLE your_schema.your_table REPLICA IDENTITY FULL;

    例如将模式为 public 的表「产品名称维度」修改设置,则输入如下语句:

    1781166746861008.png

    2.4 赋予用户 replication 和 login 权限命令

    进入 PostgreSQL 安装目录下的 data 文件夹,进入 data 文件夹中,找到 postgresql.conf、pg_hba.conf,对这两个文件进行修改。

    • 修改 PostgreSQL 复制槽设置。

    本文示例中,postgresql.conf 文件在/var/lib/pgsql/12/data目录下,使用vi /var/lib/pgsql/12/data/postgresql.conf(用户根据实际情况修改 postgresql.conf 所在路径)语句编辑该文件,在文件末尾追加以下配置:

    注:或者找到下面代码中的设置项,取消注释,修改对应的值。

    wal_level = logical
    max_wal_senders = 10
    max_replication_slots = 10
    语句
    说明

    wal_level = logical

    开启逻辑解码
    max_wal_senders = 10最大 WAL 发送进程数,需要 ≥ FDL 采集任务数
    max_replication_slots = 10最大复制槽数量,需要 ≥ FDL 采集任务数
    • 设置用户 replication 权限。

    本文示例中,pg_hba.conf 文件在/var/lib/pgsql/12/data目录下,使用vi /var/lib/pgsql/12/data/pg_hba.conf(用户根据实际情况修改 pg_hba.conf 所在路径)语句编辑该文件,把下图中红圈中的两行前边的 # 去掉,或者直接追加:

    local   replication   all               trust
    host    replication   all   127.0.0.1/32   trust
    host    replication   all   ::1/128         trust

    2.5 重启数据库

    # 根据系统不同,选择对应命令
    systemctl restart postgresql-12
    # 或
    pg_ctl restart -D /var/lib/pgsql/12/data

    3. 超管创建 DDL 检测资源

    icon提示:

    后面需要配置 PostgreSQL数据连接 开发实时任务和实时管道任务,如果数据连接中用户为DBA(superuser),忽略本章内容。

    如果数据连接中用户为普通用户,需要执行本章操作

    DBA(superuser)继续在 PostgreSQL 中创建 DDL 同步所需的 schema、表、function、trigger、publication。

    注:无法捕获字段注释变更,仅支持表结构层面的变更(增删表、增删字段、修改字段名/类型、重命名表)。

    3.1 创建schema

    如果没有 schema fdl_temp 则使用下面语句创建schema fdl_temp:

    CREATE SCHEMA IF NOT EXISTS fdl_temp;

    3.2 创建 DDL 临时表

    用来保存用户的 DDL 操作:

    用户可自定义修改 SQL 语句中的 trep_sync_ddl(为 ddlTable 名称):

    CREATE TABLE IF NOT EXISTS fdl_temp.trep_sync_ddl (
        id           bigserial primary key,
        ddl_time     timestamptz,
        command_tag  text,
        objid        text,
        object_type  text,
        schema_name  text,
        table_name   text,
        search_path  text,
        ddl_sql      text
    );

    表名 trep_sync_ddl 可自定义,后续 function 和 FDL 配置中保持一致即可。

    3.3 创建function

    本文 SQL 用于往 3.2 节中创建的 ddl table 中写入 ddl 信息。

    3.3.1 Function 1 —— 捕获 DDL 变更(ALTER TABLE、RENAME COLUMN 等)

    CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_ddl_trigger_function() // trep_pg_ddl_trigger_function用户可以自定义
    RETURNS event_trigger
    LANGUAGE plpgsql
    SECURITY definer
    SET search_path = fdl_temp, public
    AS $function$
    DECLARE
      ddl_table_name text := 'trep_sync_ddl';  // trep_sync_ddl为 3.2 中用户自定义的tablename
      full_table_name text := format('%I.%I', 'fdl_temp', ddl_table_name);
      current_rows integer;
    BEGIN
      EXECUTE 'INSERT INTO ' || full_table_name || ' (' ||
              'ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql' ||
              ') ' ||
              'SELECT ' ||
              'CURRENT_TIMESTAMP, ' ||
              'c.command_tag, ' ||
              'c.objid, ' ||
              'c.object_type, ' ||
              'COALESCE(c.schema_name, split_part(c.object_identity, ''.'', 1)) AS schema_name, ' ||
              'split_part(c.object_identity, ''.'', 2) AS table_name, ' ||
              'current_setting(''search_path''), ' ||
              'pg_catalog.current_query() ' ||
              'FROM pg_event_trigger_ddl_commands() c ' ||
              'WHERE c.object_type IN (''table'', ''table column'')';

      EXECUTE 'SELECT count(*) FROM ' || full_table_name
      INTO current_rows;

      IF current_rows > 1000 THEN
        EXECUTE 'DELETE FROM ' || full_table_name ||
                ' WHERE id IN (' ||
                'SELECT id FROM ' || full_table_name || ' ORDER BY id ASC LIMIT 100)';
      END IF;
    END;
    $function$;

    3.3.2 Function 2 —— 捕获 DROP TABLE

    CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_drop_trigger_function()  //trep_pg_drop_trigger_function用户可以自定义
    RETURNS event_trigger
    LANGUAGE plpgsql
    SECURITY definer
    SET search_path = fdl_temp, public
    AS $function$
    DECLARE
      ddl_table_name text := 'trep_sync_ddl';  // trep_sync_ddl为3.2中用户自定义的tablename
      full_table_name text := format('%I.%I', 'fdl_temp', ddl_table_name);
      current_rows integer;
    BEGIN
      EXECUTE 'INSERT INTO ' || full_table_name || ' (' ||
              'ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql' ||
              ') ' ||
              'SELECT ' ||
              'CURRENT_TIMESTAMP, ' ||
              '''DROP'', ' ||
              'd.objid, ' ||
              'd.object_type, ' ||
              'COALESCE(d.schema_name, split_part(d.object_identity, ''.'', 1)) AS schema_name, ' ||
              'split_part(d.object_identity, ''.'', 2) AS table_name, ' ||
              'current_setting(''search_path''), ' ||
              'pg_catalog.current_query() ' ||
              'FROM pg_event_trigger_dropped_objects() d ' ||
              'WHERE d.object_type IN (''table'')';

      EXECUTE 'SELECT count(*) FROM ' || full_table_name
      INTO current_rows;

      IF current_rows > 1000 THEN
        EXECUTE 'DELETE FROM ' || full_table_name ||
                ' WHERE id IN (' ||
                'SELECT id FROM ' || full_table_name || ' ORDER BY id ASC LIMIT 100)';
      END IF;
    END;
    $function$;

    3.4 创建 Event Trigger

    用于感知 ddl 操作然后执行 3.3 中的 function:

    CREATE EVENT TRIGGER flysync_pg_ddl_trigger   // flysync_pg_ddl_trigger用户可以自定义名称
    ON ddl_command_end
    EXECUTE PROCEDURE fdl_temp.trep_pg_ddl_trigger_function();  // trep_pg_ddl_trigger_function为3.3节中用户自定义的名称

    CREATE EVENT TRIGGER flysync_pg_drop_trigger  // flysync_pg_drop_trigger用户可以自定义名称
    ON sql_drop
    EXECUTE PROCEDURE fdl_temp.trep_pg_drop_trigger_function();  // trep_pg_drop_trigger_function为3.3节中用户自定义的名称

    3.5 创建publication

    「建议」:用户可自定义修改 SQL 语句中的 fdl_publication 值:

    CREATE PUBLICATION fdl_publication FOR ALL TABLES;       // fdl_publication用户可以自定义名称

    如果想要实现仅对单表进行 publication 权限的赋予,命令如下:

    CREATE PUBLICATION fdl_publication FOR TABLE schema.table;

    注意事项如下:

    1)若超管已提前创建了 FOR ALL TABLES 的 publication,FDL 系统检测到后不会再执行 ALTER PUBLICATION ADD/DROP TABLE。

    2)多个 FDL 任务可以共享同一个 publication;DBA 提前创建 FOR ALL TABLES 的 publication 后,多个任务可以共享,系统检测到已配置后不会修改它。

    3)如果对单表进行 publication 权限的赋予,需注意:不在 publication 范围内的表无法被 pgoutput 正常读取。

    3.6 数据连接最终授权补充

    在本文 2.2 节中已创建用户并授予业务表 SELECT 权限。还需要为 fdl_temp schema 补充以下权限:

    注:SQL 语句中的fdl_temp.trep_sync_ddl为本文 3.2 节用户自定义的表名,需根据实际情况修改。

    -- Schema 使用权
    GRANT USAGE ON SCHEMA fdl_temp TO fdl_user;

    -- DDL 临时表读权限
    GRANT SELECT ON fdl_temp.trep_sync_ddl TO fdl_user;

    -- DDL 临时表清理权限(FDL 采集任务每小时清理一次)
    GRANT TRUNCATE ON fdl_temp.trep_sync_ddl TO fdl_user;

    -- 未来在 fdl_temp 下新建表自动可读
    ALTER DEFAULT PRIVILEGES IN SCHEMA fdl_temp GRANT SELECT ON TABLES TO fdl_user;
    权限
    目标对象用途
    USAGE业务 schema + fdl_temp允许访问 schema
    SELECT业务表 + fdl_temp 下表读取 DML 数据和 DDL 事件
    TRUNCATEfdl_temp.trep_sync_ddlDDL 临时表定时清理
    REPLICATION用户级别(2.2 节已赋予)创建 replication slot,读取 WAL 日志

    普通用户不需要 INSERT/CREATE/DROP 权限在 fdl_temp 上。DDL 临时表的写入由 SECURITY DEFINER function 以超管权限完成。

    4. 数据连接配置

    超管完成以上所有准备后,在 FineDataLink 中使用 fdl_user 创建 PostgreSQL 数据连接

    1781169255750086.png

    设置项
    说明
    ddlTable、publication
    数据连接用户角色对应操作
    超管不需要配置这两个设置项 
    普通用户

    方案一:需要配置这两个设置项

    设置项说明
    ddlTabletrep_sync_ddl(根据实际情况填写) DDL 临时表名,不需要带 fdl_temp 前缀
    publicationfdl_publication(根据实际情况填写) 3.5 节中创建的 publication 名称


    方案二:给这个普通用户赋权,使其具有 superuser 角色权限,步骤如下:

    注:superuser 拥有数据库的全部权限,可以绕过所有权限检查,建议谨慎授予

    1)以现有的 superuser 身份连接数据库

    2)执行 ALTER USER username WITH SUPERUSER;语句(根据实际情况替换 username)

    注:所有连接上的改动都必须重置(重新生成)采集任务 才会生效,不然生成采集任务后就会把对应配置保存起来,无法修改

    采集DDL事件配置需要开启,该按钮开启后,实时管道任务才能使用 同步源表结构变化 功能

    开启后仅在该数据连接作为实时来源端时生效,调整后需重启(暂停再启动)对应采集任务

    5. 资源回收

    如果确认不再使用 DDL 同步功能,按以下严格顺序清理资源:

    DROP PUBLICATION IF EXISTS fdl_publication;    // fdl_publication为用户自定义的publication名
     
    DROP EVENT TRIGGER IF EXISTS flysync_pg_ddl_trigger; // flysync_pg_ddl_trigger为用户自定义的trigger名
    DROP EVENT TRIGGER IF EXISTS flysync_pg_drop_trigger; // flysync_pg_drop_trigger为用户自定义的trigger名
     
    DROP FUNCTION IF EXISTS fdl_temp.trep_pg_ddl_trigger_function(); // trep_pg_ddl_trigger_function为用户自定义的function名
    DROP FUNCTION IF EXISTS fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为用户自定义的function名
     
    DROP TABLE IF EXISTS fdl_temp.trep_sync_ddl;// trep_sync_ddl为用户自定义的table名

    如自定义了名称,请替换为实际值。必须先删除 publication 和 trigger,再删除 function 和 table,否则会因依赖关系报错。

    6. 后续步骤

    设计任务即可,详情参见:配置数据管道任务实时任务概述




    附件列表


    主题: 数据管道
    • 有帮助
    • 没帮助
    • 只是浏览
    中文(简体)

    鼠标选中内容,快速反馈问题

    鼠标选中存在疑惑的内容,即可快速反馈问题,我们将会跟进处理。

    不再提示

    10s后关闭



    AI

    联系我们
    在线支持
    获取专业技术支持,快速帮助您解决问题
    工作日9:00-12:00,13:30-17:30在线
    页面反馈
    针对当前网页的建议、问题反馈
    售前咨询
    采购需求/获取报价/预约演示
    或拨打: 400-811-8890 转1
    qr
    热线电话
    咨询/故障救援热线:400-811-8890转2
    总裁办24H投诉:17312781526
    提交页面反馈
    仅适用于当前网页的意见收集,帆软产品问题请在 问答板块提问前往服务平台 获取技术支持