Hi,

Quick observations from what you have provided:
- The observed discrepancy between rdd.count() and
rdd.map(Item::getType).countByValue() in distributed mode suggests a
potential aggregation issue with countByValue(). The correct results in
local mode give credence to this theory.
- The workarounds using mapToPair() with countByKey() and with
reduceByKey() produce the same incorrect results, indicating a broader
pattern rather than method-specific behaviour.
- Dataset.groupBy().count() yields accurate results, but this method
incurs the overhead of an RDD-to-Dataset conversion (see the sketch at the
end of this message).

Your expected total count of 75187 is around 7 times larger than the
observed count of 10519, which matches your number of executors (7). This
suggests incorrect or partial aggregation across executors. Now, before
raising a red flag, these could be the culprits:

- Data skew: an uneven distribution of data across executors could cause
partial aggregation if a single executor processes most items of a
particular type.
- Partial aggregations: Spark might be combining partial counts from
executors incorrectly, leading to inaccuracies.
- Finally, a bug in 3.5 is possible.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Tue, 27 Feb 2024 at 19:02, Stuart Fehr <stuart.f...@gmail.com> wrote:

> Hello, I recently encountered a bug with the results from
> JavaRDD#countByValue that does not reproduce when running locally. For
> background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0.
>
> The code in question is something like this:
>
> JavaRDD<Item> rdd = // ...
> rdd.count(); // 75187
>
> // Get the count broken down by type
> rdd.map(Item::getType).countByValue();
>
> Which gives these results from the resulting Map:
>
> TypeA: 556
> TypeB: 9168
> TypeC: 590
> TypeD: 205
> (total: 10519)
>
> These values are incorrect, since every item has a type defined, so the
> total of all the types should be 75187. When I inspected this stage in the
> Spark UI, I found that it was using 7 executors. Since the value here is
> about 1/7th of the actual expected value, I suspect that there is some
> issue with the way that the executors report their results back to the
> driver. These results for the same code are correct when I run the job in
> local mode ("local[4]"), so it may also have something to do with how data
> is shared across processes.
>
> For workarounds, I have also tried:
>
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey();
> rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1L)).reduceByKey(Long::sum).collectAsMap();
>
> These yielded the same (incorrect) result.
>
> I did find that using Dataset.groupBy().count() did yield the correct
> results:
>
> TypeA: 3996
> TypeB: 65490
> TypeC: 4224
> TypeD: 1477
>
> So, I have an immediate workaround, but it is somewhat awkward since I
> have to create a Dataframe from a JavaRDD each time.
>
> Am I doing something wrong? Do these methods not work the way that I
> expected them to from reading the documentation? Is this a legitimate bug?
>
> I would be happy to provide more details if that would help in debugging
> this scenario.
>
> Thank you for your time,
> ~Stuart Fehr
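P.S. On the Dataset workaround and the skew theory above, here is a minimal
sketch that may help. It is untested and only illustrative: it assumes you
have a SparkSession to hand, that Item is a plain JavaBean (so
createDataFrame() can derive a schema from it and getType() maps to a
column called "type"), and the class and method names below are made up for
illustration.

import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public final class CountDiagnostics {

  // Workaround: build the DataFrame once from the bean RDD and reuse it,
  // so the RDD-to-Dataset conversion cost is paid a single time.
  static Dataset<Row> typeCounts(SparkSession spark, JavaRDD<Item> rdd) {
    Dataset<Row> df = spark.createDataFrame(rdd, Item.class);
    return df.groupBy("type").count();
  }

  // Diagnostic for the skew / partial-aggregation theories: record count
  // per partition, to see whether some partitions dominate or appear to
  // be missing from the countByValue() result.
  static List<Tuple2<Integer, Long>> perPartitionCounts(JavaRDD<Item> rdd) {
    return rdd.<Tuple2<Integer, Long>>mapPartitionsWithIndex((idx, it) -> {
      long n = 0L;
      while (it.hasNext()) { it.next(); n++; }
      return Collections.singletonList(new Tuple2<>(idx, n)).iterator();
    }, false).collect();
  }
}

If perPartitionCounts() sums to 75187 while countByValue() still returns
roughly 1/7th of that, the problem is more likely in how the per-partition
results are combined on the driver than in the data distribution itself.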