Hi Kirk, thanks for having time to look into this, > KT1: In PayloadReferenceValue, Is "payloadStoreClass" intended to be a > fully-qualified Java class name, or something else? I'm considering the case > where a Producer and/or Consumer are using a non-Java-based client. This would be any string that can be used back by the consumer to verify that the reference can be used by the provided payload store class. If the payload store that used to publish the reference isn’t the same one to download then the serializer should fail, in Java client this would be the fully qualified name. the KIP doesn’t cover non-JVM clients and their cases but if any non-JVM lib implement this they have the freedom to provide any reference to the class as string.
> KT2: Are there any reference implementations of PayloadStore being shipped? > It appears that it's up to the user to supply. I'm wondering if some basic > scaffolding could be useful? I was planning to ship one for local file system for development and testing, I would like to avoid any specific implementation to any specific payload store/cloud provider. > KT3: I'm assuming a given PayloadStore implementation is instantiated and > configured once per client. As such, (in the case of Producers used across > multiple threads), they need to be thread safe. Can the lifecycle and thread > safety aspects be made more clear in the documentation? Any Kafka serializer provided via `value.serializer` or `value.deserializer` will be initialised per-client so if the app is setting up multiple threads each thread with one producer then the client will initialised separate serializer for each one. The serilaizer is called per-record and pass this down to the payload which inherently per-record. Nothing special here about this serializer, as this is the normal path. If someone is setting up the client using by passing the same instance of `LargeMessageSerializer` and their payload store class to multiple producers then it is on them to make sure it is thread-safe which depends on how their payload store implementation works. > KT4: What is the intended behavior when an object referenced by > "fullPayloadPath" is missing? Imagine there's a mismatch in retention time > between the backing store and the Kafka message? Does the message get > skipped, does it throw an error, or ? if `large.message.skip.not.found.error` is set to true it will skip, if not it will fail. So it is up to consumer to decide what they want to do when the TTL of the payload on the store hits before Kafka retention. Thanks Omnia > On 2 May 2025, at 01:44, Kirk True <k...@kirktrue.pro> wrote: > > Hi Omnia, > > A very interesting KIP! Thanks for the write up and discussion thus far! > > A few questions: > > KT1: In PayloadReferenceValue, Is "payloadStoreClass" intended to be a > fully-qualified Java class name, or something else? I'm considering the case > where a Producer and/or Consumer are using a non-Java-based client. > > KT2: Are there any reference implementations of PayloadStore being shipped? > It appears that it's up to the user to supply. I'm wondering if some basic > scaffolding could be useful? > > KT3: I'm assuming a given PayloadStore implementation is instantiated and > configured once per client. As such, (in the case of Producers used across > multiple threads), they need to be thread safe. Can the lifecycle and thread > safety aspects be made more clear in the documentation? > > KT4: What is the intended behavior when an object referenced by > "fullPayloadPath" is missing? Imagine there's a mismatch in retention time > between the backing store and the Kafka message? Does the message get > skipped, does it throw an error, or ? > > Thanks, > Kirk > > On Wed, Apr 30, 2025, at 6:21 AM, Omnia Ibrahim wrote: >> Hi Luke, >> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need the >>>> formatter? >>> >>> I wanted to bit of more info than just the path to download, for example I >>> want to add stuff like the class path for the original blob store for >>> example if consumer is setup with the unmatched blob store to the one used >>> during publishing. >>> I have updated the KIP to simplify this by having this always as a simple >>> json of path and publisher class which can be represented as >>> PayloadReferenceValue. WDYT? >> >> I thought of another case where having the freedom to form the reference >> might be nice feature, which is DR. Let’s imagine this case where someone >> publish large messages to S3 and reference to Kafka topic then they want to >> have DR. This can be achievable if they have mirrored Kafka topic which >> contains the references but if S3 is unreachable form the DR backup location >> then the reference they have is bit useless. However if the message >> formatter is customisable then dev can implement a complicated store that >> publish to two store locations and the publish both references to Kafka as >> one message and the consumer store can download from either buckets that are >> available. I think keeping the door open to such use-case might be good >> feature but also having such use case might be questionable a bit with the >> latency it will add as we will be publishing to N number of stores. >> >> Regards >> Omnia >> >>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote: >>> >>> Hi Luke, thanks for having the time to look into the KIP >>>> 2. It's not clear how the "retry count" comes into play in the KIP. It >>>> needs more explanation. >>> My initial thinking is the retry configs are a must for all blob stores, so >>> we can provide them, and validate them for free for all blob stores so not >>> every implementation will go through verifying them. >>> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need the >>>> formatter? >>> >>> I wanted to bit of more info than just the path to download, for example I >>> want to add stuff like the class path for the original blob store for >>> example if consumer is setup with the unmatched blob store to the one used >>> during publishing. >>> I have updated the KIP to simplify this by having this always as a simple >>> json of path and publisher class which can be represented as >>> PayloadReferenceValue. WDYT? >>> >>>> 4. In the BlobStore, it looks like we presume users will use object stores, >>>> which is not good. >>>> Could we make it more generic? Javadoc, method names, … >>> This is a good point, I have updated the method names and Javadoc. I also >>> thinking of renaming the class name to PayloadStoreinstead of BlobStore as >>> blob store still tide to object store as well. To set some context here, I >>> am proposing this after working with some community form Apache Cassandra >>> who are working on Cassandra CEP >>> <https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob>-44 >>> to handle large CDC and the initial thinking was let’s publish any large >>> cdc to an object store instead of Kafka this why the naming was suggesting >>> “blob store” only. >>> >>>> 5. It would be good to have some explanation for the purpose of each new >>>> interface/class, and clear javadoc for each method. >>> >>> Updated the KIP with javadocs >>> >>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? >>>> Again, I thought we only need to replace value to a path, and add the >>>> "large-message" header, so that when consumer reads this message, it'll >>>> read the path from value and get the original data via BlobStore. Why do we >>>> need this ID generator? I think users should do the object naming when >>>> putting the object by themselves, not via another interface. WDYT? >>> In some cases generating the ID might need some smart work for example to >>> avoid s3 throttling the recommended way on their doc it to create sub paths >>> under the original bucket, to decide this we might hash the data to find a >>> suitable sub-path. >>> Here is an example of how I would generate an path for s3 file >>> ``` >>> public String id(byte[] data) { >>> String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % >>> distributionFactor // distributionFactor is a config for the Id generator >>> and it represent the max number of sub-folders under the bucket >>> return subFolder + “/“ + UUID.randomUUID().toString() >>> } >>> ``` >>> Hope this example clarify a bit. However I do agree here it might not need >>> a class. I have move it to be part of the store class. >>> >>> Please let me know WDYT of the final shape of the KIP now >>> >>> Thanks >>> Omnia >>> >>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote: >>>> >>>> Hi Omnia, >>>> >>>> Thanks for proposing this feature that many users expected. >>>> >>>> Some comments: >>>> 1. It's quite interesting to see the idea of chained >>>> serializer/deserializer used here. I like it. >>>> >>>> 2. It's not clear how the "retry count" comes into play in the KIP. It >>>> needs more explanation. >>>> >>>> 3. What does "LargeMessageFormatter" do in the process? >>>> I thought all we want to do is to replace the "large value data" into a >>>> path, and consumers will read the path via blob store class. >>>> All these should be done in serializer/deserializer, so why do we need the >>>> formatter? >>>> >>>> 4. In the BlobStore, it looks like we presume users will use object stores, >>>> which is not good. >>>> Could we make it more generic? Javadoc, method names, ... >>>> >>>> 5. It would be good to have some explanation for the purpose of each new >>>> interface/class, and clear javadoc for each method. >>>> >>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? >>>> Again, I thought we only need to replace value to a path, and add the >>>> "large-message" header, so that when consumer reads this message, it'll >>>> read the path from value and get the original data via BlobStore. Why do we >>>> need this ID generator? I think users should do the object naming when >>>> putting the object by themselves, not via another interface. WDYT? >>>> >>>> Thanks. >>>> Luke >>>> >>>> >>>> >>>> >>>> >>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>> wrote: >>>> >>>>> Hi there I would like to start discussions on >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer >>>>> >>>>> Thanks >>>>> Omnia