Hi all,

I would like to highlight a small change that I made in the KIP around static membership based on Magnus' feedback. Magnus pointed out that two members using the same static instance id will basically fence each other continuously. The issue is that the existing member is fenced and replaced by the new joining member. This is not ideal. Note that Jason mentioned something similar earlier in this thread.
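To make the fencing loop concrete, here is a toy model of the behaviour described above. It is only a sketch under stated assumptions: the class, the method names, and the "process-A"/"process-B" labels are hypothetical and come neither from the KIP nor from the Kafka code base. It assumes that a fenced process simply rejoins with the same instance id.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names come from the KIP or from Kafka's code base.
final class StaticMemberFencingSketch {

    // Maps a static instance id to the (hypothetical) process currently holding it.
    private final Map<String, String> ownerByInstanceId = new HashMap<>();

    // Behaviour described above: a joiner that reuses an instance id replaces the
    // existing member. Returns the fenced process, or null if nobody was fenced.
    String join(String instanceId, String process) {
        String previous = ownerByInstanceId.put(instanceId, process);
        return (previous == null || previous.equals(process)) ? null : previous;
    }

    String owner(String instanceId) {
        return ownerByInstanceId.get(instanceId);
    }

    // Two misconfigured processes share one instance id and whichever one is fenced
    // rejoins. Returns how many fencings happen over the given number of joins.
    static int simulate(int joins) {
        StaticMemberFencingSketch coordinator = new StaticMemberFencingSketch();
        String next = "process-A";
        int fencings = 0;
        for (int i = 0; i < joins; i++) {
            String fenced = coordinator.join("instance-1", next);
            if (fenced != null) {
                fencings++;
                next = fenced;       // the fenced process is the next one to rejoin
            } else {
                next = "process-B";  // first join: the second process shows up next
            }
        }
        return fencings;
    }

    public static void main(String[] args) {
        System.out.println("fencings after 6 joins: " + simulate(6));
    }
}
```

In this model every join after the first one fences the current holder, so neither process ever keeps the instance id for long.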
I think that we can make this slightly better by rejecting the new member until the lease of the existing member is either explicitly terminated or expired based on the session timeout. The existing member can terminate its lease by sending a heartbeat with -2 as member epoch. This would signal to the group coordinator that the existing member left the group with the intent to rejoin within the session timeout. The new member is rejected with a new error code UNRELEASED_INSTANCE_ID during that time period. In case of failure of the existing member, the new member has to wait until the existing member's session expires.

Best,
David

On Wed, Oct 19, 2022 at 10:05 AM David Jacot <dja...@confluent.io> wrote:
>
> Hi Jun,
>
> Thanks for your thorough review. There is already a vote thread if you want to vote.
>
> Best,
> David
>
> On Tue, Oct 18, 2022 at 11:07 PM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > Hi, David,
> >
> > Thanks for the reply. No more comments from me.
> >
> > 80. Yes, since PrepareAssignment returns topicIds, using topicId in ConsumerGroupInstallAssignmentRequest makes sense.
> >
> > 81. Sounds good.
> >
> > Jun
> >
> > On Tue, Oct 18, 2022 at 11:46 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > >
> > > Hi Jun,
> > >
> > > 81. I forgot to say that I put UniformAssignor as the first one in the list. I think that it should be the default one.
> > >
> > > Best,
> > > David
> > >
> > > On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > 80. Hmm. It seems preferable to keep ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId as a topic id in order for the RPC to remain symmetrical with the PrepareAssignment RPCs. The client has to create the TopicPartitions from the mapping provided in the PrepareAssignment response so it can use the same mapping to convert them back to topic ids afterwards.
> > > > I personally find this cleaner from an RPC perspective, don't you think?
> > > >
> > > > 81. Makes sense. Done.
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > > On Tue, Oct 18, 2022 at 8:00 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > >
> > > > > Hi, David,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 80. The change in ConsumerGroupPrepareAssignmentResponse sounds good to me. Should ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be topic name? Intuitively, the client assignor assigns partitions based on topic names and it seems that the group coordinator should be responsible for mapping the topic names to topic ids.
> > > > >
> > > > > 81. group.consumer.assignors: Should we change the default values to include the full class name?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Oct 18, 2022 at 2:36 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > 80. I have included the mapping from topic ids to topic names in ConsumerGroupPrepareAssignmentResponse.Topics. It can be used to convert all the topic ids that you mentioned. It seems preferable to me to keep it like this as topic ids are usually smaller than topic names. Does that make sense?
> > > > > >
> > > > > > Thanks,
> > > > > > David
> > > > > >
> > > > > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > Hi, David,
> > > > > > >
> > > > > > > Thanks for the reply. Just one more comment.
> > > > > > >
> > > > > > > 80.
> > > > > > > Since PartitionAssignor.AssignmentMemberSpec uses topic name for both subscribedTopics and targetPartitions, in ConsumerGroupPrepareAssignmentResponse, should Members.SubscribedTopicIds and Members.TopicPartitions.TopicId be topic names too? Similarly, since PartitionAssignor.MemberAssignment uses topic name, should ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be topic name too?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Thanks for your comments. Please find my answers below.
> > > > > > > >
> > > > > > > > 60. Sure. Let me use a concrete example to illustrate it. Let's assume that KStreams has a member A with a task reading from foo-0 and a member B with a standby task reading from foo-0. As you know, the standby task information for B is encoded in the assignment metadata whereas the assigned partition for A is in the assignment. Now, let's imagine that the assignor decides to promote the standby task to become the active task for foo-0. The assignor will create a new assignment for the group and will encode the standby task in A's metadata and assign foo-0 to B. A will be requested to revoke foo-0 and, while it does so, B will get its new metadata but without foo-0 because foo-0 is not revoked yet. From the point of view of B, it will see that the standby task is no longer there.
> > > > > > > > Without providing the full set of assigned partitions, it would not know what to do here. If foo-0 is targeted to be assigned to B, B should wait until it is before stopping the standby task. If foo-0 is not, it can stop the standby immediately. Long story short, KStreams needs to get the full assignment + metadata in order to reason about the correct end state.
> > > > > > > >
> > > > > > > > How do we provide this? We have added the pending partitions in the ConsumerGroupHeartbeatResponse in conjunction with the assigned partitions. With this, the Consumer knows the full set of partitions. PartitionAssignor#onAssign will be called when the member transitions to a new epoch with the target partitions and the metadata for the member. Then, RebalanceListener#onAssignedPartitions is called when partitions are incrementally assigned. At most, it will be called N times where N is the number of assigned partitions in the current epoch.
> > > > > > > >
> > > > > > > > For context, this was discussed with Guozhang in this thread.
> > > > > > > >
> > > > > > > > I hope that this clarifies the intent.
> > > > > > > >
> > > > > > > > 62. Got it. I have reworked that part as well. Unfortunately, I think that we should keep using a Set here because it is already there. Deprecating the current one to change the type is not worth it.
> > > > > > > >
> > > > > > > > 63. Fixed as well.
> > > > > > > >
> > > > > > > > 68.2 Right. The error is only used by the client side assignor.
> > > > > > > > The client assignor will report a non-zero error code while installing a new assignment if it was not able to compute a new assignment for the group. In this case, the assignment is not changed but the error is reported to all the members. KStreams will have a few such errors. They are listed in the KIP.
> > > > > > > >
> > > > > > > > When a new assignment is installed with the ConsumerGroupInstallAssignment API, the coordinator validates it and rejects the installation directly if it is not valid. In this case, only the member trying to install the assignment gets an error. The other members are not aware of it as they keep their current assignor.
> > > > > > > >
> > > > > > > > 71.1 That makes sense. Using SubscribedTopicNames is indeed more intuitive. I have changed the HB request to use it in order to be consistent. I have also added the topic names in the ConsumerGroupDescribe response.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > David
> > > > > > > >
> > > > > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > Hi, David,
> > > > > > > > >
> > > > > > > > > Thanks for the reply.
> > > > > > > > >
> > > > > > > > > 60. Hmm, could you explain why KStreams needs the full set of partition assignments? I am also not sure how this will be implemented based on the protocol.
> > > > > > > > > Since HeartBeatResponse sends the assigned partitions in phases (those that don't need to wait for revocation from other members first, followed by the full assignment list), how does a member know which response has the full assignment?
> > > > > > > > >
> > > > > > > > > 62. I was referring to Admin#describeConsumerGroups. It seems that ownedPartitions is still of type List<TopicIdPartition>. Also, the existing topicPartitions() returns Set<TopicPartition>, not a collection.
> > > > > > > > >
> > > > > > > > > 63. This is also in Admin#describeConsumerGroups. The comment seems inconsistent with the field name.
> > > > > > > > >     /**
> > > > > > > > >      * The reason reported by the assignor.
> > > > > > > > >      */
> > > > > > > > >     byte error;
> > > > > > > > >
> > > > > > > > > 67. Thanks for the explanation. Makes sense. The existing name may be ok.
> > > > > > > > >
> > > > > > > > > 68.2 Are you saying the error is only intended for the client assignor? But the coordinator generates the error based on the server side validation, right? Should we provide some info to tell the client why the validation fails?
> > > > > > > > >
> > > > > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the subscription part? That seems more intuitive: a subscription shouldn't change just because a topic is recreated. For the assigned partitions, perhaps we could include both topicId and name just like FetchOffsetRequest.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Thanks for the update.
> > > > > > > > > > Yes, I think using a similar approach to KIP-868 to fix this issue makes sense.
> > > > > > > > > > Let's consider it in the future.
> > > > > > > > > >
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Luke,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your questions.
> > > > > > > > > > >
> > > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we know, there's max batch size limit (default 1MB), which means, we cannot support 1M partitions in one group (actually, it should be less than 60k partitions since we'll store {topicID+partition id}) by default now. How will we handle that? Do we expect users to adjust the max batch size to support large partitions in groups, which we don't need this change for old protocol?
> > > > > > > > > > >
> > > > > > > > > > > That's right. I have a few ideas to remove this limitation in the future but I decided to keep them for future improvement. The KIP is large enough and as the current protocol suffers from the exact same limitation, it is not a regression.
> > > > > > > > > > >
> > > > > > > > > > > For the future, my thinking is to split the assignment and to only write deltas to the log instead of re-writing all of it. We would need to use transactions for this in the coordinator (similarly to KIP-868). The challenge is that we have to ensure that those deltas are all written or completely rolled back. Otherwise, we would have a weird state with the compaction. This obviously needs more thinking.
> > > > > > > > > > >
> > > > > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If we want to work for coordinator failover, could the new coordinator try to request for currently owned partitions from each consumer when failed over? I'm not sure if the consumer will auto send owned partitions to the new coordinator. If not, maybe we can return an error to client ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask client to append the currently owned partitions to the new coordinator for new assignment computation. Does that make sense?
> > > > > > > > > > >
> > > > > > > > > > > The entire reconciliation process depends on it so if we lose it during a failover, members could be in a weird state.
> > > > > > > > > > > For instance, they could be in the middle of a transition from their current assignment to their new target and thus would be blocked. Relying on members to reconstruct it back does not really work because they don't have all the information to do so (e.g. new metadata) so we would have to recompute a new one. This implies that we need to get the owned partitions from all members and that would take a few seconds until all members come back in the best case, up to the session timeout in the worst case. Imagine that a member joins or fails during this time, the whole process would be stuck. I am afraid storing it is the best way here.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > David
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi David,
> > > > > > > > > > > >
> > > > > > > > > > > > A few more questions:
> > > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we know, there's max batch size limit (default 1MB), which means, we cannot support 1M partitions in one group (actually, it should be less than 60k partitions since we'll store {topicID+partition id}) by default now. How will we handle that?
> > > > > > > > > > > > Do we expect users to adjust the max batch size to support large partitions in groups, which we don't need this change for old protocol?
> > > > > > > > > > > >
> > > > > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If we want to work for coordinator failover, could the new coordinator try to request for currently owned partitions from each consumer when failed over? I'm not sure if the consumer will auto send owned partitions to the new coordinator. If not, maybe we can return an error to client ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask client to append the currently owned partitions to the new coordinator for new assignment computation. Does that make sense?
> > > > > > > > > > > >
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi, David,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the reply and the updated KIP. A few more comments on the interfaces and the protocols.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 60. On the consumer side, do we need both PartitionAssignor.onAssignment and ConsumerRebalanceListener.onPartitionsAssigned?
> > > > > > > > > > > > > My understanding is that the former was added for cooperative rebalance, which is now handled by the coordinator. If we do need both, should we make them more consistent (e.g. topic name vs topic id, list vs set vs collection)?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 61. group.local.assignors: Could we make it clear that it's the full class name that implements PartitionAssignor?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep the naming and the return type consistent?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 64. group.remote.assignor: The client may not know what assignors the broker supports. Should we default this to what the broker determines (e.g. first assignor listed in group.consumer.assignors)?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 65. After the text "When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 2 and releases foo-2.", we have the following.
> > > > > > > > > > > > >     B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 66. In the Online Migration example, is the first occurrence of "C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]" correct? It seems that should happen after C receives a SyncGroup response? If so, subsequent examples have the same issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config controls this? How is this used by the group coordinator since there is no sync barrier anymore?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type []TopicPartition. Should they be TopicPartitions?
> > > > > > > > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it include the selected assignor name?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let the client set this? Intuitively, it seems the coordinator should manage the group epoch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic name too since it's convenient for building tools? Ditto for TargetAssignment.
> > > > > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since tools may also want to issue this request?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun