Hi Georg,

I'm afraid the other suggestions are missing the point a bit. From your other emails it seems you want to use Kafka with JSON records together with the Table API/SQL. For that, take a look at [1] which describes how to define data sources for the Table API. Especially the Kafka and JSON sections should be relevant.

That first link I mentioned is for the legacy connector API. There is a newer API with slightly different properties which will allow us to do the kinds of optimization like working on binary data throughout the stack: [2]. Unfortunately, there is no programmatic API yet, you would have to use `TableEnvironment.executeSql()` to execute SQL DDL that defines your sources. There is a FLIP for adding the programmatic API: [3]

Best,
Aljoscha

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

On 10.07.20 05:01, Aaron Levin wrote:
Hi Georg, you can try using the circe library for this which has a way to
automatically generate JSON decoders for scala case classes.

As it was mentioned earlier, Flink does not come packaged with
JSON-decoding generators for Scala like spark does.

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

Great. Thanks.
But would it be possible to automate this i.e. to have this work
automatically for the case class / product?

Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

The performant way would be to apply a map function over the stream and
then use the Jackson ObjectMapper to convert to scala objects. In flink
there is no API like Spark to automatically get all fields.

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

How can I use it with a scala case class?
If I understand it correctly for better performance the Object Mapper is
already initialized in each KafkaConsumer and returning ObjectNodes. So
probably I should rephrase to: how can I then map these to case classes
without handcoding it?  https://github.com/json4s/json4s or
https://github.com/FasterXML/jackson-module-scala both only seem to
consume strings.

Best,
Georg

Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

You can try the Jackson ObjectMapper library and that will get you from
json to object.

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

Hi,

I want to map a stream of JSON documents from Kafka to a scala
case-class. How can this be accomplished using the
JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
required?

I have a Spark background. There, such manual mappings usually are
discouraged. Instead, they offer a nice API (dataset API) to perform such a
type of assignment.
1) this is concise
2) it operates on sparks off-heap memory representations (tungsten) to
be faster

In Flink, instead, such off-heap optimizations seem not to be talked
much about (sorry if I miss something, I am a Flink newbie). Is there a
reason why these optimizations are not necessary in Flink?


How could I get the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
     new FlinkKafkaConsumer(
       "tweets-raw-json",
       serializer,
       properties
     ).setStartFromEarliest() // TODO experiment with different start
values
   )

to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.

final case class Tweet(tweet_id: Option[String], text: Option[String],
source: Option[String], geo: Option[String], place: Option[String], lang:
Option[String], created_at: Option[String], timestamp_ms: Option[String],
coordinates: Option[String], user_id: Option[Long], user_name:
Option[String], screen_name: Option[String], user_created_at:
Option[String], followers_count: Option[Long], friends_count: Option[Long],
user_lang: Option[String], user_location: Option[String], hashtags:
Option[Seq[String]])

Best,
Georg




Reply via email to