Hi Jun,

81. I forgot to say that I put UniformAssignor as the first one in the
list. I think that it should be the default one.
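A minimal sketch of how a broker could pick the assignor under the convention discussed in this thread: group.consumer.assignors holds full class names, and its first entry is the default when the member does not request one. The class names, the exact-match lookup and the exceptions are assumptions for illustration, not the KIP's specification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: the selection rule and the class names are illustrative assumptions.
final class AssignorSelection {
    private AssignorSelection() {}

    // Parses a comma-separated config value such as group.consumer.assignors
    // into an ordered list of full class names.
    static List<String> parse(String configValue) {
        return Arrays.stream(configValue.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Returns the assignor requested by the member if the broker supports it;
    // otherwise falls back to the first configured entry (the default).
    static String select(List<String> configured, String requested) {
        if (configured.isEmpty()) {
            throw new IllegalStateException("no assignors configured");
        }
        if (requested == null || requested.isEmpty()) {
            return configured.get(0);
        }
        if (!configured.contains(requested)) {
            throw new IllegalArgumentException("unsupported assignor: " + requested);
        }
        return requested;
    }
}
```

With the hypothetical value "com.example.UniformAssignor, com.example.RangeAssignor", a member that requests nothing would get com.example.UniformAssignor.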

Best,
David

On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io> wrote:
>
> Hi Jun,
>
> 80. Hmm. It seems preferable to keep
> ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId as a
> topic id in order for the RPC to remain symmetrical with the
> PrepareAssignment RPCs. The client has to create the TopicPartitions
> from the mapping provided in the PrepareAssignment response so it can
> use the same mapping to convert them back to topic ids afterwards. I
> personally find this cleaner from an RPC perspective, don't you think?
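The round trip described here can be sketched as follows. The client builds an id-to-name map from the Topics mapping in the PrepareAssignment response, converts partitions to names for the assignor, and uses the inverse map to turn the computed assignment back into topic ids for InstallAssignment. The record types are hypothetical stand-ins, not Kafka's TopicPartition or TopicIdPartition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch: the record types below are hypothetical stand-ins, not Kafka classes.
final class TopicMapping {
    record IdPartition(UUID topicId, int partition) {}
    record NamePartition(String topic, int partition) {}

    private final Map<UUID, String> idToName;
    private final Map<String, UUID> nameToId = new HashMap<>();

    // idToName mirrors the Topics mapping carried by the PrepareAssignment response.
    TopicMapping(Map<UUID, String> idToName) {
        this.idToName = Map.copyOf(idToName);
        idToName.forEach((id, name) -> nameToId.put(name, id));
    }

    // Id-based partition from the response -> name-based partition for the assignor.
    NamePartition toName(IdPartition p) {
        String name = idToName.get(p.topicId());
        if (name == null) throw new IllegalArgumentException("unknown topic id " + p.topicId());
        return new NamePartition(name, p.partition());
    }

    // Name-based partition computed by the assignor -> id-based partition for InstallAssignment.
    IdPartition toId(NamePartition p) {
        UUID id = nameToId.get(p.topic());
        if (id == null) throw new IllegalArgumentException("unknown topic " + p.topic());
        return new IdPartition(id, p.partition());
    }
}
```

Because both directions use the same mapping, any partition the assignor returns maps back to exactly the id the coordinator sent.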
>
> 81. Makes sense. Done.
>
> Thanks,
> David
>
> On Tue, Oct 18, 2022 at 8:00 PM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > Hi, David,
> >
> > Thanks for the reply.
> >
> > 80. The change in ConsumerGroupPrepareAssignmentResponse sounds good to me.
> > Should ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be
> > topic name? Intuitively, the client assignor assigns partitions based on
> > topic names and it seems that the group coordinator should be responsible
> > for mapping the topic names to topic ids.
> >
> > 81. group.consumer.assignors: Should we change the default values to
> > include the full class name?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Oct 18, 2022 at 2:36 AM David Jacot <dja...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > 80. I have included the mapping from topic ids to topic names in
> > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be used to
> > > convert all the topic ids that you mentioned. It seems preferable to
> > > me to keep it like this as topic ids are usually smaller than topic
> > > names. Does that make sense?
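To make "topic ids are usually smaller than topic names" concrete, the sketch below compares the encoded size of one topic reference. It assumes flexible-version encoding, where a topic id is a fixed 16-byte UUID and a topic name is a compact string (an unsigned varint holding the length plus one, followed by the UTF-8 bytes).

```java
import java.nio.charset.StandardCharsets;

// Sketch: assumes flexible-version encoding (UUID = 16 bytes,
// name = COMPACT_STRING, i.e. unsigned varint of length + 1, then UTF-8 bytes).
final class TopicRefSize {
    static final int UUID_BYTES = 16;

    // Number of bytes needed to encode a non-negative value as an unsigned varint
    // (7 payload bits per byte).
    static int unsignedVarintSize(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Encoded size of a topic name as a compact string.
    static int compactStringSize(String s) {
        int len = s.getBytes(StandardCharsets.UTF_8).length;
        return unsignedVarintSize(len + 1) + len;
    }
}
```

Under these assumptions a name of 15 bytes or fewer costs no more than an id, so the saving comes from longer names.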
> > >
> > > Thanks,
> > > David
> > >
> > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply. Just one more comment.
> > > >
> > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses topic name for both
> > > > subscribedTopics and targetPartitions, in
> > > > ConsumerGroupPrepareAssignmentResponse, should Members.SubscribedTopicIds
> > > > and Members.TopicPartitions.TopicId be topic names too? Similarly, since
> > > > PartitionAssignor.MemberAssignment uses topic name, should
> > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be topic
> > > > name too?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot <dja...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for your comments. Please find my answers below.
> > > > >
> > > > > 60. Sure. Let me use a concrete example to illustrate it. Let's assume
> > > > > that KStreams has a member A with a task reading from foo-0 and a
> > > > > member B with a standby task reading from foo-0. As you know, the
> > > > > standby task information for B is encoded in the assignment metadata
> > > > > whereas the assigned partition for A is in the assignment. Now, let's
> > > > > imagine that the assignor decides to promote the standby task to
> > > > > become the active task for foo-0. The assignor will create a new
> > > > > assignment for the group and will encode the standby task in A's
> > > > > metadata and assign foo-0 to B. A will be requested to revoke foo-0
> > > > > and, while it does so, B will get its new metadata but without foo-0
> > > > > because foo-0 is not revoked yet. From the point of view of B, it will
> > > > > see that the standby task is no longer there. Without providing the
> > > > > full set of assigned partitions, it would not know what to do here. If
> > > > > foo-0 is targeted to be assigned to B, B should wait until it is actually
> > > > > assigned before stopping the standby task. If foo-0 is not, it can stop the
> > > > > standby immediately. Long story short, KStreams needs to get the full
> > > > > assignment + metadata in order to reason about the correct end state.
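B's choice in this scenario can be sketched as a three-way decision, assuming B can see both its assigned partitions and its pending partitions (those still to be revoked from other members). The method and enum names are hypothetical; this is not Kafka Streams code.

```java
import java.util.Set;

// Hypothetical sketch: names are illustrative, not Kafka Streams APIs.
final class StandbyDecision {
    enum Action {
        PROMOTE, // partition is now assigned: the active task can take over
        WAIT,    // partition is targeted but still pending: keep the standby running
        STOP     // partition is not targeted to this member: stop the standby now
    }

    // Called when the standby task for `partition` no longer appears in the
    // member's assignment metadata.
    static Action onStandbyRemoved(String partition, Set<String> assigned, Set<String> pending) {
        if (assigned.contains(partition)) return Action.PROMOTE;
        if (pending.contains(partition)) return Action.WAIT;
        return Action.STOP;
    }
}
```

Without the pending set, the WAIT and STOP cases would look identical to B, which is why the full assignment is needed.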
> > > > >
> > > > > How do we provide this? We have added the pending partitions in the
> > > > > ConsumerGroupHeartbeatResponse in addition to the assigned
> > > > > partitions. With this, the Consumer knows the full set of partitions.
> > > > > PartitionAssignor#onAssign will be called when the member transitions
> > > > > to a new epoch with the target partitions and the metadata for the
> > > > > member. Then, RebalanceListener#onAssignedPartitions is called when
> > > > > partitions are incrementally assigned. At most, it will be called N
> > > > > times where N is the number of assigned partitions in the current
> > > > > epoch.
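A rough sketch of the client-side bookkeeping described above, assuming each heartbeat response carries the member epoch plus the assigned and pending partitions. The assignor callback fires once per new epoch with assigned plus pending as the target, and the rebalance-listener callback fires only for newly assigned partitions, so at most N times per epoch. The class is hypothetical and omits revocation callbacks; its counters merely stand in for PartitionAssignor#onAssign and RebalanceListener#onAssignedPartitions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical client-side bookkeeping for one member; revocation callbacks are omitted.
final class EpochReconciler {
    private final Set<String> owned = new HashSet<>();
    private final List<Set<String>> assignedCallbacks = new ArrayList<>();
    private Set<String> target = Set.of();
    private int epoch = -1;
    private int onAssignCount = 0;

    // Handles one heartbeat response for this member.
    void onHeartbeat(int memberEpoch, Set<String> assigned, Set<String> pending) {
        if (memberEpoch != epoch) {
            // New epoch: the full target (assigned + pending) is known up front.
            epoch = memberEpoch;
            Set<String> full = new HashSet<>(assigned);
            full.addAll(pending);
            target = Set.copyOf(full);
            onAssignCount++; // stands in for PartitionAssignor#onAssign(target, metadata)
        }
        owned.retainAll(assigned); // drop partitions no longer assigned (no revocation callback modelled)
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(owned);
        if (!added.isEmpty()) {
            owned.addAll(added);
            // stands in for RebalanceListener#onAssignedPartitions(added)
            assignedCallbacks.add(Set.copyOf(added));
        }
    }

    Set<String> target() { return target; }
    int onAssignCount() { return onAssignCount; }
    List<Set<String>> assignedCallbacks() { return assignedCallbacks; }
}
```

A repeated heartbeat with an unchanged assigned set produces no callback, so each listener call carries at least one new partition of the target.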
> > > > >
> > > > > For context, this was discussed with Guozhang in this thread.
> > > > >
> > > > > I hope that this clarifies the intent.
> > > > >
> > > > > 62. Got it. I have reworked that part as well. Unfortunately, I think
> > > > > that we should keep using a Set here because it is already there.
> > > > > Deprecating the current one to change the type is not worth it.
> > > > >
> > > > > 63. Fixed as well.
> > > > >
> > > > > 68.2 Right. The error is only used by the client side assignor. The
> > > > > client assignor will report a non-zero error code while installing a
> > > > > new assignment if it was not able to compute a new assignment for the
> > > > > group. In this case, the assignment is not changed but the error is
> > > > > reported to all the members. KStreams will have a few such errors.
> > > > > They are listed in the KIP.
> > > > >
> > > > > When a new assignment is installed with the
> > > > > ConsumerGroupInstallAssignment API, the coordinator validates it and
> > > > > rejects the installation directly if it is not valid. In this case,
> > > > > only the member trying to install the assignment gets an error. The
> > > > > other members are not aware of it as they keep their current assignment.
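The two failure paths described here can be sketched as below: a non-zero assignor error keeps the current assignment and is reported to every member, while a validation failure is returned only to the installing member. The specific validation rules (every key is a known member, no partition goes to two members) are assumptions for illustration; the KIP's actual checks may differ, and the class is hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical coordinator-side handling of ConsumerGroupInstallAssignment.
// The validation rules are illustrative assumptions, not the KIP's list.
final class InstallAssignment {
    enum Outcome {
        REJECTED,       // invalid: only the installing member gets an error
        ERROR_REPORTED, // assignor failed: assignment kept, error sent to all members
        INSTALLED       // new target assignment accepted
    }

    private final Set<String> members;
    private Map<String, Set<String>> current;
    private byte errorForAllMembers = 0;

    InstallAssignment(Set<String> members, Map<String, Set<String>> current) {
        this.members = Set.copyOf(members);
        this.current = Map.copyOf(current);
    }

    Outcome install(Map<String, Set<String>> proposed, byte assignorError) {
        if (assignorError != 0) {
            // The client assignor could not compute an assignment.
            errorForAllMembers = assignorError;
            return Outcome.ERROR_REPORTED;
        }
        if (!isValid(proposed)) {
            // Nothing changes for the group; the error goes back to the installer only.
            return Outcome.REJECTED;
        }
        current = Map.copyOf(proposed);
        errorForAllMembers = 0;
        return Outcome.INSTALLED;
    }

    // Assumed checks: every key is a known member and no partition is
    // given to more than one member.
    private boolean isValid(Map<String, Set<String>> proposed) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, Set<String>> e : proposed.entrySet()) {
            if (!members.contains(e.getKey())) return false;
            for (String partition : e.getValue()) {
                if (!seen.add(partition)) return false;
            }
        }
        return true;
    }

    Map<String, Set<String>> current() { return current; }
    byte errorForAllMembers() { return errorForAllMembers; }
}
```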
> > > > >
> > > > > 71.1 That makes sense. Using SubscribedTopicNames is indeed more
> > > > > intuitive. I have changed the HB request to use it in order to be
> > > > > consistent. I have also added the topic names in the
> > > > > ConsumerGroupDescribe response.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > Hi, David,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 60. Hmm, could you explain why KStreams needs the full set of partition
> > > > > > assignments? I am also not sure how this will be implemented based on the
> > > > > > protocol. Since HeartBeatResponse sends the assigned partitions in phases
> > > > > > (those that don't need to wait for revocation from other members first,
> > > > > > followed by the full assignment list), how does a member know which
> > > > > > response has the full assignment?
> > > > > >
> > > > > > 62. I was referring to Admin#describeConsumerGroups. It seems that
> > > > > > ownedPartitions is still of type List<TopicIdPartition>. Also, the
> > > > > > existing topicPartitions() returns Set<TopicPartition>, not a collection.
> > > > > >
> > > > > > 63. This is also in Admin#describeConsumerGroups. The comment seems
> > > > > > inconsistent with the field name.
> > > > > >     /**
> > > > > >      * The reason reported by the assignor.
> > > > > >      */
> > > > > >     byte error;
> > > > > >
> > > > > > 67. Thanks for the explanation. Makes sense. The existing name may be ok.
> > > > > >
> > > > > > 68.2 Are you saying the error is only intended for the client assignor? But
> > > > > > the coordinator generates the error based on the server side validation,
> > > > > > right? Should we provide some info to tell the client why the validation
> > > > > > fails?
> > > > > >
> > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
> > > > > > subscription part? That seems more intuitive: a subscription shouldn't
> > > > > > change just because a topic is recreated. For the assigned partitions,
> > > > > > perhaps we could include both topicId and name just like
> > > > > > FetchOffsetRequest.
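The point that a name-based subscription survives topic recreation can be illustrated with a toy metadata map from topic name to topic id. The helper names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical illustration: cluster metadata modelled as a name -> id map.
final class SubscriptionResolution {
    // A name-based subscription resolves to whatever id the name currently has.
    static Set<UUID> resolveByName(Set<String> subscribedNames, Map<String, UUID> metadata) {
        return subscribedNames.stream()
                .filter(metadata::containsKey)
                .map(metadata::get)
                .collect(Collectors.toSet());
    }

    // An id-based subscription only matches ids that still exist.
    static Set<UUID> resolveById(Set<UUID> subscribedIds, Map<String, UUID> metadata) {
        Set<UUID> live = Set.copyOf(metadata.values());
        return subscribedIds.stream().filter(live::contains).collect(Collectors.toSet());
    }
}
```

After foo is deleted and recreated with a new id, the name-based subscription picks up the new topic, while the id-based one silently resolves to nothing.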
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks for the update.
> > > > > > > Yes, I think using an approach similar to KIP-868 to fix this issue
> > > > > > > makes sense. Let's consider it in the future.
> > > > > > >
> > > > > > > Luke
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > Thanks for your questions.
> > > > > > > >
> > > > > > > > > 1. We will now store the "targetAssignment" in the log. But as we know,
> > > > > > > > > there's a max batch size limit (default 1MB), which means we cannot
> > > > > > > > > support 1M partitions in one group (actually, it should be less than
> > > > > > > > > 60k partitions since we'll store {topicID+partition id}) by default.
> > > > > > > > > How will we handle that? Do we expect users to adjust the max batch
> > > > > > > > > size to support groups with many partitions, a change we don't need
> > > > > > > > > for the old protocol?
> > > > > > > >
> > > > > > > > That's right. I have a few ideas to remove this limitation in the
> > > > > > > > future but I decided to leave them as future improvements. The KIP is
> > > > > > > > large enough and, as the current protocol suffers from the exact same
> > > > > > > > limitation, it is not a regression.
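Luke's "less than 60k" estimate follows from simple arithmetic, assuming (as he does) that each assigned partition costs a 16-byte topic id plus a 4-byte partition index, and ignoring all record and batch overhead, which would lower the figure further.

```java
// Rough upper bound only: ignores record, batch and per-member framing overhead.
final class TargetAssignmentCapacity {
    static final int UUID_BYTES = 16;     // topic id
    static final int PARTITION_BYTES = 4; // int32 partition index

    // Maximum number of {topicId, partition} entries that fit in one batch.
    static long maxPartitions(long maxBatchBytes) {
        return maxBatchBytes / (UUID_BYTES + PARTITION_BYTES);
    }
}
```

For a 1 MiB limit this gives 1,048,576 / 20 = 52,428 partitions, consistent with the estimate.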
> > > > > > > >
> > > > > > > > For the future, my thinking is to split the assignment and to only
> > > > > > > > write deltas to the log instead of re-writing all of it. We would need
> > > > > > > > to use transactions for this in the coordinator (similarly to
> > > > > > > > KIP-868). The challenge is that we have to ensure that those deltas
> > > > > > > > are either all written or completely rolled back. Otherwise, we would
> > > > > > > > have a weird state with the compaction. This obviously needs more
> > > > > > > > thinking.
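The delta idea can be sketched as a per-member diff between two target assignments. The sketch also shows why all deltas must be written or rolled back together: applying only some of them can leave an assignment that is neither the old target nor the new one. Types and names are hypothetical, and the sketch says nothing about how the coordinator would persist or compact the deltas.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical: assignments are member id -> set of partitions (e.g. "foo-0").
final class AssignmentDelta {
    record MemberDelta(Set<String> added, Set<String> removed) {}

    // Per-member delta between the previous and the new target assignment.
    // Members whose partitions did not change are omitted.
    static Map<String, MemberDelta> diff(Map<String, Set<String>> previous,
                                         Map<String, Set<String>> next) {
        Set<String> members = new HashSet<>(previous.keySet());
        members.addAll(next.keySet());
        Map<String, MemberDelta> deltas = new HashMap<>();
        for (String m : members) {
            Set<String> before = previous.getOrDefault(m, Set.of());
            Set<String> after = next.getOrDefault(m, Set.of());
            Set<String> added = new HashSet<>(after);
            added.removeAll(before);
            Set<String> removed = new HashSet<>(before);
            removed.removeAll(after);
            if (!added.isEmpty() || !removed.isEmpty()) {
                deltas.put(m, new MemberDelta(added, removed));
            }
        }
        return deltas;
    }

    // Replays deltas on top of the previous assignment. Members left with no
    // partitions are dropped (a simplification of this sketch).
    static Map<String, Set<String>> apply(Map<String, Set<String>> previous,
                                          Map<String, MemberDelta> deltas) {
        Map<String, Set<String>> result = new HashMap<>();
        previous.forEach((m, ps) -> result.put(m, new HashSet<>(ps)));
        deltas.forEach((m, d) -> {
            Set<String> ps = result.computeIfAbsent(m, k -> new HashSet<>());
            ps.removeAll(d.removed());
            ps.addAll(d.added());
        });
        result.values().removeIf(Set::isEmpty);
        return result;
    }
}
```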
> > > > > > > >
> > > > > > > > > I'm wondering why we should persist the "targetAssignment" data. If we
> > > > > > > > > want it to work for coordinator failover, could the new coordinator
> > > > > > > > > request the currently owned partitions from each consumer after the
> > > > > > > > > failover? I'm not sure if the consumer will automatically send its owned
> > > > > > > > > partitions to the new coordinator. If not, maybe we can return a
> > > > > > > > > REQUIRE_OWNED_PARTITION error to the client in the
> > > > > > > > > ConsumerGroupHeartbeat API and ask the client to send its currently
> > > > > > > > > owned partitions to the new coordinator for the new assignment
> > > > > > > > > computation. Does that make sense?
> > > > > > > >
> > > > > > > > The entire reconciliation process depends on it, so if we lose it
> > > > > > > > during a failover, members could be in a weird state. For instance,
> > > > > > > > they could be in the middle of a transition from their current
> > > > > > > > assignment to their new target and thus would be blocked. Relying on
> > > > > > > > members to reconstruct it does not really work because they don't
> > > > > > > > have all the information to do so (e.g. new metadata), so we would
> > > > > > > > have to compute a new one. This implies that we need to get the owned
> > > > > > > > partitions from all members, which would take a few seconds until all
> > > > > > > > members come back in the best case, and up to the session timeout in
> > > > > > > > the worst case. If a member joins or fails during this time, the
> > > > > > > > whole process would be stuck. I am afraid storing it is the best way
> > > > > > > > here.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > David
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi David,
> > > > > > > > >
> > > > > > > > > A few more questions:
> > > > > > > > > 1. We will now store the "targetAssignment" in the log. But as we know,
> > > > > > > > > there's a max batch size limit (default 1MB), which means we cannot
> > > > > > > > > support 1M partitions in one group (actually, it should be less than
> > > > > > > > > 60k partitions since we'll store {topicID+partition id}) by default.
> > > > > > > > > How will we handle that? Do we expect users to adjust the max batch
> > > > > > > > > size to support groups with many partitions, a change we don't need
> > > > > > > > > for the old protocol?
> > > > > > > > >
> > > > > > > > > I'm wondering why we should persist the "targetAssignment" data. If we
> > > > > > > > > want it to work for coordinator failover, could the new coordinator
> > > > > > > > > request the currently owned partitions from each consumer after the
> > > > > > > > > failover? I'm not sure if the consumer will automatically send its owned
> > > > > > > > > partitions to the new coordinator. If not, maybe we can return a
> > > > > > > > > REQUIRE_OWNED_PARTITION error to the client in the
> > > > > > > > > ConsumerGroupHeartbeat API and ask the client to send its currently
> > > > > > > > > owned partitions to the new coordinator for the new assignment
> > > > > > > > > computation. Does that make sense?
> > > > > > > > >
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, David,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply and the updated KIP. A few more comments on the
> > > > > > > > > > interfaces and the protocols.
> > > > > > > > > >
> > > > > > > > > > 60. On the consumer side, do we need both PartitionAssignor.onAssignment
> > > > > > > > > > and ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> > > > > > > > > > that the former was added for cooperative rebalance, which is now handled
> > > > > > > > > > by the coordinator. If we do need both, should we make them more consistent
> > > > > > > > > > (e.g. topic name vs topic id, list vs set vs collection)?
> > > > > > > > > >
> > > > > > > > > > 61. group.local.assignors: Could we make it clear that it's the full class
> > > > > > > > > > name that implements PartitionAssignor?
> > > > > > > > > >
> > > > > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep the
> > > > > > > > > > naming and the return type consistent?
> > > > > > > > > >
> > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > >
> > > > > > > > > > 64. group.remote.assignor: The client may not know what assignors the
> > > > > > > > > > broker supports. Should we default this to what the broker determines (e.g.
> > > > > > > > > > first assignor listed in group.consumer.assignors)?
> > > > > > > > > >
> > > > > > > > > > 65. After the text "When A heartbeats again and acknowledges the
> > > > > > > > > > revocation, the group coordinator transitions him to epoch 2 and releases
> > > > > > > > > > foo-2.", we have the following.
> > > > > > > > > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > Should foo-2 be in pending-partitions?
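The distinction behind this question can be sketched as a rule for splitting a member's target partitions, assuming a partition is reported as pending while another member still owns it and as assigned once it has been released. This is an illustrative reading rather than the KIP's normative definition, and the helper is hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: splits a member's target into assigned vs pending.
final class PendingSplit {
    record Split(Set<String> assigned, Set<String> pending) {}

    // target: partitions the new target assignment gives to `member`.
    // ownedBy: partition -> member currently owning it (absent when free).
    static Split split(String member, Set<String> target, Map<String, String> ownedBy) {
        Set<String> assigned = new HashSet<>();
        Set<String> pending = new HashSet<>();
        for (String p : target) {
            String owner = ownedBy.get(p);
            if (owner == null || owner.equals(member)) {
                assigned.add(p); // free or already ours: can be handed out now
            } else {
                pending.add(p);  // still owned by another member: wait for revocation
            }
        }
        return new Split(assigned, pending);
    }
}
```

Under this reading, foo-2 is pending for B while A still owns it and becomes assigned once A releases it.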
> > > > > > > > > >
> > > > > > > > > > 66. In the Online Migration example, is the first occurrence of "C -
> > > > > > > > > > epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]"
> > > > > > > > > > correct? It seems that it should happen after C receives a SyncGroup
> > > > > > > > > > response. If so, subsequent examples have the same issue.
> > > > > > > > > >
> > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config
> > > > > > > > > > controls this? How is this used by the group coordinator since there is no
> > > > > > > > > > sync barrier anymore?
> > > > > > > > > >
> > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type
> > > > > > > > > > []TopicPartition. Should they be TopicPartitions?
> > > > > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > > > > >
> > > > > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it
> > > > > > > > > > include the selected assignor name?
> > > > > > > > > >
> > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let the
> > > > > > > > > > client set this? Intuitively, it seems the coordinator should manage the
> > > > > > > > > > group epoch.
> > > > > > > > > >
> > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic name too
> > > > > > > > > > since it's convenient for building tools? Ditto for TargetAssignment.
> > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > >
> > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since tools may
> > > > > > > > > > also want to issue this request?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > >
