It is not possible to use a single message, because both messages may go
to different partitions and may be processed by different applications
instances.

Note, that the overall KTable state is sharded. Updating a single
upstream shard, might required to update two different downstream shards.


-Matthias

On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> Hi,
> 
> It seems that it wouldn't be that difficult to address: just don't
> break Change(newVal, oldVal) into Change(newVal, null) /
> Change(oldVal, null) and update aggregator value in one .process()
> call.
> 
> Would this change make sense?
> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <matth...@confluent.io> 
> wrote:
>>
>> Vasily,
>>
>> yes, it can happen. As you noticed, both messages might be processed on
>> different machines. Thus, Kafka Streams provides 'eventual consistency'
>> guarantees.
>>
>>
>> -Matthias
>>
>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
>>> Hi John,
>>>
>>> Thanks a lot for you explanation. It does make much more sense now.
>>>
>>> The Jira issue I think is pretty well explained (with a reference to
>>> this thread). And I've lest my 2 cents in the pull request.
>>>
>>> You are right I didn't notice that repartition topic contains the same
>>> message effectively twice, and 0/1 bytes are non-visible, so when I
>>> used kafka-console-consumer I didn't notice that. So I have a quick
>>> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
>>> something that has visible corresponding ascii characters, say + and
>>> -, as these messages are effectively commands to reducer to execute
>>> either an addition or subtraction?
>>>
>>> On a more serious, side, can you please explain temporal aspects of
>>> how change messages are handled? More specifically, is it guaranteed
>>> that both Change(newValue, null) and Change(null, oldValue) are
>>> handled before a new aggregated value is comitted to an output topic?
>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
>>> separate messages via a kafka topic, and when they are read from a
>>> topic (possibly on a different machine where a commit interval is
>>> asynchronous to a machine that's put these changes into a topic) can
>>> it happen so a Change(newValue, null) is processed by a
>>> KTableReduceProcessor, the value of the aggregator is updated, and
>>> committed to the changelog topic, and a Change(null, oldValue) is
>>> processed only in the next commit interval? If I am understand this
>>> correctly that would mean that in an aggregated table an incorrect
>>> aggregated value will be observed briefly, before being eventually
>>> corrected.
>>>
>>> Can that happen? Or I can't see something that would make it impossible?
>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>> Hi Vasily,
>>>>
>>>> I'm glad you're making me look at this; it's good homework for me!
>>>>
>>>> This is very non-obvious, but here's what happens:
>>>>
>>>> KStreamsReduce is a Processor of (K, V) => (K, Change<V>) . I.e., it emits
>>>> new/old Change pairs as the value.
>>>>
>>>> Next is the Select (aka GroupBy). In the DSL code, this is the
>>>> KTableRepartitionMap (we call it a repartition when you select a new key,
>>>> since the new keys may belong to different partitions).
>>>> KTableRepartitionMap is a processor that does two things:
>>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
>>>> null)]
>>>> In other words, it turns each Change event into two events: a retraction
>>>> and an update
>>>>
>>>> Next comes the reduce operation. In building the processor node for this
>>>> operation, we create the sink, repartition topic, and source, followed by
>>>> the actual Reduce node. So if you want to look at how the changes get
>>>> serialized and desesrialized, it's in KGroupedTableImpl#buildAggregate.
>>>> You'll see that sink and source a ChangedSerializer and 
>>>> ChangedDeserializer.
>>>>
>>>> By looking into those implementations, I found that they depend on each
>>>> Change containing just one of new OR old. They serialize the underlying
>>>> value using the serde you provide, along with a single byte that signifies
>>>> if the serialized value is the new or old value, which the deserializer
>>>> uses on the receiving end to turn it back into a Change(new, null) or
>>>> Change(null, old) as appropriate. This is why the repartition topic looks
>>>> like it's just the raw data. It basically is, except for the magic byte.
>>>>
>>>> Does that make sense?
>>>>
>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
>>>> leaving any feedback you have?
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <vas...@sulatskov.net>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Thanks for your explanation.
>>>>>
>>>>> I have an answer to the practical question, i.e. a null aggregator
>>>>> value should be interpreted as a fatal application error.
>>>>>
>>>>> On the other hand, looking at the app topology, I see that a message
>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes goes to
>>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
>>>>> this point I assume that data goes back to kafka into a *-repartition
>>>>> topic, after that the message is read from kafka by
>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
>>>>> [aggregated-table]), where the actual aggregation takes place. What I
>>>>> don't get is where this Change value comes from, I mean if it's been
>>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
>>>>> message goes through kafka where it gets serialized, and looking at
>>>>> kafka "repartition" topic, it contains regular values, not a pair of
>>>>> old/new.
>>>>>
>>>>> As far as I understand, Change is a purely in-memory representation of
>>>>> the state for a particular key, and at no point it's serialized back
>>>>> to kafka, yet somehow this Change values makes it to reducer. I feel
>>>>> like I am missing something here. Could you please clarify this?
>>>>>
>>>>> Can you please point me to a place in kafka-streams sources where a
>>>>> Change of newValue/oldValue is produced, so I could take a look? I
>>>>> found KTableReduce implementation, but can't find who makes these
>>>>> Change value.
>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> wrote:
>>>>>>
>>>>>> Hi again Vasily,
>>>>>>
>>>>>> Ok, it looks to me like this behavior is the result of the un-clean
>>>>>> topology change.
>>>>>>
>>>>>> Just in case you're interested, here's what I think happened.
>>>>>>
>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table"
>>>>> )
>>>>>> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
>>>>>> forwarding both the old and new value that we are able to maintain
>>>>>> aggregates using the subtractor/adder pairs)
>>>>>>
>>>>>> 2. In the full topology, these old/new pairs go through some
>>>>>> transformations, but still in some form eventually make their way down to
>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
>>>>>>
>>>>>> 3. The reduce processor logic looks like this:
>>>>>> final V oldAgg = store.get(key);
>>>>>> V newAgg = oldAgg;
>>>>>>
>>>>>> // first try to add the new value
>>>>>> if (value.newValue != null) {
>>>>>>     if (newAgg == null) {
>>>>>>         newAgg = value.newValue;
>>>>>>     } else {
>>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> // then try to remove the old value
>>>>>> if (value.oldValue != null) {
>>>>>>     // Here's where the assumption breaks down...
>>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
>>>>>> }
>>>>>>
>>>>>> 4. Here's what I think happened. This processor saw an event like
>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
>>>>> and
>>>>>> (since "oldValue != null") would go into the second block. I think that
>>>>> in
>>>>>> the normal case we can rely on the invariant that any value we get as an
>>>>>> "oldValue" is one that we've previously seen ( as "newValue" ).
>>>>>> Consequently, we should be able to assume that if we get a non-null
>>>>>> "oldValue", "newAgg" will also not be null (because we would have written
>>>>>> it to the store back when we saw it as "newValue" and then retrieved it
>>>>>> just now as "newAgg = oldAgg").
>>>>>>
>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
>>>>>> and KSTREAM-SINK-0000000013 get added after (KSTREAM-REDUCE-0000000002 /
>>>>>> "table") has already emitted some values, then we might in fact receive
>>>>> an
>>>>>> event with some "oldValue" that we have in fact never seen before
>>>>> (because (
>>>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology when
>>>>> it
>>>>>> was first emitted as a "newValue").
>>>>>>
>>>>>> This would violate our assumption, and we would unintentionally send a
>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
>>>>>> If you want to double-check my reasoning, you should be able to do so in
>>>>>> the debugger with a breakpoint in KTableReduce.
>>>>>>
>>>>>>
>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think
>>>>> that
>>>>>> you should be able to rely on non-null aggregates being passed in to
>>>>> *both*
>>>>>> the adder and subtractor in a reduce.
>>>>>>
>>>>>> I would be in favor, as you suggested, of adding an explicit check and
>>>>>> throwing an exception if the aggregate is ever null at those points. This
>>>>>> would actually help us detect if the topology has changed unexpectedly
>>>>> and
>>>>>> shut down, hopefully before any damage is done. I'll send a PR and see
>>>>> what
>>>>>> everyone thinks.
>>>>>>
>>>>>> Does this all seem like it adds up to you?
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed is
>>>>>>> a bug or not, as I've not been resetting my application properly. On
>>>>>>> the other hand if the subtractor or adder in the reduce operation are
>>>>>>> never supposed to be called with null aggregator value, perhaps it
>>>>>>> would make sense to put a null check in the table reduce
>>>>>>> implementation to detect an application entering an invalid state. A
>>>>>>> bit like a check for topics having the same number of partitions when
>>>>>>> doing a join.
>>>>>>>
>>>>>>> Here's some information about my tests. Hope that can be useful:
>>>>>>>
>>>>>>> Topology at start:
>>>>>>>
>>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>    Sub-topology: 0
>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>
>>>>>>>   Sub-topology: 1
>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>       --> KTABLE-TOSTREAM-0000000006
>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000006
>>>>>>>
>>>>>>> This topology just takes data from the source topic "slope" which
>>>>>>> produces messages like this:
>>>>>>>
>>>>>>> key1
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key3
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key2
>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
>>>>>>> key3
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>> key1
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>> key2
>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
>>>>>>>
>>>>>>> Every second, there are 3 new messages arrive from "slope" topic for
>>>>>>> keys key1, key2 and key3, with constantly increasing value.
>>>>>>> Data is transformed so that the original key is also tracked in the
>>>>>>> message value, grouped by key, and windowed with a custom window, and
>>>>>>> reduced with a dummy reduce operation to make a KTable.
>>>>>>> KTable is converted back to a stream, and sent to a topic (just for
>>>>>>> debugging purposes).
>>>>>>>
>>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
>>>>>>> please ignore silly classes, it's obviously a test:
>>>>>>>
>>>>>>>     val slopeTable = builder
>>>>>>>       .stream[String, TimedValue]("slope")
>>>>>>>       .map(
>>>>>>>         (key, value) =>
>>>>>>>           (
>>>>>>>             StringWrapper(key),
>>>>>>>             TimedValueWithKey(value = value.value, timestamp =
>>>>>>> value.timestamp, key = key)
>>>>>>>         )
>>>>>>>       )
>>>>>>>       .groupByKey
>>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
>>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
>>>>>>>
>>>>>>>     slopeTable.toStream
>>>>>>>       .to("slope-table")
>>>>>>>
>>>>>>> Topology after change without a proper reset:
>>>>>>>
>>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
>>>>>>>    Sub-topology: 0
>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
>>>>>>>       --> KSTREAM-MAP-0000000001
>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>>>>>>>       --> KSTREAM-FILTER-0000000004
>>>>>>>       <-- KSTREAM-SOURCE-0000000000
>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000003
>>>>>>>       <-- KSTREAM-MAP-0000000001
>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
>>>>>>>       <-- KSTREAM-FILTER-0000000004
>>>>>>>
>>>>>>>   Sub-topology: 1
>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
>>>>>>>       --> KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
>>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
>>>>>>>       <-- KSTREAM-SOURCE-0000000005
>>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000007
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000013
>>>>>>>       <-- KSTREAM-REDUCE-0000000002
>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
>>>>>>>       <-- KTABLE-SELECT-0000000006
>>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000012
>>>>>>>
>>>>>>>   Sub-topology: 2
>>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
>>>>>>> [aggregated-table-repartition])
>>>>>>>       --> KTABLE-REDUCE-0000000009
>>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
>>>>>>>       --> KTABLE-TOSTREAM-0000000010
>>>>>>>       <-- KSTREAM-SOURCE-0000000008
>>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
>>>>>>>       --> KSTREAM-SINK-0000000011
>>>>>>>       <-- KTABLE-REDUCE-0000000009
>>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
>>>>>>>       <-- KTABLE-TOSTREAM-0000000010
>>>>>>>
>>>>>>> Here's the source of the sub-topology that does table aggregation:
>>>>>>>
>>>>>>>     slopeTable
>>>>>>>       .groupBy(
>>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"),
>>>>>>> key.window()), value)
>>>>>>>       )
>>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
>>>>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
>>>>>>>         val agg = Option(aggValue)
>>>>>>>         TimedValueWithKey(
>>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
>>>>>>>           timestamp =
>>>>>>>
>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>>>> newValue.timestamp),
>>>>>>>           key = "reduced"
>>>>>>>         )
>>>>>>>       }, subtractor = (aggValue, newValue) => {
>>>>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
>>>>>>>         val agg = Option(aggValue)
>>>>>>>         TimedValueWithKey(
>>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
>>>>>>>           timestamp =
>>>>>>>
>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
>>>>>>> newValue.timestamp),
>>>>>>>           key = "reduced"
>>>>>>>         )
>>>>>>>       }, "aggregated-table")
>>>>>>>       .toStream
>>>>>>>       .to("slope-aggregated-table")
>>>>>>>
>>>>>>> I log all calls to adder and subtractor, so I am able to see what's
>>>>>>> going on there, as well as I track the original keys of the aggregated
>>>>>>> values and their timestamps, so it's relatively easy to see how the
>>>>>>> data goes through this topology
>>>>>>>
>>>>>>> In order to reproduce this behavior I need to:
>>>>>>> 1. Start a full topology (with table aggregation)
>>>>>>> 2. Start without table aggregation (no app reset)
>>>>>>> 3. Start with table aggregation (no app reset)
>>>>>>>
>>>>>>> Bellow is an interpretation of the adder/subtractor logs for a given
>>>>>>> key/window in the chronological order
>>>>>>>
>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
>>>>>>>
>>>>>>> And in the end the last value that's materialized for that window
>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. a increase in value
>>>>>>> for a single key between some point in the middle of the window and at
>>>>>>> the end of the window, times 3. As opposed to the expected value of
>>>>>>> 751 * 3 = 2253 (sum of last values in a time window for all keys being
>>>>>>> aggregated).
>>>>>>>
>>>>>>> It's clear to me that I should do an application reset, but I also
>>>>>>> would like to understand, should I expect adder/subtractor being
>>>>>>> called with null aggValue, or is it a clear sign that something went
>>>>>>> horribly wrong?
>>>>>>>
>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io>
>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Vasily,
>>>>>>>>
>>>>>>>> Thanks for the email.
>>>>>>>>
>>>>>>>> To answer your question: you should reset the application basically
>>>>> any
>>>>>>>> time you change the topology. Some transitions are safe, but others
>>>>> will
>>>>>>>> result in data loss or corruption. Rather than try to reason about
>>>>> which
>>>>>>> is
>>>>>>>> which, it's much safer just to either reset the app or not change it
>>>>> (if
>>>>>>> it
>>>>>>>> has important state).
>>>>>>>>
>>>>>>>> Beyond changes that you make to the topology, we spend a lot of
>>>>> effort to
>>>>>>>> try and make sure that different versions of Streams will produce the
>>>>>>> same
>>>>>>>> topology, so unless the release notes say otherwise, you should be
>>>>> able
>>>>>>> to
>>>>>>>> upgrade without a reset.
>>>>>>>>
>>>>>>>>
>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the
>>>>>>> result
>>>>>>>> of changing the topology without a reset. Or if they are correct but
>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel
>>>>> free to
>>>>>>>> open a Jira ticket if you think you have found a bug, especially if
>>>>> you
>>>>>>> can
>>>>>>>> describe a repro. Knowing your topology before and after the change
>>>>> would
>>>>>>>> also be immensely helpful. You can print it with Topology.describe().
>>>>>>>>
>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
>>>>> try
>>>>>>> to
>>>>>>>> decide if you should expect these behaviors with "clean" topology
>>>>>>> changes.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
>>>>> vas...@sulatskov.net>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
>>>>>>>>> aggregation, and admittedly I am not wiping data properly on each
>>>>>>>>> restart, partially because I also wonder what would happen if you
>>>>>>>>> change a streams topology without doing a proper reset.
>>>>>>>>>
>>>>>>>>> I've noticed that from time to time, kafka-streams
>>>>>>>>> KGroupedTable.reduce() can call subtractor function with null
>>>>>>>>> aggregator value, and if you try to work around that, by
>>>>> interpreting
>>>>>>>>> null aggregator value as zero for numeric value you get incorrect
>>>>>>>>> aggregation result.
>>>>>>>>>
>>>>>>>>> I do understand that the proper way of handling this is to do a
>>>>> reset
>>>>>>>>> on topology changes, but I'd like to understand if there's any
>>>>>>>>> legitmate case when kafka-streams can call an adder or a
>>>>> substractor
>>>>>>>>> with null aggregator value, and should I plan for this, or should I
>>>>>>>>> interpret this as an invalid state, and terminate the application,
>>>>> and
>>>>>>>>> do a proper reset?
>>>>>>>>>
>>>>>>>>> Also, I can't seem to find a guide which explains when application
>>>>>>>>> reset is necessary. Intuitively it seems that it should be done
>>>>> every
>>>>>>>>> time a topology changes. Any other cases?
>>>>>>>>>
>>>>>>>>> I tried to debug where the null value comes from and it seems that
>>>>>>>>> KTableReduce.process() is getting called with Change<V> value with
>>>>>>>>> newValue == null, and some non-null oldValue. Which leads to and to
>>>>>>>>> subtractor being called with null aggregator value. I wonder how
>>>>> it is
>>>>>>>>> possible to have an old value for a key without a new value (does
>>>>> it
>>>>>>>>> happen because of the auto commit interval?).
>>>>>>>>>
>>>>>>>>> I've also noticed that it's possible for an input value from a
>>>>> topic
>>>>>>>>> to bypass aggregation function entirely and be directly
>>>>> transmitted to
>>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null
>>>>> and
>>>>>>>>> oldValue is null - in that case newValue will be transmitted
>>>>> directly
>>>>>>>>> to the output. I suppose it's the correct behaviour, but feels a
>>>>> bit
>>>>>>>>> weird nonetheless. And I've actually been able to observe this
>>>>>>>>> behaviour in practice. I suppose it's also caused by this happening
>>>>>>>>> right before a commit happens, and the message is sent to a
>>>>> changelog
>>>>>>>>> topic.
>>>>>>>>>
>>>>>>>>> Please can someone with more knowledge shed some light on these
>>>>> issues?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Vasily Sulatskov
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Vasily Sulatskov
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Vasily Sulatskov
>>>>>
>>>
>>>
>>>
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to