Hi, Omnia,

Thanks for the reply.

JR11. Do you think that we need to add key.serializers and
key.deserializers or do you think covering large messages in value is
enough?

JR12. We can load ComposableSerializer automatically if value.serializers
or value.deserializers are specified. But, it seems that we could keep
ComposableSerializer as an internal implementation. For example,
ProducerInterceptors is automatically loaded when multiple interceptors are
specified and is an internal class.
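
To make the idea concrete, here is a rough sketch of what such an internal chain could look like. It is only an illustration: StageSerializer is a hypothetical stand-in that mirrors the (topic, data) shape of a serializer, and ChainedSerializer is a made-up name, not the class proposed in the KIP.

```java
import java.util.List;

/** Hypothetical stand-in for a serializer stage: (topic, data) -> bytes. */
interface StageSerializer<T> {
    byte[] serialize(String topic, T data);
}

/**
 * Sketch of an internal chain: the first stage turns the value into bytes,
 * and every later stage transforms bytes into bytes, in configured order.
 */
final class ChainedSerializer<V> implements StageSerializer<V> {
    private final StageSerializer<V> first;
    private final List<StageSerializer<byte[]>> rest;

    ChainedSerializer(StageSerializer<V> first, List<StageSerializer<byte[]>> rest) {
        this.first = first;
        this.rest = List.copyOf(rest); // defensive, immutable copy
    }

    @Override
    public byte[] serialize(String topic, V data) {
        byte[] bytes = first.serialize(topic, data);
        for (StageSerializer<byte[]> stage : rest) {
            bytes = stage.serialize(topic, bytes);
        }
        return bytes;
    }
}
```

Since the chain would be built by the client from the configured list, users would never need to reference it by name.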

JR17. We could estimate the size after compression, but the estimator is
not 100% accurate. It seems that it's simpler to just use the original
message size.
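
As a rough sketch of the simpler approach, the check could compare the serialized (uncompressed) length against a threshold. Everything here is an assumption for illustration: the class name, the in-memory map standing in for the payload store, and the use of a UUID string as the reference are not from the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Sketch: decide on the original payload size, ignoring compression. */
final class SizeThresholdSketch {
    // In-memory stand-in for an external payload store (assumption).
    private final Map<String, byte[]> store = new HashMap<>();
    private final int maxBytes;

    SizeThresholdSketch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Returns the payload unchanged if it fits; otherwise stores it and returns a reference. */
    byte[] serialize(byte[] payload) {
        if (payload.length <= maxBytes) {
            return payload; // small enough: pass through as-is
        }
        String id = UUID.randomUUID().toString();
        store.put(id, payload);
        return id.getBytes(StandardCharsets.UTF_8); // reference replaces the payload
    }

    int storedCount() {
        return store.size();
    }
}
```

How the deserializer tells a reference apart from a regular payload is left out of this sketch.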

Jun

On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

>
> Hi Jun
> > JR11. value.serializers and value.deserializers: Should they be of type
> > List? Also, where are key.serializers and key.deserializers?
> >
> Updated now
> > JR12. Do we still need ComposableSerializer and ComposableDeserializer?
> The initial thinking was that if `value.serializers` or
> `value.deserializers` exists, the client will load ComposableSerializer or
> ComposableDeserializer automatically and use them. Unfortunately, this
> would require us to define these serializers.
>
> The other option is to update `Plugin<Serializer<V>>
> valueSerializerPlugin` and the KafkaProducer constructor to accept
> List<Serializer<V>>, and move the logic of the ComposableSerializer into
> KafkaProducer::doSend, which is where we do serialization (same with
> KafkaConsumer). This option hides the logic and reduces the client's
> exposure to these classes.
> WDYT?
>
> >
> > JR13. large.message.payload.store.class : should it be of type class?
> Updated
> >
> > JR14.
> > org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer:
> > The name seems redundant since largemessage appears twice.
> Updated
> >
> > JR15. PayloadResponse: It still mentions response code. It mentions
> > "isRetryable flag", which no longer exists in PayloadStoreException. There
> > are typos in "then it will serialiser will".
> The KIP is updated now
>
> > JR16. Regarding returning new byte[0] if
> > large.message.skip.not.found.error is true, this will likely fail the next
> > deserializer and the application won't have the right context of the
> > error. It's probably better to just propagate the specific exception and
> > let the caller handle it.
> You're right, this will cause an issue if there is another deserializer
> waiting for the data. I have updated the KIP with this.
>
> >
> > JR17. LargeMessageSerializer: "Check if the estimated size of the data
> > (bytes) after applying provided compression (if there is one)"
> > Compression actually happens after serialization and is done on a batch
> > of records.
> Yes, the compression itself happens afterwards, but we also have
> `estimateSizeInBytesUpperBound`, which, as I understand it, takes the
> compression type into account as well.
> >
> >
> > JR18. Could you define the type T for LargeMessageSerializer?
> Updated the KIP; this T would be byte[].
>
> Thanks
> Omnia
> >
> > Jun
> >
> > On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > wrote:
> >
> >> Hi Jun, thanks for taking the time to review this
> >>
> >>> JR1. While the KIP is potentially useful, I am wondering who is
> >>> responsible for retention for the objects in the payload store. Once a
> >>> message with a reference is deleted, the key of the external object is
> >>> lost and the object may never be deleted.
> >>
> >> The `ttl` in the object store is the responsibility of the owner of this
> >> store; it should be configured in a way that is consistent with the
> >> retention config in Kafka.
> >> I have updated the KIP with a `Consideration` section.
> >>>
> >>> JR2. Configs: For all new configs, it would be useful to list their
> >>> types.
> >> Updated the KIP now
> >>>
> >>> JR3. value.serializers: Why is this required? If a user doesn't set it,
> >>> we should just use value.serializer, right? Ditto for key.serializers.
> >> No, you're right, this was a copy/paste mistake.
> >>
> >>>
> >>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>> PayloadStore and PayloadResponse, it would be useful to include the
> >>> full package name.
> >> Updated the KIP now
> >>>
> >>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
> >>> implement exponential backoff on retries? If so, it's more consistent if
> >>> we can follow the existing naming convention like retry.backoff.max.ms
> >>> and retry.backoff.ms.
> >> I have removed these to simplify the config more (as Luke suggested
> >> initially) and added these to the consideration section.
> >>
> >>>
> >>> JR6. large.message.skip.not.found.error: If the reference can't be
> >>> found, what value does the deserializer return? Note that null has a
> >>> special meaning for tombstone in compacted topics.
> >> The deserializer will return `new byte[0]`, not null.
> >>>
> >>> JR7. PayloadResponse: Why do we have both responseCode and
> >>> PayloadStoreException?
> >> We can do without responseCode; the initial thought was to report the
> >> response code from the payload store.
> >> Updated the KIP.
> >>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor
> >>> the metrics in a plugin through the Monitorable interface.
> >> Oh nice, I didn’t know about this interface before. Updated the KIP with
> >> this now.
> >>>
> >>> JR9. Why do we need the protected field
> >>> PayloadStoreException.isRetryable?
> >> Initial thought here was the serializer can retry the upload. But I have
> >> removed all the retry logic from serializer and it will be up to the
> >> PayloadStore provider to implement this if they need it.
> >>>
> >>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>> interface.
> >> It is now updated to an interface.
> >>
> >> Hope the latest version of the KIP is simpler now
> >>
> >> Thanks
> >> Omnia
> >>
> >>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>
> >>> Thanks for the KIP. A few comments.
> >>>
> >>> JR1. While the KIP is potentially useful, I am wondering who is
> >>> responsible for retention for the objects in the payload store. Once a
> >>> message with a reference is deleted, the key of the external object is
> >>> lost and the object may never be deleted.
> >>>
> >>> JR2. Configs: For all new configs, it would be useful to list their
> >>> types.
> >>>
> >>> JR3. value.serializers: Why is this required? If a user doesn't set it,
> >>> we should just use value.serializer, right? Ditto for key.serializers.
> >>>
> >>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>> PayloadStore and PayloadResponse, it would be useful to include the
> >>> full package name.
> >>>
> >>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
> >>> implement exponential backoff on retries? If so, it's more consistent if
> >>> we can follow the existing naming convention like retry.backoff.max.ms
> >>> and retry.backoff.ms.
> >>>
> >>> JR6. large.message.skip.not.found.error: If the reference can't be
> >>> found, what value does the deserializer return? Note that null has a
> >>> special meaning for tombstone in compacted topics.
> >>>
> >>> JR7. PayloadResponse: Why do we have both responseCode and
> >>> PayloadStoreException?
> >>>
> >>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor
> >>> the metrics in a plugin through the Monitorable interface.
> >>>
> >>> JR9. Why do we need the protected field
> >>> PayloadStoreException.isRetryable?
> >>>
> >>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>> interface.
> >>>
> >>> Thanks,
>
>
