Hi Matthias,

The motivation for KIP-63 was primarily aggregates and reducing the load on 
"both" state stores and downstream. I think there is agreement that for the DSL 
the motivation and design make sense.

For the Processor API: caching is a major component in any system, and it is 
difficult to continue to operate as before, without fully understanding the 
consequences. Hence, I think this is mostly a case of educating users to 
understand the boundaries of the solution. 

Introducing a cache, either for the state store only, or for downstream 
forwarding only, or for both, leads to moving from a model where we process 
each request end-to-end (today) to one where a request is temporarily buffered 
in a cache. In all the cases, this opens up the question of what to do next 
once the request then leaves the cache, and how to express that (future) 
behaviour. E.g., even when the cache is just for downstream forwarding (i.e., 
decoupled from any state store), the processor API user might be surprised that 
context.forward() does not immediately do anything.

I agree that for ultra-flexibility, a processor API user should be able to 
choose whether the dedup cache is put 1) on top of a store only, 2) on forward 
only, 3) on both store and forward, but given the motivation for KIP-63 
(aggregates), I believe a decoupled store-forward dedup cache is a reasonable 
choice that provides good default behaviour, without prodding the user to 
specify the combinations.

We need to educate users that if a cache is used in the Processor API, the 
forwarding will happen in the future. 

-Eno



> On 4 Sep 2016, at 19:11, Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> Processor code should always work; independently if caching is enabled
> or not.
> 
> If we want to get this, I guess we need a quite different design (see (1)).
> 
> The point is, that we want to dedup the output, and not state updates.
> 
> It just happens that our starting point was KTable, for which state
> updates and downstream changelog output is the same thing. Thus, we can
> just use the internal KTable state to do the deduplication for the
> downstream changelog.
> 
> However, from a general point of view (Processor API view), if we dedup
> the output, we want dedup/caching for the processor (and not for a state
> store). Of course, we need a state to do the dedup. For KTable, both
> things merge into a single abstraction, and we use only a single state
> instead of two. From a general point of view, we would need two states
> though (one for the actual state, and one for dedup -- think Processor
> API -- not DSL).
> 
> 
> Alternative proposal 1:
> (see also (2) -- which might be better than this one)
> 
> Thus, it might be a cleaner design to decouple user-states and
> dedup-state from each other. If a user enables dedup/caching (for a
> processor) we add an additional state to do the dedup and this
> dedup-state is independent from all user states and context.forward()
> works as always. The dedup state could be hidden from the user and could
> be a pure in-memory state (no need for any recovery -- only flush on
> commit). Internally, a context.forward() would call dedupState.put() and
> trigger actual output if dedup state needs to evict records.
> 
> The disadvantage would be, that we end up with two states for KTable.
> The advantage is, that deduplication can be switched off/on without any
> Processor code change.
> 
> 
> Alternative proposal 2:
> 
> We basically keep the current KIP design, including not to disable
> context.forward() if a cached state is used. Additionally, for cached
> state, we rename put() into putAndForward() which is only available for
> cached states. Thus, in processor code, a state must be explicitly cast
> into a cached state. We also make the user aware, that an update/put to
> a state result in downstream output and that context.forward() would be
> a "direct/non-cached" output.
> 
> The disadvantage of this is, that processor code is not independent from
> caching and thus, caching cannot just be switched on/off (ie, we do not
> follow the initial statement of this mail). The advantage is, we can
> keep a single state for KTable and this design is just small changes to
> the current KIP.
> 
> 
> 
> -Matthias
> 
> 
> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>> Sure, you can use a non-cached state. However, if you write code like
>> below for a non-cached state, and learn about caching later on, and
>> think, caching is a cool feature, I want to use it, you would simply
>> want to enable caching (without breaking your code).
>> 
>> Processor code should always work independently if caching is enabled or
>> not.
>> 
>> -Matthias
>> 
>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>> Hi Matthias,
>>> 
>>> Thanks for the good questions. 
>>> 
>>> There is still the option of not using cached state. If one uses cached 
>>> state it will dedup for stores and forwarding further. But you can always 
>>> disable caching and do what you say.
>>> 
>>> Eno
>>> 
>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>> 
>>>> Sorry for not being precise. What I meant be "completely" is for a
>>>> single processor. Assume I want to have the following pattern:
>>>> 
>>>> process(...) {
>>>>   if (someCondition) {
>>>>     state.put(...)
>>>>     context.forward(...);
>>>>   } else {
>>>>     context.forward(...);
>>>> }
>>>> 
>>>> Ie, for some record I do update the state and emit output records, for
>>>> other records I only emit output records. This work in current design.
>>>> However, if a "cached state" would be used, it would not work any more.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>> Hi Matthias,
>>>>> 
>>>>> Thanks for bringing the conversation across to the thread.
>>>>> 
>>>>> I think a main limitation would be, that you cannot mix the 4 patterns
>>>>>> within a single application anymore (iff you use a "caches state"). If
>>>>>> you have processor with a "cached state" this disables direct usage of
>>>>>> context.forward() completely -- if I understand the design correctly.
>>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>>> updates.
>>>>>> 
>>>>>> 
>>>>> The above statement is not correct. Caching doesn't completely disable
>>>>> forwarding, it only disables it for Processors that are using State 
>>>>> Stores.
>>>>> In all other cases context.forward() works as it does now.
>>>>> 
>>>>> Thanks,
>>>>> Damian
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to