Hi Kostas, Yes, that's the case. I will revert back to 1.0.3 until the bug is fixed. Thank you.
Best, Yassine On Fri, Aug 12, 2016 at 10:34 AM, Kostas Kloudas < k.klou...@data-artisans.com> wrote: > Hi Yassine, > > Are you reading from a file and use ingestion time? > If yes, then the problem can be related to this: > > https://issues.apache.org/jira/browse/FLINK-4329 > > Is this the case? > > Best, > Kostas > > On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmar...@gmail.com> > wrote: > > Hi all, > > The following code works under Flink 1.0.3, but under 1.1.1 it just > switches to FINISHED and doesn't output any result. > > stream.map(new RichMapFunction<String, Request>() { > > private ObjectMapper objectMapper; > > @Override > public void open(Configuration parameters) { > objectMapper = new ObjectMapper(); > } > > @Override > public Request map(String value) throws Exception { > return objectMapper.readValue(value, Request.class); > } > > }) > .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() > { > @Override > public long extractAscendingTimestamp(Request req) { > return req.ts; > } > }) > .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, > req.location, 1)) > .keyBy(0) > .timeWindow(Time.minutes(10)) > .apply( > (Tuple3<String, String, Integer> x, Tuple3<String, String, > Integer> y) -> y, > (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, > Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> { > Tuple3<String, String, Integer> res = > itrbl.iterator().next(); > clctr.collect(new Tuple2<>(res.f1, res.f2)); > }) > .print(); > > The problem is with the window operator because I could print results > before it. > > Best, > Yassine > > >