Hi Boyang, thanks for the updates. I have a few more comments:

1. We are adding some new fields to TxnOffsetCommit to support group-based
fencing. Do we need these fields to be persisted in the offsets topic to
ensure that the fencing still works after a coordinator failover?

2. Since you are proposing a new `groupMetadata` API, have you considered
whether we still need the `initTransactions` overload? Another way would be
to pass it through the `sendOffsetsToTransaction` API:

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              GroupMetadata groupMetadata)
    throws ProducerFencedException, IllegalGenerationException;

This seems a little more consistent with the current API and avoids the
direct dependence on the Consumer in the producer.
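
To make the shape of this concrete, here is a rough sketch of the flow I have
in mind. Everything in it is a stand-in for illustration: `GroupMetadataSketch`,
`CoordinatorSketch` and `ProducerSketch` are hypothetical names, partitions are
plain strings, and `IllegalStateException` stands in for
`IllegalGenerationException`. It is not the real client or broker code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed GroupMetadata (not the real client class).
final class GroupMetadataSketch {
    final String groupId;
    final int generationId;
    final String memberId;

    GroupMetadataSketch(String groupId, int generationId, String memberId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
    }
}

// Hypothetical stand-in for the group coordinator handling TxnOffsetCommit.
final class CoordinatorSketch {
    private final Map<String, Integer> currentGeneration = new HashMap<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    // Called when a rebalance completes and the group moves to a new generation.
    void completeRebalance(String groupId, int generationId) {
        currentGeneration.put(groupId, generationId);
    }

    // Rejects offsets sent under a generation other than the group's current one.
    void txnOffsetCommit(GroupMetadataSketch metadata, Map<String, Long> offsets) {
        Integer current = currentGeneration.get(metadata.groupId);
        if (current == null || current != metadata.generationId) {
            throw new IllegalStateException("illegal generation " + metadata.generationId);
        }
        pendingOffsets.putAll(offsets);
    }

    Long pendingOffset(String topicPartition) {
        return pendingOffsets.get(topicPartition);
    }
}

// Hypothetical producer: it only sees the metadata snapshot, never the Consumer.
final class ProducerSketch {
    private final CoordinatorSketch coordinator;

    ProducerSketch(CoordinatorSketch coordinator) {
        this.coordinator = coordinator;
    }

    void sendOffsetsToTransaction(Map<String, Long> offsets, GroupMetadataSketch metadata) {
        coordinator.txnOffsetCommit(metadata, offsets);
    }
}
```

The point of the sketch is that the caller hands over a fresh snapshot on every
call, so a producer holding metadata from an older generation is rejected
without the producer ever holding a reference to the consumer.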

3. Can you clarify the behavior of the clients when the brokers do not
support the latest API versions? This is both for the new TxnOffsetCommit
and the OffsetFetch APIs. I guess the high level idea in streams is to
detect broker support before instantiating the producer and consumer. I
think that's reasonable, but we might need some approach for non-streams
use cases. One option I was considering is enforcing the latest version
through the new `sendOffsetsToTransaction` API. Basically when you use the
new API, we require support for the latest TxnOffsetCommit version. This
puts some burden on users, but it avoids breaking correctness assumptions
when the new APIs are in use. What do you think?
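
As a rough illustration of the enforcement option, the check could look
something like the sketch below. The class name, the method name and the
version constant are all placeholders I made up for this email, and
`UnsupportedOperationException` stands in for whatever exception we would
actually surface.

```java
// Hypothetical version gate; the names and the version constant are placeholders.
final class TxnOffsetCommitVersionGate {
    // Placeholder for the first TxnOffsetCommit version that carries the group fields.
    static final short MIN_VERSION_WITH_GROUP_FIELDS = 3;

    // Throws if the broker's highest supported version lacks the new fields.
    static void requireGroupFencingSupport(short brokerMaxVersion) {
        if (brokerMaxVersion < MIN_VERSION_WITH_GROUP_FIELDS) {
            throw new UnsupportedOperationException(
                "broker supports TxnOffsetCommit up to v" + brokerMaxVersion
                    + ", but group-based fencing needs v" + MIN_VERSION_WITH_GROUP_FIELDS);
        }
    }
}
```

The idea is that the new `sendOffsetsToTransaction` overload fails fast rather
than silently downgrading to a request version that cannot carry the fencing
fields.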


-Jason






On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Yep, Guozhang I think that would be best as passing in an entire consumer
> instance is indeed cumbersome.
>
> Just saw you updated KIP-429, I will follow-up to change 447 as well.
>
> Best,
> Boyang
>
> On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Okay, I think I understand your concerns about ConsumerGroupMetadata now:
> > if we still want to call initTxns only once, then we should allow whatever
> > parameter is passed in to reflect the latest value of generation id
> > whenever sending the offset fetch request.
> >
> > Whereas the current ConsumerGroupMetadata is a static object.
> >
> > Maybe we can consider having an extended class of ConsumerGroupMetadata
> > whose values are updated from the consumer's rebalance callback?
> >
> >
> > Guozhang
> >
> >
> > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> > reflected
> > > the latest change on ConsumerGroupMetadata? Also regarding question
> one,
> > > the group metadata needs to be accessed via callback, does that mean we
> > > need a separate producer API such as
> > > "producer.refreshMetadata(groupMetadata)" to be able to access it
> instead
> > > of passing in the consumer instance?
> > >
> > > Boyang
> > >
> > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > > > Thanks Boyang,
> > > >
> > > > I've made another pass on KIP-447 as well as
> > > > https://github.com/apache/kafka/pull/7078, and have some minor
> > comments
> > > > about the proposed API:
> > > >
> > > > 1. it seems instead of needing the whole KafkaConsumer object, you'd
> > only
> > > > need the "ConsumerGroupMetadata", in that case can we just pass in
> that
> > > > object into the initTxns call?
> > > >
> > > > 2. the current trunk already has a public class named
> > > > (ConsumerGroupMetadata)
> > > > under o.a.k.clients.consumer created by KIP-429. If we want to just
> use
> > > > that then maybe it makes less sense to declare a base GroupMetadata
> as
> > we
> > > > are already leaking such information on the assignor anyways.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the reply. We will consider the interface
> > change
> > > > > from 429 as a backup plan for 447.
> > > > >
> > > > > And bumping this thread for more discussion.
> > > > >
> > > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thank you Guozhang for the suggestion! I would normally prefer
> > > > naming a
> > > > > > > flag corresponding to its functionality. Seems to me
> > > > `isolation_level`
> > > > > > > makes us another hop on information track.
> > > > > > >
> > > > > > > Fair enough, let's use a separate flag name then :)
> > > > > >
> > > > > >
> > > > > > > As for the generation.id exposure, I'm fine leveraging the new
> > API
> > > > > from
> > > > > > > 429, but is that design finalized yet, and whether the
> > API
> > > > will
> > > > > > be
> > > > > > > added on the generic Consumer<K, V> interface?
> > > > > > >
> > > > > > > The current PartitionAssignor is inside `internals` package and
> > in
> > > > > > KIP-429
> > > > > > we are going to create a new interface out of `internals` to
> really
> > > > make
> > > > > it
> > > > > > public APIs, and as part of that we are refactoring some of its
> > > method
> > > > > > signatures. I just feel some of the newly introduced classes can
> be
> > > > > reused
> > > > > > in your KIP as well, i.e. just for code succinctness, but no
> > > semantical
> > > > > > indications.
> > > > > >
> > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Boyang, thanks for the updated proposal!
> > > > > > > >
> > > > > > > > 3.a. As Jason mentioned, with EOS enabled we still need to
> > > augment
> > > > > the
> > > > > > > > offset fetch request with a boolean to indicate "give me a
> > > > retriable
> > > > > > > error
> > > > > > > > code if there's pending offset, rather than sending me the
> > > > committed
> > > > > > > offset
> > > > > > > > immediately". Personally I still feel it is okay to
> piggy-back
> > on
> > > > the
> > > > > > > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > > > > > > `await_transaction`
> > > > > > > > boolean if you feel strongly about it.
> > > > > > > >
> > > > > > > > 10. About the exposure of generation id, there may be some
> > > > > refactoring
> > > > > > > work
> > > > > > > > coming from KIP-429 that can benefit KIP-447 as well since we
> > are
> > > > > > > wrapping
> > > > > > > > the consumer subscription / assignment data in new classes.
> > Note
> > > > that
> > > > > > > > current proposal does not expose `generationId` since with the
> > > cooperative
> > > > > > > sticky
> > > > > > > > assignor we think it is not necessary for correctness, but
> also
> > > if
> > > > we
> > > > > > > agree
> > > > > > > > it is okay to expose it we can potentially include it in
> > > > > > > > `ConsumerAssignmentData` as well.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> > > > > > reluctanthero...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thank you Jason for the ideas.
> > > > > > > > >
> > > > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <
> > > > > ja...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Boyang,
> > > > > > > > > >
> > > > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > > > >
> > > > > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should
> > be
> > > > > > reduced
> > > > > > > to
> > > > > > > > > > 10s.
> > > > > > > > > > I think this makes sense for Kafka Streams which is tied
> to
> > > the
> > > > > > > > consumer
> > > > > > > > > > group semantics and uses a default 10s session timeout.
> > > > However,
> > > > > it
> > > > > > > > > seems a
> > > > > > > > > > bit dangerous to make this change for the producer
> > generally.
> > > > > Could
> > > > > > > we
> > > > > > > > > just
> > > > > > > > > > change it for streams?
> > > > > > > > > >
> > > > > > > > > > That sounds good to me.
> > > > > > > > >
> > > > > > > > > > 2. The new `initTransactions` API takes a `Consumer`
> > > instance.
> > > > I
> > > > > > > think
> > > > > > > > > the
> > > > > > > > > > idea is to basically put in a backdoor to give the
> producer
> > > > > access
> > > > > > to
> > > > > > > > the
> > > > > > > > > > group generationId. It's not clear to me how this would
> > work
> > > > > given
> > > > > > > > > package
> > > > > > > > > > restrictions. I wonder if it would be better to just
> expose
> > > the
> > > > > > state
> > > > > > > > we
> > > > > > > > > > need from the consumer. I know we have been reluctant to
> do
> > > > this
> > > > > so
> > > > > > > far
> > > > > > > > > > because we treat the generationId as an implementation
> > > detail.
> > > > > > > > However, I
> > > > > > > > > > think we might just bite the bullet and expose it rather
> > than
> > > > > > coming
> > > > > > > up
> > > > > > > > > > with a messy hack. Concepts such as memberIds have
> already
> > > been
> > > > > > > exposed
> > > > > > > > > in
> > > > > > > > > > the AdminClient, so maybe it is not too bad.
> Alternatively,
> > > we
> > > > > > could
> > > > > > > > use
> > > > > > > > > an
> > > > > > > > > > opaque type. For example:
> > > > > > > > > >
> > > > > > > > > > // public
> > > > > > > > > > interface GroupMetadata {}
> > > > > > > > > >
> > > > > > > > > > // private
> > > > > > > > > > interface ConsumerGroupMetadata {
> > > > > > > > > >   final int generationId;
> > > > > > > > > >   final String memberId;
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > // Consumer API
> > > > > > > > > > public GroupMetadata groupMetadata();
> > > > > > > > > >
> > > > > > > > > > I am probably leaning toward just exposing the state we
> > need.
> > > > > > > > > >
> > > > > > > > > > Yes, also to mention that Kafka Streams uses the generic
> > Consumer
> > > > API
> > > > > > > which
> > > > > > > > > doesn't have rich
> > > > > > > > > states like a full `KafkaConsumer`. The hack will not work
> as
> > > > > > expected.
> > > > > > > > >
> > > > > > > > > Instead, just exposing the consumer generation.id seems a
> > way
> > > > > easier
> > > > > > > > work.
> > > > > > > > > We could consolidate
> > > > > > > > > the API and make it
> > > > > > > > >
> > > > > > > > > 3. Given that we are already providing a way to propagate
> > group
> > > > > state
> > > > > > > > from
> > > > > > > > > > the consumer to the producer, I wonder if we may as well
> > > > include
> > > > > > the
> > > > > > > > > > memberId and groupInstanceId. This would make the
> > validation
> > > we
> > > > > do
> > > > > > > for
> > > > > > > > > > TxnOffsetCommit consistent with OffsetCommit. If for no
> > other
> > > > > > > benefit,
> > > > > > > > at
> > > > > > > > > > least this may help with debugging.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 4. I like the addition of isolation_level to the offset
> > > fetch.
> > > > At
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > time, its behavior is a bit inconsistent with how it is
> > used
> > > in
> > > > > the
> > > > > > > > > > consumer generally. There is no reason for the group
> > > > coordinator
> > > > > to
> > > > > > > > ever
> > > > > > > > > > expose aborted data, so this is mostly about awaiting
> > pending
> > > > > > offset
> > > > > > > > > > commits, not reading uncommitted data. Perhaps instead of
> > > > calling
> > > > > > > this
> > > > > > > > > > "isolation level," it should be more like
> > > > > > "await_pending_transaction"
> > > > > > > > or
> > > > > > > > > > something like that?
> > > > > > > > > >
> > > > > > > > > > Also, just to be clear, the consumer would treat this as
> an
> > > > > > optional
> > > > > > > > > field,
> > > > > > > > > > right? So if the broker does not support the latest
> > > OffsetFetch
> > > > > > API,
> > > > > > > it
> > > > > > > > > > would silently revert to reading the old data. Basically
> it
> > > > would
> > > > > > be
> > > > > > > up
> > > > > > > > > to
> > > > > > > > > > the streams version probing logic to ensure that the
> > > > expectation
> > > > > on
> > > > > > > > this
> > > > > > > > > > API fits with the usage of `transactional.id`.
> > > > > > > > > >
> > > > > > > > > > Sounds like a better naming to me, while I think it could
> > be
> > > > > > > shortened
> > > > > > > > to
> > > > > > > > > `await_transaction`.
> > > > > > > > > I think the field should be optional, too.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Jason
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <
> > > > > > > reluctanthero...@gmail.com
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Guozhang,
> > > > > > > > > > >
> > > > > > > > > > > I will correct my statement from last email. I don't
> > think
> > > > the
> > > > > > > > > > > read_committed (3.a) is necessary to be added to the
> > > > > OffsetFetch
> > > > > > > > > request,
> > > > > > > > > > > as if we are using EOS application, the underlying
> > > consumers
> > > > > > within
> > > > > > > > the
> > > > > > > > > > > group should always back off when there are pending
> > offsets.
> > > > > > > > > > >
> > > > > > > > > > > Let me know if you think this is correct.
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <
> > > > > > > > reluctanthero...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thank you Guozhang for the questions, inline answers
> > are
> > > > > below.
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <
> > > > > > > > > reluctanthero...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hey all,
> > > > > > > > > > > >>
> > > > > > > > > > > >> I have done a fundamental polish of KIP-447
> > > > > > > > > > > >> <
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > > > > >
> > > > > > > > > > > and
> > > > > > > > > > > >> written a design doc
> > > > > > > > > > > >> <
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#
> > > > > > > > > > >
> > > > > > > > > > > depicting
> > > > > > > > > > > >> internal changes. We stripped off many
> implementation
> > > > > details
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > KIP,
> > > > > > > > > > > >> and simplified the public changes by a lot. For
> > > reviewers,
> > > > > it
> > > > > > is
> > > > > > > > > > highly
> > > > > > > > > > > >> recommended to fully understand EOS design in KIP-98
> > and
> > > > > read
> > > > > > > its
> > > > > > > > > > > >> corresponding design doc
> > > > > > > > > > > >> <
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
> > > > > > > > > > >
> > > > > > > > > > > if
> > > > > > > > > > > >> you haven't done so already.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Let me know if you found anything confusing around
> the
> > > KIP
> > > > > or
> > > > > > > the
> > > > > > > > > > > design.
> > > > > > > > > > > >> Would be happy to discuss in depth.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best,
> > > > > > > > > > > >> Boyang
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <
> > > > > > > > wangg...@gmail.com>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >>> 2. The reason we did not expose generation.id from
> > > > > > > KafkaConsumer
> > > > > > > > > > > public
> > > > > > > > > > > >>> APIs directly is to abstract this notion from users
> > > > (since
> > > > > it
> > > > > > > is
> > > > > > > > an
> > > > > > > > > > > >>> implementation detail of the rebalance protocol
> > itself,
> > > > > e.g.
> > > > > > if
> > > > > > > > > user
> > > > > > > > > > > >>> calls
> > > > > > > > > > > >>> consumer.assign() they do not need to invoke
> > > > > > > ConsumerCoordinator
> > > > > > > > > and
> > > > > > > > > > no
> > > > > > > > > > > >>> need to be aware of generation.id at all).
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On the other hand, with the current proposal the
> > > > > > > txn.coordinator
> > > > > > > > > did
> > > > > > > > > > > not
> > > > > > > > > > > >>> know about the latest generation from the
> > > source-of-truth
> > > > > > > > > > > >>> group.coordinator; instead, it will only bump up
> the
> > > > > > generation
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > >>> producer's InitProducerIdRequest.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> The key here is that GroupCoordinator, when
> handling
> > > > > > > > > > > >>> `InitProducerIdRequest
> > > > > > > > > > > >>>
> > > > > > > > > > > >> In the new design, we just pass the entire consumer
> > > > instance
> > > > > > > into
> > > > > > > > > the
> > > > > > > > > > > > producer through
> > > > > > > > > > > > #initTransaction, so no public API will be created.
> > > > > > > > > > > >
> > > > > > > > > > > >> 3. I agree that if we rely on the group coordinator
> to
> > > > block
> > > > > > on
> > > > > > > > > > > returning
> > > > > > > > > > > >>> offset-fetch-response if read-committed is enabled,
> > > then
> > > > we
> > > > > > do
> > > > > > > > not
> > > > > > > > > > need
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> store partition assignment on txn coordinator and
> > > > therefore
> > > > > > > it's
> > > > > > > > > > better
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> still decouple them. For that case we still need to
> > > > update
> > > > > > the
> > > > > > > > KIP
> > > > > > > > > > wiki
> > > > > > > > > > > >>> page that includes:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> 3.a. Augment OffsetFetchRequest with the
> > > ISOLATION_LEVEL
> > > > as
> > > > > > > well.
> > > > > > > > > > > >>> 3.b. Add new error code in OffsetFetchResponse to
> let
> > > > > client
> > > > > > > > > backoff
> > > > > > > > > > > and
> > > > > > > > > > > >>> retry if there are pending txns including the
> > > interested
> > > > > > > > > partitions.
> > > > > > > > > > > >>> 3.c. Also in the worst case we would let the client
> > be
> > > > > > blocked
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > >>> txn.timeout period, and for that rationale we may
> > need
> > > to
> > > > > > > > consider
> > > > > > > > > > > >>> reducing
> > > > > > > > > > > >>> our default txn.timeout value as well.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Addressed 3.b and 3.c, will do 3.a.
> > > > > > > > > > > >
> > > > > > > > > > > >> 4. According to Colin it seems we do not need to
> > create
> > > > > > another
> > > > > > > > KIP
> > > > > > > > > > and
> > > > > > > > > > > we
> > > > > > > > > > > >>> can just complete it as part of KIP-117 /
> KAFKA-5214;
> > > and
> > > > > we
> > > > > > > need
> > > > > > > > > to
> > > > > > > > > > do
> > > > > > > > > > > >>> some cleanup to have BrokerApiVersion exposed from
> > > > > > AdminClient
> > > > > > > > > > (@Colin
> > > > > > > > > > > > >>> please let us know if you have any concerns
> exposing
> > > > it).
> > > > > > > > > > > >>>
> > > > > > > > > > > >> I think we no longer need to rely on api version for
> > > > > > > > initialization,
> > > > > > > > > > > > since we will be using the upgrade.from config
> anyway.
> > > > > > > > > > > >
> > > > > > > > > > > >>
> > > > > > > > > > > >>> Guozhang
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <
> > > > > > > > > ja...@confluent.io>
> > > > > > > > > > > >>> wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> > For reference, we have BrokerApiVersionCommand
> > > already
> > > > > as a
> > > > > > > > > public
> > > > > > > > > > > >>> > interface. We have a bit of tech debt at the
> moment
> > > > > because
> > > > > > > it
> > > > > > > > > > uses a
> > > > > > > > > > > >>> > custom AdminClient. It would be nice to clean
> that
> > > up.
> > > > In
> > > > > > > > > general,
> > > > > > > > > > I
> > > > > > > > > > > >>> think
> > > > > > > > > > > >>> > it is reasonable to expose from AdminClient. It
> can
> > > be
> > > > > used
> > > > > > > by
> > > > > > > > > > > >>> management
> > > > > > > > > > > >>> > tools to inspect running Kafka versions for
> > example.
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>> > -Jason
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <
> > > > > > > > > > > >>> reluctanthero...@gmail.com>
> > > > > > > > > > > >>> > wrote:
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>> > > Thank you for the context Colin. The groupId
> was
> > > > > indeed a
> > > > > > > > > > > copy-paste
> > > > > > > > > > > >>> > error.
> > > > > > > > > > > >>> > > Our use case here for 447 is (Quoted from
> > > Guozhang):
> > > > > > > > > > > >>> > > '''
> > > > > > > > > > > >>> > > I think if we can do something else to
> > > > > > > > > > > >>> > > avoid this config though, for example we can
> use
> > > the
> > > > > > > embedded
> > > > > > > > > > > >>> AdminClient
> > > > > > > > > > > >>> > > to send the APIVersion request upon starting
> up,
> > > and
> > > > > > based
> > > > > > > on
> > > > > > > > > the
> > > > > > > > > > > >>> > returned
> > > > > > > > > > > >>> > > value decides whether to go to the old code
> path
> > or
> > > > the
> > > > > > new
> > > > > > > > > > > behavior.
> > > > > > > > > > > >>> > > '''
> > > > > > > > > > > >>> > > The benefit we get is to avoid adding a new
> > > > > configuration
> > > > > > > to
> > > > > > > > > > make a
> > > > > > > > > > > > >>> > > decision simply based on broker version. If you
> > have
> > > > > > > concerns
> > > > > > > > > with
> > > > > > > > > > > >>> > exposing
> > > > > > > > > > > >>> > > ApiVersion for client, we could
> > > > > > > > > > > >>> > > try to think of alternative solutions too.
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> > > Boyang
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <
> > > > > > > > > cmcc...@apache.org
> > > > > > > > > > >
> > > > > > > > > > > >>> wrote:
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> > > > kafka.api.ApiVersion is an internal class,
> not
> > > > > suitable
> > > > > > > for
> > > > > > > > > > > exposing
> > > > > > > > > > > >>> > > > through AdminClient.  That class is not even
> > > > > accessible
> > > > > > > > > without
> > > > > > > > > > > >>> having
> > > > > > > > > > > >>> > > the
> > > > > > > > > > > >>> > > > broker jars on your CLASSPATH.
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > Another question is, what is the groupId
> > > parameter
> > > > > > doing
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > >>> call?
> > > > > > > > > > > >>> > > The
> > > > > > > > > > > >>> > > > API versions are the same no matter what
> > consumer
> > > > > group
> > > > > > > we
> > > > > > > > > use,
> > > > > > > > > > > >>> right?
> > > > > > > > > > > >>> > > > Perhaps this was a copy and paste error?
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > This is not the first time we have discussed
> > > > having a
> > > > > > > > method
> > > > > > > > > in
> > > > > > > > > > > >>> > > > AdminClient to retrieve API version
> > information.
> > > > In
> > > > > > > fact,
> > > > > > > > > the
> > > > > > > > > > > >>> original
> > > > > > > > > > > >>> > > KIP
> > > > > > > > > > > >>> > > > which created KafkaAdminClient specified an
> API
> > > for
> > > > > > > > fetching
> > > > > > > > > > > >>> version
> > > > > > > > > > > >>> > > > information.  It was called apiVersions and
> it
> > is
> > > > > still
> > > > > > > > there
> > > > > > > > > > on
> > > > > > > > > > > >>> the
> > > > > > > > > > > >>> > > wiki.
> > > > > > > > > > > >>> > > > See
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > However, this API wasn't ready in time for
> > 0.11.0
> > > > so
> > > > > we
> > > > > > > > > shipped
> > > > > > > > > > > >>> without
> > > > > > > > > > > >>> > > > it.  There was a JIRA to implement it for
> later
> > > > > > versions,
> > > > > > > > > > > >>> > > >
> > https://issues.apache.org/jira/browse/KAFKA-5214
> > > ,
> > > > > as
> > > > > > > well
> > > > > > > > > as
> > > > > > > > > > a
> > > > > > > > > > > >>> PR,
> > > > > > > > > > > >>> > > > https://github.com/apache/kafka/pull/3012 .
> > > > > However,
> > > > > > we
> > > > > > > > > > started
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> > > > rethink whether this AdminClient function was
> > > even
> > > > > > > > necessary.
> > > > > > > > > > > >>> Most of
> > > > > > > > > > > >>> > > the
> > > > > > > > > > > >>> > > > use-cases we could think of seemed like
> > horrible
> > > > > hacks.
> > > > > > > So
> > > > > > > > > it
> > > > > > > > > > > has
> > > > > > > > > > > >>> > never
> > > > > > > > > > > >>> > > > really been implemented (yet?).
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > best,
> > > > > > > > > > > >>> > > > Colin
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen
> > > wrote:
> > > > > > > > > > > >>> > > > > Actually, after a second thought, I think
> it
> > > > > actually
> > > > > > > > makes
> > > > > > > > > > > >>> sense to
> > > > > > > > > > > >>> > > > > support auto upgrade through admin client
> to
> > > help
> > > > > > us
> > > > > > > get
> > > > > > > > > api
> > > > > > > > > > > >>> version
> > > > > > > > > > > >>> > > > > from
> > > > > > > > > > > >>> > > > > broker.
> > > > > > > > > > > >>> > > > > A draft KIP is here:
> > > > > > > > > > > >>> > > > >
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > > > > > > > > >>> > > > >
> > > > > > > > > > > >>> > > > > Boyang
> > > > > > > > > > > >>> > > > >
> > > > > > > > > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang
> Chen <
> > > > > > > > > > > >>> > > reluctanthero...@gmail.com>
> > > > > > > > > > > >>> > > > > wrote:
> > > > > > > > > > > >>> > > > >
> > > > > > > > > > > >>> > > > > > Thank you Guozhang, some of my
> > understandings
> > > > are
> > > > > > > > inline
> > > > > > > > > > > below.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason
> > > > Gustafson
> > > > > <
> > > > > > > > > > > >>> > ja...@confluent.io
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > > > > > wrote:
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > I think co-locating does have some
> > merits
> > > > > here,
> > > > > > > i.e.
> > > > > > > > > > > >>> letting the
> > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > > > > > source-of-truth
> > > > > > > of
> > > > > > > > > > > >>> assignment
> > > > > > > > > > > >>> > to
> > > > > > > > > > > >>> > > > act
> > > > > > > > > > > >>> > > > > >> as
> > > > > > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I
> agree
> > > > > there's
> > > > > > > also
> > > > > > > > > > some
> > > > > > > > > > > >>> cons
> > > > > > > > > > > >>> > of
> > > > > > > > > > > >>> > > > > >> coupling
> > > > > > > > > > > >>> > > > > >> > them together. I'm still a bit
> inclined
> > > > > towards
> > > > > > > > > > > colocation
> > > > > > > > > > > >>> but
> > > > > > > > > > > >>> > if
> > > > > > > > > > > >>> > > > there
> > > > > > > > > > > >>> > > > > >> > are good rationales not to do so I can
> > be
> > > > > > > convinced
> > > > > > > > as
> > > > > > > > > > > well.
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >> The good rationale is that we have no
> > > > mechanism
> > > > > to
> > > > > > > > > > colocate
> > > > > > > > > > > >>> > > > partitions ;).
> > > > > > > > > > > >>> > > > > >> Are you suggesting we store the group
> and
> > > > > > > transaction
> > > > > > > > > > state
> > > > > > > > > > > >>> in the
> > > > > > > > > > > >>> > > > same
> > > > > > > > > > > >>> > > > > >> log? Can you be more concrete about the
> > > > benefit?
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >> -Jason
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM
> Guozhang
> > > > Wang <
> > > > > > > > > > > >>> > wangg...@gmail.com>
> > > > > > > > > > > >>> > > > > >> wrote:
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >> > Hi Boyang,
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > 1. One advantage of retry against
> > on-hold
> > > is
> > > > > > that
> > > > > > > it
> > > > > > > > > > will
> > > > > > > > > > > >>> not
> > > > > > > > > > > >>> > > > tie-up a
> > > > > > > > > > > >>> > > > > >> > handler thread (of course the latter
> > could
> > > > do
> > > > > > the
> > > > > > > > same
> > > > > > > > > > but
> > > > > > > > > > > >>> that
> > > > > > > > > > > >>> > > > involves
> > > > > > > > > > > >>> > > > > >> > using a purgatory which is more
> > > > complicated),
> > > > > > and
> > > > > > > > also
> > > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > >>> > less
> > > > > > > > > > > >>> > > > > >> likely to
> > > > > > > > > > > >>> > > > > >> > violate request timeout. So I think
> > there
> > > > are
> > > > > > some
> > > > > > > > > > > >>> rationales to
> > > > > > > > > > > >>> > > > prefer
> > > > > > > > > > > >>> > > > > >> > retries.
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >  That sounds fair to me, also we are
> > avoiding
> > > > > usage
> > > > > > > of
> > > > > > > > > > > another
> > > > > > > > > > > >>> > > > purgatory
> > > > > > > > > > > >>> > > > > > instance. Usually for one back-off
> > > > > > > > > > > >>> > > > > > we are only delaying 50ms during startup
> > > which
> > > > is
> > > > > > > > trivial
> > > > > > > > > > > cost.
> > > > > > > > > > > >>> > This
> > > > > > > > > > > >>> > > > > > behavior shouldn't be changed.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > > > 2. Regarding
> "ConsumerRebalanceListener":
> > > > both
> > > > > > > > > > > >>> > > > ConsumerRebalanceListener
> > > > > > > > > > > >>> > > > > >> > and PartitionAssignors are
> > > user-customizable
> > > > > > > > modules,
> > > > > > > > > > and
> > > > > > > > > > > >>> only
> > > > > > > > > > > >>> > > > > >> difference
> > > > > > > > > > > >>> > > > > >> > is that the former is specified via
> code
> > > and
> > > > > the
> > > > > > > > > latter
> > > > > > > > > > is
> > > > > > > > > > > >>> > > > specified via
> > > > > > > > > > > >>> > > > > >> > config.
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > Regarding Jason's proposal of
> > > > > > ConsumerAssignment,
> > > > > > > > one
> > > > > > > > > > > thing
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> > > note
> > > > > > > > > > > >>> > > > > >> though
> > > > > > > > > > > >>> > > > > >> > with KIP-429 the onPartitionAssigned
> may
> > > not
> > > > > be
> > > > > > > > called
> > > > > > > > > > if
> > > > > > > > > > > >>> the
> > > > > > > > > > > >>> > > > assignment
> > > > > > > > > > > >>> > > > > >> > does not change, whereas onAssignment
> > > would
> > > > > > always
> > > > > > > > be
> > > > > > > > > > > >>> called at
> > > > > > > > > > > >>> > > the
> > > > > > > > > > > >>> > > > end
> > > > > > > > > > > >>> > > > > >> of
> > > > > > > > > > > >>> > > > > >> > sync-group response. My proposed
> > semantics
> > > > is
> > > > > > that
> > > > > > > > > > > >>> > > > > >> > `RebalanceListener#onPartitionsXXX`
> are
> > > used
> > > > > for
> > > > > > > > > > > >>> notifications
> > > > > > > > > > > >>> > to
> > > > > > > > > > > >>> > > > user,
> > > > > > > > > > > >>> > > > > >> and
> > > > > > > > > > > > >>> > > > > >> > hence if there are no changes these will
> > not
> > > > be
> > > > > > > > called,
> > > > > > > > > > > >>> whereas
> > > > > > > > > > > >>> > > > > >> > `PartitionAssignor` is used for
> assignor
> > > > > logic,
> > > > > > > > whose
> > > > > > > > > > > >>> callback
> > > > > > > > > > > >>> > > would
> > > > > > > > > > > >>> > > > > >> always
> > > > > > > > > > > >>> > > > > >> > be called no matter if the partitions
> > have
> > > > > > changed
> > > > > > > > or
> > > > > > > > > > not.
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >> I think a third option is to gracefully
> > > expose
> > > > > > > > > generation
> > > > > > > > > > id
> > > > > > > > > > > >>> as
> > > > > > > > > > > >>> > part
> > > > > > > > > > > >>> > > > of
> > > > > > > > > > > >>> > > > > > consumer API, so that we don't need to
> > > > > > > > > > > >>> > > > > > bother overloading various callbacks. Of
> > > > course,
> > > > > > this
> > > > > > > > > > builds
> > > > > > > > > > > >>> upon
> > > > > > > > > > > >>> > the
> > > > > > > > > > > >>> > > > > > assumption that topic partitions
> > > > > > > > > > > >>> > > > > > will not be included in new
> initTransactions
> > > > API.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > > > 3. I feel it is a bit awkward to let
> the
> > > > > > > > TxnCoordinator
> > > > > > > > > > > > >>> keep
> > > > > > > > > > > >>> > > > partition
> > > > > > > > > > > >>> > > > > >> > assignments since it is sort of taking
> > > over
> > > > > the
> > > > > > > job
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator, and may likely
> > cause
> > > a
> > > > > > > > > split-brain
> > > > > > > > > > > >>> problem
> > > > > > > > > > > >>> > as
> > > > > > > > > > > >>> > > > two
> > > > > > > > > > > >>> > > > > >> > coordinators keep a copy of this
> > > assignment
> > > > > > which
> > > > > > > > may
> > > > > > > > > be
> > > > > > > > > > > >>> > > different.
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > I think co-locating does have some
> > merits
> > > > > here,
> > > > > > > i.e.
> > > > > > > > > > > >>> letting the
> > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > > > > > source-of-truth
> > > > > > > of
> > > > > > > > > > > >>> assignment
> > > > > > > > > > > >>> > to
> > > > > > > > > > > >>> > > > act
> > > > > > > > > > > >>> > > > > >> as
> > > > > > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I
> agree
> > > > > there's
> > > > > > > also
> > > > > > > > > > some
> > > > > > > > > > > >>> cons
> > > > > > > > > > > >>> > of
> > > > > > > > > > > >>> > > > > >> coupling
> > > > > > > > > > > >>> > > > > >> > them together. I'm still a bit
> inclined
> > > > > towards
> > > > > > > > > > > colocation
> > > > > > > > > > > >>> but
> > > > > > > > > > > >>> > if
> > > > > > > > > > > >>> > > > there
> > > > > > > > > > > >>> > > > > >> > are good rationales not to do so I can
> > be
> > > > > > > convinced
> > > > > > > > as
> > > > > > > > > > > well.
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > > The purpose of co-location is to let txn
> > > > > > coordinator
> > > > > > > > see
> > > > > > > > > > the
> > > > > > > > > > > >>> group
> > > > > > > > > > > >>> > > > > > assignment. This priority is weakened
> > > > > > > > > > > >>> > > > > > when we already have defense on the
> > consumer
> > > > > offset
> > > > > > > > > fetch,
> > > > > > > > > > > so I
> > > > > > > > > > > >>> > guess
> > > > > > > > > > > >>> > > > it's
> > > > > > > > > > > >>> > > > > > not super important anymore.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >> > 4. I guess I'm preferring the
> philosophy
> > > of
> > > > > > "only
> > > > > > > > add
> > > > > > > > > > > >>> configs if
> > > > > > > > > > > >>> > > > > >> there's no
> > > > > > > > > > > >>> > > > > >> > other ways", since more and more
> configs
> > > > would
> > > > > > > make
> > > > > > > > it
> > > > > > > > > > > less
> > > > > > > > > > > >>> and
> > > > > > > > > > > >>> > > less
> > > > > > > > > > > >>> > > > > >> > intuitive out of the box to use.
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > I think it's a valid point that checks
> > > upon
> > > > > > > starting
> > > > > > > > > up
> > > > > > > > > > > >>> does not
> > > > > > > > > > > >>> > > > cope
> > > > > > > > > > > >>> > > > > >> with
> > > > > > > > > > > >>> > > > > >> > brokers downgrading but even with a
> > > config,
> > > > > but
> > > > > > it
> > > > > > > > is
> > > > > > > > > > > still
> > > > > > > > > > > >>> hard
> > > > > > > > > > > >>> > > for
> > > > > > > > > > > >>> > > > > >> users
> > > > > > > > > > > >>> > > > > >> > to determine when they can be ensured
> > the
> > > > > broker
> > > > > > > > would
> > > > > > > > > > > never
> > > > > > > > > > > >>> > > > downgrade
> > > > > > > > > > > >>> > > > > >> > anymore and hence can safely switch
> the
> > > > > config.
> > > > > > So
> > > > > > > > my
> > > > > > > > > > > >>> feeling is
> > > > > > > > > > > >>> > > > that
> > > > > > > > > > > >>> > > > > >> this
> > > > > > > > > > > >>> > > > > >> > config would not be helping too much
> > > still.
> > > > If
> > > > > > we
> > > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > >>> be at
> > > > > > > > > > > >>> > > the
> > > > > > > > > > > >>> > > > > >> safer
> > > > > > > > > > > >>> > > > > >> > side, then I'd suggest we modify the
> > > > > Coordinator
> > > > > > > ->
> > > > > > > > > > > >>> > NetworkClient
> > > > > > > > > > > >>> > > > > >> hierarchy
> > > > > > > > > > > >>> > > > > >> > to allow the NetworkClient being able
> to
> > > > pass
> > > > > > the
> > > > > > > > > > > APIVersion
> > > > > > > > > > > >>> > > > metadata to
> > > > > > > > > > > >>> > > > > >> > Coordinator, so that Coordinator can
> > rely
> > > on
> > > > > > that
> > > > > > > > > logic
> > > > > > > > > > to
> > > > > > > > > > > >>> > change
> > > > > > > > > > > >>> > > > its
> > > > > > > > > > > >>> > > > > >> > behavior dynamically.
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > > The stream thread init could not be
> > supported
> > > > by
> > > > > a
> > > > > > > > client
> > > > > > > > > > > >>> > coordinator
> > > > > > > > > > > >>> > > > > > behavior change on the fly,
> > > > > > > > > > > >>> > > > > > we are only losing possibilities after we
> > > > > > > initialized.
> > > > > > > > > > (main
> > > > > > > > > > > >>> thread
> > > > > > > > > > > > >>> > > > exits
> > > > > > > > > > > > >>> > > > > > and no thread has a global picture
> > > anymore)
> > > > > > > > > > > >>> > > > > > If we do want to support auto version
> > > > detection,
> > > > > > > admin
> > > > > > > > > > client
> > > > > > > > > > > >>> > request
> > > > > > > > > > > >>> > > > in
> > > > > > > > > > > >>> > > > > > this sense shall be easier.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > 5. I do not have a concrete idea about
> > how
> > > > the
> > > > > > > > impact
> > > > > > > > > on
> > > > > > > > > > > >>> Connect
> > > > > > > > > > > >>> > > > would
> > > > > > > > > > > >>> > > > > >> > make, maybe Randall or Konstantine can
> > > help
> > > > > > here?
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > > Sounds good, let's see their thoughts.
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > > >> > Guozhang
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM
> Boyang
> > > > Chen <
> > > > > > > > > > > >>> > > > > >> reluctanthero...@gmail.com>
> > > > > > > > > > > >>> > > > > >> > wrote:
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > > Hey Jason,
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > thank you for the proposal here.
> Some
> > of
> > > > my
> > > > > > > > thoughts
> > > > > > > > > > > >>> below.
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM
> Jason
> > > > > > Gustafson
> > > > > > > <
> > > > > > > > > > > >>> > > > ja...@confluent.io>
> > > > > > > > > > > >>> > > > > >> > > wrote:
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > > Hi Boyang,
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > Thanks for picking this up! Still
> > > > reading
> > > > > > > > through
> > > > > > > > > > the
> > > > > > > > > > > >>> > updates,
> > > > > > > > > > > >>> > > > but
> > > > > > > > > > > >>> > > > > >> here
> > > > > > > > > > > >>> > > > > >> > > are
> > > > > > > > > > > >>> > > > > >> > > > a couple initial comments on the
> > APIs:
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class
> > is
> > > a
> > > > > bit
> > > > > > > > > > awkward. I
> > > > > > > > > > > >>> think
> > > > > > > > > > > >>> > > we
> > > > > > > > > > > >>> > > > are
> > > > > > > > > > > >>> > > > > >> > > trying
> > > > > > > > > > > >>> > > > > >> > > > to encapsulate state from the
> > current
> > > > > group
> > > > > > > > > > > assignment.
> > > > > > > > > > > >>> > Maybe
> > > > > > > > > > > >>> > > > > >> something
> > > > > > > > > > > >>> > > > > >> > > > like `ConsumerAssignment` would be
> > > > > clearer?
> > > > > > If
> > > > > > > > we
> > > > > > > > > > make
> > > > > > > > > > > >>> the
> > > > > > > > > > > >>> > > usage
> > > > > > > > > > > >>> > > > > >> > > consistent
> > > > > > > > > > > >>> > > > > >> > > > across the consumer and producer,
> > then
> > > > we
> > > > > > can
> > > > > > > > > avoid
> > > > > > > > > > > >>> exposing
> > > > > > > > > > > >>> > > > > >> internal
> > > > > > > > > > > >>> > > > > >> > > state
> > > > > > > > > > > >>> > > > > >> > > > like the generationId.
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > For example:
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > // Public API
> > > > > > > > > > > >>> > > > > >> > > > interface ConsumerAssignment {
> > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition>
> partitions();
> > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > // Not a public API
> > > > > > > > > > > >>> > > > > >> > > > class InternalConsumerAssignment
> > > > > implements
> > > > > > > > > > > >>> > > ConsumerAssignment {
> > > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > > > > > > > > > > >>> > > > > >> > > >   int generationId;
> > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > Then we can change the rebalance
> > > > listener
> > > > > to
> > > > > > > > > > something
> > > > > > > > > > > >>> like
> > > > > > > > > > > >>> > > > this:
> > > > > > > > > > > >>> > > > > >> > > >
> > > onPartitionsAssigned(ConsumerAssignment
> > > > > > > > > assignment)
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > And on the producer:
> > > > > > > > > > > >>> > > > > >> > > > void initTransactions(String
> > groupId,
> > > > > > > > > > > ConsumerAssignment
> > > > > > > > > > > >>> > > > > >> assignment);
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > 2. Another bit of awkwardness is
> the
> > > > fact
> > > > > > that
> > > > > > > > we
> > > > > > > > > > have
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> > pass
> > > > > > > > > > > >>> > > > the
> > > > > > > > > > > >>> > > > > >> > > groupId
> > > > > > > > > > > >>> > > > > >> > > > through both initTransactions()
> and
> > > > > > > > > > > >>> > > sendOffsetsToTransaction().
> > > > > > > > > > > >>> > > > We
> > > > > > > > > > > >>> > > > > >> > could
> > > > > > > > > > > >>> > > > > >> > > > consider a config instead. Maybe
> > > > something
> > > > > > > like
> > > > > > > > `
> > > > > > > > > > > >>> > > > > >> > transactional.group.id
> > > > > > > > > > > >>> > > > > >> > > `?
> > > > > > > > > > > >>> > > > > >> > > > Then we could simplify the
> producer
> > > > APIs,
> > > > > > > > > > potentially
> > > > > > > > > > > >>> even
> > > > > > > > > > > >>> > > > > >> deprecating
> > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > >>> > > > > >> > > > current sendOffsetsToTransaction.
> In
> > > > fact,
> > > > > > for
> > > > > > > > > this
> > > > > > > > > > > new
> > > > > > > > > > > >>> > usage,
> > > > > > > > > > > >>> > > > the `
> > > > > > > > > > > > >>> > > > > >> > > > transactional.id` config is not
> > > needed.
> > > > It
> > > > > > > would
> > > > > > > > > be
> > > > > > > > > > > >>> nice if
> > > > > > > > > > > >>> > we
> > > > > > > > > > > >>> > > > don't
> > > > > > > > > > > >>> > > > > >> > have
> > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > >>> > > > > >> > > > provide it.
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > I like the idea of combining 1 and
> > 2.
> > > We
> > > > > > could
> > > > > > > > > > > >>> definitely
> > > > > > > > > > > >>> > pass
> > > > > > > > > > > >>> > > > in a
> > > > > > > > > > > >>> > > > > >> > > group.id config
> > > > > > > > > > > >>> > > > > >> > > so that we could avoid exposing that
> > > > > > information
> > > > > > > > in
> > > > > > > > > a
> > > > > > > > > > > >>> public
> > > > > > > > > > > >>> > > API.
> > > > > > > > > > > >>> > > > The
> > > > > > > > > > > >>> > > > > >> > > question I have
> > > > > > > > > > > > >>> > > > > >> > > is whether we should name the
> > > > interface
> > > > > > > > > > > >>> `GroupAssignment`
> > > > > > > > > > > >>> > > > > >> instead,
> > > > > > > > > > > >>> > > > > >> > so
> > > > > > > > > > > >>> > > > > >> > > that Connect later
> > > > > > > > > > > >>> > > > > >> > > could also extend on the same
> > interface,
> > > > > just
> > > > > > to
> > > > > > > > > echo
> > > > > > > > > > > >>> > Guozhang's
> > > > > > > > > > > >>> > > > point
> > > > > > > > > > > > >>> > > > > >> > > here. Also, the base interface
> > > > > > > > > > > >>> > > > > >> > > is better to be defined empty for
> easy
> > > > > > > extension,
> > > > > > > > or
> > > > > > > > > > > >>> define an
> > > > > > > > > > > >>> > > > > >> abstract
> > > > > > > > > > > >>> > > > > >> > > type called `Resource` to be
> shareable
> > > > > > > > > > > >>> > > > > >> > > later IMHO.
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > > By the way, I'm a bit confused
> about
> > > > > > > discussion
> > > > > > > > > > above
> > > > > > > > > > > >>> about
> > > > > > > > > > > >>> > > > > >> colocating
> > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > >>> > > > > >> > > > txn and group coordinators. That
> is
> > > not
> > > > > > > actually
> > > > > > > > > > > >>> necessary,
> > > > > > > > > > > >>> > is
> > > > > > > > > > > >>> > > > it?
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > Yes, this is not a requirement for
> > > this
> > > > > KIP,
> > > > > > > > > because
> > > > > > > > > > > it
> > > > > > > > > > > >>> is
> > > > > > > > > > > >>> > > > > >> inherently
> > > > > > > > > > > >>> > > > > >> > > impossible to
> > > > > > > > > > > > >>> > > > > >> > > achieve co-locating topic partition
> > of
> > > > > > > > transaction
> > > > > > > > > > log
> > > > > > > > > > > >>> and
> > > > > > > > > > > >>> > > > consumed
> > > > > > > > > > > >>> > > > > >> > offset
> > > > > > > > > > > >>> > > > > >> > > topics.
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > > Thanks,
> > > > > > > > > > > >>> > > > > >> > > > Jason
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 10:07 AM
> > Boyang
> > > > > Chen <
> > > > > > > > > > > >>> > > > > >> reluctanthero...@gmail.com
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > > > > Thank you Ismael for the
> > suggestion.
> > > > We
> > > > > > will
> > > > > > > > > > attempt
> > > > > > > > > > > >>> to
> > > > > > > > > > > >>> > > > address
> > > > > > > > > > > >>> > > > > >> it by
> > > > > > > > > > > >>> > > > > >> > > > > giving more details to rejected
> > > > > > alternative
> > > > > > > > > > section.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > Thank you for the comment
> > Guozhang!
> > > > > > Answers
> > > > > > > > are
> > > > > > > > > > > inline
> > > > > > > > > > > >>> > > below.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM
> > > > Guozhang
> > > > > > > Wang
> > > > > > > > <
> > > > > > > > > > > >>> > > > wangg...@gmail.com
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > Hello Boyang,
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > Thanks for the KIP, I have
> some
> > > > > comments
> > > > > > > > > below:
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > 1. "Once transactions are
> > > complete,
> > > > > the
> > > > > > > call
> > > > > > > > > > will
> > > > > > > > > > > >>> > return."
> > > > > > > > > > > >>> > > > This
> > > > > > > > > > > >>> > > > > >> > seems
> > > > > > > > > > > >>> > > > > >> > > > > > different from the existing
> > > > behavior,
> > > > > in
> > > > > > > > which
> > > > > > > > > > we
> > > > > > > > > > > >>> would
> > > > > > > > > > > >>> > > > return a
> > > > > > > > > > > >>> > > > > >> > > > > retriable
> > > > > > > > > > > >>> > > > > >> > > > > > CONCURRENT_TRANSACTIONS and
> let
> > > the
> > > > > > client
> > > > > > > > > > > >>> retry, is
> > > > > > > > > > > >>> > > this
> > > > > > > > > > > >>> > > > > >> > > > intentional?
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > I don’t think it is intentional,
> > > and I
> > > > > > will
> > > > > > > > > defer
> > > > > > > > > > > this
> > > > > > > > > > > >>> > > > question to
> > > > > > > > > > > >>> > > > > >> > > Jason
> > > > > > > > > > > > >>> > > > > >> > > > > when he gets time to answer since
> > > from
> > > > > > what I
> > > > > > > > > > > >>> understood
> > > > > > > > > > > >>> > > retry
> > > > > > > > > > > >>> > > > and
> > > > > > > > > > > >>> > > > > >> on
> > > > > > > > > > > >>> > > > > >> > > hold
> > > > > > > > > > > >>> > > > > >> > > > > seem both valid approaches.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > 2. "an overload to
> > > > > onPartitionsAssigned
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > >>> > consumer's
> > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > >>> > > > > >> > > > > > listener interface": as part
> of
> > > > > KIP-341
> > > > > > > > we've
> > > > > > > > > > > >>> already
> > > > > > > > > > > > >>> > added
> > > > > > > > > > > >>> > > > this
> > > > > > > > > > > >>> > > > > >> > > > > information
> > > > > > > > > > > >>> > > > > >> > > > > > to the onAssignment callback.
> > > Would
> > > > > this
> > > > > > > be
> > > > > > > > > > > >>> sufficient?
> > > > > > > > > > > >>> > Or
> > > > > > > > > > > >>> > > > more
> > > > > > > > > > > >>> > > > > >> > > > generally
> > > > > > > > > > > >>> > > > > >> > > > > > speaking, which information
> have
> > > to
> > > > be
> > > > > > > > passed
> > > > > > > > > > > >>> around in
> > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > >>> > > > > >> > > > > callback
> > > > > > > > > > > >>> > > > > >> > > > > > while others can be passed
> > around
> > > in
> > > > > > > > > > > >>> PartitionAssignor
> > > > > > > > > > > >>> > > > > >> callback? In
> > > > > > > > > > > >>> > > > > >> > > > > Streams
> > > > > > > > > > > >>> > > > > >> > > > > > for example both callbacks are
> > > used
> > > > > but
> > > > > > > most
> > > > > > > > > > > >>> critical
> > > > > > > > > > > >>> > > > > >> information
> > > > > > > > > > > >>> > > > > >> > is
> > > > > > > > > > > >>> > > > > >> > > > > passed
> > > > > > > > > > > >>> > > > > >> > > > > > via onAssignment.
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > We still need to extend
> > > > > > > > > ConsumerRebalanceListener
> > > > > > > > > > > >>> because
> > > > > > > > > > > >>> > > > it’s the
> > > > > > > > > > > >>> > > > > >> > > > > interface we could have public
> > > access
> > > > > to.
> > > > > > > The
> > > > > > > > > > > >>> > #onAssignment
> > > > > > > > > > > >>> > > > call
> > > > > > > > > > > >>> > > > > >> is
> > > > > > > > > > > >>> > > > > >> > > > defined
> > > > > > > > > > > >>> > > > > >> > > > > on PartitionAssignor level which
> > is
> > > > not
> > > > > > easy
> > > > > > > > to
> > > > > > > > > > work
> > > > > > > > > > > >>> with
> > > > > > > > > > > >>> > > > external
> > > > > > > > > > > >>> > > > > >> > > > > producers.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > 3. "We propose to use a
> separate
> > > > > record
> > > > > > > type
> > > > > > > > > in
> > > > > > > > > > > >>> order to
> > > > > > > > > > > >>> > > > store
> > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > >>> > > > > >> > > > group
> > > > > > > > > > > >>> > > > > >> > > > > > assignment.": hmm, I thought
> > with
> > > > the
> > > > > > > third
> > > > > > > > > > typed
> > > > > > > > > > > >>> > > > > >> FindCoordinator,
> > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > >>> > > > > >> > > > > same
> > > > > > > > > > > > >>> > > > > >> > > > > > broker that acts as the
> consumer
> > > > > > > coordinator
> > > > > > > > > > would
> > > > > > > > > > > >>> > always
> > > > > > > > > > > >>> > > be
> > > > > > > > > > > >>> > > > > >> > selected
> > > > > > > > > > > >>> > > > > >> > > > as
> > > > > > > > > > > >>> > > > > >> > > > > > the txn coordinator, in which
> > case
> > > > it
> > > > > > can
> > > > > > > > > access
> > > > > > > > > > > its
> > > > > > > > > > > >>> > local
> > > > > > > > > > > >>> > > > cache
> > > > > > > > > > > >>> > > > > >> > > > > metadata /
> > > > > > > > > > > >>> > > > > >> > > > > > offset topic to get this
> > > information
> > > > > > > > already?
> > > > > > > > > We
> > > > > > > > > > > >>> just
> > > > > > > > > > > >>> > need
> > > > > > > > > > > >>> > > > to
> > > > > > > > > > > >>> > > > > >> think
> > > > > > > > > > > >>> > > > > >> > > > about
> > > > > > > > > > > >>> > > > > >> > > > > > how to make these two modules
> > > > directly
> > > > > > > > > exchange
> > > > > > > > > > > >>> > > information
> > > > > > > > > > > >>> > > > > >> without
> > > > > > > > > > > >>> > > > > >> > > > > messing
> > > > > > > > > > > >>> > > > > >> > > > > > up the code hierarchy.
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > These two coordinators will be
> on
> > > the
> > > > > same
> > > > > > > > > broker
> > > > > > > > > > > only
> > > > > > > > > > > >>> > when
> > > > > > > > > > > >>> > > > > >> number of
> > > > > > > > > > > >>> > > > > >> > > > > partitions for transaction state
> > > topic
> > > > > and
> > > > > > > > > > consumer
> > > > > > > > > > > >>> offset
> > > > > > > > > > > >>> > > > topic
> > > > > > > > > > > >>> > > > > >> are
> > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > >>> > > > > >> > > > > same. This normally holds true,
> > but
> > > > I'm
> > > > > > > afraid
> > > > > > > > > > > >>> > > > > >> > > > > we couldn't make this
> assumption?
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > 4. The config of
> > > > > > > > > > "CONSUMER_GROUP_AWARE_TRANSACTION":
> > > > > > > > > > > >>> it
> > > > > > > > > > > >>> > > seems
> > > > > > > > > > > >>> > > > the
> > > > > > > > > > > >>> > > > > >> > goal
> > > > > > > > > > > >>> > > > > >> > > of
> > > > > > > > > > > >>> > > > > >> > > > > > this config is just to avoid
> > > > > > old-versioned
> > > > > > > > > > broker
> > > > > > > > > > > >>> to not
> > > > > > > > > > > >>> > > be
> > > > > > > > > > > >>> > > > > >> able to
> > > > > > > > > > > >>> > > > > >> > > > > > recognize newer versioned
> > client.
> > > I
> > > > > > think
> > > > > > > if
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > >>> do
> > > > > > > > > > > >>> > > > something
> > > > > > > > > > > >>> > > > > >> > else
> > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > >>> > > > > >> > > > > > avoid this config though, for
> > > > example
> > > > > we
> > > > > > > can
> > > > > > > > > use
> > > > > > > > > > > the
> > > > > > > > > > > >>> > > > embedded
> > > > > > > > > > > >>> > > > > >> > > > AdminClient
> > > > > > > > > > > >>> > > > > >> > > > > > to send the APIVersion request
> > > upon
> > > > > > > starting
> > > > > > > > > up,
> > > > > > > > > > > and
> > > > > > > > > > > >>> > based
> > > > > > > > > > > >>> > > > on
> > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > >>> > > > > >> > > > > returned
> > > > > > > > > > > > >>> > > > > >> > > > > > value decide whether to go to
> > the
> > > > old
> > > > > > > code
> > > > > > > > > path
> > > > > > > > > > > or
> > > > > > > > > > > >>> the
> > > > > > > > > > > >>> > > new
> > > > > > > > > > > >>> > > > > >> > behavior.
> > > > > > > > > > > >>> > > > > >> > > > > > Admittedly asking a random
> > broker
> > > > > about
> > > > > > > > > > APIVersion
> > > > > > > > > > > >>> does
> > > > > > > > > > > >>> > > not
> > > > > > > > > > > >>> > > > > >> > guarantee
> > > > > > > > > > > >>> > > > > >> > > > the
> > > > > > > > > > > >>> > > > > >> > > > > > whole cluster's versions, but
> > what
> > > > we
> > > > > > can
> > > > > > > do
> > > > > > > > > is
> > > > > > > > > > to
> > > > > > > > > > > >>> first
> > > > > > > > > > > >>> > > 1)
> > > > > > > > > > > >>> > > > find
> > > > > > > > > > > >>> > > > > >> > the
> > > > > > > > > > > >>> > > > > >> > > > > > coordinator (and if the random
> > > > broker
> > > > > > does
> > > > > > > > not
> > > > > > > > > > > even
> > > > > > > > > > > >>> > > > recognize
> > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > >>> > > > > >> > > > > > discover type, fall back to
> old
> > > path
> > > > > > > > > directly),
> > > > > > > > > > > and
> > > > > > > > > > > >>> then
> > > > > > > > > > > >>> > > 2)
> > > > > > > > > > > >>> > > > ask
> > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > >>> > > > > >> > > > > > discovered coordinator about
> its
> > > > > > supported
> > > > > > > > > > > >>> APIVersion.
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > The caveat here is that we have
> to
> > > > make
> > > > > > sure
> > > > > > > > > both
> > > > > > > > > > > the
> > > > > > > > > > > >>> > group
> > > > > > > > > > > >>> > > > > >> > coordinator
> > > > > > > > > > > >>> > > > > >> > > > and
> > > > > > > > > > > >>> > > > > >> > > > > transaction coordinator are on
> the
> > > > > latest
> > > > > > > > > version
> > > > > > > > > > > >>> during
> > > > > > > > > > > >>> > > init
> > > > > > > > > > > >>> > > > > >> stage.
> > > > > > > > > > > >>> > > > > >> > > This
> > > > > > > > > > > >>> > > > > >> > > > > is potentially doable as we only
> > > need
> > > > a
> > > > > > > > consumer
> > > > > > > > > > > >>> group.id
> > > > > > > > > > > >>> > > > > >> > > > > to check that. In the meantime,
> a
> > > > > > hard-coded
> > > > > > > > > > config
> > > > > > > > > > > is
> > > > > > > > > > > >>> > > still a
> > > > > > > > > > > >>> > > > > >> > > favorable
> > > > > > > > > > > >>> > > > > >> > > > > backup in case the server has
> > > > > downgraded,
> > > > > > so
> > > > > > > > you
> > > > > > > > > > > will
> > > > > > > > > > > >>> want
> > > > > > > > > > > >>> > > to
> > > > > > > > > > > >>> > > > use
> > > > > > > > > > > >>> > > > > >> a
> > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > >>> > > > > >> > > > > version client without `consumer
> > > > group`
> > > > > > > > > > > transactional
> > > > > > > > > > > >>> > > support.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > 5. This is a meta question: have you considered how this can
> > > > > > > > > > > > >>> > > > > >> > > > > > be applied to Kafka Connect as well? For example, for source
> > > > > > > > > > > > >>> > > > > >> > > > > > connectors, the assignment is not by "partitions", but by
> > > > > > > > > > > > >>> > > > > >> > > > > > some other sort of "resources" based on the source systems.
> > > > > > > > > > > > >>> > > > > >> > > > > > How would KIP-447 affect Kafka connectors that implement EOS?
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > No, it's not currently included in the scope. Could you point
> > > > > > > > > > > > >>> > > > > >> > > > > me to a sample source connector that uses EOS? We could always
> > > > > > > > > > > > >>> > > > > >> > > > > piggy-back more information, such as tasks, onto the
> > > > > > > > > > > > >>> > > > > >> > > > > TxnProducerIdentity struct. If this is something to support in
> > > > > > > > > > > > >>> > > > > >> > > > > the near term, an abstract type called "Resource" could be
> > > > > > > > > > > > >>> > > > > >> > > > > provided, with topic partition and connect task implementing it.
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > Guozhang
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > > Hi Boyang,
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > > Thanks for the KIP. It's good that we listed a number of
> > > > > > > > > > > > >>> > > > > >> > > > > > > rejected alternatives. It would be helpful to have an
> > > > > > > > > > > > >>> > > > > >> > > > > > > explanation of why they were rejected.
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > > Ismael
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > > > Hey all,
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > > > I would like to start a discussion for KIP-447:
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > > > This work originated with Jason Gustafson, and we would like
> > > > > > > > > > > > >>> > > > > >> > > > > > > > to proceed to the discussion stage.
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > >>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > > > Boyang
> > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > > > --
> > > > > > > > > > > >>> > > > > >> > > > > > -- Guozhang
> > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >> > --
> > > > > > > > > > > >>> > > > > >> > -- Guozhang
> > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > >>> > > > >
> > > > > > > > > > > >>> > > >
> > > > > > > > > > > >>> > >
> > > > > > > > > > > >>> >
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> --
> > > > > > > > > > > >>> -- Guozhang
> > > > > > > > > > > >>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
