Ismaël Mejía created HADOOP-19901:
-------------------------------------
Summary: ChecksumFileSystem.readVectored leaks buffers allocated through caller's IntFunction allocator
Key: HADOOP-19901
URL: https://issues.apache.org/jira/browse/HADOOP-19901
Project: Hadoop Common
Issue Type: Bug
Components: fs
Affects Versions: 3.4.3
Reporter: Ismaël Mejía
h3. Summary
When {{ChecksumFileSystem.readVectored()}} is called with checksum verification
enabled (the default for {{LocalFileSystem}}), it allocates buffers for *both*
file data ranges and checksum ranges through the caller-provided
{{IntFunction<ByteBuffer> allocate}} function. However, the checksum buffers
are only used temporarily for verification and are never released back to the
caller. The caller has no reference to these buffers and no mechanism to
release them.
This was discovered in Apache Parquet Java while upgrading from Hadoop 3.3.0 to
3.4.3 and testing with {{TrackingByteBufferAllocator}}, which detected leaked
{{ByteBuffer}} allocations.
h3. Root cause
In {{ChecksumFSInputChecker.readVectored()}} ([ChecksumFileSystem.java,
trunk|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]):
{code:java}
@Override
public void readVectored(final List<? extends FileRange> ranges,
    final IntFunction<ByteBuffer> allocate,
    final Consumer<ByteBuffer> release) throws IOException {
  // ...
  // Allocates checksum buffers via the caller's allocator
  sums.readVectored(checksumRanges, allocate, release);
  // Allocates data buffers via the caller's allocator
  datas.readVectored(dataRanges, allocate, release);
  for (CombinedFileRange checksumRange : checksumRanges) {
    for (FileRange dataRange : checksumRange.getUnderlying()) {
      CompletableFuture<ByteBuffer> result =
          checksumRange.getData().thenCombineAsync(dataRange.getData(),
              (sumBuffer, dataBuffer) ->
                  checkBytes(sumBuffer, checksumRange.getOffset(),
                      dataBuffer, dataRange.getOffset(), bytesPerSum, file));
      for (FileRange original : ((CombinedFileRange) dataRange).getUnderlying()) {
        original.setData(result.thenApply(
            (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
      }
    }
  }
}
{code}
Two problems:
# *Checksum buffers are never released.* {{sums.readVectored(checksumRanges,
allocate, release)}} allocates buffers through the caller's {{allocate}}
function to read checksum data. After {{checkBytes()}} verifies the data, the
checksum buffers ({{sumBuffer}}) are no longer needed, but they are never
passed to {{release}} and are invisible to the caller. They leak.
# *The 2-arg API provides no release mechanism.* The 2-arg overload passes a
no-op release:
{code:java}
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws IOException {
  readVectored(ranges, allocate, (b) -> { });
}
{code}
Even callers using the 3-arg API don't benefit, because
{{ChecksumFileSystem}} itself never calls {{release}} on the checksum buffers
-- it only passes {{release}} down to the underlying streams.
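To make problem 1 concrete, here is a small self-contained Java model with no Hadoop classes. {{CountingAllocator}} and {{ChecksummedReadModel}} are hypothetical names, and the layout (a 4-byte checksum per 512-byte chunk, one checksum buffer per requested range rather than per merged {{CombinedFileRange}}) is a simplifying assumption. The caller releases every buffer it gets back, yet one buffer per range stays outstanding.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical caller-side pool that only counts allocations and releases. */
class CountingAllocator implements IntFunction<ByteBuffer>, Consumer<ByteBuffer> {
  final AtomicInteger allocated = new AtomicInteger();
  final AtomicInteger released = new AtomicInteger();

  @Override
  public ByteBuffer apply(int size) {
    allocated.incrementAndGet();
    return ByteBuffer.allocate(size);
  }

  @Override
  public void accept(ByteBuffer buffer) {
    released.incrementAndGet();
  }

  int outstanding() {
    return allocated.get() - released.get();
  }
}

/** Simplified model of a checksummed vectored read; not Hadoop code. */
class ChecksummedReadModel {
  static final int BYTES_PER_SUM = 512; // assumed chunk size
  static final int CHECKSUM_SIZE = 4;   // assumed 4-byte CRC per chunk

  // `release` is accepted, as in the 3-arg API, but never called for `sums`.
  static List<ByteBuffer> readVectored(int[] rangeLengths,
      IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release) {
    List<ByteBuffer> results = new ArrayList<>();
    for (int length : rangeLengths) {
      int chunks = (length + BYTES_PER_SUM - 1) / BYTES_PER_SUM;
      // Internal temporary, allocated through the caller's function.
      ByteBuffer sums = allocate.apply(chunks * CHECKSUM_SIZE);
      // Buffer the caller will actually receive.
      ByteBuffer data = allocate.apply(length);
      // ... data would be verified against `sums` here ...
      results.add(data);
      // `sums` goes out of scope without release.accept(sums): the leak.
    }
    return results;
  }

  /** Buffers still outstanding after the caller releases everything it received. */
  static int leakedAfterCallerCleanup(int[] rangeLengths) {
    CountingAllocator pool = new CountingAllocator();
    List<ByteBuffer> data = readVectored(rangeLengths, pool, pool);
    data.forEach(pool); // the caller releases every returned buffer
    return pool.outstanding();
  }
}
```

With ranges of length 1000, 4096 and 10, {{leakedAfterCallerCleanup}} returns 3: six buffers are allocated, the caller releases the three data buffers, and the three checksum buffers are never seen again.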
h3. How this was discovered
Apache Parquet Java uses a {{TrackingByteBufferAllocator}} in tests that wraps
the real allocator and tracks all allocations. When the allocator is closed, it
throws {{LeakedByteBufferException}} if any allocated buffers were not
released. After upgrading Hadoop from 3.3.0 to 3.4.3, the following test
classes started failing with buffer leak errors in the vectored I/O path:
* {{TestRecordLevelFilters}} (15 tests)
* {{TestColumnIndexFiltering}} (24 tests)
* {{TestParquetReader}} (6+ tests)
The allocation stack trace (innermost frame first) showed:
{code}
TrackingByteBufferAllocator.allocate
-> VectorIOBufferPool.getBuffer
-> RawLocalFileSystem$AsyncHandler.initiateRead
{code}
Parquet's {{readVectored()}} method passes a {{ByteBufferAllocator}} to Hadoop,
but Hadoop uses it for internal temporary allocations (checksum ranges) that
are invisible to the caller.
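A leak detector of this kind has to track buffers by identity: {{ByteBuffer.equals()}} and {{hashCode()}} compare the remaining contents, so two freshly allocated, zero-filled buffers of the same size compare equal. The sketch below is a hypothetical {{LeakCheckingAllocator}} in the spirit of Parquet's {{TrackingByteBufferAllocator}}; it is not that class's actual implementation, and it throws {{IllegalStateException}} rather than {{LeakedByteBufferException}}.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical leak detector; not Parquet's TrackingByteBufferAllocator. */
class LeakCheckingAllocator
    implements IntFunction<ByteBuffer>, Consumer<ByteBuffer>, AutoCloseable {
  // Identity-based set: ByteBuffer.equals() compares remaining contents,
  // so a HashSet would treat distinct zero-filled buffers as duplicates.
  private final Set<ByteBuffer> live = Collections.synchronizedSet(
      Collections.newSetFromMap(new IdentityHashMap<ByteBuffer, Boolean>()));

  @Override
  public ByteBuffer apply(int size) {
    ByteBuffer buffer = ByteBuffer.allocate(size);
    live.add(buffer);
    return buffer;
  }

  @Override
  public void accept(ByteBuffer buffer) {
    if (!live.remove(buffer)) {
      throw new IllegalStateException("releasing a buffer this allocator does not own");
    }
  }

  int liveCount() {
    return live.size();
  }

  /** Fails if any buffer handed out was never released. */
  @Override
  public void close() {
    int leaked = live.size();
    if (leaked != 0) {
      throw new IllegalStateException(leaked + " buffer(s) were never released");
    }
  }
}
```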
h3. Workaround in Parquet
We implemented a "capturing allocator" pattern that wraps the allocator to
track all buffers allocated during {{readVectored()}}, then registers them all
for release:
{code:java}
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(int size) {
ByteBuffer buf = options.getAllocator().allocate(size);
allocatedBuffers.add(buf);
return buf;
}
// ...
};
try {
f.readVectored(ranges, capturingAllocator);
// ... process futures ...
} finally {
builder.addBuffersToRelease(allocatedBuffers);
}
{code}
This ensures all buffers allocated through the caller's allocator are
eventually released, regardless of whether they are returned in a future or
used internally by {{ChecksumFileSystem}}. See [parquet-java commit
fc0586d68|https://github.com/apache/parquet-java/commit/fc0586d68].
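The same capturing idea can be applied directly to Hadoop's {{IntFunction<ByteBuffer>}} parameter by callers that are not using Parquet. {{CapturingAllocator}} below is a hypothetical helper, not part of Hadoop or Parquet; the {{synchronized}} block is only a precaution in case the filesystem invokes the allocator from a thread other than the caller's, which this report does not establish either way.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/**
 * Hypothetical helper: records every buffer handed out through an
 * IntFunction<ByteBuffer> so all of them, including ones used only
 * internally by the filesystem, can be returned to the real pool later.
 */
final class CapturingAllocator implements IntFunction<ByteBuffer>, AutoCloseable {
  private final IntFunction<ByteBuffer> delegate;
  private final Consumer<ByteBuffer> release;
  private final List<ByteBuffer> captured = new ArrayList<>();

  CapturingAllocator(IntFunction<ByteBuffer> delegate, Consumer<ByteBuffer> release) {
    this.delegate = delegate;
    this.release = release;
  }

  @Override
  public ByteBuffer apply(int size) {
    ByteBuffer buffer = delegate.apply(size);
    // Precaution in case the allocator is called from another thread.
    synchronized (captured) {
      captured.add(buffer);
    }
    return buffer;
  }

  /** Releases every captured buffer once; a second close releases nothing new. */
  @Override
  public void close() {
    List<ByteBuffer> toRelease;
    synchronized (captured) {
      toRelease = new ArrayList<>(captured);
      captured.clear();
    }
    toRelease.forEach(release);
  }
}
```

Assuming a pool with {{allocate(int)}} and {{release(ByteBuffer)}} methods, a caller would pass {{new CapturingAllocator(pool::allocate, pool::release)}} as the allocator to {{readVectored(ranges, capture)}} and call {{close()}} only after every range's future has completed and its data is no longer in use. Closing earlier would hand back buffers that the sliced results still reference, which is why the Parquet workaround defers release through {{builder.addBuffersToRelease()}}.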
h3. Suggested fixes
*Option A (minimal): Release checksum buffers after verification.*
In {{ChecksumFSInputChecker.readVectored()}}, after {{checkBytes()}} completes,
call {{release}} on the checksum buffer:
{code:java}
CompletableFuture<ByteBuffer> result =
    checksumRange.getData().thenCombineAsync(dataRange.getData(),
        (sumBuffer, dataBuffer) -> {
          ByteBuffer verified = checkBytes(sumBuffer,
              checksumRange.getOffset(),
              dataBuffer, dataRange.getOffset(), bytesPerSum, file);
          release.accept(sumBuffer); // release checksum buffer after verification
          return verified;
        });
{code}
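One caveat with Option A as written, based on the loop quoted under Root cause: the lambda runs once per data range in {{checksumRange.getUnderlying()}}, and every one of those data ranges receives the same {{sumBuffer}}, so the buffer would be released once per merged data range rather than once. It would also be skipped when a data read fails, because {{thenCombineAsync}} does not invoke its function if either input completes exceptionally. The self-contained sketch below (hypothetical class {{SharedChecksumRelease}}; {{check}} stands in for {{checkBytes()}}) instead waits for the checksum read and every verification that shares it, then releases the checksum buffer exactly once.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Sketch only: models one checksum range and the data ranges that share it. */
final class SharedChecksumRelease {
  final List<CompletableFuture<ByteBuffer>> verified = new ArrayList<>();
  final CompletableFuture<Void> checksumReleased;

  SharedChecksumRelease(CompletableFuture<ByteBuffer> sumFuture,
      List<CompletableFuture<ByteBuffer>> dataFutures,
      BiFunction<ByteBuffer, ByteBuffer, ByteBuffer> check, // stands in for checkBytes(...)
      Consumer<ByteBuffer> release) {
    List<CompletableFuture<?>> pending = new ArrayList<>();
    pending.add(sumFuture); // also covers the case of no data ranges
    for (CompletableFuture<ByteBuffer> data : dataFutures) {
      CompletableFuture<ByteBuffer> result = sumFuture.thenCombineAsync(data, check);
      verified.add(result);
      pending.add(result);
    }
    // handle() runs whether the verifications succeeded or failed.
    checksumReleased = CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
        .handle((ignored, error) -> {
          // A failed checksum read produced no buffer, so there is nothing to release.
          if (!sumFuture.isCompletedExceptionally()) {
            release.accept(sumFuture.join());
          }
          return null;
        });
  }
}
```

Using {{handle}} rather than {{thenRun}} matters here: {{allOf}} completes exceptionally if any verification fails, and {{thenRun}} would then skip the release.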
*Option B (comprehensive): Don't use the caller's allocator for internal temporaries.*
ChecksumFileSystem should allocate its own temporary buffers for checksum data
instead of using the caller-provided allocator. The caller's allocator is
intended for buffers that the caller will own and manage. Using it for internal
temporaries violates that expectation.
{code:java}
// Use internal allocation for checksums, not the caller's allocator
sums.readVectored(checksumRanges, ByteBuffer::allocate, (b) -> { });
// Only use caller's allocator for data ranges
datas.readVectored(dataRanges, allocate, release);
{code}
*Option C (API improvement): Extend the API to support paired allocate/release.*
The current {{IntFunction<ByteBuffer>}} allocator is one-way -- there's no way
for Hadoop to release a buffer it allocated through the caller's function.
HADOOP-19303 added a {{Consumer<ByteBuffer> release}} parameter, but it's
separate from the allocate function and {{ChecksumFileSystem}} doesn't use it
for its own intermediate buffers. A paired allocator/releaser interface
(similar to Parquet's {{ByteBufferAllocator}} with both {{allocate}} and
{{release}} methods) would make the lifecycle explicit.
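A paired interface along the lines of Option C might look like the hypothetical {{VectoredBufferAllocator}} below; no such type exists in Hadoop today, and it is modelled only loosely on Parquet's {{ByteBufferAllocator}}. The {{of}} factory adapts the existing separate {{allocate}}/{{release}} arguments, and the hypothetical {{PairedAllocatorDemo.readWithTemporary}} illustrates the intended lifecycle: a temporary such as a checksum buffer goes back through the same object that produced it, while ownership of the data buffer passes to the caller.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical paired allocator for the vectored-read API. */
interface VectoredBufferAllocator {
  ByteBuffer allocate(int size);

  void release(ByteBuffer buffer);

  /** Adapts the current split (allocate, release) arguments to the paired form. */
  static VectoredBufferAllocator of(IntFunction<ByteBuffer> allocateFn,
      Consumer<ByteBuffer> releaseFn) {
    return new VectoredBufferAllocator() {
      @Override
      public ByteBuffer allocate(int size) {
        return allocateFn.apply(size);
      }

      @Override
      public void release(ByteBuffer buffer) {
        releaseFn.accept(buffer);
      }
    };
  }
}

final class PairedAllocatorDemo {
  /** Allocates an internal temporary and a caller-owned buffer from one allocator. */
  static ByteBuffer readWithTemporary(VectoredBufferAllocator allocator, int dataLength) {
    ByteBuffer checksums = allocator.allocate(4); // internal temporary
    try {
      // ... read data and verify it against `checksums` ...
      return allocator.allocate(dataLength); // ownership passes to the caller
    } finally {
      allocator.release(checksums); // temporary returned through the same object
    }
  }
}
```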
h3. Related issues
* *HADOOP-19303* (VectorIO API to support releasing buffers on failure) --
Added the 3-arg {{readVectored}} with {{release}} Consumer, but
{{ChecksumFileSystem}} doesn't call {{release}} on checksum buffers.
* *HADOOP-18296* (Memory fragmentation in ChecksumFileSystem Vectored IO) --
Fixed range merging fragmentation, but did not address checksum buffer leaks.
* *PARQUET-2171* (Implement vectored IO in parquet file format) -- The Parquet
side implementation.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)