提示:建议客户升级到 4.2.19.1 及之后版本,再使用 pgoutput 读取方式。
本文方案适用于 4.2.19.1 及之后版本。
1. 概述
1.1 版本
| FineDataLink 版本 | 功能变动 |
|---|---|
| 4.2.10.1 | PostgreSQL 作为数据开发-实时任务、实时管道任务的数据来源时,适配 pgoutput 逻辑解码插件 |
1.2 功能简介
本文介绍 PostgreSQL 作为数据开发-实时任务、实时管道任务的数据来源且读取方式选择 pgoutput 时,需要做的一些配置。

1.3 wal2json和pgoutput的区别
wal2jaon 读取方式需要额外安装插件。
2. 数据库服务端基础配置
本文以 PostgreSQL 安装在 Linux 系统中操作为例;若 PostgreSQL 安装在 Windows 系统中,可以在 【Windows环境】PostgreSQL环境准备 文档中找到与本文步骤相同的操作,进行参考。
提示:2.1 确认当前使用的数据库版本
PostgreSQL 10 以上版本& FineDataLink 为 4.2.10.1 及之后版本时,适配 pgoutput 逻辑解码插件(该插件内置在 PostgreSQL 中,无需用户手动安装)。
2.2 创建 FDL 数据连接用户
SQL 语句如下,需要该用户为 PostgreSQL replication 角色、PostgreSQL login 角色,并且有需要同步的表的 select 权限。
fdl_user 和密码请根据实际情况自定义,该用户后续用于 FDL 数据连接。your_schema 替换为实际业务 schema 名称。
-- 使用 postgres 超管登录
psql -U postgres -d your_database
-- 创建用户(带 REPLICATION 和 LOGIN 权限)
CREATE USER fdl_user WITH REPLICATION LOGIN PASSWORD 'your_password';
-- 授予业务 schema 下所有表的查询权限
GRANT SELECT ON ALL TABLES IN SCHEMA your_schema TO fdl_user;
2.3 设置 REPLICA IDENTITY FULL
pgoutput 模式下,所有需实时同步的表必须设置 REPLICA IDENTITY FULL,以确保 UPDATE 和 DELETE 操作能捕获完整行数据。
所有需要实时同步的表都必须执行此操作。若遗漏,管道启动时会报错中止。
-- 对每张需要同步的表执行(替换 schema 和 table 为实际名称)
ALTER TABLE your_schema.your_table REPLICA IDENTITY FULL;
例如将模式为 public 的表「产品名称维度」修改设置,则输入如下语句:
![]()
2.4 赋予用户 replication 和 login 权限命令
进入 PostgreSQL 安装目录下的 data 文件夹,进入 data 文件夹中,找到 postgresql.conf、pg_hba.conf,对这两个文件进行修改。
修改 PostgreSQL 复制槽设置。
本文示例中,postgresql.conf 文件在/var/lib/pgsql/12/data目录下,使用vi /var/lib/pgsql/12/data/postgresql.conf(用户根据实际情况修改 postgresql.conf 所在路径)语句编辑该文件,在文件末尾追加以下配置:
注:或者找到下面代码中的设置项,取消注释,修改对应的值。
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
| 语句 | 说明 |
|---|---|
wal_level = logical | 开启逻辑解码 |
| max_wal_senders = 10 | 最大 WAL 发送进程数,需要 ≥ FDL 采集任务数 |
| max_replication_slots = 10 | 最大复制槽数量,需要 ≥ FDL 采集任务数 |
设置用户 replication 权限。
本文示例中,pg_hba.conf 文件在/var/lib/pgsql/12/data目录下,使用vi /var/lib/pgsql/12/data/pg_hba.conf(用户根据实际情况修改 pg_hba.conf 所在路径)语句编辑该文件,把下图中红圈中的两行前边的 # 去掉,或者直接追加:
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all ::1/128 trust2.5 重启数据库
# 根据系统不同,选择对应命令
systemctl restart postgresql-12
# 或
pg_ctl restart -D /var/lib/pgsql/12/data
3. 超管创建 DDL 检测资源
DBA(superuser)继续在 PostgreSQL 中创建 DDL 同步所需的 schema、表、function、trigger、publication。
注:无法捕获字段注释变更,仅支持表结构层面的变更(增删表、增删字段、修改字段名/类型、重命名表)。
3.1 创建schema
如果没有 schema fdl_temp 则使用下面语句创建schema fdl_temp:
CREATE SCHEMA IF NOT EXISTS fdl_temp;
3.2 创建 DDL 临时表
用来保存用户的 DDL 操作:
用户可自定义修改 SQL 语句中的 trep_sync_ddl(为 ddlTable 名称):
CREATE TABLE IF NOT EXISTS fdl_temp.trep_sync_ddl (
id bigserial primary key,
ddl_time timestamptz,
command_tag text,
objid text,
object_type text,
schema_name text,
table_name text,
search_path text,
ddl_sql text
);
表名 trep_sync_ddl 可自定义,后续 function 和 FDL 配置中保持一致即可。
3.3 创建function
本文 SQL 用于往 3.2 节中创建的 ddl table 中写入 ddl 信息。
3.3.1 Function 1 —— 捕获 DDL 变更(ALTER TABLE、RENAME COLUMN 等)
CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_ddl_trigger_function() // trep_pg_ddl_trigger_function用户可以自定义
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY definer
SET search_path = fdl_temp, public
AS $function$
DECLARE
ddl_table_name text := 'trep_sync_ddl'; // trep_sync_ddl为 3.2 中用户自定义的tablename
full_table_name text := format('%I.%I', 'fdl_temp', ddl_table_name);
current_rows integer;
BEGIN
EXECUTE 'INSERT INTO ' || full_table_name || ' (' ||
'ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql' ||
') ' ||
'SELECT ' ||
'CURRENT_TIMESTAMP, ' ||
'c.command_tag, ' ||
'c.objid, ' ||
'c.object_type, ' ||
'COALESCE(c.schema_name, split_part(c.object_identity, ''.'', 1)) AS schema_name, ' ||
'split_part(c.object_identity, ''.'', 2) AS table_name, ' ||
'current_setting(''search_path''), ' ||
'pg_catalog.current_query() ' ||
'FROM pg_event_trigger_ddl_commands() c ' ||
'WHERE c.object_type IN (''table'', ''table column'')';
EXECUTE 'SELECT count(*) FROM ' || full_table_name
INTO current_rows;
IF current_rows > 1000 THEN
EXECUTE 'DELETE FROM ' || full_table_name ||
' WHERE id IN (' ||
'SELECT id FROM ' || full_table_name || ' ORDER BY id ASC LIMIT 100)';
END IF;
END;
$function$;
3.3.2 Function 2 —— 捕获 DROP TABLE
CREATE OR REPLACE FUNCTION fdl_temp.trep_pg_drop_trigger_function() //trep_pg_drop_trigger_function用户可以自定义
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY definer
SET search_path = fdl_temp, public
AS $function$
DECLARE
ddl_table_name text := 'trep_sync_ddl'; // trep_sync_ddl为3.2中用户自定义的tablename
full_table_name text := format('%I.%I', 'fdl_temp', ddl_table_name);
current_rows integer;
BEGIN
EXECUTE 'INSERT INTO ' || full_table_name || ' (' ||
'ddl_time, command_tag, objid, object_type, schema_name, table_name, search_path, ddl_sql' ||
') ' ||
'SELECT ' ||
'CURRENT_TIMESTAMP, ' ||
'''DROP'', ' ||
'd.objid, ' ||
'd.object_type, ' ||
'COALESCE(d.schema_name, split_part(d.object_identity, ''.'', 1)) AS schema_name, ' ||
'split_part(d.object_identity, ''.'', 2) AS table_name, ' ||
'current_setting(''search_path''), ' ||
'pg_catalog.current_query() ' ||
'FROM pg_event_trigger_dropped_objects() d ' ||
'WHERE d.object_type IN (''table'')';
EXECUTE 'SELECT count(*) FROM ' || full_table_name
INTO current_rows;
IF current_rows > 1000 THEN
EXECUTE 'DELETE FROM ' || full_table_name ||
' WHERE id IN (' ||
'SELECT id FROM ' || full_table_name || ' ORDER BY id ASC LIMIT 100)';
END IF;
END;
$function$;
3.4 创建 Event Trigger
用于感知 ddl 操作然后执行 3.3 中的 function:
CREATE EVENT TRIGGER flysync_pg_ddl_trigger // flysync_pg_ddl_trigger用户可以自定义名称
ON ddl_command_end
EXECUTE PROCEDURE fdl_temp.trep_pg_ddl_trigger_function(); // trep_pg_ddl_trigger_function为3.3节中用户自定义的名称
CREATE EVENT TRIGGER flysync_pg_drop_trigger // flysync_pg_drop_trigger用户可以自定义名称
ON sql_drop
EXECUTE PROCEDURE fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为3.3节中用户自定义的名称
3.5 创建publication
「建议」:用户可自定义修改 SQL 语句中的 fdl_publication 值:
CREATE PUBLICATION fdl_publication FOR ALL TABLES; // fdl_publication用户可以自定义名称
如果想要实现仅对单表进行 publication 权限的赋予,命令如下:
CREATE PUBLICATION fdl_publication FOR TABLE schema.table;
注意事项如下:
1)若超管已提前创建了 FOR ALL TABLES 的 publication,FDL 系统检测到后不会再执行 ALTER PUBLICATION ADD/DROP TABLE。
2)多个 FDL 任务可以共享同一个 publication;DBA 提前创建 FOR ALL TABLES 的 publication 后,多个任务可以共享,系统检测到已配置后不会修改它。
3)如果对单表进行 publication 权限的赋予,需注意:不在 publication 范围内的表无法被 pgoutput 正常读取。
3.6 数据连接最终授权补充
在本文 2.2 节中已创建用户并授予业务表 SELECT 权限。还需要为 fdl_temp schema 补充以下权限:
注:SQL 语句中的fdl_temp.trep_sync_ddl为本文 3.2 节用户自定义的表名,需根据实际情况修改。
-- Schema 使用权
GRANT USAGE ON SCHEMA fdl_temp TO fdl_user;
-- DDL 临时表读权限
GRANT SELECT ON fdl_temp.trep_sync_ddl TO fdl_user;
-- DDL 临时表清理权限(FDL 采集任务每小时清理一次)
GRANT TRUNCATE ON fdl_temp.trep_sync_ddl TO fdl_user;
-- 未来在 fdl_temp 下新建表自动可读
ALTER DEFAULT PRIVILEGES IN SCHEMA fdl_temp GRANT SELECT ON TABLES TO fdl_user;
| 权限 | 目标对象 | 用途 |
|---|---|---|
| USAGE | 业务 schema + fdl_temp | 允许访问 schema |
| SELECT | 业务表 + fdl_temp 下表 | 读取 DML 数据和 DDL 事件 |
| TRUNCATE | fdl_temp.trep_sync_ddl | DDL 临时表定时清理 |
| REPLICATION | 用户级别(2.2 节已赋予) | 创建 replication slot,读取 WAL 日志 |
普通用户不需要 INSERT/CREATE/DROP 权限在 fdl_temp 上。DDL 临时表的写入由 SECURITY DEFINER function 以超管权限完成。
4. 数据连接配置
超管完成以上所有准备后,在 FineDataLink 中使用 fdl_user 创建 PostgreSQL 数据连接

| 设置项 | 说明 | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ddlTable、publication |
注:所有连接上的改动都必须重置(重新生成)采集任务 才会生效,不然生成采集任务后就会把对应配置保存起来,无法修改 | ||||||||||||||||
| 采集DDL事件配置 | 需要开启,该按钮开启后,实时管道任务才能使用 同步源表结构变化 功能 开启后仅在该数据连接作为实时来源端时生效,调整后需重启(暂停再启动)对应采集任务 |
5. 资源回收
如果确认不再使用 DDL 同步功能,按以下严格顺序清理资源:
DROP PUBLICATION IF EXISTS fdl_publication; // fdl_publication为用户自定义的publication名
DROP EVENT TRIGGER IF EXISTS flysync_pg_ddl_trigger; // flysync_pg_ddl_trigger为用户自定义的trigger名
DROP EVENT TRIGGER IF EXISTS flysync_pg_drop_trigger; // flysync_pg_drop_trigger为用户自定义的trigger名
DROP FUNCTION IF EXISTS fdl_temp.trep_pg_ddl_trigger_function(); // trep_pg_ddl_trigger_function为用户自定义的function名
DROP FUNCTION IF EXISTS fdl_temp.trep_pg_drop_trigger_function(); // trep_pg_drop_trigger_function为用户自定义的function名
DROP TABLE IF EXISTS fdl_temp.trep_sync_ddl;// trep_sync_ddl为用户自定义的table名
如自定义了名称,请替换为实际值。必须先删除 publication 和 trigger,再删除 function 和 table,否则会因依赖关系报错。
