The performant way would be to apply a map function over the stream and
then use the Jackson ObjectMapper to convert to scala objects. In flink
there is no API like Spark to automatically get all fields.

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> How can I use it with a scala case class?
> If I understand it correctly for better performance the Object Mapper is
> already initialized in each KafkaConsumer and returning ObjectNodes. So
> probably I should rephrase to: how can I then map these to case classes
> without handcoding it?  https://github.com/json4s/json4s or
> https://github.com/FasterXML/jackson-module-scala both only seem to
> consume strings.
>
> Best,
> Georg
>
> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
> taher...@gmail.com>:
>
>> You can try the Jackson ObjectMapper library and that will get you from
>> json to object.
>>
>> Regards,
>> Taher Koitawala
>>
>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I want to map a stream of JSON documents from Kafka to a scala
>>> case-class. How can this be accomplished using the
>>> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
>>> required?
>>>
>>> I have a Spark background. There, such manual mappings usually are
>>> discouraged. Instead, they offer a nice API (dataset API) to perform such a
>>> type of assignment.
>>> 1) this is concise
>>> 2) it operates on sparks off-heap memory representations (tungsten) to
>>> be faster
>>>
>>> In Flink, instead, such off-heap optimizations seem not to be talked
>>> much about (sorry if I miss something, I am a Flink newbie). Is there a
>>> reason why these optimizations are not necessary in Flink?
>>>
>>>
>>> How could I get the following example:
>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>> val stream = senv.addSource(
>>>     new FlinkKafkaConsumer(
>>>       "tweets-raw-json",
>>>       serializer,
>>>       properties
>>>     ).setStartFromEarliest() // TODO experiment with different start
>>> values
>>>   )
>>>
>>> to map to this Tweet class concisely, i.e. without manually iterating
>>> through all the attribute fields and parsing the keys from the object node
>>> tree.
>>>
>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>>> source: Option[String], geo: Option[String], place: Option[String], lang:
>>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>>> coordinates: Option[String], user_id: Option[Long], user_name:
>>> Option[String], screen_name: Option[String], user_created_at:
>>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>>> user_lang: Option[String], user_location: Option[String], hashtags:
>>> Option[Seq[String]])
>>>
>>> Best,
>>> Georg
>>>
>>

Reply via email to