Hi,

Users have many options for buffering in the Processor API, and it doesn't
seem right for us to prescribe a particular one. There is also value in
keeping the Processor API simple.

As such, we'll remove ".enableCaching" for stores used in the Processor API
from the KIP, and simplify the KIP by having caching apply to the DSL only.

Thanks
Eno

> On 7 Sep 2016, at 15:41, Damian Guy <damian....@gmail.com> wrote:
> 
> Guozhang,
> 
> Some points about what you have mentioned:
> 1. You can't just call context.forward() from the flush listener. You have
> to set some other contextual information (currently ProcessorRecordContext)
> prior to doing this, otherwise the nodes you are forwarding to are
> undetermined, i.e., the listener can be called at any point during the
> topology or on commit (see the sketch after this list).
> 2. It is a bytes cache, so the Processors would need to have the Serdes in
> order to use this pattern.
> 3. The namespace of the cache can't just be processorName or even
> processorName-stateStoreName; it will also need to include something like
> the taskId.
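> 
> A rough sketch of what a flush listener would then have to do (all names
> here -- setRecordContext, entry.key(), entry.newValue() -- are hypothetical,
> not an agreed API):
> 
> // namespace per point 3: taskId plus processor/store name
> cache.addDirtyEntryFlushListener(taskId + "-" + processorName, entry -> {
>     // point 1: restore the record context captured at put() time,
>     // so that forward() has well-defined downstream nodes
>     context.setRecordContext(entry.recordContext());
>     // point 2: it is a bytes cache, so deserialize before forwarding
>     final K key = keySerde.deserializer().deserialize(topic, entry.key());
>     final V value = valueSerde.deserializer().deserialize(topic, entry.newValue());
>     context.forward(key, value);
> });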
> 
> Thanks,
> Damian
> 
> 
> On Wed, 7 Sep 2016 at 00:39 Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hi Matthias,
>> 
>> I agree with your concerns about coupling record forwarding with record
>> storing in the state store, and my understanding is that this can (and
>> should) be resolved with the current interface. Here are my thoughts:
>> 
>> 1. The global cache, MemoryLRUCacheBytes, although currently defined as an
>> internal class, is exposed in ProcessorContext anyway, and so should really
>> be a public class that users can access (I have some comments about the
>> names, but will rather leave them in the PR).
>> 
>> 2. In the Processor API, users can choose to store intermediate results in
>> the cache and register a flush listener via addDirtyEntryFlushListener
>> (again, some naming suggestions in the PR, but let's use it for the
>> discussion for now). As a result, if the old processor code looks like
>> this:
>> 
>> ----------------
>> 
process(...) {
  state.put(...);
  context.forward(...);
}
>> ----------------
>> 
>> Users can now leverage the cache on some of the processors by modifying the
>> code as:
>> 
>> ----------------
>> 
>> init(...) {
>>   context.getCache().addDirtyEntryFlushListener(processorName,
>>       { state.put(...); context.forward(...) });
>> }
>> 
>> process(...) {
>>   context.getCache().put(processorName, ...);
>> }
>> 
>> ----------------
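>> 
>> Since the cache is bytes-based, the process() side of this pattern would
>> in practice also involve the Serdes, e.g. (a sketch; names and signatures
>> are illustrative only, not a final API):
>> 
>> process(K key, V value) {
>>     // serialize before putting into the bytes cache
>>     final Bytes cacheKey = Bytes.wrap(keySerde.serializer().serialize(topic, key));
>>     final byte[] cacheValue = valueSerde.serializer().serialize(topic, value);
>>     // cacheName should uniquely identify this processor's cache entries
>>     context.getCache().put(cacheName, cacheKey, cacheValue);
>> }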
>> 
>> 3. Note that whether or not to apply caching is now optional for each
>> processor node, and is decoupled from its logic of forwarding / storing in
>> persistent state stores.
>> 
>> One may argue that users who want to make use of the cache will now need
>> to make code changes; but I think this is a reasonable requirement, since
>> 1) we currently do one update per incoming record, and without code changes
>> this behavior will be preserved, and 2) in the DSL implementation we can
>> follow the above pattern to abstract it away from users, so they can pick
>> up these changes automatically.
>> 
>> 
>> Guozhang
>> 
>> 
>> On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>> 
>>> A small update to the KIP: the deduping of records using the cache does
>>> not affect the .to operator, since we'd have already deduped the KTable
>>> before the operator. Adjusting the KIP accordingly.
>>> 
>>> Thanks
>>> Eno
>>> 
>>>> On 5 Sep 2016, at 12:43, Eno Thereska <eno.there...@gmail.com> wrote:
>>>> 
>>>> Hi Matthias,
>>>> 
>>>> The motivation for KIP-63 was primarily aggregates and reducing the load
>>>> on "both" state stores and downstream. I think there is agreement that
>>>> for the DSL the motivation and design make sense.
>>>> 
>>>> For the Processor API: caching is a major component in any system, and
>>>> it is difficult to continue to operate as before without fully
>>>> understanding the consequences. Hence, I think this is mostly a case of
>>>> educating users to understand the boundaries of the solution.
>>>> 
>>>> Introducing a cache, either for the state store only, or for downstream
>>>> forwarding only, or for both, leads to moving from a model where we
>>>> process each request end-to-end (today) to one where a request is
>>>> temporarily buffered in a cache. In all the cases, this opens up the
>>>> question of what to do next once the request then leaves the cache, and
>>>> how to express that (future) behaviour. E.g., even when the cache is just
>>>> for downstream forwarding (i.e., decoupled from any state store), the
>>>> processor API user might be surprised that context.forward() does not
>>>> immediately do anything.
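>>>> 
>>>> For illustration, the potentially surprising behaviour as a sketch:
>>>> 
>>>> process(...) {
>>>>     context.forward(key, value); // with caching: buffered, not yet sent
>>>> }
>>>> // downstream processors only see the record later, e.g. on cache
>>>> // eviction or on commit/flush -- not within this process() call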
>>>> 
>>>> I agree that for ultra-flexibility, a processor API user should be able
>>>> to choose whether the dedup cache is put 1) on top of a store only, 2) on
>>>> forward only, or 3) on both store and forward; but given the motivation
>>>> for KIP-63 (aggregates), I believe a decoupled store-forward dedup cache
>>>> is a reasonable choice that provides good default behaviour, without
>>>> prodding the user to specify the combinations.
>>>> 
>>>> We need to educate users that if a cache is used in the Processor API,
>>>> the forwarding will happen in the future.
>>>> 
>>>> -Eno
>>>> 
>>>> 
>>>> 
>>>>> On 4 Sep 2016, at 19:11, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>> 
>>>>>> Processor code should always work, independently of whether caching
>>>>>> is enabled or not.
>>>>> 
>>>>> If we want to get this, I guess we need a quite different design (see
>>>>> (1)).
>>>>> 
>>>>> The point is, that we want to dedup the output, and not state updates.
>>>>> 
>>>>> It just happens that our starting point was KTable, for which state
>>>>> updates and downstream changelog output are the same thing. Thus, we
>>>>> can just use the internal KTable state to do the deduplication for the
>>>>> downstream changelog.
>>>>> 
>>>>> However, from a general point of view (Processor API view), if we dedup
>>>>> the output, we want dedup/caching for the processor (and not for a
>>>>> state store). Of course, we need a state to do the dedup. For KTable,
>>>>> both things merge into a single abstraction, and we use only a single
>>>>> state instead of two. From a general point of view, we would need two
>>>>> states though (one for the actual state, and one for dedup -- think
>>>>> Processor API -- not DSL).
>>>>> 
>>>>> 
>>>>> Alternative proposal 1:
>>>>> (see also (2) -- which might be better than this one)
>>>>> 
>>>>> Thus, it might be a cleaner design to decouple user-states and
>>>>> dedup-state from each other. If a user enables dedup/caching (for a
>>>>> processor), we add an additional state to do the dedup, and this
>>>>> dedup-state is independent from all user states; context.forward()
>>>>> works as always. The dedup state could be hidden from the user and
>>>>> could be a pure in-memory state (no need for any recovery -- only flush
>>>>> on commit). Internally, a context.forward() would call dedupState.put()
>>>>> and trigger actual output if the dedup state needs to evict records.
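>>>>> 
>>>>> A rough sketch of the idea (all names here are hypothetical):
>>>>> 
>>>>> // inside the context; the dedup state is hidden from the user
>>>>> void forward(K key, V value) {
>>>>>     if (!dedupEnabled) {
>>>>>         forwardDownstream(key, value);  // today's behaviour
>>>>>         return;
>>>>>     }
>>>>>     // overwriting per key is what dedups the output
>>>>>     dedupState.put(key, value);
>>>>>     for (Entry<K, V> evicted : dedupState.evictIfNeeded()) {
>>>>>         forwardDownstream(evicted.key(), evicted.value());
>>>>>     }
>>>>> }
>>>>> 
>>>>> // on commit: flush the in-memory dedup state, no recovery required
>>>>> void flush() {
>>>>>     for (Entry<K, V> e : dedupState.evictAll()) {
>>>>>         forwardDownstream(e.key(), e.value());
>>>>>     }
>>>>> }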
>>>>> 
>>>>> The disadvantage would be that we end up with two states for KTable.
>>>>> The advantage is that deduplication can be switched off/on without any
>>>>> Processor code change.
>>>>> 
>>>>> 
>>>>> Alternative proposal 2:
>>>>> 
>>>>> We basically keep the current KIP design, including not disabling
>>>>> context.forward() if a cached state is used. Additionally, for a cached
>>>>> state, we rename put() into putAndForward(), which is only available
>>>>> for cached states. Thus, in processor code, a state must be explicitly
>>>>> cast into a cached state. We also make the user aware that an
>>>>> update/put to a state results in downstream output and that
>>>>> context.forward() would be a "direct/non-cached" output.
>>>>> 
>>>>> The disadvantage of this is that processor code is not independent from
>>>>> caching and thus caching cannot just be switched on/off (i.e., we do
>>>>> not follow the initial statement of this mail). The advantage is that
>>>>> we can keep a single state for KTable, and this design requires only
>>>>> small changes to the current KIP.
>>>>> 
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>>>>>> Sure, you can use a non-cached state. However, if you write code like
>>>>>> below for a non-cached state, and later learn about caching and think
>>>>>> "caching is a cool feature, I want to use it", you would simply want
>>>>>> to enable caching (without breaking your code).
>>>>>> 
>>>>>> Processor code should always work independently of whether caching is
>>>>>> enabled or not.
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> Thanks for the good questions.
>>>>>>> 
>>>>>>> There is still the option of not using a cached state. If one uses a
>>>>>>> cached state, it will dedup both for stores and for further
>>>>>>> forwarding. But you can always disable caching and do what you say.
>>>>>>> 
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>> Sorry for not being precise. What I meant by "completely" is for a
>>>>>>>> single processor. Assume I want to have the following pattern:
>>>>>>>> 
>>>>>>>> process(...) {
>>>>>>>>   if (someCondition) {
>>>>>>>>     state.put(...);
>>>>>>>>     context.forward(...);
>>>>>>>>   } else {
>>>>>>>>     context.forward(...);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> 
>>>>>>>> I.e., for some records I do update the state and emit output
>>>>>>>> records; for other records I only emit output records. This works in
>>>>>>>> the current design. However, if a "cached state" were used, it would
>>>>>>>> not work any more.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>> 
>>>>>>>>> Thanks for bringing the conversation across to the thread.
>>>>>>>>> 
>>>>>>>>>> I think a main limitation would be that you cannot mix the 4
>>>>>>>>>> patterns within a single application anymore (iff you use a
>>>>>>>>>> "cached state"). If you have a processor with a "cached state"
>>>>>>>>>> this disables direct usage of context.forward() completely -- if I
>>>>>>>>>> understand the design correctly. Thus, if a "cached state" is
>>>>>>>>>> used, forwarding is only possible via state updates.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> The above statement is not correct. Caching doesn't completely
>>>>>>>>> disable forwarding; it only disables it for Processors that are
>>>>>>>>> using State Stores. In all other cases context.forward() works as
>>>>>>>>> it does now.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> --
>> -- Guozhang
>> 
