Hi,

I want to run a map/reduce process over the last 5 seconds of data, every 4 
seconds. This is quite similar to the sliding window pictorial example in the 
Window Operations section of 
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html

The RDDs returned by the window transformation are incorrect in my case. 
To investigate further, I ran a series of examples with varying values of 
window length and slide interval. Summary of the test results:
(window length, slide interval) -> result
(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the doc is that the two values (5 and 4) should 
be multiples of the batch interval (1 in my case) and, as expected, I get a 
runtime error if I attempt to violate this condition. Looking at my results, it 
seems that failures occur when the window length isn't a multiple of the slide 
interval.
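
A minimal sketch of that pattern, checked against the six tested pairs from the summary above. The class and method names (WindowSlideCheck, predictedSuccess) are hypothetical and only illustrate the observation; this is not a statement about how Spark decides anything internally.

```java
public class WindowSlideCheck {

    // Hypothesis drawn from the test results: the window output looks correct
    // only when the window length is a whole multiple of the slide interval.
    static boolean predictedSuccess(int windowLen, int slideInt) {
        return windowLen % slideInt == 0;
    }

    public static void main(String[] args) {
        // (window length, slide interval) pairs and the observed outcome,
        // copied from the summary of test results.
        int[][] cases = {{3, 1}, {4, 2}, {3, 2}, {4, 3}, {5, 4}, {5, 2}};
        boolean[] observed = {true, true, false, false, false, false};

        for (int i = 0; i < cases.length; i++) {
            boolean predicted = predictedSuccess(cases[i][0], cases[i][1]);
            System.out.println("(" + cases[i][0] + "," + cases[i][1] + ") predicted="
                    + predicted + " observed=" + observed[i]);
        }
    }
}
```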

My code:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
JavaDStream<String> objWindow = inputStream.window(
    new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:
(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
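
The expected values in the comments above can be reproduced with a small sketch. It assumes, based only on the emission times seen in these runs, that a window is emitted at t when (t + 1) is a multiple of the slide interval, and that it should cover the last windowLen batches ending at t. ExpectedWindow and expectedBatches are hypothetical names, not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedWindow {

    // Returns the batch indices expected in the window emitted at time t,
    // or an empty list when no window is emitted at t.
    // Assumption: emission happens when (t + 1) is a multiple of slideInt,
    // which matches the emission times in the detailed results.
    static List<Integer> expectedBatches(int windowLen, int slideInt, int t) {
        List<Integer> batches = new ArrayList<>();
        if ((t + 1) % slideInt != 0) {
            return batches; // "nothing" at this time step
        }
        // The window covers the last windowLen batches, clipped at t_0.
        int first = Math.max(0, t - windowLen + 1);
        for (int i = first; i <= t; i++) {
            batches.add(i);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Print the expected contents for the (5,4) case.
        for (int t = 0; t <= 7; t++) {
            System.out.println("@t_" + t + ": " + expectedBatches(5, 4, t));
        }
    }
}
```

Under these assumptions the (5,4) case at t_7 should contain batches 3 to 7, while the observed output contained only 4 to 7.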

I have run all the above examples twice to be sure.
I believe either my understanding of the sliding window mechanism is incorrect 
or there is a problem in the sliding window mechanism.

Regards,
Sanjay
