Hi Jun
> 
> JR20. The inconsistency is still there. It's probably simpler to
> consolidate on
> org.apache.kafka.common.serialization.largemessage.PayloadStore.
I have updated the KIP to address this.
> JR21. Could you document how the payload id string is encoded? For example,
> is it always UTF8?
I updated the Javadoc. The default implementation uses UTF-8; if a developer
wants to support any other encoding, they will need to handle that in their
implementation of the store.
> 
> JR22. "Since compacted topics can retain data indefinitely, users must
> choose between setting a business-appropriate TTL or accepting indefinite
> storage costs." Setting a TTL is not ideal since it can break the consumer
> application. So, we probably don't want to recommend it to the users.
> Another option is to run a cleaning job by consuming the topic and if it
> observes a null payload, delete the object based on the key in the record.

Updated the recommendations 
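
As a rough sketch of the cleaning-job idea, assuming object ids are derived deterministically from the record key: the in-memory map below stands in for the external payload store, and `CompactedTopicCleaner` and its method names are hypothetical, not part of the KIP. A real job would call `onRecord` from a consumer poll loop.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch: names here are illustrative and not part of the KIP. */
final class CompactedTopicCleaner {
    // Stand-in for the external payload store (object id -> payload bytes).
    private final Map<String, byte[]> store = new HashMap<>();

    // Deterministic id derived from the record key, so a tombstone can locate the object again.
    static String objectIdFor(String recordKey) {
        return UUID.nameUUIDFromBytes(recordKey.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Producer side: a new value for the same key overwrites the same object.
    void publish(String recordKey, byte[] payload) {
        store.put(objectIdFor(recordKey), payload);
    }

    // Cleaning job: called for every consumed record; a null value is a tombstone.
    // Returns true if an object was deleted from the store.
    boolean onRecord(String recordKey, byte[] recordValue) {
        if (recordValue != null) {
            return false; // live record, keep the object
        }
        return store.remove(objectIdFor(recordKey)) != null;
    }

    int objectCount() {
        return store.size();
    }
}
```

With deterministic ids the job needs only the record key to find the object, so no TTL is required on the store side.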
> JR24. In the following, org.apache.kafka.common.serialization.Serializer
> should include the largemessage namespace.
> producerConfig.put("value.serializers",
> "kafka.serializers.KafkaAvroDeserializer,
> org.apache.kafka.common.serialization.Serializer");

Updated now
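
For illustration only, here is a minimal sketch of how a comma-separated `value.serializers` value could resolve to an ordered list of class names. `SerializerListConfig` and the `com.example` class names are hypothetical placeholders, not the classes defined in the KIP, and the parsing is an assumption rather than the client's actual config handling.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: splits a comma-separated config value into ordered class names. */
final class SerializerListConfig {
    private SerializerListConfig() {}

    // Order is preserved, since the serializers are meant to be applied in sequence.
    static List<String> parse(String configValue) {
        List<String> classNames = new ArrayList<>();
        for (String part : configValue.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                classNames.add(name);
            }
        }
        return classNames;
    }

    public static void main(String[] args) {
        // Placeholder class names, used only to show the ordering.
        String value = "com.example.FirstSerializer, com.example.SecondSerializer";
        for (String name : parse(value)) {
            System.out.println(name);
        }
    }
}
```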

Thanks 
Omnia

> On 12 Aug 2025, at 18:09, Jun Rao <j...@confluent.io.INVALID> wrote:
> 
> Hi, Omnia,
> 
> Thanks for the reply. A few more comments.
> 
> JR20. The inconsistency is still there. It's probably simpler to
> consolidate on
> org.apache.kafka.common.serialization.largemessage.PayloadStore.
> 
> JR21. Could you document how the payload id string is encoded? For example,
> is it always UTF8?
> 
> JR22. "Since compacted topics can retain data indefinitely, users must
> choose between setting a business-appropriate TTL or accepting indefinite
> storage costs." Setting a TTL is not ideal since it can break the consumer
> application. So, we probably don't want to recommend it to the users.
> Another option is to run a cleaning job by consuming the topic and if it
> observes a null payload, delete the object based on the key in the record.
> 
> JR23. We probably don't need to force the PayloadStore implementer to
> implement Monitorable. We could document that if it's desirable to monitor
> a PayloadStore plugin, one can implement the Monitorable interface.
> 
> JR24. In the following, org.apache.kafka.common.serialization.Serializer
> should include the largemessage namespace.
> producerConfig.put("value.serializers",
> "kafka.serializers.KafkaAvroDeserializer,
> org.apache.kafka.common.serialization.Serializer");
> 
> Jun
> 
> On Tue, Aug 12, 2025 at 3:27 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
> 
>> Hi Jun,
>> 
>>> JR20. Could you fix the inconsistency?
>>> org.apache.kafka.common.serialization.largemessage.PayloadStore vs
>>> org.apache.kafka.common.serialization.largemessage.store.PayloadStore
>> Updated
>>> 
>>> JR21. "Encapsulate that ID into a simple Kafka event using a structured
>>> format.". Could you define the structured format explicitly?
>> 
>> This was left over from the initial design, where we had an ID format. After
>> discussing this with Luke I simplified the design by removing the format, and
>> I forgot to remove this line. I have updated the KIP now.
>> 
>>> JR22. TTL for object stores: Could the new serializer be used on a
>>> compacted topic? If so, how should a user configure the TTL since the
>>> Kafka retention is infinite.
>> 
>> Great question about compacted topics! There are a few usages of compacted
>> topics to consider:
>> 1. Typical Compacted Topic Usage: The compacted topics are commonly used
>> as cache/state layers where the latest value for each key represents the
>> current state. These use cases typically involve smaller records since they
>> need to be loaded efficiently for state reconstruction. Large message
>> serialization would be unusual for this pattern.
>> 
>> 2. User-Defined Compacted Topics with Large Payloads: for applications
>> that do use compacted topics with large payloads, the PayloadStore
>> implementation should handle this by:
>>        a. Consistent ID Generation: Use deterministic IDs based on the
>> Kafka key (rather than random UUIDs) so that when a payload id is updated,
>> it overwrites the same payload store object instead of creating new ones.
>> 
>>        b. TTL Strategy: Since compacted topics can retain data
>> indefinitely, users have two options:
>>                - Set a business-appropriate TTL (basically "we know our
>> cache data becomes stale after 30 days")
>>                - Configure no TTL and accept indefinite storage costs as
>> a trade-off for the architectural benefits
>> 
>> 3. Compact + Delete Policy: Topics with `cleanup.policy=compact,delete`
>> will eventually remove old data, so standard TTL approaches work normally.
>> The key insight is that TTL configuration depends on the business
>> requirements and usage patterns rather than just the Kafka retention
>> policy. The PayloadStore implementation should provide flexibility for
>> users to make this trade-off consciously.
>> 
>> I have updated the Consideration section with this.
>> 
>> Thanks
>> Omnia
>> 
>>> On 11 Aug 2025, at 23:32, Jun Rao <j...@confluent.io.INVALID> wrote:
>>> 
>>> Hi, Omnia,
>>> 
>>> Thanks for the reply. A few more comments.
>>> 
>>> JR20. Could you fix the inconsistency?
>>> org.apache.kafka.common.serialization.largemessage.PayloadStore vs
>>> org.apache.kafka.common.serialization.largemessage.store.PayloadStore
>>> 
>>> JR21. "Encapsulate that ID into a simple Kafka event using a structured
>>> format.". Could you define the structured format explicitly?
>>> 
>>> JR22. TTL for object stores: Could the new serializer be used on a
>>> compacted topic? If so, how should a user configure the TTL since the
>>> Kafka retention is infinite.
>>> 
>>> Jun
>>> 
>>> On Mon, Aug 11, 2025 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Jun
>>>>> JR11. Do you think that we need to add key.serializers and
>>>>> key.deserializers or do you think covering large messages in value is
>>>>> enough?
>>>> Large messages are usually a value issue; I have never seen a key that is
>>>> bigger than the broker message size. If I added key.serializers and
>>>> key.deserializers it would be for consistency, and there may be use cases
>>>> where developers want to apply multiple serializations in order on the key
>>>> as well, outside the context of large message support.
>>>> I updated the KIP to add these two configs now.
>>>> 
>>>>> JR12. We can load ComposableSerializer automatically if value.serializers
>>>>> or value.deserializers are specified. But, it seems that we could keep
>>>>> ComposableSerializer as an internal implementation. For example,
>>>>> ProducerInterceptors is automatically loaded when multiple interceptors are
>>>>> specified and is an internal class.
>>>> I updated the KIP to highlight that the change adds new
>>>> serializers/deserializers configs to ProducerConfig and ConsumerConfig. The
>>>> actual class is an implementation detail and not a public interface.
>>>> 
>>>>> JR17. We could estimate the size after compression, but the estimator is
>>>>> not 100% accurate. It seems that it's simpler to just use the original
>>>>> message size.
>>>> We can keep it as the original message size. I was thinking that if the
>>>> estimate is good enough for max.request.size it might be good enough for
>>>> this. I updated the KIP anyway to simplify it and use the original size.
>>>> 
>>>> I hope the final version addresses all the feedback and we can resume the
>>>> voting.
>>>> 
>>>> Thanks
>>>> Omnia
>>>> 
>>>>> On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>> 
>>>>> Hi, Omnia,
>>>>> 
>>>>> Thanks for the reply.
>>>>> 
>>>>> JR11. Do you think that we need to add key.serializers and
>>>>> key.deserializers or do you think covering large messages in value is
>>>>> enough?
>>>>> 
>>>>> JR12. We can load ComposableSerializer automatically if value.serializers
>>>>> or value.deserializers are specified. But, it seems that we could keep
>>>>> ComposableSerializer as an internal implementation. For example,
>>>>> ProducerInterceptors is automatically loaded when multiple interceptors are
>>>>> specified and is an internal class.
>>>>> 
>>>>> JR17. We could estimate the size after compression, but the estimator is
>>>>> not 100% accurate. It seems that it's simpler to just use the original
>>>>> message size.
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> 
>>>>>> Hi Jun
>>>>>>> JR11. value.serializers and value.deserializers: Should they be of type
>>>>>>> List? Also, where are key.serializers and key.deserializers?
>>>>>>> 
>>>>>> Updated now
>>>>>>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
>>>>>> The initial thinking here was that if `value.serializers` or
>>>>>> `value.deserializers` exists, the client will load ComposableSerializer or
>>>>>> ComposableDeserializer automatically and use them. Unfortunately this
>>>>>> would require us to define these serializers.
>>>>>> 
>>>>>> The other option is to update `Plugin<Serializer<V>>
>>>>>> valueSerializerPlugin` and the KafkaProducer constructor to accept
>>>>>> List<Serializer<V>> and move the logic of the ComposableSerializer into
>>>>>> KafkaProducer::doSend, which is where we do serialization (same with
>>>>>> KafkaConsumer). This option hides the logic and reduces the clients'
>>>>>> exposure to these classes.
>>>>>> WDYT?
>>>>>> 
>>>>>>> 
>>>>>>> JR13. large.message.payload.store.class : should it be of type class?
>>>>>> Updated
>>>>>>> 
>>>>>>> JR14.
>>>>>>> org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer :
>>>>>>> The name seems redundant since largemessage appears twice.
>>>>>> Updated
>>>>>>> 
>>>>>>> JR15. PayloadResponse: It still mentions response code. It mentions
>>>>>>> "isRetryable flag", which no longer exists in PayloadStoreException. There
>>>>>>> are typos in "then it will serialiser will".
>>>>>> The KIP is updated now
>>>>>> 
>>>>>>> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
>>>>>>> is true, this will likely fail the next deserializer and the application
>>>>>>> won't have the right context of the error. It's probably better to just
>>>>>>> propagate the specific exception and let the caller handle it.
>>>>>> You are right, this will cause an issue if there is another deserializer
>>>>>> waiting for the data. I have updated the KIP with this.
>>>>>> 
>>>>>>> 
>>>>>>> JR17. LargeMessageSerializer:  "Check if the estimated size of the data
>>>>>>> (bytes) after applying provided compression (if there is one)" Compression
>>>>>>> actually happens after serialization and is done on a batch of records.
>>>>>> Yes, the compression itself happens afterwards, but we also have
>>>>>> `estimateSizeInBytesUpperBound`, which as I understand it takes the
>>>>>> compression type into account as well.
>>>>>>> 
>>>>>>> 
>>>>>>> JR18. Could you define the type T for LargeMessageSerializer?
>>>>>> Updated the KIP; T would be byte[].
>>>>>> 
>>>>>> Thanks
>>>>>> Omnia
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Jun, thanks for taking the time to review this
>>>>>>>> 
>>>>>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>>>>>> for retention for the objects in the payload store. Once a message with a
>>>>>>>>> reference is deleted, the key of the external object is lost and the object
>>>>>>>>> may never be deleted.
>>>>>>>> 
>>>>>>>> The `ttl` in the object store is the responsibility of the owner of this
>>>>>>>> store; it should be configured in a way that is consistent with the
>>>>>>>> retention config in Kafka.
>>>>>>>> I have updated the KIP with a `Consideration` section.
>>>>>>>>> 
>>>>>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>>>>> Updated the KIP now
>>>>>>>>> 
>>>>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>>>>> No, you are right, this was a copy/paste mistake.
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>>>>>> package name.
>>>>>>>> Updated the KIP now
>>>>>>>>> 
>>>>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>>>>>> implement exponential backoff on retries? If so, it's more consistent if we
>>>>>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>>>>>> retry.backoff.ms.
>>>>>>>> I have removed these to simplify the config more (as Luke suggested
>>>>>>>> initially) and added these to the consideration section.
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
>>>>>>>>> what value does the deserializer return? Note that null has a special
>>>>>>>>> meaning for tombstone in compacted topics.
>>>>>>>> The deserialiser will return `new byte[0]` not null.
>>>>>>>>> 
>>>>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>>>>>> PayloadStoreException?
>>>>>>>> We can do without responseCode; the initial thought was to report the
>>>>>>>> response code from the payload store.
>>>>>>>> Updated the KIP.
>>>>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>>>>>>>> metrics in a plugin through the Monitorable interface.
>>>>>>>> Oh nice, I didn’t know about this interface before. Updated the KIP
>>>> with
>>>>>>>> this now.
>>>>>>>>> 
>>>>>>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>>>>>>> Initial thought here was the serializer can retry the upload. But I
>>>> have
>>>>>>>> removed all the retry logic from serializer and it will be up to the
>>>>>>>> PayloadStore provider to implement this if they need it.
>>>>>>>>> 
>>>>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an interface.
>>>>>>>> It is updated now to interface.
>>>>>>>> 
>>>>>>>> I hope the latest version of the KIP is simpler now
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Omnia
>>>>>>>> 
>>>>>>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks for the KIP. A few comments.
>>>>>>>>> 
>>>>>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>>>>>> for retention for the objects in the payload store. Once a message with a
>>>>>>>>> reference is deleted, the key of the external object is lost and the object
>>>>>>>>> may never be deleted.
>>>>>>>>> 
>>>>>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>>>>>> 
>>>>>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>>>>>> 
>>>>>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>>>>>> package name.
>>>>>>>>> 
>>>>>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>>>>>> implement exponential backoff on retries? If so, it's more consistent if we
>>>>>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>>>>>> retry.backoff.ms.
>>>>>>>>> 
>>>>>>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
>>>>>>>>> what value does the deserializer return? Note that null has a special
>>>>>>>>> meaning for tombstone in compacted topics.
>>>>>>>>> 
>>>>>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>>>>>> PayloadStoreException?
>>>>>>>>> 
>>>>>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>>>>>>>> metrics in a plugin through the Monitorable interface.
>>>>>>>>> 
>>>>>>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>>>>>>>> 
>>>>>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an interface.
>>>>>>>>> 
>>>>>>>>> Thanks,
