Hi,
I'm trying to read data from a DynamoDB table with Spark, but when I run the
code below I get the error message shown after it.
I am using Spark 1.1.1 with emr-core-1.1.jar, emr-ddb-hive-1.0.jar, and
emr-ddb-hadoop-1.0.jar.
// Reads a DynamoDB table into a Spark pair RDD through the EMR DynamoDB Hadoop
// connector (DynamoDBInputFormat) and prints every record on the driver.
// The declarations below restore the whitespace after `val` that was lost in the
// original paste (`valsparkConf = ...` is not valid Kotlin).
val sparkConf = SparkConf().setAppName("DynamoRdeader").setMaster("local[4]")
val ctx = JavaSparkContext(sparkConf)

// Start from the context's Hadoop configuration and layer the connector settings on top.
// Values shown as <...> were redacted in the original message.
val jobConf = JobConf(ctx.hadoopConfiguration())
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "<...>")
jobConf.set("dynamodb.endpoint", "<...>")
jobConf.set("dynamodb.regionid", "<...>")
jobConf.set("dynamodb.throughput.read", "1")
// NOTE(review): a ratio of 1.5 appears to request 150% of the table's provisioned
// read capacity — confirm this is intended.
jobConf.set("dynamodb.throughput.read.percent", "1.5")
jobConf.set("dynamodb.max.map.tasks", "2")
// NOTE(review): avoid hard-coding AWS credentials in source; prefer loading them
// from the environment or an external configuration file.
jobConf.set("dynamodb.awsAccessKeyId", "<...>")
jobConf.set("dynamodb.awsSecretAccessKey", "<...>")
// Presumably redundant: hadoopRDD below is also given the input-format class directly.
jobConf.set("mapred.input.format.class", javaClass<DynamoDBInputFormat>().getName())

// Pair RDD of (Text, DynamoDBItemWritable) records produced by DynamoDBInputFormat.
// Declared `val` because the reference is never reassigned.
val users = ctx.hadoopRDD(
    jobConf,
    javaClass<DynamoDBInputFormat>(),
    javaClass<Text>(),
    javaClass<DynamoDBItemWritable>()
)

// collect() brings every record back to the driver, so this is only suitable for small tables.
// NOTE(review): Hadoop Writables such as Text are not java.io.Serializable, and Spark's
// hadoopRDD documentation warns that the RecordReader reuses Writable instances —
// consider mapping each record to a plain value (e.g. a String) before collecting.
users.collect().forEach {
    println(it)
}
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: java.lang.NullPointerException
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:930)
org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:34)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:867)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
akka.dispatch.Mailbox.run(Mailbox.scala:219)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Could you give me any suggestions on how I can fix it?
Thank you,
Istvan