Thanks for your help.

For sliding windows, the changelog-as-output behaviour is expected.
But for non-overlapping windows, most use cases expect micro-batch
semantics (no intermediate changelog output, only the final aggregation
for the window).
Are there any reference examples for implementing micro-batch-style windows?

Thanks,
EC

On Tue, Jun 5, 2018 at 2:23 AM, John Roesler <j...@confluent.io> wrote:

> Hi EC,
>
> Thanks for the very clear report and question. As Guozhang said, this is
> expected (but not ideal) behavior.
>
> For an immediate workaround, you can try materializing the KTable and
> setting the commit interval and cache size as discussed here:
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
> This reduces (but does not eliminate) the duplicates.
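>
> A rough sketch of those two settings (example values only; they are not
> from your project and would need tuning for your data rates):
>
>     // With caching enabled, updates to the windowed KTable are coalesced
>     // per key/window and only forwarded downstream on cache eviction or
>     // commit, so a larger cache and a longer commit interval mean fewer
>     // intermediate results (example values, not a recommendation):
>     Properties props = new Properties();
>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000);
>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);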
>
> I'm in the process of getting my thoughts in order to write a KIP to
> address this exact use case. If you're interested in participating in the
> discussion, you can keep an eye on the dev mailing list or watch the KIP
> page. I can't say when exactly I'll start it. I want to get it out there
> soon, but I also want to do my homework and have a good proposal.
>
> Thanks,
> -John
>
> On Mon, Jun 4, 2018 at 12:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello,
> >
> > Your observation is correct: Kafka Streams by default emits continuous
> > updates for each window, instead of waiting for the "final" update for
> > each window.
> >
> > There is some ongoing work to provide functionality that lets users
> > specify something like "give me the final result for windowed
> > aggregations" in the DSL; it will probably come after the 2.0 release.
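> >
> > (For reference: that work later shipped in Kafka 2.1 as the suppress()
> > operator, via KIP-328. It is not available in the releases discussed in
> > this thread; below is a minimal sketch of how it reads there, with a
> > made-up output topic name:)
> >
> >     events
> >       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> >       .groupByKey()
> >       // grace() bounds how long a window accepts late records; the
> >       // default grace period is roughly 24 hours, which would delay the
> >       // "final" result by that long.
> >       .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(30)))
> >       .count()
> >       // Emit exactly one result per key and window, after the window closes.
> >       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >       .toStream((k, v) -> k.key())
> >       .to("final-counts-per-window", Produced.with(Serdes.String(), Serdes.Long()));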
> >
> > Guozhang
> >
> >
> > On Mon, Jun 4, 2018 at 8:14 AM, EC Boost <ecboost2...@gmail.com> wrote:
> >
> > > I logged the internal window information:
> > >
> > > Window{start=1528043030000, end=1528043040000} key=t6  1
> > > Window{start=1528043040000, end=1528043050000} key=t1  2
> > > Window{start=1528043040000, end=1528043050000} key=t7  3
> > > Window{start=1528043040000, end=1528043050000} key=t5  4
> > > Window{start=1528043040000, end=1528043050000} key=t5  4,5
> > > Window{start=1528043050000, end=1528043060000} key=t6  6
> > > Window{start=1528043050000, end=1528043060000} key=t6  6,7
> > > Window{start=1528043050000, end=1528043060000} key=t4  8
> > > Window{start=1528043060000, end=1528043070000} key=t6  9
> > > Window{start=1528043060000, end=1528043070000} key=t7  10
> > > Window{start=1528043060000, end=1528043070000} key=t6  9,11
> > > Window{start=1528043070000, end=1528043080000} key=t5  12
> > > Window{start=1528043070000, end=1528043080000} key=t6  13
> > > Window{start=1528043070000, end=1528043080000} key=t4  14
> > > Window{start=1528043070000, end=1528043080000} key=t4  14,15
> > >
> > > ....
> > >
> > > It seems that Kafka Streams sends every KTable changelog update as
> > > output, and that's probably why there are duplicate outputs for
> > > gap-less, non-overlapping windows.
> > >
> > > Is there any way to achieve real mini-batch-style processing semantics
> > > with non-overlapping windows, i.e. only the last value for each window
> > > is sent as output rather than every changelog update within the window?
> > >
> > >
> > > On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2...@gmail.com> wrote:
> > >
> > > > Hello Everyone,
> > > >
> > > > I got duplicate results using Kafka Streams for a simple windowed
> > > > aggregation.
> > > >
> > > > The input event format is comma separated: "event_id,event_type",
> > > > and I need to aggregate the events by event type.
> > > >
> > > > The following is the Kafka Streams processing logic:
> > > >
> > > > // Re-key each "event_id,event_type" record by event type (value = event id),
> > > > // then collect the ids seen per type in 10-second tumbling windows.
> > > > events
> > > >       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> > > >       .groupByKey()
> > > >       .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
> > > >       .aggregate(
> > > >         ArrayList::new,
> > > >         (type, id, eventList) -> {
> > > >           eventList.add(id);
> > > >           return eventList;
> > > >         },
> > > >         Materialized.with(stringSerde, arraySerde)
> > > >       )
> > > >       // Drop the window from the key and emit "type -> comma-joined ids".
> > > >       .toStream((k,v) -> k.key())
> > > >       .mapValues((v)-> String.join(",", v))
> > > >       .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
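> > > >
> > > > (stringSerde and arraySerde are defined in the debug project linked at
> > > > the end; the following is only a rough guess at what they might look
> > > > like, with arraySerde (de)serializing the ArrayList<String> aggregate by
> > > > joining on a delimiter:)
> > > >
> > > > // Hypothetical definitions; the real ones are in the linked project.
> > > > final Serde<String> stringSerde = Serdes.String();
> > > >
> > > > final Serde<ArrayList<String>> arraySerde = Serdes.serdeFrom(
> > > >     new Serializer<ArrayList<String>>() {
> > > >       public void configure(Map<String, ?> configs, boolean isKey) {}
> > > >       public byte[] serialize(String topic, ArrayList<String> ids) {
> > > >         // Simplified: the event ids are plain numbers, so a separator char is enough.
> > > >         return String.join("\u0001", ids).getBytes(StandardCharsets.UTF_8);
> > > >       }
> > > >       public void close() {}
> > > >     },
> > > >     new Deserializer<ArrayList<String>>() {
> > > >       public void configure(Map<String, ?> configs, boolean isKey) {}
> > > >       public ArrayList<String> deserialize(String topic, byte[] bytes) {
> > > >         if (bytes == null) return null;
> > > >         String joined = new String(bytes, StandardCharsets.UTF_8);
> > > >         return new ArrayList<>(Arrays.asList(joined.split("\u0001")));
> > > >       }
> > > >       public void close() {}
> > > >     });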
> > > >
> > > >
> > > > I produced the input messages using the following snippet:
> > > >
> > > > require "kafka"
> > > >
> > > > kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
> > > >
> > > > f = File.open("events.txt")
> > > > f.each_line { |l|
> > > >   puts l
> > > >   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
> > > >   sleep(3)
> > > > }
> > > >
> > > >
> > > >
> > > > The messages in events.txt are as follows (format:
> > > > "event_id,event_type", where event_id is unique):
> > > >
> > > > Input
> > > >
> > > > 1,t6
> > > > 2,t1
> > > > 3,t7
> > > > 4,t5
> > > > 5,t5
> > > > 6,t6
> > > > 7,t6
> > > > 8,t4
> > > > 9,t6
> > > > 10,t7
> > > > 11,t6
> > > > 12,t5
> > > > 13,t6
> > > > 14,t4
> > > > 15,t4
> > > > 16,t2
> > > > 17,t7
> > > > 18,t6
> > > > 19,t3
> > > > 20,t7
> > > > 21,t1
> > > > 22,t5
> > > > 23,t5
> > > > 24,t6
> > > > 25,t6
> > > > 26,t4
> > > > 27,t4
> > > > 28,t3
> > > > 29,t2
> > > > 30,t5
> > > > 31,t1
> > > > 32,t1
> > > > 33,t1
> > > > 34,t1
> > > > 35,t2
> > > > 36,t4
> > > > 37,t3
> > > > 38,t3
> > > > 39,t6
> > > > 40,t6
> > > > 41,t1
> > > > 42,t4
> > > > 43,t4
> > > > 44,t6
> > > > 45,t6
> > > > 46,t7
> > > > 47,t7
> > > > 48,t3
> > > > 49,t1
> > > > 50,t6
> > > > 51,t1
> > > > 52,t4
> > > > 53,t6
> > > > 54,t7
> > > > 55,t1
> > > > 56,t1
> > > > 57,t1
> > > > 58,t5
> > > > 59,t6
> > > > 60,t7
> > > > 61,t6
> > > > 62,t4
> > > > 63,t5
> > > > 64,t1
> > > > 65,t3
> > > > 66,t1
> > > > 67,t3
> > > > 68,t3
> > > > 69,t5
> > > > 70,t1
> > > > 71,t6
> > > > 72,t5
> > > > 73,t6
> > > > 74,t1
> > > > 75,t7
> > > > 76,t5
> > > > 77,t3
> > > > 78,t1
> > > > 79,t4
> > > > 80,t3
> > > > 81,t6
> > > > 82,t2
> > > > 83,t6
> > > > 84,t2
> > > > 85,t4
> > > > 86,t7
> > > > 87,t4
> > > > 88,t6
> > > > 89,t5
> > > > 90,t6
> > > > 91,t4
> > > > 92,t3
> > > > 93,t4
> > > > 94,t6
> > > > 95,t2
> > > > 96,t2
> > > > 97,t7
> > > > 98,t4
> > > > 99,t3
> > > > 100,t3
> > > >
> > > > <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
> > > >
> > > > But I got the following output, with duplicate event_ids between
> > > > windows:
> > > >
> > > > Output
> > > >
> > > > t6    1
> > > > t1    2
> > > > t7    3
> > > > t5    4
> > > > t5    4,5
> > > > t6    6
> > > > t6    6,7
> > > > t4    8
> > > > t6    9
> > > > t7    10
> > > > t6    9,11
> > > > t5    12
> > > > t6    13
> > > > t4    14
> > > > t4    14,15
> > > > t2    16
> > > > t7    17
> > > > t6    18
> > > > t3    19
> > > > t7    20
> > > > t1    21
> > > > t5    22
> > > > t5    22,23
> > > > t6    24
> > > > t6    24,25
> > > > t4    26
> > > > t4    26,27
> > > > t3    28
> > > > t2    29
> > > > t5    30
> > > > t1    31
> > > > t1    32
> > > > t1    32,33
> > > > t1    32,33,34
> > > > t2    35
> > > > t4    36
> > > > t3    37
> > > > t3    37,38
> > > > t6    39
> > > > t6    39,40
> > > > t1    41
> > > > t4    42
> > > > t4    42,43
> > > > t6    44
> > > > t6    44,45
> > > > t7    46
> > > > t7    46,47
> > > > t3    48
> > > > t1    49
> > > > t6    50
> > > > t1    49,51
> > > > t4    52
> > > > t6    53
> > > > t7    54
> > > > t1    55
> > > > t1    56
> > > > t1    56,57
> > > > t5    58
> > > > t6    59
> > > > t7    60
> > > > t6    59,61
> > > > t4    62
> > > > t5    63
> > > > t1    64
> > > > t3    65
> > > > t1    66
> > > > t3    67
> > > > t3    67,68
> > > > t5    69
> > > > t1    70
> > > > t6    71
> > > > t5    72
> > > > t6    73
> > > > t1    74
> > > > t7    75
> > > > t5    76
> > > > t3    77
> > > > t1    78
> > > > t4    79
> > > > t3    80
> > > > t6    81
> > > > t2    82
> > > > t6    83
> > > > t2    82,84
> > > > t4    85
> > > > t7    86
> > > > t4    87
> > > > t6    88
> > > > t5    89
> > > > t6    90
> > > > t4    91
> > > > t3    92
> > > > t4    93
> > > > t6    94
> > > > t2    95
> > > > t2    96
> > > > t7    97
> > > > t4    98
> > > > t3    99
> > > > t3    99,100
> > > >
> > > >
> > > >
> > > > Since I am using non-overlapping, gap-less windows in the Kafka
> > > > Streams processing DSL, the correct output should NOT contain
> > > > duplicate event ids between windows. Any ideas why the duplicates
> > > > appear? (Link to the debug project:
> > > > https://github.com/westec/ks-aggregate-debug )
> > > >
> > > > I appreciate your help!
> > > >
> > > > Regards,
> > > > EC
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
