For reference, we already have BrokerApiVersionCommand as a public
interface. We have a bit of tech debt at the moment because it uses a
custom AdminClient. It would be nice to clean that up. In general, I think
it is reasonable to expose this from AdminClient. It could be used by
management tools to inspect running Kafka versions, for example.
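For concreteness, a management tool consuming such an API might only need a small helper like the sketch below. The `ApiVersionRange` type and `maxSupported` helper are illustrative stand-ins, not real AdminClient classes:

```java
import java.util.List;

public class ApiVersionsSketch {

    // Stand-in for one (apiKey, minVersion, maxVersion) entry of the kind
    // an ApiVersionsResponse carries. Illustrative only.
    public record ApiVersionRange(short apiKey, short minVersion, short maxVersion) {}

    // What a management tool might do with the result: report the highest
    // version of a given API the broker supports, or -1 if it is absent.
    public static short maxSupported(List<ApiVersionRange> brokerApis, short apiKey) {
        for (ApiVersionRange r : brokerApis) {
            if (r.apiKey() == apiKey) {
                return r.maxVersion();
            }
        }
        return (short) -1;
    }

    public static void main(String[] args) {
        List<ApiVersionRange> broker = List.of(
                new ApiVersionRange((short) 0, (short) 0, (short) 8),
                new ApiVersionRange((short) 18, (short) 0, (short) 3));
        System.out.println(maxSupported(broker, (short) 18)); // 3
        System.out.println(maxSupported(broker, (short) 99)); // -1
    }
}
```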

-Jason

On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thank you for the context, Colin. The groupId was indeed a copy-paste error.
> Our use case here for 447 is (Quoted from Guozhang):
> '''
> I think if we can do something else to
> avoid this config though, for example we can use the embedded AdminClient
> to send the APIVersion request upon starting up, and based on the returned
> value decides whether to go to the old code path or the new behavior.
> '''
> The benefit we get is avoiding a new configuration just to make a
> decision based on the broker version. If you have concerns with exposing
> ApiVersion to clients, we could
> try to think of alternative solutions too.
>
> Boyang
>
>
>
> On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > through AdminClient.  That class is not even accessible without having
> > the broker jars on your CLASSPATH.
> >
> > Another question is, what is the groupId parameter doing in the call?
> > The API versions are the same no matter what consumer group we use,
> > right?  Perhaps this was a copy-and-paste error?
> >
> > This is not the first time we have discussed having a method in
> > AdminClient to retrieve API version information.  In fact, the original
> > KIP which created KafkaAdminClient specified an API for fetching
> > version information.  It was called apiVersions and it is still there
> > on the wiki.  See
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> >
> > However, this API wasn't ready in time for 0.11.0, so we shipped
> > without it.  There was a JIRA to implement it for later versions,
> > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> > https://github.com/apache/kafka/pull/3012 .  However, we started to
> > rethink whether this AdminClient function was even necessary.  Most of
> > the use cases we could think of seemed like horrible hacks.  So it has
> > never really been implemented (yet?).
> >
> > best,
> > Colin
> >
> >
> > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > Actually, on second thought, I think it makes sense to support
> > > auto-upgrade through the admin client, to help us get the API version
> > > from the broker.
> > > A draft KIP is here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > >
> > > Boyang
> > >
> > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Guozhang, some of my understandings are inline below.
> > > >
> > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io
> >
> > > > wrote:
> > > >
> > > >> >
> > > >> > I think co-locating does have some merits here, i.e. letting the
> > > >> > ConsumerCoordinator, which has the source of truth of the
> > > >> > assignment, act as the TxnCoordinator as well; but I agree there
> > > >> > are also some cons to coupling them together. I'm still a bit
> > > >> > inclined towards colocation, but if there are good rationales not
> > > >> > to do so I can be convinced as well.
> > > >>
> > > >>
> > > >> The good rationale is that we have no mechanism to colocate
> > > >> partitions ;).  Are you suggesting we store the group and
> > > >> transaction state in the same log?  Can you be more concrete about
> > > >> the benefit?
> > > >>
> > > >> -Jason
> > > >>
> > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Boyang,
> > > >> >
> > > >> > 1. One advantage of retry over on-hold is that it will not tie
> > > >> > up a handler thread (of course the latter could avoid that too,
> > > >> > but that involves using a purgatory, which is more complicated),
> > > >> > and it is also less likely to violate the request timeout. So I
> > > >> > think there are some rationales to prefer retries.
> > > >> >
> > > >>
> > > > That sounds fair to me; we also avoid using another purgatory
> > > > instance. A single back-off usually only delays startup by 50 ms,
> > > > which is a trivial cost. This behavior shouldn't be changed.
> > > >
> > > >> > 2. Regarding "ConsumerRebalanceListener": both
> > > >> > ConsumerRebalanceListener and PartitionAssignor are
> > > >> > user-customizable modules; the only difference is that the former
> > > >> > is specified via code and the latter via config.
> > > >> >
> > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to
> > > >> > note though: with KIP-429, onPartitionsAssigned may not be called
> > > >> > if the assignment does not change, whereas onAssignment would
> > > >> > always be called at the end of the sync-group response. My
> > > >> > proposed semantics is that `RebalanceListener#onPartitionsXXX`
> > > >> > are used for notifications to the user, and hence if there are no
> > > >> > changes these will not be called, whereas `PartitionAssignor` is
> > > >> > used for assignor logic, whose callback would always be called
> > > >> > whether or not the partitions have changed.
> > > >>
> > > > I think a third option is to expose the generation id as part of
> > > > the consumer API, so that we don't need to bother overloading
> > > > various callbacks. Of course, this builds upon the assumption that
> > > > topic partitions will not be included in the new initTransactions
> > > > API.
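A minimal sketch of what such an exposed value type might look like; `GroupGeneration` and its methods are hypothetical names, not an existing consumer API:

```java
import java.util.Objects;

// Illustrative only: a small value type the consumer could hand out so
// that the producer never needs to see the consumer's internals directly.
// Neither this class nor any accessor for it exists in the consumer today.
public class GroupGeneration {
    private final String groupId;
    private final int generationId;

    public GroupGeneration(String groupId, int generationId) {
        this.groupId = Objects.requireNonNull(groupId);
        this.generationId = generationId;
    }

    public String groupId() { return groupId; }
    public int generationId() { return generationId; }

    // A stale generation (e.g. from before a rebalance) can be detected
    // with a simple comparison on the coordinator side.
    public boolean isNewerThan(GroupGeneration other) {
        return generationId > other.generationId;
    }

    public static void main(String[] args) {
        GroupGeneration before = new GroupGeneration("my-group", 5);
        GroupGeneration after = new GroupGeneration("my-group", 6);
        System.out.println(after.isNewerThan(before)); // true
    }
}
```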
> > > >
> > > >> > 3. I feel it is a bit awkward to let the TxnCoordinator keep
> > > >> > partition assignments, since it is sort of taking over the job of
> > > >> > the ConsumerCoordinator, and it may well cause a split-brain
> > > >> > problem, as two coordinators would keep copies of this assignment
> > > >> > which may differ.
> > > >> >
> > > >> > I think co-locating does have some merits here, i.e. letting the
> > > >> > ConsumerCoordinator, which has the source of truth of the
> > > >> > assignment, act as the TxnCoordinator as well; but I agree there
> > > >> > are also some cons to coupling them together. I'm still a bit
> > > >> > inclined towards colocation, but if there are good rationales not
> > > >> > to do so I can be convinced as well.
> > > >> >
> > > >>
> > > > The purpose of co-location is to let the txn coordinator see the
> > > > group assignment. That becomes less important once we already have a
> > > > defense on the consumer offset fetch, so I guess it's not super
> > > > important anymore.
> > > >
> > > >
> > > >> > 4. I guess I prefer the philosophy of "only add configs if
> > > >> > there's no other way", since more and more configs make the
> > > >> > client less and less intuitive to use out of the box.
> > > >> >
> > > >> > I think it's a valid point that a check upon startup does not
> > > >> > cope with brokers downgrading; but even with a config, it is
> > > >> > still hard for users to determine when they can be sure the
> > > >> > broker will never downgrade again and hence can safely switch the
> > > >> > config. So my feeling is that this config would still not help
> > > >> > much. If we want to be on the safer side, then I'd suggest we
> > > >> > modify the Coordinator -> NetworkClient hierarchy to allow the
> > > >> > NetworkClient to pass the APIVersion metadata to the Coordinator,
> > > >> > so that the Coordinator can rely on it to change its behavior
> > > >> > dynamically.
> > > >>
> > > > The stream thread init could not be handled by an on-the-fly
> > > > behavior change in the client coordinator; we lose that option once
> > > > initialization is done (the main thread exits and no thread has the
> > > > global picture anymore). If we do want to support automatic version
> > > > detection, an admin client request would be easier in this sense.
> > > >
> > > >
> > > >> >
> > > >> > 5. I do not have a concrete idea of what impact this would have
> > > >> > on Connect; maybe Randall or Konstantine can help here?
> > > >> >
> > > >>
> > > > Sounds good, let's see their thoughts.
> > > >
> > > >
> > > >> > Guozhang
> > > >> >
> > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <
> > > >> reluctanthero...@gmail.com>
> > > >> > wrote:
> > > >> >
> > > >> > > Hey Jason,
> > > >> > >
> > > >> > > thank you for the proposal here. Some of my thoughts below.
> > > >> > >
> > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <
> > ja...@confluent.io>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi Boyang,
> > > >> > > >
> > > >> > > > Thanks for picking this up! Still reading through the updates,
> > but
> > > >> here
> > > >> > > are
> > > >> > > > a couple initial comments on the APIs:
> > > >> > > >
> > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think
> > > >> > > > we are trying to encapsulate state from the current group
> > > >> > > > assignment. Maybe something like `ConsumerAssignment` would
> > > >> > > > be clearer? If we make the usage consistent across the
> > > >> > > > consumer and producer, then we can avoid exposing internal
> > > >> > > > state like the generationId.
> > > >> > > >
> > > >> > > > For example:
> > > >> > > >
> > > >> > > > // Public API
> > > >> > > > interface ConsumerAssignment {
> > > >> > > >   Set<TopicPartition> partitions();
> > > >> > > > }
> > > >> > > >
> > > >> > > > // Not a public API
> > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > >> > > >   Set<TopicPartition> partitions;
> > > >> > > >   int generationId;
> > > >> > > > }
> > > >> > > >
> > > >> > > > Then we can change the rebalance listener to something like
> > this:
> > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > >> > > >
> > > >> > > > And on the producer:
> > > >> > > > void initTransactions(String groupId, ConsumerAssignment
> > > >> assignment);
> > > >> > > >
> > > >> > > > 2. Another bit of awkwardness is the fact that we have to
> > > >> > > > pass the groupId through both initTransactions() and
> > > >> > > > sendOffsetsToTransaction(). We could consider a config
> > > >> > > > instead. Maybe something like `transactional.group.id`? Then
> > > >> > > > we could simplify the producer APIs, potentially even
> > > >> > > > deprecating the current sendOffsetsToTransaction. In fact,
> > > >> > > > for this new usage the `transactional.id` config is not
> > > >> > > > needed. It would be nice if we didn't have to provide it.
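To make the proposed flow concrete, a consume-transform-produce loop under these APIs might look roughly like the sketch below; all the types are simplified stand-ins (nothing here is a committed API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ProposedFlowSketch {

    // Mirrors the public interface proposed above; partitions are plain
    // strings here instead of TopicPartition to keep the sketch
    // self-contained.
    public interface ConsumerAssignment {
        Set<String> partitions();
    }

    // Minimal stand-in for the producer calls under discussion.
    public static class StubProducer {
        public boolean initialized = false;
        public final List<String> committedFor = new ArrayList<>();

        public void initTransactions(String groupId, ConsumerAssignment assignment) {
            // A real broker would use this to fence older producers for
            // these partitions; here we just record that it happened.
            initialized = true;
        }

        public void beginTransaction() {}

        public void sendOffsetsToTransaction(String groupId) {
            committedFor.add(groupId);
        }

        public void commitTransaction() {}
    }

    public static StubProducer run(String groupId, Set<String> assigned) {
        StubProducer producer = new StubProducer();
        ConsumerAssignment assignment = () -> assigned;

        // Called once per rebalance, e.g. from onPartitionsAssigned:
        producer.initTransactions(groupId, assignment);

        // Then the usual transactional loop:
        producer.beginTransaction();
        // ... poll, transform, send ...
        producer.sendOffsetsToTransaction(groupId);
        producer.commitTransaction();
        return producer;
    }

    public static void main(String[] args) {
        StubProducer p = run("my-group", Set.of("topic-0", "topic-1"));
        System.out.println(p.initialized);  // true
        System.out.println(p.committedFor); // [my-group]
    }
}
```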
> > > >> > > >
> > > >> > > I like the idea of combining 1 and 2. We could definitely pass
> > > >> > > in a group.id config so that we could avoid exposing that
> > > >> > > information in a public API. The question I have is whether we
> > > >> > > should name the interface `GroupAssignment` instead, so that
> > > >> > > Connect could later extend the same interface, just to echo
> > > >> > > Guozhang's point here. Also, the base interface is better
> > > >> > > defined empty for easy extension, or we could define an
> > > >> > > abstract type called `Resource` to be shareable later, IMHO.
> > > >> > >
> > > >> > >
> > > >> > > > By the way, I'm a bit confused about the discussion above
> > > >> > > > about colocating the txn and group coordinators. That is not
> > > >> > > > actually necessary, is it?
> > > >> > > >
> > > >> > > Yes, this is not a requirement for this KIP, because it is
> > > >> > > inherently impossible to guarantee co-location of the
> > > >> > > transaction log partitions and the consumed offset topic
> > > >> > > partitions.
> > > >> > >
> > > >> > >
> > > >> > > > Thanks,
> > > >> > > > Jason
> > > >> > > >
> > > >> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <
> > > >> reluctanthero...@gmail.com
> > > >> > >
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > > > Thank you Ismael for the suggestion. We will attempt to
> > > >> > > > > address it by adding more details to the rejected
> > > >> > > > > alternatives section.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Thank you for the comment Guozhang! Answers are inline
> below.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <
> > wangg...@gmail.com
> > > >> >
> > > >> > > > wrote:
> > > >> > > > >
> > > >> > > > > > Hello Boyang,
> > > >> > > > > >
> > > >> > > > > > Thanks for the KIP, I have some comments below:
> > > >> > > > > >
> > > >> > > > > > 1. "Once transactions are complete, the call will
> > > >> > > > > > return."  This seems different from the existing
> > > >> > > > > > behavior, in which we would return a retriable
> > > >> > > > > > CONCURRENT_TRANSACTIONS and let the client retry; is
> > > >> > > > > > this intentional?
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > I don’t think it is intentional, and I will defer this
> > > >> > > > > question to Jason when he has time to answer, since from
> > > >> > > > > what I understood both retry and on-hold seem like valid
> > > >> > > > > approaches.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > > 2. "an overload to onPartitionsAssigned in the
> > > >> > > > > > consumer's rebalance listener interface": as part of
> > > >> > > > > > KIP-341 we've already added this information to the
> > > >> > > > > > onAssignment callback. Would this be sufficient? Or, more
> > > >> > > > > > generally speaking, which information has to be passed
> > > >> > > > > > around in the rebalance callback, while the rest can be
> > > >> > > > > > passed around in the PartitionAssignor callback? In
> > > >> > > > > > Streams, for example, both callbacks are used, but the
> > > >> > > > > > most critical information is passed via onAssignment.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > We still need to extend ConsumerRebalanceListener because
> > > >> > > > > it’s the interface we have public access to. The
> > > >> > > > > #onAssignment call is defined at the PartitionAssignor
> > > >> > > > > level, which is not easy to wire up with external
> > > >> > > > > producers.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > > 3. "We propose to use a separate record type in order to
> > > >> > > > > > store the group assignment.": hmm, I thought that with
> > > >> > > > > > the third FindCoordinator type, the same broker that acts
> > > >> > > > > > as the consumer coordinator would always be selected as
> > > >> > > > > > the txn coordinator, in which case it can already access
> > > >> > > > > > its local metadata cache / offset topic to get this
> > > >> > > > > > information? We just need to think about how to make
> > > >> > > > > > these two modules exchange information directly without
> > > >> > > > > > messing up the code hierarchy.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > These two coordinators will be on the same broker only
> > > >> > > > > when the numbers of partitions for the transaction state
> > > >> > > > > topic and the consumer offset topic are the same. This
> > > >> > > > > normally holds true, but I'm afraid we can't make this
> > > >> > > > > assumption?
> > > >> > > > >
> > > >> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it
> > > >> > > > > > seems the goal of this config is just to handle
> > > >> > > > > > old-versioned brokers that cannot recognize a
> > > >> > > > > > newer-versioned client. I think we can do something else
> > > >> > > > > > to avoid this config though; for example, we can use the
> > > >> > > > > > embedded AdminClient to send the APIVersion request upon
> > > >> > > > > > starting up, and based on the returned value decide
> > > >> > > > > > whether to go to the old code path or the new behavior.
> > > >> > > > > > Admittedly, asking a random broker about APIVersion does
> > > >> > > > > > not tell us the whole cluster's versions, but what we can
> > > >> > > > > > do is to first 1) find the coordinator (and if the random
> > > >> > > > > > broker does not even recognize the new discovery type,
> > > >> > > > > > fall back to the old path directly), and then 2) ask the
> > > >> > > > > > discovered coordinator about its supported APIVersion.
> > > >> > > > > >
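The two-step probe described above could be gated with logic along these lines; the version threshold, names, and enum here are placeholders, not real protocol constants:

```java
public class CoordinatorProbe {

    // Placeholder for the FindCoordinator version that introduced the new
    // coordinator lookup type; illustrative, not a real protocol constant.
    public static final short FIND_COORDINATOR_WITH_NEW_TYPE = 3;

    public enum CodePath { OLD, NEW }

    // Step 1: if the bootstrap broker does not understand the new
    // FindCoordinator type, fall back immediately. Step 2: otherwise gate
    // on the discovered coordinator's own max supported version.
    public static CodePath choosePath(short bootstrapMaxVersion,
                                      short coordinatorMaxVersion) {
        if (bootstrapMaxVersion < FIND_COORDINATOR_WITH_NEW_TYPE) {
            return CodePath.OLD;
        }
        if (coordinatorMaxVersion < FIND_COORDINATOR_WITH_NEW_TYPE) {
            return CodePath.OLD;
        }
        return CodePath.NEW;
    }

    public static void main(String[] args) {
        System.out.println(choosePath((short) 2, (short) 3)); // OLD
        System.out.println(choosePath((short) 3, (short) 2)); // OLD
        System.out.println(choosePath((short) 3, (short) 3)); // NEW
    }
}
```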
> > > >> > > > >
> > > >> > > > > The caveat here is that we have to make sure both the
> > > >> > > > > group coordinator and the transaction coordinator are on
> > > >> > > > > the latest version during the init stage. This is
> > > >> > > > > potentially doable, as we only need a consumer group.id to
> > > >> > > > > check that. In the meantime, a hard-coded config is still a
> > > >> > > > > favorable backup in case the server has been downgraded but
> > > >> > > > > you still want to use a new-version client, just without
> > > >> > > > > the `consumer group` transactional support.
> > > >> > > > >
> > > >> > > > > > 5. This is a meta question: have you considered how this
> > > >> > > > > > could be applied to Kafka Connect as well? For example,
> > > >> > > > > > for source connectors the assignment is not by
> > > >> > > > > > "partitions" but by some other sort of "resources" based
> > > >> > > > > > on the source systems; how would KIP-447 affect Kafka
> > > >> > > > > > connectors that have implemented EOS?
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > No, it's not currently included in the scope. Could you
> > > >> > > > > point me to a sample source connector that uses EOS? We
> > > >> > > > > could always piggy-back more information, such as tasks,
> > > >> > > > > into the TxnProducerIdentity struct. If this is something
> > > >> > > > > to support in the near term, an abstract type called
> > > >> > > > > "Resource" could be provided, and topic partitions and
> > > >> > > > > Connect tasks could both implement it.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > >
> > > >> > > > > > Guozhang
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <
> > ism...@juma.me.uk>
> > > >> > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Boyang,
> > > >> > > > > > >
> > > >> > > > > > > Thanks for the KIP. It's good that we listed a number of
> > > >> rejected
> > > >> > > > > > > alternatives. It would be helpful to have an explanation
> > of
> > > >> why
> > > >> > > they
> > > >> > > > > were
> > > >> > > > > > > rejected.
> > > >> > > > > > >
> > > >> > > > > > > Ismael
> > > >> > > > > > >
> > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <
> > > >> bche...@outlook.com
> > > >> > >
> > > >> > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hey all,
> > > >> > > > > > > >
> > > >> > > > > > > > I would like to start a discussion for KIP-447:
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > >> > > > > > > >
> > > >> > > > > > > > this is work originated by Jason Gustafson, and we
> > > >> > > > > > > > would like to proceed to the discussion stage.
> > > >> > > > > > > >
> > > >> > > > > > > > Let me know your thoughts, thanks!
> > > >> > > > > > > >
> > > >> > > > > > > > Boyang
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > -- Guozhang
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >> >
> > > >>
> > > >
> > >
> >
>
