Hi Jun

> JR20. The inconsistency is still there. It's probably simpler to
> consolidate on
> org.apache.kafka.common.serialization.largemessage.PayloadStore.

Updated the KIP now to address this.

> JR21. Could you document how the payload id string is encoded? For example,
> is it always UTF8?

I updated the Javadoc. The default implementation does provide UTF-8; however, if developers want to support any other encoding they will need to address this in their implementation of the store.

> JR22. "Since compacted topics can retain data indefinitely, users must
> choose between setting a business-appropriate TTL or accepting indefinite
> storage costs." Setting a TTL is not ideal since it can break the consumer
> application. So, we probably don't want to recommend it to the users.
> Another option is to run a cleaning job by consuming the topic and if it
> observes a null payload, delete the object based on the key in the record.
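A cleaning job along those lines could be sketched roughly like this. It is only an illustration of the idea, not code from the KIP: the Kafka consumer loop is replaced by an in-memory record list, the payload store is reduced to a `Map`, and all class and method names (`PayloadCleaner`, `clean`) are hypothetical.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of Jun's JR22 suggestion: scan the compacted topic and, whenever a
// record with a null payload (a tombstone) is observed, delete the
// corresponding object from the payload store by the record key.
public class PayloadCleaner {
    public static void clean(List<Map.Entry<String, byte[]>> records,
                             Map<String, byte[]> payloadStore) {
        for (Map.Entry<String, byte[]> record : records) {
            if (record.getValue() == null) {
                // Tombstone: the payload id is derived from the record key,
                // so the stored object can be removed by that key.
                payloadStore.remove(record.getKey());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> store = new HashMap<>();
        store.put("order-1", new byte[]{1});
        store.put("order-2", new byte[]{2});
        List<Map.Entry<String, byte[]>> records = List.of(
                new AbstractMap.SimpleEntry<>("order-1", (byte[]) null), // tombstone
                new AbstractMap.SimpleEntry<>("order-2", new byte[]{2}));
        clean(records, store);
        System.out.println(store.containsKey("order-1")); // false: object deleted
        System.out.println(store.containsKey("order-2")); // true: still live
    }
}
```

In a real deployment the record list would come from a consumer subscribed to the compacted topic, and the deletion would be an object-store call in the configured PayloadStore implementation.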
Updated the recommendations.

> JR24. In the following, org.apache.kafka.common.serialization.Serializer
> should include the largemessage namespace.
> producerConfig.put("value.serializers",
> "kafka.serializers.KafkaAvroDeserializer,
> org.apache.kafka.common.serialization.Serializer");

Updated now.

Thanks
Omnia

> On 12 Aug 2025, at 18:09, Jun Rao <j...@confluent.io.INVALID> wrote:
>
> Hi, Omnia,
>
> Thanks for the reply. A few more comments.
>
> JR20. The inconsistency is still there. It's probably simpler to
> consolidate on
> org.apache.kafka.common.serialization.largemessage.PayloadStore.
>
> JR21. Could you document how the payload id string is encoded? For example,
> is it always UTF8?
>
> JR22. "Since compacted topics can retain data indefinitely, users must
> choose between setting a business-appropriate TTL or accepting indefinite
> storage costs." Setting a TTL is not ideal since it can break the consumer
> application. So, we probably don't want to recommend it to the users.
> Another option is to run a cleaning job by consuming the topic and if it
> observes a null payload, delete the object based on the key in the record.
>
> JR23. We probably don't need to force the PayloadStore implementer to
> implement Monitorable. We could document that if it's desirable to monitor
> a PayloadStore plugin, one can implement the Monitorable interface.
>
> JR24. In the following, org.apache.kafka.common.serialization.Serializer
> should include the largemessage namespace.
> producerConfig.put("value.serializers",
> "kafka.serializers.KafkaAvroDeserializer,
> org.apache.kafka.common.serialization.Serializer");
>
> Jun
>
> On Tue, Aug 12, 2025 at 3:27 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hi Jun,
>>
>>> JR20. Could you fix the inconsistency?
>>> org.apache.kafka.common.serialization.largemessage.PayloadStore vs
>>> org.apache.kafka.common.serialization.largemessage.store.PayloadStore
>>
>> Updated
>>
>>> JR21.
>>> "Encapsulate that ID into a simple Kafka event using a structured
>>> format.". Could you define the structured format explicitly?
>>
>> This was from the initial design; we had an ID format, but after discussing
>> this with Luke I ended up simplifying it by removing the format, and I
>> guess I forgot this line. I have updated the KIP now.
>>
>>> JR22. TTL for object stores: Could the new serializer be used on a
>>> compacted topic? If so, how should a user configure the TTL since the Kafka
>>> retention is infinite.
>>
>> Great question about compacted topics! There are a few usages of compacted
>> topics to consider:
>>
>> 1. Typical compacted topic usage: Compacted topics are commonly used
>> as cache/state layers where the latest value for each key represents the
>> current state. These use cases typically involve smaller records, since they
>> need to be loaded efficiently for state reconstruction. Large message
>> serialization would be unusual for this pattern.
>>
>> 2. User-defined compacted topics with large payloads: For applications
>> that do use compacted topics with large payloads, the PayloadStore
>> implementation should handle this by:
>>     a. Consistent ID generation: Use deterministic IDs based on the
>> Kafka key (rather than random UUIDs) so that when a payload is updated,
>> it overwrites the same payload store object instead of creating new ones.
>>     b. TTL strategy: Since compacted topics can retain data
>> indefinitely, users have two options:
>>        - Set a business-appropriate TTL (basically "we know our
>> cache data becomes stale after 30 days")
>>        - Configure no TTL and accept indefinite storage costs as
>> a trade-off for the architectural benefits
>>
>> 3. Compact + delete policy: Topics with `cleanup.policy=compact,delete`
>> will eventually remove old data, so standard TTL approaches work normally.
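The "consistent ID generation" idea in 2a above could look something like the following sketch. The class and method names (`DeterministicPayloadId`, `idForKey`) are illustrative and not from the KIP; it simply derives a stable id by hashing the record key, so a re-upload for the same key overwrites the same object. The hex id is plain ASCII, so it also round-trips safely through the UTF-8 string encoding discussed under JR21.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Derive the payload id deterministically from the Kafka record key instead
// of generating a random UUID, so updates for the same key overwrite the same
// payload-store object instead of accumulating new ones.
public class DeterministicPayloadId {
    public static String idForKey(byte[] recordKey) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(recordKey);
            return HexFormat.of().formatHex(digest); // 64-char hex string
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every JVM", e);
        }
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // Same key always yields the same id, so re-uploads overwrite in place.
        System.out.println(idForKey(key).equals(idForKey(key))); // true
    }
}
```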
>> The key insight is that TTL configuration depends on the business
>> requirements and usage patterns rather than just the Kafka retention
>> policy. The PayloadStore implementation should provide flexibility for
>> users to make this trade-off consciously.
>>
>> I have updated the consideration section with this.
>>
>> Thanks
>> Omnia
>>
>>> On 11 Aug 2025, at 23:32, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>
>>> Hi, Omnia,
>>>
>>> Thanks for the reply. A few more comments.
>>>
>>> JR20. Could you fix the inconsistency?
>>> org.apache.kafka.common.serialization.largemessage.PayloadStore vs
>>> org.apache.kafka.common.serialization.largemessage.store.PayloadStore
>>>
>>> JR21. "Encapsulate that ID into a simple Kafka event using a structured
>>> format.". Could you define the structured format explicitly?
>>>
>>> JR22. TTL for object stores: Could the new serializer be used on a
>>> compacted topic? If so, how should a user configure the TTL since the Kafka
>>> retention is infinite.
>>>
>>> Jun
>>>
>>> On Mon, Aug 11, 2025 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jun
>>>>
>>>>> JR11. Do you think that we need to add key.serializers and
>>>>> key.deserializers or do you think covering large messages in value is
>>>>> enough?
>>>>
>>>> The large message issue is usually a value issue; I have never seen a key
>>>> that is bigger than the broker message size. If I were to add key.serializers
>>>> and key.deserializers it would be for consistency, and maybe there are use cases
>>>> where developers want to apply multiple serializations in order on the key
>>>> as well, outside the context of the large message support.
>>>> I updated the KIP to add these two configs now.
>>>>
>>>>> JR12. We can load ComposableSerializer automatically if value.serializers
>>>>> or value.deserializers are specified. But, it seems that we could keep
>>>>> ComposableSerializer as an internal implementation.
>>>>> For example, ProducerInterceptors is automatically loaded when multiple
>>>>> interceptors are specified and is an internal class.
>>>>
>>>> I updated the KIP to highlight that the change is updating ProducerConfig
>>>> and ConsumerConfig to have new configs for serializers/deserializers, and not
>>>> the actual class; this is an implementation detail and not a public interface.
>>>>
>>>>> JR17. We could estimate the size after compression, but the estimator is
>>>>> not 100% accurate. It seems that it's simpler to just use the original
>>>>> message size.
>>>>
>>>> We can keep it as the original message size. I was thinking that if it is good
>>>> enough for max.request.size it might be good enough for this. I updated the
>>>> KIP anyway to simplify it and keep it to the original size.
>>>>
>>>> Hope the final version addressed all the feedback and we can resume with the
>>>> voting.
>>>>
>>>> Thanks
>>>> Omnia
>>>>
>>>>> On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>
>>>>> Hi, Omnia,
>>>>>
>>>>> Thanks for the reply.
>>>>>
>>>>> JR11. Do you think that we need to add key.serializers and
>>>>> key.deserializers or do you think covering large messages in value is
>>>>> enough?
>>>>>
>>>>> JR12. We can load ComposableSerializer automatically if value.serializers
>>>>> or value.deserializers are specified. But, it seems that we could keep
>>>>> ComposableSerializer as an internal implementation. For example,
>>>>> ProducerInterceptors is automatically loaded when multiple interceptors are
>>>>> specified and is an internal class.
>>>>>
>>>>> JR17. We could estimate the size after compression, but the estimator is
>>>>> not 100% accurate. It seems that it's simpler to just use the original
>>>>> message size.
>>>>>
>>>>> Jun
>>>>>
>>>>> On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jun
>>>>>>
>>>>>>> JR11.
>>>>>>> value.serializers and value.deserializers: Should they be of type
>>>>>>> List? Also, where are key.serializers and key.deserializers?
>>>>>>
>>>>>> Updated now
>>>>>>
>>>>>>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
>>>>>>
>>>>>> The initial thinking here was that if `value.serializers` or `value.deserializers`
>>>>>> exists, the client will load ComposableSerializer or
>>>>>> ComposableDeserializer automatically and use them. Unfortunately this
>>>>>> would need us to define these serializers.
>>>>>>
>>>>>> The other option is to update `Plugin<Serializer<V>>
>>>>>> valueSerializerPlugin` and the KafkaProducer constructor to accept
>>>>>> List<Serializer<V>>, and move the logic of the ComposableSerializer into
>>>>>> KafkaProducer::doSend, where we do serialization (same with
>>>>>> KafkaConsumer). This option hides the logic and reduces the clients'
>>>>>> exposure to these.
>>>>>> WDYT?
>>>>>>
>>>>>>> JR13. large.message.payload.store.class: should it be of type class?
>>>>>>
>>>>>> Updated
>>>>>>
>>>>>>> JR14.
>>>>>>> org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer:
>>>>>>> The name seems redundant since largemessage appears twice.
>>>>>>
>>>>>> Updated
>>>>>>
>>>>>>> JR15. PayloadResponse: It still mentions response code. It mentions
>>>>>>> "isRetryable flag", which no longer exists in PayloadStoreException. There
>>>>>>> are typos in "then it will serialiser will".
>>>>>>
>>>>>> The KIP is updated now
>>>>>>
>>>>>>> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
>>>>>>> is true, this will likely fail the next deserializer and the application
>>>>>>> won't have the right context of the error. It's probably better to just
>>>>>>> propagate the specific exception and let the caller handle it.
>>>>>>
>>>>>> You're right, this will cause an issue if there is another deserializer waiting
>>>>>> for the data. I have updated the KIP with this.
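For reference, the serializer-chaining behaviour discussed under JR12 (applying the configured serializers in order, each stage consuming the previous stage's output) can be illustrated with a minimal sketch. The `Stage` interface and the two toy stages below are purely illustrative; the real ComposableSerializer is an internal implementation detail and is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

// Conceptual model of a chained "value.serializers" pipeline: apply each
// configured serializer in order, feeding each stage's output to the next
// (e.g. a schema serializer first, then the large-message stage).
public class SerializerChain {
    public interface Stage { byte[] apply(byte[] data); }

    public static byte[] serialize(List<Stage> stages, byte[] value) {
        byte[] out = value;
        for (Stage stage : stages) {
            out = stage.apply(out); // output of one stage is input to the next
        }
        return out;
    }

    public static void main(String[] args) {
        // Two toy stages standing in for a schema serializer + large-message step.
        Stage upper = data -> new String(data, StandardCharsets.UTF_8)
                .toUpperCase().getBytes(StandardCharsets.UTF_8);
        Stage base64 = data -> Base64.getEncoder().encode(data);
        byte[] result = serialize(List.of(upper, base64),
                "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(result, StandardCharsets.UTF_8)); // SEk=
    }
}
```

The order matters: reversing the list would Base64-encode first and uppercase second, which is why the config is a list rather than a set.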
>>>>>>
>>>>>>> JR17. LargeMessageSerializer: "Check if the estimated size of the data
>>>>>>> (bytes) after applying provided compression (if there is one)". Compression
>>>>>>> actually happens after serialization and is done on a batch of records.
>>>>>>
>>>>>> Yes, the compression itself happens afterwards, but we also have
>>>>>> `estimateSizeInBytesUpperBound`, which my understanding is takes the
>>>>>> compression type into account as well.
>>>>>>
>>>>>>> JR18. Could you define the type T for LargeMessageSerializer?
>>>>>>
>>>>>> Updated the KIP; this T would be byte[].
>>>>>>
>>>>>> Thanks
>>>>>> Omnia
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jun, thanks for having the time to review this
>>>>>>>>
>>>>>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>>>>>> for retention for the objects in the payload store. Once a message with a
>>>>>>>>> reference is deleted, the key of the external object is lost and the object
>>>>>>>>> may never be deleted.
>>>>>>>>
>>>>>>>> The `ttl` in the object store is the responsibility of the owner of this
>>>>>>>> store; it should be configured in a way that is reasonable with the retention
>>>>>>>> config in Kafka.
>>>>>>>> I have updated the KIP with a `Consideration` section.
>>>>>>>>
>>>>>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>>>>>
>>>>>>>> Updated the KIP now
>>>>>>>>
>>>>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>>>>>
>>>>>>>> You're right, this was a copy/paste mistake.
>>>>>>>>
>>>>>>>>> JR4.
>>>>>>>>> For all new public interfaces such as LargeMessageSerializer,
>>>>>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>>>>>> package name.
>>>>>>>>
>>>>>>>> Updated the KIP now
>>>>>>>>
>>>>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>>>>>> implement exponential backoff on retries? If so, it's more consistent if we
>>>>>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>>>>>> retry.backoff.ms.
>>>>>>>>
>>>>>>>> I have removed these to simplify the config more (as Luke suggested
>>>>>>>> initially) and added these to the consideration section.
>>>>>>>>
>>>>>>>>> JR6. large.message.skip.not.found.error: If the reference can't be found,
>>>>>>>>> what value does the deserializer return? Note that null has a special
>>>>>>>>> meaning for tombstone in compacted topics.
>>>>>>>>
>>>>>>>> The deserialiser will return `new byte[0]`, not null.
>>>>>>>>
>>>>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>>>>>> PayloadStoreException?
>>>>>>>>
>>>>>>>> We can do without responseCode; the initial thought was to report the
>>>>>>>> response code from the payload store.
>>>>>>>> Updated the KIP.
>>>>>>>>
>>>>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>>>>>>>> metrics in a plugin through the Monitorable interface.
>>>>>>>>
>>>>>>>> Oh nice, I didn't know about this interface before. Updated the KIP with
>>>>>>>> this now.
>>>>>>>>
>>>>>>>>> JR9. Why do we need the protected field
>>>>>>>>> PayloadStoreException.isRetryable?
>>>>>>>>
>>>>>>>> The initial thought here was that the serializer can retry the upload.
>>>>>>>> But I have
>>>>>>>> removed all the retry logic from the serializer, and it will be up to the
>>>>>>>> PayloadStore provider to implement this if they need it.
>>>>>>>>
>>>>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>>>>>>>>> interface.
>>>>>>>>
>>>>>>>> It is updated now to an interface.
>>>>>>>>
>>>>>>>> Hope the last version of the KIP is simpler now.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Omnia
>>>>>>>>
>>>>>>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. A few comments.
>>>>>>>>>
>>>>>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>>>>>> for retention for the objects in the payload store. Once a message with a
>>>>>>>>> reference is deleted, the key of the external object is lost and the object
>>>>>>>>> may never be deleted.
>>>>>>>>>
>>>>>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>>>>>>
>>>>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>>>>>>
>>>>>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>>>>>> package name.
>>>>>>>>>
>>>>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>>>>>> implement exponential backoff on retries?
>>>>>>>>> If so, it's more consistent if we
>>>>>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>>>>>> retry.backoff.ms.
>>>>>>>>>
>>>>>>>>> JR6. large.message.skip.not.found.error: If the reference can't be found,
>>>>>>>>> what value does the deserializer return? Note that null has a special
>>>>>>>>> meaning for tombstone in compacted topics.
>>>>>>>>>
>>>>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>>>>>> PayloadStoreException?
>>>>>>>>>
>>>>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>>>>>>>> metrics in a plugin through the Monitorable interface.
>>>>>>>>>
>>>>>>>>> JR9. Why do we need the protected field
>>>>>>>>> PayloadStoreException.isRetryable?
>>>>>>>>>
>>>>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>>>>>>>>> interface.
>>>>>>>>>
>>>>>>>>> Thanks,