Mikael,

> Sure, I guess the topic is auto-created the first time I start the topology
> and the second time it's there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.
Yes, that (i.e. you are running with auto-topic creation enabled) was what I
implicitly understood. As covered in [1], we strongly recommend manually
pre-creating and managing user topics, though. User topics include the source
topics that you are reading from (cf. `stream()`, `table()`) but also the
topics you use in `through()` and `to()`.

> Today there are .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?

There is already work being done on the Admin API (KIP-4), and part of this
functionality was released in the latest Kafka versions. You can use this to
programmatically create topics, for example. Note though that the work on
KIP-4 is not fully completed yet.
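To sketch what "programmatically create topics" could look like: the snippet
below uses the Java AdminClient that came out of the KIP-4 work, so it assumes
a client version that already ships it (older setups would pre-create the
topic with the kafka-topics command-line tool instead). The topic name,
partition count and replication factor are only placeholders for this example.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class PreCreateTopic {

      public static void main(final String[] args) throws Exception {
        final Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Create the topic up front, before the Streams application is started.
        try (final AdminClient admin = AdminClient.create(config)) {
          final NewTopic topic = new NewTopic("words-count-changelog", 3, (short) 1);
          admin.createTopics(Collections.singletonList(topic)).all().get();
        }
      }
    }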
-Michael

[1] http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application



On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist <hoegqv...@gmail.com> wrote:
> Sure, I guess the topic is auto-created the first time I start the topology
> and the second time it's there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.
>
> That said, as a user, I think it would be great with a function in the
> Kafka Streams DSL that would allow me to materialize a KTable without
> pre-creating the topic. Today there are .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?
>
> Best,
> Mikael
>
> On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <damian....@gmail.com> wrote:
>
> Mikael,
>
> When you use `through(..)`, topics are not created by Kafka Streams. You
> need to create them yourself before you run the application.
>
> Thanks,
> Damian
>
> On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <hoegqv...@gmail.com> wrote:
>
> > Yes, the naming is not an issue.
> >
> > I've tested this with the topology described earlier. Every time I start
> > the topology with a call to .through() that references a topic that does
> > not exist, I get an exception from the UncaughtExceptionHandler:
> >
> > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > Topic not found during partition assignment: words-count-changelog
> >
> > This happens when .through("words-count-changelog", "count") is part of
> > the topology. The topology is also not forwarding anything to that
> > topic/store. After restarting the application it works fine.
> >
> > Are the changelog topics created via, for example, .aggregate() different
> > from topics auto-created via .through()?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > > 1) Create a state store AND the changelog topic 2) follow the Kafka
> > > > Streams naming convention for changelog topics. Basically, I want to
> > > > have a method that does what .through() is supposed to do according
> > > > to the documentation, but without the "topic" parameter.
> > >
> > > I understand what you are saying, but you can get this done right now,
> > > too. If you use through(...) you will get the store. And you can just
> > > specify the topic name as "applicationId-storeName-changelog" to follow
> > > the naming convention Streams uses internally. What is the problem with
> > > this approach (besides that you have to provide the topic name, which
> > > does not seem to be a big burden to me)?
> > >
> > > -Matthias
> > >
> > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > Hi Michael,
> > > >
> > > > thanks for the extensive explanation, and yes, it definitely helps
> > > > with my understanding of through(). :)
> > > >
> > > > You guessed correctly that I'm doing some "shenanigans" where I'm
> > > > trying to derive the changelog of a state store from the state store
> > > > name. This works perfectly fine with a naming convention for the
> > > > topics and by creating them in Kafka upfront.
> > > >
> > > > My point is that it would help me (and maybe others) if the API of
> > > > KTable was extended with a new method that does two things that are
> > > > not part of the implementation of .through(): 1) Create a state store
> > > > AND the changelog topic 2) follow the Kafka Streams naming convention
> > > > for changelog topics. Basically, I want to have a method that does
> > > > what .through() is supposed to do according to the documentation, but
> > > > without the "topic" parameter.
> > > >
> > > > What do you think, would it be possible to extend the API with a
> > > > method like that?
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mich...@confluent.io>
> > > > wrote:
> > > >
> > > >> Mikael,
> > > >>
> > > >> regarding your second question:
> > > >>
> > > >>> 2) Regarding the use case, the topology looks like this:
> > > >>>
> > > >>> .stream(...)
> > > >>> .aggregate(..., "store-1")
> > > >>> .mapValues(...)
> > > >>> .through(..., "store-2")
> > > >>
> > > >> The last operator above would, without the "..." ellipsis, be
> > > >> something like `KTable#through("through-topic", "store-2")`. Here,
> > > >> "through-topic" is the changelog topic for both the KTable and the
> > > >> state store "store-2". So this is the changelog topic name that you
> > > >> want to know.
> > > >>
> > > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > > >> you'd need to add that yourself in the call to `through(...)`.
> > > >>
> > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > >> automatically: that's because `through()` -- like `to()`, `stream()`
> > > >> and `table()` -- requires you to explicitly provide a topic name, and
> > > >> of course Kafka will use exactly this name. (FWIW, the `-changelog`
> > > >> suffix is only added when Kafka creates internal changelog topics
> > > >> behind the scenes for you.) Unfortunately, the javadocs of
> > > >> `KTable#through()` are incorrect because they refer to `-changelog`;
> > > >> we'll fix that as mentioned above.
> > > >>
> > > >> - Also, in case you want to do some shenanigans (like for some
> > > >> tooling you're building around state stores/changelogs/interactive
> > > >> queries) such as detecting all state store changelogs by doing the
> > > >> equivalent of `ls *-changelog`, then this will miss the changelogs of
> > > >> KTables that are created by `through()` and `to()` (unless you come
> > > >> up with a naming convention that your tooling can assume to be in
> > > >> place, e.g. by always adding `-changelog` to topic names when you
> > > >> call `through()`).
> > > >>
> > > >> I hope this helps!
> > > >> Michael
> > > >>
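(To make the above concrete: written out against the 0.10.1 DSL, the topology
under discussion could look roughly like the sketch below. The application id
"my-app-id" and the topic/store names "words", "count" and "words-count" are
made up for this example; the aggregate's store gets its changelog created
automatically, while the `through()` topic has to be pre-created and only
matches the changelog convention because the name spells it out.)

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class WordCountWithThrough {

      public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final KStreamBuilder builder = new KStreamBuilder();

        // "words" is a pre-created source topic of <String, String> records.
        final KTable<String, Long> counts = builder
            .stream(Serdes.String(), Serdes.String(), "words")
            .groupBy((key, word) -> word, Serdes.String(), Serdes.String())
            // Store "count": Streams auto-creates "my-app-id-count-changelog" for it.
            .count("count");

        // through() neither creates its topic nor appends "-changelog". The topic
        // below must exist up front, and it matches the changelog naming convention
        // only because the name spells it out explicitly.
        counts
            .mapValues(count -> count * 2)
            .through(Serdes.String(), Serdes.Long(),
                     "my-app-id-words-count-changelog", "words-count");

        new KafkaStreams(builder, props).start();
      }
    }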
> > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqv...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Eno,
> > > >>>
> > > >>> 1) Great :)
> > > >>>
> > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > >>> stores. In addition, we access the changelogs to subscribe to
> > > >>> updates. For this reason we need to know the changelog topic name.
> > > >>>
> > > >>> Thanks,
> > > >>> Mikael
> > > >>>
> > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.there...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Mikael,
> > > >>>>
> > > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > > >>>> looking into fixing it. I agree that it can be confusing to have
> > > >>>> topic names that are not what one would expect.
> > > >>>>
> > > >>>> 2) If your goal is to query/read from the state stores, you can use
> > > >>>> Interactive Queries to do that (you don't need to worry about the
> > > >>>> changelog topic name and such). Interactive Queries is a new
> > > >>>> feature in 0.10.1 (blog here:
> > > >>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > > >>>> ).
> > > >>>>
> > > >>>> Thanks
> > > >>>> Eno
> > > >>>>
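(As a rough illustration of the Interactive Queries approach Eno mentions,
querying the aggregate's state from a running application could look like the
fragment below, against the 0.10.1 API. The store name "count" and the
`streams` variable refer to the hypothetical topology sketched earlier in this
thread; local state only becomes queryable once the instance is running and
has finished rebalancing.)

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // `streams` is the already-running KafkaStreams instance from the earlier sketch.
    final ReadOnlyKeyValueStore<String, Long> countStore =
        streams.store("count", QueryableStoreTypes.<String, Long>keyValueStore());

    // Current aggregate value for one key; returns null if the key is unknown.
    final Long count = countStore.get("some-word");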
> > > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Sorry for being unclear, I'll try again :)
> > > >>>>>
> > > >>>>> 1) The JavaDoc for through is not correct, it states that a
> > > >>>>> changelog topic will be created for the state store. That is, if I
> > > >>>>> would call it with through("topic", "a-store"), I would expect a
> > > >>>>> Kafka topic "my-app-id-a-store-changelog" to be created.
> > > >>>>>
> > > >>>>> 2) Regarding the use case, the topology looks like this:
> > > >>>>>
> > > >>>>> .stream(...)
> > > >>>>> .aggregate(..., "store-1")
> > > >>>>> .mapValues(...)
> > > >>>>> .through(..., "store-2")
> > > >>>>>
> > > >>>>> Basically, I want to materialize both the result from the aggregate
> > > >>>>> method and the result from mapValues, which is materialized using
> > > >>>>> .through(). Later, I will access both tables (store-1 and store-2)
> > > >>>>> to a) get the current state of the aggregate, b) subscribe to
> > > >>>>> future updates. This works just fine. The only issue is that I
> > > >>>>> assumed a changelog topic for store-2 would be created
> > > >>>>> automatically, which didn't happen.
> > > >>>>>
> > > >>>>> Since I want to access the changelog topic, it helps if the naming
> > > >>>>> is consistent. So either we enforce the same naming pattern as
> > > >>>>> Kafka when calling .through(), or alternatively the Kafka Streams
> > > >>>>> API can provide a method to materialize tables which creates a
> > > >>>>> topic name according to the naming pattern, e.g. .through() without
> > > >>>>> the topic parameter.
> > > >>>>>
> > > >>>>> What do you think?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Mikael
> > > >>>>>
> > > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matth...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> I cannot completely follow what you want to achieve.
> > > >>>>>>
> > > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > > >>>>>> Using through() will not create an extra internal changelog topic
> > > >>>>>> with the described naming schema, because the topic specified in
> > > >>>>>> through() can be used for this (there is no point in duplicating
> > > >>>>>> the data).
> > > >>>>>>
> > > >>>>>> If you have a KTable and apply a mapValues(), this will not write
> > > >>>>>> data to any topic. The derived KTable is in-memory because you can
> > > >>>>>> easily recreate it from its base KTable.
> > > >>>>>>
> > > >>>>>> What is the missing part you want to get?
> > > >>>>>>
> > > >>>>>> Btw: the internally created changelog topics are only used for
> > > >>>>>> recovery in case of failure. Streams does not consume from those
> > > >>>>>> topics during "normal operation".
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > > >>>>>>> changelog topic will be created for the table. It also states
> > > >>>>>>> that calling through is equivalent to calling #to followed by
> > > >>>>>>> KStreamBuilder#table.
> > > >>>>>>>
> > > >>>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > >>>>>>>
> > > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > > >>>>>>> changelog topic will be created since the underlying topic acts
> > > >>>>>>> as the changelog. I've verified that this is the case.
> > > >>>>>>>
> > > >>>>>>> Is there another API method to materialize the results of a
> > > >>>>>>> KTable including a changelog, i.e. such that Kafka Streams
> > > >>>>>>> creates the topic and uses the naming schema for changelog
> > > >>>>>>> topics? The use case I have in mind is aggregate followed by
> > > >>>>>>> mapValues.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Mikael
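(For reference, the equivalence stated in that javadoc, written with
hypothetical names, amounts to the following; in both variants the topic has
to exist before the application starts, and no `-changelog` suffix is added
for it. Here `mapped` stands for the KTable produced by mapValues(),
`builder` for the KStreamBuilder, and "words-count-topic" and "store-2" are
placeholder names.)

    // Variant 1: materialize the mapped KTable via through().
    final KTable<String, Long> viaThrough =
        mapped.through(Serdes.String(), Serdes.Long(), "words-count-topic", "store-2");

    // Variant 2: to() followed by KStreamBuilder#table() on the same topic.
    mapped.to(Serdes.String(), Serdes.Long(), "words-count-topic");
    final KTable<String, Long> viaToAndTable =
        builder.table(Serdes.String(), Serdes.Long(), "words-count-topic", "store-2");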