jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633660092


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -711,8 +752,16 @@ private void flushCurrentBatch() {
                     coordinator.updateLastWrittenOffset(offset);
 
                     if (offset != currentBatch.nextOffset) {
-                        throw new IllegalStateException("The state machine of coordinator " + tp + " is out of sync with the " +
-                            "underlying log. The last write returned " + offset + " while " + currentBatch.nextOffset + " was expected");
+                        log.error("The state machine of the coordinator {} is out of sync with the underlying log. " +
+                            "The last written offset returned is {} while the coordinator expected {}. The coordinator " +
+                            "will be reloaded in order to re-synchronize the state machine.",
+                            tp, offset, currentBatch.nextOffset);
+                        // Transition to FAILED state to unload the state machine and complete
+                        // exceptionally all the pending operations.
+                        transitionTo(CoordinatorState.FAILED);
+                        // Transition to LOADING to trigger the restoration of the state.
+                        transitionTo(CoordinatorState.LOADING);

Review Comment:
   Is it worth adding a test for this?
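
   For illustration only, here is a rough sketch of what such a test could check. It does not use the real `CoordinatorRuntime` test harness: `OffsetMismatchSketch`, `Context`, `State`, and `onBatchWritten` are hypothetical stand-ins that only mirror the branch in the diff above (on an offset mismatch, transition to FAILED and then to LOADING instead of throwing).

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetMismatchSketch {
    // Hypothetical subset of coordinator states; not the real CoordinatorState enum.
    public enum State { ACTIVE, FAILED, LOADING }

    // Hypothetical stand-in for the coordinator context; not the real runtime API.
    public static class Context {
        public final List<State> transitions = new ArrayList<>();
        public State state = State.ACTIVE;

        public void transitionTo(State next) {
            state = next;
            transitions.add(next);
        }

        // Mirrors the check in the diff: when the offset returned by the write
        // differs from the expected next offset, fail and then reload.
        public void onBatchWritten(long returnedOffset, long expectedNextOffset) {
            if (returnedOffset != expectedNextOffset) {
                transitionTo(State.FAILED);
                transitionTo(State.LOADING);
            }
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();

        // Matching offsets: no transition is expected.
        ctx.onBatchWritten(10L, 10L);
        if (!ctx.transitions.isEmpty()) {
            throw new AssertionError("unexpected transition on matching offsets");
        }

        // Mismatched offsets: expect FAILED followed by LOADING, and no exception.
        ctx.onBatchWritten(12L, 11L);
        if (!ctx.transitions.equals(List.of(State.FAILED, State.LOADING))) {
            throw new AssertionError("expected FAILED then LOADING, got " + ctx.transitions);
        }
    }
}
```

   A real test would presumably drive the mismatch through the runtime's own writer and also verify that pending operations are completed exceptionally, as the new comment in the diff describes.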



