Hi, David,

Thanks for the reply.
60. Hmm, could you explain why KStreams needs the full set of partition assignments? I am also not sure how this will be implemented based on the protocol. Since HeartbeatResponse sends the assigned partitions in phases (those that don't need to wait for revocation from other members first, followed by the full assignment list), how does a member know which response has the full assignment?

62. I was referring to Admin#describeConsumerGroups. It seems that ownedPartitions is still of type List<TopicIdPartition>. Also, the existing topicPartitions() returns Set<TopicPartition>, not a collection. (See the short sketch after the quoted thread at the end of this message.)

63. This is also in Admin#describeConsumerGroups. The comment seems inconsistent with the field name.

    /**
     * The reason reported by the assignor.
     */
    byte error;

67. Thanks for the explanation. Makes sense. The existing name may be ok.

68.2 Are you saying the error is only intended for the client assignor? But the coordinator generates the error based on the server-side validation, right? Should we provide some info to tell the client why the validation fails?

71.1 Hmm, for SubscribedTopicIds, should we use the topic name in the subscription part? That seems more intuitive: a subscription shouldn't change just because a topic is recreated. For the assigned partitions, perhaps we could include both the topicId and the name, just like FetchOffsetRequest.

Thanks,

Jun

On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com> wrote:

> Thanks for the update.
> Yes, I think using a similar approach to KIP-868 to fix this issue makes sense.
> Let's consider it in the future.
>
> Luke
>
> On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:
>
> > Hi Luke,
> >
> > Thanks for your questions.
> >
> > > 1. We will store the "targetAssignment" in the log now. But as we know, there's a max batch size limit (default 1MB), which means we cannot support 1M partitions in one group (actually, it should be less than 60k partitions since we'll store {topicID+partition id}) by default now. How will we handle that? Do we expect users to adjust the max batch size to support large partition counts in groups, which is a change we don't need for the old protocol?
> >
> > That's right. I have a few ideas to remove this limitation in the future but I decided to keep them for future improvement. The KIP is large enough and, as the current protocol suffers from the exact same limitation, it is not a regression.
> >
> > For the future, my thinking is to split the assignment and to only write deltas to the log instead of re-writing all of it. We would need to use transactions for this in the coordinator (similarly to KIP-868). The challenge is that we have to ensure that those deltas are all written or completely rolled back. Otherwise, we would have a weird state with the compaction. This obviously needs more thinking.
> >
> > > I'm wondering why we should persist the "targetAssignment" data? If we want it to work for coordinator failover, could the new coordinator try to request the currently owned partitions from each consumer when failing over? I'm not sure if the consumer will automatically send its owned partitions to the new coordinator. If not, maybe we can return an error to the client from the ConsumerGroupHeartbeat API with a REQUIRE_OWNED_PARTITION error, and ask the client to send its currently owned partitions to the new coordinator for the new assignment computation. Does that make sense?
> >
> > The entire reconciliation process depends on it, so if we lose it during a failover, members could be in a weird state. For instance, they could be in the middle of a transition from their current assignment to their new target and thus would be blocked. Relying on members to reconstruct it does not really work because they don't have all the information to do so (e.g. new metadata), so we would have to recompute a new one. This implies that we need to get the owned partitions from all members, and that would take a few seconds until all members come back in the best case, up to the session timeout in the worst case. If a member joins or fails during this time, the whole process would be stuck. I am afraid storing it is the best way here.
> >
> > Best,
> > David
> >
> > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > > Hi David,
> > >
> > > A few more questions:
> > > 1. We will store the "targetAssignment" in the log now. But as we know, there's a max batch size limit (default 1MB), which means we cannot support 1M partitions in one group (actually, it should be less than 60k partitions since we'll store {topicID+partition id}) by default now. How will we handle that? Do we expect users to adjust the max batch size to support large partition counts in groups, which is a change we don't need for the old protocol?
> > >
> > > I'm wondering why we should persist the "targetAssignment" data? If we want it to work for coordinator failover, could the new coordinator try to request the currently owned partitions from each consumer when failing over? I'm not sure if the consumer will automatically send its owned partitions to the new coordinator. If not, maybe we can return an error to the client from the ConsumerGroupHeartbeat API with a REQUIRE_OWNED_PARTITION error, and ask the client to send its currently owned partitions to the new coordinator for the new assignment computation. Does that make sense?
> > >
> > > Luke
> > >
> > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply and the updated KIP. A few more comments on the interfaces and the protocols.
> > > >
> > > > 60. On the consumer side, do we need both PartitionAssignor.onAssignment and ConsumerRebalanceListener.onPartitionsAssigned? My understanding is that the former was added for cooperative rebalance, which is now handled by the coordinator. If we do need both, should we make them more consistent (e.g. topic name vs topic id, list vs set vs collection)?
> > > >
> > > > 61. group.local.assignors: Could we make it clear that it's the full class name that implements PartitionAssignor?
> > > >
> > > > 62. MemberAssignment: It currently has the following method.
> > > >     public Set<TopicPartition> topicPartitions()
> > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep the naming and the return type consistent?
> > > >
> > > > 63. MemberAssignment.error: should that be reason?
> > > >
> > > > 64. group.remote.assignor: The client may not know what assignors the broker supports. Should we default this to what the broker determines (e.g. the first assignor listed in group.consumer.assignors)?
> > > >
> > > > 65. After the text "When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 2 and releases foo-2.", we have the following.
> > > >     B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > Should foo-2 be in pending-partitions?
> > > >
> > > > 66. In the Online Migration example, is the first occurrence of "C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]" correct? It seems that should happen after C receives a SyncGroup response? If so, subsequent examples have the same issue.
> > > >
> > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config controls this? How is this used by the group coordinator since there is no sync barrier anymore?
> > > >
> > > > 68. ConsumerGroupHeartbeatResponse:
> > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type []TopicPartition. Should they be TopicPartitions?
> > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > >
> > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it include the selected assignor name?
> > > >
> > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let the client set this? Intuitively, it seems the coordinator should manage the group epoch.
> > > >
> > > > 71. ConsumerGroupDescribeResponse:
> > > > 71.1 Members.Assignment.Partitions. Should we include the topic name too since it's convenient for building tools? Ditto for TargetAssignment.
> > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > >
> > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since tools may also want to issue this request?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
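
To make the inconsistency raised in 62 and 63 concrete, here is a minimal sketch; only the two accessor signatures and the "error" field come from the discussion above, while the class name and the placeholder bodies are illustrative.

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative stand-in for the admin-client MemberAssignment discussed above.
    public class MemberAssignmentSketch {

        // Existing accessor: topic-name based, returns a Set, and is named topicPartitions().
        public Set<TopicPartition> topicPartitions() {
            return Collections.emptySet(); // placeholder body
        }

        // Accessor as currently proposed: topic-id based, returns a List, and is named
        // ownedPartitions() rather than topicPartitions().
        public List<TopicIdPartition> ownedPartitions() {
            return Collections.emptyList(); // placeholder body
        }

        /**
         * The reason reported by the assignor.
         */
        byte error; // 63: the field is named "error" but documented as a "reason"
    }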
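
And for the 1MB batch size question in the quoted thread, a back-of-envelope check (my own rough numbers, not from the KIP) of the "less than 60k partitions" figure, assuming 16 bytes per topic id (a UUID) and 4 bytes per partition index, and ignoring per-member and per-record framing overhead:

    public class TargetAssignmentSizeEstimate {
        public static void main(String[] args) {
            final int maxBatchBytes = 1024 * 1024; // the default 1MB batch limit mentioned above
            final int topicIdBytes = 16;           // a topic id is a UUID
            final int partitionIndexBytes = 4;     // an int32 partition number

            // Prints 52428: roughly 52k entries fit before counting member ids, per-topic
            // grouping, or record framing, consistent with the "less than 60k" estimate.
            System.out.println(maxBatchBytes / (topicIdBytes + partitionIndexBytes));
        }
    }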