Hi Jun
> JR11. Do you think that we need to add key.serializers and
> key.deserializers or do you think covering large messages in value is
> enough?
The large message issue usually is a value issue, I never saw a key that is 
bigger than broker message size. If I would add key.serializers and 
key.deserializers it would be for consistency and maybe there are use-cases 
where developers want to apply multiple serializations in order on the key as 
well outside the context of the large message support. 
I updated the KIP to add these two configs now.

> JR12. We can load ComposableSerializer automatically if value.serializers
> or value.deserializers are specified. But, it seems that we could keep
> ComposableSerializer as an internal implementation. For example,
> ProducerInterceptors is automatically loaded when multiple interceptors are
> specified and is an internal class.
I updated the kip to highlight that the changes is updating ProducerConfig and 
ConsumerConfig to have new config for serializers/deserializers and not the 
actual class this is implementation details and not public interfaces. 

> JR17. We could estimate the size after compression, but the estimator is
> not 100% accurate. It seems that it's simpler to just use the original
> message size.
We can keep it as original message size, I was thinking if it is good enough 
for max.request.size it might be good enough for this. I updated the KIP anyway 
to simplify it and keep it to the original size. 

Hope the final version addressed all feedbacks and we can resume with the 
voting 

Thanks 
Omnia

> On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
> 
> Hi, Omnia,
> 
> Thanks for the reply.
> 
> JR11. Do you think that we need to add key.serializers and
> key.deserializers or do you think covering large messages in value is
> enough?
> 
> JR12. We can load ComposableSerializer automatically if value.serializers
> or value.deserializers are specified. But, it seems that we could keep
> ComposableSerializer as an internal implementation. For example,
> ProducerInterceptors is automatically loaded when multiple interceptors are
> specified and is an internal class.
> 
> JR17. We could estimate the size after compression, but the estimator is
> not 100% accurate. It seems that it's simpler to just use the original
> message size.
> 
> Jun
> 
> On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
> <mailto:o.g.h.ibra...@gmail.com>>
> wrote:
> 
>> 
>> Hi Jun
>>> JR11. value.serializers and value.deserializers: Should they be of type
>>> List? Also, where are key.serializers and key.deserializers?
>>> 
>> Updated now
>>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
>> The initial thinking here was if `value.serializers or value.deserializers
>> ` exists the client will load  ComposableSerializer or
>> ComposableDeserializer  automatically and use them. Unfortunately this
>> would need us to define these serializers.
>> 
>> The other option is to update `Plugin<Serializer<V>>
>> valueSerializerPlugin` and KafkaProducer constructor to accept
>> List<Serializer<V>> and move the logic of the ComposableSerializer into
>> KafkaProducer::doSend which when we do serialization (same with
>> KafkaConsumer). This option hide the logic and reduce exposure for client
>> to these.
>> WDYT?
>> 
>>> 
>>> JR13. large.message.payload.store.class : should it be of type class?
>> Updated
>>> 
>>> JR14.
>>> 
>> org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer :
>>> The name seems redundant since largemessage appears twice.
>> Updated
>>> 
>>> JR15. PayloadResponse: It still mentions response code. It mentions
>>> "isRetryable flag", which no longer exists in PayloadStoreException.
>> There
>>> are typos in "then it will serialiser will”.
>> The KIP is updated now
>> 
>>> JR16. Regarding returning new byte[0] if
>> large.message.skip.not.found.error
>>> is true, this will likely fail the next deserializer and the application
>>> won't have the right context of the error. It's probably better to just
>>> propagate the specific exception and let the caller handle it.
>> You right this will cause issue if there is another deserializer waiting
>> for the data. I have updated the KIP with this.
>> 
>>> 
>>> JR17. LargeMessageSerializer:  "Check if the estimated size of the data
>>> (bytes) after applying provided compression (if there is one)"
>> Compression
>>> actually happens after serialization and is done on a batch of records.
>> Yes the compression itself happened after but we also have
>> `estimateSizeInBytesUpperBound` which my understanding is this take the
>> compression type into the account as well
>>> 
>>> 
>>> JR18. Could you define the type T for LargeMessageSerializer?
>> Update the KIP this T would be byte[]
>> 
>> Thanks
>> Omnia
>>> 
>>> Jun
>>> 
>>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
>> <mailto:o.g.h.ibra...@gmail.com>>
>>> wrote:
>>> 
>>>> Hi Jun, thanks for having the time to review this
>>>> 
>>>>> JR1. While the KIP is potentially useful, I am wondering who is
>>>> responsible
>>>>> for retention for the objects in the payload store. Once a message
>> with a
>>>>> reference is deleted, the key of the external object is lost and the
>>>> object
>>>>> may never be deleted.
>>>> 
>>>> The `ttl` in the object store is the responsibility of the owner of this
>>>> store it should be configured in away that is reasonable with the
>> retention
>>>> config in Kafka.
>>>> I have updated the KIP with `Consideration` section.
>>>>> 
>>>>> JR2. Configs: For all new configs, it would be useful to list their
>>>> types.
>>>> Updated the KIP now
>>>>> 
>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
>>>> we
>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>> No you right this was copy/past mistake
>>>> 
>>>>> 
>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>> PayloadStore and PayloadResponse, it would be useful to include the
>> full
>>>>> package name.
>>>> Updated the KIP now
>>>>> 
>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention
>> to
>>>>> implement exponential backoff on retries? If so, it's more consistent
>> if
>>>> we
>>>>> can follow the existing naming convention like retry.backoff.max.ms <
>>>> http://retry.backoff.max.ms/> and
>>>>> retry.backoff.ms <http://retry.backoff.ms/> <http://retry.backoff.ms/> 
>>>>> <http://retry.backoff.ms/
>>> .
>>>> I have removed these to simplify the config more (as Luke suggested
>>>> initially) and added these to the consideration section.
>>>> 
>>>>> 
>>>>> JR6. large.message.skip.not.found.error : If the reference can't be
>>>> found,
>>>>> what value does the deserializer return? Note that null has a special
>>>>> meaning for tombstone in compacted topics.
>>>> The deserialiser will return `new byte[0]` not null.
>>>>> 
>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>> PayloadStoreException?
>>>> We can do without responseCode, the initial though was to report
>> response
>>>> code form payload store.
>>>> Update the KIP.
>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor
>> the
>>>>> metrics in a plugin through the Monitorable interface.
>>>> Oh nice, I didn’t know about this interface before. Updated the KIP with
>>>> this now.
>>>>> 
>>>>> JR9. Why do we need the protected field
>>>> PayloadStoreException.isRetryable?
>>>> Initial thought here was the serializer can retry the upload. But I have
>>>> removed all the retry logic from serializer and it will be up to the
>>>> PayloadStore provider to implement this if they need it.
>>>>> 
>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>>>> interface.
>>>> It is updated now to interface.
>>>> 
>>>> Hope the last version of the KIP is more simpler now
>>>> 
>>>> Thanks
>>>> Omnia
>>>> 
>>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID 
>>>>> <mailto:j...@confluent.io.INVALID> <mailto:
>> j...@confluent.io.INVALID <mailto:j...@confluent.io.INVALID>>> wrote:
>>>>> 
>>>>> Thanks for the KIP. A few comments.
>>>>> 
>>>>> JR1. While the KIP is potentially useful, I am wondering who is
>>>> responsible
>>>>> for retention for the objects in the payload store. Once a message
>> with a
>>>>> reference is deleted, the key of the external object is lost and the
>>>> object
>>>>> may never be deleted.
>>>>> 
>>>>> JR2. Configs: For all new configs, it would be useful to list their
>>>> types.
>>>>> 
>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
>>>> we
>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>> 
>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>> PayloadStore and PayloadResponse, it would be useful to include the
>> full
>>>>> package name.
>>>>> 
>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention
>> to
>>>>> implement exponential backoff on retries? If so, it's more consistent
>> if
>>>> we
>>>>> can follow the existing naming convention like retry.backoff.max.ms 
>>>>> <http://retry.backoff.max.ms/> <
>> http://retry.backoff.max.ms/> <
>>>> http://retry.backoff.max.ms/> and
>>>>> retry.backoff.ms <http://retry.backoff.ms/> <http://retry.backoff.ms/> 
>>>>> <http://retry.backoff.ms/
>>> .
>>>>> 
>>>>> JR6. large.message.skip.not.found.error : If the reference can't be
>>>> found,
>>>>> what value does the deserializer return? Note that null has a special
>>>>> meaning for tombstone in compacted topics.
>>>>> 
>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>> PayloadStoreException?
>>>>> 
>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor
>> the
>>>>> metrics in a plugin through the Monitorable interface.
>>>>> 
>>>>> JR9. Why do we need the protected field
>>>> PayloadStoreException.isRetryable?
>>>>> 
>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
>>>> interface.
>>>>> 
>>>>> Thanks,

Reply via email to