Hi,

I started my new streams app with this line commented out:

//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

What I observed was that it started consuming messages from the latest offset of the source topic.

Based on Eno's comment ("So in practice an app will include the property below (e.g., set to "earliest")"), I expected that if the offsets do not exist it would start from the earliest offset. However, it picked the latest offset.

I also checked the auto.offset.reset property in the latest docs, and I see it listed as largest.

So now I am confused: in a streams application, what is the default offset if none exists?

Note that I am using version kafka_2.10-0.10.0.1.

Thanks
Sachin

On Mon, Nov 21, 2016 at 7:08 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> There is no need to check within the app whether the offset exists or not,
> since the consumer code will do that check automatically for you. So in
> practice an app will include the property below (e.g., set to "earliest"),
> but that will only have an effect if the consumer detects that the offsets
> do not exist anymore. If the offset exists, then that line is a noop.
>
> So in summary, I'd just include that property, and no more code changes
> are required.
>
> Thanks
> Eno
>
> > On 21 Nov 2016, at 12:11, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > So in my java code how can I check
> > when there is no initial offset in Kafka or if the current offset does not
> > exist any more on the server (e.g. because that data has been deleted)
> >
> > So in this case as you have said I can set offset as
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest
> >
> > Thanks
> > Sachin
> >
> > On Mon, Nov 21, 2016 at 4:16 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Sachin,
> >>
> >> Currently you can only change the following global configuration by using
> >> "earliest" or "latest", as shown in the code snippet below. As the Javadoc
> >> mentions: "What to do when there is no initial offset in Kafka or if the
> >> current offset does not exist any more on the server (e.g. because that
> >> data has been deleted)":
> >>
> >> ...
> >> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >> ...
> >> return new KafkaStreams(builder, props)
> >>
> >> In addition, there is a tool to reset the offsets of all topics to the
> >> beginning. This is useful for reprocessing:
> >> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> >>
> >> However, there is no option currently for resetting the offset to an
> >> arbitrary offset.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 21 Nov 2016, at 10:37, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi
> >>> I am running a streaming application with
> >>> streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> >>>
> >>> How do I find out the offsets for each of the source, intermediate and
> >>> internal topics associated with this application.
> >>>
> >>> And how can I reset them to some specific value via shell or otherwise.
> >>>
> >>> Thanks
> >>> Sachin
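
For reference, the behaviour discussed in this thread can be sketched in plain Java. This is only an illustration under stated assumptions, not the consumer's actual code: the class OffsetResetSketch and the helper startingOffset are hypothetical names, the config is written with the literal keys "application.id" and "auto.offset.reset" (the string values behind StreamsConfig.APPLICATION_ID_CONFIG and ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) so that it compiles without the Kafka jars, and other required settings such as the bootstrap servers are left out. It models the point Eno makes above: the reset policy only matters when there is no usable committed offset.

```java
import java.util.Properties;

public class OffsetResetSketch {

    // Hypothetical model of the decision described in the thread.
    // committed == null means no committed offset exists for the partition.
    // logStart and logEnd are the first and next-to-be-written offsets of the log.
    static long startingOffset(Long committed, long logStart, long logEnd, String policy) {
        boolean usable = committed != null && committed >= logStart && committed <= logEnd;
        if (usable) {
            return committed; // the reset policy is a no-op in this case
        }
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // Stand-in for the error a real consumer raises with policy "none".
                throw new IllegalStateException("no usable offset and policy is " + policy);
        }
    }

    // Setting the policy explicitly, so the app does not depend on a default.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "test");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps();
        String policy = props.getProperty("auto.offset.reset");

        // No committed offset: the policy decides where consumption starts.
        System.out.println(startingOffset(null, 0L, 100L, policy));
        System.out.println(startingOffset(null, 0L, 100L, "latest"));

        // A usable committed offset wins regardless of the policy.
        System.out.println(startingOffset(42L, 0L, 100L, "latest"));

        // A committed offset that has been deleted falls back to the policy.
        System.out.println(startingOffset(5L, 10L, 100L, policy));
    }
}
```

Uncommenting the props.put line from the first message corresponds to streamsProps() here; with the line left out, the starting position depends on whatever default the consumer in use applies.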