Hi all,
Let me add more information about this. The log showed:

17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid

The slice time is wrong.

My test code:

    lines.countByValueAndWindow( Seconds(2), Seconds(8)).foreachRDD( s => {

<=== Here the windowDuration is 2 seconds and the slideDuration is 8 seconds.

===========key log begin ============
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)

<=== Here, the old RDDs are sliced from 1498383077000 to 1498383084000. That is 8 seconds. Actually, it should be 2 seconds.
===========key log end============

===========code in ReducedWindowedDStream.scala begin============
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val reduceF = reduceFunc
    val invReduceF = invReduceFunc

    val currentTime = validTime
    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration

    logDebug("Window time = " + windowDuration)
    logDebug("Slide time = " + slideDuration)
    logDebug("Zero time = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)

    //  _____________________________
    // |  previous window    ________|__________________
    // |___________________|       current window       |  --------------> Time
    //                     |_____________________________|
    //
    // |________ ________|           |_______ _________|
    //          |                            |
    //          V                            V
    //       old RDDs                     new RDDs
    //

    // Get the RDDs of the reduced values in "old time steps"
    val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
    // <== I think this line should be
    //     "reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + windowDuration - parent.slideDuration)"
    logDebug("# old RDDs = " + oldRDDs.size)

    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    // <== I think this line should be
    //     "reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration, currentWindow.endTime)"
    logDebug("# new RDDs = " + newRDDs.size)
===========code in ReducedWindowedDStream.scala end============

Thanks
Fei Shao

---Original---
From: <1427357...@qq.com>
Date: 2017/6/24 14:51:52
To: "user"<u...@spark.apache.org>;"dev"<dev@spark.apache.org>;
Subject: issue about the windows slice of stream

Hi all,
I found an issue with the window slicing of DStream.
My code is:

    val ssc = new StreamingContext(conf, Seconds(1))
    val content = ssc.socketTextStream("ip", port)  // "ip" and port are placeholders
    content.countByValueAndWindow(Seconds(2), Seconds(8)).foreach(println(xxxx))

The key point is that the slide duration is greater than the window duration.
I checked the output, and the result from foreach(println(xxxx)) was wrong. I found that the stream was sliced incorrectly.

Can I open a JIRA, please?

Thanks
Fei Shao
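
As a cross-check of the slice boundaries in the log above, the interval arithmetic from compute() can be replayed without Spark. This is only a minimal sketch: the object WindowSliceSketch, the Interval case class, and the helper names are hypothetical, and only the four formulas are copied from the quoted excerpt of ReducedWindowedDStream.scala. Here batchMs stands in for parent.slideDuration (1000 ms in the log).

```scala
// Standalone sketch with no Spark dependency. All names are hypothetical;
// only the four interval formulas mirror the quoted compute() excerpt.
object WindowSliceSketch {
  // Closed interval of batch times in milliseconds, like the [begin, end] pairs in the log.
  final case class Interval(beginMs: Long, endMs: Long) {
    def minus(deltaMs: Long): Interval = Interval(beginMs - deltaMs, endMs - deltaMs)
    // Number of batch times covered, counting both ends.
    def batchCount(batchMs: Long): Long =
      if (endMs < beginMs) 0L else (endMs - beginMs) / batchMs + 1
  }

  // currentWindow = [validTime - windowDuration + parent.slideDuration, validTime]
  def currentWindow(validTimeMs: Long, windowMs: Long, batchMs: Long): Interval =
    Interval(validTimeMs - windowMs + batchMs, validTimeMs)

  // previousWindow = currentWindow - slideDuration
  def previousWindow(current: Interval, slideMs: Long): Interval =
    current.minus(slideMs)

  // oldRDDs = slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
  def oldSlice(prev: Interval, cur: Interval, batchMs: Long): Interval =
    Interval(prev.beginMs, cur.beginMs - batchMs)

  // newRDDs = slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
  def newSlice(prev: Interval, cur: Interval, batchMs: Long): Interval =
    Interval(prev.endMs + batchMs, cur.endMs)

  def main(args: Array[String]): Unit = {
    val validTimeMs = 1498383086000L // "Time 1498383086000 ms is valid" in the log
    val batchMs = 1000L
    // First pair is the failing case (window 2 s, slide 8 s); second swaps the two for contrast.
    for ((windowMs, slideMs) <- Seq((2000L, 8000L), (8000L, 2000L))) {
      val cur = currentWindow(validTimeMs, windowMs, batchMs)
      val prev = previousWindow(cur, slideMs)
      val old = oldSlice(prev, cur, batchMs)
      val nw = newSlice(prev, cur, batchMs)
      println(s"window=$windowMs slide=$slideMs old=$old (${old.batchCount(batchMs)} batches) " +
        s"new=$nw (${nw.batchCount(batchMs)} batches)")
    }
  }
}
```

With window = 2 s and slide = 8 s, the old slice comes out as [1498383077000, 1498383084000], the same range as the "Slicing from ... to ..." log line, and it spans 8 batches. With the values swapped (window = 8 s, slide = 2 s), the same formulas give old and new slices of 2 batches each, which matches the overlapping-window picture in the source comment.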