Hi,

reading a topic twice -- which is the first requirement you have -- is
not possible (and not necessary IMHO) with the Streams API -- regardless
of a "delayed" read. The reason is that Streams uses a single consumer
group.id internally, and thus Streams can commit only one offset per
topic-partition, but reading a topic twice would require committing two
offsets.

But as mentioned already, I don't think you need to read the data twice
anyway. You can just read it once, and buffer all data from the last 60
minutes within your application.
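
To make the buffering idea concrete, here is a rough sketch in plain
Java. It does not use any Kafka Streams classes; DelayBuffer and
BufferedEvent are hypothetical names made up for illustration. It also
assumes that "related" means "same key" and that records arrive in
timestamp order, both of which may not match your use case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical record type: a key, a timestamp, and a follow-up counter. */
final class BufferedEvent {
    final String key;
    final long timestampMs;
    int relatedCount; // later same-key records seen while this one was buffered

    BufferedEvent(String key, long timestampMs) {
        this.key = key;
        this.timestampMs = timestampMs;
    }
}

/** Holds every record for windowMs before releasing it as "final". */
final class DelayBuffer {
    private final long windowMs;
    // Oldest record first; relies on the in-order timestamp assumption.
    private final Deque<BufferedEvent> pending = new ArrayDeque<>();

    DelayBuffer(long windowMs) {
        this.windowMs = windowMs;
    }

    /**
     * Buffers a new record and returns the records whose window has
     * elapsed as of the new record's timestamp.
     */
    List<BufferedEvent> add(String key, long timestampMs) {
        List<BufferedEvent> ready = new ArrayList<>();
        // Release everything that is at least windowMs old. A record that
        // arrives exactly windowMs later does not count as a follow-up.
        while (!pending.isEmpty()
                && pending.peekFirst().timestampMs + windowMs <= timestampMs) {
            ready.add(pending.pollFirst());
        }
        // Tell the still-buffered records with the same key that a
        // follow-up arrived (assumption: "related" means equal keys).
        for (BufferedEvent e : pending) {
            if (e.key.equals(key)) {
                e.relatedCount++;
            }
        }
        pending.addLast(new BufferedEvent(key, timestampMs));
        return ready;
    }
}
```

Each released record carries the number of follow-ups that showed up
during its 60 minutes, so the fields can be adjusted before it is sent
downstream. The linear scan over the buffer is only there to keep the
sketch short; at your volume you would need a per-key index and a
fault-tolerant store instead of an in-memory deque.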

I am not sure how you identify "related" records. It would be good to
understand some more details to give better help. Maybe you can use a
windowed aggregation or a windowed join.

In any case, you can fall back to the lower-level Processor API -- it
gives you full control and allows you to express any logic you need.

Hope this helps.

-Matthias

On 4/13/17 4:03 PM, Marcos Juarez wrote:
> I'm building a prototype with Kafka Streams that will be consuming from the
> same topic twice, once with no delay, just like any normal consumer, and
> once with a 60 minute delay, using the new timestamp-per-message field.  It
> will also store state coming from other topics that are being read
> simultaneously.
> 
> The reason why I'm consuming twice from the same topic, with one of them
> delayed, is that our processor needs to know, for any particular record, if
> there are any more records related to that coming within the next 60
> minutes, and change some of the fields accordingly, before sending them
> down as the final version of that event.
> 
> Everything seems to be supported by the normal Streams DSL, except for a
> way to specify a consumption delay from a particular source topic.
> 
> I know I can just create a delayed topic myself, and then consume from
> that, but the topic volume in this case prevents us from doing this.  I
> don't want to duplicate the data and load in the cluster.  This topic
> currently handles ~2B records per day, and will grow in the future to
> potentially 10B records per day.
> 
> Any ideas on how I could handle this using Kafka Streams?  Or if it's
> better to use the lower level Streams API for that, can you point me to a
> starting point?  I've been reading docs and javadocs for a while now, and
> I'm not sure where I would add/configure this.
> 
> Thanks!
> 
> Marcos Juarez
> 
