Hi David,

Thanks for the response. Replies below:

> To answer your two other questions.
> The consumer would keep it after a session timeout but the coordinator
> would not. The coordinator would return UNKNOWN_MEMBER_ID when the
> member id does not exist and the member epoch is not equal to zero.
> This differentiation is perhaps overkill here. What do you think?

I do think the concept of an incarnation ID is useful. It allows us to
continue tracking consumer instances as they join and leave the group,
which is useful when debugging. I find the handling a tad awkward
though. It sounds like we are trying not to let the UUID be generated on
the client side, but after the coordinator drops the state, then it has
to trust the client. That is probably fine I guess. Alternatively, we could
separate the incarnation ID. Let the client provide the value for that, but
let the
coordinator provide a member (or session) ID which is only valid for the
duration of the session. Not really sure it's worth having two IDs, but it
would get around the slight awkwardness. Or perhaps we could let the
coordinator trust the client-provided ID consistently. A small benefit of
either of these approaches is that the ID would cover the entire lifecycle
of the consumer, and not just the time following coordinator introduction.
That might give us an easy grep for the logs of a specific consumer
instance. Anyway, I don't feel too strongly about this.
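
To make the handling discussed above concrete, here is a minimal sketch of the
rule as I understand it: a member ID the coordinator does not know is accepted
only when the member epoch is zero, and is otherwise answered with
UNKNOWN_MEMBER_ID. The class and method names are hypothetical and only for
illustration; this is not the coordinator's actual code.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch; names are illustrative and not taken from the KIP. */
final class MemberIdCheck {
    enum Result { OK, UNKNOWN_MEMBER_ID }

    // Member IDs the coordinator currently holds session state for.
    private final Set<String> knownMemberIds = new HashSet<>();

    /** Handles a heartbeat carrying a member ID and a member epoch. */
    Result onHeartbeat(String memberId, int memberEpoch) {
        if (knownMemberIds.contains(memberId)) {
            return Result.OK;
        }
        if (memberEpoch == 0) {
            // A (re)join: the coordinator has no state, so it trusts the
            // ID that the client kept.
            knownMemberIds.add(memberId);
            return Result.OK;
        }
        // Unknown ID with a non-zero epoch: the coordinator dropped the state.
        return Result.UNKNOWN_MEMBER_ID;
    }

    /** Simulates the coordinator dropping a member after a session timeout. */
    void expire(String memberId) {
        knownMemberIds.remove(memberId);
    }
}
```

After `expire`, a heartbeat with the old ID and a non-zero epoch is rejected,
while the same ID with epoch zero is accepted again. That second case is the
point where the coordinator has to trust the client.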

> Regarding reusing the metadata.version, I see your point. Decoupling
> has the advantage that both could be managed independently. The
> disadvantage is that operators have to manage them independently. It
> makes the upgrade/downgrade procedure a little more complex. I think
> that it really depends on how independent we want to be here. What's
> your take?

I think the way we viewed this was that for most users, feature upgrades
would be done using the `kafka-features.sh upgrade --release` option.
So using `upgrade --release 3.4` might give you `metadata.version=8`
and `group.coordinator.version=1`. Most users would not see it,
but it avoids the coupling with the metadata version which I see as
mainly about the versions of records in the metadata log. The more
advanced users might prefer to enable this feature only after it has
cleared testing and operational requirements. It also gives us some
flexibility in regard to when the feature becomes the default. For example,
we might let it be experimental in the first release and only become part
of the automatic `--release` upgrade in a later release. That would
lower its risk given the inability to downgrade.
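
As a rough illustration of the idea, a release could map to a set of feature
levels, with the group coordinator feature left out of the defaults while it
is still experimental. Everything in this sketch is hypothetical: the class
name, the `coordinatorIsDefault` flag, and the levels themselves, which just
mirror the made-up 3.4 example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a release-to-feature-level mapping. */
final class ReleaseFeatureLevels {
    /**
     * Returns the feature levels a release-wide upgrade would apply.
     * The values mirror the illustrative 3.4 example and are not real
     * release data.
     */
    static Map<String, Integer> forRelease(String release, boolean coordinatorIsDefault) {
        Map<String, Integer> levels = new LinkedHashMap<>();
        if (release.equals("3.4")) {
            levels.put("metadata.version", 8);
            if (coordinatorIsDefault) {
                // Only included once the feature is no longer experimental.
                levels.put("group.coordinator.version", 1);
            }
        }
        return levels;
    }
}
```

With `coordinatorIsDefault` set to false, the release upgrade touches only
`metadata.version`, and an operator would have to opt in to the coordinator
feature separately.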

> There is a bit of complexity here so I wonder if we should tackle this
> in a separate KIP. We would definitely have to do this before we could
> release the new protocol. I could add this in the future work section.
> What do you think?

A separate KIP is reasonable. Just mentioning it since we have been
burned in the past by incompatible changes in the group coordinator.

> It definitely is a scalability problem. We are basically limited by
> the 1MB like today. I would like to improve this as well. For
> instance, we could have one record per member and only write the
> difference between the previous and the new assignment to the log. The
> issue with this is that the write must be atomic here. Otherwise, we
> would end up with a weird state if the records are partially written
> and the log is compacted. My idea is to do something similar to
> KIP-868 but by reusing existing transactional markers. That would
> allow us to abort the transaction when the coordinator is reloaded. I
> wanted to cover this separately from the current proposal. Do you
> think that we should do this in this KIP?

That is a reasonable idea. I am wondering if there are lighter weight
options though. Suppose that we used separate records for assignment
metadata and individual member assignments. In the metadata, we might
identify all the member IDs that are covered by the assignment. Then when
we load the assignment, we can validate that all the member assignment
records are present. If not, then we consider it invalid and begin a new
assignment. Just a thought.
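
A minimal sketch of that load-time check, to show what I have in mind. It
assumes (my assumption, not something in the KIP) that the metadata record
and each member assignment record carry an assignment epoch, so that a stale
member record left over from an earlier assignment is not mistaken for a
current one. All names here are hypothetical.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of validating a multi-record assignment on load. */
final class AssignmentRecordCheck {
    /**
     * @param assignmentEpoch    epoch stored in the assignment metadata record
     * @param coveredMemberIds   member IDs the metadata record says are covered
     * @param loadedMemberEpochs epoch of the member assignment record loaded
     *                           from the log, keyed by member ID
     * @return true only if every covered member has a record for this epoch
     */
    static boolean isComplete(int assignmentEpoch,
                              Set<String> coveredMemberIds,
                              Map<String, Integer> loadedMemberEpochs) {
        for (String memberId : coveredMemberIds) {
            Integer epoch = loadedMemberEpochs.get(memberId);
            if (epoch == null || epoch != assignmentEpoch) {
                // Missing or stale record: treat the assignment as invalid.
                return false;
            }
        }
        return true;
    }
}
```

If `isComplete` returns false on load, the coordinator would discard the
partial assignment and compute a new one, which avoids the need for an atomic
multi-record write.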

Thanks,
Jason

On Wed, Oct 19, 2022 at 10:47 AM David Jacot <dja...@confluent.io.invalid>
wrote:

> Hi Jason,
>
> Thanks for your comments.
>
> 1. My intent was to let the consumer reuse its member id when it
> rejoins the group after being fenced. I basically wanted to
> differentiate a restart of the consumer from a rejoin of the member on
> the server side. Joining with a member id would only be allowed with
> the member epoch equal to zero. To answer your two other questions.
> The consumer would keep it after a session timeout but the coordinator
> would not. The coordinator would return UNKNOWN_MEMBER_ID when the
> member id does not exist and the member epoch is not equal to zero.
> This differentiation is perhaps overkill here. What do you think?
>
> 2. That's an interesting question. My intent was to start small here
> and to use the metadata version like we use the IBP today. In this
> regard, the current Group Coordinator does not really handle
> downgrades if I remember correctly. That being said, I do agree with
> you that we should do better here.
>
> Can we support downgrades? I suppose that the challenge is that we
> would have to rewrite all the records to the logs before downgrading
> the software. We could do this when the metadata version is changed
> for instance. The issue is that the log would still have the previous
> records until compaction happens. This gives us two choices: 1) we
> force the compaction; or 2) we delete the past once we have rewritten
> the whole state. We would have to handle backward compatibility
> breakages in the controller as well like for the controller. We could
> add another boolean to the MetadataVersion. Does it sound reasonable?
> I was thinking out loud here :).
>
> Regarding reusing the metadata.version, I see your point. Decoupling
> has the advantage that both could be managed independently. The
> disadvantage is that operators have to manage them independently. It
> makes the upgrade/downgrade procedure a little more complex. I think
> that it really depends on how independent we want to be here. What's
> your take?
>
> There is a bit of complexity here so I wonder if we should tackle this
> in a separate KIP. We would definitely have to do this before we could
> release the new protocol. I could add this in the future work section.
> What do you think?
>
> 3. It definitely is a scalability problem. We are basically limited by
> the 1MB like today. I would like to improve this as well. For
> instance, we could have one record per member and only write the
> difference between the previous and the new assignment to the log. The
> issue with this is that the write must be atomic here. Otherwise, we
> would end up with a weird state if the records are partially written
> and the log is compacted. My idea is to do something similar to
> KIP-868 but by reusing existing transactional markers. That would
> allow us to abort the transaction when the coordinator is reloaded. I
> wanted to cover this separately from the current proposal. Do you
> think that we should do this in this KIP?
>
> Cheers,
> David
>
> On Wed, Oct 19, 2022 at 6:53 PM Jason Gustafson
> <ja...@confluent.io.invalid> wrote:
> >
> > Hi David,
> >
> > A few questions below:
> >
> > 1. In regard to this comment:
> >
> > > Every member is uniquely identified by a UUID. This is called the
> Member
> > ID. This UUID is generated on the server side and given to the member
> when
> > it joins the group. It is used in all the communication with the group
> > coordinator and must be kept during the entire life span of the member
> > (e.g. the consumer). In that sense, it is similar to an incarnation ID.
> >
> > This is a little confusing to me. The id is generated when we join the
> > group, but it is also intended to be kept for the life span of the
> > consumer? Do we keep it after a session timeout? In what case does the
> > coordinator send UNKNOWN_MEMBER_ID?
> >
> > 2. It sounds like the plan is to piggyback on metadata.version for the
> > upgrade of the persistent records stored in __consumer_offsets. Do I have
> > that right? The metadata version proposal (KIP-778) covers downgrade for
> > incompatible changes to the records in the metadata log, but I don't
> think
> > we've said anything yet about other internal topic data. I think the new
> > records would definitely be an incompatible change, so I guess the basic
> > question is whether downgrade will be covered? An alternative to using
> the
> > metadata version which has been proposed in the past is to use a new
> > feature version for the group coordinator. This would avoid coupling the
> > group changes so that metadata downgrades would still be possible.
> >
> > 3. The doc mentions that member metadata is stored in separate records in
> > order to avoid the batch limit. The group assignment, on the other hand,
> is
> > still stored as a single record. Will that be a scalability problem?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Oct 19, 2022 at 5:30 AM David Jacot <dja...@confluent.io.invalid
> >
> > wrote:
> >
> > > Hi all,
> > >
> > > I would like to highlight a small change that I did in the KIP around
> > > static membership based on Magnus' feedback. Magnus pointed out that
> > > two members using the same static instance id will basically
> > > continuously fence each other. The issue is that the existing member
> > > is fenced and replaced by the new joining member. This is not ideal.
> > > Note that Jason mentioned something similar earlier in this thread.
> > >
> > > I think that we can make this slightly better by rejecting the new
> > > member until the lease of the existing member is either explicitly
> > > terminated or expired based on the session timeout. The existing
> > > member can terminate its lease by sending a heartbeat with -2 as
> > > member epoch. This would signal to the group coordinator that the
> > > existing member left the group with the intent to rejoin within the
> > > session timeout. The new member is rejected with a new error code
> > > UNRELEASED_INSTANCE_ID during that time period. In case of failure of
> > > the existing member, the new member has to wait until its session
> > > expires.
> > >
> > > Best,
> > > David
> > >
> > >
> > > On Wed, Oct 19, 2022 at 10:05 AM David Jacot <dja...@confluent.io>
> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for your thorough review. There is already a vote thread if
> you
> > > > want to vote.
> > > >
> > > > Best,
> > > > David
> > > >
> > > > On Tue, Oct 18, 2022 at 11:07 PM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > > >
> > > > > Hi, David,
> > > > >
> > > > > Thanks for the reply. No more comments from me.
> > > > >
> > > > > 80. Yes, since PrepareAssignment returns topicIds, using topicId in
> > > > > ConsumerGroupInstallAssignmentRequest makes sense.
> > > > >
> > > > > 81. Sounds good.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Oct 18, 2022 at 11:46 AM David Jacot
> > > <dja...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > 81. I forgot to say that I put UniformAssignor as the first one
> in
> > > the
> > > > > > list. I think that it should be the default one.
> > > > > >
> > > > > > Best,
> > > > > > David
> > > > > >
> > > > > > On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io
> >
> > > wrote:
> > > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > 80. Hmm. It seems preferable to keep
> > > > > > >
> ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> > > as a
> > > > > > > topic id in order for the RPC to remain symmetrical with the
> > > > > > > PrepareAssignment RPCs. The client has to create the
> > > TopicPartitions
> > > > > > > from the mapping provided in the PrepareAssignment response so
> it
> > > can
> > > > > > > use the same mapping to convert them back to topic ids
> afterwards.
> > > I
> > > > > > > personally find this cleaner from an RPC perspective, don't you
> > > think?
> > > > > > >
> > > > > > > 81. Make sense. Done.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > David
> > > > > > >
> > > > > > > On Tue, Oct 18, 2022 at 8:00 PM Jun Rao
> <j...@confluent.io.invalid>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi, David,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 80. The change in ConsumerGroupPrepareAssignmentResponse
> sounds
> > > good
> > > > > > to me.
> > > > > > > > Should
> > > > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> be
> > > > > > > > topic name? Intuitively, the client assignor assigns
> partitions
> > > based
> > > > > > on
> > > > > > > > topic names and it seems that the group coordinator should be
> > > > > > responsible
> > > > > > > > for mapping the topic names to topic ids.
> > > > > > > >
> > > > > > > > 81. group.consumer.assignors: Should we change the default
> > > values to
> > > > > > > > include the full class name?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Tue, Oct 18, 2022 at 2:36 AM David Jacot
> > > > > > <dja...@confluent.io.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > 80. I have included the mapping from topic ids to topic
> names
> > > in
> > > > > > > > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be
> used
> > > to
> > > > > > > > > convert all the topic ids that you mentioned. It seems
> > > preferable to
> > > > > > > > > me to keep it like this as topic ids are usually smaller
> than
> > > topic
> > > > > > > > > names. Does that make sense?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > David
> > > > > > > > >
> > > > > > > > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao
> > > <j...@confluent.io.invalid>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi, David,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply. Just one more comment.
> > > > > > > > > >
> > > > > > > > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses
> topic
> > > name
> > > > > > for both
> > > > > > > > > > subscribedTopics and targetPartitions, in
> > > > > > > > > > ConsumerGroupPrepareAssignmentResponse, should
> > > > > > Members.SubscribedTopicIds
> > > > > > > > > > and Members.TopicPartitions.TopicId be topic names too?
> > > Similarly,
> > > > > > since
> > > > > > > > > > PartitionAssignor.MemberAssignment uses topic name,
> should
> > > > > > > > > >
> > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> > > > > > be topic
> > > > > > > > > > name too?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot
> > > > > > <dja...@confluent.io.invalid
> > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your comments. Please find my answers below.
> > > > > > > > > > >
> > > > > > > > > > > 60. Sure. Let me use a concrete example to illustrate
> it.
> > > Let's
> > > > > > assume
> > > > > > > > > > > that KStreams has a member A with a task reading from
> > > foo-0 and a
> > > > > > > > > > > member B with a standby task reading from foo-0. As you
> > > know, the
> > > > > > > > > > > standby task information for B is encoded in the
> assignment
> > > > > > metadata
> > > > > > > > > > > whereas the assigned partition for A is in the
> assignment.
> > > Now,
> > > > > > let's
> > > > > > > > > > > imagine that the assignor decides to promote the
> standby
> > > task to
> > > > > > > > > > > become the active task for foo-0. The assignor will
> create
> > > a new
> > > > > > > > > > > assignment for the group and will encode the standby
> task
> > > in A's
> > > > > > > > > > > metadata and assign foo-0 to B. A will be requested to
> > > revoke
> > > > > > foo-0
> > > > > > > > > > > and, while it does so, B will get its new metadata but
> > > without
> > > > > > foo-0
> > > > > > > > > > > because foo-0 is not revoked yet. From the point of
> view
> > > of B,
> > > > > > it will
> > > > > > > > > > > see that the standby task is no longer there. Without
> > > providing
> > > > > > the
> > > > > > > > > > > full set of assigned partitions, it would not know
> what to
> > > do
> > > > > > here. If
> > > > > > > > > > > foo-0 is targeted to be assigned to B, B should wait
> until
> > > it is
> > > > > > > > > > > before stopping the standby task. If foo-0 is not, it
> can
> > > stop
> > > > > > the
> > > > > > > > > > > standby immediately. Long story short, KStreams needs
> to
> > > get the
> > > > > > full
> > > > > > > > > > > assignment + metadata in order to reason about the
> correct
> > > end
> > > > > > state.
> > > > > > > > > > >
> > > > > > > > > > > How do we provide this? We have added the pending
> > > partitions in
> > > > > > the
> > > > > > > > > > > ConsumerGroupHeartbeatResponse in conjunction to the
> > > assigned
> > > > > > > > > > > partitions. With this, the Consumer knows the full set
> of
> > > > > > partitions.
> > > > > > > > > > > PartitionAssignor#onAssign will be called when the
> member
> > > > > > transitions
> > > > > > > > > > > to a new epoch with the target partitions and the
> metadata
> > > for
> > > > > > the
> > > > > > > > > > > member. Then, RebalanceListener#onAssignedPartitions is
> > > called
> > > > > > when
> > > > > > > > > > > partitions are incrementally assigned. At most, it
> will be
> > > > > > called N
> > > > > > > > > > > times where N is the number of assigned partitions in
> the
> > > current
> > > > > > > > > > > epoch.
> > > > > > > > > > >
> > > > > > > > > > > > For context, this was discussed with Guozhang in this
> > > thread.
> > > > > > > > > > >
> > > > > > > > > > > I hope that this clarifies the intent.
> > > > > > > > > > >
> > > > > > > > > > > 62. Got it. I have reworked that part as well.
> > > Unfortunately, I
> > > > > > think
> > > > > > > > > > > that we should keep using a Set here because it is
> already
> > > there.
> > > > > > > > > > > Deprecating the current one to change the type is not
> > > worth it.
> > > > > > > > > > >
> > > > > > > > > > > 63. Fixed as well.
> > > > > > > > > > >
> > > > > > > > > > > 68.2 Right. The error is only used by the client side
> > > assignor.
> > > > > > The
> > > > > > > > > > > client assignor will report a non-zero error code while
> > > > > > installing a
> > > > > > > > > > > new assignment if it was not able to compute a new
> > > assignment
> > > > > > for the
> > > > > > > > > > > group. In this case, the assignment is not changed but
> the
> > > error
> > > > > > is
> > > > > > > > > > > reported to all the members. KStreams will have a few
> of
> > > such
> > > > > > errors.
> > > > > > > > > > > They are listed in the KIP.
> > > > > > > > > > >
> > > > > > > > > > > When a new assignment is installed with the
> > > > > > > > > > > ConsumerGroupInstallAssignment API, the coordinator
> > > validates it
> > > > > > and
> > > > > > > > > > > rejects the installation directly if it is not valid.
> In
> > > this
> > > > > > case,
> > > > > > > > > > > only the member trying to install the assignment gets
> an
> > > error.
> > > > > > The
> > > > > > > > > > > other members are not aware of it as they keep their
> > > current
> > > > > > assignor.
> > > > > > > > > > >
> > > > > > > > > > > 71.1 That makes sense. Using SubscribedTopicNames is
> > > indeed more
> > > > > > > > > > > intuitive. I have changed the HB request to use it in
> > > order to be
> > > > > > > > > > > consistent. I have also added the topic names in the
> > > > > > > > > > > ConsumerGroupDescribe response.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > David
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao
> > > <j...@confluent.io.invalid
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi, David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > > >
> > > > > > > > > > > > 60. Hmm, could you explain why KStreams needs the
> full
> > > set of
> > > > > > > > > partition
> > > > > > > > > > > > assignments? I am also not sure how this will be
> > > implemented
> > > > > > based
> > > > > > > > > on the
> > > > > > > > > > > > protocol. Since HeartBeatResponse sends the assigned
> > > > > > partitions in
> > > > > > > > > phases
> > > > > > > > > > > > (those that don't need to wait for revocation from
> other
> > > > > > members
> > > > > > > > > first,
> > > > > > > > > > > > followed by the full assignment list), how does a
> member
> > > know
> > > > > > which
> > > > > > > > > > > > response has the full assignment?
> > > > > > > > > > > >
> > > > > > > > > > > > 62. I was referring to Admin#describeConsumerGroups.
> It
> > > seems
> > > > > > that
> > > > > > > > > > > > ownedPartitions is still of type
> List<TopicIdPartition>.
> > > Also,
> > > > > > the
> > > > > > > > > > > > existing topicPartitions() returns
> Set<TopicPartition>,
> > > not a
> > > > > > > > > collection.
> > > > > > > > > > > >
> > > > > > > > > > > > 63. This is also in Admin#describeConsumerGroups. The
> > > comment
> > > > > > seems
> > > > > > > > > > > > inconsistent with the field name.
> > > > > > > > > > > >     /**
> > > > > > > > > > > >      * The reason reported by the assignor.
> > > > > > > > > > > >      */
> > > > > > > > > > > >     byte error;
> > > > > > > > > > > >
> > > > > > > > > > > > 67. Thanks for the explanation. Make sense. The
> existing
> > > name
> > > > > > may be
> > > > > > > > > ok.
> > > > > > > > > > > >
> > > > > > > > > > > > 68.2 Are you saying the error is only intended for
> the
> > > client
> > > > > > > > > assignor?
> > > > > > > > > > > But
> > > > > > > > > > > > the coordinator generates the error based on the
> server
> > > side
> > > > > > > > > validation,
> > > > > > > > > > > > right? Should we provide some info to tell the client
> > > why the
> > > > > > > > > validation
> > > > > > > > > > > > fails?
> > > > > > > > > > > >
> > > > > > > > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic
> > > name in
> > > > > > the
> > > > > > > > > > > > subscription part? That seems more intuitive---a
> > > subscription
> > > > > > > > > shouldn't
> > > > > > > > > > > > change just because a topic is recreated. For the
> > > assigned
> > > > > > > > > partitions,
> > > > > > > > > > > > perhaps we could include both topicId and name just
> like
> > > > > > > > > > > FetchOffsetRequest.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <
> > > show...@gmail.com>
> > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the update.
> > > > > > > > > > > > > Yes, I think using similar way as KIP-868 to fix
> this
> > > issue
> > > > > > makes
> > > > > > > > > > > sense.
> > > > > > > > > > > > > Let's consider it in the future.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot
> > > > > > > > > > > <dja...@confluent.io.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your questions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. We will store the "targetAssignment" into
> log
> > > now.
> > > > > > But as we
> > > > > > > > > > > know,
> > > > > > > > > > > > > > there's max batch size limit (default 1MB), which
> > > means, we
> > > > > > > > > cannot
> > > > > > > > > > > > > support
> > > > > > > > > > > > > > 1M partitions in one group (actually, it should
> be
> > > less
> > > > > > than 60k
> > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > since we'll store {topicID+partition id}) by
> default
> > > now.
> > > > > > How
> > > > > > > > > will we
> > > > > > > > > > > > > > handle that? Do we expect users to adjust the max
> > > batch
> > > > > > size to
> > > > > > > > > > > support
> > > > > > > > > > > > > > large partitions in groups, which we don't need
> this
> > > > > > change for
> > > > > > > > > old
> > > > > > > > > > > > > > protocol?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > That's right. I have a few ideas to remove this
> > > limitation
> > > > > > in the
> > > > > > > > > > > > > > future but I decided to keep them for future
> > > improvement.
> > > > > > The
> > > > > > > > > KIP is
> > > > > > > > > > > > > > large enough and as the current protocol suffers
> > > from the
> > > > > > exact
> > > > > > > > > same
> > > > > > > > > > > > > > limitation, it is not a regression.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > For the future, my thinking is to split the
> > > assignment and
> > > > > > to
> > > > > > > > > only
> > > > > > > > > > > > > > write deltas to the log instead of re-writing
> all of
> > > it.
> > > > > > We would
> > > > > > > > > > > need
> > > > > > > > > > > > > > to use transactions for this in the coordinator
> > > (similarly
> > > > > > to
> > > > > > > > > > > > > > KIP-868). The challenge is that we have to ensure
> > > that
> > > > > > those
> > > > > > > > > deltas
> > > > > > > > > > > > > > > are all written or completely rolled back.
> > > Otherwise, we
> > > > > > would
> > > > > > > > > have a
> > > > > > > > > > > > > > weird state with the compaction. This needs
> > > obviously more
> > > > > > > > > thinking.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm wondering why we should persist the
> > > > > > "targetAssignment"
> > > > > > > > > data?
> > > > > > > > > > > If we
> > > > > > > > > > > > > > want
> > > > > > > > > > > > > > to work for coordinator failover, could the new
> > > > > > coordinator try
> > > > > > > > > to
> > > > > > > > > > > > > request
> > > > > > > > > > > > > > for currently owned partitions from each consumer
> > > when
> > > > > > failed
> > > > > > > > > over?
> > > > > > > > > > > I'm
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > sure if the consumer will auto send owned
> partitions
> > > to
> > > > > > the new
> > > > > > > > > > > > > > coordinator. If not, maybe we can return an
> error to
> > > client
> > > > > > > > > > > > > > ConsumerGroupHeartbeat API with
> > > REQUIRE_OWNED_PARTITION
> > > > > > error,
> > > > > > > > > and
> > > > > > > > > > > ask
> > > > > > > > > > > > > > client to append the currently owned partitions
> to
> > > new
> > > > > > > > > coordinates
> > > > > > > > > > > for
> > > > > > > > > > > > > new
> > > > > > > > > > > > > > assignment computation. Does that make sense?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The entire reconciliation process depends on it
> so
> > > if we
> > > > > > lose it
> > > > > > > > > > > > > > during a failover, members could be in a weird
> > > state. For
> > > > > > > > > instance,
> > > > > > > > > > > > > > they could be in the middle of a transition from
> > > their
> > > > > > current
> > > > > > > > > > > > > > assignment to their new target and thus would be
> > > blocked.
> > > > > > > > > Relying on
> > > > > > > > > > > > > > members to reconstruct it back does not really
> work
> > > > > > because they
> > > > > > > > > > > don't
> > > > > > > > > > > > > > have all the information to do so (e.g. new
> > > metadata) so
> > > > > > we would
> > > > > > > > > > > have
> > > > > > > > > > > > > > to recompute a new one. This implies that we
> need to
> > > get
> > > > > > the
> > > > > > > > > owned
> > > > > > > > > > > > > > partitions from all members and that would take
> a few
> > > > > > seconds
> > > > > > > > > until
> > > > > > > > > > > > > > all members come back in the best case, up to the
> > > session
> > > > > > > > > timeout in
> > > > > > > > > > > > > > the worst case. Imagine that a member joins or
> fails
> > > > > > during this
> > > > > > > > > > > time,
> > > > > > > > > > > > > > the whole process would be stuck. I am afraid
> > > storing it
> > > > > > is the
> > > > > > > > > best
> > > > > > > > > > > > > > way here.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > David
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <
> > > > > > show...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > A few more questions:
> > > > > > > > > > > > > > > 1. We will store the "targetAssignment" into
> log
> > > now.
> > > > > > But as we
> > > > > > > > > > > know,
> > > > > > > > > > > > > > > there's max batch size limit (default 1MB),
> which
> > > means,
> > > > > > we
> > > > > > > > > cannot
> > > > > > > > > > > > > > support
> > > > > > > > > > > > > > > 1M partitions in one group (actually, it
> should be
> > > less
> > > > > > than
> > > > > > > > > 60k
> > > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > > since we'll store {topicID+partition id}) by
> > > default
> > > > > > now. How
> > > > > > > > > will
> > > > > > > > > > > we
> > > > > > > > > > > > > > > handle that? Do we expect users to adjust the
> max
> > > batch
> > > > > > size to
> > > > > > > > > > > support
> > > > > > > > > > > > > > > large partitions in groups, which we don't need
> > > this
> > > > > > change
> > > > > > > > > for old
> > > > > > > > > > > > > > > protocol?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm wondering why we should persist the
> > > > > > "targetAssignment"
> > > > > > > > > data?
> > > > > > > > > > > If we
> > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > to work for coordinator failover, could the new
> > > > > > coordinator
> > > > > > > > > try to
> > > > > > > > > > > > > > request
> > > > > > > > > > > > > > > for currently owned partitions from each
> consumer
> > > when
> > > > > > failed
> > > > > > > > > > > over? I'm
> > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > sure if the consumer will auto send owned
> > > partitions to
> > > > > > the new
> > > > > > > > > > > > > > > coordinator. If not, maybe we can return an
> error
> > > to
> > > > > > client
> > > > > > > > > > > > > > > ConsumerGroupHeartbeat API with
> > > REQUIRE_OWNED_PARTITION
> > > > > > error,
> > > > > > > > > and
> > > > > > > > > > > ask
> > > > > > > > > > > > > > > client to append the currently owned
> partitions to
> > > new
> > > > > > > > > coordinates
> > > > > > > > > > > for
> > > > > > > > > > > > > > new
> > > > > > > > > > > > > > > assignment computation. Does that make sense?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao
> > > > > > > > > <j...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, David,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the reply and the updated KIP. A
> few
> > > more
> > > > > > > > > comments on
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > interfaces and the protocols.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 60. On the consumer side, do we need both
> > > > > > > > > > > > > > > > > > PartitionAssignor.onAssignment and
> > > > > > > > > > > > > > > > > > ConsumerRebalanceListener.onPartitionsAssigned? My understanding
> > > > > > > > > > > > > > > > > > is that the former was added for cooperative rebalance, which is
> > > > > > > > > > > > > > > > > > now handled by the coordinator. If we do need both, should we
> > > > > > > > > > > > > > > > > > make them more consistent (e.g. topic name vs topic id, list vs
> > > > > > > > > > > > > > > > > > set vs collection)?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 61. group.local.assignors: Could we make it clear that it's the
> > > > > > > > > > > > > > > > > > full class name that implements PartitionAssignor?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > > > > > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we
> > > > > > > > > > > > > > > > > > keep the naming and the return type consistent?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 64. group.remote.assignor: The client may not know what assignors
> > > > > > > > > > > > > > > > > > the broker supports. Should we default this to what the broker
> > > > > > > > > > > > > > > > > > determines (e.g. first assignor listed in
> > > > > > > > > > > > > > > > > > group.consumer.assignors)?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 65. After the text "When A heartbeats again and acknowledges the
> > > > > > > > > > > > > > > > > > revocation, the group coordinator transitions him to epoch 2 and
> > > > > > > > > > > > > > > > > > releases foo-2.", we have the following.
> > > > > > > > > > > > > > > > > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 66. In the Online Migration example, is the first occurrence of
> > > > > > > > > > > > > > > > > > "C - epoch=23, partitions=[foo-2, foo-5, foo-4],
> > > > > > > > > > > > > > > > > > pending-partitions=[]" correct? It seems that should happen after
> > > > > > > > > > > > > > > > > > C receives a SyncGroup response? If so, subsequent examples have
> > > > > > > > > > > > > > > > > > the same issue.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config
> > > > > > > > > > > > > > > > > > controls this? How is this used by the group coordinator since
> > > > > > > > > > > > > > > > > > there is no sync barrier anymore?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of
> > > > > > > > > > > > > > > > > > type []TopicPartition. Should they be TopicPartitions?
> > > > > > > > > > > > > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor:
> > > > > > > > > > > > > > > > > > Should it include the selected assignor name?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we
> > > > > > > > > > > > > > > > > > let the client set this? Intuitively, it seems the coordinator
> > > > > > > > > > > > > > > > > > should manage the group epoch.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic
> > > > > > > > > > > > > > > > > > name too since it's convenient for building tools? Ditto for
> > > > > > > > > > > > > > > > > > TargetAssignment.
> > > > > > > > > > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since
> > > > > > > > > > > > > > > > > > tools may also want to issue this request?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > >
