Yes, it requires an internal class. But I would rather optimize for a
clean user-facing API. Personally, I don't think it will make our code
base significantly more complex, so I think it's worth the tradeoff.
And yes, we would need to take the user-provided `CloseOptions` object and
create a `CloseOptionsInternal` object from it, but as this is not on the hot
code path, and both objects are short-lived anyway, it should not have any
actual perf impact.
In the end, I am just stating my POV. If people do not want to follow the
way KS does it, and would rather add the getters directly to the new
`CloseOptions` class, that is fine with me, too. I would personally prefer
the other way, but I won't block the KIP on this decision. Either way would
be fine with me.
-Matthias
On 10/1/24 5:48 AM, Chia-Ping Tsai wrote:
@Matthias
From an impl POV, the internal member can be either (1) package-private
to allow direct access within the same package, or (2) protected in
combination with an internal sub-class in an internal package to add the
necessary getters. As an example cf `Consumed` and `ConsumedInternal`
I like the idea of a minimal API, but not exposing the getter complicates
things. This approach would require Consumed to provide several protected
methods and introduce an additional "internal" class to expose them.
Moreover, it results in repeated object creation. If we all agree to use a
specific package name, like an "internal" package, to indicate which
classes/methods users should avoid, we could also consider adding an
"internal" annotation. This would make it clear that the getters are
intended for internal use only. For example:
class CloseOptions {
    // This means the method is not a public API, so users should avoid using it.
    @InterfaceStability.Internal
    public Optional<Duration> timeout() {
        return timeout;
    }
}
Best,
Chia-Ping
On Tue, Oct 1, 2024 at 4:54 PM TengYao Chi <kiting...@gmail.com> wrote:
Hi Kirk,
Thanks for reviewing the KIP and providing detailed feedback.
KT4: I would prefer to keep the name as `timeout` since its type is
`Duration`, and `Duration` provides various ways to create an instance, not
just using milliseconds.
KT5: Good point! I’ve added some additional description about the behavior
when `null` is passed as `CloseOptions`.
KT6: I would like to retain the use of `Optional` for this case, as it
helps differentiate between the default settings and user-defined ones. It
also provides better semantics and null-safety. Let me know if you have any
further thoughts on this.
KT7: I agree that enums offer better readability and semantics. I've
applied this change and slightly adjusted the name to
`GroupMembershipOperations` for precision.
KT8: Thanks for pointing this out. I've corrected the test plan to specify
that the default timeout is 30 seconds instead of `Long.MAX_VALUE`.
Lastly, I’ve also added content to the "Compatibility, Deprecation, and
Migration Plan" section, as we are planning to deprecate the
`Consumer#close(Duration)` method. Please take a look and let me know what
you think.
Best,
TengYao
On Tue, Oct 1, 2024 at 2:41 PM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
+1 from me on the fluent API using the `with` prefix too.
Thanks,
Andrew
________________________________________
From: TengYao Chi <kiting...@gmail.com>
Sent: 01 October 2024 05:23
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-1092: Extend Consumer#close with an option to
leave the group or not
Hello everyone,
I'm also +1 on using the fluent API and having the `with` prefix in
setter
method names.
Regarding Matthias' point, I agree with Sophie that we should keep the
`CloseOptions` classes separate.
These two `CloseOptions` serve different purposes, and while they may
occasionally share some similarities at the moment, keeping them separate
allows more flexibility for their own future changes. This way, each
class
can evolve independently based on the specific needs of the Consumer and
Kafka Streams without introducing unnecessary complexity.
Best,
TengYao
On Tue, Oct 1, 2024 at 7:38 AM Matthias J. Sax <mj...@apache.org> wrote:
Sophie, yes, that's a fair summary, and yes, it was only an alternative
idea in case people think that allowing the plain consumer to disable the
leave-group request is not desirable. It seems we are actually on the
same page.
(And yes, it was meant for this thread, not KIP-1094...)
On 9/30/24 4:32 PM, Matthias J. Sax wrote:
Kirk,
I think a good API design principle is to expose the minimum required API
to users. Users don't need getters, which is why we don't have any
getters in the KS config object classes. Getters are only needed
internally.
From an impl POV, the internal member can be either (1)
package-private
to allow direct access within the same package, or (2) protected in
combination with an internal sub-class in an internal package to add
the
necessary getters. As an example cf `Consumed` and `ConsumedInternal`
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
It provides a clean separation of user-facing public API, vs internal
APIs.
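For illustration, option (2) could look roughly like the following minimal sketch. It is modeled loosely on the `Consumed` / `ConsumedInternal` pair linked above, not copied from it. `CloseOptionsInternal` is a hypothetical name, and in a real code base the two classes would be public and live in different packages (the second one in an internal package); they are package-private here only so the sketch fits in one file.

```java
import java.time.Duration;
import java.util.Optional;

// User-facing config object: static factory, no getters.
class CloseOptions {
    // Protected, so only subclasses (such as the internal one below) are meant to read it.
    protected final Optional<Duration> timeout;

    private CloseOptions(Optional<Duration> timeout) {
        this.timeout = timeout;
    }

    // Copy constructor that the internal subclass uses to wrap a user-provided object.
    protected CloseOptions(CloseOptions other) {
        this(other.timeout);
    }

    // Static "entry" method; users never call a constructor directly.
    static CloseOptions timeout(Duration timeout) {
        return new CloseOptions(Optional.of(timeout));
    }
}

// Hypothetical internal subclass that adds the getter needed by the implementation.
class CloseOptionsInternal extends CloseOptions {
    CloseOptionsInternal(CloseOptions options) {
        super(options);
    }

    Optional<Duration> timeout() {
        return timeout;
    }
}
```

Internal code would wrap the user-provided object once, for example `new CloseOptionsInternal(options).timeout()`, which is the extra short-lived object creation discussed in this thread.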
For `timeout()` given that it takes a `Duration` argument (not a
`long`), I believe the current name is correct?
I like the idea of using an enum for the send-leave-group request flag.
For `Optional`, if we remove the public getters, the question goes
away.
About merging both `CloseOptions`: fine with me to keep them
separated.
Was just an idea :)
-Matthias
On 9/30/24 3:14 PM, Sophie Blee-Goldman wrote:
+1 to using the fluent API and including "with" in the setter names.
I also think Matthias raised a good point, that maybe it would be a
good
time to "fix" this issue in the CloseOptions for the
KafkaStreams#close
method, to conform to this API format. As for his other "random
idea"
about
combining the two: Personally I would prefer to keep a separate
CloseOptions class for the Consumer vs for Kafka Streams, since they
will
not necessarily always have the same exact semantics and parameters.
Even
if the API ends up looking more or less the same right now --
although
consider that the "default" behavior is different for the consumer
vs
Kafka
Streams, if leaveGroup is not specified in the CloseOptions then the
consumer will opt to leave the group in some cases whereas Streams
never
will. So already there is some difference between the Consumer and
Streams
CloseOptions, so I'd rather not combine them
On Mon, Sep 30, 2024 at 11:16 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
Overall, the "static method builder" pattern seems better to me,
and
thus I would prefer to make it the "gold standard" and we can see
what we
can do for `Admin API` mid/long term?
Since we want to avoid complicated compatibility issues, adding a
static
method builder to Admin options seems more acceptable.
However, the naming convention for the "setter" method might be a
concern.
Kafka Streams (KS) uses "withXXX," while Admin options do not
include
the
"with" prefix. Additionally, the RPC requests follow the setXXX
naming
convention.
I’m unsure if this is the right time to align the fluent pattern
naming
across the entire Kafka project, but it would be great if we could
reach a
consensus on "gold standard".
I'm +1 on using the static method builder and the `with` prefix because of
the following advantages:
1. A static method builder allows making the constructor private, which
   can prevent unintended inheritance.
2. The "with" prefix makes it easier to search for setters within the
   option class.
Any feedback?
Btw: I was also wondering, if we should re-use the new consumer
`CloseOption` class for `KafkaStreams#close()` and deprecate the KS
`CloseOption`
class? Not sure. Just another "random" idea.
I guess it needs another KIP after this KIP gets merged.
Best,
Chia-Ping
On Tue, Oct 1, 2024 at 1:16 AM Matthias J. Sax <mj...@apache.org> wrote:
I am also in favor of consistent APIs. That's a very good point. I did not
take the `Admin` API into account, and I am not aware that the consumer /
producer would have config object classes?
Seems we are in a tricky situation here, because "consistent API"
to
me
means producer, consumer, admin and KS.
The KS surface area might be the largest one (even if there is a
long
list of `XxxOption` classes for Admin API), and we do follow the
pattern
as described. It's of course not desirable to just change the
whole
Admin API (and neither the KS API), but it might be good to agree
on a
"gold standard" and do everything new accordingly?
Given that producer / consumer do not have any config object
classes
yet, it seems to be the question if they should follow the Admin
pattern
or the KS pattern. -- I tend to think, following the KS pattern
might be
better? But yes, I see your POV to say it should follow the Admin
API
pattern, however, it would imply that we "need" to change all the
KS
APIs.
Overall, the "static method builder" pattern seems better to me,
and
thus I would prefer to make it the "gold standard" and we can see
what
we can do for `Admin API` mid/long term?
Let's see what others think.
Btw: I was also wondering, if we should re-use the new consumer
`CloseOption` class for `KafkaStreams#close()` and deprecate the
KS
`CloseOption` class? Not sure. Just another "random" idea.
-Matthias
On 9/29/24 12:08 PM, Chia-Ping Tsai wrote:
From an API POV, I think the new `CloseOptions` class should
not
have
any "getters" and thus, it's irrelevant how we represent the
different
cases in code internally (even if I believe using `Optional`
might
be a
good way to handle it).
If we choose to avoid using getters, consumers would have to
access
the
internal variables directly, similar to how it's done in the
streams
CloseOptions. While this is a matter of preference, it's worth
noting
that
Admin options do provide both setters and getters. In my opinion,
since
all
of these options are part of the kafka-clients code, they should
adhere
to
a consistent coding style.
In KS, we use config objects like `CloseOption` all the time,
with
static
"factory" method (and private constructors), and an internal
sub-class
which would have the getters if needed. So I think we should have
I value the static factory methods for their convenience,
allowing
users
to
quickly create an object when they want to set only one option.
However,
while I don't want to sound repetitive, maintaining a consistent
pattern
across the API is essential.
Best,
Chia-Ping
On Mon, Sep 30, 2024 at 2:26 AM Matthias J. Sax <mj...@apache.org> wrote:
I am not sure, but I get the impression that we are starting to
talk
too
much about implementation details now?
From an API POV, I think the new `CloseOptions` class should
not
have
any "getters" and thus, it's irrelevant how we represent the
different
cases in code internally (even if I believe using `Optional`
might
be
a
good way to handle it).
In KS, we use config objects like `CloseOption` all the time,
with
static "factory" method (and private constructors), and an
internal
sub-class which would have the getters if needed. So I think we
should
have
public class CloseOptions {
    private CloseOptions(Optional<Duration>, Optional<Boolean>);
    public static CloseOptions timeout(Duration);
    public static CloseOptions leaveGroup(boolean);
    public CloseOptions withTimeout(Duration);
    public CloseOptions withLeaveGroup(boolean);
}
This allows calls such as:
    consumer.close(CloseOptions.leaveGroup(true));
or
    consumer.close(
        CloseOptions.timeout(Duration.ofMinutes(5))
                    .withLeaveGroup(false)
    );
We can still discuss naming and what overloads we want to add, but in
general, a well-established and proven pattern is to have a few static
"entry" methods, which return the object itself to allow chaining with
non-static methods.
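A minimal, compilable sketch of this pattern is shown below. The class is named `FluentCloseOptions` (a hypothetical name) to make clear it is only an illustration and not the class proposed in the KIP. The `with` methods here return new immutable copies, which is one possible design choice; mutating the object and returning `this` would give callers the same chaining syntax. The `toString()` method exists only so the result can be inspected without adding getters.

```java
import java.time.Duration;
import java.util.Optional;

// Illustrative only: static "entry" methods plus chainable "with" methods.
final class FluentCloseOptions {
    private final Optional<Duration> timeout;
    private final Optional<Boolean> leaveGroup;

    // Private constructor: instances can only be created via the static entry methods.
    private FluentCloseOptions(Optional<Duration> timeout, Optional<Boolean> leaveGroup) {
        this.timeout = timeout;
        this.leaveGroup = leaveGroup;
    }

    static FluentCloseOptions timeout(Duration timeout) {
        return new FluentCloseOptions(Optional.of(timeout), Optional.empty());
    }

    static FluentCloseOptions leaveGroup(boolean leaveGroup) {
        return new FluentCloseOptions(Optional.empty(), Optional.of(leaveGroup));
    }

    // Returns a copy with the timeout replaced and the other option preserved.
    FluentCloseOptions withTimeout(Duration timeout) {
        return new FluentCloseOptions(Optional.of(timeout), leaveGroup);
    }

    // Returns a copy with the leave-group flag replaced and the other option preserved.
    FluentCloseOptions withLeaveGroup(boolean leaveGroup) {
        return new FluentCloseOptions(timeout, Optional.of(leaveGroup));
    }

    @Override
    public String toString() {
        return "FluentCloseOptions{timeout=" + timeout + ", leaveGroup=" + leaveGroup + "}";
    }
}
```

With this shape, an option that was never set stays `Optional.empty()`, so the consumer could still tell "default" apart from "user-defined".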
I am not sure why KIP-812 did not follow this pattern... I think it got
it wrong and we should not repeat this "mistake". Maybe we could
actually piggy-back a cleanup for the existing KS CloseOptions object
into this KIP?
-Matthias
On 9/29/24 8:39 AM, TengYao Chi wrote:
Hi Chia-Ping,
Thanks for your feedback.
What I intended to express is that if `Optional.empty()` is
passed to
the
`timeout`, it will eventually be converted to
`DEFAULT_CLOSE_TIMEOUT_MS`,
just as you mentioned.
Apologies for not expressing that clearly and for any confusion
caused.
Best regards,
TengYao
On Sun, Sep 29, 2024 at 10:16 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi TengYao
I have reviewed the `close()` method implementation for both
the
Classic
and Async Consumers. I believe the `timeout` parameter could
have
a
default
value, and this default should align with the existing
`Consumer#close()`
method, which internally calls the overloaded
`Consumer#close(Duration)`
with a default of 30 seconds (`DEFAULT_CLOSE_TIMEOUT_MS`).
If you assign a default value (30s) to CloseOptions#timeout,
the
consumer
won't be able to differentiate between the "default" and
"user-defined"
behaviors.
Therefore, I prefer to leave the timeout empty as the default
value,
allowing the consumer to handle it in a way that reflects
default
behavior.
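As a rough illustration of leaving the timeout empty and resolving it late, the sketch below keeps the user's value as an `Optional` and only falls back to the 30-second default at the point of use. The class and method names (`CloseTimeoutResolver`, `effectiveTimeout`) are hypothetical, and the constant simply mirrors the `DEFAULT_CLOSE_TIMEOUT_MS` value mentioned in this thread.

```java
import java.time.Duration;
import java.util.Optional;

// Illustrative only: resolve an unset timeout to the default as late as possible.
final class CloseTimeoutResolver {
    // Assumed to match the 30-second default discussed in this thread.
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30_000L;

    private CloseTimeoutResolver() { }

    // An empty Optional means "the user did not set a timeout", so the default applies.
    static Duration effectiveTimeout(Optional<Duration> userTimeout) {
        return userTimeout.orElse(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
    }
}
```

Because the fallback happens inside the consumer, the options object itself never has to store the default, and the "default" versus "user-defined" distinction is preserved.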
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 2:23 PM TengYao Chi <kiting...@gmail.com> wrote:
Hi Sophie,
Thanks for the suggestions.
I have reviewed the `close()` method implementation for both
the
Classic
and Async Consumers. I believe the `timeout` parameter could
have a
default
value, and this default should align with the existing
`Consumer#close()`
method, which internally calls the overloaded
`Consumer#close(Duration)`
with a default of 30 seconds (`DEFAULT_CLOSE_TIMEOUT_MS`).
Regarding the `leaveGroup` parameter, since static and
dynamic
members
have
different default behaviors upon close, and the primitive
type
boolean
cannot capture all cases, I agree with Chia-Ping’s suggestion
of
using
Optional instead. This approach avoids potential NPE issues
that
could
arise from using the boxed type Boolean and allows us to
cover
all
use
cases more effectively.
Given the above, although there is no standard in the
Consumer
API
(as
far
as I know), I believe adopting a fluent API would be
beneficial, as
it
would be more user-friendly and concise for configuring the
necessary
options.
Best,
TengYao
On Sun, Sep 29, 2024 at 7:07 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi Sophie
Fewer overloads are preferable, so in my opinion, the
consumer
should
only
have close() and close(CloseOptions), with the other
overloads
deprecated.
That means all options in CloseOptions should be optional,
and
we
should
use a fluent-style API to add setters for them. This would
allow
users
to
configure only the necessary options while leaving the rest
at
their
default values. For example:
// case 0: set both timeout and leaveGroup
new CloseOptions()
    .timeout(100)
    .leaveGroup(false);
// case 1: set only timeout; leaveGroup keeps its default
new CloseOptions()
    .timeout(100);
// case 2: set only leaveGroup; timeout keeps its default
new CloseOptions()
    .leaveGroup(true);
Additionally, all getters of CloseOptions return Optional<>, which can
distinguish between a "default" value and a "user-defined" value.
Furthermore, `close()` can have a default implementation that delegates to
`close(new CloseOptions())`.
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 5:52 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
@Matthias, regarding your questions in 101, is it fair to
summarize
your
points here as (A) only Kafka Streams, but not plain
consumer,
would
need
to avoid leaving the group on close (for non-static
members),
and
(B)
with
KIP-1088 we will soon have a Streams-specific Consumer API
that
would
be
more suited to these kinds of Streams-specific APIs, and
therefore
-->
(C)
it doesn't make sense to do this KIP now and we should just
wait
for
1088
so we can apply the API on top of it?
If that's a fair summary, then I would have to disagree. Not sending a
LeaveGroup on close for dynamic members isn't some highly specific
optimization that could only ever make sense in the context of Kafka
Streams. It doesn't really have to do with Kafka Streams at all; it's
just a good thing to do for stateful apps where you don't want to shuffle
partitions around too much after a simple bounce. So imo this KIP makes
sense to do as-is, and people have been asking for it for quite some time,
so I wouldn't want to delay this any further if possible.
Also, this is technically a bit of an implementation detail so perhaps it
doesn't belong in the KIP, but I think we should remove the
"internal.leave.group.on.close" ConsumerConfig that was used exclusively
by Kafka Streams, since we can now use the new Consumer#close(CloseOptions)
API for everything. And we should mention doing this in the KIP, even
though it's an internal config, just in case someone out there is using it.
@Chia-Ping
Regarding the default behavior, I think it probably makes sense to
leave the default behavior of the CloseOptions identical to the existing
Consumer#close overloads to avoid making things too complicated for users
to understand. Especially if we're not going to remove the old #close
overloads.
@TengYao
Thanks for adding the specific APIs. We should probably
first
determine
things like which parameters of CloseOptions should be
required
and
which
can be left to the default (plus what that default is: see
above
conversation with Chia-Ping) and then we can design the API
around
that.
I
noticed for example that with the current proposal, it
would
be
impossible
to set both the leaveGroup and timeOut parameters of the
CloseOptions.
We
need to make sure the available public constructors allow
users
to
set
the
full range of configs. We could also use a fluent-style API
like
we
do
in
Kafka Streams for config objects. Not sure what (if
anything)
is
the
standard for Consumer APIs?
Finally, one open-ended question I have for everyone here:
should
the
leaveGroup config be a required parameter of CloseOptions?
Or
should
we
allow users to pass in an "empty" CloseOptions to leave
both
the
timeout
and leaveGroup behavior to the default?
On Sat, Sep 28, 2024 at 9:58 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi TengYao
Could you please consider adding a "default" behavior for
leaveGroup?
As
I
previously mentioned, leaveGroup=true is not ideal as the
default
for
static members, and similarly, leaveGroup=false is not
suitable
as
the
default for dynamic members.
Maybe we could change the type of leaveGroup to
Optional<Boolean>,
allowing
it to represent three distinct behaviors.
Best,
Chia-Ping
On Sun, Sep 29, 2024 at 12:51 AM TengYao Chi <kiting...@gmail.com> wrote:
Hi Sophie
Thanks for feedback.
I have updated the Public Interface part accordingly.
Please take a look.
Best,
TengYao
On Sat, Sep 28, 2024 at 1:26 PM TengYao Chi <kiting...@gmail.com> wrote:
Hi Matthias,
Thanks for the explanation, particularly regarding the
important
considerations for both the plain consumer and Kafka
Streams
use
cases.
In this case, I think it would be better to stick with
my
initial
proposal. We should give plain consumers the ability to
determine
whether
to send a leave group request or not, with clear
documentation
highlighting
the potential downsides. This could also provide
flexibility
for
future
features.
Best,
TengYao
On Sat, Sep 28, 2024 at 3:27 AM Chia-Ping Tsai <chia7...@gmail.com> wrote:
hi Matthias
100: Why do we want to distinguish between the classic and the new async
consumer? Should they not have the same (user facing) behavior? Or maybe
I misunderstand something. Can someone catch me up on what epoch "-1" vs
epoch "-2" means?
I apologize for any confusion in my earlier
explanation.
The
way a
consumer
leaves a group varies between the Classic Consumer and
the
Async
Consumer:
- The *Classic Consumer* uses a LeaveGroupRequest but
does
*not*
send
this
request for static members.
- In contrast, the *Async Consumer* sends a
ConsumerGroupHeartbeatRequest.
If the member is static, this request is sent with an
epoch
value
of
-2,
indicating that the static member has temporarily left
the
group
and
is
*not* removed. An epoch of -1 in the CONSUMER protocol
signifies
that
the
static member is treated as a dynamic member and will
leave
the
group
completely.
Hence, even if it is not useful for the plain consumer to disable sending a
leave-group-request, it might be worth adding a generic enable/disable API
for both dynamic and static members, so KS can use this API (and we could
remove the internal consumer config, which is a workaround anyway).
I agree that having a generic enable/disable API would
be
beneficial,
especially if we can provide comprehensive
documentation.
This
documentation should clearly outline the potential
downsides
of
not
sending
a LEAVE_REQUEST for dynamic members, ensuring users are
well-informed
about
the implications of their choices.
Best,
Chia-Ping
On Sat, Sep 28, 2024 at 2:07 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for the KIP. Two questions/comments:
100: Why do we want to distinguish between the classic and the new async
consumer? Should they not have the same (user facing) behavior? Or maybe
I misunderstand something. Can someone catch me up on what epoch "-1" vs
epoch "-2" means?
101: I think we need to distinguish between the plain
consumer
and
the
KS case.
Plain consumer: for this case, at the moment users have no control at all;
it is hard-coded when a leave-group request is sent. If we only consider
this case, the current KIP, which allows sending a leave-group request for
static members, is sufficient. I agree that disabling the leave-group
request for dynamic members is not necessary for the plain consumer case.
However, for the KS case it's different. Because KS uses the internal
config to disable sending a leave-group request for dynamic members, we
lack a user-facing API to enable sending a leave-group request for this
case, and if we only allow enabling the leave-group request for static
members on the consumer, the KIP would fall short of closing this gap.
Hence, even if it is not useful for the plain consumer to disable sending a
leave-group-request, it might be worth adding a generic enable/disable API
for both dynamic and static members, so KS can use this API (and we could
remove the internal consumer config, which is a workaround anyway).
On the other hand, in light of KIP-1088, maybe there are other ways to
fix it on the KS side? I think the goal should be to remove the internal
consumer config (as it's static, and we cannot overwrite it at runtime),
and to give KS a way to dynamically send a leave-group-request on close()
-- but maybe we could build this on an internal consumer API, and not make
it public? For this case, the current KIP would be sufficient.
-Matthias
On 9/26/24 8:19 PM, Sophie Blee-Goldman wrote:
Thanks for the KIP! Quick request for readability,
can
you
please
include
the exact APIs that you're proposing to add or change
under
the
"Public
Interfaces" section? The KIP should display the
actual
method
signature
and
any applicable javadocs for new public APIs.
You can look at other KIPs for a clear sense of what
it
should
contain,
but
here's one example you could work from:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
On Thu, Sep 26, 2024 at 6:22 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
I think I’m actually OK with leaving it as
leaveGroup
with a
lot
of
documentation that warns users away from changing it
arbitrarily.
Pardon me, I just want to ensure we are all on the
same
page.
1. `leaveGroup=true`: `ClassicKafkaConsumer` sends a `LeaveGroupRequest`
   for both dynamic and static members.
2. `leaveGroup=false`: `ClassicKafkaConsumer` does not send any
   `LeaveGroupRequest` for either dynamic or static members.
3. `leaveGroup=default` (current behavior): `ClassicKafkaConsumer` sends
   a `LeaveGroupRequest` for dynamic members, and it does NOT send any
   `LeaveGroupRequest` for static members.
4. `leaveGroup=true`: `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with epoch "-1" for both dynamic and
   static members.
5. `leaveGroup=false`: `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with epoch "-2" for static members,
   and it does NOT send any `ConsumerGroupHeartbeatRequest` for dynamic
   members.
6. `leaveGroup=default` (current behavior): `AsyncKafkaConsumer` sends a
   `ConsumerGroupHeartbeatRequest` with epoch "-1" for dynamic members
   and epoch "-2" for static members.
Best,
Chia-Ping
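The six cases in the message above can be condensed into a small decision function. This is only a sketch for checking the table; none of these types exist in Kafka, and all names (`ConsumerImpl`, `LeaveGroup`, `CloseAction`, `CloseBehavior`) are hypothetical. It also assumes that in case 3 the classic consumer sends nothing at all for a static member, which matches the description of the Classic Consumer elsewhere in this thread.

```java
// Which consumer implementation is closing.
enum ConsumerImpl { CLASSIC, ASYNC }

// The three-state option: explicitly true, explicitly false, or not set by the user.
enum LeaveGroup { TRUE, FALSE, DEFAULT }

// What the consumer sends to the group coordinator on close.
enum CloseAction {
    LEAVE_GROUP_REQUEST,      // classic protocol: leave the group
    HEARTBEAT_EPOCH_MINUS_1,  // ConsumerGroupHeartbeatRequest: leave the group completely
    HEARTBEAT_EPOCH_MINUS_2,  // ConsumerGroupHeartbeatRequest: static member leaves temporarily
    NOTHING                   // no request is sent
}

final class CloseBehavior {
    private CloseBehavior() { }

    static CloseAction onClose(ConsumerImpl impl, LeaveGroup leaveGroup, boolean staticMember) {
        if (impl == ConsumerImpl.CLASSIC) {
            switch (leaveGroup) {
                case TRUE:  return CloseAction.LEAVE_GROUP_REQUEST;                 // case 1
                case FALSE: return CloseAction.NOTHING;                             // case 2
                default:    return staticMember ? CloseAction.NOTHING               // case 3
                                                : CloseAction.LEAVE_GROUP_REQUEST;
            }
        }
        switch (leaveGroup) {
            case TRUE:  return CloseAction.HEARTBEAT_EPOCH_MINUS_1;                 // case 4
            case FALSE: return staticMember ? CloseAction.HEARTBEAT_EPOCH_MINUS_2   // case 5
                                            : CloseAction.NOTHING;
            default:    return staticMember ? CloseAction.HEARTBEAT_EPOCH_MINUS_2   // case 6
                                            : CloseAction.HEARTBEAT_EPOCH_MINUS_1;
        }
    }
}
```

Written this way, it is easy to see that `leaveGroup=false` and the default only differ for dynamic members, while `leaveGroup=true` and the default only differ for static members.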