Vasily, yes, it can happen. As you noticed, both messages might be processed on different machines. Thus, Kafka Streams provides 'eventual consistency' guarantees.
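As a concrete illustration, a minimal standalone sketch (plain Java, not Kafka Streams code; the 732/751 values are borrowed from the logs quoted later in the thread) of the window being asked about: if the retraction half of a Change is committed before the update half is processed, a reader briefly observes the intermediate aggregate, which the update then corrects:

    public class EventualConsistencySketch {
        public static void main(String[] args) {
            int aggregate = 732;  // aggregate currently in the store for this key/window

            // Change(null, oldValue): the retraction is processed and committed first
            aggregate -= 732;
            System.out.println("after retraction only: " + aggregate);  // 0, briefly observable downstream

            // Change(newValue, null): the update arrives, possibly only in a later commit interval
            aggregate += 751;
            System.out.println("after update: " + aggregate);           // 751, the corrected value
        }
    }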
-Matthias

On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> Hi John,
>
> Thanks a lot for your explanation. It does make much more sense now.
>
> The Jira issue I think is pretty well explained (with a reference to this thread). And I've left my 2 cents in the pull request.
>
> You are right, I didn't notice that the repartition topic effectively contains the same message twice, and the 0/1 bytes are non-printable, so when I used kafka-console-consumer I didn't notice them. A quick suggestion here: wouldn't it make sense to change the 0 and 1 bytes to something with visible corresponding ASCII characters, say + and -, as these messages are effectively commands telling the reducer to execute either an addition or a subtraction?
>
> On a more serious side, can you please explain the temporal aspects of how change messages are handled? More specifically, is it guaranteed that both Change(newValue, null) and Change(null, oldValue) are handled before a new aggregated value is committed to an output topic? Change(newValue, null) and Change(null, oldValue) are delivered as two separate messages via a Kafka topic, and when they are read from that topic (possibly on a different machine whose commit interval is asynchronous to the machine that put these changes into the topic), can it happen that Change(newValue, null) is processed by a KTableReduceProcessor, the value of the aggregator is updated and committed to the changelog topic, and Change(null, oldValue) is processed only in the next commit interval? If I understand this correctly, that would mean that an incorrect aggregated value will briefly be observed in the aggregated table, before eventually being corrected.
>
> Can that happen? Or am I missing something that would make it impossible?
>
> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote:
>>
>> Hi Vasily,
>>
>> I'm glad you're making me look at this; it's good homework for me!
>>
>> This is very non-obvious, but here's what happens:
>>
>> KStreamsReduce is a Processor of (K, V) => (K, Change<V>), i.e., it emits new/old Change pairs as the value.
>>
>> Next is the Select (aka GroupBy). In the DSL code, this is the KTableRepartitionMap (we call it a repartition when you select a new key, since the new keys may belong to different partitions). KTableRepartitionMap is a processor that does two things:
>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>> 2. it "explodes" Change(new, old) into [Change(null, old), Change(new, null)]
>> In other words, it turns each Change event into two events: a retraction and an update.
>>
>> Next comes the reduce operation. In building the processor node for this operation, we create the sink, repartition topic, and source, followed by the actual Reduce node. So if you want to look at how the changes get serialized and deserialized, it's in KGroupedTableImpl#buildAggregate. You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer.
>>
>> By looking into those implementations, I found that they depend on each Change containing just one of new OR old. They serialize the underlying value using the serde you provide, along with a single byte that signifies whether the serialized value is the new or the old value, which the deserializer uses on the receiving end to turn it back into a Change(new, null) or Change(null, old) as appropriate. This is why the repartition topic looks like it's just the raw data. It basically is, except for the magic byte.
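To make that wire format concrete, here is a hypothetical codec along the lines described above (the real classes are ChangedSerializer/ChangedDeserializer; the exact byte layout and flag values here are assumptions for illustration, not the actual implementation). The value is serialized with the user serde as usual, plus one flag byte saying whether it is the new or the old half of the Change:

    import java.nio.ByteBuffer;

    class ChangeCodecSketch {
        static final byte NEW = (byte) 1;
        static final byte OLD = (byte) 0;

        // Encode a Change that carries exactly one of new/old: the serialized value
        // plus a single trailing flag byte (the placement of the flag is an assumption).
        static byte[] encode(byte[] serializedValue, boolean isNew) {
            ByteBuffer buf = ByteBuffer.allocate(serializedValue.length + 1);
            buf.put(serializedValue);     // looks like the plain value when consumed raw
            buf.put(isNew ? NEW : OLD);   // the "magic byte"
            return buf.array();
        }

        // Decode on the receiving side: strip the flag and rebuild either
        // Change(value, null) (isNew == true) or Change(null, value) (isNew == false).
        static byte[] decodeValue(byte[] bytes) {
            byte[] value = new byte[bytes.length - 1];
            System.arraycopy(bytes, 0, value, 0, value.length);
            return value;
        }

        static boolean decodeIsNew(byte[] bytes) {
            return bytes[bytes.length - 1] == NEW;
        }
    }

This also explains why kafka-console-consumer shows what looks like the raw values: only a single non-printable byte distinguishes the retraction from the update.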
>>
>> Does that make sense?
>>
>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and leaving any feedback you have?
>>
>> Thanks,
>> -John
>>
>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <vas...@sulatskov.net> wrote:
>>
>>> Hi John,
>>>
>>> Thanks for your explanation.
>>>
>>> I have an answer to the practical question, i.e. a null aggregator value should be interpreted as a fatal application error.
>>>
>>> On the other hand, looking at the app topology, I see that a message from KSTREAM-REDUCE-0000000002 / "table" goes to KTABLE-SELECT-0000000006, which in turn forwards data to KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition); at this point I assume the data goes back to Kafka into a *-repartition topic. After that the message is read from Kafka by KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]), and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table]), where the actual aggregation takes place. What I don't get is where this Change value comes from. I mean, it might have been produced by KSTREAM-REDUCE-0000000002, but that shouldn't matter, as the message goes through Kafka, where it gets serialized, and looking at the Kafka "repartition" topic, it contains regular values, not a pair of old/new.
>>>
>>> As far as I understand, Change is a purely in-memory representation of the state for a particular key, and at no point is it serialized back to Kafka, yet somehow these Change values make it to the reducer. I feel like I am missing something here. Could you please clarify this?
>>>
>>> Can you please point me to a place in the kafka-streams sources where a Change of newValue/oldValue is produced, so I could take a look? I found the KTableReduce implementation, but can't find what produces these Change values.
>>>
>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>> Hi again Vasily,
>>>>
>>>> Ok, it looks to me like this behavior is the result of the un-clean topology change.
>>>>
>>>> Just in case you're interested, here's what I think happened.
>>>>
>>>> 1. Your reduce node in subtopology 1 (KSTREAM-REDUCE-0000000002 / "table") internally emits pairs of "oldValue"/"newValue". (Side note: it's by forwarding both the old and new value that we are able to maintain aggregates using the subtractor/adder pairs.)
>>>>
>>>> 2. In the full topology, these old/new pairs go through some transformations, but still in some form eventually make their way down to the reduce node (KTABLE-REDUCE-0000000009 / "aggregated-table").
>>>>
>>>> 3. The reduce processor logic looks like this:
>>>>
>>>>     final V oldAgg = store.get(key);
>>>>     V newAgg = oldAgg;
>>>>
>>>>     // first try to add the new value
>>>>     if (value.newValue != null) {
>>>>         if (newAgg == null) {
>>>>             newAgg = value.newValue;
>>>>         } else {
>>>>             newAgg = addReducer.apply(newAgg, value.newValue);
>>>>         }
>>>>     }
>>>>
>>>>     // then try to remove the old value
>>>>     if (value.oldValue != null) {
>>>>         // Here's where the assumption breaks down...
>>>>         newAgg = removeReducer.apply(newAgg, value.oldValue);
>>>>     }
>>>>
>>>> 4. Here's what I think happened. This processor saw an event like {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block, and (since "oldValue != null") would go into the second block.
>>>> I think that in the normal case we can rely on the invariant that any value we get as an "oldValue" is one that we've previously seen (as "newValue"). Consequently, we should be able to assume that if we get a non-null "oldValue", "newAgg" will also not be null (because we would have written it to the store back when we saw it as "newValue" and then retrieved it just now as "newAgg = oldAgg").
>>>>
>>>> However, if subtopology 2, along with KTABLE-SELECT-0000000006 and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 / "table") has already emitted some values, then we might in fact receive an event with some "oldValue" that we have in fact never seen before (because (KTABLE-REDUCE-0000000009 / "aggregated-table") wasn't in the topology when it was first emitted as a "newValue").
>>>>
>>>> This would violate our assumption, and we would unintentionally send a "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor). If you want to double-check my reasoning, you should be able to do so in the debugger with a breakpoint in KTableReduce.
>>>>
>>>> tl;dr: Supposing you reset the app when the topology changes, I think that you should be able to rely on non-null aggregates being passed in to *both* the adder and subtractor in a reduce.
>>>>
>>>> I would be in favor, as you suggested, of adding an explicit check and throwing an exception if the aggregate is ever null at those points. This would actually help us detect if the topology has changed unexpectedly and shut down, hopefully before any damage is done. I'll send a PR and see what everyone thinks.
>>>>
>>>> Does this all seem like it adds up to you?
>>>> -John
>>>>
>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thanks for your reply. I am not sure whether this behavior I've observed is a bug or not, as I've not been resetting my application properly. On the other hand, if the subtractor or adder in the reduce operation is never supposed to be called with a null aggregator value, perhaps it would make sense to put a null check in the table reduce implementation to detect an application entering an invalid state. A bit like the check for topics having the same number of partitions when doing a join.
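A minimal sketch of the kind of guard being discussed, mirroring John's pseudocode above (hypothetical code for illustration; not the actual Kafka Streams source and not the linked PR):

    import java.util.function.BiFunction;

    class ReduceGuardSketch<V> {
        // oldAgg comes from the store; newValue/oldValue are the two halves of the Change.
        V process(V oldAgg, V newValue, V oldValue,
                  BiFunction<V, V, V> adder, BiFunction<V, V, V> subtractor) {
            V newAgg = oldAgg;

            // first apply the update half, as in the pseudocode above
            if (newValue != null) {
                newAgg = (newAgg == null) ? newValue : adder.apply(newAgg, newValue);
            }

            // then apply the retraction half, but fail fast instead of handing null to the subtractor
            if (oldValue != null) {
                if (newAgg == null) {
                    throw new IllegalStateException(
                        "Retraction for a key with no prior aggregate; usually a sign "
                            + "the topology changed without an application reset.");
                }
                newAgg = subtractor.apply(newAgg, oldValue);
            }
            return newAgg;
        }
    }

With the numbers from the logs quoted below, a call like process(null, null, 732, ...) would then throw instead of quietly producing -732.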
>>>>> Here's some information about my tests. Hope that can be useful:
>>>>>
>>>>> Topology at start:
>>>>>
>>>>> 2018-07-13 10:29:48 [main] INFO TableAggregationTest - Topologies:
>>>>> Sub-topology: 0
>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>       --> KSTREAM-MAP-0000000001
>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000003
>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>
>>>>> Sub-topology: 1
>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>       --> KTABLE-TOSTREAM-0000000006
>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000007
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000006
>>>>>
>>>>> This topology just takes data from the source topic "slope", which produces messages like this:
>>>>>
>>>>> key1 {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key3 {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key2 {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>> key3 {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>> key1 {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>> key2 {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>
>>>>> Every second, 3 new messages arrive from the "slope" topic for keys key1, key2 and key3, with a constantly increasing value. The data is transformed so that the original key is also tracked in the message value, grouped by key, windowed with a custom window, and reduced with a dummy reduce operation to make a KTable. The KTable is converted back to a stream and sent to a topic (just for debugging purposes).
>>>>>
>>>>> Here's the source (it's kafka-streams-scala for the most part).
>>>>> Also please ignore silly classes, it's obviously a test:
>>>>>
>>>>> val slopeTable = builder
>>>>>   .stream[String, TimedValue]("slope")
>>>>>   .map(
>>>>>     (key, value) =>
>>>>>       (
>>>>>         StringWrapper(key),
>>>>>         TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
>>>>>       )
>>>>>   )
>>>>>   .groupByKey
>>>>>   .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>>>>>   .reduceMat((aggValue, newValue) => newValue, "table")
>>>>>
>>>>> slopeTable.toStream
>>>>>   .to("slope-table")
>>>>>
>>>>> Topology after change without a proper reset:
>>>>>
>>>>> 2018-07-13 10:38:32 [main] INFO TableAggregationTest - Topologies:
>>>>> Sub-topology: 0
>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>       --> KSTREAM-MAP-0000000001
>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000003
>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>
>>>>> Sub-topology: 1
>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000007
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000013
>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>>>>>       <-- KTABLE-SELECT-0000000006
>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000012
>>>>>
>>>>> Sub-topology: 2
>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
>>>>>       --> KTABLE-REDUCE-0000000009
>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>>>>>       --> KTABLE-TOSTREAM-0000000010
>>>>>       <-- KSTREAM-SOURCE-0000000008
>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>>>>>       --> KSTREAM-SINK-0000000011
>>>>>       <-- KTABLE-REDUCE-0000000009
>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>>>>>       <-- KTABLE-TOSTREAM-0000000010
>>>>>
>>>>> Here's the source of the sub-topology that does table aggregation:
>>>>>
>>>>> slopeTable
>>>>>   .groupBy(
>>>>>     (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
>>>>>   )
>>>>>   .reduceMat(adder = (aggValue, newValue) => {
>>>>>     log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>>>>>     val agg = Option(aggValue)
>>>>>     TimedValueWithKey(
>>>>>       value = agg.map(_.value).getOrElse(0) + newValue.value,
>>>>>       timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>>>>>       key = "reduced"
>>>>>     )
>>>>>   }, subtractor = (aggValue, newValue) => {
>>>>>     log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>>>>>     val agg = Option(aggValue)
>>>>>     TimedValueWithKey(
>>>>>       value = agg.map(_.value).getOrElse(0) - newValue.value,
>>>>>       timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
>>>>>       key = "reduced"
>>>>>     )
>>>>>   }, "aggregated-table")
>>>>>   .toStream
>>>>>   .to("slope-aggregated-table")
>>>>>
>>>>> I log all calls to the adder and subtractor, so I am able to see what's
>>>>> going on there, and I also track the original keys of the aggregated values and their timestamps, so it's relatively easy to see how the data goes through this topology.
>>>>>
>>>>> In order to reproduce this behavior I need to:
>>>>> 1. Start the full topology (with table aggregation)
>>>>> 2. Start without table aggregation (no app reset)
>>>>> 3. Start with table aggregation (no app reset)
>>>>>
>>>>> Below is an interpretation of the adder/subtractor logs for a given key/window, in chronological order:
>>>>>
>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>>>>>
>>>>> And in the end the last value materialized for that window (i.e. windowed key) in the Kafka topic is 57, i.e. the increase in value for a single key between some point in the middle of the window and the end of the window, times 3 (3 * (751 - 732) = 57). As opposed to the expected value of 751 * 3 = 2253 (the sum of the last values in the time window for all keys being aggregated).
>>>>>
>>>>> It's clear to me that I should do an application reset, but I would also like to understand: should I expect the adder/subtractor to be called with a null aggValue, or is it a clear sign that something went horribly wrong?
>>>>>
>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> wrote:
>>>>>>
>>>>>> Hi Vasily,
>>>>>>
>>>>>> Thanks for the email.
>>>>>>
>>>>>> To answer your question: you should reset the application basically any time you change the topology. Some transitions are safe, but others will result in data loss or corruption. Rather than try to reason about which is which, it's much safer just to either reset the app or not change it (if it has important state).
>>>>>>
>>>>>> Beyond changes that you make to the topology, we spend a lot of effort to try and make sure that different versions of Streams will produce the same topology, so unless the release notes say otherwise, you should be able to upgrade without a reset.
>>>>>>
>>>>>> I can't say right now whether those wacky behaviors are bugs or the result of changing the topology without a reset. Or if they are correct but surprising behavior somehow. I'll look into it tomorrow. Do feel free to open a Jira ticket if you think you have found a bug, especially if you can describe a repro. Knowing your topology before and after the change would also be immensely helpful. You can print it with Topology.describe().
>>>>>>
>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and try to decide if you should expect these behaviors with "clean" topology changes.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
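Regarding Topology.describe(): a minimal sketch of printing the topology for a before/after comparison (the topic name and the bare table() call are placeholders standing in for a real topology, not Vasily's actual setup):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;

    public class PrintTopology {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // ... build the actual topology here; a bare table() stands in for it
            builder.table("slope");
            Topology topology = builder.build();
            // Prints sub-topologies, processors, state stores and topics,
            // similar in shape to the dumps quoted earlier in this thread.
            System.out.println(topology.describe());
        }
    }

Diffing this output before and after a code change is a quick way to see whether a change is topology-affecting and therefore likely to need an application reset.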
>>>>>>
>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vas...@sulatskov.net> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am doing some experiments with the kafka-streams KGroupedTable aggregation, and admittedly I am not wiping data properly on each restart, partially because I also wonder what would happen if you change a streams topology without doing a proper reset.
>>>>>>>
>>>>>>> I've noticed that from time to time, kafka-streams KGroupedTable.reduce() can call the subtractor function with a null aggregator value, and if you try to work around that by interpreting a null aggregator value as zero for a numeric value, you get an incorrect aggregation result.
>>>>>>>
>>>>>>> I do understand that the proper way of handling this is to do a reset on topology changes, but I'd like to understand whether there's any legitimate case in which kafka-streams can call an adder or a subtractor with a null aggregator value. Should I plan for this, or should I interpret it as an invalid state, terminate the application, and do a proper reset?
>>>>>>>
>>>>>>> Also, I can't seem to find a guide which explains when an application reset is necessary. Intuitively it seems that it should be done every time the topology changes. Any other cases?
>>>>>>>
>>>>>>> I tried to debug where the null value comes from, and it seems that KTableReduce.process() is getting called with a Change<V> value with newValue == null and some non-null oldValue, which leads to the subtractor being called with a null aggregator value. I wonder how it is possible to have an old value for a key without a new value (does it happen because of the auto commit interval?).
>>>>>>>
>>>>>>> I've also noticed that it's possible for an input value from a topic to bypass the aggregation function entirely and be transmitted directly to the output in certain cases: if oldAgg is null, newValue is not null and oldValue is null, then newValue will be transmitted directly to the output. I suppose it's the correct behaviour, but it feels a bit weird nonetheless. And I've actually been able to observe this behaviour in practice. I suppose it's also caused by this happening right before a commit happens, and the message is sent to a changelog topic.
>>>>>>>
>>>>>>> Please can someone with more knowledge shed some light on these issues?
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Vasily Sulatskov
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Vasily Sulatskov
>>>
>>> --
>>> Best regards,
>>> Vasily Sulatskov