Hi, Andrew, Removing AdminClient.listGroups() and the ListGroups RPC for now sounds good to me.
Thanks, Jun On Mon, May 6, 2024 at 11:10 AM Andrew Schofield <andrew_schofi...@live.com> wrote: > Hi Jun, > Thanks for the reply. > > 162. I’ve mentioned before that I plan another KIP for administration > of groups. I think this is heading into that territory. I would like > listGroups() to do a comprehensive job of returning all of the groups, > and that does include groups which aren’t currently covered by GroupType. > There’s a single namespace for all groups, and if someone is to make sense > of the groups in a cluster, I think they need to be able to see them all. > This is really just putting an API on top of the ListGroups RPC that > already > exists. > > I don’t think it would be desirable for connect-based groups to > have GroupType.UNKNOWN. There are other custom group types. > This all needs sorting out, I think. > > I propose to remove AdminClient.listGroups() from this KIP, and put > it in the administration KIP. > > Let me know what you think. > > Thanks, > Andrew > > > > On 6 May 2024, at 18:04, Jun Rao <j...@confluent.io.INVALID> wrote: > > > > Hi, Andrew, > > > > Thanks for the reply. > > > > 162. It's fine to start with just the group type. Since ListGroups() is a > > generic API, I want to make sure that it covers all existing groups. > > Currently, GroupType only has "classic" and "consumer", both of which > seem > > to be related to groups formed by consumers since it's part of > > ConsumerGroupDescription. Does ListGroup() return connect based groups > and > > if so, what's the GroupType? If ListGroup() doesn't cover all groups, > > should we name it more accurately? > > > > Jun > > > > > > On Fri, May 3, 2024 at 7:51 PM Andrew Schofield < > andrew_schofi...@live.com> > > wrote: > > > >> Hi Jun, > >> Thanks for your reply. > >> > >> 161. ShareGroupListing and ShareGroupDescription are using > >> the same pattern as ConsumerGroupListing and > >> ConsumerGroupDescription. I have gone for consistency which > >> I think is probably best here. It’s what I would expect if I had > previously > >> used the admin API for consumer groups and was looking to use it for > >> share groups. I agree it’s a bit weird. > >> > >> 162. GroupListing contains the only information which is properly > >> in common between a ConsumerGroupListing and a ShareGroupListing. > >> ListGroupsResponse.ProtocolType is interpreted to provide the > >> group type. I know that the ListGroups RPC also includes the group > >> state, but that’s as a string and there’s no common enum for the states > >> of all types of group. As a result, I have exposed group type but not > >> state on this API. > >> > >> Previously in the discussion for this KIP, I mentioned that I would > >> create another KIP for the administration of groups, in particular > >> how the administrator can ensure that particular group IDs > >> are used for the group type they desire. At the moment, I think > >> keeping ListGroups in this KIP is a good idea. If we actually want > >> to make it more sophisticated, perhaps that would be better with > >> the group administration KIP. > >> > >> 163. It will be one higher than the latest version at the time we are > >> ready to deliver this feature for real. When we are on the cusp of > >> delivery, I’ll update the KIP with the final value. > >> > >> 164. KRaft only. All the RPCs are “broker” only. None of the code will > >> be merged until after 3.8 has branched. 
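(A small illustration of the point-162 discussion above. The thread says the group type in a listing is interpreted from ListGroupsResponse.ProtocolType, and that custom types such as connect-based groups do not map cleanly onto GroupType. The sketch below shows only that idea; the enum values and protocol-type strings are assumptions made for illustration, not the KIP's definitions.)

    public class GroupTypeSketch {
        enum GroupType { CONSUMER, SHARE, UNKNOWN }

        // Hypothetical mapping from ListGroupsResponse.ProtocolType to a group type.
        static GroupType fromProtocolType(String protocolType) {
            if (protocolType == null || protocolType.isEmpty()) {
                return GroupType.UNKNOWN;
            }
            switch (protocolType) {
                case "consumer": return GroupType.CONSUMER; // assumed string value
                case "share":    return GroupType.SHARE;    // assumed string value
                default:         return GroupType.UNKNOWN;  // e.g. a connect-based or other custom group
            }
        }
    }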
> >> > >> Thanks, > >> Andrew > >> > >>> On 4 May 2024, at 00:12, Jun Rao <j...@confluent.io.INVALID> wrote: > >>> > >>> Hi, Andrew, > >>> > >>> Thanks for the reply. A few more comments. > >>> > >>> 161. ShareGroupListing.state() returns an optional, but > >>> ShareGroupDescription.state() does not. Should we make them consistent? > >>> Also, it seems a bit weird to return optional with an UNKNOWN state. > >>> > >>> 162. Should GroupListing include ProtocolType and GroupState too? > >>> > >>> 163. What is the value of group.version to gate the queuing feature? > >>> > >>> 164. Is the queueing feature only supported on KRaft clusters? For > >> example, > >>> the feature tool seems to be built only for the KRaft cluster. > >>> > >>> Jun > >>> > >>> On Fri, May 3, 2024 at 10:32 AM Andrew Schofield < > >> andrew_schofi...@live.com> > >>> wrote: > >>> > >>>> Hi Jun, > >>>> Thanks for your reply. > >>>> > >>>> 147. Yes, I see what you mean. The rebalance latency will indeed > >>>> be very short by comparison. I have removed the rebalance latency > >>>> metrics from the client and retained the rebalance count and rate. > >>>> > >>>> 150. Yes, I think so. I have tweaked the text so that the simple > >>>> assignor will take into account existing assignment information when > >>>> it has it, which would just minimise unnecessary churn of (b). > >>>> > >>>> 158. I’ve changed it to ReadShareGroupStateSummary. > >>>> > >>>> Thanks, > >>>> Andrew > >>>> > >>>> > >>>>> On 3 May 2024, at 22:17, Jun Rao <j...@confluent.io.INVALID> wrote: > >>>>> > >>>>> Hi, Andrew, > >>>>> > >>>>> Thanks for the reply. > >>>>> > >>>>> 147. There seems to be some difference between consumer groups and > >> share > >>>>> groups. In the consumer groups, if a client receives a heartbeat > >> response > >>>>> to revoke some partitions, it may have to commit offsets before > >> revoking > >>>>> partitions or it may have to call the rebalance callbacks provided by > >> the > >>>>> user. This may take some time and can be reflected in the rebalance > >> time > >>>>> metric. In the share groups, none of that exists. If a client > receives > >>>> some > >>>>> added/revoked partitions, it accepts them immediately, right? So, > does > >>>> that > >>>>> practically make the rebalance time always 0? > >>>>> > >>>>> 150. I guess in the common case, there will be many more members than > >>>>> partitions. So the need for (b) will be less common. We can probably > >>>> leave > >>>>> the persisting of the assignment out for now. > >>>>> > >>>>> 158. The new name sounds good to me. > >>>>> > >>>>> Jun > >>>>> > >>>>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield < > >>>> andrew_schofi...@live.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Jun, > >>>>>> Thanks for the response. > >>>>>> > >>>>>> 147. I am trying to get a correspondence between the concepts and > >>>>>> metrics of consumer groups and share groups. In both cases, > >>>>>> the client doesn’t strictly know when the rebalance starts. All it > >> knows > >>>>>> is when it has work to do in order to perform its part of a > rebalance. > >>>>>> I am proposing that share groups and consumer groups use > >>>>>> equivalent logic. > >>>>>> > >>>>>> I could remove the rebalance metrics from the client because I do > >>>>>> understand that they are making a judgement about when a rebalance > >>>>>> starts, but it’s their own part of the rebalance they are measuring. 
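(On point 150: the simple assignor's two-step distribution, described in full a little further down the thread, is (a) place each member on a partition by hashed member ID, then (b) give any partition left without members one member round-robin. Below is a minimal standalone sketch of just that logic; the class and method names are invented here, and the real assignor implements the KIP's ShareGroupPartitionAssignor interface against its own types.)

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SimpleShareAssignorSketch {
        // Returns, for each partition index, the members assigned to it.
        static Map<Integer, List<String>> assign(List<String> memberIds, int numPartitions) {
            Map<Integer, List<String>> assignment = new HashMap<>();
            for (int p = 0; p < numPartitions; p++) {
                assignment.put(p, new ArrayList<>());
            }
            if (memberIds.isEmpty() || numPartitions <= 0) {
                return assignment;
            }
            // (a) Distribute the members across the partitions by hashed member ID.
            for (String memberId : memberIds) {
                int p = Math.floorMod(memberId.hashCode(), numPartitions);
                assignment.get(p).add(memberId);
            }
            // (b) Any partition left with no members gets one, round-robin over the members.
            int next = 0;
            for (int p = 0; p < numPartitions; p++) {
                if (assignment.get(p).isEmpty()) {
                    assignment.get(p).add(memberIds.get(next++ % memberIds.size()));
                }
            }
            return assignment;
        }
    }

With this scheme, step (a) stays stable across rebalances for a fixed membership, while step (b) is the part that churns, which matches the stability comments about (a) and (b) later in the thread.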
> >>>>>> > >>>>>> I tend to think these metrics are better than no metrics and > >>>>>> will at least enable administrators to see how much rebalance > >>>>>> activity the members of share groups are experiencing. > >>>>>> > >>>>>> 150. The simple assignor does not take existing assignments into > >>>>>> consideration. The ShareGroupPartitionAssignor interface would > >>>>>> permit this, but the simple assignor does not currently use it. > >>>>>> > >>>>>> The simple assignor assigns partitions in two ways: > >>>>>> a) Distribute the members across the partitions by hashed member ID. > >>>>>> b) If any partitions have no members assigned, distribute the > members > >>>>>> across these partitions round-robin. > >>>>>> > >>>>>> The (a) partitions will be quite stable. The (b) partitions will be > >> less > >>>>>> stable. By using existing assignment information, it could make (b) > >>>>>> partition assignment more stable, whether the assignments are > >>>>>> persisted or not. Perhaps it would be worth changing the simple > >>>>>> assignor in order to make (b) more stable. > >>>>>> > >>>>>> I envisage more sophisticated assignors in the future which could > use > >>>>>> existing assignments and also other dynamic factors such as lag. > >>>>>> > >>>>>> If it transpires that there is significant benefit in persisting > >>>>>> assignments > >>>>>> specifically to help smooth assignment in the event of GC change, > >>>>>> it would be quite an easy enhancement. I am not inclined to persist > >>>>>> the assignments in this KIP. > >>>>>> > >>>>>> 158. Ah, yes. I see. Of course, I want the names as consistent and > >>>>>> understandable too. I suggest renaming > >>>>>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary. > >>>>>> I haven’t changed the KIP yet, so let me know if that’s OK. > >>>>>> > >>>>>> Thanks, > >>>>>> Andrew > >>>>>> > >>>>>>> On 2 May 2024, at 22:18, Jun Rao <j...@confluent.io.INVALID> wrote: > >>>>>>> > >>>>>>> Hi, Andrew, > >>>>>>> > >>>>>>> Thanks for the reply. > >>>>>>> > >>>>>>> 147. " it makes a judgement about whether an assignment received is > >>>> equal > >>>>>>> to what it already is using." > >>>>>>> If a client receives an assignment different from what it has, it > >>>>>> indicates > >>>>>>> the end of the rebalance. But how does the client know when the > >>>> rebalance > >>>>>>> starts? In the shareHeartbeat design, the new group epoch is > >> propagated > >>>>>>> together with the new assignment in the response. > >>>>>>> > >>>>>>> 150. It could be a potential concern if each GC change forces > >>>> significant > >>>>>>> assignment changes. Does the default assignor take existing > >> assignments > >>>>>>> into consideration? > >>>>>>> > >>>>>>> 155. Good point. Sounds good. > >>>>>>> > >>>>>>> 158. My concern with the current naming is that it's not clear what > >> the > >>>>>>> difference is between ReadShareGroupOffsetsState and > >>>> ReadShareGroupState. > >>>>>>> The state in the latter is also offsets. > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield < > >>>>>> andrew_schofi...@live.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Jun, > >>>>>>>> Thanks for your reply. > >>>>>>>> > >>>>>>>> 147. Perhaps the easiest is to take a look at the code in > >>>>>>>> o.a.k.clients.consumer.internal.MembershipManagerImpl. > >>>>>>>> This class is part of the new consumer group protocol > >>>>>>>> code in the client. 
It makes state transitions based on > >>>>>>>> the heartbeat requests and responses, and it makes a > >>>>>>>> judgement about whether an assignment received is > >>>>>>>> equal to what it already is using. When a state transition > >>>>>>>> is deemed to be the beginning or end of a rebalance > >>>>>>>> from the point of view of this client, it counts towards the > >>>>>>>> rebalance metrics. > >>>>>>>> > >>>>>>>> Share groups will follow the same path. > >>>>>>>> > >>>>>>>> 150. I do not consider it a concern. Rebalancing a share group > >>>>>>>> is less disruptive than rebalancing a consumer group. If the > >> assignor > >>>>>>>> Has information about existing assignments, it can use it. It is > >>>>>>>> true that this information cannot be replayed from a topic and > will > >>>>>>>> sometimes be unknown as a result. > >>>>>>>> > >>>>>>>> 151. I don’t want to rename TopicPartitionsMetadata to > >>>>>>>> simply TopicPartitions (it’s information about the partitions of > >>>>>>>> a topic) because we then have an array of plurals. > >>>>>>>> I’ve renamed Metadata to Info. That’s a bit less cumbersome. > >>>>>>>> > >>>>>>>> 152. Fixed. > >>>>>>>> > >>>>>>>> 153. It’s the GC. Fixed. > >>>>>>>> > >>>>>>>> 154. The UNKNOWN “state” is essentially a default for situations > >> where > >>>>>>>> the code cannot understand data it received. For example, let’s > say > >>>> that > >>>>>>>> Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1 > >>>>>>>> introduced another state THINKING, a tool built with Kafka 4.0 > would > >>>> not > >>>>>>>> know what THINKING meant. It will use “UNKNOWN” to indicate that > the > >>>>>>>> state was something that it could not understand. > >>>>>>>> > >>>>>>>> 155. No, it’s a the level of the share-partition. If the offsets > for > >>>>>> just > >>>>>>>> one share-partition is reset, only the state epoch for that > >> partition > >>>> is > >>>>>>>> updated. > >>>>>>>> > >>>>>>>> 156. Strictly speaking, it’s redundant. I think having the > >> StartOffset > >>>>>>>> separate gives helpful clarity and I prefer to retain it. > >>>>>>>> > >>>>>>>> 157. Yes, you are right. There’s no reason why a leader change > needs > >>>>>>>> to force a ShareSnapshot. I’ve added leaderEpoch to the > ShareUpdate. > >>>>>>>> > >>>>>>>> 158. Although ReadShareGroupOffsetsState is a bit of a mouthful, > >>>>>>>> having “State” in the name makes it clear that this one the family > >> of > >>>>>>>> inter-broker RPCs served by the share coordinator. The admin RPCs > >>>>>>>> such as DescribeShareGroupOffsets do not include “State”. > >>>>>>>> > >>>>>>>> 159. Fixed. > >>>>>>>> > >>>>>>>> 160. Fixed. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Andrew > >>>>>>>> > >>>>>>>>> On 2 May 2024, at 00:29, Jun Rao <j...@confluent.io.INVALID> > wrote: > >>>>>>>>> > >>>>>>>>> Hi, Andrew, > >>>>>>>>> > >>>>>>>>> Thanks for the reply. > >>>>>>>>> > >>>>>>>>> 147. "The measurement is certainly from the point of view of the > >>>>>> client, > >>>>>>>>> but it’s driven by sending and receiving heartbeats rather than > >>>> whether > >>>>>>>> the > >>>>>>>>> client triggered the rebalance itself." > >>>>>>>>> Hmm, how does a client know which heartbeat response starts a > >>>>>> rebalance? > >>>>>>>>> > >>>>>>>>> 150. PartitionAssignor takes existing assignments into > >> consideration. > >>>>>>>> Since > >>>>>>>>> GC doesn't persist the assignment for share groups, it means that > >>>>>>>>> ShareGroupPartitionAssignor can't reliably depend on existing > >>>>>>>> assignments. 
> >>>>>>>>> Is that a concern? > >>>>>>>>> > >>>>>>>>> 151. ShareGroupPartitionMetadataValue: Should we rename > >>>>>>>>> TopicPartitionsMetadata and TopicMetadata since there is no > >> metadata? > >>>>>>>>> > >>>>>>>>> 152. ShareGroupMetadataKey: "versions": "3" > >>>>>>>>> The versions should be 11. > >>>>>>>>> > >>>>>>>>> 153. ShareGroupDescription.coordinator(): The description says > "The > >>>>>> share > >>>>>>>>> group coordinator". Is that the GC or SC? > >>>>>>>>> > >>>>>>>>> 154. "A share group has only three states - EMPTY , STABLE and > >> DEAD". > >>>>>>>>> What about UNKNOWN? > >>>>>>>>> > >>>>>>>>> 155. WriteShareGroupState: StateEpoch is at the group level, not > >>>>>>>> partition > >>>>>>>>> level, right? > >>>>>>>>> > >>>>>>>>> 156. ShareSnapshotValue: Is StartOffset redundant since it's the > >> same > >>>>>> as > >>>>>>>>> the smallest FirstOffset in StateBatches? > >>>>>>>>> > >>>>>>>>> 157. Every leader change forces a ShareSnapshotValue write to > >> persist > >>>>>> the > >>>>>>>>> new leader epoch. Is that a concern? An alternative is to include > >>>>>>>>> leaderEpoch in ShareUpdateValue. > >>>>>>>>> > >>>>>>>>> 158. ReadShareGroupOffsetsState: The state is the offsets. Should > >> we > >>>>>>>> rename > >>>>>>>>> it to something like ReadShareGroupStartOffset? > >>>>>>>>> > >>>>>>>>> 159. members are assigned members round-robin => members are > >> assigned > >>>>>>>>> round-robin > >>>>>>>>> > >>>>>>>>> 160. "may called": typo > >>>>>>>>> > >>>>>>>>> Jun > >>>>>>>>> > >>>>>>>>> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield < > >>>>>>>> andrew_schofi...@live.com> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Jun, > >>>>>>>>>> Thanks for the reply and sorry for the delay in responding. > >>>>>>>>>> > >>>>>>>>>> 123. Yes, I didn’t quite get your point earlier. The member > >>>>>>>>>> epoch is bumped by the GC when it sends a new assignment. > >>>>>>>>>> When the member sends its next heartbeat, it echoes back > >>>>>>>>>> the member epoch, which will confirm the receipt of the > >>>>>>>>>> assignment. It would send the same member epoch even > >>>>>>>>>> after recovery of a network disconnection, so that should > >>>>>>>>>> be sufficient to cope with this eventuality. > >>>>>>>>>> > >>>>>>>>>> 125. Yes, I have added it to the table which now matches > >>>>>>>>>> the text earlier in the KIP. Thanks. > >>>>>>>>>> > >>>>>>>>>> 140. Yes, I have added it to the table which now matches > >>>>>>>>>> the text earlier in the KIP. I’ve also added more detail for > >>>>>>>>>> the case where the entire share group is being deleted. > >>>>>>>>>> > >>>>>>>>>> 141. Yes! Sorry for confusing things. > >>>>>>>>>> > >>>>>>>>>> Back to the original question for this point. To delete a share > >>>>>>>>>> group, should the GC write a tombstone for each > >>>>>>>>>> ShareGroupMemberMetadata record? > >>>>>>>>>> > >>>>>>>>>> Tombstones are necessary to delete ShareGroupMemberMetadata > >>>>>>>>>> records. But, deletion of a share group is only possible when > >>>>>>>>>> the group is already empty, so the tombstones will have > >>>>>>>>>> been written as a result of the members leaving the group. > >>>>>>>>>> > >>>>>>>>>> 143. Yes, that’s right. > >>>>>>>>>> > >>>>>>>>>> 147. The measurement is certainly from the point of view > >>>>>>>>>> of the client, but it’s driven by sending and receiving > heartbeats > >>>>>>>>>> rather than whether the client triggered the rebalance itself. 
> >>>>>>>>>> The client decides when it enters and leaves reconciliation > >>>>>>>>>> of the assignment, and measures this period. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> Andrew > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> On 26 Apr 2024, at 09:43, Jun Rao <j...@confluent.io.INVALID> > >>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>> > >>>>>>>>>>> 123. "Rather than add the group epoch to the > >> ShareGroupHeartbeat, I > >>>>>>>> have > >>>>>>>>>>> decided to go for TopicPartitions in ShareGroupHeartbeatRequest > >>>> which > >>>>>>>>>>> mirrors ConsumerGroupHeartbeatRequest." > >>>>>>>>>>> ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is > >> that > >>>>>>>> enough > >>>>>>>>>>> for confirming the receipt of the new assignment? > >>>>>>>>>>> > >>>>>>>>>>> 125. This also means that "Alter share group offsets" needs to > >>>> write > >>>>>> a > >>>>>>>>>>> ShareGroupPartitionMetadata record, if the partition is not > >> already > >>>>>>>>>>> initialized. > >>>>>>>>>>> > >>>>>>>>>>> 140. In the table for "Delete share group offsets", we need to > >> add > >>>> a > >>>>>>>> step > >>>>>>>>>>> to write a ShareGroupPartitionMetadata record with > >> DeletingTopics. > >>>>>>>>>>> > >>>>>>>>>>> 141. Hmm, ShareGroupMemberMetadata is stored in the > >>>>>> __consumer_offsets > >>>>>>>>>>> topic, which is a compacted topic, right? > >>>>>>>>>>> > >>>>>>>>>>> 143. So, the client sends DescribeShareGroupOffsets requests to > >> GC, > >>>>>>>> which > >>>>>>>>>>> then forwards it to SC? > >>>>>>>>>>> > >>>>>>>>>>> 147. I guess a client only knows the rebalance triggered by > >> itself, > >>>>>> but > >>>>>>>>>> not > >>>>>>>>>>> the ones triggered by other members or topic/partition changes? > >>>>>>>>>>> > >>>>>>>>>>> Jun > >>>>>>>>>>> > >>>>>>>>>>> On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield < > >>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>> Thanks for the response. > >>>>>>>>>>>> > >>>>>>>>>>>> 123. Of course, ShareGroupHearbeat started off as > >>>>>>>> ConsumerGroupHeartbeat > >>>>>>>>>>>> and then unnecessary fields were removed. In the network issue > >>>> case, > >>>>>>>>>>>> there is not currently enough state being exchanged to be sure > >> an > >>>>>>>>>>>> assignment > >>>>>>>>>>>> was received. > >>>>>>>>>>>> > >>>>>>>>>>>> Rather than add the group epoch to the ShareGroupHeartbeat, I > >> have > >>>>>>>>>> decided > >>>>>>>>>>>> to go for TopicPartitions in ShareGroupHeartbeatRequest which > >>>>>> mirrors > >>>>>>>>>>>> ConsumerGroupHeartbeatRequest. It means the share group member > >>>> does > >>>>>>>>>>>> confirm the assignment it is using, and that can be used by > the > >> GC > >>>>>> to > >>>>>>>>>>>> safely > >>>>>>>>>>>> stop repeating the assignment in heartbeat responses. > >>>>>>>>>>>> > >>>>>>>>>>>> 125. Ah, yes. This is indeed something possible with a > consumer > >>>>>> group > >>>>>>>>>>>> and share groups should support it too. This does of course > >> imply > >>>>>> that > >>>>>>>>>>>> ShareGroupPartitionMetadataValue needs an array of partitions, > >> not > >>>>>>>>>>>> just the number. > >>>>>>>>>>>> > >>>>>>>>>>>> 140. Yes, good spot. There is an inconsistency here in > consumer > >>>>>> groups > >>>>>>>>>>>> where you can use AdminClient.deleteConsumerGroupOffsets at > the > >>>>>>>>>>>> partition level, but kafka-consumer-groups.sh --delete only > >>>> operates > >>>>>>>>>>>> at the topic level. 
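(For context on point 140: the consumer-group admin call referred to above really is partition-scoped today. A minimal sketch using the existing Admin API follows; the group name, topic and bootstrap address are placeholders. The share-group equivalent proposed in this thread is topic-based instead.)

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.TopicPartition;

    public class DeleteOffsetsContrast {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Existing partition-level API for consumer groups.
                admin.deleteConsumerGroupOffsets("my-group",
                        Set.of(new TopicPartition("my-topic", 0)))
                     .all()
                     .get();
            }
        }
    }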
> >>>>>>>>>>>> > >>>>>>>>>>>> Personally, I don’t think it’s sensible to delete offsets at > the > >>>>>>>>>> partition > >>>>>>>>>>>> level only. You can reset them, but if you’re actively using a > >>>> topic > >>>>>>>>>> with > >>>>>>>>>>>> a share group, I don’t see why you’d want to delete offsets > >> rather > >>>>>>>> than > >>>>>>>>>>>> reset. If you’ve finished using a topic with a share group and > >>>> want > >>>>>> to > >>>>>>>>>>>> clean > >>>>>>>>>>>> up, use delete. > >>>>>>>>>>>> > >>>>>>>>>>>> So, I’ve changed the AdminClient.deleteConsumerGroupOffsets to > >> be > >>>>>>>>>>>> topic-based and the RPCs behind it. > >>>>>>>>>>>> > >>>>>>>>>>>> The GC reconciles the cluster state with the > >>>>>>>> ShareGroupPartitionMetadata > >>>>>>>>>>>> to spot deletion of topics and the like. However, when the > >> offsets > >>>>>> for > >>>>>>>>>>>> a topic were deleted manually, the topic very like still > exists > >> so > >>>>>>>>>>>> reconciliation > >>>>>>>>>>>> alone is not going to be able to continue an interrupted > >> operation > >>>>>>>> that > >>>>>>>>>>>> has started. So, I’ve added DeletingTopics back into > >>>>>>>>>>>> ShareGroupPartitionMetadata for this purpose. It’s so failover > >> of > >>>> a > >>>>>> GC > >>>>>>>>>>>> can continue where it left off rather than leaving fragments > >>>> across > >>>>>>>> the > >>>>>>>>>>>> SCs. > >>>>>>>>>>>> > >>>>>>>>>>>> 141. That is not required. Because this is not a compacted > >> topic, > >>>> it > >>>>>>>> is > >>>>>>>>>>>> not necessary to write tombstones for every key. As long as > >> there > >>>>>> is a > >>>>>>>>>>>> clear and unambiguous record for the deletion of the group, > that > >>>> is > >>>>>>>>>> enough. > >>>>>>>>>>>> The tombstone for ShareGroupPartitionMetadata is theoretically > >> not > >>>>>>>>>>>> required but it’s a single record, rather than one per member, > >> so > >>>> I > >>>>>>>>>> prefer > >>>>>>>>>>>> to leave it as a record that the interactions with the SC have > >>>> been > >>>>>>>>>>>> completed. > >>>>>>>>>>>> > >>>>>>>>>>>> 142. > >>>>>>>>>>>> 142.1. It will prompt the user to confirm they want to > continue. > >>>>>>>>>>>> This is in common with `kafka-consumer-groups.sh` which > >>>> historically > >>>>>>>>>>>> has defaulted to --dry-run behaviour, but is due to change to > >>>>>>>> prompting > >>>>>>>>>>>> if neither --dry-run nor --execute is specified “in a future > >> major > >>>>>>>>>>>> release”. > >>>>>>>>>>>> > >>>>>>>>>>>> 142.2. It should support partition-level reset, but only > >>>> topic-level > >>>>>>>>>>>> delete. > >>>>>>>>>>>> I have updated the usage text accordingly. This is in common > >> with > >>>>>>>>>>>> kafka-consumer-groups.sh. > >>>>>>>>>>>> > >>>>>>>>>>>> 142.3. --dry-run displays the operation that would be > executed. > >>>>>>>>>>>> > >>>>>>>>>>>> 142.4. The valid values are: Dead, Empty, Stable. Added to the > >>>>>>>>>>>> usage text. > >>>>>>>>>>>> > >>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the group > >> coordinator > >>>>>>>>>>>> for this kind of reason. > >>>>>>>>>>>> > >>>>>>>>>>>> 144. That’s the default. If you haven’t asked to release or > >>>> reject, > >>>>>> it > >>>>>>>>>>>> accepts. > >>>>>>>>>>>> This is analogous to fetching and committing offsets in a > >> consumer > >>>>>>>>>> group. > >>>>>>>>>>>> > >>>>>>>>>>>> 145. I think this is a good idea, but I would prefer to defer > it > >>>>>>>> until a > >>>>>>>>>>>> future > >>>>>>>>>>>> metrics KIP that I have planned. 
In KIP-932, I have added > basic > >>>>>>>> metrics > >>>>>>>>>>>> only. > >>>>>>>>>>>> For example, you’ll see that there’s no concept of lag yet, > >> which > >>>>>>>> surely > >>>>>>>>>>>> will have relevance for share groups. I plan to create and > >> deliver > >>>>>> the > >>>>>>>>>>>> metrics KIP before share groups are declared ready for > >> production. > >>>>>>>>>>>> I want the new metrics to be developed with the experience of > >>>>>> running > >>>>>>>>>>>> the code. > >>>>>>>>>>>> > >>>>>>>>>>>> 146. Milliseconds. KIP updated. > >>>>>>>>>>>> > >>>>>>>>>>>> 147. There is a membership state machine in the client that > >>>>>>>>>>>> changes states as the ShareGroupHeartbeat requests and > responses > >>>>>>>>>>>> flow. The duration of a rebalance will be shorter from the > point > >>>> of > >>>>>>>>>>>> view of the share-group consumer because it doesn’t have to > >> worry > >>>>>>>> about > >>>>>>>>>>>> rebalance callbacks and committing offsets as the partitions > >> move > >>>>>>>>>>>> around, but the overall flow is very similar. So, it’s the > state > >>>>>>>>>>>> transitions > >>>>>>>>>>>> that drive the collection of the rebalance metrics. > >>>>>>>>>>>> > >>>>>>>>>>>> 148. Strangely, none of the existing uses of > >>>> records-per-request-avg > >>>>>>>>>>>> actually have records-per-request-max. I tend to err on the > side > >>>> of > >>>>>>>>>>>> consistency, but can’t think of any reason not to add this. > >> Done. > >>>>>>>>>>>> > >>>>>>>>>>>> 149. There are several error codes for > >>>> WriteShareGroupStateResponse: > >>>>>>>>>>>> > >>>>>>>>>>>> NOT_COORDINATOR - This is not the share coordinator you’re > >> looking > >>>>>>>> for. > >>>>>>>>>>>> COORDINATOR_NOT_AVAILABLE - The SC can’t > >>>>>>>>>>>> COORDINATOR_LOAD_IN_PROGRESS - The SC is replaying the topic. > >>>>>>>>>>>> GROUP_ID_NOT_FOUND - The SC doesn’t have state for this group. > >>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION - The SC doesn’t have state for > this > >>>>>>>>>>>> topic-partition. > >>>>>>>>>>>> FENCED_STATE_EPOCH - The write has the wrong state epoch. > >>>>>>>>>>>> INVALID_REQUEST - There was a problem with the request. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> Andrew > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>>> On 24 Apr 2024, at 19:10, Jun Rao <j...@confluent.io.INVALID> > >>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the response. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 123. I thought the network issue can be covered with the > group > >>>>>> epoch. > >>>>>>>>>>>>> Basically, if the assignment is to be changed, GC bumps up > the > >>>>>> epoch > >>>>>>>>>>>> first, > >>>>>>>>>>>>> but doesn't expose the new epoch to members until the > >> assignment > >>>> is > >>>>>>>>>>>>> complete (including initializing the sharePartitionState). > Once > >>>> the > >>>>>>>>>>>>> assignment is complete, GC includes the bumped up epoch and > the > >>>> new > >>>>>>>>>>>>> assignment in the heartbeatResponse. If the next heartbeat > >>>> Request > >>>>>>>>>>>> includes > >>>>>>>>>>>>> the new epoch, it means that the member has received the new > >>>>>>>> assignment > >>>>>>>>>>>> and > >>>>>>>>>>>>> GC can exclude the assignment in the heartbeatResponse. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 125. "If AlterShareGroupOffsets is called for a > topic-partition > >>>>>> which > >>>>>>>>>> is > >>>>>>>>>>>>> not yet in ShareGroupPartitionMetadataValue, it’s not really > >> part > >>>>>> of > >>>>>>>>>> the > >>>>>>>>>>>>> group yet. 
So I think it’s permitted to fail the > >>>>>>>> AlterShareGroupOffsets > >>>>>>>>>>>>> because the topic-partition would not be part of > >>>>>>>> ListShareGroupOffsets > >>>>>>>>>>>>> result." > >>>>>>>>>>>>> A user may want to initialize SPSO (e.g. based on timestamp) > >>>> before > >>>>>>>> the > >>>>>>>>>>>>> application is first used. If we reject > AlterShareGroupOffsets > >>>> when > >>>>>>>>>>>>> ShareGroupPartitionMetadataValue is empty, the user will be > >>>> forced > >>>>>> to > >>>>>>>>>>>> start > >>>>>>>>>>>>> the application with the wrong SPSO first and then reset it, > >>>> which > >>>>>>>> will > >>>>>>>>>>>> be > >>>>>>>>>>>>> inconvenient. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 140. AdminClient.deleteShareGroupOffsets allows deletion of > >> SPSO > >>>>>> for > >>>>>>>>>>>>> individual partitions, but ShareGroupPartitionMetadataValue > >> only > >>>>>>>> tracks > >>>>>>>>>>>>> consecutive partitions. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 141. Delete share group: Should GC also write a tombstone for > >>>> each > >>>>>>>>>>>>> ShareGroupMemberMetadata record? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 142. kafka-share-groups.sh > >>>>>>>>>>>>> 142.1 What happens if neither --dry-run nor --execute is > >>>> specified > >>>>>>>> for > >>>>>>>>>>>>> reset-offsets? > >>>>>>>>>>>>> 142.2 Should it support --delete-offsets and --reset-offsets > at > >>>> the > >>>>>>>>>>>>> partition level to match the AdminClient api? > >>>>>>>>>>>>> 142.3 How useful is --dry-run? The corresponding RPCs don't > >> carry > >>>>>> the > >>>>>>>>>>>>> dry-run flag. > >>>>>>>>>>>>> 142.4 --state [String]: What are the valid state values? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 143. DescribeShareGroupOffsets is served by the > share-partition > >>>>>>>> leader. > >>>>>>>>>>>> How > >>>>>>>>>>>>> could a user describe the share group offset when there is no > >>>>>> member? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 144. kafka-console-share-consumer.sh: Is there an option to > >>>> accept > >>>>>>>>>>>> consumed > >>>>>>>>>>>>> messages? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 145. It would be useful to add a broker side metric that > >> measures > >>>>>> the > >>>>>>>>>>>>> pressure from group.share.partition.max.record.locks, e.g. > the > >>>>>>>> fraction > >>>>>>>>>>>> of > >>>>>>>>>>>>> the time that SPL is blocked because > >>>>>>>>>>>> group.share.partition.max.record.locks > >>>>>>>>>>>>> is reached. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 146. heartbeat-response-time-max: What's the unit? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 147. rebalance related metrics: How does a share consumer > know > >>>> when > >>>>>>>>>> there > >>>>>>>>>>>>> is a rebalance and how does it measure the rebalance time? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 148. records-per-request-avg: Should we pair it with > >>>>>>>>>>>>> records-per-request-max? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 149. If the shareGroupState is not available, what error code > >> is > >>>>>> used > >>>>>>>>>> in > >>>>>>>>>>>>> WriteShareGroupStateResponse? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jun > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Wed, Apr 24, 2024 at 7:12 AM Andrew Schofield < > >>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>> Thanks for your reply. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 123. The GC only sends the assignment in ShareGroupHeartbeat > >>>>>>>>>>>>>> response if the member has just joined, or the assignment > has > >>>> been > >>>>>>>>>>>>>> recalculated. This latter condition is met when the GC fails > >>>> over. 
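(A tiny sketch of the confirmation mechanism discussed for point 123 in this thread: once the member echoes back the topic-partitions it is using, via ShareGroupHeartbeatRequest.TopicPartitions, the group coordinator can stop repeating the assignment in heartbeat responses. The helper and its types are hypothetical, shown only to make the decision concrete; the real design also involves the member epoch.)

    import java.util.Set;

    public class HeartbeatAssignmentSketch {
        // target: the assignment the coordinator last computed for the member.
        // reported: the topic-partitions the member echoed back in its heartbeat
        // request, or null if it did not include the field.
        static <T> boolean mustRepeatAssignment(Set<T> target, Set<T> reported) {
            // Keep sending the assignment until the member confirms it is using it.
            return reported == null || !reported.equals(target);
        }
    }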
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> With a consumer group, it works a little differently. The > >>>>>>>> heartbeating > >>>>>>>>>>>>>> occurs asynchronously with the reconciliation process in the > >>>>>> client. > >>>>>>>>>>>>>> The client has reconciliation callbacks, as well as offsets > to > >>>>>>>> commit > >>>>>>>>>>>>>> before it can confirm that revocation has occured. So, it > >> might > >>>>>> take > >>>>>>>>>>>>>> multiple heartbeats to complete the installation of a new > >> target > >>>>>>>>>>>>>> assignment in the client. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> With a share group, it’s more brutal. The GC sends the > >>>> assignment > >>>>>>>>>>>>>> in a heartbeat response, and the client is assumed to have > >> acted > >>>>>> on > >>>>>>>>>>>>>> It immediately. Since share group assignment is really about > >>>>>>>> balancing > >>>>>>>>>>>>>> consumers across the available partitions, rather than > safely > >>>>>>>> handing > >>>>>>>>>>>>>> ownership of partitions between consumers, there is less > >>>>>>>> coordination. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I wonder whether there is one situation that does need > >>>>>> considering. > >>>>>>>>>>>>>> If the network connection between the client and the GC is > >> lost, > >>>>>> it > >>>>>>>> is > >>>>>>>>>>>>>> possible that a response containing the assignment is lost. > >>>> Then, > >>>>>>>>>>>>>> the connection will be reestablished, and the assignment > will > >>>> only > >>>>>>>> be > >>>>>>>>>>>>>> sent when it’s recalculated. Adding the equivalent of > >>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.TopicPartitions would be one > >>>>>>>>>>>>>> way to close that. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 125. Yes, you are right. The reconciliation that I describe > in > >>>> the > >>>>>>>>>>>>>> answer to (54) below is related. If AlterShareGroupOffsets > is > >>>>>> called > >>>>>>>>>>>>>> for a topic-partition which is not yet in > >>>>>>>>>>>> ShareGroupPartitionMetadataValue, > >>>>>>>>>>>>>> it’s not really part of the group yet. So I think it’s > >> permitted > >>>>>> to > >>>>>>>>>> fail > >>>>>>>>>>>>>> the AlterShareGroupOffsets because the topic-partition would > >> not > >>>>>> be > >>>>>>>>>>>>>> part of ListShareGroupOffsets result. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> If AlterShareGroupOffsets is called for a topic-partition > >> which > >>>> is > >>>>>>>> in > >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, then just calling > >>>>>>>>>>>>>> InitializeShareGroupState to set its offset would be > >> sufficient > >>>>>>>>>> without > >>>>>>>>>>>>>> writing ShareGroupPartitionMetadata again. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 138. Added. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 139. This week, I have been experimenting with using the > >>>> existing > >>>>>>>>>>>>>> consumer metrics with share consumer code. That was my plan > to > >>>> get > >>>>>>>>>>>>>> started with metrics. While they work to some extent, I am > not > >>>>>>>>>> entirely > >>>>>>>>>>>>>> happy with the result. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I have added a basic set of share consumer-specific metrics > to > >>>>>>>> KIP-932 > >>>>>>>>>>>>>> which I think is a step in the right direction. I > anticipate a > >>>>>>>> future > >>>>>>>>>>>> KIP > >>>>>>>>>>>>>> that defines many more metrics and tackles concepts such as > >> what > >>>>>>>>>>>>>> “lag” means for a share group. The metrics I’ve included are > >>>>>> simply > >>>>>>>>>>>>>> counting and measuring the operations, which is a good > start. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 54. 
I think either with or without > >>>>>> InitializingTopics/DeletingTopics > >>>>>>>>>> in > >>>>>>>>>>>>>> ShareGroupPartitionMetadataValue works. However, I think > there > >>>> is > >>>>>>>>>>>>>> a deeper point behind what you said. The GC needs to be > >>>> responsive > >>>>>>>>>>>>>> to topic creation and deletion, and addition of partitions > in > >>>>>> cases > >>>>>>>>>>>> where > >>>>>>>>>>>>>> it doesn’t agree with InitializingTopics/DeletingTopics. So, > >> we > >>>>>> are > >>>>>>>>>>>>>> probably not saving anything by having those fields. As a > >>>> result, > >>>>>> I > >>>>>>>>>> have > >>>>>>>>>>>>>> removed InitializingTopics and DeletingTopics and updated > the > >>>> KIP > >>>>>>>>>>>>>> accordingly. The GC needs to reconcile its view of the > >>>> initialised > >>>>>>>>>>>>>> topic-partitions with ShareGroupPartitionMetadataValue and > it > >>>> will > >>>>>>>>>>>>>> initialize or delete share-group state accordingly. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I have made one more change to the KIP. The > >>>> SharePartitionAssignor > >>>>>>>>>>>>>> interface has been renamed to > >>>>>>>>>>>>>> o.a.k.coordinator.group.assignor.ShareGroupPartitionAssignor > >> and > >>>>>> it > >>>>>>>>>>>>>> now extends the PartitionAssignor interface. It’s > essentially > >> a > >>>>>>>> marker > >>>>>>>>>>>>>> of which partition assignors can be used with share groups. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 23 Apr 2024, at 18:27, Jun Rao <j...@confluent.io.INVALID > > > >>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 123. "it doesn’t need to confirm the assignment back to the > >>>> GC." > >>>>>>>>>>>>>>> Hmm, I thought the member needs to confirm the assignment > to > >> GC > >>>>>> to > >>>>>>>>>>>>>>> avoid GC including the assignment in the heartbeat response > >>>>>>>>>>>>>> continuously. I > >>>>>>>>>>>>>>> assume this is done by including the new group epoch in the > >>>>>>>> heartbeat > >>>>>>>>>>>>>>> response. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 125. It's possible that the share partition has never been > >>>>>>>>>> initialized > >>>>>>>>>>>>>> when > >>>>>>>>>>>>>>> AlterShareGroupOffsets is called. If GC doesn't write > >>>>>>>>>>>>>>> ShareGroupPartitionMetadata and the GC fails over, it would > >>>>>>>>>>>> reinitialize > >>>>>>>>>>>>>>> the share partition and lose the effect of > >>>>>> AlterShareGroupOffsets. > >>>>>>>> If > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> partition has already been initialized and it's recorded > >>>>>>>>>>>>>>> in ShareGroupPartitionMetadata, it's possible not to write > >>>>>>>>>>>>>>> ShareGroupPartitionMetadata again when handling > >>>>>>>>>> AlterShareGroupOffsets. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 138. Could we add the flow in GC when a topic is deleted? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 139. Do we need to add any metrics in KafkaShareConsumer? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 54. "I don’t think there is any redundancy. The > >>>>>>>>>>>> ShareGroupMemberMetadata > >>>>>>>>>>>>>>> does include a list of subscribed topics. 
However, if there > >> is > >>>> a > >>>>>>>>>> period > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>> time in which no members are subscribed to a particular > >> topic, > >>>> it > >>>>>>>>>> does > >>>>>>>>>>>>>> not > >>>>>>>>>>>>>>> mean that the topic’s ShareGroupState should be immediately > >>>>>>>> removed, > >>>>>>>>>>>> but > >>>>>>>>>>>>>> it > >>>>>>>>>>>>>>> does mean that there will be no ShareGroupMemberMetadata > >>>> records > >>>>>>>>>>>>>> containing > >>>>>>>>>>>>>>> that topic." > >>>>>>>>>>>>>>> I am still trying to understand the value of > >> InitializingTopics > >>>>>> and > >>>>>>>>>>>>>>> DeletingTopics in ShareGroupPartitionMetadataValue. They > are > >>>> used > >>>>>>>> to > >>>>>>>>>>>>>>> remember the intention of an operation. However, GC still > >> needs > >>>>>> to > >>>>>>>>>>>> handle > >>>>>>>>>>>>>>> the case when the intention is not safely recorded. If GC > >> wants > >>>>>> to > >>>>>>>>>>>>>>> initialize a new topic/partition, a simpler approach is for > >> it > >>>> to > >>>>>>>>>> send > >>>>>>>>>>>> an > >>>>>>>>>>>>>>> InitializeShareGroupState to the share coordinator and > after > >>>>>>>>>> receiving > >>>>>>>>>>>> a > >>>>>>>>>>>>>>> success response, write ShareGroupPartitionMetadataValue > with > >>>> the > >>>>>>>>>>>>>>> initialized partition included in InitializedTopics. This > >>>> saves a > >>>>>>>>>>>> record > >>>>>>>>>>>>>>> write. It's possible for GC to fail in between the two > steps. > >>>> On > >>>>>>>>>>>>>> failover, > >>>>>>>>>>>>>>> the new GC just repeats the process. The case that you > >>>> mentioned > >>>>>>>>>> above > >>>>>>>>>>>>>> can > >>>>>>>>>>>>>>> still be achieved. If a partition is in InitializedTopics > of > >>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue, but no member subscribes > to > >>>> it, > >>>>>>>> we > >>>>>>>>>>>> can > >>>>>>>>>>>>>>> still keep the ShareGroupState as long as the topic still > >>>> exists. > >>>>>>>> The > >>>>>>>>>>>>>> same > >>>>>>>>>>>>>>> optimization could be applied to DeletingTopics too. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield < > >>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 123. Every time the GC fails over, it needs to recompute > the > >>>>>>>>>>>> assignment > >>>>>>>>>>>>>>>> for every member. However, the impact of re-assignment is > >> not > >>>>>> that > >>>>>>>>>>>>>> onerous. > >>>>>>>>>>>>>>>> If the recomputed assignments are the same, which they may > >>>> well > >>>>>>>> be, > >>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>> is no impact on the members at all. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On receiving the new assignment, the member adjusts the > >>>>>>>>>>>> topic-partitions > >>>>>>>>>>>>>>>> in its share sessions, removing those which were revoked > and > >>>>>>>> adding > >>>>>>>>>>>>>> those > >>>>>>>>>>>>>>>> which were assigned. It is able to acknowledge the records > >> it > >>>>>>>>>> fetched > >>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>> the partitions which have just been revoked, and it > doesn’t > >>>> need > >>>>>>>> to > >>>>>>>>>>>>>> confirm > >>>>>>>>>>>>>>>> the assignment back to the GC. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 125. I don’t think the GC needs to write > >>>>>>>> ShareGroupPartitionMetadata > >>>>>>>>>>>>>>>> when processing AlterShareGroupOffsets. 
This is because > the > >>>>>>>>>> operation > >>>>>>>>>>>>>>>> happens as a result of an explicit administrative action > and > >>>> it > >>>>>> is > >>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>> to return a specific error code for each topic-partition. > >> The > >>>>>>>> cases > >>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>> ShareGroupPartitionMetadata is used are when a topic is > >> added > >>>> or > >>>>>>>>>>>> removed > >>>>>>>>>>>>>>>> from the subscribed topics, or the number of partitions > >>>> changes. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 130. I suppose that limits the minimum lock timeout for a > >>>>>> cluster > >>>>>>>> to > >>>>>>>>>>>>>>>> prevent > >>>>>>>>>>>>>>>> a group from having an excessively low value. Config > added. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 131. I have changed it to > >>>>>> group.share.partition.max.record.locks. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 136. When GC failover occurs, the GC gaining ownership > of a > >>>>>>>>>> partition > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>> the __consumer_offsets topic replays the records to build > >> its > >>>>>>>> state. > >>>>>>>>>>>>>>>> In the case of a share group, it learns: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> * The share group and its group epoch (ShareGroupMetadata) > >>>>>>>>>>>>>>>> * The list of members (ShareGroupMemberMetadata) > >>>>>>>>>>>>>>>> * The list of share-partitions > (ShareGroupPartitionMetadata) > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> It will recompute the assignments in order to respond to > >>>>>>>>>>>>>>>> ShareGroupHeartbeat requests. As a result, it bumps the > >> group > >>>>>>>> epoch. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I will update the KIP accordingly to confirm the > behaviour. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 137.1: The GC and the SPL report the metrics in the > >>>>>>>>>>>>>>>> group-coordinator-metrics > >>>>>>>>>>>>>>>> group. Unlike consumer groups in which the GC performs > >> offset > >>>>>>>>>> commit, > >>>>>>>>>>>>>>>> the share group equivalent is performed by the SPL. So, > I’ve > >>>>>>>> grouped > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>> concepts which relate to the group in > >>>> group-coordinator-metrics. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> The SC reports the metrics in the > share-coordinator-metrics > >>>>>> group. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 137.2: There is one metric in both groups - > >>>> partition-load-time. > >>>>>>>> In > >>>>>>>>>>>> the > >>>>>>>>>>>>>> SC > >>>>>>>>>>>>>>>> group, > >>>>>>>>>>>>>>>> it refers to the time loading data from the share-group > >> state > >>>>>>>> topic > >>>>>>>>>> so > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>> a ReadShareGroupState request can be answered. In the GC > >>>> group, > >>>>>>>>>>>>>>>> it refers to the time to read the state from the > persister. > >>>>>> Apart > >>>>>>>>>> from > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> interbroker RPC latency of the read, they’re likely to be > >> very > >>>>>>>>>> close. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Later, for a cluster which is using a custom persister, > the > >>>>>>>>>>>>>>>> share-coordinator > >>>>>>>>>>>>>>>> metrics would likely not be reported, and the persister > >> would > >>>>>> have > >>>>>>>>>> its > >>>>>>>>>>>>>> own > >>>>>>>>>>>>>>>> metrics. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 137.3: Correct. Fixed. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 137.4: Yes, it does include the time to write to the > >> internal > >>>>>>>> topic. > >>>>>>>>>>>>>>>> I’ve tweaked the description. 
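(On points 130 and 131 above: a sketch of how the three broker-level lock-duration configurations named in this thread fit together, assuming the group-level value, if any, is bounded by the cluster-wide min and max. Whether an out-of-range group value is clamped, as below, or rejected when the group configuration is set is not spelled out in this thread.)

    public class RecordLockDurationSketch {
        // brokerDefaultMs = group.share.record.lock.duration.ms
        // brokerMinMs     = group.share.min.record.lock.duration.ms
        // brokerMaxMs     = group.share.max.record.lock.duration.ms
        static long effectiveLockDurationMs(Long groupLevelValueMs,
                                            long brokerDefaultMs,
                                            long brokerMinMs,
                                            long brokerMaxMs) {
            if (groupLevelValueMs == null) {
                // Groups that set nothing get the cluster-wide default.
                return brokerDefaultMs;
            }
            // Sketch behaviour: clamp the group-level value into the cluster bounds.
            return Math.min(brokerMaxMs, Math.max(brokerMinMs, groupLevelValueMs));
        }
    }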
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On 22 Apr 2024, at 20:04, Jun Rao > <j...@confluent.io.INVALID > >>> > >>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for the reply. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 123. "The share group does not persist the target > >>>> assignment." > >>>>>>>>>>>>>>>>> What's the impact of this? Everytime that GC fails over, > it > >>>>>> needs > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> recompute the assignment for every member. Do we expect > the > >>>>>>>> member > >>>>>>>>>>>>>>>>> assignment to change on every GC failover? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 125. Should the GC also write > ShareGroupPartitionMetadata? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 127. So, group epoch is only propagated to SC when > >>>>>>>>>>>>>>>>> InitializeShareGroupState request is sent. This sounds > >> good. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 130. Should we have a > >>>> group.share.min.record.lock.duration.ms > >>>>>> to > >>>>>>>>>>>> pair > >>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>> group.share.max.record.lock.duration.ms? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 131. Sounds good. The name > >>>>>>>> group.share.record.lock.partition.limit > >>>>>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>>> seem very intuitive. How about something > >>>>>>>>>>>>>>>>> like group.share.partition.max.records.pending.ack? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 136. Could we describe the process of GC failover? I > guess > >> it > >>>>>>>> needs > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> compute member reassignment and check if there is any new > >>>>>>>>>>>>>> topic/partition > >>>>>>>>>>>>>>>>> matching the share subscription. Does it bump up the > group > >>>>>> epoch? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 137. Metrics: > >>>>>>>>>>>>>>>>> 137.1 It would be useful to document who reports each > >> metric. > >>>>>> Is > >>>>>>>> it > >>>>>>>>>>>> any > >>>>>>>>>>>>>>>>> broker, GC, SC or SPL? > >>>>>>>>>>>>>>>>> 137.2 partition-load-time: Is that the loading time at > SPL > >> or > >>>>>> SC? > >>>>>>>>>>>>>>>>> 137.3 "The time taken in milliseconds to load the > >> share-group > >>>>>>>> state > >>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>> the share-group state partitions loaded in the last 30 > >>>>>> seconds." > >>>>>>>>>>>>>>>>> The window depends on metrics.num.samples and > >>>>>>>>>>>> metrics.sample.window.ms > >>>>>>>>>>>>>>>>> and is not always 30 seconds, right? > >>>>>>>>>>>>>>>>> 137.4 Could you explain write/write-latency a bit more? > >> Does > >>>> it > >>>>>>>>>>>> include > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> time to write to the internal topic? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield < > >>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>> Thanks for your comments. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 120. Thanks. Fixed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch indicates which > >> snapshot > >>>>>>>>>>>>>>>>>> the update applies to. It should of course be the > snapshot > >>>>>> that > >>>>>>>>>>>>>> precedes > >>>>>>>>>>>>>>>>>> it in the log. It’s just there to provide a consistency > >>>> check. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I also noticed that ShareSnapshotValue was missing > >>>> StateEpoch. 
> >>>>>>>> It > >>>>>>>>>>>>>>>>>> isn’t any more. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 122. In KIP-848, ConsumerGroupMemberMetadataValue > includes > >>>>>>>>>>>>>>>>>> GroupEpoch, but in the code it does not. In fact, there > is > >>>>>>>>>>>>>> considerable > >>>>>>>>>>>>>>>>>> divergence between the KIP and the code for this record > >>>> value > >>>>>>>>>> schema > >>>>>>>>>>>>>>>>>> which I expect will be resolved when the migration code > >> has > >>>>>> been > >>>>>>>>>>>>>>>>>> completed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 123. The share group does not persist the target > >> assignment. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 124. Share groups have three kinds of record: > >>>>>>>>>>>>>>>>>> i) ShareGroupMetadata > >>>>>>>>>>>>>>>>>> - this contains the group epoch and is written whenever > >> the > >>>>>>>> group > >>>>>>>>>>>>>>>>>> epoch changes. > >>>>>>>>>>>>>>>>>> ii) ShareGroupMemberMetadata > >>>>>>>>>>>>>>>>>> - this does not contain the group epoch. > >>>>>>>>>>>>>>>>>> iii) ShareGroupPartitionMetadata > >>>>>>>>>>>>>>>>>> - this currently contains the epoch, but I think that is > >>>>>>>>>>>> unnecessary. > >>>>>>>>>>>>>>>>>> For one thing, the ConsumerGroupPartitionMetadata > >> definition > >>>>>>>>>>>>>>>>>> contains the group epoch, but the value appears never to > >> be > >>>>>> set. > >>>>>>>>>>>>>>>>>> David Jacot confirms that it’s not necessary and is > >> removing > >>>>>> it. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I have removed the Epoch from > ShareGroupPartitionMetadata. > >>>>>>>>>>>>>>>>>> The only purpose of the persisting the epoch for a share > >>>> group > >>>>>>>> is > >>>>>>>>>> so > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> when a group coordinator takes over the share group, it > is > >>>>>> able > >>>>>>>> to > >>>>>>>>>>>>>>>>>> continue the sequence of epochs. > >>>> ShareGroupMetadataValue.Epoch > >>>>>>>>>>>>>>>>>> is used for this. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 125. The group epoch will be incremented in this case > and > >>>>>>>>>>>>>>>>>> consequently a ShareGroupMetadata will be written. KIP > >>>>>> updated. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 126. Not directly. A share group can only be deleted > when > >> it > >>>>>> has > >>>>>>>>>> no > >>>>>>>>>>>>>>>>>> members, so the tombstones for ShareGroupMemberMetadata > >> will > >>>>>>>>>>>>>>>>>> have been written when the members left. I have > clarified > >>>>>> this. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 127. The share coordinator is ignorant of the group > epoch. > >>>>>> When > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>> group coordinator is initializing the share-group state > >> the > >>>>>>>> first > >>>>>>>>>>>> time > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> a share-partition is being added to an assignment in the > >>>>>> group, > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>> group epoch is used as the state epoch. But as the group > >>>> epoch > >>>>>>>>>>>>>>>>>> increases over time, the share coordinator is entirely > >>>>>> unaware. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> When the first consumer for a share-partition fetches > >>>> records > >>>>>>>>>> from a > >>>>>>>>>>>>>>>>>> share-partition leader, the SPL calls the share > >> coordinator > >>>> to > >>>>>>>>>>>>>>>>>> ReadShareGroupState. 
If the SPL has previously read the > >>>>>>>>>> information > >>>>>>>>>>>>>>>>>> and again it’s going from 0 to 1 consumer, it confirms > >> it's > >>>> up > >>>>>>>> to > >>>>>>>>>>>> date > >>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>> calling ReadShareGroupOffsetsState. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Even if many consumers are joining at the same time, any > >>>>>>>>>>>>>> share-partition > >>>>>>>>>>>>>>>>>> which is being initialized will not be included in their > >>>>>>>>>>>> assignments. > >>>>>>>>>>>>>>>> Once > >>>>>>>>>>>>>>>>>> the initialization is complete, the next rebalance will > >>>> assign > >>>>>>>> the > >>>>>>>>>>>>>>>>>> partition > >>>>>>>>>>>>>>>>>> to some consumers which will discover this by > >>>>>>>> ShareGroupHeartbeat > >>>>>>>>>>>>>>>>>> response. And then, the fetching begins. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If an SPL receives a ShareFetch request before it’s read > >> the > >>>>>>>> state > >>>>>>>>>>>>>>>>>> from the SC, it can make the ShareFetch request wait up > to > >>>>>>>>>> MaxWaitMs > >>>>>>>>>>>>>>>>>> and then it can return an empty set of records if it’s > >> still > >>>>>> not > >>>>>>>>>>>>>> ready. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> So, I don’t believe there will be too much load. If a > >> topic > >>>>>> with > >>>>>>>>>>>> many > >>>>>>>>>>>>>>>>>> partitions is added to the subscribed topics for a share > >>>>>> group, > >>>>>>>>>> the > >>>>>>>>>>>>>> fact > >>>>>>>>>>>>>>>>>> that the assignments will only start to include the > >>>> partitions > >>>>>>>> as > >>>>>>>>>>>>>> their > >>>>>>>>>>>>>>>>>> initialization completes should soften the impact. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 128, 129: The “proper” way to turn on this feature when > >> it’s > >>>>>>>>>>>> finished > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> be using `group.coordinator.rebalance.protocols` and > >>>>>>>>>>>> `group.version`. > >>>>>>>>>>>>>>>>>> While it’s in Early Access and for test cases, the > >>>>>>>>>>>>>> `group.share.enable` > >>>>>>>>>>>>>>>>>> configuration will turn it on. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I have described `group.share.enable` as an internal > >>>>>>>> configuration > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>> the KIP. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 130. The config `group.share.record.lock.duration.ms` > >>>> applies > >>>>>>>> to > >>>>>>>>>>>>>> groups > >>>>>>>>>>>>>>>>>> which do not specify a group-level configuration for > lock > >>>>>>>>>> duration. > >>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>> minimum and maximum for this configuration are intended > to > >>>>>> give > >>>>>>>> it > >>>>>>>>>>>>>>>>>> sensible bounds. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If a group does specify its own ` > >>>>>>>>>>>> group.share.record.lock.duration.ms > >>>>>>>>>>>>>> `, > >>>>>>>>>>>>>>>>>> the broker-level ` > group.share.max.record.lock.duration.ms > >> ` > >>>>>>>> gives > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> cluster administrator a way of setting a maximum value > for > >>>> all > >>>>>>>>>>>> groups. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> While editing, I renamed ` > >>>>>>>> group.share.record.lock.duration.max.ms > >>>>>>>>>> ` > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> `group.share.max.record.lock.duration.ms` for > consistency > >>>>>> with > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>> rest of the min/max configurations. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 131. This is the limit per partition so you can go wider > >>>> with > >>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>> partitions. 
> >>>>>>>>>>>>>>>>>> I have set the initial value low for safety. I expect to > >> be > >>>>>> able > >>>>>>>>>> to > >>>>>>>>>>>>>>>>>> increase this > >>>>>>>>>>>>>>>>>> significantly when we have mature code which has been > >>>>>>>>>> battle-tested. > >>>>>>>>>>>>>>>>>> Rather than try to guess how high it can safely go, I’ve > >>>> erred > >>>>>>>> on > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>> side > >>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> caution and expect to open it up in a future KIP. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 132. Good catch. The problem is that I have missed two > >> group > >>>>>>>>>>>>>>>>>> configurations, > >>>>>>>>>>>>>>>>>> now added. These are group.share.session.timeout.ms and > >>>>>>>>>>>>>>>>>> group.share.heartbeat.timeout.ms . The configurations > you > >>>>>>>>>> mentioned > >>>>>>>>>>>>>>>>>> are the bounds for the group-level configurations. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 133. The name `group.share.max.size` was chosen to > mirror > >>>> the > >>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>> `group.consumer.max.size`. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 134. It is intended to be a list of all of the valid > >>>> assignors > >>>>>>>> for > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> cluster. > >>>>>>>>>>>>>>>>>> When the assignors are configurable at the group level, > >> the > >>>>>>>> group > >>>>>>>>>>>>>>>>>> configuration > >>>>>>>>>>>>>>>>>> will only be permitted to name an assignor which is in > >> this > >>>>>>>> list. > >>>>>>>>>>>> For > >>>>>>>>>>>>>>>> now, > >>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>> is no group configuration for assignor, so all groups > get > >>>> the > >>>>>>>> one > >>>>>>>>>>>> and > >>>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>> assignor in the list. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 135. It’s the number of threads per broker. For a > cluster > >>>>>> with a > >>>>>>>>>>>> small > >>>>>>>>>>>>>>>>>> number of > >>>>>>>>>>>>>>>>>> of brokers and a lot of share group activity, it may be > >>>>>>>>>> appropriate > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> increase > >>>>>>>>>>>>>>>>>> this. We will be able to give tuning advice once we have > >>>>>>>>>> experience > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> performance impact of increasing it. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 20 Apr 2024, at 00:14, Jun Rao > >> <j...@confluent.io.INVALID > >>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 120. There is still reference to > >> ConsumerGroupMetadataKey. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 121. ShareUpdateValue.SnapshotEpoch: Should we change > it > >>>>>> since > >>>>>>>>>> it's > >>>>>>>>>>>>>>>> not a > >>>>>>>>>>>>>>>>>>> snapshot? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 122. ConsumerGroupMemberMetadataValue includes epoch, > but > >>>>>>>>>>>>>>>>>>> ShareGroupMemberMetadataValue does not. Do we know how > >> the > >>>>>>>> epoch > >>>>>>>>>> is > >>>>>>>>>>>>>>>> being > >>>>>>>>>>>>>>>>>>> used in the consumer group and whether it's needed in > the > >>>>>> share > >>>>>>>>>>>>>> group? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 123. There is no equivalent of > >>>>>>>>>> ConsumerGroupTargetAssignmentMember > >>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>> ShareGroup. 
How does the shareGroup persist the member > >>>>>>>>>> assignment? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 124. Assign a share-partition: "When a topic-partition > is > >>>>>>>>>> assigned > >>>>>>>>>>>>>> to a > >>>>>>>>>>>>>>>>>>> member of a share group for the first time, the group > >>>>>>>> coordinator > >>>>>>>>>>>>>>>> writes > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadata record to the > >>>> __consumer_offsets > >>>>>>>>>>>> topic". > >>>>>>>>>>>>>>>>>>> When does the coordinator write ShareGroupMetadata with > >> the > >>>>>>>>>> bumped > >>>>>>>>>>>>>>>>>> epoch? > >>>>>>>>>>>>>>>>>>> In general, is there a particular order to bump up the > >>>> epoch > >>>>>> in > >>>>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>>>> records? When can the new epoch be exposed to the > >>>>>>>>>>>>>> sharePartitionLeader? > >>>>>>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>>>>> would be useful to make this clear in other state > >> changing > >>>>>>>>>>>> operations > >>>>>>>>>>>>>>>>>> too. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 125. "Alter share group offsets Group coordinator and > >> share > >>>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>>> Only empty share groups support this operation. The > group > >>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>> sends > >>>>>>>>>>>>>>>>>>> an InitializeShareGroupState request to the share > >>>>>> coordinator. > >>>>>>>>>> The > >>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>> coordinator writes a ShareSnapshot record with the new > >>>> state > >>>>>>>>>> epoch > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> __share_group_state topic." > >>>>>>>>>>>>>>>>>>> Does the operation need to write > >>>> ShareGroupPartitionMetadata > >>>>>>>> and > >>>>>>>>>>>>>>>>>>> ShareGroupMetadata (for the new epoch)? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 126. Delete share group: Does this operation also need > to > >>>>>>>> write a > >>>>>>>>>>>>>>>>>> tombstone > >>>>>>>>>>>>>>>>>>> for the ShareGroupMemberMetadata record? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 127. I was thinking about the impact on a new consumer > >>>>>> joining > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> shareGroup. This causes the GroupCoordinator and the > >>>>>>>>>>>> ShareCoordinator > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> bump up the group epoch, which in turn causes the > >>>>>>>> SharePartition > >>>>>>>>>>>>>> leader > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> reinitialize the state with ReadShareGroupOffsetsState. > >> If > >>>>>> many > >>>>>>>>>>>>>>>> consumers > >>>>>>>>>>>>>>>>>>> are joining a shareGroup around the same time, would > >> there > >>>> be > >>>>>>>> too > >>>>>>>>>>>>>> much > >>>>>>>>>>>>>>>>>> load > >>>>>>>>>>>>>>>>>>> for the ShareCoordinator and SharePartition leader? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 128. Since group.share.enable will be replaced by the > >>>>>>>>>> group.version > >>>>>>>>>>>>>>>>>>> feature, should we make it an internal config? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 129. group.coordinator.rebalance.protocols: this seems > >>>>>>>> redundant > >>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>> group.share.enable? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 130. Why do we have both > >>>> group.share.record.lock.duration.ms > >>>>>>>> and > >>>>>>>>>>>>>>>>>>> group.share.record.lock.duration.max.ms, each with its > >> own > >>>>>> max > >>>>>>>>>>>>>> value? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 131. group.share.record.lock.partition.limit defaults > to > >>>> 200. 
> >>>>>>>>>> This > >>>>>>>>>>>>>>>> limits > >>>>>>>>>>>>>>>>>>> the max degree of consumer parallelism to 200, right? > If > >>>>>> there > >>>>>>>>>> are > >>>>>>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>>> records per batch, it could be even smaller. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 132. Why do we need all three of > >>>>>>>> group.share.session.timeout.ms, > >>>>>>>>>>>>>>>>>>> group.share.min.session.timeout.ms and > >>>>>>>>>>>>>>>>>> group.share.max.session.timeout.ms? > >>>>>>>>>>>>>>>>>>> Session timeout can't be set by the client. Ditto for > >>>>>>>>>>>>>>>>>>> group.share.heartbeat.interval.ms. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 133. group.share.max.size: Would > >>>>>>>>>> group.share.max.members.per.group > >>>>>>>>>>>>>> be a > >>>>>>>>>>>>>>>>>>> more intuitive name? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 134. group.share.assignors: Why does it need to be a > >> list? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 135. share.coordinator.threads: Is that per share > >>>> coordinator > >>>>>>>> or > >>>>>>>>>>>> per > >>>>>>>>>>>>>>>>>> broker? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>>>> Thanks for you reply. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 42.1. That’s a sensible improvement. Done. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 47,56. Done. All instances of BaseOffset changed to > >>>>>>>> FirstOffset. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 105. I think that would be in a future KIP. > Personally, > >> I > >>>>>>>> don’t > >>>>>>>>>>>> mind > >>>>>>>>>>>>>>>>>> having > >>>>>>>>>>>>>>>>>>>> a non-contiguous set of values in this KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 114. Done. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 115. If the poll is just returning a single record > >> because > >>>>>>>> there > >>>>>>>>>>>> is > >>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>> much > >>>>>>>>>>>>>>>>>>>> data to consume, committing on every record is OK. > It’s > >>>>>>>>>>>> inefficient > >>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>> acceptable. > >>>>>>>>>>>>>>>>>>>> If the first poll returns just one record, but many > more > >>>>>> have > >>>>>>>>>>>> piled > >>>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>>>>> while > >>>>>>>>>>>>>>>>>>>> the first one was being processed, the next poll has > the > >>>>>>>>>>>> opportunity > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>>>> a bunch of records and then these will be able to be > >>>>>> committed > >>>>>>>>>>>>>>>> together. > >>>>>>>>>>>>>>>>>>>> So, my answer is that optimisation on the broker to > >> return > >>>>>>>>>> batches > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> records when the records are available is the approach > >> we > >>>>>> will > >>>>>>>>>>>> take > >>>>>>>>>>>>>>>>>> here. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 116. Good idea. Done. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 117. I’ve rewritten the Kafka Broker Migration > section. > >>>> Let > >>>>>> me > >>>>>>>>>>>> know > >>>>>>>>>>>>>>>> what > >>>>>>>>>>>>>>>>>>>> you think. 
> >>>>>>>>>>>>>>>>>>>> I am discussing the configuration to enable the > feature > >> in > >>>>>> the > >>>>>>>>>>>>>> mailing > >>>>>>>>>>>>>>>>>>>> list with > >>>>>>>>>>>>>>>>>>>> David Jacot also, so I anticipate a bit of change in > >> this > >>>>>> area > >>>>>>>>>>>>>> still. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On 15 Apr 2024, at 23:34, Jun Rao > >>>> <j...@confluent.io.INVALID > >>>>>>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 42.1 "If the share group offset is altered multiple > >> times > >>>>>>>> when > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>> remains empty, it would be harmless if the same state > >>>> epoch > >>>>>>>> was > >>>>>>>>>>>>>>>> reused > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> initialize the state." > >>>>>>>>>>>>>>>>>>>>> Hmm, how does the partition leader know for sure that > >> it > >>>>>> has > >>>>>>>>>>>>>> received > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> latest share group offset if epoch is reused? > >>>>>>>>>>>>>>>>>>>>> Could we update the section "Group epoch - Trigger a > >>>>>>>> rebalance" > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>> AdminClient.alterShareGroupOffsets causes the group > >> epoch > >>>>>> to > >>>>>>>> be > >>>>>>>>>>>>>>>> bumped > >>>>>>>>>>>>>>>>>>>> too? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 47,56 "my view is that BaseOffset should become > >>>> FirstOffset > >>>>>>>> in > >>>>>>>>>>>> ALL > >>>>>>>>>>>>>>>>>>>> schemas > >>>>>>>>>>>>>>>>>>>>> defined in the KIP." > >>>>>>>>>>>>>>>>>>>>> Yes, that seems better to me. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 105. "I have another non-terminal state in mind for > 3." > >>>>>>>>>>>>>>>>>>>>> Should we document it? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 114. session.timeout.ms in the consumer > configuration > >> is > >>>>>>>>>>>>>> deprecated > >>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>> KIP-848. So, we need to remove it from the > >> shareConsumer > >>>>>>>>>>>>>>>> configuration. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 115. I am wondering if it's a good idea to always > >> commit > >>>>>> acks > >>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>>> ShareConsumer.poll(). In the extreme case, each batch > >> may > >>>>>>>> only > >>>>>>>>>>>>>>>> contain > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>> single record and each poll() only returns a single > >>>> batch. > >>>>>>>> This > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>> cause > >>>>>>>>>>>>>>>>>>>>> each record to be committed individually. Is there a > >> way > >>>>>> for > >>>>>>>> a > >>>>>>>>>>>> user > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> optimize this? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 116. For each new RPC, could we list the associated > >> acls? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 117. Since this KIP changes internal records and > RPCs, > >> it > >>>>>>>> would > >>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> useful > >>>>>>>>>>>>>>>>>>>>> to document the upgrade process. 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 7:35 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>>>>>>>>>>>> Thanks for your questions. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 41. > >>>>>>>>>>>>>>>>>>>>>> 41.1. The partition leader obtains the state epoch > in > >>>> the > >>>>>>>>>>>> response > >>>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>> ReadShareGroupState. When it becomes a > share-partition > >>>>>>>> leader, > >>>>>>>>>>>>>>>>>>>>>> it reads the share-group state and one of the things > >> it > >>>>>>>> learns > >>>>>>>>>>>> is > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> current state epoch. Then it uses the state epoch in > >> all > >>>>>>>>>>>>>> subsequent > >>>>>>>>>>>>>>>>>>>>>> calls to WriteShareGroupState. The fencing is to > >> prevent > >>>>>>>>>> writes > >>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>> a previous state epoch, which are very unlikely but > >>>> which > >>>>>>>>>> would > >>>>>>>>>>>>>> mean > >>>>>>>>>>>>>>>>>>>>>> that a leader was using an out-of-date epoch and was > >>>>>> likely > >>>>>>>> no > >>>>>>>>>>>>>>>> longer > >>>>>>>>>>>>>>>>>>>>>> the current leader at all, perhaps due to a long > pause > >>>> for > >>>>>>>>>> some > >>>>>>>>>>>>>>>>>> reason. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 41.2. If the group coordinator were to set the SPSO, > >>>>>>>> wouldn’t > >>>>>>>>>> it > >>>>>>>>>>>>>>>> need > >>>>>>>>>>>>>>>>>>>>>> to discover the initial offset? I’m trying to avoid > >> yet > >>>>>>>>>> another > >>>>>>>>>>>>>>>>>>>>>> inter-broker > >>>>>>>>>>>>>>>>>>>>>> hop. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 42. > >>>>>>>>>>>>>>>>>>>>>> 42.1. I think I’ve confused things. When the share > >> group > >>>>>>>>>> offset > >>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>> altered > >>>>>>>>>>>>>>>>>>>>>> using AdminClient.alterShareGroupOffsets, the group > >>>>>>>>>> coordinator > >>>>>>>>>>>>>> WILL > >>>>>>>>>>>>>>>>>>>>>> update the state epoch. I don’t think it needs to > >> update > >>>>>> the > >>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>> epoch > >>>>>>>>>>>>>>>>>>>>>> at the same time (although it could) because the > group > >>>>>> epoch > >>>>>>>>>>>> will > >>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>> been bumped when the group became empty. If the > share > >>>>>> group > >>>>>>>>>>>> offset > >>>>>>>>>>>>>>>>>>>>>> is altered multiple times when the group remains > >> empty, > >>>> it > >>>>>>>>>> would > >>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>> harmless if the same state epoch was reused to > >>>> initialize > >>>>>>>> the > >>>>>>>>>>>>>> state. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> When the share-partition leader updates the SPSO as > a > >>>>>> result > >>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> the usual flow of record delivery, it does not > update > >>>> the > >>>>>>>>>> state > >>>>>>>>>>>>>>>> epoch. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 42.2. The share-partition leader will notice the > >>>>>> alteration > >>>>>>>>>>>>>> because, > >>>>>>>>>>>>>>>>>>>>>> when it issues WriteShareGroupState, the response > will > >>>>>>>> contain > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> error code FENCED_STATE_EPOCH. This is supposed to > be > >>>> the > >>>>>>>>>>>>>>>>>>>>>> last-resort way of catching this. 
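To illustrate the fencing rule at the centre of this, here is a tiny sketch; the class and method names are invented purely for the example and are not part of the KIP:

    // The share coordinator tracks the latest state epoch for each share-partition. A
    // WriteShareGroupState request carrying an older epoch is rejected with FENCED_STATE_EPOCH,
    // which tells a stale share-partition leader to re-read the state before writing again.
    final class StateEpochFence {
        private int currentStateEpoch;

        StateEpochFence(int initialStateEpoch) {
            this.currentStateEpoch = initialStateEpoch;
        }

        // True if a write tagged with writeStateEpoch must be rejected as fenced.
        synchronized boolean fencesWrite(int writeStateEpoch) {
            return writeStateEpoch < currentStateEpoch;
        }

        // Called when the state is re-initialized with a bumped epoch, for example after
        // AdminClient.alterShareGroupOffsets on an empty group.
        synchronized void advanceTo(int newStateEpoch) {
            currentStateEpoch = Math.max(currentStateEpoch, newStateEpoch);
        }
    }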
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> When the share-partition leader handles its first > >>>>>> ShareFetch > >>>>>>>>>>>>>>>> request, > >>>>>>>>>>>>>>>>>>>>>> it learns the state epoch from the response to > >>>>>>>>>>>>>> ReadShareGroupState. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> In normal running, the state epoch will remain > >> constant, > >>>>>>>> but, > >>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>>> are no consumers and the group is empty, it might > >>>> change. > >>>>>>>> As a > >>>>>>>>>>>>>>>> result, > >>>>>>>>>>>>>>>>>>>>>> I think it would be sensible when the set of share > >>>>>> sessions > >>>>>>>>>>>>>>>>>> transitions > >>>>>>>>>>>>>>>>>>>>>> from 0 to 1, which is a reasonable proxy for the > share > >>>>>> group > >>>>>>>>>>>>>>>>>>>> transitioning > >>>>>>>>>>>>>>>>>>>>>> from empty to non-empty, for the share-partition > >> leader > >>>> to > >>>>>>>>>> issue > >>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsState to validate the state > >> epoch. > >>>> If > >>>>>>>> its > >>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>>>> epoch is out of date, it can then > ReadShareGroupState > >> to > >>>>>>>>>>>>>>>>>> re-initialize. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I’ve changed the KIP accordingly. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 47, 56. If I am to change BaseOffset to FirstOffset, > >> we > >>>>>> need > >>>>>>>>>> to > >>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>> a clear view of which is the correct term. Having > >>>> reviewed > >>>>>>>> all > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> instances, my view is that BaseOffset should become > >>>>>>>>>> FirstOffset > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>> ALL schemas defined in the KIP. Then, BaseOffset is > >> just > >>>>>>>> used > >>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>> record batches, which is already a known concept. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Please let me know if you agree. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 60. I’ve added FindCoordinator to the top level > index > >>>> for > >>>>>>>>>>>> protocol > >>>>>>>>>>>>>>>>>>>> changes. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 61. OK. I expect you are correct about how users > will > >> be > >>>>>>>> using > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> console share consumer. When I use the console > >>>> consumer, I > >>>>>>>>>>>> always > >>>>>>>>>>>>>>>> get > >>>>>>>>>>>>>>>>>>>>>> a new consumer group. I have changed the default > group > >>>> ID > >>>>>>>> for > >>>>>>>>>>>>>>>> console > >>>>>>>>>>>>>>>>>>>>>> share consumer to “console-share-consumer” to match > >> the > >>>>>>>>>> console > >>>>>>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>>>> better and give more of an idea where this > mysterious > >>>>>> group > >>>>>>>>>> has > >>>>>>>>>>>>>> come > >>>>>>>>>>>>>>>>>>>> from. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 77. I will work on a proposal that does not use > >>>> compaction > >>>>>>>> and > >>>>>>>>>>>> we > >>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>> make a judgement about whether it’s a better course > >> for > >>>>>>>>>> KIP-932. > >>>>>>>>>>>>>>>>>>>>>> Personally, > >>>>>>>>>>>>>>>>>>>>>> until I’ve written it down and lived with the ideas > >> for > >>>> a > >>>>>>>> few > >>>>>>>>>>>>>> days, > >>>>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>>>>>> won’t be > >>>>>>>>>>>>>>>>>>>>>> able to choose which I prefer. 
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I should be able to get the proposal written by the > >> end > >>>> of > >>>>>>>>>> this > >>>>>>>>>>>>>>>> week. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs > >>>> matches > >>>>>>>>>>>>>>>>>>>>>> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs > from > >>>>>>>> KIP-848. > >>>>>>>>>>>>>>>>>>>>>> I prefer to maintain the consistency. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 101. Thanks for catching this. The > >>>>>>>> ShareGroupHeartbeatResponse > >>>>>>>>>>>> was > >>>>>>>>>>>>>>>>>>>>>> originally > >>>>>>>>>>>>>>>>>>>>>> created from KIP-848. This part of the schema does > not > >>>>>> apply > >>>>>>>>>>>> and I > >>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>> removed > >>>>>>>>>>>>>>>>>>>>>> it. I have also renamed AssignedTopicPartitions to > >>>> simply > >>>>>>>>>>>>>>>>>>>> TopicPartitions > >>>>>>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>> aligns with the actual definition of > >>>>>>>>>>>>>> ConsumerGroupHeartbeatResponse. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 102. No, I don’t think we do. Removed. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 103. I’ve changed the description for the error > codes > >>>> for > >>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 104. Interesting. I have added ErrorMessages to > these > >>>> RPCs > >>>>>>>> as > >>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>>> suggest. > >>>>>>>>>>>>>>>>>>>>>> It’s a good improvement for problem determination. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 105. The values are reserved in my brain. Actually, > 1 > >> is > >>>>>>>>>>>> Acquired > >>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>> is not persisted, and I have another non-terminal > >> state > >>>> in > >>>>>>>>>> mind > >>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 106. A few people have raised the question of > whether > >>>>>>>>>>>>>>>>>> OffsetAndMetadata > >>>>>>>>>>>>>>>>>>>>>> is sensible in this KIP, given that the optional > >>>> Metadata > >>>>>>>> part > >>>>>>>>>>>>>> comes > >>>>>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>>> a regular consumer commits offsets. However, it is > >>>> correct > >>>>>>>>>> that > >>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>> never be metadata with a share group. I have changed > >>>>>>>>>>>>>>>>>>>>>> the KIP to replace OffsetAndMetadata with Long. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 107. Yes, you are right. I have learnt during this > >>>> process > >>>>>>>>>> that > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>>>>>> bump > >>>>>>>>>>>>>>>>>>>>>> can be a logical not just a physical change to the > >>>> schema. > >>>>>>>> KIP > >>>>>>>>>>>>>>>>>> updated. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 108. I would prefer not to extend this RPC for all > of > >>>> the > >>>>>>>>>> states > >>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> point. > >>>>>>>>>>>>>>>>>>>>>> I think there is definitely scope for another KIP > >>>> focused > >>>>>> on > >>>>>>>>>>>>>>>>>>>> administration > >>>>>>>>>>>>>>>>>>>>>> of share groups that might want this information so > >>>>>> someone > >>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>> build > >>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> UI and other tools on top. 
Doing that well is quite > a > >>>> lot > >>>>>> of > >>>>>>>>>>>> work > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>>>> own right > >>>>>>>>>>>>>>>>>>>>>> so I would prefer not to do that now. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 109.Yes, they’re inconsistent and follow the > consumer > >>>>>> groups > >>>>>>>>>>>>>>>>>> equivalents > >>>>>>>>>>>>>>>>>>>>>> which are also inconsistent. In general, the > >>>>>>>>>> KafkaAdmin.delete…. > >>>>>>>>>>>>>>>>>> Methods > >>>>>>>>>>>>>>>>>>>>>> use the Map<XXX, KafkaFuture<YYY>> pattern like > >>>>>>>>>>>>>>>>>> DeleteShareGroupsResult. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Would you prefer that I do that, or remain > consistent > >>>> with > >>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>> groups? > >>>>>>>>>>>>>>>>>>>>>> Happy to change it. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 110. Again, consumer groups don’t yield that level > of > >>>>>> detail > >>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>> epochs. > >>>>>>>>>>>>>>>>>>>>>> The MemberDescription does include the assignment, > but > >>>> not > >>>>>>>> the > >>>>>>>>>>>>>> list > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> subscribed topic names. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 111. I didn’t include GroupState in GroupListing > >> because > >>>>>>>>>> there’s > >>>>>>>>>>>>>> no > >>>>>>>>>>>>>>>>>>>>>> single class that includes the states of all group > >>>> types. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 112. I think it’s good practice for the API to have > >>>>>>>>>>>>>>>>>>>>>> ListShareGroupOffsetSpec. > >>>>>>>>>>>>>>>>>>>>>> It makes evolution and extension of the API much > >> easier. > >>>>>>>> Also, > >>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>> matches > >>>>>>>>>>>>>>>>>>>>>> the precedent set by ListConsumerGroupOffsetSpec. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 113. ListConsumerGroupsResults.errors() is the > same. I > >>>>>> think > >>>>>>>>>> you > >>>>>>>>>>>>>>>> just > >>>>>>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>>>> to look in the exception details and the same > pattern > >> is > >>>>>>>> being > >>>>>>>>>>>>>>>>>> followed > >>>>>>>>>>>>>>>>>>>>>> here. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Over the next few days, I have committed to writing > a > >>>>>>>> proposal > >>>>>>>>>>>> for > >>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> persist > >>>>>>>>>>>>>>>>>>>>>> share-group state that doesn’t use log compaction. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I am also aware that discussion with Justine Olshan > on > >>>>>>>>>>>>>>>> read-committed > >>>>>>>>>>>>>>>>>>>>>> isolation level is not yet quite complete. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the detailed review. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On 9 Apr 2024, at 23:58, Jun Rao > >>>>>> <j...@confluent.io.INVALID > >>>>>>>>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi, Andrew, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the reply. A few more comments. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 41. > >>>>>>>>>>>>>>>>>>>>>>> 41.1 How does the partition leader obtain the group > >>>> epoch > >>>>>>>> to > >>>>>>>>>>>> set > >>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateRequest.StateEpoch? 
> >>>>>>>>>>>>>>>>>>>>>>> 41.2 What's the benefit of having the group > >> coordinator > >>>>>>>>>>>>>> initialize > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> state and the partition leader set the SPSO? It > seems > >>>>>>>> simpler > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> partition leader initialize both the state and the > >> SPSO > >>>>>>>>>>>> together? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 42. > >>>>>>>>>>>>>>>>>>>>>>> 42.1 "I don’t think the group epoch needs to be > >> bumped > >>>>>> when > >>>>>>>>>> the > >>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>>> offset is altered." > >>>>>>>>>>>>>>>>>>>>>>> But the part on handling Alter share group offsets > >> says > >>>>>>>> "The > >>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>>> coordinator writes a ShareCheckpoint record with > the > >>>> new > >>>>>>>>>> state > >>>>>>>>>>>>>>>> epoch > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> __share_group_state topic." So, which is correct? > We > >>>>>> have > >>>>>>>>>> two > >>>>>>>>>>>>>>>> paths > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>> update the state in the share coordinator, one from > >> the > >>>>>>>> group > >>>>>>>>>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>>>>>>>>> and another from the partition leader. I thought > the > >>>>>>>> benefit > >>>>>>>>>> of > >>>>>>>>>>>>>>>>>> bumping > >>>>>>>>>>>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>>>>>>>> the epoch is to fence off a late request in the > >>>> previous > >>>>>>>>>> epoch > >>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>>> another > >>>>>>>>>>>>>>>>>>>>>>> path. > >>>>>>>>>>>>>>>>>>>>>>> 42.2 When the group coordinator alters the share > >> group > >>>>>>>> offset > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>> share > >>>>>>>>>>>>>>>>>>>>>>> coordinator, how does the partition leader know the > >>>> share > >>>>>>>>>> group > >>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>>>>>>>>> been altered so that it could clear its in-memory > >>>> state? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 47. 56. BaseOffset typically refers to the base > >> offset > >>>>>> for > >>>>>>>>>> the > >>>>>>>>>>>>>>>> batch > >>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>> can be confusing. FirstOffset is clearer and > matches > >>>>>>>>>>>> LastOffset. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 60. Could we include FindCoordinatorRequest in the > >> top > >>>>>>>> level > >>>>>>>>>>>>>> index > >>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>> Kafka protocol changes? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 61. I think it's probably ok to add time-based > >>>> expiration > >>>>>>>>>>>> later. > >>>>>>>>>>>>>>>> But > >>>>>>>>>>>>>>>>>>>>>> using > >>>>>>>>>>>>>>>>>>>>>>> a default group in console-share-consumer probably > >>>> won't > >>>>>>>> help > >>>>>>>>>>>>>>>> reduce > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> garbage. In the common case, the user of the > console > >>>>>>>> consumer > >>>>>>>>>>>>>>>> likely > >>>>>>>>>>>>>>>>>>>>>> wants > >>>>>>>>>>>>>>>>>>>>>>> to see the recently produced records for > >> verification. > >>>> If > >>>>>>>> the > >>>>>>>>>>>>>>>> default > >>>>>>>>>>>>>>>>>>>>>> group > >>>>>>>>>>>>>>>>>>>>>>> doesn't provide that (because of the stale state), > >> the > >>>>>> user > >>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> likely > >>>>>>>>>>>>>>>>>>>>>>> just use a new group. 
It's true that we don't > garbage > >>>>>>>> collect > >>>>>>>>>>>>>> idle > >>>>>>>>>>>>>>>>>>>>>> topics. > >>>>>>>>>>>>>>>>>>>>>>> However, the share groups are similar to > consumers, > >>>>>> which > >>>>>>>>>> does > >>>>>>>>>>>>>>>>>> support > >>>>>>>>>>>>>>>>>>>>>>> garbage collection. Typically, we read topics more > >> than > >>>>>>>>>>>> creating > >>>>>>>>>>>>>>>>>> them. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 77. If the generic compaction is inconvenient, we > >> could > >>>>>> use > >>>>>>>>>>>>>>>>>> customized > >>>>>>>>>>>>>>>>>>>>>>> logic. If we go with that route, option (b) seems > >>>> cleaner > >>>>>>>> and > >>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>> optimized. Since the share states for all groups > fit > >> in > >>>>>>>>>> memory, > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>>> generate snapshots more efficiently than going > >> through > >>>>>>>>>>>>>> compaction. > >>>>>>>>>>>>>>>>>>>>>> Having a > >>>>>>>>>>>>>>>>>>>>>>> separate log per share partition is probably too > much > >>>>>>>>>> overhead. > >>>>>>>>>>>>>>>> It's > >>>>>>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>> efficient to put the state changes for multiple > share > >>>>>>>>>>>> partitions > >>>>>>>>>>>>>>>> in a > >>>>>>>>>>>>>>>>>>>>>>> single log. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: > >>>>>> Should > >>>>>>>> we > >>>>>>>>>>>>>> name > >>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>> SessionTimeoutMs? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 101. ShareGroupHeartbeatResponse.Assignment.Error: > >> What > >>>>>>>> kind > >>>>>>>>>> of > >>>>>>>>>>>>>>>> error > >>>>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>>> we have when assigning partitions? What are the > >>>>>>>> corresponding > >>>>>>>>>>>>>> error > >>>>>>>>>>>>>>>>>>>>>> codes? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 102. Do we still need > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >> > ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 103. Could we list the error codes separately for > >>>>>>>>>>>>>>>>>>>>>>> ShareFetchResponse.Responses.Partitions.ErrorCode > and > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 104. Should we add error message for the errorCode > in > >>>>>>>>>>>>>>>>>>>> ShareFetchResponse, > >>>>>>>>>>>>>>>>>>>>>>> ShareAcknowledgeResponse, > >> ReadShareGroupStateResponse, > >>>>>>>>>>>>>>>>>>>>>>> WriteShareGroupStateResponse, > >>>>>>>> DeleteShareGroupStateResponse, > >>>>>>>>>>>>>>>>>>>>>>> ReadShareGroupOffsetsStateResponse and > >>>>>>>>>>>>>>>>>>>> InitializeShareGroupStateResponse? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 105. "about": "The state - > >>>>>>>> 0:Available,2:Acked,4:Archived.": > >>>>>>>>>>>> What > >>>>>>>>>>>>>>>>>>>> about 1 > >>>>>>>>>>>>>>>>>>>>>>> and 3? Are we leaving them out intentionally? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 106. Do we have usage of metadata in > >> OffsetAndMetadata? > >>>>>> If > >>>>>>>>>> not, > >>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>> remove it from AdminClient and KafkaShareConsumer? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 107. 
ListGroupsRequest: Should we bump up the > version > >>>>>> since > >>>>>>>>>> it > >>>>>>>>>>>>>> now > >>>>>>>>>>>>>>>>>>>>>> supports > >>>>>>>>>>>>>>>>>>>>>>> a new group type "share"? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 108. AdminClient.listShareGroupOffsets: Should it > >>>> expose > >>>>>>>> all > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> states > >>>>>>>>>>>>>>>>>>>>>>> from ReadShareGroupStateResponse, instead of just > >> SPSO? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 109. DeleteShareGroupOffsetsResult exposes > >>>>>>>>>>>>>>>>>>>>>>> public KafkaFuture<Void> partitionResult(final > >>>>>>>> TopicPartition > >>>>>>>>>>>>>>>>>>>> partition) > >>>>>>>>>>>>>>>>>>>>>>> DeleteShareGroupsResult exposes > >>>>>>>>>>>>>>>>>>>>>>> public Map<String, KafkaFuture<Void>> > deletedGroups() > >>>>>>>>>>>>>>>>>>>>>>> Should we make them more consistent? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 110. Should ShareGroupDescription include fields > like > >>>>>>>>>>>> GroupEpoch, > >>>>>>>>>>>>>>>>>>>>>>> AssignmentEpoch, MemberEpoch, and > >> SubscribedTopicNames? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 111. Should GroupListing include GroupState? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 112. Do we need ListShareGroupOffsetsSpec? Could we > >>>> just > >>>>>>>> use > >>>>>>>>>>>>>>>>>>>>>>> Set<TopicPartition> directly? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> 113. ListShareGroupsResult.errors(): How do we know > >>>> which > >>>>>>>>>> group > >>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>>>>>> error? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Jun > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield < > >>>>>>>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Hi David, > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for your questions. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 70. The Group Coordinator communicates with the > >> Share > >>>>>>>>>>>>>> Coordinator > >>>>>>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>>>>>>>> RPCs. > >>>>>>>>>>>>>>>>>>>>>>>> In the general case, it’s an inter-broker call. It > >> is > >>>>>>>>>> probably > >>>>>>>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> optimise > >>>>>>>>>>>>>>>>>>>>>>>> for the situation in which the appropriate GC and > SC > >>>>>>>> shards > >>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>>>>>> co-located, but the > >>>>>>>>>>>>>>>>>>>>>>>> KIP does not delve that deep into potential > >>>> performance > >>>>>>>>>>>>>>>>>> optimisations. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 71. Avoiding collisions would be a good idea, but > I > >> do > >>>>>>>> worry > >>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>>>> retrospectively > >>>>>>>>>>>>>>>>>>>>>>>> introducing a naming convention for groups. I feel > >>>> that > >>>>>>>>>> naming > >>>>>>>>>>>>>>>>>>>>>> conventions > >>>>>>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>>> typically be the responsibility of the cluster > >>>>>>>>>> administrators > >>>>>>>>>>>>>>>> based > >>>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>>>>>> organizational > >>>>>>>>>>>>>>>>>>>>>>>> factors, such as the name of an application. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 72. 
Personally, I don’t like INVALID_GROUP_ID > >> because > >>>>>> the > >>>>>>>>>>>> group > >>>>>>>>>>>>>> ID > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>>> correct but > >>>>>>>>>>>>>>>>>>>>>>>> the group is the wrong type. The nearest existing > >>>> error > >>>>>>>> code > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> gets > >>>>>>>>>>>>>>>>>>>>>>>> that across > >>>>>>>>>>>>>>>>>>>>>>>> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is > >> really > >>>>>>>>>> showing > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>>>>>>>> error code would be better. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 73. The Metadata fields are not used. I have > removed > >>>>>> them. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 74. The metadata is re-evaluated on every change, > >> but > >>>>>>>> only a > >>>>>>>>>>>>>>>> subset > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>>> relevant > >>>>>>>>>>>>>>>>>>>>>>>> for rebalancing. A check is done against the names > >> of > >>>>>> the > >>>>>>>>>>>>>>>> subscribed > >>>>>>>>>>>>>>>>>>>>>>>> topics to > >>>>>>>>>>>>>>>>>>>>>>>> see if any relevant changes may have occurred. > Then > >>>> the > >>>>>>>>>>>> changes > >>>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>>>> trigger > >>>>>>>>>>>>>>>>>>>>>>>> a rebalance are topic creation, deletion, change > in > >>>>>>>>>>>> partitions, > >>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>> rack > >>>>>>>>>>>>>>>>>>>>>>>> IDs for the > >>>>>>>>>>>>>>>>>>>>>>>> replicas. I have updated the KIP to make this more > >>>>>> clear. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 75. The assignment is not persisted because it is > >> much > >>>>>>>> less > >>>>>>>>>>>>>>>>>> important > >>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> assignment survives a GC change. There’s no need > to > >>>>>>>> transfer > >>>>>>>>>>>>>>>>>>>> partitions > >>>>>>>>>>>>>>>>>>>>>>>> safely from > >>>>>>>>>>>>>>>>>>>>>>>> member to member in the way that is required for > >>>>>> consumer > >>>>>>>>>>>>>> groups, > >>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>>>>>>> optimisation, the assignments for a share group > are > >>>> not > >>>>>>>>>>>>>> persisted. > >>>>>>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>>>>>>>>>> wouldn’t do any > >>>>>>>>>>>>>>>>>>>>>>>> harm, but it just seems unnecessary. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 76. In the event that a consumer tries to > >> acknowledge > >>>> a > >>>>>>>>>> record > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>> now > >>>>>>>>>>>>>>>>>>>>>>>> longer > >>>>>>>>>>>>>>>>>>>>>>>> has the right to acknowledge, the > >> INVALID_RECORD_STATE > >>>>>>>> error > >>>>>>>>>>>>>> code > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>> used. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> If the application uses the > >>>>>> KafkaShareConsumer.commitSync > >>>>>>>>>>>>>> method, > >>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>>> see an InvalidRecordState exception returned. > >>>>>>>> Alternatively, > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> application can > >>>>>>>>>>>>>>>>>>>>>>>> register an acknowledgement commit callback which > >> will > >>>>>> be > >>>>>>>>>>>> called > >>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> status > >>>>>>>>>>>>>>>>>>>>>>>> of the acknowledgements that have succeeded or > >> failed. 
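To make those two options concrete, here is a minimal sketch of a share consumer using explicit acknowledgement. It assumes the KafkaShareConsumer API as it stands in the KIP at this point in the discussion; the package locations, the callback signature and the exact way commitSync() reports InvalidRecordState may still change, so treat the names as illustrative rather than final.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaShareConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Sketch only: class, method and callback names follow the KIP as discussed in this thread.
    public class ShareConsumerAckSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-share-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
                // Option 2: register a callback which reports the outcome of acknowledgements,
                // including InvalidRecordState for records this consumer could no longer ack.
                consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Acknowledgement failed: " + exception);
                    }
                });

                consumer.subscribe(List.of("my-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record, then accept it
                        consumer.acknowledge(record);
                    }
                    // Option 1: commitSync() surfaces InvalidRecordState for any acknowledgement
                    // the consumer no longer had the right to make.
                    consumer.commitSync();
                }
            }
        }
    }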
> >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 77. I have tried to tread a careful path with the > >>>>>> durable > >>>>>>>>>>>>>>>>>>>>>> share-partition > >>>>>>>>>>>>>>>>>>>>>>>> state in this > >>>>>>>>>>>>>>>>>>>>>>>> KIP. The significant choices I made are that: > >>>>>>>>>>>>>>>>>>>>>>>> * Topics are used so that the state is replicated > >>>>>> between > >>>>>>>>>>>>>> brokers. > >>>>>>>>>>>>>>>>>>>>>>>> * Log compaction is used to keep a lid on the > >> storage. > >>>>>>>>>>>>>>>>>>>>>>>> * Only one topic is required. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Log compaction as it stands is not ideal for this > >> kind > >>>>>> of > >>>>>>>>>>>> data, > >>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>>>> evidenced by > >>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex technique I employed. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I can think of a few relatively simple ways to > >> improve > >>>>>>>> upon > >>>>>>>>>>>> it. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> a) We could use a customised version of the log > >>>>>> compactor > >>>>>>>>>> for > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>> topic > >>>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>> understands the rules for ShareCheckpoint and > >>>> ShareDelta > >>>>>>>>>>>>>> records. > >>>>>>>>>>>>>>>>>>>>>>>> Essentially, > >>>>>>>>>>>>>>>>>>>>>>>> for each share-partition, the latest > ShareCheckpoint > >>>> and > >>>>>>>> any > >>>>>>>>>>>>>>>>>>>> subsequent > >>>>>>>>>>>>>>>>>>>>>>>> ShareDelta > >>>>>>>>>>>>>>>>>>>>>>>> records must not be cleaned. Anything else can be > >>>>>> cleaned. > >>>>>>>>>> We > >>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>> then > >>>>>>>>>>>>>>>>>>>>>>>> be sure > >>>>>>>>>>>>>>>>>>>>>>>> that multiple ShareDelta records with the same key > >>>> would > >>>>>>>>>>>> survive > >>>>>>>>>>>>>>>>>>>>>> cleaning > >>>>>>>>>>>>>>>>>>>>>>>> and we could > >>>>>>>>>>>>>>>>>>>>>>>> abandon the DeltaIndex technique. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> b) Actually what is required is a log per > >>>>>> share-partition. > >>>>>>>>>>>> Let’s > >>>>>>>>>>>>>>>>>>>> imagine > >>>>>>>>>>>>>>>>>>>>>>>> that we had > >>>>>>>>>>>>>>>>>>>>>>>> a share-state topic per topic being consumed in a > >>>> share > >>>>>>>>>> group, > >>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> same number > >>>>>>>>>>>>>>>>>>>>>>>> of partitions as the topic being consumed. We > could > >>>>>> write > >>>>>>>>>> many > >>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>> deltas > >>>>>>>>>>>>>>>>>>>>>>>> between > >>>>>>>>>>>>>>>>>>>>>>>> checkpoints, and just take periodic checkpoints to > >>>> keep > >>>>>>>>>>>> control > >>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> storage used. > >>>>>>>>>>>>>>>>>>>>>>>> Once a checkpoint has been taken, we could use > >>>>>>>>>>>>>>>>>>>>>> KafkaAdmin.deleteRecords() > >>>>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> prune all of the older records. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> The share-state topics would be more numerous, but > >> we > >>>>>> are > >>>>>>>>>>>>>> talking > >>>>>>>>>>>>>>>>>> one > >>>>>>>>>>>>>>>>>>>>>> per > >>>>>>>>>>>>>>>>>>>>>>>> topic > >>>>>>>>>>>>>>>>>>>>>>>> per share group that it’s being consumed in. These > >>>>>> topics > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>> compacted. 
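Incidentally, the pruning step in option (b) maps directly onto the existing Admin API, because Admin.deleteRecords() simply advances the log start offset. A rough sketch, in which the share-state topic name, its partition and the checkpoint offset are placeholders for illustration:

    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public final class ShareStatePruner {
        // Once a checkpoint has been written at checkpointOffset for this share-state partition,
        // everything before it can be discarded. The topic naming scheme shown here is invented
        // purely for the example; the KIP does not define one.
        public static void pruneBeforeCheckpoint(Admin admin, long checkpointOffset)
                throws Exception {
            TopicPartition statePartition =
                new TopicPartition("__share_state.my-group.my-topic", 0);
            admin.deleteRecords(Map.of(statePartition, RecordsToDelete.beforeOffset(checkpointOffset)))
                 .lowWatermarks()
                 .get(statePartition)
                 .get();   // block until the new log start offset has been applied
        }
    }

That would keep one ordinary, non-compacted log per share-state partition, with its size bounded by how frequently checkpoints are taken.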
> >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> As you’ll see in the KIP, the Persister interface > is > >>>>>>>>>> intended > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>> pluggable one day. > >>>>>>>>>>>>>>>>>>>>>>>> I know the scheme in the KIP is not ideal. It > seems > >>>>>> likely > >>>>>>>>>> to > >>>>>>>>>>>> me > >>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>> future KIPs will > >>>>>>>>>>>>>>>>>>>>>>>> improve upon it. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> If I can get buy-in for option (b), I’m happy to > >>>> change > >>>>>>>> this > >>>>>>>>>>>>>> KIP. > >>>>>>>>>>>>>>>>>>>> While > >>>>>>>>>>>>>>>>>>>>>>>> option (a) is > >>>>>>>>>>>>>>>>>>>>>>>> probably workable, it does seem a bit of a hack to > >>>> have > >>>>>> a > >>>>>>>>>>>>>>>> customised > >>>>>>>>>>>>>>>>>>>> log > >>>>>>>>>>>>>>>>>>>>>>>> compactor > >>>>>>>>>>>>>>>>>>>>>>>> just for this topic. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 78. How about DeliveryState? I agree that State is > >>>>>>>>>> overloaded. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> 79. See (77). > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> On 5 Apr 2024, at 05:07, David Arthur < > >>>>>>>>>>>>>> david.art...@confluent.io > >>>>>>>>>>>>>>>>>>>>>> .INVALID> > >>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Andrew, thanks for the KIP! This is a pretty > >> exciting > >>>>>>>>>> effort. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> I've finally made it through the KIP, still > trying > >> to > >>>>>>>> grok > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> whole > >>>>>>>>>>>>>>>>>>>>>>>> thing. > >>>>>>>>>>>>>>>>>>>>>>>>> Sorry if some of my questions are basic :) > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Concepts: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 70. Does the Group Coordinator communicate with > the > >>>>>> Share > >>>>>>>>>>>>>>>>>> Coordinator > >>>>>>>>>>>>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>>>>>>>>> RPC or directly in-process? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 71. For preventing name collisions with regular > >>>>>> consumer > >>>>>>>>>>>>>> groups, > >>>>>>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>>>> define a reserved share group prefix? E.g., the > >>>>>> operator > >>>>>>>>>>>>>> defines > >>>>>>>>>>>>>>>>>>>> "sg_" > >>>>>>>>>>>>>>>>>>>>>>>> as a > >>>>>>>>>>>>>>>>>>>>>>>>> prefix for share groups only, and if a regular > >>>> consumer > >>>>>>>>>> group > >>>>>>>>>>>>>>>> tries > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>>>>>>>> that name it fails. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 72. When a consumer tries to use a share group, > or > >> a > >>>>>>>> share > >>>>>>>>>>>>>>>> consumer > >>>>>>>>>>>>>>>>>>>>>> tries > >>>>>>>>>>>>>>>>>>>>>>>>> to use a regular group, would INVALID_GROUP_ID > make > >>>>>> more > >>>>>>>>>>>> sense > >>>>>>>>>>>>>>>>>>>>>>>>> than INCONSISTENT_GROUP_PROTOCOL? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> -------- > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Share Group Membership: > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 73. 
What goes in the Metadata field for > >>>>>>>>>>>> TargetAssignment#Member > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>> Assignment? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 74. Under Trigger a rebalance, it says we > rebalance > >>>>>> when > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> partition > >>>>>>>>>>>>>>>>>>>>>>>>> metadata changes. Would this be for any change, > or > >>>> just > >>>>>>>>>>>> certain > >>>>>>>>>>>>>>>>>> ones? > >>>>>>>>>>>>>>>>>>>>>> For > >>>>>>>>>>>>>>>>>>>>>>>>> example, if a follower drops out of the ISR and > >> comes > >>>>>>>> back, > >>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>> probably > >>>>>>>>>>>>>>>>>>>>>>>>> don't need to rebalance. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 75. "For a share group, the group coordinator > does > >>>>>> *not* > >>>>>>>>>>>>>> persist > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> assignment" Can you explain why this is not > needed? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 76. " If the consumer just failed to heartbeat > due > >>>> to a > >>>>>>>>>>>>>> temporary > >>>>>>>>>>>>>>>>>>>>>> pause, > >>>>>>>>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>>> could in theory continue to fetch and acknowledge > >>>>>>>> records. > >>>>>>>>>>>> When > >>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>> finally > >>>>>>>>>>>>>>>>>>>>>>>>> sends a heartbeat and realises it’s been kicked > out > >>>> of > >>>>>>>> the > >>>>>>>>>>>>>> group, > >>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>>>>>>>> stop fetching records because its assignment has > >> been > >>>>>>>>>>>> revoked, > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>> rejoin > >>>>>>>>>>>>>>>>>>>>>>>>> the group." > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> A consumer with a long pause might still deliver > >> some > >>>>>>>>>>>> buffered > >>>>>>>>>>>>>>>>>>>> records, > >>>>>>>>>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>>>>>>> if the share group coordinator has expired its > >>>> session, > >>>>>>>> it > >>>>>>>>>>>>>>>> wouldn't > >>>>>>>>>>>>>>>>>>>>>>>> accept > >>>>>>>>>>>>>>>>>>>>>>>>> acknowledgments for that share consumer. In such > a > >>>>>> case, > >>>>>>>> is > >>>>>>>>>>>> any > >>>>>>>>>>>>>>>>>> kind > >>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>>> error raised to the application like "hey, I know > >> we > >>>>>> gave > >>>>>>>>>> you > >>>>>>>>>>>>>>>> these > >>>>>>>>>>>>>>>>>>>>>>>>> records, but really we shouldn't have" ? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> ----- > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Record Delivery and acknowledgement > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 77. If we guarantee that a ShareCheckpoint is > >> written > >>>>>> at > >>>>>>>>>>>> least > >>>>>>>>>>>>>>>>>> every > >>>>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>>>>>>> often, could we add a new log compactor that > avoids > >>>>>>>>>>>> compacting > >>>>>>>>>>>>>>>>>>>>>>>> ShareDelta-s > >>>>>>>>>>>>>>>>>>>>>>>>> that are still "active" (i.e., not yet superceded > >> by > >>>> a > >>>>>>>> new > >>>>>>>>>>>>>>>>>>>>>>>>> ShareCheckpoint). Mechnically, this could be done > >> by > >>>>>>>>>> keeping > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> LSO > >>>>>>>>>>>>>>>>>>>> no > >>>>>>>>>>>>>>>>>>>>>>>>> greater than the oldest "active" ShareCheckpoint. > >>>> This > >>>>>>>>>> might > >>>>>>>>>>>>>> let > >>>>>>>>>>>>>>>> us > >>>>>>>>>>>>>>>>>>>>>>>> remove > >>>>>>>>>>>>>>>>>>>>>>>>> the DeltaIndex thing. 
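To spell the suggested cleaning rule out (as a hypothetical hook only, since the log cleaner has no such plug point today):

    // A ShareDelta is "active" until a newer ShareCheckpoint exists for the same share-partition.
    // The cleaner may only remove records strictly below that latest checkpoint, so the cleaning
    // point never overtakes state that has not yet been captured in a checkpoint.
    final class ShareStateCleaningRule {
        // latestCheckpointOffset is the offset of the most recent ShareCheckpoint for the
        // share-partition that keys this record, or null if none has been written yet.
        static boolean mayClean(long recordOffset, Long latestCheckpointOffset) {
            return latestCheckpointOffset != null && recordOffset < latestCheckpointOffset;
        }
    }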
> >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 78. Instead of the State in the > >> ShareDelta/Checkpoint > >>>>>>>>>>>> records, > >>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>>>>> MessageState? (State is kind of > >> overloaded/ambiguous) > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> 79. One possible limitation with the current > >>>>>> persistence > >>>>>>>>>>>> model > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>>>>>>> the share state is stored in one topic. It seems > >> like > >>>>>> we > >>>>>>>>>> are > >>>>>>>>>>>>>>>> going > >>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>> storing a lot more state than we do in > >>>>>> __consumer_offsets > >>>>>>>>>>>> since > >>>>>>>>>>>>>>>>>> we're > >>>>>>>>>>>>>>>>>>>>>>>>> dealing with message-level acks. With aggressive > >>>>>>>>>>>> checkpointing > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>>> compaction, we can mitigate the storage > >> requirements, > >>>>>> but > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> throughput > >>>>>>>>>>>>>>>>>>>>>>>>> could be a limiting factor. Have we considered > >> other > >>>>>>>>>>>>>>>> possibilities > >>>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>>> persistence? > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>>>>>>>> David > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>> > >>>>>> > >>>> > >>>> > >> > >> > >