There's friction with using scala/java protobuf and trying to convert them
into a Flink Table from a DataStream[ProtobufObject].
Scenario:
Input is a DataStream[ProtobufObject] from a kafka topic that we read data
from and deserialise into Protobuf objects (scala case classes or
alternatively Java classes) using scalapb https://scalapb.github.io/

Goal: Given a topic name and a protobuf classname, we would like to
automatically generate a Flink Table for it.

Problems:

   - The Java Protobuf classes are not Pojos and therefore not recognised.
   They show up as a single RAW column when converted from
   streamTableEnv.fromDataStream()
   - scala protobuf is better, the only issue is with repeated fields. They
   are represented as Seq in scala, which does not map to a flink table type
   and shows up as RAW again (only java.util.List types show as proper arrays)
   - scalapb allows customising the collection type but the standard ones
   have the same issue:
   https://scalapb.github.io/docs/customizations/#custom-collection-types
   - I tried to implement a new collection type that both satisfies the
   collection type requirements from scalapb as well as that of a
   java.util.List. but ultimately failed to do so because of signature clashes
   - flink table api has a protobuf support
   
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
   but it requires translating the entire protobuf structure manually to SQL
   type which is not feasible.

 Questions:
- are there plans to support scala Seq for Flink SQL Array Type? Would it
be straightforward for me to implement a custom typeinformation(?) to help
Flink Table API convert it correctly?

- Why is the Java protobuf class not recognised as Pojo? Is it possible to
add support for them?

- Why does a Flink `CREATE TABLE` from Protobuf require the entire table
column structure to be defined in SQL again? Shouldn't fields be inferred
automatically from the provided protobuf class?
- Are there other ways of solving this challenge that maybe someone has
already successfully used?

So far my workaround is to implement a custom .map() step to convert the pb
object into something readable by the Flink Table API. But that has to be
done manually for each individual topic and pb class which does not scale.

Would be very glad for any insights to any of those questions above, I have
been hitting my head against this several prolonged times over the past
year(s) :(

Thanks a lot

Clemens

-- 


By communicating with Grab Holdings Limited and/or its subsidiaries, 
associate companies and jointly controlled entities (collectively, “Grab”), 
you are deemed to have consented to the processing of your personal data as 
set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/  <https://grab.com/privacy/>


 This email 
contains confidential information that may be privileged and is only for 
the intended recipient(s). If you are not the intended recipient(s), please 
do not disseminate, distribute or copy this email. Please notify Grab 
immediately if you have received this by mistake and delete this email from 
your system. Email transmission may not be secure or error-free as any 
information could be intercepted, corrupted, lost, destroyed, delayed or 
incomplete, or contain viruses. Grab does not accept liability for any 
errors or omissions in this email that arise as a result of email 
transmission. All intellectual property rights in this email and any 
attachments shall remain vested in Grab, unless otherwise provided by law

Reply via email to