Hi John,

Thanks for your explanation.

I now have an answer to the practical question: a null aggregator
value should be treated as a fatal application error.

On the other hand, looking at the app topology, I see that a message
from KSTREAM-REDUCE-0000000002 / "table" goes to
KTABLE-SELECT-0000000006, which in turn forwards data to
KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition). At this
point, I assume, the data goes back to Kafka into a *-repartition
topic. The message is then read from Kafka by
KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]) and
finally reaches Processor: KTABLE-REDUCE-0000000009 (stores:
[aggregated-table]), where the actual aggregation takes place. What I
don't get is where this Change value comes from. Even if it is
produced by KSTREAM-REDUCE-0000000002, that shouldn't matter, because
the message passes through Kafka and gets serialized on the way, and
the "repartition" topic contains regular values, not old/new pairs.

As far as I understand, Change is a purely in-memory representation of
the state for a particular key and is never serialized back to Kafka,
yet somehow these Change values make it to the reducer. I feel like I
am missing something here. Could you please clarify this?
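
For reference, my rough mental model of that value is something like
the following simplified sketch (the field names are my assumption,
not necessarily the real internal class):

    // Simplified sketch of how I picture Change: an in-memory pair of
    // the previous and the current value for a key. Not the actual
    // Kafka Streams source, just my mental model.
    public class Change<T> {
        public final T newValue;
        public final T oldValue;

        public Change(final T newValue, final T oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

If that's roughly right, I don't see how both sides of the pair
survive the trip through the repartition topic.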

Could you also point me to the place in the kafka-streams sources
where a Change of newValue/oldValue is produced, so I can take a look?
I found the KTableReduce implementation, but I can't find what
produces these Change values.
On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> wrote:
>
> Hi again Vasily,
>
> Ok, it looks to me like this behavior is the result of the un-clean
> topology change.
>
> Just in case you're interested, here's what I think happened.
>
> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
> internally emits pairs of "oldValue"/"newValue". (side-note: It's by
> forwarding both the old and new value that we are able to maintain
> aggregates using the subtractor/adder pairs)
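>
> Very roughly, the upstream reduce does something like this on every
> update (just a sketch with approximate names, not the real
> implementation):
>
> // sketch: maintain the per-key reduction and forward old + new downstream
> final V oldAgg = store.get(key);
> final V newAgg = (oldAgg == null)
>     ? value                          // first value seen for this key
>     : reducer.apply(oldAgg, value);  // fold the new record in
> store.put(key, newAgg);
> // forward BOTH the previous and the current result as a pair
> context.forward(key, new Change<>(newAgg, oldAgg));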
>
> 2. In the full topology, these old/new pairs go through some
> transformations, but still in some form eventually make their way down to
> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>
> 3. The reduce processor logic looks like this:
> final V oldAgg = store.get(key);
> V newAgg = oldAgg;
>
> // first try to add the new value
> if (value.newValue != null) {
>     if (newAgg == null) {
>         newAgg = value.newValue;
>     } else {
>         newAgg = addReducer.apply(newAgg, value.newValue);
>     }
> }
>
> // then try to remove the old value
> if (value.oldValue != null) {
>     // Here's where the assumption breaks down...
>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> }
>
> 4. Here's what I think happened. This processor saw an event like
> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block, and
> (since "oldValue != null") would go into the second block. I think that in
> the normal case we can rely on the invariant that any value we get as an
> "oldValue" is one that we've previously seen ( as "newValue" ).
> Consequently, we should be able to assume that if we get a non-null
> "oldValue", "newAgg" will also not be null (because we would have written
> it to the store back when we saw it as "newValue" and then retrieved it
> just now as "newAgg = oldAgg").
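>
> In the normal case, the sequence for a single upstream key looks
> roughly like this:
>
> 1st update: Change{new=v1, old=null} -> agg becomes add(agg, v1)
>             (or just v1 if the store was empty)
> 2nd update: Change{new=v2, old=v1}   -> agg becomes remove(add(agg, v2), v1)
>
> i.e. an "oldValue" of v1 can only show up after v1 was already folded
> into the aggregate, so the aggregate in the store can't be null at
> that point.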
>
> However, if subtopology2, along with KTABLE-SELECT-0000000006
> and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 /
> "table") has already emitted some values, then we might in fact receive an
> event with some "oldValue" that we have in fact never seen before (because (
> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
> was first emitted as a "newValue").
>
> This would violate our assumption, and we would unintentionally send a
> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> If you want to double-check my reasoning, you should be able to do so in
> the debugger with a breakpoint in KTableReduce.
>
>
> tl;dr: Supposing you reset the app when the topology changes, I think that
> you should be able to rely on non-null aggregates being passed in to *both*
> the adder and subtractor in a reduce.
>
> I would be in favor, as you suggested, of adding an explicit check and
> throwing an exception if the aggregate is ever null at those points. This
> would actually help us detect if the topology has changed unexpectedly and
> shut down, hopefully before any damage is done. I'll send a PR and see what
> everyone thinks.
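>
> Roughly, I'm thinking of something along these lines (just a sketch
> of the idea, not the actual patch):
>
> // sketch: fail fast instead of passing a null aggregate to user code
> if (value.oldValue != null) {
>     if (newAgg == null) {
>         throw new StreamsException(
>             "Found unexpected null aggregate for key " + key + "; " +
>             "this usually means the topology changed without a reset.");
>     }
>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> }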
>
> Does this all seem like it adds up to you?
> -John
>
>
> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net>
> wrote:
>
> > Hi John,
> >
> > Thanks for your reply. I am not sure if this behavior I've observed is
> > a bug or not, as I've not been resetting my application properly. On
> > the other hand if the subtractor or adder in the reduce operation are
> > never supposed to be called with null aggregator value, perhaps it
> > would make sense to put a null check in the table reduce
> > implementation to detect an application entering an invalid state. A
> > bit like a check for topics having the same number of partitions when
> > doing a join.
> >
> > Here's some information about my tests. Hope that can be useful:
> >
> > Topology at start:
> >
> > 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >    Sub-topology: 0
> >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >       --> KSTREAM-MAP-0000000001
> >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >       --> KSTREAM-FILTER-0000000004
> >       <-- KSTREAM-SOURCE-0000000000
> >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- KSTREAM-MAP-0000000001
> >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >       <-- KSTREAM-FILTER-0000000004
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >       --> KSTREAM-REDUCE-0000000002
> >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >       --> KTABLE-TOSTREAM-0000000006
> >       <-- KSTREAM-SOURCE-0000000005
> >     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >       --> KSTREAM-SINK-0000000007
> >       <-- KSTREAM-REDUCE-0000000002
> >     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >       <-- KTABLE-TOSTREAM-0000000006
> >
> > This topology just takes data from the source topic "slope" which
> > produces messages like this:
> >
> > key1
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key3
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key2
> > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> > key3
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > key1
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> > key2
> > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >
> > Every second, 3 new messages arrive from the "slope" topic for keys
> > key1, key2 and key3, with a constantly increasing value.
> > The data is transformed so that the original key is also tracked in
> > the message value, grouped by key, windowed with a custom window, and
> > reduced with a dummy reduce operation to make a KTable.
> > The KTable is converted back to a stream and sent to a topic (just
> > for debugging purposes).
> >
> > Here's the source (it's kafka-streams-scala for the most part). Also,
> > please ignore the silly classes; it's obviously just a test:
> >
> >     val slopeTable = builder
> >       .stream[String, TimedValue]("slope")
> >       .map(
> >         (key, value) =>
> >           (
> >             StringWrapper(key),
> >             TimedValueWithKey(value = value.value,
> >                               timestamp = value.timestamp, key = key)
> >         )
> >       )
> >       .groupByKey
> >       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >       .reduceMat((aggValue, newValue) => newValue, "table")
> >
> >     slopeTable.toStream
> >       .to("slope-table")
> >
> > Topology after change without a proper reset:
> >
> > 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >    Sub-topology: 0
> >     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >       --> KSTREAM-MAP-0000000001
> >     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >       --> KSTREAM-FILTER-0000000004
> >       <-- KSTREAM-SOURCE-0000000000
> >     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- KSTREAM-MAP-0000000001
> >     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >       <-- KSTREAM-FILTER-0000000004
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >       --> KSTREAM-REDUCE-0000000002
> >     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >       <-- KSTREAM-SOURCE-0000000005
> >     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >       --> KSTREAM-SINK-0000000007
> >       <-- KSTREAM-REDUCE-0000000002
> >     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >       --> KSTREAM-SINK-0000000013
> >       <-- KSTREAM-REDUCE-0000000002
> >     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >       <-- KTABLE-SELECT-0000000006
> >     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >       <-- KTABLE-TOSTREAM-0000000012
> >
> >   Sub-topology: 2
> >     Source: KSTREAM-SOURCE-0000000008 (topics:
> > [aggregated-table-repartition])
> >       --> KTABLE-REDUCE-0000000009
> >     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >       --> KTABLE-TOSTREAM-0000000010
> >       <-- KSTREAM-SOURCE-0000000008
> >     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >       --> KSTREAM-SINK-0000000011
> >       <-- KTABLE-REDUCE-0000000009
> >     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >       <-- KTABLE-TOSTREAM-0000000010
> >
> > Here's the source of the sub-topology that does table aggregation:
> >
> >     slopeTable
> >       .groupBy(
> >         (key, value) =>
> >           (new Windowed(StringWrapper("dummykey"), key.window()), value)
> >       )
> >       .reduceMat(adder = (aggValue, newValue) => {
> >         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >         val agg = Option(aggValue)
> >         TimedValueWithKey(
> >           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >           timestamp = Utils.latestTimestamp(
> >             agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >             newValue.timestamp),
> >           key = "reduced"
> >         )
> >       }, subtractor = (aggValue, newValue) => {
> >         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >         val agg = Option(aggValue)
> >         TimedValueWithKey(
> >           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >           timestamp = Utils.latestTimestamp(
> >             agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >             newValue.timestamp),
> >           key = "reduced"
> >         )
> >       }, "aggregated-table")
> >       .toStream
> >       .to("slope-aggregated-table")
> >
> > I log all calls to the adder and subtractor, so I can see what's
> > going on there, and I also track the original keys and timestamps of
> > the aggregated values, so it's relatively easy to see how the data
> > flows through this topology.
> >
> > In order to reproduce this behavior I need to:
> > 1. Start the full topology (with table aggregation)
> > 2. Restart with table aggregation removed (no app reset)
> > 3. Restart with table aggregation added back (no app reset)
> >
> > Below is an interpretation of the adder/subtractor logs for a given
> > key/window, in chronological order:
> >
> > SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> > ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> > SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> > ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> > SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> > ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >
> > In the end, the last value materialized for that window (i.e.
> > windowed key) in the Kafka topic is 57, i.e. the increase in value
> > for a single key between some point in the middle of the window and
> > the end of the window, times 3. The expected value is 751 * 3 = 2253
> > (the sum of the last values in the time window across all keys being
> > aggregated).
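> >
> > Walking through the log above, with the null aggregate treated as 0,
> > the running aggregate goes:
> >
> >    0 - 732 (key2 old) = -732
> > -732 + 751 (key2 new) =   19
> >   19 - 732 (key1 old) = -713
> > -713 + 751 (key1 new) =   38
> >   38 - 732 (key3 old) = -694
> > -694 + 751 (key3 new) =   57
> >
> > so the final 57 is just 3 * (751 - 732): only the per-key increase
> > within the window survives.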
> >
> > It's clear to me that I should do an application reset, but I would
> > also like to understand: should I ever expect the adder/subtractor to
> > be called with a null aggValue, or is it a clear sign that something
> > has gone horribly wrong?
> >
> > On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> wrote:
> > >
> > > Hi Vasily,
> > >
> > > Thanks for the email.
> > >
> > > To answer your question: you should reset the application basically any
> > > time you change the topology. Some transitions are safe, but others will
> > > result in data loss or corruption. Rather than try to reason about
> > > which is which, it's much safer just to either reset the app or not
> > > change it (if it has important state).
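> > >
> > > For what it's worth, alongside the reset tool, the usual companion
> > > step is to wipe the local state before restarting the instance,
> > > roughly like this (just a sketch):
> > >
> > > // sketch: clear local state stores before (re)starting the app
> > > final KafkaStreams streams = new KafkaStreams(topology, props);
> > > streams.cleanUp(); // only valid while the instance is not running
> > > streams.start();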
> > >
> > > Beyond changes that you make to the topology, we spend a lot of effort to
> > > try and make sure that different versions of Streams will produce
> > > the same topology, so unless the release notes say otherwise, you
> > > should be able to upgrade without a reset.
> > >
> > >
> > > I can't say right now whether those wacky behaviors are bugs or the
> > > result of changing the topology without a reset. Or if they are
> > > correct but surprising behavior somehow. I'll look into it tomorrow.
> > > Do feel free to open a Jira ticket if you think you have found a
> > > bug, especially if you can describe a repro. Knowing your topology
> > > before and after the change would also be immensely helpful. You can
> > > print it with Topology.describe().
> > >
> > > Regardless, I'll make a note to take a look at the code tomorrow
> > > and try to decide if you should expect these behaviors with "clean"
> > > topology changes.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vas...@sulatskov.net>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am doing some experiments with kafka-streams KGroupedTable
> > > > aggregation, and admittedly I am not wiping data properly on each
> > > > restart, partially because I also wonder what would happen if you
> > > > change a streams topology without doing a proper reset.
> > > >
> > > > I've noticed that from time to time, kafka-streams
> > > > KGroupedTable.reduce() can call the subtractor function with a null
> > > > aggregator value, and if you try to work around that by interpreting
> > > > the null aggregator value as zero (for a numeric value), you get an
> > > > incorrect aggregation result.
> > > >
> > > > I do understand that the proper way of handling this is to do a reset
> > > > on topology changes, but I'd like to understand whether there's any
> > > > legitimate case in which kafka-streams can call an adder or a
> > > > subtractor with a null aggregator value. Should I plan for this, or
> > > > should I interpret it as an invalid state, terminate the application,
> > > > and do a proper reset?
> > > >
> > > > Also, I can't seem to find a guide which explains when application
> > > > reset is necessary. Intuitively it seems that it should be done every
> > > > time a topology changes. Any other cases?
> > > >
> > > > I tried to debug where the null value comes from, and it seems that
> > > > KTableReduce.process() is getting called with a Change<V> value with
> > > > newValue == null and some non-null oldValue, which leads to the
> > > > subtractor being called with a null aggregator value. I wonder how it
> > > > is possible to have an old value for a key without a new value (does
> > > > it happen because of the auto commit interval?).
> > > >
> > > > I've also noticed that it's possible for an input value from a topic
> > > > to bypass the aggregation function entirely and be transmitted
> > > > directly to the output in one case: when oldAgg is null, newValue is
> > > > not null and oldValue is null, newValue is forwarded directly to the
> > > > output. I suppose that's the correct behaviour, but it feels a bit
> > > > weird nonetheless, and I've actually observed it in practice. I
> > > > suppose it's also caused by this happening right before a commit,
> > > > when the message is sent to a changelog topic.
> > > >
> > > > Please can someone with more knowledge shed some light on these issues?
> > > >
> > > > --
> > > > Best regards,
> > > > Vasily Sulatskov
> > > >
> >
> >
> >
> > --
> > Best regards,
> > Vasily Sulatskov
> >



-- 
Best regards,
Vasily Sulatskov
