I'm afraid Spark SQL isn't an option for my use case, so I need to use the Spark API itself. I turned off Kryo, and I'm getting a NullPointerException now:
scala> val ref = file.take(1)(0)._2
ref: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable = org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@9753e

scala> ref.size
res7: Int = 79  // This matches the number of columns that I know exist in that RC record

scala> ref.get(0)
java.lang.NullPointerException
        at org.apache.hadoop.hive.serde2.columnar.BytesRefWritable.toString(BytesRefWritable.java:194)
        at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
        at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
        at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
        at .<init>(<console>:10)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)

Pramod
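For what it's worth, the trace above suggests the NPE comes from the REPL trying to print the result, not from the lookup itself: BytesRefWritable.toString reads the internal bytes array directly, and that array appears to stay null until the ref is lazily decompressed. A minimal sketch of pulling the column bytes out explicitly instead (assuming the columns hold UTF-8 text; getBytesCopy() forces the decompression and returns a copy):

    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable

    // Materialize one (LongWritable, BytesRefArrayWritable) pair from the RDD
    val row: BytesRefArrayWritable = file.take(1)(0)._2

    // Copy each column's bytes out, rather than letting the REPL print raw refs
    val columns: Seq[String] = (0 until row.size).map { i =>
      new String(row.get(i).getBytesCopy, "UTF-8")
    }
    columns.take(5).foreach(println)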
On Wed, Sep 24, 2014 at 7:38 AM, cem <cayiro...@gmail.com> wrote:

> I was able to read RC files with the following line:
>
>     val file: RDD[(LongWritable, BytesRefArrayWritable)] =
>       sc.hadoopFile("hdfs://****day=2014-08-10/hour=00/",
>         classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
>         classOf[LongWritable], classOf[BytesRefArrayWritable], 500)
>
> Try disabling the Kryo serializer.
>
> Best Regards,
> Cem Cayiroglu
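In Spark 1.0.x Kryo is only used when spark.serializer has been explicitly pointed at it, so disabling it means removing that setting or pinning the serializer back to the Java default. A minimal sketch, assuming a standalone app where you build the context yourself (the app name is just a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Explicitly fall back to Java serialization, so Kryo never has to walk
    // the Writable's object graph (JobConf, codec, classloader, ...)
    val conf = new SparkConf()
      .setAppName("rcfile-reader")  // hypothetical app name
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    val sc = new SparkContext(conf)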
> On Tue, Sep 23, 2014 at 7:23 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>
>> Is your file managed by Hive (and thus present in a Hive metastore)? In that
>> case, Spark SQL (https://spark.apache.org/docs/latest/sql-programming-guide.html)
>> is the easiest way.
>>
>> Matei
>>
>> On September 23, 2014 at 2:26:10 PM, Pramod Biligiri
>> (pramodbilig...@gmail.com) wrote:
>>
>> Hi,
>> I'm trying to read some data in RCFiles using Spark, but can't seem to find
>> a suitable example anywhere. Currently I've written the following bit of
>> code that lets me count() the number of records, but when I try a collect()
>> or a map(), it fails with a ConcurrentModificationException. I'm running
>> Spark 1.0.1 on a Hadoop YARN cluster:
>>
>>     import org.apache.hadoop.hive.ql.io.RCFileInputFormat
>>     val file = sc.hadoopFile("/hdfs/path/to/file",
>>       classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable, org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
>>       classOf[org.apache.hadoop.io.LongWritable],
>>       classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable])
>>     file.collect()
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 10.0:6 failed 4 times, most recent failure: Exception failure in TID 395 on
>> host (redacted): com.esotericsoftware.kryo.KryoException:
>> java.util.ConcurrentModificationException
>> Serialization trace:
>> classes (sun.misc.Launcher$AppClassLoader)
>> parent (org.apache.spark.repl.ExecutorClassLoader)
>> classLoader (org.apache.hadoop.mapred.JobConf)
>> conf (org.apache.hadoop.io.compress.GzipCodec)
>> codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer)
>> this$0 (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl)
>> lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable)
>> bytesRefWritables (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
>> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:141)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:722)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1046)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1030)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1028)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Pramod
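A footnote on the ConcurrentModificationException above: Hadoop input formats reuse the same Writable instances across records, and the serialization trace shows Kryo trying to drag the whole lazy-decompression graph (JobConf, codec, classloader) over the wire. Besides switching serializers, a common workaround is to copy each record into plain values on the executors before collecting; a minimal sketch, assuming raw column bytes are all that's needed:

    // Copy every row into plain byte arrays executor-side, so only simple,
    // serializable Scala values (no Writables) are shipped to the driver.
    val rows: Array[Array[Array[Byte]]] = file.map { case (_, row) =>
      (0 until row.size).map(i => row.get(i).getBytesCopy).toArray
    }.collect()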