Hi Raffaele, hopefully others more knowledgeable will correct me if I'm
wrong, but I don't believe anything gets read from the changelog topic
(other than at startup, if the state store needs to be restored).  So in
your Sub-topology 1, the only topic being consumed from is the repartition
topic.  After the record hits the aggregation (state store), it is emitted
to the next step, which converts it back to a KStream, and finally it is
written to the sink node for your output topic.
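
For reference, here is a rough sketch of the DSL code that would produce a
topology like the one you posted (the store and topic names come from your
describe() output; the serdes and the word-splitting logic are just my
assumptions):

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("word_count_input")
    // FLATMAPVALUES: split each line into words
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    // KEY-SELECT: re-keying by word is what forces the Counts-repartition topic
    .selectKey((key, word) -> word)
    .groupByKey()
    // AGGREGATE: count() materializes the "Counts" store (RocksDB + changelog)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("Counts"))
    // TOSTREAM then SINK: back to a KStream, then out to the output topic
    .toStream()
    .to("word_count_output", Produced.with(Serdes.String(), Serdes.Long()));

Calling builder.build().describe() is what prints the topology you pasted.
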
If a failure occurred somewhere in that sub-topology, the offset for the
repartition topic would not have been committed, which means the record
would get processed again when the application restarts.
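
As for your exactly-once question: I believe your expectation is right - when
you enable the exactly_once processing guarantee, Streams uses Kafka
transactions under the hood so that the offset commit and the writes to the
changelog, repartition, and output topics all commit atomically.  A minimal
config sketch (the application id and broker address are just examples):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Wraps each read-process-write cycle in a Kafka transaction
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

The fault-tolerance and processing-guarantees sections of the Kafka Streams
architecture docs, plus KIP-129 (Streams exactly-once semantics), are good
places to learn more.

Hope that helps,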

Alex Craig

On Tue, May 19, 2020 at 10:21 AM Raffaele Esposito <rafaelral...@gmail.com>
wrote:

> This is the topology of a simple word count:
>
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [word_count_input])
>       --> KSTREAM-FLATMAPVALUES-0000000001
>     Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
>       --> KSTREAM-KEY-SELECT-0000000002
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
>       --> KSTREAM-FILTER-0000000005
>       <-- KSTREAM-FLATMAPVALUES-0000000001
>     Processor: KSTREAM-FILTER-0000000005 (stores: [])
>       --> KSTREAM-SINK-0000000004
>       <-- KSTREAM-KEY-SELECT-0000000002
>     Sink: KSTREAM-SINK-0000000004 (topic: Counts-repartition)
>       <-- KSTREAM-FILTER-0000000005
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000006 (topics: [Counts-repartition])
>       --> KSTREAM-AGGREGATE-0000000003
>     Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Counts])
>       --> KTABLE-TOSTREAM-0000000007
>       <-- KSTREAM-SOURCE-0000000006
>     Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
>       --> KSTREAM-SINK-0000000008
>       <-- KSTREAM-AGGREGATE-0000000003
>     Sink: KSTREAM-SINK-0000000008 (topic: word_count_output)
>       <-- KTABLE-TOSTREAM-0000000007
>
> I would like to understand better what (stores: [Counts]) means.
>
> The documentation says:
>
> For each state store, Kafka maintains a replicated changelog Kafka topic in
> which it tracks any state updates.
>
> As I understand it, a KTable in Kafka is implemented with an in-memory table
> (RocksDB, but in theory it could also be a HashMap) backed by a compacted
> log, which is one of the internal topics created by my sample
> application.
>
> wordcount-application-Counts-changelog
>
> If the above assumption is correct, it means that in that step Kafka Streams
> writes back to Kafka and starts reading from Kafka again.
>
> So basically the log-compacted topic is the sink for one read-process-write
> cycle and the source for the next.
>
>    - read from the repartition topic -> write to the KTable compacted topic
>    - read from the KTable compacted topic -> process -> write to the output topic
>
> Is this correct?
> As for fault tolerance and exactly-once semantics, I would also expect Kafka
> to apply transactions. Once again, are my assumptions correct? Where could I
> learn more about these concepts?
>
> Any help is welcome!
>
