[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805882#comment-16805882 ]
Dana Powers commented on KAFKA-4600: ------------------------------------ This doesn't look fixed in trunk: {code:java} clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java 271 // execute the user's callback after rebalance 272 ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); 273 log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", ")); 274 try { 275 listener.onPartitionsAssigned(assignedPartitions); 276 } catch (WakeupException | InterruptException e) { 277 throw e; 278 } catch (Exception e) { 279 log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e); 280 } {code} The exception is logged, but it is not otherwise exposed to the consumer. > Consumer proceeds on when ConsumerRebalanceListener fails > --------------------------------------------------------- > > Key: KAFKA-4600 > URL: https://issues.apache.org/jira/browse/KAFKA-4600 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.1.1 > Reporter: Braedon Vickers > Priority: Major > Fix For: 0.11.0.0 > > > One of the use cases for a ConsumerRebalanceListener is to load state > necessary for processing a partition when it is assigned. However, when > ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. > the state isn't loaded), the error is logged and the consumer proceeds on as > if nothing happened, happily consuming messages from the new partition. When > the state is relied upon for correct processing, this can be very bad, e.g. > data loss can occur. > It would be better if the error was propagated up so it could be dealt with > normally. At the very least the assignment should fail so the consumer > doesn't see any messages from the new partitions, and the rebalance can be > reattempted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)