Few quick checks:

  - Do you properly set the parallelism?
  - If you start 640 tasks (parallelism), and you use the same key for
everything, that behaves like parallelism 1 (Piotr mentioned this)

  - Do you use the RocksDB state backend? If yes, try the FsStateBackend.
It looks like your state data type object (CollectiveData) is very
expensive to serialize and for RocksDB, you get a back and forth
serialization (off-heap => on-heap, compute, on-heap => off-heap)

On Thu, Mar 1, 2018 at 4:32 PM, Supun Kamburugamuve <supu...@gmail.com>

> Yes, the program runs fine, I can see it on the UI. Sorry, didn't include
> the part where the execute is called.
> Thanks,
> Supun..
> On Thu, Mar 1, 2018 at 10:27 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> Are you sure the program is doing anything at all?
>> Do you call execute() on the StreamExecutionEnvironment?
>> 2018-03-01 15:55 GMT+01:00 Supun Kamburugamuve <supu...@gmail.com>:
>>> Thanks Piotrek,
>>> I did it this way on purpose to see how Flink performs. With 128000
>>> messages it takes an un-reasonable amount of time for Flink to complete the
>>> operation. With another framework the same operation completes in about 70
>>> seconds for 1000 messages of size 128000, while Flink takes hours.
>>> Thanks,
>>> Supun..
>>> On Thu, Mar 1, 2018 at 3:58 AM, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>> Hi,
>>>> First of all learn about what’s going with your job: check the status
>>>> of the machines, cpu/network usage on the cluster. If CPU is not ~100%,
>>>> analyse what is preventing the machines to work faster (network bottleneck,
>>>> locking, blocking operations etc). If CPU is ~100%, profile the
>>>> TaskManagers to see what can you speed up.
>>>> In your example couple of questions:
>>>> - you create CollectiveData instances with size 128000 by default.
>>>> Doesn’t it mean that your records are gigantic? I can not tell, since you
>>>> didn’t provide full code.
>>>> - you are mapping the data to new Tuple2<Integer, CollectiveData>(0,
>>>> s);  and then keying by the first field, which is always 0. Probably
>>>> all of the records are ending up on one single machine
>>>> Piotrek
>>>> On 28 Feb 2018, at 17:20, Supun Kamburugamuve <supu...@gmail.com>
>>>> wrote:
>>>> Hi,
>>>> I'm trying to run a simple benchmark on Flink streaming reduce. It
>>>> seems it is very slow. Could you let me know if I'm doing something wrong.
>>>> Here is the program. I'm running this on 32 nodes with 20 tasks in each
>>>> node. So the parallelism is at 640.
>>>> public class StreamingReduce {
>>>>   int size;
>>>>   int iterations;
>>>>   StreamExecutionEnvironment env;
>>>>   String outFile;
>>>>   public StreamingReduce(int size, int iterations, 
>>>> StreamExecutionEnvironment env, String outFile) {
>>>>     this.size = size;
>>>>     this.iterations = iterations;
>>>>     this.env = env;
>>>>     this.outFile = outFile;
>>>>   }
>>>>   public void execute() {
>>>>     DataStream<CollectiveData> stringStream = env.addSource(new 
>>>> RichParallelSourceFunction<CollectiveData>() {
>>>>       int i = 1;
>>>>       int count = 0;
>>>>       int size = 0;
>>>>       int iterations = 10000;
>>>>       @Override
>>>>       public void open(Configuration parameters) throws Exception {
>>>>         super.open(parameters);
>>>>         ParameterTool p = (ParameterTool)
>>>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>>>         size = p.getInt("size", 128000);
>>>>         iterations = p.getInt("itr", 10000);
>>>>         System.out.println("6666 iterations: " + iterations + " size: " + 
>>>> size);
>>>>       }
>>>>       @Override
>>>>       public void run(SourceContext<CollectiveData> sourceContext) throws 
>>>> Exception {
>>>>         while (count < iterations) {
>>>>           CollectiveData i = new CollectiveData(size);
>>>>           sourceContext.collect(i);
>>>>           count++;
>>>>         }
>>>>       }
>>>>       @Override
>>>>       public void cancel() {
>>>>       }
>>>>     });
>>>>     stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, 
>>>> CollectiveData>>() {
>>>>       @Override
>>>>       public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws 
>>>> Exception {
>>>>         return new Tuple2<Integer, CollectiveData>(0, s);
>>>>       }
>>>>     }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, 
>>>> CollectiveData>>() {
>>>>       @Override
>>>>       public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, 
>>>> CollectiveData> c1,
>>>>                                                     Tuple2<Integer, 
>>>> CollectiveData> c2) throws Exception {
>>>>         return new Tuple2<Integer, CollectiveData>(0, add(c1.f1, c2.f1));
>>>>       }
>>>>     }).addSink(new RichSinkFunction<Tuple2<Integer,CollectiveData>>() {
>>>>       long start;
>>>>       int count = 0;
>>>>       int iterations;
>>>>       @Override
>>>>       public void open(Configuration parameters) throws Exception {
>>>>         super.open(parameters);
>>>>         ParameterTool p = (ParameterTool)
>>>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>>>         iterations = p.getInt("itr", 10000);
>>>>         System.out.println("7777 iterations: " + iterations);
>>>>       }
>>>>       @Override
>>>>       public void invoke(Tuple2<Integer, CollectiveData> 
>>>> integerStringTuple2) throws Exception {
>>>>         if (count == 0) {
>>>>           start = System.nanoTime();
>>>>         }
>>>>         count++;
>>>>         if (count >= iterations) {
>>>>           System.out.println("Final: " + count + " " + (System.nanoTime() 
>>>> - start) / 1000000 + " " + (integerStringTuple2.f1));
>>>>         }
>>>>       }
>>>>     });
>>>>   }
>>>>   private static CollectiveData add(CollectiveData i, CollectiveData j) {
>>>>     List<Integer> r= new ArrayList<>();
>>>>     for (int k = 0; k < i.getList().size(); k++) {
>>>>       r.add((i.getList().get(k) + j.getList().get(k)));
>>>>     }
>>>>     return new CollectiveData(r);
>>>>   }
>>>> }
>>>> Thanks,
>>>> Supun..
>>> --
>>> Supun Kamburugamuve
>>> Member, Apache Software Foundation; http://www.apache.org
>>> E-mail: supun@apache.o <supu...@gmail.com>rg;  Mobile: +1 812 219 2563
>>> <(812)%20219-2563>
> --
> Supun Kamburugamuve
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supun@apache.o <supu...@gmail.com>rg;  Mobile: +1 812 219 2563
> <+1%20812-219-2563>

Reply via email to