I think I understand *groupByKey/**mapGroupsWithState *and I am still
trying to wrap my head around *GroupState<S>*. so, I believe I have a naive
questions to ask on *GroupState<S>*.

If I were to represent a state that has history of events (say 24 hours)
and say the number of events can be big for a given 24 hour period. where
do I store the state S? An external store like Kafka or a Database or a
Distributed File system ? I wonder if I can represent the state S using a
DataSet that represents the history of events? GroupState also has
.exists() and  .get() and if I am not wrong I should override these methods
right so comparisons and retrieval from external store can work?

Thanks!



On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> Thanks for the explanation and for the clear pseudo code and an example!
>
> mapGroupsWithState is cool and looks very flexible however I have few
> concerns and questions. For example
>
> Say I store TrainHistory as max heap from the Java Collections library and
> I keep adding to to this heap for 24 hours and at some point I will run out
> of Java heap space right? Do I need to store TrainHistory as a DataSet or
> DataFrame instead of in memory max heap object from Java Collections
> library?
>
> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
> which approach is more efficient to solve this particular problem ?
>
> Thanks!
>
>
>
>
>
> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Aah, I might have misinterpreted. The groupBy + window solution would
>> give the max time for each train over 24 hours (non-overlapping window) of
>> event data (timestamped by activity_timestamp). So the output would be
>> like.
>>
>> Train     Dest   Window(activity_timestamp)    max(Time)
>> 1         HK     Aug28-00:00 to Aug29-00:00    10:00    <- updating
>> currently through aug29
>> 1         HK    Aug27-00:00 to Aug28-00:00     09:00    <- not updating
>> as no new updates coming in with activity_timestamp in this range.
>>
>> The drawback of this approach is that as soon as Aug28 starts, you have
>> wait for new event about a train to get a new max(time). You may rather
>> want a rolling 24 hour period, that is, the max time known over events in
>> the last 24 hours.
>> Then maintaining our own custom state using mapGroupsWithState/
>> flatMapGroupsWithState() is the best and most flexible option.
>> It is available in Spark 2.2 in Scala, Java.
>>
>> Here is an example that tracks sessions based on events.
>> Scala - https://github.com/apache/spark/blob/master/examples/src/
>> main/scala/org/apache/spark/examples/sql/streaming/Structu
>> redSessionization.scala
>>
>> You will have to create a custom per-train state which keeps track of
>> last 24 hours of trains history, and use that state to calculate the max
>> time for each train.
>>
>>
>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>> state: GroupState[TrainHistory]): Long = {
>>     // for every event, update history (i.e. last 24 hours of events) and
>> return the max time from the history
>> }
>>
>> trainTimesDataset     // Dataset[TrainEvents]
>>   .groupByKey(_.train)
>>   .mapGroupsWithState(updateHistoryAndGetMax)
>>
>> Hope this helps.
>>
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return
>>> the exact solution, since it also groups by on destination. I would say the
>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>> .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
>>>
>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Yes. And in that case, if you just care about only the last few days of
>>>> max, then you should set watermark on the timestamp column.
>>>>
>>>>  *trainTimesDataset*
>>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>> "train", "dest")*
>>>> *  .max("time")*
>>>>
>>>> Any counts which are more than 5 days old will be dropped from the
>>>> streaming state.
>>>>
>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the response. Since this is a streaming based query and in
>>>>> my case I need to hold state for 24 hours which I forgot to mention in my
>>>>> previous email. can I do ?
>>>>>
>>>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>> "24 hours"), "train", "dest").max("time")*
>>>>>
>>>>>
>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>>>> Int, dest: String, time: Timestamp] *
>>>>>>
>>>>>>
>>>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>>>
>>>>>>
>>>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>>>> train, dest"*    // after calling
>>>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am wondering what is the easiest and concise way to express the
>>>>>>> computation below in Spark Structured streaming given that it supports 
>>>>>>> both
>>>>>>> imperative and declarative styles?
>>>>>>> I am just trying to select rows that has max timestamp for each
>>>>>>> train? Instead of doing some sort of nested queries like we normally do 
>>>>>>> in
>>>>>>> any relational database I am trying to see if I can leverage both
>>>>>>> imperative and declarative at the same time. If nested queries or join 
>>>>>>> are
>>>>>>> not required then I would like to see how this can be possible? I am 
>>>>>>> using
>>>>>>> spark 2.1.1.
>>>>>>>
>>>>>>> Dataset
>>>>>>>
>>>>>>> Train    Dest      Time1        HK        10:001        SH        
>>>>>>> 12:001        SZ        14:002        HK        13:002        SH        
>>>>>>> 09:002        SZ        07:00
>>>>>>>
>>>>>>> The desired result should be:
>>>>>>>
>>>>>>> Train    Dest      Time1        SZ        14:002        HK        13:00
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to