[ 
https://issues.apache.org/jira/browse/KAFKA-13530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457716#comment-17457716
 ] 

Lucas Bradstreet commented on KAFKA-13530:
------------------------------------------

[~jolshan] if you look at some of the other tests, they use a CountDownLatch 
with their mock to control the timing of how things complete, e.g. this setup:


{noformat}
val countDownLatch = new CountDownLatch(1)

// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, 
leaderBrokerId, countDownLatch,
expectTruncation = expectTruncation, localLogOffset = Some(10), extraProps = 
extraProps, topicId = Some(topicId)){noformat}
That allows you to do things like ensure that the fetcher thread blocks in a 
certain state where you can validate the state:
{code:java}
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): 
ReplicaFetcherThread = {
  new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", fetcherId,
    sourceBroker, config, failedPartitions, replicaManager, metrics, time, 
quotaManager.follower, Some(blockingSend)) {

    override def doWork() = {
      // In case the thread starts before the partition is added by 
AbstractFetcherManager,
      // add it here (it's a no-op if already added)
      val initialOffset = InitialFetchState(
        topicId = topicId,
        leader = new BrokerEndPoint(0, "localhost", 9092),
        initOffset = 0L, currentLeaderEpoch = leaderEpochInLeaderAndIsr)
      addPartitions(Map(new TopicPartition(topic, topicPartition) -> 
initialOffset))
      super.doWork()

      // Shut the thread down after one iteration to avoid double-counting 
truncations
      initiateShutdown()
      countDownLatch.countDown()
    }
  }
}
 {code}
Maybe something like that would be helpful?

> Flaky test ReplicaManagerTest
> -----------------------------
>
>                 Key: KAFKA-13530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13530
>             Project: Kafka
>          Issue Type: Test
>          Components: core, unit tests
>            Reporter: Matthias J. Sax
>            Priority: Critical
>              Labels: flaky-test
>
> kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
> {quote}org.opentest4j.AssertionFailedError: expected: <true> but was: <false> 
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> kafka.server.ReplicaManagerTest.assertFetcherHasTopicId(ReplicaManagerTest.scala:3502)
>  at 
> kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds(ReplicaManagerTest.scala:3572){quote}
> STDOUT
> {quote}[2021-12-07 16:19:35,906] ERROR Error while reading checkpoint file 
> /tmp/kafka-6310287969113820536/replication-offset-checkpoint 
> (kafka.server.LogDirFailureChannel:76) java.nio.file.NoSuchFileException: 
> /tmp/kafka-6310287969113820536/replication-offset-checkpoint at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>  at java.nio.file.Files.newByteChannel(Files.java:361) at 
> java.nio.file.Files.newByteChannel(Files.java:407) at 
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>  at java.nio.file.Files.newInputStream(Files.java:152) at 
> java.nio.file.Files.newBufferedReader(Files.java:2784) at 
> java.nio.file.Files.newBufferedReader(Files.java:2816) at 
> org.apache.kafka.server.common.CheckpointFile.read(CheckpointFile.java:104) 
> at 
> kafka.server.checkpoints.CheckpointFileWithFailureHandler.read(CheckpointFileWithFailureHandler.scala:48)
>  at 
> kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:70)
>  at 
> kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets$lzycompute(OffsetCheckpointFile.scala:94)
>  at 
> kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets(OffsetCheckpointFile.scala:94)
>  at 
> kafka.server.checkpoints.LazyOffsetCheckpointMap.fetch(OffsetCheckpointFile.scala:97)
>  at 
> kafka.server.checkpoints.LazyOffsetCheckpoints.fetch(OffsetCheckpointFile.scala:89)
>  at kafka.cluster.Partition.updateHighWatermark$1(Partition.scala:348) at 
> kafka.cluster.Partition.createLog(Partition.scala:361) at 
> kafka.cluster.Partition.maybeCreate$1(Partition.scala:334) at 
> kafka.cluster.Partition.createLogIfNotExists(Partition.scala:341) at 
> kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:546) at 
> kafka.cluster.Partition.makeLeader(Partition.scala:530) at 
> kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$3(ReplicaManager.scala:2163)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2(ReplicaManager.scala:2160)
>  at 
> kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2$adapted(ReplicaManager.scala:2159)
>  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
>  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
>  at 
> kafka.server.ReplicaManager.applyLocalLeadersDelta(ReplicaManager.scala:2159) 
> at kafka.server.ReplicaManager.applyDelta(ReplicaManager.scala:2136) at 
> kafka.server.ReplicaManagerTest.testDeltaToLeaderOrFollowerMarksPartitionOfflineIfLogCantBeCreated(ReplicaManagerTest.scala:3349){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to