Yes, basically I'm ok with how join works including window and
retention periods, under normal circumstances. In real time of
occurrence of events, application joining streams will get something
like this:
T1 + 0 => topic_small (K1, V1) => join result (None)
T1 + 1 min => topic_large (K1, VT1) => join result (K1, V1, VT1)
T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
T1 + 7 mins => topic_small (K1, V2) => join result (K1, V2, VT2)
According to Windowed<K> and WindowedSerializer it keeps only start of
window with key when storing it to state store. Assuming that window
start time same for both topics/KStreams (not sure yet, still reading
source), but even if not same, state stores actions of Kafka Streams
will be like this:
join_left_side_store.put ( K1-W1, V1 )
join_right_side_store.put ( K1-W1, VT1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT2 )
However when consuming same topics by the same application from
beginning from scratch (no application local state stores) for large
period of time (greater than window period, but less than retention
period), join result for 10 minutes window will be different, like this:
join result (None)
join result (K1, V2, VT1)
join result (K1, V2, VT2)
Because topic_large's stream is being read slower, value of topic_small
in window will change from V1 to V2, before Kafka Streams will receive
VT1.
I.e. state stores actions of Kafka Streams will be like this:
join_left_side_store.put ( K1-W1, V1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT1 )
join_right_side_store.put ( K1-W1, VT2 )
Isn't it?
On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com>
wrote:
Hi Murad,
On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net> wrote:
Is there any global time synchronization between streams in Kafka
Streams
API? So that, it would not consume more events from one stream
while the
other is still behind in time. Or probably better to rephrase it
like, is
there global event ordering based on timestamp of event?
Yes. When streams are joined each partition from the joined streams
are
grouped together into a single Task. Each Task maintains a record
buffer
for all of the topics it is consuming from. When it is time process a
record it will chose a record from the partition that has the smallest
timestamp. So in this way it makes a best effort to keep the streams
in
sync.
The other thing could be to join streams in window, however same
question
arises, if one stream days behind the other, will the join window
of 15
minutes ever work?
If the data is arriving much later you can use
JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this
case
the streams will still join. Once SOME_TIME_PERIOD has expired the
streams
will no longer be able to join.
I'm trying to grasp a way on how to design replay of long periods
of time
for application with multiple topics/streams. Especially when
combining
with low-level API processors and transformers which relay on each
other
via GlobalKTable or KTable stores on these streams. For instance,
smaller
topic could have the following sequence of events:
T1 - (k1, v1)
T1 + 10 minutes - (k1, null)
T1 + 20 minutes - (k1, v2)
While topic with larger events:
T1 - (k1, vt1)
T1 + 5 minutes - (k1, null)
T1 + 15 minutes - (k1, vt2)
If one would join or lookup these streams in realtime (timestamp of
event
is approximately = wall clock time) result would be:
T1 - topic_small (k1, v1) - topic_large (k1, vt1)
T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)
However, when replaying streams from beginning, from perspective of
topic
with large events, it would see topic with small events as (k1, v2),
completely missing v1 and null states in case of GlobalKTable/KTable
presentation or events in case of KStream-KStream windowed join.
I don't really follow here. In the case of a GlobalKTable it will be
initialized with all of the existing data before the rest of the
streams
start processing.
Do I miss something here? Should application be responsible in
global
synchronization between topics, or Kafka Streams does / can do
that? If
application should, then what could be approach to solve it?
I hope I could explain myself.
Thanks in advance