Hi, Omnia,

Thanks for the reply. A few more comments.

JR20. Could you fix the inconsistency?
org.apache.kafka.common.serialization.largemessage.PayloadStore vs
org.apache.kafka.common.serialization.largemessage.store.PayloadStore

JR21. "Encapsulate that ID into a simple Kafka event using a structured
format." Could you define the structured format explicitly?
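For example, is it something along the lines of a small fixed envelope
carrying the reference (the field names below are purely illustrative, not
from the KIP)?

    // hypothetical shape of the value written to the topic in place of the
    // original payload
    public record PayloadReference(String storeUri,
                                   String payloadKey,
                                   long originalSizeBytes) { }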

JR22. TTL for object stores: Could the new serializer be used on a
compacted topic? If so, how should a user configure the TTL, given that the
Kafka retention is infinite?

Jun

On Mon, Aug 11, 2025 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

> Hi Jun
> > JR11. Do you think that we need to add key.serializers and
> > key.deserializers or do you think covering large messages in value is
> > enough?
> The large message issue is usually a value issue; I have never seen a key
> that is bigger than the broker message size. If I add key.serializers and
> key.deserializers it would be for consistency, and maybe there are use cases
> where developers want to apply multiple serializations in order on the key
> as well, outside the context of the large message support.
> I updated the KIP to add these two configs now.
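> For illustration, the producer config would then look roughly like this
> (the config keys come from the KIP; the serializer class names below are
> just placeholders):
>
>     Properties props = new Properties();
>     // domain serializer first, then the large-message serializer from the KIP
>     props.put("value.serializers",
>         "com.example.AvroValueSerializer,com.example.LargeMessageSerializer");
>     props.put("key.serializers",
>         "org.apache.kafka.common.serialization.StringSerializer");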
>
> > JR12. We can load ComposableSerializer automatically if value.serializers
> > or value.deserializers are specified. But, it seems that we could keep
> > ComposableSerializer as an internal implementation. For example,
> > ProducerInterceptors is automatically loaded when multiple interceptors are
> > specified and is an internal class.
> I updated the KIP to highlight that the change is updating ProducerConfig
> and ConsumerConfig to have new configs for serializers/deserializers, and not
> the actual class; that is an implementation detail and not a public interface.
>
> > JR17. We could estimate the size after compression, but the estimator is
> > not 100% accurate. It seems that it's simpler to just use the original
> > message size.
> We can keep it as the original message size; I was thinking that if the
> estimate is good enough for max.request.size it might be good enough for
> this. I updated the KIP anyway to simplify it and keep it to the original
> size.
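> To make it concrete, the check I have in mind is roughly this (the
> threshold config name below is a placeholder, not the final one from the
> KIP):
>
>     // decide whether to offload using the original serialized size only;
>     // batch-level compression is ignored since it happens later
>     byte[] serialized = innerSerializer.serialize(topic, value);
>     if (serialized.length > largeMessagePayloadThresholdBytes) {
>         // upload 'serialized' to the payload store and emit only a
>         // reference event instead of the raw bytes
>     }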
>
> Hope the final version addressed all the feedback and we can resume with
> the voting.
>
> Thanks
> Omnia
>
> > On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Omnia,
> >
> > Thanks for the reply.
> >
> > JR11. Do you think that we need to add key.serializers and
> > key.deserializers or do you think covering large messages in value is
> > enough?
> >
> > JR12. We can load ComposableSerializer automatically if value.serializers
> > or value.deserializers are specified. But, it seems that we could keep
> > ComposableSerializer as an internal implementation. For example,
> > ProducerInterceptors is automatically loaded when multiple interceptors are
> > specified and is an internal class.
> >
> > JR17. We could estimate the size after compression, but the estimator is
> > not 100% accurate. It seems that it's simpler to just use the original
> > message size.
> >
> > Jun
> >
> > On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > wrote:
> >
> >>
> >> Hi Jun
> >>> JR11. value.serializers and value.deserializers: Should they be of type
> >>> List? Also, where are key.serializers and key.deserializers?
> >>>
> >> Updated now
> >>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
> >> The initial thinking here was that if `value.serializers` or
> >> `value.deserializers` exists, the client will load ComposableSerializer or
> >> ComposableDeserializer automatically and use them. Unfortunately this
> >> would need us to define these serializers.
> >>
> >> The other option is to update `Plugin<Serializer<V>>
> >> valueSerializerPlugin` and the KafkaProducer constructor to accept
> >> List<Serializer<V>>, and move the logic of the ComposableSerializer into
> >> KafkaProducer::doSend, where we do the serialization (same with
> >> KafkaConsumer). This option hides the logic and reduces the client's
> >> exposure to these classes.
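> >> A rough sketch of what I mean, with illustrative names only:
> >>
> >>     // inside KafkaProducer where serialization currently happens:
> >>     // the first serializer produces bytes from the user value, any
> >>     // remaining serializers (e.g. the large-message one, whose T is
> >>     // byte[]) are applied in order on top of those bytes
> >>     byte[] bytes = firstValueSerializer.serialize(topic, value);
> >>     for (Serializer<byte[]> next : remainingValueSerializers) {
> >>         bytes = next.serialize(topic, bytes);
> >>     }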
> >> WDYT?
> >>
> >>>
> >>> JR13. large.message.payload.store.class : should it be of type class?
> >> Updated
> >>>
> >>> JR14. org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer:
> >>> The name seems redundant since largemessage appears twice.
> >> Updated
> >>>
> >>> JR15. PayloadResponse: It still mentions response code. It mentions
> >>> "isRetryable flag", which no longer exists in PayloadStoreException. There
> >>> are typos in "then it will serialiser will".
> >> The KIP is updated now
> >>
> >>> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
> >>> is true, this will likely fail the next deserializer and the application
> >>> won't have the right context of the error. It's probably better to just
> >>> propagate the specific exception and let the caller handle it.
> >> You are right, this will cause an issue if there is another deserializer
> >> waiting for the data. I have updated the KIP with this.
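> >> To make the new behaviour concrete, the deserializer would do roughly the
> >> following (the PayloadStore/PayloadResponse method names below are
> >> placeholders; the exact API is in the KIP):
> >>
> >>     // if the reference cannot be resolved, surface a specific exception
> >>     // instead of fabricating an empty payload for the next deserializer
> >>     PayloadResponse response = payloadStore.fetch(reference);
> >>     if (response.exception() != null) {
> >>         throw new SerializationException(
> >>             "Large message payload not found for reference " + reference,
> >>             response.exception());
> >>     }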
> >>
> >>>
> >>> JR17. LargeMessageSerializer: "Check if the estimated size of the data
> >>> (bytes) after applying provided compression (if there is one)". Compression
> >>> actually happens after serialization and is done on a batch of records.
> >> Yes, the compression itself happens afterwards, but we also have
> >> `estimateSizeInBytesUpperBound`, which, as I understand it, takes the
> >> compression type into account as well.
> >>>
> >>>
> >>> JR18. Could you define the type T for LargeMessageSerializer?
> >> Updated the KIP; this T would be byte[].
> >>
> >> Thanks
> >> Omnia
> >>>
> >>> Jun
> >>>
> >>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Jun, thanks for taking the time to review this.
> >>>>
> >>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
> >>>>> for retention for the objects in the payload store. Once a message with a
> >>>>> reference is deleted, the key of the external object is lost and the object
> >>>>> may never be deleted.
> >>>>
> >>>> The `ttl` in the object store is the responsibility of the owner of this
> >>>> store; it should be configured in a way that is consistent with the
> >>>> retention config in Kafka.
> >>>> I have updated the KIP with a `Consideration` section.
> >>>>>
> >>>>> JR2. Configs: For all new configs, it would be useful to list their types.
> >>>> Updated the KIP now
> >>>>>
> >>>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
> >>>>> we should just use value.serializer, right? Ditto for key.serializers.
> >>>> No, you are right; this was a copy/paste mistake.
> >>>>
> >>>>>
> >>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>>>> PayloadStore and PayloadResponse, it would be useful to include the full
> >>>>> package name.
> >>>> Updated the KIP now
> >>>>>
> >>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
> >>>>> implement exponential backoff on retries? If so, it's more consistent if
> >>>>> we can follow the existing naming convention like retry.backoff.max.ms
> >>>>> and retry.backoff.ms.
> >>>> I have removed these to simplify the config more (as Luke suggested
> >>>> initially) and added these to the consideration section.
> >>>>
> >>>>>
> >>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
> >>>>> what value does the deserializer return? Note that null has a special
> >>>>> meaning for tombstone in compacted topics.
> >>>> The deserializer will return `new byte[0]`, not null.
> >>>>>
> >>>>> JR7. PayloadResponse: Why do we have both responseCode and
> >>>>> PayloadStoreException?
> >>>> We can do without responseCode; the initial thought was to report the
> >>>> response code from the payload store.
> >>>> Updated the KIP.
> >>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
> >>>>> metrics in a plugin through the Monitorable interface.
> >>>> Oh nice, I didn’t know about this interface before. Updated the KIP with
> >>>> this now.
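> >>>> If I read the Monitorable interface (KIP-877) correctly, a store
> >>>> implementation could hook into it roughly like this (the class name
> >>>> below is just a placeholder):
> >>>>
> >>>>     public class MyPayloadStore implements PayloadStore, Monitorable {
> >>>>         private PluginMetrics metrics;
> >>>>
> >>>>         @Override
> >>>>         public void withPluginMetrics(PluginMetrics metrics) {
> >>>>             // invoked by the client once the plugin is configured, so
> >>>>             // the store can register its own metrics/sensors here
> >>>>             this.metrics = metrics;
> >>>>         }
> >>>>     }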
> >>>>>
> >>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
> >>>> The initial thought here was that the serializer can retry the upload. But
> >>>> I have removed all the retry logic from the serializer, and it will be up
> >>>> to the PayloadStore provider to implement this if they need it.
> >>>>>
> >>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>>>> interface.
> >>>> It is now updated to be an interface.
> >>>>
> >>>> Hope the latest version of the KIP is simpler now.
> >>>>
> >>>> Thanks
> >>>> Omnia
> >>>>
> >>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
> >>>>>
> >>>>> Thanks for the KIP. A few comments.
> >>>>>
> >>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
> >>>>> for retention for the objects in the payload store. Once a message with a
> >>>>> reference is deleted, the key of the external object is lost and the object
> >>>>> may never be deleted.
> >>>>>
> >>>>> JR2. Configs: For all new configs, it would be useful to list their types.
> >>>>>
> >>>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
> >>>>> we should just use value.serializer, right? Ditto for key.serializers.
> >>>>>
> >>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>>>> PayloadStore and PayloadResponse, it would be useful to include the full
> >>>>> package name.
> >>>>>
> >>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
> >>>>> implement exponential backoff on retries? If so, it's more consistent if
> >>>>> we can follow the existing naming convention like retry.backoff.max.ms
> >>>>> and retry.backoff.ms.
> >>>>>
> >>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
> >>>>> what value does the deserializer return? Note that null has a special
> >>>>> meaning for tombstone in compacted topics.
> >>>>>
> >>>>> JR7. PayloadResponse: Why do we have both responseCode and
> >>>>> PayloadStoreException?
> >>>>>
> >>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
> >>>>> metrics in a plugin through the Monitorable interface.
> >>>>>
> >>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
> >>>>>
> >>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an
> >>>>> interface.
> >>>>>
> >>>>> Thanks,
>
>
