Hi all,

Writing to see if the community would be open to exploring a FLIP for the
Kafka Table Connectors. The FLIP would allow storing Kafka messages that
exceed the cluster's maximum message size (1 MB by default) out of band in
cloud object storage or another backend.

During serialization the message would be replaced with a reference, and
during deserialization the reference would be used to fetch the large
message and pass it to Flink. Something like Option 1 in this blog post:
<https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>
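
To illustrate the idea, here is a purely hypothetical sketch (in Java) of
what such a reference could look like on the wire. The magic prefix and
URI-based format are assumptions for illustration only; the FLIP would leave
the actual encoding entirely to the handler implementation.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Purely hypothetical reference encoding, just to illustrate the idea; the
 * actual format would be up to the handler implementation.
 */
public final class ReferenceEnvelope {

    // Magic prefix so inline payloads can be told apart from references.
    private static final byte[] MAGIC = {(byte) 0xCA, (byte) 0xFE};

    /** Wraps a storage URI (e.g. "s3://bucket/key") into a reference payload. */
    public static byte[] encode(String uri) {
        byte[] uriBytes = uri.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[MAGIC.length + uriBytes.length];
        System.arraycopy(MAGIC, 0, out, 0, MAGIC.length);
        System.arraycopy(uriBytes, 0, out, MAGIC.length, uriBytes.length);
        return out;
    }

    /** True if the payload carries the magic prefix, i.e. is a reference. */
    public static boolean isReference(byte[] payload) {
        return payload.length >= MAGIC.length
                && payload[0] == MAGIC[0]
                && payload[1] == MAGIC[1];
    }

    /** Extracts the storage URI from a reference payload. */
    public static String decode(byte[] payload) {
        return new String(
                Arrays.copyOfRange(payload, MAGIC.length, payload.length),
                StandardCharsets.UTF_8);
    }

    private ReferenceEnvelope() {}
}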

What do you think?

We can make this generic by allowing users to implement their own
LargeMessageSerializer/Deserializer interface that handles the serialization
and deserialization of references as well as the interaction with object
storage or another backend.
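
As a strawman, one possible shape for that interface is sketched below. The
name (here LargeRecordHandler, to match the config key suggested further
down) and the signatures are illustrative only, not a proposal of the final
API.

import java.io.Serializable;
import java.util.Properties;

/** Illustrative sketch only; name and signatures are open for discussion. */
public interface LargeRecordHandler extends Serializable {

    /** Receives the user-supplied properties once, before any records flow. */
    void open(Properties properties);

    /**
     * Write path: if the payload exceeds the configured threshold, upload it
     * to the backend and return a small reference; otherwise return the
     * payload unchanged.
     */
    byte[] storeIfLarge(byte[] payload);

    /**
     * Read path: if the payload is a reference, fetch the original bytes
     * from the backend; otherwise return the payload unchanged.
     */
    byte[] fetchIfReference(byte[] payload);

    /** Releases any resources, e.g. the object-storage client. */
    void close();
}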

The Kafka connectors can be extended with ConfigOptions to specify the
class to load, as well as user-specified properties that are forwarded to
it. For example: `large-record-handling.class` and
`large-record-handling.properties.*` (where the user can specify arbitrary
properties, similar to how the Kafka consumer and producer properties are
handled
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>
).
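
For instance, the option definition could look roughly like the sketch
below, using Flink's existing ConfigOptions builder. The option name and
where it would live are only suggestions.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Suggested option definition; name and placement are up for discussion. */
public class LargeRecordHandlingOptions {

    public static final ConfigOption<String> LARGE_RECORD_HANDLING_CLASS =
            ConfigOptions.key("large-record-handling.class")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Fully qualified class name of the large-record handler to load.");

    // Keys under "large-record-handling.properties.*" would be collected and
    // forwarded verbatim to the handler, mirroring how "properties.*" is
    // forwarded to the Kafka consumer and producer today.
}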

In terms of call sites for the large-message handling, I think we can hook
into DynamicKafkaDeserializationSchema
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java>
and DynamicKafkaRecordSerializationSchema
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>,
where the ConsumerRecord
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108>
and ProducerRecord
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75>
are handled, respectively.
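
To make the wiring concrete, here is a hedged sketch of the delegation,
reusing the hypothetical LargeRecordHandler interface from above. The helper
class is illustrative only; the real change would live inside the two schema
classes, operating on the record value in place.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Illustrative delegation only; not a proposal of the final code layout. */
public final class LargeRecordDelegation {

    /** Read path: resolve a possible reference before row deserialization. */
    public static ConsumerRecord<byte[], byte[]> onConsume(
            LargeRecordHandler handler, ConsumerRecord<byte[], byte[]> record) {
        byte[] value = handler.fetchIfReference(record.value());
        // Rebuilding the record drops timestamp/headers; a real
        // implementation inside the schema would preserve all metadata.
        return new ConsumerRecord<>(
                record.topic(), record.partition(), record.offset(), record.key(), value);
    }

    /** Write path: offload a large payload before the record goes to Kafka. */
    public static ProducerRecord<byte[], byte[]> onProduce(
            LargeRecordHandler handler, ProducerRecord<byte[], byte[]> record) {
        byte[] value = handler.storeIfLarge(record.value());
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                value,
                record.headers());
    }

    private LargeRecordDelegation() {}
}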

If there's interest, I would be happy to help flesh out the proposal more.
