Hi Luke, I did another round of updates to the KIP. 
`PayloadStore::publish` now returns a simplified version of 
“PayloadResponse”, which simply has the "fullPayloadPath" and a 
“PayloadException”. 
If the PayloadResponse:
- contains a PayloadStoreException with the `isRetriable` flag set to true, 
this will trigger the retry logic. 
- has a 404 response code and large.message.skip.not.found.error is set, we 
will skip failing on the download on the deserialiser side. 
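
To make the two rules above concrete, here is a rough sketch of how they could fit together. It is only an illustration under assumptions, not the code in the KIP: `publish`, "fullPayloadPath", `isRetriable` and large.message.skip.not.found.error come from this thread, while the `responseCode` field on the exception, the `maxRetries` parameter and the `LargeMessageSketch` helper class are hypothetical names I made up for the example.

```java
/** Hypothetical exception type; only the isRetriable flag is named in this thread. */
class PayloadStoreException extends RuntimeException {
    private final boolean retriable;
    // Assumption: the store's status code travels with the exception.
    private final int responseCode;

    PayloadStoreException(String message, boolean retriable, int responseCode) {
        super(message);
        this.retriable = retriable;
        this.responseCode = responseCode;
    }

    boolean isRetriable() { return retriable; }

    int responseCode() { return responseCode; }
}

/** Simplified response: either a full payload path or an exception. */
final class PayloadResponse {
    final String fullPayloadPath;
    final PayloadStoreException exception;

    private PayloadResponse(String fullPayloadPath, PayloadStoreException exception) {
        this.fullPayloadPath = fullPayloadPath;
        this.exception = exception;
    }

    static PayloadResponse success(String fullPayloadPath) {
        return new PayloadResponse(fullPayloadPath, null);
    }

    static PayloadResponse failure(PayloadStoreException exception) {
        return new PayloadResponse(null, exception);
    }
}

/** Only publish is shown; the signature follows the example quoted below. */
interface PayloadStore {
    PayloadResponse publish(String topic, byte[] data);
}

/** Hypothetical helper holding the serialiser/deserialiser side decisions. */
final class LargeMessageSketch {
    private LargeMessageSketch() { }

    /** Publishes once, then retries up to maxRetries times while the exception is retriable. */
    static PayloadResponse publishWithRetry(PayloadStore store, String topic,
                                            byte[] data, int maxRetries) {
        PayloadResponse response = store.publish(topic, data);
        for (int retry = 0;
             retry < maxRetries
                 && response.exception != null
                 && response.exception.isRetriable();
             retry++) {
            response = store.publish(topic, data);
        }
        return response;
    }

    /** True when the deserialiser should skip a failed download instead of failing. */
    static boolean shouldSkipDownloadFailure(PayloadStoreException e,
                                             boolean skipNotFoundError) {
        return skipNotFoundError && e.responseCode() == 404;
    }
}
```

With this shape a non-retriable exception is returned to the caller after the first attempt, and a 404 is only swallowed when the skip config is enabled.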

WDYT? 

Regards 
Omnia


> On 22 May 2025, at 15:16, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> 
> Hi Luke, sorry for the late response. 
>> IMO, the retry should be the logic inside the
>> "large.message.payload.store.class"
>> implementation. If we really want it, I think we need to make it clear in
>> which circumstance we will retry. For example, if it's an unknown exception
>> thrown from S3 API, what will we do to it?
> One class definition was missing: PayloadStoreException, which is used in 
> PayloadResponse. PayloadStoreException has a flag; if it is true, we retry. 
> The serialiser then retries on any exception with this flag set. We can make 
> the publish and download methods throw PayloadStoreException directly to 
> simplify things. WDYT?
> 
>> Moving it into the store class makes it much clearer. And it's good to have
>> a default implementation.
>> But to me, it's still an implementation-level detail that we don't need to
>> expose to users to implement it.
>> Could I know more about when the Id generator will be invoked?
>> My thought is :
>> Users can implement a `publish` method to publish like this:
>> public PayloadResponse publish(String topic, byte[] data) {
>>  String id = genId();
>>  // put the id and data to the remote storage
>>  s3Client.put(id, data, ...);
>> }
>> 
>> So, with the id method in the interface, who will invoke it? Suppose it
>> will be the serializer/deserializer, but no one passes the generated id to
>> the publish method, how do we use it?
> 
> It will be invoked by the payload store class when publishing the payload. 
> So yes, it is an implementation detail of this class. 
> 
>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL
>> object?
> 
> Originally because I needed a bit of a wrapper for the retry and skip 
> logic. But as I mentioned in the first point, we can make the publish and 
> download methods return the ref or the payload, and otherwise throw 
> PayloadStoreException directly, to simplify things. WDYT? The other option 
> is to have publish return only the ref as a string, and download would be 
> the one that could throw PayloadNotFoundException. 
> 
>> 8. Could we change the abstract class to interface?
> We can, once we agree on which configs we will drop and move into the 
> details of the payload provided by the users. 
> 
> regards
> Omnia
> 
>> On 19 May 2025, at 09:54, Luke Chen <show...@gmail.com> wrote:
>> 
>> Hi Omnia,
>> 
>> Thanks for the explanation and update.
>> It's better now.
>> 
>> Questions:
>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>> needs more explanation.
>> My initial thinking is the retry configs are a must for all blob stores, so
>> we can provide them, and validate them for free for all blob stores so not
>> every implementation will go through verifying them.
>> 
>> IMO, the retry should be the logic inside the
>> "large.message.payload.store.class"
>> implementation. If we really want it, I think we need to make it clear in
>> which circumstance we will retry. For example, if it's an unknown exception
>> thrown from S3 API, what will we do to it?
>> 
>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>> Again, I thought we only need to replace value to a path, and add the
>>> "large-message" header, so that when consumer reads this message, it'll
>>> read the path from value and get the original data via BlobStore. Why do
>> we
>>> need this ID generator? I think users should do the object naming when
>>> putting the object by themselves, not via another interface. WDYT?
>> In some cases generating the ID might need some smart work. For example, to
>> avoid S3 throttling, the recommended way in their docs is to create sub-paths
>> under the original bucket; to decide this we might hash the data to find a
>> suitable sub-path.
>> Here is an example of how I would generate a path for an S3 file:
>> ```
>> public String id(byte[] data) {
>>   // distributionFactor is a config for the Id generator; it represents
>>   // the max number of sub-folders under the bucket
>>   String subFolder = topic + "-"
>>       + (Utils.toPositive(Utils.murmur2(data)) % distributionFactor);
>>   return subFolder + "/" + UUID.randomUUID().toString();
>> }
>> ```
>> Hope this example clarifies things a bit. However, I do agree that it might
>> not need a class. I have moved it to be part of the store class.
>> 
>> Moving it into the store class makes it much clearer. And it's good to have
>> a default implementation.
>> But to me, it's still an implementation-level detail that we don't need to
>> expose to users to implement it.
>> Could I know more about when the Id generator will be invoked?
>> My thought is :
>> Users can implement a `publish` method to publish like this:
>> public PayloadResponse publish(String topic, byte[] data) {
>>  String id = genId();
>>  // put the id and data to the remote storage
>>  s3Client.put(id, data, ...);
>> }
>> 
>> So, with the id method in the interface, who will invoke it? Suppose it
>> will be the serializer/deserializer, but no one passes the generated id to
>> the publish method, how do we use it?
>> 
>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL
>> object?
>> 
>> 8. Could we change the abstract class to interface?
>> 
>> Thanks.
>> Luke
>> 
>> 
>> 
>> 
>> On Wed, Apr 30, 2025 at 9:21 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>> 
>>> Hi Luke,
>>> 
>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>> path, and consumers will read the path via blob store class.
>>>>> All these should be done in serializer/deserializer, so why do we need
>>> the
>>>>> formatter?
>>>> 
>>>> I wanted a bit more info than just the path to download. For example, I
>>>> want to add things like the class path of the original blob store, in case
>>>> the consumer is set up with a blob store that doesn't match the one used
>>>> during publishing.
>>>> I have updated the KIP to simplify this by always having this as a simple
>>>> JSON of path and publisher class, which can be represented as
>>>> PayloadReferenceValue. WDYT?
>>> 
>>> I thought of another case where having the freedom to form the reference
>>> might be a nice feature, which is DR. Imagine a case where someone
>>> publishes large messages to S3 and the references to a Kafka topic, and
>>> then wants to have DR. This is achievable if they have a mirrored Kafka
>>> topic which contains the references, but if S3 is unreachable from the DR
>>> backup location then the references they have are a bit useless. However,
>>> if the message formatter is customisable, a dev can implement a more
>>> complicated store that publishes to two store locations and then publishes
>>> both references to Kafka as one message, and the consumer store can
>>> download from whichever bucket is available. I think keeping the door open
>>> to such a use case might be a good feature, but such a use case might also
>>> be a bit questionable given the latency it will add, as we will be
>>> publishing to N stores.
>>> 
>>> Regards
>>> Omnia
>>> 
>>>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>> 
>>>> Hi Luke, thanks for taking the time to look into the KIP
>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>>>> needs more explanation.
>>>> My initial thinking is the retry configs are a must for all blob stores,
>>> so we can provide them, and validate them for free for all blob stores so
>>> not every implementation will go through verifying them.
>>>> 
>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>> path, and consumers will read the path via blob store class.
>>>>> All these should be done in serializer/deserializer, so why do we need
>>> the
>>>>> formatter?
>>>> 
>>>> I wanted a bit more info than just the path to download. For example, I
>>>> want to add things like the class path of the original blob store, in case
>>>> the consumer is set up with a blob store that doesn't match the one used
>>>> during publishing.
>>>> I have updated the KIP to simplify this by always having this as a simple
>>>> JSON of path and publisher class, which can be represented as
>>>> PayloadReferenceValue. WDYT?
>>>> 
>>>>> 4. In the BlobStore, it looks like we presume users will use object
>>> stores,
>>>>> which is not good.
>>>>> Could we make it more generic? Javadoc, method names, …
>>>> This is a good point, I have updated the method names and Javadoc. I am
>>>> also thinking of renaming the class to PayloadStore instead of BlobStore,
>>>> as "blob store" is still tied to object stores as well. To set some
>>>> context here, I am proposing this after working with some of the community
>>>> from Apache Cassandra who are working on Cassandra CEP-44 <
>>>> https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob>
>>>> to handle large CDC, and the initial thinking was to publish any large
>>>> CDC to an object store instead of Kafka; this is why the naming was
>>>> suggesting “blob store” only.
>>>> 
>>>>> 5. It would be good to have some explanation for the purpose of each new
>>>>> interface/class, and clear javadoc for each method.
>>>> 
>>>> Updated the KIP with javadocs
>>>> 
>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>> Again, I thought we only need to replace value to a path, and add the
>>>>> "large-message" header, so that when consumer reads this message, it'll
>>>>> read the path from value and get the original data via BlobStore. Why
>>> do we
>>>>> need this ID generator? I think users should do the object naming when
>>>>> putting the object by themselves, not via another interface. WDYT?
>>>> In some cases generating the ID might need some smart work. For example,
>>>> to avoid S3 throttling, the recommended way in their docs is to create
>>>> sub-paths under the original bucket; to decide this we might hash the data
>>>> to find a suitable sub-path.
>>>> Here is an example of how I would generate a path for an S3 file:
>>>> ```
>>>> public String id(byte[] data) {
>>>>   // distributionFactor is a config for the Id generator; it represents
>>>>   // the max number of sub-folders under the bucket
>>>>   String subFolder = topic + "-"
>>>>       + (Utils.toPositive(Utils.murmur2(data)) % distributionFactor);
>>>>   return subFolder + "/" + UUID.randomUUID().toString();
>>>> }
>>>> ```
>>>> Hope this example clarifies things a bit. However, I do agree that it
>>>> might not need a class. I have moved it to be part of the store class.
>>>> 
>>>> Please let me know WDYT of the final shape of the KIP now
>>>> 
>>>> Thanks
>>>> Omnia
>>>> 
>>>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote:
>>>>> 
>>>>> Hi Omnia,
>>>>> 
>>>>> Thanks for proposing this feature that many users expected.
>>>>> 
>>>>> Some comments:
>>>>> 1. It's quite interesting to see the idea of chained
>>>>> serializer/deserializer used here. I like it.
>>>>> 
>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>>>> needs more explanation.
>>>>> 
>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>> path, and consumers will read the path via blob store class.
>>>>> All these should be done in serializer/deserializer, so why do we need
>>> the
>>>>> formatter?
>>>>> 
>>>>> 4. In the BlobStore, it looks like we presume users will use object
>>> stores,
>>>>> which is not good.
>>>>> Could we make it more generic? Javadoc, method names, ...
>>>>> 
>>>>> 5. It would be good to have some explanation for the purpose of each new
>>>>> interface/class, and clear javadoc for each method.
>>>>> 
>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>> Again, I thought we only need to replace value to a path, and add the
>>>>> "large-message" header, so that when consumer reads this message, it'll
>>>>> read the path from value and get the original data via BlobStore. Why
>>> do we
>>>>> need this ID generator? I think users should do the object naming when
>>>>> putting the object by themselves, not via another interface. WDYT?
>>>>> 
>>>>> Thanks.
>>>>> Luke
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi there I would like to start discussions on
>>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer
>>>>>> 
>>>>>> Thanks
>>>>>> Omnia
> 
