This is an automated email from the ASF dual-hosted git repository.
Cole-Greer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new c11d71b30d Add streaming HTTP response support to gremlin-driver
(#3419)
c11d71b30d is described below
commit c11d71b30d444f0a5fbe90b924ad0a9b4216dca2
Author: Cole Greer <[email protected]>
AuthorDate: Tue May 19 16:59:33 2026 -0700
Add streaming HTTP response support to gremlin-driver (#3419)
Adds streaming GraphBinary response deserialization to the Java driver.
Instead of buffering the entire HTTP response body before processing,
results are now delivered to the ResultSet as they arrive from the server.
This reduces time-to-first-result for large result sets without requiring
application code changes.
Design
------
The streaming pipeline replaces HttpObjectAggregator +
HttpGremlinResponseDecoder
with a single HttpStreamingResponseHandler that feeds HTTP content chunks
to a
reader thread via a BlockingQueue<ByteBuf>. The reader deserializes
GraphBinary
items one at a time using an InputStream-backed Buffer adapter, allowing
existing
TypeSerializer implementations to work unchanged.
New classes:
- HttpStreamingResponseHandler: Netty handler orchestrating the streaming
lifecycle
- ByteBufQueueInputStream: bridges event loop to reader thread via blocking
queue
- InputStreamBuffer: read-only Buffer over InputStream for TypeSerializers
- GraphBinaryStreamResponseReader: pull-based deserializer on a dedicated
thread
Non-GraphBinary serializers automatically fall back to the buffered
pipeline.
Connection lifecycle
--------------------
Connection pool return is driven by wire-level completion
(LAST_CONTENT_READ_RESPONSE),
not application-level completion (ResultSet.markComplete). This ensures all
HTTP bytes
are consumed before the connection is reused, preventing framing corruption.
An AtomicBoolean guard makes returnToPool() idempotent across concurrent
error paths.
Error handling races between the event loop and reader thread are resolved
by marking
errors on the ResultSet before signaling end-of-stream — CompletableFuture's
single-completion semantics guarantee the correct error surfaces to the
caller.
Server-side fixes
-----------------
Prevents connection corruption from double-LastHttpContent when
writeError() is
called after response streaming has already terminated. Guards writeError()
against
writes in FINISHED/ERROR state and defers state transitions until after
serialization.
---
CHANGELOG.asciidoc | 1 +
docs/src/upgrade/release-4.x.x.asciidoc | 9 +
.../tinkerpop/gremlin/driver/Channelizer.java | 44 ++-
.../apache/tinkerpop/gremlin/driver/Cluster.java | 18 ++
.../tinkerpop/gremlin/driver/Connection.java | 23 +-
.../tinkerpop/gremlin/driver/ConnectionPool.java | 1 +
.../driver/handler/GremlinResponseHandler.java | 21 +-
.../handler/HttpStreamingResponseHandler.java | 242 ++++++++++++++++
.../driver/stream/ByteBufQueueInputStream.java | 137 +++++++++
.../stream/GraphBinaryStreamResponseReader.java | 105 +++++++
.../gremlin/driver/stream/InputStreamBuffer.java | 311 +++++++++++++++++++++
.../handler/ByteBufQueueInputStreamTest.java | 95 +++++++
.../GraphBinaryStreamResponseReaderTest.java | 226 +++++++++++++++
.../handler/HttpStreamingResponseHandlerTest.java | 207 ++++++++++++++
.../driver/handler/InputStreamBufferTest.java | 94 +++++++
.../server/handler/HttpGremlinEndpointHandler.java | 14 +-
.../gremlin/server/handler/HttpHandlerUtil.java | 7 +
.../server/StreamingResponseIntegrateTest.java | 239 ++++++++++++++++
18 files changed, 1766 insertions(+), 28 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index ec09926402..284bbcca26 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result
iteration, providing API parity with `next(n)` in the Java, Python, and .NET
GLVs, and updated the Go translators in `gremlin-core` and `gremlin-javascript`
to emit `NextN(n)` for the batched form.
* Added Gremlator, a single page web application, that translates Gremlin into
various programming languages like Javascript and Python.
* Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in
`globalThis.crypto.randomUUID()`.
+* Added streaming HTTP response support to `gremlin-driver` for incremental
result deserialization over GraphBinary.
* Connected HTTP streaming response deserialization to the traversal API in
`gremlin-javascript`, enabling `next()` to return the first result without
waiting for the full response.
* Changed `Client.stream()` in `gremlin-javascript` to return an
`AsyncGenerator` for direct incremental consumption.
* Removed `readable-stream` dependency from `gremlin-javascript`.
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc
b/docs/src/upgrade/release-4.x.x.asciidoc
index 2df3fbc987..960c090f5d 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -277,6 +277,15 @@ try {
The traversal API is not affected — `Next()`, `ToList()`, etc. still throw
`ResponseException` directly since they
block on the async stream internally.
+==== Streaming Response Deserialization in gremlin-driver
+
+The Java driver now deserializes HTTP responses incrementally, delivering
results to the `ResultSet` as they arrive
+rather than buffering the entire response. This reduces time-to-first-result
for large result sets.
+
+This change is automatic and requires no code changes. It applies only when
using the default GraphBinary serializer.
+Custom `MessageSerializer` implementations fall back to the non-streaming
pipeline that buffers the full response
+before deserialization. The `ResultSet` API is unchanged.
+
==== More Secure Gremlin Server
Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy
`ScriptEngine` for basic server initialization,
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 427f76c056..59b9d02365 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -34,9 +34,13 @@ import
org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
import
org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
+import
org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler;
import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler;
import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import java.util.Collections;
import java.util.Optional;
@@ -92,7 +96,7 @@ public interface Channelizer extends ChannelHandler {
protected Connection connection;
protected Cluster cluster;
protected SslHandler sslHandler;
- private AtomicReference<ResultSet> pending;
+ protected AtomicReference<ResultSet> pending;
protected static final String PIPELINE_GREMLIN_HANDLER =
"gremlin-handler";
protected static final String PIPELINE_SSL_HANDLER =
"gremlin-ssl-handler";
@@ -158,7 +162,6 @@ public interface Channelizer extends ChannelHandler {
}
configure(pipeline);
- pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new
GremlinResponseHandler(pending));
}
@Override
@@ -187,7 +190,9 @@ public interface Channelizer extends ChannelHandler {
ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create();
private HttpGremlinRequestEncoder gremlinRequestEncoder;
+ private HttpStreamingResponseHandler streamingResponseHandler;
private HttpGremlinResponseDecoder gremlinResponseDecoder;
+ private boolean useStreaming;
private HttpContentDecompressionHandler httpCompressionDecoder;
private IdleStateHandler idleStateHandler;
@@ -200,7 +205,19 @@ public interface Channelizer extends ChannelHandler {
httpCompressionDecoder = new HttpContentDecompressionHandler();
gremlinRequestEncoder = new
HttpGremlinRequestEncoder(cluster.getSerializer(),
cluster.getRequestInterceptors(),
cluster.isUserAgentOnConnectEnabled(),
cluster.isBulkResultsEnabled(), connection.getUri());
- gremlinResponseDecoder = new
HttpGremlinResponseDecoder(cluster.getSerializer());
+
+ final MessageSerializer<?> serializer = cluster.getSerializer();
+ if (serializer instanceof GraphBinaryMessageSerializerV4) {
+ useStreaming = true;
+ final GraphBinaryReader graphBinaryReader =
+ ((GraphBinaryMessageSerializerV4)
serializer).getMapper().getReader();
+ streamingResponseHandler = new HttpStreamingResponseHandler(
+ graphBinaryReader, pending,
cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+ } else {
+ useStreaming = false;
+ gremlinResponseDecoder = new
HttpGremlinResponseDecoder(serializer);
+ }
+
if (cluster.getIdleConnectionTimeout() > 0) {
final int idleConnectionTimeout = (int)
(cluster.getIdleConnectionTimeout() / 1000);
idleStateHandler = new IdleStateHandler(idleConnectionTimeout,
idleConnectionTimeout, 0);
@@ -240,11 +257,22 @@ public interface Channelizer extends ChannelHandler {
DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false);
pipeline.addLast(PIPELINE_HTTP_CODEC, handler);
- pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new
HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
- ? (int) cluster.getMaxResponseContentLength() :
Integer.MAX_VALUE));
- pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
- pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER,
httpCompressionDecoder);
- pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
+ if (useStreaming) {
+ pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER,
httpCompressionDecoder);
+ pipeline.addLast(PIPELINE_HTTP_DECODER,
streamingResponseHandler);
+ pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+ } else {
+ pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new
HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
+ ? (int) cluster.getMaxResponseContentLength() :
Integer.MAX_VALUE));
+ pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
+ pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER,
httpCompressionDecoder);
+ pipeline.addLast(PIPELINE_HTTP_DECODER,
gremlinResponseDecoder);
+ }
+
+ pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new
GremlinResponseHandler(pending, () -> {
+ connection.returnToPool();
+ connection.tryShutdown();
+ }, useStreaming));
}
}
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index cac193814c..d4a3967f02 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -70,8 +70,11 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -383,6 +386,10 @@ public final class Cluster {
return manager.connectionScheduler;
}
+ ExecutorService streamingReaderPool() {
+ return manager.streamingReaderPool;
+ }
+
Settings.ConnectionPoolSettings connectionPoolSettings() {
return manager.connectionPoolSettings;
}
@@ -956,6 +963,12 @@ public final class Cluster {
*/
private final ScheduledThreadPoolExecutor connectionScheduler;
+ /**
+ * Cached thread pool for streaming response reader threads. One
thread per active streaming response,
+ * bounded implicitly by the connection pool size.
+ */
+ private final ExecutorService streamingReaderPool;
+
private final int nioPoolSize;
private final int workerPoolSize;
private final int port;
@@ -1023,6 +1036,10 @@ public final class Cluster {
this.connectionScheduler = new
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
new
BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
+ this.streamingReaderPool = new ThreadPoolExecutor(0,
builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
+ 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new
BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());
+
validationRequest = () ->
RequestMessage.build(builder.validationRequest);
}
@@ -1133,6 +1150,7 @@ public final class Cluster {
executor.shutdown();
hostScheduler.shutdown();
connectionScheduler.shutdown();
+ streamingReaderPool.shutdownNow();
closeIt.complete(null);
});
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index cb94b39b87..12a1de646c 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -66,6 +66,14 @@ final class Connection {
* Is a {@code Connection} borrowed from the pool.
*/
private final AtomicBoolean isBorrowed = new AtomicBoolean(false);
+ /**
+ * Prevents returnToPool() from being called more than once per borrow
cycle.
+ */
+ private final AtomicBoolean returned = new AtomicBoolean(false);
+
+ void resetReturned() {
+ returned.set(false);
+ }
/**
* This boolean guards the replace of the connection and ensures that it
only occurs once.
*/
@@ -201,7 +209,7 @@ final class Connection {
} else {
final ResultSet resultSet = new
ResultSet(cluster.executor(), requestMessage, pool.host);
- resultSet.getReadCompleted().whenCompleteAsync((v, t)
-> {
+ resultSet.getReadCompleted().whenComplete((v, t) -> {
if (t != null) {
// the callback for when the read failed. a
failed read means the request went to the server
// and came back with a server-side error of
some sort. it means the server is responsive
@@ -209,17 +217,13 @@ final class Connection {
// write operation.
logger.debug("Error while processing request
on the server {}.", this, t);
handleConnectionCleanupOnError(thisConnection);
- } else {
- // the callback for when the read was
successful, meaning that ResultSet.markComplete()
- // was called
- thisConnection.returnToPool();
}
// While this request was in process, close might
have been signaled in closeAsync().
// However, close would be blocked until all
pending requests are completed. Attempt
// the shutdown if the returned result cleared up
the last pending message and unblocked
// the close.
tryShutdown();
- }, cluster.executor());
+ });
pending.set(resultSet);
@@ -234,7 +238,8 @@ final class Connection {
return requestPromise;
}
- private void returnToPool() {
+ void returnToPool() {
+ if (!returned.compareAndSet(false, true)) return;
try {
if (pool != null) pool.returnConnection(this);
} catch (ConnectionException ce) {
@@ -244,7 +249,7 @@ final class Connection {
}
private void handleConnectionCleanupOnError(final Connection
thisConnection) {
- if (thisConnection.isDead()) {
+ if (thisConnection.isDead() || (thisConnection.channel != null &&
!thisConnection.channel.isOpen())) {
if (pool != null) pool.replaceConnection(thisConnection);
} else {
thisConnection.returnToPool();
@@ -259,7 +264,7 @@ final class Connection {
* Close was signaled in closeAsync() but there were pending messages at
that time. This method attempts the
* shutdown if the returned result cleared up the last pending message.
*/
- private void tryShutdown() {
+ void tryShutdown() {
if (isClosing() && isOkToClose())
shutdown(closeFuture.get());
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee4c34c5aa..4f40b6733a 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -525,6 +525,7 @@ final class ConnectionPool {
while (head != null) {
// try to borrow connection
if (!head.isDead() && !head.isBorrowed().get() &&
head.isBorrowed().compareAndSet(false, true)) {
+ head.resetReturned();
available = head;
break;
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index d4cfb167d2..94898783eb 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -49,9 +49,13 @@ public class GremlinResponseHandler extends
SimpleChannelInboundHandler<Response
private static final Logger logger =
LoggerFactory.getLogger(GremlinResponseHandler.class);
private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION =
AttributeKey.valueOf("caughtException");
private final AtomicReference<ResultSet> pendingResultSet;
+ private final Runnable onResponseComplete;
+ private final boolean streaming;
- public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
+ public GremlinResponseHandler(final AtomicReference<ResultSet> pending,
final Runnable onResponseComplete, final boolean streaming) {
this.pendingResultSet = pending;
+ this.onResponseComplete = onResponseComplete;
+ this.streaming = streaming;
}
@Override
@@ -100,14 +104,17 @@ public class GremlinResponseHandler extends
SimpleChannelInboundHandler<Response
// Stream is done when the last content signaling response message is
read.
if (LAST_CONTENT_READ_RESPONSE == response) {
- final ResultSet rs = pendingResultSet.getAndSet(null);
- if (rs != null) {
- if (null ==
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
- rs.markComplete();
- } else {
-
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+ if (!streaming) {
+ final ResultSet rs = pendingResultSet.getAndSet(null);
+ if (rs != null) {
+ if (null ==
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
+ rs.markComplete();
+ } else {
+
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+ }
}
}
+ onResponseComplete.run();
}
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
new file mode 100644
index 0000000000..607fd6bdf9
--- /dev/null
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import
org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+
+/**
+ * Decodes chunked HTTP responses into streaming results without buffering the
full response body.
+ * <p>
+ * For GraphBinary responses, content chunks are passed to a {@link
ByteBufQueueInputStream} consumed by a
+ * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader
deserializes results incrementally,
+ * delivers them to the {@code ResultSet}, and handles completion and cleanup.
For non-GraphBinary error responses
+ * (e.g., JSON 401/500), the error body is accumulated and parsed when the
response ends, then
+ * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link
GremlinResponseHandler} to process.
+ */
+public class HttpStreamingResponseHandler extends
MessageToMessageDecoder<HttpObject> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(HttpStreamingResponseHandler.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private final GraphBinaryReader graphBinaryReader;
+ private final AtomicReference<ResultSet> pendingResultSet;
+ private final ExecutorService readerPool;
+ private final long maxResponseContentLength;
+
+ // Mutable state below is accessed exclusively from the channel's event
loop thread.
+ private HttpResponseStatus responseStatus;
+ private String contentType;
+ private long bytesRead;
+ private ByteBufQueueInputStream queueInputStream;
+ private CompositeByteBuf errorBody;
+
+ public HttpStreamingResponseHandler(final GraphBinaryReader
graphBinaryReader,
+ final AtomicReference<ResultSet>
pendingResultSet,
+ final ExecutorService readerPool,
+ final long maxResponseContentLength) {
+ this.graphBinaryReader = graphBinaryReader;
+ this.pendingResultSet = pendingResultSet;
+ this.readerPool = readerPool;
+ this.maxResponseContentLength = maxResponseContentLength;
+ }
+
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, final HttpObject
msg,
+ final List<Object> out) throws Exception {
+ if (msg instanceof HttpResponse) {
+ final HttpResponse resp = (HttpResponse) msg;
+
+ // Reset mutable state for the new response cycle to prevent stale
state from a previous
+ // response bleeding into this one when the handler is reused on
the same connection.
+ resetState();
+
+ responseStatus = resp.status();
+ contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE);
+ queueInputStream = new ByteBufQueueInputStream();
+
+ // Spawn reader thread for GraphBinary responses
+ if (isGraphBinaryResponse()) {
+ final ResultSet rs = pendingResultSet.get();
+ if (rs != null) {
+ final InputStreamBuffer buffer = new
InputStreamBuffer(queueInputStream);
+ final GraphBinaryStreamResponseReader streamReader =
+ new GraphBinaryStreamResponseReader(buffer,
graphBinaryReader, rs, pendingResultSet);
+ try {
+ readerPool.submit(streamReader::run);
+ } catch (RejectedExecutionException e) {
+ queueInputStream.signalEndOfStream();
+ rs.markError(e);
+ pendingResultSet.compareAndSet(rs, null);
+ out.add(LAST_CONTENT_READ_RESPONSE);
+ }
+ } else {
+ // No pending ResultSet — close the stream and fire
sentinel immediately
+ queueInputStream.signalEndOfStream();
+ queueInputStream = null;
+ out.add(LAST_CONTENT_READ_RESPONSE);
+ }
+ }
+ }
+
+ if (msg instanceof HttpContent) {
+ final ByteBuf content = ((HttpContent) msg).content();
+ bytesRead += content.readableBytes();
+
+ if (bytesRead > 0 &&
ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) {
+ ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0);
+ }
+
+ if (maxResponseContentLength > 0 && bytesRead >
maxResponseContentLength) {
+ throw new TooLongFrameException("Response entity too large");
+ }
+
+ if (!isGraphBinaryResponse()) {
+ // Accumulate non-GraphBinary error body across chunks
+ if (content.readableBytes() > 0) {
+ if (errorBody == null) {
+ errorBody = ctx.alloc().compositeBuffer();
+ }
+ // retain() because Netty releases the content ByteBuf
after decode() returns
+ errorBody.addComponent(true, content.retain());
+ }
+ } else if (content.readableBytes() > 0 && queueInputStream !=
null) {
+ // Feed bytes to the reader thread
+ // retain() because Netty releases the content ByteBuf after
decode() returns
+ queueInputStream.offer(content.retain());
+ }
+
+ if (msg instanceof LastHttpContent) {
+ if (isGraphBinaryResponse()) {
+ if (queueInputStream != null) {
+ queueInputStream.signalEndOfStream();
+ // Null out so any spurious content arriving between
responses is dropped
+ // rather than offered to the already-closed stream.
+ queueInputStream = null;
+ }
+ out.add(LAST_CONTENT_READ_RESPONSE);
+ } else {
+ // Non-GraphBinary error — parse accumulated body and fire
sentinel
+ handleNonGraphBinaryError();
+ out.add(LAST_CONTENT_READ_RESPONSE);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws
Exception {
+ if (queueInputStream != null) {
+ queueInputStream.signalEndOfStream();
+ }
+ releaseErrorBody();
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) throws Exception {
+ // Mark error before signaling end-of-stream so the reader thread
can't race
+ // with an EOFException from the closed stream.
+ final ResultSet rs = pendingResultSet.getAndSet(null);
+ if (rs != null) {
+ rs.markError(cause);
+ }
+ if (queueInputStream != null) {
+ queueInputStream.signalEndOfStream();
+ }
+ releaseErrorBody();
+ super.exceptionCaught(ctx, cause);
+ }
+
+ private void handleNonGraphBinaryError() {
+ final ResultSet rs = pendingResultSet.get();
+ if (rs == null) return;
+
+ try {
+ if (errorBody != null && errorBody.readableBytes() > 0) {
+ final JsonNode node =
mapper.readTree(errorBody.toString(CharsetUtil.UTF_8));
+ final String message = node.has("message") ?
node.get("message").asText() : responseStatus.reasonPhrase();
+ rs.markError(new ResponseException(responseStatus, message));
+ } else {
+ rs.markError(new ResponseException(responseStatus,
responseStatus.reasonPhrase()));
+ }
+ } catch (Exception e) {
+ logger.debug("Failed to parse error response body as JSON", e);
+ rs.markError(new ResponseException(responseStatus,
responseStatus.reasonPhrase()));
+ } finally {
+ pendingResultSet.compareAndSet(rs, null);
+ releaseErrorBody();
+ }
+ }
+
+ private void resetState() {
+ // Clean up any leftover resources from a previous response on this
connection
+ if (queueInputStream != null) {
+ queueInputStream.signalEndOfStream();
+ queueInputStream = null;
+ }
+ releaseErrorBody();
+ bytesRead = 0;
+ responseStatus = null;
+ contentType = null;
+ }
+
+ private void releaseErrorBody() {
+ if (errorBody != null) {
+ errorBody.release();
+ errorBody = null;
+ }
+ }
+
+ private boolean isGraphBinaryResponse() {
+ return !isError(responseStatus) ||
SerTokens.MIME_GRAPHBINARY_V4.equals(contentType);
+ }
+
+ private static boolean isError(final HttpResponseStatus status) {
+ return status != HttpResponseStatus.OK;
+ }
+}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
new file mode 100644
index 0000000000..757b2821cb
--- /dev/null
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf}
objects. The Netty event loop
+ * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader
thread consumes them via
+ * standard InputStream reads.
+ */
+public class ByteBufQueueInputStream extends InputStream {
+
+ private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0);
+
+ private final BlockingQueue<ByteBuf> queue;
+ private ByteBuf current;
+ private volatile boolean eof;
+
+ public ByteBufQueueInputStream() {
+ this.queue = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Offer a ByteBuf to the queue. The caller must have already retained the
ByteBuf if needed.
+ * The ByteBuf will be released after it is fully read. If the stream is
already closed,
+ * the buffer is released immediately.
+ */
+ public void offer(final ByteBuf buf) {
+ if (eof) {
+ if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+ buf.release();
+ }
+ return;
+ }
+ queue.add(buf);
+ }
+
+ /**
+ * Signal that no more ByteBufs will be offered.
+ */
+ public void signalEndOfStream() {
+ queue.offer(END_OF_STREAM);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (eof) return -1;
+
+ while (current == null || !current.isReadable()) {
+ releaseCurrent();
+ try {
+ current = queue.poll(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for data", e);
+ }
+ if (current == null) throw new IOException("Timed out waiting for
streaming response data");
+ if (current == END_OF_STREAM) {
+ eof = true;
+ current = null;
+ return -1;
+ }
+ }
+ return current.readByte() & 0xFF;
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws
IOException {
+ if (eof) return -1;
+ if (len == 0) return 0;
+
+ // Block until at least one byte is available, then return what we
have (short read).
+ while (current == null || !current.isReadable()) {
+ releaseCurrent();
+ try {
+ current = queue.poll(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for data", e);
+ }
+ if (current == null) throw new IOException("Timed out waiting for
streaming response data");
+ if (current == END_OF_STREAM) {
+ eof = true;
+ current = null;
+ return -1;
+ }
+ }
+ final int readable = Math.min(current.readableBytes(), len);
+ current.readBytes(b, off, readable);
+ return readable;
+ }
+
+ @Override
+ public void close() throws IOException {
+ eof = true;
+ releaseCurrent();
+ // drain and release any remaining buffers
+ ByteBuf buf;
+ while ((buf = queue.poll()) != null) {
+ if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+ buf.release();
+ }
+ }
+ }
+
+ private void releaseCurrent() {
+ if (current != null && current != END_OF_STREAM && current.refCnt() >
0) {
+ current.release();
+ }
+ current = null;
+ }
+
+}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
new file mode 100644
index 0000000000..11491903e8
--- /dev/null
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import
org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.structure.io.binary.Marker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Performs pull-based streaming deserialization of a GraphBinary v4 response
from an {@link InputStreamBuffer}.
+ * Reads one item at a time using the {@link GraphBinaryReader} and {@code
TypeSerializer} infrastructure,
+ * pushing each result to the {@link ResultSet} as it is deserialized.
+ * <p>
+ * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream
marker][status_code][message][exception]}
+ */
+public class GraphBinaryStreamResponseReader implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class);
+
+ private final Buffer buffer;
+ private final GraphBinaryReader reader;
+ private final ResultSet resultSet;
+ private final AtomicReference<ResultSet> pendingResultSet;
+
+ public GraphBinaryStreamResponseReader(final Buffer buffer,
+ final GraphBinaryReader reader,
+ final ResultSet resultSet,
+ final AtomicReference<ResultSet>
pendingResultSet) {
+ this.buffer = buffer;
+ this.reader = reader;
+ this.resultSet = resultSet;
+ this.pendingResultSet = pendingResultSet;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Read header: version byte (MSB must be 1) and bulking flag
+ final byte version = buffer.readByte();
+ if ((version & 0x80) == 0) {
+ throw new RuntimeException("Invalid GraphBinary response
version: " + version);
+ }
+ final boolean bulked = (buffer.readByte() & 1) == 1;
+
+ // Read items until EndOfStream marker
+ while (true) {
+ final Object obj = reader.read(buffer);
+ if (obj instanceof Marker) {
+ break;
+ }
+
+ if (bulked) {
+ final long bulk = reader.read(buffer);
+ resultSet.add(new Result(new DefaultRemoteTraverser<>(obj,
bulk)));
+ } else {
+ resultSet.add(new Result(obj));
+ }
+ }
+
+ // Read footer: status code, nullable message, nullable exception
+ final int statusCode = reader.readValue(buffer, Integer.class,
false);
+ final String message = reader.readValue(buffer, String.class,
true);
+ final String exception = reader.readValue(buffer, String.class,
true);
+
+ // Status code 0 means success in GraphBinary v4 — the server
omits the HTTP status code
+ // in the binary footer when the response is successful.
+ if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code())
{
+ resultSet.markComplete();
+ } else {
+ resultSet.markError(new
ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception));
+ }
+ } catch (Throwable t) {
+ logger.warn("Error reading streaming response", t);
+ resultSet.markError(t);
+ } finally {
+ pendingResultSet.compareAndSet(resultSet, null);
+ buffer.release();
+ }
+ }
+}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
new file mode 100644
index 0000000000..4490975f23
--- /dev/null
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.stream;
+
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A read-only {@link Buffer} implementation backed by a blocking {@link
InputStream} via {@link DataInputStream}.
+ * Supports only sequential read operations — all write, random-access, and
NIO methods throw
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * This allows the existing {@code TypeSerializer} implementations (which only
use sequential reads) to work
+ * unchanged over a streaming HTTP response body.
+ */
+public class InputStreamBuffer implements Buffer {
+
+ private final DataInputStream in;
+ private int bytesRead;
+
+ public InputStreamBuffer(final InputStream inputStream) {
+ this.in = new DataInputStream(inputStream);
+ }
+
+ @Override
+ public boolean readBoolean() {
+ try {
+ final boolean v = in.readBoolean();
+ bytesRead += 1;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public byte readByte() {
+ try {
+ final byte v = in.readByte();
+ bytesRead += 1;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public short readShort() {
+ try {
+ final short v = in.readShort();
+ bytesRead += 2;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public int readInt() {
+ try {
+ final int v = in.readInt();
+ bytesRead += 4;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public long readLong() {
+ try {
+ final long v = in.readLong();
+ bytesRead += 8;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public float readFloat() {
+ try {
+ final float v = in.readFloat();
+ bytesRead += 4;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public double readDouble() {
+ try {
+ final double v = in.readDouble();
+ bytesRead += 8;
+ return v;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Buffer readBytes(final byte[] destination) {
+ try {
+ in.readFully(destination);
+ bytesRead += destination.length;
+ return this;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Buffer readBytes(final byte[] destination, final int dstIndex,
final int length) {
+ try {
+ in.readFully(destination, dstIndex, length);
+ bytesRead += length;
+ return this;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Buffer readBytes(final ByteBuffer dst) {
+ try {
+ final byte[] tmp = new byte[dst.remaining()];
+ in.readFully(tmp);
+ dst.put(tmp);
+ bytesRead += tmp.length;
+ return this;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Buffer readBytes(final OutputStream out, final int length) throws
IOException {
+ final byte[] tmp = new byte[length];
+ in.readFully(tmp);
+ out.write(tmp);
+ bytesRead += length;
+ return this;
+ }
+
+ @Override
+ public int readerIndex() {
+ return bytesRead;
+ }
+
+ @Override
+ public int readableBytes() {
+ throw new UnsupportedOperationException("readableBytes() is not
supported on a streaming Buffer");
+ }
+
+ @Override
+ public Buffer readerIndex(final int readerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int writerIndex() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writerIndex(final int writerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer markWriterIndex() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer resetWriterIndex() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int capacity() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return false;
+ }
+
+ @Override
+ public Buffer writeBoolean(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeByte(final int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeShort(final int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeInt(final int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeLong(final long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeFloat(final float value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeDouble(final double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeBytes(final byte[] src) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeBytes(final ByteBuffer src) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer writeBytes(final byte[] src, final int srcIndex, final int
length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean release() {
+ try {
+ in.close();
+ } catch (IOException e) {
+ // best-effort close
+ }
+ return true;
+ }
+
+ @Override
+ public Buffer retain() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int referenceCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int nioBufferCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(final int index, final int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer nioBuffer() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(final int index, final int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer getBytes(final int index, final byte[] dst) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
new file mode 100644
index 0000000000..91ba9cf28a
--- /dev/null
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ByteBufQueueInputStreamTest {
+
+ @Test
+ public void shouldReadSingleByteBuf() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ final ByteBuf buf = Unpooled.buffer();
+ buf.writeBytes(new byte[]{1, 2, 3, 4});
+ stream.offer(buf);
+ stream.signalEndOfStream();
+
+ assertEquals(1, stream.read());
+ assertEquals(2, stream.read());
+ assertEquals(3, stream.read());
+ assertEquals(4, stream.read());
+ assertEquals(-1, stream.read());
+ }
+
+ @Test
+ public void shouldReadAcrossMultipleByteBufs() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2}));
+ stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4}));
+ stream.signalEndOfStream();
+
+ final byte[] result = new byte[8];
+ int totalRead = 0;
+ int read;
+ while ((read = stream.read(result, totalRead, result.length -
totalRead)) != -1) {
+ totalRead += read;
+ }
+ assertEquals(4, totalRead);
+ assertArrayEquals(new byte[]{1, 2, 3, 4},
java.util.Arrays.copyOf(result, totalRead));
+ }
+
+ @Test
+ public void shouldReleaseByteBufsAfterReading() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4);
+ buf.writeBytes(new byte[]{1, 2, 3, 4});
+ assertEquals(1, buf.refCnt());
+
+ stream.offer(buf);
+ stream.signalEndOfStream();
+
+ final byte[] result = new byte[4];
+ stream.read(result, 0, 4);
+ stream.read(); // triggers release of buf and reads EOS
+
+ assertEquals(0, buf.refCnt());
+ }
+
+ @Test
+ public void shouldCleanUpOnClose() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2);
+ buf1.writeBytes(new byte[]{1, 2});
+ final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2);
+ buf2.writeBytes(new byte[]{3, 4});
+
+ stream.offer(buf1);
+ stream.offer(buf2);
+ stream.close();
+
+ assertEquals(0, buf1.refCnt());
+ assertEquals(0, buf2.refCnt());
+ }
+}
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
new file mode 100644
index 0000000000..484ec7d524
--- /dev/null
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import
org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import
org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class GraphBinaryStreamResponseReaderTest {
+
+ private ExecutorService executor;
+ private GraphBinaryMessageSerializerV4 serializer;
+ private GraphBinaryReader reader;
+
+ @Before
+ public void setup() {
+ executor = Executors.newCachedThreadPool();
+ serializer = new GraphBinaryMessageSerializerV4();
+ reader = serializer.getMapper().getReader();
+ }
+
+ @After
+ public void teardown() {
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void shouldReadSingleItemResponse() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ final ByteBuf payload = serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Collections.singletonList("hello")).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(payload);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ final List<Result> results = rs.all().get();
+ assertEquals(1, results.size());
+ assertEquals("hello", results.get(0).getString());
+ assertNull(pending.get());
+ }
+
+ @Test
+ public void shouldReadMultipleItemResponse() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ final ByteBuf payload = serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Arrays.asList(1, 2, 3)).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(payload);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ final List<Result> results = rs.all().get();
+ assertEquals(3, results.size());
+ assertEquals(1, results.get(0).getInt());
+ assertEquals(2, results.get(1).getInt());
+ assertEquals(3, results.get(2).getInt());
+ }
+
+ @Test
+ public void shouldReadBulkedResponse() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ final ByteBuf payload = serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Arrays.asList("a", 3L)).bulked(true).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(payload);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ final List<Result> results = rs.all().get();
+ assertEquals(1, results.size());
+ assertTrue(results.get(0).getObject() instanceof
DefaultRemoteTraverser);
+ final DefaultRemoteTraverser<?> traverser =
(DefaultRemoteTraverser<?>) results.get(0).getObject();
+ assertEquals("a", traverser.get());
+ assertEquals(3L, traverser.bulk());
+ }
+
+ @Test
+ public void shouldHandleErrorFooter() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ final ByteBuf payload = serializer.serializeResponseAsBinary(
+
ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR)
+ .statusMessage("Something went wrong")
+ .exception("java.lang.RuntimeException")
+ .result(Collections.emptyList()).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(payload);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ assertTrue(rs.allItemsAvailable());
+ try {
+ rs.all().get();
+ fail("Expected exception");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof ResponseException);
+ final ResponseException re = (ResponseException) e.getCause();
+ assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR,
re.getResponseStatusCode());
+ assertEquals("Something went wrong", re.getMessage());
+ }
+ assertNull(pending.get());
+ }
+
+ @Test
+ public void shouldReadDataSplitAcrossChunks() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ // Use a large string (~1000 chars) so the split point must land
inside it regardless of
+ // header/footer overhead. The header+footer are ~30 bytes; the string
item is ~1000 bytes.
+ final String largeValue = String.join("", Collections.nCopies(100,
"abcdefghij"));
+ final ByteBuf fullPayload = serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+
.result(Collections.singletonList(largeValue)).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final int splitPoint = fullPayload.readableBytes() / 2;
+ final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain();
+ final ByteBuf chunk2 = fullPayload.retain();
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(chunk1);
+ stream.offer(chunk2);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ final List<Result> results = rs.all().get();
+ assertEquals(1, results.size());
+ assertEquals(largeValue, results.get(0).getString());
+
+ fullPayload.release();
+ }
+
+ @Test
+ public void shouldReadEmptyResponse() throws Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+
+ final ByteBuf payload = serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Collections.emptyList()).create(),
+ ByteBufAllocator.DEFAULT);
+
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(payload);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run();
+
+ final List<Result> results = rs.all().get();
+ assertTrue(results.isEmpty());
+ assertNull(pending.get());
+ }
+}
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
new file mode 100644
index 0000000000..fdd37818a4
--- /dev/null
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+import static org.junit.Assert.*;
+
+public class HttpStreamingResponseHandlerTest {
+
+ private ExecutorService executor;
+ private GraphBinaryMessageSerializerV4 serializer;
+ private GraphBinaryReader reader;
+
+ @Before
+ public void setup() {
+ executor = Executors.newSingleThreadExecutor();
+ serializer = new GraphBinaryMessageSerializerV4();
+ reader = serializer.getMapper().getReader();
+ }
+
+ @After
+ public void teardown() {
+ executor.shutdownNow();
+ }
+
+ private EmbeddedChannel createChannel(final AtomicReference<ResultSet>
pendingResultSet, final long maxResponseContentLength) {
+ final HttpStreamingResponseHandler handler = new
HttpStreamingResponseHandler(
+ reader, pendingResultSet, executor, maxResponseContentLength);
+ return new EmbeddedChannel(handler);
+ }
+
+ @Test
+ public void shouldEmitLastContentReadResponseOnHappyPath() throws
Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+ final EmbeddedChannel channel = createChannel(pending, 0);
+
+ // Serialize a valid GraphBinary response
+ final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Collections.singletonList(1)).create(),
+ channel.alloc()));
+
+ // Send HttpResponse with GraphBinary content type
+ final HttpResponse response = new
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
SerTokens.MIME_GRAPHBINARY_V4);
+ channel.writeInbound(response);
+
+ // Send content
+ channel.writeInbound(new
DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+
+ // Send LastHttpContent
+ channel.writeInbound(new DefaultLastHttpContent());
+
+ // Verify LAST_CONTENT_READ_RESPONSE is emitted
+ final Object out = channel.readInbound();
+ assertSame(LAST_CONTENT_READ_RESPONSE, out);
+
+ channel.finishAndReleaseAll();
+ }
+
+ @Test
+ public void shouldHandleDoubleLastHttpContentWithoutError() throws
Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+ final EmbeddedChannel channel = createChannel(pending, 0);
+
+ final byte[] payload = toBytes(serializer.serializeResponseAsBinary(
+ ResponseMessage.build().code(HttpResponseStatus.OK)
+ .result(Collections.singletonList(1)).create(),
+ channel.alloc()));
+
+ final HttpResponse response = new
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
SerTokens.MIME_GRAPHBINARY_V4);
+ channel.writeInbound(response);
+ channel.writeInbound(new
DefaultHttpContent(Unpooled.wrappedBuffer(payload)));
+ channel.writeInbound(new DefaultLastHttpContent());
+
+ // Send a second LastHttpContent — should not throw NPE
+ channel.writeInbound(new DefaultLastHttpContent());
+
+ channel.finishAndReleaseAll();
+ }
+
+ @Test
+ public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+ final EmbeddedChannel channel = createChannel(pending, 10);
+
+ final HttpResponse response = new
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
SerTokens.MIME_GRAPHBINARY_V4);
+ channel.writeInbound(response);
+
+ try {
+ // Send content exceeding the 10-byte limit
+ channel.writeInbound(new
DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20])));
+ fail("Expected TooLongFrameException");
+ } catch (Exception e) {
+ assertTrue(e instanceof TooLongFrameException);
+ }
+
+ channel.finishAndReleaseAll();
+ }
+
+ @Test
+ public void shouldSignalQueueInputStreamOnChannelInactive() throws
Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+ final EmbeddedChannel channel = createChannel(pending, 0);
+
+ final HttpResponse response = new
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
SerTokens.MIME_GRAPHBINARY_V4);
+ channel.writeInbound(response);
+
+ // Send some content but no LastHttpContent
+ channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new
byte[]{1, 2, 3})));
+
+ // Fire channelInactive — should not throw and should signal the stream
+ channel.pipeline().fireChannelInactive();
+
+ channel.finishAndReleaseAll();
+ }
+
+ @Test
+ public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws
Exception {
+ final ResultSet rs = new ResultSet(executor,
RequestMessage.build("g.V()").create(), null);
+ final AtomicReference<ResultSet> pending = new AtomicReference<>(rs);
+ final EmbeddedChannel channel = createChannel(pending, 0);
+
+ // Send a 500 response with JSON content type
+ final HttpResponse response = new
DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE,
"application/json");
+ channel.writeInbound(response);
+
+ // Send JSON error body
+ final String errorJson = "{\"message\":\"test error\"}";
+ channel.writeInbound(new
DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8)));
+
+ // Verify error is marked on the ResultSet
+ try {
+ rs.all().get();
+ fail("Expected exception");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof ResponseException);
+ final ResponseException re = (ResponseException) e.getCause();
+ assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR,
re.getResponseStatusCode());
+ assertEquals("test error", re.getMessage());
+ }
+
+ // Verify pendingResultSet was cleared
+ assertNull(pending.get());
+
+ channel.finishAndReleaseAll();
+ }
+
+ private byte[] toBytes(final io.netty.buffer.ByteBuf buf) {
+ final byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ buf.release();
+ return bytes;
+ }
+}
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
new file mode 100644
index 0000000000..02d029d66e
--- /dev/null
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
+import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class InputStreamBufferTest {
+
+ @Test
+ public void shouldReadPrimitivesThroughInputStreamBuffer() throws
Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ final io.netty.buffer.ByteBuf buf = Unpooled.buffer();
+ buf.writeByte(42);
+ buf.writeInt(12345);
+ buf.writeLong(9876543210L);
+ buf.writeFloat(3.14f);
+ buf.writeDouble(2.718281828);
+ buf.writeShort(256);
+ buf.writeBoolean(true);
+ stream.offer(buf);
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ assertEquals(42, buffer.readByte());
+ assertEquals(12345, buffer.readInt());
+ assertEquals(9876543210L, buffer.readLong());
+ assertEquals(3.14f, buffer.readFloat(), 0.001f);
+ assertEquals(2.718281828, buffer.readDouble(), 0.000001);
+ assertEquals(256, buffer.readShort());
+ assertTrue(buffer.readBoolean());
+ }
+
+ @Test
+ public void shouldReadBytesArray() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30}));
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ final byte[] dest = new byte[3];
+ buffer.readBytes(dest);
+ assertArrayEquals(new byte[]{10, 20, 30}, dest);
+ }
+
+ @Test
+ public void shouldTrackReaderIndex() throws Exception {
+ final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
+ stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8,
9}));
+ stream.signalEndOfStream();
+
+ final InputStreamBuffer buffer = new InputStreamBuffer(stream);
+ assertEquals(0, buffer.readerIndex());
+ buffer.readByte();
+ assertEquals(1, buffer.readerIndex());
+ buffer.readInt();
+ assertEquals(5, buffer.readerIndex());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldThrowOnReadableBytes() {
+ new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldThrowOnWriteInt() {
+ new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldThrowOnNioBuffer() {
+ new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer();
+ }
+}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 7c474d6ba0..f0cb2b9ea2 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -275,7 +275,11 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
// failures that follow this will show up in the response body
instead.
sendHttpResponse(ctx, OK, createResponseHeaders(ctx,
serializer, requestCtx).toArray(CharSequence[]::new));
sendHttpContents(ctx, requestCtx);
- sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+ // Skip if writeError() already terminated the response (e.g.,
serialization error in makeChunk).
+ // Sending a second LastHttpContent would corrupt the HTTP
framing on keep-alive connections.
+ if (requestCtx.getRequestState() != RequestState.ERROR) {
+ sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
+ }
} catch (Throwable t) {
writeError(requestCtx, formErrorResponseMessage(t,
requestMessage), serializer.getValue1());
} finally {
@@ -753,19 +757,21 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
ctx.setRequestState(STREAMING);
return serializer.writeHeader(responseMessage,
nettyContext.alloc());
}
- ctx.setRequestState(FINISHED);
- return
serializer.serializeResponseAsBinary(ResponseMessage.build()
+ final ByteBuf fullResponse =
serializer.serializeResponseAsBinary(ResponseMessage.build()
.result(aggregate)
.bulked(bulking)
.code(HttpResponseStatus.OK)
.create(), nettyContext.alloc());
+ ctx.setRequestState(FINISHED);
+ return fullResponse;
case STREAMING:
return serializer.writeChunk(aggregate,
nettyContext.alloc());
case FINISHING:
+ final ByteBuf footer =
serializer.writeFooter(responseMessage, nettyContext.alloc());
ctx.setRequestState(FINISHED);
- return serializer.writeFooter(responseMessage,
nettyContext.alloc());
+ return footer;
}
return serializer.serializeResponseAsBinary(responseMessage,
nettyContext.alloc());
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index f0f6b652c0..4fa5ba84ec 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -97,6 +97,13 @@ public class HttpHandlerUtil {
* @param serializer The serializer to use to serialize the error
response.
*/
static void writeError(final Context context, final ResponseMessage
responseMessage, final MessageSerializer<?> serializer) {
+ // Prevent writing after the response is already terminated. A second
write would corrupt
+ // HTTP framing on keep-alive connections, poisoning them for
subsequent requests.
+ if (context.getRequestState() ==
HttpGremlinEndpointHandler.RequestState.ERROR ||
+ context.getRequestState() ==
HttpGremlinEndpointHandler.RequestState.FINISHED) {
+ return;
+ }
+
try {
final ChannelHandlerContext ctx =
context.getChannelHandlerContext();
final ByteBuf ByteBuf = context.getRequestState() ==
HttpGremlinEndpointHandler.RequestState.STREAMING
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
new file mode 100644
index 0000000000..708a63e13f
--- /dev/null
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for streaming HTTP response support in the Java driver.
+ * Verifies that the streaming pipeline (HttpStreamingResponseHandler +
GraphBinaryStreamResponseReader)
+ * works correctly end-to-end against a real Gremlin Server.
+ */
+public class StreamingResponseIntegrateTest extends
AbstractGremlinServerIntegrationTest {
+
+ @Override
+ public Settings overrideSettings(final Settings settings) {
+ settings.channelizer = HttpChannelizer.class.getName();
+ return settings;
+ }
+
+ @Test
+ public void shouldStreamBasicResults() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ final List<Result> results =
client.submit("g.inject(1,2,3)").all().get();
+ assertEquals(3, results.size());
+ assertEquals(1, results.get(0).getInt());
+ assertEquals(2, results.get(1).getInt());
+ assertEquals(3, results.get(2).getInt());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamIncrementallyWithIterator() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ final ResultSet rs = client.submit("g.inject(1,2,3,4,5)");
+
+ // Consume results one at a time via iterator
+ final Iterator<Result> iter = rs.iterator();
+ final List<Integer> collected = new ArrayList<>();
+ while (iter.hasNext()) {
+ collected.add(iter.next().getInt());
+ }
+ assertEquals(5, collected.size());
+ for (int i = 0; i < 5; i++) {
+ assertEquals(i + 1, (int) collected.get(i));
+ }
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamIncrementallyWithOne() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ final ResultSet rs = client.submit("g.inject(10,20,30)");
+
+ assertEquals(10, rs.one().getInt());
+ assertEquals(20, rs.one().getInt());
+ assertEquals(30, rs.one().getInt());
+ assertNull(rs.one());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamLargeResultSet() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ // Generate a large result set that would span multiple HTTP chunks
+ final List<Result> results = client.submit(
+
"g.inject(1).repeat(__.identity()).times(1000).emit()").all().get();
+ assertEquals(1000, results.size());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamEmptyResponse() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ final List<Result> results =
client.submit("g.V().hasLabel('nonexistent')").all().get();
+ assertTrue(results.isEmpty());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleServerErrorDuringStreaming() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+ client.submit("invalid_script_that_should_fail").all().get();
+ fail("Expected exception");
+ } catch (ExecutionException e) {
+ // Error should propagate correctly through the streaming pipeline
+ assertTrue(e.getCause() instanceof ResponseException);
+ final ResponseException re = (ResponseException) e.getCause();
+ assertEquals(HttpResponseStatus.BAD_REQUEST,
re.getResponseStatusCode());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldReuseConnectionAfterStreamingComplete() throws Exception
{
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+
+ // First request
+ final List<Result> results1 =
client.submit("g.inject(1)").all().get();
+ assertEquals(1, results1.size());
+
+ // Second request on same client (should reuse connection)
+ final List<Result> results2 =
client.submit("g.inject(2)").all().get();
+ assertEquals(1, results2.size());
+ assertEquals(2, results2.get(0).getInt());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleConcurrentStreamingRequests() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+
+ final CompletableFuture<List<Result>> f1 =
client.submit("g.inject(1,2,3)").all();
+ final CompletableFuture<List<Result>> f2 =
client.submit("g.inject(4,5,6)").all();
+ final CompletableFuture<List<Result>> f3 =
client.submit("g.inject(7,8,9)").all();
+
+ assertEquals(3, f1.get().size());
+ assertEquals(3, f2.get().size());
+ assertEquals(3, f3.get().size());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamWithTraversalApi() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final GraphTraversalSource g = traversal().with(
+ DriverRemoteConnection.using(cluster));
+
+ final List<Integer> results = g.inject(1, 2, 3).toList();
+ assertEquals(3, results.size());
+ assertTrue(results.contains(1));
+ assertTrue(results.contains(2));
+ assertTrue(results.contains(3));
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldStreamVerticesFromGraph() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+
+ // Use inject to create data rather than relying on pre-loaded
graph
+ final List<Result> results =
client.submit("g.inject(1,2,3,4,5,6)").all().get();
+ assertEquals(6, results.size());
+ } finally {
+ cluster.close();
+ }
+ }
+
+ @Test
+ public void shouldReuseConnectionAfterServerError() throws Exception {
+ final Cluster cluster = TestClientFactory.build().create();
+ try {
+ final Client client = cluster.connect();
+
+ // Submit a request that causes a server error
+ try {
+ client.submit("throw new RuntimeException('test
error')").all().get();
+ fail("Should have thrown");
+ } catch (ExecutionException e) {
+ // expected
+ }
+
+ // Connection should still be usable
+ final List<Result> results =
client.submit("g.inject(1)").all().get();
+ assertEquals(1, results.get(0).getInt());
+ } finally {
+ cluster.close();
+ }
+ }
+}