jmckenzie-dev commented on code in PR #178:
URL:
https://github.com/apache/cassandra-analytics/pull/178#discussion_r2896708734
##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java:
##########
Review Comment:
I was tracing the code here, and it looks like the caller of `run()` will
basically absorb all Throwables that bubble up, log them, and then continue
executing. That _feels_ like a significant gap in our flow. The code
I'm looking at:
```
protected void runSafe(CompletableFuture<Void> future)
{
    try
    {
        run();
        completeActiveFuture(future);
        scheduleNextRun();
    }
    catch (NotEnoughReplicasException e)
    {
        // NotEnoughReplicasException can occur when too many replicas are down
        // OR if there are no new commit logs to read if writes are idle on the cluster
        completeActiveFuture(future);
        scheduleRun(cdcOptions.sleepWhenInsufficientReplicas().toMillis());
    }
    catch (Throwable t)
    {
        completeActiveFuture(future);
        if (handleError(t))
        {
            LOGGER.warn("CdcConsumer epoch failed with recoverable error, scheduling next run jobId={} partition={} epoch={}",
                        jobId, partitionId, currentState.epoch, t);
            scheduleNextRun();
        }
        else
        {
            LOGGER.error("CdcConsumer epoch failed with unrecoverable error jobId={} partition={} epoch={}",
                         jobId, partitionId, currentState.epoch, t);
            stop();
        }
    }
}
```
`handleError` just logs the message and keeps on trucking. OutOfMemory? We
keep going. Disk failures? Keep going.
We have the
[JVMStabilityInspector](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java)
in Cassandra that I ended up writing for just this type of problem - wrapping
intelligence and logic in a central place around "is this a recoverable error
or not? And should users be able to configure it as such?"
That said, I don't think we need to go THAT far (certainly not in this
patch), but I'm wondering if there's something we should do here to augment
the `handleError` path or the exception handling in `runSafe` so it's less
"absorb and allow all errors" and has a _little_ more intelligence around our
exception handling, to prevent the situation that gave rise to this
ticket: data loss from exception cases.
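For illustration, a minimal classifier in the spirit of `JVMStabilityInspector` might look like the sketch below. All names here (`CdcErrorClassifier`, `isFatal`) are hypothetical, not from this PR; `runSafe` could consult it in the `Throwable` catch before deciding whether to reschedule or stop:

```java
// Hypothetical sketch, not from the PR: centralize the "is this error
// recoverable?" decision instead of letting handleError absorb everything.
class CdcErrorClassifier
{
    /**
     * Errors after which the JVM/host is likely unstable (OOM, linkage
     * problems, etc.). These should never be absorbed and rescheduled --
     * the consumer should stop instead. Walks the cause chain so a
     * wrapped OutOfMemoryError is still treated as fatal.
     */
    static boolean isFatal(Throwable t)
    {
        return t instanceof OutOfMemoryError
            || t instanceof StackOverflowError
            || t instanceof LinkageError
            || (t.getCause() != null && isFatal(t.getCause()));
    }
}
```

Then the catch block could short-circuit with `if (CdcErrorClassifier.isFatal(t)) { stop(); return; }` before ever consulting `handleError`.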
##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java:
##########
@@ -442,5 +454,6 @@ protected void refreshSchema()
public void close()
{
this.stop();
+ statePersister.stop();
Review Comment:
I _think_ we also need to stop the `statePersister` in the `Cdc#stop`
method. Right now we have:
```
public void stop(boolean blocking) throws ExecutionException, InterruptedException
{
    if (isRunning.get() && isRunning.compareAndSet(true, false))
    {
        LOGGER.info("Stopping CDC Consumer jobId={} partitionId={}", jobId, partitionId);
        CompletableFuture<Void> activeFuture = active.get();
        if (activeFuture != null && blocking)
        {
            // block until active future completes
            long timeout = cdcOptions.stopTimeout().toMillis();
            try
            {
                activeFuture.get(timeout, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                LOGGER.warn("Failed to cleanly shutdown active future after {} millis", timeout);
                stats.cdcConsumerStopTimeout();
            }
        }
        LOGGER.info("Stopped CDC Consumer jobId={} partitionId={}", jobId, partitionId);
    }
}
```
Tracing the paths that lead here: the only route in is the exception path from
`runSafe`, which we'll essentially never hit because `handleError` eats everything.
So I guess the question is: if someone calls `stop()` on `Cdc`, do they also
expect the `StatePersister` to flush and stop as well? Or should those be able
to happen separately? I think it's the former and, if so, we should wire that
child `stop` call to the parent (i.e. if `Cdc` stops, `StatePersister` stops).
Then a call to `close` would call `Cdc#stop`, which would then also stop the
children.
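A minimal sketch of that wiring, assuming the parent/child shape described above (the `Persister` interface and `CdcLifecycle` class here are simplified stand-ins for the real `Cdc`/`StatePersister` types, not the PR's code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in demonstrating parent-cascades-to-child shutdown:
// Cdc#stop stops the StatePersister, and close() just delegates to stop().
class CdcLifecycle
{
    interface Persister { void stop(); }

    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final Persister statePersister;

    CdcLifecycle(Persister statePersister)
    {
        this.statePersister = statePersister;
    }

    public void stop()
    {
        // CAS keeps repeated stop()/close() calls idempotent
        if (isRunning.compareAndSet(true, false))
        {
            // ... block on the active future as in the existing stop(boolean) ...
            statePersister.stop(); // cascade: Cdc stops => StatePersister stops
        }
    }

    public void close()
    {
        stop(); // close() no longer needs its own statePersister.stop() call
    }
}
```

With this shape a double `close()` or `stop()` still stops the child exactly once, thanks to the CAS.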
##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
Review Comment:
We should switch the order here, I think. If we throw an exception during the
`flush()` call below, the timerId will already have been set to -1 at this point,
allowing subsequent `start()` calls even though the flush failed. My thinking is
we should have a flow of:
1. flush
2. If that succeeds, THEN stop the timer

Since this is in a synchronized block we're not at risk of any kernel
scheduling or races there. But it should be in line with the spirit of what
we're trying to accomplish in this PR.
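A sketch of the reordered `stop()` under that assumption (field and method names below are simplified stand-ins for `SidecarStatePersister`; the real code would cancel the actual timer rather than this no-op):

```java
// Simplified stand-in: flush first, and only tear down the timer state if
// the flush succeeds. A failed flush leaves timerId untouched, so start()
// cannot be re-entered as if we had stopped cleanly.
class TimedPersister
{
    long timerId = 42L;             // pretend a periodic timer is scheduled
    boolean flushShouldFail = false; // test hook for this sketch only

    void flush()
    {
        if (flushShouldFail)
            throw new RuntimeException("flush failed");
    }

    synchronized void stop()
    {
        flush();           // 1. flush pending state first; throws on failure
        long id = timerId;
        timerId = -1L;     // 2. only on success do we mark the timer stopped
        cancelTimer(id);   //    (the real code would cancel the timer here)
    }

    void cancelTimer(long id) { /* no-op in this sketch */ }
}
```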
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.