This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5313909e01 [core] Ensuring JDBC connection is not stale before using
it (#7470)
5313909e01 is described below
commit 5313909e0137244d84d803a85ffba0d9d5b03df6
Author: junmuz <[email protected]>
AuthorDate: Sun Mar 29 13:05:34 2026 +0100
[core] Ensuring JDBC connection is not stale before using it (#7470)
---
.../java/org/apache/paimon/client/ClientPool.java | 9 ++
.../org/apache/paimon/jdbc/JdbcClientPool.java | 34 +++++
.../org/apache/paimon/jdbc/JdbcClientPoolTest.java | 152 +++++++++++++++++++++
3 files changed, 195 insertions(+)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
index c42a30b353..b635a0882c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java
@@ -65,6 +65,7 @@ public interface ClientPool<C, E extends Exception> {
continue;
}
try {
+ client = ensureActiveClient(client);
return action.run(client);
} finally {
clients.addFirst(client);
@@ -72,6 +73,14 @@ public interface ClientPool<C, E extends Exception> {
}
}
+ /**
+ * Validates that the client is still active before use. Subclasses
should override this
+ * method to provide protocol-specific validation and reconnection
logic.
+ */
+ protected C ensureActiveClient(C client) {
+ return client;
+ }
+
@Override
public void execute(ExecuteAction<C, E> action) throws E,
InterruptedException {
run(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
index d2ef4ecf73..80c97b4a1b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -20,6 +20,9 @@ package org.apache.paimon.jdbc;
import org.apache.paimon.client.ClientPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@@ -32,12 +35,16 @@ import java.util.regex.Pattern;
/** Client pool for jdbc. */
public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection,
SQLException> {
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcClientPool.class);
private static final Pattern PROTOCOL_PATTERN =
Pattern.compile("jdbc:([^:]+):(.*)");
+ private static final int CONNECTION_VALIDATION_TIMEOUT_SECONDS = 5;
private final String protocol;
+ private final Supplier<Connection> connectionSupplier;
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String>
props) {
super(poolSize, clientSupplier(dbUrl, props));
+ this.connectionSupplier = clientSupplier(dbUrl, props);
Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
if (matcher.matches()) {
this.protocol = matcher.group(1);
@@ -46,6 +53,23 @@ public class JdbcClientPool extends
ClientPool.ClientPoolImpl<Connection, SQLExc
}
}
+ @Override
+ protected Connection ensureActiveClient(Connection connection) {
+ try {
+ if (connection.isClosed()
+ ||
!connection.isValid(CONNECTION_VALIDATION_TIMEOUT_SECONDS)) {
+ LOG.warn("Stale JDBC connection detected, creating a new
connection.");
+ closeQuietly(connection);
+ return connectionSupplier.get();
+ }
+ } catch (SQLException e) {
+ LOG.warn("Failed to validate JDBC connection, creating a new
connection.", e);
+ closeQuietly(connection);
+ return connectionSupplier.get();
+ }
+ return connection;
+ }
+
private static Supplier<Connection> clientSupplier(String dbUrl,
Map<String, String> props) {
return () -> {
try {
@@ -70,4 +94,14 @@ public class JdbcClientPool extends
ClientPool.ClientPoolImpl<Connection, SQLExc
throw new RuntimeException("Failed to close connection", e);
}
}
+
+ private static void closeQuietly(Connection connection) {
+ try {
+ if (connection != null && !connection.isClosed()) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ LOG.debug("Failed to close stale connection", e);
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java
new file mode 100644
index 0000000000..a96a2170bc
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcClientPoolTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JdbcClientPool} connection validation. */
+public class JdbcClientPoolTest {
+
+ private JdbcClientPool createPool(int poolSize) {
+ String dbUrl =
+ "jdbc:sqlite:file::memory:?ic" +
UUID.randomUUID().toString().replace("-", "");
+ return new JdbcClientPool(poolSize, dbUrl, Collections.emptyMap());
+ }
+
+ @Test
+ public void testValidConnectionIsReused() throws SQLException,
InterruptedException {
+ JdbcClientPool pool = createPool(1);
+ try {
+ AtomicReference<Connection> firstConn = new AtomicReference<>();
+ AtomicReference<Connection> secondConn = new AtomicReference<>();
+
+ pool.run(
+ connection -> {
+ firstConn.set(connection);
+ return null;
+ });
+
+ pool.run(
+ connection -> {
+ secondConn.set(connection);
+ return null;
+ });
+
+ assertThat(secondConn.get()).isSameAs(firstConn.get());
+ } finally {
+ pool.close();
+ }
+ }
+
+ @Test
+ public void testClosedConnectionIsReplaced() throws SQLException,
InterruptedException {
+ JdbcClientPool pool = createPool(1);
+ try {
+ AtomicReference<Connection> firstConn = new AtomicReference<>();
+ AtomicReference<Connection> secondConn = new AtomicReference<>();
+
+ // Get the connection and close it to simulate a stale connection
+ pool.run(
+ connection -> {
+ firstConn.set(connection);
+ connection.close();
+ return null;
+ });
+
+ // The pool should detect the closed connection and create a new
one
+ pool.run(
+ connection -> {
+ secondConn.set(connection);
+ return null;
+ });
+
+ assertThat(secondConn.get()).isNotSameAs(firstConn.get());
+ assertThat(secondConn.get().isClosed()).isFalse();
+ } finally {
+ pool.close();
+ }
+ }
+
+ @Test
+ public void testReplacedConnectionIsReturnedToPool() throws SQLException,
InterruptedException {
+ JdbcClientPool pool = createPool(1);
+ try {
+ AtomicReference<Connection> replacedConn = new AtomicReference<>();
+ AtomicReference<Connection> thirdConn = new AtomicReference<>();
+
+ // Close the connection to trigger replacement
+ pool.run(
+ connection -> {
+ connection.close();
+ return null;
+ });
+
+ // This call gets the replacement connection
+ pool.run(
+ connection -> {
+ replacedConn.set(connection);
+ return null;
+ });
+
+ // The replacement should be reused since it's valid
+ pool.run(
+ connection -> {
+ thirdConn.set(connection);
+ return null;
+ });
+
+ assertThat(thirdConn.get()).isSameAs(replacedConn.get());
+ } finally {
+ pool.close();
+ }
+ }
+
+ @Test
+ public void testActionIsExecutedOnValidConnection() throws SQLException,
InterruptedException {
+ JdbcClientPool pool = createPool(1);
+ try {
+ // Close the connection to simulate staleness
+ pool.run(
+ connection -> {
+ connection.close();
+ return null;
+ });
+
+ // The action should receive a valid connection and succeed
+ boolean result =
+ pool.run(
+ connection -> {
+ // Execute a real SQL statement to verify the
connection works
+ return connection.prepareStatement("SELECT
1").execute();
+ });
+
+ assertThat(result).isTrue();
+ } finally {
+ pool.close();
+ }
+ }
+}