You can try Jackson's ObjectMapper; it will take you from JSON straight to
an object without mapping the fields by hand.
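
A minimal sketch of that idea, not a definitive implementation. It assumes
jackson-module-scala is on the classpath, that the JSON keys match the case
class field names, and that a custom DeserializationSchema is used in place of
JSONKeyValueDeserializationSchema. TweetLite, TweetJson and TweetSchema are
hypothetical names, and TweetLite is a trimmed-down stand-in for the full
Tweet class:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation

// Hypothetical, shortened version of the Tweet case class.
// The contentAs hint is there because of type erasure: without it Jackson
// cannot see that the Option should hold a Long.
final case class TweetLite(
    tweet_id: Option[String],
    text: Option[String],
    @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long],
    hashtags: Option[Seq[String]]
)

// The mapper lives in an object, so it is created once per JVM and is not
// serialized together with the schema instance.
object TweetJson {
  private val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    // Ignore JSON keys that have no matching field in the case class.
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  def parse(bytes: Array[Byte]): TweetLite =
    mapper.readValue(bytes, classOf[TweetLite])
}

// Hands each Kafka record value to Jackson and emits the case class.
class TweetSchema extends DeserializationSchema[TweetLite] {
  override def deserialize(message: Array[Byte]): TweetLite =
    TweetJson.parse(message)

  // A Kafka topic is unbounded, so no element marks the end of the stream.
  override def isEndOfStream(nextElement: TweetLite): Boolean = false

  override def getProducedType: TypeInformation[TweetLite] =
    createTypeInformation[TweetLite]
}
```

With that in place the consumer could be built as
new FlinkKafkaConsumer[TweetLite]("tweets-raw-json", new TweetSchema, properties),
so the stream already carries case class instances.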

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:

> Hi,
>
> I want to map a stream of JSON documents from Kafka to a Scala case class.
> How can this be accomplished using the JSONKeyValueDeserializationSchema? Is
> a manual mapping of object nodes required?
>
> I have a Spark background. There, such manual mappings are usually
> discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
> this kind of assignment:
> 1) it is concise
> 2) it operates on Spark's off-heap memory representation (Tungsten), which
> makes it faster
>
> In Flink, by contrast, such off-heap optimizations do not seem to be
> discussed much (sorry if I am missing something, I am a Flink newbie). Is
> there a reason why these optimizations are not necessary in Flink?
>
>
> How could I get the following example:
> val serializer = new JSONKeyValueDeserializationSchema(false)
> val stream = senv.addSource(
>     new FlinkKafkaConsumer(
>       "tweets-raw-json",
>       serializer,
>       properties
>     ).setStartFromEarliest() // TODO experiment with different start values
>   )
>
> to map to this Tweet class concisely, i.e. without manually iterating
> through all the attribute fields and parsing the keys from the object node
> tree.
>
> final case class Tweet(
>   tweet_id: Option[String],
>   text: Option[String],
>   source: Option[String],
>   geo: Option[String],
>   place: Option[String],
>   lang: Option[String],
>   created_at: Option[String],
>   timestamp_ms: Option[String],
>   coordinates: Option[String],
>   user_id: Option[Long],
>   user_name: Option[String],
>   screen_name: Option[String],
>   user_created_at: Option[String],
>   followers_count: Option[Long],
>   friends_count: Option[Long],
>   user_lang: Option[String],
>   user_location: Option[String],
>   hashtags: Option[Seq[String]]
> )
>
> Best,
> Georg
>
