jmckenzie-dev commented on code in PR #178:
URL: https://github.com/apache/cassandra-analytics/pull/178#discussion_r2896708734


##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java:
##########


Review Comment:
   I was tracing the code here, and it looks like the caller of `run()` will basically absorb all Throwables that bubble up, log them, and then continue executing. That _feels_ like a huge problematic gap in our flow. The code I'm looking at:
   ```
       protected void runSafe(CompletableFuture<Void> future)
       {
           try
           {
               run();
               completeActiveFuture(future);
               scheduleNextRun();
           }
           catch (NotEnoughReplicasException e)
           {
               // NotEnoughReplicasException can occur when too many replicas are down
               // OR if there are no new commit logs to read if writes are idle on the cluster
               completeActiveFuture(future);
               scheduleRun(cdcOptions.sleepWhenInsufficientReplicas().toMillis());
           }
           catch (Throwable t)
           {
               completeActiveFuture(future);

               if (handleError(t))
               {
                   LOGGER.warn("CdcConsumer epoch failed with recoverable error, scheduling next run jobId={} partition={} epoch={}",
                               jobId, partitionId, currentState.epoch, t);
                   scheduleNextRun();
               }
               else
               {
                   LOGGER.error("CdcConsumer epoch failed with unrecoverable error jobId={} partition={} epoch={}",
                                jobId, partitionId, currentState.epoch, t);
                   stop();
               }
           }
       }
   ```
   `handleError` just logs the message and keeps on trucking. OutOfMemory? We 
keep going. Disk failures? Keep going.
   
   We have the [JVMStabilityInspector](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java) in Cassandra that I ended up writing for just this type of problem - wrapping intelligence and logic in a central place around "is this a recoverable error or not? And should users be able to configure it as such?"
   
   That said, I don't think we need to go THAT far (certainly not in this patch). I'm just wondering whether we should augment the `handleError` path, or the exception handling in `runSafe`, to be less "absorb and allow all errors" and have a _little_ more intelligence around our exception handling, to prevent the situation that gave rise to this ticket: data loss from exception cases.
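   As a minimal sketch of what that "little more intelligence" could look like (method and class names here are illustrative, not the actual PR API - this is loosely modeled on the spirit of `JVMStabilityInspector`, not a copy of it): classify a small set of Throwables as fatal by walking the cause chain, and have `handleError` refuse to recover from those so `runSafe` falls into the `stop()` branch.

   ```java
   import java.io.FileNotFoundException;

   // Hypothetical sketch: centralize "is this error recoverable?" instead of
   // swallowing everything. FatalErrorCheck/isFatal are invented names.
   public class FatalErrorCheck
   {
       /** Errors we should never swallow: the JVM or host is likely compromised. */
       static boolean isFatal(Throwable t)
       {
           for (Throwable cause = t; cause != null; cause = cause.getCause())
           {
               if (cause instanceof OutOfMemoryError
                   || cause instanceof StackOverflowError
                   || cause instanceof LinkageError
                   || cause instanceof FileNotFoundException) // often a disk/file-handle signal
                   return true;
           }
           return false;
       }

       /** Returns true iff the consumer may schedule another run. */
       static boolean handleError(Throwable t)
       {
           if (isFatal(t))
               return false; // caller should stop() rather than schedule the next run
           return true;      // transient errors: log and keep going, as today
       }

       public static void main(String[] args)
       {
           assert !handleError(new OutOfMemoryError("heap"));
           assert handleError(new RuntimeException("transient"));
           assert !handleError(new RuntimeException(new FileNotFoundException("disk gone")));
           System.out.println("ok");
       }
   }
   ```

   The cause-chain walk matters because an OOM frequently surfaces wrapped in an `ExecutionException` or similar.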



##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java:
##########
@@ -442,5 +454,6 @@ protected void refreshSchema()
     public void close()
     {
         this.stop();
+        statePersister.stop();

Review Comment:
   I _think_ we also need to stop the `statePersister` in the `Cdc#stop` method. Right now we have:
   ```
       public void stop(boolean blocking) throws ExecutionException, InterruptedException
       {
           if (isRunning.get() && isRunning.compareAndSet(true, false))
           {
               LOGGER.info("Stopping CDC Consumer jobId={} partitionId={}", jobId, partitionId);
               CompletableFuture<Void> activeFuture = active.get();
               if (activeFuture != null && blocking)
               {
                   // block until active future completes
                   long timeout = cdcOptions.stopTimeout().toMillis();
                   try
                   {
                       activeFuture.get(timeout, TimeUnit.MILLISECONDS);
                   }
                   catch (TimeoutException e)
                   {
                       LOGGER.warn("Failed to cleanly shutdown active future after {} millis", timeout);
                       stats.cdcConsumerStopTimeout();
                   }
               }
               LOGGER.info("Stopped CDC Consumer jobId={} partitionId={}", jobId, partitionId);
           }
       }
   ```
   Tracing the paths that lead here: the exception path from `runSafe` will never reach this, because `handleError` eats everything. So I guess the question is: if someone calls `stop()` on `Cdc`, do they also expect the `StatePersister` to flush and stop as well? Or should those be able to happen separately? I think it's the former and, if so, we should wire that child `stop` call to the parent (i.e. if `Cdc` stops, `StatePersister` stops).
   
   Then a call to `close` would call `Cdc#stop` which would then also stop 
children.
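   A toy sketch of the wiring I mean (class and field names mirror the PR, but the bodies are stand-ins, not the real implementations):

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical sketch: parent stop cascades to the child, so close() only
   // needs to call stop() and every stop path flushes the persister.
   public class StopCascade
   {
       static class StatePersister
       {
           final AtomicBoolean running = new AtomicBoolean(true);

           void stop() { running.set(false); } // real code would flush + stop the timer
       }

       static class Cdc
       {
           final AtomicBoolean isRunning = new AtomicBoolean(true);
           final StatePersister statePersister = new StatePersister();

           void stop()
           {
               if (isRunning.compareAndSet(true, false))
               {
                   // ... wait for the active future, as in the real stop(boolean) ...
                   statePersister.stop(); // child stops whenever the parent stops
               }
           }

           void close() { stop(); } // no separate statePersister.stop() needed here
       }

       public static void main(String[] args)
       {
           Cdc cdc = new Cdc();
           cdc.close();
           assert !cdc.isRunning.get();
           assert !cdc.statePersister.running.get();
           System.out.println("ok");
       }
   }
   ```

   With this shape, the unrecoverable-error branch in `runSafe` that calls `stop()` would also flush persisted state, which is exactly the data-loss window this ticket is about.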



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########


Review Comment:
   We should switch the order here, I think. If we throw an exception during the `flush()` call below, the timerId will already be set to -1, allowing subsequent `start()` calls even though the flush failed. My thinking is we should have a flow of:
   1. flush
   2. If that succeeds, THEN stop the timer

   Since this is in a synchronized block we're not at risk of any kernel scheduling or races there, and it would be in line with the spirit of what we're trying to accomplish in this PR.
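   To make the ordering concrete (field and method names here are illustrative stand-ins for `SidecarStatePersister`, not the actual code): let a `flush()` failure propagate out of `stop()` before the timer handle is cleared, so the persister is not left looking stopped when it never flushed.

   ```java
   // Hypothetical sketch: flush first, clear the timer handle only on success.
   // If flush() throws, timerId keeps its value and stop() can be retried.
   public class FlushThenStop
   {
       long timerId = 42L;        // pretend handle for an active periodic timer
       boolean flushed = false;
       final boolean failFlush;   // test knob to simulate a flush failure

       FlushThenStop(boolean failFlush) { this.failFlush = failFlush; }

       void flush()
       {
           if (failFlush)
               throw new RuntimeException("flush failed");
           flushed = true;
       }

       synchronized void stop()
       {
           flush();        // 1. flush first; an exception propagates out...
           timerId = -1L;  // 2. ...so the timer is only cleared after success
       }

       public static void main(String[] args)
       {
           FlushThenStop ok = new FlushThenStop(false);
           ok.stop();
           assert ok.timerId == -1L && ok.flushed;

           FlushThenStop bad = new FlushThenStop(true);
           try { bad.stop(); } catch (RuntimeException expected) { /* surfaced to caller */ }
           assert bad.timerId == 42L; // handle untouched, so stop()/start() stay consistent
           System.out.println("ok");
       }
   }
   ```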



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
