Got it, thanks, Matthias!

On Tue, May 9, 2017 at 2:07 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Yes. That is something you would need to do externally, too.
>
> There is a KIP for a tool
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)
> -- but you can also do this using a single `KafkaConsumer` with
> `group.id == application.id` that gets all partitions assigned and does
> the corresponding seek plus commitSync.
>
> Note, you need to make sure that the Streams "consumer group" is
> completely inactive to avoid conflicts. You could add a check similar to
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L94
>
>
> -Matthias
>
> On 5/8/17 4:01 PM, Timur Yusupov wrote:
> > That means in order to process the filtered-out records in the next
> > batch, we have to seek KafkaStreams back, right?
> >
> > On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I see.
> >>
> >> If you do the step of storing the end offsets in your database before
> >> starting up Streams, this would work.
> >>
> >> What you could do as a workaround (even if it might not be a nice
> >> solution) is to apply a `transform()` as your first operator. Within
> >> `transform()` you get access to the current record offset via
> >> `context.offset()` (the context object is provided via `init()`).
> >> Thus, you can implement an "offset filter" and also track if all
> >> partitions did reach their end offset (you also get a record's
> >> partition via the context).
> >>
> >> Thus, if one record is past the partition end-offset, you just filter
> >> the record out. If all partitions did reach their end-offset, you can
> >> set a flag to notify your "main" thread to close() the Kafka Streams
> >> instance.
> >>
> >> Does this make sense?
> >>
> >>
> >> -Matthias
> >>
> >> On 5/8/17 12:49 PM, Timur Yusupov wrote:
> >>> Matthias,
> >>>
> >>> Thanks for your answers.
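The transform()-based "offset filter" described above could be sketched roughly as follows. The `EndOffsetGate` class and all its names are hypothetical; the actual Kafka Streams wiring (calling it from a `Transformer` with `context.partition()` and `context.offset()`, after loading the stored end offsets from the database) is indicated only in the comments:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for the "offset filter" workaround: inside a Kafka
// Streams Transformer, transform() would call
// accept(context.partition(), context.offset()) and filter out (e.g. return
// null for) records past the stored end offset, while allDone() tells the
// "main" thread when it can close() the Streams instance.
public class EndOffsetGate {
    // partition -> last offset to process (inclusive), loaded from the database
    private final Map<Integer, Long> endOffsets;
    private final Map<Integer, Boolean> partitionDone = new HashMap<>();

    public EndOffsetGate(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        for (Integer p : endOffsets.keySet()) {
            partitionDone.put(p, false);
        }
    }

    // Returns true if the record at (partition, offset) belongs to this batch.
    public synchronized boolean accept(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null || offset > end) {
            partitionDone.put(partition, true); // past the end offset
            return false;                       // filter the record out
        }
        if (offset == end) {
            partitionDone.put(partition, true); // reached the end offset
        }
        return true;
    }

    // True once every tracked partition reached (or passed) its end offset.
    public synchronized boolean allDone() {
        return !partitionDone.containsValue(false);
    }
}
```

A partition is also marked done when a record *past* its end offset shows up, so the gate does not hang if the exact end offset is never observed.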
> >>>
> >>>>> So we are considering to just pause specific
> >>>>> topic partitions as soon as we arrive at the stop offsets for them.
> >>>> I am just wondering how you would do this in a fault-tolerant way
> >>>> (if you would have a pause API)?
> >>> At the start of a batch cycle we have to store somewhere (for our use
> >>> case, the database we already use will work) the end offsets for the
> >>> topic partitions we are interested in. Then we just need to process
> >>> all messages up to the stored end offsets. In case the application is
> >>> restarted, it first checks the database for the stored end offsets.
> >>>
> >>>>> 2) Assume we process multiple topics in some parallel way and want
> >>>>> to pause some topics while waiting for other topics to catch up.
> >>>> Streams synchronizes topics on time automatically for you. So I am
> >>>> wondering why this does not work for you?
> >>> Right, this is probably a bad example, but use case 1) with batch
> >>> processing is still relevant.
> >>>
> >>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/27/17 8:52 AM, Timur Yusupov wrote:
> >>>>> I see it is possible to pause specific topic partition consumption
> >>>>> when using KafkaConsumer directly, but it looks like it is not
> >>>>> possible when using KafkaStreams.
> >>>>>
> >>>>> There are the following use cases for that:
> >>>>> 1) Doing batch processing using Kafka Streams (I found the
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>> proposal for Kafka Streams, but according to
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
> >>>>> development on that side). So we are considering to just pause
> >>>>> specific topic partitions as soon as we arrive at the stop offsets
> >>>>> for them.
> >>>>>
> >>>>> 2) Assume we process multiple topics in some parallel way and want
> >>>>> to pause some topics while waiting for other topics to catch up.
> >>>>>
> >>>>> Actually, the first use case is more important for us, so it would
> >>>>> be good to know if there is a possibility, or if some improvements
> >>>>> are already planned, for allowing to pause specific topic partition
> >>>>> consumption in KafkaStreams.
> >>>>>
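The "plain consumer" reset Matthias suggests near the top of the thread (a single `KafkaConsumer` with `group.id == application.id`, manual assignment, seek, and commitSync) could look roughly like the sketch below. The class and method names (`OffsetReset`, `resetTo`) are hypothetical, and it assumes the Streams application has been completely shut down first, as the thread stresses; it needs a live broker to run:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Rough sketch: commit arbitrary offsets for a Streams application's group
// by impersonating it with a plain KafkaConsumer. The Streams "consumer
// group" must be completely inactive before running this, or the manual
// commit will conflict with the running group.
public class OffsetReset {

    public static void resetTo(String bootstrapServers,
                               String applicationId,
                               Map<TopicPartition, Long> targetOffsets) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // group.id == application.id, so the commit applies to the Streams group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment of all input partitions -- no rebalance involved.
            List<TopicPartition> partitions = new ArrayList<>(targetOffsets.keySet());
            consumer.assign(partitions);
            for (Map.Entry<TopicPartition, Long> e : targetOffsets.entrySet()) {
                consumer.seek(e.getKey(), e.getValue()); // move to the target offset
            }
            consumer.commitSync(); // persist the sought positions for the group
        }
    }
}
```

The argument-less `commitSync()` commits the current positions set by `seek()`, which is what makes a restarted Streams instance pick up from the target offsets.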