This is an automated email from the ASF dual-hosted git repository.

Cole-Greer pushed a commit to branch HTTPClientPoC
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/HTTPClientPoC by this push:
     new 631450020a Fix memory leak: cancel streaming response consumer on 
timeout
631450020a is described below

commit 631450020a8870de7577e729ef4423015d279fad
Author: Cole Greer <[email protected]>
AuthorDate: Fri May 8 12:26:53 2026 -0700

    Fix memory leak: cancel streaming response consumer on timeout
    
    When a request times out or the ResultSet is abandoned, the
    StreamingResponseConsumer was continuing to buffer incoming HTTP
    response data indefinitely, causing OOM.
    
    Fix:
    - Add cancelled flag to StreamingResponseConsumer; consume() discards
      data when cancelled, updateCapacity() returns 0 to stop Apache HC
    - Wire ResultSet.readCompleted to cancel the consumer when done
    - Add MAX_QUEUE_CHUNKS (512) bound to QueueInputStream as defense
    - releaseResources() and failed() both set cancelled
    
    This addresses the Java heap space OOM seen in CI timeout tests where
    the server keeps streaming data after the client has given up.
---
 .../tinkerpop/gremlin/driver/HttpTransport.java    |  4 +++
 .../tinkerpop/gremlin/driver/QueueInputStream.java | 13 +++++--
 .../gremlin/driver/StreamingResponseConsumer.java  | 40 +++++++++++++++-------
 3 files changed, 43 insertions(+), 14 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
index 78cad60592..c156095336 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
@@ -121,6 +121,10 @@ final class HttpTransport {
         final StreamingResponseConsumer consumer = new 
StreamingResponseConsumer(
                 resultSet, future, serializer, streamingReaderPool, 
maxResponseContentLength);
 
+        // When the ResultSet is done (success, error, or timeout), cancel the 
consumer
+        // to stop buffering any further incoming data and prevent OOM.
+        resultSet.getReadCompleted().whenComplete((v, t) -> consumer.cancel());
+
         httpClient.execute(
                 SimpleRequestProducer.create(request),
                 consumer,
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
index 0370386297..dac3450bc3 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
@@ -32,6 +32,12 @@ final class QueueInputStream extends InputStream {
 
     private static final byte[] END_MARKER = new byte[0];
 
+    /**
+     * Maximum number of queued byte[] chunks before enqueue starts discarding.
+     * This bounds memory usage if the consumer is not cancelled promptly.
+     */
+    private static final int MAX_QUEUE_CHUNKS = 512;
+
     private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
     private byte[] current;
     private int pos;
@@ -39,10 +45,13 @@ final class QueueInputStream extends InputStream {
     private volatile IOException error;
 
     /**
-     * Enqueue a chunk of data to be read.
+     * Enqueue a chunk of data to be read. If the queue has reached its 
capacity bound,
+     * the data is silently discarded to prevent unbounded memory growth.
      */
     void enqueue(final byte[] data) {
-        queue.offer(data);
+        if (queue.size() < MAX_QUEUE_CHUNKS) {
+            queue.offer(data);
+        }
     }
 
     /**
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
index a34e720336..80ad6011cf 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
@@ -63,6 +63,7 @@ final class StreamingResponseConsumer implements 
AsyncResponseConsumer<Void> {
 
     private volatile int statusCode;
     private volatile boolean futureCompleted;
+    private volatile boolean cancelled;
     private volatile int trailingStatusCode = -1;
     private volatile String trailingException;
     private long bytesRead;
@@ -79,6 +80,15 @@ final class StreamingResponseConsumer implements 
AsyncResponseConsumer<Void> {
         this.maxResponseContentLength = maxResponseContentLength;
     }
 
+    /**
+     * Cancels this consumer so that subsequent {@code consume()} calls 
discard data instead of buffering it.
+     * Should be called when the client has given up on the response (e.g., 
timeout).
+     */
+    void cancel() {
+        cancelled = true;
+        queueInputStream.markComplete();
+    }
+
     @Override
     public void consumeResponse(final HttpResponse response, final 
EntityDetails entityDetails,
                                 final HttpContext context, final 
FutureCallback<Void> resultCallback) throws HttpException, IOException {
@@ -119,23 +129,28 @@ final class StreamingResponseConsumer implements 
AsyncResponseConsumer<Void> {
 
     @Override
     public void updateCapacity(final CapacityChannel capacityChannel) throws 
IOException {
+        if (cancelled) {
+            capacityChannel.update(0);
+            return;
+        }
         // Allow unlimited buffering — backpressure is handled by the blocking 
queue
         capacityChannel.update(Integer.MAX_VALUE);
     }
 
     @Override
     public void consume(final ByteBuffer src) throws IOException {
-        if (src.hasRemaining()) {
-            bytesRead += src.remaining();
-            if (maxResponseContentLength > 0 && bytesRead > 
maxResponseContentLength) {
-                resultSet.markError(new ResponseException(413, "Response 
entity too large"));
-                queueInputStream.markError(new IOException("Response entity 
too large"));
-                return;
-            }
-            final byte[] data = new byte[src.remaining()];
-            src.get(data);
-            queueInputStream.enqueue(data);
+        if (cancelled || !src.hasRemaining()) return;
+
+        bytesRead += src.remaining();
+        if (maxResponseContentLength > 0 && bytesRead > 
maxResponseContentLength) {
+            cancelled = true;
+            resultSet.markError(new ResponseException(413, "Response entity 
too large"));
+            queueInputStream.markError(new IOException("Response entity too 
large"));
+            return;
         }
+        final byte[] data = new byte[src.remaining()];
+        src.get(data);
+        queueInputStream.enqueue(data);
     }
 
     @Override
@@ -164,6 +179,7 @@ final class StreamingResponseConsumer implements 
AsyncResponseConsumer<Void> {
 
     @Override
     public void failed(final Exception cause) {
+        cancelled = true;
         logger.debug("Streaming response failed", cause);
         queueInputStream.markError(new IOException(cause));
         if (!futureCompleted) {
@@ -175,8 +191,8 @@ final class StreamingResponseConsumer implements 
AsyncResponseConsumer<Void> {
 
     @Override
     public void releaseResources() {
-        // Don't close the QueueInputStream here — the reader thread may still 
be consuming data.
-        // The stream will naturally end when the END_MARKER (from 
streamEnd/markComplete) is processed.
+        cancelled = true;
+        queueInputStream.markComplete();
     }
 
     /**

Reply via email to