Yeah, maybe I should bump the issue to major. Now that I have thought it through while giving my previous answer, this should be easy to fix internally, just by doing a foreachRDD on all the input streams within the system (rather than requiring it to be done explicitly, the way I asked you to do).
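For concreteness, the workaround this thread converges on is a no-op output action on the input stream, which forces every input batch to be evaluated. Below is a minimal, self-contained sketch of Alan's test with that workaround applied; the MetricEvent case class, the queue-based input source, and the local StreamingContext are illustrative assumptions rather than the original code.

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    // Hypothetical event type standing in for Alan's MetricEvent.
    case class MetricEvent(timeStamp: Long, name: String, value: Double)

    object WindowWorkaround {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("WindowWorkaround")
        val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval

        // Assumed input source; in Alan's test it emits one record per second.
        val inputStream = ssc.queueStream(new Queue[RDD[MetricEvent]]())

        // The workaround: a no-op action that forces every batch of the
        // input stream to be computed, so that lazy evaluation cannot skip
        // batches that only the window depends on.
        inputStream.foreachRDD(rdd => {})

        val batchInterval = 5
        val windowedStream = inputStream.window(Seconds(25), Seconds(15))

        val outputFunc = (r: RDD[MetricEvent], t: Time) => {
          println("======================================== %s".format(t.milliseconds / 1000))
          r.foreach { metric =>
            val timeKey = metric.timeStamp / batchInterval * batchInterval
            println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
          }
        }
        windowedStream.foreachRDD(outputFunc)

        ssc.start()
        ssc.awaitTermination()
      }
    }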
Thanks Alan, for testing this out and confirming that this was the same issue. I was worried that this was a totally new issue that we did not know of.

TD

On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai <a...@opsclarity.com> wrote:
> TD, it looks like your instincts were correct. I misunderstood what you
> meant. If I force an eval on the input stream using foreachRDD, the
> windowing works correctly. If I don’t do that, lazy eval somehow screws up
> the window batches I eventually receive. Any reason the bug is categorized
> as minor? It seems that anyone who uses the windowing functionality would
> run into this bug. I imagine this would include anyone who wants to use
> Spark Streaming to aggregate data in fixed time batches, which seems like a
> fairly common use case.
>
> Alan
>
>
> On Jul 22, 2014, at 11:30 PM, Alan Ngai <a...@opsclarity.com> wrote:
>
> foreachRDD is how I extracted values in the first place, so that’s not
> going to make a difference. I don’t think it’s related to SPARK-1312
> because I’m generating data every second in the first place and I’m using
> foreachRDD right after the window operation. The code looks something like
>
> val batchInterval = 5
> val windowInterval = 25
> val slideInterval = 15
>
> val windowedStream = inputStream.window(Seconds(windowInterval),
>   Seconds(slideInterval))
>
> val outputFunc = (r: RDD[MetricEvent], t: Time) => {
>   println("======================================== %s".format(t.milliseconds / 1000))
>   r.foreach { metric =>
>     val timeKey = metric.timeStamp / batchInterval * batchInterval
>     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
>   }
> }
> windowedStream.foreachRDD(outputFunc)
>
> On Jul 22, 2014, at 10:13 PM, Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
> It could be related to this bug that is currently open.
> https://issues.apache.org/jira/browse/SPARK-1312
>
> Here is a workaround. Can you add an inputStream.foreachRDD(rdd => { }) and
> try these combos again?
>
> TD
>
>
> On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai <a...@opsclarity.com> wrote:
>>
>> I have a sample application pumping out records 1 per second. The batch
>> interval is set to 5 seconds. Here’s a list of “observed window
>> intervals” vs. what was actually set:
>>
>> window=25, slide=25 : observed-window=25, overlapped-batches=0
>> window=25, slide=20 : observed-window=20, overlapped-batches=0
>> window=25, slide=15 : observed-window=15, overlapped-batches=0
>> window=25, slide=10 : observed-window=20, overlapped-batches=2
>> window=25, slide=5  : observed-window=25, overlapped-batches=3
>>
>> Can someone explain this behavior to me? I’m trying to aggregate metrics
>> by time batches, but I want to skip partial batches. Therefore, I’m
>> trying to find a combination which results in 1 overlapped batch, but no
>> combination I tried gets me there.
>>
>> Alan
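For context on the numbers at the bottom of the thread: under the documented window semantics, a window of length w sliding by s over batches of length b (all in seconds, with w and s multiples of b) contains w/b batches, and consecutive windows share (w - s)/b batches. A quick sketch of that arithmetic against Alan's combinations, in plain Scala:

    // Expected overlap between consecutive windows under correct semantics:
    // consecutive windows overlap by (w - s) seconds, i.e. (w - s) / b batches.
    val b = 5
    for ((w, s) <- Seq((25, 25), (25, 20), (25, 15), (25, 10), (25, 5))) {
      val expectedOverlap = math.max(0, (w - s) / b)
      println(s"window=$w, slide=$s : expected overlapped-batches=$expectedOverlap")
    }

By this arithmetic, the expected overlaps are 0, 1, 2, 3, and 4 batches, against the observed 0, 0, 0, 2, and 3, which is consistent with batches being dropped by lazy evaluation. It also suggests that Alan's goal of exactly one overlapped batch corresponds to any combination with w - s = b, e.g. window=25, slide=20, once the input stream is forced as described above.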