Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over non-overlapping 24-hour windows of event data (windowed by activity_timestamp). So the output would look like this:
Train  Dest  Window(activity_timestamp)     max(Time)
1      HK    Aug28-00:00 to Aug29-00:00     10:00      <- still updating, as events with activity_timestamp in this range keep arriving through Aug29
1      HK    Aug27-00:00 to Aug28-00:00     09:00      <- not updating, as no new events arrive with activity_timestamp in this range

The drawback of this approach is that as soon as Aug28 starts, you have to wait for a new event about a train before you get a new max(time) for it. You may instead want a rolling 24-hour period, that is, the max time over the events seen in the last 24 hours. In that case, maintaining your own custom state using mapGroupsWithState/flatMapGroupsWithState is the most flexible option. It is available in Spark 2.2 in Scala and Java. Here is an example that tracks sessions based on events:

Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

You will have to create a custom per-train state which keeps track of the last 24 hours of that train's history, and use that state to calculate the max time for each train (a fuller sketch is appended below the quoted thread):

def updateHistoryAndGetMax(
    train: Int,
    events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  // for every event, update the history (i.e. the last 24 hours of events)
  // and return the max time from that history
}

trainTimesDataset              // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax _)

Hope this helps.

On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact answer, since it also groups by destination. I would say the easiest
> solution would be to use flatMapGroupsWithState, where you:
>
>   .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days of
>> max, then you should set a watermark on the timestamp column:
>>
>>   trainTimesDataset
>>     .withWatermark("activity_timestamp", "5 days")
>>     .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
>>     .max("time")
>>
>> Any counts that are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming query, and in my case
>>> I need to hold state for 24 hours (which I forgot to mention in my
>>> previous email), can I do this?
>>>
>>>   trainTimesDataset
>>>     .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
>>>     .max("time")
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Say trainTimesDataset is a streaming Dataset with schema [train: Int,
>>>> dest: String, time: Timestamp].
>>>>
>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>
>>>> SQL: "select train, dest, max(time) from trainTimesView group by train, dest"
>>>>      // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>
>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am wondering what the easiest and most concise way is to express the
>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>> both imperative and declarative styles.
>>>>> I am just trying to select the rows that have the max timestamp for each
>>>>> train.
>>>>> Instead of doing some sort of nested query, as we normally would in a
>>>>> relational database, I am trying to see whether I can leverage both the
>>>>> imperative and the declarative style at the same time. If nested queries
>>>>> or joins are not required, I would like to see how this can be done. I am
>>>>> using Spark 2.1.1.
>>>>>
>>>>> Dataset:
>>>>>
>>>>> Train    Dest    Time
>>>>> 1        HK      10:00
>>>>> 1        SH      12:00
>>>>> 1        SZ      14:00
>>>>> 2        HK      13:00
>>>>> 2        SH      09:00
>>>>> 2        SZ      07:00
>>>>>
>>>>> The desired result should be:
>>>>>
>>>>> Train    Dest    Time
>>>>> 1        SZ      14:00
>>>>> 2        HK      13:00
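
For reference, here is a fuller, self-contained sketch of the rolling-24-hour
mapGroupsWithState approach described at the top of this mail. It is only an
illustration, not tested code from this thread: the TrainEvents, TrainHistory
and TrainMax case class definitions, the socket source and the comma-separated
input format are assumptions made for the sketch; only the
groupByKey(_.train) + mapGroupsWithState shape and the idea of keeping the
last 24 hours of events per train come from the discussion above.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

// Hypothetical event/state/output types; the thread only names TrainEvents
// and TrainHistory without defining them.
case class TrainEvents(train: Int, dest: String, time: Timestamp, activity_timestamp: Timestamp)
case class TrainHistory(events: Seq[TrainEvents])    // per-train state: events from the last 24 hours
case class TrainMax(train: Int, maxTime: Timestamp)  // emitted per train per trigger

object RollingTrainMax {

  val windowMs: Long = 24L * 60 * 60 * 1000  // 24 hours in milliseconds

  // For each train: merge the new events into the stored history, drop events
  // older than 24 hours relative to the newest activity_timestamp seen so far,
  // save the trimmed history back into the state, and return the max "time"
  // over the events that remain.
  def updateHistoryAndGetMax(
      train: Int,
      events: Iterator[TrainEvents],
      state: GroupState[TrainHistory]): TrainMax = {
    val history = state.getOption.map(_.events).getOrElse(Seq.empty) ++ events.toList
    val newest  = history.map(_.activity_timestamp.getTime).max
    val recent  = history.filter(_.activity_timestamp.getTime > newest - windowMs)
    state.update(TrainHistory(recent))
    TrainMax(train, recent.map(_.time).maxBy(_.getTime))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RollingTrainMax").getOrCreate()
    import spark.implicits._

    // Stand-in source: comma-separated lines "train,dest,time,activity_timestamp"
    // read from a socket. Any streaming source producing TrainEvents would do.
    val trainTimesDataset = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .map { line =>
        val Array(train, dest, time, ts) = line.split(",")
        TrainEvents(train.toInt, dest, Timestamp.valueOf(time), Timestamp.valueOf(ts))
      }

    val maxTimes = trainTimesDataset
      .groupByKey(_.train)
      .mapGroupsWithState(updateHistoryAndGetMax _)

    maxTimes.writeStream
      .outputMode("update")   // mapGroupsWithState requires update output mode
      .format("console")
      .start()
      .awaitTermination()
  }
}

Note that this state grows with the number of events per train in 24 hours; if
only the max row matters, the flatMapGroupsWithState variant Burak suggests in
the thread keeps just one row per train, as sketched next.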
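
And here is a similarly hedged sketch of that variant: keep only the row with
the maximum time per train in state, rather than a 24-hour history. It is a
fragment rather than a full program: it reuses the hypothetical TrainEvents
case class and trainTimesDataset from the sketch above and assumes
spark.implicits._ is in scope (e.g. pasted into spark-shell after those
definitions); the keepMaxRow name and the use of NoTimeout are likewise
assumptions, not code from this thread.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// For each train, compare the newly arrived rows with the previously stored
// max row and keep/emit whichever has the latest "time".
def keepMaxRow(
    train: Int,
    events: Iterator[TrainEvents],
    state: GroupState[TrainEvents]): Iterator[TrainEvents] = {
  val candidates = state.getOption.toList ++ events.toList
  val maxRow = candidates.maxBy(_.time.getTime)
  state.update(maxRow)
  Iterator(maxRow)
}

val maxRows = trainTimesDataset
  .groupByKey(_.train)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(keepMaxRow _)

// maxRows can then be written out in update mode, e.g.
// maxRows.writeStream.outputMode("update").format("console").start().awaitTermination()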