Not sure on that, maybe someone else can chime in.

On Sat, Nov 14, 2015 at 4:51 AM, kundan kumar <iitr.kun...@gmail.com> wrote:
> Hi Cody,
>
> Thanks for the clarification. I will try to come up with some workaround.
>
> I have another doubt. When my job is restarted and recovers from the
> checkpoint, it does the re-partitioning step twice for each 15-minute job
> until the 2-hour window is complete. After that, the re-partitioning takes
> place only once.
>
> For example, when the job recovers at 16:15 it does re-partitioning for
> the 16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all
> the other intermediate stages are computed for the 10:00 batch. I am using
> reduceByKeyAndWindow with an inverse function. Once the 2-hour window is
> complete, i.e. at 18:15, re-partitioning takes place only once. It seems
> the checkpoint does not store RDDs beyond 2 hours, which is my window
> duration. Because of this my job takes more time than usual.
>
> Is there a way, or some configuration parameter, that would help avoid
> re-partitioning twice?
>
> I am attaching the snapshot for the same.
>
> Thanks !!
> Kundan
>
> On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Unless you change maxRatePerPartition, a batch is going to contain all
>> of the offsets from the last known processed to the highest available.
>>
>> Offsets are not time-based, and Kafka's time-based API currently has
>> very poor granularity (it's based on the filesystem timestamp of the log
>> segment). There's a Kafka improvement proposal to add time-based
>> indexing, but I wouldn't expect it soon.
>>
>> Basically, if you want batches to relate to time even while your Spark
>> job is down, you need an external process to index Kafka and some custom
>> work to use that index to generate batches.
>>
>> Or (preferably) embed a time in your message, and do any time-based
>> calculations using that time, not the time of processing.
>>
>> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar <iitr.kun...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark Streaming's checkpointing mechanism and reading the
>>> data from Kafka. The window duration for my application is 2 hours,
>>> with a sliding interval of 15 minutes.
>>>
>>> So, my batches run at the following intervals...
>>> 09:45
>>> 10:00
>>> 10:15
>>> 10:30 and so on
>>>
>>> Suppose my running batch dies at 09:55 and I restart the application
>>> at 12:05. Then the flow is something like this:
>>>
>>> At 12:05 it would run the 10:00 batch. Would this read the Kafka
>>> offsets from the time it went down (or 09:45) to 12:00? Or just up to
>>> 10:00? Then next would be the 10:15 batch. What would be the offsets
>>> as input for this batch? ...and so on for all the queued batches.
>>>
>>> Basically, my requirement is that when the application is restarted
>>> at 12:05, it should read the Kafka offsets up to 10:00, then the next
>>> queued batch takes offsets from 10:00 to 10:15, and so on until all
>>> the queued batches are processed.
>>>
>>> If this is the way offsets are handled for all the queued batches,
>>> then I am fine.
>>>
>>> Otherwise, please provide suggestions on how this can be done.
>>>
>>> Thanks!!!
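For concreteness, a minimal sketch of the setup being discussed in this
thread: a checkpointed StreamingContext recovered via getOrCreate, a direct
Kafka stream, and reduceByKeyAndWindow with an inverse function over a
2-hour window sliding every 15 minutes. The broker address, topic name,
checkpoint path, and rate limit below are placeholders, not values from the
thread; maxRatePerPartition is set only to illustrate Cody's point that it
is what bounds how many offsets a catch-up batch pulls after a restart.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WindowedKafkaApp {
  val checkpointDir = "hdfs:///tmp/checkpoints/windowed-kafka" // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("windowed-kafka")
      // Bound how many offsets each partition contributes per batch, so
      // the catch-up batches after a restart stay a manageable size.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")

    val ssc = new StreamingContext(conf, Minutes(15)) // batch = slide interval
    ssc.checkpoint(checkpointDir) // required by the inverse-function form below

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events")) // placeholder topic

    // Count occurrences per key over a 2-hour window sliding every 15 minutes.
    val counts = stream
      .map { case (_, value) => (value, 1L) }
      .reduceByKeyAndWindow(
        _ + _,        // add values entering the window
        _ - _,        // subtract values leaving the window (inverse function)
        Minutes(120), // window duration: 2 hours
        Minutes(15))  // slide interval: 15 minutes

    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists; otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}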
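And a minimal sketch of Cody's preferred option, embedding a time in the
message and doing time-based calculations against it rather than against
processing time. The "<epochMillis>|<key>" payload format is a hypothetical
example, not anything specified in the thread:

object EventTimeKey {
  val bucketMs: Long = 15 * 60 * 1000L // 15-minute buckets, matching the slide interval

  // "1447430700000|user-42" -> (1447430400000L, "user-42"): the event is
  // assigned to the 15-minute bucket its own timestamp falls in, so a batch
  // that runs late (e.g. during checkpoint recovery) still aggregates it
  // under the right time bucket.
  def eventTimeKey(payload: String): (Long, String) = {
    val Array(ts, key) = payload.split("\\|", 2)
    val bucket = (ts.toLong / bucketMs) * bucketMs // align down to bucket boundary
    (bucket, key)
  }
}

Keying the stream on (bucket, key) instead of just key makes the
aggregation independent of when a batch actually runs, which is the point
of embedding the time in the message.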