Basically, you need to convert the records to a serializable format before doing the collect/take.
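For the Text/Text file from the quoted messages below, a minimal sketch of that conversion (untested here, assuming the part-r-00761 path from your mail is readable) would be:

    import org.apache.hadoop.io.Text

    val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
        classOf[Text], classOf[Text])
      .map { case (k, v) => (k.toString, v.toString) } // copy each Writable into a plain String
    x.take(10).foreach(println)

The .map runs on the executors, so only serializable Strings are shipped back to the driver by take().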
You can fire up a spark shell and paste this (it reads the test file generated by the attached program):

    val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
      .map(_._2.toString)
    sFile.take(5).foreach(println)

Use the attached sequence file generator and the generated sequence file that I used for testing.

Also note: if you don't do the .map to convert to String, then it will end up with the serializable exception that you are hitting.

Thanks
Best Regards

On Tue, Jun 2, 2015 at 6:21 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Spark Shell:
>
> val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
>   classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
>
> OR
>
> val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
>   classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.LongWritable])
>
> OR
>
> val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
>   classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text])
>
> x.take(10).foreach(println)
>
> is throwing
>
> ===================================
> Exception:
>
> 15/06/02 05:49:51 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
> java.io.NotSerializableException: org.apache.hadoop.io.Text
> Serialization stack:
>   - object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
>   - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>   - object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>   - element of array (index: 0)
>   - array (class [Lscala.Tuple2;, size 10)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> 15/06/02 05:49:51 ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0 (TID 2) had a not serializable result: org.apache.hadoop.io.Text
> Serialization stack:
>   - object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
>   - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>   - object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>   - element of array (index: 0)
>   - array (class [Lscala.Tuple2;, size 10); not retrying
>
> 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
> 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
> 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Stage 2 (take at <console>:24) failed in 0.032 s
> 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Job 2 failed: take at <console>:24, took 0.041207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 2) had a not serializable result: org.apache.hadoop.io.Text
> Serialization stack:
>   - object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
>   - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>   - object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>   - element of array (index: 0)
>   - array (class [Lscala.Tuple2;, size 10)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> ===================================
>
> ./bin/spark-shell -v --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar --num-executors 7919 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M" --executor-memory 14g --executor-cores 1 --queue hdmi-others
>
> On Tue, Jun 2, 2015 at 6:03 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> I have a sequence file
>>
>> SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
>>
>> Key = Text
>> Value = Text
>>
>> and it seems to be using GzipCodec.
>>
>> How should I read it from Spark?
>>
>> I am using
>>
>> val x = sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>>   .partitionBy(new org.apache.spark.HashPartitioner(7919))
>>
>> When I do
>>
>> x.take(10).foreach(println)
>>
>> each record returned is identical. How is that possible? In this sequence
>> file the records are unique (guaranteed).
>>
>> --
>> Deepak
>
> --
> Deepak
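PS on the "each record returned is identical" part of the quoted question: Hadoop's RecordReader reuses the same Text objects for every record, so an RDD of raw Writables that is cached, shuffled (e.g. by partitionBy) or collected without copying can end up with many references to one object. Copying into Strings first avoids that as well as the NotSerializableException. A rough sketch using Spark's typed sequenceFile variant, which does the Text-to-String conversion for you (same path as above, untested here):

    val dwTable = "/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761"
    val x = sc.sequenceFile[String, String](dwTable)           // the WritableConverter copies each Text into a new String
      .partitionBy(new org.apache.spark.HashPartitioner(7919)) // safe now: keys and values are plain Strings
    x.take(10).foreach(println)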
/**
 * Created by akhld on 20/5/15.
 */
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// White, Tom (2012-05-10). Hadoop: The Definitive Guide (Kindle Locations 5375-5384). O'Reilly Media. Kindle Edition.
// Writes 100 (IntWritable, Text) records to /home/akhld/sequence/sigmoid.
public class SequenceFileWriteDemo {

    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {
        String uri = "/home/akhld/sequence/sigmoid";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}
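If you already have a spark-shell open, a rough equivalent of the Java writer above is sketched below (the output path sigmoid-spark is made up; Spark converts the Int/String pairs to IntWritable/Text when writing):

    val data = Seq(
      "One, two, buckle my shoe", "Three, four, shut the door",
      "Five, six, pick up sticks", "Seven, eight, lay them straight",
      "Nine, ten, a big fat hen")

    sc.parallelize(0 until 100)
      .map(i => (100 - i, data(i % data.length)))               // same (key, value) pairs as the Java demo
      .saveAsSequenceFile("/home/akhld/sequence/sigmoid-spark")  // hypothetical output path

Note that saveAsSequenceFile writes a directory of part files rather than the single file the Java demo creates.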
(Attachment: sigmoid, binary sequence file)