[ 
https://issues.apache.org/jira/browse/FLINK-35387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35387:
-----------------------------------
    Labels: pull-request-available  (was: )

> PG CDC source support heart beat
> --------------------------------
>
>                 Key: FLINK-35387
>                 URL: https://issues.apache.org/jira/browse/FLINK-35387
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.0
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.3.0
>
>
> Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option does not take effect. 
> The reason is given below.
> The Debezium docs say: For the connector to detect and process events from a 
> heartbeat table, you must add the table to the PostgreSQL publication 
> specified by the 
> [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name]
>  property. If this publication predates your Debezium deployment, the 
> connector uses the publications as defined. If the publication is not already 
> configured to automatically replicate changes {{FOR ALL TABLES}} in the 
> database, you must explicitly add the heartbeat table to the publication[2].
> Thus, to use the heartbeat in CDC:
> 1. Add the heartbeat table to the publication: ALTER PUBLICATION 
> _<publicationName>_ ADD TABLE {_}<heartbeatTableName>{_};
> 2. Set heartbeatInterval.
> 3. Set 
> debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query]
>  [3].
>  
> However, when I use it in CDC, an exception occurs:
> {code:java}
> Caused by: java.lang.NullPointerException
> at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
> at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
> at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94){code}
> !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931!
>  
> It seems CDC does not add a HeartbeatConnectionProvider when configuring 
> PostgresEventDispatcher:
> {code:java}
> //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
> this.postgresDispatcher =
>                 new PostgresEventDispatcher<>(
>                         dbzConfig,
>                         topicSelector,
>                         schema,
>                         queue,
>                         dbzConfig.getTableFilters().dataCollectionFilter(),
>                         DataChangeEvent::new,
>                         metadataProvider,
>                         schemaNameAdjuster); {code}
> In Debezium, PostgresConnectorTask does this when it starts:
> {code:java}
> //io.debezium.connector.postgresql.PostgresConnectorTask#start
>   final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
>                     connectorConfig,
>                     topicNamingStrategy,
>                     schema,
>                     queue,
>                     connectorConfig.getTableFilters().dataCollectionFilter(),
>                     DataChangeEvent::new,
>                     PostgresChangeRecordEmitter::updateSchema,
>                     metadataProvider,
>                     connectorConfig.createHeartbeat(
>                             topicNamingStrategy,
>                             schemaNameAdjuster,
>                             () -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
>                             exception -> {
>                                 String sqlErrorId = exception.getSQLState();
>                                 switch (sqlErrorId) {
>                                     case "57P01":
>                                         // Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
>                                         throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                                     case "57P03":
>                                         // Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
>                                         throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                                     default:
>                                         break;
>                                 }
>                             }),
>                     schemaNameAdjuster,
>                     signalProcessor); {code}
> Thus, this issue will add the missing heartbeat creation to the CDC PostgresEventDispatcher setup.
>  
>  [1] 
> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/
> [2] 
> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
> [3] 
> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query

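The three setup steps in the issue description can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not the Flink CDC API: the class HeartbeatSetupSketch, its methods, the publication and table names, and the ts column are hypothetical. Only the ALTER PUBLICATION ... ADD TABLE statement and the option keys heartbeat.interval.ms and debezium.heartbeat.action.query are taken from the issue text. How the options are handed to the connector is deliberately left out.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: every name in this class is hypothetical. */
final class HeartbeatSetupSketch {

    /** Step 1: statement that adds the heartbeat table to an existing publication. */
    static String addTableToPublicationSql(String publication, String heartbeatTable) {
        return "ALTER PUBLICATION " + publication + " ADD TABLE " + heartbeatTable + ";";
    }

    /** Steps 2 and 3: the two option keys named in the issue, with example values. */
    static Map<String, String> heartbeatOptions(Duration interval, String heartbeatTable) {
        Map<String, String> options = new LinkedHashMap<>();
        // Step 2: heartbeat interval in milliseconds.
        options.put("heartbeat.interval.ms", Long.toString(interval.toMillis()));
        // Step 3: query run on each heartbeat; the "ts" column is an assumption.
        options.put("debezium.heartbeat.action.query",
                "UPDATE " + heartbeatTable + " SET ts = now()");
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical publication and table names, for illustration only.
        System.out.println(addTableToPublicationSql("my_publication", "public.cdc_heartbeat"));
        heartbeatOptions(Duration.ofSeconds(30), "public.cdc_heartbeat")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The statement from step 1 has to be run against PostgreSQL before the source starts, unless the publication was already created FOR ALL TABLES.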

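The SQLSTATE handling inside the Debezium error callback quoted in the issue can be isolated as a small classifier. This is a sketch, not Debezium code: the class HeartbeatErrorSketch and the Outcome enum are hypothetical. Debezium throws DebeziumException or RetriableException at these points, while the sketch returns an enum value so that it runs without Debezium on the classpath. The null check is an addition of the sketch and is not present in the quoted snippet.

```java
import java.sql.SQLException;

/** Sketch only: mirrors the switch on SQLSTATE from the quoted Debezium snippet. */
final class HeartbeatErrorSketch {

    /** Hypothetical stand-in for the exceptions Debezium throws. */
    enum Outcome { FATAL, RETRIABLE, IGNORED }

    static Outcome classify(SQLException exception) {
        String sqlErrorId = exception.getSQLState();
        if (sqlErrorId == null) {
            // Added guard: switching on a null String would throw a NullPointerException.
            return Outcome.IGNORED;
        }
        switch (sqlErrorId) {
            case "57P01":
                // admin_shutdown: Debezium raises a DebeziumException here.
                return Outcome.FATAL;
            case "57P03":
                // cannot_connect_now: Debezium raises a RetriableException here.
                return Outcome.RETRIABLE;
            default:
                // Any other SQLSTATE falls through without an exception.
                return Outcome.IGNORED;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(new SQLException("server shutting down", "57P01")));
        System.out.println(classify(new SQLException("server starting up", "57P03")));
        System.out.println(classify(new SQLException("other failure", "42P01")));
    }
}
```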

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
