hql0312 created FLINK-37080:
-------------------------------

             Summary: Flink CDC Postgres: adding new tables to a job running in stream phase with the latest-offset startup mode is too slow
                 Key: FLINK-37080
                 URL: https://issues.apache.org/jira/browse/FLINK-37080
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: hql0312


When I add new tables to a Flink task that is already in the stream phase, the reader starts with the StreamSplit first. The enumerator then detects the newly added tables and switches to snapshot reading for them. In IncrementalSourceSplitReader.java, the method `pollSplitRecords` contains the following logic:


{code:java}
.....
else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
    // (2) try to switch to stream split reading util current snapshot split finished
    dataIt = currentFetcher.pollSplitRecords();
    if (dataIt != null) {
        // first fetch data of snapshot split, return and emit the records of snapshot split
        ChangeEventRecords records;
        if (context.isHasAssignedStreamSplit()) {
            records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
            closeScanFetcher();
            closeStreamFetcher();
        } else {
            records = forRecords(dataIt);
            SnapshotSplit nextSplit = snapshotSplits.poll();
            if (nextSplit != null) {
                checkState(reusedScanFetcher != null);
                submitSnapshotSplit(nextSplit);
            } else {
                closeScanFetcher();
            }
        }
        return records;
    } else {
        return finishedSplit();
    }
}
.....
{code}
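For context, forNewAddedTableFinishedSplit reports not only the current snapshot split but also the stream split itself as finished. A rough sketch of that helper, paraphrased from IncrementalSourceSplitReader (exact names may differ between flink-cdc versions):
{code:java}
private ChangeEventRecords forNewAddedTableFinishedSplit(
        final String splitId, final Iterator<SourceRecords> recordsForSplit) {
    final Set<String> finishedSplits = new HashSet<>();
    finishedSplits.add(splitId);
    // STREAM_SPLIT_ID is reported as finished as well, so the enumerator will
    // re-assign the stream split and the reader will re-run the fetch setup
    finishedSplits.add(STREAM_SPLIT_ID);
    currentSplitId = null;
    return new ChangeEventRecords(splitId, recordsForSplit, finishedSplits);
} {code}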
When `currentFetcher instanceof IncrementalSourceScanFetcher` is true, the reader calls forNewAddedTableFinishedSplit, which adds STREAM_SPLIT_ID to the finished split ids. Once the current fetch completes, the reader's finishCurrentFetch re-adds STREAM_SPLIT_ID to the pending splits, so the stream split is consumed again. Every restart of the stream split calls PostgresSourceFetchTaskContext.configure, like this:
{code:java}
try {
    this.schema =
            PostgresObjectUtils.newSchema(
                    jdbcConnection,
                    dbzConfig,
                    jdbcConnection.getTypeRegistry(),
                    topicSelector,
                    valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
} catch (SQLException e) {
    throw new RuntimeException("Failed to initialize PostgresSchema", e);
} {code}
This call loads the schema of every configured table:
{code:java}
public static PostgresSchema newSchema(
        PostgresConnection connection,
        PostgresConnectorConfig config,
        TypeRegistry typeRegistry,
        TopicSelector<TableId> topicSelector,
        PostgresValueConverter valueConverter)
        throws SQLException {
    PostgresSchema schema =
            new PostgresSchema(
                    config,
                    typeRegistry,
                    connection.getDefaultValueConverter(),
                    topicSelector,
                    valueConverter);
    // refresh() loads the schema of every table matching the configured table list
    schema.refresh(connection, false);
    return schema;
} {code}
In my configuration the table list is t_(.*), which matches 1000+ tables, and every snapshot reloads the schema of all of them. This is what makes the new-table phase so slow.
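One possible mitigation, not in the current code, is to memoize the expensive refresh instead of rebuilding the schema on each configure call. A minimal sketch, assuming the captured table list does not change within a job run and that the helper can live in the same package as PostgresObjectUtils (the class name CachedSchemaHolder is hypothetical):
{code:java}
package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;

import java.sql.SQLException;

/** Hypothetical helper: builds the PostgresSchema once and reuses it afterwards. */
public final class CachedSchemaHolder {

    private static volatile PostgresSchema cachedSchema;

    private CachedSchemaHolder() {}

    public static PostgresSchema getOrCreateSchema(
            PostgresConnection connection,
            PostgresConnectorConfig config,
            TypeRegistry typeRegistry,
            TopicSelector<TableId> topicSelector,
            PostgresValueConverter valueConverter)
            throws SQLException {
        PostgresSchema schema = cachedSchema;
        if (schema == null) {
            synchronized (CachedSchemaHolder.class) {
                schema = cachedSchema;
                if (schema == null) {
                    // delegate to the existing factory; the refresh that scans
                    // every matched table now runs only for the first caller
                    schema =
                            PostgresObjectUtils.newSchema(
                                    connection, config, typeRegistry, topicSelector, valueConverter);
                    cachedSchema = schema;
                }
            }
        }
        return schema;
    }
} {code}
PostgresSourceFetchTaskContext.configure could then call getOrCreateSchema instead of PostgresObjectUtils.newSchema, so only the first stream split restart pays the 1000+-table refresh.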
Also, when closing the stream fetcher, the executorService should be shut down first, but the close() method looks like this:
{code:java}
public void close() {
    try {
        if (taskContext != null) {
            taskContext.close();
        }

        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }

        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
} {code}
This causes an exception, because the fetch task is still running while taskContext.close() has already closed the connection.
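A minimal sketch of the reordering suggested above, using the same fields as the close() method quoted earlier: shut down the executor and wait for the running task before releasing the context that owns the connection.
{code:java}
public void close() {
    try {
        // stop the running fetch task first, so nothing still uses the connection
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }

        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }

        // only now release the task context, which closes the connection
        if (taskContext != null) {
            taskContext.close();
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
} {code}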



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
