Mikael,

When you use `through(..)`, the topics are not created by Kafka Streams. You need to create them yourself before you run the application.
Thanks,
Damian

On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <hoegqv...@gmail.com> wrote:

> Yes, the naming is not an issue.
>
> I've tested this with the topology described earlier. Every time I start
> the topology with a call to .through() that references a topic that does
> not exist, I get an exception from the UncaughtExceptionHandler:
>
> Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
> not found during partition assignment: words-count-changelog
>
> This happens when .through("words-count-changelog", "count") is part of the
> topology. The topology is also not forwarding anything to that topic/store.
> After restarting the application it works fine.
>
> Are the changelog topics created via, for example, .aggregate() different
> from topics auto-created via .through()?
>
> Thanks,
> Mikael
>
> On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > > 1) Create a state store AND the changelog topic 2) follow the Kafka
> > > Streams naming convention for changelog topics. Basically, I want to
> > > have a method that does what .through() is supposed to do according
> > > to the documentation, but without the "topic" parameter.
> >
> > I understand what you are saying, but you can get this done right now,
> > too. If you use through(...) you will get the store. And you can just
> > specify the topic name as "applicationId-storeName-changelog" to follow
> > the naming convention Streams uses internally. What is the problem with
> > this approach (besides that you have to provide the topic name, which
> > seems not to be a big burden to me)?
> >
> > -Matthias
> >
> > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > Hi Michael,
> > >
> > > thanks for the extensive explanation, and yes it definitely helps with
> > > my understanding of through(). :)
> > >
> > > You guessed correctly that I'm doing some "shenanigans" where I'm
> > > trying to derive the changelog of a state store from the state store
> > > name. This works perfectly fine with a naming convention for the
> > > topics and by creating them in Kafka upfront.
> > >
> > > My point is that it would help me (and maybe others) if the API of
> > > KTable was extended to have a new method that does two things that
> > > are not part of the implementation of .through(). 1) Create a state
> > > store AND the changelog topic 2) follow the Kafka Streams naming
> > > convention for changelog topics. Basically, I want to have a method
> > > that does what .through() is supposed to do according to the
> > > documentation, but without the "topic" parameter.
> > >
> > > What do you think, would it be possible to extend the API with a
> > > method like that?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mich...@confluent.io>
> > > wrote:
> > >
> > >> Mikael,
> > >>
> > >> regarding your second question:
> > >>
> > >>> 2) Regarding the use case, the topology looks like this:
> > >>>
> > >>> .stream(...)
> > >>> .aggregate(..., "store-1")
> > >>> .mapValues(...)
> > >>> .through(..., "store-2")
> > >>
> > >> The last operator above would, without the "..." ellipsis, be
> > >> something like `KTable#through("through-topic", "store-2")`. Here,
> > >> "through-topic" is the changelog topic for both the KTable and the
> > >> state store "store-2". So this is the changelog topic name that you
> > >> want to know.
> > >>
> > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > >> you'd need to add that yourself in the call to `through(...)`.
> > >>
> > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > >> automatically: That's because `through()` -- like `to()` or
> > >> `stream()`, `table()` -- requires you to explicitly provide a topic
> > >> name, and of course Kafka will use exactly this name. (FWIW, the
> > >> `-changelog` suffix is only added when Kafka creates internal
> > >> changelog topics behind the scenes for you.) Unfortunately, the
> > >> javadoc of `KTable#through()` is incorrect because it refers to
> > >> `-changelog`; we'll fix that as mentioned above.
> > >>
> > >> - Also, in case you want to do some shenanigans (like for some
> > >> tooling you're building around state stores/changelogs/interactive
> > >> queries) such as detecting all state store changelogs by doing the
> > >> equivalent of `ls *-changelog`, then this will miss changelogs of
> > >> KTables that are created by `through()` and `to()` (unless you come
> > >> up with a naming convention that your tooling can assume to be in
> > >> place, e.g. by always adding `-changelog` to topic names when you
> > >> call `through()`).
> > >>
> > >> I hope this helps!
> > >> Michael
> > >>
> > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqv...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Eno,
> > >>>
> > >>> 1) Great :)
> > >>>
> > >>> 2) Yes, we are using the Interactive Queries to access the state
> > >>> stores. In addition, we access the changelogs to subscribe to
> > >>> updates. For this reason we need to know the changelog topic name.
> > >>>
> > >>> Thanks,
> > >>> Mikael
> > >>>
> > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.there...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Mikael,
> > >>>>
> > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > >>>> looking into fixing it. I agree that it can be confusing to have
> > >>>> topic names that are not what one would expect.
> > >>>>
> > >>>> 2) If your goal is to query/read from the state stores, you can use
> > >>>> Interactive Queries to do that (you don't need to worry about the
> > >>>> changelog topic name and such). Interactive Queries is a new
> > >>>> feature in 0.10.1 (blog here:
> > >>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > >>>> ).
> > >>>>
> > >>>> Thanks
> > >>>> Eno
> > >>>>
> > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Sorry for being unclear, I'll try again :)
> > >>>>>
> > >>>>> 1) The JavaDoc for through is not correct, it states that a
> > >>>>> changelog topic will be created for the state store. That is, if I
> > >>>>> would call it with through("topic", "a-store"), I would expect a
> > >>>>> Kafka topic "my-app-id-a-store-changelog" to be created.
> > >>>>>
> > >>>>> 2) Regarding the use case, the topology looks like this:
> > >>>>>
> > >>>>> .stream(...)
> > >>>>> .aggregate(..., "store-1")
> > >>>>> .mapValues(...)
> > >>>>> .through(..., "store-2")
> > >>>>>
> > >>>>> Basically, I want to materialize both the result from the
> > >>>>> aggregate method and the result from mapValues, which is
> > >>>>> materialized using .through(). Later, I will access both the
> > >>>>> tables (store-1 and store-2) to a) get the current state of the
> > >>>>> aggregate, b) subscribe to future updates. This works just fine.
> > >>>>> The only issue is that I assumed a changelog topic for store-2
> > >>>>> would be created automatically, which didn't happen.
> > >>>>>
> > >>>>> Since I want to access the changelog topic, it helps if the naming
> > >>>>> is consistent. So either we enforce the same naming pattern as
> > >>>>> Kafka when calling .through() or alternatively the Kafka Streams
> > >>>>> API can provide a method to materialize tables which creates a
> > >>>>> topic name according to the naming pattern. E.g. .through()
> > >>>>> without the topic parameter.
> > >>>>>
> > >>>>> What do you think?
> > >>>>>
> > >>>>> Best,
> > >>>>> Mikael
> > >>>>>
> > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax
> > >>>>> <matth...@confluent.io> wrote:
> > >>>>>
> > >>>>>> I cannot completely follow what you want to achieve.
> > >>>>>>
> > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > >>>>>> Using through() will not create an extra internal changelog topic
> > >>>>>> with the described naming schema, because the topic specified in
> > >>>>>> through() can be used for this (there is no point in duplicating
> > >>>>>> the data).
> > >>>>>>
> > >>>>>> If you have a KTable and apply a mapValues(), this will not write
> > >>>>>> data to any topic. The derived KTable is in-memory because you
> > >>>>>> can easily recreate it from its base KTable.
> > >>>>>>
> > >>>>>> What is the missing part you want to get?
> > >>>>>>
> > >>>>>> Btw: the internally created changelog topics are only used for
> > >>>>>> recovery in case of failure. Streams does not consume from those
> > >>>>>> topics during "normal operation".
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > >>>>>>> changelog topic will be created for the table. It also states
> > >>>>>>> that calling through is equivalent to calling #to followed by
> > >>>>>>> KStreamBuilder#table.
> > >>>>>>>
> > >>>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> > >>>>>>>
> > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > >>>>>>> changelog topic will be created since the underlying topic acts
> > >>>>>>> as the changelog. I've verified that this is the case.
> > >>>>>>>
> > >>>>>>> Is there another API method to materialize the results of a
> > >>>>>>> KTable including a changelog, i.e. such that Kafka Streams
> > >>>>>>> creates the topic and uses the naming schema for changelog
> > >>>>>>> topics? The use case I have in mind is aggregate followed by
> > >>>>>>> mapValues.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Mikael
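
For readers who want to apply the naming convention discussed in this thread, here is a minimal sketch in plain Java. It only builds and filters topic name strings; it does not talk to Kafka and does not create any topic. The class `ChangelogTopics` and both of its methods are hypothetical helpers made up for illustration and are not part of the Kafka Streams API. The only thing taken from the thread is the pattern "applicationId-storeName-changelog" and the idea of a tooling-side equivalent of `ls *-changelog`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
final class ChangelogTopics {
    private static final String SUFFIX = "-changelog";

    private ChangelogTopics() {}

    // Builds a name following the convention quoted in the thread:
    // applicationId-storeName-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + SUFFIX;
    }

    // Rough equivalent of `ls *-changelog`, restricted to one application id.
    // The input list is assumed to come from whatever topic listing you have.
    static List<String> changelogTopicsOf(String applicationId, List<String> topicNames) {
        String prefix = applicationId + "-";
        List<String> result = new ArrayList<>();
        for (String topic : topicNames) {
            // The length check rejects names with an empty store part,
            // e.g. "my-app-id-changelog".
            if (topic.startsWith(prefix) && topic.endsWith(SUFFIX)
                    && topic.length() > prefix.length() + SUFFIX.length()) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String topic = changelogTopic("my-app-id", "store-2");
        System.out.println(topic);
        List<String> all = Arrays.asList("through-topic", topic, "other-app-store-1-changelog");
        System.out.println(changelogTopicsOf("my-app-id", all));
    }
}
```

The name returned by `changelogTopic` would then be passed as the topic argument to `through(...)`, and, as Damian points out, that topic still has to be created in Kafka before the application starts.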