Hello Matthias,
thank you for the explanation.
Streaming back to a topic and consuming that topic as a KTable does respect
null values as deletes, correct? But at the price of some overhead.
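
For reference, a minimal sketch of that two-step detour (the intermediate
topic name "stream-as-table", the input topic name and the String types are
made up here; serde configuration and KafkaStreams startup are omitted):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamToTableViaTopic {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");

        // write the stream (null values included) back to an explicit topic ...
        stream.to("stream-as-table");
        // ... and read that topic as a KTable, where null values are deletes
        final KTable<String, String> table = builder.table("stream-as-table");
    }
}
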
Is there any (historical, technical or emotional;-)) reason that no simple 
one-step stream-to-table operation exists?
Best regards
Patrik

> On 26.10.2018 at 00:07, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Patrik,
> 
> `null` values in a KStream don't have delete semantics (it's not a
> changelog stream). That's why we drop them in the KStream#reduce
> implementation.
> 
> If you want to explicitly remove results for a key from the result
> KTable, your `Reducer#apply()` implementation must return `null` -- the
> result of #apply() has changelog/KTable semantics and `null` is
> interpreted as delete for this case.
> 
> If you want to use `null` from your KStream to trigger reduce() to
> delete, you will need to use a surrogate value for this, i.e., do a
> mapValues() before the groupByKey() call, and replace `null` values with
> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> to return `null` for this case.
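> 
> A minimal sketch of that surrogate approach (the marker value, topic name
> and String types are made up; serde configuration and KafkaStreams startup
> are omitted):
> 
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KTable;
> 
> public class SurrogateDeleteSketch {
>     // surrogate value that stands in for null inside the KStream
>     private static final String DELETE_MARKER = "<delete>";
> 
>     public static void main(final String[] args) {
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, String> stream = builder.stream("input-topic");
> 
>         final KTable<String, String> table = stream
>             // replace null values with the surrogate so they are not dropped
>             .mapValues(value -> value == null ? DELETE_MARKER : value)
>             .groupByKey()
>             // returning null from Reducer#apply() is a delete for the result KTable
>             .reduce((oldValue, newValue) ->
>                 DELETE_MARKER.equals(newValue) ? null : newValue);
>     }
> }
> 
> With this, a null value on the input stream ends up as a delete (tombstone)
> for the corresponding key in the result KTable and its changelog.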
> 
> Hope this helps.
> 
> -Matthias
> 
>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
>> Hello
>> 
>> Recently we noticed a lot of warning messages in the logs which pointed to
>> this method (we are running 2.0):
>> 
>> KStreamReduce
>> public void process(final K key, final V value) {
>>     // If the key or value is null we don't need to proceed
>>     if (key == null || value == null) {
>>         LOG.warn(
>>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>>             key, value, context().topic(), context().partition(), context().offset()
>>         );
>>         metrics.skippedRecordsSensor().record();
>>         return;
>>     }
>> 
>> This was triggered for every record from a stream with an existing key but
>> a null value, which we put through groupBy/reduce to get a KTable.
>> My assumption was that this was the correct way to get a KTable inside a
>> streams application, but it prevents deletion of records from working.
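>> 
>> Roughly the pattern in question, as a sketch (topic name and String types
>> are just for illustration):
>> 
>> import org.apache.kafka.streams.StreamsBuilder;
>> import org.apache.kafka.streams.kstream.KStream;
>> import org.apache.kafka.streams.kstream.KTable;
>> 
>> public class GroupByReduceToTable {
>>     public static void main(final String[] args) {
>>         final StreamsBuilder builder = new StreamsBuilder();
>>         final KStream<String, String> stream = builder.stream("input-topic");
>> 
>>         // records with a null value are skipped before the Reducer is called,
>>         // so they can never delete anything from the resulting KTable
>>         final KTable<String, String> table = stream
>>             .groupByKey()
>>             .reduce((oldValue, newValue) -> newValue);
>>     }
>> }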
>> 
>> Our alternative is to send the stream back to a named topic and build a new
>> table from it, but this is rather cumbersome and requires a separate topic,
>> which also can't be cleaned up by the streams reset tool.
>> 
>> Did I miss anything relevant here?
>> Would it be possible to create a separate method for KStream to achieve
>> this directly?
>> 
>> best regards
>> 
>> Patrik
>> 
> 
