I don't immediately see what the issue could be. What code change made it work? One thing that would help narrow it down is calling .count() on each of the individual RDDs to see which one actually fails.
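For example (untested, and just reusing the variable names from your snippet):

    // Materialise each persisted RDD/Dataset on its own; whichever count blows up
    // with the ClassCastException identifies the combineByKey pipeline to look at.
    System.out.println("machines: " + machineRecords.count());
    System.out.println("nuts: " + nutsByMachine.count());
    System.out.println("bolts: " + boltsByMachine.count());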
Also, I think this could probably be a few lines of SQL with an aggregate, collect_list(), and joins - see the rough sketch after the quoted message.

On Thu, May 21, 2020 at 11:27 PM Stephen Coy <s...@infomedia.com.au.invalid> wrote:
>
> Hi there,
>
> This will be a little long so please bear with me. There is a buildable example available at https://github.com/sfcoy/sfcoy-spark-cce-test.
>
> Say I have the following three tables:
>
> Machines
>
> Id,MachineType
> 100001,A
> 100002,B
> 200003,B
> 200004,A
> 200005,B
>
> Bolts
>
> MachineType,Description
> A,20 x M5
> A,30 x M5
> B,"2"" x 1/4"""
> B,"2"" x 1/2"""
> B,"2"" x 3/8"""
> A,40 x M6
> A,50 x M10
> B,"1"" x 1/8"""
>
> Nuts
>
> MachineType,Description
> A,M5
> A,M6
> B,"1/4"""
> B,"1/8"""
> B,"3/8"""
>
> The objective is to create lists of Machines by Id, with all of their bolts and nuts listed on the same line:
>
> 100001, 20 x M5, 30 x M5, 40 x M6,50 x M10,M5,M6
>
> The output is further categorised by the first 5 digits of the machine id, although that seems immaterial to this problem.
> In practice I’m dealing with ~70 million machines with a couple of hundred thousand types - therefore Spark!
>
> The code to do this looks like:
>
> final Dataset<Machine> machineRecords = sparkSession
>     .read()
>     .format("csv")
>     .option("header", "true")
>     .option("inferSchema", "true")
>     .load("src/main/data/machines.csv")
>     .as(Encoders.bean(Machine.class))
>     .persist();
>
> final int workerCount = sparkContext.defaultParallelism();
>
> final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
>     .read()
>     .format("csv")
>     .option("header", "true")
>     .option("inferSchema", "true")
>     .load("src/main/data/nuts.csv")
>     .as(Encoders.bean(Nut.class))
>     .toJavaRDD()
>     .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
>     .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>     .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
>     .persist(StorageLevel.MEMORY_AND_DISK());
>
> final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
>     .read()
>     .format("csv")
>     .option("header", "true")
>     .option("inferSchema", "true")
>     .load("src/main/data/bolts.csv")
>     .as(Encoders.bean(Bolt.class))
>     .toJavaRDD()
>     .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
>     .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>     .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
>     .persist(StorageLevel.MEMORY_AND_DISK());
>
> machineRecords
>     .toJavaRDD()
>     .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
>     .join(nutsByMachine)
>     .join(boltsByMachine)
>     .map(Tuple2::_2)
>     .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
>     .mapToPair(machineWithNutsBolts -> new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
>     .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>     .foreachPartition(machineIterator -> { // <- line 77
>         ///...
> });
>
> static String exportFileFor(Machine machine) {
>     return machine.getId().substring(0, 5);
> }
>
> static <T> List<T> createListAndCombine(T v) {
>     List<T> c = new ArrayList<>();
>     c.add(v);
>     return c;
> }
>
> static <T> List<T> mergeValues(List<T> c, T v) {
>     c.add(v);
>     return c;
> }
>
> static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
>     c1.addAll(c2);
>     return c1;
> }
>
> Running this yields a ClassCastException:
>
> 20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
> 20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
> 20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
> 20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
> 20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
> java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
> at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
> ...
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
> java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
> at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
> ...
> at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > … > > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160) > at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:388) > at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992) > at > org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219) > at > org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218) > at > org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45) > at org.example.SparkCCETest.main(SparkCCETest.java:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > at > org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) > at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) > at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) > at > org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to > org.example.Nut > at > 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151) > at > org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144) > at > org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) > at > org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164) > at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41) > at > org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837) > at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362) > at > org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:311) > at > org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877) > at scala.collection.immutable.List.foreach(List.scala:392) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876) > at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) > at > org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) > at org.apache.spark.scheduler.Task.run(Task.scala:127) > at > 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748)
>
> Anyway, to cut a long story short, it occurred to me while creating this reproducer to replace those generic methods at the bottom of the code with explicitly typed versions.
>
> This made the problem go away.
>
> This seems like a workaround, but does anyone think this could be a bug?
>
> Thanks,
>
> Steve C
>
> P.S. I’m relatively new to Apache Spark so if anyone thinks I’m going about this the wrong way then I would be pleased to hear any better ideas.
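For what it's worth, here is the kind of thing I meant by the SQL suggestion at the top - a rough, untested sketch only. The column names come from your sample CSVs; the view names, the concat_ws() separator and the output path are my own, and it assumes the same sparkSession as your snippet (Dataset/Row are the org.apache.spark.sql types):

    // Read the three CSVs as plain string-typed DataFrames - no beans, no encoders.
    Dataset<Row> machines = sparkSession.read().option("header", "true").csv("src/main/data/machines.csv");
    Dataset<Row> bolts = sparkSession.read().option("header", "true").csv("src/main/data/bolts.csv");
    Dataset<Row> nuts = sparkSession.read().option("header", "true").csv("src/main/data/nuts.csv");

    machines.createOrReplaceTempView("machines");
    bolts.createOrReplaceTempView("bolts");
    nuts.createOrReplaceTempView("nuts");

    // Aggregate bolts and nuts per machine type first, then join onto the machines,
    // so the two lists don't cross-multiply.
    Dataset<Row> machineLines = sparkSession.sql(
        "WITH bolt_lists AS (SELECT MachineType, collect_list(Description) AS bolts FROM bolts GROUP BY MachineType), "
      + "     nut_lists AS (SELECT MachineType, collect_list(Description) AS nuts FROM nuts GROUP BY MachineType) "
      + "SELECT concat_ws(',', m.Id, bl.bolts, nl.nuts) AS line, "
      + "       substring(m.Id, 1, 5) AS exportFile "
      + "FROM machines m "
      + "JOIN bolt_lists bl ON bl.MachineType = m.MachineType "
      + "JOIN nut_lists nl ON nl.MachineType = m.MachineType");

    // One directory per 5-digit Id prefix, matching your exportFileFor() categorisation.
    // Adjust the output format/escaping to taste.
    machineLines.write().partitionBy("exportFile").csv("output/machines");

Spark handles the grouping and the shuffle itself here, and there is no combineByKey or bean encoding involved, so it should sidestep the generics problem entirely and still distribute fine at ~70 million machines.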