Hello,

Your observation is correct: by default, Kafka Streams emits continuous updates to each window instead of waiting for the "final" result of each window. Every update to the underlying windowed KTable is forwarded downstream as a changelog record, which is why you see partial aggregates such as "t5 4" followed by "t5 4,5". There is ongoing work to let users specify something like "give me only the final result for windowed aggregations" in the DSL; it will probably come after the 2.0 release.
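The change being discussed would let you write something like the following; the operator and class names are illustrative only, since nothing is finalized yet:

    // Illustrative sketch -- no such operator exists as of 2.0.
    windowedAggregate   // the KTable<Windowed<String>, ...> returned by aggregate()
        // Hold back updates until the window closes, then emit only the
        // final aggregate per window.
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .toStream();

Until then, a partial workaround is to enlarge the record caches and the commit interval, so that consecutive updates to the same windowed key are coalesced in the cache before being forwarded downstream. This reduces, but does not fully eliminate, the intermediate updates. A minimal sketch with illustrative sizes:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // ... application.id, bootstrap.servers, default serdes, etc. ...
    // A larger record cache lets consecutive updates to the same windowed
    // key be coalesced before they are forwarded downstream.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // A longer commit interval flushes the cache (and emits records) less often.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000L);

With 10-second windows and a 10-second commit interval you will usually see a single record per window and key, but a cache flush can still happen mid-window, so this cannot guarantee exactly one output per window.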
Guozhang

On Mon, Jun 4, 2018 at 8:14 AM, EC Boost <ecboost2...@gmail.com> wrote:

> Logged the internal window information:
>
> Window{start=1528043030000, end=1528043040000} key=t6 1
> Window{start=1528043040000, end=1528043050000} key=t1 2
> Window{start=1528043040000, end=1528043050000} key=t7 3
> Window{start=1528043040000, end=1528043050000} key=t5 4
> Window{start=1528043040000, end=1528043050000} key=t5 4,5
> Window{start=1528043050000, end=1528043060000} key=t6 6
> Window{start=1528043050000, end=1528043060000} key=t6 6,7
> Window{start=1528043050000, end=1528043060000} key=t4 8
> Window{start=1528043060000, end=1528043070000} key=t6 9
> Window{start=1528043060000, end=1528043070000} key=t7 10
> Window{start=1528043060000, end=1528043070000} key=t6 9,11
> Window{start=1528043070000, end=1528043080000} key=t5 12
> Window{start=1528043070000, end=1528043080000} key=t6 13
> Window{start=1528043070000, end=1528043080000} key=t4 14
> Window{start=1528043070000, end=1528043080000} key=t4 14,15
>
> ....
>
> It seems that Kafka Streams sends every KTable changelog record as
> output, and that's probably why there are duplicate outputs for the
> gap-less, non-overlapping windows.
>
> Is there any way to achieve mini-batch-like processing semantics with
> non-overlapping windows, i.e. only the last value of each window is sent
> as output rather than every changelog record?
>
> On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2...@gmail.com> wrote:
>
> > Hello Everyone,
> >
> > I got duplicated results using Kafka Streams for a simple windowed
> > aggregation.
> >
> > The input event format is comma-separated: "event_id,event_type", and I
> > need to aggregate the events by event type.
> >
> > Following is the Kafka Streams processing logic:
> >
> > events
> >     .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> >     .groupByKey()
> >     .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
> >     .aggregate(
> >         ArrayList::new,
> >         (type, id, eventList) -> {
> >             eventList.add(id);
> >             return eventList;
> >         },
> >         Materialized.with(stringSerde, arraySerde)
> >     )
> >     .toStream((k, v) -> k.key())
> >     .mapValues((v) -> String.join(",", v))
> >     .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
> >
> > I produced the input messages using the following snippet:
> >
> > require "kafka"
> >
> > kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
> >
> > f = File.open("events.txt")
> > f.each_line { |l|
> >   puts l
> >   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
> >   sleep(3)
> > }
> >
> > Messages in events.txt are the following (format: "event_id,event_type",
> > and event_id is unique):
> >
> > Input
> >
> > 1,t6
> > 2,t1
> > 3,t7
> > 4,t5
> > 5,t5
> > 6,t6
> > 7,t6
> > 8,t4
> > 9,t6
> > 10,t7
> > 11,t6
> > 12,t5
> > 13,t6
> > 14,t4
> > 15,t4
> > 16,t2
> > 17,t7
> > 18,t6
> > 19,t3
> > 20,t7
> > 21,t1
> > 22,t5
> > 23,t5
> > 24,t6
> > 25,t6
> > 26,t4
> > 27,t4
> > 28,t3
> > 29,t2
> > 30,t5
> > 31,t1
> > 32,t1
> > 33,t1
> > 34,t1
> > 35,t2
> > 36,t4
> > 37,t3
> > 38,t3
> > 39,t6
> > 40,t6
> > 41,t1
> > 42,t4
> > 43,t4
> > 44,t6
> > 45,t6
> > 46,t7
> > 47,t7
> > 48,t3
> > 49,t1
> > 50,t6
> > 51,t1
> > 52,t4
> > 53,t6
> > 54,t7
> > 55,t1
> > 56,t1
> > 57,t1
> > 58,t5
> > 59,t6
> > 60,t7
> > 61,t6
> > 62,t4
> > 63,t5
> > 64,t1
> > 65,t3
> > 66,t1
> > 67,t3
> > 68,t3
> > 69,t5
> > 70,t1
> > 71,t6
> > 72,t5
> > 73,t6
> > 74,t1
> > 75,t7
> > 76,t5
> > 77,t3
> > 78,t1
> > 79,t4
> > 80,t3
> > 81,t6
> > 82,t2
> > 83,t6
> > 84,t2
> > 85,t4
> > 86,t7
> > 87,t4
> > 88,t6
> > 89,t5
> > 90,t6
> > 91,t4
> > 92,t3
> > 93,t4
> > 94,t6
> > 95,t2
> > 96,t2
> > 97,t7
> > 98,t4
> > 99,t3
> > 100,t3
> >
> > <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
> >
> > But got the following output with duplicate event_ids between windows:
> >
> > Output
> >
> > t6 1
> > t1 2
> > t7 3
> > t5 4
> > t5 4,5
> > t6 6
> > t6 6,7
> > t4 8
> > t6 9
> > t7 10
> > t6 9,11
> > t5 12
> > t6 13
> > t4 14
> > t4 14,15
> > t2 16
> > t7 17
> > t6 18
> > t3 19
> > t7 20
> > t1 21
> > t5 22
> > t5 22,23
> > t6 24
> > t6 24,25
> > t4 26
> > t4 26,27
> > t3 28
> > t2 29
> > t5 30
> > t1 31
> > t1 32
> > t1 32,33
> > t1 32,33,34
> > t2 35
> > t4 36
> > t3 37
> > t3 37,38
> > t6 39
> > t6 39,40
> > t1 41
> > t4 42
> > t4 42,43
> > t6 44
> > t6 44,45
> > t7 46
> > t7 46,47
> > t3 48
> > t1 49
> > t6 50
> > t1 49,51
> > t4 52
> > t6 53
> > t7 54
> > t1 55
> > t1 56
> > t1 56,57
> > t5 58
> > t6 59
> > t7 60
> > t6 59,61
> > t4 62
> > t5 63
> > t1 64
> > t3 65
> > t1 66
> > t3 67
> > t3 67,68
> > t5 69
> > t1 70
> > t6 71
> > t5 72
> > t6 73
> > t1 74
> > t7 75
> > t5 76
> > t3 77
> > t1 78
> > t4 79
> > t3 80
> > t6 81
> > t2 82
> > t6 83
> > t2 82,84
> > t4 85
> > t7 86
> > t4 87
> > t6 88
> > t5 89
> > t6 90
> > t4 91
> > t3 92
> > t4 93
> > t6 94
> > t2 95
> > t2 96
> > t7 97
> > t4 98
> > t3 99
> > t3 99,100
> >
> > Since I am using non-overlapping, gap-less windows in the Kafka Streams
> > processing DSL, the correct output should NOT contain duplicate event
> > ids between windows. Any ideas why the duplicates? (Link for the debug
> > project: https://github.com/westec/ks-aggregate-debug )
> >
> > I appreciate your help!
> >
> > Regards,
> > EC

--
-- Guozhang