There's friction when using Scala/Java protobuf classes and trying to convert a DataStream[ProtobufObject] into a Flink Table.

Scenario: The input is a DataStream[ProtobufObject] from a Kafka topic whose records we read and deserialise into Protobuf objects (Scala case classes or, alternatively, Java classes) using ScalaPB: https://scalapb.github.io/ (see the sketch right below).
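For context, the ingestion side looks roughly like this. MyEvent stands in for an arbitrary generated ScalaPB message; the topic name and broker address are placeholders too:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// ScalaPB generates a parseFrom on each message's companion object,
// so the value deserialiser is a one-liner per class.
class MyEventDeserializer extends AbstractDeserializationSchema[MyEvent] {
  override def deserialize(bytes: Array[Byte]): MyEvent = MyEvent.parseFrom(bytes)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = KafkaSource.builder[MyEvent]()
  .setBootstrapServers("kafka:9092")
  .setTopics("my-topic")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new MyEventDeserializer)
  .build()

val stream: DataStream[MyEvent] =
  env.fromSource(source, WatermarkStrategy.noWatermarks(), "my-topic")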
Goal: Given a topic name and a protobuf class name, we would like to automatically generate a Flink Table for it.

Problems:
- The Java protobuf classes are not POJOs and are therefore not recognised; they show up as a single RAW column when converted via streamTableEnv.fromDataStream().
- The Scala protobuf classes work better; the only issue is with repeated fields. They are represented as Seq in Scala, which does not map to a Flink table type and shows up as RAW again; only java.util.List types show up as proper arrays (see the first sketch at the end of this mail).
- ScalaPB allows customising the collection type, but the standard ones have the same issue: https://scalapb.github.io/docs/customizations/#custom-collection-types
- I tried to implement a new collection type that satisfies both ScalaPB's collection-type requirements and those of a java.util.List, but ultimately failed to do so because of signature clashes.
- The Flink Table API has protobuf format support (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/), but it requires manually translating the entire protobuf structure to SQL types, which is not feasible for us.

Questions:
- Are there plans to support Scala Seq for the Flink SQL ARRAY type? Would it be straightforward for me to implement a custom TypeInformation(?) to help the Table API convert it correctly?
- Why is the Java protobuf class not recognised as a POJO? Is it possible to add support for it?
- Why does a Flink `CREATE TABLE` with the protobuf format require the entire table column structure to be defined in SQL again? Shouldn't the fields be inferred automatically from the provided protobuf class?
- Are there other ways of solving this challenge that someone has already used successfully?

So far my workaround is a custom .map() step that converts the protobuf object into something readable by the Flink Table API (see the second sketch at the end of this mail). But that has to be written manually for each individual topic and protobuf class, which does not scale.

I would be very glad for any insights on the questions above; I have been hitting my head against this repeatedly over the past year(s) :(

Thanks a lot
Clemens
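P.S. To make the repeated-field problem concrete: assuming the MyEvent message from the snippet above has a `repeated string tags` field, ScalaPB generates it as tags: Seq[String], and converting the stream gives something like this:

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(env)

val table = tableEnv.fromDataStream(stream)
table.printSchema()
// prints roughly:
//   `id`   STRING,
//   `tags` RAW('scala.collection.Seq', ...)   <-- instead of ARRAY<STRING>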
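And this is the shape of the per-class .map() workaround, continuing from the snippets above (field names are again placeholders). It copies the Seq into an array and provides explicit Row type information so the Table API derives proper columns:

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.types.Row

// Explicit type information, so the Table API sees ARRAY<STRING>
// instead of a RAW type for the repeated field.
implicit val rowType: TypeInformation[Row] =
  Types.ROW_NAMED(Array("id", "tags"), Types.STRING, Types.OBJECT_ARRAY(Types.STRING))

val rows: DataStream[Row] =
  stream.map(e => Row.of(e.id, e.tags.toArray))

val workaroundTable = tableEnv.fromDataStream(rows)
// schema is now roughly: `id` STRING, `tags` ARRAY<STRING>

(Using Row avoids defining a throwaway case class per topic, but the field mapping still has to be spelled out by hand for every message type.)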