This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 5ad67238f7dcea79fe08cf81ba1f721d60951b4d Author: Ken Hu <[email protected]> AuthorDate: Tue Jun 23 15:24:37 2026 -0700 Route exceptionCaught through the response coordinator CTR A late pipeline error wrote a self-contained response unconditionally, which could collide with an already-started chunked stream. Routing through the in-flight coordinator preserves the single-response guarantee; sendError remains the fallback when no coordinator exists yet. Assisted-by: Claude Code:claude-opus-4-8 --- .../server/handler/HttpGremlinEndpointHandler.java | 16 +++++- .../server/handler/HttpRequestIdHandler.java | 4 ++ .../tinkerpop/gremlin/server/handler/StateKey.java | 10 ++++ .../handler/HttpGremlinEndpointHandlerTest.java | 62 ++++++++++++++++++++++ .../server/handler/HttpRequestIdHandlerTest.java | 25 +++++++++ 5 files changed, 116 insertions(+), 1 deletion(-) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 7b78a31b77..11939008a8 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -178,6 +178,10 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ final Context requestCtx = new Context(requestMessage, ctx, settings, graphManager, gremlinExecutor, gremlinExecutor.getScheduledExecutorService()); final HttpResponseCoordinator coordinator = new HttpResponseCoordinator(requestCtx, serializer.getValue0(), serializer.getValue1()); + // Publish the coordinator so exceptionCaught can route a late pipeline error through it (terminating an + // in-flight chunked response) instead of writing a second, conflicting response. Cleared at the next + // request's start in HttpRequestIdHandler. + ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(coordinator); final Timer.Context timerContext = evalOpTimer.time(); // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent @@ -476,7 +480,17 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { logger.error("Error processing HTTP Request", cause); - if (ctx.channel().isActive()) { + if (!ctx.channel().isActive()) return; + + // If a request reached the endpoint and started a response, route the error through its coordinator so the + // single-owner-of-writes guarantee holds: it terminates an in-flight chunked stream (or no-ops if the response + // already completed) instead of writing a second, conflicting full response onto the channel. When no + // coordinator is set, the error came from an upstream handler before the endpoint ran (no response started, and + // possibly no serializer negotiated), so fall back to a self-contained sendError. + final HttpResponseCoordinator coordinator = ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).get(); + if (coordinator != null) { + coordinator.writeError(GremlinError.general(cause)); + } else { HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, cause.getMessage()); } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java index 55b0e9d71c..f60eafa2f0 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java @@ -57,6 +57,10 @@ public class HttpRequestIdHandler extends ChannelDuplexHandler { ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID()); ctx.channel().attr(IN_USE).set(true); + // Clear any coordinator left over from the previous request on this keep-alive connection so that an + // exception from an upstream handler (before the endpoint creates this request's coordinator) falls back + // to sendError rather than no-opping on the prior, already-completed coordinator. + ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(null); } ctx.fireChannelRead(msg); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java index b8727cf3cc..412aadab84 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java @@ -54,4 +54,14 @@ public final class StateKey { * The key for the current {@link AuthenticatedUser}. */ public static final AttributeKey<AuthenticatedUser> AUTHENTICATED_USER = AttributeKey.valueOf("authenticatedUser"); + + /** + * The key for the in-flight request's {@link HttpResponseCoordinator}. Cleared at request start (alongside the + * in-use flag) and set by {@code HttpGremlinEndpointHandler} once a request reaches the endpoint and a coordinator + * is created. It lets {@code exceptionCaught} route a late pipeline error through the coordinator so it terminates + * an already-started chunked stream rather than writing a second, conflicting response. When absent — the error + * arrived before the endpoint ran, so no response has started — the error path falls back to a self-contained + * {@code sendError}. + */ + public static final AttributeKey<HttpResponseCoordinator> RESPONSE_COORDINATOR = AttributeKey.valueOf("responseCoordinator"); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java new file mode 100644 index 0000000000..3058289f7b --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.handler; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link HttpGremlinEndpointHandler#exceptionCaught}. When no {@link HttpResponseCoordinator} has been + * published for the request, the error came from an upstream handler before the endpoint ran (no response started, no + * serializer necessarily negotiated), so the handler must fall back to a self-contained {@code sendError}. + */ +public class HttpGremlinEndpointHandlerTest { + + // exceptionCaught touches none of the handler's constructor dependencies, so nulls are sufficient here. + private static HttpGremlinEndpointHandler newHandler() { + return new HttpGremlinEndpointHandler(null, null, null, null); + } + + @Test + public void exceptionCaughtFallsBackToSendErrorWhenNoCoordinator() { + // No RESPONSE_COORDINATOR is set: this models an error from an upstream handler before the endpoint ran, where + // no response has started (and possibly no serializer was negotiated). The handler must produce a single, + // self-contained 500 response, preserving the pre-coordinator behavior. + final EmbeddedChannel channel = new EmbeddedChannel(newHandler()); + + channel.pipeline().fireExceptionCaught(new RuntimeException("boom")); + + final FullHttpResponse response = channel.readOutbound(); + assertEquals(500, response.status().code()); + assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE)); + assertTrue(response.content().toString(UTF_8).contains("boom")); + assertNull("only a single full response should be written", channel.readOutbound()); + + ReferenceCountUtil.release(response); + channel.finishAndReleaseAll(); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java index c8ccf6b4de..1b7fdd816a 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandlerTest.java @@ -32,10 +32,12 @@ import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; import org.junit.Test; +import org.mockito.Mockito; import java.util.UUID; import static org.apache.tinkerpop.gremlin.server.handler.HttpRequestIdHandler.REQUEST_ID_HEADER_NAME; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class HttpRequestIdHandlerTest { @@ -79,4 +81,27 @@ public class HttpRequestIdHandlerTest { httpRequest.release(); httpResponse.release(); } + + @Test + public void shouldClearStaleResponseCoordinatorOnNewRequest() { + final HttpRequestIdHandler httpRequestIdHandler = new HttpRequestIdHandler(); + final EmbeddedChannel testChannel = new EmbeddedChannel(new HttpServerCodec(), httpRequestIdHandler); + + // Simulate a coordinator left over from a previous request on this keep-alive connection. + testChannel.attr(StateKey.RESPONSE_COORDINATOR).set(Mockito.mock(HttpResponseCoordinator.class)); + + final ByteBuf buffer = testChannel.alloc().buffer(); + buffer.writeCharSequence("abc", CharsetUtil.UTF_8); + final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gremlin", buffer); + + testChannel.writeInbound(httpRequest); + testChannel.finish(); + + // A newly arriving request must clear the stale coordinator so that an exception from an upstream handler + // (before the endpoint publishes this request's coordinator) falls back to sendError rather than no-opping on + // the prior, already-completed coordinator. + assertNull(testChannel.attr(StateKey.RESPONSE_COORDINATOR).get()); + + buffer.release(); + } }
