jeel2420 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1347078859


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -538,7 +533,75 @@ class RemoteIndexCacheTest {
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
       s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
     // file is corrupted it should fetch from remote storage again
-    verifyFetchIndexInvocation(count = 1)
+    verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))
+  }
+
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the 
files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      } else {
+        // Subsequent call for markForCleanup method
+        latchForCacheRead.countDown()
+        latchForCacheRemove.countDown()
+      }

Review Comment:
   I found that markForCleanup was being called at the end of the test, possibly 
by cleanup() (not sure). However, I have since added latchForTestWait, so this 
might not be required anymore. I will check and remove it if it is not required. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to