This is an automated email from the ASF dual-hosted git repository.

Cole-Greer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/master by this push:
     new c11d71b30d Add streaming HTTP response support to gremlin-driver (#3419)
c11d71b30d is described below

commit c11d71b30d444f0a5fbe90b924ad0a9b4216dca2
Author: Cole Greer <[email protected]>
AuthorDate: Tue May 19 16:59:33 2026 -0700

    Add streaming HTTP response support to gremlin-driver (#3419)
    
    Adds streaming GraphBinary response deserialization to the Java driver.
    Instead of buffering the entire HTTP response body before processing,
    results are now delivered to the ResultSet as they arrive from the server.
    This reduces time-to-first-result for large result sets without requiring
    application code changes.
    
    Design
    ------
    
    The streaming pipeline replaces HttpObjectAggregator + HttpGremlinResponseDecoder
    with a single HttpStreamingResponseHandler that feeds HTTP content chunks to a
    reader thread via a BlockingQueue<ByteBuf>. The reader deserializes GraphBinary
    items one at a time using an InputStream-backed Buffer adapter, allowing existing
    TypeSerializer implementations to work unchanged.
    
    New classes:
    - HttpStreamingResponseHandler: Netty handler orchestrating the streaming lifecycle
    - ByteBufQueueInputStream: bridges event loop to reader thread via blocking queue
    - InputStreamBuffer: read-only Buffer over InputStream for TypeSerializers
    - GraphBinaryStreamResponseReader: pull-based deserializer on a dedicated thread
    
    Non-GraphBinary serializers automatically fall back to the buffered pipeline.
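
The hand-off described in the Design section can be sketched as a queue-backed InputStream. This is a minimal illustration, not the driver's code: ChunkQueueInputStream and StreamingBridgeDemo are hypothetical names, byte[] chunks stand in for Netty ByteBufs, and a plain int decoder stands in for GraphBinary deserialization.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for ByteBufQueueInputStream: byte[] chunks replace Netty ByteBufs.
class ChunkQueueInputStream extends InputStream {
    private static final byte[] END = new byte[0]; // end-of-stream sentinel, compared by identity
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private byte[] current = new byte[0];
    private int pos;
    private boolean done;

    // Called by the producer (the event loop in the real driver).
    void offer(final byte[] chunk) { queue.add(chunk); }
    void signalEndOfStream() { queue.add(END); }

    // Called by the reader thread; blocks until a byte or end-of-stream is available.
    @Override
    public int read() throws IOException {
        while (pos == current.length) {
            if (done) return -1;
            try {
                final byte[] next = queue.take();
                if (next == END) { done = true; return -1; }
                current = next;
                pos = 0;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for data", e);
            }
        }
        return current[pos++] & 0xFF;
    }
}

class StreamingBridgeDemo {
    // Encodes `count` ints, feeds them in 3-byte chunks from a producer thread,
    // and decodes them one at a time on the calling thread.
    static int[] readInts(final int count) {
        final ByteBuffer buf = ByteBuffer.allocate(4 * count);
        for (int i = 0; i < count; i++) buf.putInt(i * 10);
        final byte[] all = buf.array();

        final ChunkQueueInputStream in = new ChunkQueueInputStream();
        final Thread producer = new Thread(() -> {
            for (int off = 0; off < all.length; off += 3) {
                in.offer(Arrays.copyOfRange(all, off, Math.min(off + 3, all.length)));
            }
            in.signalEndOfStream();
        });
        producer.start();

        try (DataInputStream data = new DataInputStream(in)) {
            final int[] out = new int[count];
            for (int i = 0; i < count; i++) out[i] = data.readInt(); // items may span chunk boundaries
            producer.join();
            return out;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(Arrays.toString(readInts(4))); // prints [0, 10, 20, 30]
    }
}
```

Because the decoder only sees an InputStream, it does not need to know where chunk boundaries fall, which is the property that lets existing serializers run unchanged.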
    
    Connection lifecycle
    --------------------
    
    Connection pool return is driven by wire-level completion (LAST_CONTENT_READ_RESPONSE),
    not application-level completion (ResultSet.markComplete). This ensures all HTTP bytes
    are consumed before the connection is reused, preventing framing corruption.
    An AtomicBoolean guard makes returnToPool() idempotent across concurrent error paths.
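
The idempotence guard can be illustrated in isolation. The sketch below assumes a hypothetical ReturnGuardSketch class with a counter in place of the real pool; only the compareAndSet guard and the reset-on-borrow step mirror what the commit describes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a pooled connection; the counter stands in for pool.returnConnection().
class ReturnGuardSketch {
    private final AtomicBoolean returned = new AtomicBoolean(false);
    private final AtomicInteger poolReturns = new AtomicInteger();

    // Called when the connection is borrowed again, re-arming the guard.
    void resetReturned() { returned.set(false); }

    // Only the first caller per borrow cycle gets past the guard.
    void returnToPool() {
        if (!returned.compareAndSet(false, true)) return;
        poolReturns.incrementAndGet();
    }

    int poolReturns() { return poolReturns.get(); }

    // Releases `threads` racing callers at once and reports how many returns reached the pool.
    static int raceReturns(final ReturnGuardSketch conn, final int threads) {
        final CountDownLatch start = new CountDownLatch(1);
        final Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    conn.returnToPool();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (final Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return conn.poolReturns();
    }

    public static void main(final String[] args) {
        final ReturnGuardSketch conn = new ReturnGuardSketch();
        System.out.println(raceReturns(conn, 8)); // prints 1
        conn.resetReturned();                     // next borrow cycle
        conn.returnToPool();
        System.out.println(conn.poolReturns());   // prints 2
    }
}
```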
    
    Error handling races between the event loop and reader thread are resolved by marking
    errors on the ResultSet before signaling end-of-stream; CompletableFuture's
    single-completion semantics guarantee the correct error surfaces to the caller.
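
The single-completion behavior this relies on is standard CompletableFuture semantics and can be checked directly. The sketch assumes, for illustration only, that marking an error on a ResultSet amounts to calling completeExceptionally on a future; the class name and the error messages are hypothetical.

```java
import java.io.EOFException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FirstErrorWins {
    // Completes the same future twice and reports which error a caller would observe.
    static String surfacedError() {
        final CompletableFuture<Void> readCompleted = new CompletableFuture<>();

        // Event loop: records the real cause first...
        final boolean first = readCompleted.completeExceptionally(
                new IllegalStateException("Response entity too large"));
        // ...then the reader thread hits the closed stream and tries to report its own error.
        final boolean second = readCompleted.completeExceptionally(
                new EOFException("stream closed"));

        try {
            readCompleted.join();
            return "no error";
        } catch (CompletionException e) {
            return e.getCause().getMessage() + " first=" + first + " second=" + second;
        }
    }

    public static void main(final String[] args) {
        // prints: Response entity too large first=true second=false
        System.out.println(surfacedError());
    }
}
```

The second completion attempt returns false and is discarded, so ordering the "mark error" call before the end-of-stream signal is enough to decide which error wins.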
    
    Server-side fixes
    -----------------
    
    Prevents connection corruption from double-LastHttpContent when writeError() is
    called after response streaming has already terminated. Guards writeError() against
    writes in FINISHED/ERROR state and defers state transitions until after serialization.
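
The server-side handler code is not part of this excerpt, so the following is only a hypothetical model of the guard described above. ResponseStateSketch, its STREAMING state, and the string "wire" are invented for illustration; only the FINISHED/ERROR guard and the serialize-then-transition ordering come from the commit message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a response writer; strings stand in for HTTP content frames.
class ResponseStateSketch {
    enum State { STREAMING, FINISHED, ERROR }

    private State state = State.STREAMING;
    private final List<String> wire = new ArrayList<>();

    // Normal termination: writes the single terminating frame.
    void finish() {
        if (state != State.STREAMING) return;
        wire.add("LAST");
        state = State.FINISHED;
    }

    // Returns false, writing nothing, if the response has already terminated.
    boolean writeError(final String message) {
        if (state == State.FINISHED || state == State.ERROR) return false;
        final String body = serialize(message); // may throw; state is still STREAMING if it does
        wire.add(body);
        wire.add("LAST");
        state = State.ERROR;                    // transition only after serialization succeeded
        return true;
    }

    private static String serialize(final String message) {
        if (message == null) throw new IllegalArgumentException("nothing to serialize");
        return "ERROR:" + message;
    }

    // Number of terminating frames written; more than one would corrupt HTTP framing.
    long terminators() { return wire.stream().filter("LAST"::equals).count(); }

    public static void main(final String[] args) {
        final ResponseStateSketch response = new ResponseStateSketch();
        response.finish();
        System.out.println(response.writeError("late failure")); // prints false
        System.out.println(response.terminators());              // prints 1
    }
}
```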
---
 CHANGELOG.asciidoc                                 |   1 +
 docs/src/upgrade/release-4.x.x.asciidoc            |   9 +
 .../tinkerpop/gremlin/driver/Channelizer.java      |  44 ++-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  18 ++
 .../tinkerpop/gremlin/driver/Connection.java       |  23 +-
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |   1 +
 .../driver/handler/GremlinResponseHandler.java     |  21 +-
 .../handler/HttpStreamingResponseHandler.java      | 242 ++++++++++++++++
 .../driver/stream/ByteBufQueueInputStream.java     | 137 +++++++++
 .../stream/GraphBinaryStreamResponseReader.java    | 105 +++++++
 .../gremlin/driver/stream/InputStreamBuffer.java   | 311 +++++++++++++++++++++
 .../handler/ByteBufQueueInputStreamTest.java       |  95 +++++++
 .../GraphBinaryStreamResponseReaderTest.java       | 226 +++++++++++++++
 .../handler/HttpStreamingResponseHandlerTest.java  | 207 ++++++++++++++
 .../driver/handler/InputStreamBufferTest.java      |  94 +++++++
 .../server/handler/HttpGremlinEndpointHandler.java |  14 +-
 .../gremlin/server/handler/HttpHandlerUtil.java    |   7 +
 .../server/StreamingResponseIntegrateTest.java     | 239 ++++++++++++++++
 18 files changed, 1766 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index ec09926402..284bbcca26 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result iteration, providing API parity with `next(n)` in the Java, Python, and .NET GLVs, and updated the Go translators in `gremlin-core` and `gremlin-javascript` to emit `NextN(n)` for the batched form.
 * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python.
 * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`.
+* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary.
 * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response.
 * Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption.
 * Removed `readable-stream` dependency from `gremlin-javascript`.
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc
index 2df3fbc987..960c090f5d 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -277,6 +277,15 @@ try {
 The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they
 block on the async stream internally.
 
+==== Streaming Response Deserialization in gremlin-driver
+
+The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive
+rather than buffering the entire response. This reduces time-to-first-result for large result sets.
+
+This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer.
+Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response
+before deserialization. The `ResultSet` API is unchanged.
+
 ==== More Secure Gremlin Server
 
 Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization,
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 427f76c056..59b9d02365 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -34,9 +34,13 @@ import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 
 import java.util.Collections;
 import java.util.Optional;
@@ -92,7 +96,7 @@ public interface Channelizer extends ChannelHandler {
         protected Connection connection;
         protected Cluster cluster;
         protected SslHandler sslHandler;
-        private AtomicReference<ResultSet> pending;
+        protected AtomicReference<ResultSet> pending;
 
         protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
         protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler";
@@ -158,7 +162,6 @@ public interface Channelizer extends ChannelHandler {
             }
 
             configure(pipeline);
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending));
         }
 
         @Override
@@ -187,7 +190,9 @@ public interface Channelizer extends ChannelHandler {
                 ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create();
 
         private HttpGremlinRequestEncoder gremlinRequestEncoder;
+        private HttpStreamingResponseHandler streamingResponseHandler;
         private HttpGremlinResponseDecoder gremlinResponseDecoder;
+        private boolean useStreaming;
 
         private HttpContentDecompressionHandler httpCompressionDecoder;
         private IdleStateHandler idleStateHandler;
@@ -200,7 +205,19 @@ public interface Channelizer extends ChannelHandler {
             httpCompressionDecoder = new HttpContentDecompressionHandler();
             gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(),
                     cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri());
-            gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());
+
+            final MessageSerializer<?> serializer = cluster.getSerializer();
+            if (serializer instanceof GraphBinaryMessageSerializerV4) {
+                useStreaming = true;
+                final GraphBinaryReader graphBinaryReader =
+                        ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader();
+                streamingResponseHandler = new HttpStreamingResponseHandler(
+                        graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+            } else {
+                useStreaming = false;
+                gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer);
+            }
+
             if (cluster.getIdleConnectionTimeout() > 0) {
                 final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000);
                 idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0);
@@ -240,11 +257,22 @@ public interface Channelizer extends ChannelHandler {
                     DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false);
 
             pipeline.addLast(PIPELINE_HTTP_CODEC, handler);
-            pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
-                    ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
-            pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
-            pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
-            pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
+            if (useStreaming) {
+                pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
+                pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
+                pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+            } else {
+                pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
+                        ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
+                pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+                pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
+                pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
+            }
+
+            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> {
+                connection.returnToPool();
+                connection.tryShutdown();
+            }, useStreaming));
         }
     }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index cac193814c..d4a3967f02 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -70,8 +70,11 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -383,6 +386,10 @@ public final class Cluster {
         return manager.connectionScheduler;
     }
 
+    ExecutorService streamingReaderPool() {
+        return manager.streamingReaderPool;
+    }
+
     Settings.ConnectionPoolSettings connectionPoolSettings() {
         return manager.connectionPoolSettings;
     }
@@ -956,6 +963,12 @@ public final class Cluster {
          */
         private final ScheduledThreadPoolExecutor connectionScheduler;
 
+        /**
+         * Cached thread pool for streaming response reader threads. One thread per active streaming response,
+         * bounded implicitly by the connection pool size.
+         */
+        private final ExecutorService streamingReaderPool;
+
         private final int nioPoolSize;
         private final int workerPoolSize;
         private final int port;
@@ -1023,6 +1036,10 @@ public final class Cluster {
             this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                     new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
 
+            this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
+                    60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
+                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());
+
             validationRequest = () -> RequestMessage.build(builder.validationRequest);
         }
 
@@ -1133,6 +1150,7 @@ public final class Cluster {
                 executor.shutdown();
                 hostScheduler.shutdown();
                 connectionScheduler.shutdown();
+                streamingReaderPool.shutdownNow();
                 closeIt.complete(null);
             });
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index cb94b39b87..12a1de646c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -66,6 +66,14 @@ final class Connection {
      * Is a {@code Connection} borrowed from the pool.
      */
     private final AtomicBoolean isBorrowed = new AtomicBoolean(false);
+    /**
+     * Prevents returnToPool() from being called more than once per borrow cycle.
+     */
+    private final AtomicBoolean returned = new AtomicBoolean(false);
+
+    void resetReturned() {
+        returned.set(false);
+    }
     /**
      * This boolean guards the replace of the connection and ensures that it only occurs once.
      */
@@ -201,7 +209,7 @@ final class Connection {
                     } else {
                         final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host);
 
-                        resultSet.getReadCompleted().whenCompleteAsync((v, t) -> {
+                        resultSet.getReadCompleted().whenComplete((v, t) -> {
                             if (t != null) {
                                 // the callback for when the read failed. a failed read means the request went to the server
                                 // and came back with a server-side error of some sort.  it means the server is responsive
@@ -209,17 +217,13 @@ final class Connection {
                                 // write operation.
                                 logger.debug("Error while processing request on the server {}.", this, t);
                                 handleConnectionCleanupOnError(thisConnection);
-                            } else {
-                                // the callback for when the read was successful, meaning that ResultSet.markComplete()
-                                // was called
-                                thisConnection.returnToPool();
                             }
                             // While this request was in process, close might have been signaled in closeAsync().
                             // However, close would be blocked until all pending requests are completed. Attempt
                             // the shutdown if the returned result cleared up the last pending message and unblocked
                             // the close.
                             tryShutdown();
-                        }, cluster.executor());
+                        });
 
                         pending.set(resultSet);
 
@@ -234,7 +238,8 @@ final class Connection {
         return requestPromise;
     }
 
-    private void returnToPool() {
+    void returnToPool() {
+        if (!returned.compareAndSet(false, true)) return;
         try {
             if (pool != null) pool.returnConnection(this);
         } catch (ConnectionException ce) {
@@ -244,7 +249,7 @@ final class Connection {
     }
 
     private void handleConnectionCleanupOnError(final Connection thisConnection) {
-        if (thisConnection.isDead()) {
+        if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) {
             if (pool != null) pool.replaceConnection(thisConnection);
         } else {
             thisConnection.returnToPool();
@@ -259,7 +264,7 @@ final class Connection {
      * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the
      * shutdown if the returned result cleared up the last pending message.
      */
-    private void tryShutdown() {
+    void tryShutdown() {
         if (isClosing() && isOkToClose())
             shutdown(closeFuture.get());
     }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee4c34c5aa..4f40b6733a 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -525,6 +525,7 @@ final class ConnectionPool {
         while (head != null) {
             // try to borrow connection
             if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) {
+                head.resetReturned();
                 available = head;
                 break;
             }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index d4cfb167d2..94898783eb 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response
     private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
     private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException");
     private final AtomicReference<ResultSet> pendingResultSet;
+    private final Runnable onResponseComplete;
+    private final boolean streaming;
 
-    public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
+    public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) {
         this.pendingResultSet = pending;
+        this.onResponseComplete = onResponseComplete;
+        this.streaming = streaming;
     }
 
     @Override
@@ -100,14 +104,17 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response
 
         // Stream is done when the last content signaling response message is read.
         if (LAST_CONTENT_READ_RESPONSE == response) {
-            final ResultSet rs = pendingResultSet.getAndSet(null);
-            if (rs != null) {
-                if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
-                    rs.markComplete();
-                } else {
-                    rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+            if (!streaming) {
+                final ResultSet rs = pendingResultSet.getAndSet(null);
+                if (rs != null) {
+                    if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
+                        rs.markComplete();
+                    } else {
+                        rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+                    }
                 }
             }
+            onResponseComplete.run();
         }
     }
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
new file mode 100644
index 0000000000..607fd6bdf9
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+
+/**
+ * Decodes chunked HTTP responses into streaming results without buffering the full response body.
+ * <p>
+ * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a
+ * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally,
+ * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses
+ * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then
+ * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process.
+ */
+public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpObject> {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    private final GraphBinaryReader graphBinaryReader;
+    private final AtomicReference<ResultSet> pendingResultSet;
+    private final ExecutorService readerPool;
+    private final long maxResponseContentLength;
+
+    // Mutable state below is accessed exclusively from the channel's event loop thread.
+    private HttpResponseStatus responseStatus;
+    private String contentType;
+    private long bytesRead;
+    private ByteBufQueueInputStream queueInputStream;
+    private CompositeByteBuf errorBody;
+
+    public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader,
+                                        final AtomicReference<ResultSet> pendingResultSet,
+                                        final ExecutorService readerPool,
+                                        final long maxResponseContentLength) {
+        this.graphBinaryReader = graphBinaryReader;
+        this.pendingResultSet = pendingResultSet;
+        this.readerPool = readerPool;
+        this.maxResponseContentLength = maxResponseContentLength;
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final HttpObject msg,
+                          final List<Object> out) throws Exception {
+        if (msg instanceof HttpResponse) {
+            final HttpResponse resp = (HttpResponse) msg;
+
+            // Reset mutable state for the new response cycle to prevent stale state from a previous
+            // response bleeding into this one when the handler is reused on the same connection.
+            resetState();
+
+            responseStatus = resp.status();
+            contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE);
+            queueInputStream = new ByteBufQueueInputStream();
+
+            // Spawn reader thread for GraphBinary responses
+            if (isGraphBinaryResponse()) {
+                final ResultSet rs = pendingResultSet.get();
+                if (rs != null) {
+                    final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream);
+                    final GraphBinaryStreamResponseReader streamReader =
+                            new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet);
+                    try {
+                        readerPool.submit(streamReader::run);
+                    } catch (RejectedExecutionException e) {
+                        queueInputStream.signalEndOfStream();
+                        rs.markError(e);
+                        pendingResultSet.compareAndSet(rs, null);
+                        out.add(LAST_CONTENT_READ_RESPONSE);
+                    }
+                } else {
+                    // No pending ResultSet — close the stream and fire sentinel immediately
+                    queueInputStream.signalEndOfStream();
+                    queueInputStream = null;
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                }
+            }
+        }
+
+        if (msg instanceof HttpContent) {
+            final ByteBuf content = ((HttpContent) msg).content();
+            bytesRead += content.readableBytes();
+
+            if (bytesRead > 0 && ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) {
+                ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0);
+            }
+
+            if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) {
+                throw new TooLongFrameException("Response entity too large");
+            }
+
+            if (!isGraphBinaryResponse()) {
+                // Accumulate non-GraphBinary error body across chunks
+                if (content.readableBytes() > 0) {
+                    if (errorBody == null) {
+                        errorBody = ctx.alloc().compositeBuffer();
+                    }
+                    // retain() because Netty releases the content ByteBuf after decode() returns
+                    errorBody.addComponent(true, content.retain());
+                }
+            } else if (content.readableBytes() > 0 && queueInputStream != null) {
+                // Feed bytes to the reader thread
+                // retain() because Netty releases the content ByteBuf after decode() returns
+                queueInputStream.offer(content.retain());
+            }
+
+            if (msg instanceof LastHttpContent) {
+                if (isGraphBinaryResponse()) {
+                    if (queueInputStream != null) {
+                        queueInputStream.signalEndOfStream();
+                        // Null out so any spurious content arriving between responses is dropped
+                        // rather than offered to the already-closed stream.
+                        queueInputStream = null;
+                    }
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                } else {
+                    // Non-GraphBinary error — parse accumulated body and fire sentinel
+                    handleNonGraphBinaryError();
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+        }
+        releaseErrorBody();
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        // Mark error before signaling end-of-stream so the reader thread can't race
+        // with an EOFException from the closed stream.
+        final ResultSet rs = pendingResultSet.getAndSet(null);
+        if (rs != null) {
+            rs.markError(cause);
+        }
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+        }
+        releaseErrorBody();
+        super.exceptionCaught(ctx, cause);
+    }
+
+    private void handleNonGraphBinaryError() {
+        final ResultSet rs = pendingResultSet.get();
+        if (rs == null) return;
+
+        try {
+            if (errorBody != null && errorBody.readableBytes() > 0) {
+                final JsonNode node = mapper.readTree(errorBody.toString(CharsetUtil.UTF_8));
+                final String message = node.has("message") ? node.get("message").asText() : responseStatus.reasonPhrase();
+                rs.markError(new ResponseException(responseStatus, message));
+            } else {
+                rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase()));
+            }
+        } catch (Exception e) {
+            logger.debug("Failed to parse error response body as JSON", e);
+            rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase()));
+        } finally {
+            pendingResultSet.compareAndSet(rs, null);
+            releaseErrorBody();
+        }
+    }
+
+    private void resetState() {
+        // Clean up any leftover resources from a previous response on this connection
+        if (queueInputStream != null) {
+            queueInputStream.signalEndOfStream();
+            queueInputStream = null;
+        }
+        releaseErrorBody();
+        bytesRead = 0;
+        responseStatus = null;
+        contentType = null;
+    }
+
+    private void releaseErrorBody() {
+        if (errorBody != null) {
+            errorBody.release();
+            errorBody = null;
+        }
+    }
+
+    private boolean isGraphBinaryResponse() {
+        return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType);
+    }
+
+    private static boolean isError(final HttpResponseStatus status) {
+        return status != HttpResponseStatus.OK;
+    }
+}
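The ordering used in exceptionCaught() above (record the error first, then signal end-of-stream) can be illustrated with a minimal sketch. It assumes a CompletableFuture as a stand-in for ResultSet and assumes that the first completion wins; ErrorOrderingSketch and failThenEof are hypothetical names and are not part of this commit.

```java
import java.io.EOFException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: CompletableFuture stands in for ResultSet.
class ErrorOrderingSketch {

    // Returns the message of the failure the caller ends up observing.
    static String failThenEof() {
        final CompletableFuture<String> result = new CompletableFuture<>();
        final AtomicReference<CompletableFuture<String>> pending = new AtomicReference<>(result);

        // Step 1 (event loop): claim the pending slot and record the real cause.
        final CompletableFuture<String> claimed = pending.getAndSet(null);
        if (claimed != null) {
            claimed.completeExceptionally(new IllegalStateException("connection reset"));
        }

        // Step 2 (reader thread): the stream has ended, so the reader reports a
        // secondary EOF error. The future is already completed, so this is a no-op.
        final boolean readerWon = result.completeExceptionally(new EOFException("stream closed"));

        // The reader's cleanup is harmless when the slot was already cleared.
        pending.compareAndSet(result, null);

        try {
            result.join();
            return "no error";
        } catch (CompletionException e) {
            return e.getCause().getMessage() + " readerWon=" + readerWon;
        }
    }

    public static void main(final String[] args) {
        System.out.println(failThenEof());
    }
}
```

If the two steps ran in the opposite order, the caller would see the EOF error rather than the original cause, which is the race the handler comment describes.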
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
new file mode 100644
index 0000000000..757b2821cb
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop
+ * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader thread consumes them via
+ * standard InputStream reads.
+ */
+public class ByteBufQueueInputStream extends InputStream {
+
+    private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0);
+
+    private final BlockingQueue<ByteBuf> queue;
+    private ByteBuf current;
+    private volatile boolean eof;
+
+    public ByteBufQueueInputStream() {
+        this.queue = new LinkedBlockingQueue<>();
+    }
+
+    /**
+     * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed.
+     * The ByteBuf will be released after it is fully read. If the stream is already closed,
+     * the buffer is released immediately.
+     */
+    public void offer(final ByteBuf buf) {
+        if (eof) {
+            if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+                buf.release();
+            }
+            return;
+        }
+        queue.add(buf);
+    }
+
+    /**
+     * Signal that no more ByteBufs will be offered.
+     */
+    public void signalEndOfStream() {
+        queue.offer(END_OF_STREAM);
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (eof) return -1;
+
+        while (current == null || !current.isReadable()) {
+            releaseCurrent();
+            try {
+                current = queue.poll(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for data", e);
+            }
+            if (current == null) throw new IOException("Timed out waiting for streaming response data");
+            if (current == END_OF_STREAM) {
+                eof = true;
+                current = null;
+                return -1;
+            }
+        }
+        return current.readByte() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eof) return -1;
+        if (len == 0) return 0;
+
+        // Block until at least one byte is available, then return what we have (short read).
+        while (current == null || !current.isReadable()) {
+            releaseCurrent();
+            try {
+                current = queue.poll(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for data", e);
+            }
+            if (current == null) throw new IOException("Timed out waiting for streaming response data");
+            if (current == END_OF_STREAM) {
+                eof = true;
+                current = null;
+                return -1;
+            }
+        }
+        final int readable = Math.min(current.readableBytes(), len);
+        current.readBytes(b, off, readable);
+        return readable;
+    }
+
+    @Override
+    public void close() throws IOException {
+        eof = true;
+        releaseCurrent();
+        // drain and release any remaining buffers
+        ByteBuf buf;
+        while ((buf = queue.poll()) != null) {
+            if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+                buf.release();
+            }
+        }
+    }
+
+    private void releaseCurrent() {
+        if (current != null && current != END_OF_STREAM && current.refCnt() > 0) {
+            current.release();
+        }
+        current = null;
+    }
+
+}
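The queue-plus-sentinel hand-off in ByteBufQueueInputStream can be sketched with the JDK alone. This is a simplified analog under stated assumptions: byte[] chunks stand in for Netty ByteBufs (so there is no reference counting), the timeout is shortened, and ChunkQueueInputStream and drain are hypothetical names that do not appear in the commit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified analog: byte[] chunks replace ByteBufs, so nothing needs releasing.
class ChunkQueueInputStream extends InputStream {

    // Sentinel compared by identity, so a genuinely empty chunk is not mistaken for it.
    private static final byte[] END_OF_STREAM = new byte[0];

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private byte[] current;
    private int pos;
    private boolean eof;

    // Producer side: called as chunks arrive.
    void offer(final byte[] chunk) {
        queue.add(chunk);
    }

    void signalEndOfStream() {
        queue.add(END_OF_STREAM);
    }

    // Consumer side: blocks until a byte, the sentinel, or the timeout arrives.
    @Override
    public int read() throws IOException {
        if (eof) return -1;
        while (current == null || pos >= current.length) {
            try {
                current = queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
            if (current == null) throw new IOException("Timed out waiting for data");
            if (current == END_OF_STREAM) {
                eof = true;
                current = null;
                return -1;
            }
            pos = 0;
        }
        return current[pos++] & 0xFF;
    }

    // Demo helper: a producer thread offers the chunks while the caller reads them back.
    static byte[] drain(final byte[]... chunks) {
        final ChunkQueueInputStream in = new ChunkQueueInputStream();
        final Thread producer = new Thread(() -> {
            for (final byte[] chunk : chunks) in.offer(chunk);
            in.signalEndOfStream();
        });
        producer.start();
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            int b;
            while ((b = in.read()) != -1) out.write(b);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Calling ChunkQueueInputStream.drain(new byte[]{1, 2}, new byte[]{3, 4}) returns the four bytes in order, regardless of how the producer and consumer threads interleave.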
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
new file mode 100644
index 0000000000..11491903e8
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.structure.io.binary.Marker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}.
+ * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure,
+ * pushing each result to the {@link ResultSet} as it is deserialized.
+ * <p>
+ * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]}
+ */
+public class GraphBinaryStreamResponseReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class);
+
+    private final Buffer buffer;
+    private final GraphBinaryReader reader;
+    private final ResultSet resultSet;
+    private final AtomicReference<ResultSet> pendingResultSet;
+
+    public GraphBinaryStreamResponseReader(final Buffer buffer,
+                                           final GraphBinaryReader reader,
+                                           final ResultSet resultSet,
+                                           final AtomicReference<ResultSet> pendingResultSet) {
+        this.buffer = buffer;
+        this.reader = reader;
+        this.resultSet = resultSet;
+        this.pendingResultSet = pendingResultSet;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Read header: version byte (MSB must be 1) and bulking flag
+            final byte version = buffer.readByte();
+            if ((version & 0x80) == 0) {
+                throw new RuntimeException("Invalid GraphBinary response version: " + version);
+            }
+            final boolean bulked = (buffer.readByte() & 1) == 1;
+
+            // Read items until EndOfStream marker
+            while (true) {
+                final Object obj = reader.read(buffer);
+                if (obj instanceof Marker) {
+                    break;
+                }
+
+                if (bulked) {
+                    final long bulk = reader.read(buffer);
+                    resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk)));
+                } else {
+                    resultSet.add(new Result(obj));
+                }
+            }
+
+            // Read footer: status code, nullable message, nullable exception
+            final int statusCode = reader.readValue(buffer, Integer.class, false);
+            final String message = reader.readValue(buffer, String.class, true);
+            final String exception = reader.readValue(buffer, String.class, true);
+
+            // Status code 0 means success in GraphBinary v4: the server omits the HTTP status code
+            // in the binary footer when the response is successful.
+            if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) {
+                resultSet.markComplete();
+            } else {
+                resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception));
+            }
+        } catch (Throwable t) {
+            logger.warn("Error reading streaming response", t);
+            resultSet.markError(t);
+        } finally {
+            pendingResultSet.compareAndSet(resultSet, null);
+            buffer.release();
+        }
+    }
+}
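The header / items-until-marker / footer loop in run() can be sketched over a toy encoding. This is not GraphBinary: the tag values, the int-only items, and the names ToyStreamReader, readAll and demo are all assumptions made for illustration. Only the overall shape (version byte with the MSB set, bulked flag, items terminated by a marker, then a status code where 0 or 200 means success) follows the commit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Toy format, NOT GraphBinary: [version][flags]([ITEM][int][long bulk if bulked])*[END_MARKER][int status]
class ToyStreamReader {

    static final int ITEM = 0x01;       // hypothetical tag for an int item
    static final int END_MARKER = 0x7F; // hypothetical end-of-items tag

    static List<String> readAll(final DataInputStream in) throws IOException {
        // Header: version byte (MSB must be set) followed by the bulked flag.
        final int version = in.readUnsignedByte();
        if ((version & 0x80) == 0) throw new IOException("Invalid version byte: " + version);
        final boolean bulked = (in.readUnsignedByte() & 1) == 1;

        // Items: pull one at a time until the end marker shows up.
        final List<String> results = new ArrayList<>();
        while (true) {
            final int tag = in.readUnsignedByte();
            if (tag == END_MARKER) break;
            if (tag != ITEM) throw new IOException("Unexpected tag: " + tag);
            final int value = in.readInt();
            results.add(bulked ? value + "x" + in.readLong() : String.valueOf(value));
        }

        // Footer: status code; 0 and 200 are both treated as success.
        final int status = in.readInt();
        if (status != 0 && status != 200) throw new IOException("Server reported status " + status);
        return results;
    }

    // Encodes two bulked items and reads them back.
    static List<String> demo() {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            final DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(0x81);
            out.writeByte(1);
            out.writeByte(ITEM); out.writeInt(7); out.writeLong(2L);
            out.writeByte(ITEM); out.writeInt(9); out.writeLong(1L);
            out.writeByte(END_MARKER);
            out.writeInt(0);
            return readAll(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because each item is handed over as soon as it is decoded, a consumer can start working before the footer has arrived, which is the time-to-first-result benefit described in the commit message.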
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
new file mode 100644
index 0000000000..4490975f23
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A read-only {@link Buffer} implementation backed by a blocking {@link InputStream} via {@link DataInputStream}.
+ * Supports only sequential read operations; all write, random-access, and NIO methods throw
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * This allows the existing {@code TypeSerializer} implementations (which only use sequential reads) to work
+ * unchanged over a streaming HTTP response body.
+ */
+public class InputStreamBuffer implements Buffer {
+
+    private final DataInputStream in;
+    private int bytesRead;
+
+    public InputStreamBuffer(final InputStream inputStream) {
+        this.in = new DataInputStream(inputStream);
+    }
+
+    @Override
+    public boolean readBoolean() {
+        try {
+            final boolean v = in.readBoolean();
+            bytesRead += 1;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public byte readByte() {
+        try {
+            final byte v = in.readByte();
+            bytesRead += 1;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public short readShort() {
+        try {
+            final short v = in.readShort();
+            bytesRead += 2;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public int readInt() {
+        try {
+            final int v = in.readInt();
+            bytesRead += 4;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public long readLong() {
+        try {
+            final long v = in.readLong();
+            bytesRead += 8;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public float readFloat() {
+        try {
+            final float v = in.readFloat();
+            bytesRead += 4;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public double readDouble() {
+        try {
+            final double v = in.readDouble();
+            bytesRead += 8;
+            return v;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final byte[] destination) {
+        try {
+            in.readFully(destination);
+            bytesRead += destination.length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final byte[] destination, final int dstIndex, final int length) {
+        try {
+            in.readFully(destination, dstIndex, length);
+            bytesRead += length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final ByteBuffer dst) {
+        try {
+            final byte[] tmp = new byte[dst.remaining()];
+            in.readFully(tmp);
+            dst.put(tmp);
+            bytesRead += tmp.length;
+            return this;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Buffer readBytes(final OutputStream out, final int length) throws IOException {
+        final byte[] tmp = new byte[length];
+        in.readFully(tmp);
+        out.write(tmp);
+        bytesRead += length;
+        return this;
+    }
+
+    @Override
+    public int readerIndex() {
+        return bytesRead;
+    }
+
+    @Override
+    public int readableBytes() {
+        throw new UnsupportedOperationException("readableBytes() is not supported on a streaming Buffer");
+    }
+
+    @Override
+    public Buffer readerIndex(final int readerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int writerIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writerIndex(final int writerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer markWriterIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer resetWriterIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int capacity() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isDirect() {
+        return false;
+    }
+
+    @Override
+    public Buffer writeBoolean(final boolean value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeByte(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeShort(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeInt(final int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeLong(final long value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeFloat(final float value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeDouble(final double value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final byte[] src) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final ByteBuffer src) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer writeBytes(final byte[] src, final int srcIndex, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean release() {
+        try {
+            in.close();
+        } catch (IOException e) {
+            // best-effort close
+        }
+        return true;
+    }
+
+    @Override
+    public Buffer retain() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int referenceCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int nioBufferCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(final int index, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(final int index, final int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer getBytes(final int index, final byte[] dst) {
+        throw new UnsupportedOperationException();
+    }
+}
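The reason sequential reads through DataInputStream suffice here is that it reassembles multi-byte primitives even when the underlying stream hands back bytes in pieces. The following is a minimal sketch, assuming a SequenceInputStream of two arrays as a stand-in for chunked network input; SequentialReadSketch and demo are hypothetical names and are not part of the commit.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;

// Hypothetical cut-down analog of InputStreamBuffer with a single read method.
class SequentialReadSketch {

    private final DataInputStream in;
    private int bytesRead;

    SequentialReadSketch(final InputStream source) {
        this.in = new DataInputStream(source);
    }

    // Reads a big-endian int and advances the logical reader index.
    int readInt() {
        try {
            final int v = in.readInt();
            bytesRead += 4;
            return v;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int readerIndex() {
        return bytesRead;
    }

    // The int 0x00000102 is split 3 + 1 across two "chunks".
    static int[] demo() {
        final InputStream chunked = new SequenceInputStream(
                new ByteArrayInputStream(new byte[]{0, 0, 1}),
                new ByteArrayInputStream(new byte[]{2}));
        final SequentialReadSketch buffer = new SequentialReadSketch(chunked);
        final int value = buffer.readInt();
        return new int[]{value, buffer.readerIndex()};
    }
}
```

Here demo() yields the value 258 with a reader index of 4, even though no single chunk contained the whole int.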
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
new file mode 100644
index 0000000000..91ba9cf28a
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ByteBufQueueInputStreamTest {
+
+    @Test
+    public void shouldReadSingleByteBuf() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf = Unpooled.buffer();
+        buf.writeBytes(new byte[]{1, 2, 3, 4});
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        assertEquals(1, stream.read());
+        assertEquals(2, stream.read());
+        assertEquals(3, stream.read());
+        assertEquals(4, stream.read());
+        assertEquals(-1, stream.read());
+    }
+
+    @Test
+    public void shouldReadAcrossMultipleByteBufs() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2}));
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4}));
+        stream.signalEndOfStream();
+
+        final byte[] result = new byte[8];
+        int totalRead = 0;
+        int read;
+        while ((read = stream.read(result, totalRead, result.length - totalRead)) != -1) {
+            totalRead += read;
+        }
+        assertEquals(4, totalRead);
+        assertArrayEquals(new byte[]{1, 2, 3, 4}, java.util.Arrays.copyOf(result, totalRead));
+    }
+
+    @Test
+    public void shouldReleaseByteBufsAfterReading() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4);
+        buf.writeBytes(new byte[]{1, 2, 3, 4});
+        assertEquals(1, buf.refCnt());
+
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        final byte[] result = new byte[4];
+        stream.read(result, 0, 4);
+        stream.read(); // triggers release of buf and reads EOS
+
+        assertEquals(0, buf.refCnt());
+    }
+
+    @Test
+    public void shouldCleanUpOnClose() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2);
+        buf1.writeBytes(new byte[]{1, 2});
+        final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2);
+        buf2.writeBytes(new byte[]{3, 4});
+
+        stream.offer(buf1);
+        stream.offer(buf2);
+        stream.close();
+
+        assertEquals(0, buf1.refCnt());
+        assertEquals(0, buf2.refCnt());
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
new file mode 100644
index 0000000000..484ec7d524
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class GraphBinaryStreamResponseReaderTest {
+
+    private ExecutorService executor;
+    private GraphBinaryMessageSerializerV4 serializer;
+    private GraphBinaryReader reader;
+
+    @Before
+    public void setup() {
+        executor = Executors.newCachedThreadPool();
+        serializer = new GraphBinaryMessageSerializerV4();
+        reader = serializer.getMapper().getReader();
+    }
+
+    @After
+    public void teardown() {
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void shouldReadSingleItemResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList("hello")).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertEquals("hello", results.get(0).getString());
+        assertNull(pending.get());
+    }
+
+    @Test
+    public void shouldReadMultipleItemResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Arrays.asList(1, 2, 3)).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(3, results.size());
+        assertEquals(1, results.get(0).getInt());
+        assertEquals(2, results.get(1).getInt());
+        assertEquals(3, results.get(2).getInt());
+    }
+
+    @Test
+    public void shouldReadBulkedResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Arrays.asList("a", 3L)).bulked(true).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertTrue(results.get(0).getObject() instanceof DefaultRemoteTraverser);
+        final DefaultRemoteTraverser<?> traverser = (DefaultRemoteTraverser<?>) results.get(0).getObject();
+        assertEquals("a", traverser.get());
+        assertEquals(3L, traverser.bulk());
+    }
+
+    @Test
+    public void shouldHandleErrorFooter() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR)
+                        .statusMessage("Something went wrong")
+                        .exception("java.lang.RuntimeException")
+                        .result(Collections.emptyList()).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        assertTrue(rs.allItemsAvailable());
+        try {
+            rs.all().get();
+            fail("Expected exception");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode());
+            assertEquals("Something went wrong", re.getMessage());
+        }
+        assertNull(pending.get());
+    }
+
+    @Test
+    public void shouldReadDataSplitAcrossChunks() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        // Use a large string (~1000 chars) so the split point must land inside it regardless of
+        // header/footer overhead. The header+footer are ~30 bytes; the string item is ~1000 bytes.
+        final String largeValue = String.join("", Collections.nCopies(100, "abcdefghij"));
+        final ByteBuf fullPayload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(largeValue)).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final int splitPoint = fullPayload.readableBytes() / 2;
+        final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain();
+        final ByteBuf chunk2 = fullPayload.retain();
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(chunk1);
+        stream.offer(chunk2);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertEquals(1, results.size());
+        assertEquals(largeValue, results.get(0).getString());
+
+        fullPayload.release();
+    }
+
+    @Test
+    public void shouldReadEmptyResponse() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+        final ByteBuf payload = serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.emptyList()).create(),
+                ByteBufAllocator.DEFAULT);
+
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(payload);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+        final List<Result> results = rs.all().get();
+        assertTrue(results.isEmpty());
+        assertNull(pending.get());
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
new file mode 100644
index 0000000000..fdd37818a4
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+import static org.junit.Assert.*;
+
+public class HttpStreamingResponseHandlerTest {
+
+    private ExecutorService executor;
+    private GraphBinaryMessageSerializerV4 serializer;
+    private GraphBinaryReader reader;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadExecutor();
+        serializer = new GraphBinaryMessageSerializerV4();
+        reader = serializer.getMapper().getReader();
+    }
+
+    @After
+    public void teardown() {
+        executor.shutdownNow();
+    }
+
+    private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) {
+        final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler(
+                reader, pendingResultSet, executor, maxResponseContentLength);
+        return new EmbeddedChannel(handler);
+    }
+
+    @Test
+    public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        // Serialize a valid GraphBinary response
+        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(1)).create(),
+                channel.alloc()));
+
+        // Send HttpResponse with GraphBinary content type
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        // Send content
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+
+        // Send LastHttpContent
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        // Verify LAST_CONTENT_READ_RESPONSE is emitted
+        final Object out = channel.readInbound();
+        assertSame(LAST_CONTENT_READ_RESPONSE, out);
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+                ResponseMessage.build().code(HttpResponseStatus.OK)
+                        .result(Collections.singletonList(1)).create(),
+                channel.alloc()));
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        // Send a second LastHttpContent — should not throw NPE
+        channel.writeInbound(new DefaultLastHttpContent());
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 10);
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        try {
+            // Send content exceeding the 10-byte limit
+            channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20])));
+            fail("Expected TooLongFrameException");
+        } catch (Exception e) {
+            assertTrue(e instanceof TooLongFrameException);
+        }
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4);
+        channel.writeInbound(response);
+
+        // Send some content but no LastHttpContent
+        channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3})));
+
+        // Fire channelInactive — should not throw and should signal the stream
+        channel.pipeline().fireChannelInactive();
+
+        channel.finishAndReleaseAll();
+    }
+
+    @Test
+    public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception {
+        final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
+        final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+        final EmbeddedChannel channel = createChannel(pending, 0);
+
+        // Send a 500 response with JSON content type
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
+        channel.writeInbound(response);
+
+        // Send JSON error body
+        final String errorJson = "{\"message\":\"test error\"}";
+        channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8)));
+
+        // Verify error is marked on the ResultSet
+        try {
+            rs.all().get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode());
+            assertEquals("test error", re.getMessage());
+        }
+
+        // Verify pendingResultSet was cleared
+        assertNull(pending.get());
+
+        channel.finishAndReleaseAll();
+    }
+
+    private byte[] toBytes(final io.netty.buffer.ByteBuf buf) {
+        final byte[] bytes = new byte[buf.readableBytes()];
+        buf.readBytes(bytes);
+        buf.release();
+        return bytes;
+    }
+}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
new file mode 100644
index 0000000000..02d029d66e
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class InputStreamBufferTest {
+
+    @Test
+    public void shouldReadPrimitivesThroughInputStreamBuffer() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        final io.netty.buffer.ByteBuf buf = Unpooled.buffer();
+        buf.writeByte(42);
+        buf.writeInt(12345);
+        buf.writeLong(9876543210L);
+        buf.writeFloat(3.14f);
+        buf.writeDouble(2.718281828);
+        buf.writeShort(256);
+        buf.writeBoolean(true);
+        stream.offer(buf);
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        assertEquals(42, buffer.readByte());
+        assertEquals(12345, buffer.readInt());
+        assertEquals(9876543210L, buffer.readLong());
+        assertEquals(3.14f, buffer.readFloat(), 0.001f);
+        assertEquals(2.718281828, buffer.readDouble(), 0.000001);
+        assertEquals(256, buffer.readShort());
+        assertTrue(buffer.readBoolean());
+    }
+
+    @Test
+    public void shouldReadBytesArray() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30}));
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        final byte[] dest = new byte[3];
+        buffer.readBytes(dest);
+        assertArrayEquals(new byte[]{10, 20, 30}, dest);
+    }
+
+    @Test
+    public void shouldTrackReaderIndex() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}));
+        stream.signalEndOfStream();
+
+        final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+        assertEquals(0, buffer.readerIndex());
+        buffer.readByte();
+        assertEquals(1, buffer.readerIndex());
+        buffer.readInt();
+        assertEquals(5, buffer.readerIndex());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnReadableBytes() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnWriteInt() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnNioBuffer() {
+        new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer();
+    }
+}
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 7c474d6ba0..f0cb2b9ea2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -275,7 +275,11 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
                 // failures that follow this will show up in the response body instead.
                 sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new));
                 sendHttpContents(ctx, requestCtx);
-                sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+                // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk).
+                // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections.
+                if (requestCtx.getRequestState() != RequestState.ERROR) {
+                    sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+                }
             } catch (Throwable t) {
                 writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1());
             } finally {
@@ -753,19 +757,21 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
                         ctx.setRequestState(STREAMING);
                         return serializer.writeHeader(responseMessage, nettyContext.alloc());
                     }
-                    ctx.setRequestState(FINISHED);
 
-                    return serializer.serializeResponseAsBinary(ResponseMessage.build()
+                    final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build()
                             .result(aggregate)
                             .bulked(bulking)
                             .code(HttpResponseStatus.OK)
                             .create(), nettyContext.alloc());
+                    ctx.setRequestState(FINISHED);
+                    return fullResponse;
 
                 case STREAMING:
                     return serializer.writeChunk(aggregate, nettyContext.alloc());
                 case FINISHING:
+                    final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc());
                     ctx.setRequestState(FINISHED);
-                    return serializer.writeFooter(responseMessage, nettyContext.alloc());
+                    return footer;
             }
 
             return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc());
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index f0f6b652c0..4fa5ba84ec 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -97,6 +97,13 @@ public class HttpHandlerUtil {
      * @param serializer        The serializer to use to serialize the error response.
      */
     static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer<?> serializer) {
+        // Prevent writing after the response is already terminated. A second write would corrupt
+        // HTTP framing on keep-alive connections, poisoning them for subsequent requests.
+        if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR ||
+            context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) {
+            return;
+        }
+
         try {
             final ChannelHandlerContext ctx = context.getChannelHandlerContext();
             final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
new file mode 100644
index 0000000000..708a63e13f
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for streaming HTTP response support in the Java driver.
+ * Verifies that the streaming pipeline (HttpStreamingResponseHandler + GraphBinaryStreamResponseReader)
+ * works correctly end-to-end against a real Gremlin Server.
+ */
+public class StreamingResponseIntegrateTest extends AbstractGremlinServerIntegrationTest {
+
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        return settings;
+    }
+
+    @Test
+    public void shouldStreamBasicResults() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final List<Result> results = client.submit("g.inject(1,2,3)").all().get();
+            assertEquals(3, results.size());
+            assertEquals(1, results.get(0).getInt());
+            assertEquals(2, results.get(1).getInt());
+            assertEquals(3, results.get(2).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamIncrementallyWithIterator() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final ResultSet rs = client.submit("g.inject(1,2,3,4,5)");
+
+            // Consume results one at a time via iterator
+            final Iterator<Result> iter = rs.iterator();
+            final List<Integer> collected = new ArrayList<>();
+            while (iter.hasNext()) {
+                collected.add(iter.next().getInt());
+            }
+            assertEquals(5, collected.size());
+            for (int i = 0; i < 5; i++) {
+                assertEquals(i + 1, (int) collected.get(i));
+            }
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamIncrementallyWithOne() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final ResultSet rs = client.submit("g.inject(10,20,30)");
+
+            assertEquals(10, rs.one().getInt());
+            assertEquals(20, rs.one().getInt());
+            assertEquals(30, rs.one().getInt());
+            assertNull(rs.one());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamLargeResultSet() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            // Generate a large result set that would span multiple HTTP chunks
+            final List<Result> results = client.submit(
+                    "g.inject(1).repeat(__.identity()).times(1000).emit()").all().get();
+            assertEquals(1000, results.size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamEmptyResponse() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            final List<Result> results = client.submit("g.V().hasLabel('nonexistent')").all().get();
+            assertTrue(results.isEmpty());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldHandleServerErrorDuringStreaming() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+            client.submit("invalid_script_that_should_fail").all().get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            // Error should propagate correctly through the streaming pipeline
+            assertTrue(e.getCause() instanceof ResponseException);
+            final ResponseException re = (ResponseException) e.getCause();
+            assertEquals(HttpResponseStatus.BAD_REQUEST, re.getResponseStatusCode());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldReuseConnectionAfterStreamingComplete() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // First request
+            final List<Result> results1 = client.submit("g.inject(1)").all().get();
+            assertEquals(1, results1.size());
+
+            // Second request on same client (should reuse connection)
+            final List<Result> results2 = client.submit("g.inject(2)").all().get();
+            assertEquals(1, results2.size());
+            assertEquals(2, results2.get(0).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldHandleConcurrentStreamingRequests() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            final CompletableFuture<List<Result>> f1 = client.submit("g.inject(1,2,3)").all();
+            final CompletableFuture<List<Result>> f2 = client.submit("g.inject(4,5,6)").all();
+            final CompletableFuture<List<Result>> f3 = client.submit("g.inject(7,8,9)").all();
+
+            assertEquals(3, f1.get().size());
+            assertEquals(3, f2.get().size());
+            assertEquals(3, f3.get().size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamWithTraversalApi() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final GraphTraversalSource g = traversal().with(
+                    DriverRemoteConnection.using(cluster));
+
+            final List<Integer> results = g.inject(1, 2, 3).toList();
+            assertEquals(3, results.size());
+            assertTrue(results.contains(1));
+            assertTrue(results.contains(2));
+            assertTrue(results.contains(3));
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldStreamVerticesFromGraph() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // Use inject to create data rather than relying on pre-loaded graph
+            final List<Result> results = client.submit("g.inject(1,2,3,4,5,6)").all().get();
+            assertEquals(6, results.size());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldReuseConnectionAfterServerError() throws Exception {
+        final Cluster cluster = TestClientFactory.build().create();
+        try {
+            final Client client = cluster.connect();
+
+            // Submit a request that causes a server error
+            try {
+                client.submit("throw new RuntimeException('test error')").all().get();
+                fail("Should have thrown");
+            } catch (ExecutionException e) {
+                // expected
+            }
+
+            // Connection should still be usable
+            final List<Result> results = client.submit("g.inject(1)").all().get();
+            assertEquals(1, results.get(0).getInt());
+        } finally {
+            cluster.close();
+        }
+    }
+}
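
The reader tests in this commit hand chunks to a stream with offer() and
signalEndOfStream(), then deserialize from it, including a case where one
value is split across two chunks. As a rough illustration of that hand-off,
the sketch below bridges a producer to a blocking reader through a queue.
It is not the driver's ByteBufQueueInputStream: the class name
QueueInputStreamSketch, the byte[] chunks (standing in for Netty ByteBufs,
so there is no reference counting), and the END sentinel are all
assumptions made for this example.

```java
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch only; names and structure are assumptions, not the
// driver's actual ByteBufQueueInputStream implementation.
final class QueueInputStreamSketch extends InputStream {
    // Zero-length sentinel marking end of stream; compared by identity.
    private static final byte[] END = new byte[0];

    private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();
    private byte[] current = new byte[0];
    private int pos = 0;
    private boolean finished = false;

    // Producer side (for example an I/O thread): enqueue one chunk without blocking.
    public void offer(final byte[] chunk) {
        chunks.add(chunk);
    }

    // Producer side: signal that no more chunks will follow.
    public void signalEndOfStream() {
        chunks.add(END);
    }

    // Consumer side: blocks until a byte is available or the stream has ended.
    @Override
    public int read() {
        while (pos == current.length) {
            if (finished) {
                return -1;
            }
            try {
                final byte[] next = chunks.take();
                if (next == END) {
                    finished = true;
                    return -1;
                }
                current = next;
                pos = 0;
            } catch (InterruptedException e) {
                // For brevity, an interrupt is treated as end of stream.
                Thread.currentThread().interrupt();
                finished = true;
                return -1;
            }
        }
        return current[pos++] & 0xFF;
    }

    public static void main(final String[] args) {
        final QueueInputStreamSketch in = new QueueInputStreamSketch();
        // A 4-byte big-endian int (258) split across two chunks.
        in.offer(new byte[]{0, 0});
        in.offer(new byte[]{1, 2});
        in.signalEndOfStream();

        int value = 0;
        for (int i = 0; i < 4; i++) {
            value = (value << 8) | in.read();
        }
        System.out.println(value);     // prints 258
        System.out.println(in.read()); // prints -1
    }
}
```

Because the reader only ever sees a byte stream, a value that straddles a
chunk boundary needs no special handling on the consumer side; the
trade-off is that the consumer blocks, so it has to run off the I/O thread.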

