Hi Bill,

Hope the following is what you need.

    val zerotime = System.currentTimeMillis()

Then, in foreachRDD, do the following:

    // difference = RDDtimeparameter - zerotime
    // (measured once, on the first batch, only to find the constant used below)
    val starttime = (RDDtimeparameter - (zerotime + difference)) - intervalsize
    val endtime = RDDtimeparameter - (zerotime + difference)

Here zerotime is the time when streaming starts, and RDDtimeparameter is the
batch time passed to foreachRDD, in milliseconds. Find out the difference
(RDDtimeparameter - zerotime) for the first batch and use it as a constant
(in my case it is 5000 ms).

In the end, I would say it is better to work with Spark timestamps rather
than application timestamps, as the latter becomes very messy.

If you find a better solution, please let me know as well.

Regards,
Laeeq


On Friday, July 18, 2014 7:21 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

> Hi Tathagata,
>
> On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> The RDD parameter in foreachRDD contains raw/transformed data from the last
>> batch. So when foreachRDD is called with the time parameter as 5:02:01 and
>> the batch size is 1 minute, then the rdd will contain data based on the
>> data received between 5:02:00 and 5:02:01.
>
> Do you mean the data between 5:01:01 and 5:02:01? The time parameter is
> 5:02:01. Moreover, when the program is running, it is very difficult to
> specify a starting time, because it is often hard to know when the program
> executes that line. And do we need a different time parameter for each
> foreachRDD, or will Spark calculate the next one according to the batch
> interval?
>
>> If you want to do custom intervals, then I suggest the following:
>> 1. Do 1 second batch intervals.
>> 2. Then in the foreachRDD, from 5:02:30 to 5:03:28, put all the RDDs in an
>>    ArrayBuffer/ListBuffer.
>> 3. At 5:03:29, add the RDD to the buffer, do a union of all the buffered
>>    RDDs, and process them.
>>
>> So in foreachRDD, based on the time, buffer the RDDs until you reach the
>> appropriate time. Then union all the buffered RDDs and process them.
>>
>> TD
>>
>> On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for your answer. Please see my further question below:
>>>
>>> On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Answers inline.
>>>>
>>>> On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am currently using Spark Streaming to conduct real-time data analytics.
>>>>> We receive data from Kafka. We want to generate output files that
>>>>> contain results based on the data we receive during a specific time
>>>>> interval.
>>>>>
>>>>> I have several questions on Spark Streaming's timestamps:
>>>>>
>>>>> 1) If I use saveAsTextFiles, it seems Spark Streaming will generate
>>>>> files at whole seconds, such as 5:00:01, 5:00:02 (converted from Unix
>>>>> time), etc. Does this mean the results are based on the data from
>>>>> 5:00:01 to 5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just
>>>>> mean the time the files are generated?
>>>>
>>>> The file named 5:00:01 contains results from data received between
>>>> 5:00:00 and 5:00:01 (based on the system time of the cluster).
>>>>
>>>>> 2) If I do not use saveAsTextFiles, how do I get the exact time interval
>>>>> of the RDD when I use foreachRDD to do custom output of the results?
>>>>
>>>> There is a version of foreachRDD which allows you to specify a function
>>>> that takes in a Time object.
>>>>
>>>>> 3) How can we specify the starting time of the batches?
>>>>
>>>> What do you mean? Batches are timed based on the system time of the
>>>> cluster.
>>>
>>> I would like to control the starting time and ending time of each batch.
>>> For example, if I use saveAsTextFiles as the output method and the batch
>>> size is 1 minute, Spark will align time intervals to complete minutes,
>>> such as 5:01:00, 5:02:00, 5:03:00. It will not have results such as
>>> 5:01:03, 5:02:03, 5:03:03, etc.
>>> My goal is to generate output for customized intervals such as
>>> 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.
>>>
>>> I checked the API of foreachRDD with the time parameter. It seems there is
>>> no explanation of what that parameter means. Does it mean the starting
>>> time of the first batch?
>>>
>>>>> Thanks!
>>>>>
>>>>> Bill
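TD's buffering suggestion can be sketched in Scala roughly as follows. This is a minimal sketch under stated assumptions, not code from the thread: `IntervalBuffer` and its parameters are hypothetical names, and it assumes 1-second batches whose foreachRDD times fall on whole seconds (as the saveAsTextFiles file names discussed in the thread suggest). Each batch's item is buffered, and the whole group is returned when the batch stamped with the last second of a custom interval arrives (5:03:29 in TD's example), which gives the 5:02:30 to 5:03:29 style intervals Bill asked for.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper (not part of Spark): buffers one item per short batch and
// releases the buffered group when the batch closing a custom interval arrives.
//   intervalMs: length of the custom interval, e.g. 60000 for one minute
//   offsetMs:   where intervals start, e.g. 30000 for hh:mm:30 .. hh:(mm+1):29
//   batchMs:    the streaming batch interval, e.g. 1000 for 1-second batches
final class IntervalBuffer[T](intervalMs: Long, offsetMs: Long, batchMs: Long) {
  private val buffer = ListBuffer.empty[T]

  // batchTimeMs is the batch time handed to foreachRDD, in epoch milliseconds.
  // Returns Some(group) when this batch is the last one of a custom interval
  // (e.g. 5:03:29 for the interval 5:02:30 .. 5:03:29), otherwise None.
  // If streaming starts mid-interval, the first group is only partial.
  def add(batchTimeMs: Long, item: T): Option[List[T]] = {
    buffer += item
    val nextBatchMs = batchTimeMs + batchMs
    if (Math.floorMod(nextBatchMs - offsetMs, intervalMs) == 0L) {
      val group = buffer.toList
      buffer.clear()
      Some(group)
    } else {
      None
    }
  }
}

// Possible wiring into Spark Streaming (requires spark-streaming on the
// classpath; `lines` is an assumed DStream[String]):
//
//   val buf = new IntervalBuffer[RDD[String]](60000L, 30000L, 1000L)
//   lines.foreachRDD { (rdd, time) =>
//     buf.add(time.milliseconds, rdd).foreach { rdds =>
//       val merged = rdd.sparkContext.union(rdds)
//       // process `merged`, built from all batches of one custom interval
//     }
//   }
```

The function given to foreachRDD runs on the driver, so mutating a plain `ListBuffer` there should be fine. Because RDDs are held across up to a minute of batches, you may also need something like `ssc.remember(...)` so that Spark Streaming does not clean up their data before the union runs; check this against your Spark version.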
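The meaning of the foreachRDD time parameter, and Laeeq's constant, can be made concrete with another small Scala sketch. The names `BatchTimes`, `interval` and `Relative` are hypothetical. Following TD's answers in the thread, it assumes the batch stamped T holds the data received during the batch interval that ends at T, so T marks the end of that batch, not the start of the first one. It also uses the observation that, since difference is measured on the first batch, zerotime + difference equals the first batch's time, so Laeeq's offsets can be computed from Spark's timestamps alone, without a hand-measured constant.

```scala
// Sketch: derive time ranges from the Time that Spark passes to foreachRDD,
// instead of from System.currentTimeMillis() on the driver.
// Assumption (per TD's answers): the batch stamped batchTimeMs holds data
// received during the batchMs milliseconds that end at batchTimeMs.
object BatchTimes {
  // Absolute (start, end) of the batch stamped batchTimeMs, in epoch ms.
  def interval(batchTimeMs: Long, batchMs: Long): (Long, Long) =
    (batchTimeMs - batchMs, batchTimeMs)

  // Laeeq's formula with the constant removed: zerotime + difference is just
  // the first batch's time, so remember that time instead of measuring it.
  final class Relative(intervalMs: Long) {
    private var firstBatchMs: Option[Long] = None

    // Returns (starttime, endtime) relative to the first batch, matching
    // endtime = time - firstBatchTime and starttime = endtime - intervalsize.
    def apply(batchTimeMs: Long): (Long, Long) = {
      val first = firstBatchMs.getOrElse {
        firstBatchMs = Some(batchTimeMs)
        batchTimeMs
      }
      val end = batchTimeMs - first
      (end - intervalMs, end)
    }
  }
}

// Possible use (spark-streaming on the classpath; `lines` is an assumed
// DStream[String] with a 60-second batch interval):
//
//   val rel = new BatchTimes.Relative(60000L)
//   lines.foreachRDD { (rdd, time) =>
//     val (start, end) = BatchTimes.interval(time.milliseconds, 60000L)
//     val (relStart, relEnd) = rel(time.milliseconds)
//   }
```

With `Relative`, the first batch yields (-intervalsize, 0), exactly as Laeeq's formula does, and each later batch moves both values forward by the batch interval.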