Hi Toni. Given that there is more than one measure per (user, hour), which measure do you want to keep? The sum? The mean? The most recent one? For the sum or the mean you don't need to care about the timing at all. If you want the most recent, you can include the timestamp in the value of the reduction and use it to decide which measure to keep.
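For instance, something along these lines (a rough, untested sketch; I'm reusing your Measurement class, and truncateToHour / latestPerDeviceAndHour are just names I made up for the example):

    import java.util.Date
    import org.apache.spark.streaming.dstream.DStream
    // (on Spark < 1.3 you also need: import org.apache.spark.streaming.StreamingContext._)

    object LatestPerHour {
      case class Measurement(deviceId: Long, timestamp: Date, measurement: Double)

      // Truncate a timestamp to the start of its hour (epoch-millis arithmetic).
      def truncateToHour(ts: Date): Date =
        new Date(ts.getTime - (ts.getTime % (60L * 60L * 1000L)))

      // Keep only the most recent measurement per (deviceId, hour).
      def latestPerDeviceAndHour(
          measurements: DStream[Measurement]): DStream[((Long, Date), (Date, Double))] =
        measurements
          .map(m => ((m.deviceId, truncateToHour(m.timestamp)), (m.timestamp, m.measurement)))
          // Keep whichever value carries the later event timestamp.
          .reduceByKey { (a, b) => if (a._1.after(b._1)) a else b }
    }

For the sum you would simply drop the timestamp from the value and reduceByKey(_ + _) over the measurements.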
On the other hand: given the same userId, do you expect the measures to arrive reasonably ordered? If that's the case, you may reduce using only userId as the key and let your reduction function decide when to drop the last hour and publish the next one, maybe using a Map[hour, (timestamp, measure)] as the reduction argument. The expected result of that reduction would be a Map with the significant measure for each hour seen in that DStream.

For example, given a DStream with

    { userId = 1, ts = 8:30, measure = X }
    { userId = 1, ts = 8:45, measure = Y }
    { userId = 1, ts = 9:10, measure = Z }

before the reduction you have

    (1, Map(8:00 -> (8:30, X)))
    (1, Map(8:00 -> (8:45, Y)))
    (1, Map(9:00 -> (9:10, Z)))

and after the reduction you can produce (assuming you want to keep the most recent)

    (1, Map(8:00 -> (8:45, Y), 9:00 -> (9:10, Z)))

In a further step you can decide to dump the 8:00 measure to the database, because that DStream already contains a measure happening at 9:00. This way you keep a structure that uses almost constant memory per userId, because in the worst case you will have two hours in the map. A rough sketch of such a reduction is included after the quoted message below.

Regards
Nacho

2015-04-28 19:38 GMT+02:00 Toni Cebrián <tcebr...@enerbyte.com>:

> Hi,
>
> I'm just new to Spark and in need of some help framing the problem I
> have. "A problem well stated is half solved", as the saying goes :)
>
> Let's say that I have a DStream[String] basically containing JSON of some
> measurements from IoT devices. To keep it simple, say that after
> unmarshalling I have data like:
>
>   case class Measurement(deviceId: Long, timestamp: Date, measurement: Double)
>
> I need to use DStreams because there is some interest in monitoring the
> measurements of the devices in real time. So let's say that I have a
> dashboard with hourly granularity: past hours are consolidated, but the
> current hour must be kept updated on every input.
>
> The problem is that the time that matters is the timestamp in the JSON, not
> the time at which Spark receives the event, so I think I have to keep a
> stateful DStream like the one described in
> http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
> . I have two questions here:
>
>   1. Once a given hour is gone, I'd like to flush the consolidated stream
>      into a DB. I think the strategy is to have a stream of key-values where
>      the key is (userID, truncateByHour(timestamp)) and to reduce over the
>      values. But it seems to me that with this approach Spark has lost any
>      sense of time: how would you flush all the RDDs with timestamps between
>      00:00:00 and 00:59:59, for instance? Maybe I'm missing some function in
>      the API.
>   2. How do you deal with events that arrive with timestamps in the past?
>      Is it a matter of ignoring them, of finding a trade-off between memory
>      and how far back the stateful DStream reaches? But then, who pops the
>      mature time slices from the stateful stream?
>
> For me Spark Streaming would be the most natural way to face this problem,
> but maybe a simple Spark job run every minute could more easily keep the
> external events sorted by time.
>
> I'd like to hear your thoughts.
> Toni
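In case it helps, here is the rough, untested sketch of that per-userId reduction I mentioned above (HourlyReduction, mergeKeepLatest and splitMature are just placeholder names; hours are represented as hour-truncated Dates):

    import java.util.Date

    object HourlyReduction {
      // hour (truncated to its start) -> (event timestamp, measure)
      type HourMap = Map[Date, (Date, Double)]

      // Merge two hour maps, keeping the most recent measure for each hour.
      def mergeKeepLatest(a: HourMap, b: HourMap): HourMap =
        (a.keySet ++ b.keySet).map { hour =>
          val merged = (a.get(hour), b.get(hour)) match {
            case (Some(x), Some(y)) => if (x._1.after(y._1)) x else y
            case (Some(x), None)    => x
            case (None, Some(y))    => y
            case (None, None)       => sys.error("unreachable")
          }
          hour -> merged
        }.toMap

      // Split a user's map into (hours old enough to flush to the DB,
      // hours to keep). An hour is mature once a later hour has been seen.
      def splitMature(m: HourMap): (HourMap, HourMap) =
        if (m.isEmpty) (Map.empty, Map.empty)
        else {
          val latestHour = m.keys.maxBy(_.getTime)
          m.partition { case (hour, _) => hour.before(latestHour) }
        }
    }

Each batch can then be mapped to (userId, one-entry HourMap) pairs, as in the example above, reduced with mergeKeepLatest, and the mature half of each resulting map flushed to the database.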