Hi,

Thanks for the link. What I understand is that when the cache.max.bytes.buffering value is reached, the aggregation is pushed to downstream. What is the default value for this setting? And how can I determine the cache size for my current stream, so as to set an optimal value?
I also suppose that pushing to downstream based on the number of messages aggregated, or on time elapsed, is planned future work and not available in the master branch right now? This part is of more interest to us.

Thanks
Sachin

On Thu, Dec 1, 2016 at 3:43 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> This landed in 0.10.1, so the docs are at
> http://kafka.apache.org/0101/javadoc/index.html
>
> This wiki has a good description of how this works:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>
> Eno
>
>
> > On 1 Dec 2016, at 10:07, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi,
> > I checked the docs at
> > http://kafka.apache.org/0100/javadoc/index.html (class StreamsConfig)
> > but did not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> >
> > Also, on the first option:
> > "use the record cache to dedup messages with the same key before sending
> > downstream"
> >
> > I did not understand this. How does one implement this option?
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> If you are using the DSL, there is currently no way to do fine-grained
> >> control of the downstream sending. There is some coarse-grained control
> >> in that you can use the record cache to dedup messages with the same
> >> key before sending downstream, or you can choose to get all records by
> >> setting the cache to 0, e.g.:
> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>
> >> So it looks like you might want to build such logic downstream.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Dec 2016, at 09:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi all,
> >>> Say I have a pipeline like this:
> >>>
> >>> topic.aggregateByKey( ...) => to downstream
> >>>
> >>> Now for every message in the topic it will call aggregateByKey and
> >>> send the result to downstream.
> >>>
> >>> Is there a way to tell the pipeline to push the current aggregation
> >>> result to downstream only when it gets a certain message?
> >>>
> >>> Or can I do some configuration such as: until it has aggregated the
> >>> result of n messages, don't push it to downstream?
> >>>
> >>> Or can any such logic only be built in the downstream, which would
> >>> check and decide whether it needs to process the current aggregation
> >>> or not?
> >>>
> >>> Thanks
> >>> Sachin
> >>
> >>
> >
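A minimal, self-contained sketch of the coarse-grained control discussed above. It assumes the 0.10.1-era raw property name "cache.max.bytes.buffering" (the key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) and uses plain java.util.Properties so it compiles without the kafka-streams jar; in a real application these properties would be passed to the KafkaStreams constructor:

```java
import java.util.Properties;

public class StreamsCacheConfigSketch {

    // Raw key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
    // (assumed name, per the 0.10.1 docs referenced in this thread).
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();

        // Setting the record cache to 0 disables deduplication entirely,
        // so every single aggregation update is forwarded downstream.
        streamsConfiguration.put(CACHE_MAX_BYTES_BUFFERING, "0");

        // Alternatively, a non-zero size lets repeated updates to the same
        // key overwrite each other in the cache, so only the latest value
        // per key is forwarded when the cache is flushed, e.g.:
        // streamsConfiguration.put(CACHE_MAX_BYTES_BUFFERING, "10485760");

        System.out.println(CACHE_MAX_BYTES_BUFFERING + "="
                + streamsConfiguration.getProperty(CACHE_MAX_BYTES_BUFFERING));
    }
}
```

Note that this only trades "every update" against "latest update per key per flush"; the per-message or every-n-messages control asked about in the thread still has to be built in the downstream consumer.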