Re versions: fair enough, I think it's okay to keep it as strings. Just to clarify, the concern I had is that if we do want to augment it in the future, it will be harder to change a `string` field into a `struct` :P
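Just to make the string-vs-struct point concrete, here is a rough sketch; the record names below are made up purely for illustration, and the real request is defined as an RPC schema rather than Java classes:

```java
// Illustrative sketch only: hypothetical stand-ins for the RPC schema,
// not actual KIP-848 classes.

// Roughly what the current proposal carries: a single assignor name string.
record ConsumerGroupHeartbeatRequestToday(String serverAssignor) { }

// A structured alternative carrying the assignor name plus the version range
// the client works best with, as floated earlier in this thread.
record ServerAssignorSpec(String name, short minimumVersion, short maximumVersion) { }

record ConsumerGroupHeartbeatRequestStructured(ServerAssignorSpec serverAssignor) { }
```

The point is just that once the field has shipped as a plain string, the structured information could later only be added as extra top-level fields.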
Re onAssignment: actually, even with the current proposal, since we already give the final encoded metadata for the generation upon the first `onAssignment` call, Streams would infer all the standby tasks that need to be migrated in / out, so for Streams' purposes we'd need to handle such decoupling anyway. And it's actually the opposite: if the first `onAssignment` does not include the yet-to-be-assigned partitions, then Streams would not know whether a migrating-out standby will really be promoted to active later or should be removed completely, until the next `onAssignment` is called. And that's where I mentioned "we'd need to figure out which onAssignment is the final call" etc. If we have all the partitions in the `onAssignment`, then we can infer such actions and decide whether we should close a standby task immediately or just recycle it and wait for the active task to be assigned eventually. On the other hand, if we call onAssignment incrementally, similar to onPartitionsAssigned, and include both assigned and yet-to-be-assigned partitions, then people who implement both interfaces can just ignore `onPartitionsAssigned`, since they already have all the knowledge they need from onAssignment. And that's what I'm trying to avoid, by keeping the functionality of the two more clearly separated.

Guozhang

On Mon, Sep 26, 2022 at 11:30 AM David Jacot <dja...@confluent.io.invalid> wrote:
> Regarding the version, I would rather add this later when we have a > clear use case for it. It seems to me that we are speculating here. I > understand your point but I am not fully convinced by the solution at > the moment. Would you agree with this? > > Regarding the onAssignment, I was thinking about the case where a task > is promoted from standby to active. In this case, having onAssignment > with the combined partitions could make it difficult, no? I am > thinking that the assignor will have to remove the standby based on > the metadata but it won't know what to do after that. If the partition > is assigned directly, it can convert it to an active task. On the > other hand, if the partition is not available yet, it would have to > keep it as a standby until the partition is really assigned. It seems > to be that the assignor won't have the information to make this > decision, no? In this case, decouple the "why" from the "when" seems > to make things harder. I am not so familiar with Streams though so my > intuition could be wrong here. > > David > > On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang <wangg...@gmail.com> wrote: > > > > Regarding the version, what I was thinking is that in the HB request, for > > "serverAssignor" field, instead of just having it as a single string > field, > > maybe we could consider also making it a structure that includes: name, > > minimumVersion, maximumVersion. Where the minimumVersion/maximumVersion > > means the versions of the server assignor that the client work best with. > > That being said, I agree with you that such information may also be > > inferred elsewhere e.g. by looking into the "rackId" field, and see if it > > contains a hyphen or not etc. All I was wondering is that, if such > version > > information would be useful for the server assignors to determine its > > actual assignment logic.
I do not feel very strong about this one though > > --- even if we do not add it now, we can potentially add later, it's just > > that changing a single string field to a structure would be hard for > > compatibility and we'd then probably have to add top-level fields. > > > > Regarding the `onAssignment` logic, again my train of thoughts is that, > if > > users want to know exactly when a partition is assigned / revoked, they > > would be leveraging on the rebalance callbacks, as that's what people > > should rely on to determine "when" partitions are assigned. The > > `onAssignment` should be used for getting "why" such partition assignment > > decision is made, and hence returning `combined partitions` would be > okay. > > Streams e.g. implement both rebalance callbacks and the assignors, and it > > gets the "when" from the former (and create/close active tasks > accordingly) > > and the "why" from the latter (and update its global info bookkeeping as > > well as standby maintenance accordingly). Most users would be just > > interested in the rebalance callback, and not implement their own > assignor > > at all if they do not care about "why" as they trust the server assignors > > would take good care of those, and only about "when". So if we did build > > such two types of APIs from scratch, I'd indeed feel that not providing > the > > partitions but only the metadata for `onAssignment` may be less confusing > > and push users to separate the usage of these two more clearly, but since > > we already introduced partitions in `onAssignment` for compatibility I'm > > less keen on removing them. > > > > > > Guozhang > > > > On Mon, Sep 26, 2022 at 6:55 AM David Jacot <dja...@confluent.io.invalid > > > > wrote: > > > > > Hi Guozhang, > > > > > > Regarding the version, my understanding is that the version would be > > > either the client software version or the request version, is this > > > correct? If so, we could indeed pass this information down to the > > > assignor via the interface. One way would be to pass a "server > > > context" to the assignor and that context would include that > > > information (and perhaps more). Is this what you are looking for? > > > > > > Regarding the onAssignment, I think that I understand your point. I > > > suppose that the assignor could also be clever and keep track of the > > > last metadata to decide whether it has to do something or not. One > > > question that is still not clear to me is whether the assignor needs > > > to know all the assigned partitions upfront regardless of whether they > > > are already revoked or not. Do you think that we need this as well? > > > > > > From an API perspective, we could have something like > > > onAssignment(Metadata(version, reason, metadata, assigned partitions, > > > pending partitions)). Where the assigned partitions are the partitions > > > ready to be used and the pending partitions are the one assigned to > > > the member but not revoked yet. I find it a bit weird that this method > > > would be called only once because the assignor would not know when the > > > pending partitions changes. That does not look like a clean API. An > > > alternative would be to use onAssignment(Metadata(version, reason, > > > metadata, combined partitions)) but this seems error prone because it > > > is not clear whether a partition is usable or not. Or do you think > > > that we should not provide the partitions but only the metadata? 
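[A rough Java sketch of the two `onAssignment` shapes described just above; the interface, record names, and field types here are hypothetical illustrations, not the KIP's actual API.]

```java
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the two onAssignment shapes under discussion.
interface OnAssignmentShapes {

    // Option 1: keep "usable now" and "assigned but still pending revocation
    // by other members" as separate sets.
    record SplitMetadata(int version,
                         byte reason,
                         ByteBuffer metadata,
                         Set<TopicPartition> assignedPartitions,
                         Set<TopicPartition> pendingPartitions) { }

    // Option 2: a single combined set, simpler but ambiguous about whether a
    // given partition is usable yet.
    record CombinedMetadata(int version,
                            byte reason,
                            ByteBuffer metadata,
                            Set<TopicPartition> partitions) { }

    void onAssignment(SplitMetadata metadata);
    // void onAssignment(CombinedMetadata metadata); // the alternative shape
}
```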
> > > > > > Best, > > > David > > > > > > On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > > > > Hello David, > > > > > > > > On Fri, Sep 23, 2022 at 2:00 AM David Jacot > <dja...@confluent.io.invalid > > > > > > > > wrote: > > > > > > > > > Hey, > > > > > > > > > > > Just to clarify I was asking about the `version` of the assignor > > > (i.e. up > > > > > to what version that the client would support), and I do agree we > > > would not > > > > > need metadata. What I have in mind is that, for some specific > built-in > > > > > broker-assignors, e.g. rack-aware assignors, if it's possible that > in a > > > > > newer version we would have a hierarchical rack ID string format, > like > > > > > "tier1-tier2" etc, but if some client has not upgraded their rack > ID > > > > > would still be in old format. In this case, the broker then needs > to > > > choose > > > > > the old versioned assignor. I'm probably making something up here > for > > > rack > > > > > aware assignors, but I'm wondering if in general such an > > > "auto-downgrade" > > > > > behavior would be needed still for broker-side assignor, and if yes > > > would > > > > > "version" still be useful. > > > > > > > > > > Got it. That's an interesting thought. I think that the issue is > that > > > > > the client will never tell you which version of the server-side > > > > > assignor should be used. Do you think that the coordinator would > > > > > downgrade the version if the assignment fails with a higher > version? I > > > > > tend to believe that this should be handled within the assignor > > > > > itself. In the example that you mentioned, the assignor would have > to > > > > > handle all the cases. I am not really convinced that we need this > at > > > > > the moment. > > > > > > > > > > The version from the client side would not be indicating the broker > > > which > > > > version to use, but rather which version the client would "work best > > > with". > > > > Such a "version" field would not be settible by the users, since they > > > will > > > > be hard-codedly bumped when the Kafka byte code version bumped. > > > > Back to the rack aware assignor example, if the older versioned > client > > > does > > > > not have a hierarchical rack ID, however if the assignment returned > to > > > them > > > > is assuming a hierarchical rack structure, it may not reflect the > best > > > > workload balance among those new and old versioned clients. That > means, > > > > when receiving the members subscriptions at the server side, if the > > > > versions from all these members are different, the broker's assignor > may > > > > need to consider using the lower version logic to do the assignment. > So > > > yes > > > > the assignor would indeed have to handle all such cases, but it > needs to > > > do > > > > so such that if there are clients who would not work with certain new > > > > logic, it would then handle such cases automatically by e.g. still > using > > > an > > > > older versioned logic. > > > > > > > > > > > > > > > > > > Okay, my understanding is that the calling ordering of these > > > callbacks > > > > > would be like the following: > > > > > > > > > > Yes, your examples look right. 
> > > > > > > > > > > I'm wondering if we would still call onAssignment just once, that > > > encodes > > > > > all the assignment for this rebalance, including all the partitions > > > that > > > > > should be assigned to the member but not yet assigned since they > have > > > not > > > > > been revoked by others. In that case the call ordering would be: > > > > > > > > > > Interesting. Is there a case for Streams where having the full > > > > > assignment is beneficial? For instance, I can think of the > following > > > > > case. When a standby task is promoted to an active task, the > metadata > > > > > would not contain the standby task anymore and the assignment may > not > > > > > have the partition yet. In this case, Streams would stop the > standby > > > > > tasks but not have the active task yet if my understanding of > Streams > > > > > is correct. So knowing the full assignment could be helpful here. > > > > > > > > > > If we want to do this, we could structure the assignment given to > the > > > > > member as follow: version, error, metadata, assigned partitions, > > > > > pending partitions, where the pending partitions would be the one > > > > > assigned to this member but not yet available. What do you think? > > > > > > > > > > Regarding onAssignment being called only once, I am not sure to > fully > > > > > grasp the benefit yet. Does the assignor really care about this? In > > > > > the end, the epoch does not really matter for the assignor because > it > > > > > has to converge its state to the desired state anyway. > > > > > > > > > > Here's my rationale (maybe rephased a bit :P ): the implementers of > > > > rebalance listener and assignor are two groups of people, and most > users > > > > fall into the former group, while only very few people fall into the > > > later > > > > group. For rebalance listener implementers, they just want to know > when a > > > > partition is actually revoked or assigned to the consumer and reacts > to > > > it, > > > > for this purpose, `onPartitionsRevoked` and `onPartitionsAssigned` > would > > > be > > > > triggered interleavingly upon `poll` calls across rebalances. The > usual > > > > logic for such rebalance listeners are metrics reporting, committing > > > > offsets (if they do not use Kafka for that), etc. They would not care > > > which > > > > calls are from which rebalances --- in the past with eager > rebalance, it > > > > maybe that each rebalance is associated with exactly a > > > > `onPartitionsRevoked` first and then a `onPartitionsAssigned`, but it > > > would > > > > no longer the cases now. > > > > > > > > The implementers of the assignor though, would care about "how the > > > > assignment was made", that includes from which rebalance a certain > > > > revoke/assign decision was made, based on what metadata such > assignment > > > is > > > > made, etc. And that's the whole point of the `onAssignment` function > > > since > > > > otherwise they can just rely on the listeners. They usually > > > implementation > > > > logic of this callback is to e.g. bookkeep the assignment decision > > > driving > > > > factors a.k.a. the metadata, global information that needs to be > > > propagated > > > > to all members, etc. 
Take Streams as an example, the active > processing > > > > tasks go along with the assigned partitions, and we can always just > > > > incrementally create / close them upon each rebalance listener > triggers, > > > > when certain partitions are revoked or assigned together; standby > tasks > > > > however are encoded with the metadata, and we can only know which > standby > > > > tasks should we get / drop based on the `onAssignment` function, and > in > > > > fact the creation of such tasks as a result of the metadata > bookkeeping > > > > does not need to wait until all the partitions that are yet-assigned > have > > > > been completely assigned to the member. Such information may not > always > > > be > > > > updatable in an incremental manner as the partitions-revoked / > > > > partitions-assigned. In such a case, it's better to just trigger this > > > > function "once per decision made" i.e. once per rebalance generation. > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > Best, > > > > > David > > > > > > > > > > On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang <wangg...@gmail.com> > > > wrote: > > > > > > > > > > > > Hi David, thanks for all the detailed explanations. I think they > all > > > make > > > > > > sense. Just want to have a couple follow-ups here: > > > > > > > > > > > > > I don't really see the benefits here because server side > assignors > > > > > > don't have metadata at all. They only assign topic-partitions. > They > > > > > > are not supposed to generate metadata nor to receive metadata > from > > > the > > > > > > members. > > > > > > > > > > > > Just to clarify I was asking about the `version` of the assignor > > > (i.e. up > > > > > > to what version that the client would support), and I do agree we > > > would > > > > > not > > > > > > need metadata. What I have in mind is that, for some specific > > > built-in > > > > > > broker-assignors, e.g. rack-aware assignors, if it's possible > that > > > in a > > > > > > newer version we would have a hierarchical rack ID string format, > > > like > > > > > > "tier1-tier2" etc, but if some client has not upgraded their > rack ID > > > > > > would still be in old format. In this case, the broker then > needs to > > > > > choose > > > > > > the old versioned assignor. I'm probably making something up > here for > > > > > rack > > > > > > aware assignors, but I'm wondering if in general such an > > > "auto-downgrade" > > > > > > behavior would be needed still for broker-side assignor, and if > yes > > > would > > > > > > "version" still be useful. > > > > > > > > > > > > > Yeah, that's right. Within a rebalance, `onAssignment` is > called > > > once > > > > > > when the member transitions to a new epoch. This one contains the > > > full > > > > > > metadata provided by the client side assignor. Then, > `onAssignment` > > > > > > can be called max N times where N is the number of partitions > pending > > > > > > revocation by other members. Let me try to clarify this in the > KIP. > > > > > > > > > > > > Okay, my understanding is that the calling ordering of these > > > callbacks > > > > > > would be like the following: > > > > > > > > > > > > ---------------------------------------- > > > > > > onPartitionsRevoked(); // just once, since we do not really > need > > > > > > to revoke incrementally. 
> > > > > > > > > > > > onAssignment(); // the first call, with epoch incremented > > > > > > onPartitionsAssigned(); // paired with the onAssignment > > > > > > > > > > > > onAssignment(); // the first onAssignment would > bump up > > > the > > > > > > epoch, and the metadata reflected. > > > > > > onPartitionsAssigned(); // each time we get an additional > > > assignment, > > > > > we > > > > > > call onAssignment and then paired with an onPartitionsAssigned > > > > > > ... > > > > > > onAssignment(); > > > > > > onPartitionsAssigned(); // on each of the onAssignment calls, > the > > > > > encoded > > > > > > metadata would not change, only the incrementally added > partitions be > > > > > > reflected > > > > > > > > > > > > Is that the case? > > > > > > > > > > > > I'm wondering if we would still call onAssignment just once, that > > > encodes > > > > > > all the assignment for this rebalance, including all the > partitions > > > that > > > > > > should be assigned to the member but not yet assigned since they > > > have not > > > > > > been revoked by others. In that case the call ordering would be: > > > > > > > > > > > > ---------------------------------------- > > > > > > onPartitionsRevoked(); // just once > > > > > > onAssignment(); // just once, with epoch incremented, and > metadata > > > > > > encoded changed, the "assignment" field also reflect the final > target > > > > > > assignment > > > > > > onPartitionsAssigned(); // multiple times, which represent > > > > > incrementally > > > > > > added partitions > > > > > > ... > > > > > > onPartitionsAssigned(); > > > > > > > > > > > > The motivation from this is that, most users would only > implement the > > > > > > rebalance callback listeners and hence we'd definitely need to > make > > > sure > > > > > > the semantics of that does not change much, and the time > > > > > > `onPartitionsAssigned` indicate the time when the partitions are > > > actually > > > > > > assigned to it; while for assignors, the `onAssignment` is used > to > > > > > indicate > > > > > > what decision is made regarding for this member, i.e. when the > > > partitions > > > > > > are decided to be given to it, but not necessarily meant that it > has > > > been > > > > > > given, since that time should be determined by the time of > > > > > > `onPartitionsAssigned`. The benefits though, would be that > assignor > > > > > > implementers would not need to reason which `onAssignment` would > be > > > the > > > > > > last one for this epoch. > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Thu, Sep 22, 2022 at 2:20 AM David Jacot > > > <dja...@confluent.io.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > > > > 1) The client-side "PartitionAssignor#Assignment" has an > Error > > > byte > > > > > > > field, > > > > > > > > while the broker-side "PartitionAssignor#Assignment" does > not. > > > And > > > > > the > > > > > > > > rationale seems to be assuming that we should always be able > to > > > do > > > > > the > > > > > > > > assignment at the broker-side assignor without errors. > > > Personally I > > > > > think > > > > > > > > it's still potentially beneficial to add the Error field > even for > > > > > > > > broker-side assignors, e.g. for some edge cases where some > > > subscribed > > > > > > > > topics are not recognized with the current broker's metadata. > > > What > > > > > do you > > > > > > > > think? 
> > > > > > > > > > > > > > Yeah, that seems reasonable. However, I wonder if it would be > > > better > > > > > > > to use an exception on the server side. This is what we > usually do > > > for > > > > > > > server side plugins. On the client side, we use a field > because the > > > > > > > errors are not defined in advance. > > > > > > > > > > > > > > Your comment also makes me think about what we should do when > the > > > > > > > server side assignor fails. I suppose that we have to keep the > > > current > > > > > > > assignment until a new event occurs. For instance, in your > example, > > > > > > > the coordinator would have to trigger a rebalance when > unrecognized > > > > > > > topics are available. This would be part of the metadata > > > monitoring. > > > > > > > > > > > > > > > 2) The client-side "GroupMember" has three additional fields > > > > > > > > reason/version/metadata compared with the broker-side > > > GroupMember. I > > > > > > > agree > > > > > > > > that broker-side assignor would not need reason/metadata > since > > > they > > > > > are > > > > > > > > blackbox strings/bytes to the assignor, but what about > version? > > > E.g. > > > > > is > > > > > > > it > > > > > > > > possible that we evolve our broker-side built-in assignor but > > > the old > > > > > > > > versioned clients would not be able to work with the new > > > version, in > > > > > > > which > > > > > > > > case we need to let the broker being aware of this and > upgrade > > > its > > > > > > > behavior > > > > > > > > to cooperate with the clients? > > > > > > > > > > > > > > I don't really see the benefits here because server side > assignors > > > > > > > don't have metadata at all. They only assign topic-partitions. > They > > > > > > > are not supposed to generate metadata nor to receive metadata > from > > > the > > > > > > > members. > > > > > > > > > > > > > > > 3) Also related to 2) above, for the client-side > "GroupMember", > > > > > instead > > > > > > > of > > > > > > > > including these three fields, what about just adding the > > > "Metadata" > > > > > field > > > > > > > > class which has these three fields? Also, there are two > > > "Metadata" > > > > > > > > currently in the APIs, the first is a class that encodes > > > > > > > > reason/version/metadata, and the second is just the encoded > > > metadata > > > > > > > bytes. > > > > > > > > I'm wondering what about just naming the first as > memberMetadata, > > > > > which > > > > > > > > then has a bytebuffer field Metadata, or instead naming the > > > second > > > > > > > > bytebuffer field as metadataBytes? > > > > > > > > > > > > > > That's a good point. Let me try to rationalize this interface > > > based on > > > > > > > your suggestions. > > > > > > > > > > > > > > Best, > > > > > > > David > > > > > > > > > > > > > > On Tue, Sep 13, 2022 at 9:21 PM Guozhang Wang < > wangg...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > > Hello David, > > > > > > > > > > > > > > > > Just had a few more nit questions about the public APIs: > > > > > > > > > > > > > > > > 1) The client-side "PartitionAssignor#Assignment" has an > Error > > > byte > > > > > > > field, > > > > > > > > while the broker-side "PartitionAssignor#Assignment" does > not. > > > And > > > > > the > > > > > > > > rationale seems to be assuming that we should always be able > to > > > do > > > > > the > > > > > > > > assignment at the broker-side assignor without errors. 
> > > Personally I > > > > > think > > > > > > > > it's still potentially beneficial to add the Error field > even for > > > > > > > > broker-side assignors, e.g. for some edge cases where some > > > subscribed > > > > > > > > topics are not recognized with the current broker's metadata. > > > What > > > > > do you > > > > > > > > think? > > > > > > > > > > > > > > > > 2) The client-side "GroupMember" has three additional fields > > > > > > > > reason/version/metadata compared with the broker-side > > > GroupMember. I > > > > > > > agree > > > > > > > > that broker-side assignor would not need reason/metadata > since > > > they > > > > > are > > > > > > > > blackbox strings/bytes to the assignor, but what about > version? > > > E.g. > > > > > is > > > > > > > it > > > > > > > > possible that we evolve our broker-side built-in assignor but > > > the old > > > > > > > > versioned clients would not be able to work with the new > > > version, in > > > > > > > which > > > > > > > > case we need to let the broker being aware of this and > upgrade > > > its > > > > > > > behavior > > > > > > > > to cooperate with the clients? > > > > > > > > > > > > > > > > 3) Also related to 2) above, for the client-side > "GroupMember", > > > > > instead > > > > > > > of > > > > > > > > including these three fields, what about just adding the > > > "Metadata" > > > > > field > > > > > > > > class which has these three fields? Also, there are two > > > "Metadata" > > > > > > > > currently in the APIs, the first is a class that encodes > > > > > > > > reason/version/metadata, and the second is just the encoded > > > metadata > > > > > > > bytes. > > > > > > > > I'm wondering what about just naming the first as > memberMetadata, > > > > > which > > > > > > > > then has a bytebuffer field Metadata, or instead naming the > > > second > > > > > > > > bytebuffer field as metadataBytes? > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 13, 2022 at 12:08 PM Guozhang Wang < > > > wangg...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hello David, > > > > > > > > > > > > > > > > > > Thanks for bringing this question up. I think the main > > > benefits as > > > > > you > > > > > > > > > listed is 2) above if it stays; just to clarify, we would > only > > > be > > > > > able > > > > > > > to > > > > > > > > > save one round trip if the rebalance is still triggered by > the > > > > > broker; > > > > > > > if > > > > > > > > > the rebalance is triggered by the client then the > > > num.round.trips > > > > > are > > > > > > > the > > > > > > > > > same: > > > > > > > > > > > > > > > > > > 1) With GroupPrepareAssignment: > > > > > > > > > > > > > > > > > > T0: client decides to do a new assignment, suppose it has > > > already > > > > > sent > > > > > > > a > > > > > > > > > HB and hence has to wait for it to return first since only > one > > > > > request > > > > > > > / > > > > > > > > > response can be inflight with the coordinator's socket. > > > > > > > > > T1: client receives the HB response, and then sends the > > > > > > > > > GroupPrepareAssignment request. > > > > > > > > > T2: the GroupPrepareAssignment response is returned. > > > > > > > > > T3: it calculates the new assignment, and sends a > > > > > > > GroupInstallAssignment > > > > > > > > > request. > > > > > > > > > > > > > > > > > > In total, two round trips. 
> > > > > > > > > > > > > > > > > > 2) Without GroupPrepareAssignment: > > > > > > > > > > > > > > > > > > T0: client decides to do a new assignment, suppose it has > > > already > > > > > sent > > > > > > > a > > > > > > > > > HB and hence has to wait for it to return first since only > one > > > > > request > > > > > > > / > > > > > > > > > response can be inflight with the coordinator's socket. > > > > > > > > > T1: client receives the HB response, and then sends the > new HB > > > > > request > > > > > > > > > with the flag indicating a new rebalance needed.. > > > > > > > > > T2: the HB response with the optional member metadata map > is > > > > > returned. > > > > > > > > > T3: it calculates the new assignment, and sends a > > > > > > > GroupInstallAssignment > > > > > > > > > request. > > > > > > > > > > > > > > > > > > In total, two round trips as well. > > > > > > > > > > > > > > > > > > ----------------------------- > > > > > > > > > > > > > > > > > > So to complete the full picture here, we'd need to modify > both > > > HB > > > > > > > request > > > > > > > > > and response so that the client can also indicate a new > > > rebalance > > > > > via > > > > > > > the > > > > > > > > > HB request as well, right? > > > > > > > > > > > > > > > > > > Assuming all above is true, I think it's okay to merge the > > > > > > > > > GroupPrepareAssignment into HB given that we can make the > > > > > additional > > > > > > > fields > > > > > > > > > encoding the full member (subscription) metadata and topic > > > > > metadata as > > > > > > > > > optional fields. > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Sep 12, 2022 at 5:22 AM David Jacot > > > > > > > <dja...@confluent.io.invalid> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > >> Hi all, > > > > > > > > >> > > > > > > > > >> During an offline conversation, someone asked why we need > the > > > > > > > > >> ConsumerGroupPrepareAssignment API and suggested that we > could > > > > > instead > > > > > > > > >> provide the group state in the heartbeat response. This > has a > > > few > > > > > > > > >> advantages: 1) it does not require using a special error > code > > > to > > > > > > > > >> signal that a new assignment is required as the signal > would > > > be > > > > > the > > > > > > > > >> provided group state; 2) it removes one round trip when a > > > client > > > > > side > > > > > > > > >> assignor is used. The downside is that it makes the > heartbeat > > > > > > > > >> response's definition quite large. I recall that I went > with > > > the > > > > > > > > >> current approach due to this. > > > > > > > > >> > > > > > > > > >> Providing the group state in the heartbeat response is > > > appealing. > > > > > What > > > > > > > > >> do you guys think? > > > > > > > > >> > > > > > > > > >> Best, > > > > > > > > >> David > > > > > > > > >> > > > > > > > > >> On Mon, Sep 12, 2022 at 2:17 PM David Jacot < > > > dja...@confluent.io> > > > > > > > wrote: > > > > > > > > >> > > > > > > > > > >> > Hi Guozhang, > > > > > > > > >> > > > > > > > > > >> > 1. I have added a reference to the relevant chapter > instead > > > of > > > > > > > > >> > repeating the whole thing. Does that work for you? > > > > > > > > >> > > > > > > > > > >> > 2. 
The "Rebalance Triggers" section you are referring > to is > > > > > about > > > > > > > when > > > > > > > > >> > a rebalance should be triggered for the non-upgraded > members > > > > > using > > > > > > > the > > > > > > > > >> > old protocol. The section mentions that a rebalance > must be > > > > > > > triggered > > > > > > > > >> > when a new assignment is installed. This implies that > the > > > group > > > > > > > epoch > > > > > > > > >> > was updated either by a native member or a non-upgraded > > > member. > > > > > For > > > > > > > > >> > the latter, the JoinGroup request would be the trigger. > I > > > have > > > > > > > added a > > > > > > > > >> > reference to the relevant chapter in the "JoinGroup > > > Handling" > > > > > > > section > > > > > > > > >> > as well. Does that make sense? > > > > > > > > >> > > > > > > > > > >> > Thanks, > > > > > > > > >> > David > > > > > > > > >> > > > > > > > > > >> > On Fri, Sep 9, 2022 at 10:35 PM Guozhang Wang < > > > > > wangg...@gmail.com> > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > > > >> > > Hello David, > > > > > > > > >> > > > > > > > > > > >> > > Alright I think that's sufficient. Just to make that > > > clear in > > > > > the > > > > > > > doc, > > > > > > > > >> > > could we update: > > > > > > > > >> > > > > > > > > > > >> > > 1) the heartbeat request handling section, stating > when > > > > > > > coordinator > > > > > > > > >> will > > > > > > > > >> > > trigger rebalance based on the HB's member metadata / > > > reason? > > > > > > > > >> > > 2) the "Rebalance Triggers" section to include what we > > > > > described > > > > > > > in > > > > > > > > >> "Group > > > > > > > > >> > > Epoch - Trigger a rebalance" section as well? > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > Guozhang > > > > > > > > >> > > > > > > > > > > >> > > On Fri, Sep 9, 2022 at 1:28 AM David Jacot > > > > > > > > >> <dja...@confluent.io.invalid> > > > > > > > > >> > > wrote: > > > > > > > > >> > > > > > > > > > > >> > > > Hi Guozhang, > > > > > > > > >> > > > > > > > > > > > >> > > > I thought that the assignor will always be consulted > > > when > > > > > the > > > > > > > next > > > > > > > > >> > > > heartbeat request is constructed. In other words, > > > > > > > > >> > > > `PartitionAssignor#metadata` will be called for > every > > > > > heartbeat. > > > > > > > > >> This > > > > > > > > >> > > > gives the opportunity for the assignor to enforce a > > > > > rebalance by > > > > > > > > >> > > > setting the reason to a non-zero value or by > changing > > > the > > > > > > > bytes. Do > > > > > > > > >> > > > you think that this is not sufficient? Are you > > > concerned by > > > > > the > > > > > > > > >> delay? > > > > > > > > >> > > > > > > > > > > > >> > > > Best, > > > > > > > > >> > > > David > > > > > > > > >> > > > > > > > > > > > >> > > > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang < > > > > > > > wangg...@gmail.com> > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > > > > > >> > > > > Hello David, > > > > > > > > >> > > > > > > > > > > > > >> > > > > One of Jun's comments make me thinking: > > > > > > > > >> > > > > > > > > > > > > >> > > > > ``` > > > > > > > > >> > > > > In this case, a new assignment is triggered by the > > > client > > > > > side > > > > > > > > >> > > > > assignor. 
When constructing the HB, the consumer > will > > > > > always > > > > > > > > >> consult > > > > > > > > >> > > > > the client side assignor and propagate the > > > information to > > > > > the > > > > > > > > >> group > > > > > > > > >> > > > > coordinator. In other words, we don't expect > users to > > > call > > > > > > > > >> > > > > Consumer#enforceRebalance anymore. > > > > > > > > >> > > > > ``` > > > > > > > > >> > > > > > > > > > > > > >> > > > > As I looked at the current PartitionAssignor's > > > interface, > > > > > we > > > > > > > > >> actually do > > > > > > > > >> > > > > not have a way yet to instruct how to construct > the > > > next > > > > > HB > > > > > > > > >> request, e.g. > > > > > > > > >> > > > > when the assignor wants to enforce a new rebalance > > > with a > > > > > new > > > > > > > > >> assignment, > > > > > > > > >> > > > > we'd need some customizable APIs inside the > > > > > PartitionAssignor > > > > > > > to > > > > > > > > >> indicate > > > > > > > > >> > > > > the next HB telling broker about so. WDYT about > adding > > > > > such an > > > > > > > > >> API on the > > > > > > > > >> > > > > PartitionAssignor? > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > Guozhang > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot > > > > > > > > >> <dja...@confluent.io.invalid> > > > > > > > > >> > > > > wrote: > > > > > > > > >> > > > > > > > > > > > > >> > > > > > Hi Jun, > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > I have updated the KIP to include your > feedback. I > > > have > > > > > also > > > > > > > > >> tried to > > > > > > > > >> > > > > > clarify the parts which were not cleared. > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > Best, > > > > > > > > >> > > > > > David > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot < > > > > > > > dja...@confluent.io > > > > > > > > >> > > > > > > > > > >> > > > wrote: > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Hi Jun, > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Thanks for your feedback. Let me start by > > > answering > > > > > your > > > > > > > > >> questions > > > > > > > > >> > > > > > > inline and I will update the KIP next week. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > Thanks for the KIP. Overall, the main > benefits > > > of > > > > > the > > > > > > > KIP > > > > > > > > >> seem to > > > > > > > > >> > > > be > > > > > > > > >> > > > > > fewer > > > > > > > > >> > > > > > > > RPCs during rebalance and more efficient > > > support of > > > > > > > > >> wildcard. A few > > > > > > > > >> > > > > > > > comments below. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > I would also add that the KIP removes the > global > > > sync > > > > > > > barrier > > > > > > > > >> in the > > > > > > > > >> > > > > > > protocol which is essential to improve group > > > > > stability and > > > > > > > > >> > > > > > > scalability, and the KIP also simplifies the > > > client by > > > > > > > moving > > > > > > > > >> most of > > > > > > > > >> > > > > > > the logic to the server side. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest > > > > > > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. 
Do we > plan > > > to > > > > > > > support > > > > > > > > >> rolling > > > > > > > > >> > > > > > changing > > > > > > > > >> > > > > > > > of the partition assignor in the consumers? > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Definitely. The group coordinator will use the > > > > > assignor > > > > > > > used > > > > > > > > >> by a > > > > > > > > >> > > > > > > majority of the members. This allows the > group to > > > move > > > > > > > from > > > > > > > > >> one > > > > > > > > >> > > > > > > assignor to another by a roll. This is > explained > > > in > > > > > the > > > > > > > > >> Assignor > > > > > > > > >> > > > > > > Selection chapter. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 30.2 For each field, could you explain > whether > > > it's > > > > > > > > >> required in > > > > > > > > >> > > > every > > > > > > > > >> > > > > > > > request or the scenarios when it needs to be > > > > > filled? For > > > > > > > > >> example, > > > > > > > > >> > > > it's > > > > > > > > >> > > > > > not > > > > > > > > >> > > > > > > > clear to me when TopicPartitions needs to be > > > filled. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > The client is expected to set those fields in > > > case of > > > > > a > > > > > > > > >> connection > > > > > > > > >> > > > > > > issue (e.g. timeout) or when the fields have > > > changed > > > > > since > > > > > > > > >> the last > > > > > > > > >> > > > > > > HB. The server populates those fields as long > as > > > the > > > > > > > member > > > > > > > > >> is not > > > > > > > > >> > > > > > > fully reconciled - the member should > acknowledge > > > that > > > > > it > > > > > > > has > > > > > > > > >> the > > > > > > > > >> > > > > > > expected epoch and assignment. I will clarify > > > this in > > > > > the > > > > > > > KIP. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 31. In the current consumer protocol, the > rack > > > > > affinity > > > > > > > > >> between the > > > > > > > > >> > > > > > client > > > > > > > > >> > > > > > > > and the broker is only considered during > > > fetching, > > > > > but > > > > > > > not > > > > > > > > >> during > > > > > > > > >> > > > > > assigning > > > > > > > > >> > > > > > > > partitions to consumers. Sometimes, once the > > > > > assignment > > > > > > > is > > > > > > > > >> made, > > > > > > > > >> > > > there > > > > > > > > >> > > > > > is > > > > > > > > >> > > > > > > > no opportunity for read affinity because no > > > > > replicas of > > > > > > > > >> assigned > > > > > > > > >> > > > > > partitions > > > > > > > > >> > > > > > > > are close to the member. I am wondering if > we > > > > > should use > > > > > > > > >> this > > > > > > > > >> > > > > > opportunity > > > > > > > > >> > > > > > > > to address this by including rack in > > > GroupMember. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > That's an interesting idea. I don't see any > issue > > > with > > > > > > > adding > > > > > > > > >> the > > > > > > > > >> > > > rack > > > > > > > > >> > > > > > > to the members. I will do so. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 32. On the metric side, often, it's useful > to > > > know > > > > > how > > > > > > > busy > > > > > > > > >> a group > > > > > > > > >> > > > > > > > coordinator is. 
By moving the event loop > model, > > > it > > > > > seems > > > > > > > > >> that we > > > > > > > > >> > > > could > > > > > > > > >> > > > > > add > > > > > > > > >> > > > > > > > a metric that tracks the fraction of the > time > > > the > > > > > event > > > > > > > > >> loop is > > > > > > > > >> > > > doing > > > > > > > > >> > > > > > the > > > > > > > > >> > > > > > > > actual work. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > That's a great idea. I will add it. Thanks. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 33. Could we add a section on coordinator > > > failover > > > > > > > > >> handling? For > > > > > > > > >> > > > > > example, > > > > > > > > >> > > > > > > > does it need to trigger the check if any > group > > > with > > > > > the > > > > > > > > >> wildcard > > > > > > > > >> > > > > > > > subscription now has a new matching topic? > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Sure. When the new group coordinator takes > over, > > > it > > > > > has > > > > > > > to: > > > > > > > > >> > > > > > > * Setup the session timeouts. > > > > > > > > >> > > > > > > * Trigger a new assignment if a client side > > > assignor > > > > > is > > > > > > > used. > > > > > > > > >> We > > > > > > > > >> > > > don't > > > > > > > > >> > > > > > > store the information about the member > selected to > > > > > run the > > > > > > > > >> assignment > > > > > > > > >> > > > > > > so we have to start a new one. > > > > > > > > >> > > > > > > * Update the topics metadata, verify the > wildcard > > > > > > > > >> subscriptions, and > > > > > > > > >> > > > > > > trigger a rebalance if needed. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 34. ConsumerGroupMetadataValue, > > > > > > > > >> > > > ConsumerGroupPartitionMetadataValue, > > > > > > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we > > > document > > > > > what > > > > > > > > >> the epoch > > > > > > > > >> > > > > > field > > > > > > > > >> > > > > > > > reflects? For example, does the epoch in > > > > > > > > >> ConsumerGroupMetadataValue > > > > > > > > >> > > > > > reflect > > > > > > > > >> > > > > > > > the latest group epoch? What about the one > in > > > > > > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and > > > > > > > > >> > > > > > ConsumerGroupMemberMetadataValue? > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Sure. I will clarify that but it is always the > > > latest > > > > > > > group > > > > > > > > >> epoch. > > > > > > > > >> > > > > > > When the group state is updated, the group > epoch > > > is > > > > > > > bumped so > > > > > > > > >> we use > > > > > > > > >> > > > > > > that one for all the change records related > to the > > > > > update. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 35. "the group coordinator will ensure that > the > > > > > > > following > > > > > > > > >> > > > invariants > > > > > > > > >> > > > > > are > > > > > > > > >> > > > > > > > met: ... All members exists." It's possible > for > > > a > > > > > member > > > > > > > > >> not to > > > > > > > > >> > > > get any > > > > > > > > >> > > > > > > > assigned partitions, right? > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > That's right. Here I meant that the members > > > provided > > > > > by > > > > > > > the > > > > > > > > >> assignor > > > > > > > > >> > > > > > > in the assignment must exist in the group. 
The > > > > > assignor > > > > > > > can > > > > > > > > >> not make > > > > > > > > >> > > > > > > up new member ids. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 36. "He can rejoins the group with a member > > > epoch > > > > > > > equals to > > > > > > > > >> 0": > > > > > > > > >> > > > When > > > > > > > > >> > > > > > would > > > > > > > > >> > > > > > > > a consumer rejoin and what member id would > be > > > used? > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > A member is expected to abandon all its > > > partitions and > > > > > > > > >> rejoins when > > > > > > > > >> > > > it > > > > > > > > >> > > > > > > receives the FENCED_MEMBER_EPOCH error. In > this > > > case, > > > > > the > > > > > > > > >> group > > > > > > > > >> > > > > > > coordinator will have removed the member from > the > > > > > group. > > > > > > > The > > > > > > > > >> member > > > > > > > > >> > > > > > > can rejoin the group with the same member id > but > > > with > > > > > 0 as > > > > > > > > >> epoch. Let > > > > > > > > >> > > > > > > me see if I can clarify this in the KIP. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 37. "Instead, power users will have the > ability > > > to > > > > > > > trigger a > > > > > > > > >> > > > > > reassignment > > > > > > > > >> > > > > > > > by either providing a non-zero reason or by > > > > > updating the > > > > > > > > >> assignor > > > > > > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting > > > with > > > > > the > > > > > > > > >> deprecation > > > > > > > > >> > > > of > > > > > > > > >> > > > > > > > Consumer#enforeRebalance. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > In this case, a new assignment is triggered > by the > > > > > client > > > > > > > side > > > > > > > > >> > > > > > > assignor. When constructing the HB, the > consumer > > > will > > > > > > > always > > > > > > > > >> consult > > > > > > > > >> > > > > > > the client side assignor and propagate the > > > > > information to > > > > > > > the > > > > > > > > >> group > > > > > > > > >> > > > > > > coordinator. In other words, we don't expect > > > users to > > > > > call > > > > > > > > >> > > > > > > Consumer#enforceRebalance anymore. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 38. The reassignment examples are nice. But > the > > > > > section > > > > > > > > >> seems to > > > > > > > > >> > > > have > > > > > > > > >> > > > > > > > multiple typos. > > > > > > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, > B > > > > > > > immediately > > > > > > > > >> gets into > > > > > > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems > > > > > incorrect. > > > > > > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, > C > > > seems > > > > > to > > > > > > > get > > > > > > > > >> into > > > > > > > > >> > > > > > epoch=3, > > > > > > > > >> > > > > > > > partitions=[foo-1] too early. > > > > > > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still > > > has A - > > > > > > > > >> epoch=2, > > > > > > > > >> > > > > > > > partitions=[foo-0]. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Sorry for that! I will revise them. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we > support > > > the > > > > > > > upgrade > > > > > > > > >> from > > > > > > > > >> > > > any > > > > > > > > >> > > > > > old > > > > > > > > >> > > > > > > > version to new one? 
> > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > We will support upgrading from the consumer > > > protocol > > > > > > > version > > > > > > > > >> 3, > > > > > > > > >> > > > > > > introduced in KIP-792. KIP-792 is not > implemented > > > yet > > > > > so > > > > > > > the > > > > > > > > >> earliest > > > > > > > > >> > > > > > > version is unknown at the moment. This is > > > explained > > > > > in the > > > > > > > > >> migration > > > > > > > > >> > > > > > > plan chapter. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Thanks again for your feedback, Jun. I will > update > > > > > the KIP > > > > > > > > >> based on > > > > > > > > >> > > > it > > > > > > > > >> > > > > > > next week. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Best, > > > > > > > > >> > > > > > > David > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > On Thu, Sep 1, 2022 at 9:07 PM Jun Rao > > > > > > > > >> <j...@confluent.io.invalid> > > > > > > > > >> > > > wrote: > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > Hi, David, > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the KIP. Overall, the main > benefits > > > of > > > > > the > > > > > > > KIP > > > > > > > > >> seem to > > > > > > > > >> > > > be > > > > > > > > >> > > > > > fewer > > > > > > > > >> > > > > > > > RPCs during rebalance and more efficient > > > support of > > > > > > > > >> wildcard. A few > > > > > > > > >> > > > > > > > comments below. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest > > > > > > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. Do we > plan > > > to > > > > > > > support > > > > > > > > >> rolling > > > > > > > > >> > > > > > changing > > > > > > > > >> > > > > > > > of the partition assignor in the consumers? > > > > > > > > >> > > > > > > > 30.2 For each field, could you explain > whether > > > it's > > > > > > > > >> required in > > > > > > > > >> > > > every > > > > > > > > >> > > > > > > > request or the scenarios when it needs to be > > > > > filled? For > > > > > > > > >> example, > > > > > > > > >> > > > it's > > > > > > > > >> > > > > > not > > > > > > > > >> > > > > > > > clear to me when TopicPartitions needs to be > > > filled. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 31. In the current consumer protocol, the > rack > > > > > affinity > > > > > > > > >> between the > > > > > > > > >> > > > > > client > > > > > > > > >> > > > > > > > and the broker is only considered during > > > fetching, > > > > > but > > > > > > > not > > > > > > > > >> during > > > > > > > > >> > > > > > assigning > > > > > > > > >> > > > > > > > partitions to consumers. Sometimes, once the > > > > > assignment > > > > > > > is > > > > > > > > >> made, > > > > > > > > >> > > > there > > > > > > > > >> > > > > > is > > > > > > > > >> > > > > > > > no opportunity for read affinity because no > > > > > replicas of > > > > > > > > >> assigned > > > > > > > > >> > > > > > partitions > > > > > > > > >> > > > > > > > are close to the member. I am wondering if > we > > > > > should use > > > > > > > > >> this > > > > > > > > >> > > > > > opportunity > > > > > > > > >> > > > > > > > to address this by including rack in > > > GroupMember. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 32. On the metric side, often, it's useful > to > > > know > > > > > how > > > > > > > busy > > > > > > > > >> a group > > > > > > > > >> > > > > > > > coordinator is. 
By moving the event loop > model, > > > it > > > > > seems > > > > > > > > >> that we > > > > > > > > >> > > > could > > > > > > > > >> > > > > > add > > > > > > > > >> > > > > > > > a metric that tracks the fraction of the > time > > > the > > > > > event > > > > > > > > >> loop is > > > > > > > > >> > > > doing > > > > > > > > >> > > > > > the > > > > > > > > >> > > > > > > > actual work. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 33. Could we add a section on coordinator > > > failover > > > > > > > > >> handling? For > > > > > > > > >> > > > > > example, > > > > > > > > >> > > > > > > > does it need to trigger the check if any > group > > > with > > > > > the > > > > > > > > >> wildcard > > > > > > > > >> > > > > > > > subscription now has a new matching topic? > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 34. ConsumerGroupMetadataValue, > > > > > > > > >> > > > ConsumerGroupPartitionMetadataValue, > > > > > > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we > > > document > > > > > what > > > > > > > > >> the epoch > > > > > > > > >> > > > > > field > > > > > > > > >> > > > > > > > reflects? For example, does the epoch in > > > > > > > > >> ConsumerGroupMetadataValue > > > > > > > > >> > > > > > reflect > > > > > > > > >> > > > > > > > the latest group epoch? What about the one > in > > > > > > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and > > > > > > > > >> > > > > > ConsumerGroupMemberMetadataValue? > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 35. "the group coordinator will ensure that > the > > > > > > > following > > > > > > > > >> > > > invariants > > > > > > > > >> > > > > > are > > > > > > > > >> > > > > > > > met: ... All members exists." It's possible > for > > > a > > > > > member > > > > > > > > >> not to > > > > > > > > >> > > > get any > > > > > > > > >> > > > > > > > assigned partitions, right? > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 36. "He can rejoins the group with a member > > > epoch > > > > > > > equals to > > > > > > > > >> 0": > > > > > > > > >> > > > When > > > > > > > > >> > > > > > would > > > > > > > > >> > > > > > > > a consumer rejoin and what member id would > be > > > used? > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 37. "Instead, power users will have the > ability > > > to > > > > > > > trigger a > > > > > > > > >> > > > > > reassignment > > > > > > > > >> > > > > > > > by either providing a non-zero reason or by > > > > > updating the > > > > > > > > >> assignor > > > > > > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting > > > with > > > > > the > > > > > > > > >> deprecation > > > > > > > > >> > > > of > > > > > > > > >> > > > > > > > Consumer#enforeRebalance. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 38. The reassignment examples are nice. But > the > > > > > section > > > > > > > > >> seems to > > > > > > > > >> > > > have > > > > > > > > >> > > > > > > > multiple typos. > > > > > > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, > B > > > > > > > immediately > > > > > > > > >> gets into > > > > > > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems > > > > > incorrect. > > > > > > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, > C > > > seems > > > > > to > > > > > > > get > > > > > > > > >> into > > > > > > > > >> > > > > > epoch=3, > > > > > > > > >> > > > > > > > partitions=[foo-1] too early. 
> > > > > > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still > > > has A - > > > > > > > > >> epoch=2, > > > > > > > > >> > > > > > > > partitions=[foo-0]. > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we > support > > > the > > > > > > > upgrade > > > > > > > > >> from > > > > > > > > >> > > > any > > > > > > > > >> > > > > > old > > > > > > > > >> > > > > > > > version to new one? > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > Thanks, > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > Jun > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > On Mon, Aug 29, 2022 at 9:20 AM David Jacot > > > > > > > > >> > > > > > <dja...@confluent.io.invalid> > > > > > > > > >> > > > > > > > wrote: > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > Hi all, > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > The KIP states that we will re-implement > the > > > > > > > coordinator > > > > > > > > >> in > > > > > > > > >> > > > Java. I > > > > > > > > >> > > > > > > > > discussed this offline with a few folks > and > > > folks > > > > > are > > > > > > > > >> concerned > > > > > > > > >> > > > that > > > > > > > > >> > > > > > > > > we could introduce many regressions in > the old > > > > > > > protocol > > > > > > > > >> if we do > > > > > > > > >> > > > so. > > > > > > > > >> > > > > > > > > Therefore, I am going to remove this > statement > > > > > from > > > > > > > the > > > > > > > > >> KIP. It > > > > > > > > >> > > > is an > > > > > > > > >> > > > > > > > > implementation detail after all so it > does not > > > > > have > > > > > > > to be > > > > > > > > >> > > > decided at > > > > > > > > >> > > > > > > > > this stage. We will likely start by > trying to > > > > > > > refactor the > > > > > > > > >> > > > current > > > > > > > > >> > > > > > > > > implementation as a first step. > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > Cheers, > > > > > > > > >> > > > > > > > > David > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > On Mon, Aug 29, 2022 at 3:52 PM David > Jacot < > > > > > > > > >> dja...@confluent.io > > > > > > > > >> > > > > > > > > > > > > >> > > > > > wrote: > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > Hi Luke, > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > 1.1. I think the state machine are: > > > "Empty, > > > > > > > assigning, > > > > > > > > >> > > > > > reconciling, > > > > > > > > >> > > > > > > > > stable, > > > > > > > > >> > > > > > > > > > > dead" mentioned in Consumer Group > States > > > > > section, > > > > > > > > >> right? > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > This sentence does not refer to those > group > > > > > states > > > > > > > but > > > > > > > > >> rather > > > > > > > > >> > > > to a > > > > > > > > >> > > > > > > > > > state machine replication (SMR). This > > > refers to > > > > > the > > > > > > > > >> entire > > > > > > > > >> > > > state of > > > > > > > > >> > > > > > > > > > group coordinator which is replicated > via > > > the > > > > > log > > > > > > > > >> layer. I will > > > > > > > > >> > > > > > > > > > clarify this in the KIP. > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > 1.2. 
> 1.2. What do you mean by "each state machine is modelled as an event loop"?

The idea is to follow a model similar to the new quorum controller. We will have N threads to process events. Each __consumer_offsets partition is assigned to a unique thread, and all the events (e.g. requests, callbacks, etc.) for that partition are processed by this thread. This simplifies concurrency and will enable us to do simulation testing for the group coordinator.

> 1.3. Why do we need a state machine per __consumer_offsets partition, and not a state machine "per consumer group" owned by a group coordinator? For example, if one group coordinator owns 2 consumer groups, and both exist in __consumer_offsets-0, will we have 1 state machine for it, or 2?

See 1.1. The confusion comes from there, I think.

> 1.4. I know that "group.coordinator.threads" is the number of threads used to run the state machines. But I'm wondering: if the purpose of the threads is only to keep the state of each consumer group (or __consumer_offsets partition?), with no heavy computation, why should we need multiple threads here?

See 1.2. The idea is to have the ability to shard the processing, as the computation could be heavy.
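For illustration, a minimal Java sketch of such a sharded event-loop model, using hypothetical names rather than the KIP's classes: every __consumer_offsets partition is pinned to one single-threaded loop, so per-partition state is only ever touched by one thread. It also tracks busy time, along the lines of the metric suggested earlier in this thread.

```java
import java.util.concurrent.*;

// Hypothetical sketch of a sharded, single-threaded event-loop model.
class CoordinatorEventProcessor implements AutoCloseable {
    private final ExecutorService[] loops;
    private final long[] busyNanos; // per-loop busy time, raw input for a "busy fraction" metric

    CoordinatorEventProcessor(int numThreads) {
        loops = new ExecutorService[numThreads];
        busyNanos = new long[numThreads];
        for (int i = 0; i < numThreads; i++) {
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All events for a given __consumer_offsets partition run on the same
    // thread, so the per-partition state machine needs no locking.
    CompletableFuture<Void> enqueue(int offsetsPartition, Runnable event) {
        int loop = offsetsPartition % loops.length;
        return CompletableFuture.runAsync(() -> {
            long start = System.nanoTime();
            try {
                event.run();
            } finally {
                // Only ever updated from the owning loop thread in this sketch;
                // a real metric would need a thread-safe accumulator.
                busyNanos[loop] += System.nanoTime() - start;
            }
        }, loops[loop]);
    }

    @Override
    public void close() {
        for (ExecutorService loop : loops) loop.shutdown();
    }
}
```

The key property is the single writer per partition; a production version would also need bounded queues, error handling, and proper metrics plumbing.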
> 2.1. The consumer session timeout: why is the default session timeout not located between the min (45s) and max (60s)? I thought the min/max session timeouts are there to define its lower/upper bounds, no?
>
> group.consumer.session.timeout.ms (int, default 30s): The timeout to detect client failures when using the consumer group protocol.
> group.consumer.min.session.timeout.ms (int, default 45s): The minimum session timeout.
> group.consumer.max.session.timeout.ms (int, default 60s): The maximum session timeout.

This is indeed a mistake. The default session timeout should be 45s (the current default).

> 2.2. The default server side assignors are [range, uniform], which means we'll default to the "range" assignor. I'd like to know why not the uniform one? I thought users usually choose the uniform assignor (the former sticky assignor) for a more even distribution. Is there any other reason we choose the range assignor as the default?
>
> group.consumer.assignors (List, default "range, uniform"): The server side assignors.

The order on the server side has no influence because the client must choose the assignor that it wants to use. There is no default in the current proposal. If the assignor is not specified by the client, the request is rejected. The default client value for `group.remote.assignor` is `uniform`, though.
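For illustration only, a client-side snippet using the proposed `group.remote.assignor` name, assuming the config names quoted above ship unchanged; the broker-side `group.consumer.*` defaults are shown only as comments.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AssignorConfigExample {
    public static void main(String[] args) {
        // Broker side (server.properties), per the defaults discussed above:
        //   group.consumer.session.timeout.ms=45000
        //   group.consumer.min.session.timeout.ms=45000
        //   group.consumer.max.session.timeout.ms=60000
        //   group.consumer.assignors=range,uniform
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Proposed in KIP-848: pick the server-side assignor; defaults to "uniform".
        // (Enabling the new protocol may require additional settings not covered here.)
        props.put("group.remote.assignor", "uniform");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
```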
Thanks for your very good comments, Luke. I hope that my answers help to clarify things. I will update the KIP as well based on your feedback.

Cheers,
David

On Mon, Aug 22, 2022 at 9:29 AM Luke Chen <show...@gmail.com> wrote:

Hi David,

Thanks for the update.

Some more questions:

1. In the Group Coordinator section, you mentioned:
> The new group coordinator will have a state machine per *__consumer_offsets* partition, where each state machine is modelled as an event loop. Those state machines will be executed in *group.coordinator.threads* threads.

1.1. I think the state machines are: "Empty, assigning, reconciling, stable, dead" mentioned in the Consumer Group States section, right?
1.2. What do you mean by "each state machine is modelled as an event loop"?
1.3. Why do we need a state machine per *__consumer_offsets* partition, and not a state machine "per consumer group" owned by a group coordinator? For example, if one group coordinator owns 2 consumer groups, and both exist in *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
1.4. I know that "*group.coordinator.threads*" is the number of threads used to run the state machines.
But I'm wondering: if the purpose of the threads is only to keep the state of each consumer group (or *__consumer_offsets* partition?), with no heavy computation, why should we need multiple threads here?

2. For the default values in the new configs:
2.1. The consumer session timeout: why is the default session timeout not located between the min (45s) and max (60s)? I thought the min/max session timeouts are there to define its lower/upper bounds, no?

group.consumer.session.timeout.ms (int, default 30s): The timeout to detect client failures when using the consumer group protocol.
group.consumer.min.session.timeout.ms (int, default 45s): The minimum session timeout.
group.consumer.max.session.timeout.ms (int, default 60s): The maximum session timeout.

2.2. The default server side assignors are [range, uniform], which means we'll default to the "range" assignor. I'd like to know why not the uniform one? I thought users usually choose the uniform assignor (the former sticky assignor) for a more even distribution. Is there any other reason we choose the range assignor as the default?

group.consumer.assignors (List, default "range, uniform"): The server side assignors.

Thank you.
Luke

On Mon, Aug 22, 2022 at 2:10 PM Luke Chen <show...@gmail.com> wrote:

Hi Sagar,

I have some thoughts about Kafka Connect integrating with KIP-848, but I think we should have a separate discussion thread for the Kafka Connect KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1], and let this discussion thread focus on the consumer rebalance protocol, WDYT?

[1] https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol

Thank you.
Luke

On Fri, Aug 12, 2022 at 9:31 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Thank you Guozhang/David for the feedback. Looks like there's agreement on using separate APIs for Connect. I would revisit the doc and see what changes are to be made.

Thanks!
Sagar.

On Tue, Aug 9, 2022 at 7:11 PM David Jacot <dja...@confluent.io.invalid> wrote:

Hi Sagar,

Thanks for the feedback and the document. That's really helpful.
I will take a look at it.

Overall, it seems to me that both Connect and the Consumer could share the same underlying "engine". The main difference is that the Consumer assigns topic-partitions to members whereas Connect assigns tasks to workers. I see two ways to move forward:
1) We extend the newly proposed APIs to support different resource types (e.g. partitions, tasks, etc.); or
2) We use new dedicated APIs for Connect. The dedicated APIs would be similar to the new ones but differ in the content/resources, and they would rely on the same engine on the coordinator side.

I personally lean towards 2) because I am not a fan of overloading APIs to serve different purposes. That being said, I am not opposed to 1) if we can find an elegant way to do it.

I think that we can continue to discuss it here for now in order to ensure that this KIP is compatible with what we will do for Connect in the future.
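To make option 2) a bit more concrete, a purely hypothetical sketch (none of these interfaces exist in the KIP): Connect and the consumer would each get their own assignor-facing API, while both delegate to one engine that is generic over the resource type being assigned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical shared "engine": membership, epochs and reconciliation would be
// generic over the resource type R being assigned.
interface GroupEngine<R> {
    Map<String, List<R>> computeTargetAssignment(List<String> memberIds, List<R> resources);
}

// Consumer-facing flavor: resources are topic-partitions.
interface ConsumerGroupAssignor extends GroupEngine<TopicPartition> {}

// Connect-facing flavor: resources are connector tasks (hypothetical type).
record ConnectorTask(String connector, int taskId) {}
interface ConnectGroupAssignor extends GroupEngine<ConnectorTask> {}

// A single generic implementation can back both flavors, which is the appeal
// of sharing the engine while keeping the public APIs separate.
class RoundRobinEngine<R> implements GroupEngine<R> {
    @Override
    public Map<String, List<R>> computeTargetAssignment(List<String> memberIds, List<R> resources) {
        Map<String, List<R>> assignment = new HashMap<>();
        memberIds.forEach(m -> assignment.put(m, new ArrayList<>()));
        for (int i = 0; i < resources.size(); i++) {
            assignment.get(memberIds.get(i % memberIds.size())).add(resources.get(i));
        }
        return assignment;
    }
}
```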
Best,
David

On Mon, Aug 8, 2022 at 2:41 PM David Jacot <dja...@confluent.io> wrote:

Hi all,

I am back from vacation. I will go through and address your comments in the coming days. Thanks for your feedback.

Cheers,
David

On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris <gharris1...@gmail.com> wrote:

Hey All!

Thanks for the KIP, it's wonderful to see cooperative rebalancing making it down the stack!

I had a few questions:

1. The 'Rejected Alternatives' section describes how the member epoch should advance in step with the group epoch and assignment epoch values. I think that this is a good idea for the reasons described in the KIP. When the protocol is incrementally assigning partitions to a worker, what member epoch does each incremental assignment use? Are member epochs re-used, and can a single member epoch correspond to multiple different (monotonically larger) assignments?

2. Is the Assignor's 'Reason' field opaque to the group coordinator?
If not, should custom client-side assignor implementations interact with the Reason field, and how is its common meaning agreed upon? If so, what is the benefit of a distinct Reason field over including such functionality in the opaque metadata?

3. The following is included in the KIP: "Thanks to this, the input of the client side assignor is entirely driven by the group coordinator. The consumer is no longer responsible for maintaining any state besides its assigned partitions." Does this mean that the client-side assignor MAY incorporate additional non-Metadata state (such as partition throughput, cpu/memory metrics, config topics, etc.), or that additional non-Metadata state SHOULD NOT be used?

4. I see that there are separate classes for org.apache.kafka.server.group.consumer.PartitionAssignor and org.apache.kafka.clients.consumer.PartitionAssignor that seem to overlap significantly. Is it possible for these two implementations to be unified? This would serve to promote feature parity of server-side and client-side assignors, and would also facilitate operational flexibility in certain situations. For example, if a server-side assignor has some poor behavior and needs a patch, deploying the patched assignor to the client and switching one consumer group to a client-side assignor may be faster and less risky than patching all of the brokers. With the currently proposed distinct APIs, a non-trivial reimplementation would have to be assembled, and if the two APIs have diverged significantly, then it is possible that a reimplementation would not be possible.
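As a purely hypothetical illustration of what such unification could look like (this is not an existing Kafka interface): a single assignor contract that both the broker and the client could load, differing only in where it runs.

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical unified contract; the server-side and client-side
// PartitionAssignor interfaces would both be thin adapters over it.
interface UnifiedPartitionAssignor {
    String name();

    // Per-member inputs the group coordinator already has.
    record MemberSubscription(String memberId, List<String> subscribedTopics, byte[] metadata) {}

    // The same computation, whether it is invoked on the broker or on the client.
    Map<String, List<TopicPartition>> assign(List<MemberSubscription> subscriptions,
                                             Map<String, Integer> partitionsPerTopic);
}
```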
--
Greg Harris
gharris1...@gmail.com
github.com/gharris1727

On Wed, Aug 3, 2022 at 8:39 AM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi Guozhang/David,

I created a confluence page to discuss how Connect would need to change based on the new rebalance protocol.
Here's the page:

https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol

It's also pretty longish and I have tried to keep a format similar to KIP-848. Let me know what you think. Also, do you think this should be moved to a separate discussion thread or is this one fine?

Thanks!
Sagar.

On Tue, Jul 26, 2022 at 7:37 AM Sagar <sagarmeansoc...@gmail.com> wrote:

Hello Guozhang,

Thank you so much for the doc on Kafka Streams. Sure, I would do the analysis and come up with such a document.

Thanks!
Sagar.
On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Sagar,

It would be great if you could come back with some analysis on how to implement the Connect side integration with the new protocol; so far, besides leveraging on the new "protocol type", we did not

--
Guozhang