You don't want auto.offset.reset on executors; you want executors to do what the driver told them to do. Otherwise you're going to get really horrible data inconsistency issues if the executors silently reset.
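Roughly, here's the idea behind that override: the 0-10 integration rewrites the params it hands to executors. A sketch of the spirit of fixKafkaParams in KafkaUtils.scala (the helper name below is illustrative, not the actual private API):

    // Executors get a copy of the driver's Kafka params with the unsafe
    // settings forced off, so only the driver decides where to read from.
    def executorKafkaParams(driverParams: Map[String, Object]): Map[String, Object] =
      driverParams ++ Map(
        // fail fast on a missing offset instead of silently jumping,
        // so tasks can't read inconsistent ranges
        "auto.offset.reset" -> "none",
        // offset bookkeeping belongs to the driver, not the executors
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )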
If your retention is so low that data gets expired between the time the driver creates a batch with a given starting offset and the time an executor starts to process that batch, you're going to have problems.

On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
> This isn't a production setup. We kept retention low intentionally.
> My original question was why I got the exception instead of it using
> auto.offset.reset on restart.
>
> On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If you leave enable.auto.commit set to true, it will commit offsets to
>> Kafka, but you will get undefined delivery semantics.
>>
>> If you just want to restart from a fresh state, the easiest thing to
>> do is use a new consumer group name.
>>
>> But if that keeps happening, you should look into why your retention
>> is not sufficient.
>>
>> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
>> > You are right. I got confused because it's all part of the same log
>> > when running from the IDE.
>> > I was looking for a good guide to read to understand this integration.
>> >
>> > I'm not managing offsets on my own, and I've not enabled checkpointing
>> > for my tests.
>> > I assumed offsets would be stored in Kafka by default.
>> >
>> >   KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
>> >     ssc, PreferConsistent,
>> >     SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
>> >
>> >   * @param offsets: offsets to begin at on initial startup. If no
>> >   *   offset is given for a TopicPartition, the committed offset (if
>> >   *   applicable) or kafka param auto.offset.reset will be used.
>> >
>> >   16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> >     enable.auto.commit = true
>> >     auto.offset.reset = latest
>> >
>> > Srikanth
>> >
>> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> It seems like you're confused about the purpose of that line of code;
>> >> it applies to executors, not the driver. The driver is responsible
>> >> for determining offsets.
>> >>
>> >> Where are you storing offsets: in Kafka, in checkpoints, or in your
>> >> own store? auto.offset.reset won't be used if there are stored
>> >> offsets.
>> >>
>> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> Upon restarting, my Spark Streaming app fails with this error:
>> >>>
>> >>>   Exception in thread "main" org.apache.spark.SparkException: Job
>> >>>   aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
>> >>>   most recent failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>> >>>   org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>> >>>   Offsets out of range with no configured reset policy for
>> >>>   partitions: {mt-event-2=1710706}
>> >>>
>> >>> It is correct that the last read offset was deleted by Kafka due to
>> >>> retention period expiry.
>> >>> I've set auto.offset.reset in my app, but it is getting overridden
>> >>> here:
>> >>>
>> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>> >>>
>> >>> How do I force it to restart in this case (fully aware of the
>> >>> potential data loss)?
>> >>>
>> >>> Srikanth
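A minimal sketch of the "new consumer group" restart suggested above, assuming the 0-10 direct stream API; the broker address, group name, and topic pattern are placeholders. A brand-new group has no committed offsets, so the driver falls back to auto.offset.reset instead of the expired offset:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val ssc = new StreamingContext(
      new SparkConf().setAppName("restart-fresh").setMaster("local[*]"),
      Seconds(5))
    val pattern = Pattern.compile("mt-event-.*")  // placeholder topic pattern

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",    // placeholder broker
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      // fresh group => no stored offsets => auto.offset.reset applies
      "group.id" -> "mt-event-consumer-v2",       // hypothetical new name
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc, PreferConsistent,
      SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))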
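And since enable.auto.commit=true gives undefined delivery semantics, here is a sketch of the commit-after-output pattern from the 0-10 integration, reusing the stream from the sketch above; the driver commits each batch's offset ranges only after the results have been written:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // capture the exact offset ranges the driver assigned to this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write rdd to the sink here ...
      // commit only after the write succeeds: at-least-once semantics
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }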