Hey Theodor,

That's pretty much it, assuming your Protobuf schema is more or less fixed.
For a production workload, though, you'd need to add a Schema Registry
lookup. I guess the implementation for that would be similar to what's in
the Confluent Avro format.
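
A rough sketch of what such a lookup could boil down to, assuming the 4-byte
schema ID has already been read from the message header: resolve each ID once
and cache the result, so the registry is only queried for the first message
of each schema. SchemaCache and the fetcher function are hypothetical names
used for illustration, not Flink or Confluent APIs; the fetcher stands in for
a real registry client call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/**
 * Hypothetical per-schema-ID cache in front of a Schema Registry lookup.
 * The fetcher is an assumed stand-in for a real registry call
 * (e.g. an HTTP GET on /schemas/ids/{id}); it is not a Flink API.
 */
final class SchemaCache<S> {
    private final Map<Integer, S> cache = new ConcurrentHashMap<>();
    private final IntFunction<S> fetcher;

    SchemaCache(IntFunction<S> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the schema for this ID, calling the fetcher only on the first miss. */
    S get(int schemaId) {
        return cache.computeIfAbsent(schemaId, fetcher::apply);
    }
}
```

In a real format the cached value would probably be the parsed Protobuf
descriptor or a ready-made converter rather than the raw schema, since
building those is the expensive part.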

On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker <theo.wueb...@inside-m2m.de>
wrote:

> Hey all,
>
> So Confluent has Kafka serializers for Protobuf, Avro and JSON that
> create messages consisting of a magic byte, followed by a 4-byte schema ID,
> followed by the actual payload (refer to the docs
> <https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format>).
> When I try to read such messages with the regular Protobuf, Avro and JSON
> formats in my Table API program, this of course does not work. For Avro,
> Flink also has a Confluent-Avro format that can deal with this. For
> Protobuf and JSON, however, there is nothing like this yet. I saw a ticket
> in JIRA
> <https://issues.apache.org/jira/browse/FLINK-29731?filter=-4&jql=project%20=%20FLINK%20AND%20issuetype%20=%20%22New%20Feature%22%20AND%20text%20~%20%22protobuf%22%20order%20by%20created%20DESC>,
> but I cannot wait for it. Hence I wonder how much effort it would be to
> implement this myself: not in a production-ready way, just in a way that
> keeps my program from breaking. That is, I would be happy with a solution
> that just ignores the first 5 bytes and passes the rest on to the existing
> Protobuf and JSON format handlers.
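
For the "ignore the first 5 bytes" approach, the header handling itself is
small. A minimal sketch, assuming the layout from the linked Confluent docs
(magic byte 0 followed by a 4-byte big-endian schema ID); ConfluentHeader and
its methods are hypothetical names, not part of Flink:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helpers for the Confluent header: magic byte 0, then a 4-byte big-endian schema ID. */
final class ConfluentHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Reads the schema ID, failing fast if the message is not in the Confluent wire format. */
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent wire-format message");
        }
        // ByteBuffer is big-endian by default, matching the wire format.
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the bytes after the 5-byte header (the payload, for Avro and JSON Schema messages). */
    static byte[] stripHeader(byte[] message) {
        schemaId(message); // validate the header before slicing
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }
}
```

Validating the magic byte costs nothing and turns a confusing downstream parse
error into a clear one when a plain (non-Confluent) message shows up.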
>
>
> Now let's take the existing Protobuf format as an example. I assume I have
> to implement the DeserializationFormatFactory interface, create decoding
> and encoding formats like PbDecodingFormat, and then a new
> DeserializationSchema that would contain a method like this one
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.html>:
>
> @Override
> public RowData deserialize(byte[] message) throws IOException {
>     try {
>         return protoToRowConverter.convertProtoBinaryToRow(message);
>     } catch (Throwable t) {
>         if (formatConfig.isIgnoreParseErrors()) {
>             return null;
>         }
>         throw new IOException("Failed to deserialize PB object.", t);
>     }
> }
>
> But instead of converting the message immediately, I would slice off the
> first few bytes and go from there. Is that pretty much it, or is there more
> to it?
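
One thing to watch out for with Protobuf: per the Confluent wire-format docs
linked above, Protobuf messages carry extra "message indexes" between the
schema ID and the payload, so dropping exactly 5 bytes is not enough there
(for Avro and JSON it is). The indexes are a zigzag varint count followed by
that many zigzag varint indexes, and the common case [0] (the first message
type in the .proto file) is written as a single 0 byte. A minimal sketch,
assuming the topic only ever carries the message type your format is
configured for, so the indexes are skipped rather than checked;
ConfluentProtobufStripper and its Decoder interface are hypothetical names,
with Decoder standing in for the existing conversion (e.g. the
convertProtoBinaryToRow call in the snippet above):

```java
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical helper: strip the Confluent Protobuf prefix, then delegate to an existing decoder. */
final class ConfluentProtobufStripper {
    /** Assumed stand-in for the existing Protobuf bytes-to-row conversion. */
    interface Decoder<T> {
        T decode(byte[] payload) throws IOException;
    }

    private static final int HEADER_SIZE = 5; // magic byte + 4-byte schema ID

    static <T> T deserialize(byte[] message, Decoder<T> inner) throws IOException {
        if (message.length < HEADER_SIZE || message[0] != 0) {
            throw new IOException("Not a Confluent wire-format message");
        }
        int[] pos = {HEADER_SIZE};
        // Message indexes: zigzag varint count, then that many zigzag varint indexes.
        // A count of 0 is the shorthand for [0]; nothing else follows it.
        int count = readZigZagVarint(message, pos);
        for (int i = 0; i < count; i++) {
            readZigZagVarint(message, pos); // nested message-type index; ignored in this sketch
        }
        return inner.decode(Arrays.copyOfRange(message, pos[0], message.length));
    }

    /** Reads one zigzag-encoded varint starting at pos[0] and advances pos[0] past it. */
    private static int readZigZagVarint(byte[] buf, int[] pos) throws IOException {
        int raw = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            if (pos[0] >= buf.length) {
                throw new IOException("Truncated varint in message indexes");
            }
            byte b = buf[pos[0]++];
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
            }
        }
        throw new IOException("Varint too long");
    }
}
```

In a custom DeserializationSchema, deserialize(message) could call this with
the existing conversion as the decoder and keep the ignoreParseErrors handling
from the original method around it unchanged.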
>
> -Theo
>
>
