Thanks for the update.
Yes, I think using a similar approach to KIP-868 to fix this issue makes sense.
Let's consider it in the future.

Luke

On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid>
wrote:

> Hi Luke,
>
> Thanks for your questions.
>
> > 1. We will store the "targetAssignment" into the log now. But as we
> > know, there's a max batch size limit (default 1MB), which means we
> > cannot support 1M partitions in one group (actually, it should be less
> > than 60k partitions, since we'll store {topicID+partition id}) by
> > default now. How will we handle that? Do we expect users to adjust the
> > max batch size to support large groups, a change that isn't needed for
> > the old protocol?
>
> That's right. I have a few ideas to remove this limitation in the
> future but I decided to keep them for a future improvement. The KIP is
> large enough, and as the current protocol suffers from the exact same
> limitation, it is not a regression.
>
> For the future, my thinking is to split the assignment and to only
> write deltas to the log instead of re-writing all of it. We would need
> to use transactions for this in the coordinator (similarly to
> KIP-868). The challenge is that we have to ensure that those deltas
> are either all written or completely rolled back. Otherwise, we would
> end up in a weird state after compaction. This obviously needs more
> thought.
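As a back-of-the-envelope check on the ~60k figure (a hedged sketch; the exact serialized size per partition depends on the record schema, which groups partitions under one topic id, so the true per-partition cost is lower):

```python
# Rough capacity estimate: how many partition entries fit in one 1 MB
# record batch if each entry naively costs a 16-byte topic UUID plus a
# 4-byte partition index. Framing/compression overhead is ignored.
MAX_BATCH_BYTES = 1024 * 1024
BYTES_PER_ENTRY = 16 + 4  # topic id (UUID) + partition index

max_partitions = MAX_BATCH_BYTES // BYTES_PER_ENTRY
print(max_partitions)  # 52428, the same order of magnitude as the ~60k cited
```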
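To make the delta idea concrete, here is a hypothetical sketch (the record names and shapes are invented for illustration, not taken from the KIP): deltas for an epoch only take effect once a commit marker for that epoch is replayed, which gives the all-or-nothing property described above.

```python
# Hypothetical sketch of delta-based assignment records with an atomic
# commit marker. On replay, staged deltas are applied only when the
# matching COMMIT record for their epoch is seen; uncommitted deltas are
# discarded, i.e. "all written or completely rolled back".
def replay(records):
    committed = {}                 # member -> partitions (last committed state)
    staged, staged_epoch = {}, None
    for rec in records:
        if rec["type"] == "DELTA":
            staged_epoch = rec["epoch"]
            staged[rec["member"]] = rec["partitions"]
        elif rec["type"] == "COMMIT" and rec["epoch"] == staged_epoch:
            committed.update(staged)
            staged, staged_epoch = {}, None
    return committed

log = [
    {"type": "DELTA", "epoch": 1, "member": "A", "partitions": [0, 1]},
    {"type": "COMMIT", "epoch": 1},
    {"type": "DELTA", "epoch": 2, "member": "A", "partitions": [0]},  # no COMMIT: rolled back
]
print(replay(log))  # {'A': [0, 1]}
```

The compaction concern in the email maps to the commit marker: without it, the compactor could retain a partial set of deltas for an aborted epoch.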
>
> > I'm wondering why we should persist the "targetAssignment" data at
> > all. If we want it for coordinator failover, could the new coordinator
> > request the currently owned partitions from each consumer after
> > failing over? I'm not sure whether the consumer will automatically
> > send its owned partitions to the new coordinator. If not, maybe we can
> > return an error to the client from the ConsumerGroupHeartbeat API with
> > a REQUIRE_OWNED_PARTITION error, and ask the client to send its
> > currently owned partitions to the new coordinator for the new
> > assignment computation. Does that make sense?
>
> The entire reconciliation process depends on it, so if we lost it
> during a failover, members could be left in a weird state. For
> instance, they could be in the middle of a transition from their
> current assignment to their new target and thus would be blocked.
> Relying on members to reconstruct it does not really work because they
> don't have all the information to do so (e.g. new metadata), so we
> would have to recompute a new one. This implies that we would need to
> get the owned partitions from all members, and that would take a few
> seconds until all members come back in the best case, or up to the
> session timeout in the worst case. If a member joined or failed during
> this time, the whole process would be stuck. I am afraid storing it is
> the best way here.
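For illustration, a hedged sketch of the reconciliation dependency (invented names, not the KIP's actual algorithm): each member moves from its current assignment toward the stored target, and a target partition stays pending while another member still owns it. If the target is lost mid-transition, the coordinator cannot compute the next step.

```python
# Hypothetical reconciliation step: a member first revokes partitions
# that are not in its target, and is only handed a target partition
# once no other member still owns it (it stays "pending" until then).
def next_step(current, target, owned_elsewhere):
    to_revoke = [p for p in current if p not in target]
    if to_revoke:
        return ("revoke", to_revoke)
    assignable = [p for p in target
                  if p not in current and p not in owned_elsewhere]
    return ("assign", assignable)

# foo-2 is still owned by another member, so nothing new is assigned yet.
print(next_step(current=["foo-0"],
                target=["foo-0", "foo-2"],
                owned_elsewhere=["foo-2"]))
# → ('assign', [])
```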
>
> Best,
> David
>
>
> On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> >
> > Hi David,
> >
> > A few more questions:
> > 1. We will store the "targetAssignment" into the log now. But as we
> > know, there's a max batch size limit (default 1MB), which means we
> > cannot support 1M partitions in one group (actually, it should be less
> > than 60k partitions, since we'll store {topicID+partition id}) by
> > default now. How will we handle that? Do we expect users to adjust the
> > max batch size to support large groups, a change that isn't needed for
> > the old protocol?
> >
> > I'm wondering why we should persist the "targetAssignment" data at
> > all. If we want it for coordinator failover, could the new coordinator
> > request the currently owned partitions from each consumer after
> > failing over? I'm not sure whether the consumer will automatically
> > send its owned partitions to the new coordinator. If not, maybe we can
> > return an error to the client from the ConsumerGroupHeartbeat API with
> > a REQUIRE_OWNED_PARTITION error, and ask the client to send its
> > currently owned partitions to the new coordinator for the new
> > assignment computation. Does that make sense?
> >
> > Luke
> >
> > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid>
> wrote:
> >
> > > Hi, David,
> > >
> > > Thanks for the reply and the updated KIP. A few more comments on the
> > > interfaces and the protocols.
> > >
> > > 60. On the consumer side, do we need both
> > > PartitionAssignor.onAssignment and
> > > ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> > > that the former was added for cooperative rebalance, which is now
> > > handled by the coordinator. If we do need both, should we make them
> > > more consistent (e.g. topic name vs topic id, list vs set vs
> > > collection)?
> > >
> > > 61. group.local.assignors: Could we make it clear that it's the full
> > > class name that implements PartitionAssignor?
> > >
> > > 62. MemberAssignment: It currently has the following method:
> > >     public Set<TopicPartition> topicPartitions()
> > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep
> > > the naming and the return type consistent?
> > >
> > > 63. MemberAssignment.error: should that be reason?
> > >
> > > 64. group.remote.assignor: The client may not know what assignors
> > > the broker supports. Should we default this to what the broker
> > > determines (e.g. the first assignor listed in
> > > group.consumer.assignors)?
> > >
> > > 65. After the text "When A heartbeats again and acknowledges the
> > > revocation, the group coordinator transitions him to epoch 2 and
> > > releases foo-2.", we have the following.
> > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > Should foo-2 be in pending-partitions?
> > >
> > > 66. In the Online Migration example, is the first occurrence of "C -
> > > epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]"
> > > correct? It seems that should happen after C receives a SyncGroup
> > > response. If so, subsequent examples have the same issue.
> > >
> > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config
> > > controls this? How is this used by the group coordinator since there
> > > is no sync barrier anymore?
> > >
> > > 68. ConsumerGroupHeartbeatResponse:
> > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type
> > > []TopicPartition. Should they be TopicPartitions?
> > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > >
> > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it
> > > include the selected assignor name?
> > >
> > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let
> > > the client set this? Intuitively, it seems the coordinator should
> > > manage the group epoch.
> > >
> > > 71. ConsumerGroupDescribeResponse:
> > > 71.1 Members.Assignment.Partitions: Should we include the topic name
> > > too, since it's convenient for building tools? Ditto for
> > > TargetAssignment.
> > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > >
> > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed, since
> > > tools may also want to issue this request?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
>
