Hi all, hi Jason,

Based on Jason's feedback, I have updated the KIP as follows.

* Jason pointed out that the member id handling is a tad weird. The
group coordinator generates the member id and then trusts the member
when it rejoins the group. This also implies that the client could
directly generate its member id and the group coordinator will accept
it. It seems better to directly let the client generate id instead of
relying on the group coordinator. I have updated the KIP in this
direction. Note that the new APIs still use a string for the member id
in order to remain consistent with the existing APIs.

* Using one record to store the target assignment limit the
scalability because of the maximum batch size (1MB by default).
Instead, I purpose to use N records to persist the member assignment
followed by one records the assignment metadata. When the assignment
is updated, the group coordinator will only write the different
between the new and the old assignment. Those records must be written
automatically to the log. As a first step, I propose to rely on the
atomicity of the batch to achieve this. In the future, we could extend
this with a mechanism similar to KIP-868. I have added this to the
future section.

* I have added the upgrade/downgrade in the future work section. As
Jason's pointed out, the group coordinator has always been bad from a
downgrade perspective. This is something that we should improve.
Jason's also suggest to use a separate feature flag (e.g.
group.metadata.version) instead of relying on metadata.version used by
the quorum controller. While I tend to agree with this, I think that
we need to think this through so I propose to revise this in
subsequent KIP focused on the upgrade/downgrade.

Best,
David

On Thu, Oct 20, 2022 at 5:56 PM David Jacot <dja...@confluent.io> wrote:
>
> Hi Jason,
>
> Thanks for your comments. Please find my answers below.
>
> 1. Yeah, I agree that the handling is a tad awkward. It is interesting
> to note that the client could actually generate its own member id with
> the current handling. It may be better to just standardize on doing
> this. I cannot really think of any downsides for it.
>
> 2. I was not aware of the aim to have the --release flag in the
> future. That would definitely illimitate my concern. I will go ahead
> and update the KIP. I will also mention the downgrade as a future
> improvement. We can cover this in a future KIP.
>
> 3. I wanted to avoid having to recompute the assignment if we discover
> that it is corrupted. That is not an issue when a server side assignor
> is used but it could take time when a client side assignor is used.
> This is not ideal because during this time the members are without a
> target assignment and we cannot rely on the previous one because it
> may be gone (or partially compacted to be precise). Members could not
> even be fully reconciled with the previous assignment so the entire
> reconciliation process would be stuck.
>
> The simplest solution that I could think of is to use separate records
> for assignment metadata and member assignments, and to rely on the
> batch to enforce the atomicity of the update. The assignment will be
> updated most likely incrementally so the number of records should be
> rather small in each delta update. This means that we would be rather
> good with the default 1MB limit in this case. This would also allow us
> to move towards the transaction based solution in the future if we
> have to. This would be already better than the solution proposed in
> the KIP..
>
> Best,
> David
>
> On Thu, Oct 20, 2022 at 2:59 PM Jason Gustafson
> <ja...@confluent.io.invalid> wrote:
> >
> > Hi David,
> >
> > Thanks for the response. Replies below:
> >
> > > To answer your two other questions.
> > The consumer would keep it after a session timeout but the coordinator
> > would not. The coordinator would return UNKNOWN_MEMBER_ID when the
> > member id does not exist and the member epoch is not equal to zero.
> > This differentiation is perhaps overkill here. What do you think?
> >
> > I do think the concept of an incarnation ID is useful. It allows us to
> > continue tracking consumer instances as they join and leave the group,
> > which is useful when debugging. I find the handling a tad awkward
> > though. It sounds like we are trying not to let the UUID be generated on
> > the client side, but after the coordinator drops the state, then it has
> > to trust the client. That is probably fine I guess. Alternatively, we could
> > separate
> > the incarnation ID. Let the client provide the value for that, but let the
> > coordinator provide a member (or session) ID which is only valid for the
> > duration of the session. Not really sure it's worth having two IDs, but it
> > would get around the slight awkwardness. Or perhaps we could let the
> > coordinator trust the client-provided ID consistently. A small benefit of
> > either of these approaches is that the ID would cover the entire lifecycle
> > of the consumer, and not just the time following coordinator introduction.
> > That might give us an easy grep for the logs of a specific consumer
> > instance. Anyway, I don't feel too strongly about this.
> >
> > > Regarding reusing the metadata.version, I see your point. Decoupling
> > has the advantage that both could be managed independently. The
> > disadvantages is that operators have to manage them independently. It
> > makes the upgrade/downgrade procedure a little more complex. I think
> > that it really depends on how independent we want to be here. What's
> > your take?
> >
> > I think the way we viewed this was that for most users, feature upgrades
> > would be done using the `kafka-features.sh upgrade --release` option.
> > So using `upgrade --release 3.4` might give you  `metadata.version=8`
> > and `group.coordinator.version=1`. Most users would not see it,
> > but it avoids the coupling with the metadata version which I see as
> > mainly about the versions of records in the metadata log. The more
> > advanced users might prefer to enable this feature only after it has
> > cleared testing and operational requirements. It also gives us some
> > flexibility in regard to when the feature becomes the default. For example,
> > we might let it be experimental in the first release and only become part
> > of the automatic `--release` upgrade in a later release. That would
> > lower its risk given the inability to downgrade.
> >
> > > There is a bit of complexity here so I wonder if we should tackle this
> > in a separate KIP. We would definitely have to do this before we could
> > release the new protocol. I could add this in the future work section.
> > What do you think?
> >
> > A separate KIP is reasonable. Just mentioning it since we have been
> > burned in the past by incompatible changes in the group coordinator.
> >
> > > It definitely is a scalability problem. We are basically limited by
> > the 1MB like today. I would like to improve this as well. For
> > instance, we could have one record per member and only write the
> > difference between the previous and the new assignment to the log. The
> > issue with this is that the write must be atomic here. Otherwise, we
> > would end up with a weird state of the records are partially written
> > and the log is compacted. My idea is to do something similar to
> > KIP-868 but by reusing existing transactional markers. That would
> > allow us to abort the transaction when the coordinator is reloaded. I
> > wanted to cover this separately from the current proposal. Do you
> > think that we should do this in this KIP?
> >
> > That is a reasonable idea. I am wondering if there are lighter weight
> > options
> > though.  Suppose that we used separate records for assignment metadata
> > and individual member assignments. In the metadata, we might identify
> > all the member IDs that are covered by the assignment. Then when we load
> > the assignment, we can validate that all the member assignment records
> > are present. If not, then we consider it invalid and begin a
> > new assignment.
> > Just a thought.
> >
> > Thanks,
> > Jason
> >
> > On Wed, Oct 19, 2022 at 10:47 AM David Jacot <dja...@confluent.io.invalid>
> > wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for your comments.
> > >
> > > 1. My intent was to let the consumer reuse its member id when it
> > > rejoins the group after being fenced. I basically wanted to
> > > differentiate a restart of the consumer from a rejoin of the member on
> > > the server side. Joining with a member id would only be allowed with
> > > the member epoch equals to zero. To answer your two other questions.
> > > The consumer would keep it after a session timeout but the coordinator
> > > would not. The coordinator would return UNKNOWN_MEMBER_ID when the
> > > member id does not exist and the member epoch is not equal to zero.
> > > This differentiation is perhaps overkill here. What do you think?
> > >
> > > 2. That's an interesting question. My intent was to start small here
> > > and to use the metadata version like we use the IBP today. In this
> > > regard, the current Group Coordinator does not really handle
> > > downgrades if I remember correctly. That being said, I do agree with
> > > you that we should do better here.
> > >
> > > Can we support downgrades? I suppose that the challenge is that we
> > > would have to rewrite all the records to the logs before downgrading
> > > the software. We could do this when the metadata version is changed
> > > for instance. The issue is that the log would still have the previous
> > > records until compaction happens. This gives us two choices: 1) we
> > > force the compaction; or 2) we delete the past once we have rewritten
> > > the whole state. We would have to handle backward compatibility
> > > breakages in the controller as well like for the controller. We could
> > > add another boolean to the MetadataVersion. Does it sound reasonable?
> > > I was thinking out loud here :).
> > >
> > > Regarding reusing the metadata.version, I see your point. Decoupling
> > > has the advantage that both could be managed independently. The
> > > disadvantages is that operators have to manage them independently. It
> > > makes the upgrade/downgrade procedure a little more complex. I think
> > > that it really depends on how independent we want to be here. What's
> > > your take?
> > >
> > > There is a bit of complexity here so I wonder if we should tackle this
> > > in a separate KIP. We would definitely have to do this before we could
> > > release the new protocol. I could add this in the future work section.
> > > What do you think?
> > >
> > > 3. It definitely is a scalability problem. We are basically limited by
> > > the 1MB like today. I would like to improve this as well. For
> > > instance, we could have one record per member and only write the
> > > difference between the previous and the new assignment to the log. The
> > > issue with this is that the write must be atomic here. Otherwise, we
> > > would end up with a weird state of the records are partially written
> > > and the log is compacted. My idea is to do something similar to
> > > KIP-868 but by reusing existing transactional markers. That would
> > > allow us to abort the transaction when the coordinator is reloaded. I
> > > wanted to cover this separately from the current proposal. Do you
> > > think that we should do this in this KIP?
> > >
> > > Cheers,
> > > David
> > >
> > > On Wed, Oct 19, 2022 at 6:53 PM Jason Gustafson
> > > <ja...@confluent.io.invalid> wrote:
> > > >
> > > > Hi David,
> > > >
> > > > A few questions below:
> > > >
> > > > 1. In regard to this comment:
> > > >
> > > > > Every member is uniquely identified by a UUID. This is called the
> > > Member
> > > > ID. This UUID is generated on the server side and given to the member
> > > when
> > > > it joins the group. It is used in all the communication with the group
> > > > coordinator and must be kept during the entirely life span of the member
> > > > (e.g. the consumer). In that sense, it is similar to an incarnation ID.
> > > >
> > > > This is a little confusing to me. The id is generated when we join the
> > > > group, but it is also intended to be kept for the life span of the
> > > > consumer? Do we keep it after a session timeout? In what case does the
> > > > coordinator send UNKNOWN_MEMBER_ID?
> > > >
> > > > 2. It sounds like the plan is to piggyback on metadata.version for the
> > > > upgrade of the persistent records stored in __consumer_offsets. Do I 
> > > > have
> > > > that right? The metadata version proposal (KIP-778) covers downgrade for
> > > > incompatible changes to the records in the metadata log, but I don't
> > > think
> > > > we've said anything yet about other internal topic data. I think the new
> > > > records would definitely be an incompatible change, so I guess the basic
> > > > question is whether downgrade will be covered? An alternative to using
> > > the
> > > > metadata version which has been proposed in the past is to use a new
> > > > feature version for the group coordinator. This would avoid coupling the
> > > > group changes so that metadata downgrades would still be possible.
> > > >
> > > > 3. The doc mentions that member metadata is stored in separate records 
> > > > in
> > > > order to avoid the batch limit. The group assignment, on the other hand,
> > > is
> > > > still stored as a single record. Will that be a scalability problem?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Wed, Oct 19, 2022 at 5:30 AM David Jacot <dja...@confluent.io.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to highlight a small change that I did in the KIP around
> > > > > static membership based on Magnus' feedback. Magnus pointed out that
> > > > > two members using the same static instance id will basically
> > > > > continuously fence each other. The issue is that the existing member
> > > > > is fenced and replaced by the new joining member. This is not ideal.
> > > > > Note that Jason mentioned something similar earlier in this thread.
> > > > >
> > > > > I think that we can make this slightly better by rejecting the new
> > > > > member until the lease of the existing member is either explicitly
> > > > > terminated or expired based on the session timeout. The existing
> > > > > member can terminate its lease by sending an heartbeat with -2 as
> > > > > member epoch. This would signal to the group coordinator that the
> > > > > existing member left the group with the intent to rejoin within the
> > > > > session timeout. The new member is rejected with a new error code
> > > > > UNRELEASED_INSTANCE_ID during that time period. In case of failure of
> > > > > the existing member, the new member has to wait until its session
> > > > > expires.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > >
> > > > > On Wed, Oct 19, 2022 at 10:05 AM David Jacot <dja...@confluent.io>
> > > wrote:
> > > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Thanks for your thorough review. There is already a vote thread if
> > > you
> > > > > > want to vote.
> > > > > >
> > > > > > Best,
> > > > > > David
> > > > > >
> > > > > > On Tue, Oct 18, 2022 at 11:07 PM Jun Rao <j...@confluent.io.invalid>
> > > > > wrote:
> > > > > > >
> > > > > > > Hi, David,
> > > > > > >
> > > > > > > Thanks for the reply. No more comments from me.
> > > > > > >
> > > > > > > 80. Yes, since PrepareAssignment returns topicIds, using topicId 
> > > > > > > in
> > > > > > > ConsumerGroupInstallAssignmentRequest makes sense.
> > > > > > >
> > > > > > > 81. Sounds good.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Oct 18, 2022 at 11:46 AM David Jacot
> > > > > <dja...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > 81. I forgot to say that I put UniformAssignor as the first one
> > > in
> > > > > the
> > > > > > > > list. I think that it should be the default one.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > David
> > > > > > > >
> > > > > > > > On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io
> > > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > 80. Hmm. It seems preferable to keep
> > > > > > > > >
> > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> > > > > as a
> > > > > > > > > topic id in order for the RPC to remain symmetrical with the
> > > > > > > > > PrepareAssignment RPCs. The client has to create the
> > > > > TopicPartitions
> > > > > > > > > from the mapping provided in the PrepareAssignment response so
> > > it
> > > > > can
> > > > > > > > > use the same mapping to convert them back to topic ids
> > > afterwards.
> > > > > I
> > > > > > > > > personally find this cleaner from an RPC perspective, don't 
> > > > > > > > > you
> > > > > think?
> > > > > > > > >
> > > > > > > > > 81. Make sense. Done.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > David
> > > > > > > > >
> > > > > > > > > On Tue, Oct 18, 2022 at 8:00 PM Jun Rao
> > > <j...@confluent.io.invalid>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi, David,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply.
> > > > > > > > > >
> > > > > > > > > > 80. The change in ConsumerGroupPrepareAssignmentResponse
> > > sounds
> > > > > good
> > > > > > > > to me.
> > > > > > > > > > Should
> > > > > > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> > > be
> > > > > > > > > > topic name? Intuitively, the client assignor assigns
> > > partitions
> > > > > based
> > > > > > > > on
> > > > > > > > > > topic names and it seems that the group coordinator should 
> > > > > > > > > > be
> > > > > > > > responsible
> > > > > > > > > > for mapping the topic names to topic ids.
> > > > > > > > > >
> > > > > > > > > > 81. group.consumer.assignors: Should we change the default
> > > > > values to
> > > > > > > > > > include the full class name?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Tue, Oct 18, 2022 at 2:36 AM David Jacot
> > > > > > > > <dja...@confluent.io.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > 80. I have included the mapping from topic ids to topic
> > > names
> > > > > in
> > > > > > > > > > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be
> > > used
> > > > > to
> > > > > > > > > > > convert all the topic ids that you mentioned. It seems
> > > > > preferable to
> > > > > > > > > > > me to keep it like this as topic ids are usually smaller
> > > than
> > > > > topic
> > > > > > > > > > > names. Does that make sense?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > David
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao
> > > > > <j...@confluent.io.invalid>
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi, David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the reply. Just one more comment.
> > > > > > > > > > > >
> > > > > > > > > > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses
> > > topic
> > > > > name
> > > > > > > > for both
> > > > > > > > > > > > subscribedTopics and targetPartitions, in
> > > > > > > > > > > > ConsumerGroupPrepareAssignmentResponse, should
> > > > > > > > Members.SubscribedTopicIds
> > > > > > > > > > > > and Members.TopicPartitions.TopicId be topic names too?
> > > > > Similarly,
> > > > > > > > since
> > > > > > > > > > > > PartitionAssignor.MemberAssignment uses topic name,
> > > should
> > > > > > > > > > > >
> > > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId
> > > > > > > > be topic
> > > > > > > > > > > > name too?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot
> > > > > > > > <dja...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for your comments. Please find my answers 
> > > > > > > > > > > > > below.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 60. Sure. Let me use a concrete example to illustrate
> > > it.
> > > > > Let's
> > > > > > > > assume
> > > > > > > > > > > > > that KStreams has a member A with a task reading from
> > > > > foo-0 and a
> > > > > > > > > > > > > member B with a standby task reading from foo-0. As 
> > > > > > > > > > > > > you
> > > > > know, the
> > > > > > > > > > > > > standby task information for B is encoded in the
> > > assignment
> > > > > > > > metadata
> > > > > > > > > > > > > whereas the assigned partition for A is in the
> > > assignment.
> > > > > Now,
> > > > > > > > let's
> > > > > > > > > > > > > imagine that the assignor decides to promote the
> > > standby
> > > > > task to
> > > > > > > > > > > > > become the active task for foo-0. The assignor will
> > > create
> > > > > a new
> > > > > > > > > > > > > assignment for the group and will encode the standby
> > > task
> > > > > in A's
> > > > > > > > > > > > > metadata and assign foo-0 to B. A will be requested to
> > > > > revoke
> > > > > > > > foo-0
> > > > > > > > > > > > > and, while it does so, B will get its new metadata but
> > > > > without
> > > > > > > > foo-0
> > > > > > > > > > > > > because foo-0 is not revoked yet. From the point of
> > > view
> > > > > of B,
> > > > > > > > it will
> > > > > > > > > > > > > see that the standby task is no longer there. Without
> > > > > providing
> > > > > > > > the
> > > > > > > > > > > > > full set of assigned partitions, it would not know
> > > what to
> > > > > do
> > > > > > > > here. If
> > > > > > > > > > > > > foo-0 is targeted to be assigned to B, B should wait
> > > until
> > > > > it is
> > > > > > > > > > > > > before stopping the standby task. If foo-0 is not, it
> > > can
> > > > > stop
> > > > > > > > the
> > > > > > > > > > > > > standby immediately. Long story short, KStreams needs
> > > to
> > > > > get the
> > > > > > > > full
> > > > > > > > > > > > > assignment + metadata in order to reason about the
> > > correct
> > > > > end
> > > > > > > > state.
> > > > > > > > > > > > >
> > > > > > > > > > > > > How do we provide this? We have added the pending
> > > > > partitions in
> > > > > > > > the
> > > > > > > > > > > > > ConsumerGroupHeartbeatResponse in conjunction to the
> > > > > assigned
> > > > > > > > > > > > > partitions. With this, the Consumer knows the full set
> > > of
> > > > > > > > partitions.
> > > > > > > > > > > > > PartitionAssignor#onAssign will be called when the
> > > member
> > > > > > > > transitions
> > > > > > > > > > > > > to a new epoch with the target partitions and the
> > > metadata
> > > > > for
> > > > > > > > the
> > > > > > > > > > > > > member. Then, RebalanceListener#onAssignedPartitions 
> > > > > > > > > > > > > is
> > > > > called
> > > > > > > > when
> > > > > > > > > > > > > partitions are incrementally assigned. At most, it
> > > will be
> > > > > > > > called N
> > > > > > > > > > > > > times where N is the number of assigned partitions in
> > > the
> > > > > current
> > > > > > > > > > > > > epoch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For context, this was discussed with Guohzang in this
> > > > > thread.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I hope that this clarifies the intent.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 62. Got it. I have reworked that part as well.
> > > > > Unfortunately, I
> > > > > > > > think
> > > > > > > > > > > > > that we should keep using a Set here because it is
> > > already
> > > > > there.
> > > > > > > > > > > > > Deprecating the current one to change the type is not
> > > > > worth it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 63. Fixed as well.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 68.2 Right. The error is only used by the client side
> > > > > assignor.
> > > > > > > > The
> > > > > > > > > > > > > client assignor will report a non-zero error code 
> > > > > > > > > > > > > while
> > > > > > > > installing a
> > > > > > > > > > > > > new assignment if it was not able to compute a new
> > > > > assignment
> > > > > > > > for the
> > > > > > > > > > > > > group. In this case, the assignment is not changed but
> > > the
> > > > > error
> > > > > > > > is
> > > > > > > > > > > > > reported to all the members. KStreams will have a few
> > > of
> > > > > such
> > > > > > > > errors.
> > > > > > > > > > > > > They are listed in the KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > > When a new assignment is installed with the
> > > > > > > > > > > > > ConsumerGroupInstallAssignment API, the coordinator
> > > > > validates it
> > > > > > > > and
> > > > > > > > > > > > > rejects the installation directly if it is not valid.
> > > In
> > > > > this
> > > > > > > > case,
> > > > > > > > > > > > > only the member trying to install the assignment gets
> > > an
> > > > > error.
> > > > > > > > The
> > > > > > > > > > > > > other members are not aware of it as they keep their
> > > > > current
> > > > > > > > assignor.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 71.1 That makes sense. Using SubscribedTopicNames is
> > > > > indeed more
> > > > > > > > > > > > > intuitive. I have changed the HB request to use it in
> > > > > order to be
> > > > > > > > > > > > > consistent. I have also added the topic names in the
> > > > > > > > > > > > > ConsumerGroupDescribe response.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > David
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao
> > > > > <j...@confluent.io.invalid
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, David,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 60. Hmm, could you explain why KStreams needs the
> > > full
> > > > > set of
> > > > > > > > > > > partition
> > > > > > > > > > > > > > assignments? I am also not sure how this will be
> > > > > implemented
> > > > > > > > based
> > > > > > > > > > > on the
> > > > > > > > > > > > > > protocol. Since HeartBeatResponse sends the assigned
> > > > > > > > partitions in
> > > > > > > > > > > phases
> > > > > > > > > > > > > > (those that don't need to wait for revocation from
> > > other
> > > > > > > > members
> > > > > > > > > > > first,
> > > > > > > > > > > > > > followed by the full assignment list), how does a
> > > member
> > > > > know
> > > > > > > > which
> > > > > > > > > > > > > > response has the full assignment?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 62. I was referring to Admin#describeConsumerGroups.
> > > It
> > > > > seems
> > > > > > > > that
> > > > > > > > > > > > > > ownedPartitions is still of type
> > > List<TopicIdPartition>.
> > > > > Also,
> > > > > > > > the
> > > > > > > > > > > > > > existing topicPartitions() returns
> > > Set<TopicPartition>,
> > > > > not a
> > > > > > > > > > > collection.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 63. This is also in Admin#describeConsumerGroups. 
> > > > > > > > > > > > > > The
> > > > > comment
> > > > > > > > seems
> > > > > > > > > > > > > > inconsistent with the field name.
> > > > > > > > > > > > > >     /**
> > > > > > > > > > > > > >      * The reason reported by the assignor.
> > > > > > > > > > > > > >      */
> > > > > > > > > > > > > >     byte error;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 67. Thanks for the explanation. Make sense. The
> > > existing
> > > > > name
> > > > > > > > may be
> > > > > > > > > > > ok.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 68.2 Are you saying the error is only intended for
> > > the
> > > > > client
> > > > > > > > > > > assignor?
> > > > > > > > > > > > > But
> > > > > > > > > > > > > > the coordinator generates the error based on the
> > > server
> > > > > side
> > > > > > > > > > > validation,
> > > > > > > > > > > > > > right? Should we provide some info to tell the 
> > > > > > > > > > > > > > client
> > > > > why the
> > > > > > > > > > > validation
> > > > > > > > > > > > > > fails?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use 
> > > > > > > > > > > > > > topic
> > > > > name in
> > > > > > > > the
> > > > > > > > > > > > > > subscription part? That seems more intuitive---a
> > > > > subscription
> > > > > > > > > > > shouldn't
> > > > > > > > > > > > > > change just because a topic is recreated. For the
> > > > > assigned
> > > > > > > > > > > partitions,
> > > > > > > > > > > > > > perhaps we could include both topicId and name just
> > > like
> > > > > > > > > > > > > FetchOffsetRequest.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <
> > > > > show...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the update.
> > > > > > > > > > > > > > > Yes, I think using similar way as KIP-868 to fix
> > > this
> > > > > issue
> > > > > > > > makes
> > > > > > > > > > > > > sense.
> > > > > > > > > > > > > > > Let's consider it in the future.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot
> > > > > > > > > > > > > <dja...@confluent.io.invalid>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Luke,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for your questions.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 1. We will store the "targetAssignment" into
> > > log
> > > > > now.
> > > > > > > > But as we
> > > > > > > > > > > > > know,
> > > > > > > > > > > > > > > > there's max batch size limit (default 1MB), 
> > > > > > > > > > > > > > > > which
> > > > > means, we
> > > > > > > > > > > cannot
> > > > > > > > > > > > > > > support
> > > > > > > > > > > > > > > > 1M partitions in one group (actually, it should
> > > be
> > > > > less
> > > > > > > > than 60k
> > > > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > > > since we'll store {topicID+partition id}) by
> > > default
> > > > > now.
> > > > > > > > How
> > > > > > > > > > > will we
> > > > > > > > > > > > > > > > handle that? Do we expect users to adjust the 
> > > > > > > > > > > > > > > > max
> > > > > batch
> > > > > > > > size to
> > > > > > > > > > > > > support
> > > > > > > > > > > > > > > > large partitions in groups, which we don't need
> > > this
> > > > > > > > change for
> > > > > > > > > > > old
> > > > > > > > > > > > > > > > protocol?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > That's right. I have a few ideas to remove this
> > > > > limitation
> > > > > > > > in the
> > > > > > > > > > > > > > > > future but I decided to keep them for future
> > > > > improvement.
> > > > > > > > The
> > > > > > > > > > > KIP is
> > > > > > > > > > > > > > > > large enough and as the current protocol suffers
> > > > > from the
> > > > > > > > exact
> > > > > > > > > > > same
> > > > > > > > > > > > > > > > limitation, it is not a regression.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For the future, my thinking is to split the
> > > > > assignment and
> > > > > > > > to
> > > > > > > > > > > only
> > > > > > > > > > > > > > > > write deltas to the log instead of re-writing
> > > all of
> > > > > it.
> > > > > > > > We would
> > > > > > > > > > > > > need
> > > > > > > > > > > > > > > > to use transactions for this in the coordinator
> > > > > (similarly
> > > > > > > > to
> > > > > > > > > > > > > > > > KIP-868). The challenge is that we have to 
> > > > > > > > > > > > > > > > ensure
> > > > > that
> > > > > > > > those
> > > > > > > > > > > deltas
> > > > > > > > > > > > > > > > are all written or completely roll backed.
> > > > > Otherwise, we
> > > > > > > > would
> > > > > > > > > > > have a
> > > > > > > > > > > > > > > > weird state with the compaction. This needs
> > > > > obviously more
> > > > > > > > > > > thinking.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'm wondering why we should persist the
> > > > > > > > "targetAssignment"
> > > > > > > > > > > data?
> > > > > > > > > > > > > If we
> > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > to work for coordinator failover, could the new
> > > > > > > > coordinator try
> > > > > > > > > > > to
> > > > > > > > > > > > > > > request
> > > > > > > > > > > > > > > > for currently owned partitions from each 
> > > > > > > > > > > > > > > > consumer
> > > > > when
> > > > > > > > failed
> > > > > > > > > > > over?
> > > > > > > > > > > > > I'm
> > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > sure if the consumer will auto send owned
> > > partitions
> > > > > to
> > > > > > > > the new
> > > > > > > > > > > > > > > > coordinator. If not, maybe we can return an
> > > error to
> > > > > client
> > > > > > > > > > > > > > > > ConsumerGroupHeartbeat API with
> > > > > REQUIRE_OWNED_PARTITION
> > > > > > > > error,
> > > > > > > > > > > and
> > > > > > > > > > > > > ask
> > > > > > > > > > > > > > > > client to append the currently owned partitions
> > > to
> > > > > new
> > > > > > > > > > > coordinates
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > > new
> > > > > > > > > > > > > > > > assignment computation. Does that make sense?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The entire reconciliation process depends on it
> > > so
> > > > > if we
> > > > > > > > lose it
> > > > > > > > > > > > > > > > during a failover, members could be in a weird
> > > > > state. For
> > > > > > > > > > > instance,
> > > > > > > > > > > > > > > > they could be in the middle of a transition from
> > > > > their
> > > > > > > > current
> > > > > > > > > > > > > > > > assignment to their new target and thus would be
> > > > > blocked.
> > > > > > > > > > > Relying on
> > > > > > > > > > > > > > > > members to reconstruct it back does not really
> > > work
> > > > > > > > because they
> > > > > > > > > > > > > don't
> > > > > > > > > > > > > > > > have all the information to do so (e.g. new
> > > > > metadata) so
> > > > > > > > we would
> > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > to recompute a new one. This implies that we
> > > need to
> > > > > get
> > > > > > > > the
> > > > > > > > > > > owned
> > > > > > > > > > > > > > > > partitions from all members and that would take
> > > a few
> > > > > > > > seconds
> > > > > > > > > > > until
> > > > > > > > > > > > > > > > all members come back in the best case, up to 
> > > > > > > > > > > > > > > > the
> > > > > session
> > > > > > > > > > > timeout in
> > > > > > > > > > > > > > > > the worst case. Imagine that a member joins or
> > > fails
> > > > > > > > during this
> > > > > > > > > > > > > time,
> > > > > > > > > > > > > > > > the whole process would be stuck. I am afraid
> > > > > storing it
> > > > > > > > is the
> > > > > > > > > > > best
> > > > > > > > > > > > > > > > way here.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > David
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <
> > > > > > > > show...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > A few more questions:
> > > > > > > > > > > > > > > > > 1. We will store the "targetAssignment" into
> > > log
> > > > > now.
> > > > > > > > But as we
> > > > > > > > > > > > > know,
> > > > > > > > > > > > > > > > > there's max batch size limit (default 1MB),
> > > which
> > > > > means,
> > > > > > > > we
> > > > > > > > > > > cannot
> > > > > > > > > > > > > > > > support
> > > > > > > > > > > > > > > > > 1M partitions in one group (actually, it
> > > should be
> > > > > less
> > > > > > > > than
> > > > > > > > > > > 60k
> > > > > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > > > > since we'll store {topicID+partition id}) by
> > > > > default
> > > > > > > > now. How
> > > > > > > > > > > will
> > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > handle that? Do we expect users to adjust the
> > > max
> > > > > batch
> > > > > > > > size to
> > > > > > > > > > > > > support
> > > > > > > > > > > > > > > > > large partitions in groups, which we don't 
> > > > > > > > > > > > > > > > > need
> > > > > this
> > > > > > > > change
> > > > > > > > > > > for old
> > > > > > > > > > > > > > > > > protocol?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'm wondering why we should persist the
> > > > > > > > "targetAssignment"
> > > > > > > > > > > data?
> > > > > > > > > > > > > If we
> > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > > to work for coordinator failover, could the 
> > > > > > > > > > > > > > > > > new
> > > > > > > > coordinator
> > > > > > > > > > > try to
> > > > > > > > > > > > > > > > request
> > > > > > > > > > > > > > > > > for currently owned partitions from each
> > > consumer
> > > > > when
> > > > > > > > failed
> > > > > > > > > > > > > over? I'm
> > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > sure if the consumer will auto send owned
> > > > > partitions to
> > > > > > > > the new
> > > > > > > > > > > > > > > > > coordinator. If not, maybe we can return an
> > > error
> > > > > to
> > > > > > > > client
> > > > > > > > > > > > > > > > > ConsumerGroupHeartbeat API with
> > > > > REQUIRE_OWNED_PARTITION
> > > > > > > > error,
> > > > > > > > > > > and
> > > > > > > > > > > > > ask
> > > > > > > > > > > > > > > > > client to append the currently owned
> > > partitions to
> > > > > new
> > > > > > > > > > > coordinates
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > new
> > > > > > > > > > > > > > > > > assignment computation. Does that make sense?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao
> > > > > > > > > > > <j...@confluent.io.invalid
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi, David,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for the reply and the updated KIP. A
> > > few
> > > > > more
> > > > > > > > > > > comments on
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > interfaces and the protocols.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 60.  On the consumer side, do we need both
> > > > > > > > > > > > > > > > PartitionAssignor.onAssignment
> > > > > > > > > > > > > > > > > > and
> > > > > ConsumerRebalanceListener.onPartitionsAssigned? My
> > > > > > > > > > > > > understanding
> > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > that the former was added for cooperative
> > > > > rebalance,
> > > > > > > > which
> > > > > > > > > > > is now
> > > > > > > > > > > > > > > > handled
> > > > > > > > > > > > > > > > > > by the coordinator. If we do need both,
> > > should
> > > > > we make
> > > > > > > > them
> > > > > > > > > > > more
> > > > > > > > > > > > > > > > consistent
> > > > > > > > > > > > > > > > > > (e.g. topic name vs topic id, list vs set vs
> > > > > > > > collection)?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 61. group.local.assignors: Could we make it
> > > > > clear that
> > > > > > > > it's
> > > > > > > > > > > the
> > > > > > > > > > > > > full
> > > > > > > > > > > > > > > > class
> > > > > > > > > > > > > > > > > > name that implements PartitionAssignor?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 62. MemberAssignment: It currently has the
> > > > > following
> > > > > > > > method.
> > > > > > > > > > > > > > > > > >     public Set<TopicPartition>
> > > topicPartitions()
> > > > > > > > > > > > > > > > > > We are adding List<TopicIdPartition>
> > > > > ownedPartitions.
> > > > > > > > Should
> > > > > > > > > > > we
> > > > > > > > > > > > > keep
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > naming and the return type consistent?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 63. MemberAssignment.error: should that be
> > > > > reason?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 64. group.remote.assignor: The client may 
> > > > > > > > > > > > > > > > > > not
> > > > > know what
> > > > > > > > > > > > > assignors the
> > > > > > > > > > > > > > > > > > broker supports. Should we default this to
> > > what
> > > > > the
> > > > > > > > broker
> > > > > > > > > > > > > determines
> > > > > > > > > > > > > > > > (e.g.
> > > > > > > > > > > > > > > > > > first assignor listed in
> > > > > group.consumer.assignors)?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 65. After the text "When A heartbeats again
> > > and
> > > > > > > > acknowledges
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > revocation, the group coordinator 
> > > > > > > > > > > > > > > > > > transitions
> > > > > him to
> > > > > > > > epoch 2
> > > > > > > > > > > and
> > > > > > > > > > > > > > > > releases
> > > > > > > > > > > > > > > > > > foo-2.", we have the following.
> > > > > > > > > > > > > > > > > >   B - epoch=2, partitions=[foo-2],
> > > > > > > > pending-partitions=[]
> > > > > > > > > > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 66. In the Online Migration example, is the
> > > first
> > > > > > > > occurence
> > > > > > > > > > > of
> > > > > > > > > > > > > "C -
> > > > > > > > > > > > > > > > > > epoch=23, partitions=[foo-2, foo-5, foo-4],
> > > > > > > > > > > > > pending-partitions=[]"
> > > > > > > > > > > > > > > > correct?
> > > > > > > > > > > > > > > > > > It seems that should happen after C receives
> > > a
> > > > > > > > SyncGroup
> > > > > > > > > > > > > response? If
> > > > > > > > > > > > > > > > so,
> > > > > > > > > > > > > > > > > > subsequent examples have the same issue.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 67.
> > > > > ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs :
> > > > > > > > Which
> > > > > > > > > > > > > config
> > > > > > > > > > > > > > > > > > controls this? How is this used by the group
> > > > > > > > coordinator
> > > > > > > > > > > since
> > > > > > > > > > > > > there
> > > > > > > > > > > > > > > > is no
> > > > > > > > > > > > > > > > > > sync barrier anymore?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > > > > > > > > > 68.1 AssignedTopicPartitions and
> > > > > > > > PendingTopicPartitions are
> > > > > > > > > > > of
> > > > > > > > > > > > > type
> > > > > > > > > > > > > > > > > > []TopicPartition. Should they be
> > > TopicPartitions?
> > > > > > > > > > > > > > > > > > 68.2 Assignment.error. Should we also have 
> > > > > > > > > > > > > > > > > > an
> > > > > > > > errorMessage
> > > > > > > > > > > field?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 69.
> > > > > > > > ConsumerGroupPrepareAssignmentResponse.Members.Assignor:
> > > > > > > > > > > > > Should
> > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > include the selected assignor name?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 70.
> > > > > ConsumerGroupInstallAssignmentRequest.GroupEpoch:
> > > > > > > > Should
> > > > > > > > > > > we
> > > > > > > > > > > > > let
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > client set this? Intuitively, it seems the
> > > > > coordinator
> > > > > > > > should
> > > > > > > > > > > > > manage
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > group epoch.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > > > > > > > > > 71.1 Members.Assignment.Partitions. Should 
> > > > > > > > > > > > > > > > > > we
> > > > > include
> > > > > > > > the
> > > > > > > > > > > topic
> > > > > > > > > > > > > name
> > > > > > > > > > > > > > > > too
> > > > > > > > > > > > > > > > > > since it's convenient for building tools?
> > > Ditto
> > > > > for
> > > > > > > > > > > > > TargetAssignment.
> > > > > > > > > > > > > > > > > > 71.2 Members: Should we include
> > > > > SubscribedTopicRegex
> > > > > > > > too?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 72. OffsetFetchRequest: Is
> > > > > GenerationIdOrMemberEpoch
> > > > > > > > needed
> > > > > > > > > > > since
> > > > > > > > > > > > > > > > tools may
> > > > > > > > > > > > > > > > > > also want to issue this request?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > >
> > >

Reply via email to