Hi again, Patrik,

You'll probably be interested in this recent Jira: https://issues.apache.org/jira/browse/KAFKA-7658

You have a good point about the overhead of going through an intermediate topic... I can see how explicit topic management is an operational burden, and you're also right that the changelog topic only gets read on state restoration. That was an oversight on my part. I think that with KAFKA-7658 and https://github.com/apache/kafka/pull/5779, you'll have two good options in the future.

To solve your problem *right now*, you can circumvent the null filtering by wrapping the values of your stream. For example, immediately before the reduce, you could mapValues and wrap the values with Optional. Then, your reduce function can unwrap the Optional and return null if it's empty. Does that make sense?

This comes with an important caveat, though, which is part of the motivation for this roadblock to begin with: if your incoming data gets repartitioned in your topology, then the order of records for the key is not deterministic. This would break the semantics of your reduce-to-latest function and, indeed, of any non-commutative reduce function.

For example, if you have a topic like:

  dummykey1: {realkey: A, value: 4}
  dummykey2: {realkey: A, value: 5}

and you do a groupBy( select realkey ) and then a reduce( keep latest value ), then, if dummykey1 and dummykey2 are in different partitions, the result would be either A:4 or A:5, depending on which input partition was processed first.

We have discussed solutions to this issue several times, but it's quite complex in the details. Nevertheless, if you're careful and ensure that you don't have multiple threads producing the same key into the input topic, and also that you don't have a repartition in the middle, then this should work for you.

Hope this helps!
-john

On Sun, Nov 18, 2018 at 7:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Patrik,
>
> Thanks for explaining your use case to us.
> While we can still discuss how KStream should interpret null values in aggregations, one workaround atm: if your deduplication logic can be written as a transformValues operation, you can do the following:
>
>     builder.table("source-topic").transformValues(..., Materialized.as("store-name"))
>
> Note that in a recent PR that we are merging, the source KTable from builder.table() would not be materialized if users do not specify a materialized store name; only the value-transformed KTable will be materialized:
>
> https://github.com/apache/kafka/pull/5779
>
> Would that work for you?
>
> Guozhang
>
> On Mon, Oct 29, 2018 at 2:08 AM Patrik Kleindl <pklei...@gmail.com> wrote:
>
> > Hi John and Matthias,
> >
> > thanks for the questions; maybe explaining our use case helps a bit:
> > We are receiving CDC records (row-level insert/update/delete) in one topic per table. The key is derived from the DB records, and the value is null in case of deletes. Those would be the immutable facts, I guess.
> > These topics are first streamed through a deduplication Transformer to drop changes on irrelevant fields.
> > The results are translated to KTables and joined to each other to represent the same result as the SQL queries on the database, but faster. At this stage the delete/null records matter, because if a record gets deleted then we want it to drop out of the join too. -> Our reduce approach produced unexpected results here.
> > We kept the deduplication step separate because in some cases we only need the KStream for processing.
> > If you see a simpler/cleaner approach here, I'm open to suggestions, of course.
> >
> > Regarding the overhead:
> > 1) Named topics create management/maintenance overhead because they have to be created/treated separately (auto-create is not an option) and be considered in future changes, topology changes/resets and so on. The internal topic removes most of those issues.
> > 2) One of our developers asked whether the traffic to/from the broker was actually the same in both scenarios. We expect that the same data is written to the broker in both the named-topic case and the reduce case, but if the KTable is maintained inside a Streams topology, does it have to read back everything it sends to the broker, or can it keep the table internally? I hope it is understandable what I mean; otherwise I can try to explain it more clearly.
> >
> > best regards
> >
> > Patrik
> >
> >
> > On Sat, 27 Oct 2018 at 23:50, John Roesler <j...@confluent.io> wrote:
> >
> > > Hi again Patrik,
> > >
> > > Actually, this is a good question... Can you share some context about why you need to convert a stream to a table (including nulls as retractions)?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > I don't know your overall application setup. However, a KStream semantically models immutable facts, and there are no update semantics. Thus, it seems semantically questionable to allow changing the semantics from facts to updates (the other way is easier IMHO, and thus supported via KTable#toStream()).
> > > >
> > > > Does this make sense?
> > > >
> > > > Having said this: you _can_ write a KStream into a topic and read it back as a KTable. But it's semantically questionable to do so, IMHO. Maybe it makes sense for your specific application, but in general I don't think it does.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 10/26/18 9:30 AM, John Roesler wrote:
> > > > > Hi Patrik,
> > > > >
> > > > > Just to drop one observation in... Streaming to a topic and then consuming it as a table does create overhead, but so does reducing a stream to a table, and I think it's actually the same in either case.
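The semantic difference this thread keeps returning to (a topic read back as a KTable treats a null value as a delete, while KStream#reduce skips null values) can be sketched as a small plain-Java model. This is not the Kafka Streams API: the class `StreamVsTableModel`, its methods, and the keep-latest reducer are hypothetical stand-ins chosen for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (not the Kafka Streams API) contrasting two ways
// of turning the same records into a table.
class StreamVsTableModel {

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Topic read as a KTable: each record is an upsert, a null value is a tombstone.
    static Map<String, String> readAsTable(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                table.remove(r.getKey());            // tombstone: delete the key
            } else {
                table.put(r.getKey(), r.getValue()); // upsert
            }
        }
        return table;
    }

    // KStream reduce with an assumed keep-latest reducer: null values are skipped.
    static Map<String, String> reduceToLatest(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                continue;                            // record skipped, key survives
            }
            table.put(r.getKey(), r.getValue());     // keep the latest value
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records =
            List.of(rec("k1", "v1"), rec("k2", "v2"), rec("k1", null));
        System.out.println(readAsTable(records));    // {k2=v2}
        System.out.println(reduceToLatest(records)); // {k1=v1, k2=v2}
    }
}
```

Both paths see the same three records; only the table read treats the trailing `k1` tombstone as a delete, which is the behavior Patrik was missing from the reduce.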
> > > > >
> > > > > They both require a store to collect the table state, and in both cases, the stores need to have a changelog topic. For the "reduce" version, it's an internal changelog topic, and for the "topic-to-table" version, the store can use the intermediate topic as its changelog.
> > > > >
> > > > > This doesn't address your ergonomic concern, but it seemed worth pointing out that, as far as I can tell, there is no difference in overhead.
> > > > >
> > > > > Hope this helps!
> > > > > -John
> > > > >
> > > > > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > > > >
> > > > >> Hello Matthias,
> > > > >> thank you for the explanation.
> > > > >> Streaming back to a topic and consuming this as a KTable does respect the null values as deletes, correct? But at the price of some overhead.
> > > > >> Is there any (historical, technical or emotional ;-)) reason that no simple one-step stream-to-table operation exists?
> > > > >> Best regards
> > > > >> Patrik
> > > > >>
> > > > >>> On 26.10.2018 at 00:07, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>
> > > > >>> Patrik,
> > > > >>>
> > > > >>> `null` values in a KStream don't have delete semantics (it's not a changelog stream). That's why we drop them in the KStream#reduce implementation.
> > > > >>>
> > > > >>> If you want to explicitly remove results for a key from the result KTable, your `Reducer#apply()` implementation must return `null` -- the result of #apply() has changelog/KTable semantics and `null` is interpreted as delete for this case.
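The two rules Matthias describes (null input values never reach the reducer, while a null return value from the reducer deletes the key) can be sketched as a plain-Java model. The class and method names are hypothetical, and the "delete the key when a running total reaches zero" reducer is an illustrative assumption, not something from the thread. The model also assumes, as the 2.0 `KStreamReduce` appears to do, that the first value for a key is stored directly without invoking the reducer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of reduce semantics; not Kafka Streams code.
class ReduceSemanticsModel {

    // One reduce step against an in-memory "store".
    static <K, V> void process(Map<K, V> store, BinaryOperator<V> reducer, K key, V value) {
        if (key == null || value == null) {
            return; // null key or value: the record is skipped, the store is unchanged
        }
        V oldAgg = store.get(key);
        // The first value for a key is stored as-is; later values go through the reducer.
        V newAgg = (oldAgg == null) ? value : reducer.apply(oldAgg, value);
        if (newAgg == null) {
            store.remove(key); // a null reducer result has delete semantics
        } else {
            store.put(key, newAgg);
        }
    }

    public static void main(String[] args) {
        // Illustrative reducer: values are deltas; delete the key when the total hits zero.
        BinaryOperator<Integer> sumOrDelete = (agg, delta) -> {
            int total = agg + delta;
            return total == 0 ? null : Integer.valueOf(total);
        };
        Map<String, Integer> store = new LinkedHashMap<>();
        process(store, sumOrDelete, "item", 3);
        process(store, sumOrDelete, "item", null); // skipped, so the store stays {item=3}
        System.out.println(store);                 // {item=3}
        process(store, sumOrDelete, "item", -3);   // total 0 -> reducer returns null -> delete
        System.out.println(store);                 // {}
    }
}
```

The point of the example is that deletion is decided by the reducer's return value, not by a null arriving on the stream.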
> > > > >>>
> > > > >>> If you want to use `null` from your KStream to trigger reduce() to delete, you will need to use a surrogate value for this, i.e., do a mapValues() before the groupByKey() call, and replace `null` values with the surrogate delete marker that you can evaluate in `Reducer#apply()` to return `null` for this case.
> > > > >>>
> > > > >>> Hope this helps.
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> > > > >>>> Hello
> > > > >>>>
> > > > >>>> Recently we noticed a lot of warning messages in the logs which pointed to this method (we are running 2.0):
> > > > >>>>
> > > > >>>> KStreamReduce
> > > > >>>>     public void process(final K key, final V value) {
> > > > >>>>         // If the key or value is null we don't need to proceed
> > > > >>>>         if (key == null || value == null) {
> > > > >>>>             LOG.warn(
> > > > >>>>                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> > > > >>>>                 key, value, context().topic(), context().partition(), context().offset()
> > > > >>>>             );
> > > > >>>>             metrics.skippedRecordsSensor().record();
> > > > >>>>             return;
> > > > >>>>         }
> > > > >>>>
> > > > >>>> This was triggered for every record from a stream with an existing key but a null value, which we put through groupBy/reduce to get a KTable.
> > > > >>>> My assumption was that this was the correct way inside a Streams application to get a KTable, but this prevents deletion of records from working.
> > > > >>>>
> > > > >>>> Our alternative is to send the stream back to a named topic and build a new table from it, but this is rather cumbersome and requires a separate topic, which also can't be cleaned up by the streams reset tool.
> > > > >>>>
> > > > >>>> Did I miss anything relevant here?
> > > > >>>> Would it be possible to create a separate method for KStream to achieve this directly?
> > > > >>>>
> > > > >>>> best regards
> > > > >>>>
> > > > >>>> Patrik
>
> --
> -- Guozhang
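John's workaround at the top of the thread (wrap each value in `Optional` right before the reduce, let the reducer turn an empty `Optional` into `null`, then unwrap) can be modeled in plain Java together with his ordering caveat. This is a hypothetical model, not Kafka Streams code: `reduceStep` stands in for the reduce, and `unwrap` stands in for a final unwrapping mapValues on the result table. One detail the model makes visible: a delete for a key with no current value is stored as `Optional.empty()`, because the first value for a key bypasses the reducer, so the unwrap step has to drop empty values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of the Optional-wrapping workaround; not Kafka Streams code.
class OptionalWrapModel {

    // Reduce step: null inputs are skipped, the first value for a key is stored
    // as-is, and a null reducer result deletes the key.
    static <K, V> void reduceStep(Map<K, V> store, BinaryOperator<V> reducer, K key, V value) {
        if (key == null || value == null) {
            return;
        }
        V oldAgg = store.get(key);
        V newAgg = (oldAgg == null) ? value : reducer.apply(oldAgg, value);
        if (newAgg == null) {
            store.remove(key);
        } else {
            store.put(key, newAgg);
        }
    }

    // Final unwrap: empty Optionals are treated as absent keys.
    static <K, V> Map<K, V> unwrap(Map<K, Optional<V>> store) {
        Map<K, V> out = new LinkedHashMap<>();
        store.forEach((k, v) -> v.ifPresent(x -> out.put(k, x)));
        return out;
    }

    public static void main(String[] args) {
        // Keep-latest reducer; an empty Optional means "delete".
        BinaryOperator<Optional<String>> latestOrDelete = (agg, v) -> v.isPresent() ? v : null;

        // Input stream; a null value is a delete, as in the CDC topics.
        String[][] records = { {"k1", "a"}, {"k2", "b"}, {"k1", null} };
        Map<String, Optional<String>> store = new LinkedHashMap<>();
        for (String[] r : records) {
            // The wrapping mapValues step: null becomes Optional.empty(), so it is not skipped.
            reduceStep(store, latestOrDelete, r[0], Optional.ofNullable(r[1]));
        }
        System.out.println(unwrap(store)); // {k2=b}

        // Ordering caveat: keep-latest is not commutative, so the arrival order of
        // the two records for "A" (e.g. after a repartition) decides the result.
        Map<String, Optional<String>> order1 = new LinkedHashMap<>();
        Map<String, Optional<String>> order2 = new LinkedHashMap<>();
        reduceStep(order1, latestOrDelete, "A", Optional.of("4"));
        reduceStep(order1, latestOrDelete, "A", Optional.of("5"));
        reduceStep(order2, latestOrDelete, "A", Optional.of("5"));
        reduceStep(order2, latestOrDelete, "A", Optional.of("4"));
        System.out.println(unwrap(order1) + " vs " + unwrap(order2)); // {A=5} vs {A=4}
    }
}
```

In an actual topology the wrapped values would also be written to the store's changelog, so they would presumably need a Serde that can handle `Optional`; the thread does not cover that part.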