Hi folks,

I’m getting started with Flink and trying to figure out how to aggregate some rows into an array so that I can eventually sink the data into an AppendStreamTableSink. My data looks something like this:

  userId, clientId, eventType, timestamp, dataField
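For context, the source table is registered along these lines (simplified; the column types, watermark, topic, and connector options below are only illustrative, not my exact setup):

  CREATE TABLE my_kafka_stream_table (
    userId      STRING,
    clientId    STRING,
    eventType   STRING,
    `timestamp` TIMESTAMP(3),
    dataField   STRING,
    -- event-time attribute used by the HOP windows below
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  )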
I need to compute some custom aggregations using a UDAF, grouping by userId, clientId over a sliding window (10 minutes, sliding every 1 minute). My first attempt is:

  SELECT
    userId,
    clientId,
    my_aggregation(eventType, `timestamp`, dataField) AS custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY
    userId,
    clientId,
    HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)

This query works as I expect: in every time window I end up with one insert per unique userId + clientId combination. What I want, though, is a single row per userId in each time window, and that is what I’m struggling to express, given the restriction that I need to sink the result into an AppendStreamTableSink. I was hoping to do something like this:

  SELECT
    userId,
    COLLECT(client_custom_aggregated)
  FROM (
    SELECT
      userId,
      MAP[clientId, my_aggregation(eventType, `timestamp`, dataField)] AS client_custom_aggregated
    FROM my_kafka_stream_table
    GROUP BY
      userId,
      clientId,
      HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
  )
  GROUP BY userId

Unfortunately, when I try this (and a few other variants), I run into the error “AppendStreamTableSink requires that Table has only insert changes”. Does anyone know of a way to compute my COLLECT aggregation so that it produces one row per userId for a given time window?

Thanks,
-- Piyush
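P.S. For completeness, one of the “other variants” I tried carried the window boundaries through the inner query and grouped the outer query by them as well (sketch below; wStart, wEnd, and per_client are just names I picked for the example). As far as I can tell the outer GROUP BY is still treated as a regular, non-windowed aggregation over an unbounded stream, so it still produces an updating table and I hit the same error:

  SELECT
    userId,
    wStart,
    wEnd,
    COLLECT(client_custom_aggregated) AS per_client
  FROM (
    SELECT
      userId,
      HOP_START(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS wStart,
      HOP_END(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS wEnd,
      MAP[clientId, my_aggregation(eventType, `timestamp`, dataField)] AS client_custom_aggregated
    FROM my_kafka_stream_table
    GROUP BY
      userId,
      clientId,
      HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
  )
  GROUP BY userId, wStart, wEnd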