vvcephei commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460307479
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
stateMgr.checkpoint(offsets);
}
- public void close() throws IOException {
+ public void close(final boolean wipeStateStore) throws IOException {
stateMgr.close();
+ if (wipeStateStore) {
+ try {
+ log.error("Wiping state stores for global task.");
+ Utils.delete(stateMgr.baseDir());
+ } catch (final IOException e) {
+ log.error("Failed to wiping state stores for global task.", e);
Review comment:
```suggestion
log.error("Failed to delete global task directory after detecting corruption.", e);
```
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
stateMgr.checkpoint(offsets);
}
- public void close() throws IOException {
+ public void close(final boolean wipeStateStore) throws IOException {
stateMgr.close();
+ if (wipeStateStore) {
+ try {
+ log.error("Wiping state stores for global task.");
Review comment:
This doesn't seem to be an error. Maybe info would be better? Also, I
think "wipe state stores" might be confusing for a user looking at the log
messages with no context. "Deleting the task directory" seems to be a more
context-free statement of what we're doing.
```suggestion
log.info("Deleting global task directory after detecting corruption.");
```
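For illustration, a minimal sketch of `close(...)` with both log-message suggestions applied. To keep it self-contained it makes a few assumptions: `GlobalTaskCloseSketch` is a hypothetical stand-in for `GlobalStateUpdateTask`, a plain `Path` plays the role of `stateMgr.baseDir()`, `deleteRecursively` stands in for `Utils.delete`, and `java.util.logging` replaces the task's logger (so `log.info` and `Level.SEVERE` correspond to info and error).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

// Hypothetical stand-in for GlobalStateUpdateTask; not the Kafka class itself.
class GlobalTaskCloseSketch {
    private static final Logger log = Logger.getLogger(GlobalTaskCloseSketch.class.getName());

    // Assumption: plays the role of stateMgr.baseDir().
    private final Path baseDir;

    GlobalTaskCloseSketch(final Path baseDir) {
        this.baseDir = baseDir;
    }

    // Same shape as close(final boolean wipeStateStore) in the diff, minus stateMgr.close().
    void close(final boolean wipeStateStore) {
        if (wipeStateStore) {
            try {
                // Deleting the directory is the expected recovery step, so INFO, not ERROR.
                log.info("Deleting global task directory after detecting corruption.");
                deleteRecursively(baseDir);
            } catch (final IOException e) {
                // A failed delete leaves corrupted state behind: log at error level with the cause.
                log.log(Level.SEVERE, "Failed to delete global task directory after detecting corruption.", e);
            }
        }
    }

    // Stand-in for Kafka's Utils.delete: removes children before their parent directories.
    static void deleteRecursively(final Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse lexicographic order puts every child path ahead of its parent.
            final Path[] ordered = paths.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
            for (final Path p : ordered) {
                Files.delete(p);
            }
        }
    }
}
```

The split follows the review: deleting the directory is the normal recovery path and logs at info, while a failed delete is a real problem and logs at error with the exception attached.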
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
##########
@@ -31,7 +31,7 @@
void flushState();
- void close() throws IOException;
+ void close(final boolean wipeStateStore) throws IOException;
Review comment:
Thanks @mjsax, this sounds perfect to me.
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -284,10 +278,21 @@ public void run() {
}
setState(State.RUNNING);
+ boolean wipeStateStore = false;
try {
while (stillRunning()) {
stateConsumer.pollAndUpdate();
}
+ } catch (final InvalidOffsetException recoverableException) {
+ wipeStateStore = true;
+ log.error(
+ "Updating global state failed. You can restart KafkaStreams to recover from this error.",
Review comment:
```suggestion
"Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.",
```
Just a thought: this tells the user why simply restarting would recover from the error.
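A rough sketch of the control flow the suggested message describes, assuming the flag set in the `catch` block is later passed to `close(wipeStateStore)` (the excerpt above does not show that part). `InvalidOffsetStandIn`, `StateMaintainerStandIn`, and `GlobalThreadRunSketch` are hypothetical stand-ins for `InvalidOffsetException`, `GlobalStateMaintainer`, and `GlobalStreamThread#run`, and a bounded `for` loop replaces `stillRunning()` so the sketch terminates.

```java
// Hypothetical stand-in for Kafka's InvalidOffsetException (not the real class).
class InvalidOffsetStandIn extends RuntimeException {
    InvalidOffsetStandIn(final String message) {
        super(message);
    }
}

// Hypothetical stand-in for the GlobalStateMaintainer contract after this change.
interface StateMaintainerStandIn {
    void pollAndUpdate();

    void close(boolean wipeStateStore);
}

class GlobalThreadRunSketch {
    // Returns the flag that was passed to close(...) so the flow is easy to check.
    static boolean run(final StateMaintainerStandIn maintainer, final int maxPolls) {
        boolean wipeStateStore = false;
        try {
            for (int i = 0; i < maxPolls; i++) { // stands in for while (stillRunning())
                maintainer.pollAndUpdate();
            }
        } catch (final InvalidOffsetStandIn recoverableException) {
            // Local state no longer matches the topic: mark it for deletion and
            // explain why a restart helps.
            wipeStateStore = true;
            System.err.println("Updating global state failed due to inconsistent local state. "
                + "Will attempt to clean up the local state. "
                + "You can restart KafkaStreams to recover from this error.");
        } finally {
            // Assumption: the real thread closes the maintainer after the loop, passing the flag.
            maintainer.close(wipeStateStore);
        }
        return wipeStateStore;
    }
}
```

With the local state deleted on the way out, a restart begins from an empty global store and should restore it from the source topic, which is the point the suggested wording makes explicit.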
----------------------------------------------------------------
This is an automated message from the Apache Git Service.