[ https://issues.apache.org/jira/browse/FLINK-35387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35387:
-----------------------------------
    Labels: pull-request-available  (was: )

> PG CDC source support heart beat
> --------------------------------
>
>                 Key: FLINK-35387
>                 URL: https://issues.apache.org/jira/browse/FLINK-35387
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.0
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.3.0
>
>
> Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option does not work as documented. The reason is below.
> The Debezium documentation says: "For the connector to detect and process events from a heartbeat table, you must add the table to the PostgreSQL publication specified by the [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name] property. If this publication predates your Debezium deployment, the connector uses the publications as defined. If the publication is not already configured to automatically replicate changes {{FOR ALL TABLES}} in the database, you must explicitly add the heartbeat table to the publication." [2]
> Thus, to use heartbeats in CDC:
> 1. Add a heartbeat table to the publication: ALTER PUBLICATION _<publicationName>_ ADD TABLE {_}<heartbeatTableName>{_};
> 2. Set heartbeatInterval.
> 3. Add debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query] [3].
>
> However, when I use it in CDC, the following exception occurs:
> {code:java}
> Caused by: java.lang.NullPointerException
>     at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
>     at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
>     at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
> {code}
> !https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931!
>
> It seems CDC does not pass a HeartbeatConnectionProvider when it configures PostgresEventDispatcher:
> {code:java}
> //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
> this.postgresDispatcher =
>         new PostgresEventDispatcher<>(
>                 dbzConfig,
>                 topicSelector,
>                 schema,
>                 queue,
>                 dbzConfig.getTableFilters().dataCollectionFilter(),
>                 DataChangeEvent::new,
>                 metadataProvider,
>                 schemaNameAdjuster);
> {code}
> In Debezium, PostgresConnectorTask#start does pass one, as a supplier that opens a new PostgresConnection:
> {code:java}
> //io.debezium.connector.postgresql.PostgresConnectorTask#start
> final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
>         connectorConfig,
>         topicNamingStrategy,
>         schema,
>         queue,
>         connectorConfig.getTableFilters().dataCollectionFilter(),
>         DataChangeEvent::new,
>         PostgresChangeRecordEmitter::updateSchema,
>         metadataProvider,
>         connectorConfig.createHeartbeat(
>                 topicNamingStrategy,
>                 schemaNameAdjuster,
>                 () -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
>                 exception -> {
>                     String sqlErrorId = exception.getSQLState();
>                     switch (sqlErrorId) {
>                         case "57P01":
>                             // Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
>                             throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                         case "57P03":
>                             // Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
>                             throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
>                         default:
>                             break;
>                     }
>                 }),
>         schemaNameAdjuster,
>         signalProcessor);
> {code}
> Thus, this JIRA will add the same heartbeat wiring, including the connection provider, to PostgresEventDispatcher in CDC.
>
> [1] https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/
> [2] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
> [3] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
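The three heartbeat setup steps in the description can be sketched in Java as follows. This is a hedged sketch, not Flink CDC code: it only assembles the SQL statement and a connector option map and never contacts a database. The heartbeat table, its text column and the action query are hypothetical, and the option keys assume the Flink SQL names heartbeat.interval.ms, debezium.publication.name and debezium.heartbeat.action.query, with the debezium. prefix forwarding the remainder of the key to Debezium.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch: assembles the SQL and connector options for the three heartbeat steps.
// Table, column and publication names passed in by callers are hypothetical examples.
final class HeartbeatSetup {

    private HeartbeatSetup() {}

    // Step 1: add the heartbeat table to the publication. Not needed if the
    // publication was created FOR ALL TABLES.
    static String alterPublicationSql(String publicationName, String heartbeatTable) {
        return "ALTER PUBLICATION " + publicationName + " ADD TABLE " + heartbeatTable + ";";
    }

    // Hypothetical action query: writes a row to the heartbeat table so that every
    // heartbeat produces a change in a table covered by the publication.
    static String heartbeatActionQuery(String heartbeatTable) {
        return "INSERT INTO " + heartbeatTable + " (text) VALUES ('test_heartbeat')";
    }

    // Steps 2 and 3: connector options, assuming the option names below and that
    // keys prefixed with "debezium." are passed through to Debezium.
    static Map<String, String> connectorOptions(
            Duration interval, String publicationName, String heartbeatTable) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("heartbeat.interval.ms", Long.toString(interval.toMillis()));
        options.put("debezium.publication.name", publicationName);
        options.put("debezium.heartbeat.action.query", heartbeatActionQuery(heartbeatTable));
        return options;
    }
}
```

For example, alterPublicationSql("dbz_publication", "public.heartbeat") yields ALTER PUBLICATION dbz_publication ADD TABLE public.heartbeat;, and connectorOptions(Duration.ofSeconds(10), "dbz_publication", "public.heartbeat") maps heartbeat.interval.ms to 10000. The statement is meant to run once against PostgreSQL, while the options belong in the connector configuration.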
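The NullPointerException thrown from HeartbeatFactory.createHeartbeat can be illustrated with a simplified model. Everything below (HeartbeatModel, ConnectionStub, Kind) is hypothetical and is not Debezium's API. The model assumes, consistent with the stack trace and the PostgresConnectorTask#start excerpt, that a heartbeat configured with an action query obtains a connection from the supplied provider, and that the CDC dispatcher supplies none.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Simplified, hypothetical model of heartbeat creation; these are not Debezium's classes.
final class HeartbeatModel {

    private HeartbeatModel() {}

    // Stand-in for the JDBC connection that would run the heartbeat action query.
    static final class ConnectionStub {}

    // Kind of heartbeat the model creates.
    enum Kind { NOOP, SCHEDULED, ACTION_QUERY }

    static Kind createHeartbeat(
            long intervalMs, String actionQuery, Supplier<ConnectionStub> connectionProvider) {
        if (intervalMs <= 0) {
            return Kind.NOOP; // heartbeats disabled
        }
        if (actionQuery == null) {
            return Kind.SCHEDULED; // heartbeat records only, no SQL to execute
        }
        // The action query needs its own connection. If connectionProvider is null,
        // this call throws NullPointerException, the failure mode seen in the trace.
        ConnectionStub connection = connectionProvider.get();
        Objects.requireNonNull(connection, "connection provider returned null");
        return Kind.ACTION_QUERY;
    }
}
```

In this model, passing a supplier (as PostgresConnectorTask#start does with a lambda that creates a PostgresConnection) is the equivalent of the fix: createHeartbeat(10_000, query, ConnectionStub::new) returns ACTION_QUERY instead of throwing.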
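The error handler that Debezium passes to createHeartbeat amounts to a small SQLSTATE classification: 57P01 (admin_shutdown) is fatal, 57P03 (cannot_connect_now) is retriable, and any other code makes the handler return without throwing. A standalone sketch of that decision follows. The enum and method names are hypothetical, and the null check is an addition of this sketch, since a Java switch on a null String throws NullPointerException.

```java
import java.sql.SQLException;

// Hypothetical, standalone rendering of the SQLSTATE decision in the Debezium handler.
final class HeartbeatErrorPolicy {

    private HeartbeatErrorPolicy() {}

    enum Action { FAIL, RETRY, CONTINUE }

    static final String ADMIN_SHUTDOWN = "57P01";     // session terminated by an administrator
    static final String CANNOT_CONNECT_NOW = "57P03"; // server not accepting connections yet

    static Action classify(SQLException exception) {
        String sqlState = exception.getSQLState();
        if (sqlState == null) {
            // Guard added by this sketch: switching on a null String would throw.
            return Action.CONTINUE;
        }
        switch (sqlState) {
            case ADMIN_SHUTDOWN:
                return Action.FAIL;     // Debezium throws DebeziumException here
            case CANNOT_CONNECT_NOW:
                return Action.RETRY;    // Debezium throws RetriableException here
            default:
                return Action.CONTINUE; // handler returns without throwing
        }
    }
}
```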