Hi,
Thanks for the link.

What I understand is that when cache.max.bytes.buffering value is reached
it will push the aggregation to downstream.
What is the default value for the same?
And how can I determine my cache size for current stream so as to set an
optimal value.

I also suppose the push to downstream based on number of messages
aggregated or time elapsed is something of future work planned and not
available in the master branch right now?
I suppose this part is of more of interest to us.

Thanks
Sachin




On Thu, Dec 1, 2016 at 3:43 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> This landed in 0.10.1, so the docs are at http://kafka.apache.org/0101/
> javadoc/index.html <http://kafka.apache.org/0101/javadoc/index.html>.
>
> This wiki has a good description of how this works:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 63%3A+Unify+store+and+downstream+caching+in+streams <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+
> downstream+caching+in+streams>
>
> Eno
>
> > On 1 Dec 2016, at 10:07, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi,
> > I checked the docs
> > http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but
> did
> > not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> >
> > Also on the first option:
> > use the record cache to dedup messages with the same key before sending
> > downstream
> >
> > I did not understand this. How does one implement this option.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> >
> >> Hi Sachin,
> >>
> >> If you are using the DSL, currently there is no way to do fine-grained
> >> control of the downstream sending. There is some coarse-grained control
> in
> >> that you can use the record cache to dedup messages with the same key
> >> before sending downstream, or you can choose to get all records by
> setting
> >> the cache to 0:
> >> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
> BUFFERING_CONFIG,
> >> 0);
> >>
> >> So it looks like you might want to build such logic downstream.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Dec 2016, at 09:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi all,
> >>> Say I have a pipleline like this
> >>>
> >>> topic.aggregateByKey( ...) => to downstream
> >>>
> >>> Now for every message in topic it will call aggregateByKey and send it
> to
> >>> downstream
> >>>
> >>> Is there a way to tell the pipeline that if it gets a certain message
> >> then
> >>> only push the current aggregation result to downstream.
> >>>
> >>> Or I can do some configuration like until it has aggregated the result
> >> of n
> >>> messages don't push it to downstream.
> >>>
> >>> Or any such logic can only be built in the downstream to check and
> decide
> >>> if it needs to process the current aggregation or not.
> >>>
> >>> Thanks
> >>> Sachin
> >>
> >>
>
>

Reply via email to