Erm, you are trying to do all the work in the create() method. This is
definitely not what you want to do; it is only supposed to make the
JavaStreamingContext. A further problem is that you're using
anonymous inner classes, which are non-static and so hold a reference
to the enclosing class. The closure cleaner can sometimes remove that
reference, but perhaps not here. Consider a static nested class if you
can't resolve it another way. There is probably at least one more
issue in this code ...
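The outer-class capture described above can be reproduced with the plain JDK, no Spark involved (the class and interface names below are made up for illustration). An anonymous inner class compiles to a non-static class with a synthetic reference to its enclosing instance, so serializing it drags the enclosing object along and fails when that object (like TestCheckpointStreamingJson in the stack trace) is not serializable; a static nested class carries no such reference:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {
    // The enclosing class is deliberately NOT Serializable,
    // mirroring the driver/test class named in the stack trace.

    interface Task extends Serializable {
        String run();
    }

    // Static nested class: no hidden reference to the outer
    // instance, so it serializes cleanly.
    static class StaticTask implements Task {
        public String run() { return "ok"; }
    }

    Task anonymousTask() {
        // Anonymous inner class: compiles to a non-static class with
        // a synthetic ClosureDemo.this field, which breaks serialization.
        return new Task() {
            public String run() { return "ok"; }
        };
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new StaticTask()));                  // true
        System.out.println(serializes(new ClosureDemo().anonymousTask())); // false
    }
}
```

The same fix applies to the posted code: hoist the foreachRDD function (and the factory, if needed) into a static nested class that receives whatever it needs through its constructor, rather than capturing it from the enclosing class.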

On Thu, Nov 6, 2014 at 1:43 PM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> HI Sean,
>
> Below is my java code and using spark 1.1.0. Still getting the same error.
> Here Bean class is serialized. Not sure where exactly is the problem.
> What am I doing wrong here ?
>
> public class StreamingJson {
>     public static void main(String[] args) throws Exception {
>         final String HDFS_FILE_LOC = args[0];
>         final String IMPALA_TABLE_LOC = args[1];
>         final String TEMP_TABLE_NAME = args[2];
>         final String HDFS_CHECKPOINT_DIR = args[3];
>
>         JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
>             public JavaStreamingContext create() {
>                 SparkConf sparkConf = new SparkConf()
>                         .setAppName("test").set("spark.cores.max", "3");
>
>                 final JavaStreamingContext jssc = new JavaStreamingContext(
>                         sparkConf, new Duration(500));
>
>                 final JavaHiveContext javahiveContext = new JavaHiveContext(
>                         jssc.sc());
>
>                 javahiveContext.createParquetFile(Bean.class,
>                         IMPALA_TABLE_LOC, true, new Configuration())
>                         .registerTempTable(TEMP_TABLE_NAME);
>
>                 final JavaDStream<String> textFileStream = jssc
>                         .textFileStream(HDFS_FILE_LOC);
>
>                 textFileStream
>                         .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>                             @Override
>                             public Void call(JavaRDD<String> rdd, Time time)
>                                     throws Exception {
>                                 if (rdd != null) {
>                                     if (rdd.count() > 0) {
>                                         JavaSchemaRDD schRdd = javahiveContext
>                                                 .jsonRDD(rdd);
>                                         schRdd.insertInto(TEMP_TABLE_NAME);
>                                     }
>                                 }
>                                 return null;
>                             }
>                         });
>                 jssc.checkpoint(HDFS_CHECKPOINT_DIR);
>                 return jssc;
>             }
>         };
>
>         JavaStreamingContext context = JavaStreamingContext.getOrCreate(
>                 HDFS_CHECKPOINT_DIR, contextFactory);
>         context.start(); // Start the computation
>         context.awaitTermination();
>     }
> }
>
>
>
> Regards,
>    Vasu C
>
> On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> No, not the same thing then. This just means you accidentally have a
>> reference to the unserializable enclosing test class in your code.
>> Just make sure the reference is severed.
>>
>> On Thu, Nov 6, 2014 at 8:00 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
>> > Thanks for pointing to the issue.
>> >
>> > Yes I think its the same issue, below is Exception
>> >
>> >
>> > ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
>> > java.io.NotSerializableException: TestCheckpointStreamingJson
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
