[ 
https://issues.apache.org/jira/browse/FLINK-37479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946395#comment-17946395
 ] 

Hongshun Wang commented on FLINK-37479:
---------------------------------------

[~phamvinh1712] I have tested it and you are right; let's move forward with it. 
My test is below.
 # Prepare a PostgreSQL (13+) instance and create a partitioned table.
{code:java}
-- Parent table, range-partitioned on logdate.
CREATE TABLE measurement (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) PARTITION BY RANGE (logdate);

-- Partition for February 2006.
CREATE TABLE measurement_y2006m02
    PARTITION OF measurement
    FOR VALUES FROM ('2006-02-01') TO ('2006-03-01');

-- Partition for March 2006.
CREATE TABLE measurement_y2006m03
    PARTITION OF measurement
    FOR VALUES FROM ('2006-03-01') TO ('2006-04-01'); {code}

 # Create a publication with publish_via_partition_root enabled in PostgreSQL.
{code:java}
-- Set REPLICA IDENTITY FULL on the partitioned parent table.
ALTER TABLE public.measurement REPLICA IDENTITY FULL;

-- Publication over the parent table only, with publish_via_partition_root
-- turned on.
CREATE PUBLICATION test_partition_publication
    FOR TABLE public.measurement
    WITH (publish_via_partition_root = true); {code}

 # Create a Flink CDC job that uses this publication and reads only the latest data.
{code:java}
-- Source: the partitioned parent table, read via the pgoutput plugin starting
-- from the latest offset.
CREATE TEMPORARY TABLE pg_source (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  'table-name' = 'measurement',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.startup.mode' = 'latest-offset',
  -- Auto-creation is disabled, so the publication created in the previous
  -- step must be named explicitly; otherwise Debezium looks for its default
  -- publication name instead of this one.
  'debezium.publication.autocreate.mode' = 'disabled',
  'debezium.publication.name' = 'test_partition_publication',
  'decoding.plugin.name'= 'pgoutput',
  -- No trailing comma after the last option: the WITH clause does not accept one.
  'slot.name' = 'test_partition'
);

-- Sink: the 'print' connector, used here to display the captured rows.
CREATE TEMPORARY TABLE print_sink (
    city_id INT NOT NULL,
    logdate DATE NOT NULL
) WITH (
  'connector' = 'print'
);

-- Explicit column list so the mapping does not depend on column order.
INSERT INTO print_sink
SELECT
    city_id,
    logdate
FROM pg_source; {code}

 # After the job starts, insert data into the source table.
{code:java}
-- One row for each of the two partitions (February and March 2006).
INSERT INTO measurement (city_id, logdate)
VALUES
    (1, '2006-02-03'),
    (2, '2006-03-02'); {code}

 # Finally, we can see that the Flink job prints:

{code:java}
+I[1, 2006-02-03]
+I[2, 2006-03-02] {code}

> postgres cdc connector support discover PARTITIONED TABLE
> ---------------------------------------------------------
>
>                 Key: FLINK-37479
>                 URL: https://issues.apache.org/jira/browse/FLINK-37479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: Vinh Pham
>            Priority: Minor
>
> At the moment, the PostgreSQL connector doesn't support discovering 
> PARTITIONED TABLEs, which makes it impossible for the connector to take a 
> snapshot of a partitioned table.
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java#L41]
>  
> This is inconsistent with the other implementation in PostgresConnection 
> class 
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java#L859]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to