Hi Yassine,

Are you reading from a file and use ingestion time?
If yes, then the problem can be related to this:

https://issues.apache.org/jira/browse/FLINK-4329 
<https://issues.apache.org/jira/browse/FLINK-4329>

Is this the case?

Best,
Kostas

> On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmar...@gmail.com> wrote:
> 
> Hi all,
> 
> The following code works under Flink 1.0.3, but under 1.1.1 it just switches 
> to FINISHED and doesn't output any result.
> 
> stream.map(new RichMapFunction<String, Request>() {
> 
>         private ObjectMapper objectMapper;
> 
>         @Override
>         public void open(Configuration parameters) {
>             objectMapper = new ObjectMapper();
>         }
> 
>         @Override
>         public Request map(String value) throws Exception {
>             return objectMapper.readValue(value, Request.class);
>         }
> 
>     })
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() 
> {
>         @Override
>         public long extractAscendingTimestamp(Request req) {
>             return req.ts;
>         }
>     })
>     .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, 
> req.location, 1))
>     .keyBy(0)
>     .timeWindow(Time.minutes(10))
>     .apply(
>             (Tuple3<String, String, Integer> x, Tuple3<String, String, 
> Integer> y) -> y,
>             (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, 
> Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
>                 Tuple3<String, String, Integer> res = itrbl.iterator().next();
>                 clctr.collect(new Tuple2<>(res.f1, res.f2));
>             })
>     .print();
> 
> The problem is with the window operator because I could print results before 
> it.
> 
> Best,
> Yassine

Reply via email to