I guess I understand now.

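However, this seems to be an "issue" with record caching: the `KTable`
cache collapses consecutive updates to the same key into a single
downstream record. Setting the commit interval to zero would flush the
cache almost immediately, but it is not the "right" config change. You
should just disable the `KTable` cache instead.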
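To illustrate the effect described above, here is a deliberately simplified toy model of a per-key write-back cache in front of a table. It is not Kafka's actual caching code; the class name `ToyKTableCache` and its methods are hypothetical. With caching enabled, only the latest value per key is forwarded when a commit flushes the cache; without caching, every update is forwarded.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT Kafka's implementation: a per-key write-back
// cache that only forwards records downstream when a commit flushes it.
final class ToyKTableCache {
    private final boolean cachingEnabled;
    // Latest not-yet-forwarded value per key, kept in first-update order.
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    // Records forwarded downstream, formatted as "key=value".
    private final List<String> forwarded = new ArrayList<>();

    ToyKTableCache(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // Applies one table update; without caching it is forwarded immediately.
    void update(String key, long value) {
        if (cachingEnabled) {
            dirty.put(key, value); // a later update overwrites the pending one
        } else {
            forwarded.add(key + "=" + value);
        }
    }

    // Simulates a commit: forward the latest value of every dirty key.
    void commit() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            forwarded.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

In this model, calling `update("a", 1)` and `update("a", 2)` before a single `commit()` forwards only `a=2` when caching is enabled, but both `a=1` and `a=2` when it is disabled. Committing after every update also forwards both, which mirrors why a zero commit interval hides the effect.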

You can disable caching globally by setting the `cache.max.bytes.buffering`
configuration parameter to zero.
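A minimal sketch of the global setting, using plain string keys so it needs only `java.util.Properties` (with kafka-streams on the classpath, `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` names the same key). The application id and broker address are placeholder assumptions. Later Kafka releases deprecate this key in favor of `statestore.cache.max.bytes`.

```java
import java.util.Properties;

final class NoCacheConfig {
    // Builds a minimal Streams configuration with the record cache disabled
    // for the whole application.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "ktable-no-cache-demo"); // placeholder id
        props.put("bootstrap.servers", "localhost:9092");    // assumed local broker
        // Zero bytes of cache: KTable updates are not buffered/deduplicated.
        props.put("cache.max.bytes.buffering", 0L);
        return props;
    }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams` constructor together with the topology.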

Or you can disable caching for an individual KTable via
`Materialized#withCachingDisabled()`, which you can pass into your
`aggregate()` operator.
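A sketch of the per-table option, assuming the kafka-streams library is on the classpath. The topic names `input-topic` and `output-topic`, the store name `totals-store`, and the sum-of-longs aggregation are hypothetical choices for illustration; only the `aggregate()` result's store has caching disabled, so every update reaches `toStream()`.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

final class CachingDisabledTopology {
    // Sums Long values per key; caching is disabled only for this KTable.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> totals = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey()
            .aggregate(
                () -> 0L,                          // initial aggregate per key
                (key, value, agg) -> agg + value,  // add each new value
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
                    .withCachingDisabled());       // forward every update
        // Each aggregate update is emitted downstream as its own record.
        totals.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Scoping the change to one `Materialized` keeps the cache for the rest of the application, unlike the global config.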

Thus, overall, I don't see the need for a new operator.


-Matthias


On 10/7/20 1:51 PM, Javier Freire Riobo wrote:
> I have put together a small demo example. I hope it clarifies things.
> 
> https://github.com/javierfreire/KTableToKStreamTest
> 
> Thank you very much
> 
> On Wed, Oct 7, 2020 at 3:01, Matthias J. Sax (<mj...@apache.org>)
> wrote:
> 
>> Thanks for the KIP.
>>
>> I am not sure if I understand the motivation. In particular the KIP says:
>>
>>> The main problem, apart from needing more code, is that if the same
>> event is received twice at the same time and the commit time is not 0, the
>> difference is deleted and nothing is emitted.
>>
>> Can you elaborate? Maybe you can provide a concrete example? I don't
>> understand the relationship between "the same event is received twice"
>> and a "non-zero commit time".
>>
>>
>> -Matthias
>>
>> On 10/6/20 6:25 AM, Javier Freire Riobo wrote:
>>> Hi all,
>>>
>>> I'd like to propose these changes to the Kafka Streams API.
>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value
>>>
>>> This is a proposal to convert a KTable to a KStream knowing the previous
>>> value of the record.
>>>
>>> I also opened a proof-of-concept PR:
>>>
>>> PR#9321:  https://github.com/apache/kafka/pull/9381
>>>
>>> What do you think?
>>>
>>> Cheers,
>>> Javier Freire
>>>
>>
> 
