Hi, Andrew,

Removing AdminClient.listGroups() and the ListGroups RPC for now sounds good
to me.

Thanks,

Jun

On Mon, May 6, 2024 at 11:10 AM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Jun,
> Thanks for the reply.
>
> 162. I’ve mentioned before that I plan another KIP for administration
> of groups. I think this is heading into that territory. I would like
> listGroups() to do a comprehensive job of returning all of the groups,
> and that does include groups which aren’t currently covered by GroupType.
> There’s a single namespace for all groups, and if someone is to make sense
> of the groups in a cluster, I think they need to be able to see them all.
> This is really just putting an API on top of the ListGroups RPC that
> already
> exists.
>
> I don’t think it would be desirable for connect-based groups to
> have GroupType.UNKNOWN. There are other custom group types.
> This all needs sorting out, I think.
>
> I propose to remove AdminClient.listGroups() from this KIP, and put
> it in the administration KIP.
>
> Let me know what you think.
>
> Thanks,
> Andrew
>
>
> > On 6 May 2024, at 18:04, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply.
> >
> > 162. It's fine to start with just the group type. Since ListGroups() is a
> > generic API, I want to make sure that it covers all existing groups.
> > Currently, GroupType only has "classic" and "consumer", both of which
> seem
> > to be related to groups formed by consumers since it's part of
> > ConsumerGroupDescription. Does ListGroups() return connect-based groups,
> > and if so, what's the GroupType? If ListGroups() doesn't cover all groups,
> > should we name it more accurately?
> >
> > Jun
> >
> >
> > On Fri, May 3, 2024 at 7:51 PM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for your reply.
> >>
> >> 161. ShareGroupListing and ShareGroupDescription are using
> >> the same pattern as ConsumerGroupListing and
> >> ConsumerGroupDescription. I have gone for consistency which
> >> I think is probably best here. It’s what I would expect if I had
> previously
> >> used the admin API for consumer groups and was looking to use it for
> >> share groups. I agree it’s a bit weird.
> >>
> >> 162. GroupListing contains the only information which is properly
> >> in common between a ConsumerGroupListing and a ShareGroupListing.
> >> ListGroupsResponse.ProtocolType is interpreted to provide the
> >> group type. I know that the ListGroups RPC also includes the group
> >> state, but that’s as a string and there’s no common enum for the states
> >> of all types of group. As a result, I have exposed group type but not
> >> state on this API.
> >>
> >> Previously in the discussion for this KIP, I mentioned that I would
> >> create another KIP for the administration of groups, in particular
> >> how the administrator can ensure that particular group IDs
> >> are used for the group type they desire. At the moment, I think
> >> keeping ListGroups in this KIP is a good idea. If we actually want
> >> to make it more sophisticated, perhaps that would be better with
> >> the group administration KIP.
> >>
> >> 163. It will be one higher than the latest version at the time we are
> >> ready to deliver this feature for real. When we are on the cusp of
> >> delivery, I’ll update the KIP with the final value.
> >>
> >> 164. KRaft only. All the RPCs are “broker” only. None of the code will
> >> be merged until after 3.8 has branched.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 4 May 2024, at 00:12, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Hi, Andrew,
> >>>
> >>> Thanks for the reply. A few more comments.
> >>>
> >>> 161. ShareGroupListing.state() returns an optional, but
> >>> ShareGroupDescription.state() does not. Should we make them consistent?
> >>> Also, it seems a bit weird to return optional with an UNKNOWN state.
> >>>
> >>> 162. Should GroupListing include ProtocolType and GroupState too?
> >>>
> >>> 163. What is the value of group.version to gate the queuing feature?
> >>>
> >>> 164. Is the queueing feature only supported on KRaft clusters? For
> >> example,
> >>> the feature tool seems to be built only for the KRaft cluster.
> >>>
> >>> Jun
> >>>
> >>> On Fri, May 3, 2024 at 10:32 AM Andrew Schofield <
> >> andrew_schofi...@live.com>
> >>> wrote:
> >>>
> >>>> Hi Jun,
> >>>> Thanks for your reply.
> >>>>
> >>>> 147. Yes, I see what you mean. The rebalance latency will indeed
> >>>> be very short by comparison. I have removed the rebalance latency
> >>>> metrics from the client and retained the rebalance count and rate.
> >>>>
> >>>> 150. Yes, I think so. I have tweaked the text so that the simple
> >>>> assignor will take into account existing assignment information when
> >>>> it has it, which would just minimise unnecessary churn of (b).
> >>>>
> >>>> 158. I’ve changed it to ReadShareGroupStateSummary.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>
> >>>>> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Hi, Andrew,
> >>>>>
> >>>>> Thanks for the reply.
> >>>>>
> >>>>> 147. There seems to be some difference between consumer groups and
> >> share
> >>>>> groups. In the consumer groups, if a client receives a heartbeat
> >> response
> >>>>> to revoke some partitions, it may have to commit offsets before
> >> revoking
> >>>>> partitions or it may have to call the rebalance callbacks provided by
> >> the
> >>>>> user. This may take some time and can be reflected in the rebalance
> >> time
> >>>>> metric. In the share groups, none of that exists. If a client
> receives
> >>>> some
> >>>>> added/revoked partitions, it accepts them immediately, right? So,
> does
> >>>> that
> >>>>> practically make the rebalance time always 0?
> >>>>>
> >>>>> 150. I guess in the common case, there will be many more members than
> >>>>> partitions. So the need for (b) will be less common. We can probably
> >>>> leave
> >>>>> the persisting of the assignment out for now.
> >>>>>
> >>>>> 158. The new name sounds good to me.
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <
> >>>> andrew_schofi...@live.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jun,
> >>>>>> Thanks for the response.
> >>>>>>
> >>>>>> 147. I am trying to get a correspondence between the concepts and
> >>>>>> metrics of consumer groups and share groups. In both cases,
> >>>>>> the client doesn’t strictly know when the rebalance starts. All it
> >> knows
> >>>>>> is when it has work to do in order to perform its part of a
> rebalance.
> >>>>>> I am proposing that share groups and consumer groups use
> >>>>>> equivalent logic.
> >>>>>>
> >>>>>> I could remove the rebalance metrics from the client because I do
> >>>>>> understand that they are making a judgement about when a rebalance
> >>>>>> starts, but it’s their own part of the rebalance they are measuring.
> >>>>>>
> >>>>>> I tend to think these metrics are better than no metrics and
> >>>>>> will at least enable administrators to see how much rebalance
> >>>>>> activity the members of share groups are experiencing.
> >>>>>>
> >>>>>> 150. The simple assignor does not take existing assignments into
> >>>>>> consideration. The ShareGroupPartitionAssignor interface would
> >>>>>> permit this, but the simple assignor does not currently use it.
> >>>>>>
> >>>>>> The simple assignor assigns partitions in two ways:
> >>>>>> a) Distribute the members across the partitions by hashed member ID.
> >>>>>> b) If any partitions have no members assigned, distribute the
> members
> >>>>>> across these partitions round-robin.
> >>>>>>
> >>>>>> The (a) partitions will be quite stable. The (b) partitions will be
> >> less
> >>>>>> stable. By using existing assignment information, it could make (b)
> >>>>>> partition assignment more stable, whether the assignments are
> >>>>>> persisted or not. Perhaps it would be worth changing the simple
> >>>>>> assignor in order to make (b) more stable.
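
A rough sketch of the two-step simple assignor described above, written in Python for brevity rather than as the broker's Java implementation. The function name, the choice of CRC32 as the hash, and the reading of (b) as "each empty partition receives one member, dealt round-robin" are all assumptions made for illustration; the KIP does not specify them here.

```python
import zlib


def simple_assign(member_ids, partitions):
    """Hypothetical sketch: returns {partition: [member_id, ...]}."""
    assignment = {p: [] for p in partitions}
    if not member_ids or not partitions:
        return assignment
    members = sorted(member_ids)
    # (a) Place each member on a partition chosen by a stable hash of its ID
    #     (CRC32 is an assumption; any stable hash would do).
    for m in members:
        idx = zlib.crc32(m.encode("utf-8")) % len(partitions)
        assignment[partitions[idx]].append(m)
    # (b) Partitions still without members receive one member each,
    #     dealt round-robin over the sorted member list (assumed reading).
    empty = [p for p in partitions if not assignment[p]]
    for i, p in enumerate(empty):
        assignment[p].append(members[i % len(members)])
    return assignment
```

Because step (a) depends only on the member ID and the partition count, those placements do not move when other members join or leave. Only the step (b) placements shift, which is the instability discussed above.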
> >>>>>>
> >>>>>> I envisage more sophisticated assignors in the future which could
> use
> >>>>>> existing assignments and also other dynamic factors such as lag.
> >>>>>>
> >>>>>> If it transpires that there is significant benefit in persisting
> >>>>>> assignments
> >>>>>> specifically to help smooth assignment in the event of GC change,
> >>>>>> it would be quite an easy enhancement. I am not inclined to persist
> >>>>>> the assignments in this KIP.
> >>>>>>
> >>>>>> 158. Ah, yes. I see. Of course, I want the names as consistent and
> >>>>>> understandable too. I suggest renaming
> >>>>>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
> >>>>>> I haven’t changed the KIP yet, so let me know if that’s OK.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>>>
> >>>>>>> Hi, Andrew,
> >>>>>>>
> >>>>>>> Thanks for the reply.
> >>>>>>>
> >>>>>>> 147. " it makes a judgement about whether an assignment received is
> >>>> equal
> >>>>>>> to what it already is using."
> >>>>>>> If a client receives an assignment different from what it has, it
> >>>>>> indicates
> >>>>>>> the end of the rebalance. But how does the client know when the
> >>>> rebalance
> >>>>>>> starts? In the shareHeartbeat design, the new group epoch is
> >> propagated
> >>>>>>> together with the new assignment in the response.
> >>>>>>>
> >>>>>>> 150. It could be a potential concern if each GC change forces
> >>>> significant
> >>>>>>> assignment changes. Does the default assignor take existing
> >> assignments
> >>>>>>> into consideration?
> >>>>>>>
> >>>>>>> 155. Good point. Sounds good.
> >>>>>>>
> >>>>>>> 158. My concern with the current naming is that it's not clear what
> >> the
> >>>>>>> difference is between ReadShareGroupOffsetsState and
> >>>> ReadShareGroupState.
> >>>>>>> The state in the latter is also offsets.
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
> >>>>>> andrew_schofi...@live.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Jun,
> >>>>>>>> Thanks for your reply.
> >>>>>>>>
> >>>>>>>> 147. Perhaps the easiest is to take a look at the code in
> >>>>>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl.
> >>>>>>>> This class is part of the new consumer group protocol
> >>>>>>>> code in the client. It makes state transitions based on
> >>>>>>>> the heartbeat requests and responses, and it makes a
> >>>>>>>> judgement about whether an assignment received is
> >>>>>>>> equal to what it already is using. When a state transition
> >>>>>>>> is deemed to be the beginning or end of a rebalance
> >>>>>>>> from the point of view of this client, it counts towards the
> >>>>>>>> rebalance metrics.
> >>>>>>>>
> >>>>>>>> Share groups will follow the same path.
> >>>>>>>>
> >>>>>>>> 150. I do not consider it a concern. Rebalancing a share group
> >>>>>>>> is less disruptive than rebalancing a consumer group. If the
> >>>>>>>> assignor has information about existing assignments, it can use
> >>>>>>>> it. It is true that this information cannot be replayed from a
> >>>>>>>> topic and will sometimes be unknown as a result.
> >>>>>>>>
> >>>>>>>> 151. I don’t want to rename TopicPartitionsMetadata to
> >>>>>>>> simply TopicPartitions (it’s information about the partitions of
> >>>>>>>> a topic) because we then have an array of plurals.
> >>>>>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome.
> >>>>>>>>
> >>>>>>>> 152. Fixed.
> >>>>>>>>
> >>>>>>>> 153. It’s the GC. Fixed.
> >>>>>>>>
> >>>>>>>> 154. The UNKNOWN “state” is essentially a default for situations
> >> where
> >>>>>>>> the code cannot understand data it received. For example, let’s
> say
> >>>> that
> >>>>>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
> >>>>>>>> introduced another state THINKING, a tool built with Kafka 4.0
> would
> >>>> not
> >>>>>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that
> the
> >>>>>>>> state was something that it could not understand.
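
The UNKNOWN fallback described above can be sketched as follows. This is an illustrative Python enum, not the Kafka client's Java class; the name ShareGroupState and the parse helper are assumptions, and THINKING is the hypothetical future state from the example.

```python
from enum import Enum


class ShareGroupState(Enum):
    """States known to this (older) tool, plus a catch-all."""
    EMPTY = "Empty"
    STABLE = "Stable"
    DEAD = "Dead"
    UNKNOWN = "Unknown"

    @classmethod
    def parse(cls, name):
        # Case-insensitive match against the states this build knows about.
        for state in cls:
            if state.value.lower() == str(name).lower():
                return state
        # Anything else, such as a state added by a newer broker,
        # maps to UNKNOWN instead of raising an error.
        return cls.UNKNOWN
```

With this sketch, ShareGroupState.parse("THINKING") returns ShareGroupState.UNKNOWN, so an older tool keeps working when it meets a state it was not built to understand.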
> >>>>>>>>
> >>>>>>>> 155. No, it’s at the level of the share-partition. If the offsets
> >>>>>>>> for just one share-partition are reset, only the state epoch for
> >>>>>>>> that partition is updated.
> >>>>>>>>
> >>>>>>>> 156. Strictly speaking, it’s redundant. I think having the
> >> StartOffset
> >>>>>>>> separate gives helpful clarity and I prefer to retain it.
> >>>>>>>>
> >>>>>>>> 157. Yes, you are right. There’s no reason why a leader change
> needs
> >>>>>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the
> ShareUpdate.
> >>>>>>>>
> >>>>>>>> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
> >>>>>>>> having “State” in the name makes it clear that this is one of the family
> >>>>>>>> of inter-broker RPCs served by the share coordinator. The admin RPCs
> >>>>>>>> such as DescribeShareGroupOffsets do not include “State”.
> >>>>>>>>
> >>>>>>>> 159. Fixed.
> >>>>>>>>
> >>>>>>>> 160. Fixed.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi, Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks for the reply.
> >>>>>>>>>
> >>>>>>>>> 147. "The measurement is certainly from the point of view of the
> >>>>>> client,
> >>>>>>>>> but it’s driven by sending and receiving heartbeats rather than
> >>>> whether
> >>>>>>>> the
> >>>>>>>>> client triggered the rebalance itself."
> >>>>>>>>> Hmm, how does a client know which heartbeat response starts a
> >>>>>> rebalance?
> >>>>>>>>>
> >>>>>>>>> 150. PartitionAssignor takes existing assignments into
> >> consideration.
> >>>>>>>> Since
> >>>>>>>>> GC doesn't persist the assignment for share groups, it means that
> >>>>>>>>> ShareGroupPartitionAssignor can't reliably depend on existing
> >>>>>>>> assignments.
> >>>>>>>>> Is that a concern?
> >>>>>>>>>
> >>>>>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename
> >>>>>>>>> TopicPartitionsMetadata and TopicMetadata since there is no
> >> metadata?
> >>>>>>>>>
> >>>>>>>>> 152. ShareGroupMetadataKey: "versions": "3"
> >>>>>>>>> The versions should be 11.
> >>>>>>>>>
> >>>>>>>>> 153. ShareGroupDescription.coordinator(): The description says
> "The
> >>>>>> share
> >>>>>>>>> group coordinator". Is that the GC or SC?
> >>>>>>>>>
> >>>>>>>>> 154. "A share group has only three states - EMPTY , STABLE and
> >> DEAD".
> >>>>>>>>> What about UNKNOWN?
> >>>>>>>>>
> >>>>>>>>> 155. WriteShareGroupState: StateEpoch is at the group level, not
> >>>>>>>> partition
> >>>>>>>>> level, right?
> >>>>>>>>>
> >>>>>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's the
> >> same
> >>>>>> as
> >>>>>>>>> the smallest FirstOffset in StateBatches?
> >>>>>>>>>
> >>>>>>>>> 157. Every leader change forces a ShareSnapshotValue write to
> >> persist
> >>>>>> the
> >>>>>>>>> new leader epoch. Is that a concern? An alternative is to include
> >>>>>>>>> leaderEpoch in ShareUpdateValue.
> >>>>>>>>>
> >>>>>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets. Should
> >> we
> >>>>>>>> rename
> >>>>>>>>> it to something like ReadShareGroupStartOffset?
> >>>>>>>>>
> >>>>>>>>> 159. members are assigned members round-robin => members are
> >> assigned
> >>>>>>>>> round-robin
> >>>>>>>>>
> >>>>>>>>> 160. "may called": typo
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield <
> >>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jun,
> >>>>>>>>>> Thanks for the reply and sorry for the delay in responding.
> >>>>>>>>>>
> >>>>>>>>>> 123. Yes, I didn’t quite get your point earlier. The member
> >>>>>>>>>> epoch is bumped by the GC when it sends a new assignment.
> >>>>>>>>>> When the member sends its next heartbeat, it echoes back
> >>>>>>>>>> the member epoch, which will confirm the receipt of the
> >>>>>>>>>> assignment. It would send the same member epoch even
> >>>>>>>>>> after recovery of a network disconnection, so that should
> >>>>>>>>>> be sufficient to cope with this eventuality.
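
A toy model of the member-epoch echo described above, for a single member. The class and method names are hypothetical and the real ShareGroupHeartbeat handling carries far more state; the sketch only shows why echoing the epoch is enough to survive a lost response.

```python
class GroupCoordinatorSketch:
    """Hypothetical single-member model of the GC side of the heartbeat."""

    def __init__(self):
        self.member_epoch = 0
        self.assignment = []

    def new_assignment(self, partitions):
        # The GC bumps the member epoch whenever it has a new assignment to send.
        self.member_epoch += 1
        self.assignment = list(partitions)

    def heartbeat(self, echoed_epoch):
        """Return (current epoch, assignment), or (current epoch, None)
        once the member has confirmed receipt by echoing the epoch."""
        if echoed_epoch == self.member_epoch:
            return self.member_epoch, None
        # The member is still on an older epoch, for example because the
        # previous response was lost: repeat the assignment.
        return self.member_epoch, list(self.assignment)
```

If a response is lost, the member's next heartbeat still carries the old epoch, so the assignment is repeated until the member echoes the new one.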
> >>>>>>>>>>
> >>>>>>>>>> 125. Yes, I have added it to the table which now matches
> >>>>>>>>>> the text earlier in the KIP. Thanks.
> >>>>>>>>>>
> >>>>>>>>>> 140. Yes, I have added it to the table which now matches
> >>>>>>>>>> the text earlier in the KIP. I’ve also added more detail for
> >>>>>>>>>> the case where the entire share group is being deleted.
> >>>>>>>>>>
> >>>>>>>>>> 141. Yes! Sorry for confusing things.
> >>>>>>>>>>
> >>>>>>>>>> Back to the original question for this point. To delete a share
> >>>>>>>>>> group, should the GC write a tombstone for each
> >>>>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>>>
> >>>>>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata
> >>>>>>>>>> records. But, deletion of a share group is only possible when
> >>>>>>>>>> the group is already empty, so the tombstones will have
> >>>>>>>>>> been written as a result of the members leaving the group.
> >>>>>>>>>>
> >>>>>>>>>> 143. Yes, that’s right.
> >>>>>>>>>>
> >>>>>>>>>> 147. The measurement is certainly from the point of view
> >>>>>>>>>> of the client, but it’s driven by sending and receiving
> heartbeats
> >>>>>>>>>> rather than whether the client triggered the rebalance itself.
> >>>>>>>>>> The client decides when it enters and leaves reconciliation
> >>>>>>>>>> of the assignment, and measures this period.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Andrew
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID>
> >>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>
> >>>>>>>>>>> 123. "Rather than add the group epoch to the
> >> ShareGroupHeartbeat, I
> >>>>>>>> have
> >>>>>>>>>>> decided to go for TopicPartitions in ShareGroupHeartbeatRequest
> >>>> which
> >>>>>>>>>>> mirrors ConsumerGroupHeartbeatRequest."
> >>>>>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is
> >> that
> >>>>>>>> enough
> >>>>>>>>>>> for confirming the receipt of the new assignment?
> >>>>>>>>>>>
> >>>>>>>>>>> 125. This also means that "Alter share group offsets" needs to
> >>>> write
> >>>>>> a
> >>>>>>>>>>> ShareGroupPartitionMetadata record, if the partition is not
> >> already
> >>>>>>>>>>> initialized.
> >>>>>>>>>>>
> >>>>>>>>>>> 140. In the table for "Delete share group offsets", we need to
> >> add
> >>>> a
> >>>>>>>> step
> >>>>>>>>>>> to write a ShareGroupPartitionMetadata record with
> >> DeletingTopics.
> >>>>>>>>>>>
> >>>>>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the
> >>>>>> __consumer_offsets
> >>>>>>>>>>> topic, which is a compacted topic, right?
> >>>>>>>>>>>
> >>>>>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests to
> >> GC,
> >>>>>>>> which
> >>>>>>>>>>> then forwards it to SC?
> >>>>>>>>>>>
> >>>>>>>>>>> 147. I guess a client only knows the rebalance triggered by
> >> itself,
> >>>>>> but
> >>>>>>>>>> not
> >>>>>>>>>>> the ones triggered by other members or topic/partition changes?
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield <
> >>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>> Thanks for the response.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 123. Of course, ShareGroupHeartbeat started off as
> >>>>>>>> ConsumerGroupHeartbeat
> >>>>>>>>>>>> and then unnecessary fields were removed. In the network issue
> >>>> case,
> >>>>>>>>>>>> there is not currently enough state being exchanged to be sure
> >> an
> >>>>>>>>>>>> assignment
> >>>>>>>>>>>> was received.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat, I
> >> have
> >>>>>>>>>> decided
> >>>>>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest which
> >>>>>> mirrors
> >>>>>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group member
> >>>> does
> >>>>>>>>>>>> confirm the assignment it is using, and that can be used by
> the
> >> GC
> >>>>>> to
> >>>>>>>>>>>> safely
> >>>>>>>>>>>> stop repeating the assignment in heartbeat responses.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 125. Ah, yes. This is indeed something possible with a
> consumer
> >>>>>> group
> >>>>>>>>>>>> and share groups should support it too. This does of course
> >> imply
> >>>>>> that
> >>>>>>>>>>>> ShareGroupPartitionMetadataValue needs an array of partitions,
> >> not
> >>>>>>>>>>>> just the number.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 140. Yes, good spot. There is an inconsistency here in
> consumer
> >>>>>> groups
> >>>>>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at
> the
> >>>>>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only
> >>>> operates
> >>>>>>>>>>>> at the topic level.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at
> the
> >>>>>>>>>> partition
> >>>>>>>>>>>> level only. You can reset them, but if you’re actively using a
> >>>> topic
> >>>>>>>>>> with
> >>>>>>>>>>>> a share group, I don’t see why you’d want to delete offsets
> >> rather
> >>>>>>>> than
> >>>>>>>>>>>> reset. If you’ve finished using a topic with a share group and
> >>>> want
> >>>>>> to
> >>>>>>>>>>>> clean
> >>>>>>>>>>>> up, use delete.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So, I’ve changed AdminClient.deleteShareGroupOffsets, and the
> >>>>>>>>>>>> RPCs behind it, to be topic-based.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The GC reconciles the cluster state with the
> >>>>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>>>> to spot deletion of topics and the like. However, when the
> >> offsets
> >>>>>> for
> >>>>>>>>>>>> a topic were deleted manually, the topic very likely still exists,
> >>>>>>>>>>>> so reconciliation alone is not going to be able to continue an
> >>>>>>>>>>>> interrupted operation that
> >>>>>>>>>>>> has started. So, I’ve added DeletingTopics back into
> >>>>>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so failover
> >> of
> >>>> a
> >>>>>> GC
> >>>>>>>>>>>> can continue where it left off rather than leaving fragments
> >>>> across
> >>>>>>>> the
> >>>>>>>>>>>> SCs.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 141. That is not required. Because this is not a compacted
> >> topic,
> >>>> it
> >>>>>>>> is
> >>>>>>>>>>>> not necessary to write tombstones for every key. As long as
> >> there
> >>>>>> is a
> >>>>>>>>>>>> clear and unambiguous record for the deletion of the group,
> that
> >>>> is
> >>>>>>>>>> enough.
> >>>>>>>>>>>> The tombstone for ShareGroupPartitionMetadata is theoretically
> >> not
> >>>>>>>>>>>> required but it’s a single record, rather than one per member,
> >> so
> >>>> I
> >>>>>>>>>> prefer
> >>>>>>>>>>>> to leave it as a record that the interactions with the SC have
> >>>> been
> >>>>>>>>>>>> completed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 142.
> >>>>>>>>>>>> 142.1. It will prompt the user to confirm they want to
> continue.
> >>>>>>>>>>>> This is in common with `kafka-consumer-groups.sh` which
> >>>> historically
> >>>>>>>>>>>> has defaulted to --dry-run behaviour, but is due to change to
> >>>>>>>> prompting
> >>>>>>>>>>>> if neither --dry-run nor --execute is specified “in a future
> >> major
> >>>>>>>>>>>> release”.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 142.2. It should support partition-level reset, but only
> >>>> topic-level
> >>>>>>>>>>>> delete.
> >>>>>>>>>>>> I have updated the usage text accordingly. This is in common
> >> with
> >>>>>>>>>>>> kafka-consumer-groups.sh.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 142.3. --dry-run displays the operation that would be
> executed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to the
> >>>>>>>>>>>> usage text.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the group
> >> coordinator
> >>>>>>>>>>>> for this kind of reason.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 144. That’s the default. If you haven’t asked to release or
> >>>> reject,
> >>>>>> it
> >>>>>>>>>>>> accepts.
> >>>>>>>>>>>> This is analogous to fetching and committing offsets in a
> >> consumer
> >>>>>>>>>> group.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 145. I think this is a good idea, but I would prefer to defer
> it
> >>>>>>>> until a
> >>>>>>>>>>>> future
> >>>>>>>>>>>> metrics KIP that I have planned. In KIP-932, I have added
> basic
> >>>>>>>> metrics
> >>>>>>>>>>>> only.
> >>>>>>>>>>>> For example, you’ll see that there’s no concept of lag yet,
> >> which
> >>>>>>>> surely
> >>>>>>>>>>>> will have relevance for share groups. I plan to create and
> >> deliver
> >>>>>> the
> >>>>>>>>>>>> metrics KIP before share groups are declared ready for
> >> production.
> >>>>>>>>>>>> I want the new metrics to be developed with the experience of
> >>>>>> running
> >>>>>>>>>>>> the code.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 146. Milliseconds. KIP updated.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 147. There is a membership state machine in the client that
> >>>>>>>>>>>> changes states as the ShareGroupHeartbeat requests and
> responses
> >>>>>>>>>>>> flow. The duration of a rebalance will be shorter from the
> point
> >>>> of
> >>>>>>>>>>>> view of the share-group consumer because it doesn’t have to
> >> worry
> >>>>>>>> about
> >>>>>>>>>>>> rebalance callbacks and committing offsets as the partitions
> >> move
> >>>>>>>>>>>> around, but the overall flow is very similar. So, it’s the
> state
> >>>>>>>>>>>> transitions
> >>>>>>>>>>>> that drive the collection of the rebalance metrics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 148. Strangely, none of the existing uses of
> >>>> records-per-request-avg
> >>>>>>>>>>>> actually have records-per-request-max. I tend to err on the
> side
> >>>> of
> >>>>>>>>>>>> consistency, but can’t think of any reason not to add this.
> >> Done.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 149. There are several error codes for
> >>>> WriteShareGroupStateResponse:
> >>>>>>>>>>>>
> >>>>>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re
> >> looking
> >>>>>>>> for.
> >>>>>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC can’t
> >>>>>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the topic.
> >>>>>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this group.
> >>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for
> this
> >>>>>>>>>>>> topic-partition.
> >>>>>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch.
> >>>>>>>>>>>> INVALID_REQUEST - There was a problem with the request.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Andrew
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID>
> >>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the response.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 123. I thought the network issue can be covered with the
> group
> >>>>>> epoch.
> >>>>>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up
> the
> >>>>>> epoch
> >>>>>>>>>>>> first,
> >>>>>>>>>>>>> but doesn't expose the new epoch to members until the
> >> assignment
> >>>> is
> >>>>>>>>>>>>> complete (including initializing the sharePartitionState).
> Once
> >>>> the
> >>>>>>>>>>>>> assignment is complete, GC includes the bumped up epoch and
> the
> >>>> new
> >>>>>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat
> >>>> Request
> >>>>>>>>>>>> includes
> >>>>>>>>>>>>> the new epoch, it means that the member has received the new
> >>>>>>>> assignment
> >>>>>>>>>>>> and
> >>>>>>>>>>>>> GC can exclude the assignment in the heartbeatResponse.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a
> topic-partition
> >>>>>> which
> >>>>>>>>>> is
> >>>>>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not really
> >> part
> >>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>> group yet. So I think it’s permitted to fail the
> >>>>>>>> AlterShareGroupOffsets
> >>>>>>>>>>>>> because the topic-partition would not be part of
> >>>>>>>> ListShareGroupOffsets
> >>>>>>>>>>>>> result."
> >>>>>>>>>>>>> A user may want to initialize SPSO (e.g. based on timestamp)
> >>>> before
> >>>>>>>> the
> >>>>>>>>>>>>> application is first used. If we reject
> AlterShareGroupOffsets
> >>>> when
> >>>>>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be
> >>>> forced
> >>>>>> to
> >>>>>>>>>>>> start
> >>>>>>>>>>>>> the application with the wrong SPSO first and then reset it,
> >>>> which
> >>>>>>>> will
> >>>>>>>>>>>> be
> >>>>>>>>>>>>> inconvenient.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of
> >> SPSO
> >>>>>> for
> >>>>>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue
> >> only
> >>>>>>>> tracks
> >>>>>>>>>>>>> consecutive partitions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 141. Delete share group: Should GC also write a tombstone for
> >>>> each
> >>>>>>>>>>>>> ShareGroupMemberMetadata record?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 142. kafka-share-groups.sh
> >>>>>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is
> >>>> specified
> >>>>>>>> for
> >>>>>>>>>>>>> reset-offsets?
> >>>>>>>>>>>>> 142.2 Should it support --delete-offsets and --reset-offsets
> at
> >>>> the
> >>>>>>>>>>>>> partition level to match the AdminClient api?
> >>>>>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't
> >> carry
> >>>>>> the
> >>>>>>>>>>>>> dry-run flag.
> >>>>>>>>>>>>> 142.4 --state [String]: What are the valid state values?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the
> share-partition
> >>>>>>>> leader.
> >>>>>>>>>>>> How
> >>>>>>>>>>>>> could a user describe the share group offset when there is no
> >>>>>> member?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to
> >>>> accept
> >>>>>>>>>>>> consumed
> >>>>>>>>>>>>> messages?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 145. It would be useful to add a broker side metric that
> >> measures
> >>>>>> the
> >>>>>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g.
> the
> >>>>>>>> fraction
> >>>>>>>>>>>> of
> >>>>>>>>>>>>> the time that SPL is blocked because
> >>>>>>>>>>>> group.share.partition.max.record.locks
> >>>>>>>>>>>>> is reached.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 146. heartbeat-response-time-max: What's the unit?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 147. rebalance related metrics: How does a share consumer
> know
> >>>> when
> >>>>>>>>>> there
> >>>>>>>>>>>>> is a rebalance and how does it measure the rebalance time?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 148. records-per-request-avg: Should we pair it with
> >>>>>>>>>>>>> records-per-request-max?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 149. If the shareGroupState is not available, what error code
> >> is
> >>>>>> used
> >>>>>>>>>> in
> >>>>>>>>>>>>> WriteShareGroupStateResponse?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield <
> >>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 123. The GC only sends the assignment in ShareGroupHeartbeat
> >>>>>>>>>>>>>> response if the member has just joined, or the assignment
> has
> >>>> been
> >>>>>>>>>>>>>> recalculated. This latter condition is met when the GC fails
> >>>> over.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> With a consumer group, it works a little differently. The
> >>>>>>>> heartbeating
> >>>>>>>>>>>>>> occurs asynchronously with the reconciliation process in the
> >>>>>> client.
> >>>>>>>>>>>>>> The client has reconciliation callbacks, as well as offsets
> to
> >>>>>>>> commit
> >>>>>>>>>>>>>> before it can confirm that revocation has occurred. So, it
> >> might
> >>>>>> take
> >>>>>>>>>>>>>> multiple heartbeats to complete the installation of a new
> >> target
> >>>>>>>>>>>>>> assignment in the client.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> With a share group, it’s more brutal. The GC sends the
> >>>> assignment
> >>>>>>>>>>>>>> in a heartbeat response, and the client is assumed to have
> >> acted
> >>>>>> on
> >>>>>>>>>>>>>> it immediately. Since share group assignment is really about
> >>>>>>>> balancing
> >>>>>>>>>>>>>> consumers across the available partitions, rather than
> safely
> >>>>>>>> handing
> >>>>>>>>>>>>>> ownership of partitions between consumers, there is less
> >>>>>>>> coordination.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I wonder whether there is one situation that does need
> >>>>>> considering.
> >>>>>>>>>>>>>> If the network connection between the client and the GC is
> >> lost,
> >>>>>> it
> >>>>>>>> is
> >>>>>>>>>>>>>> possible that a response containing the assignment is lost.
> >>>> Then,
> >>>>>>>>>>>>>> the connection will be reestablished, and the assignment
> will
> >>>> only
> >>>>>>>> be
> >>>>>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of
> >>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one
> >>>>>>>>>>>>>> way to close that.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 125. Yes, you are right. The reconciliation that I describe
> in
> >>>> the
> >>>>>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets
> is
> >>>>>> called
> >>>>>>>>>>>>>> for a topic-partition which is not yet in
> >>>>>>>>>>>> ShareGroupPartitionMetadataValue,
> >>>>>>>>>>>>>> it’s not really part of the group yet. So I think it’s
> >> permitted
> >>>>>> to
> >>>>>>>>>> fail
> >>>>>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition would
> >> not
> >>>>>> be
> >>>>>>>>>>>>>> part of ListShareGroupOffsets result.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition
> >> which
> >>>> is
> >>>>>>>> in
> >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling
> >>>>>>>>>>>>>> InitializeShareGroupState to set its offset would be
> >> sufficient
> >>>>>>>>>> without
> >>>>>>>>>>>>>> writing ShareGroupPartitionMetadata again.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 138. Added.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 139. This week, I have been experimenting with using the
> >>>> existing
> >>>>>>>>>>>>>> consumer metrics with share consumer code. That was my plan
> to
> >>>> get
> >>>>>>>>>>>>>> started with metrics. While they work to some extent, I am
> not
> >>>>>>>>>> entirely
> >>>>>>>>>>>>>> happy with the result.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have added a basic set of share consumer-specific metrics
> to
> >>>>>>>> KIP-932
> >>>>>>>>>>>>>> which I think is a step in the right direction. I
> anticipate a
> >>>>>>>> future
> >>>>>>>>>>>> KIP
> >>>>>>>>>>>>>> that defines many more metrics and tackles concepts such as
> >> what
> >>>>>>>>>>>>>> “lag” means for a share group. The metrics I’ve included are
> >>>>>> simply
> >>>>>>>>>>>>>> counting and measuring the operations, which is a good
> start.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 54. I think either with or without
> >>>>>> InitializingTopics/DeletingTopics
> >>>>>>>>>> in
> >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think
> there
> >>>> is
> >>>>>>>>>>>>>> a deeper point behind what you said. The GC needs to be
> >>>> responsive
> >>>>>>>>>>>>>> to topic creation and deletion, and addition of partitions
> in
> >>>>>> cases
> >>>>>>>>>>>> where
> >>>>>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics. So,
> >> we
> >>>>>> are
> >>>>>>>>>>>>>> probably not saving anything by having those fields. As a
> >>>> result,
> >>>>>> I
> >>>>>>>>>> have
> >>>>>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated
> the
> >>>> KIP
> >>>>>>>>>>>>>> accordingly. The GC needs to reconcile its view of the
> >>>> initialised
> >>>>>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and
> it
> >>>> will
> >>>>>>>>>>>>>> initialize or delete share-group state accordingly.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have made one more change to the KIP. The
> >>>> SharePartitionAssignor
> >>>>>>>>>>>>>> interface has been renamed to
> >>>>>>>>>>>>>> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor
> >> and
> >>>>>> it
> >>>>>>>>>>>>>> now extends the PartitionAssignor interface. It’s
> essentially
> >> a
> >>>>>>>> marker
> >>>>>>>>>>>>>> of which partition assignors can be used with share groups.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to the
> >>>> GC."
> >>>>>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment
> to
> >> GC
> >>>>>> to
> >>>>>>>>>>>>>>> avoid GC including the assignment in the heartbeat response
> >>>>>>>>>>>>>> continuously. I
> >>>>>>>>>>>>>>> assume this is done by including the new group epoch in the
> >>>>>>>> heartbeat
> >>>>>>>>>>>>>>> response.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 125. It's possible that the share partition has never been
> >>>>>>>>>> initialized
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write
> >>>>>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it would
> >>>>>>>>>>>> reinitialize
> >>>>>>>>>>>>>>> the share partition and lose the effect of
> >>>>>> AlterShareGroupOffsets.
> >>>>>>>> If
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> partition has already been initialized and it's recorded
> >>>>>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to write
> >>>>>>>>>>>>>>> ShareGroupPartitionMetadata again when handling
> >>>>>>>>>> AlterShareGroupOffsets.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 54. "I don’t think there is any redundancy. The
> >>>>>>>>>>>> ShareGroupMemberMetadata
> >>>>>>>>>>>>>>> does include a list of subscribed topics. However, if there
> >> is
> >>>> a
> >>>>>>>>>> period
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> time in which no members are subscribed to a particular
> >> topic,
> >>>> it
> >>>>>>>>>> does
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>> mean that the topic’s ShareGroupState should be immediately
> >>>>>>>> removed,
> >>>>>>>>>>>> but
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata
> >>>> records
> >>>>>>>>>>>>>> containing
> >>>>>>>>>>>>>>> that topic."
> >>>>>>>>>>>>>>> I am still trying to understand the value of
> >> InitializingTopics
> >>>>>> and
> >>>>>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They
> are
> >>>> used
> >>>>>>>> to
> >>>>>>>>>>>>>>> remember the intention of an operation. However, GC still
> >> needs
> >>>>>> to
> >>>>>>>>>>>> handle
> >>>>>>>>>>>>>>> the case when the intention is not safely recorded. If GC
> >> wants
> >>>>>> to
> >>>>>>>>>>>>>>> initialize a new topic/partition, a simpler approach is for
> >> it
> >>>> to
> >>>>>>>>>> send
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>> InitializeShareGroupState to the share coordinator and
> after
> >>>>>>>>>> receiving
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue
> with
> >>>> the
> >>>>>>>>>>>>>>> initialized partition included in InitializedTopics. This
> >>>> saves a
> >>>>>>>>>>>> record
> >>>>>>>>>>>>>>> write. It's possible for GC to fail in between the two
> steps.
> >>>> On
> >>>>>>>>>>>>>> failover,
> >>>>>>>>>>>>>>> the new GC just repeats the process. The case that you
> >>>> mentioned
> >>>>>>>>>> above
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics
> of
> >>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member subscribes
> to
> >>>> it,
> >>>>>>>> we
> >>>>>>>>>>>> can
> >>>>>>>>>>>>>>> still keep the ShareGroupState as long as the topic still
> >>>> exists.
> >>>>>>>> The
> >>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>> optimization could be applied to DeletingTopics too.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield <
> >>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute
> the
> >>>>>>>>>>>> assignment
> >>>>>>>>>>>>>>>> for every member. However, the impact of re-assignment is
> >> not
> >>>>>> that
> >>>>>>>>>>>>>> onerous.
> >>>>>>>>>>>>>>>> If the recomputed assignments are the same, which they may
> >>>> well
> >>>>>>>> be,
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>> is no impact on the members at all.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On receiving the new assignment, the member adjusts the
> >>>>>>>>>>>> topic-partitions
> >>>>>>>>>>>>>>>> in its share sessions, removing those which were revoked
> and
> >>>>>>>> adding
> >>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>> which were assigned. It is able to acknowledge the records
> >> it
> >>>>>>>>>> fetched
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>> the partitions which have just been revoked, and it
> doesn’t
> >>>> need
> >>>>>>>> to
> >>>>>>>>>>>>>> confirm
> >>>>>>>>>>>>>>>> the assignment back to the GC.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 125. I don’t think the GC needs to write
> >>>>>>>> ShareGroupPartitionMetadata
> >>>>>>>>>>>>>>>> when processing AlterShareGroupOffsets. This is because
> the
> >>>>>>>>>> operation
> >>>>>>>>>>>>>>>> happens as a result of an explicit administrative action
> and
> >>>> it
> >>>>>> is
> >>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>> to return a specific error code for each topic-partition.
> >> The
> >>>>>>>> cases
> >>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is
> >> added
> >>>> or
> >>>>>>>>>>>> removed
> >>>>>>>>>>>>>>>> from the subscribed topics, or the number of partitions
> >>>> changes.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for a
> >>>>>> cluster
> >>>>>>>> to
> >>>>>>>>>>>>>>>> prevent
> >>>>>>>>>>>>>>>> a group from having an excessively low value. Config
> added.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 131. I have changed it to
> >>>>>> group.share.partition.max.record.locks.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 136. When GC failover occurs, the GC gaining ownership
> of a
> >>>>>>>>>> partition
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the __consumer_offsets topic replays the records to build
> >> its
> >>>>>>>> state.
> >>>>>>>>>>>>>>>> In the case of a share group, it learns:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> * The share group and its group epoch (ShareGroupMetadata)
> >>>>>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata)
> >>>>>>>>>>>>>>>> * The list of share-partitions
> (ShareGroupPartitionMetadata)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It will recompute the assignments in order to respond to
> >>>>>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the
> >> group
> >>>>>>>> epoch.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I will update the KIP accordingly to confirm the
> behaviour.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the
> >>>>>>>>>>>>>>>> group-coordinator-metrics
> >>>>>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs
> >> offset
> >>>>>>>>>> commit,
> >>>>>>>>>>>>>>>> the share group equivalent is performed by the SPL. So,
> I’ve
> >>>>>>>> grouped
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> concepts which relate to the group in
> >>>> group-coordinator-metrics.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The SC reports the metrics in the
> share-coordinator-metrics
> >>>>>> group.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 137.2: There is one metric in both groups -
> >>>> partition-load-time.
> >>>>>>>> In
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> SC
> >>>>>>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>> it refers to the time loading data from the share-group
> >> state
> >>>>>>>> topic
> >>>>>>>>>> so
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC
> >>>> group,
> >>>>>>>>>>>>>>>> it refers to the time to read the state from the
> persister.
> >>>>>> Apart
> >>>>>>>>>> from
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to be
> >> very
> >>>>>>>>>> close.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Later, for a cluster which is using a custom persister,
> the
> >>>>>>>>>>>>>>>> share-coordinator
> >>>>>>>>>>>>>>>> metrics would likely not be reported, and the persister
> >> would
> >>>>>> have
> >>>>>>>>>> its
> >>>>>>>>>>>>>> own
> >>>>>>>>>>>>>>>> metrics.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 137.3: Correct. Fixed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the
> >> internal
> >>>>>>>> topic.
> >>>>>>>>>>>>>>>> I’ve tweaked the description.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao
> <j...@confluent.io.INVALID
> >>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 123. "The share group does not persist the target
> >>>> assignment."
> >>>>>>>>>>>>>>>>> What's the impact of this? Every time that GC fails over,
> it
> >>>>>> needs
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> recompute the assignment for every member. Do we expect
> the
> >>>>>>>> member
> >>>>>>>>>>>>>>>>> assignment to change on every GC failover?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 125. Should the GC also write
> ShareGroupPartitionMetadata?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when
> >>>>>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds
> >> good.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 130. Should we have a
> >>>> group.share.min.record.lock.duration.ms
> >>>>>> to
> >>>>>>>>>>>> pair
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>> group.share.max.record.lock.duration.ms?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 131. Sounds good. The name
> >>>>>>>> group.share.record.lock.partition.limit
> >>>>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>>> seem very intuitive. How about something
> >>>>>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I
> guess
> >> it
> >>>>>>>> needs
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> compute member reassignment and check if there is any new
> >>>>>>>>>>>>>> topic/partition
> >>>>>>>>>>>>>>>>> matching the share subscription. Does it bump up the
> group
> >>>>>> epoch?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 137. Metrics:
> >>>>>>>>>>>>>>>>> 137.1 It would be useful to document who reports each
> >> metric.
> >>>>>> Is
> >>>>>>>> it
> >>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>> broker, GC, SC or SPL?
> >>>>>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at
> SPL
> >> or
> >>>>>> SC?
> >>>>>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the
> >> share-group
> >>>>>>>> state
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>> the share-group state partitions loaded in the last 30
> >>>>>> seconds."
> >>>>>>>>>>>>>>>>> The window depends on metrics.num.samples and
> >>>>>>>>>>>> metrics.sample.window.ms
> >>>>>>>>>>>>>>>>> and is not always 30 seconds, right?
> >>>>>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more?
> >> Does
> >>>> it
> >>>>>>>>>>>> include
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> time to write to the internal topic?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield <
> >>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>> Thanks for your comments.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 120. Thanks. Fixed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which
> >> snapshot
> >>>>>>>>>>>>>>>>>> the update applies to. It should of course be the
> snapshot
> >>>>>> that
> >>>>>>>>>>>>>> precedes
> >>>>>>>>>>>>>>>>>> it in the log. It’s just there to provide a consistency
> >>>> check.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing
> >>>> StateEpoch.
> >>>>>>>> It
> >>>>>>>>>>>>>>>>>> isn’t any more.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue
> includes
> >>>>>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact, there
> is
> >>>>>>>>>>>>>> considerable
> >>>>>>>>>>>>>>>>>> divergence between the KIP and the code for this record
> >>>> value
> >>>>>>>>>> schema
> >>>>>>>>>>>>>>>>>> which I expect will be resolved when the migration code
> >> has
> >>>>>> been
> >>>>>>>>>>>>>>>>>> completed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 123. The share group does not persist the target
> >> assignment.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 124. Share groups have three kinds of record:
> >>>>>>>>>>>>>>>>>> i) ShareGroupMetadata
> >>>>>>>>>>>>>>>>>> - this contains the group epoch and is written whenever
> >> the
> >>>>>>>> group
> >>>>>>>>>>>>>>>>>> epoch changes.
> >>>>>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata
> >>>>>>>>>>>>>>>>>> - this does not contain the group epoch.
> >>>>>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata
> >>>>>>>>>>>>>>>>>> - this currently contains the epoch, but I think that is
> >>>>>>>>>>>> unnecessary.
> >>>>>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata
> >> definition
> >>>>>>>>>>>>>>>>>> contains the group epoch, but the value appears never to
> >> be
> >>>>>> set.
> >>>>>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is
> >> removing
> >>>>>> it.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have removed the Epoch from
> ShareGroupPartitionMetadata.
> >>>>>>>>>>>>>>>>>> The only purpose of persisting the epoch for a share
> >>>> group
> >>>>>>>> is
> >>>>>>>>>> so
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> when a group coordinator takes over the share group, it
> is
> >>>>>> able
> >>>>>>>> to
> >>>>>>>>>>>>>>>>>> continue the sequence of epochs.
> >>>> ShareGroupMetadataValue.Epoch
> >>>>>>>>>>>>>>>>>> is used for this.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case
> and
> >>>>>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP
> >>>>>> updated.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted
> when
> >> it
> >>>>>> has
> >>>>>>>>>> no
> >>>>>>>>>>>>>>>>>> members, so the tombstones for ShareGroupMemberMetadata
> >> will
> >>>>>>>>>>>>>>>>>> have been written when the members left. I have
> clarified
> >>>>>> this.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group
> epoch.
> >>>>>> When
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> group coordinator is initializing the share-group state
> >> the
> >>>>>>>> first
> >>>>>>>>>>>> time
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> a share-partition is being added to an assignment in the
> >>>>>> group,
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the group
> >>>> epoch
> >>>>>>>>>>>>>>>>>> increases over time, the share coordinator is entirely
> >>>>>> unaware.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When the first consumer for a share-partition fetches
> >>>> records
> >>>>>>>>>> from a
> >>>>>>>>>>>>>>>>>> share-partition leader, the SPL calls the share
> >> coordinator
> >>>> to
> >>>>>>>>>>>>>>>>>> ReadShareGroupState. If the SPL has previously read the
> >>>>>>>>>> information
> >>>>>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms
> >> it's
> >>>> up
> >>>>>>>> to
> >>>>>>>>>>>> date
> >>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Even if many consumers are joining at the same time, any
> >>>>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>>>> which is being initialized will not be included in their
> >>>>>>>>>>>> assignments.
> >>>>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>>>>>> the initialization is complete, the next rebalance will
> >>>> assign
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>> to some consumers which will discover this by
> >>>>>>>> ShareGroupHeartbeat
> >>>>>>>>>>>>>>>>>> response. And then, the fetching begins.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s read
> >> the
> >>>>>>>> state
> >>>>>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait up
> to
> >>>>>>>>>> MaxWaitMs
> >>>>>>>>>>>>>>>>>> and then it can return an empty set of records if it’s
> >> still
> >>>>>> not
> >>>>>>>>>>>>>> ready.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a
> >> topic
> >>>>>> with
> >>>>>>>>>>>> many
> >>>>>>>>>>>>>>>>>> partitions is added to the subscribed topics for a share
> >>>>>> group,
> >>>>>>>>>> the
> >>>>>>>>>>>>>> fact
> >>>>>>>>>>>>>>>>>> that the assignments will only start to include the
> >>>> partitions
> >>>>>>>> as
> >>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>> initialization completes should soften the impact.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature when
> >> it’s
> >>>>>>>>>>>> finished
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and
> >>>>>>>>>>>> `group.version`.
> >>>>>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the
> >>>>>>>>>>>>>> `group.share.enable`
> >>>>>>>>>>>>>>>>>> configuration will turn it on.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have described `group.share.enable` as an internal
> >>>>>>>> configuration
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> the KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms`
> >>>> applies
> >>>>>>>> to
> >>>>>>>>>>>>>> groups
> >>>>>>>>>>>>>>>>>> which do not specify a group-level configuration for
> lock
> >>>>>>>>>> duration.
> >>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>> minimum and maximum for this configuration are intended
> to
> >>>>>> give
> >>>>>>>> it
> >>>>>>>>>>>>>>>>>> sensible bounds.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If a group does specify its own `
> >>>>>>>>>>>> group.share.record.lock.duration.ms
> >>>>>>>>>>>>>> `,
> >>>>>>>>>>>>>>>>>> the broker-level `
> group.share.max.record.lock.duration.ms
> >> `
> >>>>>>>> gives
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value
> for
> >>>> all
> >>>>>>>>>>>> groups.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> While editing, I renamed `
> >>>>>>>> group.share.record.lock.duration.max.ms
> >>>>>>>>>> `
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for
> consistency
> >>>>>> with
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> rest of the min/max configurations.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 131. This is the limit per partition so you can go wider
> >>>> with
> >>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>>>> I have set the initial value low for safety. I expect to
> >> be
> >>>>>> able
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> increase this
> >>>>>>>>>>>>>>>>>> significantly when we have mature code which has been
> >>>>>>>>>> battle-tested.
> >>>>>>>>>>>>>>>>>> Rather than try to guess how high it can safely go, I’ve
> >>>> erred
> >>>>>>>> on
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> side
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> caution and expect to open it up in a future KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two
> >> group
> >>>>>>>>>>>>>>>>>> configurations,
> >>>>>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms and
> >>>>>>>>>>>>>>>>>> group.share.heartbeat.timeout.ms . The configurations
> you
> >>>>>>>>>> mentioned
> >>>>>>>>>>>>>>>>>> are the bounds for the group-level configurations.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to
> mirror
> >>>> the
> >>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>> `group.consumer.max.size`.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid
> >>>> assignors
> >>>>>>>> for
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> cluster.
> >>>>>>>>>>>>>>>>>> When the assignors are configurable at the group level,
> >> the
> >>>>>>>> group
> >>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>> will only be permitted to name an assignor which is in
> >> this
> >>>>>>>> list.
> >>>>>>>>>>>> For
> >>>>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>> is no group configuration for assignor, so all groups
> get
> >>>> the
> >>>>>>>> one
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>> assignor in the list.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a
> cluster
> >>>>>> with a
> >>>>>>>>>>>> small
> >>>>>>>>>>>>>>>>>> number of
> >>>>>>>>>>>>>>>>>> brokers and a lot of share group activity, it may be
> >>>>>>>>>> appropriate
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> increase
> >>>>>>>>>>>>>>>>>> this. We will be able to give tuning advice once we have
> >>>>>>>>>> experience
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> performance impact of increasing it.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao
> >> <j...@confluent.io.INVALID
> >>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 120. There is still reference to
> >> ConsumerGroupMetadataKey.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change
> it
> >>>>>> since
> >>>>>>>>>> it's
> >>>>>>>>>>>>>>>> not a
> >>>>>>>>>>>>>>>>>>> snapshot?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch,
> but
> >>>>>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know how
> >> the
> >>>>>>>> epoch
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in
> the
> >>>>>> share
> >>>>>>>>>>>>>> group?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 123. There is no equivalent of
> >>>>>>>>>> ConsumerGroupTargetAssignmentMember
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>> ShareGroup. How does the shareGroup persist the member
> >>>>>>>>>> assignment?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a topic-partition
> is
> >>>>>>>>>> assigned
> >>>>>>>>>>>>>> to a
> >>>>>>>>>>>>>>>>>>> member of a share group for the first time, the group
> >>>>>>>> coordinator
> >>>>>>>>>>>>>>>> writes
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the
> >>>> __consumer_offsets
> >>>>>>>>>>>> topic".
> >>>>>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata with
> >> the
> >>>>>>>>>> bumped
> >>>>>>>>>>>>>>>>>> epoch?
> >>>>>>>>>>>>>>>>>>> In general, is there a particular order to bump up the
> >>>> epoch
> >>>>>> in
> >>>>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the
> >>>>>>>>>>>>>> sharePartitionLeader?
> >>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>> would be useful to make this clear in other state
> >> changing
> >>>>>>>>>>>> operations
> >>>>>>>>>>>>>>>>>> too.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and
> >> share
> >>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>> Only empty share groups support this operation. The
> group
> >>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>> sends
> >>>>>>>>>>>>>>>>>>> an InitializeShareGroupState  request to the share
> >>>>>> coordinator.
> >>>>>>>>>> The
> >>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the new
> >>>> state
> >>>>>>>>>> epoch
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> __share_group_state  topic."
> >>>>>>>>>>>>>>>>>>> Does the operation need to write
> >>>> ShareGroupPartitionMetadata
> >>>>>>>> and
> >>>>>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also need
> to
> >>>>>>>> write a
> >>>>>>>>>>>>>>>>>> tombstone
> >>>>>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new consumer
> >>>>>> joining
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the
> >>>>>>>>>>>> ShareCoordinator
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the
> >>>>>>>> SharePartition
> >>>>>>>>>>>>>> leader
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> reinitialize the state with ReadShareGroupOffsetsState.
> >> If
> >>>>>> many
> >>>>>>>>>>>>>>>> consumers
> >>>>>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would
> >> there
> >>>> be
> >>>>>>>> too
> >>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>> load
> >>>>>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the
> >>>>>>>>>> group.version
> >>>>>>>>>>>>>>>>>>> feature, should we make it an internal config?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this seems
> >>>>>>>> redundant
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> group.share.enable?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 130. Why do we have both
> >>>> group.share.record.lock.duration.ms
> >>>>>>>> and
> >>>>>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with its
> >> own
> >>>>>> max
> >>>>>>>>>>>>>> value?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults
> to
> >>>> 200.
> >>>>>>>>>> This
> >>>>>>>>>>>>>>>> limits
> >>>>>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right?
> If
> >>>>>> there
> >>>>>>>>>> are
> >>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>> records per batch, it could be even smaller.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 132. Why do we need all three of
> >>>>>>>> group.share.session.timeout.ms,
> >>>>>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and
> >>>>>>>>>>>>>>>>>> group.share.max.session.timeout.ms?
> >>>>>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for
> >>>>>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 133. group.share.max.size: Would
> >>>>>>>>>> group.share.max.members.per.group
> >>>>>>>>>>>>>> be a
> >>>>>>>>>>>>>>>>>>> more intuitive name?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a
> >> list?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share
> >>>> coordinator
> >>>>>>>> or
> >>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>> broker?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to
> >>>>>>>> FirstOffset.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP.
> Personally,
> >> I
> >>>>>>>> don’t
> >>>>>>>>>>>> mind
> >>>>>>>>>>>>>>>>>> having
> >>>>>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 114. Done.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record
> >> because
> >>>>>>>> there
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>> data to consume, committing on every record is OK.
> It’s
> >>>>>>>>>>>> inefficient
> >>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>> acceptable.
> >>>>>>>>>>>>>>>>>>>> If the first poll returns just one record, but many
> more
> >>>>>> have
> >>>>>>>>>>>> piled
> >>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>>>>> the first one was being processed, the next poll has
> the
> >>>>>>>>>>>> opportunity
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be
> >>>>>> committed
> >>>>>>>>>>>>>>>> together.
> >>>>>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to
> >> return
> >>>>>>>>>> batches
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> records when the records are available is the approach
> >> we
> >>>>>> will
> >>>>>>>>>>>> take
> >>>>>>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 116. Good idea. Done.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration
> section.
> >>>> Let
> >>>>>> me
> >>>>>>>>>>>> know
> >>>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>> you think.
> >>>>>>>>>>>>>>>>>>>> I am discussing the configuration to enable the
> feature
> >> in
> >>>>>> the
> >>>>>>>>>>>>>> mailing
> >>>>>>>>>>>>>>>>>>>> list with
> >>>>>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in
> >> this
> >>>>>> area
> >>>>>>>>>>>>>> still.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao
> >>>> <j...@confluent.io.INVALID
> >>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple
> >> times
> >>>>>>>> when
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same state
> >>>> epoch
> >>>>>>>> was
> >>>>>>>>>>>>>>>> reused
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> initialize the state."
> >>>>>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure that
> >> it
> >>>>>> has
> >>>>>>>>>>>>>> received
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused?
> >>>>>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger a
> >>>>>>>> rebalance"
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group
> >> epoch
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>>>>>>>>> bumped
> >>>>>>>>>>>>>>>>>>>> too?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become
> >>>> FirstOffset
> >>>>>>>> in
> >>>>>>>>>>>> ALL
> >>>>>>>>>>>>>>>>>>>> schemas
> >>>>>>>>>>>>>>>>>>>>> defined in the KIP."
> >>>>>>>>>>>>>>>>>>>>> Yes, that seems better to me.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for
> 3."
> >>>>>>>>>>>>>>>>>>>>> Should we document it?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer
> configuration
> >> is
> >>>>>>>>>>>>>> deprecated
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the
> >> shareConsumer
> >>>>>>>>>>>>>>>> configuration.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 115. I am wondering if it's a good idea to always
> >> commit
> >>>>>> acks
> >>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each batch
> >> may
> >>>>>>>> only
> >>>>>>>>>>>>>>>> contain
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> single record and each poll() only returns a single
> >>>> batch.
> >>>>>>>> This
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> cause
> >>>>>>>>>>>>>>>>>>>>> each record to be committed individually. Is there a
> >> way
> >>>>>> for
> >>>>>>>> a
> >>>>>>>>>>>> user
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> optimize this?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated
> >> acls?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and
> RPCs,
> >> it
> >>>>>>>> would
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> useful
> >>>>>>>>>>>>>>>>>>>>> to document the upgrade process.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch in the response
> >>>>>>>>>>>>>>>>>>>>>> from ReadShareGroupState. When it becomes a share-partition leader,
> >>>>>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the things it learns is
> >>>>>>>>>>>>>>>>>>>>>> the current state epoch. Then it uses the state epoch in all
> >>>>>>>>>>>>>>>>>>>>>> subsequent calls to WriteShareGroupState. The fencing is to prevent
> >>>>>>>>>>>>>>>>>>>>>> writes for a previous state epoch, which are very unlikely but
> >>>>>>>>>>>>>>>>>>>>>> which would mean that a leader was using an out-of-date epoch and
> >>>>>>>>>>>>>>>>>>>>>> was likely no longer the current leader at all, perhaps due to a
> >>>>>>>>>>>>>>>>>>>>>> long pause for some reason.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the SPSO,
> >>>>>>>> wouldn’t
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to avoid
> >> yet
> >>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>>> inter-broker
> >>>>>>>>>>>>>>>>>>>>>> hop.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share
> >> group
> >>>>>>>>>> offset
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> altered
> >>>>>>>>>>>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the group
> >>>>>>>>>> coordinator
> >>>>>>>>>>>>>> WILL
> >>>>>>>>>>>>>>>>>>>>>> update the state epoch. I don’t think it needs to
> >> update
> >>>>>> the
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>> epoch
> >>>>>>>>>>>>>>>>>>>>>> at the same time (although it could) because the
> group
> >>>>>> epoch
> >>>>>>>>>>>> will
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> been bumped when the group became empty. If the
> share
> >>>>>> group
> >>>>>>>>>>>> offset
> >>>>>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains
> >> empty,
> >>>> it
> >>>>>>>>>> would
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to
> >>>> initialize
> >>>>>>>> the
> >>>>>>>>>>>>>> state.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO as
> a
> >>>>>> result
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the usual flow of record delivery, it does not
> update
> >>>> the
> >>>>>>>>>> state
> >>>>>>>>>>>>>>>> epoch.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the alteration because,
> >>>>>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response will contain the
> >>>>>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to be the
> >>>>>>>>>>>>>>>>>>>>>> last-resort way of catching this.
> >>>>>>>>>>>>>>>>>>>>>>
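The state-epoch fencing described in 41.1 and 42.2 can be sketched roughly as follows. This is only an illustration under stated assumptions: the class StateEpochFencing and its methods are hypothetical stand-ins for the share coordinator's per-partition state, not the KIP's RPC handlers. Only the names FENCED_STATE_EPOCH, SPSO and the idea that altering the share group offset bumps the state epoch come from the discussion. How a write carrying a newer epoch than the stored one is handled is not covered in the thread, so the sketch simply accepts it.

```java
// Hypothetical in-memory stand-in for the share coordinator's state for one
// share-partition. All names here are illustrative, not taken from the KIP.
class StateEpochFencing {
    enum Result { NONE, FENCED_STATE_EPOCH }

    private int stateEpoch = 0;
    private long startOffset = 0L; // the SPSO

    // Stand-in for what a leader learns from ReadShareGroupState.
    public int readStateEpoch() { return stateEpoch; }

    public long startOffset() { return startOffset; }

    // Stand-in for WriteShareGroupState: a write carrying an older state
    // epoch than the stored one is rejected and leaves the state untouched.
    public Result write(int writerStateEpoch, long newStartOffset) {
        if (writerStateEpoch < stateEpoch) {
            return Result.FENCED_STATE_EPOCH;
        }
        startOffset = newStartOffset;
        return Result.NONE;
    }

    // Stand-in for the alter-offsets path: the state epoch is bumped so
    // that late writes from the previous epoch are fenced.
    public void alterOffsets(long newStartOffset) {
        stateEpoch++;
        startOffset = newStartOffset;
    }

    public static void main(String[] args) {
        StateEpochFencing coordinator = new StateEpochFencing();
        int leaderEpoch = coordinator.readStateEpoch();
        System.out.println(coordinator.write(leaderEpoch, 10L)); // normal delivery
        coordinator.alterOffsets(0L);                            // offset reset
        System.out.println(coordinator.write(leaderEpoch, 20L)); // stale epoch
        leaderEpoch = coordinator.readStateEpoch();              // re-initialize
        System.out.println(coordinator.write(leaderEpoch, 5L));
    }
}
```

In this sketch the leader recovers by re-reading the state epoch after being fenced, which mirrors the "last-resort" detection path described above.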
> >>>>>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first
> >>>>>> ShareFetch
> >>>>>>>>>>>>>>>> request,
> >>>>>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to
> >>>>>>>>>>>>>> ReadShareGroupState.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain
> >> constant,
> >>>>>>>> but,
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might
> >>>> change.
> >>>>>>>> As a
> >>>>>>>>>>>>>>>> result,
> >>>>>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share
> >>>>>> sessions
> >>>>>>>>>>>>>>>>>> transitions
> >>>>>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the
> share
> >>>>>> group
> >>>>>>>>>>>>>>>>>>>> transitioning
> >>>>>>>>>>>>>>>>>>>>>> from empty to non-empty, for the share-partition
> >> leader
> >>>> to
> >>>>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state
> >> epoch.
> >>>> If
> >>>>>>>> its
> >>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>> epoch is out of date, it can then
> ReadShareGroupState
> >> to
> >>>>>>>>>>>>>>>>>> re-initialize.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset,
> >> we
> >>>>>> need
> >>>>>>>>>> to
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having
> >>>> reviewed
> >>>>>>>> all
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should become
> >>>>>>>>>> FirstOffset
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is
> >> just
> >>>>>>>> used
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> record batches, which is already a known concept.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Please let me know if you agree.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level
> index
> >>>> for
> >>>>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users
> will
> >> be
> >>>>>>>> using
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> console share consumer. When I use the console
> >>>> consumer, I
> >>>>>>>>>>>> always
> >>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default
> group
> >>>> ID
> >>>>>>>> for
> >>>>>>>>>>>>>>>> console
> >>>>>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to match
> >> the
> >>>>>>>>>> console
> >>>>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>>>> better and give more of an idea where this
> mysterious
> >>>>>> group
> >>>>>>>>>> has
> >>>>>>>>>>>>>> come
> >>>>>>>>>>>>>>>>>>>> from.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use
> >>>> compaction
> >>>>>>>> and
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better course
> >> for
> >>>>>>>>>> KIP-932.
> >>>>>>>>>>>>>>>>>>>>>> Personally,
> >>>>>>>>>>>>>>>>>>>>>> until I’ve written it down and lived with the ideas
> >> for
> >>>> a
> >>>>>>>> few
> >>>>>>>>>>>>>> days,
> >>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> won’t be
> >>>>>>>>>>>>>>>>>>>>>> able to choose which I prefer.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by the
> >> end
> >>>> of
> >>>>>>>>>> this
> >>>>>>>>>>>>>>>> week.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs
> >>>> matches
> >>>>>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs
> from
> >>>>>>>> KIP-848.
> >>>>>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The
> >>>>>>>> ShareGroupHeartbeatResponse
> >>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>>>> originally
> >>>>>>>>>>>>>>>>>>>>>> created from KIP-848. This part of the schema does
> not
> >>>>>> apply
> >>>>>>>>>>>> and I
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> removed
> >>>>>>>>>>>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to
> >>>> simply
> >>>>>>>>>>>>>>>>>>>> TopicPartitions
> >>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>> aligns with the actual definition of
> >>>>>>>>>>>>>> ConsumerGroupHeartbeatResponse.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error
> codes
> >>>> for
> >>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to
> these
> >>>> RPCs
> >>>>>>>> as
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> suggest.
> >>>>>>>>>>>>>>>>>>>>>> It’s a good improvement for problem determination.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain. Actually, 1 is Acquired
> >>>>>>>>>>>>>>>>>>>>>> which is not persisted, and I have another non-terminal state in
> >>>>>>>>>>>>>>>>>>>>>> mind for 3.
> >>>>>>>>>>>>>>>>>>>>>>
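As a small illustration of the non-contiguous state values discussed in 105, the mapping could be modelled along these lines. The enum name RecordState, the persisted flag and the fromId helper are assumptions made for this sketch. Only the values 0:Available, 1:Acquired (not persisted), 2:Acked and 4:Archived come from the thread, and value 3 is deliberately left unassigned because the thread does not define it.

```java
import java.util.Optional;

// Hypothetical model of the record states named in the thread.
// Value 3 is intentionally absent: the thread reserves it without defining it.
enum RecordState {
    AVAILABLE(0, true),
    ACQUIRED(1, false), // in-memory only, per the discussion above
    ACKED(2, true),
    ARCHIVED(4, true);

    private final int id;
    private final boolean persisted;

    RecordState(int id, boolean persisted) {
        this.id = id;
        this.persisted = persisted;
    }

    public int id() { return id; }

    public boolean isPersisted() { return persisted; }

    // Returns empty for ids with no assigned state, such as 3.
    public static Optional<RecordState> fromId(int id) {
        for (RecordState state : values()) {
            if (state.id == id) {
                return Optional.of(state);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (int id = 0; id <= 4; id++) {
            System.out.println(id + " -> " + fromId(id));
        }
    }
}
```

Returning an empty Optional for an unassigned id keeps the gap explicit, so a later KIP could add a state for 3 without changing existing values.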
> >>>>>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of
> whether
> >>>>>>>>>>>>>>>>>> OffsetAndMetadata
> >>>>>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional
> >>>> Metadata
> >>>>>>>> part
> >>>>>>>>>>>>>> comes
> >>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>> a regular consumer commits offsets. However, it is
> >>>> correct
> >>>>>>>>>> that
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> never be metadata with a share group. I have changed
> >>>>>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this
> >>>> process
> >>>>>>>>>> that
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>> bump
> >>>>>>>>>>>>>>>>>>>>>> can be a logical not just a physical change to the
> >>>> schema.
> >>>>>>>> KIP
> >>>>>>>>>>>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all
> of
> >>>> the
> >>>>>>>>>> states
> >>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> point.
> >>>>>>>>>>>>>>>>>>>>>> I think there is definitely scope for another KIP
> >>>> focused
> >>>>>> on
> >>>>>>>>>>>>>>>>>>>> administration
> >>>>>>>>>>>>>>>>>>>>>> of share groups that might want this information so
> >>>>>> someone
> >>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>> build
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> UI and other tools on top. Doing that well is quite
> a
> >>>> lot
> >>>>>> of
> >>>>>>>>>>>> work
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>> own right
> >>>>>>>>>>>>>>>>>>>>>> so I would prefer not to do that now.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 109. Yes, they’re inconsistent and follow the consumer group
> >>>>>>>>>>>>>>>>>>>>>> equivalents, which are also inconsistent. In general, the
> >>>>>>>>>>>>>>>>>>>>>> KafkaAdmin.delete…. methods use the Map<XXX, KafkaFuture<YYY>>
> >>>>>>>>>>>>>>>>>>>>>> pattern like DeleteShareGroupsResult.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain
> consistent
> >>>> with
> >>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>> groups?
> >>>>>>>>>>>>>>>>>>>>>> Happy to change it.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level
> of
> >>>>>> detail
> >>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>> epochs.
> >>>>>>>>>>>>>>>>>>>>>> The MemberDescription does include the assignment,
> but
> >>>> not
> >>>>>>>> the
> >>>>>>>>>>>>>> list
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> subscribed topic names.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing
> >> because
> >>>>>>>>>> there’s
> >>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>> single class that includes the states of all group
> >>>> types.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 112. I think it’s good practice for the API to have
> >>>>>>>>>>>>>>>>>>>>>> ListShareGroupOffsetsSpec. It makes evolution and extension of the
> >>>>>>>>>>>>>>>>>>>>>> API much easier. Also, it matches the precedent set by
> >>>>>>>>>>>>>>>>>>>>>> ListConsumerGroupOffsetsSpec.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResult.errors() is the same. I think you just
> >>>>>>>>>>>>>>>>>>>>>> have to look in the exception details and the same pattern is being
> >>>>>>>>>>>>>>>>>>>>>> followed here.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to writing
> a
> >>>>>>>> proposal
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>>>> share-group state that doesn’t use log compaction.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine Olshan
> on
> >>>>>>>>>>>>>>>> read-committed
> >>>>>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the detailed review.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao
> >>>>>> <j...@confluent.io.INVALID
> >>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 41.
> >>>>>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the group
> >>>> epoch
> >>>>>>>> to
> >>>>>>>>>>>> set
> >>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch?
> >>>>>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group
> >> coordinator
> >>>>>>>>>>>>>> initialize
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It
> seems
> >>>>>>>> simpler
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> partition leader initialize both the state and the
> >> SPSO
> >>>>>>>>>>>> together?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 42.
> >>>>>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be bumped when the share
> >>>>>>>>>>>>>>>>>>>>>>> group offset is altered."
> >>>>>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group offsets says "The share
> >>>>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with the new state epoch
> >>>>>>>>>>>>>>>>>>>>>>> to the __share_group_state topic." So, which is correct? We have two
> >>>>>>>>>>>>>>>>>>>>>>> paths to update the state in the share coordinator, one from the
> >>>>>>>>>>>>>>>>>>>>>>> group coordinator and another from the partition leader. I thought
> >>>>>>>>>>>>>>>>>>>>>>> the benefit of bumping up the epoch is to fence off a late request in
> >>>>>>>>>>>>>>>>>>>>>>> the previous epoch from another path.
> >>>>>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share
> >> group
> >>>>>>>> offset
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> share
> >>>>>>>>>>>>>>>>>>>>>>> coordinator, how does the partition leader know the
> >>>> share
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>>>>>> been altered so that it could clear its in-memory
> >>>> state?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base
> >> offset
> >>>>>> for
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> batch
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and
> matches
> >>>>>>>>>>>> LastOffset.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in the
> >> top
> >>>>>>>> level
> >>>>>>>>>>>>>> index
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> Kafka protocol changes?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based
> >>>> expiration
> >>>>>>>>>>>> later.
> >>>>>>>>>>>>>>>> But
> >>>>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>> a default group in console-share-consumer probably
> >>>> won't
> >>>>>>>> help
> >>>>>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> garbage. In the common case, the user of the
> console
> >>>>>>>> consumer
> >>>>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>>>> wants
> >>>>>>>>>>>>>>>>>>>>>>> to see the recently produced records for
> >> verification.
> >>>> If
> >>>>>>>> the
> >>>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>> doesn't provide that (because of the stale state),
> >> the
> >>>>>> user
> >>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> likely
> >>>>>>>>>>>>>>>>>>>>>>> just use a new group. It's true that we don't
> garbage
> >>>>>>>> collect
> >>>>>>>>>>>>>> idle
> >>>>>>>>>>>>>>>>>>>>>> topics.
> >>>>>>>>>>>>>>>>>>>>>>> However, the share groups are similar to consumers, which do support
> >>>>>>>>>>>>>>>>>>>>>>> garbage collection. Typically, we read topics more than creating
> >>>>>>>>>>>>>>>>>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we
> >> could
> >>>>>> use
> >>>>>>>>>>>>>>>>>> customized
> >>>>>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems
> >>>> cleaner
> >>>>>>>> and
> >>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups
> fit
> >> in
> >>>>>>>>>> memory,
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>> generate snapshots more efficiently than going
> >> through
> >>>>>>>>>>>>>> compaction.
> >>>>>>>>>>>>>>>>>>>>>> Having a
> >>>>>>>>>>>>>>>>>>>>>>> separate log per share partition is probably too
> much
> >>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>> It's
> >>>>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>> efficient to put the state changes for multiple
> share
> >>>>>>>>>>>> partitions
> >>>>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>>>> single log.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs:
> >>>>>> Should
> >>>>>>>> we
> >>>>>>>>>>>>>> name
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> SessionTimeoutMs?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error:
> >> What
> >>>>>>>> kind
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>> we have when assigning partitions? What are the
> >>>>>>>> corresponding
> >>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>>> codes?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 102. Do we still need
> >>>>>>>>>>>>>>>>>>>>>>> ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for
> >>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode and
> >>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the errorCode
> in
> >>>>>>>>>>>>>>>>>>>> ShareFetchResponse,
> >>>>>>>>>>>>>>>>>>>>>>> ShareAcknowledgeResponse,
> >> ReadShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse,
> >>>>>>>> DeleteShareGroupStateResponse,
> >>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and
> >>>>>>>>>>>>>>>>>>>> InitializeShareGroupStateResponse?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 105. "about": "The state -
> >>>>>>>> 0:Available,2:Acked,4:Archived.":
> >>>>>>>>>>>> What
> >>>>>>>>>>>>>>>>>>>> about 1
> >>>>>>>>>>>>>>>>>>>>>>> and 3? Are we leaving them out intentionally?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in
> >> OffsetAndMetadata?
> >>>>>> If
> >>>>>>>>>> not,
> >>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>> remove it from AdminClient and KafkaShareConsumer?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 107. ListGroupsRequest: Should we bump up the
> version
> >>>>>> since
> >>>>>>>>>> it
> >>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>>>> supports
> >>>>>>>>>>>>>>>>>>>>>>> a new group type "share"?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it
> >>>> expose
> >>>>>>>> all
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> states
> >>>>>>>>>>>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just
> >> SPSO?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes
> >>>>>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final
> >>>>>>>> TopicPartition
> >>>>>>>>>>>>>>>>>>>> partition)
> >>>>>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes
> >>>>>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>>
> deletedGroups()
> >>>>>>>>>>>>>>>>>>>>>>> Should we make them more consistent?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields
> like
> >>>>>>>>>>>> GroupEpoch,
> >>>>>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and
> >> SubscribedTopicNames?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we
> >>>> just
> >>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know
> >>>> which
> >>>>>>>>>> group
> >>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>> error?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
> >>>>>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the
> >> Share
> >>>>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>>>> RPCs.
> >>>>>>>>>>>>>>>>>>>>>>>> In the general case, it’s an inter-broker call. It
> >> is
> >>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> optimise
> >>>>>>>>>>>>>>>>>>>>>>>> for the situation in which the appropriate GC and
> SC
> >>>>>>>> shards
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>> co-located, but the
> >>>>>>>>>>>>>>>>>>>>>>>> KIP does not delve that deep into potential
> >>>> performance
> >>>>>>>>>>>>>>>>>> optimisations.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, but
> I
> >> do
> >>>>>>>> worry
> >>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>> retrospectively
> >>>>>>>>>>>>>>>>>>>>>>>> introducing a naming convention for groups. I feel
> >>>> that
> >>>>>>>>>> naming
> >>>>>>>>>>>>>>>>>>>>>> conventions
> >>>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>> typically be the responsibility of the cluster
> >>>>>>>>>> administrators
> >>>>>>>>>>>>>>>> based
> >>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>> organizational
> >>>>>>>>>>>>>>>>>>>>>>>> factors, such as the name of an application.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 72. Personally, I don’t like INVALID_GROUP_ID
> >> because
> >>>>>> the
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>> ID
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> correct but
> >>>>>>>>>>>>>>>>>>>>>>>> the group is the wrong type. The nearest existing
> >>>> error
> >>>>>>>> code
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> gets
> >>>>>>>>>>>>>>>>>>>>>>>> that across
> >>>>>>>>>>>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is
> >> really
> >>>>>>>>>> showing
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>> error code would be better.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have
> removed
> >>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every change,
> >> but
> >>>>>>>> only a
> >>>>>>>>>>>>>>>> subset
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> relevant
> >>>>>>>>>>>>>>>>>>>>>>>> for rebalancing. A check is done against the names
> >> of
> >>>>>> the
> >>>>>>>>>>>>>>>> subscribed
> >>>>>>>>>>>>>>>>>>>>>>>> topics to
> >>>>>>>>>>>>>>>>>>>>>>>> see if any relevant changes may have occurred.
> Then
> >>>> the
> >>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>> trigger
> >>>>>>>>>>>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change
> in
> >>>>>>>>>>>> partitions,
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>> rack
> >>>>>>>>>>>>>>>>>>>>>>>> IDs for the
> >>>>>>>>>>>>>>>>>>>>>>>> replicas. I have updated the KIP to make this more
> >>>>>> clear.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it is
> >> much
> >>>>>>>> less
> >>>>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> assignment survives a GC change. There’s no need
> to
> >>>>>>>> transfer
> >>>>>>>>>>>>>>>>>>>> partitions
> >>>>>>>>>>>>>>>>>>>>>>>> safely from
> >>>>>>>>>>>>>>>>>>>>>>>> member to member in the way that is required for
> >>>>>> consumer
> >>>>>>>>>>>>>> groups,
> >>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>> optimisation, the assignments for a share group
> are
> >>>> not
> >>>>>>>>>>>>>> persisted.
> >>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>>> wouldn’t do any
> >>>>>>>>>>>>>>>>>>>>>>>> harm, but it just seems unnecessary.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to acknowledge a record that
> >>>>>>>>>>>>>>>>>>>>>>>> it no longer has the right to acknowledge, the INVALID_RECORD_STATE
> >>>>>>>>>>>>>>>>>>>>>>>> error code is used.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> If the application uses the
> >>>>>> KafkaShareConsumer.commitSync
> >>>>>>>>>>>>>> method,
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>> see an InvalidRecordState exception returned.
> >>>>>>>> Alternatively,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> application can
> >>>>>>>>>>>>>>>>>>>>>>>> register an acknowledgement commit callback which
> >> will
> >>>>>> be
> >>>>>>>>>>>> called
> >>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> status
> >>>>>>>>>>>>>>>>>>>>>>>> of the acknowledgements that have succeeded or
> >> failed.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with the
> >>>>>> durable
> >>>>>>>>>>>>>>>>>>>>>> share-partition
> >>>>>>>>>>>>>>>>>>>>>>>> state in this
> >>>>>>>>>>>>>>>>>>>>>>>> KIP. The significant choices I made are that:
> >>>>>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is replicated
> >>>>>> between
> >>>>>>>>>>>>>> brokers.
> >>>>>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the
> >> storage.
> >>>>>>>>>>>>>>>>>>>>>>>> * Only one topic is required.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for this
> >> kind
> >>>>>> of
> >>>>>>>>>>>> data,
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>> evidenced by
> >>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex technique I employed.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to
> >> improve
> >>>>>>>> upon
> >>>>>>>>>>>> it.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log
> >>>>>> compactor
> >>>>>>>>>> for
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and
> >>>> ShareDelta
> >>>>>>>>>>>>>> records.
> >>>>>>>>>>>>>>>>>>>>>>>> Essentially,
> >>>>>>>>>>>>>>>>>>>>>>>> for each share-partition, the latest
> ShareCheckpoint
> >>>> and
> >>>>>>>> any
> >>>>>>>>>>>>>>>>>>>> subsequent
> >>>>>>>>>>>>>>>>>>>>>>>> ShareDelta
> >>>>>>>>>>>>>>>>>>>>>>>> records must not be cleaned. Anything else can be
> >>>>>> cleaned.
> >>>>>>>>>> We
> >>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>>>> be sure
> >>>>>>>>>>>>>>>>>>>>>>>> that multiple ShareDelta records with the same key
> >>>> would
> >>>>>>>>>>>> survive
> >>>>>>>>>>>>>>>>>>>>>> cleaning
> >>>>>>>>>>>>>>>>>>>>>>>> and we could
> >>>>>>>>>>>>>>>>>>>>>>>> abandon the DeltaIndex technique.
> >>>>>>>>>>>>>>>>>>>>>>>>
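The cleaning rule in option (a) can be sketched as follows. This is an in-memory illustration only, not a log cleaner implementation: ShareStateCleanerSketch, StateRecord and retain are hypothetical names, and the sketch assumes that ShareDelta records for a share-partition with no checkpoint yet are kept, which the text above does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the retention rule for a customised compactor.
final class ShareStateCleanerSketch {

    enum Type { CHECKPOINT, DELTA }

    // One record in the share-state log; the key identifies a share-partition.
    record StateRecord(long offset, String sharePartitionKey, Type type) {}

    // Returns the records that must not be cleaned: for each share-partition,
    // the latest ShareCheckpoint and every ShareDelta written after it.
    static List<StateRecord> retain(List<StateRecord> log) {
        // First pass: find the offset of the latest checkpoint per key.
        Map<String, Long> latestCheckpoint = new HashMap<>();
        for (StateRecord r : log) {
            if (r.type() == Type.CHECKPOINT) {
                latestCheckpoint.merge(r.sharePartitionKey(), r.offset(), Long::max);
            }
        }
        // Second pass: keep the latest checkpoint and all later deltas.
        // Assumption: with no checkpoint yet (-1), all deltas are kept.
        List<StateRecord> kept = new ArrayList<>();
        for (StateRecord r : log) {
            long checkpoint = latestCheckpoint.getOrDefault(r.sharePartitionKey(), -1L);
            boolean keep = (r.type() == Type.CHECKPOINT)
                    ? r.offset() == checkpoint
                    : r.offset() > checkpoint;
            if (keep) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

Under this rule several ShareDelta records with the same key survive cleaning, which is the property that would make the DeltaIndex technique unnecessary.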
> >>>>>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per
> >>>>>> share-partition.
> >>>>>>>>>>>> Let’s
> >>>>>>>>>>>>>>>>>>>> imagine
> >>>>>>>>>>>>>>>>>>>>>>>> that we had
> >>>>>>>>>>>>>>>>>>>>>>>> a share-state topic per topic being consumed in a
> >>>> share
> >>>>>>>>>> group,
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> same number
> >>>>>>>>>>>>>>>>>>>>>>>> of partitions as the topic being consumed. We
> could
> >>>>>> write
> >>>>>>>>>> many
> >>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>> deltas
> >>>>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints to
> >>>> keep
> >>>>>>>>>>>> control
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> storage used.
> >>>>>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use
> >>>>>>>>>>>>>>>>>>>>>> KafkaAdmin.deleteRecords()
> >>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> prune all of the older records.
> >>>>>>>>>>>>>>>>>>>>>>>>
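The checkpoint-and-prune cycle in option (b) can be modelled in memory as below. This is a sketch under stated assumptions, not the KIP's design: SharePartitionLogSketch and its methods are hypothetical, and the checkpoint interval (a fixed number of deltas) is an assumption made for illustration. The pruning step stands in for the deleteRecords() call mentioned above; in the Kafka admin client that would be Admin.deleteRecords with RecordsToDelete.beforeOffset(checkpointOffset).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of one uncompacted share-state partition.
final class SharePartitionLogSketch {

    // A log entry is either a delta or a checkpoint.
    record Entry(long offset, boolean checkpoint) {}

    private final Deque<Entry> log = new ArrayDeque<>();
    private final int deltasPerCheckpoint; // assumed fixed interval
    private long nextOffset = 0;
    private int deltasSinceCheckpoint = 0;

    SharePartitionLogSketch(int deltasPerCheckpoint) {
        this.deltasPerCheckpoint = deltasPerCheckpoint;
    }

    // Appends a delta; takes a checkpoint once enough deltas have accumulated.
    void appendDelta() {
        log.addLast(new Entry(nextOffset++, false));
        if (++deltasSinceCheckpoint >= deltasPerCheckpoint) {
            checkpointAndPrune();
        }
    }

    // Writes a checkpoint, then drops everything before it, which mirrors
    // deleting records before the checkpoint's offset.
    private void checkpointAndPrune() {
        long checkpointOffset = nextOffset++;
        log.addLast(new Entry(checkpointOffset, true));
        deltasSinceCheckpoint = 0;
        while (log.peekFirst().offset() < checkpointOffset) {
            log.removeFirst();
        }
    }

    int size() {
        return log.size();
    }

    long logStartOffset() {
        return log.isEmpty() ? nextOffset : log.peekFirst().offset();
    }
}
```

With this shape the storage per share-partition is bounded by one checkpoint plus at most one interval's worth of deltas, without relying on compaction.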
> >>>>>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous, but
> >> we
> >>>>>> are
> >>>>>>>>>>>>>> talking
> >>>>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>>>>> per
> >>>>>>>>>>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>>>>>> per share group that it’s being consumed in. These
> >>>>>> topics
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> compacted.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister interface
> is
> >>>>>>>>>> intended
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> pluggable one day.
> >>>>>>>>>>>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. It
> seems
> >>>>>> likely
> >>>>>>>>>> to
> >>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> future KIPs will
> >>>>>>>>>>>>>>>>>>>>>>>> improve upon it.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to
> >>>> change
> >>>>>>>> this
> >>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>> While
> >>>>>>>>>>>>>>>>>>>>>>>> option (a) is
> >>>>>>>>>>>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack to
> >>>> have
> >>>>>> a
> >>>>>>>>>>>>>>>> customised
> >>>>>>>>>>>>>>>>>>>> log
> >>>>>>>>>>>>>>>>>>>>>>>> compactor
> >>>>>>>>>>>>>>>>>>>>>>>> just for this topic.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State is
> >>>>>>>>>> overloaded.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 79. See (77).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur <
> >>>>>>>>>>>>>> david.art...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>> .INVALID>
> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty
> >> exciting
> >>>>>>>>>> effort.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still
> trying
> >> to
> >>>>>>>> grok
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> whole
> >>>>>>>>>>>>>>>>>>>>>>>> thing.
> >>>>>>>>>>>>>>>>>>>>>>>>> Sorry if some of my questions are basic :)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Concepts:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with
> the
> >>>>>> Share
> >>>>>>>>>>>>>>>>>> Coordinator
> >>>>>>>>>>>>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>>>>>>>>> RPC or directly in-process?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular
> >>>>>> consumer
> >>>>>>>>>>>>>> groups,
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the
> >>>>>> operator
> >>>>>>>>>>>>>> defines
> >>>>>>>>>>>>>>>>>>>> "sg_"
> >>>>>>>>>>>>>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>>>>>>>>> prefix for share groups only, and if a regular
> >>>> consumer
> >>>>>>>>>> group
> >>>>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>> that name it fails.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group,
> or
> >> a
> >>>>>>>> share
> >>>>>>>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>>>> tries
> >>>>>>>>>>>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID
> make
> >>>>>> more
> >>>>>>>>>>>> sense
> >>>>>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Share Group Membership:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 73. What goes in the Metadata field for
> >>>>>>>>>>>> TargetAssignment#Member
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>> Assignment?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we
> rebalance
> >>>>>> when
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>>>>>>> metadata changes. Would this be for any change,
> or
> >>>> just
> >>>>>>>>>>>> certain
> >>>>>>>>>>>>>>>>>> ones?
> >>>>>>>>>>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>>>>>>>>> example, if a follower drops out of the ISR and
> >> comes
> >>>>>>>> back,
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>>>>>>>> don't need to rebalance.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator
> does
> >>>>>> *not*
> >>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> assignment" Can you explain why this is not
> needed?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 76. "If the consumer just failed to heartbeat
> due
> >>>> to a
> >>>>>>>>>>>>>> temporary
> >>>>>>>>>>>>>>>>>>>>>> pause,
> >>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>> could in theory continue to fetch and acknowledge
> >>>>>>>> records.
> >>>>>>>>>>>> When
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> finally
> >>>>>>>>>>>>>>>>>>>>>>>>> sends a heartbeat and realises it’s been kicked
> out
> >>>> of
> >>>>>>>> the
> >>>>>>>>>>>>>> group,
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>> stop fetching records because its assignment has
> >> been
> >>>>>>>>>>>> revoked,
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> rejoin
> >>>>>>>>>>>>>>>>>>>>>>>>> the group."
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still deliver
> >> some
> >>>>>>>>>>>> buffered
> >>>>>>>>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>> if the share group coordinator has expired its
> >>>> session,
> >>>>>>>> it
> >>>>>>>>>>>>>>>> wouldn't
> >>>>>>>>>>>>>>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>>>>>>>>>> acknowledgments for that share consumer. In such
> a
> >>>>>> case,
> >>>>>>>> is
> >>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>> kind
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> error raised to the application like "hey, I know
> >> we
> >>>>>> gave
> >>>>>>>>>> you
> >>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>>>> records, but really we shouldn't have" ?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -----
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is
> >> written
> >>>>>> at
> >>>>>>>>>>>> least
> >>>>>>>>>>>>>>>>>> every
> >>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>>>> often, could we add a new log compactor that
> avoids
> >>>>>>>>>>>> compacting
> >>>>>>>>>>>>>>>>>>>>>>>> ShareDelta-s
> >>>>>>>>>>>>>>>>>>>>>>>>> that are still "active" (i.e., not yet superseded
> >> by
> >>>> a
> >>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>> ShareCheckpoint). Mechanically, this could be done
> >> by
> >>>>>>>>>> keeping
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> LSO
> >>>>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>>>> greater than the oldest "active" ShareCheckpoint.
> >>>> This
> >>>>>>>>>> might
> >>>>>>>>>>>>>> let
> >>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>>>>>> remove
> >>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex thing.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the
> >> ShareDelta/Checkpoint
> >>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>> MessageState? (State is kind of
> >> overloaded/ambiguous)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current
> >>>>>> persistence
> >>>>>>>>>>>> model
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>> the share state is stored in one topic. It seems
> >> like
> >>>>>> we
> >>>>>>>>>> are
> >>>>>>>>>>>>>>>> going
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> storing a lot more state than we do in
> >>>>>> __consumer_offsets
> >>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>> we're
> >>>>>>>>>>>>>>>>>>>>>>>>> dealing with message-level acks. With aggressive
> >>>>>>>>>>>> checkpointing
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>> compaction, we can mitigate the storage
> >> requirements,
> >>>>>> but
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> throughput
> >>>>>>>>>>>>>>>>>>>>>>>>> could be a limiting factor. Have we considered
> >> other
> >>>>>>>>>>>>>>>> possibilities
> >>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>> persistence?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
