Thank you Jason for the ideas.

On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Boyang,
>
> Thanks for the updates. A few comments below:
>
> 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> 10s.
> I think this makes sense for Kafka Streams which is tied to the consumer
> group semantics and uses a default 10s session timeout. However, it seems a
> bit dangerous to make this change for the producer generally. Could we just
> change it for streams?
>
That sounds good to me.
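
To make the scope concrete: we could change the default only on the Streams
side rather than the producer default. The user-visible equivalent today would
look roughly like the snippet below (values are just the ones discussed above,
and whether we bake this into StreamsConfig or document it as an override is an
implementation detail):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// Align the producer-side transaction timeout with the 10s consumer session
// timeout, without touching the producer's global default.
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000);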

> 2. The new `initTransactions` API takes a `Consumer` instance. I think the
> idea is to basically put in a backdoor to give the producer access to the
> group generationId. It's not clear to me how this would work given package
> restrictions. I wonder if it would be better to just expose the state we
> need from the consumer. I know we have been reluctant to do this so far
> because we treat the generationId as an implementation detail. However, I
> think we might just bite the bullet and expose it rather than coming up
> with a messy hack. Concepts such as memberIds have already been exposed in
> the AdminClient, so maybe it is not too bad. Alternatively, we could use an
> opaque type. For example:
>
> // public
> interface GroupMetadata {}
>
> // private
> interface ConsumerGroupMetadata {
>   final int generationId;
>   final String memberId;
> }
>
> // Consumer API
> public GroupMetadata groupMetadata();
>
> I am probably leaning toward just exposing the state we need.
>
Yes, also worth mentioning that Kafka Streams uses the generic Consumer API,
which doesn't expose the rich internal state that a full `KafkaConsumer` has.
The hack will not work as expected.

Instead, just exposing the consumer generation.id seems like a much easier
path. We could consolidate the API around the proposed GroupMetadata type.
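
As a rough sketch of that direction (names are placeholders and nothing here is
final, just building on the opaque-type idea above):

// Public API (its own file): an opaque handle, so no rebalance internals leak
// out to users.
public interface GroupMetadata { }

// Accessors we could consolidate around (exact signatures still open):
//   Consumer:  GroupMetadata groupMetadata();
//   Producer:  void initTransactions(GroupMetadata groupMetadata);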

> 3. Given that we are already providing a way to propagate group state from
> the consumer to the producer, I wonder if we may as well include the
> memberId and groupInstanceId. This would make the validation we do for
> TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> least this may help with debugging.
>

Yes, we could put them into the GroupMetadata struct.
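
Concretely, the internal (non-public) struct could carry something like the
following; this is only a sketch, and groupInstanceId would be empty unless
static membership is configured:

import java.util.Optional;

// Not a public API: the group state attached to TxnOffsetCommit so the group
// coordinator can validate it the same way it validates a regular OffsetCommit.
class ConsumerGroupMetadata implements GroupMetadata {
  final String groupId;
  final int generationId;
  final String memberId;
  final Optional<String> groupInstanceId;

  ConsumerGroupMetadata(String groupId, int generationId, String memberId,
                        Optional<String> groupInstanceId) {
    this.groupId = groupId;
    this.generationId = generationId;
    this.memberId = memberId;
    this.groupInstanceId = groupInstanceId;
  }
}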


> 4. I like the addition of isolation_level to the offset fetch. At the same
> time, its behavior is a bit inconsistent with how it is used in the
> consumer generally. There is no reason for the group coordinator to ever
> expose aborted data, so this is mostly about awaiting pending offset
> commits, not reading uncommitted data. Perhaps instead of calling this
> "isolation level," it should be more like "await_pending_transaction" or
> something like that?
>
> Also, just to be clear, the consumer would treat this as an optional field,
> right? So if the broker does not support the latest OffsetFetch API, it
> would silently revert to reading the old data. Basically it would be up to
> the streams version probing logic to ensure that the expectation on this
> API fits with the usage of `transactional.id`.
>
That sounds like better naming to me, though I think it could be shortened to
`await_transaction`. I agree the field should be optional, too.
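
To spell out the coordinator-side semantics I have in mind, here is a
self-contained sketch; every name below is made up for illustration, and the
actual error code and field name would be settled in the KIP:

import java.util.Set;

// Illustrative only: how the group coordinator could answer an OffsetFetch
// that carries the proposed optional "await_transaction" flag.
final class OffsetFetchSketch {
  enum Result { RETRY_LATER, RETURN_COMMITTED_OFFSETS }

  static Result handle(boolean awaitTransaction,
                       Set<Integer> requestedPartitions,
                       Set<Integer> partitionsWithPendingTxnOffsets) {
    boolean pending = requestedPartitions.stream()
        .anyMatch(partitionsWithPendingTxnOffsets::contains);
    if (awaitTransaction && pending) {
      // A transaction touching these offsets is still in flight: answer with a
      // new retriable error so the consumer backs off and retries, bounded in
      // the worst case by the (reduced) transaction timeout.
      return Result.RETRY_LATER;
    }
    // Either the flag is unset (old behavior) or nothing is pending.
    return Result.RETURN_COMMITTED_OFFSETS;
  }
}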


> Thanks,
> Jason
>
>
>
>
>
> On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Guozhang,
> >
> > I will correct my statement from the last email. I don't think the
> > read_committed flag (3.a) needs to be added to the OffsetFetch request,
> > since with an EOS application the underlying consumers within the
> > group should always back off when there are pending offsets.
> >
> > Let me know if you think this is correct.
> >
> > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Thank you Guozhang for the questions, inline answers are below.
> > >
> > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com
> >
> > > wrote:
> > >
> > >> Hey all,
> > >>
> > >> I have done a fundamental polish of KIP-447
> > >> <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> >
> > and
> > >> written a design doc
> > >> <
> >
> https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#
> >
> > depicting
> > >> internal changes. We stripped off many implementation details from the
> > KIP,
> > >> and simplified the public changes by a lot. For reviewers, it is
> highly
> > >> recommended to fully understand EOS design in KIP-98 and read its
> > >> corresponding design doc
> > >> <
> >
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
> >
> > if
> > >> you haven't done so already.
> > >>
> > >> Let me know if you found anything confusing around the KIP or the
> > design.
> > >> Would be happy to discuss in depth.
> > >>
> > >> Best,
> > >> Boyang
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>
> > >>> 2. The reason we did not expose generation.id from KafkaConsumer
> > public
> > >>> APIs directly is to abstract this notion from users (since it is an
> > >>> implementation detail of the rebalance protocol itself, e.g. if user
> > >>> calls
> > >>> consumer.assign() they do not need to invoke ConsumerCoordinator and
> no
> > >>> need to be aware of generation.id at all).
> > >>>
> > >>> On the other hand, with the current proposal the txn.coordinator did
> > not
> > >>> know about the latest generation from the source-of-truth
> > >>> group.coordinator; instead, it will only bump up the generation from
> > the
> > >>> producer's InitProducerIdRequest only.
> > >>>
> > >>> The key here is that GroupCoordinator, when handling
> > >>> `InitProducerIdRequest
> > >>>
> > >> In the new design, we just pass the entire consumer instance into the
> > > producer through
> > > #initTransaction, so no public API will be created.
> > >
> > >> 3. I agree that if we rely on the group coordinator to block on
> > returning
> > >>> offset-fetch-response if read-committed is enabled, then we do not
> need
> > >>> to
> > >>> store partition assignment on txn coordinator and therefore it's
> better
> > >>> to
> > >>> still decouple them. For that case we still need to update the KIP
> wiki
> > >>> page that includes:
> > >>>
> > >>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> > >>> 3.b. Add new error code in OffsetFetchResponse to let client backoff
> > and
> > >>> retry if there are pending txns including the interested partitions.
> > >>> 3.c. Also in the worst case we would let the client be blocked for
> the
> > >>> txn.timeout period, and for that rationale we may need to consider
> > >>> reducing
> > >>> our default txn.timeout value as well.
> > >>>
> > >>> Addressed 3.b and 3.c, will do 3.a.
> > >
> > >> 4. According to Colin it seems we do not need to create another KIP
> and
> > we
> > >>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to
> do
> > >>> some cleanup to have BrokerApiVersion exposed from AdminClient
> (@Colin
> > >>> please let us know if you have any concerns about exposing it).
> > >>>
> > >> I think we no longer need to rely on api version for initialization,
> > > since we will be using the upgrade.from config anyway.
> > >
> > >>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io>
> > >>> wrote:
> > >>>
> > >>> > For reference, we have BrokerApiVersionCommand already as a public
> > >>> > interface. We have a bit of tech debt at the moment because it
> uses a
> > >>> > custom AdminClient. It would be nice to clean that up. In general,
> I
> > >>> think
> > >>> > it is reasonable to expose from AdminClient. It can be used by
> > >>> management
> > >>> > tools to inspect running Kafka versions for example.
> > >>> >
> > >>> > -Jason
> > >>> >
> > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <
> > >>> reluctanthero...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Thank you for the context Colin. The groupId was indeed a
> > copy-paste
> > >>> > error.
> > >>> > > Our use case here for 447 is (Quoted from Guozhang):
> > >>> > > '''
> > >>> > > I think if we can do something else to
> > >>> > > avoid this config though, for example we can use the embedded
> > >>> AdminClient
> > >>> > > to send the APIVersion request upon starting up, and based on the
> > >>> > returned
> > >>> > > value decides whether to go to the old code path or the new
> > behavior.
> > >>> > > '''
> > >>> > > The benefit we get is to avoid adding a new configuration to
> make a
> > >>> > > decision simply base on broker version. If you have concerns with
> > >>> > exposing
> > >>> > > ApiVersion for client, we could
> > >>> > > try to think of alternative solutions too.
> > >>> > >
> > >>> > > Boyang
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org
> >
> > >>> wrote:
> > >>> > >
> > >>> > > > kafka.api.ApiVersion is an internal class, not suitable to
> > exposing
> > >>> > > > through AdminClient.  That class is not even accessible without
> > >>> having
> > >>> > > the
> > >>> > > > broker jars on your CLASSPATH.
> > >>> > > >
> > >>> > > > Another question is, what is the groupId parameter doing in the
> > >>> call?
> > >>> > > The
> > >>> > > > API versions are the same no matter what consumer group we use,
> > >>> right?
> > >>> > > > Perhaps this was a copy and paste error?
> > >>> > > >
> > >>> > > > This is not the first time we have discussed having a method in
> > >>> > > > AdminClient to retrieve API version information.  In fact, the
> > >>> original
> > >>> > > KIP
> > >>> > > > which created KafkaAdminClient specified an API for fetching
> > >>> version
> > >>> > > > information.  It was called apiVersions and it is still there
> on
> > >>> the
> > >>> > > wiki.
> > >>> > > > See
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > >>> > > >
> > >>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped
> > >>> without
> > >>> > > > it.  There was a JIRA to implement it for later versions,
> > >>> > > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as
> a
> > >>> PR,
> > >>> > > > https://github.com/apache/kafka/pull/3012 .  However, we
> started
> > >>> to
> > >>> > > > rethink whether this AdminClient function was even necessary.
> > >>> Most of
> > >>> > > the
> > >>> > > > use-cases we could think of seemed like horrible hacks.  So it
> > has
> > >>> > never
> > >>> > > > really been implemented (yet?).
> > >>> > > >
> > >>> > > > best,
> > >>> > > > Colin
> > >>> > > >
> > >>> > > >
> > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > >>> > > > > Actually, after a second thought, I think it actually makes
> > >>> sense to
> > >>> > > > > support auto upgrade through admin client to help use get api
> > >>> version
> > >>> > > > > from
> > >>> > > > > broker.
> > >>> > > > > A draft KIP is here:
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > >>> > > > >
> > >>> > > > > Boyang
> > >>> > > > >
> > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> > >>> > > reluctanthero...@gmail.com>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Thank you Guozhang, some of my understandings are inline
> > below.
> > >>> > > > > >
> > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <
> > >>> > ja...@confluent.io
> > >>> > > >
> > >>> > > > > > wrote:
> > >>> > > > > >
> > >>> > > > > >> >
> > >>> > > > > >> > I think co-locating does have some merits here, i.e.
> > >>> letting the
> > >>> > > > > >> > ConsumerCoordinator which has the source-of-truth of
> > >>> assignment
> > >>> > to
> > >>> > > > act
> > >>> > > > > >> as
> > >>> > > > > >> > the TxnCoordinator as well; but I agree there's also
> some
> > >>> cons
> > >>> > of
> > >>> > > > > >> coupling
> > >>> > > > > >> > them together. I'm still a bit inclining towards
> > colocation
> > >>> but
> > >>> > if
> > >>> > > > there
> > >>> > > > > >> > are good rationales not to do so I can be convinced as
> > well.
> > >>> > > > > >>
> > >>> > > > > >>
> > >>> > > > > >> The good rationale is that we have no mechanism to
> colocate
> > >>> > > > partitions ;).
> > >>> > > > > >> Are you suggesting we store the group and transaction
> state
> > >>> in the
> > >>> > > > same
> > >>> > > > > >> log? Can you be more concrete about the benefit?
> > >>> > > > > >>
> > >>> > > > > >> -Jason
> > >>> > > > > >>
> > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <
> > >>> > wangg...@gmail.com>
> > >>> > > > > >> wrote:
> > >>> > > > > >>
> > >>> > > > > >> > Hi Boyang,
> > >>> > > > > >> >
> > >>> > > > > >> > 1. One advantage of retry against on-hold is that it
> will
> > >>> not
> > >>> > > > tie-up a
> > >>> > > > > >> > handler thread (of course the latter could do the same
> but
> > >>> that
> > >>> > > > involves
> > >>> > > > > >> > using a purgatory which is more complicated), and also
> it
> > is
> > >>> > less
> > >>> > > > > >> likely to
> > >>> > > > > >> > violate request timeout. So I think there are some
> > >>> rationales to
> > >>> > > > prefer
> > >>> > > > > >> > retries.
> > >>> > > > > >> >
> > >>> > > > > >>
> > >>> > > > > >  That sounds fair to me, also we are avoiding usage of
> > another
> > >>> > > > purgatory
> > >>> > > > > > instance. Usually for one back-off
> > >>> > > > > > we are only delaying 50ms during startup which is trivial
> > cost.
> > >>> > This
> > >>> > > > > > behavior shouldn't be changed.
> > >>> > > > > >
> > >>> > > > > > > 2. Regarding "ConsumerRebalanceListener": both
> > >>> > > > ConsumerRebalanceListener
> > >>> > > > > >> > and PartitionAssignors are user-customizable modules,
> and
> > >>> only
> > >>> > > > > >> difference
> > >>> > > > > >> > is that the former is specified via code and the latter
> is
> > >>> > > > specified via
> > >>> > > > > >> > config.
> > >>> > > > > >> >
> > >>> > > > > >> > Regarding Jason's proposal of ConsumerAssignment, one
> > thing
> > >>> to
> > >>> > > note
> > >>> > > > > >> though
> > >>> > > > > >> > with KIP-429 the onPartitionAssigned may not be called
> if
> > >>> the
> > >>> > > > assignment
> > >>> > > > > >> > does not change, whereas onAssignment would always be
> > >>> called at
> > >>> > > the
> > >>> > > > end
> > >>> > > > > >> of
> > >>> > > > > >> > sync-group response. My proposed semantics is that
> > >>> > > > > >> > `RebalanceListener#onPartitionsXXX` are used for
> > >>> notifications
> > >>> > to
> > >>> > > > user,
> > >>> > > > > >> and
> > >>> > > > > >> > hence if there's no changes these will not be called,
> > >>> whereas
> > >>> > > > > >> > `PartitionAssignor` is used for assignor logic, whose
> > >>> callback
> > >>> > > would
> > >>> > > > > >> always
> > >>> > > > > >> > be called no matter if the partitions have changed or
> not.
> > >>> > > > > >>
> > >>> > > > > >> I think a third option is to gracefully expose generation
> id
> > >>> as
> > >>> > part
> > >>> > > > of
> > >>> > > > > > consumer API, so that we don't need to
> > >>> > > > > > bother overloading various callbacks. Of course, this
> builds
> > >>> upon
> > >>> > the
> > >>> > > > > > assumption that topic partitions
> > >>> > > > > > will not be included in new initTransaction API.
> > >>> > > > > >
> > >>> > > > > > > 3. I feel it is a bit awkward to let the TxnCoordinator
> > >>> keeping
> > >>> > > > partition
> > >>> > > > > >> > assignments since it is sort of taking over the job of
> the
> > >>> > > > > >> > ConsumerCoordinator, and may likely cause a split-brain
> > >>> problem
> > >>> > as
> > >>> > > > two
> > >>> > > > > >> > coordinators keep a copy of this assignment which may be
> > >>> > > different.
> > >>> > > > > >> >
> > >>> > > > > >> > I think co-locating does have some merits here, i.e.
> > >>> letting the
> > >>> > > > > >> > ConsumerCoordinator which has the source-of-truth of
> > >>> assignment
> > >>> > to
> > >>> > > > act
> > >>> > > > > >> as
> > >>> > > > > >> > the TxnCoordinator as well; but I agree there's also
> some
> > >>> cons
> > >>> > of
> > >>> > > > > >> coupling
> > >>> > > > > >> > them together. I'm still a bit inclining towards
> > colocation
> > >>> but
> > >>> > if
> > >>> > > > there
> > >>> > > > > >> > are good rationales not to do so I can be convinced as
> > well.
> > >>> > > > > >> >
> > >>> > > > > >>
> > >>> > > > > > The purpose of co-location is to let txn coordinator see
> the
> > >>> group
> > >>> > > > > > assignment. This priority is weakened
> > >>> > > > > > when we already have defense on the consumer offset fetch,
> > so I
> > >>> > guess
> > >>> > > > it's
> > >>> > > > > > not super important anymore.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >> > 4. I guess I'm preferring the philosophy of "only add
> > >>> configs if
> > >>> > > > > >> there's no
> > >>> > > > > >> > other ways", since more and more configs would make it
> > less
> > >>> and
> > >>> > > less
> > >>> > > > > >> > intuitive out of the box to use.
> > >>> > > > > >> >
> > >>> > > > > >> > I think it's a valid point that checks upon starting up
> > >>> does not
> > >>> > > > cope
> > >>> > > > > >> with
> > >>> > > > > >> > brokers downgrading but even with a config, but it is
> > still
> > >>> hard
> > >>> > > for
> > >>> > > > > >> users
> > >>> > > > > >> > to determine when they can be ensured the broker would
> > never
> > >>> > > > downgrade
> > >>> > > > > >> > anymore and hence can safely switch the config. So my
> > >>> feeling is
> > >>> > > > that
> > >>> > > > > >> this
> > >>> > > > > >> > config would not be helping too much still. If we want
> to
> > >>> be at
> > >>> > > the
> > >>> > > > > >> safer
> > >>> > > > > >> > side, then I'd suggest we modify the Coordinator ->
> > >>> > NetworkClient
> > >>> > > > > >> hierarchy
> > >>> > > > > >> > to allow the NetworkClient being able to pass the
> > APIVersion
> > >>> > > > metadata to
> > >>> > > > > >> > Coordinator, so that Coordinator can rely on that logic
> to
> > >>> > change
> > >>> > > > its
> > >>> > > > > >> > behavior dynamically.
> > >>> > > > > >>
> > >>> > > > > > The stream thread init could not be supported by a client
> > >>> > coordinator
> > >>> > > > > > behavior change on the fly,
> > >>> > > > > > we are only losing possibilities after we initialized.
> (main
> > >>> thread
> > >>> > > > gets
> > >>> > > > > > exit and no thread has global picture anymore)
> > >>> > > > > > If we do want to support auto version detection, admin
> client
> > >>> > request
> > >>> > > > in
> > >>> > > > > > this sense shall be easier.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >> >
> > >>> > > > > >> > 5. I do not have a concrete idea about how the impact on
> > >>> Connect
> > >>> > > > would
> > >>> > > > > >> > make, maybe Randall or Konstantine can help here?
> > >>> > > > > >> >
> > >>> > > > > >>
> > >>> > > > > > Sounds good, let's see their thoughts.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >> > Guozhang
> > >>> > > > > >> >
> > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <
> > >>> > > > > >> reluctanthero...@gmail.com>
> > >>> > > > > >> > wrote:
> > >>> > > > > >> >
> > >>> > > > > >> > > Hey Jason,
> > >>> > > > > >> > >
> > >>> > > > > >> > > thank you for the proposal here. Some of my thoughts
> > >>> below.
> > >>> > > > > >> > >
> > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <
> > >>> > > > ja...@confluent.io>
> > >>> > > > > >> > > wrote:
> > >>> > > > > >> > >
> > >>> > > > > >> > > > Hi Boyang,
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > Thanks for picking this up! Still reading through
> the
> > >>> > updates,
> > >>> > > > but
> > >>> > > > > >> here
> > >>> > > > > >> > > are
> > >>> > > > > >> > > > a couple initial comments on the APIs:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit
> awkward. I
> > >>> think
> > >>> > > we
> > >>> > > > are
> > >>> > > > > >> > > trying
> > >>> > > > > >> > > > to encapsulate state from the current group
> > assignment.
> > >>> > Maybe
> > >>> > > > > >> something
> > >>> > > > > >> > > > like `ConsumerAssignment` would be clearer? If we
> make
> > >>> the
> > >>> > > usage
> > >>> > > > > >> > > consistent
> > >>> > > > > >> > > > across the consumer and producer, then we can avoid
> > >>> exposing
> > >>> > > > > >> internal
> > >>> > > > > >> > > state
> > >>> > > > > >> > > > like the generationId.
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > For example:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > // Public API
> > >>> > > > > >> > > > interface ConsumerAssignment {
> > >>> > > > > >> > > >   Set<TopicPartition> partitions();
> > >>> > > > > >> > > > }
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > // Not a public API
> > >>> > > > > >> > > > class InternalConsumerAssignment implements
> > >>> > > ConsumerAssignment {
> > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > >>> > > > > >> > > >   int generationId;
> > >>> > > > > >> > > > }
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > Then we can change the rebalance listener to
> something
> > >>> like
> > >>> > > > this:
> > >>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > And on the producer:
> > >>> > > > > >> > > > void initTransactions(String groupId,
> > ConsumerAssignment
> > >>> > > > > >> assignment);
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > 2. Another bit of awkwardness is the fact that we
> have
> > >>> to
> > >>> > pass
> > >>> > > > the
> > >>> > > > > >> > > groupId
> > >>> > > > > >> > > > through both initTransactions() and
> > >>> > > sendOffsetsToTransaction().
> > >>> > > > We
> > >>> > > > > >> > could
> > >>> > > > > >> > > > consider a config instead. Maybe something like `
> > >>> > > > > >> > transactional.group.id
> > >>> > > > > >> > > `?
> > >>> > > > > >> > > > Then we could simplify the producer APIs,
> potentially
> > >>> even
> > >>> > > > > >> deprecating
> > >>> > > > > >> > > the
> > >>> > > > > >> > > > current sendOffsetsToTransaction. In fact, for this
> > new
> > >>> > usage,
> > >>> > > > the `
> > >>> > > > > >> > > > transational.id` config is not needed. It would be
> > >>> nice if
> > >>> > we
> > >>> > > > don't
> > >>> > > > > >> > have
> > >>> > > > > >> > > > to
> > >>> > > > > >> > > > provide it.
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > I like the idea of combining 1 and 2. We could
> > >>> definitely
> > >>> > pass
> > >>> > > > in a
> > >>> > > > > >> > > group.id config
> > >>> > > > > >> > > so that we could avoid exposing that information in a
> > >>> public
> > >>> > > API.
> > >>> > > > The
> > >>> > > > > >> > > question I have
> > >>> > > > > >> > > is that whether we should name the interface
> > >>> `GroupAssignment`
> > >>> > > > > >> instead,
> > >>> > > > > >> > so
> > >>> > > > > >> > > that Connect later
> > >>> > > > > >> > > could also extend on the same interface, just to echo
> > >>> > Guozhang's
> > >>> > > > point
> > >>> > > > > >> > > here, Also the base interface
> > >>> > > > > >> > > is better to be defined empty for easy extension, or
> > >>> define an
> > >>> > > > > >> abstract
> > >>> > > > > >> > > type called `Resource` to be shareable
> > >>> > > > > >> > > later IMHO.
> > >>> > > > > >> > >
> > >>> > > > > >> > >
> > >>> > > > > >> > > > By the way, I'm a bit confused about discussion
> above
> > >>> about
> > >>> > > > > >> colocating
> > >>> > > > > >> > > the
> > >>> > > > > >> > > > txn and group coordinators. That is not actually
> > >>> necessary,
> > >>> > is
> > >>> > > > it?
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > Yes, this is not a requirement for this KIP, because
> > it
> > >>> is
> > >>> > > > > >> inherently
> > >>> > > > > >> > > impossible to
> > >>> > > > > >> > > achieve co-locating  topic partition of transaction
> log
> > >>> and
> > >>> > > > consumed
> > >>> > > > > >> > offset
> > >>> > > > > >> > > topics.
> > >>> > > > > >> > >
> > >>> > > > > >> > >
> > >>> > > > > >> > > > Thanks,
> > >>> > > > > >> > > > Jason
> > >>> > > > > >> > > >
> > >>> > > > > >> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <
> > >>> > > > > >> reluctanthero...@gmail.com
> > >>> > > > > >> > >
> > >>> > > > > >> > > > wrote:
> > >>> > > > > >> > > >
> > >>> > > > > >> > > > > Thank you Ismael for the suggestion. We will
> attempt
> > >>> to
> > >>> > > > address
> > >>> > > > > >> it by
> > >>> > > > > >> > > > > giving more details to rejected alternative
> section.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > Thank you for the comment Guozhang! Answers are
> > inline
> > >>> > > below.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <
> > >>> > > > wangg...@gmail.com
> > >>> > > > > >> >
> > >>> > > > > >> > > > wrote:
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > Hello Boyang,
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > Thanks for the KIP, I have some comments below:
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > 1. "Once transactions are complete, the call
> will
> > >>> > return."
> > >>> > > > This
> > >>> > > > > >> > seems
> > >>> > > > > >> > > > > > different from the existing behavior, in which
> we
> > >>> would
> > >>> > > > return a
> > >>> > > > > >> > > > > retriable
> > >>> > > > > >> > > > > > CONCURRENT_TRANSACTIONS and let the client to
> > >>> retry, is
> > >>> > > this
> > >>> > > > > >> > > > intentional?
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > I don’t think it is intentional, and I will defer
> > this
> > >>> > > > question to
> > >>> > > > > >> > > Jason
> > >>> > > > > >> > > > > when he got time to answer since from what I
> > >>> understood
> > >>> > > retry
> > >>> > > > and
> > >>> > > > > >> on
> > >>> > > > > >> > > hold
> > >>> > > > > >> > > > > seem both valid approaches.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 2. "an overload to onPartitionsAssigned in the
> > >>> > consumer's
> > >>> > > > > >> rebalance
> > >>> > > > > >> > > > > > listener interface": as part of KIP-341 we've
> > >>> already
> > >>> > add
> > >>> > > > this
> > >>> > > > > >> > > > > information
> > >>> > > > > >> > > > > > to the onAssignment callback. Would this be
> > >>> sufficient?
> > >>> > Or
> > >>> > > > more
> > >>> > > > > >> > > > generally
> > >>> > > > > >> > > > > > speaking, which information have to be passed
> > >>> around in
> > >>> > > > > >> rebalance
> > >>> > > > > >> > > > > callback
> > >>> > > > > >> > > > > > while others can be passed around in
> > >>> PartitionAssignor
> > >>> > > > > >> callback? In
> > >>> > > > > >> > > > > Streams
> > >>> > > > > >> > > > > > for example both callbacks are used but most
> > >>> critical
> > >>> > > > > >> information
> > >>> > > > > >> > is
> > >>> > > > > >> > > > > passed
> > >>> > > > > >> > > > > > via onAssignment.
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > We still need to extend ConsumerRebalanceListener
> > >>> because
> > >>> > > > it’s the
> > >>> > > > > >> > > > > interface we could have public access to. The
> > >>> > #onAssignment
> > >>> > > > call
> > >>> > > > > >> is
> > >>> > > > > >> > > > defined
> > >>> > > > > >> > > > > on PartitionAssignor level which is not easy to
> work
> > >>> with
> > >>> > > > external
> > >>> > > > > >> > > > > producers.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > > 3. "We propose to use a separate record type in
> > >>> order to
> > >>> > > > store
> > >>> > > > > >> the
> > >>> > > > > >> > > > group
> > >>> > > > > >> > > > > > assignment.": hmm, I thought with the third
> typed
> > >>> > > > > >> FindCoordinator,
> > >>> > > > > >> > > the
> > >>> > > > > >> > > > > same
> > >>> > > > > >> > > > > > broker that act as the  consumer coordinator
> would
> > >>> > always
> > >>> > > be
> > >>> > > > > >> > selected
> > >>> > > > > >> > > > as
> > >>> > > > > >> > > > > > the txn coordinator, in which case it can access
> > its
> > >>> > local
> > >>> > > > cache
> > >>> > > > > >> > > > > metadata /
> > >>> > > > > >> > > > > > offset topic to get this information already? We
> > >>> just
> > >>> > need
> > >>> > > > to
> > >>> > > > > >> think
> > >>> > > > > >> > > > about
> > >>> > > > > >> > > > > > how to make these two modules directly exchange
> > >>> > > information
> > >>> > > > > >> without
> > >>> > > > > >> > > > > messing
> > >>> > > > > >> > > > > > up the code hierarchy.
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > These two coordinators will be on the same broker
> > only
> > >>> > when
> > >>> > > > > >> number of
> > >>> > > > > >> > > > > partitions for transaction state topic and
> consumer
> > >>> offset
> > >>> > > > topic
> > >>> > > > > >> are
> > >>> > > > > >> > > the
> > >>> > > > > >> > > > > same. This normally holds true, but I'm afraid
> > >>> > > > > >> > > > > we couldn't make this assumption?
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > 4. The config of
> "CONSUMER_GROUP_AWARE_TRANSACTION":
> > >>> it
> > >>> > > seems
> > >>> > > > the
> > >>> > > > > >> > goal
> > >>> > > > > >> > > of
> > >>> > > > > >> > > > > > this config is just to avoid old-versioned
> broker
> > >>> to not
> > >>> > > be
> > >>> > > > > >> able to
> > >>> > > > > >> > > > > > recognize newer versioned client. I think if we
> > can
> > >>> do
> > >>> > > > something
> > >>> > > > > >> > else
> > >>> > > > > >> > > > to
> > >>> > > > > >> > > > > > avoid this config though, for example we can use
> > the
> > >>> > > > embedded
> > >>> > > > > >> > > > AdminClient
> > >>> > > > > >> > > > > > to send the APIVersion request upon starting up,
> > and
> > >>> > based
> > >>> > > > on
> > >>> > > > > >> the
> > >>> > > > > >> > > > > returned
> > >>> > > > > >> > > > > > value decides whether to go to the old code path
> > or
> > >>> the
> > >>> > > new
> > >>> > > > > >> > behavior.
> > >>> > > > > >> > > > > > Admittedly asking a random broker about
> APIVersion
> > >>> does
> > >>> > > not
> > >>> > > > > >> > guarantee
> > >>> > > > > >> > > > the
> > >>> > > > > >> > > > > > whole cluster's versions, but what we can do is
> to
> > >>> first
> > >>> > > 1)
> > >>> > > > find
> > >>> > > > > >> > the
> > >>> > > > > >> > > > > > coordinator (and if the random broker does not
> > even
> > >>> > > > recognize
> > >>> > > > > >> the
> > >>> > > > > >> > new
> > >>> > > > > >> > > > > > discover type, fall back to old path directly),
> > and
> > >>> then
> > >>> > > 2)
> > >>> > > > ask
> > >>> > > > > >> the
> > >>> > > > > >> > > > > > discovered coordinator about its supported
> > >>> APIVersion.
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > The caveat here is that we have to make sure both
> > the
> > >>> > group
> > >>> > > > > >> > coordinator
> > >>> > > > > >> > > > and
> > >>> > > > > >> > > > > transaction coordinator are on the latest version
> > >>> during
> > >>> > > init
> > >>> > > > > >> stage.
> > >>> > > > > >> > > This
> > >>> > > > > >> > > > > is potentially doable as we only need a consumer
> > >>> group.id
> > >>> > > > > >> > > > > to check that. In the meantime, a hard-coded
> config
> > is
> > >>> > > still a
> > >>> > > > > >> > > favorable
> > >>> > > > > >> > > > > backup in case the server has downgraded, so you
> > will
> > >>> want
> > >>> > > to
> > >>> > > > use
> > >>> > > > > >> a
> > >>> > > > > >> > new
> > >>> > > > > >> > > > > version client without `consumer group`
> > transactional
> > >>> > > support.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > 5. This is a meta question: have you considered
> how
> > >>> this
> > >>> > can
> > >>> > > > be
> > >>> > > > > >> > applied
> > >>> > > > > >> > > > to
> > >>> > > > > >> > > > > > Kafka Connect as well? For example, for source
> > >>> > connectors,
> > >>> > > > the
> > >>> > > > > >> > > > assignment
> > >>> > > > > >> > > > > > is not by "partitions", but by some other sort
> of
> > >>> > > > "resources"
> > >>> > > > > >> based
> > >>> > > > > >> > > on
> > >>> > > > > >> > > > > the
> > >>> > > > > >> > > > > > source systems, how KIP-447 would affect Kafka
> > >>> > Connectors
> > >>> > > > that
> > >>> > > > > >> > > > > implemented
> > >>> > > > > >> > > > > > EOS as well?
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > No, it's not currently included in the scope.
> Could
> > >>> you
> > >>> > > point
> > >>> > > > me
> > >>> > > > > >> to a
> > >>> > > > > >> > > > > sample source connector who uses EOS? Could always
> > >>> > > piggy-back
> > >>> > > > into
> > >>> > > > > >> > the
> > >>> > > > > >> > > > > TxnProducerIdentity struct with more information
> > such
> > >>> as
> > >>> > > > tasks. If
> > >>> > > > > >> > > > > this is something to support in near term, an
> > abstract
> > >>> > type
> > >>> > > > called
> > >>> > > > > >> > > > > "Resource" could be provided and let topic
> partition
> > >>> and
> > >>> > > > connect
> > >>> > > > > >> task
> > >>> > > > > >> > > > > implement it.
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > Guozhang
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <
> > >>> > > > ism...@juma.me.uk>
> > >>> > > > > >> > > wrote:
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > > Hi Boyang,
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > > > Thanks for the KIP. It's good that we listed a
> > >>> number
> > >>> > of
> > >>> > > > > >> rejected
> > >>> > > > > >> > > > > > > alternatives. It would be helpful to have an
> > >>> > explanation
> > >>> > > > of
> > >>> > > > > >> why
> > >>> > > > > >> > > they
> > >>> > > > > >> > > > > were
> > >>> > > > > >> > > > > > > rejected.
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > > > Ismael
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <
> > >>> > > > > >> bche...@outlook.com
> > >>> > > > > >> > >
> > >>> > > > > >> > > > > wrote:
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > > > > Hey all,
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > > > I would like to start a discussion for
> > KIP-447:
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > >
> > >>> > > > > >> > >
> > >>> > > > > >> >
> > >>> > > > > >>
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > > > this is a work originated by Jason Gustafson
> > >>> and we
> > >>> > > > would
> > >>> > > > > >> like
> > >>> > > > > >> > to
> > >>> > > > > >> > > > > > proceed
> > >>> > > > > >> > > > > > > > into discussion stage.
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > > > Boyang
> > >>> > > > > >> > > > > > > >
> > >>> > > > > >> > > > > > >
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > > > --
> > >>> > > > > >> > > > > > -- Guozhang
> > >>> > > > > >> > > > > >
> > >>> > > > > >> > > > >
> > >>> > > > > >> > > >
> > >>> > > > > >> > >
> > >>> > > > > >> >
> > >>> > > > > >> >
> > >>> > > > > >> > --
> > >>> > > > > >> > -- Guozhang
> > >>> > > > > >> >
> > >>> > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> >
>
