Hi Radu,

I'm sorry for the delayed response.
I'm not sure what the purpose of DataStream.global() actually is. I've
opened a JIRA to document or remove it:
https://issues.apache.org/jira/browse/FLINK-3240.

To get the final metrics, you can just call ".timeWindowAll()" without a
preceding ".global()" call. timeWindowAll() always runs with a
parallelism of one, so it receives the data from all partitions.
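
To illustrate why a window operator with a parallelism of one sees the
data of every partition, here is a minimal plain-Java sketch. It is not
Flink code: the WindowAllSketch class, the Event type, and the windowAll()
method are hypothetical names made up for this example, and the sliding
windows are simply identified by their start timestamp in milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowAllSketch {

    /** Hypothetical record type: an event timestamp (ms) and a metric value. */
    public static final class Event {
        final long timestampMs;
        final long value;

        public Event(long timestampMs, long value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /**
     * Simulates a single, non-parallel sliding-window operator: events from
     * ALL upstream partitions are merged into one map of window sums.
     * Returns window start (ms) -> sum of the values that fall into that window.
     */
    public static Map<Long, Long> windowAll(
            List<List<Event>> partitions, long sizeMs, long slideMs) {
        Map<Long, Long> sumsByWindowStart = new TreeMap<>();
        for (List<Event> partition : partitions) {
            for (Event e : partition) {
                // Latest window start that is a multiple of the slide and <= timestamp.
                long lastStart = e.timestampMs - Math.floorMod(e.timestampMs, slideMs);
                // An event belongs to every window with start in (timestamp - size, timestamp].
                for (long start = lastStart; start > e.timestampMs - sizeMs; start -= slideMs) {
                    sumsByWindowStart.merge(start, e.value, Long::sum);
                }
            }
        }
        return sumsByWindowStart;
    }

    public static void main(String[] args) {
        // Two "parallel partitions" of the compute phase.
        List<List<Event>> partitions = List.of(
                List.of(new Event(500, 1), new Event(1500, 2)),
                List.of(new Event(700, 10), new Event(2500, 20)));

        // Windows of 2 s sliding by 1 s.
        // prints {-1000=11, 0=13, 1000=22, 2000=20}
        System.out.println(windowAll(partitions, 2000, 1000));
    }
}
```

In this sketch the window starting at 0 covers [0, 2000) and sums events
from both partitions (1 + 2 + 10 = 13), which is the behaviour you are
looking for in the metrics step.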

Regards,
Robert





On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi,
>
> I am trying to compute some final statistics over a stream topology. For
> this I would like to gather all data from all windows and parallel
> partitions into a single/global window. Could you suggest a solution for
> this? I saw that the map function has a ".global()" but I end up with the
> same number of partitions as I have in the main computation. Below you can
> find a sketch of the program:
>
>
> DataStream stream = env.Read...
>
> env.setParallelism(10);
> //Compute phase
>         DataStream<Tuple...> result = stream.keyBy(_).window(_).apply();
> //end compute phase
>
>
> //get the metrics
>         result.map(/* extract some of the Tuple fields */)
>                 .global()
>                 .timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
>                 .trigger(EventTimeTrigger.create())
>                 .apply()
>                 .writeAsText();
>
>
> For this last function - I would expect that even if I had parallel
> computation during the compute phase, I can select part of the events from
> all partitions and gather all these into one unique window. However, I do
> not seem to be successful in this.
> I also tried by applying a keyBy() to the result stream in which I
> assigned the same hash to any event, but the result remains the same.
> result.map(/* extract some of the Tuple fields */).keyBy(
>                 new KeySelector<Tuple2<Long, Long>, Integer>() {
>                         @Override
>                         public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
>                                 return 1;
>                         }
>                         @Override
>                         public int hashCode() {
>                                 return 1;
>                         }
>                 }).timeWindowAll().apply()
>
>
> Thanks for the help/ideas
>
>
>
>
