Klose6 commented on code in PR #345:
URL: https://github.com/apache/cassandra-sidecar/pull/345#discussion_r3289626059
##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
assertThat(result).isNotNull();
assertThat(result).isInstanceOf(CdcEventConsumer.class);
}
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleTokenRangeChange();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeGained(event);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeLost(event);
+ }
+
+ @Test
+ void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+ // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+ // completes before the loop closes; the no-worker-pool assertion below
+ // is the load-bearing part of the contract.
+ assertThatCode(() ->
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
Review Comment:
To verify the stop, we need to change stop() from private to
package-private; I made that change to relax its visibility for testing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]