> Processor code should always work independently of whether caching is
> enabled or not.

If we want to achieve this, I guess we need a quite different design (see
alternative proposal 1 below).

The point is that we want to dedup the output, not the state updates.

It just happens that our starting point was KTable, for which state
updates and downstream changelog output are the same thing. Thus, we can
just use the internal KTable state to do the deduplication for the
downstream changelog.

However, from a general point of view (Processor API view), if we dedup
the output, we want dedup/caching for the processor (and not for a state
store). Of course, we need a state to do the dedup. For KTable, both
things merge into a single abstraction, and we use only a single state
instead of two. From a general point of view, we would need two states
though (one for the actual state, and one for dedup -- think Processor
API -- not DSL).


Alternative proposal 1:
(see also (2) -- which might be better than this one)

Thus, it might be a cleaner design to decouple user states and the
dedup state from each other. If a user enables dedup/caching (for a
processor), we add an additional state to do the dedup. This dedup
state is independent of all user states, and context.forward() works as
always. The dedup state could be hidden from the user and could be a
pure in-memory state (no need for any recovery, only a flush on commit).
Internally, a context.forward() would call dedupState.put() and trigger
actual output only if the dedup state needs to evict records.
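
To make proposal 1 a bit more concrete, here is a minimal sketch of such a
hidden dedup state. Everything in it is an assumption for illustration:
DedupBuffer, its size-based eviction, and the BiConsumer standing in for the
real downstream forward are hypothetical names, not part of the KIP or of any
Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical in-memory dedup state that sits behind context.forward(). */
class DedupBuffer<K, V> {
    private final int maxEntries;
    // Stands in for the real forward to the child processors (assumption).
    private final BiConsumer<K, V> downstream;
    private final LinkedHashMap<K, V> buffer = new LinkedHashMap<>();

    DedupBuffer(int maxEntries, BiConsumer<K, V> downstream) {
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    /** What context.forward() would call internally: the latest value per key wins. */
    void forward(K key, V value) {
        buffer.remove(key);      // re-insert so the key becomes the most recent entry
        buffer.put(key, value);
        if (buffer.size() > maxEntries) {
            // Evict the oldest entry and only now emit it downstream.
            Iterator<Map.Entry<K, V>> it = buffer.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            K evictedKey = eldest.getKey();
            V evictedValue = eldest.getValue();
            it.remove();
            downstream.accept(evictedKey, evictedValue);
        }
    }

    /** Called on commit: emit everything that is still buffered. */
    void flush() {
        for (Map.Entry<K, V> e : buffer.entrySet()) {
            downstream.accept(e.getKey(), e.getValue());
        }
        buffer.clear();
    }
}
```

Processor code keeps calling forward() as before; whether the buffer is in
place or not only changes how many records reach downstream, not the code.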

The disadvantage would be that we end up with two states for KTable.
The advantage is that deduplication can be switched on/off without any
Processor code change.


Alternative proposal 2:

We basically keep the current KIP design, but do not disable
context.forward() if a cached state is used. Additionally, for cached
state, we rename put() to putAndForward(), which is only available for
cached states. Thus, in processor code, a state must be explicitly cast
to a cached state. We also make the user aware that an update/put to
a state results in downstream output and that context.forward() would be
a "direct/non-cached" output.

The disadvantage of this is that processor code is not independent of
caching, and thus caching cannot just be switched on/off (i.e., we do not
follow the initial statement of this mail). The advantage is that we can
keep a single state for KTable, and this design requires only small
changes to the current KIP.
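
A rough sketch of how processor code might look under proposal 2. All types
here (SketchStore, SketchCachedStore, SketchContext, SketchProcessor) are
hypothetical stand-ins I made up for illustration, not the actual Kafka
Streams interfaces, and the flush-on-demand dedup is only an assumption about
how a cached state could behave.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Kafka Streams types.
interface SketchStore { }

interface SketchCachedStore extends SketchStore {
    void putAndForward(String key, Integer value); // replaces put() for cached states
    Integer get(String key);
    void flush();
}

class SketchContext {
    final List<String> output = new ArrayList<>();
    // "Direct/non-cached" output: goes downstream immediately.
    void forward(String key, Integer value) { output.add(key + "=" + value); }
}

class InMemoryCachedStore implements SketchCachedStore {
    private final Map<String, Integer> data = new HashMap<>();
    private final Map<String, Integer> dirty = new LinkedHashMap<>();
    private final SketchContext context;

    InMemoryCachedStore(SketchContext context) { this.context = context; }

    @Override public void putAndForward(String key, Integer value) {
        data.put(key, value);   // the state update itself
        dirty.put(key, value);  // deduplicated downstream output, latest value per key
    }

    @Override public Integer get(String key) { return data.get(key); }

    @Override public void flush() {
        dirty.forEach(context::forward); // emit one record per dirty key
        dirty.clear();
    }
}

class SketchProcessor {
    private final SketchCachedStore state;
    private final SketchContext context;

    SketchProcessor(SketchStore store, SketchContext context) {
        // The explicit cast is what ties this code to caching being enabled.
        this.state = (SketchCachedStore) store;
        this.context = context;
    }

    void process(String key, Integer value, boolean updateState) {
        if (updateState) {
            state.putAndForward(key, value); // cached, deduplicated output
        } else {
            context.forward(key, value);     // direct output, bypasses the cache
        }
    }
}
```

The cast in the constructor shows the disadvantage directly: handing this
processor a non-cached store would fail at runtime, so caching cannot be
toggled without touching the code.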



-Matthias


On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> Sure, you can use a non-cached state. However, if you write code like
> below for a non-cached state, and learn about caching later on, and
> think, caching is a cool feature, I want to use it, you would simply
> want to enable caching (without breaking your code).
> 
> Processor code should always work independently of whether caching is
> enabled or not.
> 
> -Matthias
> 
> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>> Hi Matthias,
>>
>> Thanks for the good questions. 
>>
>> There is still the option of not using cached state. If one uses cached
>> state, it will dedup both the store updates and the records forwarded
>> downstream. But you can always disable caching and do what you say.
>>
>> Eno
>>
>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Sorry for not being precise. What I meant by "completely" is for a
>>> single processor. Assume I want to have the following pattern:
>>>
>>>  process(...) {
>>>    if (someCondition) {
>>>      state.put(...);
>>>      context.forward(...);
>>>    } else {
>>>      context.forward(...);
>>>    }
>>>  }
>>>
>>> I.e., for some records I update the state and emit output records; for
>>> other records I only emit output records. This works in the current
>>> design. However, if a "cached state" were used, it would not work any more.
>>>
>>>
>>> -Matthias
>>>
>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for bringing the conversation across to the thread.
>>>>
>>>>> I think a main limitation would be that you cannot mix the 4 patterns
>>>>> within a single application anymore (iff you use a "cached state"). If
>>>>> you have a processor with a "cached state", this disables direct usage of
>>>>> context.forward() completely -- if I understand the design correctly.
>>>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>>>> updates.
>>>>>
>>>>>
>>>> The above statement is not correct. Caching doesn't completely disable
>>>> forwarding, it only disables it for Processors that are using State Stores.
>>>> In all other cases context.forward() works as it does now.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>
>>
> 
