Hi, could you please go into more detail about the input and the expected output? And also what the actual output is with apply() and with reduce()?
With this we might be able to figure it out together.

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 18:11 Sendoh <unicorn.bana...@gmail.com> wrote:

> Hi Flink users,
>
> I saw a strange behavior: data are missing with reduce() but not with
> apply(). We don't see this behavior with 1.0.3, but we do see it with
> 1.1.3. By "losing data" we mean that we don't see any events for the
> assigned keys, not that the count of events is off.
>
> The code is as follows.
>
> DataStream<Map<String, Object>> streams = env.addSource(new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties))
>         .name("kafka_topics")
>         .rebalance()
>         .flatMap(new Eventsmap(events))
>         .assignTimestampsAndWatermarks(new EventWatermark());
>
> DataStream<Map<String, Object>> count = streams
>         .keyBy(new CompoundJsonKeySelector())
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(3))
>         // apply is ok
>         // .apply(new WindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow>() {
>         //     @Override
>         //     public void apply(String s, TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<Map<String, Object>> collector) throws Exception {
>         //         Iterator<Map<String, Object>> it = iterable.iterator();
>         //         collector.collect(it.next());
>         //     }
>         // }
>         // );
>         // reduce() loses data
>         .reduce(new ReduceFunction<Map<String, Object>>() {
>             @Override
>             public Map<String, Object> reduce(Map<String, Object> v1, Map<String, Object> v2) throws Exception {
>                 int newCount = Integer.parseInt(v1.get("count").toString()) + Integer.parseInt(v2.get("count").toString());
>                 v2.put("count", newCount);
>                 return v2;
>             }
>         });
>
> Is there any suggestion that we can try to figure out the root cause?
>
> Best,
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/missing-data-in-window-reduce-while-apply-is-ok-tp9689.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
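
While collecting the input and output samples, one thing that might be worth ruling out is that the quoted reduce() writes into its argument v2 and returns that same map. Whether this has anything to do with the missing keys is only an assumption, not a confirmed cause. Below is a minimal sketch in plain Java, with no Flink dependency, of a variant that copies before updating. The class name CountReduceSketch and the sample events are hypothetical; only the "count" field and the summing logic come from the quoted code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the original job.
class CountReduceSketch {

    // Same summing logic as the quoted reduce(), but the result is written
    // into a fresh copy so that neither input map is modified.
    static Map<String, Object> reduce(Map<String, Object> v1, Map<String, Object> v2) {
        int newCount = Integer.parseInt(v1.get("count").toString())
                + Integer.parseInt(v2.get("count").toString());
        Map<String, Object> result = new HashMap<>(v2); // shallow copy of v2
        result.put("count", newCount);
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample events for illustration only.
        Map<String, Object> a = new HashMap<>();
        a.put("key", "k1");
        a.put("count", 2);

        Map<String, Object> b = new HashMap<>();
        b.put("key", "k1");
        b.put("count", "3");

        Map<String, Object> merged = reduce(a, b);
        System.out.println(merged.get("count")); // prints 5
        System.out.println(b.get("count"));      // prints 3, input left untouched
    }
}
```

Note also that the commented-out apply() emits only the first element of each window, while reduce() emits a summed count, so the two variants are not expected to produce identical records even when no data is lost.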