Hi EC,

Thanks for the very clear report and question. As Guozhang said, this is
expected (but not ideal) behavior.

For an immediate work-around, you can try materializing the KTable and
setting the commit interval and cache size as discussed here (
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/)
to reduce (but not eliminate) duplicates.
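
For concreteness, here is a minimal sketch of that work-around, assuming
your existing application. The config keys are the standard StreamsConfig
ones, but the application id, bootstrap servers, and the concrete values
are placeholders you'd tune for your workload:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheTuningSketch {
  public static Properties workaroundProps() {
    Properties props = new Properties();
    // Placeholders -- use your actual application id and brokers:
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-debug");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Commit (and flush the record cache) less often, so several updates
    // to the same window/key collapse into one forwarded record:
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
    // Give the cache enough room to hold those updates between commits:
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    return props;
  }
}

A larger cache plus a longer commit interval means more intermediate
updates get coalesced before they are forwarded downstream, but a window
can still emit more than one record (for example, when the cache fills up
or a late record arrives), so this is not a strict "one record per window"
guarantee.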

I'm in the process of getting my thoughts in order to write a KIP to
address this exact use case. If you're interested in participating in the
discussion, you can keep an eye on the dev mailing list or watch the KIP
page. I can't say when exactly I'll start it. I want to get it out there
soon, but I also want to do my homework and have a good proposal.

Thanks,
-John

On Mon, Jun 4, 2018 at 12:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello,
>
> Your observation is correct: by default, Kafka Streams emits continuous
> updates for each window instead of waiting for a "final" result per
> window.
>
> There is some ongoing work to provide functionality in the DSL that lets
> users specify something like "give me the final result for windowed
> aggregations"; it will probably land after the 2.0 release.
>
> Guozhang
>
>
> On Mon, Jun 4, 2018 at 8:14 AM, EC Boost <ecboost2...@gmail.com> wrote:
>
> > Logged the internal windows information:
> >
> > Window{start=1528043030000, end=1528043040000} key=t6  1
> > Window{start=1528043040000, end=1528043050000} key=t1  2
> > Window{start=1528043040000, end=1528043050000} key=t7  3
> > Window{start=1528043040000, end=1528043050000} key=t5  4
> > Window{start=1528043040000, end=1528043050000} key=t5  4,5
> > Window{start=1528043050000, end=1528043060000} key=t6  6
> > Window{start=1528043050000, end=1528043060000} key=t6  6,7
> > Window{start=1528043050000, end=1528043060000} key=t4  8
> > Window{start=1528043060000, end=1528043070000} key=t6  9
> > Window{start=1528043060000, end=1528043070000} key=t7  10
> > Window{start=1528043060000, end=1528043070000} key=t6  9,11
> > Window{start=1528043070000, end=1528043080000} key=t5  12
> > Window{start=1528043070000, end=1528043080000} key=t6  13
> > Window{start=1528043070000, end=1528043080000} key=t4  14
> > Window{start=1528043070000, end=1528043080000} key=t4  14,15
> >
> > ....
> >
> > It seems that Kafka Streams sends every KTable changelog update as
> > output, and that's probably why there are duplicate outputs for
> > gap-less, non-overlapping windows.
> >
> > Is there any way to achieve real mini-batch-style processing semantics
> > with non-overlapping windows, i.e. only the last value of each window
> > is sent as output rather than every changelog update within the window?
> >
> >
> > On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2...@gmail.com> wrote:
> >
> > > Hello Everyone,
> > >
> > > I am getting duplicated results from a simple windowed aggregation in
> > > Kafka Streams.
> > >
> > > The input event format is comma-separated, "event_id,event_type", and
> > > I need to aggregate the events by event type.
> > >
> > > Following is the Kafka Stream processing logic:
> > >
> > > events
> > >       // "event_id,event_type" -> key = event_type, value = event_id
> > >       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> > >       .groupByKey()
> > >       // gap-less, non-overlapping 10-second windows
> > >       .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
> > >       // collect the event_ids seen for each event_type in each window
> > >       .aggregate(
> > >         ArrayList::new,
> > >         (type, id, eventList) -> {
> > >           eventList.add(id);
> > >           return eventList;
> > >         },
> > >         Materialized.with(stringSerde, arraySerde)
> > >       )
> > >       // drop the window part of the key and emit the list as a CSV string
> > >       .toStream((k, v) -> k.key())
> > >       .mapValues((v) -> String.join(",", v))
> > >       .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
> > >
> > >
> > > I produced the input messages using the following snippet:
> > >
> > > require "kafka"
> > >
> > > kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
> > >
> > > f = File.open("events.txt")
> > > f.each_line { |l|
> > >   puts l
> > >   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
> > >   sleep(3)
> > > }
> > >
> > >
> > >
> > > The messages in events.txt are as follows (format: "event_id,event_type",
> > > where event_id is unique):
> > >
> > > Input
> > >
> > > 1,t6
> > > 2,t1
> > > 3,t7
> > > 4,t5
> > > 5,t5
> > > 6,t6
> > > 7,t6
> > > 8,t4
> > > 9,t6
> > > 10,t7
> > > 11,t6
> > > 12,t5
> > > 13,t6
> > > 14,t4
> > > 15,t4
> > > 16,t2
> > > 17,t7
> > > 18,t6
> > > 19,t3
> > > 20,t7
> > > 21,t1
> > > 22,t5
> > > 23,t5
> > > 24,t6
> > > 25,t6
> > > 26,t4
> > > 27,t4
> > > 28,t3
> > > 29,t2
> > > 30,t5
> > > 31,t1
> > > 32,t1
> > > 33,t1
> > > 34,t1
> > > 35,t2
> > > 36,t4
> > > 37,t3
> > > 38,t3
> > > 39,t6
> > > 40,t6
> > > 41,t1
> > > 42,t4
> > > 43,t4
> > > 44,t6
> > > 45,t6
> > > 46,t7
> > > 47,t7
> > > 48,t3
> > > 49,t1
> > > 50,t6
> > > 51,t1
> > > 52,t4
> > > 53,t6
> > > 54,t7
> > > 55,t1
> > > 56,t1
> > > 57,t1
> > > 58,t5
> > > 59,t6
> > > 60,t7
> > > 61,t6
> > > 62,t4
> > > 63,t5
> > > 64,t1
> > > 65,t3
> > > 66,t1
> > > 67,t3
> > > 68,t3
> > > 69,t5
> > > 70,t1
> > > 71,t6
> > > 72,t5
> > > 73,t6
> > > 74,t1
> > > 75,t7
> > > 76,t5
> > > 77,t3
> > > 78,t1
> > > 79,t4
> > > 80,t3
> > > 81,t6
> > > 82,t2
> > > 83,t6
> > > 84,t2
> > > 85,t4
> > > 86,t7
> > > 87,t4
> > > 88,t6
> > > 89,t5
> > > 90,t6
> > > 91,t4
> > > 92,t3
> > > 93,t4
> > > 94,t6
> > > 95,t2
> > > 96,t2
> > > 97,t7
> > > 98,t4
> > > 99,t3
> > > 100,t3
> > >
> > > <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
> > >
> > > But I got the following output, with duplicate event_ids between windows:
> > >
> > > Output
> > >
> > > t6    1
> > > t1    2
> > > t7    3
> > > t5    4
> > > t5    4,5
> > > t6    6
> > > t6    6,7
> > > t4    8
> > > t6    9
> > > t7    10
> > > t6    9,11
> > > t5    12
> > > t6    13
> > > t4    14
> > > t4    14,15
> > > t2    16
> > > t7    17
> > > t6    18
> > > t3    19
> > > t7    20
> > > t1    21
> > > t5    22
> > > t5    22,23
> > > t6    24
> > > t6    24,25
> > > t4    26
> > > t4    26,27
> > > t3    28
> > > t2    29
> > > t5    30
> > > t1    31
> > > t1    32
> > > t1    32,33
> > > t1    32,33,34
> > > t2    35
> > > t4    36
> > > t3    37
> > > t3    37,38
> > > t6    39
> > > t6    39,40
> > > t1    41
> > > t4    42
> > > t4    42,43
> > > t6    44
> > > t6    44,45
> > > t7    46
> > > t7    46,47
> > > t3    48
> > > t1    49
> > > t6    50
> > > t1    49,51
> > > t4    52
> > > t6    53
> > > t7    54
> > > t1    55
> > > t1    56
> > > t1    56,57
> > > t5    58
> > > t6    59
> > > t7    60
> > > t6    59,61
> > > t4    62
> > > t5    63
> > > t1    64
> > > t3    65
> > > t1    66
> > > t3    67
> > > t3    67,68
> > > t5    69
> > > t1    70
> > > t6    71
> > > t5    72
> > > t6    73
> > > t1    74
> > > t7    75
> > > t5    76
> > > t3    77
> > > t1    78
> > > t4    79
> > > t3    80
> > > t6    81
> > > t2    82
> > > t6    83
> > > t2    82,84
> > > t4    85
> > > t7    86
> > > t4    87
> > > t6    88
> > > t5    89
> > > t6    90
> > > t4    91
> > > t3    92
> > > t4    93
> > > t6    94
> > > t2    95
> > > t2    96
> > > t7    97
> > > t4    98
> > > t3    99
> > > t3    99,100
> > >
> > >
> > >
> > > Since I am using non-overlapping, gap-less windows in the Kafka Streams
> > > processing DSL, the correct output should NOT contain duplicate event
> > > ids across windows. Any idea why the duplicates appear? (Link to the
> > > debug project: https://github.com/westec/ks-aggregate-debug )
> > >
> > > I appreciate your help!
> > >
> > > Regards,
> > > EC
> > >
> > >
> > >
> > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
