Hey all, just a follow-up here. I was able to plug in our Large Message handling by overriding value.serializer <https://kafka.apache.org/documentation/#producerconfigs_value.serializer> and value.deserializer <https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer> in the producer and consumer configuration that Flink sets, using the `properties.*` option of the Kafka connector. This approach doesn't require Flink to know anything about large messages, and it needs no major changes to Flink itself.
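For anyone who wants to reproduce this, here is a minimal sketch of that configuration via the Table API. The `com.example.*` SerDe classes are hypothetical placeholders for a large-message SerDe, not an existing library; everything under `properties.*` is simply forwarded to the Kafka clients the connector creates.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class LargeMessageTableExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.createTemporaryTable(
                "orders",
                TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder().column("payload", DataTypes.STRING()).build())
                        .option("topic", "orders")
                        .option("properties.bootstrap.servers", "kafka:9092")
                        .option("format", "json")
                        // Hypothetical SerDe classes that swap oversized values for
                        // object-storage references and back again.
                        .option("properties.value.serializer", "com.example.LargeMessageSerializer")
                        .option("properties.value.deserializer", "com.example.LargeMessageDeserializer")
                        .build());
    }
}
```

(As noted in the next paragraph, the override on the read path only takes effect once the source builder stops forcing its own deserializer.)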
Flink uses the ByteArray(De|S)erializers by default in its source <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470> and sink <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>. On the source side the deserializer is currently forced, so overriding it requires a small change to flip this boolean <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464> to make it overridable. I'm planning to start a separate thread to propose making `value.deserializer` overridable there.
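To make that concrete, here is a paraphrased sketch (not the literal builder code) of the override behaviour behind that boolean: the third argument decides whether a deserializer supplied through `properties.*` survives or is silently replaced.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DeserializerOverrideSketch {

    private final Properties props = new Properties();

    // When 'force' is true, any user-supplied value is replaced;
    // when false, the user-supplied value wins and the default is only a fallback.
    private void maybeOverride(String key, String value, boolean force) {
        if (force || !props.containsKey(key)) {
            props.setProperty(key, value);
        }
    }

    public Properties build(Properties userProps) {
        props.putAll(userProps);
        // The source builder currently forces the override, so a
        // 'properties.value.deserializer' setting never reaches the consumer.
        // Flipping this flag is the small change referred to above.
        maybeOverride(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName(),
                /* force= */ true);
        return props;
    }
}
```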
On Mon, Jul 8, 2024 at 11:18 AM Kevin Lam <kevin....@shopify.com> wrote:

> Hi Fabian,
>
> Awesome, this project looks great! Thanks for sharing. It would work well with KafkaSource and the DataStream API as you've mentioned. We have something similar internally, but where we are encountering difficulty is integrating it with the Flink SQL Kafka DynamicTable Sources and Sinks. Our Large Message SerDe uses the Kafka message headers to store the URI on object storage, and currently the Flink SQL Format interfaces do not allow passing data to/from the Kafka message headers, which led me to suggest my proposal. It's not easy for us to change our Large Message SerDe to use the value to provide the reference, as it's already widely used and would require a significant migration.
>
> However, thinking further, maybe we should not bring any large message concerns into Flink, but instead better support reading and writing headers from Flink Formats.
>
> I'm aware of the existing work in progress on handling headers via FLIP-454 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format> and this mailing list discussion <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.
>
> On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul <fp...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> I worked on a project [1] in the past that had a similar purpose. You should be able to use a similar approach with the existing KafkaSource by implementing your own KafkaRecordDeserializationSchema that hides the logic of pulling the records from blob storage from the connector. You can even use the linked project directly with the KafkaSource using [2] and [3].
>>
>> I agree there is room for improvement, like propagating Flink's Filesystem credentials to the custom deserializer, but the overall idea seems to require only very few changes to Flink.
>>
>> Best,
>> Fabian
>>
>> [1] https://github.com/bakdata/kafka-large-message-serde
>> [2] https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
>> [3] https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
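For readers following along, here is a minimal sketch of the custom-deserializer approach Fabian describes above. The BlobStore client and the reference encoding are purely illustrative assumptions standing in for a real object-storage integration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Resolves an object-storage reference before the payload reaches the rest of the pipeline. */
public class LargeMessageDeserializationSchema
        implements KafkaRecordDeserializationSchema<byte[]> {

    /** Hypothetical client for whatever backend holds the oversized payloads. */
    public interface BlobStore extends java.io.Serializable {
        byte[] fetch(String uri) throws IOException;
    }

    // Illustrative flag byte written by the producer-side serde to mark a reference.
    private static final byte REFERENCE_MARKER = 0x01;

    private final BlobStore blobStore;

    public LargeMessageDeserializationSchema(BlobStore blobStore) {
        this.blobStore = blobStore;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<byte[]> out)
            throws IOException {
        byte[] value = record.value();
        if (value != null && value.length > 1 && value[0] == REFERENCE_MARKER) {
            // Remaining bytes carry the object-storage URI; fetch the real payload.
            String uri = new String(value, 1, value.length - 1, StandardCharsets.UTF_8);
            out.collect(blobStore.fetch(uri));
        } else {
            out.collect(value); // small message, already inline
        }
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class); // raw bytes; a downstream format decodes them
    }
}
```

It can then be wired in with KafkaSource.<byte[]>builder()...setDeserializer(new LargeMessageDeserializationSchema(store)).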
>> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>>
>> > Hi all,
>> >
>> > Thanks for the responses.
>> >
>> > Grace, those are indeed both challenges, thanks for flagging them. Regarding expiry, we could consider a mark-and-sweep garbage collection system: a service consumes the topics with large messages and tracks references, and when no references to a large message remain, it can be removed.
>> >
>> > Martijn, I will take a look at whether there are any prior discussions in the Kafka community and send the proposal to the Kafka dev mailing list if it makes sense :). It'd be much preferred if this were natively supported by Kafka; since it isn't currently, I was also exploring making this work in Flink.
>> >
>> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org> wrote:
>> >
>> > > Hi Kevin,
>> > >
>> > > I just want to double-check: were you planning to send this proposal to the Kafka Dev mailing list? Because I don't directly see how this affects Flink :)
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com> wrote:
>> > >
>> > > > Hi Kevin,
>> > > >
>> > > > Thanks for starting this thread.
>> > > >
>> > > > This idea is something that was discussed in Kroxylicious (an open source Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the conclusion that there are a couple of issues with implementing this:
>> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could cause extreme memory bloat in clients, as the entire thing would need to be fed into the producer, which could very quickly fill its buffers. Depending on how the subsequent deserialization and payload fetch is handled at the consumer end, it's likely that the same behaviour would also be seen there.
>> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to retention (or topic compaction), it does so without notifying clients. There is no (easy) way to ensure the associated payload is deleted from object storage at the same time.
>> > > >
>> > > > It's not totally clear how Conduktor solved these issues, but IMO they are worth keeping in mind. For Kroxylicious we decided these problems meant it wasn't practical for us to implement this, but I'd be curious to know if you've got any ideas :)
>> > > >
>> > > > Regards,
>> > > > Grace
>> > > >
>> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
>> > > >
>> > > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Writing to see if the community would be open to exploring a FLIP for the Kafka Table Connectors. The FLIP would allow for storing Kafka messages beyond a Kafka cluster's message size limit (1 MB by default) out of band in cloud object storage or another backend.
>> > > > >
>> > > > > During serialization the message would be replaced with a reference, and during deserialization the reference would be used to fetch the large message and pass it to Flink. Something like Option 1 in this blog post <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
>> > > > >
>> > > > > What do you think?
>> > > > >
>> > > > > We can make it generic by allowing users to implement their own LargeMessageSerializer/Deserializer interface for serialization and deserialization and for handling interactions with object storage or some other backend.
>> > > > >
>> > > > > The Kafka connectors can be extended to support ConfigOptions to specify the class to load, as well as some user-specified properties. For example: `large-record-handling.class` and `large-record-handling.properties.*` (where the user can specify any properties, similar to how the Kafka consumer and producer properties are handled <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>).
>> > > > >
>> > > > > In terms of call sites for the large message handling, I think we can consider hooking in inside DynamicKafkaDeserializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java> and DynamicKafkaRecordSerializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>, where the ConsumerRecord <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108> and ProducerRecord <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75> are passed, respectively.
>> > > > >
>> > > > > If there's interest, I would be happy to help flesh out the proposal more.
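To make the proposal quoted above a bit more tangible, here is one possible shape for those interfaces. The names come from the proposal; the method signatures and the Headers parameter are illustrative assumptions, not a settled design.

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.apache.kafka.common.header.Headers;

/** Illustrative container for the two SPI interfaces sketched in the proposal. */
public interface LargeRecordHandling {

    /** Producer side: called on the already-serialized value before the ProducerRecord is built. */
    interface LargeMessageSerializer extends Serializable {
        /** Receives the options configured under 'large-record-handling.properties.*'. */
        void open(Map<String, String> properties);

        /** May upload an oversized value to object storage and return a small reference instead. */
        byte[] serialize(String topic, Headers headers, byte[] value) throws IOException;
    }

    /** Consumer side: called on the ConsumerRecord value before the configured format decodes it. */
    interface LargeMessageDeserializer extends Serializable {
        void open(Map<String, String> properties);

        /** Resolves a reference back into the original payload, e.g. via a URI stored in a header. */
        byte[] deserialize(String topic, Headers headers, byte[] value) throws IOException;
    }
}
```

An implementation class would be named by `large-record-handling.class` and instantiated by the connector at the call sites mentioned in the proposal.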