vvcephei commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460307479



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
         stateMgr.checkpoint(offsets);
     }
 
-    public void close() throws IOException {
+    public void close(final boolean wipeStateStore) throws IOException {
         stateMgr.close();
+        if (wipeStateStore) {
+            try {
+                log.error("Wiping state stores for global task.");
+                Utils.delete(stateMgr.baseDir());
+            } catch (final IOException e) {
+                log.error("Failed to wiping state stores for global task.", e);

Review comment:
       ```suggestion
               log.error("Failed to delete global task directory after detecting corruption.", e);
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -114,8 +118,16 @@ public void flushState() {
         stateMgr.checkpoint(offsets);
     }
 
-    public void close() throws IOException {
+    public void close(final boolean wipeStateStore) throws IOException {
         stateMgr.close();
+        if (wipeStateStore) {
+            try {
+                log.error("Wiping state stores for global task.");

Review comment:
       This doesn't seem to be an error. Maybe info would be better? Also, I 
think "wipe state stores" might be confusing for a user looking at the log 
messages with no context. "Deleting the task directory" seems to be a more 
context-free statement of what we're doing.
   
   ```suggestion
               log.info("Deleting global task directory after detecting corruption.");
   ```
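To make the two logging suggestions on this hunk concrete, here is a minimal, self-contained sketch of a wipe-on-close method. It logs the deletion itself at INFO and reports only a failed deletion at ERROR (SEVERE). This is not the PR's code. The class name `GlobalTaskCloseSketch` and the helper `deleteRecursively` are hypothetical. `java.util.logging` stands in for the task's logger, the `java.nio` walk stands in for Kafka's `Utils.delete(stateMgr.baseDir())`, and the state-manager close is omitted. Returning a boolean instead of throwing is a choice made only to keep the sketch easy to check.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of a close(wipeStateStore) method; not the PR's code.
// java.util.logging stands in for the task's logger, and deleteRecursively
// stands in for Kafka's Utils.delete(stateMgr.baseDir()).
final class GlobalTaskCloseSketch {
    private static final Logger log = Logger.getLogger(GlobalTaskCloseSketch.class.getName());

    private GlobalTaskCloseSketch() { }

    /** Deletes dir and everything under it; does nothing if dir does not exist. */
    static void deleteRecursively(final Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts every child path before its parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (final Path path : paths) {
            Files.delete(path);
        }
    }

    /** Returns false only if a requested wipe failed. */
    static boolean close(final Path taskDir, final boolean wipeStateStore) {
        // The real method closes the state manager first; omitted here.
        if (!wipeStateStore) {
            return true;
        }
        try {
            // Deleting corrupted state is an expected recovery step, so INFO.
            log.info("Deleting global task directory after detecting corruption.");
            deleteRecursively(taskDir);
            return true;
        } catch (final IOException e) {
            // Failing to delete is the actual error worth reporting.
            log.log(Level.SEVERE, "Failed to delete global task directory after detecting corruption.", e);
            return false;
        }
    }
}
```

With this split, a user scanning the logs sees an ERROR only when the cleanup itself did not work.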

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
##########
@@ -31,7 +31,7 @@
 
     void flushState();
 
-    void close() throws IOException;
+    void close(final boolean wipeStateStore) throws IOException;

Review comment:
       Thanks @mjsax, this sounds perfect to me.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -284,10 +278,21 @@ public void run() {
         }
         setState(State.RUNNING);
 
+        boolean wipeStateStore = false;
         try {
             while (stillRunning()) {
                 stateConsumer.pollAndUpdate();
             }
+        } catch (final InvalidOffsetException recoverableException) {
+            wipeStateStore = true;
+            log.error(
+                "Updating global state failed. You can restart KafkaStreams to recover from this error.",

Review comment:
       ```suggestion
                   "Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.",
   ```
   
   Just a thought, to explain why simply restarting would recover from this error.
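A minimal sketch of the flag pattern in this hunk: the catch block sets `wipeStateStore`, and the flag is later handed to `close(...)`. The diff does not show where `close` is called, so closing in a `finally` block is an assumption. `GlobalRunLoopSketch`, `StateConsumer`, and `InvalidOffsetStandIn` are hypothetical stand-ins for the thread, the global state consumer, and Kafka's `InvalidOffsetException`. The message goes to `System.err` instead of the thread's logger.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the control flow in GlobalStreamThread#run(); not the PR's code.
// InvalidOffsetStandIn models Kafka's InvalidOffsetException, and StateConsumer models
// the global state consumer. Closing in a finally block is an assumption.
final class GlobalRunLoopSketch {
    private GlobalRunLoopSketch() { }

    static final class InvalidOffsetStandIn extends RuntimeException {
        InvalidOffsetStandIn(final String message) {
            super(message);
        }
    }

    interface StateConsumer {
        void pollAndUpdate();

        void close(boolean wipeStateStore);
    }

    /** Runs the update loop and returns the wipe flag that was passed to close(). */
    static boolean run(final StateConsumer stateConsumer, final BooleanSupplier stillRunning) {
        boolean wipeStateStore = false;
        try {
            while (stillRunning.getAsBoolean()) {
                stateConsumer.pollAndUpdate();
            }
        } catch (final InvalidOffsetStandIn recoverableException) {
            // The stored offset is no longer valid, so the local state is suspect:
            // remember to delete it so that a restart can rebuild it from scratch.
            wipeStateStore = true;
            System.err.println("Updating global state failed due to inconsistent local state. "
                + "Will attempt to clean up the local state. "
                + "You can restart KafkaStreams to recover from this error.");
        } finally {
            // Runs on both normal shutdown and failure; only the failure path wipes.
            stateConsumer.close(wipeStateStore);
        }
        return wipeStateStore;
    }
}
```

In this sketch a normal shutdown keeps the local state, while the failure path leaves an empty directory behind. That is what lets the suggested message promise that a restart will recover.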




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

