Hi, I want to map a stream of JSON documents from Kafka to a Scala case class. How can this be accomplished using the JSONKeyValueDeserializationSchema? Is a manual mapping of ObjectNodes required?
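(To make "manual mapping" concrete, here is a sketch of the per-field extraction I would like to avoid. TweetLite is a hypothetical cut-down class with just two fields to keep it short, and "stream" refers to the DataStream[ObjectNode] produced by the consumer setup further down; JSONKeyValueDeserializationSchema puts the record payload under the "value" field of each node.)

    import org.apache.flink.streaming.api.scala._
    import com.fasterxml.jackson.databind.node.ObjectNode

    // hypothetical cut-down class, just to illustrate the per-field boilerplate
    final case class TweetLite(tweet_id: Option[String], text: Option[String])

    // stream: DataStream[ObjectNode] from the Kafka consumer shown below;
    // the record payload sits under the "value" field of each node
    val tweetsLite: DataStream[TweetLite] = stream.map { node =>
      val v = node.get("value")
      TweetLite(
        tweet_id = Option(v.get("tweet_id")).filterNot(_.isNull).map(_.asText()),
        text     = Option(v.get("text")).filterNot(_.isNull).map(_.asText())
      )
      // ...and so on for all 18 fields of the real Tweet class below
    }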
I have a Spark background. There, such manual mappings are usually discouraged; instead, Spark offers a nice API (the Dataset API) for this kind of mapping: 1) it is concise, and 2) it operates on Spark's off-heap memory representation (Tungsten), which makes it faster. In Flink, by contrast, such off-heap optimizations do not seem to be talked about much (sorry if I am missing something, I am a Flink newbie). Is there a reason why these optimizations are not necessary in Flink?

How could I get the following example:

    val serializer = new JSONKeyValueDeserializationSchema(false)
    val stream = senv.addSource(
      new FlinkKafkaConsumer(
        "tweets-raw-json",
        serializer,
        properties
      ).setStartFromEarliest() // TODO experiment with different start values
    )

to map to this Tweet class concisely, i.e. without manually iterating through all the attribute fields and parsing the keys from the ObjectNode tree?

    final case class Tweet(
        tweet_id: Option[String],
        text: Option[String],
        source: Option[String],
        geo: Option[String],
        place: Option[String],
        lang: Option[String],
        created_at: Option[String],
        timestamp_ms: Option[String],
        coordinates: Option[String],
        user_id: Option[Long],
        user_name: Option[String],
        screen_name: Option[String],
        user_created_at: Option[String],
        followers_count: Option[Long],
        friends_count: Option[Long],
        user_lang: Option[String],
        user_location: Option[String],
        hashtags: Option[Seq[String]])

Best,
Georg
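P.S. In case it helps clarify what I am after: one alternative I was considering (not sure whether this is idiomatic Flink; it assumes a jackson-module-scala dependency, and the class and variable names are my own) is to skip the ObjectNode step entirely and bind the record value straight to the case class in a custom DeserializationSchema:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // binds the raw Kafka record value directly to the Tweet case class
    class TweetDeserializationSchema
        extends AbstractDeserializationSchema[Tweet](createTypeInformation[Tweet]) {

      // built lazily per task so the schema itself stays trivially serializable
      @transient private lazy val mapper =
        new ObjectMapper().registerModule(DefaultScalaModule)

      override def deserialize(bytes: Array[Byte]): Tweet =
        mapper.readValue(bytes, classOf[Tweet])
    }

    // drop-in replacement for the JSONKeyValueDeserializationSchema setup above
    val typedStream: DataStream[Tweet] = senv.addSource(
      new FlinkKafkaConsumer(
        "tweets-raw-json",
        new TweetDeserializationSchema(),
        properties
      ).setStartFromEarliest()
    )

Would this be a reasonable approach, or does it give up optimization potential compared to Flink's own serializers?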