Hi, I am invoking the spark-shell (Spark 1.0.0) with:
spark-shell --jars \
libs/aws-java-sdk-1.3.26.jar,\
libs/httpclient-4.1.1.jar,\
libs/httpcore-nio-4.1.jar,\
libs/gson-2.1.jar,\
libs/httpclient-cache-4.1.1.jar,\
libs/httpmime-4.1.1.jar,\
libs/hive-dynamodb-handler-0.11.0.jar,\
libs/httpcore-4.1.jar,\
libs/joda-time-2.1.jar

and entering the following in the shell:

import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.mapred.JobConf

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "<...>")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.awsAccessKeyId", "<...>")
jobConf.set("dynamodb.awsSecretAccessKey", "<...>")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

var users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
users.count()

This is raising an NPE in FileSplit (as below). Any suggestions on what I might pursue to correct this would be very welcome. (I have also noted a couple of guesses I plan to try at the very end of this mail, below the quoted thread.)

ian

14/07/20 23:56:03 INFO deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/07/20 23:56:03 INFO JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/07/20 23:56:03 INFO AbstractDynamoDBInputFormat: Throughput percent: 1.0
14/07/20 23:56:03 INFO EndpointProvider: Using endpoint for DynamoDB: dynamodb.eu-west-1.amazonaws.com
14/07/20 23:56:03 INFO DynamoDBClient: Describe Table Output: {Table: {TableName: <...>, KeySchema: {HashKeyElement: {AttributeName: id, AttributeType: S, }, }, TableStatus: ACTIVE, CreationDateTime: Wed May 07 14:38:30 BST 2014, ProvisionedThroughput: {ReadCapacityUnits: 4, WriteCapacityUnits: 4, }, TableSizeBytes: 2473, ItemCount: 14, }, }
14/07/20 23:56:03 INFO SparkContext: Starting job: count at <console>:21
14/07/20 23:56:03 INFO DAGScheduler: Got job 0 (count at <console>:21) with 1 output partitions (allowLocal=false)
14/07/20 23:56:03 INFO DAGScheduler: Final stage: Stage 0(count at <console>:21)
14/07/20 23:56:03 INFO DAGScheduler: Parents of final stage: List()
14/07/20 23:56:03 INFO DAGScheduler: Missing parents: List()
14/07/20 23:56:03 INFO DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at <console>:18), which has no missing parents
14/07/20 23:56:03 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext
14/07/20 23:56:04 INFO SparkUI: Stopped Spark web UI at http://10.0.1.7:4040
14/07/20 23:56:04 INFO DAGScheduler: Stopping DAGScheduler
14/07/20 23:56:05 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/07/20 23:56:05 INFO ConnectionManager: Selector thread was interrupted!
14/07/20 23:56:05 INFO ConnectionManager: ConnectionManager stopped
14/07/20 23:56:05 INFO MemoryStore: MemoryStore cleared
14/07/20 23:56:05 INFO BlockManager: BlockManager stopped
14/07/20 23:56:05 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/07/20 23:56:05 INFO BlockManagerMaster: BlockManagerMaster stopped
14/07/20 23:56:05 INFO SparkContext: Successfully stopped SparkContext
14/07/20 23:56:05 ERROR OneForOneStrategy: java.lang.NullPointerException
        at org.apache.hadoop.mapreduce.lib.input.FileSplit.write(FileSplit.java:80)
        at org.apache.hadoop.mapred.FileSplit.write(FileSplit.java:85)
        at org.apache.hadoop.dynamodb.split.DynamoDBSplit.write(DynamoDBSplit.java:63)
        at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:202)
        at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:128)
        at org.apache.hadoop.io.ObjectWritable.write(ObjectWritable.java:82)
        at org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:132)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:767)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/20 23:56:05 INFO Remoting: Remoting shut down
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

On 4 Jul 2014, at 18:04, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> Interesting - I would have thought they would make that available publicly.
> Unfortunately, unless you can use Spark on EMR, I guess your options are to
> hack it by spinning up an EMR cluster and getting the JAR, or maybe fall back
> to using boto and rolling your own :(
>
>
> On Fri, Jul 4, 2014 at 9:28 AM, Ian Wilkinson <ia...@me.com> wrote:
> Trying to discover source for the DynamoDBInputFormat.
> Not appearing in:
>
> - https://github.com/aws/aws-sdk-java
> - https://github.com/apache/hive
>
> Then came across
> http://stackoverflow.com/questions/17077774/jar-containing-org-apache-hadoop-hive-dynamodb.
> Unsure whether this represents the latest situation…
>
> ian
>
>
> On 4 Jul 2014, at 16:58, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
>> I should qualify by saying there is boto support for DynamoDB - but not for
>> the InputFormat. You could roll your own python-based connection, but this
>> involves figuring out how to split the data in Dynamo - the InputFormat takes
>> care of this, so it should be the easier approach.
>> —
>> Sent from Mailbox
>>
>>
>> On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson <ia...@me.com> wrote:
>>
>> Excellent. Let me get browsing on this.
>>
>>
>> Huge thanks,
>> ian
>>
>>
>> On 4 Jul 2014, at 16:47, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>>> No boto support for that.
>>>
>>> In master there is Python support for loading Hadoop InputFormats. Not sure
>>> if it will be in 1.0.1 or 1.1.
>>>
>>> In the master docs, under the programming guide, there are instructions, and also
>>> under the examples project there are PySpark examples of using Cassandra and HBase.
>>> These should hopefully give you enough to get started.
>>>
>>> Depending on how easy it is to use the DynamoDB format, you may have to
>>> write a custom converter (see the mentioned examples for more details).
>>>
>>> Sent from my iPhone
>>>
>>> On 4 Jul 2014, at 08:38, Ian Wilkinson <ia...@me.com> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> I’m going to be working with Python primarily. Are you aware of
>>>> comparable boto support?
>>>>
>>>> ian
>>>>
>>>> On 4 Jul 2014, at 16:32, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>
>>>>> You should be able to use DynamoDBInputFormat (I think this should be
>>>>> part of the AWS libraries for Java) and create a HadoopRDD from that.
>>>>>
>>>>>
>>>>> On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson <ia...@me.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I noticed mention of DynamoDB as an input source in
>>>>> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
>>>>>
>>>>> Unfortunately, Google is not coming to my rescue on finding
>>>>> further mention of this support.
>>>>>
>>>>> Any pointers would be well received.
>>>>>
>>>>> Big thanks,
>>>>> ian
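
For completeness, here are a couple of guesses I plan to try next, based purely on the stack trace above - I haven't checked the hive-dynamodb-handler source, so both may be dead ends.

First, FileSplit.write looks like it is hitting a null file path inside the DynamoDBSplit when Spark serializes the task. If the input format derives that path from the job's input directories (an assumption on my part), then giving the JobConf a throwaway input path before building the RDD might let the split serialize. Continuing in the same shell session as above (jobConf, sc and the earlier imports still in scope), with "/tmp/dynamodb-dummy-input" as an arbitrary placeholder:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileInputFormat

// Guess: supply a dummy input path so the FileSplit wrapped by DynamoDBSplit
// has a non-null path to write; the path itself should never actually be read.
FileInputFormat.setInputPaths(jobConf, new Path("/tmp/dynamodb-dummy-input"))

var users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
users.count()

Second, if that changes nothing, I plan to serialize a split directly, the same way Spark's SerializableWritable does, to confirm the failure really is in the split's own write() rather than in anything Spark adds:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.util.ReflectionUtils

// Build the splits much as HadoopRDD does, then push the first one through
// ObjectWritable; this should reproduce the NPE in isolation if the split
// itself cannot be written.
val fmt = ReflectionUtils.newInstance(classOf[DynamoDBInputFormat], jobConf)
val splits = fmt.getSplits(jobConf, 1)
new ObjectWritable(splits(0)).write(new DataOutputStream(new ByteArrayOutputStream()))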