Hi Anissa,

Are you using combineGroup or reduceGroup? Your question refers to combineGroup, but the code only shows reduceGroup.
combineGroup is non-deterministic by design, so that it can compute partial results efficiently without network or disk I/O. reduceGroup is deterministic, provided that both the key extractor and the GroupReduceFunction are deterministic.

Hope this helps,
Fabian

On Tue, 20 Aug 2019 at 14:21, anissa moussaoui <anissa.moussa...@dcbrain.com> wrote:

> Hi,
>
> I used the combineGroup function to reduce the groups of a very large
> dataset. With a parallelism of 1 I get different results than with a
> parallelism of 8; the correct results are the ones obtained with a
> parallelism of 1.
>
> I also used the Table API to group the dataset and select the sum of a
> column, and I got the same results as with a parallelism of 8 and
> combineGroup.
>
> My combineGroup function:
>
> public class SumGroup extends Resampling implements
>         GroupReduceFunction<Row, Row> {
>
>     @Override
>     public void reduce(Iterable<Row> values, Collector<Row> out) throws Exception {
>         Iterator<Row> itr = values.iterator();
>         double sum = 0;
>         Row row = null;
>         while (itr.hasNext()) {
>             row = itr.next();
>             sum = sum + (double) row.getField(indexField);
>         }
>         row.setField(indexField, sum);
>         out.collect(row);
>     }
> }
>
> I call this function like this:
>
> DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
>         .groupBy(groupFields.stream().toArray(String[]::new))
>         .sortGroup(sortField, Order.ASCENDING)
>         .reduceGroup(new SumGroup()).returns(getOutputType());
>
> Results with a parallelism of 1:
>
> 24000 cluster 1 28/02/2017 06:00:00
> 21000 cluster 1 31/03/2017 06:00:00
> 15000 cluster 1 30/04/2017 06:00:00
> 10000 cluster 1 31/05/2017 06:00:00
>
> Results with a parallelism of 8:
>
> 22000 cluster 1 28/02/2017 06:00:00
> 4350 cluster 1 31/03/2017 06:00:00
> 14000 cluster 1 30/04/2017 06:00:00
> 2256 cluster 1 31/05/2017 06:00:00
>
> I do not know how to get the same results with a parallelism of 8 as
> with a parallelism of 1.
>
> Does anyone have an idea that could help me, please?
>
> Thank you in advance!
>
> Anissa
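
To illustrate the distinction made in the reply between partial results and a full group reduce, here is a minimal plain-Java sketch. It is only a simulation and uses no Flink classes: the class PartialSumSketch, its methods combine and reduce, and the input values are all hypothetical and chosen for illustration. It assumes that a combine step sums each partition locally, so one key can produce several partial sums that still need a final reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation only; names and values are hypothetical, not Flink API.
class PartialSumSketch {

    // Stand-in for a combine step: one partial sum per partition, no data exchange.
    static List<Double> combine(List<List<Double>> partitions) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> partition : partitions) {
            double sum = 0;
            for (double v : partition) {
                sum += v;
            }
            partials.add(sum);
        }
        return partials;
    }

    // Stand-in for the final reduce: sees all values of the key at once.
    static double reduce(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // The same three values for one key, held in one partition or split over two.
        List<List<Double>> onePartition =
                Arrays.asList(Arrays.asList(1.0, 2.0, 3.0));
        List<List<Double>> twoPartitions =
                Arrays.asList(Arrays.asList(1.0, 2.0), Arrays.asList(3.0));

        System.out.println(combine(onePartition));           // a single, complete sum
        System.out.println(combine(twoPartitions));          // two partial sums for the same key
        System.out.println(reduce(combine(twoPartitions)));  // complete again after the final reduce
    }
}
```

In this sketch the output of combine depends on how the values are split across partitions, while reduce applied to the combined partials gives the same total for both layouts. That is why output taken directly from a combine step can change with the parallelism.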