Hi John, Thanks a lot for your explanation. It does make much more sense now.
The Jira issue I think is pretty well explained (with a reference to this thread). And I've left my 2 cents in the pull request. You are right, I didn't notice that the repartition topic contains the same message effectively twice, and the 0/1 bytes are non-visible, so when I used kafka-console-consumer I didn't notice that. So here's a quick suggestion: wouldn't it make sense to change the 0 and 1 bytes to something with visible corresponding ASCII characters, say + and -, as these messages are effectively commands to the reducer to execute either an addition or a subtraction? On a more serious side, can you please explain the temporal aspects of how change messages are handled? More specifically, is it guaranteed that both Change(newValue, null) and Change(null, oldValue) are handled before a new aggregated value is committed to an output topic? Change(newValue, null) and Change(null, oldValue) are delivered as two separate messages via a kafka topic, and when they are read from a topic (possibly on a different machine, whose commit interval is asynchronous to the machine that put these changes into the topic), can it happen that a Change(newValue, null) is processed by a KTableReduceProcessor, the value of the aggregator is updated and committed to the changelog topic, and a Change(null, oldValue) is processed only in the next commit interval? If I understand this correctly, that would mean that in an aggregated table an incorrect aggregated value will be observed briefly, before being eventually corrected. Can that happen? Or am I missing something that would make it impossible? On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote: > > Hi Vasily, > > I'm glad you're making me look at this; it's good homework for me! > > This is very non-obvious, but here's what happens: > > KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits > new/old Change pairs as the value. > > Next is the Select (aka GroupBy). In the DSL code, this is the > KTableRepartitionMap (we call it a repartition when you select a new key, > since the new keys may belong to different partitions). > KTableRepartitionMap is a processor that does two things: > 1. it maps K => K1 (new keys) and V => V1 (new values) > 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new, > null)] > In other words, it turns each Change event into two events: a retraction > and an update. > > Next comes the reduce operation. In building the processor node for this > operation, we create the sink, repartition topic, and source, followed by > the actual Reduce node. So if you want to look at how the changes get > serialized and deserialized, it's in KGroupedTableImpl#buildAggregate. > You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer. > > By looking into those implementations, I found that they depend on each > Change containing just one of new OR old. They serialize the underlying > value using the serde you provide, along with a single byte that signifies > whether the serialized value is the new or old value, which the deserializer > uses on the receiving end to turn it back into a Change(new, null) or > Change(null, old) as appropriate. This is why the repartition topic looks > like it's just the raw data. It basically is, except for the magic byte. > > Does that make sense? > > Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and > https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and > leaving any feedback you have?
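>
> In case a sketch helps: here's roughly what the serializer side of that
> scheme looks like. To be clear, I'm writing this from memory as an
> illustration, not quoting the actual ChangedSerializer code, and "inner"
> (the serializer from the serde you provide) is an assumed field name:
>
> // sketch: encode Change(new, null) / Change(null, old) as [payload][flag byte]
> // (uses java.nio.ByteBuffer; V is the user's value type)
> public byte[] serialize(final String topic, final Change<V> data) {
>     final boolean isNew = data.newValue != null;
>     // serialize whichever side is present, with the user-provided serializer
>     final byte[] payload = inner.serialize(topic, isNew ? data.newValue : data.oldValue);
>     final ByteBuffer buf = ByteBuffer.allocate(payload.length + 1);
>     buf.put(payload);
>     buf.put((byte) (isNew ? 1 : 0)); // the single "magic" byte
>     return buf.array();
> }
>
> The deserializer reads that trailing byte and rebuilds either
> Change(value, null) or Change(null, value), which is why the topic looks
> like plain values in kafka-console-consumer.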
> > Thanks, > -John > > On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <vas...@sulatskov.net> > wrote: > > > Hi John, > > > > Thanks for your explanation. > > > > I have an answer to the practical question, i.e. a null aggregator > > value should be interpreted as a fatal application error. > > > > On the other hand, looking at the app topology, I see that a message > > from KSTREAM-REDUCE-0000000002 / "table" goes to > > KTABLE-SELECT-0000000006, which in turn forwards data to > > KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at > > this point I assume that the data goes back to kafka into a *-repartition > > topic, after that the message is read from kafka by > > KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]), > > and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores: > > [aggregated-table]), where the actual aggregation takes place. What I > > don't get is where this Change value comes from. I mean, it may have been > > produced by KSTREAM-REDUCE-0000000002, but that shouldn't matter, as the > > message goes through kafka where it gets serialized, and looking at the > > kafka "repartition" topic, it contains regular values, not a pair of > > old/new. > > > > As far as I understand, Change is a purely in-memory representation of > > the state for a particular key, and at no point is it serialized back > > to kafka, yet somehow these Change values make it to the reducer. I feel > > like I am missing something here. Could you please clarify this? > > > > Can you please point me to the place in the kafka-streams sources where a > > Change of newValue/oldValue is produced, so I could take a look? I > > found the KTableReduce implementation, but can't find what creates these > > Change values. > On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> wrote: > > > > > > Hi again Vasily, > > > > > > Ok, it looks to me like this behavior is the result of the unclean > > > topology change. > > > > > > Just in case you're interested, here's what I think happened. > > > > > > 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table" > > ) > > > internally emits pairs of "oldValue"/"newValue". (side-note: It's by > > > forwarding both the old and new value that we are able to maintain > > > aggregates using the subtractor/adder pairs) > > > > > > 2. In the full topology, these old/new pairs go through some > > > transformations, but still in some form eventually make their way down to > > > the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table"). > > > > > > 3. The reduce processor logic looks like this: > > > final V oldAgg = store.get(key); > > > V newAgg = oldAgg; > > > > > > // first try to add the new value > > > if (value.newValue != null) { > > > if (newAgg == null) { > > > newAgg = value.newValue; > > > } else { > > > newAgg = addReducer.apply(newAgg, value.newValue); > > > } > > > } > > > > > > // then try to remove the old value > > > if (value.oldValue != null) { > > > // Here's where the assumption breaks down... > > > newAgg = removeReducer.apply(newAgg, value.oldValue); > > > } > > > > > > 4. Here's what I think happened. This processor saw an event like > > > {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block, > > and > > > (since "oldValue != null") would go into the second block. I think that > > in > > > the normal case we can rely on the invariant that any value we get as an > > > "oldValue" is one that we've previously seen (as "newValue").
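> > >
> > > To make that invariant concrete, here's a walk-through with made-up
> > > numbers (a summing reduce for one upstream key; this traces the code
> > > above, it's not real log output):
> > >
> > >   t0: Change(new=5, old=null) arrives; oldAgg = store.get(key) = null,
> > >       so newValue passes straight through and we store.put(key, 5)
> > >   t1: upstream updates 5 -> 7, which the repartition map explodes into
> > >       Change(null, 5) and Change(7, null)
> > >   t2: Change(null, 5): oldAgg = store.get(key) = 5; subtractor(5, 5) = 0
> > >   t3: Change(7, null): oldAgg = 0; adder(0, 7) = 7; store.put(key, 7)
> > >
> > > The 5 we subtract at t2 is non-null in the store precisely because we
> > > already saw it as a "newValue" at t0.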
> > > Consequently, we should be able to assume that if we get a non-null > > > "oldValue", "newAgg" will also not be null (because we would have written > > > it to the store back when we saw it as "newValue" and then retrieved it > > > just now as "newAgg = oldAgg"). > > > > > > However, if subtopology2, along with KTABLE-SELECT-0000000006 > > > and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 / > > > "table") has already emitted some values, then we might receive > > an > > > event with some "oldValue" that we have in fact never seen before > > (because ( > > > KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when > > it > > > was first emitted as a "newValue"). > > > > > > This would violate our assumption, and we would unintentionally send a > > > "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor). > > > If you want to double-check my reasoning, you should be able to do so in > > > the debugger with a breakpoint in KTableReduce. > > > > > > > > > tl;dr: Supposing you reset the app when the topology changes, I think > > that > > > you should be able to rely on non-null aggregates being passed in to > > *both* > > > the adder and subtractor in a reduce. > > > > > > I would be in favor, as you suggested, of adding an explicit check and > > > throwing an exception if the aggregate is ever null at those points. This > > > would actually help us detect that the topology has changed unexpectedly > > and > > > shut down, hopefully before any damage is done. I'll send a PR and see > > what > > > everyone thinks. > > > > > > Does this all seem like it adds up to you? > > > -John > > > > > > > > > On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net> > > > wrote: > > > > > > > Hi John, > > > > > > > > Thanks for your reply. I am not sure if this behavior I've observed is > > > > a bug or not, as I've not been resetting my application properly. On > > > > the other hand, if the subtractor or adder in the reduce operation are > > > > never supposed to be called with a null aggregator value, perhaps it > > > > would make sense to put a null check in the table reduce > > > > implementation to detect an application entering an invalid state. A > > > > bit like the check for topics having the same number of partitions when > > > > doing a join. > > > > > > > > Here's some information about my tests.
Hope that can be useful: > > > > > > > > Topology at start: > > > > > > > > 2018-07-13 10:29:48 [main] INFO TableAggregationTest - Topologies: > > > > Sub-topology: 0 > > > > Source: KSTREAM-SOURCE-0000000000 (topics: [slope]) > > > > --> KSTREAM-MAP-0000000001 > > > > Processor: KSTREAM-MAP-0000000001 (stores: []) > > > > --> KSTREAM-FILTER-0000000004 > > > > <-- KSTREAM-SOURCE-0000000000 > > > > Processor: KSTREAM-FILTER-0000000004 (stores: []) > > > > --> KSTREAM-SINK-0000000003 > > > > <-- KSTREAM-MAP-0000000001 > > > > Sink: KSTREAM-SINK-0000000003 (topic: table-repartition) > > > > <-- KSTREAM-FILTER-0000000004 > > > > > > > > Sub-topology: 1 > > > > Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition]) > > > > --> KSTREAM-REDUCE-0000000002 > > > > Processor: KSTREAM-REDUCE-0000000002 (stores: [table]) > > > > --> KTABLE-TOSTREAM-0000000006 > > > > <-- KSTREAM-SOURCE-0000000005 > > > > Processor: KTABLE-TOSTREAM-0000000006 (stores: []) > > > > --> KSTREAM-SINK-0000000007 > > > > <-- KSTREAM-REDUCE-0000000002 > > > > Sink: KSTREAM-SINK-0000000007 (topic: slope-table) > > > > <-- KTABLE-TOSTREAM-0000000006 > > > > > > > > This topology just takes data from the source topic "slope", which > > > > produces messages like this: > > > > > > > > key1 > > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > > > > key3 > > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > > > > key2 > > > > {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > > > > key3 > > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > > > > key1 > > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > > > > key2 > > > > {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > > > > > > > > Every second, 3 new messages arrive from the "slope" topic for > > > > keys key1, key2 and key3, with a constantly increasing value. > > > > The data is transformed so that the original key is also tracked in the > > > > message value, grouped by key, windowed with a custom window, and > > > > reduced with a dummy reduce operation to make a KTable. > > > > The KTable is converted back to a stream and sent to a topic (just for > > > > debugging purposes). > > > > > > > > Here's the source (it's kafka-streams-scala for the most part).
Also, > > > > please ignore the silly classes; it's obviously a test: > > > > > > > > val slopeTable = builder > > > > .stream[String, TimedValue]("slope") > > > > .map( > > > > (key, value) => > > > > ( > > > > StringWrapper(key), > > > > TimedValueWithKey(value = value.value, timestamp = > > > > value.timestamp, key = key) > > > > ) > > > > ) > > > > .groupByKey > > > > .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1)) > > > > .reduceMat((aggValue, newValue) => newValue, "table") > > > > > > > > slopeTable.toStream > > > > .to("slope-table") > > > > > > > > Topology after change without a proper reset: > > > > > > > > 2018-07-13 10:38:32 [main] INFO TableAggregationTest - Topologies: > > > > Sub-topology: 0 > > > > Source: KSTREAM-SOURCE-0000000000 (topics: [slope]) > > > > --> KSTREAM-MAP-0000000001 > > > > Processor: KSTREAM-MAP-0000000001 (stores: []) > > > > --> KSTREAM-FILTER-0000000004 > > > > <-- KSTREAM-SOURCE-0000000000 > > > > Processor: KSTREAM-FILTER-0000000004 (stores: []) > > > > --> KSTREAM-SINK-0000000003 > > > > <-- KSTREAM-MAP-0000000001 > > > > Sink: KSTREAM-SINK-0000000003 (topic: table-repartition) > > > > <-- KSTREAM-FILTER-0000000004 > > > > > > > > Sub-topology: 1 > > > > Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition]) > > > > --> KSTREAM-REDUCE-0000000002 > > > > Processor: KSTREAM-REDUCE-0000000002 (stores: [table]) > > > > --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012 > > > > <-- KSTREAM-SOURCE-0000000005 > > > > Processor: KTABLE-SELECT-0000000006 (stores: []) > > > > --> KSTREAM-SINK-0000000007 > > > > <-- KSTREAM-REDUCE-0000000002 > > > > Processor: KTABLE-TOSTREAM-0000000012 (stores: []) > > > > --> KSTREAM-SINK-0000000013 > > > > <-- KSTREAM-REDUCE-0000000002 > > > > Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition) > > > > <-- KTABLE-SELECT-0000000006 > > > > Sink: KSTREAM-SINK-0000000013 (topic: slope-table) > > > > <-- KTABLE-TOSTREAM-0000000012 > > > > > > > > Sub-topology: 2 > > > > Source: KSTREAM-SOURCE-0000000008 (topics: > > > > [aggregated-table-repartition]) > > > > --> KTABLE-REDUCE-0000000009 > > > > Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table]) > > > > --> KTABLE-TOSTREAM-0000000010 > > > > <-- KSTREAM-SOURCE-0000000008 > > > > Processor: KTABLE-TOSTREAM-0000000010 (stores: []) > > > > --> KSTREAM-SINK-0000000011 > > > > <-- KTABLE-REDUCE-0000000009 > > > > Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table) > > > > <-- KTABLE-TOSTREAM-0000000010 > > > > > > > > Here's the source of the sub-topology that does the table aggregation: > > > > > > > > slopeTable > > > > .groupBy( > > > > (key, value) => (new Windowed(StringWrapper("dummykey"), > > > > key.window()), value) > > > > ) > > > > .reduceMat(adder = (aggValue, newValue) => { > > > > log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue") > > > > val agg = Option(aggValue) > > > > TimedValueWithKey( > > > > value = agg.map(_.value).getOrElse(0) + newValue.value, > > > > timestamp = > > > > > > > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), > > > > newValue.timestamp), > > > > key = "reduced" > > > > ) > > > > }, subtractor = (aggValue, newValue) => { > > > > log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue") > > > > val agg = Option(aggValue) > > > > TimedValueWithKey( > > > > value = agg.map(_.value).getOrElse(0) - newValue.value, > > > > timestamp = > > > > > > > > Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), > > > > newValue.timestamp), > > > > key
= "reduced" > > > > ) > > > > }, "aggregated-table") > > > > .toStream > > > > .to("slope-aggregated-table") > > > > > > > > I log all calls to adder and subtractor, so I am able to see what's > > > > going on there, as well as I track the original keys of the aggregated > > > > values and their timestamps, so it's relatively easy to see how the > > > > data goes through this topology > > > > > > > > In order to reproduce this behavior I need to: > > > > 1. Start a full topology (with table aggregation) > > > > 2. Start without table aggregation (no app reset) > > > > 3. Start with table aggregation (no app reset) > > > > > > > > Bellow is an interpretation of the adder/subtractor logs for a given > > > > key/window in the chronological order > > > > > > > > SUB: newValue=(key2, 732, 10:50:40) aggValue=null > > > > ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40) > > > > SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59) > > > > ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59) > > > > SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59) > > > > ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59) > > > > > > > > And in the end the last value that's materialized for that window > > > > (i.e. windowed key) in the kafka topic is 57, i.e. a increase in value > > > > for a single key between some point in the middle of the window and at > > > > the end of the window, times 3. As opposed to the expected value of > > > > 751 * 3 = 2253 (sum of last values in a time window for all keys being > > > > aggregated). > > > > > > > > It's clear to me that I should do an application reset, but I also > > > > would like to understand, should I expect adder/subtractor being > > > > called with null aggValue, or is it a clear sign that something went > > > > horribly wrong? > > > > > > > > On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> > > wrote: > > > > > > > > > > Hi Vasily, > > > > > > > > > > Thanks for the email. > > > > > > > > > > To answer your question: you should reset the application basically > > any > > > > > time you change the topology. Some transitions are safe, but others > > will > > > > > result in data loss or corruption. Rather than try to reason about > > which > > > > is > > > > > which, it's much safer just to either reset the app or not change it > > (if > > > > it > > > > > has important state). > > > > > > > > > > Beyond changes that you make to the topology, we spend a lot of > > effort to > > > > > try and make sure that different versions of Streams will produce the > > > > same > > > > > topology, so unless the release notes say otherwise, you should be > > able > > > > to > > > > > upgrade without a reset. > > > > > > > > > > > > > > > I can't say right now whether those wacky behaviors are bugs or the > > > > result > > > > > of changing the topology without a reset. Or if they are correct but > > > > > surprising behavior somehow. I'll look into it tomorrow. Do feel > > free to > > > > > open a Jira ticket if you think you have found a bug, especially if > > you > > > > can > > > > > describe a repro. Knowing your topology before and after the change > > would > > > > > also be immensely helpful. You can print it with Topology.describe(). > > > > > > > > > > Regardless, I'll make a note to take a look at the code tomorrow and > > try > > > > to > > > > > decide if you should expect these behaviors with "clean" topology > > > > changes. 
> > > > > > Thanks, > -John > > > > > On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov < > > vas...@sulatskov.net> > > > > > wrote: > > > > > > Hi, > > > > > > > > > > > > I am doing some experiments with kafka-streams KGroupedTable > > > > > > aggregation, and admittedly I am not wiping data properly on each > > > > > > restart, partially because I also wonder what would happen if you > > > > > > change a streams topology without doing a proper reset. > > > > > > > > > > > > I've noticed that from time to time, kafka-streams > > > > > > KGroupedTable.reduce() can call the subtractor function with a null > > > > > > aggregator value, and if you try to work around that by > > interpreting > > > > > > a null aggregator value as zero for a numeric value, you get an > > > > > > incorrect aggregation result. > > > > > > > > > > > > I do understand that the proper way of handling this is to do a > > reset > > > > > > on topology changes, but I'd like to understand if there's any > > > > > > legitimate case when kafka-streams can call an adder or a > > subtractor > > > > > > with a null aggregator value, and should I plan for this, or should I > > > > > > interpret this as an invalid state, terminate the application, and > > > > > > do a proper reset? > > > > > > > > > > > > Also, I can't seem to find a guide which explains when an application > > > > > > reset is necessary. Intuitively it seems that it should be done > > every > > > > > > time the topology changes. Any other cases? > > > > > > > > > > > > I tried to debug where the null value comes from, and it seems that > > > > > > KTableReduce.process() is getting called with a Change<V> value with > > > > > > newValue == null and some non-null oldValue, which leads to the > > > > > > subtractor being called with a null aggregator value. I wonder how > > it is > > > > > > possible to have an old value for a key without a new value (does > > it > > > > > > happen because of the auto commit interval?). > > > > > > > > > > > > I've also noticed that it's possible for an input value from a > > topic > > > > > > to bypass the aggregation function entirely and be directly > > transmitted to > > > > > > the output in certain cases: oldAgg is null, newValue is not null > > and > > > > > > oldValue is null - in that case newValue will be transmitted > > directly > > > > > > to the output. I suppose it's the correct behaviour, but it feels a > > bit > > > > > > weird nonetheless. And I've actually been able to observe this > > > > > > behaviour in practice. I suppose it's also caused by this happening > > > > > > right before a commit happens, and the message is sent to a > > changelog > > > > > > topic. > > > > > > > > > > > > Please, can someone with more knowledge shed some light on these > > issues? > > > > > > > > > > > > -- > > > > > > Best regards, > > > > > > Vasily Sulatskov > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best regards, > > > > Vasily Sulatskov > > > > > > > > > > > > -- > > Best regards, > > Vasily Sulatskov > > -- Best regards, Vasily Sulatskov