This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch java-tx-sync
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e35801e9d1bbe6d76f5277ca5b1395c0bc897d72
Author: Ken Hu <[email protected]>
AuthorDate: Mon Jun 29 12:13:51 2026 -0700

    Make Java RemoteTransaction submit() wait for server response headers
    
    This is a case where implementation didn't match documented and
    expected behavior. Transactions need to be semi-blocking in order
    for them to reach the server in the exact same order. It was
    incorrectly assumed that the Java GLV already did this so this
    just changes it to actually do that.
    
    Assisted-by: Claude Code:claude-opus-4-8
---
 .../apache/tinkerpop/gremlin/driver/ResultSet.java | 32 ++++++++
 .../handler/HttpStreamingResponseHandler.java      | 10 +++
 .../driver/remote/HttpRemoteTransaction.java       | 21 +++++-
 .../tinkerpop/gremlin/driver/ResultSetTest.java    | 85 ++++++++++++++++++++++
 .../handler/HttpStreamingResponseHandlerTest.java  | 62 ++++++++++++++++
 5 files changed, 209 insertions(+), 1 deletion(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index b5d506bc1f..d67f16adc0 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -58,6 +58,10 @@ public final class ResultSet implements Iterable<Result> {
     private final Queue<Pair<CompletableFuture<List<Result>>, Integer>> 
waiting = new ConcurrentLinkedQueue<>();
     private final AtomicReference<Throwable> error = new AtomicReference<>();
     private final CompletableFuture<Void> readCompleted = new 
CompletableFuture<>();
+    // Completes when the server's response headers have been received (i.e. 
the request made a full round trip and the
+    // server has begun responding). This is distinct from readCompleted, 
which only completes once the entire body has
+    // been read. It lets a caller (e.g. a remote transaction) block until the 
server has begun handling this request.
+    private final CompletableFuture<Void> headersReceived = new 
CompletableFuture<>();
 
     private final ExecutorService executor;
     private final RequestMessage originalRequestMessage;
@@ -94,6 +98,18 @@ public final class ResultSet implements Iterable<Result> {
         return readCompleted.whenCompleteAsync((s,t) -> {}, executor);
     }
 
+    /**
+     * Returns a future that completes once the server's response headers have 
been received - that is, once the
+     * request has made a full round trip to the server and the server has 
begun responding - without waiting for the
+     * entire body. Used by remote transactions to block on submission just 
long enough to guarantee server-side
+     * ordering relative to a subsequent request, while still streaming the 
body lazily.
+     */
+    public CompletableFuture<Void> headersReceivedAsync() {
+        // completed by Netty's event loop thread; hop to the executor pool so 
application code attached downstream
+        // does not run on (and block) the event loop. Mirrors 
allItemsAvailableAsync().
+        return headersReceived.whenCompleteAsync((s,t) -> {}, executor);
+    }
+
     /**
      * Gets the number of items available on the client.
      */
@@ -196,10 +212,23 @@ public final class ResultSet implements Iterable<Result> {
         tryDrainNextWaiting(false);
     }
 
+    /**
+     * Marks the response headers as received. Called by the streaming decoder 
the moment the server's response header
+     * is decoded, before any body chunks are processed. Idempotent and safe 
to call more than once. On the
+     * non-streaming path there is no separate header event - the aggregator 
coalesces the whole response - so this is
+     * instead tripped as a backstop by {@link #markComplete()}/{@link 
#markError(Throwable)}.
+     */
+    public void markHeadersReceived() {
+        this.headersReceived.complete(null);
+    }
+
     /**
      * Marks the result stream as complete.
      */
     public void markComplete() {
+        // backstop: ensure the headers future is tripped even on a path that 
never signaled it explicitly (e.g. the
+        // non-streaming aggregator path, or a NO_CONTENT response with no 
body).
+        this.headersReceived.complete(null);
         this.readCompleted.complete(null);
         this.drainAllWaiting();
     }
@@ -211,6 +240,9 @@ public final class ResultSet implements Iterable<Result> {
      */
     public void markError(final Throwable throwable) {
         error.set(throwable);
+        // an error may arrive before headers were ever signaled (e.g. a write 
failure or a non-streaming error
+        // response); fail the headers future too so anyone blocked on it 
observes the error rather than hanging.
+        this.headersReceived.completeExceptionally(throwable);
         this.readCompleted.completeExceptionally(throwable);
         this.drainAllWaiting();
     }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
index 7aca5a2e2c..b7bce47f64 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -95,6 +95,16 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
             contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE);
             queueInputStream = new ByteBufQueueInputStream();
 
+            // Signal that the server's response headers have arrived, before 
any body chunk is processed. The full
+            // round trip to the server has completed, so a caller blocked on 
headersReceivedAsync() (e.g. a remote
+            // transaction's submit) can now proceed knowing the server has 
ordered this request ahead of any later one
+            // on the same transaction, without waiting for the body to stream 
back. An error response still trips this
+            // via markComplete()/markError() downstream.
+            {
+                final ResultSet rsForHeaders = pendingResultSet.get();
+                if (rsForHeaders != null) rsForHeaders.markHeadersReceived();
+            }
+
             // Spawn reader thread for GraphBinary responses
             if (isGraphBinaryResponse()) {
                 final ResultSet rs = pendingResultSet.get();
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
index a245e36b03..5b12b84e1f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java
@@ -320,7 +320,26 @@ public class HttpRemoteTransaction implements 
RemoteTransaction {
         }
 
         try {
-            return pinnedClient.submit(gremlin, builder.create());
+            final ResultSet rs = pinnedClient.submit(gremlin, 
builder.create());
+
+            // pinnedClient.submit() returns as soon as the request is written 
to the wire - it does NOT wait for the
+            // server. That is unsafe for a transaction: a request is 
fire-and-forget at this point, so a later request
+            // on this transaction (e.g. a commit following an addV) could 
reach the server first and the earlier
+            // request's work would be lost. Block until the server's response 
headers come back, which means the
+            // request has made a full round trip and the server has ordered 
it on the transaction's single-threaded
+            // executor ahead of anything we send next. We deliberately wait 
on headers, not the full body, so large
+            // result sets still stream lazily to the caller via the returned 
ResultSet.
+            //
+            // This wait is intentionally unbounded: we must NOT abandon it on 
a client-side stopwatch. A slow server
+            // may still be going to run the request, so giving up early and 
letting the caller retry the same traversal
+            // could execute it twice. The wait ends only on a definitive 
signal - headers arriving (success) or the
+            // future completing exceptionally because the request can no 
longer succeed: a connection close
+            // (channelInactive -> markError), a transport read timeout 
(ReadTimeoutHandler -> exceptionCaught ->
+            // markError), or a server error response. The connection's own 
read timeout, not this method, bounds how
+            // long we can wait for an unresponsive server.
+            rs.headersReceivedAsync().get();
+
+            return rs;
         } catch (Exception e) {
             throw new RuntimeException("Transaction request failed: " + 
e.getMessage(), e);
         }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 4bf3b8c6b7..707167a29b 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -69,6 +70,90 @@ public class ResultSetTest extends AbstractResultSetTest {
         assertThat(resultSet.allItemsAvailable(), is(true));
     }
 
+    @Test
+    public void shouldCompleteHeadersReceivedOnMarkHeadersReceived() throws 
Exception {
+        final CompletableFuture<Void> headers = 
resultSet.headersReceivedAsync();
+        assertThat(headers.isDone(), is(false));
+
+        resultSet.markHeadersReceived();
+
+        headers.get(2, TimeUnit.SECONDS);
+        assertThat(headers.isDone(), is(true));
+        assertThat(headers.isCompletedExceptionally(), is(false));
+    }
+
+    @Test
+    public void shouldCompleteHeadersReceivedBeforeReadCompleted() throws 
Exception {
+        // models the streaming path: headers decode first, then body chunks 
stream in, then read completes. A caller
+        // blocked on headersReceivedAsync() must be released strictly before 
the full body is available.
+        final CompletableFuture<Void> headers = 
resultSet.headersReceivedAsync();
+        final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
+        assertThat(headers.isDone(), is(false));
+        assertThat(all.isDone(), is(false));
+
+        // headers arrive first
+        resultSet.markHeadersReceived();
+        headers.get(2, TimeUnit.SECONDS);
+        assertThat(headers.isDone(), is(true));
+        // the body is not finished yet, so the "all" future is still 
outstanding
+        assertThat(all.isDone(), is(false));
+
+        // body finishes streaming
+        resultSet.add(new Result("test1"));
+        resultSet.markComplete();
+        all.get(2, TimeUnit.SECONDS);
+        assertThat(all.isDone(), is(true));
+    }
+
+    @Test
+    public void shouldCompleteHeadersReceivedAsBackstopOnMarkComplete() throws 
Exception {
+        // models the non-streaming (aggregator) path which never signals 
headers explicitly: markComplete() must trip
+        // the headers future too, otherwise a caller blocked on it would hang 
on that path.
+        final CompletableFuture<Void> headers = 
resultSet.headersReceivedAsync();
+        assertThat(headers.isDone(), is(false));
+
+        resultSet.markComplete();
+
+        headers.get(2, TimeUnit.SECONDS);
+        assertThat(headers.isDone(), is(true));
+        assertThat(headers.isCompletedExceptionally(), is(false));
+    }
+
+    @Test
+    public void 
shouldCompleteHeadersReceivedExceptionallyOnErrorBeforeHeaders() throws 
Exception {
+        // an error can arrive before any header was signaled (e.g. a write 
failure or a non-streaming error response);
+        // the headers future must fail rather than hang so a caller blocked 
on it observes the error.
+        final CompletableFuture<Void> headers = 
resultSet.headersReceivedAsync();
+        assertThat(headers.isDone(), is(false));
+
+        resultSet.markError(new RuntimeException("boom"));
+
+        try {
+            headers.get(2, TimeUnit.SECONDS);
+            fail("headersReceivedAsync() should have completed exceptionally");
+        } catch (ExecutionException ex) {
+            assertThat(ex.getCause().getMessage(), is("boom"));
+        }
+        assertThat(headers.isDone(), is(true));
+        assertThat(headers.isCompletedExceptionally(), is(true));
+    }
+
+    @Test
+    public void 
shouldKeepHeadersReceivedSuccessfulWhenErrorArrivesAfterHeaders() throws 
Exception {
+        // headers come back OK and only then does the body fail (e.g. a 
server-side error in the body footer). The
+        // headers future already completed successfully and must not be 
flipped to a failure by the later error.
+        final CompletableFuture<Void> headers = 
resultSet.headersReceivedAsync();
+        resultSet.markHeadersReceived();
+        headers.get(2, TimeUnit.SECONDS);
+        assertThat(headers.isCompletedExceptionally(), is(false));
+
+        resultSet.markError(new RuntimeException("late boom"));
+
+        // still successfully completed - the late error does not 
retroactively fail the headers future
+        assertThat(headers.isDone(), is(true));
+        assertThat(headers.isCompletedExceptionally(), is(false));
+    }
+
     @Test
     public void 
shouldHaveAllItemsAvailableAsynchronouslyOnReadCompleteWhileLoading() throws 
Exception {
         final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
index 79d26dd75c..3c06e24f83 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -41,9 +41,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
@@ -103,6 +105,66 @@ public class HttpStreamingResponseHandlerTest {
         channel.finishAndReleaseAll();
     }
 
+    @Test
+    public void shouldMarkHeadersReceivedWhenResponseHeaderDecodes() throws 
Exception {
+        // The ResultSet completes its async stages on its own executor, 
separate from the handler's readerPool (which
+        // mirrors production: cluster.executor() vs 
cluster.streamingReaderPool()). This matters here because an OK
+        // response spawns a reader thread that blocks the readerPool until 
end-of-stream; sharing one executor would
+        // deadlock the headers future's completion hop.
+        final ExecutorService rsExecutor = Executors.newSingleThreadExecutor();
+        try {
+            final ResultSet rs = new ResultSet(rsExecutor, 
RequestMessage.build("g.V()").create(), null);
+            final AtomicReference<ResultSet> pending = new 
AtomicReference<>(rs);
+            final EmbeddedChannel channel = createChannel(pending);
+
+            // capture the future once - each call to headersReceivedAsync() 
returns a fresh async stage
+            final CompletableFuture<Void> headers = rs.headersReceivedAsync();
+            assertFalse(headers.isDone());
+
+            // the response header alone (no body yet) should trip the 
headers-received future, since the full round
+            // trip to the server has completed and the server has begun 
responding
+            final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
SerTokens.MIME_GRAPHBINARY_V4);
+            channel.writeInbound(response);
+
+            headers.get(2, TimeUnit.SECONDS);
+            assertTrue(headers.isDone());
+            assertFalse(headers.isCompletedExceptionally());
+            // the body has not been read yet, so read-completion must still 
be outstanding
+            assertFalse(rs.allItemsAvailable());
+
+            channel.finishAndReleaseAll();
+        } finally {
+            rsExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void shouldMarkHeadersReceivedOnErrorResponseHeader() throws 
Exception {
+        final ExecutorService rsExecutor = Executors.newSingleThreadExecutor();
+        try {
+            final ResultSet rs = new ResultSet(rsExecutor, 
RequestMessage.build("g.V()").create(), null);
+            final AtomicReference<ResultSet> pending = new 
AtomicReference<>(rs);
+            final EmbeddedChannel channel = createChannel(pending);
+
+            final CompletableFuture<Void> headers = rs.headersReceivedAsync();
+            assertFalse(headers.isDone());
+
+            // an error status still arrives as a response header first; a 
caller blocked on headers must be released
+            final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"application/json");
+            channel.writeInbound(response);
+
+            headers.get(2, TimeUnit.SECONDS);
+            assertTrue(headers.isDone());
+            assertFalse(headers.isCompletedExceptionally());
+
+            channel.finishAndReleaseAll();
+        } finally {
+            rsExecutor.shutdownNow();
+        }
+    }
+
     @Test
     public void shouldHandleDoubleLastHttpContentWithoutError() throws 
Exception {
         final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);

Reply via email to