Thanks for the update. Yes, I think using a similar approach to KIP-868 to fix this issue makes sense. Let's consider it in the future.
Luke

On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:

> Hi Luke,
>
> Thanks for your questions.
>
> > 1. We will store the "targetAssignment" in the log now. But as we know,
> > there is a max batch size limit (1 MB by default), which means we cannot
> > support 1M partitions in one group (actually, it should be fewer than
> > 60k partitions, since we'll store {topicID + partition id}) by default.
> > How will we handle that? Do we expect users to adjust the max batch size
> > to support large numbers of partitions in a group, a change that is not
> > needed with the old protocol?
>
> That's right. I have a few ideas to remove this limitation in the future,
> but I decided to keep them for a future improvement. The KIP is large
> enough, and as the current protocol suffers from the exact same
> limitation, it is not a regression.
>
> For the future, my thinking is to split the assignment and to write only
> deltas to the log instead of rewriting all of it. We would need to use
> transactions in the coordinator for this (similarly to KIP-868). The
> challenge is that we have to ensure that those deltas are either all
> written or completely rolled back; otherwise, we would end up in a weird
> state after compaction. This obviously needs more thought.
>
> > I'm wondering why we should persist the "targetAssignment" data at all.
> > If the goal is to survive coordinator failover, could the new
> > coordinator request the currently owned partitions from each consumer
> > when it takes over? I'm not sure whether the consumer will automatically
> > send its owned partitions to the new coordinator. If not, maybe we can
> > return a REQUIRE_OWNED_PARTITION error from the ConsumerGroupHeartbeat
> > API and ask the client to send its currently owned partitions to the new
> > coordinator for a new assignment computation. Does that make sense?
>
> The entire reconciliation process depends on it, so if we lose it during
> a failover, members could be in a weird state.
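A quick back-of-the-envelope check of the "less than 60k partitions" figure discussed above (a sketch: the 16-byte UUID and 4-byte int32 match Kafka's wire types, but record framing overhead is ignored here, so the real bound is somewhat lower):

```python
# Rough estimate: how many {topicId, partition} entries fit in one
# default-sized (~1 MB) batch if the target assignment is written as
# a single record. Framing/header overhead is deliberately ignored.

MAX_BATCH_BYTES = 1024 * 1024   # default max batch size, ~1 MB
TOPIC_ID_BYTES = 16             # topic IDs are UUIDs
PARTITION_INDEX_BYTES = 4       # int32 partition index

entry_bytes = TOPIC_ID_BYTES + PARTITION_INDEX_BYTES
max_partitions = MAX_BATCH_BYTES // entry_bytes
print(max_partitions)  # 52428
```

That lands at roughly 52k entries before any per-record overhead, which is consistent with the "fewer than 60k" estimate in the thread.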
> For instance, they could be in the middle of a transition from their
> current assignment to their new target and thus would be blocked. Relying
> on members to reconstruct it does not really work either, because they
> don't have all the information to do so (e.g. new metadata), so we would
> have to recompute a new one. That implies getting the owned partitions
> from all members, which would take a few seconds until all members come
> back in the best case, and up to the session timeout in the worst case.
> Imagine that a member joins or fails during this time: the whole process
> would be stuck. I am afraid storing it is the best way here.
>
> Best,
> David
>
> On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
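David's delta idea above can be sketched with a toy compacted log (all names and record shapes here are hypothetical illustrations, not the KIP's actual record format): keying records by member lets the coordinator persist only the members whose target changed, while compaction retains the latest record per key.

```python
# Toy model of writing per-member assignment deltas to a compacted log.
# Keys and record shapes are hypothetical, for illustration only.

def write_delta(log, old_target, new_target):
    """Append one record per member whose assignment changed."""
    for member, parts in new_target.items():
        if old_target.get(member) != parts:
            log.append((member, parts))          # upsert for this member
    for member in old_target.keys() - new_target.keys():
        log.append((member, None))               # tombstone for removed member

def compact(log):
    """Keep only the latest record per key, dropping tombstones."""
    latest = {}
    for key, value in log:
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}

log = []
write_delta(log, {}, {"A": ["foo-0"], "B": ["foo-1"]})
write_delta(log, {"A": ["foo-0"], "B": ["foo-1"]}, {"A": ["foo-0", "foo-1"]})
print(compact(log))  # {'A': ['foo-0', 'foo-1']}
```

The all-or-nothing concern in the thread shows up here directly: if only some of the appends from one `write_delta` call survive, compaction reconstructs a target assignment that never existed, which is why the coordinator would need transactional writes.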
> > > > Luke > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> > wrote: > > > > > Hi, David, > > > > > > Thanks for the reply and the updated KIP. A few more comments on the > > > interfaces and the protocols. > > > > > > 60. On the consumer side, do we need both > PartitionAssignor.onAssignment > > > and ConsumerRebalanceListener.onPartitionsAssigned? My understanding is > > > that the former was added for cooperative rebalance, which is now > handled > > > by the coordinator. If we do need both, should we make them more > consistent > > > (e.g. topic name vs topic id, list vs set vs collection)? > > > > > > 61. group.local.assignors: Could we make it clear that it's the full > class > > > name that implements PartitionAssignor? > > > > > > 62. MemberAssignment: It currently has the following method. > > > public Set<TopicPartition> topicPartitions() > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep > the > > > naming and the return type consistent? > > > > > > 63. MemberAssignment.error: should that be reason? > > > > > > 64. group.remote.assignor: The client may not know what assignors the > > > broker supports. Should we default this to what the broker determines > (e.g. > > > first assignor listed in group.consumer.assignors)? > > > > > > 65. After the text "When A heartbeats again and acknowledges the > > > revocation, the group coordinator transitions him to epoch 2 and > releases > > > foo-2.", we have the following. > > > B - epoch=2, partitions=[foo-2], pending-partitions=[] > > > Should foo-2 be in pending-partitions? > > > > > > 66. In the Online Migration example, is the first occurence of "C - > > > epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]" > correct? > > > It seems that should happen after C receives a SyncGroup response? If > so, > > > subsequent examples have the same issue. > > > > > > 67. 
ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs : Which config > > > controls this? How is this used by the group coordinator since there > is no > > > sync barrier anymore? > > > > > > 68. ConsumerGroupHeartbeatResponse: > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type > > > []TopicPartition. Should they be TopicPartitions? > > > 68.2 Assignment.error. Should we also have an errorMessage field? > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it > > > include the selected assignor name? > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let the > > > client set this? Intuitively, it seems the coordinator should manage > the > > > group epoch. > > > > > > 71. ConsumerGroupDescribeResponse: > > > 71.1 Members.Assignment.Partitions. Should we include the topic name > too > > > since it's convenient for building tools? Ditto for TargetAssignment. > > > 71.2 Members: Should we include SubscribedTopicRegex too? > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since > tools may > > > also want to issue this request? > > > > > > Thanks, > > > > > > Jun > > > >
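The epoch transition Jun questions in item 65 can be modeled as a toy state machine (a sketch under assumed semantics; field names are simplifications, not the KIP's actual coordinator state): a member keeps its old epoch until it acknowledges a revocation, and only then does the coordinator release the revoked partition and bump the member's epoch, making the partition available to another member.

```python
# Toy model of revoke -> acknowledge -> epoch bump, per the KIP discussion.
# Field names are hypothetical simplifications of the member state.

class Member:
    def __init__(self, epoch, partitions):
        self.epoch = epoch
        self.partitions = set(partitions)
        self.pending_revocation = set()

def revoke(member, partition):
    """Coordinator asks the member to give up a partition."""
    member.pending_revocation.add(partition)

def heartbeat_ack(member, target_epoch):
    """Member acknowledges the revocation via heartbeat; the coordinator
    releases the partitions and transitions the member to the target epoch."""
    released = member.pending_revocation & member.partitions
    member.partitions -= released
    member.pending_revocation.clear()
    member.epoch = target_epoch
    return released  # now free to be assigned to another member

a = Member(epoch=1, partitions=["foo-1", "foo-2"])
revoke(a, "foo-2")
released = heartbeat_ack(a, target_epoch=2)
print(a.epoch, sorted(a.partitions), sorted(released))  # 2 ['foo-1'] ['foo-2']
```

Jun's question is exactly about the moment after `heartbeat_ack`: whether the freshly released partition should appear in the next owner's `partitions` immediately or sit in its `pending-partitions` until that member picks it up.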