I think a main limitation would be that you cannot mix the four patterns (listed in the quoted discussion below) within a single application anymore (iff you use a "cached state"). If you have a processor with a "cached state", this disables direct usage of context.forward() completely -- if I understand the design correctly. Thus, if a "cached state" is used, forwarding is only possible via state updates.
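To make this concrete, below is a minimal sketch of what a processor is reduced to under this design. All class and store names are hypothetical and the example is abbreviated; it only illustrates the pattern, not the actual KIP-63 code.

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CachedStateProcessor implements Processor<String, Long> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> store; // backed by a "cached state"

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, Long>) context.getStateStore("counts");
        }

        @Override
        public void process(String key, Long value) {
            // The only way to emit: the update travels downstream later,
            // when the cache evicts/flushes this entry.
            store.put(key, value);
            // Direct emit is effectively disabled while caching is on:
            // context.forward(key, value);
        }

        @Override public void punctuate(long timestamp) { }
        @Override public void close() { }
    }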
The approach described above is fine from a DSL point of view. The main question is whether a "cached state" should be a DSL-internal implementation detail or should be exposed to the user for Processor API reuse. For the former, the design is fine; for the latter, IMHO it imposes a limitation and a hard-to-understand usage pattern on a regular user of the Processor API.

-Matthias

On 09/04/2016 05:28 PM, Matthias J. Sax wrote:
> We had a recent discussion about KIP-63, and I just c&p from the JIRA
> discussion:
>
> Damian:
>> During the code walk-through, Matthias raised a very good point about
>> the use of context().forward() being coupled to whether or not caching
>> is enabled. Now that I've had the chance to think about it, I have one
>> potential solution for making this transparent to users of the
>> Processor API.
>>
>> We can add another method, boolean isCachingEnabled(), to the new
>> interface ForwardingStateStoreSupplier. We also add 2 new methods to
>> ProcessorNode: boolean isStateStoreCachingEnabled() and
>> void setStateStoreCachingEnabled(boolean).
>>
>> In TopologyBuilder, when we are creating the
>> ProcessorNodeCacheFlushListener to attach to the
>> ForwardingStateStoreSupplier, we can call
>> ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
>>
>> We add an extra boolean parameter, forward, to the
>> ProcessorRecordContextImpl; this will be set to false when constructed
>> from StreamTask and will be set to true when constructed from
>> ProcessorNodeCacheFlushListener. Then in
>> ProcessorRecordContextImpl.forward(..) we add a guard
>> if (shouldForward()), where shouldForward is
>> return forward || !node.stateStoreCachingEnabled();
>>
>> Now Processors are free to call context().forward(..) whether caching
>> is enabled or not. If it is enabled, the values just won't get
>> forwarded until the cache evicts/flushes them.
>
> Matthias:
>> I guess this is a good solution/workaround. I had something like this
>> in my mind during the call, too.
>>
>> However, thinking about the root cause of this issue again, I am not
>> sure if the (overall) design of this KIP is optimal. My new concern is
>> that with this caching strategy, we "merge" two concepts into one, and
>> I am not sure if we should do this.
>>
>> Currently, message flow and state are decoupled and independent of
>> each other. Thus, if there is a state, updates to the state are
>> completely independent from emitting output records. With the new
>> design, we merge state updates and record emits, limiting the overall
>> flexibility. I guess, from a DSL point of view, this would not be
>> problematic, because in an aggregation with changelog output, each
>> update to the state should result in a downstream record. However,
>> from a Processor API point of view, there are other patterns we want
>> to be able to support, too.
>>
>> Basically, for each input record, there are four different patterns
>> that could be applied by the user:
>>
>> 1. no state updates, no output records
>> 2. only state updates
>> 3. only output records
>> 4. state updates and output records
>>
>> Right now, we go with a design that allows using one of the patterns
>> within a Processor. However, all four patterns could be mixed within a
>> single Processor (pre-KIP design), and this mixture would not be
>> possible anymore. If we want to support all four cases, we might not
>> want to merge both into "a single abstraction" as we do in the design
>> of this PR. What if a user just wants to send a record downstream
>> (without any state manipulation)?
>>
>> Going back to the KIP design, we move the cache from RocksDB into the
>> processor. However, what we actually wanted to do was de-duplicate
>> output records. Thus, the newly introduced cache could actually go
>> "after the processor" and could be completely independent from the
>> state: on each call to forward(), the record is put into the cache,
>> and if the cache is full, an actual cache eviction and record
>> forwarding happens. This would make the de-duplication cache
>> independent from the state.
>
> Eno:
>> It's not entirely true that the flexibility is limited. For example,
>> what's next in implementation is
>> https://issues.apache.org/jira/browse/KAFKA-3779, where we add the
>> dedup cache to the "to" operator. That is not implemented yet.
>
> Damian:
>> I think of the four patterns you mentioned, only the last one changes,
>> i.e., state updates and output records. context.forward() still
>> exists, so you can just send a record downstream without any state
>> manipulation; that behaviour hasn't changed.
>
> On 08/24/2016 03:35 PM, Eno Thereska wrote:
>> Hi folks,
>>
>> We've been working on a proof-of-concept for KIP-63, and that can now
>> be found at the main JIRA
>> (https://issues.apache.org/jira/browse/KAFKA-3776) under PR
>> https://github.com/apache/kafka/pull/1752. It is still work in
>> progress; however, we are confident that the basic structure is there.
>>
>> As part of this work, we've also updated the KIP to clarify several
>> things, listed here for convenience:
>>
>> - Clarify that the optimization is applicable to aggregations and to
>>   the "to" operator. It is not applicable to joins.
>> - Clarify that for the low-level Processor API, we propose to allow
>>   users to disable caching on a store-by-store basis using a new
>>   .enableCaching() call.
>>
>> We'll start the voting process shortly for this KIP.
>>
>> Thanks
>> Eno
>>
>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <[email protected]>
>> wrote:
>>
>>> Hi there,
>>>
>>> I have created KIP-63: Unify store and downstream caching in streams
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>>
>>> Feedback is appreciated.
>>>
>>> Thank you
>>> Eno
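For reference, a rough sketch of the guard Damian describes in the quoted thread above. The class shapes are simplified stand-ins, not the actual implementation:

    // Simplified stand-ins for the real classes.
    class ProcessorNode {
        private boolean stateStoreCachingEnabled;

        void setStateStoreCachingEnabled(boolean enabled) {
            this.stateStoreCachingEnabled = enabled;
        }

        boolean isStateStoreCachingEnabled() {
            return stateStoreCachingEnabled;
        }
    }

    class ProcessorRecordContextImpl {
        private final ProcessorNode node;
        // false when constructed from StreamTask,
        // true when constructed from ProcessorNodeCacheFlushListener
        private final boolean forward;

        ProcessorRecordContextImpl(ProcessorNode node, boolean forward) {
            this.node = node;
            this.forward = forward;
        }

        private boolean shouldForward() {
            return forward || !node.isStateStoreCachingEnabled();
        }

        <K, V> void forward(K key, V value) {
            if (shouldForward()) {
                // ... hand the record to the downstream processor nodes ...
            }
            // Otherwise drop it here; the flush listener will re-issue the
            // forward when the cache evicts/flushes the entry.
        }
    }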
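Similarly, a sketch of the alternative Matthias outlines, where the de-duplication cache sits behind forward() and is independent of any state store. This assumes a simple insertion-ordered map and a fixed maximum size; a real design would also have to handle flushes at commit time:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // De-duplication cache "after the processor", independent of the state.
    class DedupForwarder<K, V> {
        private final int maxCacheSize;
        private final LinkedHashMap<K, V> cache = new LinkedHashMap<>();

        DedupForwarder(int maxCacheSize) {
            this.maxCacheSize = maxCacheSize;
        }

        // Called on every context.forward(): a newer value overwrites an
        // older one for the same key, which is the de-duplication we want.
        void forward(K key, V value) {
            cache.put(key, value);
            if (cache.size() > maxCacheSize) {
                evictAndForwardOldest();
            }
        }

        private void evictAndForwardOldest() {
            Iterator<Map.Entry<K, V>> it = cache.entrySet().iterator();
            Map.Entry<K, V> oldest = it.next();
            it.remove();
            sendDownstream(oldest.getKey(), oldest.getValue());
        }

        // On flush (e.g. at commit time) all pending records are emitted.
        void flush() {
            cache.forEach(this::sendDownstream);
            cache.clear();
        }

        private void sendDownstream(K key, V value) {
            // ... forward to the child processor nodes ...
        }
    }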
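Finally, a speculative sketch of how the per-store .enableCaching() call from Eno's mail might look on the existing store builder; the final API may well differ:

    // Speculative usage only; .enableCaching() is the switch proposed in
    // KIP-63, all other calls are the existing Stores builder API.
    StateStoreSupplier countStore = Stores.create("counts")
        .withStringKeys()
        .withLongValues()
        .persistent()
        .enableCaching()   // omit this call to leave caching disabled
        .build();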