This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5313909e01 [core] Ensuring JDBC connection is not stale before using 
it (#7470)
5313909e01 is described below

commit 5313909e0137244d84d803a85ffba0d9d5b03df6
Author: junmuz <[email protected]>
AuthorDate: Sun Mar 29 13:05:34 2026 +0100

    [core] Ensuring JDBC connection is not stale before using it (#7470)
---
 .../java/org/apache/paimon/client/ClientPool.java  |   9 ++
 .../org/apache/paimon/jdbc/JdbcClientPool.java     |  34 +++++
 .../org/apache/paimon/jdbc/JdbcClientPoolTest.java | 152 +++++++++++++++++++++
 3 files changed, 195 insertions(+)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java 
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
index c42a30b353..b635a0882c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -65,6 +65,7 @@ public interface ClientPool<C, E extends Exception> {
                     continue;
                 }
                 try {
+                    client = ensureActiveClient(client);
                     return action.run(client);
                 } finally {
                     clients.addFirst(client);
@@ -72,6 +73,14 @@ public interface ClientPool<C, E extends Exception> {
             }
         }
 
+        /**
+         * Validates that the client is still active before use. Subclasses 
should override this
+         * method to provide protocol-specific validation and reconnection 
logic.
+         */
+        protected C ensureActiveClient(C client) {
+            return client;
+        }
+
         @Override
         public void execute(ExecuteAction<C, E> action) throws E, 
InterruptedException {
             run(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
index d2ef4ecf73..80c97b4a1b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -20,6 +20,9 @@ package org.apache.paimon.jdbc;
 
 import org.apache.paimon.client.ClientPool;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -32,12 +35,16 @@ import java.util.regex.Pattern;
 /** Client pool for jdbc. */
 public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection, 
SQLException> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcClientPool.class);
     private static final Pattern PROTOCOL_PATTERN = 
Pattern.compile("jdbc:([^:]+):(.*)");
+    private static final int CONNECTION_VALIDATION_TIMEOUT_SECONDS = 5;
 
     private final String protocol;
+    private final Supplier<Connection> connectionSupplier;
 
     public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> 
props) {
         super(poolSize, clientSupplier(dbUrl, props));
+        this.connectionSupplier = clientSupplier(dbUrl, props);
         Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
         if (matcher.matches()) {
             this.protocol = matcher.group(1);
@@ -46,6 +53,23 @@ public class JdbcClientPool extends 
ClientPool.ClientPoolImpl<Connection, SQLExc
         }
     }
 
+    @Override
+    protected Connection ensureActiveClient(Connection connection) {
+        try {
+            if (connection.isClosed()
+                    || 
!connection.isValid(CONNECTION_VALIDATION_TIMEOUT_SECONDS)) {
+                LOG.warn("Stale JDBC connection detected, creating a new 
connection.");
+                closeQuietly(connection);
+                return connectionSupplier.get();
+            }
+        } catch (SQLException e) {
+            LOG.warn("Failed to validate JDBC connection, creating a new 
connection.", e);
+            closeQuietly(connection);
+            return connectionSupplier.get();
+        }
+        return connection;
+    }
+
     private static Supplier<Connection> clientSupplier(String dbUrl, 
Map<String, String> props) {
         return () -> {
             try {
@@ -70,4 +94,14 @@ public class JdbcClientPool extends 
ClientPool.ClientPoolImpl<Connection, SQLExc
             throw new RuntimeException("Failed to close connection", e);
         }
     }
+
+    private static void closeQuietly(Connection connection) {
+        try {
+            if (connection != null && !connection.isClosed()) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            LOG.debug("Failed to close stale connection", e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java
new file mode 100644
index 0000000000..a96a2170bc
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JdbcClientPool} connection validation. */
+public class JdbcClientPoolTest {
+
+    private JdbcClientPool createPool(int poolSize) {
+        String dbUrl =
+                "jdbc:sqlite:file::memory:?ic" + 
UUID.randomUUID().toString().replace("-", "");
+        return new JdbcClientPool(poolSize, dbUrl, Collections.emptyMap());
+    }
+
+    @Test
+    public void testValidConnectionIsReused() throws SQLException, 
InterruptedException {
+        JdbcClientPool pool = createPool(1);
+        try {
+            AtomicReference<Connection> firstConn = new AtomicReference<>();
+            AtomicReference<Connection> secondConn = new AtomicReference<>();
+
+            pool.run(
+                    connection -> {
+                        firstConn.set(connection);
+                        return null;
+                    });
+
+            pool.run(
+                    connection -> {
+                        secondConn.set(connection);
+                        return null;
+                    });
+
+            assertThat(secondConn.get()).isSameAs(firstConn.get());
+        } finally {
+            pool.close();
+        }
+    }
+
+    @Test
+    public void testClosedConnectionIsReplaced() throws SQLException, 
InterruptedException {
+        JdbcClientPool pool = createPool(1);
+        try {
+            AtomicReference<Connection> firstConn = new AtomicReference<>();
+            AtomicReference<Connection> secondConn = new AtomicReference<>();
+
+            // Get the connection and close it to simulate a stale connection
+            pool.run(
+                    connection -> {
+                        firstConn.set(connection);
+                        connection.close();
+                        return null;
+                    });
+
+            // The pool should detect the closed connection and create a new 
one
+            pool.run(
+                    connection -> {
+                        secondConn.set(connection);
+                        return null;
+                    });
+
+            assertThat(secondConn.get()).isNotSameAs(firstConn.get());
+            assertThat(secondConn.get().isClosed()).isFalse();
+        } finally {
+            pool.close();
+        }
+    }
+
+    @Test
+    public void testReplacedConnectionIsReturnedToPool() throws SQLException, 
InterruptedException {
+        JdbcClientPool pool = createPool(1);
+        try {
+            AtomicReference<Connection> replacedConn = new AtomicReference<>();
+            AtomicReference<Connection> thirdConn = new AtomicReference<>();
+
+            // Close the connection to trigger replacement
+            pool.run(
+                    connection -> {
+                        connection.close();
+                        return null;
+                    });
+
+            // This call gets the replacement connection
+            pool.run(
+                    connection -> {
+                        replacedConn.set(connection);
+                        return null;
+                    });
+
+            // The replacement should be reused since it's valid
+            pool.run(
+                    connection -> {
+                        thirdConn.set(connection);
+                        return null;
+                    });
+
+            assertThat(thirdConn.get()).isSameAs(replacedConn.get());
+        } finally {
+            pool.close();
+        }
+    }
+
+    @Test
+    public void testActionIsExecutedOnValidConnection() throws SQLException, 
InterruptedException {
+        JdbcClientPool pool = createPool(1);
+        try {
+            // Close the connection to simulate staleness
+            pool.run(
+                    connection -> {
+                        connection.close();
+                        return null;
+                    });
+
+            // The action should receive a valid connection and succeed
+            boolean result =
+                    pool.run(
+                            connection -> {
+                                // Execute a real SQL statement to verify the 
connection works
+                                return connection.prepareStatement("SELECT 
1").execute();
+                            });
+
+            assertThat(result).isTrue();
+        } finally {
+            pool.close();
+        }
+    }
+}

Reply via email to