jsancio commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1964063484


##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -108,12 +118,93 @@ final class KafkaMetadataLogTest {
       classOf[RuntimeException],
       () => {
         log.appendAsFollower(
-          MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo)
+          MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo),
+          currentEpoch
         )
       }
     )
   }
 
+  @Test
+  def testEmptyAppendNotAllowed(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    assertThrows(classOf[IllegalArgumentException], () => log.appendAsFollower(MemoryRecords.EMPTY, 1));
+    assertThrows(classOf[IllegalArgumentException], () => log.appendAsLeader(MemoryRecords.EMPTY, 1));
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[InvalidMemoryRecordsProvider])
+  def testInvalidMemoryRecords(records: MemoryRecords, expectedException: Optional[Class[Exception]]): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+    val previousEndOffset = log.endOffset().offset()
+
+    val action: Executable = () => log.appendAsFollower(records, Int.MaxValue)
+    if (expectedException.isPresent()) {
+      assertThrows(expectedException.get, action)
+    } else {
+      assertThrows(classOf[CorruptRecordException], action)
+    }
+
+    assertEquals(previousEndOffset, log.endOffset().offset())
+  }
+
+  @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)

Review Comment:
   If the generator happens to produce a valid record, the test will most
likely fail because the log end offset (LEO) before and after the append will
not match. jqwik will then report the random seed that generated the "valid"
record.
   
   > timestamp = 2025-02-20T10:15:17.660380, KafkaMetadataLogTest:testRandomRecords = 
   >   org.opentest4j.AssertionFailedError:
   >  ...
   > 
   > tries = 1                     | # of calls to property
   > checks = 1                    | # of not rejected calls
   > generation = RANDOMIZED       | parameters are randomly generated
   > after-failure = SAMPLE_ONLY   | only try the previously failed sample
   > when-fixed-seed = ALLOW       | fixing the random seed is allowed
   > edge-cases#mode = MIXIN       | edge cases are mixed in
   > edge-cases#total = 0          | # of all combined edge cases
   > edge-cases#tried = 0          | # of edge cases tried in current run
   > seed = -5949598009408269565   | random seed to reproduce generated values
   > 
   > Sample
   > ------
   >   records: MemoryRecords(size=57, buffer=java.nio.HeapByteBuffer[pos=0 lim=57 cap=57])
   
   I can fix `ArbitraryMemoryRecords` as these cases are identified. I
created [KAFKA-18838](https://issues.apache.org/jira/browse/KAFKA-18838) to 
improve the arbitrary generator so that the records are not completely random.
Do you mind if that is implemented after this PR?
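
   To illustrate the failure mode described above, here is a minimal,
stdlib-only Java sketch of a seeded property check. It is not the Kafka test
and does not use jqwik: `ToyLog`, `MAGIC`, `randomRecords` and
`firstFailingSeed` are hypothetical names invented for this example, and the
"a buffer is valid if it starts with a magic byte" rule is an assumption that
stands in for real record validation. The point is only that completely random
bytes can occasionally pass validation, advance the end offset, and break the
"LEO is unchanged" assertion, and that the reported seed reproduces the sample.

```java
import java.util.Random;

public class SeededAppendCheck {

    // Assumption for this sketch: a buffer is "valid" only if it starts with MAGIC.
    public static final byte MAGIC = 0x02;

    // Hypothetical stand-in for a metadata log; not a Kafka class.
    static final class ToyLog {
        private long endOffset = 0L;

        long endOffset() {
            return endOffset;
        }

        void appendAsFollower(byte[] records) {
            if (records.length == 0) {
                throw new IllegalArgumentException("empty append is not allowed");
            }
            if (records[0] != MAGIC) {
                throw new IllegalStateException("corrupt records");
            }
            endOffset += 1; // a "valid" buffer advances the end offset
        }
    }

    // The property under test: appending the given bytes leaves the end offset unchanged.
    public static boolean endOffsetUnchanged(byte[] records) {
        ToyLog log = new ToyLog();
        long previousEndOffset = log.endOffset();
        try {
            log.appendAsFollower(records);
        } catch (RuntimeException e) {
            // Rejection is the expected outcome for invalid input.
        }
        return previousEndOffset == log.endOffset();
    }

    // Completely random bytes, fully determined by the seed.
    public static byte[] randomRecords(long seed) {
        Random random = new Random(seed);
        byte[] records = new byte[1 + random.nextInt(64)];
        random.nextBytes(records);
        return records;
    }

    // Returns the first per-sample seed that violates the property, or null if none does.
    public static Long firstFailingSeed(long baseSeed, int tries) {
        Random seeds = new Random(baseSeed);
        for (int i = 0; i < tries; i++) {
            long seed = seeds.nextLong();
            if (!endOffsetUnchanged(randomRecords(seed))) {
                return seed;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Long seed = firstFailingSeed(42L, 1000);
        System.out.println(seed == null
            ? "no failing sample found"
            : "property failed; rerun with seed = " + seed);
    }
}
```

   Passing a reported seed back to `randomRecords` regenerates the same bytes,
which is the same idea as rerunning the jqwik property with the seed it
printed. A generator that is less random, as proposed in KAFKA-18838, would
avoid producing such accidental "valid" samples in the first place.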



##########
core/src/test/scala/unit/kafka/server/MockFetcherThread.scala:
##########
@@ -115,6 +125,11 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       batches.headOption.map(_.lastOffset).getOrElse(-1)))
   }
 
+  private def hasInvalidPartitionLeaderEpoch(batch: RecordBatch, leaderEpoch: Int): Boolean = {

Review Comment:
   Fixed.



