We had a recent discussion about KIP-63, and I just copied and pasted the relevant exchange from the JIRA discussion:
Damian:

> During the code walk-through, Matthias raised a very good point about the
> use of context().forward() being coupled to whether or not caching is
> enabled. Now that I've had the chance to think about it, I have one
> potential solution for making this transparent to users of the Processor
> API.
>
> We can add another method, boolean isCachingEnabled(), to the new interface
> ForwardingStateStoreSupplier. We also add two new methods to ProcessorNode:
> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled().
>
> In TopologyBuilder, when we are creating the ProcessorNodeCacheFlushListener
> to attach to the ForwardingStateStoreSupplier, we can call
> ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
>
> We add an extra boolean parameter, forward, to the ProcessorRecordContextImpl.
> This will be set to false when constructed from StreamTask and set to true
> when constructed from ProcessorNodeCacheFlushListener. Then in
> ProcessorRecordContextImpl.forward(..) we add a guard if (shouldForward()),
> where shouldForward is return forward || !node.stateStoreCachingEnabled();
>
> Now Processors are free to call context().forward(..) whether caching is
> enabled or not. If it is enabled, the values just won't get forwarded until
> the cache evicts/flushes them.

Matthias:

> I guess this is a good solution/workaround. I had something like this in my
> mind during the call, too.
>
> However, thinking about the root cause of this issue again, I am not sure
> if the (overall) design of this KIP is optimal. My new concern is that with
> this caching strategy we "merge" two concepts into one, and I am not sure
> if we should do this.
>
> Currently, message flow and state are decoupled and independent of each
> other. Thus, if there is a state, updates to the state are completely
> independent from emitting output records. With the new design, we merge
> state updates and record emits, limiting the overall flexibility.
> I guess, from a DSL point of view, this would not be problematic, because
> in an aggregation with changelog output, each update to the state should
> result in a downstream record. However, from a Processor API point of view,
> there are other patterns we want to be able to support, too.
>
> Basically, for each input record, there are four different patterns that
> could be applied by the user:
>
>  1. no state updates, no output records
>  2. only state updates
>  3. only output records
>  4. state updates and output records
>
> Right now, we go with a design that allows using one of the patterns within
> a Processor. However, all four patterns could be mixed within a single
> Processor (pre-KIP design), and this mixture would not be possible any more.
> If we want to support all four cases, we might not want to merge both into
> "a single abstraction" as we do in the design of this PR. What if a user
> just wants to send a record downstream (without any state manipulation)?
>
> Going back to the KIP design, we move the cache from RocksDB into the
> processor. However, what we actually wanted to do was to de-duplicate
> output records. Thus, the newly introduced cache could actually go "after
> the processor" and be completely independent from the state. On each call
> to forward() the record is put into the cache, and if the cache is full, an
> actual cache eviction and record forwarding happens. This would make the
> de-duplication cache independent from the state.

Eno:

> It's not entirely true that the flexibility is limited. For example, what's
> next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779,
> where we add the dedup cache to the to() operator. That is not implemented
> yet.

Damian:

> I think of the four patterns you mentioned, only the last one changes, i.e.,
> state updates and output records.
> context.forward() still exists, so you can just send a record downstream
> without any state manipulation; that behaviour hasn't changed.
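For readers following along, Damian's proposed shouldForward guard can be sketched roughly as below. The class names mirror the ones mentioned in the proposal, but these are simplified stand-ins, not the actual Kafka Streams implementation:

```java
// Minimal sketch of the proposed forwarding guard (hypothetical stand-in
// classes; not actual Kafka Streams code).
public class ForwardGuardSketch {

    // Stand-in for ProcessorNode with the two proposed methods.
    static class ProcessorNode {
        private boolean stateStoreCachingEnabled;
        void setStateStoreCachingEnabled(boolean enabled) {
            this.stateStoreCachingEnabled = enabled;
        }
        boolean isStateStoreCachingEnabled() {
            return stateStoreCachingEnabled;
        }
    }

    // Stand-in for ProcessorRecordContextImpl: 'forward' is false when the
    // context is constructed from StreamTask, and true when constructed
    // from ProcessorNodeCacheFlushListener.
    static class RecordContext {
        private final ProcessorNode node;
        private final boolean forward;
        RecordContext(ProcessorNode node, boolean forward) {
            this.node = node;
            this.forward = forward;
        }
        // The proposed guard: records pass through immediately when caching
        // is disabled, or when the flush listener is emitting evicted entries.
        boolean shouldForward() {
            return forward || !node.isStateStoreCachingEnabled();
        }
    }

    public static void main(String[] args) {
        ProcessorNode node = new ProcessorNode();
        node.setStateStoreCachingEnabled(true);
        // StreamTask path: held back by the cache while caching is enabled.
        System.out.println(new RecordContext(node, false).shouldForward()); // false
        // Flush-listener path: evicted records do go downstream.
        System.out.println(new RecordContext(node, true).shouldForward());  // true
    }
}
```

With caching disabled, both paths forward immediately, which is why user Processors can call context().forward(..) unconditionally under this proposal.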
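Matthias's alternative, a de-duplication cache sitting "after the processor" and independent of any state store, could be sketched as follows. All names here are illustrative assumptions, as is the evict-eldest-on-overflow policy; the thread does not specify an eviction strategy:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a dedup cache placed after the processor:
// forward() buffers records by key, and a record only reaches downstream
// when the cache overflows and evicts it.
public class DedupCacheSketch {

    static final int MAX_ENTRIES = 2; // tiny capacity to force eviction in the demo

    static void forward(Map<String, Integer> cache, List<String> downstream,
                        String key, int value) {
        cache.put(key, value); // later updates for the same key overwrite earlier ones
        if (cache.size() > MAX_ENTRIES) {
            // Cache full: evict the eldest entry (insertion order) and
            // forward it downstream.
            Map.Entry<String, Integer> eldest = cache.entrySet().iterator().next();
            downstream.add(eldest.getKey() + "=" + eldest.getValue());
            cache.remove(eldest.getKey());
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new LinkedHashMap<>();
        List<String> downstream = new ArrayList<>();
        forward(cache, downstream, "a", 1);
        forward(cache, downstream, "a", 2); // overwrites a=1, which is never emitted
        forward(cache, downstream, "b", 3);
        forward(cache, downstream, "c", 4); // overflow: eldest entry "a" is evicted
        System.out.println(downstream);     // [a=2]
    }
}
```

The point of the sketch is that de-duplication happens purely on the forward() path, with no coupling to state-store updates, which is what would keep all four of the patterns above expressible.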
On 08/24/2016 03:35 PM, Eno Thereska wrote:

> Hi folks,
>
> We've been working on a proof-of-concept for KIP-63, and that can now be
> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
> under PR https://github.com/apache/kafka/pull/1752. It is still work in
> progress; however, we are confident that the basic structure is there.
>
> As part of this work, we've also updated the KIP to clarify several things,
> listed here for convenience:
>
> - Clarify that the optimization is applicable to aggregations and to
>   operators. It is not applicable to joins.
> - Clarify that for the low-level Processor API, we propose to allow users
>   to disable caching on a store-by-store basis using a new
>   .enableCaching() call.
>
> We'll start the voting process shortly for this KIP.
>
> Thanks
> Eno
>
> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi there,
>>
>> I have created KIP-63: Unify store and downstream caching in streams
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>
>> Feedback is appreciated.
>>
>> Thank you
>> Eno