Here is what I'm thinking about this trade-off: we want to fail fast when brokers do not yet support the requested API version, at the cost of a one-time check with an expendable NetworkClient plus slightly longer startup latency. Here are a few different options:
1) We ask all the brokers: this is one extreme of the trade-off; we still need to handle UnsupportedVersionException since brokers can be downgraded.
2) We ask a random broker: this is what we do today; a bit weaker than 1), but it saves on latency.
3) We do not ask anyone: this is the other extreme of the trade-off.

To me 1) is overkill, so I did not include it in my proposals. Between 2) and 3) I slightly prefer 3), since even in that case we are sort of failing fast because we will throw the exception at the first rebalance, but I can still see the value of option 2). Ideally we could have some API in AdminClient to check API versions, but that does not exist today and I do not want to drag this KIP out by growing its scope, so the current proposal's implementation uses an expendable NetworkClient, which is a bit fuzzy.

Guozhang

On Tue, Nov 7, 2017 at 2:07 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> I would prefer to keep the current check. We could even improve it, and
> do the check against more than one broker (even all provided
> bootstrap.servers) or some random servers after we got all metadata
> about the cluster.
>
>
> -Matthias
>
> On 11/7/17 1:01 AM, Guozhang Wang wrote:
> > Hello folks,
> >
> > One update I'd like to propose regarding "compatibility checking":
> > currently we create a single StreamsKafkaClient at the beginning to
> > issue an ApiVersionsRequest to a random broker and then check on its
> > versions, and fail if it does not satisfy the version (0.10.1+ without
> > EOS, 0.11.0 with EOS); after this check we throw this client away. My
> > original plan was to replace this logic with the NetworkClient's own
> > apiVersions, but after some digging while working on the PR I found
> > that exposing this apiVersions variable from NetworkClient through
> > AdminClient is not very straightforward; plus, it would need an API
> > change on the AdminClient itself as well to expose the versions
> > information.
> >
> > On the other hand, this one-time compatibility check's benefit may not
> > be significant: 1) it asks a random broker upon starting up, and hence
> > does not guarantee that all brokers support the corresponding API
> > versions at that time; 2) brokers can still be upgraded / downgraded
> > after the streams app is up and running, and hence we still need to
> > handle UnsupportedVersionExceptions thrown from the producer /
> > consumer / admin client during runtime anyway.
> >
> > So I'd like to propose two options in this KIP:
> >
> > 1) We remove this one-time compatibility check on Streams startup in
> > this KIP, and solely rely on handling the UnsupportedVersionExceptions
> > thrown from the producer / consumer / admin client. Please share your
> > thoughts about this.
> >
> > 2) We create a one-time NetworkClient upon starting up, send the
> > ApiVersionsRequest, get the response and do the checking; after that
> > we throw this client away.
> >
> > Please let me know what you think. Thanks!
> >
> >
> > Guozhang
> >
> >
> > On Mon, Nov 6, 2017 at 7:56 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks for the update and clarification.
> >>
> >> Sounds good to me :)
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/6/17 12:16 AM, Guozhang Wang wrote:
> >>> Thanks Matthias,
> >>>
> >>> 1) Updated the KIP page to include KAFKA-6126.
> >>> 2) For passing configs, I agree; I will make a pass over the existing
> >>> configs passed to StreamsKafkaClient and update the wiki page
> >>> accordingly, to capture all changes that would happen for the
> >>> replacement in this single KIP.
> >>> 3) For internal topic purging, I'm not sure if we need to include
> >>> this as a public change, since internal topics are meant to be
> >>> abstracted away from the Streams users; they should not leverage such
> >>> internal topics elsewhere themselves.
> >>> The only thing I can think of is that for Kafka operators this would
> >>> mean such internal topics' footprint would be largely reduced, but
> >>> that does not need to be in the KIP either.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Sat, Nov 4, 2017 at 9:00 AM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I like this KIP. Can you also link to
> >>>> https://issues.apache.org/jira/browse/KAFKA-6126 in the KIP?
> >>>>
> >>>> What I am wondering though: if we start to partially (i.e., step by
> >>>> step) replace the existing StreamsKafkaClient with the Java
> >>>> AdminClient, don't we need more KIPs? For example, if we use the
> >>>> purge API for internal topics, it seems like a change that requires
> >>>> a KIP. Similarly for passing configs -- the new client might have
> >>>> different configs than the old one? Can we double check this?
> >>>>
> >>>> Thus, it might make sense to replace the old client with the new one
> >>>> in one shot.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/4/17 4:01 AM, Ted Yu wrote:
> >>>>> Looks good overall.
> >>>>>
> >>>>> bq. the creation within StreamsPartitionAssignor
> >>>>>
> >>>>> Typo above: should be StreamPartitionAssignor
> >>>>>
> >>>>> On Fri, Nov 3, 2017 at 4:49 PM, Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello folks,
> >>>>>>
> >>>>>> I have filed a new KIP on adding AdminClient into Streams for
> >>>>>> internal topic management.
> >>>>>>
> >>>>>> Looking for feedback on
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>
> >
>

-- 
-- Guozhang
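[Editor's note] The fail-fast check debated in the thread (option 2 vs. option 3) can be sketched in isolation. This is a minimal, self-contained illustration, not Kafka's actual implementation: the string API keys and version numbers are hypothetical placeholders, and a plain map stands in for a real ApiVersionsResponse.

```java
import java.util.Map;

public class CompatCheck {

    // Broker-reported max supported version per API key, as would come
    // back in an ApiVersionsResponse (modeled here as a plain map).
    static boolean supports(Map<String, Integer> brokerMaxVersions,
                            String apiKey, int requiredVersion) {
        Integer max = brokerMaxVersions.get(apiKey);
        return max != null && max >= requiredVersion;
    }

    // Fail fast at startup (option 2), instead of waiting for the first
    // rebalance to surface an UnsupportedVersionException (option 3).
    // API key names and required versions are illustrative only.
    static void checkRequiredApis(Map<String, Integer> brokerMaxVersions,
                                  boolean eosEnabled) {
        if (!supports(brokerMaxVersions, "CREATE_TOPICS", 0))
            throw new IllegalStateException(
                "Broker too old: CreateTopics unsupported (needs 0.10.1+)");
        if (eosEnabled && !supports(brokerMaxVersions, "INIT_PRODUCER_ID", 0))
            throw new IllegalStateException(
                "Broker does not support EOS (needs 0.11.0+)");
    }

    public static void main(String[] args) {
        // A broker that supports topic creation but no transactional APIs.
        Map<String, Integer> oldBroker = Map.of("CREATE_TOPICS", 0);

        checkRequiredApis(oldBroker, false); // non-EOS: passes

        try {
            checkRequiredApis(oldBroker, true); // EOS: fails fast
            throw new AssertionError("expected a failure");
        } catch (IllegalStateException expected) {
            System.out.println("fail-fast: " + expected.getMessage());
        }
    }
}
```

Note the thread's caveat applies regardless of where this check runs: since brokers can be downgraded after startup, a one-time check never removes the need to handle UnsupportedVersionException at runtime.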