hql0312 created FLINK-37080:
-------------------------------
Summary: Flink CDC Postgres: when starting from the latest offset in the stream phase and adding new tables, snapshotting is too slow
Key: FLINK-37080
URL: https://issues.apache.org/jira/browse/FLINK-37080
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: hql0312
When I add new tables to a Flink task that is already in the stream phase, the reader starts with the StreamSplit first; the enumerator then detects the newly added tables and switches to snapshotting them. The `pollSplitRecords` method in IncrementalSourceSplitReader.java contains the following logic:
{code:java}
.....
else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
    // (2) try to switch to stream split reading util current snapshot split finished
    dataIt = currentFetcher.pollSplitRecords();
    if (dataIt != null) {
        // first fetch data of snapshot split, return and emit the records of snapshot split
        ChangeEventRecords records;
        if (context.isHasAssignedStreamSplit()) {
            records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
            closeScanFetcher();
            closeStreamFetcher();
        } else {
            records = forRecords(dataIt);
            SnapshotSplit nextSplit = snapshotSplits.poll();
            if (nextSplit != null) {
                checkState(reusedScanFetcher != null);
                submitSnapshotSplit(nextSplit);
            } else {
                closeScanFetcher();
            }
        }
        return records;
    } else {
        return finishedSplit();
    }
} {code}
When `currentFetcher instanceof IncrementalSourceScanFetcher` is true, the reader calls `forNewAddedTableFinishedSplit`, which adds STREAM_SPLIT_ID to the finished split ids. Once the current fetcher completes, the reader's `finishCurrentFetch` picks this up and re-adds STREAM_SPLIT_ID to the pending splits, so the stream split is consumed again. Every time the stream split is (re)started, `PostgresSourceFetchTaskContext.configure` runs the following:
{code:java}
try {
    this.schema =
            PostgresObjectUtils.newSchema(
                    jdbcConnection,
                    dbzConfig,
                    jdbcConnection.getTypeRegistry(),
                    topicSelector,
                    valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
} catch (SQLException e) {
    throw new RuntimeException("Failed to initialize PostgresSchema", e);
} {code}
`newSchema` then loads the schema of every configured table:
{code:java}
public static PostgresSchema newSchema(
        PostgresConnection connection,
        PostgresConnectorConfig config,
        TypeRegistry typeRegistry,
        TopicSelector<TableId> topicSelector,
        PostgresValueConverter valueConverter)
        throws SQLException {
    PostgresSchema schema =
            new PostgresSchema(
                    config,
                    typeRegistry,
                    connection.getDefaultValueConverter(),
                    topicSelector,
                    valueConverter);
    // load all config table list
    schema.refresh(connection, false);
    return schema;
} {code}
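Putting the pieces together: every re-add of STREAM_SPLIT_ID triggers `configure`, which rebuilds the full PostgresSchema. A toy model of the cycle (hypothetical names and numbers, not Flink CDC code) shows how the cost multiplies:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (hypothetical, not Flink CDC code) of the re-add cycle:
// each time the stream split is finished and re-added, configure() runs
// again and pays the full schema-load cost for every matched table.
public final class ReaddLoopDemo {
    static final int MATCHED_TABLES = 1000; // e.g. what a t_(.*) filter matches
    static int schemaLoads = 0;

    static void configure() {
        // in the real code this calls PostgresObjectUtils.newSchema,
        // which refreshes the schema of every matched table
        schemaLoads += MATCHED_TABLES;
    }

    public static void main(String[] args) {
        Deque<String> splits = new ArrayDeque<>();
        splits.add("STREAM_SPLIT_ID");
        int newTables = 3; // three tables added while in the stream phase
        for (int i = 0; i < newTables; i++) {
            splits.poll();                  // stream split is taken up
            configure();                    // full schema rebuild on every restart
            splits.add("STREAM_SPLIT_ID");  // finished, then re-added
        }
        System.out.println("table schemas loaded: " + schemaLoads); // 3000
    }
} {code}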
When my configured table list is t_(.*), which matches 1000+ tables, every snapshot split triggers a full reload of all of them.
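A possible mitigation (a rough sketch only; `SchemaCache` and its use are hypothetical, not the existing Flink CDC API) would be to build the schema once and reuse it across stream split restarts, invalidating the cached entry only when the captured table list actually changes:
{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper: memoizes an expensive schema build per key so that
// repeated configure() calls reuse the already-built schema instead of
// re-reading the metadata of every matched table.
public final class SchemaCache<K, V> {
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();

    public V getOrLoad(K key, Supplier<V> loader) {
        // computeIfAbsent runs the loader at most once per key
        return cache.computeIfAbsent(key, k -> loader.get());
    }

    public static void main(String[] args) {
        SchemaCache<Integer, String> cache = new SchemaCache<>();
        // the expensive loader runs only on the first call for this key
        System.out.println(cache.getOrLoad(0, () -> "schema built"));
        System.out.println(cache.getOrLoad(0, () -> "never runs"));
    }
} {code}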
Additionally, when closing the stream fetcher, the `executorService` should be shut down first, but the current `close()` method looks like this:
{code:java}
public void close() {
    try {
        if (taskContext != null) {
            taskContext.close();
        }
        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
} {code}
This causes an exception, because the fetch task is still running while `taskContext.close()` has already closed the connection.
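A sketch of the ordering I mean (reusing the fields from the snippet above; this is a suggestion, not the current code): stop the executor and the running task before closing the task context that owns the connection:
{code:java}
// Sketch of the suggested ordering: stop the running fetch task before
// closing the task context, so the connection is not closed while the
// task is still using it.
public void close() {
    try {
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }
        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }
        if (taskContext != null) {
            taskContext.close();
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
} {code}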