Hey Theodor,

That's pretty much it, assuming your Protobuf schema is more or less fixed. For a production workload, though, you'd need to add a Schema Registry lookup. I'd expect the implementation for that to look similar to what's in the Confluent Avro format.
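To show roughly where a Schema Registry lookup would plug in, here is a minimal, stdlib-only sketch that parses the Confluent wire-format header and returns the schema ID (the key you'd resolve against the registry) together with the offset where the payload starts. `ConfluentHeader` and its methods are hypothetical names, not part of Flink or the Confluent libraries. One caveat, going by Confluent's wire-format docs: for Protobuf, the serializer also writes a list of message indexes (zigzag varints) after the schema ID. The common case, the first message type in the schema, is written as a single 0 byte, so a Protobuf payload usually starts after 6 bytes, not 5. The sketch skips that list when `protobuf` is true.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper: parses the Confluent Schema Registry wire-format header.
 * Layout: magic byte (0) | 4-byte big-endian schema ID | [Protobuf only: message indexes] | payload.
 */
final class ConfluentHeader {
    static final byte MAGIC_BYTE = 0;

    final int schemaId;      // the ID you would resolve against the Schema Registry
    final int payloadOffset; // index of the first payload byte in the original message

    private ConfluentHeader(int schemaId, int payloadOffset) {
        this.schemaId = schemaId;
        this.payloadOffset = payloadOffset;
    }

    /** Parses the header; pass protobuf = true to also skip the message-index list. */
    static ConfluentHeader parse(byte[] message, boolean protobuf) {
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buf.remaining() < 5 || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        int schemaId = buf.getInt();
        if (protobuf) {
            // Number of indexes; 0 is shorthand for [0], i.e. the first message type.
            int count = readZigZagVarint(buf);
            for (int i = 0; i < count; i++) {
                readZigZagVarint(buf); // skip each index; only the payload offset matters here
            }
        }
        return new ConfluentHeader(schemaId, buf.position());
    }

    /** Copies out the bytes that the plain Protobuf/JSON deserializers expect. */
    byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, payloadOffset, message.length);
    }

    /** Reads a zigzag-encoded base-128 varint (at most 5 bytes for a 32-bit int). */
    private static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
    }
}
```

In a real format you'd cache the registry responses per `schemaId`, since the same ID repeats on almost every record.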
On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker <theo.wueb...@inside-m2m.de> wrote:

> Hey all,
>
> so Confluent has Kafka serializers for Protobuf, Avro and JSON that
> create messages with a magic byte, followed by a 4-byte schema ID,
> followed by the actual payload (refer to the docs
> <https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format>).
> When I try to read such messages with the regular Protobuf, Avro and JSON
> formats in my Table API program, it of course does not work. For Avro,
> Flink also has a Confluent-Avro format that can deal with this. However, for
> Protobuf and JSON there is nothing like this yet. I saw a ticket in the
> JIRA
> <https://issues.apache.org/jira/browse/FLINK-29731?filter=-4&jql=project%20=%20FLINK%20AND%20issuetype%20=%20%22New%20Feature%22%20AND%20text%20~%20%22protobuf%22%20order%20by%20created%20DESC>,
> but I cannot wait for it. Hence I wonder how much effort it would be to
> implement this myself, not in a production-ready way, but just in a way
> that keeps my program from breaking. In other words, I would be happy with
> a solution that just ignores the first 5 bytes and passes the rest on to
> the existing Protobuf and JSON format handlers.
>
> Now let's take the existing Protobuf format as an example: I assume I have to
> implement the DeserializationFormatFactory, create a few decoding and
> encoding formats, just like PbDecodingFormat, then a new
> DeserializationSchema, and there I would have a method like this
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.html>:
>
>     @Override
>     public RowData deserialize(byte[] message) throws IOException {
>         try {
>             return protoToRowConverter.convertProtoBinaryToRow(message);
>         } catch (Throwable t) {
>             if (formatConfig.isIgnoreParseErrors()) {
>                 return null;
>             }
>             throw new IOException("Failed to deserialize PB object.", t);
>         }
>     }
>
> But instead of converting the message immediately, I would slice off the
> first few bytes and go from there. Is this pretty much it, or is there more
> to it?
>
> -Theo
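For the slicing step itself, a minimal sketch of a decorator that strips a fixed-size header and delegates to an existing deserializer could look like the following. `BytesDeserializer` is a hypothetical stand-in for Flink's `DeserializationSchema`, declared only so the sketch compiles without Flink on the classpath; in a real format you'd implement `DeserializationSchema<RowData>` and wrap the existing `PbRowDataDeserializationSchema` (or the JSON equivalent) instead. The header size is a parameter on the assumption that it is 5 bytes for JSON and usually 6 for Protobuf, where the extra byte is the message-index list that Confluent's Protobuf serializer writes after the schema ID.

```java
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical stand-in for Flink's DeserializationSchema#deserialize(byte[]). */
interface BytesDeserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

/** Strips a fixed-size Confluent header and delegates the rest to an existing deserializer. */
final class HeaderStrippingDeserializer<T> implements BytesDeserializer<T> {
    private final BytesDeserializer<T> inner;
    private final int headerSize; // assumed: 5 for JSON, usually 6 for Protobuf
    private final boolean ignoreParseErrors;

    HeaderStrippingDeserializer(BytesDeserializer<T> inner, int headerSize, boolean ignoreParseErrors) {
        if (headerSize < 5) {
            throw new IllegalArgumentException("Header is at least magic byte + 4-byte schema ID");
        }
        this.inner = inner;
        this.headerSize = headerSize;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        // Reject anything that is too short or doesn't start with the magic byte 0.
        if (message == null || message.length < headerSize || message[0] != 0) {
            if (ignoreParseErrors) {
                return null; // mirrors the ignore-parse-errors behaviour of the built-in formats
            }
            throw new IOException("Message is not in Confluent wire format.");
        }
        // Slice off the header and hand the remaining bytes to the unmodified deserializer.
        return inner.deserialize(Arrays.copyOfRange(message, headerSize, message.length));
    }
}
```

Copying with `Arrays.copyOfRange` costs one allocation per record, but it's the simplest option here because the existing converter takes a whole `byte[]` rather than an offset and length.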