jolshan commented on code in PR #13421: URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859633
########## core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala: ########## @@ -196,6 +201,110 @@ class ReplicaAlterLogDirsThreadTest { assertEquals(0, thread.partitionCount) } + @Test + def shouldResumeCleanLogDirAfterMarkPartitionFailed(): Unit = { + val brokerId = 1 + val partitionId = 0 + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234")) + + val partition = Mockito.mock(classOf[Partition]) + val replicaManager = Mockito.mock(classOf[ReplicaManager]) + val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager]) + val futureLog = Mockito.mock(classOf[UnifiedLog]) + val logManager = Mockito.mock(classOf[LogManager]) + + val logs = new Pool[TopicPartition, UnifiedLog]() + logs.put(t1p0, futureLog) + val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = logs, + logDirFailureChannel = new LogDirFailureChannel(1), + time = new MockTime) + + val leaderEpoch = 5 + val logEndOffset = 0 + + when(partition.partitionId).thenReturn(partitionId) + when(replicaManager.metadataCache).thenReturn(metadataCache) + when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog) + when(replicaManager.futureLogExists(t1p0)).thenReturn(true) + when(replicaManager.onlinePartition(t1p0)).thenReturn(Some(partition)) + when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition) + when(replicaManager.logManager).thenReturn(logManager) + doAnswer(_ => { + logCleaner.abortAndPauseCleaning(t1p0) + }).when(logManager).abortAndPauseCleaning(t1p0) + doAnswer(_ => { + logCleaner.resumeCleaning(Seq(t1p0)) + }).when(logManager).resumeCleaning(t1p0) + + when(quotaManager.isQuotaExceeded).thenReturn(false) + + when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false)) + .thenReturn(new EpochEndOffset() + .setPartition(partitionId) + .setErrorCode(Errors.NONE.code) + .setLeaderEpoch(leaderEpoch) + 
.setEndOffset(logEndOffset)) + when(partition.futureLocalLogOrException).thenReturn(futureLog) + doNothing().when(partition).truncateTo(offset = 0, isFuture = true) + when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(false) + + when(futureLog.logStartOffset).thenReturn(0L) + when(futureLog.logEndOffset).thenReturn(0L) + when(futureLog.latestEpoch).thenReturn(None) + + val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, + config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) + val responseData = new FetchPartitionData( + Errors.NONE, + 0L, + 0L, + MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))), + Optional.empty(), + OptionalLong.empty(), + Optional.empty(), + OptionalInt.empty(), + false) + mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) + + val endPoint = new BrokerEndPoint(0, "localhost", 1000) + val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, quotaManager) + val thread = new ReplicaAlterLogDirsThread( + "alter-logs-dirs-thread", + leader, + failedPartitions, + replicaManager, + quotaManager, + new BrokerTopicStats, + config.replicaFetchBackoffMs) + + // before starting the fetch, pause the clean of the partition. + logManager.abortAndPauseCleaning(t1p0) + thread.addPartitions(Map(t1p0 -> initialFetchState(fetchOffset = 0L, leaderEpoch))) + assertTrue(thread.fetchState(t1p0).isDefined) + assertEquals(1, thread.partitionCount) + + // first test: get handle without exception + when(partition.appendRecordsToFollowerOrFutureReplica(any(), ArgumentMatchers.eq(true))).thenReturn(None) + + thread.doWork() + + assertTrue(thread.fetchState(t1p0).isDefined) + assertEquals(1, thread.partitionCount) + assertTrue(logCleaner.isCleaningInStatePaused(t1p0)) + + // second test: process partition data with throwing a KafkaStorageException. 
+ when(partition.appendRecordsToFollowerOrFutureReplica(any(), ArgumentMatchers.eq(true))).thenThrow(new KafkaStorageException("disk error")) Review Comment: Do we have a way to verify that the log is cleaned up after it is in a failed state, or that it is in a state to resume correctly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org