Hi all,

Let me add more information about this issue.
The log showed:
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
The slice time range is wrong.
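The window and slice bounds in these log lines can be replayed with plain millisecond arithmetic. This is a minimal sketch, assuming the durations from the log (window 2000 ms, slide 8000 ms, parent slide 1000 ms) and modelling Spark's `Time`/`Interval` as a hypothetical `Span` of two `Long` values; `WindowMath` and its methods are illustrative helpers, not Spark API.

```scala
// A simplified model of the interval arithmetic in ReducedWindowedDStream.compute,
// using plain Long milliseconds instead of Spark's Time/Interval classes.
object WindowMath {
  // Hypothetical stand-in for Spark's Interval: closed range [begin, end] in ms.
  final case class Span(begin: Long, end: Long)

  // currentWindow = [validTime - windowDuration + parentSlide, validTime]
  def currentWindow(validTime: Long, window: Long, parentSlide: Long): Span =
    Span(validTime - window + parentSlide, validTime)

  // previousWindow = currentWindow - slideDuration (both ends shifted back)
  def previousWindow(current: Span, slide: Long): Span =
    Span(current.begin - slide, current.end - slide)

  // Bounds passed to slice() for the "old RDDs":
  // previous begin .. current begin - parentSlide
  def oldSliceBounds(previous: Span, current: Span, parentSlide: Long): Span =
    Span(previous.begin, current.begin - parentSlide)

  // Bounds passed to slice() for the "new RDDs":
  // previous end + parentSlide .. current end
  def newSliceBounds(previous: Span, current: Span, parentSlide: Long): Span =
    Span(previous.end + parentSlide, current.end)
}
```

With validTime = 1498383086000, window = 2000, slide = 8000 and parentSlide = 1000, this gives a current window of [1498383085000, 1498383086000], a previous window of [1498383077000, 1498383078000] and old-slice bounds of [1498383077000, 1498383084000], matching the log lines above. The new-slice bounds come out as [1498383079000, 1498383086000].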


In my test code:
lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(s => { ... })   <=== here the windowDuration is 2 seconds and the slideDuration is 8 seconds.
=========== key log begin ===========
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)   <=== here the old RDDs are sliced from 1498383077000 ms to 1498383084000 ms, which spans 8 seconds. It should actually span 2 seconds.
=========== key log end ===========
=========== code in ReducedWindowedDStream.scala begin ===========
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
  val reduceF = reduceFunc
  val invReduceF = invReduceFunc

  val currentTime = validTime
  val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
    currentTime)
  val previousWindow = currentWindow - slideDuration

  logDebug("Window time = " + windowDuration)
  logDebug("Slide time = " + slideDuration)
  logDebug("Zero time = " + zeroTime)
  logDebug("Current window = " + currentWindow)
  logDebug("Previous window = " + previousWindow)

  //  _____________________________
  // |  previous window   _________|___________________
  // |___________________|       current window        |  --------------> Time
  //                     |_____________________________|
  //
  // |________ _________|          |________ _________|
  //          |                             |
  //          V                             V
  //       old RDDs                     new RDDs
  //

  // Get the RDDs of the reduced values in "old time steps"
  val oldRDDs =
    reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
    // <=== I think this line should be:
    // reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + windowDuration - parent.slideDuration)
  logDebug("# old RDDs = " + oldRDDs.size)

  // Get the RDDs of the reduced values in "new time steps"
  val newRDDs =
    reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    // <=== I think this line should be:
    // reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration, currentWindow.endTime)
  logDebug("# new RDDs = " + newRDDs.size)

  // ... (remainder of compute() not shown)
}
=========== code in ReducedWindowedDStream.scala end ===========
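To see how many batches each slice() call in this excerpt touches, the batch times can be listed directly. This is a minimal sketch, assuming the slice bounds are already aligned to the 1000 ms batch interval, that both bounds are inclusive, and that a batch time is valid only when it is strictly after zeroTime (as the "is invalid as zeroTime is" log line suggests); `SliceBatches` and its methods are hypothetical helpers, not Spark API.

```scala
// Enumerates the batch times a slice(from, to) call would visit, assuming the
// bounds are aligned to the batch interval and both ends are inclusive.
object SliceBatches {
  // Every batch time from fromMs to toMs (inclusive), stepping by batchMs.
  def batchTimes(fromMs: Long, toMs: Long, batchMs: Long): List[Long] =
    (fromMs to toMs by batchMs).toList

  // Keeps only the times strictly after zeroTime, mirroring the
  // "Time ... is invalid as zeroTime is ..." check seen in the log.
  def validBatchTimes(fromMs: Long, toMs: Long, batchMs: Long, zeroTime: Long): List[Long] =
    batchTimes(fromMs, toMs, batchMs).filter(_ > zeroTime)
}
```

With the bounds from the log, the old-RDD range [1498383077000, 1498383084000] yields 8 batch times, and the new-RDD range [1498383079000, 1498383086000] also yields 8. Applying the zeroTime check with zeroTime = 1498383078000 leaves 6 old batch times, 1498383079000 through 1498383084000.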



Thanks
Fei Shao
 
---Original---
From: <1427357...@qq.com>
Date: 2017/6/24 14:51:52
To: "user"<u...@spark.apache.org>;"dev"<dev@spark.apache.org>;
Subject: issue about the windows slice of stream


Hi all,
I found an issue with the window slicing of DStream.
My code is:


val ssc = new StreamingContext(conf, Seconds(1))

val content = ssc.socketTextStream("ip", port)
content.countByValueAndWindow(Seconds(2), Seconds(8)).foreach(println(xxxx))
The key point is that the slide duration is greater than the window duration.
I checked the output. The result printed by foreach(println(xxxx)) was wrong.
I found that the stream was sliced incorrectly.
Can I open a JIRA for this?
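With a 1-second batch, a 2-second window and an 8-second slide, each output covers only the last 2 batches, and consecutive windows do not overlap. This is a minimal sketch that lists the batches falling between windows, assuming windows end at times `firstValid + k * slide` and cover `[t - window + batch, t]`, as in the `currentWindow` formula of ReducedWindowedDStream.compute; `WindowCoverage` is a hypothetical helper, not Spark API.

```scala
// When the slide duration exceeds the window duration, consecutive windows
// do not overlap and some batches fall into no window at all.
object WindowCoverage {
  // Batch times inside the window that ends at validTime (inclusive on both ends).
  def windowBatches(validTime: Long, window: Long, batch: Long): List[Long] =
    ((validTime - window + batch) to validTime by batch).toList

  // Batches between the first and last of `count` consecutive windows
  // that are not covered by any of those windows.
  def uncoveredBatches(firstValid: Long, window: Long, slide: Long,
                       batch: Long, count: Int): List[Long] = {
    require(count > 0, "count must be positive")
    val validTimes = (0 until count).map(i => firstValid + i * slide)
    val covered = validTimes.flatMap(t => windowBatches(t, window, batch)).toSet
    val span = (validTimes.head - window + batch) to validTimes.last by batch
    span.filterNot(covered).toList
  }
}
```

For the windows ending at 1498383086000 and 1498383094000 (zeroTime 1498383078000 plus one and two slides), the batches 1498383087000 through 1498383092000 belong to neither window. Swapping the durations (8-second window, 2-second slide) leaves no uncovered batches.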


Thanks
Fei Shao
