Hi Jun,

81. I forgot to say that I put UniformAssignor as the first one in the list. I think that it should be the default one.
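
To make the ordering concrete, here is a minimal sketch of the broker-side setting, assuming the first entry in group.consumer.assignors is the one used as the default. The package name org.example.assignor and the second entry are placeholders for illustration only; they are not the actual class names or defaults from the KIP.

```properties
# Sketch only: the package and the second assignor are hypothetical placeholders.
# Full class names are listed, and the first entry is the proposed default.
group.consumer.assignors=org.example.assignor.UniformAssignor,org.example.assignor.SomeOtherAssignor
```
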

Best,
David

On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io> wrote:
>
> Hi Jun,
>
> 80. Hmm. It seems preferable to keep
> ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId as a
> topic id in order for the RPC to remain symmetrical with the
> PrepareAssignment RPCs. The client has to create the TopicPartitions
> from the mapping provided in the PrepareAssignment response so it can
> use the same mapping to convert them back to topic ids afterwards. I
> personally find this cleaner from an RPC perspective, don't you think?
>
> 81. Makes sense. Done.
>
> Thanks,
> David
>
> On Tue, Oct 18, 2022 at 8:00 PM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > Hi, David,
> >
> > Thanks for the reply.
> >
> > 80. The change in ConsumerGroupPrepareAssignmentResponse sounds good to me.
> > Should ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be
> > topic name? Intuitively, the client assignor assigns partitions based on
> > topic names and it seems that the group coordinator should be responsible
> > for mapping the topic names to topic ids.
> >
> > 81. group.consumer.assignors: Should we change the default values to
> > include the full class name?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Oct 18, 2022 at 2:36 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > >
> > > Hi Jun,
> > >
> > > 80. I have included the mapping from topic ids to topic names in
> > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be used to
> > > convert all the topic ids that you mentioned. It seems preferable to
> > > me to keep it like this as topic ids are usually smaller than topic
> > > names. Does that make sense?
> > >
> > > Thanks,
> > > David
> > >
> > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply. Just one more comment.
> > > >
> > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses topic name for both
> > > > subscribedTopics and targetPartitions, in
> > > > ConsumerGroupPrepareAssignmentResponse, should Members.SubscribedTopicIds
> > > > and Members.TopicPartitions.TopicId be topic names too? Similarly, since
> > > > PartitionAssignor.MemberAssignment uses topic name, should
> > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be topic
> > > > name too?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for your comments. Please find my answers below.
> > > > >
> > > > > 60. Sure. Let me use a concrete example to illustrate it. Let's assume
> > > > > that KStreams has a member A with a task reading from foo-0 and a
> > > > > member B with a standby task reading from foo-0. As you know, the
> > > > > standby task information for B is encoded in the assignment metadata
> > > > > whereas the assigned partition for A is in the assignment. Now, let's
> > > > > imagine that the assignor decides to promote the standby task to
> > > > > become the active task for foo-0. The assignor will create a new
> > > > > assignment for the group and will encode the standby task in A's
> > > > > metadata and assign foo-0 to B. A will be requested to revoke foo-0
> > > > > and, while it does so, B will get its new metadata but without foo-0
> > > > > because foo-0 is not revoked yet. From the point of view of B, it will
> > > > > see that the standby task is no longer there. Without providing the
> > > > > full set of assigned partitions, it would not know what to do here. If
> > > > > foo-0 is targeted to be assigned to B, B should wait until it is
> > > > > before stopping the standby task. If foo-0 is not, it can stop the
> > > > > standby immediately. Long story short, KStreams needs to get the full
> > > > > assignment + metadata in order to reason about the correct end state.
> > > > >
> > > > > How do we provide this? We have added the pending partitions in the
> > > > > ConsumerGroupHeartbeatResponse in conjunction with the assigned
> > > > > partitions. With this, the Consumer knows the full set of partitions.
> > > > > PartitionAssignor#onAssign will be called when the member transitions
> > > > > to a new epoch with the target partitions and the metadata for the
> > > > > member. Then, RebalanceListener#onAssignedPartitions is called when
> > > > > partitions are incrementally assigned. At most, it will be called N
> > > > > times where N is the number of assigned partitions in the current
> > > > > epoch.
> > > > >
> > > > > For context, this was discussed with Guozhang in this thread.
> > > > >
> > > > > I hope that this clarifies the intent.
> > > > >
> > > > > 62. Got it. I have reworked that part as well. Unfortunately, I think
> > > > > that we should keep using a Set here because it is already there.
> > > > > Deprecating the current one to change the type is not worth it.
> > > > >
> > > > > 63. Fixed as well.
> > > > >
> > > > > 68.2 Right. The error is only used by the client side assignor. The
> > > > > client assignor will report a non-zero error code while installing a
> > > > > new assignment if it was not able to compute a new assignment for the
> > > > > group. In this case, the assignment is not changed but the error is
> > > > > reported to all the members. KStreams will have a few such errors.
> > > > > They are listed in the KIP.
> > > > >
> > > > > When a new assignment is installed with the
> > > > > ConsumerGroupInstallAssignment API, the coordinator validates it and
> > > > > rejects the installation directly if it is not valid. In this case,
> > > > > only the member trying to install the assignment gets an error. The
> > > > > other members are not aware of it as they keep their current assignor.
> > > > >
> > > > > 71.1 That makes sense. Using SubscribedTopicNames is indeed more
> > > > > intuitive. I have changed the HB request to use it in order to be
> > > > > consistent. I have also added the topic names in the
> > > > > ConsumerGroupDescribe response.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > Hi, David,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 60. Hmm, could you explain why KStreams needs the full set of partition
> > > > > > assignments? I am also not sure how this will be implemented based on the
> > > > > > protocol. Since HeartBeatResponse sends the assigned partitions in phases
> > > > > > (those that don't need to wait for revocation from other members first,
> > > > > > followed by the full assignment list), how does a member know which
> > > > > > response has the full assignment?
> > > > > >
> > > > > > 62. I was referring to Admin#describeConsumerGroups. It seems that
> > > > > > ownedPartitions is still of type List<TopicIdPartition>. Also, the
> > > > > > existing topicPartitions() returns Set<TopicPartition>, not a collection.
> > > > > >
> > > > > > 63. This is also in Admin#describeConsumerGroups. The comment seems
> > > > > > inconsistent with the field name.
> > > > > >     /**
> > > > > >      * The reason reported by the assignor.
> > > > > >      */
> > > > > >     byte error;
> > > > > >
> > > > > > 67. Thanks for the explanation. Makes sense. The existing name may be ok.
> > > > > >
> > > > > > 68.2 Are you saying the error is only intended for the client assignor?
> > > > > > But the coordinator generates the error based on the server side
> > > > > > validation, right? Should we provide some info to tell the client why the
> > > > > > validation fails?
> > > > > >
> > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
> > > > > > subscription part? That seems more intuitive---a subscription shouldn't
> > > > > > change just because a topic is recreated. For the assigned partitions,
> > > > > > perhaps we could include both topicId and name just like
> > > > > > FetchOffsetRequest.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks for the update.
> > > > > > > Yes, I think using a similar approach to KIP-868 to fix this issue
> > > > > > > makes sense. Let's consider it in the future.
> > > > > > >
> > > > > > > Luke
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > Thanks for your questions.
> > > > > > > >
> > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we
> > > > > > > > > know, there's max batch size limit (default 1MB), which means, we
> > > > > > > > > cannot support 1M partitions in one group (actually, it should be
> > > > > > > > > less than 60k partitions since we'll store {topicID+partition id})
> > > > > > > > > by default now. How will we handle that? Do we expect users to
> > > > > > > > > adjust the max batch size to support large partitions in groups,
> > > > > > > > > which we don't need this change for old protocol?
> > > > > > > >
> > > > > > > > That's right. I have a few ideas to remove this limitation in the
> > > > > > > > future but I decided to keep them for future improvement. The KIP is
> > > > > > > > large enough and as the current protocol suffers from the exact same
> > > > > > > > limitation, it is not a regression.
> > > > > > > >
> > > > > > > > For the future, my thinking is to split the assignment and to only
> > > > > > > > write deltas to the log instead of re-writing all of it. We would need
> > > > > > > > to use transactions for this in the coordinator (similarly to
> > > > > > > > KIP-868). The challenge is that we have to ensure that those deltas
> > > > > > > > are all written or completely rolled back. Otherwise, we would have a
> > > > > > > > weird state with the compaction. This obviously needs more thinking.
> > > > > > > >
> > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If
> > > > > > > > > we want to work for coordinator failover, could the new coordinator
> > > > > > > > > try to request for currently owned partitions from each consumer
> > > > > > > > > when failed over? I'm not sure if the consumer will auto send owned
> > > > > > > > > partitions to the new coordinator. If not, maybe we can return an
> > > > > > > > > error to client ConsumerGroupHeartbeat API with
> > > > > > > > > REQUIRE_OWNED_PARTITION error, and ask client to append the
> > > > > > > > > currently owned partitions to new coordinator for new assignment
> > > > > > > > > computation. Does that make sense?
> > > > > > > >
> > > > > > > > The entire reconciliation process depends on it so if we lose it
> > > > > > > > during a failover, members could be in a weird state. For instance,
> > > > > > > > they could be in the middle of a transition from their current
> > > > > > > > assignment to their new target and thus would be blocked. Relying on
> > > > > > > > members to reconstruct it back does not really work because they don't
> > > > > > > > have all the information to do so (e.g. new metadata) so we would have
> > > > > > > > to recompute a new one. This implies that we need to get the owned
> > > > > > > > partitions from all members and that would take a few seconds until
> > > > > > > > all members come back in the best case, up to the session timeout in
> > > > > > > > the worst case. Imagine that a member joins or fails during this time,
> > > > > > > > the whole process would be stuck. I am afraid storing it is the best
> > > > > > > > way here.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > David
> > > > > > > >
> > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi David,
> > > > > > > > >
> > > > > > > > > A few more questions:
> > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we know,
> > > > > > > > > there's max batch size limit (default 1MB), which means, we cannot
> > > > > > > > > support 1M partitions in one group (actually, it should be less than
> > > > > > > > > 60k partitions since we'll store {topicID+partition id}) by default
> > > > > > > > > now. How will we handle that? Do we expect users to adjust the max
> > > > > > > > > batch size to support large partitions in groups, which we don't need
> > > > > > > > > this change for old protocol?
> > > > > > > > >
> > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If we
> > > > > > > > > want to work for coordinator failover, could the new coordinator try
> > > > > > > > > to request for currently owned partitions from each consumer when
> > > > > > > > > failed over? I'm not sure if the consumer will auto send owned
> > > > > > > > > partitions to the new coordinator. If not, maybe we can return an
> > > > > > > > > error to client ConsumerGroupHeartbeat API with
> > > > > > > > > REQUIRE_OWNED_PARTITION error, and ask client to append the currently
> > > > > > > > > owned partitions to new coordinator for new assignment computation.
> > > > > > > > > Does that make sense?
> > > > > > > > >
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi, David,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply and the updated KIP. A few more comments on the
> > > > > > > > > > interfaces and the protocols.
> > > > > > > > > >
> > > > > > > > > > 60. On the consumer side, do we need both
> > > > > > > > > > PartitionAssignor.onAssignment and
> > > > > > > > > > ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> > > > > > > > > > that the former was added for cooperative rebalance, which is now
> > > > > > > > > > handled by the coordinator. If we do need both, should we make them
> > > > > > > > > > more consistent (e.g. topic name vs topic id, list vs set vs
> > > > > > > > > > collection)?
> > > > > > > > > >
> > > > > > > > > > 61. group.local.assignors: Could we make it clear that it's the full
> > > > > > > > > > class name that implements PartitionAssignor?
> > > > > > > > > >
> > > > > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep
> > > > > > > > > > the naming and the return type consistent?
> > > > > > > > > >
> > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > >
> > > > > > > > > > 64. group.remote.assignor: The client may not know what assignors the
> > > > > > > > > > broker supports. Should we default this to what the broker determines
> > > > > > > > > > (e.g. first assignor listed in group.consumer.assignors)?
> > > > > > > > > >
> > > > > > > > > > 65. After the text "When A heartbeats again and acknowledges the
> > > > > > > > > > revocation, the group coordinator transitions him to epoch 2 and
> > > > > > > > > > releases foo-2.", we have the following.
> > > > > > > > > >     B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > >
> > > > > > > > > > 66. In the Online Migration example, is the first occurrence of
> > > > > > > > > > "C - epoch=23, partitions=[foo-2, foo-5, foo-4],
> > > > > > > > > > pending-partitions=[]" correct? It seems that should happen after C
> > > > > > > > > > receives a SyncGroup response? If so, subsequent examples have the
> > > > > > > > > > same issue.
> > > > > > > > > >
> > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config
> > > > > > > > > > controls this? How is this used by the group coordinator since there
> > > > > > > > > > is no sync barrier anymore?
> > > > > > > > > >
> > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type
> > > > > > > > > > []TopicPartition. Should they be TopicPartitions?
> > > > > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > > > > >
> > > > > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should
> > > > > > > > > > it include the selected assignor name?
> > > > > > > > > >
> > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let
> > > > > > > > > > the client set this? Intuitively, it seems the coordinator should
> > > > > > > > > > manage the group epoch.
> > > > > > > > > >
> > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic name
> > > > > > > > > > too since it's convenient for building tools? Ditto for
> > > > > > > > > > TargetAssignment.
> > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > >
> > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since
> > > > > > > > > > tools may also want to issue this request?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun