We had a recent discussion about KIP-63, and I just c&p from the JIRA
discussion:

Damian:
> During the code walk-through, Matthias raised a very good point about the use 
> of context().forward being coupled to whether or not caching is enabled. Now 
> that i've had the chance to think about it I have one potential solution for 
> making this transparent to uses of the Processor API.
> 
> We can add another method boolean isCachingEnabled() to the new interface 
> ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode:
> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled()
> 
> In TopologyBuilder when we are creating the ProcessorNodeCacheFlushListener 
> to attach to the ForwardingStateStoreSupplier we can call 
> ProcessorNode.setStateStoreCachingEnabled(supplier.isStateStoreCachingEnabled())
> 
> We add an extra boolean parameter to the ProcessorRecordContextImpl forward 
> this will be set to false when constructed from StreamTask and will be set to 
> true when constructed from ProcessorNodeCacheFlushListener. Then in 
> ProcessorRecordContextImpl.forward(..) we add a guard if (shouldForward()) 
> where shouldForward is return forward || !node.stateStoreCachingEnabled();
> 
> Now Processors are free to call context().forward(..) whether caching is 
> enabled or not. If it is enabled the values just wont get forwarded until the 
> cache evicts/flushes them.


Matthias:
> I guess this is a good solution/workaround. I had something like this in my 
> mind during the call, too.
> 
> However, thinking about the root cause of this issue again, I am not sure if 
> the (overall) design of this KIP is optimal or not. My new concern is, that 
> with this caching strategy, we "merge" two concepts into one; and I am not 
> sure, if we should to this.
> 
> Currently, message flow and state is decoupled and independent of each other. 
> Thus, if there is a state, updates to the state are completely independent 
> from emitting output records. With the new design, we merge state updates and 
> record emits, limiting the overall flexibility. I guess, from a DSL point of 
> view, this would not be problematic, because in an aggregation and changelog 
> output, each update to the state should result in a downstream record. 
> However, from a Processor API point of view, there are other patterns we want 
> to be able to support, too.
> 
> Basically, for each input record, there a four different patterns that could 
> be applied by the user:
> 
>     no state updates, no output records
>     only state update
>     only output records
>     state updates and output records
> 
> Right now, we go with a design that allows to use one of the patterns within 
> a Processor. However, all 4 pattern could be mixed within a single Processor 
> (pre KIP design), and this mixture would not be possible any more. If we want 
> to support all four cases, we might not want to merge both into "a single 
> abstraction" as we do in the design of this PR. What if a user just wants to 
> sent a record downstream (without any state manipulation)?
> 
> Going back to the KIP design, we move the cache from RocksDB into the 
> processor. However, what we actually wanted to do was to de-duplicate output 
> records. Thus, the newly introduced cache, could actually go "after the 
> processor" and could be completely independent from the state. Thus, on each 
> call to forward() the record is put into the cache, and if the cache is full, 
> an actual cache eviction and record forwarding happens. This would make the 
> de-duplication cache independent from the state.


Eno:
> it's not entirely true that the flexibility is limited. For example, what's 
> next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779 
> where we add the dedup cache to the to operator. That is not implemented yet.


Damian:
> i think of the 4 patterns you mentioned only the last one changes, i.e, state 
> updates and output records.
> context.forward() still exists so you can just send a record downstream 
> without any state manipulation, that behaviour hasn't changed.






On 08/24/2016 03:35 PM, Eno Thereska wrote:
> Hi folks,
> 
> We've been working on a proof-of-concept for KIP-63 and that can now be
> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
> under PR https://github.com/apache/kafka/pull/1752. It is still work in
> progress, however we are confident that the basic structure is there.
> 
> As part of this work, we've also updated the KIP to clarify several things,
> listed here for convenience:
> 
> - Clarify that the optimization is applicable to aggregations and to
> operators. It is not applicable to joins.
> - Clarify that for the low-level Processor API, we propose to allow users
> for disabling caching on a store-by-store basis using a new
> .enableCaching() call.
> 
> We'll start the voting process shortly for this KIP.
> 
> Thanks
> Eno
> 
> 
> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> Hi there,
>>
>> I have created KIP-63: Unify store and downstream caching in streams
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 63%3A+Unify+store+and+downstream+caching+in+streams
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams>
>>
>>
>> Feedback is appreciated.
>>
>> Thank you
>> Eno
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to