hql0312 created FLINK-37080:
-------------------------------

             Summary: Flink CDC Postgres: adding new tables while starting from latest status in the stream phase is too slow
                 Key: FLINK-37080
                 URL: https://issues.apache.org/jira/browse/FLINK-37080
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: hql0312
When I add new tables to a Flink task that is already in the stream phase, it starts with the StreamSplit first; the enumerator then detects the newly added tables and switches to snapshot reading for them. In IncrementalSourceSplitReader.java, the method `pollSplitRecords` contains the following logic:

{code:java}
...
} else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
    // (2) try to switch to stream split reading until current snapshot split finished
    dataIt = currentFetcher.pollSplitRecords();
    if (dataIt != null) {
        // first fetch data of snapshot split, return and emit the records of snapshot split
        ChangeEventRecords records;
        if (context.isHasAssignedStreamSplit()) {
            records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
            closeScanFetcher();
            closeStreamFetcher();
        } else {
            records = forRecords(dataIt);
            SnapshotSplit nextSplit = snapshotSplits.poll();
            if (nextSplit != null) {
                checkState(reusedScanFetcher != null);
                submitSnapshotSplit(nextSplit);
            } else {
                closeScanFetcher();
            }
        }
        return records;
    } else {
        return finishedSplit();
    }
}
{code}

When `currentFetcher instanceof IncrementalSourceScanFetcher` is true, `forNewAddedTableFinishedSplit` is called and adds STREAM_SPLIT_ID to the finished split ids. When the current fetch completes, the reader's finished-split handling re-adds STREAM_SPLIT_ID to the splits, so the stream split is consumed again. Every time the stream split is resubmitted, `PostgresSourceFetchTaskContext.configure` runs the following:

{code:java}
try {
    this.schema =
            PostgresObjectUtils.newSchema(
                    jdbcConnection,
                    dbzConfig,
                    jdbcConnection.getTypeRegistry(),
                    topicSelector,
                    valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
} catch (SQLException e) {
    throw new RuntimeException("Failed to initialize PostgresSchema", e);
}
{code}

This loads the schema of every captured table:

{code:java}
public static PostgresSchema newSchema(
        PostgresConnection connection,
        PostgresConnectorConfig config,
        TypeRegistry typeRegistry,
        TopicSelector<TableId> topicSelector,
        PostgresValueConverter valueConverter)
        throws SQLException {
    PostgresSchema schema =
            new PostgresSchema(
                    config,
                    typeRegistry,
                    connection.getDefaultValueConverter(),
                    topicSelector,
                    valueConverter);
    // load the schema of all configured tables
    schema.refresh(connection, false);
    return schema;
}
{code}

When my configured table list is `t_(.*)`, it matches 1000+ tables, and every snapshot split triggers a full reload of all of their schemas, which makes adding new tables in the stream phase very slow.

In addition, `closeStreamFetcher` should shut down the `executorService` first, but `close()` currently looks like this:

{code:java}
public void close() {
    try {
        if (taskContext != null) {
            taskContext.close();
        }
        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
}
{code}

This causes an exception, because the fetch task is still running while `taskContext.close()` has already closed the connection.
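A minimal sketch of the shutdown order argued for above, reusing the fields from the quoted `close()` (`executorService`, `snapshotSplitReadTask`, `taskContext`); this only illustrates the proposed ordering and is not an actual patch:

{code:java}
public void close() {
    try {
        // Stop the running fetch task first, so nothing is still using
        // the connection when taskContext.close() tears it down.
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(
                    READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn(
                        "Failed to close the scan fetcher in {} seconds.",
                        READER_CLOSE_TIMEOUT_SECONDS);
            }
        }
        if (snapshotSplitReadTask != null) {
            snapshotSplitReadTask.close();
        }
        // Close the task context (and its connection) only after the
        // task has stopped.
        if (taskContext != null) {
            taskContext.close();
        }
    } catch (Exception e) {
        LOG.error("Close scan fetcher error", e);
    }
}
{code}

For the repeated schema loading, one possible direction (an assumption on my part, not a confirmed fix) would be to reuse an already-built `PostgresSchema` across `configure` calls instead of rebuilding it on every stream split restart, for example:

{code:java}
// Hypothetical sketch: cache the schema so that re-submitting the
// stream split does not re-read the metadata of every matched table.
if (this.schema == null) {
    try {
        this.schema =
                PostgresObjectUtils.newSchema(
                        jdbcConnection,
                        dbzConfig,
                        jdbcConnection.getTypeRegistry(),
                        topicSelector,
                        valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
    } catch (SQLException e) {
        throw new RuntimeException("Failed to initialize PostgresSchema", e);
    }
}
{code}

With such a cache, restarting the stream split would no longer scale with the number of tables matched by `t_(.*)`; a real fix would also have to refresh the cached schema when new tables are added.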