Hi Magnus,

Thanks for your comments. Please find my answers below.

1. Right. RebalanceTimeout is max.poll.interval.ms and it is different
from the session timeout. Are there parts in the KIP where this is not
clear? Note that the session timeout is now defined on the server side
so the client does not know it.

2. Noted. I will try to improve this.

3. Right. The member id is generated by the coordinator when the
member joins for the first time. Let me correct that sentence.

4. That's right. Rebalance errors are only defined by a client-side
assignor and they are different from the Kafka protocol errors.

5. That makes sense. The timer should start ticking when the heartbeat
request is processed. Let me fix this.

6. A client side assignor is used if one is commonly specified by all
members. A server side assignor is used otherwise. The group
coordinator will pick the most common server side assignor or default
to the first one specified in `group.consumer.assignors` if none are
provided by the members.
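
To make the selection rule in 6 concrete, here is a rough sketch in Java. The class and method names are hypothetical and not part of the KIP. The tie-break (the assignor listed first in `group.consumer.assignors` wins) is an assumption of the sketch, and it silently ignores names that are not configured, whereas the real coordinator would report `UNSUPPORTED_ASSIGNOR`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the KIP: picks the server side assignor
// for a group from the names requested by its members.
final class ServerAssignorSelector {
    // configured: ordered value of group.consumer.assignors (assumed non-empty).
    // requested: one entry per member; null means the member named no assignor.
    static String select(List<String> configured, List<String> requested) {
        Map<String, Integer> votes = new HashMap<>();
        for (String name : requested) {
            // Assumption: unknown names are skipped instead of being rejected.
            if (name != null && configured.contains(name)) {
                votes.merge(name, 1, Integer::sum);
            }
        }
        // Default to the first configured assignor when no member named one.
        String best = configured.get(0);
        int bestVotes = 0;
        // Walking the configured order means that, on a tie, the assignor
        // listed first wins. This tie-break is an assumption of the sketch.
        for (String name : configured) {
            int count = votes.getOrDefault(name, 0);
            if (count > bestVotes) {
                best = name;
                bestVotes = count;
            }
        }
        return best;
    }
}
```

With two configured assignors, the one requested by more members is returned; if no member requests any, the first configured one is returned.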

7. Yeah, that's a good point. We are a bit limited by the current
implementation of the Java Consumer which requires `poll` to be called
in order to make progress. We have a project to refactor the
threading model of the Java Consumer. Once that is done, we could revise
this choice as we should be able to run the assignor in the background
thread so it should be fairly quick. I have kept the KIP based on the
current implementation for now.

8. The member should not send the final heartbeat with epoch -1 when
static membership is enabled. In other words, it should not leave the
group. The group coordinator will rely on the session timeout to
remove the member if it does not rejoin in time.

9. The `PartitionAssignor` is the public API here.
`PartitionAssignor#metadata` will be consulted when the heartbeat is
constructed. This allows the power users to update the reason and/or
the metadata.

10. That's a good observation. The case that you describe could
happen. I think that treating `FENCED_MEMBER_EPOCH` as a non-terminal
error is the right thing to do here. Even if we were to treat it as a
terminal one, it is very likely that the software would get restarted
so it would rejoin as well. We could improve the protocol to be better
in this case. I am thinking about the following. When a member with
static membership leaves, it could send a final heartbeat with -2 as
epoch to signal to the group coordinator that it is leaving with the
intent to rejoin. With this, we could reject any new member joining
with the same static id if the current member has not left yet and its
lease is not expired. We could perhaps reuse `UNKNOWN_MEMBER_ID` in
this case. That would prevent the case that you described. What do you
think?
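
A rough sketch of how the coordinator side of this idea could look, in Java. Everything here is hypothetical: the class, the lease bookkeeping, and the `-2` constant only illustrate the proposal above and are not part of the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposal in 10; none of these names come from the KIP.
final class StaticMemberLeases {
    // Proposed marker: a static member leaves with the intent to rejoin.
    static final int LEAVE_AND_REJOIN_EPOCH = -2;

    enum JoinResult { ACCEPTED, UNKNOWN_MEMBER_ID }

    private static final class Lease {
        boolean released;   // true once the owner sent the -2 heartbeat
        long expiresAtMs;   // end of the current session timeout
    }

    private final Map<String, Lease> leases = new HashMap<>();
    private final long sessionTimeoutMs;

    StaticMemberLeases(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // A new member tries to join with the given static instance id.
    JoinResult join(String instanceId, long nowMs) {
        Lease lease = leases.get(instanceId);
        // Reject the newcomer while the current owner has neither left
        // nor let its lease expire. This is what breaks the fencing loop.
        if (lease != null && !lease.released && nowMs < lease.expiresAtMs) {
            return JoinResult.UNKNOWN_MEMBER_ID;
        }
        Lease fresh = new Lease();
        fresh.expiresAtMs = nowMs + sessionTimeoutMs;
        leases.put(instanceId, fresh);
        return JoinResult.ACCEPTED;
    }

    // A heartbeat from the current owner of the static instance id.
    void heartbeat(String instanceId, int memberEpoch, long nowMs) {
        Lease lease = leases.get(instanceId);
        if (lease == null) {
            return;
        }
        if (memberEpoch == LEAVE_AND_REJOIN_EPOCH) {
            lease.released = true;                        // owner stepped aside
        } else {
            lease.expiresAtMs = nowMs + sessionTimeoutMs; // renew the lease
        }
    }
}
```

In this sketch a second consumer with the same static id is turned away until the first one either sends the `-2` heartbeat or stops heartbeating for a full session timeout.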

11. We will set the `ErrorMessage` with an appropriate message which
contains the supported assignors.

12. That makes sense.

13. I agree that this is a little weird. The current protocol requires
the READ ACL to join groups so we have stuck with the same here.

14. We can remove it. I had added it because of the Java client.

15. Right. Let me update this part. They can be updated anytime for
sure but they must be present when the member joins.
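
As an illustration of the rule in 15, here is a small Java sketch of the check. It assumes that "joins" means a heartbeat with `MemberEpoch` 0, as suggested in the question, and that a missing subscription on later heartbeats means "unchanged". The class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical validation helper, not part of the KIP.
final class HeartbeatSubscriptionCheck {
    // Returns true if the subscription fields are acceptable for this heartbeat.
    static boolean isValid(int memberEpoch,
                           List<String> subscribedTopicNames,
                           String subscribedTopicRegex) {
        boolean hasNames = subscribedTopicNames != null && !subscribedTopicNames.isEmpty();
        boolean hasRegex = subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty();
        if (memberEpoch == 0) {
            // Assumption: epoch 0 marks a join, so a subscription is mandatory.
            return hasNames || hasRegex;
        }
        // Later heartbeats may update the subscription or leave it out.
        return true;
    }
}
```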

16. Nothing. This is independent from the heartbeat process. The
coordinator will select a new member. That could be the same one but
it does not have to be.

17. Ideally, the member should not send Prepare...Request while the
heartbeat is in flight because the member epoch could be updated. If
that happens, it can retry it with its new member epoch.

18. Yes, READ GROUP, same as for 13.

19. That's right. The heartbeat process is completely independent of
this assignment process.

20. Right. The heartbeat is the default process that should be running
all the time.

21. I updated it this morning to return `SubscribedTopicNames` and
`SubscribedTopicRegex`.

22. Fixed.

23. This should not happen because a member is allowed to move to its
next epoch only when it is done with the revocation process and the
group coordinator won't give the partition to any other members until
this is done. However, the retry should be based on the actual
assignment, especially if it was updated by the last heartbeat.
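
The last point can be sketched as follows in Java. This is only an illustration under simplifying assumptions: partitions are plain strings such as "foo-0", and the class and method names are hypothetical rather than actual client APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side helper, not part of the KIP.
final class CommitRetry {
    // After STALE_MEMBER_EPOCH, keep only the offsets of partitions that are
    // still in the assignment received with the latest heartbeat response.
    static Map<String, Long> offsetsToRetry(Map<String, Long> pendingOffsets,
                                            Set<String> currentAssignment) {
        Map<String, Long> retained = new HashMap<>();
        for (Map.Entry<String, Long> entry : pendingOffsets.entrySet()) {
            if (currentAssignment.contains(entry.getKey())) {
                retained.put(entry.getKey(), entry.getValue());
            }
        }
        return retained;
    }
}
```

Offsets for partitions that the member no longer owns are dropped, so a retried commit cannot overwrite what the new owner has committed.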

24. As soon as the group is converted to a consumer group type, we
will treat it as a member epoch for non-upgraded members as well.

Best,
David

On Mon, Oct 17, 2022 at 12:33 PM Magnus Edenhill <mag...@edenhill.se> wrote:
>
> Hi David,
>
> kudos on a very well designed KIP, this will make a ton of difference for
> the maintainability of client implementations and operational aspects of
> consumer groups!
>
> Some comments:
>
>
> 1. Maybe I missed this, but where does max.poll.interval.ms fit into the
> new protocol?
>    It seems like the new RebalanceTimeout is a mix between
> session.timeout.ms and max.poll.interval.ms, but I think they serve
> different purposes:
>   The session timeout is the member failure detection time, while max.poll
> is the partition ownership time (more or less).
>
> 2. While the KIP is very well written, it has 22 "him"s (and "his") and no
> "her"s. Suggest changing to "the member", "the client", "it", "them", or
> similar.
>
> 3.  "MemberID - The unique identifier of the member. It is generated by the
> client once and must be used during its lifetime." - Later in the document
> it says it is generated by the coordinator. Which is it? Guessing the
> coordinator.
>
> 4. The new rebalance error is an int8, while the Kafka protocol uses int16.
> I'm guessing this is because the rebalance errors are their own thing and
> the existing protocol error codes must not be used for the rebalance
> errors. Is that correct?
>
> 5. "The rebalance timeout is provided by the member when it joins the
> group. It is basically the max poll interval configured on the client side.
> The timer starts ticking when the heartbeat response is sent out by the
> group coordinator." - since there is some arbitrary time (0ms to seconds)
> for the response to make it to the client - this means the coordinator will
> expire the rebalance timeout before the consumer does - which is worse than
> the other way around (consumer before coordinator) - can we flip this
> around so that, from the consumer's perspective, the timer starts ticking
> when it sends the heartbeat request (rather than receives the response)?
>
> 6. "The server side assignor is used if any member specified one. If
> multiple server side assignors are specified in the group, the group
> coordinator uses the most common one." - what's the tie breaker if they are
> equally specified?
>
> 7. "The chosen member is expected to complete the assignment process within
> the rebalance timeout" - Given that the rebalance timeout seems analogous to
> max.poll.interval.ms - which in turn can be quite high - does this reuse of
> timeouts impose a slowness to failure detection during client-side
> assignment operations? max.poll may be >60s, but an assignment should
> finish in under a second - if the same timeouts are used then failure
> detection for an assignment operation will be very long.
>
> 8. "Static Membership (KIP-345)" - how are leaves handled to avoid a
> rebalance when a static member restarts?
>
> 9. "power users will have the ability to trigger a reassignment by either
> providing a non-zero reason or by updating the assignor metadata." - how,
> without a public API?
>
> 10. "FENCED_MEMBER_EPOCH - The member epoch is fenced by the coordinator.
> The member must abandon all its partitions and rejoins." - this is a bit
> different than other uses of fencing in the Kafka protocol, where a fencing
> error is typically handled as a client-fatal error. In particular, is there
> a risk for a fencing-loop if a new consumer with the same instance.id joins
> the group, causing the coordinator to fence off the older consumer, which
> then rejoins and fences off the new consumer, rinse and repeat?
>
> 11. "UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version
> range are not supported by the group." - is there a way to return the
> supported assignors back to the client? I think this would be useful for
> troubleshooting by returning the available server-side assignors to the
> user.
>
> 12. ConsumerGroupHeartbeatRequest - since MemberEpoch is a required field,
> should it perhaps not have a default value?
>
> 13. ConsumerGroupHeartbeatRequest ACLs - only READ GROUP, should it perhaps
> be READ&WRITE given that the request may alter the group state?
>
> 14. "SubscribedTopicNames and SubscribedTopicRegex cannot be used
> together." - why this explicit constraint? I think it is okay if it
> simplifies things. I see how this makes sense for the java Subscribe() API,
> but other clients have a list of topic names to subscribe(), and those
> topic names may or may not be regexes - so this new constraint adds an
> error path.
>
> 15. "SubscribedTopicNames or SubscribedTopicRegex must be in the first
> heartbeat request." - I think "first" is a bit ambiguous here and it would
> be good to be more specific, e.g., only when MemberEpoch is 0. What about
> rejoins due to subscription change?
>
>  16. For Prepare...Request: "Upon receiving the UNKNOWN_MEMBER_ID error,
> the consumer abandon the process." - what does the member do next? Send a
> heartbeat right away or on the next timed interval?
>
> 17. For Prepare...Request: "Upon receiving the STALE_MEMBER_EPOCH error,
> the consumer retries when receiving its next heartbeat response with its
> member epoch." - This is a bit ambiguous, what does it retry?
>
> 18. ConsumerGroupInstallAssignment ACL: also just READ GROUP, but this
> request alters the group state so should be WRITE?
>
> 19. ..InstallAssignmentResponse: "If the response contains no error, the
> member is done." - I guess this means it is done with the assignor, but it
> still needs to send off another Heartbeat request to retrieve its own
> assignment, correct?
>
> 20. ..InstallAssignmentResponse: "Upon receiving any other errors, the
> consumer abandon the process." - it is not clear to me what this means in
> practice, does it go back to sending heartbeats?
>
> 21. DescribeGroup.. - There's a SubscribedTopicIds, but no
> SubscribedTopicRegexs, SubscribedTopicNames, etc. Isn't it valuable to see
> this as well? In particular for troubleshooting why consumers are not
> receiving messages, you could see that it has a regex that is misspelled
> and does not match any topics, for instance.
>
> 22. ListGroupsResponse: The GroupType description is wrong.
>
> 23. OffsetCommitResponse: "Upon receiving the STALE_MEMBER_EPOCH error, the
> consumer retries when receiving its next heartbeat response with its member
> epoch." - does this mean it should retry the commit once it has received an
> updated member epoch? If so, isn't there a case where another member might
> be assigned the partition-to-be-committed for some time before the
> partition is assigned back to this consumer, which would cause the
> old-but-retried (with a newly acquired epoch) offset commit to commit an
> old outdated offset, effectively overwriting the interim member's committed
> offset?
>
> 24. OffsetCommit and OffsetFetch: reuse of "GenerationIdOrMemberEpoch" - if
> the group transitions between generic and consumer group types, how do we
> know if this field represents a generationId or a MemberEpoch?
>
>
> Regards,
> Magnus
>
> On Mon, Oct 17, 2022 at 11:34 AM David Jacot <dja...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > Thanks for your comments. Please find my answers below.
> >
> > 60. Sure. Let me use a concrete example to illustrate it. Let's assume
> > that KStreams has a member A with a task reading from foo-0 and a
> > member B with a standby task reading from foo-0. As you know, the
> > standby task information for B is encoded in the assignment metadata
> > whereas the assigned partition for A is in the assignment. Now, let's
> > imagine that the assignor decides to promote the standby task to
> > become the active task for foo-0. The assignor will create a new
> > assignment for the group and will encode the standby task in A's
> > metadata and assign foo-0 to B. A will be requested to revoke foo-0
> > and, while it does so, B will get its new metadata but without foo-0
> > because foo-0 is not revoked yet. From the point of view of B, it will
> > see that the standby task is no longer there. Without providing the
> > full set of assigned partitions, it would not know what to do here. If
> > foo-0 is targeted to be assigned to B, B should wait until it is
> > before stopping the standby task. If foo-0 is not, it can stop the
> > standby immediately. Long story short, KStreams needs to get the full
> > assignment + metadata in order to reason about the correct end state.
> >
> > How do we provide this? We have added the pending partitions in the
> > ConsumerGroupHeartbeatResponse in conjunction to the assigned
> > partitions. With this, the Consumer knows the full set of partitions.
> > PartitionAssignor#onAssign will be called when the member transitions
> > to a new epoch with the target partitions and the metadata for the
> > member. Then, RebalanceListener#onAssignedPartitions is called when
> > partitions are incrementally assigned. At most, it will be called N
> > times where N is the number of assigned partitions in the current
> > epoch.
> >
> > For context, this was discussed with Guozhang in this thread.
> >
> > I hope that this clarifies the intent.
> >
> > 62. Got it. I have reworked that part as well. Unfortunately, I think
> > that we should keep using a Set here because it is already there.
> > Deprecating the current one to change the type is not worth it.
> >
> > 63. Fixed as well.
> >
> > 68.2 Right. The error is only used by the client side assignor. The
> > client assignor will report a non-zero error code while installing a
> > new assignment if it was not able to compute a new assignment for the
> > group. In this case, the assignment is not changed but the error is
> > reported to all the members. KStreams will have a few of such errors.
> > They are listed in the KIP.
> >
> > When a new assignment is installed with the
> > ConsumerGroupInstallAssignment API, the coordinator validates it and
> > rejects the installation directly if it is not valid. In this case,
> > only the member trying to install the assignment gets an error. The
> > other members are not aware of it as they keep their current assignor.
> >
> > 71.1 That makes sense. Using SubscribedTopicNames is indeed more
> > intuitive. I have changed the HB request to use it in order to be
> > consistent. I have also added the topic names in the
> > ConsumerGroupDescribe response.
> >
> > Best,
> > David
> >
> > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > >
> > > Hi, David,
> > >
> > > Thanks for the reply.
> > >
> > > 60. Hmm, could you explain why KStreams needs the full set of partition
> > > assignments? I am also not sure how this will be implemented based on the
> > > protocol. Since HeartBeatResponse sends the assigned partitions in phases
> > > (those that don't need to wait for revocation from other members first,
> > > followed by the full assignment list), how does a member know which
> > > response has the full assignment?
> > >
> > > 62. I was referring to Admin#describeConsumerGroups. It seems that
> > > ownedPartitions is still of type List<TopicIdPartition>. Also, the
> > > existing topicPartitions() returns Set<TopicPartition>, not a collection.
> > >
> > > 63. This is also in Admin#describeConsumerGroups. The comment seems
> > > inconsistent with the field name.
> > >     /**
> > >      * The reason reported by the assignor.
> > >      */
> > >     byte error;
> > >
> > > 67. Thanks for the explanation. Makes sense. The existing name may be ok.
> > >
> > > 68.2 Are you saying the error is only intended for the client assignor?
> > > But the coordinator generates the error based on the server side
> > > validation, right? Should we provide some info to tell the client why
> > > the validation fails?
> > >
> > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
> > > subscription part? That seems more intuitive---a subscription shouldn't
> > > change just because a topic is recreated. For the assigned partitions,
> > > perhaps we could include both topicId and name just like
> > > FetchOffsetRequest.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Thanks for the update.
> > > > Yes, I think using a similar approach to KIP-868 to fix this issue
> > > > makes sense. Let's consider it in the future.
> > > >
> > > > Luke
> > > >
> > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot <dja...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Luke,
> > > > >
> > > > > Thanks for your questions.
> > > > >
> > > > > > 1. We will store the "targetAssignment" into log now. But as we
> > > > > > know, there's max batch size limit (default 1MB), which means, we
> > > > > > cannot support 1M partitions in one group (actually, it should be
> > > > > > less than 60k partitions since we'll store {topicID+partition id})
> > > > > > by default now. How will we handle that? Do we expect users to
> > > > > > adjust the max batch size to support large partitions in groups,
> > > > > > which we don't need this change for old protocol?
> > > > >
> > > > > That's right. I have a few ideas to remove this limitation in the
> > > > > future but I decided to keep them for future improvement. The KIP is
> > > > > large enough and as the current protocol suffers from the exact same
> > > > > limitation, it is not a regression.
> > > > >
> > > > > For the future, my thinking is to split the assignment and to only
> > > > > write deltas to the log instead of re-writing all of it. We would
> > > > > need to use transactions for this in the coordinator (similarly to
> > > > > KIP-868). The challenge is that we have to ensure that those deltas
> > > > > are all written or completely rolled back. Otherwise, we would have a
> > > > > weird state with the compaction. This obviously needs more thinking.
> > > > >
> > > > > > I'm wondering why we should persist the "targetAssignment" data?
> > > > > > If we want to work for coordinator failover, could the new
> > > > > > coordinator try to request for currently owned partitions from each
> > > > > > consumer when failed over? I'm not sure if the consumer will auto
> > > > > > send owned partitions to the new coordinator. If not, maybe we can
> > > > > > return an error to client ConsumerGroupHeartbeat API with
> > > > > > REQUIRE_OWNED_PARTITION error, and ask client to append the
> > > > > > currently owned partitions to new coordinator for new assignment
> > > > > > computation. Does that make sense?
> > > > >
> > > > > The entire reconciliation process depends on it so if we lose it
> > > > > during a failover, members could be in a weird state. For instance,
> > > > > they could be in the middle of a transition from their current
> > > > > assignment to their new target and thus would be blocked. Relying on
> > > > > members to reconstruct it back does not really work because they
> > > > > don't have all the information to do so (e.g. new metadata) so we
> > > > > would have to recompute a new one. This implies that we need to get
> > > > > the owned partitions from all members and that would take a few
> > > > > seconds until all members come back in the best case, up to the
> > > > > session timeout in the worst case. Imagine that a member joins or
> > > > > fails during this time, the whole process would be stuck. I am afraid
> > > > > storing it is the best way here.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > >
> > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com> wrote:
> > > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > A few more questions:
> > > > > > 1. We will store the "targetAssignment" into log now. But as we
> > > > > > know, there's max batch size limit (default 1MB), which means, we
> > > > > > cannot support 1M partitions in one group (actually, it should be
> > > > > > less than 60k partitions since we'll store {topicID+partition id})
> > > > > > by default now. How will we handle that? Do we expect users to
> > > > > > adjust the max batch size to support large partitions in groups,
> > > > > > which we don't need this change for old protocol?
> > > > > >
> > > > > > I'm wondering why we should persist the "targetAssignment" data?
> > > > > > If we want to work for coordinator failover, could the new
> > > > > > coordinator try to request for currently owned partitions from each
> > > > > > consumer when failed over? I'm not sure if the consumer will auto
> > > > > > send owned partitions to the new coordinator. If not, maybe we can
> > > > > > return an error to client ConsumerGroupHeartbeat API with
> > > > > > REQUIRE_OWNED_PARTITION error, and ask client to append the
> > > > > > currently owned partitions to new coordinator for new assignment
> > > > > > computation. Does that make sense?
> > > > > >
> > > > > > Luke
> > > > > >
> > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi, David,
> > > > > > >
> > > > > > > Thanks for the reply and the updated KIP. A few more comments on
> > > > > > > the interfaces and the protocols.
> > > > > > >
> > > > > > > 60. On the consumer side, do we need both
> > > > > > > PartitionAssignor.onAssignment and
> > > > > > > ConsumerRebalanceListener.onPartitionsAssigned? My understanding
> > > > > > > is that the former was added for cooperative rebalance, which is
> > > > > > > now handled by the coordinator. If we do need both, should we make
> > > > > > > them more consistent (e.g. topic name vs topic id, list vs set vs
> > > > > > > collection)?
> > > > > > >
> > > > > > > 61. group.local.assignors: Could we make it clear that it's the
> > > > > > > full class name that implements PartitionAssignor?
> > > > > > >
> > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we
> > > > > > > keep the naming and the return type consistent?
> > > > > > >
> > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > >
> > > > > > > 64. group.remote.assignor: The client may not know what assignors
> > > > > > > the broker supports. Should we default this to what the broker
> > > > > > > determines (e.g. first assignor listed in
> > > > > > > group.consumer.assignors)?
> > > > > > >
> > > > > > > 65. After the text "When A heartbeats again and acknowledges the
> > > > > > > revocation, the group coordinator transitions him to epoch 2 and
> > > > > > > releases foo-2.", we have the following.
> > > > > > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > Should foo-2 be in pending-partitions?
> > > > > > >
> > > > > > > 66. In the Online Migration example, is the first occurrence of
> > > > > > > "C - epoch=23, partitions=[foo-2, foo-5, foo-4],
> > > > > > > pending-partitions=[]" correct? It seems that should happen after
> > > > > > > C receives a SyncGroup response? If so, subsequent examples have
> > > > > > > the same issue.
> > > > > > >
> > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs: Which config
> > > > > > > controls this? How is this used by the group coordinator since
> > > > > > > there is no sync barrier anymore?
> > > > > > >
> > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of
> > > > > > > type []TopicPartition. Should they be TopicPartitions?
> > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > >
> > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor:
> > > > > > > Should it include the selected assignor name?
> > > > > > >
> > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we
> > > > > > > let the client set this? Intuitively, it seems the coordinator
> > > > > > > should manage the group epoch.
> > > > > > >
> > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic
> > > > > > > name too since it's convenient for building tools? Ditto for
> > > > > > > TargetAssignment.
> > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > >
> > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since
> > > > > > > tools may also want to issue this request?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > >
> > > >
> >
