Klose6 commented on code in PR #345:
URL: https://github.com/apache/cassandra-sidecar/pull/345#discussion_r3289649761
##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
assertThat(result).isNotNull();
assertThat(result).isInstanceOf(CdcEventConsumer.class);
}
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleTokenRangeChange();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeGained(event);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeLost(event);
+ }
+
+ @Test
+ void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+ // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+ // completes before the loop closes; the no-worker-pool assertion below
+ // is the load-bearing part of the contract.
+ assertThatCode(() ->
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ void testHandleCdcCacheWarmedUpRunsOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_CDC_CACHE_WARMED_UP.address());
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ void testHandleUnknownAddressIsNoOp()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn("unknown.address");
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleWorkerPoolFailureIsLoggedAndCaptured()
+ {
+ RuntimeException boom = new RuntimeException("boom");
+
when(taskExecutorPool.executeBlocking(any(Callable.class))).thenReturn(Future.failedFuture(boom));
+
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ cdcPublisher.handle(msg);
+
+ // The discarded Future's failure path must reach our backstop:
+ // captureUnrecoverableCdcError so the failure is observable in
metrics.
+ verify(sidecarCdcStats).captureUnrecoverableCdcError(boom);
+ }
+
+ private SslConfiguration mockSslConfiguration(boolean enabled,
+ boolean preferOpenSSL,
+ String clientAuth,
+ java.util.List<String>
cipherSuites,
+ java.util.List<String>
secureTransportProtocols,
+ String handshakeTimeout,
+ boolean keystoreConfigured,
+ boolean truststoreConfigured)
+ {
+ SslConfiguration sslConfig = mock(SslConfiguration.class,
RETURNS_DEEP_STUBS);
+ when(sslConfig.enabled()).thenReturn(enabled);
+ when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL);
+ when(sslConfig.clientAuth()).thenReturn(clientAuth);
+ when(sslConfig.cipherSuites()).thenReturn(cipherSuites);
+
when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols);
+
+ SecondBoundConfiguration durationSpec =
mock(SecondBoundConfiguration.class);
+ when(durationSpec.toString()).thenReturn(handshakeTimeout);
+ when(sslConfig.handshakeTimeout()).thenReturn(durationSpec);
+
+ when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured);
+
when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured);
+
+ return sslConfig;
+ }
+
+ private KeyStoreConfiguration mockKeystoreConfiguration(String path,
String password, String type)
+ {
+ KeyStoreConfiguration config = mock(KeyStoreConfiguration.class);
+ when(config.path()).thenReturn(path);
+ when(config.password()).thenReturn(password);
+ when(config.type()).thenReturn(type);
+ return config;
+ }
Review Comment:
yeah, good catch, removed them now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]