Hi,
I am able to read a custom input format in Spark:
scala> val inputRead = sc.newAPIHadoopFile("hdfs://127.0.0.1/user/cloudera/date_dataset/",
         classOf[io.reader.PatternInputFormat],
         classOf[org.apache.hadoop.io.LongWritable],
         classOf[org.apache.hadoop.io.Text])

However, calling inputRead.count() results in a NullPointerException:
14/04/08 13:33:39 INFO FileInputFormat: Total input paths to process : 1
14/04/08 13:33:39 INFO SparkContext: Starting job: count at <console>:15
14/04/08 13:33:39 INFO DAGScheduler: Got job 8 (count at <console>:15) with 1 output partitions (allowLocal=false)
14/04/08 13:33:39 INFO DAGScheduler: Final stage: Stage 9 (count at <console>:15)
14/04/08 13:33:39 INFO DAGScheduler: Parents of final stage: List()
14/04/08 13:33:39 INFO DAGScheduler: Missing parents: List()
14/04/08 13:33:39 INFO DAGScheduler: Submitting Stage 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12), which has no missing parents
14/04/08 13:33:39 INFO DAGScheduler: Submitting 1 missing tasks from Stage 9 (NewHadoopRDD[19] at newAPIHadoopFile at <console>:12)
14/04/08 13:33:39 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
14/04/08 13:33:39 INFO TaskSetManager: Starting task 9.0:0 as TID 8 on executor localhost: localhost (PROCESS_LOCAL)
14/04/08 13:33:39 INFO TaskSetManager: Serialized task 9.0:0 as 1297 bytes in 0 ms
14/04/08 13:33:39 INFO Executor: Running task ID 8
14/04/08 13:33:39 INFO BlockManager: Found block broadcast_5 locally
14/04/08 13:33:39 INFO NewHadoopRDD: Input split: hdfs://127.0.0.1/user/cloudera/date_dataset/sample.txt:0+759
14/04/08 13:33:39 WARN TaskSetManager: Lost TID 8 (task 9.0:0)
14/04/08 13:33:39 WARN TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at java.util.regex.Pattern.<init>(Pattern.java:1132)
    at java.util.regex.Pattern.compile(Pattern.java:823)
    at io.reader.PatternRecordReader.initialize(PatternRecordReader.java:42)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:96)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
14/04/08 13:33:39 ERROR TaskSetManager: Task 9.0:0 failed 1 times; aborting job
14/04/08 13:33:39 INFO DAGScheduler: Failed to run count at <console>:15
14/04/08 13:33:39 INFO TaskSchedulerImpl: Remove TaskSet 9.0 from pool
14/04/08 13:33:39 ERROR Executor: Exception in task ID 8
java.lang.NullPointerException
    [same stack trace as above]
org.apache.spark.SparkException: Job aborted: Task 9.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
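
If I'm reading the trace right, Pattern.compile is being handed a null regex: the top two frames are what a bare Pattern.compile(null) throws. A quick sanity check in the same shell (this is just my reproduction, not the reader code):

scala> java.util.regex.Pattern.compile(null)
// throws java.lang.NullPointerException
//     at java.util.regex.Pattern.<init>(Pattern.java:1132)
//     at java.util.regex.Pattern.compile(Pattern.java:823)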


Any idea what might be happening here?
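
My guess (and it is only a guess) is that PatternRecordReader.initialize reads its regex out of the job Configuration and gets null back, because the property never makes it into the Configuration that Spark hands to the InputFormat. If that's the case, should I be passing an explicit Configuration to newAPIHadoopFile? Roughly like the sketch below; note that "record.delimiter.regex" and the date pattern are placeholders for whatever key and regex the reader actually expects:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

// Start from Spark's Hadoop configuration and add the property the
// record reader presumably looks up in initialize(); if that lookup
// returns null, Pattern.compile(null) gives exactly the NPE above.
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("record.delimiter.regex", """\d{4}-\d{2}-\d{2}""")

val inputRead = sc.newAPIHadoopFile(
  "hdfs://127.0.0.1/user/cloudera/date_dataset/",
  classOf[io.reader.PatternInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)  // newAPIHadoopFile accepts a Configuration as its last argument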

-anurag



-- 
Twitter: @anuragphadke (https://twitter.com/#!/anuragphadke)
