> From an API POV, I think the new `CloseOptions` class should not have any "getters" and thus, it's irrelevant how we represent the different cases in code internally (even if I believe using `Optional` might be a good way to handle it).

If we avoid getters, consumers would have to access the internal variables directly, similar to how it's done in the Streams CloseOptions. While this is a matter of preference, it's worth noting that the Admin options provide both setters and getters. In my opinion, since all of these options are part of the kafka-clients code, they should adhere to a consistent coding style.

> In KS, we use config objects like `CloseOption` all the time, with static "factory" method (and private constructors), and an internal sub-class which would have the getters if needed. So I think we should have

I value the static factory methods for their convenience: they let users quickly create an object when they want to set only one option. However, while I don't want to sound repetitive, maintaining a consistent pattern across the API is essential.

Best,
Chia-Ping

Matthias J. Sax <mj...@apache.org> wrote on Mon, Sep 30, 2024 at 2:26 AM:

> I am not sure, but I get the impression that we are starting to talk too
> much about implementation details now?
>
> From an API POV, I think the new `CloseOptions` class should not have
> any "getters" and thus, it's irrelevant how we represent the different
> cases in code internally (even if I believe using `Optional` might be a
> good way to handle it).
>
> In KS, we use config objects like `CloseOption` all the time, with
> static "factory" method (and private constructors), and an internal
> sub-class which would have the getters if needed.
> So I think we should have
>
> public class CloseOptions {
>     private CloseOptions(Optional<Duration>, Optional<Boolean>);
>
>     public static CloseOptions timeout(Duration);
>
>     public static CloseOptions leaveGroup(boolean);
>
>     public CloseOptions withTimeout(Duration);
>
>     public CloseOptions withLeaveGroup(boolean);
> }
>
>
> This allows calls such as:
>
> consumer.close(CloseOptions.leaveGroup(true));
>
> or
>
> consumer.close(
>     CloseOptions.timeout(Duration.ofMinutes(5))
>         .withLeaveGroup(false)
> );
>
> We can still discuss naming and what overloads we want to add, but in
> general, a well established and proven pattern is to have a few static
> "entry" methods, which return the object itself to allow chaining with
> non-static methods.
>
> I am not sure why KIP-812 did not follow this pattern... I think it got
> it wrong and we should not repeat this "mistake". Maybe we could
> actually piggy-back a cleanup for the existing KS CloseOption object
> into this KIP?
>
> -Matthias
>
> On 9/29/24 8:39 AM, TengYao Chi wrote:
> > Hi Chia-Ping,
> >
> > Thanks for your feedback.
> > What I intended to express is that if `Optional.empty()` is passed as the
> > `timeout`, it will eventually be converted to `DEFAULT_CLOSE_TIMEOUT_MS`,
> > just as you mentioned.
> > Apologies for not expressing that clearly and for any confusion caused.
> >
> > Best regards,
> > TengYao
> >
> > Chia-Ping Tsai <chia7...@gmail.com> wrote on Sun, Sep 29, 2024 at 10:16 PM:
> >
> >> hi TengYao
> >>
> >>> I have reviewed the `close()` method implementation for both the
> >>> Classic and Async Consumers. I believe the `timeout` parameter could
> >>> have a default value, and this default should align with the existing
> >>> `Consumer#close()` method, which internally calls the overloaded
> >>> `Consumer#close(Duration)` with a default of 30 seconds
> >>> (`DEFAULT_CLOSE_TIMEOUT_MS`).
> >>
> >> If you assign a default value (30s) to CloseOptions#timeout, the
> >> consumer won't be able to differentiate between the "default" and
> >> "user-defined" behaviors.
> >>
> >> Therefore, I prefer to leave the timeout empty as the default value,
> >> allowing the consumer to handle it in a way that reflects default
> >> behavior.
> >>
> >> Best,
> >> Chia-Ping
> >>
> >> TengYao Chi <kiting...@gmail.com> wrote on Sun, Sep 29, 2024 at 2:23 PM:
> >>
> >>> Hi Sophie,
> >>>
> >>> Thanks for the suggestions.
> >>>
> >>> I have reviewed the `close()` method implementation for both the
> >>> Classic and Async Consumers. I believe the `timeout` parameter could
> >>> have a default value, and this default should align with the existing
> >>> `Consumer#close()` method, which internally calls the overloaded
> >>> `Consumer#close(Duration)` with a default of 30 seconds
> >>> (`DEFAULT_CLOSE_TIMEOUT_MS`).
> >>>
> >>> Regarding the `leaveGroup` parameter, since static and dynamic members
> >>> have different default behaviors upon close, and the primitive type
> >>> boolean cannot capture all cases, I agree with Chia-Ping’s suggestion
> >>> of using Optional instead. This approach avoids potential NPE issues
> >>> that could arise from using the boxed type Boolean and allows us to
> >>> cover all use cases more effectively.
> >>>
> >>> Given the above, although there is no standard in the Consumer API (as
> >>> far as I know), I believe adopting a fluent API would be beneficial, as
> >>> it would be more user-friendly and concise for configuring the
> >>> necessary options.
> >>>
> >>> Best,
> >>> TengYao
> >>>
> >>> Chia-Ping Tsai <chia7...@gmail.com> wrote on Sun, Sep 29, 2024 at 7:07 AM:
> >>>
> >>>> hi Sophie
> >>>>
> >>>> Fewer overloads are preferable, so in my opinion, the consumer should
> >>>> only have close() and close(CloseOptions), with the other overloads
> >>>> deprecated.
> >>>>
> >>>> That means all options in CloseOptions should be optional, and we
> >>>> should use a fluent-style API to add setters for them. This would
> >>>> allow users to configure only the necessary options while leaving the
> >>>> rest at their default values. For example:
> >>>>
> >>>> // case 0: set both timeout and leaveGroup
> >>>> new CloseOptions()
> >>>>     .timeout(100)
> >>>>     .leaveGroup(false);
> >>>>
> >>>> // case 1: set only timeout; leaveGroup stays at its default
> >>>> new CloseOptions()
> >>>>     .timeout(100);
> >>>>
> >>>> // case 2: set only leaveGroup; timeout stays at its default
> >>>> new CloseOptions()
> >>>>     .leaveGroup(true);
> >>>>
> >>>> Additionally, all getters of CloseOptions return Optional<>, which can
> >>>> distinguish between a "default" value and a "user-defined" value.
> >>>> Also, `close()` can have a default implementation that calls
> >>>> `close(new CloseOptions())`.
> >>>>
> >>>> Best,
> >>>> Chia-Ping
> >>>>
> >>>> Sophie Blee-Goldman <sop...@responsive.dev> wrote on Sun, Sep 29, 2024 at 5:52 AM:
> >>>>
> >>>>> @Matthias, regarding your questions in 101, is it fair to summarize
> >>>>> your points here as (A) only Kafka Streams, but not plain consumer,
> >>>>> would need to avoid leaving the group on close (for non-static
> >>>>> members), and (B) with KIP-1088 we will soon have a Streams-specific
> >>>>> Consumer API that would be more suited to these kinds of
> >>>>> Streams-specific APIs, and therefore --> (C) it doesn't make sense to
> >>>>> do this KIP now and we should just wait for 1088 so we can apply the
> >>>>> API on top of it?
> >>>>>
> >>>>> If that's a fair summary, then I would have to disagree. Not sending
> >>>>> a LeaveGroup on close for dynamic members isn't some highly
> >>>>> specific optimization that could only ever make sense in the context
> >>>>> of Kafka Streams.
> >>>>> It doesn't really have to do with Kafka Streams at all, it's just a
> >>>>> good thing to do for stateful apps where you don't want to shuffle
> >>>>> around partitions too much after a simple bounce. So imo this KIP
> >>>>> makes sense to do as-is, and people have been asking for it for quite
> >>>>> some time so I wouldn't want to delay this any further if possible.
> >>>>>
> >>>>> Also, this is technically a bit of an implementation detail so
> >>>>> perhaps it doesn't belong in the KIP, but I think we should remove
> >>>>> the "internal.leave.group.on.close" ConsumerConfig that was used
> >>>>> exclusively by Kafka Streams since we can now use the new
> >>>>> Consumer#close(CloseOptions) API for everything. And we should
> >>>>> mention doing this in the KIP, even though it's an internal config,
> >>>>> just in case someone out there is using it.
> >>>>>
> >>>>> @Chia-Ping
> >>>>> Regarding the default behavior, I think it probably makes sense to
> >>>>> leave the default behavior of the CloseOptions identical to the
> >>>>> existing Consumer#close overloads to avoid making things too
> >>>>> complicated for users to understand. Especially if we're not going to
> >>>>> remove the old #close overloads.
> >>>>>
> >>>>> @TengYao
> >>>>> Thanks for adding the specific APIs. We should probably first
> >>>>> determine things like which parameters of CloseOptions should be
> >>>>> required and which can be left to the default (plus what that default
> >>>>> is: see above conversation with Chia-Ping) and then we can design the
> >>>>> API around that. I noticed for example that with the current
> >>>>> proposal, it would be impossible to set both the leaveGroup and
> >>>>> timeOut parameters of the CloseOptions. We need to make sure the
> >>>>> available public constructors allow users to set the full range of
> >>>>> configs.
> >>>>> We could also use a fluent-style API like we do in Kafka Streams for
> >>>>> config objects. Not sure what (if anything) is the standard for
> >>>>> Consumer APIs?
> >>>>>
> >>>>> Finally, one open-ended question I have for everyone here: should the
> >>>>> leaveGroup config be a required parameter of CloseOptions? Or should
> >>>>> we allow users to pass in an "empty" CloseOptions to leave both the
> >>>>> timeout and leaveGroup behavior to the default?
> >>>>>
> >>>>> On Sat, Sep 28, 2024 at 9:58 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> >>>>>
> >>>>>> hi TengYao
> >>>>>>
> >>>>>> Could you please consider adding a "default" behavior for
> >>>>>> leaveGroup? As I previously mentioned, leaveGroup=true is not ideal
> >>>>>> as the default for static members, and similarly, leaveGroup=false
> >>>>>> is not suitable as the default for dynamic members.
> >>>>>>
> >>>>>> Maybe we could change the type of leaveGroup to Optional<Boolean>,
> >>>>>> allowing it to represent three distinct behaviors.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Chia-Ping
> >>>>>>
> >>>>>> TengYao Chi <kiting...@gmail.com> wrote on Sun, Sep 29, 2024 at 12:51 AM:
> >>>>>>
> >>>>>>> Hi Sophie
> >>>>>>>
> >>>>>>> Thanks for the feedback.
> >>>>>>> I have updated the Public Interface part accordingly.
> >>>>>>> Please take a look.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> TengYao
> >>>>>>>
> >>>>>>> TengYao Chi <kiting...@gmail.com> wrote on Sat, Sep 28, 2024 at 1:26 PM:
> >>>>>>>
> >>>>>>>> Hi Matthias,
> >>>>>>>>
> >>>>>>>> Thanks for the explanation, particularly regarding the important
> >>>>>>>> considerations for both the plain consumer and Kafka Streams use
> >>>>>>>> cases.
> >>>>>>>>
> >>>>>>>> In this case, I think it would be better to stick with my initial
> >>>>>>>> proposal.
> >>>>>>>> We should give plain consumers the ability to determine whether
> >>>>>>>> to send a leave group request or not, with clear documentation
> >>>>>>>> highlighting the potential downsides. This could also provide
> >>>>>>>> flexibility for future features.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> TengYao
> >>>>>>>>
> >>>>>>>> Chia-Ping Tsai <chia7...@gmail.com> wrote on Sat, Sep 28, 2024 at 3:27 AM:
> >>>>>>>>
> >>>>>>>>> hi Matthias
> >>>>>>>>>
> >>>>>>>>>> 100: Why do we want to distinguish between the classic and the
> >>>>>>>>>> new async consumer? Should they not have the same (user facing)
> >>>>>>>>>> behavior? Or maybe I misunderstand something. Can someone catch
> >>>>>>>>>> me up on what epoch "-1" vs epoch "-2" means?
> >>>>>>>>>
> >>>>>>>>> I apologize for any confusion in my earlier explanation. The way a
> >>>>>>>>> consumer leaves a group varies between the Classic Consumer and
> >>>>>>>>> the Async Consumer:
> >>>>>>>>>
> >>>>>>>>> - The *Classic Consumer* uses a LeaveGroupRequest but does *not*
> >>>>>>>>>   send this request for static members.
> >>>>>>>>>
> >>>>>>>>> - In contrast, the *Async Consumer* sends a
> >>>>>>>>>   ConsumerGroupHeartbeatRequest. If the member is static, this
> >>>>>>>>>   request is sent with an epoch value of -2, indicating that the
> >>>>>>>>>   static member has temporarily left the group and is *not*
> >>>>>>>>>   removed. An epoch of -1 in the CONSUMER protocol signifies that
> >>>>>>>>>   the static member is treated as a dynamic member and will leave
> >>>>>>>>>   the group completely.
> >>>>>>>>>
> >>>>>>>>>> Hence, even if not useful for the plain consumer to disable
> >>>>>>>>>> sending a leave-group-request, it might be worth to add a
> >>>>>>>>>> generic enable/disable API for both dynamic and static groups,
> >>>>>>>>>> so KS can use this API (and we could remove the internal
> >>>>>>>>>> consumer config, which is a workaround anyway).
> >>>>>>>>>
> >>>>>>>>> I agree that having a generic enable/disable API would be
> >>>>>>>>> beneficial, especially if we can provide comprehensive
> >>>>>>>>> documentation. This documentation should clearly outline the
> >>>>>>>>> potential downsides of not sending a LEAVE_REQUEST for dynamic
> >>>>>>>>> members, ensuring users are well-informed about the implications
> >>>>>>>>> of their choices.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Chia-Ping
> >>>>>>>>>
> >>>>>>>>> Matthias J. Sax <mj...@apache.org> wrote on Sat, Sep 28, 2024 at 2:07 AM:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the KIP. Two questions/comments:
> >>>>>>>>>>
> >>>>>>>>>> 100: Why do we want to distinguish between the classic and the
> >>>>>>>>>> new async consumer? Should they not have the same (user facing)
> >>>>>>>>>> behavior? Or maybe I misunderstand something. Can someone catch
> >>>>>>>>>> me up on what epoch "-1" vs epoch "-2" means?
> >>>>>>>>>>
> >>>>>>>>>> 101: I think we need to distinguish between the plain consumer
> >>>>>>>>>> and the KS case.
> >>>>>>>>>>
> >>>>>>>>>> Plain consumer: for this case, atm users don't have control at
> >>>>>>>>>> all, but it's hard coded when a leave group request is sent.
> >>>>>>>>>> If we only consider this case, the current KIP to allow sending
> >>>>>>>>>> a leave-group request for static members is sufficient. I agree
> >>>>>>>>>> that disabling leave-group request for dynamic members is not
> >>>>>>>>>> necessary for the plain consumer case.
> >>>>>>>>>>
> >>>>>>>>>> However, for the KS case it's different. Because KS uses the
> >>>>>>>>>> internal config to disable sending leave group request for
> >>>>>>>>>> dynamic members, we lack a user-facing API to enable sending a
> >>>>>>>>>> leave group request for this case, and if we only allow to
> >>>>>>>>>> enable sending leave group request for static members on the
> >>>>>>>>>> consumer, the KIP would fall short of closing this gap.
> >>>>>>>>>>
> >>>>>>>>>> Hence, even if not useful for the plain consumer to disable
> >>>>>>>>>> sending a leave-group-request, it might be worth to add a
> >>>>>>>>>> generic enable/disable API for both dynamic and static groups,
> >>>>>>>>>> so KS can use this API (and we could remove the internal
> >>>>>>>>>> consumer config, which is a workaround anyway).
> >>>>>>>>>>
> >>>>>>>>>> On the other hand, in light of KIP-1088, maybe there are other
> >>>>>>>>>> ways to fix it on the KS side? I think the goal should be to
> >>>>>>>>>> remove the internal consumer config (as it's static, and we
> >>>>>>>>>> cannot overwrite it at runtime), and to give KS a way to
> >>>>>>>>>> dynamically send a leave-group-request on close() -- but maybe
> >>>>>>>>>> we could build this on an internal consumer API, and not make it
> >>>>>>>>>> public? For this case, the current KIP would be sufficient.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 9/26/24 8:19 PM, Sophie Blee-Goldman wrote:
> >>>>>>>>>>> Thanks for the KIP! Quick request for readability, can you
> >>>>>>>>>>> please include the exact APIs that you're proposing to add or
> >>>>>>>>>>> change under the "Public Interfaces" section? The KIP should
> >>>>>>>>>>> display the actual method signature and any applicable javadocs
> >>>>>>>>>>> for new public APIs.
> >>>>>>>>>>>
> >>>>>>>>>>> You can look at other KIPs for a clear sense of what it should
> >>>>>>>>>>> contain, but here's one example you could work from:
> >>>>>>>>>>>
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Sep 26, 2024 at 6:22 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>>> I think I’m actually OK with leaving it as leaveGroup with a
> >>>>>>>>>>>>> lot of documentation that warns users away from changing it
> >>>>>>>>>>>>> arbitrarily.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Pardon me, I just want to ensure we are all on the same page.
> >>>>>>>>>>>>
> >>>>>>>>>>>>    1. `leaveGroup=true`: `ClassicKafkaConsumer` sends a
> >>>>>>>>>>>>       `LeaveGroupRequest` for either the dynamic or static
> >>>>>>>>>>>>       member.
> >>>>>>>>>>>>    2. `leaveGroup=false`: `ClassicKafkaConsumer` does not send
> >>>>>>>>>>>>       any `LeaveGroupRequest` for either the dynamic or static
> >>>>>>>>>>>>       member.
> >>>>>>>>>>>>    3. `leaveGroup=default` (current behavior):
> >>>>>>>>>>>>       `ClassicKafkaConsumer` sends a `LeaveGroupRequest` for
> >>>>>>>>>>>>       dynamic member, and it does NOT send any
> >>>>>>>>>>>>       `LeaveGroupRequest` for static member.
> >>>>>>>>>>>>    4. `leaveGroup=true`: `AsyncKafkaConsumer` sends a
> >>>>>>>>>>>>       `ConsumerGroupHeartbeatRequest` with "-1" epoch for
> >>>>>>>>>>>>       either the dynamic or static member.
> >>>>>>>>>>>>    5. `leaveGroup=false`: `AsyncKafkaConsumer` sends a
> >>>>>>>>>>>>       `ConsumerGroupHeartbeatRequest` with "-2" epoch for the
> >>>>>>>>>>>>       static member, and it does NOT send any
> >>>>>>>>>>>>       `ConsumerGroupHeartbeatRequest` for dynamic member.
> >>>>>>>>>>>>    6. `leaveGroup=default` (current behavior):
> >>>>>>>>>>>>       `AsyncKafkaConsumer` sends a
> >>>>>>>>>>>>       `ConsumerGroupHeartbeatRequest` with "-1" epoch for
> >>>>>>>>>>>>       dynamic member and "-2" epoch for static member.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Chia-Ping
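
For reference, the six cases listed above can be condensed into one decision function. This is only a sketch under assumptions: the `CloseBehaviorSketch` class, the `Protocol` and `Action` enums, and the `onClose` method are hypothetical names made up for illustration and are not part of the Kafka client API. An empty `Optional` stands for `leaveGroup=default`, and the classic case 3 is read as "no `LeaveGroupRequest` for static members".

```java
import java.util.Optional;

// Hypothetical sketch; none of these names exist in the Kafka client API.
final class CloseBehaviorSketch {
    // CLASSIC = ClassicKafkaConsumer, CONSUMER = AsyncKafkaConsumer (CONSUMER protocol).
    enum Protocol { CLASSIC, CONSUMER }

    enum Action {
        LEAVE_GROUP_REQUEST,      // classic protocol: LeaveGroupRequest
        HEARTBEAT_EPOCH_MINUS_1,  // ConsumerGroupHeartbeatRequest with epoch -1
        HEARTBEAT_EPOCH_MINUS_2,  // ConsumerGroupHeartbeatRequest with epoch -2
        NO_REQUEST                // nothing is sent on close
    }

    static Action onClose(Protocol protocol, boolean staticMember, Optional<Boolean> leaveGroup) {
        // Empty means "default": dynamic members leave, static members do not.
        boolean leave = leaveGroup.orElse(!staticMember);
        if (protocol == Protocol.CLASSIC) {
            return leave ? Action.LEAVE_GROUP_REQUEST : Action.NO_REQUEST;
        }
        if (leave) {
            return Action.HEARTBEAT_EPOCH_MINUS_1;
        }
        // Not leaving: a static member pauses with epoch -2, a dynamic member sends nothing.
        return staticMember ? Action.HEARTBEAT_EPOCH_MINUS_2 : Action.NO_REQUEST;
    }

    public static void main(String[] args) {
        // Print the default behavior (cases 3 and 6) for every member type.
        for (Protocol p : Protocol.values()) {
            for (boolean isStatic : new boolean[] {false, true}) {
                System.out.println(p + " static=" + isStatic + " default -> "
                        + onClose(p, isStatic, Optional.empty()));
            }
        }
    }
}
```

Modelling `leaveGroup` as `Optional<Boolean>` is what lets a single function cover all three settings: the "default" branch is just the empty value resolved against the member type.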