Actually, on second thought, I think it makes sense to support auto-upgrade through the admin client, which can help us get the API version from the broker. A draft KIP is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
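To make the idea a bit more concrete, here is a minimal sketch of the decision the client could make once it has the versions reported by the brokers. It is not taken from the KIP: `TxnPathSelector`, `Path` and `MIN_GROUP_AWARE_VERSION` are hypothetical names, the version number is a placeholder, and no real Kafka call is involved. The only point is that the new path is taken when both coordinators report support, and the old path is used otherwise.

```java
import java.util.OptionalInt;

// Hypothetical sketch: none of these names come from Kafka or from the KIP.
final class TxnPathSelector {
    enum Path { LEGACY, GROUP_AWARE }

    // Placeholder for the lowest API version that understands group-aware transactions.
    static final int MIN_GROUP_AWARE_VERSION = 1;

    // An empty OptionalInt means the coordinator did not report the API at all.
    static boolean supports(OptionalInt maxVersion) {
        return maxVersion.isPresent() && maxVersion.getAsInt() >= MIN_GROUP_AWARE_VERSION;
    }

    // Take the new path only when both coordinators support it; otherwise fall back.
    static Path choose(OptionalInt groupCoordinatorMax, OptionalInt txnCoordinatorMax) {
        return supports(groupCoordinatorMax) && supports(txnCoordinatorMax)
                ? Path.GROUP_AWARE
                : Path.LEGACY;
    }

    public static void main(String[] args) {
        System.out.println(choose(OptionalInt.of(1), OptionalInt.of(1)));
        System.out.println(choose(OptionalInt.of(1), OptionalInt.empty()));
    }
}
```

Requiring both answers up front matches the caveat raised earlier in the thread that the group coordinator and the transaction coordinator both have to be on the new version at init time.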
Boyang

On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thank you Guozhang, some of my understandings are inline below.
>
> On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io> wrote:
>
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator which has the source-of-truth of assignment to act as
>> > the TxnCoordinator as well; but I agree there's also some cons of coupling
>> > them together. I'm still a bit inclining towards colocation but if there
>> > are good rationales not to do so I can be convinced as well.
>>
>>
>> The good rationale is that we have no mechanism to colocate partitions ;).
>> Are you suggesting we store the group and transaction state in the same
>> log? Can you be more concrete about the benefit?
>>
>> -Jason
>>
>> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hi Boyang,
>> >
>> > 1. One advantage of retry against on-hold is that it will not tie-up a
>> > handler thread (of course the latter could do the same but that involves
>> > using a purgatory which is more complicated), and also it is less likely to
>> > violate request timeout. So I think there are some rationales to prefer
>> > retries.
>> >
>
> That sounds fair to me, also we are avoiding usage of another purgatory
> instance. Usually for one back-off we are only delaying 50ms during startup
> which is trivial cost. This behavior shouldn't be changed.
>
>
>> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
>> > and PartitionAssignors are user-customizable modules, and only difference
>> > is that the former is specified via code and the latter is specified via
>> > config.
>> >
>> > Regarding Jason's proposal of ConsumerAssignment, one thing to note though
>> > with KIP-429 the onPartitionAssigned may not be called if the assignment
>> > does not change, whereas onAssignment would always be called at the end of
>> > sync-group response. My proposed semantics is that
>> > `RebalanceListener#onPartitionsXXX` are used for notifications to user, and
>> > hence if there's no changes these will not be called, whereas
>> > `PartitionAssignor` is used for assignor logic, whose callback would always
>> > be called no matter if the partitions have changed or not.
>> >
>
> I think a third option is to gracefully expose generation id as part of
> consumer API, so that we don't need to bother overloading various callbacks.
> Of course, this builds upon the assumption that topic partitions will not be
> included in new initTransaction API.
>
>
>> > 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
>> > assignments since it is sort of taking over the job of the
>> > ConsumerCoordinator, and may likely cause a split-brain problem as two
>> > coordinators keep a copy of this assignment which may be different.
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator which has the source-of-truth of assignment to act as
>> > the TxnCoordinator as well; but I agree there's also some cons of coupling
>> > them together. I'm still a bit inclining towards colocation but if there
>> > are good rationales not to do so I can be convinced as well.
>> >
>
> The purpose of co-location is to let txn coordinator see the group
> assignment. This priority is weakened when we already have defense on the
> consumer offset fetch, so I guess it's not super important anymore.
>
>
>> > 4. I guess I'm preferring the philosophy of "only add configs if there's no
>> > other ways", since more and more configs would make it less and less
>> > intuitive out of the box to use.
>> >
>> > I think it's a valid point that checks upon starting up does not cope with
>> > brokers downgrading but even with a config, but it is still hard for users
>> > to determine when they can be ensured the broker would never downgrade
>> > anymore and hence can safely switch the config. So my feeling is that this
>> > config would not be helping too much still. If we want to be at the safer
>> > side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy
>> > to allow the NetworkClient being able to pass the APIVersion metadata to
>> > Coordinator, so that Coordinator can rely on that logic to change its
>> > behavior dynamically.
>> >
>
> The stream thread init could not be supported by a client coordinator
> behavior change on the fly, we are only losing possibilities after we
> initialized. (main thread gets exit and no thread has global picture anymore)
> If we do want to support auto version detection, admin client request in
> this sense shall be easier.
>
>
>> >
>> > 5. I do not have a concrete idea about how the impact on Connect would
>> > make, maybe Randall or Konstantine can help here?
>> >
>
> Sounds good, let's see their thoughts.
>
>
>> > Guozhang
>> >
>> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com>
>> > wrote:
>> >
>> > > Hey Jason,
>> > >
>> > > thank you for the proposal here. Some of my thoughts below.
>> > >
>> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io>
>> > > wrote:
>> > >
>> > > > Hi Boyang,
>> > > >
>> > > > Thanks for picking this up! Still reading through the updates, but here
>> > > > are a couple initial comments on the APIs:
>> > > >
>> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
>> > > > trying to encapsulate state from the current group assignment. Maybe
>> > > > something like `ConsumerAssignment` would be clearer?
>> > > > If we make the usage consistent across the consumer and producer,
>> > > > then we can avoid exposing internal state like the generationId.
>> > > >
>> > > > For example:
>> > > >
>> > > > // Public API
>> > > > interface ConsumerAssignment {
>> > > >   Set<TopicPartition> partitions();
>> > > > }
>> > > >
>> > > > // Not a public API
>> > > > class InternalConsumerAssignment implements ConsumerAssignment {
>> > > >   Set<TopicPartition> partitions;
>> > > >   int generationId;
>> > > > }
>> > > >
>> > > > Then we can change the rebalance listener to something like this:
>> > > > onPartitionsAssigned(ConsumerAssignment assignment)
>> > > >
>> > > > And on the producer:
>> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
>> > > >
>> > > > 2. Another bit of awkwardness is the fact that we have to pass the
>> > > > groupId through both initTransactions() and sendOffsetsToTransaction().
>> > > > We could consider a config instead. Maybe something like
>> > > > `transactional.group.id`? Then we could simplify the producer APIs,
>> > > > potentially even deprecating the current sendOffsetsToTransaction. In
>> > > > fact, for this new usage, the `transactional.id` config is not needed.
>> > > > It would be nice if we don't have to provide it.
>> > > >
>> > >
>> > > I like the idea of combining 1 and 2. We could definitely pass in a
>> > > group.id config so that we could avoid exposing that information in a
>> > > public API. The question I have is whether we should name the interface
>> > > `GroupAssignment` instead, so that Connect later could also extend on
>> > > the same interface, just to echo Guozhang's point here. Also the base
>> > > interface is better to be defined empty for easy extension, or define
>> > > an abstract type called `Resource` to be shareable later IMHO.
>> > >
>> > >
>> > > > By the way, I'm a bit confused about discussion above about colocating
>> > > > the txn and group coordinators. That is not actually necessary, is it?
>> > > >
>> > >
>> > > Yes, this is not a requirement for this KIP, because it is inherently
>> > > impossible to achieve co-locating topic partition of transaction log and
>> > > consumed offset topics.
>> > >
>> > >
>> > > > Thanks,
>> > > > Jason
>> > > >
>> > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Thank you Ismael for the suggestion. We will attempt to address it by
>> > > > > giving more details to rejected alternative section.
>> > > > >
>> > > > >
>> > > > > Thank you for the comment Guozhang! Answers are inline below.
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hello Boyang,
>> > > > > >
>> > > > > > Thanks for the KIP, I have some comments below:
>> > > > > >
>> > > > > > 1. "Once transactions are complete, the call will return." This
>> > > > > > seems different from the existing behavior, in which we would
>> > > > > > return a retriable CONCURRENT_TRANSACTIONS and let the client to
>> > > > > > retry, is this intentional?
>> > > > > >
>> > > > >
>> > > > > I don’t think it is intentional, and I will defer this question to
>> > > > > Jason when he got time to answer since from what I understood retry
>> > > > > and on hold seem both valid approaches.
>> > > > >
>> > > > >
>> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
>> > > > > > listener interface": as part of KIP-341 we've already add this
>> > > > > > information to the onAssignment callback. Would this be sufficient?
>> > > > > > Or more generally speaking, which information have to be passed
>> > > > > > around in rebalance callback while others can be passed around in
>> > > > > > PartitionAssignor callback? In Streams for example both callbacks
>> > > > > > are used but most critical information is passed via onAssignment.
>> > > > > >
>> > > > >
>> > > > > We still need to extend ConsumerRebalanceListener because it’s the
>> > > > > interface we could have public access to. The #onAssignment call is
>> > > > > defined on PartitionAssignor level which is not easy to work with
>> > > > > external producers.
>> > > > >
>> > > > >
>> > > > > > 3. "We propose to use a separate record type in order to store the
>> > > > > > group assignment.": hmm, I thought with the third typed
>> > > > > > FindCoordinator, the same broker that act as the consumer
>> > > > > > coordinator would always be selected as the txn coordinator, in
>> > > > > > which case it can access its local cache metadata / offset topic to
>> > > > > > get this information already? We just need to think about how to
>> > > > > > make these two modules directly exchange information without
>> > > > > > messing up the code hierarchy.
>> > > > > >
>> > > > >
>> > > > > These two coordinators will be on the same broker only when number of
>> > > > > partitions for transaction state topic and consumer offset topic are
>> > > > > the same. This normally holds true, but I'm afraid we couldn't make
>> > > > > this assumption?
>> > > > >
>> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the
>> > > > > > goal of this config is just to avoid old-versioned broker to not be
>> > > > > > able to recognize newer versioned client.
>> > > > > > I think if we can do something else to avoid this config though,
>> > > > > > for example we can use the embedded AdminClient to send the
>> > > > > > APIVersion request upon starting up, and based on the returned
>> > > > > > value decides whether to go to the old code path or the new
>> > > > > > behavior. Admittedly asking a random broker about APIVersion does
>> > > > > > not guarantee the whole cluster's versions, but what we can do is
>> > > > > > to first 1) find the coordinator (and if the random broker does not
>> > > > > > even recognize the new discover type, fall back to old path
>> > > > > > directly), and then 2) ask the discovered coordinator about its
>> > > > > > supported APIVersion.
>> > > > > >
>> > > > >
>> > > > > The caveat here is that we have to make sure both the group
>> > > > > coordinator and transaction coordinator are on the latest version
>> > > > > during init stage. This is potentially doable as we only need a
>> > > > > consumer group.id to check that. In the meantime, a hard-coded config
>> > > > > is still a favorable backup in case the server has downgraded, so you
>> > > > > will want to use a new version client without `consumer group`
>> > > > > transactional support.
>> > > > >
>> > > > > > 5. This is a meta question: have you considered how this can be
>> > > > > > applied to Kafka Connect as well? For example, for source
>> > > > > > connectors, the assignment is not by "partitions", but by some
>> > > > > > other sort of "resources" based on the source systems, how KIP-447
>> > > > > > would affect Kafka Connectors that implemented EOS as well?
>> > > > > >
>> > > > >
>> > > > > No, it's not currently included in the scope. Could you point me to a
>> > > > > sample source connector who uses EOS?
>> > > > > Could always piggy-back into the TxnProducerIdentity struct with
>> > > > > more information such as tasks. If this is something to support in
>> > > > > near term, an abstract type called "Resource" could be provided and
>> > > > > let topic partition and connect task implement it.
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > >
>> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi Boyang,
>> > > > > > >
>> > > > > > > Thanks for the KIP. It's good that we listed a number of rejected
>> > > > > > > alternatives. It would be helpful to have an explanation of why
>> > > > > > > they were rejected.
>> > > > > > >
>> > > > > > > Ismael
>> > > > > > >
>> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hey all,
>> > > > > > > >
>> > > > > > > > I would like to start a discussion for KIP-447:
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>> > > > > > > >
>> > > > > > > > this is a work originated by Jason Gustafson and we would like
>> > > > > > > > to proceed into discussion stage.
>> > > > > > > >
>> > > > > > > > Let me know your thoughts, thanks!
>> > > > > > > >
>> > > > > > > > Boyang
>> > > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
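As a footnote to the `ConsumerAssignment` proposal quoted in this thread, the shape of that idea can be sketched roughly as below. This is only an illustration under assumptions: partitions are plain strings standing in for Kafka's `TopicPartition` so the snippet compiles on its own, and `AssignmentDemo.generationOf` is a hypothetical helper showing how a producer-side caller might recover the generation id without it appearing in the public interface.

```java
import java.util.Collections;
import java.util.Set;

// Public view: exposes only what was assigned (interface name follows the quoted proposal).
interface ConsumerAssignment {
    // Stand-in type: strings such as "orders-0" instead of Kafka's TopicPartition.
    Set<String> partitions();
}

// Internal implementation: carries the generation id without exposing it publicly.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<String> partitions;
    private final int generationId;

    InternalConsumerAssignment(Set<String> partitions, int generationId) {
        this.partitions = Collections.unmodifiableSet(partitions);
        this.generationId = generationId;
    }

    @Override
    public Set<String> partitions() {
        return partitions;
    }

    int generationId() {
        return generationId;
    }
}

final class AssignmentDemo {
    // Hypothetical helper: unwraps the opaque handle on the producer side.
    static int generationOf(ConsumerAssignment assignment) {
        if (assignment instanceof InternalConsumerAssignment) {
            return ((InternalConsumerAssignment) assignment).generationId();
        }
        throw new IllegalArgumentException("assignment was not created by the consumer");
    }

    public static void main(String[] args) {
        ConsumerAssignment a = new InternalConsumerAssignment(Collections.singleton("orders-0"), 7);
        System.out.println(a.partitions() + " generation " + generationOf(a));
    }
}
```

User code only ever sees `partitions()`, which is the property the proposal was after; whether the base type should instead be an empty `GroupAssignment` or a `Resource` abstraction is left open, as in the discussion.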