loserwang1024 commented on code in PR #4113:
URL: https://github.com/apache/flink-cdc/pull/4113#discussion_r2329390546

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##########
@@ -133,6 +135,17 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
         }
     }
 
+    @Override
+    protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
+        super.onSplitFinished(finishedSplitIds);
+
+        if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
+            PostgresDialect dialect = (PostgresDialect) this.dialect;
+            boolean removed = dialect.removeSlot(dialect.getSlotName());
+            LOG.info("Remove slot '{}' result is {}.", dialect.getSlotName(), removed);
+        }
+    }
+

Review Comment:
   ```suggestion
       @Override
       protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
           super.onSplitFinished(finishedSplitIds);

           for (SourceSplitState splitState : finishedSplitIds.values()) {
               SourceSplitBase sourceSplit = splitState.toSourceSplit();
               if (sourceSplit.isStreamSplit()) {
                   StreamSplit streamSplit = sourceSplit.asStreamSplit();
                   if (streamSplit.getStartingOffset().isAtOrAfter(streamSplit.getEndingOffset())) {
                       PostgresDialect dialect = (PostgresDialect) this.dialect;
                       boolean removed = dialect.removeSlot(dialect.getSlotName());
                       LOG.info("Remove slot '{}' result is {}.", dialect.getSlotName(), removed);
                   }
               }
           }
       }
   ```
   It seems that the global slot will be removed even when only a snapshot split has finished.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java:
##########
@@ -193,16 +191,16 @@ private void maybeCreateSlotForBackFillReadTask(
 
     /** Drop slot for backfill task and close replication connection. */
     private void maybeDropSlotForBackFillReadTask(
-            PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
+            PostgresConnection connection, String slotName, boolean skipSnapshotBackfill) {
         // if skip backfill, no need to create slot here
         if (skipSnapshotBackfill) {
             return;
         }
 
         try {
-            replicationConnection.close(true);
+            connection.dropReplicationSlot(slotName);

Review Comment:
   Why is this change needed?