Also, in the fromOffsets api, is the last saved offset fetched twice? That is, does the fromOffsets api start consuming from the Map<TopicAndPartition, Long>'s Long value, or from Long value + 1? If it starts from the Long value, that message will be processed twice - once in the application's last run before the crash, and once in the first run after the crash.
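For what it's worth, a small sketch of the convention the direct stream documents: the values in the fromOffsets map are *inclusive* starting positions, while OffsetRange.untilOffset() is the *exclusive* end of what a batch consumed. If that holds, persisting untilOffset and passing it back verbatim avoids the duplicate. The class and method names below are made up for illustration:

```java
// Sketch of the offset handoff, assuming the documented direct-stream
// convention: fromOffsets values are inclusive starting positions and
// OffsetRange.untilOffset() is the exclusive end of the consumed range.
// These names are hypothetical, not part of the Spark API.
class OffsetHandoff {

    // Next run's inclusive fromOffset, given the exclusive untilOffset
    // persisted after the last successful batch. No "+1" is needed:
    // untilOffset already points one past the last consumed message.
    static long nextFromOffset(long savedUntilOffset) {
        return savedUntilOffset;
    }

    // True if restarting at nextFrom would re-read the message at
    // lastConsumed - the duplicate the question worries about.
    static boolean wouldDuplicate(long lastConsumed, long nextFrom) {
        return nextFrom <= lastConsumed;
    }
}
```

So the duplicate only occurs if you persist the last consumed offset itself (untilOffset - 1) and replay from it; persisting untilOffset gives a clean handoff with nothing re-read and nothing skipped.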
On Thu, Aug 6, 2015 at 9:05 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> Hi
>
> For checkpointing and using the fromOffsets argument - say for the first
> time when my app starts, I don't have any previous state stored and I want
> to start consuming from the largest offset.
>
> 1. Is it possible to specify that in the fromOffsets api? I don't want to
> use the other api, which returns JavaPairInputDStream, because the
> fromOffsets api returns JavaDStream and I want to keep the further flow of
> my app the same in both cases.
>
> 2. To achieve the first (the same flow in both cases), if I use a
> different api in each case and transfer the JavaPairInputDStream to a
> JavaDStream using a map function, I am no longer able to cast the
> transferred stream to HasOffsetRanges to get the offsets of the current
> run - it throws a ClassCastException. When I do
>
>   OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
> on the transformed stream:
>
>   java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD
>   cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
> On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> You can't use checkpoints across code upgrades. That may or may not
>> change in the future, but for now that's a limitation of Spark checkpoints
>> (regardless of whether you're using Kafka).
>>
>> Some options:
>>
>> - Start up the new job on a different cluster, then kill the old job once
>> it's caught up to where the new job started. If you care about duplicate
>> work, you should be doing idempotent / transactional writes anyway, which
>> should take care of the overlap between the two.
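On the ClassCastException in point 2: only the RDDs produced directly by the direct stream implement HasOffsetRanges; after a map() you have a MapPartitionsRDD, so the cast fails. The usual workaround is to capture the offset ranges in a transform step that runs first, before any other transformation. A rough sketch against the Spark 1.x spark-streaming-kafka Java API (variable names are mine, and I haven't compiled this exact snippet):

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Capture offsets in the first transformation, while the RDD is still
// the one produced by createDirectStream, then continue with map() etc.
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

JavaDStream<String> messages = directKafkaStream
    .transformToPair(rdd -> {
        // rdd here is still the stream's own KafkaRDD, so the cast succeeds
        offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
        return rdd;
    })
    .map(tuple -> tuple._2()); // safe now: offsets were already captured
```

After this, the downstream flow works on a plain JavaDStream in both cases, and the current batch's offsets are available from offsetRanges inside foreachRDD.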
>> If you're doing batches, you may need to be a little more careful about
>> handling batch boundaries.
>>
>> - Store the offsets somewhere other than the checkpoint, and provide them
>> on startup using the fromOffsets argument to createDirectStream.
>>
>> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <nibbi...@gmail.com> wrote:
>>
>>> Hi,
>>> I've read about the recent updates to the Spark Streaming integration
>>> with Kafka (I refer to the new approach without receivers).
>>> In the new approach, metadata is persisted in checkpoint folders on
>>> HDFS so that the Spark Streaming context can be recreated in case of
>>> failures. This means that the streaming application will restart from
>>> where it exited, and the message-consuming process continues with new
>>> messages only. Also, if I manually stop the streaming process and
>>> recreate the context from the checkpoint (using an approach similar to
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>>> the behavior would be the same.
>>>
>>> Now, suppose I want to change something in the software and modify the
>>> processing pipeline. Can Spark use the previous checkpoint to recreate
>>> the new application? Will I ever be able to upgrade the software without
>>> processing all the messages in Kafka again?
>>>
>>> Regards,
>>> Nicola
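To tie the two questions together: if you store offsets yourself, both the fresh-start case and the restart case can go through the same createDirectStream overload that takes fromOffsets, so the downstream flow and return type stay identical. A rough sketch against the Spark 1.x Java API - loadSavedOffsets, savedOffsetsExist, and fetchLatestOffsets are hypothetical helpers you would implement against your offset store and Kafka (e.g. via an OffsetRequest for the latest offsets):

```java
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// On startup: use saved offsets if present, otherwise seed the map with
// the latest offsets fetched from Kafka, so "start from largest" and
// "resume after crash" share one code path and one stream type.
Map<TopicAndPartition, Long> fromOffsets =
    savedOffsetsExist() ? loadSavedOffsets()     // hypothetical helpers
                        : fetchLatestOffsets();

JavaInputDStream<String> stream = KafkaUtils.createDirectStream(
    jssc,                       // JavaStreamingContext
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    String.class,               // record type produced by the handler
    kafkaParams, fromOffsets,
    msgAndMd -> msgAndMd.message()); // messageHandler: keep just the value
```

Since this overload returns a JavaInputDStream rather than a JavaPairInputDStream, the rest of the pipeline never has to branch on how the app was started; combined with the transform-first pattern for capturing offsets, the flow is the same in both runs.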