This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 5ad67238f7dcea79fe08cf81ba1f721d60951b4d
Author: Ken Hu <[email protected]>
AuthorDate: Tue Jun 23 15:24:37 2026 -0700

    Route exceptionCaught through the response coordinator CTR
    
    A late pipeline error wrote a self-contained response unconditionally, which
    could collide with an already-started chunked stream. Routing through the
    in-flight coordinator preserves the single-response guarantee; sendError
    remains the fallback when no coordinator exists yet.
    
    Assisted-by: Claude Code:claude-opus-4-8
---
 .../server/handler/HttpGremlinEndpointHandler.java | 16 +++++-
 .../server/handler/HttpRequestIdHandler.java       |  4 ++
 .../tinkerpop/gremlin/server/handler/StateKey.java | 10 ++++
 .../handler/HttpGremlinEndpointHandlerTest.java    | 62 ++++++++++++++++++++++
 .../server/handler/HttpRequestIdHandlerTest.java   | 25 +++++++++
 5 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 7b78a31b77..11939008a8 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -178,6 +178,10 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
         final Context requestCtx = new Context(requestMessage, ctx, settings, 
graphManager, gremlinExecutor,
                 gremlinExecutor.getScheduledExecutorService());
         final HttpResponseCoordinator coordinator = new 
HttpResponseCoordinator(requestCtx, serializer.getValue0(), 
serializer.getValue1());
+        // Publish the coordinator so exceptionCaught can route a late 
pipeline error through it (terminating an
+        // in-flight chunked response) instead of writing a second, 
conflicting response. Cleared at the next
+        // request's start in HttpRequestIdHandler.
+        ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(coordinator);
 
         final Timer.Context timerContext = evalOpTimer.time();
         // timeout override - handle both deprecated and newly named 
configuration. earlier logic should prevent
@@ -476,7 +480,17 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
     public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
         logger.error("Error processing HTTP Request", cause);
 
-        if (ctx.channel().isActive()) {
+        if (!ctx.channel().isActive()) return;
+
+        // If a request reached the endpoint and created its coordinator, route the 
error through that coordinator so the
+        // single-owner-of-writes guarantee holds: it terminates an in-flight 
chunked stream (or no-ops if the response
+        // already completed) instead of writing a second, conflicting full 
response onto the channel. When no
+        // coordinator is set, the error came from an upstream handler before 
the endpoint ran (no response started, and
+        // possibly no serializer negotiated), so fall back to a 
self-contained sendError.
+        final HttpResponseCoordinator coordinator = 
ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).get();
+        if (coordinator != null) {
+            coordinator.writeError(GremlinError.general(cause));
+        } else {
             HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
cause.getMessage());
         }
     }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
index 55b0e9d71c..f60eafa2f0 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
@@ -57,6 +57,10 @@ public class HttpRequestIdHandler extends ChannelDuplexHandler {
 
             ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID());
             ctx.channel().attr(IN_USE).set(true);
+            // Clear any coordinator left over from the previous request on 
this keep-alive connection so that an
+            // exception from an upstream handler (before the endpoint creates 
this request's coordinator) falls back
+            // to sendError rather than no-opping on the prior, 
already-completed coordinator.
+            ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(null);
         }
 
         ctx.fireChannelRead(msg);
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
index b8727cf3cc..412aadab84 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
@@ -54,4 +54,14 @@ public final class StateKey {
      * The key for the current {@link AuthenticatedUser}.
      */
     public static final AttributeKey<AuthenticatedUser> AUTHENTICATED_USER = 
AttributeKey.valueOf("authenticatedUser");
+
+    /**
+     * The key for the in-flight request's {@link HttpResponseCoordinator}. 
Cleared at request start (where the
+     * in-use flag is set) and published by {@code HttpGremlinEndpointHandler} once a 
request reaches the endpoint and a coordinator
+     * is created. It lets {@code exceptionCaught} route a late pipeline error 
through the coordinator so it terminates
+     * an already-started chunked stream rather than writing a second, 
conflicting response. When absent — the error
+     * arrived before the endpoint ran, so no response has started — the error 
path falls back to a self-contained
+     * {@code sendError}.
+     */
+    public static final AttributeKey<HttpResponseCoordinator> 
RESPONSE_COORDINATOR = AttributeKey.valueOf("responseCoordinator");
 }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java
new file mode 100644
index 0000000000..3058289f7b
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.util.ReferenceCountUtil;
+import org.junit.Test;
+
+import static io.netty.util.CharsetUtil.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link HttpGremlinEndpointHandler#exceptionCaught}. When no 
{@link HttpResponseCoordinator} has been
+ * published for the request, the error came from an upstream handler before 
the endpoint ran (no response started, no
+ * serializer necessarily negotiated), so the handler must fall back to a 
self-contained {@code sendError}.
+ */
+public class HttpGremlinEndpointHandlerTest {
+
+    // exceptionCaught touches none of the handler's constructor dependencies, 
so nulls are sufficient here.
+    private static HttpGremlinEndpointHandler newHandler() {
+        return new HttpGremlinEndpointHandler(null, null, null, null);
+    }
+
+    @Test
+    public void exceptionCaughtFallsBackToSendErrorWhenNoCoordinator() {
+        // No RESPONSE_COORDINATOR is set: this models an error from an 
upstream handler before the endpoint ran, where
+        // no response has started (and possibly no serializer was 
negotiated). The handler must produce a single,
+        // self-contained 500 response, preserving the pre-coordinator 
behavior.
+        final EmbeddedChannel channel = new EmbeddedChannel(newHandler());
+
+        channel.pipeline().fireExceptionCaught(new RuntimeException("boom"));
+
+        final FullHttpResponse response = channel.readOutbound();
+        assertEquals(500, response.status().code());
+        assertEquals("application/json", 
response.headers().get(HttpHeaderNames.CONTENT_TYPE));
+        assertTrue(response.content().toString(UTF_8).contains("boom"));
+        assertNull("only a single full response should be written", 
channel.readOutbound());
+
+        ReferenceCountUtil.release(response);
+        channel.finishAndReleaseAll();
+    }
+}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java
index c8ccf6b4de..1b7fdd816a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java
@@ -32,10 +32,12 @@ import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.CharsetUtil;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.UUID;
 
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpRequestIdHandler.REQUEST_ID_HEADER_NAME;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class HttpRequestIdHandlerTest {
@@ -79,4 +81,27 @@ public class HttpRequestIdHandlerTest {
         httpRequest.release();
         httpResponse.release();
     }
+
+    @Test
+    public void shouldClearStaleResponseCoordinatorOnNewRequest() {
+        final HttpRequestIdHandler httpRequestIdHandler = new 
HttpRequestIdHandler();
+        final EmbeddedChannel testChannel = new EmbeddedChannel(new 
HttpServerCodec(), httpRequestIdHandler);
+
+        // Simulate a coordinator left over from a previous request on this 
keep-alive connection.
+        
testChannel.attr(StateKey.RESPONSE_COORDINATOR).set(Mockito.mock(HttpResponseCoordinator.class));
+
+        final ByteBuf buffer = testChannel.alloc().buffer();
+        buffer.writeCharSequence("abc", CharsetUtil.UTF_8);
+        final FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gremlin", 
buffer);
+
+        testChannel.writeInbound(httpRequest);
+        testChannel.finish();
+
+        // A newly arriving request must clear the stale coordinator so that 
an exception from an upstream handler
+        // (before the endpoint publishes this request's coordinator) falls 
back to sendError rather than no-opping on
+        // the prior, already-completed coordinator.
+        assertNull(testChannel.attr(StateKey.RESPONSE_COORDINATOR).get());
+
+        buffer.release();
+    }
 }

Reply via email to