Thanks for your feedback!

Sorry, I am actually using reduceGroup, but it gives different results
when I change the parallelism from 1 to 8. The results with parallelism 1
are the correct ones, and I want to run with parallelism 8.

I do not know how to get the same results with reduceGroup when I change
the parallelism.

Thank you in advance!

Anissa

On Thu, 22 Aug 2019 at 10:29, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Anissa,
>
> Are you using combineGroup or reduceGroup?
> Your question refers to combineGroup, but the code only shows reduceGroup.
>
> combineGroup is non-deterministic by design to enable efficient partial
> results without network and disk IO.
> reduceGroup is deterministic given a deterministic key extractor and
> deterministic GroupReduceFunction.
>
> Hope this helps,
> Fabian
>
> On Tue, 20 Aug 2019 at 14:21, anissa moussaoui <
> anissa.moussa...@dcbrain.com> wrote:
>
>> Hi,
>>
>> I used the combineGroup function to reduce the groups of a very large
>> dataset. With parallelism 1 I get different results than with
>> parallelism 8, and the correct results are the ones obtained with
>> parallelism 1.
>>
>> I also used the Table API to group the dataset and select the sum of a
>> column, and I got the same results as with parallelism 8 and combineGroup.
>>
>> My combineGroup function:
>>
>> public class SumGroup extends Resampling
>>         implements GroupReduceFunction<Row, Row> {
>>
>>     @Override
>>     public void reduce(Iterable<Row> values, Collector<Row> out)
>>             throws Exception {
>>         Iterator<Row> itr = values.iterator();
>>         double sum = 0;
>>         Row row = null;
>>         while (itr.hasNext()) {
>>             row = itr.next();
>>             sum = sum + (double) row.getField(indexField);
>>         }
>>         row.setField(indexField, sum);
>>         out.collect(row);
>>     }
>> }
>>
>> I call this function like this:
>>
>> DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
>>         .groupBy(groupFields.stream().toArray(String[]::new))
>>         .sortGroup(sortField, Order.ASCENDING)
>>         .reduceGroup(new SumGroup())
>>         .returns(getOutputType());
>>
>> Results with parallelism 1:
>> 24000 cluster 1 28/02/2017 06:00:00
>> 21000 cluster 1 31/03/2017 06:00:00
>> 15000 cluster 1 30/04/2017 06:00:00
>> 10000 cluster 1 31/05/2017 06:00:00
>>
>> Results with parallelism 8:
>> 22000 cluster 1 28/02/2017 06:00:00
>> 4350 cluster 1 31/03/2017 06:00:00
>> 14000 cluster 1 30/04/2017 06:00:00
>> 2256 cluster 1 31/05/2017 06:00:00
>>
>> I do not know how to make the results independent of the parallelism,
>> so that I always get the same results as with parallelism 1.
>>
>> Does anyone have an idea that could help me?
>>
>> Thank you in advance!
>>
>> Anissa
>>
>>
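
To illustrate the point about partial results made in the thread, here is a
minimal plain-Java sketch (no Flink involved) that sums values per key once in
a single pass and once as per-chunk partial sums that are merged afterwards.
The class name, the helper methods, and the sample records are all
hypothetical and are not taken from the job discussed above. The sketch only
shows that a sum can be split into partials and merged without changing the
result, provided every partial for a key ends up in the final merge; it does
not diagnose why the job above behaves differently.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartialSumSketch {

    // Sums the values of all records that share a key, in one full pass.
    static Map<String, Double> sumByKey(List<Map.Entry<String, Double>> records) {
        Map<String, Double> sums = new TreeMap<>();
        for (Map.Entry<String, Double> r : records) {
            sums.merge(r.getKey(), r.getValue(), Double::sum);
        }
        return sums;
    }

    // Splits the records round-robin into n chunks, imitating n partitions.
    static List<List<Map.Entry<String, Double>>> split(
            List<Map.Entry<String, Double>> records, int n) {
        List<List<Map.Entry<String, Double>>> chunks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            chunks.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            chunks.get(i % n).add(records.get(i));
        }
        return chunks;
    }

    // Computes a partial sum per chunk, then merges the partials per key.
    static Map<String, Double> sumInChunks(
            List<Map.Entry<String, Double>> records, int n) {
        Map<String, Double> total = new TreeMap<>();
        for (List<Map.Entry<String, Double>> chunk : split(records, n)) {
            Map<String, Double> partial = sumByKey(chunk);
            partial.forEach((k, v) -> total.merge(k, v, Double::sum));
        }
        return total;
    }

    // Hypothetical sample data: two groups with integer-valued amounts.
    static List<Map.Entry<String, Double>> sampleRecords() {
        List<Map.Entry<String, Double>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("g1", 1000.0));
        records.add(new SimpleEntry<>("g2", 500.0));
        records.add(new SimpleEntry<>("g1", 2000.0));
        records.add(new SimpleEntry<>("g2", 1500.0));
        records.add(new SimpleEntry<>("g1", 3000.0));
        return records;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> records = sampleRecords();
        System.out.println("single pass: " + sumByKey(records));
        System.out.println("8 chunks:    " + sumInChunks(records, 8));
    }
}
```

If the merge step were skipped, each chunk would emit its own partial sum for
a key, and the output would depend on how the records happened to be split.
Note also that the sketch builds new map entries for its output instead of
writing the sum back into one of the input records.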
