Basically, you need to convert it to a serializable format before doing the
collect/take.

You can fire up a spark shell and paste this:

    val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence
> /sigmoid")
>       *.map(_._2.toString)*
>     sFile.take(5).foreach(println)


Use the attached sequence file generator and generated sequence file that i
used for testing.

Also note:If you don't do the .map to convert to string, then it will end
up with the serializable Exception that you are hitting.

[image: Inline image 1]


Thanks
Best Regards

On Tue, Jun 2, 2015 at 6:21 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Spark Shell:
>
> val x =
> sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
> classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
>
> OR
>
> val x =
> sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
> classOf[org.apache.hadoop.io.Text],
> classOf[org.apache.hadoop.io.LongWritable])
>
> OR
>
> val x =
> sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
> classOf[org.apache.hadoop.io. LongWritable],
> classOf[org.apache.hadoop.io.Text])
>
> x.take(10).foreach(println)
>
> is throwing
>
> ===================================
> Exception:
>
> 15/06/02 05:49:51 ERROR executor.Executor: Exception in task 0.0 in stage
> 2.0 (TID 2)
>
> java.io.NotSerializableException: org.apache.hadoop.io.Text
>
> Serialization stack:
>
> - object not serializable (class: org.apache.hadoop.io.Text, value:
> 290090268£2013112699)
>
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>
> - object (class scala.Tuple2,
> (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01
> 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>
> - element of array (index: 0)
>
> - array (class [Lscala.Tuple2;, size 10)
>
> at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
>
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/06/02 05:49:51 ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0
> (TID 2) had a not serializable result: org.apache.hadoop.io.Text
>
> Serialization stack:
>
> - object not serializable (class: org.apache.hadoop.io.Text, value:
> 290090268£2013112699)
>
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>
> - object (class scala.Tuple2,
> (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01
> 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>
> - element of array (index: 0)
>
> - array (class [Lscala.Tuple2;, size 10); not retrying
>
> 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0,
> whose tasks have all completed, from pool
>
> 15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
>
> 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Stage 2 (take at
> <console>:24) failed in 0.032 s
>
> 15/06/02 05:49:51 INFO scheduler.DAGScheduler: Job 2 failed: take at
> <console>:24, took 0.041207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 2.0 (TID 2) had a not serializable result:
> org.apache.hadoop.io.Text
>
> Serialization stack:
>
> - object not serializable (class: org.apache.hadoop.io.Text, value:
> 290090268£2013112699)
>
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>
> - object (class scala.Tuple2,
> (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.000000000000001.0000000000000019.00.00.0020.0020.0020.00113NY0000000000000000000-99902000-04-01
> 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
>
> - element of array (index: 0)
>
> - array (class [Lscala.Tuple2;, size 10)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
> ===================================
>
> ./bin/spark-shell -v  --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
> --num-executors 7919 --driver-memory 14g --driver-java-options
> "-XX:MaxPermSize=512M" --executor-memory 14g --executor-cores 1 --queue
> hdmi-others
>
> On Tue, Jun 2, 2015 at 6:03 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> I have a sequence file
>>
>>
>> SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
>>
>>
>> Key = Text
>>
>> Value = Text
>>
>> and it seems to be using GzipCodec.
>>
>> How should i read it from Spark
>>
>> I am using
>>
>> val x = sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>> .partitionBy(new org.apache.spark.HashPartitioner(7919))
>>
>> When i do
>>
>> x.take(10).foreach(println)
>>
>> each record return is identical. How is that possible. In this Sequence
>> file records are unique. (guarenteed)
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>
/**
 * Created by akhld on 20/5/15.
 */

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

//White, Tom (2012-05-10). Hadoop: The Definitive Guide (Kindle Locations 5375-5384). OReilly Media - A. Kindle Edition.

public class SequenceFileWriteDemo {

    private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main( String[] args) throws IOException {
        String uri = "/home/akhld/sequence/sigmoid";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create( uri), conf);
        Path path = new Path( uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i ++) {
                key.set( 100 - i);
                value.set( DATA[ i % DATA.length]);
                //System.out.printf("[% s]\t% s\t% s\n", writer.getLength(), key, value);
                writer.append( key, value); }
        } finally
        { IOUtils.closeStream( writer);
        }
    }
}

Attachment: sigmoid
Description: Binary data

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to