loserwang1024 commented on code in PR #2571:
URL: https://github.com/apache/flink-cdc/pull/2571#discussion_r1828650330

##########
flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java:
##########
@@ -113,12 +112,17 @@ public PostgresStreamingChangeEventSource(
     @Override
     public void init() {
+        // It's not necessary to refresh schema again, which is very time-consuming.
+        // The schema of taskContext is the reference of PostgresSourceFetchTaskContext#schema, and
+        // has been initialized when submit StreamSplit fetch task by
+        // IncrementalSourceStreamFetcher#submitTask -> PostgresSourceFetchTaskContext#configure.
+
         // refresh the schema so we have a latest view of the DB tables
-        try {
-            taskContext.refreshSchema(connection, true);

Review Comment:
   This class is copied from Debezium. Maybe we can provide a subclass (StreamSplitReadTask) that overrides this method instead of editing the copy?

##########
flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -214,6 +218,21 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(
         return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
     }
 
+    @Override
+    public FetchTask.Context createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig, boolean reuse) {
+        if (!reuse) {
+            return createFetchTaskContext(sourceSplitBase, taskSourceConfig);
+        }
+        if (this.taskSourceConfig == null
+                || this.fetchTaskContext == null
+                || !this.taskSourceConfig.equals(taskSourceConfig)) {
+            this.taskSourceConfig = taskSourceConfig;
+            this.fetchTaskContext = createFetchTaskContext(sourceSplitBase, taskSourceConfig);

Review Comment:
   It seems that fetchTaskContext will be shared. What happens if the first task invokes FetchTask.Context#close to close the connection, and then another task uses this fetchTaskContext?
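
To make the sharing concern concrete, here is a minimal sketch of the failure mode I have in mind. It is not the real flink-cdc code: `SketchContext` and `SketchDialect` are hypothetical stand-ins for `FetchTask.Context` and `PostgresDialect`, a plain `String` stands in for `JdbcSourceConfig`, and the only thing copied from the diff is the shape of the caching condition.

```java
/** Hypothetical stand-in for FetchTask.Context; it only tracks whether close() was called. */
class SketchContext implements AutoCloseable {
    private boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // A real context would release its database connection here.
        closed = true;
    }
}

/** Hypothetical stand-in for the dialect, mirroring the caching condition in the diff. */
class SketchDialect {
    private String cachedConfig;         // stands in for taskSourceConfig
    private SketchContext cachedContext; // stands in for fetchTaskContext

    SketchContext createFetchTaskContext(String config, boolean reuse) {
        if (!reuse) {
            // Every caller gets its own context.
            return new SketchContext();
        }
        if (cachedConfig == null || cachedContext == null || !cachedConfig.equals(config)) {
            cachedConfig = config;
            cachedContext = new SketchContext();
        }
        // All callers with an equal config receive the same instance.
        return cachedContext;
    }
}

class SharedContextSketch {
    public static void main(String[] args) {
        SketchDialect dialect = new SketchDialect();
        SketchContext first = dialect.createFetchTaskContext("cfg", true);
        SketchContext second = dialect.createFetchTaskContext("cfg", true);

        first.close(); // the first task finishes and closes "its" context

        System.out.println("same instance: " + (first == second));
        System.out.println("second sees closed context: " + second.isClosed());
    }
}
```

In this sketch the second caller ends up holding a context that is already closed, and the cache keeps handing out the closed instance because nothing resets it. If the real context behaves similarly, the cache probably needs either reference counting or an "is this still usable" check before it returns the cached instance.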

##########
flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java:
##########
@@ -71,6 +71,12 @@ public interface DataSourceDialect<C extends SourceConfig>
     /** The task context used for fetch task to fetch data from external systems. */
     FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig);
 
+    /** Try to reuse Context if reuse is true. */

Review Comment:
   In which situations is reuse true, and in which is it false? If we always reuse the context, do we need the reuse parameter at all?