Hi Daniel,

I am thinking you could use groupByKey and mapGroupsWithState first to emit whatever updates ("updated state") you want, and then apply .groupBy(window) to the result. Will that work as expected?
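Roughly what I have in mind, sketched over plain Scala collections so it runs without Spark. The `Log`/`Update` shapes are assumptions based on your JSON, not your real schema; in the actual job, phase 1 would be `groupByKey(...).mapGroupsWithState(...)` and phase 2 a `groupBy(window(...)).agg(...)` over what it emits:

```scala
object TwoPhaseSketch {
  // One pre-rolled-up log line, as in the one-minute input (field names assumed).
  case class Log(account: String, verb: String, statusCode: Int,
                 minute: Int, eventCount: Long)
  // The "updated state" emitted per input event by the stateful phase.
  case class Update(account: String, verb: String, minute: Int,
                    totalErrors: Long, totalRequests: Long)

  def run(logs: Seq[Log]): Map[(String, String, Int), (Long, Long)] = {
    // Phase 1 -- groupByKey + mapGroupsWithState analogue: per (account, verb)
    // key, fold a running (errors, requests) state through the events and emit
    // the updated state after each one, as GroupState.update would persist it.
    val updates: Seq[Update] =
      logs.groupBy(l => (l.account, l.verb)).toSeq.flatMap { case ((acct, verb), es) =>
        es.scanLeft(Update(acct, verb, 0, 0L, 0L)) { (st, e) =>
          val errs = if (e.statusCode >= 500) e.eventCount else 0L
          Update(acct, verb, e.minute,
                 st.totalErrors + errs, st.totalRequests + e.eventCount)
        }.drop(1) // drop the seed state
      }

    // Phase 2 -- groupBy(window) analogue: bucket the emitted updates into
    // 10-minute windows and keep the latest running totals seen in each bucket.
    updates.groupBy(u => (u.account, u.verb, u.minute / 10 * 10)).map {
      case (key, us) => key -> (us.last.totalErrors, us.last.totalRequests)
    }
  }
}

// Three one-minute rollups for account A: two in bucket [0,10), one in [10,20).
val out = TwoPhaseSketch.run(Seq(
  TwoPhaseSketch.Log("A", "GET", 500, 0, 10),
  TwoPhaseSketch.Log("A", "GET", 200, 0, 89),
  TwoPhaseSketch.Log("A", "GET", 12, 12, 5).copy(statusCode = 500)))
```

Note the ordering matters: the stateful step has to come first, because Spark rejects mapGroupsWithState *after* a streaming aggregation (your AnalysisException), whereas an aggregation over what the stateful step emits is a separate question.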
Thanks,
Kant

On Mon, Aug 28, 2017 at 7:06 AM, daniel williams <daniel.willi...@gmail.com> wrote:

> Hi all,
>
> I've been looking heavily into Spark 2.2 to solve a problem I have by
> specifically using mapGroupsWithState. What I've discovered is that a
> *groupBy(window(..))* does not work when followed by a
> *mapGroupsWithState*, and produces an AnalysisException of:
>
> *"mapGroupsWithState is not supported with aggregation on a streaming
> DataFrame/Dataset;;"*
>
> I have http logs that have been rolled up via a previous job's window
> function, in the form of:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET", "statusCode": 500, "eventCount": 10}
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET", "statusCode": 200, "eventCount": 89}
>
> In this data the *when* sub-object covers one-minute blocks. I'd like to
> use a *window* function to aggregate that into 10-minute windows, summing
> the eventCount while grouping on account, verb, and statusCode. From there
> I'd like to *mapGroupsWithState* for each *account* and *verb* to produce
> buckets for some configurable window, say 10 minutes for example's sake,
> of the form:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
> "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}
>
> *mapGroupsWithState* is perfect for this but, as stated, I've not found a
> way to apply a window function *and* use mapGroupsWithState.
>
> Example:
>
> ds.withColumn("bucket", $"when.from")
>   .withWatermark("bucket", "1 minutes")
>   // buckets and sums smaller windowed events into a rolled-up larger
>   // window event with a summed eventCount
>   .groupBy(window($"bucket", "10 minutes"),
>     $"account",
>     $"verb",
>     $"statusCode")
>   .agg(
>     sum($"eventCount")
>   )
>   .map(r => Log(....))
>   .groupByKey(l => (l.when, l.account, l.verb))
>   // will calculate totalErrors / totalRequests per bucket
>   .mapGroupsWithState[SessionInfo, SessionUpdate](
>     GroupStateTimeout.EventTimeTimeout()) {
>       case ((when: Window, account: String, verb: String),
>             events: Iterator[Log],
>             state: GroupState[SessionInfo]) => {
>         ..........
>       }
>   }
>
> Any suggestions would be greatly appreciated.
>
> I've also noticed that *groupByKey().reduceGroups()* does not work with
> *mapGroupsWithState*, which is another strategy that I've tried.
>
> Thanks.
>
> dan
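PS: to make the target output concrete, here is the 10-minute rollup Daniel describes, checked over plain Scala collections (no Spark; the `Log`/`Bucket` field names are assumptions lifted from his JSON, and I'm treating every 5xx statusCode as an error):

```scala
object BucketRollup {
  case class Log(account: String, verb: String, statusCode: Int,
                 minute: Int, eventCount: Long)
  case class Bucket(account: String, verb: String, from: Int,
                    totalRequests: Long, totalErrors: Long)

  // Sum one-minute rollups into 10-minute buckets keyed by (account, verb),
  // counting every event toward totalRequests and 5xx events toward totalErrors.
  def rollup(logs: Seq[Log]): Seq[Bucket] =
    logs.groupBy(l => (l.account, l.verb, l.minute / 10 * 10)).toSeq.map {
      case ((acct, verb, from), es) =>
        Bucket(acct, verb, from,
          totalRequests = es.map(_.eventCount).sum,
          totalErrors   = es.filter(_.statusCode >= 500).map(_.eventCount).sum)
    }
}

// Daniel's two sample minute-level rows, both falling in the [0,10) bucket.
val buckets = BucketRollup.rollup(Seq(
  BucketRollup.Log("A", "GET", 500, 0, 10),
  BucketRollup.Log("A", "GET", 200, 0, 89)))
```

This is just the aggregation logic in isolation; in the streaming job the same arithmetic would live inside the state-update function, which also lets you expire buckets via the event-time timeout.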