Hi,

Regarding checkpointing and the fromOffsets argument: say that the first time my app starts I don't have any previous state stored, and I want to start consuming from the largest offset.

1. Is it possible to specify that with the fromOffsets API? I don't want to use the other API, which returns JavaPairInputDStream, while the fromOffsets API returns JavaDStream, since I want to keep the rest of my app's flow the same in both cases.

2. To get the same flow in both cases, suppose I use a different API in each case and convert the JavaPairInputDStream to a JavaDStream using a map function. I can then no longer cast the transformed stream's RDDs to HasOffsetRanges to get the offsets of the current run. When I do

OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();

on the transformed stream, it throws a ClassCastException:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger <[email protected]> wrote:

> You can't use checkpoints across code upgrades. That may or may not
> change in the future, but for now that's a limitation of spark checkpoints
> (regardless of whether you're using Kafka).
>
> Some options:
>
> - Start up the new job on a different cluster, then kill the old job once
> it's caught up to where the new job started. If you care about duplicate
> work, you should be doing idempotent / transactional writes anyway, which
> should take care of the overlap between the two. If you're doing batches,
> you may need to be a little more careful about handling batch boundaries
>
> - Store the offsets somewhere other than the checkpoint, and provide them
> on startup using the fromOffsets argument to createDirectStream
>
> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro <[email protected]>
> wrote:
>
>> Hi,
>> I've read about the recent updates about spark-streaming integration with
>> Kafka (I refer to the new approach without receivers).
>> In the new approach, metadata are persisted in checkpoint folders on HDFS
>> so that the SparkStreaming context can be recreated in case of failures.
>> This means that the streaming application will restart from where it
>> exited and the message consuming process continues with new messages only.
>> Also, if I manually stop the streaming process and recreate the context
>> from checkpoint (using an approach similar to
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>> the behavior would be the same.
>>
>> Now, suppose I want to change something in the software and modify the
>> processing pipeline.
>> Can spark use the previous checkpoint to recreate the new application?
>> Will I ever be able to upgrade the software without processing all the
>> messages in Kafka again?
>>
>> Regards,
>> Nicola
>>
>
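
The second option in the quoted reply (storing offsets outside the checkpoint) suggests one possible way to handle the first-start case asked about at the top of this message: keep the offsets in your own store, and when the store is empty, fall back to the largest offsets, so that the same fromOffsets code path is used on every start. Below is a minimal sketch in plain Java with no Spark or Kafka dependency. The class name OffsetStore, the "topic/partition" key format, and the properties-file layout are all hypothetical and not part of any library. It assumes the application can look up the current largest offsets from Kafka on its own, and that the resulting map is converted to whatever type the fromOffsets argument expects.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spark or Kafka.
// Offsets are keyed by "topic/partition" and kept in a properties file.
final class OffsetStore {
    private final Path file;

    OffsetStore(Path file) {
        this.file = file;
    }

    // Returns the saved offsets, or an empty map if nothing has been saved yet.
    Map<String, Long> load() throws IOException {
        Map<String, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        for (String key : props.stringPropertyNames()) {
            offsets.put(key, Long.parseLong(props.getProperty(key)));
        }
        return offsets;
    }

    // Overwrites the file with the given offsets.
    void save(Map<String, Long> offsets) throws IOException {
        Properties props = new Properties();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            props.setProperty(e.getKey(), Long.toString(e.getValue()));
        }
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "topic/partition=offset");
        }
    }

    // Saved offsets take precedence; any partition without a saved offset
    // starts at the latest (largest) offset supplied by the caller.
    static Map<String, Long> startingOffsets(Map<String, Long> saved,
                                             Map<String, Long> latest) {
        Map<String, Long> start = new HashMap<>(latest);
        start.putAll(saved);
        return start;
    }
}
```

On the first run load() returns an empty map, so startingOffsets yields the latest offsets unchanged. On later runs the saved offsets override them. In a real job, save would be called only after a batch's output has been written, so that a restart does not skip data.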
