Yep, Guozhang, I think that would be best, as passing in an entire consumer instance is indeed cumbersome.
Just saw you updated KIP-429; I will follow up to change 447 as well.

Best,
Boyang

On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Okay, I think I understand your concerns about ConsumerGroupMetadata now: if we still want to call initTxns only once, then whatever parameter is passed in should reflect the latest value of the generation id whenever the offset fetch request is sent.
>
> The current ConsumerGroupMetadata, however, is a static object.
>
> Maybe we can consider having an extended class of ConsumerGroupMetadata whose values are updated from the consumer's rebalance callback?
>
> Guozhang
>
> On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected the latest change on ConsumerGroupMetadata? Also, regarding question one: the group metadata needs to be accessed via a callback, so does that mean we need a separate producer API such as "producer.refreshMetadata(groupMetadata)" to be able to access it, instead of passing in the consumer instance?
> >
> > Boyang
> >
> > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > Thanks Boyang,
> > >
> > > I've made another pass on KIP-447 as well as https://github.com/apache/kafka/pull/7078, and have some minor comments about the proposed API:
> > >
> > > 1. It seems that instead of needing the whole KafkaConsumer object, you'd only need the "ConsumerGroupMetadata"; in that case, can we just pass that object into the initTxns call?
> > >
> > > 2. The current trunk already has a public class named (ConsumerGroupMetadata) under o.a.k.clients.consumer, created by KIP-429. If we want to just use that, then maybe it makes less sense to declare a base GroupMetadata, as we are already leaking such information on the assignor anyway.
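The direction discussed above (hand the producer only the group metadata, once, and keep that object current from the consumer's rebalance callback) could be sketched roughly as follows. This is a self-contained illustration under assumptions, not the KIP-447 API: `GroupMetadataSnapshot`, `LiveGroupMetadata`, `onRebalance`, and `describeOffsetRequest` are hypothetical names, and no Kafka classes are used.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical names throughout; this is not the Kafka client API.
public class GroupMetadataSketch {

    /** Immutable snapshot of group state (fields taken from this discussion). */
    static final class GroupMetadataSnapshot {
        final String groupId;
        final int generationId;
        final String memberId;

        GroupMetadataSnapshot(String groupId, int generationId, String memberId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    /**
     * Hypothetical holder handed to the producer once (e.g. at initTxns time) and
     * refreshed from the consumer's rebalance callback, so that every request the
     * producer builds sees the latest generation.
     */
    static final class LiveGroupMetadata {
        private final AtomicReference<GroupMetadataSnapshot> current;

        LiveGroupMetadata(GroupMetadataSnapshot initial) {
            this.current = new AtomicReference<>(initial);
        }

        /** Meant to be called from a rebalance callback once a new generation is joined. */
        void onRebalance(int newGenerationId, String newMemberId) {
            current.updateAndGet(old -> new GroupMetadataSnapshot(old.groupId, newGenerationId, newMemberId));
        }

        GroupMetadataSnapshot snapshot() {
            return current.get();
        }
    }

    /** Stand-in for the producer side: reads the latest snapshot when building a request. */
    static String describeOffsetRequest(LiveGroupMetadata metadata) {
        GroupMetadataSnapshot s = metadata.snapshot();
        return "group=" + s.groupId + " generation=" + s.generationId + " member=" + s.memberId;
    }

    public static void main(String[] args) {
        LiveGroupMetadata metadata =
            new LiveGroupMetadata(new GroupMetadataSnapshot("my-app", 1, "member-1"));
        System.out.println(describeOffsetRequest(metadata)); // group=my-app generation=1 member=member-1
        metadata.onRebalance(2, "member-1");
        System.out.println(describeOffsetRequest(metadata)); // group=my-app generation=2 member=member-1
    }
}
```

Swapping whole immutable snapshots through an `AtomicReference` means the producer side never observes a half-updated generation/member pair; whether the real class should be mutable at all is the open question in this exchange.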
> > >
> > > Guozhang
> > >
> > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > Thank you Guozhang for the reply. We will consider the interface change from 429 as a backup plan for 447.
> > > >
> > > > And bumping this thread for more discussion.
> > > >
> > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > Thank you Guozhang for the suggestion! I would normally prefer naming a flag after its functionality. It seems to me that `isolation_level` adds another hop on the information track.
> > > > >
> > > > > Fair enough, let's use a separate flag name then :)
> > > > >
> > > > > > As for the generation.id exposure, I'm fine leveraging the new API from 429, but is that design finalized yet, and will the API be added to the generic Consumer<K, V> interface?
> > > > >
> > > > > The current PartitionAssignor is inside the `internals` package, and in KIP-429 we are going to create a new interface outside of `internals` to really make it a public API; as part of that we are refactoring some of its method signatures. I just feel some of the newly introduced classes can be reused in your KIP as well, i.e. just for code succinctness, with no semantic implications.
> > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > Boyang, thanks for the updated proposal!
> > > > > > >
> > > > > > > 3.a.
> > > > > > > As Jason mentioned, with EOS enabled we still need to augment the offset fetch request with a boolean to indicate "give me a retriable error code if there's a pending offset, rather than sending me the committed offset immediately". Personally I still feel it is okay to piggy-back on the ISOLATION_LEVEL boolean, but I'm also fine with another `await_transaction` boolean if you feel strongly about it.
> > > > > > >
> > > > > > > 10. About the exposure of the generation id, there may be some refactoring work coming from KIP-429 that can benefit KIP-447 as well, since we are wrapping the consumer subscription / assignment data in new classes. Note that the current proposal does not include `generationId`, since with the cooperative sticky assignor we think it is not necessary for correctness, but if we agree it is okay to expose it, we can potentially include it in `ConsumerAssignmentData` as well.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > Thank you Jason for the ideas.
> > > > > > > >
> > > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > Hi Boyang,
> > > > > > > > >
> > > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > > >
> > > > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to 10s.
> > > > > > > > > I think this makes sense for Kafka Streams, which is tied to the consumer group semantics and uses a default 10s session timeout. However, it seems a bit dangerous to make this change for the producer generally. Could we just change it for Streams?
> > > > > > > >
> > > > > > > > That sounds good to me.
> > > > > > > >
> > > > > > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I think the idea is to basically put in a backdoor to give the producer access to the group generationId. It's not clear to me how this would work given package restrictions. I wonder if it would be better to just expose the state we need from the consumer. I know we have been reluctant to do this so far because we treat the generationId as an implementation detail. However, I think we might just bite the bullet and expose it rather than coming up with a messy hack. Concepts such as memberIds have already been exposed in the AdminClient, so maybe it is not too bad. Alternatively, we could use an opaque type.
> > > > > > > > > For example:
> > > > > > > > >
> > > > > > > > > // public
> > > > > > > > > interface GroupMetadata {}
> > > > > > > > >
> > > > > > > > > // private
> > > > > > > > > class ConsumerGroupMetadata implements GroupMetadata {
> > > > > > > > >     final int generationId;
> > > > > > > > >     final String memberId;
> > > > > > > > >
> > > > > > > > >     ConsumerGroupMetadata(int generationId, String memberId) {
> > > > > > > > >         this.generationId = generationId;
> > > > > > > > >         this.memberId = memberId;
> > > > > > > > >     }
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > // Consumer API
> > > > > > > > > public GroupMetadata groupMetadata();
> > > > > > > > >
> > > > > > > > > I am probably leaning toward just exposing the state we need.
> > > > > > > >
> > > > > > > > Yes. Also worth mentioning: Kafka Streams uses the generic Consumer API, which doesn't have rich state like a full `KafkaConsumer`, so the hack will not work as expected.
> > > > > > > >
> > > > > > > > Instead, just exposing the consumer generation.id seems much easier. We could consolidate the API and make it
> > > > > > > >
> > > > > > > > > 3. Given that we are already providing a way to propagate group state from the consumer to the producer, I wonder if we may as well include the memberId and groupInstanceId. This would make the validation we do for TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at least this may help with debugging.
> > > > > > > >
> > > > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > > > > >
> > > > > > > > > 4. I like the addition of isolation_level to the offset fetch. At the same time, its behavior is a bit inconsistent with how it is used in the consumer generally.
> > > > > > > > > There is no reason for the group coordinator to ever expose aborted data, so this is mostly about awaiting pending offset commits, not reading uncommitted data. Perhaps instead of calling this "isolation level," it should be more like "await_pending_transaction" or something like that?
> > > > > > > > >
> > > > > > > > > Also, just to be clear, the consumer would treat this as an optional field, right? So if the broker does not support the latest OffsetFetch API, it would silently revert to reading the old data. Basically it would be up to the Streams version probing logic to ensure that the expectation on this API fits with the usage of `transactional.id`.
> > > > > > > >
> > > > > > > > Sounds like better naming to me, though I think it could be shortened to `await_transaction`. I think the field should be optional, too.
> > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > Hey Guozhang,
> > > > > > > > > >
> > > > > > > > > > I will correct my statement from my last email.
> > > > > > > > > > I don't think read_committed (3.a) needs to be added to the OffsetFetch request: in an EOS application, the underlying consumers within the group should always back off when there are pending offsets.
> > > > > > > > > >
> > > > > > > > > > Let me know if you think this is correct.
> > > > > > > > > >
> > > > > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > > Thank you Guozhang for the questions; inline answers are below.
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > > > Hey all,
> > > > > > > > > > > >
> > > > > > > > > > > > I have done a fundamental polish of KIP-447 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics> and written a design doc <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#> depicting internal changes. We stripped many implementation details from the KIP and simplified the public changes a lot.
> > > > > > > > > > > > For reviewers, it is highly recommended to fully understand the EOS design in KIP-98 and read its corresponding design doc <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit> if you haven't done so already.
> > > > > > > > > > > >
> > > > > > > > > > > > Let me know if you find anything confusing in the KIP or the design. I would be happy to discuss in depth.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Boyang
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > 2. The reason we did not expose generation.id from the KafkaConsumer public APIs directly is to abstract this notion away from users (since it is an implementation detail of the rebalance protocol itself; e.g. if a user calls consumer.assign(), they do not need to invoke the ConsumerCoordinator and need not be aware of generation.id at all).
> > > > > > > > > > > > >
> > > > > > > > > > > > > On the other hand, with the current proposal the txn.coordinator does not know about the latest generation from the source-of-truth group.coordinator; instead, it will only bump up the generation from the producer's InitProducerIdRequest.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The key here is that GroupCoordinator, when handling `InitProducerIdRequest
> > > > > > > > > > > >
> > > > > > > > > > > In the new design, we just pass the entire consumer instance into the producer through #initTransaction, so no public API will be created.
> > > > > > > > > > >
> > > > > > > > > > > > > 3. I agree that if we rely on the group coordinator to block on returning the offset-fetch response when read-committed is enabled, then we do not need to store the partition assignment on the txn coordinator, and therefore it's better to still decouple them. For that case we still need to update the KIP wiki page to include:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> > > > > > > > > > > > > 3.b. Add a new error code in OffsetFetchResponse to let the client back off and retry if there are pending txns including the partitions of interest.
> > > > > > > > > > > > > 3.c.
> > > > > > > > > > > > > Also, in the worst case we would let the client be blocked for the txn.timeout period, and for that reason we may need to consider reducing our default txn.timeout value as well.
> > > > > > > > > > > > >
> > > > > > > > > > > Addressed 3.b and 3.c, will do 3.a.
> > > > > > > > > > >
> > > > > > > > > > > > > 4. According to Colin, it seems we do not need to create another KIP and can just complete it as part of KIP-117 / KAFKA-5214; we need to do some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin please let us know if you have any concerns about exposing it).
> > > > > > > > > > > > >
> > > > > > > > > > > I think we no longer need to rely on the API version for initialization, since we will be using the upgrade.from config anyway.
> > > > > > > > > > >
> > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > > > > For reference, we already have BrokerApiVersionCommand as a public interface. We have a bit of tech debt at the moment because it uses a custom AdminClient. It would be nice to clean that up. In general, I think it is reasonable to expose from AdminClient.
> > > > > > > > > > > > > > It can be used by management tools to inspect running Kafka versions, for example.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Jason
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > > > > > > Thank you for the context, Colin. The groupId was indeed a copy-paste error. Our use case here for 447 is (quoted from Guozhang):
> > > > > > > > > > > > > > > '''
> > > > > > > > > > > > > > > I think if we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior.
> > > > > > > > > > > > > > > '''
> > > > > > > > > > > > > > > The benefit we get is to avoid adding a new configuration just to make a decision based on the broker version. If you have concerns about exposing ApiVersion to clients, we could try to think of alternative solutions too.
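The startup-time probe in the quoted suggestion (check the broker's supported API versions once, then pick the old or new code path) might look roughly like the sketch below. It assumes the broker's advertised maximum version per API is already available as a map keyed by API name; how to obtain it is exactly what this exchange is debating. `OffsetFetchMode`, `chooseMode`, and `HYPOTHETICAL_MIN_AWAIT_VERSION` are hypothetical names, and the version number is a placeholder, not the real OffsetFetch version.

```java
import java.util.Map;

// Hypothetical sketch; not a Kafka API. Requires Java 9+ for Map.of.
public class VersionProbeSketch {

    /** Which offset-fetch behavior the client commits to at startup. */
    enum OffsetFetchMode { AWAIT_PENDING_TRANSACTIONS, LEGACY }

    // Placeholder: the first OffsetFetch version assumed to carry the new await flag.
    // The real number would be fixed by the KIP; 7 here is purely illustrative.
    static final short HYPOTHETICAL_MIN_AWAIT_VERSION = 7;

    /**
     * Decide once, at startup, which code path to take, given the maximum version of
     * each API advertised by the broker (keyed by API name in this sketch).
     * Missing information falls back to the old code path.
     */
    static OffsetFetchMode chooseMode(Map<String, Short> brokerMaxVersions) {
        Short offsetFetchMax = brokerMaxVersions.get("OffsetFetch");
        if (offsetFetchMax != null && offsetFetchMax >= HYPOTHETICAL_MIN_AWAIT_VERSION) {
            return OffsetFetchMode.AWAIT_PENDING_TRANSACTIONS;
        }
        return OffsetFetchMode.LEGACY;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(Map.of("OffsetFetch", (short) 7))); // AWAIT_PENDING_TRANSACTIONS
        System.out.println(chooseMode(Map.of("OffsetFetch", (short) 5))); // LEGACY
        System.out.println(chooseMode(Map.of()));                         // LEGACY
    }
}
```

Falling back to the old path when the version is unknown mirrors Jason's point that the new OffsetFetch field should be optional. As noted elsewhere in the thread, a one-time check at startup does not cope with brokers being downgraded afterwards.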
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Boyang
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > > > > kafka.api.ApiVersion is an internal class, not suitable for exposing through AdminClient. That class is not even accessible without having the broker jars on your CLASSPATH.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Another question is, what is the groupId parameter doing in the call? The API versions are the same no matter what consumer group we use, right? Perhaps this was a copy and paste error?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This is not the first time we have discussed having a method in AdminClient to retrieve API version information. In fact, the original KIP which created KafkaAdminClient specified an API for fetching version information. It was called apiVersions and it is still there on the wiki.
> > > > > > > > > > > > > > > > See https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > However, this API wasn't ready in time for 0.11.0, so we shipped without it. There was a JIRA to implement it for later versions, https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR, https://github.com/apache/kafka/pull/3012 . However, we started to rethink whether this AdminClient function was even necessary. Most of the use-cases we could think of seemed like horrible hacks. So it has never really been implemented (yet?).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > > > > > > > > > > > > > > Actually, on second thought, I think it does make sense to support auto upgrade through the admin client, to help us get the API version from the broker.
> > > > > > > > > > > > > > > > > A draft KIP is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Boyang
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > Thank you Guozhang, some of my understandings are inline below.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > I think co-locating does have some merits here, i.e.
> > > > > > > > > > > > > > > > > > > > letting the ConsumerCoordinator, which has the source of truth of the assignment, act as the TxnCoordinator as well; but I agree there are also some cons of coupling them together. I'm still a bit inclined towards colocation, but if there are good rationales not to do so I can be convinced as well.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The good rationale is that we have no mechanism to colocate partitions ;). Are you suggesting we store the group and transaction state in the same log? Can you be more concrete about the benefit?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > -Jason
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > Hi Boyang,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 1.
> > > > > > > > > > > > > > > > > > > > One advantage of retry over on-hold is that it will not tie up a handler thread (of course the latter could do the same, but that involves using a purgatory, which is more complicated), and it is also less likely to violate the request timeout. So I think there are some rationales to prefer retries.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > That sounds fair to me; also, we avoid using another purgatory instance. Usually one back-off only delays us 50ms during startup, which is a trivial cost. This behavior shouldn't be changed.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 2.
> > > > > > > > > > > > > > > > > > > > Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener and PartitionAssignors are user-customizable modules, and the only difference is that the former is specified via code and the latter via config.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Regarding Jason's proposal of ConsumerAssignment, one thing to note though: with KIP-429, onPartitionsAssigned may not be called if the assignment does not change, whereas onAssignment would always be called at the end of the sync-group response.
> > > > > > > > > > > > > > > > > > > > My proposed semantics is that `RebalanceListener#onPartitionsXXX` are used for notifications to the user, and hence will not be called if there are no changes, whereas `PartitionAssignor` is used for assignor logic, whose callback would always be called whether or not the partitions have changed.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think a third option is to gracefully expose the generation id as part of the consumer API, so that we don't need to bother overloading various callbacks. Of course, this builds upon the assumption that topic partitions will not be included in the new initTransaction API.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 3.
> > > > > > > > > > > > > > > > > > > > I feel it is a bit awkward to let the TxnCoordinator keep partition assignments, since it is sort of taking over the job of the ConsumerCoordinator, and it may well cause a split-brain problem, as the two coordinators would each keep a copy of this assignment, and the copies may differ.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source of truth of the assignment, act as the TxnCoordinator as well; but I agree there are also some cons of coupling them together. I'm still a bit inclined towards colocation, but if there are good rationales not to do so I can be convinced as well.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The purpose of co-location is to let the txn coordinator see the group assignment.
> This priority is weakened when we already have a defense on the consumer offset fetch, so I guess it's not super important anymore.

> > 4. I guess I prefer the philosophy of "only add configs if there's no other way", since more and more configs make it less and less intuitive to use out of the box.
> >
> > I think it's a valid point that checks upon starting up do not cope with brokers downgrading, but even with a config it is still hard for users to determine when they can be sure the broker will never downgrade again and hence can safely switch the config. So my feeling is that this config would still not help much.
> > If we want to be on the safer side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy so that the NetworkClient can pass the APIVersion metadata to the Coordinator, and the Coordinator can rely on it to change its behavior dynamically.

> A stream thread's initialization cannot be handled by changing the client coordinator's behavior on the fly; after initialization we only lose options (the main thread exits and no thread has the global picture anymore). If we do want to support automatic version detection, an admin client request would be easier in this respect.

> > 5. I do not have a concrete idea of what impact this would have on Connect; maybe Randall or Konstantine can help here?

> Sounds good, let's see their thoughts.

> > Guozhang
> >
> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

> > > Hey Jason,
> > >
> > > thank you for the proposal here. Some of my thoughts below.
> > >
> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:

> > > > Hi Boyang,
> > > >
> > > > Thanks for picking this up! Still reading through the updates, but here are a couple of initial comments on the APIs:
> > > >
> > > > 1. The `TxnProducerIdentity` class is a bit awkward.
> > > > I think we are trying to encapsulate state from the current group assignment. Maybe something like `ConsumerAssignment` would be clearer? If we make the usage consistent across the consumer and producer, then we can avoid exposing internal state like the generationId.
> > > >
> > > > For example:
> > > >
> > > > // Public API
> > > > interface ConsumerAssignment {
> > > >   Set<TopicPartition> partitions();
> > > > }
> > > >
> > > > // Not a public API
> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > >   Set<TopicPartition> partitions;
> > > >   int generationId;
> > > >
> > > >   public Set<TopicPartition> partitions() { return partitions; }
> > > > }
> > > >
> > > > Then we can change the rebalance listener to something like this:
> > > >
> > > > void onPartitionsAssigned(ConsumerAssignment assignment);
> > > >
> > > > And on the producer:
> > > >
> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > > >
> > > > 2. Another bit of awkwardness is the fact that we have to pass the groupId through both initTransactions() and sendOffsetsToTransaction(). We could consider a config instead. Maybe something like `transactional.group.id`? Then we could simplify the producer APIs, potentially even deprecating the current sendOffsetsToTransaction. In fact, for this new usage, the `transactional.id` config is not needed. It would be nice if we didn't have to provide it.
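
Jason's points 1 and 2 can be combined in a small compilable Java sketch. The public `ConsumerAssignment` type exposes only partitions, an internal subtype carries the generation id, and the producer takes its group id from a config instead of a method argument. Everything here is hypothetical: `ConsumerAssignment`, `InternalConsumerAssignment`, the one-argument `initTransactions(ConsumerAssignment)` (simplified from the thread's two-argument form once the group id moves into config), and the `transactional.group.id` key are names floated in this thread, not a released Kafka API. `TopicPartition` is a minimal stand-in for `org.apache.kafka.common.TopicPartition`, so the sketch compiles without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for org.apache.kafka.common.TopicPartition (not the real class).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicPartition
                && ((TopicPartition) o).partition == partition
                && ((TopicPartition) o).topic.equals(topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical public type from point 1: users only see the owned partitions.
interface ConsumerAssignment {
    Set<TopicPartition> partitions();
}

// Hypothetical internal type: also carries the generation id, which no public signature exposes.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<TopicPartition> partitions;
    private final int generationId;

    InternalConsumerAssignment(Set<TopicPartition> partitions, int generationId) {
        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        this.generationId = generationId;
    }

    @Override
    public Set<TopicPartition> partitions() {
        return partitions;
    }

    int generationId() {
        return generationId;
    }
}

// Hypothetical producer: per point 2, the group id comes from a config, not a method argument.
final class SketchProducer {
    private final String groupId;
    private int generationId = -1; // unknown until initTransactions is called

    SketchProducer(Map<String, String> configs) {
        this.groupId = configs.get("transactional.group.id");
    }

    void initTransactions(ConsumerAssignment assignment) {
        if (!(assignment instanceof InternalConsumerAssignment)) {
            throw new IllegalArgumentException("assignment must be created by the consumer");
        }
        // Client-internal code may look inside; application code never touches the generation id.
        generationId = ((InternalConsumerAssignment) assignment).generationId();
    }

    String describe() {
        return groupId + "@generation-" + generationId;
    }
}

final class AssignmentSketch {
    static String demo() {
        Set<TopicPartition> owned = new HashSet<>();
        owned.add(new TopicPartition("input", 0));
        owned.add(new TopicPartition("input", 1));

        Map<String, String> configs = new HashMap<>();
        configs.put("transactional.group.id", "my-group");

        SketchProducer producer = new SketchProducer(configs);
        producer.initTransactions(new InternalConsumerAssignment(owned, 5));
        return producer.describe();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints my-group@generation-5
    }
}
```

The downcast inside `initTransactions` is the point of this design: only client-internal code knows about the generation id, so it never appears in a public signature.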

> > > I like the idea of combining 1 and 2. We could definitely pass in a group.id config so that we could avoid exposing that information in a public API. The question I have is whether we should name the interface `GroupAssignment` instead, so that Connect could later extend the same interface, just to echo Guozhang's point here. Also, IMHO the base interface should be defined empty for easy extension, or we could define an abstract type called `Resource` to be shared later.

> > > > By the way, I'm a bit confused about the discussion above about co-locating the txn and group coordinators. That is not actually necessary, is it?
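
Boyang's two alternatives in the reply above (an empty `GroupAssignment` base interface, or a shared abstract `Resource` type) can be combined in a sketch where consumer partitions and Connect tasks share one assignment type. All names here (`GroupAssignment`, `Resource`, `PartitionResource`, `ConnectTaskResource`, `ResourceAssignment`, and the `jdbc-source` connector) are hypothetical illustrations, not Kafka or Connect APIs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical empty marker type: any group protocol's assignment could be handed to the producer.
interface GroupAssignment {
}

// Hypothetical shared abstraction over "something a group member owns"; equality is by class and id.
abstract class Resource {
    abstract String id();

    @Override
    public final boolean equals(Object o) {
        return o != null && o.getClass() == getClass() && ((Resource) o).id().equals(id());
    }

    @Override
    public final int hashCode() {
        return id().hashCode();
    }
}

final class PartitionResource extends Resource {
    private final String topic;
    private final int partition;

    PartitionResource(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    String id() {
        return topic + "-" + partition;
    }
}

final class ConnectTaskResource extends Resource {
    private final String connector;
    private final int task;

    ConnectTaskResource(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }

    @Override
    String id() {
        return connector + "/task-" + task;
    }
}

// One assignment type for consumers (partitions) and Connect workers (tasks) alike.
final class ResourceAssignment<R extends Resource> implements GroupAssignment {
    private final Set<R> resources;

    ResourceAssignment(Collection<R> resources) {
        // LinkedHashSet keeps insertion order and drops duplicates via Resource.equals.
        this.resources = Collections.unmodifiableSet(new LinkedHashSet<>(resources));
    }

    Set<R> resources() {
        return resources;
    }
}

final class ResourceSketch {
    // Generic code only needs ids, not the concrete kind of resource.
    static String describe(ResourceAssignment<? extends Resource> assignment) {
        StringBuilder sb = new StringBuilder();
        for (Resource r : assignment.resources()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(r.id());
        }
        return sb.toString();
    }

    static String demo() {
        ResourceAssignment<PartitionResource> consumer = new ResourceAssignment<>(
                Arrays.asList(new PartitionResource("input", 0), new PartitionResource("input", 1)));
        ResourceAssignment<ConnectTaskResource> connect = new ResourceAssignment<>(
                Arrays.asList(new ConnectTaskResource("jdbc-source", 0)));
        return describe(consumer) + "|" + describe(connect);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints input-0,input-1|jdbc-source/task-0
    }
}
```

Because `GroupAssignment` declares nothing, adding a new kind of group later does not force changes on existing implementations. The trade-off is that generic code can only work through `Resource.id()` or must downcast to the concrete type.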

> > > Yes, this is not a requirement for this KIP, because it is inherently impossible to achieve co-location of the transaction log and consumer offsets topic partitions.

> > > > Thanks,
> > > > Jason
> > > >
> > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com> wrote:

> > > > > Thank you Ismael for the suggestion. We will attempt to address it by giving more details in the rejected alternatives section.
> > > > >
> > > > > Thank you for the comment Guozhang! Answers are inline below.
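
Boyang's answer to Jason above, that co-location cannot be guaranteed, follows from how each coordinator is chosen. This sketch assumes Kafka's scheme: a group id maps to partition `abs(hashCode) % offsets.topic.num.partitions` of `__consumer_offsets`, a transactional id maps to partition `abs(hashCode) % transaction.state.log.num.partitions` of `__transaction_state`, and each coordinator is the leader of its partition. Both settings default to 50, so with default configs a key gets the same partition index in both topics. Equal indexes are only a precondition, though, because the leaders of the two topics' partitions are placed independently. The class and method names are illustrative only.

```java
// Assumption: when both lookups are keyed by the same id, each coordinator is the leader of
// partition abs(key.hashCode()) % partitionCount of its own internal topic.
final class CoordinatorMappingSketch {
    // Same trick as Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String key, int partitionCount) {
        return abs(key.hashCode()) % partitionCount;
    }

    // True when the key lands on the same partition index in both topics. This is only a
    // necessary condition: the two partitions' leaders must also be the same broker.
    static boolean sameIndex(String key, int offsetsPartitions, int txnStatePartitions) {
        return partitionFor(key, offsetsPartitions) == partitionFor(key, txnStatePartitions);
    }

    public static void main(String[] args) {
        // "a".hashCode() == 97
        System.out.println(partitionFor("a", 50) + " " + partitionFor("a", 64)); // prints 47 33
        System.out.println(sameIndex("a", 50, 50)); // prints true
        System.out.println(sameIndex("a", 50, 64)); // prints false
    }
}
```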
> > > > >
> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> > > > > > Hello Boyang,
> > > > > >
> > > > > > Thanks for the KIP, I have some comments below:
> > > > > >
> > > > > > 1. "Once transactions are complete, the call will return." This seems different from the existing behavior, in which we would return a retriable CONCURRENT_TRANSACTIONS error and let the client retry. Is this intentional?
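
Point 1 contrasts two ways of handling a request that hits a pending transaction. The broker can answer at once with the retriable CONCURRENT_TRANSACTIONS error and let the client retry, or it can hold the request until the transactions complete. Below is a small simulation of both. It assumes a hypothetical `TxnCheck` enum and a broker modeled as a `Supplier`; none of these names are Kafka APIs, and the doubling backoff capped at one second is an arbitrary choice for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical outcomes for a request that touches a partition with a pending transaction
// (named after the error in point 1; this is not Kafka's Errors enum).
enum TxnCheck { OK, CONCURRENT_TRANSACTIONS }

final class PendingTxnSketch {
    private static final long MAX_BACKOFF_MS = 1_000L;

    // Existing behavior: the broker answers at once with a retriable error and the client
    // retries with exponential backoff. Returns the number of attempts that were needed.
    static int retryUntilClear(Supplier<TxnCheck> broker, int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (broker.get() == TxnCheck.OK) {
                return attempt;
            }
            if (attempt == maxAttempts) {
                break; // no point sleeping before giving up
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
            backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
        }
        throw new IllegalStateException("transaction still pending after " + maxAttempts + " attempts");
    }

    // Behavior described in the KIP text quoted in point 1: the broker holds the request and
    // answers only once the pending transactions complete, so the client sends it once and waits.
    static TxnCheck holdUntilClear(CompletableFuture<Void> pendingTransactionsDone) {
        pendingTransactionsDone.join();
        return TxnCheck.OK;
    }

    // Simulated broker that reports a pending transaction for the first `pendingResponses` calls.
    static Supplier<TxnCheck> brokerPendingFor(int pendingResponses) {
        int[] remaining = {pendingResponses};
        return () -> remaining[0]-- > 0 ? TxnCheck.CONCURRENT_TRANSACTIONS : TxnCheck.OK;
    }

    public static void main(String[] args) {
        System.out.println(retryUntilClear(brokerPendingFor(3), 10, 10L)); // prints 4
        System.out.println(holdUntilClear(CompletableFuture.completedFuture(null))); // prints OK
    }
}
```

The retry variant keeps broker request handling simple at the cost of extra round trips. Holding the request saves those round trips, but the broker would have to park requests and complete them later.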

> > > > > I don't think it is intentional, and I will defer this question to Jason for when he has time to answer, since from what I understand, retrying and holding the request both seem to be valid approaches.

> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance listener interface": as part of KIP-341 we've already added this information to the onAssignment callback. Would this be sufficient? Or, more generally speaking, which information has to be passed around in the rebalance callback, and which can be passed around in the PartitionAssignor callback?
> > > > > > In Streams, for example, both callbacks are used, but the most critical information is passed via onAssignment.

> > > > > We still need to extend ConsumerRebalanceListener because it's the interface we have public access to. The #onAssignment call is defined at the PartitionAssignor level, which is not easy to work with for external producers.

> > > > > > 3. "We propose to use a separate record type in order to store the group assignment.": hmm, I thought with the third FindCoordinator type, the same broker that acts as the consumer coordinator would always be selected as the txn coordinator, in which case it could access its local metadata cache / offset topic to get this information already? We just need to think about how to make these two modules exchange information directly without messing up the code hierarchy.

> > > > > These two coordinators will be on the same broker only when the numbers of partitions for the transaction state topic and the consumer offsets topic are the same. This normally holds true, but I'm afraid we can't rely on this assumption.

> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to avoid old-versioned brokers being unable to recognize newer-versioned clients.
> > > > > > I think we can do something else to avoid this config, though: for example, we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decide whether to take the old code path or the new behavior.
> > > > > > Admittedly, asking a random broker about APIVersion does not guarantee the whole cluster's versions, but what we can do is 1) first find the coordinator (and if the random broker does not even recognize the new discovery type, fall back to the old path directly), and then 2) ask the discovered coordinator about its supported APIVersion.

> > > > > The caveat here is that we have to make sure both the group coordinator and the transaction coordinator are on the latest version during the init stage.
> > > > > This is potentially doable, as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a useful backup in case the server has been downgraded, so that you can run a new-version client without `consumer group` transactional support.

> > > > > > 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well?
> > > > > > For example, for source connectors, the assignment is not by "partitions" but by some other sort of "resources" based on the source systems. How would KIP-447 affect Kafka connectors that implement EOS as well?

> > > > > No, it's not currently included in the scope. Could you point me to a sample source connector that uses EOS? We could always piggy-back on the TxnProducerIdentity struct with more information, such as tasks.
> > > > > If this is something to support in the near term, an abstract type called "Resource" could be provided, and topic partitions and Connect tasks could implement it.

> > > > > > Guozhang
> > > > > >
> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:

> > > > > > > Hi Boyang,
> > > > > > >
> > > > > > > Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:

> > > > > > > > Hey all,
> > > > > > > >
> > > > > > > > I would like to start a discussion for KIP-447:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > >
> > > > > > > > This work was originated by Jason Gustafson, and we would like to proceed to the discussion stage.
> > > > > > > >
> > > > > > > > Let me know your thoughts, thanks!
> > > > > > > >
> > > > > > > > Boyang

> > > > > > --
> > > > > > -- Guozhang