This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac0c992d85 Add PostgreSQLSlotManagerTest (#37454)
7ac0c992d85 is described below

commit 7ac0c992d8564148100b6df21a8a553dbbfe8953
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 21 20:22:25 2025 +0800

    Add PostgreSQLSlotManagerTest (#37454)
---
 .../position/slot/PostgreSQLSlotManagerTest.java   | 143 +++++++++++++++++++++
 1 file changed, 143 insertions(+)

diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/position/slot/PostgreSQLSlotManagerTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/position/slot/PostgreSQLSlotManagerTest.java
new file mode 100644
index 00000000000..d27900b9bcd
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/position/slot/PostgreSQLSlotManagerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.slot;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class PostgreSQLSlotManagerTest {
+    
+    private static final String DECODE_PLUGIN = "test_decoding";
+    
+    private static final String LOAD_SQL = "SELECT slot_name, database FROM 
pg_replication_slots WHERE slot_name=? AND plugin=?";
+    
+    private static final String CREATE_SQL = "SELECT * FROM 
pg_create_logical_replication_slot(?, ?)";
+    
+    private static final String DROP_SQL = "SELECT 
pg_drop_replication_slot(?)";
+    
+    private final PostgreSQLSlotManager slotManager = new 
PostgreSQLSlotManager(DECODE_PLUGIN);
+    
+    @Test
+    void assertCreateWhenSlotNotPresent() throws SQLException {
+        Connection connection = mock(Connection.class);
+        when(connection.getCatalog()).thenReturn("foo_catalog");
+        PreparedStatement loadPreparedStatement = 
mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
+        
when(connection.prepareStatement(LOAD_SQL)).thenReturn(loadPreparedStatement);
+        PreparedStatement createPreparedStatement = 
mock(PreparedStatement.class);
+        
when(connection.prepareStatement(CREATE_SQL)).thenReturn(createPreparedStatement);
+        slotManager.create(connection, "foo_slot");
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "foo_slot");
+        verify(loadPreparedStatement).setString(1, slotName);
+        verify(loadPreparedStatement).setString(2, DECODE_PLUGIN);
+        verify(loadPreparedStatement).executeQuery();
+        verify(createPreparedStatement).setString(1, slotName);
+        verify(createPreparedStatement).setString(2, DECODE_PLUGIN);
+        verify(createPreparedStatement).execute();
+    }
+    
+    @Test
+    void assertCreateWhenSlotDatabaseIsNull() throws SQLException {
+        Connection connection = mock(Connection.class);
+        when(connection.getCatalog()).thenReturn("bar_catalog");
+        PreparedStatement loadPreparedStatement = 
mock(PreparedStatement.class);
+        ResultSet loadResultSet = mock(ResultSet.class);
+        
when(connection.prepareStatement(LOAD_SQL)).thenReturn(loadPreparedStatement);
+        when(loadPreparedStatement.executeQuery()).thenReturn(loadResultSet);
+        when(loadResultSet.next()).thenReturn(true);
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "bar_slot");
+        when(loadResultSet.getString(1)).thenReturn(slotName);
+        when(loadResultSet.getString(2)).thenReturn(null);
+        PreparedStatement dropPreparedStatement = 
mock(PreparedStatement.class);
+        PreparedStatement createPreparedStatement = 
mock(PreparedStatement.class);
+        
when(connection.prepareStatement(DROP_SQL)).thenReturn(dropPreparedStatement);
+        
when(connection.prepareStatement(CREATE_SQL)).thenReturn(createPreparedStatement);
+        slotManager.create(connection, "bar_slot");
+        verify(dropPreparedStatement).setString(1, slotName);
+        verify(dropPreparedStatement).execute();
+        verify(createPreparedStatement).setString(1, slotName);
+        verify(createPreparedStatement).setString(2, DECODE_PLUGIN);
+        verify(createPreparedStatement).execute();
+    }
+    
+    @Test
+    void assertCreateWhenSlotDatabaseExistsDoNothing() throws SQLException {
+        Connection connection = mock(Connection.class);
+        when(connection.getCatalog()).thenReturn("baz_catalog");
+        PreparedStatement loadPreparedStatement = 
mock(PreparedStatement.class);
+        ResultSet loadResultSet = mock(ResultSet.class);
+        
when(connection.prepareStatement(LOAD_SQL)).thenReturn(loadPreparedStatement);
+        when(loadPreparedStatement.executeQuery()).thenReturn(loadResultSet);
+        when(loadResultSet.next()).thenReturn(true);
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "baz_slot");
+        when(loadResultSet.getString(1)).thenReturn(slotName);
+        when(loadResultSet.getString(2)).thenReturn("baz_db");
+        slotManager.create(connection, "baz_slot");
+        verify(connection, never()).prepareStatement(DROP_SQL);
+        verify(connection, never()).prepareStatement(CREATE_SQL);
+    }
+    
+    @Test
+    void assertCreateHandleDuplicateAndOtherSQLState() throws SQLException {
+        Connection connection = mock(Connection.class);
+        when(connection.getCatalog()).thenReturn("dup_catalog");
+        PreparedStatement loadPreparedStatement = 
mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
+        
when(connection.prepareStatement(LOAD_SQL)).thenReturn(loadPreparedStatement);
+        PreparedStatement createPreparedStatement = 
mock(PreparedStatement.class);
+        
when(connection.prepareStatement(CREATE_SQL)).thenReturn(createPreparedStatement);
+        when(createPreparedStatement.execute()).thenThrow(new 
SQLException("duplicate", "42710")).thenThrow(new SQLException("error", 
"99999"));
+        slotManager.create(connection, "dup_slot");
+        assertThrows(SQLException.class, () -> slotManager.create(connection, 
"dup_slot"));
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "dup_slot");
+        verify(createPreparedStatement, times(2)).setString(1, slotName);
+        verify(createPreparedStatement, times(2)).setString(2, DECODE_PLUGIN);
+    }
+    
+    @Test
+    void assertDropIfExistedBranches() throws SQLException {
+        Connection connection = mock(Connection.class);
+        when(connection.getCatalog()).thenReturn("drop_catalog");
+        PreparedStatement loadPreparedStatement = 
mock(PreparedStatement.class);
+        ResultSet loadResultSet = mock(ResultSet.class);
+        
when(connection.prepareStatement(LOAD_SQL)).thenReturn(loadPreparedStatement);
+        when(loadPreparedStatement.executeQuery()).thenReturn(loadResultSet);
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "drop_slot");
+        when(loadResultSet.next()).thenReturn(false, true);
+        when(loadResultSet.getString(1)).thenReturn(slotName);
+        when(loadResultSet.getString(2)).thenReturn("drop_db");
+        PreparedStatement dropPreparedStatement = 
mock(PreparedStatement.class);
+        
when(connection.prepareStatement(DROP_SQL)).thenReturn(dropPreparedStatement);
+        slotManager.dropIfExisted(connection, "drop_slot");
+        slotManager.dropIfExisted(connection, "drop_slot");
+        verify(loadPreparedStatement, times(2)).setString(1, slotName);
+        verify(loadPreparedStatement, times(2)).setString(2, DECODE_PLUGIN);
+        verify(dropPreparedStatement).setString(1, slotName);
+        verify(dropPreparedStatement).execute();
+    }
+}

Reply via email to