Hi Yassine,

Are you reading from a file and using ingestion time? If so, the problem may be related to this:
https://issues.apache.org/jira/browse/FLINK-4329

Is this the case?

Best,
Kostas

> On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmar...@gmail.com> wrote:
>
> Hi all,
>
> The following code works under Flink 1.0.3, but under 1.1.1 it just switches
> to FINISHED and doesn't output any result.
>
> stream.map(new RichMapFunction<String, Request>() {
>
>     private ObjectMapper objectMapper;
>
>     @Override
>     public void open(Configuration parameters) {
>         objectMapper = new ObjectMapper();
>     }
>
>     @Override
>     public Request map(String value) throws Exception {
>         return objectMapper.readValue(value, Request.class);
>     }
>
> })
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>     @Override
>     public long extractAscendingTimestamp(Request req) {
>         return req.ts;
>     }
> })
> .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
> .keyBy(0)
> .timeWindow(Time.minutes(10))
> .apply(
>     (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
>     (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl,
>             Collector<Tuple2<String, Integer>> clctr) -> {
>         Tuple3<String, String, Integer> res = itrbl.iterator().next();
>         clctr.collect(new Tuple2<>(res.f1, res.f2));
>     })
> .print();
>
> The problem is with the window operator because I could print results before
> it.
>
> Best,
> Yassine