Hello Maria, We have some thoughts about supporting finer grained flow controls in Kafka Streams https://issues.apache.org/jira/browse/KAFKA-3478 as part of a big effort to improve re-processing user experience, which covers this use case. We are shooting to have this post 0.10.0.0.
As for now, one work-around I can think of is that upon restart / re-processing, you can delete the offsets through an admin request (look at ConsumerGroupCommand). Guozhang On Thu, Apr 21, 2016 at 6:19 AM, Maria Abramiuc <maria.abram...@gmail.com> wrote: > Kafka Streams look great, but there is one thing I don't seem to find a way > to do: > > - read a topic from beginning even if there is a offset saved: > > I have : > > props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > > this works as described if there is no offset save. > > For a normal consumer we have: > > seekToBeginning > > > In KafkaConsumer: > > Line 132: > > NetworkClient netClient = new NetworkClient(new > Selector(config.getLong("connections.max.idle.ms").longValue(), > this.metrics, this.time, metricGrpPrefix, metricsTags, > channelBuilder), this.metadata, this.clientId, 100, > config.getLong("reconnect.backoff.ms").longValue(), > config.getInt("send.buffer.bytes").intValue(), > config.getInt("receive.buffer.bytes").intValue(), > config.getInt("request.timeout.ms").intValue(), this.time); > this.client = new ConsumerNetworkClient(netClient, this.metadata, > this.time, this.retryBackoffMs); > OffsetResetStrategy offsetResetStrategy = > > OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase()); > this.subscriptions = new SubscriptionState(offsetResetStrategy); > > I can't find any way to set the consumer using StreamsConfig > properties to seekToBeginning or to set subscriptionState to need > offset reset. > > > > *Is there a way to force the consumption of a topic from begging using > Kafka Streams?* > > > Thank you for all the support provided, > > Maria Abramiuc > -- -- Guozhang