Hi,
I am invoking the spark-shell (Spark 1.0.0) with:
spark-shell --jars \
libs/aws-java-sdk-1.3.26.jar,\
libs/httpclient-4.1.1.jar,\
libs/httpcore-nio-4.1.jar,\
libs/gson-2.1.jar,\
libs/httpclient-cache-4.1.1.jar,\
libs/httpmime-4.1.1.jar,\
libs/hive-dynamodb-handler-0.11.0.jar,\
libs/httpcore-4.1.jar,\
libs/joda-time-2.1.jar
and then entering the following in the shell:
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.mapred.JobConf

// Start from the shell's Hadoop configuration and add the DynamoDB connector settings
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "<...>")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.awsAccessKeyId", "<...>")
jobConf.set("dynamodb.awsSecretAccessKey", "<...>")
jobConf.set("mapred.output.format.class",
  "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

// Build an RDD of (Text, DynamoDBItemWritable) pairs from the table and count it
val users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text],
  classOf[DynamoDBItemWritable])
users.count()
This raises a NullPointerException in FileSplit.write while the task is being
serialized (trace below). Any suggestions on how I might fix this would be
very welcome.
ian
14/07/20 23:56:03 INFO deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/07/20 23:56:03 INFO JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/07/20 23:56:03 INFO AbstractDynamoDBInputFormat: Throughput percent: 1.0
14/07/20 23:56:03 INFO EndpointProvider: Using endpoint for DynamoDB: dynamodb.eu-west-1.amazonaws.com
14/07/20 23:56:03 INFO DynamoDBClient: Describe Table Output: {Table: {TableName: <...>, KeySchema: {HashKeyElement: {AttributeName: id, AttributeType: S, }, }, TableStatus: ACTIVE, CreationDateTime: Wed May 07 14:38:30 BST 2014, ProvisionedThroughput: {ReadCapacityUnits: 4, WriteCapacityUnits: 4, }, TableSizeBytes: 2473, ItemCount: 14, }, }
14/07/20 23:56:03 INFO SparkContext: Starting job: count at <console>:21
14/07/20 23:56:03 INFO DAGScheduler: Got job 0 (count at <console>:21) with 1 output partitions (allowLocal=false)
14/07/20 23:56:03 INFO DAGScheduler: Final stage: Stage 0(count at <console>:21)
14/07/20 23:56:03 INFO DAGScheduler: Parents of final stage: List()
14/07/20 23:56:03 INFO DAGScheduler: Missing parents: List()
14/07/20 23:56:03 INFO DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at <console>:18), which has no missing parents
14/07/20 23:56:03 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext
14/07/20 23:56:04 INFO SparkUI: Stopped Spark web UI at http://10.0.1.7:4040
14/07/20 23:56:04 INFO DAGScheduler: Stopping DAGScheduler
14/07/20 23:56:05 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/07/20 23:56:05 INFO ConnectionManager: Selector thread was interrupted!
14/07/20 23:56:05 INFO ConnectionManager: ConnectionManager stopped
14/07/20 23:56:05 INFO MemoryStore: MemoryStore cleared
14/07/20 23:56:05 INFO BlockManager: BlockManager stopped
14/07/20 23:56:05 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/07/20 23:56:05 INFO BlockManagerMaster: BlockManagerMaster stopped
14/07/20 23:56:05 INFO SparkContext: Successfully stopped SparkContext
14/07/20 23:56:05 ERROR OneForOneStrategy:
java.lang.NullPointerException
    at org.apache.hadoop.mapreduce.lib.input.FileSplit.write(FileSplit.java:80)
    at org.apache.hadoop.mapred.FileSplit.write(FileSplit.java:85)
    at org.apache.hadoop.dynamodb.split.DynamoDBSplit.write(DynamoDBSplit.java:63)
    at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:202)
    at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:128)
    at org.apache.hadoop.io.ObjectWritable.write(ObjectWritable.java:82)
    at org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:132)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:767)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/20 23:56:05 INFO Remoting: Remoting shut down
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
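The trace suggests the failure happens on the driver, before any task runs: DAGScheduler.submitMissingTasks Java-serializes the task, SerializableWritable writes the DynamoDB split through Hadoop's Writable machinery, and FileSplit.write then dereferences a file path that appears never to have been set. The toy model below is a minimal sketch of that mechanism only, assuming the split's path is null; PathlessSplit, SplitSerializationDemo and trySerialize are hypothetical names, and this is not the connector's or Hadoop's actual code.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for a split whose path was never set. Like
// FileSplit.write, its custom serializer touches the path before anything else.
final class PathlessSplit(val path: String, val start: Long, val length: Long)
    extends Serializable {

  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeUTF(path.toString) // throws NullPointerException when path == null
    out.writeLong(start)
    out.writeLong(length)
  }
}

object SplitSerializationDemo {
  // Java-serialize an object, roughly as Spark's JavaSerializer does for tasks,
  // returning the number of bytes written or the exception that was raised.
  def trySerialize(obj: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.flush()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    // Prints a Failure wrapping a NullPointerException
    println(trySerialize(new PathlessSplit(null, 0L, 0L)))
    // Prints a Success holding the serialized byte count
    println(trySerialize(new PathlessSplit("dummy-path", 0L, 0L)))
  }
}
```

The second call only shows that a non-null path removes this particular failure in the toy; it says nothing about how the real DynamoDBSplit obtains its path.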
On 4 Jul 2014, at 18:04, Nick Pentreath wrote:
> Interesting - I would have thought they would make that available publicly.
> Unfortunately, unless you can use Spark on EMR, I guess your options are to
> hack it by spinning up an EMR cluster and getting the JAR, or maybe fall back
> to using boto and rolling your own :(
>
> On Fri, Jul 4, 2014 at 9:28 AM, Ian Wilkinson wrote:
> Trying to discover source for the DynamoDBInputFormat.
> Not appearing in:
>
> - https://github.com/aws/aws-sdk-java
> - https://github.com/apache/hive
>
> Then came across
> http://stackoverflow.com/questions/17077774/jar-containing-org-apache-hadoop-hive-dynamodb.
> Unsure whether this represents the latest situation…
>
> ian
>
>
> On 4 Jul 2014, at 16:58, Nick Pentreath wrote:
>
>> I should qualify that by saying there is boto support for DynamoDB, but not
>> for the InputFormat. You could roll your own Python-based connection, but
>> this involves figuring out how to split the data in DynamoDB; the
>> InputFormat takes care of this, so it should be the easier approach.
>>
>>
>> On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson wrote:
>>
>> Excellent. Let me get browsing on this.
>>
>>
>> Huge thanks,
>> ian
>>
>>
>> On 4 Jul 2014, at 16:47, Nick Pentreath wrote:
>>
>>> No boto support for that.
>>>
>>> In master there is Python support for loading a Hadoop InputFormat. Not sure
>>> if it will be in 1.0.1 or 1.1.
>>>
>>> In the master docs, the programming guide has instructions, and the examples
>>> project has PySpark examples of using Cassandra and HBase. These should
>>> hopefully give you enough to get started.
>>>
>>> Depending on how easy it is to use the DynamoDB format, you may have to
>>> write a custom converter (see the mentioned examples for more details).
>>>
>>> On 4 Jul 2014, at 08:38, Ian Wilkinson wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> I’m going to be working with Python primarily. Are you aware of
>>>> comparable boto support?
>>>>
>>>> ian
>>>>
>>>> On 4 Jul 2014, at 16:32, Nick Pentreath wrote:
>>>>
>>>>> You should be able to use DynamoDBInputFormat (I think this should be
>>>>> part of the AWS libraries for Java) and create a HadoopRDD from that.
>>>>>
>>>>>
>>>>> On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson wrote:
>>>>> Hi,
>>>>>
>>>>> I noticed a mention of DynamoDB as an input source in
>>>>> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
>>>>>
>>>>> Unfortunately, Google is not turning up any further mention of
>>>>> this support.
>>>>>
>>>>> Any pointers would be well received.
>>>>>
>>>>> Big thanks,
>>>>> ian
>>>>>
>>>>
>>
>>
>
>
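Nick's point about splitting is where an InputFormat earns its keep. DynamoDB's Scan API supports parallel scans through the Segment and TotalSegments request parameters, so one plausible approach for a hand-rolled reader is to choose a segment count and give each Spark partition a subset of segments. The sketch below covers only that bookkeeping; the round-robin policy and the names SegmentPlanner and assignSegments are assumptions for illustration, not necessarily how DynamoDBInputFormat divides a table.

```scala
// Hypothetical planner: spread DynamoDB parallel-scan segments
// (Scan's Segment / TotalSegments parameters) across Spark partitions.
object SegmentPlanner {
  /** For each partition, the segment numbers in [0, totalSegments) it should scan.
    * Segments are dealt round-robin, so partition sizes differ by at most one.
    */
  def assignSegments(totalSegments: Int, numPartitions: Int): Vector[Vector[Int]] = {
    require(totalSegments > 0, "totalSegments must be positive")
    require(numPartitions > 0, "numPartitions must be positive")
    // Avoid empty partitions: use at most one partition per segment
    val parts = math.min(totalSegments, numPartitions)
    Vector.tabulate(parts)(p => (p until totalSegments by parts).toVector)
  }

  def main(args: Array[String]): Unit = {
    // 5 segments over 2 partitions
    println(assignSegments(5, 2)) // Vector(Vector(0, 2, 4), Vector(1, 3))
  }
}
```

Each partition would then issue its own Scan requests using its segment numbers and the shared TotalSegments value; that client code is not shown.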