Hi Devs,

May be you can shed some light..

Thanks in advance
---------- Forwarded message ----------
From: "m...@muradm.net" <m...@muradm.net>
Date: 26 Apr 2017 19:53
Subject: Re: Time synchronization between streams
To: <us...@kafka.apache.org>
Cc:

Yes, basically I'm ok with how join works including window and retention
> periods, under normal circumstances. In real time of occurrence of events,
> application joining streams will get something like this:
>
> T1 + 0 => topic_small (K1, V1)  => join result (None)
> T1 + 1 min =>  topic_large (K1, VT1) => join result (K1, V1, VT1)
> T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
> T1 + 7 mins => topic_small (K1, V2) => join result (K1, V2, VT2)
>
> According to Windowed<K> and WindowedSerializer it keeps only start of
> window with key when storing it to state store. Assuming that window start
> time same for both topics/KStreams (not sure yet, still reading source),
> but even if not same, state stores actions of Kafka Streams will be like
> this:
>
> join_left_side_store.put ( K1-W1, V1 )
> join_right_side_store.put ( K1-W1, VT1 )
> join_left_side_store.put ( K1-W1, V2 )
> join_right_side_store.put ( K1-W1, VT2 )
>
> However when consuming same topics by the same application from beginning
> from scratch (no application local state stores) for large period of time
> (greater than window period, but less than retention period), join result
> for 10 minutes window will be different, like this:
>
> join result (None)
> join result (K1, V2, VT1)
> join result (K1, V2, VT2)
>
> Because topic_large's stream is being read slower, value of topic_small in
> window will change from V1 to V2, before Kafka Streams will receive VT1.
>
> I.e. state stores actions of Kafka Streams will be like this:
>
> join_left_side_store.put ( K1-W1, V1 )
> join_left_side_store.put ( K1-W1, V2 )
> join_right_side_store.put ( K1-W1, VT1 )
> join_right_side_store.put ( K1-W1, VT2 )
>
> Isn't it?
>
> On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Murad, On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net>
> wrote:
>
> Is there any global time synchronization between streams in Kafka Streams
> API? So that, it would not consume more events from one stream while the
> other is still behind in time. Or probably better to rephrase it like, is
> there global event ordering based on timestamp of event?
>
> Yes. When streams are joined each partition from the joined streams are
> grouped together into a single Task. Each Task maintains a record buffer
> for all of the topics it is consuming from. When it is time process a
> record it will chose a record from the partition that has the smallest
> timestamp. So in this way it makes a best effort to keep the streams in
> sync.
>
> The other thing could be to join streams in window, however same question
> arises, if one stream days behind the other, will the join window of 15
> minutes ever work?
>
> If the data is arriving much later you can use 
> JoinWindows.until(SOME_TIME_PERIOD)
> to keep the data around. In this case the streams will still join. Once
> SOME_TIME_PERIOD has expired the streams will no longer be able to join.
>
> I'm trying to grasp a way on how to design replay of long periods of time
> for application with multiple topics/streams. Especially when combining
> with low-level API processors and transformers which relay on each other
> via GlobalKTable or KTable stores on these streams. For instance, smaller
> topic could have the following sequence of events: T1 - (k1, v1) T1 + 10
> minutes - (k1, null) T1 + 20 minutes - (k1, v2) While topic with larger
> events: T1 - (k1, vt1) T1 + 5 minutes - (k1, null) T1 + 15 minutes - (k1,
> vt2) If one would join or lookup these streams in realtime (timestamp of
> event is approximately = wall clock time) result would be: T1 - topic_small
> (k1, v1) - topic_large (k1, vt1) T1 + 5 minutes - topic_small (k1, v1) -
> topic_large (k1, null) T1 + 10 minutes - topic_small (k1, null) -
> topic_large (k1, null) T1 + 15 minutes - topic_small (k1, null) -
> topic_large (k1, vt2) T1 + 20 minutes - topic_small (k1, v2) - topic_large
> (k1, vt2) However, when replaying streams from beginning, from perspective
> of topic with large events, it would see topic with small events as (k1,
> v2), completely missing v1 and null states in case of GlobalKTable/KTable
> presentation or events in case of KStream-KStream windowed join.
>
> I don't really follow here. In the case of a GlobalKTable it will be
> initialized with all of the existing data before the rest of the streams
> start processing.
>
> Do I miss something here? Should application be responsible in global
> synchronization between topics, or Kafka Streams does / can do that? If
> application should, then what could be approach to solve it? I hope I could
> explain myself. Thanks in advance
>
>

Reply via email to