Actually, on second thought, I think it makes sense to support auto upgrade
through the admin client, to help us get the API version from the broker.
A draft KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
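
To make the idea concrete, here is a minimal sketch of deciding once, at init time, which code path to take based on broker-reported API versions. Everything in it is hypothetical. The map from broker id to highest supported version stands in for whatever the Admin client API in the draft KIP ends up returning, and `REQUIRED_VERSION` is a placeholder threshold.

```java
import java.util.Map;

// Hypothetical sketch: pick a code path once, at init time, from the highest
// API version each queried broker reports for the (placeholder) API we need.
final class VersionGate {

    enum TxnPath { OLD_CODE_PATH, GROUP_AWARE_PATH }

    // Placeholder threshold, not a real Kafka protocol version.
    static final short REQUIRED_VERSION = 3;

    static TxnPath choosePath(Map<Integer, Short> maxVersionByBroker) {
        if (maxVersionByBroker == null || maxVersionByBroker.isEmpty()) {
            return TxnPath.OLD_CODE_PATH; // nothing confirmed, stay conservative
        }
        // One broker does not speak for the cluster, so use the minimum.
        short min = Short.MAX_VALUE;
        for (short v : maxVersionByBroker.values()) {
            min = (short) Math.min(min, v);
        }
        return min >= REQUIRED_VERSION ? TxnPath.GROUP_AWARE_PATH : TxnPath.OLD_CODE_PATH;
    }
}
```

Taking the minimum across the brokers queried is the conservative choice, since one broker's answer says nothing about the rest of the cluster.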

Boyang

On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thank you Guozhang, some of my thoughts are inline below.
>
> On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io>
> wrote:
>
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator, which has the source of truth for the assignment, act
>> > as the TxnCoordinator as well; but I agree there are also some cons to
>> > coupling them together. I'm still leaning a bit towards colocation, but if
>> > there are good rationales not to do so I can be convinced as well.
>>
>>
>> The good rationale is that we have no mechanism to colocate partitions ;).
>> Are you suggesting we store the group and transaction state in the same
>> log? Can you be more concrete about the benefit?
>>
>> -Jason
>>
>> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Hi Boyang,
>> >
>> > 1. One advantage of retry over on-hold is that it will not tie up a
>> > handler thread (of course the latter could avoid that too, but that
>> > involves using a purgatory, which is more complicated), and it is also less
>> > likely to violate the request timeout. So I think there are some rationales
>> > to prefer retries.
>> >
>>
> That sounds fair to me; we also avoid using another purgatory instance.
> A single back-off usually only delays us by 50ms during startup, which is a
> trivial cost. This behavior shouldn't be changed.
>
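
A minimal sketch of the retry approach, assuming a hypothetical `TxnError` enum and a caller-supplied request function rather than the producer's real internals. On a retriable CONCURRENT_TRANSACTIONS error the client sleeps for a fixed back-off (50ms in the discussion above), so no broker handler thread or purgatory entry is held in the meantime.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry a retriable error on the client with a fixed
// back-off, rather than having the broker hold the request in a purgatory.
final class RetryingTxnCall {

    enum TxnError { NONE, CONCURRENT_TRANSACTIONS, FATAL }

    static final class Result {
        final TxnError error;
        final int attempts;

        Result(TxnError error, int attempts) {
            this.error = error;
            this.attempts = attempts;
        }
    }

    static Result callWithRetry(Supplier<TxnError> call, long backoffMs, int maxAttempts) {
        int attempts = 0;
        while (true) {
            attempts++;
            TxnError error = call.get();
            // Only CONCURRENT_TRANSACTIONS is treated as retriable in this sketch.
            if (error != TxnError.CONCURRENT_TRANSACTIONS || attempts >= maxAttempts) {
                return new Result(error, attempts);
            }
            try {
                Thread.sleep(backoffMs); // the wait happens on the client, not on a broker handler thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new Result(error, attempts);
            }
        }
    }
}
```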
>> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
>> > and PartitionAssignors are user-customizable modules, and the only
>> > difference is that the former is specified via code and the latter via
>> > config.
>> >
>> > Regarding Jason's proposal of ConsumerAssignment, one thing to note,
>> > though: with KIP-429, onPartitionsAssigned may not be called if the
>> > assignment does not change, whereas onAssignment would always be called at
>> > the end of the sync-group response. My proposed semantics is that
>> > `RebalanceListener#onPartitionsXXX` are used for notifications to the user,
>> > and hence if there are no changes these will not be called, whereas
>> > `PartitionAssignor` is used for assignor logic, whose callback would always
>> > be called whether or not the partitions have changed.
>>
> I think a third option is to gracefully expose the generation id as part of
> the consumer API, so that we don't need to bother overloading various
> callbacks. Of course, this builds upon the assumption that topic partitions
> will not be included in the new initTransactions API.
>
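
The callback semantics being discussed can be simulated in a few lines. This is a hypothetical model, not consumer code. It counts an assignor callback on every sync-group response, counts a listener callback only when the assigned partitions change, and records the generation id to show how exposing it could replace overloading the callbacks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the proposed semantics, not real consumer code: the
// assignor callback runs on every sync-group response, while the user-facing
// listener runs only when the assigned partitions actually change.
final class CallbackSemantics {

    int assignorCalls = 0;
    int listenerCalls = 0;
    int lastGenerationId = -1;
    private Set<String> owned = new HashSet<>();

    void onSyncGroupResponse(int generationId, Set<String> assigned) {
        assignorCalls++;                  // like PartitionAssignor#onAssignment: always called
        lastGenerationId = generationId;  // what a public generation-id accessor could return
        if (!assigned.equals(owned)) {
            listenerCalls++;              // like onPartitionsAssigned: only on change
            owned = new HashSet<>(assigned);
        }
    }
}
```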
>> > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
>> > assignments, since it is sort of taking over the job of the
>> > ConsumerCoordinator, and it may cause a split-brain problem, as the two
>> > coordinators would each keep a copy of this assignment and the copies may
>> > differ.
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator, which has the source of truth for the assignment, act
>> > as the TxnCoordinator as well; but I agree there are also some cons to
>> > coupling them together. I'm still leaning a bit towards colocation, but if
>> > there are good rationales not to do so I can be convinced as well.
>> >
>>
> The purpose of co-location is to let the txn coordinator see the group
> assignment. That matters less once we already have a defense on the
> consumer offset fetch, so I guess it's not super important anymore.
>
>
>> > 4. I guess I prefer the philosophy of "only add configs if there's no
>> > other way", since more and more configs make it less and less intuitive
>> > to use out of the box.
>> >
>> > I think it's a valid point that checks upon starting up do not cope with
>> > brokers downgrading, but even with a config it is still hard for users to
>> > determine when they can be sure the broker will never downgrade again and
>> > hence can safely switch the config. So my feeling is that this config
>> > would still not help much. If we want to be on the safer side, then I'd
>> > suggest we modify the Coordinator -> NetworkClient hierarchy to allow the
>> > NetworkClient to pass the ApiVersion metadata to the Coordinator, so that
>> > the Coordinator can rely on that logic to change its behavior dynamically.
>>
> Stream thread initialization cannot be supported by changing the client
> coordinator's behavior on the fly: we lose that possibility once we have
> initialized (the main thread exits and no thread has the global picture
> anymore). If we do want to support automatic version detection, an admin
> client request would be easier in this sense.
>
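
For comparison, Guozhang's alternative of letting the NetworkClient feed ApiVersion metadata to the Coordinator could look roughly like this. It is a hypothetical sketch: `IntSupplier` stands in for the network client's negotiated version, and the threshold is a placeholder. It also shows the limitation raised above, since it can only switch the format of each request, not restructure threads that were already set up at init.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch: the coordinator asks for the currently negotiated
// version each time it builds a request, so a broker downgrade is noticed on
// the next request instead of requiring a config switch.
final class DynamicCoordinator {

    static final int GROUP_AWARE_MIN_VERSION = 3; // placeholder threshold

    private final IntSupplier negotiatedVersion; // would be fed by the network client

    DynamicCoordinator(IntSupplier negotiatedVersion) {
        this.negotiatedVersion = negotiatedVersion;
    }

    String nextRequestFormat() {
        return negotiatedVersion.getAsInt() >= GROUP_AWARE_MIN_VERSION ? "group-aware" : "old";
    }
}
```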
>
>> >
>> > 5. I do not have a concrete idea about the impact on Connect; maybe
>> > Randall or Konstantine can help here?
>> >
>>
> Sounds good, let's see their thoughts.
>
>
>> > Guozhang
>> >
>> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <
>> reluctanthero...@gmail.com>
>> > wrote:
>> >
>> > > Hey Jason,
>> > >
>> > > Thank you for the proposal here. Some of my thoughts are below.
>> > >
>> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io>
>> > > wrote:
>> > >
>> > > > Hi Boyang,
>> > > >
>> > > > Thanks for picking this up! Still reading through the updates, but here
>> > > > are a couple of initial comments on the APIs:
>> > > >
>> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying
>> > > > to encapsulate state from the current group assignment. Maybe something
>> > > > like `ConsumerAssignment` would be clearer? If we make the usage consistent
>> > > > across the consumer and producer, then we can avoid exposing internal state
>> > > > like the generationId.
>> > > >
>> > > > For example:
>> > > >
>> > > > // Public API
>> > > > interface ConsumerAssignment {
>> > > >   Set<TopicPartition> partitions();
>> > > > }
>> > > >
>> > > > // Not a public API
>> > > > class InternalConsumerAssignment implements ConsumerAssignment {
>> > > >   Set<TopicPartition> partitions;
>> > > >   int generationId;
>> > > >
>> > > >   @Override
>> > > >   public Set<TopicPartition> partitions() {
>> > > >     return partitions;
>> > > >   }
>> > > > }
>> > > >
>> > > > Then we can change the rebalance listener to something like this:
>> > > > void onPartitionsAssigned(ConsumerAssignment assignment);
>> > > >
>> > > > And on the producer:
>> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
>> > > >
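
A self-contained variant of Jason's sketch, under stated assumptions: `Tp` is a hypothetical stand-in for `TopicPartition` so the example compiles without the Kafka client jar, and `generationOf` only illustrates how a producer-side `initTransactions(groupId, assignment)` could read the generation without it appearing in the public interface.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of the proposal: only partitions() is public, while the
// generation id travels inside a non-public implementation class.
final class AssignmentSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition, to stay self-contained.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Public API
    interface ConsumerAssignment {
        Set<Tp> partitions();
    }

    // Not a public API
    static final class InternalConsumerAssignment implements ConsumerAssignment {
        private final Set<Tp> partitions;
        final int generationId;

        InternalConsumerAssignment(Set<Tp> partitions, int generationId) {
            this.partitions = Collections.unmodifiableSet(partitions);
            this.generationId = generationId;
        }

        @Override
        public Set<Tp> partitions() {
            return partitions;
        }
    }

    // What the producer could extract inside initTransactions(groupId, assignment).
    static int generationOf(ConsumerAssignment assignment) {
        if (assignment instanceof InternalConsumerAssignment) {
            return ((InternalConsumerAssignment) assignment).generationId;
        }
        return -1; // no generation known, e.g. a user-supplied implementation
    }
}
```

A user-supplied `ConsumerAssignment` carries no generation, which is why the sketch returns -1 in that case; a real design would have to decide whether to reject such input.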
>> > > > 2. Another bit of awkwardness is the fact that we have to pass the groupId
>> > > > through both initTransactions() and sendOffsetsToTransaction(). We could
>> > > > consider a config instead. Maybe something like `transactional.group.id`?
>> > > > Then we could simplify the producer APIs, potentially even deprecating the
>> > > > current sendOffsetsToTransaction. In fact, for this new usage, the
>> > > > `transactional.id` config is not needed. It would be nice if we didn't
>> > > > have to provide it.
>> > > >
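
As a sketch of the config alternative, assuming `transactional.group.id` were adopted under exactly the name floated here (it is not an existing producer config), the group id would move out of the method signatures and into the producer properties, with no `transactional.id` set. This is illustrative only.

```java
import java.util.Properties;

// Hypothetical sketch: the group id moves from method arguments into producer
// config. "transactional.group.id" is only the name floated in this thread.
final class GroupAwareProducerConfig {

    static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id"; // proposed, not an existing config

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put(TRANSACTIONAL_GROUP_ID, groupId);
        // Deliberately no "transactional.id" for this new usage.
        return props;
    }
}
```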
>> > > I like the idea of combining 1 and 2. We could definitely pass in a
>> > > group.id config so that we avoid exposing that information in a public
>> > > API. The question I have is whether we should name the interface
>> > > `GroupAssignment` instead, so that Connect could later extend the same
>> > > interface, just to echo Guozhang's point here. Also, the base interface
>> > > is better defined empty for easy extension, or we could define an
>> > > abstract type called `Resource` to be shared later, IMHO.
>> > >
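
The `Resource` variant of this suggestion might look like the following sketch. All names are hypothetical, taken from or inspired by the thread; the point is only that an empty base type lets a topic partition and a Connect task share one `GroupAssignment` shape.

```java
import java.util.List;

// Hypothetical sketch: an empty base type that both consumer partition
// assignments and Connect task assignments could implement later.
final class ResourceSketch {

    interface Resource { } // deliberately empty for easy extension

    static final class TopicPartitionResource implements Resource {
        final String topic;
        final int partition;

        TopicPartitionResource(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    static final class ConnectTaskResource implements Resource {
        final String connector;
        final int task;

        ConnectTaskResource(String connector, int task) {
            this.connector = connector;
            this.task = task;
        }

        @Override
        public String toString() {
            return connector + "/task-" + task;
        }
    }

    interface GroupAssignment { // name suggested in the thread
        List<? extends Resource> resources();
    }
}
```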
>> > >
>> > > > By the way, I'm a bit confused about the discussion above about
>> > > > colocating the txn and group coordinators. That is not actually
>> > > > necessary, is it?
>> > > >
>> > > Yes, this is not a requirement for this KIP, because it is inherently
>> > > impossible to co-locate the topic partitions of the transaction log and
>> > > the consumer offsets topic.
>> > >
>> > >
>> > > > Thanks,
>> > > > Jason
>> > > >
>> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <
>> reluctanthero...@gmail.com
>> > >
>> > > > wrote:
>> > > >
>> > > > > Thank you Ismael for the suggestion. We will attempt to address it by
>> > > > > giving more details in the rejected alternatives section.
>> > > > >
>> > > > >
>> > > > > Thank you for the comment Guozhang! Answers are inline below.
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hello Boyang,
>> > > > > >
>> > > > > > Thanks for the KIP, I have some comments below:
>> > > > > >
>> > > > > > 1. "Once transactions are complete, the call will return." This seems
>> > > > > > different from the existing behavior, in which we would return a
>> > > > > > retriable CONCURRENT_TRANSACTIONS and let the client retry; is this
>> > > > > > intentional?
>> > > > > >
>> > > > >
>> > > > > I don’t think it is intentional, and I will defer this question to
>> > > > > Jason for when he has time to answer, since from what I understand,
>> > > > > retry and on-hold both seem like valid approaches.
>> > > > >
>> > > > >
>> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
>> > > > > > listener interface": as part of KIP-341 we've already added this
>> > > > > > information to the onAssignment callback. Would this be sufficient?
>> > > > > > Or, more generally speaking, which information has to be passed around
>> > > > > > in the rebalance callback, while other information can be passed around
>> > > > > > in the PartitionAssignor callback? In Streams, for example, both
>> > > > > > callbacks are used, but the most critical information is passed via
>> > > > > > onAssignment.
>> > > > > >
>> > > > >
>> > > > > We still need to extend ConsumerRebalanceListener because it’s the
>> > > > > interface we have public access to. The #onAssignment call is defined
>> > > > > at the PartitionAssignor level, which is not easy to work with from
>> > > > > external producers.
>> > > > >
>> > > > >
>> > > > > > 3. "We propose to use a separate record type in order to store the
>> > > > > > group assignment.": hmm, I thought with the third FindCoordinator type,
>> > > > > > the same broker that acts as the consumer coordinator would always be
>> > > > > > selected as the txn coordinator, in which case it can already access
>> > > > > > its local cache metadata / offset topic to get this information? We
>> > > > > > just need to think about how to make these two modules directly
>> > > > > > exchange information without messing up the code hierarchy.
>> > > > > >
>> > > > >
>> > > > > These two coordinators will be on the same broker only when the
>> > > > > partition counts of the transaction state topic and the consumer
>> > > > > offsets topic are the same. This normally holds true, but I'm afraid
>> > > > > we can't make this assumption?
>> > > > >
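
Coordinator placement can be illustrated with the hash-modulo mapping that, as far as I know, both the group and transaction coordinators use (`abs(key.hashCode()) % partitionCount`, with `Integer.MIN_VALUE` mapped to 0). The sketch below is a standalone re-implementation for illustration, not Kafka code. With equal partition counts (`offsets.topic.num.partitions` and `transaction.state.log.num.partitions` both default to 50) the same key lands on the same partition index, but that is necessary rather than sufficient: the leaders of the two partitions are placed independently, so the coordinators can still end up on different brokers.

```java
// Illustrative re-implementation of hash-modulo coordinator partition lookup.
final class CoordinatorPartitions {

    // Non-negative hash; Integer.MIN_VALUE maps to 0 instead of staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String key, int partitionCount) {
        return abs(key.hashCode()) % partitionCount;
    }

    // Same partition index in both internal topics? Necessary, not sufficient,
    // for co-location: the two partitions' leaders may live on different brokers.
    static boolean sameIndex(String key, int offsetsPartitions, int txnPartitions) {
        return partitionFor(key, offsetsPartitions) == partitionFor(key, txnPartitions);
    }
}
```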
>> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal
>> > > > > > of this config is just to guard against old-versioned brokers not being
>> > > > > > able to recognize a newer-versioned client. I think we can do something
>> > > > > > else to avoid this config, though; for example, we can use the embedded
>> > > > > > AdminClient to send the ApiVersions request upon starting up, and based
>> > > > > > on the returned value decide whether to go to the old code path or the
>> > > > > > new behavior. Admittedly, asking a random broker about ApiVersions does
>> > > > > > not guarantee the whole cluster's versions, but what we can do is to
>> > > > > > 1) first find the coordinator (and if the random broker does not even
>> > > > > > recognize the new discover type, fall back to the old path directly),
>> > > > > > and then 2) ask the discovered coordinator about its supported
>> > > > > > ApiVersions.
>> > > > > >
>> > > > >
>> > > > > The caveat here is that we have to make sure both the group coordinator
>> > > > > and the transaction coordinator are on the latest version during the
>> > > > > init stage. This is potentially doable, as we only need a consumer
>> > > > > group.id to check that. In the meantime, a hard-coded config is still a
>> > > > > favorable backup in case the server has been downgraded and you want to
>> > > > > use a new-version client without `consumer group` transactional support.
>> > > > >
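
The startup check and its caveat can be sketched as follows. `Cluster` is a hypothetical abstraction over the FindCoordinator and ApiVersions round trips, and `forceOldPath` plays the role of the hard-coded backup config; none of these are existing client APIs.

```java
import java.util.Optional;

// Hypothetical sketch of the startup check: 1) locate the coordinators, falling
// back if the new FindCoordinator type is not recognized; 2) require both the
// transaction and group coordinators to support the new version; 3) let an
// explicit config force the old path, e.g. after a broker downgrade.
final class StartupCheck {

    interface Cluster { // hypothetical abstraction, not a Kafka API
        Optional<Integer> findGroupAwareCoordinator(String groupId); // empty = type not recognized
        Optional<Integer> findGroupCoordinator(String groupId);
        int maxSupportedVersion(int brokerId);
    }

    static boolean useGroupAwarePath(Cluster cluster, String groupId,
                                     int requiredVersion, boolean forceOldPath) {
        if (forceOldPath) {
            return false; // hard-coded backup config wins
        }
        Optional<Integer> txnCoordinator = cluster.findGroupAwareCoordinator(groupId);
        if (!txnCoordinator.isPresent()) {
            return false; // broker did not recognize the new discovery type
        }
        Optional<Integer> groupCoordinator = cluster.findGroupCoordinator(groupId);
        if (!groupCoordinator.isPresent()) {
            return false;
        }
        // Both coordinators must be new enough, not just the one we asked first.
        return cluster.maxSupportedVersion(txnCoordinator.get()) >= requiredVersion
                && cluster.maxSupportedVersion(groupCoordinator.get()) >= requiredVersion;
    }
}
```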
>> > > > > > 5. This is a meta question: have you considered how this can be applied
>> > > > > > to Kafka Connect as well? For example, for source connectors, the
>> > > > > > assignment is not by "partitions", but by some other sort of
>> > > > > > "resources" based on the source systems. How would KIP-447 affect Kafka
>> > > > > > connectors that implement EOS as well?
>> > > > > >
>> > > > >
>> > > > > No, it's not currently included in the scope. Could you point me to a
>> > > > > sample source connector that uses EOS? We could always piggy-back more
>> > > > > information, such as tasks, onto the TxnProducerIdentity struct. If
>> > > > > this is something to support in the near term, an abstract type called
>> > > > > "Resource" could be provided and let topic partitions and Connect tasks
>> > > > > implement it.
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > >
>> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk>
>> > > wrote:
>> > > > > >
>> > > > > > > Hi Boyang,
>> > > > > > >
>> > > > > > > Thanks for the KIP. It's good that we listed a number of
>> rejected
>> > > > > > > alternatives. It would be helpful to have an explanation of
>> why
>> > > they
>> > > > > were
>> > > > > > > rejected.
>> > > > > > >
>> > > > > > > Ismael
>> > > > > > >
>> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <
>> bche...@outlook.com
>> > >
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > Hey all,
>> > > > > > > >
>> > > > > > > > I would like to start a discussion for KIP-447:
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>> > > > > > > >
>> > > > > > > > This is work originated by Jason Gustafson, and we would like to
>> > > > > > > > proceed to the discussion stage.
>> > > > > > > >
>> > > > > > > > Let me know your thoughts, thanks!
>> > > > > > > >
>> > > > > > > > Boyang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
