Hello Matthias,

thank you for the explanation.
Streaming back to a topic and consuming it as a KTable does respect the null values as deletes, correct? That comes at the price of some overhead, though.
Is there any (historical, technical or emotional ;-)) reason that no simple one-step stream-to-table operation exists?

Best regards
Patrik
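To illustrate the round trip described here (in the DSL, roughly `KStream#to(String)` followed by `StreamsBuilder#table(String)`), the following is a minimal plain-Java model of the changelog semantics a KTable applies when it reads a topic. It is a sketch, not Kafka Streams code: the class and method names (`ChangelogTableModel`, `materialize`, `rec`) are hypothetical, and a `LinkedHashMap` stands in for the state store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of how a KTable built from a topic
// treats records: the latest value per key wins and a null value (tombstone)
// deletes the key. All names here are hypothetical.
class ChangelogTableModel {

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value); // SimpleEntry allows a null value
    }

    static Map<String, String> materialize(List<Map.Entry<String, String>> topic) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : topic) {
            if (r.getKey() == null) {
                continue; // a record without a key has no table row to update
            }
            if (r.getValue() == null) {
                table.remove(r.getKey()); // tombstone: delete the row
            } else {
                table.put(r.getKey(), r.getValue()); // upsert the row
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic =
            List.of(rec("a", "1"), rec("b", "2"), rec("a", null));
        System.out.println(materialize(topic)); // prints {b=2}
    }
}
```

The null value for key `a` removes the row, which is exactly the behavior that plain `KStream#reduce` does not give you.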
> On 26.10.2018 at 00:07, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Patrik,
>
> `null` values in a KStream don't have delete semantics (it's not a
> changelog stream). That's why we drop them in the KStream#reduce
> implementation.
>
> If you want to explicitly remove results for a key from the result
> KTable, your `Reducer#apply()` implementation must return `null` -- the
> result of #apply() has changelog/KTable semantics and `null` is
> interpreted as delete for this case.
>
> If you want to use `null` from your KStream to trigger reduce() to
> delete, you will need to use a surrogate value for this, i.e., do a
> mapValues() before the groupByKey() call, and replace `null` values with
> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> to return `null` for this case.
>
> Hope this helps.
>
> -Matthias
>
>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
>> Hello
>>
>> Recently we noticed a lot of warning messages in the logs which pointed to
>> this method (we are running 2.0):
>>
>> KStreamReduce
>> public void process(final K key, final V value) {
>>     // If the key or value is null we don't need to proceed
>>     if (key == null || value == null) {
>>         LOG.warn(
>>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>>             key, value, context().topic(), context().partition(), context().offset()
>>         );
>>         metrics.skippedRecordsSensor().record();
>>         return;
>>     }
>>
>> This was triggered for every record from a stream with an existing key but
>> a null value which we put through groupBy/reduce to get a KTable.
>> My assumption was that this was the correct way inside a streams
>> application to get a KTable, but this prevents deletion of records from
>> working.
>>
>> Our alternative is to send the stream back to a named topic and build a new
>> table from it, but this is rather cumbersome and requires a separate topic
>> which also can't be cleaned up by the streams reset tool.
>>
>> Did I miss anything relevant here?
>> Would it be possible to create a separate method for KStream to achieve
>> this directly?
>>
>> best regards
>>
>> Patrik
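The surrogate-delete-marker approach Matthias describes (mapValues() before groupByKey(), then a Reducer that returns null for the marker) can be sketched as a plain-Java model. This is not the Kafka Streams API: `mapValues` stands in for `KStream#mapValues()`, `reduce` for `groupByKey().reduce()`, and the marker string `__DELETE__` as well as all class and helper names are hypothetical. The reducer assumes "latest value wins" logic, as one would use to turn a stream into a table.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (no Kafka dependency) of the surrogate-delete-marker idea:
// mapValues() -> groupByKey() -> reduce(). All names are hypothetical.
class SurrogateDeleteModel {

    // Hypothetical marker; it must never occur as a real value.
    static final String DELETE_MARKER = "__DELETE__";

    // Assumed business logic: latest value wins; the marker yields null = delete.
    static final BinaryOperator<String> REDUCER =
        (aggregate, value) -> DELETE_MARKER.equals(value) ? null : value;

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Stand-in for KStream#mapValues(): swap null values for the marker.
    static List<Map.Entry<String, String>> mapValues(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : records) {
            out.add(rec(r.getKey(), r.getValue() == null ? DELETE_MARKER : r.getValue()));
        }
        return out;
    }

    // Simplified stand-in for groupByKey().reduce(): records with a null key or
    // value are skipped, the first value for a key is stored without calling the
    // reducer, and a null reducer result removes the key.
    static Map<String, String> reduce(List<Map.Entry<String, String>> records,
                                      BinaryOperator<String> reducer) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getKey() == null || r.getValue() == null) {
                continue; // corresponds to the "Skipping record" warning
            }
            String old = table.get(r.getKey());
            String updated = (old == null) ? r.getValue() : reducer.apply(old, r.getValue());
            if (updated == null) {
                table.remove(r.getKey());
            } else {
                table.put(r.getKey(), updated);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
            List.of(rec("a", "1"), rec("b", "2"), rec("a", null));
        System.out.println(reduce(input, REDUCER));            // prints {a=1, b=2}
        System.out.println(reduce(mapValues(input), REDUCER)); // prints {b=2}
    }
}
```

In this model the reducer only runs when the key already has a value, so a marker that arrives first for a key is stored as-is. If your version of `KStreamReduce` behaves the same way (worth checking), a downstream `filter()` that drops the marker would be needed as well.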