Yes, for EOS (exactly-once semantics), writes to the changelog topics
happen in the same transaction as writes to the output topic.
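
For reference, exactly-once processing is switched on through the
`processing.guarantee` setting. A minimal sketch, using plain
java.util.Properties and the literal config keys so it has no Kafka
dependency; the application id is inferred from the changelog topic name
in this thread, and the bootstrap server address is only a placeholder
assumption:

```java
import java.util.Properties;

public class EosConfigSketch {

    // Builds the properties that would be handed to KafkaStreams.
    static Properties eosProps() {
        Properties props = new Properties();
        // Inferred from the topic "wordcount-application-Counts-changelog".
        props.put("application.id", "wordcount-application");
        // Placeholder broker address (assumption, adjust to your cluster).
        props.put("bootstrap.servers", "localhost:9092");
        // Default is "at_least_once"; "exactly_once" makes Streams use
        // transactions for its writes and offset commits.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(eosProps().getProperty("processing.guarantee"));
    }
}
```

In a real application the same keys are available as constants on
StreamsConfig (PROCESSING_GUARANTEE_CONFIG and EXACTLY_ONCE).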

On 5/19/20 1:22 PM, Raffaele Esposito wrote:
> Thanks a lot Alex and Matthias,
> From Alex's answer, I understand that the record is written to the compacted
> topic as part of the transaction, right?
> On Tue, May 19, 2020 at 8:32 PM Matthias J. Sax wrote:
>> What Alex says is correct.
>> The changelog topic is only written into during processing -- in fact,
>> you could consider this write a "side effect" of doing `store.put()`.
>> The changelog topic is only read when recovering from an error and the
>> store needs to be rebuilt from it.
>> -Matthias
>> On 5/19/20 9:30 AM, Alex Craig wrote:
>>> Hi Raffaele, hopefully others more knowledgeable will correct me if I'm
>>> wrong, but I don't believe anything gets read from the changelog topic
>>> (other than at startup, if the state store needs to be restored). So in
>>> your Sub-topology-1, the only topic being consumed from is the
>>> repartition topic. After it hits the aggregation (state store), the
>>> record is emitted to the next step, which converts it back to a KStream
>>> object, and then finally it's written to your output topic. If a failure
>>> occurred somewhere in that sub-topology, then the offset for the
>>> repartition topic would not have been committed, which means the record
>>> would get processed again on application startup. Hope that helps,
>>> Alex Craig
>>> On Tue, May 19, 2020 at 10:21 AM Raffaele Esposito wrote:
>>>> This is the topology of a simple word count:
>>>> Topologies:
>>>>    Sub-topology: 0
>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [word_count_input])
>>>>       --> KSTREAM-FLATMAPVALUES-0000000001
>>>>     Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
>>>>       --> KSTREAM-KEY-SELECT-0000000002
>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>     Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
>>>>       --> KSTREAM-FILTER-0000000005
>>>>       <-- KSTREAM-FLATMAPVALUES-0000000001
>>>>     Processor: KSTREAM-FILTER-0000000005 (stores: [])
>>>>       --> KSTREAM-SINK-0000000004
>>>>       <-- KSTREAM-KEY-SELECT-0000000002
>>>>     Sink: KSTREAM-SINK-0000000004 (topic: Counts-repartition)
>>>>       <-- KSTREAM-FILTER-0000000005
>>>>   Sub-topology: 1
>>>>     Source: KSTREAM-SOURCE-0000000006 (topics: [Counts-repartition])
>>>>       --> KSTREAM-AGGREGATE-0000000003
>>>>     Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Counts])
>>>>       --> KTABLE-TOSTREAM-0000000007
>>>>       <-- KSTREAM-SOURCE-0000000006
>>>>     Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
>>>>       --> KSTREAM-SINK-0000000008
>>>>       <-- KSTREAM-AGGREGATE-0000000003
>>>>     Sink: KSTREAM-SINK-0000000008 (topic: word_count_output)
>>>>       <-- KTABLE-TOSTREAM-0000000007
>>>> I would like to understand better what (stores: [Counts]) means.
>>>> The documentation says:
>>>> For each state store, Kafka maintains a replicated changelog Kafka topic in
>>>> which it tracks any state updates.
>>>> As I understand it, a KTable in Kafka is implemented with an in-memory table
>>>> (RocksDB, but in theory it could also be a HashMap) backed by a compacted
>>>> log, which is one of the internal topics created by my sample
>>>> application:
>>>> wordcount-application-Counts-changelog
>>>> If the above assumption is correct, it means that in that step Kafka Streams
>>>> is writing back to Kafka and then starts reading again from Kafka.
>>>> So basically the log-compacted topic is the sink for one read-process-write
>>>> cycle and the source for the next:
>>>>    - read from repartition topic -> write on KTable compacted topic
>>>>    - read from KTable compacted topic -> process -> write to output topic
>>>> Is this correct?
>>>> As for fault tolerance and exactly-once semantics, I would also expect Kafka
>>>> to use transactions. Once again, are my assumptions correct? Where
>>>> could I learn more about these concepts?
>>>> Any help is welcome!

