This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-tx-client
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit b521d6084dc5c6679aa7cab65ec20cdeb2e8fec4
Author: Ken Hu <[email protected]>
AuthorDate: Mon Mar 16 17:24:06 2026 -0700

    Add HTTP transaction support to gremlin-driver
    
    Introduce HttpRemoteTransaction and TransactionRemoteConnection to
    manage transaction lifecycle. RequestSubmitter and RequestSubmitterAsync
    interfaces were extracted out of Client to ensure transactions would
    have the same submission interface. Host selection and pooling moved
    out of Client and into Cluster to allow for easier implementation of a
    PinnedClient. Since Clients and Transactions now have a similar
    submission interface, they are both obtained from a Cluster.
    Transactions are meant to be short-lived then closed and Clients remain
    long-lived and preferrably re-used.
---
 CHANGELOG.asciidoc                                 |   2 +
 .../traversal/dsl/graph/GraphTraversalSource.java  |  13 +-
 .../apache/tinkerpop/gremlin/driver/Client.java    | 206 +++----
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 195 +++++--
 .../tinkerpop/gremlin/driver/Connection.java       |   7 -
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |  23 +-
 .../tinkerpop/gremlin/driver/RequestOptions.java   |  31 ++
 .../tinkerpop/gremlin/driver/RequestSubmitter.java |  61 ++
 .../gremlin/driver/RequestSubmitterAsync.java      |  67 +++
 .../driver/handler/HttpGremlinRequestEncoder.java  |   7 +
 .../driver/remote/DriverRemoteConnection.java      |  15 +-
 .../driver/remote/HttpRemoteTransaction.java       | 314 +++++++++++
 .../driver/remote/TransactionRemoteConnection.java | 113 ++++
 .../tinkerpop/gremlin/driver/ClientTest.java       | 159 ++++++
 .../GremlinDriverTransactionIntegrateTest.java     | 617 +++++++++++++++++++++
 15 files changed, 1603 insertions(+), 227 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index fe123a3fae..27b6ebe1cf 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,8 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Added `__contains__` and `keys()` to `Element` in `gremlin-python`.
 * Added `subgraph()` support for `gremlin-python` so that results are stored 
in a detached `Graph` object.
 * Added support for remote transactions to the `gremlin-server` through 
`TransactionManager` and `UnmanagedTransaction`.
+* Added support for transactions to `gremlin-driver` using the new 
`HttpRemoteTransaction`.
+* Modified how connection pooling works in `gremlin-driver` by moving handling 
from `Client` to `Cluster`.
 * Modified grammar to make `discard()` usage more consistent as a filter step 
where it can now be used to chain additional traversal steps and be used 
anonymously.
 * Removed `Meta` field from `ResponseResult` struct in `gremlin-go`.
 * Removed deprecated elements of the Java-based process testing suite: 
`ProcessStandardSuite`, `ProcessComputerSuite`, `ProcessLimitedSuite` and 
associated tests.
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
index b1d417ee02..684d9e8609 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
@@ -703,14 +703,19 @@ public class GraphTraversalSource implements 
TraversalSource {
 
     /**
      * Proxies calls through to the underlying {@link Graph#tx()} or to the 
{@link RemoteConnection#tx()}.
+     * <p>
+     * When a remote connection is present, this method delegates to the 
connection's
+     * {@link RemoteConnection#tx()} method, which returns an appropriate 
transaction
+     * implementation for the remote connection type (e.g., {@code 
HttpRemoteTransaction}
+     * for HTTP-based connections).
+     *
+     * @return A {@link Transaction} for managing transactional operations
      */
     public Transaction tx() {
         if (null == this.connection)
             return this.graph.tx();
-        else {
-            throw new UnsupportedOperationException("TinkerPop 4 does not yet 
support remote transactions");
-        }
-
+        else
+            return this.connection.tx();
     }
 
     /**
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 95af4270f5..0a5f518ac3 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -18,8 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -27,22 +25,10 @@ import 
org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLException;
-import java.net.ConnectException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 /**
  * A {@code Client} is constructed from a {@link Cluster} and represents a way 
to send messages to Gremlin Server.
@@ -53,7 +39,7 @@ import java.util.stream.Collectors;
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public abstract class Client {
+public abstract class Client implements RequestSubmitter, 
RequestSubmitterAsync {
 
     private static final Logger logger = LoggerFactory.getLogger(Client.class);
     public static final String TOO_MANY_IN_FLIGHT_REQUESTS = "Number of active 
requests (%s) exceeds pool size (%s). " +
@@ -62,8 +48,6 @@ public abstract class Client {
     protected final Cluster cluster;
     protected volatile boolean initialized;
 
-    private static final Random random = new Random();
-
     Client(final Cluster cluster) {
         this.cluster = cluster;
     }
@@ -83,9 +67,10 @@ public abstract class Client {
     protected abstract void initializeImplementation();
 
     /**
-     * Chooses a {@link Connection} to write the message to.
+     * Selects the {@link Host} to send the request to. Implementations define 
the selection strategy
+     * (load-balanced vs pinned).
      */
-    protected abstract Connection chooseConnection(final RequestMessage msg) 
throws TimeoutException, ConnectionException;
+    protected abstract Host selectHost(final RequestMessage msg);
 
     /**
      * Asynchronous close of the {@code Client}.
@@ -239,6 +224,7 @@ public abstract class Client {
         options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
         options.getMaterializeProperties().ifPresent(mp -> 
request.addMaterializeProperties(mp));
         options.getBulkResults().ifPresent(bulked -> 
request.addBulkResults(Boolean.parseBoolean(bulked)));
+        options.getTransactionId().ifPresent(transactionId -> 
request.addTransactionId(transactionId));
 
         return submitAsync(request.create());
     }
@@ -255,9 +241,7 @@ public abstract class Client {
         final CompletableFuture<ResultSet> future = new CompletableFuture<>();
         Connection connection = null;
         try {
-            // the connection is returned to the pool once the response has 
been completed...see Connection.write()
-            // the connection may be returned to the pool with the host being 
marked as "unavailable"
-            connection = chooseConnection(msg);
+            connection = cluster.borrowConnection(selectHost(msg));
             connection.write(msg, future);
             return future;
         } catch (RuntimeException re) {
@@ -292,9 +276,7 @@ public abstract class Client {
      */
     public final static class ClusteredClient extends Client {
 
-        final ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new 
ConcurrentHashMap<>();
         private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
-        private Throwable initializationFailure = null;
 
         ClusteredClient(final Cluster cluster) {
             super(cluster);
@@ -344,137 +326,32 @@ public abstract class Client {
         }
 
         /**
-         * Uses a {@link LoadBalancingStrategy} to choose the best {@link 
Host} and then selects the best connection
-         * from that host's connection pool.
+         * Uses a {@link LoadBalancingStrategy} to choose the best {@link 
Host}.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws 
TimeoutException, ConnectionException {
+        protected Host selectHost(final RequestMessage msg) {
             final Iterator<Host> possibleHosts = 
this.cluster.loadBalancingStrategy().select(msg);
-
-            // try a random host if none are marked available. maybe it will 
reconnect in the meantime. better than
-            // going straight to a fast NoHostAvailableException as was the 
case in versions 3.5.4 and earlier
-            final Host bestHost = possibleHosts.hasNext() ? 
possibleHosts.next() : chooseRandomHost();
-            final ConnectionPool pool = hostConnectionPools.get(bestHost);
-            return 
pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, 
TimeUnit.MILLISECONDS);
-        }
-
-        private Host chooseRandomHost() {
-            final List<Host> hosts = new ArrayList<>(cluster.allHosts());
-            final int ix = random.nextInt(hosts.size());
-            return hosts.get(ix);
+            return possibleHosts.hasNext() ? possibleHosts.next() : 
cluster.randomHost();
         }
 
         /**
-         * Initializes the connection pools on all hosts.
+         * No-op — connection pools are owned and closed by the {@link 
Cluster}.
          */
         @Override
         protected void initializeImplementation() {
-            try {
-                CompletableFuture.allOf(cluster.allHosts().stream()
-                                .map(host -> CompletableFuture.runAsync(
-                                        () -> 
initializeConnectionSetupForHost.accept(host), cluster.hostScheduler()))
-                                .toArray(CompletableFuture[]::new))
-                        .join();
-            } catch (CompletionException ex) {
-                logger.error("Initialization failed", ex);
-                this.initializationFailure = ex;
-            }
-
-            // throw an error if there is no host available after initializing 
connection pool. we used
-            // to test cluster.availableHosts().isEmpty() but checking if we 
actually have hosts in
-            // the connection pool seems a bit more fireproof. if we look at 
initializeConnectionSetupForHost
-            // we can see that a successful initialization of the 
host/connection pool pair is followed by
-            // marking the host available and notifying the load balancing 
strategy. by relying directly on
-            // the state of hostConnectionPools we ensure that there is 
actually a concrete
-            // host/connection pool pair. even if the connection pool has 
immediate problems, it can fallback
-            // to its normal reconnection operation and won't put 
chooseConnection in a state where it can
-            // get a NPE if hostConnectionPools ends up being empty. it seems 
as if the safest minimum
-            // requirement for leaving this method is to ensure that at least 
one ConnectionPool constructor
-            // completed for at least one Host.
-            if (hostConnectionPools.isEmpty()) {
-                throwNoHostAvailableException();
-            }
-
-            // try to re-initiate any unavailable hosts in the background.
-            final List<Host> unavailableHosts = cluster.allHosts()
-                    .stream().filter(host -> 
!host.isAvailable()).collect(Collectors.toList());
-            if (!unavailableHosts.isEmpty()) {
-                handleUnavailableHosts(unavailableHosts);
-            }
-        }
-
-        private void throwNoHostAvailableException() {
-            final Throwable rootCause = 
ExceptionUtils.getRootCause(initializationFailure);
-            // allow the certain exceptions to propagate as a cause
-            if (rootCause instanceof SSLException || rootCause instanceof 
ConnectException) {
-                throw new NoHostAvailableException(initializationFailure);
-            } else {
-                throw new NoHostAvailableException();
-            }
+            // pools are initialized by Cluster.Manager.init()
         }
 
         /**
-         * Closes all the connection pools on all hosts.
+         * Marks this client as closed. Connection pools are managed by the 
{@link Cluster} and are not closed here.
          */
         @Override
         public synchronized CompletableFuture<Void> closeAsync() {
-            if (closing.get() != null)
-                return closing.get();
-
-            final CompletableFuture<Void> allPoolsClosedFuture =
-                    
CompletableFuture.allOf(hostConnectionPools.values().stream()
-                            .map(ConnectionPool::closeAsync)
-                            .toArray(CompletableFuture[]::new));
-
-            closing.set(allPoolsClosedFuture);
+            if (closing.get() != null) return closing.get();
+            closing.set(CompletableFuture.completedFuture(null));
             return closing.get();
         }
 
-        private final Consumer<Host> initializeConnectionSetupForHost = host 
-> {
-            try {
-                // hosts that don't initialize connection pools will come up 
as a dead host.
-                hostConnectionPools.put(host, new ConnectionPool(host, 
ClusteredClient.this));
-
-                // hosts are not marked as available at cluster initialization 
and are made available here instead.
-                host.makeAvailable();
-
-                // added a new host to the cluster so let the load-balancer 
know.
-                
ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
-            } catch (RuntimeException ex) {
-                final String errMsg = "Could not initialize client for " + 
host;
-                logger.error(errMsg);
-                throw ex;
-            }
-        };
-
-        private void handleUnavailableHosts(final List<Host> unavailableHosts) 
{
-            // start the re-initialization attempt for each of the unavailable 
hosts through Host.makeUnavailable().
-            for (Host host : unavailableHosts) {
-                final CompletableFuture<Void> f = CompletableFuture.runAsync(
-                        () -> host.makeUnavailable(this::tryReInitializeHost), 
cluster.hostScheduler());
-                f.exceptionally(t -> {
-                    logger.error("", (t.getCause() == null) ? t : 
t.getCause());
-                    return null;
-                });
-            }
-        }
-
-        /**
-         * Attempt to re-initialize the {@link Host} that was previously 
marked as unavailable.  This method gets called
-         * as part of a schedule in {@link Host} to periodically try to 
re-initialize.
-         */
-        public boolean tryReInitializeHost(final Host host) {
-            logger.debug("Trying to re-initiate host connection pool on {}", 
host);
-
-            try {
-                initializeConnectionSetupForHost.accept(host);
-                return true;
-            } catch (Exception ex) {
-                logger.debug("Failed re-initialization attempt on {}", host, 
ex);
-                return false;
-            }
-        }
-
     }
 
     /**
@@ -530,12 +407,12 @@ public abstract class Client {
         }
 
         /**
-         * Delegates to the underlying {@link Client.ClusteredClient}.
+         * Delegates host selection to the underlying {@link 
Client.ClusteredClient}.
          */
         @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws 
TimeoutException, ConnectionException {
+        protected Host selectHost(final RequestMessage msg) {
             if (close.isDone()) throw new IllegalStateException("Client is 
closed");
-            return client.chooseConnection(msg);
+            return client.selectHost(msg);
         }
 
         @Override
@@ -561,5 +438,54 @@ public abstract class Client {
             if (close.isDone()) throw new IllegalStateException("Client is 
closed");
             return new AliasClusteredClient(client, graphOrTraversalSource);
         }
+
     }
+
+    /**
+     * A {@link Client} that pins all requests to a single {@link Host}. Used 
internally by transactions
+     * to ensure all requests within a transaction go to the same server.
+     * <p>
+     * This client is not intended to be used directly — obtain a {@link 
org.apache.tinkerpop.gremlin.structure.Transaction}
+     * via {@link Cluster#transact()} or {@link Cluster#transact(String)} 
instead.
+     */
+    public static class PinnedClient extends Client {
+
+        private final Host pinnedHost;
+        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
+
+        PinnedClient(final Cluster cluster, final Host pinnedHost) {
+            super(cluster);
+            this.pinnedHost = pinnedHost;
+        }
+
+        public Host getPinnedHost() {
+            return pinnedHost;
+        }
+
+        @Override
+        protected void initializeImplementation() {
+            initialized = true; // PinnedClient only borrows resources so it's 
technically always initialized
+        }
+
+        @Override
+        protected Host selectHost(final RequestMessage msg) {
+            return pinnedHost;
+        }
+
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
+        /**
+         * Marks this client as closed. The underlying pool is owned by {@link 
Cluster} and is not closed here.
+         */
+        @Override
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null) return closing.get();
+            closing.set(CompletableFuture.completedFuture(null));
+            return closing.get();
+        }
+    }
+
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index e080da8d51..0e52b2d423 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -31,7 +31,11 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.driver.auth.Auth;
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import 
org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor;
+import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
@@ -46,7 +50,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -60,15 +63,20 @@ import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -81,6 +89,7 @@ import java.util.stream.Collectors;
 public final class Cluster {
     public static final String SERIALIZER_INTERCEPTOR_NAME = "serializer";
     private static final Logger logger = 
LoggerFactory.getLogger(Cluster.class);
+    private static final Random random = new Random();
 
     private final Manager manager;
 
@@ -94,49 +103,42 @@ public final class Cluster {
     }
 
     /**
-     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
-     * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
-     * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
-     * single request.  The transactions are managed by the user and must be 
committed or rolled-back manually.
-     * <p/>
-     * Note that calling this method does not imply that a connection is made 
to the server itself at this point.
-     * Therefore, if there is only one server specified in the {@code Cluster} 
and that server is not available an
-     * error will not be raised at this point.  Connections get initialized in 
the {@link Client} when a request is
-     * submitted or can be directly initialized via {@link Client#init()}.
-     *
-     * @param sessionId user supplied id for the session which should be 
unique (a UUID is ideal).
+     * Creates a new {@link Client} based on the settings provided.
      */
-    public <T extends Client> T connect(final String sessionId) {
-        throw new UnsupportedOperationException("not implemented");
+    public <T extends Client> T connect() {
+        final Client client = new Client.ClusteredClient(this);
+        return (T) client;
     }
 
     /**
-     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
-     * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
-     * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
-     * single request.  If {@code manageTransactions} is set to {@code false} 
then transactions are managed by the
-     * user and must be committed or rolled-back manually. When set to {@code 
true} the transaction is committed or
-     * rolled-back at the end of each request.
-     * <p/>
-     * Note that calling this method does not imply that a connection is made 
to the server itself at this point.
-     * Therefore, if there is only one server specified in the {@code Cluster} 
and that server is not available an
-     * error will not be raised at this point.  Connections get initialized in 
the {@link Client} when a request is
-     * submitted or can be directly initialized via {@link Client#init()}.
+     * Creates a new {@link Transaction} using the server's default traversal 
source.
+     * The server will bind to "g" by default when no traversal source is 
specified.
+     */
+    public RemoteTransaction transact() {
+        return transact(null);
+    }
+
+    /**
+     * Creates a new {@link Transaction} bound to the specified graph or 
traversal source.
      *
-     * @param sessionId user supplied id for the session which should be 
unique (a UUID is ideal).
-     * @param manageTransactions enables auto-transactions when set to true
+     * @param graphOrTraversalSource the graph/traversal source alias, or null 
to use the server default
      */
-    public <T extends Client> T connect(final String sessionId, final boolean 
manageTransactions) {
-        throw new UnsupportedOperationException("not implemented");
+    public RemoteTransaction transact(final String graphOrTraversalSource) {
+        init();
+        final Host host = randomHost();
+        final Client.PinnedClient pinnedClient = new Client.PinnedClient(this, 
host);
+        return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource);
     }
 
     /**
-     * Creates a new {@link Client} based on the settings provided.
+     * Selects a random host from all known hosts.
+     * @return the randomly chosen Host.
      */
-    public <T extends Client> T connect() {
-        final Client client = new Client.ClusteredClient(this);
-        manager.trackClient(client);
-        return (T) client;
+    Host randomHost() {
+        final List<Host> all = new ArrayList<>(allHosts());
+        if (all.isEmpty()) throw new NoHostAvailableException();
+
+        return all.get(random.nextInt(all.size()));
     }
 
     @Override
@@ -365,6 +367,32 @@ public final class Cluster {
         return Collections.unmodifiableCollection(manager.allHosts());
     }
 
+    /**
+     * Returns the {@link ConnectionPool} for the given {@link Host}, or null 
if not found.
+     */
+    ConnectionPool getPoolFor(final Host host) {
+        return manager.getPoolFor(host);
+    }
+
+    public void trackTransaction(final HttpRemoteTransaction tx) {
+        manager.trackTransaction(tx);
+    }
+
+    public void untrackTransaction(final HttpRemoteTransaction tx) {
+        manager.untrackTransaction(tx);
+    }
+
+    /**
+     * Borrows a {@link Connection} from the pool for the given {@link Host}.
+     * Throws {@link IllegalStateException} if the cluster is closing.
+     */
+    Connection borrowConnection(final Host host) throws TimeoutException, 
ConnectionException {
+        if (isClosing()) throw new IllegalStateException("Cannot borrow a 
connection - cluster is closing");
+        final ConnectionPool pool = manager.getPoolFor(host);
+        if (pool == null) throw new NoHostAvailableException();
+        return 
pool.borrowConnection(connectionPoolSettings().maxWaitForConnection, 
TimeUnit.MILLISECONDS);
+    }
+
     Factory getFactory() {
         return manager.factory;
     }
@@ -393,7 +421,7 @@ public final class Cluster {
         return manager.connectionPoolSettings;
     }
 
-    LoadBalancingStrategy loadBalancingStrategy() {
+    public LoadBalancingStrategy loadBalancingStrategy() {
         return manager.loadBalancingStrategy;
     }
 
@@ -936,6 +964,8 @@ public final class Cluster {
 
     class Manager {
         private final ConcurrentMap<InetSocketAddress, Host> hosts = new 
ConcurrentHashMap<>();
+        private final ConcurrentMap<Host, ConnectionPool> hostConnectionPools 
= new ConcurrentHashMap<>();
+        private final Set<HttpRemoteTransaction> openTransactions = 
ConcurrentHashMap.newKeySet();
         private boolean initialized;
         private final List<InetSocketAddress> contactPoints;
         private final Factory factory;
@@ -970,8 +1000,6 @@ public final class Cluster {
 
         private final AtomicReference<CompletableFuture<Void>> closeFuture = 
new AtomicReference<>();
 
-        private final List<WeakReference<Client>> openedClients = new 
ArrayList<>();
-
         private Manager(final Builder builder) {
             validateBuilder(builder);
 
@@ -1075,10 +1103,72 @@ public final class Cluster {
             contactPoints.forEach(address -> {
                 final Host host = add(address);
             });
+
+            // initialize connection pools for all known hosts
+            try {
+                CompletableFuture.allOf(hosts.values().stream()
+                        .map(host -> CompletableFuture.runAsync(
+                                () -> initializeConnectionSetupForHost(host), 
hostScheduler))
+                        .toArray(CompletableFuture[]::new))
+                        .join();
+            } catch (CompletionException ex) {
+                logger.error("Cluster pool initialization failed", ex);
+            }
+
+            if (hostConnectionPools.isEmpty()) {
+                throw new NoHostAvailableException();
+            }
+
+            final List<Host> unavailableHosts = hosts.values().stream()
+                    .filter(host -> 
!host.isAvailable()).collect(Collectors.toList());
+            if (!unavailableHosts.isEmpty()) {
+                handleUnavailableHosts(unavailableHosts);
+            }
+        }
+
+        void initializeConnectionSetupForHost(final Host host) {
+            try {
+                hostConnectionPools.put(host, new ConnectionPool(host, 
Cluster.this));
+                host.makeAvailable();
+                loadBalancingStrategy.onNew(host);
+            } catch (RuntimeException ex) {
+                logger.error("Could not initialize connection pool for {}", 
host);
+                throw ex;
+            }
         }
 
-        void trackClient(final Client client) {
-            openedClients.add(new WeakReference<>(client));
+        void handleUnavailableHosts(final List<Host> unavailableHosts) {
+            for (Host host : unavailableHosts) {
+                final CompletableFuture<Void> f = CompletableFuture.runAsync(
+                        () -> host.makeUnavailable(this::tryReInitializeHost), 
hostScheduler);
+                f.exceptionally(t -> {
+                    logger.error("", (t.getCause() == null) ? t : 
t.getCause());
+                    return null;
+                });
+            }
+        }
+
+        public boolean tryReInitializeHost(final Host host) {
+            logger.debug("Trying to re-initiate host connection pool on {}", 
host);
+            try {
+                initializeConnectionSetupForHost(host);
+                return true;
+            } catch (Exception ex) {
+                logger.debug("Failed re-initialization attempt on {}", host, 
ex);
+                return false;
+            }
+        }
+
+        ConnectionPool getPoolFor(final Host host) {
+            return hostConnectionPools.get(host);
+        }
+
+        void trackTransaction(final HttpRemoteTransaction tx) {
+            openTransactions.add(tx);
+        }
+
+        void untrackTransaction(final HttpRemoteTransaction tx) {
+            openTransactions.remove(tx);
         }
 
         public Host add(final InetSocketAddress address) {
@@ -1096,25 +1186,22 @@ public final class Cluster {
             if (closeFuture.get() != null)
                 return closeFuture.get();
 
-            final List<CompletableFuture<Void>> clientCloseFutures = new 
ArrayList<>(openedClients.size());
-            for (WeakReference<Client> openedClient : openedClients) {
-                final Client client = openedClient.get();
-                if (client != null) {
-                    // best to call close() even if the Client is already 
closing so that we can be sure that
-                    // any background client closing operations are included 
in this shutdown future
-                    clientCloseFutures.add(client.closeAsync());
+            // best-effort rollback of any open transactions before closing 
pools snapshot to avoid concurrent
+            // modification since rollback() calls untrackTransaction()
+            new ArrayList<>(openTransactions).forEach(tx -> {
+                try {
+                    tx.rollback();
+                } catch (Exception e) {
+                    logger.warn("Failed to rollback transaction on cluster 
close", e);
                 }
-            }
+            });
+            openTransactions.clear();
 
-            // when all the clients are fully closed then shutdown the netty 
event loop. not sure why this needs to
-            // block here, but if it doesn't then factory.shutdown() below 
doesn't seem to want to ever complete.
-            // ideally, this should all be async, but i guess it wasn't before 
this change so just going to leave it
-            // for now as this really isn't the focus on this change
-            CompletableFuture.allOf(clientCloseFutures.toArray(new 
CompletableFuture[0])).join();
+            // close all connection pools owned by the cluster
+            CompletableFuture.allOf(hostConnectionPools.values().stream().
+                    
map(ConnectionPool::closeAsync).toArray(CompletableFuture[]::new)).join();
 
             final CompletableFuture<Void> closeIt = new CompletableFuture<>();
-            // shutdown the event loop. that shutdown can trigger some final 
jobs to get scheduled so add a listener
-            // to the termination event to shutdown remaining thread pools
             factory.shutdown().awaitUninterruptibly().addListener(f -> {
                 executor.shutdown();
                 hostScheduler.shutdown();
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index cb94b39b87..17b1beef65 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -57,7 +57,6 @@ final class Connection {
     private final URI uri;
     private final AtomicReference<ResultSet> pending = new AtomicReference<>();
     private final Cluster cluster;
-    private final Client client;
     private final ConnectionPool pool;
     private final String creatingThread;
     private final String createdTimestamp;
@@ -81,7 +80,6 @@ final class Connection {
     public Connection(final URI uri, final ConnectionPool pool) throws 
ConnectionException {
         this.uri = uri;
         this.cluster = pool.getCluster();
-        this.client = pool.getClient();
         this.pool = pool;
         this.creatingThread = Thread.currentThread().getName();
         this.createdTimestamp = Instant.now().toString();
@@ -90,9 +88,6 @@ final class Connection {
         if (cluster.isClosing())
             throw new IllegalStateException("Cannot open a connection with the 
cluster after close() is called");
 
-        if (client.isClosing())
-            throw new IllegalStateException("Cannot open a connection with the 
client after close() is called");
-
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
         try {
             channelizer = new Channelizer.HttpChannelizer();
@@ -111,8 +106,6 @@ final class Connection {
                 // if the closeFuture is not set, it means that closeAsync() 
wasn't called
                 if (thisConnection.closeFuture.get() == null) {
                     if 
(!channel.hasAttr(IdleConnectionHandler.IDLE_STATE_EVENT)) {
-                        // if idle state event is not present, it means the 
server closed the channel for some reason.
-                        // it's important to distinguish that difference in 
debugging
                         logger.error(String.format(
                                 "Server closed the Connection on channel %s - 
scheduling removal from %s",
                                 channel.id().asShortText(), 
thisConnection.pool.getPoolInfo(thisConnection)));
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee4c34c5aa..3581e1db1a 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -57,7 +57,6 @@ final class ConnectionPool {
 
     public final Host host;
     private final Cluster cluster;
-    private final Client client;
     private final List<Connection> connections;
     private final AtomicInteger open;
     private final Queue<Connection> availableConnections = new 
ConcurrentLinkedQueue<>();
@@ -90,18 +89,17 @@ final class ConnectionPool {
         public void setTimeNow() { timeOfConnectionAttempt = 
System.currentTimeMillis(); }
     }
 
-    public ConnectionPool(final Host host, final Client client) {
-        this(host, client, Optional.empty());
+    public ConnectionPool(final Host host, final Cluster cluster) {
+        this(host, cluster, Optional.empty());
     }
 
-    public ConnectionPool(final Host host, final Client client, final 
Optional<Integer> overrideMaxPoolSize) {
-        this(host, client, overrideMaxPoolSize, new 
ConnectionFactory.DefaultConnectionFactory());
+    public ConnectionPool(final Host host, final Cluster cluster, final 
Optional<Integer> overrideMaxPoolSize) {
+        this(host, cluster, overrideMaxPoolSize, new 
ConnectionFactory.DefaultConnectionFactory());
     }
 
-    ConnectionPool(final Host host, final Client client, final 
Optional<Integer> overrideMaxPoolSize, final ConnectionFactory 
connectionFactory) {
+    ConnectionPool(final Host host, final Cluster cluster, final 
Optional<Integer> overrideMaxPoolSize, final ConnectionFactory 
connectionFactory) {
         this.host = host;
-        this.client = client;
-        this.cluster = client.cluster;
+        this.cluster = cluster;
         this.connectionFactory = connectionFactory;
         poolLabel = "Connection Pool {host=" + host + "}";
 
@@ -207,10 +205,6 @@ final class ConnectionPool {
         }
     }
 
-    Client getClient() {
-        return client;
-    }
-
     Cluster getCluster() {
         return cluster;
     }
@@ -415,7 +409,8 @@ final class ConnectionPool {
             // Assumes that the root cause will give better information about 
why the connection failed.
             
cause.append(ExceptionHelper.getRootCause(res.getFailureCause()).getMessage());
         } else if (open.get() >= maxPoolSize) {
-            cause.append(String.format(Client.TOO_MANY_IN_FLIGHT_REQUESTS, 
open.get(), maxPoolSize));
+            cause.append(String.format("Number of active requests (%s) exceeds 
pool size (%s). " +
+                    "Consider increasing the value for 
maxConnectionPoolSize.", open.get(), maxPoolSize));
         } else {
             cause.setLength(0);
         }
@@ -479,7 +474,7 @@ final class ConnectionPool {
             // pool needs it. for now that seems like an unnecessary added bit 
of complexity for dealing with this
             // error state
             connection = connectionFactory.create(this);
-            final RequestMessage ping = 
client.buildMessage(cluster.validationRequest()).create();
+            final RequestMessage ping = cluster.validationRequest().create();
             final CompletableFuture<ResultSet> f = new CompletableFuture<>();
             connection.write(ping, f);
             f.get().all().get();
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
index a43e186164..c51f2b8516 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
@@ -50,6 +50,7 @@ public final class RequestOptions {
     private final String language;
     private final String materializeProperties;
     private final String bulkResults;
+    private final String transactionId;
 
     private RequestOptions(final Builder builder) {
         this.graphOrTraversalSource = builder.graphOrTraversalSource;
@@ -59,6 +60,7 @@ public final class RequestOptions {
         this.language = builder.language;
         this.materializeProperties = builder.materializeProperties;
         this.bulkResults = builder.bulkResults;
+        this.transactionId = builder.transactionId;
     }
 
     public Optional<String> getG() {
@@ -85,6 +87,8 @@ public final class RequestOptions {
 
     public Optional<String> getBulkResults() { return 
Optional.ofNullable(bulkResults); }
 
+    public Optional<String> getTransactionId() { return 
Optional.ofNullable(transactionId); }
+
     public static Builder build() {
         return new Builder();
     }
@@ -125,6 +129,25 @@ public final class RequestOptions {
         private String materializeProperties = null;
         private String language = null;
         private String bulkResults = null;
+        private String transactionId = null;
+
+        /**
+         * Creates a {@link Builder} populated with the values from the 
provided {@link RequestOptions}.
+         * @param options the options to copy from
+         * @return a {@link Builder} with the copied options
+         */
+        public static Builder from(final RequestOptions options) {
+            final Builder builder = build();
+            builder.graphOrTraversalSource = options.graphOrTraversalSource;
+            builder.parameters = options.parameters;
+            builder.batchSize = options.batchSize;
+            builder.timeout = options.timeout;
+            builder.materializeProperties = options.materializeProperties;
+            builder.language = options.language;
+            builder.bulkResults = options.bulkResults;
+            builder.transactionId = options.transactionId;
+            return builder;
+        }
 
         /**
          * The aliases to set on the request.
@@ -196,6 +219,14 @@ public final class RequestOptions {
             return this;
         }
 
+        /**
+         * Sets the transactionId value to be sent on the request.
+         */
+        public Builder transactionId(final String id) {
+            this.transactionId = id;
+            return this;
+        }
+
         public RequestOptions create() {
             return new RequestOptions(this);
         }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
new file mode 100644
index 0000000000..3afc8e099d
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import java.util.Map;
+
+/**
+ * Defines the synchronous request submission contract for Gremlin requests.
+ * <p>
+ * This interface is implemented by both {@link Client} and transaction 
classes to ensure a consistent API for
+ * submitting Gremlin scripts. The synchronous nature of these methods means 
they block until the request completes.
+ * <p>
+ * For asynchronous submission, see {@link RequestSubmitterAsync}.
+ */
+public interface RequestSubmitter {
+
+    /**
+     * Submits a Gremlin script and blocks until the response is received.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin);
+
+    /**
+     * Submits a Gremlin script with bound parameters and blocks until the 
response is received.
+     * <p>
+     * Prefer this method over string concatenation when executing scripts 
with variable
+     * arguments, as parameterized scripts perform better.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param parameters a map of parameters that will be bound to the script 
on execution
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin, Map<String, Object> parameters);
+
+    /**
+     * Submits a Gremlin script with request options and blocks until the 
response is received.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param options the options to supply for this request
+     * @return the results of the script execution
+     */
+    ResultSet submit(String gremlin, RequestOptions options);
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
new file mode 100644
index 0000000000..c94ed5bb72
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Defines the asynchronous request submission contract for Gremlin requests.
+ * <p>
+ * This interface is implemented by {@link Client} to provide non-blocking 
request submission. The returned
+ * {@link CompletableFuture} completes when the write of the request is 
complete, not when the response is received.
+ * <p>
+ * Note: Transaction classes intentionally do not implement this interface 
because transactional operations require
+ * sequential execution to maintain ordering guarantees over HTTP.
+ * <p>
+ * For synchronous submission, see {@link RequestSubmitter}.
+ */
+public interface RequestSubmitterAsync {
+
+    /**
+     * Submits a Gremlin script asynchronously.
+     * <p>
+     * The returned future completes when the write of the request is complete.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin);
+
+    /**
+     * Submits a Gremlin script with bound parameters asynchronously.
+     * <p>
+     * Prefer this method over string concatenation when executing scripts 
with variable
+     * arguments, as parameterized scripts perform better.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param parameters a map of parameters that will be bound to the script 
on execution
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin, Map<String, 
Object> parameters);
+
+    /**
+     * Submits a Gremlin script with request options asynchronously.
+     *
+     * @param gremlin the Gremlin script to execute
+     * @param options the options to supply for this request
+     * @return a future that completes with the results when the request write 
is complete
+     */
+    CompletableFuture<ResultSet> submitAsync(String gremlin, RequestOptions 
options);
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index c94d74264d..2b45be91ce 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -91,6 +91,13 @@ public final class HttpGremlinRequestEncoder extends 
MessageToMessageEncoder<Req
             if (bulkResults) {
                 headersMap.put(Tokens.BULK_RESULTS, "true");
             }
+            
+            // Add X-Transaction-Id header to comply with specification's dual 
transmission (header and body)
+            final String transactionId = 
requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
+            if (transactionId != null) {
+                headersMap.put("X-Transaction-Id", transactionId);
+            }
+            
             HttpRequest gremlinRequest = new HttpRequest(headersMap, 
requestMessage, uri);
 
             for (final Pair<String, ? extends RequestInterceptor> interceptor 
: interceptors) {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index c8d56814c4..c909e48604 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -248,15 +248,14 @@ public class DriverRemoteConnection implements 
RemoteConnection {
     }
 
     /**
-     * Constructs a new {@link DriverRemoteTransaction}. Not yet supported in 
TinkerPop 4.
+     * Creates a new {@link HttpRemoteTransaction} for executing transactional 
operations.
+     *
+     * @return A new {@link HttpRemoteTransaction}
      */
-//    @Override
-//    public Transaction tx() {
-//        // todo: not implemented
-//        final DriverRemoteConnection session = new DriverRemoteConnection(
-//                client.getCluster().connect(), remoteTraversalSourceName, 
true);
-//        return new DriverRemoteTransaction(session);
-//    }
+    @Override
+    public Transaction tx() {
+        return client.getCluster().transact(remoteTraversalSourceName);
+    }
 
     @Override
     public String toString() {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
new file mode 100644
index 0000000000..9ee3b7c934
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Host;
+import org.apache.tinkerpop.gremlin.driver.RemoteTransaction;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TransactionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A {@link Transaction} implementation for HTTP-based remote connections.
+ * <p>
+ * This class provides synchronous, sequential request execution within a 
transaction context.
+ * All requests are pinned to a single host and include the transaction ID 
(after begin()).
+ * <p>
+ * Key characteristics:
+ * <ul>
+ *   <li>Synchronous API only - no submitAsync() methods</li>
+ *   <li>Host pinning - all requests go to the same server</li>
+ *   <li>Sequential execution - requests block until complete</li>
+ * </ul>
+ * <p>
+ * Usage:
+ * <pre>
+ * Transaction tx = cluster.transact("g");
+ * GraphTraversalSource gtx = tx.begin();
+ * gtx.addV("person").property("name", "alice").iterate();
+ * tx.commit();
+ * </pre>
+ *
+ * This class is <b>NOT</b> thread-safe.
+ */
+public class HttpRemoteTransaction implements RemoteTransaction {
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpRemoteTransaction.class);
+    private static final long CLOSING_MAX_WAIT_MS = 10000;
+
+    protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT;
+    private final Client.PinnedClient pinnedClient;
+    private final Cluster cluster;
+    private final Host pinnedHost;
+    private final String graphAlias;
+    private String transactionId;  // null until begin(), set from server 
response
+    private TransactionState state = TransactionState.NOT_STARTED;
+
+    private enum TransactionState {
+        NOT_STARTED, OPEN, CLOSED
+    }
+
+    /**
+     * Creates a new HTTP transaction.
+     * <p>
+     * The transaction is not started until {@link #begin(Class)} is called.
+     * A host is selected at creation time and all requests will be pinned to 
it.
+     *
+     * @param pinnedClient the underlying client for connection access
+     * @param graphAlias the graph/traversal source alias (e.g., "g")
+     * @throws NoHostAvailableException if no hosts are available in the 
cluster
+     */
+    public HttpRemoteTransaction(final Client.PinnedClient pinnedClient, final 
String graphAlias) {
+        this.pinnedClient = pinnedClient;
+        this.graphAlias = graphAlias;
+        this.pinnedHost = pinnedClient.getPinnedHost();
+        this.cluster = pinnedClient.getCluster();
+    }
+
+    /**
+     * Not supported for remote transactions. Use {@link #begin(Class)} 
instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void open() {
+        begin();
+    }
+
+    /**
+     * Starts a transaction and returns a traversal source bound to it.
+     * <p>
+     * This method sends {@code g.tx().begin()} to the server, which returns
+     * the transaction ID. All subsequent requests will include this ID.
+     *
+     * @param traversalSourceClass the class of the traversal source to create
+     * @param <T> the type of the traversal source
+     * @return a new traversal source bound to this transaction
+     * @throws IllegalStateException if the transaction is already started
+     * @throws RuntimeException if the transaction fails to begin
+     */
+    @Override
+    public <T extends TraversalSource> T begin(final Class<T> 
traversalSourceClass) {
+        if (state != TransactionState.NOT_STARTED) {
+            throw new IllegalStateException("Transaction already started");
+        }
+        cluster.trackTransaction(this);
+
+        try {
+            // Send begin - no txId attached yet
+            final ResultSet rs = submitInternal("g.tx().begin()");
+            
+            // Server returns the transaction ID
+            this.transactionId = extractTransactionId(rs);
+            this.state = TransactionState.OPEN;
+        } catch (Exception e) {
+            cleanUp();
+            throw new RuntimeException("Failed to begin transaction: " + 
e.getMessage(), e);
+        }
+
+        // Create RemoteConnection for the traversal source
+        final TransactionRemoteConnection txConnection = new 
TransactionRemoteConnection(this);
+
+        try {
+            return 
traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(txConnection);
+        } catch (Exception e) {
+            rollback();
+            throw new IllegalStateException("Failed to create 
TraversalSource", e);
+        }
+    }
+
+    /**
+     * Extracts the transaction ID from the begin() response.
+     * <p>
+     * The server returns the transaction ID as part of the response to 
g.tx().begin().
+     *
+     * @param rs the result set from the begin request
+     * @return the transaction ID
+     */
+    private String extractTransactionId(final ResultSet rs) {
+        // Wait for all results and extract the transaction ID
+        final List<Result> results = rs.all().join();
+        if (results.isEmpty()) {
+            throw new IllegalStateException("Server did not return transaction 
ID");
+        }
+        // The transaction ID is returned as the result of g.tx().begin()
+        final Object id = results.get(0).get(Map.class).get("transactionId");
+        if (id == null) throw new IllegalStateException("Server did not return 
transaction ID");
+
+        final String idStr = id.toString();
+        if (idStr.isBlank()) throw new IllegalStateException("Server returned 
empty transaction ID");
+
+        return idStr;
+    }
+
+    /**
+     * Commits the transaction.
+     * <p>
+     * Sends {@code g.tx().commit()} to the server and closes the transaction.
+     *
+     * @throws IllegalStateException if the transaction is not open
+     * @throws RuntimeException if the commit fails
+     */
+    @Override
+    public void commit() {
+        closeRemoteTransaction("g.tx().commit()");
+    }
+
+    /**
+     * Rolls back the transaction.
+     * <p>
+     * Sends {@code g.tx().rollback()} to the server and closes the 
transaction.
+     * This is best-effort - errors are logged but not thrown.
+     */
+    @Override
+    public void rollback() {
+        closeRemoteTransaction("g.tx().rollback()");
+    }
+
+    private void closeRemoteTransaction(final String closeScript) {
+        if (state != TransactionState.OPEN) throw new 
IllegalStateException("Transaction is not open");
+
+        try {
+            submitInternal(closeScript).all().get(CLOSING_MAX_WAIT_MS, 
TimeUnit.MILLISECONDS);
+            cleanUp();
+        } catch (Exception e) {
+            logger.warn("Failed to {} transaction on {}", closeScript, 
pinnedHost);
+            throw new TransactionException("Failed to " + closeScript, e);
+        }
+    }
+
+    private void cleanUp() {
+        state = TransactionState.CLOSED;
+        cluster.untrackTransaction(this);
+    }
+
+    /**
+     * Returns the server-generated transaction ID, or {@code null} if the 
transaction
+     * has not yet been started via {@link #begin(Class)}.
+     *
+     * @return the transaction ID, or null if not yet begun
+     */
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return state == TransactionState.OPEN;
+    }
+
+    @Override
+    public void readWrite() {
+        throw new UnsupportedOperationException("Remote transaction behaviors 
are not configurable - they are always manually controlled");
+    }
+
+    @Override
+    public void close() {
+        closeConsumer.accept(this);
+        
+        // this is just for safety in case of custom closeConsumer but should 
normally be handled by commit/rollback
+        cleanUp();
+    }
+
+    @Override
+    public Transaction onReadWrite(final Consumer<Transaction> consumer) {
+        throw new UnsupportedOperationException("Remote transaction behaviors 
are not configurable - they are always manually controlled");
+    }
+
+    @Override
+    public Transaction onClose(final Consumer<Transaction> consumer) {
+        this.closeConsumer = consumer;
+        return this;
+    }
+
+    @Override
+    public void addTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public void removeTransactionListener(final Consumer<Status> listener) {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public void clearTransactionListeners() {
+        throw new UnsupportedOperationException("Remote transactions cannot 
have listeners attached");
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin) {
+        return submit(gremlin, RequestOptions.EMPTY);
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin, final Map<String, Object> 
parameters) {
+        final RequestOptions.Builder builder = RequestOptions.build();
+        if (parameters != null && !parameters.isEmpty()) {
+            parameters.forEach(builder::addParameter);
+        }
+        return submit(gremlin, builder.create());
+    }
+
+    @Override
+    public ResultSet submit(final String gremlin, final RequestOptions 
options) {
+        if (state != TransactionState.OPEN) {
+            throw new IllegalStateException("Transaction is not open");
+        }
+        return submitInternal(gremlin, options);
+    }
+
+    private ResultSet submitInternal(final String gremlin) {
+        return submitInternal(gremlin, RequestOptions.EMPTY);
+    }
+
+    // synchronized here is a bit defensive but ensures that even if a user 
accidentally uses this in different threads,
+    // the server will still receive the requests in the correct order
+    private synchronized ResultSet submitInternal(final String gremlin, final 
RequestOptions options) {
+        final RequestOptions.Builder builder = 
RequestOptions.Builder.from(options);
+        if (graphAlias != null) {
+            // Don't allow per-request override of "g" as transactions should 
only target a single Graph instance.
+            builder.addG(graphAlias);
+        }
+
+        // Attach txId if we have one (not present for begin())
+        if (transactionId != null) {
+            builder.transactionId(transactionId);
+        }
+
+        try {
+            return pinnedClient.submit(gremlin, builder.create());
+        } catch (Exception e) {
+            throw new RuntimeException("Transaction request failed: " + 
e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
new file mode 100644
index 0000000000..0f9e903dd8
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.GremlinLang;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.tinkerpop.gremlin.driver.RequestOptions.getRequestOptions;
+
+/**
+ * A {@link RemoteConnection} that routes all submissions through an {@link 
HttpRemoteTransaction}.
+ * <p>
+ * This connection adapts the synchronous transaction API to the async {@link 
RemoteConnection}
+ * interface required by {@link 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource}.
+ * <p>
+ * Key characteristics:
+ * <ul>
+ *   <li>Submissions are synchronous internally (via {@link 
HttpRemoteTransaction#submit})</li>
+ *   <li>Returns completed futures to satisfy the async interface</li>
+ *   <li>Transaction ID is automatically attached by the transaction</li>
+ * </ul>
+ */
+class TransactionRemoteConnection implements RemoteConnection {
+
+    private final HttpRemoteTransaction transaction;
+
+    /**
+     * Creates a new connection bound to the specified transaction.
+     *
+     * @param transaction the transaction that owns this connection
+     */
+    TransactionRemoteConnection(final HttpRemoteTransaction transaction) {
+        this.transaction = transaction;
+    }
+
+    /**
+     * Submits a traversal through the transaction.
+     * <p>
+     * The submission is synchronous internally but returns a completed future
+     * to satisfy the {@link RemoteConnection} interface.
+     *
+     * @param gremlinLang the traversal to submit
+     * @param <E> the type of elements returned by the traversal
+     * @return a completed future with the traversal results
+     * @throws RemoteConnectionException if the transaction is not open or 
submission fails
+     */
+    @Override
+    public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final 
GremlinLang gremlinLang)
+            throws RemoteConnectionException {
+        if (!transaction.isOpen()) {
+            throw new RemoteConnectionException("Transaction is not open");
+        }
+
+        try {
+            // Synchronous submission through transaction
+            final ResultSet rs = transaction.submit(gremlinLang.getGremlin(), 
getRequestOptions(gremlinLang));
+
+            final RemoteTraversal<?, E> traversal = new 
DriverRemoteTraversal<>(rs,
+                null,   // client not needed for iteration
+                false,  // attachElements
+                Optional.empty());
+
+            return CompletableFuture.completedFuture(traversal);
+        } catch (Exception e) {
+            throw new RemoteConnectionException(e);
+        }
+    }
+
+    /**
+     * Returns the owning transaction.
+     *
+     * @return the transaction that owns this connection
+     */
+    @Override
+    public Transaction tx() {
+        return transaction;
+    }
+
+    /**
+     * No-op close implementation.
+     * <p>
+     * The transaction manages its own lifecycle - users should call
+     * {@link Transaction#commit()} or {@link Transaction#rollback()} 
explicitly.
+     */
+    @Override
+    public void close() {
+        // Transaction manages its own lifecycle - don't close it here
+    }
+}
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
new file mode 100644
index 0000000000..23443c1b3e
--- /dev/null
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link Client} subclasses verifying the refactored host 
selection
+ * and connection pool ownership model.
+ */
+public class ClientTest {
+
+    /**
+     * ClusteredClient.selectHost() should return the host from the load 
balancing strategy.
+     */
+    @Test
+    public void shouldSelectHostViaLoadBalancingStrategy() {
+        final Host expectedHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        
when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+
+        final Host selected = client.selectHost(msg);
+        assertSame(expectedHost, selected);
+    }
+
+    /**
+     * When the load balancing strategy returns an empty iterator, 
ClusteredClient.selectHost()
+     * should fall back to Cluster.randomHost().
+     */
+    @Test
+    public void shouldFallbackToRandomHostWhenStrategyReturnsEmpty() {
+        final Host fallbackHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        when(strategy.select(any())).thenReturn(Collections.emptyIterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+        when(cluster.randomHost()).thenReturn(fallbackHost);
+
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+
+        final Host selected = client.selectHost(msg);
+        assertSame(fallbackHost, selected);
+    }
+
+    /**
+     * PinnedClient.selectHost() should always return the pinned host 
regardless of the message.
+     */
+    @Test
+    public void shouldAlwaysReturnPinnedHost() {
+        final Host pinnedHost = mock(Host.class);
+        final Cluster cluster = mock(Cluster.class);
+
+        final Client.PinnedClient client = new Client.PinnedClient(cluster, 
pinnedHost);
+
+        final RequestMessage msg1 = RequestMessage.build("g.V()").create();
+        final RequestMessage msg2 = RequestMessage.build("g.E()").create();
+
+        assertSame(pinnedHost, client.selectHost(msg1));
+        assertSame(pinnedHost, client.selectHost(msg2));
+        assertSame(pinnedHost, client.selectHost(null));
+    }
+
+    /**
+     * AliasClusteredClient.selectHost() should delegate to the underlying 
client's selectHost().
+     */
+    @Test
+    public void shouldDelegateSelectHostInAliasClient() {
+        final Host expectedHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        
when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+
+        final Client.ClusteredClient underlying = new 
Client.ClusteredClient(cluster);
+        final Client.AliasClusteredClient aliasClient = new 
Client.AliasClusteredClient(underlying, "g");
+
+        final RequestMessage msg = RequestMessage.build("g.V()").create();
+        final Host selected = aliasClient.selectHost(msg);
+        assertSame(expectedHost, selected);
+    }
+
+    /**
+     * ClusteredClient.closeAsync() should return a completed future without 
closing any pools.
+     */
+    @Test
+    public void shouldNotClosePoolsOnClusteredClientClose() {
+        final Cluster cluster = mock(Cluster.class);
+        final Client.ClusteredClient client = new 
Client.ClusteredClient(cluster);
+
+        assertFalse(client.isClosing());
+
+        final java.util.concurrent.CompletableFuture<Void> future = 
client.closeAsync();
+        assertNotNull(future);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertTrue(client.isClosing());
+
+        // calling again should return the same future
+        assertSame(future, client.closeAsync());
+    }
+
+    /**
+     * PinnedClient.closeAsync() should return a completed future without 
closing any pools.
+     */
+    @Test
+    public void shouldNotClosePoolsOnPinnedClientClose() {
+        final Host host = mock(Host.class);
+        final Cluster cluster = mock(Cluster.class);
+        final Client.PinnedClient client = new Client.PinnedClient(cluster, 
host);
+
+        assertFalse(client.isClosing());
+
+        final java.util.concurrent.CompletableFuture<Void> future = 
client.closeAsync();
+        assertNotNull(future);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertTrue(client.isClosing());
+
+        // calling again should return the same future
+        assertSame(future, client.closeAsync());
+    }
+}
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
new file mode 100644
index 0000000000..2198db5525
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.RemoteTransaction;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for HTTP remote transactions using the driver API ({@code 
Cluster.transact()}).
+ * 
+ * Tests exercise {@link RemoteTransaction} directly via its {@code submit()} 
methods.
+ * 
+ * Server-side verification tests (raw HTTP) are in {@link 
GremlinServerHttpTransactionIntegrateTest}.
+ */
+public class GremlinDriverTransactionIntegrateTest extends 
AbstractGremlinServerIntegrationTest {
+    private static final String GTX = "gtx";
+    private static final int MAX_GET_WAIT = 5000;
+
+    private Cluster cluster;
+
+    @Before
+    public void openCluster() {
+        cluster = TestClientFactory.open();
+    }
+
+    @After
+    public void closeCluster() throws Exception {
+        if (cluster != null) cluster.close();
+    }
+
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        final String nameOfTest = name.getMethodName();
+        switch (nameOfTest) {
+            case "shouldEnforceMaxConcurrentTransactions":
+                settings.maxConcurrentTransactions = 1;
+                break;
+            case "shouldTimeoutIdleTransaction":
+            case "shouldTimeoutIdleTransactionWithNoOperations":
+            case "shouldRejectLateCommitAfterTimeout":
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTimeoutOnlyIdleTransactionNotActiveOne":
+                settings.transactionTimeout = 2000;
+                break;
+        }
+        return settings;
+    }
+
+    /**
+     * Begin a transaction, add a vertex, verify isolation, commit, verify 
isOpen transitions, and verify data persists
+     * after commit.
+     */
+    @Test
+    public void shouldCommitTransaction() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        // #4: isOpen true after begin
+        assertTrue(tx.isOpen());
+
+        tx.submit("g.addV('person').property('name','alice')");
+
+        // #6: uncommitted data not visible outside the transaction
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        tx.commit();
+        // #4: isOpen false after commit
+        assertFalse(tx.isOpen());
+        // #1, #7: committed data visible to non-transactional reads
+        assertEquals(1L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        client.close();
+    }
+
+    @Test
+    public void shouldRollbackTransaction() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        assertTrue(tx.isOpen());
+
+        tx.submit("g.addV('person').property('name','bob')");
+
+        tx.rollback();
+        // #5: isOpen false after rollback
+        assertFalse(tx.isOpen());
+        // #2: data discarded after rollback
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+        client.close();
+    }
+
+    @Test
+    public void shouldSupportIntraTransactionConsistency() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('test').property('name','A')");
+        // #8: read-your-own-writes — vertex A visible within the transaction
+        assertEquals(1L, 
tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+
+        tx.submit("g.addV('test').property('name','B')");
+        
tx.submit("g.V().has('name','A').addE('knows').to(V().has('name','B'))");
+
+        // verify the full subgraph is visible within the transaction before 
commit
+        assertEquals(2L, 
tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
tx.submit("g.V().outE('knows').count()").all().get().get(0).getLong());
+
+        tx.commit();
+
+        // #3: vertices and edges persist after commit
+        assertEquals(2L, 
client.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
client.submit("g.E().hasLabel('knows').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldThrowOnSubmitAfterCommit() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV()");
+        tx.commit();
+
+        try {
+            tx.submit("g.V().count()");
+            fail("Expected exception on submit after commit");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnSubmitAfterRollback() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV()");
+        tx.rollback();
+
+        try {
+            tx.submit("g.V().count()");
+            fail("Expected exception on submit after rollback");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnDoubleBegin() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+
+        try {
+            tx.begin();
+            fail("Expected IllegalStateException on second begin()");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction already 
started"));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnCommitWhenNotOpen() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        assertFalse(tx.isOpen());
+
+        try {
+            tx.commit();
+            fail("Expected IllegalStateException on commit when not open");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    @Test
+    public void shouldThrowOnRollbackWhenNotOpen() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        assertFalse(tx.isOpen());
+
+        try {
+            tx.rollback();
+            fail("Expected IllegalStateException on rollback when not open");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction is not 
open"));
+        }
+    }
+
+    @Test
+    public void shouldReturnNullTransactionIdBeforeBegin() throws Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+
+        // before begin, transactionId should be null
+        assertNull(tx.getTransactionId());
+
+        tx.begin();
+        // after begin, transactionId should be non-null and non-blank
+        assertNotNull(tx.getTransactionId());
+        assertFalse(tx.getTransactionId().isBlank());
+    }
+
+    @Test
+    public void shouldCommitOnCloseByDefault() throws Exception {
+        final RemoteTransaction tx1 = cluster.transact(GTX);
+        tx1.begin();
+        tx1.submit("g.addV('person').property('name','close_test')");
+        // close() should trigger default COMMIT behavior
+        tx1.close();
+        assertFalse(tx1.isOpen());
+
+        final RemoteTransaction tx2 = cluster.transact(GTX);
+        tx2.begin();
+        assertEquals(1L, 
tx2.submit("g.V().hasLabel('person').count()").one().getLong());
+    }
+
+    @Test
+    public void shouldRollbackOnCloseWhenConfigured() throws Exception {
+        final RemoteTransaction tx1 = cluster.transact(GTX);
+        tx1.onClose(Transaction.CLOSE_BEHAVIOR.ROLLBACK);
+        tx1.begin();
+        tx1.submit("g.addV('person').property('name','rollback_close_test')");
+        tx1.close();
+        assertFalse(tx1.isOpen());
+
+        final RemoteTransaction tx2 = cluster.transact(GTX);
+        tx2.begin();
+        assertEquals(0L, 
tx2.submit("g.V().hasLabel('person').count()").one().getLong());
+    }
+
+    @Test
+    public void shouldRollbackOpenTransactionsOnClusterClose() throws 
Exception {
+        final RemoteTransaction tx1 = cluster.transact(GTX);
+        tx1.begin();
+        tx1.submit("g.addV('cluster-close')");
+        tx1.submit("g.addV('cluster-close')");
+
+        final RemoteTransaction tx2 = cluster.transact(GTX);
+        tx2.begin();
+        tx2.submit("g.addV('cluster-close')");
+
+        // close cluster without committing — should rollback open transactions
+        cluster.close();
+        cluster = null;
+
+        // reconnect and verify data was not persisted
+        final Cluster cluster2 = TestClientFactory.open();
+        try {
+            final RemoteTransaction cluster2tx1 = cluster2.transact(GTX);
+            cluster2tx1.begin();
+            assertEquals(0L, 
cluster2tx1.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong());
+        } finally {
+            cluster2.close();
+        }
+    }
+
+    @Test
+    public void shouldEnforceMaxConcurrentTransactions() throws Exception {
+        // first transaction fills the single slot
+        final RemoteTransaction tx1 = cluster.transact(GTX);
+        tx1.begin();
+
+        // second transaction should fail
+        try {
+            final RemoteTransaction tx2 = cluster.transact(GTX);
+            tx2.begin();
+            fail("Expected exception when max concurrent transactions 
exceeded");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Maximum concurrent 
transactions exceeded"));
+        }
+    }
+
+    @Test
+    public void shouldTimeoutIdleTransaction() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('timeout_test')");
+
+        // wait for the transaction to timeout
+        Thread.sleep(2000);
+
+        // the transaction was rolled back server-side; attempting to commit 
should fail
+        try {
+            tx.commit();
+            fail("Expected exception on commit after server-side timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('timeout_test').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldTimeoutIdleTransactionWithNoOperations() throws 
Exception {
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+
+        // wait for the transaction to timeout
+        Thread.sleep(2000);
+
+        // attempting to commit should fail because the server rolled back
+        try {
+            tx.commit();
+            fail("Expected exception on commit after server-side timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+    }
+
+    @Test
+    public void shouldTimeoutOnlyIdleTransactionNotActiveOne() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction txActive = cluster.transact(GTX);
+        txActive.begin();
+        txActive.submit("g.addV('active')");
+
+        final RemoteTransaction txIdle = cluster.transact(GTX);
+        txIdle.begin();
+        txIdle.submit("g.addV('idle')");
+
+        // keep the active transaction alive by sending requests at intervals 
shorter than timeout
+        for (int i = 0; i < 3; i++) {
+            Thread.sleep(800);
+            txActive.submit("g.V().count()");
+        }
+
+        // by now the idle transaction should have timed out (2000ms elapsed)
+        // the active transaction should still be alive
+        txActive.commit();
+
+        // verify active transaction's data persisted
+        assertEquals(1L, 
client.submit("g.V().hasLabel('active').count()").all().get().get(0).getLong());
+
+        // idle transaction should have been rolled back by timeout
+        try {
+            txIdle.commit();
+            fail("Expected exception on commit of timed-out idle transaction");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify idle transaction's data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('idle').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldRejectLateCommitAfterTimeout() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('person').property('name','late_commit')");
+
+        // wait for timeout
+        Thread.sleep(2000);
+
+        // attempt commit — should fail because server already rolled back
+        try {
+            tx.commit();
+            fail("Expected exception on late commit after timeout");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Transaction not 
found"));
+        }
+
+        // verify data was not persisted
+        assertEquals(0L, 
client.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldIsolateConcurrentTransactions() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx1 = cluster.transact(GTX);
+        tx1.begin();
+        final RemoteTransaction tx2 = cluster.transact(GTX);
+        tx2.begin();
+
+        tx1.submit("g.addV('tx1')");
+        tx2.submit("g.addV('tx2')");
+
+        // tx1 should not see tx2's data and vice versa
+        assertEquals(0L, 
tx1.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong());
+        assertEquals(0L, 
tx2.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong());
+
+        tx1.commit();
+        tx2.commit();
+
+        // both should be visible after commit
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong());
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldOpenAndCloseManyTransactionsSequentially() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+        final long numberOfTransactions = 50;
+
+        for (int i = 0; i < numberOfTransactions; i++) {
+            final RemoteTransaction tx = cluster.transact(GTX);
+            tx.begin();
+            tx.submit("g.addV('stress')");
+            tx.commit();
+        }
+
+        final long count = 
client.submit("g.V().hasLabel('stress').count()").all().get().get(0).getLong();
+        assertEquals(numberOfTransactions, count);
+
+        Thread.sleep(100);
+        // this should be 0, but to prevent flakiness, make it a reasonable 
number less than numberOfTransactions
+        
assertTrue(server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount()
 < 35);
+    }
+
+    @Test
+    public void shouldIsolateTransactionalAndNonTransactionalRequests() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('tx_data')");
+
+        // non-transactional read should not see uncommitted tx data
+        assertEquals(0L, 
client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong());
+
+        tx.commit();
+
+        // now the data should be visible
+        assertEquals(1L, 
client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldRejectBeginOnNonTransactionalGraph() throws Exception {
+        final RemoteTransaction tx = cluster.transact("gclassic");
+        try {
+            tx.begin();
+            fail("Expected exception when beginning transaction on 
non-transactional graph");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionHelper.getRootCause(ex);
+            assertThat(root.getMessage(), containsString("Graph does not 
support transactions"));
+        }
+    }
+
+    @Test
+    public void shouldTargetCorrectGraph() throws Exception {
+        final Client client = cluster.connect();
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+
+        tx.submit("g.addV('routed')");
+        tx.commit();
+
+        // vertex should exist in the transactional graph (gtx)
+        assertEquals(1L, client.submit("g.V().hasLabel('routed').count()",
+                
RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong());
+
+        // vertex should NOT exist in the classic graph (gclassic)
+        assertEquals(0L, client.submit("g.V().hasLabel('routed').count()",
+                
RequestOptions.build().addG("gclassic").create()).all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldAutoCommitNonTransactionalWrite() throws Exception {
+        final Client client = cluster.connect();
+        client.submit("g.addV('auto')").all().get();
+        assertEquals(1L, 
client.submit("g.V().hasLabel('auto').count()").one().getLong());
+    }
+
+    @Test
+    public void shouldUseProvidedRequestOptions() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        final RequestOptions ro = RequestOptions.build().
+                addParameter("x", "vowels").
+                materializeProperties(Tokens.MATERIALIZE_PROPERTIES_TOKENS).
+                create();
+        final Vertex v = tx.submit("g.addV(x).property('a', 'b')", ro).
+                all().
+                get(MAX_GET_WAIT, TimeUnit.MILLISECONDS).
+                get(0).
+                getVertex();
+        assertFalse(v.properties().hasNext());
+        tx.commit();
+
+        final List<Result> results =
+                
client.submit("g.V().hasLabel('vowels')").all().get(MAX_GET_WAIT, 
TimeUnit.MILLISECONDS);
+        assertEquals(1L, results.size());
+    }
+
+    @Test
+    public void shouldUseProvidedParameters() throws Exception {
+        final Client client = cluster.connect().alias(GTX);
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        final Map params = new HashMap();
+        params.put("x", "consonants");
+        tx.submit("g.addV(x)", params);
+        tx.commit();
+
+        final List<Result> results =
+                
client.submit("g.V().hasLabel('consonants')").all().get(MAX_GET_WAIT, 
TimeUnit.MILLISECONDS);
+        assertEquals(1L, results.size());
+    }
+
+    @Test
+    public void shouldCleanUpOnBeginFailure() throws Exception {
+        final RemoteTransaction tx = cluster.transact("gclassic");
+
+        try {
+            tx.begin();
+            fail("Expected exception on begin for non-transactional graph");
+        } catch (RuntimeException ex) {
+            assertThat(ex.getMessage(), containsString("Failed to begin 
transaction"));
+        }
+
+        // verify cleanup: transaction is closed and has no ID
+        assertFalse(tx.isOpen());
+        assertNull(tx.getTransactionId());
+
+        // second begin should fail — state moved to CLOSED, not back to 
NOT_STARTED
+        try {
+            tx.begin();
+            fail("Expected IllegalStateException on begin after failed begin");
+        } catch (IllegalStateException ex) {
+            assertThat(ex.getMessage(), containsString("Transaction already 
started"));
+        }
+    }
+
+    @Test
+    public void shouldKeepTransactionOpenAfterTraversalError() throws 
Exception {
+        final Client client = cluster.connect().alias(GTX);
+
+        final RemoteTransaction tx = cluster.transact(GTX);
+        tx.begin();
+        tx.submit("g.addV('good_vertex')");
+
+        // submit a bad traversal that should fail
+        try {
+            tx.submit("g.V().fail()");
+        } catch (Exception ex) {
+            // expected error from bad traversal
+        }
+
+        // transaction should still be open — rollback should work
+        assertTrue(tx.isOpen());
+        tx.rollback();
+
+        assertFalse(tx.isOpen());
+        assertEquals(0L, 
client.submit("g.V().hasLabel('good_vertex').count()").all().get().get(0).getLong());
+    }
+
+    @Test
+    public void shouldWorkWithDriverRemoteConnection() throws Exception {
+        final GraphTraversalSource g = 
traversal().with(DriverRemoteConnection.using(cluster, GTX));
+        final GraphTraversalSource gtx = g.tx().begin();
+        gtx.addV("val").iterate();
+        gtx.tx().commit();
+
+        assertEquals(1L, g.V().hasLabel("val").count().next().longValue());
+    }
+}

Reply via email to