Thanks for your help. For sliding windows, the changelog-as-output behaviour is what I would expect. But for non-overlapping windows, most use cases expect micro-batch semantics: no intermediate changelog output, only the final aggregation for each window. Are there any reference examples for implementing a micro-batch style window?
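For example, would keying each update by its window into a log-compacted topic be a reasonable approximation? A sketch of what I mean (untested; "windowedAggregates" stands for the windowed KTable from my aggregation further down, and "agg-final" is a placeholder topic that would need cleanup.policy=compact):

    // Every update to a given window shares the key "<key>@<windowStart>", so
    // log compaction eventually retains only the final aggregate per window.
    windowedAggregates
        .toStream((windowed, list) -> windowed.key() + "@" + windowed.window().start())
        .mapValues(list -> String.join(",", list))
        .to("agg-final", Produced.with(stringSerde, stringSerde));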
Thanks,
EC

On Tue, Jun 5, 2018 at 2:23 AM, John Roesler <j...@confluent.io> wrote:

> Hi EC,
>
> Thanks for the very clear report and question. As Guozhang said, this is
> expected (but not ideal) behavior.
>
> For an immediate work-around, you can try materializing the KTable and
> setting the commit interval and cache size as discussed here
> (https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/)
> to reduce (but not eliminate) duplicates.
>
> I'm in the process of getting my thoughts in order to write a KIP to
> address this exact use case. If you're interested in participating in the
> discussion, you can keep an eye on the dev mailing list or watch the KIP
> page. I can't say exactly when I'll start it. I want to get it out there
> soon, but I also want to do my homework and have a good proposal.
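> For illustration, the two knobs I mean are roughly the following (added to
> the application's existing StreamsConfig properties; the values are
> arbitrary, so tune them for your own latency/deduplication trade-off):
>
>     // A larger record cache absorbs more updates per key in memory
>     // before a result is forwarded downstream.
>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
>     // A longer commit interval flushes that cache less often, so fewer
>     // intermediate results reach the output topic.
>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);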
> Thanks,
> -John
>
> On Mon, Jun 4, 2018 at 12:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello,
> >
> > Your observation is correct: by default, Kafka Streams emits continuous
> > updates for each window instead of waiting for the "final" update of
> > each window.
> >
> > There is ongoing work to provide functionality that lets users specify
> > something like "give me the final result for windowed aggregations" in
> > the DSL; it will probably come after the 2.0 release.
> >
> > Guozhang
> >
> > On Mon, Jun 4, 2018 at 8:14 AM, EC Boost <ecboost2...@gmail.com> wrote:
> >
> > > I logged the internal window information:
> > >
> > > Window{start=1528043030000, end=1528043040000} key=t6 1
> > > Window{start=1528043040000, end=1528043050000} key=t1 2
> > > Window{start=1528043040000, end=1528043050000} key=t7 3
> > > Window{start=1528043040000, end=1528043050000} key=t5 4
> > > Window{start=1528043040000, end=1528043050000} key=t5 4,5
> > > Window{start=1528043050000, end=1528043060000} key=t6 6
> > > Window{start=1528043050000, end=1528043060000} key=t6 6,7
> > > Window{start=1528043050000, end=1528043060000} key=t4 8
> > > Window{start=1528043060000, end=1528043070000} key=t6 9
> > > Window{start=1528043060000, end=1528043070000} key=t7 10
> > > Window{start=1528043060000, end=1528043070000} key=t6 9,11
> > > Window{start=1528043070000, end=1528043080000} key=t5 12
> > > Window{start=1528043070000, end=1528043080000} key=t6 13
> > > Window{start=1528043070000, end=1528043080000} key=t4 14
> > > Window{start=1528043070000, end=1528043080000} key=t4 14,15
> > >
> > > ....
> > >
> > > It seems that Kafka Streams sends the entire KTable changelog as
> > > output, which is probably why there are duplicate outputs for
> > > gap-less, non-overlapping windows.
> > >
> > > Is there any way to achieve real micro-batch-like processing semantics
> > > with non-overlapping windows, meaning that only the last value per
> > > window is sent as output rather than every changelog update within
> > > the window?
> > >
> > > On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2...@gmail.com> wrote:
> > >
> > > > Hello Everyone,
> > > >
> > > > I am getting duplicated results using Kafka Streams for a simple
> > > > windowed aggregation.
> > > >
> > > > The input event format is comma-separated, "event_id,event_type",
> > > > and I need to aggregate the events by event type.
> > > >
> > > > The following is the Kafka Streams processing logic:
> > > >
> > > >     events
> > > >         // re-key by event type; keep the event id as the value
> > > >         .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> > > >         .groupByKey()
> > > >         // 10-second non-overlapping (tumbling) windows
> > > >         .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
> > > >         .aggregate(
> > > >             ArrayList::new,
> > > >             (type, id, eventList) -> {
> > > >                 eventList.add(id);
> > > >                 return eventList;
> > > >             },
> > > >             Materialized.with(stringSerde, arraySerde)
> > > >         )
> > > >         // drop the window part of the key
> > > >         .toStream((k, v) -> k.key())
> > > >         .mapValues((v) -> String.join(",", v))
> > > >         .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
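> > > > (Here stringSerde is Serdes.String(), and arraySerde is a
> > > > Serde<ArrayList<String>>. A minimal delimiter-based sketch of such a
> > > > serde, assuming event ids never contain the "|" delimiter, could be:)
> > > >
> > > >     // imports: org.apache.kafka.common.serialization.*,
> > > >     //          java.util.*, java.nio.charset.StandardCharsets
> > > >     Serde<ArrayList<String>> arraySerde = Serdes.serdeFrom(
> > > >         new Serializer<ArrayList<String>>() {
> > > >             public void configure(Map<String, ?> configs, boolean isKey) {}
> > > >             public byte[] serialize(String topic, ArrayList<String> list) {
> > > >                 // join on write
> > > >                 return list == null ? null
> > > >                     : String.join("|", list).getBytes(StandardCharsets.UTF_8);
> > > >             }
> > > >             public void close() {}
> > > >         },
> > > >         new Deserializer<ArrayList<String>>() {
> > > >             public void configure(Map<String, ?> configs, boolean isKey) {}
> > > >             public ArrayList<String> deserialize(String topic, byte[] bytes) {
> > > >                 // split on read
> > > >                 return bytes == null ? null
> > > >                     : new ArrayList<>(Arrays.asList(
> > > >                           new String(bytes, StandardCharsets.UTF_8).split("\\|")));
> > > >             }
> > > >             public void close() {}
> > > >         });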
> > > > I produced the input messages using the following snippet:
> > > >
> > > >     require "kafka"
> > > >
> > > >     kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
> > > >
> > > >     f = File.open("events.txt")
> > > >     f.each_line { |l|
> > > >       puts l
> > > >       kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
> > > >       sleep(3)
> > > >     }
> > > >
> > > > The messages in events.txt are the following (format:
> > > > "event_id,event_type", where event_id is unique):
> > > >
> > > > Input
> > > >
> > > > 1,t6
> > > > 2,t1
> > > > 3,t7
> > > > 4,t5
> > > > 5,t5
> > > > 6,t6
> > > > 7,t6
> > > > 8,t4
> > > > 9,t6
> > > > 10,t7
> > > > 11,t6
> > > > 12,t5
> > > > 13,t6
> > > > 14,t4
> > > > 15,t4
> > > > 16,t2
> > > > 17,t7
> > > > 18,t6
> > > > 19,t3
> > > > 20,t7
> > > > 21,t1
> > > > 22,t5
> > > > 23,t5
> > > > 24,t6
> > > > 25,t6
> > > > 26,t4
> > > > 27,t4
> > > > 28,t3
> > > > 29,t2
> > > > 30,t5
> > > > 31,t1
> > > > 32,t1
> > > > 33,t1
> > > > 34,t1
> > > > 35,t2
> > > > 36,t4
> > > > 37,t3
> > > > 38,t3
> > > > 39,t6
> > > > 40,t6
> > > > 41,t1
> > > > 42,t4
> > > > 43,t4
> > > > 44,t6
> > > > 45,t6
> > > > 46,t7
> > > > 47,t7
> > > > 48,t3
> > > > 49,t1
> > > > 50,t6
> > > > 51,t1
> > > > 52,t4
> > > > 53,t6
> > > > 54,t7
> > > > 55,t1
> > > > 56,t1
> > > > 57,t1
> > > > 58,t5
> > > > 59,t6
> > > > 60,t7
> > > > 61,t6
> > > > 62,t4
> > > > 63,t5
> > > > 64,t1
> > > > 65,t3
> > > > 66,t1
> > > > 67,t3
> > > > 68,t3
> > > > 69,t5
> > > > 70,t1
> > > > 71,t6
> > > > 72,t5
> > > > 73,t6
> > > > 74,t1
> > > > 75,t7
> > > > 76,t5
> > > > 77,t3
> > > > 78,t1
> > > > 79,t4
> > > > 80,t3
> > > > 81,t6
> > > > 82,t2
> > > > 83,t6
> > > > 84,t2
> > > > 85,t4
> > > > 86,t7
> > > > 87,t4
> > > > 88,t6
> > > > 89,t5
> > > > 90,t6
> > > > 91,t4
> > > > 92,t3
> > > > 93,t4
> > > > 94,t6
> > > > 95,t2
> > > > 96,t2
> > > > 97,t7
> > > > 98,t4
> > > > 99,t3
> > > > 100,t3
> > > >
> > > > <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
> > > >
> > > > But I got the following output, with duplicate event_ids across windows:
> > > >
> > > > Output
> > > >
> > > > t6 1
> > > > t1 2
> > > > t7 3
> > > > t5 4
> > > > t5 4,5
> > > > t6 6
> > > > t6 6,7
> > > > t4 8
> > > > t6 9
> > > > t7 10
> > > > t6 9,11
> > > > t5 12
> > > > t6 13
> > > > t4 14
> > > > t4 14,15
> > > > t2 16
> > > > t7 17
> > > > t6 18
> > > > t3 19
> > > > t7 20
> > > > t1 21
> > > > t5 22
> > > > t5 22,23
> > > > t6 24
> > > > t6 24,25
> > > > t4 26
> > > > t4 26,27
> > > > t3 28
> > > > t2 29
> > > > t5 30
> > > > t1 31
> > > > t1 32
> > > > t1 32,33
> > > > t1 32,33,34
> > > > t2 35
> > > > t4 36
> > > > t3 37
> > > > t3 37,38
> > > > t6 39
> > > > t6 39,40
> > > > t1 41
> > > > t4 42
> > > > t4 42,43
> > > > t6 44
> > > > t6 44,45
> > > > t7 46
> > > > t7 46,47
> > > > t3 48
> > > > t1 49
> > > > t6 50
> > > > t1 49,51
> > > > t4 52
> > > > t6 53
> > > > t7 54
> > > > t1 55
> > > > t1 56
> > > > t1 56,57
> > > > t5 58
> > > > t6 59
> > > > t7 60
> > > > t6 59,61
> > > > t4 62
> > > > t5 63
> > > > t1 64
> > > > t3 65
> > > > t1 66
> > > > t3 67
> > > > t3 67,68
> > > > t5 69
> > > > t1 70
> > > > t6 71
> > > > t5 72
> > > > t6 73
> > > > t1 74
> > > > t7 75
> > > > t5 76
> > > > t3 77
> > > > t1 78
> > > > t4 79
> > > > t3 80
> > > > t6 81
> > > > t2 82
> > > > t6 83
> > > > t2 82,84
> > > > t4 85
> > > > t7 86
> > > > t4 87
> > > > t6 88
> > > > t5 89
> > > > t6 90
> > > > t4 91
> > > > t3 92
> > > > t4 93
> > > > t6 94
> > > > t2 95
> > > > t2 96
> > > > t7 97
> > > > t4 98
> > > > t3 99
> > > > t3 99,100
> > > >
> > > > Since I am using non-overlapping, gap-less windows in the Kafka
> > > > Streams DSL, the correct output should NOT contain duplicate event
> > > > ids across windows. Any ideas why the duplicates appear? (Link to
> > > > the debug project: https://github.com/westec/ks-aggregate-debug )
> > > >
> > > > I appreciate your help!
> > > >
> > > > Regards,
> > > > EC
> >
> > --
> > -- Guozhang
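For anyone reproducing this: the key/value listing above can be printed with a plain consumer along these lines (a sketch; the group id is arbitrary):

    // imports: org.apache.kafka.clients.consumer.*, java.util.*,
    //          org.apache.kafka.common.serialization.StringDeserializer
    Properties cfg = new Properties();
    cfg.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    cfg.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-debug-reader");
    cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cfg)) {
        consumer.subscribe(Collections.singletonList("ks-debug-output"));
        while (true) {
            for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
                // prints e.g. "t6 1", "t5 4,5", ...
                System.out.println(r.key() + " " + r.value());
            }
        }
    }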