Ismaël Mejía created HADOOP-19901:
-------------------------------------

             Summary: ChecksumFileSystem.readVectored leaks buffers allocated through caller's IntFunction allocator
                 Key: HADOOP-19901
                 URL: https://issues.apache.org/jira/browse/HADOOP-19901
             Project: Hadoop Common
          Issue Type: Bug
          Components: fs
    Affects Versions: 3.4.3
            Reporter: Ismaël Mejía


h3. Summary

When {{ChecksumFileSystem.readVectored()}} is called with checksum verification 
enabled (the default for {{LocalFileSystem}}), it allocates buffers for *both* 
file data ranges and checksum ranges through the caller-provided 
{{IntFunction<ByteBuffer> allocate}} function. However, the checksum buffers 
are only used temporarily for verification and are never released back to the 
caller. The caller has no reference to these buffers and no mechanism to 
release them.
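To make the ownership problem concrete, here is a minimal JDK-only model with no Hadoop dependency. {{LeakModel}}, {{CountingAllocator}} and {{readVectoredLike}} are hypothetical names; {{readVectoredLike}} only mimics the allocation pattern described above, drawing one checksum buffer and the data buffers from the caller's allocator but returning only the data buffers. After the caller releases everything it was given, one allocation is still outstanding.

{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.IntFunction;

/** Hypothetical model of the leak; this is not Hadoop code. */
final class LeakModel {

  /** Hypothetical allocator that tracks unreleased buffers by identity. */
  static final class CountingAllocator implements IntFunction<ByteBuffer> {
    final Set<ByteBuffer> outstanding =
        Collections.newSetFromMap(new IdentityHashMap<>());

    @Override
    public ByteBuffer apply(int size) {
      ByteBuffer buffer = ByteBuffer.allocate(size);
      outstanding.add(buffer);
      return buffer;
    }

    void release(ByteBuffer buffer) {
      outstanding.remove(buffer);
    }
  }

  /**
   * Hypothetical stand-in for a checksummed vectored read: the checksum
   * buffer and the data buffers all come from the caller's allocator, but
   * only the data buffers are returned.
   */
  static List<ByteBuffer> readVectoredLike(List<Integer> dataSizes,
      int checksumSize, IntFunction<ByteBuffer> allocate) {
    ByteBuffer sums = allocate.apply(checksumSize); // internal temporary
    List<ByteBuffer> data = new ArrayList<>();
    for (int size : dataSizes) {
      data.add(allocate.apply(size));
    }
    // (verification against sums would happen here; sums is then dropped)
    return data;
  }

  /** Returns how many buffers remain unreleased after the caller cleans up. */
  static int leakedAfterCallerCleanup() {
    CountingAllocator allocator = new CountingAllocator();
    List<ByteBuffer> data =
        readVectoredLike(Arrays.asList(100, 200), 8, allocator);
    data.forEach(allocator::release); // release everything the caller was given
    return allocator.outstanding.size();
  }
}
{code}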

This was discovered in Apache Parquet Java while upgrading from Hadoop 3.3.0 to 
3.4.3 and testing with {{TrackingByteBufferAllocator}}, which detected leaked 
{{ByteBuffer}} allocations.

h3. Root cause

In {{ChecksumFSInputChecker.readVectored()}} ([ChecksumFileSystem.java, trunk|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]):

{code:java}
@Override
public void readVectored(final List<? extends FileRange> ranges,
    final IntFunction<ByteBuffer> allocate,
    final Consumer<ByteBuffer> release) throws IOException {

  // ...
  // allocates checksum buffers via the caller's allocator
  sums.readVectored(checksumRanges, allocate, release);
  // allocates data buffers via the caller's allocator
  datas.readVectored(dataRanges, allocate, release);
  for (CombinedFileRange checksumRange : checksumRanges) {
    for (FileRange dataRange : checksumRange.getUnderlying()) {
      CompletableFuture<ByteBuffer> result =
          checksumRange.getData().thenCombineAsync(dataRange.getData(),
              (sumBuffer, dataBuffer) ->
                  checkBytes(sumBuffer, checksumRange.getOffset(),
                      dataBuffer, dataRange.getOffset(), bytesPerSum, file));
      for (FileRange original : ((CombinedFileRange) dataRange).getUnderlying()) {
        original.setData(result.thenApply(
            (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
      }
    }
  }
}
{code}

Two problems:

# *Checksum buffers are never released.* {{sums.readVectored(checksumRanges, allocate, release)}} allocates buffers through the caller's {{allocate}} function to read checksum data. After {{checkBytes()}} verifies the data, the checksum buffers ({{sumBuffer}}) are no longer needed, but they are never passed to {{release}} and are invisible to the caller. They leak.

# *The 2-arg API provides no release mechanism.* The 2-arg overload passes a 
no-op release:
   {code:java}
   public void readVectored(List<? extends FileRange> ranges,
                            IntFunction<ByteBuffer> allocate) throws IOException {
       readVectored(ranges, allocate, (b) -> { });
   }
   {code}
   Even callers using the 3-arg API don't benefit, because 
{{ChecksumFileSystem}} itself never calls {{release}} on the checksum buffers 
-- it only passes {{release}} down to the underlying streams.

h3. How this was discovered

Apache Parquet Java uses a {{TrackingByteBufferAllocator}} in tests that wraps 
the real allocator and tracks all allocations. When the allocator is closed, it 
throws {{LeakedByteBufferException}} if any allocated buffers were not 
released. After upgrading Hadoop from 3.3.0 to 3.4.3, the following test 
classes started failing with buffer leak errors in the vectored I/O path:

* {{TestRecordLevelFilters}} (15 tests)
* {{TestColumnIndexFiltering}} (24 tests) 
* {{TestParquetReader}} (6+ tests)

The allocation stacktrace showed:
{code}
TrackingByteBufferAllocator.allocate
  -> VectorIOBufferPool.getBuffer
    -> RawLocalFileSystem$AsyncHandler.initiateRead
{code}

Parquet's {{readVectored()}} method passes a {{ByteBufferAllocator}} to Hadoop 
for the buffers it expects to receive, but Hadoop also uses it for internal 
temporary allocations (the checksum ranges) that are invisible to the caller.
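Parquet's {{TrackingByteBufferAllocator}} is not reproduced here. The following is a hypothetical, JDK-only approximation of the same check ({{TrackingAllocatorSketch}} and {{LeakedBufferException}} are invented names): every allocation is recorded, {{release}} removes it, and {{close()}} fails if anything is left. Buffers are tracked by identity because {{ByteBuffer.equals}} and {{hashCode}} compare the remaining contents, so a content-based set would treat two freshly allocated buffers of the same size as one.

{code:java}
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical leak-detecting allocator; not Parquet's actual class. */
final class TrackingAllocatorSketch implements AutoCloseable {

  /** Hypothetical exception type used only by this sketch. */
  static final class LeakedBufferException extends RuntimeException {
    LeakedBufferException(int count) {
      super(count + " buffer(s) allocated but never released");
    }
  }

  // Identity-based: ByteBuffer.equals/hashCode depend on buffer contents.
  private final Set<ByteBuffer> outstanding =
      Collections.newSetFromMap(new IdentityHashMap<>());

  synchronized ByteBuffer allocate(int size) {
    ByteBuffer buffer = ByteBuffer.allocate(size);
    outstanding.add(buffer);
    return buffer;
  }

  synchronized void release(ByteBuffer buffer) {
    if (!outstanding.remove(buffer)) {
      throw new IllegalStateException("releasing an untracked buffer");
    }
  }

  /** Fails if any allocated buffer was never released. */
  @Override
  public synchronized void close() {
    if (!outstanding.isEmpty()) {
      throw new LeakedBufferException(outstanding.size());
    }
  }
}
{code}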

h3. Workaround in Parquet

We implemented a "capturing allocator" pattern that wraps the allocator to 
track all buffers allocated during {{readVectored()}}, then registers them all 
for release:

{code:java}
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
    @Override
    public ByteBuffer allocate(int size) {
        ByteBuffer buf = options.getAllocator().allocate(size);
        allocatedBuffers.add(buf);
        return buf;
    }
    // ...
};
try {
    f.readVectored(ranges, capturingAllocator);
    // ... process futures ...
} finally {
    builder.addBuffersToRelease(allocatedBuffers);
}
{code}

This ensures all buffers allocated through the caller's allocator are 
eventually released, regardless of whether they are returned in a future or 
used internally by {{ChecksumFileSystem}}. See 
[parquet-java commit fc0586d68|https://github.com/apache/parquet-java/commit/fc0586d68].
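The same pattern can be sketched without Parquet types. This is an illustration under assumptions, not the code from the commit above: {{CapturingAllocator}} is a hypothetical wrapper around a plain {{IntFunction<ByteBuffer>}}, and its list is synchronized only as a precaution, since the sketch does not assume which thread invokes the allocator.

{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/**
 * Hypothetical JDK-only capturing allocator: every buffer handed out is
 * remembered so it can be released later, whether it came back in a range
 * future or was used internally by the filesystem.
 */
final class CapturingAllocator implements IntFunction<ByteBuffer> {
  private final IntFunction<ByteBuffer> delegate;
  // Synchronized as a precaution; the calling thread is not assumed.
  private final List<ByteBuffer> captured =
      Collections.synchronizedList(new ArrayList<>());

  CapturingAllocator(IntFunction<ByteBuffer> delegate) {
    this.delegate = delegate;
  }

  @Override
  public ByteBuffer apply(int size) {
    ByteBuffer buffer = delegate.apply(size);
    captured.add(buffer);
    return buffer;
  }

  /** Releases every captured buffer exactly once, then forgets them. */
  void releaseAll(Consumer<ByteBuffer> release) {
    List<ByteBuffer> snapshot;
    synchronized (captured) {
      snapshot = new ArrayList<>(captured);
      captured.clear();
    }
    snapshot.forEach(release);
  }
}
{code}

Because the wrapper releases every buffer it handed out, including the data buffers delivered through the range futures, the caller should not also release those individually, and {{releaseAll}} should run only after the caller has finished reading the data.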

h3. Suggested fixes

*Option A (minimal): Release checksum buffers after verification.*

In {{ChecksumFSInputChecker.readVectored()}}, after {{checkBytes()}} completes, 
call {{release}} on the checksum buffer:

{code:java}
CompletableFuture<ByteBuffer> result =
    checksumRange.getData().thenCombineAsync(dataRange.getData(),
        (sumBuffer, dataBuffer) -> {
            ByteBuffer verified = checkBytes(sumBuffer, checksumRange.getOffset(),
                dataBuffer, dataRange.getOffset(), bytesPerSum, file);
            release.accept(sumBuffer);  // release checksum buffer after verification
            return verified;
        });
{code}
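One caveat with the snippet above, inferred from the loop in the root-cause excerpt: a single checksum range can cover several data ranges (the inner loop iterates over {{checksumRange.getUnderlying()}}), so calling {{release.accept(sumBuffer)}} inside each {{thenCombineAsync}} callback may release the same checksum buffer more than once, possibly while a sibling verification is still reading it. Below is a minimal sketch of releasing it once per checksum range, after every dependent verification has finished, using {{CompletableFuture.allOf}}. Plain JDK futures stand in for the Hadoop ranges, {{ReleaseOncePerChecksumRange}} is a hypothetical name, and {{verify}} is a hypothetical placeholder for {{checkBytes}}.

{code:java}
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/** Hypothetical sketch; plain JDK futures stand in for Hadoop's range data. */
final class ReleaseOncePerChecksumRange {

  /**
   * Verifies every data buffer against one shared checksum buffer, then
   * releases the checksum buffer exactly once, after all verifications that
   * use it have finished (successfully or not).
   */
  static List<CompletableFuture<ByteBuffer>> verifyAll(
      CompletableFuture<ByteBuffer> sumFuture,
      List<CompletableFuture<ByteBuffer>> dataFutures,
      BiFunction<ByteBuffer, ByteBuffer, ByteBuffer> verify, // placeholder for checkBytes
      Consumer<ByteBuffer> release) {
    List<CompletableFuture<ByteBuffer>> results = dataFutures.stream()
        .map(data -> sumFuture.thenCombineAsync(data, verify))
        .collect(Collectors.toList());
    CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0]))
        // If the checksum read itself failed there is no buffer, and
        // thenAccept never invokes release.
        .whenComplete((ignored, error) -> sumFuture.thenAccept(release));
    return results;
  }
}
{code}

Waiting on {{allOf}} rather than on each individual verification ties the release to the last reader of the checksum buffer, and because {{whenComplete}} runs on both success and failure, the buffer is also returned when a data read fails.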

*Option B (comprehensive): Don't use the caller's allocator for internal temporaries.*

ChecksumFileSystem should allocate its own temporary buffers for checksum data 
instead of using the caller-provided allocator. The caller's allocator is 
intended for buffers that the caller will own and manage. Using it for internal 
temporaries violates that expectation.

{code:java}
// Use internal allocation for checksums, not the caller's allocator
sums.readVectored(checksumRanges, ByteBuffer::allocate, (b) -> { });
// Only use caller's allocator for data ranges
datas.readVectored(dataRanges, allocate, release);
{code}

*Option C (API improvement): Extend the API to support paired allocate/release.*

The current {{IntFunction<ByteBuffer>}} allocator is one-way -- there's no way 
for Hadoop to release a buffer it allocated through the caller's function. 
HADOOP-19303 added a {{Consumer<ByteBuffer> release}} parameter, but it's 
separate from the allocate function and {{ChecksumFileSystem}} doesn't use it 
for its own intermediate buffers. A paired allocator/releaser interface 
(similar to Parquet's {{ByteBufferAllocator}} with both {{allocate}} and 
{{release}} methods) would make the lifecycle explicit.
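A minimal sketch of what such a paired interface could look like. {{PairedBufferAllocator}} and {{PairedAllocatorDemo}} are hypothetical names, not an existing Hadoop or Parquet API; the static {{of}} adapter shows how today's separate {{IntFunction}}/{{Consumer}} pair could be wrapped, and {{useTemporary}} stands in for an internal read that borrows a buffer and returns it itself.

{code:java}
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/**
 * Hypothetical paired allocator (not an existing Hadoop API): whoever obtains
 * a buffer from allocate() is responsible for handing it back to release().
 */
interface PairedBufferAllocator {
  ByteBuffer allocate(int size);

  void release(ByteBuffer buffer);

  /** Adapts today's separate allocate/release functions to the paired form. */
  static PairedBufferAllocator of(IntFunction<ByteBuffer> allocateFn,
      Consumer<ByteBuffer> releaseFn) {
    return new PairedBufferAllocator() {
      @Override
      public ByteBuffer allocate(int size) {
        return allocateFn.apply(size);
      }

      @Override
      public void release(ByteBuffer buffer) {
        releaseFn.accept(buffer);
      }
    };
  }
}

/** Hypothetical stand-in for a filesystem-internal read of checksum data. */
final class PairedAllocatorDemo {
  static int useTemporary(PairedBufferAllocator allocator, int size) {
    ByteBuffer temporary = allocator.allocate(size);
    try {
      // A real implementation would read and verify checksums here.
      return temporary.capacity();
    } finally {
      allocator.release(temporary); // the borrower returns its own temporary
    }
  }
}
{code}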

h3. Related issues

* *HADOOP-19303* (VectorIO API to support releasing buffers on failure) -- 
Added the 3-arg {{readVectored}} with {{release}} Consumer, but 
{{ChecksumFileSystem}} doesn't call {{release}} on checksum buffers.
* *HADOOP-18296* (Memory fragmentation in ChecksumFileSystem Vectored IO) -- 
Fixed range merging fragmentation, but did not address checksum buffer leaks.
* *PARQUET-2171* (Implement vectored IO in parquet file format) -- The Parquet 
side implementation.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)
