The exception is telling you precisely what is wrong. The Kafka source has
a schema of (topic, partition, offset, key, value, timestamp,
timestampType). Nothing about those columns makes sense as a tweet. You
need to tell Spark how to get from bytes to a tweet; it doesn't know how
you serialized the data.
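For instance, printing the schema of the raw stream shows exactly those
columns (a minimal sketch; the broker address and topic name below are
placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().appName("tweets").getOrCreate();
    Dataset<Row> raw = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
      .option("subscribe", "tweets")                        // placeholder
      .load();
    raw.printSchema();  // topic, partition, offset, key, value (binary),
                        // timestamp, timestampType -- nothing tweet-shaped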
Ah, I understand what you are asking now. There is no API for specifying a
Kafka-specific "decoder", since Spark SQL already has a rich language for
expressing transformations. The dataframe code I gave will parse the JSON
and materialize it in a class, very similar to what
objectMapper.readValue(bytes, ...) does.
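In other words, the consumer-side equivalent of readValue is just a cast to
a typed Dataset (a sketch, assuming the same TweetEvent bean used with
objectMapper and a 'tweets' DataFrame holding the parsed JSON columns, as
in the code further down this thread):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Materialize each row into a TweetEvent, much like
    // objectMapper.readValue(bytes, TweetEvent.class) would.
    Dataset<TweetEvent> typed = tweets.as(Encoders.bean(TweetEvent.class));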
yup, that solves the compilation issue :-)
One quick question regarding specifying a Decoder in the Kafka stream:
please note that I am encoding the message as follows while sending data to
Kafka -

    String msg = objectMapper.writeValueAsString(tweetEvent);
    return msg.getBytes();

I have a corresponding decoder for reading the message back.
You need to import col from org.apache.spark.sql.functions.
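In Java that is a static import (the $"value" shorthand you tried is
Scala-only syntax and won't compile from Java):

    import static org.apache.spark.sql.functions.col;

After that, col("value").cast("string") should resolve.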
On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal wrote:
> Hi Michael,
>
> Can you please check if I am using the correct version of the
> spark-streaming library, as specified in my pom (included in the email)?
>
> col("value").cast("string") - throwing an error 'cannot find symbol
> method col(java.lang.String)'
Hi Michael,
Can you please check if I am using the correct version of the
spark-streaming library, as specified in my pom (included in the email)?
col("value").cast("string") - throwing an error 'cannot find symbol method
col(java.lang.String)'
I tried $"value", which results in a similar compilation error.
Sorry, I don't think that I understand the question. Value is just a
binary blob that we get from Kafka and pass to you. If it's stored in
JSON, I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.
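For example, if the bytes were produced by some custom serializer rather
than JSON, a UDF along these lines could decode them first (a hypothetical
sketch; MyCodec.decode stands in for whatever deserialization you actually
use):

    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    import static org.apache.spark.sql.functions.callUDF;
    import static org.apache.spark.sql.functions.col;

    // Hypothetical decoder that turns the raw bytes back into a string.
    spark.udf().register("decode",
        (UDF1<byte[], String>) bytes -> MyCodec.decode(bytes),
        DataTypes.StringType);

    Dataset<Row> decoded = streams.select(callUDF("decode", col("value")));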
On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal wrote:
Hi Michael,
Thanks much for the suggestion.
I was wondering - what's the best way to deserialize the 'value' field?
On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust wrote:
> Encoders can only map data into an object if those columns already exist.
> When we are reading from Kafka, we just get a binary blob and you'll need
> to help Spark parse that first.
Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first. Assuming your data is stored in JSON it
should be pretty straightforward.
streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .load();
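From there, parsing the JSON out of the binary value column might look
like this (a sketch; deriving the schema from a TweetEvent bean is an
assumption - any StructType describing your JSON works):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.StructType;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_json;

    StructType schema = Encoders.bean(TweetEvent.class).schema();  // assumption
    Dataset<Row> tweets = streams
      .select(from_json(col("value").cast("string"), schema).as("tweet"))
      .select("tweet.*");  // one column per tweet field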