[ https://issues.apache.org/jira/browse/KAFKA-13530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457716#comment-17457716 ]
Lucas Bradstreet commented on KAFKA-13530: ------------------------------------------ [~jolshan] if you look at some of the other tests, they use a CountDownLatch with their mock to control the timing of how things complete, e.g. this setup: {noformat} val countDownLatch = new CountDownLatch(1) // Prepare the mocked components for the test val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time), topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = expectTruncation, localLogOffset = Some(10), extraProps = extraProps, topicId = Some(topicId)){noformat} That allows you to do things like ensure that the fetcher thread blocks in a certain state where you can validate the state: {code:java} override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = { new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", fetcherId, sourceBroker, config, failedPartitions, replicaManager, metrics, time, quotaManager.follower, Some(blockingSend)) { override def doWork() = { // In case the thread starts before the partition is added by AbstractFetcherManager, // add it here (it's a no-op if already added) val initialOffset = InitialFetchState( topicId = topicId, leader = new BrokerEndPoint(0, "localhost", 9092), initOffset = 0L, currentLeaderEpoch = leaderEpochInLeaderAndIsr) addPartitions(Map(new TopicPartition(topic, topicPartition) -> initialOffset)) super.doWork() // Shut the thread down after one iteration to avoid double-counting truncations initiateShutdown() countDownLatch.countDown() } } } {code} Maybe something like that would be helpful? > Flaky test ReplicaManagerTest > ----------------------------- > > Key: KAFKA-13530 > URL: https://issues.apache.org/jira/browse/KAFKA-13530 > Project: Kafka > Issue Type: Test > Components: core, unit tests > Reporter: Matthias J. 
Sax > Priority: Critical > Labels: flaky-test > > kafka.server.ReplicaManagerTest.[1] usesTopicIds=true > {quote}org.opentest4j.AssertionFailedError: expected: <true> but was: <false> > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > kafka.server.ReplicaManagerTest.assertFetcherHasTopicId(ReplicaManagerTest.scala:3502) > at > kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds(ReplicaManagerTest.scala:3572){quote} > STDOUT > {quote}[2021-12-07 16:19:35,906] ERROR Error while reading checkpoint file > /tmp/kafka-6310287969113820536/replication-offset-checkpoint > (kafka.server.LogDirFailureChannel:76) java.nio.file.NoSuchFileException: > /tmp/kafka-6310287969113820536/replication-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > org.apache.kafka.server.common.CheckpointFile.read(CheckpointFile.java:104) > at > kafka.server.checkpoints.CheckpointFileWithFailureHandler.read(CheckpointFileWithFailureHandler.scala:48) > at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:70) > at > 
kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets$lzycompute(OffsetCheckpointFile.scala:94) > at > kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets(OffsetCheckpointFile.scala:94) > at > kafka.server.checkpoints.LazyOffsetCheckpointMap.fetch(OffsetCheckpointFile.scala:97) > at > kafka.server.checkpoints.LazyOffsetCheckpoints.fetch(OffsetCheckpointFile.scala:89) > at kafka.cluster.Partition.updateHighWatermark$1(Partition.scala:348) at > kafka.cluster.Partition.createLog(Partition.scala:361) at > kafka.cluster.Partition.maybeCreate$1(Partition.scala:334) at > kafka.cluster.Partition.createLogIfNotExists(Partition.scala:341) at > kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:546) at > kafka.cluster.Partition.makeLeader(Partition.scala:530) at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$3(ReplicaManager.scala:2163) > at scala.Option.foreach(Option.scala:437) at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2(ReplicaManager.scala:2160) > at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2$adapted(ReplicaManager.scala:2159) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) > at > kafka.server.ReplicaManager.applyLocalLeadersDelta(ReplicaManager.scala:2159) > at kafka.server.ReplicaManager.applyDelta(ReplicaManager.scala:2136) at > kafka.server.ReplicaManagerTest.testDeltaToLeaderOrFollowerMarksPartitionOfflineIfLogCantBeCreated(ReplicaManagerTest.scala:3349){quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)