That means that in order to process the filtered-out records in the next batch, we would have to seek Kafka Streams back, right?
On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> I see.
>
> If you do the step of storing the end offsets in your database before
> starting up Streams, this would work.
>
> What you could do as a workaround (even if it might not be a nice
> solution) is to apply a `transform()` as your first operator. Within
> `transform()` you get access to the current record offset via
> `context.offset()` (the context object is provided via `init()`). Thus,
> you can implement an "offset filter" and also track whether all
> partitions have reached their end offsets (you also get a record's
> partition via the context).
>
> So, if a record is past its partition's end offset, you just filter the
> record out. If all partitions have reached their end offsets, you can
> set a flag to notify your "main" thread to close() the Kafka Streams
> instance.
>
> Does this make sense?
>
>
> -Matthias
>
> On 5/8/17 12:49 PM, Timur Yusupov wrote:
> > Matthias,
> >
> > Thanks for your answers.
> >
> >>> So we are considering to just pause specific
> >>> topic partitions as soon as we arrive at the stop offsets for them.
> >> I am just wondering how you would do this in a fault-tolerant way (if
> >> you would have a pause API)?
> > At the start of a batch cycle we have to store somewhere (for our use
> > case, the database we already use will work) the end offsets for the
> > topic partitions we are interested in. Then we just need to process
> > all messages up to the stored end offsets. If the application is
> > restarted, it first checks the database for the stored end offsets.
> >
> >>> 2) Assume we process multiple topics in some parallel way and want
> >>> to pause some topics while waiting for other topics to catch up.
> >> Streams synchronizes topics on time automatically for you. So I am
> >> wondering why this does not work for you?
> > Right, this is probably a bad example, but use case 1) with batch
> > processing is still relevant.
> >
> >
> >>
> >> -Matthias
> >>
> >>
> >> On 4/27/17 8:52 AM, Timur Yusupov wrote:
> >>> I see it is possible to pause consumption of specific topic
> >>> partitions when using KafkaConsumer directly, but it looks like this
> >>> is not possible when using KafkaStreams.
> >>>
> >>> There are the following use cases for that:
> >>> 1) Doing batch processing using Kafka Streams (I found the
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>> proposal for Kafka Streams, but according to
> >>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
> >>> development on that side). So we are considering just pausing
> >>> specific topic partitions as soon as we arrive at their stop
> >>> offsets.
> >>>
> >>> 2) Assume we process multiple topics in some parallel way and want
> >>> to pause some topics while waiting for other topics to catch up.
> >>>
> >>> Actually, the first use case is more important for us, so it would
> >>> be good to know whether this is possible, or whether improvements
> >>> are already planned to allow pausing consumption of specific topic
> >>> partitions in KafkaStreams.
> >>
> >>
>
>

-- 
Best regards, Timur.
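For reference, the "offset filter" workaround Matthias describes could be sketched as a small bookkeeping helper, kept separate from the Streams API so the logic is easy to see. The class and method names below are hypothetical (not from any Kafka API); inside a real `Transformer`, `shouldProcess()` would be fed `context.partition()` and `context.offset()`, and `allPartitionsDone()` would drive the flag that tells the main thread to call `KafkaStreams#close()`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper sketching the "offset filter" idea: given per-partition
// end offsets captured in the database before Streams starts, decide whether a
// record still belongs to the current batch, and detect when every partition
// has reached its end offset.
class EndOffsetFilter {
    private final Map<Integer, Long> endOffsets;            // partition -> last offset to process (inclusive)
    private final Map<Integer, Boolean> done = new HashMap<>();

    EndOffsetFilter(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        for (Integer partition : endOffsets.keySet()) {
            done.put(partition, false);
        }
    }

    // Called once per record (e.g. from transform() with context.partition()
    // and context.offset()). Returns true if the record is within the batch.
    boolean shouldProcess(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null || offset > end) {
            return false;                                   // past the batch boundary: filter out
        }
        if (offset == end) {
            done.put(partition, true);                      // this partition just finished its batch
        }
        return true;
    }

    // When this turns true, the "main" thread can close the Streams instance.
    boolean allPartitionsDone() {
        return !done.containsValue(false);
    }
}
```

Note that, as in the original workaround, a partition that receives no records at all never flips to "done", so in practice the end offsets stored up front should only include partitions that actually have data to consume.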