yifan-c commented on code in PR #345:
URL: https://github.com/apache/cassandra-sidecar/pull/345#discussion_r3286554272
##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
assertThat(result).isNotNull();
assertThat(result).isInstanceOf(CdcEventConsumer.class);
}
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleTokenRangeChange();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeGained(event);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeLost(event);
+ }
+
+ @Test
+ void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+ // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+ // completes before the loop closes; the no-worker-pool assertion below
+ // is the load-bearing part of the contract.
+ assertThatCode(() ->
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ void testHandleCdcCacheWarmedUpRunsOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_CDC_CACHE_WARMED_UP.address());
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ void testHandleUnknownAddressIsNoOp()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn("unknown.address");
+
+ cdcPublisher.handle(msg);
+
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleWorkerPoolFailureIsLoggedAndCaptured()
+ {
+ RuntimeException boom = new RuntimeException("boom");
+
when(taskExecutorPool.executeBlocking(any(Callable.class))).thenReturn(Future.failedFuture(boom));
+
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ cdcPublisher.handle(msg);
+
+ // The discarded Future's failure path must reach our backstop:
+ // captureUnrecoverableCdcError so the failure is observable in
metrics.
+ verify(sidecarCdcStats).captureUnrecoverableCdcError(boom);
+ }
+
+ private SslConfiguration mockSslConfiguration(boolean enabled,
+ boolean preferOpenSSL,
+ String clientAuth,
+ java.util.List<String>
cipherSuites,
+ java.util.List<String>
secureTransportProtocols,
+ String handshakeTimeout,
+ boolean keystoreConfigured,
+ boolean truststoreConfigured)
+ {
+ SslConfiguration sslConfig = mock(SslConfiguration.class,
RETURNS_DEEP_STUBS);
+ when(sslConfig.enabled()).thenReturn(enabled);
+ when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL);
+ when(sslConfig.clientAuth()).thenReturn(clientAuth);
+ when(sslConfig.cipherSuites()).thenReturn(cipherSuites);
+
when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols);
+
+ SecondBoundConfiguration durationSpec =
mock(SecondBoundConfiguration.class);
+ when(durationSpec.toString()).thenReturn(handshakeTimeout);
+ when(sslConfig.handshakeTimeout()).thenReturn(durationSpec);
+
+ when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured);
+
when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured);
+
+ return sslConfig;
+ }
+
+ private KeyStoreConfiguration mockKeystoreConfiguration(String path,
String password, String type)
+ {
+ KeyStoreConfiguration config = mock(KeyStoreConfiguration.class);
+ when(config.path()).thenReturn(path);
+ when(config.password()).thenReturn(password);
+ when(config.type()).thenReturn(type);
+ return config;
+ }
Review Comment:
those 2 methods are unused. Please remove them if it is a mistake.
##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -306,26 +321,35 @@ else if (conf.cdcEnabled())
// EventBus handlers
@Override
- public synchronized void handle(Message<Object> msg)
+ public void handle(Message<Object> msg)
{
- if
(msg.address().equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
+ String address = msg.address();
+ if
(address.equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
{
- handleTokenRangeChange();
+ executorPools.executeBlocking(() -> { handleTokenRangeChange();
return null; })
Review Comment:
Code looks nicer with `runBlocking`. Please update the other places in the
method.
```suggestion
executorPools.runBlocking(this::handleTokenRangeChange)
```
##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
executorPools.executeBlocking(() -> {
restart();
return null;
- });
+ }).onFailure(t ->
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
}
}
+ /**
+ * Backstop for fire-and-forget {@code executeBlocking(...)} dispatches in
event-bus handlers.
+ * The handler methods ({@code restart}, {@code stop}) already catch and
record their own
+ * failures via {@code sidecarCdcStats}; this fires only when something
unexpected escapes
+ * (e.g. an {@link Error}, or a stats-counter increment that throws before
reaching the
+ * handler's own try/catch). Logging at ERROR + bumping
+ * {@link SidecarCdcStats#captureUnrecoverableCdcError(Throwable)} so the
issue is surfaced
+ * rather than silently dropped by the discarded {@code Future}.
+ */
+ private void handleAsyncFailure(String address, Throwable t)
+ {
+ LOGGER.error("Unexpected error while processing event '{}' on worker
pool", address, t);
+ sidecarCdcStats.captureUnrecoverableCdcError(t);
+ }
+
Review Comment:
I do not think the method will be invoked, since the `restart` & `stop`
methods does not re-throw exception. I am fine with keeping the method.
##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
executorPools.executeBlocking(() -> {
restart();
return null;
- });
+ }).onFailure(t ->
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
Review Comment:
cosmetic feedback
```suggestion
executorPools.runBlocking(CdcPublisher.this::restart)
.onFailure(t ->
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
```
##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
executorPools.executeBlocking(() -> {
restart();
return null;
- });
+ }).onFailure(t ->
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
}
}
+ /**
+ * Backstop for fire-and-forget {@code executeBlocking(...)} dispatches in
event-bus handlers.
+ * The handler methods ({@code restart}, {@code stop}) already catch and
record their own
+ * failures via {@code sidecarCdcStats}; this fires only when something
unexpected escapes
+ * (e.g. an {@link Error}, or a stats-counter increment that throws before
reaching the
+ * handler's own try/catch). Logging at ERROR + bumping
+ * {@link SidecarCdcStats#captureUnrecoverableCdcError(Throwable)} so the
issue is surfaced
+ * rather than silently dropped by the discarded {@code Future}.
+ */
Review Comment:
nit: the method itself is pretty straightforward. I find the comment is
harder to understand.
##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
assertThat(result).isNotNull();
assertThat(result).isInstanceOf(CdcEventConsumer.class);
}
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleTokenRangeChange();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeGained(event);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+ {
+ Message<Object> msg = mock(Message.class);
+
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+ RangeManager.RangeChangeEvent event =
mock(RangeManager.RangeChangeEvent.class);
+ when(msg.body()).thenReturn(event);
+
+ CdcPublisher spyPublisher = spy(cdcPublisher);
+ spyPublisher.handle(msg);
+
+ ArgumentCaptor<Callable<Void>> captor =
ArgumentCaptor.forClass(Callable.class);
+ verify(taskExecutorPool).executeBlocking(captor.capture());
+
+ captor.getValue().call();
+ verify(spyPublisher).handleRangeLost(event);
+ }
+
+ @Test
+ void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+ {
+ Message<Object> msg = mock(Message.class);
+ when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+ // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+ // completes before the loop closes; the no-worker-pool assertion below
+ // is the load-bearing part of the contract.
+ assertThatCode(() ->
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+ verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
Review Comment:
Spy the publisher to verify stop is called too.
```suggestion
CdcPublisher spyPublisher = spy(cdcPublisher);
spyPublisher.handle(msg);
verify(spyPublisher).stop(); // verify stop is called
verify(taskExecutorPool,
never()).executeBlocking(any(Callable.class));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]