For reference, we have BrokerApiVersionCommand already as a public interface. We have a bit of tech debt at the moment because it uses a custom AdminClient, so it would be nice to clean that up. In general, I think it is reasonable to expose this from AdminClient; it can be used by management tools to inspect running Kafka versions, for example.

-Jason
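As a rough illustration of the API surface being discussed in this thread, a KIP-117/KIP-483-style AdminClient method might look like the sketch below. Every name in it is hypothetical; as Colin notes further down, no such public API had actually shipped at this point.

import java.util.Map;

// Hypothetical sketch of the AdminClient surface discussed in this thread
// (in the spirit of the KIP-117 draft and the KIP-483 proposal). None of
// these names existed in Kafka at the time; they are illustrative only.
interface VersionInspectingAdmin {

    // Inclusive min/max version a broker supports for one API key.
    final class VersionRange {
        final short min;
        final short max;
        VersionRange(short min, short max) {
            this.min = min;
            this.max = max;
        }
    }

    // For each broker id, the supported version range per API key
    // (e.g. key 0 = Produce, 1 = Fetch), as reported by that broker's
    // ApiVersions response. A management tool could diff this map across
    // brokers to spot a partially upgraded cluster.
    Map<Integer, Map<Short, VersionRange>> apiVersions();
}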
On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thank you for the context, Colin. The groupId was indeed a copy-paste error.
> Our use case here for 447 is (quoted from Guozhang):
> '''
> I think if we can do something else to avoid this config though, for example we can use the embedded AdminClient to send the ApiVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior.
> '''
> The benefit we get is to avoid adding a new configuration just to make a decision based on broker version. If you have concerns with exposing ApiVersion to the client, we could try to think of alternative solutions too.
>
> Boyang
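The approach Boyang quotes above — probe the brokers' ApiVersions at startup and pick a code path, instead of adding a config — could be sketched roughly as follows. The class, method, and version threshold below are invented for illustration; the real decision logic was still under design in this thread.

// Illustrative sketch of Guozhang's suggestion: choose between the old and
// new transactional code path from a startup version probe instead of a new
// config. The names and the threshold parameter are assumptions, not real.
final class TxnPathSelector {

    enum TxnPath { OLD_STATIC_MAPPING, NEW_GROUP_AWARE }

    // minVersionForGroupAwareTxn is a placeholder for whatever version of
    // the relevant request would carry the new group-aware fields.
    static TxnPath choose(short brokerMaxVersion, short minVersionForGroupAwareTxn) {
        return brokerMaxVersion >= minVersionForGroupAwareTxn
                ? TxnPath.NEW_GROUP_AWARE
                : TxnPath.OLD_STATIC_MAPPING;
    }
}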
> On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > kafka.api.ApiVersion is an internal class, not suitable for exposing through AdminClient. That class is not even accessible without having the broker jars on your CLASSPATH.
> >
> > Another question is, what is the groupId parameter doing in the call? The API versions are the same no matter what consumer group we use, right? Perhaps this was a copy-and-paste error?
> >
> > This is not the first time we have discussed having a method in AdminClient to retrieve API version information. In fact, the original KIP which created KafkaAdminClient specified an API for fetching version information. It was called apiVersions and it is still there on the wiki. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> >
> > However, this API wasn't ready in time for 0.11.0, so we shipped without it. There was a JIRA to implement it for later versions, https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR, https://github.com/apache/kafka/pull/3012 . However, we started to rethink whether this AdminClient function was even necessary. Most of the use cases we could think of seemed like horrible hacks. So it has never really been implemented (yet?).
> >
> > best,
> > Colin
> >
> > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > Actually, after a second thought, I think it actually makes sense to support auto upgrade through the admin client, to help us get the api version from the broker.
> > > A draft KIP is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > >
> > > Boyang
> > >
> > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > Thank you Guozhang, some of my understandings are inline below.
> > > >
> > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source of truth of assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still inclined towards colocation, but if there are good rationales not to do so, I can be convinced as well.
> > > > >
> > > > > The good rationale is that we have no mechanism to colocate partitions ;). Are you suggesting we store the group and transaction state in the same log? Can you be more concrete about the benefit?
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > Hi Boyang,
> > > > > >
> > > > > > 1. One advantage of retry over on-hold is that it will not tie up a handler thread (of course the latter could do the same, but that involves using a purgatory, which is more complicated), and it is also less likely to violate the request timeout. So I think there are some rationales to prefer retries.
> > > >
> > > > That sounds fair to me; we also avoid the use of another purgatory instance. Usually for one back-off we are only delaying 50ms during startup, which is a trivial cost. This behavior shouldn't be changed.
> > > >
> > > > > > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener and PartitionAssignor are user-customizable modules, and the only difference is that the former is specified via code and the latter via config.
> > > > > >
> > > > > > Regarding Jason's proposal of ConsumerAssignment, one thing to note though: with KIP-429, onPartitionsAssigned may not be called if the assignment does not change, whereas onAssignment is always called at the end of the sync-group response. My proposed semantics is that `RebalanceListener#onPartitionsXXX` are used for notifications to the user, and hence if there are no changes they will not be called, whereas `PartitionAssignor` is used for assignor logic, whose callback is always called whether or not the partitions have changed.
> > > >
> > > > I think a third option is to gracefully expose the generation id as part of the consumer API, so that we don't need to bother overloading various callbacks. Of course, this builds on the assumption that topic partitions will not be included in the new initTransaction API.
> > > > > > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition assignments, since it is sort of taking over the job of the ConsumerCoordinator, and it may well cause a split-brain problem as two coordinators keep copies of this assignment which may differ.
> > > > > >
> > > > > > I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source of truth of assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still inclined towards colocation, but if there are good rationales not to do so, I can be convinced as well.
> > > >
> > > > The purpose of co-location is to let the txn coordinator see the group assignment. This priority is weakened now that we already have a defense on the consumer offset fetch, so I guess it's not super important anymore.
> > > >
> > > > > > 4. I guess I prefer the philosophy of "only add configs if there's no other way", since more and more configs make the system less and less intuitive to use out of the box.
> > > > > >
> > > > > > I think it's a valid point that a check upon starting up does not cope with brokers downgrading, but even with a config it is still hard for users to determine when they can be sure the broker will never downgrade again and hence can safely switch the config. So my feeling is that this config would still not help much. If we want to be on the safer side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy to allow the NetworkClient to pass the ApiVersion metadata to the Coordinator, so that the Coordinator can rely on it to change its behavior dynamically.
> > > >
> > > > The stream thread init could not be supported by a client coordinator behavior change on the fly; we only lose possibilities after we have initialized (the main thread exits and no thread has the global picture anymore). If we do want to support auto version detection, an admin client request would be easier in this sense.
> > > >
> > > > > > 5. I do not have a concrete idea about the impact this would have on Connect; maybe Randall or Konstantine can help here?
> > > >
> > > > Sounds good, let's see their thoughts.
> > > >
> > > > > > Guozhang
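Boyang's "third option" above — exposing the group generation directly from the consumer instead of overloading various callbacks — might look something like the sketch below. No such public API existed at the time of this thread; all names here are illustrative.

import java.util.Optional;

// Illustrative sketch of exposing group metadata from the consumer, as
// floated above. Every name here is an assumption, not a shipped API.
interface GroupMetadataAware {

    // Immutable snapshot of the consumer's current group membership.
    final class GroupMetadata {
        final String groupId;
        final int generationId;   // bumped by the coordinator on each rebalance
        final String memberId;
        GroupMetadata(String groupId, int generationId, String memberId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    // Empty until the consumer has joined the group. A transactional
    // producer could forward this snapshot so the coordinator can fence
    // commits coming from members of an older generation (zombies).
    Optional<GroupMetadata> groupMetadata();
}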
> > > > > > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > Hey Jason,
> > > > > > >
> > > > > > > thank you for the proposal here. Some of my thoughts below.
> > > > > > >
> > > > > > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > Hi Boyang,
> > > > > > > >
> > > > > > > > Thanks for picking this up! Still reading through the updates, but here are a couple of initial comments on the APIs:
> > > > > > > >
> > > > > > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying to encapsulate state from the current group assignment. Maybe something like `ConsumerAssignment` would be clearer? If we make the usage consistent across the consumer and producer, then we can avoid exposing internal state like the generationId.
> > > > > > > >
> > > > > > > > For example:
> > > > > > > >
> > > > > > > > // Public API
> > > > > > > > interface ConsumerAssignment {
> > > > > > > >     Set<TopicPartition> partitions();
> > > > > > > > }
> > > > > > > >
> > > > > > > > // Not a public API
> > > > > > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > > > > > >     Set<TopicPartition> partitions;
> > > > > > > >     int generationId;
> > > > > > > > }
> > > > > > > >
> > > > > > > > Then we can change the rebalance listener to something like this:
> > > > > > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > > > > > >
> > > > > > > > And on the producer:
> > > > > > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > > > > > > >
> > > > > > > > 2. Another bit of awkwardness is the fact that we have to pass the groupId through both initTransactions() and sendOffsetsToTransaction(). We could consider a config instead. Maybe something like `transactional.group.id`? Then we could simplify the producer APIs, potentially even deprecating the current sendOffsetsToTransaction. In fact, for this new usage, the `transactional.id` config is not needed. It would be nice if we don't have to provide it.
> > > > > > >
> > > > > > > I like the idea of combining 1 and 2. We could definitely pass in a group.id config so that we avoid exposing that information in a public API. The question I have is whether we should name the interface `GroupAssignment` instead, so that Connect could later extend the same interface, just to echo Guozhang's point here. Also, the base interface is better defined empty for easy extension, or we could define an abstract type called `Resource` to be shareable later, IMHO.
> > > > > > >
> > > > > > > > By the way, I'm a bit confused about the discussion above about colocating the txn and group coordinators. That is not actually necessary, is it?
> > > > > > >
> > > > > > > Yes, this is not a requirement for this KIP, because it is inherently impossible to co-locate the topic partitions of the transaction log and the consumed offset topics.
> > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
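Taken together, Jason's two suggestions above might surface to applications roughly as in the sketch below. This mirrors the proposal in the thread, not shipped Kafka code: `transactional.group.id`, the `initTransactions` overload, and `ConsumerAssignment` were all still under discussion at this point.

import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Sketch of how the producer API proposed above might be wired into a
// consume-transform-produce loop. Everything here mirrors the proposal in
// this thread and is hypothetical, not a shipped Kafka API.
interface ConsumerAssignment {
    Set<TopicPartition> partitions();
}

interface GroupAwareTransactionalProducer {
    // Fences older producers for the same group + assignment instead of
    // relying on a static transactional.id per input partition. The groupId
    // parameter could alternatively come from a transactional.group.id
    // config, per suggestion 2 above.
    void initTransactions(String groupId, ConsumerAssignment assignment);

    void beginTransaction();

    // Offsets would be committed under the consumer group, so the group
    // coordinator can validate them against the current generation.
    void sendOffsetsToTransaction(/* offsets, */ String groupId);

    void commitTransaction();
}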
> > > > > > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > > > Thank you Ismael for the suggestion. We will attempt to address it by giving more detail in the rejected alternatives section.
> > > > > > > > >
> > > > > > > > > Thank you for the comment Guozhang! Answers are inline below.
> > > > > > > > >
> > > > > > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > Hello Boyang,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP, I have some comments below:
> > > > > > > > > >
> > > > > > > > > > 1. "Once transactions are complete, the call will return." This seems different from the existing behavior, in which we would return a retriable CONCURRENT_TRANSACTIONS and let the client retry; is this intentional?
> > > > > > > > >
> > > > > > > > > I don't think it is intentional, and I will defer this question to Jason when he has time to answer, since from what I understand retry and on-hold both seem like valid approaches.
> > > > > > > > >
> > > > > > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance listener interface": as part of KIP-341 we've already added this information to the onAssignment callback. Would this be sufficient? Or, more generally speaking, which information has to be passed around in the rebalance callback, and which can be passed around in the PartitionAssignor callback? In Streams, for example, both callbacks are used, but the most critical information is passed via onAssignment.
> > > > > > > > >
> > > > > > > > > We still need to extend ConsumerRebalanceListener because it's the interface we have public access to. The #onAssignment call is defined at the PartitionAssignor level, which is not easy to work with for external producers.
> > > > > > > > >
> > > > > > > > > > 3. "We propose to use a separate record type in order to store the group assignment.": hmm, I thought that with the third typed FindCoordinator, the same broker that acts as the consumer coordinator would always be selected as the txn coordinator, in which case it can already access its local cached metadata / offset topic to get this information? We just need to think about how to make these two modules exchange information directly without messing up the code hierarchy.
> > > > > > > > >
> > > > > > > > > These two coordinators will be on the same broker only when the number of partitions of the transaction state topic and the consumer offset topic are the same. This normally holds true, but I'm afraid we couldn't make this assumption?
> > > > > > > > >
> > > > > > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to avoid an old-versioned broker being unable to recognize a newer-versioned client. I think we can do something else to avoid this config though; for example, we can use the embedded AdminClient to send the ApiVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior. Admittedly, asking a random broker about ApiVersion does not guarantee the whole cluster's versions, but what we can do is first 1) find the coordinator (and if the random broker does not even recognize the new discover type, fall back to the old path directly), and then 2) ask the discovered coordinator about its supported ApiVersion.
> > > > > > > > >
> > > > > > > > > The caveat here is that we have to make sure both the group coordinator and the transaction coordinator are on the latest version during the init stage. This is potentially doable, as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a favorable backup in case the server has been downgraded and you want to use a new-version client without `consumer group` transactional support.
> > > > > > > > >
> > > > > > > > > > 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well? For example, for source connectors the assignment is not by "partitions" but by some other sort of "resources" based on the source systems. How would KIP-447 affect Kafka connectors that have implemented EOS as well?
> > > > > > > > >
> > > > > > > > > No, it's not currently included in the scope. Could you point me to a sample source connector that uses EOS? We could always piggy-back more information, such as tasks, into the TxnProducerIdentity struct. If this is something to support in the near term, an abstract type called "Resource" could be provided, and topic partition and connect task could both implement it.
> > > > > > > > >
> > > > > > > > > > Guozhang
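Guozhang's two-step probe in point 4 above — find the coordinator first, then check that specific broker's versions — could be sketched as follows. Only the overall flow (FindCoordinator, then ApiVersions against the coordinator, with a fallback on any failure) comes from the thread; the types and method names are placeholders.

// Sketch of the two-step startup probe described in point 4 above. The
// names are invented; the flow is the part taken from the thread.
final class CoordinatorVersionProbe {

    interface Cluster {
        // Step 1: FindCoordinator. Returns -1 if the broker does not even
        // recognize the new coordinator type (old broker -> fall back).
        int findCoordinator(String groupId);

        // Step 2: max supported version of the relevant API on that broker.
        short maxSupportedVersion(int brokerId, short apiKey);
    }

    static boolean supportsNewPath(Cluster cluster, String groupId,
                                   short apiKey, short requiredVersion) {
        int coordinator = cluster.findCoordinator(groupId);
        if (coordinator < 0) {
            return false; // broker predates the new coordinator type
        }
        return cluster.maxSupportedVersion(coordinator, apiKey) >= requiredVersion;
    }
}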
> > > > > > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > Hi Boyang,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected.
> > > > > > > > > > >
> > > > > > > > > > > Ismael
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > > > > > > > > > > > Hey all,
> > > > > > > > > > > >
> > > > > > > > > > > > I would like to start a discussion for KIP-447:
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > > > > > >
> > > > > > > > > > > > This is a work originated by Jason Gustafson, and we would like to proceed into the discussion stage.
> > > > > > > > > > > >
> > > > > > > > > > > > Let me know your thoughts, thanks!
> > > > > > > > > > > >
> > > > > > > > > > > > Boyang
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > >
> > > > > > --
> > > > > > -- Guozhang