Erm, you are trying to do all the work in the create() method. That is definitely not what you want to do; create() is only supposed to build the JavaStreamingContext. A further problem is that you're using anonymous inner classes, which are non-static and therefore hold a reference to the enclosing class. The closure cleaner can sometimes remove that reference, but perhaps not here. Consider a static nested class if you can't resolve it another way. There is, however, probably at least one more issue in this code ...
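As a Spark-free illustration of the point above: an anonymous inner class carries a hidden `this$0` field pointing at its enclosing instance, so serializing it drags the (often non-serializable) outer class along, while a static nested class does not. The class names here (`Outer`, `StaticTask`, etc.) are hypothetical, made up just for this sketch:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // A Serializable Runnable, standing in for Spark's Function types.
    interface SerializableRunnable extends Runnable, Serializable { }

    // Non-serializable enclosing class, like the test class in this thread.
    static class Outer {
        // Anonymous inner class: compiled with a hidden reference to Outer.this.
        SerializableRunnable anonymous() {
            return new SerializableRunnable() {
                public void run() { }
            };
        }
    }

    // Static nested class: no reference to any enclosing instance.
    static class StaticTask implements SerializableRunnable {
        public void run() { }
    }

    // Returns true if o can be written with Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails: the anonymous instance drags the non-serializable Outer with it.
        System.out.println(serializes(new Outer().anonymous())); // prints false
        // Works: the static nested class captures nothing.
        System.out.println(serializes(new StaticTask()));        // prints true
    }
}
```

This is exactly why the exception below names the enclosing test class rather than the function object itself.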
On Thu, Nov 6, 2014 at 1:43 PM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> HI Sean,
>
> Below is my java code and using spark 1.1.0. Still getting the same error.
> Here Bean class is serialized. Not sure where exactly is the problem.
> What am I doing wrong here ?
>
> public class StreamingJson {
>     public static void main(String[] args) throws Exception {
>         final String HDFS_FILE_LOC = args[0];
>         final String IMPALA_TABLE_LOC = args[1];
>         final String TEMP_TABLE_NAME = args[2];
>         final String HDFS_CHECKPOINT_DIR = args[3];
>
>         JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
>             public JavaStreamingContext create() {
>                 SparkConf sparkConf = new SparkConf().setAppName("test")
>                         .set("spark.cores.max", "3");
>
>                 final JavaStreamingContext jssc = new JavaStreamingContext(
>                         sparkConf, new Duration(500));
>
>                 final JavaHiveContext javahiveContext = new JavaHiveContext(
>                         jssc.sc());
>
>                 javahiveContext.createParquetFile(Bean.class,
>                         IMPALA_TABLE_LOC, true, new Configuration())
>                         .registerTempTable(TEMP_TABLE_NAME);
>
>                 final JavaDStream<String> textFileStream = jssc
>                         .textFileStream(HDFS_FILE_LOC);
>
>                 textFileStream
>                         .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>
>                             @Override
>                             public Void call(JavaRDD<String> rdd, Time time)
>                                     throws Exception {
>                                 if (rdd != null) {
>                                     if (rdd.count() > 0) {
>                                         JavaSchemaRDD schRdd = javahiveContext
>                                                 .jsonRDD(rdd);
>                                         schRdd.insertInto(TEMP_TABLE_NAME);
>                                     }
>                                 }
>                                 return null;
>                             }
>                         });
>                 jssc.checkpoint(HDFS_CHECKPOINT_DIR);
>                 return jssc;
>             }
>         };
>         JavaStreamingContext context = JavaStreamingContext.getOrCreate(
>                 HDFS_CHECKPOINT_DIR, contextFactory);
>         context.start(); // Start the computation
>         context.awaitTermination();
>     }
> }
>
> Regards,
> Vasu C
>
> On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> No, not the same thing then. This just means you accidentally have a
>> reference to the unserializable enclosing test class in your code.
>> Just make sure the reference is severed.
>>
>> On Thu, Nov 6, 2014 at 8:00 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
>> > Thanks for pointing to the issue.
>> >
>> > Yes I think its the same issue, below is Exception
>> >
>> > ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
>> > java.io.NotSerializableException: TestCheckpointStreamingJson