Apologies David, here's a temporary link to the image: https://screenshot.click/19-19-0xwls-jfx7s.png
On Thu, Jul 18, 2024 at 9:00 AM David Radley <david_rad...@uk.ibm.com> wrote:

> Hi Kevin,
> That sounds good. Unfortunately the image did not come through in the email for me.
> Kind regards, David
>
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Wednesday, 10 July 2024 at 19:20
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message Handling
>
> Hey David,
>
> Yes, my proposal was originally to do this at the connector level; as you mentioned, it doesn't make sense in a format. In the end I took the approach in my previous e-mail:
>
> I was able to insert our Large Message handling by overriding value.serializer <https://kafka.apache.org/documentation/#producerconfigs_value.serializer> and value.deserializer <https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer> in the consumer and producer configuration that Flink sets, using the `properties.*` option in the Kafka Connector. This approach doesn't require Flink to know anything about large messages or have any major changes made to it.
>
> Flink uses the ByteArray(De|S)erializers by default in its source <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470> and sink <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>. Overriding the source deserializer requires a small change to flip this boolean <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464> to make it overridable. I'm planning to start a separate thread to propose making `value.serializer` overridable.
>
> This is a rough representation of the before and after for producers (similar applies for consumers):
> [inline image: before/after diagram]
>
> > I assume you would want to add configuration to define where the external storage lives and authentication. Limitations around stack and heap sizes would be worth considering.
>
> Yes, we're implementing the 'claim check' pattern and this is definitely something we're considering! Thanks for raising it.
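A minimal sketch of what such an overriding value.serializer could look like; the class name, size threshold, and BlobStore stand-in below are hypothetical, not the implementation described above:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

/** Hypothetical claim-check serializer for byte[] values. */
public class ClaimCheckValueSerializer implements Serializer<byte[]> {

    /** Stand-in for an object-storage client (S3, GCS, etc.). */
    public interface BlobStore {
        String put(byte[] payload); // returns a URI pointing at the stored payload
    }

    private static final byte[] MARKER = "large-msg:".getBytes(StandardCharsets.UTF_8);

    private BlobStore blobStore;            // wired up from the producer configs in a real implementation
    private int maxInlineBytes = 900_000;   // stay under the broker's ~1 MB default

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // read bucket, credentials and threshold from the producer configs here
    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        if (data == null || data.length <= maxInlineBytes) {
            return data; // small messages pass through untouched
        }
        // Upload the oversized payload and ship only a reference on the Kafka topic.
        String uri = blobStore.put(data);
        byte[] uriBytes = uri.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[MARKER.length + uriBytes.length];
        System.arraycopy(MARKER, 0, out, 0, MARKER.length);
        System.arraycopy(uriBytes, 0, out, MARKER.length, uriBytes.length);
        return out;
    }
}

Hooked in through the connector's `properties.*` pass-through this would look something like 'properties.value.serializer' = 'com.example.ClaimCheckValueSerializer' (the class is again hypothetical).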
> On Wed, Jul 10, 2024 at 11:06 AM David Radley <david_rad...@uk.ibm.com> wrote:
> Hi Kevin,
> You mention the link https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0, so I assume this is the approach you are considering, and that this is being done at the connector level, as the message could be in any of the existing supported formats – so it is not appropriate as a new format. It sounds like for deserialization, the reference to external storage in the header would be found in your deser and the contents then taken from the external source and put into the Kafka body, or the other way round for serialization. This is different to my Apicurio work, which is to handle format-specific headers.
>
> I assume you would want to add configuration to define where the external storage lives and authentication. Limitations around stack and heap sizes would be worth considering.
>
> Am I understanding your intent correctly?
> Kind regards, David.
>
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Wednesday, 10 July 2024 at 14:35
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message Handling
>
> Hey all, just a follow-up here. I was able to insert our Large Message handling by overriding value.serializer <https://kafka.apache.org/documentation/#producerconfigs_value.serializer> and value.deserializer <https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer> in the consumer and producer configuration that Flink sets, using the `properties.*` option in the Kafka Connector. This approach doesn't require Flink to know anything about large messages or have any major changes made to it.
>
> Flink uses the ByteArray(De|S)erializers by default in its source <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470> and sink <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>. Overriding the source deserializer requires a small change to flip this boolean <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464> to make it overridable. I'm planning to start a separate thread to propose making `value.serializer` overridable.
>
> On Mon, Jul 8, 2024 at 11:18 AM Kevin Lam <kevin....@shopify.com> wrote:
> > Hi Fabian,
> >
> > Awesome, this project looks great! Thanks for sharing. It would work well with KafkaSource and the DataStream API as you've mentioned. We have something similar internally, but where we are encountering difficulty is integrating it with the Flink SQL Kafka DynamicTable Source and Sinks. Our Large Message SerDe uses the Kafka message header to store the URI on object storage, and currently the Flink SQL Format interfaces do not allow passing data to/from the Kafka message headers, which led me to suggest my proposal. It's not easy for us to change our Large Message SerDe to use the value to provide the reference, as it's already widely used and would require a significant migration.
> >
> > However, thinking further, maybe we should not bring any Large Message concerns into Flink, but instead better support reading and writing headers from Flink Formats.
> >
> > I'm aware of the existing work in progress on handling headers via FLIP-454 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format> and this mailing list discussion <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.
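A tiny, hypothetical illustration of that header-based layout (the header name and helper are made up, not the internal SerDe); the reference travels next to the payload rather than inside it, which is exactly the part a Flink SQL format cannot reach today:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderReferenceExample {

    // Producer side: the value stays small, the object-storage URI rides in a record header.
    static ProducerRecord<byte[], byte[]> withReference(String topic, String uri) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, (byte[]) null, new byte[0]);
        record.headers().add("large-message-uri", uri.getBytes(StandardCharsets.UTF_8));
        return record;
    }

    // Consumer side: a Kafka deserializer or connector hook can read the header back,
    // but a Flink SQL format only ever sees the value bytes.
    static String referenceOf(ConsumerRecord<byte[], byte[]> record) {
        Header header = record.headers().lastHeader("large-message-uri");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}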
> > On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul <fp...@apache.org> wrote:
> >> Hi Kevin,
> >>
> >> I worked on a project [1] in the past that had a similar purpose. You should be able to use a similar approach with the existing KafkaSource by implementing your own KafkaRecordDeserializationSchema that hides the logic of pulling the records from blob storage from the connector. You can even use the linked project directly with the KafkaSource using [2] and [3].
> >>
> >> I agree there is room for improvement, like propagating Flink's Filesystem credentials to the custom deserializer, but the overall idea seems to require only very few changes to Flink.
> >>
> >> Best,
> >> Fabian
> >>
> >> [1] https://github.com/bakdata/kafka-large-message-serde
> >> [2] https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
> >> [3] https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
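A rough sketch of the KafkaRecordDeserializationSchema approach described above; the marker scheme and BlobStore interface are placeholders rather than the bakdata API:

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Resolves claim-check references before handing the payload to Flink as a String. */
public class LargeMessageDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

    /** Stand-in for an object-storage client; serializable so it ships with the job graph. */
    public interface BlobStore extends Serializable {
        byte[] get(String uri);
    }

    private static final String MARKER = "large-msg:";

    private final BlobStore blobStore;

    public LargeMessageDeserializationSchema(BlobStore blobStore) {
        this.blobStore = blobStore;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
        byte[] value = record.value();
        String asString = value == null ? null : new String(value, StandardCharsets.UTF_8);
        if (asString != null && asString.startsWith(MARKER)) {
            // The Kafka value only carries a reference; fetch the real payload out of band.
            value = blobStore.get(asString.substring(MARKER.length()));
            asString = new String(value, StandardCharsets.UTF_8);
        }
        out.collect(asString);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

It would be handed to the source with KafkaSourceBuilder#setDeserializer, keeping the connector itself unchanged.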
> >> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
> >> > Hi all,
> >> >
> >> > Thanks for the responses.
> >> >
> >> > Grace, those are indeed both challenges, thanks for flagging them. Regarding expiry, we could consider having a mark-and-sweep garbage collection system: a service can consume the topics with large messages and track references, and when there are no references left for a large message, it can be removed.
> >> >
> >> > Martijn, I will take a look at whether there are any prior discussions in the Kafka community and send the proposal to the Kafka dev mailing list if it makes sense :). It'd be much preferred if this were natively supported by Kafka; since it's not currently, I was also exploring making this work in Flink.
> >> >
> >> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org> wrote:
> >> > > Hi Kevin,
> >> > >
> >> > > I just want to double check, were you planning to send this proposal to the Kafka Dev mailing list? Because I don't see directly how this affects Flink :)
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Martijn
> >> > >
> >> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com> wrote:
> >> > > > Hi Kevin,
> >> > > >
> >> > > > Thanks for starting this thread.
> >> > > >
> >> > > > This idea is something that was discussed in Kroxylicious (an open source Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the conclusion that there are a couple of issues with implementing this:
> >> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could cause extreme memory bloat in clients, as the entire thing would need to be fed into the producer, which could very quickly fill its buffers. Depending on how the subsequent deserialization and payload fetch is handled at the consumer end, it's likely that the same behaviour would also be seen there.
> >> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to retention (or topic compaction), it does so without notifying clients. There is no (easy) way to ensure the associated payload is deleted from object storage at the same time.
> >> > > >
> >> > > > It's not totally clear how Conduktor solved these issues, but IMO they are worth keeping in mind. For Kroxylicious we decided these problems meant it wasn't practical for us to implement this, but I'd be curious to know if you've got any ideas :)
> >> > > >
> >> > > > Regards,
> >> > > > Grace
> >> > > >
> >> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> >> > > >
> >> > > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid> wrote:
> >> > > > > Hi all,
> >> > > > >
> >> > > > > Writing to see if the community would be open to exploring a FLIP for the Kafka Table Connectors. The FLIP would allow for storing Kafka messages beyond a Kafka cluster's message limit (1 MB by default) out of band in cloud object storage or another backend.
> >> > > > >
> >> > > > > During serialization the message would be replaced with a reference, and during deserialization the reference would be used to fetch the large message and pass it to Flink. Something like Option 1 in this blog post <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
> >> > > > >
> >> > > > > What do you think?
> >> > > > >
> >> > > > > We can make it generic by allowing users to implement their own LargeMessageSerializer/Deserializer interface for serializing and deserializing and handling interactions with object storage or some other backend.
> >> > > > >
> >> > > > > The Kafka Connectors can be extended to support ConfigOptions to specify the class to load, as well as some user-specified properties. For example: `large-record-handling.class` and `large-record-handling.properties.*` (where the user can specify any properties, similar to how the Kafka Consumer and Producer properties are handled <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>).
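One possible, entirely hypothetical shape for the LargeMessageSerializer/Deserializer interface and options proposed in the paragraph above; none of this exists in the connector today:

import java.io.Serializable;
import java.util.Map;
import org.apache.kafka.common.header.Headers;

/**
 * Hypothetical user-facing interface for the proposal. A concrete class would be
 * named via `large-record-handling.class` and receive the options supplied under
 * `large-record-handling.properties.*`.
 */
public interface LargeMessageSerDe extends Serializable {

    /** Called once with the `large-record-handling.properties.*` options, prefix stripped. */
    void configure(Map<String, String> properties);

    /** Producer path: may replace an oversized payload with a reference before it is written. */
    byte[] serialize(String topic, Headers headers, byte[] payload);

    /** Consumer path: resolves a reference back into the original payload before the format runs. */
    byte[] deserialize(String topic, Headers headers, byte[] payloadOrReference);
}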
> >> > > > > In terms of call sites for the LargeMessage handling, I think we can consider inside of DynamicKafkaDeserializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java> and DynamicKafkaRecordSerializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>, where the ConsumerRecord <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108> and ProducerRecords <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75> are passed respectively.
> >> > > > >
> >> > > > > If there's interest, I would be happy to help flesh out the proposal more.
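To make the call-site idea slightly more concrete without reproducing connector internals, the producer-side wrapping can be sketched as a decorator around a KafkaRecordSerializationSchema, so an offloading hook sees the ProducerRecord after the existing format has run; the Offloader interface below is hypothetical and the whole class is illustrative only:

import java.io.Serializable;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Illustrative decorator: offload oversized values after the wrapped schema has serialized them. */
public class LargeMessageOffloadingSchema<T> implements KafkaRecordSerializationSchema<T> {

    /** Hypothetical hook matching the proposed `large-record-handling.class`. */
    public interface Offloader extends Serializable {
        byte[] maybeOffload(String topic, byte[] value); // returns the value itself or a reference to it
    }

    private final KafkaRecordSerializationSchema<T> inner;
    private final Offloader offloader;

    public LargeMessageOffloadingSchema(KafkaRecordSerializationSchema<T> inner, Offloader offloader) {
        this.inner = inner;
        this.offloader = offloader;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) {
        ProducerRecord<byte[], byte[]> record = inner.serialize(element, context, timestamp);
        byte[] value = offloader.maybeOffload(record.topic(), record.value());
        // Rebuild the record with the (possibly replaced) value; partition, timestamp,
        // key and headers are carried over unchanged.
        return new ProducerRecord<>(
                record.topic(), record.partition(), record.timestamp(), record.key(), value, record.headers());
    }
}

The consumer-side equivalent would wrap the record deserialization path in the same way, resolving the reference before the value format's DeserializationSchema ever sees the bytes.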