Hi, reading a topic twice -- which is the first requirement you have -- is not possible (and not necessary, IMHO) with the Streams API, regardless of a "delayed" read. The reason is that Streams uses a single consumer group.id internally, and thus Streams can commit only one offset per topic-partition; reading a topic twice would require committing two offsets.
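To illustrate the restriction: with the 0.10.2 DSL, even trying to subscribe to the same topic twice within one topology is rejected at definition time (a minimal sketch; the topic name is made up):

import org.apache.kafka.streams.kstream.KStreamBuilder;

final KStreamBuilder builder = new KStreamBuilder();
builder.stream("events");  // first subscription is fine
builder.stream("events");  // throws TopologyBuilderException -- the topic
                           // is already registered by another source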
But as mentioned already, I don't think you need to read the data twice anyway. You can just read it once and buffer all data from the last 60 minutes within your application. I am not sure how you identify "related" records; it would be good to understand some more details to give better help. Maybe you can use a window-aggregation or a windowed-join. In any case, you can fall back to the lower-level Processor API -- it gives you full control and allows you to express any logic you need (see the sketch below the quoted message).

Hope this helps.

-Matthias

On 4/13/17 4:03 PM, Marcos Juarez wrote:
> I'm building a prototype with Kafka Streams that will be consuming from the
> same topic twice, once with no delay, just like any normal consumer, and
> once with a 60 minute delay, using the new timestamp-per-message field. It
> will also store state coming from other topics that are being read
> simultaneously.
>
> The reason why I'm consuming twice from the same topic, with one of them
> delayed, is that our processor needs to know, for any particular record, if
> there are any more records related to that coming within the next 60
> minutes, and change some of the fields accordingly, before sending them
> down as the final version of that event.
>
> Everything seems to be supported by the normal Streams DSL, except for a
> way to specify a consumption delay from a particular source topic.
>
> I know I can just create a delayed topic myself, and then consume from
> that, but the topic volume in this case prevents us from doing this. I
> don't want to duplicate the data and load in the cluster. This topic
> currently handles ~2B records per day, and will grow in the future to
> potentially 10B records per day.
>
> Any ideas on how I could handle this using Kafka Streams? Or if it's
> better to use the lower level Streams API for that, can you point me to a
> starting point? I've been reading docs and javadocs for a while now, and
> I'm not sure where I would add/configure this.
>
> Thanks!
>
> Marcos Juarez
>
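To make the Processor API idea a bit more concrete, here is a rough, untested sketch of the buffering approach. It assumes String keys and values, two state stores (the names "arrival-times" and "pending-records" are made up here; both would be registered via addStateStore() and connected to the processor), and a placeholder mergeRelated() that stands in for your domain logic. It uses the 0.10.2-era Processor interface, where punctuate() is driven by stream time:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class DelayedEmitProcessor implements Processor<String, String> {

    private static final long DELAY_MS = 60 * 60 * 1000L; // 60 minutes

    private ProcessorContext context;
    private KeyValueStore<String, Long> arrivalTimes;  // key -> first-seen stream time
    private KeyValueStore<String, String> pending;     // key -> latest merged value

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.arrivalTimes = (KeyValueStore<String, Long>) context.getStateStore("arrival-times");
        this.pending = (KeyValueStore<String, String>) context.getStateStore("pending-records");
        context.schedule(60 * 1000L); // punctuate() fires every minute of stream time
    }

    @Override
    public void process(final String key, final String value) {
        final String buffered = pending.get(key);
        if (buffered == null) {
            // first time we see this key: start the 60-minute clock
            arrivalTimes.put(key, context.timestamp());
            pending.put(key, value);
        } else {
            // a "related" record arrived within the window: merge it
            pending.put(key, mergeRelated(buffered, value));
        }
    }

    @Override
    public void punctuate(final long streamTime) {
        // collect keys first, then delete, to avoid mutating the store mid-iteration
        final List<String> expired = new ArrayList<>();
        try (final KeyValueIterator<String, Long> it = arrivalTimes.all()) {
            while (it.hasNext()) {
                final KeyValue<String, Long> entry = it.next();
                if (streamTime - entry.value >= DELAY_MS) {
                    expired.add(entry.key);
                }
            }
        }
        for (final String key : expired) {
            context.forward(key, pending.get(key)); // emit the final version downstream
            pending.delete(key);
            arrivalTimes.delete(key);
        }
    }

    private String mergeRelated(final String current, final String update) {
        return update; // placeholder: apply your field changes here
    }

    @Override
    public void close() {}
}

You would wire this up via TopologyBuilder (addSource(), addStateStore(), addProcessor(), addSink()), with both stores created with matching serdes. Keep in mind that a 60-minute buffer at ~2B records/day means holding on the order of 85M records, so persistent (RocksDB-backed) stores would be advisable.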