Hi all,
I'm trying to work out a problem when using Spark Streaming, currently I
have the following piece of code inside a foreachRDD call:
Dataframe results = ... //some dataframe created from the incoming rdd -
moderately big, I don't want this to be shuffled
DataFrame t = sqlContext.table("a_temp_cached_table"); //a very small table
- that might mutate over time
DataFrame x = results.join(*broadcast(t)*, JOIN_COLUMNS_SEQ)
.groupBy("column1").count()
.write()
.mode(SaveMode.Append)
.save("/some-path/");
The intention of the code above is to distribute the "t" dataframe if
required, but avoid shuffling the "results".
This works fine when ran on scala-shell / spark-submit, but when ran from
within my executors I get the exception below...
Any thoughts? If I remove the *broadcast(t)* then it works fine but where my
big table is shuffled around.
2016-01-21 14:47:00 ERROR JobScheduler:95 - Error running job streaming job
1453387560000 ms.0
java.lang.ArrayIndexOutOfBoundsException: 8388607
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:345)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:804)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:570)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1326)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:91)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:79)
at
org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-01-21 14:47:00 ERROR RslApp:60 - ERROR executing the application
java.lang.ArrayIndexOutOfBoundsException: 8388607
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:345)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:804)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:570)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1326)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:91)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:79)
at
org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[Stage 10:> (0 + 24) /
24]2016-01-21 14:47:02 ERROR InsertIntoHadoopFsRelation:95 - Aborting job.
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:125)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:125)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:242)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at
com.hsbc.rsl.spark.streaming.receiver.functions.PersistLevel3WithDataframes.call(PersistLevel3WithDataframes.java:84)
at
com.hsbc.rsl.spark.streaming.receiver.functions.PersistLevel3WithDataframes.call(PersistLevel3WithDataframes.java:27)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:656)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:656)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-01-21 14:47:02 ERROR DefaultWriterContainer:74 - Job
job_201601211447_0000 aborted.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-when-attempting-broadcastjoin-tp26034.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]