This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch jstream2
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 72905f4600d088cb08233b1239b5e8c338eb4a3d
Author: Ken Hu <[email protected]>
AuthorDate: Wed May 13 21:27:47 2026 -0700

    Split streaming and nonstreaming pipeline
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |   9 +-
 .../tinkerpop/gremlin/driver/Connection.java       |   4 +
 .../driver/handler/GremlinResponseHandler.java     |  21 +--
 .../handler/HttpStreamingResponseHandler.java      |  32 ++--
 .../handler/HttpStreamingResponseHandlerTest.java  | 207 ---------------------
 5 files changed, 32 insertions(+), 241 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 7e297be462..30cf6fed57 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -212,7 +212,8 @@ public interface Channelizer extends ChannelHandler {
                 final GraphBinaryReader graphBinaryReader =
                         ((GraphBinaryMessageSerializerV4) 
serializer).getMapper().getReader();
                 streamingResponseHandler = new HttpStreamingResponseHandler(
-                        graphBinaryReader, pending, 
cluster.getMaxResponseContentLength());
+                        graphBinaryReader, pending, 
cluster.getMaxResponseContentLength(),
+                        () -> { connection.returnToPool(); 
connection.tryShutdown(); });
             } else {
                 useStreaming = false;
                 gremlinResponseDecoder = new 
HttpGremlinResponseDecoder(serializer);
@@ -267,12 +268,8 @@ public interface Channelizer extends ChannelHandler {
                 pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
                 pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, 
httpCompressionDecoder);
                 pipeline.addLast(PIPELINE_HTTP_DECODER, 
gremlinResponseDecoder);
+                pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new 
GremlinResponseHandler(pending));
             }
-
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new 
GremlinResponseHandler(pending, () -> {
-                connection.returnToPool();
-                connection.tryShutdown();
-            }, useStreaming));
         }
     }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 12a1de646c..2cd0cf2cdd 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -217,6 +217,10 @@ final class Connection {
                                 // write operation.
                                 logger.debug("Error while processing request 
on the server {}.", this, t);
                                 handleConnectionCleanupOnError(thisConnection);
+                            } else {
+                                // the callback for when the read was 
successful, meaning that ResultSet.markComplete()
+                                // was called
+                                thisConnection.returnToPool();
                             }
                             // While this request was in process, close might 
have been signaled in closeAsync().
                             // However, close would be blocked until all 
pending requests are completed. Attempt
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index 94898783eb..d4cfb167d2 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -49,13 +49,9 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
     private static final Logger logger = 
LoggerFactory.getLogger(GremlinResponseHandler.class);
     private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = 
AttributeKey.valueOf("caughtException");
     private final AtomicReference<ResultSet> pendingResultSet;
-    private final Runnable onResponseComplete;
-    private final boolean streaming;
 
-    public GremlinResponseHandler(final AtomicReference<ResultSet> pending, 
final Runnable onResponseComplete, final boolean streaming) {
+    public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
         this.pendingResultSet = pending;
-        this.onResponseComplete = onResponseComplete;
-        this.streaming = streaming;
     }
 
     @Override
@@ -104,17 +100,14 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
 
         // Stream is done when the last content signaling response message is 
read.
         if (LAST_CONTENT_READ_RESPONSE == response) {
-            if (!streaming) {
-                final ResultSet rs = pendingResultSet.getAndSet(null);
-                if (rs != null) {
-                    if (null == 
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
-                        rs.markComplete();
-                    } else {
-                        
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
-                    }
+            final ResultSet rs = pendingResultSet.getAndSet(null);
+            if (rs != null) {
+                if (null == 
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
+                    rs.markComplete();
+                } else {
+                    
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
                 }
             }
-            onResponseComplete.run();
         }
     }
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
index ea0d897aac..09a91c37b7 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -21,7 +21,7 @@ package org.apache.tinkerpop.gremlin.driver.handler;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpContent;
@@ -43,7 +43,6 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -51,18 +50,20 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
-
 /**
  * Decodes chunked HTTP responses into streaming results without buffering the 
full response body.
  * <p>
  * For GraphBinary responses, content chunks are passed to a {@link 
ByteBufQueueInputStream} consumed by a
  * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader 
deserializes results incrementally,
  * delivers them to the {@code ResultSet}, and handles completion and cleanup. 
For non-GraphBinary error responses
- * (e.g., JSON 401/500), the error body is accumulated and parsed when the 
response ends, then
- * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link 
GremlinResponseHandler} to process.
+ * (e.g., JSON 401/500), the error body is accumulated and parsed when the 
response ends, then the error is
+ * marked directly on the {@code ResultSet}.
+ * <p>
+ * This handler also owns the connection return lifecycle for the streaming 
pipeline: once all HTTP bytes are
+ * consumed (LastHttpContent), the connection is returned to the pool 
immediately. The reader thread continues
+ * consuming buffered bytes independently of the channel.
  */
-public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpObject> {
+public class HttpStreamingResponseHandler extends 
SimpleChannelInboundHandler<HttpObject> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(HttpStreamingResponseHandler.class);
     private static final ObjectMapper mapper = new ObjectMapper();
@@ -70,6 +71,7 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
     private final GraphBinaryReader graphBinaryReader;
     private final AtomicReference<ResultSet> pendingResultSet;
     private final long maxResponseContentLength;
+    private final Runnable onWireComplete;
 
     // Per-connection executor for the reader thread. Sized to one thread 
because an HTTP/1.1
     // connection has at most one in-flight request. The thread is created 
lazily on first use
@@ -86,10 +88,12 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
 
     public HttpStreamingResponseHandler(final GraphBinaryReader 
graphBinaryReader,
                                         final AtomicReference<ResultSet> 
pendingResultSet,
-                                        final long maxResponseContentLength) {
+                                        final long maxResponseContentLength,
+                                        final Runnable onWireComplete) {
         this.graphBinaryReader = graphBinaryReader;
         this.pendingResultSet = pendingResultSet;
         this.maxResponseContentLength = maxResponseContentLength;
+        this.onWireComplete = onWireComplete;
         this.readerExecutor = new ThreadPoolExecutor(0, 1, 60L, 
TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(),
                 new BasicThreadFactory.Builder().namingPattern(
@@ -97,8 +101,7 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
     }
 
     @Override
-    protected void decode(final ChannelHandlerContext ctx, final HttpObject 
msg,
-                          final List<Object> out) throws Exception {
+    protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
         if (msg instanceof HttpResponse) {
             final HttpResponse resp = (HttpResponse) msg;
 
@@ -119,10 +122,9 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
                             new GraphBinaryStreamResponseReader(buffer, 
graphBinaryReader, rs, pendingResultSet);
                     readerExecutor.submit(streamReader::run);
                 } else {
-                    // No pending ResultSet — close the stream and fire 
sentinel immediately
+                    // No pending ResultSet — close the stream now; the 
connection is returned once LastHttpContent arrives
                     queueInputStream.signalEndOfStream();
                     queueInputStream = null;
-                    out.add(LAST_CONTENT_READ_RESPONSE);
                 }
             }
         }
@@ -163,12 +165,14 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
                         // rather than offered to the already-closed stream.
                         queueInputStream = null;
                     }
-                    out.add(LAST_CONTENT_READ_RESPONSE);
                 } else {
                     // Non-GraphBinary error — parse accumulated body and fire 
sentinel
                     handleNonGraphBinaryError();
-                    out.add(LAST_CONTENT_READ_RESPONSE);
                 }
+                // All HTTP bytes are off the wire. Return the connection to 
the pool so it
+                // can be reused for the next request. The reader thread 
continues to consume
+                // buffered bytes independently of the channel.
+                onWireComplete.run();
             }
         }
     }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
deleted file mode 100644
index 2ca5dc0171..0000000000
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver.handler;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.DefaultLastHttpContent;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.util.CharsetUtil;
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
-import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
-import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
-import static org.junit.Assert.*;
-
-public class HttpStreamingResponseHandlerTest {
-
-    private ExecutorService executor;
-    private GraphBinaryMessageSerializerV4 serializer;
-    private GraphBinaryReader reader;
-
-    @Before
-    public void setup() {
-        executor = Executors.newSingleThreadExecutor();
-        serializer = new GraphBinaryMessageSerializerV4();
-        reader = serializer.getMapper().getReader();
-    }
-
-    @After
-    public void teardown() {
-        executor.shutdownNow();
-    }
-
-    private EmbeddedChannel createChannel(final AtomicReference<ResultSet> 
pendingResultSet, final long maxResponseContentLength) {
-        final HttpStreamingResponseHandler handler = new 
HttpStreamingResponseHandler(
-                reader, pendingResultSet, maxResponseContentLength);
-        return new EmbeddedChannel(handler);
-    }
-
-    @Test
-    public void shouldEmitLastContentReadResponseOnHappyPath() throws 
Exception {
-        final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);
-        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
-        final EmbeddedChannel channel = createChannel(pending, 0);
-
-        // Serialize a valid GraphBinary response
-        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
-                ResponseMessage.build().code(HttpResponseStatus.OK)
-                        .result(Collections.singletonList(1)).create(),
-                channel.alloc()));
-
-        // Send HttpResponse with GraphBinary content type
-        final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
SerTokens.MIME_GRAPHBINARY_V4);
-        channel.writeInbound(response);
-
-        // Send content
-        channel.writeInbound(new 
DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
-
-        // Send LastHttpContent
-        channel.writeInbound(new DefaultLastHttpContent());
-
-        // Verify LAST_CONTENT_READ_RESPONSE is emitted
-        final Object out = channel.readInbound();
-        assertSame(LAST_CONTENT_READ_RESPONSE, out);
-
-        channel.finishAndReleaseAll();
-    }
-
-    @Test
-    public void shouldHandleDoubleLastHttpContentWithoutError() throws 
Exception {
-        final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);
-        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
-        final EmbeddedChannel channel = createChannel(pending, 0);
-
-        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
-                ResponseMessage.build().code(HttpResponseStatus.OK)
-                        .result(Collections.singletonList(1)).create(),
-                channel.alloc()));
-
-        final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
SerTokens.MIME_GRAPHBINARY_V4);
-        channel.writeInbound(response);
-        channel.writeInbound(new 
DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
-        channel.writeInbound(new DefaultLastHttpContent());
-
-        // Send a second LastHttpContent — should not throw NPE
-        channel.writeInbound(new DefaultLastHttpContent());
-
-        channel.finishAndReleaseAll();
-    }
-
-    @Test
-    public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() {
-        final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);
-        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
-        final EmbeddedChannel channel = createChannel(pending, 10);
-
-        final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
SerTokens.MIME_GRAPHBINARY_V4);
-        channel.writeInbound(response);
-
-        try {
-            // Send content exceeding the 10-byte limit
-            channel.writeInbound(new 
DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20])));
-            fail("Expected TooLongFrameException");
-        } catch (Exception e) {
-            assertTrue(e instanceof TooLongFrameException);
-        }
-
-        channel.finishAndReleaseAll();
-    }
-
-    @Test
-    public void shouldSignalQueueInputStreamOnChannelInactive() throws 
Exception {
-        final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);
-        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
-        final EmbeddedChannel channel = createChannel(pending, 0);
-
-        final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
SerTokens.MIME_GRAPHBINARY_V4);
-        channel.writeInbound(response);
-
-        // Send some content but no LastHttpContent
-        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new 
byte[]{1, 2, 3})));
-
-        // Fire channelInactive — should not throw and should signal the stream
-        channel.pipeline().fireChannelInactive();
-
-        channel.finishAndReleaseAll();
-    }
-
-    @Test
-    public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws 
Exception {
-        final ResultSet rs = new ResultSet(executor, 
RequestMessage.build("g.V()").create(), null);
-        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
-        final EmbeddedChannel channel = createChannel(pending, 0);
-
-        // Send a 500 response with JSON content type
-        final HttpResponse response = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
"application/json");
-        channel.writeInbound(response);
-
-        // Send JSON error body
-        final String errorJson = "{\"message\":\"test error\"}";
-        channel.writeInbound(new 
DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8)));
-
-        // Verify error is marked on the ResultSet
-        try {
-            rs.all().get();
-            fail("Expected exception");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof ResponseException);
-            final ResponseException re = (ResponseException) e.getCause();
-            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
re.getResponseStatusCode());
-            assertEquals("test error", re.getMessage());
-        }
-
-        // Verify pendingResultSet was cleared
-        assertNull(pending.get());
-
-        channel.finishAndReleaseAll();
-    }
-
-    private byte[] toBytes(final io.netty.buffer.ByteBuf buf) {
-        final byte[] bytes = new byte[buf.readableBytes()];
-        buf.readBytes(bytes);
-        buf.release();
-        return bytes;
-    }
-}

Reply via email to