You can try the Jackson ObjectMapper library and that will get you from json to object.
Regards, Taher Koitawala On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote: > Hi, > > I want to map a stream of JSON documents from Kafka to a scala case-class. > How can this be accomplished using the JSONKeyValueDeserializationSchema?Is > a manual mapping of object nodes required? > > I have a Spark background. There, such manual mappings usually are > discouraged. Instead, they offer a nice API (dataset API) to perform such a > type of assignment. > 1) this is concise > 2) it operates on sparks off-heap memory representations (tungsten) to be > faster > > In Flink, instead, such off-heap optimizations seem not to be talked much > about (sorry if I miss something, I am a Flink newbie). Is there a reason > why these optimizations are not necessary in Flink? > > > How could I get the following example: > val serializer = new JSONKeyValueDeserializationSchema(false) > val stream = senv.addSource( > new FlinkKafkaConsumer( > "tweets-raw-json", > serializer, > properties > ).setStartFromEarliest() // TODO experiment with different start values > ) > > to map to this Tweet class concisely, i.e. without manually iterating > through all the attribute fields and parsing the keys from the object node > tree. > > final case class Tweet(tweet_id: Option[String], text: Option[String], > source: Option[String], geo: Option[String], place: Option[String], lang: > Option[String], created_at: Option[String], timestamp_ms: Option[String], > coordinates: Option[String], user_id: Option[Long], user_name: > Option[String], screen_name: Option[String], user_created_at: > Option[String], followers_count: Option[Long], friends_count: Option[Long], > user_lang: Option[String], user_location: Option[String], hashtags: > Option[Seq[String]]) > > Best, > Georg >