Thanks for your feedback! Sorry, I am in fact using reduceGroup, but it gives different results when I change the parallelism to 8 (or anything greater than 1). The correct results are the ones obtained with parallelism 1, and I want to run with parallelism 8.
I do not know how to get the same results with reduceGroup when I change the parallelism.

Thank you in advance!

Anissa

On Thu, 22 Aug 2019 at 10:29, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Anissa,
>
> Are you using combineGroup or reduceGroup?
> Your question refers to combineGroup, but the code only shows reduceGroup.
>
> combineGroup is non-deterministic by design to enable efficient partial
> results without network and disk IO.
> reduceGroup is deterministic given a deterministic key extractor and a
> deterministic GroupReduceFunction.
>
> Hope this helps,
> Fabian
>
> On Tue, 20 Aug 2019 at 14:21, anissa moussaoui
> <anissa.moussa...@dcbrain.com> wrote:
>
>> Hi,
>>
>> I used the combineGroup function to reduce the groups of a very large
>> dataset. With parallelism 1 I get different results than with
>> parallelism 8, and the correct results are the ones obtained with
>> parallelism 1.
>>
>> I also used the Table API to group the dataset and select the sum of a
>> column, and I got the same results as with parallelism 8 and
>> combineGroup.
>>
>> My combineGroup function:
>>
>> public class SumGroup extends Resampling
>>         implements GroupReduceFunction<Row, Row> {
>>
>>     @Override
>>     public void reduce(Iterable<Row> values, Collector<Row> out)
>>             throws Exception {
>>         Iterator<Row> itr = values.iterator();
>>         double sum = 0;
>>         Row row = null;
>>         // Sum the indexed field over every row of the group.
>>         while (itr.hasNext()) {
>>             row = itr.next();
>>             sum = sum + (double) row.getField(indexField);
>>         }
>>         // Emit the last row of the group, carrying the group sum.
>>         row.setField(indexField, sum);
>>         out.collect(row);
>>     }
>> }
>>
>> I call this function like this:
>>
>> DataSet<Row> ds = env.toDataSet(inputTable(env), Row.class)
>>         .groupBy(groupFields.stream().toArray(String[]::new))
>>         .sortGroup(sortField, Order.ASCENDING)
>>         .reduceGroup(new SumGroup())
>>         .returns(getOutputType());
>>
>> Results with parallelism 1:
>>
>> 24000  cluster 1  28/02/2017 06:00:00
>> 21000  cluster 1  31/03/2017 06:00:00
>> 15000  cluster 1  30/04/2017 06:00:00
>> 10000  cluster 1  31/05/2017 06:00:00
>>
>> Results with parallelism 8:
>>
>> 22000  cluster 1  28/02/2017 06:00:00
>> 4350   cluster 1  31/03/2017 06:00:00
>> 14000  cluster 1  30/04/2017 06:00:00
>> 2256   cluster 1  31/05/2017 06:00:00
>>
>> I do not know how to make the results independent of the parallelism,
>> so that I get the same results as with parallelism 1.
>>
>> Does anyone have an idea that could help me, please?
>>
>> Thank you in advance!
>>
>> Anissa
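
Fabian's remark about combineGroup producing partial results can be illustrated without Flink. The following is only a plain-Java sketch, not Flink code and not a diagnosis of the job above: the class PartialSumSketch, the Rec type, the round-robin partitioning and the sample values are all hypothetical. It shows that a combine-style step run on several partitions may emit more than one partial sum per key, and that a final reduce over those partials recovers the same totals as a single pass.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from Flink.
class PartialSumSketch {

    /** A made-up record: a grouping key and a value to sum. */
    static final class Rec {
        final String key;
        final double value;
        Rec(String key, double value) { this.key = key; this.value = value; }
    }

    /** Sums the values per key over exactly the records it is given. */
    static Map<String, Double> sumByKey(List<Rec> records) {
        Map<String, Double> sums = new LinkedHashMap<>();
        for (Rec r : records) {
            sums.merge(r.key, r.value, Double::sum);
        }
        return sums;
    }

    /** Splits records round-robin into n partitions (stand-in for parallel subtasks). */
    static List<List<Rec>> partition(List<Rec> records, int n) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            parts.get(i % n).add(records.get(i));
        }
        return parts;
    }

    /** Combine-style step: each partition sums locally and emits one partial per key it saw. */
    static List<Rec> combinePartials(List<Rec> records, int parallelism) {
        List<Rec> partials = new ArrayList<>();
        for (List<Rec> part : partition(records, parallelism)) {
            for (Map.Entry<String, Double> e : sumByKey(part).entrySet()) {
                partials.add(new Rec(e.getKey(), e.getValue()));
            }
        }
        return partials;
    }

    /** Made-up sample data with two keys. */
    static List<Rec> sample() {
        List<Rec> data = new ArrayList<>();
        data.add(new Rec("A", 1));
        data.add(new Rec("B", 10));
        data.add(new Rec("A", 2));
        data.add(new Rec("A", 3));
        data.add(new Rec("B", 20));
        data.add(new Rec("A", 4));
        return data;
    }

    public static void main(String[] args) {
        List<Rec> data = sample();
        // With 2 partitions, each key can appear once per partition in the partial output.
        for (Rec p : combinePartials(data, 2)) {
            System.out.println("partial: " + p.key + " -> " + p.value);
        }
        // A final reduce over the partials yields one total per key.
        System.out.println("merged:  " + sumByKey(combinePartials(data, 2)));
        System.out.println("single:  " + sumByKey(data));
    }
}
```

In this sketch the partial sums only become final totals after the last sumByKey pass, which is why a combine step on its own is not expected to give parallelism-independent output.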