Thanks for the updates, Aditya.

Follow-up on LM1: If we throw something like IllegalStateException when the
old subscribe is used together with the new setRebalanceListener, nothing
breaks for any user on day 1 (no existing app calls setRebalanceListener).
Things would only fail for users who don't adopt the new APIs properly (which
is good, I expect). So the question is: do we want to allow users to adopt the
new setRebalanceListener API without also adopting the new subscribe without
listener? I was thinking we don't. It seems like a wrong usage of the new
APIs, a migration done wrong, and we can guide it better. This confusing
combination of APIs would be introduced only with this KIP, btw. It doesn't
exist now: trunk APIs only allow listeners at the subscription level (wrong,
but consistently wrong :)). Thoughts?
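
To make the LM1 question concrete, here is a rough sketch of the guard being discussed. Everything in it is a hypothetical stand-in (SketchConsumer, Listener, the exception message); it is not the KIP's code nor the real KafkaConsumer, and it only models the "new setter + old subscribe" combination. What should happen when the old subscribe is called first and the setter afterwards is left open here.

```java
import java.util.Collection;
import java.util.List;

// Sketch only: every type here is a hypothetical stand-in, not a Kafka client class.
class SubscribeGuardSketch {

    /** Simplified stand-in for ConsumerRebalanceListener. */
    interface Listener {
        void onPartitionsAssigned(Collection<String> partitions);
    }

    /** Toy consumer that models only the listener/subscribe interaction. */
    static class SketchConsumer {
        private Listener consumerLevelListener;      // set via the new API
        private Listener subscriptionLevelListener;  // set via the old subscribe
        private List<String> topics = List.of();

        /** New API: register the listener at the consumer level. */
        void setRebalanceListener(Listener listener) {
            this.consumerLevelListener = listener;
        }

        /** Subscribe without a listener. */
        void subscribe(Collection<String> newTopics) {
            this.topics = List.copyOf(newTopics);
        }

        /** Old variant: rejected once a consumer-level listener has been set. */
        @Deprecated
        void subscribe(Collection<String> newTopics, Listener listener) {
            // Check before touching any state, so a rejected call changes nothing.
            if (consumerLevelListener != null) {
                throw new IllegalStateException(
                    "subscribe(topics, listener) cannot be combined with "
                        + "setRebalanceListener; use subscribe(topics) instead");
            }
            this.subscriptionLevelListener = listener;
            this.topics = List.copyOf(newTopics);
        }

        List<String> subscription() {
            return topics;
        }
    }

    public static void main(String[] args) {
        // Existing app: never calls the new API, so nothing changes on day 1.
        SketchConsumer existing = new SketchConsumer();
        existing.subscribe(List.of("orders"), partitions -> { });
        System.out.println("old-only subscription: " + existing.subscription());

        // Half-migrated app: new setter plus old subscribe fails fast.
        SketchConsumer halfMigrated = new SketchConsumer();
        halfMigrated.setRebalanceListener(partitions -> { });
        try {
            halfMigrated.subscribe(List.of("orders"), partitions -> { });
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the check runs before the subscription is modified, so the rejected call leaves the consumer exactly as it was, and the fully migrated path (setRebalanceListener followed by subscribe(topics)) is unaffected.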

Follow-up on LM2: Agree with the meaning of the deprecation tag (the version
in which we deprecate; it says 4.4, ok). But the KIP reads elsewhere:
"*Deprecate all subscribe() method variants with rebalance listener in
AK5.0*" (I guess this is what's wrong, but let's clarify when exactly we're
deprecating).
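
For reference on LM2, a small sketch of how the two versions could be expressed, assuming the reading "deprecated since 4.4, removal in a later major". In Java 9+, the `since` element of @Deprecated is the version in which the element became deprecated, and `forRemoval = true` only says it is slated for removal, without naming a version. The SketchConsumer and Listener types below are hypothetical stand-ins, not the real Consumer interface; the 4.4 value is the one quoted from the KIP's sample code.

```java
import java.lang.reflect.Method;
import java.util.Collection;

// Sketch only: SketchConsumer and Listener are hypothetical stand-ins.
class DeprecationSketch {

    /** Simplified stand-in for ConsumerRebalanceListener. */
    interface Listener {
    }

    interface SketchConsumer {
        void subscribe(Collection<String> topics);

        /**
         * @deprecated since 4.4; the removal version is not expressible in the
         *             annotation and would be stated here in the Javadoc.
         */
        @Deprecated(since = "4.4", forRemoval = true)
        void subscribe(Collection<String> topics, Listener listener);
    }

    /** Returns the @Deprecated annotation on the named method, or null if absent. */
    static Deprecated deprecationOf(Class<?> type, String name, Class<?>... params) {
        try {
            Method method = type.getDeclaredMethod(name, params);
            return method.getAnnotation(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + name, e);
        }
    }

    public static void main(String[] args) {
        Deprecated d = deprecationOf(
            SketchConsumer.class, "subscribe", Collection.class, Listener.class);
        System.out.println("since=" + d.since() + ", forRemoval=" + d.forRemoval());
    }
}
```

Under this reading, "deprecate in 4.4" and "remove in 5.0" are two separate statements, and only the first one lives in the annotation.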

Thanks!
Lianet



On Tue, Apr 21, 2026 at 2:15 PM Aditya Kousik <[email protected]> wrote:

> Hi Lianet,
>
> Thanks for the review.
>
> LM1: My intent was to leave the existing behaviour unchanged and support
> both methods equally. For users who upgrade to the new version, subscribe()
> will break on day 1 if we change default implementation to throw
> IllegalStateException. The only change I was planning was to manually
> register the rebalance listener with the subscription state in the
> deprecated methods, where right now we simply pass the listener as an
> argument.
>
> LM2: I read the “since” in the @Deprecated annotation as the version from
> which the method is deprecated, i.e. “since 4.4”, with the method marked for
> eventual removal in a future, as yet unspecified, version (5.0/6.0).
>
> LM3: Thanks for highlighting the missing methods. I’ve addressed them in
> the updated included/excluded section. I’ve also added the interface for
> the new view in the KIP. It’s meant to mirror the methods of Consumer ->
> KafkaConsumer by delegating the actual calls to the proper implementation.
>
> LM4: I agree with the naming challenge. Some of the alternatives I’d
> considered/rejected: RebalanceHandler, RebalanceView,
> ConsumerRebalanceView, GuardedConsumerView. We landed on
> ConsumerRebalanceAdapter since it’s not really just a view (we allow
> pause/commit etc.); we already have ConsumerRebalanceListener so
> consistency played a role there.
>
> I had not considered RebalanceConsumer. It fits here because it supports
> nearly 70% of the methods on Consumer and clearly denotes it’s meant for
> use at rebalance time like you said. I agree with this change. I’ve updated
> the KIP with the new name.
>
> Best,
> Aditya
>
> > On Apr 21, 2026, at 05:17, Lianet Magrans <[email protected]> wrote:
> >
> > Hi, thanks for the KIP!
> >
> > LM1: We’re proposing a new API to set the consumer rebalance listener at
> > the consumer level (nice), but what will the behaviour be if an app calls
> > that new API plus any of the “old” subscribe variants (e.g.,
> > subscribe(..., ConsumerRebalanceListener))? Do we intend to accept it
> > silently and override the listener (similar to what happens today), or to
> > throw some kind of IllegalStateException? I imagine the latter is better
> > to guide users on how to properly use the APIs, and it aligns with the
> > deprecation that the KIP introduces on the subscribe with listener.
> >
> > LM2: About the deprecation of subscribe with listener, one section of the
> > KIP says deprecate in 5.0, while the sample code has @Deprecated tags
> > “since 4.4”. What’s the intention?
> >
> > LM3: About the new ConsumerRebalanceAdapter, could we include the explicit
> > definition of the component?
> >    - Is it going to be a class? Or an interface, consistent with Consumer
> > -> KafkaConsumer? It's not clear what the intention is.
> >    - There are some KafkaConsumer APIs that are listed neither in the
> > “Included” section nor in the “Excluded” section (e.g., “subscription”,
> > the metrics-related ones like registerMetricForSubscription,
> > clientInstanceId). What's the intention there?
> >
> > LM4: “ConsumerRebalanceAdapter” as a name seems confusing to me for this
> > component, as it's not clear what it is (it's just a Consumer in the end,
> > just a restricted one to be used for rebalance, correct?). Could we think
> > about a better/clearer name (e.g., RebalanceConsumer)? I find this one
> > clear and to the point, but no strong position, naming is tricky :)
> >
> > Thanks!
> > Lianet
> >
> >> On Fri, Apr 17, 2026 at 3:12 AM Andrew Schofield <[email protected]>
> >> wrote:
> >>
> >> Thanks. Looks good to me.
> >>
> >> Andrew
> >>
> >>> On 2026/04/13 23:00:13 Aditya Kousik wrote:
> >>> Thank you for the clarification, Andrew.
> >>>
> >>> I’d initially made the changes to the KIP along the same lines after
> >>> your suggestion. I think I just got a little ahead of myself, thinking
> >>> about dropping the setListener method and instead creating an
> >>> interceptor like we do for the consumer. The KIP did not reflect this
> >>> idea; it was just me floating it.
> >>>
> >>> I think the KIP currently reflects all your suggestions; please take a
> >>> look when you can. It includes the explicit mark for removal of the
> >>> subscribe methods and the new setConsumerRebalanceListener method on
> >>> Consumer. I’ve renamed the view to ConsumerRebalanceAdapter, since it’s
> >>> not really just a view: we do allow write-like ops like commit/seek.
> >>>
> >>> Best,
> >>> Aditya
> >>>
> >>>> On Apr 13, 2026, at 10:12, Andrew Schofield <[email protected]> wrote:
> >>>>
> >>>> Hi Aditya,
> >>>> I think I might have confused you. I didn't write clearly enough, so
> >>>> apologies. I'll try again.
> >>>>
> >>>> Today, we have:
> >>>>
> >>>> public interface ConsumerRebalanceListener {
> >>>>
> >>>>   void onPartitionsAssigned(Collection<TopicPartition> partitions);
> >>>>
> >>>>   void onPartitionsRevoked(Collection<TopicPartition> partitions);
> >>>>
> >>>>   default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >>>>     onPartitionsRevoked(partitions);
> >>>>   }
> >>>> }
> >>>>
> >>>> I think what you end up with is:
> >>>>
> >>>> public interface ConsumerRebalanceListener {
> >>>>
> >>>>   void onPartitionsAssigned(Collection<TopicPartition> partitions);
> >>>>
> >>>>   default void onPartitionsAssigned(Collection<TopicPartition> partitions,
> >>>>       ConsumerRebalanceAdapter consumerAdapter) {
> >>>>     onPartitionsAssigned(partitions);
> >>>>   }
> >>>>
> >>>>   void onPartitionsRevoked(Collection<TopicPartition> partitions);
> >>>>
> >>>>   default void onPartitionsRevoked(Collection<TopicPartition> partitions,
> >>>>       ConsumerRebalanceAdapter consumerAdapter) {
> >>>>     onPartitionsRevoked(partitions);
> >>>>   }
> >>>>
> >>>>   default void onPartitionsLost(Collection<TopicPartition> partitions) {
> >>>>     onPartitionsRevoked(partitions);
> >>>>   }
> >>>>
> >>>>   default void onPartitionsLost(Collection<TopicPartition> partitions,
> >>>>       ConsumerRebalanceAdapter consumerAdapter) {
> >>>>     onPartitionsRevoked(partitions, consumerAdapter);
> >>>>   }
> >>>> }
> >>>>
> >>>> ConsumerRebalanceView is also fine for the cut-down interface to the
> >>>> Consumer. I do think it would be best to start the name with
> >>>> ConsumerRebalance... .
> >>>>
> >>>> And then the changes to Consumer are:
> >>>>
> >>>> * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> >>>> * Deprecate all of the subscribe variants which take a
> >>>>   ConsumerRebalanceListener parameter
> >>>>
> >>>>
> >>>> I do take the point about having a list of rebalance listeners like
> >>>> the interceptor classes, but I wouldn't wrap that into the same KIP
> >>>> personally. People tend to implement these things in lambdas.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 2026/04/08 05:30:51 Aditya Kousik wrote:
> >>>>> Hello Andrew,
> >>>>>
> >>>>> I dwelt on this a bit more. I think supporting both listeners until
> >>>>> AK 5.0 may not be as irksome as I initially feared. We already do this
> >>>>> for classic/async consumers with ConsumerDelegate.
> >>>>>
> >>>>> I was curious why subscribe() alone took a client-constructed object
> >>>>> like ConsumerRebalanceListener whereas all other hooks were
> >>>>> instantiated by the client code via Configurable. We can support both,
> >>>>> but call out clearly, via documentation and an info-level log line,
> >>>>> which one will be active at runtime. This can be opt-in at launch and
> >>>>> eventually made the de facto pattern in the next major release.
> >>>>>
> >>>>> This would also tie in nicely with subscribe only supporting topic
> >>>>> collections + regex, and would move the callback into the client code,
> >>>>> as other interceptors already do.
> >>>>>
> >>>>> I also wanted to call out that this could be an opportunity to
> >>>>> support a list of ConsumerRebalanceInterceptor like the others.
> >>>>> Currently in my code, I wrap the outer one as a
> >>>>> CompositeRebalanceListener with a List<ConsumerRebalanceListener>
> >>>>> invoked serially. As I mentioned earlier, we already do this with
> >>>>> producer/consumer interceptors, handling exceptions within each call
> >>>>> in a loop by logging a warning.
> >>>>>
> >>>>> So rebalance.interceptors with a LIST of fully qualified class names
> >>>>> instantiated within the constructor is my current favourite approach.
> >>>>> We support exactly one API for all rebalances, indicating which one at
> >>>>> runtime.
> >>>>>
> >>>>> Lmk your thoughts on this.
> >>>>>
> >>>>> Thanks,
> >>>>> Aditya Kousik
> >>>>>
> >>>>>>> On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]> wrote:
> >>>>>>
> >>>>>> Hi Andrew,
> >>>>>>
> >>>>>> I see what you’re saying.
> >>>>>> With AS1 and AS2, the flow becomes clearer for the subscribe
> >>>>>> interaction: we only change the subscription state for topics and
> >>>>>> leave rebalance events to a separate mechanism uncoupled from the
> >>>>>> subscribe() call.
> >>>>>>
> >>>>>> To keep in line with other Kafka client classes, can we follow the
> >>>>>> same convention of using ConsumerConfig to handle this? A new
> >>>>>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed,
> >>>>>> instantiated with Utils#newConfiguredInstance, with the class made
> >>>>>> Configurable. Naming and instantiating it this way brings it closer
> >>>>>> to the existing interceptor classes.
> >>>>>>
> >>>>>> My only worry is that as long as ConsumerRebalanceListener exists,
> >>>>>> this can be a source of confusion about which interface to use for
> >>>>>> rebalance events. Unless we deprecate it, we bear the burden of
> >>>>>> invoking both, even if we state that only
> >>>>>> oneOf(ConsumerRebalanceListener, ConsumerRebalanceInterceptor) will
> >>>>>> be invoked during rebalances.
> >>>>>>
> >>>>>> Would love to hear your thoughts on this.
> >>>>>>
> >>>>>> -Aditya
> >>>>>>
> >>>>>>> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi Aditya,
> >>>>>>> I agree that using the existing ConsumerRebalanceListener gives a
> >>>>>>> lower adoption burden.
> >>>>>>>
> >>>>>>> AS1: To be more concrete with what I mean here, we could:
> >>>>>>> * Deprecate Consumer.subscribe(Collection<String>,
> >>>>>>>   ConsumerRebalanceListener) for removal in AK 5.0
> >>>>>>> * Introduce
> >>>>>>>   Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
> >>>>>>>
> >>>>>>> AS2: Given that we already have an interface called
> >>>>>>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would
> >>>>>>> be a better choice of name for your new interface in terms of
> >>>>>>> consistency.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Andrew
> >>>>>>>
> >>>>>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
> >>>>>>>> Hi Andrew, thank you for the quick feedback. It turned out to be
> >>>>>>>> pivotal.
> >>>>>>>>
> >>>>>>>> One of the rejected alternatives was to add default methods to
> >>>>>>>> ConsumerRebalanceListener. I was ambivalent about this approach, in
> >>>>>>>> the hope that a new method and a new interface would create the
> >>>>>>>> least friction.
> >>>>>>>>
> >>>>>>>> You’re right about the state change w.r.t. the subscribe() variants.
> >>>>>>>> With the Classic consumer, we directly update SubscriptionType with
> >>>>>>>> setSubscriptionType, and there is a similar but more complex setup
> >>>>>>>> for the async consumer. So your setRebalanceHandler suggestion seems
> >>>>>>>> to follow existing patterns.
> >>>>>>>>
> >>>>>>>> However, looking at the places I’d need to pipe RebalanceHandler
> >>>>>>>> through, it’s going to add a burden to the plumbing and subscription
> >>>>>>>> state.
> >>>>>>>>
> >>>>>>>> I’m now falling squarely on the side of extending the existing
> >>>>>>>> ConsumerRebalanceListener with new default methods. This also allows
> >>>>>>>> existing frameworks like Spring and SmallRye to hook directly into
> >>>>>>>> the new method with minimal change.
> >>>>>>>>
> >>>>>>>> I’ve renamed/updated the KIP to reflect this. (I can see why
> >>>>>>>> people use shareable URLs for the confluence docs)
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/x/9ZU8G
> >>>>>>>>
> >>>>>>>> -Aditya
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Aditya,
> >>>>>>>>> Thanks for the KIP. I've only taken a quick look so far, but
> >>>>>>>>> here's an initial comment.
> >>>>>>>>>
> >>>>>>>>> AS1: One of the mistakes in the Kafka consumer API today is that
> >>>>>>>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
> >>>>>>>>> jobs. First, it replaces the rebalance listener (when you might
> >>>>>>>>> assume that the listener applies only to rebalance changes
> >>>>>>>>> resulting from this call to subscribe). Second, it changes the
> >>>>>>>>> subscription. If the second of these throws an exception, the first
> >>>>>>>>> will still occur. It's a bit of a mess. I suggest you have a
> >>>>>>>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not
> >>>>>>>>> add a new override for `Consumer.subscribe`.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Andrew
> >>>>>>>>>
> >>>>>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
> >>>>>>>>>> Consumer-Aware Rebalance Callback.
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
> >>>>>>>>>>
> >>>>>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
> >>>>>>>>>> rebalance callbacks; the client doesn't. The standard workaround
> >>>>>>>>>> of constructor-injecting a full Consumer reference allows
> >>>>>>>>>> dangerous operations like poll() and close() inside a callback.
> >>>>>>>>>> This KIP proposes RebalanceHandler, with a RebalanceConsumerView
> >>>>>>>>>> that exposes only safe operations, making misuse a compile error.
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>
> >>>
> >>
>
