tchivs created FLINK-38844:
------------------------------
Summary: Add metadata column support for PostgreSQL Pipeline Connector
Key: FLINK-38844
URL: https://issues.apache.org/jira/browse/FLINK-38844
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: tchivs
h2. Component
*Flink CDC / Pipeline Connectors / PostgreSQL*
h2. Summary
Add metadata column support for PostgreSQL Pipeline Connector
h2. Description
h3. Background
Currently, the PostgreSQL Source Connector supports the metadata columns {{op_ts}}, {{database_name}}, {{schema_name}}, {{table_name}} and {{row_kind}} through {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector does not expose any of these metadata columns to users.
h3. Problem
Users cannot access metadata (such as the operation timestamp, database name, schema name or table name) when using the PostgreSQL Pipeline Connector in their data pipelines. This limits their ability to:
* Track when data changes occurred
* Identify the source database/schema/table of records
* Implement metadata-based routing or filtering logic
h3. Proposed Solution
Implement metadata column support for the PostgreSQL Pipeline Connector by:
# *Create metadata column implementations* for the pipeline connector:
** {{OpTsMetadataColumn}}: Provides operation timestamp (milliseconds since epoch)
** {{DatabaseNameMetadataColumn}}: Provides source database name
** {{SchemaNameMetadataColumn}}: Provides source schema name
** {{TableNameMetadataColumn}}: Provides source table name
# *Update PostgresDataSource* to expose the supported metadata columns via the {{supportedMetadataColumns()}} method
# *Add comprehensive tests* to verify metadata functionality in both the snapshot and incremental phases
# *Update documentation* to guide users on how to use metadata columns
h3. Implementation Details
h4. Metadata Columns
The following metadata columns will be supported:
||Metadata Key||Data Type||Description||
|{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds since epoch. 0 for snapshot records; the actual change timestamp for incremental records|
|{{database_name}}|STRING NOT NULL|Name of the source database|
|{{schema_name}}|STRING NOT NULL|Name of the source schema|
|{{table_name}}|STRING NOT NULL|Name of the source table|
*Note:* {{row_kind}} is not included because its value comes from the {{RowData}} row kind rather than from the {{SourceRecord}}, so it cannot be read the same way as the other metadata columns.
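A minimal sketch of the column implementations, assuming each column reads from a per-record {{Map<String, String>}} keyed by the metadata names in the table. {{MetadataColumnSketch}} and the other class names are hypothetical stand-ins, not the Flink CDC API; the sketch only shows how each key maps to a NOT NULL value of the listed type.

{code:java}
import java.util.List;
import java.util.Map;

// Hypothetical, simplified contract; NOT the real Flink CDC interface.
interface MetadataColumnSketch {
    String name();                             // metadata key, e.g. "op_ts"
    Object read(Map<String, String> metadata); // typed, non-null value for one record
}

final class MetadataSketchUtil {
    // Fetch a key and enforce the NOT NULL contract from the table.
    static String require(Map<String, String> metadata, String key) {
        String value = metadata.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing metadata '" + key + "' in " + metadata);
        }
        return value;
    }
}

// op_ts: BIGINT NOT NULL, milliseconds since epoch (0 for snapshot records).
final class OpTsColumnSketch implements MetadataColumnSketch {
    public String name() { return "op_ts"; }

    public Object read(Map<String, String> metadata) {
        return Long.parseLong(MetadataSketchUtil.require(metadata, name()));
    }
}

// database_name / schema_name / table_name: STRING NOT NULL, returned as-is.
final class StringColumnSketch implements MetadataColumnSketch {
    private final String key;

    StringColumnSketch(String key) { this.key = key; }

    public String name() { return key; }

    public Object read(Map<String, String> metadata) {
        return MetadataSketchUtil.require(metadata, key);
    }
}

final class SupportedColumnsSketch {
    // What a supportedMetadataColumns()-style method could return.
    static List<MetadataColumnSketch> all() {
        return List.of(
                new OpTsColumnSketch(),
                new StringColumnSketch("database_name"),
                new StringColumnSketch("schema_name"),
                new StringColumnSketch("table_name"));
    }
}
{code}

Folding the three name columns into one parameterized class is a simplification of this sketch; the proposal itself lists {{DatabaseNameMetadataColumn}}, {{SchemaNameMetadataColumn}} and {{TableNameMetadataColumn}} as separate classes.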
h4. Architecture
{noformat}
PostgresDataSource
├── supportedMetadataColumns()
│   ├── OpTsMetadataColumn
│   ├── DatabaseNameMetadataColumn
│   ├── SchemaNameMetadataColumn
│   └── TableNameMetadataColumn
└── PostgresEventDeserializer
    └── getMetadata() - reads metadata from SourceRecord
{noformat}
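The {{getMetadata()}} step can be sketched as follows, assuming the Debezium PostgreSQL {{source}} block of each change record carries the fields {{db}}, {{schema}}, {{table}} and {{ts_ms}}. To stay self-contained, the sketch models that block as a plain {{Map<String, Object>}} instead of a Kafka Connect {{Struct}}; the class name is hypothetical, and the sketch does not model how snapshot records end up with {{op_ts}} = 0.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: turn the Debezium "source" block of one change record
// into the string map that the metadata columns read from.
final class PostgresMetadataExtractorSketch {

    // Debezium PostgreSQL source-block field names (assumed present on every record).
    private static final String DB = "db";
    private static final String SCHEMA = "schema";
    private static final String TABLE = "table";
    private static final String TS_MS = "ts_ms";

    static Map<String, String> getMetadata(Map<String, Object> sourceBlock) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("database_name", required(sourceBlock, DB));
        metadata.put("schema_name", required(sourceBlock, SCHEMA));
        metadata.put("table_name", required(sourceBlock, TABLE));
        // Forwarded unchanged; snapshot handling (op_ts = 0) is not modelled here.
        metadata.put("op_ts", required(sourceBlock, TS_MS));
        return metadata;
    }

    // Fails fast with a descriptive NullPointerException if a field is absent.
    private static String required(Map<String, Object> block, String field) {
        Object value = Objects.requireNonNull(block.get(field), () -> "missing source field: " + field);
        return String.valueOf(value);
    }
}
{code}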
h2. Usage Example
h3. Pipeline Configuration
{code:yaml}
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  database-name: mydb
  schema-name: public
  table-name: orders

transform:
  - source-table: public.orders
    # op_ts, database_name, schema_name and table_name are the proposed metadata columns
    projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
    description: Include metadata columns in output

sink:
  type: doris
  # ... sink configuration
{code}
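Conceptually, the projection appends the requested metadata values to each physical row. The following is a hypothetical, heavily simplified model of that step, not Flink CDC's transform operator: it resolves each projected name first against the row's physical columns and then against the metadata map, and rejects unknown names. The resolution order is a choice of this sketch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a projection such as
// "id, order_date, amount, op_ts, database_name, schema_name, table_name".
final class ProjectionSketch {

    static List<Object> project(String projection,
                                Map<String, Object> physicalRow,
                                Map<String, String> metadata) {
        List<Object> out = new ArrayList<>();
        for (String raw : projection.split(",")) {
            String name = raw.trim();
            if (physicalRow.containsKey(name)) {
                out.add(physicalRow.get(name)); // physical column wins on a name clash
            } else if (metadata.containsKey(name)) {
                out.add(metadata.get(name));    // metadata values stay strings in this model
            } else {
                throw new IllegalArgumentException("Unknown column in projection: " + name);
            }
        }
        return out;
    }
}
{code}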
h3. Expected Output
The output records will include metadata columns:
||id||order_date||amount||op_ts||database_name||schema_name||table_name||
|1|2024-01-01|100.00|0|mydb|public|orders|
|2|2024-01-02|200.00|1704182400000|mydb|public|orders|
*Note:* {{op_ts}} is 0 for snapshot records and contains the actual change timestamp for incremental records.
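Because {{op_ts}} is plain epoch milliseconds with 0 reserved for snapshot records, a downstream consumer can decode it with {{java.time.Instant}}. A minimal sketch; the helper name is hypothetical, and treating 0 as "no change timestamp" simply follows the note on snapshot records.

{code:java}
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper for consumers of the op_ts metadata column.
final class OpTsSketch {
    // Empty for snapshot records (op_ts == 0), otherwise the change time as a UTC instant.
    static Optional<Instant> changeTime(long opTs) {
        return opTs == 0L ? Optional.empty() : Optional.of(Instant.ofEpochMilli(opTs));
    }
}
{code}

For the incremental row in the table, {{changeTime(1704182400000L)}} yields {{2024-01-02T08:00:00Z}}.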
h2. Fix Version
* 3.6.0 (or the next release)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)