I agree that there are cases where it is useful to get both the old and
the new value. In fact, the DSL often tracks the old and new values
internally via a `Change` value type.
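
To make the idea concrete, here is a rough sketch in plain Java of a pair
holding the old and the new value, plus the kind of helper-class step an
aggregation would need to maintain it. This is not the Kafka Streams API and
not the internal `Change` class: the names `OldAndNew`, `next`, and `derive`
are made up for this illustration, and the loop only stands in for updates
arriving for a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class OldNewSketch {

    /** Hypothetical stand-in for a Change-like pair (illustration only). */
    public static final class OldAndNew<V> {
        public final V oldValue; // null on the first update for a key
        public final V newValue;

        public OldAndNew(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        /** Aggregator-style step: the previous pair's new value becomes the old value. */
        public static <V> OldAndNew<V> next(OldAndNew<V> previous, V incoming) {
            V old = (previous == null) ? null : previous.newValue;
            return new OldAndNew<>(old, incoming);
        }
    }

    /** Runs the updates for one key through the helper and applies fn to each (old, new) pair. */
    public static <V, R> List<R> derive(List<V> updates, BiFunction<V, V, R> fn) {
        List<R> out = new ArrayList<>();
        OldAndNew<V> state = null; // no previous value yet
        for (V update : updates) {
            state = OldAndNew.next(state, update);
            out.add(fn.apply(state.oldValue, state.newValue));
        }
        return out;
    }

    public static void main(String[] args) {
        // Emit the difference between consecutive values; the first update has no old value.
        List<Integer> deltas =
            derive(List.of(10, 15, 12), (oldV, newV) -> oldV == null ? newV : newV - oldV);
        System.out.println(deltas); // prints [10, 5, -3]
    }
}
```

In a real topology the pair would be the aggregate's state and would need its
own serde, which is the extra code the workaround requires.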

We did have some discussion that it might be useful to expose this
currently internal `Change` type in the public API. But if we do this,
it would not be limited to a single operator.


-Matthias

On 10/8/20 1:41 PM, Javier Freire Riobo wrote:
> You're right. The behavior is correct with the cache disabled.
> 
> Anyway I think the operator I propose can be useful. The need to generate a
> value from the previous and current value of a record can be quite common.
> I think the only way to implement it is through an aggregate using a helper
> class. It is simpler and more natural to be able to receive the previous
> and current values in a function.
> 
> Anyway thank you very much. I have been working with Kafka for a short
> time, but I find it an amazing tool. Congratulations.
> 
> On Thu, Oct 8, 2020 at 21:10, Matthias J. Sax (<mj...@apache.org>)
> wrote:
> 
>> I guess I understand now.
>>
>> However, it seems to be an "issue" with record caching. Setting the
>> commit interval to zero would flush the cache each time, but it is not
>> the "right" config change. You should just disable the `KTable` cache
>> instead.
>>
>> You can disable caching globally by setting `cache.max.bytes.buffering`
>> configuration parameter to zero.
>>
>> Or you can disable caching for an individual KTable via
>> `Materialized#withCachingDisabled()` that you can pass into your
>> `aggregate()` operator.
>>
>> Thus, overall, I don't see the need for a new operator.
>>
>>
>> -Matthias
>>
>>
>> On 10/7/20 1:51 PM, Javier Freire Riobo wrote:
>>> I have done a small demo example. I hope it serves as a clarification.
>>>
>>> https://github.com/javierfreire/KTableToKStreamTest
>>>
>>> Thank you very much
>>>
>>> On Wed, Oct 7, 2020 at 3:01, Matthias J. Sax (<mj...@apache.org>)
>>> wrote:
>>>
>>>> Thanks for the KIP.
>>>>
>>>> I am not sure if I understand the motivation. In particular, the KIP
>>>> says:
>>>>
>>>>> The main problem, apart from needing more code, is that if the same
>>>>> event is received twice at the same time and the commit time is not 0,
>>>>> the difference is deleted and nothing is emitted.
>>>>
>>>> Can you elaborate? Maybe you can provide a concrete example? I don't
>>>> understand the relationship between "the same event is received twice"
>>>> and a "non-zero commit time".
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/6/20 6:25 AM, Javier Freire Riobo wrote:
>>>>> Hi all,
>>>>>
>>>>> I'd like to propose these changes to the Kafka Streams API.
>>>>>
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value
>>>>>
>>>>> This is a proposal to convert a KTable to a KStream knowing the
>>>>> previous value of the record.
>>>>>
>>>>> I also opened a proof-of-concept PR:
>>>>>
>>>>> PR#9321:  https://github.com/apache/kafka/pull/9381
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Cheers,
>>>>> Javier Freire
>>>>>
>>>>
>>>
>>
> 
