Hello,

I wonder whether the ordering of messages is preserved by Kafka Streams when the 
messages are processed by the same sub-topology without redistribution and, in 
the end, there are multiple sinks for the same topic.

I couldn't find the answer to this question in the docs, on the mailing list or 
on Stack Overflow.

You can end up in this situation with code like this:
 
val source = builder.stream[Key, Value]("input")
source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

Basically, these are two different processing branches that process each input 
value slightly differently, i.e. if one branch produces a message in response to 
an input message, the other branch will produce a message as well. So keeping 
the ordering in this case means that all messages produced on one branch for 
earlier source messages should precede the messages produced by the other branch 
for later source messages.
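To pin down what "keeping the ordering" means, here is a minimal plain-Scala model with no Kafka involved. It assumes that each input record is pushed through both branches before the next one is read (depth-first, on a single thread) and that both branches append to one output log. That assumption is exactly the behaviour I'm asking about. `Out`, `branchA`, `branchB`, `run` and `preservesOrder` are hypothetical names. The even/odd filter is only a stand-in for the real filter/mapValues/transform chains.

```scala
object OrderingModel {
  // An output record, tagged with the offset of the input record that produced it.
  final case class Out(sourceOffset: Long, branch: String, value: String)

  // Hypothetical stand-ins for the two branches: both emit for the same inputs.
  def branchA(offset: Long, v: Int): Option[Out] =
    if (v % 2 == 0) Some(Out(offset, "A", s"a$v")) else None
  def branchB(offset: Long, v: Int): Option[Out] =
    if (v % 2 == 0) Some(Out(offset, "B", s"b$v")) else None

  // ASSUMPTION: depth-first processing. Each input goes through both branches
  // before the next input is read, and both append to the same output log.
  def run(inputs: Seq[Int]): Vector[Out] =
    inputs.zipWithIndex.foldLeft(Vector.empty[Out]) { case (log, (v, i)) =>
      log ++ branchA(i.toLong, v) ++ branchB(i.toLong, v)
    }

  // Ordering holds if outputs for an earlier input never appear after
  // outputs for a later input, i.e. source offsets are non-decreasing.
  def preservesOrder(log: Seq[Out]): Boolean =
    log.map(_.sourceOffset).sliding(2).forall {
      case Seq(a, b) => a <= b
      case _         => true
    }

  def main(args: Array[String]): Unit = {
    val log = run(Seq(2, 3, 4))
    log.foreach(println)
    println(preservesOrder(log))
  }
}
```

Under that assumption, `run(Seq(2, 3, 4))` yields outputs for offsets 0, 0, 2, 2, so `preservesOrder` is true. The open question is whether the real topology gives the same guarantee.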

Here's my topology:

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000019 (topics: [input])
      --> KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-MAPVALUES-0000000022, KSTREAM-TRANSFORM-0000000021
      <-- KSTREAM-SOURCE-0000000019
    Processor: KSTREAM-MAPVALUES-0000000022 (stores: [])
      --> KSTREAM-TRANSFORM-0000000023
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000021 (stores: [store1])
      --> KSTREAM-MAP-0000000027
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000023 (stores: [store2])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-MAPVALUES-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-FILTER-0000000025
      <-- KSTREAM-TRANSFORM-0000000023
    Processor: KSTREAM-MAP-0000000027 (stores: [])
      --> KSTREAM-FILTER-0000000028
      <-- KSTREAM-TRANSFORM-0000000021
    Processor: KSTREAM-FILTER-0000000025 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-MAP-0000000027
    Sink: KSTREAM-SINK-0000000026 (topic: output)
      <-- KSTREAM-FILTER-0000000025
    Sink: KSTREAM-SINK-0000000029 (topic: output)
      <-- KSTREAM-FILTER-0000000028

On the one hand, I guess that all records coming from one partition will be 
processed by one thread, so the order of the messages can be kept. On the other 
hand, I see two independent sinks in the topology, which I guess have independent 
buffers etc. So in the end I am not sure what's going to happen.

I would guess that it can work because the sinks probably have the same buffer 
size, but that's not guaranteed. I can imagine the following failure scenario: a 
write by one sink can succeed while the write by the other sink fails, so a batch 
of messages gets delivered to the output partition out of order.
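The failure scenario above can be modelled the same way. This is a toy sketch built on the very assumption I'm unsure about: that each sink keeps its own buffer and flushes it independently. It is not a description of how Kafka's producer actually batches records. `SinkBuffer` and `simulate` are hypothetical, and the sink names only echo the two sinks in my topology.

```scala
import scala.collection.mutable.ArrayBuffer

object FailureScenario {
  // An output record, tagged with the offset of the input record that produced it.
  final case class Out(sourceOffset: Long, sink: String)

  // ASSUMPTION (hypothetical): each sink owns a separate buffer that it flushes on its own.
  final class SinkBuffer(name: String) {
    private val pending = ArrayBuffer.empty[Out]

    def add(offset: Long): Unit = pending += Out(offset, name)

    // On success the pending batch is appended to the partition. On failure it stays
    // pending and goes out together with newer records on the next successful flush.
    def flush(partition: ArrayBuffer[Out], succeeds: Boolean): Unit =
      if (succeeds) {
        partition ++= pending
        pending.clear()
      }
  }

  def simulate(): Vector[Out] = {
    val partition = ArrayBuffer.empty[Out] // the single "output" partition
    val sink1 = new SinkBuffer("sink-26")
    val sink2 = new SinkBuffer("sink-29")

    // Inputs 0 and 1 go through both branches; sink 1 writes, sink 2's write fails.
    for (offset <- 0L to 1L) { sink1.add(offset); sink2.add(offset) }
    sink1.flush(partition, succeeds = true)
    sink2.flush(partition, succeeds = false)

    // Input 2 arrives; sink 1 flushes again before sink 2's retry succeeds.
    sink1.add(2L); sink2.add(2L)
    sink1.flush(partition, succeeds = true)
    sink2.flush(partition, succeeds = true)

    partition.toVector
  }

  def main(args: Array[String]): Unit =
    println(simulate().map(_.sourceOffset)) // Vector(0, 1, 2, 0, 1, 2)
}
```

In this model, the retried batch from sink 2 lands after sink 1's batch for input 2, so outputs for offsets 0 and 1 follow an output for offset 2. The model only shows why independent per-sink buffers would be enough to break the ordering. Whether Kafka Streams can actually get into this state is what I'm asking.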

Can someone please clarify what happens in this case? Is there an ordering 
guarantee? Can these streams be merged while preserving ordering?

I know that regular Source.merge() doesn't preserve ordering, but in this case 
I know that there's no repartitioning etc., and messages basically appear on the 
same "tick", so it feels like there should be a way to do this. Can I keep the 
ordering if I replace my transformers with processors and manually connect them 
to the same sink?


--
Best regards,
Vasily Sulatskov
