dajac commented on code in PR #18499:
URL: https://github.com/apache/kafka/pull/18499#discussion_r1914544061
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -840,7 +848,11 @@ private void failCurrentBatch(Throwable t) {
             if (currentBatch != null) {
                 coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
                 for (DeferredEvent event : currentBatch.deferredEvents) {
-                    event.complete(t);
+                    try {
+                        event.complete(t);
+                    } catch (Throwable e) {
+                        log.error("Event {} for {} failed to complete.", event, tp, e);
+                    }

Review Comment:
   Instead of putting those try..catch everywhere we call complete, I wonder if we could have a wrapper event which would catch all exceptions. We could wrap all the events with it. We need only one instance. Have you considered something like this? I am not sure if it is really better though.

##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -1837,7 +1849,14 @@ public void onHighWatermarkUpdated(
                 // exists and is in the active state.
                 log.debug("Updating high watermark of {} to {}.", tp, newHighWatermark);
                 context.coordinator.updateLastCommittedOffset(newHighWatermark);
-                context.deferredEventQueue.completeUpTo(newHighWatermark);
+                try {
+                    context.deferredEventQueue.completeUpTo(newHighWatermark);
+                } catch (Throwable e) {
+                    log.error("Failed to complete deferred events for {} up to {}, flushing deferred event queue.",
+                        tp, newHighWatermark, e);
+                    context.deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+                    context.failCurrentBatch(Errors.NOT_COORDINATOR.exception());

Review Comment:
   I don't understand the reasoning behind those two lines. Could you please elaborate?
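The wrapper-event idea from the first comment on `failCurrentBatch` can be sketched as follows. This is a minimal illustration under stated assumptions, not the runtime's code: the local `DeferredEvent` interface only mirrors the single `complete(Throwable)` call used in the diffs, and `SafeDeferredEvent` and its `onError` callback are hypothetical names. The wrapper forwards to the wrapped event and reports, rather than propagates, whatever it throws, so loops such as those in `failCurrentBatch`, `failAll` and `completeUpTo` would not need their own try/catch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class SafeDeferredEventDemo {

    // Local stand-in with the same single method the diffs call: complete(Throwable).
    interface DeferredEvent {
        void complete(Throwable exception);
    }

    // Hypothetical wrapper: forwards to the wrapped event and reports, rather than
    // propagates, anything it throws, so call sites need no try/catch of their own.
    static final class SafeDeferredEvent implements DeferredEvent {
        private final DeferredEvent delegate;
        private final BiConsumer<DeferredEvent, Throwable> onError;

        SafeDeferredEvent(DeferredEvent delegate, BiConsumer<DeferredEvent, Throwable> onError) {
            this.delegate = delegate;
            this.onError = onError;
        }

        @Override
        public void complete(Throwable exception) {
            try {
                delegate.complete(exception);
            } catch (Throwable e) {
                // The runtime would log here instead, e.g. log.error(..., event, tp, e).
                onError.accept(delegate, e);
            }
        }

        @Override
        public String toString() {
            return "SafeDeferredEvent(" + delegate + ")";
        }
    }

    public static void main(String[] args) {
        List<String> completed = new ArrayList<>();
        List<Throwable> errors = new ArrayList<>();
        BiConsumer<DeferredEvent, Throwable> onError = (event, e) -> errors.add(e);

        // Wrap each event once, at the point where it is enqueued.
        List<DeferredEvent> events = List.of(
            new SafeDeferredEvent(ex -> completed.add("first"), onError),
            new SafeDeferredEvent(ex -> { throw new IllegalStateException("boom"); }, onError),
            new SafeDeferredEvent(ex -> completed.add("third"), onError)
        );

        // Same loop shape as failCurrentBatch and failAll, with no try/catch here.
        for (DeferredEvent event : events) {
            event.complete(null);
        }

        // prints "completed=[first, third], errors=1"
        System.out.println("completed=" + completed + ", errors=" + errors.size());
    }
}
```

The failing event no longer stops the loop, and the remaining events still complete. Because the protection lives in the event rather than at each call site, the deferred event queue would treat events the same way in every completion path. The cost is that every enqueue point has to remember to wrap.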
##########
server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java:
##########
@@ -82,7 +85,11 @@ public void failAll(Exception exception) {
             Entry<Long, List<DeferredEvent>> entry = iter.next();
             for (DeferredEvent event : entry.getValue()) {
                 log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event);
-                event.complete(exception);
+                try {
+                    event.complete(exception);
+                } catch (Throwable e) {
+                    log.error("failAll({}): {} threw an exception.", exception.getClass().getSimpleName(), event, e);
+                }

Review Comment:
   I find it weird that we handle exceptions in failAll but not in completeUpTo. I think that this class should treat events consistently.

##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -670,23 +671,30 @@ private void transitionTo(
                     highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
+                    // Update partition metrics only after the coordinator context is fully loaded.
+                    runtimeMetrics.recordPartitionStateChange(oldState, state);
                     break;
                 case FAILED:
                     state = CoordinatorState.FAILED;
+                    // Update partition metrics before unload, since the coordinator context is no
+                    // longer usable.

Review Comment:
   This comment is incorrect because we actually keep the context when transitioning to the failed state.

##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -670,23 +671,30 @@ private void transitionTo(
                     highWatermarklistener = new HighWatermarkListener();
                     partitionWriter.registerListener(tp, highWatermarklistener);
                     coordinator.onLoaded(metadataImage);
+                    // Update partition metrics only after the coordinator context is fully loaded.
+                    runtimeMetrics.recordPartitionStateChange(oldState, state);

Review Comment:
   Does applying it after make a big difference?
   I wonder whether we should call `recordPartitionStateChange` before the switch in order to be consistent across all states. We could also move `state = ...` to before the switch.
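The suggestion to assign the state and call `recordPartitionStateChange` once, before the switch, might look like the sketch below. Everything here is a stand-in chosen for illustration: the `CoordinatorState` values, the `RuntimeMetrics` interface, the `Context` class and its validation check are assumptions, and the strings added to `actions` are placeholders for the real per-state work, not what the runtime actually does.

```java
import java.util.ArrayList;
import java.util.List;

public class TransitionSketch {

    // Stand-in for the runtime's partition states; names assumed for illustration.
    enum CoordinatorState { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

    // Hypothetical metrics hook with the same call shape as in the diff.
    interface RuntimeMetrics {
        void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
    }

    static final class Context {
        private final RuntimeMetrics runtimeMetrics;
        private CoordinatorState state = CoordinatorState.INITIAL;
        // Placeholder record of the per-state work done inside each case.
        final List<String> actions = new ArrayList<>();

        Context(RuntimeMetrics runtimeMetrics) {
            this.runtimeMetrics = runtimeMetrics;
        }

        void transitionTo(CoordinatorState newState) {
            // Assumed validation: reject illegal transitions before mutating anything.
            if (newState == CoordinatorState.INITIAL || state == CoordinatorState.CLOSED) {
                throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
            }
            CoordinatorState oldState = state;

            // Assign the state and record the metric once, before the switch,
            // so every state is handled the same way.
            state = newState;
            runtimeMetrics.recordPartitionStateChange(oldState, newState);

            switch (newState) {
                case LOADING:
                    actions.add("placeholder: start loading");
                    break;
                case ACTIVE:
                    actions.add("placeholder: register listener, call onLoaded");
                    break;
                case FAILED:
                    actions.add("placeholder: keep the context, nothing unloaded");
                    break;
                case CLOSED:
                    actions.add("placeholder: unload");
                    break;
                default:
                    // INITIAL is rejected by the validation above.
                    break;
            }
        }
    }

    public static void main(String[] args) {
        List<String> recorded = new ArrayList<>();
        Context context = new Context((from, to) -> recorded.add(from + "->" + to));
        context.transitionTo(CoordinatorState.LOADING);
        context.transitionTo(CoordinatorState.ACTIVE);
        context.transitionTo(CoordinatorState.FAILED);
        // prints [INITIAL->LOADING, LOADING->ACTIVE, ACTIVE->FAILED]
        System.out.println(recorded);
    }
}
```

One consequence of this ordering is that the metric reflects the new state even if work inside a case, such as `coordinator.onLoaded(...)`, subsequently throws. Recording after that call, as the diff does for ACTIVE, avoids this at the cost of a separate call in each case.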