dajac commented on code in PR #18499:
URL: https://github.com/apache/kafka/pull/18499#discussion_r1914544061


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -840,7 +848,11 @@ private void failCurrentBatch(Throwable t) {
             if (currentBatch != null) {
                 coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
                 for (DeferredEvent event : currentBatch.deferredEvents) {
-                    event.complete(t);
+                    try {
+                        event.complete(t);
+                    } catch (Throwable e) {
+                        log.error("Event {} for {} failed to complete.", event, tp, e);
+                    }

Review Comment:
   Instead of putting those try/catch blocks everywhere we call `complete`, I wonder if we could have a wrapper event which would catch all exceptions. We could wrap all the events with it; we would need only one instance. Have you considered something like this? I am not sure if it is really better, though.
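   If it helps to picture the suggestion, here is a minimal sketch of such a wrapper against a hypothetical, simplified `DeferredEvent` interface (the actual interface in the Kafka codebase differs; names here are illustrative only):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified interface; the real DeferredEvent in the Kafka
// codebase has a different shape.
interface DeferredEvent {
    void complete(Throwable t);
}

// Wrapper that catches anything the delegate's complete() throws, so
// callers never need their own try/catch around complete().
class SafeDeferredEvent implements DeferredEvent {
    private final DeferredEvent delegate;
    private final AtomicReference<Throwable> lastError = new AtomicReference<>();

    SafeDeferredEvent(DeferredEvent delegate) {
        this.delegate = delegate;
    }

    @Override
    public void complete(Throwable t) {
        try {
            delegate.complete(t);
        } catch (Throwable e) {
            // The runtime would log.error here; this sketch just records
            // the failure so it can be inspected.
            lastError.set(e);
        }
    }

    Throwable lastError() {
        return lastError.get();
    }
}
```

   Wrapping each event once at creation time would then centralize the exception handling in a single place instead of at every call site.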



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -1837,7 +1849,14 @@ public void onHighWatermarkUpdated(
                                 // exists and is in the active state.
                                 log.debug("Updating high watermark of {} to {}.", tp, newHighWatermark);
                                 context.coordinator.updateLastCommittedOffset(newHighWatermark);
-                                context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                try {
+                                    context.deferredEventQueue.completeUpTo(newHighWatermark);
+                                } catch (Throwable e) {
+                                    log.error("Failed to complete deferred events for {} up to {}, flushing deferred event queue.",
+                                        tp, newHighWatermark, e);
+                                    context.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+                                    context.failCurrentBatch(Errors.NOT_COORDINATOR.exception());

Review Comment:
   I don't understand the reasoning behind those two lines. Could you please elaborate?



##########
server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java:
##########
@@ -82,7 +85,11 @@ public void failAll(Exception exception) {
             Entry<Long, List<DeferredEvent>> entry = iter.next();
             for (DeferredEvent event : entry.getValue()) {
                 log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event);
-                event.complete(exception);
+                try {
+                    event.complete(exception);
+                } catch (Throwable e) {
+                    log.error("failAll({}): {} threw an exception.", exception.getClass().getSimpleName(), event, e);
+                }

Review Comment:
   I find it weird that we handle exceptions in `failAll` but not in `completeUpTo`. I think that this class should treat events consistently.
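   One way to get that consistency, sketched against a hypothetical simplified queue (the real `DeferredEventQueue` has more bookkeeping; all names below are illustrative), is to route both paths through a single safe-completion helper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplified queue keyed by offset, illustrating one shared
// exception-handling path for both completeUpTo and failAll.
class SimpleDeferredEventQueue {
    interface Event { void complete(Exception e); }

    private final TreeMap<Long, List<Event>> pending = new TreeMap<>();
    final List<Throwable> swallowed = new ArrayList<>();

    void add(long offset, Event event) {
        pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(event);
    }

    // Shared helper so both code paths treat a throwing event identically.
    private void safeComplete(Event event, Exception e) {
        try {
            event.complete(e);
        } catch (Throwable t) {
            swallowed.add(t); // the runtime would log.error here
        }
    }

    void completeUpTo(long offset) {
        Map.Entry<Long, List<Event>> entry;
        while ((entry = pending.firstEntry()) != null && entry.getKey() <= offset) {
            entry.getValue().forEach(ev -> safeComplete(ev, null));
            pending.pollFirstEntry();
        }
    }

    void failAll(Exception exception) {
        pending.values().forEach(events ->
            events.forEach(ev -> safeComplete(ev, exception)));
        pending.clear();
    }
}
```

   With this shape, a misbehaving event cannot abort either bulk operation, and the handling policy lives in exactly one place.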



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -670,23 +671,30 @@ private void transitionTo(
                     highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
+                    // Update partition metrics only after the coordinator context is fully loaded.
+                    runtimeMetrics.recordPartitionStateChange(oldState, state);
                     break;
 
                 case FAILED:
                     state = CoordinatorState.FAILED;
+                    // Update partition metrics before unload, since the coordinator context is no
+                    // longer usable.

Review Comment:
   This comment is incorrect because we actually keep the context when transitioning to the failed state.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -670,23 +671,30 @@ private void transitionTo(
                     highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
+                    // Update partition metrics only after the coordinator context is fully loaded.
+                    runtimeMetrics.recordPartitionStateChange(oldState, state);

Review Comment:
   Does applying it after make a big difference? I wonder whether we should call `recordPartitionStateChange` before the switch in order to be consistent for all states. We could also move `state = ...` to before the switch.
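   A rough sketch of what that hoisting could look like (hypothetical names, not the actual `CoordinatorRuntime` code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical states mirroring the PR discussion; not the real enum.
enum CoordinatorState { LOADING, ACTIVE, FAILED }

class TransitionSketch {
    final List<String> recordedChanges = new ArrayList<>();
    CoordinatorState state = CoordinatorState.LOADING;

    void transitionTo(CoordinatorState newState) {
        CoordinatorState oldState = state;
        // Record the metric and assign the state once, before the switch,
        // so every target state is handled uniformly.
        recordedChanges.add(oldState + "->" + newState);
        state = newState;
        switch (state) {
            case ACTIVE:
                // per-state setup (register listener, onLoaded, ...) goes here
                break;
            case FAILED:
                // per-state handling for the failed transition goes here
                break;
            default:
                break;
        }
    }
}
```

   The trade-off, as the review notes, is losing the ability to delay the metric until the context is fully loaded for a particular state.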



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
