Henry,

Thanks for the great feedback. I'm drafting a proposal for adding a control mechanism for the latency vs. data volume tradeoff, which I will put up on the wiki once it is done:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions

We can continue the discussion from there. In the end we need to propose a KIP for this feature.

Guozhang

On Wed, Apr 20, 2016 at 10:57 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

My use case is actually myTable.aggregate().to("output_topic"), so I need a way to suppress the number of outputs.

I don't think correlating the internal cache flush with the output window emit frequency is ideal. It's too hard for an application developer to see when the cache will be flushed; we would like a clearly defined window emit policy. I think emitting at window end (plus an application-defined delay) is very easy to understand, and it's OK to re-emit when we have late-arriving events since that doesn't happen very often (in Google Dataflow terms, the window end is based on a user-defined watermark function which should already add a buffer for most common processing delays).

Another problem with the cache flush is that it actually happens quite often: the in-memory cache doesn't support range scans, and most lookups on a window-based store need to do a range scan, which triggers the flush first.

On Wed, Apr 20, 2016 at 9:13 PM, Jay Kreps <j...@confluent.io> wrote:

To summarize a chat session Guozhang and I just had on Slack:

We currently do dedupe the output for stateful operations (e.g. join, aggregate). They hit an in-memory cache and only produce output to RocksDB/Kafka when either that cache fills up or the commit period occurs. So the changelog for these operations, which is often also the output, already gets this deduplication. Controlling the commit frequency and cache size is probably the right way to trade off latency of the update vs. update volume.

The operation we don't dedupe is to()/through(). So, say you do an operation like
myTable.aggregate().filter().map().to("output_topic")
Here the aggregation itself (and hence its changelog) isn't the intended output, but rather some transformed version of it. In this case the issue you describe is correct: we don't dedupe. There might be several options here. One would be for the aggregate to produce deduped output lazily. The other would be for the to() operator to also dedupe.

Personally I feel this idea of caching to suppress output is actually a better way to model and think about what's going on than trying to have a triggering policy. If you set a triggering policy that says "only output at the end of the window", the reality is that if late data comes you still have to produce additional outputs. So you don't produce one output at the end but rather potentially any number of outputs. A simple way to think about this is that you produce all updates but optimistically suppress some duplicates for efficiency.

-Jay
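[A minimal sketch of the two knobs Jay mentions (commit frequency and cache size), written against a newer Streams DSL than the 0.10.0 code discussed here; the cache.max.bytes.buffering setting only arrived later with KIP-63. Topic names are invented and a count() stands in for the aggregate.]

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class DedupedCountExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A larger cache and a longer commit interval mean fewer, more deduplicated
        // updates downstream; smaller values mean lower latency but more update records.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count()
               // to() forwards whatever the aggregation emits; updates absorbed by the
               // record cache before a flush never reach the output topic.
               .toStream()
               .to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```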
On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

0.10.0.1 is fine for me; I am actually building from trunk head for the streams package.

On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I saw that note, thanks for commenting.

We are cutting the next 0.10.0.0 RC next week, so I am not certain it will make it into 0.10.0.0. But we can push it to be in 0.10.0.1.

Guozhang

On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

Thanks.

Do you know when KAFKA-3101 will be implemented?

I also added a note to that JIRA for a left outer join use case which also needs buffer support.

On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Henry,

I thought you were concerned about consumer memory contention. That's a valid point, and yes, you need to keep those buffered records in a persistent store.

As I mentioned, we are trying to optimize the aggregation outputs, as in

https://issues.apache.org/jira/browse/KAFKA-3101

Its idea is very similar to buffering: while we keep the aggregated values in RocksDB, we do not send the updated value for each received record but only do so based on some policy. More generally, we can have a trigger mechanism for the user to customize when to emit.

Guozhang

On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

I think this scheme still has problems. If during the 'holding' I literally hold (don't return from the method call), I will starve the thread. If I write the output to an in-memory buffer and let the method return, Kafka Streams will acknowledge the record to the upstream queue as processed, so I would lose the record if the node crashed after the ack but before the 10 minutes are up.

I guess I need to write the buffered result into a persistent store, another Kafka queue or a K/V store.

On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:

By "holding the stream", I assume you are still consuming data, but you only write data every 10 minutes instead of upon each received record, right?

Anyways, in either case, the consumer should not have severe memory issues, as Kafka Streams will pause consuming when enough data is buffered at the streams end (note that we have two buffers here: the consumer buffers raw bytes, while the streams library takes the raw bytes, buffers the de-serialized objects, and applies a threshold on its own buffer to pause/resume the consumer).

Guozhang

On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

So holding the stream for 15 minutes wouldn't cause too many performance problems?
On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:

The consumer's buffer does not depend on offset committing; once a record is handed out from the poll() call it is out of the buffer. If offsets are not committed, then upon failover it will simply re-consume these records from Kafka.

Guozhang

On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <h...@pinterest.com.invalid> wrote:

For the technique of a custom Processor holding the call to context.forward(): if I hold it for 10 minutes, what does that mean for the consumer acknowledgement on the source node?

I guess if I hold it for 10 minutes, the consumer is not going to ack to the upstream queue. Will that impact consumer performance? Will the consumer's Kafka client message buffer overflow when there is no ack for 10 minutes?

On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Yes, we are aware of this behavior and are working on optimizing it:

https://issues.apache.org/jira/browse/KAFKA-3101

More generally, we are considering adding a "trigger" interface similar to the MillWheel model where users can customize when they want to emit outputs to the downstream operators. Unfortunately, for now there will be no easy workaround for buffering, and you may want to do this in app code (for example, in a customized Processor where you can control when to call context.forward()).

Guozhang
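[A sketch of the customized-Processor workaround Guozhang describes, combined with the persistent-store buffering Henry arrives at in his 4:03 PM message. It uses the older (pre-3.x) Processor API rather than anything available in 0.10.0; the store name "buffer-store", the String types, and the 10-minute flush interval are made up, and the state store has to be created and connected to this processor separately when the topology is built.]

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Buffers the latest value per key in a persistent store and forwards the
 * buffered values only on a periodic punctuation, instead of on every record.
 */
public class BufferingProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // "buffer-store" must be registered as a persistent store on the topology
        // and connected to this processor.
        buffer = (KeyValueStore<String, String>) context.getStateStore("buffer-store");

        // Every 10 minutes, flush everything buffered so far to the downstream operators.
        context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, String> it = buffer.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> entry = it.next();
                    context.forward(entry.key, entry.value);
                    buffer.delete(entry.key);
                }
            }
            context.commit();
        });
    }

    @Override
    public void process(final String key, final String value) {
        // Later updates for the same key overwrite earlier ones, so only the
        // latest value per key is emitted when the punctuation fires.
        buffer.put(key, value);
    }
}
```

[Because the buffer lives in a changelog-backed store, records that were already consumed and committed upstream remain recoverable after a crash, which is the failure case Henry is worried about.]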
On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jklu...@simple.com> wrote:

Is it true that the aggregation and reduction methods of KStream will emit a new output message for each incoming message?

I have an application that's copying a Postgres replication stream to a Kafka topic, and activity tends to be clustered, with many updates to a given primary key happening in quick succession. I'd like to smooth that out by buffering the messages in tumbling windows, allowing the updates to overwrite one another, and emitting output messages only at the end of the window.

Does the Kafka Streams API provide any hooks that I could use to achieve this kind of windowed "buffering" or "deduplication" of a stream?
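[For completeness, a sketch of how Jeff's windowed "buffering"/"deduplication" is expressed in much newer Streams releases; the suppress() operator he is effectively asking for did not exist at the time of this thread and only arrived with KIP-328 (Kafka 2.1). Topic names, window size, and serdes are invented.]

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ReplicationDedupTopology {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("pg_replication", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
               // Keep only the newest update for each primary key within the window.
               .reduce((older, newer) -> newer)
               // Emit a single record per key per window, once the window closes.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               // Strip the window wrapper so the output is keyed by primary key again.
               .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
               .to("pg_replication_deduped", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}
```

[Without the suppress() step this topology still emits an update per incoming record, subject only to record-cache deduplication, which is exactly the behavior discussed in this thread.]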