This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch jstream2 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 72905f4600d088cb08233b1239b5e8c338eb4a3d Author: Ken Hu <[email protected]> AuthorDate: Wed May 13 21:27:47 2026 -0700 Split streaming and nonstreaming pipeline --- .../tinkerpop/gremlin/driver/Channelizer.java | 9 +- .../tinkerpop/gremlin/driver/Connection.java | 4 + .../driver/handler/GremlinResponseHandler.java | 21 +-- .../handler/HttpStreamingResponseHandler.java | 32 ++-- .../handler/HttpStreamingResponseHandlerTest.java | 207 --------------------- 5 files changed, 32 insertions(+), 241 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 7e297be462..30cf6fed57 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -212,7 +212,8 @@ public interface Channelizer extends ChannelHandler { final GraphBinaryReader graphBinaryReader = ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); streamingResponseHandler = new HttpStreamingResponseHandler( - graphBinaryReader, pending, cluster.getMaxResponseContentLength()); + graphBinaryReader, pending, cluster.getMaxResponseContentLength(), + () -> { connection.returnToPool(); connection.tryShutdown(); }); } else { useStreaming = false; gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer); @@ -267,12 +268,8 @@ public interface Channelizer extends ChannelHandler { pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending)); } - - pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> { - connection.returnToPool(); - connection.tryShutdown(); - }, useStreaming)); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 12a1de646c..2cd0cf2cdd 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -217,6 +217,10 @@ final class Connection { // write operation. logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); + } else { + // the callback for when the read was successful, meaning that ResultSet.markComplete() + // was called + thisConnection.returnToPool(); } // While this request was in process, close might have been signaled in closeAsync(). // However, close would be blocked until all pending requests are completed. Attempt diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 94898783eb..d4cfb167d2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -49,13 +49,9 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); private final AtomicReference<ResultSet> pendingResultSet; - private final Runnable onResponseComplete; - private final boolean streaming; - public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) { + public GremlinResponseHandler(final AtomicReference<ResultSet> pending) { this.pendingResultSet = pending; - this.onResponseComplete = onResponseComplete; - this.streaming = streaming; } @Override @@ -104,17 +100,14 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // Stream is done when the last content signaling response message is read. if (LAST_CONTENT_READ_RESPONSE == response) { - if (!streaming) { - final ResultSet rs = pendingResultSet.getAndSet(null); - if (rs != null) { - if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - rs.markComplete(); - } else { - rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); - } + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + rs.markComplete(); + } else { + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); } } - onResponseComplete.run(); } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index ea0d897aac..09a91c37b7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -21,7 +21,7 @@ package org.apache.tinkerpop.gremlin.driver.handler; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpContent; @@ -43,7 +43,6 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -51,18 +50,20 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; - /** * Decodes chunked HTTP responses into streaming results without buffering the full response body. * <p> * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally, * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses - * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then - * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process. + * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then the error is + * marked directly on the {@code ResultSet}. + * <p> + * This handler also owns the connection return lifecycle for the streaming pipeline: once all HTTP bytes are + * consumed (LastHttpContent), the connection is returned to the pool immediately. The reader thread continues + * consuming buffered bytes independently of the channel. */ -public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpObject> { +public class HttpStreamingResponseHandler extends SimpleChannelInboundHandler<HttpObject> { private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class); private static final ObjectMapper mapper = new ObjectMapper(); @@ -70,6 +71,7 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb private final GraphBinaryReader graphBinaryReader; private final AtomicReference<ResultSet> pendingResultSet; private final long maxResponseContentLength; + private final Runnable onWireComplete; // Per-connection executor for the reader thread. Sized to one thread because an HTTP/1.1 // connection has at most one in-flight request. The thread is created lazily on first use @@ -86,10 +88,12 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, final AtomicReference<ResultSet> pendingResultSet, - final long maxResponseContentLength) { + final long maxResponseContentLength, + final Runnable onWireComplete) { this.graphBinaryReader = graphBinaryReader; this.pendingResultSet = pendingResultSet; this.maxResponseContentLength = maxResponseContentLength; + this.onWireComplete = onWireComplete; this.readerExecutor = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new BasicThreadFactory.Builder().namingPattern( @@ -97,8 +101,7 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb } @Override - protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, - final List<Object> out) throws Exception { + protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { if (msg instanceof HttpResponse) { final HttpResponse resp = (HttpResponse) msg; @@ -119,10 +122,9 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet); readerExecutor.submit(streamReader::run); } else { - // No pending ResultSet — close the stream and fire sentinel immediately + // No pending ResultSet — close the stream and return connection immediately queueInputStream.signalEndOfStream(); queueInputStream = null; - out.add(LAST_CONTENT_READ_RESPONSE); } } } @@ -163,12 +165,14 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb // rather than offered to the already-closed stream. queueInputStream = null; } - out.add(LAST_CONTENT_READ_RESPONSE); } else { // Non-GraphBinary error — parse accumulated body and fire sentinel handleNonGraphBinaryError(); - out.add(LAST_CONTENT_READ_RESPONSE); } + // All HTTP bytes are off the wire. Return the connection to the pool so it + // can be reused for the next request. The reader thread continues to consume + // buffered bytes independently of the channel. + onWireComplete.run(); } } } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java deleted file mode 100644 index 2ca5dc0171..0000000000 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver.handler; - -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.util.CharsetUtil; -import org.apache.tinkerpop.gremlin.driver.ResultSet; -import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; -import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; -import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; -import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; -import org.apache.tinkerpop.gremlin.util.ser.SerTokens; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.Collections; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; -import static org.junit.Assert.*; - -public class HttpStreamingResponseHandlerTest { - - private ExecutorService executor; - private GraphBinaryMessageSerializerV4 serializer; - private GraphBinaryReader reader; - - @Before - public void setup() { - executor = Executors.newSingleThreadExecutor(); - serializer = new GraphBinaryMessageSerializerV4(); - reader = serializer.getMapper().getReader(); - } - - @After - public void teardown() { - executor.shutdownNow(); - } - - private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) { - final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( - reader, pendingResultSet, maxResponseContentLength); - return new EmbeddedChannel(handler); - } - - @Test - public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception { - final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); - final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); - final EmbeddedChannel channel = createChannel(pending, 0); - - // Serialize a valid GraphBinary response - final byte[] payload = toBytes(serializer.serializeResponseAsBinary( - ResponseMessage.build().code(HttpResponseStatus.OK) - .result(Collections.singletonList(1)).create(), - channel.alloc())); - - // Send HttpResponse with GraphBinary content type - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); - channel.writeInbound(response); - - // Send content - channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); - - // Send LastHttpContent - channel.writeInbound(new DefaultLastHttpContent()); - - // Verify LAST_CONTENT_READ_RESPONSE is emitted - final Object out = channel.readInbound(); - assertSame(LAST_CONTENT_READ_RESPONSE, out); - - channel.finishAndReleaseAll(); - } - - @Test - public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception { - final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); - final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); - final EmbeddedChannel channel = createChannel(pending, 0); - - final byte[] payload = toBytes(serializer.serializeResponseAsBinary( - ResponseMessage.build().code(HttpResponseStatus.OK) - .result(Collections.singletonList(1)).create(), - channel.alloc())); - - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); - channel.writeInbound(response); - channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); - channel.writeInbound(new DefaultLastHttpContent()); - - // Send a second LastHttpContent — should not throw NPE - channel.writeInbound(new DefaultLastHttpContent()); - - channel.finishAndReleaseAll(); - } - - @Test - public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() { - final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); - final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); - final EmbeddedChannel channel = createChannel(pending, 10); - - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); - channel.writeInbound(response); - - try { - // Send content exceeding the 10-byte limit - channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20]))); - fail("Expected TooLongFrameException"); - } catch (Exception e) { - assertTrue(e instanceof TooLongFrameException); - } - - channel.finishAndReleaseAll(); - } - - @Test - public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception { - final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); - final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); - final EmbeddedChannel channel = createChannel(pending, 0); - - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); - channel.writeInbound(response); - - // Send some content but no LastHttpContent - channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}))); - - // Fire channelInactive — should not throw and should signal the stream - channel.pipeline().fireChannelInactive(); - - channel.finishAndReleaseAll(); - } - - @Test - public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception { - final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); - final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); - final EmbeddedChannel channel = createChannel(pending, 0); - - // Send a 500 response with JSON content type - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); - response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); - channel.writeInbound(response); - - // Send JSON error body - final String errorJson = "{\"message\":\"test error\"}"; - channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8))); - - // Verify error is marked on the ResultSet - try { - rs.all().get(); - fail("Expected exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof ResponseException); - final ResponseException re = (ResponseException) e.getCause(); - assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); - assertEquals("test error", re.getMessage()); - } - - // Verify pendingResultSet was cleared - assertNull(pending.get()); - - channel.finishAndReleaseAll(); - } - - private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { - final byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - buf.release(); - return bytes; - } -}
