yup, that solves the compilation issue :-)

one quick question regarding specifying Decoder in kafka stream:

please note that I am encoding the message as follows while sending data to
kafka -

<TweetEncoder>

*String msg = objectMapper.writeValueAsString(tweetEvent);*

*return msg.getBytes();*

I have a corresponding <TweetDecoder>

*return objectMapper.readValue(bytes, Tweet.class)*


*>> how do I specify the Decoder in the following stream-processing flow ?*
streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class))

Thanks
Kaniska

---------------------------------------------

On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> You need to import col from org.apache.spark.sql.functions.
>
> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.man...@gmail.com>
> wrote:
>
>> Hi Michael,
>>
>> Can you please check if I am using correct version of spark-streaming
>> library as specified in my pom (specified in the email) ?
>>
>> col("value").cast("string") - throwing an error 'cannot find symbol
>> method col(java.lang.String)'
>> I tried $"value" which results into similar compilation error.
>>
>> Thanks
>> Kaniska
>>
>>
>>
>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Sorry, I don't think that I understand the question.  Value is just a
>>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>>> I think the code I provided is a good option, but if you are using a
>>> different encoding you may need to write a UDF.
>>>
>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>> kaniska.man...@gmail.com> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> Thanks much for the suggestion.
>>>>
>>>> I was wondering - whats the best way to deserialize the 'value' field
>>>>
>>>>
>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Encoders can only map data into an object if those columns already
>>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>>> in JSON it should be pretty straight forward.
>>>>>
>>>>> streams = spark
>>>>>   .readStream()
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>   .option(subscribeType, topics)
>>>>>   .load()
>>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>>   .select("message.*") // unnest the json
>>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>>>>> lambda functions on the data using this class
>>>>>
>>>>> Here is some more info on working with JSON and other semi-structured
>>>>> formats
>>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>>> .
>>>>>
>>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Currently , encountering the following exception while working with
>>>>>> below-mentioned code snippet :
>>>>>>
>>>>>> > Please suggest the correct approach for reading the stream into a
>>>>>> sql
>>>>>> > schema.
>>>>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>>>>> message -
>>>>>> > we can not change static schema for kafka.
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> -------------------------------
>>>>>>
>>>>>> *exception*
>>>>>>
>>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>>>>> value,
>>>>>> timestampType, partition]*;
>>>>>>         at
>>>>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>>>>> At.failAnalysis(package.scala:42)
>>>>>>         at
>>>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>>>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis
>>>>>> .scala:77)
>>>>>> ------------------------------------------------------------
>>>>>> --------------------------------------------
>>>>>>
>>>>>> *structured streaming code snippet*
>>>>>>
>>>>>> String bootstrapServers = "localhost:9092";
>>>>>>             String subscribeType = "subscribe";
>>>>>>             String topics = "events";
>>>>>>
>>>>>>             StructType tweetSchema = new StructType()
>>>>>>                 .add("tweetId", "string")
>>>>>>                 .add("tweetText", "string")
>>>>>>                 .add("location", "string")
>>>>>>                 .add("timestamp", "string");
>>>>>>
>>>>>>            SparkSession spark = SparkSession
>>>>>>                               .builder()
>>>>>>                               .appName("StreamProcessor")
>>>>>>                               .config("spark.master", "local")
>>>>>>                               .getOrCreate();
>>>>>>
>>>>>>           Dataset<Tweet> streams = spark
>>>>>>                                       .readStream()
>>>>>>                                       .format("kafka")
>>>>>>                                       .option("kafka.bootstrap.servers",
>>>>>> bootstrapServers)
>>>>>>                                       .option(subscribeType, topics)
>>>>>>                                       .load()
>>>>>>                                       .as(Encoders.bean(Tweet.class)
>>>>>> );
>>>>>>
>>>>>>          streams.createOrReplaceTempView("streamsData");
>>>>>>
>>>>>>                    String sql = "SELECT location,  COUNT(*) as count
>>>>>> FROM streamsData
>>>>>> GROUP BY location";
>>>>>>                    Dataset<Row> countsByLocation = spark.sql(sql);
>>>>>>
>>>>>>                     StreamingQuery query =
>>>>>> countsByLocation.writeStream()
>>>>>>                       .outputMode("complete")
>>>>>>                       .format("console")
>>>>>>                       .start();
>>>>>>
>>>>>>                     query.awaitTermination();
>>>>>> ------------------------------------------------------------
>>>>>> --------------------------------------
>>>>>>
>>>>>> *Tweet *
>>>>>>
>>>>>> Tweet.java - has public constructor and getter / setter methods
>>>>>>
>>>>>> public class Tweet implements Serializable{
>>>>>>
>>>>>>         private String tweetId;
>>>>>>         private String tweetText;
>>>>>>         private String location;
>>>>>>         private String timestamp;
>>>>>>
>>>>>>         public Tweet(){
>>>>>>
>>>>>>         }
>>>>>> .............
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> ----------------------------
>>>>>>
>>>>>> *pom.xml *
>>>>>>
>>>>>>
>>>>>>                 <dependency>
>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>                         <artifactId>spark-core_2.10</artifactId>
>>>>>>                         <version>2.1.0</version>
>>>>>>                 </dependency>
>>>>>>                 <dependency>
>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>                         <artifactId>spark-streaming_2.10</artifactId>
>>>>>>                         <version>2.1.0</version>
>>>>>>                 </dependency>
>>>>>>                 <dependency>
>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>                         <artifactId>spark-streaming-ka
>>>>>> fka-0-8_2.10</artifactId>
>>>>>>                         <version>2.1.0</version>
>>>>>>                 </dependency>
>>>>>>                 <dependency>
>>>>>>                         <groupId>org.apache.spark</groupId>
>>>>>>                         <artifactId>spark-sql_2.10</artifactId>
>>>>>>                         <version>2.1.0</version>
>>>>>>                 </dependency>
>>>>>>                 <dependency>
>>>>>>                 <groupId>org.apache.spark</groupId>
>>>>>>                 <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>>>>>>                 <version>2.1.0</version>
>>>>>>                 </dependency>
>>>>>> ------------------------------------------------------------
>>>>>> ------------------------
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>> 1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to