A few quick checks:

- Do you set the parallelism properly? If you start 640 tasks (parallelism)
but use the same key for everything, the job behaves as if it had
parallelism 1 (Piotr mentioned this).
- Do you use the RocksDB state backend? If yes, try the FsStateBackend. Your
state type (CollectiveData) looks very expensive to serialize, and with
RocksDB every state access pays for serialization both ways (off-heap =>
on-heap, compute, on-heap => off-heap). A sketch of the switch follows below.
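For reference, switching the backend is a one-line change on the environment.
A minimal sketch, assuming Flink 1.4-era APIs and an example HDFS checkpoint
URI (any file: or hdfs: path works):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // FsStateBackend keeps live state as objects on the JVM heap, so the
    // per-access (de)serialization of the large CollectiveData values goes
    // away; checkpoints are still written to the URI below (an example path).
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));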
On Thu, Mar 1, 2018 at 4:32 PM, Supun Kamburugamuve <supu...@gmail.com> wrote:

> Yes, the program runs fine; I can see it on the UI. Sorry, I didn't
> include the part where execute() is called.
>
> Thanks,
> Supun..
>
> On Thu, Mar 1, 2018 at 10:27 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Are you sure the program is doing anything at all?
>> Do you call execute() on the StreamExecutionEnvironment?
>>
>> 2018-03-01 15:55 GMT+01:00 Supun Kamburugamuve <supu...@gmail.com>:
>>
>>> Thanks Piotrek,
>>>
>>> I did it this way on purpose to see how Flink performs. With messages
>>> of size 128000 it takes an unreasonable amount of time for Flink to
>>> complete the operation. Another framework completes the same operation
>>> in about 70 seconds for 1000 messages of size 128000, while Flink
>>> takes hours.
>>>
>>> Thanks,
>>> Supun..
>>>
>>> On Thu, Mar 1, 2018 at 3:58 AM, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> First of all, learn what is going on with your job: check the status
>>>> of the machines and the CPU/network usage on the cluster. If CPU is
>>>> not at ~100%, analyse what is preventing the machines from working
>>>> faster (network bottleneck, locking, blocking operations, etc.). If
>>>> CPU is at ~100%, profile the TaskManagers to see what you can speed up.
>>>>
>>>> A couple of questions about your example:
>>>> - You create CollectiveData instances with size 128000 by default.
>>>> Doesn't that mean your records are gigantic? I cannot tell, since you
>>>> didn't provide the full code.
>>>> - You map the data to new Tuple2<Integer, CollectiveData>(0, s) and
>>>> then key by the first field, which is always 0. Probably all of the
>>>> records end up on one single machine.
>>>>
>>>> Piotrek
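To make the second point concrete: with a constant key, every record hashes
to the same key group, so one subtask does all the reduce work no matter how
many are running. A minimal sketch of a rotating key that spreads the load
(imports and the add() helper are as in the full listing below; the key count
of 640 matching the job parallelism is an assumption, and note this turns the
single global reduce into 640 per-key partial reduces, which would still need
a final combine step):

    stringStream
        .map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
          private int nextKey = 0;

          @Override
          public Tuple2<Integer, CollectiveData> map(CollectiveData s) {
            // Cycle through 640 keys instead of using the constant 0, so the
            // downstream keyed reduce runs on all parallel subtasks.
            return new Tuple2<>(nextKey++ % 640, s);
          }
        })
        .keyBy(0)
        .reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
          @Override
          public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> a,
                                                        Tuple2<Integer, CollectiveData> b) {
            // Keep the (now varying) key so partials stay separated per key.
            return new Tuple2<>(a.f0, add(a.f1, b.f1));
          }
        });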
>>>> On 28 Feb 2018, at 17:20, Supun Kamburugamuve <supu...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to run a simple benchmark on Flink streaming reduce, and it
>>>> seems very slow. Could you let me know if I'm doing something wrong?
>>>>
>>>> Here is the program. I'm running this on 32 nodes with 20 tasks on each
>>>> node, so the parallelism is 640.
>>>>
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>>
>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.api.java.utils.ParameterTool;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>>
>>>> // NB: CollectiveData is not shown in the post; from add() below it
>>>> // apparently wraps a List<Integer> ("size" elements when built from an int).
>>>> public class StreamingReduce {
>>>>   int size;
>>>>   int iterations;
>>>>   StreamExecutionEnvironment env;
>>>>   String outFile;
>>>>
>>>>   public StreamingReduce(int size, int iterations,
>>>>                          StreamExecutionEnvironment env, String outFile) {
>>>>     this.size = size;
>>>>     this.iterations = iterations;
>>>>     this.env = env;
>>>>     this.outFile = outFile;
>>>>   }
>>>>
>>>>   public void execute() {
>>>>     // Parallel source: each subtask emits `iterations` records of size `size`.
>>>>     DataStream<CollectiveData> stringStream = env.addSource(
>>>>         new RichParallelSourceFunction<CollectiveData>() {
>>>>       int count = 0;
>>>>       int size = 0;
>>>>       int iterations = 10000;
>>>>
>>>>       @Override
>>>>       public void open(Configuration parameters) throws Exception {
>>>>         super.open(parameters);
>>>>         ParameterTool p = (ParameterTool)
>>>>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>>>         size = p.getInt("size", 128000);
>>>>         iterations = p.getInt("itr", 10000);
>>>>         System.out.println("6666 iterations: " + iterations + " size: " + size);
>>>>       }
>>>>
>>>>       @Override
>>>>       public void run(SourceContext<CollectiveData> sourceContext) throws Exception {
>>>>         while (count < iterations) {
>>>>           sourceContext.collect(new CollectiveData(size));
>>>>           count++;
>>>>         }
>>>>       }
>>>>
>>>>       @Override
>>>>       public void cancel() {
>>>>       }
>>>>     });
>>>>
>>>>     stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
>>>>       @Override
>>>>       public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
>>>>         // Constant key: every record gets key 0 (see the discussion above).
>>>>         return new Tuple2<Integer, CollectiveData>(0, s);
>>>>       }
>>>>     }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
>>>>       @Override
>>>>       public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> c1,
>>>>                                                     Tuple2<Integer, CollectiveData> c2)
>>>>           throws Exception {
>>>>         return new Tuple2<Integer, CollectiveData>(0, add(c1.f1, c2.f1));
>>>>       }
>>>>     }).addSink(new RichSinkFunction<Tuple2<Integer, CollectiveData>>() {
>>>>       long start;
>>>>       int count = 0;
>>>>       int iterations;
>>>>
>>>>       @Override
>>>>       public void open(Configuration parameters) throws Exception {
>>>>         super.open(parameters);
>>>>         ParameterTool p = (ParameterTool)
>>>>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>>>         iterations = p.getInt("itr", 10000);
>>>>         System.out.println("7777 iterations: " + iterations);
>>>>       }
>>>>
>>>>       @Override
>>>>       public void invoke(Tuple2<Integer, CollectiveData> integerStringTuple2) throws Exception {
>>>>         if (count == 0) {
>>>>           start = System.nanoTime();
>>>>         }
>>>>         count++;
>>>>         if (count >= iterations) {
>>>>           System.out.println("Final: " + count + " "
>>>>               + (System.nanoTime() - start) / 1000000 + " " + (integerStringTuple2.f1));
>>>>         }
>>>>       }
>>>>     });
>>>>   }
>>>>
>>>>   // Element-wise sum of the two wrapped Integer lists.
>>>>   private static CollectiveData add(CollectiveData i, CollectiveData j) {
>>>>     List<Integer> r = new ArrayList<>();
>>>>     for (int k = 0; k < i.getList().size(); k++) {
>>>>       r.add(i.getList().get(k) + j.getList().get(k));
>>>>     }
>>>>     return new CollectiveData(r);
>>>>   }
>>>> }
>>>>
>>>> Thanks,
>>>> Supun..
>>>
>>> --
>>> Supun Kamburugamuve
>>> Member, Apache Software Foundation; http://www.apache.org
>>> E-mail: supun@apache.org; Mobile: +1 812 219 2563
>>
>
> --
> Supun Kamburugamuve
> Member, Apache Software Foundation; http://www.apache.org
> E-mail: supun@apache.org; Mobile: +1 812 219 2563
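Since the thread turns on whether execute() is actually called, here is a
hypothetical driver for the class above (the original post omits it): it
registers the ParameterTool so the open() methods can read "size" and "itr",
builds the topology, and submits the job. Names and defaults are assumptions,
not code from the thread.

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public static void main(String[] args) throws Exception {
      ParameterTool params = ParameterTool.fromArgs(args);
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Make "size" and "itr" visible to the rich functions' open() methods.
      env.getConfig().setGlobalJobParameters(params);

      StreamingReduce job = new StreamingReduce(
          params.getInt("size", 128000), params.getInt("itr", 10000), env,
          null /* outFile is unused in the listing above */);
      job.execute();                              // builds the dataflow graph
      env.execute("StreamingReduce benchmark");   // actually runs the job
    }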