loserwang1024 commented on code in PR #4113:
URL: https://github.com/apache/flink-cdc/pull/4113#discussion_r2329390546


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##########
@@ -133,6 +135,17 @@ public void notifyCheckpointComplete(long checkpointId) 
throws Exception {
         }
     }
 
+    @Override
+    protected void onSplitFinished(Map finishedSplitIds) {
+        super.onSplitFinished(finishedSplitIds);
+
+        if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
+            PostgresDialect dialect = (PostgresDialect) this.dialect;
+            boolean removed = dialect.removeSlot(dialect.getSlotName());
+            LOG.info("Remove slot '{}' result is {}.", dialect.getSlotName(), 
removed);
+        }
+    }
+

Review Comment:
   ```suggestion
   @Override
       protected void onSplitFinished(Map<String, SourceSplitState> 
finishedSplitIds) {
           super.onSplitFinished(finishedSplitIds);
   
           for (SourceSplitState splitState : finishedSplitIds.values()) {
               SourceSplitBase sourceSplit = splitState.toSourceSplit();
               if (sourceSplit.isStreamSplit()  ) {
                   StreamSplit streamSplit = sourceSplit.asStreamSplit();
                   
if(streamSplit.getStartingOffset().isAtOrAfter(streamSplit.getEndingOffset())){
                       PostgresDialect dialect = (PostgresDialect) this.dialect;
                       boolean removed = 
dialect.removeSlot(dialect.getSlotName());
                       LOG.info("Remove slot '{}' result is {}.", 
dialect.getSlotName(), removed);
                   }
   
               }
           }
       }
   
   ```
   
   It seems that even a snapshot split is finished, the global slot name will 
be removed.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java:
##########
@@ -193,16 +191,16 @@ private void maybeCreateSlotForBackFillReadTask(
 
     /** Drop slot for backfill task and close replication connection. */
     private void maybeDropSlotForBackFillReadTask(
-            PostgresReplicationConnection replicationConnection, boolean 
skipSnapshotBackfill) {
+            PostgresConnection connection, String slotName, boolean 
skipSnapshotBackfill) {
         // if skip backfill, no need to create slot here
         if (skipSnapshotBackfill) {
             return;
         }
 
         try {
-            replicationConnection.close(true);
+            connection.dropReplicationSlot(slotName);

Review Comment:
   Why needs change here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to