Hi Anissa,

Are you using combineGroup or reduceGroup?
Your question refers to combineGroup, but the code only shows reduceGroup.

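combineGroup is non-deterministic by design, to enable efficient partial
results without network and disk I/O: each parallel instance combines only
the records it holds, so its output depends on how the data is distributed.
reduceGroup is deterministic given a deterministic key extractor and a
deterministic GroupReduceFunction.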
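
To illustrate the difference, here is a small plain-Java simulation. It does
not use Flink at all; the class PartialVsFinal, the Rec record and the
round-robin partitioning are hypothetical stand-ins for how records might be
spread over parallel instances. combineOnly sums per key inside each partition
only, like a combine step whose output is taken as final, while
combineThenReduce merges those partial sums per key, like a final reduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (no Flink dependency) of combine-only vs. combine + reduce.
public class PartialVsFinal {

    // A (key, value) pair standing in for a Flink Row.
    record Rec(String key, double value) {}

    // Assumption: records are spread round-robin over `parallelism` partitions.
    static List<List<Rec>> partition(List<Rec> input, int parallelism) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) parts.add(new ArrayList<>());
        for (int i = 0; i < input.size(); i++) parts.get(i % parallelism).add(input.get(i));
        return parts;
    }

    // "Combine": sum per key within ONE partition only, i.e. a partial result.
    static Map<String, Double> combine(List<Rec> partition) {
        Map<String, Double> partial = new TreeMap<>();
        for (Rec r : partition) partial.merge(r.key(), r.value(), Double::sum);
        return partial;
    }

    // Combine-only output: one partial row per key per partition.
    static List<Rec> combineOnly(List<Rec> input, int parallelism) {
        List<Rec> out = new ArrayList<>();
        for (List<Rec> part : partition(input, parallelism)) {
            combine(part).forEach((k, v) -> out.add(new Rec(k, v)));
        }
        return out;
    }

    // Combine followed by a final reduce that merges all partial sums per key.
    static Map<String, Double> combineThenReduce(List<Rec> input, int parallelism) {
        Map<String, Double> total = new TreeMap<>();
        for (Rec r : combineOnly(input, parallelism)) total.merge(r.key(), r.value(), Double::sum);
        return total;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("a", 1), new Rec("a", 2), new Rec("b", 3),
                new Rec("a", 4), new Rec("b", 5));
        System.out.println(combineOnly(input, 1));
        System.out.println(combineOnly(input, 2));
        System.out.println(combineThenReduce(input, 1));
        System.out.println(combineThenReduce(input, 2));
    }
}
```

With the sample input in main, combineOnly yields one row per key at
parallelism 1 but several partial rows per key at parallelism 2, while
combineThenReduce gives a=7.0 and b=8.0 in both cases. In Flink terms,
combineGroup output has to be followed by a groupBy(...).reduceGroup(...) on
the same keys (or reduceGroup used on its own) to get final results.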

Hope this helps,
Fabian

On Tue, 20 Aug 2019 at 14:21, anissa moussaoui <
anissa.moussa...@dcbrain.com> wrote:

> Hi,
>
> I used the combineGroup function to reduce groups of a very large dataset.
> When I change the parallelism from 1 to 8, I get different results, and the
> correct results are the ones obtained with parallelism 1.
>
> I also used the Table API to group the dataset and select the sum of a
> column, and I got the same results as with combineGroup at parallelism 8.
>
> My combineGroup function:
>
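> import java.util.Iterator;
>
> import org.apache.flink.api.common.functions.GroupReduceFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
>
> public class SumGroup extends Resampling
>         implements GroupReduceFunction<Row, Row> {
>
>     @Override
>     public void reduce(Iterable<Row> values, Collector<Row> out) throws Exception {
>         // Sum the field at indexField (presumably defined in Resampling) over the group.
>         Iterator<Row> itr = values.iterator();
>         double sum = 0;
>         Row row = null;
>         while (itr.hasNext()) {
>             row = itr.next();
>             sum = sum + (double) row.getField(indexField);
>         }
>         // Reuse the last row of the group as the output row, holding the sum.
>         row.setField(indexField, sum);
>         out.collect(row);
>     }
> }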
>
> I call this function like this:
> DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
>         .groupBy(groupFields.stream().toArray(String[]::new))
>         .sortGroup(sortField, Order.ASCENDING)
>         .reduceGroup(new SumGroup())
>         .returns(getOutputType());
>
> Results with parallelism 1:
> 24000 cluster 1 28/02/2017 06:00:00
> 21000 cluster 1 31/03/2017 06:00:00
> 15000 cluster 1 30/04/2017 06:00:00
> 10000 cluster 1 31/05/2017 06:00:00
>
> Results with parallelism 8:
> 22000 cluster 1 28/02/2017 06:00:00
> 4350 cluster 1 31/03/2017 06:00:00
> 14000 cluster 1 30/04/2017 06:00:00
> 2256 cluster 1 31/05/2017 06:00:00
>
> I do not know how to make the results independent of the parallelism, so
> that I get the same results at parallelism 8 as at parallelism 1.
>
> Does anyone have an idea how to fix this?
>
> Thank you in advance!
>
> Anissa
>
>
