Mikael,

> Sure, I guess the topic is auto-created the first time I start the
> topology and the second time it's there already. It could be possible to
> create topics up front for us, or even use an admin call from inside the
> code.

Yes, that is what I had implicitly understood (i.e. that you are running
with auto-topic creation enabled).  As covered in [1], we strongly
recommend that you manually pre-create and manage user topics, though.
User topics include the source topics that you read from (cf. `stream()`,
`table()`) as well as the topics you use in `through()` and `to()`.


> Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?

There is already work being done in the Admin API (KIP-4), and part of this
functionality was released in the latest Kafka versions.  You can use this
to programmatically create topics, for example.  Note though that the work
on KIP-4 is not fully completed yet.
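
If you pre-create the `through()` topics yourself and want them to follow the
naming convention mentioned further down in this thread
(`applicationId-storeName-changelog`), a small helper along the following
lines may be enough.  This is only a sketch: the class and method names
(`ChangelogTopics`, `changelogTopicName`) are made up for illustration and
are not part of the Kafka Streams API.

```java
import java.util.Objects;

// Hypothetical helper (not part of Kafka Streams): derives a topic name that
// follows the "<applicationId>-<storeName>-changelog" convention discussed
// in this thread.
public final class ChangelogTopics {

    private ChangelogTopics() {
        // utility class, no instances
    }

    public static String changelogTopicName(String applicationId, String storeName) {
        Objects.requireNonNull(applicationId, "applicationId");
        Objects.requireNonNull(storeName, "storeName");
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // prints: my-app-id-a-store-changelog
        System.out.println(changelogTopicName("my-app-id", "a-store"));
    }
}
```

You would pass the resulting name as the topic argument of
`through(topic, store)` and create a topic with that same name before
starting the application.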

-Michael




[1]
http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application



On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist <hoegqv...@gmail.com>
wrote:

> Sure, I guess the topic is auto-created the first time I start the topology
> and the second time it's there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.
>
> That said, as a user, I think it would be great with a function in the
> Kafka Streams DSL that would allow me to materialize a KTable without
> pre-creating the topic. Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?
>
> Best,
> Mikael
>
> On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <damian....@gmail.com> wrote:
>
> Mikael,
>
> When you use `through(..)` topics are not created by KafkaStreams. You need
> to create them yourself before you run the application.
>
> Thanks,
> Damian
>
> On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <hoegqv...@gmail.com> wrote:
>
> > Yes, the naming is not an issue.
> >
> > I've tested this with the topology described earlier. Every time I start
> > the topology with a call to .through() that references a topic that does
> > not exist, I get an exception from the UncaughtExceptionHandler:
> >
> > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > Topic not found during partition assignment: words-count-changelog
> >
> > This happens when .through("words-count-changelog", "count") is part of
> > the topology. The topology is also not forwarding anything to that
> > topic/store. After restarting the application it works fine.
> >
> > Are the changelog topics created via, for example, .aggregate() different
> > from topics auto-created via .through()?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > > 1) Create a state store AND the changelog topic 2) follow the Kafka
> > > > Streams naming convention for changelog topics. Basically, I want to
> > > > have a method that does what .through() is supposed to do according
> > > > to the documentation, but without the "topic" parameter.
> > >
> > > I understand what you are saying, but you can get this done right now,
> > > too. If you use through(...) you will get the store. And you can just
> > > specify the topic name as "applicationId-storeName-changelog" to follow
> > > the naming convention Streams uses internally. What is the problem with
> > > this approach (besides having to provide the topic name, which does not
> > > seem to be a big burden to me)?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > Hi Michael,
> > > >
> > > > thanks for the extensive explanation, and yes it definitely helps
> > > > with my understanding of through(). :)
> > > >
> > > > You guessed correctly that I'm doing some "shenanigans" where I'm
> > > > trying to derive the changelog of a state store from the state store
> > > > name. This works perfectly fine with a naming convention for the
> > > > topics and by creating them in Kafka upfront.
> > > >
> > > > My point is that it would help me (and maybe others) if the API of
> > > > KTable was extended with a new method that does two things that are
> > > > not part of the implementation of .through(): 1) create a state store
> > > > AND the changelog topic, 2) follow the Kafka Streams naming convention
> > > > for changelog topics. Basically, I want to have a method that does
> > > > what .through() is supposed to do according to the documentation, but
> > > > without the "topic" parameter.
> > > >
> > > > What do you think, would it be possible to extend the API with a
> > > > method like that?
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mich...@confluent.io>
> > > > wrote:
> > > >
> > > >> Mikael,
> > > >>
> > > >> regarding your second question:
> > > >>
> > > >>> 2) Regarding the use case, the topology looks like this:
> > > >>>
> > > >>> .stream(...)
> > > >>> .aggregate(..., "store-1")
> > > >>> .mapValues(...)
> > > >>> .through(..., "store-2")
> > > >>
> > > >> The last operator above would, without the "..." ellipsis, be
> > > >> something like `KTable#through("through-topic", "store-2")`.  Here,
> > > >> "through-topic" is the changelog topic for both the KTable and the
> > > >> state store "store-2".  So this is the changelog topic name that you
> > > >> want to know.
> > > >>
> > > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > > >> you'd need to add that yourself in the call to `through(...)`.
> > > >>
> > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > >> automatically:  That's because `through()` -- like `to()`, `stream()`,
> > > >> or `table()` -- requires you to explicitly provide a topic name, and of
> > > >> course Kafka will use exactly this name.  (FWIW, the `-changelog`
> > > >> suffix is only added when Kafka creates internal changelog topics
> > > >> behind the scenes for you.)  Unfortunately, the javadoc of
> > > >> `KTable#through()` is incorrect because it refers to `-changelog`;
> > > >> we'll fix that as mentioned above.
> > > >>
> > > >> - Also, in case you want to do some shenanigans (like for some tooling
> > > >> you're building around state stores/changelogs/interactive queries)
> > > >> such as detecting all state store changelogs by doing the equivalent
> > > >> of `ls *-changelog`, then this will miss changelogs of KTables that
> > > >> are created by `through()` and `to()` (unless you come up with a
> > > >> naming convention that your tooling can assume to be in place, e.g. by
> > > >> always adding `-changelog` to topic names when you call `through()`).
> > > >>
> > > >> I hope this helps!
> > > >> Michael
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqv...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Eno,
> > > >>>
> > > >>> 1) Great :)
> > > >>>
> > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > >>> stores. In addition, we access the changelogs to subscribe to
> > > >>> updates. For this reason we need to know the changelog topic name.
> > > >>>
> > > >>> Thanks,
> > > >>> Mikael
> > > >>>
> > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.there...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Mikael,
> > > >>>>
> > > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > > >>>> looking into fixing it. I agree that it can be confusing to have
> > > >>>> topic names that are not what one would expect.
> > > >>>>
> > > >>>> 2) If your goal is to query/read from the state stores, you can use
> > > >>>> Interactive Queries to do that (you don't need to worry about the
> > > >>>> changelog topic name and such). Interactive Queries is a new feature
> > > >>>> in 0.10.1 (blog here:
> > > >>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > > >>>> ).
> > > >>>>
> > > >>>> Thanks
> > > >>>> Eno
> > > >>>>
> > > >>>>
> > > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>> Sorry for being unclear, I'll try again :)
> > > >>>>>
> > > >>>>> 1) The JavaDoc for through is not correct, it states that a
> > > >>>>> changelog topic will be created for the state store. That is, if I
> > > >>>>> would call it with through("topic", "a-store"), I would expect a
> > > >>>>> kafka topic "my-app-id-a-store-changelog" to be created.
> > > >>>>>
> > > >>>>> 2) Regarding the use case, the topology looks like this:
> > > >>>>>
> > > >>>>> .stream(...)
> > > >>>>> .aggregate(..., "store-1")
> > > >>>>> .mapValues(...)
> > > >>>>> .through(..., "store-2")
> > > >>>>>
> > > >>>>> Basically, I want to materialize both the result from the aggregate
> > > >>>>> method and the result from mapValues, which is materialized using
> > > >>>>> .through(). Later, I will access both the tables (store-1 and
> > > >>>>> store-2) to a) get the current state of the aggregate, b) subscribe
> > > >>>>> to future updates. This works just fine. The only issue is that I
> > > >>>>> assumed a changelog topic for store-2 would be created
> > > >>>>> automatically, which didn't happen.
> > > >>>>>
> > > >>>>> Since I want to access the changelog topic, it helps if the naming
> > > >>>>> is consistent. So either we enforce the same naming pattern as kafka
> > > >>>>> when calling .through() or alternatively the Kafka Streams API can
> > > >>>>> provide a method to materialize tables which creates a topic name
> > > >>>>> according to the naming pattern. E.g. .through() without the topic
> > > >>>>> parameter.
> > > >>>>>
> > > >>>>> What do you think?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Mikael
> > > >>>>>
> > > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax
> > > >>>>> <matth...@confluent.io> wrote:
> > > >>>>>
> > > >>>>>> I cannot completely follow what you want to achieve.
> > > >>>>>>
> > > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > > >>>>>> Using through() will not create an extra internal changelog topic
> > > >>>>>> with the described naming schema, because the topic specified in
> > > >>>>>> through() can be used for this (there is no point in duplicating
> > > >>>>>> the data).
> > > >>>>>>
> > > >>>>>> If you have a KTable and apply a mapValues(), this will not write
> > > >>>>>> data to any topic. The derived KTable is in-memory because you can
> > > >>>>>> easily recreate it from its base KTable.
> > > >>>>>>
> > > >>>>>> What is the missing part you want to get?
> > > >>>>>>
> > > >>>>>> Btw: the internally created changelog topics are only used for
> > > >>>>>> recovery in case of failure. Streams does not consume from those
> > > >>>>>> topics during "normal operation".
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > > >>>>>>> changelog topic will be created for the table. It also states that
> > > >>>>>>> calling through is equivalent to calling #to followed by
> > > >>>>>>> KStreamBuilder#table.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > >>>>>>>
> > > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > > >>>>>>> changelog topic will be created since the underlying topic acts as
> > > >>>>>>> the changelog. I've verified that this is the case.
> > > >>>>>>>
> > > >>>>>>> Is there another API method to materialize the results of a KTable
> > > >>>>>>> including a changelog, i.e. such that kafka streams creates the
> > > >>>>>>> topic and uses the naming schema for changelog topics? The use case
> > > >>>>>>> I have in mind is aggregate followed by mapValues.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Mikael
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>
