Hi Piyush,

Could you try to add clientId into your aggregate function, and to track
the map of <clientId, your_original_aggregation> inside your new aggregate
function, and assemble what ever result when emit.
The SQL will looks like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`,
dataField)
FROM my_kafka_stream_table
GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt


On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang <p.nar...@criteo.com> wrote:

> Hi folks,
>
>
>
> I’m getting started with Flink and trying to figure out how to express
> aggregating some rows into an array to finally sink data into an
> AppendStreamTableSink.
>
> My data looks something like this:
>
> userId, clientId, eventType, timestamp, dataField
>
>
>
> I need to compute some custom aggregations using a UDAF while grouping by
> userId, clientId over a sliding window (10 mins, triggered every 1 min). My
> first attempt is:
>
> SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField)
> as custom_aggregated
>
> FROM my_kafka_stream_table
>
> GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL
> '1' HOUR)
>
>
>
> This query works as I expect it to. In every time window I end up with
> inserts for unique userId + clientId combinations. What I want to do
> though, is generate a single row per userId in each time window and this is
> what I’m struggling with expressing along with the restriction that I want
> to sink this to an AppendStreamTableSink. I was hoping to do something like
> this:
>
>
>
> SELECT userId, COLLECT(client_custom_aggregated)
>
> FROM
>
> (
>
>   SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`,
> dataField) as custom_aggregated] as client_custom_aggregated
>
>   FROM my_kafka_stream_table
>
>   GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE,
> INTERVAL '1' HOUR)
>
> ) GROUP BY userId
>
>
>
> Unfortunately when I try this (and a few other variants), I run into the
> error, “AppendStreamTableSink requires that Table has only insert changes”.
> Does anyone know if there’s a way for me to compute my collect aggregation
> to produce one row per userId for a given time window?
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>

Reply via email to