Great, thanks. But would it be possible to automate this, i.e. to have this work automatically for any case class / product?
On Thu, 9 Jul 2020 at 20:21, Taher Koitawala <taher...@gmail.com> wrote:

> The performant way would be to apply a map function over the stream and
> then use the Jackson ObjectMapper to convert to Scala objects. In Flink
> there is no API like Spark's to automatically get all the fields.
>
> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>
>> How can I use it with a Scala case class?
>> If I understand it correctly, for better performance the ObjectMapper is
>> already initialized in each KafkaConsumer and returns ObjectNodes. So I
>> should probably rephrase: how can I then map these to case classes
>> without hand-coding it? https://github.com/json4s/json4s and
>> https://github.com/FasterXML/jackson-module-scala both only seem to
>> consume strings.
>>
>> Best,
>> Georg
>>
>> On Thu, 9 Jul 2020 at 19:17, Taher Koitawala <taher...@gmail.com> wrote:
>>
>>> You can try the Jackson ObjectMapper library; that will get you from
>>> JSON to objects.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to map a stream of JSON documents from Kafka to a Scala case
>>>> class. How can this be accomplished using the
>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>>>> required?
>>>>
>>>> I have a Spark background. There, such manual mappings are usually
>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
>>>> perform this kind of assignment:
>>>> 1) it is concise
>>>> 2) it operates on Spark's off-heap memory representation (Tungsten),
>>>> which makes it faster
>>>>
>>>> In Flink, such off-heap optimizations do not seem to be talked about
>>>> much (sorry if I am missing something, I am a Flink newbie). Is there
>>>> a reason why these optimizations are not necessary in Flink?
>>>>
>>>> How could I get the following example:
>>>>
>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>>> val stream = senv.addSource(
>>>>   new FlinkKafkaConsumer(
>>>>     "tweets-raw-json",
>>>>     serializer,
>>>>     properties
>>>>   ).setStartFromEarliest() // TODO experiment with different start values
>>>> )
>>>>
>>>> to map to this Tweet class concisely, i.e. without manually iterating
>>>> through all the attribute fields and parsing the keys from the object
>>>> node tree?
>>>>
>>>> final case class Tweet(
>>>>   tweet_id: Option[String],
>>>>   text: Option[String],
>>>>   source: Option[String],
>>>>   geo: Option[String],
>>>>   place: Option[String],
>>>>   lang: Option[String],
>>>>   created_at: Option[String],
>>>>   timestamp_ms: Option[String],
>>>>   coordinates: Option[String],
>>>>   user_id: Option[Long],
>>>>   user_name: Option[String],
>>>>   screen_name: Option[String],
>>>>   user_created_at: Option[String],
>>>>   followers_count: Option[Long],
>>>>   friends_count: Option[Long],
>>>>   user_lang: Option[String],
>>>>   user_location: Option[String],
>>>>   hashtags: Option[Seq[String]]
>>>> )
>>>>
>>>> Best,
>>>> Georg
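
[Editor's note: for concreteness, here is a minimal, untested sketch of the map-function approach Taher describes, generalized over the target type so it also addresses the automation question above. It assumes jackson-module-scala is on the classpath and that the stream carries the ObjectNodes emitted by JSONKeyValueDeserializationSchema, where the record payload sits under the "value" field. The class name JsonToCaseClass is made up for illustration.]

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.reflect.ClassTag

// Generic mapper: works for any case class T, so nothing is hand-coded
// per target type.
class JsonToCaseClass[T](implicit ct: ClassTag[T])
    extends RichMapFunction[ObjectNode, T] {

  // Build the ObjectMapper per task instance in open() rather than
  // capturing it in the closure, the usual Flink pattern for such helpers.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
    // DefaultScalaModule teaches Jackson about Option, Seq and case classes.
    mapper.registerModule(DefaultScalaModule)
  }

  override def map(node: ObjectNode): T =
    // JSONKeyValueDeserializationSchema nests the record payload under
    // "value"; a missing or malformed payload would throw here, so a real
    // job would want error handling around this call.
    mapper.treeToValue(node.get("value"), ct.runtimeClass).asInstanceOf[T]
}

// Usage with the stream from the original question (the implicit
// TypeInformation[Tweet] comes from org.apache.flink.streaming.api.scala._):
// val tweets: DataStream[Tweet] = stream.map(new JsonToCaseClass[Tweet])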