[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777664#comment-17777664 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 10:12 AM:
-----------------------------------------------------------------

I think the bug could be in how we close the ChunkedBytesStream and return the buffer for later re-use.
{code:java}
@Override
public void close() throws IOException {
    byte[] mybuf = intermediateBuf;
    intermediateBuf = null;

    InputStream input = in;
    in = null;

    if (mybuf != null)
        bufferSupplier.release(intermediateBufRef);
    if (input != null)
        input.close();
} {code}
-I suspect this because we set the underlying buffer behind the ByteBuffer to null, which would allow it to be garbage collected. But we don't want it to be GC'ed; we simply want to return it to the pool so it can be used again later.-

-I will verify this tomorrow with a test and have a fix out soon.-
Never mind: it will not be GC'ed, because we still hold a reference to it via mybuf. I verified this with a test.
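The reachability argument can be sketched outside Kafka as well: nulling a field does not make the referenced array collectible while a local variable still holds it. Below is a minimal model; Holder and its field are hypothetical stand-ins, not actual Kafka classes.
{code:java}
import java.lang.ref.WeakReference;

public class ReachabilityDemo {
    // Hypothetical stand-in for ChunkedBytesStream and its intermediate buffer field.
    static class Holder {
        byte[] intermediateBuf = new byte[1024];

        byte[] close() {
            byte[] mybuf = intermediateBuf; // local strong reference
            intermediateBuf = null;         // clear the field, as in close()
            return mybuf;                   // array stays reachable via the local
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        WeakReference<byte[]> weak = new WeakReference<>(holder.intermediateBuf);
        byte[] kept = holder.close();
        System.gc(); // only a hint, but 'kept' keeps the array strongly reachable regardless
        System.out.println("still reachable: " + (weak.get() == kept)); // prints "still reachable: true"
    }
} {code}
The weak reference is only cleared once the array has no strong referents, so as long as the local (here, the returned value) is live, GC cannot reclaim the buffer.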


{code:java}
@Test
public void testValidReturnToBufferSupplierOnClose() throws IOException, InterruptedException {
    final BufferSupplier threadSpecificSupplier = BufferSupplier.create();
    final boolean delegateSkipToSourceStream = false;

    final ByteBuffer inputBuffer = ByteBuffer.allocate(SIZE_LITTLE_LARGE_THAN_INTERMEDIATE_BUFFER_SIZE);
    final ByteBuffer originalIntermediateBufferRef;

    try (ChunkedBytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuffer.duplicate()),
            threadSpecificSupplier, DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
        assertNotNull(is.intermediateBufRef);
        // read everything to verify sanity
        Utils.readFully(is, ByteBuffer.allocate(inputBuffer.capacity()));
        // store a reference to the intermediate buffer provided by the buffer supplier
        originalIntermediateBufferRef = is.intermediateBufRef;
    }

    // The intermediate buffer should have been returned to the buffer supplier on close().
    // Force GC to rule out any lingering references. Note that System.gc() is only a hint
    // and may not trigger an actual collection.
    System.gc();
    // Wait for GC to do its magic.
    Thread.sleep(2000);

    try (ChunkedBytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuffer.duplicate()),
            threadSpecificSupplier, DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
        assertNotNull(is.intermediateBufRef);
        // assertSame checks reference identity; assertEquals on ByteBuffer only compares
        // contents and could pass even for a freshly allocated buffer
        assertSame(originalIntermediateBufferRef, is.intermediateBufRef);
    }
} {code}
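For context, the round trip the test relies on (release() keeping a strong reference so the same buffer instance comes back from the next get()) can be modelled with a minimal single-entry pool. This is a sketch only; Kafka's actual BufferSupplier is more elaborate.
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal model of a buffer pool: released buffers are held strongly,
// so they are never garbage collected and can be handed out again.
public class SimpleBufferPool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public ByteBuffer get(int size) {
        ByteBuffer cached = free.pollFirst();
        if (cached != null && cached.capacity() >= size) {
            cached.clear();  // reset position/limit before reuse
            return cached;   // the very same instance released earlier
        }
        return ByteBuffer.allocate(size);
    }

    public void release(ByteBuffer buffer) {
        free.addFirst(buffer); // strong reference: GC cannot reclaim it
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool();
        ByteBuffer first = pool.get(128);
        pool.release(first);
        System.out.println(pool.get(128) == first); // prints "true"
    }
} {code}
Because the pool holds the released buffer strongly, reference identity (not just content equality) is preserved across the release/get cycle, which is exactly what the test above asserts.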



> NPE in ChunkedByteStream
> ------------------------
>
>                 Key: KAFKA-15653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15653
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.6.0
>         Environment: Docker container on a Linux laptop, using the latest 
> release.
>            Reporter: Travis Bischel
>            Assignee: Divij Vaidya
>            Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>       at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>       at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>       at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>       at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>       at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>       at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>       at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>       at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>       at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>       at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>       at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)