tchivs created FLINK-36164:
------------------------------
Summary: JdbcIncrementalSource CheckPoint Timeout Due to
Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned
PostgreSQL Table
Key: FLINK-36164
URL: https://issues.apache.org/jira/browse/FLINK-36164
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: tchivs
When synchronizing a PostgreSQL table using a connector, if the table contains
a large number of partitions, the checkpoint always fails. By tracing the
source code, it was found that the PostgresDialect's queryTableSchema queries
the schema of each table. This schema querying during each checkpoint causes
connection timeouts.
tableName="(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"
```
JdbcIncrementalSource<String> incrSource =
PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
.hostname(hostname)
.port(port)
.database(databaseName)
.schemaList(schemaName)
.tableList(tableName)
.username(username)
.password(password)
.deserializer(schema)
.slotName(slotName)
.decodingPluginName(config.get(DECODING_PLUGIN_NAME))
.includeSchemaChanges(true)
.debeziumProperties(debeziumProperties)
.startupOptions(startupOptions)
.splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
.splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
.fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
.connectTimeout(config.get(CONNECT_TIMEOUT))
.connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
.connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
.distributionFactorUpper(
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
.distributionFactorLower(
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
.heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
.build();
return env.fromSource(
incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
```
check point Exception:
```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold. The latest checkpoint failed due to Checkpoint expired
before completing., view the Checkpoint History tab or the Job Manager log to
find out why continuous checkpoints failed.
```

error log:
```logger
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] -
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Un-registering task and sending final execution state CANCELED to
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O
exception (java.net.SocketException) caught when processing request to
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Un-registering task and sending final execution state CANCELED to
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] -
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] -
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822)
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] -
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was
interrupted before completion
2024-05-28 15:41:07,377 INFO
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot -
Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at
io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection
gracefully closed
```

### What You Expected?
When synchronizing a PostgreSQL table using a connector, if the table contains
a large number of partitions, the checkpoint always fails. By tracing the
source code, it was found that the `PostgresDialect`'s `queryTableSchema`
queries the schema of each table. This schema querying during each checkpoint
causes connection timeouts.
**Solution:** Use caching. Each partitioned table should match and fetch the
schema only once using the following parameters:
- `--multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*"`
- `--multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb"`
### How to Reproduce?
**Steps to Reproduce:**
1. Add a partitioned table in PostgreSQL.
2. Create partitions for nearly ten years.
3. Synchronize this table.
### Anything Else?
**Method Modification:**
I modified the method as follows and it works well:
```java
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId
tableId) {
long startTime = System.nanoTime(); // Record start time
String name = this.tableNameConverter.convert(tableId.table());
TableId parentTableId = new TableId(null, tableId.schema(), name);
TableChanges.TableChange tableChange = cache.get(parentTableId);
if (tableChange == null) {
LOG.info("[queryTableSchema begin] {}", tableId.identifier());
if (schema == null) {
schema = new CustomPostgresSchema((PostgresConnection) jdbc,
sourceConfig);
}
tableChange = schema.getTableSchema(tableId);
LOG.info("[queryTableSchema end] {}", tableId.identifier());
cache.put(parentTableId, tableChange);
}
long endTime = System.nanoTime();
long duration = endTime - startTime;
LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(),
duration / 1_000_000); // Convert nanoseconds to milliseconds
return tableChange;
}
```
I am willing to submit a PR!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)