Hello,
I am trying to use Spark SQL to analyse JSON data streams within a standalone
application.
Here is the code snippet that receives the streaming data:
final JavaReceiverInputDStream<String> lines =
    streamCtx.socketTextStream("localhost", Integer.parseInt(args[0]),
        StorageLevel.MEMORY_AND_DISK_SER_2());
lines.foreachRDD((rdd) -> {
    final JavaRDD<String> jsonElements = rdd.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(final String line) throws Exception {
                return Arrays.asList(line.split("\n"));
            }
        }).filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(final String v1) throws Exception {
                return v1.length() > 0;
            }
        });
    //System.out.println("Data Received = " + jsonElements.collect().size());
    final SQLContext sqlContext =
        JavaSQLContextSingleton.getInstance(rdd.context());
    final DataFrame dfJsonElement = sqlContext.read().json(jsonElements);
    executeSQLOperations(sqlContext, dfJsonElement);
});
streamCtx.start();
streamCtx.awaitTermination();
}
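For reference, the JavaSQLContextSingleton helper used above is essentially the lazily-initialized, per-JVM singleton pattern from the Spark Streaming programming guide. A minimal Spark-free sketch of that pattern (class and member names here are illustrative; the real helper stores an SQLContext built from the SparkContext):

```java
// Sketch of a lazily-initialized per-JVM singleton holder. A plain Object
// stands in for the SQLContext so this compiles without Spark on the
// classpath; in the real helper the marked line would be
// `instance = new SQLContext(sparkContext);`.
public class SQLContextHolder {
    // static + transient: one instance per JVM, never captured in
    // serialized closures shipped to executors
    private static transient Object instance = null;

    public static synchronized Object getInstance(Object sparkContext) {
        if (instance == null) {
            instance = new Object(); // real helper: new SQLContext(sparkContext)
        }
        return instance;
    }
}
```

The point of the pattern is that every micro-batch in foreachRDD reuses the same SQLContext instead of constructing a new one per batch.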
I get the following error when the sqlContext.read().json(jsonElements) line is executed:
java.lang.ClassNotFoundException: com.intrinsec.common.spark.SQLStreamingJsonAnalyzer$2
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)