Hi,

Users have many options for buffering in the Processor API, and it doesn't seem right that we should prescribe a particular one. Also, there is value in continuing to keep the Processor API simple.

As such, we'll remove the ".enableCaching" option for a store used in the Processor API from the KIP, and simplify the KIP by having caching apply to the DSL only.

Thanks
Eno
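For the DSL, caching would then be controlled by a single global knob rather than by per-store calls. A minimal sketch of what that could look like, assuming the "cache.max.bytes.buffering" config name proposed in KIP-63:

----------------

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Minimal sketch: DSL caching controlled by one global config, per KIP-63.
// "cache.max.bytes.buffering" is the KIP's proposed name, not a final API.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Total bytes available for buffering/deduplication across the topology;
// setting it to 0 disables caching and restores one output per input record.
props.put("cache.max.bytes.buffering", 10 * 1024 * 1024);

----------------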
On 7 Sep 2016, at 15:41, Damian Guy <damian....@gmail.com> wrote:

Guozhang,

Some points about what you have mentioned:

1. You can't just call context.forward() from the flush listener. You have to set some other contextual information (currently ProcessorRecordContext) prior to doing this, otherwise the nodes you are forwarding to are undetermined, i.e., this can be called at any point during the topology or on commit.
2. It is a bytes cache, so the Processors would need to have the Serdes in order to use this pattern.
3. The namespace of the cache can't just be processorName, or even processorName-stateStoreName; it will also need to include something like the taskId.

Thanks,
Damian

On Wed, 7 Sep 2016 at 00:39 Guozhang Wang <wangg...@gmail.com> wrote:

Hi Matthias,

I agree with your concern about coupling record forwarding with record storing in the state store, and my understanding is that this can (and should) be resolved with the current interface. Here are my thoughts:

1. The global cache, MemoryLRUCacheBytes, although currently defined as an internal class, is exposed in ProcessorContext anyway, so it should really be a public class that users can access (I have some other comments about the names, but will rather leave them in the PR).

2. In the Processor API, users can choose to store intermediate results in the cache, and register the flush listener via addDirtyEntryFlushListener (again, some naming suggestions in the PR, but let's use it for discussion for now). As a result, if the old processor code looks like this:

----------------

process(...) {

    state.put(...);
    context.forward(...);
}

----------------

users can now leverage the cache on some of the processors by modifying the code as:

----------------

init(...) {

    context.getCache().addDirtyEntryFlushListener(processorName,
        { state.put(...); context.forward(...); });
}

process(...) {

    context.getCache().put(processorName, ...);
}

----------------

3. Note that whether or not to apply caching is now optional for each processor node, and is decoupled from its logic of forwarding / storing in persistent state stores.

One may argue that if users now want to make use of the cache, they will need to make code changes; but I think this is a reasonable requirement, since 1) currently we do one update per incoming record, and without code changes this behavior will be preserved, and 2) for the DSL implementation, we can just follow the above pattern to abstract it from users, so they can pick up these changes automatically.

Guozhang
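To make this pattern concrete, here is a rough Java sketch of a processor written against such a cache. Everything cache-related (context.getCache(), addDirtyEntryFlushListener(), the DirtyEntry type) follows the naming used in this thread and the draft PR and is not a finalized API; note how Damian's three points above all show up in it:

----------------

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: the cache API used below is the one under discussion, not final.
public class CachedCountProcessor implements Processor<String, Long> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> state;
    private String cacheName;

    private final Serde<String> keySerde = Serdes.String();
    private final Serde<Long> valueSerde = Serdes.Long();

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.state = (KeyValueStore<String, Long>) context.getStateStore("counts");
        // Damian's point 3: the namespace needs the task id in addition to
        // the processor/store name to be unique across tasks.
        this.cacheName = context.taskId() + "-counts";

        // Only when dirty entries are flushed or evicted do we hit the
        // state store and forward downstream.
        context.getCache().addDirtyEntryFlushListener(cacheName, dirtyEntries -> {
            for (final DirtyEntry entry : dirtyEntries) {
                // Damian's point 2: it is a bytes cache, so serdes are needed.
                final String key = keySerde.deserializer().deserialize(null, entry.key());
                final Long value = valueSerde.deserializer().deserialize(null, entry.newValue());
                // Damian's point 1: the record context must be set correctly
                // before forward() for the target nodes to be well defined.
                state.put(key, value);
                context.forward(key, value);
            }
        });
    }

    @Override
    public void process(final String key, final Long value) {
        // Writes go to the cache only; store/forward happens on flush.
        context.getCache().put(cacheName,
                keySerde.serializer().serialize(null, key),
                valueSerde.serializer().serialize(null, value));
    }

    @Override
    public void punctuate(final long timestamp) {}

    @Override
    public void close() {}
}

----------------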
On Tue, Sep 6, 2016 at 7:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

A small update to the KIP: the deduping of records using the cache does not affect the .to operator, since we'd have already deduped the KTable before the operator. Adjusting the KIP.

Thanks
Eno

On 5 Sep 2016, at 12:43, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Matthias,

The motivation for KIP-63 was primarily aggregates and reducing the load on "both" state stores and downstream. I think there is agreement that for the DSL the motivation and design make sense.

For the Processor API: caching is a major component in any system, and it is difficult to continue to operate as before without fully understanding the consequences. Hence, I think this is mostly a case of educating users to understand the boundaries of the solution.

Introducing a cache, either for the state store only, for downstream forwarding only, or for both, moves us from a model where we process each request end-to-end (today) to one where a request is temporarily buffered in a cache. In all cases, this opens up the question of what to do next once the request leaves the cache, and how to express that (future) behaviour. E.g., even when the cache is just for downstream forwarding (i.e., decoupled from any state store), the Processor API user might be surprised that context.forward() does not immediately do anything.

I agree that for ultra-flexibility a Processor API user should be able to choose whether the dedup cache is put 1) on top of a store only, 2) on forward only, or 3) on both store and forward. But given the motivation for KIP-63 (aggregates), I believe a decoupled store-forward dedup cache is a reasonable choice that provides good default behaviour, without prodding the user to specify the combinations.

We need to educate users that if a cache is used in the Processor API, the forwarding will happen in the future.

-Eno

On 4 Sep 2016, at 19:11, Matthias J. Sax <matth...@confluent.io> wrote:

> Processor code should always work, independently of whether caching is
> enabled or not.

If we want to get this, I guess we need a quite different design (see (1)).

The point is that we want to dedup the output, not the state updates.

It just happens that our starting point was KTable, for which state updates and downstream changelog output are the same thing. Thus, we can just use the internal KTable state to do the deduplication for the downstream changelog.

However, from a general point of view (Processor API view), if we dedup the output, we want dedup/caching for the processor (and not for a state store). Of course, we need state to do the dedup. For KTable, both things merge into a single abstraction, and we use only a single state instead of two. From a general point of view, we would need two states though (one for the actual state, and one for dedup -- think Processor API, not DSL).

Alternative proposal 1:
(see also (2) -- which might be better than this one)

It might be a cleaner design to decouple user states and the dedup state from each other. If a user enables dedup/caching (for a processor), we add an additional state to do the dedup; this dedup state is independent of all user states, and context.forward() works as always. The dedup state could be hidden from the user and could be a pure in-memory state (no need for any recovery -- only flush on commit). Internally, a context.forward() would call dedupState.put() and trigger actual output when the dedup state needs to evict records.

The disadvantage would be that we end up with two states for KTable. The advantage is that deduplication can be switched on/off without any Processor code change.
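To illustrate, a hypothetical sketch of such a hidden dedup state; the class name, callback shape, and eviction policy here are made up purely for this example:

----------------

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory dedup state backing context.forward() (proposal 1).
// Only the latest value per key is buffered, so intermediate updates are
// deduped; records are actually emitted on eviction and on commit/flush.
class ForwardDedupState<K, V> {
    private final int maxEntries;
    private final BiConsumer<K, V> downstream;  // the real forward to child nodes
    private final LinkedHashMap<K, V> buffer =
            new LinkedHashMap<>(16, 0.75f, true);  // access order => LRU first

    ForwardDedupState(final int maxEntries, final BiConsumer<K, V> downstream) {
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    // Called by context.forward(): overwrites any pending value for the key.
    void forward(final K key, final V value) {
        buffer.put(key, value);
        if (buffer.size() > maxEntries) {
            // Evict the least-recently-used entry and emit it downstream.
            final Map.Entry<K, V> eldest = buffer.entrySet().iterator().next();
            downstream.accept(eldest.getKey(), eldest.getValue());
            buffer.remove(eldest.getKey());
        }
    }

    // Called on commit: no recovery needed, just emit everything buffered.
    void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}

----------------

Since this dedup state is separate from the user's state stores, process() code stays the same whether caching is enabled or not.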
Alternative proposal 2:

We basically keep the current KIP design, including not disabling context.forward() if a cached state is used. Additionally, for cached states we rename put() to putAndForward(), which is only available for cached states. Thus, in processor code, a state must be explicitly cast into a cached state. We also make the user aware that an update/put to a state results in downstream output, and that context.forward() would be a "direct/non-cached" output.

The disadvantage of this is that processor code is not independent of caching, and thus caching cannot just be switched on/off (i.e., we do not follow the initial statement of this mail). The advantage is that we can keep a single state for KTable, and this design requires only small changes to the current KIP.

-Matthias
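Under proposal 2, the dependency on caching would be visible in the processor code via the cast. A sketch, with CachedKeyValueStore and putAndForward() as hypothetical names for illustration:

----------------

import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical interface: a cached store exposes putAndForward(), making the
// buffered store-then-forward behaviour explicit in the processor code.
interface CachedKeyValueStore<K, V> extends KeyValueStore<K, V> {
    void putAndForward(K key, V value);  // buffered; emitted on flush/eviction
}

// In the processor:
public void process(final String key, final Long value) {
    // The explicit cast ties this code to caching being enabled.
    final CachedKeyValueStore<String, Long> cached =
            (CachedKeyValueStore<String, Long>) state;
    cached.putAndForward(key, value);  // store + (deferred) forward downstream
    // context.forward(key, value) would still be a direct, non-cached output.
}

----------------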
On 09/04/2016 07:10 PM, Matthias J. Sax wrote:

Sure, you can use a non-cached state. However, if you write code like the below for a non-cached state, and later learn about caching and think "caching is a cool feature, I want to use it", you would simply want to enable caching without breaking your code.

Processor code should always work, independently of whether caching is enabled or not.

-Matthias

On 09/04/2016 06:56 PM, Eno Thereska wrote:

Hi Matthias,

Thanks for the good questions.

There is still the option of not using cached state. If one uses cached state, it will dedup for stores and forward further. But you can always disable caching and do what you say.

Eno

On 4 Sep 2016, at 17:36, Matthias J. Sax <matth...@confluent.io> wrote:

Sorry for not being precise. What I meant by "completely" is for a single processor. Assume I want to have the following pattern:

----------------

process(...) {
    if (someCondition) {
        state.put(...);
        context.forward(...);
    } else {
        context.forward(...);
    }
}

----------------

I.e., for some records I update the state and emit output records; for other records I only emit output records. This works in the current design. However, if a "cached state" were used, it would not work any more.

-Matthias

On 09/04/2016 05:58 PM, Damian Guy wrote:

Hi Matthias,

Thanks for bringing the conversation across to the thread.

> I think a main limitation would be, that you cannot mix the 4 patterns
> within a single application anymore (iff you use a "cached state"). If
> you have a processor with a "cached state", this disables direct usage of
> context.forward() completely -- if I understand the design correctly.
> Thus, if a "cached state" is used, forwarding is only possible via state
> updates.

The above statement is not correct. Caching doesn't completely disable forwarding; it only disables it for Processors that are using state stores. In all other cases, context.forward() works as it does now.

Thanks,
Damian