My use case is actually myTable.aggregate().to("output_topic"), so I need a way to reduce the number of output records.
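For concreteness, here is a minimal sketch of that kind of topology. It is written against a recent version of the Kafka Streams DSL purely as an illustration (the method names differ in 0.10); the "input_topic" name, the WindowedAggregateSketch class, the count() standing in for my real aggregation, and the String serdes are all placeholders I made up.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedAggregateSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // One record per upstream update, keyed by primary key.
        KStream<String, String> updates = builder.stream("input_topic");

        // Windowed count as a stand-in for the real aggregate().
        KTable<Windowed<String>, Long> perWindow = updates
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
                .count();

        // Every input record currently produces an updated output record here.
        perWindow.toStream()
                .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count.toString()))
                .to("output_topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-aggregate-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}

As written, every record arriving on input_topic results in an updated record on output_topic, and that is the volume I'd like to cut down.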
I don't think tying the output window emit frequency to the internal cache flush is ideal. It's too hard for an application developer to see when the cache will be flushed; we would like a clearly defined window emit policy. Emitting at window end (or window end plus an application-defined delay) is very easy to understand, and it's OK to re-emit when late-arriving events show up since that doesn't happen often (in Google Dataflow terms, the window end is based on a user-defined watermark function which should already include a buffer for the most common processing delays).

Another problem with relying on cache flushes is that the cache actually flushes quite often: the in-memory cache doesn't support range scans, and most lookups on a window-based store need a range scan, which triggers a flush first.

On Wed, Apr 20, 2016 at 9:13 PM, Jay Kreps <j...@confluent.io> wrote:

> To summarize a chat session Guozhang and I just had on slack:
>
> We currently do dedupe the output for stateful operations (e.g. join, aggregate). They hit an in-memory cache and only produce output to rocksdb/kafka when either that cache fills up or the commit period occurs. So the changelog for these operations, which is often also the output, already gets this deduplication. Controlling the commit frequency and cache size is probably the right way to trade off update latency against update volume.
>
> The operation we don't dedupe is to()/through(). So, say you do an operation like myTable.aggregate().filter().map().to("output_topic"). Here the aggregation itself (and hence its changelog) isn't the intended output, but rather some transformed version of it. In this case the issue you describe is correct: we don't dedupe. There might be several options here. One would be for the aggregate to produce deduped output lazily. The other would be for the to() operator to also dedupe.
>
> Personally I feel this idea of caching to suppress output is actually a better way to model and think about what's going on than trying to have a triggering policy. If you set a triggering policy that says "only output at the end of the window", the reality is that if late data comes you still have to produce additional outputs. So you don't produce one output at the end but rather potentially any number of outputs. A simple way to think about this is that you produce all updates but optimistically suppress some duplicates for efficiency.
>
> -Jay
>
> On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
>
> > 0.10.0.1 is fine for me, I am actually building from trunk head for the streams package.
> >
> > On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I saw that note, thanks for commenting.
> > >
> > > We are cutting the next 0.10.0.0 RC next week, so I am not certain if it will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
> > >
> > > > Thanks.
> > > >
> > > > Do you know when KAFKA-3101 will be implemented?
> > > >
> > > > I also added a note to that JIRA for a left outer join use case which also needs buffer support.
> > > >
> > > > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Henry,
> > > > >
> > > > > I thought you were concerned about consumer memory contention.
> > > > > That's a valid point, and yes, you need to keep those buffered records in a persistent store.
> > > > >
> > > > > As I mentioned, we are trying to optimize the aggregation outputs as in
> > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > >
> > > > > Its idea is very similar to buffering: while we keep the aggregated values in RocksDB, we do not send the updated values for each received record but only do so based on some policy. More generally, we can have a trigger mechanism for users to customize when to emit.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > >
> > > > > > I think this scheme still has problems. If during 'holding' I literally hold (don't return from the method call), I will starve the thread. If I write the output to an in-memory buffer and let the method return, Kafka Streams will acknowledge the record to the upstream queue as processed, so I would lose the record if the node crashed after the ack but before the 10 minutes are up.
> > > > > >
> > > > > > I guess I need to write the buffered results into a persistent store, another Kafka queue or a K/V store.
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > By "holding the stream", I assume you are still consuming data, but just that you only write data every 10 minutes instead of upon each received record, right?
> > > > > > >
> > > > > > > Anyway, in either case the consumer should not have severe memory issues, as Kafka Streams will pause its consuming when enough data is buffered at the streams end (note that we have two buffers here: the consumer buffers raw bytes, and the streams library takes the raw bytes, buffers the de-serialized objects, and uses a threshold on its own buffer to pause / resume the consumer).
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > > > >
> > > > > > > > So holding the stream for 15 minutes wouldn't cause too many performance problems?
> > > > > > > >
> > > > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > The consumer's buffer does not depend on offset committing: once a record is handed out from the poll() call it is out of the buffer. If offsets are not committed, then upon failover it will simply re-consume these records again from Kafka.
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > For the technique of a custom Processor holding the call to context.forward(): if I hold it for 10 minutes, what does that mean for the consumer acknowledgement on the source node?
> > > > > > > > > >
> > > > > > > > > > I guess if I hold it for 10 minutes, the consumer is not going to ack to the upstream queue. Will that impact consumer performance? Will the consumer's Kafka client message buffer overflow when there is no ack for 10 minutes?
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Yes, we are aware of this behavior and are working on optimizing it:
> > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > > > >
> > > > > > > > > > > More generally, we are considering adding a "trigger" interface similar to the MillWheel model, where users can customize when they want to emit outputs to the downstream operators. Unfortunately for now there will be no easy workaround for buffering, and you may want to do this in app code (for example, in a customized Processor where you can control when to call context.forward()).
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jklu...@simple.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Is it true that the aggregation and reduction methods of KStream will emit a new output message for each incoming message?
> > > > > > > > > > > >
> > > > > > > > > > > > I have an application that's copying a Postgres replication stream to a Kafka topic, and activity tends to be clustered, with many updates to a given primary key happening in quick succession. I'd like to smooth that out by buffering the messages in tumbling windows, allowing the updates to overwrite one another, and emitting output messages only at the end of the window.
> > > > > > > > > > > > Does the Kafka Streams API provide any hooks that I could use to achieve this kind of windowed "buffering" or "deduplication" of a stream?
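For reference, below is a rough sketch of the "buffer in a persistent store, emit on a schedule" workaround discussed in the thread above, written against the 0.10-era low-level Processor API. It is only an illustration, not a definitive implementation: the store name "buffer", the String key/value types, the 10-minute interval, and the topology wiring that registers the persistent (changelogged) store and connects it to this processor are all assumptions on my part, and late-arriving records would still produce additional emits on a later punctuate() call.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.ArrayList;
import java.util.List;

// Buffers the latest value per key in a persistent (changelogged) store and only
// forwards downstream when punctuate() fires, so buffered records survive a crash
// even after the source offsets have been committed.
public class BufferingProcessor implements Processor<String, String> {

    private static final long EMIT_INTERVAL_MS = 10 * 60 * 1000L; // 10 minutes

    private ProcessorContext context;
    private KeyValueStore<String, String> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // The "buffer" store must be registered on the topology as a persistent
        // store and connected to this processor (not shown here).
        this.buffer = (KeyValueStore<String, String>) context.getStateStore("buffer");
        context.schedule(EMIT_INTERVAL_MS);
    }

    @Override
    public void process(String key, String value) {
        // Later updates for the same key simply overwrite earlier ones.
        buffer.put(key, value);
    }

    @Override
    public void punctuate(long timestamp) {
        // Drain the buffer: forward one record per key, then clear the store.
        List<KeyValue<String, String>> pending = new ArrayList<>();
        try (KeyValueIterator<String, String> it = buffer.all()) {
            while (it.hasNext()) {
                pending.add(it.next());
            }
        }
        for (KeyValue<String, String> entry : pending) {
            context.forward(entry.key, entry.value);
            buffer.delete(entry.key);
        }
    }

    @Override
    public void close() {}
}

Note that process() returns immediately, so the stream thread is never blocked; the trade-off is that the source offsets may be committed before the buffered records are forwarded, which is exactly why the buffer has to live in a persistent, changelogged store rather than on the heap.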