Hi, Andrew,

Thanks for the reply. A few more comments.

41.
41.1 How does the partition leader obtain the group epoch to set
WriteShareGroupStateRequest.StateEpoch?
41.2 What's the benefit of having the group coordinator initialize the
state and the partition leader set the SPSO? It seems simpler to have the
partition leader initialize both the state and the SPSO together?

42.
42.1 "I don’t think the group epoch needs to be bumped when the share group
offset is altered."
But the part on handling Alter share group offsets says "The share
coordinator writes a ShareCheckpoint record with the new state epoch to the
__share_group_state topic." So, which is correct? We have two paths to
update the state in the share coordinator, one from the group coordinator
and another from the partition leader. I thought the benefit of bumping up
the epoch is to fence off a late request from the previous epoch arriving
via the other path (see the sketch below).
42.2 When the group coordinator alters the share group offset in share
coordinator, how does the partition leader know the share group state has
been altered so that it could clear its in-memory state?

47, 56. BaseOffset typically refers to the base offset of a batch, so it
can be confusing. FirstOffset is clearer and matches LastOffset.

60. Could we include FindCoordinatorRequest in the top level index for
Kafka protocol changes?

61. I think it's probably ok to add time-based expiration later. But using
a default group in console-share-consumer probably won't help reduce the
garbage. In the common case, the user of the console consumer likely wants
to see the recently produced records for verification. If the default group
doesn't provide that (because of the stale state), the user will likely
just use a new group. It's true that we don't garbage collect idle topics.
However, share groups are similar to consumer groups, which do support
garbage collection. Typically, we read from topics more often than we
create them.

77. If the generic compaction is inconvenient, we could use customized
logic. If we go with that route, option (b) seems cleaner and more
optimized. Since the share states for all groups fit in memory, we could
generate snapshots more efficiently than going through compaction. Having a
separate log per share partition is probably too much overhead. It's more
efficient to put the state changes for multiple share partitions in a
single log.
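
Whichever layout we pick, the pruning itself could reuse the existing
Admin#deleteRecords API once a snapshot covering everything below a given
offset has been written. A minimal sketch (the topic name, partition and
offset are made up for illustration):

  import java.util.Map;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.Admin;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.admin.RecordsToDelete;
  import org.apache.kafka.common.TopicPartition;

  public class PruneShareState {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          try (Admin admin = Admin.create(props)) {
              // Hypothetical share-state partition and snapshot offset.
              TopicPartition shareState = new TopicPartition("__share_group_state", 0);
              long latestSnapshotOffset = 12345L;
              // Delete everything before the latest snapshot; the snapshot and
              // any later state changes remain as the authoritative state.
              admin.deleteRecords(Map.of(shareState,
                      RecordsToDelete.beforeOffset(latestSnapshotOffset)))
                   .all()
                   .get();
          }
      }
  }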

100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs: Should we name it
SessionTimeoutMs?

101. ShareGroupHeartbeatResponse.Assignment.Error: What kind of error could
we have when assigning partitions? What are the corresponding error codes?

102. Do we still need
ShareGroupDescribeResponse.Members.Assignment.{MetadataVersion,MetadataBytes}?

103. Could we list the error codes separately for
ShareFetchResponse.Responses.Partitions.ErrorCode and
ShareFetchResponse.Responses.Partitions.AcknowledgeErrorCode?

104. Should we add an error message alongside the error code in
ShareFetchResponse, ShareAcknowledgeResponse, ReadShareGroupStateResponse,
WriteShareGroupStateResponse, DeleteShareGroupStateResponse,
ReadShareGroupOffsetsStateResponse and InitializeShareGroupStateResponse?

105. "about": "The state - 0:Available,2:Acked,4:Archived.": What about 1
and 3? Are we leaving them out intentionally?

106. Do we have usage of metadata in OffsetAndMetadata? If not, could we
remove it from AdminClient and KafkaShareConsumer?

107. ListGroupsRequest: Should we bump up the version since it now supports
a new group type "share"?

108. AdminClient.listShareGroupOffsets: Should it expose all the states
from ReadShareGroupStateResponse, instead of just SPSO?

109. DeleteShareGroupOffsetsResult exposes
  public KafkaFuture<Void> partitionResult(final TopicPartition partition)
DeleteShareGroupsResult exposes
  public Map<String, KafkaFuture<Void>> deletedGroups()
Should we make them more consistent?
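
For illustration only, a hypothetical shape that would make the two results
symmetric; neither class below is what the KIP proposes, they just show a
per-item future accessor on both:

  import java.util.Map;
  import org.apache.kafka.common.KafkaFuture;
  import org.apache.kafka.common.TopicPartition;

  // Hypothetical sketches, not the KIP's classes.
  class DeleteShareGroupOffsetsResultSketch {
      private final Map<TopicPartition, KafkaFuture<Void>> futures;
      DeleteShareGroupOffsetsResultSketch(Map<TopicPartition, KafkaFuture<Void>> futures) {
          this.futures = futures;
      }
      public KafkaFuture<Void> partitionResult(TopicPartition partition) {
          return futures.get(partition);
      }
  }

  class DeleteShareGroupsResultSketch {
      private final Map<String, KafkaFuture<Void>> futures;
      DeleteShareGroupsResultSketch(Map<String, KafkaFuture<Void>> futures) {
          this.futures = futures;
      }
      public KafkaFuture<Void> deletedGroupResult(String groupId) {
          return futures.get(groupId);
      }
  }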

110. Should ShareGroupDescription include fields like GroupEpoch,
AssignmentEpoch, MemberEpoch, and SubscribedTopicNames?

111. Should GroupListing include GroupState?

112. Do we need ListShareGroupOffsetsSpec? Could we just use
Set<TopicPartition> directly?

113. ListShareGroupsResult.errors(): How do we know which group has an
error?

Jun

On Mon, Apr 8, 2024 at 9:32 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi David,
> Thanks for your questions.
>
> 70. The Group Coordinator communicates with the Share Coordinator over
> RPCs.
> In the general case, it’s an inter-broker call. It is probably possible to
> optimise
> for the situation in which the appropriate GC and SC shards are
> co-located, but the
> KIP does not delve that deep into potential performance optimisations.
>
> 71. Avoiding collisions would be a good idea, but I do worry about
> retrospectively
> introducing a naming convention for groups. I feel that naming conventions
> will
> typically be the responsibility of the cluster administrators based on
> organizational
> factors, such as the name of an application.
>
> 72. Personally, I don’t like INVALID_GROUP_ID because the group ID is
> correct but
> the group is the wrong type. The nearest existing error code that gets
> that across
> is INCONSISTENT_GROUP_PROTOCOL. Perhaps this is really showing that a new
> error code would be better.
>
> 73. The Metadata fields are not used. I have removed them.
>
> 74. The metadata is re-evaluated on every change, but only a subset is
> relevant
> for rebalancing. A check is done against the names of the subscribed
> topics to
> see if any relevant changes may have occurred. Then the changes which
> trigger
> a rebalance are topic creation, deletion, change in partitions, or rack
> IDs for the
> replicas. I have updated the KIP to make this more clear.
>
> 75. The assignment is not persisted because it is much less important that
> the
> assignment survives a GC change. There’s no need to transfer partitions
> safely from
> member to member in the way that is required for consumer groups, so as an
> optimisation, the assignments for a share group are not persisted. It
> wouldn’t do any
> harm, but it just seems unnecessary.
>
> 76. In the event that a consumer tries to acknowledge a record that it no
> longer has the right to acknowledge, the INVALID_RECORD_STATE error code
> is used.
>
> If the application uses the KafkaShareConsumer.commitSync method, it will
> see an InvalidRecordState exception returned. Alternatively, the
> application can
> register an acknowledgement commit callback which will be called with the
> status
> of the acknowledgements that have succeeded or failed.
>
> 77. I have tried to tread a careful path with the durable share-partition
> state in this
> KIP. The significant choices I made are that:
> * Topics are used so that the state is replicated between brokers.
> * Log compaction is used to keep a lid on the storage.
> * Only one topic is required.
>
> Log compaction as it stands is not ideal for this kind of data, as
> evidenced by
> the DeltaIndex technique I employed.
>
> I can think of a few relatively simple ways to improve upon it.
>
> a) We could use a customised version of the log compactor for this topic
> that
> understands the rules for ShareCheckpoint and ShareDelta records.
> Essentially,
> for each share-partition, the latest ShareCheckpoint and any subsequent
> ShareDelta
> records must not be cleaned. Anything else can be cleaned. We could then
> be sure
> that multiple ShareDelta records with the same key would survive cleaning
> and we could
> abandon the DeltaIndex technique.
>
> b) Actually what is required is a log per share-partition. Let’s imagine
> that we had
> a share-state topic per topic being consumed in a share group, with the
> same number
> of partitions as the topic being consumed. We could write many more deltas
> between
> checkpoints, and just take periodic checkpoints to keep control of the
> storage used.
> Once a checkpoint has been taken, we could use KafkaAdmin.deleteRecords()
> to
> prune all of the older records.
>
> The share-state topics would be more numerous, but we are talking one per
> topic
> per share group that it’s being consumed in. These topics would not be
> compacted.
>
> As you’ll see in the KIP, the Persister interface is intended to be
> pluggable one day.
> I know the scheme in the KIP is not ideal. It seems likely to me that
> future KIPs will
> improve upon it.
>
> If I can get buy-in for option (b), I’m happy to change this KIP. While
> option (a) is
> probably workable, it does seem a bit of a hack to have a customised log
> compactor
> just for this topic.
>
> 78. How about DeliveryState? I agree that State is overloaded.
>
> 79. See (77).
>
> Thanks,
> Andrew
>
>
> > On 5 Apr 2024, at 05:07, David Arthur <david.art...@confluent.io.INVALID>
> > wrote:
> >
> > Andrew, thanks for the KIP! This is a pretty exciting effort.
> >
> > I've finally made it through the KIP, still trying to grok the whole
> > thing.
> > Sorry if some of my questions are basic :)
> >
> >
> > Concepts:
> >
> > 70. Does the Group Coordinator communicate with the Share Coordinator
> > over RPC or directly in-process?
> >
> > 71. For preventing name collisions with regular consumer groups, could we
> > define a reserved share group prefix? E.g., the operator defines "sg_" as a
> > prefix for share groups only, and if a regular consumer group tries to use
> > that name it fails.
> >
> > 72. When a consumer tries to use a share group, or a share consumer tries
> > to use a regular group, would INVALID_GROUP_ID make more sense
> > than INCONSISTENT_GROUP_PROTOCOL?
> >
> > --------
> >
> > Share Group Membership:
> >
> > 73. What goes in the Metadata field for TargetAssignment#Member and
> > Assignment?
> >
> > 74. Under Trigger a rebalance, it says we rebalance when the partition
> > metadata changes. Would this be for any change, or just certain ones? For
> > example, if a follower drops out of the ISR and comes back, we probably
> > don't need to rebalance.
> >
> > 75. "For a share group, the group coordinator does *not* persist the
> > assignment" Can you explain why this is not needed?
> >
> > 76. "If the consumer just failed to heartbeat due to a temporary pause, it
> > could in theory continue to fetch and acknowledge records. When it finally
> > sends a heartbeat and realises it's been kicked out of the group, it should
> > stop fetching records because its assignment has been revoked, and rejoin
> > the group."
> >
> > A consumer with a long pause might still deliver some buffered records, but
> > if the share group coordinator has expired its session, it wouldn't accept
> > acknowledgments for that share consumer. In such a case, is any kind of
> > error raised to the application like "hey, I know we gave you these
> > records, but really we shouldn't have" ?
> >
> >
> > -----
> >
> > Record Delivery and acknowledgement
> >
> > 77. If we guarantee that a ShareCheckpoint is written at least every so
> > often, could we add a new log compactor that avoids compacting ShareDeltas
> > that are still "active" (i.e., not yet superseded by a new
> > ShareCheckpoint)? Mechanically, this could be done by keeping the LSO no
> > greater than the oldest "active" ShareCheckpoint. This might let us remove
> > the DeltaIndex thing.
> >
> > 78. Instead of the State in the ShareDelta/Checkpoint records, how about
> > MessageState? (State is kind of overloaded/ambiguous)
> >
> > 79. One possible limitation with the current persistence model is that all
> > the share state is stored in one topic. It seems like we are going to be
> > storing a lot more state than we do in __consumer_offsets since we're
> > dealing with message-level acks. With aggressive checkpointing and
> > compaction, we can mitigate the storage requirements, but the throughput
> > could be a limiting factor. Have we considered other possibilities for
> > persistence?
> >
> >
> > Cheers,
> > David
>
>
