This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch jstream2 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 7ad120269be6dd1082702d24830b2a2afc62d1e1 Author: Cole Greer <[email protected]> AuthorDate: Fri Apr 24 11:43:59 2026 -0700 Add streaming HTTP response support to gremlin-driver Adds incremental GraphBinary response deserialization to the Java driver. HTTP response chunks are fed to a BlockingQueue-backed InputStream, and a dedicated reader thread deserializes items one at a time using the existing TypeSerializer infrastructure via an InputStream-backed Buffer adapter. Results are pushed to the ResultSet as they are deserialized, enabling consumers to process results before the full response is received. Non-GraphBinary serializers (GraphSON, custom MessageSerializer) fall back to the previous aggregating pipeline (HttpObjectAggregator + HttpGremlinResponseDecoder). fix tests Fix connection corruption from double-LastHttpContent in streaming pipeline Server: Guard sendLastHttpContent with state check so it is skipped when writeError() in makeChunk() already terminated the response. Without this, a serialization error mid-stream sends two LastHttpContent messages, corrupting HTTP framing on keep-alive connections. Client: Null out queueInputStream after signalEndOfStream on LastHttpContent so spurious content between responses is dropped rather than offered to the closed stream. Guard writeError() against double-write after response termination Prevent writeError() from sending data after the response is already in ERROR or FINISHED state. Without this guard, any code path that calls writeError() after the response is terminated sends a second LastHttpContent, corrupting HTTP framing on keep-alive connections and causing subsequent requests on the same connection to hang. Assisted-by: Kiro:claude-sonnet-4-20250514 Fix serialization error handling and connection return race in streaming pipeline 1. HttpGremlinEndpointHandler.makeChunk(): Defer setRequestState(FINISHED) until after serialization succeeds. Previously, serialization failure left the state as FINISHED causing writeError() to bail out, resulting in empty responses and client-side EOFExceptions. 2. Connection.write(): Change whenCompleteAsync to whenComplete so returnToPool() runs synchronously when readCompleted fires. This prevents a race where the caller submits the next request before the connection is returned to the pool, causing connection starvation and idle timeout hangs. Assisted-by: Kiro:claude-sonnet-4-20250514 Fix channel-error recovery by checking isOpen() in addition to isDead() When a channel error occurs (e.g. TooLongFrameException), GremlinResponseHandler calls ctx.close() which initiates an async channel close. The whenComplete callback fires synchronously and checks isDead() (channel.isActive()), but isActive() may still return true because the close hasn't completed deregistration yet. Adding a channel.isOpen() check resolves this TOCTOU race because isOpen() returns false immediately when close() is called, before deregistration completes. Assisted-by: Kiro:claude-sonnet-4-20250514 Decouple connection pool return from ResultSet completion in streaming pipeline The streaming reader thread calls markComplete() when all results are deserialized, but this could fire before the HTTP response's LastHttpContent is processed by the codec. Reusing the connection before HTTP framing is complete caused the codec to silently drop subsequent responses. Changes: - Move connection return (returnToPool) from the whenComplete callback to GremlinResponseHandler, triggered by LAST_CONTENT_READ_RESPONSE. This ensures the connection is only reused after HTTP framing is fully complete. - Change HttpStreamingResponseHandler type parameter from DefaultHttpObject to HttpObject. Netty's LastHttpContent.EMPTY_LAST_CONTENT does not extend DefaultHttpObject, causing it to bypass decode() and preventing LAST_CONTENT_READ_RESPONSE from being emitted. - Add streaming flag to GremlinResponseHandler to skip markComplete() for streaming responses (the reader thread owns ResultSet completion). - Remove returnToPool() from Connection.write() whenComplete success path; it is now handled by the pipeline on LastHttpContent for both streaming and non-streaming paths. Assisted-by: Kiro:claude-sonnet-4-20250514 Harden streaming pipeline: fix races, improve robustness, add tests - Make Connection.returnToPool() idempotent via AtomicBoolean guard to prevent double-return corrupting the connection pool in error paths - Change streamingReaderPool max to scale with cluster topology (maxConnectionPoolSize * contactPoints * 4) instead of per-host limit - Catch Throwable (not just Exception) in GraphBinaryStreamResponseReader to prevent consumer hangs on Error subclasses (OOM, StackOverflow) - Add 30s poll timeout to ByteBufQueueInputStream to prevent reader threads from blocking indefinitely if end-of-stream is never signaled - Set BYTES_READ attribute on first content (not headers) to preserve idle timeout error messaging when server sends headers before execution - Clear pendingResultSet in non-GraphBinary error path to prevent stale state blocking graceful shutdown - Rename isGraphBinaryResponse() to shouldStreamResponse() for clarity - Expand upgrade documentation with user-visible behavior details - Add HttpStreamingResponseHandlerTest covering happy path, double LastHttpContent, max content length, channelInactive, and error cases - Add error-then-reuse integration test for connection recovery cleanup fix tests --- CHANGELOG.asciidoc | 1 + docs/src/upgrade/release-4.x.x.asciidoc | 9 + .../tinkerpop/gremlin/driver/Channelizer.java | 44 ++- .../apache/tinkerpop/gremlin/driver/Cluster.java | 18 ++ .../tinkerpop/gremlin/driver/Connection.java | 23 +- .../tinkerpop/gremlin/driver/ConnectionPool.java | 1 + .../driver/handler/GremlinResponseHandler.java | 21 +- .../handler/HttpStreamingResponseHandler.java | 237 ++++++++++++++++ .../driver/stream/ByteBufQueueInputStream.java | 137 +++++++++ .../stream/GraphBinaryStreamResponseReader.java | 105 +++++++ .../gremlin/driver/stream/InputStreamBuffer.java | 311 +++++++++++++++++++++ .../handler/ByteBufQueueInputStreamTest.java | 95 +++++++ .../GraphBinaryStreamResponseReaderTest.java | 226 +++++++++++++++ .../handler/HttpStreamingResponseHandlerTest.java | 207 ++++++++++++++ .../driver/handler/InputStreamBufferTest.java | 94 +++++++ .../server/handler/HttpGremlinEndpointHandler.java | 14 +- .../gremlin/server/handler/HttpHandlerUtil.java | 7 + .../server/StreamingResponseIntegrateTest.java | 239 ++++++++++++++++ 18 files changed, 1761 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e3f3471a6f..f76f6ce790 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python. * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`. +* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary. * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response. * Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption. * Removed `readable-stream` dependency from `gremlin-javascript`. diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index f0216e0b8d..2ebd34fff2 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -270,6 +270,15 @@ try { The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they block on the async stream internally. +==== Streaming Response Deserialization in gremlin-driver + +The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive +rather than buffering the entire response. This reduces time-to-first-result for large result sets. + +This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer. +Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response +before deserialization. The `ResultSet` API is unchanged. + ==== More Secure Gremlin Server Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization, diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 427f76c056..59b9d02365 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -34,9 +34,13 @@ import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler; import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler; import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; import java.util.Collections; import java.util.Optional; @@ -92,7 +96,7 @@ public interface Channelizer extends ChannelHandler { protected Connection connection; protected Cluster cluster; protected SslHandler sslHandler; - private AtomicReference<ResultSet> pending; + protected AtomicReference<ResultSet> pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; @@ -158,7 +162,6 @@ public interface Channelizer extends ChannelHandler { } configure(pipeline); - pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending)); } @Override @@ -187,7 +190,9 @@ public interface Channelizer extends ChannelHandler { ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create(); private HttpGremlinRequestEncoder gremlinRequestEncoder; + private HttpStreamingResponseHandler streamingResponseHandler; private HttpGremlinResponseDecoder gremlinResponseDecoder; + private boolean useStreaming; private HttpContentDecompressionHandler httpCompressionDecoder; private IdleStateHandler idleStateHandler; @@ -200,7 +205,19 @@ public interface Channelizer extends ChannelHandler { httpCompressionDecoder = new HttpContentDecompressionHandler(); gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(), cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri()); - gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer()); + + final MessageSerializer<?> serializer = cluster.getSerializer(); + if (serializer instanceof GraphBinaryMessageSerializerV4) { + useStreaming = true; + final GraphBinaryReader graphBinaryReader = + ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); + streamingResponseHandler = new HttpStreamingResponseHandler( + graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength()); + } else { + useStreaming = false; + gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer); + } + if (cluster.getIdleConnectionTimeout() > 0) { final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000); idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0); @@ -240,11 +257,22 @@ public interface Channelizer extends ChannelHandler { DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false); pipeline.addLast(PIPELINE_HTTP_CODEC, handler); - pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 - ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); - pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); - pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); - pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + if (useStreaming) { + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + } else { + pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 + ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + } + + pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> { + connection.returnToPool(); + connection.tryShutdown(); + }, useStreaming)); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index cac193814c..d4a3967f02 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -70,8 +70,11 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -383,6 +386,10 @@ public final class Cluster { return manager.connectionScheduler; } + ExecutorService streamingReaderPool() { + return manager.streamingReaderPool; + } + Settings.ConnectionPoolSettings connectionPoolSettings() { return manager.connectionPoolSettings; } @@ -956,6 +963,12 @@ public final class Cluster { */ private final ScheduledThreadPoolExecutor connectionScheduler; + /** + * Cached thread pool for streaming response reader threads. One thread per active streaming response, + * bounded implicitly by the connection pool size. + */ + private final ExecutorService streamingReaderPool; + private final int nioPoolSize; private final int workerPoolSize; private final int port; @@ -1023,6 +1036,10 @@ public final class Cluster { this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build()); + this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4, + 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), + new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build()); + validationRequest = () -> RequestMessage.build(builder.validationRequest); } @@ -1133,6 +1150,7 @@ public final class Cluster { executor.shutdown(); hostScheduler.shutdown(); connectionScheduler.shutdown(); + streamingReaderPool.shutdownNow(); closeIt.complete(null); }); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index cb94b39b87..12a1de646c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -66,6 +66,14 @@ final class Connection { * Is a {@code Connection} borrowed from the pool. */ private final AtomicBoolean isBorrowed = new AtomicBoolean(false); + /** + * Prevents returnToPool() from being called more than once per borrow cycle. + */ + private final AtomicBoolean returned = new AtomicBoolean(false); + + void resetReturned() { + returned.set(false); + } /** * This boolean guards the replace of the connection and ensures that it only occurs once. */ @@ -201,7 +209,7 @@ final class Connection { } else { final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host); - resultSet.getReadCompleted().whenCompleteAsync((v, t) -> { + resultSet.getReadCompleted().whenComplete((v, t) -> { if (t != null) { // the callback for when the read failed. a failed read means the request went to the server // and came back with a server-side error of some sort. it means the server is responsive @@ -209,17 +217,13 @@ final class Connection { // write operation. logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); - } else { - // the callback for when the read was successful, meaning that ResultSet.markComplete() - // was called - thisConnection.returnToPool(); } // While this request was in process, close might have been signaled in closeAsync(). // However, close would be blocked until all pending requests are completed. Attempt // the shutdown if the returned result cleared up the last pending message and unblocked // the close. tryShutdown(); - }, cluster.executor()); + }); pending.set(resultSet); @@ -234,7 +238,8 @@ final class Connection { return requestPromise; } - private void returnToPool() { + void returnToPool() { + if (!returned.compareAndSet(false, true)) return; try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { @@ -244,7 +249,7 @@ final class Connection { } private void handleConnectionCleanupOnError(final Connection thisConnection) { - if (thisConnection.isDead()) { + if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) { if (pool != null) pool.replaceConnection(thisConnection); } else { thisConnection.returnToPool(); @@ -259,7 +264,7 @@ final class Connection { * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the * shutdown if the returned result cleared up the last pending message. */ - private void tryShutdown() { + void tryShutdown() { if (isClosing() && isOkToClose()) shutdown(closeFuture.get()); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index ee4c34c5aa..4f40b6733a 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -525,6 +525,7 @@ final class ConnectionPool { while (head != null) { // try to borrow connection if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) { + head.resetReturned(); available = head; break; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index d4cfb167d2..94898783eb 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); private final AtomicReference<ResultSet> pendingResultSet; + private final Runnable onResponseComplete; + private final boolean streaming; - public GremlinResponseHandler(final AtomicReference<ResultSet> pending) { + public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) { this.pendingResultSet = pending; + this.onResponseComplete = onResponseComplete; + this.streaming = streaming; } @Override @@ -100,14 +104,17 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // Stream is done when the last content signaling response message is read. if (LAST_CONTENT_READ_RESPONSE == response) { - final ResultSet rs = pendingResultSet.getAndSet(null); - if (rs != null) { - if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - rs.markComplete(); - } else { - rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + if (!streaming) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + rs.markComplete(); + } else { + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + } } } + onResponseComplete.run(); } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java new file mode 100644 index 0000000000..e9d81b46b2 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; +import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + +/** + * Decodes chunked HTTP responses into streaming results without buffering the full response body. + * <p> + * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a + * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally, + * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses + * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then + * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process. + */ +public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpObject> { + + private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final GraphBinaryReader graphBinaryReader; + private final AtomicReference<ResultSet> pendingResultSet; + private final ExecutorService readerPool; + private final long maxResponseContentLength; + + // Mutable state below is accessed exclusively from the channel's event loop thread. + private HttpResponseStatus responseStatus; + private String contentType; + private long bytesRead; + private ByteBufQueueInputStream queueInputStream; + private CompositeByteBuf errorBody; + + public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, + final AtomicReference<ResultSet> pendingResultSet, + final ExecutorService readerPool, + final long maxResponseContentLength) { + this.graphBinaryReader = graphBinaryReader; + this.pendingResultSet = pendingResultSet; + this.readerPool = readerPool; + this.maxResponseContentLength = maxResponseContentLength; + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, + final List<Object> out) throws Exception { + if (msg instanceof HttpResponse) { + final HttpResponse resp = (HttpResponse) msg; + + // Reset mutable state for the new response cycle to prevent stale state from a previous + // response bleeding into this one when the handler is reused on the same connection. + resetState(); + + responseStatus = resp.status(); + contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); + queueInputStream = new ByteBufQueueInputStream(); + + // Spawn reader thread for GraphBinary responses + if (isGraphBinaryResponse()) { + final ResultSet rs = pendingResultSet.get(); + if (rs != null) { + final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream); + final GraphBinaryStreamResponseReader streamReader = + new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet); + try { + readerPool.submit(streamReader::run); + } catch (RejectedExecutionException e) { + queueInputStream.signalEndOfStream(); + rs.markError(e); + pendingResultSet.compareAndSet(rs, null); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } else { + // No pending ResultSet — close the stream and fire sentinel immediately + queueInputStream.signalEndOfStream(); + queueInputStream = null; + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + + if (msg instanceof HttpContent) { + final ByteBuf content = ((HttpContent) msg).content(); + bytesRead += content.readableBytes(); + + if (bytesRead > 0 && ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) { + ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0); + } + + if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { + // Don't signal here — exceptionCaught will handle cleanup + throw new TooLongFrameException("Response entity too large"); + } + + if (!isGraphBinaryResponse()) { + // Accumulate non-GraphBinary error body across chunks + if (content.readableBytes() > 0) { + if (errorBody == null) { + errorBody = ctx.alloc().compositeBuffer(); + } + // retain() because Netty releases the content ByteBuf after decode() returns + errorBody.addComponent(true, content.retain()); + } + } else if (content.readableBytes() > 0 && queueInputStream != null) { + // Feed bytes to the reader thread + // retain() because Netty releases the content ByteBuf after decode() returns + queueInputStream.offer(content.retain()); + } + + if (msg instanceof LastHttpContent) { + if (isGraphBinaryResponse()) { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + // Null out so any spurious content arriving between responses is dropped + // rather than offered to the already-closed stream. + queueInputStream = null; + } + out.add(LAST_CONTENT_READ_RESPONSE); + } else { + // Non-GraphBinary error — parse accumulated body and fire sentinel + handleNonGraphBinaryError(); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.exceptionCaught(ctx, cause); + } + + private void handleNonGraphBinaryError() { + final ResultSet rs = pendingResultSet.get(); + if (rs == null) return; + + try { + if (errorBody != null && errorBody.readableBytes() > 0) { + final JsonNode node = mapper.readTree(errorBody.toString(CharsetUtil.UTF_8)); + final String message = node.has("message") ? node.get("message").asText() : responseStatus.reasonPhrase(); + rs.markError(new ResponseException(responseStatus, message)); + } else { + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } + } catch (Exception e) { + logger.debug("Failed to parse error response body as JSON", e); + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } finally { + pendingResultSet.compareAndSet(rs, null); + releaseErrorBody(); + } + } + + private void resetState() { + // Clean up any leftover resources from a previous response on this connection + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + queueInputStream = null; + } + releaseErrorBody(); + bytesRead = 0; + responseStatus = null; + contentType = null; + } + + private void releaseErrorBody() { + if (errorBody != null) { + errorBody.release(); + errorBody = null; + } + } + + private boolean isGraphBinaryResponse() { + return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType); + } + + private static boolean isError(final HttpResponseStatus status) { + return status != HttpResponseStatus.OK; + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java new file mode 100644 index 0000000000..757b2821cb --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop + * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader thread consumes them via + * standard InputStream reads. + */ +public class ByteBufQueueInputStream extends InputStream { + + private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0); + + private final BlockingQueue<ByteBuf> queue; + private ByteBuf current; + private volatile boolean eof; + + public ByteBufQueueInputStream() { + this.queue = new LinkedBlockingQueue<>(); + } + + /** + * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed. + * The ByteBuf will be released after it is fully read. If the stream is already closed, + * the buffer is released immediately. + */ + public void offer(final ByteBuf buf) { + if (eof) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + return; + } + queue.add(buf); + } + + /** + * Signal that no more ByteBufs will be offered. + */ + public void signalEndOfStream() { + queue.offer(END_OF_STREAM); + } + + @Override + public int read() throws IOException { + if (eof) return -1; + + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + return current.readByte() & 0xFF; + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (eof) return -1; + if (len == 0) return 0; + + // Block until at least one byte is available, then return what we have (short read). + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + final int readable = Math.min(current.readableBytes(), len); + current.readBytes(b, off, readable); + return readable; + } + + @Override + public void close() throws IOException { + eof = true; + releaseCurrent(); + // drain and release any remaining buffers + ByteBuf buf; + while ((buf = queue.poll()) != null) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + } + } + + private void releaseCurrent() { + if (current != null && current != END_OF_STREAM && current.refCnt() > 0) { + current.release(); + } + current = null; + } + +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java new file mode 100644 index 0000000000..11491903e8 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}. + * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure, + * pushing each result to the {@link ResultSet} as it is deserialized. + * <p> + * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]} + */ +public class GraphBinaryStreamResponseReader implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class); + + private final Buffer buffer; + private final GraphBinaryReader reader; + private final ResultSet resultSet; + private final AtomicReference<ResultSet> pendingResultSet; + + public GraphBinaryStreamResponseReader(final Buffer buffer, + final GraphBinaryReader reader, + final ResultSet resultSet, + final AtomicReference<ResultSet> pendingResultSet) { + this.buffer = buffer; + this.reader = reader; + this.resultSet = resultSet; + this.pendingResultSet = pendingResultSet; + } + + @Override + public void run() { + try { + // Read header: version byte (MSB must be 1) and bulking flag + final byte version = buffer.readByte(); + if ((version & 0x80) == 0) { + throw new RuntimeException("Invalid GraphBinary response version: " + version); + } + final boolean bulked = (buffer.readByte() & 1) == 1; + + // Read items until EndOfStream marker + while (true) { + final Object obj = reader.read(buffer); + if (obj instanceof Marker) { + break; + } + + if (bulked) { + final long bulk = reader.read(buffer); + resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk))); + } else { + resultSet.add(new Result(obj)); + } + } + + // Read footer: status code, nullable message, nullable exception + final int statusCode = reader.readValue(buffer, Integer.class, false); + final String message = reader.readValue(buffer, String.class, true); + final String exception = reader.readValue(buffer, String.class, true); + + // Status code 0 means success in GraphBinary v4 — the server omits the HTTP status code + // in the binary footer when the response is successful. + if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) { + resultSet.markComplete(); + } else { + resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception)); + } + } catch (Throwable t) { + logger.warn("Error reading streaming response", t); + resultSet.markError(t); + } finally { + pendingResultSet.compareAndSet(resultSet, null); + buffer.release(); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java new file mode 100644 index 0000000000..4490975f23 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import org.apache.tinkerpop.gremlin.structure.io.Buffer; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * A read-only {@link Buffer} implementation backed by a blocking {@link InputStream} via {@link DataInputStream}. + * Supports only sequential read operations — all write, random-access, and NIO methods throw + * {@link UnsupportedOperationException}. + * <p> + * This allows the existing {@code TypeSerializer} implementations (which only use sequential reads) to work + * unchanged over a streaming HTTP response body. + */ +public class InputStreamBuffer implements Buffer { + + private final DataInputStream in; + private int bytesRead; + + public InputStreamBuffer(final InputStream inputStream) { + this.in = new DataInputStream(inputStream); + } + + @Override + public boolean readBoolean() { + try { + final boolean v = in.readBoolean(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte readByte() { + try { + final byte v = in.readByte(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public short readShort() { + try { + final short v = in.readShort(); + bytesRead += 2; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int readInt() { + try { + final int v = in.readInt(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public long readLong() { + try { + final long v = in.readLong(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public float readFloat() { + try { + final float v = in.readFloat(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public double readDouble() { + try { + final double v = in.readDouble(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination) { + try { + in.readFully(destination); + bytesRead += destination.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination, final int dstIndex, final int length) { + try { + in.readFully(destination, dstIndex, length); + bytesRead += length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final ByteBuffer dst) { + try { + final byte[] tmp = new byte[dst.remaining()]; + in.readFully(tmp); + dst.put(tmp); + bytesRead += tmp.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final OutputStream out, final int length) throws IOException { + final byte[] tmp = new byte[length]; + in.readFully(tmp); + out.write(tmp); + bytesRead += length; + return this; + } + + @Override + public int readerIndex() { + return bytesRead; + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException("readableBytes() is not supported on a streaming Buffer"); + } + + @Override + public Buffer readerIndex(final int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writerIndex(final int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public int capacity() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + return false; + } + + @Override + public Buffer writeBoolean(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeByte(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeShort(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeInt(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeLong(final long value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeFloat(final float value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeDouble(final double value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src, final int srcIndex, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + try { + in.close(); + } catch (IOException e) { + // best-effort close + } + return true; + } + + @Override + public Buffer retain() { + throw new UnsupportedOperationException(); + } + + @Override + public int referenceCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int nioBufferCount() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer getBytes(final int index, final byte[] dst) { + throw new UnsupportedOperationException(); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java new file mode 100644 index 0000000000..91ba9cf28a --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ByteBufQueueInputStreamTest { + + @Test + public void shouldReadSingleByteBuf() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + stream.offer(buf); + stream.signalEndOfStream(); + + assertEquals(1, stream.read()); + assertEquals(2, stream.read()); + assertEquals(3, stream.read()); + assertEquals(4, stream.read()); + assertEquals(-1, stream.read()); + } + + @Test + public void shouldReadAcrossMultipleByteBufs() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2})); + stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4})); + stream.signalEndOfStream(); + + final byte[] result = new byte[8]; + int totalRead = 0; + int read; + while ((read = stream.read(result, totalRead, result.length - totalRead)) != -1) { + totalRead += read; + } + assertEquals(4, totalRead); + assertArrayEquals(new byte[]{1, 2, 3, 4}, java.util.Arrays.copyOf(result, totalRead)); + } + + @Test + public void shouldReleaseByteBufsAfterReading() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + assertEquals(1, buf.refCnt()); + + stream.offer(buf); + stream.signalEndOfStream(); + + final byte[] result = new byte[4]; + stream.read(result, 0, 4); + stream.read(); // triggers release of buf and reads EOS + + assertEquals(0, buf.refCnt()); + } + + @Test + public void shouldCleanUpOnClose() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2); + buf1.writeBytes(new byte[]{1, 2}); + final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2); + buf2.writeBytes(new byte[]{3, 4}); + + stream.offer(buf1); + stream.offer(buf2); + stream.close(); + + assertEquals(0, buf1.refCnt()); + assertEquals(0, buf2.refCnt()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java new file mode 100644 index 0000000000..484ec7d524 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class GraphBinaryStreamResponseReaderTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newCachedThreadPool(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + @Test + public void shouldReadSingleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList("hello")).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List<Result> results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals("hello", results.get(0).getString()); + assertNull(pending.get()); + } + + @Test + public void shouldReadMultipleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList(1, 2, 3)).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List<Result> results = rs.all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } + + @Test + public void shouldReadBulkedResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList("a", 3L)).bulked(true).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List<Result> results = rs.all().get(); + assertEquals(1, results.size()); + assertTrue(results.get(0).getObject() instanceof DefaultRemoteTraverser); + final DefaultRemoteTraverser<?> traverser = (DefaultRemoteTraverser<?>) results.get(0).getObject(); + assertEquals("a", traverser.get()); + assertEquals(3L, traverser.bulk()); + } + + @Test + public void shouldHandleErrorFooter() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR) + .statusMessage("Something went wrong") + .exception("java.lang.RuntimeException") + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + assertTrue(rs.allItemsAvailable()); + try { + rs.all().get(); + fail("Expected exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("Something went wrong", re.getMessage()); + } + assertNull(pending.get()); + } + + @Test + public void shouldReadDataSplitAcrossChunks() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + // Use a large string (~1000 chars) so the split point must land inside it regardless of + // header/footer overhead. The header+footer are ~30 bytes; the string item is ~1000 bytes. + final String largeValue = String.join("", Collections.nCopies(100, "abcdefghij")); + final ByteBuf fullPayload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(largeValue)).create(), + ByteBufAllocator.DEFAULT); + + final int splitPoint = fullPayload.readableBytes() / 2; + final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain(); + final ByteBuf chunk2 = fullPayload.retain(); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(chunk1); + stream.offer(chunk2); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List<Result> results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals(largeValue, results.get(0).getString()); + + fullPayload.release(); + } + + @Test + public void shouldReadEmptyResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List<Result> results = rs.all().get(); + assertTrue(results.isEmpty()); + assertNull(pending.get()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java new file mode 100644 index 0000000000..fdd37818a4 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; +import static org.junit.Assert.*; + +public class HttpStreamingResponseHandlerTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newSingleThreadExecutor(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) { + final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( + reader, pendingResultSet, executor, maxResponseContentLength); + return new EmbeddedChannel(handler); + } + + @Test + public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Serialize a valid GraphBinary response + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + // Send HttpResponse with GraphBinary content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send content + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + + // Send LastHttpContent + channel.writeInbound(new DefaultLastHttpContent()); + + // Verify LAST_CONTENT_READ_RESPONSE is emitted + final Object out = channel.readInbound(); + assertSame(LAST_CONTENT_READ_RESPONSE, out); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + channel.writeInbound(new DefaultLastHttpContent()); + + // Send a second LastHttpContent — should not throw NPE + channel.writeInbound(new DefaultLastHttpContent()); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 10); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + try { + // Send content exceeding the 10-byte limit + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20]))); + fail("Expected TooLongFrameException"); + } catch (Exception e) { + assertTrue(e instanceof TooLongFrameException); + } + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send some content but no LastHttpContent + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}))); + + // Fire channelInactive — should not throw and should signal the stream + channel.pipeline().fireChannelInactive(); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Send a 500 response with JSON content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + channel.writeInbound(response); + + // Send JSON error body + final String errorJson = "{\"message\":\"test error\"}"; + channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8))); + + // Verify error is marked on the ResultSet + try { + rs.all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("test error", re.getMessage()); + } + + // Verify pendingResultSet was cleared + assertNull(pending.get()); + + channel.finishAndReleaseAll(); + } + + private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { + final byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + buf.release(); + return bytes; + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java new file mode 100644 index 0000000000..02d029d66e --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class InputStreamBufferTest { + + @Test + public void shouldReadPrimitivesThroughInputStreamBuffer() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final io.netty.buffer.ByteBuf buf = Unpooled.buffer(); + buf.writeByte(42); + buf.writeInt(12345); + buf.writeLong(9876543210L); + buf.writeFloat(3.14f); + buf.writeDouble(2.718281828); + buf.writeShort(256); + buf.writeBoolean(true); + stream.offer(buf); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(42, buffer.readByte()); + assertEquals(12345, buffer.readInt()); + assertEquals(9876543210L, buffer.readLong()); + assertEquals(3.14f, buffer.readFloat(), 0.001f); + assertEquals(2.718281828, buffer.readDouble(), 0.000001); + assertEquals(256, buffer.readShort()); + assertTrue(buffer.readBoolean()); + } + + @Test + public void shouldReadBytesArray() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + final byte[] dest = new byte[3]; + buffer.readBytes(dest); + assertArrayEquals(new byte[]{10, 20, 30}, dest); + } + + @Test + public void shouldTrackReaderIndex() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(0, buffer.readerIndex()); + buffer.readByte(); + assertEquals(1, buffer.readerIndex()); + buffer.readInt(); + assertEquals(5, buffer.readerIndex()); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnReadableBytes() { + new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes(); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnWriteInt() { + new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnNioBuffer() { + new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer(); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 7c474d6ba0..f0cb2b9ea2 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -275,7 +275,11 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ // failures that follow this will show up in the response body instead. sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); sendHttpContents(ctx, requestCtx); - sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk). + // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections. + if (requestCtx.getRequestState() != RequestState.ERROR) { + sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + } } catch (Throwable t) { writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1()); } finally { @@ -753,19 +757,21 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ ctx.setRequestState(STREAMING); return serializer.writeHeader(responseMessage, nettyContext.alloc()); } - ctx.setRequestState(FINISHED); - return serializer.serializeResponseAsBinary(ResponseMessage.build() + final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build() .result(aggregate) .bulked(bulking) .code(HttpResponseStatus.OK) .create(), nettyContext.alloc()); + ctx.setRequestState(FINISHED); + return fullResponse; case STREAMING: return serializer.writeChunk(aggregate, nettyContext.alloc()); case FINISHING: + final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc()); ctx.setRequestState(FINISHED); - return serializer.writeFooter(responseMessage, nettyContext.alloc()); + return footer; } return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc()); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index f0f6b652c0..4fa5ba84ec 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -97,6 +97,13 @@ public class HttpHandlerUtil { * @param serializer The serializer to use to serialize the error response. */ static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer<?> serializer) { + // Prevent writing after the response is already terminated. A second write would corrupt + // HTTP framing on keep-alive connections, poisoning them for subsequent requests. + if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR || + context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) { + return; + } + try { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java new file mode 100644 index 0000000000..708a63e13f --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.*; + +/** + * Integration tests for streaming HTTP response support in the Java driver. + * Verifies that the streaming pipeline (HttpStreamingResponseHandler + GraphBinaryStreamResponseReader) + * works correctly end-to-end against a real Gremlin Server. + */ +public class StreamingResponseIntegrateTest extends AbstractGremlinServerIntegrationTest { + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + return settings; + } + + @Test + public void shouldStreamBasicResults() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List<Result> results = client.submit("g.inject(1,2,3)").all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithIterator() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(1,2,3,4,5)"); + + // Consume results one at a time via iterator + final Iterator<Result> iter = rs.iterator(); + final List<Integer> collected = new ArrayList<>(); + while (iter.hasNext()) { + collected.add(iter.next().getInt()); + } + assertEquals(5, collected.size()); + for (int i = 0; i < 5; i++) { + assertEquals(i + 1, (int) collected.get(i)); + } + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithOne() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(10,20,30)"); + + assertEquals(10, rs.one().getInt()); + assertEquals(20, rs.one().getInt()); + assertEquals(30, rs.one().getInt()); + assertNull(rs.one()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamLargeResultSet() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + // Generate a large result set that would span multiple HTTP chunks + final List<Result> results = client.submit( + "g.inject(1).repeat(__.identity()).times(1000).emit()").all().get(); + assertEquals(1000, results.size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamEmptyResponse() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List<Result> results = client.submit("g.V().hasLabel('nonexistent')").all().get(); + assertTrue(results.isEmpty()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleServerErrorDuringStreaming() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + client.submit("invalid_script_that_should_fail").all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + // Error should propagate correctly through the streaming pipeline + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.BAD_REQUEST, re.getResponseStatusCode()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldReuseConnectionAfterStreamingComplete() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // First request + final List<Result> results1 = client.submit("g.inject(1)").all().get(); + assertEquals(1, results1.size()); + + // Second request on same client (should reuse connection) + final List<Result> results2 = client.submit("g.inject(2)").all().get(); + assertEquals(1, results2.size()); + assertEquals(2, results2.get(0).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleConcurrentStreamingRequests() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + final CompletableFuture<List<Result>> f1 = client.submit("g.inject(1,2,3)").all(); + final CompletableFuture<List<Result>> f2 = client.submit("g.inject(4,5,6)").all(); + final CompletableFuture<List<Result>> f3 = client.submit("g.inject(7,8,9)").all(); + + assertEquals(3, f1.get().size()); + assertEquals(3, f2.get().size()); + assertEquals(3, f3.get().size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamWithTraversalApi() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final GraphTraversalSource g = traversal().with( + DriverRemoteConnection.using(cluster)); + + final List<Integer> results = g.inject(1, 2, 3).toList(); + assertEquals(3, results.size()); + assertTrue(results.contains(1)); + assertTrue(results.contains(2)); + assertTrue(results.contains(3)); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamVerticesFromGraph() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Use inject to create data rather than relying on pre-loaded graph + final List<Result> results = client.submit("g.inject(1,2,3,4,5,6)").all().get(); + assertEquals(6, results.size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldReuseConnectionAfterServerError() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Submit a request that causes a server error + try { + client.submit("throw new RuntimeException('test error')").all().get(); + fail("Should have thrown"); + } catch (ExecutionException e) { + // expected + } + + // Connection should still be usable + final List<Result> results = client.submit("g.inject(1)").all().get(); + assertEquals(1, results.get(0).getInt()); + } finally { + cluster.close(); + } + } +}
