Yes, for EOS, writing into changelog topics happens in the same transaction as writing to the output topic.
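This only applies once exactly-once processing is enabled; the default `processing.guarantee` is `at_least_once`. A minimal sketch of turning it on follows, assuming Kafka Streams 3.0 or newer. The application ID is inferred from the changelog topic name quoted further down (`wordcount-application-Counts-changelog`), the broker address is a placeholder, and `EosConfigSketch` is a hypothetical helper name. Plain string keys are used so the snippet compiles with only `java.util`; in an application you would normally use the `StreamsConfig` constants for the same keys.

```java
import java.util.Properties;

// Sketch only: string keys so this compiles without the Kafka jar.
// StreamsConfig.APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG and
// PROCESSING_GUARANTEE_CONFIG name these same keys.
class EosConfigSketch {
    static Properties eosProps() {
        Properties props = new Properties();
        // application.id prefixes internal topic names,
        // e.g. wordcount-application-Counts-changelog
        props.setProperty("application.id", "wordcount-application");
        // Placeholder broker address (assumption).
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Default is "at_least_once". "exactly_once_v2" needs Kafka Streams 3.0+;
        // 2.x applications use "exactly_once" instead.
        props.setProperty("processing.guarantee", "exactly_once_v2");
        return props;
    }
}
```

With this setting, the changelog writes, the output-topic writes and the offset commit for the input (here `Counts-repartition`) are committed or aborted together as one Kafka transaction.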
You might be interested in this blog post: https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

On 5/19/20 1:22 PM, Raffaele Esposito wrote:
> Thanks a lot Alex and Matthias,
> From Alex's answer, I understand that the record is written to the compacted
> topic as part of the transaction, right?
>
>
> On Tue, May 19, 2020 at 8:32 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> What Alex says is correct.
>>
>> The changelog topic is only written into during processing -- in fact,
>> you could consider this write a "side effect" of doing `store.put()`.
>>
>> The changelog topic is only read when recovering from an error and the
>> store needs to be rebuilt from it.
>>
>>
>> -Matthias
>>
>> On 5/19/20 9:30 AM, Alex Craig wrote:
>>> Hi Raffaele, hopefully others more knowledgeable will correct me if I'm
>>> wrong, but I don't believe anything gets read from the changelog topic
>>> (other than at startup, if the state store needs to be restored). So in
>>> your Sub-topology 1, the only topic being consumed from is the repartition
>>> topic. After it hits the aggregation (state store), the record is emitted
>>> to the next step, which converts it back to a KStream object, and then
>>> finally it's written to your output topic. If a failure occurred somewhere
>>> in that sub-topology, then the offset for the repartition topic would not
>>> have been committed, which means it would get processed again on
>>> application startup.
>>> Hope that helps,
>>>
>>> Alex Craig
>>>
>>> On Tue, May 19, 2020 at 10:21 AM Raffaele Esposito <rafaelral...@gmail.com>
>>> wrote:
>>>
>>>> This is the topology of a simple word count:
>>>>
>>>> Topologies:
>>>>    Sub-topology: 0
>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [word_count_input])
>>>>       --> KSTREAM-FLATMAPVALUES-0000000001
>>>>     Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
>>>>       --> KSTREAM-KEY-SELECT-0000000002
>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>     Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
>>>>       --> KSTREAM-FILTER-0000000005
>>>>       <-- KSTREAM-FLATMAPVALUES-0000000001
>>>>     Processor: KSTREAM-FILTER-0000000005 (stores: [])
>>>>       --> KSTREAM-SINK-0000000004
>>>>       <-- KSTREAM-KEY-SELECT-0000000002
>>>>     Sink: KSTREAM-SINK-0000000004 (topic: Counts-repartition)
>>>>       <-- KSTREAM-FILTER-0000000005
>>>>
>>>>   Sub-topology: 1
>>>>     Source: KSTREAM-SOURCE-0000000006 (topics: [Counts-repartition])
>>>>       --> KSTREAM-AGGREGATE-0000000003
>>>>     Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Counts])
>>>>       --> KTABLE-TOSTREAM-0000000007
>>>>       <-- KSTREAM-SOURCE-0000000006
>>>>     Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
>>>>       --> KSTREAM-SINK-0000000008
>>>>       <-- KSTREAM-AGGREGATE-0000000003
>>>>     Sink: KSTREAM-SINK-0000000008 (topic: word_count_output)
>>>>       <-- KTABLE-TOSTREAM-0000000007
>>>>
>>>> I would like to understand better what (stores: [Counts]) means.
>>>>
>>>> The documentation says:
>>>>
>>>> For each state store, Kafka maintains a replicated changelog Kafka topic in
>>>> which it tracks any state updates.
>>>>
>>>> As I understand it, a KTable in Kafka is implemented with a local table
>>>> (RocksDB, but in theory it could also be an in-memory HashMap) backed up by
>>>> a compacted log, which is one of the internal topics created by my sample
>>>> application:
>>>>
>>>> wordcount-application-Counts-changelog
>>>>
>>>> If the above assumption is correct, it means that in that step Kafka Streams
>>>> is writing back to Kafka and then reading from Kafka again.
>>>>
>>>> So basically the log-compacted topic is the sink for one read-process-write
>>>> cycle and the source for the next:
>>>>
>>>> - read from repartition topic -> write to KTable compacted topic
>>>> - read from KTable compacted topic -> process -> write to output topic
>>>>
>>>> Is this correct?
>>>> Regarding fault tolerance and exactly-once semantics, I would also expect
>>>> Kafka to use transactions. Once again, are my assumptions correct? Where
>>>> could I learn more about these concepts?
>>>>
>>>> Any help is welcome!
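To make Alex's and Matthias's answers concrete, here is a hypothetical toy model of Sub-topology 1 under exactly-once, in plain Java. It is not the Kafka Streams implementation: `ToyEosTask`, `Rec`, `run`, `commit` and `restore` are invented names, topics are in-memory lists, and a "transaction" is simply a batch that is published on commit or dropped on a crash. It illustrates the two points from the thread: the changelog is written as a side effect of `store.put()`, in the same commit as the output records and the repartition-topic offset, and it is read only when the store is restored, never during normal processing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of Sub-topology 1; NOT how Kafka Streams is implemented.
// Topics are in-memory lists; a "transaction" is a batch published on commit
// or dropped on crash.
class ToyEosTask {
    static final class Rec {
        final String key;
        final long value;
        Rec(String key, long value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + "=" + value; }
    }

    final Map<String, Long> store = new HashMap<>(); // local store ("Counts")
    final List<Rec> changelog = new ArrayList<>();    // committed changelog records
    final List<Rec> output = new ArrayList<>();       // committed output records
    int committedOffset = 0;                          // next repartition offset to read

    private final List<Rec> pendingChangelog = new ArrayList<>();
    private final List<Rec> pendingOutput = new ArrayList<>();

    // Consumes the repartition topic from the committed offset, committing every
    // commitEvery records; simulates a crash on reaching crashAt (-1 for none).
    void run(List<String> repartition, int commitEvery, int crashAt) {
        int off = committedOffset;
        while (off < repartition.size()) {
            if (off == crashAt) {
                // Crash: the open transaction is aborted and local state is discarded.
                pendingChangelog.clear();
                pendingOutput.clear();
                store.clear();
                return;
            }
            String word = repartition.get(off);
            long count = store.getOrDefault(word, 0L) + 1;
            store.put(word, count);                     // the aggregation's store.put() ...
            pendingChangelog.add(new Rec(word, count)); // ... whose side effect is a changelog write
            pendingOutput.add(new Rec(word, count));    // toStream() -> sink to the output topic
            off++;
            if (off - committedOffset == commitEvery || off == repartition.size()) {
                commit(off);
            }
        }
    }

    // Changelog writes, output writes and the input offset become visible together.
    private void commit(int nextOffset) {
        changelog.addAll(pendingChangelog);
        output.addAll(pendingOutput);
        pendingChangelog.clear();
        pendingOutput.clear();
        committedOffset = nextOffset;
    }

    // The only place the changelog is read: rebuild the store after a failure.
    // Replaying in order keeps the last value per key, which is what compaction retains.
    void restore() {
        store.clear();
        for (Rec r : changelog) {
            store.put(r.key, r.value);
        }
    }
}
```

For example, with the input `a, b, a, c, a`, `commitEvery = 2` and `crashAt = 3`, the first `run` commits offsets 0 and 1 (`a=1`, `b=1`), computes `a=2` but crashes before committing it. `restore()` rebuilds `{a=1, b=1}` from the changelog, and a second `run(in, 2, -1)` resumes at offset 2, so the committed output ends up as `[a=1, b=1, a=2, c=1, a=3]`: the `a=2` computed before the crash appears exactly once.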