Hi, Omnia,

Thanks for the reply. A few more comments.

JR20. Could you fix the inconsistency between
org.apache.kafka.common.serialization.largemessage.PayloadStore and
org.apache.kafka.common.serialization.largemessage.store.PayloadStore?

JR21. "Encapsulate that ID into a simple Kafka event using a structured
format." Could you define the structured format explicitly?
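For illustration of what an explicit definition could look like, here is a
minimal sketch of a versioned reference envelope. The field names and layout
are assumptions invented for this example, not anything the KIP specifies:

    // Hypothetical sketch only -- names and fields are assumptions, not the KIP's format.
    // The reference event written to Kafka could be defined as a small versioned envelope:
    public record LargeMessagePayloadReference(
        byte magicByte,       // marker so consumers can recognize a reference event
        short version,        // lets the envelope format evolve
        String payloadStore,  // identifies which PayloadStore implementation holds the object
        String objectId       // the ID/key of the payload in the external store
    ) { }

Whatever shape is chosen, spelling out the on-wire encoding (a fixed binary
layout vs. a schema-backed format) in the KIP would remove the ambiguity.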
JR22. TTL for object stores: Could the new serializer be used on a compacted
topic? If so, how should a user configure the TTL, since the Kafka retention
is infinite?
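To illustrate the concern (a sketch only -- cleanup.policy and retention.ms are
standard Kafka topic configs, while the 7-day and 30-day numbers are arbitrary
assumptions, not values from the KIP):

    // Sketch of the JR22 concern; the TTL numbers are arbitrary assumptions.
    public class CompactionTtlSketch {
        public static void main(String[] args) {
            long retentionMs = 7L * 24 * 60 * 60 * 1000;   // retention.ms on a delete-policy topic
            long storeTtlMs = 30L * 24 * 60 * 60 * 1000;   // hypothetical payload-store TTL

            // With time-based retention the rule of thumb is simple: the payload
            // must outlive the reference record in Kafka.
            boolean safeWithDeletePolicy = storeTtlMs >= retentionMs;

            // With cleanup.policy=compact the latest record per key is kept indefinitely,
            // so there is no finite retention to compare against and no fixed TTL is safe.
            System.out.println("delete policy safe: " + safeWithDeletePolicy);
            System.out.println("compact policy: no finite TTL guarantees the reference stays valid");
        }
    }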
Jun

On Mon, Aug 11, 2025 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

> Hi Jun
>
> > JR11. Do you think that we need to add key.serializers and
> > key.deserializers or do you think covering large messages in value is
> > enough?
> The large message issue is usually a value issue; I never saw a key that is
> bigger than the broker message size. If I add key.serializers and
> key.deserializers it would be for consistency, and maybe there are use-cases
> where developers want to apply multiple serializations in order on the key as
> well, outside the context of the large message support.
> I updated the KIP to add these two configs now.
>
> > JR12. We can load ComposableSerializer automatically if value.serializers
> > or value.deserializers are specified. But, it seems that we could keep
> > ComposableSerializer as an internal implementation. For example,
> > ProducerInterceptors is automatically loaded when multiple interceptors are
> > specified and is an internal class.
> I updated the KIP to highlight that the change is updating ProducerConfig and
> ConsumerConfig to have new configs for serializers/deserializers, and not the
> actual class; that is an implementation detail and not a public interface.
>
> > JR17. We could estimate the size after compression, but the estimator is
> > not 100% accurate. It seems that it's simpler to just use the original
> > message size.
> We can keep it as the original message size; I was thinking that if it is
> good enough for max.request.size it might be good enough for this. I updated
> the KIP anyway to simplify it and keep it to the original size.
>
> Hope the final version addressed all feedback and we can resume with the
> voting.
>
> Thanks
> Omnia
>
> > On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Omnia,
> >
> > Thanks for the reply.
> >
> > JR11. Do you think that we need to add key.serializers and
> > key.deserializers or do you think covering large messages in value is
> > enough?
> >
> > JR12. We can load ComposableSerializer automatically if value.serializers
> > or value.deserializers are specified. But, it seems that we could keep
> > ComposableSerializer as an internal implementation. For example,
> > ProducerInterceptors is automatically loaded when multiple interceptors are
> > specified and is an internal class.
> >
> > JR17. We could estimate the size after compression, but the estimator is
> > not 100% accurate. It seems that it's simpler to just use the original
> > message size.
> >
> > Jun
> >
> > On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> >
> >> Hi Jun
> >>
> >>> JR11. value.serializers and value.deserializers: Should they be of type
> >>> List? Also, where are key.serializers and key.deserializers?
> >> Updated now
> >>
> >>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
> >> The initial thinking here was that if `value.serializers` or
> >> `value.deserializers` exists, the client will load ComposableSerializer or
> >> ComposableDeserializer automatically and use them. Unfortunately this would
> >> require us to define these serializers.
> >>
> >> The other option is to update `Plugin<Serializer<V>> valueSerializerPlugin`
> >> and the KafkaProducer constructor to accept List<Serializer<V>> and move the
> >> logic of the ComposableSerializer into KafkaProducer::doSend, where we do the
> >> serialization (same with KafkaConsumer). This option hides the logic and
> >> reduces the client's exposure to these classes.
> >> WDYT?
> >>
> >>> JR13. large.message.payload.store.class: should it be of type class?
> >> Updated
> >>
> >>> JR14. org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer:
> >>> The name seems redundant since largemessage appears twice.
> >> Updated
> >>
> >>> JR15. PayloadResponse: It still mentions response code. It mentions
> >>> "isRetryable flag", which no longer exists in PayloadStoreException. There
> >>> are typos in "then it will serialiser will".
> >> The KIP is updated now
> >>
> >>> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
> >>> is true, this will likely fail the next deserializer and the application
> >>> won't have the right context of the error. It's probably better to just
> >>> propagate the specific exception and let the caller handle it.
> >> You're right, this will cause issues if there is another deserializer
> >> waiting for the data. I have updated the KIP with this.
> >>
> >>> JR17. LargeMessageSerializer: "Check if the estimated size of the data
> >>> (bytes) after applying provided compression (if there is one)". Compression
> >>> actually happens after serialization and is done on a batch of records.
> >> Yes, the compression itself happens afterwards, but we also have
> >> `estimateSizeInBytesUpperBound`, which, as I understand it, takes the
> >> compression type into account as well.
> >>
> >>> JR18. Could you define the type T for LargeMessageSerializer?
> >> Updated the KIP; this T would be byte[].
> >>
> >> Thanks
> >> Omnia
> >>
> >>> Jun
> >>>
> >>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> >>>
> >>>> Hi Jun, thanks for taking the time to review this.
> >>>>
> >>>>> JR1. While the KIP is potentially useful, I am wondering who is
> >>>>> responsible for retention for the objects in the payload store. Once a
> >>>>> message with a reference is deleted, the key of the external object is
> >>>>> lost and the object may never be deleted.
> >>>> The `ttl` in the object store is the responsibility of the owner of this
> >>>> store; it should be configured in a way that is reasonable given the
> >>>> retention config in Kafka.
> >>>> I have updated the KIP with a `Consideration` section.
> >>>>
> >>>>> JR2. Configs: For all new configs, it would be useful to list their
> >>>>> types.
> >>>> Updated the KIP now
> >>>>
> >>>>> JR3. value.serializers: Why is this required? If a user doesn't set it,
> >>>>> we should just use value.serializer, right? Ditto for key.serializers.
> >>>> No, you are right, this was a copy/paste mistake.
> >>>>
> >>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
> >>>>> PayloadStore and PayloadResponse, it would be useful to include the full
> >>>>> package name.
> >>>> Updated the KIP now
> >>>>
> >>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
> >>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
> >>>>> implement exponential backoff on retries? If so, it's more consistent if
> >>>>> we can follow the existing naming convention like retry.backoff.max.ms
> >>>>> and retry.backoff.ms.
> >>>> I have removed these to simplify the config more (as Luke suggested
> >>>> initially) and added these to the consideration section.
> >>>>
> >>>>> JR6. large.message.skip.not.found.error: If the reference can't be
> >>>>> found, what value does the deserializer return? Note that null has a
> >>>>> special meaning for tombstone in compacted topics.
> >>>> The deserializer will return `new byte[0]`, not null.
> >>>>
> >>>>> JR7. PayloadResponse: Why do we have both responseCode and
> >>>>> PayloadStoreException?
> >>>> We can do without responseCode; the initial thought was to report the
> >>>> response code from the payload store.
> >>>> Updated the KIP.
> >>>>
> >>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
> >>>>> metrics in a plugin through the Monitorable interface.
> >>>> Oh nice, I didn't know about this interface before. Updated the KIP with
> >>>> this now.
> >>>>
> >>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
> >>>> The initial thought here was that the serializer can retry the upload. But
> >>>> I have removed all the retry logic from the serializer, and it will be up
> >>>> to the PayloadStore provider to implement this if they need it.
> >>>>
> >>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore into an
> >>>>> interface.
> >>>> It is updated now to an interface.
> >>>>
> >>>> Hope the last version of the KIP is simpler now.
> >>>>
> >>>> Thanks
> >>>> Omnia