This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch pinned-reuse in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 9eb29e587c4be3b787291ee0fdd8bfa88123a922
Author: Ken Hu <[email protected]>
AuthorDate: Mon Jun 15 12:17:30 2026 -0700

    Share one connection pool across remote transactions in gremlin-driver

    Each Cluster.transact() previously built its own ClusteredClient, which
    stood up a full set of per-host connection pools and then tore them down
    on every commit/rollback. That cost memory and added connection-setup
    latency (TCP/TLS handshake) to every transaction, even though a
    transaction only ever talks to one host.

    Transactions don't need their own pools or a dedicated connection: the
    server binds each request to its transaction via the transaction id, and
    the HTTP model returns a connection to the pool as soon as the response
    completes. So a transaction only needs to pin to a host, not hold a
    connection. This lets all transactions share a single, eagerly created
    ClusteredClient and borrow a connection per request from the pinned
    host's warm pool, giving transaction requests the same connection
    occupancy as normal query traffic.

    While reworking PinnedClient:

    - Host selection now delegates to the cluster's LoadBalancingStrategy so
      transactions honor the configured strategy and skip unavailable hosts
      instead of picking a random (possibly dead) host.
    - closeAsync() no longer tears down pools. The pools belong to the shared
      client and are closed with the Cluster, after open transactions are
      rolled back; closing one transaction must not break its siblings.
Assisted-by: Kiro:claude-opus-4-8 --- .../apache/tinkerpop/gremlin/driver/Client.java | 53 ++++++++++++++------- .../apache/tinkerpop/gremlin/driver/Cluster.java | 22 ++++++++- .../tinkerpop/gremlin/driver/ClientTest.java | 55 ++++++++++++++++++++++ 3 files changed, 111 insertions(+), 19 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 26c24f2da2..18b47997b0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -567,35 +568,54 @@ public abstract class Client implements RequestSubmitter, RequestSubmitterAsync } /** - * A {@link Client} that pins all requests to a single {@link Host}. Used internally by transactions - * to ensure all requests within a transaction go to the same server. + * A {@link Client} that pins all requests of a single transaction to one {@link Host}. Used internally by + * transactions; obtain one indirectly via {@link Cluster#transact()} or {@link Cluster#transact(String)} rather + * than directly. * <p> - * This client is not intended to be used directly — obtain a {@link org.apache.tinkerpop.gremlin.structure.Transaction} - * via {@link Cluster#transact()} or {@link Cluster#transact(String)} instead. + * This is a lightweight routing view that owns no connection pools. 
All transactions share a single + * {@link ClusteredClient} (one per {@link Cluster}) and borrow a connection per request from the pinned host's + * pool, returning it once the response completes. Those pools are closed when the Cluster closes, never by this + * client. */ public static class PinnedClient extends Client { private final ClusteredClient clusteredClient; private final Host pinnedHost; - private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); + // Tracks this view's own closed state for the isClosing() contract; see closeAsync(). + private final AtomicBoolean closed = new AtomicBoolean(false); - PinnedClient(final Cluster cluster) { - super(cluster); - this.pinnedHost = chooseRandomHost(); - this.clusteredClient = cluster.connect(); + PinnedClient(final ClusteredClient clusteredClient) { + super(clusteredClient.getCluster()); + this.clusteredClient = clusteredClient; + this.pinnedHost = choosePinnedHost(); } public Host getPinnedHost() { return pinnedHost; } + /** + * Delegates host selection to the cluster's {@link LoadBalancingStrategy} so transactions honor the configured + * strategy and skip unavailable hosts rather than pinning to a dead one. Falls back to a random host if the + * strategy yields none, mirroring {@link ClusteredClient#chooseConnection(RequestMessage)}. + */ + private Host choosePinnedHost() { + final Iterator<Host> hosts = cluster.loadBalancingStrategy().select(null); + return hosts.hasNext() ? hosts.next() : chooseRandomHost(); + } + @Override protected void initializeImplementation() { - this.clusteredClient.init(); + // Pools are owned and eagerly initialized by the shared client; this idempotent call just ensures they + // exist before the first request is routed. 
+ clusteredClient.init(); initialized = true; } @Override protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { + // Transactions pin to a host, not a connection: each request carries its transaction id, so any connection + // to the pinned host is interchangeable. The borrowed connection returns to the pool as soon as the + // response completes (see Connection.write()). final ConnectionPool pool = clusteredClient.hostConnectionPools.get(pinnedHost); if (pool == null) throw new NoHostAvailableException(); return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); @@ -603,18 +623,17 @@ public abstract class Client implements RequestSubmitter, RequestSubmitterAsync @Override public boolean isClosing() { - return closing.get() != null; + return closed.get(); } /** - * Marks this client as closed. The underlying pool is owned by {@link ClusteredClient} and is not closed here. + * Records the closed state for the {@link #isClosing()} contract. Intentionally closes no connections: the + * shared pools are owned by the transaction {@link ClusteredClient} and closed with the {@link Cluster}. 
*/ @Override - public synchronized CompletableFuture<Void> closeAsync() { - if (closing.get() != null) return closing.get(); - - closing.set(clusteredClient.closeAsync()); - return closing.get(); + public CompletableFuture<Void> closeAsync() { + closed.set(true); + return CompletableFuture.completedFuture(null); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index a40e0f6a72..b6936c01fc 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -119,8 +119,9 @@ public final class Cluster { * @param graphOrTraversalSource the graph/traversal source alias, or null to use the server default */ public RemoteTransaction transact(final String graphOrTraversalSource) { - final Client.PinnedClient pinnedClient = new Client.PinnedClient(this); - manager.trackClient(pinnedClient); + // Ensures the shared transaction client and its pools exist before pinning the transaction to a host. + init(); + final Client.PinnedClient pinnedClient = new Client.PinnedClient(manager.transactionClient); return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource); } @@ -911,6 +912,10 @@ public final class Cluster { private final List<WeakReference<Client>> openedClients = new ArrayList<>(); + // Single ClusteredClient shared by all transactions. Not tracked in openedClients: it is Cluster-owned and + // closed explicitly in close(), after the rollbacks that borrow from its pools. 
+ private Client.ClusteredClient transactionClient; + private Manager(final Builder builder) { validateBuilder(builder); @@ -1018,6 +1023,13 @@ public final class Cluster { contactPoints.forEach(address -> { final Host host = add(address); }); + + // All transactions share one ClusteredClient rather than each transact() standing up (and tearing down on + // commit/rollback) its own per-host pools. Sharing is safe because transactions borrow a connection per + // request and return it immediately, so pool occupancy matches normal query traffic. Eager for simplicity; + // unused connections are reaped by the idle timeout. + transactionClient = new Client.ClusteredClient(Cluster.this); + transactionClient.init(); } void trackClient(final Client client) { @@ -1068,6 +1080,12 @@ public final class Cluster { } } + // Closed here, after the rollbacks above that borrow from its pools (it is intentionally not in + // openedClients). + if (transactionClient != null) { + clientCloseFutures.add(transactionClient.closeAsync()); + } + // when all the clients are fully closed then shutdown the netty event loop. not sure why this needs to // block here, but if it doesn't then factory.shutdown() below doesn't seem to want to ever complete. // ideally, this should all be async, but i guess it wasn't before this change so just going to leave it diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java new file mode 100644 index 0000000000..9e90db7aaa --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link Client} and its implementations. + */ +public class ClientTest { + + /** + * Exercises {@link Client.PinnedClient}'s host selection (the transaction-pinning logic). A transaction must pin + * to a host chosen via the cluster's load-balancing strategy, and must never pin to a host that is currently + * unavailable (it would otherwise route every request to a dead server). The strategy is seeded directly via + * {@code onNew} so selection can be verified without opening any real connections. + */ + @Test + public void shouldPinTransactionToAvailableHostSelectedByStrategy() { + final Cluster cluster = Cluster.build("localhost").create(); + + final Host availableHost = mock(Host.class); + final Host unavailableHost = mock(Host.class); + when(availableHost.isAvailable()).thenReturn(true); + when(unavailableHost.isAvailable()).thenReturn(false); + + // Simulate both hosts having been registered with the strategy without standing up connection pools. 
+ cluster.loadBalancingStrategy().onNew(unavailableHost); + cluster.loadBalancingStrategy().onNew(availableHost); + + final Client.PinnedClient pinnedClient = new Client.PinnedClient(new Client.ClusteredClient(cluster)); + + assertEquals(availableHost, pinnedClient.getPinnedHost()); + } +}
