I am not sure, but I get the impression that we are starting to talk too
much about implementation details now?
From an API POV, I think the new `CloseOptions` class should not have
any "getters" and thus, it's irrelevant how we represent the different
cases in code internally (even if I believe using `Optional` might be a
good way to handle it).
In KS, we use config objects like `CloseOptions` all the time, with
static "factory" methods (and private constructors), and an internal
sub-class which would have the getters if needed. So I think we should have:
public class CloseOptions {
    private CloseOptions(Optional<Duration> timeout, Optional<Boolean> leaveGroup);
    public static CloseOptions timeout(Duration timeout);
    public static CloseOptions leaveGroup(boolean leaveGroup);
    public CloseOptions withTimeout(Duration timeout);
    public CloseOptions withLeaveGroup(boolean leaveGroup);
}
This allows calls such as:
    consumer.close(CloseOptions.leaveGroup(true));
or
    consumer.close(
        CloseOptions.timeout(Duration.ofMinutes(5))
            .withLeaveGroup(false)
    );
We can still discuss naming and which overloads we want to add, but in
general, a well-established and proven pattern is to have a few static
"entry" methods, which return the object itself to allow chaining with
non-static methods.
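To make the pattern concrete, here is a minimal sketch of how such a class could look. It is only an illustration under assumptions: the protected copy constructor and the `CloseOptionsInternal` sub-class are hypothetical names, and the classes are package-private here only so the sketch compiles in a single file.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Package-private only so the sketch compiles in any file; the proposed class would be public.
class CloseOptions {
    // Empty means "not set by the user"; the consumer decides what the default is.
    protected final Optional<Duration> timeout;
    protected final Optional<Boolean> leaveGroup;

    private CloseOptions(Optional<Duration> timeout, Optional<Boolean> leaveGroup) {
        this.timeout = Objects.requireNonNull(timeout);
        this.leaveGroup = Objects.requireNonNull(leaveGroup);
    }

    // Hypothetical copy constructor so an internal sub-class can wrap user-provided options.
    protected CloseOptions(CloseOptions other) {
        this(other.timeout, other.leaveGroup);
    }

    // Static "entry" methods: each sets one option and leaves the other unset.
    public static CloseOptions timeout(Duration timeout) {
        return new CloseOptions(Optional.of(timeout), Optional.empty());
    }

    public static CloseOptions leaveGroup(boolean leaveGroup) {
        return new CloseOptions(Optional.empty(), Optional.of(leaveGroup));
    }

    // Non-static methods for chaining; each returns a new immutable instance.
    public CloseOptions withTimeout(Duration timeout) {
        return new CloseOptions(Optional.of(timeout), this.leaveGroup);
    }

    public CloseOptions withLeaveGroup(boolean leaveGroup) {
        return new CloseOptions(this.timeout, Optional.of(leaveGroup));
    }
}

// Hypothetical internal view: the only place where getters exist.
class CloseOptionsInternal extends CloseOptions {
    CloseOptionsInternal(CloseOptions options) {
        super(options);
    }

    Optional<Duration> timeout() {
        return timeout;
    }

    Optional<Boolean> leaveGroup() {
        return leaveGroup;
    }
}
```

With this shape, the public class exposes no getters, and the consumer implementation would wrap whatever the user passed in `CloseOptionsInternal` to read the values.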
I am not sure why KIP-812 did not follow this pattern... I think it got
it wrong and we should not repeat this "mistake". Maybe we could
actually piggyback a cleanup for the existing KS `CloseOptions` object
into this KIP?
-Matthias
On 9/29/24 8:39 AM, TengYao Chi wrote:
Hi Chia-Ping,
Thanks for your feedback.
What I intended to express is that if `Optional.empty()` is passed to the
`timeout`, it will eventually be converted to `DEFAULT_CLOSE_TIMEOUT_MS`,
just as you mentioned.
Apologies for not expressing that clearly and for any confusion caused.
Best regards,
TengYao
On Sun, Sep 29, 2024 at 10:16 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi TengYao
I have reviewed the `close()` method implementation for both the Classic
and Async Consumers. I believe the `timeout` parameter could have a default
value, and this default should align with the existing `Consumer#close()`
method, which internally calls the overloaded `Consumer#close(Duration)`
with a default of 30 seconds (`DEFAULT_CLOSE_TIMEOUT_MS`).
If you assign a default value (30s) to CloseOptions#timeout, the consumer
won't be able to differentiate between the "default" and "user-defined"
behaviors.
Therefore, I prefer to leave the timeout empty as the default value,
allowing the consumer to handle it in a way that reflects default behavior.
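As a rough illustration of that point, the consumer could resolve an empty timeout at close time. This is a sketch only: the `CloseTimeouts` holder class and both method names are hypothetical, and the 30-second value is the one quoted above.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper; not part of the Consumer API.
final class CloseTimeouts {
    // The 30-second default quoted in the thread (DEFAULT_CLOSE_TIMEOUT_MS).
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30_000L;

    private CloseTimeouts() { }

    // An empty Optional means the user did not choose a timeout, so the default applies.
    static Duration effectiveTimeout(Optional<Duration> requested) {
        return requested.orElseGet(() -> Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
    }

    // The consumer can still tell an explicit 30s apart from "nothing was set".
    static boolean isUserDefined(Optional<Duration> requested) {
        return requested.isPresent();
    }
}
```

An explicit `Duration.ofSeconds(30)` and an empty value resolve to the same effective timeout, but only the former reports as user-defined.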
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 2:23 PM TengYao Chi <kiting...@gmail.com> wrote:
Hi Sophie,
Thanks for the suggestions.
I have reviewed the `close()` method implementation for both the Classic
and Async Consumers. I believe the `timeout` parameter could have a default
value, and this default should align with the existing `Consumer#close()`
method, which internally calls the overloaded `Consumer#close(Duration)`
with a default of 30 seconds (`DEFAULT_CLOSE_TIMEOUT_MS`).
Regarding the `leaveGroup` parameter, since static and dynamic members have
different default behaviors upon close, and the primitive type boolean
cannot capture all cases, I agree with Chia-Ping’s suggestion of using
Optional instead. This approach avoids potential NPE issues that could
arise from using the boxed type Boolean and allows us to cover all use
cases more effectively.
Given the above, although there is no standard in the Consumer API (as far
as I know), I believe adopting a fluent API would be beneficial, as it
would be more user-friendly and concise for configuring the necessary
options.
Best,
TengYao
On Sun, Sep 29, 2024 at 7:07 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi Sophie
Fewer overloads are preferable, so in my opinion, the consumer should only
have close() and close(CloseOptions), with the other overloads deprecated.
That means all options in CloseOptions should be optional, and we should
use a fluent-style API to add setters for them. This would allow users to
configure only the necessary options while leaving the rest at their
default values. For example:
// case 0: set both timeout and leaveGroup
new CloseOptions()
    .timeout(100)
    .leaveGroup(false);
// case 1: set only timeout; leaveGroup is default
new CloseOptions()
    .timeout(100);
// case 2: set only leaveGroup; timeout is default
new CloseOptions()
    .leaveGroup(true);
Additionally, all getters of CloseOptions return Optional<>, which can
distinguish between a "default" value and a "user-defined" value. Also,
`close()` can have a default implementation that calls
`close(new CloseOptions())`.
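A small sketch of this variant, for comparison. The assumptions: `ClosableConsumer` is a hypothetical stand-in for the `Consumer` interface, and the timeout is modelled as a `Duration` rather than the bare number used in the cases above.

```java
import java.time.Duration;
import java.util.Optional;

// Sketch of the fluent-setter variant; package-private so it compiles in any file.
class CloseOptions {
    private Duration timeout;    // null until the user sets it
    private Boolean leaveGroup;  // null until the user sets it

    public CloseOptions timeout(Duration timeout) {
        this.timeout = timeout;
        return this;
    }

    public CloseOptions leaveGroup(boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    // Getters return Optional so "not set" stays distinguishable from any concrete value.
    public Optional<Duration> timeout() {
        return Optional.ofNullable(timeout);
    }

    public Optional<Boolean> leaveGroup() {
        return Optional.ofNullable(leaveGroup);
    }
}

// Hypothetical stand-in for the Consumer interface, reduced to the two close methods.
interface ClosableConsumer {
    void close(CloseOptions options);

    // The no-arg close delegates with an empty CloseOptions, so both options stay at default.
    default void close() {
        close(new CloseOptions());
    }
}
```

An implementation then only needs to handle `close(CloseOptions)`; a call to `close()` arrives with both getters returning an empty Optional.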
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 5:52 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
@Matthias, regarding your questions in 101, is it fair to summarize your
points here as (A) only Kafka Streams, but not plain consumer, would need
to avoid leaving the group on close (for non-static members), and (B) with
KIP-1088 we will soon have a Streams-specific Consumer API that would be
more suited to these kinds of Streams-specific APIs, and therefore --> (C)
it doesn't make sense to do this KIP now and we should just wait for 1088
so we can apply the API on top of it?
If that's a fair summary, then I would have to disagree. Not sending a
LeaveGroup on close for dynamic members isn't some highly specific
optimization that could only ever make sense in the context of Kafka
Streams. It doesn't really have to do with Kafka Streams at all, it's
just a good thing to do for stateful apps where you don't want to shuffle
around partitions too much after a simple bounce. So imo this KIP makes
sense to do as-is, and people have been asking for it for quite some time
so I wouldn't want to delay this any further if possible.
Also, this is technically a bit of an implementation detail so perhaps it
doesn't belong in the KIP, but I think we should remove the
"internal.leave.group.on.close" ConsumerConfig that was used exclusively
by Kafka Streams since we can now use the new Consumer#close(CloseOptions)
API for everything. And we should mention doing this in the KIP, even
though it's an internal config, just in case someone out there is using
it.
@Chia-Ping
Regarding the default behavior, I think it probably makes sense to leave
the default behavior of the CloseOptions identical to the existing
Consumer#close overloads to avoid making things too complicated for users
to understand. Especially if we're not going to remove the old #close
overloads.
@TengYao
Thanks for adding the specific APIs. We should probably first determine
things like which parameters of CloseOptions should be required and which
can be left to the default (plus what that default is: see above
conversation with Chia-Ping) and then we can design the API around that. I
noticed for example that with the current proposal, it would be impossible
to set both the leaveGroup and timeout parameters of the CloseOptions. We
need to make sure the available public constructors allow users to set the
full range of configs. We could also use a fluent-style API like we do in
Kafka Streams for config objects. Not sure what (if anything) is the
standard for Consumer APIs?
Finally, one open-ended question I have for everyone here: should the
leaveGroup config be a required parameter of CloseOptions? Or should we
allow users to pass in an "empty" CloseOptions to leave both the timeout
and leaveGroup behavior to the default?
On Sat, Sep 28, 2024 at 9:58 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi TengYao
Could you please consider adding a "default" behavior for leaveGroup? As I
previously mentioned, leaveGroup=true is not ideal as the default for
static members, and similarly, leaveGroup=false is not suitable as the
default for dynamic members.
Maybe we could change the type of leaveGroup to Optional<Boolean>, allowing
it to represent three distinct behaviors.
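For illustration, the three behaviors could be resolved roughly like this for the classic consumer. This is a sketch under assumptions: `ClassicLeaveGroupPolicy` and `shouldSendLeaveGroup` are hypothetical names, and the "default" branch encodes the current behavior described in this thread (dynamic members leave, static members do not).

```java
import java.util.Optional;

// Hypothetical helper; not an existing Kafka class.
final class ClassicLeaveGroupPolicy {
    private ClassicLeaveGroupPolicy() { }

    // Optional.of(true)  -> always send a LeaveGroupRequest
    // Optional.of(false) -> never send one
    // Optional.empty()   -> default: only dynamic members send one
    static boolean shouldSendLeaveGroup(Optional<Boolean> leaveGroup, boolean isStaticMember) {
        return leaveGroup.orElse(!isStaticMember);
    }
}
```

A primitive boolean could only express the first two rows; the empty case is what lets the default depend on the membership type.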
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 12:51 AM TengYao Chi <kiting...@gmail.com> wrote:
Hi Sophie
Thanks for the feedback.
I have updated the Public Interface part accordingly.
Please take a look.
Best,
TengYao
On Sat, Sep 28, 2024 at 1:26 PM TengYao Chi <kiting...@gmail.com> wrote:
Hi Matthias,
Thanks for the explanation, particularly regarding the important
considerations for both the plain consumer and Kafka Streams use cases.
In this case, I think it would be better to stick with my initial
proposal. We should give plain consumers the ability to determine whether
to send a leave group request or not, with clear documentation highlighting
the potential downsides. This could also provide flexibility for future
features.
Best,
TengYao
On Sat, Sep 28, 2024 at 3:27 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi Matthias
100: Why do we want to distinguish between the classic and the new async
consumer? Should they not have the same (user facing) behavior? Or maybe
I misunderstand something. Can someone catch me up on what epoch "-1" vs
epoch "-2" means?
I apologize for any confusion in my earlier explanation. The way a consumer
leaves a group varies between the Classic Consumer and the Async Consumer:
- The *Classic Consumer* uses a LeaveGroupRequest but does *not* send this
  request for static members.
- In contrast, the *Async Consumer* sends a ConsumerGroupHeartbeatRequest.
  If the member is static, this request is sent with an epoch value of -2,
  indicating that the static member has temporarily left the group and is
  *not* removed. An epoch of -1 in the CONSUMER protocol signifies that the
  static member is treated as a dynamic member and will leave the group
  completely.
Hence, even if not useful for the plain consumer to disable sending a
leave-group-request, it might be worth adding a generic enable/disable API
for both dynamic and static groups, so KS can use this API (and we could
remove the internal consumer config, which is a workaround anyway).
I agree that having a generic enable/disable API would be beneficial,
especially if we can provide comprehensive documentation. This
documentation should clearly outline the potential downsides of not sending
a LEAVE_REQUEST for dynamic members, ensuring users are well-informed about
the implications of their choices.
Best,
Chia-Ping
On Sat, Sep 28, 2024 at 2:07 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for the KIP. Two questions/comments:
100: Why do we want to distinguish between the classic and the new async
consumer? Should they not have the same (user facing) behavior? Or maybe
I misunderstand something. Can someone catch me up on what epoch "-1" vs
epoch "-2" means?
101: I think we need to distinguish between the plain consumer and the
KS case.
Plain consumer: for this case, atm users don't have control at all, but
it's hard coded when a leave group request is sent. If we only consider
this case, the current KIP to allow sending a leave-group request for
static members is sufficient. I agree that disabling leave-group request
for dynamic members is not necessary for the plain consumer case.
However, for the KS case it's different. Because KS uses the internal
config to disable sending leave group request for dynamic members, we
lack a user-facing API to enable sending a leave group request for this
case, and if we only allow enabling leave group requests for static
members on the consumer, the KIP would fall short of closing this gap.
Hence, even if not useful for the plain consumer to disable sending a
leave-group-request, it might be worth adding a generic enable/disable API
for both dynamic and static groups, so KS can use this API (and we could
remove the internal consumer config, which is a workaround anyway).
On the other hand, in light of KIP-1088, maybe there are other ways to
fix it on the KS side? I think the goal should be to remove the internal
consumer config (as it's static, and we cannot overwrite it at runtime),
and to give KS a way to dynamically send a leave-group-request on
close() -- but maybe we could build this on an internal consumer API,
and not make it public? For this case, the current KIP would be
sufficient.
-Matthias
On 9/26/24 8:19 PM, Sophie Blee-Goldman wrote:
Thanks for the KIP! Quick request for readability, can you please include
the exact APIs that you're proposing to add or change under the "Public
Interfaces" section? The KIP should display the actual method signature and
any applicable javadocs for new public APIs.
You can look at other KIPs for a clear sense of what it should contain, but
here's one example you could work from:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
On Thu, Sep 26, 2024 at 6:22 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
I think I’m actually OK with leaving it as leaveGroup with a lot of
documentation that warns users away from changing it arbitrarily.
Pardon me, I just want to ensure we are all on the same page.
1. `leaveGroup=true`: `ClassicKafkaConsumer` sends a `LeaveGroupRequest`
   for either the dynamic or static member.
2. `leaveGroup=false`: `ClassicKafkaConsumer` does not send any
   `LeaveGroupRequest` for either the dynamic or static member.
3. `leaveGroup=default` (current behavior): `ClassicKafkaConsumer` sends a
   `LeaveGroupRequest` for dynamic member, and it does NOT send any
   `LeaveGroupRequest` for static member.
4. `leaveGroup=true`: `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with "-1" epoch for either the dynamic
   or static member.
5. `leaveGroup=false`: `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with "-2" epoch for the static member,
   and it does NOT send any `ConsumerGroupHeartbeatRequest` for dynamic
   member.
6. `leaveGroup=default` (current behavior): `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with "-1" epoch for dynamic member and
   "-2" epoch for static member.
Best,
Chia-Ping
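For reference, cases 4 to 6 in the list above can be written down as a small decision function. This is only a sketch of that table, not the consumer's actual code: `AsyncLeaveEpochs`, the two constant names, and `closeHeartbeatEpoch` are hypothetical, and an empty result stands for "no `ConsumerGroupHeartbeatRequest` is sent on close".

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical helper mirroring cases 4-6; names are for illustration only.
final class AsyncLeaveEpochs {
    static final int DYNAMIC_LEAVE_EPOCH = -1; // member leaves the group completely
    static final int STATIC_LEAVE_EPOCH = -2;  // static member leaves temporarily and is not removed

    private AsyncLeaveEpochs() { }

    // Returns the epoch to send on close, or empty if no heartbeat should be sent.
    static OptionalInt closeHeartbeatEpoch(Optional<Boolean> leaveGroup, boolean isStaticMember) {
        if (leaveGroup.isPresent()) {
            if (leaveGroup.get()) {
                // Case 4: leaveGroup=true, "-1" for both dynamic and static members.
                return OptionalInt.of(DYNAMIC_LEAVE_EPOCH);
            }
            // Case 5: leaveGroup=false, "-2" for static members, nothing for dynamic members.
            return isStaticMember ? OptionalInt.of(STATIC_LEAVE_EPOCH) : OptionalInt.empty();
        }
        // Case 6: default, "-1" for dynamic members and "-2" for static members.
        return OptionalInt.of(isStaticMember ? STATIC_LEAVE_EPOCH : DYNAMIC_LEAVE_EPOCH);
    }
}
```

Cases 1 to 3 for the classic consumer reduce to a yes/no decision on sending a `LeaveGroupRequest`, so they need no epoch at all.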