Henry,

Thanks for the great feedbacks. I'm making some proposal for adding the
control mechanism for latency v.s. data volume tradeoffs, which I will put
up to wiki once it is done:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions

We can continue the discussion from there. In the end we need to proposal a
KIP for this feature.


Guozhang


On Wed, Apr 20, 2016 at 10:57 PM, Henry Cai <h...@pinterest.com.invalid>
wrote:

> My use case is actually myTable.aggregate().to("output_topic"), so I need a
> way to suppress the number of outputs.
>
> I don't think correlating the internal cache flush with the output window
> emit frequency is ideal.  It's too hard for application developer to see
> when the cache will be flushed, we would like to see a clearly defined
> window emit policy.  I think emit on window end (or plus a application
> defined delay) is very easy to understand, it's OK to re-emit when we have
> late-arrival events since this doesn't happen that often (if we use google
> data flow concept, the window end is based on user defined watermark
> function which should already add the buffer for most common processing
> delays).
>
> Another problem with cache flush is it actually flushed quite often, e.g.
> since the in-memory cache doesn't support rangeScan and most of the lookup
> on window-based store needs to do range scan which would trigger the flush
> first.
>
> On Wed, Apr 20, 2016 at 9:13 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > To summarize a chat session Guozhang and I just had on slack:
> >
> > We currently do dedupe the output for stateful operations (e.g. join,
> > aggregate). They hit an in-memory cache and only produce output to
> > rocksdb/kafka when either that cache fills up or the commit period
> occurs.
> > So the changelog for these operations which is often also the output
> > already gets this deduplication. Controlling the commit frequency and
> cache
> > size is probably the right way to trade off latency of the update vs
> update
> > volume.
> >
> > The operation we don't dedupe is to()/through(). So, say if you do an
> > operation like
> >    myTable.aggregate().filter().map().to("output_topic")
> > Here the aggregation itself (and hence its changelog) isn't the intended
> > output, but rather some transformed version of it. In this case the issue
> > you describe is correct, we don't dedupe. There might be several options
> > here. One would be for the aggregate to produce deduped output lazily.
> The
> > other would be for the to() operator to also dedupe.
> >
> > Personally I feel this idea of caching to suppress output versus is
> > actually a better way to model and think about what's going on than
> trying
> > to have a triggering policy. If you set a triggering policy that says
> "only
> > output at the end of the window" the reality is that if late data comes
> you
> > still have to produce additional outputs. So you don't produce one output
> > at the end but rather potentially any number of outputs. So a simple way
> to
> > think about this is that you produce all updates but optimistically
> > suppress some duplicates for efficiency.
> >
> > -Jay
> >
> > On Wed, Apr 20, 2016 at 5:24 PM, Henry Cai <h...@pinterest.com.invalid>
> > wrote:
> >
> > > 0.10.0.1 is fine for me, I am actually building from trunk head for
> > streams
> > > package.
> > >
> > > On Wed, Apr 20, 2016 at 5:06 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > I saw that note, thanks for commenting.
> > > >
> > > > I are cutting the next 0.10.0.0 RC next week, so I am not certain if
> it
> > > > will make it for 0.10.0.0. But we can push it to be in 0.10.0.1.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 20, 2016 at 4:57 PM, Henry Cai
> <h...@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > Thanks.
> > > > >
> > > > > Do you know when KAFKA-3101 will be implemented?
> > > > >
> > > > > I also add a note to that JIRA for a left outer join use case which
> > > also
> > > > > need buffer support.
> > > > >
> > > > >
> > > > > On Wed, Apr 20, 2016 at 4:42 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Henry,
> > > > > >
> > > > > > I thought you were concerned about consumer memory contention.
> > > That's a
> > > > > > valid point, and yes, you need to keep those buffered records in
> a
> > > > > > persistent store.
> > > > > >
> > > > > > As I mentioned we are trying to do optimize the aggregation
> outputs
> > > as
> > > > in
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > >
> > > > > > Its idea is very similar to buffering, while we keep the
> aggregated
> > > > > values
> > > > > > in RocksDB, we do not send the updated values for each receiving
> > > record
> > > > > but
> > > > > > only do that based on some policy. More generally we can have a
> > > trigger
> > > > > > mechanism for user to customize when to emit.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai
> > > <h...@pinterest.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I think this scheme still has problems.  If during 'holding' I
> > > > > literally
> > > > > > > hold (don't return the method call), I will starve the thread.
> > If
> > > I
> > > > am
> > > > > > > writing the output to a in-memory buffer and let the method
> > > returns,
> > > > > the
> > > > > > > kafka stream will acknowledge the record to upstream queue as
> > > > > processed,
> > > > > > so
> > > > > > > I would lose the record if the node crashed after ack but
> before
> > 10
> > > > > > minutes
> > > > > > > is up.
> > > > > > >
> > > > > > > I guess I need to write the buffered result into a persistent
> > > store,
> > > > > > > another kafka queue or K/V store.
> > > > > > >
> > > > > > > On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > By "holding the stream", I assume you are still consuming
> data,
> > > but
> > > > > > just
> > > > > > > > that you only write data every 10 minutes instead of upon
> each
> > > > > received
> > > > > > > > record right?
> > > > > > > >
> > > > > > > > Anyways, in either case, consumer should not have severe
> memory
> > > > issue
> > > > > > as
> > > > > > > > Kafka Streams will pause its consuming when enough data is
> > > buffered
> > > > > at
> > > > > > > the
> > > > > > > > streams end (note that we have two buffers here, the consumer
> > > > buffers
> > > > > > raw
> > > > > > > > bytes, and the streams library take raw bytes and buffer the
> > > > > > > de-serialized
> > > > > > > > objects, and threshold on its own buffer to pause / resume
> the
> > > > > > consumer).
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai
> > > > > <h...@pinterest.com.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > So hold the stream for 15 minutes wouldn't cause too much
> > > > > performance
> > > > > > > > > problems?
> > > > > > > > >
> > > > > > > > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <
> > > > wangg...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Consumer' buffer does not depend on offset committing,
> once
> > > it
> > > > is
> > > > > > > given
> > > > > > > > > > from the poll() call it is out of the buffer. If offsets
> > are
> > > > not
> > > > > > > > > committed,
> > > > > > > > > > then upon failover it will simply re-consumer these
> records
> > > > again
> > > > > > > from
> > > > > > > > > > Kafka.
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> > > > > > > <h...@pinterest.com.invalid
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > For the technique of custom Processor of holding call
> to
> > > > > > > > > > context.forward(),
> > > > > > > > > > > if I hold it for 10 minutes, what does that mean for
> the
> > > > > consumer
> > > > > > > > > > > acknowledgement on source node?
> > > > > > > > > > >
> > > > > > > > > > > I guess if I hold it for 10 minutes, the consumer is
> not
> > > > going
> > > > > to
> > > > > > > ack
> > > > > > > > > to
> > > > > > > > > > > the upstream queue, will that impact the consumer
> > > > performance,
> > > > > > will
> > > > > > > > > > > consumer's kafka client message buffer overflow when
> > there
> > > is
> > > > > no
> > > > > > > ack
> > > > > > > > in
> > > > > > > > > > 10
> > > > > > > > > > > minutes?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <
> > > > > > wangg...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Yes we are aware of this behavior and are working on
> > > > > optimizing
> > > > > > > it:
> > > > > > > > > > > >
> > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > > > > > > > >
> > > > > > > > > > > > More generally, we are considering to add a "trigger"
> > > > > interface
> > > > > > > > > similar
> > > > > > > > > > > to
> > > > > > > > > > > > the Millwheel model where users can customize when
> they
> > > > want
> > > > > to
> > > > > > > > emit
> > > > > > > > > > > > outputs to the downstream operators. Unfortunately
> for
> > > now
> > > > > > there
> > > > > > > > will
> > > > > > > > > > no
> > > > > > > > > > > > easy workaround for buffering, and you may want to do
> > > this
> > > > in
> > > > > > app
> > > > > > > > > code
> > > > > > > > > > > (for
> > > > > > > > > > > > example, in a customized Processor where you can
> > control
> > > > when
> > > > > > to
> > > > > > > > call
> > > > > > > > > > > > context.forward() ).
> > > > > > > > > > > >
> > > > > > > > > > > > Guozhang
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <
> > > > > > jklu...@simple.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Is it true that the aggregation and reduction
> methods
> > > of
> > > > > > > KStream
> > > > > > > > > will
> > > > > > > > > > > > emit
> > > > > > > > > > > > > a new output message for each incoming message?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have an application that's copying a Postgres
> > > > replication
> > > > > > > > stream
> > > > > > > > > > to a
> > > > > > > > > > > > > Kafka topic, and activity tends to be clustered,
> with
> > > > many
> > > > > > > > updates
> > > > > > > > > > to a
> > > > > > > > > > > > > given primary key happening in quick succession.
> I'd
> > > like
> > > > > to
> > > > > > > > smooth
> > > > > > > > > > > that
> > > > > > > > > > > > > out by buffering the messages in tumbling windows,
> > > > allowing
> > > > > > the
> > > > > > > > > > updates
> > > > > > > > > > > > to
> > > > > > > > > > > > > overwrite one another, and emitting output messages
> > > only
> > > > at
> > > > > > the
> > > > > > > > end
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > > window.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Does the Kafka Streams API provide any hooks that I
> > > could
> > > > > use
> > > > > > > to
> > > > > > > > > > > achieve
> > > > > > > > > > > > > this kind of windowed "buffering" or
> "deduplication"
> > > of a
> > > > > > > stream?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>



-- 
-- Guozhang

Reply via email to