I think I understand *groupByKey*/*mapGroupsWithState* and I am still trying to wrap my head around *GroupState<S>*, so I have a naive question to ask about *GroupState<S>*.

If I were to represent a state that holds a history of events (say 24 hours), and the number of events can be large for a given 24-hour period, where do I store the state S? In an external store like Kafka, a database, or a distributed file system? I wonder if I can represent the state S using a Dataset that holds the history of events. GroupState also has .exists() and .get(); if I am not wrong, I should override these methods, right, so that comparisons and retrieval from the external store can work?

Thanks!
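A minimal sketch, assuming Spark 2.2's mapGroupsWithState API, of how GroupState<S> is typically used for a question like this: exists()/get()/update() are called on the state handle that Spark passes into the user function (they are not overridden), and S is just an encodable case class holding the last 24 hours of events. TrainEvent, TrainHistory, and the field names below are placeholders for illustration, not names from the actual application.

import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Placeholder types; the real event/state classes are application-specific.
case class TrainEvent(train: Int, dest: String, time: Timestamp)
case class TrainHistory(events: Seq[TrainEvent])

// Called once per train per trigger; `state` is the handle Spark provides.
def updateHistoryAndGetMax(
    train: Int,
    events: Iterator[TrainEvent],
    state: GroupState[TrainHistory]): (Int, Timestamp) = {
  val dayMs = 24L * 60 * 60 * 1000

  // Read the previous state (if any) via exists/get and append the new events.
  val previous = if (state.exists) state.get.events else Seq.empty[TrainEvent]
  val all = previous ++ events.toSeq

  // Keep only the last 24 hours relative to the newest event seen so far.
  val cutoff = all.map(_.time.getTime).max - dayMs
  val recent = all.filter(_.time.getTime >= cutoff)

  // Persist the trimmed history back into the streaming state.
  state.update(TrainHistory(recent))
  (train, recent.maxBy(_.time.getTime).time)
}

// Usage (needs spark.implicits._ in scope), given trainTimesDataset: Dataset[TrainEvent]:
// trainTimesDataset.groupByKey(_.train).mapGroupsWithState(updateHistoryAndGetMax _)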
On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> Thanks for the explanation and for the clear pseudo code and example!
>
> mapGroupsWithState is cool and looks very flexible, however I have a few concerns and questions. For example, say I store TrainHistory as a max heap from the Java Collections library and I keep adding to this heap for 24 hours; at some point I will run out of Java heap space, right? Do I need to store TrainHistory as a Dataset or DataFrame instead of as an in-memory max heap object from the Java Collections library?
>
> I also wonder which approach is more efficient for this particular problem: a *nested query* or *groupByKey*/*mapGroupsWithState*?
>
> Thanks!
>
> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over 24-hour (non-overlapping) windows of event data (timestamped by activity_timestamp). So the output would look like:
>>
>> Train  Dest  Window(activity_timestamp)   max(Time)
>> 1      HK    Aug28-00:00 to Aug29-00:00   10:00   <- updating currently through Aug29
>> 1      HK    Aug27-00:00 to Aug28-00:00   09:00   <- not updating, as no new updates are coming in with activity_timestamp in this range
>>
>> The drawback of this approach is that as soon as Aug28 starts, you have to wait for a new event about a train to get a new max(time). You may rather want a rolling 24-hour period, that is, the max time known over events in the last 24 hours. Then maintaining your own custom state using mapGroupsWithState/flatMapGroupsWithState() is the best and most flexible option. It is available in Spark 2.2 in Scala and Java.
>>
>> Here is an example that tracks sessions based on events:
>> Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>>
>> You will have to create a custom per-train state which keeps track of the last 24 hours of train history, and use that state to calculate the max time for each train.
>>
>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents], state: GroupState[TrainHistory]): Long = {
>>   // for every event, update history (i.e. last 24 hours of events) and return the max time from the history
>> }
>>
>> trainTimesDataset // Dataset[TrainEvents]
>>   .groupByKey(_.train)
>>   .mapGroupsWithState(updateHistoryAndGetMax)
>>
>> Hope this helps.
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return the exact answer, since it also groups by destination. I would say the easiest solution would be to use flatMapGroupsWithState, where you:
>>>
>>>   .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
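A sketch of the lighter-weight alternative described just above, assuming Spark 2.2's flatMapGroupsWithState signature: instead of holding 24 hours of events, the per-train state is just the single row with the maximum time seen so far. TrainEvent is a placeholder schema, and the Update output mode / NoTimeout choices are illustrative assumptions.

import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class TrainEvent(train: Int, dest: String, time: Timestamp) // placeholder schema

// State is just the best (max-time) row seen so far for this train.
def keepMaxRow(
    train: Int,
    events: Iterator[TrainEvent],
    state: GroupState[TrainEvent]): Iterator[TrainEvent] = {
  val candidates = events.toSeq ++ state.getOption
  val best = candidates.maxBy(_.time.getTime)
  state.update(best)
  Iterator(best) // emit the current max-time row for this train
}

def maxRowPerTrain(trainTimesDataset: Dataset[TrainEvent]): Dataset[TrainEvent] = {
  import trainTimesDataset.sparkSession.implicits._
  trainTimesDataset
    .groupByKey(_.train)
    .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(keepMaxRow _)
}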
>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Yes. And in that case, if you only care about the last few days of max, then you should set a watermark on the timestamp column.
>>>>
>>>> *trainTimesDataset*
>>>> *  .withWatermark("activity_timestamp", "5 days")*
>>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")*
>>>> *  .max("time")*
>>>>
>>>> Any counts which are more than 5 days old will be dropped from the streaming state.
>>>>
>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the response. Since this is a streaming query, in my case I need to hold state for 24 hours, which I forgot to mention in my previous email. Can I do this?
>>>>>
>>>>> *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time")*
>>>>>
>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Say *trainTimesDataset* is the streaming Dataset with schema *[train: Int, dest: String, time: Timestamp]*.
>>>>>>
>>>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>>>
>>>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by train, dest"*  // after calling *trainTimesDataset.createOrReplaceTempView("trainTimesView")*
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am wondering what is the easiest and most concise way to express the computation below in Spark Structured Streaming, given that it supports both imperative and declarative styles. I am just trying to select the rows that have the max timestamp for each train. Instead of doing some sort of nested query like we normally would in a relational database, I am trying to see if I can leverage both imperative and declarative styles at the same time. If nested queries or joins are not required, then I would like to see how this can be done. I am using Spark 2.1.1.
>>>>>>>
>>>>>>> Dataset:
>>>>>>>
>>>>>>> Train  Dest  Time
>>>>>>> 1      HK    10:00
>>>>>>> 1      SH    12:00
>>>>>>> 1      SZ    14:00
>>>>>>> 2      HK    13:00
>>>>>>> 2      SH    09:00
>>>>>>> 2      SZ    07:00
>>>>>>>
>>>>>>> The desired result should be:
>>>>>>>
>>>>>>> Train  Dest  Time
>>>>>>> 1      SZ    14:00
>>>>>>> 2      HK    13:00
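For reference, a hedged sketch wiring the watermark + 24-hour tumbling-window aggregation from the thread into a runnable streaming query. The socket source, the comma-separated line format, the extra activity_timestamp field, and the console sink are assumptions made purely for illustration.

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, window}

object TrainMaxTimes {
  // Assumed schema: activity_timestamp is the event time used for the watermark,
  // time is the value being maximised, matching the snippets in the thread.
  case class TrainEvent(train: Int, dest: String, activity_timestamp: Timestamp, time: Timestamp)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TrainMaxTimes").getOrCreate()
    import spark.implicits._

    // Assumed source; in practice this could be Kafka, files, etc.
    val trainTimesDataset = spark.readStream
      .format("socket").option("host", "localhost").option("port", "9999")
      .load()
      .as[String]
      .map { line =>
        val Array(train, dest, ts, t) = line.split(",")
        TrainEvent(train.toInt, dest, Timestamp.valueOf(ts), Timestamp.valueOf(t))
      }

    // Watermark + non-overlapping 24-hour windows, as suggested in the thread.
    val maxTimes = trainTimesDataset
      .withWatermark("activity_timestamp", "5 days")
      .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
      .agg(max($"time").as("maxTime"))

    maxTimes.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}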