loserwang1024 commented on code in PR #2571:
URL: https://github.com/apache/flink-cdc/pull/2571#discussion_r1828650330


##########
flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java:
##########
@@ -113,12 +112,17 @@ public PostgresStreamingChangeEventSource(
 
     @Override
     public void init() {
+        // It's not necessary to refresh schema again, which is very time-consuming.
+        // The schema of taskContext is the reference of PostgresSourceFetchTaskContext#schema, and
+        // has been initialized when submit StreamSplit fetch task by
+        // IncrementalSourceStreamFetcher#submitTask -> PostgresSourceFetchTaskContext#configure.
+
         // refresh the schema so we have a latest view of the DB tables
-        try {
-            taskContext.refreshSchema(connection, true);

Review Comment:
   It's copied from Debezium. Maybe we can provide a subclass
   (StreamSplitReadTask) that overrides this method instead?
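
   A minimal sketch of the override idea, assuming the schema has already been initialized before the streaming task starts. The class names below are hypothetical stand-ins and not the real Debezium or Flink CDC classes; the counter exists only to make the skipped refresh observable.

```java
// Hypothetical stand-in for the class copied from Debezium; not its real API.
class StreamingSourceSketch {
    int schemaRefreshCount = 0;

    // Mirrors the upstream behavior: refresh the schema on every init().
    public void init() {
        schemaRefreshCount++;
    }
}

// Hypothetical stand-in for a StreamSplitReadTask-style subclass.
class StreamSplitReadTaskSketch extends StreamingSourceSketch {
    @Override
    public void init() {
        // Assumption: the schema was already initialized when the task was
        // configured, so the time-consuming refresh is intentionally skipped.
    }
}
```

   With this shape the copied class stays identical to upstream, and the behavior change lives only in the subclass.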



##########
flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -214,6 +218,21 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(
         return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
     }
 
+    @Override
+    public FetchTask.Context createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig, boolean reuse) {
+        if (!reuse) {
+            return createFetchTaskContext(sourceSplitBase, taskSourceConfig);
+        }
+        if (this.taskSourceConfig == null
+                || this.fetchTaskContext == null
+                || !this.taskSourceConfig.equals(taskSourceConfig)) {
+            this.taskSourceConfig = taskSourceConfig;
+            this.fetchTaskContext = createFetchTaskContext(sourceSplitBase, taskSourceConfig);

Review Comment:
   It seems that fetchTaskContext will be shared. What happens if the first
   task invokes FetchTask.Context#close to close the connection and then
   another task uses this fetchTaskContext?
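
   One possible guard against this hazard is reference counting, so that the connection is released only when the last user closes the context. This is a sketch under assumptions: SharedContextSketch, acquire, and isConnectionOpen are hypothetical names, not part of FetchTask.Context, and the boolean flag stands in for a real connection.

```java
// Hypothetical sketch of a reference-counted shared context.
class SharedContextSketch implements AutoCloseable {
    private int refCount = 0;
    private boolean connectionOpen = true; // stands in for a real JDBC connection

    // Each task that wants to use the shared context acquires it first.
    public synchronized SharedContextSketch acquire() {
        if (!connectionOpen) {
            throw new IllegalStateException("context already closed");
        }
        refCount++;
        return this;
    }

    // The connection is released only when the last holder closes.
    @Override
    public synchronized void close() {
        if (refCount > 0 && --refCount == 0) {
            connectionOpen = false;
        }
    }

    public synchronized boolean isConnectionOpen() {
        return connectionOpen;
    }
}
```

   Without something along these lines, the first task's close would leave the second task holding a context whose connection is already gone.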



##########
flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java:
##########
@@ -71,6 +71,12 @@ public interface DataSourceDialect<C extends SourceConfig>
     /** The task context used for fetch task to fetch data from external systems. */
     FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig);
 
+    /** Try to reuse Context if reuse is true. */

Review Comment:
   In which situations is reuse true, and in which is it false? If the
   context is always reused, is the reuse parameter needed at all?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

