hachikuji commented on a change in pull request #11080:
URL: https://github.com/apache/kafka/pull/11080#discussion_r707728626



##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
##########
@@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + buffer.remaining() +
+                " remaining bytes.");

Review comment:
       Apologies for the delay here. I don't see a problem with the change. I believe @ijuma is right that the fetch response may still return incomplete data, but that case is handled in `ByteBufferLogInputStream`: we stop batch iteration early if there is incomplete data, so we would never reach the `readFrom` here, which is called for each record in the batch. It's also worth noting that the only caller of this method (in `DefaultRecordBatch.uncompressedIterator`) has the following logic:
   
   ```java
                   try {
                       return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                   } catch (BufferUnderflowException e) {
                       throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                   }
   ```
   
   So it already handles underflows in a similar way.
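
   For illustration, here's a minimal standalone sketch of the "stop batch iteration early" pattern described above. This is hypothetical code, not the actual `ByteBufferLogInputStream` source; it assumes the log-overhead layout of an 8-byte base offset followed by a 4-byte size field ahead of each batch:

   ```java
   import java.nio.ByteBuffer;

   public class TruncationCheckSketch {
       // Log overhead assumed for this sketch: 8-byte base offset
       // followed by a 4-byte batch size field.
       private static final int LOG_OVERHEAD = 12;

       /**
        * Returns the next complete batch as a slice, or null if the
        * buffer ends before the declared batch payload does -- stopping
        * iteration early instead of throwing on a partial fetch.
        */
       public static ByteBuffer nextBatch(ByteBuffer buffer) {
           if (buffer.remaining() < LOG_OVERHEAD)
               return null; // not even a full header left
           int size = buffer.getInt(buffer.position() + 8);
           if (buffer.remaining() < LOG_OVERHEAD + size)
               return null; // header declares more bytes than remain
           ByteBuffer batch = buffer.slice();
           batch.limit(LOG_OVERHEAD + size);
           buffer.position(buffer.position() + LOG_OVERHEAD + size);
           return batch;
       }
   }
   ```

   With a guard like that in front of record-level parsing, `readFrom` only ever sees buffers whose batch framing was complete, so the new `InvalidRecordException` path signals genuine corruption rather than an ordinary incomplete fetch response.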



