Hi Bill,

Hope the following is what you need.

val zerotime = System.currentTimeMillis()


Then, in foreachRDD, do the following:

// difference = RDDtimeparameter - zerotime   // run once, on the first batch, only to find the constant used later
val starttime = (RDDtimeparameter - (zerotime + difference)) - intervalsize
val endtime   = RDDtimeparameter - (zerotime + difference)


Here zerotime is the time when streaming starts. Find the difference
(RDDtimeparameter - zerotime) for the first batch and use it as a constant
(in my case it is 5000 ms). In the end, I would say it is better to work with
Spark timestamps rather than application timestamps, because application
timestamps get very messy. If you find a better solution, please let me know.
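For reference, here is a minimal Scala sketch of the offset bookkeeping above, assuming all times are epoch milliseconds (for example time.milliseconds inside foreachRDD). The names IntervalClock and onBatch are hypothetical, not Spark APIs: the first call records the constant difference, and every later call reuses it.

```scala
// Hypothetical helper (not a Spark API): keeps the "difference" constant and
// derives starttime/endtime for each batch, all in epoch milliseconds.
final class IntervalClock(zeroTimeMs: Long, intervalSizeMs: Long) {
  // Offset (first batch time - zerotime); recorded once, then reused.
  private var difference: Option[Long] = None

  /** Returns (starttime, endtime) for the batch with the given time. */
  def onBatch(batchTimeMs: Long): (Long, Long) = {
    val diff = difference.getOrElse {
      val d = batchTimeMs - zeroTimeMs // e.g. 5000 ms in my setup
      difference = Some(d)
      d
    }
    val endTime = batchTimeMs - (zeroTimeMs + diff)
    (endTime - intervalSizeMs, endTime)
  }
}
```

The values come out relative to the first batch: endtime is 0 on the first batch and grows by the batch duration after that. Since the function passed to foreachRDD runs on the driver, one instance created next to zerotime could be called as clock.onBatch(time.milliseconds) inside foreachRDD { (rdd, time) => ... }.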

Regards,
Laeeq



On Friday, July 18, 2014 7:21 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi Tathagata,

On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

>The RDD parameter in foreachRDD contains raw/transformed data from the last
>batch. So when foreachRDD is called with the time parameter as 5:02:01 and
>batch size is 1 minute, then the RDD will contain data based on the data
>received between 5:02:00 and 5:02:01.
Do you mean the data between 5:01:01 and 5:02:01? The time parameter is
5:02:01. Moreover, while the program is running, it is very hard to specify
a starting time, because it is often difficult to know exactly when the
program executes that line. Also, do we need a different time parameter for
each foreachRDD call, or does Spark calculate the next one according to the
batch interval?

>
>If you want to do custom intervals, then I suggest the following:
>1. Use 1-second batch intervals.
>2. Then, in foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an
>ArrayBuffer/ListBuffer.
>3. At 5:03:29, add the RDD to the buffer, do a union of all the buffered
>RDDs, and process them.
>
>
>So in foreachRDD, based on the time, buffer the RDDs until you reach the
>appropriate time. Then union all the buffered RDDs and process them.
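The buffering recipe could look roughly like the following Scala sketch. It is a hypothetical, Spark-free helper: B stands for one batch's payload (an RDD[T] in a real job), all times are epoch milliseconds, and the 1-second batch with a 60-second window shifted by 30 seconds is an assumption chosen to match the 5:02:30 to 5:03:29 windows above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper (not a Spark API). `process` receives the window start
// (ms) and every batch that fell into that window, oldest first.
final class WindowBuffer[B](batchMs: Long, intervalMs: Long, offsetMs: Long)(
    process: (Long, List[B]) => Unit) {

  private val buffered = ListBuffer.empty[B]

  /** Call once per batch with the batch time (ms) and that batch's data. */
  def add(batchTimeMs: Long, batch: B): Unit = {
    buffered += batch
    // This batch is the last of its window when the NEXT batch time falls on
    // a window boundary (e.g. 5:03:29 + 1 s = 5:03:30).
    if (Math.floorMod(batchTimeMs + batchMs - offsetMs, intervalMs) == 0L) {
      val windowEnd = batchTimeMs + batchMs
      process(windowEnd - intervalMs, buffered.toList)
      buffered.clear()
    }
  }
}
```

In a real job, add would presumably be called from foreachRDD { (rdd, time) => buffer.add(time.milliseconds, rdd) }, and process could union the buffered RDDs with rdds.head.sparkContext.union(rdds). Spark may clean up a batch's data once that batch is done, so holding RDDs across batches may also need a longer StreamingContext.remember duration or explicit caching.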
>
>
>TD
>
>
>
>On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>
>>Hi Tathagata,
>>
>>Thanks for your answer. Please see my further question below:
>>
>>On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>>Answers inline.
>>>
>>>On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>
>>>>Hi all,
>>>>
>>>>I am currently using Spark Streaming to conduct real-time data analytics.
>>>>We receive data from Kafka. We want to generate output files containing
>>>>results based on the data we receive in a specific time interval.
>>>>
>>>>
>>>>I have several questions about Spark Streaming's timestamps:
>>>>
>>>>1) If I use saveAsTextFiles, it seems Spark Streaming will generate files
>>>>at complete seconds, such as 5:00:01, 5:00:02 (converted from Unix time),
>>>>etc. Does this mean the results are based on the data from 5:00:01 to
>>>>5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the
>>>>time the files are generated?
>>>>
>>>>
>>>The file named 5:00:01 contains results from data received between 5:00:00
>>>and 5:00:01 (based on the system time of the cluster).
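To make the naming concrete, here is a small Scala sketch, assuming (as stated in the answer above) that a batch's time marks the end of the interval it covers. saveAsTextFiles names each batch's output "prefix-TIME_IN_MS[.suffix]", so the number in the name is that batch time in epoch milliseconds. The object and method names are hypothetical, and the times are rendered in UTC only for illustration; the cluster's own time zone may differ.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helpers for reading batch times (not Spark APIs).
object BatchFiles {
  // Same shape as the documented saveAsTextFiles naming: prefix-TIME_IN_MS[.suffix]
  def fileName(prefix: String, suffix: String, batchTimeMs: Long): String =
    if (suffix == null || suffix.isEmpty) s"$prefix-$batchTimeMs"
    else s"$prefix-$batchTimeMs.$suffix"

  // Assumes the batch time is the END of the interval the batch covers.
  def interval(batchTimeMs: Long, batchDurationMs: Long): (Long, Long) =
    (batchTimeMs - batchDurationMs, batchTimeMs)

  private val clock = DateTimeFormatter.ofPattern("H:mm:ss").withZone(ZoneOffset.UTC)

  /** Human-readable "start to end" for a batch, in UTC. */
  def describe(batchTimeMs: Long, batchDurationMs: Long): String = {
    val (start, end) = interval(batchTimeMs, batchDurationMs)
    s"${clock.format(Instant.ofEpochMilli(start))} to ${clock.format(Instant.ofEpochMilli(end))}"
  }
}
```

For example, with 1-second batches, BatchFiles.describe(18001000L, 1000L) gives "5:00:00 to 5:00:01", the interval behind a file stamped 5:00:01 (UTC).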
>>>
>>>
>>> 
>>>>2) If I do not use saveAsTextFiles, how do I get the exact time interval of
>>>>the RDD when I use foreachRDD to do custom output of the results?
>>>>
>>>>
>>>There is a version of foreachRDD that lets you specify a function that
>>>takes in a Time object.
>>> 
>>>>3) How can we specify the starting time of the batches?
>>> 
>>>What do you mean? Batches are timed based on the system time of the cluster. 
>>I would like to control the starting time and ending time of each batch. For
>>example, if I use saveAsTextFiles as the output method and the batch size is
>>1 minute, Spark aligns the time intervals to complete minutes, such as
>>5:01:00, 5:02:00, 5:03:00. It does not produce results for 5:01:03,
>>5:02:03, 5:03:03, etc. My goal is to generate output for customized
>>intervals, such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.
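The arithmetic for such shifted intervals is small. Below is a hypothetical Scala sketch, assuming epoch-millisecond times and a window length and offset of your choosing; a 60 s window shifted by 30 s reproduces 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, and so on. windowOf is an illustrative name, not a Spark API.

```scala
// Hypothetical helper (not a Spark API): which shifted window contains timeMs?
object CustomWindow {
  /** Start (inclusive) and end (exclusive), in ms, of the window containing timeMs. */
  def windowOf(timeMs: Long, intervalMs: Long, offsetMs: Long): (Long, Long) = {
    // floorDiv keeps the result correct even for times before the offset.
    val start = Math.floorDiv(timeMs - offsetMs, intervalMs) * intervalMs + offsetMs
    (start, start + intervalMs)
  }
}
```

The window start could then label the output (for example in a file name) instead of the batch time Spark assigns.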
>>
>>
>>I checked the API of foreachRDD with the time parameter. There seems to be no
>>explanation of what that parameter means. Does it mean the starting time
>>of the first batch?
>> 
>>>
>>>>
>>>>Thanks!
>>>>
>>>>
>>>>Bill
>>>
>>
>
