Hi, Andrew,
Thanks for addressing all the comments. The KIP looks good to me now.
Jun
On Mon, May 6, 2024 at 2:15 PM Andrew Schofield
wrote:
> Hi Jun,
> I have removed AdminClient.listGroups and the associated classes and
> interfaces.
>
> Version 6 of the ListGroups RPC remains because it adds
Hi, Andrew,
Removing AdminClient.listGroups() and the ListGroups RPC for now sounds good
to me.
Thanks,
Jun
On Mon, May 6, 2024 at 11:10 AM Andrew Schofield
wrote:
> Hi Jun,
> Thanks for the reply.
>
> 162. I’ve mentioned before that I plan another KIP for administration
> of groups. I think t
Hi, Andrew,
Thanks for the reply.
162. It's fine to start with just the group type. Since ListGroups() is a
generic API, I want to make sure that it covers all existing groups.
Currently, GroupType only has "classic" and "consumer", both of which seem
to be related to groups formed by consumers s
Hi Jun,
Thanks for your reply.
161. ShareGroupListing and ShareGroupDescription are using
the same pattern as ConsumerGroupListing and
ConsumerGroupDescription. I have gone for consistency which
I think is probably best here. It’s what I would expect if I had previously
used the admin API for cons
Hi, Andrew,
Thanks for the reply. A few more comments.
161. ShareGroupListing.state() returns an optional, but
ShareGroupDescription.state() does not. Should we make them consistent?
Also, it seems a bit weird to return optional with an UNKNOWN state.
162. Should GroupListing include ProtocolTyp
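For illustration of 161, the asymmetry looks roughly like this (signatures
paraphrased from the KIP, not verbatim; state names approximate):

    import java.util.Optional;

    // Paraphrased admin API shapes; ShareGroupState is the KIP's state enum.
    enum ShareGroupState { EMPTY, STABLE, DEAD, UNKNOWN }

    interface ShareGroupListing {
        Optional<ShareGroupState> state();  // optional, yet UNKNOWN also exists
    }

    interface ShareGroupDescription {
        ShareGroupState state();            // plain value, never empty
    }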
Hi Jun,
Thanks for your reply.
147. Yes, I see what you mean. The rebalance latency will indeed
be very short by comparison. I have removed the rebalance latency
metrics from the client and retained the rebalance count and rate.
150. Yes, I think so. I have tweaked the text so that the simple
ass
Hi, Andrew,
Thanks for the reply.
147. There seems to be some difference between consumer groups and share
groups. In the consumer groups, if a client receives a heartbeat response
to revoke some partitions, it may have to commit offsets before revoking
partitions or it may have to call the rebal
Hi Jun,
Thanks for the response.
147. I am trying to get a correspondence between the concepts and
metrics of consumer groups and share groups. In both cases,
the client doesn’t strictly know when the rebalance starts. All it knows
is when it has work to do in order to perform its part of a rebala
Hi, Andrew,
Thanks for the reply.
147. " it makes a judgement about whether an assignment received is equal
to what it already is using."
If a client receives an assignment different from what it has, it indicates
the end of the rebalance. But how does the client know when the rebalance
starts? I
Hi Jun,
Thanks for your reply.
147. Perhaps the easiest is to take a look at the code in
o.a.k.clients.consumer.internals.MembershipManagerImpl.
This class is part of the new consumer group protocol
code in the client. It makes state transitions based on
the heartbeat requests and responses, and it
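To make the rebalance measurement discussion concrete, here is a simplified
picture of those transitions (state names approximate MembershipManagerImpl;
the real enum has more states):

    // Simplified client-side membership states; illustration only.
    enum MemberState {
        JOINING,        // first heartbeat sent, waiting for an assignment
        RECONCILING,    // new assignment received, revoking/assigning partitions
        ACKNOWLEDGING,  // reconciliation done, next heartbeat confirms it
        STABLE          // assignment confirmed, steady-state heartbeating
    }
    // From the client's point of view, a rebalance is the span from leaving
    // STABLE (or JOINING) until it returns to STABLE.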
Hi, Andrew,
Thanks for the reply.
147. "The measurement is certainly from the point of view of the client,
but it’s driven by sending and receiving heartbeats rather than whether the
client triggered the rebalance itself."
Hmm, how does a client know which heartbeat response starts a rebalance?
Hi Jun,
Thanks for the reply and sorry for the delay in responding.
123. Yes, I didn’t quite get your point earlier. The member
epoch is bumped by the GC when it sends a new assignment.
When the member sends its next heartbeat, it echoes back
the member epoch, which will confirm the receipt of the
Hi David,
Thanks for your response.
001: OK, I'll include converting an empty classic or consumer group to
a share group in the thinking for the future KIP on group management that
I have in mind for later in 2024.
004: Added to the KIP explicitly.
006: That description helps.
I have changed th
Hi, Andrew,
Thanks for the reply.
123. "Rather than add the group epoch to the ShareGroupHeartbeat, I have
decided to go for TopicPartitions in ShareGroupHeartbeatRequest which
mirrors ConsumerGroupHeartbeatRequest."
ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is that enough
for co
Hi Andrew,
Thanks for your responses and sorry for my late reply.
001: Makes sense. One thing that we could consider here is to allow
converting an empty classic or consumer group to a share group. We already
do this between empty classic and consumer groups.
004: I see. It would be great to cal
Hi Jun,
Thanks for the response.
123. Of course, ShareGroupHeartbeat started off as ConsumerGroupHeartbeat
and then unnecessary fields were removed. In the network issue case,
there is not currently enough state being exchanged to be sure an assignment
was received.
Rather than add the group epoc
Hi, Andrew,
Thanks for the response.
123. I thought the network issue can be covered with the group epoch.
Basically, if the assignment is to be changed, GC bumps up the epoch first,
but doesn't expose the new epoch to members until the assignment is
complete (including initializing the sharePart
Hi Jun,
Thanks for your reply.
123. The GC only sends the assignment in ShareGroupHeartbeat
response if the member has just joined, or the assignment has been
recalculated. This latter condition is met when the GC fails over.
With a consumer group, it works a little differently. The heartbeating
Hi, Andrew,
Thanks for the reply.
123. "it doesn’t need to confirm the assignment back to the GC."
Hmm, I thought the member needs to confirm the assignment to GC to
avoid GC including the assignment in the heartbeat response continuously. I
assume this is done by including the new group epoch
Hi Jun,
Thanks for the reply.
123. Every time the GC fails over, it needs to recompute the assignment
for every member. However, the impact of re-assignment is not that onerous.
If the recomputed assignments are the same, which they may well be, there
is no impact on the members at all.
On receiv
Hi, Andrew,
Thanks for the reply.
123. "The share group does not persist the target assignment."
What's the impact of this? Every time the GC fails over, it needs to
recompute the assignment for every member. Do we expect the member
assignment to change on every GC failover?
125. Should the GC
Hi Jun,
Thanks for your comments.
120. Thanks. Fixed.
121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
the update applies to. It should of course be the snapshot that precedes
it in the log. It’s just there to provide a consistency check.
I also noticed that ShareSnapshotValue was mi
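So, on replay, the SnapshotEpoch lets the share coordinator verify that each
update belongs to the snapshot it follows. A rough sketch (record shapes
simplified; the real records carry the share-group state itself):

    import java.util.List;

    sealed interface StateRecord permits ShareSnapshot, ShareUpdate {}
    record ShareSnapshot(int snapshotEpoch) implements StateRecord {}
    record ShareUpdate(int snapshotEpoch) implements StateRecord {}

    void replay(List<StateRecord> log) {
        int current = -1;
        for (StateRecord r : log) {
            if (r instanceof ShareSnapshot s) {
                current = s.snapshotEpoch();   // a snapshot resets the state
            } else if (r instanceof ShareUpdate u && u.snapshotEpoch() != current) {
                throw new IllegalStateException("update does not match its snapshot");
            }
            // otherwise apply the update as a delta to the current snapshot
        }
    }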
Hi, Andrew,
Thanks for the reply. A few more comments.
120. There is still a reference to ConsumerGroupMetadataKey.
121. ShareUpdateValue.SnapshotEpoch: Should we change it since it's not a
snapshot?
122. ConsumerGroupMemberMetadataValue includes epoch, but
ShareGroupMemberMetadataValue does not.
Hi Jun,
Thanks for your reply.
42.1. That’s a sensible improvement. Done.
47,56. Done. All instances of BaseOffset changed to FirstOffset.
105. I think that would be in a future KIP. Personally, I don’t mind having
a non-contiguous set of values in this KIP.
114. Done.
115. If the poll is just
Hi, Andrew,
Thanks for the updated KIP.
42.1 "If the share group offset is altered multiple times when the group
remains empty, it would be harmless if the same state epoch was reused to
initialize the state."
Hmm, how does the partition leader know for sure that it has received the
latest share
Hi David,
Thanks for reviewing the KIP and your comments.
001: I think the `group.type` is the area of the KIP that has had the most
comments.
I think you’re right that a better approach would be to make the creation of
the group
explicit, for users who want that. I have removed `group.type` fro
Hi Andrew,
Thanks for the KIP. This work is really exciting.
I finally had a bit of time to go through the KIP. I need to read it a
second time in order to get into the details. I have noted a few
points/questions:
001: The dynamic config to force the group type is really weird. As you
said, gro
Hi,
77. I’ve updated the KIP to use log retention rather than log compaction.
The basic ideas of what to persist are unchanged. It makes a few changes:
* It changes the record names: ShareCheckpoint -> ShareSnapshot and
ShareDelta -> ShareUpdate. They’re equivalent, but renaming makes it
simpl
Hi Jun,
Thanks for your questions.
41.
41.1. The partition leader obtains the state epoch in the response from
ReadShareGroupState. When it becomes a share-partition leader,
it reads the share-group state and one of the things it learns is the
current state epoch. Then it uses the state epoch in a
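As a sketch of that flow (the RPC names are from the KIP; the client wrapper
and method names here are invented for illustration):

    // Invented wrapper around the ReadShareGroupState/WriteShareGroupState RPCs.
    interface ShareCoordinatorClient {
        int readStateEpoch(String groupId, String topic, int partition);
        void writeState(int stateEpoch, Object inFlightState);
    }

    class SharePartitionLeader {
        private int stateEpoch;

        void onElection(ShareCoordinatorClient sc, String groupId, String topic, int partition) {
            // The ReadShareGroupState response carries the current state epoch.
            stateEpoch = sc.readStateEpoch(groupId, topic, partition);
        }

        void persist(ShareCoordinatorClient sc, Object inFlightState) {
            // Echoing the epoch on every write lets the share coordinator
            // fence a leader that is working with stale state.
            sc.writeState(stateEpoch, inFlightState);
        }
    }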
Hi Andrew (and Tom)
> Imagine that we do have `group.type` as a group configuration. How would
> we end up with groups with
> the same ID but different types on the two ends of MM2? Assuming that both
> ends have KIP-932 enabled,
> either the configuration was not set, and a consumer group was made
Hi, Andrew,
Thanks for the reply. A few more comments.
41.
41.1 How does the partition leader obtain the group epoch to set
WriteShareGroupStateRequest.StateEpoch?
41.2 What's the benefit of having the group coordinator initialize the
state and the partition leader set the SPSO? It seems simpler
Hi David,
Thanks for your questions.
70. The Group Coordinator communicates with the Share Coordinator over RPCs.
In the general case, it’s an inter-broker call. It is probably possible to
optimise
for the situation in which the appropriate GC and SC shards are co-located, but
the
KIP does not d
Hi Tom,
Thanks for your question. I have not spent any time learning about MM2
previously, so please forgive my ignorance.
The question of an ordering requirement on upgrading clusters seems real to me.
As I understand it, MM2 can mirror topic configurations between clusters. I
think
you’re corre
Andrew, thanks for the KIP! This is a pretty exciting effort.
I've finally made it through the KIP, still trying to grok the whole thing.
Sorry if some of my questions are basic :)
Concepts:
70. Does the Group Coordinator communicate with the Share Coordinator over
RPC or directly in-process?
Hi Jun,
Thanks for your comments and thorough review.
I’ve added a new ShareGroupMemberMetadata record which I refer to below.
Each member in a share group has a ShareGroupMemberMetadata record.
Previously, members needed to rejoin the share group if the coordinator changed,
but now the member ID
Hi, Andrew,
Thanks for the updated KIP. A few more comments.
40. It would be useful to document the flow for serving the initial
ShareGroupHeartbeatRequest. The coordinator needs to (1) write the
ConsumerGroupMetadata record, the ConsumerGroupMemberMetadata record, the
initial ShareGroupPartition
Hi Andrew (and Omnia),
Thanks for the KIP. I hope to provide some feedback on this KIP soon, but I
had a thought on the specific subject of group configs and MM2. If brokers
validate for known group configs then doesn't this induce an ordering
requirement on upgrading clusters: Wouldn't you have
Hi Omnia,
Thanks for your questions.
The DR angle on `group.type` is interesting and I had not considered it. The
namespace of groups contains
both consumer groups and share groups, so I was trying to ensure that the
group type used was deterministic rather than a race to create the first
Hi Andrew,
Thanks for the KIP, it is definitely an interesting read. I have a few questions.
As the KIP proposes extending `AdminClient.incrementalAlterConfigs` to add an
explicit `group.type`, what would this mean for the DR feature in MM2?
Right now MM2 syncs consumer group offsets from sour
Hi Justine,
Thanks for your questions.
There are several limits in this KIP. With consumer groups, we see problems
where there are huge numbers of consumer groups, and we also see problems
when there is a huge number of members in a consumer group.
There’s a limit on the number of members in share
Thanks Andrew,
That answers some of the questions I have.
With respect to the limits -- how will this be implemented? One issue we
saw with producers is "short-lived" producers that send one message and
disconnect.
Due to how expiration works for producer state, if we have a simple limit
for prod
Hi Justine,
Thanks for your comment. Sorry for the delay responding.
It was not my intent to leave a query unanswered. I have modified the KIP as a
result
of the discussion and I think perhaps I didn’t neatly close off the email
thread.
In summary:
* The share-partition leader does not maintai
Hey Andrew,
I noticed you started the voting thread, but there seem to be a few
questions that were not answered. One was Jun's about memory usage
> How much additional heap memory will the server use? Do we need to cache
records in heap? If so, is the cache bounded?
Your response was
> This are
Hi Manikumar,
Thanks for your queries.
1) Delivery count is added to the ConsumerRecord class so that a consumer can
tell how often a record has been processed. I imagine that some applications
might
want to take different actions based on whether a record has previously failed.
This
enables ric
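For example, an application might branch on the count like this (a sketch;
deliveryCount() returning an Optional, and the dead-letter topic, are
assumptions for illustration):

    // Route repeatedly-failed records to a dead-letter topic instead of
    // retrying them forever. process() stands for application code.
    void handle(ConsumerRecord<String, String> record, Producer<String, String> producer) {
        short attempts = record.deliveryCount().orElse((short) 1);
        if (attempts > 3) {
            producer.send(new ProducerRecord<>("my-dlq-topic", record.key(), record.value()));
        } else {
            process(record);
        }
    }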
Hi Andrew,
Thanks for the updated KIP. Few queries below:
1. What is the use-case of deliveryCount in ShareFetchResponse?
2. When deleting share groups, do we need to clean up any in-memory state from
share-partition leaders?
3. Any metrics for the share-coordinator?
Thanks
Manikumar
On Wed, Feb 2
Hi Manikumar,
Thanks for your comments.
1. I believe that in general, there are no situations in which a dynamic config
change is prevented because of the existence of a resource. So, if we prevented
setting config `group.type=consumer` on resource G of GROUP type
if there was a share group G in
Hi Jun,
Thanks for your comments. A couple of your questions I will leave unanswered for
now because there is some research needed.
10. This area needs more work. Using a share group surely gets the broker to do
more manipulation of the data that it fetches than a regular consumer. I want
to mini
Hi Andrew,
Thanks for the KIP. A few comments below.
1. kafka-configs.sh (incrementalAlterConfigs) allows you to dynamically
change the configs. Maybe in this case, we should not allow the user to
change `group.type` if it's already set.
2. What's the behaviour after a group transitions into DEAD
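For reference, the call being questioned in 1 would look something like this
(a sketch; a GROUP config resource type is assumed here, and `group.type` was
later removed from the KIP entirely):

    // Dynamically setting the group type via incrementalAlterConfigs.
    ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-group");
    AlterConfigOp setType = new AlterConfigOp(
        new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(group, List.of(setType))).all().get();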
Hi, Andrew,
Thanks for the reply.
10. The impact from doing server side filtering is that we lose zero-copy
transfer, which provides two potential benefits: (1) more direct/efficient
transfer from disk to network; (2) less memory usage in heap since we don't
need to copy the records to heap to se
Hi Chirag,
Thanks for your question.
28. Good catch. Those options were omitted in error. I will update the KIP.
Thanks,
Andrew
> On 12 Feb 2024, at 13:06, Chirag Wadhwa wrote:
>
> Hi Andrew,
>
> Thank you for the KIP, it is a great read ! I just have a small question.
>
> 28. I noticed that th
Hi Andrew,
Thank you for the KIP, it is a great read! I just have a small question.
28. I noticed that the "--state" and "--timeout" options are not
mentioned for the kafka-share-groups.sh tool. Was this omission
intentional, or is it possibly an oversight in the KIP?
Thanks,
Chirag
On Mon,
Hi Jun
Thanks for your comments.
10. For read-uncommitted isolation level, the consumer just reads all records.
For read-committed isolation level, the share-partition leader does the
filtering to
enable correct skipping of aborted records. The consumers in a share group are
not
aware of the fil
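To illustrate the division of labour (types invented; the point is that
aborted data is dropped broker-side, so share consumers never see it):

    import java.util.*;

    record AbortedTxn(long producerId, long firstOffset) {}
    record Batch(long producerId, long baseOffset, boolean isControlBatch) {}

    // Simplified: drop batches belonging to aborted transactions, and drop
    // control batches (the commit/abort markers themselves).
    List<Batch> filterForReadCommitted(List<Batch> fetched, List<AbortedTxn> aborted) {
        List<Batch> result = new ArrayList<>();
        for (Batch b : fetched) {
            boolean isAborted = aborted.stream().anyMatch(a ->
                a.producerId() == b.producerId() && b.baseOffset() >= a.firstOffset());
            if (!isAborted && !b.isControlBatch()) {
                result.add(b);
            }
        }
        return result;
    }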
Hi, Andrew,
Thanks for the KIP. A few comments below.
10. ShareFetchResponse: To consume transactional data, currently
FetchResponse includes the AbortedTransactions fields for the client to
properly skip aborted records. ShareFetchResponse doesn't include that. How
do we prevent the consumer fro
Hi Jack,
Thanks for your comments.
I have added a new section on Log Retention which describes the behaviour of
the SPSO as the LSO advances. That makes total sense
and was an omission from the KIP.
I have added the other ideas as potential future work. I do like the idea of
having the SPSO inf
I would like to see more explicit discussion of topic retention and share
groups. There are a few options here from simple to more sophisticated. There
are also topic-level and share-group level options.
The simple thing would be to ensure that the SPSO of each share group is
bounded by the Log
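The simple option amounts to a one-line invariant (sketch):

    // If retention advances the log start offset past a group's SPSO, the
    // SPSO snaps forward and the group simply misses the expired records.
    long effectiveSpso(long spso, long logStartOffset) {
        return Math.max(spso, logStartOffset);
    }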
Hi Matthias,
Thanks for your comments. This KIP is clearly quite a big piece of work and
it’s not complete at this stage. I think it’s a good
principle to develop it in the community. I’m prototyping the code and will
soon start filling in some of the missing details
with concrete proposals for c
Hi Kamal,
Thanks for your comments. Sorry for the delay. Just back from vacation.
101) When I was drafting the KIP, I did try an approach which allowed arbitrary
order
of acknowledgement in the consumer but I wasn’t happy with the usability.
The records delivered to a specific consumer in a batc
There is another detail about EOS that is important I guess.
Messages written into topic-partitions are only marked as
"transactional", but when we commit (or abort), we only write an
additional "tx marker" into the partition (the original message is not
touched). If we deliver "pending" mess
Yes, I think it's clear now, thank you.
I agree that allowing reading behind the LSO would require more work on the
broker side (we would need 1 more state for the messages, and transition
when the LSO moves forward), but I don't see the extra complexity on the
consumer side. Based on the KIP so fa
Daniel, sure.
To allow the client to filter aborted messages, the broker currently
attaches metadata that tells the client which records were aborted. But
the first message after the LSO is a message in pending state, i.e., it
was neither committed nor aborted yet, so it's not possible to filter
Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed" we
can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted
Thanks for the KIP.
It seems we are at a very early stage, and some very important sections in
the KIP are still marked as TODO. In particular, I am curious about the
protocol changes, how the "queuing state" will be represented and made
durable, and all the error edge case / fail-over / fencing
Hi Andrew,
Thank you for the KIP -- interesting read. I have some questions:
101. "The calls to KafkaConsumer.acknowledge(ConsumerRecord,
AcknowledgeType) must be
issued in the order in which the records appear in the ConsumerRecords
object, which will
be in order of increasing offset for each sh
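A sketch of the loop that rule implies (AcknowledgeType values are from the
KIP draft; treating commitSync() as the point where acknowledgements are
flushed is my reading of it):

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record);  // application code
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            // RELEASE makes the record available for another delivery attempt.
            consumer.acknowledge(record, AcknowledgeType.RELEASE);
        }
    }
    consumer.commitSync();  // acknowledgements are sent to the broker here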
Hi Daniel,
True, I see your point. It’s analogous to a KafkaConsumer fetching
uncommitted records but not delivering them to the application.
Thanks,
Andrew
> On 7 Jun 2023, at 16:38, Dániel Urbán wrote:
>
> Hi Andrew,
>
> I think the "pending" state could be the solution for reading beyond the
Hi Andrew,
I think the "pending" state could be the solution for reading beyond the
LSO. Pending could indicate that a message is not yet available for
consumption (so they won't be offered for consumers), but with transactions
ending, they can become "available". With a pending state, records wou
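Put as a state set, the idea adds one state to the KIP's in-flight states
(sketch; PENDING is the proposed addition, the other names are from the KIP):

    // Records beyond the LSO sit in PENDING until their transaction commits
    // (they become AVAILABLE) or aborts (they are ARCHIVED).
    enum DeliveryState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED, PENDING }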
Hi Daniel,
Kind of. I don’t want a transaction abort to cause disappearance of
records which are already in-flight. A “pending” state doesn’t seem
helpful for read_committed. There’s no such disappearance problem
for read_uncommitted.
Thanks,
Andrew
> On 7 Jun 2023, at 16:19, Dániel Urbán wrote:
Hi Andrew,
I agree with having a single isolation.level for the whole group, it makes
sense.
As for:
"b) The default isolation level for a share group is read_committed, in
which case
the SPSO and SPEO cannot move past the LSO."
With this limitation (SPEO not moving beyond LSO), are you trying to
Hi Daniel,
I’ve been thinking about this question and I think this area is a bit tricky.
If there are some consumers in a share group with isolation level
read_uncommitted
and other consumers with read_committed, they have different expectations with
regards to which messages are visible when EOS
Hi Andrew,
Thank you for the clarification. One follow-up to read_committed mode:
Taking the change in message ordering guarantees into account, does this
mean that in queues, share-group consumers will be able to consume
committed records AFTER the LSO?
Thanks,
Daniel
Andrew Schofield wrote (
Hi Daniel,
Thanks for your questions.
1) Yes, read_committed fetch will still be possible.
2) You weren’t wrong that this is a broad question :)
Broadly speaking, I can see two ways of managing the in-flight records:
the share-partition leader does it, or the share-group coordinator does it.
I w
Hi Andrew,
Thank you for the KIP, exciting work you are doing :)
I have 2 questions:
1. I understand that EOS won't be supported for share-groups (yet), but
read_committed fetch will still be possible, correct?
2. I have a very broad question about the proposed solution: why not let
the share-gro
Yes, that’s it. I imagine something similar to KIP-848 for managing the share
group
membership, and consumers that fetch records from their assigned partitions and
acknowledge when delivery completes.
Thanks,
Andrew
> On 30 May 2023, at 16:52, Adam Warski wrote:
>
> Thanks for the explanation!
Thanks for the explanation!
So effectively, a share group is subscribed to each partition - but the data is
not pushed to the consumer, but only sent on demand. And when demand is
signalled, a batch of messages is sent?
Hence it would be up to the consumer to prefetch a sufficient number of batc
Hi Adam,
Thanks for your question.
With a share group, each fetch is able to grab available records from any
partition. So, it alleviates
the “head-of-line” blocking problem where a slow consumer gets in the way.
There’s no actual
stealing from a slow consumer, but it can be overtaken and must c
en this KIP is targeting versions of Kafka
>>>> which will all have KIP-848.
>>>>
>>>> The two cases you mention:
>>>> i) If a bad consumer doesn't even heartbeat, it will be ejected from
>> the group. This does not involve a rebalance.
other consumers, and
>> thus bump the delivery count to the limit.
>> This is unlikely but not impossible.
>>
>> 6) Processing semantics
>> Delivery is at-least-once.
>>
>> 7) Acronyms
>> I hadn't thought about the implications of "Kafka SEO"
Hello,
thank you for the proposal! A very interesting read.
I do have one question, though. When you subscribe to a topic using consumer
groups, it might happen that one consumer has processed all messages from its
partitions, while another one still has a lot of work to do (this might be due
volve a rebalance.
> > > ii) If a bad consumer heartbeats but always times out message
> processing, it will slow the advancement of the SSO/SEO. There
> > > is the possibility that such a consumer would invalidate completely
> valid messages. In order to do this, it woul
count to the limit.
> > This is unlikely but not impossible.
> >
> > 6) Processing semantics
> > Delivery is at-least-once.
> >
> > 7) Acronyms
> > I hadn't thought about the implications of "Kafka SEO". I think I'll change
> >
) and
> "Share Partition End Offset" (SPEO).
>
> There is a lot of work ahead for this KIP. I intend to work on the protocol
> changes next.
>
> Thanks for getting involved in the discussion.
> Andrew
>
> From: Stanislav Kozlovski
> Sent: 22 May 2023 11:20
> To:
changes next.
Thanks for getting involved in the discussion.
Andrew
From: Stanislav Kozlovski
Sent: 22 May 2023 11:20
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-932: Queues for Kafka
Hey Andrew!
Kudos on the proposal. It is greatly written - a joy to read. It is
definitely an interesting
This is such a great proposal. Parallel/Cooperative consumption is a long
missing pattern in Apache Kafka and it is a viable solution for the common
head-of-line-blocking problem. Developers have long attempted to solve this
with bespoke consumer proxies that state-manage the in-flight payloads, so
Wow looks very cool.
One Q.
In the Future Work session:
> The concept can be extended to give key-based ordering so that partial
ordering and fine-grained sharing can be achieved at the same time.
I think this will be a pretty important feature needed to make shared
consumer groups useful. Perh
Hey Andrew!
Kudos on the proposal. It is greatly written - a joy to read. It is
definitely an interesting solution to the queueing problem - I would not
have guessed we could solve it like this. Thank you for working on this.
Happy to get the discussion started - I have a few comments/questions o
Hi,
I would like to start a discussion thread on KIP-932: Queues for Kafka. This
KIP proposes an alternative to consumer groups to enable cooperative
consumption by consumers without partition assignment. You end up with queue
semantics on top of regular Kafka topics, with per-message acknowledg