Hi again Vasily,

OK, it looks to me like this behavior is the result of the unclean
topology change.

Just in case you're interested, here's what I think happened.

1. Your reduce node in subtopology 1 (KSTREAM-REDUCE-0000000002 / "table")
internally emits pairs of "oldValue"/"newValue". (Side note: it's by
forwarding both the old and new value that we are able to maintain
aggregates using subtractor/adder pairs.)
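As a toy illustration (these are made-up classes, not the actual Streams internals), here's how forwarding both values lets a downstream node keep a sum correct under updates:

```java
// Toy model of old/new change pairs; not the real Kafka Streams classes.
public class ChangePairDemo {
    // A change event carries both the retracted (old) and the new value.
    record Change(Integer oldValue, Integer newValue) {}

    // Maintain a running sum by retracting the old value (subtractor)
    // and applying the new one (adder).
    static int applyChange(int aggregate, Change change) {
        if (change.oldValue() != null) {
            aggregate -= change.oldValue(); // subtractor
        }
        if (change.newValue() != null) {
            aggregate += change.newValue(); // adder
        }
        return aggregate;
    }

    public static void main(String[] args) {
        int agg = 0;
        agg = applyChange(agg, new Change(null, 187)); // key first seen
        agg = applyChange(agg, new Change(187, 188));  // update: retract 187, add 188
        System.out.println(agg); // 188: the sum reflects only the latest value
    }
}
```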

2. In the full topology, these old/new pairs go through some
transformations, but still in some form eventually make their way down to
the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").

3. The reduce processor logic looks like this:
final V oldAgg = store.get(key);
V newAgg = oldAgg;

// first try to add the new value
if (value.newValue != null) {
    if (newAgg == null) {
        newAgg = value.newValue;
    } else {
        newAgg = addReducer.apply(newAgg, value.newValue);
    }
}

// then try to remove the old value
if (value.oldValue != null) {
    // Here's where the assumption breaks down...
    newAgg = removeReducer.apply(newAgg, value.oldValue);
}

4. Here's what I think happened. This processor saw an event like
{new=null, old=(key2, 732, 10:50:40)}. This would skip the first block and
(since "oldValue != null") fall into the second block. I think that in
the normal case we can rely on the invariant that any value we get as an
"oldValue" is one that we've previously seen (as "newValue").
Consequently, we should be able to assume that if we get a non-null
"oldValue", "newAgg" will also not be null (because we would have written
it to the store back when we saw it as "newValue" and then retrieved it
just now as "newAgg = oldAgg").

However, if subtopology 2, along with KTABLE-SELECT-0000000006
and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 /
"table") has already emitted some values, then we might in fact receive an
event with some "oldValue" that we have never seen before (because
(KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when it
was first emitted as a "newValue").

This would violate our assumption, and we would unintentionally pass
"null" as the "newAgg" parameter to the "removeReducer" (a.k.a. the subtractor).
If you want to double-check my reasoning, you should be able to do so in
the debugger with a breakpoint in KTableReduce.
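To make that concrete, here's a hedged sketch that mirrors the reduce logic above, with a HashMap standing in for the state store, fed the event from step 4. With a "treat null as zero" workaround in the subtractor, it produces exactly the -732 aggregate visible in your logs:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the KTableReduce logic above, with a HashMap standing in for
// the state store. Shows how an "oldValue" for a key the store has never
// seen leads to the subtractor receiving a null aggregate.
public class ReduceInvariantDemo {
    record Change(Integer oldValue, Integer newValue) {}

    static final Map<String, Integer> store = new HashMap<>();

    static Integer process(String key, Change value) {
        Integer oldAgg = store.get(key);
        Integer newAgg = oldAgg;

        // first try to add the new value
        if (value.newValue() != null) {
            newAgg = (newAgg == null) ? value.newValue() : newAgg + value.newValue();
        }

        // then try to remove the old value
        if (value.oldValue() != null) {
            // In the unclean-upgrade scenario, newAgg can still be null here.
            // A "treat null as zero" workaround yields a negative aggregate:
            int base = (newAgg == null) ? 0 : newAgg;
            newAgg = base - value.oldValue();
        }

        store.put(key, newAgg);
        return newAgg;
    }

    public static void main(String[] args) {
        // The store has never seen this key, yet an event with only an
        // oldValue arrives (emitted before this sub-topology existed):
        Integer result = process("dummykey@window", new Change(732, null));
        System.out.println(result); // -732
    }
}
```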


tl;dr: Supposing you reset the app when the topology changes, I think that
you should be able to rely on non-null aggregates being passed in to *both*
the adder and subtractor in a reduce.

I would be in favor, as you suggested, of adding an explicit check and
throwing an exception if the aggregate is ever null at those points. This
would actually help us detect if the topology has changed unexpectedly and
shut down, hopefully before any damage is done. I'll send a PR and see what
everyone thinks.
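For illustration, a minimal sketch of what such a check might look like (hypothetical code; the actual change to KTableReduce may well differ):

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch of the proposed guard; fail fast instead of passing
// a null aggregate to the subtractor.
public class NullAggGuard {
    static <V> V reduceWithCheck(V oldAgg, V oldValue, V newValue,
                                 BinaryOperator<V> adder,
                                 BinaryOperator<V> subtractor) {
        V newAgg = oldAgg;
        if (newValue != null) {
            newAgg = (newAgg == null) ? newValue : adder.apply(newAgg, newValue);
        }
        if (oldValue != null) {
            if (newAgg == null) {
                // Invariant violated: asked to retract a value we never saw.
                // Shut down rather than corrupt the aggregate.
                throw new IllegalStateException(
                    "oldValue with no prior aggregate; was the topology "
                        + "changed without an application reset?");
            }
            newAgg = subtractor.apply(newAgg, oldValue);
        }
        return newAgg;
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> add = Integer::sum;
        BinaryOperator<Integer> sub = (a, b) -> a - b;
        // Normal update: add 188, then retract 187.
        System.out.println(reduceWithCheck(187, 187, 188, add, sub)); // 188
        // Unclean-upgrade case: throws IllegalStateException.
        try {
            reduceWithCheck(null, 732, null, add, sub);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```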

Does this all seem like it adds up to you?
-John


On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net>
wrote:

> Hi John,
>
> Thanks for your reply. I am not sure if this behavior I've observed is
> a bug or not, as I've not been resetting my application properly. On
> the other hand if the subtractor or adder in the reduce operation are
> never supposed to be called with null aggregator value, perhaps it
> would make sense to put a null check in the table reduce
> implementation to detect an application entering an invalid state. A
> bit like a check for topics having the same number of partitions when
> doing a join.
>
> Here's some information about my tests. Hope that can be useful:
>
> Topology at start:
>
> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>       <-- KSTREAM-FILTER-0000000004
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>       --> KSTREAM-REDUCE-0000000002
>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>       --> KTABLE-TOSTREAM-0000000006
>       <-- KSTREAM-SOURCE-0000000005
>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-REDUCE-0000000002
>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>       <-- KTABLE-TOSTREAM-0000000006
>
> This topology just takes data from the source topic "slope" which
> produces messages like this:
>
> key1
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key3
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key2
> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> key3
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> key1
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> key2
> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>
> Every second, 3 new messages arrive from the "slope" topic for
> keys key1, key2 and key3, with a constantly increasing value.
> Data is transformed so that the original key is also tracked in the
> message value, grouped by key, and windowed with a custom window, and
> reduced with a dummy reduce operation to make a KTable.
> KTable is converted back to a stream, and sent to a topic (just for
> debugging purposes).
>
> Here's the source (it's kafka-streams-scala for the most part). Also
> please ignore silly classes, it's obviously a test:
>
>     val slopeTable = builder
>       .stream[String, TimedValue]("slope")
>       .map(
>         (key, value) =>
>           (
>             StringWrapper(key),
>             TimedValueWithKey(value = value.value, timestamp =
> value.timestamp, key = key)
>         )
>       )
>       .groupByKey
>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>       .reduceMat((aggValue, newValue) => newValue, "table")
>
>     slopeTable.toStream
>       .to("slope-table")
>
> Topology after change without a proper reset:
>
> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>       <-- KSTREAM-FILTER-0000000004
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>       --> KSTREAM-REDUCE-0000000002
>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>       <-- KSTREAM-SOURCE-0000000005
>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-REDUCE-0000000002
>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>       --> KSTREAM-SINK-0000000013
>       <-- KSTREAM-REDUCE-0000000002
>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>       <-- KTABLE-SELECT-0000000006
>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>       <-- KTABLE-TOSTREAM-0000000012
>
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000008 (topics:
> [aggregated-table-repartition])
>       --> KTABLE-REDUCE-0000000009
>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>       --> KTABLE-TOSTREAM-0000000010
>       <-- KSTREAM-SOURCE-0000000008
>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>       --> KSTREAM-SINK-0000000011
>       <-- KTABLE-REDUCE-0000000009
>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>       <-- KTABLE-TOSTREAM-0000000010
>
> Here's the source of the sub-topology that does table aggregation:
>
>     slopeTable
>       .groupBy(
>         (key, value) => (new Windowed(StringWrapper("dummykey"),
> key.window()), value)
>       )
>       .reduceMat(adder = (aggValue, newValue) => {
>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>         val agg = Option(aggValue)
>         TimedValueWithKey(
>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>           timestamp =
>
> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> newValue.timestamp),
>           key = "reduced"
>         )
>       }, subtractor = (aggValue, newValue) => {
>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>         val agg = Option(aggValue)
>         TimedValueWithKey(
>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>           timestamp =
>
> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> newValue.timestamp),
>           key = "reduced"
>         )
>       }, "aggregated-table")
>       .toStream
>       .to("slope-aggregated-table")
>
> I log all calls to the adder and subtractor, so I am able to see what's
> going on there, and I track the original keys of the aggregated
> values and their timestamps, so it's relatively easy to see how the
> data flows through this topology.
>
> In order to reproduce this behavior I need to:
> 1. Start a full topology (with table aggregation)
> 2. Start without table aggregation (no app reset)
> 3. Start with table aggregation (no app reset)
>
> Below is an interpretation of the adder/subtractor logs for a given
> key/window in chronological order:
>
> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>
> And in the end the last value that's materialized for that window
> (i.e. windowed key) in the kafka topic is 57, i.e. an increase in value
> for a single key between some point in the middle of the window and the
> end of the window, times 3. As opposed to the expected value of
> 751 * 3 = 2253 (the sum of the last values in a time window for all keys
> being aggregated).
>
> It's clear to me that I should do an application reset, but I would
> also like to understand: should I expect the adder/subtractor to be
> called with a null aggValue, or is it a clear sign that something went
> horribly wrong?
>
> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> wrote:
> >
> > Hi Vasily,
> >
> > Thanks for the email.
> >
> > To answer your question: you should reset the application basically any
> > time you change the topology. Some transitions are safe, but others will
> > result in data loss or corruption. Rather than try to reason about which
> is
> > which, it's much safer just to either reset the app or not change it (if
> it
> > has important state).
> >
> > Beyond changes that you make to the topology, we spend a lot of effort to
> > try and make sure that different versions of Streams will produce the
> same
> > topology, so unless the release notes say otherwise, you should be able
> to
> > upgrade without a reset.
> >
> >
> > I can't say right now whether those wacky behaviors are bugs or the
> result
> > of changing the topology without a reset. Or if they are correct but
> > surprising behavior somehow. I'll look into it tomorrow. Do feel free to
> > open a Jira ticket if you think you have found a bug, especially if you
> can
> > describe a repro. Knowing your topology before and after the change would
> > also be immensely helpful. You can print it with Topology.describe().
> >
> > Regardless, I'll make a note to take a look at the code tomorrow and try
> to
> > decide if you should expect these behaviors with "clean" topology
> changes.
> >
> > Thanks,
> > -John
> >
> > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vas...@sulatskov.net>
> > wrote:
> >
> > > Hi,
> > >
> > > I am doing some experiments with kafka-streams KGroupedTable
> > > aggregation, and admittedly I am not wiping data properly on each
> > > restart, partially because I also wonder what would happen if you
> > > change a streams topology without doing a proper reset.
> > >
> > > I've noticed that from time to time, kafka-streams
> > > KGroupedTable.reduce() can call the subtractor function with a null
> > > aggregator value, and if you try to work around that by interpreting
> > > a null aggregator value as zero for a numeric value, you get an
> > > incorrect aggregation result.
> > >
> > > I do understand that the proper way of handling this is to do a reset
> > > on topology changes, but I'd like to understand if there's any
> > > legitimate case when kafka-streams can call an adder or a subtractor
> > > with a null aggregator value. Should I plan for this, or should I
> > > interpret this as an invalid state, terminate the application, and
> > > do a proper reset?
> > >
> > > Also, I can't seem to find a guide which explains when application
> > > reset is necessary. Intuitively it seems that it should be done every
> > > time a topology changes. Any other cases?
> > >
> > > I tried to debug where the null value comes from and it seems that
> > > KTableReduce.process() is getting called with a Change<V> value with
> > > newValue == null and some non-null oldValue, which leads to the
> > > subtractor being called with a null aggregator value. I wonder how it
> > > is possible to have an old value for a key without a new value (does
> > > it happen because of the auto commit interval?).
> > >
> > > I've also noticed that it's possible for an input value from a topic
> > > to bypass the aggregation function entirely and be transmitted
> > > directly to the output in certain cases: when oldAgg is null,
> > > newValue is not null, and oldValue is null, newValue is forwarded
> > > as-is. I suppose it's the correct behaviour, but it feels a bit
> > > weird nonetheless. And I've actually been able to observe this
> > > behaviour in practice. I suppose it's also caused by this happening
> > > right before a commit happens, and the message is sent to a changelog
> > > topic.
> > >
> > > Please can someone with more knowledge shed some light on these issues?
> > >
> > > --
> > > Best regards,
> > > Vasily Sulatskov
> > >
>
>
>
> --
> Best regards,
> Vasily Sulatskov
>
