yifan-c commented on code in PR #345:
URL: https://github.com/apache/cassandra-sidecar/pull/345#discussion_r3286554272


##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
         assertThat(result).isNotNull();
         assertThat(result).isInstanceOf(CdcEventConsumer.class);
     }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleTokenRangeChange();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+        RangeManager.RangeChangeEvent event = 
mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleRangeGained(event);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+        RangeManager.RangeChangeEvent event = 
mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleRangeLost(event);
+    }
+
+    @Test
+    void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+        // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+        // completes before the loop closes; the no-worker-pool assertion below
+        // is the load-bearing part of the contract.
+        assertThatCode(() -> 
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+        verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+    }
+
+    @Test
+    void testHandleCdcCacheWarmedUpRunsOnEventLoopWithoutWorkerPool()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(ON_CDC_CACHE_WARMED_UP.address());
+
+        cdcPublisher.handle(msg);
+
+        verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+    }
+
+    @Test
+    void testHandleUnknownAddressIsNoOp()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn("unknown.address");
+
+        cdcPublisher.handle(msg);
+
+        verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleWorkerPoolFailureIsLoggedAndCaptured()
+    {
+        RuntimeException boom = new RuntimeException("boom");
+        
when(taskExecutorPool.executeBlocking(any(Callable.class))).thenReturn(Future.failedFuture(boom));
+
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+        cdcPublisher.handle(msg);
+
+        // The discarded Future's failure path must reach our backstop:
+        // captureUnrecoverableCdcError so the failure is observable in 
metrics.
+        verify(sidecarCdcStats).captureUnrecoverableCdcError(boom);
+    }
+
+    private SslConfiguration mockSslConfiguration(boolean enabled,
+                                                  boolean preferOpenSSL,
+                                                  String clientAuth,
+                                                  java.util.List<String> 
cipherSuites,
+                                                  java.util.List<String> 
secureTransportProtocols,
+                                                  String handshakeTimeout,
+                                                  boolean keystoreConfigured,
+                                                  boolean truststoreConfigured)
+    {
+        SslConfiguration sslConfig = mock(SslConfiguration.class, 
RETURNS_DEEP_STUBS);
+        when(sslConfig.enabled()).thenReturn(enabled);
+        when(sslConfig.preferOpenSSL()).thenReturn(preferOpenSSL);
+        when(sslConfig.clientAuth()).thenReturn(clientAuth);
+        when(sslConfig.cipherSuites()).thenReturn(cipherSuites);
+        
when(sslConfig.secureTransportProtocols()).thenReturn(secureTransportProtocols);
+
+        SecondBoundConfiguration durationSpec = 
mock(SecondBoundConfiguration.class);
+        when(durationSpec.toString()).thenReturn(handshakeTimeout);
+        when(sslConfig.handshakeTimeout()).thenReturn(durationSpec);
+
+        when(sslConfig.isKeystoreConfigured()).thenReturn(keystoreConfigured);
+        
when(sslConfig.isTrustStoreConfigured()).thenReturn(truststoreConfigured);
+
+        return sslConfig;
+    }
+
+    private KeyStoreConfiguration mockKeystoreConfiguration(String path, 
String password, String type)
+    {
+        KeyStoreConfiguration config = mock(KeyStoreConfiguration.class);
+        when(config.path()).thenReturn(path);
+        when(config.password()).thenReturn(password);
+        when(config.type()).thenReturn(type);
+        return config;
+    }

Review Comment:
   Those two methods are unused. Please remove them if they were added by mistake.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -306,26 +321,35 @@ else if (conf.cdcEnabled())
 
     // EventBus handlers
     @Override
-    public synchronized void handle(Message<Object> msg)
+    public void handle(Message<Object> msg)
     {
-        if 
(msg.address().equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
+        String address = msg.address();
+        if 
(address.equals(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address()))
         {
-            handleTokenRangeChange();
+            executorPools.executeBlocking(() -> { handleTokenRangeChange(); 
return null; })

Review Comment:
   Code looks nicer with `runBlocking`. Please update the other places in the 
method. 
   ```suggestion
               executorPools.runBlocking(this::handleTokenRangeChange)
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
             executorPools.executeBlocking(() -> {
                 restart();
                 return null;
-            });
+            }).onFailure(t -> 
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
         }
     }
 
+    /**
+     * Backstop for fire-and-forget {@code executeBlocking(...)} dispatches in 
event-bus handlers.
+     * The handler methods ({@code restart}, {@code stop}) already catch and 
record their own
+     * failures via {@code sidecarCdcStats}; this fires only when something 
unexpected escapes
+     * (e.g. an {@link Error}, or a stats-counter increment that throws before 
reaching the
+     * handler's own try/catch). Logging at ERROR + bumping
+     * {@link SidecarCdcStats#captureUnrecoverableCdcError(Throwable)} so the 
issue is surfaced
+     * rather than silently dropped by the discarded {@code Future}.
+     */
+    private void handleAsyncFailure(String address, Throwable t)
+    {
+        LOGGER.error("Unexpected error while processing event '{}' on worker 
pool", address, t);
+        sidecarCdcStats.captureUnrecoverableCdcError(t);
+    }
+

Review Comment:
   I do not think the method will be invoked, since the `restart` and `stop` 
methods do not re-throw exceptions. I am fine with keeping the method. 



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
             executorPools.executeBlocking(() -> {
                 restart();
                 return null;
-            });
+            }).onFailure(t -> 
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));

Review Comment:
   cosmetic feedback
   
   ```suggestion
               executorPools.runBlocking(CdcPublisher.this::restart)
                            .onFailure(t -> 
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -160,10 +160,25 @@ public void handle(Message<Object> event)
             executorPools.executeBlocking(() -> {
                 restart();
                 return null;
-            });
+            }).onFailure(t -> 
handleAsyncFailure(ON_CDC_CONFIGURATION_CHANGED.address(), t));
         }
     }
 
+    /**
+     * Backstop for fire-and-forget {@code executeBlocking(...)} dispatches in 
event-bus handlers.
+     * The handler methods ({@code restart}, {@code stop}) already catch and 
record their own
+     * failures via {@code sidecarCdcStats}; this fires only when something 
unexpected escapes
+     * (e.g. an {@link Error}, or a stats-counter increment that throws before 
reaching the
+     * handler's own try/catch). Logging at ERROR + bumping
+     * {@link SidecarCdcStats#captureUnrecoverableCdcError(Throwable)} so the 
issue is surfaced
+     * rather than silently dropped by the discarded {@code Future}.
+     */

Review Comment:
   nit: the method itself is pretty straightforward; I find the comment 
harder to understand than the code it describes. 



##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -160,4 +175,147 @@ void testEventConsumerCreatesValidConsumer()
         assertThat(result).isNotNull();
         assertThat(result).isInstanceOf(CdcEventConsumer.class);
     }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeChangedDispatchesToWorkerPool() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address());
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleTokenRangeChange();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeGainedPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address());
+        RangeManager.RangeChangeEvent event = 
mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleRangeGained(event);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testHandleTokenRangeLostPassesEventToHandler() throws Exception
+    {
+        Message<Object> msg = mock(Message.class);
+        
when(msg.address()).thenReturn(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address());
+        RangeManager.RangeChangeEvent event = 
mock(RangeManager.RangeChangeEvent.class);
+        when(msg.body()).thenReturn(event);
+
+        CdcPublisher spyPublisher = spy(cdcPublisher);
+        spyPublisher.handle(msg);
+
+        ArgumentCaptor<Callable<Void>> captor = 
ArgumentCaptor.forClass(Callable.class);
+        verify(taskExecutorPool).executeBlocking(captor.capture());
+
+        captor.getValue().call();
+        verify(spyPublisher).handleRangeLost(event);
+    }
+
+    @Test
+    void testHandleServerStopRunsStopOnEventLoopWithoutWorkerPool()
+    {
+        Message<Object> msg = mock(Message.class);
+        when(msg.address()).thenReturn(ON_SERVER_STOP.address());
+
+        // ON_SERVER_STOP must run synchronously on the event loop so shutdown
+        // completes before the loop closes; the no-worker-pool assertion below
+        // is the load-bearing part of the contract.
+        assertThatCode(() -> 
cdcPublisher.handle(msg)).doesNotThrowAnyException();
+        verify(taskExecutorPool, never()).executeBlocking(any(Callable.class));

Review Comment:
   Spy the publisher to verify stop is called too. 
   
   ```suggestion
           CdcPublisher spyPublisher = spy(cdcPublisher);
           spyPublisher.handle(msg);
           verify(spyPublisher).stop(); // verify stop is called
           verify(taskExecutorPool, 
never()).executeBlocking(any(Callable.class));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to