A small update to the KIP: the deduping of records using the cache does not 
affect the .to operator since we'd have already deduped the KTable before the 
operator. Adjusting KIP.

Thanks
Eno

> On 5 Sep 2016, at 12:43, Eno Thereska <eno.there...@gmail.com> wrote:
> 
> Hi Matthias,
> 
> The motivation for KIP-63 was primarily aggregates and reducing the load on 
> "both" state stores and downstream. I think there is agreement that for the 
> DSL the motivation and design make sense.
> 
> For the Processor API: caching is a major component in any system, and it is 
> difficult to continue to operate as before, without fully understanding the 
> consequences. Hence, I think this is mostly a case of educating users to 
> understand the boundaries of the solution. 
> 
> Introducing a cache, either for the state store only, or for downstream 
> forwarding only, or for both, leads to moving from a model where we process 
> each request end-to-end (today) to one where a request is temporarily 
> buffered in a cache. In all the cases, this opens up the question of what to 
> do next once the request then leaves the cache, and how to express that 
> (future) behaviour. E.g., even when the cache is just for downstream 
> forwarding (i.e., decoupled from any state store), the processor API user 
> might be surprised that context.forward() does not immediately do anything.
> 
> I agree that for ultra-flexibility, a processor API user should be able to 
> choose whether the dedup cache is put 1) on top of a store only, 2) on 
> forward only, 3) on both store and forward, but given the motivation for 
> KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a 
> reasonable choice that provides good default behaviour, without prodding the 
> user to specify the combinations.
> 
> We need to educate users that if a cache is used in the Processor API, the 
> forwarding will happen in the future. 
> 
> -Eno
> 
> 
> 
>> On 4 Sep 2016, at 19:11, Matthias J. Sax <matth...@confluent.io> wrote:
>> 
>>> Processor code should always work; independently if caching is enabled
>> or not.
>> 
>> If we want to get this, I guess we need a quite different design (see (1)).
>> 
>> The point is, that we want to dedup the output, and not state updates.
>> 
>> It just happens that our starting point was KTable, for which state
>> updates and downstream changelog output is the same thing. Thus, we can
>> just use the internal KTable state to do the deduplication for the
>> downstream changelog.
>> 
>> However, from a general point of view (Processor API view), if we dedup
>> the output, we want dedup/caching for the processor (and not for a state
>> store). Of course, we need a state to do the dedup. For KTable, both
>> things merge into a single abstraction, and we use only a single state
>> instead of two. From a general point of view, we would need two states
>> though (one for the actual state, and one for dedup -- think Processor
>> API -- not DSL).
>> 
>> 
>> Alternative proposal 1:
>> (see also (2) -- which might be better than this one)
>> 
>> Thus, it might be a cleaner design to decouple user-states and
>> dedup-state from each other. If a user enables dedup/caching (for a
>> processor) we add an additional state to do the dedup and this
>> dedup-state is independent from all user states and context.forward()
>> works as always. The dedup state could be hidden from the user and could
>> be a pure in-memory state (no need for any recovery -- only flush on
>> commit). Internally, a context.forward() would call dedupState.put() and
>> trigger actual output if dedup state needs to evict records.
>> 
>> The disadvantage would be, that we end up with two states for KTable.
>> The advantage is, that deduplication can be switched off/on without any
>> Processor code change.
>> 
>> 
>> Alternative proposal 2:
>> 
>> We basically keep the current KIP design, including not to disable
>> context.forward() if a cached state is used. Additionally, for cached
>> state, we rename put() into putAndForward() which is only available for
>> cached states. Thus, in processor code, a state must be explicitly cast
>> into a cached state. We also make the user aware, that an update/put to
>> a state result in downstream output and that context.forward() would be
>> a "direct/non-cached" output.
>> 
>> The disadvantage of this is, that processor code is not independent from
>> caching and thus, caching cannot just be switched on/off (ie, we do not
>> follow the initial statement of this mail). The advantage is, we can
>> keep a single state for KTable and this design is just small changes to
>> the current KIP.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>> Sure, you can use a non-cached state. However, if you write code like
>>> below for a non-cached state, and learn about caching later on, and
>>> think, caching is a cool feature, I want to use it, you would simply
>>> want to enable caching (without breaking your code).
>>> 
>>> Processor code should always work independently if caching is enabled or
>>> not.
>>> 
>>> -Matthias
>>> 
>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>> Hi Matthias,
>>>> 
>>>> Thanks for the good questions. 
>>>> 
>>>> There is still the option of not using cached state. If one uses cached 
>>>> state it will dedup for stores and forwarding further. But you can always 
>>>> disable caching and do what you say.
>>>> 
>>>> Eno
>>>> 
>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>> 
>>>>> Sorry for not being precise. What I meant be "completely" is for a
>>>>> single processor. Assume I want to have the following pattern:
>>>>> 
>>>>> process(...) {
>>>>>  if (someCondition) {
>>>>>    state.put(...)
>>>>>    context.forward(...);
>>>>>  } else {
>>>>>    context.forward(...);
>>>>> }
>>>>> 
>>>>> Ie, for some record I do update the state and emit output records, for
>>>>> other records I only emit output records. This work in current design.
>>>>> However, if a "cached state" would be used, it would not work any more.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>> Hi Matthias,
>>>>>> 
>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>> 
>>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>>> within a single application anymore (iff you use a "caches state"). If
>>>>>>> you have processor with a "cached state" this disables direct usage of
>>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>>> updates.
>>>>>>> 
>>>>>>> 
>>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>>> forwarding, it only disables it for Processors that are using State 
>>>>>> Stores.
>>>>>> In all other cases context.forward() works as it does now.
>>>>>> 
>>>>>> Thanks,
>>>>>> Damian
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to