I don't immediately see what the issue could be - try .count()-ing the
individual RDDs to narrow it down?
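For example, using the variable names from your snippet:

machineRecords.count();
nutsByMachine.count();   // does the Nut combineByKey fail on its own?
boltsByMachine.count();  // or the Bolt one?

Whichever of those fails first (if any) should narrow down where the two
types are getting mixed up.
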
What code change made it work?

Also, I think this could probably be a few lines of SQL with a
collect_list() aggregate and a couple of joins.
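
Roughly along these lines - an untested sketch using the DataFrame API
rather than literal SQL, reusing your sparkSession, the column names from
the CSV headers, and a static import of org.apache.spark.sql.functions.*:

Dataset<Row> machines = sparkSession.read().format("csv")
  .option("header", "true").load("src/main/data/machines.csv");
Dataset<Row> nuts = sparkSession.read().format("csv")
  .option("header", "true").load("src/main/data/nuts.csv");
Dataset<Row> bolts = sparkSession.read().format("csv")
  .option("header", "true").load("src/main/data/bolts.csv");

// collect_list() gathers every Description for a MachineType into one array
Dataset<Row> nutsByType = nuts.groupBy("MachineType")
  .agg(collect_list("Description").as("Nuts"));
Dataset<Row> boltsByType = bolts.groupBy("MachineType")
  .agg(collect_list("Description").as("Bolts"));

// one row per machine with its bolts and nuts as array columns, plus the
// first five digits of the id for the per-file grouping
Dataset<Row> result = machines
  .join(boltsByType, "MachineType")
  .join(nutsByType, "MachineType")
  .withColumn("ExportFile", substring(col("Id").cast("string"), 1, 5));

You could then write the result out partitioned by ExportFile instead of
doing the foreachPartition by hand.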

On Thu, May 21, 2020 at 11:27 PM Stephen Coy
<s...@infomedia.com.au.invalid> wrote:
>
> Hi there,
>
> This will be a little long so please bear with me. There is a buildable 
> example available at https://github.com/sfcoy/sfcoy-spark-cce-test.
>
> Say I have the following three tables:
>
> Machines
>
> Id,MachineType
> 100001,A
> 100002,B
> 200003,B
> 200004,A
> 200005,B
>
> Bolts
>
> MachineType,Description
> A,20 x M5
> A,30 x M5
> B,"2"" x 1/4"""
> B,"2"" x 1/2"""
> B,"2"" x 3/8"""
> A,40 x M6
> A,50 x M10
> B,"1"" x 1/8"""
>
> Nuts
>
> MachineType,Description
> A,M5
> A,M6
> B,"1/4"""
> B,"1/8"""
> B,"3/8"""
>
>
> The objective is to create lists of Machines by Id, with all of their bolts 
> and nuts listed on the same line:
>
> 100001, 20 x M5, 30 x M5, 40 x M6,50 x M10,M5,M6
>
> The output is further categorised by the first 5 digits of the machine id, 
> although that seems immaterial to this problem.
> In practice I’m dealing with ~70 million machines with a couple of hundred 
> thousand types - therefore Spark!
>
> The code to do this looks like:
>
> final Dataset<Machine> machineRecords = sparkSession
>   .read()
>   .format("csv")
>   .option("header", "true")
>   .option("inferSchema", "true")
>   .load("src/main/data/machines.csv")
>   .as(Encoders.bean(Machine.class))
>   .persist();
>
> final int workerCount = sparkContext.defaultParallelism();
>
> final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
>   .read()
>   .format("csv")
>   .option("header", "true")
>   .option("inferSchema", "true")
>   .load("src/main/data/nuts.csv")
>   .as(Encoders.bean(Nut.class))
>   .toJavaRDD()
>   .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
>   .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>   .combineByKey(SparkCCETest::createListAndCombine,
>     SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
>   .persist(StorageLevel.MEMORY_AND_DISK());
>
> final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
>   .read()
>   .format("csv")
>   .option("header", "true")
>   .option("inferSchema", "true")
>   .load("src/main/data/bolts.csv")
>   .as(Encoders.bean(Bolt.class))
>   .toJavaRDD()
>   .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
>   .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>   .combineByKey(SparkCCETest::createListAndCombine,
>     SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
>   .persist(StorageLevel.MEMORY_AND_DISK());
>
> machineRecords
>   .toJavaRDD()
>   .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
>   .join(nutsByMachine)
>   .join(boltsByMachine)
>   .map(Tuple2::_2)
>   .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
>   .mapToPair(machineWithNutsBolts ->
>     new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
>   .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
>   .foreachPartition(machineIterator -> { // <- line 77
>       ///...
>   });
>
>
> static String exportFileFor(Machine machine) {
>     return machine.getId().substring(0, 5);
> }
>
> static <T> List<T> createListAndCombine(T v) {
>     List<T> c = new ArrayList<>();
>     c.add(v);
>     return c;
> }
>
> static <T> List<T> mergeValues(List<T> c, T v) {
>     c.add(v);
>     return c;
> }
>
> static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
>     c1.addAll(c2);
>     return c1;
> }
>
> Running this yields a ClassCastException:
>
> 20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to 
> exception java.lang.ClassCastException: org.example.Bolt cannot be cast to 
> org.example.Nut.
> 20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to 
> exception java.lang.ClassCastException: org.example.Bolt cannot be cast to 
> org.example.Nut.
> 20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as 
> it was not found on disk or in memory
> 20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as 
> it was not found on disk or in memory
> 20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
> java.lang.ClassCastException: org.example.Bolt cannot be cast to 
> org.example.Nut
> at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
> ...
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
> java.lang.ClassCastException: org.example.Bolt cannot be cast to 
> org.example.Nut
> at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
> ...
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> …
>
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946)
> at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946)
> at scala.Option.foreach(Option.scala:407)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160)
> at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
> at 
> org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219)
> at 
> org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218)
> at 
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
> at org.example.SparkCCETest.main(SparkCCETest.java:77)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to 
> org.example.Nut
> at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151)
> at 
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
> at 
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
> at 
> org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
> at 
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306)
> at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
> at 
> org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140)
> at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Anyway, to cut a long story short, while creating this reproducer it
> occurred to me to replace those generic methods at the bottom of the code
> with explicitly typed versions.
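>
> The explicitly typed replacements are just the same three helpers written
> out per element type - something like this (names here are illustrative
> rather than the exact code):
>
> static List<Nut> createNutListAndCombine(Nut v) {
>     List<Nut> c = new ArrayList<>();
>     c.add(v);
>     return c;
> }
>
> static List<Nut> mergeNutValues(List<Nut> c, Nut v) {
>     c.add(v);
>     return c;
> }
>
> static List<Nut> mergeNutCombiners(List<Nut> c1, List<Nut> c2) {
>     c1.addAll(c2);
>     return c1;
> }
>
> ...plus the corresponding Bolt versions.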
>
> This made the problem go away.
>
> This seems like a workaround, but does anyone think this could be a bug?
>
> Thanks,
>
> Steve C
>
> P.S. I’m relatively new to Apache Spark, so if anyone thinks I’m going
> about this the wrong way I would be pleased to hear any better ideas.
>
