Hi,

I was using primitive types, and EnableObjectReuse was turned on. I then turned it off, and that did solve the problem. It also increased execution time by 10%, but it’s hard to say whether this overhead comes from the copying or from the changed behavior of the reduceGroup algorithm once it gets the right data.

Since I never modify my objects, why isn’t object reuse working?

Best regards,
Arnaud

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, October 22, 2015 12:36
To: user@flink.apache.org
Subject: Re: Multiple keys in reduceGroup ?

If not, could you provide us with the program and test data to reproduce the error?

Cheers,
Till

On Thu, Oct 22, 2015 at 12:34 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
but he’s comparing it to a primitive long, so shouldn’t the Long key be unboxed and the comparison still be valid?

My question is whether you enabled object-reuse-mode on the ExecutionEnvironment?

Cheers,
Aljoscha

> On 22 Oct 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> You are checking for equality / inequality with "!=" - can you check with "equals()" ?
>
> The key objects will most certainly be different in each record (as they are deserialized individually), but they should be equal.
>
> Stephan
>
>
> On Thu, Oct 22, 2015 at 12:20 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
> Hello,
>
> Trying to understand why my code was giving strange results, I ended up adding “useless” checks to my code and came across what seems to me a bug. I group my dataset according to a key, but in the reduceGroup function I am passed values with different keys.
>
> My code has the following pattern (mix of Java & pseudo-code in []):
>
> inputDataSet [of InputRecord]
>     .joinWithTiny(referencesDataSet [of Reference])
>     .where([InputRecord SecondaryKeySelector]).equalTo([Reference KeySelector])
>     .groupBy([PrimaryKeySelector : Tuple2<InputRecord, Reference> -> value.f0.getPrimaryKey()])
>     .sortGroup([DateKeySelector], Order.ASCENDING)
>     .reduceGroup(new GroupReduceFunction<Tuple2<InputRecord, Reference>, OutputRecord>() {
>         @Override
>         public void reduce(Iterable<Tuple2<InputRecord, Reference>> values, Collector<OutputRecord> out) throws Exception {
>             // Issue: all values do not share the same key
>             final List<Tuple2<InputRecord, Reference>> listValues = new ArrayList<Tuple2<InputRecord, Reference>>();
>             for (final Tuple2<InputRecord, Reference> value : values) {
>                 listValues.add(value);
>             }
>
>             final long primkey = listValues.get(0).f0.getPrimaryKey();
>             for (int i = 1; i < listValues.size(); i++) {
>                 if (listValues.get(i).f0.getPrimaryKey() != primkey) {
>                     // => This exception is fired!
>                     throw new IllegalStateException(primkey + " != " + listValues.get(i).f0.getPrimaryKey());
>                 }
>             }
>         }
>     });
>
> I use the current 0.10 snapshot. The issue appears in local cluster mode unit tests as well as in YARN mode (however it’s ok when I test it with very few elements).
>
> The sortGroup is not the cause of the problem, as I do get the same error without it.
>
> Have I misunderstood the grouping concept or is it really an awful bug?
>
> Best regards,
> Arnaud
>
> The integrity of this message cannot be guaranteed on the Internet.
> The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
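
A possible reading of the symptom discussed in this thread, offered as a sketch and not as a confirmed diagnosis: when object reuse is on, buffering the values of a group in a list stores references to instances that may be overwritten later, even if the user code never modifies them. The plain-Java example below imitates that effect without any Flink dependency. Record, bufferFirstGroup and the two-instance pool are hypothetical names and mechanics chosen for illustration; they are not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    /** Hypothetical stand-in for InputRecord: a mutable record with a primitive key. */
    static final class Record {
        long primaryKey;
    }

    /**
     * Buffers the records of the first key group, as the reduce function in the
     * thread does, and returns the keys seen in the buffer afterwards.
     *
     * Assumption: with reuse on, records are "deserialized" into two instances
     * recycled in turn, and the end of a group is detected by reading one
     * record ahead. With reuse off, every record gets a fresh instance.
     */
    public static List<Long> bufferFirstGroup(long[] sortedKeys, boolean reuse) {
        Record[] pool = { new Record(), new Record() };
        List<Record> buffered = new ArrayList<>();
        long groupKey = sortedKeys[0];

        for (int i = 0; i < sortedKeys.length; i++) {
            // Fill either a recycled instance or a newly allocated one.
            Record r = reuse ? pool[i % 2] : new Record();
            r.primaryKey = sortedKeys[i];
            if (r.primaryKey != groupKey) {
                break; // look-ahead hit the next group; r is not part of this group
            }
            buffered.add(r); // stores a reference, not a copy
        }

        List<Long> keys = new ArrayList<>();
        for (Record r : buffered) {
            keys.add(r.primaryKey);
        }
        return keys;
    }

    public static void main(String[] args) {
        long[] sortedKeys = { 7L, 7L, 8L };
        System.out.println("reuse on : " + bufferFirstGroup(sortedKeys, true));
        System.out.println("reuse off: " + bufferFirstGroup(sortedKeys, false));
    }
}
```

In this sketch the buffered group contains a foreign key only when reuse is on, because the look-ahead overwrites an instance that is already in the list. Copying each value before adding it to the list would avoid the aliasing at the cost of extra allocations, which may be related to the overhead mentioned at the top of the thread.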