Hi David,

Thanks for your questions.

70. The Group Coordinator communicates with the Share Coordinator over RPC. In the general case, it's an inter-broker call. It is probably possible to optimise for the situation in which the appropriate GC and SC shards are co-located, but the KIP does not delve that deeply into potential performance optimisations.
71. Avoiding collisions would be a good idea, but I do worry about retrospectively introducing a naming convention for groups. I feel that naming conventions will typically be the responsibility of the cluster administrators, based on organizational factors such as the name of an application.

72. Personally, I don't like INVALID_GROUP_ID because the group ID is correct but the group is of the wrong type. The nearest existing error code that gets that across is INCONSISTENT_GROUP_PROTOCOL. Perhaps this really shows that a new error code would be better.

73. The Metadata fields are not used. I have removed them.

74. The metadata is re-evaluated on every change, but only a subset is relevant for rebalancing. A check is done against the names of the subscribed topics to see whether any relevant changes may have occurred. The changes which trigger a rebalance are topic creation, topic deletion, a change in the number of partitions, or a change in the rack IDs of the replicas. I have updated the KIP to make this clearer.

75. The assignment is not persisted because it is much less important that the assignment survives a GC change. There's no need to transfer partitions safely from member to member in the way that is required for consumer groups, so, as an optimisation, the assignments for a share group are not persisted. Persisting them wouldn't do any harm; it just seems unnecessary.

76. In the event that a consumer tries to acknowledge a record that it no longer has the right to acknowledge, the INVALID_RECORD_STATE error code is used. If the application uses the KafkaShareConsumer.commitSync method, it will see an InvalidRecordState exception. Alternatively, the application can register an acknowledgement commit callback, which will be called with the status of the acknowledgements that have succeeded or failed.

77. I have tried to tread a careful path with the durable share-partition state in this KIP.
The significant choices I made are that:

* Topics are used so that the state is replicated between brokers.
* Log compaction is used to keep a lid on the storage.
* Only one topic is required.

Log compaction as it stands is not ideal for this kind of data, as evidenced by the DeltaIndex technique I employed. I can think of a few relatively simple ways to improve upon it.

a) We could use a customised version of the log compactor for this topic that understands the rules for ShareCheckpoint and ShareDelta records. Essentially, for each share-partition, the latest ShareCheckpoint and any subsequent ShareDelta records must not be cleaned. Anything else can be cleaned. We could then be sure that multiple ShareDelta records with the same key would survive cleaning, and we could abandon the DeltaIndex technique.

b) Actually, what is required is a log per share-partition. Let's imagine that we had a share-state topic per topic being consumed in a share group, with the same number of partitions as the topic being consumed. We could write many more deltas between checkpoints, and just take periodic checkpoints to keep control of the storage used. Once a checkpoint has been taken, we could use KafkaAdmin.deleteRecords() to prune all of the older records. The share-state topics would be more numerous, but we are talking about one per topic per share group consuming it. These topics would not be compacted.

As you'll see in the KIP, the Persister interface is intended to be pluggable one day. I know the scheme in the KIP is not ideal, and it seems likely to me that future KIPs will improve upon it. If I can get buy-in for option (b), I'm happy to change this KIP. While option (a) is probably workable, it does seem a bit of a hack to have a customised log compactor just for this topic.

78. How about DeliveryState? I agree that State is overloaded.

79. See (77).
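To make option (a)'s cleaning rule concrete, here's a minimal self-contained sketch of the retention predicate. All the type and method names here are made up for illustration; this is not the actual log cleaner code, just the rule stated above: per share-partition key, the latest ShareCheckpoint and every ShareDelta written after it are kept, and everything older is cleanable.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option (a): a compaction rule that, for each
// share-partition, keeps the latest ShareCheckpoint and all subsequent
// ShareDelta records, and marks everything older as cleanable.
class ShareStateCleaner {
    enum Kind { CHECKPOINT, DELTA }

    record Rec(String sharePartition, long offset, Kind kind) {}

    // Returns, for each record in the log, whether it may be cleaned.
    static boolean[] cleanable(List<Rec> log) {
        // First pass: find the latest checkpoint offset per share-partition.
        Map<String, Long> latestCheckpoint = new HashMap<>();
        for (Rec r : log) {
            if (r.kind() == Kind.CHECKPOINT) {
                latestCheckpoint.merge(r.sharePartition(), r.offset(), Math::max);
            }
        }
        // Second pass: anything written before the latest checkpoint for its
        // share-partition is cleanable; the checkpoint itself and later
        // deltas must survive.
        boolean[] result = new boolean[log.size()];
        for (int i = 0; i < log.size(); i++) {
            Rec r = log.get(i);
            long cp = latestCheckpoint.getOrDefault(r.sharePartition(), -1L);
            result[i] = r.offset() < cp;
        }
        return result;
    }
}
```

Note that under option (b), with one log per share-partition, the same predicate collapses to "everything below the latest checkpoint offset", which is exactly what a deleteRecords() call after each checkpoint would achieve without any custom cleaner.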
Thanks,
Andrew

> On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID> wrote:
>
> Andrew, thanks for the KIP! This is a pretty exciting effort.
>
> I've finally made it through the KIP, still trying to grok the whole thing.
> Sorry if some of my questions are basic :)
>
> Concepts:
>
> 70. Does the Group Coordinator communicate with the Share Coordinator over
> RPC or directly in-process?
>
> 71. For preventing name collisions with regular consumer groups, could we
> define a reserved share group prefix? E.g., the operator defines "sg_" as a
> prefix for share groups only, and if a regular consumer group tries to use
> that name it fails.
>
> 72. When a consumer tries to use a share group, or a share consumer tries
> to use a regular group, would INVALID_GROUP_ID make more sense
> than INCONSISTENT_GROUP_PROTOCOL?
>
> --------
>
> Share Group Membership:
>
> 73. What goes in the Metadata field for TargetAssignment#Member and
> Assignment?
>
> 74. Under Trigger a rebalance, it says we rebalance when the partition
> metadata changes. Would this be for any change, or just certain ones? For
> example, if a follower drops out of the ISR and comes back, we probably
> don't need to rebalance.
>
> 75. "For a share group, the group coordinator does *not* persist the
> assignment" Can you explain why this is not needed?
>
> 76. "If the consumer just failed to heartbeat due to a temporary pause, it
> could in theory continue to fetch and acknowledge records. When it finally
> sends a heartbeat and realises it's been kicked out of the group, it should
> stop fetching records because its assignment has been revoked, and rejoin
> the group."
>
> A consumer with a long pause might still deliver some buffered records, but
> if the share group coordinator has expired its session, it wouldn't accept
> acknowledgments for that share consumer. In such a case, is any kind of
> error raised to the application like "hey, I know we gave you these
> records, but really we shouldn't have"?
>
> -----
>
> Record Delivery and acknowledgement
>
> 77. If we guarantee that a ShareCheckpoint is written at least every so
> often, could we add a new log compactor that avoids compacting ShareDelta-s
> that are still "active" (i.e., not yet superseded by a new
> ShareCheckpoint). Mechanically, this could be done by keeping the LSO no
> greater than the oldest "active" ShareCheckpoint. This might let us remove
> the DeltaIndex thing.
>
> 78. Instead of the State in the ShareDelta/Checkpoint records, how about
> MessageState? (State is kind of overloaded/ambiguous)
>
> 79. One possible limitation with the current persistence model is that all
> the share state is stored in one topic. It seems like we are going to be
> storing a lot more state than we do in __consumer_offsets since we're
> dealing with message-level acks. With aggressive checkpointing and
> compaction, we can mitigate the storage requirements, but the throughput
> could be a limiting factor. Have we considered other possibilities for
> persistence?
>
> Cheers,
> David