Hongshun Wang created FLINK-35387:
-------------------------------------
Summary: PG CDC source support heart beat
Key: FLINK-35387
URL: https://issues.apache.org/jira/browse/FLINK-35387
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
Fix For: cdc-3.2.0
Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option does not take effect. Here is why. The Debezium documentation says:

For the connector to detect and process events from a heartbeat table, you must add the table to the PostgreSQL publication specified by the [publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name] property. If this publication predates your Debezium deployment, the connector uses the publications as defined. If the publication is not already configured to automatically replicate changes {{FOR ALL TABLES}} in the database, you must explicitly add the heartbeat table to the publication. [2]

Thus, to use heartbeats in CDC, you must:
1. Add a heartbeat table to the publication: ALTER PUBLICATION _<publicationName>_ ADD TABLE _<heartbeatTableName>_;
2. Set heartbeatInterval.
3. Add debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query] [3].

However, when I do this in CDC, the following exception occurs:
{code:java}
Caused by: java.lang.NullPointerException
	at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
	at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
	at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
{code}
!https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5292b7c63c883d1620bbf7d3875a3a5b158e70b814913bc360a414d3de9277d871abf3af1cbd75249eddaaa1b37c2b2f5421a918fb1a2f0f3853c0ce41721e620699d98626fa2281948c58faa63edf8ebfc653b69905bac42?tmpCode=9193555a-7bf3-4335-9427-b59c1dfe1931!
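The three setup steps above can be sketched in plain Java. This is a minimal illustration, not Flink CDC code: it assumes a Flink SQL `postgres-cdc` table, where `heartbeat.interval.ms` is a connector option and keys prefixed with `debezium.` are passed through to Debezium. The publication name `dbz_publication` (Debezium's default), the table `public.heartbeat` with its `text` column, the `30s` interval, and the helper class `PgHeartbeatSetup` are all hypothetical examples.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PgHeartbeatSetup {

    // Step 1: SQL that adds the heartbeat table to an existing publication.
    // Only needed when the publication is not FOR ALL TABLES.
    static String addTableToPublication(String publicationName, String heartbeatTable) {
        return "ALTER PUBLICATION " + publicationName + " ADD TABLE " + heartbeatTable + ";";
    }

    // Steps 2 and 3: connector options for a postgres-cdc source table.
    // Assumption: keys prefixed with "debezium." are forwarded to Debezium.
    static Map<String, String> heartbeatOptions(
            String publicationName, String interval, String actionQuery) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("debezium.publication.name", publicationName);
        options.put("heartbeat.interval.ms", interval);
        options.put("debezium.heartbeat.action.query", actionQuery);
        return options;
    }

    // Renders the options as the body of a Flink SQL WITH (...) clause,
    // doubling single quotes so the action query stays a valid string literal.
    static String toWithClause(Map<String, String> options) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (sb.length() > 0) {
                sb.append(",\n");
            }
            sb.append("  '").append(e.getKey()).append("' = '")
              .append(e.getValue().replace("'", "''")).append("'");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String publication = "dbz_publication";      // hypothetical publication name
        String heartbeatTable = "public.heartbeat";  // hypothetical heartbeat table
        System.out.println(addTableToPublication(publication, heartbeatTable));
        System.out.println(toWithClause(heartbeatOptions(
                publication,
                "30s",
                "INSERT INTO public.heartbeat (text) VALUES ('heartbeat')")));
    }
}
```

The options are kept in a `LinkedHashMap` only so the rendered WITH clause is stable and readable; the ALTER PUBLICATION statement has to be run once against the database before the job starts.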
It seems that CDC does not add a HeartbeatConnectionProvider when it configures PostgresEventDispatcher:
{code:java}
// org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
        new PostgresEventDispatcher<>(
                dbzConfig,
                topicSelector,
                schema,
                queue,
                dbzConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                metadataProvider,
                schemaNameAdjuster);
{code}
In Debezium, PostgresConnectorTask#start does add it:
{code:java}
// io.debezium.connector.postgresql.PostgresConnectorTask#start
final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
        connectorConfig,
        topicNamingStrategy,
        schema,
        queue,
        connectorConfig.getTableFilters().dataCollectionFilter(),
        DataChangeEvent::new,
        PostgresChangeRecordEmitter::updateSchema,
        metadataProvider,
        connectorConfig.createHeartbeat(
                topicNamingStrategy,
                schemaNameAdjuster,
                () -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
                exception -> {
                    String sqlErrorId = exception.getSQLState();
                    switch (sqlErrorId) {
                        case "57P01":
                            // Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                        case "57P03":
                            // Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                        default:
                            break;
                    }
                }),
        schemaNameAdjuster,
        signalProcessor);
{code}
Thus, this JIRA will add the same heartbeat setup to the CDC PostgresEventDispatcher.
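The error handler in the Debezium snippet decides what a failed heartbeat action query means for the task, and a port into Flink CDC would presumably need the same policy. Below is a minimal, stdlib-only sketch of that decision. The class `HeartbeatErrorPolicy` and its `Action` enum are hypothetical stand-ins for throwing `DebeziumException` (fatal) or `RetriableException` (retry); this is not the actual Flink CDC fix.

```java
import java.sql.SQLException;

public class HeartbeatErrorPolicy {

    // The three outcomes of the Debezium handler quoted above.
    enum Action { FAIL, RETRY, IGNORE }

    // Maps the SQLState of a failed heartbeat action query to an outcome.
    static Action classify(SQLException e) {
        String state = e.getSQLState();
        if (state == null) {
            // Not in the original handler: a switch on a null String throws NullPointerException.
            return Action.IGNORE;
        }
        switch (state) {
            case "57P01": // admin_shutdown: the task should fail
                return Action.FAIL;
            case "57P03": // cannot_connect_now: the failure is retriable
                return Action.RETRY;
            default:      // any other state: the handler returns without throwing
                return Action.IGNORE;
        }
    }

    public static void main(String[] args) {
        String[] states = {"57P01", "57P03", "42P01", null};
        for (String state : states) {
            SQLException e = new SQLException("heartbeat action query failed", state);
            System.out.println(state + " -> " + classify(e));
        }
    }
}
```

Returning an enum instead of throwing keeps the policy testable on its own; a real handler would translate `FAIL` and `RETRY` back into the corresponding exception types.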
[1] https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/
[2] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
[3] https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query