This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch jstream2
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 7ad120269be6dd1082702d24830b2a2afc62d1e1
Author: Cole Greer
AuthorDate: Fri Apr 24 11:43:59 2026 -0700

    Add streaming HTTP response support to gremlin-driver
    
    Adds incremental GraphBinary response deserialization to the Java driver.
    HTTP response chunks are fed to a BlockingQueue-backed InputStream, and a
    dedicated reader thread deserializes items one at a time using the existing
    TypeSerializer infrastructure via an InputStream-backed Buffer adapter.
    
    Results are pushed to the ResultSet as they are deserialized, enabling
    consumers to process results before the full response is received.
    
    Non-GraphBinary serializers (GraphSON, custom MessageSerializer) fall back
    to the previous aggregating pipeline (HttpObjectAggregator +
    HttpGremlinResponseDecoder).
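The queue-backed stream described above can be sketched in plain Java. This is a simplified analogue under stated assumptions, not the driver's `ByteBufQueueInputStream`: it uses `byte[]` chunks instead of Netty `ByteBuf`s, and `QueueInputStream` and `QueueInputStreamDemo` are hypothetical names. The producer thread plays the event loop offering HTTP content chunks, the main thread plays the reader thread, and the bounded `poll` timeout mirrors the 30s poll timeout mentioned later in this message.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueInputStreamDemo {
    public static void main(String[] args) throws Exception {
        QueueInputStream in = new QueueInputStream(30, TimeUnit.SECONDS);
        // Producer: stands in for the event loop handing over HTTP content chunks.
        Thread producer = new Thread(() -> {
            in.offer(new byte[]{1, 2, 3});
            in.offer(new byte[]{4, 5});
            in.signalEndOfStream();
        });
        producer.start();
        // Consumer: stands in for the reader thread, which blocks until bytes arrive.
        int sum = 0;
        int b;
        while ((b = in.read()) != -1) {
            sum += b;
        }
        producer.join();
        System.out.println(sum); // 15
    }
}

/** Hypothetical InputStream fed by a queue of byte chunks (simplified, not the driver class). */
class QueueInputStream extends InputStream {
    private static final byte[] END = new byte[0]; // sentinel marking end of stream
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long timeout;
    private final TimeUnit unit;
    private byte[] current;
    private int pos;
    private boolean finished;

    QueueInputStream(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Called by the producer for each chunk; empty chunks are skipped. */
    void offer(byte[] chunk) {
        if (chunk.length > 0) queue.add(chunk);
    }

    /** Called by the producer once the last chunk has been offered. */
    void signalEndOfStream() {
        queue.add(END);
    }

    @Override
    public int read() throws IOException {
        while (current == null || pos >= current.length) {
            if (finished) return -1;
            final byte[] next;
            try {
                next = queue.poll(timeout, unit); // bounded wait instead of blocking forever
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
            if (next == null) throw new IOException("Timed out waiting for data");
            if (next == END) {
                finished = true;
                return -1;
            }
            current = next;
            pos = 0;
        }
        return current[pos++] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) return 0;
        int first = read(); // blocks for at least one byte or end of stream
        if (first == -1) return -1;
        b[off] = (byte) first;
        // Copy only what is already buffered in the current chunk; never wait for more.
        int n = Math.min(len - 1, current.length - pos);
        System.arraycopy(current, pos, b, off + 1, n);
        pos += n;
        return n + 1;
    }
}
```

Overriding `read(byte[], int, int)` to return only the bytes already buffered is deliberate: the default `InputStream` implementation keeps calling `read()` until the requested length is filled, which would stall an incremental consumer waiting on a chunk that has not arrived yet.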
    
    fix tests
    
    Fix connection corruption from double-LastHttpContent in streaming pipeline
    
    Server: Guard sendLastHttpContent with a state check so it is skipped when
    writeError() in makeChunk() has already terminated the response. Without
    this, a serialization error mid-stream sends two LastHttpContent messages,
    corrupting HTTP framing on keep-alive connections.
    
    Client: Null out queueInputStream after signalEndOfStream on LastHttpContent
    so spurious content between responses is dropped rather than offered to the
    closed stream.
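The client-side half of that fix can be modelled without Netty. In this hypothetical sketch, `ResponseSink` holds a per-response buffer standing in for `queueInputStream`; clearing the reference on the last chunk means bytes that arrive before the next response head are counted and dropped instead of being written to a finished stream. The names and the `byte[]` bodies are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

class DropAfterEndDemo {
    public static void main(String[] args) {
        ResponseSink sink = new ResponseSink();
        sink.onResponseStart();
        sink.onContent(new byte[]{1, 2});
        sink.onLastContent(new byte[]{3});
        sink.onContent(new byte[]{9}); // stray bytes between responses
        System.out.println(sink.completed.size() + " completed, " + sink.droppedChunks + " dropped"); // 1 completed, 1 dropped
    }
}

/** Hypothetical model of the per-connection response state on the client. */
class ResponseSink {
    // Stands in for queueInputStream: non-null only while a response is open.
    private ByteArrayOutputStream current;
    final List<byte[]> completed = new ArrayList<>();
    int droppedChunks;

    void onResponseStart() {
        current = new ByteArrayOutputStream();
    }

    void onContent(byte[] chunk) {
        if (current == null) { // no open response: drop rather than write to a closed stream
            droppedChunks++;
            return;
        }
        current.write(chunk, 0, chunk.length);
    }

    void onLastContent(byte[] chunk) {
        onContent(chunk);
        if (current != null) {
            completed.add(current.toByteArray()); // "signal end of stream" to the consumer
            current = null;                       // null out so later content is dropped
        }
    }
}
```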
    
    Guard writeError() against double-write after response termination
    
    Prevent writeError() from sending data after the response is already in
    ERROR or FINISHED state. Without this guard, any code path that calls
    writeError() after the response is terminated sends a second
    LastHttpContent, corrupting HTTP framing on keep-alive connections and
    causing subsequent requests on the same connection to hang.
    
    Assisted-by: Kiro:claude-sonnet-4-20250514
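Both server-side guards reduce to one rule: once a response reaches a terminal state, nothing else is written. The sketch below is a hypothetical, single-threaded model (`ResponseWriter`, `ResponseState`, and the recorded string frames are illustrative, not Gremlin Server types) in which `writeError()` ends the response itself, and a later `sendLastHttpContent()` or second `writeError()` becomes a no-op, so exactly one `LAST` frame is emitted.

```java
import java.util.ArrayList;
import java.util.List;

class TerminalStateGuardDemo {
    public static void main(String[] args) {
        ResponseWriter w = new ResponseWriter();
        w.writeChunk("a");
        w.writeError("serialization failed"); // ends the response
        w.sendLastHttpContent();              // skipped: already ERROR
        w.writeError("late error");           // skipped: already terminated
        System.out.println(w.frames);         // [chunk:a, error:serialization failed, LAST]
    }
}

/** Hypothetical response lifecycle states. */
enum ResponseState { STREAMING, ERROR, FINISHED }

/** Hypothetical single-threaded writer; frames records what would be sent on the wire. */
class ResponseWriter {
    final List<String> frames = new ArrayList<>();
    private ResponseState state = ResponseState.STREAMING;

    private boolean isTerminated() {
        return state == ResponseState.ERROR || state == ResponseState.FINISHED;
    }

    void writeChunk(String data) {
        if (isTerminated()) return;
        frames.add("chunk:" + data);
    }

    void writeError(String message) {
        if (isTerminated()) return; // never write after the response has ended
        state = ResponseState.ERROR;
        frames.add("error:" + message);
        frames.add("LAST");         // the error path ends the HTTP response itself
    }

    void sendLastHttpContent() {
        if (isTerminated()) return; // writeError() may already have sent LAST
        state = ResponseState.FINISHED;
        frames.add("LAST");
    }
}
```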
    
    Fix serialization error handling and connection return race in streaming pipeline
    
    1. HttpGremlinEndpointHandler.makeChunk(): Defer setRequestState(FINISHED) until
       after serialization succeeds. Previously, a serialization failure left the state
       as FINISHED, so writeError() bailed out, producing empty responses and
       client-side EOFExceptions.
    
    2. Connection.write(): Change whenCompleteAsync to whenComplete so returnToPool()
       runs synchronously when readCompleted fires. This prevents a race where the
       caller submits the next request before the connection is returned to the pool,
       causing connection starvation and idle timeout hangs.
    
    Assisted-by: Kiro:claude-sonnet-4-20250514
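The difference between `whenComplete` and `whenCompleteAsync` is visible with plain `CompletableFuture`. In this sketch the callback stands in for `returnToPool()`, and a hypothetical executor that only queues tasks makes the window deterministic. With `whenComplete`, a callback registered before completion runs on the completing thread before `complete()` returns; with `whenCompleteAsync`, it is handed to the executor and has not run yet, which is the gap in which the next request could be submitted before the connection is back in the pool.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class WhenCompleteDemo {
    public static void main(String[] args) {
        System.out.println(callbackRanBeforeCompleteReturned(false)); // true
        System.out.println(callbackRanBeforeCompleteReturned(true));  // false
    }

    /**
     * Completes a future and reports whether its callback (standing in for returnToPool()) had
     * already run when complete() returned to the completing thread.
     */
    static boolean callbackRanBeforeCompleteReturned(boolean async) {
        Queue<Runnable> queued = new ArrayDeque<>();
        Executor deferred = queued::add; // hypothetical executor that only queues tasks
        CompletableFuture<Void> readCompleted = new CompletableFuture<>();
        AtomicBoolean returnedToPool = new AtomicBoolean(false);
        if (async) {
            readCompleted.whenCompleteAsync((v, t) -> returnedToPool.set(true), deferred);
        } else {
            readCompleted.whenComplete((v, t) -> returnedToPool.set(true));
        }
        readCompleted.complete(null);
        boolean ranInline = returnedToPool.get();
        queued.forEach(Runnable::run); // the async callback only runs when the executor gets to it
        return ranInline;
    }
}
```

The trade-off is that non-async callbacks execute on whichever thread completes the future (or on the registering thread if it is already complete), so they should stay cheap.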
    
    Fix channel-error recovery by checking isOpen() in addition to isDead()
    
    When a channel error occurs (e.g. TooLongFrameException), GremlinResponseHandler
    calls ctx.close() which initiates an async channel close. The whenComplete callback
    fires synchronously and checks isDead() (channel.isActive()), but isActive() may
    still return true because the close hasn't completed deregistration yet.
    
    Adding a channel.isOpen() check resolves this TOCTOU race because isOpen() returns
    false immediately when close() is called, before deregistration completes.
    
    Assisted-by: Kiro:claude-sonnet-4-20250514
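A hypothetical model of that race, with `FakeChannel` standing in for a Netty channel and its two flags following the behavior this message describes (`isOpen()` flips as soon as close is initiated, `isActive()` only after deregistration). In that window, checking `isDead()` alone would return a closing connection to the pool, while also checking `isOpen()` replaces it. `ChannelCheckDemo.decide` is an illustrative stand-in for `handleConnectionCleanupOnError`.

```java
class ChannelCheckDemo {
    enum Action { RETURN_TO_POOL, REPLACE }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        ch.close(); // close initiated, deregistration still pending
        System.out.println(decide(ch, false)); // RETURN_TO_POOL: isDead() alone misses the closing channel
        System.out.println(decide(ch, true));  // REPLACE
        ch.completeDeregistration();
        System.out.println(decide(ch, false)); // REPLACE
    }

    /** Mirrors the cleanup decision, with or without the extra isOpen() check. */
    static Action decide(FakeChannel ch, boolean alsoCheckOpen) {
        boolean dead = !ch.isActive();                   // isDead()
        boolean closing = alsoCheckOpen && !ch.isOpen(); // the added check
        return (dead || closing) ? Action.REPLACE : Action.RETURN_TO_POOL;
    }
}

/** Hypothetical channel whose close completes in two steps. */
class FakeChannel {
    private volatile boolean open = true;
    private volatile boolean active = true;

    void close() { open = false; }                    // isOpen() is false as soon as close() is called
    void completeDeregistration() { active = false; } // isActive() turns false later
    boolean isOpen() { return open; }
    boolean isActive() { return active; }
}
```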
    
    Decouple connection pool return from ResultSet completion in streaming pipeline
    
    The streaming reader thread calls markComplete() when all results are
    deserialized, but this could fire before the HTTP response's LastHttpContent
    is processed by the codec. Reusing the connection before HTTP framing is
    complete caused the codec to silently drop subsequent responses.
    
    Changes:
    - Move connection return (returnToPool) from the whenComplete callback to
      GremlinResponseHandler, triggered by LAST_CONTENT_READ_RESPONSE. This
      ensures the connection is only reused after HTTP framing is fully complete.
    - Change HttpStreamingResponseHandler type parameter from DefaultHttpObject to
      HttpObject. Netty's LastHttpContent.EMPTY_LAST_CONTENT does not extend
      DefaultHttpObject, causing it to bypass decode() and preventing
      LAST_CONTENT_READ_RESPONSE from being emitted.
    - Add streaming flag to GremlinResponseHandler to skip markComplete() for
      streaming responses (the reader thread owns ResultSet completion).
    - Remove returnToPool() from Connection.write() whenComplete success path;
      it is now handled by the pipeline on LastHttpContent for both streaming
      and non-streaming paths.
    
    Assisted-by: Kiro:claude-sonnet-4-20250514
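The resulting division of labor can be sketched with hypothetical stand-ins (`SentinelHandler`, `FakeResultSet`) for `GremlinResponseHandler` and `ResultSet`: on the last-content sentinel, the handler completes the result set only in non-streaming mode, but always runs the completion callback that returns the connection to the pool. This is a simplified model, not the driver's handler.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SentinelDemo {
    public static void main(String[] args) {
        AtomicInteger poolReturns = new AtomicInteger();
        FakeResultSet streamed = new FakeResultSet();
        new SentinelHandler(new AtomicReference<>(streamed), poolReturns::incrementAndGet, true).onLastContent();
        FakeResultSet aggregated = new FakeResultSet();
        new SentinelHandler(new AtomicReference<>(aggregated), poolReturns::incrementAndGet, false).onLastContent();
        // Streaming leaves completion to the reader thread; both paths return the connection once.
        System.out.println(streamed.isComplete() + " " + aggregated.isComplete() + " " + poolReturns.get()); // false true 2
    }
}

/** Hypothetical stand-in for ResultSet. */
class FakeResultSet {
    private volatile boolean complete;
    void markComplete() { complete = true; }
    boolean isComplete() { return complete; }
}

/** Hypothetical model of the LAST_CONTENT_READ_RESPONSE branch. */
class SentinelHandler {
    private final AtomicReference<FakeResultSet> pending;
    private final Runnable onResponseComplete;
    private final boolean streaming;

    SentinelHandler(AtomicReference<FakeResultSet> pending, Runnable onResponseComplete, boolean streaming) {
        this.pending = pending;
        this.onResponseComplete = onResponseComplete;
        this.streaming = streaming;
    }

    void onLastContent() {
        if (!streaming) {
            // Non-streaming: the aggregated response is fully decoded here, so complete it.
            FakeResultSet rs = pending.getAndSet(null);
            if (rs != null) rs.markComplete();
        }
        // Both paths: HTTP framing is done, so the connection may be reused.
        onResponseComplete.run();
    }
}
```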
    
    Harden streaming pipeline: fix races, improve robustness, add tests
    
    - Make Connection.returnToPool() idempotent via AtomicBoolean guard to
      prevent double-return corrupting the connection pool in error paths
    - Change streamingReaderPool max to scale with cluster topology
      (maxConnectionPoolSize * contactPoints * 4) instead of per-host limit
    - Catch Throwable (not just Exception) in GraphBinaryStreamResponseReader
      to prevent consumer hangs on Error subclasses (OOM, StackOverflow)
    - Add 30s poll timeout to ByteBufQueueInputStream to prevent reader
      threads from blocking indefinitely if end-of-stream is never signaled
    - Set BYTES_READ attribute on first content (not headers) to preserve
      idle timeout error messaging when server sends headers before execution
    - Clear pendingResultSet in non-GraphBinary error path to prevent stale
      state blocking graceful shutdown
    - Rename isGraphBinaryResponse() to shouldStreamResponse() for clarity
    - Expand upgrade documentation with user-visible behavior details
    - Add HttpStreamingResponseHandlerTest covering happy path, double
      LastHttpContent, max content length, channelInactive, and error cases
    - Add error-then-reuse integration test for connection recovery
    
    cleanup
    
    fix tests
---
 CHANGELOG.asciidoc                                 |   1 +
 docs/src/upgrade/release-4.x.x.asciidoc            |   9 +
 .../tinkerpop/gremlin/driver/Channelizer.java      |  44 ++-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  18 ++
 .../tinkerpop/gremlin/driver/Connection.java       |  23 +-
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |   1 +
 .../driver/handler/GremlinResponseHandler.java     |  21 +-
 .../handler/HttpStreamingResponseHandler.java      | 237 ++++++++++++++++
 .../driver/stream/ByteBufQueueInputStream.java     | 137 +++++++++
 .../stream/GraphBinaryStreamResponseReader.java    | 105 +++++++
 .../gremlin/driver/stream/InputStreamBuffer.java   | 311 +++++++++++++++++++++
 .../handler/ByteBufQueueInputStreamTest.java       |  95 +++++++
 .../GraphBinaryStreamResponseReaderTest.java       | 226 +++++++++++++++
 .../handler/HttpStreamingResponseHandlerTest.java  | 207 ++++++++++++++
 .../driver/handler/InputStreamBufferTest.java      |  94 +++++++
 .../server/handler/HttpGremlinEndpointHandler.java |  14 +-
 .../gremlin/server/handler/HttpHandlerUtil.java    |   7 +
 .../server/StreamingResponseIntegrateTest.java     | 239 ++++++++++++++++
 18 files changed, 1761 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index e3f3471a6f..f76f6ce790 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 
 * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python.
 * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`.
+* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary.
 * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response.
 * Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption.
 * Removed `readable-stream` dependency from `gremlin-javascript`.
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc
index f0216e0b8d..2ebd34fff2 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -270,6 +270,15 @@ try {
 The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they
 block on the async stream internally.
 
+==== Streaming Response Deserialization in gremlin-driver
+
+The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive
+rather than buffering the entire response. This reduces time-to-first-result for large result sets.
+
+This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer.
+Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response
+before deserialization. The `ResultSet` API is unchanged.
+
 ==== More Secure Gremlin Server
 
 Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization,
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 427f76c056..59b9d02365 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -34,9 +34,13 @@ import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 
 import java.util.Collections;
 import java.util.Optional;
@@ -92,7 +96,7 @@ public interface Channelizer extends ChannelHandler {
         protected Connection connection;
         protected Cluster cluster;
         protected SslHandler sslHandler;
-        private AtomicReference<ResultSet> pending;
+        protected AtomicReference<ResultSet> pending;
 
         protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
         protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler";
@@ -158,7 +162,6 @@ public interface Channelizer extends ChannelHandler {
             }
 
             configure(pipeline);
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending));
         }
 
         @Override
@@ -187,7 +190,9 @@ public interface Channelizer extends ChannelHandler {
                 ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create();
 
         private HttpGremlinRequestEncoder gremlinRequestEncoder;
+        private HttpStreamingResponseHandler streamingResponseHandler;
         private HttpGremlinResponseDecoder gremlinResponseDecoder;
+        private boolean useStreaming;
 
         private HttpContentDecompressionHandler httpCompressionDecoder;
         private IdleStateHandler idleStateHandler;
@@ -200,7 +205,19 @@ public interface Channelizer extends ChannelHandler {
             httpCompressionDecoder = new HttpContentDecompressionHandler();
             gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(),
                     cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri());
-            gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());
+
+            final MessageSerializer<?> serializer = cluster.getSerializer();
+            if (serializer instanceof GraphBinaryMessageSerializerV4) {
+                useStreaming = true;
+                final GraphBinaryReader graphBinaryReader =
+                        ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader();
+                streamingResponseHandler = new HttpStreamingResponseHandler(
+                        graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+            } else {
+                useStreaming = false;
+                gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer);
+            }
+
             if (cluster.getIdleConnectionTimeout() > 0) {
                 final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000);
                 idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0);
@@ -240,11 +257,22 @@ public interface Channelizer extends ChannelHandler {
                     DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false);
 
             pipeline.addLast(PIPELINE_HTTP_CODEC, handler);
-            pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
-                    ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
-            pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
-            pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
-            pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
+            if (useStreaming) {
+                pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
+                pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
+                pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+            } else {
+                pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
+                        ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
+                pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+                pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
+                pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
+            }
+
+            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> {
+                connection.returnToPool();
+                connection.tryShutdown();
+            }, useStreaming));
         }
     }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index cac193814c..d4a3967f02 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -70,8 +70,11 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -383,6 +386,10 @@ public final class Cluster {
         return manager.connectionScheduler;
     }
 
+    ExecutorService streamingReaderPool() {
+        return manager.streamingReaderPool;
+    }
+
     Settings.ConnectionPoolSettings connectionPoolSettings() {
         return manager.connectionPoolSettings;
     }
@@ -956,6 +963,12 @@ public final class Cluster {
          */
         private final ScheduledThreadPoolExecutor connectionScheduler;
 
+        /**
+         * Cached thread pool for streaming response reader threads. One thread per active streaming response,
+         * bounded implicitly by the connection pool size.
+         */
+        private final ExecutorService streamingReaderPool;
+
         private final int nioPoolSize;
         private final int workerPoolSize;
         private final int port;
@@ -1023,6 +1036,10 @@ public final class Cluster {
             this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                     new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
 
+            this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
+                    60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
+                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());
+
             validationRequest = () -> RequestMessage.build(builder.validationRequest);
         }
 
@@ -1133,6 +1150,7 @@ public final class Cluster {
                 executor.shutdown();
                 hostScheduler.shutdown();
                 connectionScheduler.shutdown();
+                streamingReaderPool.shutdownNow();
                 closeIt.complete(null);
             });
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index cb94b39b87..12a1de646c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -66,6 +66,14 @@ final class Connection {
      * Is a {@code Connection} borrowed from the pool.
      */
     private final AtomicBoolean isBorrowed = new AtomicBoolean(false);
+    /**
+     * Prevents returnToPool() from being called more than once per borrow cycle.
+     */
+    private final AtomicBoolean returned = new AtomicBoolean(false);
+
+    void resetReturned() {
+        returned.set(false);
+    }
     /**
      * This boolean guards the replace of the connection and ensures that it only occurs once.
      */
@@ -201,7 +209,7 @@ final class Connection {
                     } else {
                         final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host);
 
-                        resultSet.getReadCompleted().whenCompleteAsync((v, t) -> {
+                        resultSet.getReadCompleted().whenComplete((v, t) -> {
                             if (t != null) {
                                 // the callback for when the read failed. a failed read means the request went to the server
                                 // and came back with a server-side error of some sort.  it means the server is responsive
@@ -209,17 +217,13 @@ final class Connection {
                                 // write operation.
                                 logger.debug("Error while processing request on the server {}.", this, t);
                                 handleConnectionCleanupOnError(thisConnection);
-                            } else {
-                                // the callback for when the read was successful, meaning that ResultSet.markComplete()
-                                // was called
-                                thisConnection.returnToPool();
                             }
                             // While this request was in process, close might have been signaled in closeAsync().
                             // However, close would be blocked until all pending requests are completed. Attempt
                             // the shutdown if the returned result cleared up the last pending message and unblocked
                             // the close.
                             tryShutdown();
-                        }, cluster.executor());
+                        });
 
                         pending.set(resultSet);
 
@@ -234,7 +238,8 @@ final class Connection {
         return requestPromise;
     }
 
-    private void returnToPool() {
+    void returnToPool() {
+        if (!returned.compareAndSet(false, true)) return;
         try {
             if (pool != null) pool.returnConnection(this);
         } catch (ConnectionException ce) {
@@ -244,7 +249,7 @@ final class Connection {
     }
 
     private void handleConnectionCleanupOnError(final Connection thisConnection) {
-        if (thisConnection.isDead()) {
+        if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) {
             if (pool != null) pool.replaceConnection(thisConnection);
         } else {
             thisConnection.returnToPool();
@@ -259,7 +264,7 @@ final class Connection {
      * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the
      * shutdown if the returned result cleared up the last pending message.
      */
-    private void tryShutdown() {
+    void tryShutdown() {
         if (isClosing() && isOkToClose())
             shutdown(closeFuture.get());
     }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee4c34c5aa..4f40b6733a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -525,6 +525,7 @@ final class ConnectionPool {
         while (head != null) {
             // try to borrow connection
             if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) {
+                head.resetReturned();
                 available = head;
                 break;
             }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index d4cfb167d2..94898783eb 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response
     private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
     private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException");
     private final AtomicReference<ResultSet> pendingResultSet;
+    private final Runnable onResponseComplete;
+    private final boolean streaming;
 
-    public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
+    public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) {
         this.pendingResultSet = pending;
+        this.onResponseComplete = onResponseComplete;
+        this.streaming = streaming;
     }
 
     @Override
@@ -100,14 +104,17 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response
 
         // Stream is done when the last content signaling response message is read.
         if (LAST_CONTENT_READ_RESPONSE == response) {
-            final ResultSet rs = pendingResultSet.getAndSet(null);
-            if (rs != null) {
-                if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
-                    rs.markComplete();
-                } else {
-                    rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+            if (!streaming) {
+                final ResultSet rs = pendingResultSet.getAndSet(null);
+                if (rs != null) {
+                    if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
+                        rs.markComplete();
+                    } else {
+                        rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+                    }
                 }
             }
+            onResponseComplete.run();
         }
     }
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
new file mode 100644
index 0000000000..e9d81b46b2
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+
+/**
+ * Decodes chunked HTTP responses into streaming results without buffering the full response body.
+ * <p>
+ * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a
+ * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally,
+ * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses
+ * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then
+ * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process.
+ */
+public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpObject> {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    private final GraphBinaryReader graphBinaryReader;
+    private final AtomicReference<ResultSet> pendingResultSet;
+    private final ExecutorService readerPool;
+    private final long maxResponseContentLength;
+
+    // Mutable state below is accessed exclusively from the channel's event loop thread.
+    private HttpResponseStatus responseStatus;
+    private String contentType;
+    private long bytesRead;
+    private ByteBufQueueInputStream queueInputStream;
+    private CompositeByteBuf errorBody;
+
+    public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader,
+                                        final AtomicReference<ResultSet> pendingResultSet,
+                                        final ExecutorService readerPool,
+                                        final long maxResponseContentLength) {
+        this.graphBinaryReader = graphBinaryReader;
+        this.pendingResultSet = pendingResultSet;
+        this.readerPool = readerPool;
+        this.maxResponseContentLength = maxResponseContentLength;
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final HttpObject msg,
+                          final List<Object> out) throws Exception {
+        if (msg instanceof HttpResponse) {
+            final HttpResponse resp = (HttpResponse) msg;
+
+            // Reset mutable state for the new response cycle to prevent stale state from a previous
+            // response bleeding into this one when the handler is reused on the same connection.
+            resetState();
+
+            responseStatus = resp.status();
+            contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE);
+            queueInputStream = new ByteBufQueueInputStream();
+
+            // Spawn reader thread for GraphBinary responses
+            if (isGraphBinaryResponse()) {
+                final ResultSet rs = pendingResultSet.get();
+                if (rs != null) {
+                    final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream);
+                    final GraphBinaryStreamResponseReader streamReader =
+                            new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet);
+                    try {
+                        readerPool.submit(streamReader::run);
+                    } catch (RejectedExecutionException e) {
+                        queueInputStream.signalEndOfStream();
+                        rs.markError(e);
+                        pendingResultSet.compareAndSet(rs, null);
+                        out.add(LAST_CONTENT_READ_RESPONSE);
+                    }
+                } else {
+                    // No pending ResultSet — close the stream and fire sentinel immediately
+                    queueInputStream.signalEndOfStream();
+                    queueInputStream = null;
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                }
+            }
+        }
+
+        if (msg instanceof HttpContent) {
+            final ByteBuf content = ((HttpContent) msg).content();
+            bytesRead += content.readableBytes();
+
+            if (bytesRead > 0 && ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) {
+                ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0);
+            }
+
+            if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) {
+                // Don't signal here — exceptionCaught will handle cleanup
+                throw new TooLongFrameException("Response entity too large");
+            }
+
+            if (!isGraphBinaryResponse()) {
+                // Accumulate non-GraphBinary error body across chunks
+                if (content.readableBytes() > 0) {
+                    if (errorBody == null) {
+                        errorBody = ctx.alloc().compositeBuffer();
+                    }
+                    // retain() because Netty releases the content ByteBuf after decode() returns
+                    errorBody.addComponent(true, content.retain());
+                }
+            } else if (content.readableBytes() > 0 && queueInputStream != null) {
+                // Feed bytes to the reader thread
+                // retain() because Netty releases the content ByteBuf after decode() returns
+                queueInputStream.offer(content.retain());
+            }
+
+            if (msg instanceof LastHttpContent) {
+                if (isGraphBinaryResponse()) {
+                    if (queueInputStream != null) {
+                        queueInputStream.signalEndOfStream();
+                        // Null out so any spurious content arriving between responses is dropped
+                        // rather than offered to the already-closed stream.
+                        queueInputStream = null;
+                    }
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                } else {
+                    // Non-GraphBinary error: parse the accumulated body and fire the sentinel
+                    handleNonGraphBinaryError();
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+        }
+        releaseErrorBody();
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+        }
+        releaseErrorBody();
+        super.exceptionCaught(ctx, cause);
+    }
+
+    private void handleNonGraphBinaryError() {
+        final ResultSet rs = pendingResultSet.get();
+        if (rs == null) return;
+
+        try {
+            if (errorBody != null && errorBody.readableBytes() > 0) {
+                final JsonNode node = mapper.readTree(errorBody.toString(CharsetUtil.UTF_8));
+                final String message = node.has("message") ? node.get("message").asText() : responseStatus.reasonPhrase();
+                rs.markError(new ResponseException(responseStatus, message));
+            } else {
+                rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase()));
+            }
+        } catch (Exception e) {
+            logger.debug("Failed to parse error response body as JSON", e);
+            rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase()));
+        } finally {
+            pendingResultSet.compareAndSet(rs, null);
+            releaseErrorBody();
+        }
+    }
+
+    private void resetState() {
+        // Clean up any leftover resources from a previous response on this connection
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+            queueInputStream = null;
+        }
+        releaseErrorBody();
+        bytesRead = 0;
+        responseStatus = null;
+        contentType = null;
+    }
+
+    private void releaseErrorBody() {
+        if (errorBody != null) {
+            errorBody.release();
+            errorBody = null;
+        }
+    }
+
+    private boolean isGraphBinaryResponse() {
+        return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType);
+    }
+
+    private static boolean isError(final HttpResponseStatus status) {
+        return !HttpResponseStatus.OK.equals(status);
+    }
+}
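For readers following the decoder logic above, here is a minimal, Netty-free model of its per-response routing. It is a sketch under stated assumptions, not driver code: StreamingRouter and its methods are hypothetical names, byte[] chunks stand in for ByteBufs, a List stands in for ByteBufQueueInputStream, and GRAPHBINARY_V4 is a placeholder string for SerTokens.MIME_GRAPHBINARY_V4. It captures two rules from the decoder: a response is streamed when its status is 200 or its content type is GraphBinary v4 (otherwise the body is aggregated as an error), and content arriving after the last chunk is dropped instead of being fed to an ended stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Netty-free model of the decoder's per-response routing; not driver API.
class StreamingRouter {
    // Placeholder value; in the driver the constant is SerTokens.MIME_GRAPHBINARY_V4.
    static final String GRAPHBINARY_V4 = "application/vnd.graphbinary-v4.0";

    private Integer status;                // null until a response head arrives
    private String contentType;
    private List<byte[]> stream;           // stands in for ByteBufQueueInputStream; null once ended
    private final List<byte[]> errorBody = new ArrayList<>();
    int sentinels;                         // how many end-of-response sentinels were emitted

    boolean isGraphBinary() {
        // Compare the numeric code rather than object identity.
        final boolean ok = status != null && status == 200;
        return ok || GRAPHBINARY_V4.equals(contentType);
    }

    void onResponse(final int status, final String contentType) {
        errorBody.clear();                 // drop leftovers from a previous response
        this.status = status;
        this.contentType = contentType;
        this.stream = new ArrayList<>();
    }

    void onContent(final byte[] chunk) {
        if (!isGraphBinary()) {
            errorBody.add(chunk);          // accumulate the error body for parsing at the end
        } else if (stream != null) {
            stream.add(chunk);             // feed the streaming reader
        }                                  // otherwise: stray content after the end is dropped
    }

    List<byte[]> onLast() {
        final List<byte[]> delivered = isGraphBinary() ? stream : new ArrayList<>(errorBody);
        stream = null;                     // later content must not reach the ended stream
        errorBody.clear();
        sentinels++;
        return delivered;
    }
}
```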
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
new file mode 100644
index 0000000000..757b2821cb
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop
+ * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader thread consumes them via
+ * standard InputStream reads.
+ */
+public class ByteBufQueueInputStream extends InputStream {
+
+    private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0);
+
+    private final BlockingQueue<ByteBuf> queue;
+    private ByteBuf current;
+    private volatile boolean eof;
+
+    public ByteBufQueueInputStream() {
+        this.queue = new LinkedBlockingQueue<>();
+    }
+
+    /**
+     * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed.
+     * The ByteBuf will be released after it is fully read. If the stream is already closed,
+     * the buffer is released immediately.
+     */
+    public void offer(final ByteBuf buf) {
+        if (eof) {
+            if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+                buf.release();
+            }
+            return;
+        }
+        queue.add(buf);
+    }
+
+    /**
+     * Signal that no more ByteBufs will be offered.
+     */
+    public void signalEndOfStream() {
+        queue.offer(END_OF_STREAM);
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (eof) return -1;
+
+        while (current == null || !current.isReadable()) {
+            releaseCurrent();
+            try {
+                current = queue.poll(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for data", e);
+            }
+            if (current == null) throw new IOException("Timed out waiting for streaming response data");
+            if (current == END_OF_STREAM) {
+                eof = true;
+                current = null;
+                return -1;
+            }
+        }
+        return current.readByte() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eof) return -1;
+        if (len == 0) return 0;
+
+        // Block until at least one byte is available, then return what we have (short read).
+        while (current == null || !current.isReadable()) {
+            releaseCurrent();
+            try {
+                current = queue.poll(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for data", e);
+            }
+            if (current == null) throw new IOException("Timed out waiting for streaming response data");
+            if (current == END_OF_STREAM) {
+                eof = true;
+                current = null;
+                return -1;
+            }
+        }
+        final int readable = Math.min(current.readableBytes(), len);
+        current.readBytes(b, off, readable);
+        return readable;
+    }
+
+    @Override
+    public void close() throws IOException {
+        eof = true;
+        releaseCurrent();
+        // drain and release any remaining buffers
+        ByteBuf buf;
+        while ((buf = queue.poll()) != null) {
+            if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+                buf.release();
+            }
+        }
+    }
+
+    private void releaseCurrent() {
+        if (current != null && current != END_OF_STREAM && current.refCnt() > 0) {
+            current.release();
+        }
+        current = null;
+    }
+
+}
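The queue-backed stream above can be illustrated with only the JDK. The following sketch is a simplified analogue, not the driver class: ChunkQueueInputStream is a hypothetical name, byte[] chunks replace ByteBufs (so there is no reference counting to manage), and the poll timeout is a constructor parameter rather than the fixed 30 seconds used above. As in ByteBufQueueInputStream, a sentinel compared by identity marks end of stream, and read(byte[], int, int) returns a short read of at most one chunk.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stdlib-only analogue of ByteBufQueueInputStream using byte[] chunks.
class ChunkQueueInputStream extends InputStream {
    private static final byte[] END = new byte[0];   // sentinel, compared by identity
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long timeoutMillis;
    private byte[] current;
    private int pos;
    private boolean eof;

    ChunkQueueInputStream(final long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Producer side (the event loop in the driver). Empty chunks are skipped.
    void offer(final byte[] chunk) {
        if (chunk.length > 0) queue.add(chunk);
    }

    void signalEndOfStream() {
        queue.add(END);
    }

    // Blocks until an unread chunk is available; returns false once the sentinel is seen.
    private boolean fill() throws IOException {
        while (!eof && (current == null || pos == current.length)) {
            final byte[] next;
            try {
                next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
            if (next == null) throw new IOException("Timed out waiting for data");
            if (next == END) {
                eof = true;
                current = null;
            } else {
                current = next;
                pos = 0;
            }
        }
        return !eof;
    }

    @Override
    public int read() throws IOException {
        return fill() ? current[pos++] & 0xFF : -1;
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (len == 0) return 0;
        if (!fill()) return -1;
        final int n = Math.min(len, current.length - pos);  // short read: at most one chunk
        System.arraycopy(current, pos, b, off, n);
        pos += n;
        return n;
    }
}
```

Like the original, the queue is unbounded, so the producing thread never blocks on offer; the trade-off is that nothing slows the network side down if the reader thread falls behind.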
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
new file mode 100644
index 0000000000..11491903e8
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.structure.io.binary.Marker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}.
+ * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure,
+ * pushing each result to the {@link ResultSet} as it is deserialized.
+ * <p>
+ * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]}
+ */
+public class GraphBinaryStreamResponseReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class);
+
+    private final Buffer buffer;
+    private final GraphBinaryReader reader;
+    private final ResultSet resultSet;
+    private final AtomicReference<ResultSet> pendingResultSet;
+
+    public GraphBinaryStreamResponseReader(final Buffer buffer,
+                                           final GraphBinaryReader reader,
+                                           final ResultSet resultSet,
+                                           final AtomicReference<ResultSet> pendingResultSet) {
+        this.buffer = buffer;
+        this.reader = reader;
+        this.resultSet = resultSet;
+        this.pendingResultSet = pendingResultSet;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Read header: version byte (MSB must be 1) and bulking flag
+            final byte version = buffer.readByte();
+            if ((version & 0x80) == 0) {
+                throw new RuntimeException("Invalid GraphBinary response version: " + version);
+            }
+            final boolean bulked = (buffer.readByte() & 1) == 1;
+
+            // Read items until EndOfStream marker
+            while (true) {
+                final Object obj = reader.read(buffer);
+                if (obj instanceof Marker) {
+                    break;
+                }
+
+                if (bulked) {
+                    final long bulk = reader.read(buffer);
+                    resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk)));
+                } else {
+                    resultSet.add(new Result(obj));
+                }
+            }
+
+            // Read footer: status code, nullable message, nullable exception
+            final int statusCode = reader.readValue(buffer, Integer.class, false);
+            final String message = reader.readValue(buffer, String.class, true);
+            final String exception = reader.readValue(buffer, String.class, true);
+
+            // A status code of 0 means success in GraphBinary v4: the server omits the HTTP status code
+            // from the binary footer when the response is successful.
+            if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) {
+                resultSet.markComplete();
+            } else {
+                resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception));
+            }
+        } catch (Throwable t) {
+            logger.warn("Error reading streaming response", t);
+            resultSet.markError(t);
+        } finally {
+            pendingResultSet.compareAndSet(resultSet, null);
+            buffer.release();
+        }
+    }
+}
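The control flow of GraphBinaryStreamResponseReader (header check, item loop until a terminator, then a footer whose status code of 0 or 200 means success) can be exercised with a toy encoding. To be clear, the format below is NOT GraphBinary: the 0x01 item type code, the 0xFD end marker and the int-only footer are hypothetical simplifications, and ToyStreamReader is an illustrative name. Only the header rules (version byte with the most significant bit set, bulking flag in bit 0) and the success test mirror the code above.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Toy, hypothetical frame format used only to illustrate the reader's control flow.
class ToyStreamReader {
    static final int ITEM_INT = 0x01;     // hypothetical type code for an int item
    static final int END_MARKER = 0xFD;   // hypothetical stand-in for the EndOfStream marker

    final List<long[]> results = new ArrayList<>();  // each entry: {value, bulk}
    int statusCode = -1;

    // Reads the header, then items until the end marker, then the footer; true on success.
    boolean read(final InputStream source) throws IOException {
        final DataInputStream in = new DataInputStream(source);
        final byte version = in.readByte();
        if ((version & 0x80) == 0) {
            throw new IOException("Invalid response version: " + version);
        }
        final boolean bulked = (in.readByte() & 1) == 1;

        while (true) {
            final int type = in.readUnsignedByte();
            if (type == END_MARKER) break;
            if (type != ITEM_INT) throw new IOException("Unknown type code: " + type);
            final int value = in.readInt();
            final long bulk = bulked ? in.readLong() : 1L;
            results.add(new long[]{value, bulk});    // handed off as soon as it is decoded
        }

        statusCode = in.readInt();
        return statusCode == 0 || statusCode == 200;  // 0 or 200 both mean success
    }
}
```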
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
new file mode 100644
index 0000000000..4490975f23
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A read-only {@link Buffer} implementation backed by a blocking {@link InputStream} via {@link DataInputStream}.
+ * Supports only sequential read operations; all write, random-access, and NIO methods throw
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * This allows the existing {@code TypeSerializer} implementations (which only use sequential reads) to work
+ * unchanged over a streaming HTTP response body.
+ */
+public class InputStreamBuffer implements Buffer {
+
+    private final DataInputStream in;
+    private int bytesRead;
+
+    public InputStreamBuffer(final InputStream inputStream) {
+        this.in = new DataInputStream(inputStream);
+    }
+
+    @Override
+    public boolean readBoolean() {
+        try {
+            final boolean v = in.readBoolean();
+            bytesRead += 1;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public byte readByte() {
+        try {
+            final byte v = in.readByte();
+            bytesRead += 1;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public short readShort() {
+        try {
+            final short v = in.readShort();
+            bytesRead += 2;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int readInt() {
+        try {
+            final int v = in.readInt();
+            bytesRead += 4;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public long readLong() {
+        try {
+            final long v = in.readLong();
+            bytesRead += 8;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public float readFloat() {
+        try {
+            final float v = in.readFloat();
+            bytesRead += 4;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public double readDouble() {
+        try {
+            final double v = in.readDouble();
+            bytesRead += 8;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final byte[] destination) {
+        try {
+            in.readFully(destination);
+            bytesRead += destination.length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final byte[] destination, final int dstIndex, final int length) {
+        try {
+            in.readFully(destination, dstIndex, length);
+            bytesRead += length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final ByteBuffer dst) {
+        try {
+            final byte[] tmp = new byte[dst.remaining()];
+            in.readFully(tmp);
+            dst.put(tmp);
+            bytesRead += tmp.length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final OutputStream out, final int length) throws IOException {
+        final byte[] tmp = new byte[length];
+        in.readFully(tmp);
+        out.write(tmp);
+        bytesRead += length;
+        return this;
+    }
+
+    @Override
+    public int readerIndex() {
+        return bytesRead;
+    }
+
+    @Override
+    public int readableBytes() {
+        throw new UnsupportedOperationException("readableBytes() is not supported on a streaming Buffer");
+    }
+
+    @Override
+    public Buffer readerIndex(final int readerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int writerIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writerIndex(final int writerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer markWriterIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer resetWriterIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int capacity() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDirect() {
+        return false;
+    }
+
+    @Override
+    public Buffer writeBoolean(final boolean value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeByte(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeShort(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeInt(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeLong(final long value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeFloat(final float value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeDouble(final double value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final byte[] src) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final ByteBuffer src) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final byte[] src, final int srcIndex, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean release() {
+        try {
+            in.close();
+        } catch (IOException e) {
+            // best-effort close
+        }
+        return true;
+    }
+
+    @Override
+    public Buffer retain() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int referenceCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int nioBufferCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(final int index, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(final int index, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer getBytes(final int index, final byte[] dst) {
+        throw new UnsupportedOperationException();
+    }
+}
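InputStreamBuffer depends on DataInputStream reading until it has enough bytes, so a value split across two HTTP chunks is reassembled transparently and a stream that ends too early surfaces as an EOFException. A minimal sketch of that property, assuming a SequenceInputStream as a stand-in for the chunk queue and a hypothetical SequentialReader class in place of the driver's Buffer interface. Unlike the original, it wraps IOException in UncheckedIOException rather than a plain RuntimeException, which tells callers the failure came from I/O.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical minimal analogue of InputStreamBuffer: sequential reads plus a byte counter.
class SequentialReader {
    private final DataInputStream in;
    private int bytesRead;   // plays the role of readerIndex()

    SequentialReader(final InputStream source) {
        this.in = new DataInputStream(source);
    }

    int readInt() {
        try {
            final int v = in.readInt();   // keeps reading until 4 bytes arrive, across chunks
            bytesRead += 4;
            return v;
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // Buffer methods do not declare IOException
        }
    }

    byte[] readBytes(final int length) {
        try {
            final byte[] dst = new byte[length];
            in.readFully(dst);            // EOFException if the stream ends early
            bytesRead += length;
            return dst;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int readerIndex() {
        return bytesRead;
    }
}
```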
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
new file mode 100644
index 0000000000..91ba9cf28a
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ByteBufQueueInputStreamTest {
+
+    @Test
+    public void shouldReadSingleByteBuf() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf = Unpooled.buffer();
+        buf.writeBytes(new byte[]{1, 2, 3, 4});
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        assertEquals(1, stream.read());
+        assertEquals(2, stream.read());
+        assertEquals(3, stream.read());
+        assertEquals(4, stream.read());
+        assertEquals(-1, stream.read());
+    }
+
+    @Test
+    public void shouldReadAcrossMultipleByteBufs() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2}));
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4}));
+        stream.signalEndOfStream();
+
+        final byte[] result = new byte[8];
+        int totalRead = 0;
+        int read;
+        while ((read = stream.read(result, totalRead, result.length - totalRead)) != -1) {
+            totalRead += read;
+        }
+        assertEquals(4, totalRead);
+        assertArrayEquals(new byte[]{1, 2, 3, 4}, java.util.Arrays.copyOf(result, totalRead));
+    }
+
+    @Test
+    public void shouldReleaseByteBufsAfterReading() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4);
+        buf.writeBytes(new byte[]{1, 2, 3, 4});
+        assertEquals(1, buf.refCnt());
+
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        final byte[] result = new byte[4];
+        stream.read(result, 0, 4);
+        stream.read(); // triggers release of buf and reads EOS
+
+        assertEquals(0, buf.refCnt());
+    }
+
+    @Test
+    public void shouldCleanUpOnClose() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2);
+        buf1.writeBytes(new byte[]{1, 2});
+        final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2);
+        buf2.writeBytes(new byte[]{3, 4});
+
+        stream.offer(buf1);
+        stream.offer(buf2);
+        stream.close();
+
+        assertEquals(0, buf1.refCnt());
+        assertEquals(0, buf2.refCnt());
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
new file mode 100644
index 0000000000..484ec7d524
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class GraphBinaryStreamResponseReaderTest {
+
+    private ExecutorService executor;
+    private GraphBinaryMessageSerializerV4 serializer;
+    private GraphBinaryReader reader;
+
+    @Before
+    public void setup() {
+        executor = Executors.newCachedThreadPool();
+        serializer = new GraphBinaryMessageSerializerV4();
+        reader = serializer.getMapper().getReader();
+    }
+
+    @After
+    public void teardown() {
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void shouldReadSingleItemResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList("hello")).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertEquals("hello", results.get(0).getString());
+        assertNull(pending.get());
+    }
+
+    @Test
+    public void shouldReadMultipleItemResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Arrays.asList(1, 2, 3)).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(3, results.size());
+        assertEquals(1, results.get(0).getInt());
+        assertEquals(2, results.get(1).getInt());
+        assertEquals(3, results.get(2).getInt());
+    }
+
+    @Test
+    public void shouldReadBulkedResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Arrays.asList("a", 3L)).bulked(true).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertTrue(results.get(0).getObject() instanceof DefaultRemoteTraverser);
+        final DefaultRemoteTraverser<?> traverser = (DefaultRemoteTraverser<?>) results.get(0).getObject();
+        assertEquals("a", traverser.get());
+        assertEquals(3L, traverser.bulk());
+    }
+
+    @Test
+    public void shouldHandleErrorFooter() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR)
+                        .statusMessage("Something went wrong")
+                        .exception("java.lang.RuntimeException")
+                        .result(Collections.emptyList()).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        assertTrue(rs.allItemsAvailable());
+        try {
+            rs.all().get();
+            fail("Expected exception");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode());
+            assertEquals("Something went wrong", re.getMessage());
+        }
+        assertNull(pending.get());
+    }
+
+    @Test
+    public void shouldReadDataSplitAcrossChunks() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        // Use a large string (~1000 chars) so the split point must land inside it regardless of
+        // header/footer overhead. The header+footer are ~30 bytes; the string item is ~1000 bytes.
+        final String largeValue = String.join("", Collections.nCopies(100, "abcdefghij"));
+        final ByteBuf fullPayload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(largeValue)).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final int splitPoint = fullPayload.readableBytes() / 2;
+        final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain();
+        final ByteBuf chunk2 = fullPayload.retain();
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(chunk1);
+        stream.offer(chunk2);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertEquals(largeValue, results.get(0).getString());
+
+        fullPayload.release();
+    }
+
+    @Test
+    public void shouldReadEmptyResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.emptyList()).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertTrue(results.isEmpty());
+        assertNull(pending.get());
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
new file mode 100644
index 0000000000..fdd37818a4
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+import static org.junit.Assert.*;
+
+public class HttpStreamingResponseHandlerTest {
+
+    private ExecutorService executor;
+    private GraphBinaryMessageSerializerV4 serializer;
+    private GraphBinaryReader reader;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadExecutor();
+        serializer = new GraphBinaryMessageSerializerV4();
+        reader = serializer.getMapper().getReader();
+    }
+
+    @After
+    public void teardown() {
+        executor.shutdownNow();
+    }
+
+    private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) {
+        final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler(
+                reader, pendingResultSet, executor, maxResponseContentLength);
+        return new EmbeddedChannel(handler);
+    }
+
+    @Test
+    public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        // Serialize a valid GraphBinary response
+        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(1)).create(),
+                channel.alloc()));
+
+        // Send HttpResponse with GraphBinary content type
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        // Send content
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+
+        // Send LastHttpContent
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        // Verify LAST_CONTENT_READ_RESPONSE is emitted
+        final Object out = channel.readInbound();
+        assertSame(LAST_CONTENT_READ_RESPONSE, out);
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(1)).create(),
+                channel.alloc()));
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        // Send a second LastHttpContent — should not throw NPE
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 10);
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        try {
+            // Send content exceeding the 10-byte limit
+            channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20])));
+            fail("Expected TooLongFrameException");
+        } catch (Exception e) {
+            assertTrue(e instanceof TooLongFrameException);
+        }
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        // Send some content but no LastHttpContent
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3})));
+
+        // Fire channelInactive — should not throw and should signal the stream
+        channel.pipeline().fireChannelInactive();
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        // Send a 500 response with JSON content type
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+        channel.writeInbound(response);
+
+        // Send JSON error body
+        final String errorJson = "{\"message\":\"test error\"}";
+        channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8)));
+
+        // Verify error is marked on the ResultSet
+        try {
+            rs.all().get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode());
+            assertEquals("test error", re.getMessage());
+        }
+
+        // Verify pendingResultSet was cleared
+        assertNull(pending.get());
+
+        channel.finishAndReleaseAll();
+    }
+
+    private byte[] toBytes(final io.netty.buffer.ByteBuf buf) {
+        final byte[] bytes = new byte[buf.readableBytes()];
+        buf.readBytes(bytes);
+        buf.release();
+        return bytes;
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
new file mode 100644
index 0000000000..02d029d66e
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class InputStreamBufferTest {
+
+    @Test
+    public void shouldReadPrimitivesThroughInputStreamBuffer() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final io.netty.buffer.ByteBuf buf = Unpooled.buffer();
+        buf.writeByte(42);
+        buf.writeInt(12345);
+        buf.writeLong(9876543210L);
+        buf.writeFloat(3.14f);
+        buf.writeDouble(2.718281828);
+        buf.writeShort(256);
+        buf.writeBoolean(true);
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        assertEquals(42, buffer.readByte());
+        assertEquals(12345, buffer.readInt());
+        assertEquals(9876543210L, buffer.readLong());
+        assertEquals(3.14f, buffer.readFloat(), 0.001f);
+        assertEquals(2.718281828, buffer.readDouble(), 0.000001);
+        assertEquals(256, buffer.readShort());
+        assertTrue(buffer.readBoolean());
+    }
+
+    @Test
+    public void shouldReadBytesArray() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30}));
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        final byte[] dest = new byte[3];
+        buffer.readBytes(dest);
+        assertArrayEquals(new byte[]{10, 20, 30}, dest);
+    }
+
+    @Test
+    public void shouldTrackReaderIndex() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}));
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        assertEquals(0, buffer.readerIndex());
+        buffer.readByte();
+        assertEquals(1, buffer.readerIndex());
+        buffer.readInt();
+        assertEquals(5, buffer.readerIndex());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnReadableBytes() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnWriteInt() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnNioBuffer() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer();
+    }
+}
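The BlockingQueue-backed InputStream described in the commit message, and exercised by the tests above, can be sketched with JDK types only. This is a simplified, hypothetical analogue of ByteBufQueueInputStream rather than its actual implementation: it queues byte[] chunks instead of Netty ByteBufs, and the class name ChunkQueueInputStream and the identity-compared end-of-stream sentinel are assumptions made for illustration. It shows the key property the streaming reader relies on: the reader thread blocks in read() until the I/O thread offers the next chunk, so a value split across two HTTP chunks is reassembled transparently.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified analogue of ByteBufQueueInputStream using byte[] chunks instead of
// Netty ByteBufs. An I/O thread calls offer()/signalEndOfStream(); a reader thread blocks in read().
final class ChunkQueueInputStream extends InputStream {
    // Sentinel marking end of stream; compared by identity, so ordinary empty chunks are allowed.
    private static final byte[] END_OF_STREAM = new byte[0];

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private byte[] current; // chunk currently being consumed (reader thread only)
    private int position;   // next unread index within current
    private boolean ended;

    void offer(final byte[] chunk) {
        queue.add(chunk);
    }

    void signalEndOfStream() {
        queue.add(END_OF_STREAM);
    }

    @Override
    public int read() throws IOException {
        if (!awaitData()) {
            return -1;
        }
        return current[position++] & 0xFF;
    }

    @Override
    public int read(final byte[] dest, final int off, final int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (!awaitData()) {
            return -1;
        }
        // Return at most what is left in the current chunk; callers such as
        // DataInputStream.readFully() loop until they have enough bytes.
        final int n = Math.min(len, current.length - position);
        System.arraycopy(current, position, dest, off, n);
        position += n;
        return n;
    }

    // Blocks until the current chunk has unread bytes; returns false once the stream has ended.
    private boolean awaitData() throws IOException {
        while (!ended && (current == null || position >= current.length)) {
            try {
                final byte[] next = queue.take();
                if (next == END_OF_STREAM) {
                    ended = true;
                } else {
                    current = next;
                    position = 0;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for the next chunk", e);
            }
        }
        return !ended;
    }

    public static void main(final String[] args) throws Exception {
        final ChunkQueueInputStream stream = new ChunkQueueInputStream();
        // Simulated I/O thread: the int 12345 (0x00003039) arrives split across two chunks.
        final Thread producer = new Thread(() -> {
            stream.offer(new byte[]{0x00, 0x00});
            stream.offer(new byte[]{0x30, 0x39});
            stream.signalEndOfStream();
        });
        producer.start();

        final DataInputStream in = new DataInputStream(stream);
        System.out.println(in.readInt()); // 12345
        System.out.println(in.read());    // -1 (end of stream)
        producer.join();
    }
}
```

Wrapping the stream in java.io.DataInputStream gives readInt()/readLong() semantics across chunk boundaries, which is roughly the role the commit message assigns to InputStreamBuffer when it adapts the stream to GraphBinary's Buffer interface.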
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 7c474d6ba0..f0cb2b9ea2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -275,7 +275,11 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
                 // failures that follow this will show up in the response body instead.
                 sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new));
                 sendHttpContents(ctx, requestCtx);
-                sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+                // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk).
+                // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections.
+                if (requestCtx.getRequestState() != RequestState.ERROR) {
+                    sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+                }
             } catch (Throwable t) {
                 writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1());
             } finally {
@@ -753,19 +757,21 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
                         ctx.setRequestState(STREAMING);
                         return serializer.writeHeader(responseMessage, nettyContext.alloc());
                     }
-                    ctx.setRequestState(FINISHED);
 
-                    return serializer.serializeResponseAsBinary(ResponseMessage.build()
+                    final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build()
                             .result(aggregate)
                             .bulked(bulking)
                             .code(HttpResponseStatus.OK)
                             .create(), nettyContext.alloc());
+                    ctx.setRequestState(FINISHED);
+                    return fullResponse;
 
                 case STREAMING:
                     return serializer.writeChunk(aggregate, nettyContext.alloc());
                 case FINISHING:
+                    final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc());
                     ctx.setRequestState(FINISHED);
-                    return serializer.writeFooter(responseMessage, nettyContext.alloc());
+                    return footer;
             }
 
             return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc());
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index f0f6b652c0..4fa5ba84ec 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -97,6 +97,13 @@ public class HttpHandlerUtil {
      * @param serializer        The serializer to use to serialize the error response.
      */
     static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer<?> serializer) {
+        // Prevent writing after the response is already terminated. A second write would corrupt
+        // HTTP framing on keep-alive connections, poisoning them for subsequent requests.
+        if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR ||
+            context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) {
+            return;
+        }
+
         try {
             final ChannelHandlerContext ctx = context.getChannelHandlerContext();
             final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING
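The two server-side fixes above interact: writeError() now returns early once the state is ERROR or FINISHED, so makeChunk() must not set FINISHED until serialization has actually succeeded, and the normal path must skip its own LastHttpContent when writeError() has already terminated the response. The following is a hypothetical, Netty-free model of that lifecycle; the ResponseLifecycle class, its string "frames", and the "LAST" marker standing in for LastHttpContent are illustrative assumptions, not Gremlin Server API. It checks that each path emits exactly one terminator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, Netty-free model of the server response lifecycle. "Frames" are plain strings and
// "LAST" stands in for LastHttpContent; none of these names are Gremlin Server API.
final class ResponseLifecycle {
    enum State { STREAMING, ERROR, FINISHED }

    final List<String> frames = new ArrayList<>();
    State state = State.STREAMING;

    // Like the FINISHING branch of makeChunk() after this commit: serialize first, then mark
    // FINISHED, so a serializer failure leaves the state untouched for writeError().
    void writeFooter(final Supplier<String> footerSerializer) {
        final String footer = footerSerializer.get(); // may throw
        state = State.FINISHED;
        frames.add(footer);
    }

    // Like the guarded writeError(): write an error body plus terminator at most once.
    void writeError(final String message) {
        if (state == State.ERROR || state == State.FINISHED) {
            return; // response already terminated; writing again would corrupt the framing
        }
        frames.add("error:" + message);
        frames.add("LAST");
        state = State.ERROR;
    }

    // Like the guarded sendLastHttpContent() call on the normal path.
    void finish() {
        if (state != State.ERROR) {
            frames.add("LAST");
        }
    }

    long terminatorCount() {
        return frames.stream().filter("LAST"::equals).count();
    }

    public static void main(final String[] args) {
        final ResponseLifecycle ok = new ResponseLifecycle();
        ok.writeFooter(() -> "footer");
        ok.finish();
        System.out.println(ok.frames); // [footer, LAST]

        final ResponseLifecycle failed = new ResponseLifecycle();
        try {
            failed.writeFooter(() -> { throw new IllegalStateException("serialization failed"); });
        } catch (IllegalStateException e) {
            failed.writeError(e.getMessage());
        }
        failed.writeError("late error"); // ignored by the guard
        failed.finish();                 // skipped because the state is ERROR
        System.out.println(failed.frames); // [error:serialization failed, LAST]
    }
}
```

In this model, if writeFooter() set FINISHED before calling the serializer (the pre-commit ordering), a failing serializer would leave the state FINISHED, the guarded writeError() would return without writing, and the client would see a bare terminator with no error details.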
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
new file mode 100644
index 0000000000..708a63e13f
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for streaming HTTP response support in the Java driver.
+ * Verifies that the streaming pipeline (HttpStreamingResponseHandler + GraphBinaryStreamResponseReader)
+ * works correctly end-to-end against a real Gremlin Server.
+ */
+public class StreamingResponseIntegrateTest extends AbstractGremlinServerIntegrationTest {
+
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        return settings;
+    }
+
+    @Test
+    public void shouldStreamBasicResults() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final List<Result> results = client.submit("g.inject(1,2,3)").all().get();
+            assertEquals(3, results.size());
+            assertEquals(1, results.get(0).getInt());
+            assertEquals(2, results.get(1).getInt());
+            assertEquals(3, results.get(2).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamIncrementallyWithIterator() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final ResultSet rs = client.submit("g.inject(1,2,3,4,5)");
+
+            // Consume results one at a time via iterator
+            final Iterator<Result> iter = rs.iterator();
+            final List<Integer> collected = new ArrayList<>();
+            while (iter.hasNext()) {
+                collected.add(iter.next().getInt());
+            }
+            assertEquals(5, collected.size());
+            for (int i = 0; i < 5; i++) {
+                assertEquals(i + 1, (int) collected.get(i));
+            }
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamIncrementallyWithOne() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final ResultSet rs = client.submit("g.inject(10,20,30)");
+
+            assertEquals(10, rs.one().getInt());
+            assertEquals(20, rs.one().getInt());
+            assertEquals(30, rs.one().getInt());
+            assertNull(rs.one());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamLargeResultSet() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            // Generate a large result set that would span multiple HTTP chunks
+            final List<Result> results = client.submit(
+                    "g.inject(1).repeat(__.identity()).times(1000).emit()").all().get();
+            assertEquals(1000, results.size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamEmptyResponse() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final List<Result> results = client.submit("g.V().hasLabel('nonexistent')").all().get();
+            assertTrue(results.isEmpty());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldHandleServerErrorDuringStreaming() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            client.submit("invalid_script_that_should_fail").all().get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            // Error should propagate correctly through the streaming pipeline
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.BAD_REQUEST, re.getResponseStatusCode());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldReuseConnectionAfterStreamingComplete() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // First request
+            final List<Result> results1 = client.submit("g.inject(1)").all().get();
+            assertEquals(1, results1.size());
+
+            // Second request on same client (should reuse connection)
+            final List<Result> results2 = client.submit("g.inject(2)").all().get();
+            assertEquals(1, results2.size());
+            assertEquals(2, results2.get(0).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldHandleConcurrentStreamingRequests() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            final CompletableFuture<List<Result>> f1 = client.submit("g.inject(1,2,3)").all();
+            final CompletableFuture<List<Result>> f2 = client.submit("g.inject(4,5,6)").all();
+            final CompletableFuture<List<Result>> f3 = client.submit("g.inject(7,8,9)").all();
+
+            assertEquals(3, f1.get().size());
+            assertEquals(3, f2.get().size());
+            assertEquals(3, f3.get().size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamWithTraversalApi() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final GraphTraversalSource g = traversal().with(
+                    DriverRemoteConnection.using(cluster));
+
+            final List<Integer> results = g.inject(1, 2, 3).toList();
+            assertEquals(3, results.size());
+            assertTrue(results.contains(1));
+            assertTrue(results.contains(2));
+            assertTrue(results.contains(3));
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamVerticesFromGraph() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // Inject values rather than relying on a pre-loaded graph so the test does not depend on server-side data
+            final List<Result> results = client.submit("g.inject(1,2,3,4,5,6)").all().get();
+            assertEquals(6, results.size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldReuseConnectionAfterServerError() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // Submit a request that causes a server error
+            try {
+                client.submit("throw new RuntimeException('test error')").all().get();
+                fail("Should have thrown");
+            } catch (ExecutionException e) {
+                // expected
+            }
+
+            // Connection should still be usable
+            final List<Result> results = client.submit("g.inject(1)").all().get();
+            assertEquals(1, results.get(0).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+}
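The tests above exercise the streaming pipeline end to end. For readers unfamiliar with the underlying pattern (HTTP content chunks handed to a BlockingQueue-backed InputStream that a separate reader thread drains), here is a minimal, self-contained sketch. The class name QueueInputStream and its offer()/signalEndOfStream() methods are hypothetical, loosely modeled on the names in the commit message; this is an illustration of the technique, not the driver's actual implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an InputStream fed with byte[] chunks by a producer thread
// (for example a network event loop) and read by a separate consumer thread.
class QueueInputStream extends InputStream {
    // Zero-length sentinel marking end of stream; BlockingQueue does not accept null.
    private static final byte[] EOS = new byte[0];

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    // The fields below are touched only by the consumer (reader) thread.
    private byte[] current = null;
    private int pos = 0;
    private boolean finished = false;

    /** Producer side: enqueue a copy of one content chunk. Empty chunks are ignored. */
    public void offer(byte[] chunk) {
        if (chunk.length > 0) {
            queue.add(chunk.clone()); // copy so the producer may reuse its buffer
        }
    }

    /** Producer side: call once after the last chunk has been offered. */
    public void signalEndOfStream() {
        queue.add(EOS);
    }

    @Override
    public int read() throws IOException {
        if (!ensureData()) {
            return -1;
        }
        return current[pos++] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        Objects.checkFromIndexSize(off, len, b.length);
        if (len == 0) {
            return 0;
        }
        if (!ensureData()) {
            return -1;
        }
        final int n = Math.min(len, current.length - pos);
        System.arraycopy(current, pos, b, off, n);
        pos += n;
        return n;
    }

    // Blocks until unread bytes are available; returns false once the sentinel is seen.
    private boolean ensureData() throws IOException {
        while (!finished && (current == null || pos >= current.length)) {
            try {
                final byte[] next = queue.take();
                if (next == EOS) {
                    finished = true;
                    current = null;
                } else {
                    current = next;
                    pos = 0;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
        }
        return !finished;
    }
}
```

Because the consumer blocks in take() until the next chunk arrives, a deserializer reading from this stream can emit each item as soon as its bytes are present instead of waiting for the whole response. Offering only non-empty arrays keeps the zero-length sentinel unambiguous.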

