This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new 36b392f5b2 Share one connection pool across remote transactions in
gremlin-driver (#3461)
36b392f5b2 is described below
commit 36b392f5b223e804a1bf5a1ff8c321c068cc47e6
Author: Ken Hu <[email protected]>
AuthorDate: Fri Jun 26 15:05:57 2026 -0700
Share one connection pool across remote transactions in gremlin-driver
(#3461)
Each Cluster.transact() call previously built its own ClusteredClient,
which stood up a full set of per-host connection pools and then tore
them down on every commit/rollback. That cost memory and added
connection-setup latency (TCP/TLS handshake) to every transaction, even
though a transaction only ever talks to one host.
Transactions don't need their own pools or a dedicated connection: the
server binds each request to its transaction via the transaction id, and
the HTTP model returns a connection to the pool as soon as the response
completes. So a transaction only needs to pin to a host, not hold a
connection. This lets all transactions share a single, eagerly created
ClusteredClient and borrow a connection per request from the pinned
host's warm pool, giving transaction requests the same connection
occupancy as normal query traffic.
While reworking PinnedClient:
- Host selection now delegates to the cluster's LoadBalancingStrategy so
transactions honor the configured strategy and skip unavailable hosts
instead of always picking a random (possibly dead) host. A random host
is now used only as a fallback when the strategy yields no host.
- closeAsync() no longer tears down pools. The pools belong to the shared
client and are closed with the Cluster, after open transactions are
rolled back; closing one transaction must not break its siblings.
Assisted-by: Kiro:claude-opus-4-8
---
.../apache/tinkerpop/gremlin/driver/Client.java | 53 ++++++++++++++-------
.../apache/tinkerpop/gremlin/driver/Cluster.java | 22 ++++++++-
.../tinkerpop/gremlin/driver/ClientTest.java | 55 ++++++++++++++++++++++
3 files changed, 111 insertions(+), 19 deletions(-)
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 26c24f2da2..18b47997b0 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -567,35 +568,54 @@ public abstract class Client implements RequestSubmitter,
RequestSubmitterAsync
}
/**
- * A {@link Client} that pins all requests to a single {@link Host}. Used
internally by transactions
- * to ensure all requests within a transaction go to the same server.
+ * A {@link Client} that pins all requests of a single transaction to one
{@link Host}. Used internally by
+ * transactions; obtain one indirectly via {@link Cluster#transact()} or
{@link Cluster#transact(String)} rather
+ * than directly.
* <p>
- * This client is not intended to be used directly — obtain a {@link
org.apache.tinkerpop.gremlin.structure.Transaction}
- * via {@link Cluster#transact()} or {@link Cluster#transact(String)}
instead.
+ * This is a lightweight routing view that owns no connection pools. All
transactions share a single
+ * {@link ClusteredClient} (one per {@link Cluster}) and borrow a
connection per request from the pinned host's
+ * pool, returning it once the response completes. Those pools are closed
when the Cluster closes, never by this
+ * client.
*/
public static class PinnedClient extends Client {
private final ClusteredClient clusteredClient;
private final Host pinnedHost;
- private final AtomicReference<CompletableFuture<Void>> closing = new
AtomicReference<>(null);
+ // Tracks this view's own closed state for the isClosing() contract;
see closeAsync().
+ private final AtomicBoolean closed = new AtomicBoolean(false);
- PinnedClient(final Cluster cluster) {
- super(cluster);
- this.pinnedHost = chooseRandomHost();
- this.clusteredClient = cluster.connect();
+ PinnedClient(final ClusteredClient clusteredClient) {
+ super(clusteredClient.getCluster());
+ this.clusteredClient = clusteredClient;
+ this.pinnedHost = choosePinnedHost();
}
public Host getPinnedHost() {
return pinnedHost;
}
+ /**
+ * Delegates host selection to the cluster's {@link
LoadBalancingStrategy} so transactions honor the configured
+ * strategy and skip unavailable hosts rather than pinning to a dead
one. Falls back to a random host if the
+ * strategy yields none, mirroring {@link
ClusteredClient#chooseConnection(RequestMessage)}.
+ */
+ private Host choosePinnedHost() {
+ final Iterator<Host> hosts =
cluster.loadBalancingStrategy().select(null);
+ return hosts.hasNext() ? hosts.next() : chooseRandomHost();
+ }
+
@Override
protected void initializeImplementation() {
- this.clusteredClient.init();
+ // Pools are owned and eagerly initialized by the shared client;
this idempotent call just ensures they
+ // exist before the first request is routed.
+ clusteredClient.init();
initialized = true;
}
@Override
protected Connection chooseConnection(final RequestMessage msg) throws
TimeoutException, ConnectionException {
+ // Transactions pin to a host, not a connection: each request
carries its transaction id, so any connection
+ // to the pinned host is interchangeable. The borrowed connection
returns to the pool as soon as the
+ // response completes (see Connection.write()).
final ConnectionPool pool =
clusteredClient.hostConnectionPools.get(pinnedHost);
if (pool == null) throw new NoHostAvailableException();
return
pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection,
TimeUnit.MILLISECONDS);
@@ -603,18 +623,17 @@ public abstract class Client implements RequestSubmitter,
RequestSubmitterAsync
@Override
public boolean isClosing() {
- return closing.get() != null;
+ return closed.get();
}
/**
- * Marks this client as closed. The underlying pool is owned by {@link
ClusteredClient} and is not closed here.
+ * Records the closed state for the {@link #isClosing()} contract.
Intentionally closes no connections: the
+ * shared pools are owned by the transaction {@link ClusteredClient}
and closed with the {@link Cluster}.
*/
@Override
- public synchronized CompletableFuture<Void> closeAsync() {
- if (closing.get() != null) return closing.get();
-
- closing.set(clusteredClient.closeAsync());
- return closing.get();
+ public CompletableFuture<Void> closeAsync() {
+ closed.set(true);
+ return CompletableFuture.completedFuture(null);
}
}
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index a40e0f6a72..b6936c01fc 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -119,8 +119,9 @@ public final class Cluster {
* @param graphOrTraversalSource the graph/traversal source alias, or null
to use the server default
*/
public RemoteTransaction transact(final String graphOrTraversalSource) {
- final Client.PinnedClient pinnedClient = new Client.PinnedClient(this);
- manager.trackClient(pinnedClient);
+ // Ensures the shared transaction client and its pools exist before
pinning the transaction to a host.
+ init();
+ final Client.PinnedClient pinnedClient = new
Client.PinnedClient(manager.transactionClient);
return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource);
}
@@ -911,6 +912,10 @@ public final class Cluster {
private final List<WeakReference<Client>> openedClients = new
ArrayList<>();
+ // Single ClusteredClient shared by all transactions. Not tracked in
openedClients: it is Cluster-owned and
+ // closed explicitly in close(), after the rollbacks that borrow from
its pools.
+ private Client.ClusteredClient transactionClient;
+
private Manager(final Builder builder) {
validateBuilder(builder);
@@ -1018,6 +1023,13 @@ public final class Cluster {
contactPoints.forEach(address -> {
final Host host = add(address);
});
+
+ // All transactions share one ClusteredClient rather than each
transact() standing up (and tearing down on
+ // commit/rollback) its own per-host pools. Sharing is safe
because transactions borrow a connection per
+ // request and return it immediately, so pool occupancy matches
normal query traffic. Eager for simplicity;
+ // unused connections are reaped by the idle timeout.
+ transactionClient = new Client.ClusteredClient(Cluster.this);
+ transactionClient.init();
}
void trackClient(final Client client) {
@@ -1068,6 +1080,12 @@ public final class Cluster {
}
}
+ // Closed here, after the rollbacks above that borrow from its
pools (it is intentionally not in
+ // openedClients).
+ if (transactionClient != null) {
+ clientCloseFutures.add(transactionClient.closeAsync());
+ }
+
// when all the clients are fully closed then shutdown the netty
event loop. not sure why this needs to
// block here, but if it doesn't then factory.shutdown() below
doesn't seem to want to ever complete.
// ideally, this should all be async, but i guess it wasn't before
this change so just going to leave it
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000000..9e90db7aaa
--- /dev/null
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link Client} and its implementations.
+ */
+public class ClientTest {
+
+ /**
+ * Exercises {@link Client.PinnedClient}'s host selection (the
transaction-pinning logic). A transaction must pin
+ * to a host chosen via the cluster's load-balancing strategy, and must
never pin to a host that is currently
+ * unavailable (it would otherwise route every request to a dead server).
The strategy is seeded directly via
+ * {@code onNew} so selection can be verified without opening any real
connections.
+ */
+ @Test
+ public void shouldPinTransactionToAvailableHostSelectedByStrategy() {
+ final Cluster cluster = Cluster.build("localhost").create();
+
+ final Host availableHost = mock(Host.class);
+ final Host unavailableHost = mock(Host.class);
+ when(availableHost.isAvailable()).thenReturn(true);
+ when(unavailableHost.isAvailable()).thenReturn(false);
+
+ // Simulate both hosts having been registered with the strategy
without standing up connection pools.
+ cluster.loadBalancingStrategy().onNew(unavailableHost);
+ cluster.loadBalancingStrategy().onNew(availableHost);
+
+ final Client.PinnedClient pinnedClient = new Client.PinnedClient(new
Client.ClusteredClient(cluster));
+
+ assertEquals(availableHost, pinnedClient.getPinnedHost());
+ }
+}