Hi,
I want to map a stream of JSON documents from Kafka to a Scala case class.
How can this be accomplished using the JSONKeyValueDeserializationSchema? Is
a manual mapping of object nodes required?
I have a Spark background. There, such manual mappings are usually
discouraged. Instead, Spark offers the Dataset API for this kind of
mapping, which has two advantages:
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which
makes it faster
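For reference, the Spark-side mapping referred to above might look roughly like the following sketch. TweetSubset is a hypothetical, trimmed-down stand-in for the full case class, the path argument is a placeholder, and the input is assumed to be line-delimited JSON whose field names match the case class.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Hypothetical, trimmed-down stand-in for the full Tweet class (illustration only).
final case class TweetSubset(
  tweet_id: Option[String],
  text: Option[String],
  user_id: Option[Long],
  hashtags: Option[Seq[String]])

object SparkTweetExample {
  // Reads line-delimited JSON from `path` (a placeholder) into typed records.
  def readTweets(spark: SparkSession, path: String): Dataset[TweetSubset] = {
    import spark.implicits._ // brings the implicit Encoder[TweetSubset] into scope
    spark.read
      .schema(Encoders.product[TweetSubset].schema) // schema derived from the case class
      .json(path)
      .as[TweetSubset] // fields absent in the JSON end up as None
  }
}
```

Deriving the schema from the case class, rather than letting Spark infer it from the data, keeps the `.as[...]` conversion from failing when a field is absent in every input record.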
In Flink, by contrast, such off-heap optimizations do not seem to be
discussed much (sorry if I am missing something, I am a Flink newbie). Is
there a reason why these optimizations are not necessary in Flink?
How could I get the following example:
    val serializer = new JSONKeyValueDeserializationSchema(false)
    val stream = senv.addSource(
      new FlinkKafkaConsumer(
        "tweets-raw-json",
        serializer,
        properties
      ).setStartFromEarliest() // TODO experiment with different start values
    )
to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.
    final case class Tweet(
      tweet_id: Option[String], text: Option[String],
      source: Option[String], geo: Option[String],
      place: Option[String], lang: Option[String],
      created_at: Option[String], timestamp_ms: Option[String],
      coordinates: Option[String],
      user_id: Option[Long], user_name: Option[String],
      screen_name: Option[String], user_created_at: Option[String],
      followers_count: Option[Long], friends_count: Option[Long],
      user_lang: Option[String], user_location: Option[String],
      hashtags: Option[Seq[String]])
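
One possible direction, as a minimal sketch under several assumptions rather than a confirmed Flink recipe: skip JSONKeyValueDeserializationSchema, read only the record value, and let jackson-module-scala (assumed to be on the classpath) bind the bytes to the case class inside a custom DeserializationSchema. TweetLite and TweetLiteSchema are hypothetical names, and TweetLite is a trimmed-down Tweet. The sketch also assumes the JSON field names match the case class fields one-to-one, which would not hold for raw tweets that nest the user fields in a sub-object.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // provides the createTypeInformation macro

// Hypothetical, trimmed-down stand-in for the full Tweet class (illustration only).
final case class TweetLite(
  tweet_id: Option[String],
  text: Option[String],
  // Without this hint, type erasure can leave an Integer inside Option[Long].
  @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long],
  hashtags: Option[Seq[String]])

// Hypothetical schema: binds the Kafka record value (JSON bytes) to TweetLite.
class TweetLiteSchema extends DeserializationSchema[TweetLite] {

  // Created lazily on each task instead of being shipped with the job graph.
  @transient private lazy val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  // JSON fields that are absent become None; unknown fields are ignored.
  override def deserialize(message: Array[Byte]): TweetLite =
    mapper.readValue(message, classOf[TweetLite])

  // The Kafka stream is unbounded, so no element marks its end.
  override def isEndOfStream(nextElement: TweetLite): Boolean = false

  override def getProducedType(): TypeInformation[TweetLite] =
    createTypeInformation[TweetLite]
}
```

Under these assumptions, the schema would be passed to the consumer in place of the serializer, e.g. `new FlinkKafkaConsumer[TweetLite]("tweets-raw-json", new TweetLiteSchema, properties)`. Unlike JSONKeyValueDeserializationSchema, this variant drops the record key and the Kafka metadata.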
Best,
Georg