This is an automated email from the ASF dual-hosted git repository.
Cole-Greer pushed a commit to branch HTTPClientPoC
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/HTTPClientPoC by this push:
new 631450020a Fix memory leak: cancel streaming response consumer on
timeout
631450020a is described below
commit 631450020a8870de7577e729ef4423015d279fad
Author: Cole Greer <[email protected]>
AuthorDate: Fri May 8 12:26:53 2026 -0700
Fix memory leak: cancel streaming response consumer on timeout
When a request timed out or the ResultSet was abandoned, the
StreamingResponseConsumer continued to buffer incoming HTTP
response data indefinitely, eventually causing an OutOfMemoryError.
Fix:
- Add cancelled flag to StreamingResponseConsumer; consume() discards
data when cancelled, and updateCapacity() advertises a capacity of 0
(capacityChannel.update(0)) to signal Apache HttpComponents to stop
delivering data
- Wire ResultSet.readCompleted to cancel the consumer when done
- Add MAX_QUEUE_CHUNKS (512) bound to QueueInputStream as defense in
depth; enqueue() silently discards chunks once 512 are queued
- releaseResources() and failed() both set cancelled; releaseResources()
also marks the QueueInputStream complete
This addresses the Java heap space OOM seen in CI timeout tests where
the server keeps streaming data after the client has given up.
---
.../tinkerpop/gremlin/driver/HttpTransport.java | 4 +++
.../tinkerpop/gremlin/driver/QueueInputStream.java | 13 +++++--
.../gremlin/driver/StreamingResponseConsumer.java | 40 +++++++++++++++-------
3 files changed, 43 insertions(+), 14 deletions(-)
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
index 78cad60592..c156095336 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpTransport.java
@@ -121,6 +121,10 @@ final class HttpTransport {
final StreamingResponseConsumer consumer = new
StreamingResponseConsumer(
resultSet, future, serializer, streamingReaderPool,
maxResponseContentLength);
+ // When the ResultSet is done (success, error, or timeout), cancel the
consumer
+ // to stop buffering any further incoming data and prevent OOM.
+ resultSet.getReadCompleted().whenComplete((v, t) -> consumer.cancel());
+
httpClient.execute(
SimpleRequestProducer.create(request),
consumer,
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
index 0370386297..dac3450bc3 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
@@ -32,6 +32,12 @@ final class QueueInputStream extends InputStream {
private static final byte[] END_MARKER = new byte[0];
+ /**
+ * Maximum number of queued byte[] chunks before enqueue starts discarding.
+ * This bounds memory usage if the consumer is not cancelled promptly.
+ */
+ private static final int MAX_QUEUE_CHUNKS = 512;
+
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private byte[] current;
private int pos;
@@ -39,10 +45,13 @@ final class QueueInputStream extends InputStream {
private volatile IOException error;
/**
- * Enqueue a chunk of data to be read.
+ * Enqueue a chunk of data to be read. If the queue has reached its
capacity bound,
+ * the data is silently discarded to prevent unbounded memory growth.
*/
void enqueue(final byte[] data) {
- queue.offer(data);
+ if (queue.size() < MAX_QUEUE_CHUNKS) {
+ queue.offer(data);
+ }
}
/**
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
index a34e720336..80ad6011cf 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
@@ -63,6 +63,7 @@ final class StreamingResponseConsumer implements
AsyncResponseConsumer<Void> {
private volatile int statusCode;
private volatile boolean futureCompleted;
+ private volatile boolean cancelled;
private volatile int trailingStatusCode = -1;
private volatile String trailingException;
private long bytesRead;
@@ -79,6 +80,15 @@ final class StreamingResponseConsumer implements
AsyncResponseConsumer<Void> {
this.maxResponseContentLength = maxResponseContentLength;
}
+ /**
+ * Cancels this consumer so that subsequent {@code consume()} calls
discard data instead of buffering it.
+ * Should be called when the client has given up on the response (e.g.,
timeout).
+ */
+ void cancel() {
+ cancelled = true;
+ queueInputStream.markComplete();
+ }
+
@Override
public void consumeResponse(final HttpResponse response, final
EntityDetails entityDetails,
final HttpContext context, final
FutureCallback<Void> resultCallback) throws HttpException, IOException {
@@ -119,23 +129,28 @@ final class StreamingResponseConsumer implements
AsyncResponseConsumer<Void> {
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws
IOException {
+ if (cancelled) {
+ capacityChannel.update(0);
+ return;
+ }
// Allow unlimited buffering — backpressure is handled by the blocking
queue
capacityChannel.update(Integer.MAX_VALUE);
}
@Override
public void consume(final ByteBuffer src) throws IOException {
- if (src.hasRemaining()) {
- bytesRead += src.remaining();
- if (maxResponseContentLength > 0 && bytesRead >
maxResponseContentLength) {
- resultSet.markError(new ResponseException(413, "Response
entity too large"));
- queueInputStream.markError(new IOException("Response entity
too large"));
- return;
- }
- final byte[] data = new byte[src.remaining()];
- src.get(data);
- queueInputStream.enqueue(data);
+ if (cancelled || !src.hasRemaining()) return;
+
+ bytesRead += src.remaining();
+ if (maxResponseContentLength > 0 && bytesRead >
maxResponseContentLength) {
+ cancelled = true;
+ resultSet.markError(new ResponseException(413, "Response entity
too large"));
+ queueInputStream.markError(new IOException("Response entity too
large"));
+ return;
}
+ final byte[] data = new byte[src.remaining()];
+ src.get(data);
+ queueInputStream.enqueue(data);
}
@Override
@@ -164,6 +179,7 @@ final class StreamingResponseConsumer implements
AsyncResponseConsumer<Void> {
@Override
public void failed(final Exception cause) {
+ cancelled = true;
logger.debug("Streaming response failed", cause);
queueInputStream.markError(new IOException(cause));
if (!futureCompleted) {
@@ -175,8 +191,8 @@ final class StreamingResponseConsumer implements
AsyncResponseConsumer<Void> {
@Override
public void releaseResources() {
- // Don't close the QueueInputStream here — the reader thread may still
be consuming data.
- // The stream will naturally end when the END_MARKER (from
streamEnd/markComplete) is processed.
+ cancelled = true;
+ queueInputStream.markComplete();
}
/**