This is an automated email from the ASF dual-hosted git repository.
Cole-Greer pushed a commit to branch HTTPClientPoC
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/HTTPClientPoC by this push:
new 085da1cb0b Prevent OOM during streaming deserialization of large responses
085da1cb0b is described below
commit 085da1cb0b1136805a0b2fb6b9ea098dd0a016aa
Author: Cole Greer <[email protected]>
AuthorDate: Fri May 8 16:06:34 2026 -0700
Prevent OOM during streaming deserialization of large responses
When a server-side evaluationTimeout fires mid-stream, the client may
have already queued significant response data. The reader thread could
OOM trying to deserialize large objects (e.g., vertex properties with
huge strings) from the already-queued bytes.
Fixes:
- Reduce MAX_QUEUE_CHUNKS from 512 to 64 (bounds heap usage from
  queued byte[] chunks)
- cancel() now calls markError() instead of markComplete(), causing
  QueueInputStream.read() to throw immediately on next access
- Add error check at top of QueueInputStream.read() so in-progress
  reads from current buffer fail fast when cancelled
- Reader loop checks cancelled flag between readChunk() calls
The combined effect: when the ResultSet is done (timeout, error, or
client abandonment), the reader thread stops consuming data promptly
rather than continuing to deserialize queued bytes until OOM.
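
A minimal sketch of the fail-fast behavior described above, under stated
assumptions: SketchQueueInputStream is a hypothetical, simplified stand-in
for the driver's QueueInputStream, not the actual class. The EOF sentinel,
the constructor parameter, and the body of enqueue() are illustrative
guesses; only markError(), markComplete(), the chunk cap, and the error
check at the top of read() come from this commit.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified stand-in for the driver's QueueInputStream. */
final class SketchQueueInputStream extends InputStream {
    // Assumed sentinel used to signal end-of-stream and to wake a blocked reader.
    private static final byte[] EOF = new byte[0];

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final int maxChunks;
    private volatile IOException error;
    private byte[] current;
    private int pos;

    SketchQueueInputStream(final int maxChunks) {
        this.maxChunks = maxChunks;
    }

    /** Returns false when the chunk is dropped (queue at its cap, or stream already failed). */
    boolean enqueue(final byte[] chunk) {
        if (error != null || queue.size() >= maxChunks) return false;
        return queue.offer(chunk);
    }

    void markComplete() {
        queue.offer(EOF);
    }

    void markError(final IOException e) {
        error = e;
        queue.clear();    // release queued chunks so they can be collected
        queue.offer(EOF); // wake a reader blocked in take()
    }

    @Override
    public int read() throws IOException {
        final byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        // Fail fast: checked before touching any bytes left in the current chunk.
        if (error != null) throw error;
        if (len == 0) return 0;
        while (current == null || pos >= current.length) {
            if (current == EOF) return -1;
            try {
                current = queue.take();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", ie);
            }
            pos = 0;
            // Re-check after waking, since markError() may be what woke us.
            if (error != null) throw error;
        }
        final int n = Math.min(len, current.length - pos);
        System.arraycopy(current, pos, b, off, n);
        pos += n;
        return n;
    }
}
```

In this sketch, a read that is partway through a chunk when markError() is
called throws on its next call rather than draining the remaining bytes,
which is the property the reader loop relies on to stop deserializing
promptly.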
---
.../java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java | 3 ++-
.../apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java | 4 ++--
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
index dac3450bc3..90c15e0b81 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/QueueInputStream.java
@@ -36,7 +36,7 @@ final class QueueInputStream extends InputStream {
* Maximum number of queued byte[] chunks before enqueue starts discarding.
* This bounds memory usage if the consumer is not cancelled promptly.
*/
- private static final int MAX_QUEUE_CHUNKS = 512;
+ private static final int MAX_QUEUE_CHUNKS = 64;
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private byte[] current;
@@ -78,6 +78,7 @@ final class QueueInputStream extends InputStream {
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
+ if (error != null) throw error;
if (closed) return -1;
while (current == null || pos >= current.length) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
index 80ad6011cf..c9149088d0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/StreamingResponseConsumer.java
@@ -86,7 +86,7 @@ final class StreamingResponseConsumer implements AsyncResponseConsumer<Void> {
*/
void cancel() {
cancelled = true;
- queueInputStream.markComplete();
+ queueInputStream.markError(new java.io.IOException("Response cancelled"));
}
@Override
@@ -213,7 +213,7 @@ final class StreamingResponseConsumer implements AsyncResponseConsumer<Void> {
boolean isFirstChunk = true;
try {
final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream);
- while (true) {
+ while (!cancelled) {
final ResponseMessage msg = serializer.readChunk(buffer, isFirstChunk);
isFirstChunk = false;