A small update to the KIP: deduping records with the cache does not affect the .to operator, since the KTable has already been deduped before the operator runs. I am adjusting the KIP accordingly.

Thanks
Eno

> On 5 Sep 2016, at 12:43, Eno Thereska <eno.there...@gmail.com> wrote:
>
> Hi Matthias,
>
> The motivation for KIP-63 was primarily aggregates and reducing the load on
> "both" state stores and downstream. I think there is agreement that for the
> DSL the motivation and design make sense.
>
> For the Processor API: caching is a major component in any system, and it is
> difficult to continue to operate as before, without fully understanding the
> consequences. Hence, I think this is mostly a case of educating users to
> understand the boundaries of the solution.
>
> Introducing a cache, either for the state store only, or for downstream
> forwarding only, or for both, leads to moving from a model where we process
> each request end-to-end (today) to one where a request is temporarily
> buffered in a cache. In all the cases, this opens up the question of what to
> do next once the request then leaves the cache, and how to express that
> (future) behaviour. E.g., even when the cache is just for downstream
> forwarding (i.e., decoupled from any state store), the processor API user
> might be surprised that context.forward() does not immediately do anything.
>
> I agree that for ultra-flexibility, a processor API user should be able to
> choose whether the dedup cache is put 1) on top of a store only, 2) on
> forward only, 3) on both store and forward, but given the motivation for
> KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a
> reasonable choice that provides good default behaviour, without prodding the
> user to specify the combinations.
>
> We need to educate users that if a cache is used in the Processor API, the
> forwarding will happen in the future.
>
> -Eno
>
>> On 4 Sep 2016, at 19:11, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Processor code should always work; independently if caching is enabled or not.
>>
>> If we want to get this, I guess we need a quite different design (see (1)).
>>
>> The point is, that we want to dedup the output, and not state updates.
>>
>> It just happens that our starting point was KTable, for which state
>> updates and downstream changelog output is the same thing. Thus, we can
>> just use the internal KTable state to do the deduplication for the
>> downstream changelog.
>>
>> However, from a general point of view (Processor API view), if we dedup
>> the output, we want dedup/caching for the processor (and not for a state
>> store). Of course, we need a state to do the dedup. For KTable, both
>> things merge into a single abstraction, and we use only a single state
>> instead of two. From a general point of view, we would need two states
>> though (one for the actual state, and one for dedup -- think Processor
>> API -- not DSL).
>>
>>
>> Alternative proposal 1:
>> (see also (2) -- which might be better than this one)
>>
>> Thus, it might be a cleaner design to decouple user-states and
>> dedup-state from each other. If a user enables dedup/caching (for a
>> processor) we add an additional state to do the dedup and this
>> dedup-state is independent from all user states and context.forward()
>> works as always. The dedup state could be hidden from the user and could
>> be a pure in-memory state (no need for any recovery -- only flush on
>> commit). Internally, a context.forward() would call dedupState.put() and
>> trigger actual output if dedup state needs to evict records.
>>
>> The disadvantage would be, that we end up with two states for KTable.
>> The advantage is, that deduplication can be switched off/on without any
>> Processor code change.
>>
>>
>> Alternative proposal 2:
>>
>> We basically keep the current KIP design, including not to disable
>> context.forward() if a cached state is used. Additionally, for cached
>> state, we rename put() into putAndForward() which is only available for
>> cached states. Thus, in processor code, a state must be explicitly cast
>> into a cached state.
>> We also make the user aware, that an update/put to
>> a state results in downstream output and that context.forward() would be
>> a "direct/non-cached" output.
>>
>> The disadvantage of this is, that processor code is not independent from
>> caching and thus, caching cannot just be switched on/off (ie, we do not
>> follow the initial statement of this mail). The advantage is, we can
>> keep a single state for KTable and this design is just small changes to
>> the current KIP.
>>
>>
>> -Matthias
>>
>>
>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>> Sure, you can use a non-cached state. However, if you write code like
>>> below for a non-cached state, and learn about caching later on, and
>>> think, caching is a cool feature, I want to use it, you would simply
>>> want to enable caching (without breaking your code).
>>>
>>> Processor code should always work independently if caching is enabled or
>>> not.
>>>
>>> -Matthias
>>>
>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for the good questions.
>>>>
>>>> There is still the option of not using cached state. If one uses cached
>>>> state it will dedup for stores and forwarding further. But you can always
>>>> disable caching and do what you say.
>>>>
>>>> Eno
>>>>
>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Sorry for not being precise. What I meant by "completely" is for a
>>>>> single processor. Assume I want to have the following pattern:
>>>>>
>>>>> process(...) {
>>>>>   if (someCondition) {
>>>>>     state.put(...);
>>>>>     context.forward(...);
>>>>>   } else {
>>>>>     context.forward(...);
>>>>>   }
>>>>> }
>>>>>
>>>>> Ie, for some record I do update the state and emit output records, for
>>>>> other records I only emit output records. This works in the current design.
>>>>> However, if a "cached state" would be used, it would not work any more.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>>
>>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>>> within a single application anymore (iff you use a "cached state"). If
>>>>>>> you have a processor with a "cached state" this disables direct usage of
>>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>>> updates.
>>>>>>>
>>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>>> forwarding, it only disables it for Processors that are using State
>>>>>> Stores. In all other cases context.forward() works as it does now.
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
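
Alternative proposal 1 in the quoted thread describes context.forward() writing into a hidden in-memory dedup state that produces output only when a record is evicted or on commit. A minimal Java sketch of that idea follows, under stated assumptions: DedupForwarder, its capacity parameter, the downstream callback, and the least-recently-updated eviction policy are all hypothetical names and choices made for illustration. They are not Kafka Streams API and not the KIP-63 implementation.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical illustration of "Alternative proposal 1": a dedup state that
// sits behind forward() and is independent of any user state store.
final class DedupForwarder<K, V> {
    private final int capacity;                 // assumed: max number of buffered keys
    private final BiConsumer<K, V> downstream;  // assumed: stands in for the real output path
    // Access-ordered map, so the first entry is the least recently updated key.
    private final LinkedHashMap<K, V> dedupState = new LinkedHashMap<>(16, 0.75f, true);

    DedupForwarder(int capacity, BiConsumer<K, V> downstream) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.downstream = downstream;
    }

    // Called where processor code would call context.forward().
    // A newer value for the same key overwrites the buffered one (the dedup step).
    void forward(K key, V value) {
        dedupState.put(key, value);
        if (dedupState.size() > capacity) {
            // Evict the least recently updated entry and emit it downstream.
            Iterator<Map.Entry<K, V>> it = dedupState.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            K evictedKey = eldest.getKey();
            V evictedValue = eldest.getValue();
            it.remove();
            downstream.accept(evictedKey, evictedValue);
        }
    }

    // Called on commit: emit everything still buffered, then empty the buffer.
    void flush() {
        for (Map.Entry<K, V> e : dedupState.entrySet()) {
            downstream.accept(e.getKey(), e.getValue());
        }
        dedupState.clear();
    }
}
```

With a capacity of 2, forwarding a=1, a=2 and b=1 emits nothing; forwarding c=1 then evicts a=2, and flush() emits b=1 and c=1. Processor code calls forward() the same way whether or not dedup is enabled, which is the property the proposal is after.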