Thank you for the suggestions, Jason. A side note for Guozhang: I updated the KIP to reflect the dependency on 447.
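To make the trade-off in point 2 of the quoted message concrete, here is a minimal, self-contained sketch of generation-based fencing when the group metadata is passed along with the offsets. Everything in it (`GroupFencingSketch`, `Coordinator`, `txnOffsetCommit`, and this `GroupMetadata` class) is a hypothetical stand-in written for illustration, not the Kafka client or broker code, and `IllegalStateException` merely stands in for the proposed `IllegalGenerationException`. It only shows why the caller has to fetch a fresh snapshot after every rebalance:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are real Kafka classes.
final class GroupFencingSketch {

    /** Immutable snapshot of the consumer's group state at one point in time. */
    static final class GroupMetadata {
        final String groupId;
        final int generationId;
        final String memberId;

        GroupMetadata(String groupId, int generationId, String memberId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    /** Stand-in for the group coordinator: owns the current generation and the committed offsets. */
    static final class Coordinator {
        private int currentGeneration = 1;
        private final Map<String, Long> committed = new HashMap<>();

        /** A rebalance bumps the generation, so every older snapshot becomes stale. */
        int rebalance() {
            return ++currentGeneration;
        }

        /** What a caller would read right before committing offsets. */
        GroupMetadata currentMetadata(String groupId, String memberId) {
            return new GroupMetadata(groupId, currentGeneration, memberId);
        }

        /** Rejects the commit when the caller's generation is not the current one. */
        void txnOffsetCommit(Map<String, Long> offsets, GroupMetadata metadata) {
            if (metadata.generationId != currentGeneration) {
                throw new IllegalStateException("stale generation " + metadata.generationId
                        + ", current is " + currentGeneration);
            }
            committed.putAll(offsets);
        }

        Long committedOffset(String partition) {
            return committed.get(partition);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("input-0", 42L);

        // Snapshot taken before the rebalance: accepted while it is still current.
        GroupMetadata snapshot = coordinator.currentMetadata("my-group", "member-1");
        coordinator.txnOffsetCommit(offsets, snapshot);

        // After a rebalance the old snapshot is fenced.
        coordinator.rebalance();
        try {
            coordinator.txnOffsetCommit(offsets, snapshot);
        } catch (IllegalStateException e) {
            System.out.println("fenced: " + e.getMessage());
        }

        // A freshly fetched snapshot is accepted again.
        coordinator.txnOffsetCommit(offsets, coordinator.currentMetadata("my-group", "member-1"));
    }
}
```

The stale snapshot is rejected after the rebalance, while a freshly fetched one is accepted. That refresh step is the bookkeeping a non-Streams caller would have to do by hand.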

On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Boyang, thanks for the updates. I have a few more comments:
>
> 1. We are adding some new fields to TxnOffsetCommit to support group-based
> fencing. Do we need these fields to be persisted in the offsets topic to
> ensure that the fencing still works after a coordinator failover?

We already persist member.id, instance.id and generation.id in the offset
topic. What extra fields do we need to store?

> 2. Since you are proposing a new `groupMetadata` API, have you considered
> whether we still need the `initTransactions` overload? Another way would be
> to pass it through the `sendOffsetsToTransaction` API:
>
> void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> offsets, GroupMetadata groupMetadata) throws
> ProducerFencedException, IllegalGenerationException;
>
> This seems a little more consistent with the current API and avoids the
> direct dependence on the Consumer in the producer.

Note that although we avoid one dependency on the consumer, the producer
still needs to periodically update its group metadata. In this case the
caller of sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
offsets, GroupMetadata groupMetadata) is responsible for getting the latest
value of the group metadata. This is easy to do on the Streams side, as we
have StreamsPartitionAssignor to reflect metadata changes upon
#onAssignment(), but a non-Streams user has to code the callback by hand. Do
you think the convenience we sacrifice here is worth the simplification
benefit?

> 3. Can you clarify the behavior of the clients when the brokers do not
> support the latest API versions? This is both for the new TxnOffsetCommit
> and the OffsetFetch APIs. I guess the high level idea in streams is to
> detect broker support before instantiating the producer and consumer. I
> think that's reasonable, but we might need some approach for non-streams
> use cases. One option I was considering is enforcing the latest version
> through the new `sendOffsetsToTransaction` API. Basically when you use the
> new API, we require support for the latest TxnOffsetCommit version. This
> puts some burden on users, but it avoids breaking correctness assumptions
> when the new APIs are in use. What do you think?

Yes, I think we haven't covered this case, so the plan is to crash the
non-Streams application when the job is using the new sendOffsets API.

> -Jason
>
> On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
> > Yep, Guozhang I think that would be best as passing in an entire consumer
> > instance is indeed cumbersome.
> >
> > Just saw you updated KIP-429, I will follow-up to change 447 as well.
> >
> > Best,
> > Boyang
> >
> > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > okay I think I understand your concerns about ConsumerGroupMetadata now: if
> > > we still want to only call initTxns once, then we should allow whatever
> > > parameter is passed in to reflect the latest value of generation id whenever
> > > sending the offset fetch request.
> > >
> > > Whereas the current ConsumerGroupMetadata is a static object.
> > >
> > > Maybe we can consider having an extended class of ConsumerGroupMetadata
> > > whose values are updated from the consumer's rebalance callback?
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >
> > > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected
> > > > the latest change on ConsumerGroupMetadata? Also regarding question one,
> > > > the group metadata needs to be accessed via callback, does that mean we
> > > > need a separate producer API such as
> > > > "producer.refreshMetadata(groupMetadata)" to be able to access it instead
> > > > of passing in the consumer instance?
> > > >
> > > > Boyang
> > > >
> > > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Thanks Boyang,
> > > > >
> > > > > I've made another pass on KIP-447 as well as
> > > > > https://github.com/apache/kafka/pull/7078, and have some minor comments
> > > > > about the proposed API:
> > > > >
> > > > > 1. it seems instead of needing the whole KafkaConsumer object, you'd only
> > > > > need the "ConsumerGroupMetadata", in that case can we just pass in that
> > > > > object into the initTxns call?
> > > > >
> > > > > 2. the current trunk already has a public class named
> > > > > (ConsumerGroupMetadata)
> > > > > under o.a.k.clients.consumer created by KIP-429. If we want to just use
> > > > > that then maybe it makes less sense to declare a base GroupMetadata as we
> > > > > are already leaking such information on the assignor anyways.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > >
> > > > > > Thank you Guozhang for the reply. We will consider the interface change
> > > > > > from 429 as a backup plan for 447.
> > > > > >
> > > > > > And bumping this thread for more discussion.
> > > > > >
> > > > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thank you Guozhang for the suggestion! I would normally prefer naming a
> > > > > > > > flag corresponding to its functionality. Seems to me `isolation_level`
> > > > > > > > makes us another hop on information track.
> > > > > > >
> > > > > > > Fair enough, let's use a separate flag name then :)
> > > > > > >
> > > > > > > > As for the generation.id exposure, I'm fine leveraging the new API from
> > > > > > > > 429, but is that design finalized yet, and will the API be added on the
> > > > > > > > generic Consumer<K, V> interface?
> > > > > > >
> > > > > > > The current PartitionAssignor is inside `internals` package and in KIP-429
> > > > > > > we are going to create a new interface out of `internals` to really make it
> > > > > > > public APIs, and as part of that we are refactoring some of its method
> > > > > > > signatures. I just feel some of the newly introduced classes can be reused
> > > > > > > in your KIP as well, i.e. just for code succinctness, but no semantical
> > > > > > > indications.
> > > > > > >
> > > > > > > > Boyang
> > > > > > > >
> > > > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Boyang, thanks for the updated proposal!
> > > > > > > > >
> > > > > > > > > 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> > > > > > > > > offset fetch request with a boolean to indicate "give me a retriable error
> > > > > > > > > code if there's pending offset, rather than sending me the committed offset
> > > > > > > > > immediately". Personally I still feel it is okay to piggy-back on the
> > > > > > > > > ISOLATION_LEVEL boolean, but I'm also fine with another `await_transaction`
> > > > > > > > > boolean if you feel strongly about it.
> > > > > > > > >
> > > > > > > > > 10. About the exposure of generation id, there may be some refactoring work
> > > > > > > > > coming from KIP-429 that can benefit KIP-447 as well since we are wrapping
> > > > > > > > > the consumer subscription / assignment data in new classes. Note that
> > > > > > > > > current proposal does not include `generationId` since with the cooperative
> > > > > > > > > sticky assignor we think it is not necessary for correctness, but also if we
> > > > > > > > > agree it is okay to expose it we can potentially include it in
> > > > > > > > > `ConsumerAssignmentData` as well.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thank you Jason for the ideas.
> > > > > > > > > >
> > > > > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Boyang,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > > > > >
> > > > > > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> > > > > > > > > > > 10s. I think this makes sense for Kafka Streams which is tied to the
> > > > > > > > > > > consumer group semantics and uses a default 10s session timeout. However,
> > > > > > > > > > > it seems a bit dangerous to make this change for the producer generally.
> > > > > > > > > > > Could we just change it for streams?
> > > > > > > > > >
> > > > > > > > > > That sounds good to me.
> > > > > > > > > >
> > > > > > > > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I think
> > > > > > > > > > > the idea is to basically put in a backdoor to give the producer access to
> > > > > > > > > > > the group generationId. It's not clear to me how this would work given
> > > > > > > > > > > package restrictions. I wonder if it would be better to just expose the
> > > > > > > > > > > state we need from the consumer. I know we have been reluctant to do this
> > > > > > > > > > > so far because we treat the generationId as an implementation detail.
> > > > > > > > > > > However, I think we might just bite the bullet and expose it rather than
> > > > > > > > > > > coming up with a messy hack. Concepts such as memberIds have already been
> > > > > > > > > > > exposed in the AdminClient, so maybe it is not too bad. Alternatively, we
> > > > > > > > > > > could use an opaque type. For example:
> > > > > > > > > > >
> > > > > > > > > > > // public
> > > > > > > > > > > interface GroupMetadata {}
> > > > > > > > > > >
> > > > > > > > > > > // private
> > > > > > > > > > > interface ConsumerGroupMetadata {
> > > > > > > > > > >   final int generationId;
> > > > > > > > > > >   final String memberId;
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > // Consumer API
> > > > > > > > > > > public GroupMetadata groupMetadata();
> > > > > > > > > > >
> > > > > > > > > > > I am probably leaning toward just exposing the state we need.
> > > > > > > > > >
> > > > > > > > > > Yes, also to mention that Kafka Streams uses the generic Consumer API,
> > > > > > > > > > which doesn't have rich states like a full `KafkaConsumer`. The hack will
> > > > > > > > > > not work as expected.
> > > > > > > > > >
> > > > > > > > > > Instead, just exposing the consumer generation.id seems much easier.
> > > > > > > > > > We could consolidate the API and make it
> > > > > > > > > >
> > > > > > > > > > > 3. Given that we are already providing a way to propagate group state from
> > > > > > > > > > > the consumer to the producer, I wonder if we may as well include the
> > > > > > > > > > > memberId and groupInstanceId. This would make the validation we do for
> > > > > > > > > > > TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> > > > > > > > > > > least this may help with debugging.
> > > > > > > > > >
> > > > > > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > > > > > > >
> > > > > > > > > > > 4. I like the addition of isolation_level to the offset fetch. At the same
> > > > > > > > > > > time, its behavior is a bit inconsistent with how it is used in the
> > > > > > > > > > > consumer generally. There is no reason for the group coordinator to ever
> > > > > > > > > > > expose aborted data, so this is mostly about awaiting pending offset
> > > > > > > > > > > commits, not reading uncommitted data. Perhaps instead of calling this
> > > > > > > > > > > "isolation level," it should be more like "await_pending_transaction" or
> > > > > > > > > > > something like that?
> > > > > > > > > > >
> > > > > > > > > > > Also, just to be clear, the consumer would treat this as an optional
> > > > > > > > > > > field, right? So if the broker does not support the latest OffsetFetch
> > > > > > > > > > > API, it would silently revert to reading the old data. Basically it would
> > > > > > > > > > > be up to the streams version probing logic to ensure that the expectation
> > > > > > > > > > > on this API fits with the usage of `transactional.id`.
> > > > > > > > > >
> > > > > > > > > > Sounds like a better naming to me, while I think it could be shortened to
> > > > > > > > > > `await_transaction`.
> > > > > > > > > > I think the field should be optional, too.
> > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Jason
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Guozhang,
> > > > > > > > > > > >
> > > > > > > > > > > > I will correct my statement from last email.
I don't > > > think > > > > > the > > > > > > > > > > > > read_committed (3.a) is necessary to be added to the > > > > > > OffsetFetch > > > > > > > > > > request, > > > > > > > > > > > > as if we are using EOS application, the underlying > > > > consumers > > > > > > > within > > > > > > > > > the > > > > > > > > > > > > group should always back off when there is pending > > > offsets. > > > > > > > > > > > > > > > > > > > > > > > > Let me know if you think this is correct. > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen < > > > > > > > > > reluctanthero...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Thank you Guozhang for the questions, inline > answers > > > are > > > > > > below. > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen < > > > > > > > > > > reluctanthero...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > >> Hey all, > > > > > > > > > > > > >> > > > > > > > > > > > > >> I have done a fundamental polish of KIP-447 > > > > > > > > > > > > >> < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics > > > > > > > > > > > > > > > > > > > > > > > > and > > > > > > > > > > > > >> written a design doc > > > > > > > > > > > > >> < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit# > > > > > > > > > > > > > > > > > > > > > > > > depicting > > > > > > > > > > > > >> internal changes. 
We stripped off many > > implementation > > > > > > details > > > > > > > > from > > > > > > > > > > the > > > > > > > > > > > > KIP, > > > > > > > > > > > > >> and simplified the public changes by a lot. For > > > > reviewers, > > > > > > it > > > > > > > is > > > > > > > > > > > highly > > > > > > > > > > > > >> recommended to fully understand EOS design in > KIP-98 > > > and > > > > > > read > > > > > > > > its > > > > > > > > > > > > >> corresponding design doc > > > > > > > > > > > > >> < > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit > > > > > > > > > > > > > > > > > > > > > > > > if > > > > > > > > > > > > >> you haven't done so already. > > > > > > > > > > > > >> > > > > > > > > > > > > >> Let me know if you found anything confusing around > > the > > > > KIP > > > > > > or > > > > > > > > the > > > > > > > > > > > > design. > > > > > > > > > > > > >> Would be happy to discuss in depth. > > > > > > > > > > > > >> > > > > > > > > > > > > >> Best, > > > > > > > > > > > > >> Boyang > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang < > > > > > > > > > wangg...@gmail.com> > > > > > > > > > > > > >> wrote: > > > > > > > > > > > > >> > > > > > > > > > > > > >>> 2. The reason we did not expose generation.id > from > > > > > > > > KafkaConsumer > > > > > > > > > > > > public > > > > > > > > > > > > >>> APIs directly is to abstract this notion from > users > > > > > (since > > > > > > it > > > > > > > > is > > > > > > > > > an > > > > > > > > > > > > >>> implementation detail of the rebalance protocol > > > itself, > > > > > > e.g. 
> > > > > > > if > > > > > > > > > > user > > > > > > > > > > > > >>> calls > > > > > > > > > > > > >>> consumer.assign() they do not need to invoke > > > > > > > > ConsumerCoordinator > > > > > > > > > > and > > > > > > > > > > > no > > > > > > > > > > > > >>> need to be aware of generation.id at all). > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> On the other hand, with the current proposal the > > > > > > > > txn.coordiantor > > > > > > > > > > did > > > > > > > > > > > > not > > > > > > > > > > > > >>> know about the latest generation from the > > > > source-of-truth > > > > > > > > > > > > >>> group.coordinator; instead, it will only bump up > > the > > > > > > > generation > > > > > > > > > > from > > > > > > > > > > > > the > > > > > > > > > > > > >>> producer's InitProducerIdRequest only. > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> The key here is that GroupCoordinator, when > > handling > > > > > > > > > > > > >>> `InitProducerIdRequest > > > > > > > > > > > > >>> > > > > > > > > > > > > >> In the new design, we just pass the entire > consumer > > > > > instance > > > > > > > > into > > > > > > > > > > the > > > > > > > > > > > > > producer through > > > > > > > > > > > > > #initTransaction, so no public API will be created. > > > > > > > > > > > > > > > > > > > > > > > > > >> 3. I agree that if we rely on the group > coordinator > > to > > > > > block > > > > > > > on > > > > > > > > > > > > returning > > > > > > > > > > > > >>> offset-fetch-response if read-committed is > enabled, > > > > then > > > > > we > > > > > > > do > > > > > > > > > not > > > > > > > > > > > need > > > > > > > > > > > > >>> to > > > > > > > > > > > > >>> store partition assignment on txn coordinator and > > > > > therefore > > > > > > > > it's > > > > > > > > > > > better > > > > > > > > > > > > >>> to > > > > > > > > > > > > >>> still decouple them. 
For that case we still need > to > > > > > update > > > > > > > the > > > > > > > > > KIP > > > > > > > > > > > wiki > > > > > > > > > > > > >>> page that includes: > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> 3.a. Augment OffsetFetchRequest with the > > > > ISOLATION_LEVEL > > > > > as > > > > > > > > well. > > > > > > > > > > > > >>> 3.b. Add new error code in OffsetFetchResponse to > > let > > > > > > client > > > > > > > > > > backoff > > > > > > > > > > > > and > > > > > > > > > > > > >>> retry if there are pending txns including the > > > > interested > > > > > > > > > > partitions. > > > > > > > > > > > > >>> 3.c. Also in the worst case we would let the > client > > > be > > > > > > > blocked > > > > > > > > > for > > > > > > > > > > > the > > > > > > > > > > > > >>> txn.timeout period, and for that rationale we may > > > need > > > > to > > > > > > > > > consider > > > > > > > > > > > > >>> reducing > > > > > > > > > > > > >>> our default txn.timeout value as well. > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> Addressed 3.b and 3.c, will do 3.a. > > > > > > > > > > > > > > > > > > > > > > > > > >> 4. According to Colin it seems we do not need to > > > create > > > > > > > another > > > > > > > > > KIP > > > > > > > > > > > and > > > > > > > > > > > > we > > > > > > > > > > > > >>> can just complete it as part of KIP-117 / > > KAFKA-5214; > > > > and > > > > > > we > > > > > > > > need > > > > > > > > > > to > > > > > > > > > > > do > > > > > > > > > > > > >>> some cleanup to have BrokerApiVersion exposed > from > > > > > > > AdminClient > > > > > > > > > > > (@Colin > > > > > > > > > > > > >>> please let use know if you have any concerns > > exposing > > > > > it). > > > > > > > > > > > > >>> > > > > > > > > > > > > >> I think we no longer need to rely on api version > for > > > > > > > > > initialization, > > > > > > > > > > > > > since we will be using the upgrade.from config > > anyway. 
> > > > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > >>> Guozhang > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson < > > > > > > > > > > ja...@confluent.io> > > > > > > > > > > > > >>> wrote: > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> > For reference, we have BrokerApiVersionCommand > > > > already > > > > > > as a > > > > > > > > > > public > > > > > > > > > > > > >>> > interface. We have a bit of tech debt at the > > moment > > > > > > because > > > > > > > > it > > > > > > > > > > > uses a > > > > > > > > > > > > >>> > custom AdminClient. It would be nice to clean > > that > > > > up. > > > > > In > > > > > > > > > > general, > > > > > > > > > > > I > > > > > > > > > > > > >>> think > > > > > > > > > > > > >>> > it is reasonable to expose from AdminClient. It > > can > > > > be > > > > > > used > > > > > > > > by > > > > > > > > > > > > >>> management > > > > > > > > > > > > >>> > tools to inspect running Kafka versions for > > > example. > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > -Jason > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen < > > > > > > > > > > > > >>> reluctanthero...@gmail.com> > > > > > > > > > > > > >>> > wrote: > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > > Thank you for the context Colin. The groupId > > was > > > > > > indeed a > > > > > > > > > > > > copy-paste > > > > > > > > > > > > >>> > error. 
> > > > > > > > > > > > >>> > > Our use case here for 447 is (Quoted from > > > > Guozhang): > > > > > > > > > > > > >>> > > ''' > > > > > > > > > > > > >>> > > I think if we can do something else to > > > > > > > > > > > > >>> > > avoid this config though, for example we can > > use > > > > the > > > > > > > > embedded > > > > > > > > > > > > >>> AdminClient > > > > > > > > > > > > >>> > > to send the APIVersion request upon starting > > up, > > > > and > > > > > > > based > > > > > > > > on > > > > > > > > > > the > > > > > > > > > > > > >>> > returned > > > > > > > > > > > > >>> > > value decides whether to go to the old code > > path > > > or > > > > > the > > > > > > > new > > > > > > > > > > > > behavior. > > > > > > > > > > > > >>> > > ''' > > > > > > > > > > > > >>> > > The benefit we get is to avoid adding a new > > > > > > configuration > > > > > > > > to > > > > > > > > > > > make a > > > > > > > > > > > > >>> > > decision simply base on broker version. If > you > > > have > > > > > > > > concerns > > > > > > > > > > with > > > > > > > > > > > > >>> > exposing > > > > > > > > > > > > >>> > > ApiVersion for client, we could > > > > > > > > > > > > >>> > > try to think of alternative solutions too. > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > Boyang > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe > < > > > > > > > > > > cmcc...@apache.org > > > > > > > > > > > > > > > > > > > > > > > > >>> wrote: > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > kafka.api.ApiVersion is an internal class, > > not > > > > > > suitable > > > > > > > > to > > > > > > > > > > > > exposing > > > > > > > > > > > > >>> > > > through AdminClient. 
That class is not > even > > > > > > accessible > > > > > > > > > > without > > > > > > > > > > > > >>> having > > > > > > > > > > > > >>> > > the > > > > > > > > > > > > >>> > > > broker jars on your CLASSPATH. > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > Another question is, what is the groupId > > > > parameter > > > > > > > doing > > > > > > > > in > > > > > > > > > > the > > > > > > > > > > > > >>> call? > > > > > > > > > > > > >>> > > The > > > > > > > > > > > > >>> > > > API versions are the same no matter what > > > consumer > > > > > > group > > > > > > > > we > > > > > > > > > > use, > > > > > > > > > > > > >>> right? > > > > > > > > > > > > >>> > > > Perhaps this was a copy and paste error? > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > This is not the first time we have > discussed > > > > > having a > > > > > > > > > method > > > > > > > > > > in > > > > > > > > > > > > >>> > > > AdminClient to retrieve API version > > > information. > > > > > In > > > > > > > > fact, > > > > > > > > > > the > > > > > > > > > > > > >>> original > > > > > > > > > > > > >>> > > KIP > > > > > > > > > > > > >>> > > > which created KafkaAdminClient specified an > > API > > > > for > > > > > > > > > fetching > > > > > > > > > > > > >>> version > > > > > > > > > > > > >>> > > > information. It was called apiVersions and > > it > > > is > > > > > > still > > > > > > > > > there > > > > > > > > > > > on > > > > > > > > > > > > >>> the > > > > > > > > > > > > >>> > > wiki. 
> > > > > > > > > > > > >>> > > > See > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > However, this API wasn't ready in time for > > > 0.11.0 > > > > > so > > > > > > we > > > > > > > > > > shipped > > > > > > > > > > > > >>> without > > > > > > > > > > > > >>> > > > it. There was a JIRA to implement it for > > later > > > > > > > versions, > > > > > > > > > > > > >>> > > > > > > https://issues.apache.org/jira/browse/KAFKA-5214 > > > > , > > > > > > as > > > > > > > > well > > > > > > > > > > as > > > > > > > > > > > a > > > > > > > > > > > > >>> PR, > > > > > > > > > > > > >>> > > > https://github.com/apache/kafka/pull/3012 > . > > > > > > However, > > > > > > > we > > > > > > > > > > > started > > > > > > > > > > > > >>> to > > > > > > > > > > > > >>> > > > rethink whether this AdminClient function > was > > > > even > > > > > > > > > necessary. > > > > > > > > > > > > >>> Most of > > > > > > > > > > > > >>> > > the > > > > > > > > > > > > >>> > > > use-cases we could think of seemed like > > > horrible > > > > > > hacks. > > > > > > > > So > > > > > > > > > > it > > > > > > > > > > > > has > > > > > > > > > > > > >>> > never > > > > > > > > > > > > >>> > > > really been implemented (yet?). 
> > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > best, > > > > > > > > > > > > >>> > > > Colin > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen > > > > wrote: > > > > > > > > > > > > >>> > > > > Actually, after a second thought, I think > > it > > > > > > actually > > > > > > > > > makes > > > > > > > > > > > > >>> sense to > > > > > > > > > > > > >>> > > > > support auto upgrade through admin client > > to > > > > help > > > > > > use > > > > > > > > get > > > > > > > > > > api > > > > > > > > > > > > >>> version > > > > > > > > > > > > >>> > > > > from > > > > > > > > > > > > >>> > > > > broker. > > > > > > > > > > > > >>> > > > > A draft KIP is here: > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > Boyang > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang > > Chen < > > > > > > > > > > > > >>> > > reluctanthero...@gmail.com> > > > > > > > > > > > > >>> > > > > wrote: > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > Thank you Guozhang, some of my > > > understandings > > > > > are > > > > > > > > > inline > > > > > > > > > > > > below. 
> > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason > > > > > Gustafson > > > > > > < > > > > > > > > > > > > >>> > ja...@confluent.io > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > > > wrote: > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > >>> > > > > >> > I think co-locating does have some > > > merits > > > > > > here, > > > > > > > > i.e. > > > > > > > > > > > > >>> letting the > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the > > > > > > > source-of-truth > > > > > > > > of > > > > > > > > > > > > >>> assignment > > > > > > > > > > > > >>> > to > > > > > > > > > > > > >>> > > > act > > > > > > > > > > > > >>> > > > > >> as > > > > > > > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I > > agree > > > > > > there's > > > > > > > > also > > > > > > > > > > > some > > > > > > > > > > > > >>> cons > > > > > > > > > > > > >>> > of > > > > > > > > > > > > >>> > > > > >> coupling > > > > > > > > > > > > >>> > > > > >> > them together. I'm still a bit > > inclining > > > > > > towards > > > > > > > > > > > > colocation > > > > > > > > > > > > >>> but > > > > > > > > > > > > >>> > if > > > > > > > > > > > > >>> > > > there > > > > > > > > > > > > >>> > > > > >> > are good rationales not to do so I > can > > > be > > > > > > > > convinced > > > > > > > > > as > > > > > > > > > > > > well. > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > >>> > > > > >> The good rationale is that we have no > > > > > mechanism > > > > > > to > > > > > > > > > > > colocate > > > > > > > > > > > > >>> > > > partitions ;). 
> > > Are you suggesting we store the group and transaction state in the same
> > > log? Can you be more concrete about the benefit?
> > >
> > > -Jason
> > >
> > > On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > 1. One advantage of retry against on-hold is that it will not tie up a
> > > > handler thread (of course the latter could do the same but that
> > > > involves using a purgatory which is more complicated), and also it is
> > > > less likely to violate request timeout. So I think there are some
> > > > rationales to prefer retries.
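The trade-off in point 1 can be pictured from the client side. The following is a minimal sketch, assuming a hypothetical `Result` enum and `callWithRetry` helper (neither is part of the Kafka client), of a caller that backs off and retries when the broker answers with a retriable CONCURRENT_TRANSACTIONS, instead of the broker holding the request open:

```java
import java.util.function.Supplier;

/** Hypothetical sketch of client-side retry with a fixed back-off. */
final class RetryBackoffSketch {
    // Stand-in for a broker response code; this is not Kafka's own error enum.
    enum Result { NONE, CONCURRENT_TRANSACTIONS }

    /**
     * Re-issues the request until it succeeds or maxAttempts is exhausted,
     * sleeping backoffMs between attempts. Returns the number of attempts used.
     */
    static int callWithRetry(Supplier<Result> request, long backoffMs, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (request.get() == Result.NONE) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", e);
                }
            }
        }
        throw new IllegalStateException("still pending after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker: the pending transaction completes before the third call.
        int attempts = callWithRetry(
            () -> ++calls[0] < 3 ? Result.CONCURRENT_TRANSACTIONS : Result.NONE, 50L, 5);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

Each attempt here is a short request, so no broker handler thread is parked while the earlier transaction finishes; the cost is the back-off delay on the client.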
> >
> > That sounds fair to me; we also avoid using another purgatory instance.
> > Usually a single back-off only delays us by 50ms during startup, which
> > is a trivial cost. This behavior shouldn't be changed.
> >
> > > > 2. Regarding "ConsumerRebalanceListener": both
> > > > ConsumerRebalanceListener and PartitionAssignors are user-customizable
> > > > modules, and the only difference is that the former is specified via
> > > > code and the latter is specified via config.
> > > >
> > > > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> > > > though: with KIP-429 the onPartitionsAssigned may not be called if the
> > > > assignment does not change, whereas onAssignment would always be called
> > > > at the end of the sync-group response. My proposed semantics is that
> > > > `RebalanceListener#onPartitionsXXX` are used for notifications to the
> > > > user, and hence if there are no changes these will not be called,
> > > > whereas `PartitionAssignor` is used for assignor logic, whose callback
> > > > would always be called no matter if the partitions have changed or not.
> >
> > I think a third option is to expose the generation id gracefully as part
> > of the consumer API, so that we don't need to bother overloading various
> > callbacks. Of course, this builds upon the assumption that topic
> > partitions will not be included in the new initTransactions API.
> >
> > > > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
> > > > assignments, since it is sort of taking over the job of the
> > > > ConsumerCoordinator, and may likely cause a split-brain problem as two
> > > > coordinators keep a copy of this assignment which may be different.
> > > >
> > > > I think co-locating does have some merits here, i.e.
> > > > letting the ConsumerCoordinator, which has the source-of-truth of
> > > > assignment, act as the TxnCoordinator as well; but I agree there are
> > > > also some cons of coupling them together. I'm still leaning a bit
> > > > towards colocation, but if there are good rationales not to do so I can
> > > > be convinced as well.
> >
> > The purpose of co-location is to let the txn coordinator see the group
> > assignment. This priority is weakened when we already have a defense on
> > the consumer offset fetch, so I guess it's not super important anymore.
> >
> > > > 4.
I guess I prefer the philosophy of "only add configs if there are no
> > > > other ways", since more and more configs would make it less and less
> > > > intuitive to use out of the box.
> > > >
> > > > I think it's a valid point that checks at startup do not cope with
> > > > brokers downgrading; but even with a config, it is still hard for users
> > > > to determine when they can be sure the broker will never downgrade
> > > > again and hence can safely switch the config. So my feeling is that
> > > > this config would still not help too much.
> > > > If we want to be on the safer side, then I'd suggest we modify the
> > > > Coordinator -> NetworkClient hierarchy to allow the NetworkClient to
> > > > pass the APIVersion metadata to the Coordinator, so that the
> > > > Coordinator can rely on that logic to change its behavior dynamically.
> >
> > The stream thread init could not be supported by a client coordinator
> > behavior change on the fly; we only lose possibilities after we have
> > initialized (the main thread exits and no thread has the global picture
> > anymore). If we do want to support auto version detection, an admin
> > client request would be easier in this sense.
> >
> > > > 5.
I do not have a concrete idea about what the impact on Connect would
> > > > be, maybe Randall or Konstantine can help here?
> >
> > Sounds good, let's see their thoughts.
> >
> > > > Guozhang
> > > >
> > > > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > >
> > > > > Hey Jason,
> > > > >
> > > > > thank you for the proposal here. Some of my thoughts below.
> > > > >
> > > > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for picking this up!
> > > > > > Still reading through the updates, but here are a couple initial
> > > > > > comments on the APIs:
> > > > > >
> > > > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> > > > > > trying to encapsulate state from the current group assignment. Maybe
> > > > > > something like `ConsumerAssignment` would be clearer? If we make the
> > > > > > usage consistent across the consumer and producer, then we can avoid
> > > > > > exposing internal state like the generationId.
> > > > > >
> > > > > > For example:
> > > > > >
> > > > > > // Public API
> > > > > > interface ConsumerAssignment {
> > > > > >     Set<TopicPartition> partitions();
> > > > > > }
> > > > > >
> > > > > > // Not a public API
> > > > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > > > >     Set<TopicPartition> partitions;
> > > > > >     int generationId;
> > > > > > }
> > > > > >
> > > > > > Then we can change the rebalance listener to something like this:
> > > > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > > > >
> > > > > > And on the producer:
> > > > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > > > > >
> > > > > > 2.
Another bit of awkwardness is the fact that we have to pass the
> > > > > > groupId through both initTransactions() and
> > > > > > sendOffsetsToTransaction(). We could consider a config instead. Maybe
> > > > > > something like `transactional.group.id`? Then we could simplify the
> > > > > > producer APIs, potentially even deprecating the current
> > > > > > sendOffsetsToTransaction. In fact, for this new usage, the
> > > > > > `transactional.id` config is not needed. It would be nice if we don't
> > > > > > have to provide it.
> > > > >
> > > > > I like the idea of combining 1 and 2.
> > > > > We could definitely pass in a group.id config so that we could avoid
> > > > > exposing that information in a public API. The question I have is
> > > > > whether we should name the interface `GroupAssignment` instead, so
> > > > > that Connect could later extend the same interface, just to echo
> > > > > Guozhang's point here. Also, IMHO the base interface is better defined
> > > > > empty for easy extension, or we could define an abstract type called
> > > > > `Resource` to be shareable later.
> > > > >
> > > > > > By the way, I'm a bit confused about the discussion above about
> > > > > > colocating the txn and group coordinators.
That is not actually necessary, is it?
> > > > >
> > > > > Yes, this is not a requirement for this KIP, because it is inherently
> > > > > impossible to co-locate the topic partitions of the transaction log
> > > > > and the consumer offsets topic.
> > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > >
> > > > > > > Thank you Ismael for the suggestion. We will attempt to address it
> > > > > > > by giving more details in the rejected alternatives section.
> > > > > > >
> > > > > > > Thank you for the comment Guozhang!
> > > > > > > Answers are inline below.
> > > > > > >
> > > > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Boyang,
> > > > > > > >
> > > > > > > > Thanks for the KIP, I have some comments below:
> > > > > > > >
> > > > > > > > 1. "Once transactions are complete, the call will return." This
> > > > > > > > seems different from the existing behavior, in which we would
> > > > > > > > return a retriable CONCURRENT_TRANSACTIONS and let the client
> > > > > > > > retry, is this intentional?
> > > > > > >
> > > > > > > I don’t think it is intentional, and I will defer this question to
> > > > > > > Jason when he gets time to answer, since from what I understood
> > > > > > > retry and on-hold both seem valid approaches.
> > > > > > >
> > > > > > > > 2. "an overload to onPartitionsAssigned in the consumer's
> > > > > > > > rebalance listener interface": as part of KIP-341 we've already
> > > > > > > > added this information to the onAssignment callback. Would this
> > > > > > > > be sufficient?
> > > > > > > > Or more generally speaking, which information has to be passed
> > > > > > > > around in the rebalance callback while other information can be
> > > > > > > > passed around in the PartitionAssignor callback? In Streams for
> > > > > > > > example both callbacks are used but most critical information is
> > > > > > > > passed via onAssignment.
> > > > > > >
> > > > > > > We still need to extend ConsumerRebalanceListener because it’s the
> > > > > > > interface we could have public access to. The #onAssignment call is
> > > > > > > defined on the PartitionAssignor level, which is not easy to work
> > > > > > > with for external producers.
> > > > > > >
> > > > > > > > 3. "We propose to use a separate record type in order to store
> > > > > > > > the group assignment.": hmm, I thought with the third typed
> > > > > > > > FindCoordinator, the same broker that acts as the consumer
> > > > > > > > coordinator would always be selected as the txn coordinator, in
> > > > > > > > which case it can access its local cache metadata / offset topic
> > > > > > > > to get this information already?
> > > > > > > > We just need to think about how to make these two modules
> > > > > > > > directly exchange information without messing up the code
> > > > > > > > hierarchy.
> > > > > > >
> > > > > > > These two coordinators will be on the same broker only when the
> > > > > > > number of partitions for the transaction state topic and the
> > > > > > > consumer offset topic are the same. This normally holds true, but
> > > > > > > I'm afraid we couldn't make this assumption?
> > > > > > >
> > > > > > > > 4.
The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the
> > > > > > > > goal of this config is just to avoid an old-versioned broker not
> > > > > > > > being able to recognize a newer-versioned client. I think we
> > > > > > > > could do something else to avoid this config though: for example,
> > > > > > > > we can use the embedded AdminClient to send the APIVersion
> > > > > > > > request upon starting up, and based on the returned value decide
> > > > > > > > whether to go to the old code path or the new behavior.
> > > > > > > > > > > > >>> > > > > >> > > > > > Admittedly asking a random > > > broker > > > > > > about > > > > > > > > > > > APIVersion > > > > > > > > > > > > >>> does > > > > > > > > > > > > >>> > > not > > > > > > > > > > > > >>> > > > > >> > guarantee > > > > > > > > > > > > >>> > > > > >> > > > the > > > > > > > > > > > > >>> > > > > >> > > > > > whole cluster's versions, > but > > > what > > > > > we > > > > > > > can > > > > > > > > do > > > > > > > > > > is > > > > > > > > > > > to > > > > > > > > > > > > >>> first > > > > > > > > > > > > >>> > > 1) > > > > > > > > > > > > >>> > > > find > > > > > > > > > > > > >>> > > > > >> > the > > > > > > > > > > > > >>> > > > > >> > > > > > coordinator (and if the > random > > > > > broker > > > > > > > does > > > > > > > > > not > > > > > > > > > > > > even > > > > > > > > > > > > >>> > > > recognize > > > > > > > > > > > > >>> > > > > >> the > > > > > > > > > > > > >>> > > > > >> > new > > > > > > > > > > > > >>> > > > > >> > > > > > discover type, fall back to > > old > > > > path > > > > > > > > > > directly), > > > > > > > > > > > > and > > > > > > > > > > > > >>> then > > > > > > > > > > > > >>> > > 2) > > > > > > > > > > > > >>> > > > ask > > > > > > > > > > > > >>> > > > > >> the > > > > > > > > > > > > >>> > > > > >> > > > > > discovered coordinator about > > its > > > > > > > supported > > > > > > > > > > > > >>> APIVersion. 
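
The two-step check described in point 4 can be sketched roughly as follows. This is only an illustration under stated assumptions, not the KIP's design: `TxnPathSelector`, `Broker`, `stub`, and the required version number used in `main` are all hypothetical names and values, and none of them are Kafka client APIs.

```java
import java.util.Optional;

/** Hypothetical sketch of the probe-then-fall-back idea; not a Kafka API. */
final class TxnPathSelector {

    /** Which code path the client should take. */
    enum Path { OLD, NEW }

    /** Hypothetical view of a broker, reduced to the two questions we ask it. */
    interface Broker {
        /** Empty if this broker does not recognize the new discovery request. */
        Optional<Broker> findCoordinator(String groupId);

        /** Highest version of the transactional offset commit API it supports. */
        int maxTxnOffsetCommitVersion();
    }

    /**
     * Step 1: ask any broker to find the coordinator; fall back to the old
     * path if the request is not understood. Step 2: ask the discovered
     * coordinator whether it supports the required API version.
     */
    static Path choose(Broker anyBroker, String groupId, int requiredVersion) {
        Optional<Broker> coordinator = anyBroker.findCoordinator(groupId);
        if (!coordinator.isPresent()) {
            return Path.OLD;
        }
        boolean supported =
            coordinator.get().maxTxnOffsetCommitVersion() >= requiredVersion;
        return supported ? Path.NEW : Path.OLD;
    }

    /** Test stub: a broker that acts as its own coordinator when it understands discovery. */
    static Broker stub(final int maxVersion, final boolean understandsDiscovery) {
        return new Broker() {
            @Override
            public Optional<Broker> findCoordinator(String groupId) {
                if (understandsDiscovery) {
                    return Optional.<Broker>of(this);
                }
                return Optional.empty();
            }

            @Override
            public int maxTxnOffsetCommitVersion() {
                return maxVersion;
            }
        };
    }

    public static void main(String[] args) {
        // The required version 3 is an assumed value for illustration only.
        System.out.println(choose(stub(3, true), "my-group", 3));
        System.out.println(choose(stub(2, true), "my-group", 3));
        System.out.println(choose(stub(3, false), "my-group", 3));
    }
}
```

Note that this sketch only probes one coordinator, so it does not by itself cover the case where the group coordinator and the transaction coordinator run different versions.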
> The caveat here is that we have to make sure both the group coordinator
> and transaction coordinator are on the latest version during init stage.
> This is potentially doable as we only need a consumer group.id to check
> that. In the meantime, a hard-coded config is still a favorable backup in
> case the server has downgraded, so you will want to use a new version
> client without `consumer group` transactional support.
>
> > 5. This is a meta question: have you considered how this can be applied
> > to Kafka Connect as well? For example, for source connectors, the
> > assignment is not by "partitions", but by some other sort of "resources"
> > based on the source systems. How would KIP-447 affect Kafka Connectors
> > that implemented EOS as well?
>
> No, it's not currently included in the scope. Could you point me to a
> sample source connector that uses EOS? We could always piggy-back more
> information, such as tasks, into the TxnProducerIdentity struct. If this
> is something to support in the near term, an abstract type called
> "Resource" could be provided, and topic partition and connect task could
> implement it.
>
> > Guozhang
> >
> > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the KIP. It's good that we listed a number of rejected
> > > alternatives.
It would be > > > > helpful > > > > > to > > > > > > > > have > > > > > > > > > an > > > > > > > > > > > > >>> > explanation > > > > > > > > > > > > >>> > > > of > > > > > > > > > > > > >>> > > > > >> why > > > > > > > > > > > > >>> > > > > >> > > they > > > > > > > > > > > > >>> > > > > >> > > > > were > > > > > > > > > > > > >>> > > > > >> > > > > > > rejected. > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > Ismael > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at > 8:31 > > > PM > > > > > > Boyang > > > > > > > > > Chen > > > > > > > > > > < > > > > > > > > > > > > >>> > > > > >> bche...@outlook.com > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > >>> > > > > >> > > > > wrote: > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Hey all, > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > I would like to start a > > > > > discussion > > > > > > > forhttps://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > this is a work > originated > > by > > > > > Jason > > > > > > > > > > Gustafson > > > > > > > > > > > > >>> and we > > > > > > > > > > > > >>> > > > would > > > > > > > > > > > > >>> > > > > >> like > > > > > > > > > > > > >>> > > > > >> > to > > > > > > > > > > > > >>> > > > > >> > > > > > proceed > > > > > > > > > > > > >>> > > > > >> > > > > > > > into discussion stage. > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Let me know your > thoughts, > > > > > thanks! 
> > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Boyang > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > -- > > > > > > > > > > > > >>> > > > > >> > > > > > -- Guozhang > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > >>> > > > > >> > -- > > > > > > > > > > > > >>> > > > > >> > -- Guozhang > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > > >>> > > > > >> > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> > > > > > > > > > > > > >>> -- > > > > > > > > > > > > >>> -- Guozhang > > > > > > > > > > > > >>> > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > >