Thank you everyone for your explanations, that's been most enlightening. On Wed, Jul 18, 2018 at 2:28 AM Matthias J. Sax <matth...@confluent.io> wrote: > > I see -- sorry for misunderstanding initially. > > I agree that it would be possible to detect. Feel free to file a Jira > for this improvement and maybe pick it up yourself :) > > > -Matthias > > On 7/17/18 3:01 PM, Vasily Sulatskov wrote: > > Hi, > > > > I do understand that in the general case it's not possible to guarantee > > that the newValue and oldValue parts of a Change message arrive at the > > same partition, and I guess that's not really in the plans, but if I > > correctly understand how it works, it should be possible to detect if > > both newValue and oldValue go to the same partition and keep them > > together, thus improving kafka-streams consistency guarantees. Right? > > > > For example, right now I have a use case where, when I perform > > groupBy on a table, my new keys are computed purely from the old keys, and > > not from the value. And the handling of such cases (not the general case) > > can be improved. > > On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax <matth...@confluent.io> > > wrote: > >> > >> It is not possible to use a single message, because both messages may go > >> to different partitions and may be processed by different application > >> instances. > >> > >> Note that the overall KTable state is sharded. Updating a single > >> upstream shard might require updating two different downstream shards. > >> > >> > >> -Matthias > >> > >> On 7/16/18 2:50 PM, Vasily Sulatskov wrote: > >>> Hi, > >>> > >>> It seems that it wouldn't be that difficult to address: just don't > >>> break Change(newVal, oldVal) into Change(newVal, null) / > >>> Change(oldVal, null), and update the aggregator value in one .process() > >>> call. > >>> > >>> Would this change make sense? > >>> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <matth...@confluent.io> > >>> wrote: > >>>> > >>>> Vasily, > >>>> > >>>> yes, it can happen. As you noticed, both messages might be processed on > >>>> different machines. Thus, Kafka Streams provides 'eventual consistency' > >>>> guarantees. > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote: > >>>>> Hi John, > >>>>> > >>>>> Thanks a lot for your explanation. It does make much more sense now. > >>>>> > >>>>> The Jira issue I think is pretty well explained (with a reference to > >>>>> this thread). And I've left my 2 cents in the pull request. > >>>>> > >>>>> You are right, I didn't notice that the repartition topic effectively contains the same > >>>>> message twice; the 0/1 bytes are non-visible, so when I > >>>>> used kafka-console-consumer I missed them. So I have a quick > >>>>> suggestion here: wouldn't it make sense to change the 0 and 1 bytes to > >>>>> something with visible corresponding ASCII characters, say + and > >>>>> -, as these messages are effectively commands to the reducer to execute > >>>>> either an addition or a subtraction? > >>>>> > >>>>> On a more serious note, can you please explain the temporal aspects of > >>>>> how change messages are handled? More specifically, is it guaranteed > >>>>> that both Change(newValue, null) and Change(null, oldValue) are > >>>>> handled before a new aggregated value is committed to an output topic? 
> >>>>> Change(newValue, null) and Change(null, oldValue) are delivered as two > >>>>> separate messages via a kafka topic, and when they are read from a > >>>>> topic (possibly on a different machine, whose commit interval is > >>>>> asynchronous to the machine that put these changes into the topic), can > >>>>> it happen that a Change(newValue, null) is processed by a > >>>>> KTableReduceProcessor, the value of the aggregator is updated and > >>>>> committed to the changelog topic, and a Change(null, oldValue) is > >>>>> processed only in the next commit interval? If I understand this > >>>>> correctly, that would mean that in an aggregated table an incorrect > >>>>> aggregated value will be observed briefly, before being eventually > >>>>> corrected. > >>>>> > >>>>> Can that happen? Or am I missing something that would make it impossible? > >>>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote: > >>>>>> > >>>>>> Hi Vasily, > >>>>>> > >>>>>> I'm glad you're making me look at this; it's good homework for me! > >>>>>> > >>>>>> This is very non-obvious, but here's what happens: > >>>>>> > >>>>>> KStreamsReduce is a Processor of (K, V) => (K, Change<V>). I.e., it > >>>>>> emits > >>>>>> new/old Change pairs as the value. > >>>>>> > >>>>>> Next is the Select (aka GroupBy). In the DSL code, this is the > >>>>>> KTableRepartitionMap (we call it a repartition when you select a new > >>>>>> key, > >>>>>> since the new keys may belong to different partitions). > >>>>>> KTableRepartitionMap is a processor that does two things: > >>>>>> 1. it maps K => K1 (new keys) and V => V1 (new values) > >>>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new, > >>>>>> null)] > >>>>>> In other words, it turns each Change event into two events: a > >>>>>> retraction > >>>>>> and an update. > >>>>>> > >>>>>> Next comes the reduce operation. In building the processor node for > >>>>>> this > >>>>>> operation, we create the sink, repartition topic, and source, followed > >>>>>> by > >>>>>> the actual Reduce node. So if you want to look at how the changes get > >>>>>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate. > >>>>>> You'll see that the sink and source use a ChangedSerializer and > >>>>>> ChangedDeserializer. > >>>>>> > >>>>>> By looking into those implementations, I found that they depend on each > >>>>>> Change containing just one of new OR old. They serialize the underlying > >>>>>> value using the serde you provide, along with a single byte that > >>>>>> signifies > >>>>>> whether the serialized value is the new or the old value, which the deserializer > >>>>>> uses on the receiving end to turn it back into a Change(new, null) or > >>>>>> Change(null, old) as appropriate. This is why the repartition topic > >>>>>> looks > >>>>>> like it's just the raw data. It basically is, except for the magic > >>>>>> byte. > >>>>>> > >>>>>> Does that make sense? > >>>>>> > >>>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and > >>>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look > >>>>>> and > >>>>>> leaving any feedback you have? > >>>>>> > >>>>>> Thanks, > >>>>>> -John > >>>>>> > >>>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov > >>>>>> <vas...@sulatskov.net> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi John, > >>>>>>> > >>>>>>> Thanks for your explanation. > >>>>>>> > >>>>>>> I have an answer to the practical question, i.e. a null aggregator > >>>>>>> value should be interpreted as a fatal application error. 
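To make sure I follow John's explanation above about the repartition encoding, here is how I picture it. This is only my own simplified Scala sketch with made-up names (the real logic lives in KTableRepartitionMap, ChangedSerializer and ChangedDeserializer), so treat it as an illustration rather than the actual implementation:

    object RepartitionSketch {
      // My mental model of a Change: after the "explode" step, at most one of
      // the two sides is present.
      case class Change[V](newValue: Option[V], oldValue: Option[V])

      // One Change(new, old) becomes a retraction followed by an update, each
      // carrying only one side (ignoring the case where a side is absent to
      // begin with).
      def explode[V](change: Change[V]): Seq[Change[V]] =
        Seq(Change(None, change.oldValue), Change(change.newValue, None))

      // My mental model of the marker byte on the repartition topic: the value
      // is serialized with the user-provided serde, plus one extra byte saying
      // which side it is, so the receiver knows whether to add or subtract.
      val OldMarker: Byte = 0 // subtract on the receiving side
      val NewMarker: Byte = 1 // add on the receiving side

      def encode[V](change: Change[V], serialize: V => Array[Byte]): Array[Byte] =
        change match {
          case Change(Some(v), None) => serialize(v) :+ NewMarker
          case Change(None, Some(v)) => serialize(v) :+ OldMarker
          case _ => throw new IllegalArgumentException("expected exactly one of new/old")
        }
    }

That would also explain why kafka-console-consumer shows what looks like plain values: the payload is just the user-serialized value plus one extra non-printable byte.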
> >>>>>>> > >>>>>>> On the other hand, looking at the app topology, I see that a message > >>>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes to > >>>>>>> KTABLE-SELECT-0000000006, which in turn forwards data to > >>>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at > >>>>>>> this point I assume that the data goes back to kafka into a *-repartition > >>>>>>> topic; after that the message is read from kafka by > >>>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]), > >>>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores: > >>>>>>> [aggregated-table]), where the actual aggregation takes place. What I > >>>>>>> don't get is where this Change value comes from. I mean, even if it's been > >>>>>>> produced by KSTREAM-REDUCE-0000000002, it shouldn't matter, as the > >>>>>>> message goes through kafka where it gets serialized, and looking at > >>>>>>> the kafka "repartition" topic, it contains regular values, not a pair of > >>>>>>> old/new. > >>>>>>> > >>>>>>> As far as I understand, Change is a purely in-memory representation of > >>>>>>> the state for a particular key, and at no point is it serialized back > >>>>>>> to kafka, yet somehow this Change value makes it to the reducer. I feel > >>>>>>> like I am missing something here. Could you please clarify this? > >>>>>>> > >>>>>>> Can you please point me to the place in the kafka-streams sources where a > >>>>>>> Change of newValue/oldValue is produced, so I could take a look? I > >>>>>>> found the KTableReduce implementation, but can't find what creates these > >>>>>>> Change values. > >>>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>> Hi again Vasily, > >>>>>>>> > >>>>>>>> Ok, it looks to me like this behavior is the result of the unclean > >>>>>>>> topology change. > >>>>>>>> > >>>>>>>> Just in case you're interested, here's what I think happened. > >>>>>>>> > >>>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / > >>>>>>>> "table" > >>>>>>> ) > >>>>>>>> internally emits pairs of "oldValue"/"newValue". (side-note: It's by > >>>>>>>> forwarding both the old and new value that we are able to maintain > >>>>>>>> aggregates using the subtractor/adder pairs) > >>>>>>>> > >>>>>>>> 2. In the full topology, these old/new pairs go through some > >>>>>>>> transformations, but still in some form eventually make their way > >>>>>>>> down to > >>>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table"). > >>>>>>>> > >>>>>>>> 3. The reduce processor logic looks like this: > >>>>>>>> final V oldAgg = store.get(key); > >>>>>>>> V newAgg = oldAgg; > >>>>>>>> > >>>>>>>> // first try to add the new value > >>>>>>>> if (value.newValue != null) { > >>>>>>>> if (newAgg == null) { > >>>>>>>> newAgg = value.newValue; > >>>>>>>> } else { > >>>>>>>> newAgg = addReducer.apply(newAgg, value.newValue); > >>>>>>>> } > >>>>>>>> } > >>>>>>>> > >>>>>>>> // then try to remove the old value > >>>>>>>> if (value.oldValue != null) { > >>>>>>>> // Here's where the assumption breaks down... > >>>>>>>> newAgg = removeReducer.apply(newAgg, value.oldValue); > >>>>>>>> } > >>>>>>>> > >>>>>>>> 4. Here's what I think happened. This processor saw an event like > >>>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first > >>>>>>>> block, > >>>>>>> and > >>>>>>>> (since "oldValue != null") would go into the second block. 
I think > >>>>>>>> that > >>>>>>> in > >>>>>>>> the normal case we can rely on the invariant that any value we get > >>>>>>>> as an > >>>>>>>> "oldValue" is one that we've previously seen ( as "newValue" ). > >>>>>>>> Consequently, we should be able to assume that if we get a non-null > >>>>>>>> "oldValue", "newAgg" will also not be null (because we would have > >>>>>>>> written > >>>>>>>> it to the store back when we saw it as "newValue" and then retrieved > >>>>>>>> it > >>>>>>>> just now as "newAgg = oldAgg"). > >>>>>>>> > >>>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006 > >>>>>>>> and KSTREAM-SINK-0000000013 get added after > >>>>>>>> (KSTREAM-REDUCE-0000000002 / > >>>>>>>> "table") has already emitted some values, then we might in fact > >>>>>>>> receive > >>>>>>> an > >>>>>>>> event with some "oldValue" that we have in fact never seen before > >>>>>>> (because ( > >>>>>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology > >>>>>>>> when > >>>>>>> it > >>>>>>>> was first emitted as a "newValue"). > >>>>>>>> > >>>>>>>> This would violate our assumption, and we would unintentionally send > >>>>>>>> a > >>>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka > >>>>>>>> subtractor). > >>>>>>>> If you want to double-check my reasoning, you should be able to do > >>>>>>>> so in > >>>>>>>> the debugger with a breakpoint in KTableReduce. > >>>>>>>> > >>>>>>>> > >>>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think > >>>>>>> that > >>>>>>>> you should be able to rely on non-null aggregates being passed in to > >>>>>>> *both* > >>>>>>>> the adder and subtractor in a reduce. > >>>>>>>> > >>>>>>>> I would be in favor, as you suggested, of adding an explicit check > >>>>>>>> and > >>>>>>>> throwing an exception if the aggregate is ever null at those points. > >>>>>>>> This > >>>>>>>> would actually help us detect if the topology has changed > >>>>>>>> unexpectedly > >>>>>>> and > >>>>>>>> shut down, hopefully before any damage is done. I'll send a PR and > >>>>>>>> see > >>>>>>> what > >>>>>>>> everyone thinks. > >>>>>>>> > >>>>>>>> Does this all seem like it adds up to you? > >>>>>>>> -John > >>>>>>>> > >>>>>>>> > >>>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov > >>>>>>>> <vas...@sulatskov.net> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi John, > >>>>>>>>> > >>>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed > >>>>>>>>> is > >>>>>>>>> a bug or not, as I've not been resetting my application properly. On > >>>>>>>>> the other hand if the subtractor or adder in the reduce operation > >>>>>>>>> are > >>>>>>>>> never supposed to be called with null aggregator value, perhaps it > >>>>>>>>> would make sense to put a null check in the table reduce > >>>>>>>>> implementation to detect an application entering an invalid state. A > >>>>>>>>> bit like a check for topics having the same number of partitions > >>>>>>>>> when > >>>>>>>>> doing a join. > >>>>>>>>> > >>>>>>>>> Here's some information about my tests. 
Hope that can be useful: > >>>>>>>>> > >>>>>>>>> Topology at start: > >>>>>>>>> > >>>>>>>>> 2018-07-13 10:29:48 [main] INFO TableAggregationTest - Topologies: > >>>>>>>>> Sub-topology: 0 > >>>>>>>>> Source: KSTREAM-SOURCE-0000000000 (topics: [slope]) > >>>>>>>>> --> KSTREAM-MAP-0000000001 > >>>>>>>>> Processor: KSTREAM-MAP-0000000001 (stores: []) > >>>>>>>>> --> KSTREAM-FILTER-0000000004 > >>>>>>>>> <-- KSTREAM-SOURCE-0000000000 > >>>>>>>>> Processor: KSTREAM-FILTER-0000000004 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000003 > >>>>>>>>> <-- KSTREAM-MAP-0000000001 > >>>>>>>>> Sink: KSTREAM-SINK-0000000003 (topic: table-repartition) > >>>>>>>>> <-- KSTREAM-FILTER-0000000004 > >>>>>>>>> > >>>>>>>>> Sub-topology: 1 > >>>>>>>>> Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition]) > >>>>>>>>> --> KSTREAM-REDUCE-0000000002 > >>>>>>>>> Processor: KSTREAM-REDUCE-0000000002 (stores: [table]) > >>>>>>>>> --> KTABLE-TOSTREAM-0000000006 > >>>>>>>>> <-- KSTREAM-SOURCE-0000000005 > >>>>>>>>> Processor: KTABLE-TOSTREAM-0000000006 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000007 > >>>>>>>>> <-- KSTREAM-REDUCE-0000000002 > >>>>>>>>> Sink: KSTREAM-SINK-0000000007 (topic: slope-table) > >>>>>>>>> <-- KTABLE-TOSTREAM-0000000006 > >>>>>>>>> > >>>>>>>>> This topology just takes data from the source topic "slope", which > >>>>>>>>> produces messages like this: > >>>>>>>>> > >>>>>>>>> key1 > >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > >>>>>>>>> key3 > >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > >>>>>>>>> key2 > >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"} > >>>>>>>>> key3 > >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > >>>>>>>>> key1 > >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > >>>>>>>>> key2 > >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"} > >>>>>>>>> > >>>>>>>>> Every second, 3 new messages arrive from the "slope" topic for > >>>>>>>>> keys key1, key2 and key3, with a constantly increasing value. > >>>>>>>>> The data is transformed so that the original key is also tracked in the > >>>>>>>>> message value, grouped by key, and windowed with a custom window, > >>>>>>>>> and > >>>>>>>>> reduced with a dummy reduce operation to make a KTable. > >>>>>>>>> The KTable is converted back to a stream and sent to a topic (just for > >>>>>>>>> debugging purposes). > >>>>>>>>> > >>>>>>>>> Here's the source (it's kafka-streams-scala for the most part). 
Also > >>>>>>>>> please ignore silly classes, it's obviously a test: > >>>>>>>>> > >>>>>>>>> val slopeTable = builder > >>>>>>>>> .stream[String, TimedValue]("slope") > >>>>>>>>> .map( > >>>>>>>>> (key, value) => > >>>>>>>>> ( > >>>>>>>>> StringWrapper(key), > >>>>>>>>> TimedValueWithKey(value = value.value, timestamp = > >>>>>>>>> value.timestamp, key = key) > >>>>>>>>> ) > >>>>>>>>> ) > >>>>>>>>> .groupByKey > >>>>>>>>> .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1)) > >>>>>>>>> .reduceMat((aggValue, newValue) => newValue, "table") > >>>>>>>>> > >>>>>>>>> slopeTable.toStream > >>>>>>>>> .to("slope-table") > >>>>>>>>> > >>>>>>>>> Topology after change without a proper reset: > >>>>>>>>> > >>>>>>>>> 2018-07-13 10:38:32 [main] INFO TableAggregationTest - Topologies: > >>>>>>>>> Sub-topology: 0 > >>>>>>>>> Source: KSTREAM-SOURCE-0000000000 (topics: [slope]) > >>>>>>>>> --> KSTREAM-MAP-0000000001 > >>>>>>>>> Processor: KSTREAM-MAP-0000000001 (stores: []) > >>>>>>>>> --> KSTREAM-FILTER-0000000004 > >>>>>>>>> <-- KSTREAM-SOURCE-0000000000 > >>>>>>>>> Processor: KSTREAM-FILTER-0000000004 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000003 > >>>>>>>>> <-- KSTREAM-MAP-0000000001 > >>>>>>>>> Sink: KSTREAM-SINK-0000000003 (topic: table-repartition) > >>>>>>>>> <-- KSTREAM-FILTER-0000000004 > >>>>>>>>> > >>>>>>>>> Sub-topology: 1 > >>>>>>>>> Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition]) > >>>>>>>>> --> KSTREAM-REDUCE-0000000002 > >>>>>>>>> Processor: KSTREAM-REDUCE-0000000002 (stores: [table]) > >>>>>>>>> --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012 > >>>>>>>>> <-- KSTREAM-SOURCE-0000000005 > >>>>>>>>> Processor: KTABLE-SELECT-0000000006 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000007 > >>>>>>>>> <-- KSTREAM-REDUCE-0000000002 > >>>>>>>>> Processor: KTABLE-TOSTREAM-0000000012 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000013 > >>>>>>>>> <-- KSTREAM-REDUCE-0000000002 > >>>>>>>>> Sink: KSTREAM-SINK-0000000007 (topic: > >>>>>>>>> aggregated-table-repartition) > >>>>>>>>> <-- KTABLE-SELECT-0000000006 > >>>>>>>>> Sink: KSTREAM-SINK-0000000013 (topic: slope-table) > >>>>>>>>> <-- KTABLE-TOSTREAM-0000000012 > >>>>>>>>> > >>>>>>>>> Sub-topology: 2 > >>>>>>>>> Source: KSTREAM-SOURCE-0000000008 (topics: > >>>>>>>>> [aggregated-table-repartition]) > >>>>>>>>> --> KTABLE-REDUCE-0000000009 > >>>>>>>>> Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table]) > >>>>>>>>> --> KTABLE-TOSTREAM-0000000010 > >>>>>>>>> <-- KSTREAM-SOURCE-0000000008 > >>>>>>>>> Processor: KTABLE-TOSTREAM-0000000010 (stores: []) > >>>>>>>>> --> KSTREAM-SINK-0000000011 > >>>>>>>>> <-- KTABLE-REDUCE-0000000009 > >>>>>>>>> Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table) > >>>>>>>>> <-- KTABLE-TOSTREAM-0000000010 > >>>>>>>>> > >>>>>>>>> Here's the source of the sub-topology that does table aggregation: > >>>>>>>>> > >>>>>>>>> slopeTable > >>>>>>>>> .groupBy( > >>>>>>>>> (key, value) => (new Windowed(StringWrapper("dummykey"), > >>>>>>>>> key.window()), value) > >>>>>>>>> ) > >>>>>>>>> .reduceMat(adder = (aggValue, newValue) => { > >>>>>>>>> log.info(s"Called ADD: newValue=$newValue > >>>>>>>>> aggValue=$aggValue") > >>>>>>>>> val agg = Option(aggValue) > >>>>>>>>> TimedValueWithKey( > >>>>>>>>> value = agg.map(_.value).getOrElse(0) + newValue.value, > >>>>>>>>> timestamp = > >>>>>>>>> > >>>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), > >>>>>>>>> newValue.timestamp), > >>>>>>>>> key = "reduced" > >>>>>>>>> ) > >>>>>>>>> }, subtractor = (aggValue, 
newValue) => { > >>>>>>>>> log.info(s"Called SUB: newValue=$newValue > >>>>>>>>> aggValue=$aggValue") > >>>>>>>>> val agg = Option(aggValue) > >>>>>>>>> TimedValueWithKey( > >>>>>>>>> value = agg.map(_.value).getOrElse(0) - newValue.value, > >>>>>>>>> timestamp = > >>>>>>>>> > >>>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), > >>>>>>>>> newValue.timestamp), > >>>>>>>>> key = "reduced" > >>>>>>>>> ) > >>>>>>>>> }, "aggregated-table") > >>>>>>>>> .toStream > >>>>>>>>> .to("slope-aggregated-table") > >>>>>>>>> > >>>>>>>>> I log all calls to the adder and subtractor, so I am able to see what's > >>>>>>>>> going on there, and I also track the original keys of the > >>>>>>>>> aggregated > >>>>>>>>> values and their timestamps, so it's relatively easy to see how the > >>>>>>>>> data goes through this topology. > >>>>>>>>> > >>>>>>>>> In order to reproduce this behavior I need to: > >>>>>>>>> 1. Start the full topology (with table aggregation) > >>>>>>>>> 2. Start without table aggregation (no app reset) > >>>>>>>>> 3. Start with table aggregation (no app reset) > >>>>>>>>> > >>>>>>>>> Below is an interpretation of the adder/subtractor logs for a given > >>>>>>>>> key/window, in chronological order: > >>>>>>>>> > >>>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null > >>>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40) > >>>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59) > >>>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59) > >>>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59) > >>>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59) > >>>>>>>>> > >>>>>>>>> And in the end the last value that's materialized for that window > >>>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. an increase in > >>>>>>>>> value > >>>>>>>>> for a single key between some point in the middle of the window and > >>>>>>>>> at > >>>>>>>>> the end of the window, times 3, as opposed to the expected value of > >>>>>>>>> 751 * 3 = 2253 (the sum of the last values in the time window for all keys > >>>>>>>>> being > >>>>>>>>> aggregated). > >>>>>>>>> > >>>>>>>>> It's clear to me that I should do an application reset, but I > >>>>>>>>> would also like to understand: should I expect the adder/subtractor > >>>>>>>>> to be called with a null aggValue, or is it a clear sign that something went > >>>>>>>>> horribly wrong? > >>>>>>>>> > >>>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> > >>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Hi Vasily, > >>>>>>>>>> > >>>>>>>>>> Thanks for the email. > >>>>>>>>>> > >>>>>>>>>> To answer your question: you should reset the application basically > >>>>>>> any > >>>>>>>>>> time you change the topology. Some transitions are safe, but others > >>>>>>> will > >>>>>>>>>> result in data loss or corruption. Rather than try to reason about > >>>>>>> which > >>>>>>>>> is > >>>>>>>>>> which, it's much safer just to either reset the app or not change > >>>>>>>>>> it > >>>>>>> (if > >>>>>>>>> it > >>>>>>>>>> has important state). > >>>>>>>>>> > >>>>>>>>>> Beyond changes that you make to the topology, we spend a lot of > >>>>>>> effort to > >>>>>>>>>> try and make sure that different versions of Streams will produce > >>>>>>>>>> the > >>>>>>>>> same > >>>>>>>>>> topology, so unless the release notes say otherwise, you should be > >>>>>> able > >>>>>>>>> to > >>>>>>>>>> upgrade without a reset. 
> >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the > >>>>>>>>> result > >>>>>>>>>> of changing the topology without a reset. Or if they are correct > >>>>>>>>>> but > >>>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel > >>>>>>> free to > >>>>>>>>>> open a Jira ticket if you think you have found a bug, especially if > >>>>>>> you > >>>>>>>>> can > >>>>>>>>>> describe a repro. Knowing your topology before and after the change > >>>>>>> would > >>>>>>>>>> also be immensely helpful. You can print it with > >>>>>>>>>> Topology.describe(). > >>>>>>>>>> > >>>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow > >>>>>>>>>> and > >>>>>>> try > >>>>>>>>> to > >>>>>>>>>> decide if you should expect these behaviors with "clean" topology > >>>>>>>>> changes. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> -John > >>>>>>>>>> > >>>>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov < > >>>>>>> vas...@sulatskov.net> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi, > >>>>>>>>>>> > >>>>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable > >>>>>>>>>>> aggregation, and admittedly I am not wiping data properly on each > >>>>>>>>>>> restart, partially because I also wonder what would happen if you > >>>>>>>>>>> change a streams topology without doing a proper reset. > >>>>>>>>>>> > >>>>>>>>>>> I've noticed that from time to time, kafka-streams > >>>>>>>>>>> KGroupedTable.reduce() can call the subtractor function with a null > >>>>>>>>>>> aggregator value, and if you try to work around that by > >>>>>>> interpreting > >>>>>>>>>>> a null aggregator value as zero for a numeric value, you get an incorrect > >>>>>>>>>>> aggregation result. > >>>>>>>>>>> > >>>>>>>>>>> I do understand that the proper way of handling this is to do a > >>>>>>> reset > >>>>>>>>>>> on topology changes, but I'd like to understand if there's any > >>>>>>>>>>> legitimate case when kafka-streams can call an adder or a > >>>>>>> subtractor > >>>>>>>>>>> with a null aggregator value, and should I plan for this, or should > >>>>>>>>>>> I > >>>>>>>>>>> interpret this as an invalid state, and terminate the application, > >>>>>>> and > >>>>>>>>>>> do a proper reset? > >>>>>>>>>>> > >>>>>>>>>>> Also, I can't seem to find a guide which explains when an application > >>>>>>>>>>> reset is necessary. Intuitively it seems that it should be done > >>>>>>> every > >>>>>>>>>>> time the topology changes. Any other cases? > >>>>>>>>>>> > >>>>>>>>>>> I tried to debug where the null value comes from and it seems that > >>>>>>>>>>> KTableReduce.process() is getting called with a Change<V> value with > >>>>>>>>>>> newValue == null, and some non-null oldValue. Which leads to the > >>>>>>>>>>> subtractor being called with a null aggregator value. I wonder how > >>>>>>> it is > >>>>>>>>>>> possible to have an old value for a key without a new value (does > >>>>>>> it > >>>>>>>>>>> happen because of the auto commit interval?). > >>>>>>>>>>> > >>>>>>>>>>> I've also noticed that it's possible for an input value from a > >>>>>>> topic > >>>>>>>>>>> to bypass the aggregation function entirely and be directly > >>>>>>> transmitted to > >>>>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null > >>>>>>> and > >>>>>>>>>>> oldValue is null - in that case newValue will be transmitted > >>>>>>> directly > >>>>>>>>>>> to the output. I suppose it's the correct behaviour, but it feels a > >>>>>>> bit > >>>>>>>>>>> weird nonetheless. 
And I've actually been able to observe this > >>>>>>>>>>> behaviour in practice. I suppose it's also caused by this > >>>>>>>>>>> happening > >>>>>>>>>>> right before a commit happens, and the message is sent to a > >>>>>>> changelog > >>>>>>>>>>> topic. > >>>>>>>>>>> > >>>>>>>>>>> Please can someone with more knowledge shed some light on these > >>>>>>> issues? > >>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>>> Best regards, > >>>>>>>>>>> Vasily Sulatskov > >>>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> Best regards, > >>>>>>>>> Vasily Sulatskov > >>>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Best regards, > >>>>>>> Vasily Sulatskov > >>>>>>> > >>>>> > >>>>> > >>>>> > >>>> > >>> > >>> > >> > > > > >
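For anyone who finds this thread later: my practical takeaway is to fail fast instead of treating a null aggregate as zero. Below is a small sketch of the guard I now wrap around my own adder and subtractor. It reuses the TimedValueWithKey type from my test code above; the guard itself is just my own defensive wrapper (not something kafka-streams provides), based on John's point that, after a clean application reset, the aggregate passed to both functions should never be null:

    // Wrap an adder/subtractor so that a null aggregate aborts processing
    // instead of silently producing a wrong sum.
    def failFast(
        f: (TimedValueWithKey, TimedValueWithKey) => TimedValueWithKey
    ): (TimedValueWithKey, TimedValueWithKey) => TimedValueWithKey =
      (aggValue, newValue) => {
        if (aggValue == null)
          throw new IllegalStateException(
            s"null aggregate while reducing $newValue - state is inconsistent, reset the app")
        f(aggValue, newValue)
      }

In my test above this would be used as .reduceMat(adder = failFast(add), subtractor = failFast(sub), "aggregated-table"), where add and sub are the two functions I already log from.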
-- Best regards, Vasily Sulatskov