Hi,

Thanks for the response.

1) regarding the JIRA issue related to the .global and .forward functions – I 
believe it is a good idea to be removed as they are confusing. Actually, they 
are totally missing from the documentation webpage
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows
which makes things even more confusing regarding what is their 
role/capabilities.

2) regarding the ".timeWindowAll()", it’s behavior is not as one / I would 
expect. I am not sure if this behavior is intentional or there is an error. I 
would expect as mentioned in my initial email that even if on the previous 
operators I have a parallelism of N, using this function I can get a 
parallelism of 1 in which I can aggregate the data from the previous operators. 
However, it is not really the case. More specifically, it is not the case when 
you execute this function in a cluster with more machine (on the other hand it 
works ok for the local case!).  It turns out that the parallelism degree is 
kept when being run in the cluster (and it is to guess I would say the function 
is executed round-robin over the executors). So if you use this function to 
aggregate all data in one place you will end up aggregating it over multiple 
parallel instances. I am attaching bellow a dummy piece of code to exemplify.

The function reads events from a network socket and multiplies these events 
based on the parallelism degree. The stream is partitioned based on a key. This 
is followed by the “main computation” that is run in parallel  and finally by 
an aggregation part.  For this aggregation part I use as suggested 
“.timeWindowAll()”. Assume that this aggregation function counts the events  
processed in the system across all instances and prints/logs this data.
For example if you run this with a parallelism degree of 10 – you end up with 
outputs from the timeWindowAll() across all instances in the cluster. A sample 
output is shown below. This shows that despite that the function should be 
executed with parallelism 1, actually it is not – so it cannot aggregate the 
data into one place…  Is this actually the intended behavior (case in which it 
would be interested to understand what is the target scenario) or is there an 
error?

Machine1:                                (values in the file)
/tmp/testoutput/1   (10)
/tmp/testoutput/2   (20)

Machine2:
/tmp/testoutput/6   (10)
/tmp/testoutput/4
/tmp/testoutput/7   (40)

Machine3:
/tmp/testoutput/5
/tmp/testoutput/3


……………



public static void main(String[] args) throws Exception {

              final StreamExecutionEnvironment env = StreamExecutionEnvironment
                           .getExecutionEnvironment();

              final int parallelism = 10;
              env.setParallelism(parallelism);

              DataStream<Tuple2<Integer, String>> inputStream = 
env.socketTextStream(
                           hostIP, port2use, '\n').flatMap(
                           new FlatMapFunction<String, Tuple2<Integer, 
String>>() {

                                  @Override
                                  public void flatMap(String arg0,
                                                Collector<Tuple2<Integer, 
String>> arg1)
                                                throws Exception {

                                         for (int i = 0; i < parallelism; i++)
                                                arg1.collect(new Tuple2(i, 
arg0));
                                  }

                           });

              DataStream<Tuple2<Integer, Integer>> result = inputStream
                           .keyBy(0)
                           .timeWindow(Time.of(2, TimeUnit.SECONDS))
                           .apply(new WindowFunction<Tuple2<Integer, String>, 
Tuple2<Integer, Integer>, Tuple, TimeWindow>() {
                                  public void apply(
                                                Tuple arg0,
                                                TimeWindow arg1,
                                                
java.lang.Iterable<org.apache.flink.api.java.tuple.Tuple2<Integer, String>> 
arg2,
                                                
org.apache.flink.util.Collector<org.apache.flink.api.java.tuple.Tuple2<Integer, 
Integer>> arg3)
                                                throws Exception {

                                         // Compuatation ....
                                         int count = 0;
                                         for (Tuple2<Integer, String> value : 
arg2) {
                                                count++;
                                                arg3.collect(new 
Tuple2<Integer, Integer>(value.f0,
                                                              
value.f1.length()));
                                         }
                                         //System.out.println("Count per hash 
is " + count);
                                  };

                           });

              result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
                           .apply(new AllWindowFunction<Tuple2<Integer, 
Integer>, Tuple1<Integer>, TimeWindow>() {
                                  @Override
                                  public void apply(TimeWindow arg0,
                                                Iterable<Tuple2<Integer, 
Integer>> arg1,
                                                Collector<Tuple1<Integer>> 
arg2) throws Exception {

                                         // Compuatation ....
                                         int count = 0;
                                         for (Tuple2<Integer, Integer> value : 
arg1) {
                                                count++;
                                         }
                                         //System.out.println("Count aggregated 
metrics is "
                                         //            + count + " at " + 
System.currentTimeMillis());
                                         arg2.collect(new Tuple1(count));

                                  }
                           }).setParallelism(1)
                           .writeAsText("/tmp/testoutput", WriteMode.OVERWRITE);

              env.execute("main stream application");

       }



Regards,


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Friday, January 15, 2016 10:18 AM
To: user@flink.apache.org
Subject: Re: global function over partitions

Hi Radu,

I'm sorry for the delayed response.
I'm not sure what the purpose of DataStream.global() actually is. I've opened a 
JIRA to document or remove it: https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()", without a 
".global()" call before. The timeWindowAll() will run with a parallelism of 
one, hence it will receive the data from all partitions.

Regards,
Robert





On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran 
<radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>> wrote:
Hi,

I am trying to compute some final statistics over a stream topology. For this I 
would like to gather all data from all windows and parallel partitions into a 
single/global window. Could you suggest a solution for this. I saw that the map 
function has a ".global()" but I end up with the same number of partitions as I 
have in the main computation. Bellow you can find a schema for the program:


DataStream stream = env.Read...

end.setParallelism(10);
//Compute phase
        DataStream<Tuple...> result = stream.keyBy(_).window(_).apply();
//end compute phase


//get the metrics
        result.map(//extract some of the Tuple 
fields).global().timeWindowAll(Time.of(5, TimeUnit.SECONDS),Time.of(1, 
TimeUnit.SECONDS))
                .trigger(EventTimeTrigger.create()).apply ().writeAsText();


For this last function - I would expect that even if I had parallel computation 
during the compute phase, I can select part of the events from all partitions 
and gather all these into one unique window. However, I do not seem to be 
successful in this.
I also tried by applying a keyBy() to the result stream in which I assigned the 
same hash to any event, but the result remains the same.
result.map((//extract some of the Tuple fields).keyBy(
new KeySelector<Tuple2<Long,Long>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Long, Long> arg0) throws 
Exception {

                                return 1;
                        }
                        @Override
                        public int hashCode() {

                                return 1;
                        }

                }). timeWindowAll().apply()


Thanks for the help/ideas



Reply via email to