jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348107552
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws
InterruptedException {
assertEquals(1, cnt.get());
assertEquals(0, ctx.timer.size());
}
+
+ @Test
+ public void testStateChanges() throws Exception {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+ MockCoordinatorShardBuilderSupplier supplier =
mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder =
mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ GroupCoordinatorRuntimeMetrics runtimeMetrics =
mock(GroupCoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withLoader(loader)
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .build();
+
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+ CompletableFuture<CoordinatorLoader.LoadSummary> future = new
CompletableFuture<>();
+ when(loader.load(TP, coordinator)).thenReturn(future);
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 0);
+
+ // Getting the context succeeds and the coordinator should be in
loading.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(LOADING, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL,
LOADING);
+
+ // When the loading fails, the coordinator transitions to failed.
+ future.completeExceptionally(new Exception("failure"));
+ assertEquals(FAILED, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING,
FAILED);
+
+ // Start loading a new topic partition.
+ TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+ future = new CompletableFuture<>();
+ when(loader.load(tp, coordinator)).thenReturn(future);
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(tp, 0);
+ // Getting the context succeeds and the coordinator should be in
loading.
+ ctx = runtime.contextOrThrow(tp);
+ assertEquals(LOADING, ctx.state);
+ verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL,
LOADING);
+
+ // When the loading completes, the coordinator transitions to active.
+ future.complete(null);
+ assertEquals(ACTIVE, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING,
ACTIVE);
+
+ runtime.close();
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED,
CLOSED);
Review Comment:
Maybe a bit unrelated to this test, but do we just keep all partitions we
try to load forever (and then transition them to closed when we close the runtime)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]