Hi All,

I'm going to attempt to revive the discussion for FLINK-34440
<https://issues.apache.org/jira/browse/FLINK-34440> since the original
discussion thread has stalled. Fresh start! :)

Let's start with discussing the approach as per the Flink contribution
guides!

First of all, I think it's important to note that we're talking about (at
least) 2 formats here:
- flink-protobuf-confluent: A source and sink format that can read / write
messages using the Confluent wire format.
- flink-protobuf-confluent-debezium: A source format that can read Debezium
changelog events that are serialized in the Confluent wire format.

Since both of these formats will deal with messages that follow the
Confluent wire format, I think it makes sense to build
flink-protobuf-confluent-debezium as a special-purpose implementation of
flink-protobuf-confluent, just as is done for avro-confluent and
debezium-avro-confluent.
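
For readers less familiar with the framing both formats would share, here is a minimal sketch of splitting a Confluent-framed protobuf record into its parts. It assumes the layout described in the Confluent documentation: a zero magic byte, a 4-byte big-endian schema ID, a zig-zag varint list of message indexes (where a single `0` is shorthand for "first top-level message"), and then the protobuf payload. The class name `ConfluentProtobufFrame` is hypothetical and is not part of any existing Flink or Confluent API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a Confluent-framed protobuf record into header and payload. */
final class ConfluentProtobufFrame {
    final int schemaId;
    final List<Integer> messageIndexes;
    final byte[] payload;

    private ConfluentProtobufFrame(int schemaId, List<Integer> messageIndexes, byte[] payload) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
        this.payload = payload;
    }

    static ConfluentProtobufFrame parse(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        if (buf.remaining() < 5) {
            throw new IllegalArgumentException("Record too short for a Confluent header");
        }
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt();

        // Assumption: the index list is a varint count followed by that many varint indexes.
        int count = readZigZagVarint(buf);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // shorthand for the first message type in the schema
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(buf));
            }
        }

        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new ConfluentProtobufFrame(schemaId, indexes, payload);
    }

    /** Reads a 32-bit varint and undoes the zig-zag encoding. */
    private static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long for a 32-bit value");
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        byte[] record = {0, 0, 0, 0, 42, 0, 0x08, 0x01};
        ConfluentProtobufFrame frame = parse(record);
        System.out.println("schema id = " + frame.schemaId + ", indexes = " + frame.messageIndexes);
    }
}
```

In practice the Confluent serdes handle this framing internally, so the sketch is only meant to show what "Confluent wire format" adds on top of plain protobuf bytes.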

The second big question, in my opinion, is the extent to which
flink-protobuf-confluent should reuse the existing flink-protobuf
implementation. An important distinction between flink-protobuf and
flink-protobuf-confluent is that the former always expects that the Java
classes generated from the .proto files are available on the classpath at
boot time. On the other hand, when using the Confluent wire format and the
Confluent schema registry, producers and consumers very often resort to
dynamic de/serialization. Therefore a common pattern for Confluent protobuf
serdes is to use `DynamicMessage` types to bridge bytes to Java and vice
versa.

In my opinion, going with DynamicMessages would present 2 problems:
1. The behavior of flink-protobuf and flink-protobuf-confluent would
diverge unless we take a lot of extra care. This would be confusing to
users who will expect that flink-protobuf and flink-protobuf-confluent map
proto bytes to / from Flink identically.
2. It would mean that we disregard the choice of the original implementers
of flink-protobuf who deliberately decided against DynamicMessages due to
their slower performance compared with native Java classes.

The alternative to using DynamicMessages is to use more runtime code
generation and reuse the Java <-> Flink mapping from flink-protobuf. The
flow would look like this:

### Deserializer

- Fetch the message's protobuf descriptor from the Confluent schema registry
- Generate a Java class from the descriptor at runtime
- Deserialize `byte[]`s to the generated `protobuf.Message` type using a
`io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
- Delegate the work of converting the `protobuf.Message` into a
`RowData` object to the existing flink-protobuf format
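
One implication of the steps above is that the registry lookup and the runtime code generation should presumably happen once per schema ID rather than once per record. The following is a rough sketch of that caching shape only, not a proposed implementation. Everything in it is hypothetical: `CachingDeserializerSketch`, the `schemaFetcher` and `compiler` hooks, and the type parameters `S` (standing in for the fetched descriptor) and `R` (standing in for `RowData`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.IntFunction;

/**
 * Hypothetical sketch: caches one decoder per Confluent schema ID.
 * S stands in for the descriptor fetched from the registry, R for Flink's RowData.
 */
final class CachingDeserializerSketch<S, R> {
    // Step 1 of the flow: look up the descriptor by schema ID.
    private final IntFunction<S> schemaFetcher;
    // Steps 2-4 of the flow: generate a class, then decode bytes and convert to R.
    private final Function<S, Function<byte[], R>> compiler;
    private final Map<Integer, Function<byte[], R>> decoders = new ConcurrentHashMap<>();

    CachingDeserializerSketch(IntFunction<S> schemaFetcher,
                              Function<S, Function<byte[], R>> compiler) {
        this.schemaFetcher = schemaFetcher;
        this.compiler = compiler;
    }

    R deserialize(int schemaId, byte[] payload) {
        // Fetch and compile only the first time a schema ID is seen.
        Function<byte[], R> decoder = decoders.computeIfAbsent(
                schemaId, id -> compiler.apply(schemaFetcher.apply(id)));
        return decoder.apply(payload);
    }

    public static void main(String[] args) {
        CachingDeserializerSketch<String, String> sketch = new CachingDeserializerSketch<>(
                id -> "schema-" + id,
                schema -> bytes -> schema + " decoded " + bytes.length + " bytes");
        System.out.println(sketch.deserialize(7, new byte[] {1, 2, 3}));
    }
}
```

Whether the cache should live in the format itself or be left to the Confluent client is an open design question; the sketch only illustrates that the expensive steps need not sit on the per-record path.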

### Serializer

- Convert the user's `RowType` to a protobuf descriptor
- Generate a Java class from the descriptor at runtime
- Delegate the `RowData` -> `AbstractMessage` conversion to the existing
flink-protobuf format
- Serialize the `AbstractMessage` object using a
`io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`
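
To illustrate the first serializer step, here is a deliberately simplified sketch that renders a flat list of columns as a proto3 message definition. It is an assumption-heavy illustration: `RowTypeToProtoSketch` is a hypothetical name, columns are represented as plain SQL type names rather than Flink's `RowType`, the scalar mapping is assumed to mirror what flink-protobuf already uses, and field numbers are simply assigned by column position. Nested rows, arrays, maps and nullability are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: renders a flat column list as a proto3 message definition. */
final class RowTypeToProtoSketch {
    // Assumed scalar mapping from SQL type names to proto3 types.
    private static final Map<String, String> SCALARS = Map.of(
            "BOOLEAN", "bool",
            "INT", "int32",
            "BIGINT", "int64",
            "FLOAT", "float",
            "DOUBLE", "double",
            "STRING", "string",
            "BYTES", "bytes");

    /** Columns must be passed in an ordered map so that field numbers are stable. */
    static String toProto(String messageName, Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("syntax = \"proto3\";\n\n");
        sb.append("message ").append(messageName).append(" {\n");
        int fieldNumber = 1; // assumption: field numbers follow column order
        for (Map.Entry<String, String> column : columns.entrySet()) {
            String protoType = SCALARS.get(column.getValue());
            if (protoType == null) {
                throw new IllegalArgumentException("Unsupported type: " + column.getValue());
            }
            sb.append("  ").append(protoType).append(' ').append(column.getKey())
              .append(" = ").append(fieldNumber++).append(";\n");
        }
        return sb.append("}\n").toString();
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");
        columns.put("name", "STRING");
        System.out.print(toProto("Row", columns));
    }
}
```

Assigning field numbers by position is the simplest choice, but it means that reordering columns in the table DDL would change the generated schema, which is worth keeping in mind for registry compatibility checks.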

---

That's hopefully enough context to get the conversation going. Looking
forward to the community's response!

cc @Kevin Lam <kevin....@shopify.com>

-- 
David Mariassy
Staff Production Engineer