Henry,

I thought you were concerned about consumer memory contention. That's a
valid point, and yes, you need to keep those buffered records in a
persistent store.

As I mentioned, we are trying to optimize the aggregation outputs, as in

https://issues.apache.org/jira/browse/KAFKA-3101

The idea is very similar to buffering: while we keep the aggregated values
in RocksDB, we do not send the updated value for each received record, but
only emit based on some policy. More generally, we can have a trigger
mechanism that lets users customize when to emit.
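
To make that concrete, here is a rough sketch of the emit-by-policy idea in
plain Java. It is only an illustration, not the Kafka Streams API or the
KAFKA-3101 design: the class name DedupBuffer, its methods, and the fixed
time-interval policy are all made up for this example, and the in-memory map
merely stands in for a persistent store such as RocksDB (an in-memory map on
its own would lose buffered records on a crash).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: keeps only the latest
// value per key and emits the whole batch at most once per interval.
final class DedupBuffer<K, V> {
    private final long emitIntervalMs;   // the "policy": minimum time between emits
    // Stand-in for a persistent store; a real application would need one
    // to survive a crash between receiving a record and emitting it.
    private final Map<K, V> latest = new LinkedHashMap<>();
    private long lastEmitMs;

    DedupBuffer(long emitIntervalMs, long startMs) {
        this.emitIntervalMs = emitIntervalMs;
        this.lastEmitMs = startMs;
    }

    // Called for every incoming record: later updates overwrite earlier
    // ones for the same key, and nothing is sent downstream yet.
    void update(K key, V value) {
        latest.put(key, value);
    }

    // Called periodically with the current time. Returns the records to
    // forward downstream, or an empty map if the interval has not elapsed.
    Map<K, V> maybeEmit(long nowMs) {
        if (nowMs - lastEmitMs < emitIntervalMs) {
            return Collections.emptyMap();
        }
        Map<K, V> out = new LinkedHashMap<>(latest); // copy before clearing
        latest.clear();
        lastEmitMs = nowMs;
        return out;
    }
}
```

In a custom Processor, something like update() would be called for each
incoming record and maybeEmit() from a periodic callback, forwarding each
returned entry downstream; with a 10-minute interval, many updates to one key
collapse into a single output record.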


Guozhang


On Wed, Apr 20, 2016 at 4:03 PM, Henry Cai <h...@pinterest.com.invalid>
wrote:

> I think this scheme still has problems.  If during 'holding' I literally
> hold (don't return the method call), I will starve the thread.  If I am
> writing the output to an in-memory buffer and let the method return, the
> kafka stream will acknowledge the record to upstream queue as processed, so
> I would lose the record if the node crashed after ack but before 10 minutes
> is up.
>
> I guess I need to write the buffered result into a persistent store,
> another kafka queue or K/V store.
>
> On Wed, Apr 20, 2016 at 3:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > By "holding the stream", I assume you are still consuming data, but just
> > that you only write data every 10 minutes instead of upon each received
> > record right?
> >
> > Anyways, in either case, the consumer should not have severe memory issues,
> > as Kafka Streams will pause its consuming when enough data is buffered at
> > the streams end (note that we have two buffers here: the consumer buffers
> > raw bytes, and the streams library takes the raw bytes, buffers the
> > de-serialized objects, and thresholds on its own buffer to pause / resume
> > the consumer).
> >
> >
> > Guozhang
> >
> > On Wed, Apr 20, 2016 at 3:35 PM, Henry Cai <h...@pinterest.com.invalid>
> > wrote:
> >
> > > So holding the stream for 15 minutes wouldn't cause too many performance
> > > problems?
> > >
> > > On Wed, Apr 20, 2016 at 3:16 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > The consumer's buffer does not depend on offset committing; once a
> > > > record is returned from the poll() call it is out of the buffer. If
> > > > offsets are not committed, then upon failover it will simply re-consume
> > > > these records from Kafka.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 19, 2016 at 11:34 PM, Henry Cai
> <h...@pinterest.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > For the technique of a custom Processor holding the call to
> > > > > context.forward(): if I hold it for 10 minutes, what does that mean
> > > > > for the consumer acknowledgement on the source node?
> > > > >
> > > > > I guess if I hold it for 10 minutes, the consumer is not going to ack
> > > > > to the upstream queue. Will that impact consumer performance? Will the
> > > > > consumer's kafka client message buffer overflow when there is no ack
> > > > > in 10 minutes?
> > > > >
> > > > >
> > > > > On Tue, Apr 19, 2016 at 6:10 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Yes, we are aware of this behavior and are working on optimizing it:
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-3101
> > > > > >
> > > > > > More generally, we are considering adding a "trigger" interface,
> > > > > > similar to the MillWheel model, where users can customize when they
> > > > > > want to emit outputs to the downstream operators. Unfortunately, for
> > > > > > now there will be no easy workaround for buffering, and you may want
> > > > > > to do this in app code (for example, in a customized Processor where
> > > > > > you can control when to call context.forward()).
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Apr 19, 2016 at 1:40 PM, Jeff Klukas <jklu...@simple.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Is it true that the aggregation and reduction methods of KStream
> > > > > > > will emit a new output message for each incoming message?
> > > > > > >
> > > > > > > I have an application that's copying a Postgres replication stream
> > > > > > > to a Kafka topic, and activity tends to be clustered, with many
> > > > > > > updates to a given primary key happening in quick succession. I'd
> > > > > > > like to smooth that out by buffering the messages in tumbling
> > > > > > > windows, allowing the updates to overwrite one another, and
> > > > > > > emitting output messages only at the end of the window.
> > > > > > >
> > > > > > > Does the Kafka Streams API provide any hooks that I could use to
> > > > > > > achieve this kind of windowed "buffering" or "deduplication" of a
> > > > > > > stream?
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
