Hi Jun,

> JR11. Do you think that we need to add key.serializers and
> key.deserializers or do you think covering large messages in value is
> enough?

The large message issue is usually a value issue; I have never seen a key that is larger than the broker's maximum message size. If I add key.serializers and key.deserializers, it would be for consistency, and there may be use cases where developers want to apply multiple serializers in order on the key as well, outside the context of large message support. I have updated the KIP to add these two configs now.
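To make "apply multiple serializers in order" concrete, here is a minimal sketch of how a key.serializers or value.serializers list could be applied. It assumes a stripped-down stand-in for Kafka's `Serializer` interface (`SimpleSerializer`, modelling only `serialize(String topic, T data)`) so that it compiles without the client jar. `SimpleSerializer` and `ChainedSerializer` are hypothetical names, not the KIP's ComposableSerializer. The first serializer turns the object into bytes and every later one maps byte[] to byte[], in line with T being byte[] for LargeMessageSerializer (JR18).

```java
import java.util.List;

/** Minimal stand-in for Kafka's Serializer<T>; only serialize(topic, data) is modelled. */
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

/**
 * Hypothetical chain: the first serializer turns the object into bytes, and each
 * following serializer maps byte[] -> byte[] in the configured order.
 */
final class ChainedSerializer<T> implements SimpleSerializer<T> {
    private final SimpleSerializer<T> first;
    private final List<SimpleSerializer<byte[]>> rest;

    ChainedSerializer(SimpleSerializer<T> first, List<SimpleSerializer<byte[]>> rest) {
        this.first = first;
        this.rest = List.copyOf(rest);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] bytes = first.serialize(topic, data);
        for (SimpleSerializer<byte[]> next : rest) {
            if (bytes == null) {
                return null; // keep null as null so null keys and tombstones survive the chain
            }
            bytes = next.serialize(topic, bytes);
        }
        return bytes;
    }
}
```

Passing null straight through is a choice made in this sketch, so that a later stage never turns a null key or tombstone value into non-null bytes.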
> JR12. We can load ComposableSerializer automatically if value.serializers
> or value.deserializers are specified. But, it seems that we could keep
> ComposableSerializer as an internal implementation. For example,
> ProducerInterceptors is automatically loaded when multiple interceptors are
> specified and is an internal class.

I have updated the KIP to highlight that the change is to ProducerConfig and ConsumerConfig, which get new configs for the serializer/deserializer lists. The actual class is an implementation detail, not a public interface.

> JR17. We could estimate the size after compression, but the estimator is
> not 100% accurate. It seems that it's simpler to just use the original
> message size.

We can keep it as the original message size. I was thinking that if the size estimate is good enough for max.request.size, it might be good enough for this as well. I have updated the KIP anyway to simplify it and keep it to the original size.

I hope the final version addresses all the feedback and we can resume the vote.

Thanks
Omnia

> On 8 Aug 2025, at 22:10, Jun Rao <j...@confluent.io.INVALID> wrote:
>
> Hi, Omnia,
>
> Thanks for the reply.
>
> JR11. Do you think that we need to add key.serializers and
> key.deserializers or do you think covering large messages in value is
> enough?
>
> JR12. We can load ComposableSerializer automatically if value.serializers
> or value.deserializers are specified. But, it seems that we could keep
> ComposableSerializer as an internal implementation. For example,
> ProducerInterceptors is automatically loaded when multiple interceptors are
> specified and is an internal class.
>
> JR17. We could estimate the size after compression, but the estimator is
> not 100% accurate. It seems that it's simpler to just use the original
> message size.
>
> Jun
>
> On Mon, Aug 4, 2025 at 10:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hi Jun
>>
>>> JR11.
>>> value.serializers and value.deserializers: Should they be of type
>>> List? Also, where are key.serializers and key.deserializers?
>>
>> Updated now.
>>
>>> JR12. Do we still need ComposableSerializer and ComposableDeserializer?
>>
>> The initial thinking here was that if `value.serializers` or `value.deserializers`
>> exists, the client will load ComposableSerializer or
>> ComposableDeserializer automatically and use them. Unfortunately, this
>> would require us to define these serializers.
>>
>> The other option is to update `Plugin<Serializer<V>> valueSerializerPlugin`
>> and the KafkaProducer constructor to accept List<Serializer<V>>, and move
>> the logic of the ComposableSerializer into KafkaProducer::doSend, where
>> serialization happens (same with KafkaConsumer). This option hides the
>> logic and reduces the client's exposure to these classes.
>> WDYT?
>>
>>> JR13. large.message.payload.store.class : should it be of type class?
>>
>> Updated
>>
>>> JR14. org.apache.kafka.common.serialization.largemessage.LargeMessageSerializer :
>>> The name seems redundant since largemessage appears twice.
>>
>> Updated
>>
>>> JR15. PayloadResponse: It still mentions response code. It mentions
>>> "isRetryable flag", which no longer exists in PayloadStoreException. There
>>> are typos in "then it will serialiser will".
>>
>> The KIP is updated now.
>>
>>> JR16. Regarding returning new byte[0] if large.message.skip.not.found.error
>>> is true, this will likely fail the next deserializer and the application
>>> won't have the right context of the error. It's probably better to just
>>> propagate the specific exception and let the caller handle it.
>>
>> You are right; this will cause an issue if there is another deserializer
>> waiting for the data. I have updated the KIP with this.
>>
>>> JR17.
>>> LargeMessageSerializer: "Check if the estimated size of the data
>>> (bytes) after applying provided compression (if there is one)". Compression
>>> actually happens after serialization and is done on a batch of records.
>>
>> Yes, the compression itself happens afterwards, but we also have
>> `estimateSizeInBytesUpperBound`, which, as I understand it, takes the
>> compression type into account as well.
>>
>>> JR18. Could you define the type T for LargeMessageSerializer?
>>
>> I have updated the KIP; T would be byte[].
>>
>> Thanks
>> Omnia
>>
>>> Jun
>>>
>>> On Fri, Jul 25, 2025 at 6:28 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jun, thanks for taking the time to review this.
>>>>
>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>> for retention for the objects in the payload store. Once a message with a
>>>>> reference is deleted, the key of the external object is lost and the object
>>>>> may never be deleted.
>>>>
>>>> The `ttl` in the object store is the responsibility of the owner of this
>>>> store; it should be configured in a way that is reasonable given the
>>>> retention config in Kafka.
>>>> I have updated the KIP with a `Consideration` section.
>>>>
>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>
>>>> Updated the KIP now.
>>>>
>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>
>>>> No, you are right; this was a copy/paste mistake.
>>>>
>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>> package name.
>>>>
>>>> Updated the KIP now.
>>>>
>>>>> JR5.
>>>>> large.message.payload.store.retry.max.backoff.ms and
>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>> implement exponential backoff on retries? If so, it's more consistent if we
>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>> retry.backoff.ms.
>>>>
>>>> I have removed these to simplify the config further (as Luke suggested
>>>> initially) and added them to the consideration section.
>>>>
>>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
>>>>> what value does the deserializer return? Note that null has a special
>>>>> meaning for tombstone in compacted topics.
>>>>
>>>> The deserializer will return `new byte[0]`, not null.
>>>>
>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>> PayloadStoreException?
>>>>
>>>> We can do without responseCode; the initial thought was to report the
>>>> response code from the payload store.
>>>> Updated the KIP.
>>>>
>>>>> JR8. Why do we need PayloadStore.metrics? Note that we could monitor the
>>>>> metrics in a plugin through the Monitorable interface.
>>>>
>>>> Oh nice, I didn't know about this interface before. Updated the KIP with
>>>> this now.
>>>>
>>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>>>
>>>> The initial thought here was that the serializer could retry the upload. But
>>>> I have removed all the retry logic from the serializer, and it will be up to
>>>> the PayloadStore provider to implement this if they need it.
>>>>
>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an interface.
>>>>
>>>> It is updated now to an interface.
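The PayloadStore-as-interface design discussed in JR6 to JR10, combined with the later decisions in JR16 (propagate the not-found error instead of returning `new byte[0]`) and JR17 (compare the original message size), can be sketched roughly as below. Everything here is hypothetical: the `publish`/`download` methods, `InMemoryPayloadStore`, `LargeMessageCodec`, the threshold parameter and the one-byte inline/reference marker are assumptions for illustration, not the KIP's actual interfaces or wire format. Metrics, which JR8 suggests exposing through `Monitorable`, are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical store error; no isRetryable flag, retries are the provider's job (JR9). */
class PayloadStoreException extends RuntimeException {
    PayloadStoreException(String message) {
        super(message);
    }
}

/** Hypothetical shape of a pluggable payload store (JR10: an interface, not a class). */
interface PayloadStore {
    /** Uploads the payload and returns an opaque reference to it. */
    String publish(String topic, byte[] payload);

    /** Returns the payload, or throws PayloadStoreException if it no longer exists. */
    byte[] download(String reference);
}

/** Toy in-memory store; a real provider would use an object store with its own TTL (JR1). */
final class InMemoryPayloadStore implements PayloadStore {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    @Override
    public String publish(String topic, byte[] payload) {
        String reference = topic + "/" + UUID.randomUUID();
        objects.put(reference, payload.clone());
        return reference;
    }

    @Override
    public byte[] download(String reference) {
        byte[] payload = objects.get(reference);
        if (payload == null) {
            throw new PayloadStoreException("payload not found: " + reference);
        }
        return payload.clone();
    }

    /** Simulates the store expiring an object. */
    void expire(String reference) {
        objects.remove(reference);
    }
}

/** Hypothetical byte[] -> byte[] stage (T = byte[], as in JR18) with a one-byte marker prefix. */
final class LargeMessageCodec {
    private static final byte INLINE = 0;
    private static final byte REFERENCE = 1;

    private final PayloadStore store;
    private final int thresholdBytes;

    LargeMessageCodec(PayloadStore store, int thresholdBytes) {
        this.store = store;
        this.thresholdBytes = thresholdBytes;
    }

    byte[] serialize(String topic, byte[] data) {
        if (data == null) {
            return null; // null stays null: it means tombstone in compacted topics
        }
        // Compare the original (uncompressed) size against the threshold (JR17).
        if (data.length <= thresholdBytes) {
            return withMarker(INLINE, data);
        }
        String reference = store.publish(topic, data);
        return withMarker(REFERENCE, reference.getBytes(StandardCharsets.UTF_8));
    }

    byte[] deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length == 0) {
            throw new IllegalArgumentException("missing marker byte");
        }
        byte[] body = Arrays.copyOfRange(data, 1, data.length);
        if (data[0] == INLINE) {
            return body;
        }
        // Let PayloadStoreException propagate rather than returning new byte[0] (JR16).
        return store.download(new String(body, StandardCharsets.UTF_8));
    }

    private static byte[] withMarker(byte marker, byte[] body) {
        byte[] out = new byte[body.length + 1];
        out[0] = marker;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }
}
```

The marker byte exists only so this toy deserializer can tell inline values from references; the KIP may signal this differently.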
>>>>
>>>> I hope the latest version of the KIP is simpler now.
>>>>
>>>> Thanks
>>>> Omnia
>>>>
>>>>> On 23 Jul 2025, at 00:43, Jun Rao <j...@confluent.io.INVALID> wrote:
>>>>>
>>>>> Thanks for the KIP. A few comments.
>>>>>
>>>>> JR1. While the KIP is potentially useful, I am wondering who is responsible
>>>>> for retention for the objects in the payload store. Once a message with a
>>>>> reference is deleted, the key of the external object is lost and the object
>>>>> may never be deleted.
>>>>>
>>>>> JR2. Configs: For all new configs, it would be useful to list their types.
>>>>>
>>>>> JR3. value.serializers: Why is this required? If a user doesn't set it, we
>>>>> should just use value.serializer, right? Ditto for key.serializers.
>>>>>
>>>>> JR4. For all new public interfaces such as LargeMessageSerializer,
>>>>> PayloadStore and PayloadResponse, it would be useful to include the full
>>>>> package name.
>>>>>
>>>>> JR5. large.message.payload.store.retry.max.backoff.ms and
>>>>> large.message.payload.store.retry.delay.backoff.ms: Is the intention to
>>>>> implement exponential backoff on retries? If so, it's more consistent if we
>>>>> can follow the existing naming convention like retry.backoff.max.ms and
>>>>> retry.backoff.ms.
>>>>>
>>>>> JR6. large.message.skip.not.found.error : If the reference can't be found,
>>>>> what value does the deserializer return? Note that null has a special
>>>>> meaning for tombstone in compacted topics.
>>>>>
>>>>> JR7. PayloadResponse: Why do we have both responseCode and
>>>>> PayloadStoreException?
>>>>>
>>>>> JR8. Why do we need PayloadStore.metrics?
>>>>> Note that we could monitor the
>>>>> metrics in a plugin through the Monitorable interface.
>>>>>
>>>>> JR9. Why do we need the protected field PayloadStoreException.isRetryable?
>>>>>
>>>>> JR10. As Luke mentioned earlier, we could turn PayloadStore to an interface.
>>>>>
>>>>> Thanks,