Hi,
I started my new Streams app with this line commented out:
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

What I observed was that it started consuming messages from the latest
offset of the source topic.
Based on the comments by Eno, I thought that if the offsets do not exist then

So in practice an app will include the property below (e.g., set to
"earliest")

However, it picked the latest offset. I also checked the auto.offset.reset
property in the latest docs, and I see it listed as largest.

So now I am confused: in a Streams application, what is the default offset
reset policy if no offsets exist?

Note: I am using version kafka_2.10-0.10.0.1.
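
For reference, a minimal sketch of setting the reset policy explicitly
instead of relying on whatever the default is. It uses only
java.util.Properties so that it runs without the Kafka jars; the string
keys are the values behind StreamsConfig.APPLICATION_ID_CONFIG,
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. The class name, the helper
buildProps, and the broker address localhost:9092 are assumptions made for
illustration only. The application id "test" is the one used earlier in
this thread.

```java
import java.util.Properties;

public class OffsetResetSketch {
    // String values of the Kafka config constants named in the thread.
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Hypothetical helper: builds the properties with an explicit reset policy.
    static Properties buildProps(String resetPolicy) {
        // The new (Java) consumer takes "earliest" or "latest" here;
        // "smallest"/"largest" are the names used by the old consumer.
        if (!"earliest".equals(resetPolicy) && !"latest".equals(resetPolicy)) {
            throw new IllegalArgumentException(
                "expected \"earliest\" or \"latest\", got: " + resetPolicy);
        }
        Properties props = new Properties();
        props.put(APPLICATION_ID, "test");
        props.put(BOOTSTRAP_SERVERS, "localhost:9092"); // assumed address
        // Only takes effect when no committed offset exists for the group.
        props.put(AUTO_OFFSET_RESET, resetPolicy);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("earliest");
        System.out.println(AUTO_OFFSET_RESET + "=" + props.getProperty(AUTO_OFFSET_RESET));
    }
}
```

In a real app these properties would be passed on as in the snippet Eno
posted: new KafkaStreams(builder, props).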

Thanks
Sachin



On Mon, Nov 21, 2016 at 7:08 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Sachin,
>
> There is no need to check within the app whether the offset exists or not,
> since the consumer code will do that check automatically for you. So in
> practice an app will include the property below (e.g., set to "earliest"),
> but that will only have an effect if the consumer detects that the offsets
> do not exist anymore. If the offsets exist, then that line is a no-op.
>
> So in summary, I'd just include that property, and no more code changes
> are required.
>
> Thanks
> Eno
>
> > On 21 Nov 2016, at 12:11, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > So in my Java code, how can I check
> > when there is no initial offset in Kafka or if the current offset does not
> > exist any more on the server (e.g. because that data has been deleted)?
> >
> > So in this case, as you have said, I can set the offset as
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Nov 21, 2016 at 4:16 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> Currently you can only change the following global configuration by using
> >> "earliest" or "latest", as shown in the code snippet below. As the Javadoc
> >> mentions: "What to do when there is no initial offset in Kafka or if the
> >> current offset does not exist any more on the server (e.g. because that
> >> data has been deleted)":
> >>
> >> ...
> >> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >> ...
> >> return new KafkaStreams(builder, props);
> >>
> >>
> >>
> >> In addition, there is a tool to reset the offsets of all topics to the
> >> beginning. This is useful for reprocessing:
> >> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> >>
> >> However, there is no option currently for resetting the offset to an
> >> arbitrary offset.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 21 Nov 2016, at 10:37, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi
> >>> I am running a streaming application with
> >>> streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> >>>
> >>> How do I find out the offsets for each of the source, intermediate and
> >>> internal topics associated with this application?
> >>>
> >>> And how can I reset them to some specific value via shell or otherwise?
> >>>
> >>> Thanks
> >>> Sachin
>
>
