Apologies David, here's a temporary link to the image:
https://screenshot.click/19-19-0xwls-jfx7s.png


On Thu, Jul 18, 2024 at 9:00 AM David Radley <david_rad...@uk.ibm.com>
wrote:

> Hi Kevin,
> That sounds good, unfortunately the image did not come through in the
> email for me,    kind regards, David
>
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Wednesday, 10 July 2024 at 19:20
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message
> Handling
> Hey David, Yes my proposal was originally to do this at the connector
> level, as you mentioned it doesn't make sense in a format. In the end I
> took the approach in my previous e-mail: I was able to insert our Large
> Message handling by overriding
>
> Hey David,
>
> Yes my proposal was originally to do this at the connector level, as you
> mentioned it doesn't make sense in a format. In the end I took the approach
> in my previous e-mail:
>
> I was able to insert our Large Message handling by overriding
> value.serializer<
> https://kafka.apache.org/documentation/#producerconfigs_value.serializer>
> and value.deserializer<
> https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer>
> in the consumer and producer configuration that Flink sets, using the
> `properties.*` option in the Kafka Connector. This approach doesn't require
> Flink to know anything about large messages or have any major changes made
> to it.
> Flink uses the ByteArray(De|S)erializers by default in its source<
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470>
> and sink<
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>.
> Overriding the source serializer requires a small change to flip this
> boolean<
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464>
> to make it overridable. I'm planning to start a separate thread to propose
> making `value.serializer` overridable.
>
> This is a rough representation of the before and after for producers
> (similar applies for consumers):
> [cid:ii_lyg5y5qx0]
> I assume you would want to add configuration to define where the external
> storage lives and authentication.  Limitations around stack and heap sizes
> would be worth considering.
>
> Yes, we're implementing the 'claim check' pattern and this is definitely
> something we're considering! Thanks for raising it
>
> On Wed, Jul 10, 2024 at 11:06 AM David Radley <david_rad...@uk.ibm.com
> <mailto:david_rad...@uk.ibm.com>> wrote:
> Hi Kevin,
> You mention the link
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> , I assume this is the approach you are considering. And that this is being
> done at the connector level, as the message could be in any of the existing
> supported formats – so is not appropriate as a new format. It sounds like
> for deserialization, the reference to external storage header would be
> found in your deser and the contents then taken from external source and
> put into the Kafka body or the other way round for serialization. This is
> different to my Apicurio work that is to handle format specific headers.
>
> I assume you would want to add configuration to define where the external
> storage lives and authentication.  Limitations around stack and heap sizes
> would be worth considering.
>
> Am I understanding your intent correctly?
>       Kind regards,  David.
>
>
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Date: Wednesday, 10 July 2024 at 14:35
> To: dev@flink.apache.org<mailto:dev@flink.apache.org> <
> dev@flink.apache.org<mailto:dev@flink.apache.org>>
> Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message
> Handling
> Hey all, just a follow-up here. I was able to insert our Large Message
> handling by overriding value.serializer
> <https://kafka.apache.org/documentation/#producerconfigs_value.serializer>
> and value.deserializer
> <
> https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer
> >
>  in the consumer and producer configuration that Flink sets, using the
> `properties.*` option in the Kafka Connector. This approach doesn't require
> Flink to know anything about large messages or have any major changes made
> to it.
>
> Flink uses the ByteArray(De|S)erializers by default in its source
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470
> >
> and sink
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83
> >.
> Overriding the source serializer requires a small change to flip this
> boolean
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464
> >
> to make it overridable. I'm planning to start a separate thread to propose
> making `value.serializer` overridable.
>
>
> On Mon, Jul 8, 2024 at 11:18 AM Kevin Lam <kevin....@shopify.com<mailto:
> kevin....@shopify.com>> wrote:
>
> > Hi Fabian,
> >
> > Awesome, this project looks great! Thanks for sharing. It would work well
> > with KafkaSource and the DataStream API as you've mentioned. We have
> > something similar internally, but where we are encountering difficulty is
> > integrating it with the Flink SQL Kafka DynamicTable Source and Sinks.
> Our
> > Large Message SerDe uses the Kafka Message header to store the URI on
> > object storage, and currently the Flink SQL Format Interfaces do not
> allow
> > passing data to/from the Kafka message headers, which lead me to suggest
> my
> > proposal. It's not easy for us to change our Large Message SerDe to use
> the
> > value to provide the reference, as it's already widely used and would
> > require a significant migration.
> >
> > However, thinking further, maybe we should not bring in any Large message
> > concerns into Flink, but instead better support reading and writing
> > headers from Flink Formats.
> >
> > I'm aware of the existing work in progress on handling headers via
> > FLIP-454
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format>
> and
> > this mailing list discussion
> > <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.
> >
> > On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul <fp...@apache.org<mailto:
> fp...@apache.org>> wrote:
> >
> >> Hi Kevin,
> >>
> >> I worked on a project [1] in the past that had a similar purpose. You
> >> should be able to use a similar approach with the existing KafkaSource
> by
> >> implementing your own KafkaRecordDeserializationSchema that hides the
> >> logic
> >> of pulling the records from blob storage from the connector. You can
> even
> >> use the linked project directly with the KafkaSource using [2] and [3].
> >>
> >> I agree there is room for improvements, like propagating Flink's
> >> Filesystem
> >> credentials to the custom deserializer, but the overall idea seems to
> >> require only very few changes to Flink.
> >>
> >> Best,
> >> Fabian
> >>
> >> [1] https://github.com/bakdata/kafka-large-message-serde
> >> [2]
> >>
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
> >> [3]
> >>
> >>
> https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
> >>
> >> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam <kevin....@shopify.com.invalid
> >
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > Thanks for the responses.
> >> >
> >> > Grace those are indeed both challenges, thanks for flagging them.
> >> Regarding
> >> > expiry, we could consider having a Mark and Sweep garbage collection
> >> > system. A service can consume the topics with large messages, and
> track
> >> > references. When there are no references left for large messages, they
> >> can
> >> > be removed.
> >> >
> >> > Martjin, I will take a look at if there's any prior discussions in the
> >> > Kafka community and send the proposal to the Kafka Dev mailing list if
> >> it
> >> > makes sense :). It'd be much preferred if this was natively supported
> by
> >> > Kafka, since it's not currently I was also exploring making this work
> in
> >> > Flink.
> >> >
> >> >
> >> >
> >> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <
> martijnvis...@apache.org<mailto:martijnvis...@apache.org>
> >> >
> >> > wrote:
> >> >
> >> > > Hi Kevin,
> >> > >
> >> > > I just want to double check, were you planning to send this proposal
> >> to
> >> > the
> >> > > Kafka Dev mailing list? Because I don't see directly how this
> affects
> >> > Flink
> >> > > :)
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Martijn
> >> > >
> >> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com
> <mailto:ggrim...@redhat.com>>
> >> > wrote:
> >> > >
> >> > > > Hi Kevin,
> >> > > >
> >> > > > Thanks for starting this thread.
> >> > > >
> >> > > > This idea is something that was discussed in Kroxylicious (an open
> >> > source
> >> > > > Kafka proxy, I'm a maintainer there). In that discussion [1] we
> >> came to
> >> > > the
> >> > > > conclusion that there are a couple of issues with implementing
> this:
> >> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch
> sizes
> >> > could
> >> > > > cause extreme memory bloat in clients, as the entire thing would
> >> need
> >> > to
> >> > > be
> >> > > > fed into the producer which could very quickly fill its buffers.
> >> > > Depending
> >> > > > on how the subsequent deserialization and payload fetch is handled
> >> at
> >> > the
> >> > > > consumer end, it's likely that the same behaviour would also be
> seen
> >> > > there.
> >> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to
> >> > > retention
> >> > > > (or topic compaction), it does so without notifying clients. There
> >> is
> >> > no
> >> > > > (easy) way to ensure the associated payload is deleted from object
> >> > > storage
> >> > > > at the same time.
> >> > > >
> >> > > > It's not totally clear how Conduktor solved these issues, but IMO
> >> they
> >> > > are
> >> > > > worth keeping in mind. For Kroxylicious we decided these problems
> >> meant
> >> > > it
> >> > > > wasn't practical for us to implement this, but I'd be curious to
> >> know
> >> > if
> >> > > > you've got any ideas :)
> >> > > >
> >> > > > Regards,
> >> > > > Grace
> >> > > >
> >> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> >> > > >
> >> > > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam
> >> <kevin....@shopify.com.invalid
> >> > >
> >> > > > wrote:
> >> > > >
> >> > > > > Hi all,
> >> > > > >
> >> > > > > Writing to see if the community would be open to exploring a
> FLIP
> >> for
> >> > > the
> >> > > > > Kafka Table Connectors. The FLIP would allow for storing Kafka
> >> > Messages
> >> > > > > beyond a Kafka cluster's message limit (1 MB by default) out of
> >> band
> >> > in
> >> > > > > cloud object storage or another backend.
> >> > > > >
> >> > > > > During serialization the message would be replaced with a
> >> reference,
> >> > > and
> >> > > > > during deserialization the reference would be used to fetch the
> >> large
> >> > > > > message and pass it to Flink. Something like Option 1 in this
> blog
> >> > post
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> >> > > > > >
> >> > > > > .
> >> > > > >
> >> > > > > What do you think?
> >> > > > >
> >> > > > > We can make it generic by allowing users to implement their own
> >> > > > > LargeMessageSerializer/Deserializer interface for serializing
> and
> >> > > > > deserializing and handling interactions with object storage or
> >> some
> >> > > other
> >> > > > > backend.
> >> > > > >
> >> > > > > The Kafka Connectors can be extended to support ConfigOptions to
> >> > > > > specify the class to load, as well as some user-specified
> >> properties.
> >> > > For
> >> > > > > example: `large-record-handling.class` and `
> >> > > > > large-record-handling.properties.*` (where the user can specify
> >> any
> >> > > > > properties similar to how the Kafka Consumer and Producer
> >> properties
> >> > > are
> >> > > > > handled
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201
> >> > > > > >
> >> > > > > ).
> >> > > > >
> >> > > > > In terms of call sites for the LargeMessage handling, I think we
> >> can
> >> > > > > consider inside of DynamicKafkaDeserializationSchema
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
> >> > > > > >
> >> > > > > and DynamicKafkaRecordSerializationSchema
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
> >> > > > > >,
> >> > > > > where the ConsumerRecord
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108
> >> > > > > >
> >> > > > > and ProducerRecords
> >> > > > > <
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75
> >> > > > > >
> >> > > > > are passed respectively.
> >> > > > >
> >> > > > > If there's interest, I would be happy to help flesh out the
> >> proposal
> >> > > > more.
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Reply via email to