Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 the job just switches to FINISHED and doesn't output any result.

    stream.map(new RichMapFunction<String, Request>() {
            private ObjectMapper objectMapper;

            @Override
            public void open(Configuration parameters) {
                objectMapper = new ObjectMapper();
            }

            @Override
            public Request map(String value) throws Exception {
                return objectMapper.readValue(value, Request.class);
            }
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
            @Override
            public long extractAscendingTimestamp(Request req) {
                return req.ts;
            }
        })
        .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl,
             Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res = itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
        .print();

The problem is with the window operator, because I could print results before it.

Best,
Yassine