Yup, that solves the compilation issue :-) One quick question regarding specifying the Decoder in the Kafka stream:

Please note that I am encoding the message as follows while sending data to Kafka, in TweetEncoder:

    String msg = objectMapper.writeValueAsString(tweetEvent);
    return msg.getBytes();

and I have a corresponding TweetDecoder:

    return objectMapper.readValue(bytes, Tweet.class);

How do I specify the Decoder in the following stream-processing flow?

    streams = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option(subscribeType, topics)
        .load()
        .withColumn("message", from_json(col("value").cast("string"), tweetSchema)) // cast the binary value to a string and parse it as json
        .select("message.*") // unnest the json
        .as(Encoders.bean(Tweet.class));

Thanks
Kaniska

On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mich...@databricks.com> wrote:

> You need to import col from org.apache.spark.sql.functions.
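>
> A minimal sketch of those static imports in Java (from_json, used in the same snippet, comes from the same class):
>
>     import static org.apache.spark.sql.functions.col;
>     import static org.apache.spark.sql.functions.from_json;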
>
> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Can you please check whether I am using the correct version of the spark-streaming library, as specified in my pom (included in the email)?
>>
>> col("value").cast("string") throws the error 'cannot find symbol method col(java.lang.String)'.
>> I tried $"value", which results in a similar compilation error.
>>
>> Thanks
>> Kaniska
>>
>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Sorry, I don't think that I understand the question. Value is just a binary blob that we get from Kafka and pass to you. If it is stored in JSON, I think the code I provided is a good option, but if you are using a different encoding you may need to write a UDF.
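>>>
>>> A rough sketch of such a UDF in Java, assuming the payload is plain UTF-8 bytes (the name "decodePayload" and the "raw" Dataset are made up for illustration):
>>>
>>>     import static org.apache.spark.sql.functions.callUDF;
>>>     import static org.apache.spark.sql.functions.col;
>>>
>>>     import org.apache.spark.sql.api.java.UDF1;
>>>     import org.apache.spark.sql.types.DataTypes;
>>>
>>>     // register a UDF that turns the raw Kafka value bytes into a string
>>>     spark.udf().register("decodePayload",
>>>         (UDF1<byte[], String>) bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8),
>>>         DataTypes.StringType);
>>>
>>>     // apply it to the binary value column; the result can then be parsed further
>>>     Dataset<Row> decoded = raw.withColumn("payload", callUDF("decodePayload", col("value")));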
>>>
>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> Thanks much for the suggestion.
>>>>
>>>> I was wondering - what's the best way to deserialize the 'value' field?
>>>>
>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>
>>>>> Encoders can only map data into an object if those columns already exist. When we are reading from Kafka, we just get a binary blob and you'll need to help Spark parse that first. Assuming your data is stored in JSON, it should be pretty straightforward.
>>>>>
>>>>>     streams = spark
>>>>>         .readStream()
>>>>>         .format("kafka")
>>>>>         .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>         .option(subscribeType, topics)
>>>>>         .load()
>>>>>         .withColumn("message", from_json(col("value").cast("string"), tweetSchema)) // cast the binary value to a string and parse it as json
>>>>>         .select("message.*") // unnest the json
>>>>>         .as(Encoders.bean(Tweet.class)); // only required if you want to use lambda functions on the data using this class
>>>>>
>>>>> Here is some more info on working with JSON and other semi-structured formats:
>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
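>>>>>
>>>>> For instance, a minimal sketch of what the bean encoder enables (the lambda is made up for illustration; assumes org.apache.spark.api.java.function.MapFunction is imported):
>>>>>
>>>>>     Dataset<String> locations = streams.map(
>>>>>         (MapFunction<Tweet, String>) tweet -> tweet.getLocation(), // typed access via the Tweet bean
>>>>>         Encoders.STRING());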
>>>>>
>>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Currently I am encountering the following exception while working with the below-mentioned code snippet.
>>>>>>
>>>>>> Please suggest the correct approach for reading the stream into a sql schema.
>>>>>> If I add 'tweetSchema' while reading the stream, it errors out with the message - we cannot change the static schema for kafka.
>>>>>>
>>>>>> -------------------------------------------------
>>>>>>
>>>>>> exception:
>>>>>>
>>>>>> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`location`' given input columns: [topic, timestamp, key, offset, value, timestampType, partition];
>>>>>>     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>>>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>>>>
>>>>>> -------------------------------------------------
>>>>>>
>>>>>> structured streaming code snippet:
>>>>>>
>>>>>> String bootstrapServers = "localhost:9092";
>>>>>> String subscribeType = "subscribe";
>>>>>> String topics = "events";
>>>>>>
>>>>>> StructType tweetSchema = new StructType()
>>>>>>     .add("tweetId", "string")
>>>>>>     .add("tweetText", "string")
>>>>>>     .add("location", "string")
>>>>>>     .add("timestamp", "string");
>>>>>>
>>>>>> SparkSession spark = SparkSession
>>>>>>     .builder()
>>>>>>     .appName("StreamProcessor")
>>>>>>     .config("spark.master", "local")
>>>>>>     .getOrCreate();
>>>>>>
>>>>>> Dataset<Tweet> streams = spark
>>>>>>     .readStream()
>>>>>>     .format("kafka")
>>>>>>     .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>     .option(subscribeType, topics)
>>>>>>     .load()
>>>>>>     .as(Encoders.bean(Tweet.class));
>>>>>>
>>>>>> streams.createOrReplaceTempView("streamsData");
>>>>>>
>>>>>> String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>>>>>> Dataset<Row> countsByLocation = spark.sql(sql);
>>>>>>
>>>>>> StreamingQuery query = countsByLocation.writeStream()
>>>>>>     .outputMode("complete")
>>>>>>     .format("console")
>>>>>>     .start();
>>>>>>
>>>>>> query.awaitTermination();
>>>>>>
>>>>>> -------------------------------------------------
>>>>>>
>>>>>> Tweet:
>>>>>>
>>>>>> Tweet.java has a public constructor and getter/setter methods:
>>>>>>
>>>>>> public class Tweet implements Serializable {
>>>>>>
>>>>>>     private String tweetId;
>>>>>>     private String tweetText;
>>>>>>     private String location;
>>>>>>     private String timestamp;
>>>>>>
>>>>>>     public Tweet() {
>>>>>>     }
>>>>>>     .............
>>>>>>
>>>>>> -------------------------------------------------
>>>>>>
>>>>>> pom.xml:
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>>>     <version>2.1.0</version>
>>>>>> </dependency>
>>>>>>
>>>>>> -------------------------------------------------