> Processor code should always work, independently of whether caching is enabled or not.
If we want to achieve this, I guess we need quite a different design (see (1)). The point is that we want to dedup the output, not state updates. It just happens that our starting point was KTable, for which state updates and downstream changelog output are the same thing. Thus, we can simply use the internal KTable state to do the deduplication for the downstream changelog.

However, from a general point of view (the Processor API view), if we dedup the output, we want dedup/caching for the processor, not for a state store. Of course, we need a state to do the dedup. For KTable, both things merge into a single abstraction, and we use only a single state instead of two. From a general point of view, though, we would need two states: one for the actual state and one for dedup (think Processor API, not DSL).

Alternative proposal 1 (see also (2), which might be better than this one):

It might be a cleaner design to decouple user states and the dedup state from each other. If a user enables dedup/caching (for a processor), we add an additional state to do the dedup; this dedup state is independent of all user states, and context.forward() works as always. The dedup state could be hidden from the user and could be a pure in-memory state (no need for any recovery, only a flush on commit). Internally, context.forward() would call dedupState.put() and trigger actual output if the dedup state needs to evict records. The disadvantage is that we end up with two states for KTable. The advantage is that deduplication can be switched on/off without any change to the Processor code.

Alternative proposal 2:

We basically keep the current KIP design, including not disabling context.forward() when a cached state is used. Additionally, for cached states, we rename put() to putAndForward(), which is only available on cached states. Thus, in processor code, a state must be explicitly cast to a cached state.
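To make the putAndForward() idea of alternative proposal 2 concrete, here is a minimal, self-contained Java sketch. It does not use the real Kafka Streams API: SimpleStore, CachedStore, InMemoryCachedStore and the downstream callback are hypothetical stand-ins, and flush() merely simulates what would happen on commit. It shows that only the cached store type offers putAndForward(), so processor code has to cast explicitly, while a plain put() still only updates state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical plain store: put() only updates state, nothing is emitted.
interface SimpleStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Hypothetical cached store for proposal 2: only this type offers putAndForward().
interface CachedStore<K, V> extends SimpleStore<K, V> {
    // Updates state and marks the key for downstream output on the next flush.
    void putAndForward(K key, V value);
    // Simulated commit: emits the latest value of every pending key.
    void flush();
}

// In-memory sketch; 'downstream' stands in for the real forwarding path.
class InMemoryCachedStore<K, V> implements CachedStore<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final Map<K, V> pending = new LinkedHashMap<>(); // keeps first-update order
    private final BiConsumer<K, V> downstream;

    InMemoryCachedStore(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void put(K key, V value) {
        state.put(key, value); // state only, no output
    }

    @Override
    public V get(K key) {
        return state.get(key);
    }

    @Override
    public void putAndForward(K key, V value) {
        state.put(key, value);
        pending.put(key, value); // a newer value replaces the pending one (dedup)
    }

    @Override
    public void flush() {
        pending.forEach(downstream);
        pending.clear();
    }
}

public class CachedStoreSketch {
    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        SimpleStore<String, Integer> store =
            new InMemoryCachedStore<>((k, v) -> emitted.add(k + "=" + v));

        // Processor code must cast explicitly to reach putAndForward().
        CachedStore<String, Integer> cached = (CachedStore<String, Integer>) store;
        cached.putAndForward("a", 1);
        cached.putAndForward("a", 2); // deduplicated: only a=2 is emitted
        cached.putAndForward("b", 5);
        cached.put("c", 9);           // plain put: updates state, emits nothing
        cached.flush();               // simulated commit
        System.out.println(emitted);  // prints [a=2, b=5]
    }
}
```

The explicit cast is the point of the design: a processor that calls putAndForward() is visibly tied to caching, which is exactly why caching can no longer be toggled without touching the code.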
We also make the user aware that an update/put to a state results in downstream output, and that context.forward() is a "direct/non-cached" output. The disadvantage is that processor code is not independent of caching, and thus caching cannot simply be switched on/off (i.e., we do not follow the initial statement of this mail). The advantage is that we can keep a single state for KTable, and this design requires only small changes to the current KIP.

-Matthias

On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> Sure, you can use a non-cached state. However, if you write code like
> below for a non-cached state, learn about caching later on, and
> think "caching is a cool feature, I want to use it", you would simply
> want to enable caching (without breaking your code).
>
> Processor code should always work, independently of whether caching is
> enabled or not.
>
> -Matthias
>
> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>> Hi Matthias,
>>
>> Thanks for the good questions.
>>
>> There is still the option of not using cached state. If one uses cached
>> state, it will dedup for stores and for forwarding further. But you can
>> always disable caching and do what you say.
>>
>> Eno
>>
>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Sorry for not being precise. What I meant by "completely" is for a
>>> single processor. Assume I want to have the following pattern:
>>>
>>> process(...) {
>>>     if (someCondition) {
>>>         state.put(...);
>>>         context.forward(...);
>>>     } else {
>>>         context.forward(...);
>>>     }
>>> }
>>>
>>> I.e., for some records I update the state and emit output records; for
>>> other records I only emit output records. This works in the current
>>> design. However, if a "cached state" were used, it would not work any more.
>>>
>>>
>>> -Matthias
>>>
>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for bringing the conversation across to the thread.
>>>>
>>>>> I think a main limitation would be that you cannot mix the 4 patterns
>>>>> within a single application anymore (iff you use a "cached state"). If
>>>>> you have a processor with a "cached state", this disables direct usage of
>>>>> context.forward() completely, if I understand the design correctly.
>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>> updates.
>>>>>
>>>>>
>>>> The above statement is not correct. Caching doesn't completely disable
>>>> forwarding; it only disables it for Processors that are using State Stores.
>>>> In all other cases, context.forward() works as it does now.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>
>>
>
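A minimal Java sketch of alternative proposal 1, to show why it keeps the mixed put/forward pattern from the quoted message working. It assumes a hypothetical DedupForwarder that stands in for what context.forward() would do internally (none of these names are Kafka Streams API). The user state is a plain HashMap that knows nothing about dedup, the eviction policy (least recently updated key first) is an assumption, and a capacity of 0 is an assumed way of switching dedup off.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical dedup layer for proposal 1: hidden, in-memory, independent of user stores.
class DedupForwarder<K, V> {
    private final LinkedHashMap<K, V> pending = new LinkedHashMap<>();
    private final BiConsumer<K, V> downstream;
    private final int capacity; // assumption: 0 means dedup is switched off

    DedupForwarder(BiConsumer<K, V> downstream, int capacity) {
        this.downstream = downstream;
        this.capacity = capacity;
    }

    // What context.forward() would call internally (the dedupState.put() step).
    void forward(K key, V value) {
        if (capacity == 0) {
            downstream.accept(key, value); // dedup off: emit directly
            return;
        }
        pending.remove(key);     // re-insert so this key becomes the most recent
        pending.put(key, value); // a newer value for the key replaces the older one
        if (pending.size() > capacity) {
            // Evict the least recently updated key and emit it downstream.
            Iterator<Map.Entry<K, V>> it = pending.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            K k = eldest.getKey();
            V v = eldest.getValue();
            it.remove();
            downstream.accept(k, v);
        }
    }

    // Called on commit: emit everything still pending.
    void flush() {
        pending.forEach(downstream);
        pending.clear();
    }
}

// The mixed put/forward pattern from the quoted message; user state knows nothing about dedup.
class MixedProcessor {
    final Map<String, Integer> state = new HashMap<>(); // stand-in for a user state store
    private final DedupForwarder<String, Integer> context;

    MixedProcessor(DedupForwarder<String, Integer> context) {
        this.context = context;
    }

    void process(String key, int value) {
        if (value > 0) { // stand-in for someCondition
            state.put(key, value);
            context.forward(key, value);
        } else {
            context.forward(key, value);
        }
    }
}

public class DedupSketch {
    // Runs the same processor code with the given dedup capacity and returns the output.
    static List<String> run(int capacity) {
        List<String> out = new ArrayList<>();
        DedupForwarder<String, Integer> fwd =
            new DedupForwarder<>((k, v) -> out.add(k + "=" + v), capacity);
        MixedProcessor p = new MixedProcessor(fwd);
        p.process("a", 1);
        p.process("a", 2);
        p.process("b", -1);
        fwd.flush(); // simulated commit
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(0));  // dedup off: [a=1, a=2, b=-1]
        System.out.println(run(10)); // dedup on:  [a=2, b=-1]
    }
}
```

The processor code is identical in both runs; only the forwarder's configuration changes, which is the property asked for at the top of this mail. The cost is visible too: for a KTable, the dedup map would duplicate the table's own state.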