vvcephei commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460307479

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
         stateMgr.checkpoint(offsets);
     }
-    public void close() throws IOException {
+    public void close(final boolean wipeStateStore) throws IOException {
         stateMgr.close();
+        if (wipeStateStore) {
+            try {
+                log.error("Wiping state stores for global task.");
+                Utils.delete(stateMgr.baseDir());
+            } catch (final IOException e) {
+                log.error("Failed to wiping state stores for global task.", e);

Review comment:
```suggestion
                log.error("Failed to delete global task directory after detecting corruption.", e);
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
         stateMgr.checkpoint(offsets);
     }
-    public void close() throws IOException {
+    public void close(final boolean wipeStateStore) throws IOException {
         stateMgr.close();
+        if (wipeStateStore) {
+            try {
+                log.error("Wiping state stores for global task.");

Review comment:
This doesn't seem to be an error. Maybe info would be better? Also, I think "wipe state stores" might be confusing for a user looking at the log messages with no context. "Deleting the task directory" seems to be a more context-free statement of what we're doing.
```suggestion
                log.info("Deleting global task directory after detecting corruption.");
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
##########
@@ -31,7 +31,7 @@
     void flushState();
-    void close() throws IOException;
+    void close(final boolean wipeStateStore) throws IOException;

Review comment:
Thanks @mjsax, this sounds perfect to me.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -284,10 +278,21 @@ public void run() {
         }
         setState(State.RUNNING);
+        boolean wipeStateStore = false;
         try {
             while (stillRunning()) {
                 stateConsumer.pollAndUpdate();
             }
+        } catch (final InvalidOffsetException recoverableException) {
+            wipeStateStore = true;
+            log.error(
+                "Updating global state failed. You can restart KafkaStreams to recover from this error.",

Review comment:
```suggestion
                "Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.",
```
Just a thought, to indicate why simply restarting would recover anything.
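
For reference, the logging behaviour proposed in the comments above could look roughly like the following standalone sketch. It is not the PR's code: `GlobalTaskCloseSketch`, `deleteRecursively`, and the boolean return value are hypothetical names introduced for illustration, `java.nio.file` stands in for Kafka's `Utils.delete`, and `java.util.logging` stands in for the Streams logger. The real `close` also calls `stateMgr.close()` first, which is omitted here.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the close(wipeStateStore) path under review.
final class GlobalTaskCloseSketch {
    private static final Logger LOG = Logger.getLogger("GlobalTaskCloseSketch");

    // Assumption: plays the role of Utils.delete(stateMgr.baseDir()).
    static void deleteRecursively(final Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                // Children are gone by now, so the directory itself can be removed.
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // Returns true only if the directory was wiped; the return value is an
    // illustration aid and is not part of the interface in the PR.
    static boolean close(final Path baseDir, final boolean wipeStateStore) {
        if (!wipeStateStore) {
            return false;
        }
        try {
            // Informational: deleting the directory is the intended recovery step.
            LOG.info("Deleting global task directory after detecting corruption.");
            deleteRecursively(baseDir);
            return true;
        } catch (final IOException e) {
            // Only the failed deletion is reported as an error.
            LOG.log(Level.SEVERE, "Failed to delete global task directory after detecting corruption.", e);
            return false;
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path dir = Files.createTempDirectory("global-task-sketch");
        Files.createFile(dir.resolve("dummy-store-file"));

        close(dir, false);
        System.out.println("exists after normal close: " + Files.exists(dir));

        close(dir, true);
        System.out.println("exists after wiping close: " + Files.exists(dir));
    }
}
```

In this shape, the caller would set the flag in the `InvalidOffsetException` handler and pass it to `close`, so that a restart rebuilds the global state from scratch.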