Hi Guozhang,

I see now that we should have addressed the method for generating
topology IDs in the KIP, even if it is somewhat of an implementation
detail. We did not include it in the KIP, because it can be changed,
and there doesn't seem to be a perfect choice, so we wanted to
sidestep a lengthy discussion.

I think the topology ID has the following requirements:
 1) For clients running exactly the same build of the streams
application, it must be the same. This is the most important
requirement.
 2) From the protocol side, we have to change the topology ID whenever
the set of subtopologies, input topics or internal topics changes,
since this is what triggers reinitialization of the group.
 3) The topology ID should "detect" as many changes to the topology as possible.

Requirement 3 (let's call it the "sensitivity" of the topology ID) is
somewhat of a nice-to-have, but I think it can give good information
about the state of the streams group. If the group is in a partially
upgraded state, and some instances run a new filter predicate and some
don't, this is something that we want to detect and tell the user
about. Such a state is fine during a rolling bounce but will cause lots
of problems if it becomes permanent.
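
As a rough illustration of how a partially upgraded group could be
spotted, here is a minimal sketch that counts members per topology ID.
The class and method names (TopologyIdCensus, countByTopologyId,
isPartiallyUpgraded) are hypothetical, and the member-to-topology-ID map
is an assumption standing in for whatever a group description returns;
none of this is part of the KIP.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the KIP or of Kafka.
final class TopologyIdCensus {

    // Counts how many members report each topology ID.
    // Input: member ID -> topology ID (assumed to come from a group description).
    static Map<String, Integer> countByTopologyId(Map<String, String> memberToTopologyId) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String topologyId : memberToTopologyId.values()) {
            counts.merge(topologyId, 1, Integer::sum);
        }
        return counts;
    }

    // A group is "partially upgraded" if its members report more than one topology ID.
    static boolean isPartiallyUpgraded(Map<String, String> memberToTopologyId) {
        return countByTopologyId(memberToTopologyId).size() > 1;
    }
}
```

During a rolling bounce this check would be expected to return true for
a while; it only points to a problem if it stays true.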

Ideally, the topology ID would come from something like the git hash
of the streams application, which is fed to the protocol through a
public config. However, topology evolution is such an advanced topic,
and we don't want to burden every user with implementing a workflow to
set a correct topology ID. Therefore, we want to provide a default
implementation for the topology ID which is "good enough" in most
cases. Think of the default implementation as something like
hash(topology.describe()) -- although it probably makes sense to
implement it more efficiently.
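
To make the hash(topology.describe()) idea concrete, here is a minimal
sketch that derives an ID from the textual topology description. The
class and method names (TopologyIdSketch, topologyId) are hypothetical,
and SHA-256 is just one possible choice of hash; in a real application
the input string would be assumed to come from something like
topology.describe().toString(). This is not the implementation proposed
in the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the KIP or of Kafka.
final class TopologyIdSketch {

    // Derives a hex-encoded SHA-256 digest from a textual topology description.
    // Identical descriptions always yield the same ID (requirement 1);
    // any change in the description text yields a different ID with
    // overwhelming probability.
    static String topologyId(String topologyDescription) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(topologyDescription.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Note that such an ID can only be as sensitive as the description it
hashes: a change that does not alter the description text would not
alter the ID either.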

If we agree to use a sensitive topology ID, we'd need to make sure
that rolling bounces still work without completely pausing processing.
That's why the protocol tries to be "clever" about it.

Cheers,
Lucas




On Tue, Aug 20, 2024 at 4:59 AM Guozhang Wang
<guozhang.wang...@gmail.com> wrote:
>
> Thanks for the replies Lucas. Just to help me better understand here:
> today, do all kinds of modifications to the processing logic, not only
> topological changes, but also say logical changes (e.g. changing the
> threshold value of a filter, or changes that do not impact the
> generated list of tasks as they only touch on the sub-topology of
> single tasks) change the topology ID? My original thought was that:
>
> * For topological changes that would result in a different list of
> generated tasks, then letting those clients continue processing during
> the bounce may not help since those continued-running tasks may be
> removed after the bounce anyways.
> * For other changes that do not change the list of generated tasks,
> but only the logic within certain tasks, like a single operator
> change, or changes within a sub-topology while the input / output data
> format stays the same, then we possibly do not even need to change
> the topology ID at all (this is what I'm not sure about).
> * In practice, most of the changes are in the second case, where we
> may actually be fine without a topology ID change (again, this is what
> I'm not sure about).
>
> If any of those assumptions are not true in practice, then I agree
> it's still worth adding the complexity to continue processing for any
> processing logic changes.
>
> Re: initialization step, sounds good, in that case we would still
> keep it then.
>
> On Mon, Aug 19, 2024 at 8:52 AM Lucas Brutschy
> <lbruts...@confluent.io.invalid> wrote:
> >
> > Hi Guozhang,
> >
> > thanks for clarifying. I think I understand better what you meant now.
> > However, my question remains - wouldn't that effectively make a
> > "rolling bounce" like an offline upgrade, if the application
> > effectively halts processing during the roll? I agree that could be
> > simpler, but it would also mean we cannot remain available even during
> > minor modifications to the topology - something that is possible
> > today.
> >
> > w.r.t. a) - we'd still need to have an initialization step at some
> > point to tell the broker about the required internal topics etc.
> >
> > Cheers,
> > Lucas
> >
> > On Mon, Aug 19, 2024 at 5:19 PM Guozhang Wang
> > <guozhang.wang...@gmail.com> wrote:
> > >
> > > Hi Lucas,
> > >
> > > From the current description in section "Topology updates", my
> > > understanding is that a) coordinator will remember a topology ID as
> > > the group topology ID, which has to be initialized and agreed by
> > > everyone in the current generation; b) when forming a new generation,
> > > if some members have a topology ID which is different from the group
> > > topology ID that the previous generation's members all agreed on, we
> > > will try to act smart by not assigning any new tasks to these members,
> > > but will still give them old tasks (if any) that they owned in the
> > > previous generation; c) we allow clients to initialize a new topology ID.
> > >
> > > I simply feel that the above complex logic may not be worth it
> > > (plus, what if some tasks no longer exist under the new topology ID
> > > etc.; all in all, we need to consider quite a few different corner cases).
> > > What if we just:
> > >
> > > a) do not have the "initialize topology" logic at all, and
> > > b) do not try to do assignment, including trying to give the
> > > ones with inconsistent IDs their previous tasks, etc; but simply
> > > c) in any generation, if not every member agrees on the same topology
> > > ID, simply do not perform a new assignment, and return a warning code
> > > telling every client that there are other peers whose topologies are
> > > different (of course it could be because of a rolling bounce, so no
> > > need to shout out as an ERROR but just WARN or even INFO); every
> > > client will just act as if there's no new assignment received. This is
> > > what I meant by "blocking the progress": since we did not perform a
> > > new assignment, the new topology ID would not be accepted and hence in
> > > a rolling bounce upgrade case the new application's topology would
> > > not be executed. And if it keeps happening, operators would use
> > > DescribeStreamsGroup to pin down who the culprits are.
> > >
> > >
> > >
> > > On Mon, Aug 19, 2024 at 7:06 AM Lucas Brutschy
> > > <lbruts...@confluent.io.invalid> wrote:
> > > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for reviewing the KIP, your feedback is extremely valuable.
> > > >
> > > > I think your analysis is quite right - we care about cases a) and b)
> > > > and I generally agree - we want the protocol to be simple and
> > > > debuggable. Situation a) should be relatively rare since in the common
> > > > case all streams applications run from the same jar/build, and we
> > > > shouldn't have zombies that don't update to a new topology. In this
> > > > case, it should just be easy to debug. In situation b), things should
> > > > "just work". And I think both are enabled by the KIP. In particular,
> > > > these situations should be relatively easy to debug:
> > > >
> > > >  - Using DescribeStreamsGroup, you can find out the topology ID of the
> > > > group and the topology ID of each member, to understand
> > > > inconsistencies.
> > > >  - Inconsistent clients and even the broker could log messages to
> > > > indicate the inconsistencies.
> > > >  - One could also consider exposing the number of clients by topology
> > > > IDs as a metric, to enhance observability (this is not in the KIP
> > > > yet).
> > > >
> > > > What I'm not sure about is, what you mean precisely by temporarily
> > > > blocking progress of the group? Do you propose to stop processing
> > > > altogether if topology IDs don't match - wouldn't that defy the aim of
> > > > doing a rolling bounce of the application (in case b)?
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > > On Mon, Aug 19, 2024 at 3:59 PM Lucas Brutschy <lbruts...@confluent.io> 
> > > > wrote:
> > > > >
> > > > > Hi Nick,
> > > > >
> > > > > NT4: As discussed, we will still require locking in the new protocol
> > > > > to avoid concurrent read/write access on the checkpoint file, at least
> > > > > as long as KIP-1035 hasn't landed. However, as you correctly pointed
> > > > > out, the assignor will have to accept offsets for overlapping sets of
> > > > > dormant tasks. I updated the KIP to make this explicit. If the
> > > > > corresponding offset information for one task conflicts between
> > > > > clients (which can happen), the conflict is resolved by taking the
> > > > > maximum of the offsets.
> > > > >
> > > > > Cheers,
> > > > > Lucas
> > > > >
> > > > > On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang
> > > > > <guozhang.wang...@gmail.com> wrote:
> > > > > >
> > > > > > Hello Lucas,
> > > > > >
> > > > > > Thanks for the great KIP. I've read it through and it looks good to
> > > > > > me. As we've discussed, many of my thoughts would be outside the
> > > > > > scope of this very well scoped and defined KIP, so I will omit them
> > > > > > for now.
> > > > > >
> > > > > > The only one I had related to this KIP is about topology updating. I
> > > > > > understand the motivation of the proposal is basically that, since a
> > > > > > group forming a (new) generation may not accept all of the members
> > > > > > joining because of the timing of the RPCs, the group's topology ID
> > > > > > may not reflect the "actual" most recent topology if some zombie
> > > > > > members holding an old topology form a group generation quickly
> > > > > > enough, which would effectively mean that zombie members are
> > > > > > blocking other real members from getting tasks assigned. On
> > > > > > the other hand, like you've mentioned already in the doc, requesting
> > > > > > some sort of ID ordering by pushing the burden on the user's side
> > > > > > would also be too much for users, increasing the risk of human
> > > > > > errors in operations.
> > > > > >
> > > > > > I'm wondering if, instead of trying to be smart programmatically, we
> > > > > > just let the protocol act dumbly (details below). The main reasons I
> > > > > > had in mind are:
> > > > > >
> > > > > > 1) Upon topology changes, some tasks may no longer exist in the new
> > > > > > topology, so still letting them execute on the clients which do not
> > > > > > yet have the new topology would waste resources.
> > > > > >
> > > > > > 2) As we discussed, trying to act smart introduces more complexities
> > > > > > in the coordinator that tries to balance different assignment goals
> > > > > > between stickiness, balance, and now topology mis-matches between
> > > > > > clients.
> > > > > >
> > > > > > 3) Scenarios in which mismatching topologies may be observed within a
> > > > > > group generation:
> > > > > >    a. Zombie / old clients that do not have the new topology, and
> > > > > > never will.
> > > > > >    b. During a rolling bounce upgrade, where not-yet-bounced clients
> > > > > > would not yet have the new topology.
> > > > > >    c. Let's assume we would not ever have scenarios where users want
> > > > > > to intentionally have a subset of clients within a group running a
> > > > > > partial / subset of the full sub-topologies, since such cases can
> > > > > > well be covered by a custom assignor that takes those considerations
> > > > > > into account by never assigning some tasks to some clients etc. That
> > > > > > means the only scenarios we would need to consider are a) and b).
> > > > > >
> > > > > > For b), I think it's actually okay to temporarily block the progress
> > > > > > of the group until everyone is bounced with the updated topology; as
> > > > > > for a), originally I thought having one or a few clients blocking
> > > > > > the whole group would be a big problem, but now that I think about
> > > > > > it more, I feel that from the operations point of view, just letting
> > > > > > the app be blocked with an informational log entry to quickly pin
> > > > > > down the zombie clients may actually be acceptable. All in all, it
> > > > > > makes the code simpler by not trying to abstract away issue scenario
> > > > > > a) from the users (or operators) but letting them know asap.
> > > > > >
> > > > > > ----------
> > > > > >
> > > > > > Other than that, everything else looks good to me.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford 
> > > > > > <nick.telf...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Lucas,
> > > > > > >
> > > > > > > NT4.
> > > > > > > Given that the new assignment procedure guarantees that a Task has been
> > > > > > > closed before it is assigned to a different client, I don't think there
> > > > > > > should be a problem with concurrent access? I don't think we should worry
> > > > > > > too much about 1035 here, as it's orthogonal to 1071. I don't think that
> > > > > > > 1035 *requires* the locking, and indeed once 1071 is the only assignment
> > > > > > > mechanism, we should be able to do away with the locking completely (I
> > > > > > > think).
> > > > > > >
> > > > > > > Anyway, given your point about it not being possible to guarantee disjoint
> > > > > > > sets, does it make sense to require clients to continue to supply the lags
> > > > > > > for only a subset of the dormant Tasks on-disk? Wouldn't it be simpler to
> > > > > > > just have them supply everything, since the assignor has to handle
> > > > > > > overlapping sets anyway?
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy
> > > > > > > <lbruts...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Nick,
> > > > > > > >
> > > > > > > > NT4. I think it will be hard anyway to ensure that the assignor always
> > > > > > > > gets disjoint sets (there is no synchronized rebalance point anymore,
> > > > > > > > so locks wouldn't prevent two clients reporting the same dormant
> > > > > > > > task). So I think we'll have to lift this restriction. I was thinking
> > > > > > > > more that locking is required to prevent concurrent access. In
> > > > > > > > particular, I was expecting that the lock will avoid two threads
> > > > > > > > opening the same RocksDB in KIP-1035. Wouldn't this cause problems?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Lucas
> > > > > > > >
> > > > > > > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford
> > > > > > > > <nick.telf...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Lucas,
> > > > > > > > >
> > > > > > > > > NT4.
> > > > > > > > > The reason I mentioned this was because, while implementing 1035, I
> > > > > > > > > stumbled across a problem: initially I had changed it so that threads
> > > > > > > > > always reported the lag for *all* dormant Tasks on-disk, even if it meant
> > > > > > > > > multiple threads reporting lag for the same Tasks. I found that this
> > > > > > > > > didn't work, apparently because the assignor assumes that multiple
> > > > > > > > > threads on the same instance always report disjoint sets.
> > > > > > > > >
> > > > > > > > > From reading through 1071, it sounded like this assumption is no longer
> > > > > > > > > being made by the assignor, and that the processId field would allow the
> > > > > > > > > assignor to understand when multiple clients reporting lag for the same
> > > > > > > > > Tasks are on the same instance. This would enable us to do away with the
> > > > > > > > > locking when reporting lag, and just have threads report the lag for
> > > > > > > > > every Task on-disk, even if other threads are reporting lag for the
> > > > > > > > > same Tasks.
> > > > > > > > >
> > > > > > > > > But it sounds like this is not correct, and that the new assignor will
> > > > > > > > > make the same assumptions as the old one?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Nick
> > > > > > > > >
> > > > > > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy
> > > > > > > > > <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Nick!
> > > > > > > > > >
> > > > > > > > > > Thanks for getting involved in the discussion.
> > > > > > > > > >
> > > > > > > > > > NT1. We are always referring to offsets in the changelog topics here.
> > > > > > > > > > I tried to make it more consistent. But in the schemas and API, I find
> > > > > > > > > > "task changelog end offset" a bit lengthy, so we use "task offset" and
> > > > > > > > > > "task end offset" for short. We could change it, if people think this
> > > > > > > > > > is confusing.
> > > > > > > > > >
> > > > > > > > > > NT2. You are right. The confusing part is that the current streams
> > > > > > > > > > config is called `max.warmup.replicas`, but in the new protocol, we
> > > > > > > > > > are bounding the group-level parameter using
> > > > > > > > > > `group.streams.max.warmup.replicas`. If we wanted to keep
> > > > > > > > > > `group.streams.max.warmup.replicas` for the config name on the
> > > > > > > > > > group-level, we'd have to bound it using
> > > > > > > > > > `group.streams.max.max.warmup.replicas`. I prefer not doing this, but
> > > > > > > > > > I'm open to suggestions.
> > > > > > > > > >
> > > > > > > > > > NT3. You are right, we do not need to make it this restrictive. I
> > > > > > > > > > think the main problem with having 10,000 warm-up replicas would be
> > > > > > > > > > that it slows down the assignment inside the broker - once we are
> > > > > > > > > > closer to a production-ready implementation, we may have better numbers
> > > > > > > > > > for this and may revisit these defaults. I'll set the max to 100 for
> > > > > > > > > > now, but it would be good to hear what values people typically use in
> > > > > > > > > > their production workloads.
> > > > > > > > > >
> > > > > > > > > > NT4. We will actually only report the offsets if we manage to acquire
> > > > > > > > > > the lock. I tried to make this more precise. I suppose also with
> > > > > > > > > > KIP-1035, we'd require the lock to read the offset?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Lucas
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford
> > > > > > > > > > <nick.telf...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > Looks really promising, and I can see this resolving several issues I've
> > > > > > > > > > > noticed. I particularly like the choice to use a String for Subtopology
> > > > > > > > > > > ID, as it will (eventually) lead to a better solution to KIP-816.
> > > > > > > > > > >
> > > > > > > > > > > I noticed a few typos in the KIP that I thought I'd mention:
> > > > > > > > > > >
> > > > > > > > > > > NT1.
> > > > > > > > > > > In several places you refer to "task changelog end offsets", while in
> > > > > > > > > > > others, you call it "task end offsets". Which is it?
> > > > > > > > > > >
> > > > > > > > > > > NT2.
> > > > > > > > > > > Under "Group Configurations", you included
> > > > > > > > > > > "group.streams.max.warmup.replicas", but I think you meant
> > > > > > > > > > > "group.streams.num.warmup.replicas"?
> > > > > > > > > > >
> > > > > > > > > > > NT3.
> > > > > > > > > > > Not a typo, but a suggestion: it makes sense to set the default for
> > > > > > > > > > > "group.streams.num.warmup.replicas" to 2, for compatibility with the
> > > > > > > > > > > existing defaults, but why set the default for
> > > > > > > > > > > "group.streams.max.warmup.replicas" to only 4? That seems extremely
> > > > > > > > > > > restrictive. These "max" configs are typically used to prevent a subset
> > > > > > > > > > > of users causing problems on the shared broker cluster - what's the
> > > > > > > > > > > reason to set such a restrictive value for max warmup replicas? If I had
> > > > > > > > > > > 10,000 warmup replicas, would it cause a noticeable problem on the
> > > > > > > > > > > brokers?
> > > > > > > > > > >
> > > > > > > > > > > NT4.
> > > > > > > > > > > It's implied that clients send the changelog offsets for *all* dormant
> > > > > > > > > > > stateful Tasks, but the current behaviour is that clients will only send
> > > > > > > > > > > the changelog offsets for the stateful Tasks that they are able to lock
> > > > > > > > > > > on-disk. Since this is a change in behaviour, perhaps this should be
> > > > > > > > > > > called out explicitly?
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Nick
> > > > > > > > > > >
> > > > > > > > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy
> > > > > > > > > > > <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Andrew,
> > > > > > > > > > > >
> > > > > > > > > > > > thanks for the comment.
> > > > > > > > > > > >
> > > > > > > > > > > > AS12: I clarified the command-line interface. It's supposed to be used
> > > > > > > > > > > > with --reset-offsets and --delete-offsets. I removed --topic.
> > > > > > > > > > > >
> > > > > > > > > > > > AS13: Yes, it's --delete. I clarified the command-line interface.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Lucas
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield
> > > > > > > > > > > > <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Lucas,
> > > > > > > > > > > > > Thanks for the KIP update.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think that `kafka-streams-groups.sh` looks like a good equivalent to
> > > > > > > > > > > > > the tools for the other types of groups.
> > > > > > > > > > > > >
> > > > > > > > > > > > > AS12: In kafka-streams-groups.sh, the description for the
> > > > > > > > > > > > > --input-topics option seems insufficient. Why is an input topic specified
> > > > > > > > > > > > > with this option different than a topic specified with --topic? Why is
> > > > > > > > > > > > > it --input-topics rather than --input-topic? Which action of this tool
> > > > > > > > > > > > > does this option apply to?
> > > > > > > > > > > > >
> > > > > > > > > > > > > AS13: Similarly, for --internal-topics, which action of the tool does it
> > > > > > > > > > > > > apply to? I suppose it’s --delete, but it’s not clear to me.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Andrew
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy
> > > > > > > > > > > > > > <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Andrew/Lianet,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have added an administrative command-line tool (replacing
> > > > > > > > > > > > > > `kafka-streams-application-reset`) and extensions of the Admin API for
> > > > > > > > > > > > > > listing, deleting, describing groups and listing, altering and
> > > > > > > > > > > > > > deleting offsets for streams groups. No new RPCs have to be added,
> > > > > > > > > > > > > > however, we duplicate some of the API in the admin client that exist
> > > > > > > > > > > > > > for consumer groups. It seems to me cleaner to duplicate some
> > > > > > > > > > > > > > code/interface here, instead of using "consumer group" APIs for
> > > > > > > > > > > > > > streams groups, or renaming existing APIs that use "consumerGroup" in
> > > > > > > > > > > > > > the name to something more generic (which wouldn't cover share
> > > > > > > > > > > > > > groups).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think for now, all comments are addressed.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > Lucas
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy
> > > > > > > > > > > > > > <lbruts...@confluent.io> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Hi Lianet and Andrew,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> LM1/LM2: You are right. The idea is to omit fields exactly in the same
> > > > > > > > > > > > > >> situations as in KIP-848. In the KIP, I stuck with how the behavior
> > > > > > > > > > > > > >> was defined in KIP-848 (e.g. KIP-848 defined that the instance ID
> > > > > > > > > > > > > >> will be omitted if it did not change since the last heartbeat). But
> > > > > > > > > > > > > >> you are correct that the implementation handles these details slightly
> > > > > > > > > > > > > >> differently. I updated the KIP to match more closely the behavior of
> > > > > > > > > > > > > >> the KIP-848 implementation.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> LM9: Yes, there are several options to do this. The idea is to have
> > > > > > > > > > > > > >> only one client initialize the topology, not all clients. It seems
> > > > > > > > > > > > > >> easier to understand on the protocol level (otherwise we'd have N
> > > > > > > > > > > > > >> topology initializations racing with a hard-to-determine winner). We
> > > > > > > > > > > > > >> also expect the payload of the request to grow in the future and want
> > > > > > > > > > > > > >> to avoid the overhead of having all clients sending the topology at
> > > > > > > > > > > > > >> the same time. But initializing the group could take some time - we
> > > > > > > > > > > > > >> have to create internal topics, and maybe a client is malfunctioning
> > > > > > > > > > > > > >> and the initialization has to be retried. It seemed a bit confusing to
> > > > > > > > > > > > > >> return errors to all other clients that are trying to join the group
> > > > > > > > > > > > > >> during that time - as if there was a problem with joining the group /
> > > > > > > > > > > > > >> the contents of the heartbeat. It seems cleaner to me to let all
> > > > > > > > > > > > > >> clients successfully join the group and heartbeat, but remain in an
> > > > > > > > > > > > > >> INITIALIZING state which does not yet assign any tasks. Does that make
> > > > > > > > > > > > > >> sense to you? You are right that returning a retriable error and
> > > > > > > > > > > > > >> having all clients retry until the group is initialized would also
> > > > > > > > > > > > > >> work, it just doesn't model well that "everything is going according
> > > > > > > > > > > > > >> to plan".
> > > > > > > > > > > > > >> As for the order of the calls - yes, I think it is fine to allow an
> > > > > > > > > > > > > >> Initialize RPC before the first heartbeat for supporting future admin
> > > > > > > > > > > > > >> tools. I made this change throughout the KIP, thanks!
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> AS11: Yes, your understanding is correct. The number of tasks for one
> > > > > > > > > > > > > >> subtopology is the maximum number of partitions in any of the matched
> > > > > > > > > > > > > >> topics. What will happen in Kafka Streams is that the partitions of
> > > > > > > > > > > > > >> the matched topics will effectively be merged during stream
> > > > > > > > > > > > > >> processing, so in your example, subtopology:0 would consume from AB:0
> > > > > > > > > > > > > >> and AC:0.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Cheers,
> > > > > > > > > > > > > >> Lucas
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M.
> > > > > > > > > > > > > >> <liane...@gmail.com> wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Hi Bruno, answering your questions:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> About the full heartbeat (LM1): I just wanted to confirm that you'll
> > > > > > > > > > > > > > >>> be sending full HBs in case of errors in general. It's not clear from
> > > > > > > > > > > > > > >>> the KIP, since it referred to sending Id/epoch and whatever had
> > > > > > > > > > > > > > >>> changed since the last HB only. Sending full HB on error is key to
> > > > > > > > > > > > > > >>> ensure fresh rejoins after fencing for instance, and retries with all
> > > > > > > > > > > > > > >>> relevant info.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> About the instanceId (LM2): The instanceId is needed on every HB to
> > > > > > > > > > > > > > >>> be able to identify a member using one that is already taken. On
> > > > > > > > > > > > > > >>> every HB, the broker uses the instance id (if any) to retrieve the
> > > > > > > > > > > > > > >>> member ID associated with it, and checks it against the memberId
> > > > > > > > > > > > > > >>> received in the HB (throwing UnreleasedInstance exception if needed).
> > > > > > > > > > > > > > >>> So similar to my previous point, just wanted to confirm that we are
> > > > > > > > > > > > > > >>> considering that here too.
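
A rough sketch of the per-heartbeat check described in LM2, in Python. This is not the group coordinator's code: the `owners` map, `check_instance_id`, and the exception class are hypothetical names, and the real logic has more cases (epochs, members leaving, and so on).

```python
class UnreleasedInstanceId(Exception):
    """Hypothetical stand-in for the UNRELEASED_INSTANCE_ID error."""

def check_instance_id(owners, instance_id, member_id):
    """Validate one heartbeat against the instance-id -> member-id map `owners`."""
    if instance_id is None:
        return  # dynamic member, nothing to check
    # Record the owner on first use, otherwise look up the existing owner.
    owner = owners.setdefault(instance_id, member_id)
    if owner != member_id:
        raise UnreleasedInstanceId(f"{instance_id} is held by member {owner}")

owners = {}
check_instance_id(owners, "instance-1", "member-a")  # first use: recorded
check_instance_id(owners, "instance-1", "member-a")  # same member: accepted
try:
    check_instance_id(owners, "instance-1", "member-b")
except UnreleasedInstanceId as err:
    print("rejected:", err)
```

The point is that the check only works if the instance id arrives with every heartbeat, which is why it cannot be omitted as "unchanged".
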
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Now some other thoughts:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> LM9: Definitely interesting imo if we can avoid the dependency
> > > > > > > > > > > > > > >>> between the StreamsGroupInitialize and the StreamsGroupHeartbeat. I
> > > > > > > > > > > > > > >>> totally get that the initial client implementation will do a HB
> > > > > > > > > > > > > > >>> first, and that's fine, but not having the flow enforced at the
> > > > > > > > > > > > > > >>> protocol level would allow for further improvement in the future
> > > > > > > > > > > > > > >>> (that initialize via admin idea you mentioned, for instance).
> > > > > > > > > > > > > > >>> Actually, I may be missing something about the HB, but if we are at
> > > > > > > > > > > > > > >>> the point where HB requires that the topology has been initialized,
> > > > > > > > > > > > > > >>> and the topology init requires the group, why is it the heartbeat RPC
> > > > > > > > > > > > > > >>> the one responsible for the group creation? (vs.
> > > > > > > > > > > > > > >>> StreamsGroupInitialize creates group if needed + HB just fails if
> > > > > > > > > > > > > > >>> topology not initialized)
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Thanks!
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Lianet
> > > > > > > > > > > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal, just
> > > > > > > > > > > > > > >>> still thinking about it in sync with the same discussion we're having
> > > > > > > > > > > > > > >>> on the KIP-1043 thread...I'll come back on that)
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield
> > > > > > > > > > > > > > >>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>> Hi Bruno,
> > > > > > > > > > > > > > >>>> Thanks for adding the detail on the schemas on records written to
> > > > > > > > > > > > > > >>>> __consumer_offsets.
> > > > > > > > > > > > > > >>>> I’ve reviewed them in detail and they look good to me. I have one
> > > > > > > > > > > > > > >>>> naive question.
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> AS11: I notice that an assignment is essentially a set of partition
> > > > > > > > > > > > > > >>>> indices for subtopologies. Since a subtopology can be defined by a
> > > > > > > > > > > > > > >>>> source topic regex, does this mean that an assignment gives the same
> > > > > > > > > > > > > > >>>> set of partition indices for all topics which happen to match the
> > > > > > > > > > > > > > >>>> regex? So, a subtopology reading from A* that matches AB and AC would
> > > > > > > > > > > > > > >>>> give the same set of partitions to each task for both topics, and is
> > > > > > > > > > > > > > >>>> not able to give AB:0 to one task and AC:0 to a different task. Is
> > > > > > > > > > > > > > >>>> this correct?
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > > >>>> Andrew
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Hi Lianet,
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Thanks for the review!
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> Here are my answers:
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM1. Is your question whether we need to send a full heartbeat each
> > > > > > > > > > > > > > >>>>> time the member re-joins the group even if the information in the
> > > > > > > > > > > > > > >>>>> RPC did not change since the last heartbeat?
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM2. Is the reason for sending the instance ID each time that a
> > > > > > > > > > > > > > >>>>> member could shut down, change the instance ID and then start and
> > > > > > > > > > > > > > >>>>> heartbeat again, but the group coordinator would never notice that
> > > > > > > > > > > > > > >>>>> the instance ID changed?
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM3. I see your point. I am wondering whether this additional
> > > > > > > > > > > > > > >>>>> information is worth the dependency between the group types. To
> > > > > > > > > > > > > > >>>>> return INVALID_GROUP_TYPE, the group coordinator needs to know that
> > > > > > > > > > > > > > >>>>> a group ID exists with a different group type. With a group
> > > > > > > > > > > > > > >>>>> coordinator as we have it now in Apache Kafka that manages all
> > > > > > > > > > > > > > >>>>> group types, that is not a big deal, but imagine if we (or some
> > > > > > > > > > > > > > >>>>> implementation of the Apache Kafka protocol) decide to have a
> > > > > > > > > > > > > > >>>>> separate group coordinator for each group type.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to
> > > > > > > > > > > > > > >>>>> me. I am going to change that.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM5. I think there is a dependency from the StreamsGroupInitialize
> > > > > > > > > > > > > > >>>>> RPC to the heartbeat. The group must exist when the initialize RPC
> > > > > > > > > > > > > > >>>>> is received by the group coordinator. The group is created by the
> > > > > > > > > > > > > > >>>>> heartbeat RPC. I would be in favor of making the initialize RPC
> > > > > > > > > > > > > > >>>>> independent from the heartbeat RPC. That would allow initializing a
> > > > > > > > > > > > > > >>>>> streams group explicitly with an admin tool.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > > >>>>> LM6. I think it affects streams and streams should behave as the
> > > > > > > > > > > > > > >>>>> consumer group.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> LM7. Good point that we will consider.
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> LM8. Fixed! Thanks!
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> Best,
> > > > > > > > > > > > > >>>>> Bruno
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>>
> > > > > > > > > > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> > > > > > > > > > > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM1. Related to where the KIP says: *“Group ID, member ID, member
> > > > > > > > > > > > > > >>>>>> epoch are sent with each heartbeat request. Any other information
> > > > > > > > > > > > > > >>>>>> that has not changed since the last heartbeat can be omitted.”* I
> > > > > > > > > > > > > > >>>>>> expect all the other info also needs to be sent whenever a full
> > > > > > > > > > > > > > >>>>>> heartbeat is required (even if it didn’t change from the last
> > > > > > > > > > > > > > >>>>>> heartbeat), e.g. on fencing scenarios, correct?
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM2. For consumer groups we always send the groupInstanceId (if
> > > > > > > > > > > > > > >>>>>> any) as part of every heartbeat, along with memberId, epoch and
> > > > > > > > > > > > > > >>>>>> groupId. Should we consider that too here?
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in
> > > > > > > > > > > > > > >>>>>> response to the stream-specific RPCs if the groupId is associated
> > > > > > > > > > > > > > >>>>>> with a group type that is not streams (i.e. consumer group or
> > > > > > > > > > > > > > >>>>>> share group). I wonder if at this point, where we're getting
> > > > > > > > > > > > > > >>>>>> several new group types added, each with RPCs that are supposed to
> > > > > > > > > > > > > > >>>>>> include groupId of a certain type, we should be more explicit
> > > > > > > > > > > > > > >>>>>> about this situation. Maybe a kind of INVALID_GROUP_TYPE (group
> > > > > > > > > > > > > > >>>>>> exists but not with a valid type for this RPC) vs a
> > > > > > > > > > > > > > >>>>>> GROUP_ID_NOT_FOUND (group does not exist). Those errors would be
> > > > > > > > > > > > > > >>>>>> consistently used across consumer, share, and streams RPCs
> > > > > > > > > > > > > > >>>>>> whenever the group id is not of the expected type.
> > > > > > > > > > > > > > >>>>>> This is truly not specific to this KIP, and should be addressed
> > > > > > > > > > > > > > >>>>>> with all group types and their RPCs in mind. I just wanted to
> > > > > > > > > > > > > > >>>>>> bring out my concern and get thoughts around it.
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe returns
> > > > > > > > > > > > > > >>>>>> INVALID_REQUEST if groupId is empty. There is already an
> > > > > > > > > > > > > > >>>>>> INVALID_GROUP_ID error, that seems more specific to this
> > > > > > > > > > > > > > >>>>>> situation. Error handling of specific errors would definitely be
> > > > > > > > > > > > > > >>>>>> easier than having to deal with a generic INVALID_REQUEST (and
> > > > > > > > > > > > > > >>>>>> probably its custom message). I know that for KIP-848 we have
> > > > > > > > > > > > > > >>>>>> INVALID_REQUEST for similar situations, so if ever we take down
> > > > > > > > > > > > > > >>>>>> this path we should review it there too for consistency. Thoughts?
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
> > > > > > > > > > > > > > >>>>>> StreamsGroupInitialize RPC is one-way only right? HB requires a
> > > > > > > > > > > > > > >>>>>> previous StreamsGroupInitialize request, but
> > > > > > > > > > > > > > >>>>>> StreamsGroupInitialize processing is totally independent of
> > > > > > > > > > > > > > >>>>>> heartbeats (and could perfectly be processed without a previous
> > > > > > > > > > > > > > >>>>>> HB, even though the client implementation we’re proposing won’t go
> > > > > > > > > > > > > > >>>>>> down that path). Is my understanding correct? Just to double
> > > > > > > > > > > > > > >>>>>> check, seems sensible like that at the protocol level.
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM6. With KIP-848, there is an important improvement that brings a
> > > > > > > > > > > > > > >>>>>> difference in behaviour around the static membership: with the
> > > > > > > > > > > > > > >>>>>> classic protocol, if a static member joins with a group instance
> > > > > > > > > > > > > > >>>>>> already in use, it makes the initial member fail with a
> > > > > > > > > > > > > > >>>>>> FENCED_INSTANCE_ID exception, vs. with the new consumer group
> > > > > > > > > > > > > > >>>>>> protocol, the second member trying to join fails with an
> > > > > > > > > > > > > > >>>>>> UNRELEASED_INSTANCE_ID. Does this change need to be considered in
> > > > > > > > > > > > > > >>>>>> any way for the streams app? (I'm not familiar with KS yet, but
> > > > > > > > > > > > > > >>>>>> thought it was worth asking. If it doesn't affect it in any way,
> > > > > > > > > > > > > > >>>>>> still maybe helpful to call it out on a section for static
> > > > > > > > > > > > > > >>>>>> membership)
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM7. Regarding the admin tool to manage streams groups. We can
> > > > > > > > > > > > > > >>>>>> discuss whether to have it here or separately, but I think we
> > > > > > > > > > > > > > >>>>>> should aim for some basic admin capabilities from the start,
> > > > > > > > > > > > > > >>>>>> mainly because I believe it will be very helpful/needed in
> > > > > > > > > > > > > > >>>>>> practice during the impl of the KIP. From experience with KIP-848,
> > > > > > > > > > > > > > >>>>>> we felt a bit blindfolded in the initial phase where we still
> > > > > > > > > > > > > > >>>>>> didn't have kafka-consumer-groups dealing with the new groups (and
> > > > > > > > > > > > > > >>>>>> then it was very helpful and used when we were able to easily
> > > > > > > > > > > > > > >>>>>> inspect them from the console)
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > > >>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an
> > > > > > > > > > > > > > >>>>>> unrelated “Future work section” at the end of KIP-848)
> > > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> Thanks!
> > > > > > > > > > > > > >>>>>> Lianet
> > > > > > > > > > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> > > > > > > > > > > > > >>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >>>>>>> Hi Andrew,
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> AS2: I added a note for now. If others feel strongly about it, we
> > > > > > > > > > > > > > >>>>>>> can still add more administrative tools to the KIP - it should
> > > > > > > > > > > > > > >>>>>>> not change the overall story significantly.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to me to
> > > > > > > > > > > > > > >>>>>>> distinguish the config from class names. Not sure if I like the
> > > > > > > > > > > > > > >>>>>>> "default". To be consistent, we'd then have to call it
> > > > > > > > > > > > > > >>>>>>> `group.streams.default.session.timeout.ms` as well. I only added
> > > > > > > > > > > > > > >>>>>>> the `.name` on both broker and group level for now.
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what you meant. Good
> > > > > > > > > > > > > > >>>>>>> point, fixed (by Bruno).
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > >>>>>>> Cheers,
> > > > > > > > > > > > > >>>>>>> Lucas
> > > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
> > > > > > > > > > > > > > >>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>> Hi Lucas,
> > > > > > > > > > > > > > >>>>>>>> I see that I hit send too quickly. One more comment:
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > > >>>>>>>> AS2: I think stating that there will be a
> > > > > > > > > > > > > > >>>>>>>> `kafka-streams-groups.sh` in a future KIP is fine to keep this
> > > > > > > > > > > > > > >>>>>>>> KIP focused. Personally, I would probably put all of the gory
> > > > > > > > > > > > > > >>>>>>>> details in this KIP, but then it’s not my KIP. A future pointer
> > > > > > > > > > > > > > >>>>>>>> is fine too.
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > > > > > >>>>>>>> Andrew
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy
> > > > > > > > > > > > > > >>>>>>>>> <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> Hi Andrew,
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> thanks for getting the discussion going! Here are my responses.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> AS1: Good point, done.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS2: We were planning to add more administrative tools to the
> > > > > > > > > > > > > > >>>>>>>>> interface in a follow-up KIP, to not make this KIP too large.
> > > > > > > > > > > > > > >>>>>>>>> If people think that it would help to understand the overall
> > > > > > > > > > > > > > >>>>>>>>> picture if we already add something like
> > > > > > > > > > > > > > >>>>>>>>> `kafka-streams-groups.sh`, we will do that. I also agree that
> > > > > > > > > > > > > > >>>>>>>>> we should address how this relates to KIP-1043, we'll add it.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and
> > > > > > > > > > > > > > >>>>>>>>> `reconciling` etc.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> AS4: Thanks, Fixed.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that we require
> > > > > > > > > > > > > > >>>>>>>>> CREATE on cluster or CREATE on all topics, not both. Fixed.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> AS6: Thanks, Fixed.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> AS7. Thanks, Fixed.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS8: I think this works a bit differently in this KIP than in
> > > > > > > > > > > > > > >>>>>>>>> consumer groups. KIP-848 lets the members vote for a preferred
> > > > > > > > > > > > > > >>>>>>>>> assignor, and the broker-side assignor is picked by majority
> > > > > > > > > > > > > > >>>>>>>>> vote. The `group.consumer.assignors` specifies the list of
> > > > > > > > > > > > > > >>>>>>>>> assignors that are supported on the broker, and is configurable
> > > > > > > > > > > > > > >>>>>>>>> because the interface is pluggable. In this KIP, the task
> > > > > > > > > > > > > > >>>>>>>>> assignor is not voted on by members but configured on the
> > > > > > > > > > > > > > >>>>>>>>> broker-side. `group.streams.assignor` is used for this, and
> > > > > > > > > > > > > > >>>>>>>>> uses a specific name. If we'll make the task assignor pluggable
> > > > > > > > > > > > > > >>>>>>>>> on the broker-side, we'd introduce a separate config
> > > > > > > > > > > > > > >>>>>>>>> `group.streams.assignors`, which would indeed be a list of
> > > > > > > > > > > > > > >>>>>>>>> class names. I think there is no conflict here, the two
> > > > > > > > > > > > > > >>>>>>>>> configurations serve different purposes. The only gripe I'd
> > > > > > > > > > > > > > >>>>>>>>> have here is naming as `group.streams.assignor` and
> > > > > > > > > > > > > > >>>>>>>>> `group.streams.assignors` would be a bit similar, but I cannot
> > > > > > > > > > > > > > >>>>>>>>> really think of a better name for `group.streams.assignor`, so
> > > > > > > > > > > > > > >>>>>>>>> I'd probably rather introduce `group.streams.assignors` as
> > > > > > > > > > > > > > >>>>>>>>> `group.streams.possible_assignors` or something like that.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS9: I added explanations for the various record types. Apart
> > > > > > > > > > > > > > >>>>>>>>> from the new topology record, and the partition metadata (which
> > > > > > > > > > > > > > >>>>>>>>> is based on the topology and can only be created once we have a
> > > > > > > > > > > > > > >>>>>>>>> topology initialized) the lifecycle for the records is
> > > > > > > > > > > > > > >>>>>>>>> basically identical to that in KIP-848.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> AS10: In the consumer offset topic, the version in the key is
> > > > > > > > > > > > > > >>>>>>>>> used to differentiate different schema types with the same
> > > > > > > > > > > > > > >>>>>>>>> content. So the keys are not versioned, but the version field
> > > > > > > > > > > > > > >>>>>>>>> is "abused" as a type tag. This is the same in KIP-848, we
> > > > > > > > > > > > > > >>>>>>>>> followed it for consistency.
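
To illustrate the "version field as type tag" idea from AS10, here is a small Python sketch. It is not the actual record serialization: I am assuming a 2-byte big-endian prefix, and the tag numbers and key type names (`MemberKey`, `TopologyKey`) are made up for the example.

```python
import struct

# Made-up type tags; the real values are defined by the record schemas.
KEY_TYPES = {1: "MemberKey", 2: "TopologyKey"}

def encode_key(type_tag, group_id):
    """Prefix the key body with a big-endian int16 'version' used as a type tag."""
    return struct.pack(">h", type_tag) + group_id.encode("utf-8")

def decode_key(data):
    """Read the int16 prefix to pick the key type, then decode the body."""
    (type_tag,) = struct.unpack_from(">h", data)
    return KEY_TYPES[type_tag], data[2:].decode("utf-8")

# Same content ("my-group"), different record types, distinguished only by the tag.
print(decode_key(encode_key(1, "my-group")))
print(decode_key(encode_key(2, "my-group")))
```

Two keys with identical content stay distinct in the compacted topic because the leading tag differs, which is the property the thread relies on.
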
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> Cheers,
> > > > > > > > > > > > > >>>>>>>>> Lucas
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> > > > > > > > > > > > > > >>>>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> Hi Lucas and Bruno,
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> Thanks for the great KIP.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> I've read through the document and have some initial comments.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS1: I suppose that there is a new
> > > > > > > > > > > > > > >>>>>>>>>> o.a.k.common.GroupType.STREAMS enumeration constant. This is a
> > > > > > > > > > > > > > >>>>>>>>>> change to the public interface and should be called out.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how
> > > > > > > > > > > > > > >>>>>>>>>> does the user manipulate them, observe lag and so on? Will you
> > > > > > > > > > > > > > >>>>>>>>>> add `kafka-streams-groups.sh` or extend
> > > > > > > > > > > > > > >>>>>>>>>> `kafka-streams-application-reset.sh`? Of course, KIP-1043 can
> > > > > > > > > > > > > > >>>>>>>>>> easily be extended to support streams groups, but that only
> > > > > > > > > > > > > > >>>>>>>>>> lets the user see the groups, not manipulate them.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED
> > > > > > > > > > > > > > >>>>>>>>>> would be better expressed as INITIALIZING.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest,
> > > > > > > > > > > > > > >>>>>>>>>> Topology[].SourceTopicRegex should be nullable.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission
> > > > > > > > > > > > > > >>>>>>>>>> on the cluster resource? I imagine that this is one of the
> > > > > > > > > > > > > > >>>>>>>>>> ways that the request might be granted permission to create
> > > > > > > > > > > > > > >>>>>>>>>> the StateChangelogTopics and RepartitionSourceTopics, but if
> > > > > > > > > > > > > > >>>>>>>>>> it is granted permission to create those topics with specific
> > > > > > > > > > > > > > >>>>>>>>>> ACLs, would CREATE on the cluster resource still be required?
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with
> > > > > > > > > > > > > > >>>>>>>>>> TOPIC_AUTHORIZATION_FAILED and (subject to AS5)
> > > > > > > > > > > > > > >>>>>>>>>> CLUSTER_AUTHORIZATION_FAILED.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
> > > > > > > > > > > > > > >>>>>>>>>> StreamsGroupHeartbeatRequest and a few others, but in all
> > > > > > > > > > > > > > >>>>>>>>>> other cases the fields which are ids are spelled Id.
> > > > > > > > > > > > > > >>>>>>>>>> I suggest TopologyId.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> Also, "interal" is probably meant to be "interval".
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors`
> > > > > > > > > > > > > > >>>>>>>>>> configuration is a list of class names. The assignors do have
> > > > > > > > > > > > > > >>>>>>>>>> names too, but the configuration which enables them is in
> > > > > > > > > > > > > > >>>>>>>>>> terms of class names. I wonder whether the broker’s
> > > > > > > > > > > > > > >>>>>>>>>> group.streams.assignor could actually be
> > > > > > > > > > > > > > >>>>>>>>>> `group.streams.assignors` and specified as a list of the
> > > > > > > > > > > > > > >>>>>>>>>> class names of the supplied assignors. I know you're not
> > > > > > > > > > > > > > >>>>>>>>>> supporting other assignors yet, but when you do, I expect you
> > > > > > > > > > > > > > >>>>>>>>>> would prefer to have used class names from the start.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> The use of assignor names in the other places looks good to me.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description
> > > > > > > > > > > > > > >>>>>>>>>> about the purpose and lifecycle of the 9 record types you've
> > > > > > > > > > > > > > >>>>>>>>>> introduced on the __consumer_offsets topic. I did a cursory
> > > > > > > > > > > > > > >>>>>>>>>> review but without really understanding what's written when,
> > > > > > > > > > > > > > >>>>>>>>>> I can't do a thorough job of reviewing.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>> AS10: In the definitions of the record keys, such as
> > > > > > > > > > > > > > >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the
> > > > > > > > > > > > > > >>>>>>>>>> fields must match the versions of the types.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> Thanks,
> > > > > > > > > > > > > >>>>>>>>>> Andrew
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy
> > > > > > > > > > > > > > >>>>>>>>>>> <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> Hi all,
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams
> > > > > > > > > > > > > > >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the
> > > > > > > > > > > > > > >>>>>>>>>>> principles laid down by KIP-848 to Kafka Streams, to make
> > > > > > > > > > > > > > >>>>>>>>>>> rebalances more reliable and scalable, and make Kafka Streams
> > > > > > > > > > > > > > >>>>>>>>>>> overall easier to deploy and operate.
> > > > > > > > > > > > > > >>>>>>>>>>> The KIP proposed moving the assignment logic to the broker,
> > > > > > > > > > > > > > >>>>>>>>>>> and introducing a dedicated group type and dedicated RPCs for
> > > > > > > > > > > > > > >>>>>>>>>>> Kafka Streams.
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> The KIP is here:
> > > > > > > > > > > > > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna.
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > > >>>>>>>>>>> Please take a look and let us know what you think.
> > > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > > > > > > >>>>>>>>>>> Lucas
