Hi TD,
Thanks for the help.
The only problem left is that dstreamTime contains some extra information,
which seems to be a date, i.e. 1405944367000 ms, whereas my application
timestamps are just in seconds, which I converted to ms, e.g. 2300, 2400,
2500, etc. So the filter doesn't take effect.
I was thinking of adding that extra offset to my Time(4000), but I am not
really sure what it is.
import org.apache.spark.streaming.{Duration, Seconds, Time}

val keyAndValues = eegStreams.map(x => {
  val token = x.split(" ")
  // key: app timestamp converted from seconds to ms; value: the data point
  ((token(0).toDouble * 1000).toLong, token(1).toDouble)
})
val transformed =
  keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD, dstreamTime) => {
    // define the window over the timestamp that you want to process
    val currentAppTimeWindowStart = dstreamTime - Time(4000) // Time - Time yields a Duration
    val currentAppTimeWindowEnd = dstreamTime
    val filteredRDD = windowedRDD.filter(r => Duration(r._1) > currentAppTimeWindowStart &&
      Time(r._1) <= currentAppTimeWindowEnd)
    filteredRDD
  })
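In Spark Streaming the batch `Time` handed to `transform` is wall-clock time in milliseconds since the Unix epoch, which is why `dstreamTime` looks like 1405944367000 rather than a small value like 4000. One way to compare it with application timestamps that start near 0 is to subtract a reference start time first. The sketch below models only that arithmetic in plain Scala (no Spark). `AppTimeWindow`, `toAppTimeMs`, and `streamStartMs` are hypothetical names, and it assumes the application timestamps count from a known epoch-ms start point on the same clock as the batch time.

```scala
// Plain-Scala sketch of the time arithmetic inside transform (no Spark needed).
// Assumption (hypothetical): app timestamps, converted to ms, count from
// streamStartMs, an epoch-ms reference point on the same clock as the batch time.
object AppTimeWindow {
  // Shift an epoch-based batch time (ms) onto the application's time axis.
  def toAppTimeMs(batchTimeMs: Long, streamStartMs: Long): Long =
    batchTimeMs - streamStartMs

  // The app-time window (start, end] that ends at the current batch.
  def appWindow(batchTimeMs: Long, streamStartMs: Long, windowMs: Long): (Long, Long) = {
    val end = toAppTimeMs(batchTimeMs, streamStartMs)
    (end - windowMs, end)
  }

  // Keep only records whose app timestamp (the key) falls in (start, end].
  def filterRecords(records: Seq[(Long, Double)], win: (Long, Long)): Seq[(Long, Double)] =
    records.filter { case (ts, _) => ts > win._1 && ts <= win._2 }
}
```

Inside `transform`, `dstreamTime.milliseconds` would supply `batchTimeMs`. How `streamStartMs` is obtained is left open here; it has to line up with whatever point the application timestamps are measured from.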
The sample input is as follows:
AppTimestamp (s)  Datapoints
0 -145.934066
0.003906 0.19536
0.007812 0.19536
0.011719 0.19536
0.015625 0.19536
0.019531 0.976801
0.023438 0.586081
0.027344 -1.758242
0.03125 -1.367521
0.035156 2.930403
0.039062 4.102564
0.042969 3.711844
0.046875 2.148962
0.050781 -4.102564
0.054688 -1.758242
0.058594 3.711844
0.0625 9.181929
0.066406 11.135531
0.070312 4.884005
0.074219 0.976801
0.078125 4.493284
0.082031 11.135531
0.085938 12.698413
0.089844 15.824176
0.09375 21.684982
0.097656 22.466422
0.101562 18.949939
0.105469 14.652015
0.109375 11.135531
0.113281 1.758242
0.117188 -6.056166
0.121094 -0.976801
0.125 0.19536
0.128906 -6.837607
0.132812 -8.400488
0.136719 -14.261294
0.140625 -24.810745
0.144531 -25.592186
0.148438 -19.73138
0.152344 -18.559219
0.15625 -25.201465
Regards,
Laeeq
On Thursday, July 17, 2014 8:58 PM, Tathagata Das wrote:
You have to define the range of records that need to be filtered out in
every windowed RDD, right? For example, when DStream.window has data from
times 0-8 seconds by DStream time, you only want to filter out data that
falls into, say, 4-8 seconds by application time. The latter is the
application-level time window that you need to define in the transform
function. What may help is that there is another version of transform that
gives you the current DStream time (that is, it will give the value "8"),
from which you can calculate the app-time window 4-8.
val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(
  (windowedRDD: RDD[...], dstreamTime: Time) => {
    // define the window over the timestamp that you want to process
    val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize
    val currentAppTimeWindowEnd = dstreamTime
    // filter and retain only the records that fall in the current app-time window
    val filteredRDD = windowedRDD.filter(r => Time(r._1) <= currentAppTimeWindowEnd &&
      Time(r._1) > currentAppTimeWindowStart)
    filteredRDD // last expression is the lambda's result; no `return` needed
  })
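The (start, end] bounds in the filter above are what keep consecutive slides from double-counting: with window(Seconds(8), Seconds(4)) each record sits in two overlapping DStream windows, but it falls into only one 4-second app-time slice. A plain-Scala model of that follows. `AppWindowFilter` and its methods are hypothetical; it assumes the keys are app timestamps in ms already on the same axis as the batch end times, and that every record arrives within its DStream window, so filtering the full record set gives the same result as filtering the 8-second windowed RDD.

```scala
// Plain-Scala model of window(Seconds(8), Seconds(4)) plus the app-time filter.
// Assumption: keys are app timestamps in ms on the same axis as the batch end times.
object AppWindowFilter {
  // Keep records whose key lies in the half-open interval (endMs - sizeMs, endMs].
  def filterWindow(records: Seq[(Long, Double)], endMs: Long, sizeMs: Long): Seq[(Long, Double)] =
    records.filter { case (ts, _) => ts > endMs - sizeMs && ts <= endMs }

  // Apply the filter at each batch end time and return the kept timestamps per batch.
  def perBatch(records: Seq[(Long, Double)], ends: Seq[Long], sizeMs: Long): Seq[Seq[Long]] =
    ends.map(end => filterWindow(records, end, sizeMs).map(_._1))
}
```

With batch ends at 4000, 8000 and 12000 ms and a 4000 ms slice, keys 1000 and 4000 land in the first slice, 4001, 7999 and 8000 in the second, and 11000 in the third; no key appears twice.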
Hope this helps!
TD
On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed wrote:
Hi TD,
>
>
>I have been able to filter the first windowed RDD, but I am not sure how to
>make a generic filter. The larger window is 8 seconds and I want to fetch 4
>seconds based on the application timestamp. I have seen an earlier post that
>suggests a timeStampBasedWindow, but I am not sure how to make a
>timeStampBasedWindow in the following example.
>
> val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
>   // val timeStampBasedWindow = ??? // define the window over the timestamp that you want to process
>   // filter and retain only the records that fall in the timestamp-based window
>   val filteredRDD = windowedRDD.filter(_._1 < 4)
>   filteredRDD
> })
>
>Consider input tuples such as (1,23), (1.2,34), ..., (3.8,54), (4,413), ...,
>where the key is the timestamp.
>
>Regards,
>Laeeq
>
>On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed wrote:
>
>Hi,
>Thanks, I will try to implement it.
>
>
>Regards,
>Laeeq
>
> On Saturday, July 12, 2014 4:37 AM, Tathagata Das wrote:
>
>This is not in the current streaming API.
>
>
>Queue stream is useful for testing with generated RDDs, but not for actual
>data. For an actual data stream, slack time can be implemented by applying
>DStream.window with a larger window that takes the slack time into account,
>and then filtering out the required application-time-based window of data.
>For example, if you want a slack time of 1 minute and batches of 10 seconds,
>do a window operation of 70 seconds, then in each RDD filter out the records
>with the desired application time and process them.
>
>
>TD
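One reading of the 70-second example: at batch time t, window(70 s) holds every record that arrived in (t - 70 s, t], and the filter keeps the 10-second application-time slice (t - 70 s, t - 60 s], so a record delayed by up to 60 s is still in the window when its slice is processed. The plain-Scala model below checks that arithmetic. `SlackWindow`, `Rec`, and the choice of slice are assumptions for illustration, not a Spark API; it also assumes arrival time is never earlier than the app timestamp and that both are on the same clock.

```scala
// Plain-Scala model of slack time: window(70 s) with 10 s batches, then keep
// the 10 s application-time slice that ended `slackMs` before the batch time.
// Assumptions (hypothetical model): each record carries an app timestamp and an
// arrival time, both in ms on the same clock, and arrivalMs >= appMs.
object SlackWindow {
  final case class Rec(appMs: Long, arrivalMs: Long)

  val batchMs = 10000L
  val slackMs = 60000L
  val windowMs = slackMs + batchMs // 70 s, as in the example

  // What a 70 s DStream window would hold at batch time t: selected by arrival time.
  def dstreamWindow(all: Seq[Rec], t: Long): Seq[Rec] =
    all.filter(r => r.arrivalMs > t - windowMs && r.arrivalMs <= t)

  // The application-time slice processed at batch time t: (t - 70 s, t - 60 s].
  def appSlice(all: Seq[Rec], t: Long): Seq[Rec] =
    dstreamWindow(all, t).filter(r => r.appMs > t - windowMs && r.appMs <= t - slackMs)
}
```

A record whose delay exceeds the slack (for example, 70 s late) has not arrived by the time its slice is processed and is dropped from that batch, which is the trade-off the slack size controls.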
>
>On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed wrote:
>
>Hi,
>>
>>
>>In the Spark Streaming paper, "slack time" is suggested for delaying
>>batch creation in the case of external timestamps. I don't see any such
>>option in StreamingContext. Is it available in the API?
>>
>>Also, going through previous posts, I saw that queueStream has been suggested
>>for this. I looked into the queueStream example:
>>
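>> import scala.collection.mutable.Queue
>> import org.apache.spark.rdd.RDD
>> // Create the queue through which RDDs can be pushed to a QueueInputDStream
>> val rddQueue = new Queue[RDD[Int]]()
>> // Create and push some RDDs into the queue
>> for (i <- 1 to 30) {
>>   rddQueue += ssc.sparkContext.makeRDD(1 to 10)
>>   Thread.sleep(1000)
>> }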
>>
>>The only thing I am unsure about is how to make batches (basic RDDs) out of
>>a stream coming in on a port.
>>
>>
>>Regards,
>>Laeeq
>>