This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch pinned-reuse
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 9eb29e587c4be3b787291ee0fdd8bfa88123a922
Author: Ken Hu <[email protected]>
AuthorDate: Mon Jun 15 12:17:30 2026 -0700

    Share one connection pool across remote transactions in gremlin-driver
    
    Each Cluster.transact() previously built its own ClusteredClient, which
    stood up a full set of per-host connection pools and then tore them down
    on every commit/rollback. That cost memory and added connection-setup
    latency (TCP/TLS handshake) to every transaction, even though a
    transaction only ever talks to one host.
    
    Transactions don't need their own pools or a dedicated connection: the
    server binds each request to its transaction via the transaction id, and
    the HTTP model returns a connection to the pool as soon as the response
    completes. So a transaction only needs to pin to a host, not hold a
    connection. This lets all transactions share a single, eagerly created
    ClusteredClient and borrow a connection per request from the pinned
    host's warm pool, giving transaction requests the same connection
    occupancy as normal query traffic.
    
    While reworking PinnedClient:
    - Host selection now delegates to the cluster's LoadBalancingStrategy so
      transactions honor the configured strategy and skip unavailable hosts
      instead of picking a random (possibly dead) host.
    - closeAsync() no longer tears down pools. The pools belong to the shared
      client and are closed with the Cluster, after open transactions are
      rolled back; closing one transaction must not break its siblings.
    
    Assisted-by: Kiro:claude-opus-4-8
---
 .../apache/tinkerpop/gremlin/driver/Client.java    | 53 ++++++++++++++-------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 22 ++++++++-
 .../tinkerpop/gremlin/driver/ClientTest.java       | 55 ++++++++++++++++++++++
 3 files changed, 111 insertions(+), 19 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 26c24f2da2..18b47997b0 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -567,35 +568,54 @@ public abstract class Client implements RequestSubmitter, 
RequestSubmitterAsync
     }
 
     /**
-     * A {@link Client} that pins all requests to a single {@link Host}. Used 
internally by transactions
-     * to ensure all requests within a transaction go to the same server.
+     * A {@link Client} that pins all requests of a single transaction to one 
{@link Host}. Used internally by
+     * transactions; obtain one indirectly via {@link Cluster#transact()} or 
{@link Cluster#transact(String)} rather
+     * than directly.
      * <p>
-     * This client is not intended to be used directly — obtain a {@link 
org.apache.tinkerpop.gremlin.structure.Transaction}
-     * via {@link Cluster#transact()} or {@link Cluster#transact(String)} 
instead.
+     * This is a lightweight routing view that owns no connection pools. All 
transactions share a single
+     * {@link ClusteredClient} (one per {@link Cluster}) and borrow a 
connection per request from the pinned host's
+     * pool, returning it once the response completes. Those pools are closed 
when the Cluster closes, never by this
+     * client.
      */
     public static class PinnedClient extends Client {
         private final ClusteredClient clusteredClient;
         private final Host pinnedHost;
-        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
+        // Tracks this view's own closed state for the isClosing() contract; 
see closeAsync().
+        private final AtomicBoolean closed = new AtomicBoolean(false);
 
-        PinnedClient(final Cluster cluster) {
-            super(cluster);
-            this.pinnedHost = chooseRandomHost();
-            this.clusteredClient = cluster.connect();
+        PinnedClient(final ClusteredClient clusteredClient) {
+            super(clusteredClient.getCluster());
+            this.clusteredClient = clusteredClient;
+            this.pinnedHost = choosePinnedHost();
         }
 
         public Host getPinnedHost() {
             return pinnedHost;
         }
 
+        /**
+         * Delegates host selection to the cluster's {@link 
LoadBalancingStrategy} so transactions honor the configured
+         * strategy and skip unavailable hosts rather than pinning to a dead 
one. Falls back to a random host if the
+         * strategy yields none, mirroring {@link 
ClusteredClient#chooseConnection(RequestMessage)}.
+         */
+        private Host choosePinnedHost() {
+            final Iterator<Host> hosts = 
cluster.loadBalancingStrategy().select(null);
+            return hosts.hasNext() ? hosts.next() : chooseRandomHost();
+        }
+
         @Override
         protected void initializeImplementation() {
-            this.clusteredClient.init();
+            // Pools are owned and eagerly initialized by the shared client; 
this idempotent call just ensures they
+            // exist before the first request is routed.
+            clusteredClient.init();
             initialized = true;
         }
 
         @Override
         protected Connection chooseConnection(final RequestMessage msg) throws 
TimeoutException, ConnectionException {
+            // Transactions pin to a host, not a connection: each request 
carries its transaction id, so any connection
+            // to the pinned host is interchangeable. The borrowed connection 
returns to the pool as soon as the
+            // response completes (see Connection.write()).
             final ConnectionPool pool = 
clusteredClient.hostConnectionPools.get(pinnedHost);
             if (pool == null) throw new NoHostAvailableException();
             return 
pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, 
TimeUnit.MILLISECONDS);
@@ -603,18 +623,17 @@ public abstract class Client implements RequestSubmitter, 
RequestSubmitterAsync
 
         @Override
         public boolean isClosing() {
-            return closing.get() != null;
+            return closed.get();
         }
 
         /**
-         * Marks this client as closed. The underlying pool is owned by {@link 
ClusteredClient} and is not closed here.
+         * Records the closed state for the {@link #isClosing()} contract. 
Intentionally closes no connections: the
+         * shared pools are owned by the transaction {@link ClusteredClient} 
and closed with the {@link Cluster}.
          */
         @Override
-        public synchronized CompletableFuture<Void> closeAsync() {
-            if (closing.get() != null) return closing.get();
-
-            closing.set(clusteredClient.closeAsync());
-            return closing.get();
+        public CompletableFuture<Void> closeAsync() {
+            closed.set(true);
+            return CompletableFuture.completedFuture(null);
         }
     }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index a40e0f6a72..b6936c01fc 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -119,8 +119,9 @@ public final class Cluster {
      * @param graphOrTraversalSource the graph/traversal source alias, or null 
to use the server default
      */
     public RemoteTransaction transact(final String graphOrTraversalSource) {
-        final Client.PinnedClient pinnedClient = new Client.PinnedClient(this);
-        manager.trackClient(pinnedClient);
+        // Ensures the shared transaction client and its pools exist before 
pinning the transaction to a host.
+        init();
+        final Client.PinnedClient pinnedClient = new 
Client.PinnedClient(manager.transactionClient);
         return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource);
     }
 
@@ -911,6 +912,10 @@ public final class Cluster {
 
         private final List<WeakReference<Client>> openedClients = new 
ArrayList<>();
 
+        // Single ClusteredClient shared by all transactions. Not tracked in 
openedClients: it is Cluster-owned and
+        // closed explicitly in close(), after the rollbacks that borrow from 
its pools.
+        private Client.ClusteredClient transactionClient;
+
         private Manager(final Builder builder) {
             validateBuilder(builder);
 
@@ -1018,6 +1023,13 @@ public final class Cluster {
             contactPoints.forEach(address -> {
                 final Host host = add(address);
             });
+
+            // All transactions share one ClusteredClient rather than each 
transact() standing up (and tearing down on
+            // commit/rollback) its own per-host pools. Sharing is safe 
because transactions borrow a connection per
+            // request and return it immediately, so pool occupancy matches 
normal query traffic. Eager for simplicity;
+            // unused connections are reaped by the idle timeout.
+            transactionClient = new Client.ClusteredClient(Cluster.this);
+            transactionClient.init();
         }
 
         void trackClient(final Client client) {
@@ -1068,6 +1080,12 @@ public final class Cluster {
                 }
             }
 
+            // Closed here, after the rollbacks above that borrow from its 
pools (it is intentionally not in
+            // openedClients).
+            if (transactionClient != null) {
+                clientCloseFutures.add(transactionClient.closeAsync());
+            }
+
             // when all the clients are fully closed then shutdown the netty 
event loop. not sure why this needs to
             // block here, but if it doesn't then factory.shutdown() below 
doesn't seem to want to ever complete.
             // ideally, this should all be async, but i guess it wasn't before 
this change so just going to leave it
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000000..9e90db7aaa
--- /dev/null
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link Client} and its implementations.
+ */
+public class ClientTest {
+
+    /**
+     * Exercises {@link Client.PinnedClient}'s host selection (the 
transaction-pinning logic). A transaction must pin
+     * to a host chosen via the cluster's load-balancing strategy, and must 
never pin to a host that is currently
+     * unavailable (it would otherwise route every request to a dead server). 
The strategy is seeded directly via
+     * {@code onNew} so selection can be verified without opening any real 
connections.
+     */
+    @Test
+    public void shouldPinTransactionToAvailableHostSelectedByStrategy() {
+        final Cluster cluster = Cluster.build("localhost").create();
+
+        final Host availableHost = mock(Host.class);
+        final Host unavailableHost = mock(Host.class);
+        when(availableHost.isAvailable()).thenReturn(true);
+        when(unavailableHost.isAvailable()).thenReturn(false);
+
+        // Simulate both hosts having been registered with the strategy 
without standing up connection pools.
+        cluster.loadBalancingStrategy().onNew(unavailableHost);
+        cluster.loadBalancingStrategy().onNew(availableHost);
+
+        final Client.PinnedClient pinnedClient = new Client.PinnedClient(new 
Client.ClusteredClient(cluster));
+
+        assertEquals(availableHost, pinnedClient.getPinnedHost());
+    }
+}

Reply via email to