This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch server-coordinator
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 5d467193f4cb4b02d94f97bd1341f2f64667f29c
Author: Ken Hu <[email protected]>
AuthorDate: Wed Jun 10 22:24:52 2026 -0700

    Centralize HTTP response writes in HttpResponseCoordinator.
    
    Introduce a per-request HttpResponseCoordinator that is the sole owner of
    response writes for the HTTP endpoint, guarded by a single monitor. It
    replaces three uncoordinated state holders (Context.requestState,
    Context.startedResponse, StateKey.HTTP_RESPONSE_SENT) with one
    {NOT_STARTED, STREAMING, COMPLETED} state machine, closing the
    timeout-vs-eval double-response race: whichever thread terminates the
    response first wins and the others no-op via the COMPLETED short-circuit.
    
    Assisted-by: Claude Code:claude-opus-4-8
---
 .../apache/tinkerpop/gremlin/server/Context.java   |  31 ---
 .../server/handler/HttpGremlinEndpointHandler.java | 187 ++++-------------
 .../gremlin/server/handler/HttpHandlerUtil.java    | 111 ----------
 .../server/handler/HttpResponseCoordinator.java    | 232 +++++++++++++++++++++
 .../tinkerpop/gremlin/server/handler/StateKey.java |   5 -
 .../server/GremlinServerHttpIntegrateTest.java     |  14 ++
 6 files changed, 290 insertions(+), 290 deletions(-)

diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index c7c5a610e3..51373d7741 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser;
-import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
@@ -36,7 +35,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The context of Gremlin Server within which a particular request is made.
@@ -54,8 +52,6 @@ public class Context {
     private final String materializeProperties;
     private final Object gremlinArgument;
     private final RequestType requestType;
-    private HttpGremlinEndpointHandler.RequestState requestState;
-    private final AtomicBoolean startedResponse = new AtomicBoolean(false);
     private ScheduledFuture<?> timeoutExecutor = null;
     private boolean timeoutExecutorGrabbed = false;
     private final Object timeoutExecutorLock = new Object();
@@ -65,14 +61,6 @@ public class Context {
     public Context(final RequestMessage requestMessage, final 
ChannelHandlerContext ctx,
                    final Settings settings, final GraphManager graphManager,
                    final GremlinExecutor gremlinExecutor, final 
ScheduledExecutorService scheduledExecutorService) {
-        this(requestMessage, ctx, settings, graphManager, gremlinExecutor, 
scheduledExecutorService,
-                HttpGremlinEndpointHandler.RequestState.NOT_STARTED);
-    }
-
-    public Context(final RequestMessage requestMessage, final 
ChannelHandlerContext ctx,
-                   final Settings settings, final GraphManager graphManager,
-                   final GremlinExecutor gremlinExecutor, final 
ScheduledExecutorService scheduledExecutorService,
-                   final HttpGremlinEndpointHandler.RequestState requestState) 
{
         this.requestMessage = requestMessage;
         this.channelHandlerContext = ctx;
         this.settings = settings;
@@ -84,7 +72,6 @@ public class Context {
         final String gremlin = requestMessage.getGremlin();
         this.gremlinArgument = gremlin;
         this.requestType = RequestType.fromGremlin(gremlin);
-        this.requestState = requestState;
         this.requestTimeout = determineTimeout();
         this.materializeProperties = determineMaterializeProperties();
         this.transactionId = 
requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
@@ -200,16 +187,6 @@ public class Context {
         return gremlinExecutor;
     }
 
-    /**
-     * Gets whether the server has started processing the response for this 
request.
-     */
-    public boolean getStartedResponse() { return startedResponse.get(); }
-
-    /**
-     * Signal that the server has started processing the response.
-     */
-    public void setStartedResponse() { startedResponse.set(true); }
-
     private long determineTimeout() {
         // timeout override - handle both deprecated and newly named 
configuration. earlier logic should prevent
         // both configurations from being submitted at the same time
@@ -250,14 +227,6 @@ public class Context {
         }
     }
 
-    public HttpGremlinEndpointHandler.RequestState getRequestState() {
-        return requestState;
-    }
-
-    public void setRequestState(HttpGremlinEndpointHandler.RequestState 
requestState) {
-        this.requestState = requestState;
-    }
-
     /**
      * Classifies an HTTP request by the kind of work it performs. Transaction 
control requests reuse the canonical
      * Gremlin idioms ({@code g.tx().begin()} etc.) as protocol signals rather 
than evaluating them, because the
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 40f0d2b79f..2248044a28 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -20,13 +20,11 @@ package org.apache.tinkerpop.gremlin.server.handler;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -66,7 +64,6 @@ import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
 import org.codehaus.groovy.control.MultipleCompilationErrorsException;
 import org.javatuples.Pair;
@@ -103,14 +100,6 @@ import static 
io.netty.handler.codec.http.HttpHeaderNames.ACCEPT_ENCODING;
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHED;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent;
-import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError;
 
 /**
  * Handler that processes RequestMessage. This handler will attempt to execute 
the query and stream the results back
@@ -184,11 +173,11 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
 
     @Override
     public void channelRead0(final ChannelHandlerContext ctx, final 
RequestMessage requestMessage) {
-        ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(false);
         final Pair<String, MessageSerializer<?>> serializer = 
ctx.channel().attr(StateKey.SERIALIZER).get();
 
         final Context requestCtx = new Context(requestMessage, ctx, settings, 
graphManager, gremlinExecutor,
-                gremlinExecutor.getScheduledExecutorService(), NOT_STARTED);
+                gremlinExecutor.getScheduledExecutorService());
+        final HttpResponseCoordinator coordinator = new 
HttpResponseCoordinator(requestCtx, serializer.getValue0(), 
serializer.getValue1());
 
         final Timer.Context timerContext = evalOpTimer.time();
         // timeout override - handle both deprecated and newly named 
configuration. earlier logic should prevent
@@ -197,8 +186,6 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         final long seto = (null != timeoutMs) ? timeoutMs : 
requestCtx.getSettings().getEvaluationTimeout();
 
         final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
-            requestCtx.setStartedResponse();
-
             try {
                 logger.debug("Processing request containing script [{}] and 
bindings of [{}] on {}",
                         requestMessage.getFieldOrDefault(Tokens.ARGS_GREMLIN, 
""),
@@ -283,15 +270,13 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
 
                 // Send back the 200 OK response header here since the 
response is always chunk transfer encoded. Any
                 // failures that follow this will show up in the response body 
instead.
-                sendHttpResponse(ctx, OK, createResponseHeaders(ctx, 
serializer, requestCtx).toArray(CharSequence[]::new));
-                sendHttpContents(ctx, requestCtx);
-                // Skip if writeError() already terminated the response (e.g., 
serialization error in makeChunk).
-                // Sending a second LastHttpContent would corrupt the HTTP 
framing on keep-alive connections.
-                if (requestCtx.getRequestState() != RequestState.ERROR) {
-                    sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
-                }
+                coordinator.writeHeader(createResponseHeaders(ctx, serializer, 
requestCtx).toArray(CharSequence[]::new));
+                sendHttpContents(ctx, requestCtx, coordinator);
+                // Idempotent terminal call: if the data path already 
terminated the response, complete() is a no-op
+                // via its COMPLETED short-circuit. Otherwise it writes the 
terminal LastHttpContent.
+                coordinator.complete(HttpResponseStatus.OK, "");
             } catch (Throwable t) {
-                writeError(requestCtx, formErrorResponseMessage(t, 
requestMessage), serializer.getValue1());
+                coordinator.writeError(formErrorResponseMessage(t, 
requestMessage));
             } finally {
                 timerContext.stop();
 
@@ -313,18 +298,18 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                     
transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) :
                     
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
             if (seto > 0) {
-                // Schedule a timeout in the thread pool for future execution
+                // Schedule a timeout in the thread pool for future execution. 
The coordinator's monitor guarantees
+                // exactly one response: whichever of this timeout task or the 
eval task terminates the response
+                // first wins, and the other's write becomes a no-op.
                 
requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(()
 -> {
                     executionFuture.cancel(true);
-                    if (!requestCtx.getStartedResponse()) {
-                        writeError(requestCtx, 
GremlinError.timeout(requestMessage), serializer.getValue1());
-                    }
+                    
coordinator.writeError(GremlinError.timeout(requestMessage));
                 }, seto, TimeUnit.MILLISECONDS));
             }
         } catch (RejectedExecutionException ree) {
-            writeError(requestCtx, GremlinError.rateLimiting(), 
serializer.getValue1());
+            coordinator.writeError(GremlinError.rateLimiting());
         } catch (NoSuchElementException | IllegalStateException nsee) {
-            writeError(requestCtx, 
GremlinError.transactionNotFound(requestCtx.getTransactionId()), 
serializer.getValue1());
+            
coordinator.writeError(GremlinError.transactionNotFound(requestCtx.getTransactionId()));
         }
     }
 
@@ -345,7 +330,8 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         return headers;
     }
 
-    private void sendHttpContents(final ChannelHandlerContext ctx, final 
Context requestContext) throws Exception {
+    private void sendHttpContents(final ChannelHandlerContext ctx, final 
Context requestContext,
+                                  final HttpResponseCoordinator coordinator) 
throws Exception {
         final Pair<String, MessageSerializer<?>> serializer = 
ctx.channel().attr(StateKey.SERIALIZER).get();
         final RequestMessage request = requestContext.getRequestMessage();
         final String txId = requestContext.getTransactionId();
@@ -355,14 +341,14 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         if ((txId != null) && transaction.isEmpty()) throw new 
ProcessingException(GremlinError.transactionNotFound(txId));
 
         if (requestContext.isTransactionBegin()) {
-            runBegin(requestContext, transaction.get(), serializer);
+            runBegin(transaction.get(), coordinator);
         } else if (requestContext.isTransactionCommit()) {
-            handleGraphOp(requestContext, txId, Transaction::commit, 
serializer);
+            handleGraphOp(requestContext, txId, Transaction::commit, 
coordinator);
         } else if (requestContext.isTransactionRollback()) {
-            handleGraphOp(requestContext, txId, Transaction::rollback, 
serializer);
+            handleGraphOp(requestContext, txId, Transaction::rollback, 
coordinator);
         } else {
             // Both transactional and non-transactional traversals follow this 
path for response chunking.
-            iterateScriptEvalResult(requestContext, serializer.getValue1(), 
request);
+            iterateScriptEvalResult(requestContext, serializer.getValue1(), 
request, coordinator);
         }
     }
 
@@ -416,7 +402,8 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         return GremlinError.general(t);
     }
 
-    private void iterateScriptEvalResult(final Context context, 
MessageSerializer<?> serializer, final RequestMessage message)
+    private void iterateScriptEvalResult(final Context context, 
MessageSerializer<?> serializer, final RequestMessage message,
+                                         final HttpResponseCoordinator 
coordinator)
             throws ProcessingException, InterruptedException, ScriptException {
         final Map<String, Object> args = message.getFields();
         final String language = args.containsKey(Tokens.ARGS_LANGUAGE) ? 
(String) args.get(Tokens.ARGS_LANGUAGE) : "gremlin-lang";
@@ -460,10 +447,10 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                 // optimization for driver requests
                 ((Traversal.Admin<?, ?>) result).applyStrategies();
                 itty = new TraverserIterator((Traversal.Admin<?, ?>) result);
-                handleIterator(context, itty, serializer, true);
+                handleIterator(context, itty, coordinator, true);
             } else {
                 itty = IteratorUtils.asIterator(result);
-                handleIterator(context, itty, serializer, false);
+                handleIterator(context, itty, coordinator, false);
             }
 
             if (autoCommit && graph.tx().isOpen()) graph.tx().commit();
@@ -516,20 +503,18 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         }
     }
 
-    private void runBegin(final Context ctx, UnmanagedTransaction tx, final 
Pair<String, MessageSerializer<?>> serializer) throws Exception {
-        final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), 
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, 
false);
-        ctx.getChannelHandlerContext().writeAndFlush(new 
DefaultHttpContent(chunk));
+    private void runBegin(final UnmanagedTransaction tx, final 
HttpResponseCoordinator coordinator) throws Exception {
+        coordinator.writeData(List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, 
tx.getTransactionId())), false, false);
     }
 
     private void handleGraphOp(final Context ctx,
                                final String transactionId,
                                final Consumer<Transaction> graphOp,
-                               final Pair<String, MessageSerializer<?>> 
serializer) throws Exception {
+                               final HttpResponseCoordinator coordinator) 
throws Exception {
         final Graph graph = 
graphManager.getTraversalSource(ctx.getRequestMessage().getField(Tokens.ARGS_G)).getGraph();
         graphOp.accept(graph.tx());
         transactionManager.get(transactionId).ifPresent(tx -> tx.close(true));
-        final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), 
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false);
-        ctx.getChannelHandlerContext().writeAndFlush(new 
DefaultHttpContent(chunk));
+        coordinator.writeData(List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, 
transactionId)), false, false);
     }
 
     private Bindings mergeBindingsFromRequest(final Context ctx, final 
Bindings bindings) throws ProcessingException {
@@ -570,7 +555,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         return bindings;
     }
 
-    private void handleIterator(final Context context, final Iterator itty, 
final MessageSerializer<?> serializer, final boolean bulking) throws 
InterruptedException {
+    private void handleIterator(final Context context, final Iterator itty, 
final HttpResponseCoordinator coordinator, final boolean bulking) throws 
InterruptedException {
         final ChannelHandlerContext nettyContext = 
context.getChannelHandlerContext();
         final RequestMessage msg = context.getRequestMessage();
         final Settings settings = context.getSettings();
@@ -582,14 +567,11 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
 
         // we have an empty iterator - happens on stuff like: g.V().iterate()
         if (!itty.hasNext()) {
-            ByteBuf chunk = null;
             try {
-                chunk = makeChunk(context, serializer, new ArrayList<>(), 
false, bulking);
-                nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
+                coordinator.writeData(new ArrayList<>(), false, bulking);
             } catch (Exception ex) {
-                // Bytebuf is a countable release - if it does not get written 
downstream
-                // it needs to be released here
-                if (chunk != null) chunk.release();
+                // serialization error is written back to the driver inside 
writeData (which terminates the
+                // response); nothing further to do here.
             }
             return;
         }
@@ -646,37 +628,25 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
             // results till the timeout. checking for isActive() should help 
prevent that.
             if (nettyContext.channel().isActive() && 
nettyContext.channel().isWritable()) {
                 if (aggregate.size() == resultIterationBatchSize || 
!itty.hasNext()) {
-                    ByteBuf chunk = null;
-                    try {
-                        chunk = makeChunk(context, serializer, aggregate, 
itty.hasNext(), bulking);
-                    } catch (Exception ex) {
-                        // Bytebuf is a countable release - if it does not get 
written downstream
-                        // it needs to be released here
-                        if (chunk != null) chunk.release();
-
-                        // exception is handled in makeFrame() - serialization 
error gets written back to driver
-                        // at that point
-                        break;
-                    }
-
                     // track whether there is anything left in the iterator 
because it needs to be accessed after
                     // the transaction could be closed - in that case a call 
to hasNext() could open a new transaction
-                    // unintentionally
+                    // unintentionally. compute it before writing so the 
(possibly tx-closing) hasNext() does not run
+                    // under the coordinator monitor.
                     hasMore = itty.hasNext();
 
+                    final List<Object> page = aggregate;
+                    // only need a fresh aggregation list if there's more 
stuff to write
+                    if (hasMore) {
+                        aggregate = new ArrayList<>(resultIterationBatchSize);
+                    }
+
                     try {
-                        // only need to reset the aggregation list if there's 
more stuff to write
-                        if (hasMore) {
-                            aggregate = new 
ArrayList<>(resultIterationBatchSize);
-                        }
+                        coordinator.writeData(page, hasMore, bulking);
                     } catch (Exception ex) {
-                        // Bytebuf is a countable release - if it does not get 
written downstream
-                        // it needs to be released here
-                        if (chunk != null) chunk.release();
-                        throw ex;
+                        // serialization error gets written back to the driver 
inside writeData (which terminates
+                        // the response); stop iterating.
+                        break;
                     }
-
-                    nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
                 }
             } else {
                 final long currentTime = System.currentTimeMillis();
@@ -724,73 +694,4 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         return false;
     }
 
-    private static ByteBuf makeChunk(final Context ctx, final 
MessageSerializer<?> serializer,
-                                     final List<Object> aggregate, final 
boolean hasMore,
-                                     final boolean bulking) throws Exception {
-        try {
-            final ChannelHandlerContext nettyContext = 
ctx.getChannelHandlerContext();
-
-            ctx.handleDetachment(aggregate);
-
-            if (!hasMore && ctx.getRequestState() == STREAMING) {
-                ctx.setRequestState(FINISHING);
-            }
-
-            ResponseMessage responseMessage = null;
-
-            // for this state no need to build full ResponseMessage
-            if (ctx.getRequestState() != STREAMING) {
-                final ResponseMessage.Builder builder = 
ResponseMessage.build().result(aggregate);
-
-                // need to put status in last message
-                if (ctx.getRequestState() == FINISHING) {
-                    builder.code(HttpResponseStatus.OK);
-                }
-
-                builder.bulked(bulking);
-
-                responseMessage = builder.create();
-            }
-
-            switch (ctx.getRequestState()) {
-                case NOT_STARTED:
-                    if (hasMore) {
-                        ctx.setRequestState(STREAMING);
-                        return serializer.writeHeader(responseMessage, 
nettyContext.alloc());
-                    }
-
-                    final ByteBuf fullResponse = 
serializer.serializeResponseAsBinary(ResponseMessage.build()
-                            .result(aggregate)
-                            .bulked(bulking)
-                            .code(HttpResponseStatus.OK)
-                            .create(), nettyContext.alloc());
-                    ctx.setRequestState(FINISHED);
-                    return fullResponse;
-
-                case STREAMING:
-                    return serializer.writeChunk(aggregate, 
nettyContext.alloc());
-                case FINISHING:
-                    final ByteBuf footer = 
serializer.writeFooter(responseMessage, nettyContext.alloc());
-                    ctx.setRequestState(FINISHED);
-                    return footer;
-            }
-
-            return serializer.serializeResponseAsBinary(responseMessage, 
nettyContext.alloc());
-
-        } catch (Exception ex) {
-            final UUID requestId = 
ctx.getChannelHandlerContext().attr(StateKey.REQUEST_ID).get();
-            logger.warn("The result [{}] in the request {} could not be 
serialized and returned.", aggregate, requestId, ex);
-            writeError(ctx, GremlinError.serialization(ex), serializer);
-            throw ex;
-        }
-    }
-
-    public enum RequestState {
-        NOT_STARTED,
-        STREAMING,
-        // last portion of data
-        FINISHING,
-        FINISHED,
-        ERROR
-    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index 1e5f9a0a7c..8a4d5e316a 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -19,27 +19,15 @@
 package org.apache.tinkerpop.gremlin.server.handler;
 
 import com.codahale.metrics.Meter;
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.DefaultLastHttpContent;
 import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.util.CharsetUtil;
-import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
-import org.apache.tinkerpop.gremlin.server.util.GremlinError;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
-import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
-import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import org.apache.tinkerpop.shaded.jackson.databind.node.ObjectNode;
 import org.slf4j.Logger;
@@ -47,8 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import static com.codahale.metrics.MetricRegistry.name;
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
-import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 /**
@@ -92,101 +78,4 @@ public class HttpHandlerUtil {
 
         ctx.writeAndFlush(response);
     }
-
-    /**
-     * Writes and flushes a {@link ResponseMessage} that contains an error 
back to the client. Can be used to send
-     * errors while streaming or when no response chunk has been sent. This 
serves as the end of a response.
-     *
-     * @param context           The netty context.
-     * @param responseMessage   The response to send back.
-     * @param serializer        The serializer to use to serialize the error 
response.
-     */
-    static void writeError(final Context context, final ResponseMessage 
responseMessage, final MessageSerializer<?> serializer) {
-        // Prevent writing after the response is already terminated. A second 
write would corrupt
-        // HTTP framing on keep-alive connections, poisoning them for 
subsequent requests.
-        if (context.getRequestState() == 
HttpGremlinEndpointHandler.RequestState.ERROR ||
-            context.getRequestState() == 
HttpGremlinEndpointHandler.RequestState.FINISHED) {
-            return;
-        }
-
-        try {
-            final ChannelHandlerContext ctx = 
context.getChannelHandlerContext();
-            final ByteBuf ByteBuf = context.getRequestState() == 
HttpGremlinEndpointHandler.RequestState.STREAMING
-                    ? serializer.writeErrorFooter(responseMessage, ctx.alloc())
-                    : serializer.serializeResponseAsBinary(responseMessage, 
ctx.alloc());
-
-            
context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);
-
-            if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) {
-                sendHttpResponse(ctx,
-                        responseMessage.getStatus().getCode(),
-                        HttpHeaderNames.CONTENT_TYPE, 
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
-            }
-
-            ctx.writeAndFlush(new DefaultHttpContent(ByteBuf));
-
-            sendLastHttpContent(ctx, responseMessage.getStatus().getCode(), 
responseMessage.getStatus().getException());
-        } catch (SerializationException se) {
-            logger.warn("Unable to serialize ResponseMessage: {} ", 
responseMessage);
-        }
-    }
-
-    /**
-     * Writes a {@link GremlinError} into the status object of a {@link 
ResponseMessage} and then flushes it. Used to
-     * send specific errors back to the client. This serves as the end of a 
response.
-     *
-     * @param context       The netty context.
-     * @param error         The GremlinError used to populate the status.
-     * @param serializer    The serializer to use to serialize the error 
response.
-     */
-    static void writeError(final Context context, final GremlinError error, 
final MessageSerializer<?> serializer) {
-        final ResponseMessage responseMessage = ResponseMessage.build()
-                .code(error.getCode())
-                .statusMessage(error.getMessage())
-                .exception(error.getException())
-                .create();
-
-        writeError(context, responseMessage, serializer);
-    }
-
-    /**
-     * Adds trailing headers specified in the arguments to a {@link 
DefaultLastHttpContent} and then flushes it. This
-     * serves as the end of a response.
-     *
-     * @param ctx           The netty context.
-     * @param statusCode    The status code to include in the trailers.
-     * @param exceptionType The type of exception to include in the trailers. 
Leave blank or null if no error occurred.
-     */
-    static void sendLastHttpContent(final ChannelHandlerContext ctx, final 
HttpResponseStatus statusCode, final String exceptionType) {
-        // TODO: this might be not sent if exception occurs early so HTTP not 
properly terminated
-        final DefaultLastHttpContent defaultLastHttpContent = new 
DefaultLastHttpContent();
-        defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE, 
statusCode.code());
-        if (exceptionType != null && !exceptionType.isEmpty()) {
-            
defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_EXCEPTION, 
exceptionType);
-        }
-
-        ctx.writeAndFlush(defaultLastHttpContent);
-    }
-
-    /**
-     * Sends the initial HTTP response header with the given status and 
optional header pairs.
-     * Also marks the channel as having sent a response. Headers must be 
provided as alternating
-     * name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}).
-     *
-     * @param ctx     The netty channel context.
-     * @param status  The HTTP status code for the response.
-     * @param headers Alternating header name/value pairs to set on the 
response.
-     * @throws IllegalArgumentException if headers length is not even
-     */
-    static void sendHttpResponse(final ChannelHandlerContext ctx, final 
HttpResponseStatus status, final CharSequence... headers) {
-        if ((headers.length%2) != 0) throw new 
IllegalArgumentException("Headers should come in pairs.");
-
-        final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, 
status);
-        responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
-        for (int i=0; i<headers.length; i+=2) {
-            responseHeader.headers().set(headers[i], headers[i+1]);
-        }
-        ctx.writeAndFlush(responseHeader);
-        ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
-    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpResponseCoordinator.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpResponseCoordinator.java
new file mode 100644
index 0000000000..c98dca641c
--- /dev/null
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpResponseCoordinator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.util.GremlinError;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.UUID;
+
+import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
+import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Owns every response write for a single HTTP request and guarantees, by construction, that a request produces
+ * exactly one well-formed, terminated chunked response. This is a plain per-request helper, not a Netty pipeline
+ * handler.
+ * <p>
+ * All mutating methods are guarded by this object's monitor, so the eval-worker thread and the timeout-scheduler
+ * thread cannot interleave their writes. The {@link State#COMPLETED} short-circuit makes the terminal-write methods
+ * idempotent: whichever thread terminates the response first wins, and the others become no-ops. The eval task calls
+ * {@link #complete} from a {@code finally} block, so the terminal {@code LastHttpContent} (which clears the channel's
+ * in-use flag and ends the chunked stream) is written even if the body-producing code threw.
+ * <p>
+ * Locking discipline (IMPORTANT for maintainers): the {@code state} and {@code headerSent} fields are guarded by
+ * this object's monitor. The {@code public}/package methods are {@code synchronized} and are the only valid entry
+ * points. The private helpers ({@code ensureHeaderSent}, {@code writeTerminal}) mutate or rely on that guarded state
+ * but are deliberately NOT {@code synchronized} — they assume the caller already holds the monitor. Only ever call
+ * them from a {@code synchronized} method, and only ever read/write {@code state}/{@code headerSent} while holding
+ * the monitor. Do not add an entry point that touches this state without {@code synchronized}.
+ */
+final class HttpResponseCoordinator {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpResponseCoordinator.class);
+
+    private enum State {
+        /** No response body has been written yet. */
+        NOT_STARTED,
+        /** The opening framing (header chunk) has been written; more chunks or a footer are expected. */
+        STREAMING,
+        /** The response has been fully terminated (terminal {@code LastHttpContent} written); no further writes. */
+        COMPLETED
+    }
+
+    private final Context context;
+    private final MessageSerializer<?> serializer;
+    private final String contentType;
+
+    private State state = State.NOT_STARTED;
+    private boolean headerSent = false;
+
+    HttpResponseCoordinator(final Context context, final String contentType, final MessageSerializer<?> serializer) {
+        this.context = context;
+        this.contentType = contentType;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Sends the {@code 200 OK} chunked response header exactly once. The header pairs are supplied by the caller
+     * because, for a begin-transaction request, the transaction id header is only known once the transaction has been
+     * opened. No-op if the response was already terminated or the header was already sent.
+     *
+     * @param headers alternating header name/value pairs to set on the response
+     * @throws IllegalArgumentException if the header has not been sent yet and {@code headers} has an odd length
+     */
+    synchronized void writeHeader(final CharSequence... headers) {
+        if (state == State.COMPLETED) return;
+        ensureHeaderSent(HttpResponseStatus.OK, headers);
+    }
+
+    /**
+     * Writes one page of results. On the final page ({@code hasMore == false}) this also writes the body footer and
+     * the terminal {@code LastHttpContent}, transitioning to {@link State#COMPLETED}. No-op if already completed.
+     * <p>
+     * On a serialization failure the error is written through {@link #writeError} (terminating the response) and the
+     * exception is re-thrown so the caller can stop iterating. The transition to {@link State#STREAMING} happens only
+     * after the opening header chunk has been serialized, so a failure while producing that chunk is reported as a
+     * complete error message rather than as an error footer with no preceding header.
+     *
+     * @param aggregate the results making up this page
+     * @param hasMore   {@code true} if further pages will follow, {@code false} if this is the final page
+     * @param bulking   whether the results are bulked, passed through to the {@link ResponseMessage}
+     * @throws Exception the detachment or serialization failure, after it has been reported to the client
+     */
+    synchronized void writeData(final List<Object> aggregate, final boolean hasMore, final boolean bulking) throws Exception {
+        if (state == State.COMPLETED) return;
+
+        final ChannelHandlerContext nettyContext = context.getChannelHandlerContext();
+        // backstop: the eval task sends the header before iterating, but guarantee it here too.
+        ensureHeaderSent(HttpResponseStatus.OK, HttpHeaderNames.CONTENT_TYPE, contentType);
+
+        final boolean firstChunk = state == State.NOT_STARTED;
+        final boolean terminal = !hasMore;
+
+        final ByteBuf chunk;
+        try {
+            // detachment runs inside the try so a failure here is reported to the client as a serialization error
+            // (matching the prior makeChunk behavior) rather than escaping uncaught.
+            context.handleDetachment(aggregate);
+
+            // An intermediate streaming chunk carries only data (no status, no ResponseMessage). Every other case
+            // builds a full ResponseMessage; the terminal page additionally carries the OK status. The four cases map
+            // onto distinct serializer framing calls: single-shot (first+terminal), header (first+more),
+            // footer (streaming+terminal), and chunk (streaming+more).
+            if (!firstChunk && !terminal) {
+                chunk = serializer.writeChunk(aggregate, nettyContext.alloc());
+            } else {
+                final ResponseMessage.Builder builder = ResponseMessage.build().result(aggregate).bulked(bulking);
+                if (terminal) builder.code(HttpResponseStatus.OK);
+                final ResponseMessage responseMessage = builder.create();
+
+                if (firstChunk && terminal) {
+                    chunk = serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc());
+                } else if (firstChunk) {
+                    chunk = serializer.writeHeader(responseMessage, nettyContext.alloc());
+                    // Only move to STREAMING once the header chunk exists. If writeHeader throws, the catch below
+                    // must still observe NOT_STARTED so that writeError serializes a complete error message instead
+                    // of an error footer that has no opening framing in front of it.
+                    state = State.STREAMING;
+                } else {
+                    chunk = serializer.writeFooter(responseMessage, nettyContext.alloc());
+                }
+            }
+        } catch (Exception ex) {
+            // read the attribute from the channel: ChannelHandlerContext.attr is deprecated in favor of Channel.attr
+            final UUID requestId = nettyContext.channel().attr(StateKey.REQUEST_ID).get();
+            logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, requestId, ex);
+            writeError(GremlinError.serialization(ex));
+            throw ex;
+        }
+
+        nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
+
+        // The final page closes the body framing above; now end the chunked stream.
+        if (terminal) {
+            writeTerminal(HttpResponseStatus.OK, "");
+            state = State.COMPLETED;
+        }
+    }
+
+    /**
+     * Writes an error response and terminates the stream. Serializes an error footer when mid-stream or a complete
+     * message otherwise. No-op if the response was already terminated.
+     * <p>
+     * Never throws on serialization failure: the failure is logged and the state is left untouched, so nothing is
+     * written by this call and a later {@link #complete} is not short-circuited and can still send the header and the
+     * terminal {@code LastHttpContent}.
+     *
+     * @param responseMessage the error message whose status code and exception are reported to the client
+     */
+    synchronized void writeError(final ResponseMessage responseMessage) {
+        if (state == State.COMPLETED) return;
+
+        final ChannelHandlerContext ctx = context.getChannelHandlerContext();
+        try {
+            final ByteBuf byteBuf = state == State.STREAMING
+                    ? serializer.writeErrorFooter(responseMessage, ctx.alloc())
+                    : serializer.serializeResponseAsBinary(responseMessage, ctx.alloc());
+
+            // an error response that has not yet emitted a header carries the error status code on the header line,
+            // matching the prior behavior of HttpHandlerUtil.writeError.
+            ensureHeaderSent(responseMessage.getStatus().getCode(), HttpHeaderNames.CONTENT_TYPE, contentType);
+
+            ctx.writeAndFlush(new DefaultHttpContent(byteBuf));
+            writeTerminal(responseMessage.getStatus().getCode(), responseMessage.getStatus().getException());
+            state = State.COMPLETED;
+        } catch (SerializationException se) {
+            // keep the cause in the log; the response is intentionally not terminated here (see method javadoc).
+            logger.warn("Unable to serialize ResponseMessage: {} ", responseMessage, se);
+        }
+    }
+
+    /**
+     * Writes an error response built from a {@link GremlinError}.
+     *
+     * @param error supplies the status code, status message and exception of the error response
+     */
+    synchronized void writeError(final GremlinError error) {
+        writeError(ResponseMessage.build()
+                .code(error.getCode())
+                .statusMessage(error.getMessage())
+                .exception(error.getException())
+                .create());
+    }
+
+    /**
+     * Terminal call from the eval task's {@code finally}. Idempotent: writes the terminal {@code LastHttpContent}
+     * only if the response was not already completed (by the final data page or an error). This guarantees the
+     * chunked stream is always ended even when the body-producing code threw an unchecked exception.
+     *
+     * @param status        the status for the header line (if not yet sent) and for the trailing code header
+     * @param exceptionType the exception name for the trailing header; omitted when {@code null} or empty
+     */
+    synchronized void complete(final HttpResponseStatus status, final String exceptionType) {
+        if (state == State.COMPLETED) return;
+        ensureHeaderSent(status, HttpHeaderNames.CONTENT_TYPE, contentType);
+        writeTerminal(status, exceptionType);
+        state = State.COMPLETED;
+    }
+
+    // Caller must hold this object's monitor: reads and mutates the guarded {@code headerSent} field. Only ever
+    // invoked from the synchronized public methods; intentionally not synchronized itself (see class javadoc).
+    private void ensureHeaderSent(final HttpResponseStatus status, final CharSequence... headers) {
+        if (headerSent) return;
+        if ((headers.length % 2) != 0) throw new IllegalArgumentException("Headers should come in pairs.");
+
+        final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, status);
+        responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
+        for (int i = 0; i < headers.length; i += 2) {
+            responseHeader.headers().set(headers[i], headers[i + 1]);
+        }
+        context.getChannelHandlerContext().writeAndFlush(responseHeader);
+        headerSent = true;
+    }
+
+    // Caller must hold this object's monitor: invoked only from the synchronized terminal-write methods as part of
+    // their guarded state transition to COMPLETED. Intentionally not synchronized itself (see class javadoc).
+    private void writeTerminal(final HttpResponseStatus statusCode, final String exceptionType) {
+        final DefaultLastHttpContent last = new DefaultLastHttpContent();
+        last.trailingHeaders().add(SerTokens.TOKEN_CODE, statusCode.code());
+        if (exceptionType != null && !exceptionType.isEmpty()) {
+            last.trailingHeaders().add(SerTokens.TOKEN_EXCEPTION, exceptionType);
+        }
+        context.getChannelHandlerContext().writeAndFlush(last);
+    }
+}
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
index 7849951542..b8727cf3cc 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
@@ -50,11 +50,6 @@ public final class StateKey {
      */
     public static final AttributeKey<UUID> REQUEST_ID = 
AttributeKey.valueOf("requestId");
 
-    /**
-     * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} 
has been sent for the current response.
-     */
-    public static final AttributeKey<Boolean> HTTP_RESPONSE_SENT = 
AttributeKey.valueOf("responseSent");
-
     /**
      * The key for the current {@link AuthenticatedUser}.
      */
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index ee881d9b8c..c67a0add0d 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -948,8 +948,21 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
             }
         };
 
+        // The second query also times out; additionally assert that the 
timed-out response is exactly one
+        // well-formed, terminated chunked response. The timeout path is the 
cross-thread case (the scheduler thread
+        // calls the coordinator's writeError concurrently with the eval 
worker), so this guards that the single-
+        // response + termination guarantee holds when the second writer is 
the timeout, not just the eval failure
+        // path covered by the shouldHandleErrors* tests. EntityUtils.toString 
only returns once the chunked entity
+        // is fully read including the terminal zero-length chunk, so a 
non-terminated response would block until the
+        // @Test timeout.
         final Callable<Integer> secondQueryWrapper = () -> {
             try (final CloseableHttpResponse response = 
secondClient.execute(secondPost)) {
+                assertTrue(response.getEntity().isChunked());
+                final JsonNode node = 
mapper.readTree(EntityUtils.toString(response.getEntity()));
+                assertEquals(500, node.get("status").get("code").intValue());
+                final Header[] footers = getTrailingHeaders(response);
+                assertEquals("code", footers[0].getName());
+                assertEquals("500", footers[0].getValue());
                 return response.getStatusLine().getStatusCode();
             }
         };
@@ -964,6 +977,7 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
 
         threadPool.shutdown();
     }
+
     @Test
     public void shouldErrorWhenTryingToConnectWithHttp1() throws Exception {
         final CloseableHttpClient httpclient = HttpClients.createDefault();

Reply via email to