jolshan commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450722995


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -684,6 +685,60 @@ public void testScheduleUnloading() {
         // Getting the coordinator context fails because it no longer exists.
         assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
     }
+
+    @Test
+    public void testScheduleUnloadingWithEmptyEpoch() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Loads the coordinator. It directly transitions to active.
+        runtime.scheduleLoadOperation(TP, 10);

Review Comment:
   Is there an amount of time it takes before we clear the deleted partition 
state? Or do we keep all deleted partitions until the offsets topic is 
reassigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to