Hi All, I'm going to attempt to revive the discussion for FLINK-34440 <https://issues.apache.org/jira/browse/FLINK-34440>, since the original discussion thread has stalled. Fresh start! :)
Let's start with discussing the approach, as per the Flink contribution guides!

First of all, I think it's important to note that we're talking about (at least) two formats here:

- flink-protobuf-confluent: a source and sink format that can read / write messages using the Confluent wire format.
- flink-protobuf-confluent-debezium: a source format that can read Debezium changelog events serialized to the Confluent wire format.

Since both of these formats deal with messages that follow the Confluent wire format, I think it makes sense to build flink-protobuf-confluent-debezium as a special-purpose implementation of flink-protobuf-confluent, just as it's done for avro-confluent and avro-debezium.

The second big question, in my opinion, is deciding the extent to which flink-protobuf-confluent should reuse the existing flink-protobuf implementation. An important distinction between the two is that flink-protobuf always expects the Java classes generated from the .proto files to be available on the classpath at boot time. On the other hand, when using the Confluent wire format and the Confluent Schema Registry, producers and consumers very often resort to dynamic de/serialization. A common pattern for Confluent protobuf serdes is therefore to use `DynamicMessage` types to bridge bytes to Java and vice versa. In my opinion, going with DynamicMessages would present two problems:

1. The behavior of flink-protobuf and flink-protobuf-confluent would diverge unless we took a lot of extra care. This would confuse users, who will expect flink-protobuf and flink-protobuf-confluent to map proto bytes to / from Flink identically.
2. We would disregard the choice of the original implementers of flink-protobuf, who deliberately decided against DynamicMessages due to their slower performance compared with generated Java classes.
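For context, the DynamicMessage pattern mentioned above looks roughly like this. This is only a sketch: the class names (`KafkaProtobufDeserializer`, `DynamicMessage`) are from the real Confluent and protobuf-java libraries, but the registry URL and the record-fetching helper are assumptions, and no Flink code is involved.

```java
import java.util.Map;

import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class DynamicMessageSketch {
    public static void main(String[] args) {
        // With no specific value type configured, the Confluent deserializer
        // falls back to DynamicMessage: no generated classes are needed.
        KafkaProtobufDeserializer<DynamicMessage> deserializer =
                new KafkaProtobufDeserializer<>();
        deserializer.configure(
                Map.of("schema.registry.url", "http://localhost:8081"), // assumed URL
                /* isKey */ false);

        byte[] confluentFramedBytes = fetchRecordValueSomehow(); // placeholder

        // The deserializer reads the schema id from the wire-format header,
        // fetches the descriptor from the registry, and parses the payload.
        DynamicMessage message =
                deserializer.deserialize("my-topic", confluentFramedBytes);

        // Field access goes through descriptor lookups rather than typed
        // getters -- the reflective access the original flink-protobuf
        // authors avoided for performance reasons.
        for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
            System.out.println(field.getKey().getName() + " = " + field.getValue());
        }
    }

    private static byte[] fetchRecordValueSomehow() {
        throw new UnsupportedOperationException("stand-in for a Kafka poll");
    }
}
```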
The alternative to using DynamicMessages is to use more runtime code generation and reuse the Java <-> Flink mapping from flink-protobuf. The flow would look like this:

### Deserializer

- Fetch the message's protobuf descriptor from the Confluent schema registry
- Generate a Java class from the descriptor at runtime
- Deserialize `byte[]`s to the generated `protobuf.Message` type using a `io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
- Delegate the work of converting between a `protobuf.Message` and a `RowData` object to the existing flink-protobuf format

### Serializer

- Convert the user's `RowType` to a protobuf descriptor
- Generate a Java class from the descriptor at runtime
- Delegate the `RowData` -> `AbstractMessage` conversion to the existing flink-protobuf format
- Serialize the `AbstractMessage` object using a `io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`

---

That's hopefully enough context to get the conversation going. Looking forward to the community's response!

cc @Kevin Lam <kevin....@shopify.com>

--
David Mariassy
Staff Production Engineer, Shopify
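To make the proposed deserializer flow a bit more concrete, here is a rough sketch. `RuntimeProtoCompiler` and `ProtoToRowDataConverter` are hypothetical placeholders for the runtime code-generation step and for the existing flink-protobuf `Message` -> `RowData` mapping; the registry URL is an assumption. Only `KafkaProtobufDeserializer` and the `specific.protobuf.value.type` config key are real Confluent APIs.

```java
import java.util.Map;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

/**
 * Sketch of the proposed deserializer flow. The two nested interfaces are
 * hypothetical stand-ins, not real Flink or Confluent APIs.
 */
public class ProtobufConfluentDeserializerSketch {

    // Hypothetical: would generate and load a Java class from a descriptor
    // fetched from the schema registry (steps 1-2 of the flow).
    interface RuntimeProtoCompiler {
        Class<? extends Message> compile(Descriptor descriptor);
    }

    // Hypothetical: would delegate to flink-protobuf's existing
    // Message -> RowData conversion (step 4 of the flow).
    interface ProtoToRowDataConverter {
        Object toRowData(Message message);
    }

    public static Object deserialize(
            String topic,
            byte[] confluentFramedBytes,
            Class<? extends Message> generatedClass, // produced by steps 1-2
            ProtoToRowDataConverter converter) {

        KafkaProtobufDeserializer<Message> deserializer = new KafkaProtobufDeserializer<>();
        deserializer.configure(
                Map.of(
                        "schema.registry.url", "http://localhost:8081", // assumed
                        // Real Confluent config key: parse into our generated
                        // class instead of falling back to DynamicMessage.
                        "specific.protobuf.value.type", generatedClass.getName()),
                /* isKey */ false);

        // Step 3: bytes -> generated protobuf.Message
        Message message = deserializer.deserialize(topic, confluentFramedBytes);

        // Step 4: delegate Message -> RowData to the existing flink-protobuf format
        return converter.toRowData(message);
    }
}
```

The serializer flow would mirror this in reverse: derive a descriptor from the `RowType`, generate a class, let flink-protobuf produce the `AbstractMessage`, and hand it to a `KafkaProtobufSerializer`.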