Hi Piyush,

Could you try adding clientId to your aggregate function, tracking a map of <clientId, your_original_aggregation> inside that new aggregate function, and assembling whatever result you need when it emits? A rough sketch of such a function is below.
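Untested sketch, just to illustrate the shape of it (class and field names are made up, the parameter types should be adjusted to your schema, and the per-client logic is a placeholder for your original aggregation; depending on your Flink version you may also need to override getResultType()/getAccumulatorType() for the map types):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.AggregateFunction;

// UDAF whose accumulator keeps one sub-aggregate per clientId, so a single
// row per userId/window can carry all clients' results.
public class PerClientAggregation
        extends AggregateFunction<Map<String, String>, PerClientAggregation.Acc> {

    // Accumulator: clientId -> intermediate state of the original aggregation.
    public static class Acc {
        public Map<String, String> perClient = new HashMap<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row; routes the row to its client's sub-aggregate.
    // Adjust the parameter types to match your table schema.
    public void accumulate(Acc acc, String clientId, String eventType,
                           Long ts, String dataField) {
        String current = acc.perClient.getOrDefault(clientId, "");
        // Placeholder: replace with your original aggregation logic.
        acc.perClient.put(clientId, current + eventType + ";");
    }

    @Override
    public Map<String, String> getValue(Acc acc) {
        // Assemble whatever result you need; here, the full per-client map.
        return acc.perClient;
    }
}

You would then register it under some name, e.g. tableEnv.registerFunction("some_aggregation", new PerClientAggregation()) (assuming the Java StreamTableEnvironment).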
The SQL would then look like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt

On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang <p.nar...@criteo.com> wrote:
> Hi folks,
>
> I’m getting started with Flink and trying to figure out how to express
> aggregating some rows into an array to finally sink data into an
> AppendStreamTableSink.
>
> My data looks something like this:
> userId, clientId, eventType, timestamp, dataField
>
> I need to compute some custom aggregations using a UDAF while grouping by
> userId, clientId over a sliding window (10 mins, triggered every 1 min).
> My first attempt is:
>
> SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField)
>   as custom_aggregated
> FROM my_kafka_stream_table
> GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
>
> This query works as I expect it to. In every time window I end up with
> inserts for unique userId + clientId combinations. What I want to do,
> though, is generate a single row per userId in each time window, and this
> is what I’m struggling to express, along with the restriction that I want
> to sink this to an AppendStreamTableSink. I was hoping to do something
> like this:
>
> SELECT userId, COLLECT(client_custom_aggregated)
> FROM
> (
>   SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`,
>     dataField) as custom_aggregated] as client_custom_aggregated
>   FROM my_kafka_stream_table
>   GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE,
>     INTERVAL '1' HOUR)
> ) GROUP BY userId
>
> Unfortunately, when I try this (and a few other variants), I run into the
> error "AppendStreamTableSink requires that Table has only insert changes".
> Does anyone know if there’s a way for me to compute my collect aggregation
> to produce one row per userId for a given time window?
>
> Thanks,
>
> -- Piyush