Hi,

I want to map a stream of JSON documents from Kafka to a Scala case class.
How can this be accomplished using the JSONKeyValueDeserializationSchema? Is
a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings are usually
discouraged; instead, Spark offers a nice API (the Dataset API) to perform
this kind of assignment, as sketched below:
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which
makes it faster
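
For illustration, in Spark I would write roughly the following (an untested
sketch; the broker address is a placeholder, and the schema is derived from
the Tweet case class shown further below):

import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// read the Kafka value as a JSON string and map it directly into the case class
val tweets = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "tweets-raw-json")
  .load()
  .select(from_json($"value".cast("string"), Encoders.product[Tweet].schema).as("t"))
  .select("t.*")
  .as[Tweet]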

In Flink, by contrast, such off-heap optimizations do not seem to be
discussed much (sorry if I am missing something, I am a Flink newbie). Is
there a reason why these optimizations are not necessary in Flink?


How could I get the following example:
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

val properties = new Properties() // Kafka consumer config (bootstrap.servers, group.id, ...)

// false = do not include Kafka metadata (topic, partition, offset) in the ObjectNode
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource( // senv: StreamExecutionEnvironment from the Flink Scala shell
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() // TODO experiment with different start values
  )

to map concisely to this Tweet class, i.e. without manually iterating over
all the attribute fields and extracting the keys from the ObjectNode tree?

final case class Tweet(
  tweet_id: Option[String], text: Option[String], source: Option[String],
  geo: Option[String], place: Option[String], lang: Option[String],
  created_at: Option[String], timestamp_ms: Option[String],
  coordinates: Option[String], user_id: Option[Long],
  user_name: Option[String], screen_name: Option[String],
  user_created_at: Option[String], followers_count: Option[Long],
  friends_count: Option[Long], user_lang: Option[String],
  user_location: Option[String], hashtags: Option[Seq[String]]
)
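
The closest I have come up with myself is to convert each ObjectNode via
Jackson's Scala module (an untested sketch; it assumes jackson-module-scala
is on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// DefaultScalaModule teaches Jackson about Option and Scala case classes
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

// JSONKeyValueDeserializationSchema puts the payload under "value"
// (and the Kafka record key under "key")
val tweets = stream.map { node =>
  mapper.treeToValue(node.get("value"), classOf[Tweet])
}

But this still feels like a workaround rather than a first-class typed API.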

Best,
Georg
